2024-07-07 15:47:48 +08:00

805 lines
30 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# ==============================================================================
# Copyright (C) 2021 Evil0ctal
#
# This file is part of the Douyin_TikTok_Download_API project.
#
# This project is licensed under the Apache License 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
#         __
#        />  フ
#       |  _  _ l
#       ` ミ_x
#      /      | Feed me Stars ⭐
#     /  ヽ   ノ
#     │  | | |
#  / ̄|   | | |
#  | ( ̄ヽ__ヽ_)__)
#  \二つ
# ==============================================================================
#
# Contributor Link:
# - https://github.com/Evil0ctal
# - https://github.com/Johnserf-Seed
#
# ==============================================================================
import asyncio
import json
import os
import random
import re
import time
import urllib
from pathlib import Path
from typing import Union
from urllib.parse import urlencode, quote
# import execjs
import httpx
import qrcode
import yaml
from crawlers.douyin.web.xbogus import XBogus as XB
from crawlers.douyin.web.abogus import ABogus as AB
from crawlers.utils.api_exceptions import (
APIError,
APIConnectionError,
APIResponseError,
APIUnavailableError,
APIUnauthorizedError,
APINotFoundError,
)
from crawlers.utils.logger import logger
from crawlers.utils.utils import (
gen_random_str,
get_timestamp,
extract_valid_urls,
split_filename,
)
# Absolute path of the directory that contains this module.
path = os.path.abspath(os.path.dirname(__file__))
# Load config.yaml from that directory once, at import time.
with open(
    f"{path}/config.yaml",
    mode="r",
    encoding="utf-8",
) as f:
    config = yaml.safe_load(f)
