514 lines
23 KiB
Python
514 lines
23 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
AI Agent Module - автономный агент с инструментами.
|
||
|
||
Агент может самостоятельно принимать решения об использовании инструментов
|
||
на основе контекста запроса пользователя.
|
||
"""
|
||
|
||
import logging
|
||
from typing import Optional, List, Dict, Any
|
||
from dataclasses import dataclass
|
||
from datetime import datetime
|
||
|
||
from bot.tools import tools_registry, ToolResult
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
@dataclass
|
||
class AgentDecision:
|
||
"""Решение агента об использовании инструмента."""
|
||
should_use_tool: bool
|
||
tool_name: Optional[str] = None
|
||
tool_args: Optional[Dict[str, Any]] = None
|
||
confidence: float = 0.0
|
||
reasoning: str = ""
|
||
|
||
|
||
class AIAgent:
|
||
"""
|
||
AI-агент с доступом к инструментам.
|
||
|
||
Агент анализирует запрос и решает, нужно ли использовать
|
||
какой-либо инструмент для выполнения задачи.
|
||
"""
|
||
|
||
# Триггеры для поиска в интернете
|
||
SEARCH_TRIGGERS = [
|
||
'найди', 'поиск', 'погугли', 'узнай', 'проверь в интернете',
|
||
'что нового', 'последние новости', 'свежая информация',
|
||
'как сделать', 'руководство', 'документация', 'tutorial',
|
||
'weather', 'news', 'search', 'find', 'look up',
|
||
'что это', 'кто такой', 'где находится', 'когда выйдет',
|
||
'скачай', 'загрузи', 'найди информацию', 'посмотри в сети'
|
||
]
|
||
|
||
# Триггеры для RSS — ТОЛЬКО явные запросы новостей
|
||
# Используем полные фразы чтобы избежать ложных срабатываний
|
||
RSS_TRIGGERS = [
|
||
'почитай новости', 'покажи новости', 'что нового в linux',
|
||
'новости it', 'tech news', 'opensource новости', 'linux новости',
|
||
'новости технологий', 'rss лента', 'дайджест новостей',
|
||
'свежие новости it', 'последние новости it', 'новости linux',
|
||
'it новости', 'новости opensource', 'лента новостей'
|
||
]
|
||
|
||
# Триггеры для SSH-команд
|
||
SSH_TRIGGERS = [
|
||
'выполни команду', 'ssh', 'запусти на сервере', 'проверь сервер',
|
||
'посмотри логи', 'покажи процесс', 'сколько места', 'df', 'top',
|
||
'перезапусти', 'останови', 'запусти сервис', 'systemctl',
|
||
'проверь нагрузку', 'uptime', 'кто залогинен', 'who', 'last',
|
||
'посмотри в /var/log', 'проверь диск', 'мониторинг',
|
||
'выполни на 192.168.1', 'запусти скрипт', 'cron'
|
||
]
|
||
|
||
# Триггеры для Cron-задач
|
||
CRON_TRIGGERS = [
|
||
'напомни', 'запланируй', 'каждый день', 'каждый час',
|
||
'периодически', 'по расписанию', 'автоматически',
|
||
'создай задачу', 'добавь в cron', 'регулярно',
|
||
'повторяй', 'каждую неделю', 'ежедневно', 'ежечасно'
|
||
]
|
||
|
||
# Триггеры для работы с файлами (File System Tool)
|
||
FILE_SYSTEM_TRIGGERS = [
|
||
'прочитай файл', 'покажи файл', 'открой файл', 'посмотри файл',
|
||
'создай файл', 'запиши в файл', 'сохрани в файл',
|
||
'скопируй файл', 'перемести файл', 'удали файл',
|
||
'создай директорию', 'создай папку', 'покажи директорию',
|
||
'список файлов', 'что в папке', 'что в директории',
|
||
'найди файл', 'поиск файла', 'переименуй файл',
|
||
'посмотри содержимое', 'содержимое файла', 'cat ',
|
||
'ls ', 'mkdir ', 'cp ', 'mv ', 'rm ', 'touch ',
|
||
'сохрани текст', 'запиши текст', 'скопируй', 'перемести',
|
||
'удали директорию', 'удали папку', 'покажи файлы'
|
||
]
|
||
|
||
def __init__(self):
|
||
self.registry = tools_registry
|
||
self._tool_use_history: List[Dict] = []
|
||
self._user_preferences: Dict[int, Dict] = {} # preferences per user
|
||
|
||
def _should_search(self, message: str) -> tuple[bool, float]:
|
||
"""Проверить, нужен ли поиск в интернете."""
|
||
message_lower = message.lower()
|
||
score = 0.0
|
||
|
||
# Прямые триггеры — высокий приоритет
|
||
for trigger in self.SEARCH_TRIGGERS:
|
||
if trigger in message_lower:
|
||
return True, 0.9
|
||
|
||
# Вопросы с "что", "как", "где", "когда" о внешних фактах
|
||
question_words = ['что такое', 'как сделать', 'где найти', 'когда будет']
|
||
for qword in question_words:
|
||
if qword in message_lower:
|
||
score = max(score, 0.7)
|
||
|
||
# Упоминания текущих событий
|
||
current_events = ['сегодня', 'сейчас', 'в этом году', 'recent', 'latest', '2024', '2025', '2026']
|
||
for event in current_events:
|
||
if event in message_lower:
|
||
score = max(score, 0.6)
|
||
|
||
# Если есть вопросительные слова + внешние факты
|
||
if any(word in message_lower for word in ['почему', 'зачем', 'как работает']):
|
||
score = max(score, 0.65)
|
||
|
||
return score >= 0.65, score
|
||
|
||
def _should_read_rss(self, message: str) -> tuple[bool, float]:
|
||
"""Проверить, нужно ли читать RSS ленты.
|
||
|
||
ВАЖНО: Используем ТОЛЬКО полные фразы-триггеры.
|
||
Отдельные слова (типа "новости") НЕ активируют RSS — это предотвращает
|
||
ложные срабатывания когда пользователь просто упоминает слово в контексте.
|
||
"""
|
||
message_lower = message.lower()
|
||
|
||
# Только прямые фразы-триггеры — высокий порог
|
||
for trigger in self.RSS_TRIGGERS:
|
||
if trigger in message_lower:
|
||
return True, 0.95
|
||
|
||
# Отдельные ключевые слова НЕ проверяем — только явные фразы!
|
||
# Это предотвращает срабатывание на сообщения типа:
|
||
# - "новости" (просто упомянул слово)
|
||
# - "н.овости" (разбитое слово)
|
||
# - "я читал новости вчера" (прошедшее время, не запрос)
|
||
|
||
return False, 0.0
|
||
|
||
def _should_use_ssh(self, message: str) -> tuple[bool, float]:
|
||
"""Проверить, нужна ли SSH-команда."""
|
||
message_lower = message.lower()
|
||
score = 0.0
|
||
|
||
# Прямые триггеры
|
||
for trigger in self.SSH_TRIGGERS:
|
||
if trigger in message_lower:
|
||
return True, 0.9
|
||
|
||
# Команды системного администрирования
|
||
sysadmin_tasks = ['проверь', 'посмотри', 'покажи', 'выполни', 'запусти']
|
||
sysadmin_objects = ['сервер', 'лог', 'процесс', 'диск', 'память', 'сервис', 'демон']
|
||
|
||
has_task = any(task in message_lower for task in sysadmin_tasks)
|
||
has_object = any(obj in message_lower for obj in sysadmin_objects)
|
||
|
||
if has_task and has_object:
|
||
score = max(score, 0.75)
|
||
|
||
# Упоминания конкретных утилит
|
||
utils = ['systemctl', 'journalctl', 'top', 'htop', 'df', 'du', 'free', 'ps', 'netstat']
|
||
for util in utils:
|
||
if util in message_lower:
|
||
score = max(score, 0.8)
|
||
|
||
return score >= 0.75, score
|
||
|
||
def _should_use_cron(self, message: str) -> tuple[bool, float]:
|
||
"""Проверить, нужна ли cron-задача."""
|
||
message_lower = message.lower()
|
||
score = 0.0
|
||
|
||
# Прямые триггеры
|
||
for trigger in self.CRON_TRIGGERS:
|
||
if trigger in message_lower:
|
||
return True, 0.85
|
||
|
||
# Расписания
|
||
schedules = ['каждый', 'каждую', 'ежедневно', 'ежечасно', 'еженедельно', 'раз в']
|
||
for sched in schedules:
|
||
if sched in message_lower:
|
||
score = max(score, 0.8)
|
||
|
||
# Напоминания и периодические задачи
|
||
if any(word in message_lower for word in ['напомни', 'запланируй', 'повторяй']):
|
||
score = max(score, 0.85)
|
||
|
||
return score >= 0.8, score
|
||
|
||
def _should_use_file_system(self, message: str) -> tuple[bool, float]:
|
||
"""Проверить, нужна ли операция с файловой системой."""
|
||
message_lower = message.lower()
|
||
score = 0.0
|
||
|
||
# Прямые триггеры
|
||
for trigger in self.FILE_SYSTEM_TRIGGERS:
|
||
if trigger in message_lower:
|
||
return True, 0.9
|
||
|
||
# Операции с файлами
|
||
file_operations = ['прочитай', 'покажи', 'создай', 'запиши', 'скопируй', 'перемести', 'удали', 'открой']
|
||
file_objects = ['файл', 'директорию', 'папку', 'документ', 'текст', 'содержимое']
|
||
|
||
has_op = any(op in message_lower for op in file_operations)
|
||
has_obj = any(obj in message_lower for obj in file_objects)
|
||
|
||
if has_op and has_obj:
|
||
score = max(score, 0.75)
|
||
|
||
# Упоминания конкретных команд
|
||
commands = ['cat', 'ls', 'mkdir', 'cp', 'mv', 'rm', 'touch', 'pwd']
|
||
for cmd in commands:
|
||
if f'{cmd} ' in message_lower or message_lower.endswith(cmd):
|
||
score = max(score, 0.85)
|
||
|
||
return score >= 0.75, score
|
||
|
||
async def decide(self, message: str, context: Optional[Dict] = None) -> AgentDecision:
|
||
"""
|
||
Принять решение об использовании инструмента.
|
||
|
||
Args:
|
||
message: Сообщение пользователя
|
||
context: Дополнительный контекст (история, состояние)
|
||
|
||
Returns:
|
||
AgentDecision с решением агента
|
||
"""
|
||
user_id = context.get('user_id') if context else None
|
||
|
||
# Приоритет: File System > SSH > Cron > Поиск > RSS
|
||
# Проверяем в порядке приоритета
|
||
|
||
# 1. Проверка на операции с файловой системой (ВЫСОКИЙ ПРИОРИТЕТ)
|
||
should_fs, fs_conf = self._should_use_file_system(message)
|
||
if should_fs and fs_conf > 0.75:
|
||
return AgentDecision(
|
||
should_use_tool=True,
|
||
tool_name='file_system_tool',
|
||
tool_args=self._extract_file_system_args(message),
|
||
confidence=fs_conf,
|
||
reasoning='Пользователю нужно выполнить операцию с файлами'
|
||
)
|
||
|
||
# 2. Проверка на SSH-команды (системные задачи)
|
||
should_ssh, ssh_conf = self._should_use_ssh(message)
|
||
if should_ssh and ssh_conf > 0.75:
|
||
return AgentDecision(
|
||
should_use_tool=True,
|
||
tool_name='ssh_tool',
|
||
tool_args={'command': self._extract_ssh_command(message)},
|
||
confidence=ssh_conf,
|
||
reasoning='Пользователю нужно выполнить команду на сервере'
|
||
)
|
||
|
||
# 3. Проверка на Cron-задачи (планирование)
|
||
should_cron, cron_conf = self._should_use_cron(message)
|
||
if should_cron and cron_conf > 0.75:
|
||
return AgentDecision(
|
||
should_use_tool=True,
|
||
tool_name='cron_tool',
|
||
tool_args={'action': 'list'}, # Показываем список задач
|
||
confidence=cron_conf,
|
||
reasoning='Пользователь хочет создать или управлять задачей'
|
||
)
|
||
|
||
# 4. Проверка на поиск
|
||
should_search, search_conf = self._should_search(message)
|
||
if should_search and search_conf > 0.7:
|
||
query = self._extract_search_query(message)
|
||
return AgentDecision(
|
||
should_use_tool=True,
|
||
tool_name='ddgs_tool',
|
||
tool_args={'query': query, 'max_results': 5},
|
||
confidence=search_conf,
|
||
reasoning='Пользователю нужна информация из интернета'
|
||
)
|
||
|
||
# 5. Проверка на RSS — только явные запросы
|
||
should_rss, rss_conf = self._should_read_rss(message)
|
||
if should_rss: # Порог уже проверен в _should_read_rss (0.95)
|
||
return AgentDecision(
|
||
should_use_tool=True,
|
||
tool_name='rss_tool',
|
||
tool_args={'action': 'list', 'limit': 10, 'undigested_only': True},
|
||
confidence=rss_conf,
|
||
reasoning='Пользователь хочет прочитать новости из лент'
|
||
)
|
||
|
||
# Инструменты не нужны
|
||
return AgentDecision(
|
||
should_use_tool=False,
|
||
confidence=0.0,
|
||
reasoning='Инструменты не требуются'
|
||
)
|
||
|
||
def _extract_search_query(self, message: str) -> str:
|
||
"""Извлечь поисковый запрос из сообщения."""
|
||
triggers_to_remove = self.SEARCH_TRIGGERS + ['покажи', 'напиши', 'дай', 'расскажи', 'хочу', 'надо', 'нужно']
|
||
|
||
query = message.lower()
|
||
for trigger in triggers_to_remove:
|
||
query = query.replace(trigger, '')
|
||
|
||
query = query.strip(' ?:.,!')
|
||
|
||
if not query:
|
||
query = message
|
||
|
||
return query.strip()
|
||
|
||
def _extract_ssh_command(self, message: str) -> str:
|
||
"""Извлечь SSH-команду из сообщения."""
|
||
message_lower = message.lower()
|
||
|
||
# Если есть явная команда в кавычках
|
||
import re
|
||
quoted = re.search(r'["\']([^"\']+)["\']', message)
|
||
if quoted:
|
||
return quoted.group(1).strip()
|
||
|
||
# Если команда после триггера
|
||
for trigger in ['выполни команду', 'запусти', 'ssh']:
|
||
if trigger in message_lower:
|
||
idx = message_lower.find(trigger)
|
||
return message[idx + len(trigger):].strip()
|
||
|
||
# Возвращаем оригинальное сообщение как команду
|
||
return message
|
||
|
||
def _extract_file_system_args(self, message: str) -> Dict[str, Any]:
|
||
"""
|
||
Извлечь аргументы для file_system_tool из сообщения.
|
||
|
||
Возвращает dict с operation и другими параметрами.
|
||
"""
|
||
import re
|
||
message_lower = message.lower()
|
||
|
||
# Определяем операцию по триггерам
|
||
operation_map = {
|
||
'прочитай файл': 'read',
|
||
'покажи файл': 'read',
|
||
'открой файл': 'read',
|
||
'посмотри файл': 'read',
|
||
'посмотри содержимое': 'read',
|
||
'содержимое файла': 'read',
|
||
'cat ': 'read',
|
||
|
||
'создай файл': 'write',
|
||
'запиши в файл': 'write',
|
||
'сохрани в файл': 'write',
|
||
'сохрани текст': 'write',
|
||
'запиши текст': 'write',
|
||
'touch ': 'write',
|
||
|
||
'скопируй файл': 'copy',
|
||
'скопируй': 'copy',
|
||
'cp ': 'copy',
|
||
|
||
'перемести файл': 'move',
|
||
'перемести': 'move',
|
||
'mv ': 'move',
|
||
'переименуй файл': 'move', # Переименование = перемещение
|
||
|
||
'удали файл': 'delete',
|
||
'удали директорию': 'delete',
|
||
'удали папку': 'delete',
|
||
'rm ': 'delete',
|
||
|
||
'создай директорию': 'mkdir',
|
||
'создай папку': 'mkdir',
|
||
'mkdir ': 'mkdir',
|
||
|
||
'покажи директорию': 'list',
|
||
'список файлов': 'list',
|
||
'что в папке': 'list',
|
||
'что в директории': 'list',
|
||
'покажи файлы': 'list',
|
||
'ls ': 'list',
|
||
|
||
'найди файл': 'search',
|
||
'поиск файла': 'search',
|
||
}
|
||
|
||
# Определяем операцию
|
||
operation = 'shell' # по умолчанию
|
||
for trigger, op in operation_map.items():
|
||
if trigger in message_lower:
|
||
operation = op
|
||
break
|
||
|
||
# Извлекаем путь (после команды)
|
||
path = None
|
||
source = None
|
||
destination = None
|
||
content = None
|
||
|
||
# Паттерн для извлечения пути после команды
|
||
for cmd in ['cat', 'ls', 'mkdir', 'rm', 'touch']:
|
||
match = re.search(rf'{cmd}\s+([^\s]+)', message_lower)
|
||
if match:
|
||
path = match.group(1).strip()
|
||
break
|
||
|
||
# Для copy/move ищем два пути
|
||
if operation in ('copy', 'move'):
|
||
# Ищем паттерн "X в Y" или "X Y"
|
||
match = re.search(r'([^\s]+)\s+(?:в|into|to)\s+([^\s]+)', message_lower)
|
||
if match:
|
||
source = match.group(1).strip()
|
||
destination = match.group(2).strip()
|
||
else:
|
||
# Просто два слова подряд
|
||
parts = message.split()
|
||
for i, part in enumerate(parts):
|
||
if part.lower() in ['cp', 'mv', 'copy', 'move', 'скопируй', 'перемести']:
|
||
if i + 2 < len(parts):
|
||
source = parts[i + 1].strip()
|
||
destination = parts[i + 2].strip()
|
||
break
|
||
|
||
# Для write пытаемся извлечь содержимое
|
||
if operation == 'write':
|
||
# Ищем текст после "сохрани" или "запиши"
|
||
match = re.search(r'(?:сохрани|запиши)\s*(?:в файл|текст)?\s*[:\-]?\s*(.+)', message, re.IGNORECASE)
|
||
if match:
|
||
content = match.group(1).strip()
|
||
# Если есть кавычки - извлекаем содержимое
|
||
quoted = re.search(r'["\']([^"\']+)["\']', message)
|
||
if quoted:
|
||
content = quoted.group(1)
|
||
|
||
# Для search ищем паттерн
|
||
pattern = '*'
|
||
if operation == 'search':
|
||
match = re.search(r'pattern\s*[=:]\s*([^\s]+)', message_lower)
|
||
if match:
|
||
pattern = match.group(1).strip()
|
||
# Или ищем *.extension
|
||
glob_match = re.search(r'\*\.[^\s]+', message_lower)
|
||
if glob_match:
|
||
pattern = glob_match.group(0).strip()
|
||
|
||
# Формируем аргументы
|
||
args = {'operation': operation}
|
||
|
||
if path:
|
||
args['path'] = path
|
||
if source:
|
||
args['source'] = source
|
||
if destination:
|
||
args['destination'] = destination
|
||
if content:
|
||
args['content'] = content
|
||
if pattern and operation == 'search':
|
||
args['pattern'] = pattern
|
||
|
||
# Если путь не найден, пробуем извлечь общее слово после операции
|
||
if not path and not source:
|
||
words = message.split()
|
||
for i, word in enumerate(words):
|
||
if word.lower() in ['cat', 'ls', 'mkdir', 'rm', 'touch', 'read', 'write', 'delete', 'list']:
|
||
if i + 1 < len(words):
|
||
args['path'] = words[i + 1].strip()
|
||
break
|
||
|
||
logger.info(f"Извлечены аргументы file_system: {args}")
|
||
return args
|
||
|
||
async def execute_tool(self, tool_name: str, **kwargs) -> ToolResult:
|
||
"""Выполнить инструмент и сохранить историю."""
|
||
logger.info(f"🤖 AI-агент выполняет инструмент: {tool_name} с аргументами: {kwargs}")
|
||
|
||
result = await self.registry.execute_tool(tool_name, **kwargs)
|
||
|
||
# Сохраняем историю использования
|
||
self._tool_use_history.append({
|
||
'tool_name': tool_name,
|
||
'args': kwargs,
|
||
'result': result.to_dict(),
|
||
'timestamp': datetime.now().isoformat()
|
||
})
|
||
|
||
# Ограничиваем историю
|
||
if len(self._tool_use_history) > 100:
|
||
self._tool_use_history = self._tool_use_history[-50:]
|
||
|
||
logger.info(f"✅ Инструмент {tool_name} выполнен: success={result.success}")
|
||
return result
|
||
|
||
def get_tool_history(self, limit: int = 10) -> List[Dict]:
|
||
"""Получить историю использования инструментов."""
|
||
return self._tool_use_history[-limit:]
|
||
|
||
def set_user_preference(self, user_id: int, preference: str, value: Any):
|
||
"""Установить предпочтение пользователя для инструментов."""
|
||
if user_id not in self._user_preferences:
|
||
self._user_preferences[user_id] = {}
|
||
self._user_preferences[user_id][preference] = value
|
||
logger.info(f"Установлено предпочтение для пользователя {user_id}: {preference} = {value}")
|
||
|
||
def get_user_preference(self, user_id: int, preference: str, default: Any = None) -> Any:
|
||
"""Получить предпочтение пользователя."""
|
||
return self._user_preferences.get(user_id, {}).get(preference, default)
|
||
|
||
|
||
# Глобальный агент
|
||
ai_agent = AIAgent()
|