telegram-cli-bot/bot/utils/formatters.py

189 lines
7.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Утилиты для форматирования и отправки сообщений."""
import asyncio
import logging
import re
from typing import List, Tuple
from telegram import Update
logger = logging.getLogger(__name__)
# Лимиты Telegram
MAX_MESSAGE_LENGTH = 4096 # Максимальная длина сообщения
RESERVED_FOR_HEADER = 20 # Резервируем место для "(N/N) "
def escape_markdown(text: str) -> str:
"""
Экранирование специальных символов Markdown для Telegram API.
"""
text = text.replace('```', '\\`\\`\\`')
return text
def find_code_blocks(text: str) -> List[Tuple[int, int]]:
"""
Найти все блоки кода (```) в тексте.
Возвращает список кортежей (start, end) для каждого блока.
"""
blocks = []
pattern = re.compile(r'```')
matches = list(pattern.finditer(text))
# Пары start-end для каждого блока
i = 0
while i < len(matches) - 1:
start = matches[i].start()
end = matches[i + 1].end()
blocks.append((start, end))
i += 2
return blocks
def split_message(text: str, max_length: int = MAX_MESSAGE_LENGTH) -> List[Tuple[str, bool]]:
"""
Умно разбить длинный текст на сообщения <= max_length символов.
Возвращает список кортежей (text, has_markdown):
- text: текст сообщения
- has_markdown: True если сообщение содержит блоки кода
Алгоритм:
1. Находим все блоки кода
2. Стараемся не разрывать блоки кода
3. Если блок кода не влезает — отправляем отдельным сообщением без Markdown
"""
if len(text) <= max_length:
return [(text, '```' in text)]
parts = []
code_blocks = find_code_blocks(text)
# Текущая позиция в тексте
pos = 0
current = ""
current_has_code = False
for block_start, block_end in code_blocks:
# Текст до блока кода
before_code = text[pos:block_start]
# Сам блок кода (включая ```)
code_block = text[block_start:block_end]
# Обрабатываем текст до блока
for line in before_code.split('\n'):
if len(current) + len(line) + 1 > max_length - RESERVED_FOR_HEADER:
if current:
parts.append((current, current_has_code))
current = line
current_has_code = False
else:
current += ('\n' if current else '') + line
# Проверяем влезет ли блок кода
if len(current) + len(code_block) + 1 > max_length - RESERVED_FOR_HEADER:
# Отправляем что накопилось
if current:
parts.append((current, current_has_code))
# Если блок кода слишком длинный — режем его на части
if len(code_block) > max_length - RESERVED_FOR_HEADER:
# Отправляем блок кода частями без Markdown
for i in range(0, len(code_block), max_length - RESERVED_FOR_HEADER):
chunk = code_block[i:i + max_length - RESERVED_FOR_HEADER]
parts.append((chunk, False)) # Без Markdown!
current = ""
current_has_code = False
else:
# Блок влезает в следующее сообщение
current = code_block
current_has_code = True
else:
# Блок кода влезает в текущее сообщение
current += ('\n' if current else '') + code_block
current_has_code = True
pos = block_end
# Обрабатываем остаток текста после последнего блока
if pos < len(text):
remaining = text[pos:]
for line in remaining.split('\n'):
if len(current) + len(line) + 1 > max_length - RESERVED_FOR_HEADER:
if current:
parts.append((current, current_has_code))
current = line
current_has_code = False
else:
current += ('\n' if current else '') + line
if current:
parts.append((current, current_has_code))
return parts
async def send_long_message(update: Update, text: str, parse_mode: str = None):
"""
Отправить длинный текст, разбив на несколько сообщений.
Умная разбивка:
- Блоки кода не разрываются между сообщениями
- Если блок кода не влезает — отправляется без Markdown
- Нумерация (1/3), (2/3) только если сообщений > 1
"""
parts = split_message(text)
total = len(parts)
for i, (part, has_markdown) in enumerate(parts):
# Добавляем номер части если их несколько
if total > 1:
header = f"({i+1}/{total}) "
if len(header) + len(part) <= MAX_MESSAGE_LENGTH:
part = header + part
# Определяем parse_mode для этого сообщения
# Если у сообщения есть блоки кода — используем Markdown
# Если нет — отправляем без разметки (безопаснее)
if has_markdown and parse_mode:
actual_parse_mode = parse_mode
else:
actual_parse_mode = None
try:
await update.message.reply_text(part, parse_mode=actual_parse_mode)
except Exception as e:
# Фоллбэк: отправляем без разметки
logger.debug(f"Ошибка Markdown, отправляем без разметки: {e}")
await update.message.reply_text(part)
# Небольшая пауза между сообщениями
await asyncio.sleep(0.1)
def format_long_output(text: str, max_lines: int = 20, head_lines: int = 10, tail_lines: int = 10) -> str:
"""
Форматировать длинный вывод: показать первые и последние строки.
По умолчанию: первые 10 + последние 10 строк = 20 строк максимум.
"""
lines = text.split('\n')
total_lines = len(lines)
if total_lines <= max_lines:
return text
# Показываем первые head_lines и последние tail_lines
head = lines[:head_lines]
tail = lines[-tail_lines:]
skipped = total_lines - head_lines - tail_lines
result = '\n'.join(head)
result += f'\n\n... ({skipped} строк пропущено) ...\n'
result += '\n'.join(tail)
return result