feat: add dialogue compaction module for context management

- bot/compaction.py: новый модуль для сжатия истории диалога
- Автоматическая компактификация при 70% заполнении контекста
- Сохранение summary в ChromaDB с структурированным форматом
- Интеграция с handle_ai_task для прозрачной работы
- Сохраняет последние 20 сообщений без изменений
- Структурированный промпт: факты, URL, серверы, настройки, решения

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
mirivlad 2026-02-25 12:09:30 +08:00
parent 2773680da1
commit 1dc40507c6
2 changed files with 528 additions and 0 deletions

31
bot.py
View File

@ -33,6 +33,9 @@ from vector_memory import (
get_memory_stats get_memory_stats
) )
# Импорты компактификации
from bot.compaction import init_compactor, get_compactor, DialogueCompactor
from dotenv import load_dotenv from dotenv import load_dotenv
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, BotCommand from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, BotCommand
from telegram.ext import ( from telegram.ext import (
@ -85,6 +88,7 @@ from bot.tools import tools_registry
# Глобальные менеджеры сессий # Глобальные менеджеры сессий
ssh_session_manager = SSHSessionManager() ssh_session_manager = SSHSessionManager()
local_session_manager = LocalSessionManager() local_session_manager = LocalSessionManager()
compactor: Optional[DialogueCompactor] = None
async def handle_text_message(update: Update, context: ContextTypes.DEFAULT_TYPE): async def handle_text_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
@ -129,6 +133,33 @@ async def handle_ai_task(update: Update, text: str):
user_id = update.effective_user.id user_id = update.effective_user.id
state = state_manager.get(user_id) state = state_manager.get(user_id)
# === ПРОВЕРКА: Нужна ли компактификация? ===
global compactor
if compactor is None:
compactor = init_compactor(qwen_manager, hybrid_memory_manager.vector)
logger.info("Компактор инициализирован")
# Проверяем порог заполненности контекста
if compactor.check_compaction_needed():
logger.info("Запуск компактификации истории диалога...")
status_msg = await update.message.reply_text("🔄 **Запуск компактификации истории...**\n\n_Это может занять несколько секунд._", parse_mode="Markdown")
result = await compactor.compact()
await status_msg.delete()
if result.success:
await update.message.reply_text(
f"✅ **Компактификация завершена!**\n\n"
f"📊 Сжато сообщений: {result.messages_compressed}\n"
f"📝 Длина summary: {result.summary_length} символов\n"
f"💾 Экономия токенов: ~{result.tokens_saved}",
parse_mode="Markdown"
)
else:
logger.error(f"Компактификация не удалась: {result.error}")
await update.message.reply_text(f"⚠️ **Ошибка компактификации:** {result.error}", parse_mode="Markdown")
# Сохраняем сообщение пользователя в памяти # Сохраняем сообщение пользователя в памяти
save_message(user_id, "user", text) save_message(user_id, "user", text)

497
bot/compaction.py Normal file
View File

@ -0,0 +1,497 @@
#!/usr/bin/env python3
"""
Модуль компактификации истории диалога.
Сжимает старую историю диалога в структурированный summary,
сохраняя важные факты, URL, настройки и договорённости.
Архитектура:
1. Извлекаем все сообщения кроме последних 20 из ChromaDB
2. Сжимаем их в структурированный summary через Qwen Code
3. Сохраняем summary как отдельный документ в ChromaDB (type=summary)
4. При загрузке контекста берём summary + последние 20 сообщений
"""
import logging
from pathlib import Path
from datetime import datetime
from typing import Optional, List, Dict, Any, Tuple
from dataclasses import dataclass
logger = logging.getLogger(__name__)
# Константы компактификации
COMPACTION_THRESHOLD_PERCENT = 70 # Порог запуска (70% контекста)
COMPACTION_KEEP_LAST = 20 # Сколько последних сообщений сохранять
COMPACTION_SUMMARY_ID = "dialogue_summary" # ID для summary документа
@dataclass
class CompactionResult:
"""Результат компактификации."""
success: bool
messages_compressed: int = 0
summary_length: int = 0
tokens_saved: int = 0
error: Optional[str] = None
class DialogueCompactor:
"""
Менеджер компактификации истории диалога.
Использует Qwen Code для сжатия истории в структурированный summary.
"""
def __init__(self, qwen_manager=None, vector_memory=None):
"""
Инициализация компактора.
Args:
qwen_manager: QwenCodeManager для выполнения сжатия
vector_memory: VectorMemoryStorage для работы с ChromaDB
"""
self.qwen_manager = qwen_manager
self.vector_memory = vector_memory
self._chroma_client = None
self._collection = None
def _init_chroma(self):
"""Инициализация ChromaDB клиента."""
if self._collection is not None:
return
import chromadb
from chromadb.config import Settings
# Путь к векторной БД
persist_dir = str(Path(__file__).parent / "vector_db")
self._client = chromadb.PersistentClient(
path=persist_dir,
settings=Settings(
anonymized_telemetry=False,
allow_reset=True
)
)
self._collection = self._client.get_or_create_collection(
name="telegram_messages",
metadata={"description": "История диалогов Telegram бота"}
)
logger.info(f"ChromaDB инициализирован для компактификации: {persist_dir}")
def _get_summary(self) -> Optional[str]:
"""
Получить существующий summary из ChromaDB.
Returns:
existing summary или None если не найден
"""
self._init_chroma()
try:
result = self._collection.get(
ids=[COMPACTION_SUMMARY_ID],
include=["documents"]
)
if result["documents"] and len(result["documents"]) > 0:
return result["documents"][0]
return None
except Exception as e:
logger.error(f"Ошибка получения summary: {e}")
return None
def _save_summary(self, summary: str):
"""
Сохранить summary в ChromaDB.
Args:
summary: структурированный summary для сохранения
"""
self._init_chroma()
try:
from sentence_transformers import SentenceTransformer
import os
os.environ["TRANSFORMERS_OFFLINE"] = "1"
os.environ["HF_HUB_OFFLINE"] = "1"
model = SentenceTransformer("all-MiniLM-L6-v2", local_files_only=True)
embedding = model.encode(summary, convert_to_numpy=True).tolist()
# Проверяем есть ли уже summary
existing = self._collection.get(ids=[COMPACTION_SUMMARY_ID])
if existing["ids"]:
# Обновляем существующий
self._collection.update(
ids=[COMPACTION_SUMMARY_ID],
embeddings=[embedding],
documents=[summary],
metadatas=[{
"type": "summary",
"timestamp": datetime.now().isoformat(),
"user_id": "system"
}]
)
logger.info(f"Summary обновлён в ChromaDB (длина: {len(summary)})")
else:
# Добавляем новый
self._collection.add(
ids=[COMPACTION_SUMMARY_ID],
embeddings=[embedding],
documents=[summary],
metadatas=[{
"type": "summary",
"timestamp": datetime.now().isoformat(),
"user_id": "system"
}]
)
logger.info(f"Summary сохранён в ChromaDB (длина: {len(summary)})")
except Exception as e:
logger.error(f"Ошибка сохранения summary: {e}")
raise
def _get_old_messages(self, keep_last: int = COMPACTION_KEEP_LAST) -> List[Dict[str, Any]]:
"""
Получить все сообщения кроме последних keep_last.
Args:
keep_last: количество последних сообщений для сохранения
Returns:
список сообщений для сжатия
"""
self._init_chroma()
try:
# Получаем все сообщения
result = self._collection.get(
include=["documents", "metadatas"]
)
if not result["documents"]:
return []
# Собираем сообщения с метаданными
messages = []
for i, doc in enumerate(result["documents"]):
# Пропускаем summary
if result["metadatas"][i].get("type") == "summary":
continue
messages.append({
"content": doc,
"role": result["metadatas"][i].get("role", "unknown"),
"timestamp": result["metadatas"][i].get("timestamp", ""),
"user_id": result["metadatas"][i].get("user_id", "unknown")
})
# Сортируем по timestamp
messages.sort(key=lambda x: x["timestamp"])
# Возвращаем все кроме последних keep_last
if len(messages) <= keep_last:
return []
return messages[:-keep_last]
except Exception as e:
logger.error(f"Ошибка получения сообщений: {e}")
return []
def _build_compaction_prompt(self, messages: List[Dict[str, Any]]) -> str:
"""
Построить промпт для сжатия истории.
Args:
messages: список сообщений для сжатия
Returns:
промпт для Qwen Code
"""
# Форматируем сообщения в читаемый вид
dialogue_text = ""
for msg in messages:
role = "Пользователь" if msg["role"] == "user" else "Ассистент"
dialogue_text += f"{role}: {msg['content']}\n\n"
prompt = f"""
Ты ассистент для сжатия истории диалога в структурированный summary.
## ЗАДАЧА
Сожми историю диалога в компактный структурированный summary для сохранения контекста.
## ВАЖНО СОХРАНИТЬ:
- 📌 ФАКТЫ: имена, названия, версии, даты, числа
- 🔗 URL: адреса сайтов, домены, ссылки на репозитории
- 🖥 СЕРВЕРЫ: IP-адреса, логины, пароли, имена серверов
- НАСТРОЙКИ: конфигурации, пути к файлам, переменные окружения
- 📋 РЕШЕНИЯ: договорённости, выводы, утверждённые планы
- 🎯 ЗАДАЧИ: что планировалось, что сделано, что в процессе
- 🛠 ИНСТРУМЕНТЫ: названия утилит, команд, библиотек
## ФОРМАТ SUMMARY:
```
=== SUMMARY ДИАЛОГА ===
📅 Период: {datetime.now().strftime('%Y-%m-%d')}
🎯 ОСНОВНАЯ ТЕМА:
[Краткий пересказ основной темы диалога (3-5 предложений)]
📌 КЛЮЧЕВЫЕ ФАКТЫ:
Факт 1
Факт 2
...
🔗 URL И РЕСУРСЫ:
https://...
...
🖥 СЕРВЕРЫ И ДОСТУПЫ:
server: IP, login, password
...
НАСТРОЙКИ И КОНФИГУРАЦИИ:
path/to/config: значение
...
📋 ПРИНЯТЫЕ РЕШЕНИЯ:
Решение 1
Решение 2
...
🎯 ЗАДАЧИ:
Сделано: ...
🔄 В процессе: ...
Запланировано: ...
🛠 ИНСТРУМЕНТЫ И КОМАНДЫ:
команда1 описание
команда2 описание
```
## ДИАЛОГ ДЛЯ СЖАТИЯ:
{dialogue_text}
## ТРЕБОВАНИЯ:
1. Будь краток но информативен
2. Сохраняй все технические детали (команды, пути, URL)
3. Используй маркированные списки для читаемости
4. Не добавляй информацию которой не было в диалоге
5. Выделяй важное эмодзи для быстрого поиска
Сожми диалог в summary согласно формату выше:
"""
return prompt
async def compact(self, keep_last: int = COMPACTION_KEEP_LAST) -> CompactionResult:
"""
Выполнить компактификацию истории диалога.
Args:
keep_last: количество последних сообщений для сохранения
Returns:
CompactionResult с результатами операции
"""
logger.info(f"Начало компактификации (сохраняем последние {keep_last} сообщений)")
try:
# Получаем старые сообщения
old_messages = self._get_old_messages(keep_last)
if not old_messages:
logger.info("Нет сообщений для компактификации")
return CompactionResult(
success=True,
messages_compressed=0,
summary_length=0,
tokens_saved=0
)
messages_count = len(old_messages)
logger.info(f"Найдено {messages_count} сообщений для сжатия")
# Строим промпт
prompt = self._build_compaction_prompt(old_messages)
# Проверяем наличие Qwen Code
if not self.qwen_manager:
logger.error("Qwen manager не инициализирован")
return CompactionResult(
success=False,
error="Qwen manager не инициализирован"
)
# Выполняем сжатие через Qwen Code
output_parts = []
async def on_output(text: str):
output_parts.append(text)
async def on_oauth_url(url: str):
logger.warning(f"OAuth URL: {url}")
logger.info("Запуск Qwen Code для сжатия...")
# Запускаем задачу
await self.qwen_manager.run_task(
user_id=999, # Системный user_id для компактификации
task=prompt,
on_output=on_output,
on_oauth_url=on_oauth_url,
use_system_prompt=False # Не добавляем системный промпт бота
)
summary = "".join(output_parts)
# Очищаем summary от служебных символов
summary = summary.strip()
if not summary:
logger.error("Пустой summary после сжатия")
return CompactionResult(
success=False,
error="Пустой summary после сжатия"
)
# Сохраняем summary в ChromaDB
self._save_summary(summary)
# Оцениваем экономию токенов (примерно)
original_tokens = sum(len(msg["content"]) for msg in old_messages) // 4
summary_tokens = len(summary) // 4
tokens_saved = original_tokens - summary_tokens
logger.info(
f"Компактификация завершена: "
f"сообщений={messages_count}, "
f"длина summary={len(summary)}, "
f"экономия токенов≈{tokens_saved}"
)
return CompactionResult(
success=True,
messages_compressed=messages_count,
summary_length=len(summary),
tokens_saved=tokens_saved
)
except Exception as e:
logger.error(f"Ошибка компактификации: {e}", exc_info=True)
return CompactionResult(
success=False,
error=str(e)
)
def get_context_with_summary(self, limit: int = 20) -> Tuple[Optional[str], List[Dict[str, Any]]]:
"""
Получить контекст с использованием summary.
Args:
limit: количество последних сообщений для загрузки
Returns:
(summary, recent_messages) кортеж
"""
# Получаем summary
summary = self._get_summary()
# Получаем последние сообщения
self._init_chroma()
try:
result = self._collection.get(
include=["documents", "metadatas"],
limit=limit
)
messages = []
if result["documents"]:
for i, doc in enumerate(result["documents"]):
# Пропускаем summary
if result["metadatas"][i].get("type") == "summary":
continue
messages.append({
"content": doc,
"role": result["metadatas"][i].get("role", "unknown"),
"timestamp": result["metadatas"][i].get("timestamp", ""),
"user_id": result["metadatas"][i].get("user_id", "unknown")
})
logger.info(f"Загружен контекст: summary={summary is not None}, сообщений={len(messages)}")
return summary, messages
except Exception as e:
logger.error(f"Ошибка загрузки контекста: {e}")
return None, []
def check_compaction_needed(self, threshold_percent: int = COMPACTION_THRESHOLD_PERCENT) -> bool:
"""
Проверить нужна ли компактификация.
Args:
threshold_percent: порог заполненности контекста (%)
Returns:
True если нужна компактификация
"""
self._init_chroma()
try:
# Получаем количество сообщений
result = self._collection.get(include=[])
if not result["ids"]:
return False
# Считаем сообщения без summary
message_count = 0
for i, id_ in enumerate(result["ids"]):
if id_ != COMPACTION_SUMMARY_ID:
message_count += 1
# Оцениваем заполненность контекста
# Примерный расчёт: 1 сообщение ≈ 100 токенов ≈ 400 символов
# Максимум контекста ≈ 200K токенов ≈ 800K символов
# Для простоты: 1000 сообщений = 100% контекста
context_percent = (message_count / 1000) * 100
logger.info(
f"Проверка компактификации: "
f"сообщений={message_count}, "
f"заполненность={context_percent:.1f}%, "
f"порог={threshold_percent}%"
)
return context_percent >= threshold_percent
except Exception as e:
logger.error(f"Ошибка проверки компактификации: {e}")
return False
# Глобальный экземпляр для использования в боте
compactor: Optional[DialogueCompactor] = None
def init_compactor(qwen_manager=None, vector_memory=None) -> DialogueCompactor:
"""Инициализировать глобальный компактор."""
global compactor
compactor = DialogueCompactor(qwen_manager, vector_memory)
return compactor
def get_compactor() -> DialogueCompactor:
"""Получить глобальный компактор."""
if compactor is None:
raise RuntimeError("Compactor не инициализирован. Вызовите init_compactor().")
return compactor