mirror of
https://github.com/Evil0ctal/Douyin_TikTok_Download_API.git
synced 2025-04-23 00:29:23 +08:00
764 lines
28 KiB
Python
764 lines
28 KiB
Python
# ==============================================================================
|
||
# Copyright (C) 2021 Evil0ctal
|
||
#
|
||
# This file is part of the Douyin_TikTok_Download_API project.
|
||
#
|
||
# This project is licensed under the Apache License 2.0 (the "License");
|
||
# you may not use this file except in compliance with the License.
|
||
# You may obtain a copy of the License at:
|
||
# http://www.apache.org/licenses/LICENSE-2.0
|
||
#
|
||
# Unless required by applicable law or agreed to in writing, software
|
||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
# See the License for the specific language governing permissions and
|
||
# limitations under the License.
|
||
# ==============================================================================
|
||
# __
|
||
# /> フ
|
||
# | _ _ l
|
||
# /` ミ_xノ
|
||
# / | Feed me Stars ⭐ ️
|
||
# / ヽ ノ
|
||
# │ | | |
|
||
# / ̄| | | |
|
||
# | ( ̄ヽ__ヽ_)__)
|
||
# \二つ
|
||
# ==============================================================================
|
||
#
|
||
# Contributor Link:
|
||
# - https://github.com/Evil0ctal
|
||
# - https://github.com/Johnserf-Seed
|
||
#
|
||
# ==============================================================================
|
||
|
||
|
||
import re
|
||
import json
|
||
import time
|
||
import httpx
|
||
import qrcode
|
||
import random
|
||
import asyncio
|
||
import yaml
|
||
|
||
from typing import Union
|
||
from pathlib import Path
|
||
|
||
from crawlers.utils.logger import logger
|
||
from crawlers.utils.utils import (
|
||
gen_random_str,
|
||
get_timestamp,
|
||
extract_valid_urls,
|
||
split_filename,
|
||
)
|
||
from crawlers.utils.api_exceptions import (
|
||
APIError,
|
||
APIConnectionError,
|
||
APIResponseError,
|
||
APIUnavailableError,
|
||
APIUnauthorizedError,
|
||
APINotFoundError,
|
||
)
|
||
|
||
from crawlers.douyin.web.xbogus import XBogus as XB
|
||
|
||
from urllib.parse import quote
|
||
import os
|
||
|
||
# Directory that contains this module; config.yaml is expected to sit next to it.
path = os.path.abspath(os.path.dirname(__file__))

# Parse the YAML configuration once at import time and expose it as `config`.
with open(os.path.join(path, "config.yaml"), mode="r", encoding="utf-8") as f:
    config = yaml.safe_load(f)
