498 lines
18 KiB
Python
498 lines
18 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Модуль компактификации истории диалога.
|
||
|
||
Сжимает старую историю диалога в структурированный summary,
|
||
сохраняя важные факты, URL, настройки и договорённости.
|
||
|
||
Архитектура:
|
||
1. Извлекаем все сообщения кроме последних 20 из ChromaDB
|
||
2. Сжимаем их в структурированный summary через Qwen Code
|
||
3. Сохраняем summary как отдельный документ в ChromaDB (type=summary)
|
||
4. При загрузке контекста — берём summary + последние 20 сообщений
|
||
"""
|
||
|
||
import logging
|
||
from pathlib import Path
|
||
from datetime import datetime
|
||
from typing import Optional, List, Dict, Any, Tuple
|
||
from dataclasses import dataclass
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Константы компактификации
|
||
COMPACTION_THRESHOLD_PERCENT = 70 # Порог запуска (70% контекста)
|
||
COMPACTION_KEEP_LAST = 20 # Сколько последних сообщений сохранять
|
||
COMPACTION_SUMMARY_ID = "dialogue_summary" # ID для summary документа
|
||
|
||
|
||
@dataclass
|
||
class CompactionResult:
|
||
"""Результат компактификации."""
|
||
success: bool
|
||
messages_compressed: int = 0
|
||
summary_length: int = 0
|
||
tokens_saved: int = 0
|
||
error: Optional[str] = None
|
||
|
||
|
||
class DialogueCompactor:
|
||
"""
|
||
Менеджер компактификации истории диалога.
|
||
|
||
Использует Qwen Code для сжатия истории в структурированный summary.
|
||
"""
|
||
|
||
def __init__(self, qwen_manager=None, vector_memory=None):
|
||
"""
|
||
Инициализация компактора.
|
||
|
||
Args:
|
||
qwen_manager: QwenCodeManager для выполнения сжатия
|
||
vector_memory: VectorMemoryStorage для работы с ChromaDB
|
||
"""
|
||
self.qwen_manager = qwen_manager
|
||
self.vector_memory = vector_memory
|
||
self._chroma_client = None
|
||
self._collection = None
|
||
|
||
def _init_chroma(self):
|
||
"""Инициализация ChromaDB клиента."""
|
||
if self._collection is not None:
|
||
return
|
||
|
||
import chromadb
|
||
from chromadb.config import Settings
|
||
|
||
# Путь к векторной БД
|
||
persist_dir = str(Path(__file__).parent / "vector_db")
|
||
|
||
self._client = chromadb.PersistentClient(
|
||
path=persist_dir,
|
||
settings=Settings(
|
||
anonymized_telemetry=False,
|
||
allow_reset=True
|
||
)
|
||
)
|
||
|
||
self._collection = self._client.get_or_create_collection(
|
||
name="telegram_messages",
|
||
metadata={"description": "История диалогов Telegram бота"}
|
||
)
|
||
|
||
logger.info(f"ChromaDB инициализирован для компактификации: {persist_dir}")
|
||
|
||
def _get_summary(self) -> Optional[str]:
|
||
"""
|
||
Получить существующий summary из ChromaDB.
|
||
|
||
Returns:
|
||
existing summary или None если не найден
|
||
"""
|
||
self._init_chroma()
|
||
|
||
try:
|
||
result = self._collection.get(
|
||
ids=[COMPACTION_SUMMARY_ID],
|
||
include=["documents"]
|
||
)
|
||
|
||
if result["documents"] and len(result["documents"]) > 0:
|
||
return result["documents"][0]
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"Ошибка получения summary: {e}")
|
||
return None
|
||
|
||
def _save_summary(self, summary: str):
|
||
"""
|
||
Сохранить summary в ChromaDB.
|
||
|
||
Args:
|
||
summary: структурированный summary для сохранения
|
||
"""
|
||
self._init_chroma()
|
||
|
||
try:
|
||
from sentence_transformers import SentenceTransformer
|
||
import os
|
||
os.environ["TRANSFORMERS_OFFLINE"] = "1"
|
||
os.environ["HF_HUB_OFFLINE"] = "1"
|
||
|
||
model = SentenceTransformer("all-MiniLM-L6-v2", local_files_only=True)
|
||
embedding = model.encode(summary, convert_to_numpy=True).tolist()
|
||
|
||
# Проверяем есть ли уже summary
|
||
existing = self._collection.get(ids=[COMPACTION_SUMMARY_ID])
|
||
|
||
if existing["ids"]:
|
||
# Обновляем существующий
|
||
self._collection.update(
|
||
ids=[COMPACTION_SUMMARY_ID],
|
||
embeddings=[embedding],
|
||
documents=[summary],
|
||
metadatas=[{
|
||
"type": "summary",
|
||
"timestamp": datetime.now().isoformat(),
|
||
"user_id": "system"
|
||
}]
|
||
)
|
||
logger.info(f"Summary обновлён в ChromaDB (длина: {len(summary)})")
|
||
else:
|
||
# Добавляем новый
|
||
self._collection.add(
|
||
ids=[COMPACTION_SUMMARY_ID],
|
||
embeddings=[embedding],
|
||
documents=[summary],
|
||
metadatas=[{
|
||
"type": "summary",
|
||
"timestamp": datetime.now().isoformat(),
|
||
"user_id": "system"
|
||
}]
|
||
)
|
||
logger.info(f"Summary сохранён в ChromaDB (длина: {len(summary)})")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Ошибка сохранения summary: {e}")
|
||
raise
|
||
|
||
def _get_old_messages(self, keep_last: int = COMPACTION_KEEP_LAST) -> List[Dict[str, Any]]:
|
||
"""
|
||
Получить все сообщения кроме последних keep_last.
|
||
|
||
Args:
|
||
keep_last: количество последних сообщений для сохранения
|
||
|
||
Returns:
|
||
список сообщений для сжатия
|
||
"""
|
||
self._init_chroma()
|
||
|
||
try:
|
||
# Получаем все сообщения
|
||
result = self._collection.get(
|
||
include=["documents", "metadatas"]
|
||
)
|
||
|
||
if not result["documents"]:
|
||
return []
|
||
|
||
# Собираем сообщения с метаданными
|
||
messages = []
|
||
for i, doc in enumerate(result["documents"]):
|
||
# Пропускаем summary
|
||
if result["metadatas"][i].get("type") == "summary":
|
||
continue
|
||
|
||
messages.append({
|
||
"content": doc,
|
||
"role": result["metadatas"][i].get("role", "unknown"),
|
||
"timestamp": result["metadatas"][i].get("timestamp", ""),
|
||
"user_id": result["metadatas"][i].get("user_id", "unknown")
|
||
})
|
||
|
||
# Сортируем по timestamp
|
||
messages.sort(key=lambda x: x["timestamp"])
|
||
|
||
# Возвращаем все кроме последних keep_last
|
||
if len(messages) <= keep_last:
|
||
return []
|
||
|
||
return messages[:-keep_last]
|
||
|
||
except Exception as e:
|
||
logger.error(f"Ошибка получения сообщений: {e}")
|
||
return []
|
||
|
||
def _build_compaction_prompt(self, messages: List[Dict[str, Any]]) -> str:
|
||
"""
|
||
Построить промпт для сжатия истории.
|
||
|
||
Args:
|
||
messages: список сообщений для сжатия
|
||
|
||
Returns:
|
||
промпт для Qwen Code
|
||
"""
|
||
# Форматируем сообщения в читаемый вид
|
||
dialogue_text = ""
|
||
for msg in messages:
|
||
role = "Пользователь" if msg["role"] == "user" else "Ассистент"
|
||
dialogue_text += f"{role}: {msg['content']}\n\n"
|
||
|
||
prompt = f"""
|
||
Ты — ассистент для сжатия истории диалога в структурированный summary.
|
||
|
||
## ЗАДАЧА
|
||
Сожми историю диалога в компактный структурированный summary для сохранения контекста.
|
||
|
||
## ВАЖНО СОХРАНИТЬ:
|
||
- 📌 ФАКТЫ: имена, названия, версии, даты, числа
|
||
- 🔗 URL: адреса сайтов, домены, ссылки на репозитории
|
||
- 🖥 СЕРВЕРЫ: IP-адреса, логины, пароли, имена серверов
|
||
- ⚙️ НАСТРОЙКИ: конфигурации, пути к файлам, переменные окружения
|
||
- 📋 РЕШЕНИЯ: договорённости, выводы, утверждённые планы
|
||
- 🎯 ЗАДАЧИ: что планировалось, что сделано, что в процессе
|
||
- 🛠 ИНСТРУМЕНТЫ: названия утилит, команд, библиотек
|
||
|
||
## ФОРМАТ SUMMARY:
|
||
|
||
```
|
||
=== SUMMARY ДИАЛОГА ===
|
||
📅 Период: {datetime.now().strftime('%Y-%m-%d')}
|
||
|
||
🎯 ОСНОВНАЯ ТЕМА:
|
||
[Краткий пересказ основной темы диалога (3-5 предложений)]
|
||
|
||
📌 КЛЮЧЕВЫЕ ФАКТЫ:
|
||
• Факт 1
|
||
• Факт 2
|
||
• ...
|
||
|
||
🔗 URL И РЕСУРСЫ:
|
||
• https://...
|
||
• ...
|
||
|
||
🖥 СЕРВЕРЫ И ДОСТУПЫ:
|
||
• server: IP, login, password
|
||
• ...
|
||
|
||
⚙️ НАСТРОЙКИ И КОНФИГУРАЦИИ:
|
||
• path/to/config: значение
|
||
• ...
|
||
|
||
📋 ПРИНЯТЫЕ РЕШЕНИЯ:
|
||
• Решение 1
|
||
• Решение 2
|
||
• ...
|
||
|
||
🎯 ЗАДАЧИ:
|
||
✅ Сделано: ...
|
||
🔄 В процессе: ...
|
||
⏳ Запланировано: ...
|
||
|
||
🛠 ИНСТРУМЕНТЫ И КОМАНДЫ:
|
||
• команда1 — описание
|
||
• команда2 — описание
|
||
```
|
||
|
||
## ДИАЛОГ ДЛЯ СЖАТИЯ:
|
||
{dialogue_text}
|
||
|
||
## ТРЕБОВАНИЯ:
|
||
1. Будь краток но информативен
|
||
2. Сохраняй все технические детали (команды, пути, URL)
|
||
3. Используй маркированные списки для читаемости
|
||
4. Не добавляй информацию которой не было в диалоге
|
||
5. Выделяй важное эмодзи для быстрого поиска
|
||
|
||
Сожми диалог в summary согласно формату выше:
|
||
"""
|
||
return prompt
|
||
|
||
async def compact(self, keep_last: int = COMPACTION_KEEP_LAST) -> CompactionResult:
|
||
"""
|
||
Выполнить компактификацию истории диалога.
|
||
|
||
Args:
|
||
keep_last: количество последних сообщений для сохранения
|
||
|
||
Returns:
|
||
CompactionResult с результатами операции
|
||
"""
|
||
logger.info(f"Начало компактификации (сохраняем последние {keep_last} сообщений)")
|
||
|
||
try:
|
||
# Получаем старые сообщения
|
||
old_messages = self._get_old_messages(keep_last)
|
||
|
||
if not old_messages:
|
||
logger.info("Нет сообщений для компактификации")
|
||
return CompactionResult(
|
||
success=True,
|
||
messages_compressed=0,
|
||
summary_length=0,
|
||
tokens_saved=0
|
||
)
|
||
|
||
messages_count = len(old_messages)
|
||
logger.info(f"Найдено {messages_count} сообщений для сжатия")
|
||
|
||
# Строим промпт
|
||
prompt = self._build_compaction_prompt(old_messages)
|
||
|
||
# Проверяем наличие Qwen Code
|
||
if not self.qwen_manager:
|
||
logger.error("Qwen manager не инициализирован")
|
||
return CompactionResult(
|
||
success=False,
|
||
error="Qwen manager не инициализирован"
|
||
)
|
||
|
||
# Выполняем сжатие через Qwen Code
|
||
output_parts = []
|
||
|
||
async def on_output(text: str):
|
||
output_parts.append(text)
|
||
|
||
async def on_oauth_url(url: str):
|
||
logger.warning(f"OAuth URL: {url}")
|
||
|
||
logger.info("Запуск Qwen Code для сжатия...")
|
||
|
||
# Запускаем задачу
|
||
await self.qwen_manager.run_task(
|
||
user_id=999, # Системный user_id для компактификации
|
||
task=prompt,
|
||
on_output=on_output,
|
||
on_oauth_url=on_oauth_url,
|
||
use_system_prompt=False # Не добавляем системный промпт бота
|
||
)
|
||
|
||
summary = "".join(output_parts)
|
||
|
||
# Очищаем summary от служебных символов
|
||
summary = summary.strip()
|
||
|
||
if not summary:
|
||
logger.error("Пустой summary после сжатия")
|
||
return CompactionResult(
|
||
success=False,
|
||
error="Пустой summary после сжатия"
|
||
)
|
||
|
||
# Сохраняем summary в ChromaDB
|
||
self._save_summary(summary)
|
||
|
||
# Оцениваем экономию токенов (примерно)
|
||
original_tokens = sum(len(msg["content"]) for msg in old_messages) // 4
|
||
summary_tokens = len(summary) // 4
|
||
tokens_saved = original_tokens - summary_tokens
|
||
|
||
logger.info(
|
||
f"Компактификация завершена: "
|
||
f"сообщений={messages_count}, "
|
||
f"длина summary={len(summary)}, "
|
||
f"экономия токенов≈{tokens_saved}"
|
||
)
|
||
|
||
return CompactionResult(
|
||
success=True,
|
||
messages_compressed=messages_count,
|
||
summary_length=len(summary),
|
||
tokens_saved=tokens_saved
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Ошибка компактификации: {e}", exc_info=True)
|
||
return CompactionResult(
|
||
success=False,
|
||
error=str(e)
|
||
)
|
||
|
||
def get_context_with_summary(self, limit: int = 20) -> Tuple[Optional[str], List[Dict[str, Any]]]:
|
||
"""
|
||
Получить контекст с использованием summary.
|
||
|
||
Args:
|
||
limit: количество последних сообщений для загрузки
|
||
|
||
Returns:
|
||
(summary, recent_messages) кортеж
|
||
"""
|
||
# Получаем summary
|
||
summary = self._get_summary()
|
||
|
||
# Получаем последние сообщения
|
||
self._init_chroma()
|
||
|
||
try:
|
||
result = self._collection.get(
|
||
include=["documents", "metadatas"],
|
||
limit=limit
|
||
)
|
||
|
||
messages = []
|
||
if result["documents"]:
|
||
for i, doc in enumerate(result["documents"]):
|
||
# Пропускаем summary
|
||
if result["metadatas"][i].get("type") == "summary":
|
||
continue
|
||
|
||
messages.append({
|
||
"content": doc,
|
||
"role": result["metadatas"][i].get("role", "unknown"),
|
||
"timestamp": result["metadatas"][i].get("timestamp", ""),
|
||
"user_id": result["metadatas"][i].get("user_id", "unknown")
|
||
})
|
||
|
||
logger.info(f"Загружен контекст: summary={summary is not None}, сообщений={len(messages)}")
|
||
return summary, messages
|
||
|
||
except Exception as e:
|
||
logger.error(f"Ошибка загрузки контекста: {e}")
|
||
return None, []
|
||
|
||
def check_compaction_needed(self, threshold_percent: int = COMPACTION_THRESHOLD_PERCENT) -> bool:
|
||
"""
|
||
Проверить нужна ли компактификация.
|
||
|
||
Args:
|
||
threshold_percent: порог заполненности контекста (%)
|
||
|
||
Returns:
|
||
True если нужна компактификация
|
||
"""
|
||
self._init_chroma()
|
||
|
||
try:
|
||
# Получаем количество сообщений
|
||
result = self._collection.get(include=[])
|
||
|
||
if not result["ids"]:
|
||
return False
|
||
|
||
# Считаем сообщения без summary
|
||
message_count = 0
|
||
for i, id_ in enumerate(result["ids"]):
|
||
if id_ != COMPACTION_SUMMARY_ID:
|
||
message_count += 1
|
||
|
||
# Оцениваем заполненность контекста
|
||
# Примерный расчёт: 1 сообщение ≈ 100 токенов ≈ 400 символов
|
||
# Максимум контекста ≈ 200K токенов ≈ 800K символов
|
||
# Для простоты: 1000 сообщений = 100% контекста
|
||
|
||
context_percent = (message_count / 1000) * 100
|
||
|
||
logger.info(
|
||
f"Проверка компактификации: "
|
||
f"сообщений={message_count}, "
|
||
f"заполненность={context_percent:.1f}%, "
|
||
f"порог={threshold_percent}%"
|
||
)
|
||
|
||
return context_percent >= threshold_percent
|
||
|
||
except Exception as e:
|
||
logger.error(f"Ошибка проверки компактификации: {e}")
|
||
return False
|
||
|
||
|
||
# Глобальный экземпляр для использования в боте
|
||
compactor: Optional[DialogueCompactor] = None
|
||
|
||
|
||
def init_compactor(qwen_manager=None, vector_memory=None) -> DialogueCompactor:
|
||
"""Инициализировать глобальный компактор."""
|
||
global compactor
|
||
compactor = DialogueCompactor(qwen_manager, vector_memory)
|
||
return compactor
|
||
|
||
|
||
def get_compactor() -> DialogueCompactor:
|
||
"""Получить глобальный компактор."""
|
||
if compactor is None:
|
||
raise RuntimeError("Compactor не инициализирован. Вызовите init_compactor().")
|
||
return compactor
|