telegram-cli-bot/bot/tools/cron_tool.py

555 lines
22 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Cron Tool - инструмент для управления интеллектуальными задачами.
Позволяет создавать, планировать и выполнять периодические задачи через AI-агент.
Задачи хранятся как промпты для ИИ, а не как команды.
"""
import logging
import sqlite3
import json
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Any, Optional, Callable
from dataclasses import dataclass, field
from croniter import croniter
from bot.tools import BaseTool, ToolResult, register_tool
logger = logging.getLogger(__name__)
@dataclass
class CronJob:
"""
Интеллектуальная задача cron.
Attributes:
id: ID задачи
name: Название задачи
prompt: Промпт для ИИ-агента (вместо команды)
schedule: Расписание (cron format: "*/5 * * * *" или "@daily", "@hourly")
enabled: Включена ли задача
user_id: ID пользователя Telegram
notify: Уведомлять ли пользователя в Telegram о результате
log_results: Сохранять ли результат в лог-файл
last_run: Время последнего выполнения
next_run: Время следующего выполнения
created_at: Время создания
"""
id: Optional[int]
name: str
prompt: str # Промпт для ИИ вместо команды
schedule: str
user_id: Optional[int] = None # ID пользователя Telegram
enabled: bool = True
notify: bool = False # Уведомлять пользователя в Telegram
log_results: bool = True # Сохранять в лог
last_run: Optional[datetime] = None
next_run: Optional[datetime] = None
created_at: datetime = field(default_factory=datetime.now)
class CronTool(BaseTool):
"""Инструмент для управления интеллектуальными задачами пользователя."""
name = "cron_tool"
description = "Управление периодическими задачами через AI-агент. Создание, планирование и выполнение задач по расписанию."
category = "automation"
def __init__(self, db_path: str = None, log_dir: str = None):
self.db_path = Path(db_path) if db_path else Path(__file__).parent.parent.parent / "cron.db"
self.log_dir = Path(log_dir) if log_dir else Path(__file__).parent.parent.parent / "cron_logs"
self.log_dir.mkdir(parents=True, exist_ok=True)
self._jobs: Dict[int, CronJob] = {}
self._init_db()
def _init_db(self):
"""Инициализировать БД."""
conn = sqlite3.connect(self.db_path)
c = conn.cursor()
# Создаём таблицу со всеми колонками
c.execute('''
CREATE TABLE IF NOT EXISTS cron_jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
prompt TEXT NOT NULL,
schedule TEXT NOT NULL,
user_id INTEGER,
enabled INTEGER DEFAULT 1,
notify INTEGER DEFAULT 0,
log_results INTEGER DEFAULT 1,
last_run DATETIME,
next_run DATETIME,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
# Проверяем наличие всех колонок (для обратной совместимости)
c.execute("PRAGMA table_info(cron_jobs)")
columns = [col[1] for col in c.fetchall()]
# Миграции для старых БД
migrations = {
'prompt': 'ALTER TABLE cron_jobs ADD COLUMN prompt TEXT DEFAULT ""',
'user_id': 'ALTER TABLE cron_jobs ADD COLUMN user_id INTEGER',
'enabled': 'ALTER TABLE cron_jobs ADD COLUMN enabled INTEGER DEFAULT 1',
'notify': 'ALTER TABLE cron_jobs ADD COLUMN notify INTEGER DEFAULT 0',
'log_results': 'ALTER TABLE cron_jobs ADD COLUMN log_results INTEGER DEFAULT 1',
'last_run': 'ALTER TABLE cron_jobs ADD COLUMN last_run DATETIME',
'next_run': 'ALTER TABLE cron_jobs ADD COLUMN next_run DATETIME'
}
for col_name, alter_query in migrations.items():
if col_name not in columns:
logger.info(f"Добавление колонки {col_name} в таблицу cron_jobs")
try:
c.execute(alter_query)
except sqlite3.OperationalError as e:
# Игнорируем ошибку если колонка уже существует (race condition)
if "duplicate column" not in str(e).lower():
raise
conn.commit()
conn.close()
def _parse_schedule(self, schedule: str, base_time: datetime = None) -> Optional[datetime]:
"""
Распарсить расписание и вернуть следующее время выполнения.
Поддерживает полноценный cron-синтаксис через croniter:
- "*/5 * * * *" - каждые 5 минут
- "0 * * * *" - каждый час в 0 минут
- "0 5 * * *" - каждый день в 05:00
- "0 0 1 * *" - каждый месяц 1 числа в 00:00
- "0 0 * * 0" - каждое воскресенье в 00:00
- "@hourly", "@daily", "@weekly", "@monthly", "@yearly"
Args:
schedule: Cron-выражение или special string
base_time: Базовое время для расчёта (по умолчанию сейчас)
Returns:
Следующее время выполнения или None если ошибка парсинга
"""
if base_time is None:
base_time = datetime.now()
# Поддержка special strings
special_schedules = {
'@hourly': '0 * * * *',
'@daily': '0 0 * * *',
'@midnight': '0 0 * * *',
'@weekly': '0 0 * * 0',
'@monthly': '0 0 1 * *',
'@yearly': '0 0 1 1 *',
'@annually': '0 0 1 1 *'
}
cron_expr = special_schedules.get(schedule.lower(), schedule)
try:
# croniter возвращает следующее время выполнения
cron = croniter(cron_expr, base_time)
next_run = cron.get_next(datetime)
return next_run
except Exception as e:
logger.error(f"Ошибка парсинга cron-расписания '{schedule}': {e}")
return None
def _calculate_next_run(self, schedule: str, last_run: datetime = None) -> Optional[datetime]:
"""
Рассчитать следующее время выполнения на основе last_run.
Args:
schedule: Cron-выражение
last_run: Время последнего выполнения (по умолчанию сейчас)
Returns:
Следующее время выполнения
"""
base_time = last_run if last_run else datetime.now()
return self._parse_schedule(schedule, base_time)
async def add_job(self, name: str, prompt: str, schedule: str, user_id: int = None, notify: bool = False, log_results: bool = True) -> ToolResult:
"""
Добавить интеллектуальную задачу.
Args:
name: Название задачи
prompt: Промпт для ИИ-агента
schedule: Расписание (cron format или @daily, @hourly, @weekly)
user_id: ID пользователя Telegram
notify: Уведомлять ли пользователя в Telegram
log_results: Сохранять ли результат в лог
"""
conn = sqlite3.connect(self.db_path)
c = conn.cursor()
try:
next_run = self._calculate_next_run(schedule)
next_run_str = next_run.strftime('%Y-%m-%d %H:%M:%S') if next_run else None
c.execute('''
INSERT INTO cron_jobs (name, prompt, schedule, user_id, notify, log_results, next_run)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (name, prompt, schedule, user_id, 1 if notify else 0, 1 if log_results else 0, next_run_str))
job_id = c.lastrowid
conn.commit()
self._jobs[job_id] = CronJob(
id=job_id,
name=name,
prompt=prompt,
schedule=schedule,
user_id=user_id,
notify=notify,
log_results=log_results,
next_run=next_run
)
return ToolResult(
success=True,
data={'id': job_id, 'name': name, 'prompt': prompt, 'schedule': schedule, 'user_id': user_id, 'next_run': next_run_str},
metadata={'status': 'added', 'notify': notify, 'log_results': log_results}
)
except Exception as e:
logger.exception(f"Ошибка добавления задачи: {e}")
return ToolResult(
success=False,
error=str(e)
)
finally:
conn.close()
async def update_next_run(self, job_id: int) -> ToolResult:
"""
Пересчитать время следующего выполнения после успешного запуска.
Args:
job_id: ID задачи
Returns:
ToolResult с новым next_run
"""
conn = sqlite3.connect(self.db_path)
c = conn.cursor()
try:
# Получаем текущие данные задачи
c.execute("SELECT schedule, last_run FROM cron_jobs WHERE id = ?", (job_id,))
row = c.fetchone()
if not row:
return ToolResult(
success=False,
error=f"Задача не найдена: {job_id}"
)
schedule, last_run_str = row
# Рассчитываем следующее время выполнения на основе last_run
last_run = datetime.strptime(last_run_str, '%Y-%m-%d %H:%M:%S') if last_run_str else datetime.now()
next_run = self._calculate_next_run(schedule, last_run)
if not next_run:
return ToolResult(
success=False,
error=f"Не удалось рассчитать next_run для расписания '{schedule}'"
)
next_run_str = next_run.strftime('%Y-%m-%d %H:%M:%S')
# Обновляем next_run в БД
c.execute("UPDATE cron_jobs SET next_run = ? WHERE id = ?", (next_run_str, job_id))
conn.commit()
return ToolResult(
success=True,
data={'id': job_id, 'next_run': next_run_str},
metadata={'status': 'next_run_updated'}
)
except Exception as e:
logger.exception(f"Ошибка обновления next_run: {e}")
return ToolResult(
success=False,
error=str(e)
)
finally:
conn.close()
async def list_jobs(self, user_id: int = None) -> ToolResult:
"""
Получить список всех задач.
Args:
user_id: ID пользователя для фильтрации (если None - все задачи)
"""
conn = sqlite3.connect(self.db_path)
c = conn.cursor()
if user_id:
c.execute('''
SELECT id, name, prompt, schedule, user_id, enabled, notify, log_results, last_run, next_run, created_at
FROM cron_jobs WHERE user_id = ? ORDER BY id
''', (user_id,))
else:
c.execute('''
SELECT id, name, prompt, schedule, user_id, enabled, notify, log_results, last_run, next_run, created_at
FROM cron_jobs ORDER BY id
''')
rows = c.fetchall()
conn.close()
jobs = []
for row in rows:
jobs.append({
'id': row[0],
'name': row[1],
'prompt': row[2],
'schedule': row[3],
'user_id': row[4],
'enabled': bool(row[5]),
'notify': bool(row[6]),
'log_results': bool(row[7]),
'last_run': row[8],
'next_run': row[9],
'created_at': row[10]
})
return ToolResult(
success=True,
data=jobs,
metadata={'count': len(jobs)}
)
async def remove_job(self, job_id: int) -> ToolResult:
"""Удалить задачу."""
conn = sqlite3.connect(self.db_path)
c = conn.cursor()
c.execute("DELETE FROM cron_jobs WHERE id = ?", (job_id,))
if c.rowcount == 0:
conn.close()
return ToolResult(
success=False,
error=f"Задача не найдена: {job_id}"
)
conn.commit()
conn.close()
if job_id in self._jobs:
del self._jobs[job_id]
return ToolResult(
success=True,
data={'id': job_id},
metadata={'status': 'removed'}
)
async def toggle_job(self, job_id: int, enabled: bool) -> ToolResult:
"""Включить/выключить задачу."""
conn = sqlite3.connect(self.db_path)
c = conn.cursor()
c.execute("UPDATE cron_jobs SET enabled = ? WHERE id = ?", (1 if enabled else 0, job_id))
if c.rowcount == 0:
conn.close()
return ToolResult(
success=False,
error=f"Задача не найдена: {job_id}"
)
conn.commit()
conn.close()
return ToolResult(
success=True,
data={'id': job_id, 'enabled': enabled},
metadata={'status': 'toggled'}
)
async def run_job(self, job_id: int, ai_agent=None, user_id: int = None) -> ToolResult:
"""
Выполнить интеллектуальную задачу через AI-агент.
Args:
job_id: ID задачи
ai_agent: Экземпляр AI-агента для выполнения промпта
user_id: ID пользователя для контекста
Returns:
ToolResult с результатом выполнения
"""
conn = sqlite3.connect(self.db_path)
c = conn.cursor()
c.execute("SELECT name, prompt, notify, log_results FROM cron_jobs WHERE id = ?", (job_id,))
row = c.fetchone()
if not row:
conn.close()
return ToolResult(
success=False,
error=f"Задача не найдена: {job_id}"
)
name, prompt, notify, log_results = row
conn.close()
logger.info(f"🕐 Выполнение задачи #{job_id}: {name}")
logger.info(f" Промпт: {prompt}")
result_data = {
'id': job_id,
'name': name,
'prompt': prompt,
'executed_at': datetime.now().isoformat()
}
# Выполняем задачу через AI-агент
if ai_agent:
try:
# Отправляем промпт ИИ-агенту
logger.info(f"🤖 Отправка промпта AI-агенту для задачи {name}")
# ИИ-агент анализирует промпт и решает какой инструмент использовать
decision = await ai_agent.decide(prompt, context={'user_id': user_id})
if decision.should_use_tool:
logger.info(f"🔧 AI-агент решил использовать инструмент: {decision.tool_name}")
tool_result = await ai_agent.execute_tool(decision.tool_name, **decision.tool_args)
result_data['tool_used'] = decision.tool_name
result_data['tool_result'] = tool_result.to_dict() if hasattr(tool_result, 'to_dict') else str(tool_result)
result_data['success'] = tool_result.success
# Формируем результат
result_text = f"Задача '{name}' выполнена.\n\n"
result_text += f"Использован инструмент: {decision.tool_name}\n"
if tool_result.success:
result_text += f"Результат: {tool_result.data or 'Успешно'}"
else:
result_text += f"Ошибка: {tool_result.error}"
else:
# ИИ решил что инструмент не нужен - выполняем промпт напрямую
logger.info(f" AI-агент решил что инструмент не требуется")
result_text = f"Задача '{name}' выполнена (без инструментов).\nПромпт: {prompt}"
result_data['success'] = True
result_data['ai_reasoning'] = decision.reasoning
# Сохраняем в лог если нужно
if log_results:
self._save_to_log(job_id, name, prompt, result_text)
# Обновляем last_run
conn = sqlite3.connect(self.db_path)
c = conn.cursor()
c.execute("UPDATE cron_jobs SET last_run = datetime('now') WHERE id = ?", (job_id,))
conn.commit()
conn.close()
return ToolResult(
success=True,
data=result_data,
metadata={
'status': 'executed',
'notify': notify,
'log_results': log_results,
'result_text': result_text
}
)
except Exception as e:
logger.exception(f"Ошибка выполнения задачи через AI-агент: {e}")
if log_results:
self._save_to_log(job_id, name, prompt, f"Ошибка: {e}")
return ToolResult(
success=False,
error=str(e),
data=result_data
)
else:
# AI-агент не предоставлен - просто логируем
logger.warning(f"AI-агент не предоставлен, задача {name} не выполнена")
if log_results:
self._save_to_log(job_id, name, prompt, "Ошибка: AI-агент не предоставлен")
return ToolResult(
success=False,
error="AI-агент не предоставлен",
data=result_data
)
def _save_to_log(self, job_id: int, job_name: str, prompt: str, result: str):
"""Сохранить результат выполнения задачи в лог-файл."""
log_file = self.log_dir / f"cron_job_{job_id}_{job_name}.log"
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
log_entry = f"""
{'='*60}
[{timestamp}] Задача: {job_name} (ID: {job_id})
{'='*60}
Промпт:
{prompt}
Результат:
{result}
"""
with open(log_file, 'a', encoding='utf-8') as f:
f.write(log_entry)
logger.debug(f"Результат задачи {job_name} сохранён в лог: {log_file}")
async def execute(self, action: str = "list", ai_agent=None, user_id: int = None, **kwargs) -> ToolResult:
"""
Выполнить действие с cron задачами.
Args:
action: Действие - list, add, remove, toggle, run
ai_agent: Экземпляр AI-агента (для run)
user_id: ID пользователя (для add, run, list)
kwargs: Дополнительные аргументы
"""
actions = {
'list': lambda: self.list_jobs(user_id=user_id),
'add': lambda: self.add_job(
name=kwargs.get('name'),
prompt=kwargs.get('prompt'),
schedule=kwargs.get('schedule'),
user_id=user_id,
notify=kwargs.get('notify', False),
log_results=kwargs.get('log_results', True)
),
'remove': lambda: self.remove_job(job_id=kwargs.get('job_id')),
'toggle': lambda: self.toggle_job(
job_id=kwargs.get('job_id'),
enabled=kwargs.get('enabled', True)
),
'run': lambda: self.run_job(job_id=kwargs.get('job_id'), ai_agent=ai_agent, user_id=user_id)
}
if action not in actions:
return ToolResult(
success=False,
error=f"Неизвестное действие: {action}. Доступные: {list(actions.keys())}"
)
logger.info(f"Cron действие: {action} с аргументами: {kwargs}")
return await actions[action]()
# Автоматическая регистрация при импорте
@register_tool
class CronToolAuto(CronTool):
"""Авто-регистрируемая версия CronTool."""
pass