Use AIOHTTP instead of HTTPX

This commit is contained in:
Evil0ctal 2022-11-13 16:09:53 -08:00 committed by GitHub
parent c30986c06a
commit 58aa3286a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -2,7 +2,7 @@
# -*- encoding: utf-8 -*- # -*- encoding: utf-8 -*-
# @Author: https://github.com/Evil0ctal/ # @Author: https://github.com/Evil0ctal/
# @Time: 2021/11/06 # @Time: 2021/11/06
# @Update: 2022/11/10 # @Update: 2022/11/13
# @Version: 3.1.0 # @Version: 3.1.0
# @Function: # @Function:
# 核心代码估值1块(๑•̀ㅂ•́)و✧ # 核心代码估值1块(๑•̀ㅂ•́)و✧
@ -11,7 +11,8 @@
import re import re
import httpx import aiohttp
import platform
import asyncio import asyncio
import orjson import orjson
import traceback import traceback
@ -64,23 +65,25 @@ class Scraper:
self.tiktok_api_headers = { self.tiktok_api_headers = {
'User-Agent': 'com.ss.android.ugc.trill/2613 (Linux; U; Android 10; en_US; Pixel 4; Build/QQ3A.200805.001; Cronet/58.0.2991.0)' 'User-Agent': 'com.ss.android.ugc.trill/2613 (Linux; U; Android 10; en_US; Pixel 4; Build/QQ3A.200805.001; Cronet/58.0.2991.0)'
} }
self.app_config = configparser.ConfigParser() self.config = configparser.ConfigParser()
self.app_config.read('config.ini', encoding='utf-8') self.config.read('config.ini', encoding='utf-8')
self.api_config = self.app_config['Scraper']
# 判断是否使用代理 # 判断是否使用代理
if self.api_config['Proxy_switch'] == 'True': if self.config['Scraper']['Proxy_switch'] == 'True':
# 判断是否区别协议选择代理 # 判断是否区别协议选择代理
if self.api_config['Use_different_protocols'] == 'False': if self.config['Scraper']['Use_different_protocols'] == 'False':
self.proxies = { self.proxies = {
'all': self.api_config['All'] 'all': self.config['Scraper']['All']
} }
else: else:
self.proxies = { self.proxies = {
'http': self.api_config['Http_proxy'], 'http': self.config['Scraper']['Http_proxy'],
'https': self.api_config['Https_proxy'], 'https': self.config['Scraper']['Https_proxy'],
} }
else: else:
self.proxies = None self.proxies = None
# 针对Windows系统的异步事件规则/Asynchronous event rules for Windows systems
if platform.system() == 'Windows':
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
"""__________________________________________⬇utils(实用程序)⬇______________________________________""" """__________________________________________⬇utils(实用程序)⬇______________________________________"""
@ -94,15 +97,15 @@ class Scraper:
if len(url) > 0: if len(url) > 0:
return url[0] return url[0]
except Exception as e: except Exception as e:
print(e) print('Error in get_url:', e)
return None return None
# 转换链接/convert url # 转换链接/convert url
@retry(stop=stop_after_attempt(3), wait=wait_random(min=1, max=2)) @retry(stop=stop_after_attempt(3), wait=wait_random(min=1, max=2))
async def convert_share_urls(self, url: str) -> str or None: async def convert_share_urls(self, url: str) -> str or None:
""" """
用于从短链接中获取长链接 用于将分享链接(短链接)转换为原始链接/Convert share links (short links) to original links
:return: 长链接 :return: 原始链接/Original link
""" """
# 检索字符串中的链接/Retrieve links from string # 检索字符串中的链接/Retrieve links from string
url = self.get_url(url) url = self.get_url(url)
@ -129,14 +132,12 @@ class Scraper:
url = re.compile(r'(https://v.douyin.com/)\w+', re.I).match(url).group() url = re.compile(r'(https://v.douyin.com/)\w+', re.I).match(url).group()
print('正在通过抖音分享链接获取原始链接...') print('正在通过抖音分享链接获取原始链接...')
try: try:
async with httpx.AsyncClient(proxies=self.proxies) as client: async with aiohttp.ClientSession() as session:
response = await client.get(url, headers=self.headers, follow_redirects=False, timeout=10) async with session.get(url, headers=self.headers, proxy=self.proxies, allow_redirects=False,
if response.status_code == 302: timeout=10) as response:
# 视频链接302重定向'Location'字段 if response.status == 302:
# https://www.iesdouyin.com/share/video/7148345687535570206/ url = response.headers['Location'].split('?')[0] if '?' in response.headers[
# 用户主页链接302重定向'Location'字段 'Location'] else \
# https://www.iesdouyin.com/share/user/MS4wLjABAAAAbLMPpOhVk441et7z7ECGcmGrK42KtoWOuR0_7pLZCcyFheA9__asY-kGfNAtYqXR
url = response.headers['Location'].split('?')[0] if '?' in response.headers['Location'] else \
response.headers['Location'] response.headers['Location']
print('获取原始链接成功, 原始链接为: {}'.format(url)) print('获取原始链接成功, 原始链接为: {}'.format(url))
return url return url
@ -162,14 +163,12 @@ class Scraper:
else: else:
print('正在通过TikTok分享链接获取原始链接...') print('正在通过TikTok分享链接获取原始链接...')
try: try:
async with httpx.AsyncClient(proxies=self.proxies) as client: async with aiohttp.ClientSession() as session:
response = await client.get(url, headers=self.headers, follow_redirects=False, timeout=10) async with session.get(url, headers=self.headers, proxy=self.proxies, allow_redirects=False,
if response.status_code == 301: timeout=10) as response:
# 视频链接302重定向'Location'字段 if response.status == 301:
# https://www.tiktok.com/@tiktok/video/6950000000000000000 url = response.headers['Location'].split('?')[0] if '?' in response.headers[
# 用户主页链接302重定向'Location'字段 'Location'] else \
# https://www.tiktok.com/@tiktok
url = response.headers['Location'].split('?')[0] if '?' in response.headers['Location'] else \
response.headers['Location'] response.headers['Location']
print('获取原始链接成功, 原始链接为: {}'.format(url)) print('获取原始链接成功, 原始链接为: {}'.format(url))
return url return url
@ -230,12 +229,11 @@ class Scraper:
api_url = f"https://www.iesdouyin.com/web/api/v2/aweme/iteminfo/?item_ids={video_id}" api_url = f"https://www.iesdouyin.com/web/api/v2/aweme/iteminfo/?item_ids={video_id}"
# 访问API/Access API # 访问API/Access API
print("正在获取视频数据API: {}".format(api_url)) print("正在获取视频数据API: {}".format(api_url))
async with httpx.AsyncClient(proxies=self.proxies) as client: async with aiohttp.ClientSession() as session:
response = await client.get(api_url, headers=self.headers, timeout=5) async with session.get(api_url, headers=self.headers, proxy=self.proxies, timeout=10) as response:
# 获取返回的json数据/Get the returned json data response = await response.json()
data = orjson.loads(response.text)
# 获取视频数据/Get video data # 获取视频数据/Get video data
video_data = data['item_list'][0] video_data = response['item_list'][0]
print('获取视频数据成功!') print('获取视频数据成功!')
# print("抖音API返回数据: {}".format(video_data)) # print("抖音API返回数据: {}".format(video_data))
return video_data return video_data
@ -252,8 +250,9 @@ class Scraper:
api_url = f"https://live.douyin.com/webcast/web/enter/?aid=6383&web_rid={web_rid}" api_url = f"https://live.douyin.com/webcast/web/enter/?aid=6383&web_rid={web_rid}"
# 访问API/Access API # 访问API/Access API
print("正在获取视频数据API: {}".format(api_url)) print("正在获取视频数据API: {}".format(api_url))
async with httpx.AsyncClient(proxies=self.proxies) as client: async with aiohttp.ClientSession() as session:
response = await client.get(api_url, headers=self.douyin_cookies, timeout=5) async with session.get(api_url, headers=self.douyin_cookies, proxy=self.proxies, timeout=10) as response:
response = await response.json()
# 获取返回的json数据/Get the returned json data # 获取返回的json数据/Get the returned json data
data = orjson.loads(response.text) data = orjson.loads(response.text)
# 获取视频数据/Get video data # 获取视频数据/Get video data
@ -306,11 +305,10 @@ class Scraper:
try: try:
api_url = f'https://api-h2.tiktokv.com/aweme/v1/feed/?aweme_id={video_id}&version_code=2613&aid=1180' api_url = f'https://api-h2.tiktokv.com/aweme/v1/feed/?aweme_id={video_id}&version_code=2613&aid=1180'
print("正在获取视频数据API: {}".format(api_url)) print("正在获取视频数据API: {}".format(api_url))
async with httpx.AsyncClient(proxies=self.proxies) as client: async with aiohttp.ClientSession() as session:
response = await client.get(api_url, headers=self.tiktok_api_headers, timeout=5) async with session.get(api_url, headers=self.tiktok_api_headers, proxy=self.proxies, timeout=10) as response:
if response.content != '': response = await response.json()
data = orjson.loads(response.text) video_data = response['aweme_list'][0]
video_data = data['aweme_list'][0]
print('获取视频信息成功!') print('获取视频信息成功!')
return video_data return video_data
except Exception as e: except Exception as e:
@ -519,8 +517,10 @@ class Scraper:
"""__________________________________________⬇Test methods(测试方法)⬇______________________________________""" """__________________________________________⬇Test methods(测试方法)⬇______________________________________"""
async def async_test(douyin_url: str = None, tiktok_url: str = None): async def async_test(douyin_url: str = None, tiktok_url: str = None) -> None:
# 异步测试/Async test # 异步测试/Async test
start_time = time.time()
print("正在进行异步测试...")
print("正在测试异步获取抖音视频ID方法...") print("正在测试异步获取抖音视频ID方法...")
douyin_id = await api.get_douyin_video_id(douyin_url) douyin_id = await api.get_douyin_video_id(douyin_url)
@ -536,6 +536,10 @@ async def async_test(douyin_url: str = None, tiktok_url: str = None):
douyin_hybrid_data = await api.hybrid_parsing(douyin_url) douyin_hybrid_data = await api.hybrid_parsing(douyin_url)
tiktok_hybrid_data = await api.hybrid_parsing(tiktok_url) tiktok_hybrid_data = await api.hybrid_parsing(tiktok_url)
# 总耗时/Total time
total_time = round(time.time() - start_time, 2)
print("异步测试完成,总耗时: {}s".format(total_time))
if __name__ == '__main__': if __name__ == '__main__':
api = Scraper() api = Scraper()