#!/usr/bin/env python3 """ Qwen OAuth 2.0 Device Flow клиент. Реализует авторизацию через Device Authorization Grant (RFC 8628). """ import os import json import hashlib import secrets import time import aiohttp import logging from pathlib import Path from dataclasses import dataclass, field from datetime import datetime, timedelta from typing import Optional, Dict, Any logger = logging.getLogger(__name__) # Qwen OAuth константы QWEN_OAUTH_BASE_URL = 'https://chat.qwen.ai' QWEN_OAUTH_DEVICE_CODE_ENDPOINT = f'{QWEN_OAUTH_BASE_URL}/api/v1/oauth2/device/code' QWEN_OAUTH_TOKEN_ENDPOINT = f'{QWEN_OAUTH_BASE_URL}/api/v1/oauth2/token' QWEN_OAUTH_CLIENT_ID = 'f0304373b74a44d2b584a3fb70ca9e56' QWEN_OAUTH_SCOPE = 'openid profile email model.completion' # Пути для хранения токенов (как в qwen-code CLI) QWEN_CONFIG_DIR = Path.home() / '.qwen' QWEN_CREDENTIALS_FILE = QWEN_CONFIG_DIR / 'oauth_creds.json' @dataclass class QwenCredentials: """OAuth токены Qwen.""" access_token: str = '' refresh_token: str = '' token_type: str = 'Bearer' expiry_date: int = 0 # Unix timestamp в миллисекундах resource_url: str = 'portal.qwen.ai' def is_expired(self, buffer_minutes: int = 5) -> bool: """Проверка истечения токена с буфером.""" if not self.expiry_date: return True expiry_ms = self.expiry_date - (buffer_minutes * 60 * 1000) return int(datetime.now().timestamp() * 1000) >= expiry_ms @dataclass class DeviceAuthorizationResponse: """Ответ устройства авторизации.""" device_code: str = '' user_code: str = '' verification_uri: str = '' verification_uri_complete: str = '' expires_in: int = 0 interval: int = 5 # Polling interval в секундах @property def authorization_url(self) -> str: """Полная ссылка для авторизации.""" return self.verification_uri_complete @property def is_expired(self) -> bool: """Истёк ли срок действия device code.""" return time.time() > (self.expires_in - 30) # 30 сек буфер class QwenOAuthClient: """Qwen OAuth 2.0 клиент.""" def __init__(self, credentials_path: Optional[Path] = None): self.credentials_path = credentials_path or QWEN_CREDENTIALS_FILE self._credentials: Optional[QwenCredentials] = None self._load_credentials() def _load_credentials(self) -> None: """Загрузить токены из файла.""" if self.credentials_path.exists(): try: with open(self.credentials_path, 'r') as f: data = json.load(f) self._credentials = QwenCredentials(**data) logger.info(f"Токены загружены из {self.credentials_path}") except Exception as e: logger.error(f"Ошибка загрузки токенов: {e}") self._credentials = None else: logger.debug("Файл с токенами не найден") self._credentials = None # Загружаем code_verifier из device_code.json если есть device_code_file = QWEN_CONFIG_DIR / 'device_code.json' if device_code_file.exists(): try: with open(device_code_file, 'r') as f: data = json.load(f) code_verifier = data.get('code_verifier', '') if code_verifier: self._code_verifier = code_verifier logger.info(f"Code verifier загружен из {device_code_file}") except Exception as e: logger.debug(f"Ошибка загрузки code_verifier: {e}") def _save_credentials(self) -> None: """Сохранить токены в файл.""" if self._credentials: self.credentials_path.parent.mkdir(parents=True, exist_ok=True) with open(self.credentials_path, 'w') as f: json.dump(self._credentials.__dict__, f, indent=2) # Устанавливаем права 600 (только владелец) os.chmod(self.credentials_path, 0o600) logger.info(f"Токены сохранены в {self.credentials_path}") def has_valid_token(self) -> bool: """Проверка наличия валидного токена.""" return self._credentials is not None and not self._credentials.is_expired() async def get_access_token(self) -> Optional[str]: """Получить access token (обновляет если истёк).""" if self.has_valid_token(): return self._credentials.access_token if self._credentials and self._credentials.refresh_token: # Пробуем обновить токен if await self._refresh_token(): return self._credentials.access_token return None async def request_device_authorization(self) -> DeviceAuthorizationResponse: """ Запросить Device Authorization. Returns: DeviceAuthorizationResponse с данными для авторизации """ # Проверяем есть ли сохранённый code_verifier if not hasattr(self, '_code_verifier') or not self._code_verifier: # Генерируем PKCE code verifier и challenge self._code_verifier = secrets.token_urlsafe(32) code_challenge = hashlib.sha256(self._code_verifier.encode()).hexdigest() payload = { 'client_id': QWEN_OAUTH_CLIENT_ID, 'scope': QWEN_OAUTH_SCOPE, 'code_challenge': code_challenge, 'code_challenge_method': 'S256' } headers = { 'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'application/json', 'x-request-id': secrets.token_hex(16), 'User-Agent': 'qwen-code-cli/0.11.0' } form_data = '&'.join(f'{k}={v}' for k, v in payload.items()) async with aiohttp.ClientSession() as session: async with session.post( QWEN_OAUTH_DEVICE_CODE_ENDPOINT, data=form_data, headers=headers ) as resp: if resp.status != 200: text = await resp.text() raise Exception(f"Device authorization failed: {resp.status} - {text}") data = await resp.json() return DeviceAuthorizationResponse( device_code=data.get('device_code', ''), user_code=data.get('user_code', ''), verification_uri=data.get('verification_uri', ''), verification_uri_complete=data.get('verification_uri_complete', ''), expires_in=data.get('expires_in', 900), interval=data.get('interval', 5) ) async def poll_for_token(self, device_code: str, timeout_seconds: int = 900) -> bool: """ Опрос сервера для получения токена после авторизации пользователем. Args: device_code: Device code из request_device_authorization timeout_seconds: Максимальное время ожидания Returns: True если авторизация успешна """ if not hasattr(self, '_code_verifier'): raise Exception("Code verifier not set. Call request_device_authorization first.") start_time = time.time() interval = 5 # Начальный интервал while time.time() - start_time < timeout_seconds: payload = { 'grant_type': 'urn:ietf:params:oauth:grant-type:device_code', 'device_code': device_code, 'code_verifier': self._code_verifier, 'client_id': QWEN_OAUTH_CLIENT_ID } headers = { 'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'application/json', 'User-Agent': 'qwen-code-cli/0.11.0' } form_data = '&'.join(f'{k}={v}' for k, v in payload.items()) logger.debug(f"Polling payload: grant_type={payload['grant_type']}, device_code={device_code[:20]}..., code_verifier={self._code_verifier[:20]}...") async with aiohttp.ClientSession() as session: async with session.post( QWEN_OAUTH_TOKEN_ENDPOINT, data=form_data, headers=headers ) as resp: data = await resp.json() logger.info(f"Polling response: status={resp.status}, data={data}") if resp.status == 200: # Успех! Сохраняем токены self._credentials = QwenCredentials( access_token=data.get('access_token', ''), refresh_token=data.get('refresh_token', ''), token_type=data.get('token_type', 'Bearer'), expiry_date=int(datetime.now().timestamp() * 1000) + (data.get('expires_in', 3600) * 1000), resource_url=data.get('resource_url', 'portal.qwen.ai') ) self._save_credentials() logger.info("Авторизация успешна!") return True error = data.get('error', '') if error == 'authorization_pending': # Пользователь ещё не авторизовался logger.debug("Ожидание авторизации пользователя...") await asyncio.sleep(interval) continue elif error == 'slow_down': # Сервер просит увеличить интервал interval += 5 logger.debug(f"Увеличиваем интервал до {interval} сек") await asyncio.sleep(interval) continue elif error == 'expired_token': logger.error("Device code истёк") return False elif error == 'access_denied': logger.error("Пользователь отклонил авторизацию") return False else: logger.error(f"Неизвестная ошибка: {error}") return False logger.error("Таймаут авторизации") return False async def _refresh_token(self) -> bool: """Обновить access token используя refresh token.""" if not self._credentials or not self._credentials.refresh_token: return False payload = { 'grant_type': 'refresh_token', 'refresh_token': self._credentials.refresh_token, 'client_id': QWEN_OAUTH_CLIENT_ID } headers = { 'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'application/json', 'User-Agent': 'qwen-code-cli/0.11.0' } form_data = '&'.join(f'{k}={v}' for k, v in payload.items()) try: async with aiohttp.ClientSession() as session: async with session.post( QWEN_OAUTH_TOKEN_ENDPOINT, data=form_data, headers=headers ) as resp: if resp.status == 200: data = await resp.json() self._credentials.access_token = data.get('access_token', '') self._credentials.refresh_token = data.get('refresh_token', self._credentials.refresh_token) self._credentials.expiry_date = int(datetime.now().timestamp() * 1000) + (data.get('expires_in', 3600) * 1000) self._save_credentials() logger.info("Токен обновлён") return True else: logger.error(f"Ошибка обновления токена: {resp.status}") self._credentials = None # Очищаем неверные токены return False except Exception as e: logger.error(f"Ошибка обновления токена: {e}") return False def clear_credentials(self) -> None: """Очистить сохранённые токены.""" self._credentials = None if self.credentials_path.exists(): self.credentials_path.unlink() logger.info("Токены очищены") # Глобальный клиент (singleton) _oauth_client: Optional[QwenOAuthClient] = None def get_oauth_client() -> QwenOAuthClient: """Получить OAuth клиент (singleton).""" global _oauth_client if _oauth_client is None: _oauth_client = QwenOAuthClient() return _oauth_client async def get_authorization_url() -> Optional[str]: """ Получить URL для авторизации. Returns: URL для авторизации или None если ошибка """ try: # Проверяем есть ли активный device code device_code_file = QWEN_CONFIG_DIR / 'device_code.json' if device_code_file.exists(): with open(device_code_file, 'r') as f: data = json.load(f) start_time = data.get('start_time', 0) expires_in = data.get('expires_in', 900) code_verifier = data.get('code_verifier', '') auth_url = data.get('authorization_url', '') device_code = data.get('device_code', '') # Если device code ещё активен (900 сек = 15 мин) — используем его # Проверяем что прошло меньше половины времени жизни для надёжности if time.time() - start_time < expires_in / 2 and code_verifier and auth_url and device_code: logger.info(f"Используем существующий device code (осталось {expires_in - (time.time() - start_time):.0f} сек)") return auth_url else: logger.info("Device code истёк или скоро истечёт, запрашиваем новый") # Удаляем старый файл device_code_file.unlink() # Генерируем PKCE пару как в qwen-code CLI import base64 import hashlib code_verifier = base64.urlsafe_b64encode(os.urandom(32)).decode('utf-8').rstrip('=') code_challenge = base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).digest()).decode('utf-8').rstrip('=') # Запрос device authorization payload = { 'client_id': QWEN_OAUTH_CLIENT_ID, 'scope': QWEN_OAUTH_SCOPE, 'code_challenge': code_challenge, 'code_challenge_method': 'S256' } headers = { 'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'application/json', 'x-request-id': secrets.token_hex(16), 'User-Agent': 'qwen-code-cli/0.11.0' } form_data = '&'.join(f'{k}={v}' for k, v in payload.items()) async with aiohttp.ClientSession() as session: async with session.post( QWEN_OAUTH_DEVICE_CODE_ENDPOINT, data=form_data, headers=headers ) as resp: if resp.status != 200: text = await resp.text() logger.error(f"Device authorization failed: {resp.status} - {text[:200]}") raise Exception(f"Device authorization failed: {resp.status}") # Проверяем Content-Type content_type = resp.headers.get('Content-Type', '') if 'application/json' not in content_type: logger.error(f"Unexpected content type: {content_type}") raise Exception(f"Expected JSON response, got {content_type}") data = await resp.json() # Извлекаем данные из ответа auth_url = data.get('verification_uri_complete', '') device_code = data.get('device_code', '') expires_in = data.get('expires_in', 900) if not auth_url or not device_code: logger.error(f"Missing auth_url or device_code in response: {data}") raise Exception("Invalid OAuth response") logger.info(f"Получен OAuth URL: {auth_url}") # Сохраняем device code и code verifier для polling device_code_file.parent.mkdir(parents=True, exist_ok=True) with open(device_code_file, 'w') as f: json.dump({ 'device_code': device_code, 'code_verifier': code_verifier, # Сохраняем тот же code_verifier! 'expires_in': expires_in, 'start_time': time.time(), 'authorization_url': auth_url }, f, indent=2) # Устанавливаем права 600 (только владелец) os.chmod(device_code_file, 0o600) logger.info(f"Device code сохранён: {device_code[:20]}..., expires in {expires_in}s") return auth_url except aiohttp.ContentTypeError as e: logger.error(f"Content-Type error: {e}") return None except Exception as e: logger.error(f"Ошибка получения URL авторизации: {e}", exc_info=True) return None async def check_authorization_complete() -> bool: """ Проверить завершение авторизации (polling). Returns: True если авторизация завершена успешно """ try: # Читаем device code device_code_file = QWEN_CONFIG_DIR / 'device_code.json' if not device_code_file.exists(): logger.debug("device_code.json не найден") return False with open(device_code_file, 'r') as f: data = json.load(f) device_code = data.get('device_code', '') code_verifier = data.get('code_verifier', '') # Получаем code_verifier из файла start_time = data.get('start_time', time.time()) expires_in = data.get('expires_in', 900) logger.info(f"Device code: {device_code[:20]}..., code_verifier: {code_verifier[:20]}...") # Проверяем не истёк ли timeout if time.time() - start_time > expires_in: logger.warning("Device code истёк") device_code_file.unlink() return False # Polling с code_verifier из файла logger.info("Запуск polling для получения токена...") payload = { 'grant_type': 'urn:ietf:params:oauth:grant-type:device_code', 'device_code': device_code, 'code_verifier': code_verifier, 'client_id': QWEN_OAUTH_CLIENT_ID } headers = { 'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'application/json', 'User-Agent': 'qwen-code-cli/0.11.0' } form_data = '&'.join(f'{k}={v}' for k, v in payload.items()) async with aiohttp.ClientSession() as session: async with session.post( QWEN_OAUTH_TOKEN_ENDPOINT, data=form_data, headers=headers ) as resp: data = await resp.json() logger.info(f"Polling response: status={resp.status}, data={data}") if resp.status == 200: # Успех! Сохраняем токены credentials = { 'access_token': data.get('access_token', ''), 'refresh_token': data.get('refresh_token', ''), 'token_type': data.get('token_type', 'Bearer'), 'expiry_date': int(time.time() * 1000) + (data.get('expires_in', 3600) * 1000), 'resource_url': data.get('resource_url', 'portal.qwen.ai') } # Сохраняем токены в файл QWEN_CONFIG_DIR.mkdir(parents=True, exist_ok=True) with open(QWEN_CREDENTIALS_FILE, 'w') as f: json.dump(credentials, f, indent=2) os.chmod(QWEN_CREDENTIALS_FILE, 0o600) device_code_file.unlink() logger.info("Авторизация успешна! Токены сохранены.") return True error = data.get('error', '') if error == 'authorization_pending': logger.debug("Ожидание авторизации пользователя...") return False elif error == 'slow_down': logger.debug("Сервер просит увеличить интервал") await asyncio.sleep(5) return False elif error == 'expired_token': logger.error("Device code истёк") device_code_file.unlink() return False elif error == 'access_denied': logger.error("Пользователь отклонил авторизацию") device_code_file.unlink() return False elif error == 'invalid_request': error_desc = data.get('error_description', '') logger.error(f"Invalid request: {error_desc}") # Проверяем не истёк ли code_verifier if 'code_verifier' in error_desc.lower(): logger.error("Code verifier не совпадает — удаляем device_code.json") device_code_file.unlink() return False else: logger.error(f"Неизвестная ошибка: {error}") return False except Exception as e: logger.error(f"Ошибка проверки авторизации: {e}", exc_info=True) return False async def is_authorized() -> bool: """Проверить авторизован ли пользователь.""" client = get_oauth_client() return client.has_valid_token() async def get_access_token() -> Optional[str]: """Получить access token.""" client = get_oauth_client() return await client.get_access_token() def clear_authorization() -> None: """Очистить авторизацию.""" client = get_oauth_client() client.clear_credentials()