mirror of
https://github.com/Evil0ctal/Douyin_TikTok_Download_API.git
synced 2025-04-23 05:24:26 +08:00
395 lines
13 KiB
Python
395 lines
13 KiB
Python
# ==============================================================================
|
||
# Copyright (C) 2021 Evil0ctal
|
||
#
|
||
# This file is part of the Douyin_TikTok_Download_API project.
|
||
#
|
||
# This project is licensed under the Apache License 2.0 (the "License");
|
||
# you may not use this file except in compliance with the License.
|
||
# You may obtain a copy of the License at:
|
||
# http://www.apache.org/licenses/LICENSE-2.0
|
||
#
|
||
# Unless required by applicable law or agreed to in writing, software
|
||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
# See the License for the specific language governing permissions and
|
||
# limitations under the License.
|
||
# ==============================================================================
|
||
# __
|
||
# /> フ
|
||
# | _ _ l
|
||
# /` ミ_xノ
|
||
# / | Feed me Stars ⭐ ️
|
||
# / ヽ ノ
|
||
# │ | | |
|
||
# / ̄| | | |
|
||
# | ( ̄ヽ__ヽ_)__)
|
||
# \二つ
|
||
# ==============================================================================
|
||
#
|
||
# Contributor Link:
|
||
# - https://github.com/Evil0ctal
|
||
# - https://github.com/Johnserf-Seed
|
||
#
|
||
# ==============================================================================
|
||
|
||
|
||
import re
|
||
import sys
|
||
import random
|
||
import secrets
|
||
import datetime
|
||
import browser_cookie3
|
||
import importlib_resources
|
||
|
||
from pydantic import BaseModel
|
||
|
||
from urllib.parse import quote, urlencode # URL编码
|
||
from typing import Union, List, Any
|
||
from pathlib import Path
|
||
|
||
# 生成一个 16 字节的随机字节串 (Generate a random byte string of 16 bytes)
|
||
seed_bytes = secrets.token_bytes(16)
|
||
|
||
# 将字节字符串转换为整数 (Convert the byte string to an integer)
|
||
seed_int = int.from_bytes(seed_bytes, "big")
|
||
|
||
# 设置随机种子 (Seed the random module)
|
||
random.seed(seed_int)
|
||
|
||
|
||
# 将模型实例转换为字典
|
||
def model_to_query_string(model: BaseModel) -> str:
|
||
model_dict = model.dict()
|
||
# 使用urlencode进行URL编码
|
||
query_string = urlencode(model_dict)
|
||
return query_string
|
||
|
||
|
||
def gen_random_str(randomlength: int) -> str:
|
||
"""
|
||
根据传入长度产生随机字符串 (Generate a random string based on the given length)
|
||
|
||
Args:
|
||
randomlength (int): 需要生成的随机字符串的长度 (The length of the random string to be generated)
|
||
|
||
Returns:
|
||
str: 生成的随机字符串 (The generated random string)
|
||
"""
|
||
|
||
base_str = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+-"
|
||
return "".join(random.choice(base_str) for _ in range(randomlength))
|
||
|
||
|
||
def get_timestamp(unit: str = "milli"):
|
||
"""
|
||
根据给定的单位获取当前时间 (Get the current time based on the given unit)
|
||
|
||
Args:
|
||
unit (str): 时间单位,可以是 "milli"、"sec"、"min" 等
|
||
(The time unit, which can be "milli", "sec", "min", etc.)
|
||
|
||
Returns:
|
||
int: 根据给定单位的当前时间 (The current time based on the given unit)
|
||
"""
|
||
|
||
now = datetime.datetime.utcnow() - datetime.datetime(1970, 1, 1)
|
||
if unit == "milli":
|
||
return int(now.total_seconds() * 1000)
|
||
elif unit == "sec":
|
||
return int(now.total_seconds())
|
||
elif unit == "min":
|
||
return int(now.total_seconds() / 60)
|
||
else:
|
||
raise ValueError("Unsupported time unit")
|
||
|
||
|
||
def timestamp_2_str(
|
||
timestamp: Union[str, int, float], format: str = "%Y-%m-%d %H-%M-%S"
|
||
) -> str:
|
||
"""
|
||
将 UNIX 时间戳转换为格式化字符串 (Convert a UNIX timestamp to a formatted string)
|
||
|
||
Args:
|
||
timestamp (int): 要转换的 UNIX 时间戳 (The UNIX timestamp to be converted)
|
||
format (str, optional): 返回的日期时间字符串的格式。
|
||
默认为 '%Y-%m-%d %H-%M-%S'。
|
||
(The format for the returned date-time string
|
||
Defaults to '%Y-%m-%d %H-%M-%S')
|
||
|
||
Returns:
|
||
str: 格式化的日期时间字符串 (The formatted date-time string)
|
||
"""
|
||
if timestamp is None or timestamp == "None":
|
||
return ""
|
||
|
||
if isinstance(timestamp, str):
|
||
if len(timestamp) == 30:
|
||
return datetime.datetime.strptime(timestamp, "%a %b %d %H:%M:%S %z %Y")
|
||
|
||
return datetime.datetime.fromtimestamp(float(timestamp)).strftime(format)
|
||
|
||
|
||
def num_to_base36(num: int) -> str:
|
||
"""数字转换成base32 (Convert number to base 36)"""
|
||
|
||
base_str = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
|
||
|
||
if num == 0:
|
||
return "0"
|
||
|
||
base36 = []
|
||
while num:
|
||
num, i = divmod(num, 36)
|
||
base36.append(base_str[i])
|
||
|
||
return "".join(reversed(base36))
|
||
|
||
|
||
def split_set_cookie(cookie_str: str) -> str:
|
||
"""
|
||
拆分Set-Cookie字符串并拼接 (Split the Set-Cookie string and concatenate)
|
||
|
||
Args:
|
||
cookie_str (str): 待拆分的Set-Cookie字符串 (The Set-Cookie string to be split)
|
||
|
||
Returns:
|
||
str: 拼接后的Cookie字符串 (Concatenated cookie string)
|
||
"""
|
||
|
||
# 判断是否为字符串 / Check if it's a string
|
||
if not isinstance(cookie_str, str):
|
||
raise TypeError("`set-cookie` must be str")
|
||
|
||
# 拆分Set-Cookie字符串,避免错误地在expires字段的值中分割字符串 (Split the Set-Cookie string, avoiding incorrect splitting on the value of the 'expires' field)
|
||
# 拆分每个Cookie字符串,只获取第一个分段(即key=value部分) / Split each Cookie string, only getting the first segment (i.e., key=value part)
|
||
# 拼接所有的Cookie (Concatenate all cookies)
|
||
return ";".join(
|
||
cookie.split(";")[0] for cookie in re.split(", (?=[a-zA-Z])", cookie_str)
|
||
)
|
||
|
||
|
||
def split_dict_cookie(cookie_dict: dict) -> str:
|
||
return "; ".join(f"{key}={value}" for key, value in cookie_dict.items())
|
||
|
||
|
||
def extract_valid_urls(inputs: Union[str, List[str]]) -> Union[str, List[str], None]:
|
||
"""从输入中提取有效的URL (Extract valid URLs from input)
|
||
|
||
Args:
|
||
inputs (Union[str, list[str]]): 输入的字符串或字符串列表 (Input string or list of strings)
|
||
|
||
Returns:
|
||
Union[str, list[str]]: 提取出的有效URL或URL列表 (Extracted valid URL or list of URLs)
|
||
"""
|
||
url_pattern = re.compile(r"https?://\S+")
|
||
|
||
# 如果输入是单个字符串
|
||
if isinstance(inputs, str):
|
||
match = url_pattern.search(inputs)
|
||
return match.group(0) if match else None
|
||
|
||
# 如果输入是字符串列表
|
||
elif isinstance(inputs, list):
|
||
valid_urls = []
|
||
|
||
for input_str in inputs:
|
||
matches = url_pattern.findall(input_str)
|
||
if matches:
|
||
valid_urls.extend(matches)
|
||
|
||
return valid_urls
|
||
|
||
|
||
def _get_first_item_from_list(_list) -> list:
|
||
# 检查是否是列表 (Check if it's a list)
|
||
if _list and isinstance(_list, list):
|
||
# 如果列表里第一个还是列表则提起每一个列表的第一个值
|
||
# (If the first one in the list is still a list then bring up the first value of each list)
|
||
if isinstance(_list[0], list):
|
||
return [inner[0] for inner in _list if inner]
|
||
# 如果只是普通列表,则返回这个列表包含的第一个项目作为新列表
|
||
# (If it's just a regular list, return the first item wrapped in a list)
|
||
else:
|
||
return [_list[0]]
|
||
return []
|
||
|
||
|
||
def get_resource_path(filepath: str):
|
||
"""获取资源文件的路径 (Get the path of the resource file)
|
||
|
||
Args:
|
||
filepath: str: 文件路径 (file path)
|
||
"""
|
||
|
||
return importlib_resources.files("f2") / filepath
|
||
|
||
|
||
def replaceT(obj: Union[str, Any]) -> Union[str, Any]:
|
||
"""
|
||
替换文案非法字符 (Replace illegal characters in the text)
|
||
|
||
Args:
|
||
obj (str): 传入对象 (Input object)
|
||
|
||
Returns:
|
||
new: 处理后的内容 (Processed content)
|
||
"""
|
||
|
||
reSub = r"[^\u4e00-\u9fa5a-zA-Z0-9#]"
|
||
|
||
if isinstance(obj, list):
|
||
return [re.sub(reSub, "_", i) for i in obj]
|
||
|
||
if isinstance(obj, str):
|
||
return re.sub(reSub, "_", obj)
|
||
|
||
return obj
|
||
# raise TypeError("输入应为字符串或字符串列表")
|
||
|
||
|
||
def split_filename(text: str, os_limit: dict) -> str:
|
||
"""
|
||
根据操作系统的字符限制分割文件名,并用 '......' 代替。
|
||
|
||
Args:
|
||
text (str): 要计算的文本
|
||
os_limit (dict): 操作系统的字符限制字典
|
||
|
||
Returns:
|
||
str: 分割后的文本
|
||
"""
|
||
# 获取操作系统名称和文件名长度限制
|
||
os_name = sys.platform
|
||
filename_length_limit = os_limit.get(os_name, 200)
|
||
|
||
# 计算中文字符长度(中文字符长度*3)
|
||
chinese_length = sum(1 for char in text if "\u4e00" <= char <= "\u9fff") * 3
|
||
# 计算英文字符长度
|
||
english_length = sum(1 for char in text if char.isalpha())
|
||
# 计算下划线数量
|
||
num_underscores = text.count("_")
|
||
|
||
# 计算总长度
|
||
total_length = chinese_length + english_length + num_underscores
|
||
|
||
# 如果总长度超过操作系统限制或手动设置的限制,则根据限制进行分割
|
||
if total_length > filename_length_limit:
|
||
split_index = min(total_length, filename_length_limit) // 2 - 6
|
||
split_text = text[:split_index] + "......" + text[-split_index:]
|
||
return split_text
|
||
else:
|
||
return text
|
||
|
||
|
||
def ensure_path(path: Union[str, Path]) -> Path:
|
||
"""确保路径是一个Path对象 (Ensure the path is a Path object)"""
|
||
return Path(path) if isinstance(path, str) else path
|
||
|
||
|
||
def get_cookie_from_browser(browser_choice: str, domain: str = "") -> dict:
|
||
"""
|
||
根据用户选择的浏览器获取domain的cookie。
|
||
|
||
Args:
|
||
browser_choice (str): 用户选择的浏览器名称
|
||
|
||
Returns:
|
||
str: *.domain的cookie值
|
||
"""
|
||
|
||
if not browser_choice or not domain:
|
||
return ""
|
||
|
||
BROWSER_FUNCTIONS = {
|
||
"chrome": browser_cookie3.chrome,
|
||
"firefox": browser_cookie3.firefox,
|
||
"edge": browser_cookie3.edge,
|
||
"opera": browser_cookie3.opera,
|
||
"opera_gx": browser_cookie3.opera_gx,
|
||
"safari": browser_cookie3.safari,
|
||
"chromium": browser_cookie3.chromium,
|
||
"brave": browser_cookie3.brave,
|
||
"vivaldi": browser_cookie3.vivaldi,
|
||
"librewolf": browser_cookie3.librewolf,
|
||
}
|
||
cj_function = BROWSER_FUNCTIONS.get(browser_choice)
|
||
cj = cj_function(domain_name=domain)
|
||
cookie_value = {c.name: c.value for c in cj if c.domain.endswith(domain)}
|
||
return cookie_value
|
||
|
||
|
||
def check_invalid_naming(
|
||
naming: str, allowed_patterns: list, allowed_separators: list
|
||
) -> list:
|
||
"""
|
||
检查命名是否符合命名模板 (Check if the naming conforms to the naming template)
|
||
|
||
Args:
|
||
naming (str): 命名字符串 (Naming string)
|
||
allowed_patterns (list): 允许的模式列表 (List of allowed patterns)
|
||
allowed_separators (list): 允许的分隔符列表 (List of allowed separators)
|
||
Returns:
|
||
list: 无效的模式列表 (List of invalid patterns)
|
||
"""
|
||
if not naming or not allowed_patterns or not allowed_separators:
|
||
return []
|
||
|
||
temp_naming = naming
|
||
invalid_patterns = []
|
||
|
||
# 检查提供的模式是否有效
|
||
for pattern in allowed_patterns:
|
||
if pattern in temp_naming:
|
||
temp_naming = temp_naming.replace(pattern, "")
|
||
|
||
# 此时,temp_naming应只包含分隔符
|
||
for char in temp_naming:
|
||
if char not in allowed_separators:
|
||
invalid_patterns.append(char)
|
||
|
||
# 检查连续的无效模式或分隔符
|
||
for pattern in allowed_patterns:
|
||
# 检查像"{xxx}{xxx}"这样的模式
|
||
if pattern + pattern in naming:
|
||
invalid_patterns.append(pattern + pattern)
|
||
for sep in allowed_patterns:
|
||
# 检查像"{xxx}-{xxx}"这样的模式
|
||
if pattern + sep + pattern in naming:
|
||
invalid_patterns.append(pattern + sep + pattern)
|
||
|
||
return invalid_patterns
|
||
|
||
|
||
def merge_config(
|
||
main_conf: dict = ...,
|
||
custom_conf: dict = ...,
|
||
**kwargs,
|
||
):
|
||
"""
|
||
合并配置参数,使 CLI 参数优先级高于自定义配置,自定义配置优先级高于主配置,最终生成完整配置参数字典。
|
||
|
||
Args:
|
||
main_conf (dict): 主配置参数字典
|
||
custom_conf (dict): 自定义配置参数字典
|
||
**kwargs: CLI 参数和其他额外的配置参数
|
||
|
||
Returns:
|
||
dict: 合并后的配置参数字典
|
||
"""
|
||
# 合并主配置和自定义配置
|
||
merged_conf = {}
|
||
for key, value in main_conf.items():
|
||
merged_conf[key] = value # 将主配置复制到合并后的配置中
|
||
for key, value in custom_conf.items():
|
||
if value is not None and value != "": # 只有值不为 None 和 空值,才进行合并
|
||
merged_conf[key] = value # 自定义配置参数会覆盖主配置中的同名参数
|
||
|
||
# 合并 CLI 参数与合并后的配置,确保 CLI 参数的优先级最高
|
||
for key, value in kwargs.items():
|
||
if key not in merged_conf: # 如果合并后的配置中没有这个键,则直接添加
|
||
merged_conf[key] = value
|
||
elif value is not None and value != "": # 如果值不为 None 和 空值,则进行合并
|
||
merged_conf[key] = value # CLI 参数会覆盖自定义配置和主配置中的同名参数
|
||
|
||
return merged_conf
|