# data_monitor.py
  1. import os,sys
  2. from apscheduler.schedulers.blocking import BlockingScheduler
  3. from BaseDataMaintenance.dataSource.source import getConnect_ots,getConnect_activateMQ
  4. from BaseDataMaintenance.dataSource.interface import *
  5. from BaseDataMaintenance.common.Utils import *
  6. from tablestore import *
  7. from BaseDataMaintenance.dataSource.setttings import *
  8. from queue import Queue
  9. from BaseDataMaintenance.common.multiThread import MultiThreadHandler
  10. from BaseDataMaintenance.java.MQInfo import getAllQueueSize,getQueueSize
  11. from BaseDataMaintenance.maintenance.dataflow_settings import *
  12. import pandas as pd
  13. flow_attachment_log_path = "/data/python/flow_attachment.log"
  14. flow_extract_log_path = "/data/python/flow_extract.log"
  15. flow_init_path = "/data/python/flow_init.log"
  16. flow_init_log_dir = "/data/python/flow_init_log"
  17. flow_init_check_dir = "/data/python/flow_init_check"
  18. class BaseDataMonitor():
  19. def __init__(self):
  20. self.ots_client = getConnect_ots()
  21. self.recieviers = ["1175730271@qq.com","531870502@qq.com"]
  22. self.list_proposed_count = []
  23. self.current_path = os.path.dirname(__file__)
  24. def cmd_execute(self,_cmd):
  25. return os.popen(_cmd).read()
  26. def get_last_tenmin_time(self,nums=15):
  27. current_time = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
  28. last_ten_minite_time = timeAdd(current_time,0,"%Y-%m-%d %H:%M:%S",-10)
  29. return last_ten_minite_time[:nums]
  30. def check_document_uuid(self,log_filename):
  31. def _handle(_item,result_queue):
  32. bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
  33. rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
  34. SearchQuery(bool_query,get_total_count=True),
  35. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  36. _item["exists"] = total_count
  37. check_filename = "%s_check.xlsx"%(log_filename)
  38. list_uuid = []
  39. task_queue = Queue()
  40. dict_tolong = {}
  41. if not os.path.exists(check_filename) and os.path.exists(log_filename):
  42. _regrex = "delete\s+(?P<tablename>bxkc[^\s]+)\s+.*ID='(?P<uuid>.+)'"
  43. _regrex_tolong = "msg too long:(?P<uuid>[^,]+),\d+"
  44. with open(log_filename,"r",encoding="utf8") as f:
  45. while 1:
  46. _line = f.readline()
  47. if not _line:
  48. break
  49. _match = re.search(_regrex,_line)
  50. if _match is not None:
  51. _uuid = _match.groupdict().get("uuid")
  52. tablename = _match.groupdict().get("tablename")
  53. if _uuid is not None:
  54. list_uuid.append({"uuid":_uuid,"tablename":tablename})
  55. _match = re.search(_regrex_tolong,_line)
  56. if _match is not None:
  57. _uuid = _match.groupdict().get("uuid")
  58. dict_tolong[_uuid] = 1
  59. if list_uuid==0:
  60. _msg = "数据遗漏检查出错"
  61. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
  62. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  63. ots_client = getConnect_ots()
  64. for _d in list_uuid:
  65. task_queue.put(_d)
  66. mt = MultiThreadHandler(task_queue,_handle,None,30)
  67. mt.run()
  68. df_data = {"uuid":[],
  69. "tablename":[],
  70. "exists":[],
  71. "tolong":[]}
  72. for _data in list_uuid:
  73. for k,v in df_data.items():
  74. if k!="tolong":
  75. v.append(_data.get(k))
  76. df_data["tolong"].append(dict_tolong.get(_data["uuid"],0))
  77. df2 = pd.DataFrame(df_data)
  78. df2.to_excel(check_filename)
  79. def monitor_init(self):
  80. def _handle(_item,result_queue):
  81. bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
  82. rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
  83. SearchQuery(bool_query,get_total_count=True),
  84. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  85. _item["exists"] = total_count
  86. try:
  87. current_date = getCurrent_date("%Y-%m-%d")
  88. last_date = timeAdd(current_date,-1,"%Y-%m-%d")
  89. if not os.path.exists(flow_init_check_dir):
  90. os.mkdir(flow_init_check_dir)
  91. log_filename = os.path.join(flow_init_log_dir,"flow_init_%s.log"%(last_date))
  92. check_filename = os.path.join(flow_init_check_dir,"flow_init_%s.xlsx"%(last_date))
  93. list_uuid = []
  94. task_queue = Queue()
  95. dict_tolong = {}
  96. if not os.path.exists(check_filename) and os.path.exists(log_filename):
  97. _regrex = "delete\s+(?P<tablename>bxkc[^\s]+)\s+.*ID='(?P<uuid>.+)'"
  98. _regrex_tolong = "msg too long:(?P<uuid>[^,]+),\d+"
  99. with open(log_filename,"r",encoding="utf8") as f:
  100. while 1:
  101. _line = f.readline()
  102. if not _line:
  103. break
  104. _match = re.search(_regrex,_line)
  105. if _match is not None:
  106. _uuid = _match.groupdict().get("uuid")
  107. tablename = _match.groupdict().get("tablename")
  108. if _uuid is not None:
  109. list_uuid.append({"uuid":_uuid,"tablename":tablename})
  110. _match = re.search(_regrex_tolong,_line)
  111. if _match is not None:
  112. _uuid = _match.groupdict().get("uuid")
  113. dict_tolong[_uuid] = 1
  114. if list_uuid==0:
  115. _msg = "数据遗漏检查出错"
  116. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
  117. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  118. ots_client = getConnect_ots()
  119. for _d in list_uuid:
  120. task_queue.put(_d)
  121. mt = MultiThreadHandler(task_queue,_handle,None,30)
  122. mt.run()
  123. df_data = {"uuid":[],
  124. "tablename":[],
  125. "exists":[],
  126. "tolong":[]}
  127. for _data in list_uuid:
  128. for k,v in df_data.items():
  129. if k!="tolong":
  130. v.append(_data.get(k))
  131. df_data["tolong"].append(dict_tolong.get(_data["uuid"],0))
  132. df2 = pd.DataFrame(df_data)
  133. df2.to_excel(check_filename)
  134. counts = 0
  135. df_data = pd.read_excel(check_filename)
  136. for _exists,_tolong in zip(df_data["exists"],df_data["tolong"]):
  137. if _exists==0 and _tolong==0:
  138. counts += 1
  139. if counts>0:
  140. _msg = "数据遗漏检查报警,%s有%s条公告遗漏,详见%s"%(last_date,str(counts),check_filename)
  141. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
  142. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  143. except Exception as e:
  144. _msg = "数据遗漏检查报错"
  145. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
  146. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  147. traceback.print_exc()
  148. def monitor_attachment(self):
  149. try:
  150. # query = BoolQuery(must_queries=[
  151. # RangeQuery("status",0,11),
  152. # ])
  153. #
  154. # rows,next_token,total_count_todeal,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  155. # SearchQuery(query,None,True),
  156. # columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  157. total_count_todeal = getQueueSize("dataflow_attachment")
  158. if total_count_todeal>100:
  159. # query = BoolQuery(must_queries=[
  160. # RangeQuery("crtime",self.get_last_tenmin_time(16))
  161. # ])
  162. #
  163. # rows,next_token,total_count_todeal_t_a,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  164. # SearchQuery(query,None,True),
  165. # columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  166. #
  167. # query = BoolQuery(must_queries=[
  168. # RangeQuery("status",0,11),
  169. # RangeQuery("crtime",self.get_last_tenmin_time(16))
  170. # ])
  171. #
  172. # rows,next_token,total_count_todeal_t,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  173. # SearchQuery(query,None,True),
  174. # columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  175. #通过命令行获取日志情况
  176. # _cmd = 'cat %s | grep -c "%s.*process filemd5"'%(flow_attachment_log_path,self.get_last_tenmin_time())
  177. # log(_cmd)
  178. # process_count = self.cmd_execute(_cmd)
  179. #
  180. # _cmd = 'cat %s | grep -c "%s.*process filemd5.*True"'%(flow_attachment_log_path,self.get_last_tenmin_time())
  181. # log(_cmd)
  182. # process_succeed_count = self.cmd_execute(_cmd)
  183. #
  184. # _cmd = 'cat %s | grep -c "%s.*delete sql"'%(flow_init_path,self.get_last_tenmin_time())
  185. # log(_cmd)
  186. # init_count = self.cmd_execute(_cmd)
  187. # _msg = "附件提取队列报警:队列堆积%s条公告,最近十分钟处理公告附件数:%s,处理成功数:%s"%(str(total_count_todeal),str(process_count),str(process_succeed_count))
  188. #通过读取文件获取日志情况
  189. dict_type = {}
  190. _pattern = "%s.*process filemd5\:[^\s]* (?P<result>(True|False)) of type\:(?P<type>[^\s]*).*recognize takes (?P<costtime>\d+)s"%(re.escape(self.get_last_tenmin_time()))
  191. with open(flow_attachment_log_path,"r",encoding="utf8") as f:
  192. while True:
  193. line = f.readline()
  194. if not line:
  195. break
  196. _match = re.search(_pattern,str(line))
  197. if _match is not None:
  198. _type = _match.groupdict().get("type")
  199. _result = _match.groupdict().get("result")
  200. _costtime = _match.groupdict().get("costtime")
  201. if _type not in dict_type:
  202. dict_type[_type] = {"success":0,"fail":0,"success_costtime":0,"fail_costtime":0}
  203. if _result=="True":
  204. dict_type[_type]["success"] += 1
  205. dict_type[_type]["success_costtime"] += int(_costtime)
  206. else:
  207. dict_type[_type]["fail"] += 1
  208. dict_type[_type]["fail_costtime"] += int(_costtime)
  209. process_count = 0
  210. process_succeed_count = 0
  211. _msg_type = ""
  212. for k,v in dict_type.items():
  213. process_count += v.get("success",0)+v.get("fail",0)
  214. process_succeed_count += v.get("success",0)
  215. _msg_type += "\n类型%s\n\t成功%s,消耗%s秒,%.2f秒/个,\n\t失败%s,消耗%s秒,%.2f秒/个"%(k,str(v.get("success")),str(v.get("success_costtime")),v.get("success_costtime")/max(1,v.get("success")),str(v.get("fail")),str(v.get("fail_costtime")),v.get("fail_costtime")/max(1,v.get("fail")))
  216. _msg = "附件提取队列报警:队列堆积%s条公告,最近十分钟处理公告附件数:%s,处理成功数:%s"%(str(total_count_todeal),str(process_count),str(process_succeed_count))
  217. sentMsgToDD(_msg+_msg_type,ACCESS_TOKEN_DATAWORKS)
  218. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  219. except Exception as e:
  220. traceback.print_exc()
  221. def monitor_extract(self):
  222. try:
  223. # query = BoolQuery(must_queries=[
  224. # RangeQuery("status",11,61),
  225. # ])
  226. #
  227. # rows,next_token,total_count_todeal,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  228. # SearchQuery(query,None,True),
  229. # columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  230. total_count_init = getQueueSize("dataflow_init")
  231. if total_count_init>=100:
  232. _msg = "同步队列报警,有%s条数据滞留"%(str(total_count_init))
  233. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
  234. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  235. total_count_todeal = getQueueSize("dataflow_extract")
  236. if total_count_todeal>100:
  237. _cmd = 'cat %s | grep "%s" | grep -c "process.*docid"'%(flow_extract_log_path,self.get_last_tenmin_time())
  238. log(_cmd)
  239. process_count = self.cmd_execute(_cmd)
  240. _cmd = 'cat %s | grep "%s" | grep -c "process.*docid.*1$"'%(flow_extract_log_path,self.get_last_tenmin_time())
  241. log(_cmd)
  242. success_count = self.cmd_execute(_cmd)
  243. _cmd = 'cat %s | grep "%s" | grep -c "fingerprint.*exists docid"'%(flow_extract_log_path,self.get_last_tenmin_time())
  244. log(_cmd)
  245. exists_count = self.cmd_execute(_cmd)
  246. _cmd = 'cat %s | grep -c "%s.*delete sql"'%(flow_init_path,self.get_last_tenmin_time())
  247. log(_cmd)
  248. init_count = self.cmd_execute(_cmd)
  249. # query = BoolQuery(must_queries=[
  250. # RangeQuery("crtime",self.get_last_tenmin_time(16))
  251. # ])
  252. #
  253. # rows,next_token,total_count_todeal_t_a,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  254. # SearchQuery(query,None,True),
  255. # columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  256. #
  257. # query = BoolQuery(must_queries=[
  258. # RangeQuery("status",11,61),
  259. # RangeQuery("crtime",self.get_last_tenmin_time(16))
  260. # ])
  261. #
  262. # rows,next_token,total_count_todeal_t,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  263. # SearchQuery(query,None,True),
  264. # columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  265. _msg = "要素提取队列报警:队列堆积%s条公告,最近十分钟入库数:%s,最近十分钟处理公告数:%s,其中成功处理数:%s,查库免提取数:%s"%(str(total_count_todeal),str(init_count),str(process_count),str(success_count),str(exists_count))
  266. atAll=False
  267. if success_count==0:
  268. atAll=True
  269. _msg += "\n疑似流程出现问题,请相关负责人尽快处理"
  270. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=atAll)
  271. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  272. except Exception as e:
  273. traceback.print_exc()
  274. def monitor_proposedBuilding(self):
  275. current_date = getCurrent_date("%Y-%m-%d")
  276. current_hour = getCurrent_date("%H")
  277. query = BoolQuery(must_queries=[
  278. RangeQuery("update_time",current_date),
  279. WildcardQuery("docids","*")
  280. ])
  281. rows,next_token,total_count_doc,is_all_succeed = self.ots_client.search("designed_project","designed_project_index",
  282. SearchQuery(query,None,True),
  283. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  284. query = BoolQuery(must_queries=[
  285. RangeQuery("update_time",current_date),
  286. WildcardQuery("spids","*")
  287. ])
  288. rows,next_token,total_count_sp,is_all_succeed = self.ots_client.search("designed_project","designed_project_index",
  289. SearchQuery(query,None,True),
  290. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  291. total_count = total_count_doc+total_count_sp
  292. if not (current_hour in ("0","23","24")):
  293. _msg = "拟在建生成报警:当天生成的拟在建数量为:%d,其中公告生成:%d,审批项目生成:%d"%(total_count,total_count_doc,total_count_sp)
  294. atAll=False
  295. if total_count==0:
  296. atAll=True
  297. _msg += "\n疑似流程出现问题,请相关负责人尽快处理"
  298. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=atAll)
  299. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  300. def monitor_sychr(self):
  301. current_date = getCurrent_date("%Y-%m-%d")
  302. last_date = timeAdd(current_date,-1,"%Y-%m-%d")
  303. query = BoolQuery(must_queries=[
  304. RangeQuery("status",*flow_sychro_status_from,True,True),
  305. ])
  306. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  307. SearchQuery(query,None,True),
  308. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  309. if total_count>=200:
  310. _msg = "数据流报警:待同步到成品表公告数为:%d"%(total_count)
  311. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  312. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  313. def monitor_preproject(self):
  314. current_date = getCurrent_date("%Y-%m-%d")
  315. last_date = timeAdd(current_date,-7,"%Y-%m-%d")
  316. query = BoolQuery(must_queries=[
  317. RangeQuery("crtime",last_date),
  318. ])
  319. rows,next_token,total_count,is_all_succeed = self.ots_client.search("preproject","preproject_index",
  320. SearchQuery(query,None,True),
  321. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  322. log("preproject count:%d"%(total_count))
  323. if total_count<=10*10000:
  324. _msg = "周期项目生成报警:最近一周生成/更新的周期项目数量为:%d"%(total_count)
  325. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  326. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  327. def monitor_dumplicate(self):
  328. current_date = getCurrent_date("%Y-%m-%d")
  329. last_date = timeAdd(current_date,-1,"%Y-%m-%d")
  330. query = BoolQuery(must_queries=[
  331. TermQuery("page_time",last_date),
  332. RangeQuery("status",71),
  333. ])
  334. rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  335. SearchQuery(query,None,True),
  336. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  337. query = BoolQuery(must_queries=[
  338. RangeQuery("status",66,71),
  339. ])
  340. rows,next_token,total_count_to_dump,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  341. SearchQuery(query,None,True),
  342. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  343. query = BoolQuery(must_queries=[
  344. TermQuery("page_time",last_date),
  345. RangeQuery("status",71),
  346. TermQuery("save",0)
  347. ])
  348. rows,next_token,total_count_lastday_dump,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  349. SearchQuery(query,None,True),
  350. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  351. if total_count_lastday_dump/total_count_lastday<0.2:
  352. _msg = "公告去重报警,%s入库公告数:%d,其中去重数:%d,去重率:%.2f"%(last_date,total_count_lastday,total_count_lastday_dump,total_count_lastday_dump/total_count_lastday)
  353. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  354. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  355. # if total_count_to_dump>2000:
  356. # _msg = "公告去重报警,待去重数量:%s"%(str(total_count_to_dump))
  357. # sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  358. # # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  359. #成品表监控
  360. query = BoolQuery(must_queries=[
  361. TermQuery("page_time",last_date),
  362. ])
  363. rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document","document_index",
  364. SearchQuery(query,None,True),
  365. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  366. query = BoolQuery(must_queries=[
  367. TermQuery("page_time",last_date),
  368. RangeQuery("status",401,451),
  369. ])
  370. rows,next_token,total_count_lastday_dump,is_all_succeed = self.ots_client.search("document","document_index",
  371. SearchQuery(query,None,True),
  372. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  373. if total_count_lastday_dump/total_count_lastday<0.2:
  374. _msg = "公告成品去重报警,%s入库公告数:%d,其中去重数:%d,去重率:%.2f"%(last_date,total_count_lastday,total_count_lastday_dump,total_count_lastday_dump/total_count_lastday)
  375. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  376. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  377. query = BoolQuery(must_queries=[
  378. RangeQuery("status",*flow_dumplicate_status_from,True,True),
  379. ])
  380. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  381. SearchQuery(query,None,True),
  382. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  383. if total_count>=1000:
  384. _msg = "数据流报警:待去重公告数为:%d"%(total_count)
  385. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  386. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  387. def monitor_merge(self):
  388. current_date = getCurrent_date("%Y-%m-%d")
  389. last_date = timeAdd(current_date,-1,"%Y-%m-%d")
  390. check_count = 20000
  391. query = BoolQuery(must_queries=[
  392. RangeQuery("status",201,301)
  393. ])
  394. list_doc = []
  395. queue_docid = Queue()
  396. rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document","document_index",
  397. SearchQuery(query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),get_total_count=True),
  398. columns_to_get=ColumnsToGet(["docchannel"],return_type=ColumnReturnType.SPECIFIED))
  399. list_data = getRow_ots(rows)
  400. list_doc.extend(list_data)
  401. while next_token and len(list_doc)<check_count:
  402. rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document","document_index",
  403. SearchQuery(query,next_token=next_token,limit=100,get_total_count=True),
  404. columns_to_get=ColumnsToGet(["docchannel"],return_type=ColumnReturnType.SPECIFIED))
  405. list_data = getRow_ots(rows)
  406. list_doc.extend(list_data)
  407. for _doc in list_doc:
  408. queue_docid.put(_doc)
  409. def _handle(item,result_queue):
  410. docid = item.get("docid")
  411. _query = BoolQuery(must_queries=[TermQuery("docids",str(docid))])
  412. rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("project2","project2_index",
  413. SearchQuery(_query,None,limit=100,get_total_count=True),
  414. columns_to_get=ColumnsToGet(["zhao_biao_page_time","zhong_biao_page_time","docid_number"],return_type=ColumnReturnType.SPECIFIED))
  415. list_data = getRow_ots(rows)
  416. item["projects"] = list_data
  417. mt = MultiThreadHandler(queue_docid,_handle,None,30)
  418. mt.run()
  419. not_find_count = 0
  420. zhongbiao_find_zhaobiao = 0
  421. zhongbiao_count = 0
  422. multi_count = 0
  423. list_notfind = []
  424. list_multi = []
  425. for _doc in list_doc:
  426. print(_doc)
  427. if len(_doc.get("projects",[]))==0:
  428. not_find_count += 1
  429. list_notfind.append(str(_doc.get("docid")))
  430. if len(_doc.get("projects",[]))>=2:
  431. multi_count += 1
  432. list_multi.append(str(_doc.get("docid")))
  433. if _doc.get("docchannel") in (101,118,119,120):
  434. zhongbiao_count += 1
  435. if len(_doc.get("projects",[]))==1:
  436. _project = _doc.get("projects")[0]
  437. if _project.get("zhao_biao_page_time","")!="":
  438. zhongbiao_find_zhaobiao += 1
  439. if not_find_count>0 or multi_count>0 or zhongbiao_find_zhaobiao/zhongbiao_count<0.8:
  440. if not_find_count>0 or multi_count>0:
  441. current_time = getCurrent_date(format="%Y-%m-%d_%H%M%S")
  442. logname = os.path.join(self.current_path,"log","%s.log"%current_time)
  443. with open(logname,"w",encoding="utf8") as f:
  444. f.write(",".join(list_notfind))
  445. f.write("\n")
  446. f.write(",".join(list_multi))
  447. _msg = "公告合并报警:近%d条成品公告,有%d条未生成项目,有%d条公告找到多个项目,详见%s;其中有%d条中标公告且找到招标公告数为%d,比率为%.3f"%(check_count,not_find_count,multi_count,logname,zhongbiao_count,zhongbiao_find_zhaobiao,zhongbiao_find_zhaobiao/zhongbiao_count)
  448. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  449. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  450. else:
  451. _msg = "公告合并报警:近%d条成品公告其中有%d条中标公告且找到招标公告数为%d,比率为%.3f"%(check_count,zhongbiao_count,zhongbiao_find_zhaobiao,zhongbiao_find_zhaobiao/zhongbiao_count)
  452. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  453. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  454. query = BoolQuery(must_queries=[
  455. TermQuery("page_time",last_date),
  456. RangeQuery("status",71),
  457. ])
  458. rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
  459. SearchQuery(query,None,True),
  460. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  461. if total_count_lastday<10*10000:
  462. _msg = "公告成品入库报警,%s入库公告数:%d"%(last_date,total_count_lastday)
  463. sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
  464. # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
  465. def start_monitor(self):
  466. #数据监控
  467. scheduler = BlockingScheduler()
  468. # scheduler.add_job(self.monitor_attachment,"cron",minute="*/10")
  469. scheduler.add_job(self.monitor_extract,"cron",minute="*/10")
  470. scheduler.add_job(self.monitor_proposedBuilding,"cron",hour="*/3")
  471. scheduler.add_job(self.monitor_dumplicate,"cron",minute="*/10")
  472. scheduler.add_job(self.monitor_sychr,"cron",minute="*/10")
  473. scheduler.add_job(self.monitor_preproject,"cron",hour="8")
  474. scheduler.add_job(self.monitor_merge,"cron",hour="*/1")
  475. scheduler.add_job(self.monitor_init,"cron",hour="*/3")
  476. scheduler.start()
  477. def start_attach_monitor(self):
  478. #附件监控
  479. scheduler = BlockingScheduler()
  480. scheduler.add_job(self.monitor_attachment,"cron",minute="*/10")
  481. scheduler.start()
  482. if __name__ == '__main__':
  483. # dm = BaseDataMonitor()
  484. # # dm.start_monitor()
  485. # log_filename = "C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-03.log"
  486. # dm.check_document_uuid(log_filename)
  487. sentMsgToDD("报警test_msg",ACCESS_TOKEN_DATAWORKS)
  488. # dm.monitor_proposedBuilding()
  489. # print(dm.get_last_tenmin_time(16))