class TokenManager:
    """Produce the ``msToken`` and ``ttwid`` cookie values used for Douyin requests.

    Settings are read from the ``TokenManager`` -> ``douyin`` section of the
    module-level ``config`` when the class body is executed.
    """

    # "TokenManager" -> "douyin" section of the loaded configuration.
    douyin_manager = config.get("TokenManager").get("douyin")
    # Settings used by gen_real_msToken (url, magic, version, dataType, ...).
    token_conf = douyin_manager.get("msToken", None)
    # Settings used by gen_ttwid (url, data).
    ttwid_conf = douyin_manager.get("ttwid", None)
    # Optional proxy settings; None when the section is missing.
    proxies_conf = douyin_manager.get("proxies", None)
    # Proxy mapping handed to httpx. A missing "proxies" section now results in
    # "no proxy" instead of an AttributeError while the class is being created.
    proxies = {
        "http://": (proxies_conf or {}).get("http", None),
        "https://": (proxies_conf or {}).get("https", None),
    }

    @classmethod
    def gen_real_msToken(cls) -> str:
        """Request a real msToken and fall back to a fake one on any error.

        Returns:
            str: The ``msToken`` cookie sent back by the configured endpoint,
                or the value of :meth:`gen_false_msToken` when the request
                fails or the cookie does not have a length of 120 or 128.
        """
        payload = json.dumps(
            {
                "magic": cls.token_conf["magic"],
                "version": cls.token_conf["version"],
                "dataType": cls.token_conf["dataType"],
                "strData": cls.token_conf["strData"],
                "tspFromClient": get_timestamp(),
            }
        )
        headers = {
            "User-Agent": cls.token_conf["User-Agent"],
            "Content-Type": "application/json",
        }
        transport = httpx.HTTPTransport(retries=5)
        with httpx.Client(transport=transport, proxies=cls.proxies) as client:
            try:
                response = client.post(
                    cls.token_conf["url"], content=payload, headers=headers
                )
                response.raise_for_status()
                # A missing cookie becomes the string "None", which fails the
                # length check below and triggers the fallback.
                msToken = str(httpx.Cookies(response.cookies).get("msToken"))
                if len(msToken) not in [120, 128]:
                    raise APIResponseError("响应内容:{0} Douyin msToken API 的响应内容不符合要求。".format(msToken))
                return msToken
            except Exception as e:
                # Deliberate best-effort: every failure (network, HTTP status,
                # unexpected cookie) is logged and replaced by a fake token.
                logger.error("请求Douyin msToken API时发生错误{0}".format(e))
                logger.info("将使用本地生成的虚假msToken参数以继续请求。")
                return cls.gen_false_msToken()

    @classmethod
    def gen_false_msToken(cls) -> str:
        """Return a random stand-in msToken: 126 random characters plus ``==``."""
        return gen_random_str(126) + "=="

    @classmethod
    def gen_ttwid(cls) -> str:
        """Request the ``ttwid`` cookie from the configured endpoint.

        Returns:
            str: The value of the ``ttwid`` cookie in the response.

        Raises:
            APIConnectionError: The request itself failed (httpx.RequestError).
            APIUnauthorizedError: The endpoint answered with status 401.
            APINotFoundError: The endpoint answered with status 404.
            APIResponseError: Any other error status, or a response that does
                not carry a ``ttwid`` cookie.
        """
        transport = httpx.HTTPTransport(retries=5)
        # NOTE(review): unlike gen_real_msToken, this client is created without
        # cls.proxies although the error message below reports them - confirm
        # whether that is intentional.
        with httpx.Client(transport=transport) as client:
            try:
                response = client.post(
                    cls.ttwid_conf["url"], content=cls.ttwid_conf["data"]
                )
                response.raise_for_status()
                ttwid = httpx.Cookies(response.cookies).get("ttwid")
                if ttwid is None:
                    # Previously the literal string "None" was returned here and
                    # silently used as the cookie value.
                    raise APIResponseError(
                        "Douyin ttwid API 的响应内容不符合要求,未返回 ttwid Cookie。"
                    )
                return str(ttwid)
            except httpx.RequestError as exc:
                # Covers every transport-level failure raised by httpx.
                raise APIConnectionError(
                    "请求端点失败,请检查当前网络环境。 链接:{0},代理:{1},异常类名:{2},异常详细信息:{3}"
                    .format(cls.ttwid_conf["url"], cls.proxies, cls.__name__, exc)
                ) from exc
            except httpx.HTTPStatusError as e:
                # Raised by raise_for_status(); map the status to a project error.
                if e.response.status_code == 401:
                    raise APIUnauthorizedError(
                        "参数验证失败,请更新 Douyin_TikTok_Download_API 配置文件中的 {0},以匹配 {1} 新规则"
                        .format("ttwid", "douyin")
                    ) from e
                elif e.response.status_code == 404:
                    raise APINotFoundError("ttwid无法找到API端点") from e
                else:
                    raise APIResponseError("链接:{0},状态码 {1}{2} ".format(
                        e.response.url, e.response.status_code, e.response.text
                    )
                    ) from e
class VerifyFpManager:
    """Build the random ``verifyFp`` / ``s_v_web_id`` fingerprint strings."""

    @classmethod
    def gen_verify_fp(cls) -> str:
        """Generate a verifyFp value.

        Returns:
            str: ``"verify_"`` + the current time in milliseconds written in
                lowercase base 36 + ``"_"`` + a 36-character body. The body has
                ``"_"`` at indexes 8, 13, 18 and 23, ``"4"`` at index 14, and
                random alphanumeric characters everywhere else.
        """
        alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
        alphabet_size = len(alphabet)

        # Current time in milliseconds, converted to lowercase base 36.
        remaining = int(round(time.time() * 1000))
        stamp_digits = []
        while remaining > 0:
            remaining, digit = divmod(remaining, 36)
            if digit < 10:
                stamp_digits.append(str(digit))
            else:
                stamp_digits.append(chr(ord("a") + digit - 10))
        stamp = "".join(reversed(stamp_digits))

        # Positions whose character is fixed rather than random.
        fixed = {8: "_", 13: "_", 18: "_", 23: "_", 14: "4"}
        body = []
        for pos in range(36):
            preset = fixed.get(pos)
            if preset is not None:
                body.append(preset)
                continue
            pick = int(random.random() * alphabet_size)
            if pos == 19:
                # Restrict index 19 to alphabet[8..11], i.e. "8", "9", "A" or "B".
                pick = (3 & pick) | 8
            body.append(alphabet[pick])

        return "verify_" + stamp + "_" + "".join(body)

    @classmethod
    def gen_s_v_web_id(cls) -> str:
        """Generate an s_v_web_id value; it is produced exactly like verifyFp."""
        return cls.gen_verify_fp()
