diff --git a/README.md b/README.md index 15eb4fb..4a55548 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,16 @@ Qwen OAuth + OpenAI-compatible endpoint - пока нет MCP, skill system, subagents и rich-streaming UI - Telegram-бот работает через long polling +## Roadmap + +Текущий список работ вынесен в [TODO.md](./TODO.md). + +Ближайший фокус: + +- сериализация jobs на один Telegram chat +- cancel flow между `bot` и `serv` +- выравнивание approval semantics для всех chat API + ## Переменные окружения Сервер: @@ -117,6 +127,7 @@ curl -X POST http://127.0.0.1:8080/api/v1/auth/device/start - `POST /api/v1/chat` - `POST /api/v1/chat/start` - `POST /api/v1/chat/poll` +- `POST /api/v1/chat/cancel` - `POST /api/v1/approval/respond` ## Telegram Approval Flow @@ -127,3 +138,7 @@ curl -X POST http://127.0.0.1:8080/api/v1/auth/device/start - `/approve ` - `/reject ` + +Управление job: + +- `/cancel` - отменить активный запрос и очистить очередь сообщений в чате diff --git a/TODO.md b/TODO.md new file mode 100644 index 0000000..c96efb6 --- /dev/null +++ b/TODO.md @@ -0,0 +1,18 @@ +# TODO + +## Current Focus + +- [x] Сериализовать jobs на один Telegram chat +- [x] Добавить `/cancel` и server-side cancel API +- [ ] Привести `/api/v1/chat` к тем же approval semantics, что и job API +- [ ] Довести approvals до более строгой модели на уровне job/session + +## Next + +- [ ] Добавить headless browser session layer +- [ ] Добавить HTTP tools: `http_request`, `download_file` +- [ ] Добавить safe git tools: `git_add`, `git_commit`, `git_log`, `git_branch_create` +- [ ] Добавить structured edit tools: `insert_before`, `insert_after`, `replace_block`, `json_edit`, `yaml_edit` +- [ ] Сделать richer event protocol для progress/patch preview +- [ ] Добавить cleanup/retention для session history +- [ ] Добавить auth/token между `bot` и `serv` diff --git a/bot/app.py b/bot/app.py index 538497f..097ac0b 100644 --- a/bot/app.py +++ b/bot/app.py @@ -20,12 +20,16 @@ def load_state() -> dict[str, Any]: "sessions": {}, "auth_flows": {}, "active_jobs": {}, + "chat_active_jobs": {}, + "chat_queues": {}, "pending_approvals": {}, } state = json.loads(STATE_FILE.read_text(encoding="utf-8")) state.setdefault("sessions", {}) state.setdefault("auth_flows", {}) state.setdefault("active_jobs", {}) + state.setdefault("chat_active_jobs", {}) + state.setdefault("chat_queues", {}) state.setdefault("pending_approvals", {}) return state @@ -202,6 +206,93 @@ def start_chat_job( "seen_seq": 0, "sent_statuses": [], } + state.setdefault("chat_active_jobs", {})[str(chat_id)] = start_result["job_id"] + + +def enqueue_chat_message( + state: dict[str, Any], + chat_id: int, + user_id: str, + session_key: str, + text: str, + *, + delayed: bool = False, +) -> int: + queue = state.setdefault("chat_queues", {}).setdefault(str(chat_id), []) + queue.append( + { + "user_id": user_id, + "session_key": session_key, + "text": text, + "delayed": delayed, + "created_at": int(time.time()), + } + ) + return len(queue) + + +def start_or_queue_chat_job( + api: TelegramAPI, + config: BotConfig, + state: dict[str, Any], + chat_id: int, + user_id: str, + session_key: str, + text: str, + *, + delayed: bool = False, +) -> None: + active_job_id = state.setdefault("chat_active_jobs", {}).get(str(chat_id)) + if active_job_id: + queue_size = enqueue_chat_message( + state, + chat_id, + user_id, + session_key, + text, + delayed=delayed, + ) + api.send_message( + chat_id, + f"В этом чате уже есть активный запрос. Сообщение поставлено в очередь: {queue_size}.", + ) + return + start_chat_job( + api, + config, + state, + chat_id, + user_id, + session_key, + text, + delayed=delayed, + ) + + +def start_next_queued_job( + api: TelegramAPI, + config: BotConfig, + state: dict[str, Any], + chat_id: int, +) -> None: + if state.setdefault("chat_active_jobs", {}).get(str(chat_id)): + return + queue = state.setdefault("chat_queues", {}).get(str(chat_id)) or [] + if not queue: + return + next_item = queue.pop(0) + if not queue: + state["chat_queues"].pop(str(chat_id), None) + start_chat_job( + api, + config, + state, + chat_id, + next_item["user_id"], + next_item["session_key"], + next_item["text"], + delayed=bool(next_item.get("delayed")), + ) def poll_auth_flow( @@ -249,7 +340,7 @@ def poll_auth_flow( state["auth_flows"].pop(str(chat_id), None) api.send_message(chat_id, "Qwen OAuth успешно настроен.") for item in flow.get("pending_messages", []): - start_chat_job( + start_or_queue_chat_job( api, config, state, @@ -319,6 +410,11 @@ def process_active_jobs( poll_result.get("answer") or "Пустой ответ от модели.", ) active_jobs.pop(job_id, None) + state.setdefault("chat_active_jobs", {}).pop(str(job_state["chat_id"]), None) + pending = pending_approvals.get(str(job_state["chat_id"])) + if pending and pending.get("job_id") == job_id: + pending_approvals.pop(str(job_state["chat_id"]), None) + start_next_queued_job(api, config, state, int(job_state["chat_id"])) elif status == "failed": send_text_chunks( api, @@ -326,6 +422,53 @@ def process_active_jobs( f"Job завершился с ошибкой: {poll_result.get('error')}", ) active_jobs.pop(job_id, None) + state.setdefault("chat_active_jobs", {}).pop(str(job_state["chat_id"]), None) + pending = pending_approvals.get(str(job_state["chat_id"])) + if pending and pending.get("job_id") == job_id: + pending_approvals.pop(str(job_state["chat_id"]), None) + start_next_queued_job(api, config, state, int(job_state["chat_id"])) + elif status == "canceled": + send_text_chunks( + api, + int(job_state["chat_id"]), + f"Job отменён: {poll_result.get('error') or 'Canceled by operator'}", + ) + active_jobs.pop(job_id, None) + state.setdefault("chat_active_jobs", {}).pop(str(job_state["chat_id"]), None) + pending = pending_approvals.get(str(job_state["chat_id"])) + if pending and pending.get("job_id") == job_id: + pending_approvals.pop(str(job_state["chat_id"]), None) + start_next_queued_job(api, config, state, int(job_state["chat_id"])) + + +def cancel_chat_work( + api: TelegramAPI, + config: BotConfig, + state: dict[str, Any], + chat_id: int, + actor: str, + *, + clear_queue: bool, +) -> bool: + canceled = False + active_job_id = state.setdefault("chat_active_jobs", {}).get(str(chat_id)) + if active_job_id: + post_json( + f"{config.server_url}/api/v1/chat/cancel", + { + "job_id": active_job_id, + "actor": actor, + "reason": "Canceled from Telegram bot", + }, + ) + canceled = True + if clear_queue: + queue = state.setdefault("chat_queues", {}).pop(str(chat_id), []) + canceled = canceled or bool(queue) + pending = state.setdefault("pending_approvals", {}).get(str(chat_id)) + if pending and pending.get("job_id") == active_job_id: + state["pending_approvals"].pop(str(chat_id), None) + return canceled def handle_message(api: TelegramAPI, config: BotConfig, state: dict[str, Any], message: dict[str, Any]) -> None: @@ -343,7 +486,7 @@ def handle_message(api: TelegramAPI, config: BotConfig, state: dict[str, Any], m if text == "/start": api.send_message( chat_id, - "new-qwen bot готов.\nКоманды: /help, /auth, /status, /session, /clear, /approve, /reject.", + "new-qwen bot готов.\nКоманды: /help, /auth, /status, /session, /cancel, /clear, /approve, /reject.", ) return @@ -355,6 +498,7 @@ def handle_message(api: TelegramAPI, config: BotConfig, state: dict[str, Any], m "/auth_check [flow_id] - проверить авторизацию\n" "/status - статус OAuth и сервера\n" "/session - показать текущую сессию\n" + "/cancel - отменить активный запрос и очистить очередь\n" "/approve [approval_id] - подтвердить инструмент\n" "/reject [approval_id] - отклонить инструмент\n" "/clear - очистить контекст", @@ -404,6 +548,8 @@ def handle_message(api: TelegramAPI, config: BotConfig, state: dict[str, Any], m if text == "/status": status = get_json(f"{config.server_url}/api/v1/auth/status") + queue_size = len(state.setdefault("chat_queues", {}).get(str(chat_id), [])) + active_job = state.setdefault("chat_active_jobs", {}).get(str(chat_id)) send_text_chunks( api, chat_id, @@ -412,10 +558,27 @@ def handle_message(api: TelegramAPI, config: BotConfig, state: dict[str, Any], m f"resource_url: {status.get('resource_url')}\n" f"expires_at: {status.get('expires_at')}\n" f"tool_policy: {status.get('tool_policy')}\n" - f"pending_approvals: {status.get('pending_approvals')}", + f"pending_approvals: {status.get('pending_approvals')}\n" + f"active_job: {active_job}\n" + f"queued_messages: {queue_size}", ) return + if text == "/cancel": + canceled = cancel_chat_work( + api, + config, + state, + chat_id, + user_id, + clear_queue=True, + ) + if canceled: + api.send_message(chat_id, "Активный job отменён, очередь чата очищена.") + else: + api.send_message(chat_id, "Для этого чата нет активных или queued jobs.") + return + if text == "/session": if not session_id: api.send_message(chat_id, "У этого чата ещё нет активной сессии.") @@ -436,6 +599,14 @@ def handle_message(api: TelegramAPI, config: BotConfig, state: dict[str, Any], m return if text == "/clear": + cancel_chat_work( + api, + config, + state, + chat_id, + user_id, + clear_queue=True, + ) if session_id: post_json(f"{config.server_url}/api/v1/session/clear", {"session_id": session_id}) state["sessions"].pop(session_key, None) @@ -447,7 +618,7 @@ def handle_message(api: TelegramAPI, config: BotConfig, state: dict[str, Any], m api.send_message(chat_id, "Сообщение поставлено в очередь до завершения авторизации.") return - start_chat_job(api, config, state, chat_id, user_id, session_key, text) + start_or_queue_chat_job(api, config, state, chat_id, user_id, session_key, text) def main() -> None: diff --git a/serv/app.py b/serv/app.py index 8d3e315..cbe8788 100644 --- a/serv/app.py +++ b/serv/app.py @@ -156,6 +156,14 @@ class RequestHandler(BaseHTTPRequestHandler): def _run_chat_job(self, job_id: str, session_id: str, user_id: str, message: str) -> None: try: + if self.app.jobs.is_cancel_requested(job_id): + reason = "Job canceled before execution started" + self.app.jobs.append_event( + job_id, + {"type": "job_status", "message": reason}, + ) + self.app.jobs.mark_canceled(job_id, reason) + return self.app.jobs.set_status(job_id, "running") self.app.jobs.append_event( job_id, @@ -172,7 +180,16 @@ class RequestHandler(BaseHTTPRequestHandler): tool_name, arguments, ), + is_cancelled=lambda: self.app.jobs.is_cancel_requested(job_id), ) + if self.app.jobs.is_cancel_requested(job_id): + reason = "Job canceled by operator" + self.app.jobs.append_event( + job_id, + {"type": "job_status", "message": reason}, + ) + self.app.jobs.mark_canceled(job_id, reason) + return persisted_messages = result["messages"][1:] self.app.sessions.save( session_id, @@ -194,6 +211,14 @@ class RequestHandler(BaseHTTPRequestHandler): usage=result.get("usage"), ) except Exception as exc: + if self.app.jobs.is_cancel_requested(job_id): + reason = "Job canceled by operator" + self.app.jobs.append_event( + job_id, + {"type": "job_status", "message": reason}, + ) + self.app.jobs.mark_canceled(job_id, reason) + return self.app.jobs.append_event( job_id, {"type": "error", "message": str(exc)}, @@ -225,6 +250,8 @@ class RequestHandler(BaseHTTPRequestHandler): approval["approval_id"], timeout_seconds=float(self.app.config.approval_timeout_seconds), ) + if self.app.jobs.is_cancel_requested(job_id): + return decision self.app.jobs.set_status(job_id, "running") return decision @@ -344,6 +371,35 @@ class RequestHandler(BaseHTTPRequestHandler): ) return + if self.path == "/api/v1/chat/cancel": + body = self._json_body() + job_id = body["job_id"] + actor = str(body.get("actor") or "unknown") + reason = str(body.get("reason") or "Canceled by operator") + job = self.app.jobs.request_cancel( + job_id, + actor=actor, + reason=reason, + ) + self.app.approvals.reject_pending_for_job( + job_id, + actor=actor, + reason=reason, + ) + self.app.jobs.append_event( + job_id, + {"type": "job_status", "message": reason}, + ) + self._send( + HTTPStatus.OK, + { + "job_id": job_id, + "status": job.get("status"), + "cancel_requested": job.get("cancel_requested"), + }, + ) + return + if self.path == "/api/v1/approval/respond": body = self._json_body() approval_id = body["approval_id"] diff --git a/serv/approvals.py b/serv/approvals.py index c68af12..fc95a8b 100644 --- a/serv/approvals.py +++ b/serv/approvals.py @@ -138,6 +138,29 @@ class ApprovalStore: condition.wait(timeout=remaining) return approval.copy() + def reject_pending_for_job( + self, + job_id: str, + *, + actor: str, + reason: str, + ) -> list[dict[str, Any]]: + rejected: list[dict[str, Any]] = [] + with self._lock: + for approval_id, approval in self._approvals.items(): + if approval.get("job_id") != job_id or approval.get("status") != "pending": + continue + approval["status"] = "rejected" + approval["actor"] = actor + approval["reason"] = reason + approval["updated_at"] = time.time() + self._save(approval) + condition = self._conditions.get(approval_id) + if condition: + condition.notify_all() + rejected.append(approval.copy()) + return rejected + def cleanup(self) -> None: now = time.time() with self._lock: diff --git a/serv/jobs.py b/serv/jobs.py index cc6d616..86a8195 100644 --- a/serv/jobs.py +++ b/serv/jobs.py @@ -65,6 +65,9 @@ class JobStore: "answer": None, "usage": None, "error": None, + "cancel_requested": False, + "cancel_actor": None, + "cancel_reason": None, } with self._lock: self._jobs[job_id] = job @@ -96,6 +99,42 @@ class JobStore: job["updated_at"] = time.time() self._save_job(job) + def request_cancel( + self, + job_id: str, + *, + actor: str, + reason: str, + ) -> dict[str, Any]: + with self._lock: + job = self._jobs[job_id] + if job["status"] in {"completed", "failed", "canceled"}: + return job.copy() + job["cancel_requested"] = True + job["cancel_actor"] = actor + job["cancel_reason"] = reason + if job["status"] in {"queued", "waiting_approval"}: + job["status"] = "canceled" + job["error"] = reason + elif job["status"] == "running": + job["status"] = "canceling" + job["updated_at"] = time.time() + self._save_job(job) + return job.copy() + + def is_cancel_requested(self, job_id: str) -> bool: + with self._lock: + job = self._jobs.get(job_id) + return bool(job and job.get("cancel_requested")) + + def mark_canceled(self, job_id: str, reason: str) -> None: + with self._lock: + job = self._jobs[job_id] + job["status"] = "canceled" + job["error"] = reason + job["updated_at"] = time.time() + self._save_job(job) + def finish( self, job_id: str, diff --git a/serv/llm.py b/serv/llm.py index d3cd6eb..9a4f24f 100644 --- a/serv/llm.py +++ b/serv/llm.py @@ -55,8 +55,15 @@ class QwenAgent: user_message: str, on_event: Callable[[dict[str, Any]], None] | None = None, approval_callback: Callable[[str, dict[str, Any]], dict[str, Any]] | None = None, + is_cancelled: Callable[[], bool] | None = None, ) -> dict[str, Any]: emit = on_event or (lambda _event: None) + cancel_check = is_cancelled or (lambda: False) + + def ensure_not_cancelled() -> None: + if cancel_check(): + raise ToolError("Job canceled by operator") + system_prompt = self.config.system_prompt or DEFAULT_SYSTEM_PROMPT messages: list[dict[str, Any]] = [{"role": "system", "content": system_prompt}] messages.extend(history) @@ -64,8 +71,10 @@ class QwenAgent: events: list[dict[str, Any]] = [] for _ in range(self.config.max_tool_rounds): + ensure_not_cancelled() emit({"type": "model_request", "message": "Запрашиваю ответ модели"}) response = self._request_completion(messages) + ensure_not_cancelled() choice = response["choices"][0]["message"] tool_calls = choice.get("tool_calls") or [] content = choice.get("content") @@ -91,6 +100,7 @@ class QwenAgent: ) for call in tool_calls: + ensure_not_cancelled() tool_name = call["function"]["name"] try: arguments = json.loads(call["function"]["arguments"] or "{}") @@ -111,6 +121,7 @@ class QwenAgent: } events.append(approval_event) emit(approval_event) + ensure_not_cancelled() if approval_result["status"] != "approved": result = {"error": f"Tool '{tool_name}' was rejected by operator"} tool_result_event = {"type": "tool_result", "name": tool_name, "result": result} @@ -125,6 +136,7 @@ class QwenAgent: ) continue try: + ensure_not_cancelled() result = self.tools.execute(tool_name, arguments) except ToolError as exc: result = {"error": str(exc)}