|
||
|
||
|
||
class TokenManager:
    """Produce the ``msToken`` and ``ttwid`` cookie values used for Douyin requests.

    Endpoints, payload fields and proxy settings are read from the
    ``TokenManager -> douyin`` section of the module-level ``config``.
    """

    douyin_manager = config.get("TokenManager").get("douyin")
    token_conf = douyin_manager.get("msToken", None)
    ttwid_conf = douyin_manager.get("ttwid", None)
    # Fall back to an empty mapping so a config without a `proxies` section
    # does not fail at import time with AttributeError on None.
    proxies_conf = douyin_manager.get("proxies", None) or {}
    proxies = {
        "http://": proxies_conf.get("http", None),
        "https://": proxies_conf.get("https", None),
    }

    @classmethod
    def gen_real_msToken(cls) -> str:
        """
        Request a real msToken; fall back to a random one on any API error.

        Returns:
            str: The msToken cookie value from the configured endpoint, or the
                result of ``gen_false_msToken`` when the request fails.
        """
        # The request lives in its own helper so that errors raised from its
        # ``except`` handlers are caught here. Sibling ``except`` clauses of
        # one ``try`` statement never catch each other's exceptions.
        try:
            return cls._request_real_msToken()
        except APIError as e:
            # Return a fake msToken so callers can keep running.
            logger.error("msToken API错误:{0}".format(e))
            logger.info("生成虚假的msToken")
            return cls.gen_false_msToken()

    @classmethod
    def _request_real_msToken(cls) -> str:
        """POST the configured payload and return the ``msToken`` cookie.

        Raises:
            APIConnectionError: The request itself failed (httpx.RequestError).
            APIUnauthorizedError: The endpoint answered 401.
            APINotFoundError: The endpoint answered 404.
            APIResponseError: Any other error status, or a cookie whose length
                is neither 120 nor 128.
        """
        payload = json.dumps(
            {
                "magic": cls.token_conf["magic"],
                "version": cls.token_conf["version"],
                "dataType": cls.token_conf["dataType"],
                "strData": cls.token_conf["strData"],
                "tspFromClient": get_timestamp(),
            }
        )
        headers = {
            "User-Agent": cls.token_conf["User-Agent"],
            "Content-Type": "application/json",
        }

        transport = httpx.HTTPTransport(retries=5)
        with httpx.Client(transport=transport, proxies=cls.proxies) as client:
            try:
                response = client.post(
                    cls.token_conf["url"], content=payload, headers=headers
                )
                response.raise_for_status()

            except httpx.RequestError as exc:
                # Covers every httpx request-level failure.
                raise APIConnectionError(
                    "请求端点失败,请检查当前网络环境。 链接:{0},代理:{1},异常类名:{2},异常详细信息:{3}"
                    .format(cls.token_conf["url"], cls.proxies, cls.__name__, exc)
                ) from exc

            except httpx.HTTPStatusError as e:
                # Map specific HTTP status codes onto the project's API errors.
                if e.response.status_code == 401:
                    raise APIUnauthorizedError(
                        "参数验证失败,请更新 F2 配置文件中的 {0},以匹配 {1} 新规则"
                        .format("msToken", "douyin")
                    ) from e
                if e.response.status_code == 404:
                    raise APINotFoundError(
                        "{0} 无法找到API端点".format("msToken")
                    ) from e
                raise APIResponseError(
                    "链接:{0},状态码 {1}:{2} ".format(
                        e.response.url, e.response.status_code, e.response.text
                    )
                ) from e

            # A missing cookie becomes the string "None" (length 4) and is
            # rejected by the same length check as any malformed value.
            msToken = str(httpx.Cookies(response.cookies).get("msToken"))
            if len(msToken) not in [120, 128]:
                raise APIResponseError("{0} 内容不符合要求".format("msToken"))

            return msToken

    @classmethod
    def gen_false_msToken(cls) -> str:
        """Generate a random msToken: 126 random characters followed by ``==``."""
        return gen_random_str(126) + "=="

    @classmethod
    def gen_ttwid(cls) -> str:
        """
        Request the ttwid cookie that requests must carry.

        Returns:
            str: The ``ttwid`` cookie value from the configured endpoint.

        Raises:
            APIConnectionError: The request itself failed (httpx.RequestError).
            APIUnauthorizedError: The endpoint answered 401.
            APINotFoundError: The endpoint answered 404.
            APIResponseError: Any other error status, or a response that
                carries no ``ttwid`` cookie.
        """
        transport = httpx.HTTPTransport(retries=5)
        # NOTE(review): this client is created without `proxies`, although the
        # connection error message below reports `cls.proxies` — confirm intent.
        with httpx.Client(transport=transport) as client:
            try:
                response = client.post(
                    cls.ttwid_conf["url"], content=cls.ttwid_conf["data"]
                )
                response.raise_for_status()

            except httpx.RequestError as exc:
                # Covers every httpx request-level failure.
                raise APIConnectionError(
                    "请求端点失败,请检查当前网络环境。 链接:{0},代理:{1},异常类名:{2},异常详细信息:{3}"
                    .format(cls.ttwid_conf["url"], cls.proxies, cls.__name__, exc)
                ) from exc

            except httpx.HTTPStatusError as e:
                # Map specific HTTP status codes onto the project's API errors.
                if e.response.status_code == 401:
                    raise APIUnauthorizedError(
                        "参数验证失败,请更新 F2 配置文件中的 {0},以匹配 {1} 新规则"
                        .format("ttwid", "douyin")
                    ) from e
                if e.response.status_code == 404:
                    raise APINotFoundError("ttwid无法找到API端点") from e
                raise APIResponseError(
                    "链接:{0},状态码 {1}:{2} ".format(
                        e.response.url, e.response.status_code, e.response.text
                    )
                ) from e

            ttwid = httpx.Cookies(response.cookies).get("ttwid")
            if ttwid is None:
                # Previously str(None) leaked out as the literal cookie "None".
                raise APIResponseError("{0} 内容不符合要求".format("ttwid"))

            return str(ttwid)
