|
@@ -58,6 +58,13 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
|
|
log("get message of idx:%s"%(str(self._idx)))
|
|
log("get message of idx:%s"%(str(self._idx)))
|
|
message_id = headers.headers["message-id"]
|
|
message_id = headers.headers["message-id"]
|
|
body = headers.body
|
|
body = headers.body
|
|
|
|
+
|
|
|
|
+ if self._idx==0:
|
|
|
|
+ log("jump by random")
|
|
|
|
+ if send_msg_toacmq(self.pool_mq,body,self.mq_attachment):
|
|
|
|
+ ackMsg(self.conn,message_id)
|
|
|
|
+ return
|
|
|
|
+
|
|
_dict = {"frame":headers,"conn":self.conn}
|
|
_dict = {"frame":headers,"conn":self.conn}
|
|
self._func(_dict=_dict)
|
|
self._func(_dict=_dict)
|
|
except Exception as e:
|
|
except Exception as e:
|
|
@@ -159,11 +166,6 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
|
|
page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
|
|
page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
|
|
_dochtmlcon = item.get(document_tmp_dochtmlcon,"")
|
|
_dochtmlcon = item.get(document_tmp_dochtmlcon,"")
|
|
|
|
|
|
- if random.random()<0.1:
|
|
|
|
- log("jump by random")
|
|
|
|
- if send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment):
|
|
|
|
- return
|
|
|
|
-
|
|
|
|
|
|
|
|
if len(page_attachments)==0:
|
|
if len(page_attachments)==0:
|
|
newitem ={"item":item,"list_attach":[],"message_id":message_id,"conn":conn}
|
|
newitem ={"item":item,"list_attach":[],"message_id":message_id,"conn":conn}
|