telegram-cli-bot/bot/tools/telegram_web_tool.py

233 lines
8.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Telegram Web Tool - чтение публичных Telegram-каналов через web (t.me)
"""
import aiohttp
from bs4 import BeautifulSoup
from typing import List, Dict, Optional
from datetime import datetime
import logging
import json
import os
from bot.tools import BaseTool, ToolResult, register_tool
logger = logging.getLogger(__name__)
# Путь к файлу хранения подписок
SUBSCRIPTIONS_FILE = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
'telegram_channels.json'
)
def load_subscriptions() -> List[str]:
"""Загрузить список подписок из файла"""
if os.path.exists(SUBSCRIPTIONS_FILE):
try:
with open(SUBSCRIPTIONS_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
return data.get('channels', [])
except Exception as e:
logger.error(f"Ошибка загрузки подписок: {e}")
return []
def save_subscriptions(channels: List[str]) -> bool:
"""Сохранить список подписок в файл"""
try:
with open(SUBSCRIPTIONS_FILE, 'w', encoding='utf-8') as f:
json.dump({'channels': channels}, f, ensure_ascii=False, indent=2)
return True
except Exception as e:
logger.error(f"Ошибка сохранения подписок: {e}")
return False
async def fetch_channel_messages(username: str, limit: int = 10) -> List[Dict]:
"""
Получить сообщения из публичного Telegram-канала через t.me
Args:
username: Имя канала (без @)
limit: Количество сообщений для получения
Returns:
Список сообщений
"""
url = f"https://t.me/s/{username}"
messages = []
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=10) as response:
if response.status != 200:
logger.error(f"Ошибка доступа к каналу {username}: {response.status}")
return messages
html = await response.text()
soup = BeautifulSoup(html, 'html.parser')
# Найти все сообщения
telegram_messages = soup.find_all('div', class_='tgme_widget_message')
for msg in telegram_messages[-limit:]:
try:
# Текст сообщения
text_elem = msg.find('div', class_='tgme_widget_message_text')
text = text_elem.get_text(strip=True) if text_elem else ""
# Дата
date_elem = msg.find('a', class_='tgme_widget_message_date')
date_text = date_elem.get_text(strip=True) if date_elem else ""
# Ссылка на сообщение
link = date_elem.get('href') if date_elem else ""
# Автор (если есть)
author_elem = msg.find('div', class_='tgme_widget_message_author')
author = author_elem.get_text(strip=True) if author_elem else username
messages.append({
'text': text,
'date': date_text,
'link': link,
'author': author
})
except Exception as e:
logger.debug(f"Ошибка парсинга сообщения: {e}")
continue
except Exception as e:
logger.error(f"Ошибка получения канала {username}: {e}")
return messages
async def telegram_web_tool(action: str, **kwargs) -> Dict:
"""
Основной инструмент для работы с Telegram-каналами
Actions:
- add: добавить канал (username)
- list: показать список каналов
- read: прочитать сообщения (username, limit)
Args:
action: Действие
**kwargs: Параметры для действия
Returns:
Dict с результатом
"""
if action == 'add':
username = kwargs.get('username')
if not username:
return {
'success': False,
'error': 'username обязателен',
'action': 'add'
}
# Очистить username от @ и пробелов
username = username.strip().lstrip('@')
channels = load_subscriptions()
if username in channels:
return {
'success': False,
'error': f'Канал @{username} уже в списке',
'action': 'add'
}
channels.append(username)
save_subscriptions(channels)
return {
'success': True,
'message': f'Канал @{username} добавлен',
'action': 'add',
'username': username
}
elif action == 'list':
channels = load_subscriptions()
return {
'success': True,
'channels': channels,
'count': len(channels),
'action': 'list'
}
elif action == 'read':
username = kwargs.get('username')
limit = kwargs.get('limit', 10)
if not username:
return {
'success': False,
'error': 'username обязателен',
'action': 'read'
}
username = username.strip().lstrip('@')
messages = await fetch_channel_messages(username, limit)
return {
'success': True,
'channel': username,
'messages': messages,
'count': len(messages),
'action': 'read'
}
elif action == 'fetch_all':
"""Получить новые сообщения из всех каналов"""
channels = load_subscriptions()
limit = kwargs.get('limit', 5)
all_messages = {}
for channel in channels:
messages = await fetch_channel_messages(channel, limit)
if messages:
all_messages[channel] = messages
return {
'success': True,
'channels_checked': len(channels),
'messages': all_messages,
'action': 'fetch_all'
}
else:
return {
'success': False,
'error': f'Неизвестное действие: {action}',
'action': action
}
@register_tool
class TelegramWebTool(BaseTool):
"""Инструмент для работы с Telegram-каналами через web"""
name = "telegram_web_tool"
description = "Чтение публичных Telegram-каналов через t.me/s/username. Actions: add (username), list, read (username, limit), fetch_all (limit)"
category = "telegram"
async def execute(self, action: str = 'list', **kwargs) -> ToolResult:
"""Выполнить инструмент"""
result = await telegram_web_tool(action, **kwargs)
if result['success']:
return ToolResult(
success=True,
data=result,
metadata={'action': action, 'tool_name': self.name}
)
else:
return ToolResult(
success=False,
error=result.get('error', 'Неизвестная ошибка'),
metadata={'action': action, 'tool_name': self.name}
)