# -*- coding: utf-8 -*- import time from datetime import datetime try: import ujson as json except ImportError: import json import requests from requests.exceptions import HTTPError, ConnectionError, Timeout from requests.adapters import HTTPAdapter try: # noinspection PyUnresolvedReferences from requests.packages.urllib3 import fields format_header_param = fields.format_header_param except ImportError: format_header_param = None import telebot from telebot import types from telebot import util logger = telebot.logger proxy = None session = None API_URL = None FILE_URL = None CONNECT_TIMEOUT = 15 READ_TIMEOUT = 30 LONG_POLLING_TIMEOUT = 10 # Should be positive, short polling should be used for testing purposes only (https://core.telegram.org/bots/api#getupdates) SESSION_TIME_TO_LIVE = 600 # In seconds. None - live forever, 0 - one-time RETRY_ON_ERROR = False RETRY_TIMEOUT = 2 MAX_RETRIES = 15 RETRY_ENGINE = 1 CUSTOM_SERIALIZER = None CUSTOM_REQUEST_SENDER = None ENABLE_MIDDLEWARE = False def _get_req_session(reset=False): if SESSION_TIME_TO_LIVE: # If session TTL is set - check time passed creation_date = util.per_thread('req_session_time', lambda: datetime.now(), reset) # noinspection PyTypeChecker if (datetime.now() - creation_date).total_seconds() > SESSION_TIME_TO_LIVE: # Force session reset reset = True # Save reset time util.per_thread('req_session_time', lambda: datetime.now(), True) if SESSION_TIME_TO_LIVE == 0: # Session is one-time use return requests.sessions.Session() else: # Session lives some time or forever once created. Default return util.per_thread('req_session', lambda: session if session else requests.sessions.Session(), reset) def _make_request(token, method_name, method='get', params=None, files=None): """ Makes a request to the Telegram API. :param token: The bot's API token. (Created with @BotFather) :param method_name: Name of the API method to be called. (E.g. 'getUpdates') :param method: HTTP method to be used. Defaults to 'get'. :param params: Optional parameters. Should be a dictionary with key-value pairs. :param files: Optional files. :return: The result parsed to a JSON dictionary. """ if not token: raise Exception('Bot token is not defined') if API_URL: # noinspection PyUnresolvedReferences request_url = API_URL.format(token, method_name) else: request_url = "https://api.telegram.org/bot{0}/{1}".format(token, method_name) logger.debug("Request: method={0} url={1} params={2} files={3}".format(method, request_url, params, files).replace(token, token.split(':')[0] + ":{TOKEN}")) read_timeout = READ_TIMEOUT connect_timeout = CONNECT_TIMEOUT if files: files_copy = dict(files) # process types.InputFile for key, value in files_copy.items(): if isinstance(value, types.InputFile): files[key] = value.file elif isinstance(value, tuple) and (len(value) == 2) and isinstance(value[1], types.InputFile): files[key] = (value[0], value[1].file) if files and format_header_param: fields.format_header_param = _no_encode(format_header_param) if params: if 'timeout' in params: read_timeout = params.pop('timeout') connect_timeout = read_timeout if 'long_polling_timeout' in params: # For getUpdates. It's the only function with timeout parameter on the BOT API side long_polling_timeout = params.pop('long_polling_timeout') params['timeout'] = long_polling_timeout # Long polling hangs for a given time. Read timeout should be greater that long_polling_timeout read_timeout = max(long_polling_timeout + 5, read_timeout) params = params or None # Set params to None if empty result = None if CUSTOM_REQUEST_SENDER: # noinspection PyCallingNonCallable result = CUSTOM_REQUEST_SENDER( method, request_url, params=params, files=files, timeout=(connect_timeout, read_timeout), proxies=proxy) elif RETRY_ON_ERROR and RETRY_ENGINE == 1: got_result = False current_try = 0 while not got_result and current_try 0: ret = ret[:-1] return '[' + ret + ']' def _convert_markup(markup): if isinstance(markup, types.JsonSerializable): return markup.to_json() return markup def _convert_entites(entites): if entites is None: return None elif len(entites) == 0: return [] elif isinstance(entites[0], types.JsonSerializable): return [entity.to_json() for entity in entites] else: return entites def _convert_poll_options(poll_options): if poll_options is None: return None elif len(poll_options) == 0: return [] elif isinstance(poll_options[0], str): # Compatibility mode with previous bug when only list of string was accepted as poll_options return poll_options elif isinstance(poll_options[0], types.PollOption): return [option.text for option in poll_options] else: return poll_options def convert_input_media(media): if isinstance(media, types.InputMedia): return media.convert_input_media() return None, None def convert_input_media_array(array): media = [] files = {} for input_media in array: if isinstance(input_media, types.InputMedia): media_dict = input_media.to_dict() if media_dict['media'].startswith('attach://'): key = media_dict['media'].replace('attach://', '') files[key] = input_media.media media.append(media_dict) return json.dumps(media), files def _no_encode(func): def wrapper(key, val): if key == 'filename': return u'{0}={1}'.format(key, val) else: return func(key, val) return wrapper class ApiException(Exception): """ This class represents a base Exception thrown when a call to the Telegram API fails. In addition to an informative message, it has a `function_name` and a `result` attribute, which respectively contain the name of the failed function and the returned result that made the function to be considered as failed. """ def __init__(self, msg, function_name, result): super(ApiException, self).__init__("A request to the Telegram API was unsuccessful. {0}".format(msg)) self.function_name = function_name self.result = result class ApiHTTPException(ApiException): """ This class represents an Exception thrown when a call to the Telegram API server returns HTTP code that is not 200. """ def __init__(self, function_name, result): super(ApiHTTPException, self).__init__( "The server returned HTTP {0} {1}. Response body:\n[{2}]" \ .format(result.status_code, result.reason, result.text.encode('utf8')), function_name, result) class ApiInvalidJSONException(ApiException): """ This class represents an Exception thrown when a call to the Telegram API server returns invalid json. """ def __init__(self, function_name, result): super(ApiInvalidJSONException, self).__init__( "The server returned an invalid JSON response. Response body:\n[{0}]" \ .format(result.text.encode('utf8')), function_name, result) class ApiTelegramException(ApiException): """ This class represents an Exception thrown when a Telegram API returns error code. """ def __init__(self, function_name, result, result_json): super(ApiTelegramException, self).__init__( "Error code: {0}. Description: {1}" \ .format(result_json['error_code'], result_json['description']), function_name, result) self.result_json = result_json self.error_code = result_json['error_code'] self.description = result_json['description']