#!/usr/bin/env python3 """Утилиты для форматирования и отправки сообщений.""" import asyncio import logging import re from typing import List, Tuple from telegram import Update logger = logging.getLogger(__name__) # Лимиты Telegram MAX_MESSAGE_LENGTH = 4096 # Максимальная длина сообщения RESERVED_FOR_HEADER = 20 # Резервируем место для "(N/N) " def escape_code_block_content(text: str) -> str: """ Экранировать спецсимволы Markdown внутри блоков кода. Нужно для случаев когда сообщение содержит ```блок кода``` и текст с Markdown. Telegram пытается интерпретировать [text](url) внутри блока кода как ссылку. """ # Находим все блоки кода parts = text.split("```") result_parts = [] for i, part in enumerate(parts): if i % 2 == 1: # Внутри блока кода — экранируем [ и ] part = part.replace('[', '\\[').replace(']', '\\]') result_parts.append(part) return "```".join(result_parts) def escape_markdown(text: str) -> str: """ Экранирование специальных символов Markdown для Telegram API. Telegram Markdown v1 использует: * _ ` [ ] ( ) Эти символы нужно экранировать обратным слэшем. ВАЖНО: Не экранирует содержимое блоков кода (```). """ if not text: return text # Разбиваем текст на части: внутри и снаружи блоков кода parts = [] last_end = 0 in_code = False # Находим все блоки кода code_pattern = re.compile(r'```') matches = list(code_pattern.finditer(text)) # Обрабатываем текст по частям result = [] i = 0 while i < len(text): # Проверяем не находимся ли внутри блока кода # Ищем ближайший ``` от текущей позиции remaining = text[i:] code_match = re.search(r'```', remaining) if code_match: # Есть блок кода впереди code_start = i + code_match.start() # Экранируем текст ДО блока кода text_to_escape = text[i:code_start] if text_to_escape and not in_code: text_to_escape = _escape_markdown_chars(text_to_escape) result.append(text_to_escape) # Добавляем сам блок кода (без экранирования) result.append('```') i = code_start + 3 in_code = not in_code else: # Нет больше блоков кода remaining_text = text[i:] if remaining_text and not in_code: result.append(_escape_markdown_chars(remaining_text)) else: result.append(remaining_text) break return ''.join(result) def _escape_markdown_chars(text: str) -> str: """Экранировать специальные символы Markdown (вспомогательная функция).""" # Порядок важен: сначала экранируем обратные слэши text = text.replace('\\', '\\\\') text = text.replace('`', '\\`') text = text.replace('*', '\\*') text = text.replace('_', '\\_') text = text.replace('[', '\\[') text = text.replace(']', '\\]') text = text.replace('(', '\\(') text = text.replace(')', '\\)') return text def find_code_blocks(text: str) -> List[Tuple[int, int]]: """ Найти все блоки кода (```) в тексте. Возвращает список кортежей (start, end) для каждого блока. """ blocks = [] pattern = re.compile(r'```') matches = list(pattern.finditer(text)) # Пары start-end для каждого блока i = 0 while i < len(matches) - 1: start = matches[i].start() end = matches[i + 1].end() blocks.append((start, end)) i += 2 return blocks def split_message(text: str, max_length: int = MAX_MESSAGE_LENGTH) -> List[Tuple[str, bool, bool, bool]]: """ Умно разбить длинный текст на сообщения <= max_length символов. Возвращает список кортежей (text, has_code, code_opened, code_closed): - text: текст сообщения - has_code: True если сообщение содержит часть блока кода - code_opened: True если блок кода ОТКРЫТ в этом сообщении (есть открывающий ```) - code_closed: True если блок кода ЗАКРЫТ в этом сообщении (есть закрывающий ```) Алгоритм: 1. Разбиваем текст на строки 2. Накапливаем строки до достижения лимита 3. Отслеживаем состояние блока кода (внутри/снаружи) """ def calc_code_flags(txt: str) -> Tuple[bool, bool, bool]: """Вычислить флаги code для данного текста.""" has_code = '```' in txt backtick_count = txt.count('```') code_opened = backtick_count >= 1 code_closed = backtick_count >= 2 or (backtick_count == 1 and txt.rstrip().endswith('```')) return has_code, code_opened, code_closed if len(text) <= max_length: has_code, code_opened, code_closed = calc_code_flags(text) return [(text, has_code, code_opened, code_closed)] parts = [] lines = text.split('\n') current = "" in_code_block = False # Состояние: внутри блока кода или нет for line in lines: # Проверяем, содержит ли строка ``` backticks_in_line = line.count('```') # Если строка содержит нечётное количество ```, она меняет состояние if backticks_in_line % 2 == 1: # Эта строка содержит ``` который меняет состояние if in_code_block: # Были внутри блока — эта строка закрывает его test_line = current + ('\n' if current else '') + line if len(test_line) > max_length - RESERVED_FOR_HEADER: # Сначала отправляем текущее (блок не закрыт в этой части!) if current: has_code, code_opened, _ = calc_code_flags(current) # code_closed=False потому что блок продолжится parts.append((current, has_code, code_opened, False)) current = line else: current = test_line in_code_block = False else: # Были снаружи — эта строка открывает блок test_line = current + ('\n' if current else '') + line if len(test_line) > max_length - RESERVED_FOR_HEADER: if current: has_code, code_opened, code_closed = calc_code_flags(current) parts.append((current, has_code, code_opened, code_closed)) current = line else: current = test_line in_code_block = True else: # Строка не меняет состояние test_line = current + ('\n' if current else '') + line if len(test_line) > max_length - RESERVED_FOR_HEADER: if current: has_code, code_opened, _ = calc_code_flags(current) # Если мы внутри блока кода — block не закрыт code_closed = not in_code_block parts.append((current, has_code, code_opened, code_closed)) current = line else: current = test_line if current: has_code, code_opened, _ = calc_code_flags(current) code_closed = not in_code_block parts.append((current, has_code, code_opened, code_closed)) return parts async def send_long_message(update: Update, text: str, parse_mode: str = None, pause_every: int = 3, start_from: int = 0): """ Отправить длинный текст, разбив на несколько сообщений. Поддерживает: - Update с update.message (обычные сообщения) - CallbackQuery (query.edit_message_text / query.message.reply_text) Args: start_from: Номер сообщения с которого начать (для продолжения после кнопки) """ from telegram import InlineKeyboardButton, InlineKeyboardMarkup from bot.config import state_manager # Определяем тип объекта и получаем метод для отправки # CallbackQuery имеет from_user и answer(), но не имеет message.reply_text is_callback_query = hasattr(update, 'answer') and hasattr(update, 'from_user') if is_callback_query: query = update message = query.message send_method = message.reply_text if message else query.edit_message_text user_id = query.from_user.id # Для CallbackQuery используем from_user else: message = update.message send_method = message.reply_text if message else None user_id = update.effective_user.id # Для Update используем effective_user if not send_method: logger.error("send_long_message: не удалось определить метод отправки") return False parts = split_message(text) total = len(parts) state = state_manager.get(user_id) # Начинаем с указанного сообщения for i in range(start_from, total): part, has_code, code_opened, code_closed = parts[i] # Проверяем был ли блок кода открыт в предыдущем сообщении prev_closed = parts[i-1][3] if i > 0 else True # Определяем находимся ли внутри блока кода (между ``` и ```) in_code_block = not prev_closed or (code_opened and not code_closed) # Проверяем будем ли добавлять ``` к этому сообщению will_add_opening = total > 1 and i > 0 and not prev_closed and not code_closed will_add_closing = total > 1 and i < total - 1 and not code_closed # КЛЮЧЕВОЕ ИСПРАВЛЕНИЕ: # Отключаем parse_mode если: # 1. Это промежуточная часть блока кода (in_code_block=True, но нет ``` в этом сообщении) # 2. Это сообщение содержит ``` (has_code=True) # 3. Мы добавляем искусственные ``` (will_add_opening или will_add_closing) if in_code_block or has_code or will_add_opening or will_add_closing: actual_parse_mode = None else: # Обычный текст без блоков кода — используем Markdown actual_parse_mode = parse_mode # Логика работы с блоками кода между сообщениями if total > 1 and i > 0 and not prev_closed: part = "```\n" + part if total > 1 and i < total - 1 and not code_closed: part = part + "\n```" # Добавляем номер части if total > 1: header = f"({i+1}/{total}) " if len(header) + len(part) <= MAX_MESSAGE_LENGTH: part = header + part try: await send_method(part, parse_mode=actual_parse_mode) except Exception as e: logger.debug(f"Ошибка Markdown, отправляем без разметки: {e}") await send_method(part) await asyncio.sleep(0.1) # КАЖДЫЕ pause_every сообщений — спрашивать продолжать ли if pause_every > 0 and (i + 1) % pause_every == 0 and i < total - 1: remaining = total - (i + 1) keyboard = InlineKeyboardMarkup([ [ InlineKeyboardButton("▶️ Продолжить", callback_data=f"continue_output_{remaining}_{i+1}"), InlineKeyboardButton("❌ Отменить", callback_data="cancel_output") ] ]) wait_msg = await send_method( f"📊 **Отправлено {i + 1} из {total} сообщений**\n\n" f"Осталось ещё {remaining} сообщений.\n\n" f"Продолжить вывод?", parse_mode="Markdown", reply_markup=keyboard ) # Сохраняем состояние и ВОЗВРАЩАЕМ УПРАВЛЕНИЕ state.waiting_for_output_control = True state.output_remaining = remaining state.output_wait_message = wait_msg state.output_next_index = i + 1 # С какого сообщения продолжить state.output_text = text # Сохраняем текст для продолжения state.output_parse_mode = parse_mode logger.info(f"send_long_message: пауза после {i+1}/{total}, ждём кнопки (user_id={user_id})") return True # Возвращаем True — есть продолжение # Все сообщения отправлены state.waiting_for_output_control = False state.output_remaining = None state.output_wait_message = None state.output_next_index = None state.output_text = None return False # Возвращаем False — продолжения нет def format_long_output(text: str, max_lines: int = 100, head_lines: int = 50, tail_lines: int = 50) -> str: """ Форматировать длинный вывод: показать первые и последние строки. По умолчанию: первые 50 + последние 50 строк = 100 строк максимум. """ lines = text.split('\n') total_lines = len(lines) if total_lines <= max_lines: return text # Показываем первые head_lines и последние tail_lines head = lines[:head_lines] tail = lines[-tail_lines:] skipped = total_lines - head_lines - tail_lines result = '\n'.join(head) result += f'\n\n... ({skipped} строк пропущено) ...\n' result += '\n'.join(tail) return result