|
@@ -1355,7 +1355,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
|
|
_extract_json = None
|
|
_extract_json = None
|
|
if document_extract2_extract_json in item:
|
|
if document_extract2_extract_json in item:
|
|
_extract_json = item.get(document_extract2_extract_json)
|
|
_extract_json = item.get(document_extract2_extract_json)
|
|
-
|
|
|
|
|
|
+ item.pop(document_extract2_extract_json)
|
|
|
|
|
|
try:
|
|
try:
|
|
message_acknowledged = False
|
|
message_acknowledged = False
|
|
@@ -1380,16 +1380,23 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
|
|
if aitimes is None:
|
|
if aitimes is None:
|
|
aitimes = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
|
|
aitimes = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
|
|
item["aitimes"] = aitimes
|
|
item["aitimes"] = aitimes
|
|
- if not message_acknowledged and ackMsg(conn,message_id,subscription):
|
|
|
|
- message_acknowledged = True
|
|
|
|
- send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai)
|
|
|
|
- return
|
|
|
|
- else:
|
|
|
|
- if not timeAdd(aitimes,0,format="%Y-%m-%d %H:%M:%S",minutes=10)<getCurrent_date(format="%Y-%m-%d %H:%M:%S"):
|
|
|
|
- if not message_acknowledged and ackMsg(conn,message_id,subscription):
|
|
|
|
|
|
+
|
|
|
|
+ if not message_acknowledged:
|
|
|
|
+ item[document_extract2_extract_json] = _extract_json
|
|
|
|
+ if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai):
|
|
message_acknowledged = True
|
|
message_acknowledged = True
|
|
- send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai)
|
|
|
|
|
|
+ ackMsg(conn,message_id,subscription)
|
|
return
|
|
return
|
|
|
|
+ else:
|
|
|
|
+ if not timeAdd(aitimes,0,format="%Y-%m-%d %H:%M:%S",minutes=10)<getCurrent_date(format="%Y-%m-%d %H:%M:%S"):
|
|
|
|
+
|
|
|
|
+ if not message_acknowledged:
|
|
|
|
+
|
|
|
|
+ item[document_extract2_extract_json] = _extract_json
|
|
|
|
+ if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_ai):
|
|
|
|
+ message_acknowledged = True
|
|
|
|
+ ackMsg(conn,message_id,subscription)
|
|
|
|
+ return
|
|
|
|
|
|
|
|
|
|
dhtml = Document_html({"partitionkey":item.get("partitionkey"),
|
|
dhtml = Document_html({"partitionkey":item.get("partitionkey"),
|