#!/usr/bin/env python3 """Обработчики файлов для Telegram бота. Функционал: - Прием файлов от пользователя с сохранением в uploads/YYYY-MM-DD/filename - Команда /get filename — отправка файла пользователю - Команда /files [date] — список файлов за дату (сегодня по умолчанию) - Проверка размера файлов (max 20MB) - Логирование операций в uploads/files.log """ import os import logging from pathlib import Path from datetime import datetime, timedelta from typing import Optional from telegram import Update from telegram.ext import CommandHandler, MessageHandler, filters, ContextTypes from bot.config import state_manager from bot.utils.decorators import check_access from vector_memory import save_message logger = logging.getLogger(__name__) # ============================================================================ # КОНСТАНТЫ # ============================================================================ # Максимальный размер файла: 20MB (ограничение Telegram) MAX_FILE_SIZE_DOWNLOAD = 20 * 1024 * 1024 # 20 MB в байтах MAX_FILE_SIZE = 20 * 1024 * 1024 # 20 MB в байтах # Базовая директория для загрузок BASE_DIR = Path(__file__).parent.parent.parent UPLOADS_DIR = BASE_DIR / "uploads" # Файл лога операций LOG_FILE = UPLOADS_DIR / "files.log" # ============================================================================ # ЛОГИРОВАНИЕ # ============================================================================ def log_operation(operation: str, user_id: int, username: str, details: str = ""): """Логирование операций с файлами.""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") log_entry = f"[{timestamp}] {operation} | user_id={user_id} | username={username}" if details: log_entry += f" | {details}" try: with open(LOG_FILE, "a", encoding="utf-8") as f: f.write(log_entry + "\n") except Exception as e: logger.error(f"Ошибка записи в лог файлов: {e}") def setup_file_logging(): """Настройка отдельного логгера для операций с файлами.""" UPLOADS_DIR.mkdir(parents=True, exist_ok=True) file_handler = logging.FileHandler(LOG_FILE, encoding="utf-8") file_handler.setFormatter(logging.Formatter( "[%(asctime)s] %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S" )) file_logger = logging.getLogger("files") file_logger.addHandler(file_handler) file_logger.setLevel(logging.INFO) return file_logger file_logger = setup_file_logging() # ============================================================================ # ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ # ============================================================================ def get_date_dir(date: Optional[datetime] = None) -> Path: """Получить директорию для указанной даты.""" if date is None: date = datetime.now() date_str = date.strftime("%Y-%m-%d") date_dir = UPLOADS_DIR / date_str # Создаем директорию если нет date_dir.mkdir(parents=True, exist_ok=True) return date_dir def get_file_path(filename: str, date: Optional[datetime] = None) -> Path: """Получить полный путь к файлу.""" date_dir = get_date_dir(date) return date_dir / filename def file_exists(filename: str, date: Optional[datetime] = None) -> bool: """Проверить существование файла.""" file_path = get_file_path(filename, date) return file_path.exists() def list_files_for_date(date: Optional[datetime] = None) -> list: """Получить список файлов за указанную дату.""" date_dir = get_date_dir(date) if not date_dir.exists(): return [] files = [] for item in date_dir.iterdir(): if item.is_file() and item.name != "files.log": stat = item.stat() files.append({ "name": item.name, "size": stat.st_size, "path": str(item), "modified": datetime.fromtimestamp(stat.st_mtime) }) # Сортируем по имени files.sort(key=lambda x: x["name"]) return files def format_file_size(size_bytes: int) -> str: """Форматировать размер файла в человекочитаемом виде.""" for unit in ["B", "KB", "MB", "GB"]: if size_bytes < 1024: return f"{size_bytes:.1f} {unit}" size_bytes /= 1024 return f"{size_bytes:.1f} TB" def parse_date_string(date_str: Optional[str]) -> Optional[datetime]: """Распарсить строку даты.""" if not date_str: return None # Поддерживаемые форматы formats = [ "%Y-%m-%d", # 2024-01-15 "%d.%m.%Y", # 15.01.2024 "%d-%m-%Y", # 15-01-2024 "%Y%m%d", # 20240115 ] for fmt in formats: try: return datetime.strptime(date_str, fmt) except ValueError: continue # Относительные даты if date_str.lower() in ["yesterday", "вчера"]: return datetime.now() - timedelta(days=1) elif date_str.lower() in ["today", "сегодня"]: return datetime.now() return None # ============================================================================ # ОБРАБОТЧИК ФАЙЛОВ # ============================================================================ @check_access async def handle_file_message(update: Update, context: ContextTypes.DEFAULT_TYPE): """Обработка входящих файлов от пользователя.""" user_id = update.effective_user.id username = update.effective_user.username or str(user_id) # Определяем тип файла и получаем файл file = None file_type = None if update.message.document: file = update.message.document file_type = "document" elif update.message.photo: # Берем фото в максимальном разрешении (последнее в списке) file = update.message.photo[-1] file_type = "photo" elif update.message.audio: file = update.message.audio file_type = "audio" elif update.message.voice: file = update.message.voice file_type = "voice" elif update.message.video: file = update.message.video file_type = "video" elif update.message.video_note: file = update.message.video_note file_type = "video_note" elif update.message.sticker: file = update.message.sticker file_type = "sticker" elif update.message.animation: file = update.message.animation file_type = "animation" if not file: return # Получаем информацию о файле file_name = file.file_name or f"file_{datetime.now().strftime('%Y%m%d_%H%M%S')}" file_size = file.file_size or 0 logger.info(f"Получен файл: {file_name}, размер: {file_size}, тип: {file_type}") # Проверка размера файла if file_size > MAX_FILE_SIZE: log_operation("REJECTED_SIZE", user_id, username, f"file={file_name}, size={file_size}") await update.message.reply_text( f"❌ **Файл слишком большой**\n\n" f"Максимальный размер: {format_file_size(MAX_FILE_SIZE)}\n" f"Размер файла: {format_file_size(file_size)}", parse_mode="Markdown" ) return # Создаем директорию для сегодняшней даты date_dir = get_date_dir() file_path = date_dir / file_name # Обработка дубликатов имен if file_path.exists(): base, ext = os.path.splitext(file_name) counter = 1 while file_path.exists(): new_name = f"{base}_{counter}{ext}" file_path = date_dir / new_name counter += 1 file_name = file_path.name logger.info(f"Сохранение файла: {file_path}") # Отправляем статус status_msg = await update.message.reply_text( f"⏳ **Загрузка файла...**\n\n" f"📁 {file_name}\n" f"📊 Размер: {format_file_size(file_size)}", parse_mode="Markdown" ) try: # Скачиваем файл telegram_file = await context.bot.get_file(file.file_id) await telegram_file.download_to_drive(file_path) # Логирование log_operation("UPLOAD", user_id, username, f"file={file_name}, size={file_size}, path={file_path}") file_logger.info(f"UPLOAD: user={username}, file={file_name}, size={file_size}") # === СОХРАНЕНИЕ В ПАМЯТЬ ИИ === # Добавляем информацию о файле в историю диалога и векторную память state = state_manager.get(user_id) if state: # Формируем сообщение о файле с ПОЛНЫМ абсолютным путём! # Это важно чтобы ИИ правильно понимал где файл absolute_path = str(file_path.absolute()) file_info = f"Пользователь загрузил файл: {file_name} (тип: {file_type}, размер: {format_file_size(file_size)}, полный путь: {absolute_path})" # Добавляем в историю диалога state.ai_chat_history.append(f"User: {file_info}") # Сохраняем в векторную память save_message(user_id, "user", file_info) logger.info(f"Информация о файле сохранена в памяти ИИ: {file_name}, путь: {absolute_path}") # =============================== # Обновляем статус await status_msg.edit_text( f"✅ **Файл сохранен!**\n\n" f"📁 **Имя:** `{file_name}`\n" f"📊 **Размер:** {format_file_size(file_size)}\n" f"📂 **Директория:** `{date_dir.name}/`\n" f"📅 **Дата:** {datetime.now().strftime('%Y-%m-%d %H:%M')}\n\n" f"Используйте `/files` для просмотра всех файлов за сегодня.", parse_mode="Markdown" ) except Exception as e: logger.exception(f"Ошибка сохранения файла: {e}") log_operation("UPLOAD_ERROR", user_id, username, f"file={file_name}, error={str(e)}") await status_msg.edit_text( f"❌ **Ошибка сохранения файла**\n\n" f"Файл: `{file_name}`\n" f"Ошибка: `{str(e)}`", parse_mode="Markdown" ) # ============================================================================ # КОМАНДА /GET - ПОЛУЧЕНИЕ ФАЙЛА # ============================================================================ @check_access async def get_file_command(update: Update, context: ContextTypes.DEFAULT_TYPE): """ Команда /get filename - отправка файла пользователю. Использование: /get filename.txt - получить файл за сегодня /get filename.txt 2024-01-15 - получить файл за указанную дату """ user_id = update.effective_user.id username = update.effective_user.username or str(user_id) if not context.args: await update.message.reply_text( "❌ **Использование:**\n\n" "`/get <имя_файла>` - получить файл за сегодня\n" "`/get <имя_файла> <дата>` - получить файл за указанную дату\n\n" "**Примеры:**\n" "`/get document.pdf`\n" "`/get photo.jpg 2024-01-15`\n" "`/get data.csv вчера`", parse_mode="Markdown" ) return # Парсим аргументы filename = context.args[0] date_arg = context.args[1] if len(context.args) > 1 else None file_date = parse_date_string(date_arg) # Проверяем существование файла file_path = get_file_path(filename, file_date) if not file_path.exists(): # Ищем файл во всех директориях за сегодня date_dir = get_date_dir(file_date) await update.message.reply_text( f"❌ **Файл не найден**\n\n" f"Файл: `{filename}`\n" f"Директория: `{date_dir.name}/`\n\n" f"Используйте `/files` для просмотра доступных файлов.", parse_mode="Markdown" ) log_operation("GET_NOT_FOUND", user_id, username, f"file={filename}, date={file_date}") return # Проверка размера перед отправкой file_size = file_path.stat().st_size if file_size > MAX_FILE_SIZE: await update.message.reply_text( f"❌ **Файл слишком большой для отправки**\n\n" f"Максимальный размер: {format_file_size(MAX_FILE_SIZE)}\n" f"Размер файла: {format_file_size(file_size)}", parse_mode="Markdown" ) log_operation("GET_TOO_LARGE", user_id, username, f"file={filename}, size={file_size}") return # Отправляем статус status_msg = await update.message.reply_text( f"⏳ **Отправка файла...**\n\n" f"📁 `{filename}`\n" f"📊 {format_file_size(file_size)}", parse_mode="Markdown" ) try: # Отправляем файл with open(file_path, "rb") as f: await update.message.reply_document( document=f, filename=filename, caption=f"📁 **{filename}**\n📊 {format_file_size(file_size)}", parse_mode="Markdown" ) # Логирование log_operation("GET", user_id, username, f"file={filename}, size={file_size}") file_logger.info(f"GET: user={username}, file={filename}, size={file_size}") # Удаляем статус await status_msg.delete() except Exception as e: logger.exception(f"Ошибка отправки файла: {e}") log_operation("GET_ERROR", user_id, username, f"file={filename}, error={str(e)}") await status_msg.edit_text( f"❌ **Ошибка отправки файла**\n\n" f"Файл: `{filename}`\n" f"Ошибка: `{str(e)}`", parse_mode="Markdown" ) # ============================================================================ # КОМАНДА /FILES - СПИСОК ФАЙЛОВ # ============================================================================ @check_access async def list_files_command(update: Update, context: ContextTypes.DEFAULT_TYPE): """ Команда /files [date] - список файлов за указанную дату. Использование: /files - список файлов за сегодня /files 2024-01-15 - список файлов за указанную дату /files вчера - список файлов за вчера """ user_id = update.effective_user.id username = update.effective_user.username or str(user_id) # Парсим дату из аргументов date_arg = context.args[0] if context.args else None file_date = parse_date_string(date_arg) if file_date is None and date_arg: await update.message.reply_text( f"❌ **Неверный формат даты:** `{date_arg}`\n\n" f"**Поддерживаемые форматы:**\n" f"• `2024-01-15` (YYYY-MM-DD)\n" f"• `15.01.2024` (DD.MM.YYYY)\n" f"• `сегодня` / `today`\n" f"• `вчера` / `yesterday`", parse_mode="Markdown" ) return # Получаем список файлов files = list_files_for_date(file_date) date_str = (file_date or datetime.now()).strftime("%Y-%m-%d") date_dir = get_date_dir(file_date) if not files: await update.message.reply_text( f"📭 **Нет файлов за {date_str}**\n\n" f"Директория: `{date_dir.name}/`\n\n" f"Отправьте файл в чат чтобы сохранить его.", parse_mode="Markdown" ) log_operation("LIST_EMPTY", user_id, username, f"date={date_str}") return # Формируем вывод total_size = sum(f["size"] for f in files) output = f"📁 **Файлы за {date_str}**\n\n" output += f"📂 Директория: `{date_dir.name}/`\n" output += f"📊 Всего файлов: {len(files)}\n" output += f"💾 Общий размер: {format_file_size(total_size)}\n\n" for i, file_info in enumerate(files, 1): name = file_info["name"] size = format_file_size(file_info["size"]) modified = file_info["modified"].strftime("%H:%M") # Обрезаем длинные имена if len(name) > 40: display_name = name[:37] + "..." else: display_name = name output += f"{i}. `{display_name}` - {size} ({modified})\n" output += f"\n💡 **Использование:**\n" output += f"`/get <имя_файла>` - скачать файл\n" output += f"`/get <имя_файла> {date_str}` - скачать файл за дату" await update.message.reply_text(output, parse_mode="Markdown") log_operation("LIST", user_id, username, f"date={date_str}, count={len(files)}") # ============================================================================ # РЕГИСТРАЦИЯ ХЕНДЛЕРОВ # ============================================================================ def register_file_handlers(application): """Регистрация обработчиков файлов в приложении.""" # Обработчик входящих файлов (документы, фото, аудио, видео и т.д.) application.add_handler(MessageHandler( filters.Document.ALL | filters.PHOTO | filters.AUDIO | filters.VOICE | filters.VIDEO | filters.VIDEO_NOTE | filters.Sticker.ALL | filters.ANIMATION, handle_file_message )) # Команда /get - получение файла application.add_handler(CommandHandler("get", get_file_command)) # Команда /files - список файлов application.add_handler(CommandHandler("files", list_files_command)) logger.info("Обработчики файлов зарегистрированы")