|
||
|
||
|
||
class VerifyFpManager:
    """Generate ``verifyFp`` / ``s_v_web_id`` fingerprint strings."""

    @classmethod
    def gen_verify_fp(cls) -> str:
        """
        Generate a verifyFp value (also used as s_v_web_id).

        The result has the form ``verify_<base36 timestamp>_<36 characters>``.
        The 36-character tail has ``_`` at indexes 8, 13, 18 and 23, ``4`` at
        index 14, one of ``8 9 A B`` at index 19, and random characters from
        ``base_str`` everywhere else.

        Returns:
            str: The generated fingerprint.
        """
        base_str = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
        base36_digits = "0123456789abcdefghijklmnopqrstuvwxyz"
        t = len(base_str)

        # Current time in milliseconds, rendered in lowercase base 36.
        # divmod keeps the conversion in exact integer arithmetic; the former
        # int(x / 36) went through a float and could lose precision.
        milliseconds = int(round(time.time() * 1000))
        base36 = ""
        while milliseconds > 0:
            milliseconds, remainder = divmod(milliseconds, 36)
            base36 = base36_digits[remainder] + base36

        o = [""] * 36
        o[8] = o[13] = o[18] = o[23] = "_"
        o[14] = "4"

        for i in range(36):
            if not o[i]:
                n = int(random.random() * t)
                if i == 19:
                    # Keep the two low bits and force bit 3: n becomes 8..11.
                    n = (n & 3) | 8
                o[i] = base_str[n]

        return "verify_" + base36 + "_" + "".join(o)

    @classmethod
    def gen_s_v_web_id(cls) -> str:
        """Return a fresh value from ``gen_verify_fp`` (same format)."""
        return cls.gen_verify_fp()
|
||
|
||
|
||
class BogusManager:
    """Helpers that attach an X-Bogus value (computed by ``XB``) to endpoints."""

    @classmethod
    def xb_str_2_endpoint(cls, endpoint: str, user_agent: str) -> str:
        """
        Sign an endpoint string.

        Args:
            endpoint (str): Text handed to ``XB(...).getXBogus``.
            user_agent (str): User-Agent used to construct ``XB``.

        Returns:
            str: Element 0 of the ``getXBogus`` result.

        Raises:
            RuntimeError: If ``XB`` raises any exception.
        """
        try:
            signed = XB(user_agent).getXBogus(endpoint)
        except Exception as e:
            raise RuntimeError("生成X-Bogus失败: {0})".format(e))

        return signed[0]

    @classmethod
    def xb_model_2_endpoint(cls, base_endpoint: str, params: dict, user_agent: str) -> str:
        """
        Build ``base_endpoint`` + query string + ``X-Bogus`` from a params dict.

        Args:
            base_endpoint (str): Endpoint URL, with or without an existing query.
            params (dict): Query parameters, joined as ``k=v`` pairs with ``&``.
            user_agent (str): User-Agent used to construct ``XB``.

        Returns:
            str: The endpoint with the parameters and ``&X-Bogus=<value>``
                appended, where the value is element 1 of the ``getXBogus`` result.

        Raises:
            TypeError: If ``params`` is not a dict.
            RuntimeError: If ``XB`` raises any exception.
        """
        if not isinstance(params, dict):
            raise TypeError("参数必须是字典类型")

        query = "&".join(
            "{0}={1}".format(key, value) for key, value in params.items()
        )

        try:
            xb_value = XB(user_agent).getXBogus(query)
        except Exception as e:
            raise RuntimeError("生成X-Bogus失败: {0})".format(e))

        # Use "&" when base_endpoint already carries a query string, else "?".
        if "?" in base_endpoint:
            joiner = "&"
        else:
            joiner = "?"

        return "{0}{1}{2}&X-Bogus={3}".format(base_endpoint, joiner, query, xb_value[1])
