telegram-cli-bot/bot/utils/formatters.py

336 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Утилиты для форматирования и отправки сообщений."""
import asyncio
import logging
import re
from typing import List, Tuple
from telegram import Update
logger = logging.getLogger(__name__)
# Лимиты Telegram
MAX_MESSAGE_LENGTH = 4096 # Максимальная длина сообщения
RESERVED_FOR_HEADER = 20 # Резервируем место для "(N/N) "
def escape_markdown(text: str) -> str:
"""
Экранирование специальных символов Markdown для Telegram API.
Telegram Markdown v1 использует: * _ ` [ ] ( )
Эти символы нужно экранировать обратным слэшем.
"""
if not text:
return text
# Экранируем специальные символы Markdown
# Порядок важен: сначала экранируем обратные слэши
text = text.replace('\\', '\\\\')
text = text.replace('`', '\\`')
text = text.replace('*', '\\*')
text = text.replace('_', '\\_')
text = text.replace('[', '\\[')
text = text.replace(']', '\\]')
text = text.replace('(', '\\(')
text = text.replace(')', '\\)')
return text
def find_code_blocks(text: str) -> List[Tuple[int, int]]:
"""
Найти все блоки кода (```) в тексте.
Возвращает список кортежей (start, end) для каждого блока.
"""
blocks = []
pattern = re.compile(r'```')
matches = list(pattern.finditer(text))
# Пары start-end для каждого блока
i = 0
while i < len(matches) - 1:
start = matches[i].start()
end = matches[i + 1].end()
blocks.append((start, end))
i += 2
return blocks
def split_message(text: str, max_length: int = MAX_MESSAGE_LENGTH) -> List[Tuple[str, bool, bool, bool]]:
"""
Умно разбить длинный текст на сообщения <= max_length символов.
Возвращает список кортежей (text, has_code, code_opened, code_closed):
- text: текст сообщения
- has_code: True если сообщение содержит часть блока кода
- code_opened: True если блок кода ОТКРЫТ в этом сообщении (есть открывающий ```)
- code_closed: True если блок кода ЗАКРЫТ в этом сообщении (есть закрывающий ```)
Алгоритм:
1. Находим все блоки кода
2. Стараемся не разрывать блоки кода
3. Если блок кода не влезает — отправляем отдельным сообщением без Markdown
"""
def calc_code_flags(txt: str) -> Tuple[bool, bool, bool]:
"""Вычислить флаги code для данного текста."""
has_code = '```' in txt
backtick_count = txt.count('```')
# code_opened: есть хотя бы один ```
code_opened = backtick_count >= 1
# code_closed: есть хотя бы 2 ``` (пара) или нечётное количество (открыт и закрыт в одном)
code_closed = backtick_count >= 2
return has_code, code_opened, code_closed
if len(text) <= max_length:
has_code, code_opened, code_closed = calc_code_flags(text)
return [(text, has_code, code_opened, code_closed)]
parts = []
code_blocks = find_code_blocks(text)
# Текущая позиция в тексте
pos = 0
current = ""
for block_start, block_end in code_blocks:
# Текст до блока кода
before_code = text[pos:block_start]
# Сам блок кода (включая ```)
code_block = text[block_start:block_end]
# Обрабатываем текст до блока
for line in before_code.split('\n'):
if len(current) + len(line) + 1 > max_length - RESERVED_FOR_HEADER:
if current:
has_code, code_opened, code_closed = calc_code_flags(current)
parts.append((current, has_code, code_opened, code_closed))
current = line
else:
current += ('\n' if current else '') + line
# Проверяем влезет ли блок кода
if len(current) + len(code_block) + 1 > max_length - RESERVED_FOR_HEADER:
# Отправляем что накопилось
if current:
has_code, code_opened, code_closed = calc_code_flags(current)
parts.append((current, has_code, code_opened, code_closed))
# Если блок кода слишком длинный — режем его на части
if len(code_block) > max_length - RESERVED_FOR_HEADER:
# Блок кода включает ```, нужно резать правильно
# block_start и block_end включают оба ```
# Находим позицию открывающего и закрывающего ```
open_end = code_block.find('```', 3) + 3 # Конец открывающего ```
close_start = code_block.rfind('```') # Начало закрывающего ```
# Содержимое блока без ```
code_content = code_block[3:close_start] # Только содержимое
# Режем содержимое на части
content_max_len = max_length - RESERVED_FOR_HEADER - 3 # -3 для ```
chunks = []
for i in range(0, len(code_content), content_max_len):
chunks.append(code_content[i:i + content_max_len])
# Создаём части с правильными ```
for i, chunk in enumerate(chunks):
if i == 0 and len(chunks) == 1:
# Один чанк — полный блок
full_block = f"```{chunk}```"
parts.append((full_block, True, True, True))
elif i == 0:
# Первая часть — только открывающий ```
first_part = f"```{chunk}"
parts.append((first_part, True, True, False))
elif i == len(chunks) - 1:
# Последняя часть — только закрывающий ```
last_part = f"{chunk}```"
parts.append((last_part, True, False, True))
else:
# Средняя часть — без ```
parts.append((chunk, False, False, False))
current = ""
else:
# Блок влезает в следующее сообщение
current = code_block
else:
# Блок кода влезает в текущее сообщение
current += ('\n' if current else '') + code_block
pos = block_end
# Обрабатываем остаток текста после последнего блока
if pos < len(text):
remaining = text[pos:]
for line in remaining.split('\n'):
if len(current) + len(line) + 1 > max_length - RESERVED_FOR_HEADER:
if current:
has_code, code_opened, code_closed = calc_code_flags(current)
parts.append((current, has_code, code_opened, code_closed))
current = line
else:
current += ('\n' if current else '') + line
if current:
has_code, code_opened, code_closed = calc_code_flags(current)
parts.append((current, has_code, code_opened, code_closed))
return parts
async def send_long_message(update: Update, text: str, parse_mode: str = None, pause_every: int = 3):
"""
Отправить длинный текст, разбив на несколько сообщений.
Умная разбивка:
- Блоки кода не разрываются между сообщениями
- Если блок кода не влезает — отправляется без Markdown
- Нумерация (1/N), (2/N) только если сообщений > 1
- КАЖДЫЕ pause_every сообщений — пауза с кнопками "Продолжить" / "Отменить"
Args:
update: Telegram update
text: Текст для отправки
parse_mode: Режим парсинга (Markdown)
pause_every: Каждые сколько сообщений делать паузу (0 = без паузы)
"""
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
import asyncio
parts = split_message(text)
total = len(parts)
messages_sent = 0
wait_msg = None
for i, (part, has_code, code_opened, code_closed) in enumerate(parts):
# Определяем parse_mode для этого сообщения
actual_parse_mode = parse_mode if parse_mode and (has_code or code_opened or code_closed) else None
# Логика работы с блоками кода между сообщениями:
need_prepend = False
need_append = False
if total > 1 and actual_parse_mode:
# Проверяем нужно ли открыть блок в начале этого сообщения
if i > 0 and not parts[i-1][3]: # предыдущее не закрыло блок (parts[i-1][3] = code_closed)
need_prepend = True
# Проверяем нужно ли закрыть блок в конце этого сообщения
# Если текущее не закрыло блок и следующее не имеет кода/не закроет
if i < total - 1 and not code_closed:
next_has = parts[i+1][1]
next_opened = parts[i+1][2]
# Закрываем если следующее не имеет кода или не открывает свой блок
if not next_has:
need_append = True
# Применяем модификации
if need_prepend:
part = "```\n" + part
if need_append:
part = part + "\n```"
# Добавляем номер части ПОСЛЕ работы с блоками кода
if total > 1:
header = f"({i+1}/{total}) "
if len(header) + len(part) <= MAX_MESSAGE_LENGTH:
part = header + part
try:
await update.message.reply_text(part, parse_mode=actual_parse_mode)
except Exception as e:
# Фоллбэк: отправляем без разметки
logger.debug(f"Ошибка Markdown, отправляем без разметки: {e}")
await update.message.reply_text(part)
messages_sent += 1
# Небольшая пауза между сообщениями
await asyncio.sleep(0.1)
# КАЖДЫЕ pause_every сообщений — спрашиваем продолжать ли
if pause_every > 0 and messages_sent % pause_every == 0 and i < total - 1:
remaining = total - (i + 1)
keyboard = InlineKeyboardMarkup([
[
InlineKeyboardButton("▶️ Продолжить", callback_data=f"continue_output_{remaining}"),
InlineKeyboardButton("❌ Отменить", callback_data="cancel_output")
]
])
wait_msg = await update.message.reply_text(
f"📊 **Отправлено {messages_sent} из {total} сообщений**\n\n"
f"Осталось ещё {remaining} сообщений.\n\n"
f"Продолжить вывод?",
parse_mode="Markdown",
reply_markup=keyboard
)
# Ждём ответа пользователя (до 60 секунд)
from bot.config import state_manager
user_id = update.effective_user.id
# Сохраняем состояние ожидания
state = state_manager.get(user_id)
state.waiting_for_output_control = True
state.output_remaining = remaining
state.output_wait_message = wait_msg
# Ждём 60 секунд
for _ in range(60):
await asyncio.sleep(1)
state = state_manager.get(user_id)
if not state.waiting_for_output_control:
# Пользователь ответил
if state.continue_output:
# Продолжаем - удаляем кнопки
try:
await wait_msg.delete()
except:
pass
break
else:
# Отменил - редактируем сообщение и убираем кнопки
try:
await wait_msg.edit_text("❌ **Вывод отменен пользователем**", parse_mode="Markdown")
except:
pass
return
# Таймаут
if state.waiting_for_output_control:
state.waiting_for_output_control = False
try:
await wait_msg.edit_text("⏱️ **Время ожидания истекло**. Вывод продолжен.", parse_mode="Markdown")
except:
pass
def format_long_output(text: str, max_lines: int = 100, head_lines: int = 50, tail_lines: int = 50) -> str:
"""
Форматировать длинный вывод: показать первые и последние строки.
По умолчанию: первые 50 + последние 50 строк = 100 строк максимум.
"""
lines = text.split('\n')
total_lines = len(lines)
if total_lines <= max_lines:
return text
# Показываем первые head_lines и последние tail_lines
head = lines[:head_lines]
tail = lines[-tail_lines:]
skipped = total_lines - head_lines - tail_lines
result = '\n'.join(head)
result += f'\n\n... ({skipped} строк пропущено) ...\n'
result += '\n'.join(tail)
return result