#!/usr/bin/env python3 """ Модуль компактификации истории диалога. Сжимает старую историю диалога в структурированный summary, сохраняя важные факты, URL, настройки и договорённости. Архитектура: 1. Извлекаем все сообщения кроме последних 20 из ChromaDB 2. Сжимаем их в структурированный summary через Qwen Code 3. Сохраняем summary как отдельный документ в ChromaDB (type=summary) 4. При загрузке контекста — берём summary + последние 20 сообщений """ import logging from pathlib import Path from datetime import datetime from typing import Optional, List, Dict, Any, Tuple from dataclasses import dataclass logger = logging.getLogger(__name__) # Константы компактификации COMPACTION_THRESHOLD_PERCENT = 70 # Порог запуска (70% контекста) COMPACTION_KEEP_LAST = 20 # Сколько последних сообщений сохранять COMPACTION_SUMMARY_ID = "dialogue_summary" # ID для summary документа @dataclass class CompactionResult: """Результат компактификации.""" success: bool messages_compressed: int = 0 summary_length: int = 0 tokens_saved: int = 0 error: Optional[str] = None class DialogueCompactor: """ Менеджер компактификации истории диалога. Использует Qwen Code для сжатия истории в структурированный summary. """ def __init__(self, qwen_manager=None, vector_memory=None): """ Инициализация компактора. Args: qwen_manager: QwenCodeManager для выполнения сжатия vector_memory: VectorMemoryStorage для работы с ChromaDB """ self.qwen_manager = qwen_manager self.vector_memory = vector_memory self._chroma_client = None self._collection = None def _init_chroma(self): """Инициализация ChromaDB клиента.""" if self._collection is not None: return import chromadb from chromadb.config import Settings # Путь к векторной БД persist_dir = str(Path(__file__).parent / "vector_db") self._client = chromadb.PersistentClient( path=persist_dir, settings=Settings( anonymized_telemetry=False, allow_reset=True ) ) self._collection = self._client.get_or_create_collection( name="telegram_messages", metadata={"description": "История диалогов Telegram бота"} ) logger.info(f"ChromaDB инициализирован для компактификации: {persist_dir}") def _get_summary(self) -> Optional[str]: """ Получить существующий summary из ChromaDB. Returns: existing summary или None если не найден """ self._init_chroma() try: result = self._collection.get( ids=[COMPACTION_SUMMARY_ID], include=["documents"] ) if result["documents"] and len(result["documents"]) > 0: return result["documents"][0] return None except Exception as e: logger.error(f"Ошибка получения summary: {e}") return None def _save_summary(self, summary: str): """ Сохранить summary в ChromaDB. Args: summary: структурированный summary для сохранения """ self._init_chroma() try: from sentence_transformers import SentenceTransformer import os os.environ["TRANSFORMERS_OFFLINE"] = "1" os.environ["HF_HUB_OFFLINE"] = "1" model = SentenceTransformer("all-MiniLM-L6-v2", local_files_only=True) embedding = model.encode(summary, convert_to_numpy=True).tolist() # Проверяем есть ли уже summary existing = self._collection.get(ids=[COMPACTION_SUMMARY_ID]) if existing["ids"]: # Обновляем существующий self._collection.update( ids=[COMPACTION_SUMMARY_ID], embeddings=[embedding], documents=[summary], metadatas=[{ "type": "summary", "timestamp": datetime.now().isoformat(), "user_id": "system" }] ) logger.info(f"Summary обновлён в ChromaDB (длина: {len(summary)})") else: # Добавляем новый self._collection.add( ids=[COMPACTION_SUMMARY_ID], embeddings=[embedding], documents=[summary], metadatas=[{ "type": "summary", "timestamp": datetime.now().isoformat(), "user_id": "system" }] ) logger.info(f"Summary сохранён в ChromaDB (длина: {len(summary)})") except Exception as e: logger.error(f"Ошибка сохранения summary: {e}") raise def _get_old_messages(self, keep_last: int = COMPACTION_KEEP_LAST) -> List[Dict[str, Any]]: """ Получить все сообщения кроме последних keep_last. Args: keep_last: количество последних сообщений для сохранения Returns: список сообщений для сжатия """ self._init_chroma() try: # Получаем все сообщения result = self._collection.get( include=["documents", "metadatas"] ) if not result["documents"]: return [] # Собираем сообщения с метаданными messages = [] for i, doc in enumerate(result["documents"]): # Пропускаем summary if result["metadatas"][i].get("type") == "summary": continue messages.append({ "content": doc, "role": result["metadatas"][i].get("role", "unknown"), "timestamp": result["metadatas"][i].get("timestamp", ""), "user_id": result["metadatas"][i].get("user_id", "unknown") }) # Сортируем по timestamp messages.sort(key=lambda x: x["timestamp"]) # Возвращаем все кроме последних keep_last if len(messages) <= keep_last: return [] return messages[:-keep_last] except Exception as e: logger.error(f"Ошибка получения сообщений: {e}") return [] def _build_compaction_prompt(self, messages: List[Dict[str, Any]]) -> str: """ Построить промпт для сжатия истории. Args: messages: список сообщений для сжатия Returns: промпт для Qwen Code """ # Форматируем сообщения в читаемый вид dialogue_text = "" for msg in messages: role = "Пользователь" if msg["role"] == "user" else "Ассистент" dialogue_text += f"{role}: {msg['content']}\n\n" prompt = f""" Ты — ассистент для сжатия истории диалога в структурированный summary. ## ЗАДАЧА Сожми историю диалога в компактный структурированный summary для сохранения контекста. ## ВАЖНО СОХРАНИТЬ: - 📌 ФАКТЫ: имена, названия, версии, даты, числа - 🔗 URL: адреса сайтов, домены, ссылки на репозитории - 🖥 СЕРВЕРЫ: IP-адреса, логины, пароли, имена серверов - ⚙️ НАСТРОЙКИ: конфигурации, пути к файлам, переменные окружения - 📋 РЕШЕНИЯ: договорённости, выводы, утверждённые планы - 🎯 ЗАДАЧИ: что планировалось, что сделано, что в процессе - 🛠 ИНСТРУМЕНТЫ: названия утилит, команд, библиотек ## ФОРМАТ SUMMARY: ``` === SUMMARY ДИАЛОГА === 📅 Период: {datetime.now().strftime('%Y-%m-%d')} 🎯 ОСНОВНАЯ ТЕМА: [Краткий пересказ основной темы диалога (3-5 предложений)] 📌 КЛЮЧЕВЫЕ ФАКТЫ: • Факт 1 • Факт 2 • ... 🔗 URL И РЕСУРСЫ: • https://... • ... 🖥 СЕРВЕРЫ И ДОСТУПЫ: • server: IP, login, password • ... ⚙️ НАСТРОЙКИ И КОНФИГУРАЦИИ: • path/to/config: значение • ... 📋 ПРИНЯТЫЕ РЕШЕНИЯ: • Решение 1 • Решение 2 • ... 🎯 ЗАДАЧИ: ✅ Сделано: ... 🔄 В процессе: ... ⏳ Запланировано: ... 🛠 ИНСТРУМЕНТЫ И КОМАНДЫ: • команда1 — описание • команда2 — описание ``` ## ДИАЛОГ ДЛЯ СЖАТИЯ: {dialogue_text} ## ТРЕБОВАНИЯ: 1. Будь краток но информативен 2. Сохраняй все технические детали (команды, пути, URL) 3. Используй маркированные списки для читаемости 4. Не добавляй информацию которой не было в диалоге 5. Выделяй важное эмодзи для быстрого поиска Сожми диалог в summary согласно формату выше: """ return prompt async def compact(self, keep_last: int = COMPACTION_KEEP_LAST) -> CompactionResult: """ Выполнить компактификацию истории диалога. Args: keep_last: количество последних сообщений для сохранения Returns: CompactionResult с результатами операции """ logger.info(f"Начало компактификации (сохраняем последние {keep_last} сообщений)") try: # Получаем старые сообщения old_messages = self._get_old_messages(keep_last) if not old_messages: logger.info("Нет сообщений для компактификации") return CompactionResult( success=True, messages_compressed=0, summary_length=0, tokens_saved=0 ) messages_count = len(old_messages) logger.info(f"Найдено {messages_count} сообщений для сжатия") # Строим промпт prompt = self._build_compaction_prompt(old_messages) # Проверяем наличие Qwen Code if not self.qwen_manager: logger.error("Qwen manager не инициализирован") return CompactionResult( success=False, error="Qwen manager не инициализирован" ) # Выполняем сжатие через Qwen Code output_parts = [] async def on_output(text: str): output_parts.append(text) async def on_oauth_url(url: str): logger.warning(f"OAuth URL: {url}") logger.info("Запуск Qwen Code для сжатия...") # Запускаем задачу await self.qwen_manager.run_task( user_id=999, # Системный user_id для компактификации task=prompt, on_output=on_output, on_oauth_url=on_oauth_url, use_system_prompt=False # Не добавляем системный промпт бота ) summary = "".join(output_parts) # Очищаем summary от служебных символов summary = summary.strip() if not summary: logger.error("Пустой summary после сжатия") return CompactionResult( success=False, error="Пустой summary после сжатия" ) # Сохраняем summary в ChromaDB self._save_summary(summary) # Оцениваем экономию токенов (примерно) original_tokens = sum(len(msg["content"]) for msg in old_messages) // 4 summary_tokens = len(summary) // 4 tokens_saved = original_tokens - summary_tokens logger.info( f"Компактификация завершена: " f"сообщений={messages_count}, " f"длина summary={len(summary)}, " f"экономия токенов≈{tokens_saved}" ) return CompactionResult( success=True, messages_compressed=messages_count, summary_length=len(summary), tokens_saved=tokens_saved ) except Exception as e: logger.error(f"Ошибка компактификации: {e}", exc_info=True) return CompactionResult( success=False, error=str(e) ) def get_context_with_summary(self, limit: int = 20) -> Tuple[Optional[str], List[Dict[str, Any]]]: """ Получить контекст с использованием summary. Args: limit: количество последних сообщений для загрузки Returns: (summary, recent_messages) кортеж """ # Получаем summary summary = self._get_summary() # Получаем последние сообщения self._init_chroma() try: result = self._collection.get( include=["documents", "metadatas"], limit=limit ) messages = [] if result["documents"]: for i, doc in enumerate(result["documents"]): # Пропускаем summary if result["metadatas"][i].get("type") == "summary": continue messages.append({ "content": doc, "role": result["metadatas"][i].get("role", "unknown"), "timestamp": result["metadatas"][i].get("timestamp", ""), "user_id": result["metadatas"][i].get("user_id", "unknown") }) logger.info(f"Загружен контекст: summary={summary is not None}, сообщений={len(messages)}") return summary, messages except Exception as e: logger.error(f"Ошибка загрузки контекста: {e}") return None, [] def check_compaction_needed(self, threshold_percent: int = COMPACTION_THRESHOLD_PERCENT) -> bool: """ Проверить нужна ли компактификация. Args: threshold_percent: порог заполненности контекста (%) Returns: True если нужна компактификация """ self._init_chroma() try: # Получаем количество сообщений result = self._collection.get(include=[]) if not result["ids"]: return False # Считаем сообщения без summary message_count = 0 for i, id_ in enumerate(result["ids"]): if id_ != COMPACTION_SUMMARY_ID: message_count += 1 # Оцениваем заполненность контекста # Примерный расчёт: 1 сообщение ≈ 100 токенов ≈ 400 символов # Максимум контекста ≈ 200K токенов ≈ 800K символов # Для простоты: 1000 сообщений = 100% контекста context_percent = (message_count / 1000) * 100 logger.info( f"Проверка компактификации: " f"сообщений={message_count}, " f"заполненность={context_percent:.1f}%, " f"порог={threshold_percent}%" ) return context_percent >= threshold_percent except Exception as e: logger.error(f"Ошибка проверки компактификации: {e}") return False # Глобальный экземпляр для использования в боте compactor: Optional[DialogueCompactor] = None def init_compactor(qwen_manager=None, vector_memory=None) -> DialogueCompactor: """Инициализировать глобальный компактор.""" global compactor compactor = DialogueCompactor(qwen_manager, vector_memory) return compactor def get_compactor() -> DialogueCompactor: """Получить глобальный компактор.""" if compactor is None: raise RuntimeError("Compactor не инициализирован. Вызовите init_compactor().") return compactor