#!/usr/bin/env python # -*- encoding: utf-8 -*- # @Author: https://github.com/Evil0ctal/ # @Time: 2021/11/06 # @Update: 2022/12/22 # @Version: 3.1.1 # @Function: # 创建一个接受提交参数的FastAPi应用程序。 # 将scraper.py返回的内容以JSON格式返回。 import os import time import json import aiohttp import uvicorn import zipfile import threading import configparser from fastapi import FastAPI, Request from fastapi.responses import ORJSONResponse, FileResponse from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.errors import RateLimitExceeded from slowapi.util import get_remote_address from pydantic import BaseModel from starlette.responses import RedirectResponse from scraper import Scraper # 读取配置文件 config = configparser.ConfigParser() config.read('config.ini', encoding='utf-8') # 运行端口 port = int(config["Web_API"]["Port"]) # 域名 domain = config["Web_API"]["Domain"] # 限制器/Limiter Rate_Limit = config["Web_API"]["Rate_Limit"] # 创建FastAPI实例 title = "Douyin TikTok Download/Scraper API-V1" version = '3.1.2' update_time = "2022/12/25" description = """ #### Description/说明
点击展开/Click to expand > [中文/Chinese] - 爬取Douyin以及TikTok的数据并返回,更多功能正在开发中。 - 如果需要更多接口,请查看[https://api.tikhub.io/docs](https://api.tikhub.io/docs)。 - 本项目开源在[GitHub:Douyin_TikTok_Download_API](https://github.com/Evil0ctal/Douyin_TikTok_Download_API)。 - 全部端点数据均来自抖音以及TikTok的官方接口,如遇到问题或BUG或建议请在[issues](https://github.com/Evil0ctal/Douyin_TikTok_Download_API/issues)中反馈。 - 本项目仅供学习交流使用,严禁用于违法用途,如有侵权请联系作者。 > [英文/English] - Crawl the data of Douyin and TikTok and return it. More features are under development. - If you need more interfaces, please visit [https://api.tikhub.io/docs](https://api.tikhub.io/docs). - This project is open source on [GitHub: Douyin_TikTok_Download_API](https://github.com/Evil0ctal/Douyin_TikTok_Download_API). - All endpoint data comes from the official interface of Douyin and TikTok. If you have any questions or BUGs or suggestions, please feedback in [issues]( - This project is for learning and communication only. It is strictly forbidden to be used for illegal purposes. If there is any infringement, please contact the author.
#### Contact author/联系作者
点击展开/Click to expand - WeChat: Evil0ctal - Email: [Evil0ctal1985@gmail.com](mailto:Evil0ctal1985@gmail.com) - Github: [https://github.com/Evil0ctal](https://github.com/Evil0ctal)
""" tags_metadata = [ { "name": "Root", "description": "Root path info.", }, { "name": "API", "description": "Hybrid interface, automatically determine the input link and return the simplified data/混合接口,自动判断输入链接返回精简后的数据。", }, { "name": "Douyin", "description": "All Douyin API Endpoints/所有抖音接口节点", }, { "name": "TikTok", "description": "All TikTok API Endpoints/所有TikTok接口节点", }, { "name": "Download", "description": "Enter the share link and return the download file response./输入分享链接后返回下载文件响应", }, { "name": "iOS_Shortcut", "description": "Get iOS shortcut info/获取iOS快捷指令信息", }, ] # 创建Scraper对象 api = Scraper() # 创建FastAPI实例 app = FastAPI( title=title, description=description, version=version, openapi_tags=tags_metadata ) # 创建Limiter对象 limiter = Limiter(key_func=get_remote_address) app.state.limiter = limiter app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) """ ________________________⬇️端点响应模型(Endpoints Response Model)⬇️________________________""" # API Root节点 class APIRoot(BaseModel): API_status: str Version: str = version Update_time: str = update_time Request_Rate_Limit: str = Rate_Limit Web_APP: str API_V1_Document: str TikHub_API_Document: str GitHub: str # API获取视频基础模型 class iOS_Shortcut(BaseModel): version: str = None update: str = None link: str = None link_en: str = None note: str = None note_en: str = None # API获取视频基础模型 class API_Video_Response(BaseModel): status: str = None platform: str = None endpoint: str = None message: str = None total_time: float = None aweme_list: list = None # 混合解析API基础模型: class API_Hybrid_Response(BaseModel): status: str = None message: str = None endpoint: str = None url: str = None type: str = None platform: str = None aweme_id: str = None total_time: float = None official_api_url: dict = None desc: str = None create_time: int = None author: dict = None music: dict = None statistics: dict = None cover_data: dict = None hashtags: list = None video_data: dict = None image_data: dict = None # 混合解析API精简版基础模型: class API_Hybrid_Minimal_Response(BaseModel): status: str = None message: str = None platform: str = None type: str = None wm_video_url: str = None wm_video_url_HQ: str = None nwm_video_url: str = None nwm_video_url_HQ: str = None no_watermark_image_list: list or None = None watermark_image_list: list or None = None """ ________________________⬇️端点日志记录(Endpoint logs)⬇️________________________""" # 记录API请求日志 async def api_logs(start_time, input_data, endpoint, error_data: dict = None): if config["Web_API"]["Allow_Logs"] == "True": time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) total_time = float(format(time.time() - start_time, '.4f')) file_name = "API_logs.json" # 写入日志内容 with open(file_name, "a", encoding="utf-8") as f: data = { "time": time_now, "endpoint": f'/{endpoint}/', "total_time": total_time, "input_data": input_data, "error_data": error_data if error_data else "No error" } f.write(json.dumps(data, ensure_ascii=False) + ",\n") print('日志记录成功!') return 1 else: print('日志记录已关闭!') return 0 """ ________________________⬇️Root端点(Root endpoint)⬇️________________________""" # Root端点 @app.get("/", response_model=APIRoot, tags=["Root"]) async def root(): """ Root path info. """ data = { "API_status": "Running", "Version": version, "Update_time": update_time, "Request_Rate_Limit": Rate_Limit, "Web_APP": "https://www.douyin.wtf/", "API_V1_Document": "https://api.douyin.wtf/docs", "TikHub_API_Document": "https://api.tikhub.io/docs", "GitHub": "https://github.com/Evil0ctal/Douyin_TikTok_Download_API", } return ORJSONResponse(data) """ ________________________⬇️混合解析端点(Hybrid parsing endpoints)⬇️________________________""" # 混合解析端点,自动判断输入链接返回精简后的数据 # Hybrid parsing endpoint, automatically determine the input link and return the simplified data. @app.get("/api", tags=["API"], response_model=API_Hybrid_Response) @limiter.limit(Rate_Limit) async def hybrid_parsing(request: Request, url: str, minimal: bool = False): """ ## 用途/Usage - 获取[抖音|TikTok]单个视频数据,参数是视频链接或分享口令。 - Get [Douyin|TikTok] single video data, the parameter is the video link or share code. ## 参数/Parameter #### url(必填/Required)): - 视频链接。| 分享口令 - The video link.| Share code - 例子/Example: `https://www.douyin.com/video/7153585499477757192` `https://v.douyin.com/MkmSwy7/` `https://vm.tiktok.com/TTPdkQvKjP/` `https://www.tiktok.com/@tvamii/video/7045537727743380782` #### minimal(选填/Optional Default:False): - 是否返回精简版数据。 - Whether to return simplified data. - 例子/Example: `True` `False` ## 返回值/Return - 用户当个视频数据的列表,列表内包含JSON数据。 - List of user single video data, list contains JSON data. """ print("正在进行混合解析...") # 开始时间 start_time = time.time() # 获取数据 data = await api.hybrid_parsing(url) # 是否精简 if minimal: result = api.hybrid_parsing_minimal(data) else: # 更新数据 result = { 'url': url, "endpoint": "/api/", "total_time": float(format(time.time() - start_time, '.4f')), } # 合并数据 result.update(data) # 记录API调用 await api_logs(start_time=start_time, input_data={'url': url}, endpoint='api') return ORJSONResponse(result) """ ________________________⬇️抖音视频解析端点(Douyin video parsing endpoint)⬇️________________________""" # 获取抖音单个视频数据/Get Douyin single video data @app.get("/douyin_video_data/", response_model=API_Video_Response, tags=["Douyin"]) @limiter.limit(Rate_Limit) async def get_douyin_video_data(request: Request, douyin_video_url: str = None, video_id: str = None): """ ## 用途/Usage - 获取抖音用户单个视频数据,参数是视频链接|分享口令 - Get the data of a single video of a Douyin user, the parameter is the video link. ## 参数/Parameter #### douyin_video_url(选填/Optional): - 视频链接。| 分享口令 - The video link.| Share code - 例子/Example: `https://www.douyin.com/video/7153585499477757192` `https://v.douyin.com/MkmSwy7/` #### video_id(选填/Optional): - 视频ID,可以从视频链接中获取。 - The video ID, can be obtained from the video link. - 例子/Example: `7153585499477757192` #### 备注/Note: - 参数`douyin_video_url`和`video_id`二选一即可,如果都填写,优先使用`video_id`以获得更快的响应速度。 - The parameters `douyin_video_url` and `video_id` can be selected, if both are filled in, the `video_id` is used first to get a faster response speed. ## 返回值/Return - 用户当个视频数据的列表,列表内包含JSON数据。 - List of user single video data, list contains JSON data. """ if video_id is None or video_id == '': # 获取视频ID video_id = await api.get_douyin_video_id(douyin_video_url) if video_id is None: result = { "status": "failed", "platform": "douyin", "message": "video_id获取失败/Failed to get video_id", } return ORJSONResponse(result) if video_id is not None and video_id != '': # 开始时间 start_time = time.time() print('获取到的video_id数据:{}'.format(video_id)) if video_id is not None: video_data = await api.get_douyin_video_data(video_id=video_id) if video_data is None: result = { "status": "failed", "platform": "douyin", "endpoint": "/douyin_video_data/", "message": "视频API数据获取失败/Failed to get video API data", } return ORJSONResponse(result) # print('获取到的video_data:{}'.format(video_data)) # 记录API调用 await api_logs(start_time=start_time, input_data={'douyin_video_url': douyin_video_url, 'video_id': video_id}, endpoint='douyin_video_data') # 结束时间 total_time = float(format(time.time() - start_time, '.4f')) # 返回数据 result = { "status": "success", "platform": "douyin", "endpoint": "/douyin_video_data/", "message": "获取视频数据成功/Got video data successfully", "total_time": total_time, "aweme_list": [video_data] } return ORJSONResponse(result) else: print('获取抖音video_id失败') result = { "status": "failed", "platform": "douyin", "endpoint": "/douyin_video_data/", "message": "获取视频ID失败/Failed to get video ID", "total_time": 0, "aweme_list": [] } return ORJSONResponse(result) @app.get("/douyin_live_video_data/", response_model=API_Video_Response, tags=["Douyin"]) @limiter.limit(Rate_Limit) async def get_douyin_live_video_data(request: Request, douyin_live_video_url: str = None, web_rid: str = None): if web_rid is None or web_rid == '': # 获取视频ID web_rid = await api.get_douyin_video_id(douyin_live_video_url) if web_rid is None: result = { "status": "failed", "platform": "douyin", "message": "web_rid获取失败/Failed to get web_rid", } return ORJSONResponse(result) if web_rid is not None and web_rid != '': # 开始时间 start_time = time.time() print('获取到的web_rid:{}'.format(web_rid)) if web_rid is not None: video_data = await api.get_douyin_live_video_data(web_rid=web_rid) if video_data is None: result = { "status": "failed", "platform": "douyin", "endpoint": "/douyin_live_video_data/", "message": "直播视频API数据获取失败/Failed to get live video API data", } return ORJSONResponse(result) # print('获取到的video_data:{}'.format(video_data)) # 记录API调用 await api_logs(start_time=start_time, input_data={'douyin_video_url': douyin_live_video_url, 'web_rid': web_rid}, endpoint='douyin_live_video_data') # 结束时间 total_time = float(format(time.time() - start_time, '.4f')) # 返回数据 result = { "status": "success", "platform": "douyin", "endpoint": "/douyin_live_video_data/", "message": "获取直播视频数据成功/Got live video data successfully", "total_time": total_time, "aweme_list": [video_data] } return ORJSONResponse(result) else: print('获取抖音video_id失败') result = { "status": "failed", "platform": "douyin", "endpoint": "/douyin_live_video_data/", "message": "获取直播视频ID失败/Failed to get live video ID", "total_time": 0, "aweme_list": [] } return ORJSONResponse(result) """ ________________________⬇️TikTok视频解析端点(TikTok video parsing endpoint)⬇️________________________""" # 获取TikTok单个视频数据/Get TikTok single video data @app.get("/tiktok_video_data/", response_class=ORJSONResponse, response_model=API_Video_Response, tags=["TikTok"]) @limiter.limit(Rate_Limit) async def get_tiktok_video_data(request: Request, tiktok_video_url: str = None, video_id: str = None): """ ## 用途/Usage - 获取单个视频数据,参数是视频链接| 分享口令。 - Get single video data, the parameter is the video link. ## 参数/Parameter #### tiktok_video_url(选填/Optional): - 视频链接。| 分享口令 - The video link.| Share code - 例子/Example: `https://www.tiktok.com/@evil0ctal/video/7156033831819037994` `https://vm.tiktok.com/TTPdkQvKjP/` #### video_id(选填/Optional): - 视频ID,可以从视频链接中获取。 - The video ID, can be obtained from the video link. - 例子/Example: `7156033831819037994` #### 备注/Note: - 参数`tiktok_video_url`和`video_id`二选一即可,如果都填写,优先使用`video_id`以获得更快的响应速度。 - The parameters `tiktok_video_url` and `video_id` can be selected, if both are filled in, the `video_id` is used first to get a faster response speed. ## 返回值/Return - 用户当个视频数据的列表,列表内包含JSON数据。 - List of user single video data, list contains JSON data. """ # 开始时间 start_time = time.time() if video_id is None or video_id == "": video_id = await api.get_tiktok_video_id(tiktok_video_url) if video_id is None: return ORJSONResponse({"status": "fail", "platform": "tiktok", "endpoint": "/tiktok_video_data/", "message": "获取视频ID失败/Get video ID failed"}) if video_id is not None and video_id != '': print('开始解析单个TikTok视频数据') video_data = await api.get_tiktok_video_data(video_id) # TikTok的API数据如果为空或者返回的数据中没有视频数据,就返回错误信息 # If the TikTok API data is empty or there is no video data in the returned data, an error message is returned if video_data is None or video_data.get('aweme_id') != video_id: print('视频数据获取失败/Failed to get video data') result = { "status": "failed", "platform": "tiktok", "endpoint": "/tiktok_video_data/", "message": "视频数据获取失败/Failed to get video data" } return ORJSONResponse(result) # 记录API调用 await api_logs(start_time=start_time, input_data={'tiktok_video_url': tiktok_video_url, 'video_id': video_id}, endpoint='tiktok_video_data') # 结束时间 total_time = float(format(time.time() - start_time, '.4f')) # 返回数据 result = { "status": "success", "platform": "tiktok", "endpoint": "/tiktok_video_data/", "message": "获取视频数据成功/Got video data successfully", "total_time": total_time, "aweme_list": [video_data] } return ORJSONResponse(result) else: print('视频链接错误/Video link error') result = { "status": "failed", "platform": "tiktok", "endpoint": "/tiktok_video_data/", "message": "视频链接错误/Video link error" } return ORJSONResponse(result) """ ________________________⬇️iOS快捷指令更新端点(iOS Shortcut update endpoint)⬇️________________________""" @app.get("/ios", response_model=iOS_Shortcut, tags=["iOS_Shortcut"]) async def Get_Shortcut(): data = { 'version': config["Web_API"]["iOS_Shortcut_Version"], 'update': config["Web_API"]['iOS_Shortcut_Update_Time'], 'link': config["Web_API"]['iOS_Shortcut_Link'], 'link_en': config["Web_API"]['iOS_Shortcut_Link_EN'], 'note': config["Web_API"]['iOS_Shortcut_Update_Note'], 'note_en': config["Web_API"]['iOS_Shortcut_Update_Note_EN'], } return ORJSONResponse(data) """ ________________________⬇️下载文件端点/函数(Download file endpoints/functions)⬇️________________________""" # 下载文件端点/Download file endpoint @app.get("/download", tags=["Download"]) @limiter.limit(Rate_Limit) async def download_file_hybrid(request: Request, url: str, prefix: bool = True, watermark: bool = False): """ ## 用途/Usage ### [中文] - 将[抖音|TikTok]链接作为参数提交至此端点,返回[视频|图片]文件下载请求。 ### [English] - Submit the [Douyin|TikTok] link as a parameter to this endpoint and return the [video|picture] file download request. # 参数/Parameter - url:str -> [Douyin|TikTok] [视频|图片] 链接/ [Douyin|TikTok] [video|image] link - prefix: bool -> [True/False] 是否添加前缀/Whether to add a prefix - watermark: bool -> [True/False] 是否添加水印/Whether to add a watermark """ # 是否开启此端点/Whether to enable this endpoint if config["Web_API"]["Download_Switch"] != "True": return ORJSONResponse({"status": "endpoint closed", "message": "此端点已关闭请在配置文件中开启/This endpoint is closed, please enable it in the configuration file"}) # 开始时间 start_time = time.time() headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' } data = await api.hybrid_parsing(url) if data is None: return ORJSONResponse(data) else: # 记录API调用 await api_logs(start_time=start_time, input_data={'url': url}, endpoint='download') url_type = data.get('type') platform = data.get('platform') aweme_id = data.get('aweme_id') file_name_prefix = config["Web_API"]["File_Name_Prefix"] if prefix else '' root_path = config["Web_API"]["Download_Path"] # 查看目录是否存在,不存在就创建 if not os.path.exists(root_path): os.makedirs(root_path) if url_type == 'video': file_name = file_name_prefix + platform + '_' + aweme_id + '.mp4' if not watermark else file_name_prefix + platform + '_' + aweme_id + '_watermark' + '.mp4' url = data.get('video_data').get('nwm_video_url_HQ') if not watermark else data.get('video_data').get( 'wm_video_url') print('url: ', url) file_path = root_path + "/" + file_name print('file_path: ', file_path) # 判断文件是否存在,存在就直接返回、 if os.path.exists(file_path): print('文件已存在,直接返回') return FileResponse(path=file_path, media_type='video/mp4', filename=file_name) else: if platform == 'douyin': async with aiohttp.ClientSession() as session: async with session.get(url=url, headers=headers, allow_redirects=False) as response: r = response.headers cdn_url = r.get('location') async with session.get(url=cdn_url) as res: r = await res.content.read() elif platform == 'tiktok': async with aiohttp.ClientSession() as session: async with session.get(url=url, headers=headers) as res: r = await res.content.read() with open(file_path, 'wb') as f: f.write(r) return FileResponse(path=file_path, media_type='video/mp4', filename=file_name) elif url_type == 'image': url = data.get('image_data').get('no_watermark_image_list') if not watermark else data.get( 'image_data').get('watermark_image_list') print('url: ', url) zip_file_name = file_name_prefix + platform + '_' + aweme_id + '_images.zip' if not watermark else file_name_prefix + platform + '_' + aweme_id + '_images_watermark.zip' zip_file_path = root_path + "/" + zip_file_name print('zip_file_name: ', zip_file_name) print('zip_file_path: ', zip_file_path) # 判断文件是否存在,存在就直接返回、 if os.path.exists(zip_file_path): print('文件已存在,直接返回') return FileResponse(path=zip_file_path, media_type='zip', filename=zip_file_name) file_path_list = [] for i in url: async with aiohttp.ClientSession() as session: async with session.get(url=i, headers=headers) as res: content_type = res.headers.get('content-type') file_format = content_type.split('/')[1] r = await res.content.read() index = int(url.index(i)) file_name = file_name_prefix + platform + '_' + aweme_id + '_' + str( index + 1) + '.' + file_format if not watermark else \ file_name_prefix + platform + '_' + aweme_id + '_' + str( index + 1) + '_watermark' + '.' + file_format file_path = root_path + "/" + file_name file_path_list.append(file_path) print('file_path: ', file_path) with open(file_path, 'wb') as f: f.write(r) if len(url) == len(file_path_list): zip_file = zipfile.ZipFile(zip_file_path, 'w') for f in file_path_list: zip_file.write(os.path.join(f), f, zipfile.ZIP_DEFLATED) zip_file.close() return FileResponse(path=zip_file_path, media_type='zip', filename=zip_file_name) else: return ORJSONResponse(data) # 批量下载文件端点/Batch download file endpoint @app.get("/batch_download", tags=["Download"]) async def batch_download_file(url_list: str, prefix: bool = True): """ 批量下载文件端点/Batch download file endpoint 未完工/Unfinished """ print('url_list: ', url_list) return ORJSONResponse({"status": "failed", "message": "嘿嘿嘿,这个功能还没做呢,等我有空再做吧/Hehehe, this function hasn't been done yet, I'll do it when I have time"}) # 抖音链接格式下载端点(video)/Douyin link format download endpoint(video) @app.get("/video/{aweme_id}", tags=["Download"]) async def download_douyin_video(aweme_id: str, prefix: bool = True, watermark: bool = False): """ ## 用途/Usage ### [中文] - 将抖音域名改为当前服务器域名即可调用此端点,返回[视频|图片]文件下载请求。 - 例如原链接:https://douyin.com/video/1234567890123456789 改成 https://api.douyin.wtf/video/1234567890123456789 即可调用此端点。 ### [English] - Change the Douyin domain name to the current server domain name to call this endpoint and return the video file download request. - For example, the original link: https://douyin.com/video/1234567890123456789 becomes https://api.douyin.wtf/video/1234567890123456789 to call this endpoint. # 参数/Parameter - aweme_id:str -> 抖音视频ID/Douyin video ID - prefix: bool -> [True/False] 是否添加前缀/Whether to add a prefix - watermark: bool -> [True/False] 是否添加水印/Whether to add a watermark """ # 是否开启此端点/Whether to enable this endpoint if config["Web_API"]["Download_Switch"] != "True": return ORJSONResponse({"status": "endpoint closed", "message": "此端点已关闭请在配置文件中开启/This endpoint is closed, please enable it in the configuration file"}) video_url = f"https://www.douyin.com/video/{aweme_id}" download_url = f"{domain}/download?url={video_url}&prefix={prefix}&watermark={watermark}" return RedirectResponse(download_url) # 抖音链接格式下载端点(video)/Douyin link format download endpoint(video) @app.get("/note/{aweme_id}", tags=["Download"]) async def download_douyin_video(aweme_id: str, prefix: bool = True, watermark: bool = False): """ ## 用途/Usage ### [中文] - 将抖音域名改为当前服务器域名即可调用此端点,返回[视频|图片]文件下载请求。 - 例如原链接:https://douyin.com/video/1234567890123456789 改成 https://api.douyin.wtf/video/1234567890123456789 即可调用此端点。 ### [English] - Change the Douyin domain name to the current server domain name to call this endpoint and return the video file download request. - For example, the original link: https://douyin.com/video/1234567890123456789 becomes https://api.douyin.wtf/video/1234567890123456789 to call this endpoint. # 参数/Parameter - aweme_id:str -> 抖音视频ID/Douyin video ID - prefix: bool -> [True/False] 是否添加前缀/Whether to add a prefix - watermark: bool -> [True/False] 是否添加水印/Whether to add a watermark """ # 是否开启此端点/Whether to enable this endpoint if config["Web_API"]["Download_Switch"] != "True": return ORJSONResponse({"status": "endpoint closed", "message": "此端点已关闭请在配置文件中开启/This endpoint is closed, please enable it in the configuration file"}) video_url = f"https://www.douyin.com/video/{aweme_id}" download_url = f"{domain}/download?url={video_url}&prefix={prefix}&watermark={watermark}" return RedirectResponse(download_url) # 抖音链接格式下载端点/Douyin link format download endpoint @app.get("/discover", tags=["Download"]) async def download_douyin_discover(modal_id: str, prefix: bool = True, watermark: bool = False): """ ## 用途/Usage ### [中文] - 将抖音域名改为当前服务器域名即可调用此端点,返回[视频|图片]文件下载请求。 - 例如原链接:https://www.douyin.com/discover?modal_id=1234567890123456789 改成 https://api.douyin.wtf/discover?modal_id=1234567890123456789 即可调用此端点。 ### [English] - Change the Douyin domain name to the current server domain name to call this endpoint and return the video file download request. - For example, the original link: https://douyin.com/discover?modal_id=1234567890123456789 becomes https://api.douyin.wtf/discover?modal_id=1234567890123456789 to call this endpoint. # 参数/Parameter - modal_id: str -> 抖音视频ID/Douyin video ID - prefix: bool -> [True/False] 是否添加前缀/Whether to add a prefix - watermark: bool -> [True/False] 是否添加水印/Whether to add a watermark """ # 是否开启此端点/Whether to enable this endpoint if config["Web_API"]["Download_Switch"] != "True": return ORJSONResponse({"status": "endpoint closed", "message": "此端点已关闭请在配置文件中开启/This endpoint is closed, please enable it in the configuration file"}) video_url = f"https://www.douyin.com/discover?modal_id={modal_id}" download_url = f"{domain}/download?url={video_url}&prefix={prefix}&watermark={watermark}" return RedirectResponse(download_url) # Tiktok链接格式下载端点(video)/Tiktok link format download endpoint(video) @app.get("/{user_id}/video/{aweme_id}", tags=["Download"]) async def download_tiktok_video(user_id: str, aweme_id: str, prefix: bool = True, watermark: bool = False): """ ## 用途/Usage ### [中文] - 将TikTok域名改为当前服务器域名即可调用此端点,返回[视频|图片]文件下载请求。 - 例如原链接:https://www.tiktok.com/@evil0ctal/video/7156033831819037994 改成 https://api.douyin.wtf/@evil0ctal/video/7156033831819037994 即可调用此端点。 ### [English] - Change the TikTok domain name to the current server domain name to call this endpoint and return the video file download request. - For example, the original link: https://www.tiktok.com/@evil0ctal/video/7156033831819037994 becomes https://api.douyin.wtf/@evil0ctal/video/7156033831819037994 to call this endpoint. # 参数/Parameter - user_id: str -> TikTok用户ID/TikTok user ID - aweme_id: str -> TikTok视频ID/TikTok video ID - prefix: bool -> [True/False] 是否添加前缀/Whether to add a prefix - watermark: bool -> [True/False] 是否添加水印/Whether to add a watermark """ # 是否开启此端点/Whether to enable this endpoint if config["Web_API"]["Download_Switch"] != "True": return ORJSONResponse({"status": "endpoint closed", "message": "此端点已关闭请在配置文件中开启/This endpoint is closed, please enable it in the configuration file"}) video_url = f"https://www.tiktok.com/{user_id}/video/{aweme_id}" download_url = f"{domain}/download?url={video_url}&prefix={prefix}&watermark={watermark}" return RedirectResponse(download_url) # 定期清理[Download_Path]文件夹 # Periodically clean the [Download_Path] folder def cleanup_path(): while True: root_path = config["Web_API"]["Download_Path"] timer = int(config["Web_API"]["Download_Path_Clean_Timer"]) # 查看目录是否存在,不存在就跳过 if os.path.exists(root_path): time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) print(f"{time_now}: Cleaning up the download folder...") for file in os.listdir("./download"): file_path = os.path.join("./download", file) try: if os.path.isfile(file_path): os.remove(file_path) except Exception as e: print(e) else: time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) print(f"{time_now}: The download folder does not exist, skipping...") time.sleep(timer) """ ________________________⬇️项目启动执行函数(Project start execution function)⬇️________________________""" # 程序启动后执行/Execute after program startup @app.on_event("startup") async def startup_event(): # 创建一个清理下载目录定时器线程并启动 # Create a timer thread to clean up the download directory and start it download_path_clean_switches = True if config["Web_API"]["Download_Path_Clean_Switch"] == "True" else False if download_path_clean_switches: # 启动清理线程/Start cleaning thread thread_1 = threading.Thread(target=cleanup_path) thread_1.start() if __name__ == '__main__': # 建议使用gunicorn启动,使用uvicorn启动时请将debug设置为False # It is recommended to use gunicorn to start, when using uvicorn to start, please set debug to False # uvicorn web_api:app --host '0.0.0.0' --port 8000 --reload uvicorn.run("web_api:app", host='0.0.0.0', port=port, reload=True, access_log=False)