|
||
|
||
|
||
class SecUserIdFetcher:
    """Resolve Douyin user links to their ``sec_user_id``."""

    # Precompiled regular expressions: the path segment after "user/", and the
    # "sec_uid" query parameter.
    _DOUYIN_URL_PATTERN = re.compile(r"user/([^/?]*)")
    _REDIRECT_URL_PATTERN = re.compile(r"sec_uid=([^&]*)")

    @classmethod
    async def get_sec_user_id(cls, url: str) -> str:
        """
        Get the sec_user_id from a single url.

        The url is fetched with redirects followed, and the id is extracted
        from the final response URL.

        Args:
            url (str): Input url.

        Returns:
            str: The matched sec_user_id.

        Raises:
            TypeError: If ``url`` is not a string.
            APINotFoundError: If no valid URL could be extracted, or on HTTP 404.
            APIUnauthorizedError: On HTTP 401.
            APIUnavailableError: On HTTP 503.
            APIResponseError: On any other unexpected status, or when the final
                URL contains no sec_user_id.
            APIConnectionError: If the request itself fails.
        """
        if not isinstance(url, str):
            raise TypeError("参数必须是字符串类型")

        # Extract the valid URL.
        url = extract_valid_urls(url)
        if url is None:
            raise APINotFoundError("输入的URL不合法。类名:{0}".format(cls.__name__))

        # Links on v.douyin.com are matched by the sec_uid query parameter;
        # every other link by its "user/" path segment.
        if "v.douyin.com" in url:
            id_pattern = cls._REDIRECT_URL_PATTERN
        else:
            id_pattern = cls._DOUYIN_URL_PATTERN

        try:
            retrying_transport = httpx.AsyncHTTPTransport(retries=5)
            async with httpx.AsyncClient(
                transport=retrying_transport,
                proxies=TokenManager.proxies,
                timeout=10,
            ) as client:
                response = await client.get(url, follow_redirects=True)
                status = response.status_code

                if status == 401:
                    raise APIUnauthorizedError(
                        "未授权的请求。类名:{0}".format(cls.__name__)
                    )
                if status == 404:
                    raise APINotFoundError(
                        "未找到API端点。类名:{0}".format(cls.__name__)
                    )
                if status == 503:
                    raise APIUnavailableError(
                        "API服务不可用。类名:{0}".format(cls.__name__)
                    )
                # 444 is generally an Nginx interception that returns no status,
                # so it is accepted alongside 200.
                if status not in {200, 444}:
                    raise APIResponseError(
                        "链接:{0},状态码 {1}:{2} ".format(
                            response.url, response.status_code, response.text
                        )
                    )

                found = id_pattern.search(str(response.url))
                if not found:
                    raise APIResponseError(
                        "未在响应的地址中找到sec_user_id,检查链接是否为用户主页类名:{0}"
                        .format(cls.__name__)
                    )
                return found.group(1)

        except httpx.RequestError as exc:
            raise APIConnectionError(
                "请求端点失败,请检查当前网络环境。 链接:{0},代理:{1},异常类名:{2},异常详细信息:{3}"
                .format(url, TokenManager.proxies, cls.__name__, exc)
            )

    @classmethod
    async def get_all_sec_user_id(cls, urls: list) -> list:
        """
        Get the sec_user_id for every url in a list, concurrently.

        Args:
            urls (list): User url list.

        Returns:
            list: The sec_user_id values, in the order produced by
                ``asyncio.gather``.

        Raises:
            TypeError: If ``urls`` is not a list.
            APINotFoundError: If no valid URL remains after extraction.
        """
        if not isinstance(urls, list):
            raise TypeError("参数必须是列表类型")

        # Extract the valid URLs.
        urls = extract_valid_urls(urls)
        if urls == []:
            raise APINotFoundError(
                "输入的URL List不合法。类名:{0}".format(cls.__name__)
            )

        pending = []
        for single_url in urls:
            pending.append(cls.get_sec_user_id(single_url))
        return await asyncio.gather(*pending)
