
Improve project-merge efficiency; optimize merging of key projects

luojiehua 2 years ago
parent
commit
6e2d923906

+ 6 - 0
BaseDataMaintenance/dataSource/setttings.py

@@ -78,3 +78,9 @@ attach_postgres_port = 5432
 attach_postgres_user = "postgres"
 attach_postgres_pswd = "postgres"
 attach_postgres_db = "postgres"
+
+# REDIS_HOST="192.168.2.103"  # local test environment
+REDIS_HOST="192.168.0.115"  # production environment
+REDIS_PORT=6379
+REDIS_PASS="daf!#@#fdasf234"
+

+ 6 - 0
BaseDataMaintenance/dataSource/source.py

@@ -7,6 +7,7 @@ import pymysql
 import pymongo
 import tablestore
 import oss2
+import redis
 
 def solrQuery(collection,args):
     if collection in solr_collections:
@@ -141,6 +142,11 @@ def getConnect_activateMQ_ali():
     return conn
 
 
+def getConnect_redis_baseline():
+    db = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT,
+                           db=6,password=REDIS_PASS)
+    return db
+
 if __name__=="__main__":
     # solrQuery("document",{"q":"*:*"})
     # getConnect_mongodb()
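
Note: getConnect_redis_baseline returns a plain redis-py StrictRedis client bound to logical db 6, so callers use the standard redis-py API directly. A minimal usage sketch, not part of this commit (the key name is illustrative and assumes the enterprise loader below has already populated the database):

    from BaseDataMaintenance.dataSource.source import getConnect_redis_baseline

    db = getConnect_redis_baseline()
    # keys are written by set_enterprise_to_redis (see enterprise.py below), value is 1
    if db.exists("Some Enterprise Co., Ltd."):
        print("enterprise name already loaded")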

+ 17 - 7
BaseDataMaintenance/maintenance/dataflow.py

@@ -109,7 +109,6 @@ class Dataflow():
 
         self.attachment_rec_interface = ""
 
-        self.ots_client = getConnect_ots()
         self.ots_client_merge = getConnect_ots()
 
         if is_internal:
@@ -3462,7 +3461,7 @@ class Dataflow_dumplicate(Dataflow):
 
 
 
-    def merge_projects(self,list_projects,b_log=False,columns=[project_tenderee,project_agency,project_bidding_budget,project_win_tenderer,project_win_bid_price,project_sub_project_name,project_product,project_zhao_biao_page_time,project_zhong_biao_page_time,project_project_code,project_project_codes,project_docids]):
+    def merge_projects(self,list_projects,b_log=False,check_columns=[project_uuid,project_zhao_biao_page_time,project_zhong_biao_page_time,project_page_time,project_project_name,project_project_code,project_project_codes,project_tenderee,project_agency,project_sub_project_name,project_sub_project_code,project_bidding_budget,project_win_tenderer,project_win_bid_price,project_project_dynamics,project_product,project_time_bidclose,project_time_bidopen,project_time_bidstart,project_time_commencement,project_time_completion,project_time_earnest_money_start,project_time_earnest_money_end,project_time_get_file_end,project_time_get_file_start,project_time_publicity_end,project_time_publicity_start,project_time_registration_end,project_time_registration_start,project_time_release,project_nlp_enterprise,project_nlp_enterprise_attachment],fix_columns=[project_docids,project_area,project_province,project_city,project_district,project_info_type,project_industry,project_qcodes,project_project_addr,project_tenderee_addr,project_agency_phone,project_agency_contact,project_tenderee_phone,project_tenderee_contact,project_win_tenderer_manager,project_win_tenderer_phone,project_second_tenderer,project_second_bid_price,project_second_tenderer_manager,project_second_tenderer_phone,project_third_tenderer,project_third_bid_price,project_third_tenderer_manager,project_third_tenderer_phone,project_procurement_system,project_bidway,project_dup_data,project_docid_number,project_moneysource,project_service_time,project_dup_docid,project_info_source]):
         '''
         对项目进行合并
         :return:
@@ -3510,7 +3509,7 @@ class Dataflow_dumplicate(Dataflow):
 
             list_merge_data = []
 
-            _step = 3
+            _step = 4
             _begin = 0
             must_queries = [RangeQuery(project_page_time,page_time_less,page_time_greater,True,True),
                             ]
@@ -3532,10 +3531,15 @@ class Dataflow_dumplicate(Dataflow):
                     # _limit += _count*5
                 _query = BoolQuery(
                                    should_queries=list_should_q,
-                                   must_not_queries=must_not_q[:100])
+                                   must_not_queries=must_not_q[:100]
+                )
+                # rows,next_token,total_count,is_all_succeed = self.ots_client_merge.search("project2","project2_index_formerge",
+                #                                                                     SearchQuery(_query,limit=_limit),
+                #                                                                     columns_to_get=ColumnsToGet(column_names=[project_uuid,project_docids,project_zhao_biao_page_time,project_zhong_biao_page_time,project_page_time,project_area,project_province,project_city,project_district,project_info_type,project_industry,project_qcodes,project_project_name,project_project_code,project_project_codes,project_project_addr,project_tenderee,project_tenderee_addr,project_tenderee_phone,project_tenderee_contact,project_agency,project_agency_phone,project_agency_contact,project_sub_project_name,project_sub_project_code,project_bidding_budget,project_win_tenderer,project_win_bid_price,project_win_tenderer_manager,project_win_tenderer_phone,project_second_tenderer,project_second_bid_price,project_second_tenderer_manager,project_second_tenderer_phone,project_third_tenderer,project_third_bid_price,project_third_tenderer_manager,project_third_tenderer_phone,project_procurement_system,project_bidway,project_dup_data,project_docid_number,project_project_dynamics,project_product,project_moneysource,project_service_time,project_time_bidclose,project_time_bidopen,project_time_bidstart,project_time_commencement,project_time_completion,project_time_earnest_money_start,project_time_earnest_money_end,project_time_get_file_end,project_time_get_file_start,project_time_publicity_end,project_time_publicity_start,project_time_registration_end,project_time_registration_start,project_time_release,project_dup_docid,project_info_source,project_nlp_enterprise,project_nlp_enterprise_attachment],return_type=ColumnReturnType.SPECIFIED))
+
                 rows,next_token,total_count,is_all_succeed = self.ots_client_merge.search("project2","project2_index_formerge",
-                                                                                    SearchQuery(_query,limit=_limit),
-                                                                                    columns_to_get=ColumnsToGet(column_names=[project_uuid,project_docids,project_zhao_biao_page_time,project_zhong_biao_page_time,project_page_time,project_area,project_province,project_city,project_district,project_info_type,project_industry,project_qcodes,project_project_name,project_project_code,project_project_codes,project_project_addr,project_tenderee,project_tenderee_addr,project_tenderee_phone,project_tenderee_contact,project_agency,project_agency_phone,project_agency_contact,project_sub_project_name,project_sub_project_code,project_bidding_budget,project_win_tenderer,project_win_bid_price,project_win_tenderer_manager,project_win_tenderer_phone,project_second_tenderer,project_second_bid_price,project_second_tenderer_manager,project_second_tenderer_phone,project_third_tenderer,project_third_bid_price,project_third_tenderer_manager,project_third_tenderer_phone,project_procurement_system,project_bidway,project_dup_data,project_docid_number,project_project_dynamics,project_product,project_moneysource,project_service_time,project_time_bidclose,project_time_bidopen,project_time_bidstart,project_time_commencement,project_time_completion,project_time_earnest_money_start,project_time_earnest_money_end,project_time_get_file_end,project_time_get_file_start,project_time_publicity_end,project_time_publicity_start,project_time_registration_end,project_time_registration_start,project_time_release,project_dup_docid,project_info_source,project_nlp_enterprise,project_nlp_enterprise_attachment],return_type=ColumnReturnType.SPECIFIED))
+                                                                                          SearchQuery(_query,limit=_limit),
+                                                                                          columns_to_get=ColumnsToGet(column_names=check_columns,return_type=ColumnReturnType.SPECIFIED))
                 list_data = getRow_ots(rows)
 
                 list_merge_data.extend(list_data)
@@ -3560,6 +3564,12 @@ class Dataflow_dumplicate(Dataflow):
                 projects_check_rule_time += time.time()-_time
                 if _check:
                     _time = time.time()
+
+                    o_proj = Project(_data)
+                    o_proj.fix_columns(self.ots_client,fix_columns,True)
+                    for k in fix_columns:
+                        _data[k] = o_proj.getProperties().get(k)
+
                     update_projects_by_project(_data,[_proj])
                     projects_update_time += time.time()-_time
 
@@ -3892,7 +3902,7 @@ if __name__ == '__main__':
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
     a = time.time()
-    df_dump.test_dumplicate(284480111)
+    df_dump.test_dumplicate(278818571)
     print("takes",time.time()-a)
     # df_dump.fix_doc_which_not_in_project()
     # df_dump.delete_projects_by_document(16288036)
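
Note on the merge_projects change: the single wide column list is split into check_columns (returned by the project2 search index and used only to evaluate the merge rules) and fix_columns (heavy descriptive fields fetched afterwards, per project, only when the rule check passes, via Project.fix_columns), and _step is raised from 3 to 4 so each search covers more candidate projects per query. A schematic sketch of the two-phase pattern, using hypothetical helper names rather than the real Dataflow internals:

    def two_phase_merge(search_light, passes_rules, fetch_heavy, merge):
        # search_light returns candidates carrying only the cheap check_columns
        for candidate in search_light():
            if passes_rules(candidate):
                # only merge candidates pay for the wide fix_columns read
                candidate.update(fetch_heavy(candidate))
                merge(candidate)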

+ 2 - 2
BaseDataMaintenance/maintenance/major_project/unionDocument.py

@@ -308,9 +308,9 @@ class MajorUnion():
 
     def producer(self):
         bool_query = BoolQuery(must_queries=[
-            # RangeQuery(major_project_status,1,50,True,True),
+            RangeQuery(major_project_status,1,50,True,True),
             # RangeQuery(major_project_status,201,301,True,True),
-            TermQuery(major_project_id,"00048953975ad762e883f1626f6b99ec")
+            # TermQuery(major_project_id,"00048953975ad762e883f1626f6b99ec")
             ]
         )
 

+ 1 - 1
BaseDataMaintenance/maintenance/preproject/fillColumns.py

@@ -51,7 +51,7 @@ class PreprojectFill():
                     for a in docids[:30]:
                         for b in a.split(","):
                             list_docids.append(b)
-                    atta_query = BoolQuery(should_queries=[TermQuery("docids",_d) for _d in list_docids])
+                    atta_query = BoolQuery(should_queries=[TermQuery("docids",_d) for _d in list_docids[:100]])
                     atta_b_q = BoolQuery(must_queries=[TermQuery("classification","招标文件"),atta_query])
                     atta_rows,atta_next_token,atta_total_count,_ = self.ots_client.search("attachment","attachment_index",
                                                                                           SearchQuery(atta_b_q,get_total_count=True,limit=1),

+ 5 - 2
BaseDataMaintenance/maxcompute/documentMerge.py

@@ -2792,6 +2792,8 @@ def dumplicate_document_in_merge(list_projects):
 
     for _proj in list_projects:
         try:
+            docids = _proj.get(project_docids,"")
+            set_docids = set([a for a in docids.split(",") if a!=""])
             dict_channel_proj = {}
             _project_dynamics = _proj.get(project_project_dynamics,"[]")
             list_dynamics = json.loads(_project_dynamics)
@@ -2799,6 +2801,8 @@ def dumplicate_document_in_merge(list_projects):
             _time = time.time()
             for _d in list_dynamics:
                 docid = _d.get(document_docid)
+                if docid not in set_docids:
+                    continue
                 _status = _d.get(document_status,201)
                 is_multipack = _d.get("is_multipack",True)
                 extract_count = _d.get(document_tmp_extract_count,0)
@@ -2831,8 +2835,7 @@ def dumplicate_document_in_merge(list_projects):
                     else:
                         dict_channel_proj[docchannel] = _d
 
-            docids = _proj.get(project_docids,"")
-            set_docids = set([a for a in docids.split(",") if a!=""])
+
             set_docids = set_docids-set_dup_docid
             if len(set_docids)==0:
                 log("projects set_docids length is zero %s"%(docids))

+ 0 - 0
BaseDataMaintenance/model/redis/__init__.py


+ 61 - 0
BaseDataMaintenance/model/redis/enterprise.py

@@ -0,0 +1,61 @@
+
+
+import redis
+import time
+import os
+from queue import Queue
+import traceback
+
+def set_enterprise_to_redis(filepath):
+    from BaseDataMaintenance.dataSource.source import getConnect_redis_baseline
+    from BaseDataMaintenance.common.Utils import log
+    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
+    from BaseDataMaintenance.dataSource.pool import ConnectorPool
+
+    def handle(item,result_queue):
+        _db = pool_db.getConnector()
+        try:
+            _db.set(item,1)
+        except Exception as e:
+            traceback.print_exc()
+        finally:
+            pool_db.putConnector(_db)
+
+    if os.path.exists(filepath):
+
+        pool_db = ConnectorPool(10,30,getConnect_redis_baseline)
+        _start_time = time.time()
+        task_queue = Queue()
+
+        with open(filepath,"r",encoding="UTF8") as f:
+            for _e in f:
+                if not _e:
+                    continue
+                _e = _e.strip()
+                if len(_e)>=4:
+                    task_queue.put(_e)
+                    if task_queue.qsize()>=100*10000:
+                        _mt = MultiThreadHandler(task_queue,handle,None,30)
+                        _mt.run()
+                        _count = 0
+            if task_queue.qsize()>=0:
+                _mt = MultiThreadHandler(task_queue,handle,None,30)
+                _mt.run()
+                _count = 0
+
+        log("enterprise to redis takes %.5f"%(time.time()-_start_time))
+    else:
+        log("enterprise filepath not exists")
+
+def remove_enterprise_key(_keys):
+    from BaseDataMaintenance.dataSource.source import getConnect_redis_baseline
+    db = getConnect_redis_baseline()
+    for _key in _keys.split(","):
+        _v = db.delete(_key)
+        print("delete key %s %s"%(_key,str(_v)))
+    del db
+
+
+
+if __name__ == '__main__':
+    set_enterprise_to_redis(r"F:\Workspace2016\BiddingKG\BiddingKG\dl\LEGAL_ENTERPRISE.txt")
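
Usage sketch for the new module (file path and key names are illustrative, not from the commit): set_enterprise_to_redis streams the enterprise-name file into redis in batches of 1,000,000 lines through a 30-thread pool, skipping names shorter than 4 characters, and remove_enterprise_key deletes individual keys again:

    from BaseDataMaintenance.model.redis.enterprise import set_enterprise_to_redis, remove_enterprise_key

    set_enterprise_to_redis("/data/LEGAL_ENTERPRISE.txt")   # one enterprise name per line
    remove_enterprise_key("Wrong Name A,Wrong Name B")      # comma-separated keys to delete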

+ 13 - 0
BaseDataMaintenance/start_main.py

@@ -7,10 +7,23 @@ import argparse
 def main(args=None):
     parser = argparse.ArgumentParser()
     parser.add_argument("--aA",dest="attachAttachment",action="store_true",help="start attachmentAttachment process")
+    parser.add_argument("--etr",dest="enterpriseToRedis",action="store_true",help="start attachmentAttachment process")
+    parser.add_argument("--filename",dest="filename",type=str,default=None,help="start attachmentAttachment process")
+    parser.add_argument("--delkey",dest="deleteEnterpriseKey",action="store_true",help="start attachmentAttachment process")
+    parser.add_argument("--keys",dest="keys",type=str,default=None,help="start attachmentAttachment process")
     args = parser.parse_args(args)
     if args.attachAttachment:
         from BaseDataMaintenance.maintenance.document.attachAttachment import start_attachAttachment
         start_attachAttachment()
+    if args.enterpriseToRedis:
+        from BaseDataMaintenance.model.redis.enterprise import set_enterprise_to_redis
+        if args.filename is not None:
+            set_enterprise_to_redis(args.filename)
+    if args.deleteEnterpriseKey:
+        from BaseDataMaintenance.model.redis.enterprise import remove_enterprise_key
+        if args.keys:
+            remove_enterprise_key(args.keys)
+
 
 if __name__ == '__main__':
     main()
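
The new flags go through the same entry point as --aA; a usage sketch (the file path is illustrative), driving the parser from Python rather than the command line:

    from BaseDataMaintenance.start_main import main

    main(["--etr", "--filename", "/data/LEGAL_ENTERPRISE.txt"])   # load enterprise names into redis
    main(["--delkey", "--keys", "Wrong Name A,Wrong Name B"])     # delete specific keys again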