185 lines
7.1 KiB
Python
185 lines
7.1 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Cron Scheduler - планировщик задач для автоматического выполнения.
|
||
|
||
Проверяет задачи каждую минуту и выполняет те, у которых наступило время.
|
||
"""
|
||
|
||
import logging
|
||
import asyncio
|
||
from datetime import datetime
|
||
from typing import Optional, Callable, Awaitable
|
||
from pathlib import Path
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class CronScheduler:
|
||
"""
|
||
Планировщик cron-задач.
|
||
|
||
Автоматически проверяет задачи и выполняет их через AI-агент.
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
cron_tool,
|
||
ai_agent,
|
||
send_notification: Optional[Callable[[int, str], Awaitable[None]]] = None
|
||
):
|
||
"""
|
||
Инициализировать планировщик.
|
||
|
||
Args:
|
||
cron_tool: Экземпляр CronTool
|
||
ai_agent: Экземпляр AI-агента для выполнения задач
|
||
send_notification: Асинхронная функция для отправки уведомлений (user_id, message)
|
||
"""
|
||
self.cron_tool = cron_tool
|
||
self.ai_agent = ai_agent
|
||
self.send_notification = send_notification
|
||
self._running = False
|
||
self._task: Optional[asyncio.Task] = None
|
||
self._check_interval = 60 # Проверка каждую минуту
|
||
|
||
async def start(self):
|
||
"""Запустить планировщик в фоновом режиме."""
|
||
if self._running:
|
||
logger.warning("Планировщик уже запущен")
|
||
return
|
||
|
||
self._running = True
|
||
self._task = asyncio.create_task(self._run_loop())
|
||
logger.info("🕐 Планировщик cron-задач запущен")
|
||
|
||
async def stop(self):
|
||
"""Остановить планировщик."""
|
||
self._running = False
|
||
if self._task:
|
||
self._task.cancel()
|
||
try:
|
||
await self._task
|
||
except asyncio.CancelledError:
|
||
pass
|
||
logger.info("🕐 Планировщик cron-задач остановлен")
|
||
|
||
async def _run_loop(self):
|
||
"""Основной цикл планировщика."""
|
||
while self._running:
|
||
try:
|
||
await self._check_and_run_tasks()
|
||
except Exception as e:
|
||
logger.exception(f"Ошибка в цикле планировщика: {e}")
|
||
|
||
await asyncio.sleep(self._check_interval)
|
||
|
||
async def _check_and_run_tasks(self):
|
||
"""Проверить задачи и выполнить те, у которых наступило время."""
|
||
now = datetime.now()
|
||
logger.debug(f"🕐 Проверка задач на {now.strftime('%Y-%m-%d %H:%M:%S')}")
|
||
|
||
# Получаем список всех задач
|
||
result = await self.cron_tool.list_jobs()
|
||
|
||
if not result.success:
|
||
logger.error(f"Ошибка получения списка задач: {result.error}")
|
||
return
|
||
|
||
jobs = result.data
|
||
executed_count = 0
|
||
|
||
for job in jobs:
|
||
if not job.get('enabled'):
|
||
continue
|
||
|
||
next_run_str = job.get('next_run')
|
||
if not next_run_str:
|
||
continue
|
||
|
||
try:
|
||
next_run = datetime.strptime(next_run_str, '%Y-%m-%d %H:%M:%S')
|
||
except ValueError:
|
||
logger.error(f"Ошибка парсинга next_run для задачи {job['id']}: {next_run_str}")
|
||
continue
|
||
|
||
# Если время пришло
|
||
if now >= next_run:
|
||
# ЗАЩИТА ОТ DUPLICATE: проверяем last_run
|
||
last_run_str = job.get('last_run')
|
||
if last_run_str:
|
||
try:
|
||
last_run = datetime.strptime(last_run_str, '%Y-%m-%d %H:%M:%S')
|
||
# Если задача уже выполнялась в этом окне (менее минуты назад) - пропускаем
|
||
if (now - last_run).total_seconds() < 60:
|
||
logger.debug(f"⏭️ Задача #{job['id']} уже выполнена в этом окне, пропускаем")
|
||
continue
|
||
except ValueError:
|
||
pass # Игнорируем ошибку парсинга last_run
|
||
|
||
logger.info(f"⏰ Время задачи #{job['id']}: {job['name']}")
|
||
await self._execute_job(job)
|
||
executed_count += 1
|
||
|
||
if executed_count > 0:
|
||
logger.info(f"✅ Выполнено задач: {executed_count}")
|
||
|
||
async def _execute_job(self, job: dict):
|
||
"""
|
||
Выполнить задачу.
|
||
|
||
Args:
|
||
job: Словарь с данными задачи
|
||
"""
|
||
job_id = job['id']
|
||
job_name = job['name']
|
||
notify = job.get('notify', False)
|
||
log_results = job.get('log_results', True)
|
||
user_id = job.get('user_id') # ID пользователя который создал задачу
|
||
schedule = job.get('schedule', '')
|
||
|
||
# Выполняем задачу через AI-агент
|
||
result = await self.cron_tool.run_job(
|
||
job_id=job_id,
|
||
ai_agent=self.ai_agent,
|
||
user_id=user_id
|
||
)
|
||
|
||
if result.success:
|
||
logger.info(f"✅ Задача '{job_name}' выполнена успешно")
|
||
|
||
# ПЕРЕСЧЁТ NEXT_RUN: обновляем время следующего выполнения
|
||
await self.cron_tool.update_next_run(job_id)
|
||
|
||
# Отправляем уведомление если нужно
|
||
if notify and self.send_notification and user_id:
|
||
result_text = result.metadata.get('result_text', 'Задача выполнена')
|
||
await self.send_notification(user_id, result_text)
|
||
else:
|
||
logger.error(f"❌ Задача '{job_name}' не выполнена: {result.error}")
|
||
|
||
if notify and self.send_notification and user_id:
|
||
await self.send_notification(
|
||
user_id,
|
||
f"❌ **Ошибка задачи '{job_name}':**\n{result.error}"
|
||
)
|
||
|
||
def set_notification_callback(self, callback: Callable[[int, str], Awaitable[None]]):
|
||
"""Установить callback для отправки уведомлений."""
|
||
self.send_notification = callback
|
||
|
||
|
||
# Глобальный планировщик
|
||
scheduler: Optional[CronScheduler] = None
|
||
|
||
|
||
def init_scheduler(cron_tool, ai_agent, send_notification=None) -> CronScheduler:
|
||
"""Инициализировать глобальный планировщик."""
|
||
global scheduler
|
||
scheduler = CronScheduler(cron_tool, ai_agent, send_notification)
|
||
return scheduler
|
||
|
||
|
||
def get_scheduler() -> Optional[CronScheduler]:
|
||
"""Получить глобальный планировщик."""
|
||
return scheduler
|