Serialize chat jobs and add cancel flow

This commit is contained in:
mirivlad 2026-04-08 11:48:21 +08:00
parent 85ba029133
commit 84a6e8b5d8
7 changed files with 338 additions and 4 deletions

View File

@ -46,6 +46,16 @@ Qwen OAuth + OpenAI-compatible endpoint
- пока нет MCP, skill system, subagents и rich-streaming UI
- Telegram-бот работает через long polling
## Roadmap
Текущий список работ вынесен в [TODO.md](./TODO.md).
Ближайший фокус:
- сериализация jobs на один Telegram chat
- cancel flow между `bot` и `serv`
- выравнивание approval semantics для всех chat API
## Переменные окружения
Сервер:
@ -117,6 +127,7 @@ curl -X POST http://127.0.0.1:8080/api/v1/auth/device/start
- `POST /api/v1/chat`
- `POST /api/v1/chat/start`
- `POST /api/v1/chat/poll`
- `POST /api/v1/chat/cancel`
- `POST /api/v1/approval/respond`
## Telegram Approval Flow
@ -127,3 +138,7 @@ curl -X POST http://127.0.0.1:8080/api/v1/auth/device/start
- `/approve <approval_id>`
- `/reject <approval_id>`
Управление job:
- `/cancel` - отменить активный запрос и очистить очередь сообщений в чате

18
TODO.md Normal file
View File

@ -0,0 +1,18 @@
# TODO
## Current Focus
- [x] Сериализовать jobs на один Telegram chat
- [x] Добавить `/cancel` и server-side cancel API
- [ ] Привести `/api/v1/chat` к тем же approval semantics, что и job API
- [ ] Довести approvals до более строгой модели на уровне job/session
## Next
- [ ] Добавить headless browser session layer
- [ ] Добавить HTTP tools: `http_request`, `download_file`
- [ ] Добавить safe git tools: `git_add`, `git_commit`, `git_log`, `git_branch_create`
- [ ] Добавить structured edit tools: `insert_before`, `insert_after`, `replace_block`, `json_edit`, `yaml_edit`
- [ ] Сделать richer event protocol для progress/patch preview
- [ ] Добавить cleanup/retention для session history
- [ ] Добавить auth/token между `bot` и `serv`

View File

