telegram-cli-bot/bot/services/cron_scheduler.py

169 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Cron Scheduler - планировщик задач для автоматического выполнения.
Проверяет задачи каждую минуту и выполняет те, у которых наступило время.
"""
import logging
import asyncio
from datetime import datetime
from typing import Optional, Callable, Awaitable
from pathlib import Path
logger = logging.getLogger(__name__)
class CronScheduler:
"""
Планировщик cron-задач.
Автоматически проверяет задачи и выполняет их через AI-агент.
"""
def __init__(
self,
cron_tool,
ai_agent,
send_notification: Optional[Callable[[int, str], Awaitable[None]]] = None
):
"""
Инициализировать планировщик.
Args:
cron_tool: Экземпляр CronTool
ai_agent: Экземпляр AI-агента для выполнения задач
send_notification: Асинхронная функция для отправки уведомлений (user_id, message)
"""
self.cron_tool = cron_tool
self.ai_agent = ai_agent
self.send_notification = send_notification
self._running = False
self._task: Optional[asyncio.Task] = None
self._check_interval = 60 # Проверка каждую минуту
async def start(self):
"""Запустить планировщик в фоновом режиме."""
if self._running:
logger.warning("Планировщик уже запущен")
return
self._running = True
self._task = asyncio.create_task(self._run_loop())
logger.info("🕐 Планировщик cron-задач запущен")
async def stop(self):
"""Остановить планировщик."""
self._running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
logger.info("🕐 Планировщик cron-задач остановлен")
async def _run_loop(self):
"""Основной цикл планировщика."""
while self._running:
try:
await self._check_and_run_tasks()
except Exception as e:
logger.exception(f"Ошибка в цикле планировщика: {e}")
await asyncio.sleep(self._check_interval)
async def _check_and_run_tasks(self):
"""Проверить задачи и выполнить те, у которых наступило время."""
now = datetime.now()
logger.debug(f"🕐 Проверка задач на {now.strftime('%Y-%m-%d %H:%M:%S')}")
# Получаем список всех задач
result = await self.cron_tool.list_jobs()
if not result.success:
logger.error(f"Ошибка получения списка задач: {result.error}")
return
jobs = result.data
executed_count = 0
for job in jobs:
if not job.get('enabled'):
continue
next_run_str = job.get('next_run')
if not next_run_str:
continue
try:
next_run = datetime.strptime(next_run_str, '%Y-%m-%d %H:%M:%S')
except ValueError:
logger.error(f"Ошибка парсинга next_run для задачи {job['id']}: {next_run_str}")
continue
# Если время пришло
if now >= next_run:
logger.info(f"⏰ Время задачи #{job['id']}: {job['name']}")
await self._execute_job(job)
executed_count += 1
if executed_count > 0:
logger.info(f"✅ Выполнено задач: {executed_count}")
async def _execute_job(self, job: dict):
"""
Выполнить задачу.
Args:
job: Словарь с данными задачи
"""
job_id = job['id']
job_name = job['name']
notify = job.get('notify', False)
log_results = job.get('log_results', True)
user_id = job.get('user_id') # ID пользователя который создал задачу
# Выполняем задачу через AI-агент
result = await self.cron_tool.run_job(
job_id=job_id,
ai_agent=self.ai_agent,
user_id=user_id
)
if result.success:
logger.info(f"✅ Задача '{job_name}' выполнена успешно")
# Отправляем уведомление если нужно
if notify and self.send_notification and user_id:
result_text = result.metadata.get('result_text', 'Задача выполнена')
await self.send_notification(user_id, result_text)
else:
logger.error(f"❌ Задача '{job_name}' не выполнена: {result.error}")
if notify and self.send_notification and user_id:
await self.send_notification(
user_id,
f"❌ **Ошибка задачи '{job_name}':**\n{result.error}"
)
def set_notification_callback(self, callback: Callable[[int, str], Awaitable[None]]):
"""Установить callback для отправки уведомлений."""
self.send_notification = callback
# Глобальный планировщик
scheduler: Optional[CronScheduler] = None
def init_scheduler(cron_tool, ai_agent, send_notification=None) -> CronScheduler:
"""Инициализировать глобальный планировщик."""
global scheduler
scheduler = CronScheduler(cron_tool, ai_agent, send_notification)
return scheduler
def get_scheduler() -> Optional[CronScheduler]:
"""Получить глобальный планировщик."""
return scheduler