From 59792f5d5fca90a12d08495ee3e1569082e51ab9 Mon Sep 17 00:00:00 2001 From: mirivlad Date: Tue, 7 Apr 2026 16:38:21 +0800 Subject: [PATCH] Add client-server Qwen OAuth bot skeleton --- .gitignore | 5 ++ README.md | 80 +++++++++++++++++++ bot/.env.example | 4 + bot/app.py | 142 ++++++++++++++++++++++++++++++++++ bot/config.py | 37 +++++++++ bot/telegram_api.py | 37 +++++++++ serv/.env.example | 8 ++ serv/app.py | 166 +++++++++++++++++++++++++++++++++++++++ serv/config.py | 51 ++++++++++++ serv/llm.py | 104 +++++++++++++++++++++++++ serv/oauth.py | 184 ++++++++++++++++++++++++++++++++++++++++++++ serv/sessions.py | 43 +++++++++++ serv/tools.py | 153 ++++++++++++++++++++++++++++++++++++ 13 files changed, 1014 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 bot/.env.example create mode 100644 bot/app.py create mode 100644 bot/config.py create mode 100644 bot/telegram_api.py create mode 100644 serv/.env.example create mode 100644 serv/app.py create mode 100644 serv/config.py create mode 100644 serv/llm.py create mode 100644 serv/oauth.py create mode 100644 serv/sessions.py create mode 100644 serv/tools.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2009a16 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +__pycache__/ +*.pyc +.env +.new-qwen/ + diff --git a/README.md b/README.md new file mode 100644 index 0000000..47f8278 --- /dev/null +++ b/README.md @@ -0,0 +1,80 @@ +# new-qwen + +Клиент-серверная замена локального агента `qwen-code`. + +- `serv` отвечает за OAuth, сессии, работу с Qwen LLM и вызов инструментов. +- `bot` отвечает за Telegram и пересылку сообщений на сервер. + +Проект написан на Python stdlib, чтобы не зависеть от Node/npm в текущем окружении. + +## Архитектура + +```text +Telegram User + | + v +bot/app.py + | + v +serv/app.py + | + v +Qwen OAuth + OpenAI-compatible endpoint +``` + +## Что уже реализовано + +- Qwen OAuth Device Flow, совместимый с `qwen-code` +- хранение токенов в `~/.qwen/oauth_creds.json` +- HTTP API сервера +- агентный цикл с tool calling +- базовые инструменты: `list_files`, `read_file`, `write_file`, `exec_command` +- Telegram polling без внешних библиотек +- JSON-хранилище сессий + +## Ограничения текущей реализации + +- это упрощённая серверная архитектура, а не побайтный порт всего `qwen-code` +- пока нет MCP, skill system, subagents и rich-streaming UI +- Telegram-бот работает через long polling + +## Переменные окружения + +Сервер: + +```bash +cp serv/.env.example serv/.env +``` + +Бот: + +```bash +cp bot/.env.example bot/.env +``` + +## Запуск + +Сервер: + +```bash +python3 serv/app.py +``` + +Бот: + +```bash +python3 bot/app.py +``` + +## Авторизация Qwen OAuth + +1. Отправить боту `/auth` +2. Открыть ссылку подтверждения +3. После завершения авторизации писать обычные сообщения боту + +Либо можно вызвать API сервера напрямую: + +```bash +curl -X POST http://127.0.0.1:8080/api/v1/auth/device/start +``` + diff --git a/bot/.env.example b/bot/.env.example new file mode 100644 index 0000000..086e9bb --- /dev/null +++ b/bot/.env.example @@ -0,0 +1,4 @@ +NEW_QWEN_BOT_TOKEN=0000000000:telegram-token +NEW_QWEN_SERVER_URL=http://127.0.0.1:8080 +NEW_QWEN_POLL_TIMEOUT=30 + diff --git a/bot/app.py b/bot/app.py new file mode 100644 index 0000000..4ab9705 --- /dev/null +++ b/bot/app.py @@ -0,0 +1,142 @@ +from __future__ import annotations + +import json +import time +from pathlib import Path +from typing import Any +from urllib import request + +from config import BotConfig +from telegram_api import TelegramAPI + + +STATE_FILE = Path(__file__).resolve().parent.parent / ".new-qwen" / "telegram-state.json" + + +def load_state() -> dict[str, Any]: + if not STATE_FILE.exists(): + return {"offset": None, "sessions": {}} + return json.loads(STATE_FILE.read_text(encoding="utf-8")) + + +def save_state(state: dict[str, Any]) -> None: + STATE_FILE.parent.mkdir(parents=True, exist_ok=True) + STATE_FILE.write_text(json.dumps(state, ensure_ascii=False, indent=2), encoding="utf-8") + + +def post_json(url: str, payload: dict[str, Any]) -> dict[str, Any]: + data = json.dumps(payload).encode("utf-8") + req = request.Request( + url, + data=data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with request.urlopen(req, timeout=300) as response: + return json.loads(response.read().decode("utf-8")) + + +def get_json(url: str) -> dict[str, Any]: + with request.urlopen(url, timeout=60) as response: + return json.loads(response.read().decode("utf-8")) + + +def ensure_auth(api: TelegramAPI, config: BotConfig, chat_id: int) -> bool: + status = get_json(f"{config.server_url}/api/v1/auth/status") + if status.get("authenticated"): + return True + started = post_json(f"{config.server_url}/api/v1/auth/device/start", {}) + api.send_message( + chat_id, + "Qwen OAuth не настроен.\n" + f"Откройте ссылку:\n{started['verification_uri_complete']}\n\n" + f"Потом отправьте /auth_check {started['flow_id']}", + ) + return False + + +def handle_message(api: TelegramAPI, config: BotConfig, state: dict[str, Any], message: dict[str, Any]) -> None: + chat_id = message["chat"]["id"] + user_id = str(message.get("from", {}).get("id", chat_id)) + text = (message.get("text") or "").strip() + if not text: + api.send_message(chat_id, "Поддерживаются только текстовые сообщения.") + return + + session_key = f"{chat_id}:{user_id}" + session_id = state.setdefault("sessions", {}).get(session_key) + + if text == "/start": + api.send_message(chat_id, "new-qwen bot готов. Команды: /auth, /clear.") + return + + if text == "/auth": + started = post_json(f"{config.server_url}/api/v1/auth/device/start", {}) + api.send_message( + chat_id, + "Откройте ссылку для авторизации Qwen OAuth:\n" + f"{started['verification_uri_complete']}\n\n" + f"После подтверждения отправьте /auth_check {started['flow_id']}", + ) + return + + if text.startswith("/auth_check"): + parts = text.split(maxsplit=1) + if len(parts) != 2: + api.send_message(chat_id, "Использование: /auth_check ") + return + result = post_json( + f"{config.server_url}/api/v1/auth/device/poll", + {"flow_id": parts[1]}, + ) + if result.get("done"): + api.send_message(chat_id, "Qwen OAuth успешно настроен.") + else: + api.send_message(chat_id, "Авторизация ещё не завершена. Повторите команду через пару секунд.") + return + + if text == "/clear": + if session_id: + post_json(f"{config.server_url}/api/v1/session/clear", {"session_id": session_id}) + state["sessions"].pop(session_key, None) + api.send_message(chat_id, "Контекст сессии очищен.") + return + + if not ensure_auth(api, config, chat_id): + return + + api.send_message(chat_id, "Обрабатываю запрос...") + result = post_json( + f"{config.server_url}/api/v1/chat", + { + "session_id": session_id, + "user_id": user_id, + "message": text, + }, + ) + state["sessions"][session_key] = result["session_id"] + answer = result.get("answer") or "Пустой ответ от модели." + api.send_message(chat_id, answer[:4000]) + + +def main() -> None: + config = BotConfig.load() + api = TelegramAPI(config.token) + state = load_state() + print("new-qwen bot polling started") + while True: + try: + updates = api.get_updates(state.get("offset"), config.poll_timeout) + for update in updates: + state["offset"] = update["update_id"] + 1 + message = update.get("message") + if message: + handle_message(api, config, state, message) + save_state(state) + except Exception as exc: + print(f"bot loop error: {exc}") + time.sleep(3) + + +if __name__ == "__main__": + main() diff --git a/bot/config.py b/bot/config.py new file mode 100644 index 0000000..03bc483 --- /dev/null +++ b/bot/config.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +import os +from dataclasses import dataclass +from pathlib import Path + + +def load_env_file(path: Path) -> None: + if not path.exists(): + return + for raw_line in path.read_text(encoding="utf-8").splitlines(): + line = raw_line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, value = line.split("=", 1) + os.environ.setdefault(key.strip(), value.strip()) + + +@dataclass(slots=True) +class BotConfig: + token: str + server_url: str + poll_timeout: int + + @classmethod + def load(cls) -> "BotConfig": + base_dir = Path(__file__).resolve().parent + load_env_file(base_dir / ".env") + token = os.environ.get("NEW_QWEN_BOT_TOKEN", "").strip() + if not token: + raise RuntimeError("NEW_QWEN_BOT_TOKEN is required") + return cls( + token=token, + server_url=os.environ.get("NEW_QWEN_SERVER_URL", "http://127.0.0.1:8080").rstrip("/"), + poll_timeout=int(os.environ.get("NEW_QWEN_POLL_TIMEOUT", "30")), + ) + diff --git a/bot/telegram_api.py b/bot/telegram_api.py new file mode 100644 index 0000000..af9d3b2 --- /dev/null +++ b/bot/telegram_api.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +import json +from typing import Any +from urllib import parse, request + + +class TelegramAPI: + def __init__(self, token: str) -> None: + self.base_url = f"https://api.telegram.org/bot{token}" + + def _post(self, method: str, payload: dict[str, Any]) -> dict[str, Any]: + data = json.dumps(payload).encode("utf-8") + req = request.Request( + f"{self.base_url}/{method}", + data=data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with request.urlopen(req, timeout=90) as response: + return json.loads(response.read().decode("utf-8")) + + def _get(self, method: str, params: dict[str, Any]) -> dict[str, Any]: + query = parse.urlencode(params) + with request.urlopen(f"{self.base_url}/{method}?{query}", timeout=90) as response: + return json.loads(response.read().decode("utf-8")) + + def get_updates(self, offset: int | None, timeout: int) -> list[dict[str, Any]]: + params: dict[str, Any] = {"timeout": timeout} + if offset is not None: + params["offset"] = offset + response = self._get("getUpdates", params) + return response.get("result", []) + + def send_message(self, chat_id: int, text: str) -> None: + self._post("sendMessage", {"chat_id": chat_id, "text": text}) + diff --git a/serv/.env.example b/serv/.env.example new file mode 100644 index 0000000..42f1eb6 --- /dev/null +++ b/serv/.env.example @@ -0,0 +1,8 @@ +NEW_QWEN_HOST=127.0.0.1 +NEW_QWEN_PORT=8080 +NEW_QWEN_MODEL=qwen3.6-plus +NEW_QWEN_WORKSPACE_ROOT=/home/mirivlad/git +NEW_QWEN_SESSION_DIR=/home/mirivlad/git/new-qwen/.new-qwen/sessions +NEW_QWEN_SYSTEM_PROMPT= +NEW_QWEN_MAX_TOOL_ROUNDS=8 + diff --git a/serv/app.py b/serv/app.py new file mode 100644 index 0000000..aded473 --- /dev/null +++ b/serv/app.py @@ -0,0 +1,166 @@ +from __future__ import annotations + +import json +import threading +import time +import uuid +from http import HTTPStatus +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from typing import Any + +from config import ServerConfig +from llm import QwenAgent +from oauth import DeviceAuthState, OAuthError, QwenOAuthManager +from sessions import SessionStore +from tools import ToolRegistry + + +class AppState: + def __init__(self, config: ServerConfig) -> None: + self.config = config + self.oauth = QwenOAuthManager() + self.sessions = SessionStore(config.session_dir) + self.tools = ToolRegistry(config.workspace_root) + self.agent = QwenAgent(config, self.oauth, self.tools) + self.pending_device_flows: dict[str, DeviceAuthState] = {} + self.lock = threading.Lock() + + def auth_status(self) -> dict[str, Any]: + creds = self.oauth.load_credentials() + if not creds: + return {"authenticated": False} + return { + "authenticated": True, + "resource_url": creds.get("resource_url"), + "expires_at": creds.get("expiry_date"), + } + + +class RequestHandler(BaseHTTPRequestHandler): + server_version = "new-qwen-serv/0.1" + + def _json_body(self) -> dict[str, Any]: + length = int(self.headers.get("Content-Length", "0")) + if length <= 0: + return {} + raw = self.rfile.read(length).decode("utf-8") + return json.loads(raw) if raw else {} + + def _send(self, status: int, payload: dict[str, Any]) -> None: + body = json.dumps(payload, ensure_ascii=False).encode("utf-8") + self.send_response(status) + self.send_header("Content-Type", "application/json; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + @property + def app(self) -> AppState: + return self.server.app_state # type: ignore[attr-defined] + + def do_GET(self) -> None: + if self.path == "/health": + self._send(HTTPStatus.OK, {"ok": True, "time": int(time.time())}) + return + if self.path == "/api/v1/auth/status": + self._send(HTTPStatus.OK, self.app.auth_status()) + return + self._send(HTTPStatus.NOT_FOUND, {"error": "Not found"}) + + def do_POST(self) -> None: + try: + if self.path == "/api/v1/auth/device/start": + flow_id = uuid.uuid4().hex + state = self.app.oauth.start_device_flow(open_browser=False) + with self.app.lock: + self.app.pending_device_flows[flow_id] = state + self._send( + HTTPStatus.OK, + { + "flow_id": flow_id, + "user_code": state.user_code, + "verification_uri": state.verification_uri, + "verification_uri_complete": state.verification_uri_complete, + "expires_at": state.expires_at, + "interval_seconds": state.interval_seconds, + }, + ) + return + + if self.path == "/api/v1/auth/device/poll": + body = self._json_body() + flow_id = body["flow_id"] + with self.app.lock: + state = self.app.pending_device_flows.get(flow_id) + if not state: + self._send(HTTPStatus.NOT_FOUND, {"error": "Unknown flow_id"}) + return + creds = self.app.oauth.poll_device_flow(state) + if creds is None: + self._send(HTTPStatus.OK, {"done": False, "interval_seconds": state.interval_seconds}) + return + with self.app.lock: + self.app.pending_device_flows.pop(flow_id, None) + self._send(HTTPStatus.OK, {"done": True, "credentials": {"resource_url": creds.get("resource_url")}}) + return + + if self.path == "/api/v1/chat": + body = self._json_body() + session_id = body.get("session_id") or uuid.uuid4().hex + user_id = str(body.get("user_id") or "anonymous") + message = body["message"] + session = self.app.sessions.load(session_id) + history = session.get("messages", []) + result = self.app.agent.run(history, message) + persisted_messages = result["messages"][1:] + self.app.sessions.save( + session_id, + { + "session_id": session_id, + "user_id": user_id, + "updated_at": int(time.time()), + "messages": persisted_messages, + }, + ) + self._send( + HTTPStatus.OK, + { + "session_id": session_id, + "answer": result["answer"], + "events": result["events"], + "usage": result.get("usage"), + }, + ) + return + + if self.path == "/api/v1/session/clear": + body = self._json_body() + session_id = body["session_id"] + self.app.sessions.clear(session_id) + self._send(HTTPStatus.OK, {"ok": True, "session_id": session_id}) + return + + self._send(HTTPStatus.NOT_FOUND, {"error": "Not found"}) + except OAuthError as exc: + self._send(HTTPStatus.BAD_GATEWAY, {"error": str(exc)}) + except KeyError as exc: + self._send(HTTPStatus.BAD_REQUEST, {"error": f"Missing field: {exc}"}) + except Exception as exc: + self._send(HTTPStatus.INTERNAL_SERVER_ERROR, {"error": str(exc)}) + + def log_message(self, fmt: str, *args: Any) -> None: + return + + +def main() -> None: + config = ServerConfig.load() + config.session_dir.mkdir(parents=True, exist_ok=True) + httpd = ThreadingHTTPServer((config.host, config.port), RequestHandler) + httpd.app_state = AppState(config) # type: ignore[attr-defined] + print(f"new-qwen serv listening on http://{config.host}:{config.port}") + httpd.serve_forever() + + +if __name__ == "__main__": + main() + diff --git a/serv/config.py b/serv/config.py new file mode 100644 index 0000000..b3f08cd --- /dev/null +++ b/serv/config.py @@ -0,0 +1,51 @@ +from __future__ import annotations + +import os +from dataclasses import dataclass +from pathlib import Path + + +def load_env_file(path: Path) -> None: + if not path.exists(): + return + for raw_line in path.read_text(encoding="utf-8").splitlines(): + line = raw_line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, value = line.split("=", 1) + os.environ.setdefault(key.strip(), value.strip()) + + +@dataclass(slots=True) +class ServerConfig: + host: str + port: int + model: str + workspace_root: Path + session_dir: Path + system_prompt: str + max_tool_rounds: int + + @classmethod + def load(cls) -> "ServerConfig": + base_dir = Path(__file__).resolve().parent + load_env_file(base_dir / ".env") + workspace_root = Path( + os.environ.get("NEW_QWEN_WORKSPACE_ROOT", str(Path.cwd())) + ).expanduser() + session_dir = Path( + os.environ.get( + "NEW_QWEN_SESSION_DIR", + str(base_dir.parent / ".new-qwen" / "sessions"), + ) + ).expanduser() + return cls( + host=os.environ.get("NEW_QWEN_HOST", "127.0.0.1"), + port=int(os.environ.get("NEW_QWEN_PORT", "8080")), + model=os.environ.get("NEW_QWEN_MODEL", "qwen3.6-plus"), + workspace_root=workspace_root.resolve(), + session_dir=session_dir.resolve(), + system_prompt=os.environ.get("NEW_QWEN_SYSTEM_PROMPT", "").strip(), + max_tool_rounds=int(os.environ.get("NEW_QWEN_MAX_TOOL_ROUNDS", "8")), + ) + diff --git a/serv/llm.py b/serv/llm.py new file mode 100644 index 0000000..54219f5 --- /dev/null +++ b/serv/llm.py @@ -0,0 +1,104 @@ +from __future__ import annotations + +import json +from typing import Any +from urllib import request + +from config import ServerConfig +from oauth import OAuthError, QwenOAuthManager +from tools import ToolError, ToolRegistry + + +DEFAULT_SYSTEM_PROMPT = """You are new-qwen-serv, a server-side coding agent. +You help the user through a remote client. +Use tools when they are necessary. +Be concise, precise and action-oriented. +When you modify files, explain what changed. +Do not claim to have executed tools unless a tool result confirms it.""" + + +class QwenAgent: + def __init__(self, config: ServerConfig, oauth: QwenOAuthManager, tools: ToolRegistry) -> None: + self.config = config + self.oauth = oauth + self.tools = tools + + def _request_completion(self, messages: list[dict[str, Any]]) -> dict[str, Any]: + creds = self.oauth.get_valid_credentials() + base_url = self.oauth.get_openai_base_url(creds) + payload = { + "model": self.config.model, + "messages": messages, + "tools": self.tools.schemas(), + "tool_choice": "auto", + } + data = json.dumps(payload).encode("utf-8") + req = request.Request( + f"{base_url}/chat/completions", + data=data, + headers={ + "Content-Type": "application/json", + "Authorization": f"Bearer {creds['access_token']}", + }, + method="POST", + ) + with request.urlopen(req, timeout=180) as response: + return json.loads(response.read().decode("utf-8")) + + def run(self, history: list[dict[str, Any]], user_message: str) -> dict[str, Any]: + system_prompt = self.config.system_prompt or DEFAULT_SYSTEM_PROMPT + messages: list[dict[str, Any]] = [{"role": "system", "content": system_prompt}] + messages.extend(history) + messages.append({"role": "user", "content": user_message}) + events: list[dict[str, Any]] = [] + + for _ in range(self.config.max_tool_rounds): + response = self._request_completion(messages) + choice = response["choices"][0]["message"] + tool_calls = choice.get("tool_calls") or [] + content = choice.get("content") + if content: + events.append({"type": "assistant", "content": content}) + + if not tool_calls: + return { + "answer": content or "", + "events": events, + "usage": response.get("usage"), + "messages": messages + [{"role": "assistant", "content": content or ""}], + } + + messages.append( + { + "role": "assistant", + "content": content or "", + "tool_calls": tool_calls, + } + ) + + for call in tool_calls: + tool_name = call["function"]["name"] + try: + arguments = json.loads(call["function"]["arguments"] or "{}") + except json.JSONDecodeError: + arguments = {} + + events.append({"type": "tool_call", "name": tool_name, "arguments": arguments}) + try: + result = self.tools.execute(tool_name, arguments) + except ToolError as exc: + result = {"error": str(exc)} + except Exception as exc: + result = {"error": f"Unexpected tool failure: {exc}"} + + events.append({"type": "tool_result", "name": tool_name, "result": result}) + messages.append( + { + "role": "tool", + "tool_call_id": call["id"], + "content": self.tools.encode_result(result), + } + ) + + raise OAuthError("Max tool rounds exceeded") + diff --git a/serv/oauth.py b/serv/oauth.py new file mode 100644 index 0000000..fee0eb0 --- /dev/null +++ b/serv/oauth.py @@ -0,0 +1,184 @@ +from __future__ import annotations + +import base64 +import hashlib +import json +import secrets +import time +import uuid +import webbrowser +from dataclasses import dataclass +from pathlib import Path +from typing import Any +from urllib import error, parse, request + + +QWEN_OAUTH_BASE_URL = "https://chat.qwen.ai" +QWEN_DEVICE_CODE_ENDPOINT = f"{QWEN_OAUTH_BASE_URL}/api/v1/oauth2/device/code" +QWEN_TOKEN_ENDPOINT = f"{QWEN_OAUTH_BASE_URL}/api/v1/oauth2/token" +QWEN_CLIENT_ID = "f0304373b74a44d2b584a3fb70ca9e56" +QWEN_SCOPE = "openid profile email model.completion" +QWEN_DEVICE_GRANT = "urn:ietf:params:oauth:grant-type:device_code" + + +class OAuthError(RuntimeError): + pass + + +@dataclass(slots=True) +class DeviceAuthState: + device_code: str + code_verifier: str + user_code: str + verification_uri: str + verification_uri_complete: str + expires_at: float + interval_seconds: float + + +class QwenOAuthManager: + def __init__(self, creds_path: Path | None = None) -> None: + self.creds_path = creds_path or Path.home() / ".qwen" / "oauth_creds.json" + self.creds_path.parent.mkdir(parents=True, exist_ok=True) + + def _post_form(self, url: str, payload: dict[str, str]) -> dict[str, Any]: + data = parse.urlencode(payload).encode("utf-8") + req = request.Request( + url, + data=data, + headers={ + "Content-Type": "application/x-www-form-urlencoded", + "Accept": "application/json", + "x-request-id": str(uuid.uuid4()), + }, + method="POST", + ) + try: + with request.urlopen(req, timeout=60) as response: + return json.loads(response.read().decode("utf-8")) + except error.HTTPError as exc: + body = exc.read().decode("utf-8", errors="replace") + try: + payload = json.loads(body) + except json.JSONDecodeError: + raise OAuthError(f"HTTP {exc.code}: {body}") from exc + message = payload.get("error_description") or payload.get("error") or body + raise OAuthError(message) from exc + + def load_credentials(self) -> dict[str, Any] | None: + if not self.creds_path.exists(): + return None + return json.loads(self.creds_path.read_text(encoding="utf-8")) + + def save_credentials(self, payload: dict[str, Any]) -> None: + self.creds_path.write_text( + json.dumps(payload, ensure_ascii=True, indent=2), + encoding="utf-8", + ) + + def clear_credentials(self) -> None: + if self.creds_path.exists(): + self.creds_path.unlink() + + def start_device_flow(self, open_browser: bool = False) -> DeviceAuthState: + code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).decode("ascii").rstrip("=") + code_challenge = ( + base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode("utf-8")).digest()) + .decode("ascii") + .rstrip("=") + ) + response = self._post_form( + QWEN_DEVICE_CODE_ENDPOINT, + { + "client_id": QWEN_CLIENT_ID, + "scope": QWEN_SCOPE, + "code_challenge": code_challenge, + "code_challenge_method": "S256", + }, + ) + state = DeviceAuthState( + device_code=response["device_code"], + code_verifier=code_verifier, + user_code=response["user_code"], + verification_uri=response["verification_uri"], + verification_uri_complete=response["verification_uri_complete"], + expires_at=time.time() + float(response.get("expires_in", 600)), + interval_seconds=2.0, + ) + if open_browser: + try: + webbrowser.open(state.verification_uri_complete) + except Exception: + pass + return state + + def poll_device_flow(self, state: DeviceAuthState) -> dict[str, Any] | None: + if time.time() >= state.expires_at: + raise OAuthError("Device authorization expired") + try: + response = self._post_form( + QWEN_TOKEN_ENDPOINT, + { + "grant_type": QWEN_DEVICE_GRANT, + "client_id": QWEN_CLIENT_ID, + "device_code": state.device_code, + "code_verifier": state.code_verifier, + }, + ) + except OAuthError as exc: + text = str(exc) + if "authorization_pending" in text: + return None + if "slow_down" in text: + state.interval_seconds = min(state.interval_seconds * 1.5, 10.0) + return None + raise + + creds = { + "access_token": response["access_token"], + "refresh_token": response.get("refresh_token"), + "token_type": response.get("token_type", "Bearer"), + "resource_url": response.get("resource_url"), + "expiry_date": int(time.time() * 1000) + int(response.get("expires_in", 3600)) * 1000, + } + self.save_credentials(creds) + return creds + + def refresh_credentials(self, creds: dict[str, Any]) -> dict[str, Any]: + refresh_token = creds.get("refresh_token") + if not refresh_token: + raise OAuthError("No refresh token available") + response = self._post_form( + QWEN_TOKEN_ENDPOINT, + { + "grant_type": "refresh_token", + "refresh_token": refresh_token, + "client_id": QWEN_CLIENT_ID, + }, + ) + updated = { + "access_token": response["access_token"], + "refresh_token": response.get("refresh_token") or refresh_token, + "token_type": response.get("token_type", "Bearer"), + "resource_url": response.get("resource_url") or creds.get("resource_url"), + "expiry_date": int(time.time() * 1000) + int(response.get("expires_in", 3600)) * 1000, + } + self.save_credentials(updated) + return updated + + def get_valid_credentials(self) -> dict[str, Any]: + creds = self.load_credentials() + if not creds: + raise OAuthError("Qwen OAuth is not configured. Start device flow first.") + expiry = int(creds.get("expiry_date", 0)) + if expiry and expiry - int(time.time() * 1000) > 30_000: + return creds + return self.refresh_credentials(creds) + + def get_openai_base_url(self, creds: dict[str, Any]) -> str: + resource_url = creds.get("resource_url") or "https://dashscope.aliyuncs.com/compatible-mode" + if not resource_url.startswith("http"): + resource_url = f"https://{resource_url}" + if resource_url.endswith("/v1"): + return resource_url + return resource_url.rstrip("/") + "/v1" diff --git a/serv/sessions.py b/serv/sessions.py new file mode 100644 index 0000000..e758ed6 --- /dev/null +++ b/serv/sessions.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +import json +import threading +from pathlib import Path +from typing import Any + + +class SessionStore: + def __init__(self, base_dir: Path) -> None: + self.base_dir = base_dir + self.base_dir.mkdir(parents=True, exist_ok=True) + self._lock = threading.Lock() + + def _path(self, session_id: str) -> Path: + return self.base_dir / f"{session_id}.json" + + def load(self, session_id: str) -> dict[str, Any]: + path = self._path(session_id) + if not path.exists(): + return {"session_id": session_id, "messages": []} + return json.loads(path.read_text(encoding="utf-8")) + + def save(self, session_id: str, payload: dict[str, Any]) -> None: + with self._lock: + self._path(session_id).write_text( + json.dumps(payload, ensure_ascii=False, indent=2), + encoding="utf-8", + ) + + def append_message(self, session_id: str, message: dict[str, Any]) -> dict[str, Any]: + with self._lock: + payload = self.load(session_id) + payload.setdefault("messages", []).append(message) + self.save(session_id, payload) + return payload + + def clear(self, session_id: str) -> None: + with self._lock: + path = self._path(session_id) + if path.exists(): + path.unlink() + diff --git a/serv/tools.py b/serv/tools.py new file mode 100644 index 0000000..039f453 --- /dev/null +++ b/serv/tools.py @@ -0,0 +1,153 @@ +from __future__ import annotations + +import json +import subprocess +from pathlib import Path +from typing import Any, Callable + + +class ToolError(RuntimeError): + pass + + +class ToolRegistry: + def __init__(self, workspace_root: Path) -> None: + self.workspace_root = workspace_root.resolve() + self._handlers: dict[str, Callable[[dict[str, Any]], dict[str, Any]]] = { + "list_files": self.list_files, + "read_file": self.read_file, + "write_file": self.write_file, + "exec_command": self.exec_command, + } + + def schemas(self) -> list[dict[str, Any]]: + return [ + { + "type": "function", + "function": { + "name": "list_files", + "description": "List files in a directory inside the workspace.", + "parameters": { + "type": "object", + "properties": { + "path": {"type": "string"}, + }, + "required": ["path"], + }, + }, + }, + { + "type": "function", + "function": { + "name": "read_file", + "description": "Read a UTF-8 text file from the workspace.", + "parameters": { + "type": "object", + "properties": { + "path": {"type": "string"}, + }, + "required": ["path"], + }, + }, + }, + { + "type": "function", + "function": { + "name": "write_file", + "description": "Write UTF-8 text into a file inside the workspace.", + "parameters": { + "type": "object", + "properties": { + "path": {"type": "string"}, + "content": {"type": "string"}, + }, + "required": ["path", "content"], + }, + }, + }, + { + "type": "function", + "function": { + "name": "exec_command", + "description": "Run a shell command inside the workspace and return stdout, stderr and exit code.", + "parameters": { + "type": "object", + "properties": { + "command": {"type": "string"}, + "cwd": {"type": "string"}, + }, + "required": ["command"], + }, + }, + }, + ] + + def _resolve(self, raw_path: str) -> Path: + candidate = Path(raw_path) + if not candidate.is_absolute(): + candidate = self.workspace_root / candidate + resolved = candidate.resolve() + if self.workspace_root not in resolved.parents and resolved != self.workspace_root: + raise ToolError("Path escapes workspace root") + return resolved + + def execute(self, name: str, arguments: dict[str, Any]) -> dict[str, Any]: + handler = self._handlers.get(name) + if not handler: + raise ToolError(f"Unknown tool: {name}") + return handler(arguments) + + def list_files(self, arguments: dict[str, Any]) -> dict[str, Any]: + target = self._resolve(arguments["path"]) + if not target.exists(): + raise ToolError("Directory does not exist") + if not target.is_dir(): + raise ToolError("Path is not a directory") + items = [] + for item in sorted(target.iterdir(), key=lambda value: value.name): + items.append( + { + "name": item.name, + "path": str(item.relative_to(self.workspace_root)), + "type": "dir" if item.is_dir() else "file", + } + ) + return {"items": items} + + def read_file(self, arguments: dict[str, Any]) -> dict[str, Any]: + target = self._resolve(arguments["path"]) + if not target.exists(): + raise ToolError("File does not exist") + if not target.is_file(): + raise ToolError("Path is not a file") + return {"path": str(target.relative_to(self.workspace_root)), "content": target.read_text(encoding="utf-8")} + + def write_file(self, arguments: dict[str, Any]) -> dict[str, Any]: + target = self._resolve(arguments["path"]) + target.parent.mkdir(parents=True, exist_ok=True) + target.write_text(arguments["content"], encoding="utf-8") + return {"path": str(target.relative_to(self.workspace_root)), "bytes_written": len(arguments["content"].encode("utf-8"))} + + def exec_command(self, arguments: dict[str, Any]) -> dict[str, Any]: + cwd = self._resolve(arguments.get("cwd", ".")) + command = arguments["command"] + completed = subprocess.run( + command, + cwd=str(cwd), + shell=True, + capture_output=True, + text=True, + timeout=120, + ) + return { + "command": command, + "cwd": str(cwd.relative_to(self.workspace_root)), + "returncode": completed.returncode, + "stdout": completed.stdout[-12000:], + "stderr": completed.stderr[-12000:], + } + + @staticmethod + def encode_result(result: dict[str, Any]) -> str: + return json.dumps(result, ensure_ascii=False) +