class BogusManager:
    """Helpers that sign request parameters with X-Bogus / A-Bogus values."""

    @classmethod
    def xb_str_2_endpoint(cls, endpoint: str, user_agent: str) -> str:
        """Sign an endpoint given as a string.

        Args:
            endpoint (str): Value passed to ``XB(user_agent).getXBogus``.
            user_agent (str): User-Agent used to construct the signer.

        Returns:
            str: Element 0 of the ``getXBogus`` result (presumably the signed
                endpoint - confirm against the XBogus implementation).

        Raises:
            RuntimeError: The signer raised any exception.
        """
        try:
            signed = XB(user_agent).getXBogus(endpoint)
        except Exception as e:
            raise RuntimeError("生成X-Bogus失败: {0})".format(e))
        return signed[0]

    @classmethod
    def xb_model_2_endpoint(cls, base_endpoint: str, params: dict, user_agent: str) -> str:
        """Sign parameters given as a dict and append them to ``base_endpoint``.

        Args:
            base_endpoint (str): URL the query string is appended to.
            params (dict): Query parameters, joined as ``k=v`` pairs with ``&``
                (no URL encoding is applied).
            user_agent (str): User-Agent used to construct the signer.

        Returns:
            str: ``base_endpoint`` + query string + ``&X-Bogus=<value>``, where
                the value is element 1 of the ``getXBogus`` result.

        Raises:
            TypeError: ``params`` is not a dict.
            RuntimeError: The signer raised any exception.
        """
        if not isinstance(params, dict):
            raise TypeError("参数必须是字典类型")

        query = "&".join(f"{key}={value}" for key, value in params.items())
        try:
            signature = XB(user_agent).getXBogus(query)
        except Exception as e:
            raise RuntimeError("生成X-Bogus失败: {0})".format(e))

        # Continue an existing query string with "&", otherwise start one with "?".
        if "?" in base_endpoint:
            joiner = "&"
        else:
            joiner = "?"
        return f"{base_endpoint}{joiner}{query}&X-Bogus={signature[1]}"

    # String-based A-Bogus generation (JavaScript version).
    # TODO: not fully tested yet, so it is not enabled on the main branch.
    # @classmethod
    # def ab_str_2_endpoint_js_ver(cls, endpoint: str, user_agent: str) -> str:
    #     try:
    #         # Extract the query string of the endpoint
    #         endpoint_query_params = urllib.parse.urlparse(endpoint).query
    #         # Locate the A-Bogus JS file next to this module
    #         js_path = os.path.dirname(os.path.abspath(__file__))
    #         a_bogus_js_path = os.path.join(js_path, 'a_bogus.js')
    #         with open(a_bogus_js_path, 'r', encoding='utf-8') as file:
    #             js_code = file.read()
    #         # A Node environment is required here:
    #         # - install Node.js
    #         # - install the execjs library
    #         # - install the NPM dependency: npm install jsdom
    #         node_runtime = execjs.get('Node')
    #         context = node_runtime.compile(js_code)
    #         arg = [0, 1, 0, endpoint_query_params, "", user_agent]
    #         a_bougus = quote(context.call('get_a_bogus', arg), safe='')
    #         return a_bougus
    #     except Exception as e:
    #         raise RuntimeError("生成A-Bogus失败: {0})".format(e))

    # Dict-based A-Bogus generation; thanks to @JoeanAmier for the pure-Python
    # version of the algorithm.
    @classmethod
    def ab_model_2_endpoint(cls, params: dict, user_agent: str) -> str:
        """Compute the A-Bogus value for a parameter dict.

        Args:
            params (dict): Parameters passed to ``AB().get_value``.
            user_agent (str): Accepted but not used by this method.

        Returns:
            str: The A-Bogus value, percent-encoded with no safe characters.

        Raises:
            TypeError: ``params`` is not a dict.
            RuntimeError: The signer raised any exception.
        """
        if not isinstance(params, dict):
            raise TypeError("参数必须是字典类型")
        try:
            raw_value = AB().get_value(params)
        except Exception as e:
            raise RuntimeError("生成A-Bogus失败: {0})".format(e))
        return quote(raw_value, safe='')
