# data_monitor_fjs.py

import os,sys
sys.path.append(os.path.dirname(__file__)+"/../../")
# re and traceback are used throughout; import them explicitly rather than
# relying on the wildcard imports below
import re
import traceback
from apscheduler.schedulers.blocking import BlockingScheduler
from BaseDataMaintenance.dataSource.source import getConnect_ots,getConnect_activateMQ
from BaseDataMaintenance.dataSource.interface import *
from BaseDataMaintenance.common.Utils import *
from tablestore import *
from BaseDataMaintenance.dataSource.setttings import *
from queue import Queue
from BaseDataMaintenance.common.multiThread import MultiThreadHandler
from BaseDataMaintenance.maintenance.dataflow_settings import *
import pandas as pd
flow_attachment_log_path = "/data/python/flow_attachment.log"
flow_extract_log_path = "/data/python/flow_extract.log"
flow_init_path = "/data/python/flow_init.log"
flow_init_log_dir = "/data/python"
flow_init_check_dir = "/data/python/flow_init_check"
flow_dumplicate_log_path = "/python_log/flow_dumplicate.log"
class BaseDataMonitor():

    def __init__(self):
        self.ots_client = getConnect_ots()
        self.receivers = ["1175730271@qq.com","531870502@qq.com"]
        self.list_proposed_count = []
        self.current_path = os.path.dirname(__file__)

    def cmd_execute(self,_cmd):
        # run a shell command and return its stdout as a string
        with os.popen(_cmd) as f:
            return f.read()

    def get_last_tenmin_time(self,nums=15):
        # timestamp of ten minutes ago, truncated to `nums` characters so it can
        # serve as a coarse prefix when grepping log lines
        current_time = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
        last_ten_minite_time = timeAdd(current_time,0,"%Y-%m-%d %H:%M:%S",-10)
        return last_ten_minite_time[:nums]
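    # Example (assuming timeAdd's last argument is an offset in minutes): at
    # 2024-05-21 12:34:56, get_last_tenmin_time() returns the 15-char prefix
    # "2024-05-21 12:2" and get_last_tenmin_time(16) returns "2024-05-21 12:24",
    # so the default prefix matches every log line of that ten-minute bucket.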

    def check_document_uuid(self,log_filename):

        def _handle(_item,result_queue):
            bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
            rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                SearchQuery(bool_query,get_total_count=True),
                columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
            _item["exists"] = total_count

        check_filename = "%s_check.xlsx"%(log_filename)
        list_uuid = []
        task_queue = Queue()
        dict_tolong = {}
        if not os.path.exists(check_filename) and os.path.exists(log_filename):
            _regrex = r"delete\s+(?P<tablename>bxkc[^\s]+)\s+.*ID='(?P<uuid>.+)'"
            _regrex_tolong = r"msg too long:(?P<uuid>[^,]+),\d+"
            with open(log_filename,"r",encoding="utf8") as f:
                while True:
                    _line = f.readline()
                    if not _line:
                        break
                    _match = re.search(_regrex,_line)
                    if _match is not None:
                        _uuid = _match.groupdict().get("uuid")
                        tablename = _match.groupdict().get("tablename")
                        if _uuid is not None:
                            list_uuid.append({"uuid":_uuid,"tablename":tablename})
                    _match = re.search(_regrex_tolong,_line)
                    if _match is not None:
                        _uuid = _match.groupdict().get("uuid")
                        dict_tolong[_uuid] = 1

            if len(list_uuid)==0:
                _msg = "data omission check failed: no uuid parsed from the log"
                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
                # sendEmail(smtp_host,smtp_username,smtp_password,self.receivers,_msg)

            # shared with _handle via closure; assigned before mt.run() fires the workers
            ots_client = getConnect_ots()
            for _d in list_uuid:
                task_queue.put(_d)
            mt = MultiThreadHandler(task_queue,_handle,None,30)
            mt.run()

            df_data = {"uuid":[],
                       "tablename":[],
                       "exists":[],
                       "tolong":[]}
            for _data in list_uuid:
                for k,v in df_data.items():
                    if k!="tolong":
                        v.append(_data.get(k))
                df_data["tolong"].append(dict_tolong.get(_data["uuid"],0))
            df2 = pd.DataFrame(df_data)
            df2.to_excel(check_filename)
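    # "msg too long" sketch (hypothetical line, shape taken from the regex):
    # "msg too long:abc-1,1048576" marks uuid "abc-1" in dict_tolong, so that
    # row is later excused from the miss count.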

    def monitor_init(self):

        def _handle(_item,result_queue):
            bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
            rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                SearchQuery(bool_query,get_total_count=True),
                columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
            _item["exists"] = total_count

        try:
            current_date = getCurrent_date("%Y-%m-%d")
            last_date = timeAdd(current_date,-1,"%Y-%m-%d")
            if not os.path.exists(flow_init_check_dir):
                os.mkdir(flow_init_check_dir)
            log_filename = os.path.join(flow_init_log_dir,"flow_init_%s.log"%(last_date))
            check_filename = os.path.join(flow_init_check_dir,"flow_init_%s.xlsx"%(last_date))

            list_uuid = []
            task_queue = Queue()
            dict_tolong = {}
            if not os.path.exists(check_filename) and os.path.exists(log_filename):
                _regrex = r"delete\s+(?P<tablename>bxkc[^\s]+)\s+.*ID='(?P<uuid>.+)'"
                _regrex_tolong = r"msg too long:(?P<uuid>[^,]+),\d+"
                with open(log_filename,"r",encoding="utf8") as f:
                    while True:
                        _line = f.readline()
                        if not _line:
                            break
                        _match = re.search(_regrex,_line)
                        if _match is not None:
                            _uuid = _match.groupdict().get("uuid")
                            tablename = _match.groupdict().get("tablename")
                            if _uuid is not None:
                                list_uuid.append({"uuid":_uuid,"tablename":tablename})
                        _match = re.search(_regrex_tolong,_line)
                        if _match is not None:
                            _uuid = _match.groupdict().get("uuid")
                            dict_tolong[_uuid] = 1

                if len(list_uuid)==0:
                    _msg = "data omission check failed: no uuid parsed from the log"
                    sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
                    # sendEmail(smtp_host,smtp_username,smtp_password,self.receivers,_msg)

                ots_client = getConnect_ots()
                for _d in list_uuid:
                    task_queue.put(_d)
                mt = MultiThreadHandler(task_queue,_handle,None,30)
                mt.run()

                df_data = {"uuid":[],
                           "tablename":[],
                           "exists":[],
                           "tolong":[]}
                for _data in list_uuid:
                    for k,v in df_data.items():
                        if k!="tolong":
                            v.append(_data.get(k))
                    df_data["tolong"].append(dict_tolong.get(_data["uuid"],0))
                df2 = pd.DataFrame(df_data)
                df2.to_excel(check_filename)

            counts = 0
            df_data = pd.read_excel(check_filename)
            for _exists,_tolong in zip(df_data["exists"],df_data["tolong"]):
                if _exists==0 and _tolong==0:
                    counts += 1
            if counts>0:
                _msg = "data omission alarm: %s has %s missing announcements, see %s"%(last_date,str(counts),check_filename)
                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
                # sendEmail(smtp_host,smtp_username,smtp_password,self.receivers,_msg)
        except Exception as e:
            _msg = "data omission check raised an exception"
            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
            # sendEmail(smtp_host,smtp_username,smtp_password,self.receivers,_msg)
            traceback.print_exc()
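    # Counting rule used above: a row counts as a miss only when its uuid was
    # deleted upstream (parsed from the log), is absent from document_tmp
    # (exists==0), and was not rejected for size (tolong==0).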

    def monitor_init_fjs(self):

        def _handle(_item,result_queue):
            bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
            rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                SearchQuery(bool_query,get_total_count=True),
                columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
            _item["exists"] = total_count

        try:
            current_date = getCurrent_date("%Y-%m-%d")
            last_date = timeAdd(current_date,-1,"%Y-%m-%d")
            if not os.path.exists(flow_init_check_dir):
                os.mkdir(flow_init_check_dir)
            # this variant checks a fixed log/snapshot pair rather than yesterday's files
            log_filename = os.path.join(flow_init_log_dir,"flow_init.log")
            check_filename = os.path.join(flow_init_check_dir,"flow_init_2024-05-21.xlsx")

            list_uuid = []
            task_queue = Queue()
            dict_tolong = {}
            if not os.path.exists(check_filename) and os.path.exists(log_filename):
                print('--------- 1 ----------')
                _regrex = r"delete\s+(?P<tablename>bxkc[^\s]+)\s+.*ID='(?P<uuid>.+)'"
                _regrex_tolong = r"msg too long:(?P<uuid>[^,]+),\d+"
                with open(log_filename,"r",encoding="utf8") as f:
                    while True:
                        _line = f.readline()
                        if not _line:
                            break
                        _match = re.search(_regrex,_line)
                        if _match is not None:
                            _uuid = _match.groupdict().get("uuid")
                            tablename = _match.groupdict().get("tablename")
                            if _uuid is not None:
                                list_uuid.append({"uuid":_uuid,"tablename":tablename})
                        _match = re.search(_regrex_tolong,_line)
                        if _match is not None:
                            _uuid = _match.groupdict().get("uuid")
                            dict_tolong[_uuid] = 1

                if len(list_uuid)==0:
                    _msg = "data omission check failed: no uuid parsed from the log"
                    sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
                    # sendEmail(smtp_host,smtp_username,smtp_password,self.receivers,_msg)

                ots_client = getConnect_ots()
                for _d in list_uuid:
                    task_queue.put(_d)
                mt = MultiThreadHandler(task_queue,_handle,None,30)
                mt.run()

                df_data = {"uuid":[],
                           "tablename":[],
                           "exists":[],
                           "tolong":[]}
                for _data in list_uuid:
                    for k,v in df_data.items():
                        if k!="tolong":
                            v.append(_data.get(k))
                    df_data["tolong"].append(dict_tolong.get(_data["uuid"],0))
                df2 = pd.DataFrame(df_data)
                df2.to_excel(check_filename)

            counts = 0
            df_data = pd.read_excel(check_filename)
            for _exists,_tolong in zip(df_data["exists"],df_data["tolong"]):
                if _exists==0 and _tolong==0:
                    counts += 1
            if counts>0:
                _msg = "data omission alarm: %s has %s missing announcements, see %s"%(last_date,str(counts),check_filename)
                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
                # sendEmail(smtp_host,smtp_username,smtp_password,self.receivers,_msg)
        except Exception as e:
            _msg = "data omission check raised an exception"
            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
            # sendEmail(smtp_host,smtp_username,smtp_password,self.receivers,_msg)
            traceback.print_exc()

    def monitor_attachment(self):
        from BaseDataMaintenance.java.MQInfo import getAllQueueSize,getQueueSize
        try:
            # query = BoolQuery(must_queries=[
            #     RangeQuery("status",0,11),
            # ])
            #
            # rows,next_token,total_count_todeal,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
            #     SearchQuery(query,None,True),
            #     columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
            total_count_todeal = getQueueSize("dataflow_attachment")
            if total_count_todeal>100:
                # query = BoolQuery(must_queries=[
                #     RangeQuery("crtime",self.get_last_tenmin_time(16))
                # ])
                #
                # rows,next_token,total_count_todeal_t_a,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                #     SearchQuery(query,None,True),
                #     columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
                #
                # query = BoolQuery(must_queries=[
                #     RangeQuery("status",0,11),
                #     RangeQuery("crtime",self.get_last_tenmin_time(16))
                # ])
                #
                # rows,next_token,total_count_todeal_t,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                #     SearchQuery(query,None,True),
                #     columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))

                # gather log stats via the command line
                # _cmd = 'cat %s | grep -c "%s.*process filemd5"'%(flow_attachment_log_path,self.get_last_tenmin_time())
                # log(_cmd)
                # process_count = self.cmd_execute(_cmd)
                #
                # _cmd = 'cat %s | grep -c "%s.*process filemd5.*True"'%(flow_attachment_log_path,self.get_last_tenmin_time())
                # log(_cmd)
                # process_succeed_count = self.cmd_execute(_cmd)
                #
                # _cmd = 'cat %s | grep -c "%s.*delete sql"'%(flow_init_path,self.get_last_tenmin_time())
                # log(_cmd)
                # init_count = self.cmd_execute(_cmd)
                # _msg = "attachment-extraction queue alarm: %s announcements backlogged, attachments processed in the last ten minutes: %s, succeeded: %s"%(str(total_count_todeal),str(process_count),str(process_succeed_count))

                # gather log stats by reading the log file
                dict_type = {}
                _pattern = r"%s.*process filemd5\:[^\s]* (?P<result>(True|False)) of type\:(?P<type>[^\s]*).*recognize takes (?P<costtime>\d+)s"%(re.escape(self.get_last_tenmin_time()))
                with open(flow_attachment_log_path,"r",encoding="utf8") as f:
                    while True:
                        line = f.readline()
                        if not line:
                            break
                        _match = re.search(_pattern,str(line))
                        if _match is not None:
                            _type = _match.groupdict().get("type")
                            _result = _match.groupdict().get("result")
                            _costtime = _match.groupdict().get("costtime")
                            if _type not in dict_type:
                                dict_type[_type] = {"success":0,"fail":0,"success_costtime":0,"fail_costtime":0}
                            if _result=="True":
                                dict_type[_type]["success"] += 1
                                dict_type[_type]["success_costtime"] += int(_costtime)
                            else:
                                dict_type[_type]["fail"] += 1
                                dict_type[_type]["fail_costtime"] += int(_costtime)

                process_count = 0
                process_succeed_count = 0
                _msg_type = ""
                for k,v in dict_type.items():
                    process_count += v.get("success",0)+v.get("fail",0)
                    process_succeed_count += v.get("success",0)
                    _msg_type += "\ntype %s\n\tsucceeded %s in %ss, %.2fs each,\n\tfailed %s in %ss, %.2fs each"%(k,str(v.get("success")),str(v.get("success_costtime")),v.get("success_costtime")/max(1,v.get("success")),str(v.get("fail")),str(v.get("fail_costtime")),v.get("fail_costtime")/max(1,v.get("fail")))

                _msg = "attachment-extraction queue alarm: %s announcements backlogged, attachments processed in the last ten minutes: %s, succeeded: %s"%(str(total_count_todeal),str(process_count),str(process_succeed_count))
                sentMsgToDD(_msg+_msg_type,ACCESS_TOKEN_DATAWORKS)
                # sendEmail(smtp_host,smtp_username,smtp_password,self.receivers,_msg)
        except Exception as e:
            traceback.print_exc()
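    # A hypothetical log line matching _pattern above (shape inferred from the
    # regex itself):
    #   "2024-05-21 12:2x ... process filemd5:abc123 True of type:pdf ... recognize takes 3s"
    # -> result="True", type="pdf", costtime="3", aggregated into dict_type["pdf"].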

    def monitor_extract(self):
        from BaseDataMaintenance.java.MQInfo import getAllQueueSize,getQueueSize
        try:
            # query = BoolQuery(must_queries=[
            #     RangeQuery("status",11,61),
            # ])
            #
            # rows,next_token,total_count_todeal,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
            #     SearchQuery(query,None,True),
            #     columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
            total_count_init = getQueueSize("dataflow_init")
            if total_count_init>=100:
                _msg = "sync queue alarm: %s records backlogged"%(str(total_count_init))
                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
                # sendEmail(smtp_host,smtp_username,smtp_password,self.receivers,_msg)

            total_count_todeal = getQueueSize("dataflow_extract")
            if total_count_todeal>100:
                _cmd = 'cat %s | grep "%s" | grep -c "process.*docid"'%(flow_extract_log_path,self.get_last_tenmin_time())
                log(_cmd)
                process_count = self.cmd_execute(_cmd).strip()

                _cmd = 'cat %s | grep "%s" | grep -c "process.*docid.*1$"'%(flow_extract_log_path,self.get_last_tenmin_time())
                log(_cmd)
                success_count = self.cmd_execute(_cmd).strip()

                _cmd = 'cat %s | grep "%s" | grep -c "fingerprint.*exists docid"'%(flow_extract_log_path,self.get_last_tenmin_time())
                log(_cmd)
                exists_count = self.cmd_execute(_cmd).strip()

                _cmd = 'cat %s | grep -c "%s.*delete sql"'%(flow_init_path,self.get_last_tenmin_time())
                log(_cmd)
                init_count = self.cmd_execute(_cmd).strip()
                # query = BoolQuery(must_queries=[
                #     RangeQuery("crtime",self.get_last_tenmin_time(16))
                # ])
                #
                # rows,next_token,total_count_todeal_t_a,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                #     SearchQuery(query,None,True),
                #     columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
                #
                # query = BoolQuery(must_queries=[
                #     RangeQuery("status",11,61),
                #     RangeQuery("crtime",self.get_last_tenmin_time(16))
                # ])
                #
                # rows,next_token,total_count_todeal_t,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                #     SearchQuery(query,None,True),
                #     columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
                _msg = "extraction queue alarm: %s announcements backlogged, stored in the last ten minutes: %s, processed: %s, of which succeeded: %s, skipped via fingerprint lookup: %s"%(str(total_count_todeal),str(init_count),str(process_count),str(success_count),str(exists_count))
                atAll = False
                # cmd_execute returns grep's textual count; treat "" as 0 before comparing
                if int(success_count or 0)==0:
                    atAll = True
                    _msg += "\nthe pipeline may be stuck, please investigate as soon as possible"
                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=atAll)
                # sendEmail(smtp_host,smtp_username,smtp_password,self.receivers,_msg)
        except Exception as e:
            traceback.print_exc()
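    # Note on the grep bookkeeping above: `grep -c` prints a single number, so
    # cmd_execute returns text such as "42\n"; stripping it and treating the
    # empty string as 0 is what makes the int() comparison safe.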

    def monitor_proposedBuilding(self):
        current_date = getCurrent_date("%Y-%m-%d")
        current_hour = getCurrent_date("%H")

        query = BoolQuery(must_queries=[
            RangeQuery("update_time",current_date),
            WildcardQuery("docids","*")
        ])
        rows,next_token,total_count_doc,is_all_succeed = self.ots_client.search("designed_project","designed_project_index",
            SearchQuery(query,None,True),
            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))

        query = BoolQuery(must_queries=[
            RangeQuery("update_time",current_date),
            WildcardQuery("spids","*")
        ])
        rows,next_token,total_count_sp,is_all_succeed = self.ots_client.search("designed_project","designed_project_index",
            SearchQuery(query,None,True),
            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))

        total_count = total_count_doc+total_count_sp
        # skip the hours around the day rollover
        if not (current_hour in ("00","23","24")):
            _msg = "proposed-construction alarm: %d generated today, %d from announcements, %d from approval projects"%(total_count,total_count_doc,total_count_sp)
            atAll = False
            if total_count==0:
                atAll = True
                _msg += "\nthe pipeline may be stuck, please investigate as soon as possible"
            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=atAll)
            # sendEmail(smtp_host,smtp_username,smtp_password,self.receivers,_msg)

    def monitor_sychr(self):
        current_date = getCurrent_date("%Y-%m-%d")
        last_date = timeAdd(current_date,-1,"%Y-%m-%d")

        query = BoolQuery(must_queries=[
            RangeQuery("status",*flow_sychro_status_from,True,True),
        ])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
            SearchQuery(query,None,True),
            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        if total_count>=200:
            _msg = "dataflow alarm: %d announcements waiting to be synced to the production table"%(total_count)
            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
            # sendEmail(smtp_host,smtp_username,smtp_password,self.receivers,_msg)

    def monitor_preproject(self):
        current_date = getCurrent_date("%Y-%m-%d")
        last_date = timeAdd(current_date,-7,"%Y-%m-%d")

        query = BoolQuery(must_queries=[
            RangeQuery("crtime",last_date),
        ])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("preproject","preproject_index",
            SearchQuery(query,None,True),
            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        log("preproject count:%d"%(total_count))
        if total_count<=10*10000:
            _msg = "periodic-project alarm: %d periodic projects created/updated in the last week"%(total_count)
            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
            # sendEmail(smtp_host,smtp_username,smtp_password,self.receivers,_msg)

    def monitor_dumplicate(self):
        current_date = getCurrent_date("%Y-%m-%d")
        last_date = timeAdd(current_date,-1,"%Y-%m-%d")

        query = BoolQuery(must_queries=[
            TermQuery("page_time",last_date),
            RangeQuery("status",71),
        ])
        rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
            SearchQuery(query,None,True),
            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))

        query = BoolQuery(must_queries=[
            RangeQuery("status",66,71),
        ])
        rows,next_token,total_count_to_dump,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
            SearchQuery(query,None,True),
            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))

        query = BoolQuery(must_queries=[
            TermQuery("page_time",last_date),
            RangeQuery("status",71),
            TermQuery("save",0)
        ])
        rows,next_token,total_count_lastday_dump,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
            SearchQuery(query,None,True),
            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))

        # if total_count_lastday_dump/total_count_lastday<0.2:
        #     _msg = "dedup alarm: %s stored %d announcements, %d deduplicated, dedup rate %.2f"%(last_date,total_count_lastday,total_count_lastday_dump,total_count_lastday_dump/total_count_lastday)
        #     sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
        #     # sendEmail(smtp_host,smtp_username,smtp_password,self.receivers,_msg)
        # if total_count_to_dump>2000:
        #     _msg = "dedup alarm: %s announcements waiting for dedup"%(str(total_count_to_dump))
        #     sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
        #     # sendEmail(smtp_host,smtp_username,smtp_password,self.receivers,_msg)

        # production table monitoring
        query = BoolQuery(must_queries=[
            TermQuery("page_time",last_date),
        ])
        rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document","document_index",
            SearchQuery(query,None,True),
            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))

        query = BoolQuery(must_queries=[
            TermQuery("page_time",last_date),
            RangeQuery("status",401,451),
        ])
        rows,next_token,total_count_lastday_dump,is_all_succeed = self.ots_client.search("document","document_index",
            SearchQuery(query,None,True),
            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))

        # guard against a zero denominator when nothing was stored yesterday
        if total_count_lastday>0 and total_count_lastday_dump/total_count_lastday<0.2:
            _msg = "production dedup alarm: %s stored %d announcements, %d deduplicated, dedup rate %.2f"%(last_date,total_count_lastday,total_count_lastday_dump,total_count_lastday_dump/total_count_lastday)
            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
            # sendEmail(smtp_host,smtp_username,smtp_password,self.receivers,_msg)

        query = BoolQuery(must_queries=[
            RangeQuery("status",*flow_dumplicate_status_from,True,True),
        ])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
            SearchQuery(query,None,True),
            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        if total_count>=1000:
            _cmd = 'cat %s | grep -c "%s.*upgrate True save"'%(flow_dumplicate_log_path,self.get_last_tenmin_time())
            process_count = self.cmd_execute(_cmd).strip()
            atAll = False
            if process_count=="":
                process_count = "0"
            if int(process_count)==0:
                atAll = True
            _msg = "dataflow alarm: %d announcements waiting for dedup, %s deduplicated in the last ten minutes"%(total_count,str(process_count))
            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=atAll)
            # sendEmail(smtp_host,smtp_username,smtp_password,self.receivers,_msg)

    def monitor_merge(self):
        current_date = getCurrent_date("%Y-%m-%d")
        last_date = timeAdd(current_date,-1,"%Y-%m-%d")

        check_count = 20000
        query = BoolQuery(must_queries=[
            RangeQuery("status",201,301)
        ])
        list_doc = []
        queue_docid = Queue()
        rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document","document_index",
            SearchQuery(query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),get_total_count=True),
            columns_to_get=ColumnsToGet(["docchannel"],return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        list_doc.extend(list_data)
        while next_token and len(list_doc)<check_count:
            rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document","document_index",
                SearchQuery(query,next_token=next_token,limit=100,get_total_count=True),
                columns_to_get=ColumnsToGet(["docchannel"],return_type=ColumnReturnType.SPECIFIED))
            list_data = getRow_ots(rows)
            list_doc.extend(list_data)
        for _doc in list_doc:
            queue_docid.put(_doc)

        def _handle(item,result_queue):
            docid = item.get("docid")
            _query = BoolQuery(must_queries=[TermQuery("docids",str(docid))])
            rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("project2","project2_index",
                SearchQuery(_query,None,limit=100,get_total_count=True),
                columns_to_get=ColumnsToGet(["zhao_biao_page_time","zhong_biao_page_time","docid_number"],return_type=ColumnReturnType.SPECIFIED))
            list_data = getRow_ots(rows)
            item["projects"] = list_data

        mt = MultiThreadHandler(queue_docid,_handle,None,30)
        mt.run()

        not_find_count = 0
        zhongbiao_find_zhaobiao = 0
        zhongbiao_count = 0
        multi_count = 0
        list_notfind = []
        list_multi = []
        for _doc in list_doc:
            if len(_doc.get("projects",[]))==0:
                not_find_count += 1
                list_notfind.append(str(_doc.get("docid")))
            if len(_doc.get("projects",[]))>=2:
                multi_count += 1
                list_multi.append(str(_doc.get("docid")))
            if _doc.get("docchannel") in (101,118,119,120):
                zhongbiao_count += 1
                if len(_doc.get("projects",[]))==1:
                    _project = _doc.get("projects")[0]
                    if _project.get("zhao_biao_page_time","")!="":
                        zhongbiao_find_zhaobiao += 1

        # guard against a zero denominator when no win-bid announcements were sampled
        zhongbiao_ratio = zhongbiao_find_zhaobiao/max(1,zhongbiao_count)
        if not_find_count>0 or multi_count>0 or zhongbiao_ratio<0.8:
            if not_find_count>0 or multi_count>0:
                current_time = getCurrent_date(format="%Y-%m-%d_%H%M%S")
                logname = os.path.join(self.current_path,"log","%s.log"%current_time)
                # make sure the log directory exists before writing the docid lists
                os.makedirs(os.path.dirname(logname),exist_ok=True)
                with open(logname,"w",encoding="utf8") as f:
                    f.write(",".join(list_notfind))
                    f.write("\n")
                    f.write(",".join(list_multi))
                _msg = "merge alarm: of the latest %d production announcements, %d produced no project and %d matched multiple projects (see %s); %d are win-bid announcements of which %d matched a tender announcement, ratio %.3f"%(check_count,not_find_count,multi_count,logname,zhongbiao_count,zhongbiao_find_zhaobiao,zhongbiao_ratio)
                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
                # sendEmail(smtp_host,smtp_username,smtp_password,self.receivers,_msg)
            else:
                _msg = "merge alarm: of the latest %d production announcements, %d are win-bid announcements of which %d matched a tender announcement, ratio %.3f"%(check_count,zhongbiao_count,zhongbiao_find_zhaobiao,zhongbiao_ratio)
                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
                # sendEmail(smtp_host,smtp_username,smtp_password,self.receivers,_msg)

        query = BoolQuery(must_queries=[
            TermQuery("page_time",last_date),
            RangeQuery("status",71),
        ])
        rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
            SearchQuery(query,None,True),
            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        if total_count_lastday<10*10000:
            _msg = "production ingestion alarm: %s stored %d announcements"%(last_date,total_count_lastday)
            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
            # sendEmail(smtp_host,smtp_username,smtp_password,self.receivers,_msg)
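    # Worked example of the merge ratio (hypothetical numbers): with
    # check_count=20000 and zhongbiao_count=6000 win-bid announcements, of which
    # zhongbiao_find_zhaobiao=5200 matched a project carrying a
    # zhao_biao_page_time, the ratio is 5200/6000=0.867>=0.8, so only the
    # not-found/multi-match branches can raise the alarm.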

    def start_monitor(self):
        # data monitoring
        scheduler = BlockingScheduler()
        # scheduler.add_job(self.monitor_attachment,"cron",minute="*/10")
        scheduler.add_job(self.monitor_extract,"cron",minute="*/10")
        scheduler.add_job(self.monitor_proposedBuilding,"cron",hour="*/3")
        # scheduler.add_job(self.monitor_dumplicate,"cron",minute="*/10")
        scheduler.add_job(self.monitor_sychr,"cron",minute="*/10")
        scheduler.add_job(self.monitor_preproject,"cron",hour="8")
        scheduler.add_job(self.monitor_merge,"cron",hour="*/1")
        scheduler.add_job(self.monitor_init,"cron",hour="*/3")
        scheduler.start()

    def start_attach_monitor(self):
        # attachment monitoring
        scheduler = BlockingScheduler()
        scheduler.add_job(self.monitor_attachment,"cron",minute="*/10")
        scheduler.start()
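
# A runnable sketch, using a hypothetical log line, of the init-log parsing
# performed by check_document_uuid/monitor_init; it only exercises the regex.
def _demo_init_log_regex():
    _regrex = r"delete\s+(?P<tablename>bxkc[^\s]+)\s+.*ID='(?P<uuid>.+)'"
    _m = re.search(_regrex,"2024-05-21 00:00:01 delete bxkc_data where ID='abc-1'")
    assert _m is not None
    assert _m.groupdict()=={"tablename":"bxkc_data","uuid":"abc-1"}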

if __name__ == '__main__':
    dm = BaseDataMonitor()
    dm.monitor_init_fjs()
    # dm.start_monitor()
    # log_filename = "C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-03.log"
    # dm.check_document_uuid(log_filename)
    # sentMsgToDD("alarm test_msg",ACCESS_TOKEN_DATAWORKS)
    # dm.monitor_proposedBuilding()
    # print(dm.get_last_tenmin_time(16))