#!/usr/bin/env python3 """Утилиты для форматирования и отправки сообщений.""" import asyncio import logging import re from typing import List, Tuple from telegram import Update logger = logging.getLogger(__name__) # Лимиты Telegram MAX_MESSAGE_LENGTH = 4096 # Максимальная длина сообщения RESERVED_FOR_HEADER = 20 # Резервируем место для "(N/N) " def escape_markdown(text: str) -> str: """ Экранирование специальных символов Markdown для Telegram API. """ text = text.replace('```', '\\`\\`\\`') return text def find_code_blocks(text: str) -> List[Tuple[int, int]]: """ Найти все блоки кода (```) в тексте. Возвращает список кортежей (start, end) для каждого блока. """ blocks = [] pattern = re.compile(r'```') matches = list(pattern.finditer(text)) # Пары start-end для каждого блока i = 0 while i < len(matches) - 1: start = matches[i].start() end = matches[i + 1].end() blocks.append((start, end)) i += 2 return blocks def split_message(text: str, max_length: int = MAX_MESSAGE_LENGTH) -> List[Tuple[str, bool]]: """ Умно разбить длинный текст на сообщения <= max_length символов. Возвращает список кортежей (text, has_markdown): - text: текст сообщения - has_markdown: True если сообщение содержит блоки кода Алгоритм: 1. Находим все блоки кода 2. Стараемся не разрывать блоки кода 3. Если блок кода не влезает — отправляем отдельным сообщением без Markdown """ if len(text) <= max_length: return [(text, '```' in text)] parts = [] code_blocks = find_code_blocks(text) # Текущая позиция в тексте pos = 0 current = "" current_has_code = False for block_start, block_end in code_blocks: # Текст до блока кода before_code = text[pos:block_start] # Сам блок кода (включая ```) code_block = text[block_start:block_end] # Обрабатываем текст до блока for line in before_code.split('\n'): if len(current) + len(line) + 1 > max_length - RESERVED_FOR_HEADER: if current: parts.append((current, current_has_code)) current = line current_has_code = False else: current += ('\n' if current else '') + line # Проверяем влезет ли блок кода if len(current) + len(code_block) + 1 > max_length - RESERVED_FOR_HEADER: # Отправляем что накопилось if current: parts.append((current, current_has_code)) # Если блок кода слишком длинный — режем его на части if len(code_block) > max_length - RESERVED_FOR_HEADER: # Отправляем блок кода частями без Markdown for i in range(0, len(code_block), max_length - RESERVED_FOR_HEADER): chunk = code_block[i:i + max_length - RESERVED_FOR_HEADER] parts.append((chunk, False)) # Без Markdown! current = "" current_has_code = False else: # Блок влезает в следующее сообщение current = code_block current_has_code = True else: # Блок кода влезает в текущее сообщение current += ('\n' if current else '') + code_block current_has_code = True pos = block_end # Обрабатываем остаток текста после последнего блока if pos < len(text): remaining = text[pos:] for line in remaining.split('\n'): if len(current) + len(line) + 1 > max_length - RESERVED_FOR_HEADER: if current: parts.append((current, current_has_code)) current = line current_has_code = False else: current += ('\n' if current else '') + line if current: parts.append((current, current_has_code)) return parts async def send_long_message(update: Update, text: str, parse_mode: str = None): """ Отправить длинный текст, разбив на несколько сообщений. Умная разбивка: - Блоки кода не разрываются между сообщениями - Если блок кода не влезает — отправляется без Markdown - Нумерация (1/3), (2/3) только если сообщений > 1 """ parts = split_message(text) total = len(parts) for i, (part, has_code) in enumerate(parts): # Добавляем номер части если их несколько if total > 1: header = f"({i+1}/{total}) " if len(header) + len(part) <= MAX_MESSAGE_LENGTH: part = header + part # Определяем parse_mode для этого сообщения # Если передан parse_mode и нет проблем с блоками кода — используем его # Если блок кода разорван — отправляем без Markdown для этой части if parse_mode and has_code: # Сообщение содержит полный блок кода — используем Markdown actual_parse_mode = parse_mode elif parse_mode and not has_code: # Сообщение без блоков кода — всё равно используем Markdown для другого форматирования actual_parse_mode = parse_mode else: # Нет parse_mode или проблемы с кодом actual_parse_mode = None try: await update.message.reply_text(part, parse_mode=actual_parse_mode) except Exception as e: # Фоллбэк: отправляем без разметки logger.debug(f"Ошибка Markdown, отправляем без разметки: {e}") await update.message.reply_text(part) # Небольшая пауза между сообщениями await asyncio.sleep(0.1) def format_long_output(text: str, max_lines: int = 20, head_lines: int = 10, tail_lines: int = 10) -> str: """ Форматировать длинный вывод: показать первые и последние строки. По умолчанию: первые 10 + последние 10 строк = 20 строк максимум. """ lines = text.split('\n') total_lines = len(lines) if total_lines <= max_lines: return text # Показываем первые head_lines и последние tail_lines head = lines[:head_lines] tail = lines[-tail_lines:] skipped = total_lines - head_lines - tail_lines result = '\n'.join(head) result += f'\n\n... ({skipped} строк пропущено) ...\n' result += '\n'.join(tail) return result