
Fix the data synchronization exception issue

luojiehua 1 year ago
parent
commit
b7b8a03132
1 changed file with 31 additions and 24 deletions

+ 31 - 24
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -1028,6 +1028,7 @@ class Dataflow_init(Dataflow):
             self.get_count = 1000
             self.count = self.get_count
             self.begin_docid = None
+            self.mq_init = "/queue/dataflow_init"
             self.mq_attachment = "/queue/dataflow_attachment"
             self.mq_extract = "/queue/dataflow_extract"
             self.pool_mq1 = ConnectorPool(1,4,getConnect_activateMQ)
@@ -1048,32 +1049,38 @@ class Dataflow_init(Dataflow):
             return next_docid
 
         def on_message(self, headers):
-            next_docid = int(self.getNextDocid())
-            partitionkey = int(next_docid%500+1)
-            message_id = headers.headers["message-id"]
-            body = json.loads(headers.body)
-            body[document_tmp_partitionkey] = partitionkey
-            body[document_tmp_docid] = next_docid
-            if body.get(document_original_docchannel) is None:
-                body[document_original_docchannel] = body.get(document_docchannel)
-            page_attachments = body.get(document_tmp_attachment_path,"[]")
-            _uuid = body.get(document_tmp_uuid,"")
-            if page_attachments!="[]":
-                status = random.randint(1,10)
-                body[document_tmp_status] = status
-                if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_attachment):
-                    log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
-                    ackMsg(self.conn,message_id)
+            try:
+                next_docid = int(self.getNextDocid())
+                partitionkey = int(next_docid%500+1)
+                message_id = headers.headers["message-id"]
+                body = json.loads(headers.body)
+                body[document_tmp_partitionkey] = partitionkey
+                body[document_tmp_docid] = next_docid
+                if body.get(document_original_docchannel) is None:
+                    body[document_original_docchannel] = body.get(document_docchannel)
+                page_attachments = body.get(document_tmp_attachment_path,"[]")
+                _uuid = body.get(document_tmp_uuid,"")
+                if page_attachments!="[]":
+                    status = random.randint(1,10)
+                    body[document_tmp_status] = status
+                    if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_attachment):
+                        log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
+                        ackMsg(self.conn,message_id)
+                    else:
+                        log("send_msg_error on init listener")
                 else:
-                    log("send_msg_error on init listener")
-            else:
-                status = random.randint(11,50)
-                body[document_tmp_status] = status
-                if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_extract):
-                    log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
+                    status = random.randint(11,50)
+                    body[document_tmp_status] = status
+                    if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_extract):
+                        log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
+                        ackMsg(self.conn,message_id)
+                    else:
+                        log("send_msg_error on init listener")
+            except Exception as e:
+                traceback.print_exc()
+                if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_init):
+                    log("init error")
                     ackMsg(self.conn,message_id)
-                else:
-                    log("send_msg_error on init listener")
 
         def __del__(self):
             self.conn.disconnect()
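
The fix follows a requeue-on-failure pattern: the whole on_message handler is wrapped in try/except, and when processing raises, the message is re-sent to the newly added /queue/dataflow_init destination instead of being dropped, so a transient failure no longer loses the record. A minimal, self-contained sketch of that pattern is shown below; the queue names come from the diff, but QUEUES, send_to_queue and handle_init_message are stand-ins invented for illustration (not the repository's send_msg_toacmq/ackMsg helpers), the field names are simplified strings in place of the document_tmp_* constants, and the sketch requeues the raw body rather than the partially modified dict.

import json
import random
import traceback

# Stand-in transport: an in-memory dict of queues. In the real code these
# sends go through send_msg_toacmq() to ActiveMQ; this is only a sketch.
QUEUES = {
    "/queue/dataflow_init": [],
    "/queue/dataflow_attachment": [],
    "/queue/dataflow_extract": [],
}

def send_to_queue(queue_name, payload):
    # Pretend to publish; return True on success, like send_msg_toacmq does.
    QUEUES[queue_name].append(payload)
    return True

def handle_init_message(raw_body, next_docid):
    # Mirrors the patched on_message: route to the attachment or extract queue,
    # and on any exception requeue the message to /queue/dataflow_init.
    try:
        body = json.loads(raw_body)
        body["partitionkey"] = next_docid % 500 + 1
        body["docid"] = next_docid
        if body.get("page_attachments", "[]") != "[]":
            body["status"] = random.randint(1, 10)
            target = "/queue/dataflow_attachment"
        else:
            body["status"] = random.randint(11, 50)
            target = "/queue/dataflow_extract"
        return send_to_queue(target, json.dumps(body))
    except Exception:
        traceback.print_exc()
        # Requeue so a transient failure does not drop the message.
        return send_to_queue("/queue/dataflow_init", raw_body)

if __name__ == "__main__":
    ok = handle_init_message('{"page_attachments": "[]", "uuid": "demo"}', 12345)
    print(ok, {k: len(v) for k, v in QUEUES.items()})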