telegram-cli-bot/bot/utils/qwen_oauth.py.bak

573 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Qwen OAuth 2.0 Device Flow клиент.
Реализует авторизацию через Device Authorization Grant (RFC 8628).
"""
import os
import json
import hashlib
import secrets
import time
import aiohttp
import logging
from pathlib import Path
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
logger = logging.getLogger(__name__)
# Qwen OAuth константы
QWEN_OAUTH_BASE_URL = 'https://chat.qwen.ai'
QWEN_OAUTH_DEVICE_CODE_ENDPOINT = f'{QWEN_OAUTH_BASE_URL}/api/v1/oauth2/device/code'
QWEN_OAUTH_TOKEN_ENDPOINT = f'{QWEN_OAUTH_BASE_URL}/api/v1/oauth2/token'
QWEN_OAUTH_CLIENT_ID = 'f0304373b74a44d2b584a3fb70ca9e56'
QWEN_OAUTH_SCOPE = 'openid profile email model.completion'
# Пути для хранения токенов (как в qwen-code CLI)
QWEN_CONFIG_DIR = Path.home() / '.qwen'
QWEN_CREDENTIALS_FILE = QWEN_CONFIG_DIR / 'oauth_creds.json'
@dataclass
class QwenCredentials:
"""OAuth токены Qwen."""
access_token: str = ''
refresh_token: str = ''
token_type: str = 'Bearer'
expiry_date: int = 0 # Unix timestamp в миллисекундах
resource_url: str = 'portal.qwen.ai'
def is_expired(self, buffer_minutes: int = 5) -> bool:
"""Проверка истечения токена с буфером."""
if not self.expiry_date:
return True
expiry_ms = self.expiry_date - (buffer_minutes * 60 * 1000)
return int(datetime.now().timestamp() * 1000) >= expiry_ms
@dataclass
class DeviceAuthorizationResponse:
"""Ответ устройства авторизации."""
device_code: str = ''
user_code: str = ''
verification_uri: str = ''
verification_uri_complete: str = ''
expires_in: int = 0
interval: int = 5 # Polling interval в секундах
@property
def authorization_url(self) -> str:
"""Полная ссылка для авторизации."""
return self.verification_uri_complete
@property
def is_expired(self) -> bool:
"""Истёк ли срок действия device code."""
return time.time() > (self.expires_in - 30) # 30 сек буфер
class QwenOAuthClient:
"""Qwen OAuth 2.0 клиент."""
def __init__(self, credentials_path: Optional[Path] = None):
self.credentials_path = credentials_path or QWEN_CREDENTIALS_FILE
self._credentials: Optional[QwenCredentials] = None
self._load_credentials()
def _load_credentials(self) -> None:
"""Загрузить токены из файла."""
if self.credentials_path.exists():
try:
with open(self.credentials_path, 'r') as f:
data = json.load(f)
self._credentials = QwenCredentials(**data)
logger.info(f"Токены загружены из {self.credentials_path}")
except Exception as e:
logger.error(f"Ошибка загрузки токенов: {e}")
self._credentials = None
else:
logger.debug("Файл с токенами не найден")
self._credentials = None
# Загружаем code_verifier из device_code.json если есть
device_code_file = QWEN_CONFIG_DIR / 'device_code.json'
if device_code_file.exists():
try:
with open(device_code_file, 'r') as f:
data = json.load(f)
code_verifier = data.get('code_verifier', '')
if code_verifier:
self._code_verifier = code_verifier
logger.info(f"Code verifier загружен из {device_code_file}")
except Exception as e:
logger.debug(f"Ошибка загрузки code_verifier: {e}")
def _save_credentials(self) -> None:
"""Сохранить токены в файл."""
if self._credentials:
self.credentials_path.parent.mkdir(parents=True, exist_ok=True)
with open(self.credentials_path, 'w') as f:
json.dump(self._credentials.__dict__, f, indent=2)
# Устанавливаем права 600 (только владелец)
os.chmod(self.credentials_path, 0o600)
logger.info(f"Токены сохранены в {self.credentials_path}")
def has_valid_token(self) -> bool:
"""Проверка наличия валидного токена."""
return self._credentials is not None and not self._credentials.is_expired()
async def get_access_token(self) -> Optional[str]:
"""Получить access token (обновляет если истёк)."""
if self.has_valid_token():
return self._credentials.access_token
if self._credentials and self._credentials.refresh_token:
# Пробуем обновить токен
if await self._refresh_token():
return self._credentials.access_token
return None
async def request_device_authorization(self) -> DeviceAuthorizationResponse:
"""
Запросить Device Authorization.
Returns:
DeviceAuthorizationResponse с данными для авторизации
"""
# Проверяем есть ли сохранённый code_verifier
if not hasattr(self, '_code_verifier') or not self._code_verifier:
# Генерируем PKCE code verifier и challenge
self._code_verifier = secrets.token_urlsafe(32)
code_challenge = hashlib.sha256(self._code_verifier.encode()).hexdigest()
payload = {
'client_id': QWEN_OAUTH_CLIENT_ID,
'scope': QWEN_OAUTH_SCOPE,
'code_challenge': code_challenge,
'code_challenge_method': 'S256'
}
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json',
'x-request-id': secrets.token_hex(16),
'User-Agent': 'qwen-code-cli/0.11.0'
}
form_data = '&'.join(f'{k}={v}' for k, v in payload.items())
async with aiohttp.ClientSession() as session:
async with session.post(
QWEN_OAUTH_DEVICE_CODE_ENDPOINT,
data=form_data,
headers=headers
) as resp:
if resp.status != 200:
text = await resp.text()
raise Exception(f"Device authorization failed: {resp.status} - {text}")
data = await resp.json()
return DeviceAuthorizationResponse(
device_code=data.get('device_code', ''),
user_code=data.get('user_code', ''),
verification_uri=data.get('verification_uri', ''),
verification_uri_complete=data.get('verification_uri_complete', ''),
expires_in=data.get('expires_in', 900),
interval=data.get('interval', 5)
)
async def poll_for_token(self, device_code: str, timeout_seconds: int = 900) -> bool:
"""
Опрос сервера для получения токена после авторизации пользователем.
Args:
device_code: Device code из request_device_authorization
timeout_seconds: Максимальное время ожидания
Returns:
True если авторизация успешна
"""
if not hasattr(self, '_code_verifier'):
raise Exception("Code verifier not set. Call request_device_authorization first.")
start_time = time.time()
interval = 5 # Начальный интервал
while time.time() - start_time < timeout_seconds:
payload = {
'grant_type': 'urn:ietf:params:oauth:grant-type:device_code',
'device_code': device_code,
'code_verifier': self._code_verifier,
'client_id': QWEN_OAUTH_CLIENT_ID
}
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json',
'User-Agent': 'qwen-code-cli/0.11.0'
}
form_data = '&'.join(f'{k}={v}' for k, v in payload.items())
logger.debug(f"Polling payload: grant_type={payload['grant_type']}, device_code={device_code[:20]}..., code_verifier={self._code_verifier[:20]}...")
async with aiohttp.ClientSession() as session:
async with session.post(
QWEN_OAUTH_TOKEN_ENDPOINT,
data=form_data,
headers=headers
) as resp:
data = await resp.json()
logger.info(f"Polling response: status={resp.status}, data={data}")
if resp.status == 200:
# Успех! Сохраняем токены
self._credentials = QwenCredentials(
access_token=data.get('access_token', ''),
refresh_token=data.get('refresh_token', ''),
token_type=data.get('token_type', 'Bearer'),
expiry_date=int(datetime.now().timestamp() * 1000) + (data.get('expires_in', 3600) * 1000),
resource_url=data.get('resource_url', 'portal.qwen.ai')
)
self._save_credentials()
logger.info("Авторизация успешна!")
return True
error = data.get('error', '')
if error == 'authorization_pending':
# Пользователь ещё не авторизовался
logger.debug("Ожидание авторизации пользователя...")
await asyncio.sleep(interval)
continue
elif error == 'slow_down':
# Сервер просит увеличить интервал
interval += 5
logger.debug(f"Увеличиваем интервал до {interval} сек")
await asyncio.sleep(interval)
continue
elif error == 'expired_token':
logger.error("Device code истёк")
return False
elif error == 'access_denied':
logger.error("Пользователь отклонил авторизацию")
return False
else:
logger.error(f"Неизвестная ошибка: {error}")
return False
logger.error("Таймаут авторизации")
return False
async def _refresh_token(self) -> bool:
"""Обновить access token используя refresh token."""
if not self._credentials or not self._credentials.refresh_token:
return False
payload = {
'grant_type': 'refresh_token',
'refresh_token': self._credentials.refresh_token,
'client_id': QWEN_OAUTH_CLIENT_ID
}
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json',
'User-Agent': 'qwen-code-cli/0.11.0'
}
form_data = '&'.join(f'{k}={v}' for k, v in payload.items())
try:
async with aiohttp.ClientSession() as session:
async with session.post(
QWEN_OAUTH_TOKEN_ENDPOINT,
data=form_data,
headers=headers
) as resp:
if resp.status == 200:
data = await resp.json()
self._credentials.access_token = data.get('access_token', '')
self._credentials.refresh_token = data.get('refresh_token', self._credentials.refresh_token)
self._credentials.expiry_date = int(datetime.now().timestamp() * 1000) + (data.get('expires_in', 3600) * 1000)
self._save_credentials()
logger.info("Токен обновлён")
return True
else:
logger.error(f"Ошибка обновления токена: {resp.status}")
self._credentials = None # Очищаем неверные токены
return False
except Exception as e:
logger.error(f"Ошибка обновления токена: {e}")
return False
def clear_credentials(self) -> None:
"""Очистить сохранённые токены."""
self._credentials = None
if self.credentials_path.exists():
self.credentials_path.unlink()
logger.info("Токены очищены")
# Глобальный клиент (singleton)
_oauth_client: Optional[QwenOAuthClient] = None
def get_oauth_client() -> QwenOAuthClient:
"""Получить OAuth клиент (singleton)."""
global _oauth_client
if _oauth_client is None:
_oauth_client = QwenOAuthClient()
return _oauth_client
async def get_authorization_url() -> Optional[str]:
"""
Получить URL для авторизации.
Returns:
URL для авторизации или None если ошибка
"""
try:
# Проверяем есть ли активный device code
device_code_file = QWEN_CONFIG_DIR / 'device_code.json'
if device_code_file.exists():
with open(device_code_file, 'r') as f:
data = json.load(f)
start_time = data.get('start_time', 0)
expires_in = data.get('expires_in', 900)
code_verifier = data.get('code_verifier', '')
auth_url = data.get('authorization_url', '')
device_code = data.get('device_code', '')
# Если device code ещё активен (900 сек = 15 мин) — используем его
# Проверяем что прошло меньше половины времени жизни для надёжности
if time.time() - start_time < expires_in / 2 and code_verifier and auth_url and device_code:
logger.info(f"Используем существующий device code (осталось {expires_in - (time.time() - start_time):.0f} сек)")
return auth_url
else:
logger.info("Device code истёк или скоро истечёт, запрашиваем новый")
# Удаляем старый файл
device_code_file.unlink()
# Генерируем PKCE пару как в qwen-code CLI
import base64
import hashlib
code_verifier = base64.urlsafe_b64encode(os.urandom(32)).decode('utf-8').rstrip('=')
code_challenge = base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).digest()).decode('utf-8').rstrip('=')
# Запрос device authorization
payload = {
'client_id': QWEN_OAUTH_CLIENT_ID,
'scope': QWEN_OAUTH_SCOPE,
'code_challenge': code_challenge,
'code_challenge_method': 'S256'
}
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json',
'x-request-id': secrets.token_hex(16),
'User-Agent': 'qwen-code-cli/0.11.0'
}
form_data = '&'.join(f'{k}={v}' for k, v in payload.items())
async with aiohttp.ClientSession() as session:
async with session.post(
QWEN_OAUTH_DEVICE_CODE_ENDPOINT,
data=form_data,
headers=headers
) as resp:
if resp.status != 200:
text = await resp.text()
logger.error(f"Device authorization failed: {resp.status} - {text[:200]}")
raise Exception(f"Device authorization failed: {resp.status}")
# Проверяем Content-Type
content_type = resp.headers.get('Content-Type', '')
if 'application/json' not in content_type:
logger.error(f"Unexpected content type: {content_type}")
raise Exception(f"Expected JSON response, got {content_type}")
data = await resp.json()
# Извлекаем данные из ответа
auth_url = data.get('verification_uri_complete', '')
device_code = data.get('device_code', '')
expires_in = data.get('expires_in', 900)
if not auth_url or not device_code:
logger.error(f"Missing auth_url or device_code in response: {data}")
raise Exception("Invalid OAuth response")
logger.info(f"Получен OAuth URL: {auth_url}")
# Сохраняем device code и code verifier для polling
device_code_file.parent.mkdir(parents=True, exist_ok=True)
with open(device_code_file, 'w') as f:
json.dump({
'device_code': device_code,
'code_verifier': code_verifier, # Сохраняем тот же code_verifier!
'expires_in': expires_in,
'start_time': time.time(),
'authorization_url': auth_url
}, f, indent=2)
# Устанавливаем права 600 (только владелец)
os.chmod(device_code_file, 0o600)
logger.info(f"Device code сохранён: {device_code[:20]}..., expires in {expires_in}s")
return auth_url
except aiohttp.ContentTypeError as e:
logger.error(f"Content-Type error: {e}")
return None
except Exception as e:
logger.error(f"Ошибка получения URL авторизации: {e}", exc_info=True)
return None
async def check_authorization_complete() -> bool:
"""
Проверить завершение авторизации (polling).
Returns:
True если авторизация завершена успешно
"""
try:
# Читаем device code
device_code_file = QWEN_CONFIG_DIR / 'device_code.json'
if not device_code_file.exists():
logger.debug("device_code.json не найден")
return False
with open(device_code_file, 'r') as f:
data = json.load(f)
device_code = data.get('device_code', '')
code_verifier = data.get('code_verifier', '') # Получаем code_verifier из файла
start_time = data.get('start_time', time.time())
expires_in = data.get('expires_in', 900)
logger.info(f"Device code: {device_code[:20]}..., code_verifier: {code_verifier[:20]}...")
# Проверяем не истёк ли timeout
if time.time() - start_time > expires_in:
logger.warning("Device code истёк")
device_code_file.unlink()
return False
# Polling с code_verifier из файла
logger.info("Запуск polling для получения токена...")
payload = {
'grant_type': 'urn:ietf:params:oauth:grant-type:device_code',
'device_code': device_code,
'code_verifier': code_verifier,
'client_id': QWEN_OAUTH_CLIENT_ID
}
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json',
'User-Agent': 'qwen-code-cli/0.11.0'
}
form_data = '&'.join(f'{k}={v}' for k, v in payload.items())
async with aiohttp.ClientSession() as session:
async with session.post(
QWEN_OAUTH_TOKEN_ENDPOINT,
data=form_data,
headers=headers
) as resp:
data = await resp.json()
logger.info(f"Polling response: status={resp.status}, data={data}")
if resp.status == 200:
# Успех! Сохраняем токены
credentials = {
'access_token': data.get('access_token', ''),
'refresh_token': data.get('refresh_token', ''),
'token_type': data.get('token_type', 'Bearer'),
'expiry_date': int(time.time() * 1000) + (data.get('expires_in', 3600) * 1000),
'resource_url': data.get('resource_url', 'portal.qwen.ai')
}
# Сохраняем токены в файл
QWEN_CONFIG_DIR.mkdir(parents=True, exist_ok=True)
with open(QWEN_CREDENTIALS_FILE, 'w') as f:
json.dump(credentials, f, indent=2)
os.chmod(QWEN_CREDENTIALS_FILE, 0o600)
device_code_file.unlink()
logger.info("Авторизация успешна! Токены сохранены.")
return True
error = data.get('error', '')
if error == 'authorization_pending':
logger.debug("Ожидание авторизации пользователя...")
return False
elif error == 'slow_down':
logger.debug("Сервер просит увеличить интервал")
await asyncio.sleep(5)
return False
elif error == 'expired_token':
logger.error("Device code истёк")
device_code_file.unlink()
return False
elif error == 'access_denied':
logger.error("Пользователь отклонил авторизацию")
device_code_file.unlink()
return False
elif error == 'invalid_request':
error_desc = data.get('error_description', '')
logger.error(f"Invalid request: {error_desc}")
# Проверяем не истёк ли code_verifier
if 'code_verifier' in error_desc.lower():
logger.error("Code verifier не совпадает — удаляем device_code.json")
device_code_file.unlink()
return False
else:
logger.error(f"Неизвестная ошибка: {error}")
return False
except Exception as e:
logger.error(f"Ошибка проверки авторизации: {e}", exc_info=True)
return False
async def is_authorized() -> bool:
"""Проверить авторизован ли пользователь."""
client = get_oauth_client()
return client.has_valid_token()
async def get_access_token() -> Optional[str]:
"""Получить access token."""
client = get_oauth_client()
return await client.get_access_token()
def clear_authorization() -> None:
"""Очистить авторизацию."""
client = get_oauth_client()
client.clear_credentials()