telegram-cli-bot/bot/utils/formatters.py

352 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Утилиты для форматирования и отправки сообщений."""
import asyncio
import logging
import re
from typing import List, Tuple
from telegram import Update
logger = logging.getLogger(__name__)
# Лимиты Telegram
MAX_MESSAGE_LENGTH = 4096 # Максимальная длина сообщения
RESERVED_FOR_HEADER = 20 # Резервируем место для "(N/N) "
def escape_code_block_content(text: str) -> str:
"""
Экранировать спецсимволы Markdown внутри блоков кода.
Нужно для случаев когда сообщение содержит ```блок кода``` и текст с Markdown.
Telegram пытается интерпретировать [text](url) внутри блока кода как ссылку.
"""
# Находим все блоки кода
parts = text.split("```")
result_parts = []
for i, part in enumerate(parts):
if i % 2 == 1:
# Внутри блока кода — экранируем [ и ]
part = part.replace('[', '\\[').replace(']', '\\]')
result_parts.append(part)
return "```".join(result_parts)
def escape_markdown(text: str) -> str:
"""
Экранирование специальных символов Markdown для Telegram API.
Telegram Markdown v1 использует: * _ ` [ ] ( )
Эти символы нужно экранировать обратным слэшем.
ВАЖНО: Не экранирует содержимое блоков кода (```).
"""
if not text:
return text
# Разбиваем текст на части: внутри и снаружи блоков кода
parts = []
last_end = 0
in_code = False
# Находим все блоки кода
code_pattern = re.compile(r'```')
matches = list(code_pattern.finditer(text))
# Обрабатываем текст по частям
result = []
i = 0
while i < len(text):
# Проверяем не находимся ли внутри блока кода
# Ищем ближайший ``` от текущей позиции
remaining = text[i:]
code_match = re.search(r'```', remaining)
if code_match:
# Есть блок кода впереди
code_start = i + code_match.start()
# Экранируем текст ДО блока кода
text_to_escape = text[i:code_start]
if text_to_escape and not in_code:
text_to_escape = _escape_markdown_chars(text_to_escape)
result.append(text_to_escape)
# Добавляем сам блок кода (без экранирования)
result.append('```')
i = code_start + 3
in_code = not in_code
else:
# Нет больше блоков кода
remaining_text = text[i:]
if remaining_text and not in_code:
result.append(_escape_markdown_chars(remaining_text))
else:
result.append(remaining_text)
break
return ''.join(result)
def _escape_markdown_chars(text: str) -> str:
"""Экранировать специальные символы Markdown (вспомогательная функция)."""
# Порядок важен: сначала экранируем обратные слэши
text = text.replace('\\', '\\\\')
text = text.replace('`', '\\`')
text = text.replace('*', '\\*')
text = text.replace('_', '\\_')
text = text.replace('[', '\\[')
text = text.replace(']', '\\]')
text = text.replace('(', '\\(')
text = text.replace(')', '\\)')
return text
def find_code_blocks(text: str) -> List[Tuple[int, int]]:
"""
Найти все блоки кода (```) в тексте.
Возвращает список кортежей (start, end) для каждого блока.
"""
blocks = []
pattern = re.compile(r'```')
matches = list(pattern.finditer(text))
# Пары start-end для каждого блока
i = 0
while i < len(matches) - 1:
start = matches[i].start()
end = matches[i + 1].end()
blocks.append((start, end))
i += 2
return blocks
def split_message(text: str, max_length: int = MAX_MESSAGE_LENGTH) -> List[Tuple[str, bool, bool, bool]]:
"""
Умно разбить длинный текст на сообщения <= max_length символов.
Возвращает список кортежей (text, has_code, code_opened, code_closed):
- text: текст сообщения
- has_code: True если сообщение содержит часть блока кода
- code_opened: True если блок кода ОТКРЫТ в этом сообщении (есть открывающий ```)
- code_closed: True если блок кода ЗАКРЫТ в этом сообщении (есть закрывающий ```)
Алгоритм:
1. Разбиваем текст на строки
2. Накапливаем строки до достижения лимита
3. Отслеживаем состояние блока кода (внутри/снаружи)
"""
def calc_code_flags(txt: str) -> Tuple[bool, bool, bool]:
"""Вычислить флаги code для данного текста."""
has_code = '```' in txt
backtick_count = txt.count('```')
code_opened = backtick_count >= 1
code_closed = backtick_count >= 2 or (backtick_count == 1 and txt.rstrip().endswith('```'))
return has_code, code_opened, code_closed
if len(text) <= max_length:
has_code, code_opened, code_closed = calc_code_flags(text)
return [(text, has_code, code_opened, code_closed)]
parts = []
lines = text.split('\n')
current = ""
in_code_block = False # Состояние: внутри блока кода или нет
for line in lines:
# Проверяем, содержит ли строка ```
backticks_in_line = line.count('```')
# Если строка содержит нечётное количество ```, она меняет состояние
if backticks_in_line % 2 == 1:
# Эта строка содержит ``` который меняет состояние
if in_code_block:
# Были внутри блока — эта строка закрывает его
test_line = current + ('\n' if current else '') + line
if len(test_line) > max_length - RESERVED_FOR_HEADER:
# Сначала отправляем текущее (блок не закрыт в этой части!)
if current:
has_code, code_opened, _ = calc_code_flags(current)
# code_closed=False потому что блок продолжится
parts.append((current, has_code, code_opened, False))
current = line
else:
current = test_line
in_code_block = False
else:
# Были снаружи — эта строка открывает блок
test_line = current + ('\n' if current else '') + line
if len(test_line) > max_length - RESERVED_FOR_HEADER:
if current:
has_code, code_opened, code_closed = calc_code_flags(current)
parts.append((current, has_code, code_opened, code_closed))
current = line
else:
current = test_line
in_code_block = True
else:
# Строка не меняет состояние
test_line = current + ('\n' if current else '') + line
if len(test_line) > max_length - RESERVED_FOR_HEADER:
if current:
has_code, code_opened, _ = calc_code_flags(current)
# Если мы внутри блока кода — block не закрыт
code_closed = not in_code_block
parts.append((current, has_code, code_opened, code_closed))
current = line
else:
current = test_line
if current:
has_code, code_opened, _ = calc_code_flags(current)
code_closed = not in_code_block
parts.append((current, has_code, code_opened, code_closed))
return parts
async def send_long_message(update: Update, text: str, parse_mode: str = None, pause_every: int = 3, start_from: int = 0):
"""
Отправить длинный текст, разбив на несколько сообщений.
Поддерживает:
- Update с update.message (обычные сообщения)
- CallbackQuery (query.edit_message_text / query.message.reply_text)
Args:
start_from: Номер сообщения с которого начать (для продолжения после кнопки)
"""
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from bot.config import state_manager
# Определяем тип объекта и получаем метод для отправки
# CallbackQuery имеет from_user и answer(), но не имеет message.reply_text
is_callback_query = hasattr(update, 'answer') and hasattr(update, 'from_user')
if is_callback_query:
query = update
message = query.message
send_method = message.reply_text if message else query.edit_message_text
user_id = query.from_user.id # Для CallbackQuery используем from_user
else:
message = update.message
send_method = message.reply_text if message else None
user_id = update.effective_user.id # Для Update используем effective_user
if not send_method:
logger.error("send_long_message: не удалось определить метод отправки")
return False
parts = split_message(text)
total = len(parts)
state = state_manager.get(user_id)
# Начинаем с указанного сообщения
for i in range(start_from, total):
part, has_code, code_opened, code_closed = parts[i]
# Проверяем был ли блок кода открыт в предыдущем сообщении
prev_closed = parts[i-1][3] if i > 0 else True
# Определяем находимся ли внутри блока кода (между ``` и ```)
in_code_block = not prev_closed or (code_opened and not code_closed)
# Проверяем будем ли добавлять ``` к этому сообщению
will_add_opening = total > 1 and i > 0 and not prev_closed and not code_closed
will_add_closing = total > 1 and i < total - 1 and not code_closed
# КЛЮЧЕВОЕ ИСПРАВЛЕНИЕ:
# Отключаем parse_mode если:
# 1. Это промежуточная часть блока кода (in_code_block=True, но нет ``` в этом сообщении)
# 2. Это сообщение содержит ``` (has_code=True)
# 3. Мы добавляем искусственные ``` (will_add_opening или will_add_closing)
if in_code_block or has_code or will_add_opening or will_add_closing:
actual_parse_mode = None
else:
# Обычный текст без блоков кода — используем Markdown
actual_parse_mode = parse_mode
# Логика работы с блоками кода между сообщениями
if total > 1 and i > 0 and not prev_closed:
part = "```\n" + part
if total > 1 and i < total - 1 and not code_closed:
part = part + "\n```"
# Добавляем номер части
if total > 1:
header = f"({i+1}/{total}) "
if len(header) + len(part) <= MAX_MESSAGE_LENGTH:
part = header + part
try:
await send_method(part, parse_mode=actual_parse_mode)
except Exception as e:
logger.debug(f"Ошибка Markdown, отправляем без разметки: {e}")
await send_method(part)
await asyncio.sleep(0.1)
# КАЖДЫЕ pause_every сообщений — спрашивать продолжать ли
if pause_every > 0 and (i + 1) % pause_every == 0 and i < total - 1:
remaining = total - (i + 1)
keyboard = InlineKeyboardMarkup([
[
InlineKeyboardButton("▶️ Продолжить", callback_data=f"continue_output_{remaining}_{i+1}"),
InlineKeyboardButton("❌ Отменить", callback_data="cancel_output")
]
])
wait_msg = await send_method(
f"📊 **Отправлено {i + 1} из {total} сообщений**\n\n"
f"Осталось ещё {remaining} сообщений.\n\n"
f"Продолжить вывод?",
parse_mode="Markdown",
reply_markup=keyboard
)
# Сохраняем состояние и ВОЗВРАЩАЕМ УПРАВЛЕНИЕ
state.waiting_for_output_control = True
state.output_remaining = remaining
state.output_wait_message = wait_msg
state.output_next_index = i + 1 # С какого сообщения продолжить
state.output_text = text # Сохраняем текст для продолжения
state.output_parse_mode = parse_mode
logger.info(f"send_long_message: пауза после {i+1}/{total}, ждём кнопки (user_id={user_id})")
return True # Возвращаем True — есть продолжение
# Все сообщения отправлены
state.waiting_for_output_control = False
state.output_remaining = None
state.output_wait_message = None
state.output_next_index = None
state.output_text = None
return False # Возвращаем False — продолжения нет
def format_long_output(text: str, max_lines: int = 100, head_lines: int = 50, tail_lines: int = 50) -> str:
"""
Форматировать длинный вывод: показать первые и последние строки.
По умолчанию: первые 50 + последние 50 строк = 100 строк максимум.
"""
lines = text.split('\n')
total_lines = len(lines)
if total_lines <= max_lines:
return text
# Показываем первые head_lines и последние tail_lines
head = lines[:head_lines]
tail = lines[-tail_lines:]
skipped = total_lines - head_lines - tail_lines
result = '\n'.join(head)
result += f'\n\n... ({skipped} строк пропущено) ...\n'
result += '\n'.join(tail)
return result