Fix history-length bug on some websites

This commit is contained in:
NaiboWang-Alienware 2023-10-08 17:09:27 +08:00
parent bb2638b432
commit 1ce5e280af
2 changed files with 45 additions and 21 deletions

View File

@ -12,7 +12,7 @@
"justMyCode": false, "justMyCode": false,
// "args": ["--id", "[7]", "--read_type", "remote", "--headless", "0"] // "args": ["--id", "[7]", "--read_type", "remote", "--headless", "0"]
// "args": ["--id", "[9]", "--read_type", "remote", "--headless", "0", "--saved_file_name", "YOUTUBE"] // "args": ["--id", "[9]", "--read_type", "remote", "--headless", "0", "--saved_file_name", "YOUTUBE"]
"args": ["--id", "[36]", "--headless", "0", "--user_data", "0", "--keyboard", "0"] "args": ["--id", "[14]", "--headless", "0", "--user_data", "0", "--keyboard", "0"]
} }
] ]
} }

View File

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# import atexit # import atexit
import undetected_chromedriver as uc
from utils import download_image, get_output_code, isnotnull, lowercase_tags_in_xpath, myMySQL, new_line, on_press_creator, on_release_creator, replace_field_values, write_to_csv, write_to_excel, write_to_json from utils import download_image, get_output_code, isnotnull, lowercase_tags_in_xpath, myMySQL, new_line, on_press_creator, on_release_creator, replace_field_values, write_to_csv, write_to_excel, write_to_json
from myChrome import MyChrome from myChrome import MyChrome
from threading import Thread, Event from threading import Thread, Event
@ -41,7 +42,6 @@ from urllib.parse import urljoin
from lxml import etree from lxml import etree
import onnxruntime import onnxruntime
onnxruntime.set_default_logger_severity(3) # 隐藏onnxruntime的日志 onnxruntime.set_default_logger_severity(3) # 隐藏onnxruntime的日志
import undetected_chromedriver as uc
# import pandas as pd # import pandas as pd
# import numpy # import numpy
# import pytesseract # import pytesseract
@ -157,7 +157,7 @@ class BrowserThread(Thread):
self.OUTPUT.append([]) # 添加表头 self.OUTPUT.append([]) # 添加表头
self.writeMode = 0 self.writeMode = 0
elif self.outputFormat == "json": elif self.outputFormat == "json":
self.writeMode = 3 # JSON模式无需判断是否存在文件 self.writeMode = 3 # JSON模式无需判断是否存在文件
elif self.outputFormat == "mysql": elif self.outputFormat == "mysql":
self.mysql = myMySQL(config["mysql_config_path"]) self.mysql = myMySQL(config["mysql_config_path"])
self.mysql.create_table(self.saveName, service["outputParameters"]) self.mysql.create_table(self.saveName, service["outputParameters"])
@ -409,7 +409,8 @@ class BrowserThread(Thread):
elif self.outputFormat == "json": elif self.outputFormat == "json":
file_name = "Data/Task_" + \ file_name = "Data/Task_" + \
str(self.id) + "/" + self.saveName + '.json' str(self.id) + "/" + self.saveName + '.json'
write_to_json(file_name, self.OUTPUT, self.outputParametersTypes, self.outputParametersRecord, self.outputParameters.keys()) write_to_json(file_name, self.OUTPUT, self.outputParametersTypes,
self.outputParametersRecord, self.outputParameters.keys())
elif self.outputFormat == "mysql": elif self.outputFormat == "mysql":
self.mysql.write_to_mysql( self.mysql.write_to_mysql(
self.OUTPUT, self.outputParametersRecord, self.outputParametersTypes) self.OUTPUT, self.outputParametersRecord, self.outputParametersTypes)
@ -647,7 +648,8 @@ class BrowserThread(Thread):
optionValue = loopValue optionValue = loopValue
optionMode = 1 optionMode = 1
try: try:
xpath = replace_field_values(para["xpath"], self.outputParameters, self) xpath = replace_field_values(
para["xpath"], self.outputParameters, self)
dropdown = Select(self.browser.find_element( dropdown = Select(self.browser.find_element(
By.XPATH, xpath, iframe=para["iframe"])) By.XPATH, xpath, iframe=para["iframe"]))
try: try:
@ -678,7 +680,8 @@ class BrowserThread(Thread):
def moveToElement(self, para, loopElement=None, loopPath="", index=0): def moveToElement(self, para, loopElement=None, loopPath="", index=0):
time.sleep(0.1) # 移动之前等待0.1秒 time.sleep(0.1) # 移动之前等待0.1秒
loopPath = replace_field_values(loopPath, self.outputParameters, self) loopPath = replace_field_values(loopPath, self.outputParameters, self)
xpath = replace_field_values(para["xpath"], self.outputParameters, self) xpath = replace_field_values(
para["xpath"], self.outputParameters, self)
if para["useLoop"]: # 使用循环的情况下传入的clickPath就是实际的xpath if para["useLoop"]: # 使用循环的情况下传入的clickPath就是实际的xpath
if xpath == "": if xpath == "":
path = loopPath path = loopPath
@ -873,8 +876,11 @@ class BrowserThread(Thread):
def loopExecute(self, node, loopValue, clickPath="", index=0): def loopExecute(self, node, loopValue, clickPath="", index=0):
time.sleep(0.1) # 第一次执行循环的时候强制等待1秒 time.sleep(0.1) # 第一次执行循环的时候强制等待1秒
thisHandle = self.browser.current_window_handle # 记录本次循环内的标签页的ID thisHandle = self.browser.current_window_handle # 记录本次循环内的标签页的ID
thisHistoryLength = self.browser.execute_script( try:
'return history.length') # 记录本次循环内的history的length thisHistoryLength = self.browser.execute_script(
'return history.length') # 记录本次循环内的history的length
except:
thisHistoryLength = 0
self.history["index"] = thisHistoryLength self.history["index"] = thisHistoryLength
self.history["handle"] = thisHandle self.history["handle"] = thisHandle
if int(node["parameters"]["loopType"]) == 0: # 单个元素循环 if int(node["parameters"]["loopType"]) == 0: # 单个元素循环
@ -1030,7 +1036,8 @@ class BrowserThread(Thread):
# 千万不要忘了分割!! # 千万不要忘了分割!!
for path in node["parameters"]["pathList"].split("\n"): for path in node["parameters"]["pathList"].split("\n"):
try: try:
path = replace_field_values(path, self.outputParameters, self) path = replace_field_values(
path, self.outputParameters, self)
element = self.browser.find_element( element = self.browser.find_element(
By.XPATH, path, iframe=node["parameters"]["iframe"]) By.XPATH, path, iframe=node["parameters"]["iframe"])
# self.recordLog("循环元素|Loop element:", path) # self.recordLog("循环元素|Loop element:", path)
@ -1224,13 +1231,17 @@ class BrowserThread(Thread):
"return history.length") "return history.length")
except: except:
self.history["index"] = 0 self.history["index"] = 0
except Exception as e:
self.print_and_log("History Length Error")
self.history["index"] = 0
self.scrollDown(para) # 控制屏幕向下滚动 self.scrollDown(para) # 控制屏幕向下滚动
# 键盘输入事件 # 键盘输入事件
def inputInfo(self, para, loopValue): def inputInfo(self, para, loopValue):
time.sleep(0.1) # 输入之前等待0.1秒 time.sleep(0.1) # 输入之前等待0.1秒
try: try:
xpath = replace_field_values(para["xpath"], self.outputParameters, self) xpath = replace_field_values(
para["xpath"], self.outputParameters, self)
textbox = self.browser.find_element( textbox = self.browser.find_element(
By.XPATH, xpath, iframe=para["iframe"]) By.XPATH, xpath, iframe=para["iframe"])
# textbox.send_keys(Keys.CONTROL, 'a') # textbox.send_keys(Keys.CONTROL, 'a')
@ -1289,8 +1300,10 @@ class BrowserThread(Thread):
try: try:
# element = self.browser.find_element( # element = self.browser.find_element(
# By.XPATH, path, iframe=para["iframe"]) # By.XPATH, path, iframe=para["iframe"])
clickPath = replace_field_values(clickPath, self.outputParameters, self) clickPath = replace_field_values(
xpath = replace_field_values(para["xpath"], self.outputParameters, self) clickPath, self.outputParameters, self)
xpath = replace_field_values(
para["xpath"], self.outputParameters, self)
if para["useLoop"]: # 使用循环的情况下传入的clickPath就是实际的xpath if para["useLoop"]: # 使用循环的情况下传入的clickPath就是实际的xpath
if xpath == "": if xpath == "":
path = clickPath path = clickPath
@ -1375,6 +1388,9 @@ class BrowserThread(Thread):
pass pass
self.history["index"] = self.browser.execute_script( self.history["index"] = self.browser.execute_script(
"return history.length") "return history.length")
except Exception as e:
self.print_and_log("History Length Error")
self.history["index"] = 0
else: else:
try: try:
self.history["index"] = self.browser.execute_script( self.history["index"] = self.browser.execute_script(
@ -1387,6 +1403,9 @@ class BrowserThread(Thread):
self.history["index"] = self.browser.execute_script( self.history["index"] = self.browser.execute_script(
"return history.length") "return history.length")
# 如果打开了新窗口,切换到新窗口 # 如果打开了新窗口,切换到新窗口
except Exception as e:
self.print_and_log("History Length Error")
self.history["index"] = 0
self.scrollDown(para) # 根据参数配置向下滚动 self.scrollDown(para) # 根据参数配置向下滚动
# rt.end() # rt.end()
@ -1556,7 +1575,8 @@ class BrowserThread(Thread):
# 提取数据事件 # 提取数据事件
def getData(self, para, loopElement, isInLoop=True, parentPath="", index=0): def getData(self, para, loopElement, isInLoop=True, parentPath="", index=0):
parentPath = replace_field_values(parentPath, self.outputParameters, self) parentPath = replace_field_values(
parentPath, self.outputParameters, self)
if para["clear"] == 1: if para["clear"] == 1:
self.clearOutputParameters() self.clearOutputParameters()
try: try:
@ -1762,7 +1782,7 @@ class BrowserThread(Thread):
if __name__ == '__main__': if __name__ == '__main__':
from multiprocessing import freeze_support from multiprocessing import freeze_support
freeze_support() # 防止无限死循环多开 freeze_support() # 防止无限死循环多开
# 如果需要调试程序,请在命令行参数中加入--keyboard 0 来禁用键盘监听以提升调试速度 # 如果需要调试程序,请在命令行参数中加入--keyboard 0 来禁用键盘监听以提升调试速度
# If you need to debug the program, please add --keyboard 0 in the command line parameters to disable keyboard listening to improve debugging speed # If you need to debug the program, please add --keyboard 0 in the command line parameters to disable keyboard listening to improve debugging speed
@ -1961,11 +1981,15 @@ if __name__ == '__main__':
options.binary_location = "C:\\Program Files\\Google\\Chrome Beta\\Application\\chrome.exe" # 需要用自己的浏览器 options.binary_location = "C:\\Program Files\\Google\\Chrome Beta\\Application\\chrome.exe" # 需要用自己的浏览器
# options.add_argument("--auto-open-devtools-for-tabs") # options.add_argument("--auto-open-devtools-for-tabs")
# options.binary_location = "C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe" # 需要用自己的浏览器 # options.binary_location = "C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe" # 需要用自己的浏览器
browser_t = MyUCChrome(options=options, driver_executable_path=driver_path) browser_t = MyUCChrome(
options=options, driver_executable_path=driver_path)
links = list(filter(isnotnull, service["links"].split("\n"))) links = list(filter(isnotnull, service["links"].split("\n")))
browser_t.execute_script('window.open("'+ links[0] +'","_blank");') # open page in new tab # open page in new tab
time.sleep(5) # wait until page has loaded browser_t.execute_script(
browser_t.switch_to.window(browser_t.window_handles[1]) # switch to new tab 'window.open("' + links[0] + '","_blank");')
time.sleep(5) # wait until page has loaded
browser_t.switch_to.window(
browser_t.window_handles[1]) # switch to new tab
# browser_t = uc.Chrome() # browser_t = uc.Chrome()
else: else:
print("Cloudflare模式只支持Windows x64平台。") print("Cloudflare模式只支持Windows x64平台。")