#!/usr/bin/env python3 """ AI Agent Module - автономный агент с инструментами. Агент может самостоятельно принимать решения об использовании инструментов на основе контекста запроса пользователя. """ import logging from typing import Optional, List, Dict, Any from dataclasses import dataclass from datetime import datetime from bot.tools import tools_registry, ToolResult logger = logging.getLogger(__name__) @dataclass class AgentDecision: """Решение агента об использовании инструмента.""" should_use_tool: bool tool_name: Optional[str] = None tool_args: Optional[Dict[str, Any]] = None confidence: float = 0.0 reasoning: str = "" class AIAgent: """ AI-агент с доступом к инструментам. Агент анализирует запрос и решает, нужно ли использовать какой-либо инструмент для выполнения задачи. """ # Триггеры для поиска в интернете SEARCH_TRIGGERS = [ 'найди', 'поиск', 'погугли', 'узнай', 'проверь в интернете', 'что нового', 'последние новости', 'свежая информация', 'как сделать', 'руководство', 'документация', 'tutorial', 'weather', 'news', 'search', 'find', 'look up', 'что это', 'кто такой', 'где находится', 'когда выйдет', 'скачай', 'загрузи', 'найди информацию', 'посмотри в сети' ] # Триггеры для RSS — ТОЛЬКО явные запросы новостей # Используем полные фразы чтобы избежать ложных срабатываний RSS_TRIGGERS = [ 'почитай новости', 'покажи новости', 'что нового в linux', 'новости it', 'tech news', 'opensource новости', 'linux новости', 'новости технологий', 'rss лента', 'дайджест новостей', 'свежие новости it', 'последние новости it', 'новости linux', 'it новости', 'новости opensource', 'лента новостей' ] # Триггеры для SSH-команд SSH_TRIGGERS = [ 'выполни команду', 'ssh', 'запусти на сервере', 'проверь сервер', 'посмотри логи', 'покажи процесс', 'сколько места', 'df', 'top', 'перезапусти', 'останови', 'запусти сервис', 'systemctl', 'проверь нагрузку', 'uptime', 'кто залогинен', 'who', 'last', 'посмотри в /var/log', 'проверь диск', 'мониторинг', 'выполни на 192.168.1', 'запусти скрипт', 'cron' ] # Триггеры для Cron-задач CRON_TRIGGERS = [ 'напомни', 'запланируй', 'каждый день', 'каждый час', 'периодически', 'по расписанию', 'автоматически', 'создай задачу', 'добавь в cron', 'регулярно', 'повторяй', 'каждую неделю', 'ежедневно', 'ежечасно' ] # Триггеры для работы с файлами (File System Tool) FILE_SYSTEM_TRIGGERS = [ 'прочитай файл', 'покажи файл', 'открой файл', 'посмотри файл', 'создай файл', 'запиши в файл', 'сохрани в файл', 'скопируй файл', 'перемести файл', 'удали файл', 'создай директорию', 'создай папку', 'покажи директорию', 'список файлов', 'что в папке', 'что в директории', 'найди файл', 'поиск файла', 'переименуй файл', 'посмотри содержимое', 'содержимое файла', 'cat ', 'ls ', 'mkdir ', 'cp ', 'mv ', 'rm ', 'touch ', 'сохрани текст', 'запиши текст', 'скопируй', 'перемести', 'удали директорию', 'удали папку', 'покажи файлы' ] def __init__(self): self.registry = tools_registry self._tool_use_history: List[Dict] = [] self._user_preferences: Dict[int, Dict] = {} # preferences per user def _should_search(self, message: str) -> tuple[bool, float]: """Проверить, нужен ли поиск в интернете.""" message_lower = message.lower() score = 0.0 # Прямые триггеры — высокий приоритет for trigger in self.SEARCH_TRIGGERS: if trigger in message_lower: return True, 0.9 # Вопросы с "что", "как", "где", "когда" о внешних фактах question_words = ['что такое', 'как сделать', 'где найти', 'когда будет'] for qword in question_words: if qword in message_lower: score = max(score, 0.7) # Упоминания текущих событий current_events = ['сегодня', 'сейчас', 'в этом году', 'recent', 'latest', '2024', '2025', '2026'] for event in current_events: if event in message_lower: score = max(score, 0.6) # Если есть вопросительные слова + внешние факты if any(word in message_lower for word in ['почему', 'зачем', 'как работает']): score = max(score, 0.65) return score >= 0.65, score def _should_read_rss(self, message: str) -> tuple[bool, float]: """Проверить, нужно ли читать RSS ленты. ВАЖНО: Используем ТОЛЬКО полные фразы-триггеры. Отдельные слова (типа "новости") НЕ активируют RSS — это предотвращает ложные срабатывания когда пользователь просто упоминает слово в контексте. """ message_lower = message.lower() # Только прямые фразы-триггеры — высокий порог for trigger in self.RSS_TRIGGERS: if trigger in message_lower: return True, 0.95 # Отдельные ключевые слова НЕ проверяем — только явные фразы! # Это предотвращает срабатывание на сообщения типа: # - "новости" (просто упомянул слово) # - "н.овости" (разбитое слово) # - "я читал новости вчера" (прошедшее время, не запрос) return False, 0.0 def _should_use_ssh(self, message: str) -> tuple[bool, float]: """Проверить, нужна ли SSH-команда.""" message_lower = message.lower() score = 0.0 # Прямые триггеры for trigger in self.SSH_TRIGGERS: if trigger in message_lower: return True, 0.9 # Команды системного администрирования sysadmin_tasks = ['проверь', 'посмотри', 'покажи', 'выполни', 'запусти'] sysadmin_objects = ['сервер', 'лог', 'процесс', 'диск', 'память', 'сервис', 'демон'] has_task = any(task in message_lower for task in sysadmin_tasks) has_object = any(obj in message_lower for obj in sysadmin_objects) if has_task and has_object: score = max(score, 0.75) # Упоминания конкретных утилит utils = ['systemctl', 'journalctl', 'top', 'htop', 'df', 'du', 'free', 'ps', 'netstat'] for util in utils: if util in message_lower: score = max(score, 0.8) return score >= 0.75, score def _should_use_cron(self, message: str) -> tuple[bool, float]: """Проверить, нужна ли cron-задача.""" message_lower = message.lower() score = 0.0 # Прямые триггеры for trigger in self.CRON_TRIGGERS: if trigger in message_lower: return True, 0.85 # Расписания schedules = ['каждый', 'каждую', 'ежедневно', 'ежечасно', 'еженедельно', 'раз в'] for sched in schedules: if sched in message_lower: score = max(score, 0.8) # Напоминания и периодические задачи if any(word in message_lower for word in ['напомни', 'запланируй', 'повторяй']): score = max(score, 0.85) return score >= 0.8, score def _should_use_file_system(self, message: str) -> tuple[bool, float]: """Проверить, нужна ли операция с файловой системой.""" message_lower = message.lower() score = 0.0 # Прямые триггеры for trigger in self.FILE_SYSTEM_TRIGGERS: if trigger in message_lower: return True, 0.9 # Операции с файлами file_operations = ['прочитай', 'покажи', 'создай', 'запиши', 'скопируй', 'перемести', 'удали', 'открой'] file_objects = ['файл', 'директорию', 'папку', 'документ', 'текст', 'содержимое'] has_op = any(op in message_lower for op in file_operations) has_obj = any(obj in message_lower for obj in file_objects) if has_op and has_obj: score = max(score, 0.75) # Упоминания конкретных команд commands = ['cat', 'ls', 'mkdir', 'cp', 'mv', 'rm', 'touch', 'pwd'] for cmd in commands: if f'{cmd} ' in message_lower or message_lower.endswith(cmd): score = max(score, 0.85) return score >= 0.75, score async def decide(self, message: str, context: Optional[Dict] = None) -> AgentDecision: """ Принять решение об использовании инструмента. Args: message: Сообщение пользователя context: Дополнительный контекст (история, состояние) Returns: AgentDecision с решением агента """ user_id = context.get('user_id') if context else None # Приоритет: File System > SSH > Cron > Поиск > RSS # Проверяем в порядке приоритета # 1. Проверка на операции с файловой системой (ВЫСОКИЙ ПРИОРИТЕТ) should_fs, fs_conf = self._should_use_file_system(message) if should_fs and fs_conf > 0.75: return AgentDecision( should_use_tool=True, tool_name='file_system_tool', tool_args=self._extract_file_system_args(message), confidence=fs_conf, reasoning='Пользователю нужно выполнить операцию с файлами' ) # 2. Проверка на SSH-команды (системные задачи) should_ssh, ssh_conf = self._should_use_ssh(message) if should_ssh and ssh_conf > 0.75: return AgentDecision( should_use_tool=True, tool_name='ssh_tool', tool_args={'command': self._extract_ssh_command(message)}, confidence=ssh_conf, reasoning='Пользователю нужно выполнить команду на сервере' ) # 3. Проверка на Cron-задачи (планирование) should_cron, cron_conf = self._should_use_cron(message) if should_cron and cron_conf > 0.75: return AgentDecision( should_use_tool=True, tool_name='cron_tool', tool_args={'action': 'list'}, # Показываем список задач confidence=cron_conf, reasoning='Пользователь хочет создать или управлять задачей' ) # 4. Проверка на поиск should_search, search_conf = self._should_search(message) if should_search and search_conf > 0.7: query = self._extract_search_query(message) return AgentDecision( should_use_tool=True, tool_name='ddgs_tool', tool_args={'query': query, 'max_results': 5}, confidence=search_conf, reasoning='Пользователю нужна информация из интернета' ) # 5. Проверка на RSS — только явные запросы should_rss, rss_conf = self._should_read_rss(message) if should_rss: # Порог уже проверен в _should_read_rss (0.95) return AgentDecision( should_use_tool=True, tool_name='rss_tool', tool_args={'action': 'list', 'limit': 10, 'undigested_only': True}, confidence=rss_conf, reasoning='Пользователь хочет прочитать новости из лент' ) # Инструменты не нужны return AgentDecision( should_use_tool=False, confidence=0.0, reasoning='Инструменты не требуются' ) def _extract_search_query(self, message: str) -> str: """Извлечь поисковый запрос из сообщения.""" triggers_to_remove = self.SEARCH_TRIGGERS + ['покажи', 'напиши', 'дай', 'расскажи', 'хочу', 'надо', 'нужно'] query = message.lower() for trigger in triggers_to_remove: query = query.replace(trigger, '') query = query.strip(' ?:.,!') if not query: query = message return query.strip() def _extract_ssh_command(self, message: str) -> str: """Извлечь SSH-команду из сообщения.""" message_lower = message.lower() # Если есть явная команда в кавычках import re quoted = re.search(r'["\']([^"\']+)["\']', message) if quoted: return quoted.group(1).strip() # Если команда после триггера for trigger in ['выполни команду', 'запусти', 'ssh']: if trigger in message_lower: idx = message_lower.find(trigger) return message[idx + len(trigger):].strip() # Возвращаем оригинальное сообщение как команду return message def _extract_file_system_args(self, message: str) -> Dict[str, Any]: """ Извлечь аргументы для file_system_tool из сообщения. Возвращает dict с operation и другими параметрами. """ import re message_lower = message.lower() # Определяем операцию по триггерам operation_map = { 'прочитай файл': 'read', 'покажи файл': 'read', 'открой файл': 'read', 'посмотри файл': 'read', 'посмотри содержимое': 'read', 'содержимое файла': 'read', 'cat ': 'read', 'создай файл': 'write', 'запиши в файл': 'write', 'сохрани в файл': 'write', 'сохрани текст': 'write', 'запиши текст': 'write', 'touch ': 'write', 'скопируй файл': 'copy', 'скопируй': 'copy', 'cp ': 'copy', 'перемести файл': 'move', 'перемести': 'move', 'mv ': 'move', 'переименуй файл': 'move', # Переименование = перемещение 'удали файл': 'delete', 'удали директорию': 'delete', 'удали папку': 'delete', 'rm ': 'delete', 'создай директорию': 'mkdir', 'создай папку': 'mkdir', 'mkdir ': 'mkdir', 'покажи директорию': 'list', 'список файлов': 'list', 'что в папке': 'list', 'что в директории': 'list', 'покажи файлы': 'list', 'ls ': 'list', 'найди файл': 'search', 'поиск файла': 'search', } # Определяем операцию operation = 'shell' # по умолчанию for trigger, op in operation_map.items(): if trigger in message_lower: operation = op break # Извлекаем путь (после команды) path = None source = None destination = None content = None # Паттерн для извлечения пути после команды for cmd in ['cat', 'ls', 'mkdir', 'rm', 'touch']: match = re.search(rf'{cmd}\s+([^\s]+)', message_lower) if match: path = match.group(1).strip() break # Для copy/move ищем два пути if operation in ('copy', 'move'): # Ищем паттерн "X в Y" или "X Y" match = re.search(r'([^\s]+)\s+(?:в|into|to)\s+([^\s]+)', message_lower) if match: source = match.group(1).strip() destination = match.group(2).strip() else: # Просто два слова подряд parts = message.split() for i, part in enumerate(parts): if part.lower() in ['cp', 'mv', 'copy', 'move', 'скопируй', 'перемести']: if i + 2 < len(parts): source = parts[i + 1].strip() destination = parts[i + 2].strip() break # Для write пытаемся извлечь содержимое if operation == 'write': # Ищем текст после "сохрани" или "запиши" match = re.search(r'(?:сохрани|запиши)\s*(?:в файл|текст)?\s*[:\-]?\s*(.+)', message, re.IGNORECASE) if match: content = match.group(1).strip() # Если есть кавычки - извлекаем содержимое quoted = re.search(r'["\']([^"\']+)["\']', message) if quoted: content = quoted.group(1) # Для search ищем паттерн pattern = '*' if operation == 'search': match = re.search(r'pattern\s*[=:]\s*([^\s]+)', message_lower) if match: pattern = match.group(1).strip() # Или ищем *.extension glob_match = re.search(r'\*\.[^\s]+', message_lower) if glob_match: pattern = glob_match.group(0).strip() # Формируем аргументы args = {'operation': operation} if path: args['path'] = path if source: args['source'] = source if destination: args['destination'] = destination if content: args['content'] = content if pattern and operation == 'search': args['pattern'] = pattern # Если путь не найден, пробуем извлечь общее слово после операции if not path and not source: words = message.split() for i, word in enumerate(words): if word.lower() in ['cat', 'ls', 'mkdir', 'rm', 'touch', 'read', 'write', 'delete', 'list']: if i + 1 < len(words): args['path'] = words[i + 1].strip() break logger.info(f"Извлечены аргументы file_system: {args}") return args async def execute_tool(self, tool_name: str, **kwargs) -> ToolResult: """Выполнить инструмент и сохранить историю.""" logger.info(f"🤖 AI-агент выполняет инструмент: {tool_name} с аргументами: {kwargs}") result = await self.registry.execute_tool(tool_name, **kwargs) # Сохраняем историю использования self._tool_use_history.append({ 'tool_name': tool_name, 'args': kwargs, 'result': result.to_dict(), 'timestamp': datetime.now().isoformat() }) # Ограничиваем историю if len(self._tool_use_history) > 100: self._tool_use_history = self._tool_use_history[-50:] logger.info(f"✅ Инструмент {tool_name} выполнен: success={result.success}") return result def get_tool_history(self, limit: int = 10) -> List[Dict]: """Получить историю использования инструментов.""" return self._tool_use_history[-limit:] def set_user_preference(self, user_id: int, preference: str, value: Any): """Установить предпочтение пользователя для инструментов.""" if user_id not in self._user_preferences: self._user_preferences[user_id] = {} self._user_preferences[user_id][preference] = value logger.info(f"Установлено предпочтение для пользователя {user_id}: {preference} = {value}") def get_user_preference(self, user_id: int, preference: str, default: Any = None) -> Any: """Получить предпочтение пользователя.""" return self._user_preferences.get(user_id, {}).get(preference, default) # Глобальный агент ai_agent = AIAgent()