#!/usr/bin/env python3 """ Cron Tool - инструмент для управления интеллектуальными задачами. Позволяет создавать, планировать и выполнять периодические задачи через AI-агент. Задачи хранятся как промпты для ИИ, а не как команды. """ import logging import sqlite3 import json from pathlib import Path from datetime import datetime, timedelta from typing import List, Dict, Any, Optional, Callable from dataclasses import dataclass, field from bot.tools import BaseTool, ToolResult, register_tool logger = logging.getLogger(__name__) @dataclass class CronJob: """ Интеллектуальная задача cron. Attributes: id: ID задачи name: Название задачи prompt: Промпт для ИИ-агента (вместо команды) schedule: Расписание (cron format: "*/5 * * * *" или "@daily", "@hourly") enabled: Включена ли задача user_id: ID пользователя Telegram notify: Уведомлять ли пользователя в Telegram о результате log_results: Сохранять ли результат в лог-файл last_run: Время последнего выполнения next_run: Время следующего выполнения created_at: Время создания """ id: Optional[int] name: str prompt: str # Промпт для ИИ вместо команды schedule: str user_id: Optional[int] = None # ID пользователя Telegram enabled: bool = True notify: bool = False # Уведомлять пользователя в Telegram log_results: bool = True # Сохранять в лог last_run: Optional[datetime] = None next_run: Optional[datetime] = None created_at: datetime = field(default_factory=datetime.now) class CronTool(BaseTool): """Инструмент для управления интеллектуальными задачами пользователя.""" name = "cron_tool" description = "Управление периодическими задачами через AI-агент. Создание, планирование и выполнение задач по расписанию." category = "automation" def __init__(self, db_path: str = None, log_dir: str = None): self.db_path = Path(db_path) if db_path else Path(__file__).parent.parent.parent / "cron.db" self.log_dir = Path(log_dir) if log_dir else Path(__file__).parent.parent.parent / "cron_logs" self.log_dir.mkdir(parents=True, exist_ok=True) self._jobs: Dict[int, CronJob] = {} self._init_db() def _init_db(self): """Инициализировать БД.""" conn = sqlite3.connect(self.db_path) c = conn.cursor() # Создаём таблицу со всеми колонками c.execute(''' CREATE TABLE IF NOT EXISTS cron_jobs ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, prompt TEXT NOT NULL, schedule TEXT NOT NULL, user_id INTEGER, enabled INTEGER DEFAULT 1, notify INTEGER DEFAULT 0, log_results INTEGER DEFAULT 1, last_run DATETIME, next_run DATETIME, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) ''') # Проверяем наличие всех колонок (для обратной совместимости) c.execute("PRAGMA table_info(cron_jobs)") columns = [col[1] for col in c.fetchall()] # Миграции для старых БД migrations = { 'prompt': 'ALTER TABLE cron_jobs ADD COLUMN prompt TEXT DEFAULT ""', 'user_id': 'ALTER TABLE cron_jobs ADD COLUMN user_id INTEGER', 'enabled': 'ALTER TABLE cron_jobs ADD COLUMN enabled INTEGER DEFAULT 1', 'notify': 'ALTER TABLE cron_jobs ADD COLUMN notify INTEGER DEFAULT 0', 'log_results': 'ALTER TABLE cron_jobs ADD COLUMN log_results INTEGER DEFAULT 1', 'last_run': 'ALTER TABLE cron_jobs ADD COLUMN last_run DATETIME', 'next_run': 'ALTER TABLE cron_jobs ADD COLUMN next_run DATETIME' } for col_name, alter_query in migrations.items(): if col_name not in columns: logger.info(f"Добавление колонки {col_name} в таблицу cron_jobs") try: c.execute(alter_query) except sqlite3.OperationalError as e: # Игнорируем ошибку если колонка уже существует (race condition) if "duplicate column" not in str(e).lower(): raise conn.commit() conn.close() def _parse_schedule(self, schedule: str) -> Optional[datetime]: """ Распарсить расписание и вернуть следующее время выполнения. Поддерживает: - "*/N * * * *" - каждые N минут - "@hourly" - каждый час - "@daily" - каждый день - "@weekly" - каждую неделю """ now = datetime.now() if schedule.startswith('*/'): # Каждые N минут try: minutes = int(schedule.split()[0][2:]) return now + timedelta(minutes=minutes) except (ValueError, IndexError): return None elif schedule == '@hourly': return now + timedelta(hours=1) elif schedule == '@daily': return now + timedelta(days=1) elif schedule == '@weekly': return now + timedelta(weeks=1) return None async def add_job(self, name: str, prompt: str, schedule: str, user_id: int = None, notify: bool = False, log_results: bool = True) -> ToolResult: """ Добавить интеллектуальную задачу. Args: name: Название задачи prompt: Промпт для ИИ-агента schedule: Расписание (cron format или @daily, @hourly, @weekly) user_id: ID пользователя Telegram notify: Уведомлять ли пользователя в Telegram log_results: Сохранять ли результат в лог """ conn = sqlite3.connect(self.db_path) c = conn.cursor() try: next_run = self._parse_schedule(schedule) next_run_str = next_run.strftime('%Y-%m-%d %H:%M:%S') if next_run else None c.execute(''' INSERT INTO cron_jobs (name, prompt, schedule, user_id, notify, log_results, next_run) VALUES (?, ?, ?, ?, ?, ?, ?) ''', (name, prompt, schedule, user_id, 1 if notify else 0, 1 if log_results else 0, next_run_str)) job_id = c.lastrowid conn.commit() self._jobs[job_id] = CronJob( id=job_id, name=name, prompt=prompt, schedule=schedule, user_id=user_id, notify=notify, log_results=log_results, next_run=next_run ) return ToolResult( success=True, data={'id': job_id, 'name': name, 'prompt': prompt, 'schedule': schedule, 'user_id': user_id, 'next_run': next_run_str}, metadata={'status': 'added', 'notify': notify, 'log_results': log_results} ) except Exception as e: logger.exception(f"Ошибка добавления задачи: {e}") return ToolResult( success=False, error=str(e) ) finally: conn.close() async def list_jobs(self, user_id: int = None) -> ToolResult: """ Получить список всех задач. Args: user_id: ID пользователя для фильтрации (если None - все задачи) """ conn = sqlite3.connect(self.db_path) c = conn.cursor() if user_id: c.execute(''' SELECT id, name, prompt, schedule, user_id, enabled, notify, log_results, last_run, next_run, created_at FROM cron_jobs WHERE user_id = ? ORDER BY id ''', (user_id,)) else: c.execute(''' SELECT id, name, prompt, schedule, user_id, enabled, notify, log_results, last_run, next_run, created_at FROM cron_jobs ORDER BY id ''') rows = c.fetchall() conn.close() jobs = [] for row in rows: jobs.append({ 'id': row[0], 'name': row[1], 'prompt': row[2], 'schedule': row[3], 'user_id': row[4], 'enabled': bool(row[5]), 'notify': bool(row[6]), 'log_results': bool(row[7]), 'last_run': row[8], 'next_run': row[9], 'created_at': row[10] }) return ToolResult( success=True, data=jobs, metadata={'count': len(jobs)} ) async def remove_job(self, job_id: int) -> ToolResult: """Удалить задачу.""" conn = sqlite3.connect(self.db_path) c = conn.cursor() c.execute("DELETE FROM cron_jobs WHERE id = ?", (job_id,)) if c.rowcount == 0: conn.close() return ToolResult( success=False, error=f"Задача не найдена: {job_id}" ) conn.commit() conn.close() if job_id in self._jobs: del self._jobs[job_id] return ToolResult( success=True, data={'id': job_id}, metadata={'status': 'removed'} ) async def toggle_job(self, job_id: int, enabled: bool) -> ToolResult: """Включить/выключить задачу.""" conn = sqlite3.connect(self.db_path) c = conn.cursor() c.execute("UPDATE cron_jobs SET enabled = ? WHERE id = ?", (1 if enabled else 0, job_id)) if c.rowcount == 0: conn.close() return ToolResult( success=False, error=f"Задача не найдена: {job_id}" ) conn.commit() conn.close() return ToolResult( success=True, data={'id': job_id, 'enabled': enabled}, metadata={'status': 'toggled'} ) async def run_job(self, job_id: int, ai_agent=None, user_id: int = None) -> ToolResult: """ Выполнить интеллектуальную задачу через AI-агент. Args: job_id: ID задачи ai_agent: Экземпляр AI-агента для выполнения промпта user_id: ID пользователя для контекста Returns: ToolResult с результатом выполнения """ conn = sqlite3.connect(self.db_path) c = conn.cursor() c.execute("SELECT name, prompt, notify, log_results FROM cron_jobs WHERE id = ?", (job_id,)) row = c.fetchone() if not row: conn.close() return ToolResult( success=False, error=f"Задача не найдена: {job_id}" ) name, prompt, notify, log_results = row conn.close() logger.info(f"🕐 Выполнение задачи #{job_id}: {name}") logger.info(f" Промпт: {prompt}") result_data = { 'id': job_id, 'name': name, 'prompt': prompt, 'executed_at': datetime.now().isoformat() } # Выполняем задачу через AI-агент if ai_agent: try: # Отправляем промпт ИИ-агенту logger.info(f"🤖 Отправка промпта AI-агенту для задачи {name}") # ИИ-агент анализирует промпт и решает какой инструмент использовать decision = await ai_agent.decide(prompt, context={'user_id': user_id}) if decision.should_use_tool: logger.info(f"🔧 AI-агент решил использовать инструмент: {decision.tool_name}") tool_result = await ai_agent.execute_tool(decision.tool_name, **decision.tool_args) result_data['tool_used'] = decision.tool_name result_data['tool_result'] = tool_result.to_dict() if hasattr(tool_result, 'to_dict') else str(tool_result) result_data['success'] = tool_result.success # Формируем результат result_text = f"Задача '{name}' выполнена.\n\n" result_text += f"Использован инструмент: {decision.tool_name}\n" if tool_result.success: result_text += f"Результат: {tool_result.data or 'Успешно'}" else: result_text += f"Ошибка: {tool_result.error}" else: # ИИ решил что инструмент не нужен - выполняем промпт напрямую logger.info(f"ℹ️ AI-агент решил что инструмент не требуется") result_text = f"Задача '{name}' выполнена (без инструментов).\nПромпт: {prompt}" result_data['success'] = True result_data['ai_reasoning'] = decision.reasoning # Сохраняем в лог если нужно if log_results: self._save_to_log(job_id, name, prompt, result_text) # Обновляем last_run conn = sqlite3.connect(self.db_path) c = conn.cursor() c.execute("UPDATE cron_jobs SET last_run = datetime('now') WHERE id = ?", (job_id,)) conn.commit() conn.close() return ToolResult( success=True, data=result_data, metadata={ 'status': 'executed', 'notify': notify, 'log_results': log_results, 'result_text': result_text } ) except Exception as e: logger.exception(f"Ошибка выполнения задачи через AI-агент: {e}") if log_results: self._save_to_log(job_id, name, prompt, f"Ошибка: {e}") return ToolResult( success=False, error=str(e), data=result_data ) else: # AI-агент не предоставлен - просто логируем logger.warning(f"AI-агент не предоставлен, задача {name} не выполнена") if log_results: self._save_to_log(job_id, name, prompt, "Ошибка: AI-агент не предоставлен") return ToolResult( success=False, error="AI-агент не предоставлен", data=result_data ) def _save_to_log(self, job_id: int, job_name: str, prompt: str, result: str): """Сохранить результат выполнения задачи в лог-файл.""" log_file = self.log_dir / f"cron_job_{job_id}_{job_name}.log" timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') log_entry = f""" {'='*60} [{timestamp}] Задача: {job_name} (ID: {job_id}) {'='*60} Промпт: {prompt} Результат: {result} """ with open(log_file, 'a', encoding='utf-8') as f: f.write(log_entry) logger.debug(f"Результат задачи {job_name} сохранён в лог: {log_file}") async def execute(self, action: str = "list", ai_agent=None, user_id: int = None, **kwargs) -> ToolResult: """ Выполнить действие с cron задачами. Args: action: Действие - list, add, remove, toggle, run ai_agent: Экземпляр AI-агента (для run) user_id: ID пользователя (для add, run, list) kwargs: Дополнительные аргументы """ actions = { 'list': lambda: self.list_jobs(user_id=user_id), 'add': lambda: self.add_job( name=kwargs.get('name'), prompt=kwargs.get('prompt'), schedule=kwargs.get('schedule'), user_id=user_id, notify=kwargs.get('notify', False), log_results=kwargs.get('log_results', True) ), 'remove': lambda: self.remove_job(job_id=kwargs.get('job_id')), 'toggle': lambda: self.toggle_job( job_id=kwargs.get('job_id'), enabled=kwargs.get('enabled', True) ), 'run': lambda: self.run_job(job_id=kwargs.get('job_id'), ai_agent=ai_agent, user_id=user_id) } if action not in actions: return ToolResult( success=False, error=f"Неизвестное действие: {action}. Доступные: {list(actions.keys())}" ) logger.info(f"Cron действие: {action} с аргументами: {kwargs}") return await actions[action]() # Автоматическая регистрация при импорте @register_tool class CronToolAuto(CronTool): """Авто-регистрируемая версия CronTool.""" pass