#!/usr/bin/env python3 """Модели состояния пользователя.""" from typing import Dict, List, Optional, Any from dataclasses import dataclass, field # Пресеты AI-провайдеров AI_PRESET_OFF = "off" # ИИ отключен, режим CLI команд AI_PRESET_QWEN = "qwen" # Qwen Code (бесплатно, локально) AI_PRESET_GIGA_AUTO = "giga_auto" # GigaChat авто-переключение (Lite/Pro) AI_PRESET_GIGA_LITE = "giga_lite" # GigaChat Lite (дешевле) AI_PRESET_GIGA_PRO = "giga_pro" # GigaChat Pro (максимальное качество) @dataclass class UserState: """Состояние пользователя в диалоге.""" current_menu: str = "main" waiting_for_input: bool = False input_type: Optional[str] = None # "name", "host", "port", "user", "tags", "server_action" parent_menu: Optional[str] = None context: Dict[str, Any] = field(default_factory=dict) working_directory: Optional[str] = None current_server: str = "local" # Имя текущего сервера editing_server: Optional[str] = None # Имя сервера, который редактируем ai_chat_mode: bool = True # Режим чата с ИИ агентом (включен по умолчанию) ai_chat_history: List[str] = field(default_factory=list) # История диалога с ИИ messages_since_fact_extract: int = 0 # Счётчик для извлечения фактов ai_preset: str = AI_PRESET_QWEN # Текущий AI-пресет current_ai_provider: str = "qwen" # Текущий AI-провайдер (для совместимости) # Для управления длинным выводом waiting_for_output_control: bool = False # Ожидание решения пользователя output_remaining: int = 0 # Сколько сообщений осталось output_wait_message = None # Сообщение с кнопками output_continue_event = None # asyncio.Event для разблокировки continue_output: bool = True # Решение пользователя output_next_index: Optional[int] = None # Индекс следующего сообщения для отправки output_text: Optional[str] = None # Текст для продолжения отправки output_parse_mode: Optional[str] = None # Parse mode для продолжения # Для команды /restart_bot waiting_for_restart_password: bool = False # Ожидание пароля sudo для перезапуска class StateManager: """Управление состояниями пользователей.""" def __init__(self): self._states: Dict[int, UserState] = {} def get(self, user_id: int) -> UserState: if user_id not in self._states: self._states[user_id] = UserState() return self._states[user_id] def reset(self, user_id: int): self._states[user_id] = UserState()