class SecUserIdFetcher:
    """Extract ``sec_user_id`` values from Douyin user links."""

    # Precompiled patterns: a profile path ("user/<id>") and the "sec_uid"
    # query parameter that is searched for when a v.douyin.com link is given.
    _DOUYIN_URL_PATTERN = re.compile(r"user/([^/?]*)")
    _REDIRECT_URL_PATTERN = re.compile(r"sec_uid=([^&]*)")

    @classmethod
    async def get_sec_user_id(cls, url: str) -> str:
        """Get the sec_user_id from a single URL.

        The URL is requested with redirects enabled and the id is read from
        the final response URL.

        Args:
            url (str): Input URL.

        Returns:
            str: The matched sec_user_id.

        Raises:
            TypeError: ``url`` is not a string.
            APINotFoundError: No valid URL could be extracted, or status 404.
            APIUnauthorizedError: Status 401.
            APIUnavailableError: Status 503.
            APIResponseError: No id in the final URL, or any other status.
            APIConnectionError: The request failed (httpx.RequestError).
        """
        if not isinstance(url, str):
            raise TypeError("参数必须是字符串类型")

        # Keep only the valid URL contained in the input.
        url = extract_valid_urls(url)
        if url is None:
            raise APINotFoundError("输入的URL不合法。类名{0}".format(cls.__name__))

        if "v.douyin.com" in url:
            pattern = cls._REDIRECT_URL_PATTERN
        else:
            pattern = cls._DOUYIN_URL_PATTERN

        try:
            transport = httpx.AsyncHTTPTransport(retries=5)
            async with httpx.AsyncClient(
                transport=transport, proxies=TokenManager.proxies, timeout=10
            ) as client:
                response = await client.get(url, follow_redirects=True)
                status = response.status_code

                # 444 is generally an Nginx interception that returns no status.
                if status in {200, 444}:
                    found = pattern.search(str(response.url))
                    if not found:
                        raise APIResponseError(
                            "未在响应的地址中找到sec_user_id检查链接是否为用户主页类名{0}"
                            .format(cls.__name__)
                        )
                    return found.group(1)
                if status == 401:
                    raise APIUnauthorizedError("未授权的请求。类名:{0}".format(cls.__name__))
                if status == 404:
                    raise APINotFoundError("未找到API端点。类名{0}".format(cls.__name__))
                if status == 503:
                    raise APIUnavailableError("API服务不可用。类名{0}".format(cls.__name__))
                raise APIResponseError("链接:{0},状态码 {1}{2} ".format(
                    response.url, status, response.text
                ))
        except httpx.RequestError as exc:
            raise APIConnectionError(
                "请求端点失败,请检查当前网络环境。 链接:{0},代理:{1},异常类名:{2},异常详细信息:{3}"
                .format(url, TokenManager.proxies, cls.__name__, exc)
            )

    @classmethod
    async def get_all_sec_user_id(cls, urls: list) -> list:
        """Get the sec_user_id for every URL in a list, concurrently.

        Args:
            urls (list): User URLs.

        Returns:
            list: The sec_user_id values gathered from :meth:`get_sec_user_id`.

        Raises:
            TypeError: ``urls`` is not a list.
            APINotFoundError: No valid URL remains after extraction.
        """
        if not isinstance(urls, list):
            raise TypeError("参数必须是列表类型")

        # Keep only the valid URLs.
        urls = extract_valid_urls(urls)
        if urls == []:
            raise APINotFoundError("输入的URL List不合法。类名{0}".format(cls.__name__))

        pending = [cls.get_sec_user_id(single_url) for single_url in urls]
        return await asyncio.gather(*pending)
