telegram-cli-bot/bot/ai_agent.py

532 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
AI Agent Module - автономный агент с инструментами.
Агент может самостоятельно принимать решения об использовании инструментов
на основе контекста запроса пользователя.
"""
import logging
import re
from typing import Optional, List, Dict, Any
from dataclasses import dataclass
from datetime import datetime
from bot.tools import tools_registry, ToolResult
logger = logging.getLogger(__name__)
@dataclass
class AgentDecision:
"""Решение агента об использовании инструмента."""
should_use_tool: bool
tool_name: Optional[str] = None
tool_args: Optional[Dict[str, Any]] = None
confidence: float = 0.0
reasoning: str = ""
class AIAgent:
"""
AI-агент с доступом к инструментам.
Агент анализирует запрос и решает, нужно ли использовать
какой-либо инструмент для выполнения задачи.
"""
# Триггеры для поиска в интернете
SEARCH_TRIGGERS = [
'найди', 'поиск', 'погугли', 'узнай', 'проверь в интернете',
'что нового', 'последние новости', 'свежая информация',
'как сделать', 'руководство', 'документация', 'tutorial',
'weather', 'news', 'search', 'find', 'look up',
'что это', 'кто такой', 'где находится', 'когда выйдет',
'скачай', 'загрузи', 'найди информацию', 'посмотри в сети'
]
# Триггеры для RSS — ТОЛЬКО явные запросы новостей
# Используем полные фразы чтобы избежать ложных срабатываний
RSS_TRIGGERS = [
'почитай новости', 'покажи новости', 'что нового в linux',
'новости it', 'tech news', 'opensource новости', 'linux новости',
'новости технологий', 'rss лента', 'дайджест новостей',
'свежие новости it', 'последние новости it', 'новости linux',
'it новости', 'новости opensource', 'лента новостей'
]
# Триггеры для SSH-команд
SSH_TRIGGERS = [
'выполни команду', 'ssh', 'запусти на сервере', 'проверь сервер',
'посмотри логи', 'покажи процесс', 'сколько места', 'df', 'top',
'перезапусти', 'останови', 'запусти сервис', 'systemctl',
'проверь нагрузку', 'uptime', 'кто залогинен', 'who', 'last',
'посмотри в /var/log', 'проверь диск', 'мониторинг',
'выполни на 192.168.1', 'запусти скрипт', 'cron'
]
# Триггеры для Cron-задач
CRON_TRIGGERS = [
'напомни', 'запланируй', 'каждый день', 'каждый час',
'периодически', 'по расписанию', 'автоматически',
'создай задачу', 'добавь в cron', 'регулярно',
'повторяй', 'каждую неделю', 'ежедневно', 'ежечасно'
]
# Триггеры для работы с файлами (File System Tool)
FILE_SYSTEM_TRIGGERS = [
'прочитай файл', 'покажи файл', 'открой файл', 'посмотри файл',
'создай файл', 'запиши в файл', 'сохрани в файл',
'скопируй файл', 'перемести файл', 'удали файл',
'создай директорию', 'создай папку', 'покажи директорию',
'список файлов', 'что в папке', 'что в директории',
'найди файл', 'поиск файла', 'переименуй файл',
'посмотри содержимое', 'содержимое файла', 'cat ',
'ls ', 'mkdir ', 'cp ', 'mv ', 'rm ', 'touch ',
'сохрани текст', 'запиши текст', 'скопируй', 'перемести',
'удали директорию', 'удали папку', 'покажи файлы'
]
def __init__(self):
self.registry = tools_registry
self._tool_use_history: List[Dict] = []
self._user_preferences: Dict[int, Dict] = {} # preferences per user
def _should_search(self, message: str) -> tuple[bool, float]:
"""Проверить, нужен ли поиск в интернете."""
message_lower = message.lower()
score = 0.0
# Прямые триггеры — высокий приоритет
# Используем паттерн с границами для избежания частичных совпадений
for trigger in self.SEARCH_TRIGGERS:
escaped_trigger = re.escape(trigger)
pattern = rf'(?:^|[\s,\.!?;:])({escaped_trigger})(?:$|[\s,\.!?;:])'
if re.search(pattern, message_lower):
return True, 0.9
# Вопросы с "что", "как", "где", "когда" о внешних фактах
question_words = ['что такое', 'как сделать', 'где найти', 'когда будет']
for qword in question_words:
if qword in message_lower:
score = max(score, 0.7)
# Упоминания текущих событий
current_events = ['сегодня', 'сейчас', 'в этом году', 'recent', 'latest', '2024', '2025', '2026']
for event in current_events:
if event in message_lower:
score = max(score, 0.6)
# Если есть вопросительные слова + внешние факты
if any(word in message_lower for word in ['почему', 'зачем', 'как работает']):
score = max(score, 0.65)
return score >= 0.65, score
def _should_read_rss(self, message: str) -> tuple[bool, float]:
"""Проверить, нужно ли читать RSS ленты.
ВАЖНО: Используем ТОЛЬКО полные фразы-триггеры.
Отдельные слова (типа "новости") НЕ активируют RSS — это предотвращает
ложные срабатывания когда пользователь просто упоминает слово в контексте.
"""
message_lower = message.lower()
# Только прямые фразы-триггеры — высокий порог
# Проверяем чтобы триггер был словом/фразой в контексте, а не частью слова
for trigger in self.RSS_TRIGGERS:
escaped_trigger = re.escape(trigger)
# Паттерн: начало строки ИЛИ пробел/знак препинания перед триггером,
# и конец строки ИЛИ пробел/знак препинания после
pattern = rf'(?:^|[\s,\.!?;:])({escaped_trigger})(?:$|[\s,\.!?;:])'
if re.search(pattern, message_lower):
return True, 0.95
# Отдельные ключевые слова НЕ проверяем — только явные фразы!
# Это предотвращает срабатывание на сообщения типа:
# - "новости" (просто упомянул слово)
# - "н.овости" (разбитое слово)
# - "я читал новости вчера" (прошедшее время, не запрос)
return False, 0.0
def _should_use_ssh(self, message: str) -> tuple[bool, float]:
"""Проверить, нужна ли SSH-команда."""
message_lower = message.lower()
score = 0.0
# Прямые триггеры
# Используем паттерн с границами для избежания частичных совпадений
for trigger in self.SSH_TRIGGERS:
escaped_trigger = re.escape(trigger)
pattern = rf'(?:^|[\s,\.!?;:])({escaped_trigger})(?:$|[\s,\.!?;:])'
if re.search(pattern, message_lower):
return True, 0.9
# Команды системного администрирования
sysadmin_tasks = ['проверь', 'посмотри', 'покажи', 'выполни', 'запусти']
sysadmin_objects = ['сервер', 'лог', 'процесс', 'диск', 'память', 'сервис', 'демон']
has_task = any(task in message_lower for task in sysadmin_tasks)
has_object = any(obj in message_lower for obj in sysadmin_objects)
if has_task and has_object:
score = max(score, 0.75)
# Упоминания конкретных утилит
utils = ['systemctl', 'journalctl', 'top', 'htop', 'df', 'du', 'free', 'ps', 'netstat']
for util in utils:
if util in message_lower:
score = max(score, 0.8)
return score >= 0.75, score
def _should_use_cron(self, message: str) -> tuple[bool, float]:
"""Проверить, нужна ли cron-задача."""
message_lower = message.lower()
score = 0.0
# Прямые триггеры
# Используем паттерн с границами для избежания частичных совпадений
for trigger in self.CRON_TRIGGERS:
escaped_trigger = re.escape(trigger)
pattern = rf'(?:^|[\s,\.!?;:])({escaped_trigger})(?:$|[\s,\.!?;:])'
if re.search(pattern, message_lower):
return True, 0.85
# Расписания
schedules = ['каждый', 'каждую', 'ежедневно', 'ежечасно', 'еженедельно', 'раз в']
for sched in schedules:
if sched in message_lower:
score = max(score, 0.8)
# Напоминания и периодические задачи
if any(word in message_lower for word in ['напомни', 'запланируй', 'повторяй']):
score = max(score, 0.85)
return score >= 0.8, score
def _should_use_file_system(self, message: str) -> tuple[bool, float]:
"""Проверить, нужна ли операция с файловой системой."""
message_lower = message.lower()
score = 0.0
# Прямые триггеры
# Используем паттерн с границами для избежания частичных совпадений
for trigger in self.FILE_SYSTEM_TRIGGERS:
escaped_trigger = re.escape(trigger)
pattern = rf'(?:^|[\s,\.!?;:])({escaped_trigger})(?:$|[\s,\.!?;:])'
if re.search(pattern, message_lower):
return True, 0.9
# Операции с файлами
file_operations = ['прочитай', 'покажи', 'создай', 'запиши', 'скопируй', 'перемести', 'удали', 'открой']
file_objects = ['файл', 'директорию', 'папку', 'документ', 'текст', 'содержимое']
has_op = any(op in message_lower for op in file_operations)
has_obj = any(obj in message_lower for obj in file_objects)
if has_op and has_obj:
score = max(score, 0.75)
# Упоминания конкретных команд
commands = ['cat', 'ls', 'mkdir', 'cp', 'mv', 'rm', 'touch', 'pwd']
for cmd in commands:
if f'{cmd} ' in message_lower or message_lower.endswith(cmd):
score = max(score, 0.85)
return score >= 0.75, score
async def decide(self, message: str, context: Optional[Dict] = None) -> AgentDecision:
"""
Принять решение об использовании инструмента.
Args:
message: Сообщение пользователя
context: Дополнительный контекст (история, состояние)
Returns:
AgentDecision с решением агента
"""
user_id = context.get('user_id') if context else None
# Приоритет: File System > SSH > Cron > Поиск > RSS
# Проверяем в порядке приоритета
# 1. Проверка на операции с файловой системой (ВЫСОКИЙ ПРИОРИТЕТ)
should_fs, fs_conf = self._should_use_file_system(message)
if should_fs and fs_conf > 0.75:
return AgentDecision(
should_use_tool=True,
tool_name='file_system_tool',
tool_args=self._extract_file_system_args(message),
confidence=fs_conf,
reasoning='Пользователю нужно выполнить операцию с файлами'
)
# 2. Проверка на SSH-команды (системные задачи)
should_ssh, ssh_conf = self._should_use_ssh(message)
if should_ssh and ssh_conf > 0.75:
return AgentDecision(
should_use_tool=True,
tool_name='ssh_tool',
tool_args={'command': self._extract_ssh_command(message)},
confidence=ssh_conf,
reasoning='Пользователю нужно выполнить команду на сервере'
)
# 3. Проверка на Cron-задачи (планирование)
should_cron, cron_conf = self._should_use_cron(message)
if should_cron and cron_conf > 0.75:
return AgentDecision(
should_use_tool=True,
tool_name='cron_tool',
tool_args={'action': 'list'}, # Показываем список задач
confidence=cron_conf,
reasoning='Пользователь хочет создать или управлять задачей'
)
# 4. Проверка на поиск
should_search, search_conf = self._should_search(message)
if should_search and search_conf > 0.7:
query = self._extract_search_query(message)
return AgentDecision(
should_use_tool=True,
tool_name='ddgs_tool',
tool_args={'query': query, 'max_results': 5},
confidence=search_conf,
reasoning='Пользователю нужна информация из интернета'
)
# 5. Проверка на RSS — только явные запросы
should_rss, rss_conf = self._should_read_rss(message)
if should_rss: # Порог уже проверен в _should_read_rss (0.95)
return AgentDecision(
should_use_tool=True,
tool_name='rss_tool',
tool_args={'action': 'list', 'limit': 10, 'undigested_only': True},
confidence=rss_conf,
reasoning='Пользователь хочет прочитать новости из лент'
)
# Инструменты не нужны
return AgentDecision(
should_use_tool=False,
confidence=0.0,
reasoning='Инструменты не требуются'
)
def _extract_search_query(self, message: str) -> str:
"""Извлечь поисковый запрос из сообщения."""
triggers_to_remove = self.SEARCH_TRIGGERS + ['покажи', 'напиши', 'дай', 'расскажи', 'хочу', 'надо', 'нужно']
query = message.lower()
for trigger in triggers_to_remove:
query = query.replace(trigger, '')
query = query.strip(' ?:.,!')
if not query:
query = message
return query.strip()
def _extract_ssh_command(self, message: str) -> str:
"""Извлечь SSH-команду из сообщения."""
message_lower = message.lower()
# Если есть явная команда в кавычках
import re
quoted = re.search(r'["\']([^"\']+)["\']', message)
if quoted:
return quoted.group(1).strip()
# Если команда после триггера
for trigger in ['выполни команду', 'запусти', 'ssh']:
if trigger in message_lower:
idx = message_lower.find(trigger)
return message[idx + len(trigger):].strip()
# Возвращаем оригинальное сообщение как команду
return message
def _extract_file_system_args(self, message: str) -> Dict[str, Any]:
"""
Извлечь аргументы для file_system_tool из сообщения.
Возвращает dict с operation и другими параметрами.
"""
import re
message_lower = message.lower()
# Определяем операцию по триггерам
operation_map = {
'прочитай файл': 'read',
'покажи файл': 'read',
'открой файл': 'read',
'посмотри файл': 'read',
'посмотри содержимое': 'read',
'содержимое файла': 'read',
'cat ': 'read',
'создай файл': 'write',
'запиши в файл': 'write',
'сохрани в файл': 'write',
'сохрани текст': 'write',
'запиши текст': 'write',
'touch ': 'write',
'скопируй файл': 'copy',
'скопируй': 'copy',
'cp ': 'copy',
'перемести файл': 'move',
'перемести': 'move',
'mv ': 'move',
'переименуй файл': 'move', # Переименование = перемещение
'удали файл': 'delete',
'удали директорию': 'delete',
'удали папку': 'delete',
'rm ': 'delete',
'создай директорию': 'mkdir',
'создай папку': 'mkdir',
'mkdir ': 'mkdir',
'покажи директорию': 'list',
'список файлов': 'list',
'что в папке': 'list',
'что в директории': 'list',
'покажи файлы': 'list',
'ls ': 'list',
'найди файл': 'search',
'поиск файла': 'search',
}
# Определяем операцию
operation = 'shell' # по умолчанию
for trigger, op in operation_map.items():
if trigger in message_lower:
operation = op
break
# Извлекаем путь (после команды)
path = None
source = None
destination = None
content = None
# Паттерн для извлечения пути после команды
for cmd in ['cat', 'ls', 'mkdir', 'rm', 'touch']:
match = re.search(rf'{cmd}\s+([^\s]+)', message_lower)
if match:
path = match.group(1).strip()
break
# Для copy/move ищем два пути
if operation in ('copy', 'move'):
# Ищем паттерн "X в Y" или "X Y"
match = re.search(r'([^\s]+)\s+(?:в|into|to)\s+([^\s]+)', message_lower)
if match:
source = match.group(1).strip()
destination = match.group(2).strip()
else:
# Просто два слова подряд
parts = message.split()
for i, part in enumerate(parts):
if part.lower() in ['cp', 'mv', 'copy', 'move', 'скопируй', 'перемести']:
if i + 2 < len(parts):
source = parts[i + 1].strip()
destination = parts[i + 2].strip()
break
# Для write пытаемся извлечь содержимое
if operation == 'write':
# Ищем текст после "сохрани" или "запиши"
match = re.search(r'(?:сохрани|запиши)\s*(?:в файл|текст)?\s*[:\-]?\s*(.+)', message, re.IGNORECASE)
if match:
content = match.group(1).strip()
# Если есть кавычки - извлекаем содержимое
quoted = re.search(r'["\']([^"\']+)["\']', message)
if quoted:
content = quoted.group(1)
# Для search ищем паттерн
pattern = '*'
if operation == 'search':
match = re.search(r'pattern\s*[=:]\s*([^\s]+)', message_lower)
if match:
pattern = match.group(1).strip()
# Или ищем *.extension
glob_match = re.search(r'\*\.[^\s]+', message_lower)
if glob_match:
pattern = glob_match.group(0).strip()
# Формируем аргументы
args = {'operation': operation}
if path:
args['path'] = path
if source:
args['source'] = source
if destination:
args['destination'] = destination
if content:
args['content'] = content
if pattern and operation == 'search':
args['pattern'] = pattern
# Если путь не найден, пробуем извлечь общее слово после операции
if not path and not source:
words = message.split()
for i, word in enumerate(words):
if word.lower() in ['cat', 'ls', 'mkdir', 'rm', 'touch', 'read', 'write', 'delete', 'list']:
if i + 1 < len(words):
args['path'] = words[i + 1].strip()
break
logger.info(f"Извлечены аргументы file_system: {args}")
return args
async def execute_tool(self, tool_name: str, **kwargs) -> ToolResult:
"""Выполнить инструмент и сохранить историю."""
logger.info(f"🤖 AI-агент выполняет инструмент: {tool_name} с аргументами: {kwargs}")
result = await self.registry.execute_tool(tool_name, **kwargs)
# Сохраняем историю использования
self._tool_use_history.append({
'tool_name': tool_name,
'args': kwargs,
'result': result.to_dict(),
'timestamp': datetime.now().isoformat()
})
# Ограничиваем историю
if len(self._tool_use_history) > 100:
self._tool_use_history = self._tool_use_history[-50:]
logger.info(f"✅ Инструмент {tool_name} выполнен: success={result.success}")
return result
def get_tool_history(self, limit: int = 10) -> List[Dict]:
"""Получить историю использования инструментов."""
return self._tool_use_history[-limit:]
def set_user_preference(self, user_id: int, preference: str, value: Any):
"""Установить предпочтение пользователя для инструментов."""
if user_id not in self._user_preferences:
self._user_preferences[user_id] = {}
self._user_preferences[user_id][preference] = value
logger.info(f"Установлено предпочтение для пользователя {user_id}: {preference} = {value}")
def get_user_preference(self, user_id: int, preference: str, default: Any = None) -> Any:
"""Получить предпочтение пользователя."""
return self._user_preferences.get(user_id, {}).get(preference, default)
# Глобальный агент
ai_agent = AIAgent()