2 commits db9a5012a6 ... 4c68d3fa53

Author SHA1 Message Date
  fangjiasheng 4c68d3fa53 Merge remote-tracking branch 'origin/master' 1 year ago
  fangjiasheng f74c457cee Add attachment process monitoring 1 year ago
1 changed file with 370 additions and 240 deletions

BaseDataMaintenance/dataMonitor/data_monitor.py  +370 -240

@@ -1,6 +1,9 @@
-import os,sys
+import os, sys
+import subprocess
+from datetime import datetime, timedelta
+import psutil
 from apscheduler.schedulers.blocking import BlockingScheduler
-from BaseDataMaintenance.dataSource.source import getConnect_ots,getConnect_activateMQ
+from BaseDataMaintenance.dataSource.source import getConnect_ots, getConnect_activateMQ
 from BaseDataMaintenance.dataSource.interface import *
 from BaseDataMaintenance.common.Utils import *
 from tablestore import *
@@ -8,23 +11,19 @@ from BaseDataMaintenance.dataSource.setttings import *
 from queue import Queue
 from BaseDataMaintenance.common.multiThread import MultiThreadHandler
 
-
 from BaseDataMaintenance.maintenance.dataflow_settings import *
 
 import pandas as pd
 
-
 flow_attachment_log_path = "/data/python/flow_attachment.log"
 
 flow_extract_log_path = "/data/python/flow_extract.log"
 
 flow_init_path = "/data/python/flow_init.log"
 
-
 flow_init_log_dir = "/data/python/flow_init_log"
 flow_init_check_dir = "/data/python/flow_init_check"
 
-
 flow_dumplicate_log_path = "/home/appuser/python/flow_dumplicate.log"
 
 
@@ -32,99 +31,104 @@ class BaseDataMonitor():
 
     def __init__(self):
         self.ots_client = getConnect_ots()
-        self.recieviers = ["1175730271@qq.com","531870502@qq.com"]
+        self.recieviers = ["1175730271@qq.com", "531870502@qq.com"]
 
         self.list_proposed_count = []
         self.current_path = os.path.dirname(__file__)
 
-    def cmd_execute(self,_cmd):
+    def cmd_execute(self, _cmd):
         with os.popen(_cmd) as f:
             return f.read()
 
-    def get_last_tenmin_time(self,nums=15):
+    def get_last_tenmin_time(self, nums=15):
         current_time = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
 
-        last_ten_minite_time = timeAdd(current_time,0,"%Y-%m-%d %H:%M:%S",-10)
+        last_ten_minite_time = timeAdd(current_time, 0, "%Y-%m-%d %H:%M:%S", -10)
         return last_ten_minite_time[:nums]
 
-    def check_document_uuid(self,log_filename):
+    def check_document_uuid(self, log_filename):
 
-        def _handle(_item,result_queue):
-            bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
+        def _handle(_item, result_queue):
+            bool_query = BoolQuery(must_queries=[TermQuery("uuid", _item.get("uuid"))])
 
-            rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
-                                                                           SearchQuery(bool_query,get_total_count=True),
-                                                                           columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+            rows, next_token, total_count, is_all_succeed = ots_client.search("document_tmp", "document_tmp_index",
+                                                                              SearchQuery(bool_query,
+                                                                                          get_total_count=True),
+                                                                              columns_to_get=ColumnsToGet(
+                                                                                  return_type=ColumnReturnType.NONE))
 
             _item["exists"] = total_count
-        check_filename = "%s_check.xlsx"%(log_filename)
+
+        check_filename = "%s_check.xlsx" % (log_filename)
         list_uuid = []
         task_queue = Queue()
         dict_tolong = {}
         if not os.path.exists(check_filename) and os.path.exists(log_filename):
             _regrex = "delete\s+(?P<tablename>bxkc[^\s]+)\s+.*ID='(?P<uuid>.+)'"
             _regrex_tolong = "msg too long:(?P<uuid>[^,]+),\d+"
-            with open(log_filename,"r",encoding="utf8") as f:
+            with open(log_filename, "r", encoding="utf8") as f:
                 while 1:
                     _line = f.readline()
                     if not _line:
                         break
-                    _match = re.search(_regrex,_line)
+                    _match = re.search(_regrex, _line)
                     if _match is not None:
                         _uuid = _match.groupdict().get("uuid")
                         tablename = _match.groupdict().get("tablename")
                         if _uuid is not None:
-                            list_uuid.append({"uuid":_uuid,"tablename":tablename})
-                    _match = re.search(_regrex_tolong,_line)
+                            list_uuid.append({"uuid": _uuid, "tablename": tablename})
+                    _match = re.search(_regrex_tolong, _line)
                     if _match is not None:
                         _uuid = _match.groupdict().get("uuid")
                         dict_tolong[_uuid] = 1
 
-
-            if list_uuid==0:
+            if len(list_uuid) == 0:
                 _msg = "数据遗漏检查出错"
-                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
+                sentMsgToDD(_msg, ACCESS_TOKEN_DATAWORKS, atAll=True)
                 # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
             ots_client = getConnect_ots()
 
             for _d in list_uuid:
                 task_queue.put(_d)
-            mt = MultiThreadHandler(task_queue,_handle,None,30)
+            mt = MultiThreadHandler(task_queue, _handle, None, 30)
             mt.run()
-            df_data = {"uuid":[],
-                       "tablename":[],
-                       "exists":[],
-                       "tolong":[]}
+            df_data = {"uuid": [],
+                       "tablename": [],
+                       "exists": [],
+                       "tolong": []}
 
             for _data in list_uuid:
-                for k,v in df_data.items():
-                    if k!="tolong":
+                for k, v in df_data.items():
+                    if k != "tolong":
                         v.append(_data.get(k))
-                df_data["tolong"].append(dict_tolong.get(_data["uuid"],0))
+                df_data["tolong"].append(dict_tolong.get(_data["uuid"], 0))
             df2 = pd.DataFrame(df_data)
             df2.to_excel(check_filename)
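
Reviewer note: check_document_uuid scans a flow_init log for "delete <table> ... ID='<uuid>'" statements and then verifies each uuid against the document_tmp index. A minimal sketch of the extraction step, using the two regexes verbatim from the code above (the sample log line is hypothetical):

    import re

    _regrex = r"delete\s+(?P<tablename>bxkc[^\s]+)\s+.*ID='(?P<uuid>.+)'"  # delete-statement lines
    _regrex_tolong = r"msg too long:(?P<uuid>[^,]+),\d+"                   # oversized-message lines

    # hypothetical sample line, for illustration only
    line = "2023-02-03 01:02:03 delete bxkc_zhaobiao where ID='6f1c0a2e'"
    _match = re.search(_regrex, line)
    if _match is not None:
        print(_match.group("tablename"), _match.group("uuid"))  # bxkc_zhaobiao 6f1c0a2e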
 
     def monitor_init(self):
 
-        def _handle(_item,result_queue):
-            bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
+        def _handle(_item, result_queue):
+            bool_query = BoolQuery(must_queries=[TermQuery("uuid", _item.get("uuid"))])
 
-            rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
-                                                                           SearchQuery(bool_query,get_total_count=True),
-                                                                           columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+            rows, next_token, total_count, is_all_succeed = ots_client.search("document_tmp", "document_tmp_index",
+                                                                              SearchQuery(bool_query,
+                                                                                          get_total_count=True),
+                                                                              columns_to_get=ColumnsToGet(
+                                                                                  return_type=ColumnReturnType.NONE))
 
             _item["exists"] = total_count
+
         try:
             current_date = getCurrent_date("%Y-%m-%d")
 
-            last_date = timeAdd(current_date,-1,"%Y-%m-%d")
+            last_date = timeAdd(current_date, -1, "%Y-%m-%d")
 
             if not os.path.exists(flow_init_check_dir):
                 os.mkdir(flow_init_check_dir)
 
-            log_filename = os.path.join(flow_init_log_dir,"flow_init_%s.log"%(last_date))
-            check_filename = os.path.join(flow_init_check_dir,"flow_init_%s.xlsx"%(last_date))
+            log_filename = os.path.join(flow_init_log_dir, "flow_init_%s.log" % (last_date))
+            check_filename = os.path.join(flow_init_check_dir, "flow_init_%s.xlsx" % (last_date))
 
             list_uuid = []
             task_queue = Queue()
@@ -132,68 +136,66 @@ class BaseDataMonitor():
             if not os.path.exists(check_filename) and os.path.exists(log_filename):
                 _regrex = "delete\s+(?P<tablename>bxkc[^\s]+)\s+.*ID='(?P<uuid>.+)'"
                 _regrex_tolong = "msg too long:(?P<uuid>[^,]+),\d+"
-                with open(log_filename,"r",encoding="utf8") as f:
+                with open(log_filename, "r", encoding="utf8") as f:
                     while 1:
                         _line = f.readline()
                         if not _line:
                             break
-                        _match = re.search(_regrex,_line)
+                        _match = re.search(_regrex, _line)
                         if _match is not None:
                             _uuid = _match.groupdict().get("uuid")
                             tablename = _match.groupdict().get("tablename")
                             if _uuid is not None:
-                                list_uuid.append({"uuid":_uuid,"tablename":tablename})
-                        _match = re.search(_regrex_tolong,_line)
+                                list_uuid.append({"uuid": _uuid, "tablename": tablename})
+                        _match = re.search(_regrex_tolong, _line)
                         if _match is not None:
                             _uuid = _match.groupdict().get("uuid")
                             dict_tolong[_uuid] = 1
 
-
-                if list_uuid==0:
+                if len(list_uuid) == 0:
                     _msg = "数据遗漏检查出错"
-                    sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
+                    sentMsgToDD(_msg, ACCESS_TOKEN_DATAWORKS, atAll=True)
                     # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
                 ots_client = getConnect_ots()
 
                 for _d in list_uuid:
                     task_queue.put(_d)
-                mt = MultiThreadHandler(task_queue,_handle,None,30)
+                mt = MultiThreadHandler(task_queue, _handle, None, 30)
                 mt.run()
-                df_data = {"uuid":[],
-                           "tablename":[],
-                           "exists":[],
-                           "tolong":[]}
+                df_data = {"uuid": [],
+                           "tablename": [],
+                           "exists": [],
+                           "tolong": []}
 
                 for _data in list_uuid:
-                    for k,v in df_data.items():
-                        if k!="tolong":
+                    for k, v in df_data.items():
+                        if k != "tolong":
                             v.append(_data.get(k))
-                    df_data["tolong"].append(dict_tolong.get(_data["uuid"],0))
+                    df_data["tolong"].append(dict_tolong.get(_data["uuid"], 0))
                 df2 = pd.DataFrame(df_data)
                 df2.to_excel(check_filename)
 
             counts = 0
             df_data = pd.read_excel(check_filename)
-            for _exists,_tolong in zip(df_data["exists"],df_data["tolong"]):
-                if _exists==0 and _tolong==0:
+            for _exists, _tolong in zip(df_data["exists"], df_data["tolong"]):
+                if _exists == 0 and _tolong == 0:
                     counts += 1
-            if counts>0:
-                _msg = "数据遗漏检查报警,%s有%s条公告遗漏,详见%s"%(last_date,str(counts),check_filename)
-                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
+            if counts > 0:
+                _msg = "数据遗漏检查报警,%s有%s条公告遗漏,详见%s" % (last_date, str(counts), check_filename)
+                sentMsgToDD(_msg, ACCESS_TOKEN_DATAWORKS, atAll=True)
                 # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
 
 
         except Exception as e:
             _msg = "数据遗漏检查报错"
-            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
+            sentMsgToDD(_msg, ACCESS_TOKEN_DATAWORKS, atAll=True)
             # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
             traceback.print_exc()
 
-
     def monitor_attachment(self):
-        from BaseDataMaintenance.java.MQInfo import getAllQueueSize,getQueueSize
+        from BaseDataMaintenance.java.MQInfo import getAllQueueSize, getQueueSize
         try:
             # query = BoolQuery(must_queries=[
             #     RangeQuery("status",0,11),
@@ -204,7 +206,7 @@ class BaseDataMonitor():
             #                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
             total_count_todeal = getQueueSize("dataflow_attachment")
 
-            if total_count_todeal>1000:
+            if total_count_todeal > 1000:
                 # query = BoolQuery(must_queries=[
                 #     RangeQuery("crtime",self.get_last_tenmin_time(16))
                 # ])
@@ -222,11 +224,7 @@ class BaseDataMonitor():
                 #                                                                            SearchQuery(query,None,True),
                 #                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
 
-
-
-
-
-                #Get the log statistics via the command line
+                # Get the log statistics via the command line
                 # _cmd = 'cat %s | grep -c "%s.*process filemd5"'%(flow_attachment_log_path,self.get_last_tenmin_time())
                 # log(_cmd)
                 # process_count = self.cmd_execute(_cmd)
@@ -241,15 +239,16 @@ class BaseDataMonitor():
 
                 # _msg = "附件提取队列报警:队列堆积%s条公告,最近十分钟处理公告附件数:%s,处理成功数:%s"%(str(total_count_todeal),str(process_count),str(process_succeed_count))
 
-                #Get the log statistics by reading the file
+                # Get the log statistics by reading the file
                 dict_type = {}
-                _pattern = "%s.*process filemd5\:[^\s]* (?P<result>(True|False)) of type\:(?P<type>[^\s]*).*download:(?P<downloadtime>\d+\.\d+)s recognize takes (?P<costtime>\d+)s upload takes (?P<uploadtime>\d+\.\d+)s"%(re.escape(self.get_last_tenmin_time()))
-                with open(flow_attachment_log_path,"r",encoding="utf8") as f:
+                _pattern = "%s.*process filemd5\:[^\s]* (?P<result>(True|False)) of type\:(?P<type>[^\s]*).*download:(?P<downloadtime>\d+\.\d+)s recognize takes (?P<costtime>\d+)s upload takes (?P<uploadtime>\d+\.\d+)s" % (
+                    re.escape(self.get_last_tenmin_time()))
+                with open(flow_attachment_log_path, "r", encoding="utf8") as f:
                     while True:
                         line = f.readline()
                         if not line:
                             break
-                        _match = re.search(_pattern,str(line))
+                        _match = re.search(_pattern, str(line))
                         if _match is not None:
                             _type = _match.groupdict().get("type")
                             _result = _match.groupdict().get("result")
@@ -258,39 +257,66 @@ class BaseDataMonitor():
 
                             _uploadtime = _match.groupdict().get("uploadtime")
                             if _type not in dict_type:
-                                dict_type[_type] = {"success":0,"fail":0,"success_costtime":0,"fail_costtime":0,"downtime":0,"downcount":0,"uploadtime":0,"uploadcount":0}
-                            if _result=="True":
+                                dict_type[_type] = {"success": 0, "fail": 0, "success_costtime": 0, "fail_costtime": 0,
+                                                    "downtime": 0, "downcount": 0, "uploadtime": 0, "uploadcount": 0}
+                            if _result == "True":
                                 dict_type[_type]["success"] += 1
                                 dict_type[_type]["success_costtime"] += int(_costtime)
 
                             else:
                                 dict_type[_type]["fail"] += 1
                                 dict_type[_type]["fail_costtime"] += int(_costtime)
-                            if float(_downtime)>0:
+                            if float(_downtime) > 0:
                                 dict_type[_type]["downcount"] += 1
                                 dict_type[_type]["downtime"] += float(_downtime)
-                            if float(_uploadtime)>0:
+                            if float(_uploadtime) > 0:
                                 dict_type[_type]["uploadcount"] += 1
                                 dict_type[_type]["uploadtime"] += float(_uploadtime)
 
                 process_count = 0
                 process_succeed_count = 0
                 _msg_type = ""
-                for k,v in dict_type.items():
-                    process_count += v.get("success",0)+v.get("fail",0)
-                    process_succeed_count += v.get("success",0)
-                    _msg_type += "\n类型%s\n\t成功%s,消耗%s秒,%.2f秒/个,\n\t失败%s,消耗%s秒,%.2f秒/个,\n\t下载%s,消耗%s秒,%.2f秒/个,\n\t上传%s,消耗%s秒,%.2f秒/个"%(k,str(v.get("success")),str(v.get("success_costtime")),v.get("success_costtime")/max(1,v.get("success")),str(v.get("fail")),str(v.get("fail_costtime")),v.get("fail_costtime")/max(1,v.get("fail")),str(v.get("downcount")),str(v.get("downtime")),v.get("downtime")/max(1,v.get("downcount")),str(v.get("uploadcount")),str(v.get("uploadtime")),v.get("uploadtime")/max(1,v.get("uploadcount")))
-
-                _msg = "附件提取队列报警:队列堆积%s条公告,最近十分钟处理公告附件数:%s,处理成功数:%s"%(str(total_count_todeal),str(process_count),str(process_succeed_count))
+                _msg_restart = ""
+                _msg_convert_free_time = ""
+                for k, v in dict_type.items():
+                    process_count += v.get("success", 0) + v.get("fail", 0)
+                    process_succeed_count += v.get("success", 0)
+                    _msg_type += "\n类型%s\n\t成功%s,消耗%s秒,%.2f秒/个,\n\t失败%s,消耗%s秒,%.2f秒/个,\n\t下载%s,消耗%s秒,%.2f秒/个,\n\t上传%s,消耗%s秒,%.2f秒/个" % (
+                    k, str(v.get("success")), str(v.get("success_costtime")),
+                    v.get("success_costtime") / max(1, v.get("success")), str(v.get("fail")),
+                    str(v.get("fail_costtime")), v.get("fail_costtime") / max(1, v.get("fail")),
+                    str(v.get("downcount")), str(v.get("downtime")), v.get("downtime") / max(1, v.get("downcount")),
+                    str(v.get("uploadcount")), str(v.get("uploadtime")),
+                    v.get("uploadtime") / max(1, v.get("uploadcount")))
+
+                # process_count = 0
+                # process_succeed_count = 0
+                # _msg_type = ""
+                if process_count == 0 and getQueueSize("dataflow_attachment") != 0:
+                    _msg_restart = '\n\n重启start_dataflow_attachment'
+                    pid_list = get_pid_list_by_command('start_dataflow_attachment')
+                    pid_list = [str(x) for x in pid_list]
+                    pid_str = ' '.join(pid_list)
+                    log('kill pid_str ' + pid_str)
+                    os.system('kill -9 ' + pid_str)
+
+                avg_free_time = monitor_convert_interface()
+                if avg_free_time is not None:
+                    _msg_convert_free_time = '\n\n附件主接口平均空闲时间(秒): ' + str(avg_free_time)
+
+                _msg = "附件提取队列报警:队列堆积%s条公告,最近十分钟处理公告附件数:%s,处理成功数:%s" % (
+                    str(total_count_todeal), str(process_count), str(process_succeed_count))
                 log(_msg)
                 log(_msg_type)
-                sentMsgToDD(_msg+_msg_type,ACCESS_TOKEN_DATAWORKS)
+                log(_msg_restart)
+                log(_msg_convert_free_time)
+                sentMsgToDD(_msg + _msg_type + _msg_restart + _msg_convert_free_time, ACCESS_TOKEN_DATAWORKS)
                 # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
         except Exception as e:
             traceback.print_exc()
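
Reviewer note: the new stall handling above kills the start_dataflow_attachment workers when nothing was processed in the last ten minutes while the queue is still non-empty, relying on an external supervisor to respawn them. A condensed sketch of that pattern, assuming psutil and some supervisor (systemd, a respawning shell loop, etc.):

    import os
    import re
    import psutil

    def kill_workers(command_pattern):
        # kill every process whose command line matches the pattern;
        # the supervisor is expected to bring the workers back up
        pids = []
        for pid in psutil.pids():
            try:
                cmdline = " ".join(psutil.Process(pid).cmdline())
            except psutil.Error:
                continue
            if re.search(command_pattern, cmdline):
                pids.append(pid)
        if pids:  # avoid issuing a bare `kill -9` when nothing matched
            os.system("kill -9 " + " ".join(str(p) for p in pids))

    # hypothetical usage, mirroring the monitor above:
    # kill_workers("start_dataflow_attachment")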
 
     def monitor_extract(self):
-        from BaseDataMaintenance.java.MQInfo import getAllQueueSize,getQueueSize
+        from BaseDataMaintenance.java.MQInfo import getAllQueueSize, getQueueSize
         try:
             # query = BoolQuery(must_queries=[
             #     RangeQuery("status",11,61),
@@ -300,31 +326,32 @@ class BaseDataMonitor():
             #                                                                     SearchQuery(query,None,True),
             #                                                                     columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
             total_count_init = getQueueSize("dataflow_init")
-            if total_count_init>=100:
-                _msg = "同步队列报警,有%s条数据滞留"%(str(total_count_init))
-                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
+            if total_count_init >= 100:
+                _msg = "同步队列报警,有%s条数据滞留" % (str(total_count_init))
+                sentMsgToDD(_msg, ACCESS_TOKEN_DATAWORKS, atAll=True)
                 # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
-
             total_count_todeal = getQueueSize("dataflow_extract")
 
-            if total_count_todeal>500:
-                _cmd = 'cat %s | grep "%s" | grep -c "process.*docid"'%(flow_extract_log_path,self.get_last_tenmin_time())
+            if total_count_todeal > 500:
+                _cmd = 'cat %s | grep "%s" | grep -c "process.*docid"' % (
+                flow_extract_log_path, self.get_last_tenmin_time())
                 log(_cmd)
                 process_count = self.cmd_execute(_cmd)
-                _cmd = 'cat %s | grep "%s" | grep -c "process.*docid.*1$"'%(flow_extract_log_path,self.get_last_tenmin_time())
+                _cmd = 'cat %s | grep "%s" | grep -c "process.*docid.*1$"' % (
+                flow_extract_log_path, self.get_last_tenmin_time())
                 log(_cmd)
                 success_count = self.cmd_execute(_cmd)
 
-                _cmd = 'cat %s | grep "%s" | grep -c "fingerprint.*exists docid"'%(flow_extract_log_path,self.get_last_tenmin_time())
+                _cmd = 'cat %s | grep "%s" | grep -c "fingerprint.*exists docid"' % (
+                flow_extract_log_path, self.get_last_tenmin_time())
                 log(_cmd)
                 exists_count = self.cmd_execute(_cmd)
 
-                _cmd = 'cat %s | grep -c "%s.*delete sql"'%(flow_init_path,self.get_last_tenmin_time())
+                _cmd = 'cat %s | grep -c "%s.*delete sql"' % (flow_init_path, self.get_last_tenmin_time())
                 log(_cmd)
                 init_count = self.cmd_execute(_cmd)
 
-
                 # query = BoolQuery(must_queries=[
                 #                                                                                RangeQuery("crtime",self.get_last_tenmin_time(16))
                 #                                                                            ])
@@ -342,13 +369,14 @@ class BaseDataMonitor():
                 #                                                                              SearchQuery(query,None,True),
                 #                                                                              columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
 
-                _msg = "要素提取队列报警:队列堆积%s条公告,最近十分钟入库数:%s,最近十分钟处理公告数:%s,其中成功处理数:%s,查库免提取数:%s"%(str(total_count_todeal),str(init_count),str(process_count),str(success_count),str(exists_count))
+                _msg = "要素提取队列报警:队列堆积%s条公告,最近十分钟入库数:%s,最近十分钟处理公告数:%s,其中成功处理数:%s,查库免提取数:%s" % (
+                str(total_count_todeal), str(init_count), str(process_count), str(success_count), str(exists_count))
                 log(_msg)
-                atAll=False
-                if success_count==0:
-                    atAll=True
+                atAll = False
+                if success_count == 0:
+                    atAll = True
                     _msg += "\n疑似流程出现问题,请相关负责人尽快处理"
-                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=atAll)
+                sentMsgToDD(_msg, ACCESS_TOKEN_DATAWORKS, atAll=atAll)
                 # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
         except Exception as e:
             traceback.print_exc()
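
Reviewer note: monitor_extract shells out to `cat ... | grep -c` four times to count log lines from the last ten minutes. An equivalent pure-Python counter, shown only as a sketch (flow_extract_log_path and the patterns are the ones used above):

    import re

    def count_matching_lines(log_path, time_prefix, pattern):
        # pure-Python equivalent of: cat log | grep "time" | grep -c "pattern"
        count = 0
        with open(log_path, "r", encoding="utf8") as f:
            for line in f:
                if time_prefix in line and re.search(pattern, line):
                    count += 1
        return count

    # hypothetical usage with a literal ten-minute prefix:
    # process_count = count_matching_lines(flow_extract_log_path,
    #                                      "2024-06-07 10:1", r"process.*docid")
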
@@ -358,107 +386,117 @@ class BaseDataMonitor():
 
         current_hour = getCurrent_date("%H")
 
-
         query = BoolQuery(must_queries=[
-            RangeQuery("update_time",current_date),
-            WildcardQuery("docids","*")
+            RangeQuery("update_time", current_date),
+            WildcardQuery("docids", "*")
         ])
 
-        rows,next_token,total_count_doc,is_all_succeed = self.ots_client.search("designed_project","designed_project_index",
-                                                                                   SearchQuery(query,None,True),
-                                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
-
+        rows, next_token, total_count_doc, is_all_succeed = self.ots_client.search("designed_project",
+                                                                                   "designed_project_index",
+                                                                                   SearchQuery(query, None, True),
+                                                                                   columns_to_get=ColumnsToGet(
+                                                                                       return_type=ColumnReturnType.NONE))
 
         query = BoolQuery(must_queries=[
-            RangeQuery("update_time",current_date),
-            WildcardQuery("spids","*")
+            RangeQuery("update_time", current_date),
+            WildcardQuery("spids", "*")
         ])
 
-        rows,next_token,total_count_sp,is_all_succeed = self.ots_client.search("designed_project","designed_project_index",
-                                                                            SearchQuery(query,None,True),
-                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+        rows, next_token, total_count_sp, is_all_succeed = self.ots_client.search("designed_project",
+                                                                                  "designed_project_index",
+                                                                                  SearchQuery(query, None, True),
+                                                                                  columns_to_get=ColumnsToGet(
+                                                                                      return_type=ColumnReturnType.NONE))
 
-        total_count = total_count_doc+total_count_sp
-        if not (current_hour in ("00","23","24")):
-            _msg = "拟在建生成报警:当天生成的拟在建数量为:%d,其中公告生成:%d,审批项目生成:%d"%(total_count,total_count_doc,total_count_sp)
-            atAll=False
-            if total_count==0:
-                atAll=True
+        total_count = total_count_doc + total_count_sp
+        if not (current_hour in ("00", "23", "24")):
+            _msg = "拟在建生成报警:当天生成的拟在建数量为:%d,其中公告生成:%d,审批项目生成:%d" % (total_count, total_count_doc, total_count_sp)
+            atAll = False
+            if total_count == 0:
+                atAll = True
                 _msg += "\n疑似流程出现问题,请相关负责人尽快处理"
-            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=atAll)
+            sentMsgToDD(_msg, ACCESS_TOKEN_DATAWORKS, atAll=atAll)
             # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
-
     def monitor_sychr(self):
         current_date = getCurrent_date("%Y-%m-%d")
 
-        last_date = timeAdd(current_date,-1,"%Y-%m-%d")
+        last_date = timeAdd(current_date, -1, "%Y-%m-%d")
 
         query = BoolQuery(must_queries=[
-            RangeQuery("status",*flow_sychro_status_from,True,True),
+            RangeQuery("status", *flow_sychro_status_from, True, True),
         ])
 
-        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
-                                                                            SearchQuery(query,None,True),
-                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+        rows, next_token, total_count, is_all_succeed = self.ots_client.search("document_tmp", "document_tmp_index",
+                                                                               SearchQuery(query, None, True),
+                                                                               columns_to_get=ColumnsToGet(
+                                                                                   return_type=ColumnReturnType.NONE))
 
-        if total_count>=200:
-            _msg = "数据流报警:待同步到成品表公告数为:%d"%(total_count)
-            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+        if total_count >= 200:
+            _msg = "数据流报警:待同步到成品表公告数为:%d" % (total_count)
+            sentMsgToDD(_msg, ACCESS_TOKEN_DATAWORKS)
             # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
     def monitor_preproject(self):
         current_date = getCurrent_date("%Y-%m-%d")
 
-        last_date = timeAdd(current_date,-7,"%Y-%m-%d")
+        last_date = timeAdd(current_date, -7, "%Y-%m-%d")
 
         query = BoolQuery(must_queries=[
-            RangeQuery("crtime",last_date),
+            RangeQuery("crtime", last_date),
         ])
 
-        rows,next_token,total_count,is_all_succeed = self.ots_client.search("preproject","preproject_index",
-                                                                            SearchQuery(query,None,True),
-                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
-        log("preproject count:%d"%(total_count))
-        if total_count<=10*10000:
-            _msg = "周期项目生成报警:最近一周生成/更新的周期项目数量为:%d"%(total_count)
-            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+        rows, next_token, total_count, is_all_succeed = self.ots_client.search("preproject", "preproject_index",
+                                                                               SearchQuery(query, None, True),
+                                                                               columns_to_get=ColumnsToGet(
+                                                                                   return_type=ColumnReturnType.NONE))
+        log("preproject count:%d" % (total_count))
+        if total_count <= 10 * 10000:
+            _msg = "周期项目生成报警:最近一周生成/更新的周期项目数量为:%d" % (total_count)
+            sentMsgToDD(_msg, ACCESS_TOKEN_DATAWORKS)
             # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
     def monitor_dumplicate(self):
 
         current_date = getCurrent_date("%Y-%m-%d")
 
-        last_date = timeAdd(current_date,-1,"%Y-%m-%d")
+        last_date = timeAdd(current_date, -1, "%Y-%m-%d")
 
         query = BoolQuery(must_queries=[
-            TermQuery("page_time",last_date),
-            RangeQuery("status",71),
+            TermQuery("page_time", last_date),
+            RangeQuery("status", 71),
 
         ])
 
-        rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
-                                                                            SearchQuery(query,None,True),
-                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+        rows, next_token, total_count_lastday, is_all_succeed = self.ots_client.search("document_tmp",
+                                                                                       "document_tmp_index",
+                                                                                       SearchQuery(query, None, True),
+                                                                                       columns_to_get=ColumnsToGet(
+                                                                                           return_type=ColumnReturnType.NONE))
 
         query = BoolQuery(must_queries=[
-            RangeQuery("status",66,71),
+            RangeQuery("status", 66, 71),
 
         ])
 
-        rows,next_token,total_count_to_dump,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
-                                                                                    SearchQuery(query,None,True),
-                                                                                    columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+        rows, next_token, total_count_to_dump, is_all_succeed = self.ots_client.search("document_tmp",
+                                                                                       "document_tmp_index",
+                                                                                       SearchQuery(query, None, True),
+                                                                                       columns_to_get=ColumnsToGet(
+                                                                                           return_type=ColumnReturnType.NONE))
 
         query = BoolQuery(must_queries=[
-            TermQuery("page_time",last_date),
-            RangeQuery("status",71),
-            TermQuery("save",0)
+            TermQuery("page_time", last_date),
+            RangeQuery("status", 71),
+            TermQuery("save", 0)
         ])
 
-        rows,next_token,total_count_lastday_dump,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
-                                                                                    SearchQuery(query,None,True),
-                                                                                    columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+        rows, next_token, total_count_lastday_dump, is_all_succeed = self.ots_client.search("document_tmp",
+                                                                                            "document_tmp_index",
+                                                                                            SearchQuery(query, None,
+                                                                                                        True),
+                                                                                            columns_to_get=ColumnsToGet(
+                                                                                                return_type=ColumnReturnType.NONE))
         # if total_count_lastday_dump/total_count_lastday<0.2:
         #     _msg = "公告去重报警,%s入库公告数:%d,其中去重数:%d,去重率:%.2f"%(last_date,total_count_lastday,total_count_lastday_dump,total_count_lastday_dump/total_count_lastday)
         #     sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
@@ -469,97 +507,122 @@ class BaseDataMonitor():
         #     sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
         #     # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
-        #Monitor the finished-product table
+        # Monitor the finished-product table
         query = BoolQuery(must_queries=[
-            TermQuery("page_time",last_date),
+            TermQuery("page_time", last_date),
         ])
 
-        rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document","document_index",
-                                                                                    SearchQuery(query,None,True),
-                                                                                    columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+        rows, next_token, total_count_lastday, is_all_succeed = self.ots_client.search("document", "document_index",
+                                                                                       SearchQuery(query, None, True),
+                                                                                       columns_to_get=ColumnsToGet(
+                                                                                           return_type=ColumnReturnType.NONE))
         query = BoolQuery(must_queries=[
-            TermQuery("page_time",last_date),
-            RangeQuery("status",401,451),
+            TermQuery("page_time", last_date),
+            RangeQuery("status", 401, 451),
         ])
 
-        rows,next_token,total_count_lastday_dump,is_all_succeed = self.ots_client.search("document","document_index",
-                                                                                         SearchQuery(query,None,True),
-                                                                                         columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
-        if total_count_lastday_dump/total_count_lastday<0.2:
-            _msg = "公告成品去重报警,%s入库公告数:%d,其中去重数:%d,去重率:%.2f"%(last_date,total_count_lastday,total_count_lastday_dump,total_count_lastday_dump/total_count_lastday)
-            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+        rows, next_token, total_count_lastday_dump, is_all_succeed = self.ots_client.search("document",
+                                                                                            "document_index",
+                                                                                            SearchQuery(query, None,
+                                                                                                        True),
+                                                                                            columns_to_get=ColumnsToGet(
+                                                                                                return_type=ColumnReturnType.NONE))
+        if total_count_lastday > 0 and total_count_lastday_dump / total_count_lastday < 0.2:
+            _msg = "公告成品去重报警,%s入库公告数:%d,其中去重数:%d,去重率:%.2f" % (
+            last_date, total_count_lastday, total_count_lastday_dump, total_count_lastday_dump / total_count_lastday)
+            sentMsgToDD(_msg, ACCESS_TOKEN_DATAWORKS)
             # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
         query = BoolQuery(must_queries=[
-            RangeQuery("status",*flow_dumplicate_status_from,True,True),
+            RangeQuery("status", *flow_dumplicate_status_from, True, True),
         ])
 
-        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
-                                                                            SearchQuery(query,None,True),
-                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
-
+        rows, next_token, total_count, is_all_succeed = self.ots_client.search("document_tmp", "document_tmp_index",
+                                                                               SearchQuery(query, None, True),
+                                                                               columns_to_get=ColumnsToGet(
+                                                                                   return_type=ColumnReturnType.NONE))
 
-        if total_count>=1000:
-            _cmd = 'cat %s | grep -c "%s.*merge_project whole_time"'%(flow_dumplicate_log_path,self.get_last_tenmin_time())
+        if total_count >= 1000:
+            _cmd = 'cat %s | grep -c "%s.*merge_project whole_time"' % (
+            flow_dumplicate_log_path, self.get_last_tenmin_time())
             process_count = self.cmd_execute(_cmd)
             atAll = False
-            if process_count=="":
+            if process_count == "":
                 process_count = 0
 
             query = BoolQuery(must_queries=[
-                RangeQuery("status",flow_dumplicate_status_from[1]),
-                RangeQuery("opertime",self.get_last_tenmin_time())
+                RangeQuery("status", flow_dumplicate_status_from[1]),
+                RangeQuery("opertime", self.get_last_tenmin_time())
             ])
 
-            rows,next_token,total_count_oper,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
-                                                                                SearchQuery(query,None,True),
-                                                                                columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+            rows, next_token, total_count_oper, is_all_succeed = self.ots_client.search("document_tmp",
+                                                                                        "document_tmp_index",
+                                                                                        SearchQuery(query, None, True),
+                                                                                        columns_to_get=ColumnsToGet(
+                                                                                            return_type=ColumnReturnType.NONE))
 
-            if int(process_count)==0:
-                if total_count_oper==0:
+            if int(process_count) == 0:
+                if total_count_oper == 0:
                     atAll = True
-                _cmd = "echo `tail %s -c 10000k` > %s"%(flow_dumplicate_log_path,flow_dumplicate_log_path)
+                _cmd = "echo `tail %s -c 10000k` > %s" % (flow_dumplicate_log_path, flow_dumplicate_log_path)
                 self.cmd_execute(_cmd)
             # if int(process_count)>0 and int(process_count)<100:
             #     self.cmd_execute("ps -ef | grep dumplicate | grep -v grep|cut -c 9-15|xargs kill -9")
-            _msg = "数据流报警:待去重公告数为:%d,最近十分钟日志去重数为:%s,ots去重数为:%s"%(total_count,str(process_count),str(total_count_oper))
-            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=atAll)
+            _msg = "数据流报警:待去重公告数为:%d,最近十分钟日志去重数为:%s,ots去重数为:%s" % (
+            total_count, str(process_count), str(total_count_oper))
+            sentMsgToDD(_msg, ACCESS_TOKEN_DATAWORKS, atAll=atAll)
             # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
-
     def monitor_merge(self):
         current_date = getCurrent_date("%Y-%m-%d")
-        last_date = timeAdd(current_date,-1,"%Y-%m-%d")
+        last_date = timeAdd(current_date, -1, "%Y-%m-%d")
         check_count = 20000
         query = BoolQuery(must_queries=[
-            RangeQuery("status",201,301),
-            RangeQuery("docchannel",0,300)
+            RangeQuery("status", 201, 301),
+            RangeQuery("docchannel", 0, 300)
         ])
         list_doc = []
         queue_docid = Queue()
-        rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document","document_index",
-                                                                                    SearchQuery(query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),get_total_count=True),
-                                                                                    columns_to_get=ColumnsToGet(["docchannel"],return_type=ColumnReturnType.SPECIFIED))
+        rows, next_token, total_count_lastday, is_all_succeed = self.ots_client.search("document", "document_index",
+                                                                                       SearchQuery(query, sort=Sort(
+                                                                                           sorters=[FieldSort("docid",
+                                                                                                              SortOrder.DESC)]),
+                                                                                                   get_total_count=True),
+                                                                                       columns_to_get=ColumnsToGet(
+                                                                                           ["docchannel"],
+                                                                                           return_type=ColumnReturnType.SPECIFIED))
         list_data = getRow_ots(rows)
         list_doc.extend(list_data)
-        while next_token and len(list_doc)<check_count:
-            rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document","document_index",
-                                                                                        SearchQuery(query,next_token=next_token,limit=100,get_total_count=True),
-                                                                                        columns_to_get=ColumnsToGet(["docchannel"],return_type=ColumnReturnType.SPECIFIED))
+        while next_token and len(list_doc) < check_count:
+            rows, next_token, total_count_lastday, is_all_succeed = self.ots_client.search("document", "document_index",
+                                                                                           SearchQuery(query,
+                                                                                                       next_token=next_token,
+                                                                                                       limit=100,
+                                                                                                       get_total_count=True),
+                                                                                           columns_to_get=ColumnsToGet(
+                                                                                               ["docchannel"],
+                                                                                               return_type=ColumnReturnType.SPECIFIED))
             list_data = getRow_ots(rows)
             list_doc.extend(list_data)
         for _doc in list_doc:
             queue_docid.put(_doc)
-        def _handle(item,result_queue):
+
+        def _handle(item, result_queue):
             docid = item.get("docid")
-            _query = BoolQuery(must_queries=[TermQuery("docids",str(docid))])
-            rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("project2","project2_index",
-                                                                                        SearchQuery(_query,None,limit=100,get_total_count=True),
-                                                                                        columns_to_get=ColumnsToGet(["zhao_biao_page_time","zhong_biao_page_time","docid_number"],return_type=ColumnReturnType.SPECIFIED))
+            _query = BoolQuery(must_queries=[TermQuery("docids", str(docid))])
+            rows, next_token, total_count_lastday, is_all_succeed = self.ots_client.search("project2", "project2_index",
+                                                                                           SearchQuery(_query, None,
+                                                                                                       limit=100,
+                                                                                                       get_total_count=True),
+                                                                                           columns_to_get=ColumnsToGet(
+                                                                                               ["zhao_biao_page_time",
+                                                                                                "zhong_biao_page_time",
+                                                                                                "docid_number"],
+                                                                                               return_type=ColumnReturnType.SPECIFIED))
             list_data = getRow_ots(rows)
             item["projects"] = list_data
 
-        mt = MultiThreadHandler(queue_docid,_handle,None,30)
+        mt = MultiThreadHandler(queue_docid, _handle, None, 30)
         mt.run()
 
         not_find_count = 0
@@ -569,87 +632,154 @@ class BaseDataMonitor():
         list_notfind = []
         list_multi = []
         for _doc in list_doc:
-            if len(_doc.get("projects",[]))==0:
+            if len(_doc.get("projects", [])) == 0:
                 not_find_count += 1
                 list_notfind.append(str(_doc.get("docid")))
-            if len(_doc.get("projects",[]))>=2:
+            if len(_doc.get("projects", [])) >= 2:
                 multi_count += 1
                 list_multi.append(str(_doc.get("docid")))
-            if _doc.get("docchannel") in (101,118,119,120,121,122):
+            if _doc.get("docchannel") in (101, 118, 119, 120, 121, 122):
                 zhongbiao_count += 1
-                if len(_doc.get("projects",[]))==1:
+                if len(_doc.get("projects", [])) == 1:
                     _project = _doc.get("projects")[0]
-                    if _project.get("zhao_biao_page_time","")!="":
+                    if _project.get("zhao_biao_page_time", "") != "":
                         zhongbiao_find_zhaobiao += 1
 
-        _ratio = zhongbiao_find_zhaobiao/zhongbiao_count if zhongbiao_count>0 else 0
+        _ratio = zhongbiao_find_zhaobiao / zhongbiao_count if zhongbiao_count > 0 else 0
 
-        if not_find_count>0 or multi_count>0 or _ratio<0.8:
-            if not_find_count>0 or multi_count>0:
+        if not_find_count > 0 or multi_count > 0 or _ratio < 0.8:
+            if not_find_count > 0 or multi_count > 0:
                 current_time = getCurrent_date(format="%Y-%m-%d_%H%M%S")
-                logname = os.path.join(self.current_path,"log","%s.log"%current_time)
-                with open(logname,"w",encoding="utf8") as f:
+                logname = os.path.join(self.current_path, "log", "%s.log" % current_time)
+                with open(logname, "w", encoding="utf8") as f:
                     f.write(",".join(list_notfind))
                     f.write("\n")
                     f.write(",".join(list_multi))
-                _msg = "公告合并报警:近%d条成品公告,有%d条未生成项目,有%d条公告找到多个项目,详见%s;其中有%d条中标公告且找到招标公告数为%d,比率为%.3f"%(check_count,not_find_count,multi_count,logname,zhongbiao_count,zhongbiao_find_zhaobiao,_ratio)
-                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+                _msg = "公告合并报警:近%d条成品公告,有%d条未生成项目,有%d条公告找到多个项目,详见%s;其中有%d条中标公告且找到招标公告数为%d,比率为%.3f" % (
+                check_count, not_find_count, multi_count, logname, zhongbiao_count, zhongbiao_find_zhaobiao, _ratio)
+                sentMsgToDD(_msg, ACCESS_TOKEN_DATAWORKS)
                 # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
             else:
-                _msg = "公告合并报警:近%d条成品公告其中有%d条中标公告且找到招标公告数为%d,比率为%.3f"%(check_count,zhongbiao_count,zhongbiao_find_zhaobiao,zhongbiao_find_zhaobiao/zhongbiao_count)
-                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+                _msg = "公告合并报警:近%d条成品公告其中有%d条中标公告且找到招标公告数为%d,比率为%.3f" % (
+                check_count, zhongbiao_count, zhongbiao_find_zhaobiao, _ratio)
+                sentMsgToDD(_msg, ACCESS_TOKEN_DATAWORKS)
                 # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
-
         query = BoolQuery(must_queries=[
-            TermQuery("page_time",last_date),
-            RangeQuery("status",71),
+            TermQuery("page_time", last_date),
+            RangeQuery("status", 71),
 
         ])
 
-        rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
-                                                                                    SearchQuery(query,None,True),
-                                                                                    columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
-        if total_count_lastday<10*10000:
-            _msg = "公告成品入库报警,%s入库公告数:%d"%(last_date,total_count_lastday)
-            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+        rows, next_token, total_count_lastday, is_all_succeed = self.ots_client.search("document_tmp",
+                                                                                       "document_tmp_index",
+                                                                                       SearchQuery(query, None, True),
+                                                                                       columns_to_get=ColumnsToGet(
+                                                                                           return_type=ColumnReturnType.NONE))
+        if total_count_lastday < 10 * 10000:
+            _msg = "公告成品入库报警,%s入库公告数:%d" % (last_date, total_count_lastday)
+            sentMsgToDD(_msg, ACCESS_TOKEN_DATAWORKS)
             # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
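
Reviewer note: monitor_merge pages through the search index by re-issuing the query with next_token until check_count rows are collected. The same pattern, factored into a generator sketch (built only from the tablestore calls and the getRow_ots helper already used above):

    def iter_search(ots_client, table, index, query, columns, limit=100, max_rows=20000):
        # yield rows from an OTS search index, following next_token pagination
        search_query = SearchQuery(query, limit=limit, get_total_count=True)
        fetched = 0
        while True:
            rows, next_token, _total, _ok = ots_client.search(
                table, index, search_query,
                columns_to_get=ColumnsToGet(columns, return_type=ColumnReturnType.SPECIFIED))
            for row in getRow_ots(rows):
                yield row
                fetched += 1
                if fetched >= max_rows:
                    return
            if not next_token:
                return
            search_query = SearchQuery(query, next_token=next_token, limit=limit,
                                       get_total_count=True)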
 
-
     def start_monitor(self):
-        #Data monitoring
+        # Data monitoring
 
         scheduler = BlockingScheduler()
 
         # scheduler.add_job(self.monitor_attachment,"cron",minute="*/10")
-        scheduler.add_job(self.monitor_extract,"cron",minute="*/10")
-        scheduler.add_job(self.monitor_proposedBuilding,"cron",hour="*/11")
+        scheduler.add_job(self.monitor_extract, "cron", minute="*/10")
+        scheduler.add_job(self.monitor_proposedBuilding, "cron", hour="*/11")
         # scheduler.add_job(self.monitor_dumplicate,"cron",minute="*/10")
-        scheduler.add_job(self.monitor_sychr,"cron",minute="*/10")
-        scheduler.add_job(self.monitor_preproject,"cron",hour="8")
-        scheduler.add_job(self.monitor_merge,"cron",minute="*/60")
-        scheduler.add_job(self.monitor_init,"cron",hour="*/3")
+        scheduler.add_job(self.monitor_sychr, "cron", minute="*/10")
+        scheduler.add_job(self.monitor_preproject, "cron", hour="8")
+        scheduler.add_job(self.monitor_merge, "cron", minute="*/60")
+        scheduler.add_job(self.monitor_init, "cron", hour="*/3")
         scheduler.start()
 
-
     def start_attach_monitor(self):
-        #Attachment monitoring
+        # Attachment monitoring
         scheduler = BlockingScheduler()
 
-        scheduler.add_job(self.monitor_attachment,"cron",minute="*/10")
+        scheduler.add_job(self.monitor_attachment, "cron", minute="*/10")
         scheduler.start()
 
 
-if __name__ == '__main__':
+def get_pid_list_by_command(command):
+    pid_list = psutil.pids()
+    new_pid_list = []
+    for pid in pid_list:
+        try:
+            process = psutil.Process(pid)
+        except Exception:
+            continue
+        process_cmd = ''
+        for c in process.cmdline():
+            process_cmd += c + " "
+        if process_cmd.strip() == "":
+            continue
+
+        if re.search(str(command), process_cmd):
+            new_pid_list.append(pid)
+
+    new_pid_list.sort()
+    return new_pid_list
+
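
Reviewer note: get_pid_list_by_command greps the full command line of every live process, so the argument is treated as a regular expression, not a literal substring; metacharacters in it are significant. Hypothetical usage:

    # find the attachment workers, sorted by pid
    pids = get_pid_list_by_command(r"start_dataflow_attachment")
    print(pids)  # e.g. [1234, 1240] -- hypothetical output
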
+
+def monitor_convert_interface():
+    main_pid_list = get_pid_list_by_command('convert:app')
+
+    main_pid_list.sort()
+    # print('main_pid_list', main_pid_list)
+
+    now = datetime.now()
+    last_10_min = now - timedelta(minutes=10)
+    now = now.strftime("%Y-%m-%d %H:%M:%S")
+    last_10_min = last_10_min.strftime("%Y-%m-%d %H:%M:%S")
+    now = now[:-4] + '0:00'
+    last_10_min = last_10_min[:-4] + '0:00'
+
+    command = "sed -n '/%s/,/%s/p' /convert.out" % (last_10_min, now)
+    # print('command', command)
+
+    result = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, shell=True)
+    text = result.stdout
+    if len(text) < 10:
+        return None
+    text = str(text).split('\n')
+
+    all_free_time = 0
+    for pid in main_pid_list:
+        # print('pid', pid)
+        time_len = len('2024-06-07 10:15:12')
+        time_finish = None
+        free_time = 0
+        for line in text:
+            line = str(line)
+            try:
+                if re.search(str(pid), line):
+                    if time_finish is not None and re.search('into convert', line):
+                        free_time += (datetime.strptime(line[:time_len], "%Y-%m-%d %H:%M:%S")-time_finish).seconds
+                        # print('time_finish', str(time_finish), 'time_start', line[:time_len])
+                        # print('add free time', free_time)
+                    if re.search('is_success', line):
+                        time_finish = datetime.strptime(line[:time_len], "%Y-%m-%d %H:%M:%S")
+                        # print('set time_finish', line[:time_len])
+            except Exception:
+                continue
+
+        all_free_time += free_time
+        # print(pid, 'free time in 10 min:', free_time)
+    log('all_free_time ' + str(all_free_time))
+    log('len(main_pid_list) ' + str(len(main_pid_list)))
+    if not main_pid_list:
+        return None
+    return round(all_free_time / len(main_pid_list) * 2, 2)
+
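
Reviewer note: monitor_convert_interface measures idle time per worker pid as the gap between an "is_success" line (previous request finished) and the next "into convert" line (next request started). A toy illustration of the gap arithmetic (both log lines are hypothetical):

    from datetime import datetime

    TIME_LEN = len("2024-06-07 10:15:12")

    finished = "2024-06-07 10:15:12 pid 4321 ... is_success True"
    started = "2024-06-07 10:15:40 pid 4321 into convert"

    t_finish = datetime.strptime(finished[:TIME_LEN], "%Y-%m-%d %H:%M:%S")
    t_start = datetime.strptime(started[:TIME_LEN], "%Y-%m-%d %H:%M:%S")
    print((t_start - t_finish).seconds)  # 28 seconds of idle time
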
 
+if __name__ == '__main__':
     # dm = BaseDataMonitor()
     # # dm.start_monitor()
     # log_filename = "C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-03.log"
     # dm.check_document_uuid(log_filename)
 
-    sentMsgToDD("报警test_msg",ACCESS_TOKEN_DATAWORKS)
+    sentMsgToDD("报警test_msg", ACCESS_TOKEN_DATAWORKS)
     # dm.monitor_proposedBuilding()
     # print(dm.get_last_tenmin_time(16))
-
-
-
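
Reviewer note: both start_monitor and start_attach_monitor use a BlockingScheduler, so each must run in its own process. A hypothetical entry point for the attachment side (the production settings imported at the top of the module are assumed to be in place):

    if __name__ == '__main__':
        dm = BaseDataMonitor()
        # dm.start_monitor()        # data-flow monitors; blocks this process
        dm.start_attach_monitor()   # attachment monitor, fires every 10 minutes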