Переглянути джерело

修复数据时判断队列长度低于1000才同步1000修复数据

luojiehua 1 місяць тому
батько
коміт
234200478e

+ 2 - 2
BaseDataMaintenance/maintenance/dataflow.py

@@ -4879,7 +4879,7 @@ class Dataflow_dumplicate(Dataflow):
 
         if item:
             log("start dumplicate_comsumer_handle")
-            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=False,upgrade=False)
+            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=False,upgrade=True)
             return
 
     def test_merge(self,list_docid_less,list_docid_greater):
@@ -5118,7 +5118,7 @@ if __name__ == '__main__':
     # test_attachment_interface()
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
-    df_dump.test_dumplicate(585953643
+    df_dump.test_dumplicate(613075691
                             )
     # df_dump.dumplicate_comsumer_handle_interface(603504420,document_table="document_0000",document_table_index="document_0000_index",project_table="project_0000",project_table_index="project_0000_index_formerge")
     # compare_dumplicate_check()

+ 18 - 0
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -2100,6 +2100,16 @@ class Dataflow_init(Dataflow):
 
     def ots2mq(self):
         try:
+
+            from BaseDataMaintenance.java.MQInfo import getQueueSize
+            attachment_size = getQueueSize("dataflow_attachment")
+            extract_size = getQueueSize("dataflow_extract")
+
+            if attachment_size>1000 or extract_size>1000:
+                log("ots2mq break because of long queue size")
+                return
+
+
             bool_query = BoolQuery(must_queries=[RangeQuery("status",1,51)])
 
             rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
@@ -2161,6 +2171,14 @@ class Dataflow_init(Dataflow):
 
     def otstmp2mq(self):
         try:
+            from BaseDataMaintenance.java.MQInfo import getQueueSize
+            attachment_size = getQueueSize("dataflow_attachment")
+            extract_size = getQueueSize("dataflow_extract")
+
+            if attachment_size>1000 or extract_size>1000:
+                log("otstmp2mq break because of long queue size")
+                return
+
             bool_query = BoolQuery(must_queries=[TermQuery("status",0)])
 
             rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",