Mirror of https://github.com/Evil0ctal/Douyin_TikTok_Download_API.git (synced 2025-04-23 00:29:23 +08:00)
# ==============================================================================
# Copyright (C) 2021 Evil0ctal
#
# This file is part of the Douyin_TikTok_Download_API project.
#
# This project is licensed under the Apache License 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
#                  __
#                 /> フ
#                 | _ _ l
#                 /` ミ_xノ
#                 /     |       Feed me Stars ⭐ ️
#                /   ヽ   ノ
#                │  | | |
#            / ̄|   | | |
#            | ( ̄ヽ__ヽ_)__)
#            \二つ
# ==============================================================================
#
# Contributor Link:
# - https://github.com/Evil0ctal
# - https://github.com/Johnserf-Seed
#
# ==============================================================================

import httpx
import json
import asyncio
import re

from httpx import Response

from crawlers.utils.logger import logger
from crawlers.utils.api_exceptions import (
    APIError,
    APIConnectionError,
    APIResponseError,
    APITimeoutError,
    APIUnavailableError,
    APIUnauthorizedError,
    APINotFoundError,
    APIRateLimitError,
    APIRetryExhaustedError,
)

class BaseCrawler:
    """
    基础爬虫客户端 (Base crawler client)
    """

    def __init__(
            self,
            proxies: dict = None,
            max_retries: int = 3,
            max_connections: int = 50,
            timeout: int = 10,
            max_tasks: int = 50,
            crawler_headers: dict = {},
    ):
        if isinstance(proxies, dict):
            self.proxies = proxies
            # [f"{k}://{v}" for k, v in proxies.items()]
        else:
            self.proxies = None

        # 爬虫请求头 / Crawler request header
        self.crawler_headers = crawler_headers or {}

        # 异步的任务数 / Number of asynchronous tasks
        self._max_tasks = max_tasks
        self.semaphore = asyncio.Semaphore(max_tasks)

        # 限制最大连接数 / Limit the maximum number of connections
        self._max_connections = max_connections
        self.limits = httpx.Limits(max_connections=max_connections)

        # 业务逻辑重试次数 / Business logic retry count
        self._max_retries = max_retries
        # 底层连接重试次数 / Underlying connection retry count
        self.atransport = httpx.AsyncHTTPTransport(retries=max_retries)

        # 超时等待时间 / Timeout waiting time
        self._timeout = timeout
        self.timeout = httpx.Timeout(timeout)
        # 异步客户端 / Asynchronous client
        self.aclient = httpx.AsyncClient(
            headers=self.crawler_headers,
            proxies=self.proxies,
            timeout=self.timeout,
            limits=self.limits,
            transport=self.atransport,
        )

    async def fetch_response(self, endpoint: str) -> Response:
        """获取数据 (Get data)

        Args:
            endpoint (str): 接口地址 (Endpoint URL)

        Returns:
            Response: 原始响应对象 (Raw response object)
        """
        return await self.get_fetch_data(endpoint)

    async def fetch_get_json(self, endpoint: str) -> dict:
        """获取 JSON 数据 (Get JSON data)

        Args:
            endpoint (str): 接口地址 (Endpoint URL)

        Returns:
            dict: 解析后的JSON数据 (Parsed JSON data)
        """
        response = await self.get_fetch_data(endpoint)
        return self.parse_json(response)

    async def fetch_post_json(self, endpoint: str, params: dict = {}, data=None) -> dict:
        """发送POST请求并获取 JSON 数据 (Post and get JSON data)

        Args:
            endpoint (str): 接口地址 (Endpoint URL)
            params (dict): POST请求参数 (POST request parameters)
            data: 请求体数据 (Request body data)

        Returns:
            dict: 解析后的JSON数据 (Parsed JSON data)
        """
        response = await self.post_fetch_data(endpoint, params, data)
        return self.parse_json(response)

    def parse_json(self, response: Response) -> dict:
        """解析JSON响应对象 (Parse JSON response object)

        Args:
            response (Response): 原始响应对象 (Raw response object)

        Returns:
            dict: 解析后的JSON数据 (Parsed JSON data)
        """
        if (
                response is not None
                and isinstance(response, Response)
                and response.status_code == 200
        ):
            try:
                return response.json()
            except json.JSONDecodeError as e:
                # 尝试使用正则表达式匹配response.text中的json数据 / Fall back to extracting a JSON object from response.text with a regex
                match = re.search(r"\{.*\}", response.text)
                if match is None:
                    logger.error("解析 {0} 接口 JSON 失败: {1}".format(response.url, e))
                    raise APIResponseError("解析JSON数据失败")
                try:
                    return json.loads(match.group())
                except json.JSONDecodeError as e:
                    logger.error("解析 {0} 接口 JSON 失败: {1}".format(response.url, e))
                    raise APIResponseError("解析JSON数据失败")

        else:
            if isinstance(response, Response):
                logger.error(
                    "获取数据失败。状态码: {0}".format(response.status_code)
                )
            else:
                logger.error("无效响应类型。响应类型: {0}".format(type(response)))

            raise APIResponseError("获取数据失败")

    async def get_fetch_data(self, url: str):
        """
        获取GET端点数据 (Get GET endpoint data)

        Args:
            url (str): 端点URL (Endpoint URL)

        Returns:
            response: 响应内容 (Response content)
        """
        for attempt in range(self._max_retries):
            try:
                response = await self.aclient.get(url, follow_redirects=True)
                if not response.text.strip() or not response.content:
                    error_message = "第 {0} 次响应内容为空, 状态码: {1}, URL:{2}".format(
                        attempt + 1, response.status_code, response.url
                    )

                    logger.warning(error_message)

                    if attempt == self._max_retries - 1:
                        raise APIRetryExhaustedError(
                            "获取端点数据失败, 次数达到上限"
                        )

                    await asyncio.sleep(self._timeout)
                    continue

                # logger.info("响应状态码: {0}".format(response.status_code))
                response.raise_for_status()
                return response

            except httpx.RequestError:
                raise APIConnectionError(
                    "连接端点失败,检查网络环境或代理:{0} 代理:{1} 类名:{2}".format(
                        url, self.proxies, self.__class__.__name__
                    )
                )

            except httpx.HTTPStatusError as http_error:
                self.handle_http_status_error(http_error, url, attempt + 1)

            except APIError as e:
                e.display_error()

    async def post_fetch_data(self, url: str, params: dict = {}, data=None):
        """
        获取POST端点数据 (Get POST endpoint data)

        Args:
            url (str): 端点URL (Endpoint URL)
            params (dict): POST请求参数 (POST request parameters)
            data: 请求体数据 (Request body data)

        Returns:
            response: 响应内容 (Response content)
        """
        for attempt in range(self._max_retries):
            try:
                response = await self.aclient.post(
                    url,
                    json=None if not params else dict(params),
                    data=None if not data else data,
                    follow_redirects=True
                )
                if not response.text.strip() or not response.content:
                    error_message = "第 {0} 次响应内容为空, 状态码: {1}, URL:{2}".format(
                        attempt + 1, response.status_code, response.url
                    )

                    logger.warning(error_message)

                    if attempt == self._max_retries - 1:
                        raise APIRetryExhaustedError(
                            "获取端点数据失败, 次数达到上限"
                        )

                    await asyncio.sleep(self._timeout)
                    continue

                # logger.info("响应状态码: {0}".format(response.status_code))
                response.raise_for_status()
                return response

            except httpx.RequestError:
                raise APIConnectionError(
                    "连接端点失败,检查网络环境或代理:{0} 代理:{1} 类名:{2}".format(
                        url, self.proxies, self.__class__.__name__
                    )
                )

            except httpx.HTTPStatusError as http_error:
                self.handle_http_status_error(http_error, url, attempt + 1)

            except APIError as e:
                e.display_error()

    async def head_fetch_data(self, url: str):
        """
        获取HEAD端点数据 (Get HEAD endpoint data)

        Args:
            url (str): 端点URL (Endpoint URL)

        Returns:
            response: 响应内容 (Response content)
        """
        try:
            response = await self.aclient.head(url)
            # logger.info("响应状态码: {0}".format(response.status_code))
            response.raise_for_status()
            return response

        except httpx.RequestError:
            raise APIConnectionError(
                "连接端点失败,检查网络环境或代理:{0} 代理:{1} 类名:{2}".format(
                    url, self.proxies, self.__class__.__name__
                )
            )

        except httpx.HTTPStatusError as http_error:
            self.handle_http_status_error(http_error, url, 1)

        except APIError as e:
            e.display_error()

    def handle_http_status_error(self, http_error, url: str, attempt):
        """
        处理HTTP状态错误 (Handle HTTP status error)

        Args:
            http_error: HTTP状态错误 (HTTP status error)
            url: 端点URL (Endpoint URL)
            attempt: 尝试次数 (Number of attempts)
        Raises:
            APIConnectionError: 连接端点失败 (Failed to connect to endpoint)
            APIResponseError: 响应错误 (Response error)
            APIUnavailableError: 服务不可用 (Service unavailable)
            APINotFoundError: 端点不存在 (Endpoint does not exist)
            APITimeoutError: 连接超时 (Connection timeout)
            APIUnauthorizedError: 未授权 (Unauthorized)
            APIRateLimitError: 请求频率过高 (Request frequency is too high)
            APIRetryExhaustedError: 重试次数达到上限 (The number of retries has reached the upper limit)
        """
        response = getattr(http_error, "response", None)
        status_code = getattr(response, "status_code", None)

        if response is None or status_code is None:
            logger.error(
                "HTTP状态错误: {0}, URL: {1}, 尝试次数: {2}".format(http_error, url, attempt)
            )
            raise APIResponseError(f"处理HTTP错误时遇到意外情况: {http_error}")

        if status_code == 302:
            pass
        elif status_code == 404:
            raise APINotFoundError(f"HTTP Status Code {status_code}")
        elif status_code == 503:
            raise APIUnavailableError(f"HTTP Status Code {status_code}")
        elif status_code == 408:
            raise APITimeoutError(f"HTTP Status Code {status_code}")
        elif status_code == 401:
            raise APIUnauthorizedError(f"HTTP Status Code {status_code}")
        elif status_code == 429:
            raise APIRateLimitError(f"HTTP Status Code {status_code}")
        else:
            logger.error(
                "HTTP状态错误: {0}, URL: {1}, 尝试次数: {2}".format(status_code, url, attempt)
            )
            raise APIResponseError(f"HTTP状态错误: {status_code}")

    async def close(self):
        await self.aclient.aclose()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.aclient.aclose()
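
# ==============================================================================
# 用法示例 / Usage sketch (illustration only, not part of the original module).
# It shows the intended pattern for this client: subclass BaseCrawler, drive it
# as an async context manager so the underlying httpx.AsyncClient is closed
# automatically, and call fetch_get_json() to receive parsed JSON.
# DemoCrawler, the User-Agent header, and the httpbin.org endpoint below are
# hypothetical placeholders, not names or endpoints used by this project.
# ==============================================================================
if __name__ == "__main__":
    class DemoCrawler(BaseCrawler):
        """假设的演示爬虫 (Hypothetical demo crawler); real crawlers in this
        project add platform-specific endpoint methods on top of BaseCrawler."""

        async def fetch_example(self) -> dict:
            # 复用基类的重试与JSON解析逻辑 / Reuse the base class retry and JSON parsing logic
            return await self.fetch_get_json("https://httpbin.org/json")


    async def _demo():
        headers = {"User-Agent": "Mozilla/5.0"}  # 占位请求头 / Placeholder header
        async with DemoCrawler(crawler_headers=headers, timeout=10) as crawler:
            data = await crawler.fetch_example()
            print(data)


    asyncio.run(_demo())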