class AwemeIdFetcher:
    """Extract ``aweme_id`` values from Douyin links."""

    # Precompiled patterns for the URL shapes that carry an aweme_id.
    _DOUYIN_VIDEO_URL_PATTERN = re.compile(r"video/([^/?]*)")
    _DOUYIN_NOTE_URL_PATTERN = re.compile(r"note/([^/?]*)")
    _DOUYIN_DISCOVER_URL_PATTERN = re.compile(r"modal_id=([0-9]+)")

    @classmethod
    async def get_aweme_id(cls, url: str) -> str:
        """Get the aweme_id from a single URL.

        The URL is requested with redirects enabled and the id is read from
        the final response URL.

        Args:
            url (str): Input URL.

        Returns:
            str: The matched aweme_id.

        Raises:
            TypeError: ``url`` is not a string.
            APINotFoundError: No valid URL could be extracted from the input.
            APIResponseError: Error status, or no id found in the final URL.
            APIConnectionError: The request failed (httpx.RequestError).
        """
        if not isinstance(url, str):
            raise TypeError("参数必须是字符串类型")

        # Keep only the valid URL contained in the input.
        url = extract_valid_urls(url)
        if url is None:
            raise APINotFoundError("输入的URL不合法。类名{0}".format(cls.__name__))

        # Follow redirects to reach the full link.
        transport = httpx.AsyncHTTPTransport(retries=5)
        async with httpx.AsyncClient(
            transport=transport, proxies=TokenManager.proxies, timeout=10
        ) as client:
            try:
                response = await client.get(url, follow_redirects=True)
                response.raise_for_status()

                # Patterns are tried in order: video, note, discover modal.
                # Add new URL shapes to this tuple.
                candidates = (
                    cls._DOUYIN_VIDEO_URL_PATTERN,
                    cls._DOUYIN_NOTE_URL_PATTERN,
                    cls._DOUYIN_DISCOVER_URL_PATTERN,
                )
                final_url = str(response.url)
                for candidate in candidates:
                    found = candidate.search(final_url)
                    if found:
                        return found.group(1)
                raise APIResponseError(
                    "未在响应的地址中找到aweme_id检查链接是否为作品页"
                )
            except httpx.RequestError as exc:
                # Covers every transport-level failure raised by httpx.
                raise APIConnectionError(
                    "请求端点失败,请检查当前网络环境。 链接:{0},代理:{1},异常类名:{2},异常详细信息:{3}"
                    .format(url, TokenManager.proxies, cls.__name__, exc)
                )
            except httpx.HTTPStatusError as e:
                raise APIResponseError("链接:{0},状态码 {1}{2} ".format(
                    e.response.url, e.response.status_code, e.response.text
                ))

    @classmethod
    async def get_all_aweme_id(cls, urls: list) -> list:
        """Get the aweme_id for every URL in a list, concurrently.

        Args:
            urls (list): URLs to resolve.

        Returns:
            list: The aweme_id values gathered from :meth:`get_aweme_id`.

        Raises:
            TypeError: ``urls`` is not a list.
            APINotFoundError: No valid URL remains after extraction.
        """
        if not isinstance(urls, list):
            raise TypeError("参数必须是列表类型")

        # Keep only the valid URLs.
        urls = extract_valid_urls(urls)
        if urls == []:
            raise APINotFoundError("输入的URL List不合法。类名{0}".format(cls.__name__))

        pending = [cls.get_aweme_id(single_url) for single_url in urls]
        return await asyncio.gather(*pending)
class MixIdFetcher:
    """Extract a mix id from a Douyin link."""

    # The lookup is the same as in AwemeIdFetcher, so it is delegated there
    # instead of returning None from an empty stub.
    @classmethod
    async def get_mix_id(cls, url: str) -> str:
        """Get the mix id from a single URL.

        Args:
            url (str): Input URL.

        Returns:
            str: The id resolved by ``AwemeIdFetcher.get_aweme_id``.

        Raises:
            Whatever ``AwemeIdFetcher.get_aweme_id`` raises for the same input.
        """
        return await AwemeIdFetcher.get_aweme_id(url)