|
||
|
||
|
||
class AwemeIdFetcher:
    """Resolve Douyin work links (video / note / discover modal) to an aweme_id."""

    # Precompiled regular expressions, one per supported URL shape.
    _DOUYIN_VIDEO_URL_PATTERN = re.compile(r"video/([^/?]*)")
    _DOUYIN_NOTE_URL_PATTERN = re.compile(r"note/([^/?]*)")
    _DOUYIN_DISCOVER_URL_PATTERN = re.compile(r"modal_id=([0-9]+)")

    @classmethod
    async def get_aweme_id(cls, url: str) -> str:
        """
        Get the aweme_id from a single url.

        The url is fetched with redirects followed, and the id is extracted
        from the final response URL.

        Args:
            url (str): Input url.

        Returns:
            str: The matched aweme_id.

        Raises:
            TypeError: If ``url`` is not a string.
            APINotFoundError: If no valid URL could be extracted.
            APIConnectionError: If the request itself fails.
            APIResponseError: On an HTTP error status, or when the final URL
                contains no aweme_id.
        """
        if not isinstance(url, str):
            raise TypeError("参数必须是字符串类型")

        # Extract the valid URL.
        url = extract_valid_urls(url)

        if url is None:
            raise APINotFoundError("输入的URL不合法。类名:{0}".format(cls.__name__))

        # Follow redirects to the full link.
        transport = httpx.AsyncHTTPTransport(retries=5)
        async with httpx.AsyncClient(
            transport=transport, proxies=TokenManager.proxies, timeout=10
        ) as client:
            try:
                response = await client.get(url, follow_redirects=True)
                response.raise_for_status()

            except httpx.RequestError as exc:
                # Covers every httpx request-level failure.
                raise APIConnectionError(
                    "请求端点失败,请检查当前网络环境。 链接:{0},代理:{1},异常类名:{2},异常详细信息:{3}"
                    .format(url, TokenManager.proxies, cls.__name__, exc)
                ) from exc

            except httpx.HTTPStatusError as e:
                raise APIResponseError(
                    "链接:{0},状态码 {1}:{2} ".format(
                        e.response.url, e.response.status_code, e.response.text
                    )
                ) from e

        # Try the patterns in priority order (video, note, discover) with one
        # search each; supporting a new URL shape only needs another entry here.
        final_url = str(response.url)
        for pattern in (
            cls._DOUYIN_VIDEO_URL_PATTERN,
            cls._DOUYIN_NOTE_URL_PATTERN,
            cls._DOUYIN_DISCOVER_URL_PATTERN,
        ):
            match = pattern.search(final_url)
            if match:
                return match.group(1)

        raise APIResponseError(
            "未在响应的地址中找到aweme_id,检查链接是否为作品页"
        )

    @classmethod
    async def get_all_aweme_id(cls, urls: list) -> list:
        """
        Get the aweme_id for every url in a list, concurrently.

        Args:
            urls (list): List of urls.

        Returns:
            list: The aweme_id values returned by ``asyncio.gather``.

        Raises:
            TypeError: If ``urls`` is not a list.
            APINotFoundError: If no valid URL remains after extraction.
        """
        if not isinstance(urls, list):
            raise TypeError("参数必须是列表类型")

        # Extract the valid URLs.
        urls = extract_valid_urls(urls)

        if urls == []:
            raise APINotFoundError(
                "输入的URL List不合法。类名:{0}".format(cls.__name__)
            )

        aweme_ids = [cls.get_aweme_id(url) for url in urls]
        return await asyncio.gather(*aweme_ids)
|
||
|
||
|
||
class MixIdFetcher:
    """Placeholder for resolving a mix id from a url."""

    # Original note: the lookup is meant to work the same way as AwemeIdFetcher.
    @classmethod
    async def get_mix_id(cls, url: str) -> str:
        """Return None; the lookup is not implemented.

        NOTE(review): the annotation promises ``str`` but ``url`` is ignored
        and nothing is returned — confirm the intended implementation.
        """
        return None
