diff --git a/bot/services/cron_scheduler.py b/bot/services/cron_scheduler.py index f4de3d1..83e73fd 100644 --- a/bot/services/cron_scheduler.py +++ b/bot/services/cron_scheduler.py @@ -80,7 +80,7 @@ class CronScheduler: # Получаем список всех задач result = await self.cron_tool.list_jobs() - + if not result.success: logger.error(f"Ошибка получения списка задач: {result.error}") return @@ -104,6 +104,18 @@ class CronScheduler: # Если время пришло if now >= next_run: + # ЗАЩИТА ОТ DUPLICATE: проверяем last_run + last_run_str = job.get('last_run') + if last_run_str: + try: + last_run = datetime.strptime(last_run_str, '%Y-%m-%d %H:%M:%S') + # Если задача уже выполнялась в этом окне (менее минуты назад) - пропускаем + if (now - last_run).total_seconds() < 60: + logger.debug(f"⏭️ Задача #{job['id']} уже выполнена в этом окне, пропускаем") + continue + except ValueError: + pass # Игнорируем ошибку парсинга last_run + logger.info(f"⏰ Время задачи #{job['id']}: {job['name']}") await self._execute_job(job) executed_count += 1 @@ -114,7 +126,7 @@ class CronScheduler: async def _execute_job(self, job: dict): """ Выполнить задачу. - + Args: job: Словарь с данными задачи """ @@ -123,6 +135,7 @@ class CronScheduler: notify = job.get('notify', False) log_results = job.get('log_results', True) user_id = job.get('user_id') # ID пользователя который создал задачу + schedule = job.get('schedule', '') # Выполняем задачу через AI-агент result = await self.cron_tool.run_job( @@ -133,14 +146,17 @@ class CronScheduler: if result.success: logger.info(f"✅ Задача '{job_name}' выполнена успешно") - + + # ПЕРЕСЧЁТ NEXT_RUN: обновляем время следующего выполнения + await self.cron_tool.update_next_run(job_id) + # Отправляем уведомление если нужно if notify and self.send_notification and user_id: result_text = result.metadata.get('result_text', 'Задача выполнена') await self.send_notification(user_id, result_text) else: logger.error(f"❌ Задача '{job_name}' не выполнена: {result.error}") - + if notify and self.send_notification and user_id: await self.send_notification( user_id, diff --git a/bot/tools/cron_tool.py b/bot/tools/cron_tool.py index 1d1edfd..349e1c3 100644 --- a/bot/tools/cron_tool.py +++ b/bot/tools/cron_tool.py @@ -10,10 +10,11 @@ import logging import sqlite3 import json from pathlib import Path -from datetime import datetime, timedelta +from datetime import datetime from typing import List, Dict, Any, Optional, Callable from dataclasses import dataclass, field +from croniter import croniter from bot.tools import BaseTool, ToolResult, register_tool logger = logging.getLogger(__name__) @@ -114,41 +115,68 @@ class CronTool(BaseTool): conn.commit() conn.close() - def _parse_schedule(self, schedule: str) -> Optional[datetime]: + def _parse_schedule(self, schedule: str, base_time: datetime = None) -> Optional[datetime]: """ Распарсить расписание и вернуть следующее время выполнения. - Поддерживает: - - "*/N * * * *" - каждые N минут - - "@hourly" - каждый час - - "@daily" - каждый день - - "@weekly" - каждую неделю + Поддерживает полноценный cron-синтаксис через croniter: + - "*/5 * * * *" - каждые 5 минут + - "0 * * * *" - каждый час в 0 минут + - "0 5 * * *" - каждый день в 05:00 + - "0 0 1 * *" - каждый месяц 1 числа в 00:00 + - "0 0 * * 0" - каждое воскресенье в 00:00 + - "@hourly", "@daily", "@weekly", "@monthly", "@yearly" + + Args: + schedule: Cron-выражение или special string + base_time: Базовое время для расчёта (по умолчанию сейчас) + + Returns: + Следующее время выполнения или None если ошибка парсинга """ - now = datetime.now() + if base_time is None: + base_time = datetime.now() - if schedule.startswith('*/'): - # Каждые N минут - try: - minutes = int(schedule.split()[0][2:]) - return now + timedelta(minutes=minutes) - except (ValueError, IndexError): - return None + # Поддержка special strings + special_schedules = { + '@hourly': '0 * * * *', + '@daily': '0 0 * * *', + '@midnight': '0 0 * * *', + '@weekly': '0 0 * * 0', + '@monthly': '0 0 1 * *', + '@yearly': '0 0 1 1 *', + '@annually': '0 0 1 1 *' + } - elif schedule == '@hourly': - return now + timedelta(hours=1) + cron_expr = special_schedules.get(schedule.lower(), schedule) - elif schedule == '@daily': - return now + timedelta(days=1) + try: + # croniter возвращает следующее время выполнения + cron = croniter(cron_expr, base_time) + next_run = cron.get_next(datetime) + return next_run + except Exception as e: + logger.error(f"Ошибка парсинга cron-расписания '{schedule}': {e}") + return None - elif schedule == '@weekly': - return now + timedelta(weeks=1) + def _calculate_next_run(self, schedule: str, last_run: datetime = None) -> Optional[datetime]: + """ + Рассчитать следующее время выполнения на основе last_run. - return None + Args: + schedule: Cron-выражение + last_run: Время последнего выполнения (по умолчанию сейчас) + + Returns: + Следующее время выполнения + """ + base_time = last_run if last_run else datetime.now() + return self._parse_schedule(schedule, base_time) async def add_job(self, name: str, prompt: str, schedule: str, user_id: int = None, notify: bool = False, log_results: bool = True) -> ToolResult: """ Добавить интеллектуальную задачу. - + Args: name: Название задачи prompt: Промпт для ИИ-агента @@ -161,7 +189,7 @@ class CronTool(BaseTool): c = conn.cursor() try: - next_run = self._parse_schedule(schedule) + next_run = self._calculate_next_run(schedule) next_run_str = next_run.strftime('%Y-%m-%d %H:%M:%S') if next_run else None c.execute(''' @@ -198,6 +226,63 @@ class CronTool(BaseTool): finally: conn.close() + async def update_next_run(self, job_id: int) -> ToolResult: + """ + Пересчитать время следующего выполнения после успешного запуска. + + Args: + job_id: ID задачи + + Returns: + ToolResult с новым next_run + """ + conn = sqlite3.connect(self.db_path) + c = conn.cursor() + + try: + # Получаем текущие данные задачи + c.execute("SELECT schedule, last_run FROM cron_jobs WHERE id = ?", (job_id,)) + row = c.fetchone() + + if not row: + return ToolResult( + success=False, + error=f"Задача не найдена: {job_id}" + ) + + schedule, last_run_str = row + + # Рассчитываем следующее время выполнения на основе last_run + last_run = datetime.strptime(last_run_str, '%Y-%m-%d %H:%M:%S') if last_run_str else datetime.now() + next_run = self._calculate_next_run(schedule, last_run) + + if not next_run: + return ToolResult( + success=False, + error=f"Не удалось рассчитать next_run для расписания '{schedule}'" + ) + + next_run_str = next_run.strftime('%Y-%m-%d %H:%M:%S') + + # Обновляем next_run в БД + c.execute("UPDATE cron_jobs SET next_run = ? WHERE id = ?", (next_run_str, job_id)) + conn.commit() + + return ToolResult( + success=True, + data={'id': job_id, 'next_run': next_run_str}, + metadata={'status': 'next_run_updated'} + ) + + except Exception as e: + logger.exception(f"Ошибка обновления next_run: {e}") + return ToolResult( + success=False, + error=str(e) + ) + finally: + conn.close() + async def list_jobs(self, user_id: int = None) -> ToolResult: """ Получить список всех задач. diff --git a/requirements.txt b/requirements.txt index 1f1affc..de150e5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,4 @@ chromadb>=0.4.0 sentence-transformers>=2.2.0 PySocks>=1.7.0 ddgs>=0.3.0 +croniter>=2.0.0