diff --git a/web_api.py b/web_api.py index 1b02628..7fb9493 100644 --- a/web_api.py +++ b/web_api.py @@ -3,389 +3,700 @@ # @Author: https://github.com/Evil0ctal/ # @Time: 2021/11/06 # @Update: 2022/11/06 +# @Version: 3.0.0 # @Function: -# 用于在线批量解析Douyin/TikTok的无水印视频/图集。 -# 基于 PyWebIO,将scraper.py返回的内容显示在网页上。 +# 创建一个接受提交参数的Flask应用程序。 +# 将scraper.py返回的内容以JSON格式返回。 + import configparser +import json import os -import re +import threading import time +import zipfile + +import requests +import uvicorn +from fastapi import FastAPI +from fastapi.responses import ORJSONResponse, FileResponse +from pydantic import BaseModel +from starlette.responses import RedirectResponse + -from pywebio import * -from pywebio import config as pywebio_config -from pywebio.input import * -from pywebio.output import * -from pywebio.session import info as session_info from scraper import Scraper +# 读取配置文件 config = configparser.ConfigParser() config.read('config.ini', encoding='utf-8') +# 运行端口 +port = int(config["Web_API"]["Port"]) +# 域名 +domain = config["Web_API"]["Domain"] -# 创建一个Scraper类的实例/Create an instance of the Scraper class +# 创建FastAPI实例 +title = "Douyin TikTok Download API(api.douyin.wtf)" +version = '3.0.0' +update_time = "2022/10/31" +description = """ +#### Description/说明 +
+点击展开/Click to expand +> [中文/Chinese] +- 爬取Douyin以及TikTok的数据并返回,更多功能正在开发中。 +- 如果需要更多接口,请查看[https://api-v2.douyin.wtf/docs](https://api-v2.douyin.wtf/docs)。 +- 本项目开源在[GitHub:Douyin_TikTok_Download_API](https://github.com/Evil0ctal/Douyin_TikTok_Download_API)。 +- 全部端点数据均来自抖音以及TikTok的官方接口,如遇到问题或BUG或建议请在[issues](https://github.com/Evil0ctal/Douyin_TikTok_Download_API/issues)中反馈。 +- 本项目仅供学习交流使用,严禁用于违法用途,如有侵权请联系作者。 +> [英文/English] +- Crawl the data of Douyin and TikTok and return it. More features are under development. +- If you need more interfaces, please visit [https://api-v2.douyin.wtf/docs](https://api-v2.douyin.wtf/docs). +- This project is open source on [GitHub: Douyin_TikTok_Download_API](https://github.com/Evil0ctal/Douyin_TikTok_Download_API). +- All endpoint data comes from the official interface of Douyin and TikTok. If you have any questions or BUGs or suggestions, please feedback in [issues]( +- This project is for learning and communication only. It is strictly forbidden to be used for illegal purposes. If there is any infringement, please contact the author. +
+#### Contact author/联系作者 +
+点击展开/Click to expand +- WeChat: Evil0ctal +- Email: [Evil0ctal1985@gmail.com](mailto:Evil0ctal1985@gmail.com) +- Github: [https://github.com/Evil0ctal](https://github.com/Evil0ctal) +
+""" +tags_metadata = [ + { + "name": "Root", + "description": "Root path info.", + }, + { + "name": "API", + "description": "Hybrid interface, automatically determine the input link and return the simplified data/混合接口,自动判断输入链接返回精简后的数据。", + }, + { + "name": "Douyin", + "description": "All Douyin API Endpoints/所有抖音接口节点", + }, + { + "name": "TikTok", + "description": "All TikTok API Endpoints/所有TikTok接口节点", + }, + { + "name": "Download", + "description": "Enter the share link and return the download file response./输入分享链接后返回下载文件响应", + }, + { + "name": "iOS_Shortcut", + "description": "Get iOS shortcut info/获取iOS快捷指令信息", + }, +] + +# 创建Scraper对象 api = Scraper() +# 创建FastAPI实例 +app = FastAPI( + title=title, + description=description, + version=version, + openapi_tags=tags_metadata +) -# 自动检测语言返回翻译/Auto detect language to return translation -def t(zh: str, en: str) -> str: - return zh if 'zh' in session_info.user_language else en +""" ________________________⬇️端点响应模型(Endpoints Response Model)⬇️________________________""" -# 解析抖音分享口令中的链接并返回列表/Parse the link in the Douyin share command and return a list -def find_url(string: str) -> list: - url = re.findall('http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', string) - return url +# API Root节点 +class APIRoot(BaseModel): + API_status: str + Version: str = version + Update_time: str = update_time + API_V1_Document: str + API_V2_Document: str + GitHub: str -# 校验输入值/Validate input value -def valid_check(input_data: str) -> str or None: - # 检索出所有链接并返回列表/Retrieve all links and return a list - url_list = find_url(input_data) - # 总共找到的链接数量/Total number of links found - total_urls = len(url_list) - if total_urls == 0: - return t('没有检测到有效的链接,请检查输入的内容是否正确。', - 'No valid link detected, please check if the input content is correct.') +# API获取视频基础模型 +class iOS_Shortcut(BaseModel): + version: str = None + update: str = None + link: str = None + link_en: str = None + note: str = None + note_en: str = None + + +# API获取视频基础模型 +class API_Video_Response(BaseModel): + status: str = None + platform: str = None + endpoint: str = None + message: str = None + total_time: float = None + aweme_list: list = None + + +# 混合解析API基础模型: +class API_Hybrid_Response(BaseModel): + status: str = None + message: str = None + endpoint: str = None + url: str = None + type: str = None + platform: str = None + aweme_id: str = None + total_time: float = None + official_api_url: dict = None + desc: str = None + create_time: int = None + author: dict = None + music: dict = None + statistics: dict = None + cover_data: dict = None + hashtags: list = None + video_data: dict = None + image_data: dict = None + + +# 混合解析API精简版基础模型: +class API_Hybrid_Minimal_Response(BaseModel): + status: str = None + message: str = None + platform: str = None + type: str = None + wm_video_url: str = None + wm_video_url_HQ: str = None + nwm_video_url: str = None + nwm_video_url_HQ: str = None + no_watermark_image_list: list or None = None + watermark_image_list: list or None = None + + +""" ________________________⬇️端点日志记录(Endpoint logs)⬇️________________________""" + + +# 记录API请求日志 +async def api_logs(start_time, input_data, endpoint, error_data: dict = None): + if config["Web_API"]["Allow_Logs"] == "True": + time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + total_time = float(format(time.time() - start_time, '.4f')) + file_name = "API_logs.json" + # 写入日志内容 + with open(file_name, "a", encoding="utf-8") as f: + data = { + "time": time_now, + "endpoint": f'/{endpoint}/', + "total_time": total_time, + "input_data": input_data, + "error_data": error_data if error_data else "No error" + } + f.write(json.dumps(data, ensure_ascii=False) + ",\n") + print('日志记录成功!') + return 1 else: - # 最大接受提交URL的数量/Maximum number of URLs accepted - max_urls = config['Web_APP']['Max_Take_URLs'] - if total_urls > int(max_urls): - warn_info = t('URL数量过多,只会处理前{}个URL。'.format(max_urls), - 'Too many URLs, only the first {} URLs will be processed.'.format(max_urls)) - return warn_info + print('日志记录已关闭!') + return 0 + + + +""" ________________________⬇️Root端点(Root endpoint)⬇️________________________""" + + +# Root端点 +@app.get("/", response_model=APIRoot, tags=["Root"]) +async def root(): + """ + Root path info. + """ + data = { + "API_status": "Running", + "Version": version, + "Update_time": update_time, + "API_V1_Document": "https://api.douyin.wtf/docs", + "API_V2_Document": "https://api-v2.douyin.wtf/docs", + "GitHub": "https://github.com/Evil0ctal/Douyin_TikTok_Download_API", + } + return ORJSONResponse(data) + + +""" ________________________⬇️混合解析端点(Hybrid parsing endpoints)⬇️________________________""" + + +# 混合解析端点,自动判断输入链接返回精简后的数据 +# Hybrid parsing endpoint, automatically determine the input link and return the simplified data. +@app.get("/api", tags=["API"], response_model=API_Hybrid_Response) +async def hybrid_parsing(url: str, minimal: bool = False): + """ + ## 用途/Usage + - 获取[抖音|TikTok]单个视频数据,参数是视频链接或分享口令。 + - Get [Douyin|TikTok] single video data, the parameter is the video link or share code. + ## 参数/Parameter + #### url(必填/Required)): + - 视频链接。| 分享口令 + - The video link.| Share code + - 例子/Example: + `https://www.douyin.com/video/7153585499477757192` + `https://v.douyin.com/MkmSwy7/` + `https://vm.tiktok.com/TTPdkQvKjP/` + `https://www.tiktok.com/@tvamii/video/7045537727743380782` + #### minimal(选填/Optional Default:False): + - 是否返回精简版数据。 + - Whether to return simplified data. + - 例子/Example: + `True` + `False` + ## 返回值/Return + - 用户当个视频数据的列表,列表内包含JSON数据。 + - List of user single video data, list contains JSON data. + """ + print("正在进行混合解析...") + # 开始时间 + start_time = time.time() + # 获取数据 + data = api.hybrid_parsing(url) + # 是否精简 + if minimal: + result = api.hybrid_parsing_minimal(data) + else: + # 更新数据 + result = { + 'url': url, + "endpoint": "/api/", + "total_time": float(format(time.time() - start_time, '.4f')), + } + # 合并数据 + result.update(data) + # 记录API调用 + await api_logs(start_time=start_time, + input_data={'url': url}, + endpoint='api') + return ORJSONResponse(result) + + +""" ________________________⬇️抖音视频解析端点(Douyin video parsing endpoint)⬇️________________________""" + + +# 获取抖音单个视频数据/Get Douyin single video data +@app.get("/douyin_video_data/", response_model=API_Video_Response, tags=["Douyin"]) +async def get_douyin_video_data(douyin_video_url: str = None, video_id: str = None): + """ + ## 用途/Usage + - 获取抖音用户单个视频数据,参数是视频链接|分享口令 + - Get the data of a single video of a Douyin user, the parameter is the video link. + ## 参数/Parameter + #### douyin_video_url(选填/Optional): + - 视频链接。| 分享口令 + - The video link.| Share code + - 例子/Example: + `https://www.douyin.com/video/7153585499477757192` + `https://v.douyin.com/MkmSwy7/` + #### video_id(选填/Optional): + - 视频ID,可以从视频链接中获取。 + - The video ID, can be obtained from the video link. + - 例子/Example: + `7153585499477757192` + #### 备注/Note: + - 参数`douyin_video_url`和`video_id`二选一即可,如果都填写,优先使用`video_id`以获得更快的响应速度。 + - The parameters `douyin_video_url` and `video_id` can be selected, if both are filled in, the `video_id` is used first to get a faster response speed. + ## 返回值/Return + - 用户当个视频数据的列表,列表内包含JSON数据。 + - List of user single video data, list contains JSON data. + """ + if video_id is None or video_id == '': + # 获取视频ID + video_id = api.get_douyin_video_id(douyin_video_url) + if video_id is None: + result = { + "status": "failed", + "platform": "douyin", + "message": "video_id获取失败/Failed to get video_id", + } + return ORJSONResponse(result) + if video_id is not None and video_id != '': + # 开始时间 + start_time = time.time() + print('获取到的video_id数据:{}'.format(video_id)) + if video_id is not None: + video_data = api.get_douyin_video_data(video_id=video_id) + if video_data is None: + result = { + "status": "failed", + "platform": "douyin", + "endpoint": "/douyin_video_data/", + "message": "视频API数据获取失败/Failed to get video API data", + } + return ORJSONResponse(result) + # print('获取到的video_data:{}'.format(video_data)) + # 记录API调用 + await api_logs(start_time=start_time, + input_data={'douyin_video_url': douyin_video_url, 'video_id': video_id}, + endpoint='douyin_video_data') + # 结束时间 + total_time = float(format(time.time() - start_time, '.4f')) + # 返回数据 + result = { + "status": "success", + "platform": "douyin", + "endpoint": "/douyin_video_data/", + "message": "获取视频数据成功/Got video data successfully", + "total_time": total_time, + "aweme_list": [video_data] + } + return ORJSONResponse(result) else: - # 对每一个链接进行校验/Verify each link - for i in url_list: - if 'douyin.com' or 'tiktok.com' in i: - return None - else: - warn_info = t('请确保输入链接均为[抖音|TikTok]链接, 请移除输入值:{}'.format(i), - 'Please make sure that the input link is a [Douyin|TikTok] link, please remove the input value: {}'.format( - i)) - return warn_info + print('获取抖音video_id失败') + result = { + "status": "failed", + "platform": "douyin", + "endpoint": "/douyin_video_data/", + "message": "获取视频ID失败/Failed to get video ID", + "total_time": 0, + "aweme_list": [] + } + return ORJSONResponse(result) -# 错误处理/Error handling -def error_do(reason: str, value: str) -> None: - # 输出一个毫无用处的信息 - put_html("
") - put_error( - t("发生了了意料之外的错误,输入值已被记录。", "An unexpected error occurred, the input value has been recorded.")) - put_html('

⚠{}

'.format(t('详情', 'Details'))) - put_table([ - [t('原因', 'reason'), t('输入值', 'input value')], - [reason, value]]) - put_markdown(t('可能的原因:', 'Possible reasons:')) - put_markdown(t('服务器可能被目标主机的防火墙限流(稍等片刻后再次尝试)', - 'The server may be limited by the target host firewall (try again after a while)')) - put_markdown(t('输入了错误的链接(API-V1暂不支持主页链接解析)', - 'Entered the wrong link (the home page link is not supported for parsing with API-V1)')) - put_markdown( - t('如果需要解析个人主页,请使用API-V2', 'If you need to parse the personal homepage, please use API-V2')) - put_markdown(t('API-V2 文档: [https://api-v2.douyin.wtf/docs](https://api-v2.douyin.wtf/docs)', - 'API-V2 Documentation: [https://api-v2.douyin.wtf/docs](https://api-v2.douyin.wtf/docs)')) - put_markdown(t('该视频已经被删除或屏蔽(你看的都是些啥(⊙_⊙)?)', - 'The video has been deleted or blocked (what are you watching (⊙_⊙)?)')) - put_markdown(t('其他原因(请联系作者)', 'Other reasons (please contact the author)')) - put_markdown(t('你可以在右上角的关于菜单中查看本站错误日志。', - 'You can view the error log of this site in the about menu in the upper right corner.')) - put_markdown('[{}](https://github.com/Evil0ctal/Douyin_TikTok_Download_API/issues)'.format( - t('点击此处在GitHub上进行反馈', 'Click here to give feedback on GitHub'))) - put_html("
") - if config['Web_APP']['Allow_Logs'] == 'True': - # 将错误记录在logs.txt中 - error_date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - print(f"{error_date}: 正在记录错误信息...") - with open('logs.txt', 'a') as f: - f.write(error_date + ":\n" + str(reason) + '\n' + "Input value: " + value + '\n') +""" ________________________⬇️TikTok视频解析端点(TikTok video parsing endpoint)⬇️________________________""" -# iOS快捷指令弹窗/IOS shortcut pop-up -def ios_pop_window(): - with popup(t("iOS快捷指令", "iOS Shortcut")): - version = config["Web_API"]["iOS_Shortcut_Version"] - update = config["Web_API"]['iOS_Shortcut_Update_Time'] - link = config["Web_API"]['iOS_Shortcut_Link'] - link_en = config["Web_API"]['iOS_Shortcut_Link_EN'] - note = config["Web_API"]['iOS_Shortcut_Update_Note'] - note_en = config["Web_API"]['iOS_Shortcut_Update_Note_EN'] - put_markdown(t('#### 📢 快捷指令介绍:', '#### 📢 Shortcut Introduction:')) - put_markdown( - t('快捷指令运行在iOS平台,本快捷指令可以快速调用本项目的公共API将抖音或TikTok的视频或图集下载到你的手机相册中,暂时只支持单个链接进行下载。', - 'The shortcut runs on the iOS platform, and this shortcut can quickly call the public API of this project to download the video or album of Douyin or TikTok to your phone album. It only supports single link download for now.')) - put_markdown(t('#### 📲 使用方法 ①:', '#### 📲 Operation method ①:')) - put_markdown(t('在抖音或TikTok的APP内,浏览你想要无水印保存的视频或图集。', - 'The shortcut needs to be used in the Douyin or TikTok app, browse the video or album you want to save without watermark.')) - put_markdown(t('然后点击右下角分享按钮,选择更多,然后下拉找到 "抖音TikTok无水印下载" 这个选项。', - 'Then click the share button in the lower right corner, select more, and then scroll down to find the "Douyin TikTok No Watermark Download" option.')) - put_markdown(t('如遇到通知询问是否允许快捷指令访问xxxx (域名或服务器),需要点击允许才可以正常使用。', - 'If you are asked whether to allow the shortcut to access xxxx (domain name or server), you need to click Allow to use it normally.')) - put_markdown(t('该快捷指令会在你相册创建一个新的相薄方便你浏览保存的内容。', - 'The shortcut will create a new album in your photo album to help you browse the saved content.')) - put_markdown(t('#### 📲 使用方法 ②:', '#### 📲 Operation method ②:')) - put_markdown(t('在抖音或TikTok的视频下方点击分享,然后点击复制链接,然后去快捷指令APP中运行该快捷指令。', - 'Click share below the video of Douyin or TikTok, then click to copy the link, then go to the shortcut command APP to run the shortcut command.')) - put_markdown(t('如果弹窗询问是否允许读取剪切板请同意,随后快捷指令将链接内容保存至相册中。', - 'if the pop-up window asks whether to allow reading the clipboard, please agree, and then the shortcut command will save the link content to the album middle.')) - put_html('
') - put_text(t(f"最新快捷指令版本: {version}", f"Latest shortcut version: {version}")) - put_text(t(f"快捷指令更新时间: {update}", f"Shortcut update time: {update}")) - put_text(t(f"快捷指令更新内容: {note}", f"Shortcut update content: {note_en}")) - put_link("[点击获取快捷指令 - 中文]", link, new_window=True) - put_html("
") - put_link("[Click get Shortcut - English]", link_en, new_window=True) +# 获取TikTok单个视频数据/Get TikTok single video data +@app.get("/tiktok_video_data/", response_class=ORJSONResponse, response_model=API_Video_Response, tags=["TikTok"]) +async def get_tiktok_video_data(tiktok_video_url: str = None, video_id: str = None): + """ + ## 用途/Usage + - 获取单个视频数据,参数是视频链接| 分享口令。 + - Get single video data, the parameter is the video link. + ## 参数/Parameter + #### tiktok_video_url(选填/Optional): + - 视频链接。| 分享口令 + - The video link.| Share code + - 例子/Example: + `https://www.tiktok.com/@evil0ctal/video/7156033831819037994` + `https://vm.tiktok.com/TTPdkQvKjP/` + #### video_id(选填/Optional): + - 视频ID,可以从视频链接中获取。 + - The video ID, can be obtained from the video link. + - 例子/Example: + `7156033831819037994` + #### 备注/Note: + - 参数`tiktok_video_url`和`video_id`二选一即可,如果都填写,优先使用`video_id`以获得更快的响应速度。 + - The parameters `tiktok_video_url` and `video_id` can be selected, if both are filled in, the `video_id` is used first to get a faster response speed. + ## 返回值/Return + - 用户当个视频数据的列表,列表内包含JSON数据。 + - List of user single video data, list contains JSON data. + """ + # 开始时间 + start_time = time.time() + if video_id is None or video_id == "": + video_id = api.get_tiktok_video_id(tiktok_video_url) + if video_id is None: + return ORJSONResponse({"status": "fail", "platform": "tiktok", "endpoint": "/tiktok_video_data/", + "message": "获取视频ID失败/Get video ID failed"}) + if video_id is not None and video_id != '': + print('开始解析单个TikTok视频数据') + video_data = api.get_tiktok_video_data(video_id) + # TikTok的API数据如果为空或者返回的数据中没有视频数据,就返回错误信息 + # If the TikTok API data is empty or there is no video data in the returned data, an error message is returned + if video_data is None or video_data.get('aweme_id') != video_id: + print('视频数据获取失败/Failed to get video data') + result = { + "status": "failed", + "platform": "tiktok", + "endpoint": "/tiktok_video_data/", + "message": "视频数据获取失败/Failed to get video data" + } + return ORJSONResponse(result) + # 记录API调用 + await api_logs(start_time=start_time, + input_data={'tiktok_video_url': tiktok_video_url, 'video_id': video_id}, + endpoint='tiktok_video_data') + # 结束时间 + total_time = float(format(time.time() - start_time, '.4f')) + # 返回数据 + result = { + "status": "success", + "platform": "tiktok", + "endpoint": "/tiktok_video_data/", + "message": "获取视频数据成功/Got video data successfully", + "total_time": total_time, + "aweme_list": [video_data] + } + return ORJSONResponse(result) + else: + print('视频链接错误/Video link error') + result = { + "status": "failed", + "platform": "tiktok", + "endpoint": "/tiktok_video_data/", + "message": "视频链接错误/Video link error" + } + return ORJSONResponse(result) -# API文档弹窗/API documentation pop-up -def api_document_pop_window(): - with popup(t("API文档", "API Document")): - put_markdown(t("💾API-V2文档", "💾API-V2 Document")) - put_markdown(t('API-V2 支持抖音和TikTok的更多接口, 如主页解析,视频解析,视频评论解析,个人点赞列表解析等...', - 'API-V2 supports more interfaces of Douyin and TikTok, such as home page parsing, video parsing, video comment parsing, personal like list parsing, etc...')) - put_link('[API-V2 Docs]', 'https://api-v2.douyin.wtf/docs', new_window=True) - put_html('
') - put_markdown(t("💽API-V1文档", "💽API-V1 Document")) - put_markdown(t("API-V1 支持抖音和TikTok的单一视频解析,具体请查看接口文档。", - "API-V1 supports single video parsing of Douyin and TikTok. For details, please refer to the API documentation.")) - put_link('[API-V1 Docs]', 'https://api.douyin.wtf/docs', new_window=True) +""" ________________________⬇️iOS快捷指令更新端点(iOS Shortcut update endpoint)⬇️________________________""" -# 日志文件弹窗/Log file pop-up -def log_popup_window(): - with popup(t('错误日志', 'Error Log')): - put_html('

⚠️{}

'.format('关于解析失败可能的原因', 'About the possible reasons for parsing failure')) - put_markdown(t('服务器可能被目标主机的防火墙限流(稍等片刻后再次尝试)', - 'The server may be limited by the target host firewall (try again after a while)')) - put_markdown(t('输入了错误的链接(API-V1暂不支持主页链接解析)', - 'Entered the wrong link (the home page link is not supported for parsing with API-V1)')) - put_markdown( - t('如果需要解析个人主页,请使用API-V2', 'If you need to parse the personal homepage, please use API-V2')) - put_markdown(t('API-V2 文档: [https://api-v2.douyin.wtf/docs](https://api-v2.douyin.wtf/docs)', - 'API-V2 Documentation: [https://api-v2.douyin.wtf/docs](https://api-v2.douyin.wtf/docs)')) - put_markdown(t('该视频已经被删除或屏蔽(你看的都是些啥(⊙_⊙)?)', - 'The video has been deleted or blocked (what are you watching (⊙_⊙)?)')) - put_markdown(t('[点击此处在GitHub上进行反馈](https://github.com/Evil0ctal/Douyin_TikTok_Download_API/issues)', - '[Click here to feedback on GitHub](https://github.com/Evil0ctal/Douyin_TikTok_Download_API/issues)')) - put_html('
') - # 判断日志文件是否存在 - if os.path.exists('logs.txt'): - put_text(t('点击logs.txt可下载日志:', 'Click logs.txt to download the log:')) - content = open(r'./logs.txt', 'rb').read() - put_file('logs.txt', content=content) - with open('./logs.txt', 'r') as f: - content = f.read() - put_text(str(content)) +@app.get("/ios", response_model=iOS_Shortcut, tags=["iOS_Shortcut"]) +async def Get_Shortcut(): + data = { + 'version': config["Web_API"]["iOS_Shortcut_Version"], + 'update': config["Web_API"]['iOS_Shortcut_Update_Time'], + 'link': config["Web_API"]['iOS_Shortcut_Link'], + 'link_en': config["Web_API"]['iOS_Shortcut_Link_EN'], + 'note': config["Web_API"]['iOS_Shortcut_Update_Note'], + 'note_en': config["Web_API"]['iOS_Shortcut_Update_Note_EN'], + } + return ORJSONResponse(data) + + +""" ________________________⬇️下载文件端点/函数(Download file endpoints/functions)⬇️________________________""" + + +# 下载文件端点/Download file endpoint +@app.get("/download", tags=["Download"]) +async def download_file_hybrid(url: str, prefix: bool = True, watermark: bool = False): + """ + ## 用途/Usage + ### [中文] + - 将[抖音|TikTok]链接作为参数提交至此端点,返回[视频|图片]文件下载请求。 + ### [English] + - Submit the [Douyin|TikTok] link as a parameter to this endpoint and return the [video|picture] file download request. + # 参数/Parameter + - url:str -> [Douyin|TikTok] [视频|图片] 链接/ [Douyin|TikTok] [video|image] link + - prefix: bool -> [True/False] 是否添加前缀/Whether to add a prefix + - watermark: bool -> [True/False] 是否添加水印/Whether to add a watermark + """ + # 是否开启此端点/Whether to enable this endpoint + if config["Web_API"]["Download_Switch"] != "True": + return ORJSONResponse({"status": "endpoint closed", + "message": "此端点已关闭请在配置文件中开启/This endpoint is closed, please enable it in the configuration file"}) + # 开始时间 + start_time = time.time() + headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' + } + data = api.hybrid_parsing(url) + if data is None: + return ORJSONResponse(data) + else: + # 记录API调用 + await api_logs(start_time=start_time, + input_data={'url': url}, + endpoint='download') + url_type = data.get('type') + platform = data.get('platform') + aweme_id = data.get('aweme_id') + file_name_prefix = config["Web_API"]["File_Name_Prefix"] if prefix else '' + root_path = config["Web_API"]["Download_Path"] + # 查看目录是否存在,不存在就创建 + if not os.path.exists(root_path): + os.makedirs(root_path) + if url_type == 'video': + file_name = file_name_prefix + platform + '_' + aweme_id + '.mp4' if not watermark else file_name_prefix + platform + '_' + aweme_id + '_watermark' + '.mp4' + url = data.get('video_data').get('nwm_video_url_HQ') if not watermark else data.get('video_data').get('wm_video_url') + print('url: ', url) + file_path = root_path + "/" + file_name + print('file_path: ', file_path) + # 判断文件是否存在,存在就直接返回、 + if os.path.exists(file_path): + print('文件已存在,直接返回') + return FileResponse(path=file_path, media_type='video/mp4', filename=file_name) + else: + if platform == 'douyin': + r = requests.get(url=url, headers=headers, allow_redirects=False).headers + cdn_url = r.get('location') + r = requests.get(cdn_url).content + elif platform == 'tiktok': + r = requests.get(url=url, headers=headers).content + with open(file_path, 'wb') as f: + f.write(r) + return FileResponse(path=file_path, media_type='video/mp4', filename=file_name) + elif url_type == 'image': + url = data.get('image_data').get('no_watermark_image_list') if not watermark else data.get('image_data').get('watermark_image_list') + print('url: ', url) + zip_file_name = file_name_prefix + platform + '_' + aweme_id + '_images.zip' if not watermark else file_name_prefix + platform + '_' + aweme_id + '_images_watermark.zip' + zip_file_path = root_path + "/" + zip_file_name + print('zip_file_name: ', zip_file_name) + print('zip_file_path: ', zip_file_path) + # 判断文件是否存在,存在就直接返回、 + if os.path.exists(zip_file_path): + print('文件已存在,直接返回') + return FileResponse(path=zip_file_path, media_type='zip', filename=zip_file_name) + file_path_list = [] + for i in url: + r = requests.get(url=i, headers=headers) + content_type = r.headers.get('content-type') + file_format = content_type.split('/')[1] + r = r.content + index = int(url.index(i)) + file_name = file_name_prefix + platform + '_' + aweme_id + '_' + str(index + 1) + '.' + file_format if not watermark else \ + file_name_prefix + platform + '_' + aweme_id + '_' + str(index + 1) + '_watermark' + '.' + file_format + file_path = root_path + "/" + file_name + file_path_list.append(file_path) + print('file_path: ', file_path) + with open(file_path, 'wb') as f: + f.write(r) + if len(url) == len(file_path_list): + zip_file = zipfile.ZipFile(zip_file_path, 'w') + for f in file_path_list: + zip_file.write(os.path.join(f), f, zipfile.ZIP_DEFLATED) + zip_file.close() + return FileResponse(path=zip_file_path, media_type='zip', filename=zip_file_name) else: - put_text(t('日志文件不存在,请等发生错误时再回来看看。', - 'The log file does not exist, please come back and take a look when an error occurs.')) + return ORJSONResponse(data) -# 关于弹窗/About pop-up -def about_popup_window(): - with popup(t('更多信息', 'More Information')): - put_html('

👀{}

'.format(t('访问记录', 'Visit Record'))) - put_image('https://views.whatilearened.today/views/github/evil0ctal/TikTokDownload_PyWebIO.svg', - title='访问记录') - put_html('
') - put_html('

⭐Github

') - put_markdown('[Douyin_TikTok_Download_API](https://github.com/Evil0ctal/Douyin_TikTok_Download_API)') - put_html('
') - put_html('

🎯{}

'.format(t('反馈', 'Feedback'))) - put_markdown('{}:[issues](https://github.com/Evil0ctal/Douyin_TikTok_Download_API/issues)'.format( - t('Bug反馈', 'Bug Feedback'))) - put_html('
') - put_html('

💖WeChat

') - put_markdown('WeChat:[Evil0ctal](https://mycyberpunk.com/)') - put_html('
') +# 批量下载文件端点/Batch download file endpoint +@app.get("/batch_download", tags=["Download"]) +async def batch_download_file(url_list: str, prefix: bool = True): + """ + 批量下载文件端点/Batch download file endpoint + 未完工/Unfinished + """ + print('url_list: ', url_list) + return ORJSONResponse({"status": "failed", + "message": "嘿嘿嘿,这个功能还没做呢,等我有空再做吧/Hehehe, this function hasn't been done yet, I'll do it when I have time"}) -# 网站标题/Website title -title = config['Web_APP']['Web_Title'] - -# 网站描述/Website description -description = config['Web_APP']['Web_Description'] +# 抖音链接格式下载端点(video)/Douyin link format download endpoint(video) +@app.get("/video/{aweme_id}", tags=["Download"]) +async def download_douyin_video(aweme_id: str, prefix: bool = True, watermark: bool = False): + """ + ## 用途/Usage + ### [中文] + - 将抖音域名改为当前服务器域名即可调用此端点,返回[视频|图片]文件下载请求。 + - 例如原链接:https://douyin.com/video/1234567890123456789 改成 https://api.douyin.wtf/video/1234567890123456789 即可调用此端点。 + ### [English] + - Change the Douyin domain name to the current server domain name to call this endpoint and return the video file download request. + - For example, the original link: https://douyin.com/video/1234567890123456789 becomes https://api.douyin.wtf/video/1234567890123456789 to call this endpoint. + # 参数/Parameter + - aweme_id:str -> 抖音视频ID/Douyin video ID + - prefix: bool -> [True/False] 是否添加前缀/Whether to add a prefix + - watermark: bool -> [True/False] 是否添加水印/Whether to add a watermark + """ + # 是否开启此端点/Whether to enable this endpoint + if config["Web_API"]["Download_Switch"] != "True": + return ORJSONResponse({"status": "endpoint closed", + "message": "此端点已关闭请在配置文件中开启/This endpoint is closed, please enable it in the configuration file"}) + video_url = f"https://www.douyin.com/video/{aweme_id}" + download_url = f"{domain}/download?url={video_url}&prefix={prefix}&watermark={watermark}" + return RedirectResponse(download_url) -# 网站设置/Website settings -# web_config = pywebio_config(title=title, description=description) -# 程序入口/Main interface -@pywebio_config(title=title, description=description, theme='minty') -def main(): - # 关键字信息 - keywords = config['Web_APP']['Keywords'] - # 设置favicon - favicon_url = "https://raw.githubusercontent.com/Evil0ctal/Douyin_TikTok_Download_API/main/favicon/android-chrome-512x512.png" - session.run_js(""" - $('head').append(''); - $('#favicon32,#favicon16').remove(); - $('head').append('') - """ % favicon_url) - # 设置Keywords - session.run_js(""" - $('head').append('') - """.format(keywords)) - # 修改footer - session.run_js("""$('footer').remove()""") - # 网站标题/Website title - title = t(config['Web_APP']['Web_Title'], config['Web_APP']['Web_Title_English']) - put_markdown("""
😼{}
""".format(title)) - put_html('
') - put_row( - [put_button(t("iOS快捷指令", 'iOS Shortcuts'), onclick=lambda: ios_pop_window(), link_style=True, small=True), - put_button("API", onclick=lambda: api_document_pop_window(), link_style=True, small=True), - put_button(t("日志", "Log"), onclick=lambda: log_popup_window(), link_style=True, small=True), - put_button(t("关于", 'About'), onclick=lambda: about_popup_window(), link_style=True, small=True) - ]) - placeholder = t( - "批量解析请直接粘贴多个口令或链接,无需使用符号分开,支持抖音和TikTok链接混合,暂时不支持作者主页链接批量解析。", - "Batch parsing, please paste multiple passwords or links directly, no need to use symbols to separate, support for mixing Douyin and TikTok links, temporarily not support for author home page link batch parsing.") - input_data = textarea(t('请将抖音或TikTok的分享口令或网址粘贴于此', - "Please paste the share code or URL of [Douyin|TikTok] here"), - type=TEXT, - validate=valid_check, required=True, - placeholder=placeholder, - position=0) - url_lists = find_url(input_data) - # 解析开始时间 - start = time.time() - # 成功/失败统计 - success_count = 0 - failed_count = 0 - # 解析成功的url - success_list = [] - # 解析失败的url - failed_list = [] - # 输出一个提示条 - with use_scope('loading_text'): - # 输出一个分行符 - put_row([put_html('
')]) - put_warning(t('Server酱正收到你输入的链接啦!(◍•ᴗ•◍)\n请稍等片刻...', - 'ServerChan is receiving your input link! (◍•ᴗ•◍)\nPlease wait a moment...')) - # 遍历链接列表 - for url in url_lists: - # 链接编号 - url_index = url_lists.index(url) + 1 - # 解析 - data = api.hybrid_parsing(video_url=url) - # 判断是否解析成功/失败 - status = True if data.get('status') == 'success' else False - # 如果解析成功 - if status: - # 创建一个视频/图集的公有变量 - url_type = t('视频', 'Video') if data.get('type') == 'video' else t('图片', 'Image') - platform = data.get('platform') - table_list = [[t('类型', 'type'), t('内容', 'content')], - [t('解析类型', 'Type'), url_type], - [t('平台', 'Platform'), platform], - [f'{url_type} ID', data.get('aweme_id')], - [t(f'{url_type}描述', 'Description'), data.get('desc')], - [t('作者昵称', 'Author nickname'), data.get('author').get('nickname')], - [t('作者ID', 'Author ID'), data.get('author').get('unique_id')], - [t('API链接', 'API URL'), put_link(t('点击查看', 'Click to view'), - f"{config['Web_API']['Domain']}/api?url={url}&minimal=false", - new_window=True)], - [t('API链接-精简', 'API URL-Minimal'), put_link(t('点击查看', 'Click to view'), - f"{config['Web_API']['Domain']}/api?url={url}&minimal=true", - new_window=True)] - ] - # 如果是视频/If it's video - if url_type == t('视频', 'Video'): - # 添加视频信息 - table_list.insert(4, [t('视频链接-水印', 'Video URL-Watermark'), - put_link(t('点击查看', 'Click to view'), - data.get('video_data').get('wm_video_url_HQ'), new_window=True)]) - table_list.insert(5, [t('视频链接-无水印', 'Video URL-No Watermark'), - put_link(t('点击查看', 'Click to view'), - data.get('video_data').get('nwm_video_url_HQ'), new_window=True)]) - table_list.insert(6, [t('视频下载-水印', 'Video Download-Watermark'), - put_link(t('点击下载', 'Click to download'), - f"{config['Web_API']['Domain']}/download?url={url}&prefix=true&watermark=true", - new_window=True)]) - table_list.insert(6, [t('视频下载-无水印', 'Video Download-No-Watermark'), - put_link(t('点击下载', 'Click to download'), - f"{config['Web_API']['Domain']}/download?url={url}&prefix=true&watermark=false", - new_window=True)]) - # 如果是图片/If it's image - elif url_type == t('图片', 'Image'): - # 添加图片下载链接 - table_list.insert(4, [t('图片打包下载-水印', 'Download images ZIP-Watermark'), - put_link(t('点击下载', 'Click to download'), - f"{config['Web_API']['Domain']}/download?url={url}&prefix=true&watermark=true", - new_window=True)]) - table_list.insert(5, [t('图片打包下载-无水印', 'Download images ZIP-No-Watermark'), - put_link(t('点击下载', 'Click to download'), - f"{config['Web_API']['Domain']}/download?url={url}&prefix=true&watermark=false", - new_window=True)]) - # 添加图片信息 - no_watermark_image_list = data.get('image_data').get('no_watermark_image_list') - for image in no_watermark_image_list: - table_list.append([t('图片直链: ', 'Image URL:'), - put_link(t('点击打开图片', 'Click to open image'), image, new_window=True)]) - table_list.append([t('图片预览(如格式可显示): ', 'Image preview (if the format can be displayed):'), - put_image(image, width='50%', height='50%')]) - # 向网页输出表格/Put table on web page - with use_scope(str(url_index)): - # 显示进度 - put_info(t(f'正在解析第{url_index}个链接: {url}', f'Parsing the {url_index}th link: {url}')) - put_table(table_list) - put_html('
') - scroll_to(str(url_index)) - success_count += 1 - success_list.append(url) - # print(f'success_count: {success_count}, success_list: {success_list}') - # 如果解析失败/Failed to parse +# 抖音链接格式下载端点(video)/Douyin link format download endpoint(video) +@app.get("/note/{aweme_id}", tags=["Download"]) +async def download_douyin_video(aweme_id: str, prefix: bool = True, watermark: bool = False): + """ + ## 用途/Usage + ### [中文] + - 将抖音域名改为当前服务器域名即可调用此端点,返回[视频|图片]文件下载请求。 + - 例如原链接:https://douyin.com/video/1234567890123456789 改成 https://api.douyin.wtf/video/1234567890123456789 即可调用此端点。 + ### [English] + - Change the Douyin domain name to the current server domain name to call this endpoint and return the video file download request. + - For example, the original link: https://douyin.com/video/1234567890123456789 becomes https://api.douyin.wtf/video/1234567890123456789 to call this endpoint. + # 参数/Parameter + - aweme_id:str -> 抖音视频ID/Douyin video ID + - prefix: bool -> [True/False] 是否添加前缀/Whether to add a prefix + - watermark: bool -> [True/False] 是否添加水印/Whether to add a watermark + """ + # 是否开启此端点/Whether to enable this endpoint + if config["Web_API"]["Download_Switch"] != "True": + return ORJSONResponse({"status": "endpoint closed", + "message": "此端点已关闭请在配置文件中开启/This endpoint is closed, please enable it in the configuration file"}) + video_url = f"https://www.douyin.com/video/{aweme_id}" + download_url = f"{domain}/download?url={video_url}&prefix={prefix}&watermark={watermark}" + return RedirectResponse(download_url) + + +# 抖音链接格式下载端点/Douyin link format download endpoint +@app.get("/discover", tags=["Download"]) +async def download_douyin_discover(modal_id: str, prefix: bool = True, watermark: bool = False): + """ + ## 用途/Usage + ### [中文] + - 将抖音域名改为当前服务器域名即可调用此端点,返回[视频|图片]文件下载请求。 + - 例如原链接:https://www.douyin.com/discover?modal_id=1234567890123456789 改成 https://api.douyin.wtf/discover?modal_id=1234567890123456789 即可调用此端点。 + ### [English] + - Change the Douyin domain name to the current server domain name to call this endpoint and return the video file download request. + - For example, the original link: https://douyin.com/discover?modal_id=1234567890123456789 becomes https://api.douyin.wtf/discover?modal_id=1234567890123456789 to call this endpoint. + # 参数/Parameter + - modal_id: str -> 抖音视频ID/Douyin video ID + - prefix: bool -> [True/False] 是否添加前缀/Whether to add a prefix + - watermark: bool -> [True/False] 是否添加水印/Whether to add a watermark + """ + # 是否开启此端点/Whether to enable this endpoint + if config["Web_API"]["Download_Switch"] != "True": + return ORJSONResponse({"status": "endpoint closed", + "message": "此端点已关闭请在配置文件中开启/This endpoint is closed, please enable it in the configuration file"}) + video_url = f"https://www.douyin.com/discover?modal_id={modal_id}" + download_url = f"{domain}/download?url={video_url}&prefix={prefix}&watermark={watermark}" + return RedirectResponse(download_url) + + +# Tiktok链接格式下载端点(video)/Tiktok link format download endpoint(video) +@app.get("/{user_id}/video/{aweme_id}", tags=["Download"]) +async def download_tiktok_video(user_id: str, aweme_id: str, prefix: bool = True, watermark: bool = False): + """ + ## 用途/Usage + ### [中文] + - 将TikTok域名改为当前服务器域名即可调用此端点,返回[视频|图片]文件下载请求。 + - 例如原链接:https://www.tiktok.com/@evil0ctal/video/7156033831819037994 改成 https://api.douyin.wtf/@evil0ctal/video/7156033831819037994 即可调用此端点。 + ### [English] + - Change the TikTok domain name to the current server domain name to call this endpoint and return the video file download request. + - For example, the original link: https://www.tiktok.com/@evil0ctal/video/7156033831819037994 becomes https://api.douyin.wtf/@evil0ctal/video/7156033831819037994 to call this endpoint. + # 参数/Parameter + - user_id: str -> TikTok用户ID/TikTok user ID + - aweme_id: str -> TikTok视频ID/TikTok video ID + - prefix: bool -> [True/False] 是否添加前缀/Whether to add a prefix + - watermark: bool -> [True/False] 是否添加水印/Whether to add a watermark + """ + # 是否开启此端点/Whether to enable this endpoint + if config["Web_API"]["Download_Switch"] != "True": + return ORJSONResponse({"status": "endpoint closed", + "message": "此端点已关闭请在配置文件中开启/This endpoint is closed, please enable it in the configuration file"}) + video_url = f"https://www.tiktok.com/{user_id}/video/{aweme_id}" + download_url = f"{domain}/download?url={video_url}&prefix={prefix}&watermark={watermark}" + return RedirectResponse(download_url) + + +# 定期清理[Download_Path]文件夹 +# Periodically clean the [Download_Path] folder +def cleanup_path(): + while True: + root_path = config["Web_API"]["Download_Path"] + timer = int(config["Web_API"]["Download_Path_Clean_Timer"]) + # 查看目录是否存在,不存在就跳过 + if os.path.exists(root_path): + time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + print(f"{time_now}: Cleaning up the download folder...") + for file in os.listdir("./download"): + file_path = os.path.join("./download", file) + try: + if os.path.isfile(file_path): + os.remove(file_path) + except Exception as e: + print(e) else: - failed_count += 1 - failed_list.append(url) - # print(f'failed_count: {failed_count}, failed_list: {failed_list}') - error_msg = data.get('message').split('/') - error_msg = t(error_msg[0], error_msg[1]) - with use_scope(str(url_index)): - error_do(reason=error_msg, value=url) - scroll_to(str(url_index)) - # 全部解析完成跳出for循环/All parsing completed, break out of for loop - with use_scope('result'): - # 清除进度条 - clear('loading_text') - # 滚动至result - scroll_to('result') - # for循环结束,向网页输出成功提醒 - put_success(t('解析完成啦 ♪(・ω・)ノ\n请查看以下统计信息,如果觉得有用的话请在GitHub上帮我点一个Star吧!', - 'Parsing completed ♪(・ω・)ノ\nPlease check the following statistics, and if you think it\'s useful, please help me click a Star on GitHub!')) - # 将成功,失败以及总数量显示出来并且显示为代码方便复制 - put_markdown( - f'**{t("成功", "Success")}:** {success_count} **{t("失败", "Failed")}:** {failed_count} **{t("总数量", "Total")}:** {success_count + failed_count}') - # 成功列表 - if success_count > 0: - put_markdown(f'**{t("成功列表", "Success list")}:**') - put_code('\n'.join(success_list)) - # 失败列表 - if failed_count > 0: - put_markdown(f'**{t("失败列表", "Failed list")}:**') - put_code('\n'.join(failed_list)) - # 将url_lists显示为代码方便复制 - put_text(t('以下是您输入的所有链接', 'The following are all the links you entered')) - put_code('\n'.join(url_lists)) - # 解析结束时间 - end = time.time() - # 计算耗时,保留两位小数 - time_consuming = round(end - start, 2) - # 显示耗时 - put_markdown(f"**{t('耗时', 'Time consuming')}:** {time_consuming}s") - # 放置一个按钮,点击后跳转到顶部 - put_button(t('回到顶部', 'Back to top'), onclick=lambda: scroll_to('1'), color='success', outline=True) - # 返回主页链接 - put_link(t('再来一波 (つ´ω`)つ', 'Another wave (つ´ω`)つ'), '/') + time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + print(f"{time_now}: The download folder does not exist, skipping...") + time.sleep(timer) + + +""" ________________________⬇️项目启动执行函数(Project start execution function)⬇️________________________""" + + +# 程序启动后执行/Execute after program startup +@app.on_event("startup") +async def startup_event(): + # 创建一个清理下载目录定时器线程并启动 + # Create a timer thread to clean up the download directory and start it + download_path_clean_switches = True if config["Web_API"]["Download_Path_Clean_Switch"] == "True" else False + if download_path_clean_switches: + # 启动清理线程/Start cleaning thread + thread_1 = threading.Thread(target=cleanup_path) + thread_1.start() if __name__ == '__main__': - # 获取空闲端口 - if os.environ.get('PORT'): - port = int(os.environ.get('PORT')) - else: - # 在这里修改默认端口(记得在防火墙放行该端口) - port = int(config['Web_APP']['Port']) - # 判断是否使用CDN加载前端资源 - cdn = True if config['Web_APP']['PyWebIO_CDN'] == 'True' else False - # 启动Web服务\Start Web service - start_server(main, port=port, debug=False, cdn=cdn) + # 建议使用gunicorn启动,使用uvicorn启动时请将debug设置为False + # It is recommended to use gunicorn to start, when using uvicorn to start, please set debug to False + # uvicorn web_api:app --host '0.0.0.0' --port 8000 --reload + uvicorn.run("web_api:app", host='0.0.0.0', port=port, reload=True, access_log=False)