|
||
|
||
|
||
class WebCastIdFetcher:
    """Resolve Douyin live-stream links to a webcast_id (or room_id)."""

    # Precompiled regular expressions.
    _DOUYIN_LIVE_URL_PATTERN = re.compile(r"live/([^/?]*)")
    # https://live.douyin.com/766545142636?cover_type=0&enter_from_merge=web_live&enter_method=web_card&game_name=&is_recommend=1&live_type=game&more_detail=&request_id=20231110224012D47CD00C18B4AE4BFF9B&room_id=7299828646049827596&stream_type=vertical&title_type=1&web_live_page=hot_live&web_live_tab=all
    # https://live.douyin.com/766545142636
    # Dots are escaped so only the literal host "live.douyin.com" matches.
    _DOUYIN_LIVE_URL_PATTERN2 = re.compile(r"https?://live\.douyin\.com/(\d+)")
    # https://webcast.amemv.com/douyin/webcast/reflow/7318296342189919011?u_code=l1j9bkbd&did=MS4wLjABAAAAEs86TBQPNwAo-RGrcxWyCdwKhI66AK3Pqf3ieo6HaxI&iid=MS4wLjABAAAA0ptpM-zzoliLEeyvWOCUt-_dQza4uSjlIvbtIazXnCY&with_sec_did=1&use_link_command=1&ecom_share_track_params=&extra_params={"from_request_id":"20231230162057EC005772A8EAA0199906","im_channel_invite_id":"0"}&user_id=3644207898042206&liveId=7318296342189919011&from=share&style=share&enter_method=click_share&roomId=7318296342189919011&activity_info={}
    _DOUYIN_LIVE_URL_PATTERN3 = re.compile(r"reflow/([^/?]*)")

    @classmethod
    async def get_webcast_id(cls, url: str) -> str:
        """
        Get the webcast_id from a single url.

        The url is fetched with redirects followed, and the id is extracted
        from the final response URL. When only the ``reflow/`` pattern matches,
        a warning is logged because that value is a room_id.

        Args:
            url (str): Input url.

        Returns:
            str: The matched webcast_id (or room_id for reflow links).

        Raises:
            TypeError: If ``url`` is not a string.
            APINotFoundError: If no valid URL could be extracted.
            APIConnectionError: If the request itself fails.
            APIResponseError: On an HTTP error status, or when the final URL
                contains no webcast_id.
        """
        if not isinstance(url, str):
            raise TypeError("参数必须是字符串类型")

        # Extract the valid URL.
        url = extract_valid_urls(url)

        if url is None:
            raise APINotFoundError("输入的URL不合法。类名:{0}".format(cls.__name__))

        try:
            # Follow redirects to the full link.
            transport = httpx.AsyncHTTPTransport(retries=5)
            async with httpx.AsyncClient(
                transport=transport, proxies=TokenManager.proxies, timeout=10
            ) as client:
                response = await client.get(url, follow_redirects=True)
                response.raise_for_status()

        except httpx.RequestError as exc:
            # Covers every httpx request-level failure.
            raise APIConnectionError(
                "请求端点失败,请检查当前网络环境。 链接:{0},代理:{1},异常类名:{2},异常详细信息:{3}"
                .format(url, TokenManager.proxies, cls.__name__, exc)
            ) from exc

        except httpx.HTTPStatusError as e:
            raise APIResponseError(
                "链接:{0},状态码 {1}:{2} ".format(
                    e.response.url, e.response.status_code, e.response.text
                )
            ) from e

        final_url = str(response.url)

        # The first two patterns yield a webcast_id; each is searched only once.
        for pattern in (cls._DOUYIN_LIVE_URL_PATTERN, cls._DOUYIN_LIVE_URL_PATTERN2):
            match = pattern.search(final_url)
            if match:
                return match.group(1)

        # Reflow share links carry a room_id instead; warn the caller.
        match = cls._DOUYIN_LIVE_URL_PATTERN3.search(final_url)
        if match:
            logger.warning(
                "该链接返回的是room_id,请使用`fetch_user_live_videos_by_room_id`接口"
            )
            return match.group(1)

        raise APIResponseError(
            "未在响应的地址中找到webcast_id,检查链接是否为直播页"
        )

    @classmethod
    async def get_all_webcast_id(cls, urls: list) -> list:
        """
        Get the webcast_id for every url in a list, concurrently.

        Args:
            urls (list): List of urls.

        Returns:
            list: The webcast_id values returned by ``asyncio.gather``.

        Raises:
            TypeError: If ``urls`` is not a list.
            APINotFoundError: If no valid URL remains after extraction.
        """
        if not isinstance(urls, list):
            raise TypeError("参数必须是列表类型")

        # Extract the valid URLs.
        urls = extract_valid_urls(urls)

        if urls == []:
            raise APINotFoundError(
                "输入的URL List不合法。类名:{0}".format(cls.__name__)
            )

        webcast_ids = [cls.get_webcast_id(url) for url in urls]
        return await asyncio.gather(*webcast_ids)
