diff --git a/.gitignore b/.gitignore index 2934958..508e5d0 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ *.vscode *.pdf *.pt +chromedriver package.json package-lock.json venv/ diff --git a/抖音js逆向学习/抖店精选联盟数据/dao/mysql_dao.py b/抖音js逆向学习/抖店精选联盟数据/dao/mysql_dao.py index edd65c6..19b2c6e 100644 --- a/抖音js逆向学习/抖店精选联盟数据/dao/mysql_dao.py +++ b/抖音js逆向学习/抖店精选联盟数据/dao/mysql_dao.py @@ -1,648 +1,348 @@ -from handlers.base import Base -import pymysql.cursors -import settings - -base = Base() +# -*- coding: utf8 -*- +from collections import OrderedDict +from copy import deepcopy +import pymysql +import time +from DBUtils.PooledDB import PooledDB +from DBUtils.PersistentDB import PersistentDB +import traceback +from datetime import datetime -def getConnection(): - return pymysql.connect( - host=settings.HOST, - port=settings.PORT, - user=settings.USER, - password=settings.PASSWORD, - database=settings.DATABASE, - autocommit=True, - cursorclass=pymysql.cursors.DictCursor - ) - - -def sqlConn(func): - """ use a decorator to make connection easier to use +class StoreMysqlPool(object): + """ + mysql读写相关操作,pymysql,使用PooledDB连接池 + Args: + host:数据库ip + user:数据库用户名 + password:数据库用户密码 + db:数据库名 + port:数据库端口,默认3306 + mincached:最少的空闲连接数, 默认为1 + charset:数据库编码,默认utf8 """ - def __sqlFunc(*args): - conn = None - cursor = None + def __init__(self, host="", user="", password="", db="", port=3306, mincached=1, + pattern=1, charset="utf8"): + self.host = host + + if pattern == 1: + self.pool = PooledDB(pymysql, mincached, host=host, user=user, passwd=password, db=db, port=port, + charset=charset) + self.close_able = True + elif pattern == 2: + self.pool = PersistentDB(pymysql, host=host, user=user, passwd=password, db=db, port=port, + charset=charset) + + self.close_able = False + self._last_use_time = time.time() + + def close(self, db): + if db is not None: + db.close() + + def connect(self): + return self.pool.connection() + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close_all() + + @staticmethod + def _cursor(db): + # return db.cursor(pymysql.cursors.DictCursor) + return db.cursor() + + def query(self, sql): + """ + 根据sql查询 + Returns: + 数组的数组,外层数组元素为一行,内存数组元素为一行的一列 + """ + db = self.connect() + cur = self._cursor(db) + rows = [] try: - conn = getConnection() - cursor = conn.cursor() - return func(*args, cursor=cursor) + cur.execute(sql) + db.commit() + rows = cur.fetchall() + except pymysql.OperationalError: + print(traceback.format_exc()) + except Exception as e: + print('query{}'.format(e)) + print(traceback.format_exc()) finally: - if cursor: - cursor.close() - if conn: - conn.close() + cur.close() + self.close(db) + return rows - return __sqlFunc + def count(self, tb): + """ + 返回某表的行数 + Args: + tb:字符串,表名称 + """ + sql = 'select count(*) from %s' % tb + results = self.query(sql) + if len(results) == 1 and len(results[0]) == 1: + return int(results[0][0]) + + def do(self, sql, flag='rowcount'): + """ + 执行sql,insert/delete/update操作 + Args: + sql:要执行的sql + flag:返回值类型,flag=lastrowid返回lastrowid,flag=rowcount返回rowcount + """ + db = self.connect() + cur = self._cursor(db) + r = None + try: + cur.execute(sql) + db.commit() + r = 1 + if flag == 'lastrowid': + r = cur.lastrowid + elif flag == 'rowcount': + r = cur.rowcount + except pymysql.OperationalError: + print("time: %s; Error on %s: %s" % (str(datetime.now()), self.host, sql[0: 200])) + print(traceback.format_exc()) + r = -1 + except Exception: + print("time: %s; Error to MySQL on %s: %s" % (str(datetime.now()), self.host, sql[0: 200])) + print(traceback.format_exc()) + r = -1 + finally: + cur.close() + self.close(db) + return r + + def save(self, table, data, mapping_fields=dict()): + """ + 将字典直接insert到数据库 + Args: + table:字符串,插入目标表的名称 + data:字典格式,key为字段名称,value为字段值,如{'id':'1','name':'temp'} + mapping_fields: 用于保持data字典的key与数据库字段的对应关系, + 如果结果字典的某个key不包含在mapping_fields中,则将直接使用key作为字段名 + """ + + if len(data) <= 0: + return -1 + try: + fields = '' + values = '' + for d in data: + if d in mapping_fields: + fields += "`%s`," % (str(mapping_fields[d])) + else: + fields += "`%s`," % (str(d)) + values += "'%s'," % (pymysql.escape_string(str(data[d]))) + if len(fields) <= 0 or len(values) <= 0: + return -1 + sql = "insert ignore into %s(%s) values(%s)" % (table, fields[:-1], values[:-1]) + return self.do(sql) + except Exception: + print(traceback.format_exc()) + return -1 + + def saveMany(self, sql, values, flag='lastrowid'): + """ + 执行sql,insert/delete/update操作 + :param table: 'insert into table(id,name) values(%s,%s)' + :param names: 字符串 + :param values:[{1,2},{1,2},...] 是一个列表,列表中的每一个元素必须是元组!!! + :param flag: + :return: + """ + db = self.connect() + cur = self._cursor(db) + r = None + try: + cur.executemany(sql, values) + db.commit() + r = 1 + if flag == 'lastrowid': + r = cur.lastrowid + elif flag == 'rowcount': + r = cur.rowcount + + except pymysql.OperationalError: + print("Error connecting to MySQL on %s: %s" % (self.host, sql[0:255])) + print(traceback.format_exc()) + r = -1 + except Exception: + print("Error connecting to MySQL on %s: %s" % (self.host, sql[0:255])) + print(traceback.format_exc()) + r = -1 + finally: + cur.close() + self.close(db) + return r + + def update(self, table, data, field, mapping_fields=dict()): + """ + 将字典直接update到数据库 + Args: + table:字符串,更新目标表的名称 + data:字典格式,key为字段名称,value为字段值,如{'id':'1','name':'temp'} + field:唯一索引字段,即根据该字段判断是否为同一条记录,作为where条件 + mapping_fields: 用于保持data字典的key与数据库字段的对应关系, + 如果结果字典的某个key不包含在mapping_fields中,则将直接使用key作为字段名 + """ + if len(data) <= 0: + return -1 + else: + try: + values = '' + field_value = None + for d in data: + key = d + if d in mapping_fields: + key = mapping_fields[d] + if key == field: + field_value = data[d] + values += "%s='%s'," % (str(key), pymysql.escape_string(str(data[d]))) + if len(values) <= 0 or field_value is None: + return -1 + sql = "update " + table + " set " + values[:-1] + " where " + field + "='" + pymysql.escape_string( + str(field_value)) + "'" + return self.do(sql, flag='rowcount') + except Exception: + print(traceback.format_exc()) + return -1 + + def saveorupdate(self, table, data, field, mapping_fields=dict()): + """ + 将字典更新到数据库,如果已存在则update,不存在则insert + Args: + table:字符串,更新目标表的名称 + data:字典格式,key为字段名称,value为字段值,如{'id':'1','name':'temp'} + field:唯一索引字段,即根据词字段判断是否为同一条记录,作为where条件 + mapping_fields: 用于保持data字典的key与数据库字段的对应关系, + 如果结果字典的某个key不包含在mapping_fields中,则将直接使用key作为字段名 + """ + if len(data) <= 0: + return -1 + try: + field_value = None + if field in data: + field_value = data[field] + else: + for key in mapping_fields: + if mapping_fields[key] == field and key in data: + field_value = data[key] + if field_value is None: + return -1 + querysql = "select count(1) from " + table + " where " + field + "='" + pymysql.escape_string( + str(field_value)) + "'" + ed = self.query(querysql) + if ed and ed[0][0] > 0: + return self.update(table, data, field, mapping_fields) + else: + return self.save(table, data, mapping_fields) + except Exception: + print(traceback.format_exc()) + return -1 + + def tableCreate(self, tb_name, likeTable): + """ + 创建数据表格 + :param tb_name: table's name + :return: 1 ok , -1 error + """ + # sql = "SELECT table_name FROM information_schema.TABLES WHERE table_name ='{}';".format(tb_name) + sql = "show tables;" + datas = self.query(sql) + if tb_name in [str(i[0]) for i in datas]: + # 存在 + sql = "select * from {} limit 10;".format(tb_name) + if not self.query(sql): + print("{} 是空表, 正在truncate.....".format(tb_name)) + sql = " truncate {};".format(tb_name) + datas = self.query(sql) + else: + print("表格存在:{}".format(tb_name)) + + else: + # 不存在 创建 清空 + print("{} 表格不存在,正在创建....".format(tb_name)) + sql = "create table {} like {};".format(tb_name, likeTable) + datas = self.query(sql) + sql = " truncate {};".format(tb_name) + datas = self.query(sql) + return 1 + + def dictsToOrderDicts(self, list_dicts): + alist = [] + # list_dicts = [list_dicts] if isinstance(list_dicts, dict) else list_dicts + ks = list_dicts[0].keys() # 有序 + order_dict = OrderedDict() + for adict in list_dicts: + for k in ks: + order_dict[k] = adict[k] + alist.append(deepcopy(order_dict)) + return alist + + def insert_many(self, table, list_dicts, conflict=''): + ''' + [批量插入] + :param store_name: + :param table: + :param list_dicts: + :param conflict: 冲突更新字段 local_date=CURRENT_DATE() or ['local_date','fields'] + :return: + ''' + try: + if len(list_dicts): + orderDicts = self.dictsToOrderDicts(list_dicts) + ks = orderDicts[0].keys() # 获取键值 + if conflict: + if isinstance(conflict, str): + insert_sql = "insert ignore into {} ({}) values ({}) ON DUPLICATE KEY UPDATE {}".format( + table, ", ".join(ks), ("%s," * len(ks)).strip(","), conflict) + else: + if len(conflict): + conflict = ','.join(['{}=VALUES({})'.format(field, field) for field in conflict]) + insert_sql = "insert ignore into {} ({}) values ({}) ON DUPLICATE KEY UPDATE {}".format( + table, ", ".join(ks), ("%s," * len(ks)).strip(","), conflict) + else: + return -1 + + else: + insert_sql = "insert ignore into {} ({}) values ({})".format(table, ", ".join(ks), + ("%s," * len(ks)).strip(",")) + values = [] + for adict in list_dicts: + values.append(tuple(adict.values())) + res = self.saveMany(insert_sql, values, flag="rowcount") + return res + else: + return -3 + except Exception as e: + print('insert_many{}'.format(e)) + return -2 + + def close_all(self): + """ + 关闭连接池全部连接 + 请在确保连接不再需要时确认 + :return: + """ + if self.close_able: + self.pool.close() -@sqlConn -def getFinancialData(symbol, startDate, endDate, page, limit, **kwargs): - """ fetch financial from db - """ - sql = f"SELECT symbol, date, open_price, close_price, volume FROM Financial " + \ - f"WHERE date >= '{startDate}' AND " + \ - f"date <= '{endDate}' AND symbol = '{symbol}' LIMIT {(page - 1) * limit}, {limit};" - - # base.log(sql) - cursor = kwargs.get('cursor') - cursor.execute(sql) - rows = cursor.fetchall() - return rows - - -@sqlConn -def getPaginationData(symbol, startDate, endDate, page, limit, **kwargs): - """ get pagination information - """ - sql = f"SELECT COUNT(*) AS cnt FROM Financial WHERE date >= '{startDate}' " + \ - f"AND date <= '{endDate}' AND symbol = '{symbol}';" - - # base.log(sql) - cursor = kwargs.get('cursor') - cursor.execute(sql) - c = cursor.fetchone() - - return { - 'count': c['cnt'], - 'page': page, - 'limit': limit, - 'pages': (c['cnt'] + limit - 1) // limit +def test(): + db = { + 'host': 'localhost', + 'user': 'root', + 'password': '', + 'db': 'test' } + mq = StoreMysqlPool(**db) + print(mq.count('urls')) -@sqlConn -def initDatabase(**kwargs): - sql = """ - CREATE TABLE IF NOT EXISTS Financial( - id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, - symbol VARCHAR(255) NOT NULL, - `date` VARCHAR(16) NOT NULL, - open_price VARCHAR(255) NOT NULL, - close_price VARCHAR(255) NOT NULL, - volume VARCHAR(255) NOT NULL, - UNIQUE(symbol, `date`) - ); - """ - - cursor = kwargs.get('cursor') - cursor.execute(sql) - - -@sqlConn -def updateFinancialData(symbol, data, **kwargs): - """ save data to db - @return 0: success, other: failure - """ - - # use ignore, so the insert will ignore those duplicated key - sql = "INSERT IGNORE INTO Financial (symbol, date, open_price, close_price, volume) VALUES " - - i = 1 - for date, fields in data.items(): - sql += "('%s', '%s', '%s', '%s', '%s')" % \ - (symbol, date, fields['1. open'], fields['4. close'], fields['6. volume']) - - if i < len(data): - sql += "," - - i += 1 - - cursor = kwargs.get('cursor') - cursor.execute(sql) - base.log(f'更新 {symbol} 数量: {cursor.rowcount}') - return cursor.rowcount - - -@sqlConn -def insert_project_spider(symbol, item_list, **kwargs): - sql = f""" - INSERT IGNORE INTO {symbol} (task_id, payload_get, payload_post, deduplication, weight) VALUES - """ - for item in item_list: - sql += "('%s', '%s', '%s', '%s', '%s')," % ( - item['task_id'], item['payload_get'], - item['payload_post'], item['deduplication'], - item['weight'] - ) - sql = sql.rstrip(',') - - cursor = kwargs.get('cursor') - cursor.execute(sql) - base.log(f'插入 {symbol} 数量: {cursor.rowcount}') - return cursor.rowcount - - -@sqlConn -def insert_project_user_monitor(symbol, item_list, **kwargs): - sql = f""" - INSERT IGNORE INTO {symbol} (task_id, authorId, BeginTime, EndTime, weight) VALUES - """ - for item in item_list: - sql += "('%s', '%s', '%s', '%s', '%s')," % ( - item['task_id'], item['authorId'], - item['BeginTime'], item['EndTime'], - item['weight'] - ) - sql = sql.rstrip(',') - cursor = kwargs.get('cursor') - cursor.execute(sql) - base.log(f'插入 {symbol} 数量: {cursor.rowcount}') - return cursor.rowcount - - -@sqlConn -def get_user_monitor(symbol, **kwargs): - sql = f""" - SELECT - authorId, - BeginTime, - EndTime - FROM - clean_daduoduo_dy_author_room_info - WHERE - authorId = '{symbol}' - ORDER BY BeginTime DESC - LIMIT 5 - """ - cursor = kwargs.get('cursor') - cursor.execute(sql) - statistics = cursor.fetchall() - return statistics - - -@sqlConn -def get_project_spider(symbol, task_id, deduplication, **kwargs): - """ - sql 判断5分钟内是否已经抓取 - """ - sql = f""" - SELECT - status - FROM - {symbol} - WHERE - deduplication = '{deduplication}' and - create_time>=DATE_SUB(NOW(),INTERVAL 5 MINUTE) - """ - cursor = kwargs.get('cursor') - cursor.execute(sql) - statistics = cursor.fetchone() - return statistics - - -@sqlConn -def getStatisticsData(symbol, startDate, endDate, **kwargs): - """ get the average value of the given peroid - """ - sql = f"SELECT AVG(open_price) AS avg_open, AVG(close_price) AS avg_close, AVG(volume) " + \ - f"AS avg_volume FROM Financial WHERE date >= '{startDate}' " + \ - f"AND date <= '{endDate}' AND symbol = '{symbol}';" - cursor = kwargs.get('cursor') - cursor.execute(sql) - statistics = cursor.fetchone() - return statistics - - -@sqlConn -def get_anchor_info(symbol, **kwargs): - sql = f""" - SELECT - b.UserId, - b.UserId as userSn, - b.HeaderImg, - b.FavCnt, - b.FansCnt, - b.IsShow, - BeginTime, - Gmv, - b.SubDetail, - b.Reputation, - b.DouYinId, - b.NAME, - c.LiveName as title, - c.BeginTime as startTime, - c.EndTime as finishTime - FROM - clean_daduoduo_dy_author_room_info c - RIGHT JOIN ( - SELECT - UserId, - HeaderImg, - FavCnt, - FansCnt, - IsShow, - SubDetail, - Reputation, - DouYinId, - NAME, - spider_time - FROM - clean_daduoduo_dy_author_detail - WHERE - UserId = '{symbol}' - ORDER BY - spider_time DESC - LIMIT 1 - ) b ON c.authorId = b.UserId - ORDER BY - c.spider_time DESC - LIMIT 1 - """ - cursor = kwargs.get('cursor') - cursor.execute(sql) - statistics = cursor.fetchone() - return statistics - - -@sqlConn -def get_anchor_search(symbol, page, limit, **kwargs): - sql = f""" - SELECT - UserId, - NAME, - HeaderImg, - DouYinId, - FansCnt - FROM - clean_daduoduo_dy_author_detail - WHERE - NAME LIKE '%{symbol}%' - ORDER BY - UserLevel DESC - LIMIT {(page - 1) * limit}, {limit}; - """ - cursor = kwargs.get('cursor') - cursor.execute(sql) - statistics = cursor.fetchall() - return statistics - - -# 搜索结果数 -@sqlConn -def get_anchor_search_count(symbol, **kwargs): - sql = f""" - SELECT - count(1) as total - FROM - clean_daduoduo_dy_author_detail - WHERE - NAME LIKE '%{symbol}%'; - """ - cursor = kwargs.get('cursor') - cursor.execute(sql) - total = cursor.fetchone() - return total - - -@sqlConn -def get_anchor_gmv(symbol, **kwargs): - symbol_str = ','.join(symbol) - sql = f""" - SELECT - a.authorId, - a.BeginTime, - a.Gmv - FROM - clean_daduoduo_dy_author_room_info AS a, - ( SELECT authorId, MAX( RoomId ) AS RoomId FROM clean_daduoduo_dy_author_room_info WHERE authorId IN - {symbol} GROUP BY authorId ) AS b - WHERE - a.authorId = b.authorId - AND a.RoomId = b.RoomId - ORDER BY - field( b.authorId, {symbol_str}); - """ - cursor = kwargs.get('cursor') - cursor.execute(sql) - statistics = cursor.fetchall() - return statistics - - -@sqlConn -def get_anchor_room_list(symbol, page, limit, goods_live_status, sort_order, **kwargs): - # item.title 直播标题 - # item.startTime 开播时间 - # item.actualSalesMoney 销售额 - # item.totalUser 直播销售额 - # item.minuteEnterUser 直播销量 - # item.actualSales 带货热度 - if int(goods_live_status) == 0: - bring_goods = '' - else: - bring_goods = f'and SaleCnt != 0' - sql = f""" - SELECT - RoomPic as coverUrl, - LiveName as title, - BeginTime as startTime, - Gmv as actualSalesMoney, - TotalUser as totalUser, - SaleCnt as minuteEnterUser, - UserCount as actualSales, - RoomId as roomSn - FROM - clean_daduoduo_dy_author_room_info - WHERE - authorId = '{symbol}' {bring_goods} - ORDER BY - BeginTime {sort_order} - LIMIT {(page - 1) * limit}, {limit}; - """ - cursor = kwargs.get('cursor') - cursor.execute(sql) - statistics = cursor.fetchall() - return statistics - - -# 直播记录数 -@sqlConn -def get_anchor_room_list_count(symbol, goods_live_status, **kwargs): - # item.title 直播标题 - # item.startTime 开播时间 - # item.actualSalesMoney 销售额 - # item.totalUser 直播销售额 - # item.minuteEnterUser 直播销量 - # item.actualSales 带货热度 - if int(goods_live_status) == 0: - bring_goods = '' - else: - bring_goods = f'and SaleCnt != 0' - sql = f""" - SELECT - count(1) as total - FROM - clean_daduoduo_dy_author_room_info - WHERE - authorId = '{symbol}' {bring_goods}; - """ - cursor = kwargs.get('cursor') - cursor.execute(sql) - total = cursor.fetchone() - return total - - -@sqlConn -def get_anchor_room_big_screen_goods(symbol, page, limit, goods_live_status, sortKey, sort_order, **kwargs): - # thumbCoverUrl - # title - # price - # sales - # salesMoney - sql = f""" - SELECT - GoodPic as thumbCoverUrl, - GoodName as title, - SellPrice as price, - SaleCnt as sales, - SaleCnt as totalSales, - Gmv as salesMoney - FROM - clean_daduoduo_dy_live_room_goods - WHERE - RoomId = '{symbol}' - ORDER BY - {sortKey} {sort_order} - LIMIT {(page - 1) * limit}, {limit}; - """ - cursor = kwargs.get('cursor') - cursor.execute(sql) - statistics = cursor.fetchall() - return statistics - - -@sqlConn -def get_anchor_room_big_screen_goods_count(symbol, **kwargs): # goods_live_status - sql = f""" - SELECT - count(1) as total - FROM - clean_daduoduo_dy_live_room_goods - WHERE - RoomId = '{symbol}'; - """ - cursor = kwargs.get('cursor') - cursor.execute(sql) - total = cursor.fetchone() - return total - - -@sqlConn -def get_anchor_room_goods(symbol, page, limit, sort_order, **kwargs): - # item.thumbCoverUrl - # item.title - # item.price - # item.totalSales - # item.totalSalesMoney - sql = f""" - SELECT - GoodPic as thumbCoverUrl, - GoodName as title, - SellPrice as price, - SaleCnt as totalSales, - Gmv as totalSalesMoney - FROM - clean_daduoduo_dy_live_room_goods - WHERE - RoomId = '{symbol}' - ORDER BY - StartTime {sort_order} - LIMIT {(page - 1) * limit}, {limit}; - """ - cursor = kwargs.get('cursor') - cursor.execute(sql) - statistics = cursor.fetchall() - return statistics - - -# 直播间商品数量 -@sqlConn -def get_anchor_room_goods_count(symbol, **kwargs): - # item.thumbCoverUrl - # item.title - # item.price - # item.totalSales - # item.totalSalesMoney - sql = f""" - SELECT - count(1) as total - FROM - clean_daduoduo_dy_live_room_goods - WHERE - RoomId = '{symbol}'; - """ - cursor = kwargs.get('cursor') - cursor.execute(sql) - total = cursor.fetchone() - return total - - -@sqlConn -def get_anchor_room_info(symbol, **kwargs): - # title - # coverUrl - # startTime - # finishTime - # liveTimeLength 直播时长(分钟) - # liveGoodsScore 本场带货口碑 - # actualSalesMoney 销售额 - # userVO {avatarThumbUrl, nickname, lastRoomStartTime, uniqueId } - # userCountVO { other, city, videoDetail, myFollow } - # followCount 直播涨粉 - # actualSales 销量 - # totalUser 观看人次 - # hasLiveStatus - # minuteEnterUser 分钟流速 - # avgOnline 平均在线 - # avgStayLength 平均停留 - # likeCount 本场点赞 - # converFanRate 转粉率 - # guestUnitPrice 客单价 - # userContribution 千次观看成交(UV价值) - # minuteSalesMoney 分钟销售额 - # minuteSales 分钟销售 - # goodsCount 商品数 - # totalSalesMoney 销售额 - # converRate 整体转化率 - sql = f""" - SELECT - people_Name as title, - people_HeaderImg as coverUrl, - room_BeginTime as startTime, - room_EndTime as finishTime, - CONCAT(UNIX_TIMESTAMP(room_EndTime)-UNIX_TIMESTAMP(room_BeginTime)) as liveTimeLength, - people_UserId as uniqueId, - room_Gmv as actualSalesMoney, - room_SaleCnt as actualSales, - room_TotalUser as totalUser, - room_MaxUserCnt as minuteEnterUser, - room_AvgUserCount as avgOnline, - room_viewer_info_other as other, - room_viewer_info_video as videoDetail, - room_viewer_info_follow as myFollow, - room_viewer_info_feed as feed, - room_viewer_info_city as city, - room_viewer_info_plaza as plaza, - room_viewer_info_search as search, - room_viewer_info_home as home, - room_GoodsCnt as goodsCount, - CONCAT((room_Gmv / room_SaleCnt)) as guestUnitPrice, - CONCAT((room_Gmv / room_TotalUser)) as userContribution - FROM - clean_daduoduo_dy_live_room_detail - WHERE - RoomId = '{symbol}' - """ - cursor = kwargs.get('cursor') - cursor.execute(sql) - statistics = cursor.fetchall() - return statistics - - -# 本场点赞 -@sqlConn -def get_anchor_room_big_screen_likeCount(symbol, **kwargs): - sql = f""" - SELECT FavCnt as likeCount FROM - clean_daduoduo_dy_live_room_flow_info - WHERE - RoomId = '{symbol}' - ORDER BY FullTime DESC - LIMIT 1 - """ - cursor = kwargs.get('cursor') - cursor.execute(sql) - statistics = cursor.fetchone() - return statistics - - -# 直播数据增长相关 -# FavCnt 点赞 -# FollowCnt 涨粉 -# TotalUserCnt 直播间总人数 -@sqlConn -def get_anchor_room_data_increase(symbol, **kwargs): - sql = f""" - SELECT FavCnt as likeCount, FollowCnt as followCount, TotalUserCnt FROM - clean_daduoduo_dy_live_room_flow_info - WHERE - RoomId = '{symbol}' - ORDER BY FullTime DESC - LIMIT 1 - """ - cursor = kwargs.get('cursor') - cursor.execute(sql) - statistics = cursor.fetchone() - return statistics - pass - - -@sqlConn -def get_anchor_room_big_screen(symbol, **kwargs): - # detailData.title - # detailData.coverUrl - # detailData.followerCount 粉丝 - # detailData.createTime 开播时间 - # detailData.liveTimeLength 直播时长 分钟 - # detailData.liveStatus 直播状态 [0,1] 0已下播1直播中 - # detailData.actualSalesMoney 直播间实时总销售额(元) - # detailData.actualSales 总销量 - # detailData.userCount 当前在线人数 - # detailData.totalUser 总观看人数 - # detailData.avgOnline 平均在线 - # detailData.minuteEnterUser 分钟流量 - # detailData.avgStayLength 平均停留 - # detailData.userContribution UV价值 - # detailData.guestUnitPrice 客单价 - # detailData.likeCount 本场点赞 - # detailData.followCount 直播涨粉 - sql = f""" - SELECT - people_Name as title, - people_HeaderImg as coverUrl, - room_BeginTime as createTime, - REPLACE(room_BeginTime, ' ', 'T') as startTime, - REPLACE(room_EndTime, ' ', 'T') as finishTime, - CONCAT(UNIX_TIMESTAMP(room_EndTime)-UNIX_TIMESTAMP(room_BeginTime)) as liveTimeLength, - room_Gmv as actualSalesMoney, - room_Gmv as salesExplain, - 0 as likeCount, - room_viewer_info_follow as followCount, - room_GoodsCnt as goodsCount, - room_SaleCnt as actualSales, - room_UserCount as userCount, - room_TotalUser as totalUser, - room_MaxUserCnt as minuteEnterUser, - room_AvgUserCount as avgOnline, - CONCAT((room_Gmv / room_SaleCnt)) as guestUnitPrice, - CONCAT((room_Gmv / room_TotalUser)) as userContribution - FROM - clean_daduoduo_dy_live_room_detail - WHERE - RoomId = '{symbol}' - """ - cursor = kwargs.get('cursor') - cursor.execute(sql) - statistics = cursor.fetchall() - return statistics - - -@sqlConn -def get_anchor_room_trend_first(symbol, **kwargs): - sql = f""" - SELECT - DATE_FORMAT(STR_TO_DATE(FullTime, '%Y%m%d %H%i%s'), '%Y-%m-%dT%H:%i:%s') as dateKeyListValue, - DATE_FORMAT(STR_TO_DATE(FullTime, '%Y%m%d'), '%Y-%m-%d') as dateKeyListLabel, - UserCnt AS userCountListValue - FROM - clean_daduoduo_dy_live_room_flow_info - WHERE - RoomId = '{symbol}' - ORDER BY - FullTime asc - """ - cursor = kwargs.get('cursor') - cursor.execute(sql) - statistics = cursor.fetchall() - return statistics +if __name__ == "__main__": + test() diff --git a/抖音js逆向学习/抖店精选联盟数据/dispatch/__init__.py b/抖音js逆向学习/抖店精选联盟数据/dispatch/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/抖音js逆向学习/抖店精选联盟数据/dispatch/base.py b/抖音js逆向学习/抖店精选联盟数据/dispatch/base.py new file mode 100644 index 0000000..0e525d9 --- /dev/null +++ b/抖音js逆向学习/抖店精选联盟数据/dispatch/base.py @@ -0,0 +1,12 @@ +from dao.mysql_dao import StoreMysqlPool +from datetime import datetime +import settings + + +class Base(object): + + def __init__(self): + self.eb_supports = StoreMysqlPool(**settings.mysql_server_baiyin) + + def log(self, s): + print('【%s】 %s' % (datetime.now(), s), flush=True) diff --git a/抖音js逆向学习/抖店精选联盟数据/dispatch/创建巨量百应主播详情爬虫.py b/抖音js逆向学习/抖店精选联盟数据/dispatch/创建巨量百应主播详情爬虫.py new file mode 100644 index 0000000..cd8d6f1 --- /dev/null +++ b/抖音js逆向学习/抖店精选联盟数据/dispatch/创建巨量百应主播详情爬虫.py @@ -0,0 +1,64 @@ +from dispatch.base import Base +import datetime +import json + + +class 创建巨量百应主播详情爬虫(Base): + + def __init__(self): + super(创建巨量百应主播详情爬虫, self).__init__() + self.project_table = 'project_buyin_authorStatData' + + def project(self, tasks: list): + """ + :param tasks:[{brand_code:, search_keyword:}] + :return: + search_keyword: 多组关键词用空格分隔 + """ + list_dict = [] + for task in tasks: + task_id = task.get("task_id") + log_id = task.get("log_id") + uid = task.get("uid") + payload = f"https://buyin.jinritemai.com/dashboard/servicehall/daren-profile?log_id={log_id}&uid={uid}" + item = { + "task_id": task_id, + "payload_get": payload, + "payload_post": '', + 'deduplication': f"uid={uid[0:30]}", + 'weight': 1 + } + list_dict.append(item) + cnt = self.eb_supports.insert_many(self.project_table, list_dict) + if cnt >= 0: + self.log(f"成功插入{self.project_table}任务-{cnt}") + + +if __name__ == '__main__': + now = datetime.datetime.now() + date = now.strftime('%Y_%m_%d_%H_%M_%S') + task_id = f'project_daduoduo_dy_author_detail-{date}' + d = 创建巨量百应主播详情爬虫() + weight = 1 + offset = 0 + while True: + sql = f""" + SELECT + data, deduplication + FROM + buyin_authorStatData_seekAuthor + LIMIT 1000 OFFSET {offset} + """ + msg = d.eb_supports.query(sql) + list_dict = [] + for data, deduplication in msg: + data = json.loads(data) + log_id = deduplication.split('&')[0] + uid = data.get('author_base').get('uid') + item = {"task_id": task_id, "uid": uid, "log_id": log_id} + list_dict.append(item) + if list_dict: + d.project(list_dict) + else: + break + offset += 1000 diff --git a/抖音js逆向学习/抖店精选联盟数据/extractors/__init__.py b/抖音js逆向学习/抖店精选联盟数据/extractors/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/抖音js逆向学习/抖店精选联盟数据/main_spider.py b/抖音js逆向学习/抖店精选联盟数据/main_spider.py new file mode 100644 index 0000000..07d8b7a --- /dev/null +++ b/抖音js逆向学习/抖店精选联盟数据/main_spider.py @@ -0,0 +1,4 @@ +from spider.buyin_author_statData_mitm import BuyinAuthorStatDataMitm + + +addons = [BuyinAuthorStatDataMitm()] diff --git a/抖音js逆向学习/抖店精选联盟数据/settings.py b/抖音js逆向学习/抖店精选联盟数据/settings.py index 0926244..7efeb8a 100644 --- a/抖音js逆向学习/抖店精选联盟数据/settings.py +++ b/抖音js逆向学习/抖店精选联盟数据/settings.py @@ -8,21 +8,21 @@ def log(s): if os.environ.get('ENV_REPLACE_API') == 'prod': log('生产环境') - mysql_server_daduoduo = { + mysql_server_baiyin = { "host": '127.0.0.1', "user": 'root', - "password": os.environ.get('DB_PASSWORD_DADUODUO'), - "db": 'eb_supports_daduoduo', + "password": os.environ.get('DB_PASSWORD_BAIYIN'), + "db": 'eb_supports_baiyin', "port": 3306, "charset": 'utf8mb4' } else: log('测试环境') - mysql_server_daduoduo = { + mysql_server_baiyin = { "host": '127.0.0.1', "user": 'root', "password": '123456', - "db": 'eb_supports_daduoduo', + "db": 'eb_supports_baiyin', "port": 3306, "charset": 'utf8mb4' } diff --git a/抖音js逆向学习/抖店精选联盟数据/spider/base.py b/抖音js逆向学习/抖店精选联盟数据/spider/base.py new file mode 100644 index 0000000..0e525d9 --- /dev/null +++ b/抖音js逆向学习/抖店精选联盟数据/spider/base.py @@ -0,0 +1,12 @@ +from dao.mysql_dao import StoreMysqlPool +from datetime import datetime +import settings + + +class Base(object): + + def __init__(self): + self.eb_supports = StoreMysqlPool(**settings.mysql_server_baiyin) + + def log(self, s): + print('【%s】 %s' % (datetime.now(), s), flush=True) diff --git a/抖音js逆向学习/抖店精选联盟数据/spider/browser_baiyin.py b/抖音js逆向学习/抖店精选联盟数据/spider/browser_baiyin.py new file mode 100644 index 0000000..0427632 --- /dev/null +++ b/抖音js逆向学习/抖店精选联盟数据/spider/browser_baiyin.py @@ -0,0 +1,54 @@ +from multiprocessing import Queue +from selenium import webdriver +from spider.base import Base +import time + + +class BrowserBaiyin(Base): + + def __init__(self, **kwargs): + super(BrowserBaiyin, self).__init__() + self.queue_list = Queue() + self.status_type = "status" + executablePath = r"../file/chromedriver" + self.executablePath = kwargs.get("executablePath", executablePath) + options = webdriver.ChromeOptions() + # 配置代码 + options.add_experimental_option("debuggerAddress", "127.0.0.1:9222") + options.add_argument("--headless") + self.options = { + "headless": True, + "handleSIGINT": True, + "handleSIGTERM": True, + "handleSIGHUP": True, + } + if self.executablePath is not None: + self.options["executablePath"] = self.executablePath + self.browser = webdriver.Chrome(executable_path=self.executablePath, chrome_options=options) + self.browser.get('https://buyin.jinritemai.com/dashboard/servicehall/daren-profile?' + 'log_id=20230711105437315B137C5F87A4BE6336&uid=v2_0a2773177a04f52dbfb23f61609cd2843a31e9' + '22c1355db72de16fe32292f85d99c03dccc7e02117d81a4b0a3cb4d3fab4e8644e5431d4dab3370b9ab0092' + '5112a189a90e5d239f9b2a2e4b688f3db83682da9c158e6903c09d6b97e7f6cc1cea0defaf6c57672a72510' + 'da95b60d18e5ade4c90120012201039e3ecc30') + + def get_sign_url(self, keywords): + try: + pass + except Exception: + pass + + def run(self): + while True: + self.log(f"等待 2 秒") + time.sleep(2) + self.get_sign_url('JY20220616004390') + + def close(self): + self.browser.close() + self.browser.quit() + + +if __name__ == "__main__": + browser = BrowserBaiyin() + browser.run() + browser.close() diff --git a/抖音js逆向学习/抖店精选联盟数据/spider/buyin_authorStatData_seekAuthor_mitm.py b/抖音js逆向学习/抖店精选联盟数据/spider/buyin_authorStatData_seekAuthor_mitm.py deleted file mode 100644 index ecb5361..0000000 --- a/抖音js逆向学习/抖店精选联盟数据/spider/buyin_authorStatData_seekAuthor_mitm.py +++ /dev/null @@ -1,14 +0,0 @@ -import mitmproxy.http - - -class AddHeader: - def __init__(self): - self.num = 0 - - def response(self, flow): - print(flow.request.host) - self.num = self.num + 1 - flow.response.headers["count"] = str(self.num) - - -addons = [AddHeader()] diff --git a/抖音js逆向学习/抖店精选联盟数据/spider/buyin_author_statData_mitm.py b/抖音js逆向学习/抖店精选联盟数据/spider/buyin_author_statData_mitm.py new file mode 100644 index 0000000..06fa362 --- /dev/null +++ b/抖音js逆向学习/抖店精选联盟数据/spider/buyin_author_statData_mitm.py @@ -0,0 +1,57 @@ +from spider.base import Base +from urllib.parse import parse_qsl, urlsplit +import json + + +class BuyinAuthorStatDataMitm(Base): + def __init__(self): + super(BuyinAuthorStatDataMitm, self).__init__() + self.达人广场搜索列表 = 'buyin_authorStatData_seekAuthor' + self.作者概述V2 = 'buyin_authorStatData_authorOverviewV2' + self.联系方式 = 'buyin_contact_info' + + def response(self, flow): + # 达人广场搜索列表 + if "https://buyin.jinritemai.com/api/authorStatData/seekAuthor" in flow.request.url: + list_dicts = [] + author_list = json.loads(flow.response.content).get('data').get('list') + event_track_log_id = json.loads(flow.response.content).get('data').get('event_track_log_id') + for author in author_list: + item = { + "task_id": 'project_test', + "data": json.dumps(author), + "deduplication": event_track_log_id + f"&uid={author.get('author_base').get('uid')[0:30]}", + } + list_dicts.append(item) + db_res = self.eb_supports.insert_many(self.达人广场搜索列表, list_dicts) + print(db_res) + + # 作者概述V2 + if "https://buyin.jinritemai.com/api/authorStatData/authorOverviewV2" in flow.request.url: + uid = dict(parse_qsl(urlsplit(flow.request.url).query)).get('uid') + list_dicts = [] + data = json.loads(flow.response.content).get('data') + item = { + "task_id": 'project_test', + "data": json.dumps(data), + "deduplication": f"uid={uid[0:30]}" + } + list_dicts.append(item) + db_res = self.eb_supports.insert_many(self.作者概述V2, list_dicts) + print(db_res) + + # 联系方式 + if "https://buyin.jinritemai.com/api/contact/contact_info" in flow.request.url: + uid = dict(parse_qsl(urlsplit(flow.request.url).query)).get('origin_uid') + list_dicts = [] + data = json.loads(flow.response.content).get('data') + contact_value = data.get('contact_info').get('contact_value') + if contact_value: + item = { + "task_id": 'project_test', + "data": json.dumps(data), + "deduplication": f"uid={uid[0:30]}" + } + list_dicts.append(item) + db_res = self.eb_supports.insert_many(self.联系方式, list_dicts) + print(db_res)