Pārlūkot izejas kodu

修复提取失败流程

luojiehua 2 gadi atpakaļ
vecāks
revīzija
5565314555
1 mainīts fails ar 12 papildinājumiem un 6 dzēšanām
  1. +12 −6
      BaseDataMaintenance/maintenance/dataflow_mq.py

+ 12 - 6
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -69,7 +69,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                 failed_attachment_size = getQueueSize("dataflow_attachment_failed")
                 if failed_attachment_size==0:
                     break
-                time.sleep(600)
+                time.sleep(10)
             for _c in list_comsumer:
                 _c.conn.disconnect()
 
@@ -604,15 +604,17 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
 
 
         from BaseDataMaintenance.java.MQInfo import getQueueSize
-        extract_failed_size = getQueueSize(self.mq_extract_failed)
-        if extract_failed_size>0:
+        extract_failed_size = getQueueSize("dataflow_extract_failed")
+        extract_size = getQueueSize("dataflow_extract")
+        log("extract_failed_size %s extract_size %s"%(str(extract_failed_size),str(extract_size)))
+        if extract_failed_size>0 and extract_size<100:
             failed_listener = self.ExtractListener(getConnect_activateMQ(),_handle)
             createComsumer(failed_listener,self.mq_extract_failed)
             while 1:
-                extract_failed_size = getQueueSize(self.mq_extract_failed)
+                extract_failed_size = getQueueSize("dataflow_extract_failed")
                 if extract_failed_size==0:
                     break
-                time.sleep(600)
+                time.sleep(10)
             failed_listener.conn.disconnect()
 
     def flow_extract(self,):
@@ -688,7 +690,9 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
             _fingerprint = getFingerprint(str(data["title"])+str(data["content"]))
 
             if all_done>0:
+                _time = time.time()
                 extract_json = self.getExtract_json_fromDB(_fingerprint)
+                log("get json from db takes %.2f"%(time.time()-_time))
                 # extract_json = None
                 _docid = int(data["doc_id"])
                 if extract_json is not None:
@@ -699,7 +703,9 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
                     if (resp.status_code >=200 and resp.status_code<=213):
                         extract_json = resp.content.decode("utf8")
                         _extract.setValue(document_extract2_extract_json,extract_json,True)
+                        _time = time.time()
                         self.putExtract_json_toDB(_fingerprint,_docid,extract_json)
+                        log("get json to db takes %.2f"%(time.time()-_time))
                     else:
                         log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
                         all_done = -2
@@ -842,7 +848,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
     def start_flow_extract(self):
         schedule = BlockingScheduler()
         schedule.add_job(self.flow_extract_producer,"cron",second="*/20")
-        schedule.add_job(self.process_extract_failed,"cron",hour="*/5")
+        schedule.add_job(self.process_extract_failed,"cron",minute="*/5")
         schedule.add_job(self.delete_document_extract,"cron",hour="*/5")
         schedule.start()