|
||
|
||
|
||
def format_file_name(
    naming_template: str,
    aweme_data: Union[dict, None] = None,
    custom_fields: Union[dict, None] = None,
) -> str:
    """
    Format a file name from a naming template and Douyin work data.

    Args:
        naming_template (str): Naming template for files, such as "{create}_{desc}".
        aweme_data (dict): Dict of Douyin data; the keys read are
            ``create_time``, ``nickname``, ``aweme_id``, ``desc`` and ``uid``.
            Missing keys default to "". ``None`` is treated as an empty dict.
        custom_fields (dict): Custom fields that override or extend the
            default field values. ``None`` is treated as an empty dict.

    Note:
        The Windows file name length limit is 255 characters (32,767 once long
        file name support is enabled); the Unix limit is 255 characters.
        The description is shortened by ``split_filename`` using the
        per-platform limits in ``os_limit``, so the name plus suffix generally
        stays under 255 characters.
        For more information see: https://en.wikipedia.org/wiki/Filename#Length

    Returns:
        str: The formatted file name.

    Raises:
        KeyError: If the template references a field that is not available.
    """
    # None defaults replace the former mutable `{}` defaults, which were
    # shared between calls.
    if aweme_data is None:
        aweme_data = {}
    if custom_fields is None:
        custom_fields = {}

    # Per-platform length limits handed to split_filename.
    os_limit = {
        "win32": 200,
        "cygwin": 60,
        "darwin": 60,
        "linux": 60,
    }

    fields = {
        "create": aweme_data.get("create_time", ""),  # fixed length 19
        "nickname": aweme_data.get("nickname", ""),  # at most 30
        "aweme_id": aweme_data.get("aweme_id", ""),  # fixed length 19
        "desc": split_filename(aweme_data.get("desc", ""), os_limit),
        "uid": aweme_data.get("uid", ""),  # fixed 11
    }

    if custom_fields:
        # Custom fields take precedence over the defaults.
        fields.update(custom_fields)

    try:
        return naming_template.format(**fields)
    except KeyError as e:
        raise KeyError("文件名模板字段 {0} 不存在,请检查".format(e)) from e
|
||
|
||
|
||
def create_user_folder(kwargs: dict, nickname: Union[str, int]) -> Path:
    """
    Create (if needed) and return the save directory for a user.

    The directory is ``<path>/douyin/<mode>/<nickname>``, resolved to an
    absolute path.

    Args:
        kwargs (dict): Conf values in dict format; ``path`` and ``mode`` are read.
        nickname (Union[str, int]): User nickname; converted with ``str``.

    Note:
        ``path`` defaults to "Download" and ``mode`` to "PLEASE_SETUP_MODE".
        Both absolute and relative paths are supported.

    Returns:
        Path: The resolved directory, which exists on return.

    Raises:
        TypeError: If ``kwargs`` is not a dict.
    """
    # Validate the argument type first.
    if not isinstance(kwargs, dict):
        raise TypeError("kwargs 参数必须是字典")

    download_root = Path(kwargs.get("path", "Download"))
    mode_name = kwargs.get("mode", "PLEASE_SETUP_MODE")

    # Base path, then platform name, download mode and user name.
    target_dir = (download_root / "douyin" / mode_name / str(nickname)).resolve()

    # Make sure the whole chain of directories exists.
    target_dir.mkdir(parents=True, exist_ok=True)
    return target_dir
|
||
|
||
|
||
def rename_user_folder(old_path: Path, new_nickname: str) -> Path:
    """
    Rename a user folder to a new nickname inside the same parent directory.

    Args:
        old_path (Path): Path of the old user folder.
        new_nickname (str): New user nickname, used as the new folder name.

    Returns:
        Path: Resolved path of the renamed user folder.
    """
    # The renamed folder stays a sibling of the old one.
    destination = old_path.parent / new_nickname
    renamed = old_path.rename(destination)
    return renamed.resolve()
