From eb3133ab8f6f4d81c1f5b7fae007294b9d774eac Mon Sep 17 00:00:00 2001
From: Evil0ctal
Date: Wed, 6 Apr 2022 01:41:56 -0700
Subject: [PATCH] =?UTF-8?q?=F0=9F=9A=80=E9=87=8D=E6=9E=84=E4=BB=A3?=
 =?UTF-8?q?=E7=A0=81?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 NOTE(review): this copy of the patch is incomplete. The remainder of
 Scraper.tiktok() and the head of API/web_api.py (its diff header, imports,
 app/find_url definitions and the clean_filename() signature) are missing
 after the `res = re.search(` line, and the added Python has been edited in
 review, so the hunk line counts and index hashes below are stale.

 API/API_logs.txt | 2 +
 API/logs.txt | 2 +
 API/scraper.py | 388 +++++++++++++++++++++++++++++++++++++++++++++++
 API/web_api.py | 195 ++++++++++++++++++++++++
 4 files changed, 587 insertions(+)
 create mode 100644 API/API_logs.txt
 create mode 100644 API/logs.txt
 create mode 100644 API/scraper.py
 create mode 100644 API/web_api.py

diff --git a/API/API_logs.txt b/API/API_logs.txt
new file mode 100644
index 0000000..25b1a65
--- /dev/null
+++ b/API/API_logs.txt
@@ -0,0 +1,2 @@
+# @Author: https://github.com/Evil0ctal/
+# @Time: 2021/11/06
\ No newline at end of file
diff --git a/API/logs.txt b/API/logs.txt
new file mode 100644
index 0000000..25b1a65
--- /dev/null
+++ b/API/logs.txt
@@ -0,0 +1,2 @@
+# @Author: https://github.com/Evil0ctal/
+# @Time: 2021/11/06
\ No newline at end of file
diff --git a/API/scraper.py b/API/scraper.py
new file mode 100644
index 0000000..7262fea
--- /dev/null
+++ b/API/scraper.py
@@ -0,0 +1,388 @@
+#!/usr/bin/env python
+# -*- encoding: utf-8 -*-
+# @Author: https://github.com/Evil0ctal/
+# @Time: 2021/11/06
+# @Update: 2022/04/05
+# @Function:
+# Core code, valued at 1 yuan (๑•̀ㅂ•́)و✧
+# Scrapes Douyin/TikTok data and returns it as a dict.
+
+
+import re
+import json
+import time
+import requests
+from retrying import retry
+
+
+class Scraper:
+    """
+    Scraper.douyin(): parse a Douyin video/album link, returns a dict.
+    Scraper.tiktok(): parse a TikTok video link, returns a dict.
+    """
+    def __init__(self):
+        # Mobile user agent used for the Douyin requests and for resolving short links.
+        self.headers = {
+            'user-agent': 'Mozilla/5.0 (Linux; Android 8.0; Pixel 2 Build/OPD3.170816.012) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Mobile Safari/537.36 Edg/87.0.664.66'
+        }
+        # Desktop-browser headers used when fetching the TikTok page itself.
+        self.tiktok_headers = {
+            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
+            "authority": "www.tiktok.com",
+            "Accept-Encoding": "gzip, deflate",
+            "Connection": "keep-alive",
+            "Host": "www.tiktok.com",
+            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) coc_coc_browser/86.0.170 Chrome/80.0.3987.170 Safari/537.36",
+        }
+
+    @retry(stop_max_attempt_number=6)
+    def douyin(self, original_url):
+        """
+        Parse a Douyin link through the iesdouyin.com item-info endpoint.
+
+        NOTE(review): every exception is caught below and turned into a
+        'failed' dict, so the @retry decorator never sees an exception to
+        retry on.
+
+        :param original_url: Douyin link (long or short form)
+        :return: dict with 'status': 'success' and album_*/video_* fields, or
+                 {'status': 'failed', 'reason': <str>, 'function': ..., 'value': ...}
+        """
+        headers = self.headers
+        try:
+            # start time, used to report how long the analysis took
+            start = time.time()
+            # Request the submitted link without following redirects.
+            r = requests.get(url=original_url, headers=headers, allow_redirects=False)
+            # 2021/12/11: Douyin redirects short links; the long URL is in the
+            # Location response header. Without that header the link is
+            # treated as a long link that already contains the video id.
+            long_url = r.headers.get('Location', original_url)
+            # Extract the numeric video id. The group is mandatory, so a URL
+            # without an id raises IndexError here instead of producing an
+            # empty id and a useless API request.
+            key = re.findall(r'video/(\d+)', long_url)[0]
+            # build the Douyin API link
+            api_url = f'https://www.iesdouyin.com/web/api/v2/aweme/iteminfo/?item_ids={key}'
+            print("正在请求抖音API链接: " + '\n' + api_url)
+            # parse the response as JSON
+            js = json.loads(requests.get(url=api_url, headers=headers).text)
+            item = js['item_list'][0]
+            author = item['author']
+            music = item['music']
+            stats = item['statistics']
+            # Fields shared by albums and videos.
+            title = str(item['desc'])
+            author_name = str(author['nickname'])
+            author_signature = str(author['signature'])
+            author_uid = str(author['uid'])
+            author_id = str(author['unique_id'])
+            if author_id == "":
+                # The author never changed their Douyin id; fall back to short_id.
+                author_id = str(author['short_id'])
+            # Some items carry no BGM link in the API data; report the string "None".
+            music_urls = music['play_url']['url_list']
+            music_url = str(music_urls[0]) if len(music_urls) > 0 else "None"
+            music_title = str(music['title'])
+            music_author = str(music['author'])
+            music_id = str(music['id'])
+            music_mid = str(music['mid'])
+            aweme_id = str(stats['aweme_id'])
+            comment_count = str(stats['comment_count'])
+            digg_count = str(stats['digg_count'])
+            play_count = str(stats['play_count'])
+            share_count = str(stats['share_count'])
+            # upload timestamp
+            create_time = str(item['create_time'])
+            # hashtags attached to the post
+            hashtags = [tag['hashtag_name'] for tag in item['text_extra']]
+            # A non-null 'images' entry marks the item as an album.
+            if item.get('images') is not None:
+                print("类型 = 图集")
+                # watermark-free image links
+                images_list = [data['url_list'][0] for data in item['images']]
+                analyze_time = format((time.time() - start), '.4f')
+                album_data = {'status': 'success',
+                              'analyze_time': (analyze_time + 's'),
+                              'url_type': 'album',
+                              'platform': 'douyin',
+                              'original_url': original_url,
+                              'api_url': api_url,
+                              'album_aweme_id': aweme_id,
+                              'album_title': title,
+                              'album_author': author_name,
+                              'album_author_signature': author_signature,
+                              'album_author_uid': author_uid,
+                              'album_author_id': author_id,
+                              'album_music': music_url,
+                              'album_music_title': music_title,
+                              'album_music_author': music_author,
+                              'album_music_id': music_id,
+                              'album_music_mid': music_mid,
+                              'album_comment_count': comment_count,
+                              'album_digg_count': digg_count,
+                              'album_play_count': play_count,
+                              'album_share_count': share_count,
+                              'album_create_time': create_time,
+                              'album_list': images_list,
+                              'album_hashtags': hashtags}
+                return album_data
+            print("类型 = 视频")
+            # watermarked video link
+            wm_video_url = str(item['video']['play_addr']['url_list'][0])
+            # Replacing 'playwm' with 'play' in that link gives the watermark-free address.
+            nwm_video_url = wm_video_url.replace('playwm', 'play')
+            # Since 2022/01/01 that address redirects; the direct link is in the Location header.
+            r = requests.get(url=nwm_video_url, headers=headers, allow_redirects=False)
+            video_url = r.headers['Location']
+            analyze_time = format((time.time() - start), '.4f')
+            video_data = {'status': 'success',
+                          'analyze_time': (analyze_time + 's'),
+                          'url_type': 'video',
+                          'platform': 'douyin',
+                          'original_url': original_url,
+                          'api_url': api_url,
+                          'video_title': title,
+                          'nwm_video_url': video_url,
+                          'wm_video_url': wm_video_url,
+                          'video_aweme_id': aweme_id,
+                          'video_author': author_name,
+                          'video_author_signature': author_signature,
+                          'video_author_uid': author_uid,
+                          'video_author_id': author_id,
+                          'video_music': music_url,
+                          'video_music_title': music_title,
+                          'video_music_author': music_author,
+                          'video_music_id': music_id,
+                          'video_music_mid': music_mid,
+                          'video_comment_count': comment_count,
+                          'video_digg_count': digg_count,
+                          'video_play_count': play_count,
+                          'video_share_count': share_count,
+                          'video_create_time': create_time,
+                          'video_hashtags': hashtags}
+            return video_data
+        except Exception as e:
+            # Report the failure as text: callers pass this dict to jsonify(),
+            # which cannot serialise an exception object.
+            return {'status': 'failed', 'reason': str(e), 'function': 'Scraper.douyin()', 'value': original_url}
+
+    @retry(stop_max_attempt_number=6)
+    def tiktok(self, original_url):
+        """
+        Parse a TikTok link.
+        :param original_url: TikTok link
+        :return: dict containing the parsed information
+        """
+        headers = self.headers
+        # start time
+        start = time.time()
+        # validate the TikTok link
+        if original_url[:12] == "https://www.":
+            original_url = original_url
+            print("目标链接: ", original_url)
+        else:
+            # read the real link from the response headers
+            response = requests.get(url=original_url, headers=headers, allow_redirects=False)
+            true_link = response.headers['Location'].split("?")[0]
+            # second kind of link TikTok may return in the response headers
+            if '.html' in true_link:
+                response = requests.get(url=true_link, headers=headers, allow_redirects=False)
+                original_url = response.headers['Location'].split("?")[0]
+                print("目标链接: ", original_url)
+        try:
+            # start fetching TikTok data
+            tiktok_headers = self.tiktok_headers
+            html = requests.get(url=original_url, headers=tiktok_headers)
+            res = re.search(' |')
+    rstr = r"[\/\\\:\*\?\"\<\>\|]"
+    # replace the characters above with underscores
+    new_title = re.sub(rstr, "_", string)
+    # new file name
+    filename = ('douyin.wtf_' + new_title + '_' + author_name).replace('\n', '')
+    return filename
+
+
+@app.route("/api", methods=["POST", "GET"])
+def webapi():
+    """
+    Parse the link passed in the `url` query parameter and return the
+    scraper result as JSON. Each request is appended to API_logs.txt.
+    """
+    api = Scraper()
+    content = request.args.get("url")
+    # request.args.get() returns None when the parameter is absent, so test
+    # truthiness rather than comparing with ''.
+    if not content:
+        return jsonify(status='failed', reason='url value cannot be empty')
+    # NOTE(review): find_url() is defined outside the visible part of this
+    # patch; it is indexed with [0], so it presumably returns a list of matches.
+    urls = find_url(content)
+    if not urls:
+        return jsonify(status='failed', reason='no url found in submitted value', function='webapi()', value=content)
+    post_content = urls[0]
+    # record the request in API_logs.txt
+    date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+    with open('API_logs.txt', 'a', encoding='utf-8') as f:
+        f.write(date + " : " + post_content + '\n')
+    # start time
+    start = time.time()
+    try:
+        if 'tiktok.com' in post_content:
+            # return the TikTok information as JSON
+            return jsonify(api.tiktok(post_content))
+        if 'douyin.com' in post_content:
+            # return the Douyin information as JSON
+            return jsonify(api.douyin(post_content))
+        # Neither platform matched: answer explicitly instead of returning
+        # None, which Flask rejects as an invalid view response.
+        return jsonify(status='failed', reason='unsupported url, expected a douyin.com or tiktok.com link',
+                       function='webapi()', value=content)
+    except Exception as e:
+        # analysis time up to the failure
+        analyze_time = (format((time.time() - start), '.4f') + 's')
+        return jsonify(status='failed', reason=str(e), time=analyze_time, function='webapi()', value=content)
+
+
+
+@app.route("/video", methods=["POST", "GET"])
+def download_video():
+    """
+    Return the watermark-free video of the submitted link as an MP4
+    attachment. The whole file is held in memory, so this is expensive
+    under load; use with care.
+    """
+    api = Scraper()
+    content = request.args.get("url")
+    try:
+        # Inside the try so a missing or URL-less parameter yields the JSON
+        # error below instead of an unhandled exception.
+        post_content = find_url(content)[0]
+        if 'douyin.com' in post_content:
+            result = api.douyin(post_content)
+        elif 'tiktok.com' in post_content:
+            result = api.tiktok(post_content)
+        else:
+            return jsonify(Status='Failed', Reason='Check submitted parameters!')
+        if result.get('status') == 'failed':
+            # pass the scraper's own error through rather than a KeyError message
+            return jsonify(result)
+        video_url = result['nwm_video_url']
+        # sanitised file name built from the video title and author nickname
+        file_name = clean_filename(result['video_title'], result['video_author'])
+        # Fetch the video bytes. `headers` must be passed by keyword: the
+        # second positional argument of requests.get() is `params`.
+        # NOTE(review): `headers` is a module-level name defined outside the
+        # visible part of this patch.
+        video_mp4 = requests.get(video_url, headers=headers).content
+        response = make_response(video_mp4)
+        response.headers['Content-Type'] = "video/mp4"
+        # Non latin-1 file names need an ASCII fallback plus an RFC 5987 `filename*`.
+        try:
+            file_name.encode('latin-1')
+        except UnicodeEncodeError:
+            fallback = unicodedata.normalize('NFKD', file_name).encode('latin-1', 'ignore').decode('latin-1')
+            filenames = {
+                'filename': fallback + '.mp4',
+                'filename*': "UTF-8''{}".format(url_quote(file_name) + '.mp4'),
+            }
+        else:
+            filenames = {'filename': file_name + '.mp4'}
+        # 'attachment' makes the browser download the response
+        response.headers.set('Content-Disposition', 'attachment', **filenames)
+        return response
+    except Exception as e:
+        return jsonify(status='failed', reason=str(e), function='download_video()', value=content)
+
+
+@app.route("/music", methods=["POST", "GET"])
+def download_music():
+    """
+    Return the background music of the submitted link as an MP3
+    attachment. The whole file is held in memory, so this is expensive
+    under load; use with care.
+    """
+    api = Scraper()
+    content = request.args.get("url")
+    try:
+        post_content = find_url(content)[0]
+        if 'douyin.com' in post_content:
+            result = api.douyin(post_content)
+        elif 'tiktok.com' in post_content:
+            # TikTok links must go through the TikTok parser, not douyin().
+            # NOTE(review): assumes tiktok() returns the same video_music* keys
+            # the original code read here; its body is not visible. Confirm.
+            result = api.tiktok(post_content)
+        else:
+            return jsonify(Status='Failed', Reason='This link has no music to get!')
+        if result.get('status') == 'failed':
+            return jsonify(result)
+        # Douyin albums expose their music under album_* keys, videos under video_*.
+        prefix = 'album' if result.get('url_type') == 'album' else 'video'
+        bgm_url = result[prefix + '_music']
+        if bgm_url == "None":
+            return jsonify(Status='Failed', Reason='This link has no music to get!')
+        # sanitised file name built from the music title and music author
+        file_name = clean_filename(result[prefix + '_music_title'], result[prefix + '_music_author'])
+        # `headers` must be passed by keyword (second positional argument is `params`).
+        video_bgm = requests.get(bgm_url, headers=headers).content
+        response = make_response(video_bgm)
+        # audio/mpeg is the registered media type for MP3
+        response.headers['Content-Type'] = "audio/mpeg"
+        # Non latin-1 file names need an ASCII fallback plus an RFC 5987 `filename*`.
+        try:
+            file_name.encode('latin-1')
+        except UnicodeEncodeError:
+            fallback = unicodedata.normalize('NFKD', file_name).encode('latin-1', 'ignore').decode('latin-1')
+            filenames = {
+                'filename': fallback + '.mp3',
+                'filename*': "UTF-8''{}".format(url_quote(file_name) + '.mp3'),
+            }
+        else:
+            filenames = {'filename': file_name + '.mp3'}
+        # 'attachment' makes the browser download the response
+        response.headers.set('Content-Disposition', 'attachment', **filenames)
+        return response
+    except Exception as e:
+        return jsonify(status='failed', reason=str(e), function='download_music()', value=content)
+
+
+if __name__ == '__main__':
+    # Start the web API on $PORT when set, otherwise on the default port 2333.
+    port = int(os.environ.get('PORT') or 2333)
+    app.run(host='0.0.0.0', port=port)
\ No newline at end of file