new-qwen/bot/app.py

1074 lines
40 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import html
import json
import re
import time
from pathlib import Path
from typing import Any
from urllib import error, request
from config import BotConfig
from telegram_api import TelegramAPI
STATE_FILE = Path(__file__).resolve().parent.parent / ".new-qwen" / "telegram-state.json"
TYPING_INTERVAL_SECONDS = 4.0
BOT_COMMANDS = [
{"command": "help", "description": "Список команд"},
{"command": "status", "description": "Статус сервера и чата"},
{"command": "auth", "description": "Запустить Qwen OAuth"},
{"command": "provider", "description": "Выбрать провайдера"},
{"command": "model", "description": "Выбрать модель"},
{"command": "session", "description": "Показать сессию"},
{"command": "clear", "description": "Очистить контекст"},
{"command": "cancel", "description": "Отменить активный job"},
]
def load_state() -> dict[str, Any]:
if not STATE_FILE.exists():
return {
"offset": None,
"sessions": {},
"auth_flows": {},
"active_jobs": {},
"chat_active_jobs": {},
"chat_queues": {},
"pending_approvals": {},
"chat_preferences": {},
}
state = json.loads(STATE_FILE.read_text(encoding="utf-8"))
state.setdefault("sessions", {})
state.setdefault("auth_flows", {})
state.setdefault("active_jobs", {})
state.setdefault("chat_active_jobs", {})
state.setdefault("chat_queues", {})
state.setdefault("pending_approvals", {})
state.setdefault("chat_preferences", {})
return state
def save_state(state: dict[str, Any]) -> None:
STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
STATE_FILE.write_text(json.dumps(state, ensure_ascii=False, indent=2), encoding="utf-8")
def post_json(url: str, payload: dict[str, Any]) -> dict[str, Any]:
data = json.dumps(payload).encode("utf-8")
req = request.Request(
url,
data=data,
headers={"Content-Type": "application/json"},
method="POST",
)
with request.urlopen(req, timeout=300) as response:
return json.loads(response.read().decode("utf-8"))
def get_json(url: str) -> dict[str, Any]:
with request.urlopen(url, timeout=60) as response:
return json.loads(response.read().decode("utf-8"))
def send_text_chunks(api: TelegramAPI, chat_id: int, text: str) -> None:
normalized = text or "Пустой ответ."
chunk_size = 3800
for start in range(0, len(normalized), chunk_size):
api.send_message(chat_id, normalized[start : start + chunk_size])
def ensure_bot_commands(api: TelegramAPI, state: dict[str, Any]) -> None:
api.set_my_commands(BOT_COMMANDS)
def get_provider_catalog(config: BotConfig) -> dict[str, Any]:
return get_json(f"{config.server_url}/api/v1/provider-catalog")
def get_chat_preferences(state: dict[str, Any], chat_id: int) -> dict[str, Any]:
prefs = state.setdefault("chat_preferences", {}).setdefault(str(chat_id), {})
prefs.setdefault("provider", None)
prefs.setdefault("model", None)
return prefs
def get_selected_provider_and_model(
state: dict[str, Any],
chat_id: int,
catalog: dict[str, Any],
) -> tuple[str | None, str | None]:
prefs = get_chat_preferences(state, chat_id)
provider = prefs.get("provider") or catalog.get("default_provider")
model = prefs.get("model")
providers = {item.get("name"): item for item in catalog.get("providers", [])}
provider_info = providers.get(provider or "")
available_models = provider_info.get("models", []) if provider_info else []
if not model and available_models:
model = available_models[0].get("id")
return provider, model
def format_status_text(job_state: dict[str, Any], text: str) -> str:
del job_state
return (text or "Пустой ответ.")[:4000]
def format_final_signature(job_state: dict[str, Any]) -> str:
model = job_state.get("model") if isinstance(job_state.get("model"), str) else None
if not model:
return ""
return f"\n\n<i>{html.escape(model)}</i>"
def render_markdownish_html(text: str) -> str:
normalized = (text or "Пустой ответ.").replace("\r\n", "\n")
fence_pattern = re.compile(r"```(?:[^\n`]*)\n(.*?)```", re.DOTALL)
placeholders: list[str] = []
def replace_fence(match: re.Match[str]) -> str:
placeholders.append(f"<pre>{html.escape(match.group(1).strip())}</pre>")
return f"@@CODEBLOCK_{len(placeholders) - 1}@@"
normalized = fence_pattern.sub(replace_fence, normalized)
escaped = html.escape(normalized)
escaped = re.sub(r"(?<!\*)\*\*(.+?)\*\*(?!\*)", r"<b>\1</b>", escaped)
escaped = re.sub(r"`([^`\n]+)`", r"<code>\1</code>", escaped)
escaped = re.sub(r"(?m)^\*\s+", "", escaped)
escaped = re.sub(r"(?m)^-\s+", "", escaped)
for index, block in enumerate(placeholders):
escaped = escaped.replace(f"@@CODEBLOCK_{index}@@", block)
return escaped
def update_status_message(
api: TelegramAPI,
job_state: dict[str, Any],
text: str,
) -> None:
normalized = format_status_text(job_state, text)
chat_id = int(job_state["chat_id"])
message_id = int(job_state.get("status_message_id") or 0)
if message_id:
api.edit_message_text(chat_id, message_id, normalized)
else:
message_id = api.send_message(chat_id, normalized)
job_state["status_message_id"] = message_id
job_state["status_message_text"] = normalized
job_state["status_body_text"] = text or "Пустой ответ."
def set_final_message(
api: TelegramAPI,
job_state: dict[str, Any],
text: str,
) -> None:
normalized = text or "Пустой ответ."
chunk_size = 3800
signature = format_final_signature(job_state)
first_chunk = normalized[:chunk_size]
rendered = render_markdownish_html(first_chunk) + signature
chat_id = int(job_state["chat_id"])
message_id = int(job_state.get("status_message_id") or 0)
if message_id:
api.edit_message_text(chat_id, message_id, rendered, parse_mode="HTML")
else:
job_state["status_message_id"] = api.send_message(
chat_id,
rendered,
parse_mode="HTML",
)
job_state["status_message_text"] = first_chunk
for start in range(chunk_size, len(normalized), chunk_size):
api.send_message(
int(job_state["chat_id"]),
render_markdownish_html(normalized[start : start + chunk_size]),
parse_mode="HTML",
)
def keep_job_typing(
api: TelegramAPI,
job_state: dict[str, Any],
) -> None:
now = time.time()
last_typing_at = float(job_state.get("last_typing_at") or 0.0)
if now - last_typing_at < TYPING_INTERVAL_SECONDS:
return
api.send_chat_action(int(job_state["chat_id"]), "typing")
job_state["last_typing_at"] = now
def summarize_event(event: dict[str, Any]) -> str | None:
event_type = event.get("type")
if event_type == "job_status":
message = str(event.get("message") or "").strip()
if message == "Запрос принят сервером":
return "Смотрю, что можно сделать."
if message == "Ответ готов":
return "Формулирую ответ."
return message or None
if event_type == "model_request":
return "Думаю над ответом."
if event_type == "tool_call":
tool_name = str(event.get("name") or "")
if tool_name in {"read_file", "list_files", "glob_search", "grep_text", "stat_path"}:
return "Просматриваю файлы и контекст."
if tool_name in {"git_status", "git_diff"}:
return "Проверяю изменения в проекте."
if tool_name in {"exec_command"}:
return "Проверяю окружение и команды."
if tool_name in {"web_search"}:
return "Ищу нужную информацию."
return "Проверяю детали."
if event_type == "tool_result":
result = event.get("result", {})
if isinstance(result, dict) and "error" in result:
return "Наткнулся на проблему, перепроверяю."
return "Собрал нужные данные."
if event_type == "error":
return f"Ошибка: {event.get('message')}"
if event_type == "approval_result":
status = event.get("status")
tool_name = event.get("tool_name")
if status == "approved":
return f"Получил подтверждение для {tool_name}."
return f"Подтверждение для {tool_name} отклонено."
return None
def format_approval_keyboard(approval_id: str) -> dict[str, Any]:
return {
"inline_keyboard": [
[
{"text": "Approve", "callback_data": f"approve:{approval_id}"},
{"text": "Reject", "callback_data": f"reject:{approval_id}"},
]
]
}
def format_provider_keyboard(catalog: dict[str, Any]) -> dict[str, Any]:
rows: list[list[dict[str, str]]] = []
row: list[dict[str, str]] = []
for item in catalog.get("providers", []):
name = str(item.get("name") or "")
if not name:
continue
row.append({"text": name, "callback_data": f"provider:{name}"})
if len(row) == 2:
rows.append(row)
row = []
if row:
rows.append(row)
return {"inline_keyboard": rows}
def format_model_keyboard(provider: str, models: list[dict[str, str]]) -> dict[str, Any]:
rows = [
[{"text": str(item.get("name") or item.get("id") or "-"), "callback_data": f"model:{provider}:{item.get('id')}"}]
for item in models
if item.get("id")
]
return {"inline_keyboard": rows}
def set_chat_provider(
state: dict[str, Any],
chat_id: int,
provider: str | None,
*,
reset_model: bool = True,
) -> dict[str, Any]:
prefs = get_chat_preferences(state, chat_id)
prefs["provider"] = provider
if reset_model:
prefs["model"] = None
return prefs
def set_chat_model(state: dict[str, Any], chat_id: int, model: str | None) -> dict[str, Any]:
prefs = get_chat_preferences(state, chat_id)
prefs["model"] = model
return prefs
def format_provider_selection(
catalog: dict[str, Any],
selected_provider: str | None,
) -> str:
lines = ["Выбери провайдера."]
for item in catalog.get("providers", []):
name = str(item.get("name") or "")
if not name:
continue
marker = "" if name == selected_provider else " "
availability = "ready" if item.get("available") else f"unavailable: {item.get('reason') or 'unknown'}"
lines.append(f"{marker} {name} ({availability})")
return "\n".join(lines)
def format_model_selection(
provider: str,
models: list[dict[str, str]],
selected_model: str | None,
) -> str:
lines = [f"Выбери модель для {provider}."]
for item in models:
model_id = str(item.get("id") or "")
if not model_id:
continue
marker = "" if model_id == selected_model else " "
description = str(item.get("description") or "").strip()
suffix = f" ({description})" if description else ""
lines.append(f"{marker} {model_id}{suffix}")
return "\n".join(lines)
def format_approval_request(event: dict[str, Any]) -> str:
return (
"Нужно подтверждение инструмента.\n"
f"approval_id: {event.get('approval_id')}\n"
f"tool: {event.get('tool_name')}\n"
f"args: {json.dumps(event.get('arguments', {}), ensure_ascii=False)}"
)
def get_auth_flow(state: dict[str, Any], chat_id: int) -> dict[str, Any] | None:
return state.setdefault("auth_flows", {}).get(str(chat_id))
def start_auth_flow(
api: TelegramAPI,
config: BotConfig,
state: dict[str, Any],
chat_id: int,
*,
force_new: bool = False,
) -> dict[str, Any]:
existing = get_auth_flow(state, chat_id)
now = time.time()
if existing and not force_new and existing.get("expires_at", 0) > now:
send_text_chunks(
api,
chat_id,
"Авторизация Qwen OAuth ещё не завершена.\n"
f"Открой ссылку:\n{existing['verification_uri_complete']}\n\n"
"Бот сам проверит завершение и продолжит работу.",
)
return existing
started = post_json(f"{config.server_url}/api/v1/auth/device/start", {})
flow = {
"flow_id": started["flow_id"],
"user_code": started.get("user_code"),
"verification_uri": started.get("verification_uri"),
"verification_uri_complete": started["verification_uri_complete"],
"expires_at": started["expires_at"],
"interval_seconds": started.get("interval_seconds", 2),
"next_poll_at": now + started.get("interval_seconds", 2),
"pending_messages": existing.get("pending_messages", []) if existing else [],
}
state.setdefault("auth_flows", {})[str(chat_id)] = flow
send_text_chunks(
api,
chat_id,
"Нужна авторизация Qwen OAuth.\n"
f"Открой ссылку:\n{flow['verification_uri_complete']}\n\n"
f"user_code: {flow.get('user_code')}\n"
"После подтверждения бот сам продолжит работу.",
)
return flow
def ensure_auth(api: TelegramAPI, config: BotConfig, state: dict[str, Any], chat_id: int) -> bool:
status = get_json(f"{config.server_url}/api/v1/auth/status")
if status.get("ready") or status.get("available_providers"):
return True
default_provider = status.get("default_provider")
fallback_providers = status.get("fallback_providers") or []
if default_provider != "qwen" and "qwen" not in fallback_providers:
api.send_message(
chat_id,
"На сервере нет доступных model provider-ов. "
f"Текущий default_provider: {default_provider}. "
"Для GigaChat/YandexGPT нужно настроить серверные credentials.",
)
return False
start_auth_flow(api, config, state, chat_id)
return False
def enqueue_pending_message(
state: dict[str, Any],
chat_id: int,
user_id: str,
session_key: str,
text: str,
) -> None:
flow = get_auth_flow(state, chat_id)
if not flow:
return
pending_messages = flow.setdefault("pending_messages", [])
pending_messages.append(
{
"user_id": user_id,
"session_key": session_key,
"text": text,
"created_at": int(time.time()),
}
)
def start_chat_job(
api: TelegramAPI,
config: BotConfig,
state: dict[str, Any],
chat_id: int,
user_id: str,
session_key: str,
text: str,
*,
delayed: bool = False,
) -> None:
session_id = state.setdefault("sessions", {}).get(session_key)
catalog = get_provider_catalog(config)
provider, model = get_selected_provider_and_model(state, chat_id, catalog)
prefix = "Возвращаюсь к вашему сообщению." if delayed else "Смотрю, что можно сделать."
status_message_id = api.send_message(chat_id, prefix)
start_result = post_json(
f"{config.server_url}/api/v1/chat/start",
{
"session_id": session_id,
"user_id": user_id,
"message": text,
"provider": provider,
"model": model,
},
)
state["sessions"][session_key] = start_result["session_id"]
state.setdefault("active_jobs", {})[start_result["job_id"]] = {
"job_id": start_result["job_id"],
"chat_id": chat_id,
"user_id": user_id,
"session_key": session_key,
"session_id": start_result["session_id"],
"seen_seq": 0,
"status_message_id": status_message_id,
"status_message_text": prefix,
"status_body_text": prefix,
"requested_provider": provider,
"requested_model": model,
"provider": start_result.get("provider") or provider,
"model": start_result.get("model") or model,
"fallback_notified": False,
"last_typing_at": 0.0,
}
state.setdefault("chat_active_jobs", {})[str(chat_id)] = start_result["job_id"]
def enqueue_chat_message(
state: dict[str, Any],
chat_id: int,
user_id: str,
session_key: str,
text: str,
*,
delayed: bool = False,
) -> int:
queue = state.setdefault("chat_queues", {}).setdefault(str(chat_id), [])
queue.append(
{
"user_id": user_id,
"session_key": session_key,
"text": text,
"delayed": delayed,
"created_at": int(time.time()),
}
)
return len(queue)
def start_or_queue_chat_job(
api: TelegramAPI,
config: BotConfig,
state: dict[str, Any],
chat_id: int,
user_id: str,
session_key: str,
text: str,
*,
delayed: bool = False,
) -> None:
active_job_id = state.setdefault("chat_active_jobs", {}).get(str(chat_id))
if active_job_id:
queue_size = enqueue_chat_message(
state,
chat_id,
user_id,
session_key,
text,
delayed=delayed,
)
api.send_message(
chat_id,
f"В этом чате уже есть активный запрос. Сообщение поставлено в очередь: {queue_size}.",
)
return
start_chat_job(
api,
config,
state,
chat_id,
user_id,
session_key,
text,
delayed=delayed,
)
def start_next_queued_job(
api: TelegramAPI,
config: BotConfig,
state: dict[str, Any],
chat_id: int,
) -> None:
if state.setdefault("chat_active_jobs", {}).get(str(chat_id)):
return
queue = state.setdefault("chat_queues", {}).get(str(chat_id)) or []
if not queue:
return
next_item = queue.pop(0)
if not queue:
state["chat_queues"].pop(str(chat_id), None)
start_chat_job(
api,
config,
state,
chat_id,
next_item["user_id"],
next_item["session_key"],
next_item["text"],
delayed=bool(next_item.get("delayed")),
)
def poll_auth_flow(
api: TelegramAPI,
config: BotConfig,
state: dict[str, Any],
chat_id: int,
*,
force: bool = False,
) -> bool:
flow = get_auth_flow(state, chat_id)
if not flow:
return False
now = time.time()
if flow.get("expires_at", 0) <= now:
state["auth_flows"].pop(str(chat_id), None)
api.send_message(chat_id, "OAuth flow истёк. Запусти /auth ещё раз.")
return False
if not force and now < flow.get("next_poll_at", 0):
return False
try:
result = post_json(
f"{config.server_url}/api/v1/auth/device/poll",
{"flow_id": flow["flow_id"]},
)
except error.HTTPError as exc:
body = exc.read().decode("utf-8", errors="replace")
state["auth_flows"].pop(str(chat_id), None)
send_text_chunks(
api,
chat_id,
"Не удалось завершить OAuth flow на сервере.\n"
f"Ответ сервера: {body}\n"
"Запусти /auth ещё раз.",
)
return False
if not result.get("done"):
interval = result.get("interval_seconds", flow.get("interval_seconds", 2))
flow["interval_seconds"] = interval
flow["next_poll_at"] = now + interval
return False
state["auth_flows"].pop(str(chat_id), None)
api.send_message(chat_id, "Qwen OAuth успешно настроен.")
for item in flow.get("pending_messages", []):
start_or_queue_chat_job(
api,
config,
state,
chat_id,
item["user_id"],
item["session_key"],
item["text"],
delayed=True,
)
if not flow.get("pending_messages"):
api.send_message(chat_id, "Можно отправлять обычные сообщения.")
return True
def process_auth_flows(
api: TelegramAPI,
config: BotConfig,
state: dict[str, Any],
) -> None:
for chat_id_raw in list(state.setdefault("auth_flows", {}).keys()):
try:
poll_auth_flow(api, config, state, int(chat_id_raw), force=False)
except Exception as exc:
print(f"auth flow poll error for chat {chat_id_raw}: {exc}")
def process_active_jobs(
api: TelegramAPI,
config: BotConfig,
state: dict[str, Any],
) -> None:
active_jobs = state.setdefault("active_jobs", {})
pending_approvals = state.setdefault("pending_approvals", {})
for job_id in list(active_jobs.keys()):
job_state = active_jobs[job_id]
keep_job_typing(api, job_state)
poll_result = post_json(
f"{config.server_url}/api/v1/chat/poll",
{"job_id": job_id, "since_seq": job_state.get("seen_seq", 0)},
)
for event in poll_result.get("events", []):
seq = int(event.get("seq", 0))
job_state["seen_seq"] = max(job_state.get("seen_seq", 0), seq)
if event.get("type") == "approval_request":
pending_approvals[str(job_state["chat_id"])] = {
"approval_id": event["approval_id"],
"job_id": job_id,
}
api.send_message(
int(job_state["chat_id"]),
format_approval_request(event),
reply_markup=format_approval_keyboard(str(event["approval_id"])),
)
continue
if event.get("type") == "model_request":
selection_reason = str(event.get("selection_reason") or "")
if (
"fell back to" in selection_reason
and not job_state.get("fallback_notified")
):
requested_provider = job_state.get("requested_provider") or "unknown"
api.send_message(
int(job_state["chat_id"]),
"Выбранный провайдер не ответил, поэтому продолжаю через запасной вариант.\n"
f"Изначально был выбран: {requested_provider}",
)
job_state["fallback_notified"] = True
if event.get("provider"):
job_state["provider"] = event.get("provider")
if event.get("model"):
job_state["model"] = event.get("model")
summary = summarize_event(event)
if summary and summary != job_state.get("status_body_text"):
update_status_message(api, job_state, summary)
status = poll_result.get("status")
if status == "completed":
state["sessions"][job_state["session_key"]] = poll_result["session_id"]
set_final_message(
api,
job_state,
poll_result.get("answer") or "Пустой ответ от модели.",
)
active_jobs.pop(job_id, None)
state.setdefault("chat_active_jobs", {}).pop(str(job_state["chat_id"]), None)
pending = pending_approvals.get(str(job_state["chat_id"]))
if pending and pending.get("job_id") == job_id:
pending_approvals.pop(str(job_state["chat_id"]), None)
start_next_queued_job(api, config, state, int(job_state["chat_id"]))
elif status == "failed":
set_final_message(
api,
job_state,
f"Job завершился с ошибкой: {poll_result.get('error')}",
)
active_jobs.pop(job_id, None)
state.setdefault("chat_active_jobs", {}).pop(str(job_state["chat_id"]), None)
pending = pending_approvals.get(str(job_state["chat_id"]))
if pending and pending.get("job_id") == job_id:
pending_approvals.pop(str(job_state["chat_id"]), None)
start_next_queued_job(api, config, state, int(job_state["chat_id"]))
elif status == "canceled":
set_final_message(
api,
job_state,
f"Job отменён: {poll_result.get('error') or 'Canceled by operator'}",
)
active_jobs.pop(job_id, None)
state.setdefault("chat_active_jobs", {}).pop(str(job_state["chat_id"]), None)
pending = pending_approvals.get(str(job_state["chat_id"]))
if pending and pending.get("job_id") == job_id:
pending_approvals.pop(str(job_state["chat_id"]), None)
start_next_queued_job(api, config, state, int(job_state["chat_id"]))
def cancel_chat_work(
api: TelegramAPI,
config: BotConfig,
state: dict[str, Any],
chat_id: int,
actor: str,
*,
clear_queue: bool,
) -> bool:
canceled = False
active_job_id = state.setdefault("chat_active_jobs", {}).get(str(chat_id))
if active_job_id:
post_json(
f"{config.server_url}/api/v1/chat/cancel",
{
"job_id": active_job_id,
"actor": actor,
"reason": "Canceled from Telegram bot",
},
)
canceled = True
if clear_queue:
queue = state.setdefault("chat_queues", {}).pop(str(chat_id), [])
canceled = canceled or bool(queue)
pending = state.setdefault("pending_approvals", {}).get(str(chat_id))
if pending and pending.get("job_id") == active_job_id:
state["pending_approvals"].pop(str(chat_id), None)
return canceled
def handle_message(api: TelegramAPI, config: BotConfig, state: dict[str, Any], message: dict[str, Any]) -> None:
chat_id = message["chat"]["id"]
user_id = str(message.get("from", {}).get("id", chat_id))
text = (message.get("text") or "").strip()
if not text:
api.send_message(chat_id, "Поддерживаются только текстовые сообщения.")
return
session_key = f"{chat_id}:{user_id}"
session_id = state.setdefault("sessions", {}).get(session_key)
state.setdefault("auth_flows", {})
if text == "/start":
api.send_message(
chat_id,
"new-qwen bot готов.\nКоманды доступны через Telegram menu: /help, /auth, /status, /provider, /model, /session, /cancel, /clear.",
)
return
if text == "/help":
api.send_message(
chat_id,
"Команды:\n"
"/auth - начать Qwen OAuth\n"
"/auth_check [flow_id] - проверить авторизацию\n"
"/status - статус OAuth и сервера\n"
"/provider [name] - выбрать провайдера\n"
"/model [id] - выбрать модель\n"
"/session - показать текущую сессию\n"
"/cancel - отменить активный запрос и очистить очередь\n"
"/approve [approval_id] - подтвердить инструмент\n"
"/reject [approval_id] - отклонить инструмент\n"
"/clear - очистить контекст",
)
return
if text.startswith("/approve") or text.startswith("/reject"):
parts = text.split(maxsplit=1)
approval = state.setdefault("pending_approvals", {}).get(str(chat_id))
approval_id = parts[1] if len(parts) == 2 else approval.get("approval_id") if approval else None
if not approval_id:
api.send_message(chat_id, "Нет pending approval для этого чата.")
return
response = post_json(
f"{config.server_url}/api/v1/approval/respond",
{
"approval_id": approval_id,
"approved": text.startswith("/approve"),
"actor": user_id,
},
)
if response.get("status") != "pending":
state["pending_approvals"].pop(str(chat_id), None)
api.send_message(
chat_id,
f"Approval {approval_id}: {response.get('status')}",
)
return
if text == "/auth":
start_auth_flow(api, config, state, chat_id, force_new=True)
return
if text.startswith("/auth_check"):
parts = text.split(maxsplit=1)
if len(parts) == 2:
flow = get_auth_flow(state, chat_id)
if flow:
flow["flow_id"] = parts[1]
flow = get_auth_flow(state, chat_id)
if not flow:
api.send_message(chat_id, "Нет активного flow_id. Сначала вызови /auth.")
return
if not poll_auth_flow(api, config, state, chat_id, force=True):
api.send_message(chat_id, "Авторизация ещё не завершена. Повторите команду через пару секунд.")
return
if text == "/status":
status = get_json(f"{config.server_url}/api/v1/auth/status")
catalog = get_provider_catalog(config)
queue_size = len(state.setdefault("chat_queues", {}).get(str(chat_id), []))
active_job = state.setdefault("chat_active_jobs", {}).get(str(chat_id))
provider, model = get_selected_provider_and_model(state, chat_id, catalog)
providers_info = "\n".join(
[
f" - {item['name']}: model={item.get('model')}, available={item.get('available')}"
for item in catalog.get("providers", [])
]
) or " Нет доступных провайдеров"
send_text_chunks(
api,
chat_id,
"Сервер доступен.\n"
f"OAuth: {'configured' if status.get('authenticated') else 'not configured'}\n"
f"ready: {status.get('ready')}\n"
f"available_providers: {', '.join(status.get('available_providers') or []) or '-'}\n"
f"default_provider: {status.get('default_provider')}\n"
f"fallback_providers: {', '.join(status.get('fallback_providers') or []) or '-'}\n"
f"selected_provider: {provider}\n"
f"selected_model: {model}\n"
f"resource_url: {status.get('resource_url')}\n"
f"expires_at: {status.get('expires_at')}\n"
f"tool_policy: {status.get('tool_policy')}\n"
f"Провайдеры и модели:\n{providers_info}\n"
f"active_job: {active_job}\n"
f"queued_messages: {queue_size}",
)
return
if text.startswith("/provider"):
catalog = get_provider_catalog(config)
parts = text.split(maxsplit=1)
if len(parts) == 1:
provider, _ = get_selected_provider_and_model(state, chat_id, catalog)
api.send_message(
chat_id,
format_provider_selection(catalog, provider),
reply_markup=format_provider_keyboard(catalog),
)
return
provider_name = parts[1].strip()
provider_names = {str(item.get("name") or "") for item in catalog.get("providers", [])}
if provider_name not in provider_names:
api.send_message(chat_id, f"Неизвестный provider: {provider_name}")
return
set_chat_provider(state, chat_id, provider_name, reset_model=True)
_, model = get_selected_provider_and_model(state, chat_id, catalog)
api.send_message(chat_id, f"Выбран provider: {provider_name}\nmodel: {model or '-'}")
return
if text.startswith("/model"):
catalog = get_provider_catalog(config)
provider, selected_model = get_selected_provider_and_model(state, chat_id, catalog)
provider_map = {str(item.get("name") or ""): item for item in catalog.get("providers", [])}
provider_info = provider_map.get(provider or "")
models = provider_info.get("models", []) if provider_info else []
parts = text.split(maxsplit=1)
if len(parts) == 1:
api.send_message(
chat_id,
format_model_selection(provider or "-", models, selected_model),
reply_markup=format_model_keyboard(provider or "-", models),
)
return
model_id = parts[1].strip()
valid_model_ids = {str(item.get("id") or "") for item in models}
if model_id not in valid_model_ids:
api.send_message(chat_id, f"Неизвестная модель для {provider}: {model_id}")
return
set_chat_model(state, chat_id, model_id)
api.send_message(chat_id, f"Выбрана модель: {model_id}\nprovider: {provider}")
return
if text == "/cancel":
canceled = cancel_chat_work(
api,
config,
state,
chat_id,
user_id,
clear_queue=True,
)
if canceled:
api.send_message(chat_id, "Активный job отменён, очередь чата очищена.")
else:
api.send_message(chat_id, "Для этого чата нет активных или queued jobs.")
return
if text == "/session":
if not session_id:
api.send_message(chat_id, "У этого чата ещё нет активной сессии.")
return
session = post_json(
f"{config.server_url}/api/v1/session/get",
{"session_id": session_id},
)
send_text_chunks(
api,
chat_id,
"session_id: {session_id}\nmessages: {count}\nlast_answer: {last_answer}".format(
session_id=session_id,
count=len(session.get("messages", [])),
last_answer=session.get("last_answer") or "-",
),
)
return
if text == "/clear":
cancel_chat_work(
api,
config,
state,
chat_id,
user_id,
clear_queue=True,
)
if session_id:
post_json(f"{config.server_url}/api/v1/session/clear", {"session_id": session_id})
state["sessions"].pop(session_key, None)
api.send_message(chat_id, "Контекст сессии очищен.")
return
if not ensure_auth(api, config, state, chat_id):
enqueue_pending_message(state, chat_id, user_id, session_key, text)
api.send_message(chat_id, "Сообщение поставлено в очередь до завершения авторизации.")
return
start_or_queue_chat_job(api, config, state, chat_id, user_id, session_key, text)
def handle_callback_query(
api: TelegramAPI,
config: BotConfig,
state: dict[str, Any],
callback_query: dict[str, Any],
) -> None:
callback_query_id = str(callback_query.get("id") or "")
data = str(callback_query.get("data") or "")
message = callback_query.get("message") or {}
chat = message.get("chat") or {}
chat_id = int(chat.get("id"))
message_id = int(message.get("message_id"))
actor = str((callback_query.get("from") or {}).get("id") or chat_id)
if ":" not in data:
api.answer_callback_query(callback_query_id, "Неизвестное действие")
return
action, approval_id = data.split(":", 1)
if action not in {"approve", "reject"}:
if action == "provider":
catalog = get_provider_catalog(config)
provider_names = {str(item.get("name") or "") for item in catalog.get("providers", [])}
if approval_id not in provider_names:
api.answer_callback_query(callback_query_id, "Неизвестный провайдер")
return
set_chat_provider(state, chat_id, approval_id, reset_model=True)
provider, model = get_selected_provider_and_model(state, chat_id, catalog)
api.edit_message_text(
chat_id,
message_id,
f"Выбран provider: {provider}\nmodel: {model or '-'}",
reply_markup={"inline_keyboard": []},
)
api.answer_callback_query(callback_query_id, "Провайдер переключен")
return
if action == "model":
provider, model_id = approval_id.split(":", 1) if ":" in approval_id else ("", "")
catalog = get_provider_catalog(config)
provider_map = {str(item.get("name") or ""): item for item in catalog.get("providers", [])}
provider_info = provider_map.get(provider)
model_ids = {
str(item.get("id") or "")
for item in (provider_info.get("models", []) if provider_info else [])
}
if model_id not in model_ids:
api.answer_callback_query(callback_query_id, "Неизвестная модель")
return
set_chat_provider(state, chat_id, provider, reset_model=False)
set_chat_model(state, chat_id, model_id)
api.edit_message_text(
chat_id,
message_id,
f"Выбрана модель: {model_id}\nprovider: {provider}",
reply_markup={"inline_keyboard": []},
)
api.answer_callback_query(callback_query_id, "Модель переключена")
return
api.answer_callback_query(callback_query_id, "Неизвестное действие")
return
response = post_json(
f"{config.server_url}/api/v1/approval/respond",
{
"approval_id": approval_id,
"approved": action == "approve",
"actor": actor,
},
)
status = str(response.get("status") or "unknown")
pending = state.setdefault("pending_approvals", {}).get(str(chat_id))
if response.get("status") != "pending":
state["pending_approvals"].pop(str(chat_id), None)
api.edit_message_text(
chat_id,
message_id,
f"Approval {approval_id}: {status}",
reply_markup={"inline_keyboard": []},
)
api.answer_callback_query(
callback_query_id,
"Подтверждено" if action == "approve" else "Отклонено",
)
def main() -> None:
config = BotConfig.load()
api = TelegramAPI(config.token, proxy_url=config.proxy_url)
state = load_state()
ensure_bot_commands(api, state)
save_state(state)
print("new-qwen bot polling started")
while True:
try:
process_auth_flows(api, config, state)
process_active_jobs(api, config, state)
timeout = config.poll_timeout
if state.get("active_jobs"):
timeout = min(timeout, 3)
updates = api.get_updates(state.get("offset"), timeout)
for update in updates:
state["offset"] = update["update_id"] + 1
message = update.get("message")
if message:
handle_message(api, config, state, message)
callback_query = update.get("callback_query")
if callback_query:
handle_callback_query(api, config, state, callback_query)
save_state(state)
save_state(state)
except Exception as exc:
print(f"bot loop error: {exc}")
time.sleep(3)
if __name__ == "__main__":
main()