|
||
|
||
|
||
def create_or_rename_user_folder(
    kwargs: dict, local_user_data: dict, current_nickname: str
) -> Path:
    """
    Create the user directory, or rename the existing one after a nickname change.

    When ``local_user_data`` records a different nickname and a folder with
    that old nickname exists, that folder is renamed to the current nickname.
    Otherwise the folder for the current nickname is created if missing.

    Args:
        kwargs (dict): Conf parameters; ``path`` and ``mode`` are read.
        local_user_data (dict): Local user data; only ``nickname`` is read.
            May be empty or None.
        current_nickname (str): Current user nickname.

    Returns:
        Path: Resolved user directory path, which exists on return.

    Raises:
        TypeError: If ``kwargs`` is not a dict.
    """
    if not isinstance(kwargs, dict):
        raise TypeError("kwargs 参数必须是字典")

    previous_nickname = local_user_data.get("nickname") if local_user_data else None

    if previous_nickname is not None and previous_nickname != current_nickname:
        # The nickname changed. Previously the new folder was created first
        # and then renamed onto itself, so the old folder was never touched.
        # Same layout as create_user_folder: <path>/douyin/<mode>/<nickname>.
        parent_directory = (
            Path(kwargs.get("path", "Download"))
            / "douyin"
            / kwargs.get("mode", "PLEASE_SETUP_MODE")
        ).resolve()
        old_path = parent_directory / str(previous_nickname)
        new_path = parent_directory / str(current_nickname)

        # Rename only when it cannot clobber an existing target folder.
        if old_path.is_dir() and not new_path.exists():
            return rename_user_folder(old_path, str(current_nickname))

    return create_user_folder(kwargs, current_nickname)
|
||
|
||
|
||
def show_qrcode(qrcode_url: str, show_image: bool = False) -> None:
    """
    Show a QR code for the given link.

    Args:
        qrcode_url (str): Login QR code link.
        show_image (bool): True opens the QR code as an image; False prints it
            in the console as ASCII.
    """
    if not show_image:
        # Print the QR code in the console in ASCII form.
        console_qr = qrcode.QRCode()
        console_qr.add_data(qrcode_url)
        console_qr.make(fit=True)
        console_qr.print_ascii(invert=True)
        return

    # Create and display the QR code image.
    qrcode.make(qrcode_url).show()
|
||
|
||
|
||
def json_2_lrc(data: Union[str, list, dict]) -> str:
    """
    Generate lrc format lyrics from Douyin original-sound json lyrics.

    Each item must provide ``text`` and ``timeId``; ``timeId`` is converted
    with ``float`` and treated as seconds. Every item becomes one line of the
    form ``[mm:ss.mmm] text``.

    Args:
        data (Union[str, list, dict]): Douyin original json lyrics; iterated
            item by item, each item indexed by ``"text"`` and ``"timeId"``.

    Returns:
        str: The generated lrc lyrics, lines joined with "\\n".

    Raises:
        KeyError: If an item lacks ``text`` or ``timeId``.
        TypeError: If ``data`` or its items do not have the expected type.
        RuntimeError: Re-raised with context if raised while converting.
    """
    try:
        lrc_lines = []
        for item in data:
            text = item["text"]
            time_seconds = float(item["timeId"])
            # Round to whole milliseconds once, then split with integer
            # arithmetic. Truncating (t % 1) * 1000 turned 1.001 into .000
            # because of float representation error.
            total_ms = round(time_seconds * 1000)
            minutes, rest_ms = divmod(total_ms, 60000)
            seconds, milliseconds = divmod(rest_ms, 1000)
            time_str = f"{minutes:02}:{seconds:02}.{milliseconds:03}"
            lrc_lines.append(f"[{time_str}] {text}")
    except KeyError as e:
        raise KeyError("歌词数据字段错误:{0}".format(e)) from e
    except RuntimeError as e:
        raise RuntimeError("生成歌词文件失败:{0},请检查歌词 `data` 内容".format(e)) from e
    except TypeError as e:
        raise TypeError("歌词数据类型错误:{0}".format(e)) from e
    return "\n".join(lrc_lines)
|