telegram-cli-bot/bot/compaction.py

519 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Модуль компактификации истории диалога.
Сжимает старую историю диалога в структурированный summary,
сохраняя важные факты, URL, настройки и договорённости.
Архитектура:
1. Извлекаем все сообщения кроме последних 20 из ChromaDB
2. Сжимаем их в структурированный summary через Qwen Code
3. Сохраняем summary как отдельный документ в ChromaDB (type=summary)
4. При загрузке контекста — берём summary + последние 20 сообщений
"""
import logging
from pathlib import Path
from datetime import datetime
from typing import Optional, List, Dict, Any, Tuple
from dataclasses import dataclass
logger = logging.getLogger(__name__)
# Константы компактификации
COMPACTION_THRESHOLD_PERCENT = 70 # Порог запуска (70% контекста)
COMPACTION_KEEP_LAST = 20 # Сколько последних сообщений сохранять
COMPACTION_SUMMARY_ID = "dialogue_summary" # ID для summary документа
@dataclass
class CompactionResult:
"""Результат компактификации."""
success: bool
messages_compressed: int = 0
summary_length: int = 0
tokens_saved: int = 0
error: Optional[str] = None
class DialogueCompactor:
"""
Менеджер компактификации истории диалога.
Использует Qwen Code для сжатия истории в структурированный summary.
"""
def __init__(self, qwen_manager=None, vector_memory=None):
"""
Инициализация компактора.
Args:
qwen_manager: QwenCodeManager для выполнения сжатия
vector_memory: VectorMemoryStorage для работы с ChromaDB
"""
self.qwen_manager = qwen_manager
self.vector_memory = vector_memory
self._chroma_client = None
self._collection = None
def _init_chroma(self):
"""Инициализация ChromaDB клиента."""
if self._collection is not None:
return
import chromadb
from chromadb.config import Settings
# Путь к векторной БД
persist_dir = str(Path(__file__).parent / "vector_db")
self._client = chromadb.PersistentClient(
path=persist_dir,
settings=Settings(
anonymized_telemetry=False,
allow_reset=True
)
)
self._collection = self._client.get_or_create_collection(
name="telegram_messages",
metadata={"description": "История диалогов Telegram бота"}
)
logger.info(f"ChromaDB инициализирован для компактификации: {persist_dir}")
def _get_summary(self) -> Optional[str]:
"""
Получить существующий summary из ChromaDB.
Returns:
existing summary или None если не найден
"""
self._init_chroma()
try:
result = self._collection.get(
ids=[COMPACTION_SUMMARY_ID],
include=["documents"]
)
if result["documents"] and len(result["documents"]) > 0:
return result["documents"][0]
return None
except Exception as e:
logger.error(f"Ошибка получения summary: {e}")
return None
def _save_summary(self, summary: str):
"""
Сохранить summary в ChromaDB.
Args:
summary: структурированный summary для сохранения
"""
self._init_chroma()
try:
from sentence_transformers import SentenceTransformer
import os
os.environ["TRANSFORMERS_OFFLINE"] = "1"
os.environ["HF_HUB_OFFLINE"] = "1"
model = SentenceTransformer("all-MiniLM-L6-v2", local_files_only=True)
embedding = model.encode(summary, convert_to_numpy=True).tolist()
# Проверяем есть ли уже summary
existing = self._collection.get(ids=[COMPACTION_SUMMARY_ID])
if existing["ids"]:
# Обновляем существующий
self._collection.update(
ids=[COMPACTION_SUMMARY_ID],
embeddings=[embedding],
documents=[summary],
metadatas=[{
"type": "summary",
"timestamp": datetime.now().isoformat(),
"user_id": "system"
}]
)
logger.info(f"Summary обновлён в ChromaDB (длина: {len(summary)})")
else:
# Добавляем новый
self._collection.add(
ids=[COMPACTION_SUMMARY_ID],
embeddings=[embedding],
documents=[summary],
metadatas=[{
"type": "summary",
"timestamp": datetime.now().isoformat(),
"user_id": "system"
}]
)
logger.info(f"Summary сохранён в ChromaDB (длина: {len(summary)})")
except Exception as e:
logger.error(f"Ошибка сохранения summary: {e}")
raise
def _get_old_messages(self, keep_last: int = COMPACTION_KEEP_LAST) -> List[Dict[str, Any]]:
"""
Получить все сообщения кроме последних keep_last.
Args:
keep_last: количество последних сообщений для сохранения
Returns:
список сообщений для сжатия
"""
self._init_chroma()
try:
# Получаем все сообщения
result = self._collection.get(
include=["documents", "metadatas"]
)
if not result["documents"]:
return []
# Собираем сообщения с метаданными
messages = []
for i, doc in enumerate(result["documents"]):
# Пропускаем summary
if result["metadatas"][i].get("type") == "summary":
continue
messages.append({
"content": doc,
"role": result["metadatas"][i].get("role", "unknown"),
"timestamp": result["metadatas"][i].get("timestamp", ""),
"user_id": result["metadatas"][i].get("user_id", "unknown")
})
# Сортируем по timestamp
messages.sort(key=lambda x: x["timestamp"])
# Возвращаем все кроме последних keep_last
if len(messages) <= keep_last:
return []
return messages[:-keep_last]
except Exception as e:
logger.error(f"Ошибка получения сообщений: {e}")
return []
def _build_compaction_prompt(self, messages: List[Dict[str, Any]]) -> str:
"""
Построить промпт для сжатия истории.
Args:
messages: список сообщений для сжатия
Returns:
промпт для Qwen Code
"""
# Форматируем сообщения в читаемый вид
dialogue_text = ""
for msg in messages:
role = "Пользователь" if msg["role"] == "user" else "Ассистент"
dialogue_text += f"{role}: {msg['content']}\n\n"
prompt = f"""
Ты — ассистент для сжатия истории диалога в структурированный summary.
## ЗАДАЧА
Сожми историю диалога в компактный структурированный summary для сохранения контекста.
## ВАЖНО СОХРАНИТЬ:
- 📌 ФАКТЫ: имена, названия, версии, даты, числа
- 🔗 URL: адреса сайтов, домены, ссылки на репозитории
- 🖥 СЕРВЕРЫ: IP-адреса, логины, пароли, имена серверов
- ⚙️ НАСТРОЙКИ: конфигурации, пути к файлам, переменные окружения
- 📋 РЕШЕНИЯ: договорённости, выводы, утверждённые планы
- 🎯 ЗАДАЧИ: что планировалось, что сделано, что в процессе
- 🛠 ИНСТРУМЕНТЫ: названия утилит, команд, библиотек
## ФОРМАТ SUMMARY:
```
=== SUMMARY ДИАЛОГА ===
📅 Период: {datetime.now().strftime('%Y-%m-%d')}
🎯 ОСНОВНАЯ ТЕМА:
[Краткий пересказ основной темы диалога (3-5 предложений)]
📌 КЛЮЧЕВЫЕ ФАКТЫ:
• Факт 1
• Факт 2
• ...
🔗 URL И РЕСУРСЫ:
• https://...
• ...
🖥 СЕРВЕРЫ И ДОСТУПЫ:
• server: IP, login, password
• ...
⚙️ НАСТРОЙКИ И КОНФИГУРАЦИИ:
• path/to/config: значение
• ...
📋 ПРИНЯТЫЕ РЕШЕНИЯ:
• Решение 1
• Решение 2
• ...
🎯 ЗАДАЧИ:
✅ Сделано: ...
🔄 В процессе: ...
⏳ Запланировано: ...
🛠 ИНСТРУМЕНТЫ И КОМАНДЫ:
• команда1 — описание
• команда2 — описание
```
## ДИАЛОГ ДЛЯ СЖАТИЯ:
{dialogue_text}
## ТРЕБОВАНИЯ:
1. Будь краток но информативен
2. Сохраняй все технические детали (команды, пути, URL)
3. Используй маркированные списки для читаемости
4. Не добавляй информацию которой не было в диалоге
5. Выделяй важное эмодзи для быстрого поиска
Сожми диалог в summary согласно формату выше:
"""
return prompt
async def compact(self, keep_last: int = COMPACTION_KEEP_LAST) -> CompactionResult:
"""
Выполнить компактификацию истории диалога.
Args:
keep_last: количество последних сообщений для сохранения
Returns:
CompactionResult с результатами операции
"""
logger.info(f"Начало компактификации (сохраняем последние {keep_last} сообщений)")
try:
# Получаем старые сообщения
old_messages = self._get_old_messages(keep_last)
if not old_messages:
logger.info("Нет сообщений для компактификации")
return CompactionResult(
success=True,
messages_compressed=0,
summary_length=0,
tokens_saved=0
)
messages_count = len(old_messages)
logger.info(f"Найдено {messages_count} сообщений для сжатия")
# Строим промпт
prompt = self._build_compaction_prompt(old_messages)
# Проверяем наличие Qwen Code
if not self.qwen_manager:
logger.error("Qwen manager не инициализирован")
return CompactionResult(
success=False,
error="Qwen manager не инициализирован"
)
# Выполняем сжатие через Qwen Code
output_parts = []
async def on_output(text: str):
output_parts.append(text)
async def on_oauth_url(url: str):
logger.warning(f"OAuth URL: {url}")
logger.info("Запуск Qwen Code для сжатия...")
# Запускаем задачу
result = await self.qwen_manager.run_task(
user_id=999, # Системный user_id для компактификации
task=prompt,
on_output=on_output,
on_oauth_url=on_oauth_url,
use_system_prompt=False # Не добавляем системный промпт бота
)
# Парсим результат - извлекаем текст из JSON как в основном коде
import re
summary = "".join(output_parts).strip()
# Извлекаем текст из JSON ответа (как в bot.py)
text_matches = re.findall(r'"text":"([^"]+)"', summary)
if text_matches:
summary = " ".join(text_matches).replace("\\n", "\n")
else:
# Fallback: пробуем найти result поле
try:
import json
for line in summary.split('\n'):
line = line.strip()
if line.startswith('{'):
data = json.loads(line)
if data.get('type') == 'result':
summary = data.get('result', summary)
break
if data.get('result'):
summary = data.get('result', summary)
except Exception:
pass
summary = summary.strip()
if not summary:
logger.error("Пустой summary после сжатия")
return CompactionResult(
success=False,
error="Пустой summary после сжатия"
)
# Сохраняем summary в ChromaDB
self._save_summary(summary)
# Оцениваем экономию токенов (примерно)
original_tokens = sum(len(msg["content"]) for msg in old_messages) // 4
summary_tokens = len(summary) // 4
tokens_saved = original_tokens - summary_tokens
logger.info(
f"Компактификация завершена: "
f"сообщений={messages_count}, "
f"длина summary={len(summary)}, "
f"экономия токенов≈{tokens_saved}"
)
return CompactionResult(
success=True,
messages_compressed=messages_count,
summary_length=len(summary),
tokens_saved=tokens_saved
)
except Exception as e:
logger.error(f"Ошибка компактификации: {e}", exc_info=True)
return CompactionResult(
success=False,
error=str(e)
)
def get_context_with_summary(self, limit: int = 20) -> Tuple[Optional[str], List[Dict[str, Any]]]:
"""
Получить контекст с использованием summary.
Args:
limit: количество последних сообщений для загрузки
Returns:
(summary, recent_messages) кортеж
"""
# Получаем summary
summary = self._get_summary()
# Получаем последние сообщения
self._init_chroma()
try:
result = self._collection.get(
include=["documents", "metadatas"],
limit=limit
)
messages = []
if result["documents"]:
for i, doc in enumerate(result["documents"]):
# Пропускаем summary
if result["metadatas"][i].get("type") == "summary":
continue
messages.append({
"content": doc,
"role": result["metadatas"][i].get("role", "unknown"),
"timestamp": result["metadatas"][i].get("timestamp", ""),
"user_id": result["metadatas"][i].get("user_id", "unknown")
})
logger.info(f"Загружен контекст: summary={summary is not None}, сообщений={len(messages)}")
return summary, messages
except Exception as e:
logger.error(f"Ошибка загрузки контекста: {e}")
return None, []
def check_compaction_needed(self, threshold_percent: int = COMPACTION_THRESHOLD_PERCENT) -> bool:
"""
Проверить нужна ли компактификация.
Args:
threshold_percent: порог заполненности контекста (%)
Returns:
True если нужна компактификация
"""
self._init_chroma()
try:
# Получаем количество сообщений
result = self._collection.get(include=[])
if not result["ids"]:
return False
# Считаем сообщения без summary
message_count = 0
for i, id_ in enumerate(result["ids"]):
if id_ != COMPACTION_SUMMARY_ID:
message_count += 1
# Оцениваем заполненность контекста
# Примерный расчёт: 1 сообщение ≈ 100 токенов ≈ 400 символов
# Максимум контекста ≈ 200K токенов ≈ 800K символов
# Для простоты: 1000 сообщений = 100% контекста
context_percent = (message_count / 1000) * 100
logger.info(
f"Проверка компактификации: "
f"сообщений={message_count}, "
f"заполненность={context_percent:.1f}%, "
f"порог={threshold_percent}%"
)
return context_percent >= threshold_percent
except Exception as e:
logger.error(f"Ошибка проверки компактификации: {e}")
return False
# Глобальный экземпляр для использования в боте
compactor: Optional[DialogueCompactor] = None
def init_compactor(qwen_manager=None, vector_memory=None) -> DialogueCompactor:
"""Инициализировать глобальный компактор."""
global compactor
compactor = DialogueCompactor(qwen_manager, vector_memory)
return compactor
def get_compactor() -> DialogueCompactor:
"""Получить глобальный компактор."""
if compactor is None:
raise RuntimeError("Compactor не инициализирован. Вызовите init_compactor().")
return compactor