class WebCastIdFetcher:
    """Extract ``webcast_id`` values from Douyin live links."""

    # Precompiled patterns.
    # Path form, ".../live/<id>".
    _DOUYIN_LIVE_URL_PATTERN = re.compile(r"live/([^/?]*)")
    # Host form, e.g.
    #   https://live.douyin.com/766545142636
    #   https://live.douyin.com/766545142636?cover_type=0&enter_from_merge=web_live&...
    # The dots are escaped so that only the literal host name matches.
    _DOUYIN_LIVE_URL_PATTERN2 = re.compile(r"http[s]?://live\.douyin\.com/(\d+)")
    # Share form, e.g.
    #   https://webcast.amemv.com/douyin/webcast/reflow/7318296342189919011?u_code=...
    _DOUYIN_LIVE_URL_PATTERN3 = re.compile(r"reflow/([^/?]*)")

    @classmethod
    async def get_webcast_id(cls, url: str) -> str:
        """Get the webcast_id from a single URL.

        The URL is requested with redirects enabled and the id is read from
        the final response URL. When only the "reflow" pattern matches, a
        warning is logged because the value is a room_id.

        Args:
            url (str): Input URL.

        Returns:
            str: The matched webcast_id.

        Raises:
            TypeError: ``url`` is not a string.
            APINotFoundError: No valid URL could be extracted from the input.
            APIResponseError: Error status, or no id found in the final URL.
            APIConnectionError: The request failed (httpx.RequestError).
        """
        if not isinstance(url, str):
            raise TypeError("参数必须是字符串类型")

        # Keep only the valid URL contained in the input.
        url = extract_valid_urls(url)
        if url is None:
            raise APINotFoundError("输入的URL不合法。类名{0}".format(cls.__name__))

        try:
            # Follow redirects to reach the full link.
            transport = httpx.AsyncHTTPTransport(retries=5)
            async with httpx.AsyncClient(
                transport=transport, proxies=TokenManager.proxies, timeout=10
            ) as client:
                response = await client.get(url, follow_redirects=True)
                response.raise_for_status()
                url = str(response.url)

                # Each pattern is searched once, in priority order.
                match = cls._DOUYIN_LIVE_URL_PATTERN.search(url)
                if match is None:
                    match = cls._DOUYIN_LIVE_URL_PATTERN2.search(url)
                if match is None:
                    match = cls._DOUYIN_LIVE_URL_PATTERN3.search(url)
                    if match is not None:
                        logger.warning(
                            "该链接返回的是room_id请使用`fetch_user_live_videos_by_room_id`接口"
                        )
                if match is None:
                    raise APIResponseError(
                        "未在响应的地址中找到webcast_id检查链接是否为直播页"
                    )
                return match.group(1)
        except httpx.RequestError as exc:
            # Covers every transport-level failure raised by httpx.
            raise APIConnectionError(
                "请求端点失败,请检查当前网络环境。 链接:{0},代理:{1},异常类名:{2},异常详细信息:{3}"
                .format(url, TokenManager.proxies, cls.__name__, exc)
            ) from exc
        except httpx.HTTPStatusError as e:
            raise APIResponseError("链接:{0},状态码 {1}{2} ".format(
                e.response.url, e.response.status_code, e.response.text
            )) from e

    @classmethod
    async def get_all_webcast_id(cls, urls: list) -> list:
        """Get the webcast_id for every URL in a list, concurrently.

        Args:
            urls (list): URLs to resolve.

        Returns:
            list: The webcast_id values gathered from :meth:`get_webcast_id`.

        Raises:
            TypeError: ``urls`` is not a list.
            APINotFoundError: No valid URL remains after extraction.
        """
        if not isinstance(urls, list):
            raise TypeError("参数必须是列表类型")

        # Keep only the valid URLs.
        urls = extract_valid_urls(urls)
        if urls == []:
            raise APINotFoundError("输入的URL List不合法。类名{0}".format(cls.__name__))

        webcast_ids = [cls.get_webcast_id(url) for url in urls]
        return await asyncio.gather(*webcast_ids)
