2024-04-22 21:02:42 -07:00

350 lines
13 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# ==============================================================================
# Copyright (C) 2021 Evil0ctal
#
# This file is part of the Douyin_TikTok_Download_API project.
#
# This project is licensed under the Apache License 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
#         __
#        />  フ
#       |  _  _ l
#       ` ミ_x
#      /      | Feed me Stars ⭐
#     /  ヽ   ノ
#     │  | | |
#  / ̄|   | | |
#  | ( ̄ヽ__ヽ_)__)
#  \二つ
# ==============================================================================
#
# Contributor Link:
# - https://github.com/Evil0ctal
# - https://github.com/Johnserf-Seed
#
# ==============================================================================
import httpx
import json
import asyncio
import re
from httpx import Response
from crawlers.utils.logger import logger
from crawlers.utils.api_exceptions import (
APIError,
APIConnectionError,
APIResponseError,
APITimeoutError,
APIUnavailableError,
APIUnauthorizedError,
APINotFoundError,
APIRateLimitError,
APIRetryExhaustedError,
)
class BaseCrawler:
    """Base asynchronous crawler client built on top of ``httpx.AsyncClient``.

    The class owns a single shared ``httpx.AsyncClient`` and offers small
    helpers for GET / POST / HEAD requests, retrying on empty responses and
    translating HTTP status errors into the project's ``APIError`` hierarchy.
    It can be used as an async context manager; leaving the ``async with``
    block closes the underlying client.
    """

    def __init__(
        self,
        proxies: dict = None,
        max_retries: int = 3,
        max_connections: int = 50,
        timeout: int = 10,
        max_tasks: int = 50,
        crawler_headers: dict = None,
    ):
        """Configure the crawler and create its ``httpx.AsyncClient``.

        Args:
            proxies: Proxy mapping handed to the client. Any value that is
                not a ``dict`` is ignored and proxies are disabled.
            max_retries: Number of attempts made by the GET/POST retry loop;
                also passed as ``retries`` to the HTTP transport.
            max_connections: Value used to build ``httpx.Limits``.
            timeout: Timeout in seconds used to build ``httpx.Timeout``; the
                same value is the pause between retries of empty responses.
            max_tasks: Size of the ``asyncio.Semaphore`` stored on
                ``self.semaphore``.
            crawler_headers: Default request headers. ``None`` (or any falsy
                value) results in a fresh empty dict per instance.
        """
        # Only a dict is accepted as proxy configuration.
        self.proxies = proxies if isinstance(proxies, dict) else None

        # Crawler request headers. A new dict is created when none are given
        # so that instances never share one mutable default object.
        self.crawler_headers = crawler_headers or {}

        # Number of asynchronous tasks.
        self._max_tasks = max_tasks
        self.semaphore = asyncio.Semaphore(max_tasks)

        # Limit on the maximum number of connections.
        # NOTE(review): a custom transport is also passed to the client below;
        # confirm that `limits` still takes effect for it in the httpx
        # version in use.
        self._max_connections = max_connections
        self.limits = httpx.Limits(max_connections=max_connections)

        # Business-logic retry count (used by the retry loop in this class).
        self._max_retries = max_retries
        # Connection-level retry count (handled inside the httpx transport).
        self.atransport = httpx.AsyncHTTPTransport(retries=max_retries)

        # Timeout, kept both as the raw number and as an httpx.Timeout.
        self._timeout = timeout
        self.timeout = httpx.Timeout(timeout)

        # Shared asynchronous client used by every request helper.
        self.aclient = httpx.AsyncClient(
            headers=self.crawler_headers,
            proxies=self.proxies,
            timeout=self.timeout,
            limits=self.limits,
            transport=self.atransport,
        )

    async def fetch_response(self, endpoint: str) -> Response:
        """Fetch ``endpoint`` with GET and return the raw response.

        Args:
            endpoint: Endpoint URL.

        Returns:
            Whatever ``get_fetch_data`` returns (the raw response object).
        """
        return await self.get_fetch_data(endpoint)

    async def fetch_get_json(self, endpoint: str) -> dict:
        """Fetch ``endpoint`` with GET and return the parsed JSON body.

        Args:
            endpoint: Endpoint URL.

        Returns:
            Parsed JSON data.

        Raises:
            APIResponseError: If the response is invalid or cannot be parsed
                (raised by ``parse_json``).
        """
        response = await self.get_fetch_data(endpoint)
        return self.parse_json(response)

    async def fetch_post_json(self, endpoint: str, params: dict = None, data=None) -> dict:
        """Send a POST to ``endpoint`` and return the parsed JSON body.

        Args:
            endpoint: Endpoint URL.
            params: Mapping sent as the JSON request body; omitted when falsy.
            data: Value sent as the ``data`` request body; omitted when falsy.

        Returns:
            Parsed JSON data.

        Raises:
            APIResponseError: If the response is invalid or cannot be parsed
                (raised by ``parse_json``).
        """
        response = await self.post_fetch_data(endpoint, params, data)
        return self.parse_json(response)

    def parse_json(self, response: Response) -> dict:
        """Parse the JSON payload of a successful response.

        ``response.json()`` is tried first. If that fails, the first
        ``{...}`` span found in ``response.text`` is parsed instead.

        Args:
            response: Raw response object.

        Returns:
            Parsed JSON data.

        Raises:
            APIResponseError: If ``response`` is not a ``Response`` with
                status code 200, or if no JSON can be decoded from it.
        """
        is_ok_response = (
            response is not None
            and isinstance(response, Response)
            and response.status_code == 200
        )
        if not is_ok_response:
            if isinstance(response, Response):
                logger.error(
                    "获取数据失败。状态码: {0}".format(response.status_code)
                )
            else:
                logger.error("无效响应类型。响应类型: {0}".format(type(response)))
            raise APIResponseError("获取数据失败")

        try:
            return response.json()
        except json.JSONDecodeError as decode_error:
            # Fall back to extracting a JSON object embedded in the body text.
            match = re.search(r"\{.*\}", response.text)
            if match is None:
                # Nothing that looks like a JSON object: report it as a parse
                # failure instead of crashing on `None.group()`.
                logger.error(
                    "解析 {0} 接口 JSON 失败: {1}".format(response.url, decode_error)
                )
                raise APIResponseError("解析JSON数据失败") from decode_error
            try:
                return json.loads(match.group())
            except json.JSONDecodeError as e:
                logger.error("解析 {0} 接口 JSON 失败: {1}".format(response.url, e))
                raise APIResponseError("解析JSON数据失败") from e

    async def _fetch_with_retries(self, send_request, url: str):
        """Run the retry loop shared by ``get_fetch_data`` and ``post_fetch_data``.

        Args:
            send_request: Zero-argument callable returning the awaitable that
                performs one HTTP request.
            url: Endpoint URL, used in error messages only.

        Returns:
            The response of the first attempt that has a non-empty body and
            passes ``raise_for_status()``. ``None`` when the loop ends
            without such a response (see the comments in the body).

        Raises:
            APIConnectionError: If httpx raises a ``RequestError``.
            APIError subclasses: Whatever ``handle_http_status_error`` raises
                for an HTTP status error.
        """
        for attempt in range(self._max_retries):
            try:
                response = await send_request()
                if not response.text.strip() or not response.content:
                    error_message = "{0} 次响应内容为空, 状态码: {1}, URL:{2}".format(
                        attempt + 1, response.status_code, response.url
                    )
                    logger.warning(error_message)
                    if attempt == self._max_retries - 1:
                        raise APIRetryExhaustedError(
                            "获取端点数据失败, 次数达到上限"
                        )
                    # Wait before retrying; the pause equals the configured timeout.
                    await asyncio.sleep(self._timeout)
                    continue
                response.raise_for_status()
                return response
            except httpx.RequestError as request_error:
                raise APIConnectionError(
                    "连接端点失败,检查网络环境或代理:{0} 代理:{1} 类名:{2}".format(
                        url, self.proxies, self.__class__.__name__
                    )
                ) from request_error
            except httpx.HTTPStatusError as http_error:
                # Raises for every status except 302, for which the loop
                # simply moves on to the next attempt.
                self.handle_http_status_error(http_error, url, attempt + 1)
            except APIError as e:
                # An APIError raised inside the `try` body (the retry-exhausted
                # error above) is reported through its own display_error()
                # and not propagated.
                e.display_error()
        # Reached when no attempt returned a response.
        return None

    async def get_fetch_data(self, url: str):
        """Send a GET request to ``url``, retrying on empty responses.

        Redirects are followed.

        Args:
            url: Endpoint URL.

        Returns:
            The response, or ``None`` if no attempt produced one.

        Raises:
            APIConnectionError: If the request itself fails.
            APIError subclasses: For HTTP status errors, as raised by
                ``handle_http_status_error``.
        """
        return await self._fetch_with_retries(
            lambda: self.aclient.get(url, follow_redirects=True),
            url,
        )

    async def post_fetch_data(self, url: str, params: dict = None, data=None):
        """Send a POST request to ``url``, retrying on empty responses.

        Redirects are followed.

        Args:
            url: Endpoint URL.
            params: Mapping sent as the JSON body (copied with ``dict()``);
                ``None`` is sent when it is falsy.
            data: Value sent as the ``data`` body; ``None`` is sent when it
                is falsy.

        Returns:
            The response, or ``None`` if no attempt produced one.

        Raises:
            APIConnectionError: If the request itself fails.
            APIError subclasses: For HTTP status errors, as raised by
                ``handle_http_status_error``.
        """
        return await self._fetch_with_retries(
            lambda: self.aclient.post(
                url,
                json=None if not params else dict(params),
                data=None if not data else data,
                follow_redirects=True,
            ),
            url,
        )

    async def head_fetch_data(self, url: str):
        """Send a single HEAD request to ``url`` (no retry loop).

        Args:
            url: Endpoint URL.

        Returns:
            The response, or ``None`` when the status error handler returns
            without raising (status 302) or an ``APIError`` is caught here.

        Raises:
            APIConnectionError: If the request itself fails.
            APIError subclasses: For HTTP status errors, as raised by
                ``handle_http_status_error``.
        """
        try:
            response = await self.aclient.head(url)
            response.raise_for_status()
            return response
        except httpx.RequestError as request_error:
            raise APIConnectionError(
                "连接端点失败,检查网络环境或代理:{0} 代理:{1} 类名:{2}".format(
                    url, self.proxies, self.__class__.__name__
                )
            ) from request_error
        except httpx.HTTPStatusError as http_error:
            self.handle_http_status_error(http_error, url, 1)
        except APIError as e:
            e.display_error()
        return None

    def handle_http_status_error(self, http_error, url: str, attempt):
        """Translate an HTTP status error into the matching ``APIError``.

        Args:
            http_error: The HTTP status error; its ``response.status_code``
                selects the exception to raise.
            url: Endpoint URL, used for logging.
            attempt: Attempt number, used for logging.

        Returns:
            ``None`` for status 302, which is not treated as an error.

        Raises:
            APINotFoundError: Status 404.
            APIUnavailableError: Status 503.
            APITimeoutError: Status 408.
            APIUnauthorizedError: Status 401.
            APIRateLimitError: Status 429.
            APIResponseError: Any other status, or when the error carries no
                response / status code.
        """
        response = getattr(http_error, "response", None)
        status_code = getattr(response, "status_code", None)

        if response is None or status_code is None:
            logger.error(
                "HTTP状态错误: {0}, URL: {1}, 尝试次数: {2}".format(
                    http_error, url, attempt
                )
            )
            raise APIResponseError(f"处理HTTP错误时遇到意外情况: {http_error}")

        if status_code == 302:
            # Redirect status: deliberately ignored.
            return

        # Status codes that map to a dedicated exception type. Built here
        # (not at class level) so the names are resolved at call time.
        dedicated_errors = {
            404: APINotFoundError,
            503: APIUnavailableError,
            408: APITimeoutError,
            401: APIUnauthorizedError,
            429: APIRateLimitError,
        }
        error_cls = dedicated_errors.get(status_code)
        if error_cls is not None:
            raise error_cls(f"HTTP Status Code {status_code}")

        logger.error(
            "HTTP状态错误: {0}, URL: {1}, 尝试次数: {2}".format(
                status_code, url, attempt
            )
        )
        raise APIResponseError(f"HTTP状态错误: {status_code}")

    async def close(self):
        """Close the underlying ``httpx.AsyncClient``."""
        await self.aclient.aclose()

    async def __aenter__(self):
        """Enter the async context and return this crawler."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Leave the async context, closing the underlying client."""
        await self.aclient.aclose()