# data_monitor.py
  1. import os,sys
  2. from apscheduler.schedulers.blocking import BlockingScheduler
  3. from BaseDataMaintenance.dataSource.source import getConnect_ots,getConnect_activateMQ
  4. from BaseDataMaintenance.dataSource.interface import *
  5. from BaseDataMaintenance.common.Utils import *
  6. from tablestore import *
  7. from BaseDataMaintenance.dataSource.setttings import *
  8. from queue import Queue
  9. from BaseDataMaintenance.common.multiThread import MultiThreadHandler
  10. from BaseDataMaintenance.java.MQInfo import getAllQueueSize,getQueueSize
  11. from BaseDataMaintenance.maintenance.dataflow_settings import *
  12. import pandas as pd
  13. flow_attachment_log_path = "/data/python/flow_attachment.log"
  14. flow_extract_log_path = "/data/python/flow_extract.log"
  15. flow_init_path = "/data/python/flow_init.log"
  16. flow_init_log_dir = "/data/python/flow_init_log"
  17. flow_init_check_dir = "/data/python/flow_init_check"
  18. class BaseDataMonitor():
  19. def __init__(self):
  20. self.ots_client = getConnect_ots()
  21. self.recieviers = ["1175730271@qq.com","531870502@qq.com"]
  22. self.list_proposed_count = []
  23. self.current_path = os.path.dirname(__file__)
  24. def cmd_execute(self,_cmd):
  25. return os.popen(_cmd).read()
  26. def get_last_tenmin_time(self,nums=15):
  27. current_time = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
  28. last_ten_minite_time = timeAdd(current_time,0,"%Y-%m-%d %H:%M:%S",-10)
  29. return last_ten_minite_time[:nums]
  30. def monitor_init(self):
  31. def _handle(_item,result_queue):
  32. bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
  33. rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
  34. SearchQuery(bool_query,get_total_count=True),
  35. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  36. _item["exists"] = total_count
  37. try:
  38. current_date = getCurrent_date("%Y-%m-%d")
  39. last_date = timeAdd(current_date,-1,"%Y-%m-%d")
  40. if not os.path.exists(flow_init_check_dir):
  41. os.mkdir(flow_init_check_dir)
  42. log_filename = os.path.join(flow_init_log_dir,"flow_init_%s.log"%(last_date))
  43. check_filename = os.path.join(flow_init_check_dir,"flow_init_%s.xlsx"%(last_date))
  44. list_uuid = []
  45. task_queue = Queue()
  46. dict_tolong = {}
  47. if not os.path.exists(check_filename) and os.path.exists(log_filename):
  48. _regrex = "delete\s+(?P<tablename>bxkc[^\s]+)\s+.*ID='(?P<uuid>.+)'"
  49. _regrex_tolong = "msg too long:(?P<uuid>[^,]+),\d+"
  50. with open(log_filename,"r",encoding="utf8") as f:
  51. while 1:
  52. _line = f.readline()
  53. if not _line:
  54. break
  55. _match = re.search(_regrex,_line)
  56. if _match is not None:
  57. _uuid = _match.groupdict().get("uuid")
  58. tablename = _match.groupdict().get("tablename")
  59. if _uuid is not None:
  60. list_uuid.append({"uuid":_uuid,"tablename":tablename})
  61. _match = re.search(_regrex_tolong,_line)
  62. if _match is not None:
  63. _uuid = _match.groupdict().get("uuid")
  64. dict_tolong[_uuid] = 1
  65. if list_uuid==0:
  66. _msg = "数据遗漏检查出错"
  67. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  68. sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  69. ots_client = getConnect_ots()
  70. for _d in list_uuid:
  71. task_queue.put(_d)
  72. mt = MultiThreadHandler(task_queue,_handle,None,30)
  73. mt.run()
  74. df_data = {"uuid":[],
  75. "tablename":[],
  76. "exists":[],
  77. "tolong":[]}
  78. for _data in list_uuid:
  79. for k,v in df_data.items():
  80. if k!="tolong":
  81. v.append(_data.get(k))
  82. df_data["tolong"].append(dict_tolong.get(_data["uuid"],0))
  83. df2 = pd.DataFrame(df_data)
  84. df2.to_excel(check_filename)
  85. counts = 0
  86. df_data = pd.read_excel(check_filename)
  87. for _exists,_tolong in zip(df_data["exists"],df_data["tolong"]):
  88. if _exists==0 and _tolong==0:
  89. counts += 1
  90. if counts>0:
  91. _msg = "数据遗漏检查报警,%s有%s条公告遗漏,详见%s"%(last_date,str(counts),check_filename)
  92. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  93. sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  94. except Exception as e:
  95. _msg = "数据遗漏检查报错"
  96. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  97. sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  98. traceback.print_exc()
  99. def monitor_attachment(self):
  100. try:
  101. # query = BoolQuery(must_queries=[
  102. # RangeQuery("status",0,11),
  103. # ])
  104. #
  105. # rows,next_token,total_count_todeal,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  106. # SearchQuery(query,None,True),
  107. # columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  108. total_count_todeal = getQueueSize("dataflow_attachment")
  109. if total_count_todeal>100:
  110. # query = BoolQuery(must_queries=[
  111. # RangeQuery("crtime",self.get_last_tenmin_time(16))
  112. # ])
  113. #
  114. # rows,next_token,total_count_todeal_t_a,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  115. # SearchQuery(query,None,True),
  116. # columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  117. #
  118. # query = BoolQuery(must_queries=[
  119. # RangeQuery("status",0,11),
  120. # RangeQuery("crtime",self.get_last_tenmin_time(16))
  121. # ])
  122. #
  123. # rows,next_token,total_count_todeal_t,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  124. # SearchQuery(query,None,True),
  125. # columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  126. _cmd = 'cat %s | grep -c "%s.*process filemd5"'%(flow_attachment_log_path,self.get_last_tenmin_time())
  127. log(_cmd)
  128. process_count = self.cmd_execute(_cmd)
  129. _cmd = 'cat %s | grep -c "%s.*process filemd5.*True"'%(flow_attachment_log_path,self.get_last_tenmin_time())
  130. log(_cmd)
  131. process_succeed_count = self.cmd_execute(_cmd)
  132. _cmd = 'cat %s | grep -c "%s.*delete sql"'%(flow_init_path,self.get_last_tenmin_time())
  133. log(_cmd)
  134. init_count = self.cmd_execute(_cmd)
  135. _msg = "附件提取队列报警:队列堆积%s条公告,最近十分钟处理公告附件数:%s,处理成功数:%s"%(str(total_count_todeal),str(process_count),str(process_succeed_count))
  136. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  137. sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  138. except Exception as e:
  139. traceback.print_exc()
  140. def monitor_extract(self):
  141. try:
  142. # query = BoolQuery(must_queries=[
  143. # RangeQuery("status",11,61),
  144. # ])
  145. #
  146. # rows,next_token,total_count_todeal,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  147. # SearchQuery(query,None,True),
  148. # columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  149. total_count_todeal = getQueueSize("dataflow_extract")
  150. if total_count_todeal>100:
  151. _cmd = 'cat %s | grep "%s" | grep -c "process.*docid"'%(flow_extract_log_path,self.get_last_tenmin_time())
  152. log(_cmd)
  153. process_count = self.cmd_execute(_cmd)
  154. _cmd = 'cat %s | grep "%s" | grep -c "process.*docid.*1$"'%(flow_extract_log_path,self.get_last_tenmin_time())
  155. log(_cmd)
  156. success_count = self.cmd_execute(_cmd)
  157. _cmd = 'cat %s | grep -c "%s.*delete sql"'%(flow_init_path,self.get_last_tenmin_time())
  158. log(_cmd)
  159. init_count = self.cmd_execute(_cmd)
  160. # query = BoolQuery(must_queries=[
  161. # RangeQuery("crtime",self.get_last_tenmin_time(16))
  162. # ])
  163. #
  164. # rows,next_token,total_count_todeal_t_a,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  165. # SearchQuery(query,None,True),
  166. # columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  167. #
  168. # query = BoolQuery(must_queries=[
  169. # RangeQuery("status",11,61),
  170. # RangeQuery("crtime",self.get_last_tenmin_time(16))
  171. # ])
  172. #
  173. # rows,next_token,total_count_todeal_t,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  174. # SearchQuery(query,None,True),
  175. # columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  176. _msg = "要素提取队列报警:队列堆积%s条公告,最近十分钟入库数:%s,最近十分钟处理公告数:%s,其中成功处理数:%s"%(str(total_count_todeal),str(init_count),str(process_count),str(success_count))
  177. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  178. sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  179. except Exception as e:
  180. traceback.print_exc()
  181. def monitor_proposedBuilding(self):
  182. current_date = getCurrent_date("%Y-%m-%d")
  183. last_date = timeAdd(current_date,-1,"%Y-%m-%d")
  184. query = BoolQuery(must_queries=[
  185. RangeQuery("page_time",last_date),
  186. WildcardQuery("docids","*")
  187. ])
  188. rows,next_token,total_count,is_all_succeed = self.ots_client.search("designed_project","designed_project_index",
  189. SearchQuery(query,None,True),
  190. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  191. if total_count<=100:
  192. _msg = "拟在建生成报警:最近两天生成的拟在建数量为:%d"%(total_count)
  193. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  194. sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  195. def monitor_sychr(self):
  196. current_date = getCurrent_date("%Y-%m-%d")
  197. last_date = timeAdd(current_date,-1,"%Y-%m-%d")
  198. query = BoolQuery(must_queries=[
  199. RangeQuery("status",*flow_sychro_status_from,True,True),
  200. ])
  201. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  202. SearchQuery(query,None,True),
  203. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  204. if total_count>=200:
  205. _msg = "数据流报警:待同步到成品表公告数为:%d"%(total_count)
  206. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  207. sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  208. def monitor_preproject(self):
  209. current_date = getCurrent_date("%Y-%m-%d")
  210. last_date = timeAdd(current_date,-7,"%Y-%m-%d")
  211. query = BoolQuery(must_queries=[
  212. RangeQuery("crtime",last_date),
  213. ])
  214. rows,next_token,total_count,is_all_succeed = self.ots_client.search("preproject","preproject_index",
  215. SearchQuery(query,None,True),
  216. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  217. log("preproject count:%d"%(total_count))
  218. if total_count<=10*10000:
  219. _msg = "周期项目生成报警:最近一周生成/更新的周期项目数量为:%d"%(total_count)
  220. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  221. sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  222. def monitor_dumplicate(self):
  223. current_date = getCurrent_date("%Y-%m-%d")
  224. last_date = timeAdd(current_date,-1,"%Y-%m-%d")
  225. query = BoolQuery(must_queries=[
  226. TermQuery("page_time",last_date),
  227. RangeQuery("status",71),
  228. ])
  229. rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  230. SearchQuery(query,None,True),
  231. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  232. query = BoolQuery(must_queries=[
  233. RangeQuery("status",66,71),
  234. ])
  235. rows,next_token,total_count_to_dump,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  236. SearchQuery(query,None,True),
  237. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  238. query = BoolQuery(must_queries=[
  239. TermQuery("page_time",last_date),
  240. RangeQuery("status",71),
  241. TermQuery("save",0)
  242. ])
  243. rows,next_token,total_count_lastday_dump,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  244. SearchQuery(query,None,True),
  245. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  246. if total_count_lastday_dump/total_count_lastday<0.2:
  247. _msg = "公告去重报警,%s入库公告数:%d,其中去重数:%d,去重率:%.2f"%(last_date,total_count_lastday,total_count_lastday_dump,total_count_lastday_dump/total_count_lastday)
  248. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  249. sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  250. if total_count_to_dump>2000:
  251. _msg = "公告去重报警,待去重数量:%s"%(str(total_count_to_dump))
  252. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  253. sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  254. #成品表监控
  255. query = BoolQuery(must_queries=[
  256. TermQuery("page_time",last_date),
  257. ])
  258. rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document","document_index",
  259. SearchQuery(query,None,True),
  260. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  261. query = BoolQuery(must_queries=[
  262. TermQuery("page_time",last_date),
  263. RangeQuery("status",401,451),
  264. ])
  265. rows,next_token,total_count_lastday_dump,is_all_succeed = self.ots_client.search("document","document_index",
  266. SearchQuery(query,None,True),
  267. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  268. if total_count_lastday_dump/total_count_lastday<0.2:
  269. _msg = "公告成品去重报警,%s入库公告数:%d,其中去重数:%d,去重率:%.2f"%(last_date,total_count_lastday,total_count_lastday_dump,total_count_lastday_dump/total_count_lastday)
  270. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  271. sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  272. query = BoolQuery(must_queries=[
  273. RangeQuery("status",*flow_dumplicate_status_from,True,True),
  274. ])
  275. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  276. SearchQuery(query,None,True),
  277. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  278. if total_count>=200:
  279. _msg = "数据流报警:待去重公告数为:%d"%(total_count)
  280. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  281. sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  282. def monitor_merge(self):
  283. current_date = getCurrent_date("%Y-%m-%d")
  284. last_date = timeAdd(current_date,-1,"%Y-%m-%d")
  285. check_count = 20000
  286. query = BoolQuery(must_queries=[
  287. RangeQuery("status",201,301)
  288. ])
  289. list_doc = []
  290. queue_docid = Queue()
  291. rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document","document_index",
  292. SearchQuery(query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),get_total_count=True),
  293. columns_to_get=ColumnsToGet(["docchannel"],return_type=ColumnReturnType.SPECIFIED))
  294. list_data = getRow_ots(rows)
  295. list_doc.extend(list_data)
  296. while next_token and len(list_doc)<check_count:
  297. rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document","document_index",
  298. SearchQuery(query,next_token=next_token,limit=100,get_total_count=True),
  299. columns_to_get=ColumnsToGet(["docchannel"],return_type=ColumnReturnType.SPECIFIED))
  300. list_data = getRow_ots(rows)
  301. list_doc.extend(list_data)
  302. for _doc in list_doc:
  303. queue_docid.put(_doc)
  304. def _handle(item,result_queue):
  305. docid = item.get("docid")
  306. _query = BoolQuery(must_queries=[TermQuery("docids",str(docid))])
  307. rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("project2","project2_index",
  308. SearchQuery(_query,None,limit=100,get_total_count=True),
  309. columns_to_get=ColumnsToGet(["zhao_biao_page_time","zhong_biao_page_time","docid_number"],return_type=ColumnReturnType.SPECIFIED))
  310. list_data = getRow_ots(rows)
  311. item["projects"] = list_data
  312. mt = MultiThreadHandler(queue_docid,_handle,None,30)
  313. mt.run()
  314. not_find_count = 0
  315. zhongbiao_find_zhaobiao = 0
  316. zhongbiao_count = 0
  317. multi_count = 0
  318. list_notfind = []
  319. list_multi = []
  320. for _doc in list_doc:
  321. print(_doc)
  322. if len(_doc.get("projects",[]))==0:
  323. not_find_count += 1
  324. list_notfind.append(str(_doc.get("docid")))
  325. if len(_doc.get("projects",[]))>=2:
  326. multi_count += 1
  327. list_multi.append(str(_doc.get("docid")))
  328. if _doc.get("docchannel") in (101,118,119,120):
  329. zhongbiao_count += 1
  330. if len(_doc.get("projects",[]))==1:
  331. _project = _doc.get("projects")[0]
  332. if _project.get("zhao_biao_page_time","")!="":
  333. zhongbiao_find_zhaobiao += 1
  334. if not_find_count>0 or multi_count>0 or zhongbiao_find_zhaobiao/zhongbiao_count<0.8:
  335. if not_find_count>0 or multi_count>0:
  336. current_time = getCurrent_date(format="%Y-%m-%d_%H%M%S")
  337. logname = os.path.join(self.current_path,"log","%s.log"%current_time)
  338. with open(logname,"w",encoding="utf8") as f:
  339. f.write(",".join(list_notfind))
  340. f.write("\n")
  341. f.write(",".join(list_multi))
  342. _msg = "公告合并报警:近%d条成品公告,有%d条未生成项目,有%d条公告找到多个项目,详见%s;其中有%d条中标公告且找到招标公告数为%d,比率为%.3f"%(check_count,not_find_count,multi_count,logname,zhongbiao_count,zhongbiao_find_zhaobiao,zhongbiao_find_zhaobiao/zhongbiao_count)
  343. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  344. sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  345. else:
  346. _msg = "公告合并报警:近%d条成品公告其中有%d条中标公告且找到招标公告数为%d,比率为%.3f"%(check_count,zhongbiao_count,zhongbiao_find_zhaobiao,zhongbiao_find_zhaobiao/zhongbiao_count)
  347. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  348. sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  349. query = BoolQuery(must_queries=[
  350. TermQuery("page_time",last_date),
  351. RangeQuery("status",71),
  352. ])
  353. rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  354. SearchQuery(query,None,True),
  355. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  356. if total_count_lastday<10*10000:
  357. _msg = "公告成品入库报警,%s入库公告数:%d"%(last_date,total_count_lastday)
  358. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  359. sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  360. def start_monitor(self):
  361. scheduler = BlockingScheduler()
  362. # scheduler.add_job(self.monitor_attachment,"cron",minute="*/10")
  363. scheduler.add_job(self.monitor_extract,"cron",minute="*/10")
  364. scheduler.add_job(self.monitor_proposedBuilding,"cron",hour="*/2")
  365. scheduler.add_job(self.monitor_dumplicate,"cron",minute="*/10")
  366. scheduler.add_job(self.monitor_sychr,"cron",minute="*/10")
  367. scheduler.add_job(self.monitor_preproject,"cron",hour="8")
  368. scheduler.add_job(self.monitor_merge,"cron",hour="*/1")
  369. scheduler.add_job(self.monitor_init,"cron",hour="*/3")
  370. scheduler.start()
  371. if __name__ == '__main__':
  372. dm = BaseDataMonitor()
  373. # dm.start_monitor()
  374. print(dm.get_last_tenmin_time(16))