# ==============================================================================
# Copyright (C) 2021 Evil0ctal
#
# This file is part of the Douyin_TikTok_Download_API project.
#
# This project is licensed under the Apache License 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
#         __
#        />  フ
#       |  _  _ l
#       /` ミ_xノ
#      /      | Feed me Stars ⭐ ️
#     /  ヽ   ノ
#     │  | | |
#  / ̄|   | | |
#  | ( ̄ヽ__ヽ_)__)
#  \二つ
# ==============================================================================
#
# Contributor Link:
# - https://github.com/Evil0ctal
# - https://github.com/Johnserf-Seed
#
# ==============================================================================

import httpx
import json
import asyncio
import re

from httpx import Response

from crawlers.utils.logger import logger
from crawlers.utils.api_exceptions import (
    APIError,
    APIConnectionError,
    APIResponseError,
    APITimeoutError,
    APIUnavailableError,
    APIUnauthorizedError,
    APINotFoundError,
    APIRateLimitError,
    APIRetryExhaustedError,
)


class BaseCrawler:
    """Base crawler client built on a shared ``httpx.AsyncClient``.

    Provides GET/POST/HEAD helpers with retry-on-empty-response handling,
    JSON parsing with a regex fallback, and translation of HTTP status
    errors into the project's ``APIError`` hierarchy.

    Can be used as an async context manager; the underlying client is
    closed on exit.
    """

    # HTTP status codes that map to a dedicated exception type. Any other
    # error status (except 302, which is ignored) becomes APIResponseError.
    _STATUS_EXCEPTIONS = {
        404: APINotFoundError,
        503: APIUnavailableError,
        408: APITimeoutError,
        401: APIUnauthorizedError,
        429: APIRateLimitError,
    }

    def __init__(
        self,
        proxies: dict = None,
        max_retries: int = 3,
        max_connections: int = 50,
        timeout: int = 10,
        max_tasks: int = 50,
        crawler_headers: dict = None,
    ):
        """Create the crawler and its underlying async HTTP client.

        Args:
            proxies (dict): Proxy mapping handed to ``httpx.AsyncClient``.
                Anything that is not a dict is ignored (treated as None).
            max_retries (int): Number of attempts made by the GET/POST
                helpers; also passed as ``retries`` to the transport.
            max_connections (int): Upper bound for ``httpx.Limits``.
            timeout (int): Request timeout in seconds; also used as the
                pause between retries after an empty response.
            max_tasks (int): Size of the semaphore stored on the instance.
            crawler_headers (dict): Default request headers. ``None`` or an
                empty dict means no extra headers.
        """
        # Only a real dict is accepted as proxy configuration.
        self.proxies = proxies if isinstance(proxies, dict) else None

        # Crawler request headers
        self.crawler_headers = crawler_headers or {}

        # Number of asynchronous tasks
        self._max_tasks = max_tasks
        self.semaphore = asyncio.Semaphore(max_tasks)

        # Limit the maximum number of connections
        self._max_connections = max_connections
        self.limits = httpx.Limits(max_connections=max_connections)

        # Business-logic retry count (loop in the fetch helpers)
        self._max_retries = max_retries
        # Underlying connection retry count (handled by the transport)
        self.atransport = httpx.AsyncHTTPTransport(retries=max_retries)

        # Timeout waiting time
        self._timeout = timeout
        self.timeout = httpx.Timeout(timeout)

        # Asynchronous client
        # NOTE(review): the ``proxies`` keyword is version-dependent in
        # httpx -- confirm it is supported by the pinned httpx release.
        self.aclient = httpx.AsyncClient(
            headers=self.crawler_headers,
            proxies=self.proxies,
            timeout=self.timeout,
            limits=self.limits,
            transport=self.atransport,
        )

    async def fetch_response(self, endpoint: str) -> Response:
        """Fetch an endpoint with GET and return the raw response.

        Args:
            endpoint (str): Endpoint URL.

        Returns:
            Response: Raw response object, or ``None`` when
            ``get_fetch_data`` gave up (see that method).
        """
        return await self.get_fetch_data(endpoint)

    async def fetch_get_json(self, endpoint: str) -> dict:
        """Fetch an endpoint with GET and return its parsed JSON body.

        Args:
            endpoint (str): Endpoint URL.

        Returns:
            dict: Parsed JSON data.

        Raises:
            APIResponseError: If no usable response was obtained or the
                body could not be parsed as JSON.
        """
        response = await self.get_fetch_data(endpoint)
        return self.parse_json(response)

    async def fetch_post_json(
        self, endpoint: str, params: dict = None, data=None
    ) -> dict:
        """Send a POST request and return the parsed JSON body.

        Args:
            endpoint (str): Endpoint URL.
            params (dict): Payload sent as the JSON body; omitted when
                empty or ``None``.
            data: Payload sent as ``data=``; omitted when falsy.

        Returns:
            dict: Parsed JSON data.

        Raises:
            APIResponseError: If no usable response was obtained or the
                body could not be parsed as JSON.
        """
        response = await self.post_fetch_data(endpoint, params, data)
        return self.parse_json(response)

    def parse_json(self, response: Response) -> dict:
        """Parse the JSON body of a successful (HTTP 200) response.

        If ``response.json()`` fails, a second attempt is made on the first
        ``{...}`` span found in ``response.text`` (useful when the JSON is
        wrapped in extra text).

        Args:
            response (Response): Raw response object.

        Returns:
            dict: Parsed JSON data.

        Raises:
            APIResponseError: If ``response`` is not a ``Response``, its
                status code is not 200, or no JSON could be decoded.
        """
        if not isinstance(response, Response):
            logger.error("无效响应类型。响应类型: {0}".format(type(response)))
            raise APIResponseError("获取数据失败")

        if response.status_code != 200:
            logger.error(
                "获取数据失败。状态码: {0}".format(response.status_code)
            )
            raise APIResponseError("获取数据失败")

        try:
            return response.json()
        except json.JSONDecodeError as decode_error:
            # Fall back to extracting a JSON object from the raw text.
            match = re.search(r"\{.*\}", response.text)
            if match is None:
                # Nothing that looks like a JSON object: report it as a
                # response error instead of crashing on ``None.group()``.
                logger.error(
                    "解析 {0} 接口 JSON 失败: {1}".format(
                        response.url, decode_error
                    )
                )
                raise APIResponseError("解析JSON数据失败") from decode_error
            try:
                return json.loads(match.group())
            except json.JSONDecodeError as e:
                logger.error(
                    "解析 {0} 接口 JSON 失败: {1}".format(response.url, e)
                )
                raise APIResponseError("解析JSON数据失败") from e

    def _connection_error(self, url: str) -> APIConnectionError:
        """Build the APIConnectionError raised when a request cannot be sent."""
        return APIConnectionError(
            "连接端点失败,检查网络环境或代理:{0} 代理:{1} 类名:{2}".format(
                url, self.proxies, self.__class__.__name__
            )
        )

    async def _fetch_with_retries(self, send_request, url: str):
        """Run ``send_request`` up to ``self._max_retries`` times.

        Shared retry loop behind ``get_fetch_data`` and ``post_fetch_data``.

        Args:
            send_request: Zero-argument callable returning an awaitable
                that resolves to the response of one request attempt.
            url (str): Endpoint URL, used only for error reporting.

        Returns:
            The first non-empty response with a successful status, or
            ``None`` when every attempt produced an empty body or an
            ignored (302) status.

        Raises:
            APIConnectionError: If httpx raises a ``RequestError``.
            APIError: Subclasses raised by ``handle_http_status_error`` for
                non-ignored HTTP error statuses.
        """
        for attempt in range(self._max_retries):
            try:
                response = await send_request()
                # The emptiness check runs before the status check, so an
                # empty error response is retried rather than raised.
                if not response.text.strip() or not response.content:
                    logger.warning(
                        "第 {0} 次响应内容为空, 状态码: {1}, URL:{2}".format(
                            attempt + 1, response.status_code, response.url
                        )
                    )
                    if attempt == self._max_retries - 1:
                        raise APIRetryExhaustedError(
                            "获取端点数据失败, 次数达到上限"
                        )
                    await asyncio.sleep(self._timeout)
                    continue

                response.raise_for_status()
                return response

            except httpx.RequestError as request_error:
                raise self._connection_error(url) from request_error

            except httpx.HTTPStatusError as http_error:
                # Raises for every status except 302, which falls through
                # to the next attempt.
                self.handle_http_status_error(http_error, url, attempt + 1)

            except APIError as e:
                # Only APIRetryExhaustedError from the try body lands here;
                # it is reported via display_error() and not propagated.
                e.display_error()

        # NOTE: retries exhausted without a usable response. Callers such
        # as parse_json turn this ``None`` into an APIResponseError.
        return None

    async def get_fetch_data(self, url: str):
        """Fetch an endpoint with GET, retrying on empty responses.

        Redirects are followed.

        Args:
            url (str): Endpoint URL.

        Returns:
            response: The response, or ``None`` if all attempts produced an
            empty body or an ignored status.

        Raises:
            APIConnectionError: If the request could not be sent.
            APIError: Subclass matching the HTTP error status.
        """
        return await self._fetch_with_retries(
            lambda: self.aclient.get(url, follow_redirects=True),
            url,
        )

    async def post_fetch_data(self, url: str, params: dict = None, data=None):
        """Send a POST request, retrying on empty responses.

        Redirects are followed.

        Args:
            url (str): Endpoint URL.
            params (dict): Payload sent as the JSON body; omitted when
                empty or ``None``.
            data: Payload sent as ``data=``; omitted when falsy.

        Returns:
            response: The response, or ``None`` if all attempts produced an
            empty body or an ignored status.

        Raises:
            APIConnectionError: If the request could not be sent.
            APIError: Subclass matching the HTTP error status.
        """
        json_payload = dict(params) if params else None
        data_payload = data if data else None
        return await self._fetch_with_retries(
            lambda: self.aclient.post(
                url,
                json=json_payload,
                data=data_payload,
                follow_redirects=True,
            ),
            url,
        )

    async def head_fetch_data(self, url: str):
        """Send a single HEAD request (no retries, redirects not followed).

        Args:
            url (str): Endpoint URL.

        Returns:
            response: The response, or ``None`` when the status error was
            ignored by ``handle_http_status_error`` (status 302).

        Raises:
            APIConnectionError: If the request could not be sent.
            APIError: Subclass matching the HTTP error status.
        """
        try:
            response = await self.aclient.head(url)
            response.raise_for_status()
            return response

        except httpx.RequestError as request_error:
            raise self._connection_error(url) from request_error

        except httpx.HTTPStatusError as http_error:
            self.handle_http_status_error(http_error, url, 1)

        except APIError as e:
            e.display_error()

        return None

    def handle_http_status_error(self, http_error, url: str, attempt):
        """Translate an HTTP status error into the matching ``APIError``.

        Status 302 is ignored (the method returns normally). Every other
        status raises.

        Args:
            http_error: The HTTP status error (expected to carry a
                ``response`` with a ``status_code``).
            url: Endpoint URL, used for logging.
            attempt: Attempt number, used for logging.

        Raises:
            APINotFoundError: Status 404.
            APIUnavailableError: Status 503.
            APITimeoutError: Status 408.
            APIUnauthorizedError: Status 401.
            APIRateLimitError: Status 429.
            APIResponseError: Any other status, or when the error carries
                no response / status code.
        """
        response = getattr(http_error, "response", None)
        status_code = getattr(response, "status_code", None)

        if response is None or status_code is None:
            logger.error(
                "HTTP状态错误: {0}, URL: {1}, 尝试次数: {2}".format(
                    http_error, url, attempt
                )
            )
            raise APIResponseError(
                f"处理HTTP错误时遇到意外情况: {http_error}"
            ) from http_error

        if status_code == 302:
            # Redirects are not treated as failures.
            return

        exception_class = self._STATUS_EXCEPTIONS.get(status_code)
        if exception_class is not None:
            raise exception_class(
                f"HTTP Status Code {status_code}"
            ) from http_error

        logger.error(
            "HTTP状态错误: {0}, URL: {1}, 尝试次数: {2}".format(
                status_code, url, attempt
            )
        )
        raise APIResponseError(f"HTTP状态错误: {status_code}") from http_error

    async def close(self):
        """Close the underlying async HTTP client."""
        await self.aclient.aclose()

    async def __aenter__(self):
        """Enter the async context; returns the crawler itself."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit the async context, closing the underlying client."""
        await self.close()