123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642 |
- import os,sys
- from apscheduler.schedulers.blocking import BlockingScheduler
- from BaseDataMaintenance.dataSource.source import getConnect_ots,getConnect_activateMQ
- from BaseDataMaintenance.dataSource.interface import *
- from BaseDataMaintenance.common.Utils import *
- from tablestore import *
- from BaseDataMaintenance.dataSource.setttings import *
- from queue import Queue
- from BaseDataMaintenance.common.multiThread import MultiThreadHandler
- from BaseDataMaintenance.maintenance.dataflow_settings import *
- import pandas as pd
- flow_attachment_log_path = "/data/python/flow_attachment.log"
- flow_extract_log_path = "/data/python/flow_extract.log"
- flow_init_path = "/data/python/flow_init.log"
- flow_init_log_dir = "/data/python/flow_init_log"
- flow_init_check_dir = "/data/python/flow_init_check"
- flow_dumplicate_log_path = "/python_log/flow_dumplicate.log"
- class BaseDataMonitor():
- def __init__(self):
- self.ots_client = getConnect_ots()
- self.recieviers = ["1175730271@qq.com","531870502@qq.com"]
- self.list_proposed_count = []
- self.current_path = os.path.dirname(__file__)
- def cmd_execute(self,_cmd):
- with os.popen(_cmd) as f:
- return f.read()
- def get_last_tenmin_time(self,nums=15):
- current_time = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
- last_ten_minite_time = timeAdd(current_time,0,"%Y-%m-%d %H:%M:%S",-10)
- return last_ten_minite_time[:nums]
- def check_document_uuid(self,log_filename):
- def _handle(_item,result_queue):
- bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
- rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,get_total_count=True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- _item["exists"] = total_count
- check_filename = "%s_check.xlsx"%(log_filename)
- list_uuid = []
- task_queue = Queue()
- dict_tolong = {}
- if not os.path.exists(check_filename) and os.path.exists(log_filename):
- _regrex = "delete\s+(?P<tablename>bxkc[^\s]+)\s+.*ID='(?P<uuid>.+)'"
- _regrex_tolong = "msg too long:(?P<uuid>[^,]+),\d+"
- with open(log_filename,"r",encoding="utf8") as f:
- while 1:
- _line = f.readline()
- if not _line:
- break
- _match = re.search(_regrex,_line)
- if _match is not None:
- _uuid = _match.groupdict().get("uuid")
- tablename = _match.groupdict().get("tablename")
- if _uuid is not None:
- list_uuid.append({"uuid":_uuid,"tablename":tablename})
- _match = re.search(_regrex_tolong,_line)
- if _match is not None:
- _uuid = _match.groupdict().get("uuid")
- dict_tolong[_uuid] = 1
- if list_uuid==0:
- _msg = "数据遗漏检查出错"
- sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
- # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
- ots_client = getConnect_ots()
- for _d in list_uuid:
- task_queue.put(_d)
- mt = MultiThreadHandler(task_queue,_handle,None,30)
- mt.run()
- df_data = {"uuid":[],
- "tablename":[],
- "exists":[],
- "tolong":[]}
- for _data in list_uuid:
- for k,v in df_data.items():
- if k!="tolong":
- v.append(_data.get(k))
- df_data["tolong"].append(dict_tolong.get(_data["uuid"],0))
- df2 = pd.DataFrame(df_data)
- df2.to_excel(check_filename)
- def monitor_init(self):
- def _handle(_item,result_queue):
- bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
- rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(bool_query,get_total_count=True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- _item["exists"] = total_count
- try:
- current_date = getCurrent_date("%Y-%m-%d")
- last_date = timeAdd(current_date,-1,"%Y-%m-%d")
- if not os.path.exists(flow_init_check_dir):
- os.mkdir(flow_init_check_dir)
- log_filename = os.path.join(flow_init_log_dir,"flow_init_%s.log"%(last_date))
- check_filename = os.path.join(flow_init_check_dir,"flow_init_%s.xlsx"%(last_date))
- list_uuid = []
- task_queue = Queue()
- dict_tolong = {}
- if not os.path.exists(check_filename) and os.path.exists(log_filename):
- _regrex = "delete\s+(?P<tablename>bxkc[^\s]+)\s+.*ID='(?P<uuid>.+)'"
- _regrex_tolong = "msg too long:(?P<uuid>[^,]+),\d+"
- with open(log_filename,"r",encoding="utf8") as f:
- while 1:
- _line = f.readline()
- if not _line:
- break
- _match = re.search(_regrex,_line)
- if _match is not None:
- _uuid = _match.groupdict().get("uuid")
- tablename = _match.groupdict().get("tablename")
- if _uuid is not None:
- list_uuid.append({"uuid":_uuid,"tablename":tablename})
- _match = re.search(_regrex_tolong,_line)
- if _match is not None:
- _uuid = _match.groupdict().get("uuid")
- dict_tolong[_uuid] = 1
- if list_uuid==0:
- _msg = "数据遗漏检查出错"
- sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
- # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
- ots_client = getConnect_ots()
- for _d in list_uuid:
- task_queue.put(_d)
- mt = MultiThreadHandler(task_queue,_handle,None,30)
- mt.run()
- df_data = {"uuid":[],
- "tablename":[],
- "exists":[],
- "tolong":[]}
- for _data in list_uuid:
- for k,v in df_data.items():
- if k!="tolong":
- v.append(_data.get(k))
- df_data["tolong"].append(dict_tolong.get(_data["uuid"],0))
- df2 = pd.DataFrame(df_data)
- df2.to_excel(check_filename)
- counts = 0
- df_data = pd.read_excel(check_filename)
- for _exists,_tolong in zip(df_data["exists"],df_data["tolong"]):
- if _exists==0 and _tolong==0:
- counts += 1
- if counts>0:
- _msg = "数据遗漏检查报警,%s有%s条公告遗漏,详见%s"%(last_date,str(counts),check_filename)
- sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
- # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
- except Exception as e:
- _msg = "数据遗漏检查报错"
- sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
- # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
- traceback.print_exc()
- def monitor_attachment(self):
- from BaseDataMaintenance.java.MQInfo import getAllQueueSize,getQueueSize
- try:
- # query = BoolQuery(must_queries=[
- # RangeQuery("status",0,11),
- # ])
- #
- # rows,next_token,total_count_todeal,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- # SearchQuery(query,None,True),
- # columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- total_count_todeal = getQueueSize("dataflow_attachment")
- if total_count_todeal>100:
- # query = BoolQuery(must_queries=[
- # RangeQuery("crtime",self.get_last_tenmin_time(16))
- # ])
- #
- # rows,next_token,total_count_todeal_t_a,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- # SearchQuery(query,None,True),
- # columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- #
- # query = BoolQuery(must_queries=[
- # RangeQuery("status",0,11),
- # RangeQuery("crtime",self.get_last_tenmin_time(16))
- # ])
- #
- # rows,next_token,total_count_todeal_t,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- # SearchQuery(query,None,True),
- # columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- #通过命令行获取日志情况
- # _cmd = 'cat %s | grep -c "%s.*process filemd5"'%(flow_attachment_log_path,self.get_last_tenmin_time())
- # log(_cmd)
- # process_count = self.cmd_execute(_cmd)
- #
- # _cmd = 'cat %s | grep -c "%s.*process filemd5.*True"'%(flow_attachment_log_path,self.get_last_tenmin_time())
- # log(_cmd)
- # process_succeed_count = self.cmd_execute(_cmd)
- #
- # _cmd = 'cat %s | grep -c "%s.*delete sql"'%(flow_init_path,self.get_last_tenmin_time())
- # log(_cmd)
- # init_count = self.cmd_execute(_cmd)
- # _msg = "附件提取队列报警:队列堆积%s条公告,最近十分钟处理公告附件数:%s,处理成功数:%s"%(str(total_count_todeal),str(process_count),str(process_succeed_count))
- #通过读取文件获取日志情况
- dict_type = {}
- _pattern = "%s.*process filemd5\:[^\s]* (?P<result>(True|False)) of type\:(?P<type>[^\s]*).*recognize takes (?P<costtime>\d+)s"%(re.escape(self.get_last_tenmin_time()))
- with open(flow_attachment_log_path,"r",encoding="utf8") as f:
- while True:
- line = f.readline()
- if not line:
- break
- _match = re.search(_pattern,str(line))
- if _match is not None:
- _type = _match.groupdict().get("type")
- _result = _match.groupdict().get("result")
- _costtime = _match.groupdict().get("costtime")
- if _type not in dict_type:
- dict_type[_type] = {"success":0,"fail":0,"success_costtime":0,"fail_costtime":0}
- if _result=="True":
- dict_type[_type]["success"] += 1
- dict_type[_type]["success_costtime"] += int(_costtime)
- else:
- dict_type[_type]["fail"] += 1
- dict_type[_type]["fail_costtime"] += int(_costtime)
- process_count = 0
- process_succeed_count = 0
- _msg_type = ""
- for k,v in dict_type.items():
- process_count += v.get("success",0)+v.get("fail",0)
- process_succeed_count += v.get("success",0)
- _msg_type += "\n类型%s\n\t成功%s,消耗%s秒,%.2f秒/个,\n\t失败%s,消耗%s秒,%.2f秒/个"%(k,str(v.get("success")),str(v.get("success_costtime")),v.get("success_costtime")/max(1,v.get("success")),str(v.get("fail")),str(v.get("fail_costtime")),v.get("fail_costtime")/max(1,v.get("fail")))
- _msg = "附件提取队列报警:队列堆积%s条公告,最近十分钟处理公告附件数:%s,处理成功数:%s"%(str(total_count_todeal),str(process_count),str(process_succeed_count))
- sentMsgToDD(_msg+_msg_type,ACCESS_TOKEN_DATAWORKS)
- # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
- except Exception as e:
- traceback.print_exc()
- def monitor_extract(self):
- from BaseDataMaintenance.java.MQInfo import getAllQueueSize,getQueueSize
- try:
- # query = BoolQuery(must_queries=[
- # RangeQuery("status",11,61),
- # ])
- #
- # rows,next_token,total_count_todeal,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- # SearchQuery(query,None,True),
- # columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- total_count_init = getQueueSize("dataflow_init")
- if total_count_init>=100:
- _msg = "同步队列报警,有%s条数据滞留"%(str(total_count_init))
- sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
- # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
- total_count_todeal = getQueueSize("dataflow_extract")
- if total_count_todeal>100:
- _cmd = 'cat %s | grep "%s" | grep -c "process.*docid"'%(flow_extract_log_path,self.get_last_tenmin_time())
- log(_cmd)
- process_count = self.cmd_execute(_cmd)
- _cmd = 'cat %s | grep "%s" | grep -c "process.*docid.*1$"'%(flow_extract_log_path,self.get_last_tenmin_time())
- log(_cmd)
- success_count = self.cmd_execute(_cmd)
- _cmd = 'cat %s | grep "%s" | grep -c "fingerprint.*exists docid"'%(flow_extract_log_path,self.get_last_tenmin_time())
- log(_cmd)
- exists_count = self.cmd_execute(_cmd)
- _cmd = 'cat %s | grep -c "%s.*delete sql"'%(flow_init_path,self.get_last_tenmin_time())
- log(_cmd)
- init_count = self.cmd_execute(_cmd)
- # query = BoolQuery(must_queries=[
- # RangeQuery("crtime",self.get_last_tenmin_time(16))
- # ])
- #
- # rows,next_token,total_count_todeal_t_a,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- # SearchQuery(query,None,True),
- # columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- #
- # query = BoolQuery(must_queries=[
- # RangeQuery("status",11,61),
- # RangeQuery("crtime",self.get_last_tenmin_time(16))
- # ])
- #
- # rows,next_token,total_count_todeal_t,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- # SearchQuery(query,None,True),
- # columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- _msg = "要素提取队列报警:队列堆积%s条公告,最近十分钟入库数:%s,最近十分钟处理公告数:%s,其中成功处理数:%s,查库免提取数:%s"%(str(total_count_todeal),str(init_count),str(process_count),str(success_count),str(exists_count))
- atAll=False
- if success_count==0:
- atAll=True
- _msg += "\n疑似流程出现问题,请相关负责人尽快处理"
- sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=atAll)
- # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
- except Exception as e:
- traceback.print_exc()
- def monitor_proposedBuilding(self):
- current_date = getCurrent_date("%Y-%m-%d")
- current_hour = getCurrent_date("%H")
- query = BoolQuery(must_queries=[
- RangeQuery("update_time",current_date),
- WildcardQuery("docids","*")
- ])
- rows,next_token,total_count_doc,is_all_succeed = self.ots_client.search("designed_project","designed_project_index",
- SearchQuery(query,None,True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- query = BoolQuery(must_queries=[
- RangeQuery("update_time",current_date),
- WildcardQuery("spids","*")
- ])
- rows,next_token,total_count_sp,is_all_succeed = self.ots_client.search("designed_project","designed_project_index",
- SearchQuery(query,None,True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- total_count = total_count_doc+total_count_sp
- if not (current_hour in ("00","23","24")):
- _msg = "拟在建生成报警:当天生成的拟在建数量为:%d,其中公告生成:%d,审批项目生成:%d"%(total_count,total_count_doc,total_count_sp)
- atAll=False
- if total_count==0:
- atAll=True
- _msg += "\n疑似流程出现问题,请相关负责人尽快处理"
- sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=atAll)
- # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
- def monitor_sychr(self):
- current_date = getCurrent_date("%Y-%m-%d")
- last_date = timeAdd(current_date,-1,"%Y-%m-%d")
- query = BoolQuery(must_queries=[
- RangeQuery("status",*flow_sychro_status_from,True,True),
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(query,None,True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- if total_count>=200:
- _msg = "数据流报警:待同步到成品表公告数为:%d"%(total_count)
- sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
- # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
- def monitor_preproject(self):
- current_date = getCurrent_date("%Y-%m-%d")
- last_date = timeAdd(current_date,-7,"%Y-%m-%d")
- query = BoolQuery(must_queries=[
- RangeQuery("crtime",last_date),
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("preproject","preproject_index",
- SearchQuery(query,None,True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- log("preproject count:%d"%(total_count))
- if total_count<=10*10000:
- _msg = "周期项目生成报警:最近一周生成/更新的周期项目数量为:%d"%(total_count)
- sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
- # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
- def monitor_dumplicate(self):
- current_date = getCurrent_date("%Y-%m-%d")
- last_date = timeAdd(current_date,-1,"%Y-%m-%d")
- query = BoolQuery(must_queries=[
- TermQuery("page_time",last_date),
- RangeQuery("status",71),
- ])
- rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(query,None,True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- query = BoolQuery(must_queries=[
- RangeQuery("status",66,71),
- ])
- rows,next_token,total_count_to_dump,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(query,None,True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- query = BoolQuery(must_queries=[
- TermQuery("page_time",last_date),
- RangeQuery("status",71),
- TermQuery("save",0)
- ])
- rows,next_token,total_count_lastday_dump,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(query,None,True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- # if total_count_lastday_dump/total_count_lastday<0.2:
- # _msg = "公告去重报警,%s入库公告数:%d,其中去重数:%d,去重率:%.2f"%(last_date,total_count_lastday,total_count_lastday_dump,total_count_lastday_dump/total_count_lastday)
- # sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
- # # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
- # if total_count_to_dump>2000:
- # _msg = "公告去重报警,待去重数量:%s"%(str(total_count_to_dump))
- # sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
- # # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
- #成品表监控
- query = BoolQuery(must_queries=[
- TermQuery("page_time",last_date),
- ])
- rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document","document_index",
- SearchQuery(query,None,True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- query = BoolQuery(must_queries=[
- TermQuery("page_time",last_date),
- RangeQuery("status",401,451),
- ])
- rows,next_token,total_count_lastday_dump,is_all_succeed = self.ots_client.search("document","document_index",
- SearchQuery(query,None,True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- if total_count_lastday_dump/total_count_lastday<0.2:
- _msg = "公告成品去重报警,%s入库公告数:%d,其中去重数:%d,去重率:%.2f"%(last_date,total_count_lastday,total_count_lastday_dump,total_count_lastday_dump/total_count_lastday)
- sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
- # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
- query = BoolQuery(must_queries=[
- RangeQuery("status",*flow_dumplicate_status_from,True,True),
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(query,None,True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- if total_count>=1000:
- _cmd = 'cat %s | grep -c "%s.*merge_project whole_time"'%(flow_dumplicate_log_path,self.get_last_tenmin_time())
- process_count = self.cmd_execute(_cmd)
- atAll = False
- if process_count=="":
- process_count = 0
- query = BoolQuery(must_queries=[
- RangeQuery("status",flow_dumplicate_status_from[1]),
- RangeQuery("opertime",self.get_last_tenmin_time())
- ])
- rows,next_token,total_count_oper,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(query,None,True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- if int(process_count)==0:
- if total_count_oper==0:
- atAll = True
- _cmd = "echo `tail %s -c 10000k` > %s"%(flow_dumplicate_log_path,flow_dumplicate_log_path)
- self.cmd_execute(_cmd)
- # if int(process_count)>0 and int(process_count)<100:
- # self.cmd_execute("ps -ef | grep dumplicate | grep -v grep|cut -c 9-15|xargs kill -9")
- _msg = "数据流报警:待去重公告数为:%d,最近十分钟日志去重数为:%s,ots去重数为:%s"%(total_count,str(process_count),str(total_count_oper))
- sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=atAll)
- # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
- def monitor_merge(self):
- current_date = getCurrent_date("%Y-%m-%d")
- last_date = timeAdd(current_date,-1,"%Y-%m-%d")
- check_count = 20000
- query = BoolQuery(must_queries=[
- RangeQuery("status",201,301),
- RangeQuery("docchannel",0,300)
- ])
- list_doc = []
- queue_docid = Queue()
- rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document","document_index",
- SearchQuery(query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),get_total_count=True),
- columns_to_get=ColumnsToGet(["docchannel"],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- list_doc.extend(list_data)
- while next_token and len(list_doc)<check_count:
- rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document","document_index",
- SearchQuery(query,next_token=next_token,limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet(["docchannel"],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- list_doc.extend(list_data)
- for _doc in list_doc:
- queue_docid.put(_doc)
- def _handle(item,result_queue):
- docid = item.get("docid")
- _query = BoolQuery(must_queries=[TermQuery("docids",str(docid))])
- rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("project2","project2_index",
- SearchQuery(_query,None,limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet(["zhao_biao_page_time","zhong_biao_page_time","docid_number"],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- item["projects"] = list_data
- mt = MultiThreadHandler(queue_docid,_handle,None,30)
- mt.run()
- not_find_count = 0
- zhongbiao_find_zhaobiao = 0
- zhongbiao_count = 0
- multi_count = 0
- list_notfind = []
- list_multi = []
- for _doc in list_doc:
- if len(_doc.get("projects",[]))==0:
- not_find_count += 1
- list_notfind.append(str(_doc.get("docid")))
- if len(_doc.get("projects",[]))>=2:
- multi_count += 1
- list_multi.append(str(_doc.get("docid")))
- if _doc.get("docchannel") in (101,118,119,120,121,122):
- zhongbiao_count += 1
- if len(_doc.get("projects",[]))==1:
- _project = _doc.get("projects")[0]
- if _project.get("zhao_biao_page_time","")!="":
- zhongbiao_find_zhaobiao += 1
- _ratio = zhongbiao_find_zhaobiao/zhongbiao_count if zhongbiao_count>0 else 0
- if not_find_count>0 or multi_count>0 or _ratio<0.8:
- if not_find_count>0 or multi_count>0:
- current_time = getCurrent_date(format="%Y-%m-%d_%H%M%S")
- logname = os.path.join(self.current_path,"log","%s.log"%current_time)
- with open(logname,"w",encoding="utf8") as f:
- f.write(",".join(list_notfind))
- f.write("\n")
- f.write(",".join(list_multi))
- _msg = "公告合并报警:近%d条成品公告,有%d条未生成项目,有%d条公告找到多个项目,详见%s;其中有%d条中标公告且找到招标公告数为%d,比率为%.3f"%(check_count,not_find_count,multi_count,logname,zhongbiao_count,zhongbiao_find_zhaobiao,_ratio)
- sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
- # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
- else:
- _msg = "公告合并报警:近%d条成品公告其中有%d条中标公告且找到招标公告数为%d,比率为%.3f"%(check_count,zhongbiao_count,zhongbiao_find_zhaobiao,zhongbiao_find_zhaobiao/zhongbiao_count)
- sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
- # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
- query = BoolQuery(must_queries=[
- TermQuery("page_time",last_date),
- RangeQuery("status",71),
- ])
- rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
- SearchQuery(query,None,True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- if total_count_lastday<10*10000:
- _msg = "公告成品入库报警,%s入库公告数:%d"%(last_date,total_count_lastday)
- sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
- # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
- def start_monitor(self):
- #数据监控
- scheduler = BlockingScheduler()
- # scheduler.add_job(self.monitor_attachment,"cron",minute="*/10")
- scheduler.add_job(self.monitor_extract,"cron",minute="*/10")
- scheduler.add_job(self.monitor_proposedBuilding,"cron",hour="*/3")
- # scheduler.add_job(self.monitor_dumplicate,"cron",minute="*/10")
- scheduler.add_job(self.monitor_sychr,"cron",minute="*/10")
- scheduler.add_job(self.monitor_preproject,"cron",hour="8")
- scheduler.add_job(self.monitor_merge,"cron",minute="*/30")
- scheduler.add_job(self.monitor_init,"cron",hour="*/3")
- scheduler.start()
- def start_attach_monitor(self):
- #附件监控
- scheduler = BlockingScheduler()
- scheduler.add_job(self.monitor_attachment,"cron",minute="*/10")
- scheduler.start()
- if __name__ == '__main__':
- # dm = BaseDataMonitor()
- # # dm.start_monitor()
- # log_filename = "C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-03.log"
- # dm.check_document_uuid(log_filename)
- sentMsgToDD("报警test_msg",ACCESS_TOKEN_DATAWORKS)
- # dm.monitor_proposedBuilding()
- # print(dm.get_last_tenmin_time(16))
|