AST (abstract syntax tree) study notes

This commit is contained in:
aiyingfeng 2023-07-11 14:01:35 +08:00
parent 3413db8651
commit c1986d90bc
12 changed files with 545 additions and 655 deletions

.gitignore vendored

@@ -8,6 +8,7 @@
*.vscode
*.pdf
*.pt
chromedriver
package.json
package-lock.json
venv/


@@ -1,648 +1,348 @@
from handlers.base import Base
import pymysql.cursors
import settings
base = Base()
# -*- coding: utf8 -*-
from collections import OrderedDict
from copy import deepcopy
import pymysql
import time
from DBUtils.PooledDB import PooledDB
from DBUtils.PersistentDB import PersistentDB
import traceback
from datetime import datetime
def getConnection():
return pymysql.connect(
host=settings.HOST,
port=settings.PORT,
user=settings.USER,
password=settings.PASSWORD,
database=settings.DATABASE,
autocommit=True,
cursorclass=pymysql.cursors.DictCursor
)
def sqlConn(func):
""" use a decorator to make the database connection easier to use
"""
def __sqlFunc(*args):
conn = None
cursor = None
try:
conn = getConnection()
cursor = conn.cursor()
return func(*args, cursor=cursor)
finally:
if cursor:
cursor.close()
if conn:
conn.close()
return __sqlFunc
class StoreMysqlPool(object):
"""
MySQL read/write helper built on pymysql, using a PooledDB connection pool.
Args:
host: database host
user: database user name
password: database user password
db: database name
port: database port, default 3306
mincached: minimum number of idle connections, default 1
charset: database charset, default utf8
"""
def __init__(self, host="", user="", password="", db="", port=3306, mincached=1,
pattern=1, charset="utf8"):
self.host = host
if pattern == 1:
self.pool = PooledDB(pymysql, mincached, host=host, user=user, passwd=password, db=db, port=port,
charset=charset)
self.close_able = True
elif pattern == 2:
self.pool = PersistentDB(pymysql, host=host, user=user, passwd=password, db=db, port=port,
charset=charset)
self.close_able = False
self._last_use_time = time.time()
def close(self, db):
if db is not None:
db.close()
def connect(self):
return self.pool.connection()
def __exit__(self, exc_type, exc_val, exc_tb):
self.close_all()
@staticmethod
def _cursor(db):
# return db.cursor(pymysql.cursors.DictCursor)
return db.cursor()
def query(self, sql):
"""
Run a query and return the result set.
Returns:
A list of rows; each row is a sequence of column values.
"""
db = self.connect()
cur = self._cursor(db)
rows = []
try:
cur.execute(sql)
db.commit()
rows = cur.fetchall()
except pymysql.OperationalError:
print(traceback.format_exc())
except Exception as e:
print('query{}'.format(e))
print(traceback.format_exc())
finally:
cur.close()
self.close(db)
return rows
def count(self, tb):
"""
Return the number of rows in a table.
Args:
tb: table name (string)
"""
sql = 'select count(*) from %s' % tb
results = self.query(sql)
if len(results) == 1 and len(results[0]) == 1:
return int(results[0][0])
def do(self, sql, flag='rowcount'):
"""
Execute an insert/delete/update statement.
Args:
sql: the SQL statement to execute
flag: return-value type; flag='lastrowid' returns lastrowid, flag='rowcount' returns rowcount
"""
db = self.connect()
cur = self._cursor(db)
r = None
try:
cur.execute(sql)
db.commit()
r = 1
if flag == 'lastrowid':
r = cur.lastrowid
elif flag == 'rowcount':
r = cur.rowcount
except pymysql.OperationalError:
print("time: %s; Error on %s: %s" % (str(datetime.now()), self.host, sql[0: 200]))
print(traceback.format_exc())
r = -1
except Exception:
print("time: %s; Error to MySQL on %s: %s" % (str(datetime.now()), self.host, sql[0: 200]))
print(traceback.format_exc())
r = -1
finally:
cur.close()
self.close(db)
return r
def save(self, table, data, mapping_fields=dict()):
"""
Insert a dict directly into the database.
Args:
table: name of the target table (string)
data: dict where keys are column names and values are column values, e.g. {'id': '1', 'name': 'temp'}
mapping_fields: mapping from data keys to database column names;
if a key of data is not present in mapping_fields, the key itself is used as the column name
"""
if len(data) <= 0:
return -1
try:
fields = ''
values = ''
for d in data:
if d in mapping_fields:
fields += "`%s`," % (str(mapping_fields[d]))
else:
fields += "`%s`," % (str(d))
values += "'%s'," % (pymysql.escape_string(str(data[d])))
if len(fields) <= 0 or len(values) <= 0:
return -1
sql = "insert ignore into %s(%s) values(%s)" % (table, fields[:-1], values[:-1])
return self.do(sql)
except Exception:
print(traceback.format_exc())
return -1
def saveMany(self, sql, values, flag='lastrowid'):
"""
Execute an insert/delete/update statement for many rows.
:param sql: e.g. 'insert into table(id,name) values(%s,%s)'
:param values: a list such as [(1, 2), (1, 2), ...]; every element must be a tuple
:param flag: 'lastrowid' or 'rowcount'
:return: lastrowid/rowcount on success, -1 on error
"""
db = self.connect()
cur = self._cursor(db)
r = None
try:
cur.executemany(sql, values)
db.commit()
r = 1
if flag == 'lastrowid':
r = cur.lastrowid
elif flag == 'rowcount':
r = cur.rowcount
except pymysql.OperationalError:
print("Error connecting to MySQL on %s: %s" % (self.host, sql[0:255]))
print(traceback.format_exc())
r = -1
except Exception:
print("Error connecting to MySQL on %s: %s" % (self.host, sql[0:255]))
print(traceback.format_exc())
r = -1
finally:
cur.close()
self.close(db)
return r
def update(self, table, data, field, mapping_fields=dict()):
"""
Update the database directly from a dict.
Args:
table: name of the target table (string)
data: dict where keys are column names and values are column values, e.g. {'id': '1', 'name': 'temp'}
field: unique-index column used to identify the record (becomes the WHERE condition)
mapping_fields: mapping from data keys to database column names;
if a key of data is not present in mapping_fields, the key itself is used as the column name
"""
if len(data) <= 0:
return -1
else:
try:
values = ''
field_value = None
for d in data:
key = d
if d in mapping_fields:
key = mapping_fields[d]
if key == field:
field_value = data[d]
values += "%s='%s'," % (str(key), pymysql.escape_string(str(data[d])))
if len(values) <= 0 or field_value is None:
return -1
sql = "update " + table + " set " + values[:-1] + " where " + field + "='" + pymysql.escape_string(
str(field_value)) + "'"
return self.do(sql, flag='rowcount')
except Exception:
print(traceback.format_exc())
return -1
def saveorupdate(self, table, data, field, mapping_fields=dict()):
"""
Upsert a dict into the database: update the row if it already exists, otherwise insert it.
Args:
table: name of the target table (string)
data: dict where keys are column names and values are column values, e.g. {'id': '1', 'name': 'temp'}
field: unique-index column used to decide whether the record already exists (becomes the WHERE condition)
mapping_fields: mapping from data keys to database column names;
if a key of data is not present in mapping_fields, the key itself is used as the column name
"""
if len(data) <= 0:
return -1
try:
field_value = None
if field in data:
field_value = data[field]
else:
for key in mapping_fields:
if mapping_fields[key] == field and key in data:
field_value = data[key]
if field_value is None:
return -1
querysql = "select count(1) from " + table + " where " + field + "='" + pymysql.escape_string(
str(field_value)) + "'"
ed = self.query(querysql)
if ed and ed[0][0] > 0:
return self.update(table, data, field, mapping_fields)
else:
return self.save(table, data, mapping_fields)
except Exception:
print(traceback.format_exc())
return -1
def tableCreate(self, tb_name, likeTable):
"""
Create a data table if it does not exist (cloned from likeTable); truncate it if it exists but is empty.
:param tb_name: table's name
:return: 1 ok , -1 error
"""
# sql = "SELECT table_name FROM information_schema.TABLES WHERE table_name ='{}';".format(tb_name)
sql = "show tables;"
datas = self.query(sql)
if tb_name in [str(i[0]) for i in datas]:
# the table exists
sql = "select * from {} limit 10;".format(tb_name)
if not self.query(sql):
print("{} 是空表, 正在truncate.....".format(tb_name))
sql = " truncate {};".format(tb_name)
datas = self.query(sql)
else:
print("表格存在:{}".format(tb_name))
else:
# does not exist: create it, then truncate
print("{} 表格不存在,正在创建....".format(tb_name))
sql = "create table {} like {};".format(tb_name, likeTable)
datas = self.query(sql)
sql = " truncate {};".format(tb_name)
datas = self.query(sql)
return 1
def dictsToOrderDicts(self, list_dicts):
alist = []
# list_dicts = [list_dicts] if isinstance(list_dicts, dict) else list_dicts
ks = list_dicts[0].keys()  # keys keep insertion order
order_dict = OrderedDict()
for adict in list_dicts:
for k in ks:
order_dict[k] = adict[k]
alist.append(deepcopy(order_dict))
return alist
def insert_many(self, table, list_dicts, conflict=''):
'''
Bulk insert.
:param table: target table name
:param list_dicts: list of dicts to insert
:param conflict: fields to update on duplicate key, e.g. 'local_date=CURRENT_DATE()' or ['local_date', 'fields']
:return: affected row count on success, or a negative code on failure
'''
try:
if len(list_dicts):
orderDicts = self.dictsToOrderDicts(list_dicts)
ks = orderDicts[0].keys()  # column names
if conflict:
if isinstance(conflict, str):
insert_sql = "insert ignore into {} ({}) values ({}) ON DUPLICATE KEY UPDATE {}".format(
table, ", ".join(ks), ("%s," * len(ks)).strip(","), conflict)
else:
if len(conflict):
conflict = ','.join(['{}=VALUES({})'.format(field, field) for field in conflict])
insert_sql = "insert ignore into {} ({}) values ({}) ON DUPLICATE KEY UPDATE {}".format(
table, ", ".join(ks), ("%s," * len(ks)).strip(","), conflict)
else:
return -1
else:
insert_sql = "insert ignore into {} ({}) values ({})".format(table, ", ".join(ks),
("%s," * len(ks)).strip(","))
values = []
for adict in list_dicts:
values.append(tuple(adict.values()))
res = self.saveMany(insert_sql, values, flag="rowcount")
return res
else:
return -3
except Exception as e:
print('insert_many{}'.format(e))
return -2
def close_all(self):
"""
Close all connections in the pool.
Only call this once you are sure the connections are no longer needed.
:return:
"""
if self.close_able:
self.pool.close()
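For orientation, here is a minimal usage sketch of StoreMysqlPool, assuming a reachable local MySQL instance and an illustrative table named demo_table (neither is part of this commit; the connection parameters are placeholders):

from dao.mysql_dao import StoreMysqlPool

pool = StoreMysqlPool(host='127.0.0.1', user='root', password='123456', db='eb_supports_baiyin')
rows = pool.query("select id, name from demo_table limit 10")    # list of row tuples
total = pool.count('demo_table')                                  # SELECT COUNT(*) wrapper
affected = pool.do("update demo_table set name='x' where id=1")   # returns rowcount by default
pool.save('demo_table', {'id': '1', 'name': 'temp'})              # dict keys become column names
pool.saveorupdate('demo_table', {'id': '1', 'name': 'temp2'}, field='id')
# Batch insert; on duplicate key, only the listed columns are updated.
pool.insert_many('demo_table', [{'id': '2', 'name': 'a'}, {'id': '3', 'name': 'b'}], conflict=['name'])
pool.close_all()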
@sqlConn
def getFinancialData(symbol, startDate, endDate, page, limit, **kwargs):
""" fetch financial from db
"""
sql = f"SELECT symbol, date, open_price, close_price, volume FROM Financial " + \
f"WHERE date >= '{startDate}' AND " + \
f"date <= '{endDate}' AND symbol = '{symbol}' LIMIT {(page - 1) * limit}, {limit};"
# base.log(sql)
cursor = kwargs.get('cursor')
cursor.execute(sql)
rows = cursor.fetchall()
return rows
@sqlConn
def getPaginationData(symbol, startDate, endDate, page, limit, **kwargs):
""" get pagination information
"""
sql = f"SELECT COUNT(*) AS cnt FROM Financial WHERE date >= '{startDate}' " + \
f"AND date <= '{endDate}' AND symbol = '{symbol}';"
# base.log(sql)
cursor = kwargs.get('cursor')
cursor.execute(sql)
c = cursor.fetchone()
return {
'count': c['cnt'],
'page': page,
'limit': limit,
# ceiling division, e.g. 101 rows with limit=20 -> 6 pages
'pages': (c['cnt'] + limit - 1) // limit
}
def test():
db = {
'host': 'localhost',
'user': 'root',
'password': '',
'db': 'test'
}
mq = StoreMysqlPool(**db)
print(mq.count('urls'))
@sqlConn
def initDatabase(**kwargs):
sql = """
CREATE TABLE IF NOT EXISTS Financial(
id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
symbol VARCHAR(255) NOT NULL,
`date` VARCHAR(16) NOT NULL,
open_price VARCHAR(255) NOT NULL,
close_price VARCHAR(255) NOT NULL,
volume VARCHAR(255) NOT NULL,
UNIQUE(symbol, `date`)
);
"""
cursor = kwargs.get('cursor')
cursor.execute(sql)
@sqlConn
def updateFinancialData(symbol, data, **kwargs):
""" save data to db
@return number of rows actually inserted (cursor.rowcount)
"""
# use INSERT IGNORE so rows with duplicate keys are skipped
sql = "INSERT IGNORE INTO Financial (symbol, date, open_price, close_price, volume) VALUES "
i = 1
for date, fields in data.items():
sql += "('%s', '%s', '%s', '%s', '%s')" % \
(symbol, date, fields['1. open'], fields['4. close'], fields['6. volume'])
if i < len(data):
sql += ","
i += 1
cursor = kwargs.get('cursor')
cursor.execute(sql)
base.log(f'updated {symbol}, rows: {cursor.rowcount}')
return cursor.rowcount
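To make the expected input shape explicit: updateFinancialData consumes a dict keyed by date whose values carry the '1. open' / '4. close' / '6. volume' fields referenced above. A hedged sketch with made-up values (the symbol and numbers are illustrative only):

# Illustrative call; @sqlConn supplies the cursor from getConnection().
sample = {
    '2023-07-10': {'1. open': '182.0', '4. close': '184.2', '6. volume': '1000000'},
    '2023-07-11': {'1. open': '184.5', '4. close': '183.9', '6. volume': '950000'},
}
inserted = updateFinancialData('AAPL', sample)  # INSERT IGNORE skips rows whose (symbol, date) already exist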
@sqlConn
def insert_project_spider(symbol, item_list, **kwargs):
sql = f"""
INSERT IGNORE INTO {symbol} (task_id, payload_get, payload_post, deduplication, weight) VALUES
"""
for item in item_list:
sql += "('%s', '%s', '%s', '%s', '%s')," % (
item['task_id'], item['payload_get'],
item['payload_post'], item['deduplication'],
item['weight']
)
sql = sql.rstrip(',')
cursor = kwargs.get('cursor')
cursor.execute(sql)
base.log(f'inserted into {symbol}, rows: {cursor.rowcount}')
return cursor.rowcount
@sqlConn
def insert_project_user_monitor(symbol, item_list, **kwargs):
sql = f"""
INSERT IGNORE INTO {symbol} (task_id, authorId, BeginTime, EndTime, weight) VALUES
"""
for item in item_list:
sql += "('%s', '%s', '%s', '%s', '%s')," % (
item['task_id'], item['authorId'],
item['BeginTime'], item['EndTime'],
item['weight']
)
sql = sql.rstrip(',')
cursor = kwargs.get('cursor')
cursor.execute(sql)
base.log(f'inserted into {symbol}, rows: {cursor.rowcount}')
return cursor.rowcount
@sqlConn
def get_user_monitor(symbol, **kwargs):
sql = f"""
SELECT
authorId,
BeginTime,
EndTime
FROM
clean_daduoduo_dy_author_room_info
WHERE
authorId = '{symbol}'
ORDER BY BeginTime DESC
LIMIT 5
"""
cursor = kwargs.get('cursor')
cursor.execute(sql)
statistics = cursor.fetchall()
return statistics
@sqlConn
def get_project_spider(symbol, task_id, deduplication, **kwargs):
"""
Check whether this deduplication key was already crawled within the last 5 minutes.
"""
sql = f"""
SELECT
status
FROM
{symbol}
WHERE
deduplication = '{deduplication}' and
create_time>=DATE_SUB(NOW(),INTERVAL 5 MINUTE)
"""
cursor = kwargs.get('cursor')
cursor.execute(sql)
statistics = cursor.fetchone()
return statistics
@sqlConn
def getStatisticsData(symbol, startDate, endDate, **kwargs):
""" get the average value of the given peroid
"""
sql = f"SELECT AVG(open_price) AS avg_open, AVG(close_price) AS avg_close, AVG(volume) " + \
f"AS avg_volume FROM Financial WHERE date >= '{startDate}' " + \
f"AND date <= '{endDate}' AND symbol = '{symbol}';"
cursor = kwargs.get('cursor')
cursor.execute(sql)
statistics = cursor.fetchone()
return statistics
@sqlConn
def get_anchor_info(symbol, **kwargs):
sql = f"""
SELECT
b.UserId,
b.UserId as userSn,
b.HeaderImg,
b.FavCnt,
b.FansCnt,
b.IsShow,
BeginTime,
Gmv,
b.SubDetail,
b.Reputation,
b.DouYinId,
b.NAME,
c.LiveName as title,
c.BeginTime as startTime,
c.EndTime as finishTime
FROM
clean_daduoduo_dy_author_room_info c
RIGHT JOIN (
SELECT
UserId,
HeaderImg,
FavCnt,
FansCnt,
IsShow,
SubDetail,
Reputation,
DouYinId,
NAME,
spider_time
FROM
clean_daduoduo_dy_author_detail
WHERE
UserId = '{symbol}'
ORDER BY
spider_time DESC
LIMIT 1
) b ON c.authorId = b.UserId
ORDER BY
c.spider_time DESC
LIMIT 1
"""
cursor = kwargs.get('cursor')
cursor.execute(sql)
statistics = cursor.fetchone()
return statistics
@sqlConn
def get_anchor_search(symbol, page, limit, **kwargs):
sql = f"""
SELECT
UserId,
NAME,
HeaderImg,
DouYinId,
FansCnt
FROM
clean_daduoduo_dy_author_detail
WHERE
NAME LIKE '%{symbol}%'
ORDER BY
UserLevel DESC
LIMIT {(page - 1) * limit}, {limit};
"""
cursor = kwargs.get('cursor')
cursor.execute(sql)
statistics = cursor.fetchall()
return statistics
# number of search results
@sqlConn
def get_anchor_search_count(symbol, **kwargs):
sql = f"""
SELECT
count(1) as total
FROM
clean_daduoduo_dy_author_detail
WHERE
NAME LIKE '%{symbol}%';
"""
cursor = kwargs.get('cursor')
cursor.execute(sql)
total = cursor.fetchone()
return total
@sqlConn
def get_anchor_gmv(symbol, **kwargs):
symbol_str = ','.join(symbol)
sql = f"""
SELECT
a.authorId,
a.BeginTime,
a.Gmv
FROM
clean_daduoduo_dy_author_room_info AS a,
( SELECT authorId, MAX( RoomId ) AS RoomId FROM clean_daduoduo_dy_author_room_info WHERE authorId IN
{symbol} GROUP BY authorId ) AS b
WHERE
a.authorId = b.authorId
AND a.RoomId = b.RoomId
ORDER BY
field( b.authorId, {symbol_str});
"""
cursor = kwargs.get('cursor')
cursor.execute(sql)
statistics = cursor.fetchall()
return statistics
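One caveat worth noting for get_anchor_gmv: the IN clause interpolates symbol directly, so the result is only valid SQL when symbol renders with parentheses (e.g. a tuple); a Python list renders with square brackets. A small sketch of that assumption (the ids are placeholders):

ids = ['111', '222']
print(f"... WHERE authorId IN {ids}")           # -> IN ['111', '222']  (not valid SQL)
print(f"... WHERE authorId IN {tuple(ids)}")    # -> IN ('111', '222')  (valid SQL)
in_clause = "({})".format(", ".join("'%s'" % i for i in ids))  # explicit alternative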
@sqlConn
def get_anchor_room_list(symbol, page, limit, goods_live_status, sort_order, **kwargs):
# item.title            live-stream title
# item.startTime        start time
# item.actualSalesMoney sales amount
# item.totalUser        live-stream sales amount
# item.minuteEnterUser  live-stream sales volume
# item.actualSales      product-promotion popularity
if int(goods_live_status) == 0:
bring_goods = ''
else:
bring_goods = f'and SaleCnt != 0'
sql = f"""
SELECT
RoomPic as coverUrl,
LiveName as title,
BeginTime as startTime,
Gmv as actualSalesMoney,
TotalUser as totalUser,
SaleCnt as minuteEnterUser,
UserCount as actualSales,
RoomId as roomSn
FROM
clean_daduoduo_dy_author_room_info
WHERE
authorId = '{symbol}' {bring_goods}
ORDER BY
BeginTime {sort_order}
LIMIT {(page - 1) * limit}, {limit};
"""
cursor = kwargs.get('cursor')
cursor.execute(sql)
statistics = cursor.fetchall()
return statistics
# number of live-stream records
@sqlConn
def get_anchor_room_list_count(symbol, goods_live_status, **kwargs):
# item.title            live-stream title
# item.startTime        start time
# item.actualSalesMoney sales amount
# item.totalUser        live-stream sales amount
# item.minuteEnterUser  live-stream sales volume
# item.actualSales      product-promotion popularity
if int(goods_live_status) == 0:
bring_goods = ''
else:
bring_goods = f'and SaleCnt != 0'
sql = f"""
SELECT
count(1) as total
FROM
clean_daduoduo_dy_author_room_info
WHERE
authorId = '{symbol}' {bring_goods};
"""
cursor = kwargs.get('cursor')
cursor.execute(sql)
total = cursor.fetchone()
return total
@sqlConn
def get_anchor_room_big_screen_goods(symbol, page, limit, goods_live_status, sortKey, sort_order, **kwargs):
# thumbCoverUrl
# title
# price
# sales
# salesMoney
sql = f"""
SELECT
GoodPic as thumbCoverUrl,
GoodName as title,
SellPrice as price,
SaleCnt as sales,
SaleCnt as totalSales,
Gmv as salesMoney
FROM
clean_daduoduo_dy_live_room_goods
WHERE
RoomId = '{symbol}'
ORDER BY
{sortKey} {sort_order}
LIMIT {(page - 1) * limit}, {limit};
"""
cursor = kwargs.get('cursor')
cursor.execute(sql)
statistics = cursor.fetchall()
return statistics
@sqlConn
def get_anchor_room_big_screen_goods_count(symbol, **kwargs): # goods_live_status
sql = f"""
SELECT
count(1) as total
FROM
clean_daduoduo_dy_live_room_goods
WHERE
RoomId = '{symbol}';
"""
cursor = kwargs.get('cursor')
cursor.execute(sql)
total = cursor.fetchone()
return total
@sqlConn
def get_anchor_room_goods(symbol, page, limit, sort_order, **kwargs):
# item.thumbCoverUrl
# item.title
# item.price
# item.totalSales
# item.totalSalesMoney
sql = f"""
SELECT
GoodPic as thumbCoverUrl,
GoodName as title,
SellPrice as price,
SaleCnt as totalSales,
Gmv as totalSalesMoney
FROM
clean_daduoduo_dy_live_room_goods
WHERE
RoomId = '{symbol}'
ORDER BY
StartTime {sort_order}
LIMIT {(page - 1) * limit}, {limit};
"""
cursor = kwargs.get('cursor')
cursor.execute(sql)
statistics = cursor.fetchall()
return statistics
# number of products in the live room
@sqlConn
def get_anchor_room_goods_count(symbol, **kwargs):
# item.thumbCoverUrl
# item.title
# item.price
# item.totalSales
# item.totalSalesMoney
sql = f"""
SELECT
count(1) as total
FROM
clean_daduoduo_dy_live_room_goods
WHERE
RoomId = '{symbol}';
"""
cursor = kwargs.get('cursor')
cursor.execute(sql)
total = cursor.fetchone()
return total
@sqlConn
def get_anchor_room_info(symbol, **kwargs):
# title
# coverUrl
# startTime
# finishTime
# liveTimeLength    live-stream duration (minutes)
# liveGoodsScore    product-promotion reputation for this session
# actualSalesMoney  sales amount
# userVO            {avatarThumbUrl, nickname, lastRoomStartTime, uniqueId}
# userCountVO       {other, city, videoDetail, myFollow}
# followCount       followers gained during the live stream
# actualSales       sales volume
# totalUser         total views
# hasLiveStatus
# minuteEnterUser   per-minute traffic
# avgOnline         average online viewers
# avgStayLength     average stay duration
# likeCount         likes for this session
# converFanRate     follower conversion rate
# guestUnitPrice    average order value per customer
# userContribution  UV value (GMV per thousand views)
# minuteSalesMoney  per-minute sales amount
# minuteSales       per-minute sales volume
# goodsCount        number of products
# totalSalesMoney   total sales amount
# converRate        overall conversion rate
sql = f"""
SELECT
people_Name as title,
people_HeaderImg as coverUrl,
room_BeginTime as startTime,
room_EndTime as finishTime,
CONCAT(UNIX_TIMESTAMP(room_EndTime)-UNIX_TIMESTAMP(room_BeginTime)) as liveTimeLength,
people_UserId as uniqueId,
room_Gmv as actualSalesMoney,
room_SaleCnt as actualSales,
room_TotalUser as totalUser,
room_MaxUserCnt as minuteEnterUser,
room_AvgUserCount as avgOnline,
room_viewer_info_other as other,
room_viewer_info_video as videoDetail,
room_viewer_info_follow as myFollow,
room_viewer_info_feed as feed,
room_viewer_info_city as city,
room_viewer_info_plaza as plaza,
room_viewer_info_search as search,
room_viewer_info_home as home,
room_GoodsCnt as goodsCount,
CONCAT((room_Gmv / room_SaleCnt)) as guestUnitPrice,
CONCAT((room_Gmv / room_TotalUser)) as userContribution
FROM
clean_daduoduo_dy_live_room_detail
WHERE
RoomId = '{symbol}'
"""
cursor = kwargs.get('cursor')
cursor.execute(sql)
statistics = cursor.fetchall()
return statistics
# likes for this live session
@sqlConn
def get_anchor_room_big_screen_likeCount(symbol, **kwargs):
sql = f"""
SELECT FavCnt as likeCount FROM
clean_daduoduo_dy_live_room_flow_info
WHERE
RoomId = '{symbol}'
ORDER BY FullTime DESC
LIMIT 1
"""
cursor = kwargs.get('cursor')
cursor.execute(sql)
statistics = cursor.fetchone()
return statistics
# Live-stream growth metrics
# FavCnt       likes
# FollowCnt    followers gained
# TotalUserCnt total viewers in the live room
@sqlConn
def get_anchor_room_data_increase(symbol, **kwargs):
sql = f"""
SELECT FavCnt as likeCount, FollowCnt as followCount, TotalUserCnt FROM
clean_daduoduo_dy_live_room_flow_info
WHERE
RoomId = '{symbol}'
ORDER BY FullTime DESC
LIMIT 1
"""
cursor = kwargs.get('cursor')
cursor.execute(sql)
statistics = cursor.fetchone()
return statistics
pass
@sqlConn
def get_anchor_room_big_screen(symbol, **kwargs):
# detailData.title
# detailData.coverUrl
# detailData.followerCount    followers
# detailData.createTime       live-stream start time
# detailData.liveTimeLength   live-stream duration, minutes
# detailData.liveStatus       live status [0,1]: 0 = ended, 1 = live
# detailData.actualSalesMoney real-time total sales of the live room (CNY)
# detailData.actualSales      total sales volume
# detailData.userCount        current online viewers
# detailData.totalUser        total viewers
# detailData.avgOnline        average online viewers
# detailData.minuteEnterUser  per-minute traffic
# detailData.avgStayLength    average stay duration
# detailData.userContribution UV value
# detailData.guestUnitPrice   average order value per customer
# detailData.likeCount        likes for this session
# detailData.followCount      followers gained during the live stream
sql = f"""
SELECT
people_Name as title,
people_HeaderImg as coverUrl,
room_BeginTime as createTime,
REPLACE(room_BeginTime, ' ', 'T') as startTime,
REPLACE(room_EndTime, ' ', 'T') as finishTime,
CONCAT(UNIX_TIMESTAMP(room_EndTime)-UNIX_TIMESTAMP(room_BeginTime)) as liveTimeLength,
room_Gmv as actualSalesMoney,
room_Gmv as salesExplain,
0 as likeCount,
room_viewer_info_follow as followCount,
room_GoodsCnt as goodsCount,
room_SaleCnt as actualSales,
room_UserCount as userCount,
room_TotalUser as totalUser,
room_MaxUserCnt as minuteEnterUser,
room_AvgUserCount as avgOnline,
CONCAT((room_Gmv / room_SaleCnt)) as guestUnitPrice,
CONCAT((room_Gmv / room_TotalUser)) as userContribution
FROM
clean_daduoduo_dy_live_room_detail
WHERE
RoomId = '{symbol}'
"""
cursor = kwargs.get('cursor')
cursor.execute(sql)
statistics = cursor.fetchall()
return statistics
@sqlConn
def get_anchor_room_trend_first(symbol, **kwargs):
sql = f"""
SELECT
DATE_FORMAT(STR_TO_DATE(FullTime, '%Y%m%d %H%i%s'), '%Y-%m-%dT%H:%i:%s') as dateKeyListValue,
DATE_FORMAT(STR_TO_DATE(FullTime, '%Y%m%d'), '%Y-%m-%d') as dateKeyListLabel,
UserCnt AS userCountListValue
FROM
clean_daduoduo_dy_live_room_flow_info
WHERE
RoomId = '{symbol}'
ORDER BY
FullTime asc
"""
cursor = kwargs.get('cursor')
cursor.execute(sql)
statistics = cursor.fetchall()
return statistics
if __name__ == "__main__":
test()


@@ -0,0 +1,12 @@
from dao.mysql_dao import StoreMysqlPool
from datetime import datetime
import settings
class Base(object):
def __init__(self):
self.eb_supports = StoreMysqlPool(**settings.mysql_server_baiyin)
def log(self, s):
print('%s%s' % (datetime.now(), s), flush=True)


@@ -0,0 +1,64 @@
from dispatch.base import Base
import datetime
import json
class 创建巨量百应主播详情爬虫(Base):
def __init__(self):
super(创建巨量百应主播详情爬虫, self).__init__()
self.project_table = 'project_buyin_authorStatData'
def project(self, tasks: list):
"""
:param tasks: [{task_id:, log_id:, uid:}], one dict per task
:return:
"""
list_dict = []
for task in tasks:
task_id = task.get("task_id")
log_id = task.get("log_id")
uid = task.get("uid")
payload = f"https://buyin.jinritemai.com/dashboard/servicehall/daren-profile?log_id={log_id}&uid={uid}"
item = {
"task_id": task_id,
"payload_get": payload,
"payload_post": '',
'deduplication': f"uid={uid[0:30]}",
'weight': 1
}
list_dict.append(item)
cnt = self.eb_supports.insert_many(self.project_table, list_dict)
if cnt >= 0:
self.log(f"成功插入{self.project_table}任务-{cnt}")
if __name__ == '__main__':
now = datetime.datetime.now()
date = now.strftime('%Y_%m_%d_%H_%M_%S')
task_id = f'project_daduoduo_dy_author_detail-{date}'
d = 创建巨量百应主播详情爬虫()
weight = 1
offset = 0
while True:
sql = f"""
SELECT
data, deduplication
FROM
buyin_authorStatData_seekAuthor
LIMIT 1000 OFFSET {offset}
"""
msg = d.eb_supports.query(sql)
list_dict = []
for data, deduplication in msg:
data = json.loads(data)
log_id = deduplication.split('&')[0]
uid = data.get('author_base').get('uid')
item = {"task_id": task_id, "uid": uid, "log_id": log_id}
list_dict.append(item)
if list_dict:
d.project(list_dict)
else:
break
offset += 1000
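To make the dispatch step concrete, this is roughly what one generated task row looks like; the log_id/uid values below are placeholders, not real identifiers:

# Placeholder values; real log_id/uid come from rows of buyin_authorStatData_seekAuthor.
task = {"task_id": "project_daduoduo_dy_author_detail-2023_07_11_14_00_00",
        "log_id": "20230711XXXX", "uid": "v2_EXAMPLE_UID"}
# project() turns this into one row of project_buyin_authorStatData:
#   payload_get   = "https://buyin.jinritemai.com/dashboard/servicehall/daren-profile?log_id=20230711XXXX&uid=v2_EXAMPLE_UID"
#   deduplication = "uid=" + task["uid"][0:30]
#   weight        = 1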


@@ -0,0 +1,4 @@
from spider.buyin_author_statData_mitm import BuyinAuthorStatDataMitm
addons = [BuyinAuthorStatDataMitm()]
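This module is presumably meant to be loaded by mitmproxy; a hedged usage note (the script file name is an assumption, not taken from the repo):

# Assumed invocation, with a hypothetical script name:
#   mitmdump -s run_mitm.py
# mitmproxy imports the module, picks up the `addons` list, and invokes
# BuyinAuthorStatDataMitm.response() for every proxied HTTP response.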


@@ -8,21 +8,21 @@ def log(s):
if os.environ.get('ENV_REPLACE_API') == 'prod':
log('production environment')
mysql_server_daduoduo = {
mysql_server_baiyin = {
"host": '127.0.0.1',
"user": 'root',
"password": os.environ.get('DB_PASSWORD_DADUODUO'),
"db": 'eb_supports_daduoduo',
"password": os.environ.get('DB_PASSWORD_BAIYIN'),
"db": 'eb_supports_baiyin',
"port": 3306,
"charset": 'utf8mb4'
}
else:
log('test environment')
mysql_server_daduoduo = {
mysql_server_baiyin = {
"host": '127.0.0.1',
"user": 'root',
"password": '123456',
"db": 'eb_supports_daduoduo',
"db": 'eb_supports_baiyin',
"port": 3306,
"charset": 'utf8mb4'
}
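The profile switch above is driven purely by environment variables; a minimal sketch of forcing the production profile, with placeholder values, set before settings is imported:

import os
os.environ['ENV_REPLACE_API'] = 'prod'            # anything else falls back to the test profile
os.environ['DB_PASSWORD_BAIYIN'] = '<password>'   # placeholder; supply the real secret via the environment
import settings                                   # now exposes mysql_server_baiyin for eb_supports_baiyin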


@@ -0,0 +1,12 @@
from dao.mysql_dao import StoreMysqlPool
from datetime import datetime
import settings
class Base(object):
def __init__(self):
self.eb_supports = StoreMysqlPool(**settings.mysql_server_baiyin)
def log(self, s):
print('%s%s' % (datetime.now(), s), flush=True)


@@ -0,0 +1,54 @@
from multiprocessing import Queue
from selenium import webdriver
from spider.base import Base
import time
class BrowserBaiyin(Base):
def __init__(self, **kwargs):
super(BrowserBaiyin, self).__init__()
self.queue_list = Queue()
self.status_type = "status"
executablePath = r"../file/chromedriver"
self.executablePath = kwargs.get("executablePath", executablePath)
options = webdriver.ChromeOptions()
# browser options
options.add_experimental_option("debuggerAddress", "127.0.0.1:9222")
options.add_argument("--headless")
self.options = {
"headless": True,
"handleSIGINT": True,
"handleSIGTERM": True,
"handleSIGHUP": True,
}
if self.executablePath is not None:
self.options["executablePath"] = self.executablePath
self.browser = webdriver.Chrome(executable_path=self.executablePath, chrome_options=options)
self.browser.get('https://buyin.jinritemai.com/dashboard/servicehall/daren-profile?'
'log_id=20230711105437315B137C5F87A4BE6336&uid=v2_0a2773177a04f52dbfb23f61609cd2843a31e9'
'22c1355db72de16fe32292f85d99c03dccc7e02117d81a4b0a3cb4d3fab4e8644e5431d4dab3370b9ab0092'
'5112a189a90e5d239f9b2a2e4b688f3db83682da9c158e6903c09d6b97e7f6cc1cea0defaf6c57672a72510'
'da95b60d18e5ade4c90120012201039e3ecc30')
def get_sign_url(self, keywords):
try:
pass
except Exception:
pass
def run(self):
while True:
self.log(f"等待 2 秒")
time.sleep(2)
self.get_sign_url('JY20220616004390')
def close(self):
self.browser.close()
self.browser.quit()
if __name__ == "__main__":
browser = BrowserBaiyin()
browser.run()
browser.close()
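BrowserBaiyin currently mixes two launch modes: attaching to an already-running Chrome via debuggerAddress and configuring a fresh headless instance; in practice only one is needed. A sketch of the attach variant, assuming Chrome was started separately with a debugging port (the port, paths, and Selenium 3-style arguments are assumptions):

# Start Chrome beforehand, e.g.: chrome --remote-debugging-port=9222 --user-data-dir=/tmp/buyin-profile
from selenium import webdriver

options = webdriver.ChromeOptions()
options.add_experimental_option("debuggerAddress", "127.0.0.1:9222")  # attach instead of launching
driver = webdriver.Chrome(executable_path="../file/chromedriver", options=options)
# On Selenium 4.10+, pass service=Service("../file/chromedriver") instead of executable_path.
driver.get("https://buyin.jinritemai.com/dashboard/servicehall/daren-profile")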


@@ -1,14 +0,0 @@
import mitmproxy.http
class AddHeader:
def __init__(self):
self.num = 0
def response(self, flow):
print(flow.request.host)
self.num = self.num + 1
flow.response.headers["count"] = str(self.num)
addons = [AddHeader()]


@@ -0,0 +1,57 @@
from spider.base import Base
from urllib.parse import parse_qsl, urlsplit
import json
class BuyinAuthorStatDataMitm(Base):
def __init__(self):
super(BuyinAuthorStatDataMitm, self).__init__()
self.达人广场搜索列表 = 'buyin_authorStatData_seekAuthor'
self.作者概述V2 = 'buyin_authorStatData_authorOverviewV2'
self.联系方式 = 'buyin_contact_info'
def response(self, flow):
# influencer-square search list (seekAuthor)
if "https://buyin.jinritemai.com/api/authorStatData/seekAuthor" in flow.request.url:
list_dicts = []
author_list = json.loads(flow.response.content).get('data').get('list')
event_track_log_id = json.loads(flow.response.content).get('data').get('event_track_log_id')
for author in author_list:
item = {
"task_id": 'project_test',
"data": json.dumps(author),
"deduplication": event_track_log_id + f"&uid={author.get('author_base').get('uid')[0:30]}",
}
list_dicts.append(item)
db_res = self.eb_supports.insert_many(self.达人广场搜索列表, list_dicts)
print(db_res)
# author overview V2 (authorOverviewV2)
if "https://buyin.jinritemai.com/api/authorStatData/authorOverviewV2" in flow.request.url:
uid = dict(parse_qsl(urlsplit(flow.request.url).query)).get('uid')
list_dicts = []
data = json.loads(flow.response.content).get('data')
item = {
"task_id": 'project_test',
"data": json.dumps(data),
"deduplication": f"uid={uid[0:30]}"
}
list_dicts.append(item)
db_res = self.eb_supports.insert_many(self.作者概述V2, list_dicts)
print(db_res)
# contact information (contact_info)
if "https://buyin.jinritemai.com/api/contact/contact_info" in flow.request.url:
uid = dict(parse_qsl(urlsplit(flow.request.url).query)).get('origin_uid')
list_dicts = []
data = json.loads(flow.response.content).get('data')
contact_value = data.get('contact_info').get('contact_value')
if contact_value:
item = {
"task_id": 'project_test',
"data": json.dumps(data),
"deduplication": f"uid={uid[0:30]}"
}
list_dicts.append(item)
db_res = self.eb_supports.insert_many(self.联系方式, list_dicts)
print(db_res)
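The uid extraction above leans on urlsplit/parse_qsl; a quick sketch with an illustrative URL (only the parameter name mirrors the code above, the value is a placeholder):

from urllib.parse import parse_qsl, urlsplit

url = "https://buyin.jinritemai.com/api/authorStatData/authorOverviewV2?uid=v2_EXAMPLE_UID&other=1"
params = dict(parse_qsl(urlsplit(url).query))
print(params.get('uid'))         # -> v2_EXAMPLE_UID
print(params.get('uid')[0:30])   # first 30 characters, used as the deduplication key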