def format_file_name(
    naming_template: str,
    aweme_data: Union[dict, None] = None,
    custom_fields: Union[dict, None] = None,
) -> str:
    """Format a file name from a naming template.

    Args:
        naming_template (str): Naming template for files, such as
            "{create}_{desc}".
        aweme_data (dict): Douyin data; the keys "create_time", "nickname",
            "aweme_id", "desc" and "uid" are read. Defaults to no data.
        custom_fields (dict): Custom fields that are added to, or replace,
            the default field values. Defaults to none.

    Note:
        The Windows file name length limit is 255 characters (32,767 once
        long file name support is enabled); the Unix limit is 255 characters.
        The description is shortened by ``split_filename`` so that the name
        plus its suffix generally stays below 255 characters.
        For more information see https://en.wikipedia.org/wiki/Filename#Length

    Returns:
        str: The formatted file name.

    Raises:
        KeyError: The template references a field that does not exist.
    """
    # None replaces the former shared ``{}`` defaults (mutable default arguments).
    if aweme_data is None:
        aweme_data = {}

    # Per-platform length limits handed to split_filename.
    os_limit = {
        "win32": 200,
        "cygwin": 60,
        "darwin": 60,
        "linux": 60,
    }

    fields = {
        "create": aweme_data.get("create_time", ""),  # fixed length 19
        "nickname": aweme_data.get("nickname", ""),  # at most 30
        "aweme_id": aweme_data.get("aweme_id", ""),  # fixed length 19
        "desc": split_filename(aweme_data.get("desc", ""), os_limit),
        "uid": aweme_data.get("uid", ""),  # fixed length 11
    }

    if custom_fields:
        # Custom values override the defaults above.
        fields.update(custom_fields)

    try:
        return naming_template.format(**fields)
    except KeyError as e:
        raise KeyError("文件名模板字段 {0} 不存在,请检查".format(e)) from e
def create_user_folder(kwargs: dict, nickname: Union[str, int]) -> Path:
    """Create the save directory for a user and return its resolved path.

    The directory is ``<path>/douyin/<mode>/<nickname>``.

    Args:
        kwargs (dict): Configuration in dict form; "path" and "mode" are read.
        nickname (Union[str, int]): User nickname, string or integer.

    Note:
        "path" defaults to "Download" and "mode" to "PLEASE_SETUP_MODE".
        Absolute and relative paths are both supported.

    Returns:
        Path: Resolved path of the (now existing) user directory.

    Raises:
        TypeError: ``kwargs`` is not a dict.
    """
    if not isinstance(kwargs, dict):
        raise TypeError("kwargs 参数必须是字典")

    download_root = Path(kwargs.get("path", "Download"))
    mode_name = kwargs.get("mode", "PLEASE_SETUP_MODE")

    # <root>/douyin/<mode>/<nickname>, resolved to an absolute path.
    target = download_root.joinpath("douyin", mode_name, str(nickname)).resolve()
    target.mkdir(parents=True, exist_ok=True)
    return target
def rename_user_folder(old_path: Path, new_nickname: str) -> Path:
    """Rename a user directory inside its current parent directory.

    Args:
        old_path (Path): Path of the existing user directory.
        new_nickname (str): New directory name (the new user nickname).

    Returns:
        Path: Resolved path of the renamed user directory.
    """
    # The new directory is a sibling of the old one.
    target = old_path.parent / new_nickname
    renamed = old_path.rename(target)
    return renamed.resolve()
