|
@@ -1358,15 +1358,33 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
|
|
|
item.pop(document_extract2_extract_json)
|
|
|
|
|
|
dtmp = Document_tmp(item)
|
|
|
- for _ in range(3):
|
|
|
- if dtmp.fix_columns(self.ots_client,["status","save"],True):
|
|
|
- if dtmp.getProperties().get("status",0)>=71:
|
|
|
- if dtmp.getProperties().get("save",1)==0:
|
|
|
- log("extract_dump_ai of docid:%d"%(item.get(document_docid)))
|
|
|
- ackMsg(conn,message_id,subscription)
|
|
|
+
|
|
|
+ _tomq = False
|
|
|
+ if dtmp.fix_columns(self.ots_client,["status","save"],True):
|
|
|
+ if dtmp.getProperties().get("status",0)>=71:
|
|
|
+ if dtmp.getProperties().get("save",1)==0:
|
|
|
+ log("extract_dump_ai of docid:%d"%(item.get(document_docid)))
|
|
|
+ ackMsg(conn,message_id,subscription)
|
|
|
+ return
|
|
|
+ else:
|
|
|
+ _tomq = True
|
|
|
+ else:
|
|
|
+ _tomq = True
|
|
|
+
|
|
|
+
|
|
|
+ if _tomq:
|
|
|
+ aitimes = item.get("aitimes")
|
|
|
+ if aitimes is None:
|
|
|
+ aitimes = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
|
|
|
+ item["aitimes"] = aitimes
|
|
|
+ if ackMsg(conn,message_id,subscription):
|
|
|
+ send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai)
|
|
|
+ return
|
|
|
+ else:
|
|
|
+ if not timeAdd(aitimes,0,format="%Y-%m-%d %H:%M:%S",minutes=10)<getCurrent_date(format="%Y-%m-%d %H:%M:%S"):
|
|
|
+ if ackMsg(conn,message_id,subscription):
|
|
|
+ send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai)
|
|
|
return
|
|
|
- break
|
|
|
- time.sleep(20)
|
|
|
|
|
|
|
|
|
dhtml = Document_html({"partitionkey":item.get("partitionkey"),
|