data_monitor.py 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642
  1. import os,sys
  2. from apscheduler.schedulers.blocking import BlockingScheduler
  3. from BaseDataMaintenance.dataSource.source import getConnect_ots,getConnect_activateMQ
  4. from BaseDataMaintenance.dataSource.interface import *
  5. from BaseDataMaintenance.common.Utils import *
  6. from tablestore import *
  7. from BaseDataMaintenance.dataSource.setttings import *
  8. from queue import Queue
  9. from BaseDataMaintenance.common.multiThread import MultiThreadHandler
  10. from BaseDataMaintenance.maintenance.dataflow_settings import *
  11. import pandas as pd
# Filesystem locations of the data-flow logs and report directories read by the monitors.
flow_attachment_log_path = "/data/python/flow_attachment.log"  # attachment-processing log
flow_extract_log_path = "/data/python/flow_extract.log"        # element-extraction log
flow_init_path = "/data/python/flow_init.log"                  # current init-flow log
flow_init_log_dir = "/data/python/flow_init_log"               # per-day archived init-flow logs
flow_init_check_dir = "/data/python/flow_init_check"           # per-day init-check reports (xlsx)
flow_dumplicate_log_path = "/python_log/flow_dumplicate.log"   # dedup-flow log
class BaseDataMonitor():
    """Periodic data-flow monitor.

    Each monitor_* method checks one stage of the pipeline (queue backlogs,
    OTS table counts, log throughput) and alerts via DingTalk on anomalies.
    """

    def __init__(self):
        self.ots_client = getConnect_ots()  # shared OTS (Tablestore) connection
        # e-mail recipients; currently unused -- alerts go to DingTalk instead
        self.recieviers = ["1175730271@qq.com","531870502@qq.com"]
        self.list_proposed_count = []
        # directory of this module; used to locate the local "log" output dir
        self.current_path = os.path.dirname(__file__)
  24. def cmd_execute(self,_cmd):
  25. with os.popen(_cmd) as f:
  26. return f.read()
  27. def get_last_tenmin_time(self,nums=15):
  28. current_time = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
  29. last_ten_minite_time = timeAdd(current_time,0,"%Y-%m-%d %H:%M:%S",-10)
  30. return last_ten_minite_time[:nums]
  31. def check_document_uuid(self,log_filename):
  32. def _handle(_item,result_queue):
  33. bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
  34. rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
  35. SearchQuery(bool_query,get_total_count=True),
  36. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  37. _item["exists"] = total_count
  38. check_filename = "%s_check.xlsx"%(log_filename)
  39. list_uuid = []
  40. task_queue = Queue()
  41. dict_tolong = {}
  42. if not os.path.exists(check_filename) and os.path.exists(log_filename):
  43. _regrex = "delete\s+(?P<tablename>bxkc[^\s]+)\s+.*ID='(?P<uuid>.+)'"
  44. _regrex_tolong = "msg too long:(?P<uuid>[^,]+),\d+"
  45. with open(log_filename,"r",encoding="utf8") as f:
  46. while 1:
  47. _line = f.readline()
  48. if not _line:
  49. break
  50. _match = re.search(_regrex,_line)
  51. if _match is not None:
  52. _uuid = _match.groupdict().get("uuid")
  53. tablename = _match.groupdict().get("tablename")
  54. if _uuid is not None:
  55. list_uuid.append({"uuid":_uuid,"tablename":tablename})
  56. _match = re.search(_regrex_tolong,_line)
  57. if _match is not None:
  58. _uuid = _match.groupdict().get("uuid")
  59. dict_tolong[_uuid] = 1
  60. if list_uuid==0:
  61. _msg = "数据遗漏检查出错"
  62. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
  63. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  64. ots_client = getConnect_ots()
  65. for _d in list_uuid:
  66. task_queue.put(_d)
  67. mt = MultiThreadHandler(task_queue,_handle,None,30)
  68. mt.run()
  69. df_data = {"uuid":[],
  70. "tablename":[],
  71. "exists":[],
  72. "tolong":[]}
  73. for _data in list_uuid:
  74. for k,v in df_data.items():
  75. if k!="tolong":
  76. v.append(_data.get(k))
  77. df_data["tolong"].append(dict_tolong.get(_data["uuid"],0))
  78. df2 = pd.DataFrame(df_data)
  79. df2.to_excel(check_filename)
  80. def monitor_init(self):
  81. def _handle(_item,result_queue):
  82. bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
  83. rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
  84. SearchQuery(bool_query,get_total_count=True),
  85. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  86. _item["exists"] = total_count
  87. try:
  88. current_date = getCurrent_date("%Y-%m-%d")
  89. last_date = timeAdd(current_date,-1,"%Y-%m-%d")
  90. if not os.path.exists(flow_init_check_dir):
  91. os.mkdir(flow_init_check_dir)
  92. log_filename = os.path.join(flow_init_log_dir,"flow_init_%s.log"%(last_date))
  93. check_filename = os.path.join(flow_init_check_dir,"flow_init_%s.xlsx"%(last_date))
  94. list_uuid = []
  95. task_queue = Queue()
  96. dict_tolong = {}
  97. if not os.path.exists(check_filename) and os.path.exists(log_filename):
  98. _regrex = "delete\s+(?P<tablename>bxkc[^\s]+)\s+.*ID='(?P<uuid>.+)'"
  99. _regrex_tolong = "msg too long:(?P<uuid>[^,]+),\d+"
  100. with open(log_filename,"r",encoding="utf8") as f:
  101. while 1:
  102. _line = f.readline()
  103. if not _line:
  104. break
  105. _match = re.search(_regrex,_line)
  106. if _match is not None:
  107. _uuid = _match.groupdict().get("uuid")
  108. tablename = _match.groupdict().get("tablename")
  109. if _uuid is not None:
  110. list_uuid.append({"uuid":_uuid,"tablename":tablename})
  111. _match = re.search(_regrex_tolong,_line)
  112. if _match is not None:
  113. _uuid = _match.groupdict().get("uuid")
  114. dict_tolong[_uuid] = 1
  115. if list_uuid==0:
  116. _msg = "数据遗漏检查出错"
  117. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
  118. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  119. ots_client = getConnect_ots()
  120. for _d in list_uuid:
  121. task_queue.put(_d)
  122. mt = MultiThreadHandler(task_queue,_handle,None,30)
  123. mt.run()
  124. df_data = {"uuid":[],
  125. "tablename":[],
  126. "exists":[],
  127. "tolong":[]}
  128. for _data in list_uuid:
  129. for k,v in df_data.items():
  130. if k!="tolong":
  131. v.append(_data.get(k))
  132. df_data["tolong"].append(dict_tolong.get(_data["uuid"],0))
  133. df2 = pd.DataFrame(df_data)
  134. df2.to_excel(check_filename)
  135. counts = 0
  136. df_data = pd.read_excel(check_filename)
  137. for _exists,_tolong in zip(df_data["exists"],df_data["tolong"]):
  138. if _exists==0 and _tolong==0:
  139. counts += 1
  140. if counts>0:
  141. _msg = "数据遗漏检查报警,%s有%s条公告遗漏,详见%s"%(last_date,str(counts),check_filename)
  142. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
  143. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  144. except Exception as e:
  145. _msg = "数据遗漏检查报错"
  146. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
  147. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  148. traceback.print_exc()
  149. def monitor_attachment(self):
  150. from BaseDataMaintenance.java.MQInfo import getAllQueueSize,getQueueSize
  151. try:
  152. # query = BoolQuery(must_queries=[
  153. # RangeQuery("status",0,11),
  154. # ])
  155. #
  156. # rows,next_token,total_count_todeal,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  157. # SearchQuery(query,None,True),
  158. # columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  159. total_count_todeal = getQueueSize("dataflow_attachment")
  160. if total_count_todeal>100:
  161. # query = BoolQuery(must_queries=[
  162. # RangeQuery("crtime",self.get_last_tenmin_time(16))
  163. # ])
  164. #
  165. # rows,next_token,total_count_todeal_t_a,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  166. # SearchQuery(query,None,True),
  167. # columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  168. #
  169. # query = BoolQuery(must_queries=[
  170. # RangeQuery("status",0,11),
  171. # RangeQuery("crtime",self.get_last_tenmin_time(16))
  172. # ])
  173. #
  174. # rows,next_token,total_count_todeal_t,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  175. # SearchQuery(query,None,True),
  176. # columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  177. #通过命令行获取日志情况
  178. # _cmd = 'cat %s | grep -c "%s.*process filemd5"'%(flow_attachment_log_path,self.get_last_tenmin_time())
  179. # log(_cmd)
  180. # process_count = self.cmd_execute(_cmd)
  181. #
  182. # _cmd = 'cat %s | grep -c "%s.*process filemd5.*True"'%(flow_attachment_log_path,self.get_last_tenmin_time())
  183. # log(_cmd)
  184. # process_succeed_count = self.cmd_execute(_cmd)
  185. #
  186. # _cmd = 'cat %s | grep -c "%s.*delete sql"'%(flow_init_path,self.get_last_tenmin_time())
  187. # log(_cmd)
  188. # init_count = self.cmd_execute(_cmd)
  189. # _msg = "附件提取队列报警:队列堆积%s条公告,最近十分钟处理公告附件数:%s,处理成功数:%s"%(str(total_count_todeal),str(process_count),str(process_succeed_count))
  190. #通过读取文件获取日志情况
  191. dict_type = {}
  192. _pattern = "%s.*process filemd5\:[^\s]* (?P<result>(True|False)) of type\:(?P<type>[^\s]*).*recognize takes (?P<costtime>\d+)s"%(re.escape(self.get_last_tenmin_time()))
  193. with open(flow_attachment_log_path,"r",encoding="utf8") as f:
  194. while True:
  195. line = f.readline()
  196. if not line:
  197. break
  198. _match = re.search(_pattern,str(line))
  199. if _match is not None:
  200. _type = _match.groupdict().get("type")
  201. _result = _match.groupdict().get("result")
  202. _costtime = _match.groupdict().get("costtime")
  203. if _type not in dict_type:
  204. dict_type[_type] = {"success":0,"fail":0,"success_costtime":0,"fail_costtime":0}
  205. if _result=="True":
  206. dict_type[_type]["success"] += 1
  207. dict_type[_type]["success_costtime"] += int(_costtime)
  208. else:
  209. dict_type[_type]["fail"] += 1
  210. dict_type[_type]["fail_costtime"] += int(_costtime)
  211. process_count = 0
  212. process_succeed_count = 0
  213. _msg_type = ""
  214. for k,v in dict_type.items():
  215. process_count += v.get("success",0)+v.get("fail",0)
  216. process_succeed_count += v.get("success",0)
  217. _msg_type += "\n类型%s\n\t成功%s,消耗%s秒,%.2f秒/个,\n\t失败%s,消耗%s秒,%.2f秒/个"%(k,str(v.get("success")),str(v.get("success_costtime")),v.get("success_costtime")/max(1,v.get("success")),str(v.get("fail")),str(v.get("fail_costtime")),v.get("fail_costtime")/max(1,v.get("fail")))
  218. _msg = "附件提取队列报警:队列堆积%s条公告,最近十分钟处理公告附件数:%s,处理成功数:%s"%(str(total_count_todeal),str(process_count),str(process_succeed_count))
  219. sentMsgToDD(_msg+_msg_type,ACCESS_TOKEN_DATAWORKS)
  220. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  221. except Exception as e:
  222. traceback.print_exc()
  223. def monitor_extract(self):
  224. from BaseDataMaintenance.java.MQInfo import getAllQueueSize,getQueueSize
  225. try:
  226. # query = BoolQuery(must_queries=[
  227. # RangeQuery("status",11,61),
  228. # ])
  229. #
  230. # rows,next_token,total_count_todeal,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  231. # SearchQuery(query,None,True),
  232. # columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  233. total_count_init = getQueueSize("dataflow_init")
  234. if total_count_init>=100:
  235. _msg = "同步队列报警,有%s条数据滞留"%(str(total_count_init))
  236. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
  237. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  238. total_count_todeal = getQueueSize("dataflow_extract")
  239. if total_count_todeal>100:
  240. _cmd = 'cat %s | grep "%s" | grep -c "process.*docid"'%(flow_extract_log_path,self.get_last_tenmin_time())
  241. log(_cmd)
  242. process_count = self.cmd_execute(_cmd)
  243. _cmd = 'cat %s | grep "%s" | grep -c "process.*docid.*1$"'%(flow_extract_log_path,self.get_last_tenmin_time())
  244. log(_cmd)
  245. success_count = self.cmd_execute(_cmd)
  246. _cmd = 'cat %s | grep "%s" | grep -c "fingerprint.*exists docid"'%(flow_extract_log_path,self.get_last_tenmin_time())
  247. log(_cmd)
  248. exists_count = self.cmd_execute(_cmd)
  249. _cmd = 'cat %s | grep -c "%s.*delete sql"'%(flow_init_path,self.get_last_tenmin_time())
  250. log(_cmd)
  251. init_count = self.cmd_execute(_cmd)
  252. # query = BoolQuery(must_queries=[
  253. # RangeQuery("crtime",self.get_last_tenmin_time(16))
  254. # ])
  255. #
  256. # rows,next_token,total_count_todeal_t_a,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  257. # SearchQuery(query,None,True),
  258. # columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  259. #
  260. # query = BoolQuery(must_queries=[
  261. # RangeQuery("status",11,61),
  262. # RangeQuery("crtime",self.get_last_tenmin_time(16))
  263. # ])
  264. #
  265. # rows,next_token,total_count_todeal_t,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  266. # SearchQuery(query,None,True),
  267. # columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  268. _msg = "要素提取队列报警:队列堆积%s条公告,最近十分钟入库数:%s,最近十分钟处理公告数:%s,其中成功处理数:%s,查库免提取数:%s"%(str(total_count_todeal),str(init_count),str(process_count),str(success_count),str(exists_count))
  269. atAll=False
  270. if success_count==0:
  271. atAll=True
  272. _msg += "\n疑似流程出现问题,请相关负责人尽快处理"
  273. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=atAll)
  274. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  275. except Exception as e:
  276. traceback.print_exc()
  277. def monitor_proposedBuilding(self):
  278. current_date = getCurrent_date("%Y-%m-%d")
  279. current_hour = getCurrent_date("%H")
  280. query = BoolQuery(must_queries=[
  281. RangeQuery("update_time",current_date),
  282. WildcardQuery("docids","*")
  283. ])
  284. rows,next_token,total_count_doc,is_all_succeed = self.ots_client.search("designed_project","designed_project_index",
  285. SearchQuery(query,None,True),
  286. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  287. query = BoolQuery(must_queries=[
  288. RangeQuery("update_time",current_date),
  289. WildcardQuery("spids","*")
  290. ])
  291. rows,next_token,total_count_sp,is_all_succeed = self.ots_client.search("designed_project","designed_project_index",
  292. SearchQuery(query,None,True),
  293. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  294. total_count = total_count_doc+total_count_sp
  295. if not (current_hour in ("00","23","24")):
  296. _msg = "拟在建生成报警:当天生成的拟在建数量为:%d,其中公告生成:%d,审批项目生成:%d"%(total_count,total_count_doc,total_count_sp)
  297. atAll=False
  298. if total_count==0:
  299. atAll=True
  300. _msg += "\n疑似流程出现问题,请相关负责人尽快处理"
  301. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=atAll)
  302. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  303. def monitor_sychr(self):
  304. current_date = getCurrent_date("%Y-%m-%d")
  305. last_date = timeAdd(current_date,-1,"%Y-%m-%d")
  306. query = BoolQuery(must_queries=[
  307. RangeQuery("status",*flow_sychro_status_from,True,True),
  308. ])
  309. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  310. SearchQuery(query,None,True),
  311. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  312. if total_count>=200:
  313. _msg = "数据流报警:待同步到成品表公告数为:%d"%(total_count)
  314. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  315. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  316. def monitor_preproject(self):
  317. current_date = getCurrent_date("%Y-%m-%d")
  318. last_date = timeAdd(current_date,-7,"%Y-%m-%d")
  319. query = BoolQuery(must_queries=[
  320. RangeQuery("crtime",last_date),
  321. ])
  322. rows,next_token,total_count,is_all_succeed = self.ots_client.search("preproject","preproject_index",
  323. SearchQuery(query,None,True),
  324. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  325. log("preproject count:%d"%(total_count))
  326. if total_count<=10*10000:
  327. _msg = "周期项目生成报警:最近一周生成/更新的周期项目数量为:%d"%(total_count)
  328. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  329. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  330. def monitor_dumplicate(self):
  331. current_date = getCurrent_date("%Y-%m-%d")
  332. last_date = timeAdd(current_date,-1,"%Y-%m-%d")
  333. query = BoolQuery(must_queries=[
  334. TermQuery("page_time",last_date),
  335. RangeQuery("status",71),
  336. ])
  337. rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  338. SearchQuery(query,None,True),
  339. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  340. query = BoolQuery(must_queries=[
  341. RangeQuery("status",66,71),
  342. ])
  343. rows,next_token,total_count_to_dump,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  344. SearchQuery(query,None,True),
  345. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  346. query = BoolQuery(must_queries=[
  347. TermQuery("page_time",last_date),
  348. RangeQuery("status",71),
  349. TermQuery("save",0)
  350. ])
  351. rows,next_token,total_count_lastday_dump,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  352. SearchQuery(query,None,True),
  353. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  354. # if total_count_lastday_dump/total_count_lastday<0.2:
  355. # _msg = "公告去重报警,%s入库公告数:%d,其中去重数:%d,去重率:%.2f"%(last_date,total_count_lastday,total_count_lastday_dump,total_count_lastday_dump/total_count_lastday)
  356. # sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  357. # # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  358. # if total_count_to_dump>2000:
  359. # _msg = "公告去重报警,待去重数量:%s"%(str(total_count_to_dump))
  360. # sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  361. # # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  362. #成品表监控
  363. query = BoolQuery(must_queries=[
  364. TermQuery("page_time",last_date),
  365. ])
  366. rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document","document_index",
  367. SearchQuery(query,None,True),
  368. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  369. query = BoolQuery(must_queries=[
  370. TermQuery("page_time",last_date),
  371. RangeQuery("status",401,451),
  372. ])
  373. rows,next_token,total_count_lastday_dump,is_all_succeed = self.ots_client.search("document","document_index",
  374. SearchQuery(query,None,True),
  375. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  376. if total_count_lastday_dump/total_count_lastday<0.2:
  377. _msg = "公告成品去重报警,%s入库公告数:%d,其中去重数:%d,去重率:%.2f"%(last_date,total_count_lastday,total_count_lastday_dump,total_count_lastday_dump/total_count_lastday)
  378. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  379. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  380. query = BoolQuery(must_queries=[
  381. RangeQuery("status",*flow_dumplicate_status_from,True,True),
  382. ])
  383. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  384. SearchQuery(query,None,True),
  385. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  386. if total_count>=1000:
  387. _cmd = 'cat %s | grep -c "%s.*merge_project whole_time"'%(flow_dumplicate_log_path,self.get_last_tenmin_time())
  388. process_count = self.cmd_execute(_cmd)
  389. atAll = False
  390. if process_count=="":
  391. process_count = 0
  392. query = BoolQuery(must_queries=[
  393. RangeQuery("status",flow_dumplicate_status_from[1]),
  394. RangeQuery("opertime",self.get_last_tenmin_time())
  395. ])
  396. rows,next_token,total_count_oper,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  397. SearchQuery(query,None,True),
  398. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  399. if int(process_count)==0:
  400. if total_count_oper==0:
  401. atAll = True
  402. _cmd = "echo `tail %s -c 10000k` > %s"%(flow_dumplicate_log_path,flow_dumplicate_log_path)
  403. self.cmd_execute(_cmd)
  404. # if int(process_count)>0 and int(process_count)<100:
  405. # self.cmd_execute("ps -ef | grep dumplicate | grep -v grep|cut -c 9-15|xargs kill -9")
  406. _msg = "数据流报警:待去重公告数为:%d,最近十分钟日志去重数为:%s,ots去重数为:%s"%(total_count,str(process_count),str(total_count_oper))
  407. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=atAll)
  408. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  409. def monitor_merge(self):
  410. current_date = getCurrent_date("%Y-%m-%d")
  411. last_date = timeAdd(current_date,-1,"%Y-%m-%d")
  412. check_count = 20000
  413. query = BoolQuery(must_queries=[
  414. RangeQuery("status",201,301),
  415. RangeQuery("docchannel",0,300)
  416. ])
  417. list_doc = []
  418. queue_docid = Queue()
  419. rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document","document_index",
  420. SearchQuery(query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),get_total_count=True),
  421. columns_to_get=ColumnsToGet(["docchannel"],return_type=ColumnReturnType.SPECIFIED))
  422. list_data = getRow_ots(rows)
  423. list_doc.extend(list_data)
  424. while next_token and len(list_doc)<check_count:
  425. rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document","document_index",
  426. SearchQuery(query,next_token=next_token,limit=100,get_total_count=True),
  427. columns_to_get=ColumnsToGet(["docchannel"],return_type=ColumnReturnType.SPECIFIED))
  428. list_data = getRow_ots(rows)
  429. list_doc.extend(list_data)
  430. for _doc in list_doc:
  431. queue_docid.put(_doc)
  432. def _handle(item,result_queue):
  433. docid = item.get("docid")
  434. _query = BoolQuery(must_queries=[TermQuery("docids",str(docid))])
  435. rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("project2","project2_index",
  436. SearchQuery(_query,None,limit=100,get_total_count=True),
  437. columns_to_get=ColumnsToGet(["zhao_biao_page_time","zhong_biao_page_time","docid_number"],return_type=ColumnReturnType.SPECIFIED))
  438. list_data = getRow_ots(rows)
  439. item["projects"] = list_data
  440. mt = MultiThreadHandler(queue_docid,_handle,None,30)
  441. mt.run()
  442. not_find_count = 0
  443. zhongbiao_find_zhaobiao = 0
  444. zhongbiao_count = 0
  445. multi_count = 0
  446. list_notfind = []
  447. list_multi = []
  448. for _doc in list_doc:
  449. if len(_doc.get("projects",[]))==0:
  450. not_find_count += 1
  451. list_notfind.append(str(_doc.get("docid")))
  452. if len(_doc.get("projects",[]))>=2:
  453. multi_count += 1
  454. list_multi.append(str(_doc.get("docid")))
  455. if _doc.get("docchannel") in (101,118,119,120,121,122):
  456. zhongbiao_count += 1
  457. if len(_doc.get("projects",[]))==1:
  458. _project = _doc.get("projects")[0]
  459. if _project.get("zhao_biao_page_time","")!="":
  460. zhongbiao_find_zhaobiao += 1
  461. _ratio = zhongbiao_find_zhaobiao/zhongbiao_count if zhongbiao_count>0 else 0
  462. if not_find_count>0 or multi_count>0 or _ratio<0.8:
  463. if not_find_count>0 or multi_count>0:
  464. current_time = getCurrent_date(format="%Y-%m-%d_%H%M%S")
  465. logname = os.path.join(self.current_path,"log","%s.log"%current_time)
  466. with open(logname,"w",encoding="utf8") as f:
  467. f.write(",".join(list_notfind))
  468. f.write("\n")
  469. f.write(",".join(list_multi))
  470. _msg = "公告合并报警:近%d条成品公告,有%d条未生成项目,有%d条公告找到多个项目,详见%s;其中有%d条中标公告且找到招标公告数为%d,比率为%.3f"%(check_count,not_find_count,multi_count,logname,zhongbiao_count,zhongbiao_find_zhaobiao,_ratio)
  471. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  472. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  473. else:
  474. _msg = "公告合并报警:近%d条成品公告其中有%d条中标公告且找到招标公告数为%d,比率为%.3f"%(check_count,zhongbiao_count,zhongbiao_find_zhaobiao,zhongbiao_find_zhaobiao/zhongbiao_count)
  475. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  476. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  477. query = BoolQuery(must_queries=[
  478. TermQuery("page_time",last_date),
  479. RangeQuery("status",71),
  480. ])
  481. rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  482. SearchQuery(query,None,True),
  483. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  484. if total_count_lastday<10*10000:
  485. _msg = "公告成品入库报警,%s入库公告数:%d"%(last_date,total_count_lastday)
  486. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  487. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
    def start_monitor(self):
        """Start the blocking scheduler that runs the data-flow monitors periodically."""
        # data-flow monitoring
        scheduler = BlockingScheduler()
        # scheduler.add_job(self.monitor_attachment,"cron",minute="*/10")
        scheduler.add_job(self.monitor_extract,"cron",minute="*/10")        # every 10 min
        scheduler.add_job(self.monitor_proposedBuilding,"cron",hour="*/3")  # every 3 h
        # scheduler.add_job(self.monitor_dumplicate,"cron",minute="*/10")
        scheduler.add_job(self.monitor_sychr,"cron",minute="*/10")          # every 10 min
        scheduler.add_job(self.monitor_preproject,"cron",hour="8")          # daily at 08:00
        scheduler.add_job(self.monitor_merge,"cron",minute="*/30")          # every 30 min
        scheduler.add_job(self.monitor_init,"cron",hour="*/3")              # every 3 h
        scheduler.start()  # blocks the calling thread
    def start_attach_monitor(self):
        """Start the blocking scheduler that runs only the attachment monitor."""
        # attachment monitoring
        scheduler = BlockingScheduler()
        scheduler.add_job(self.monitor_attachment,"cron",minute="*/10")  # every 10 min
        scheduler.start()  # blocks the calling thread
if __name__ == '__main__':
    # Ad-hoc manual checks; currently only sends a test alert to DingTalk.
    # dm = BaseDataMonitor()
    # # dm.start_monitor()
    # log_filename = "C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-03.log"
    # dm.check_document_uuid(log_filename)
    sentMsgToDD("报警test_msg",ACCESS_TOKEN_DATAWORKS)
    # dm.monitor_proposedBuilding()
    # print(dm.get_last_tenmin_time(16))