فهرست منبع

增加漏数据及修复数据;附件处理时如果历史已经处理过,则跳过

luojiehua 2 سال پیش
والد
کامیت
74d6d9c3fe
2فایلهای تغییر یافته به همراه35 افزوده شده و 32 حذف شده
  1. 2 4
      BaseDataMaintenance/maintenance/dataflow.py
  2. 33 28
      BaseDataMaintenance/maintenance/dataflow_mq.py

+ 2 - 4
BaseDataMaintenance/maintenance/dataflow.py

@@ -1791,8 +1791,7 @@ class Dataflow_attachment(Dataflow):
             try:
                 item = self.queue_attachment_ocr.get(True,timeout=0.2)
                 log("attachment get doc:%s"%(str(item.get("item",{}).get("docid"))))
-                if item.get("item",{}).get("docid")<290001272:
-                    continue
+
                 self.attachment_recognize(item,None)
                 log("attachment get doc:%s succeed"%(str(item.get("item",{}).get("docid"))))
             except Exception as e:
@@ -1801,8 +1800,7 @@ class Dataflow_attachment(Dataflow):
             try:
                 item = self.queue_attachment_not_ocr.get(True,timeout=0.2)
                 log("attachment get doc:%s"%(str(item.get("item",{}).get("docid"))))
-                if item.get("item",{}).get("docid")<290001272:
-                    continue
+
                 self.attachment_recognize(item,None)
                 log("attachment get doc:%s succeed"%(str(item.get("item",{}).get("docid"))))
             except Exception as e:

+ 33 - 28
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -159,44 +159,49 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
             dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
             dtmp = Document_tmp(item)
 
+
             #调用识别接口
             _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)
 
 
-
-
-            _to_ack = False
-            if not _succeed and _retry_times<self.retry_times:
-                item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
-                item["retry_times"] = _retry_times+1
-                #失败次数大于5次就放入失败队列,此队列的数据会在空闲时间重新处理一次
-                if item["retry_times"]>=self.retry_times:
-                    send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment_failed)
-
-                send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment)
-
-                #失败保存
-                dtmp.setValue(document_tmp_dochtmlcon,"",False)
-                dtmp.setValue(document_tmp_status,0,True)
-                if not dtmp.exists_row(self.ots_client):
-                    dtmp.update_row(self.ots_client)
-                    dhtml.update_row(self.ots_client)
+            if item.get("docid")<280749583:
+                send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(dtmp.getProperties(),cls=MyEncoder),self.mq_extract)
                 if send_succeed:
                     _to_ack = True
-
             else:
-                try:
-                    log("docid:%d,retry:%d swf_urls:%s list_html:%s"%(dhtml.getProperties().get(document_docid),_retry_times,str(swf_urls),str(len(list_html))))
-                    dhtml.updateSWFImages(swf_urls)
-                    dhtml.updateAttachment(list_html)
 
-                    dtmp.setValue(document_tmp_attachment_extract_status,1,True)
-                    dtmp.setValue(document_tmp_dochtmlcon,dhtml.getProperties().get(document_tmp_dochtmlcon),True)
-                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(dtmp.getProperties(),cls=MyEncoder),self.mq_extract)
+                _to_ack = False
+                if not _succeed and _retry_times<self.retry_times:
+                    item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
+                    item["retry_times"] = _retry_times+1
+                    #失败次数大于5次就放入失败队列,此队列的数据会在空闲时间重新处理一次
+                    if item["retry_times"]>=self.retry_times:
+                        send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment_failed)
+
+                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment)
+
+                    #失败保存
+                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
+                    dtmp.setValue(document_tmp_status,0,True)
+                    if not dtmp.exists_row(self.ots_client):
+                        dtmp.update_row(self.ots_client)
+                        dhtml.update_row(self.ots_client)
                     if send_succeed:
                         _to_ack = True
-                except Exception as e:
-                    traceback.print_exc()
+
+                else:
+                    try:
+                        log("docid:%d,retry:%d swf_urls:%s list_html:%s"%(dhtml.getProperties().get(document_docid),_retry_times,str(swf_urls),str(len(list_html))))
+                        dhtml.updateSWFImages(swf_urls)
+                        dhtml.updateAttachment(list_html)
+
+                        dtmp.setValue(document_tmp_attachment_extract_status,1,True)
+                        dtmp.setValue(document_tmp_dochtmlcon,dhtml.getProperties().get(document_tmp_dochtmlcon),True)
+                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(dtmp.getProperties(),cls=MyEncoder),self.mq_extract)
+                        if send_succeed:
+                            _to_ack = True
+                    except Exception as e:
+                        traceback.print_exc()
 
             if _to_ack:
                 ackMsg(conn,message_id)