
Element extraction: add 5 retries on failure, and adjust the extraction scheduling weights

luojiehua committed 2 years ago · commit 281e925efa
1 changed file with 56 additions and 3 deletions:
  1. BaseDataMaintenance/maintenance/dataflow_mq.py (+56 -3)


@@ -524,7 +524,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
         self.industy_url = "http://127.0.0.1:15000/industry_extract"
 
         self.extract_interfaces = [["http://127.0.0.1:15030/content_extract",15],
-                                   ["http://192.168.0.115:15030/content_extract",8]
+                                   ["http://192.168.0.115:15030/content_extract",9]
                                    ]
 
 
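The second field of each extract_interfaces entry is the scheduling weight the commit message refers to; this hunk raises the remote interface's weight from 8 to 9 against the local interface's 15. A minimal sketch of what weight-proportional dispatch would look like, assuming the producer picks an interface at random in proportion to its weight (pick_extract_interface is a hypothetical helper, not code from this repository):

    import random

    # Hypothetical sketch: choose an extract interface with probability
    # proportional to its configured weight.
    def pick_extract_interface(interfaces):
        urls = [url for url, weight in interfaces]
        weights = [weight for url, weight in interfaces]
        return random.choices(urls, weights=weights, k=1)[0]

    # With weights 15 and 9, the local interface receives 15/24 (62.5%) of requests.
    url = pick_extract_interface([
        ["http://127.0.0.1:15030/content_extract", 15],
        ["http://192.168.0.115:15030/content_extract", 9],
    ])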
@@ -573,6 +573,33 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
         q_size = self.queue_extract.qsize()
         log("queue extract size:%d"%(q_size))
 
+    def process_extract_failed(self):
+        def _handle(_dict,result_queue):
+            frame = _dict.get("frame")
+            message_id = frame.headers["message-id"]
+            subscription = frame.headers.setdefault('subscription', None)
+            conn = _dict.get("conn")
+            body = frame.body
+            if body is not None:
+                item = json.loads(body)
+                # stamp extract_times to 10 so that the next processing pass
+                # takes the extract_times>=10 branch and finalizes the document
+                item["extract_times"] = 10
+                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
+                    ackMsg(conn,message_id,subscription)
+
+        from BaseDataMaintenance.java.MQInfo import getQueueSize
+        extract_failed_size = getQueueSize(self.mq_extract_failed)
+        if extract_failed_size>0:
+            # drain the failed queue: re-feed every message to the extract queue
+            failed_listener = self.ExtractListener(getConnect_activateMQ(),_handle)
+            createComsumer(failed_listener,self.mq_extract_failed)
+            # poll every 10 minutes until the failed queue is empty
+            while True:
+                extract_failed_size = getQueueSize(self.mq_extract_failed)
+                if extract_failed_size==0:
+                    break
+                time.sleep(600)
+            failed_listener.conn.disconnect()
+
     def flow_extract(self,):
         self.comsumer()
 
@@ -641,7 +668,20 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
             try:
                 if all_done!=1:
                     sentMsgToDD("element extraction failed: docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
-                    if extract_times>2:
+                    if extract_times>=10:
+                        # after 10 attempts, give up and record the document as processed
+                        dtmp.setValue(document_tmp_dochtmlcon,"",False)
+                        dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
+                        dtmp.update_row(self.ots_client)
+                        dhtml.update_row(self.ots_client)
+
+                        # replace the extraction result with an empty json object
+                        _extract.setValue(document_extract2_extract_json,"{}",True)
+                        _extract.setValue(document_extract2_industry_json,"{}",True)
+                        _extract.setValue(document_extract2_status,random.randint(1,50),True)
+                        _extract.update_row(self.ots_client)
+                        _to_ack = True
+                    elif extract_times>5:
                         #transform to the extract_failed queue
                         if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
                             #process as succeed
@@ -687,7 +727,19 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
             traceback.print_exc()
             sentMsgToDD("element extraction failed: docid:%d with result:%s"%(item.get(document_tmp_docid),str(e)))
             log("process docid:%s failed message_id:%s"%(data["doc_id"],message_id))
-            if extract_times>2:
+            if extract_times>=10:
+                # after 10 attempts, give up and record the document as processed
+                dtmp.setValue(document_tmp_dochtmlcon,"",False)
+                dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
+                dtmp.update_row(self.ots_client)
+                dhtml.update_row(self.ots_client)
+                # replace the extraction result with an empty json object
+                _extract.setValue(document_extract2_extract_json,"{}",True)
+                _extract.setValue(document_extract2_industry_json,"{}",True)
+                _extract.setValue(document_extract2_status,random.randint(1,50),True)
+                _extract.update_row(self.ots_client)
+                ackMsg(conn,message_id,subscription)
+            elif extract_times>5:
                 #transform to the extract_failed queue
                 if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
                     #process as succeed
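Both the result-check path and the exception path now implement the same three-stage escalation. A minimal sketch of the policy, assuming (per the commit message) that up to 5 failures simply go back to the extract queue for another attempt; the function name and return labels are hypothetical:

    # Hypothetical sketch of the retry escalation implemented above.
    def classify_failed_extract(extract_times):
        if extract_times >= 10:
            # give up: persist "{}" as the extraction result and ack as succeeded
            return "finalize_empty"
        elif extract_times > 5:
            # park the message on the extract_failed queue for the nightly drain
            return "to_failed_queue"
        else:
            # up to 5 retries: send the message back to the extract queue
            return "requeue"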
@@ -716,6 +768,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
     def start_flow_extract(self):
         schedule = BlockingScheduler()
         schedule.add_job(self.flow_extract_producer,"cron",second="*/20")
+        schedule.add_job(self.process_extract_failed,"cron",hour="20")
         schedule.start()
 
 from multiprocessing import RLock
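With these triggers, flow_extract_producer still fires every 20 seconds, while the new process_extract_failed job runs once a day at 20:00 (APScheduler's cron trigger fills the unspecified minute and second fields with 0 when only hour is given, in the scheduler's local timezone). The 600-second sleep inside the drain loop keeps that nightly job from busy-waiting while the failed queue empties.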