luojiehua преди 2 месеца
родител
ревизия
921e284be1
променени са 1 файла, в които са добавени 40 реда и са изтрити 32 реда
  1. 40 32
      BaseDataMaintenance/maintenance/dataflow_mq.py

+ 40 - 32
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -1357,49 +1357,55 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
             _extract_json = item.get(document_extract2_extract_json)
             item.pop(document_extract2_extract_json)
 
-        dtmp = Document_tmp(item)
 
-        _tomq = False
-        if dtmp.fix_columns(self.ots_client,["status","save"],True):
-            if dtmp.getProperties().get("status",0)>=71:
-                if dtmp.getProperties().get("save",1)==0:
-                    log("extract_dump_ai of docid:%d"%(item.get(document_docid)))
-                    ackMsg(conn,message_id,subscription)
-                    return
+        try:
+            message_acknowledged = False
+            dtmp = Document_tmp(item)
+
+            _tomq = False
+            if dtmp.fix_columns(self.ots_client,["status","save"],True):
+                if dtmp.getProperties().get("status",0)>=71:
+                    if dtmp.getProperties().get("save",1)==0:
+                        message_acknowledged = True
+                        log("extract_dump_ai of docid:%d"%(item.get(document_docid)))
+                        ackMsg(conn,message_id,subscription)
+                        return
+                else:
+                    _tomq = True
             else:
                 _tomq = True
-        else:
-            _tomq = True
 
 
-        if _tomq:
-            aitimes = item.get("aitimes")
-            if aitimes is None:
-                aitimes = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
-                item["aitimes"] = aitimes
-                if ackMsg(conn,message_id,subscription):
-                    send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai)
-                    return
-            else:
-                if not timeAdd(aitimes,0,format="%Y-%m-%d %H:%M:%S",minutes=10)<getCurrent_date(format="%Y-%m-%d %H:%M:%S"):
-                    if ackMsg(conn,message_id,subscription):
+            if _tomq:
+                aitimes = item.get("aitimes")
+                if aitimes is None:
+                    aitimes = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
+                    item["aitimes"] = aitimes
+                    if not message_acknowledged and ackMsg(conn,message_id,subscription):
+                        message_acknowledged = True
                         send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai)
                         return
+                else:
+                    if not timeAdd(aitimes,0,format="%Y-%m-%d %H:%M:%S",minutes=10)<getCurrent_date(format="%Y-%m-%d %H:%M:%S"):
+                        if not message_acknowledged and ackMsg(conn,message_id,subscription):
+                            message_acknowledged = True
+                            send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai)
+                            return
 
 
-        dhtml = Document_html({"partitionkey":item.get("partitionkey"),
-                               "docid":item.get("docid")})
+            dhtml = Document_html({"partitionkey":item.get("partitionkey"),
+                                   "docid":item.get("docid")})
 
-        _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
+            _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
 
-        dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
+            dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
 
-        extract_times = item.get("extract_times",0)+1
-        item["extract_times"] = extract_times
+            extract_times = item.get("extract_times",0)+1
+            item["extract_times"] = extract_times
 
 
-        _extract_ai = {}
-        try:
+            _extract_ai = {}
+
             _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
 
             _text = html2text_with_tablehtml(_dochtmlcon)
@@ -1446,12 +1452,14 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
                         doc.update_row(self.ots_client)
                         log("extract_nochange_ai of docid:%d"%(item.get(document_docid)))
 
-            ackMsg(conn,message_id,subscription)
+            if not message_acknowledged:
+                message_acknowledged = True
+                ackMsg(conn,message_id,subscription)
 
         except Exception as e:
             traceback.print_exc()
-            ackMsg(conn,message_id,subscription)
-            pass
+            if not message_acknowledged:
+                ackMsg(conn,message_id,subscription)
 
 
     def merge_json(self,extract_json,extract_ai_json):