def create_or_rename_user_folder(
    kwargs: dict, local_user_data: dict, current_nickname: str
) -> Path:
    """Create the user directory, or rename the old one after a nickname change.

    Previously the directory was first created under the current nickname and
    then "renamed" to that same name, so a directory stored under the old
    nickname was never moved. Now the old directory is renamed when it exists
    and no directory for the current nickname exists yet; in every other case
    the directory for the current nickname is created.

    Args:
        kwargs (dict): Configuration parameters ("path" and "mode" are read).
        local_user_data (dict): Locally stored user data; its "nickname" entry
            is the nickname the directory was last saved under.
        current_nickname (str): Current user nickname.

    Returns:
        Path: Resolved path of the directory for the current nickname.

    Raises:
        TypeError: ``kwargs`` is not a dict.
    """
    if not isinstance(kwargs, dict):
        raise TypeError("kwargs 参数必须是字典")

    previous_nickname = local_user_data.get("nickname") if local_user_data else None
    nickname_changed = (
        previous_nickname not in (None, "") and previous_nickname != current_nickname
    )
    if nickname_changed:
        # Must mirror the layout used by create_user_folder:
        # <path>/douyin/<mode>/<nickname>.
        mode_dir = (
            Path(kwargs.get("path", "Download"))
            / "douyin"
            / kwargs.get("mode", "PLEASE_SETUP_MODE")
        )
        previous_path = (mode_dir / str(previous_nickname)).resolve()
        current_path = (mode_dir / str(current_nickname)).resolve()
        # Only rename when nothing would be overwritten.
        if previous_path.is_dir() and not current_path.exists():
            return rename_user_folder(previous_path, str(current_nickname))

    return create_user_folder(kwargs, current_nickname)
def show_qrcode(qrcode_url: str, show_image: bool = False) -> None:
    """Show a QR code, either as an image or as ASCII art in the console.

    Args:
        qrcode_url (str): Login QR code link to encode.
        show_image (bool): True displays the QR code as an image; False
            prints it to the console.
    """
    if not show_image:
        # Print the QR code to the console in ASCII form.
        console_qr = qrcode.QRCode()
        console_qr.add_data(qrcode_url)
        console_qr.make(fit=True)
        console_qr.print_ascii(invert=True)
        return

    # Build the QR code image and display it.
    image = qrcode.make(qrcode_url)
    image.show()
def json_2_lrc(data: Union[str, list, dict]) -> str:
    """Generate lrc format lyrics from Douyin original-sound json lyrics.

    Args:
        data (Union[str, list, dict]): Lyrics as an iterable of items that
            each provide "text" and "timeId" (seconds), or a JSON string that
            decodes to such an iterable. An empty string yields an empty result.

    Returns:
        str: One ``[mm:ss.mmm] text`` line per item, joined with newlines.

    Raises:
        KeyError: An item lacks "text" or "timeId".
        TypeError: ``data`` or its items have the wrong type, or a string
            input is not valid JSON.
        RuntimeError: Re-raised with a lyrics-specific message.
    """
    try:
        if isinstance(data, str):
            # Decode JSON text; iterating the raw string could never work.
            data = json.loads(data) if data else []

        lrc_lines = []
        for item in data:
            text = item["text"]
            # Work in whole milliseconds so binary float error cannot
            # truncate e.g. 2.3 s to ".299".
            total_ms = round(float(item["timeId"]) * 1000)
            minutes, rest_ms = divmod(total_ms, 60_000)
            seconds, milliseconds = divmod(rest_ms, 1000)
            time_str = f"{minutes:02}:{seconds:02}.{milliseconds:03}"
            lrc_lines.append(f"[{time_str}] {text}")
    except KeyError as e:
        raise KeyError("歌词数据字段错误:{0}".format(e)) from e
    except RuntimeError as e:
        raise RuntimeError("生成歌词文件失败:{0},请检查歌词 `data` 内容".format(e)) from e
    except TypeError as e:
        raise TypeError("歌词数据类型错误:{0}".format(e)) from e
    except json.JSONDecodeError as e:
        # Unparsable strings keep raising TypeError, as string input did before.
        raise TypeError("歌词数据类型错误:{0}".format(e)) from e
    return "\n".join(lrc_lines)