From 7c088e2051d8895190f6fcc3ac41b1e7077c4da4 Mon Sep 17 00:00:00 2001 From: mirivlad Date: Thu, 26 Feb 2026 07:32:07 +0800 Subject: [PATCH] =?UTF-8?q?v0.5.3:=20=D0=A3=D0=BB=D1=83=D1=87=D1=88=D0=B5?= =?UTF-8?q?=D0=BD=D0=B8=D0=B5=20=D0=B8=D0=BD=D1=81=D1=82=D1=80=D1=83=D0=BC?= =?UTF-8?q?=D0=B5=D0=BD=D1=82=D0=BE=D0=B2=20(SSH,=20cron,=20RSS)=20=D0=B8?= =?UTF-8?q?=20=D0=B8=D0=BD=D1=82=D0=B5=D0=B3=D1=80=D0=B0=D1=86=D0=B8=D0=B8?= =?UTF-8?q?=20=D1=81=20Qwen?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Qwen-Coder --- .env.example | 24 ++- bot.py | 235 +++++++++++++++++++++++--- bot/services/command_executor.py | 6 +- bot/tools/cron_tool.py | 163 +++++++++++++++++- bot/tools/rss_tool.py | 20 ++- bot/tools/ssh_tool.py | 70 ++++++-- qwen_integration.py | 279 ++++++++++++++++++++++++------- 7 files changed, 675 insertions(+), 122 deletions(-) diff --git a/.env.example b/.env.example index 1b54fa0..ce944fd 100644 --- a/.env.example +++ b/.env.example @@ -16,27 +16,25 @@ ALLOWED_USERS= WORKING_DIRECTORY=/home/mirivlad # =========================================== -# Мульти-серверная конфигурация (v2.0) +# SSH Серверы для AI-агента # =========================================== - -# SSH ключ для подключения к серверам -SSH_KEY_PATH=/home/mirivlad/.ssh/id_ed25519 - -# Список серверов (формат: name|host|port|user|tags) -# name - отображаемое имя сервера -# host - IP или домен +# Формат: name|host|port|user|tag|password +# name - имя сервера (используется в ответах бота) +# host - IP адрес или домен # port - SSH порт (обычно 22) # user - пользователь SSH -# tags - теги через запятую для группировки (web,db,prod,dev) +# tag - тег для категоризации (web, db, prod, и т.д.) +# password - пароль SSH (или используйте SSH-ключи) # # Пример: -# SERVERS=web-prod|192.168.1.10|22|root|web,prod,db-prod|192.168.1.11|22|postgres|db,prod,local|localhost|22|mirivlad|local,dev +# SERVERS=tomas|192.168.1.54|22|mirivlad|web|moloko22 # -# Пустой список = только локальный сервер +# Для нескольких серверов используйте запятую: +# SERVERS=home|192.168.1.54|22|user|web|pass123,work|10.0.0.5|22|admin|db|pass456 SERVERS= -# Сервер по умолчанию (имя из списка или "local") -DEFAULT_SERVER=local +# SSH ключ для подключения (альтернатива паролю) +# SSH_KEY_PATH=/home/user/.ssh/id_ed25519 # =========================================== # SOCKS5 Proxy (опционально) diff --git a/bot.py b/bot.py index 4f625de..4abe634 100644 --- a/bot.py +++ b/bot.py @@ -55,7 +55,7 @@ BASE_DIR = Path(__file__).parent logging.basicConfig( format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", - level=logging.INFO, + level=logging.DEBUG, handlers=[ logging.FileHandler(BASE_DIR / "bot.log"), logging.StreamHandler() @@ -183,8 +183,8 @@ async def handle_ai_task(update: Update, text: str): if tool_result.success: # Формируем ответ с результатами инструмента - full_output = format_tool_result(agent_decision.tool_name, tool_result) - + full_output = await format_tool_result(agent_decision.tool_name, tool_result) + # Добавляем в историю state.ai_chat_history.append(f"Assistant: {full_output[:500]}") save_message(user_id, "assistant", full_output) @@ -201,13 +201,93 @@ async def handle_ai_task(update: Update, text: str): # Продолжаем с обычным ИИ-ответом если инструмент не сработал # === ОБЫЧНЫЙ ИИ-ОТВЕТ через Qwen === - output_buffer = [] + output_buffer = [] # Буфер для потокового отображения + result_buffer = [] # Буфер для финального результата (без статусов) + stream_message = None # Сообщение для потокового вывода (статусы) + result_message = None # Финальное сообщение с результатом + current_status = "🤖 Думаю..." # Текущий статус для отображения + is_tool_output = False # Флаг: идёт ли вывод инструмента def on_output(text: str): - output_buffer.append(text) + """Callback для накопления полного вывода (не используется для streaming).""" + pass def on_oauth_url(url: str): - pass # OAuth обрабатывается автоматически + pass + + def on_event(event): + """Обработка событий stream-json для обновления статуса.""" + nonlocal current_status, is_tool_output + + from qwen_integration import QwenEventType + + if event.event_type == QwenEventType.SYSTEM: + if event.subtype == 'session_start': + current_status = "🤖 Запуск сессии..." + + elif event.event_type == QwenEventType.ASSISTANT: + message = event.message or {} + content_list = message.get('content', []) + for content_item in content_list: + if isinstance(content_item, dict) and content_item.get('type') == 'tool_use': + tool_name = content_item.get('name', 'инструмент') + current_status = f"🔧 Использую {tool_name}..." + is_tool_output = True # Начинается вывод инструмента + break + + elif event.event_type == QwenEventType.RESULT: + if event.is_error: + current_status = "❌ Ошибка" + else: + current_status = "✅ Готово" + + logger.debug(f"Событие Qwen: {event.event_type.value}, статус: {current_status}") + + async def on_chunk(chunk: str): + """Потоковая отправка chunks в Telegram.""" + nonlocal stream_message, current_status, is_tool_output + + chunk_text = chunk + if not chunk_text or not chunk_text.strip(): + return + + # Логируем для отладки + logger.debug(f"on_chunk: {repr(chunk_text[:50])}...") + + # Добавляем в потоковый буфер (всё) + output_buffer.append(chunk_text) + + # В result_buffer добавляем ТОЛЬКО если это не статус инструмента + # Статусы инструментов начинаются с "\n🔧" + if not chunk_text.strip().startswith("🔧"): + result_buffer.append(chunk_text) + + logger.debug(f"output_buffer: {len(output_buffer)}, result_buffer: {len(result_buffer)}") + + # Если сообщение ещё не создано - создаём + if stream_message is None: + stream_message = await update.message.reply_text( + f"⏳ {current_status}", + parse_mode="Markdown" + ) + await asyncio.sleep(0.1) + + # Формируем текущий вывод + current_output = "".join(output_buffer) + + # Обрезаем до безопасного размера + if len(current_output) > 3500: + current_output = current_output[-3500:] + + # Обновляем сообщение + try: + await stream_message.edit_text( + f"⏳ {current_status}\n\n{current_output}", + parse_mode="Markdown" + ) + except Exception as e: + logger.debug(f"Ошибка редактирования: {e}") + await asyncio.sleep(0.3) # Формируем контекст с историей + памятью + summary # Получаем summary и последние сообщения из ChromaDB @@ -260,21 +340,32 @@ async def handle_ai_task(update: Update, text: str): f"{text}" ) - # Выполняем задачу (системный промпт уже добавлен в full_task) - result = await qwen_manager.run_task(user_id, full_task, on_output, on_oauth_url, use_system_prompt=False) + # Выполняем задачу с потоковым выводом + result = await qwen_manager.run_task( + user_id, full_task, on_output, on_oauth_url, + use_system_prompt=False, on_chunk=on_chunk, on_event=on_event + ) - # Показываем результат - full_output = "".join(output_buffer).strip() + # Формируем финальный результат ИЗ result_buffer (без статусов инструментов) + full_output = "".join(result_buffer).strip() + # Если result_buffer пустой — пробуем извлечь текст из result if not full_output: - full_output = result + logger.warning("result_buffer пустой, пробуем извлечь текст из result") + import re + text_matches = re.findall(r'"text":"([^"]+)"', result) + if text_matches: + full_output = " ".join(text_matches).replace("\\n", "\n") + else: + full_output = "⚠️ Не удалось получить ответ ИИ" + logger.error(f"Result: {result[:500]}...") # Добавляем ответ ИИ в историю и память - if full_output: + if full_output and full_output != "⚠️ Не удалось получить ответ ИИ": state.ai_chat_history.append(f"Assistant: {full_output[:500]}") save_message(user_id, "assistant", full_output) - # Обрезаем если слишком длинный (с запасом на контекст) + # Обрезаем если слишком длинный if len(full_output) > 3500: full_output = full_output[:3500] + "\n... (вывод обрезан)" @@ -282,19 +373,68 @@ async def handle_ai_task(update: Update, text: str): state.messages_since_fact_extract += 1 if state.messages_since_fact_extract >= 5: logger.info(f"Запуск извлечения фактов через ИИ для пользователя {user_id}") - dialog_context = "\n".join(state.ai_chat_history[-10:]) # Последние 10 сообщений + dialog_context = "\n".join(state.ai_chat_history[-10:]) asyncio.create_task(hybrid_memory_manager.extract_facts_with_ai(user_id, dialog_context)) state.messages_since_fact_extract = 0 - # Формируем сообщение с информацией о контексте (как в qwen-code) + # Формируем сообщение с информацией о контексте context_info = f"📊 Контекст: {context_percent}%" + + # Отправляем результат ОТДЕЛЬНЫМ сообщением response_text = f"{full_output}\n\n*{context_info}*" - - # Отправляем ответ с разбивкой на части если нужно - await send_long_message(update, response_text, parse_mode="Markdown") + + # Отправляем новое сообщение с результатом + await update.message.reply_text( + response_text, + parse_mode="Markdown" + ) + + # Обновляем потоковое сообщение на финальный статус + if stream_message: + try: + await stream_message.edit_text( + f"✅ {current_status}", + parse_mode="Markdown" + ) + except Exception as e: + logger.debug(f"Ошибка обновления статусного сообщения: {e}") -def format_tool_result(tool_name: str, result: 'ToolResult') -> str: +async def translate_title(title: str, max_length: int = 100) -> str: + """ + Краткий перевод заголовка на русский через ИИ. + Если перевод не удался — возвращает оригинал. + """ + try: + # Быстрый промпт для перевода + prompt = f"Переведи на русский язык этот заголовок новости (максимум {max_length} символов, без кавычек и пояснений):\n{title[:200]}" + + # Используем qwen_manager для перевода + from qwen_integration import qwen_manager + + # Создаём временную сессию для перевода + import hashlib + temp_user_id = f"translator_{hashlib.md5(title.encode()).hexdigest()}" + + result = await qwen_manager.run_task(temp_user_id, prompt, on_output=lambda x: None, on_oauth_url=lambda x: None, use_system_prompt=False) + + # Извлекаем текст из результата + import re + text_matches = re.findall(r'"text":"([^"]+)"', result) + if text_matches: + translated = " ".join(text_matches).replace("\\n", " ").strip() + # Убираем кавычки если есть + translated = translated.strip('"\'') + if translated and len(translated) > 3: + return translated[:max_length] + + return title[:max_length] + except Exception as e: + logger.debug(f"Ошибка перевода заголовка: {e}") + return title[:max_length] + + +async def format_tool_result(tool_name: str, result: 'ToolResult') -> str: """Форматировать результат выполнения инструмента.""" from bot.tools import ToolResult @@ -315,20 +455,55 @@ def format_tool_result(tool_name: str, result: 'ToolResult') -> str: return output - elif tool_name == 'rss_reader': + elif tool_name in ('rss_reader', 'rss_tool'): action = result.metadata.get('action', 'list') if action == 'list' and result.data: + # Помечаем новости как прочитанные (digest_flag=1) + from bot.tools.rss_tool import RSSTool + rss_tool_instance = RSSTool(db_path='rss.db') + for item in result.data: + news_id = item.get('id') + if news_id: + await rss_tool_instance.mark_digest(news_id) + logger.debug(f"Новость {news_id} помечена как прочитанная") + output = "📰 **Последние новости:**\n\n" - for i, item in enumerate(result.data[:10], 1): + # Берём не более 15 новостей для читаемости + news_count = min(len(result.data), 15) + + for i in range(news_count): + item = result.data[i] title = item.get('title', 'Без названия') pub_date = item.get('pub_date', '') - link = item.get('link', '')[:50] - output += f"{i}. {title}\n" + link = item.get('link', '') + + # Переводим заголовок на русский + translated_title = await translate_title(title, max_length=100) + + # Форматируем дату + date_str = "" if pub_date: - output += f" 📅 {pub_date}\n" + try: + # Преобразуем дату в более читаемый формат + dt = datetime.strptime(pub_date[:19], '%Y-%m-%d %H:%M:%S') + date_str = dt.strftime('%d.%m.%Y %H:%M') + except: + date_str = pub_date[:16] + + # Обрезаем заголовок если слишком длинный + if len(translated_title) > 120: + translated_title = translated_title[:117] + "..." + + output += f"**{i+1}. {translated_title}**\n" + if date_str: + output += f" 📅 {date_str}\n" if link: - output += f" 🔗 {link}\n\n" + # Обрезаем ссылку для читаемости + short_link = link[:60] + "..." if len(link) > 63 else link + output += f" 🔗 {short_link}\n" + output += "\n" + return output elif action == 'fetch': @@ -338,7 +513,15 @@ def format_tool_result(tool_name: str, result: 'ToolResult') -> str: elif action == 'list_feeds' and result.data: output = "📑 **Ваши RSS ленты:**\n\n" for feed in result.data: - output += f"• {feed.get('title', feed.get('url', 'Unknown'))}\n" + title = feed.get('title', feed.get('url', 'Unknown')) + url = feed.get('url', '') + last_fetch = feed.get('last_fetched', '') + + output += f"• **{title}**\n" + output += f" 🔗 {url}\n" + if last_fetch: + output += f" 🕐 Обновлено: {last_fetch[:16]}\n" + output += "\n" return output return f"RSS: {result.data}" diff --git a/bot/services/command_executor.py b/bot/services/command_executor.py index dc81511..d1b1e12 100644 --- a/bot/services/command_executor.py +++ b/bot/services/command_executor.py @@ -3,6 +3,9 @@ import asyncio import logging +import os +import pty +from datetime import datetime from typing import Tuple import asyncssh @@ -12,7 +15,8 @@ from bot.config import config, state_manager, server_manager from bot.models.server import Server from bot.models.session import ssh_session_manager, local_session_manager from bot.utils.ssh_readers import read_ssh_output, read_pty_output, detect_input_type -from bot.utils.formatters import format_long_output +from bot.utils.formatters import format_long_output, escape_markdown, send_long_message +from bot.utils.cleaners import clean_ansi_codes, normalize_output logger = logging.getLogger(__name__) diff --git a/bot/tools/cron_tool.py b/bot/tools/cron_tool.py index 349e1c3..215b949 100644 --- a/bot/tools/cron_tool.py +++ b/bot/tools/cron_tool.py @@ -20,6 +20,50 @@ from bot.tools import BaseTool, ToolResult, register_tool logger = logging.getLogger(__name__) +async def _translate_title(title: str, max_length: int = 100) -> str: + """ + Перевести заголовок на русский через Qwen. + + Args: + title: Заголовок для перевода + max_length: Максимальная длина + + Returns: + Переведённый заголовок + """ + try: + import subprocess + import json + + # Создаём временный промпт для перевода + translate_prompt = f"Translate this news title to Russian. Keep it concise, natural, and informative. Maximum {max_length} characters. Return ONLY the translation, no quotes or explanations.\n\nTitle: {title[:200]}" + + # Используем qwen-cli если доступен + result = subprocess.run( + ['qwen', 'chat', '--json', '--prompt', translate_prompt], + capture_output=True, + text=True, + timeout=15 + ) + + if result.returncode == 0: + # Парсим JSON ответ + try: + response = json.loads(result.stdout) + translated = response.get('content', response.get('response', title)) + except json.JSONDecodeError: + translated = result.stdout.strip() + + # Очищаем от кавычек + translated = translated.strip('"\'') + return translated[:max_length] + except Exception as e: + logger.debug(f"Ошибка перевода заголовка: {e}") + + # Fallback - обрезаем оригинал + return title[:max_length] + + @dataclass class CronJob: """ @@ -425,14 +469,19 @@ class CronTool(BaseTool): result_data['tool_used'] = decision.tool_name result_data['tool_result'] = tool_result.to_dict() if hasattr(tool_result, 'to_dict') else str(tool_result) result_data['success'] = tool_result.success - - # Формируем результат + + # Формируем результат с красивым форматированием result_text = f"Задача '{name}' выполнена.\n\n" - result_text += f"Использован инструмент: {decision.tool_name}\n" + result_text += f"Использован инструмент: {decision.tool_name}\n\n" + + # Форматируем результат инструмента в читаемый вид if tool_result.success: - result_text += f"Результат: {tool_result.data or 'Успешно'}" + formatted_result = await self._format_tool_result_for_cron( + decision.tool_name, tool_result.data, tool_result.error + ) + result_text += formatted_result else: - result_text += f"Ошибка: {tool_result.error}" + result_text += f"❌ Ошибка: {tool_result.error}" else: # ИИ решил что инструмент не нужен - выполняем промпт напрямую @@ -487,10 +536,108 @@ class CronTool(BaseTool): data=result_data ) + async def _format_tool_result_for_cron(self, tool_name: str, data: Any, error: str = None) -> str: + """ + Отформатировать результат выполнения инструмента в читаемый вид. + + Args: + tool_name: Название инструмента + data: Данные результата + error: Ошибка (если есть) + + Returns: + Отформатированная строка с результатом + """ + # Поддерживаем оба имени: 'rss_reader' (старое) и 'rss_tool' (новое) + if tool_name in ('rss_reader', 'rss_tool'): + if not data: + return "📰 Новостей не найдено." + + output = "📰 **Последние новости:**\n\n" + # Берём не более 15 новостей для читаемости + news_count = min(len(data), 15) + + for i in range(news_count): + item = data[i] + title = item.get('title', 'Без названия') + pub_date = item.get('pub_date', '') + link = item.get('link', '') + + # Переводим заголовок на русский + translated_title = await _translate_title(title, max_length=100) + + # Форматируем дату + date_str = "" + if pub_date: + try: + dt = datetime.strptime(pub_date[:19], '%Y-%m-%d %H:%M:%S') + date_str = dt.strftime('%d.%m.%Y %H:%M') + except: + date_str = pub_date[:16] + + # Обрезаем заголовок если слишком длинный + if len(translated_title) > 120: + translated_title = translated_title[:117] + "..." + + output += f"**{i+1}. {translated_title}**\n" + if date_str: + output += f" 📅 {date_str}\n" + if link: + short_link = link[:60] + "..." if len(link) > 63 else link + output += f" 🔗 {short_link}\n" + output += "\n" + + return output + + elif tool_name == 'ddgs_search': + if not data: + return "🔍 Ничего не найдено по вашему запросу." + + output = "🔍 **Результаты поиска:**\n\n" + for i, item in enumerate(data[:5], 1): + title = item.get('title', 'Без названия') + href = item.get('href', '') + body = item.get('body', '')[:200] + output += f"{i}. **{title}**\n" + if href: + output += f" {href}\n" + if body: + output += f" {body}\n\n" + + return output + + elif tool_name == 'ssh_executor': + if not data: + return "❌ **Ошибка SSH:** Нет данных" + + output = "🖥️ **SSH результат:**\n" + + if isinstance(data, dict): + if data.get('stdout'): + output += f"**Вывод:**\n```\n{data['stdout']}\n```\n\n" + if data.get('stderr'): + output += f"**Ошибки:**\n```\n{data['stderr']}\n```\n\n" + if data.get('returncode') == 0: + output += "✅ **Успешно**" + else: + output += f"❌ **Код возврата:** {data.get('returncode', 'N/A')}" + else: + output += str(data) + + return output + + elif tool_name == 'cron_tool': + if isinstance(data, dict): + return f"✅ **Результат:**\n{data}" + return str(data) + + # Fallback для неизвестных инструментов + return str(data) if data else "Выполнено" + def _save_to_log(self, job_id: int, job_name: str, prompt: str, result: str): """Сохранить результат выполнения задачи в лог-файл.""" log_file = self.log_dir / f"cron_job_{job_id}_{job_name}.log" - + timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') log_entry = f""" {'='*60} @@ -503,10 +650,10 @@ class CronTool(BaseTool): {result} """ - + with open(log_file, 'a', encoding='utf-8') as f: f.write(log_entry) - + logger.debug(f"Результат задачи {job_name} сохранён в лог: {log_file}") async def execute(self, action: str = "list", ai_agent=None, user_id: int = None, **kwargs) -> ToolResult: diff --git a/bot/tools/rss_tool.py b/bot/tools/rss_tool.py index 678d3b5..0c65ee7 100644 --- a/bot/tools/rss_tool.py +++ b/bot/tools/rss_tool.py @@ -173,12 +173,18 @@ class RSSTool(BaseTool): result = subprocess.run( ['curl', '-sL', '-m', '30', '-A', 'Mozilla/5.0', url], - capture_output=True, text=True + capture_output=True ) if result.returncode == 0 and result.stdout: + # Декодируем с обработкой ошибок кодировки + try: + content = result.stdout.decode('utf-8', errors='ignore') + except Exception: + content = result.stdout.decode('latin-1', errors='ignore') + count = 0 - for item in self._parse_feed(result.stdout): + for item in self._parse_feed(content): self._insert_news(feed_id, item['title'], item['link'], item['guid'], item['pub']) count += 1 @@ -196,7 +202,7 @@ class RSSTool(BaseTool): return ToolResult( success=True, data={'total_new_items': total}, - metadata={'status': 'completed'} + metadata={'status': 'completed', 'action': 'fetch'} ) finally: self.lock_file.unlink(missing_ok=True) @@ -223,7 +229,7 @@ class RSSTool(BaseTool): query = f""" SELECT id, feed_id, title, pub_date, link, digest_flag FROM news WHERE {' AND '.join(conditions)} - ORDER BY pub_date DESC LIMIT ? + ORDER BY created_at DESC, id DESC LIMIT ? """ params.append(limit) @@ -247,7 +253,7 @@ class RSSTool(BaseTool): return ToolResult( success=True, data=news_list, - metadata={'count': len(news_list), 'limit': limit} + metadata={'count': len(news_list), 'limit': limit, 'action': 'list'} ) async def add_feed(self, url: str, title: Optional[str] = None) -> ToolResult: @@ -262,7 +268,7 @@ class RSSTool(BaseTool): return ToolResult( success=True, data={'url': url, 'title': title}, - metadata={'status': 'added'} + metadata={'status': 'added', 'action': 'add_feed'} ) except sqlite3.IntegrityError: return ToolResult( @@ -295,7 +301,7 @@ class RSSTool(BaseTool): return ToolResult( success=True, data=feeds, - metadata={'count': len(feeds)} + metadata={'count': len(feeds), 'action': 'list_feeds'} ) async def mark_digest(self, news_id: int) -> ToolResult: diff --git a/bot/tools/ssh_tool.py b/bot/tools/ssh_tool.py index 654b688..53ec615 100644 --- a/bot/tools/ssh_tool.py +++ b/bot/tools/ssh_tool.py @@ -4,20 +4,28 @@ SSH Executor Tool - инструмент для выполнения коман Бот может использовать этот инструмент автономно для выполнения системных задач на серверах пользователя. + +Конфигурация серверов загружается из .env: +SERVERS=name|host|port|user|tag|password|... """ import logging import asyncio +import os from pathlib import Path from typing import Optional, Dict, Any, List from dataclasses import dataclass import asyncssh +from dotenv import load_dotenv from bot.tools import BaseTool, ToolResult, register_tool logger = logging.getLogger(__name__) +# Загрузка переменных окружения +load_dotenv() + @dataclass class ServerConfig: @@ -27,6 +35,7 @@ class ServerConfig: username: str password: Optional[str] = None client_keys: Optional[List[str]] = None + tags: List[str] = None class SSHExecutorTool(BaseTool): @@ -37,17 +46,46 @@ class SSHExecutorTool(BaseTool): category = "system" def __init__(self): - # Серверы по умолчанию (можно расширять) - self.servers: Dict[str, ServerConfig] = { - 'home': ServerConfig( - host='192.168.1.54', - port=22, - username='mirivlad', - password='moloko22' - ) - } + # Загружаем серверы из .env + self.servers: Dict[str, ServerConfig] = {} self._last_connection: Optional[asyncssh.SSHClientConnection] = None self._last_server: Optional[str] = None + + self._load_servers_from_env() + + def _load_servers_from_env(self): + """ + Загрузить конфигурацию серверов из .env. + + Формат в .env: + SERVERS=name|host|port|user|tag|password + + Пример: + SERVERS=tomas|192.168.1.54|22|mirivlad|web|moloko22 + """ + servers_str = os.getenv('SERVERS', '') + + if not servers_str.strip(): + logger.warning("SERVERS не найден в .env, SSH инструмент не будет работать") + return + + # Парсим формат: name|host|port|user|tag|password + parts = servers_str.strip().split('|') + + if len(parts) >= 6: + name, host, port, user, tag, password = parts[:6] + + self.servers[name.strip()] = ServerConfig( + host=host.strip(), + port=int(port.strip()), + username=user.strip(), + tags=[tag.strip()] if tag.strip() else [], + password=password.strip() if password.strip() else None + ) + logger.info(f"✅ Загружен сервер: {name} ({host}:{port})") + else: + logger.error(f"Неверный формат SERVERS в .env: {servers_str}") + logger.error("Ожидался формат: name|host|port|user|tag|password") async def _connect(self, server_name: str = 'home') -> asyncssh.SSHClientConnection: """Подключиться к серверу.""" @@ -148,13 +186,13 @@ class SSHExecutorTool(BaseTool): 'command': command } - async def execute(self, command: str, server: str = 'home', timeout: int = 30) -> ToolResult: + async def execute(self, command: str, server: str = None, timeout: int = 30) -> ToolResult: """ Выполнить SSH-команду. Args: command: Команда для выполнения - server: Имя сервера (default: 'home') + server: Имя сервера (default: первый из .env) timeout: Таймаут в секундах (default: 30) """ if not command or not command.strip(): @@ -162,6 +200,16 @@ class SSHExecutorTool(BaseTool): success=False, error="Пустая команда" ) + + # Если сервер не указан - используем первый из конфигурации + if server is None: + if not self.servers: + return ToolResult( + success=False, + error="Серверы не настроены. Проверьте SERVERS в .env" + ) + server = list(self.servers.keys())[0] + logger.info(f"Сервер не указан, используем первый: {server}") logger.info(f"SSH Executor: server={server}, command={command[:100]}") diff --git a/qwen_integration.py b/qwen_integration.py index d772e01..d8d44e2 100644 --- a/qwen_integration.py +++ b/qwen_integration.py @@ -2,17 +2,20 @@ """ Интеграция с Qwen Code CLI. Запуск, управление сессиями, обработка OAuth. + +Использует stream-json формат для потокового вывода. """ import os import re import asyncio import subprocess +import json import logging from pathlib import Path from dataclasses import dataclass, field from datetime import datetime, timedelta -from typing import Optional, Dict, Callable, Any +from typing import Optional, Dict, Callable, Any, List, Union from enum import Enum logger = logging.getLogger(__name__) @@ -27,6 +30,28 @@ class QwenSessionState(Enum): ERROR = "error" +class QwenEventType(Enum): + """Типы событий в stream-json выводе Qwen.""" + SYSTEM = "system" + ASSISTANT = "assistant" + USER = "user" + RESULT = "result" + TOOL_USE = "tool_use" + + +@dataclass +class QwenStreamEvent: + """Событие из stream-json вывода Qwen.""" + event_type: QwenEventType + subtype: Optional[str] = None + uuid: Optional[str] = None + session_id: Optional[str] = None + message: Optional[Dict] = None + content: Optional[str] = None + is_error: bool = False + data: Optional[Dict] = None + + @dataclass class QwenSession: """Сессия Qwen Code.""" @@ -37,9 +62,10 @@ class QwenSession: last_activity: datetime = field(default_factory=datetime.now) pending_task: Optional[str] = None output_buffer: str = "" - + session_id: Optional[str] = None + SESSION_TIMEOUT = timedelta(minutes=30) # Таймаут неактивности - + def is_expired(self) -> bool: return datetime.now() - self.last_activity > self.SESSION_TIMEOUT @@ -107,17 +133,20 @@ class QwenCodeManager: async def run_task(self, user_id: int, task: str, on_output: Callable[[str], Any], on_oauth_url: Callable[[str], Any], - use_system_prompt: bool = True) -> str: + use_system_prompt: bool = True, + on_chunk: Callable[[str], Any] = None, + on_event: Callable[[QwenStreamEvent], Any] = None) -> str: """ - Выполнить задачу в Qwen Code. - Для простоты каждый раз запускаем новый процесс. - + Выполнить задачу в Qwen Code с потоковым выводом. + Args: user_id: ID пользователя task: Задача для выполнения - on_output: Callback для вывода + on_output: Callback для вывода (накапливается) on_oauth_url: Callback для OAuth URL use_system_prompt: Добавить системный промпт (default: True) + on_chunk: Callback для потоковой отправки chunks (опционально) + on_event: Callback для событий stream-json (опционально) """ # Создаём временную сессию для отслеживания session = self.get_session(user_id) @@ -137,8 +166,8 @@ class QwenCodeManager: else: full_task = task - # Просто выполняем задачу через -p флаг - return await self._execute_task(session, full_task, on_output) + # Выполняем задачу через -p флаг с stream-json выводом + return await self._execute_task(session, full_task, on_output, on_chunk, on_event) async def _start_session(self, session: QwenSession, on_output: Callable[[str], Any], @@ -226,78 +255,216 @@ class QwenCodeManager: logger.error(f"Ошибка запуска сессии Qwen: {e}") return f"❌ Ошибка: {str(e)}" - async def _execute_task(self, session: QwenSession, + async def _execute_task(self, session: QwenSession, task: str, - on_output: Callable[[str], Any]) -> str: - """Выполнить задачу в активной сессии.""" + on_output: Callable[[str], Any], + on_chunk: Callable[[str], Any] = None, + on_event: Callable[[QwenStreamEvent], Any] = None) -> str: + """ + Выполнить задачу в активной сессии с потоковым stream-json выводом. + + Формат stream-json возвращает JSON-объекты по одному на строку: + {"type":"system","subtype":"session_start","uuid":"...","session_id":"..."} + {"type":"assistant","uuid":"...","message":{"content":[...]}} + {"type":"result","subtype":"success","uuid":"...","result":"..."} + + Args: + session: Сессия Qwen + task: Задача для выполнения + on_output: Callback для полного вывода (накапливается) + on_chunk: Callback для потоковой отправки текстовых chunks + on_event: Callback для полных JSON событий + """ session.state = QwenSessionState.BUSY session.output_buffer = "" - + try: - # Для неинтерактивного режима используем -p env = os.environ.copy() env["FORCE_COLOR"] = "0" - + cmd = [ self._qwen_command, - "-p", task, # Передаём задачу через флаг -p - "--output-format", "text", # Простой текстовый вывод - "--yolo", # Автоматическое подтверждение всех действий + "-p", task, + "--output-format", "stream-json", # Правильный streaming формат + "--yolo", # Авто-подтверждение ] - - logger.info(f"Выполнение задачи: {' '.join(cmd)}") - - process = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, + + logger.info(f"Выполнение задачи (stream-json): {' '.join(cmd)}") + + process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, cwd=self._working_dir, - env=env, - text=True, - bufsize=1 + env=env ) - - # Читаем вывод + output = "" - timeout = 300 # 5 минут на выполнение - - start_time = datetime.now() - + chunk_timeout = 300 # 5 минут на выполнение + last_chunk_time = datetime.now() + partial_content = "" # Для накопления partial messages + while True: - # Проверяем таймаут - if (datetime.now() - start_time).total_seconds() > timeout: + # Проверяем общий таймаут + if (datetime.now() - last_chunk_time).total_seconds() > chunk_timeout: output += "\n\n⚠️ Таймаут выполнения (5 минут)" process.terminate() break - + # Проверяем процесс - if process.poll() is not None: - # Процесс завершился - remaining = process.stdout.read() + if process.returncode is not None: + # Процесс завершился - читаем остаток + remaining = await process.stdout.read() if remaining: - output += remaining - on_output(remaining) + remaining_str = remaining.decode('utf-8', errors='replace') + output += remaining_str + # Парсим оставшиеся JSON события (не отправляем сырой вывод!) + await self._process_stream_lines( + remaining_str, on_output, on_chunk, on_event, session + ) break - - # Читаем вывод - line = process.stdout.readline() - if line: - output += line - session.output_buffer += line - on_output(line) - - # Небольшая пауза - await asyncio.sleep(0.1) - + + # Читаем строку из stdout + try: + line = await asyncio.wait_for(process.stdout.readline(), timeout=1.0) + if line: + line_str = line.decode('utf-8', errors='replace') + output += line_str + session.output_buffer += line_str + last_chunk_time = datetime.now() + + # Парсим JSON событие и извлекаем текст + await self._process_stream_lines( + line_str, on_output, on_chunk, on_event, session + ) + + except asyncio.TimeoutError: + if process.returncode is not None: + break + continue + + await asyncio.sleep(0.01) + session.state = QwenSessionState.READY session.last_activity = datetime.now() - + return output.strip() - + except Exception as e: session.state = QwenSessionState.ERROR logger.error(f"Ошибка выполнения задачи: {e}") return f"❌ Ошибка: {str(e)}" + + async def _process_stream_lines(self, + text: str, + on_output: Callable[[str], Any], + on_chunk: Callable[[str], Any] = None, + on_event: Callable[[QwenStreamEvent], Any] = None, + session: QwenSession = None) -> str: + """ + Распарсить stream-json строки и извлечь текстовый контент. + + Формат JSON событий: + - {"type":"system","subtype":"session_start","session_id":"..."} + - {"type":"assistant","message":{"content":[{"type":"text","text":"..."}]}} + - {"type":"result","subtype":"success","result":"...","duration_ms":1234} + + Возвращает только текстовый контент для отображения пользователю. + """ + extracted_text = "" + + for line in text.split('\n'): + line = line.strip() + if not line: + continue + + # Проверяем это JSON или обычный текст + if line.startswith('{'): + try: + event_data = json.loads(line) + event_type = event_data.get('type', 'unknown') + + # Создаём объект события + stream_event = QwenStreamEvent( + event_type=QwenEventType(event_type) if event_type in ['system', 'assistant', 'user', 'result', 'tool_use'] else None, + subtype=event_data.get('subtype'), + uuid=event_data.get('uuid'), + session_id=event_data.get('session_id'), + message=event_data.get('message'), + is_error=event_data.get('is_error', False), + data=event_data + ) + + # Обновляем session_id из события + if stream_event.session_id and session: + session.session_id = stream_event.session_id + + # Извлекаем текст из разных типов событий + if event_type == 'assistant': + message = event_data.get('message', {}) + content_list = message.get('content', []) + + # Логируем для отладки + logger.debug(f"Assistant event: content_type={type(content_list)}, content={content_list[:1] if isinstance(content_list, list) else content_list}") + + # Обрабатываем только если content - это список (не thinking) + if isinstance(content_list, list): + for content_item in content_list: + if isinstance(content_item, dict): + if content_item.get('type') == 'text': + text_content = content_item.get('text', '') + logger.debug(f"Text chunk: {text_content[:50]}...") + extracted_text += text_content + # Отправляем ТОЛЬКО в on_chunk для streaming + if on_chunk: + await on_chunk(text_content) + elif content_item.get('type') == 'tool_use': + # Инструмент используется - можно показать статус + tool_name = content_item.get('name', 'unknown') + # Добавляем переносы строк для разделения блоков + status_text = f"\n🔧 Использую инструмент: {tool_name}...\n" + extracted_text += status_text + if on_chunk: + await on_chunk(status_text) + # Если content.type == 'thinking' - не отправляем пользователю + + elif event_type == 'result': + result_text = event_data.get('result', '') + if result_text: + extracted_text += result_text + # НЕ отправляем result через on_chunk — он уже был отправлен через assistant chunks + logger.debug(f"Result event: {result_text[:50]}...") + + # Проверяем на ошибку + if event_data.get('is_error'): + error_text = event_data.get('error', 'Неизвестная ошибка') + logger.error(f"Ошибка Qwen: {error_text}") + + elif event_type == 'system': + subtype = event_data.get('subtype', '') + if subtype == 'session_start': + logger.info(f"Сессия Qwen запущена: {stream_event.session_id}") + elif subtype == 'init': + # Игнорируем init событие + pass + + # Вызываем callback события если есть + if on_event: + on_event(stream_event) + + except json.JSONDecodeError as e: + # Не JSON строка - возвращаем как текст + logger.debug(f"Не JSON строка: {line[:100]}...") + extracted_text += line + "\n" + if on_chunk: + await on_chunk(line + "\n") + else: + # Обычный текст (не JSON) - например, приветственное сообщение + extracted_text += line + "\n" + if on_chunk: + await on_chunk(line + "\n") + + return extracted_text def _parse_output(self, output: str) -> str: """