Преглед на файлове

产品数据流处理启动

luojiehua преди 1 година
родител
ревизия
5d9086632e
променени са 1 файла, в които са добавени 22 реда и са изтрити 11 реда
  1. 22 11
      BaseDataMaintenance/maintenance/product/products.py

+ 22 - 11
BaseDataMaintenance/maintenance/product/products.py

@@ -17,7 +17,7 @@ from multiprocessing import Process,Queue
 from random import randint
 
 from BaseDataMaintenance.maintenance.product.product_dict import Product_Dict_Manager
-
+from apscheduler.schedulers.blocking import BlockingScheduler
 
 class Product_Manager(Product_Dict_Manager):
 
@@ -26,6 +26,7 @@ class Product_Manager(Product_Dict_Manager):
         self.process_queue = Queue()
         self.ots_client = getConnect_ots()
 
+        self.set_id = set()
 
     def get_product_id(self,docid,name,brand,specs,unit_price,quantity):
         if name is None:
@@ -54,7 +55,12 @@ class Product_Manager(Product_Dict_Manager):
                                                                             columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
         list_data = getRow_ots(rows)
         _count = len(list_data)
+        list_id = []
         for _d in list_data:
+            _id = _d.get(DOCUMENT_PRODUCT_TMP_ID)
+            if _id in self.set_id:
+                continue
+            list_id.append(_id)
             self.process_queue.put(_d)
         while next_token:
             rows,next_token,total_count_is_all_succeed = self.ots_client.search("document_product_temp","document_product_temp_index",
@@ -62,10 +68,15 @@ class Product_Manager(Product_Dict_Manager):
                                                                                 columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
             list_data = getRow_ots(rows)
             for _d in list_data:
+                _id = _d.get(DOCUMENT_PRODUCT_TMP_ID)
+                if _id in self.set_id:
+                    continue
+                list_id.append(_id)
                 self.process_queue.put(_d)
             _count += len(list_data)
             if _count>=process_count:
                 break
+        self.set_id = set(list_id)
 
     def comsumer(self):
         def start_thread(thread_count):
@@ -414,18 +425,18 @@ class Product_Manager(Product_Dict_Manager):
         else:
             return False
 
+    def start_processing(self):
+        scheduler = BlockingScheduler()
+        scheduler.add_job(self.producer,"cron",minute="*/1")
+        scheduler.add_job(self.comsumer,"cron",minute="*/1")
+        scheduler.start()
+
 
+def start_process_product():
+    pm = Product_Manager()
+    pm.start_processing()
 
 
 if __name__ == '__main__':
 
-    a = str(1) + "CT设备" + "西门子" + str(5.5) + str(10)
-    print(a,getMD5(a))
-    a = str(1) + "CT设备" + "" + str(5.5) + str(10)
-    print(a,getMD5(a))
-    a = str(1) + "CT设备" + "" + "" + str(10)
-    print(a,getMD5(a))
-    a = str(1) + "CT设备" + "" + "" + ""
-    print(a,getMD5(a))
-    a = str(1) + "CT设备" + "" + str(5.5) + ""
-    print(a,getMD5(a))
+    start_process_product()