
Merge branch 'master' of http://192.168.2.103:3000/luojiehua/BaseDataMaintenance

 Conflicts:
	BaseDataMaintenance/maintenance/document/download_attachment_and_set_status_rerun.py
znj 1 year ago
parent
commit
db9a5012a6

+ 1 - 1
BaseDataMaintenance/common/multiThread.py

@@ -46,7 +46,7 @@ class _taskHandler(threading.Thread):
     def run(self):
         while(True):
             try:
-                logging.info("handler task queue size is %d need_stop %s thread_id:%d-%d"%(self.task_queue.qsize(),str(self.need_stop),os.getpid(),threading.get_ident()))
+                logging.info("%s - handler task queue size is %d need_stop %s thread_id:%d-%d"%(self.task_handler.__name__, self.task_queue.qsize(),str(self.need_stop),os.getpid(),threading.get_ident()))
                 item = self.task_queue.get(True,timeout=5)
 
                 self.task_handler(item,self.result_queue,*self.args,**self.kwargs)
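
The one-line change above prepends the handler function's __name__, so the periodic queue-size log can be attributed to a specific task handler when several _taskHandler threads write to the same log. A minimal sketch of the resulting line, assuming a hypothetical handler called attachment_task_handler:

import logging, os, threading
from queue import Queue

logging.basicConfig(level=logging.INFO)

def attachment_task_handler(item, result_queue):  # hypothetical handler, not project code
    result_queue.put(item)

task_queue, need_stop = Queue(), False
logging.info("%s - handler task queue size is %d need_stop %s thread_id:%d-%d" % (
    attachment_task_handler.__name__, task_queue.qsize(), str(need_stop),
    os.getpid(), threading.get_ident()))
# e.g. "attachment_task_handler - handler task queue size is 0 need_stop False thread_id:1234-140532..."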

+ 14 - 9
BaseDataMaintenance/dataMonitor/data_monitor.py

@@ -1,13 +1,8 @@
-
-
 import os,sys
 from apscheduler.schedulers.blocking import BlockingScheduler
-
 from BaseDataMaintenance.dataSource.source import getConnect_ots,getConnect_activateMQ
-
 from BaseDataMaintenance.dataSource.interface import *
 from BaseDataMaintenance.common.Utils import *
-
 from tablestore import *
 from BaseDataMaintenance.dataSource.setttings import *
 from queue import Queue
@@ -209,7 +204,7 @@ class BaseDataMonitor():
             #                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
             total_count_todeal = getQueueSize("dataflow_attachment")
 
-            if total_count_todeal>500:
+            if total_count_todeal>1000:
                 # query = BoolQuery(must_queries=[
                 #     RangeQuery("crtime",self.get_last_tenmin_time(16))
                 # ])
@@ -248,7 +243,7 @@ class BaseDataMonitor():
 
                 #通过读取文件获取日志情况
                 dict_type = {}
-                _pattern = "%s.*process filemd5\:[^\s]* (?P<result>(True|False)) of type\:(?P<type>[^\s]*).*recognize takes (?P<costtime>\d+)s"%(re.escape(self.get_last_tenmin_time()))
+                _pattern = "%s.*process filemd5\:[^\s]* (?P<result>(True|False)) of type\:(?P<type>[^\s]*).*download:(?P<downloadtime>\d+\.\d+)s recognize takes (?P<costtime>\d+)s upload takes (?P<uploadtime>\d+\.\d+)s"%(re.escape(self.get_last_tenmin_time()))
                 with open(flow_attachment_log_path,"r",encoding="utf8") as f:
                     while True:
                         line = f.readline()
@@ -258,15 +253,25 @@ class BaseDataMonitor():
                         if _match is not None:
                             _type = _match.groupdict().get("type")
                             _result = _match.groupdict().get("result")
+                            _downtime = _match.groupdict().get("downloadtime")
                             _costtime = _match.groupdict().get("costtime")
+
+                            _uploadtime = _match.groupdict().get("uploadtime")
                             if _type not in dict_type:
-                                dict_type[_type] = {"success":0,"fail":0,"success_costtime":0,"fail_costtime":0}
+                                dict_type[_type] = {"success":0,"fail":0,"success_costtime":0,"fail_costtime":0,"downtime":0,"downcount":0,"uploadtime":0,"uploadcount":0}
                             if _result=="True":
                                 dict_type[_type]["success"] += 1
                                 dict_type[_type]["success_costtime"] += int(_costtime)
+
                             else:
                                 dict_type[_type]["fail"] += 1
                                 dict_type[_type]["fail_costtime"] += int(_costtime)
+                            if float(_downtime)>0:
+                                dict_type[_type]["downcount"] += 1
+                                dict_type[_type]["downtime"] += float(_downtime)
+                            if float(_uploadtime)>0:
+                                dict_type[_type]["uploadcount"] += 1
+                                dict_type[_type]["uploadtime"] += float(_uploadtime)
 
                 process_count = 0
                 process_succeed_count = 0
@@ -274,7 +279,7 @@ class BaseDataMonitor():
                 for k,v in dict_type.items():
                     process_count += v.get("success",0)+v.get("fail",0)
                     process_succeed_count += v.get("success",0)
-                    _msg_type += "\n类型%s\n\t成功%s,消耗%s秒,%.2f秒/个,\n\t失败%s,消耗%s秒,%.2f秒/个"%(k,str(v.get("success")),str(v.get("success_costtime")),v.get("success_costtime")/max(1,v.get("success")),str(v.get("fail")),str(v.get("fail_costtime")),v.get("fail_costtime")/max(1,v.get("fail")))
+                    _msg_type += "\n类型%s\n\t成功%s,消耗%s秒,%.2f秒/个,\n\t失败%s,消耗%s秒,%.2f秒/个,\n\t下载%s,消耗%s秒,%.2f秒/个,\n\t上传%s,消耗%s秒,%.2f秒/个"%(k,str(v.get("success")),str(v.get("success_costtime")),v.get("success_costtime")/max(1,v.get("success")),str(v.get("fail")),str(v.get("fail_costtime")),v.get("fail_costtime")/max(1,v.get("fail")),str(v.get("downcount")),str(v.get("downtime")),v.get("downtime")/max(1,v.get("downcount")),str(v.get("uploadcount")),str(v.get("uploadtime")),v.get("uploadtime")/max(1,v.get("uploadcount")))
 
                 _msg = "附件提取队列报警:队列堆积%s条公告,最近十分钟处理公告附件数:%s,处理成功数:%s"%(str(total_count_todeal),str(process_count),str(process_succeed_count))
                 log(_msg)
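
The extended pattern additionally captures the download and upload durations that the attachment flow now writes into its log line (see the dataflow_mq.py change further down). A hedged, self-contained check against a fabricated log line in that format:

import re
prefix = "2024-05-21 10:1"  # hypothetical return value of self.get_last_tenmin_time()
_pattern = "%s.*process filemd5\:[^\s]* (?P<result>(True|False)) of type\:(?P<type>[^\s]*).*download:(?P<downloadtime>\d+\.\d+)s recognize takes (?P<costtime>\d+)s upload takes (?P<uploadtime>\d+\.\d+)s"%(re.escape(prefix))
line = "2024-05-21 10:12:01 INFO process filemd5:abc123 True of type:pdf with size:0.532M download:0.41s recognize takes 3s upload takes 0.12s _ots_exists False,ret_size:2048"
m = re.search(_pattern, line)
print(m.groupdict())
# -> {'result': 'True', 'type': 'pdf', 'downloadtime': '0.41', 'costtime': '3', 'uploadtime': '0.12'}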

+ 718 - 0
BaseDataMaintenance/dataMonitor/data_monitor_fjs.py

@@ -0,0 +1,718 @@
+
+
+import os,sys
+
+sys.path.append(os.path.dirname(__file__)+"/../../")
+from apscheduler.schedulers.blocking import BlockingScheduler
+
+from BaseDataMaintenance.dataSource.source import getConnect_ots,getConnect_activateMQ
+
+from BaseDataMaintenance.dataSource.interface import *
+from BaseDataMaintenance.common.Utils import *
+
+from tablestore import *
+from BaseDataMaintenance.dataSource.setttings import *
+from queue import Queue
+from BaseDataMaintenance.common.multiThread import MultiThreadHandler
+
+
+from BaseDataMaintenance.maintenance.dataflow_settings import *
+
+import pandas as pd
+
+
+flow_attachment_log_path = "/data/python/flow_attachment.log"
+
+flow_extract_log_path = "/data/python/flow_extract.log"
+
+flow_init_path = "/data/python/flow_init.log"
+
+
+flow_init_log_dir = "/data/python"
+flow_init_check_dir = "/data/python/flow_init_check"
+
+
+flow_dumplicate_log_path = "/python_log/flow_dumplicate.log"
+
+
+class BaseDataMonitor():
+
+    def __init__(self):
+        self.ots_client = getConnect_ots()
+        self.recieviers = ["1175730271@qq.com","531870502@qq.com"]
+
+        self.list_proposed_count = []
+        self.current_path = os.path.dirname(__file__)
+
+    def cmd_execute(self,_cmd):
+        with os.popen(_cmd) as f:
+            return f.read()
+
+    def get_last_tenmin_time(self,nums=15):
+        current_time = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
+
+        last_ten_minite_time = timeAdd(current_time,0,"%Y-%m-%d %H:%M:%S",-10)
+        return last_ten_minite_time[:nums]
+
+    def check_document_uuid(self,log_filename):
+
+        def _handle(_item,result_queue):
+            bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
+
+            rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
+                                                                           SearchQuery(bool_query,get_total_count=True),
+                                                                           columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+
+            _item["exists"] = total_count
+        check_filename = "%s_check.xlsx"%(log_filename)
+        list_uuid = []
+        task_queue = Queue()
+        dict_tolong = {}
+        if not os.path.exists(check_filename) and os.path.exists(log_filename):
+            _regrex = "delete\s+(?P<tablename>bxkc[^\s]+)\s+.*ID='(?P<uuid>.+)'"
+            _regrex_tolong = "msg too long:(?P<uuid>[^,]+),\d+"
+            with open(log_filename,"r",encoding="utf8") as f:
+                while 1:
+                    _line = f.readline()
+                    if not _line:
+                        break
+                    _match = re.search(_regrex,_line)
+                    if _match is not None:
+                        _uuid = _match.groupdict().get("uuid")
+                        tablename = _match.groupdict().get("tablename")
+                        if _uuid is not None:
+                            list_uuid.append({"uuid":_uuid,"tablename":tablename})
+                    _match = re.search(_regrex_tolong,_line)
+                    if _match is not None:
+                        _uuid = _match.groupdict().get("uuid")
+                        dict_tolong[_uuid] = 1
+
+
+            if list_uuid==0:
+                _msg = "数据遗漏检查出错"
+                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
+                # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+
+            ots_client = getConnect_ots()
+
+            for _d in list_uuid:
+                task_queue.put(_d)
+            mt = MultiThreadHandler(task_queue,_handle,None,30)
+            mt.run()
+            df_data = {"uuid":[],
+                       "tablename":[],
+                       "exists":[],
+                       "tolong":[]}
+
+            for _data in list_uuid:
+                for k,v in df_data.items():
+                    if k!="tolong":
+                        v.append(_data.get(k))
+                df_data["tolong"].append(dict_tolong.get(_data["uuid"],0))
+            df2 = pd.DataFrame(df_data)
+            df2.to_excel(check_filename)
+
+    def monitor_init(self):
+
+        def _handle(_item,result_queue):
+            bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
+
+            rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
+                                                                           SearchQuery(bool_query,get_total_count=True),
+                                                                           columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+
+            _item["exists"] = total_count
+        try:
+            current_date = getCurrent_date("%Y-%m-%d")
+
+            last_date = timeAdd(current_date,-1,"%Y-%m-%d")
+
+            if not os.path.exists(flow_init_check_dir):
+                os.mkdir(flow_init_check_dir)
+
+            log_filename = os.path.join(flow_init_log_dir,"flow_init_%s.log"%(last_date))
+            check_filename = os.path.join(flow_init_check_dir,"flow_init_%s.xlsx"%(last_date))
+
+            list_uuid = []
+            task_queue = Queue()
+            dict_tolong = {}
+            if not os.path.exists(check_filename) and os.path.exists(log_filename):
+                _regrex = "delete\s+(?P<tablename>bxkc[^\s]+)\s+.*ID='(?P<uuid>.+)'"
+                _regrex_tolong = "msg too long:(?P<uuid>[^,]+),\d+"
+                with open(log_filename,"r",encoding="utf8") as f:
+                    while 1:
+                        _line = f.readline()
+                        if not _line:
+                            break
+                        _match = re.search(_regrex,_line)
+                        if _match is not None:
+                            _uuid = _match.groupdict().get("uuid")
+                            tablename = _match.groupdict().get("tablename")
+                            if _uuid is not None:
+                                list_uuid.append({"uuid":_uuid,"tablename":tablename})
+                        _match = re.search(_regrex_tolong,_line)
+                        if _match is not None:
+                            _uuid = _match.groupdict().get("uuid")
+                            dict_tolong[_uuid] = 1
+
+
+                if list_uuid==0:
+                    _msg = "数据遗漏检查出错"
+                    sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
+                    # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+
+                ots_client = getConnect_ots()
+
+                for _d in list_uuid:
+                    task_queue.put(_d)
+                mt = MultiThreadHandler(task_queue,_handle,None,30)
+                mt.run()
+                df_data = {"uuid":[],
+                           "tablename":[],
+                           "exists":[],
+                           "tolong":[]}
+
+                for _data in list_uuid:
+                    for k,v in df_data.items():
+                        if k!="tolong":
+                            v.append(_data.get(k))
+                    df_data["tolong"].append(dict_tolong.get(_data["uuid"],0))
+                df2 = pd.DataFrame(df_data)
+                df2.to_excel(check_filename)
+
+            counts = 0
+            df_data = pd.read_excel(check_filename)
+            for _exists,_tolong in zip(df_data["exists"],df_data["tolong"]):
+                if _exists==0 and _tolong==0:
+                    counts += 1
+            if counts>0:
+                _msg = "数据遗漏检查报警,%s有%s条公告遗漏,详见%s"%(last_date,str(counts),check_filename)
+                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
+                # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+
+
+
+        except Exception as e:
+            _msg = "数据遗漏检查报错"
+            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
+            # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+            traceback.print_exc()
+
+    def monitor_init_fjs(self):
+
+        def _handle(_item,result_queue):
+            bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
+
+            rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
+                                                                           SearchQuery(bool_query,get_total_count=True),
+                                                                           columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+
+            _item["exists"] = total_count
+
+        try:
+            current_date = getCurrent_date("%Y-%m-%d")
+
+            last_date = timeAdd(current_date,-1,"%Y-%m-%d")
+
+            if not os.path.exists(flow_init_check_dir):
+                os.mkdir(flow_init_check_dir)
+
+            log_filename = os.path.join(flow_init_log_dir,"flow_init.log")
+            check_filename = os.path.join(flow_init_check_dir,"flow_init_2024-05-21.xlsx")
+
+            list_uuid = []
+            task_queue = Queue()
+            dict_tolong = {}
+            if not os.path.exists(check_filename) and os.path.exists(log_filename):
+                print('--------- 1 ----------')
+                _regrex = "delete\s+(?P<tablename>bxkc[^\s]+)\s+.*ID='(?P<uuid>.+)'"
+                _regrex_tolong = "msg too long:(?P<uuid>[^,]+),\d+"
+                with open(log_filename,"r",encoding="utf8") as f:
+                    while 1:
+                        _line = f.readline()
+                        if not _line:
+                            break
+                        _match = re.search(_regrex,_line)
+                        if _match is not None:
+                            _uuid = _match.groupdict().get("uuid")
+                            tablename = _match.groupdict().get("tablename")
+                            if _uuid is not None:
+                                list_uuid.append({"uuid":_uuid,"tablename":tablename})
+                        _match = re.search(_regrex_tolong,_line)
+                        if _match is not None:
+                            _uuid = _match.groupdict().get("uuid")
+                            dict_tolong[_uuid] = 1
+
+
+                if list_uuid==0:
+                    _msg = "数据遗漏检查出错"
+                    sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
+                    # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+
+                ots_client = getConnect_ots()
+
+                for _d in list_uuid:
+                    task_queue.put(_d)
+                mt = MultiThreadHandler(task_queue,_handle,None,30)
+                mt.run()
+                df_data = {"uuid":[],
+                           "tablename":[],
+                           "exists":[],
+                           "tolong":[]}
+
+                for _data in list_uuid:
+                    for k,v in df_data.items():
+                        if k!="tolong":
+                            v.append(_data.get(k))
+                    df_data["tolong"].append(dict_tolong.get(_data["uuid"],0))
+                df2 = pd.DataFrame(df_data)
+                df2.to_excel(check_filename)
+
+            counts = 0
+            df_data = pd.read_excel(check_filename)
+            for _exists,_tolong in zip(df_data["exists"],df_data["tolong"]):
+                if _exists==0 and _tolong==0:
+                    counts += 1
+            if counts>0:
+                _msg = "数据遗漏检查报警,%s有%s条公告遗漏,详见%s"%(last_date,str(counts),check_filename)
+                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
+                # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+
+
+
+        except Exception as e:
+            _msg = "数据遗漏检查报错"
+            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
+            # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+            traceback.print_exc()
+
+    def monitor_attachment(self):
+        from BaseDataMaintenance.java.MQInfo import getAllQueueSize,getQueueSize
+        try:
+            # query = BoolQuery(must_queries=[
+            #     RangeQuery("status",0,11),
+            # ])
+            #
+            # rows,next_token,total_count_todeal,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
+            #                                                                            SearchQuery(query,None,True),
+            #                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+            total_count_todeal = getQueueSize("dataflow_attachment")
+
+            if total_count_todeal>100:
+                # query = BoolQuery(must_queries=[
+                #     RangeQuery("crtime",self.get_last_tenmin_time(16))
+                # ])
+                #
+                # rows,next_token,total_count_todeal_t_a,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
+                #                                                                              SearchQuery(query,None,True),
+                #                                                                              columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+                #
+                # query = BoolQuery(must_queries=[
+                #     RangeQuery("status",0,11),
+                #     RangeQuery("crtime",self.get_last_tenmin_time(16))
+                # ])
+                #
+                # rows,next_token,total_count_todeal_t,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
+                #                                                                            SearchQuery(query,None,True),
+                #                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+
+
+
+
+
+                #通过命令行获取日志情况
+                # _cmd = 'cat %s | grep -c "%s.*process filemd5"'%(flow_attachment_log_path,self.get_last_tenmin_time())
+                # log(_cmd)
+                # process_count = self.cmd_execute(_cmd)
+                #
+                # _cmd = 'cat %s | grep -c "%s.*process filemd5.*True"'%(flow_attachment_log_path,self.get_last_tenmin_time())
+                # log(_cmd)
+                # process_succeed_count = self.cmd_execute(_cmd)
+                #
+                # _cmd = 'cat %s | grep -c "%s.*delete sql"'%(flow_init_path,self.get_last_tenmin_time())
+                # log(_cmd)
+                # init_count = self.cmd_execute(_cmd)
+
+                # _msg = "附件提取队列报警:队列堆积%s条公告,最近十分钟处理公告附件数:%s,处理成功数:%s"%(str(total_count_todeal),str(process_count),str(process_succeed_count))
+
+                #通过读取文件获取日志情况
+                dict_type = {}
+                _pattern = "%s.*process filemd5\:[^\s]* (?P<result>(True|False)) of type\:(?P<type>[^\s]*).*recognize takes (?P<costtime>\d+)s"%(re.escape(self.get_last_tenmin_time()))
+                with open(flow_attachment_log_path,"r",encoding="utf8") as f:
+                    while True:
+                        line = f.readline()
+                        if not line:
+                            break
+                        _match = re.search(_pattern,str(line))
+                        if _match is not None:
+                            _type = _match.groupdict().get("type")
+                            _result = _match.groupdict().get("result")
+                            _costtime = _match.groupdict().get("costtime")
+                            if _type not in dict_type:
+                                dict_type[_type] = {"success":0,"fail":0,"success_costtime":0,"fail_costtime":0}
+                            if _result=="True":
+                                dict_type[_type]["success"] += 1
+                                dict_type[_type]["success_costtime"] += int(_costtime)
+                            else:
+                                dict_type[_type]["fail"] += 1
+                                dict_type[_type]["fail_costtime"] += int(_costtime)
+
+                process_count = 0
+                process_succeed_count = 0
+                _msg_type = ""
+                for k,v in dict_type.items():
+                    process_count += v.get("success",0)+v.get("fail",0)
+                    process_succeed_count += v.get("success",0)
+                    _msg_type += "\n类型%s\n\t成功%s,消耗%s秒,%.2f秒/个,\n\t失败%s,消耗%s秒,%.2f秒/个"%(k,str(v.get("success")),str(v.get("success_costtime")),v.get("success_costtime")/max(1,v.get("success")),str(v.get("fail")),str(v.get("fail_costtime")),v.get("fail_costtime")/max(1,v.get("fail")))
+
+                _msg = "附件提取队列报警:队列堆积%s条公告,最近十分钟处理公告附件数:%s,处理成功数:%s"%(str(total_count_todeal),str(process_count),str(process_succeed_count))
+                sentMsgToDD(_msg+_msg_type,ACCESS_TOKEN_DATAWORKS)
+                # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+        except Exception as e:
+            traceback.print_exc()
+
+    def monitor_extract(self):
+        from BaseDataMaintenance.java.MQInfo import getAllQueueSize,getQueueSize
+        try:
+            # query = BoolQuery(must_queries=[
+            #     RangeQuery("status",11,61),
+            # ])
+            #
+            # rows,next_token,total_count_todeal,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
+            #                                                                     SearchQuery(query,None,True),
+            #                                                                     columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+            total_count_init = getQueueSize("dataflow_init")
+            if total_count_init>=100:
+                _msg = "同步队列报警,有%s条数据滞留"%(str(total_count_init))
+                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
+                # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+
+
+            total_count_todeal = getQueueSize("dataflow_extract")
+
+            if total_count_todeal>100:
+                _cmd = 'cat %s | grep "%s" | grep -c "process.*docid"'%(flow_extract_log_path,self.get_last_tenmin_time())
+                log(_cmd)
+                process_count = self.cmd_execute(_cmd)
+                _cmd = 'cat %s | grep "%s" | grep -c "process.*docid.*1$"'%(flow_extract_log_path,self.get_last_tenmin_time())
+                log(_cmd)
+                success_count = self.cmd_execute(_cmd)
+
+                _cmd = 'cat %s | grep "%s" | grep -c "fingerprint.*exists docid"'%(flow_extract_log_path,self.get_last_tenmin_time())
+                log(_cmd)
+                exists_count = self.cmd_execute(_cmd)
+
+                _cmd = 'cat %s | grep -c "%s.*delete sql"'%(flow_init_path,self.get_last_tenmin_time())
+                log(_cmd)
+                init_count = self.cmd_execute(_cmd)
+
+
+                # query = BoolQuery(must_queries=[
+                #                                                                                RangeQuery("crtime",self.get_last_tenmin_time(16))
+                #                                                                            ])
+                #
+                # rows,next_token,total_count_todeal_t_a,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
+                #                                                                                SearchQuery(query,None,True),
+                #                                                                                columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+                #
+                # query = BoolQuery(must_queries=[
+                #     RangeQuery("status",11,61),
+                #     RangeQuery("crtime",self.get_last_tenmin_time(16))
+                # ])
+                #
+                # rows,next_token,total_count_todeal_t,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
+                #                                                                              SearchQuery(query,None,True),
+                #                                                                              columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+
+                _msg = "要素提取队列报警:队列堆积%s条公告,最近十分钟入库数:%s,最近十分钟处理公告数:%s,其中成功处理数:%s,查库免提取数:%s"%(str(total_count_todeal),str(init_count),str(process_count),str(success_count),str(exists_count))
+                atAll=False
+                if success_count==0:
+                    atAll=True
+                    _msg += "\n疑似流程出现问题,请相关负责人尽快处理"
+                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=atAll)
+                # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+        except Exception as e:
+            traceback.print_exc()
+
+    def monitor_proposedBuilding(self):
+        current_date = getCurrent_date("%Y-%m-%d")
+
+        current_hour = getCurrent_date("%H")
+
+
+        query = BoolQuery(must_queries=[
+            RangeQuery("update_time",current_date),
+            WildcardQuery("docids","*")
+        ])
+
+        rows,next_token,total_count_doc,is_all_succeed = self.ots_client.search("designed_project","designed_project_index",
+                                                                                   SearchQuery(query,None,True),
+                                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+
+
+        query = BoolQuery(must_queries=[
+            RangeQuery("update_time",current_date),
+            WildcardQuery("spids","*")
+        ])
+
+        rows,next_token,total_count_sp,is_all_succeed = self.ots_client.search("designed_project","designed_project_index",
+                                                                            SearchQuery(query,None,True),
+                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+
+        total_count = total_count_doc+total_count_sp
+        if not (current_hour in ("00","23","24")):
+            _msg = "拟在建生成报警:当天生成的拟在建数量为:%d,其中公告生成:%d,审批项目生成:%d"%(total_count,total_count_doc,total_count_sp)
+            atAll=False
+            if total_count==0:
+                atAll=True
+                _msg += "\n疑似流程出现问题,请相关负责人尽快处理"
+            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=atAll)
+            # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+
+
+    def monitor_sychr(self):
+        current_date = getCurrent_date("%Y-%m-%d")
+
+        last_date = timeAdd(current_date,-1,"%Y-%m-%d")
+
+        query = BoolQuery(must_queries=[
+            RangeQuery("status",*flow_sychro_status_from,True,True),
+        ])
+
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
+                                                                            SearchQuery(query,None,True),
+                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+
+        if total_count>=200:
+            _msg = "数据流报警:待同步到成品表公告数为:%d"%(total_count)
+            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+            # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+
+    def monitor_preproject(self):
+        current_date = getCurrent_date("%Y-%m-%d")
+
+        last_date = timeAdd(current_date,-7,"%Y-%m-%d")
+
+        query = BoolQuery(must_queries=[
+            RangeQuery("crtime",last_date),
+        ])
+
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search("preproject","preproject_index",
+                                                                            SearchQuery(query,None,True),
+                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+        log("preproject count:%d"%(total_count))
+        if total_count<=10*10000:
+            _msg = "周期项目生成报警:最近一周生成/更新的周期项目数量为:%d"%(total_count)
+            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+            # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+
+    def monitor_dumplicate(self):
+
+        current_date = getCurrent_date("%Y-%m-%d")
+
+        last_date = timeAdd(current_date,-1,"%Y-%m-%d")
+
+        query = BoolQuery(must_queries=[
+            TermQuery("page_time",last_date),
+            RangeQuery("status",71),
+
+        ])
+
+        rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
+                                                                            SearchQuery(query,None,True),
+                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+
+        query = BoolQuery(must_queries=[
+            RangeQuery("status",66,71),
+
+        ])
+
+        rows,next_token,total_count_to_dump,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
+                                                                                    SearchQuery(query,None,True),
+                                                                                    columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+
+        query = BoolQuery(must_queries=[
+            TermQuery("page_time",last_date),
+            RangeQuery("status",71),
+            TermQuery("save",0)
+        ])
+
+        rows,next_token,total_count_lastday_dump,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
+                                                                                    SearchQuery(query,None,True),
+                                                                                    columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+        # if total_count_lastday_dump/total_count_lastday<0.2:
+        #     _msg = "公告去重报警,%s入库公告数:%d,其中去重数:%d,去重率:%.2f"%(last_date,total_count_lastday,total_count_lastday_dump,total_count_lastday_dump/total_count_lastday)
+        #     sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+        #     # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+
+        # if total_count_to_dump>2000:
+        #     _msg = "公告去重报警,待去重数量:%s"%(str(total_count_to_dump))
+        #     sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+        #     # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+
+        #成品表监控
+        query = BoolQuery(must_queries=[
+            TermQuery("page_time",last_date),
+        ])
+
+        rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document","document_index",
+                                                                                    SearchQuery(query,None,True),
+                                                                                    columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+        query = BoolQuery(must_queries=[
+            TermQuery("page_time",last_date),
+            RangeQuery("status",401,451),
+        ])
+
+        rows,next_token,total_count_lastday_dump,is_all_succeed = self.ots_client.search("document","document_index",
+                                                                                         SearchQuery(query,None,True),
+                                                                                         columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+        if total_count_lastday_dump/total_count_lastday<0.2:
+            _msg = "公告成品去重报警,%s入库公告数:%d,其中去重数:%d,去重率:%.2f"%(last_date,total_count_lastday,total_count_lastday_dump,total_count_lastday_dump/total_count_lastday)
+            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+            # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+
+        query = BoolQuery(must_queries=[
+            RangeQuery("status",*flow_dumplicate_status_from,True,True),
+        ])
+
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
+                                                                            SearchQuery(query,None,True),
+                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+
+        if total_count>=1000:
+            _cmd = 'cat %s | grep -c "%s.*upgrate True save"'%(flow_dumplicate_log_path,self.get_last_tenmin_time())
+            process_count = self.cmd_execute(_cmd)
+            atAll = False
+            if process_count=="":
+                process_count = 0
+            if int(process_count)==0:
+                atAll = True
+            _msg = "数据流报警:待去重公告数为:%d,最近十分钟去重数为:%s"%(total_count,str(process_count))
+            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=atAll)
+            # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+
+
+    def monitor_merge(self):
+        current_date = getCurrent_date("%Y-%m-%d")
+        last_date = timeAdd(current_date,-1,"%Y-%m-%d")
+        check_count = 20000
+        query = BoolQuery(must_queries=[
+            RangeQuery("status",201,301)
+        ])
+        list_doc = []
+        queue_docid = Queue()
+        rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document","document_index",
+                                                                                    SearchQuery(query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),get_total_count=True),
+                                                                                    columns_to_get=ColumnsToGet(["docchannel"],return_type=ColumnReturnType.SPECIFIED))
+        list_data = getRow_ots(rows)
+        list_doc.extend(list_data)
+        while next_token and len(list_doc)<check_count:
+            rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document","document_index",
+                                                                                        SearchQuery(query,next_token=next_token,limit=100,get_total_count=True),
+                                                                                        columns_to_get=ColumnsToGet(["docchannel"],return_type=ColumnReturnType.SPECIFIED))
+            list_data = getRow_ots(rows)
+            list_doc.extend(list_data)
+        for _doc in list_doc:
+            queue_docid.put(_doc)
+        def _handle(item,result_queue):
+            docid = item.get("docid")
+            _query = BoolQuery(must_queries=[TermQuery("docids",str(docid))])
+            rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("project2","project2_index",
+                                                                                        SearchQuery(_query,None,limit=100,get_total_count=True),
+                                                                                        columns_to_get=ColumnsToGet(["zhao_biao_page_time","zhong_biao_page_time","docid_number"],return_type=ColumnReturnType.SPECIFIED))
+            list_data = getRow_ots(rows)
+            item["projects"] = list_data
+
+        mt = MultiThreadHandler(queue_docid,_handle,None,30)
+        mt.run()
+
+        not_find_count = 0
+        zhongbiao_find_zhaobiao = 0
+        zhongbiao_count = 0
+        multi_count = 0
+        list_notfind = []
+        list_multi = []
+        for _doc in list_doc:
+            if len(_doc.get("projects",[]))==0:
+                not_find_count += 1
+                list_notfind.append(str(_doc.get("docid")))
+            if len(_doc.get("projects",[]))>=2:
+                multi_count += 1
+                list_multi.append(str(_doc.get("docid")))
+            if _doc.get("docchannel") in (101,118,119,120):
+                zhongbiao_count += 1
+                if len(_doc.get("projects",[]))==1:
+                    _project = _doc.get("projects")[0]
+                    if _project.get("zhao_biao_page_time","")!="":
+                        zhongbiao_find_zhaobiao += 1
+
+        if not_find_count>0 or multi_count>0 or zhongbiao_find_zhaobiao/zhongbiao_count<0.8:
+            if not_find_count>0 or multi_count>0:
+                current_time = getCurrent_date(format="%Y-%m-%d_%H%M%S")
+                logname = os.path.join(self.current_path,"log","%s.log"%current_time)
+                with open(logname,"w",encoding="utf8") as f:
+                    f.write(",".join(list_notfind))
+                    f.write("\n")
+                    f.write(",".join(list_multi))
+                _msg = "公告合并报警:近%d条成品公告,有%d条未生成项目,有%d条公告找到多个项目,详见%s;其中有%d条中标公告且找到招标公告数为%d,比率为%.3f"%(check_count,not_find_count,multi_count,logname,zhongbiao_count,zhongbiao_find_zhaobiao,zhongbiao_find_zhaobiao/zhongbiao_count)
+                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+                # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+            else:
+                _msg = "公告合并报警:近%d条成品公告其中有%d条中标公告且找到招标公告数为%d,比率为%.3f"%(check_count,zhongbiao_count,zhongbiao_find_zhaobiao,zhongbiao_find_zhaobiao/zhongbiao_count)
+                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+                # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+
+
+        query = BoolQuery(must_queries=[
+            TermQuery("page_time",last_date),
+            RangeQuery("status",71),
+
+        ])
+
+        rows,next_token,total_count_lastday,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
+                                                                                    SearchQuery(query,None,True),
+                                                                                    columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+        if total_count_lastday<10*10000:
+            _msg = "公告成品入库报警,%s入库公告数:%d"%(last_date,total_count_lastday)
+            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+            # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+
+
+    def start_monitor(self):
+        #数据监控
+
+        scheduler = BlockingScheduler()
+
+        # scheduler.add_job(self.monitor_attachment,"cron",minute="*/10")
+        scheduler.add_job(self.monitor_extract,"cron",minute="*/10")
+        scheduler.add_job(self.monitor_proposedBuilding,"cron",hour="*/3")
+        # scheduler.add_job(self.monitor_dumplicate,"cron",minute="*/10")
+        scheduler.add_job(self.monitor_sychr,"cron",minute="*/10")
+        scheduler.add_job(self.monitor_preproject,"cron",hour="8")
+        scheduler.add_job(self.monitor_merge,"cron",hour="*/1")
+        scheduler.add_job(self.monitor_init,"cron",hour="*/3")
+        scheduler.start()
+
+    def start_attach_monitor(self):
+        #附件监控
+        scheduler = BlockingScheduler()
+
+        scheduler.add_job(self.monitor_attachment,"cron",minute="*/10")
+        scheduler.start()
+
+
+if __name__ == '__main__':
+
+    dm = BaseDataMonitor()
+    dm.monitor_init_fjs()
+
+    # # dm.start_monitor()
+    # log_filename = "C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-03.log"
+    # dm.check_document_uuid(log_filename)
+
+    # sentMsgToDD("报警test_msg",ACCESS_TOKEN_DATAWORKS)
+    # dm.monitor_proposedBuilding()
+    # print(dm.get_last_tenmin_time(16))
+
+
+
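
data_monitor_fjs.py is a one-off variant of data_monitor.py pointed at flow_init.log and a fixed 2024-05-21 check file. The alarm condition at the end is easy to misread: a uuid only counts as leaked when it is absent from document_tmp and was not skipped as "msg too long". A hedged sketch with a fabricated three-row check result:

import pandas as pd
df_data = pd.DataFrame({"exists": [0, 1, 0], "tolong": [0, 0, 1]})  # fabricated check workbook
counts = 0
for _exists, _tolong in zip(df_data["exists"], df_data["tolong"]):
    if _exists == 0 and _tolong == 0:
        counts += 1
print(counts)  # 1 -> only the row missing from document_tmp and not marked "msg too long" is reported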

+ 11 - 0
BaseDataMaintenance/fixDoc_to_queue_extract.py

@@ -0,0 +1,11 @@
+
+import sys,os
+
+sys.path.append(os.path.dirname(__file__)+"/..")
+
+from BaseDataMaintenance.maintenance.dataflow_mq import fixDoc_to_queue_extract,fixDoc_to_queue_init
+
+
+if __name__ == '__main__':
+    # fixDoc_to_queue_extract()
+    fixDoc_to_queue_init(filename="/data/python/flow_init_check/flow_init_2023-12-28.xlsx")

+ 6 - 5
BaseDataMaintenance/maintenance/dataflow.py

@@ -2592,7 +2592,7 @@ class Dataflow_dumplicate(Dataflow):
             confidence = _dict["confidence"]
 
             if b_log:
-                log("confidence %d %.3f"%(_docid,confidence))
+                log("confidence %d %.3f total_count %d"%(_docid,confidence,_dict.get('min_counts',0)))
 
             if confidence>0.1:
                 if _docid not in set_docid:
@@ -2623,9 +2623,9 @@ class Dataflow_dumplicate(Dataflow):
         if page_time=='':
             page_time = current_date
 
-        two_day_dict = {"page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]}
+        two_day_dict = {"page_time":[timeAdd(page_time,-7),timeAdd(page_time,7)]}
 
-        if page_time>=timeAdd(current_date,-2):
+        if page_time>=timeAdd(current_date,-7):
             table_name = "document_tmp"
             table_index = "document_tmp_index"
             base_dict = {
@@ -3966,6 +3966,7 @@ class Dataflow_dumplicate(Dataflow):
                         has_before = True
                     if v>page_time:
                         has_after = True
+        log("check page_time has_before %s has_after %s"%(str(has_before),str(has_after)))
         if has_before:
             _query = BoolQuery(must_queries=[MatchPhraseQuery(document_doctitle,item.get(document_doctitle,""))],
                                must_not_queries=[TermQuery(document_docid,item.get(document_docid,0))])
@@ -4003,7 +4004,7 @@ class Dataflow_dumplicate(Dataflow):
             log("dumplicate %s rules:%d"%(str(item.get(document_tmp_docid)),len(list_rules)))
             list_rules = list_rules[:30]
             _i = 0
-            step = 5
+            step = 2
 
 
             item["confidence"] = 999
@@ -4412,7 +4413,7 @@ if __name__ == '__main__':
     # test_attachment_interface()
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
-    df_dump.test_dumplicate(463253000
+    df_dump.test_dumplicate(455485514
                             )
     # compare_dumplicate_check()
     # df_dump.test_merge([391898061
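
The dedup window around page_time widens from ±2 to ±7 days, and the rule step drops from 5 to 2. An illustration of the widened window using plain datetime rather than the project's timeAdd helper (which this sketch does not assume):

from datetime import datetime, timedelta
page_time, fmt = "2024-05-21", "%Y-%m-%d"
window = [(datetime.strptime(page_time, fmt) + timedelta(days=d)).strftime(fmt) for d in (-7, 7)]
print(window)  # ['2024-05-14', '2024-05-28'] -> duplicate candidates are now searched across a 15-day span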

+ 24 - 12
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -79,7 +79,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
         self.queue_attachment_ocr = Queue()
         self.queue_attachment_not_ocr = Queue()
-        self.comsumer_count = 90
+        self.comsumer_count = 20
         self.comsumer_process_count = 5
         self.retry_comsumer_count = 10
         self.retry_times = 5
@@ -97,12 +97,12 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
         self.session = None
 
-        # for _ in range(self.comsumer_process_count):
-        #     listener_p = Process(target=self.start_attachment_listener)
-        #     listener_p.start()
+        for _ in range(self.comsumer_process_count):
+            listener_p = Process(target=self.start_attachment_listener)
+            listener_p.start()
 
-        listener_p = Process(target=self.start_attachment_listener)
-        listener_p.start()
+        # listener_p = Process(target=self.start_attachment_listener)
+        # listener_p.start()
 
 
 
@@ -332,6 +332,8 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
         objectPath = attach.getProperties().get(attachment_path)
         docids = attach.getProperties().get(attachment_docids)
 
+        _ots_exists = attach.getProperties().get("ots_exists")
+
         if objectPath is None:
             relative_path = "%s/%s"%(_uuid.hex[:4],_uuid.hex)
         else:
@@ -408,8 +410,9 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
 
                     if local_exists:
-                        upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
-                    os.remove(localpath)
+                        if not _ots_exists:
+                            upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
+                        os.remove(localpath)
 
                     return True
 
@@ -422,7 +425,9 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                 # _data_base64 = base64.b64encode(open(localpath,"rb").read())
                 # _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype)
                 _success,_html,swf_images,classification = getAttachDealInterface(None,_filetype,path=localpath,session=self.session)
-                log("process filemd5:%s %s of type:%s with size:%.3fM download:%ds recognize takes %ds,ret_size:%d"%(filemd5,str(_success),_filetype,round(_size/1024/1024,4),time_download,time.time()-start_time,len(_html)))
+
+                _reg_time = time.time()-start_time
+
                 if _success:
                     if len(_html)<5:
                         _html = ""
@@ -435,6 +440,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
                         return False
 
+
                 # 重跑swf时,删除原来的swf_urls中的"\"
                 if attach.getProperties().get(attachment_filetype) == "swf":
                     swf_urls = attach.getProperties().get(attachment_swfUrls, "[]")
@@ -497,14 +503,17 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                 self.putAttach_json_toRedis(filemd5,attach.getProperties())
 
 
+                start_time = time.time()
                 if local_exists:
-                    upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
+                    if not _ots_exists:
+                        upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
                 try:
                     if upload_status and os.exists(localpath):
                         os.remove(localpath)
                 except Exception as e:
                     pass
-
+                _upload_time = time.time()-start_time
+                log("process filemd5:%s %s of type:%s with size:%.3fM download:%.2fs recognize takes %ds upload takes %.2fs _ots_exists %s,ret_size:%d"%(filemd5,str(_success),_filetype,round(_size/1024/1024,4),time_download,_reg_time,_upload_time,str(_ots_exists),len(_html)))
 
                 return True
             else:
@@ -624,7 +633,10 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                     if _attach_ots.fix_columns(self.ots_client,[attachment_status,attachment_path,attachment_attachmenthtml,attachment_attachmentcon,attachment_filetype,attachment_swfUrls,attachment_process_time],True):
                         if _attach_ots.getProperties().get(attachment_status) is not None:
                             log("getAttachments find in ots:%s"%(_filemd5))
-                            list_attachment.append(Attachment_postgres(_attach_ots.getProperties()))
+                            _attach_pg = Attachment_postgres(_attach_ots.getProperties())
+                            _attach_pg.setValue("ots_exists",True,True)
+                            list_attachment.append(_attach_pg)
+
                     else:
                         log("getAttachments search in path:%s"%(_filemd5))
                         if _path:
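
The attachment flow now tags records recovered from OTS with an ots_exists flag, skips re-uploading those files to the bucket, and times the upload so the new log format (parsed by data_monitor.py above) can report it. A hedged sketch of that decision, not the exact method body; bucket and uploadFileByPath stand in for the project's helpers:

import os, time

def finish_attachment(bucket, localpath, objectPath, local_exists, _ots_exists, uploadFileByPath):
    start_time = time.time()
    upload_status = False
    if local_exists and not _ots_exists:  # only push to the bucket for files not already known to OTS
        upload_status = uploadFileByPath(bucket, localpath, objectPath)
    if upload_status and os.path.exists(localpath):  # note: the diff writes os.exists(), which is not in the stdlib; os.path.exists() is intended
        os.remove(localpath)
    return time.time() - start_time  # the _upload_time that ends up in the "upload takes %.2fs" log field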

+ 8 - 8
BaseDataMaintenance/maintenance/document/download_attachment_and_set_status_rerun.py

@@ -7,14 +7,13 @@ import time
 import pandas as pd
 import numpy as np
 
-from BaseDataMaintenance.common.multiThread import MultiThreadHandler
-
 sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../../")
 from BaseDataMaintenance.common.ossUtils import downloadFile
 from BaseDataMaintenance.dataSource.source import getConnect_ots
 from BaseDataMaintenance.maintenance.dataflow import Dataflow
 from BaseDataMaintenance.model.ots.attachment import attachment, attachment_filemd5, attachment_status
 from BaseDataMaintenance.model.ots.document import Document, document_docid, document_partitionkey, document_status
+from BaseDataMaintenance.common.multiThread import MultiThreadHandler
 from BaseDataMaintenance.java.MQInfo import getQueueSize
 
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@@ -215,14 +214,14 @@ def set_status_document(args, queue):
     docid, p_key = args
     # print(docid, p_key)
     ots_doc = Document({document_docid: docid, document_partitionkey: p_key, document_status: 1})
-    # ots_doc.update_row(ots_client)
+    ots_doc.update_row(ots_client)
 
 
 def set_status_attachment(args, queue):
     md5, path = args
     # print(md5, path)
     ots_attach = attachment({attachment_filemd5: md5, attachment_status: 0})
-    # ots_attach.update_row(ots_client)
+    ots_attach.update_row(ots_client)
 
 
 def rerun_main(file_path, set_docid=True, set_md5=True):
@@ -244,11 +243,12 @@ def rerun_main(file_path, set_docid=True, set_md5=True):
         print(md5_path_list[:10])
 
     # 多线程执行
-    batch = 1000
+    batch = 100
+    start_index = 1800
     used_md5_list = []
     if md5_path_list and docid_key_list:
         start_time = time.time()
-        for i in range(0, len(docid_key_list), batch):
+        for i in range(start_index, len(docid_key_list), batch):
             logging.info('Start batch ' + str(i) + ' to ' + str(i+batch))
 
             # 取一个batch的docid
@@ -276,7 +276,7 @@ def rerun_main(file_path, set_docid=True, set_md5=True):
             for k in sub_md5_path_list:
                 task_queue.put(k)
             a = MultiThreadHandler(task_queue=task_queue, task_handler=download_attachment_mp,
-                                   result_queue=result_queue, thread_count=1)
+                                   result_queue=result_queue, thread_count=4)
             a.run()
 
             # set attachment status
@@ -291,7 +291,7 @@ def rerun_main(file_path, set_docid=True, set_md5=True):
             # set document status
             task_queue = queue.Queue()
             result_queue = queue.Queue()
-            for k in docid_key_list:
+            for k in sub_docid_key_list:
                 task_queue.put(k)
             a = MultiThreadHandler(task_queue=task_queue, task_handler=set_status_document,
                                    result_queue=result_queue, thread_count=10)
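
The rerun loop now resumes from start_index, works in smaller batches, and (the real fix here) resets document status only for the current batch via sub_docid_key_list instead of the whole list. A hedged sketch of the batching with fabricated (docid, partitionkey) pairs:

docid_key_list = [(d, d % 500 + 1) for d in range(2000)]  # fabricated pairs
batch, start_index = 100, 1800
for i in range(start_index, len(docid_key_list), batch):
    sub_docid_key_list = docid_key_list[i:i + batch]
    # download this batch's attachments, then set status for exactly these docids
    print(i, len(sub_docid_key_list))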

+ 1 - 1
BaseDataMaintenance/maintenance/proposedBuilding/DataSynchronization.py

@@ -53,7 +53,7 @@ class DataSynchronization():
                 _proposed = proposedBuilding_tmp(_data)
                 task_queue.put(_proposed,True)
             _count += len(list_data)
-            if _count>3000:
+            if _count>20000:
                 break
 
 

+ 136 - 0
BaseDataMaintenance/maintenance/proposedBuilding/one_col_format_unify.py

@@ -0,0 +1,136 @@
+import time
+import pandas as pd
+from sqlalchemy import create_engine
+from tablestore import INF_MIN, INF_MAX, CompositeColumnCondition, LogicalOperator, SingleColumnCondition, \
+    ComparatorType, Direction, OTSClientError, OTSServiceError
+from BaseDataMaintenance.dataSource.source import getConnect_ots
+
+ots_client = getConnect_ots()
+
+
+def get_one_col_all_data():
+    table_name = 'designed_project'
+
+    # 设置范围读的起始主键。
+    inclusive_start_primary_key = [('partitionkey', INF_MIN), ('id', INF_MIN)]
+
+    # 设置范围读的结束主键。
+    exclusive_end_primary_key = [('partitionkey', INF_MAX), ('id', INF_MAX)]
+
+    # 查询所有列。
+    columns_to_get = ['project_investment']
+
+    # 每次最多返回90行,如果总共有100个结果,首次查询时指定limit=90,则第一次最多返回90,最少可能返回0个结果,但是next_start_primary_key不为None。
+    limit = 90
+
+    # 设置过滤器,增加列条件。过滤条件为address列值等于'China'且age列值小于50。
+    cond = CompositeColumnCondition(LogicalOperator.AND)
+
+    # 如果某行不存在对应列时,您需要配置pass_if_missing参数来确定该行是否满足过滤条件。
+    # 当不设置pass_if_missing或者设置pass_if_missing为True时,表示当某行不存在该列时,该行满足过滤条件。
+    # 当设置pass_if_missing为False时,表示当某行不存在该列时,该行不满足过滤条件。
+    # cond.add_sub_condition(SingleColumnCondition("address", 'China', ComparatorType.EQUAL, pass_if_missing=False))
+    # cond.add_sub_condition(SingleColumnCondition("age", 50, ComparatorType.LESS_THAN, pass_if_missing=False))
+
+    all_rows = []
+    start_time1 = time.time()
+    try:
+        # 调用get_range接口。
+        consumed, next_start_primary_key, row_list, next_token = ots_client.get_range(
+            table_name,
+            Direction.FORWARD,
+            inclusive_start_primary_key,
+            exclusive_end_primary_key,
+            columns_to_get,
+            limit,
+            # column_filter=cond,
+            max_version=1,
+            # time_range=(1557125059000, 1557129059000)  # start_time大于等于1557125059000,end_time小于1557129059000。
+        )
+        all_rows.extend(row_list)
+
+        # 当next_start_primary_key不为空时,则继续读取数据。
+        index = 0
+        start_time = time.time()
+        while next_start_primary_key is not None:
+            if index % 1000 == 0:
+                print('Loop', (index+2)*limit, time.time()-start_time)
+                start_time = time.time()
+            inclusive_start_primary_key = next_start_primary_key
+            consumed, next_start_primary_key, row_list, next_token = ots_client.get_range(
+                table_name,
+                Direction.FORWARD,
+                inclusive_start_primary_key,
+                exclusive_end_primary_key,
+                columns_to_get,
+                limit,
+                # column_filter=cond,
+                max_version=1
+            )
+            all_rows.extend(row_list)
+            index += 1
+
+    # 客户端异常,一般为参数错误或者网络异常。
+    except OTSClientError as e:
+        print('get row failed, http_status:%d, error_message:%s' % (e.get_http_status(), e.get_error_message()))
+    # 服务端异常,一般为参数错误或者流控错误。
+    except OTSServiceError as e:
+        print('get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s' % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id()))
+
+    # 打印主键和属性列。
+    result_rows = []
+    for row in all_rows:
+        # print(row.primary_key, row.attribute_columns)
+        result_rows.append([row.primary_key[0][1], row.primary_key[1][1], row.attribute_columns[0][1]])
+    print('Total rows: ', len(all_rows), 'Total time: ', time.time() - start_time1)
+    print(result_rows[0])
+    print(result_rows[1])
+
+    return result_rows
+
+
+def list_to_xlsx(data_list):
+    df = pd.DataFrame(data_list)
+    df.columns = ['partitionkey', 'id', 'project_investment']
+    df.to_csv('D:\\BIDI_DOC\\比地_文档\\统一格式_project_investment.csv', index=False)
+
+
+def csv_to_mysql():
+    mysql_host = '192.168.2.170:3306'
+    mysql_db = 'exportdb'
+    mysql_user = 'root'
+    mysql_pwd = 'pwdformysql0922'
+    mysql_table = 'bdm_one_col_format_unify'
+    xlsx_path = r'D:\BIDI_DOC\比地_文档\统一格式_project_investment.csv'
+
+    engine = create_engine('mysql+pymysql://{}:{}@{}/{}?charset=utf8'.format(mysql_user, mysql_pwd, mysql_host, mysql_db))
+    # df = pd.read_excel(xlsx_path)
+    df = pd.read_csv(xlsx_path)
+    # 表名 需删除索引列
+    df.to_sql(mysql_table, con=engine, if_exists='append', index=False)
+    """
+        to_sql参数:(比较重要)
+            if_exists:表如果存在怎么处理
+                    append:追加
+                    replace:删除原表,建立新表再添加
+                    fail:什么都不干
+             chunksize: 默认的话是一次性导入, 给值的话是批量导入,一批次导入多少
+             index=False:不插入索引index
+             dtype 创建表结构
+               需要导入 import sqlalchemy
+               dtype = {'id': sqlalchemy.types.BigInteger(),
+                 'name': sqlalchemy.types.String(length=20),
+                 'sex': sqlalchemy.types.String(length=20),
+                 'age': sqlalchemy.types.BigInteger(),
+                 })
+             
+    """
+
+
+if __name__ == '__main__':
+    # _list = get_one_col_all_data()
+    # list_to_xlsx(_list)
+    csv_to_mysql()
+
+    # Fetch the full rows, upload them to MaxCompute to normalize them into a unified format, then write the result back.
+    # The data is pulled locally because reading through MaxCompute requires a fixed column type, and a column holding mixed types raises an error.
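
Note: the pass_if_missing filter discussed at the top of this file exists only as commented-out code. The snippet below is a minimal, hypothetical sketch of enabling it, reusing the ots_client, table_name, key bounds, columns_to_get and limit already defined in get_one_col_all_data; it is not part of this commit.

    from tablestore import (CompositeColumnCondition, SingleColumnCondition,
                            LogicalOperator, ComparatorType, Direction)

    # Keep only rows whose address equals 'China' AND whose age is below 50; rows that
    # lack either column are dropped because pass_if_missing=False.
    cond = CompositeColumnCondition(LogicalOperator.AND)
    cond.add_sub_condition(SingleColumnCondition("address", 'China', ComparatorType.EQUAL, pass_if_missing=False))
    cond.add_sub_condition(SingleColumnCondition("age", 50, ComparatorType.LESS_THAN, pass_if_missing=False))

    consumed, next_start_primary_key, row_list, next_token = ots_client.get_range(
        table_name,
        Direction.FORWARD,
        inclusive_start_primary_key,
        exclusive_end_primary_key,
        columns_to_get,
        limit,
        column_filter=cond,   # applied server-side, before rows are returned
        max_version=1
    )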

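As a follow-up to the to_sql notes in csv_to_mysql, here is a minimal sketch, not part of this commit, of a batched import with an explicit dtype mapping for the three exported columns. The connection settings and file path are the ones shown above; the SQL types are assumptions, and project_investment is kept as a string because the source column mixes data types.

    import sqlalchemy
    import pandas as pd
    from sqlalchemy import create_engine

    engine = create_engine('mysql+pymysql://root:pwdformysql0922@192.168.2.170:3306/exportdb?charset=utf8')
    df = pd.read_csv(r'D:\BIDI_DOC\比地_文档\统一格式_project_investment.csv')

    # chunksize writes the rows in batches; dtype pins the column types of the created table.
    df.to_sql('bdm_one_col_format_unify', con=engine, if_exists='append', index=False,
              chunksize=10000,
              dtype={'partitionkey': sqlalchemy.types.BigInteger(),
                     'id': sqlalchemy.types.BigInteger(),
                     'project_investment': sqlalchemy.types.String(length=255)})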
+ 53 - 0
BaseDataMaintenance/maintenance/proposedBuilding/update_col.py

@@ -0,0 +1,53 @@
+from BaseDataMaintenance.model.ots.designed_project import designed_project, designed_project_id, \
+    designed_project_partitionkey, designed_project_project_investment, designed_project_engineer_cost, \
+    designed_project_proportion_unit, designed_project_facade_type, designed_project_construct_install_fee, \
+    designed_project_project_structure, designed_project_has_steel, designed_project_covered_area, \
+    designed_project_floor_space, designed_project_elevator, designed_project_project_nature, \
+    designed_project_floors_num
+from BaseDataMaintenance.dataSource.source import getConnect_ots
+
+
+ots_client = getConnect_ots()
+
+
+def update():
+    _dict1 = {
+        designed_project_partitionkey: 2,
+        designed_project_id: 4624623048274739201,
+        designed_project_project_investment: 100000.12354,
+        designed_project_engineer_cost: 20000.222,
+        designed_project_construct_install_fee: 10000.4,
+        designed_project_floor_space: '',
+        designed_project_covered_area: 900,
+        designed_project_proportion_unit: 'm',
+        designed_project_project_structure: '框架结构',
+        designed_project_facade_type: '玻璃幕墙',
+        designed_project_has_steel: 1,
+        designed_project_elevator: 1,
+        designed_project_project_nature: '新建',
+        designed_project_floors_num: 0,
+    }
+
+    _dict2 = {
+        designed_project_partitionkey: 2,
+        designed_project_id: 4647683317091532801,
+        designed_project_project_investment: None,
+        designed_project_engineer_cost: 20.222,
+        designed_project_construct_install_fee: 100,
+        designed_project_floor_space: '',
+        designed_project_covered_area: 5465.36,
+        # designed_project_proportion_unit: '㎡',
+        designed_project_proportion_unit: '',
+        designed_project_project_structure: None,
+        designed_project_facade_type: '',
+        designed_project_has_steel: 0,
+        designed_project_elevator: 0,
+        designed_project_project_nature: None,
+        designed_project_floors_num: 2,
+    }
+    ots_dp = designed_project(_dict2)
+    ots_dp.update_row(ots_client)
+
+
+if __name__ == '__main__':
+    update()

+ 53 - 0
BaseDataMaintenance/model/ots/designed_project.py

@@ -4,6 +4,59 @@ from BaseDataMaintenance.model.ots.BaseModel import BaseModel
 from tablestore import *
 from BaseDataMaintenance.common.Utils import *
 
+
+designed_project_partitionkey = 'partitionkey'
+designed_project_id = 'id'
+designed_project_area = 'area'
+designed_project_begintime = 'begintime'
+designed_project_city = 'city'
+designed_project_construct_install_fee = 'construct_install_fee'
+designed_project_contacts = 'contacts'
+designed_project_covered_area = 'covered_area'
+designed_project_crtime = 'crtime'
+designed_project_des_project_type = 'des_project_type'
+designed_project_district = 'district'
+designed_project_docids = 'docids'
+designed_project_elevator = 'elevator'
+designed_project_endtime = 'endtime'
+designed_project_engineer_cost = 'engineer_cost'
+designed_project_facade_type = 'facade_type'
+designed_project_facade_type2 = 'facade_type2'
+designed_project_floor_space = 'floor_space'
+designed_project_floors_num = 'floors_num'
+designed_project_follow_number = 'follow_number'
+designed_project_follows = 'follows'
+designed_project_full_text = 'full_text'
+designed_project_has_steel = 'has_steel'
+designed_project_high_project_name = 'high_project_name'
+designed_project_json_list_group = 'json_list_group'
+designed_project_latest_service_time = 'latest_service_time'
+designed_project_ordinary_name = 'ordinary_name'
+designed_project_page_time = 'page_time'
+designed_project_progress = 'progress'
+designed_project_project_address = 'project_address'
+designed_project_project_code = 'project_code'
+designed_project_project_description = 'project_description'
+designed_project_project_follow = 'project_follow'
+designed_project_project_investment = 'project_investment'
+designed_project_project_name = 'project_name'
+designed_project_project_nature = 'project_nature'
+designed_project_project_structure = 'project_structure'
+designed_project_project_type = 'project_type'
+designed_project_proportion_unit = 'proportion_unit'
+designed_project_province = 'province'
+designed_project_spids = 'spids'
+designed_project_status = 'status'
+designed_project_update_status = 'update_status'
+designed_project_update_time = 'update_time'
+designed_project_approval_list = 'approval_list'
+designed_project_bid_info_list = 'bid_info_list'
+designed_project_contract_register_list = 'contract_register_list'
+designed_project_construct_pic_check_list = 'construct_pic_check_list'
+designed_project_construct_permit_list = 'construct_permit_list'
+designed_project_completion_accpet_list = 'completion_accpet_list'
+
+
 class designed_project(BaseModel):
 
     def __init__(self,_dict):

The diff for this file is too large to display.
+ 370 - 312
BaseDataMaintenance/model/ots/proposedBuilding_tmp.py


+ 25 - 13
BaseDataMaintenance/start_main.py

@@ -1,20 +1,29 @@
-
 import sys
 import os
-sys.path.append(os.path.dirname(__file__)+"/..")
+sys.path.append(os.path.dirname(__file__) + "/..")
 import argparse
 
+
 def main(args=None):
     parser = argparse.ArgumentParser()
-    parser.add_argument("--aA",dest="attachAttachment",action="store_true",help="start attachmentAttachment process")
-    parser.add_argument("--etr",dest="enterpriseToRedis",action="store_true",help="start attachmentAttachment process")
-    parser.add_argument("--filename",dest="filename",type=str,default=None,help="start attachmentAttachment process")
-    parser.add_argument("--delkey",dest="deleteEnterpriseKey",action="store_true",help="start attachmentAttachment process")
-    parser.add_argument("--keys",dest="keys",type=str,default=None,help="start attachmentAttachment process")
-    parser.add_argument("--rptc",dest="remove_processed_tyc_company",action="store_true",help="start attachmentAttachment process")
-    parser.add_argument("--product_dict_synchonize",dest="product_dict_synchonize",action="store_true",help="start product_dict_synchonize process")
-    parser.add_argument("--delete_product_collections",dest="delete_product_collections",action="store_true",help="start product_dict_synchonize process")
-    parser.add_argument("--search_similar",dest="search_similar",action="store_true",help="start product_dict_synchonize process")
+    parser.add_argument("--aA", dest="attachAttachment", action="store_true", help="start attachmentAttachment process")
+    parser.add_argument("--etr", dest="enterpriseToRedis", action="store_true",
+                        help="start attachmentAttachment process")
+    parser.add_argument("--filename", dest="filename", type=str, default=None,
+                        help="start attachmentAttachment process")
+    parser.add_argument("--delkey", dest="deleteEnterpriseKey", action="store_true",
+                        help="start attachmentAttachment process")
+    parser.add_argument("--keys", dest="keys", type=str, default=None, help="start attachmentAttachment process")
+    parser.add_argument("--rptc", dest="remove_processed_tyc_company", action="store_true",
+                        help="start attachmentAttachment process")
+    parser.add_argument("--product_dict_synchonize", dest="product_dict_synchonize", action="store_true",
+                        help="start product_dict_synchonize process")
+    parser.add_argument("--delete_product_collections", dest="delete_product_collections", action="store_true",
+                        help="start product_dict_synchonize process")
+    parser.add_argument("--search_similar", dest="search_similar", action="store_true",
+                        help="start product_dict_synchonize process")
+    parser.add_argument("--rerun", dest="rerun", type=str, default=None,
+                        help="start download_attachment_and_set_status_rerun process, with docid file path and md5 file path")
 
     args = parser.parse_args(args)
     if args.attachAttachment:
@@ -27,7 +36,7 @@ def main(args=None):
     if args.deleteEnterpriseKey:
         from BaseDataMaintenance.model.redis.enterprise import remove_enterprise_key
         if args.keys or args.filename:
-            remove_enterprise_key(args.filename,args.keys)
+            remove_enterprise_key(args.filename, args.keys)
     if args.remove_processed_tyc_company:
         from BaseDataMaintenance.maintenance.tyc_company.remove_processed import start_remove_processed_tyc_company
         start_remove_processed_tyc_company()
@@ -40,7 +49,10 @@ def main(args=None):
     if args.search_similar:
         from BaseDataMaintenance.maintenance.product.product_dict import search_similar
         search_similar()
+    if args.rerun:
+        from BaseDataMaintenance.maintenance.document.download_attachment_and_set_status_rerun import rerun_main
+        rerun_main(args.rerun)
 
 
 if __name__ == '__main__':
-    main()
+    main()
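
A minimal sketch of exercising the new --rerun switch programmatically, assuming BaseDataMaintenance is importable; the file name below is purely illustrative, and the expected contents (per the help text, a docid file path and an md5 file path) are defined by rerun_main, not by start_main.py.

    from BaseDataMaintenance.start_main import main

    # Illustrative only: the argument string is forwarded unchanged to rerun_main.
    main(["--rerun", "rerun_input.txt"])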

Some files were not shown because too many files changed in this diff.