194 lines
7.7 KiB
Python
194 lines
7.7 KiB
Python
#!/usr/bin/env python3
|
||
"""Утилиты для форматирования и отправки сообщений."""
|
||
|
||
import asyncio
|
||
import logging
|
||
import re
|
||
from typing import List, Tuple
|
||
from telegram import Update
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Лимиты Telegram
|
||
MAX_MESSAGE_LENGTH = 4096 # Максимальная длина сообщения
|
||
RESERVED_FOR_HEADER = 20 # Резервируем место для "(N/N) "
|
||
|
||
|
||
def escape_markdown(text: str) -> str:
|
||
"""
|
||
Экранирование специальных символов Markdown для Telegram API.
|
||
"""
|
||
text = text.replace('```', '\\`\\`\\`')
|
||
return text
|
||
|
||
|
||
def find_code_blocks(text: str) -> List[Tuple[int, int]]:
|
||
"""
|
||
Найти все блоки кода (```) в тексте.
|
||
Возвращает список кортежей (start, end) для каждого блока.
|
||
"""
|
||
blocks = []
|
||
pattern = re.compile(r'```')
|
||
matches = list(pattern.finditer(text))
|
||
|
||
# Пары start-end для каждого блока
|
||
i = 0
|
||
while i < len(matches) - 1:
|
||
start = matches[i].start()
|
||
end = matches[i + 1].end()
|
||
blocks.append((start, end))
|
||
i += 2
|
||
|
||
return blocks
|
||
|
||
|
||
def split_message(text: str, max_length: int = MAX_MESSAGE_LENGTH) -> List[Tuple[str, bool]]:
|
||
"""
|
||
Умно разбить длинный текст на сообщения <= max_length символов.
|
||
|
||
Возвращает список кортежей (text, has_markdown):
|
||
- text: текст сообщения
|
||
- has_markdown: True если сообщение содержит блоки кода
|
||
|
||
Алгоритм:
|
||
1. Находим все блоки кода
|
||
2. Стараемся не разрывать блоки кода
|
||
3. Если блок кода не влезает — отправляем отдельным сообщением без Markdown
|
||
"""
|
||
if len(text) <= max_length:
|
||
return [(text, '```' in text)]
|
||
|
||
parts = []
|
||
code_blocks = find_code_blocks(text)
|
||
|
||
# Текущая позиция в тексте
|
||
pos = 0
|
||
current = ""
|
||
current_has_code = False
|
||
|
||
for block_start, block_end in code_blocks:
|
||
# Текст до блока кода
|
||
before_code = text[pos:block_start]
|
||
|
||
# Сам блок кода (включая ```)
|
||
code_block = text[block_start:block_end]
|
||
|
||
# Обрабатываем текст до блока
|
||
for line in before_code.split('\n'):
|
||
if len(current) + len(line) + 1 > max_length - RESERVED_FOR_HEADER:
|
||
if current:
|
||
parts.append((current, current_has_code))
|
||
current = line
|
||
current_has_code = False
|
||
else:
|
||
current += ('\n' if current else '') + line
|
||
|
||
# Проверяем влезет ли блок кода
|
||
if len(current) + len(code_block) + 1 > max_length - RESERVED_FOR_HEADER:
|
||
# Отправляем что накопилось
|
||
if current:
|
||
parts.append((current, current_has_code))
|
||
|
||
# Если блок кода слишком длинный — режем его на части
|
||
if len(code_block) > max_length - RESERVED_FOR_HEADER:
|
||
# Отправляем блок кода частями без Markdown
|
||
for i in range(0, len(code_block), max_length - RESERVED_FOR_HEADER):
|
||
chunk = code_block[i:i + max_length - RESERVED_FOR_HEADER]
|
||
parts.append((chunk, False)) # Без Markdown!
|
||
current = ""
|
||
current_has_code = False
|
||
else:
|
||
# Блок влезает в следующее сообщение
|
||
current = code_block
|
||
current_has_code = True
|
||
else:
|
||
# Блок кода влезает в текущее сообщение
|
||
current += ('\n' if current else '') + code_block
|
||
current_has_code = True
|
||
|
||
pos = block_end
|
||
|
||
# Обрабатываем остаток текста после последнего блока
|
||
if pos < len(text):
|
||
remaining = text[pos:]
|
||
for line in remaining.split('\n'):
|
||
if len(current) + len(line) + 1 > max_length - RESERVED_FOR_HEADER:
|
||
if current:
|
||
parts.append((current, current_has_code))
|
||
current = line
|
||
current_has_code = False
|
||
else:
|
||
current += ('\n' if current else '') + line
|
||
|
||
if current:
|
||
parts.append((current, current_has_code))
|
||
|
||
return parts
|
||
|
||
|
||
async def send_long_message(update: Update, text: str, parse_mode: str = None):
|
||
"""
|
||
Отправить длинный текст, разбив на несколько сообщений.
|
||
|
||
Умная разбивка:
|
||
- Блоки кода не разрываются между сообщениями
|
||
- Если блок кода не влезает — отправляется без Markdown
|
||
- Нумерация (1/3), (2/3) только если сообщений > 1
|
||
"""
|
||
parts = split_message(text)
|
||
total = len(parts)
|
||
|
||
for i, (part, has_code) in enumerate(parts):
|
||
# Добавляем номер части если их несколько
|
||
if total > 1:
|
||
header = f"({i+1}/{total}) "
|
||
if len(header) + len(part) <= MAX_MESSAGE_LENGTH:
|
||
part = header + part
|
||
|
||
# Определяем parse_mode для этого сообщения
|
||
# Если передан parse_mode и нет проблем с блоками кода — используем его
|
||
# Если блок кода разорван — отправляем без Markdown для этой части
|
||
if parse_mode and has_code:
|
||
# Сообщение содержит полный блок кода — используем Markdown
|
||
actual_parse_mode = parse_mode
|
||
elif parse_mode and not has_code:
|
||
# Сообщение без блоков кода — всё равно используем Markdown для другого форматирования
|
||
actual_parse_mode = parse_mode
|
||
else:
|
||
# Нет parse_mode или проблемы с кодом
|
||
actual_parse_mode = None
|
||
|
||
try:
|
||
await update.message.reply_text(part, parse_mode=actual_parse_mode)
|
||
except Exception as e:
|
||
# Фоллбэк: отправляем без разметки
|
||
logger.debug(f"Ошибка Markdown, отправляем без разметки: {e}")
|
||
await update.message.reply_text(part)
|
||
|
||
# Небольшая пауза между сообщениями
|
||
await asyncio.sleep(0.1)
|
||
|
||
|
||
def format_long_output(text: str, max_lines: int = 20, head_lines: int = 10, tail_lines: int = 10) -> str:
|
||
"""
|
||
Форматировать длинный вывод: показать первые и последние строки.
|
||
По умолчанию: первые 10 + последние 10 строк = 20 строк максимум.
|
||
"""
|
||
lines = text.split('\n')
|
||
total_lines = len(lines)
|
||
|
||
if total_lines <= max_lines:
|
||
return text
|
||
|
||
# Показываем первые head_lines и последние tail_lines
|
||
head = lines[:head_lines]
|
||
tail = lines[-tail_lines:]
|
||
|
||
skipped = total_lines - head_lines - tail_lines
|
||
|
||
result = '\n'.join(head)
|
||
result += f'\n\n... ({skipped} строк пропущено) ...\n'
|
||
result += '\n'.join(tail)
|
||
|
||
return result
|