
Speed up attachment processing, increase the number of worker processes, and stop re-uploading data that already exists

luojiehua, 1 year ago
commit 239aa87d89

+ 19 - 8
BaseDataMaintenance/dataMonitor/data_monitor.py

@@ -209,7 +209,7 @@ class BaseDataMonitor():
             #                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
             total_count_todeal = getQueueSize("dataflow_attachment")
 
-            if total_count_todeal>500:
+            if total_count_todeal>1000:
                 # query = BoolQuery(must_queries=[
                 #     RangeQuery("crtime",self.get_last_tenmin_time(16))
                 # ])
@@ -248,7 +248,7 @@ class BaseDataMonitor():
 
                # Read the log file to collect per-type processing statistics
                 dict_type = {}
-                _pattern = "%s.*process filemd5\:[^\s]* (?P<result>(True|False)) of type\:(?P<type>[^\s]*).*recognize takes (?P<costtime>\d+)s"%(re.escape(self.get_last_tenmin_time()))
+                _pattern = "%s.*process filemd5\:[^\s]* (?P<result>(True|False)) of type\:(?P<type>[^\s]*).*download:(?P<downloadtime>\d+\.\d+)s recognize takes (?P<costtime>\d+)s upload takes (?P<uploadtime>\d+\.\d+)s"%(re.escape(self.get_last_tenmin_time()))
                 with open(flow_attachment_log_path,"r",encoding="utf8") as f:
                     while True:
                         line = f.readline()
@@ -258,15 +258,25 @@ class BaseDataMonitor():
                         if _match is not None:
                             _type = _match.groupdict().get("type")
                             _result = _match.groupdict().get("result")
+                            _downtime = _match.groupdict().get("downloadtime")
                             _costtime = _match.groupdict().get("costtime")
+
+                            _uploadtime = _match.groupdict().get("uploadtime")
                             if _type not in dict_type:
-                                dict_type[_type] = {"success":0,"fail":0,"success_costtime":0,"fail_costtime":0}
+                                dict_type[_type] = {"success":0,"fail":0,"success_costtime":0,"fail_costtime":0,"downtime":0,"downcount":0,"uploadtime":0,"uploadcount":0}
                             if _result=="True":
                                 dict_type[_type]["success"] += 1
                                 dict_type[_type]["success_costtime"] += int(_costtime)
+
                             else:
                                 dict_type[_type]["fail"] += 1
                                 dict_type[_type]["fail_costtime"] += int(_costtime)
+                            if float(_downtime)>0:
+                                dict_type[_type]["downcount"] += 1
+                                dict_type[_type]["downtime"] += float(_downtime)
+                            if float(_uploadtime)>0:
+                                dict_type[_type]["uploadcount"] += 1
+                                dict_type[_type]["uploadtime"] += float(_uploadtime)
 
                 process_count = 0
                 process_succeed_count = 0
@@ -274,7 +284,7 @@ class BaseDataMonitor():
                 for k,v in dict_type.items():
                     process_count += v.get("success",0)+v.get("fail",0)
                     process_succeed_count += v.get("success",0)
-                    _msg_type += "\n类型%s\n\t成功%s,消耗%s秒,%.2f秒/个,\n\t失败%s,消耗%s秒,%.2f秒/个"%(k,str(v.get("success")),str(v.get("success_costtime")),v.get("success_costtime")/max(1,v.get("success")),str(v.get("fail")),str(v.get("fail_costtime")),v.get("fail_costtime")/max(1,v.get("fail")))
+                    _msg_type += "\n类型%s\n\t成功%s,消耗%s秒,%.2f秒/个,\n\t失败%s,消耗%s秒,%.2f秒/个,\n\t下载%s,消耗%s秒,%.2f秒/个,\n\t上传%s,消耗%s秒,%.2f秒/个"%(k,str(v.get("success")),str(v.get("success_costtime")),v.get("success_costtime")/max(1,v.get("success")),str(v.get("fail")),str(v.get("fail_costtime")),v.get("fail_costtime")/max(1,v.get("fail")),str(v.get("downcount")),str(v.get("downtime")),v.get("downtime")/max(1,v.get("downcount")),str(v.get("uploadcount")),str(v.get("uploadtime")),v.get("uploadtime")/max(1,v.get("uploadcount")))
 
                 _msg = "附件提取队列报警:队列堆积%s条公告,最近十分钟处理公告附件数:%s,处理成功数:%s"%(str(total_count_todeal),str(process_count),str(process_succeed_count))
                 log(_msg)
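The widened log pattern now also captures per-file download and upload timings, and the counters only average over files where a download or upload actually happened, so cache hits (logged as 0.00s) do not skew the means. A minimal standalone sketch of the parsing and aggregation; the sample log line is fabricated to match the format emitted by dataflow_mq.py after this commit, and the real pattern is additionally anchored with re.escape(self.get_last_tenmin_time()):

```python
import re

# Fabricated log line in the post-commit format from dataflow_mq.py.
sample = ("2024-03-01 10:10:01 process filemd5:abc123 True of type:pdf "
          "with size:1.200M download:0.35s recognize takes 4s "
          "upload takes 0.80s _ots_exists False,ret_size:2048")

_pattern = (r"process filemd5\:[^\s]* (?P<result>(True|False)) of type\:(?P<type>[^\s]*)"
            r".*download:(?P<downloadtime>\d+\.\d+)s recognize takes (?P<costtime>\d+)s"
            r" upload takes (?P<uploadtime>\d+\.\d+)s")

dict_type = {}
_match = re.search(_pattern, sample)
if _match is not None:
    d = _match.groupdict()
    stat = dict_type.setdefault(d["type"], {
        "success": 0, "fail": 0, "success_costtime": 0, "fail_costtime": 0,
        "downtime": 0, "downcount": 0, "uploadtime": 0, "uploadcount": 0})
    key = "success" if d["result"] == "True" else "fail"
    stat[key] += 1
    stat[key + "_costtime"] += int(d["costtime"])
    # Only lines with a real download/upload (>0s) enter the averages,
    # so cached attachments do not dilute the per-file timings.
    if float(d["downloadtime"]) > 0:
        stat["downcount"] += 1
        stat["downtime"] += float(d["downloadtime"])
    if float(d["uploadtime"]) > 0:
        stat["uploadcount"] += 1
        stat["uploadtime"] += float(d["uploadtime"])

print(dict_type)
```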
@@ -629,10 +639,11 @@ class BaseDataMonitor():
 
     def start_attach_monitor(self):
         #附件监控
-        scheduler = BlockingScheduler()
-
-        scheduler.add_job(self.monitor_attachment,"cron",minute="*/10")
-        scheduler.start()
+        # scheduler = BlockingScheduler()
+        #
+        # scheduler.add_job(self.monitor_attachment,"cron",minute="*/10")
+        # scheduler.start()
+        self.monitor_attachment()
 
 
 if __name__ == '__main__':

+ 3 - 2
BaseDataMaintenance/maintenance/dataflow.py

@@ -2592,7 +2592,7 @@ class Dataflow_dumplicate(Dataflow):
             confidence = _dict["confidence"]
 
             if b_log:
-                log("confidence %d %.3f"%(_docid,confidence))
+                log("confidence %d %.3f total_count %d"%(_docid,confidence,_dict.get('min_counts',0)))
 
             if confidence>0.1:
                 if _docid not in set_docid:
@@ -3966,6 +3966,7 @@ class Dataflow_dumplicate(Dataflow):
                         has_before = True
                     if v>page_time:
                         has_after = True
+        log("check page_time has_before %s has_after %s"%(str(has_before),str(has_after)))
         if has_before:
             _query = BoolQuery(must_queries=[MatchPhraseQuery(document_doctitle,item.get(document_doctitle,""))],
                                must_not_queries=[TermQuery(document_docid,item.get(document_docid,0))])
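The new debug line reports the outcome of the date scan just above: the loop walks the extracted date fields and flags whether any fall before or after the announcement's page_time, which then drives the title-based duplicate queries. A rough sketch under the assumption that dates compare as "YYYY-MM-DD" strings; the surrounding loop is not fully shown in the hunk, so the function boundary here is a guess:

```python
log = print  # stand-in for the project's log() helper

def check_page_time(dict_time, page_time):
    # Flag whether any extracted date lies before/after the announcement date.
    has_before = False
    has_after = False
    for _, v in dict_time.items():
        if v is not None and len(v) > 0:
            if v < page_time:
                has_before = True
            if v > page_time:
                has_after = True
    log("check page_time has_before %s has_after %s" % (str(has_before), str(has_after)))
    return has_before, has_after

# Dates on both sides of page_time -> both duplicate queries would run.
check_page_time({"time_release": "2024-01-01",
                 "time_bidopen": "2024-02-01"}, "2024-01-15")
```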
@@ -4412,7 +4413,7 @@ if __name__ == '__main__':
     # test_attachment_interface()
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
-    df_dump.test_dumplicate(464184856
+    df_dump.test_dumplicate(455485514
                             )
     # compare_dumplicate_check()
     # df_dump.test_merge([391898061

+ 24 - 12
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -79,7 +79,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
         self.queue_attachment_ocr = Queue()
         self.queue_attachment_not_ocr = Queue()
-        self.comsumer_count = 90
+        self.comsumer_count = 20
         self.comsumer_process_count = 5
         self.retry_comsumer_count = 10
         self.retry_times = 5
@@ -97,12 +97,12 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
         self.session = None
 
-        # for _ in range(self.comsumer_process_count):
-        #     listener_p = Process(target=self.start_attachment_listener)
-        #     listener_p.start()
+        for _ in range(self.comsumer_process_count):
+            listener_p = Process(target=self.start_attachment_listener)
+            listener_p.start()
 
-        listener_p = Process(target=self.start_attachment_listener)
-        listener_p.start()
+        # listener_p = Process(target=self.start_attachment_listener)
+        # listener_p.start()
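This is the throughput change from the commit message: instead of one process with 90 consumers, the flow now starts comsumer_process_count = 5 worker processes, each running its own 20 listeners. A minimal sketch of the fan-out pattern; start_attachment_listener here is a stand-in for the real bound method:

```python
from multiprocessing import Process

def start_attachment_listener():
    # Stand-in for Dataflow_ActivteMQ_attachment.start_attachment_listener,
    # which would start comsumer_count (now 20) MQ listeners in this process.
    print("listener process started")

if __name__ == "__main__":
    comsumer_process_count = 5
    for _ in range(comsumer_process_count):
        listener_p = Process(target=start_attachment_listener)
        listener_p.start()
    # The real listeners run indefinitely, so the original code never joins them.
```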
 
 
 
@@ -332,6 +332,8 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
         objectPath = attach.getProperties().get(attachment_path)
         docids = attach.getProperties().get(attachment_docids)
 
+        _ots_exists = attach.getProperties().get("ots_exists")
+
         if objectPath is None:
             relative_path = "%s/%s"%(_uuid.hex[:4],_uuid.hex)
         else:
@@ -408,8 +410,9 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
 
                     if local_exists:
-                        upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
-                    os.remove(localpath)
+                        if not _ots_exists:
+                            upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
+                        os.remove(localpath)
 
                     return True
 
@@ -422,7 +425,9 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                 # _data_base64 = base64.b64encode(open(localpath,"rb").read())
                 # _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype)
                 _success,_html,swf_images,classification = getAttachDealInterface(None,_filetype,path=localpath,session=self.session)
-                log("process filemd5:%s %s of type:%s with size:%.3fM download:%ds recognize takes %ds,ret_size:%d"%(filemd5,str(_success),_filetype,round(_size/1024/1024,4),time_download,time.time()-start_time,len(_html)))
+
+                _reg_time = time.time()-start_time
+
                 if _success:
                     if len(_html)<5:
                         _html = ""
@@ -435,6 +440,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
                         return False
 
+
                # When re-running swf, remove the "\" characters from the original swf_urls
                 if attach.getProperties().get(attachment_filetype) == "swf":
                     swf_urls = attach.getProperties().get(attachment_swfUrls, "[]")
@@ -497,14 +503,17 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                 self.putAttach_json_toRedis(filemd5,attach.getProperties())
 
 
+                start_time = time.time()
                 if local_exists:
-                    upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
+                    if not _ots_exists:
+                        upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
                 try:
                     if upload_status and os.path.exists(localpath):
                         os.remove(localpath)
                 except Exception as e:
                     pass
-
+                _upload_time = time.time()-start_time
+                log("process filemd5:%s %s of type:%s with size:%.3fM download:%.2fs recognize takes %ds upload takes %.2fs _ots_exists %s,ret_size:%d"%(filemd5,str(_success),_filetype,round(_size/1024/1024,4),time_download,_reg_time,_upload_time,str(_ots_exists),len(_html)))
 
                 return True
             else:
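Together with the earlier hunk, this implements the "existing data is no longer uploaded" part of the commit: when the attachment was already found in OTS (_ots_exists), the OSS upload is skipped entirely, and whatever upload does happen is timed so the new log line can report it. A condensed sketch of the resulting control flow; the function name and signature are invented for illustration, and uploadFileByPath is injected so the snippet stays self-contained:

```python
import os
import time

def finish_attachment(bucket, localpath, objectPath, local_exists, _ots_exists,
                      uploadFileByPath):
    # Skip the OSS upload when the record already exists in OTS; the file
    # content is assumed to have been uploaded the first time around.
    upload_status = False
    start_time = time.time()
    if local_exists and not _ots_exists:
        upload_status = uploadFileByPath(bucket, localpath, objectPath)
    try:
        if upload_status and os.path.exists(localpath):
            os.remove(localpath)
    except Exception:
        pass
    _upload_time = time.time() - start_time  # feeds the "upload takes %.2fs" log field
    return upload_status, _upload_time

# Cache hit: no upload happens and _upload_time stays near zero.
print(finish_attachment(None, "/tmp/does-not-exist.pdf", "abcd/filemd5.pdf",
                        local_exists=True, _ots_exists=True,
                        uploadFileByPath=lambda b, l, o: True))
```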
@@ -624,7 +633,10 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                     if _attach_ots.fix_columns(self.ots_client,[attachment_status,attachment_path,attachment_attachmenthtml,attachment_attachmentcon,attachment_filetype,attachment_swfUrls,attachment_process_time],True):
                         if _attach_ots.getProperties().get(attachment_status) is not None:
                             log("getAttachments find in ots:%s"%(_filemd5))
-                            list_attachment.append(Attachment_postgres(_attach_ots.getProperties()))
+                            _attach_pg = Attachment_postgres(_attach_ots.getProperties())
+                            _attach_pg.setValue("ots_exists",True,True)
+                            list_attachment.append(_attach_pg)
+
                     else:
                         log("getAttachments search in path:%s"%(_filemd5))
                         if _path:
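This is where the ots_exists flag originates: when the attachment metadata is found in OTS, the in-memory object is tagged so that the processing step later knows the file is already stored and skips the re-upload. A sketch using a stub in place of the real Attachment_postgres class, mimicking only the two methods used in this hunk:

```python
# Stub that mimics only the two Attachment_postgres methods used here;
# the real class wraps a postgres-backed attachment row.
class Attachment_postgres:
    def __init__(self, props):
        self._props = dict(props)

    def setValue(self, key, value, is_changed=True):
        self._props[key] = value

    def getProperties(self):
        return self._props

# Found in OTS -> tag the object before queueing it for processing.
_attach_pg = Attachment_postgres({"filemd5": "abc123", "status": 10})
_attach_pg.setValue("ots_exists", True, True)

# Later, inside the process step, the flag decides whether to upload:
_ots_exists = _attach_pg.getProperties().get("ots_exists")
print(_ots_exists)  # True -> uploadFileByPath is skipped
```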