#!/usr/bin/env python3 """ Система памяти для ИИ-чата на SQLite. Архитектура: 1. SQLite для хранения истории диалогов 2. Извлечение фактов через эвристики 3. Поиск по истории через LIKE Просто и надёжно — без внешних зависимостей. """ import logging import sqlite3 from pathlib import Path from datetime import datetime from dataclasses import dataclass, field from typing import Optional, List, Dict, Any, Tuple from enum import Enum logger = logging.getLogger(__name__) # ============================================================================ # Модели данных # ============================================================================ class FactType(Enum): """Типы извлекаемых фактов.""" PERSONAL = "personal" TECHNICAL = "technical" PROJECT = "project" PREFERENCE = "preference" OTHER = "other" @dataclass class Fact: """Факт о пользователе.""" id: Optional[int] user_id: int fact_type: FactType content: str source_message: str confidence: float created_at: datetime = field(default_factory=datetime.now) updated_at: datetime = field(default_factory=datetime.now) is_active: bool = True @dataclass class Message: """Сообщение диалога.""" id: Optional[int] user_id: int role: str content: str timestamp: datetime = field(default_factory=datetime.now) session_id: Optional[str] = None @dataclass class DialogSession: """Сессия диалога.""" id: str user_id: int started_at: datetime = field(default_factory=datetime.now) ended_at: Optional[datetime] = None message_count: int = 0 summary: Optional[str] = None # ============================================================================ # SQLite хранилище # ============================================================================ class SQLiteMemoryStorage: """ SQLite-хранилище для памяти. """ def __init__(self, db_path: str): self.db_path = db_path self._init_db() def _init_db(self): """Инициализация базы данных.""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS facts ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, fact_type TEXT NOT NULL, content TEXT NOT NULL, source_message TEXT, confidence REAL DEFAULT 0.5, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, is_active BOOLEAN DEFAULT 1 ) """) cursor.execute(""" CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, role TEXT NOT NULL, content TEXT NOT NULL, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, session_id TEXT ) """) cursor.execute(""" CREATE TABLE IF NOT EXISTS sessions ( id TEXT PRIMARY KEY, user_id INTEGER NOT NULL, started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, ended_at TIMESTAMP, message_count INTEGER DEFAULT 0, summary TEXT ) """) cursor.execute("CREATE INDEX IF NOT EXISTS idx_facts_user ON facts(user_id, is_active)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_messages_user ON messages(user_id, timestamp)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_sessions_user ON sessions(user_id)") conn.commit() conn.close() logger.info(f"Инициализирована БД памяти: {self.db_path}") def _get_connection(self) -> sqlite3.Connection: conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row return conn # --- Факты --- def save_fact(self, fact: Fact) -> int: """Сохранить факт.""" conn = self._get_connection() cursor = conn.cursor() cursor.execute(""" INSERT INTO facts (user_id, fact_type, content, source_message, confidence, created_at, updated_at, is_active) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( fact.user_id, fact.fact_type.value, fact.content, fact.source_message, fact.confidence, fact.created_at.isoformat() if fact.created_at else None, fact.updated_at.isoformat() if fact.updated_at else None, 1 if fact.is_active else 0 )) fact_id = cursor.lastrowid conn.commit() conn.close() logger.debug(f"Сохранён факт для пользователя {fact.user_id}: {fact.content[:50]}...") return fact_id def get_facts(self, user_id: int, fact_type: Optional[FactType] = None) -> List[Fact]: """Получить факты пользователя.""" conn = self._get_connection() cursor = conn.cursor() query = "SELECT * FROM facts WHERE user_id = ? AND is_active = 1" params = [user_id] if fact_type: query += " AND fact_type = ?" params.append(fact_type.value) query += " ORDER BY created_at DESC" cursor.execute(query, params) rows = cursor.fetchall() conn.close() facts = [] for row in rows: facts.append(Fact( id=row["id"], user_id=row["user_id"], fact_type=FactType(row["fact_type"]), content=row["content"], source_message=row["source_message"], confidence=row["confidence"], created_at=datetime.fromisoformat(row["created_at"]), updated_at=datetime.fromisoformat(row["updated_at"]), is_active=bool(row["is_active"]) )) return facts def update_fact(self, fact_id: int, content: str = None, confidence: float = None, is_active: bool = None): """Обновить факт.""" conn = self._get_connection() cursor = conn.cursor() updates = [] params = [] if content is not None: updates.append("content = ?") params.append(content) if confidence is not None: updates.append("confidence = ?") params.append(confidence) if is_active is not None: updates.append("is_active = ?") params.append(1 if is_active else 0) if updates: updates.append("updated_at = ?") params.append(datetime.now().isoformat()) params.append(fact_id) query = f"UPDATE facts SET {', '.join(updates)} WHERE id = ?" cursor.execute(query, params) conn.commit() conn.close() # --- Сообщения --- def save_message(self, message: Message) -> int: """Сохранить сообщение.""" conn = self._get_connection() cursor = conn.cursor() cursor.execute(""" INSERT INTO messages (user_id, role, content, timestamp, session_id) VALUES (?, ?, ?, ?, ?) """, ( message.user_id, message.role, message.content, message.timestamp.isoformat() if message.timestamp else None, message.session_id )) message_id = cursor.lastrowid # Обновляем счётчик сессии if message.session_id: cursor.execute(""" UPDATE sessions SET message_count = message_count + 1 WHERE id = ? """, (message.session_id,)) conn.commit() conn.close() return message_id def get_recent_messages(self, user_id: int, limit: int = 10) -> List[Message]: """Получить последние сообщения.""" conn = self._get_connection() cursor = conn.cursor() cursor.execute(""" SELECT * FROM messages WHERE user_id = ? ORDER BY timestamp DESC LIMIT ? """, (user_id, limit)) rows = cursor.fetchall() conn.close() messages = [] for row in reversed(rows): # Возвращаем в хронологическом порядке messages.append(Message( id=row["id"], user_id=row["user_id"], role=row["role"], content=row["content"], timestamp=datetime.fromisoformat(row["timestamp"]), session_id=row["session_id"] )) return messages def get_messages_by_session(self, session_id: str) -> List[Message]: """Получить сообщения сессии.""" conn = self._get_connection() cursor = conn.cursor() cursor.execute(""" SELECT * FROM messages WHERE session_id = ? ORDER BY timestamp ASC """, (session_id,)) rows = cursor.fetchall() conn.close() messages = [] for row in rows: messages.append(Message( id=row["id"], user_id=row["user_id"], role=row["role"], content=row["content"], timestamp=datetime.fromisoformat(row["timestamp"]), session_id=row["session_id"] )) return messages def search_messages(self, user_id: int, query: str, limit: int = 5) -> List[Message]: """ Поиск сообщений по тексту (простой LIKE поиск). Для продакшена лучше использовать FTS5 или векторный поиск. """ conn = self._get_connection() cursor = conn.cursor() # Поиск по содержимому cursor.execute(""" SELECT * FROM messages WHERE user_id = ? AND content LIKE ? ORDER BY timestamp DESC LIMIT ? """, (user_id, f"%{query}%", limit)) rows = cursor.fetchall() conn.close() messages = [] for row in rows: messages.append(Message( id=row["id"], user_id=row["user_id"], role=row["role"], content=row["content"], timestamp=datetime.fromisoformat(row["timestamp"]), session_id=row["session_id"] )) return messages # --- Сессии --- def create_session(self, session: DialogSession) -> str: """Создать сессию.""" conn = self._get_connection() cursor = conn.cursor() cursor.execute(""" INSERT INTO sessions (id, user_id, started_at, message_count) VALUES (?, ?, ?, ?) """, ( session.id, session.user_id, session.started_at.isoformat() if session.started_at else None, session.message_count )) conn.commit() conn.close() return session.id def close_session(self, session_id: str, summary: str = None): """Завершить сессию.""" conn = self._get_connection() cursor = conn.cursor() cursor.execute(""" UPDATE sessions SET ended_at = ?, summary = ? WHERE id = ? """, (datetime.now().isoformat(), summary, session_id)) conn.commit() conn.close() def get_active_session(self, user_id: int) -> Optional[DialogSession]: """Получить активную сессию пользователя.""" conn = self._get_connection() cursor = conn.cursor() cursor.execute(""" SELECT * FROM sessions WHERE user_id = ? AND ended_at IS NULL ORDER BY started_at DESC LIMIT 1 """, (user_id,)) row = cursor.fetchone() conn.close() if row: return DialogSession( id=row["id"], user_id=row["user_id"], started_at=datetime.fromisoformat(row["started_at"]), ended_at=datetime.fromisoformat(row["ended_at"]) if row["ended_at"] else None, message_count=row["message_count"], summary=row["summary"] ) return None def get_user_stats(self, user_id: int) -> Dict[str, Any]: """Получить статистику пользователя.""" conn = self._get_connection() cursor = conn.cursor() # Количество сессий cursor.execute(""" SELECT COUNT(*) FROM sessions WHERE user_id = ? """, (user_id,)) total_sessions = cursor.fetchone()[0] # Количество сообщений cursor.execute(""" SELECT COUNT(*) FROM messages WHERE user_id = ? """, (user_id,)) total_messages = cursor.fetchone()[0] # Количество фактов cursor.execute(""" SELECT COUNT(*) FROM facts WHERE user_id = ? AND is_active = 1 """, (user_id,)) total_facts = cursor.fetchone()[0] conn.close() return { "total_sessions": total_sessions, "total_messages": total_messages, "total_facts": total_facts } # ============================================================================ # Менеджер памяти (основной интерфейс) # ============================================================================ class MemoryManager: """ Менеджер памяти — основной интерфейс для работы с памятью. Координирует: - Сохранение/загрузку фактов - Историю сообщений - Извлечение фактов через ИИ - RAG-поиск """ def __init__(self, storage: SQLiteMemoryStorage, ai_client=None): self.storage = storage self.ai_client = ai_client # Будет использоваться для извлечения фактов self._active_sessions: Dict[int, str] = {} # user_id -> session_id def start_session(self, user_id: int) -> str: """Начать новую сессию.""" import uuid session_id = str(uuid.uuid4()) session = DialogSession(id=session_id, user_id=user_id) self.storage.create_session(session) self._active_sessions[user_id] = session_id logger.info(f"Начата новая сессия {session_id} для пользователя {user_id}") return session_id def end_session(self, user_id: int, summary: str = None): """Завершить сессию.""" session_id = self._active_sessions.pop(user_id, None) if session_id: self.storage.close_session(session_id, summary) logger.info(f"Завершена сессия {session_id} для пользователя {user_id}") def get_session_id(self, user_id: int) -> Optional[str]: """Получить ID текущей сессии.""" # Проверяем кэш if user_id in self._active_sessions: return self._active_sessions[user_id] # Проверяем БД session = self.storage.get_active_session(user_id) if session: self._active_sessions[user_id] = session.id return session.id # Создаём новую return self.start_session(user_id) def add_message(self, user_id: int, role: str, content: str) -> int: """Добавить сообщение.""" session_id = self.get_session_id(user_id) message = Message( id=None, user_id=user_id, role=role, content=content, session_id=session_id ) return self.storage.save_message(message) def get_context(self, user_id: int, max_messages: int = 10) -> List[Message]: """Получить контекст для ИИ (последние сообщения).""" return self.storage.get_recent_messages(user_id, max_messages) # --- Факты --- def get_user_profile(self, user_id: int) -> Dict[FactType, List[str]]: """ Получить профиль пользователя (все активные факты). Возвращает: { FactType.PERSONAL: ["Пользователя зовут Владимир"], FactType.TECHNICAL: ["Использует Python", "Работает с Telegram API"], ... } """ facts = self.storage.get_facts(user_id) profile = {} for fact in facts: if fact.fact_type not in profile: profile[fact.fact_type] = [] profile[fact.fact_type].append(fact.content) return profile def extract_facts_from_message(self, user_id: int, message: str, response: str = None) -> List[Fact]: """ Извлечь факты из сообщения (с помощью ИИ или эвристик). Пока простая реализация на эвристиках. В будущем можно использовать ИИ для анализа. """ extracted_facts = [] message_lower = message.lower() # Эвристики для извлечения фактов fact_candidates = [] # Имя пользователя if "меня зовут" in message_lower: parts = message.split("меня зовут") if len(parts) > 1: name = parts[1].strip().split()[0] fact_candidates.append((FactType.PERSONAL, f"Пользователя зовут {name}", 0.8)) # Предпочтения технологий tech_patterns = [ (r"я (люблю|предпочитаю|использую)\s+(\w+)", "technical"), (r"мой (язык|стек)\s+(\w+)", "technical"), (r"работаю с\s+([\w\s,]+)", "technical"), ] import re for pattern, fact_type in tech_patterns: match = re.search(pattern, message_lower) if match: tech = match.group(2) if len(match.groups()) > 1 else match.group(1) fact_candidates.append((FactType.TECHNICAL, f"Использует {tech}", 0.6)) # Проекты/директории if "мой проект" in message_lower or "проект в" in message_lower: fact_candidates.append((FactType.PROJECT, f"Есть проект, упомянутый в диалоге", 0.5)) # Сохраняем факты с высокой уверенностью for fact_type, content, confidence in fact_candidates: if confidence >= 0.6: fact = Fact( id=None, user_id=user_id, fact_type=fact_type, content=content, source_message=message, confidence=confidence ) self.storage.save_fact(fact) extracted_facts.append(fact) if extracted_facts: logger.info(f"Извлечено {len(extracted_facts)} фактов из сообщения пользователя {user_id}") return extracted_facts # --- RAG-поиск --- def search_relevant_context(self, user_id: int, query: str, max_results: int = 3) -> Tuple[List[Message], List[Fact]]: """ Найти релевантный контекст для запроса. Возвращает: - Сообщения по теме - Факты по теме """ # Поиск в сообщениях relevant_messages = self.storage.search_messages(user_id, query, max_results) # Поиск в фактах (простой поиск по содержимому) all_facts = self.storage.get_facts(user_id) relevant_facts = [] query_lower = query.lower() for fact in all_facts: if query_lower in fact.content.lower() or fact.fact_type.value in query_lower: relevant_facts.append(fact) logger.debug(f"Найдено {len(relevant_messages)} сообщений и {len(relevant_facts)} фактов для запроса: {query[:30]}...") return relevant_messages, relevant_facts def format_context_for_ai(self, user_id: int, query: str = None) -> str: """ Сформировать контекст для передачи ИИ. Включает: - Профиль пользователя - Последние сообщения - Релевантные факты (если есть запрос) """ parts = [] # Профиль пользователя profile = self.get_user_profile(user_id) if profile: parts.append("📋 ПРОФИЛЬ ПОЛЬЗОВАТЕЛЯ:") for fact_type, facts in profile.items(): parts.append(f" [{fact_type.value}]:") for f in facts: parts.append(f" - {f}") # Последние сообщения (контекст диалога) recent_messages = self.storage.get_recent_messages(user_id, 5) if recent_messages: parts.append("\n💬 ПОСЛЕДНИЕ СООБЩЕНИЯ:") for msg in recent_messages: role_ru = "Пользователь" if msg.role == "user" else "Ассистент" parts.append(f" {role_ru}: {msg.content[:100]}...") # Релевантный контекст по запросу if query: relevant_msgs, relevant_facts = self.search_relevant_context(user_id, query) if relevant_facts: parts.append("\n🔍 РЕЛЕВАНТНЫЕ ФАКТЫ:") for f in relevant_facts: parts.append(f" - {f.content}") return "\n".join(parts) def get_stats(self, user_id: int) -> Dict[str, Any]: """Получить статистику памяти пользователя.""" return self.storage.get_user_stats(user_id) # ============================================================================ # Глобальный экземпляр # ============================================================================ # Путь к БД памяти MEMORY_DB_PATH = str(Path(__file__).parent / "memory.db") # Глобальный менеджер памяти memory_manager = MemoryManager(SQLiteMemoryStorage(MEMORY_DB_PATH)) # ============================================================================ # Интеграция с ботом (хелперы для bot.py) # ============================================================================ def format_memory_context(user_id: int, query: str = None) -> str: """ Получить форматированный контекст памяти для ИИ. Используется в qwen_integration.py или при вызове ИИ. """ return memory_manager.format_context_for_ai(user_id, query) def save_ai_message(user_id: int, role: str, content: str): """Сохранить сообщение ИИ-чата.""" memory_manager.add_message(user_id, role, content) # Если сообщение от пользователя — пытаемся извлечь факты if role == "user": memory_manager.extract_facts_from_message(user_id, content) def get_user_profile_summary(user_id: int) -> str: """Получить краткую сводку профиля пользователя.""" profile = memory_manager.get_user_profile(user_id) if not profile: return "" lines = ["Профиль пользователя:"] for fact_type, facts in profile.items(): for f in facts: lines.append(f" • {f}") return "\n".join(lines)