diff --git a/README.md b/README.md index e6f849d..b837dc1 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,8 @@ Qwen OAuth + OpenAI-compatible endpoint - Telegram polling без внешних библиотек - JSON-хранилище сессий - API списка и просмотра сессий +- автоматический polling OAuth flow в боте +- очередь сообщений, пришедших до завершения OAuth ## Ограничения текущей реализации @@ -71,7 +73,9 @@ python3 bot/app.py 1. Отправить боту `/auth` 2. Открыть ссылку подтверждения -3. После завершения авторизации писать обычные сообщения боту +3. Бот сам дождётся завершения OAuth и продолжит работу + +Если пользователь отправит обычное сообщение до завершения OAuth, бот поставит его в очередь и автоматически отправит на сервер после успешной авторизации. Либо можно вызвать API сервера напрямую: diff --git a/bot/app.py b/bot/app.py index 3aebd45..0ed8eb9 100644 --- a/bot/app.py +++ b/bot/app.py @@ -4,7 +4,7 @@ import json import time from pathlib import Path from typing import Any -from urllib import request +from urllib import error, request from config import BotConfig from telegram_api import TelegramAPI @@ -16,7 +16,10 @@ STATE_FILE = Path(__file__).resolve().parent.parent / ".new-qwen" / "telegram-st def load_state() -> dict[str, Any]: if not STATE_FILE.exists(): return {"offset": None, "sessions": {}, "auth_flows": {}} - return json.loads(STATE_FILE.read_text(encoding="utf-8")) + state = json.loads(STATE_FILE.read_text(encoding="utf-8")) + state.setdefault("sessions", {}) + state.setdefault("auth_flows", {}) + return state def save_state(state: dict[str, Any]) -> None: @@ -48,21 +51,181 @@ def send_text_chunks(api: TelegramAPI, chat_id: int, text: str) -> None: api.send_message(chat_id, normalized[start : start + chunk_size]) +def get_auth_flow(state: dict[str, Any], chat_id: int) -> dict[str, Any] | None: + return state.setdefault("auth_flows", {}).get(str(chat_id)) + + +def start_auth_flow( + api: TelegramAPI, + config: BotConfig, + state: dict[str, Any], + chat_id: int, + *, + force_new: bool = False, +) -> dict[str, Any]: + existing = get_auth_flow(state, chat_id) + now = time.time() + if existing and not force_new and existing.get("expires_at", 0) > now: + send_text_chunks( + api, + chat_id, + "Авторизация Qwen OAuth ещё не завершена.\n" + f"Открой ссылку:\n{existing['verification_uri_complete']}\n\n" + "Бот сам проверит завершение и продолжит работу.", + ) + return existing + + started = post_json(f"{config.server_url}/api/v1/auth/device/start", {}) + flow = { + "flow_id": started["flow_id"], + "user_code": started.get("user_code"), + "verification_uri": started.get("verification_uri"), + "verification_uri_complete": started["verification_uri_complete"], + "expires_at": started["expires_at"], + "interval_seconds": started.get("interval_seconds", 2), + "next_poll_at": now + started.get("interval_seconds", 2), + "pending_messages": existing.get("pending_messages", []) if existing else [], + } + state.setdefault("auth_flows", {})[str(chat_id)] = flow + send_text_chunks( + api, + chat_id, + "Нужна авторизация Qwen OAuth.\n" + f"Открой ссылку:\n{flow['verification_uri_complete']}\n\n" + f"user_code: {flow.get('user_code')}\n" + "После подтверждения бот сам продолжит работу.", + ) + return flow + + def ensure_auth(api: TelegramAPI, config: BotConfig, state: dict[str, Any], chat_id: int) -> bool: status = get_json(f"{config.server_url}/api/v1/auth/status") if status.get("authenticated"): return True - started = post_json(f"{config.server_url}/api/v1/auth/device/start", {}) - state.setdefault("auth_flows", {})[str(chat_id)] = started["flow_id"] - api.send_message( - chat_id, - "Qwen OAuth не настроен.\n" - f"Откройте ссылку:\n{started['verification_uri_complete']}\n\n" - f"Потом отправьте /auth_check {started['flow_id']}", - ) + start_auth_flow(api, config, state, chat_id) return False +def enqueue_pending_message( + state: dict[str, Any], + chat_id: int, + user_id: str, + session_key: str, + text: str, +) -> None: + flow = get_auth_flow(state, chat_id) + if not flow: + return + pending_messages = flow.setdefault("pending_messages", []) + pending_messages.append( + { + "user_id": user_id, + "session_key": session_key, + "text": text, + "created_at": int(time.time()), + } + ) + + +def deliver_chat_message( + api: TelegramAPI, + config: BotConfig, + state: dict[str, Any], + chat_id: int, + user_id: str, + session_key: str, + text: str, + *, + delayed: bool = False, +) -> None: + session_id = state.setdefault("sessions", {}).get(session_key) + prefix = "Обрабатываю отложенный запрос..." if delayed else "Обрабатываю запрос..." + api.send_message(chat_id, prefix) + result = post_json( + f"{config.server_url}/api/v1/chat", + { + "session_id": session_id, + "user_id": user_id, + "message": text, + }, + ) + state["sessions"][session_key] = result["session_id"] + answer = result.get("answer") or "Пустой ответ от модели." + send_text_chunks(api, chat_id, answer) + + +def poll_auth_flow( + api: TelegramAPI, + config: BotConfig, + state: dict[str, Any], + chat_id: int, + *, + force: bool = False, +) -> bool: + flow = get_auth_flow(state, chat_id) + if not flow: + return False + now = time.time() + if flow.get("expires_at", 0) <= now: + state["auth_flows"].pop(str(chat_id), None) + api.send_message(chat_id, "OAuth flow истёк. Запусти /auth ещё раз.") + return False + if not force and now < flow.get("next_poll_at", 0): + return False + + try: + result = post_json( + f"{config.server_url}/api/v1/auth/device/poll", + {"flow_id": flow["flow_id"]}, + ) + except error.HTTPError as exc: + body = exc.read().decode("utf-8", errors="replace") + state["auth_flows"].pop(str(chat_id), None) + send_text_chunks( + api, + chat_id, + "Не удалось завершить OAuth flow на сервере.\n" + f"Ответ сервера: {body}\n" + "Запусти /auth ещё раз.", + ) + return False + + if not result.get("done"): + interval = result.get("interval_seconds", flow.get("interval_seconds", 2)) + flow["interval_seconds"] = interval + flow["next_poll_at"] = now + interval + return False + + state["auth_flows"].pop(str(chat_id), None) + api.send_message(chat_id, "Qwen OAuth успешно настроен.") + for item in flow.get("pending_messages", []): + deliver_chat_message( + api, + config, + state, + chat_id, + item["user_id"], + item["session_key"], + item["text"], + delayed=True, + ) + if not flow.get("pending_messages"): + api.send_message(chat_id, "Можно отправлять обычные сообщения.") + return True + + +def process_auth_flows( + api: TelegramAPI, + config: BotConfig, + state: dict[str, Any], +) -> None: + for chat_id_raw in list(state.setdefault("auth_flows", {}).keys()): + try: + poll_auth_flow(api, config, state, int(chat_id_raw), force=False) + except Exception as exc: + print(f"auth flow poll error for chat {chat_id_raw}: {exc}") + + def handle_message(api: TelegramAPI, config: BotConfig, state: dict[str, Any], message: dict[str, Any]) -> None: chat_id = message["chat"]["id"] user_id = str(message.get("from", {}).get("id", chat_id)) @@ -95,30 +258,20 @@ def handle_message(api: TelegramAPI, config: BotConfig, state: dict[str, Any], m return if text == "/auth": - started = post_json(f"{config.server_url}/api/v1/auth/device/start", {}) - state["auth_flows"][str(chat_id)] = started["flow_id"] - api.send_message( - chat_id, - "Откройте ссылку для авторизации Qwen OAuth:\n" - f"{started['verification_uri_complete']}\n\n" - f"После подтверждения отправьте /auth_check {started['flow_id']}", - ) + start_auth_flow(api, config, state, chat_id, force_new=True) return if text.startswith("/auth_check"): parts = text.split(maxsplit=1) - flow_id = parts[1] if len(parts) == 2 else state["auth_flows"].get(str(chat_id)) - if not flow_id: + if len(parts) == 2: + flow = get_auth_flow(state, chat_id) + if flow: + flow["flow_id"] = parts[1] + flow = get_auth_flow(state, chat_id) + if not flow: api.send_message(chat_id, "Нет активного flow_id. Сначала вызови /auth.") return - result = post_json( - f"{config.server_url}/api/v1/auth/device/poll", - {"flow_id": flow_id}, - ) - if result.get("done"): - state["auth_flows"].pop(str(chat_id), None) - api.send_message(chat_id, "Qwen OAuth успешно настроен.") - else: + if not poll_auth_flow(api, config, state, chat_id, force=True): api.send_message(chat_id, "Авторизация ещё не завершена. Повторите команду через пару секунд.") return @@ -161,20 +314,11 @@ def handle_message(api: TelegramAPI, config: BotConfig, state: dict[str, Any], m return if not ensure_auth(api, config, state, chat_id): + enqueue_pending_message(state, chat_id, user_id, session_key, text) + api.send_message(chat_id, "Сообщение поставлено в очередь до завершения авторизации.") return - api.send_message(chat_id, "Обрабатываю запрос...") - result = post_json( - f"{config.server_url}/api/v1/chat", - { - "session_id": session_id, - "user_id": user_id, - "message": text, - }, - ) - state["sessions"][session_key] = result["session_id"] - answer = result.get("answer") or "Пустой ответ от модели." - send_text_chunks(api, chat_id, answer) + deliver_chat_message(api, config, state, chat_id, user_id, session_key, text) def main() -> None: @@ -184,6 +328,7 @@ def main() -> None: print("new-qwen bot polling started") while True: try: + process_auth_flows(api, config, state) updates = api.get_updates(state.get("offset"), config.poll_timeout) for update in updates: state["offset"] = update["update_id"] + 1 @@ -191,6 +336,7 @@ def main() -> None: if message: handle_message(api, config, state, message) save_state(state) + save_state(state) except Exception as exc: print(f"bot loop error: {exc}") time.sleep(3)