Fix Telegram proxy transport via CONNECT tunnel
This commit is contained in:
parent
4a9b2e27fa
commit
b0e4249078
|
|
@ -1,33 +1,57 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import http.client
|
||||
import json
|
||||
from typing import Any
|
||||
from urllib import parse, request as urllib_request
|
||||
from urllib import parse
|
||||
|
||||
|
||||
class TelegramAPI:
|
||||
def __init__(self, token: str, proxy_url: str | None = None) -> None:
|
||||
self.base_url = f"https://api.telegram.org/bot{token}"
|
||||
handlers = []
|
||||
if proxy_url:
|
||||
handlers.append(urllib_request.ProxyHandler({"https": proxy_url, "http": proxy_url}))
|
||||
self.opener = urllib_request.build_opener(*handlers) if handlers else urllib_request.build_opener()
|
||||
self.host = "api.telegram.org"
|
||||
self.base_path = f"/bot{token}"
|
||||
self.proxy = parse.urlparse(proxy_url) if proxy_url else None
|
||||
|
||||
def _connection(self) -> http.client.HTTPSConnection:
|
||||
if self.proxy:
|
||||
conn = http.client.HTTPSConnection(
|
||||
self.proxy.hostname,
|
||||
self.proxy.port or 443,
|
||||
timeout=90,
|
||||
)
|
||||
conn.set_tunnel(self.host, 443)
|
||||
return conn
|
||||
return http.client.HTTPSConnection(self.host, 443, timeout=90)
|
||||
|
||||
def _post(self, method: str, payload: dict[str, Any]) -> dict[str, Any]:
|
||||
data = json.dumps(payload).encode("utf-8")
|
||||
req = urllib_request.Request(
|
||||
f"{self.base_url}/{method}",
|
||||
data=data,
|
||||
conn = self._connection()
|
||||
try:
|
||||
conn.request(
|
||||
"POST",
|
||||
f"{self.base_path}/{method}",
|
||||
body=data,
|
||||
headers={"Content-Type": "application/json"},
|
||||
method="POST",
|
||||
)
|
||||
with self.opener.open(req, timeout=90) as response:
|
||||
return json.loads(response.read().decode("utf-8"))
|
||||
response = conn.getresponse()
|
||||
body = response.read().decode("utf-8")
|
||||
finally:
|
||||
conn.close()
|
||||
return json.loads(body)
|
||||
|
||||
def _get(self, method: str, params: dict[str, Any]) -> dict[str, Any]:
|
||||
query = parse.urlencode(params)
|
||||
with self.opener.open(f"{self.base_url}/{method}?{query}", timeout=90) as response:
|
||||
return json.loads(response.read().decode("utf-8"))
|
||||
path = f"{self.base_path}/{method}"
|
||||
if query:
|
||||
path = f"{path}?{query}"
|
||||
conn = self._connection()
|
||||
try:
|
||||
conn.request("GET", path)
|
||||
response = conn.getresponse()
|
||||
body = response.read().decode("utf-8")
|
||||
finally:
|
||||
conn.close()
|
||||
return json.loads(body)
|
||||
|
||||
def get_updates(self, offset: int | None, timeout: int) -> list[dict[str, Any]]:
|
||||
params: dict[str, Any] = {"timeout": timeout}
|
||||
|
|
|
|||
Loading…
Reference in New Issue