Quellcode durchsuchen

工商数据已处理的定时删除

luojiehua vor 2 Jahren
Ursprung
Commit
d1f6436c45

+ 9 - 0
BaseDataMaintenance/maintenance/3.py

@@ -0,0 +1,9 @@
+# Scratch check of how round() behaves when the value has already been
+# rounded to an integer.
+a, b = 3000.12, 3000.13
+
+# round() without ndigits returns an int, so both values become 3000 here.
+a, b = round(a), round(b)
+
+# str(3000) has 4 characters, so ndigits is 6 - 4 = 2; rounding an int to
+# 2 decimal places leaves it unchanged.
+a = round(a, 6 - len(str(a)))
+b = round(b, 6 - len(str(b)))
+print(a, b)

+ 2 - 2
BaseDataMaintenance/maintenance/dataflow.py

@@ -2694,8 +2694,8 @@ class Dataflow_dumplicate(Dataflow):
 
     def flow_dumplicate(self,process_count=flow_process_count,status_from=flow_dumplicate_status_from):
         def producer(columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json]):
-            if self.queue_dumplicate.qsize()>flow_process_count//3:
-                return
+            # if self.queue_dumplicate.qsize()>flow_process_count//3:
+            #     return
             bool_query = BoolQuery(must_queries=[
                 RangeQuery(document_tmp_status,*status_from,True,True),
                 # TermQuery("docid",271983871)

+ 0 - 0
BaseDataMaintenance/maintenance/tyc_company/__init__.py


+ 57 - 0
BaseDataMaintenance/maintenance/tyc_company/remove_processed.py

@@ -0,0 +1,57 @@
+
+from BaseDataMaintenance.model.ots_capacity.tyc_company import Tyc_company
+from BaseDataMaintenance.dataSource.source import getConnect_ots_capacity
+from apscheduler.schedulers.blocking import BlockingScheduler
+from queue import Queue
+from BaseDataMaintenance.common.Utils import *
+from tablestore import *
+from BaseDataMaintenance.common.multiThread import MultiThreadHandler
+
+class Tyc_company_maintenance():
+    """Scheduled clean-up of ``tyc_company`` rows flagged with ``processed == 1``.
+
+    A producer job searches ``tyc_company_index`` for such rows and puts them
+    on ``remove_queue``; a consumer job drains the queue and deletes each row.
+    """
+
+    # Number of rows requested per search page.
+    SEARCH_PAGE_SIZE = 100
+    # The producer stops fetching once this many rows are waiting in the queue.
+    MAX_PENDING_ROWS = 10000
+    # Thread count handed to MultiThreadHandler for the delete workers.
+    CONSUMER_THREAD_COUNT = 30
+
+    def __init__(self):
+        self.ots_capacity = getConnect_ots_capacity()
+        self.remove_queue = Queue()
+
+    def remove_handle(self,item,result_queue):
+        """Delete the row described by ``item``.
+
+        :param item: one entry taken from ``remove_queue`` (a converted search row)
+        :param result_queue: not used here; kept because the handler is
+            registered with MultiThreadHandler using this signature
+        """
+        _tc = Tyc_company(item)
+        _tc.delete_row(self.ots_capacity)
+
+    def _fetch_processed_page(self,query,next_token=None):
+        """Run one search request and return ``(list_data, next_token)``.
+
+        :param query: the query selecting the rows to delete
+        :param next_token: token returned by the previous page, or a falsy
+            value for the first page
+        """
+        if next_token:
+            search_query = SearchQuery(query,next_token=next_token,limit=self.SEARCH_PAGE_SIZE,get_total_count=True)
+        else:
+            # Only the first request carries the sort; follow-up pages are
+            # driven by next_token alone, exactly as before.
+            search_query = SearchQuery(query,sort=Sort(sorters=[FieldSort("processed")]),limit=self.SEARCH_PAGE_SIZE,get_total_count=True)
+        # The total count and the success flag are returned but not consumed.
+        rows,next_token,_total_count,_is_all_succeed = self.ots_capacity.search("tyc_company","tyc_company_index",
+                                                                                search_query,
+                                                                                ColumnsToGet(return_type=ColumnReturnType.NONE))
+        return getRow_ots(rows),next_token
+
+    def remove_processed(self):
+        """Queue rows with ``processed == 1`` for deletion, page by page.
+
+        Fetching stops when the index has no further page or when
+        ``MAX_PENDING_ROWS`` rows are waiting in ``remove_queue``.
+        """
+        # This job fires every few seconds; while the backlog is already full
+        # there is no point in searching again and re-queueing rows that are
+        # still waiting to be deleted.
+        if self.remove_queue.qsize()>=self.MAX_PENDING_ROWS:
+            return
+        query = BoolQuery(must_queries=[TermQuery("processed",1)])
+        next_token = None
+        while True:
+            list_data,next_token = self._fetch_processed_page(query,next_token)
+            for _data in list_data:
+                self.remove_queue.put(_data)
+            if not next_token or self.remove_queue.qsize()>=self.MAX_PENDING_ROWS:
+                break
+
+    def remove_comsumer(self):
+        """Process ``remove_queue`` with ``remove_handle`` on worker threads.
+
+        The misspelled method name is kept so existing callers keep working.
+        """
+        mt = MultiThreadHandler(self.remove_queue,self.remove_handle,None,self.CONSUMER_THREAD_COUNT)
+        mt.run()
+
+    def start_remove_processed(self):
+        """Register the producer and consumer as cron jobs and start the scheduler.
+
+        Both jobs use the cron expression ``second="*/5"``. The scheduler is a
+        BlockingScheduler, so this call does not return while it is running.
+        """
+        _schedule = BlockingScheduler()
+        _schedule.add_job(self.remove_processed,"cron",second="*/5")
+        _schedule.add_job(self.remove_comsumer,"cron",second="*/5")
+        _schedule.start()
+
+def start_remove_processed_tyc_company():
+    """Create a Tyc_company_maintenance instance and start its scheduled jobs."""
+    Tyc_company_maintenance().start_remove_processed()
+
+# Start the scheduled clean-up when this module is executed as a script.
+if __name__ == '__main__':
+    start_remove_processed_tyc_company()

+ 16 - 0
BaseDataMaintenance/model/ots_capacity/tyc_company.py

@@ -0,0 +1,16 @@
+
+
+from BaseDataMaintenance.model.ots.BaseModel import BaseModel
+
+
+
+class Tyc_company(BaseModel):
+    """Model object for one row of the ``tyc_company`` table."""
+
+    def __init__(self,_dict):
+        """Copy every key/value pair of ``_dict`` into the model.
+
+        :param _dict: mapping of column name to value
+        """
+        BaseModel.__init__(self)
+        # NOTE(review): the meaning of the third setValue argument (True) is
+        # defined by BaseModel and is not visible here - confirm before changing.
+        for column_name,column_value in _dict.items():
+            self.setValue(column_name,column_value,True)
+        self.table_name = "tyc_company"
+
+    def getPrimary_keys(self):
+        """Return the names of the primary key columns as a new list."""
+        primary_keys = [
+            "found_time",
+            "company_name",
+            "social_credit_code",
+            "register_number",
+        ]
+        return primary_keys

+ 4 - 0
BaseDataMaintenance/start_main.py

@@ -11,6 +11,7 @@ def main(args=None):
     parser.add_argument("--filename",dest="filename",type=str,default=None,help="start attachmentAttachment process")
     parser.add_argument("--delkey",dest="deleteEnterpriseKey",action="store_true",help="start attachmentAttachment process")
     parser.add_argument("--keys",dest="keys",type=str,default=None,help="start attachmentAttachment process")
+    parser.add_argument("--rptc",dest="remove_processed_tyc_company",action="store_true",help="start attachmentAttachment process")
     args = parser.parse_args(args)
     if args.attachAttachment:
         from BaseDataMaintenance.maintenance.document.attachAttachment import start_attachAttachment
@@ -23,6 +24,9 @@ def main(args=None):
         from BaseDataMaintenance.model.redis.enterprise import remove_enterprise_key
         if args.keys or args.filename:
             remove_enterprise_key(args.filename,args.keys)
+    if args.remove_processed_tyc_company:
+        from BaseDataMaintenance.maintenance.tyc_company.remove_processed import start_remove_processed_tyc_company
+        start_remove_processed_tyc_company()
 
 
 if __name__ == '__main__':

+ 19 - 0
BaseDataMaintenance/test/test_capacity.py

@@ -0,0 +1,19 @@
+
+
+from BaseDataMaintenance.common.Utils import getRow_ots
+from BaseDataMaintenance.dataSource.source import getConnect_ots_capacity,getConnect_ots
+
+from tablestore import *
+
+
+# Client obtained from getConnect_ots(); getConnect_ots_capacity is imported
+# above but is not used by this script.
+ots_client = getConnect_ots()
+
+
+# page_time must equal "2022-12-02"; the two province terms are passed as
+# should-clauses.
+# NOTE(review): the Tablestore BoolQuery documentation describes
+# minimum_should_match as defaulting to 0 when other clause types (such as
+# must_queries) are present, in which case the province terms would not filter
+# any rows - confirm whether minimum_should_match=1 was intended.
+query = BoolQuery(must_queries=[TermQuery("page_time","2022-12-02")],
+                  should_queries=[TermQuery("province","广西"),
+                                  TermQuery("province","广东")])
+# Search project2 through project2_index: at most 100 rows, only the page_time
+# and province columns, and request the total hit count.
+rows,next_token,total_count,is_all_succeed = ots_client.search("project2","project2_index",
+                                                               SearchQuery(query,get_total_count=True,limit=100),
+                                                               columns_to_get=ColumnsToGet(column_names=["page_time","province"],return_type=ColumnReturnType.SPECIFIED))
+# Print the total hit count, then the fetched rows converted by getRow_ots.
+print(total_count)
+print(getRow_ots(rows))