@ -20,12 +20,16 @@ def load_state() -> dict[str, Any]:
"sessions": {},
"auth_flows": {},
"active_jobs": {},
"chat_active_jobs": {},
"chat_queues": {},
"pending_approvals": {},
}
state = json.loads(STATE_FILE.read_text(encoding="utf-8"))
state.setdefault("sessions", {})
state.setdefault("auth_flows", {})
state.setdefault("active_jobs", {})
state.setdefault("chat_active_jobs", {})
state.setdefault("chat_queues", {})
state.setdefault("pending_approvals", {})
return state
@ -202,6 +206,93 @@ def start_chat_job(
"seen_seq": 0,
"sent_statuses": [],
}
state.setdefault("chat_active_jobs", {})[str(chat_id)] = start_result["job_id"]
def enqueue_chat_message(
state: dict[str, Any],
chat_id: int,
user_id: str,
session_key: str,
text: str,
*,
delayed: bool = False,
) -> int:
queue = state.setdefault("chat_queues", {}).setdefault(str(chat_id), [])
queue.append(
{
"user_id": user_id,
"session_key": session_key,
"text": text,
"delayed": delayed,
"created_at": int(time.time()),
}
)
return len(queue)
def start_or_queue_chat_job(
api: TelegramAPI,
config: BotConfig,
state: dict[str, Any],
chat_id: int,
user_id: str,
session_key: str,
text: str,
*,
delayed: bool = False,
) -> None:
active_job_id = state.setdefault("chat_active_jobs", {}).get(str(chat_id))
if active_job_id:
queue_size = enqueue_chat_message(
state,
chat_id,
user_id,
session_key,
text,
delayed=delayed,
)
api.send_message(
chat_id,
f"В этом чате уже есть активный запрос. Сообщение поставлено в очередь: {queue_size}.",
)
return
start_chat_job(
api,
config,
state,
chat_id,
user_id,
session_key,
text,
delayed=delayed,
)
def start_next_queued_job(
api: TelegramAPI,
config: BotConfig,
state: dict[str, Any],
chat_id: int,
) -> None:
if state.setdefault("chat_active_jobs", {}).get(str(chat_id)):
return
queue = state.setdefault("chat_queues", {}).get(str(chat_id)) or []
if not queue:
return
next_item = queue.pop(0)
if not queue:
state["chat_queues"].pop(str(chat_id), None)
start_chat_job(
api,
config,
state,
chat_id,
next_item["user_id"],
next_item["session_key"],
next_item["text"],
delayed=bool(next_item.get("delayed")),
)
def poll_auth_flow(
@ -249,7 +340,7 @@ def poll_auth_flow(
state["auth_flows"].pop(str(chat_id), None)
api.send_message(chat_id, "Qwen OAuth успешно настроен.")
for item in flow.get("pending_messages", []):
start_chat_job(
start_or_queue_chat_job(
api,
config,
state,
@ -319,6 +410,11 @@ def process_active_jobs(
poll_result.get("answer") or "Пустой ответ от модели.",
)
active_jobs.pop(job_id, None)
state.setdefault("chat_active_jobs", {}).pop(str(job_state["chat_id"]), None)
pending = pending_approvals.get(str(job_state["chat_id"]))
if pending and pending.get("job_id") == job_id:
pending_approvals.pop(str(job_state["chat_id"]), None)
start_next_queued_job(api, config, state, int(job_state["chat_id"]))
elif status == "failed":
send_text_chunks(
api,
@ -326,6 +422,53 @@ def process_active_jobs(
f"Job завершился с ошибкой: {poll_result.get('error')}",
)
active_jobs.pop(job_id, None)
state.setdefault("chat_active_jobs", {}).pop(str(job_state["chat_id"]), None)
pending = pending_approvals.get(str(job_state["chat_id"]))
if pending and pending.get("job_id") == job_id:
pending_approvals.pop(str(job_state["chat_id"]), None)
start_next_queued_job(api, config, state, int(job_state["chat_id"]))
elif status == "canceled":
send_text_chunks(
api,
int(job_state["chat_id"]),
f"Job отменён: {poll_result.get('error') or 'Canceled by operator'}",
)
active_jobs.pop(job_id, None)
state.setdefault("chat_active_jobs", {}).pop(str(job_state["chat_id"]), None)
pending = pending_approvals.get(str(job_state["chat_id"]))
if pending and pending.get("job_id") == job_id:
pending_approvals.pop(str(job_state["chat_id"]), None)
start_next_queued_job(api, config, state, int(job_state["chat_id"]))
def cancel_chat_work(
api: TelegramAPI,
config: BotConfig,
state: dict[str, Any],
chat_id: int,
actor: str,
*,
clear_queue: bool,
) -> bool:
canceled = False
active_job_id = state.setdefault("chat_active_jobs", {}).get(str(chat_id))
if active_job_id:
post_json(
f"{config.server_url}/api/v1/chat/cancel",
{
"job_id": active_job_id,
"actor": actor,
"reason": "Canceled from Telegram bot",
},
)
canceled = True
if clear_queue:
queue = state.setdefault("chat_queues", {}).pop(str(chat_id), [])
canceled = canceled or bool(queue)
pending = state.setdefault("pending_approvals", {}).get(str(chat_id))
if pending and pending.get("job_id") == active_job_id:
state["pending_approvals"].pop(str(chat_id), None)
return canceled
def handle_message(api: TelegramAPI, config: BotConfig, state: dict[str, Any], message: dict[str, Any]) -> None:
@ -343,7 +486,7 @@ def handle_message(api: TelegramAPI, config: BotConfig, state: dict[str, Any], m
if text == "/start":
api.send_message(
chat_id,
"new-qwen bot готов.\nКоманды: /help, /auth, /status, /session, /clear, /approve, /reject.",
"new-qwen bot готов.\nКоманды: /help, /auth, /status, /session, /cancel, /clear, /approve, /reject.",
)
return
@ -355,6 +498,7 @@ def handle_message(api: TelegramAPI, config: BotConfig, state: dict[str, Any], m
"/auth_check [flow_id] - проверить авторизацию\n"
"/status - статус OAuth и сервера\n"
"/session - показать текущую сессию\n"
"/cancel - отменить активный запрос и очистить очередь\n"
"/approve [approval_id] - подтвердить инструмент\n"
"/reject [approval_id] - отклонить инструмент\n"
"/clear - очистить контекст",
@ -404,6 +548,8 @@ def handle_message(api: TelegramAPI, config: BotConfig, state: dict[str, Any], m
if text == "/status":
status = get_json(f"{config.server_url}/api/v1/auth/status")
queue_size = len(state.setdefault("chat_queues", {}).get(str(chat_id), []))
active_job = state.setdefault("chat_active_jobs", {}).get(str(chat_id))
send_text_chunks(
api,
chat_id,
@ -412,10 +558,27 @@ def handle_message(api: TelegramAPI, config: BotConfig, state: dict[str, Any], m
f"resource_url: {status.get('resource_url')}\n"
f"expires_at: {status.get('expires_at')}\n"
f"tool_policy: {status.get('tool_policy')}\n"
f"pending_approvals: {status.get('pending_approvals')}",
f"pending_approvals: {status.get('pending_approvals')}\n"
f"active_job: {active_job}\n"
f"queued_messages: {queue_size}",
)
return
if text == "/cancel":
canceled = cancel_chat_work(
api,
config,
state,
chat_id,
user_id,
clear_queue=True,
)
if canceled:
api.send_message(chat_id, "Активный job отменён, очередь чата очищена.")
else:
api.send_message(chat_id, "Для этого чата нет активных или queued jobs.")
return
if text == "/session":
if not session_id:
api.send_message(chat_id, "У этого чата ещё нет активной сессии.")
@ -436,6 +599,14 @@ def handle_message(api: TelegramAPI, config: BotConfig, state: dict[str, Any], m
return
if text == "/clear":
cancel_chat_work(
api,
config,
state,
chat_id,
user_id,
clear_queue=True,
)
if session_id:
post_json(f"{config.server_url}/api/v1/session/clear", {"session_id": session_id})
state["sessions"].pop(session_key, None)
@ -447,7 +618,7 @@ def handle_message(api: TelegramAPI, config: BotConfig, state: dict[str, Any], m
api.send_message(chat_id, "Сообщение поставлено в очередь до завершения авторизации.")
return
start_chat_job(api, config, state, chat_id, user_id, session_key, text)
start_or_queue_chat_job(api, config, state, chat_id, user_id, session_key, text)
def main() -> None:

View File

@ -156,6 +156,14 @@ class RequestHandler(BaseHTTPRequestHandler):
def _run_chat_job(self, job_id: str, session_id: str, user_id: str, message: str) -> None:
try:
if self.app.jobs.is_cancel_requested(job_id):
reason = "Job canceled before execution started"
self.app.jobs.append_event(
job_id,
{"type": "job_status", "message": reason},
)
self.app.jobs.mark_canceled(job_id, reason)
return
self.app.jobs.set_status(job_id, "running")
self.app.jobs.append_event(
job_id,
@ -172,7 +180,16 @@ class RequestHandler(BaseHTTPRequestHandler):
tool_name,
arguments,
),
is_cancelled=lambda: self.app.jobs.is_cancel_requested(job_id),
)
if self.app.jobs.is_cancel_requested(job_id):
reason = "Job canceled by operator"
self.app.jobs.append_event(
job_id,
{"type": "job_status", "message": reason},
)
self.app.jobs.mark_canceled(job_id, reason)
return
persisted_messages = result["messages"][1:]
self.app.sessions.save(
session_id,
@ -194,6 +211,14 @@ class RequestHandler(BaseHTTPRequestHandler):
usage=result.get("usage"),
)
except Exception as exc:
if self.app.jobs.is_cancel_requested(job_id):
reason = "Job canceled by operator"
self.app.jobs.append_event(
job_id,
{"type": "job_status", "message": reason},
)
self.app.jobs.mark_canceled(job_id, reason)
return
self.app.jobs.append_event(
job_id,
{"type": "error", "message": str(exc)},
@ -225,6 +250,8 @@ class RequestHandler(BaseHTTPRequestHandler):
approval["approval_id"],
timeout_seconds=float(self.app.config.approval_timeout_seconds),
)
if self.app.jobs.is_cancel_requested(job_id):
return decision
self.app.jobs.set_status(job_id, "running")
return decision
@ -344,6 +371,35 @@ class RequestHandler(BaseHTTPRequestHandler):
)
return
if self.path == "/api/v1/chat/cancel":
body = self._json_body()
job_id = body["job_id"]
actor = str(body.get("actor") or "unknown")
reason = str(body.get("reason") or "Canceled by operator")
job = self.app.jobs.request_cancel(
job_id,
actor=actor,
reason=reason,
)
self.app.approvals.reject_pending_for_job(
job_id,
actor=actor,
reason=reason,
)
self.app.jobs.append_event(
job_id,
{"type": "job_status", "message": reason},
)
self._send(
HTTPStatus.OK,
{
"job_id": job_id,
"status": job.get("status"),
"cancel_requested": job.get("cancel_requested"),
},
)
return
if self.path == "/api/v1/approval/respond":
body = self._json_body()
approval_id = body["approval_id"]

View File

@ -138,6 +138,29 @@ class ApprovalStore:
condition.wait(timeout=remaining)
return approval.copy()
def reject_pending_for_job(
self,
job_id: str,
*,
actor: str,
reason: str,
) -> list[dict[str, Any]]:
rejected: list[dict[str, Any]] = []
with self._lock:
for approval_id, approval in self._approvals.items():
if approval.get("job_id") != job_id or approval.get("status") != "pending":
continue
approval["status"] = "rejected"
approval["actor"] = actor
approval["reason"] = reason
approval["updated_at"] = time.time()
self._save(approval)
condition = self._conditions.get(approval_id)
if condition:
condition.notify_all()
rejected.append(approval.copy())
return rejected
def cleanup(self) -> None:
now = time.time()
with self._lock:

View File

@ -65,6 +65,9 @@ class JobStore:
"answer": None,
"usage": None,
"error": None,
"cancel_requested": False,
"cancel_actor": None,
"cancel_reason": None,
}
with self._lock:
self._jobs[job_id] = job
@ -96,6 +99,42 @@ class JobStore:
job["updated_at"] = time.time()
self._save_job(job)
def request_cancel(
self,
job_id: str,
*,
actor: str,
reason: str,
) -> dict[str, Any]:
with self._lock:
job = self._jobs[job_id]
if job["status"] in {"completed", "failed", "canceled"}:
return job.copy()
job["cancel_requested"] = True
job["cancel_actor"] = actor
job["cancel_reason"] = reason
if job["status"] in {"queued", "waiting_approval"}:
job["status"] = "canceled"
job["error"] = reason
elif job["status"] == "running":
job["status"] = "canceling"
job["updated_at"] = time.time()
self._save_job(job)
return job.copy()
def is_cancel_requested(self, job_id: str) -> bool:
with self._lock:
job = self._jobs.get(job_id)
return bool(job and job.get("cancel_requested"))
def mark_canceled(self, job_id: str, reason: str) -> None:
with self._lock:
job = self._jobs[job_id]
job["status"] = "canceled"
job["error"] = reason
job["updated_at"] = time.time()
self._save_job(job)
def finish(
self,
job_id: str,

View File

@ -55,8 +55,15 @@ class QwenAgent:
user_message: str,
on_event: Callable[[dict[str, Any]], None] | None = None,
approval_callback: Callable[[str, dict[str, Any]], dict[str, Any]] | None = None,
is_cancelled: Callable[[], bool] | None = None,
) -> dict[str, Any]:
emit = on_event or (lambda _event: None)
cancel_check = is_cancelled or (lambda: False)
def ensure_not_cancelled() -> None:
if cancel_check():
raise ToolError("Job canceled by operator")
system_prompt = self.config.system_prompt or DEFAULT_SYSTEM_PROMPT
messages: list[dict[str, Any]] = [{"role": "system", "content": system_prompt}]
messages.extend(history)
@ -64,8 +71,10 @@ class QwenAgent:
events: list[dict[str, Any]] = []
for _ in range(self.config.max_tool_rounds):
ensure_not_cancelled()
emit({"type": "model_request", "message": "Запрашиваю ответ модели"})
response = self._request_completion(messages)
ensure_not_cancelled()
choice = response["choices"][0]["message"]
tool_calls = choice.get("tool_calls") or []
content = choice.get("content")
@ -91,6 +100,7 @@ class QwenAgent:
)
for call in tool_calls:
ensure_not_cancelled()
tool_name = call["function"]["name"]
try:
arguments = json.loads(call["function"]["arguments"] or "{}")
@ -111,6 +121,7 @@ class QwenAgent:
}
events.append(approval_event)
emit(approval_event)
ensure_not_cancelled()
if approval_result["status"] != "approved":
result = {"error": f"Tool '{tool_name}' was rejected by operator"}
tool_result_event = {"type": "tool_result", "name": tool_name, "result": result}
@ -125,6 +136,7 @@ class QwenAgent:
)
continue
try:
ensure_not_cancelled()
result = self.tools.execute(tool_name, arguments)
except ToolError as exc:
result = {"error": str(exc)}