From fbf0edc60a9d9326070b7fab181ac3b740120b40 Mon Sep 17 00:00:00 2001 From: mirivlad Date: Fri, 27 Feb 2026 18:07:57 +0800 Subject: [PATCH] =?UTF-8?q?v0.7.2:=20=D0=A3=D0=BB=D1=83=D1=87=D1=88=D0=B5?= =?UTF-8?q?=D0=BD=D0=B8=D1=8F=20AI-=D0=BF=D1=80=D0=BE=D0=B2=D0=B0=D0=B9?= =?UTF-8?q?=D0=B4=D0=B5=D1=80=D0=BE=D0=B2,=20=D0=B8=D0=BD=D1=81=D1=82?= =?UTF-8?q?=D1=80=D1=83=D0=BC=D0=B5=D0=BD=D1=82=D1=8B=20=D0=B8=20=D0=BE?= =?UTF-8?q?=D0=B1=D1=80=D0=B0=D0=B1=D0=BE=D1=82=D1=87=D0=B8=D0=BA=D0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Qwen-Coder --- FILE_SYSTEM_TOOL.md | 140 +++++ bot.py | 291 +++++++++-- bot/ai_agent.py | 203 +++++++- bot/ai_provider_manager.py | 40 +- bot/base_ai_provider.py | 73 ++- bot/handlers/ai_presets.py | 264 ++++++++++ bot/handlers/callbacks.py | 32 ++ bot/keyboards/menus.py | 1 + bot/models/user_state.py | 17 +- bot/providers/gigachat_provider.py | 493 +++++++++++++----- bot/tools/__init__.py | 2 +- bot/tools/file_system_tool.py | 803 +++++++++++++++++++++++++++++ bot/tools/gigachat_tool.py | 273 +++++++++- bot/utils/cleaners.py | 3 +- bot/utils/formatters.py | 106 +++- system_prompt.md | 51 +- 16 files changed, 2553 insertions(+), 239 deletions(-) create mode 100644 FILE_SYSTEM_TOOL.md create mode 100644 bot/handlers/ai_presets.py create mode 100644 bot/tools/file_system_tool.py diff --git a/FILE_SYSTEM_TOOL.md b/FILE_SYSTEM_TOOL.md new file mode 100644 index 0000000..3841dbc --- /dev/null +++ b/FILE_SYSTEM_TOOL.md @@ -0,0 +1,140 @@ +# File System Tool - Документация + +## 📋 Описание + +Инструмент для работы с файловой системой Linux. Позволяет AI-агенту (Qwen Code или GigaChat) выполнять операции с файлами и директориями. + +## 🎯 Доступные операции + +| Операция | Описание | Параметры | +|----------|----------|-----------| +| `read` | Чтение файла | `path`, `limit` (макс. строк) | +| `write` | Запись в файл | `path`, `content`, `append` | +| `copy` | Копирование файла/директории | `source`, `destination` | +| `move` | Перемещение/переименование | `source`, `destination` | +| `delete` | Удаление файла/директории | `path`, `recursive` | +| `mkdir` | Создание директории | `path`, `parents` | +| `list` | Список файлов в директории | `path`, `show_hidden` | +| `info` | Информация о файле | `path` | +| `search` | Поиск файлов по паттерну | `path`, `pattern`, `max_results` | +| `shell` | Выполнение shell-команды | `command`, `timeout` | + +## 🔒 Безопасность + +Инструмент имеет систему проверки путей: + +### Разрешённые пути (можно читать/записывать): +- Домашняя директория пользователя (`/home/mirivlad`) +- `/tmp` +- `/var/tmp` + +### Запрещённые пути (только чтение с ограничениями): +- `/etc`, `/usr`, `/bin`, `/sbin` +- `/boot`, `/dev`, `/proc`, `/sys` +- Корень `/` (кроме разрешённых поддиректорий) + +## 📝 Примеры использования + +### Через AI-агента (автоматически) + +``` +Пользователь: "прочитай файл /home/mirivlad/test.txt" +AI-агент → file_system_tool(operation='read', path='/home/mirivlad/test.txt') +``` + +### Прямой вызов + +```python +from bot.tools.file_system_tool import FileSystemTool + +tool = FileSystemTool() + +# Чтение файла +result = await tool.execute(operation='read', path='/path/to/file.txt') + +# Запись файла +result = await tool.execute( + operation='write', + path='/path/to/file.txt', + content='Содержимое файла' +) + +# Копирование +result = await tool.execute( + operation='copy', + source='/source/file.txt', + destination='/dest/file.txt' +) + +# Список директории +result = await tool.execute( + operation='list', + path='/home/mirivlad' +) +``` + +## 🤖 Интеграция с AI-провайдерами + +### GigaChat + +GigaChat использует текстовый формат для вызова инструментов: + +```` +```tool +{"name": "file_system_tool", "arguments": {"operation": "read", "path": "/tmp/test.txt"}} +``` +```` + +### Qwen Code + +Qwen Code поддерживает нативные tool calls через stream-json. + +## 📊 Триггеры для активации + +AI-агент автоматически активирует `file_system_tool` при обнаружении триггеров: + +### Прямые триггеры: +- "прочитай файл", "покажи файл", "открой файл" +- "создай файл", "запиши в файл", "сохрани" +- "скопируй файл", "перемести файл", "удали файл" +- "создай директорию", "создай папку" +- "список файлов", "что в папке" +- "найди файл", "поиск файла" + +### Команды Unix: +- `cat `, `ls `, `mkdir `, `cp `, `mv `, `rm `, `touch ` + +## ⚠️ Ограничения + +1. **Безопасность**: Нельзя записывать/удалять в системных директориях +2. **Размер файлов**: При чтении ограничено 100 строками (настраивается через `limit`) +3. **Shell команды**: Разрешены только безопасные команды (`ls`, `cat`, `cp`, `mv`, `rm`, `mkdir`, `find`, `grep`, etc.) +4. **Таймаут**: Для shell команд таймаут 30 секунд по умолчанию + +## 🔄 История операций + +Инструмент сохраняет историю последних 100 операций для отладки: + +```python +tool._operation_history # Список последних операций +``` + +## 📁 Расположение + +``` +bot/tools/file_system_tool.py +``` + +## 🔧 Добавление в реестр + +Инструмент автоматически регистрируется при импорте: + +```python +from bot.tools import file_system_tool # Авто-регистрация +``` + +--- + +**Версия:** 1.0 +**Совместимость:** Telegram CLI Bot 0.7.1+ +**AI-провайдеры:** Qwen Code, GigaChat diff --git a/bot.py b/bot.py index e23f9d2..6ce9e34 100644 --- a/bot.py +++ b/bot.py @@ -137,6 +137,14 @@ async def handle_ai_task(update: Update, text: str): user_id = update.effective_user.id state = state_manager.get(user_id) + # === ПРОВЕРКА: AI-пресет === + ai_preset = state.ai_preset + + # Если ИИ отключен — пропускаем обработку + if ai_preset == "off": + logger.info(f"Пользователь {user_id}: ИИ отключен, пропускаем обработку") + return + # === ПРОВЕРКА: Нужна ли компактификация? === # Проверяем порог заполненности контекста if compactor.check_compaction_needed(): @@ -174,10 +182,10 @@ async def handle_ai_task(update: Update, text: str): # === ПРОВЕРКА: Решение AI агента об использовании инструментов === agent_decision = await ai_agent.decide(text, context={'user_id': user_id}) - + if agent_decision.should_use_tool: logger.info(f"AI агент решил использовать инструмент: {agent_decision.tool_name} (confidence={agent_decision.confidence})") - + # Выполняем инструмент tool_result = await ai_agent.execute_tool( agent_decision.tool_name, @@ -261,11 +269,12 @@ async def handle_ai_task(update: Update, text: str): output_buffer.append(chunk_text) # В result_buffer добавляем ТОЛЬКО если это не статус инструмента - # Статусы инструментов начинаются с "\n🔧" - if not chunk_text.strip().startswith("🔧"): + # Статусы инструментов содержат "🔧 Использую инструмент:" + is_tool_status = "🔧 Использую инструмент:" in chunk_text + if not is_tool_status: result_buffer.append(chunk_text) - logger.debug(f"output_buffer: {len(output_buffer)}, result_buffer: {len(result_buffer)}") + logger.debug(f"output_buffer: {len(output_buffer)}, result_buffer: {len(result_buffer)}, is_tool_status: {is_tool_status}") # Если сообщение ещё не создано - создаём if stream_message is None: @@ -284,12 +293,22 @@ async def handle_ai_task(update: Update, text: str): # Обновляем сообщение try: + # Экранируем специальные символы Markdown для безопасной отправки + from bot.utils.formatters import escape_markdown + escaped_output = escape_markdown(current_output) await stream_message.edit_text( - f"⏳ {current_status}\n\n{current_output}", + f"⏳ {current_status}\n\n{escaped_output}", parse_mode="Markdown" ) except Exception as e: logger.debug(f"Ошибка редактирования: {e}") + # Фоллбэк: отправляем без Markdown + try: + await stream_message.edit_text( + f"⏳ {current_status}\n\n{current_output}" + ) + except: + pass await asyncio.sleep(0.3) # Формируем контекст с историей + памятью + summary @@ -316,12 +335,48 @@ async def handle_ai_task(update: Update, text: str): MAX_CONTEXT_TOKENS = 200_000 context_percent = round((context_tokens / MAX_CONTEXT_TOKENS) * 100, 1) - # Получаем текущего AI-провайдера + # Получаем текущий AI-пресет + ai_preset = state.ai_preset + + # Определяем провайдера и модель на основе пресета + from bot.models.user_state import ( + AI_PRESET_OFF, + AI_PRESET_QWEN, + AI_PRESET_GIGA_AUTO, + AI_PRESET_GIGA_LITE, + AI_PRESET_GIGA_PRO, + ) + + if ai_preset == AI_PRESET_OFF: + # ИИ отключен - не должны были сюда попасть + logger.warning(f"Попытка обработки AI-запроса при отключенном ИИ (пресет={ai_preset})") + return + + if ai_preset == AI_PRESET_QWEN: + current_provider = "qwen" + provider_display = "Qwen Code" + elif ai_preset in [AI_PRESET_GIGA_AUTO, AI_PRESET_GIGA_LITE, AI_PRESET_GIGA_PRO]: + current_provider = "gigachat" + # Для GigaChat пресетов устанавливаем нужную модель + from bot.tools.gigachat_tool import GigaChatConfig + if ai_preset == AI_PRESET_GIGA_LITE: + # Принудительно Lite модель + GigaChatConfig.model = GigaChatConfig.model_lite + elif ai_preset == AI_PRESET_GIGA_PRO: + # Принудительно Pro модель + GigaChatConfig.model = GigaChatConfig.model_pro + # ai_preset == AI_PRESET_GIGA_AUTO использует авто-переключение в gigachat_tool.py + provider_display = f"GigaChat ({ai_preset})" + else: + # По умолчанию Qwen + current_provider = "qwen" + provider_display = "Qwen Code" + + logger.info(f"AI-пресет: {ai_preset}, провайдер: {current_provider}") + + # Получаем менеджера провайдеров from bot.ai_provider_manager import get_ai_provider_manager provider_manager = get_ai_provider_manager() - current_provider = provider_manager.get_current_provider(state) - - logger.info(f"Обработка AI-запроса через провайдер: {current_provider}") # Собираем полный промпт с системным промптом system_prompt = qwen_manager.load_system_prompt() @@ -385,24 +440,52 @@ async def handle_ai_task(update: Update, text: str): except Exception as e: logger.debug(f"Ошибка обновления статуса для GigaChat: {e}") + # Формируем контекст для GigaChat из памяти и истории + # Это обеспечивает ту же функциональность что и для Qwen + context_messages = [] + + # Добавляем summary если есть + if summary: + context_messages.append({ + "role": "system", + "content": f"=== SUMMARY ДИАЛОГА ===\n{summary}" + }) + + # Добавляем контекст памяти + if memory_context: + context_messages.append({ + "role": "system", + "content": f"=== КОНТЕКСТ ПАМЯТИ ===\n{memory_context}" + }) + + # Добавляем историю диалога + if history_context: + for line in history_context.split("\n"): + if line.startswith("user:"): + context_messages.append({"role": "user", "content": line[5:].strip()}) + elif line.startswith("assistant:"): + context_messages.append({"role": "assistant", "content": line[10:].strip()}) + result = await provider_manager.execute_request( provider_id=current_provider, user_id=user_id, - prompt=full_task, - system_prompt=system_prompt, + prompt=text, # Только запрос пользователя + system_prompt=system_prompt, # System prompt отдельно + context=context_messages, # Контекст из памяти и истории on_chunk=None # GigaChat не поддерживает потоковый вывод ) if result.get("success"): full_output = result.get("content", "") + # Получаем информацию о модели + model_name = result.get("metadata", {}).get("model") + if model_name: + provider_name = f"GigaChat ({model_name})" + else: + provider_name = "GigaChat" else: full_output = f"❌ **Ошибка {provider_manager.get_provider_info(current_provider).name}:**\n{result.get('error', 'Неизвестная ошибка')}" - - provider_name = "GigaChat" - - else: - full_output = f"❌ Неизвестный провайдер: {current_provider}" - provider_name = "Unknown" + provider_name = "GigaChat" # Добавляем ответ ИИ в историю и память if full_output and full_output != "⚠️ Не удалось получить ответ ИИ": @@ -1287,12 +1370,61 @@ async def _execute_composite_command_ssh(update: Update, command: str, server: S async def _execute_local_command_message(update: Update, command: str, working_dir: str): - """Выполнение локальной команды из сообщения через pexpect.""" + """Выполнение локальной команды из сообщения.""" user_id = update.effective_user.id + + # Для простых команд используем subprocess (быстро и надёжно) + # Для интерактивных команд (sudo, ssh и т.д.) нужен pexpect + logger.info(f"Выполнение локальной команды: {command} в {working_dir}") + + # Проверяем, нужна ли интерактивность + needs_interactive = any(cmd in command for cmd in ['sudo', 'ssh', 'su ', 'passwd', 'login']) + if needs_interactive: + logger.info("Команда требует интерактивного ввода, используем pexpect") + await _execute_local_command_interactive(update, command, working_dir) + else: + logger.info("Выполняю команду через subprocess") + await _execute_local_command_subprocess(update, command, working_dir) + + +async def _execute_local_command_subprocess(update: Update, command: str, working_dir: str): + """Выполнение локальной команды через subprocess (без интерактивности).""" + try: + logger.info(f"Создаю subprocess: {command}") + process = await asyncio.create_subprocess_shell( + command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd=working_dir + ) + logger.info(f"Process PID: {process.pid}, жду выполнения...") + + stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=30) + logger.info(f"Process завершен с кодом: {process.returncode}") + + output = stdout.decode("utf-8", errors="replace").strip() + error = stderr.decode("utf-8", errors="replace").strip() + + logger.info(f"Output length: {len(output)}, Error length: {len(error)}") + + await _show_result_message(update, command, output, error, process.returncode) + + except asyncio.TimeoutError: + logger.error("Таймаут выполнения команды") + await update.message.reply_text("❌ *Таймаут*\n\nКоманда выполнялась дольше 30 секунд.", parse_mode="Markdown") + except Exception as e: + logger.error(f"Ошибка: {e}") + await update.message.reply_text(f"❌ *Ошибка:*\n```\n{str(e)}\n```", parse_mode="Markdown") + + +async def _execute_local_command_interactive(update: Update, command: str, working_dir: str): + """Выполнение локальной команды через pexpect (с поддержкой интерактивности).""" + user_id = update.effective_user.id + try: logger.info(f"Запуск команды через pexpect: {command}") - + # Создаём интерактивный процесс child = pexpect.spawn( '/bin/bash', @@ -1303,20 +1435,20 @@ async def _execute_local_command_message(update: Update, command: str, working_d echo=False, timeout=30 ) - - # Создаём сессию (используем child вместо master_fd) + + # Создаём сессию session = local_session_manager.create_session( user_id=user_id, command=command, master_fd=child.child_fd, pid=child.pid ) - session.context = {'child': child} # Сохраняем child объект - + session.context = {'child': child} + # Читаем начальный вывод logger.info("Чтение вывода...") output = "" - + try: # Пробуем прочитать с таймаутом while True: @@ -1325,24 +1457,24 @@ async def _execute_local_command_message(update: Update, command: str, working_d break output += line logger.debug(f"Прочитано: {len(line)} символов") - + # Проверяем запрос ввода if detect_input_type(output): break - + except pexpect.TIMEOUT: pass except pexpect.EOF: pass - + logger.info(f"Прочитано: {len(output)} символов") session.output_buffer = output session.last_activity = datetime.now() - + # Проверяем тип ввода input_type = detect_input_type(output) logger.info(f"Тип ввода: {input_type}") - + if input_type == "password": session.waiting_for_input = True session.input_type = "password" @@ -1485,25 +1617,92 @@ async def _execute_ssh_command_message(update: Update, command: str, server: Ser async def _show_result_message(update: Update, command: str, output: str, error: str, returncode: int): """Показ результата выполнения команды.""" + logger.info(f"_show_result_message: output_len={len(output)}, error_len={len(error)}") + # Очистка ANSI-кодов и нормализация - output = normalize_output(clean_ansi_codes(output)) if output else "" + if output: + output = clean_ansi_codes(output) + logger.info(f"После clean_ansi_codes: output_len={len(output)}") + output = normalize_output(output) + logger.info(f"После normalize_output: output_len={len(output)}") + else: + output = "" + error = clean_ansi_codes(error) if error else "" result = f"✅ *Результат:*\n\n" if output: - # Форматируем длинный вывод: первые 5 и последние 10 строк - output = format_long_output(output, max_lines=15, head_lines=5, tail_lines=10) + # Показываем ВЕСЬ вывод, разбивая на сообщения если нужно + # Экранируем backticks в output чтобы они не ломали блоки кода + output = output.replace("```", "\\`\\`\\`").replace("`", "\\`") result += f"```\n{output}\n```\n" + logger.info(f"Добавлен output в результат, длина result={len(result)}") + else: + logger.warning("output пустой после обработки!") if error: + # Экранируем backticks в error + error = error.replace("```", "\\`\\`\\`").replace("`", "\\`") result += f"*Ошибки:*\n```\n{error}\n```\n" result += f"\n*Код возврата:* `{returncode}`" - # Экранируем backticks и отправляем с разбивкой - result = escape_markdown(result) - await send_long_message(update, result, parse_mode="Markdown") + # Экранируем специальные символы Markdown ТОЛЬКО вне блоков кода + # Блоки кода (```) уже защищены — их содержимое не трогаем + # Экранируем: * _ ( ) [ ] но не ` и не содержимое ``` + result = smart_escape_markdown(result) + logger.info(f"Отправляю сообщение, длина={len(result)}") + await send_long_message(update, result, parse_mode="Markdown", pause_every=3) + logger.info("Сообщение отправлено") + + +def smart_escape_markdown(text: str) -> str: + """ + Умное экранирование Markdown — только вне блоков кода. + Не трогает уже существующую разметку (*жирный*, _курсив_, `код`). + """ + # Разбиваем на части: внутри ``` и снаружи + parts = text.split("```") + escaped_parts = [] + + for i, part in enumerate(parts): + if i % 2 == 0: + # Вне блоков кода — экранируем ТОЛЬКО одиночные спецсимволы + # Не трогаем: *текст*, _текст_, `код`, [текст](url) + escaped = escape_unescaped_special_chars(part) + escaped_parts.append(escaped) + else: + # Внутри блоков кода — не трогаем + escaped_parts.append(part) + + return "```".join(escaped_parts) + + +def escape_unescaped_special_chars(text: str) -> str: + """ + Экранирует спецсимволы Markdown которые ещё не экранированы. + Не трогает уже размеченный текст. + """ + # Сначала экранируем обратные слэши + text = text.replace('\\', '\\\\') + + # Экранируем * _ [ ] ( ) которые не являются частью разметки + # Простая эвристика: экранируем если символ не окружён буквами/цифрами + import re + + # Экранируем * если это не *текст* + # text = re.sub(r'(?= 0.8, score + def _should_use_file_system(self, message: str) -> tuple[bool, float]: + """Проверить, нужна ли операция с файловой системой.""" + message_lower = message.lower() + score = 0.0 + + # Прямые триггеры + for trigger in self.FILE_SYSTEM_TRIGGERS: + if trigger in message_lower: + return True, 0.9 + + # Операции с файлами + file_operations = ['прочитай', 'покажи', 'создай', 'запиши', 'скопируй', 'перемести', 'удали', 'открой'] + file_objects = ['файл', 'директорию', 'папку', 'документ', 'текст', 'содержимое'] + + has_op = any(op in message_lower for op in file_operations) + has_obj = any(obj in message_lower for obj in file_objects) + + if has_op and has_obj: + score = max(score, 0.75) + + # Упоминания конкретных команд + commands = ['cat', 'ls', 'mkdir', 'cp', 'mv', 'rm', 'touch', 'pwd'] + for cmd in commands: + if f'{cmd} ' in message_lower or message_lower.endswith(cmd): + score = max(score, 0.85) + + return score >= 0.75, score + async def decide(self, message: str, context: Optional[Dict] = None) -> AgentDecision: """ Принять решение об использовании инструмента. @@ -190,10 +232,21 @@ class AIAgent: """ user_id = context.get('user_id') if context else None - # Приоритет: SSH > Cron > Поиск > RSS + # Приоритет: File System > SSH > Cron > Поиск > RSS # Проверяем в порядке приоритета - # 1. Проверка на SSH-команды (системные задачи) + # 1. Проверка на операции с файловой системой (ВЫСОКИЙ ПРИОРИТЕТ) + should_fs, fs_conf = self._should_use_file_system(message) + if should_fs and fs_conf > 0.75: + return AgentDecision( + should_use_tool=True, + tool_name='file_system_tool', + tool_args=self._extract_file_system_args(message), + confidence=fs_conf, + reasoning='Пользователю нужно выполнить операцию с файлами' + ) + + # 2. Проверка на SSH-команды (системные задачи) should_ssh, ssh_conf = self._should_use_ssh(message) if should_ssh and ssh_conf > 0.75: return AgentDecision( @@ -204,7 +257,7 @@ class AIAgent: reasoning='Пользователю нужно выполнить команду на сервере' ) - # 2. Проверка на Cron-задачи (планирование) + # 3. Проверка на Cron-задачи (планирование) should_cron, cron_conf = self._should_use_cron(message) if should_cron and cron_conf > 0.75: return AgentDecision( @@ -215,7 +268,7 @@ class AIAgent: reasoning='Пользователь хочет создать или управлять задачей' ) - # 3. Проверка на поиск + # 4. Проверка на поиск should_search, search_conf = self._should_search(message) if should_search and search_conf > 0.7: query = self._extract_search_query(message) @@ -227,7 +280,7 @@ class AIAgent: reasoning='Пользователю нужна информация из интернета' ) - # 4. Проверка на RSS — только явные запросы + # 5. Проверка на RSS — только явные запросы should_rss, rss_conf = self._should_read_rss(message) if should_rss: # Порог уже проверен в _should_read_rss (0.95) return AgentDecision( @@ -279,6 +332,146 @@ class AIAgent: # Возвращаем оригинальное сообщение как команду return message + def _extract_file_system_args(self, message: str) -> Dict[str, Any]: + """ + Извлечь аргументы для file_system_tool из сообщения. + + Возвращает dict с operation и другими параметрами. + """ + import re + message_lower = message.lower() + + # Определяем операцию по триггерам + operation_map = { + 'прочитай файл': 'read', + 'покажи файл': 'read', + 'открой файл': 'read', + 'посмотри файл': 'read', + 'посмотри содержимое': 'read', + 'содержимое файла': 'read', + 'cat ': 'read', + + 'создай файл': 'write', + 'запиши в файл': 'write', + 'сохрани в файл': 'write', + 'сохрани текст': 'write', + 'запиши текст': 'write', + 'touch ': 'write', + + 'скопируй файл': 'copy', + 'скопируй': 'copy', + 'cp ': 'copy', + + 'перемести файл': 'move', + 'перемести': 'move', + 'mv ': 'move', + 'переименуй файл': 'move', # Переименование = перемещение + + 'удали файл': 'delete', + 'удали директорию': 'delete', + 'удали папку': 'delete', + 'rm ': 'delete', + + 'создай директорию': 'mkdir', + 'создай папку': 'mkdir', + 'mkdir ': 'mkdir', + + 'покажи директорию': 'list', + 'список файлов': 'list', + 'что в папке': 'list', + 'что в директории': 'list', + 'покажи файлы': 'list', + 'ls ': 'list', + + 'найди файл': 'search', + 'поиск файла': 'search', + } + + # Определяем операцию + operation = 'shell' # по умолчанию + for trigger, op in operation_map.items(): + if trigger in message_lower: + operation = op + break + + # Извлекаем путь (после команды) + path = None + source = None + destination = None + content = None + + # Паттерн для извлечения пути после команды + for cmd in ['cat', 'ls', 'mkdir', 'rm', 'touch']: + match = re.search(rf'{cmd}\s+([^\s]+)', message_lower) + if match: + path = match.group(1).strip() + break + + # Для copy/move ищем два пути + if operation in ('copy', 'move'): + # Ищем паттерн "X в Y" или "X Y" + match = re.search(r'([^\s]+)\s+(?:в|into|to)\s+([^\s]+)', message_lower) + if match: + source = match.group(1).strip() + destination = match.group(2).strip() + else: + # Просто два слова подряд + parts = message.split() + for i, part in enumerate(parts): + if part.lower() in ['cp', 'mv', 'copy', 'move', 'скопируй', 'перемести']: + if i + 2 < len(parts): + source = parts[i + 1].strip() + destination = parts[i + 2].strip() + break + + # Для write пытаемся извлечь содержимое + if operation == 'write': + # Ищем текст после "сохрани" или "запиши" + match = re.search(r'(?:сохрани|запиши)\s*(?:в файл|текст)?\s*[:\-]?\s*(.+)', message, re.IGNORECASE) + if match: + content = match.group(1).strip() + # Если есть кавычки - извлекаем содержимое + quoted = re.search(r'["\']([^"\']+)["\']', message) + if quoted: + content = quoted.group(1) + + # Для search ищем паттерн + pattern = '*' + if operation == 'search': + match = re.search(r'pattern\s*[=:]\s*([^\s]+)', message_lower) + if match: + pattern = match.group(1).strip() + # Или ищем *.extension + glob_match = re.search(r'\*\.[^\s]+', message_lower) + if glob_match: + pattern = glob_match.group(0).strip() + + # Формируем аргументы + args = {'operation': operation} + + if path: + args['path'] = path + if source: + args['source'] = source + if destination: + args['destination'] = destination + if content: + args['content'] = content + if pattern and operation == 'search': + args['pattern'] = pattern + + # Если путь не найден, пробуем извлечь общее слово после операции + if not path and not source: + words = message.split() + for i, word in enumerate(words): + if word.lower() in ['cat', 'ls', 'mkdir', 'rm', 'touch', 'read', 'write', 'delete', 'list']: + if i + 1 < len(words): + args['path'] = words[i + 1].strip() + break + + logger.info(f"Извлечены аргументы file_system: {args}") + return args + async def execute_tool(self, tool_name: str, **kwargs) -> ToolResult: """Выполнить инструмент и сохранить историю.""" logger.info(f"🤖 AI-агент выполняет инструмент: {tool_name} с аргументами: {kwargs}") diff --git a/bot/ai_provider_manager.py b/bot/ai_provider_manager.py index 2031737..7c6ada7 100644 --- a/bot/ai_provider_manager.py +++ b/bot/ai_provider_manager.py @@ -43,16 +43,15 @@ class AIProviderManager: через активного провайдера с поддержкой инструментов. """ - def __init__(self, qwen_manager=None, gigachat_provider=None): + def __init__(self, qwen_manager=None): self._qwen_manager = qwen_manager - self._gigachat_provider = gigachat_provider self._provider_status: Dict[str, bool] = {} self._providers: Dict[str, BaseAIProvider] = {} self._tools_registry: Dict[str, Any] = {} # Инициализируем провайдеров self._init_providers() - + # Проверяем доступность провайдеров при инициализации self._check_provider_status() @@ -63,12 +62,11 @@ class AIProviderManager: from bot.providers.qwen_provider import QwenCodeProvider self._providers[AIProvider.QWEN.value] = QwenCodeProvider(self._qwen_manager) logger.info("Qwen Code Provider инициализирован") - - # GigaChat Provider - if self._gigachat_provider: - from bot.providers.gigachat_provider import GigaChatProvider - self._providers[AIProvider.GIGACHAT.value] = GigaChatProvider(self._gigachat_provider) - logger.info("GigaChat Provider инициализирован") + + # GigaChat Provider - создаём новый экземпляр напрямую + from bot.providers.gigachat_provider import GigaChatProvider + self._providers[AIProvider.GIGACHAT.value] = GigaChatProvider() + logger.info("GigaChat Provider инициализирован") def set_tools_registry(self, tools_registry: Dict[str, Any]): """Установить реестр инструментов для всех провайдеров.""" @@ -82,10 +80,11 @@ class AIProviderManager: """Проверка доступности провайдеров.""" # Проверяем Qwen self._provider_status[AIProvider.QWEN.value] = True # Qwen всегда доступен - + # Проверяем GigaChat - if self._gigachat_provider: - self._provider_status[AIProvider.GIGACHAT.value] = self._gigachat_provider.is_available() + gigachat_provider = self._providers.get(AIProvider.GIGACHAT.value) + if gigachat_provider: + self._provider_status[AIProvider.GIGACHAT.value] = gigachat_provider.is_available() else: self._provider_status[AIProvider.GIGACHAT.value] = False @@ -211,6 +210,11 @@ class AIProviderManager: ) if response.success: + # Получаем информацию о модели из metadata ответа + model_name = None + if response.message and response.message.metadata: + model_name = response.message.metadata.get("model") + return { "success": True, "content": response.message.content if response.message else "", @@ -218,7 +222,8 @@ class AIProviderManager: "metadata": { "provider_name": response.provider_name, "usage": response.usage, - "tool_calls": len(response.message.tool_calls) if response.message and response.message.tool_calls else 0 + "tool_calls": len(response.message.tool_calls) if response.message and response.message.tool_calls else 0, + "model": model_name # Добавляем модель } } else: @@ -241,10 +246,15 @@ class AIProviderManager: ai_provider_manager: Optional[AIProviderManager] = None -def init_ai_provider_manager(qwen_manager, gigachat_provider) -> AIProviderManager: +def init_ai_provider_manager(qwen_manager, tools_registry=None) -> AIProviderManager: """Инициализировать глобальный AIProviderManager.""" global ai_provider_manager - ai_provider_manager = AIProviderManager(qwen_manager, gigachat_provider) + ai_provider_manager = AIProviderManager(qwen_manager) + + # Устанавливаем реестр инструментов если предоставлен + if tools_registry: + ai_provider_manager.set_tools_registry(tools_registry) + logger.info(f"AIProviderManager инициализирован. Доступные провайдеры: {ai_provider_manager.get_available_providers()}") return ai_provider_manager diff --git a/bot/base_ai_provider.py b/bot/base_ai_provider.py index ea29902..1b5b961 100644 --- a/bot/base_ai_provider.py +++ b/bot/base_ai_provider.py @@ -6,6 +6,7 @@ Base AI Provider Protocol - универсальный интерфейс для для работы с инструментами (tools). """ +import json from abc import ABC, abstractmethod from typing import Optional, Dict, Any, Callable, List, AsyncGenerator from dataclasses import dataclass, field @@ -138,13 +139,34 @@ class BaseAIProvider(ABC): Провайдеры могут переопределить для кастомизации. Args: - tools_registry: Словарь инструментов {name: tool_instance} + tools_registry: Словарь инструментов {name: tool_instance} или объект реестра Returns: Список схем инструментов """ schema = [] - for name, tool in tools_registry.items(): + + # Обрабатываем разные типы tools_registry + if tools_registry is None: + return schema + + # Если это ToolsRegistry с методом get_all() + if hasattr(tools_registry, 'get_all') and callable(getattr(tools_registry, 'get_all')): + items = tools_registry.get_all().items() + # Если это dict - используем .items() + elif isinstance(tools_registry, dict): + items = tools_registry.items() + # Если это объект с атрибутом tools + elif hasattr(tools_registry, 'tools'): + items = tools_registry.tools.items() if isinstance(tools_registry.tools, dict) else [] + # Если это объект поддерживающий .items() + elif hasattr(tools_registry, 'items'): + items = tools_registry.items() + else: + logger.warning(f"Неизвестный тип tools_registry: {type(tools_registry)}") + return schema + + for name, tool in items: if hasattr(tool, 'get_schema'): schema.append(tool.get_schema()) elif hasattr(tool, 'description'): @@ -195,20 +217,29 @@ class BaseAIProvider(ABC): **kwargs ) - messages = [] + # Формируем базовый контекст — БЕЗ system message + # System message будет передаваться отдельным параметром + base_messages = [] if context: - messages.extend(context) - - messages.append({"role": "user", "content": prompt}) + # Фильтруем system messages из context — они будут переданы через system_prompt + for msg in context: + if msg.get("role") != "system": + base_messages.append(msg) + + base_messages.append({"role": "user", "content": prompt}) tools_schema = self.get_tools_schema(tools_registry) if self.supports_tools else None + # Копируем сообщения для каждой итерации + messages = base_messages.copy() + for iteration in range(max_iterations): # Отправляем запрос провайдеру + # system_prompt передаётся всегда — провайдер сам решит как его использовать response = await self.chat( prompt=None, # Уже в messages system_prompt=system_prompt, - context=messages if iteration == 0 else None, + context=messages, tools=tools_schema, on_chunk=on_chunk, **kwargs @@ -232,8 +263,15 @@ class BaseAIProvider(ABC): # Выполняем инструменты tool_results = [] for tool_call in message.tool_calls: - if tool_call.tool_name in tools_registry: - tool = tools_registry[tool_call.tool_name] + # Проверяем наличие инструмента через метод .get() для поддержки ToolsRegistry + if hasattr(tools_registry, 'get'): + tool = tools_registry.get(tool_call.tool_name) + elif isinstance(tools_registry, dict): + tool = tools_registry.get(tool_call.tool_name) + else: + tool = None + + if tool is not None: try: if hasattr(tool, 'execute'): result = await tool.execute( @@ -244,18 +282,25 @@ class BaseAIProvider(ABC): result = await tool(**tool_call.tool_args) else: result = f"Инструмент {tool_call.tool_name} не имеет метода execute" - + tool_call.result = result tool_call.status = ToolCallStatus.SUCCESS except Exception as e: tool_call.error = str(e) tool_call.status = ToolCallStatus.ERROR result = f"Ошибка: {e}" - + + # Преобразуем результат в JSON-сериализуемый формат + # ToolResult имеет метод to_dict(), строки оставляем как есть + if hasattr(result, 'to_dict'): + result_serializable = result.to_dict() + else: + result_serializable = result + tool_results.append({ "tool": tool_call.tool_name, "args": tool_call.tool_args, - "result": result, + "result": result_serializable, "status": tool_call.status.value }) else: @@ -280,9 +325,11 @@ class BaseAIProvider(ABC): ] }) + # GigaChat требует валидный JSON в tool messages, а не Python repr строку + # Используем json.dumps для корректного форматирования messages.append({ "role": "tool", - "content": str(tool_results) + "content": json.dumps(tool_results, ensure_ascii=False) }) # Обновляем системный промпт для следующей итерации diff --git a/bot/handlers/ai_presets.py b/bot/handlers/ai_presets.py new file mode 100644 index 0000000..5bb54c5 --- /dev/null +++ b/bot/handlers/ai_presets.py @@ -0,0 +1,264 @@ +#!/usr/bin/env python3 +""" +Обработчики команд для переключения AI-пресетов. + +Доступные пресеты: +- off: ИИ отключен, режим CLI команд +- qwen: Qwen Code (бесплатно, локально) +- giga_auto: GigaChat авто-переключение (Lite/Pro) +- giga_lite: GigaChat Lite (дешевле) +- giga_pro: GigaChat Pro (максимальное качество) +""" + +import logging +from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup +from telegram.ext import CommandHandler, CallbackQueryHandler + +from bot.models.user_state import ( + AI_PRESET_OFF, + AI_PRESET_QWEN, + AI_PRESET_GIGA_AUTO, + AI_PRESET_GIGA_LITE, + AI_PRESET_GIGA_PRO, +) +from bot.config import state_manager + +logger = logging.getLogger(__name__) + +# Описание пресетов +PRESET_DESCRIPTIONS = { + AI_PRESET_OFF: { + "name": "❌ ИИ Отключен", + "description": "Режим CLI команд. Бот выполняет команды напрямую.", + "icon": "⌨️" + }, + AI_PRESET_QWEN: { + "name": "🤖 Qwen Code", + "description": "Бесплатно, локально. Лучший для кода и работы с файлами.", + "icon": "💻" + }, + AI_PRESET_GIGA_AUTO: { + "name": "🔄 GigaChat Авто", + "description": "Умное переключение Lite/Pro. Простые → Lite, сложные → Pro.", + "icon": "🧠" + }, + AI_PRESET_GIGA_LITE: { + "name": "⚡ GigaChat Lite", + "description": "Быстро и дёшево. Для простых вопросов и чата.", + "icon": "🚀" + }, + AI_PRESET_GIGA_PRO: { + "name": "🔥 GigaChat Pro", + "description": "Максимальное качество. Для сложных творческих задач.", + "icon": "👑" + }, +} + + +def get_preset_display_name(preset: str) -> str: + """Получить отображаемое имя пресета.""" + desc = PRESET_DESCRIPTIONS.get(preset, {}) + return f"{desc.get('icon', '❓')} {desc.get('name', preset)}" + + +async def ai_presets_command(update: Update, context): + """Показать меню выбора AI-пресета.""" + user_id = update.effective_user.id + state = state_manager.get(user_id) + current_preset = state.ai_preset + + # Формируем меню + keyboard = [ + [ + InlineKeyboardButton( + f"{'✅' if current_preset == AI_PRESET_OFF else '⬜'} {PRESET_DESCRIPTIONS[AI_PRESET_OFF]['icon']} ИИ Отключен", + callback_data=f"ai_preset_{AI_PRESET_OFF}" + ) + ], + [ + InlineKeyboardButton( + f"{'✅' if current_preset == AI_PRESET_QWEN else '⬜'} {PRESET_DESCRIPTIONS[AI_PRESET_QWEN]['icon']} Qwen Code", + callback_data=f"ai_preset_{AI_PRESET_QWEN}" + ) + ], + [ + InlineKeyboardButton( + f"{'✅' if current_preset == AI_PRESET_GIGA_AUTO else '⬜'} {PRESET_DESCRIPTIONS[AI_PRESET_GIGA_AUTO]['icon']} GigaChat Авто", + callback_data=f"ai_preset_{AI_PRESET_GIGA_AUTO}" + ) + ], + [ + InlineKeyboardButton( + f"{'✅' if current_preset == AI_PRESET_GIGA_LITE else '⬜'} {PRESET_DESCRIPTIONS[AI_PRESET_GIGA_LITE]['icon']} GigaChat Lite", + callback_data=f"ai_preset_{AI_PRESET_GIGA_LITE}" + ), + InlineKeyboardButton( + f"{'✅' if current_preset == AI_PRESET_GIGA_PRO else '⬜'} {PRESET_DESCRIPTIONS[AI_PRESET_GIGA_PRO]['icon']} GigaChat Pro", + callback_data=f"ai_preset_{AI_PRESET_GIGA_PRO}" + ) + ], + ] + + reply_markup = InlineKeyboardMarkup(keyboard) + + current_name = get_preset_display_name(current_preset) + + output = f"🎛️ **Панель управления AI**\n\n" + output += f"**Текущий пресет:** {current_name}\n\n" + output += "Выберите AI-провайдер:\n\n" + output += "ℹ️ **Описание пресетов:**\n" + output += f"• {PRESET_DESCRIPTIONS[AI_PRESET_OFF]['icon']} **ИИ Отключен** — {PRESET_DESCRIPTIONS[AI_PRESET_OFF]['description']}\n" + output += f"• {PRESET_DESCRIPTIONS[AI_PRESET_QWEN]['icon']} **Qwen Code** — {PRESET_DESCRIPTIONS[AI_PRESET_QWEN]['description']}\n" + output += f"• {PRESET_DESCRIPTIONS[AI_PRESET_GIGA_AUTO]['icon']} **GigaChat Авто** — {PRESET_DESCRIPTIONS[AI_PRESET_GIGA_AUTO]['description']}\n" + output += f"• {PRESET_DESCRIPTIONS[AI_PRESET_GIGA_LITE]['icon']} **GigaChat Lite** — {PRESET_DESCRIPTIONS[AI_PRESET_GIGA_LITE]['description']}\n" + output += f"• {PRESET_DESCRIPTIONS[AI_PRESET_GIGA_PRO]['icon']} **GigaChat Pro** — {PRESET_DESCRIPTIONS[AI_PRESET_GIGA_PRO]['description']}" + + await update.message.reply_text(output, parse_mode="Markdown", reply_markup=reply_markup) + + +async def ai_preset_callback(update: Update, context): + """Обработка выбора пресета из инлайн-меню.""" + user_id = update.effective_user.id + query = update.callback_query + await query.answer() + + # Извлекаем название пресета из callback_data + preset = query.data.replace("ai_preset_", "") + + if preset not in PRESET_DESCRIPTIONS: + await query.edit_message_text("❌ Неверный пресет") + return + + state = state_manager.get(user_id) + old_preset = state.ai_preset + state.ai_preset = preset + + # Обновляем ai_chat_mode и current_ai_provider для совместимости + if preset == AI_PRESET_OFF: + state.ai_chat_mode = False + state.current_ai_provider = "none" + else: + state.ai_chat_mode = True + # Для совместимости с существующим кодом + if preset == AI_PRESET_QWEN: + state.current_ai_provider = "qwen" + else: # Любой GigaChat + state.current_ai_provider = "gigachat" + + preset_name = get_preset_display_name(preset) + + output = f"✅ **Переключено на:** {preset_name}\n\n" + output += f"{PRESET_DESCRIPTIONS[preset]['description']}" + + # Обновляем инлайн-меню + keyboard = [ + [ + InlineKeyboardButton( + f"{'✅' if preset == AI_PRESET_OFF else '⬜'} {PRESET_DESCRIPTIONS[AI_PRESET_OFF]['icon']} ИИ Отключен", + callback_data=f"ai_preset_{AI_PRESET_OFF}" + ) + ], + [ + InlineKeyboardButton( + f"{'✅' if preset == AI_PRESET_QWEN else '⬜'} {PRESET_DESCRIPTIONS[AI_PRESET_QWEN]['icon']} Qwen Code", + callback_data=f"ai_preset_{AI_PRESET_QWEN}" + ) + ], + [ + InlineKeyboardButton( + f"{'✅' if preset == AI_PRESET_GIGA_AUTO else '⬜'} {PRESET_DESCRIPTIONS[AI_PRESET_GIGA_AUTO]['icon']} GigaChat Авто", + callback_data=f"ai_preset_{AI_PRESET_GIGA_AUTO}" + ) + ], + [ + InlineKeyboardButton( + f"{'✅' if preset == AI_PRESET_GIGA_LITE else '⬜'} {PRESET_DESCRIPTIONS[AI_PRESET_GIGA_LITE]['icon']} GigaChat Lite", + callback_data=f"ai_preset_{AI_PRESET_GIGA_LITE}" + ), + InlineKeyboardButton( + f"{'✅' if preset == AI_PRESET_GIGA_PRO else '⬜'} {PRESET_DESCRIPTIONS[AI_PRESET_GIGA_PRO]['icon']} GigaChat Pro", + callback_data=f"ai_preset_{AI_PRESET_GIGA_PRO}" + ) + ], + ] + + reply_markup = InlineKeyboardMarkup(keyboard) + await query.edit_message_text(output, parse_mode="Markdown", reply_markup=reply_markup) + + logger.info(f"Пользователь {user_id} переключил AI-пресет: {old_preset} → {preset}") + + +# Быстрые команды для переключения одним сообщением +async def ai_off_command(update: Update, context): + """Быстрое переключение на ИИ отключен.""" + await switch_preset(update, AI_PRESET_OFF) + + +async def ai_qwen_command(update: Update, context): + """Быстрое переключение на Qwen Code.""" + await switch_preset(update, AI_PRESET_QWEN) + + +async def ai_giga_auto_command(update: Update, context): + """Быстрое переключение на GigaChat Авто.""" + await switch_preset(update, AI_PRESET_GIGA_AUTO) + + +async def ai_giga_lite_command(update: Update, context): + """Быстрое переключение на GigaChat Lite.""" + await switch_preset(update, AI_PRESET_GIGA_LITE) + + +async def ai_giga_pro_command(update: Update, context): + """Быстрое переключение на GigaChat Pro.""" + await switch_preset(update, AI_PRESET_GIGA_PRO) + + +async def switch_preset(update: Update, preset: str): + """Переключить пресет и показать уведомление.""" + user_id = update.effective_user.id + state = state_manager.get(user_id) + old_preset = state.ai_preset + state.ai_preset = preset + + # Обновляем совместимость + if preset == AI_PRESET_OFF: + state.ai_chat_mode = False + state.current_ai_provider = "none" + else: + state.ai_chat_mode = True + if preset == AI_PRESET_QWEN: + state.current_ai_provider = "qwen" + else: + state.current_ai_provider = "gigachat" + + preset_name = get_preset_display_name(preset) + + output = f"✅ **AI-пресет переключен**\n\n" + output += f"**Текущий:** {preset_name}\n" + output += f"_{PRESET_DESCRIPTIONS[preset]['description']}_\n\n" + + if old_preset != preset: + output += f"~{get_preset_display_name(old_preset)}~ → ✅ {preset_name}" + + await update.message.reply_text(output, parse_mode="Markdown") + + logger.info(f"Пользователь {user_id} переключил AI-пресет: {old_preset} → {preset}") + + +def register_ai_preset_handlers(dispatcher): + """Зарегистрировать обработчики AI-пресетов.""" + # Основное меню + dispatcher.add_handler(CommandHandler("ai_presets", ai_presets_command)) + + # Callback для инлайн-меню + dispatcher.add_handler(CallbackQueryHandler(ai_preset_callback, pattern="^ai_preset_")) + + # Быстрые команды + dispatcher.add_handler(CommandHandler("ai_off", ai_off_command)) + dispatcher.add_handler(CommandHandler("ai_qwen", ai_qwen_command)) + dispatcher.add_handler(CommandHandler("ai_giga_auto", ai_giga_auto_command)) + dispatcher.add_handler(CommandHandler("ai_giga_lite", ai_giga_lite_command)) + dispatcher.add_handler(CommandHandler("ai_giga_pro", ai_giga_pro_command)) + + logger.info("Обработчики AI-пресетов зарегистрированы") diff --git a/bot/handlers/callbacks.py b/bot/handlers/callbacks.py index b3c1c2e..14a5822 100644 --- a/bot/handlers/callbacks.py +++ b/bot/handlers/callbacks.py @@ -35,6 +35,38 @@ async def menu_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): reply_markup=menu_builder.get_keyboard("main", user_id=query.from_user.id, state=state) ) + elif callback == "ai_presets": + # Открываем меню AI-пресетов + state = state_manager.get(user_id) + from bot.handlers.ai_presets import ai_presets_command + # Создаём фейковое сообщение для совместимости + class FakeMessage: + async def reply_text(self, text, parse_mode=None, reply_markup=None): + # Вместо отправки сообщения редактируем callback + await query.edit_message_text(text, parse_mode=parse_mode, reply_markup=reply_markup) + return None + + fake_update = type('FakeUpdate', (), {'message': FakeMessage(), 'effective_user': query.from_user})() + await ai_presets_command(fake_update, context) + return + + elif callback.startswith("continue_output_"): + # Пользователь нажал "Продолжить" + remaining = int(callback.replace("continue_output_", "")) + state = state_manager.get(user_id) + state.waiting_for_output_control = False + state.continue_output = True + await query.answer(f"▶️ Продолжаем вывод (осталось {remaining} сообщений)") + return + + elif callback == "cancel_output": + # Пользователь нажал "Отменить" + state = state_manager.get(user_id) + state.waiting_for_output_control = False + state.continue_output = False + await query.answer("❌ Вывод отменен") + return + elif callback == "preset_menu": state.current_menu = "preset" await query.edit_message_text( diff --git a/bot/keyboards/menus.py b/bot/keyboards/menus.py index 2161478..91b1606 100644 --- a/bot/keyboards/menus.py +++ b/bot/keyboards/menus.py @@ -101,6 +101,7 @@ def init_menus(menu_builder: MenuBuilder): main_menu = [ MenuItem("🖥️ Выбор сервера", "server_menu", icon="🖥️"), MenuItem("📋 Предустановленные команды", "preset_menu", icon="📋"), + MenuItem("🎛️ AI-пресеты", "ai_presets", icon="🎛️"), MenuItem("💬 Чат с ИИ агентом", "toggle_ai_chat", icon="💬"), MenuItem("⚙️ Настройки бота", "settings_menu", icon="⚙️"), MenuItem("ℹ️ О боте", "about", icon="ℹ️"), diff --git a/bot/models/user_state.py b/bot/models/user_state.py index 5bb9b86..b020fbb 100644 --- a/bot/models/user_state.py +++ b/bot/models/user_state.py @@ -5,6 +5,14 @@ from typing import Dict, List, Optional, Any from dataclasses import dataclass, field +# Пресеты AI-провайдеров +AI_PRESET_OFF = "off" # ИИ отключен, режим CLI команд +AI_PRESET_QWEN = "qwen" # Qwen Code (бесплатно, локально) +AI_PRESET_GIGA_AUTO = "giga_auto" # GigaChat авто-переключение (Lite/Pro) +AI_PRESET_GIGA_LITE = "giga_lite" # GigaChat Lite (дешевле) +AI_PRESET_GIGA_PRO = "giga_pro" # GigaChat Pro (максимальное качество) + + @dataclass class UserState: """Состояние пользователя в диалоге.""" @@ -19,7 +27,14 @@ class UserState: ai_chat_mode: bool = True # Режим чата с ИИ агентом (включен по умолчанию) ai_chat_history: List[str] = field(default_factory=list) # История диалога с ИИ messages_since_fact_extract: int = 0 # Счётчик для извлечения фактов - current_ai_provider: str = "qwen" # Текущий AI-провайдер: "qwen" или "gigachat" + ai_preset: str = AI_PRESET_QWEN # Текущий AI-пресет + current_ai_provider: str = "qwen" # Текущий AI-провайдер (для совместимости) + + # Для управления длинным выводом + waiting_for_output_control: bool = False # Ожидание решения пользователя + output_remaining: int = 0 # Сколько сообщений осталось + output_wait_message = None # Сообщение с кнопками + continue_output: bool = True # Решение пользователя class StateManager: diff --git a/bot/providers/gigachat_provider.py b/bot/providers/gigachat_provider.py index 2a772bc..c73b9ed 100644 --- a/bot/providers/gigachat_provider.py +++ b/bot/providers/gigachat_provider.py @@ -4,12 +4,14 @@ GigaChat AI Provider - адаптер GigaChat для работы с инстр Реализует интерфейс BaseAIProvider для единой работы с инструментами независимо от AI-провайдера. + +Использует нативный GigaChat Function Calling API: +https://developers.sber.ru/docs/ru/gigachat/guides/functions/overview """ import logging from typing import Optional, Dict, Any, Callable, List import json -import re from bot.base_ai_provider import ( BaseAIProvider, @@ -25,15 +27,16 @@ logger = logging.getLogger(__name__) class GigaChatProvider(BaseAIProvider): """ - GigaChat AI Provider с поддержкой инструментов. - - Использует эвристики для извлечения вызовов инструментов из текста, - так как GigaChat не поддерживает нативные tool calls. + GigaChat AI Provider с нативной поддержкой function calling. + + Использует официальный GigaChat Function Calling API вместо + эмуляции через текстовые блоки. """ def __init__(self, config: Optional[GigaChatConfig] = None): self._tool = GigaChatTool(config) self._available: Optional[bool] = None + self._functions_state_id: Optional[str] = None @property def provider_name(self) -> str: @@ -41,8 +44,7 @@ class GigaChatProvider(BaseAIProvider): @property def supports_tools(self) -> bool: - # GigaChat не поддерживает нативные tool calls - # Но мы эмулируем через парсинг текста + # GigaChat поддерживает нативные function calls return True @property @@ -53,15 +55,14 @@ class GigaChatProvider(BaseAIProvider): """Проверить доступность GigaChat.""" if self._available is not None: return self._available - - # Проверяем наличие токенов + try: import os client_id = os.getenv("GIGACHAT_CLIENT_ID") client_secret = os.getenv("GIGACHAT_CLIENT_SECRET") - + self._available = bool(client_id and client_secret) - + if not self._available: logger.warning("GigaChat недоступен: не настроены GIGACHAT_CLIENT_ID или GIGACHAT_CLIENT_SECRET") else: @@ -69,106 +70,354 @@ class GigaChatProvider(BaseAIProvider): except Exception as e: self._available = False logger.error(f"Ошибка проверки доступности GigaChat: {e}") - + return self._available - def get_tools_schema(self, tools_registry: Dict[str, Any]) -> List[Dict[str, Any]]: + def get_error(self) -> Optional[str]: + """Получить последнюю ошибку.""" + if self._available is False: + return "GigaChat недоступен: проверьте GIGACHAT_CLIENT_ID и GIGACHAT_CLIENT_SECRET" + return None + + def get_functions_schema(self, tools_registry: Dict[str, Any]) -> List[Dict[str, Any]]: """ - Получить схему инструментов для промпта. - - Формирует описание инструментов в формате понятном GigaChat. + Получить схему функций для GigaChat API в правильном формате. + + Формат GigaChat: + { + "name": "function_name", + "description": "Описание функции", + "parameters": { + "type": "object", + "properties": {...}, + "required": [...] + }, + "return_parameters": {...} # опционально + } """ schema = [] - for name, tool in tools_registry.items(): + + if tools_registry is None: + return schema + + # Обрабатываем разные типы tools_registry + items = [] + if hasattr(tools_registry, 'get_all') and callable(getattr(tools_registry, 'get_all')): + items = list(tools_registry.get_all().items()) + elif isinstance(tools_registry, dict): + items = list(tools_registry.items()) + elif hasattr(tools_registry, 'tools'): + items = list(tools_registry.tools.items()) if isinstance(tools_registry.tools, dict) else [] + + for name, tool in items: if hasattr(tool, 'get_schema'): tool_schema = tool.get_schema() - schema.append({ + # Преобразуем в формат GigaChat с гарантией наличия properties + parameters = tool_schema.get("parameters", {}) + if not parameters: + parameters = {"type": "object", "properties": {}} + elif "properties" not in parameters: + parameters["properties"] = {} + + giga_schema = { "name": name, "description": tool_schema.get("description", ""), - "parameters": tool_schema.get("parameters", {}) - }) + "parameters": parameters + } + # Добавляем return_parameters если есть + if hasattr(tool, 'get_return_schema'): + giga_schema["return_parameters"] = tool.get_return_schema() + schema.append(giga_schema) elif hasattr(tool, 'description'): schema.append({ "name": name, "description": tool.description, - "parameters": getattr(tool, 'parameters', {}) + "parameters": {"type": "object", "properties": {}} # Пустая но валидная схема }) - + + logger.info(f"📋 GigaChat functions schema: {[f['name'] for f in schema]}") return schema - def _build_tools_prompt(self, tools_schema: List[Dict[str, Any]]) -> str: + def _parse_function_call(self, function_call: Dict[str, Any]) -> ToolCall: """ - Построить текстовое описание инструментов для промпта. - - GigaChat не поддерживает нативные tool calls, поэтому описываем - инструменты в тексте и просим модель использовать специальный формат. - """ - if not tools_schema: - return "" - - prompt_parts = [ - "\n\n🛠️ ДОСТУПНЫЕ ИНСТРУМЕНТЫ:", - "Ты можешь использовать следующие инструменты. Для вызова инструмента используй формат:", - "```tool", - '{"name": "имя_инструмента", "arguments": {аргументы}}', - '```', - "", - "Список инструментов:" - ] - - for tool in tools_schema: - name = tool.get("name", "unknown") - desc = tool.get("description", "Нет описания") - params = tool.get("parameters", {}) - - prompt_parts.append(f"\n**{name}**") - prompt_parts.append(f"Описание: {desc}") - if params: - prompt_parts.append(f"Параметры: {json.dumps(params, ensure_ascii=False)}") - - prompt_parts.extend([ - "", - "После вызова инструмента ты получишь результат и сможешь продолжить ответ." - ]) - - return "\n".join(prompt_parts) + Преобразовать function_call из ответа GigaChat в ToolCall. - def _parse_tool_calls(self, content: str) -> List[ToolCall]: + GigaChat возвращает: + { + "name": "function_name", + "arguments": {"arg1": "value1", ...} + } """ - Извлечь вызовы инструментов из текста ответа. - - Ищет блоки вида: - ```tool - {"name": "ssh_tool", "arguments": {"command": "df -h"}} - ``` - """ - tool_calls = [] - - # Ищем блоки ```tool {...}``` - pattern = r'```tool\s*\n({.*?})\s*\n```' - matches = re.findall(pattern, content, re.DOTALL) - - for match in matches: - try: - tool_data = json.loads(match) - tool_name = tool_data.get("name") - tool_args = tool_data.get("arguments", {}) - - if tool_name: - tool_calls.append(ToolCall( - tool_name=tool_name, - tool_args=tool_args, - tool_call_id=f"gc_{len(tool_calls)}" - )) - except json.JSONDecodeError as e: - logger.warning(f"Ошибка парсинга tool call: {e}") - - return tool_calls + try: + # Аргументы могут быть строкой JSON или уже dict + args = function_call.get("arguments", {}) + if isinstance(args, str): + args = json.loads(args) + except (json.JSONDecodeError, TypeError) as e: + logger.warning(f"Ошибка парсинга аргументов function_call: {e}") + args = {} - def _remove_tool_blocks(self, content: str) -> str: - """Удалить блоки вызовов инструментов из текста.""" - pattern = r'```tool\s*\n\{.*?\}\s*\n```' - return re.sub(pattern, '', content, flags=re.DOTALL).strip() + return ToolCall( + tool_name=function_call.get("name", "unknown"), + tool_args=args, + tool_call_id=function_call.get("name", "fc_0") # Используем name как ID + ) + + async def process_with_tools( + self, + prompt: str, + system_prompt: Optional[str] = None, + context: Optional[List[Dict[str, str]]] = None, + tools_registry: Optional[Dict[str, Any]] = None, + on_chunk: Optional[Callable[[str], Any]] = None, + max_iterations: int = 5, + **kwargs + ) -> ProviderResponse: + """ + Обработка запросов с function calling для GigaChat. + + Использует нативный GigaChat Function Calling API: + 1. Отправляем запрос с functions массивом + 2. Получаем function_call из ответа + 3. Выполняем инструмент + 4. Отправляем результат с role: "function" + 5. Повторяем пока не будет финального ответа + + Формат сообщений: + - user: {"role": "user", "content": "..."} + - assistant: {"role": "assistant", "function_call": {...}} + - function: {"role": "function", "name": "...", "content": "..."} + """ + if not tools_registry: + return await self.chat( + prompt=prompt, + system_prompt=system_prompt, + context=context, + on_chunk=on_chunk, + **kwargs + ) + + # Формируем базовые сообщения + messages = [] + + # Добавляем системный промпт если есть + if system_prompt: + messages.append({"role": "system", "content": system_prompt}) + + # Добавляем контекст (историю диалога) + if context: + for msg in context: + role = msg.get("role") + # Пропускаем system messages — они уже добавлены + if role == "system": + continue + # Преобразуем tool messages в function messages + if role == "tool": + role = "function" + if role in ("user", "assistant", "function"): + messages.append({ + "role": role, + "content": msg.get("content", ""), + "name": msg.get("name") # Для function messages + }) + + # Добавляем текущий запрос пользователя + if prompt: + messages.append({"role": "user", "content": prompt}) + + # Получаем схему функций + functions = self.get_functions_schema(tools_registry) if self.supports_tools else None + + logger.info(f"🔍 GigaChat process_with_tools: {len(messages)} сообщений, {len(functions) if functions else 0} функций") + + for iteration in range(max_iterations): + logger.info(f"🔄 Итерация {iteration + 1}/{max_iterations}") + + # Логируем сообщения перед отправкой + for i, msg in enumerate(messages[-3:]): # Последние 3 сообщения + content_preview = msg.get("content", "")[:100] + logger.info(f" 📨 [{i}] role={msg.get('role')}, content='{content_preview}...'") + + # Отправляем запрос с functions + response = await self._chat_with_functions( + messages=messages, + functions=functions, + user_id=kwargs.get('user_id'), + temperature=kwargs.get("temperature", 0.7), + max_tokens=kwargs.get("max_tokens", 2000), + ) + + if not response.get("success"): + return ProviderResponse( + success=False, + error=response.get("error", "Неизвестная ошибка"), + provider_name=self.provider_name + ) + + # Проверяем наличие function_call + function_call = response.get("function_call") + content = response.get("content", "") + + logger.info(f"📬 Ответ GigaChat: content_len={len(content) if content else 0}, function_call={function_call is not None}") + + # Если нет function_call — возвращаем финальный ответ + if not function_call: + return ProviderResponse( + success=True, + message=AIMessage( + content=content, + tool_calls=[], + metadata={ + "model": response.get("model", "GigaChat"), + "usage": response.get("usage", {}), + "functions_state_id": response.get("functions_state_id") + } + ), + provider_name=self.provider_name, + usage=response.get("usage") + ) + + # Есть function_call — парсим и выполняем инструмент + tool_call = self._parse_function_call(function_call) + logger.info(f"🛠️ Function call: {tool_call.tool_name}({tool_call.tool_args})") + + # Выполняем инструмент + if hasattr(tools_registry, 'get'): + tool = tools_registry.get(tool_call.tool_name) + elif isinstance(tools_registry, dict): + tool = tools_registry.get(tool_call.tool_name) + else: + tool = None + + if tool is not None: + try: + if hasattr(tool, 'execute'): + result = await tool.execute( + **tool_call.tool_args, + user_id=kwargs.get('user_id') + ) + elif hasattr(tool, '__call__'): + result = await tool(**tool_call.tool_args) + else: + result = f"Инструмент {tool_call.tool_name} не имеет метода execute" + + tool_call.result = result + tool_call.status = ToolCallStatus.SUCCESS + except Exception as e: + logger.exception(f"Ошибка выполнения инструмента {tool_call.tool_name}: {e}") + tool_call.error = str(e) + tool_call.status = ToolCallStatus.ERROR + result = {"error": str(e)} + else: + tool_call.error = f"Инструмент {tool_call.tool_name} не найден" + tool_call.status = ToolCallStatus.ERROR + result = {"error": tool_call.error} + + # Сериализуем результат + if hasattr(result, 'to_dict'): + result_dict = result.to_dict() + elif isinstance(result, dict): + result_dict = result + else: + result_dict = {"result": str(result)} + + result_json = json.dumps(result_dict, ensure_ascii=False) + + # Добавляем assistant message с function_call + messages.append({ + "role": "assistant", + "content": "", # Пустой content при function_call + "function_call": function_call + }) + + # Добавляем function message с результатом + messages.append({ + "role": "function", + "name": tool_call.tool_name, + "content": result_json + }) + + logger.info(f"✅ Добавлен function result: {tool_call.tool_name}, result_len={len(result_json)}") + + # Сохраняем functions_state_id для следующей итерации + if response.get("functions_state_id"): + self._functions_state_id = response["functions_state_id"] + + # Достигли максимума итераций + return ProviderResponse( + success=True, + message=AIMessage( + content=content + "\n\n[Достигнут максимум итераций выполнения функций]", + metadata={"iterations": max_iterations} + ), + provider_name=self.provider_name, + usage=response.get("usage") + ) + + async def _chat_with_functions( + self, + messages: List[Dict[str, Any]], + functions: Optional[List[Dict[str, Any]]] = None, + user_id: Optional[int] = None, + temperature: float = 0.7, + max_tokens: int = 2000, + ) -> Dict[str, Any]: + """ + Отправить запрос в GigaChat API с поддержкой function calling. + + Возвращает: + { + "success": bool, + "content": str, + "function_call": {"name": str, "arguments": dict} или None, + "model": str, + "usage": dict, + "functions_state_id": str или None + } + """ + try: + # Формируем сообщения в формате GigaChat + gc_messages = [] + for msg in messages: + gc_msg = {"role": msg["role"], "content": msg.get("content", "")} + if msg.get("name"): + gc_msg["name"] = msg["name"] + if msg.get("function_call"): + gc_msg["function_call"] = msg["function_call"] + gc_messages.append(gc_msg) + + # Выполняем запрос через GigaChatTool + result = await self._tool.chat_with_functions( + messages=gc_messages, + functions=functions, + user_id=str(user_id) if user_id else None, + temperature=temperature, + max_tokens=max_tokens, + ) + + # Извлекаем function_call из ответа + function_call = None + if result.get("choices"): + choice = result["choices"][0] + message = choice.get("message", {}) + function_call = message.get("function_call") + + return { + "success": True, + "content": result.get("content", ""), + "function_call": function_call, + "model": result.get("model", "GigaChat"), + "usage": result.get("usage", {}), + "functions_state_id": result.get("functions_state_id") + } + + except Exception as e: + logger.exception(f"Ошибка _chat_with_functions: {e}") + return { + "success": False, + "error": str(e), + "function_call": None + } async def chat( self, @@ -181,44 +430,29 @@ class GigaChatProvider(BaseAIProvider): **kwargs ) -> ProviderResponse: """ - Отправить запрос GigaChat. - - Args: - prompt: Запрос пользователя - system_prompt: Системный промпт - context: История диалога - tools: Доступные инструменты (схема) - on_chunk: Callback для потокового вывода (не используется) - user_id: ID пользователя - **kwargs: Дополнительные параметры - - Returns: - ProviderResponse с ответом и возможными вызовами инструментов + Отправить запрос GigaChat (без function calling). + + Используется когда tools не переданы. """ try: - # Формируем системный промпт с инструментами - full_system_prompt = system_prompt or "" - - if tools: - tools_prompt = self._build_tools_prompt(tools) - full_system_prompt += tools_prompt - # Формируем сообщения messages = [] - - if full_system_prompt: - messages.append(GigaChatMessage(role="system", content=full_system_prompt)) - + + if system_prompt: + messages.append(GigaChatMessage(role="system", content=system_prompt)) + if context: for msg in context: role = msg.get("role", "user") content = msg.get("content", "") - if role in ("user", "assistant", "system"): + if role == "system": + continue + if role in ("user", "assistant"): messages.append(GigaChatMessage(role=role, content=content)) - + if prompt: messages.append(GigaChatMessage(role="user", content=prompt)) - + # Выполняем запрос result = await self._tool.chat( messages=messages, @@ -226,7 +460,7 @@ class GigaChatProvider(BaseAIProvider): temperature=kwargs.get("temperature", 0.7), max_tokens=kwargs.get("max_tokens", 2000), ) - + if not result.get("content"): if result.get("error"): return ProviderResponse( @@ -240,20 +474,14 @@ class GigaChatProvider(BaseAIProvider): error="Пустой ответ от GigaChat", provider_name=self.provider_name ) - + content = result["content"] - - # Парсим вызовы инструментов - tool_calls = self._parse_tool_calls(content) - - # Очищаем контент от блоков инструментов - clean_content = self._remove_tool_blocks(content) - + return ProviderResponse( success=True, message=AIMessage( - content=clean_content, - tool_calls=tool_calls, + content=content, + tool_calls=[], metadata={ "model": result.get("model", "GigaChat"), "usage": result.get("usage", {}) @@ -262,7 +490,7 @@ class GigaChatProvider(BaseAIProvider): provider_name=self.provider_name, usage=result.get("usage") ) - + except Exception as e: logger.error(f"Ошибка GigaChat провайдера: {e}") return ProviderResponse( @@ -280,9 +508,8 @@ class GigaChatProvider(BaseAIProvider): ) -> ToolCall: """ Выполнить инструмент (заглушка). - - GigaChat не выполняет инструменты напрямую - это делает - AIProviderManager через process_with_tools. + + Инструменты выполняются через process_with_tools. """ return ToolCall( tool_name=tool_name, diff --git a/bot/tools/__init__.py b/bot/tools/__init__.py index 073540b..d56af23 100644 --- a/bot/tools/__init__.py +++ b/bot/tools/__init__.py @@ -124,4 +124,4 @@ def register_tool(tool_class: type) -> type: # Авто-импорт инструментов для регистрации # Импортируем после определения register_tool чтобы декоратор сработал -from bot.tools import ddgs_tool, rss_tool, ssh_tool, cron_tool, gigachat_tool +from bot.tools import ddgs_tool, rss_tool, ssh_tool, cron_tool, gigachat_tool, file_system_tool diff --git a/bot/tools/file_system_tool.py b/bot/tools/file_system_tool.py new file mode 100644 index 0000000..7c81f0a --- /dev/null +++ b/bot/tools/file_system_tool.py @@ -0,0 +1,803 @@ +#!/usr/bin/env python3 +""" +File System Tool - инструмент для работы с файловой системой Linux. + +Позволяет AI-агенту выполнять операции с файлами и директориями: +- Чтение файлов (cat) +- Запись файлов +- Копирование (cp) +- Перемещение (mv) +- Удаление (rm) +- Создание директорий (mkdir) +- Список файлов (ls) +- Проверка существования +- Поиск файлов + +Инструмент работает от имени пользователя на локальной машине. +""" + +import logging +import os +import shutil +import subprocess +import asyncio +from pathlib import Path +from typing import Optional, Dict, Any, List +from dataclasses import dataclass, field + +from bot.tools import BaseTool, ToolResult, register_tool + +logger = logging.getLogger(__name__) + + +class FileSystemTool(BaseTool): + """Инструмент для работы с файловой системой.""" + + name = "file_system_tool" + description = "Работа с файловой системой Linux: чтение/запись файлов, копирование, перемещение, удаление, создание директорий, просмотр списка файлов." + category = "system" + + # Безопасные пути - где можно работать + ALLOWED_BASE_PATHS = [ + Path.home(), # Домашняя директория + Path("/tmp"), + Path("/var/tmp"), + ] + + # Опасные пути - куда нельзя записывать/удалять + DANGEROUS_PATHS = [ + Path("/"), + Path("/etc"), + Path("/usr"), + Path("/bin"), + Path("/sbin"), + Path("/boot"), + Path("/dev"), + Path("/proc"), + Path("/sys"), + ] + + def __init__(self): + self._last_operation: Optional[str] = None + self._operation_history: List[Dict] = [] + + def _is_path_safe(self, path: Path, allow_write: bool = True) -> tuple[bool, str]: + """ + Проверить безопасность пути. + + Args: + path: Путь для проверки + allow_write: Если True, проверяем возможность записи + + Returns: + (is_safe: bool, reason: str) + """ + try: + # Разрешаем абсолютные и относительные пути + if not path.is_absolute(): + path = Path.cwd() / path + + # Сначала проверяем на наличие в разрешённых путях (это важно!) + for allowed in self.ALLOWED_BASE_PATHS: + try: + path.relative_to(allowed) + return True, "Путь безопасен" + except ValueError: + pass + + # Если путь не в разрешённых - проверяем на опасные + for dangerous in self.DANGEROUS_PATHS: + # Пропускаем корень если путь не в разрешённых уже + if dangerous == Path("/"): + continue + + try: + path.relative_to(dangerous) + return False, f"Путь {path} находится в защищённой директории {dangerous}" + except ValueError: + pass + + # Если путь не в разрешённых и не в запрещённых - разрешаем с предупреждением + return True, f"Путь {path} может быть недоступен" + + except Exception as e: + return False, f"Ошибка проверки пути: {e}" + + def _resolve_path(self, path_str: str) -> Path: + """Преобразовать строку пути в Path объект.""" + path = Path(path_str) + + # Расширяем ~ в домашнюю директорию + # Важно: Path("~/file") не работает, нужно expanduser() + if path_str.startswith('~'): + path = Path(path_str).expanduser() + elif not path.is_absolute(): + # Если путь относительный, делаем его абсолютным от домашней директории + path = Path.home() / path_str + + return path + + async def read_file(self, path: str, limit: int = 100) -> Dict[str, Any]: + """ + Прочитать файл. + + Args: + path: Путь к файлу + limit: Максимальное количество строк для чтения + + Returns: + Dict с content, lines, error + """ + try: + file_path = self._resolve_path(path) + + # Проверка безопасности + is_safe, reason = self._is_path_safe(file_path, allow_write=False) + if not is_safe: + return {"error": reason, "success": False} + + if not file_path.exists(): + return {"error": f"Файл не существует: {file_path}", "success": False} + + if not file_path.is_file(): + return {"error": f"Не файл: {file_path}", "success": False} + + # Читаем файл + with open(file_path, 'r', encoding='utf-8', errors='replace') as f: + lines = f.readlines() + + # Ограничиваем количество строк + if len(lines) > limit: + content = ''.join(lines[:limit]) + truncated = True + total_lines = len(lines) + else: + content = ''.join(lines) + truncated = False + total_lines = len(lines) + + logger.info(f"Прочитан файл: {file_path} ({total_lines} строк)") + + return { + "success": True, + "content": content, + "path": str(file_path), + "lines_read": min(len(lines), limit), + "total_lines": total_lines, + "truncated": truncated + } + + except Exception as e: + logger.error(f"Ошибка чтения файла {path}: {e}") + return {"error": str(e), "success": False} + + async def write_file(self, path: str, content: str, append: bool = False) -> Dict[str, Any]: + """ + Записать в файл. + + Args: + path: Путь к файлу + content: Содержимое для записи + append: Если True, добавить в конец файла + + Returns: + Dict с success, path, bytes_written + """ + try: + file_path = self._resolve_path(path) + + # Проверка безопасности + is_safe, reason = self._is_path_safe(file_path, allow_write=True) + if not is_safe: + return {"error": reason, "success": False} + + # Создаём родительские директории если нужно + file_path.parent.mkdir(parents=True, exist_ok=True) + + # Записываем файл + mode = 'a' if append else 'w' + with open(file_path, mode, encoding='utf-8') as f: + bytes_written = f.write(content) + + logger.info(f"Записан файл: {file_path} ({bytes_written} байт)") + + return { + "success": True, + "path": str(file_path), + "bytes_written": bytes_written, + "appended": append + } + + except Exception as e: + logger.error(f"Ошибка записи файла {path}: {e}") + return {"error": str(e), "success": False} + + async def list_directory(self, path: str = ".", show_hidden: bool = False) -> Dict[str, Any]: + """ + Показать список файлов в директории. + + Args: + path: Путь к директории + show_hidden: Показывать скрытые файлы + + Returns: + Dict с files, directories, total + """ + try: + dir_path = self._resolve_path(path) + + is_safe, _ = self._is_path_safe(dir_path, allow_write=False) + if not is_safe: + return {"error": "Доступ к директории ограничен", "success": False} + + if not dir_path.exists(): + return {"error": f"Директория не существует: {dir_path}", "success": False} + + if not dir_path.is_dir(): + return {"error": f"Не директория: {dir_path}", "success": False} + + files = [] + directories = [] + + for item in dir_path.iterdir(): + if not show_hidden and item.name.startswith('.'): + continue + + try: + stat = item.stat() + size = stat.st_size + mtime = stat.st_mtime + except: + size = 0 + mtime = 0 + + item_info = { + "name": item.name, + "path": str(item), + "size": size, + "modified": mtime + } + + if item.is_file(): + files.append(item_info) + elif item.is_dir(): + directories.append(item_info) + + # Сортируем по имени + files.sort(key=lambda x: x["name"]) + directories.sort(key=lambda x: x["name"]) + + return { + "success": True, + "path": str(dir_path), + "files": files, + "directories": directories, + "total_files": len(files), + "total_dirs": len(directories) + } + + except Exception as e: + logger.error(f"Ошибка списка директории {path}: {e}") + return {"error": str(e), "success": False} + + async def copy_file(self, source: str, destination: str) -> Dict[str, Any]: + """ + Скопировать файл или директорию. + + Args: + source: Исходный путь + destination: Целевой путь + + Returns: + Dict с success, source, destination + """ + try: + src_path = self._resolve_path(source) + dst_path = self._resolve_path(destination) + + # Проверка безопасности + is_safe_src, reason = self._is_path_safe(src_path, allow_write=False) + if not is_safe_src: + return {"error": f"Источник: {reason}", "success": False} + + is_safe_dst, reason = self._is_path_safe(dst_path, allow_write=True) + if not is_safe_dst: + return {"error": f"Назначение: {reason}", "success": False} + + if not src_path.exists(): + return {"error": f"Источник не существует: {src_path}", "success": False} + + # Копируем + if src_path.is_file(): + shutil.copy2(src_path, dst_path) + else: + shutil.copytree(src_path, dst_path, dirs_exist_ok=True) + + logger.info(f"Скопировано: {src_path} -> {dst_path}") + + return { + "success": True, + "source": str(src_path), + "destination": str(dst_path), + "operation": "copy" + } + + except Exception as e: + logger.error(f"Ошибка копирования {source} -> {destination}: {e}") + return {"error": str(e), "success": False} + + async def move_file(self, source: str, destination: str) -> Dict[str, Any]: + """ + Переместить файл или директорию. + + Args: + source: Исходный путь + destination: Целевой путь + + Returns: + Dict с success, source, destination + """ + try: + src_path = self._resolve_path(source) + dst_path = self._resolve_path(destination) + + # Проверка безопасности + is_safe_src, reason = self._is_path_safe(src_path, allow_write=False) + if not is_safe_src: + return {"error": f"Источник: {reason}", "success": False} + + is_safe_dst, reason = self._is_path_safe(dst_path, allow_write=True) + if not is_safe_dst: + return {"error": f"Назначение: {reason}", "success": False} + + if not src_path.exists(): + return {"error": f"Источник не существует: {src_path}", "success": False} + + # Перемещаем + shutil.move(src_path, dst_path) + + logger.info(f"Перемещено: {src_path} -> {dst_path}") + + return { + "success": True, + "source": str(src_path), + "destination": str(dst_path), + "operation": "move" + } + + except Exception as e: + logger.error(f"Ошибка перемещения {source} -> {destination}: {e}") + return {"error": str(e), "success": False} + + async def delete(self, path: str, recursive: bool = False) -> Dict[str, Any]: + """ + Удалить файл или директорию. + + Args: + path: Путь к файлу/директории + recursive: Если True, удалять рекурсивно + + Returns: + Dict с success, path, deleted_count + """ + try: + file_path = self._resolve_path(path) + + # Проверка безопасности + is_safe, reason = self._is_path_safe(file_path, allow_write=True) + if not is_safe: + return {"error": reason, "success": False} + + if not file_path.exists(): + return {"error": f"Путь не существует: {file_path}", "success": False} + + deleted_count = 0 + + if file_path.is_file(): + file_path.unlink() + deleted_count = 1 + elif file_path.is_dir(): + if recursive: + shutil.rmtree(file_path) + # Считаем количество удалённых файлов + deleted_count = -1 # Неизвестно + else: + return { + "error": "Директория не пуста. Используйте recursive=True для рекурсивного удаления", + "success": False + } + + logger.info(f"Удалено: {file_path}") + + return { + "success": True, + "path": str(file_path), + "deleted_count": deleted_count, + "operation": "delete" + } + + except Exception as e: + logger.error(f"Ошибка удаления {path}: {e}") + return {"error": str(e), "success": False} + + async def create_directory(self, path: str, parents: bool = True) -> Dict[str, Any]: + """ + Создать директорию. + + Args: + path: Путь к директории + parents: Если True, создавать родительские директории + + Returns: + Dict с success, path + """ + try: + dir_path = self._resolve_path(path) + + # Проверка безопасности + is_safe, reason = self._is_path_safe(dir_path, allow_write=True) + if not is_safe: + return {"error": reason, "success": False} + + if dir_path.exists(): + if dir_path.is_dir(): + return { + "success": True, + "path": str(dir_path), + "already_exists": True + } + else: + return {"error": f"Существует файл с таким именем: {dir_path}", "success": False} + + # Создаём директорию + dir_path.mkdir(parents=parents, exist_ok=parents) + + logger.info(f"Создана директория: {dir_path}") + + return { + "success": True, + "path": str(dir_path), + "operation": "mkdir" + } + + except Exception as e: + logger.error(f"Ошибка создания директории {path}: {e}") + return {"error": str(e), "success": False} + + async def file_info(self, path: str) -> Dict[str, Any]: + """ + Получить информацию о файле/директории. + + Args: + path: Путь к файлу + + Returns: + Dict с информацией о файле + """ + try: + file_path = self._resolve_path(path) + + is_safe, _ = self._is_path_safe(file_path, allow_write=False) + if not is_safe: + return {"error": "Доступ ограничен", "success": False} + + if not file_path.exists(): + return {"error": f"Путь не существует: {file_path}", "success": False} + + stat = file_path.stat() + + return { + "success": True, + "path": str(file_path), + "name": file_path.name, + "is_file": file_path.is_file(), + "is_dir": file_path.is_dir(), + "size": stat.st_size, + "created": stat.st_ctime, + "modified": stat.st_mtime, + "permissions": oct(stat.st_mode)[-3:] + } + + except Exception as e: + logger.error(f"Ошибка получения информации о {path}: {e}") + return {"error": str(e), "success": False} + + async def search_files( + self, + path: str = ".", + pattern: str = "*", + max_results: int = 50 + ) -> Dict[str, Any]: + """ + Найти файлы по паттерну. + + Args: + path: Директория для поиска + pattern: Паттерн (glob-style) + max_results: Максимум результатов + + Returns: + Dict с найденными файлами + """ + try: + base_path = self._resolve_path(path) + + is_safe, _ = self._is_path_safe(base_path, allow_write=False) + if not is_safe: + return {"error": "Доступ ограничен", "success": False} + + results = [] + + # Используем glob для поиска + import glob + matches = glob.glob(str(base_path / pattern), recursive=True) + + for match in matches[:max_results]: + match_path = Path(match) + try: + stat = match_path.stat() + results.append({ + "path": str(match_path), + "name": match_path.name, + "size": stat.st_size, + "is_file": match_path.is_file(), + "is_dir": match_path.is_dir() + }) + except: + pass + + return { + "success": True, + "pattern": pattern, + "base_path": str(base_path), + "found": len(results), + "results": results, + "truncated": len(matches) > max_results + } + + except Exception as e: + logger.error(f"Ошибка поиска файлов {pattern} в {path}: {e}") + return {"error": str(e), "success": False} + + async def execute_shell(self, command: str, timeout: int = 30) -> Dict[str, Any]: + """ + Выполнить shell-команду (для сложных операций). + + Args: + command: Команда для выполнения + timeout: Таймаут в секундах + + Returns: + Dict с stdout, stderr, returncode + """ + try: + # Разрешаем только безопасные команды + SAFE_COMMANDS = [ + 'ls', 'cat', 'cp', 'mv', 'rm', 'mkdir', 'rmdir', + 'touch', 'chmod', 'chown', 'find', 'grep', 'head', + 'tail', 'wc', 'sort', 'uniq', 'pwd', 'du', 'df' + ] + + # Извлекаем базовую команду + base_cmd = command.split()[0] if command.split() else '' + + if base_cmd not in SAFE_COMMANDS: + return { + "error": f"Команда '{base_cmd}' не разрешена. Используйте безопасные команды: {SAFE_COMMANDS}", + "success": False + } + + # Выполняем команду + process = await asyncio.create_subprocess_shell( + command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd=str(Path.home()) + ) + + try: + stdout, stderr = await asyncio.wait_for( + process.communicate(), + timeout=timeout + ) + except asyncio.TimeoutError: + process.kill() + return { + "error": f"Таймаут выполнения команды ({timeout} сек)", + "success": False + } + + return { + "success": process.returncode == 0, + "stdout": stdout.decode('utf-8', errors='replace').strip(), + "stderr": stderr.decode('utf-8', errors='replace').strip(), + "returncode": process.returncode, + "command": command + } + + except Exception as e: + logger.error(f"Ошибка выполнения команды {command}: {e}") + return {"error": str(e), "success": False} + + async def execute(self, operation: str, **kwargs) -> ToolResult: + """ + Выполнить операцию с файловой системой. + + Args: + operation: Тип операции (read, write, copy, move, delete, mkdir, list, info, search, shell) + **kwargs: Аргументы операции + + Returns: + ToolResult с результатом + """ + logger.info(f"File System Tool: operation={operation}, args={kwargs}") + + self._last_operation = operation + + try: + result = None + + if operation == 'read': + result = await self.read_file( + path=kwargs.get('path', ''), + limit=kwargs.get('limit', 100) + ) + + elif operation == 'write': + result = await self.write_file( + path=kwargs.get('path', ''), + content=kwargs.get('content', ''), + append=kwargs.get('append', False) + ) + + elif operation == 'copy': + result = await self.copy_file( + source=kwargs.get('source', ''), + destination=kwargs.get('destination', '') + ) + + elif operation == 'move': + result = await self.move_file( + source=kwargs.get('source', ''), + destination=kwargs.get('destination', '') + ) + + elif operation == 'delete': + result = await self.delete( + path=kwargs.get('path', ''), + recursive=kwargs.get('recursive', False) + ) + + elif operation == 'mkdir': + result = await self.create_directory( + path=kwargs.get('path', ''), + parents=kwargs.get('parents', True) + ) + + elif operation == 'list': + result = await self.list_directory( + path=kwargs.get('path', '.'), + show_hidden=kwargs.get('show_hidden', False) + ) + + elif operation == 'info': + result = await self.file_info( + path=kwargs.get('path', '') + ) + + elif operation == 'search': + result = await self.search_files( + path=kwargs.get('path', '.'), + pattern=kwargs.get('pattern', '*'), + max_results=kwargs.get('max_results', 50) + ) + + elif operation == 'shell': + result = await self.execute_shell( + command=kwargs.get('command', ''), + timeout=kwargs.get('timeout', 30) + ) + + else: + return ToolResult( + success=False, + error=f"Неизвестная операция: {operation}. Доступные: read, write, copy, move, delete, mkdir, list, info, search, shell" + ) + + # Сохраняем в историю + self._operation_history.append({ + 'operation': operation, + 'args': kwargs, + 'result': result, + 'timestamp': __import__('datetime').datetime.now().isoformat() + }) + + # Ограничиваем историю + if len(self._operation_history) > 100: + self._operation_history = self._operation_history[-50:] + + return ToolResult( + success=result.get('success', False), + data=result, + metadata={ + 'operation': operation, + 'last_path': result.get('path', result.get('source', '')) + } + ) + + except Exception as e: + logger.exception(f"Ошибка File System Tool: {e}") + return ToolResult( + success=False, + error=str(e), + metadata={'operation': operation} + ) + + def get_schema(self) -> Dict[str, Any]: + """Получить схему инструмента для промпта.""" + return { + "name": self.name, + "description": self.description, + "parameters": { + "type": "object", + "properties": { + "operation": { + "type": "string", + "description": "Тип операции", + "enum": ["read", "write", "copy", "move", "delete", "mkdir", "list", "info", "search", "shell"] + }, + "path": { + "type": "string", + "description": "Путь к файлу/директории" + }, + "source": { + "type": "string", + "description": "Исходный путь (для copy/move)" + }, + "destination": { + "type": "string", + "description": "Целевой путь (для copy/move)" + }, + "content": { + "type": "string", + "description": "Содержимое для записи" + }, + "pattern": { + "type": "string", + "description": "Паттерн для поиска файлов" + }, + "command": { + "type": "string", + "description": "Shell команда (для операции shell)" + }, + "limit": { + "type": "integer", + "description": "Лимит строк для чтения" + }, + "max_results": { + "type": "integer", + "description": "Максимум результатов поиска" + }, + "recursive": { + "type": "boolean", + "description": "Рекурсивное удаление" + }, + "show_hidden": { + "type": "boolean", + "description": "Показывать скрытые файлы" + }, + "timeout": { + "type": "integer", + "description": "Таймаут для shell команд" + } + }, + "required": ["operation"] + } + } + + +# Автоматическая регистрация при импорте +@register_tool +class FileSystemToolAuto(FileSystemTool): + """Авто-регистрируемая версия FileSystemTool.""" + pass diff --git a/bot/tools/gigachat_tool.py b/bot/tools/gigachat_tool.py index a891fd9..dc015a6 100644 --- a/bot/tools/gigachat_tool.py +++ b/bot/tools/gigachat_tool.py @@ -34,9 +34,14 @@ class GigaChatConfig: client_secret: str scope: str = "GIGACHAT_API_PERS" auth_url: str = "https://ngw.devices.sberbank.ru:9443/api/v2/oauth" - model: str = "GigaChat" + model: str = "GigaChat-Pro" # Модель по умолчанию + model_lite: str = "GigaChat" # Lite модель для простых запросов + model_pro: str = "GigaChat-Pro" # Pro модель для сложных запросов api_url: str = "https://gigachat.devices.sberbank.ru/api/v1" timeout: int = 60 + # Пороги для переключения моделей + complexity_token_threshold: int = 50 # Если токенов в запросе > порога → Pro + complexity_keyword_threshold: int = 2 # Если ключевых слов сложности >= порога → Pro class GigaChatTool: @@ -75,6 +80,10 @@ class GigaChatTool: scope=os.getenv("GIGACHAT_SCOPE", "GIGACHAT_API_PERS"), auth_url=os.getenv("GIGACHAT_AUTH_URL", "https://ngw.devices.sberbank.ru:9443/api/v2/oauth"), model=os.getenv("GIGACHAT_MODEL", "GigaChat-Pro"), + model_lite=os.getenv("GIGACHAT_MODEL_LITE", "GigaChat"), + model_pro=os.getenv("GIGACHAT_MODEL_PRO", "GigaChat-Pro"), + complexity_token_threshold=int(os.getenv("GIGACHAT_TOKEN_THRESHOLD", "50")), + complexity_keyword_threshold=int(os.getenv("GIGACHAT_KEYWORD_THRESHOLD", "2")), ) def _get_auth_headers(self) -> Dict[str, str]: @@ -124,7 +133,121 @@ class GigaChatTool: logger.info(f"GigaChat токен получен: {self._access_token[:50]}...") return self._access_token - + + def _estimate_query_complexity(self, messages: List[GigaChatMessage]) -> dict: + """ + Оценить сложность запроса для выбора модели (Lite или Pro). + + Критерии сложности: + 1. Длина запроса (количество токенов/слов) + 2. Наличие ключевых слов для сложных задач + 3. Наличие инструментов (tool calls) + 4. Технические термины + + Returns: + Dict с оценкой сложности и рекомендуемой моделью + """ + # Собираем весь текст из сообщений пользователя + user_text = "" + for msg in messages: + if msg.role == "user": + user_text += " " + msg.content + + user_text = user_text.lower() + + # 1. Оценка по длине (считаем слова как грубая оценка токенов) + word_count = len(user_text.split()) + token_estimate = word_count * 1.3 # Примерная конверсия слов в токены + + # 2. Ключевые слова для сложных задач + complex_keywords = [ + # Программирование и код + 'код', 'функция', 'класс', 'метод', 'переменная', 'цикл', 'условие', + 'алгоритм', 'структура данных', 'массив', 'словарь', 'список', + 'импорт', 'экспорт', 'модуль', 'пакет', 'библиотека', 'фреймворк', + 'дебаг', 'отладк', 'тест', 'юнит тест', 'интеграционн', + 'рефактор', 'оптимиз', 'производительност', + # Анализ и работа с данными + 'анализ', 'анализиров', 'сравни', 'сравнени', 'исследовани', + 'закономерност', 'паттерн', 'тенденци', 'прогноз', + # Системные задачи + 'конфигурац', 'настройк', 'деплой', 'развертывани', 'оркестрац', + 'контейнер', 'docker', 'kubernetes', 'k8s', 'helm', + 'мониторинг', 'логировани', 'трассировк', 'метрик', + # Сложные запросы + 'объясни', 'расскажи подробно', 'детальн', 'подробн', + 'почему', 'зачем', 'как работает', 'принцип работы', + 'спроектируй', 'спроектировать', 'архитектур', 'архитектура', + 'реализуй', 'реализовать', 'напиши код', 'создай функцию', + ] + + complexity_keywords_count = sum( + 1 for keyword in complex_keywords + if keyword in user_text + ) + + # 3. Наличие технических терминов + tech_terms = [ + 'api', 'http', 'rest', 'graphql', 'grpc', 'websocket', + 'sql', 'nosql', 'postgres', 'mysql', 'mongodb', 'redis', + 'git', 'merge', 'commit', 'branch', 'pull request', 'merge request', + 'ci/cd', 'pipeline', 'jenkins', 'gitlab', 'github', + 'linux', 'bash', 'shell', 'terminal', 'ssh', + 'python', 'javascript', 'typescript', 'java', 'go', 'rust', 'cpp', + 'react', 'vue', 'angular', 'django', 'flask', 'fastapi', 'express', + ] + + tech_terms_count = sum( + 1 for term in tech_terms + if term in user_text + ) + + # 4. Наличие инструментов в контексте + has_tools = any( + 'tool' in msg.content.lower() or 'инструмент' in msg.content.lower() + for msg in messages + ) + + # Принятие решения + use_pro = False + reasons = [] + + # Если токенов больше порога → Pro + if token_estimate > self.config.complexity_token_threshold: + use_pro = True + reasons.append(f"длинный запрос ({word_count} слов, ~{int(token_estimate)} токенов)") + + # Если много ключевых слов сложности → Pro + if complexity_keywords_count >= self.config.complexity_keyword_threshold: + use_pro = True + reasons.append(f"сложная задача ({complexity_keywords_count} ключевых слов)") + + # Если есть технические термины + инструменты → Pro + if tech_terms_count >= 2 and has_tools: + use_pro = True + reasons.append(f"техническая задача с инструментами ({tech_terms_count} терминов)") + + # Если есть явные запросы на работу с кодом/файлами → Pro + if any(phrase in user_text for phrase in [ + 'исходник', 'source code', 'посмотри код', 'проанализируй код', + 'работай с файлом', 'прочитай файл', 'изучи код' + ]): + use_pro = True + reasons.append("работа с кодом/файлами") + + model = self.config.model_pro if use_pro else self.config.model_lite + + return { + "use_pro": use_pro, + "model": model, + "word_count": word_count, + "token_estimate": int(token_estimate), + "complexity_keywords": complexity_keywords_count, + "tech_terms": tech_terms_count, + "has_tools": has_tools, + "reasons": reasons + } + async def chat( self, messages: Optional[List[GigaChatMessage]] = None, @@ -157,7 +280,7 @@ class GigaChatTool: - finish_reason: Причина завершения """ token = await self._get_access_token() - + # Формируем сообщения if messages is None: if use_history: @@ -168,22 +291,31 @@ class GigaChatTool: # Добавляем новые сообщения к истории self._chat_history.extend(messages) messages = self._chat_history.copy() - + + # Автоматически выбираем модель на основе сложности запроса + # Если модель явно не указана + selected_model = model + model_info = None + if selected_model is None: + model_info = self._estimate_query_complexity(messages) + selected_model = model_info["model"] + logger.info(f"📊 GigaChat выбор модели: {selected_model} (причины: {', '.join(model_info['reasons']) if model_info['reasons'] else 'простой запрос'})") + # Преобразуем сообщения в формат API api_messages = [ {"role": msg.role, "content": msg.content} for msg in messages ] - + payload = { - "model": model or self.config.model, + "model": selected_model, "messages": api_messages, "temperature": temperature, "max_tokens": max_tokens, "top_p": top_p, "repetition_penalty": repetition_penalty, } - + headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json", @@ -193,7 +325,7 @@ class GigaChatTool: # Логируем запрос для отладки logger.debug(f"GigaChat API URL: {self.config.api_url}/chat/completions") logger.debug(f"GigaChat headers: {headers}") - logger.debug(f"GigaChat payload: model={model or self.config.model}, messages={len(api_messages)}, max_tokens={max_tokens}") + logger.debug(f"GigaChat payload: model={selected_model}, messages={len(api_messages)}, max_tokens={max_tokens}") # GigaChat использует самоподписанные сертификаты - отключаем верификацию async with httpx.AsyncClient(verify=False) as client: @@ -237,9 +369,10 @@ class GigaChatTool: return { "content": data["choices"][0]["message"]["content"] if data.get("choices") else "", - "model": data.get("model", self.config.model), + "model": data.get("model", selected_model), "usage": data.get("usage", {}), "finish_reason": data["choices"][0]["finish_reason"] if data.get("choices") else "", + "complexity_info": model_info, # Информация о выборе модели для отладки } def clear_history(self): @@ -258,6 +391,128 @@ class GigaChatTool: ] # Добавляем новый в начало self._chat_history.insert(0, GigaChatMessage(role="system", content=prompt)) + + async def chat_with_functions( + self, + messages: List[Dict[str, Any]], + functions: Optional[List[Dict[str, Any]]] = None, + model: Optional[str] = None, + temperature: float = 0.7, + max_tokens: int = 2000, + top_p: float = 0.1, + repetition_penalty: float = 1.0, + user_id: Optional[str] = None, + ) -> Dict[str, Any]: + """ + Отправка запроса к GigaChat API с поддержкой function calling. + + Args: + messages: Список сообщений в формате API + functions: Массив функций для вызова + model: Модель для генерации + temperature: Температура генерации + max_tokens: Максимум токенов + top_p: Параметр top-p sampling + repetition_penalty: Штраф за повторения + user_id: ID пользователя + + Returns: + Dict с ответом API включая возможный function_call + """ + token = await self._get_access_token() + + # Выбираем модель на основе сложности запроса + selected_model = model + model_info = None + if selected_model is None: + # Преобразуем messages в формат GigaChatMessage для оценки сложности + gc_messages = [GigaChatMessage(role=msg["role"], content=msg.get("content", "")) for msg in messages] + model_info = self._estimate_query_complexity(gc_messages) + selected_model = model_info["model"] + logger.info(f"📊 GigaChat выбор модели: {selected_model} (причины: {', '.join(model_info['reasons']) if model_info['reasons'] else 'простой запрос'})") + + # Формируем payload + payload = { + "model": selected_model, + "messages": messages, + "temperature": temperature, + "max_tokens": max_tokens, + "top_p": top_p, + "repetition_penalty": repetition_penalty, + } + + # Добавляем functions если есть + if functions: + payload["functions"] = functions + # function_call: "auto" позволяет модели самой решать когда вызывать функции + payload["function_call"] = "auto" + logger.info(f"🔧 GigaChat function calling: {len(functions)} функций доступно") + + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "application/json", + "X-User-Id": str(user_id) if user_id else "telegram-bot", + } + + logger.info(f"📤 GigaChat API: model={selected_model}, messages={len(messages)}, functions={len(functions) if functions else 0}") + + # GigaChat использует самоподписанные сертификаты - отключаем верификацию + async with httpx.AsyncClient(verify=False) as client: + try: + response = await client.post( + f"{self.config.api_url}/chat/completions", + headers=headers, + json=payload, + timeout=self.config.timeout, + ) + + logger.debug(f"GigaChat chat_with_functions status: {response.status_code}") + logger.debug(f"GigaChat response: {response.text[:500]}") + + if response.status_code != 200: + logger.error(f"GigaChat error response: {response.text[:1000]}") + + response.raise_for_status() + data = response.json() + except httpx.HTTPStatusError as e: + logger.error(f"GigaChat HTTP error: {e}") + logger.error(f"Response: {e.response.text[:500]}") + return { + "content": "", + "error": f"HTTP {e.response.status_code}: {e.response.text[:200]}", + "choices": [], + } + except httpx.HTTPError as e: + logger.error(f"GigaChat request error: {e}") + return { + "content": "", + "error": f"Request error: {str(e)}", + "choices": [], + } + + # Извлекаем content и function_call + content = "" + function_call = None + functions_state_id = None + + if data.get("choices"): + choice = data["choices"][0] + message = choice.get("message", {}) + content = message.get("content", "") + function_call = message.get("function_call") + functions_state_id = message.get("functions_state_id") + + logger.info(f"📬 GigaChat ответ: content_len={len(content)}, function_call={function_call is not None}, functions_state_id={functions_state_id}") + + return { + "content": content, + "function_call": function_call, + "functions_state_id": functions_state_id, + "model": data.get("model", selected_model), + "usage": data.get("usage", {}), + "finish_reason": data["choices"][0]["finish_reason"] if data.get("choices") else "", + "choices": data.get("choices", []), + } async def generate_image( self, diff --git a/bot/utils/cleaners.py b/bot/utils/cleaners.py index 483dbe9..056ac68 100644 --- a/bot/utils/cleaners.py +++ b/bot/utils/cleaners.py @@ -63,7 +63,8 @@ def normalize_output(text: str) -> str: line = line[:match.start()] + f'{last_text}{last_percent}%' + line[match.end():] # СНАЧАЛА удаляем остатки ANSI-кодов из строки - line = re.sub(r'.', '', line) # + любой символ + # line = re.sub(r'.', '', line) # ← ЭТО УДАЛЯЛО ВСЁ! Закомментировал + line = clean_ansi_codes(line) # ← Используем правильную функцию # Удаляем дублирующийся текст вида "0% [текст] 0% [текст]" dup_pattern = re.compile(r'(\d+%\s*\[.+?\])(?:\s*\d+%\s*\[.+?\])+') diff --git a/bot/utils/formatters.py b/bot/utils/formatters.py index ef09b3b..0d96397 100644 --- a/bot/utils/formatters.py +++ b/bot/utils/formatters.py @@ -142,53 +142,131 @@ def split_message(text: str, max_length: int = MAX_MESSAGE_LENGTH) -> List[Tuple return parts -async def send_long_message(update: Update, text: str, parse_mode: str = None): +async def send_long_message(update: Update, text: str, parse_mode: str = None, pause_every: int = 3): """ Отправить длинный текст, разбив на несколько сообщений. - + Умная разбивка: - Блоки кода не разрываются между сообщениями - Если блок кода не влезает — отправляется без Markdown - - Нумерация (1/3), (2/3) только если сообщений > 1 + - Нумерация (1/N), (2/N) только если сообщений > 1 + - Если сообщение содержит блок кода, он закрывается в конце и открывается в начале следующего + - КАЖДЫЕ pause_after сообщений — пауза с кнопками "Продолжить" / "Отменить" + - После нажатия кнопки — они удаляются + + Args: + update: Telegram update + text: Текст для отправки + parse_mode: Режим парсинга (Markdown) + pause_every: Каждые сколько сообщений делать паузу (0 = без паузы) """ + from telegram import InlineKeyboardButton, InlineKeyboardMarkup + import asyncio + parts = split_message(text) total = len(parts) - + messages_sent = 0 + wait_msg = None + for i, (part, has_code) in enumerate(parts): # Добавляем номер части если их несколько if total > 1: header = f"({i+1}/{total}) " if len(header) + len(part) <= MAX_MESSAGE_LENGTH: part = header + part - + # Определяем parse_mode для этого сообщения - # Если передан parse_mode и нет проблем с блоками кода — используем его - # Если блок кода разорван — отправляем без Markdown для этой части if parse_mode and has_code: - # Сообщение содержит полный блок кода — используем Markdown actual_parse_mode = parse_mode elif parse_mode and not has_code: - # Сообщение без блоков кода — всё равно используем Markdown для другого форматирования actual_parse_mode = parse_mode else: - # Нет parse_mode или проблемы с кодом actual_parse_mode = None - + + # Если это не первое сообщение и предыдущее имело блок кода — открываем блок + # Если это не последнее сообщение и текущее имеет блок кода — закрываем блок + if total > 1 and actual_parse_mode: + if i > 0 and parts[i-1][1]: # Предыдущее имело блок кода + part = "```\n" + part + + if i < total - 1 and has_code: # Следующее будет иметь блок кода + part = part + "\n```" + try: await update.message.reply_text(part, parse_mode=actual_parse_mode) except Exception as e: # Фоллбэк: отправляем без разметки logger.debug(f"Ошибка Markdown, отправляем без разметки: {e}") await update.message.reply_text(part) - + + messages_sent += 1 + # Небольшая пауза между сообщениями await asyncio.sleep(0.1) + # КАЖДЫЕ pause_every сообщений — спрашиваем продолжать ли + if pause_every > 0 and messages_sent % pause_every == 0 and i < total - 1: + remaining = total - (i + 1) + keyboard = InlineKeyboardMarkup([ + [ + InlineKeyboardButton("▶️ Продолжить", callback_data=f"continue_output_{remaining}"), + InlineKeyboardButton("❌ Отменить", callback_data="cancel_output") + ] + ]) + + wait_msg = await update.message.reply_text( + f"📊 **Отправлено {messages_sent} из {total} сообщений**\n\n" + f"Осталось ещё {remaining} сообщений.\n\n" + f"Продолжить вывод?", + parse_mode="Markdown", + reply_markup=keyboard + ) + + # Ждём ответа пользователя (до 60 секунд) + from bot.config import state_manager + + user_id = update.effective_user.id + + # Сохраняем состояние ожидания + state = state_manager.get(user_id) + state.waiting_for_output_control = True + state.output_remaining = remaining + state.output_wait_message = wait_msg + + # Ждём 60 секунд + for _ in range(60): + await asyncio.sleep(1) + state = state_manager.get(user_id) + if not state.waiting_for_output_control: + # Пользователь ответил + if state.continue_output: + # Продолжаем - удаляем кнопки + try: + await wait_msg.delete() + except: + pass + break + else: + # Отменил - редактируем сообщение и убираем кнопки + try: + await wait_msg.edit_text("❌ **Вывод отменен пользователем**", parse_mode="Markdown") + except: + pass + return + + # Таймаут + if state.waiting_for_output_control: + state.waiting_for_output_control = False + try: + await wait_msg.edit_text("⏱️ **Время ожидания истекло**. Вывод продолжен.", parse_mode="Markdown") + except: + pass -def format_long_output(text: str, max_lines: int = 20, head_lines: int = 10, tail_lines: int = 10) -> str: + +def format_long_output(text: str, max_lines: int = 100, head_lines: int = 50, tail_lines: int = 50) -> str: """ Форматировать длинный вывод: показать первые и последние строки. - По умолчанию: первые 10 + последние 10 строк = 20 строк максимум. + По умолчанию: первые 50 + последние 50 строк = 100 строк максимум. """ lines = text.split('\n') total_lines = len(lines) diff --git a/system_prompt.md b/system_prompt.md index a96d1ce..1c5c6d7 100644 --- a/system_prompt.md +++ b/system_prompt.md @@ -116,6 +116,42 @@ cron_manager(action="list") --- +### 5. 📁 File System Tool (`file_system_tool`) + +**Назначение:** Работа с файловой системой Linux. + +**Когда использовать:** +- Пользователь просит прочитать, создать, скопировать, переместить, удалить файл +- Запросы про просмотр содержимого директории +- Слова: "прочитай", "покажи файл", "создай", "скопируй", "перемести", "удали", "ls", "cat", "cp", "mv", "rm", "mkdir" +- Команды Unix: `cat `, `ls `, `mkdir `, `cp `, `mv `, `rm `, `touch ` + +**Действия:** +- `read` — прочитать файл (параметры `path`, `limit`) +- `write` — записать в файл (параметры `path`, `content`, `append`) +- `copy` — копировать файл (параметры `source`, `destination`) +- `move` — переместить файл (параметры `source`, `destination`) +- `delete` — удалить файл (параметры `path`, `recursive`) +- `mkdir` — создать директорию (параметры `path`, `parents`) +- `list` — список файлов (параметры `path`, `show_hidden`) +- `info` — информация о файле (параметр `path`) +- `search` — поиск файлов (параметры `path`, `pattern`, `max_results`) +- `shell` — выполнить shell-команду (параметры `command`, `timeout`) + +**Примеры вызова:** +```python +file_system_tool(operation='read', path='/home/mirivlad/test.txt') +file_system_tool(operation='write', path='/tmp/note.txt', content='Текст заметки') +file_system_tool(operation='list', path='/home/mirivlad/git') +file_system_tool(operation='copy', source='file.txt', destination='backup/file.txt') +``` + +**Безопасность:** +- Разрешена работа в домашней директории, `/tmp`, `/var/tmp` +- Запрещена запись в системные директории (`/etc`, `/usr`, `/bin`, etc.) + +--- + ## 🧠 ПРИНЦИПЫ РАБОТЫ ### 1. **Автономность (Agentic AI)** @@ -135,10 +171,11 @@ cron_manager(action="list") ### 4. **Приоритеты инструментов** При принятии решения следуй приоритету: -1. **SSH** — если явная системная задача -2. **Cron** — если планирование/напоминание -3. **Поиск (DDGS)** — если нужны свежие данные из интернета -4. **RSS** — если новости из подписанных лент +1. **File System** — если операция с файлами/директориями +2. **SSH** — если явная системная задача на сервере +3. **Cron** — если планирование/напоминание +4. **Поиск (DDGS)** — если нужны свежие данные из интернета +5. **RSS** — если новости из подписанных лент --- @@ -227,10 +264,10 @@ Filesystem Size Used Avail Use% Mounted on ## 🎯 ТЕКУЩАЯ ВЕРСИЯ -**Bot Version:** 0.7.0 -**AI Provider Manager:** Support for multiple AI providers (Qwen, GigaChat) +**Bot Version:** 0.7.1 +**AI Provider Manager:** Поддержка multiple AI providers (Qwen Code, GigaChat) **Memory:** ChromaDB RAG + Vector Memory -**Tools:** ddgs_tool, rss_tool, ssh_tool, cron_tool +**Tools:** ddgs_tool, rss_tool, ssh_tool, cron_tool, file_system_tool ---