Improve Telegram bot status and approval UX

This commit is contained in:
mirivlad 2026-04-09 02:51:21 +08:00
parent 371c5b198b
commit 4a9b2e27fa
2 changed files with 215 additions and 20 deletions

View File

@ -1,6 +1,8 @@
from __future__ import annotations from __future__ import annotations
import html
import json import json
import re
import time import time
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@ -11,6 +13,7 @@ from telegram_api import TelegramAPI
STATE_FILE = Path(__file__).resolve().parent.parent / ".new-qwen" / "telegram-state.json" STATE_FILE = Path(__file__).resolve().parent.parent / ".new-qwen" / "telegram-state.json"
TYPING_INTERVAL_SECONDS = 4.0
def load_state() -> dict[str, Any]: def load_state() -> dict[str, Any]:
@ -63,6 +66,82 @@ def send_text_chunks(api: TelegramAPI, chat_id: int, text: str) -> None:
api.send_message(chat_id, normalized[start : start + chunk_size]) api.send_message(chat_id, normalized[start : start + chunk_size])
def render_markdownish_html(text: str) -> str:
normalized = (text or "Пустой ответ.").replace("\r\n", "\n")
fence_pattern = re.compile(r"```(?:[^\n`]*)\n(.*?)```", re.DOTALL)
placeholders: list[str] = []
def replace_fence(match: re.Match[str]) -> str:
placeholders.append(f"<pre>{html.escape(match.group(1).strip())}</pre>")
return f"@@CODEBLOCK_{len(placeholders) - 1}@@"
normalized = fence_pattern.sub(replace_fence, normalized)
escaped = html.escape(normalized)
escaped = re.sub(r"(?<!\*)\*\*(.+?)\*\*(?!\*)", r"<b>\1</b>", escaped)
escaped = re.sub(r"`([^`\n]+)`", r"<code>\1</code>", escaped)
escaped = re.sub(r"(?m)^\*\s+", "", escaped)
escaped = re.sub(r"(?m)^-\s+", "", escaped)
for index, block in enumerate(placeholders):
escaped = escaped.replace(f"@@CODEBLOCK_{index}@@", block)
return escaped
def update_status_message(
api: TelegramAPI,
job_state: dict[str, Any],
text: str,
) -> None:
normalized = (text or "Пустой ответ.")[:4000]
chat_id = int(job_state["chat_id"])
message_id = int(job_state.get("status_message_id") or 0)
if message_id:
api.edit_message_text(chat_id, message_id, normalized)
else:
message_id = api.send_message(chat_id, normalized)
job_state["status_message_id"] = message_id
job_state["status_message_text"] = normalized
def set_final_message(
api: TelegramAPI,
job_state: dict[str, Any],
text: str,
) -> None:
normalized = text or "Пустой ответ."
chunk_size = 3800
first_chunk = normalized[:chunk_size]
rendered = render_markdownish_html(first_chunk)
chat_id = int(job_state["chat_id"])
message_id = int(job_state.get("status_message_id") or 0)
if message_id:
api.edit_message_text(chat_id, message_id, rendered, parse_mode="HTML")
else:
job_state["status_message_id"] = api.send_message(
chat_id,
rendered,
parse_mode="HTML",
)
job_state["status_message_text"] = first_chunk
for start in range(chunk_size, len(normalized), chunk_size):
api.send_message(
int(job_state["chat_id"]),
render_markdownish_html(normalized[start : start + chunk_size]),
parse_mode="HTML",
)
def keep_job_typing(
api: TelegramAPI,
job_state: dict[str, Any],
) -> None:
now = time.time()
last_typing_at = float(job_state.get("last_typing_at") or 0.0)
if now - last_typing_at < TYPING_INTERVAL_SECONDS:
return
api.send_chat_action(int(job_state["chat_id"]), "typing")
job_state["last_typing_at"] = now
def summarize_event(event: dict[str, Any]) -> str | None: def summarize_event(event: dict[str, Any]) -> str | None:
event_type = event.get("type") event_type = event.get("type")
if event_type == "job_status": if event_type == "job_status":
@ -93,14 +172,23 @@ def summarize_event(event: dict[str, Any]) -> str | None:
return None return None
def format_approval_keyboard(approval_id: str) -> dict[str, Any]:
return {
"inline_keyboard": [
[
{"text": "Approve", "callback_data": f"approve:{approval_id}"},
{"text": "Reject", "callback_data": f"reject:{approval_id}"},
]
]
}
def format_approval_request(event: dict[str, Any]) -> str: def format_approval_request(event: dict[str, Any]) -> str:
return ( return (
"Нужно подтверждение инструмента.\n" "Нужно подтверждение инструмента.\n"
f"approval_id: {event.get('approval_id')}\n" f"approval_id: {event.get('approval_id')}\n"
f"tool: {event.get('tool_name')}\n" f"tool: {event.get('tool_name')}\n"
f"args: {json.dumps(event.get('arguments', {}), ensure_ascii=False)}\n\n" f"args: {json.dumps(event.get('arguments', {}), ensure_ascii=False)}"
f"/approve {event.get('approval_id')}\n"
f"/reject {event.get('approval_id')}"
) )
@ -203,7 +291,7 @@ def start_chat_job(
) -> None: ) -> None:
session_id = state.setdefault("sessions", {}).get(session_key) session_id = state.setdefault("sessions", {}).get(session_key)
prefix = "Обрабатываю отложенный запрос..." if delayed else "Обрабатываю запрос..." prefix = "Обрабатываю отложенный запрос..." if delayed else "Обрабатываю запрос..."
api.send_message(chat_id, prefix) status_message_id = api.send_message(chat_id, prefix)
start_result = post_json( start_result = post_json(
f"{config.server_url}/api/v1/chat/start", f"{config.server_url}/api/v1/chat/start",
{ {
@ -220,7 +308,9 @@ def start_chat_job(
"session_key": session_key, "session_key": session_key,
"session_id": start_result["session_id"], "session_id": start_result["session_id"],
"seen_seq": 0, "seen_seq": 0,
"sent_statuses": [], "status_message_id": status_message_id,
"status_message_text": prefix,
"last_typing_at": 0.0,
} }
state.setdefault("chat_active_jobs", {})[str(chat_id)] = start_result["job_id"] state.setdefault("chat_active_jobs", {})[str(chat_id)] = start_result["job_id"]
@ -392,6 +482,7 @@ def process_active_jobs(
pending_approvals = state.setdefault("pending_approvals", {}) pending_approvals = state.setdefault("pending_approvals", {})
for job_id in list(active_jobs.keys()): for job_id in list(active_jobs.keys()):
job_state = active_jobs[job_id] job_state = active_jobs[job_id]
keep_job_typing(api, job_state)
poll_result = post_json( poll_result = post_json(
f"{config.server_url}/api/v1/chat/poll", f"{config.server_url}/api/v1/chat/poll",
{"job_id": job_id, "since_seq": job_state.get("seen_seq", 0)}, {"job_id": job_id, "since_seq": job_state.get("seen_seq", 0)},
@ -404,25 +495,22 @@ def process_active_jobs(
"approval_id": event["approval_id"], "approval_id": event["approval_id"],
"job_id": job_id, "job_id": job_id,
} }
send_text_chunks( api.send_message(
api,
int(job_state["chat_id"]), int(job_state["chat_id"]),
format_approval_request(event), format_approval_request(event),
reply_markup=format_approval_keyboard(str(event["approval_id"])),
) )
continue continue
summary = summarize_event(event) summary = summarize_event(event)
sent_statuses = set(job_state.get("sent_statuses", [])) if summary and summary != job_state.get("status_message_text"):
if summary and summary not in sent_statuses: update_status_message(api, job_state, summary)
api.send_message(int(job_state["chat_id"]), summary[:4000])
sent_statuses.add(summary)
job_state["sent_statuses"] = sorted(sent_statuses)
status = poll_result.get("status") status = poll_result.get("status")
if status == "completed": if status == "completed":
state["sessions"][job_state["session_key"]] = poll_result["session_id"] state["sessions"][job_state["session_key"]] = poll_result["session_id"]
send_text_chunks( set_final_message(
api, api,
int(job_state["chat_id"]), job_state,
poll_result.get("answer") or "Пустой ответ от модели.", poll_result.get("answer") or "Пустой ответ от модели.",
) )
active_jobs.pop(job_id, None) active_jobs.pop(job_id, None)
@ -432,9 +520,9 @@ def process_active_jobs(
pending_approvals.pop(str(job_state["chat_id"]), None) pending_approvals.pop(str(job_state["chat_id"]), None)
start_next_queued_job(api, config, state, int(job_state["chat_id"])) start_next_queued_job(api, config, state, int(job_state["chat_id"]))
elif status == "failed": elif status == "failed":
send_text_chunks( set_final_message(
api, api,
int(job_state["chat_id"]), job_state,
f"Job завершился с ошибкой: {poll_result.get('error')}", f"Job завершился с ошибкой: {poll_result.get('error')}",
) )
active_jobs.pop(job_id, None) active_jobs.pop(job_id, None)
@ -444,9 +532,9 @@ def process_active_jobs(
pending_approvals.pop(str(job_state["chat_id"]), None) pending_approvals.pop(str(job_state["chat_id"]), None)
start_next_queued_job(api, config, state, int(job_state["chat_id"])) start_next_queued_job(api, config, state, int(job_state["chat_id"]))
elif status == "canceled": elif status == "canceled":
send_text_chunks( set_final_message(
api, api,
int(job_state["chat_id"]), job_state,
f"Job отменён: {poll_result.get('error') or 'Canceled by operator'}", f"Job отменён: {poll_result.get('error') or 'Canceled by operator'}",
) )
active_jobs.pop(job_id, None) active_jobs.pop(job_id, None)
@ -643,6 +731,53 @@ def handle_message(api: TelegramAPI, config: BotConfig, state: dict[str, Any], m
start_or_queue_chat_job(api, config, state, chat_id, user_id, session_key, text) start_or_queue_chat_job(api, config, state, chat_id, user_id, session_key, text)
def handle_callback_query(
api: TelegramAPI,
config: BotConfig,
state: dict[str, Any],
callback_query: dict[str, Any],
) -> None:
callback_query_id = str(callback_query.get("id") or "")
data = str(callback_query.get("data") or "")
message = callback_query.get("message") or {}
chat = message.get("chat") or {}
chat_id = int(chat.get("id"))
message_id = int(message.get("message_id"))
actor = str((callback_query.get("from") or {}).get("id") or chat_id)
if ":" not in data:
api.answer_callback_query(callback_query_id, "Неизвестное действие")
return
action, approval_id = data.split(":", 1)
if action not in {"approve", "reject"}:
api.answer_callback_query(callback_query_id, "Неизвестное действие")
return
response = post_json(
f"{config.server_url}/api/v1/approval/respond",
{
"approval_id": approval_id,
"approved": action == "approve",
"actor": actor,
},
)
status = str(response.get("status") or "unknown")
pending = state.setdefault("pending_approvals", {}).get(str(chat_id))
if response.get("status") != "pending":
state["pending_approvals"].pop(str(chat_id), None)
api.edit_message_text(
chat_id,
message_id,
f"Approval {approval_id}: {status}",
reply_markup={"inline_keyboard": []},
)
api.answer_callback_query(
callback_query_id,
"Подтверждено" if action == "approve" else "Отклонено",
)
def main() -> None: def main() -> None:
config = BotConfig.load() config = BotConfig.load()
api = TelegramAPI(config.token, proxy_url=config.proxy_url) api = TelegramAPI(config.token, proxy_url=config.proxy_url)
@ -661,6 +796,9 @@ def main() -> None:
message = update.get("message") message = update.get("message")
if message: if message:
handle_message(api, config, state, message) handle_message(api, config, state, message)
callback_query = update.get("callback_query")
if callback_query:
handle_callback_query(api, config, state, callback_query)
save_state(state) save_state(state)
save_state(state) save_state(state)
except Exception as exc: except Exception as exc:

View File

@ -36,5 +36,62 @@ class TelegramAPI:
response = self._get("getUpdates", params) response = self._get("getUpdates", params)
return response.get("result", []) return response.get("result", [])
def send_message(self, chat_id: int, text: str) -> None: def send_message(
self._post("sendMessage", {"chat_id": chat_id, "text": text}) self,
chat_id: int,
text: str,
*,
parse_mode: str | None = None,
reply_markup: dict[str, Any] | None = None,
) -> int:
payload: dict[str, Any] = {"chat_id": chat_id, "text": text}
if parse_mode:
payload["parse_mode"] = parse_mode
if reply_markup:
payload["reply_markup"] = reply_markup
response = self._post("sendMessage", payload)
result = response.get("result") or {}
return int(result.get("message_id", 0))
def edit_message_text(
self,
chat_id: int,
message_id: int,
text: str,
*,
parse_mode: str | None = None,
reply_markup: dict[str, Any] | None = None,
) -> None:
try:
payload: dict[str, Any] = {
"chat_id": chat_id,
"message_id": message_id,
"text": text,
}
if parse_mode:
payload["parse_mode"] = parse_mode
if reply_markup is not None:
payload["reply_markup"] = reply_markup
self._post(
"editMessageText",
payload,
)
except Exception as exc:
if "message is not modified" in str(exc).lower():
return
raise
def send_chat_action(self, chat_id: int, action: str = "typing") -> None:
self._post(
"sendChatAction",
{
"chat_id": chat_id,
"action": action,
},
)
def answer_callback_query(self, callback_query_id: str, text: str | None = None) -> None:
payload: dict[str, Any] = {"callback_query_id": callback_query_id}
if text:
payload["text"] = text
self._post("answerCallbackQuery", payload)