From 1dc40507c687cff9a872bf8e61463685303790bd Mon Sep 17 00:00:00 2001 From: mirivlad Date: Wed, 25 Feb 2026 12:09:30 +0800 Subject: [PATCH] feat: add dialogue compaction module for context management MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - bot/compaction.py: новый модуль для сжатия истории диалога - Автоматическая компактификация при 70% заполнении контекста - Сохранение summary в ChromaDB с структурированным форматом - Интеграция с handle_ai_task для прозрачной работы - Сохраняет последние 20 сообщений без изменений - Структурированный промпт: факты, URL, серверы, настройки, решения Co-authored-by: Qwen-Coder --- bot.py | 31 +++ bot/compaction.py | 497 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 528 insertions(+) create mode 100644 bot/compaction.py diff --git a/bot.py b/bot.py index 22659a8..c5916e5 100644 --- a/bot.py +++ b/bot.py @@ -33,6 +33,9 @@ from vector_memory import ( get_memory_stats ) +# Импорты компактификации +from bot.compaction import init_compactor, get_compactor, DialogueCompactor + from dotenv import load_dotenv from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, BotCommand from telegram.ext import ( @@ -85,6 +88,7 @@ from bot.tools import tools_registry # Глобальные менеджеры сессий ssh_session_manager = SSHSessionManager() local_session_manager = LocalSessionManager() +compactor: Optional[DialogueCompactor] = None async def handle_text_message(update: Update, context: ContextTypes.DEFAULT_TYPE): @@ -129,6 +133,33 @@ async def handle_ai_task(update: Update, text: str): user_id = update.effective_user.id state = state_manager.get(user_id) + # === ПРОВЕРКА: Нужна ли компактификация? === + global compactor + if compactor is None: + compactor = init_compactor(qwen_manager, hybrid_memory_manager.vector) + logger.info("Компактор инициализирован") + + # Проверяем порог заполненности контекста + if compactor.check_compaction_needed(): + logger.info("Запуск компактификации истории диалога...") + status_msg = await update.message.reply_text("🔄 **Запуск компактификации истории...**\n\n_Это может занять несколько секунд._", parse_mode="Markdown") + + result = await compactor.compact() + + await status_msg.delete() + + if result.success: + await update.message.reply_text( + f"✅ **Компактификация завершена!**\n\n" + f"📊 Сжато сообщений: {result.messages_compressed}\n" + f"📝 Длина summary: {result.summary_length} символов\n" + f"💾 Экономия токенов: ~{result.tokens_saved}", + parse_mode="Markdown" + ) + else: + logger.error(f"Компактификация не удалась: {result.error}") + await update.message.reply_text(f"⚠️ **Ошибка компактификации:** {result.error}", parse_mode="Markdown") + # Сохраняем сообщение пользователя в памяти save_message(user_id, "user", text) diff --git a/bot/compaction.py b/bot/compaction.py new file mode 100644 index 0000000..81c0129 --- /dev/null +++ b/bot/compaction.py @@ -0,0 +1,497 @@ +#!/usr/bin/env python3 +""" +Модуль компактификации истории диалога. + +Сжимает старую историю диалога в структурированный summary, +сохраняя важные факты, URL, настройки и договорённости. + +Архитектура: +1. Извлекаем все сообщения кроме последних 20 из ChromaDB +2. Сжимаем их в структурированный summary через Qwen Code +3. Сохраняем summary как отдельный документ в ChromaDB (type=summary) +4. При загрузке контекста — берём summary + последние 20 сообщений +""" + +import logging +from pathlib import Path +from datetime import datetime +from typing import Optional, List, Dict, Any, Tuple +from dataclasses import dataclass + +logger = logging.getLogger(__name__) + +# Константы компактификации +COMPACTION_THRESHOLD_PERCENT = 70 # Порог запуска (70% контекста) +COMPACTION_KEEP_LAST = 20 # Сколько последних сообщений сохранять +COMPACTION_SUMMARY_ID = "dialogue_summary" # ID для summary документа + + +@dataclass +class CompactionResult: + """Результат компактификации.""" + success: bool + messages_compressed: int = 0 + summary_length: int = 0 + tokens_saved: int = 0 + error: Optional[str] = None + + +class DialogueCompactor: + """ + Менеджер компактификации истории диалога. + + Использует Qwen Code для сжатия истории в структурированный summary. + """ + + def __init__(self, qwen_manager=None, vector_memory=None): + """ + Инициализация компактора. + + Args: + qwen_manager: QwenCodeManager для выполнения сжатия + vector_memory: VectorMemoryStorage для работы с ChromaDB + """ + self.qwen_manager = qwen_manager + self.vector_memory = vector_memory + self._chroma_client = None + self._collection = None + + def _init_chroma(self): + """Инициализация ChromaDB клиента.""" + if self._collection is not None: + return + + import chromadb + from chromadb.config import Settings + + # Путь к векторной БД + persist_dir = str(Path(__file__).parent / "vector_db") + + self._client = chromadb.PersistentClient( + path=persist_dir, + settings=Settings( + anonymized_telemetry=False, + allow_reset=True + ) + ) + + self._collection = self._client.get_or_create_collection( + name="telegram_messages", + metadata={"description": "История диалогов Telegram бота"} + ) + + logger.info(f"ChromaDB инициализирован для компактификации: {persist_dir}") + + def _get_summary(self) -> Optional[str]: + """ + Получить существующий summary из ChromaDB. + + Returns: + existing summary или None если не найден + """ + self._init_chroma() + + try: + result = self._collection.get( + ids=[COMPACTION_SUMMARY_ID], + include=["documents"] + ) + + if result["documents"] and len(result["documents"]) > 0: + return result["documents"][0] + return None + except Exception as e: + logger.error(f"Ошибка получения summary: {e}") + return None + + def _save_summary(self, summary: str): + """ + Сохранить summary в ChromaDB. + + Args: + summary: структурированный summary для сохранения + """ + self._init_chroma() + + try: + from sentence_transformers import SentenceTransformer + import os + os.environ["TRANSFORMERS_OFFLINE"] = "1" + os.environ["HF_HUB_OFFLINE"] = "1" + + model = SentenceTransformer("all-MiniLM-L6-v2", local_files_only=True) + embedding = model.encode(summary, convert_to_numpy=True).tolist() + + # Проверяем есть ли уже summary + existing = self._collection.get(ids=[COMPACTION_SUMMARY_ID]) + + if existing["ids"]: + # Обновляем существующий + self._collection.update( + ids=[COMPACTION_SUMMARY_ID], + embeddings=[embedding], + documents=[summary], + metadatas=[{ + "type": "summary", + "timestamp": datetime.now().isoformat(), + "user_id": "system" + }] + ) + logger.info(f"Summary обновлён в ChromaDB (длина: {len(summary)})") + else: + # Добавляем новый + self._collection.add( + ids=[COMPACTION_SUMMARY_ID], + embeddings=[embedding], + documents=[summary], + metadatas=[{ + "type": "summary", + "timestamp": datetime.now().isoformat(), + "user_id": "system" + }] + ) + logger.info(f"Summary сохранён в ChromaDB (длина: {len(summary)})") + + except Exception as e: + logger.error(f"Ошибка сохранения summary: {e}") + raise + + def _get_old_messages(self, keep_last: int = COMPACTION_KEEP_LAST) -> List[Dict[str, Any]]: + """ + Получить все сообщения кроме последних keep_last. + + Args: + keep_last: количество последних сообщений для сохранения + + Returns: + список сообщений для сжатия + """ + self._init_chroma() + + try: + # Получаем все сообщения + result = self._collection.get( + include=["documents", "metadatas"] + ) + + if not result["documents"]: + return [] + + # Собираем сообщения с метаданными + messages = [] + for i, doc in enumerate(result["documents"]): + # Пропускаем summary + if result["metadatas"][i].get("type") == "summary": + continue + + messages.append({ + "content": doc, + "role": result["metadatas"][i].get("role", "unknown"), + "timestamp": result["metadatas"][i].get("timestamp", ""), + "user_id": result["metadatas"][i].get("user_id", "unknown") + }) + + # Сортируем по timestamp + messages.sort(key=lambda x: x["timestamp"]) + + # Возвращаем все кроме последних keep_last + if len(messages) <= keep_last: + return [] + + return messages[:-keep_last] + + except Exception as e: + logger.error(f"Ошибка получения сообщений: {e}") + return [] + + def _build_compaction_prompt(self, messages: List[Dict[str, Any]]) -> str: + """ + Построить промпт для сжатия истории. + + Args: + messages: список сообщений для сжатия + + Returns: + промпт для Qwen Code + """ + # Форматируем сообщения в читаемый вид + dialogue_text = "" + for msg in messages: + role = "Пользователь" if msg["role"] == "user" else "Ассистент" + dialogue_text += f"{role}: {msg['content']}\n\n" + + prompt = f""" +Ты — ассистент для сжатия истории диалога в структурированный summary. + +## ЗАДАЧА +Сожми историю диалога в компактный структурированный summary для сохранения контекста. + +## ВАЖНО СОХРАНИТЬ: +- 📌 ФАКТЫ: имена, названия, версии, даты, числа +- 🔗 URL: адреса сайтов, домены, ссылки на репозитории +- 🖥 СЕРВЕРЫ: IP-адреса, логины, пароли, имена серверов +- ⚙️ НАСТРОЙКИ: конфигурации, пути к файлам, переменные окружения +- 📋 РЕШЕНИЯ: договорённости, выводы, утверждённые планы +- 🎯 ЗАДАЧИ: что планировалось, что сделано, что в процессе +- 🛠 ИНСТРУМЕНТЫ: названия утилит, команд, библиотек + +## ФОРМАТ SUMMARY: + +``` +=== SUMMARY ДИАЛОГА === +📅 Период: {datetime.now().strftime('%Y-%m-%d')} + +🎯 ОСНОВНАЯ ТЕМА: +[Краткий пересказ основной темы диалога (3-5 предложений)] + +📌 КЛЮЧЕВЫЕ ФАКТЫ: +• Факт 1 +• Факт 2 +• ... + +🔗 URL И РЕСУРСЫ: +• https://... +• ... + +🖥 СЕРВЕРЫ И ДОСТУПЫ: +• server: IP, login, password +• ... + +⚙️ НАСТРОЙКИ И КОНФИГУРАЦИИ: +• path/to/config: значение +• ... + +📋 ПРИНЯТЫЕ РЕШЕНИЯ: +• Решение 1 +• Решение 2 +• ... + +🎯 ЗАДАЧИ: +✅ Сделано: ... +🔄 В процессе: ... +⏳ Запланировано: ... + +🛠 ИНСТРУМЕНТЫ И КОМАНДЫ: +• команда1 — описание +• команда2 — описание +``` + +## ДИАЛОГ ДЛЯ СЖАТИЯ: +{dialogue_text} + +## ТРЕБОВАНИЯ: +1. Будь краток но информативен +2. Сохраняй все технические детали (команды, пути, URL) +3. Используй маркированные списки для читаемости +4. Не добавляй информацию которой не было в диалоге +5. Выделяй важное эмодзи для быстрого поиска + +Сожми диалог в summary согласно формату выше: +""" + return prompt + + async def compact(self, keep_last: int = COMPACTION_KEEP_LAST) -> CompactionResult: + """ + Выполнить компактификацию истории диалога. + + Args: + keep_last: количество последних сообщений для сохранения + + Returns: + CompactionResult с результатами операции + """ + logger.info(f"Начало компактификации (сохраняем последние {keep_last} сообщений)") + + try: + # Получаем старые сообщения + old_messages = self._get_old_messages(keep_last) + + if not old_messages: + logger.info("Нет сообщений для компактификации") + return CompactionResult( + success=True, + messages_compressed=0, + summary_length=0, + tokens_saved=0 + ) + + messages_count = len(old_messages) + logger.info(f"Найдено {messages_count} сообщений для сжатия") + + # Строим промпт + prompt = self._build_compaction_prompt(old_messages) + + # Проверяем наличие Qwen Code + if not self.qwen_manager: + logger.error("Qwen manager не инициализирован") + return CompactionResult( + success=False, + error="Qwen manager не инициализирован" + ) + + # Выполняем сжатие через Qwen Code + output_parts = [] + + async def on_output(text: str): + output_parts.append(text) + + async def on_oauth_url(url: str): + logger.warning(f"OAuth URL: {url}") + + logger.info("Запуск Qwen Code для сжатия...") + + # Запускаем задачу + await self.qwen_manager.run_task( + user_id=999, # Системный user_id для компактификации + task=prompt, + on_output=on_output, + on_oauth_url=on_oauth_url, + use_system_prompt=False # Не добавляем системный промпт бота + ) + + summary = "".join(output_parts) + + # Очищаем summary от служебных символов + summary = summary.strip() + + if not summary: + logger.error("Пустой summary после сжатия") + return CompactionResult( + success=False, + error="Пустой summary после сжатия" + ) + + # Сохраняем summary в ChromaDB + self._save_summary(summary) + + # Оцениваем экономию токенов (примерно) + original_tokens = sum(len(msg["content"]) for msg in old_messages) // 4 + summary_tokens = len(summary) // 4 + tokens_saved = original_tokens - summary_tokens + + logger.info( + f"Компактификация завершена: " + f"сообщений={messages_count}, " + f"длина summary={len(summary)}, " + f"экономия токенов≈{tokens_saved}" + ) + + return CompactionResult( + success=True, + messages_compressed=messages_count, + summary_length=len(summary), + tokens_saved=tokens_saved + ) + + except Exception as e: + logger.error(f"Ошибка компактификации: {e}", exc_info=True) + return CompactionResult( + success=False, + error=str(e) + ) + + def get_context_with_summary(self, limit: int = 20) -> Tuple[Optional[str], List[Dict[str, Any]]]: + """ + Получить контекст с использованием summary. + + Args: + limit: количество последних сообщений для загрузки + + Returns: + (summary, recent_messages) кортеж + """ + # Получаем summary + summary = self._get_summary() + + # Получаем последние сообщения + self._init_chroma() + + try: + result = self._collection.get( + include=["documents", "metadatas"], + limit=limit + ) + + messages = [] + if result["documents"]: + for i, doc in enumerate(result["documents"]): + # Пропускаем summary + if result["metadatas"][i].get("type") == "summary": + continue + + messages.append({ + "content": doc, + "role": result["metadatas"][i].get("role", "unknown"), + "timestamp": result["metadatas"][i].get("timestamp", ""), + "user_id": result["metadatas"][i].get("user_id", "unknown") + }) + + logger.info(f"Загружен контекст: summary={summary is not None}, сообщений={len(messages)}") + return summary, messages + + except Exception as e: + logger.error(f"Ошибка загрузки контекста: {e}") + return None, [] + + def check_compaction_needed(self, threshold_percent: int = COMPACTION_THRESHOLD_PERCENT) -> bool: + """ + Проверить нужна ли компактификация. + + Args: + threshold_percent: порог заполненности контекста (%) + + Returns: + True если нужна компактификация + """ + self._init_chroma() + + try: + # Получаем количество сообщений + result = self._collection.get(include=[]) + + if not result["ids"]: + return False + + # Считаем сообщения без summary + message_count = 0 + for i, id_ in enumerate(result["ids"]): + if id_ != COMPACTION_SUMMARY_ID: + message_count += 1 + + # Оцениваем заполненность контекста + # Примерный расчёт: 1 сообщение ≈ 100 токенов ≈ 400 символов + # Максимум контекста ≈ 200K токенов ≈ 800K символов + # Для простоты: 1000 сообщений = 100% контекста + + context_percent = (message_count / 1000) * 100 + + logger.info( + f"Проверка компактификации: " + f"сообщений={message_count}, " + f"заполненность={context_percent:.1f}%, " + f"порог={threshold_percent}%" + ) + + return context_percent >= threshold_percent + + except Exception as e: + logger.error(f"Ошибка проверки компактификации: {e}") + return False + + +# Глобальный экземпляр для использования в боте +compactor: Optional[DialogueCompactor] = None + + +def init_compactor(qwen_manager=None, vector_memory=None) -> DialogueCompactor: + """Инициализировать глобальный компактор.""" + global compactor + compactor = DialogueCompactor(qwen_manager, vector_memory) + return compactor + + +def get_compactor() -> DialogueCompactor: + """Получить глобальный компактор.""" + if compactor is None: + raise RuntimeError("Compactor не инициализирован. Вызовите init_compactor().") + return compactor