
Tune the project merge ratio to 50%/70% (previously 40%/70%); improve the major-project dynamic retention rules and add a few fields

luojiehua 2 years ago
commit
ae78a625bd

+ 17 - 7
BaseDataMaintenance/common/Utils.py

@@ -7,7 +7,7 @@ Created on 2018年12月20日
 import numpy as np
 import re
 import gensim
-from keras import backend as K
+
 import os
 
 from threading import RLock
@@ -83,6 +83,13 @@ def sendEmail(host,username,password,receivers,subject="数据导出",content=""
     finally:
         server.close()
 
+mobile_pattern = re.compile("^1\d{10}$")
+def recog_likeType(phone):
+    if re.search(mobile_pattern,phone) is not None:
+        return "mobile"
+    else:
+        return "phone"
+
 def article_limit(soup,limit_words=30000):
     sub_space = re.compile("\s+")
     def soup_limit(_soup,_count,max_count=30000,max_gap=500):
@@ -239,12 +246,6 @@ def getLegal_str(_str):
     if _str is not None:
         return ILLEGAL_CHARACTERS_RE.sub("",str(_str))
 
-def timeAdd(_time,days,format="%Y-%m-%d",minutes=0):
-    a = time.mktime(time.strptime(_time,format))+86400*days+60*minutes
-
-    _time1 = time.strftime(format,time.localtime(a))
-    return _time1
-
 def getRow_ots_primary(row):
     _dict = dict()
     if row is None:
@@ -255,6 +256,12 @@ def getRow_ots_primary(row):
         _dict[part[0]] = part[1]
     return _dict
 
+def timeAdd(_time,days,format="%Y-%m-%d",minutes=0):
+    a = time.mktime(time.strptime(_time,format))+86400*days+60*minutes
+
+    _time1 = time.strftime(format,time.localtime(a))
+    return _time1
+
 def getRow_ots(rows):
     list_dict = []
     for row in rows:
@@ -970,6 +977,7 @@ def partMoney(entity_text,input2_shape = [7]):
     return parts
 
 def recall(y_true, y_pred):
+    from keras import backend as K
     '''
     计算召回率
     @Argus:
@@ -988,6 +996,7 @@ def recall(y_true, y_pred):
 
 
 def f1_score(y_true, y_pred):
+    from keras import backend as K
     '''
     计算F1
 
@@ -1012,6 +1021,7 @@ def f1_score(y_true, y_pred):
 
 
 def precision(y_true, y_pred):
+    from keras import backend as K
     '''
     计算精确率
 

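The new recog_likeType helper only tests the ^1\d{10}$ pattern, so any 11-digit number starting with 1 is reported as "mobile" and everything else falls back to "phone". A minimal standalone sketch of that behaviour (the pattern and return values come from the diff above; the sample numbers are purely illustrative):

    import re

    # same pattern as in Utils.py: 11 digits beginning with 1 are treated as mobile numbers
    mobile_pattern = re.compile(r"^1\d{10}$")

    def recog_likeType(phone):
        if re.search(mobile_pattern, phone) is not None:
            return "mobile"
        # landlines, numbers with area codes, or malformed input all fall back to "phone"
        return "phone"

    if __name__ == "__main__":
        print(recog_likeType("13812345678"))    # -> mobile (illustrative number)
        print(recog_likeType("0571-88888888"))  # -> phone  (illustrative number)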
+ 210 - 45
BaseDataMaintenance/maintenance/dataflow.py

@@ -10,6 +10,7 @@ from BaseDataMaintenance.model.ots.document_html import *
 from BaseDataMaintenance.model.ots.document_extract2 import *
 from BaseDataMaintenance.model.ots.project import *
 from BaseDataMaintenance.model.ots.document import *
+from BaseDataMaintenance.model.ots.project_process import *
 
 import base64
 from BaseDataMaintenance.dataSource.interface import getAttachDealInterface,sentMsgToDD
@@ -1245,8 +1246,19 @@ class Dataflow():
         return []
 
     def get_best_docid(self,base_list):
+        to_reverse = False
+        dict_source_count = {}
+        for _item in base_list:
+            _web_source = _item.get(document_tmp_web_source_no)
+            _fingerprint = _item.get(document_tmp_fingerprint)
+            if _web_source is not None:
+                if _web_source not in dict_source_count:
+                    dict_source_count[_web_source] = set()
+                dict_source_count[_web_source].add(_fingerprint)
+                if len(dict_source_count[_web_source])>=2:
+                    to_reverse=True
         if len(base_list)>0:
-            base_list.sort(key=lambda x:x["docid"])
+            base_list.sort(key=lambda x:x["docid"],reverse=to_reverse)
             base_list.sort(key=lambda x:x["extract_count"],reverse=True)
             return base_list[0]["docid"]
 
@@ -2081,10 +2093,10 @@ class Dataflow_dumplicate(Dataflow):
             self.conn = conn
             self._func = _func
 
-        def on_error(self, headers):
+        def on_error(self, headers,*args,**kwargs):
             log('received an error %s' % str(headers.body))
 
-        def on_message(self, headers):
+        def on_message(self, headers,*args,**kwargs):
             try:
                 message_id = headers.headers["message-id"]
                 body = headers.body
@@ -2103,6 +2115,7 @@ class Dataflow_dumplicate(Dataflow):
         self.c_f_get_package = f_get_package()
         logging.basicConfig(level = logging.info,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
 
+        self.fix_doc_docid = None
 
         if start_delete_listener:
             self.delete_comsumer_counts = 2
@@ -2153,6 +2166,7 @@ class Dataflow_dumplicate(Dataflow):
 
         _index = 0
         base_fingerprint = "None"
+
         if len(base_list)>0:
             base_fingerprint = base_list[0]["fingerprint"]
         for _i in range(1,len(base_list)):
@@ -2176,6 +2190,17 @@ class Dataflow_dumplicate(Dataflow):
                 break
 
         if _index>=1:
+            # # dedupe entries that were written to the store more than once
+            # _l = the_group[:_index+1]
+            # set_fingerprint = set()
+            # final_l = []
+            # for _dict in _l:
+            #     fingerprint_less = _dict["fingerprint"]
+            #     if fingerprint_less in set_fingerprint:
+            #         continue
+            #     else:
+            #         final_l.append(_dict)
+            #         set_fingerprint.add(fingerprint_less)
             return the_group[:_index+1]
         return []
 
@@ -2701,6 +2726,7 @@ class Dataflow_dumplicate(Dataflow):
         :return:
         '''
         list_docs = []
+        set_fingerprint = set()
         for _docid in list_docids:
             docid = int(_docid)
             _dict = {document_partitionkey:getPartitionKey(docid),
@@ -2711,6 +2737,10 @@ class Dataflow_dumplicate(Dataflow):
                 _doc = Document(_dict)
                 _exists = _doc.fix_columns(self.ots_client,columns_to_get,True)
             if _exists:
+                _fingerprint = _doc.getProperties().get(document_fingerprint)
+                if _fingerprint in set_fingerprint:
+                    continue
+                set_fingerprint.add(_fingerprint)
                 list_docs.append(_doc)
         for _doc in list_docs:
             try:
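The hunk above skips any document whose fingerprint has already been seen while building list_docs. Reduced to its core it is a seen-set filter; a minimal sketch, assuming each document is a plain dict (the real code reads the fingerprint off a Document object):

    def dedupe_by_fingerprint(docs):
        # keep only the first document for each distinct fingerprint
        seen = set()
        kept = []
        for doc in docs:
            fp = doc.get("fingerprint")
            if fp in seen:
                continue
            seen.add(fp)
            kept.append(doc)
        return kept

    print(dedupe_by_fingerprint([
        {"docid": 1, "fingerprint": "f1"},
        {"docid": 2, "fingerprint": "f1"},  # dropped: same fingerprint as docid 1
        {"docid": 3, "fingerprint": "f2"},
    ]))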
@@ -2783,7 +2813,6 @@ class Dataflow_dumplicate(Dataflow):
         docs = [_doc.getProperties() for _doc in list_docs]
 
         project_dict = generate_common_properties(docs)
-        print("list_docs",project_dict)
 
         list_package_properties = generate_packages_properties(docs)
 
@@ -2821,44 +2850,42 @@ class Dataflow_dumplicate(Dataflow):
                 set_nlp_enterprise_attachment |= set(json.loads(_proj.get(project_nlp_enterprise_attachment,"[]")))
             except Exception as e:
                 pass
-        set_docid = set_docid | set(project_dict.get(project_docids,"").split(","))
-        set_code = set_code | set(project_dict.get(project_project_codes,"").split(","))
-        set_product = set_product | set(project_dict.get(project_product,"").split(","))
+            set_docid = set_docid | set(project_dict.get(project_docids,"").split(","))
+            set_code = set_code | set(project_dict.get(project_project_codes,"").split(","))
+            set_product = set_product | set(project_dict.get(project_product,"").split(","))
 
-        try:
-            set_nlp_enterprise |= set(json.loads(project_dict.get(project_nlp_enterprise,"[]")))
-            set_nlp_enterprise_attachment |= set(json.loads(project_dict.get(project_nlp_enterprise_attachment,"[]")))
-        except Exception as e:
-            pass
+            try:
+                set_nlp_enterprise |= set(json.loads(project_dict.get(project_nlp_enterprise,"[]")))
+                set_nlp_enterprise_attachment |= set(json.loads(project_dict.get(project_nlp_enterprise_attachment,"[]")))
+            except Exception as e:
+                pass
 
 
-        append_dict[project_docids] = ",".join([a for a in list(set_docid) if a!=""])
-        append_dict[project_docid_number] = len(set_docid)
-        append_dict[project_project_codes] = ",".join([a for a in list(set_code) if a!=""])
-        append_dict[project_product] = ",".join([a for a in list(set_product) if a!=""])
+            append_dict[project_docids] = ",".join([a for a in list(set_docid) if a!=""])
+            append_dict[project_docid_number] = len(set_docid)
+            append_dict[project_project_codes] = ",".join([a for a in list(set_code) if a!=""])
+            append_dict[project_product] = ",".join([a for a in list(set_product) if a!=""])
 
-        append_dict[project_nlp_enterprise] = json.dumps(list(set_nlp_enterprise)[:100],ensure_ascii=False)
-        append_dict[project_nlp_enterprise_attachment] = json.dumps(list(set_nlp_enterprise_attachment)[:100],ensure_ascii=False)
+            append_dict[project_nlp_enterprise] = json.dumps(list(set_nlp_enterprise)[:100],ensure_ascii=False)
+            append_dict[project_nlp_enterprise_attachment] = json.dumps(list(set_nlp_enterprise_attachment)[:100],ensure_ascii=False)
 
-        dict_dynamic = {}
-        set_docid = set()
-        for _proj in projects:
+
+            dict_dynamic = {}
+            set_docid = set()
             _dynamic = json.loads(_proj.get(project_project_dynamics,"[]"))
             for _dy in _dynamic:
                 _docid = _dy.get("docid")
                 dict_dynamic[_docid] = _dy
-        _dynamic = json.loads(project_dict.get(project_project_dynamics,"[]"))
-        for _dy in _dynamic:
-            _docid = _dy.get("docid")
-            dict_dynamic[_docid] = _dy
-        list_dynamics = []
-        for k,v in dict_dynamic.items():
-            list_dynamics.append(v)
-        list_dynamics.sort(key=lambda x:x.get(document_page_time,""))
-
-        append_dict[project_project_dynamics] = json.dumps(list_dynamics[:100],ensure_ascii=False)
+            _dynamic = json.loads(project_dict.get(project_project_dynamics,"[]"))
+            for _dy in _dynamic:
+                _docid = _dy.get("docid")
+                dict_dynamic[_docid] = _dy
+            list_dynamics = []
+            for k,v in dict_dynamic.items():
+                list_dynamics.append(v)
+            list_dynamics.sort(key=lambda x:x.get(document_page_time,""))
 
-        for _proj in projects:
+            append_dict[project_project_dynamics] = json.dumps(list_dynamics[:100],ensure_ascii=False)
             _proj.update(append_dict)
 
 
@@ -3011,8 +3038,17 @@ class Dataflow_dumplicate(Dataflow):
             if docid is None:
                 return
             delete_result = self.delete_projects_by_document(docid)
-            if send_msg_toacmq(self.pool_mq_ali,delete_result,self.doc_delete_result):
+
+            _uuid = uuid4().hex
+            _d = {PROJECT_PROCESS_UUID:_uuid,
+                  PROJECT_PROCESS_CRTIME:1,
+                  PROJECT_PROCESS_PROJECTS:delete_result}
+            _pp = Project_process(_d)
+            if _pp.update_row(self.ots_client):
                 ackMsg(conn,message_id)
+            # no longer push the result to the queue; insert into the project_process table instead
+            # if send_msg_toacmq(self.pool_mq_ali,delete_result,self.doc_delete_result):
+            #     ackMsg(conn,message_id)
 
     def generate_common_properties(self,list_docs):
         '''
@@ -3283,7 +3319,7 @@ class Dataflow_dumplicate(Dataflow):
 
 
 
-    def getMerge_rules(self,page_time,project_codes,project_name,tenderee,agency,product,sub_project_name,bidding_budget,win_tenderer,win_bid_price):
+    def getMerge_rules(self,page_time,project_codes,project_name,tenderee,agency,product,sub_project_name,bidding_budget,win_tenderer,win_bid_price,province,city,district):
 
         whole_time_start = time.time()
         _time = time.time()
@@ -3291,10 +3327,25 @@ class Dataflow_dumplicate(Dataflow):
 
         list_code = [a for a in project_codes.split(",") if a!='']
         should_q_code = BoolQuery(should_queries=[MatchQuery(project_project_codes,a) for a in list_code[:20]])
+
+        # print("should_q_code",[a for a in list_code[:20]])
         should_q_cod = BoolQuery(should_queries=[MatchQuery(project_project_code,a) for a in list_code[:20]])
         list_product = [a for a in product.split(",") if a!='']
         should_q_product = BoolQuery(should_queries=[MatchQuery(project_product,a) for a in list_product[:20]])
 
+        should_q_area = None
+        if province!="" or city!="" or district!="":
+            should_q = []
+            if province not in ("","全国","未知") and province is not None:
+                should_q.append(TermQuery(project_province,province))
+            if city not in ("","全国","未知") and city is not None:
+                should_q.append(TermQuery(project_city,city))
+            if district not in ("","全国","未知") and district is not None:
+                should_q.append(TermQuery(project_district,district))
+            if len(should_q)>0:
+                should_q_area = BoolQuery(should_queries=should_q)
+
+
         prepare_time = time.time()-_time
 
         _time = time.time()
@@ -3330,16 +3381,37 @@ class Dataflow_dumplicate(Dataflow):
                                              TermQuery(project_project_name,project_name)]
             list_query.append([_query,2])
 
+        if tenderee!="" and agency!="":
+            _query = [TermQuery(project_tenderee,tenderee),
+                      TermQuery(project_agency,agency)]
+            list_query.append([_query,1])
+
         if tenderee!="" and bidding_budget>0:
             _query = [TermQuery(project_tenderee,tenderee),
                                              TermQuery(project_bidding_budget,bidding_budget)]
             list_query.append([_query,2])
 
+        if bidding_budget>0 and win_bid_price>0:
+            _query = [TermQuery(project_bidding_budget,bidding_budget),
+                      TermQuery(project_win_bid_price,win_bid_price)]
+            list_query.append([_query,2])
+
+
         if tenderee!="" and win_tenderer!="":
             _query = [TermQuery(project_tenderee,tenderee),
                       TermQuery(project_win_tenderer,win_tenderer)]
             list_query.append([_query,2])
 
+        if agency!="" and win_tenderer!="":
+            _query = [TermQuery(project_agency,agency),
+                      TermQuery(project_win_tenderer,win_tenderer)]
+            list_query.append([_query,2])
+
+        if agency!="" and len(list_product)>0:
+            _query = [TermQuery(project_agency,agency),
+                      should_q_product]
+            list_query.append([_query,2])
+
         if win_tenderer!="" and len(list_code)>0:
             _query = [TermQuery(project_win_tenderer,win_tenderer),
                                              should_q_code]
@@ -3354,6 +3426,16 @@ class Dataflow_dumplicate(Dataflow):
                                              TermQuery(project_win_bid_price,win_bid_price)]
             list_query.append([_query,2])
 
+        if win_tenderer!="" and bidding_budget>0:
+            _query = [TermQuery(project_win_tenderer,win_tenderer),
+                      TermQuery(project_bidding_budget,bidding_budget)]
+            list_query.append([_query,2])
+
+        if len(list_code)>0 and len(list_product)>0:
+            _query = [should_q_code,
+                      should_q_product]
+            list_query.append([_query,2])
+
         if len(list_code)>0:
             _query = [
                       should_q_code]
@@ -3363,10 +3445,15 @@ class Dataflow_dumplicate(Dataflow):
                 should_q_cod]
             list_query.append([_query,1])
 
-        if project_name!="":
+        if project_name!="" and project_name is not None:
             _query = [
                       TermQuery(project_project_name,project_name)]
             list_query.append([_query,1])
+        if len(list_product)>0 and should_q_area is not None:
+            _query = [should_q_area,
+                      should_q_product]
+            list_query.append([_query,1])
+
         generate_time = time.time()-_time
         whole_time = time.time()-whole_time_start
         log("projects merge rules whole_time:%.3f prepare_time:%.3f log_time:%.3f generate_time:%.3f"%(whole_time,prepare_time,log_time,generate_time))
@@ -3391,6 +3478,7 @@ class Dataflow_dumplicate(Dataflow):
         for _uuid in list(set_uuid):
             must_not_q.append(TermQuery("uuid",_uuid))
 
+
         projects_merge_count = 0
         projects_check_rule_time = 0
         projects_update_time = 0
@@ -3409,21 +3497,28 @@ class Dataflow_dumplicate(Dataflow):
             win_tenderer = _proj.get(project_win_tenderer,"")
             win_bid_price = _proj.get(project_win_bid_price,-1)
 
+            province = _proj.get(project_province,"")
+            city = _proj.get(project_city,"")
+            district = _proj.get(project_district,"")
+
             page_time_less = timeAdd(page_time,-150)
             page_time_greater = timeAdd(page_time,120)
             sub_project_q = TermQuery(project_sub_project_name,sub_project_name) if sub_project_name.replace("Project","")!="" else None
             _time = time.time()
-            list_must_query = self.getMerge_rules(page_time,project_codes,project_name,tenderee,agency,product,sub_project_name,bidding_budget,win_tenderer,win_bid_price)
+            list_must_query = self.getMerge_rules(page_time,project_codes,project_name,tenderee,agency,product,sub_project_name,bidding_budget,win_tenderer,win_bid_price,province,city,district)
 
 
             list_merge_data = []
 
-            _step = 5
+            _step = 3
             _begin = 0
             must_queries = [RangeQuery(project_page_time,page_time_less,page_time_greater,True,True),
                             ]
-            if sub_project_q is not None:
-                must_queries.append(sub_project_q)
+
+            # sub_project_name is not a mandatory condition
+            # if sub_project_q is not None:
+            #     must_queries.append(sub_project_q)
+
             projects_prepare_time += time.time()-_time
             _time = time.time()
             while _begin<len(list_must_query):
@@ -3440,7 +3535,7 @@ class Dataflow_dumplicate(Dataflow):
                                    must_not_queries=must_not_q[:100])
                 rows,next_token,total_count,is_all_succeed = self.ots_client_merge.search("project2","project2_index_formerge",
                                                                                     SearchQuery(_query,limit=_limit),
-                                                                                    columns_to_get=ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+                                                                                    columns_to_get=ColumnsToGet(column_names=[project_uuid,project_docids,project_zhao_biao_page_time,project_zhong_biao_page_time,project_page_time,project_area,project_province,project_city,project_district,project_info_type,project_industry,project_qcodes,project_project_name,project_project_code,project_project_codes,project_project_addr,project_tenderee,project_tenderee_addr,project_tenderee_phone,project_tenderee_contact,project_agency,project_agency_phone,project_agency_contact,project_sub_project_name,project_sub_project_code,project_bidding_budget,project_win_tenderer,project_win_bid_price,project_win_tenderer_manager,project_win_tenderer_phone,project_second_tenderer,project_second_bid_price,project_second_tenderer_manager,project_second_tenderer_phone,project_third_tenderer,project_third_bid_price,project_third_tenderer_manager,project_third_tenderer_phone,project_procurement_system,project_bidway,project_dup_data,project_docid_number,project_project_dynamics,project_product,project_moneysource,project_service_time,project_time_bidclose,project_time_bidopen,project_time_bidstart,project_time_commencement,project_time_completion,project_time_earnest_money_start,project_time_earnest_money_end,project_time_get_file_end,project_time_get_file_start,project_time_publicity_end,project_time_publicity_start,project_time_registration_end,project_time_registration_start,project_time_release,project_dup_docid,project_info_source,project_nlp_enterprise,project_nlp_enterprise_attachment],return_type=ColumnReturnType.SPECIFIED))
                 list_data = getRow_ots(rows)
 
                 list_merge_data.extend(list_data)
@@ -3453,10 +3548,15 @@ class Dataflow_dumplicate(Dataflow):
             projects_query_time += time.time()-_time
             #优先匹配招标金额相近的
             projects_merge_count = len(list_merge_data)
+            list_merge_data.sort(key=lambda x:x.get(project_page_time,""))
             list_merge_data.sort(key=lambda x:x.get(project_bidding_budget,-1))
+            # log(page_time_less+"=="+page_time_greater)
+            # log("list_merge_data:%s"%(str(list_merge_data)))
             for _data in list_merge_data:
                 _time = time.time()
                 _check = check_merge_rule(_proj,_data,b_log=b_log)
+                if b_log:
+                    log(str(_check))
                 projects_check_rule_time += time.time()-_time
                 if _check:
                     _time = time.time()
@@ -3474,6 +3574,7 @@ class Dataflow_dumplicate(Dataflow):
 
 
 
+
     def merge_document_real(self,item,dup_docid,table_name,status_to=None,b_log=False):
         '''
         实时项目合并
@@ -3551,7 +3652,7 @@ class Dataflow_dumplicate(Dataflow):
                 singleNum_keys = _rule["singleNum_keys"]
                 contain_keys = _rule["contain_keys"]
                 multiNum_keys = _rule["multiNum_keys"]
-                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json])
+                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint])
                 _i += step
 
 
@@ -3616,6 +3717,7 @@ class Dataflow_dumplicate(Dataflow):
                 # print(dtmp.getProperties())
                 dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                 dtmp.setValue(document_tmp_dup_docid,dmp_docid,True)
+                dtmp.setValue(document_tmp_best_docid,best_docid,True)
                 dtmp.update_row(self.ots_client)
 
             # log("dump takes %.2f"%(time.time()-start_time))
@@ -3623,9 +3725,71 @@ class Dataflow_dumplicate(Dataflow):
             traceback.print_exc()
             log("error on dumplicate of %s"%(str(item.get(document_tmp_docid))))
 
+
+    def fix_doc_which_not_in_project(self):
+        '''
+        Pull finished announcements that are missing from project2 and put them back into document_tmp to be deduplicated and merged again
+        :return:
+        '''
+        def fix_doc_handle(item,result_queue):
+            _docid = item.get(document_tmp_docid)
+            b_q = BoolQuery(must_queries=[TermQuery(project_docids,str(_docid))])
+
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search("project2","project2_index",
+                                                                                SearchQuery(b_q,get_total_count=True),
+                                                                                ColumnsToGet(return_type=ColumnReturnType.NONE))
+            if total_count==0:
+                log("fix_doc:%s not in project2"%(str(_docid)))
+                d_tmp = Document_tmp(item)
+                d_tmp.setValue(document_tmp_status,flow_dumplicate_status_from[0],True)
+                d_tmp.update_row(self.ots_client)
+
+
+
+        if self.fix_doc_docid is None:
+            current_date = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
+            before_date = timeAdd(current_date,0,format="%Y-%m-%d %H:%M:%S",minutes=-5)
+            bool_query = BoolQuery(must_queries=[
+                TermQuery(document_tmp_save,1),
+                RangeQuery(document_tmp_status,flow_dumplicate_status_to[0]),
+                RangeQuery(document_tmp_opertime,before_date)
+            ])
+        else:
+            bool_query = BoolQuery(must_queries=[
+                TermQuery(document_tmp_save,1),
+                RangeQuery(document_tmp_status,flow_dumplicate_status_to[0]),
+                RangeQuery(document_tmp_docid,self.fix_doc_docid)
+            ])
+
+        list_data = []
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
+                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.ASC)]),get_total_count=True,limit=100),
+                                                                            ColumnsToGet(return_type=ColumnReturnType.NONE))
+        list_d = getRow_ots(rows)
+        list_data.extend(list_d)
+        while next_token:
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
+                                                                                SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
+                                                                                ColumnsToGet(return_type=ColumnReturnType.NONE))
+            list_d = getRow_ots(rows)
+            list_data.extend(list_d)
+            print("%d/%d"%(len(list_data),total_count))
+        if len(list_data)>0:
+            self.fix_doc_docid = list_data[-1].get(document_tmp_docid)
+            log("current fix_doc_docid:%s"%(str(self.fix_doc_docid)))
+            task_queue = Queue()
+            for _data in list_data:
+                task_queue.put(_data)
+
+        mt = MultiThreadHandler(task_queue,fix_doc_handle,None,30)
+        mt.run()
+
+
+
     def start_flow_dumplicate(self):
         schedule = BlockingScheduler()
         schedule.add_job(self.flow_dumplicate,"cron",second="*/5")
+        schedule.add_job(self.fix_doc_which_not_in_project,"cron",minute="55")
         schedule.start()
 
     def changeSaveStatus(self,list_dict):
@@ -3641,7 +3805,7 @@ class Dataflow_dumplicate(Dataflow):
 
 
     def test_dumplicate(self,docid):
-        columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json]
+        columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint]
         bool_query = BoolQuery(must_queries=[
             TermQuery("docid",docid)
         ])
@@ -3728,12 +3892,13 @@ if __name__ == '__main__':
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
     a = time.time()
-    df_dump.test_dumplicate(275459183)
+    df_dump.test_dumplicate(183573001)
     print("takes",time.time()-a)
+    # df_dump.fix_doc_which_not_in_project()
     # df_dump.delete_projects_by_document(16288036)
     # log("=======")
     # for i in range(3):
     #     time.sleep(20)
     #
-    # a = {"docid":16288036}
-    # send_msg_toacmq(df_dump.pool_mq_ali,json.dumps(a),df_dump.doc_delete_queue)
+    # a = {"docid":74295123}
+    # send_msg_toacmq(df_dump.pool_mq_ali,json.dumps(a),df_dump.doc_delete_queue)

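The change to get_best_docid can be restated as: if any single web source contributed two or more distinct fingerprints to the duplicate group, prefer the newest docid instead of the oldest; extract_count remains the primary sort key. A minimal sketch of that rule, with plain string keys standing in for the document_tmp_* column constants:

    def get_best_docid(base_list):
        # reverse the docid order when one web source shows >= 2 distinct fingerprints,
        # i.e. the same source re-published the notice with changed content
        to_reverse = False
        source_fingerprints = {}
        for item in base_list:
            source = item.get("web_source_no")
            if source is None:
                continue
            source_fingerprints.setdefault(source, set()).add(item.get("fingerprint"))
            if len(source_fingerprints[source]) >= 2:
                to_reverse = True
        if base_list:
            base_list.sort(key=lambda x: x["docid"], reverse=to_reverse)
            # the sort is stable, so extract_count stays the primary criterion
            base_list.sort(key=lambda x: x["extract_count"], reverse=True)
            return base_list[0]["docid"]

    docs = [  # illustrative data only
        {"docid": 1, "extract_count": 5, "web_source_no": "A", "fingerprint": "f1"},
        {"docid": 2, "extract_count": 5, "web_source_no": "A", "fingerprint": "f2"},
    ]
    print(get_best_docid(docs))  # -> 2: same source, two fingerprints, newest wins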
+ 253 - 0
BaseDataMaintenance/maintenance/document/attachAttachment.py

@@ -0,0 +1,253 @@
+
+from BaseDataMaintenance.common.multiProcess import MultiHandler,MultiProcessHandler
+from BaseDataMaintenance.common.multiThread import MultiThreadHandler
+from BaseDataMaintenance.dataSource.pool import ConnectorPool
+from BaseDataMaintenance.dataSource.source import *
+from BaseDataMaintenance.common.Utils import *
+import queue
+from tablestore import *
+from multiprocessing import RLock
+from multiprocessing import Queue
+from threading import Thread
+from apscheduler.schedulers.blocking import BlockingScheduler
+
+from BaseDataMaintenance.model.mysql.attach_document_richtext import attach_document_richtext
+from BaseDataMaintenance.model.mysql.BaseModel import BaseModel
+from BaseDataMaintenance.model.ots.document import *
+import traceback
+from BaseDataMaintenance.dataSource.download import download
+import base64
+from BaseDataMaintenance.dataSource.interface import getAttachDealInterface,sentMsgToDD
+from BaseDataMaintenance.model.ots.attachment import *
+from BaseDataMaintenance.common.ossUtils import *
+from uuid import uuid4
+from bs4 import BeautifulSoup
+import random
+import platform
+from BaseDataMaintenance.common.SWFUtils import swf2images
+from multiprocessing import Process
+
+STATUS_TODEAL = 10
+STATUS_DEALING = 20
+STATUS_DONE = 30
+STATUS_FAILED = 40
+MAX_DEAL_COUNT = 5
+
+class AttachProcess():
+
+    def __init__(self):
+
+        self.set_docid = set()
+        self.docid_lock = RLock()
+        self.q_size = 5000
+        self.task_queue = Queue()
+        self.document_table = "document"
+        self.document_table_index = "document_index"
+        self.ots_client = getConnect_ots()
+        self.ots_capacity = getConnect_ots_capacity()
+        self.attachment_table = "attachment"
+        self.attachment_table_index = "attachment_index"
+        self.attachment_bucket_name = "attachment-hub"
+        self.auth = getAuth()
+        if is_internal:
+            self.bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
+        else:
+            self.bucket_url = "http://oss-cn-hangzhou.aliyuncs.com"
+        self.attachment_hub_url = "https://attachment-hub.oss-cn-hangzhou.aliyuncs.com/"
+        log("bucket_url:%s"%(self.bucket_url))
+        self.bucket = oss2.Bucket(self.auth,self.bucket_url,self.attachment_bucket_name)
+        self.current_path = os.path.dirname(__file__)
+        self.current_docid = 0
+        self.doc_file = os.path.join(self.current_path,"attach_docid.txt")
+        if os.path.exists(self.doc_file):
+            with open(self.doc_file,"r",encoding="utf8") as f:
+                line = f.readline()
+                if line:
+                    self.current_docid = int(line.strip())
+
+
+
+    def producer(self):
+        # query pending rows directly from the ots document table
+        # i.e. rows that have attachment content but have not been processed yet
+
+        try:
+
+            bool_q = BoolQuery(must_queries=[RangeQuery(document_docid,self.current_docid,270747031),
+                                             NestedQuery(document_attachment_path,ExistsQuery("%s.%s"%(document_attachment_path,document_attachment_path_filemd5))),
+                                             # TermQuery(document_docid,1100)
+                                             ])
+
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search(self.document_table,self.document_table_index,
+                                                                                SearchQuery(bool_q,sort=Sort(sorters=[FieldSort("docid",SortOrder.ASC)]),get_total_count=False,limit=100),
+                                                                                ColumnsToGet([document_attachment_path],ColumnReturnType.SPECIFIED))
+            list_data = []
+            list_d = getRow_ots(rows)
+            list_data.extend(list_d)
+
+            while next_token:
+                rows,next_token,total_count,is_all_succeed = self.ots_client.search(self.document_table,self.document_table_index,
+                                                                                    SearchQuery(bool_q,next_token=next_token,get_total_count=False,limit=100),
+                                                                                    ColumnsToGet([document_attachment_path],ColumnReturnType.SPECIFIED))
+                list_d = getRow_ots(rows)
+                list_data.extend(list_d)
+                if len(list_data)>=self.q_size:
+                    break
+            for _d in list_data:
+                self.task_queue.put(_d)
+            if len(list_data)>0:
+                self.current_docid = list_data[-1].get(document_docid)
+        except Exception as e:
+            log("attachProcess producer error %s"%(str(e)))
+
+
+
+    def getAttachments(self,list_filemd5,columns_to_get=[attachment_attachmenthtml]):
+        list_attachment = []
+        rows_to_get = []
+        for _md5 in list_filemd5[:50]:
+            primary_key = [(attachment_filemd5,_md5)]
+            rows_to_get.append(primary_key)
+        req = BatchGetRowRequest()
+        req.add(TableInBatchGetRowItem(self.attachment_table,rows_to_get,columns_to_get,None,1))
+        try:
+            result = self.ots_client.batch_get_row(req)
+            attach_result = result.get_result_by_table(self.attachment_table)
+            for item in attach_result:
+                if item.is_ok:
+                    _dict = getRow_ots_primary(item.row)
+                    if _dict is not None:
+                        list_attachment.append(attachment(_dict))
+
+        except Exception as e:
+            log(str(list_filemd5))
+            log("attachProcess comsumer error %s"%str(e))
+        return list_attachment
+
+    def getTitleFromHtml(self,filemd5,_html):
+        _soup = BeautifulSoup(_html,"lxml")
+
+        _find = _soup.find("a",attrs={"data":filemd5})
+        _title = ""
+        if _find is not None:
+            _title = _find.get_text()
+        return _title
+
+    def getSourceLinkFromHtml(self,filemd5,_html):
+        _soup = BeautifulSoup(_html,"lxml")
+
+        _find = _soup.find("a",attrs={"filelink":filemd5})
+        filelink = ""
+        if _find is None:
+            _find = _soup.find("img",attrs={"filelink":filemd5})
+            if _find is not None:
+                filelink = _find.attrs.get("src","")
+        else:
+            filelink = _find.attrs.get("href","")
+        return filelink
+
+    def transformSWF(self,bucket,attachment_hub_url,objectPath,localpath,swf_dir):
+        swf_urls = []
+        try:
+            swf2images(localpath,swf_dir)
+            list_files = os.listdir(swf_dir)
+            list_files.sort(key=lambda x:x)
+            headers = dict()
+            headers["x-oss-object-acl"] = oss2.OBJECT_ACL_PUBLIC_READ
+            for _file in list_files:
+                swf_localpath = "%s/%s"%(swf_dir,_file)
+                swf_objectPath = "%s/%s"%(objectPath.split(".")[0],_file)
+                uploadFileByPath(bucket,swf_localpath,swf_objectPath,headers)
+                _url = "%s/%s"%(attachment_hub_url,swf_objectPath)
+                swf_urls.append(_url)
+                os.remove(swf_localpath)
+        except Exception as e:
+            traceback.print_exc()
+        return swf_urls
+
+    def comsumer_handle(self,_item,result_queue):
+
+
+        _dict = _item
+        if type(_dict)==dict:
+            _document = Document(_dict)
+        else:
+            _document = _dict
+        log("process docid:%s"%(str(_document.getProperties().get(document_docid))))
+        try:
+            attachPaths = json.loads(_document.getProperties().get(document_attachment_path,'[]'))
+            list_filemd5 = []
+            dict_attach = {}
+            for _d in attachPaths:
+                _md5 = _d.get(document_attachment_path_filemd5)
+                if _md5 is not None:
+                    list_filemd5.append(_md5)
+                    dict_attach[_md5] = _d
+
+            if len(list_filemd5)>0:
+                list_attachment = self.getAttachments(list_filemd5)
+                _all_succeed = True
+                list_html = []
+                for attach in list_attachment:
+                    # update docids
+                    filemd5 = attach.getProperties().get(attachment_filemd5,"")
+                    docids = attach.getProperties().get(attachment_docids,"")
+                    _html = attach.getProperties().get(attachment_attachmenthtml)
+                    _dict = {"filemd5":filemd5,
+                             "html":_html}
+                    if len(_html)>10:
+                        list_html.append(_dict)
+
+                if len(list_html)>0:
+
+                    html_d = {document_partitionkey:_document.getProperties().get(document_partitionkey),
+                               document_docid:_document.getProperties().get(document_docid)}
+                    d_html = Document(html_d)
+                    for _ in range(3):
+                        if d_html.fix_columns(self.ots_capacity,[document_dochtmlcon],True):
+                            log("update docid:%d attachments"%(d_html.getProperties().get(document_docid)))
+                            d_html.updateAttachment(list_html)
+                            d_html.update_row(self.ots_capacity)
+                            break
+
+        except Exception as e:
+            debug("attach process failed of docid:%s of error:%s"%(str(_document.getProperties().get(document_docid)),str(e)))
+            traceback.print_exc()
+
+    def comsumer(self):
+        # for each announcement's attachment data, check whether every attachment has finished uploading and processing; skip those not fully uploaded yet
+
+        # call the processing interface for attachments that are uploaded but not yet processed, and update attachment
+
+        # merge all attachment data: write the html part into dochtmlcon, extract its text and write that into attachtextcon
+
+        # update the data
+
+        log("comsumer task_queue size :%d"%(self.task_queue.qsize()))
+        # mt = MultiThreadHandler(self.task_queue,self.comsumer_handle,None,60)
+        mt = MultiHandler(self.task_queue,self.comsumer_handle,None,2,30)
+        # mt = MultiProcessHandler(self.task_queue,self.comsumer_handle,None,30)
+        mt.run()
+        log("current docid:%d"%(self.current_docid))
+        with open(self.doc_file,"w") as f:
+            f.write(str(self.current_docid))
+
+    def attachment_process(self):
+        self.producer()
+        self.comsumer()
+
+    def schedule(self):
+        _scheduler = BlockingScheduler()
+        # _scheduler.add_job(self.attachment_process,"cron",second="*/1")
+        _scheduler.add_job(self.producer,"cron",second="*/10")
+        _scheduler.add_job(self.comsumer,"cron",second="*/10")
+        _scheduler.start()
+
+
+def start_attachAttachment():
+    ap = AttachProcess()
+    ap.schedule()
+
+if __name__ == '__main__':
+    ap = AttachProcess()
+    ap.attachment_process()

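The two HTML helpers in attachAttachment.py locate an attachment inside the announcement HTML by its filemd5: the display title is the text of the <a> tag whose data attribute matches, and the source link is the href of the <a> tag whose filelink attribute matches (falling back to the src of an <img> with that filelink). A minimal combined sketch, assuming BeautifulSoup with the lxml parser as the file itself uses, on a made-up snippet:

    from bs4 import BeautifulSoup

    def get_title_and_link(filemd5, html):
        soup = BeautifulSoup(html, "lxml")
        # title: text of the <a> whose data attribute equals the file md5
        a_title = soup.find("a", attrs={"data": filemd5})
        title = a_title.get_text() if a_title is not None else ""
        # link: href of <a filelink=...>, falling back to src of <img filelink=...>
        a_link = soup.find("a", attrs={"filelink": filemd5})
        if a_link is not None:
            link = a_link.attrs.get("href", "")
        else:
            img_link = soup.find("img", attrs={"filelink": filemd5})
            link = img_link.attrs.get("src", "") if img_link is not None else ""
        return title, link

    html = '<a data="abc123" filelink="abc123" href="http://example.com/f.pdf">附件1.pdf</a>'
    print(get_title_and_link("abc123", html))  # ('附件1.pdf', 'http://example.com/f.pdf')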
+ 201 - 30
BaseDataMaintenance/maintenance/major_project/unionDocument.py

@@ -2,6 +2,7 @@
 from BaseDataMaintenance.dataSource.source import getConnect_ots
 from tablestore import *
 from BaseDataMaintenance.model.ots.major_project import *
+from BaseDataMaintenance.model.ots.document import *
 from BaseDataMaintenance.common.Utils import *
 
 from queue import Queue
@@ -85,9 +86,9 @@ def get_stage_pattern():
 def extract_legal_stage(content, _pattern, priority_dict):
     # 判断这几类直接返回
     if not content:
-        return None
+        return None,None
     if re.search("拍卖|转让|产权|出让|租赁|招租", content) is not None:
-        return None
+        return None,None
     # 替换混淆词
     _content = re.sub("设计院|设计总院", "", content)
 
@@ -95,16 +96,16 @@ def extract_legal_stage(content, _pattern, priority_dict):
     for stage_search in re.finditer(_pattern, _content):
         for k,v in stage_search.groupdict().items():
             if v is not None:
-                list_stage.append([k, priority_dict.get(k)])
+                list_stage.append([k, priority_dict.get(k),v])
     if len(list_stage)>0:
-        list_stage.sort(key=lambda x: x[1])
-        return list_stage[0][0]
+        # list_stage.sort(key=lambda x: x[1])
+        return list_stage[0][0],list_stage[0][2]
     if re.search("总承包|EPC",_content) is not None:
         if re.search("设计",_content) is not None:
-            return "设计阶段"
+            return "设计阶段","设计"
         else:
-            return "施工在建"
-    return None
+            return "施工在建","总承包"
+    return None,None
 
 def read_industry_keyword(_path):
     df = pd.read_excel(_path)
@@ -208,25 +209,56 @@ def dynamicDumplicate2(list_dynamic,stage_order):
     _set = set()
     l_d = []
     list_dynamic.sort(key=lambda x:x.get("page_time",""))
-    list_dynamic.sort(key=lambda x:1 if x.get("docchannel","") in (101,119,120) else 0,reverse=True)
+
+    # list_dynamic.sort(key=lambda x:1 if x.get("docchannel","") in (101,119,120) else 0,reverse=True)
+
+
+    # deduplicate by stage_keyword and docchannel
+
+    dict_stage_channel_dynamic = {}
+
     last_stage = 0
     set_stage = set()
     for _dynamic in list_dynamic:
-        _stage = _dynamic.get("project_stage","")
+
+        _stage = _dynamic.get(project_dynamics_project_stage,"")
         if _stage=="":
             continue
         current_stage = stage_order.get(_stage,-1)
         if current_stage<last_stage:
             continue
+
         set_stage.add(_stage)
         last_stage = current_stage
-        # _channel = _dynamic.get("docchannel","")+_dynamic.get("sp_type","")
-        # 保留一条数据
-        # _key = _stage+_channel
-        # if _key in _set or _key=="" or _key is None:
-        #     continue
-        # _set.add(_key)
-        l_d.append(_dynamic)
+
+        stage_keyword = _dynamic.get("project_stage_keyword","")
+        new_channel = _dynamic.get("docchannel","")
+        if new_channel in (101,119,120):
+            new_channel = 1101
+        if new_channel in (51,103,104,105,115,116,117):
+            new_channel = 1201
+        if _dynamic.get("sp_type","")!="":
+            new_channel = 1001
+
+
+        _key = "%s-%s"%(str(stage_keyword),str(new_channel))
+        if new_channel!=1101:
+            if _key in _set:
+                continue
+            else:
+                dict_stage_channel_dynamic[_key] = _dynamic
+        else:
+            if _key in _set:
+                b_dynamic = dict_stage_channel_dynamic[_key]
+                if b_dynamic.get(project_dynamics_win_tenderer,"")=="" and _dynamic.get(project_dynamics_win_tenderer,"")!="":
+                    dict_stage_channel_dynamic[_key] = _dynamic
+            else:
+                dict_stage_channel_dynamic[_key] = _dynamic
+
+        _set.add(_key)
+    for k,v in dict_stage_channel_dynamic.items():
+        l_d.append(v)
+        l_d.sort(key=lambda x:x.get(project_dynamics_page_time,""))
     return l_d,list(set_stage)
 
 
@@ -248,7 +280,7 @@ class MajorUnion():
 
     def __init__(self):
         self.ots_client = getConnect_ots()
-        self.search_columns = [major_project_project_name,major_project_province,major_project_project_stage,major_project_plan_start_time]
+        self.search_columns = [major_project_project_name,major_project_province,major_project_project_stage,major_project_plan_start_time,major_project_total_investment]
 
 
         self.stage_pattern,self.stage_priority_dict = get_stage_pattern()
@@ -276,11 +308,13 @@ class MajorUnion():
 
     def producer(self):
         bool_query = BoolQuery(must_queries=[
-            RangeQuery(major_project_status,1,50,True,True),
-            # TermQuery(major_project_id,"00099059de2dedc5b969c3c19aa41c8b")
+            # RangeQuery(major_project_status,1,50,True,True),
+            # RangeQuery(major_project_status,201,301,True,True),
+            TermQuery(major_project_id,"00048953975ad762e883f1626f6b99ec")
             ]
         )
 
+        # self.search_columns = [major_project_status]
         task_queue = Queue()
 
         rows,next_token,total_count,is_all_succeed = self.ots_client.search("major_project","major_project_index",
@@ -298,7 +332,7 @@ class MajorUnion():
             list_data = getRow_ots(rows)
             for _data in list_data:
                 task_queue.put(_data)
-            if task_queue.qsize()>=1000:
+            if task_queue.qsize()>=3000:
                 break
         return task_queue
 
@@ -306,6 +340,17 @@ class MajorUnion():
         _major.setValue(major_project_status,random.randint(201,300),True)
 
 
+    def extract_projectDigest(self,content):
+        _pattern = "(?P<projectDigest>(项目|工程|标的|需求|建设|招标|采购|内容)(概况|规模|简介|信息|范围|内容|说明|摘要).{10,300})"
+        _pattern_search = re.search(_pattern,content)
+        _projectDigest = ""
+        _find = ""
+        if _pattern_search is not None:
+            _find = _pattern_search.groupdict().get("projectDigest","")
+        if len(_find)>0:
+            _projectDigest = "。".join(_find.split("。")[0:3])
+        return _projectDigest
+
     def comsumer(self):
 
         def _handle(item,result_queue):
@@ -314,8 +359,6 @@ class MajorUnion():
             # _major.update_row(self.ots_client)
             # return
 
-
-
             project_name = item.get(major_project_project_name,"")
 
 
@@ -338,7 +381,7 @@ class MajorUnion():
                 _major.setValue(major_project_update_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
                 _major.setValue(major_project_all_project_dynamics,"[]]",True)
                 _major.setValue(major_project_all_project_dynamic_number,0,True)
-                _major.setValue(major_project_stages,",".join([]),True)
+                _major.setValue(major_project_project_stages,",".join([]),True)
                 self.set_status_to_adult(_major)
                 _major.update_row(self.ots_client)
                 self.set_status_to_adult(_major)
@@ -364,21 +407,73 @@ class MajorUnion():
                                                                                 ColumnsToGet(["province","page_title","page_content","page_time","sp_type"],ColumnReturnType.SPECIFIED))
             list_data = getRow_ots(rows)
             dict_industry = {}
+
+            list_area = []
+            list_construction = []
+            list_contact = []
+            dict_tenderee = {}
+
             for _data in list_data:
                 _content = _data.get("page_title","")+_data.get("page_content","")[:100]
-                _stage = extract_legal_stage(_content,self.stage_pattern,self.stage_priority_dict)
+                _stage,_stage_keyword = extract_legal_stage(_content,self.stage_pattern,self.stage_priority_dict)
                 _dynamic = {"docid":str(_data.get("id")),
                             "doctype":2,
                             "doctitle":_data.get("page_title",""),
                             "page_time":_data.get("page_time",""),
                             "sp_type":str(_data.get("sp_type","")),
-                            "project_stage":_stage}
+                            "project_stage":_stage,
+                            "project_stage_keyword":_stage_keyword}
                 list_dynamics.append(_dynamic)
                 _industry = extract_industry(self.dict_keyword,self.keyword_pattern,_content)
                 if _industry not in dict_industry:
                     dict_industry[_industry] = 0
                 dict_industry[_industry] += 1
 
+                _construction = self.extract_projectDigest(_content)
+                list_construction.append(_construction)
+
+                _area = _data.get(document_area,"")
+                _province = _data.get(document_province,"")
+                _city = _data.get(document_city,"")
+                _district = _data.get(document_district,"")
+
+                _score = 0
+                if _area!="":
+                    _score += 1
+                if _province!="":
+                    _score += 1
+                if _city!="":
+                    _score += 1
+                if _district!="":
+                    _score += 1
+                list_area.append({document_area:_area,
+                                  document_province:_province,
+                                  document_city:_city,
+                                  document_district:_district,
+                                  "score":_score})
+
+                _tenderee = _data.get(document_tenderee,"")
+                if _tenderee!="":
+                    if _tenderee not in dict_tenderee:
+                        dict_tenderee[_tenderee] = 0
+                    dict_tenderee[_tenderee] += 1
+
+                tenderee_contact = _data.get(document_tenderee_contact,"")
+                tenderee_phone = _data.get(document_tenderee_phone,"")
+                contact_score = 0
+                if tenderee_phone!="":
+                    if recog_likeType(tenderee_phone)=="mobile":
+                        contact_score += 2
+                    else:
+                        contact_score += 1
+                if tenderee_contact!="":
+                    contact_score += 1
+                if contact_score>0:
+                    list_contact.append({document_tenderee_contact:tenderee_contact,
+                                         document_tenderee_phone:tenderee_phone,
+                                         "score":contact_score})
+
+
 
             log("%s search sp %d"%(item.get("id"),len(list_data)))
 
@@ -411,15 +506,17 @@ class MajorUnion():
                 ])
             rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                 SearchQuery(bool_query_doc,limit=100),
-                                                                                ColumnsToGet(["doctitle","doctextcon","attachmenttextcon","page_time","docchannel","bidway","sub_docs_json","project_name"],ColumnReturnType.SPECIFIED))
+                                                                                ColumnsToGet(["doctitle","doctextcon","attachmenttextcon","page_time","docchannel","bidway","sub_docs_json","project_name",document_tenderee,document_tenderee_contact,document_tenderee_phone,document_area,document_province,document_city,document_district,document_total_tenderee_money],ColumnReturnType.SPECIFIED))
             list_data = getRow_ots(rows)
 
             log("%s search doc %d"%(item.get("id"),len(list_data)))
             for _data in list_data:
                 _content = _data.get("doctitle","")+_data.get("doctextcon","")+_data.get("attachmenttextcon","")
+
+
                 stage_content = _data.get("doctitle","")+_data.get("project_name","")
                 win_tenderer,win_tenderer_manager,win_tenderer_phone = getWinTenderer(_data.get("sub_docs_json"),self.ots_client)
-                _stage = extract_legal_stage(stage_content,self.stage_pattern,self.stage_priority_dict)
+                _stage,_stage_keyword = extract_legal_stage(stage_content,self.stage_pattern,self.stage_priority_dict)
                 _dynamic = {"docid":str(_data.get("docid")),
                             "doctype":1,
                             "doctitle":_data.get("doctitle",""),
@@ -429,12 +526,86 @@ class MajorUnion():
                             "project_stage":_stage,
                             "win_tenderer":win_tenderer,
                             "win_tenderer_manager":win_tenderer_manager,
-                            "win_tenderer_phone":win_tenderer_phone}
+                            "win_tenderer_phone":win_tenderer_phone,
+                            "project_stage_keyword":_stage_keyword}
                 list_dynamics.append(_dynamic)
                 _industry = extract_industry(self.dict_keyword,self.keyword_pattern,_content)
                 if _industry not in dict_industry:
                     dict_industry[_industry] = 0
                 dict_industry[_industry] += 1
+
+                total_tenderee_money = _data.get(document_total_tenderee_money,0)
+                if total_tenderee_money>0:
+                    if _major.getProperties().get(major_project_total_investment,0)==0:
+                        _major.setValue(major_project_total_investment,total_tenderee_money/10000,True)
+
+                _construction = self.extract_projectDigest(_content)
+                list_construction.append(_construction)
+
+                _area = _data.get(document_area,"")
+                _province = _data.get(document_province,"")
+                _city = _data.get(document_city,"")
+                _district = _data.get(document_district,"")
+
+                _score = 0
+                if _area!="":
+                    _score += 1
+                if _province!="":
+                    _score += 1
+                if _city!="":
+                    _score += 1
+                if _district!="":
+                    _score += 1
+                list_area.append({document_area:_area,
+                                  document_province:_province,
+                                  document_city:_city,
+                                  document_district:_district,
+                                  "score":_score})
+
+                _tenderee = _data.get(document_tenderee,"")
+                if _tenderee!="":
+                    if _tenderee not in dict_tenderee:
+                        dict_tenderee[_tenderee] = 0
+                    dict_tenderee[_tenderee] += 1
+
+                tenderee_contact = _data.get(document_tenderee_contact,"")
+                tenderee_phone = _data.get(document_tenderee_phone,"")
+                contact_score = 0
+                if tenderee_phone!="":
+                    if recog_likeType(tenderee_phone)=="mobile":
+                        contact_score += 2
+                    else:
+                        contact_score += 1
+                if tenderee_contact!="":
+                    contact_score += 1
+                if contact_score>0:
+                    list_contact.append({document_tenderee_contact:tenderee_contact,
+                                         document_tenderee_phone:tenderee_phone,
+                                         "score":contact_score})
+
+
+            # fill in supplementary attributes
+            if len(list_area)>0:
+                list_area.sort(key=lambda x:x.get("score"),reverse=True)
+                _dict = list_area[0]
+                _major.setValue(major_project_area,_dict.get(document_area,""),True)
+                _major.setValue(major_project_province,_dict.get(document_province,""),True)
+                _major.setValue(major_project_city,_dict.get(document_city,""),True)
+                _major.setValue(major_project_district,_dict.get(document_district,""),True)
+            if len(list_construction)>0:
+                list_construction.sort(key=lambda x:len(x),reverse=True)
+                _major.setValue(major_project_project_overview,list_construction[0],True)
+            if len(dict_tenderee.keys())>0:
+                _l = []
+                for k,v in dict_tenderee.items():
+                    _l.append([k,v])
+                _l.sort(key=lambda x:x[1],reverse=True)
+                _major.setValue(major_project_construction_enterprise,_l[0][0],True)
+
+            if len(list_contact)>0:
+                list_contact.sort(key=lambda x:x.get("score"),reverse=False)
+                _major.setValue(major_project_project_leader,list_contact[0].get(document_tenderee_contact,""),True)
+                _major.setValue(major_project_project_leader_phone,list_contact[0].get(document_tenderee_phone,""),True)
             # print(list_data)
             # print(list_dynamics)
             list_dynamics_all =  dynamicDumplicate(list_dynamics)
@@ -468,7 +639,7 @@ class MajorUnion():
             _major.setValue(major_project_update_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
             _major.setValue(major_project_all_project_dynamics,all_project_dynamics,True)
             _major.setValue(major_project_all_project_dynamic_number,all_project_dynamic_number,True)
-            _major.setValue(major_project_stages,",".join(list_stage),True)
+            _major.setValue(major_project_project_stages,",".join(list_stage),True)
             self.set_status_to_adult(_major)
             _major.update_row(self.ots_client)
 

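The contact fields added to major projects are picked by a small score: a mobile number (per recog_likeType) adds 2, any other phone adds 1, and a named contact adds 1; the best candidate fills project_leader and project_leader_phone. A minimal sketch of that scoring with plain dict keys; it sorts descending on the assumption that the highest score should win:

    import re

    mobile_pattern = re.compile(r"^1\d{10}$")

    def recog_likeType(phone):
        return "mobile" if mobile_pattern.search(phone) else "phone"

    def pick_best_contact(candidates):
        """candidates: list of {"contact": str, "phone": str} - the keys are illustrative."""
        scored = []
        for c in candidates:
            score = 0
            if c.get("phone"):
                score += 2 if recog_likeType(c["phone"]) == "mobile" else 1
            if c.get("contact"):
                score += 1
            if score > 0:
                scored.append((score, c))
        if not scored:
            return None
        scored.sort(key=lambda x: x[0], reverse=True)  # highest score first
        return scored[0][1]

    print(pick_best_contact([
        {"contact": "", "phone": "0571-88888888"},    # score 1
        {"contact": "张工", "phone": "13812345678"},  # score 3 -> wins
    ]))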
+ 33 - 8
BaseDataMaintenance/maxcompute/documentDumplicate.py

@@ -693,6 +693,22 @@ def getLength(_str):
 
 def check_money(bidding_budget_less,bidding_budget_greater,
                 win_bid_price_less,win_bid_price_greater):
+
+    # only compare the six most significant digits
+    if getLength(bidding_budget_less)>0:
+        bidding_budget_less = round(float(bidding_budget_less))
+        bidding_budget_less = str(round(bidding_budget_less,6-len(str(bidding_budget_less))))
+    if getLength(bidding_budget_greater)>0:
+        bidding_budget_greater = round(float(bidding_budget_greater))
+        bidding_budget_greater = str(round(bidding_budget_greater,6-len(str(bidding_budget_greater))))
+
+    if getLength(win_bid_price_less)>0:
+        win_bid_price_less = round(float(win_bid_price_less))
+        win_bid_price_less = str(round(win_bid_price_less,6-len(str(win_bid_price_less))))
+    if getLength(win_bid_price_greater)>0:
+        win_bid_price_greater = round(float(win_bid_price_greater))
+        win_bid_price_greater = str(round(win_bid_price_greater,6-len(str(win_bid_price_greater))))
+
     #check saming
     budget_is_same = ""
     price_is_same = ""
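The block above compares amounts on their leading digits only, by rounding with a negative ndigits. A small sketch of the same idea, assuming plain numeric strings as input (not the production helper):

def top6(value):
    # keep only the six most significant digits of a positive amount
    n = round(float(value))            # drop the decimal part first
    return round(n, 6 - len(str(n)))   # negative ndigits rounds to the left of the decimal point

print(top6("1234567.89"))   # 1234570
print(top6("98500"))        # 98500 (short values are unchanged)
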
@@ -756,13 +772,20 @@ def check_codes(project_codes_less,project_codes_greater):
     #check the similarity
     is_same = False
     is_sim = False
+
+
     for project_code_less in project_codes_less:
         for project_code_greater in project_codes_greater:
             code_sim = getSimilarityOfString(project_code_less,project_code_greater)
-            if code_sim>0.6 and code_sim<1:
-                is_sim = True
-            if code_sim==1:
-                is_same = True
+            if project_code_less is not None and project_code_greater is not None:
+                if code_sim>0.6:
+                    if str(project_code_less).find(str(project_code_greater))>=0 or str(project_code_greater).find(str(project_code_less))>=0:
+                        is_same = True
+                    else:
+                        is_sim = True
+                if project_code_less!=project_code_greater:
+                    if code_sim>0.4 and len(project_code_less)==len(project_code_greater):
+                        is_sim = True
     if is_same:
         return True
     if is_sim:
@@ -779,7 +802,9 @@ num1_pattern = re.compile("[一二三四五六七八九]+")
 location_pattern = re.compile("[^\[【\(]{1,2}[市区镇县村路]")
 building_pattern = "工程招标代理|工程设计|工程造价咨询|施工图设计文件审查|咨询|环评|设计|施工监理|施工|监理|EPC|epc|总承包|水土保持|选址论证|勘界|勘察|预算编制|预算审核|设备类|第?[\((]?[一二三四五六七八九1-9][)\)]?[次批]"
 date_pattern = re.compile("\d{2,4}[\-\./年]\d{1,2}[\-\./月]\d{1,2}")
-def check_doctitle(doctitle_refind_less,doctitle_refind_greater,codes_less=[],code_greater=[]):
+def check_doctitle(doctitle_refind_less, doctitle_refind_greater, codes_less=[], code_greater=[]):
+    if code_greater is None:
+        code_greater = []
     doctitle_refind_less = str(doctitle_refind_less).replace("(","(").replace(")",")")
     doctitle_refind_greater = str(doctitle_refind_greater).replace("(","(").replace(")",")")
     for _c in codes_less:
@@ -2139,9 +2164,9 @@ class f_autorule_group_extract(BaseUDTF):
 
 
 if __name__ == '__main__':
-    # _str1 = "SXXY-ZBP-GG-2020002"
-    # _str2 = "SXXY-ZBP-GG-2020002"
-    # print(getSimilarityOfString(_str1,_str2))
+    _str1 = "PMJJ-202211030004001"
+    _str2 = "PMJJ-202211030001001"
+    print(getSimilarityOfString(_str1,_str2))
     print(check_doctitle("强化桂城街道工地扬尘防控监管巡查第三方(二次)","广东省强化桂城街道工地扬尘防控监管巡查第三方(二次)"))
     # print(check_codes(["F-2022-027(MASCG-2-F-F-2022-0462)"],["F-2022-027(MASCG-2-F-F-2022-0462)"]))
     # print(check_product(None,None))

+ 185 - 40
BaseDataMaintenance/maxcompute/documentMerge.py

@@ -960,16 +960,20 @@ class f_merge_getLabel(BaseUDTF):
 def getSimilarityOfString(str1,str2,nums=2):
     _set1 = set()
     _set2 = set()
+    if str1 is None:
+        str1 = ""
+    if str2 is None:
+        str2 = ""
     if len(str1)<=nums or len(str2)<=nums:
         if str1!=str2:
             return 0.8
         else:
             return 1
     if str1 is not None:
-        for i in range(nums,len(str1)):
+        for i in range(nums,min(1000,len(str1))):
             _set1.add(str1[i-nums:i+1])
     if str2 is not None:
-        for i in range(nums,len(str2)):
+        for i in range(nums,min(1000,len(str2))):
             _set2.add(str2[i-nums:i+1])
     _len = max(1,min(len(_set1),len(_set2)))
     return len(_set1&_set2)/_len
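getSimilarityOfString builds character n-grams (trigrams by default, capped at the first 1000 characters) and scores the overlap against the smaller set. A self-contained sketch of that overlap coefficient, omitting the short-string special case (illustration only):

def ngram_similarity(a, b, n=2):
    a, b = a or "", b or ""
    grams_a = {a[i - n:i + 1] for i in range(n, min(1000, len(a)))}
    grams_b = {b[i - n:i + 1] for i in range(n, min(1000, len(b)))}
    denom = max(1, min(len(grams_a), len(grams_b)))
    return len(grams_a & grams_b) / denom

print(ngram_similarity("PMJJ-202211030004001", "PMJJ-202211030001001"))  # high, but below 1.0
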
@@ -2029,10 +2033,14 @@ class f_generate_projects_from_project(BaseUDTF):
         if win_bid_price is None:
             win_bid_price = -1
 
+        if project_codes is None:
+            project_codes = ""
         list_codes = project_codes.split(",")
         page_time_stamp = self.ToTimeStamp.evaluate(page_time)
         if len(list_codes)==0:
             list_codes.append("")
+        if product is None:
+            product = ""
         list_product = product.split(",")
         if len(list_product)==0:
             list_product.append("")
@@ -2067,9 +2075,9 @@ def dumplicate_projects(list_projects,b_log=False):
     while 1:
         _update = False
         list_p = []
-        log("================")
-        for _p in cluster_projects:
-            log("docids:%s"%(_p.get(project_docids,"")))
+        # log("================")
+        # for _p in cluster_projects:
+        #     log("docids:%s"%(_p.get(project_docids,"")))
 
         for _pp in cluster_projects:
             _find = False
@@ -2183,16 +2191,27 @@ def getTimeStamp(page_time):
     except Exception as e:
         return 0
 
-def timeAdd(_time,days):
+def timeAdd(_time,days,format="%Y-%m-%d",minutes=0):
     try:
-        a = time.mktime(time.strptime(_time,'%Y-%m-%d'))+86400*days
+        a = time.mktime(time.strptime(_time,format))+86400*days+60*minutes
 
-        _time1 = time.strftime("%Y-%m-%d",time.localtime(a))
+        _time1 = time.strftime(format,time.localtime(a))
         return _time1
     except Exception as e:
         return None
 
+
+# def timeAdd(_time,days):
+#     try:
+#         a = time.mktime(time.strptime(_time,'%Y-%m-%d'))+86400*days
+#
+#         _time1 = time.strftime("%Y-%m-%d",time.localtime(a))
+#         return _time1
+#     except Exception as e:
+#         return None
+
 def check_time_merge(json_time_less,json_time_greater,b_log,set_time_key=set([project_time_bidclose,project_time_bidopen,project_time_bidstart,project_time_commencement,project_time_completion,project_time_earnest_money_start,project_time_earnest_money_end,project_time_get_file_end,project_time_get_file_start,project_time_publicity_end,project_time_publicity_start,project_time_registration_end,project_time_registration_start])):
+
     same_count = 0
     if getLength(json_time_less)>0 and getLength(json_time_greater)>0:
         if isinstance(json_time_less,dict):
@@ -2209,7 +2228,7 @@ def check_time_merge(json_time_less,json_time_greater,b_log,set_time_key=set([pr
                     v1 = time_greater.get(k,"")
                     if getLength(v1)>0:
                         _dis = getTimeStamp(v)-getTimeStamp(v1)
-                        if _dis>86400*2 or _dis<-86400*2:
+                        if _dis>86400*5 or _dis<-86400*5:
                             if b_log:
                                 log("check time failed %s-%s-%s"%(str(k),str(v),str(v1)))
                             return -1
@@ -2245,19 +2264,58 @@ def check_page_time_merge(page_time,page_time_to_merge,b_log,time_limit):
             return 1
     return 0
 
+def check_dynamics_title_merge(project_dynamics,project_dynamics_to_merge,b_log):
+    # compare document titles from the project dynamics of the two projects
+    if project_dynamics is not None and project_dynamics_to_merge is not None:
+        try:
+            project_dynamics = json.loads(project_dynamics)
+            project_dynamics_to_merge = json.loads(project_dynamics_to_merge)
+            for _d in project_dynamics:
+                _title1 = _d.get(document_doctitle,"")
+                _title1 = re.sub(r'项目|工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|合同', '',  _title1)
+                for _dm in project_dynamics_to_merge:
+
+                    _title2 = _dm.get(document_doctitle,"")
+
+                    _title2 = re.sub(r'项目|工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|合同', '',  _title2)
+                    if len(_title1)>15 and len(_title2)>15:
+                        _sim = getSimilarityOfString(_title1,_title2)
+                        if _sim>0.7:
+                            return 1
+        except Exception as e:
+            pass
+    return -1
+
 def check_project_name_merge(project_name,project_name_to_merge,b_log):
     #判断项目名称
-    return 0
+
+    project_name = re.sub(r'项目|工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|合同', '',  project_name)
+    project_name_to_merge = re.sub(r'项目|工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|合同', '',  project_name_to_merge)
     if len(project_name)>15 and len(project_name_to_merge)>15:
         _sim = getSimilarityOfString(project_name,project_name_to_merge)
         if _sim<0.7:
             if b_log:
-                log("check project_name failed %s===%s"%(str(project_name),str(project_name_to_merge)))
+                log("check project_name failed %s %s===%s"%(str(_sim),str(project_name),str(project_name_to_merge)))
             return -1
         return 1
+    return 0
 
-def check_zhaozhong_page_time_merge(zhao_biao_page_time,zhong_biao_page_time,zhao_biao_page_time_to_merge,zhong_biao_page_time_to_merge,b_log):
-    if (len(zhong_biao_page_time)>0 and len(zhao_biao_page_time_to_merge)>0 and zhong_biao_page_time<zhao_biao_page_time_to_merge) or (len(zhong_biao_page_time_to_merge)>0 and len(zhao_biao_page_time)>0 and zhong_biao_page_time_to_merge<zhao_biao_page_time):
+def check_zhaozhong_page_time_merge(zhao_biao_page_time,zhong_biao_page_time,zhao_biao_page_time_to_merge,zhong_biao_page_time_to_merge,_proj,_proj_to_merge,b_log):
+    if getLength(zhong_biao_page_time)>0:
+        bidopen = _proj.get(project_time_bidopen)
+        if getLength(bidopen)==0:
+            bidopen = _proj.get(project_time_bidclose)
+        if getLength(bidopen)>0 and bidopen>zhong_biao_page_time:
+            zhong_biao_page_time = bidopen
+
+    if getLength(zhong_biao_page_time_to_merge)>0:
+        bidopen_to_merge = _proj_to_merge.get(project_time_bidopen)
+        if getLength(bidopen_to_merge)==0:
+            bidopen_to_merge = _proj_to_merge.get(project_time_bidclose)
+        if getLength(bidopen_to_merge)>0 and bidopen_to_merge>zhong_biao_page_time_to_merge:
+            zhong_biao_page_time_to_merge = bidopen_to_merge
+
+    if (getLength(zhong_biao_page_time)>0 and getLength(zhao_biao_page_time_to_merge)>0 and zhong_biao_page_time<zhao_biao_page_time_to_merge) or (len(zhong_biao_page_time_to_merge)>0 and len(zhao_biao_page_time)>0 and zhong_biao_page_time_to_merge<zhao_biao_page_time):
         if b_log:
             log("check zhaobiao zhongbiao page_time failed %s=%s===%s=%s"%(str(zhao_biao_page_time),str(zhong_biao_page_time),str(zhao_biao_page_time_to_merge),str(zhong_biao_page_time_to_merge)))
         return -1
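Both check_project_name_merge and check_dynamics_title_merge strip procurement boilerplate words before measuring similarity, so two long names only count as a match on their distinctive part. A condensed sketch of that normalise-then-compare step, with an abbreviated stop-word list (illustration only; thresholds as in the code above):

import re

STOPWORDS = re.compile(r"项目|工程|服务|招标|采购|公告|结果|中标")   # abbreviated list for illustration

def _trigrams(s):
    return {s[i - 2:i + 1] for i in range(2, len(s))}

def names_probably_same(name_a, name_b, threshold=0.7):
    a = STOPWORDS.sub("", name_a or "")
    b = STOPWORDS.sub("", name_b or "")
    if len(a) > 15 and len(b) > 15:
        overlap = len(_trigrams(a) & _trigrams(b)) / max(1, min(len(_trigrams(a)), len(_trigrams(b))))
        return overlap > threshold
    return None   # too short to decide either way
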
@@ -2308,6 +2366,18 @@ def check_roles_merge(enterprise,enterprise_to_merge,tenderee,tenderee_to_merge,
     return 0
 
 def check_money_merge(bidding_budget,bidding_budget_to_merge,win_bid_price,win_bid_price_to_merge,b_log):
+
+    # compare only the top six significant digits of each amount
+    bidding_budget = round(bidding_budget)
+    bidding_budget = round(bidding_budget,6-len(str(bidding_budget)))
+    bidding_budget_to_merge = round(bidding_budget_to_merge)
+    bidding_budget_to_merge = round(bidding_budget_to_merge,6-len(str(bidding_budget_to_merge)))
+
+    win_bid_price = round(win_bid_price)
+    win_bid_price = round(win_bid_price,6-len(str(win_bid_price)))
+    win_bid_price_to_merge = round(win_bid_price_to_merge)
+    win_bid_price_to_merge = round(win_bid_price_to_merge,6-len(str(win_bid_price_to_merge)))
+
     _set = set([a for a in [bidding_budget,bidding_budget_to_merge] if a>0])
     if len(_set)>1:
         if b_log:
@@ -2325,12 +2395,15 @@ def check_money_merge(bidding_budget,bidding_budget_to_merge,win_bid_price,win_b
         max_win_bid_price = max(_set1)
         max_bidding_budget = max(_set)
         radio = max_win_bid_price/max_bidding_budget
-        if max_win_bid_price>max_bidding_budget:
+        # allow the win bid price to exceed the budget by up to 10%
+        if max_win_bid_price>max_bidding_budget*(1.1):
             if b_log:
-                log("check money failed %s===%s"%(str(max(_set1)),str(max(_set))))
+                log("check max_win_bid_price<=max_bidding_budget*(1.1) failed %s===%s"%(str(max(_set1)),str(max(_set))))
             return -1
         else:
             if radio<0.3:
+                if b_log:
+                    log("check money failed radio<0.3 %s===%s"%(str(max(_set1)),str(max(_set))))
                 return -1
         if (bidding_budget>0 and bidding_budget_to_merge>0) or (win_bid_price>0 and win_bid_price_to_merge>0):
             return 1
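The money rule above boils down to a plausibility window between the highest budget and the highest win price. A condensed pairwise sketch (the production code additionally rejects merges whose two budgets, or two win prices, disagree with each other):

def money_compatible(budget, win_price):
    # both amounts already truncated to their top significant digits
    if budget <= 0 or win_price <= 0:
        return True                      # nothing to compare against
    ratio = win_price / budget
    return 0.3 <= ratio <= 1.1           # allow up to 10% over budget, reject implausibly small awards
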
@@ -2340,8 +2413,8 @@ def check_project_codes_merge(list_code,list_code_to_merge,b_log):
     #check project_codes
     has_same = False
     has_similar = False
-    for _c in list_code:
-        for _c1 in list_code_to_merge:
+    for _c in list_code[:100]:
+        for _c1 in list_code_to_merge[:100]:
             _simi = getSimilarityOfString(_c,_c1,3)
             if _simi==1:
                 has_same = True
@@ -2360,6 +2433,7 @@ def check_project_codes_merge(list_code,list_code_to_merge,b_log):
     return 0
 
 def check_merge_rule(_proj,_dict,b_log=False,time_limit=86400*200,return_prob=False):
+    docids = _proj.get(project_docids,"")
     page_time = _proj.get(project_page_time,"")
     project_codes = _proj.get(project_project_codes,"")
     project_name = _proj.get(project_project_name,"")
@@ -2374,7 +2448,7 @@ def check_merge_rule(_proj,_dict,b_log=False,time_limit=86400*200,return_prob=Fa
     zhao_biao_page_time = _proj.get(project_zhao_biao_page_time,"")
     zhong_biao_page_time = _proj.get(project_zhong_biao_page_time,"")
 
-
+    project_dynamics = _proj.get(project_project_dynamics)
 
     enterprise = _proj.get("enterprise")
     if enterprise is None:
@@ -2390,6 +2464,7 @@ def check_merge_rule(_proj,_dict,b_log=False,time_limit=86400*200,return_prob=Fa
         list_code.append(project_code)
     list_code = [a for a in list_code if a is not None]
 
+    docids_to_merge = _dict.get(project_docids,"")
     page_time_to_merge = _dict.get(project_page_time,"")
     project_codes_to_merge = _dict.get(project_project_codes,"")
     project_name_to_merge = _dict.get(project_project_name,"")
@@ -2405,13 +2480,16 @@ def check_merge_rule(_proj,_dict,b_log=False,time_limit=86400*200,return_prob=Fa
     zhao_biao_page_time_to_merge = _dict.get(project_zhao_biao_page_time,"")
     zhong_biao_page_time_to_merge = _dict.get(project_zhong_biao_page_time,"")
 
+    project_dynamics_to_merge = _dict.get(project_project_dynamics)
+
     list_code_to_merge = [a for a in project_codes_to_merge.split(",") if a!='']
     if project_code_to_merge!="":
         list_code_to_merge.append(project_code_to_merge)
 
     list_code_to_merge = [a for a in list_code_to_merge if a is not None]
 
-
+    if b_log:
+        log("checking docids:%s and %s"%(str(docids),str(docids_to_merge)))
     enterprise_to_merge = _dict.get("enterprise")
     if enterprise_to_merge is None:
         try:
@@ -2424,13 +2502,15 @@ def check_merge_rule(_proj,_dict,b_log=False,time_limit=86400*200,return_prob=Fa
 
     check_dict = {0:0,1:0,-1:0}
 
-    _zhaozhong_check = check_zhaozhong_page_time_merge(zhao_biao_page_time,zhong_biao_page_time,zhao_biao_page_time_to_merge,zhong_biao_page_time_to_merge,b_log)
+    # time check - zhaobiao/zhongbiao page_time ordering
+    _zhaozhong_check = check_zhaozhong_page_time_merge(zhao_biao_page_time,zhong_biao_page_time,zhao_biao_page_time_to_merge,zhong_biao_page_time_to_merge,_proj,_dict,b_log)
     check_dict[_zhaozhong_check] += 1
     if check_dict[-1]>0:
         if return_prob:
             return False,0
         return False
 
+    # event check - money
     _money_check = check_money_merge(bidding_budget,bidding_budget_to_merge,win_bid_price,win_bid_price_to_merge,b_log)
     check_dict[_money_check] += 1
     if check_dict[-1]>0:
@@ -2438,6 +2518,7 @@ def check_merge_rule(_proj,_dict,b_log=False,time_limit=86400*200,return_prob=Fa
             return False,0
         return False
 
+    # party check - roles
     _roles_check = check_roles_merge(enterprise,enterprise_to_merge,tenderee,tenderee_to_merge,agency,agency_to_merge,win_tenderer,win_tenderer_to_merge,b_log)
     check_dict[_roles_check] += 1
     if check_dict[-1]>0:
@@ -2445,6 +2526,7 @@ def check_merge_rule(_proj,_dict,b_log=False,time_limit=86400*200,return_prob=Fa
             return False,0
         return False
 
+    # event check - project codes
     _codes_check = check_project_codes_merge(list_code,list_code_to_merge,b_log)
     check_dict[_codes_check] += 1
     if check_dict[-1]>0:
@@ -2453,25 +2535,40 @@ def check_merge_rule(_proj,_dict,b_log=False,time_limit=86400*200,return_prob=Fa
         return False
 
     _product_check = check_product_merge(product,product_to_merge,b_log)
-    check_dict[_product_check] += 1
-    if check_dict[-1]>0:
+
+
+    _project_name_check = check_project_name_merge(project_name,project_name_to_merge,b_log)
+
+    if _project_name_check==-1:
+        _project_name_check = check_dynamics_title_merge(project_dynamics,project_dynamics_to_merge,b_log)
+
+    # event check - product and project name/title, only one of them needs to match
+    if _product_check==-1 and _project_name_check==-1:
         if return_prob:
             return False,0
         return False
+    else:
+        check_dict[1] += 1   # at least one of product/name matched: count both checks as passed
+        check_dict[1] += 1
 
+
+    # time check - other time fields
     _time_check = check_time_merge(_proj,_dict,b_log)
     check_dict[_time_check] += 1
 
+    # package check - sub project name
     _sub_project_name_check = check_sub_project_name_merge(sub_project_name,sub_project_name_to_merge,b_log)
     check_dict[_sub_project_name_check] += 1
 
-    _project_name_check = check_project_name_merge(project_name,project_name_to_merge,b_log)
-    check_dict[_project_name_check] += 1
 
+    # time check - page_time (publish time)
     _page_time_check = check_page_time_merge(page_time,page_time_to_merge,b_log,time_limit)
     check_dict[_page_time_check] += 1
 
+
     _prob = check_dict[1]/(check_dict[-1]+check_dict[0]+check_dict[1])
+    if b_log:
+        log("check %s-%s result%s"%(docids,docids_to_merge,str(check_dict)))
     if check_dict[-1]>0:
         if check_dict[-1]==1:
             if (_codes_check==1 and _roles_check==1 and _product_check==1) or (_roles_check==1 and _money_check==1 and _product_check==1):
@@ -2509,7 +2606,6 @@ class f_group_merge_projects(BaseUDAF):
     def terminate(self, buffer):
         set_uuid = set()
         list_data = []
-        log("111:\n%s"%(str(buffer)))
         for _uuid,page_time_stamp,attrs_json in buffer[0]:
             if _uuid in set_uuid:
                 continue
@@ -2522,17 +2618,20 @@ class f_group_merge_projects(BaseUDAF):
         list_group_data = []
         list_group = split_with_time(list_data,1)
 
-        for _group in list_group:
+        _time = time.time()
+        for _group in list_group[:100]:
             list_group_pair = []
             _group = _group[:50]
             for _i in range(len(_group)):
                 for _j in range(_i+1,len(_group)):
                     _p_uuid,_,_p = _group[_i]
                     _pp_uuid,_,_pp = _group[_j]
-                    if check_merge_rule(_p,_pp,True):
+                    if check_merge_rule(_p,_pp,False):
                         list_group_pair.append([_p_uuid,_pp_uuid])
             if len(list_group_pair)>0:
                 list_group_data.append(list_group_pair)
+            if time.time()-_time>600:
+                break
 
         return json.dumps(list_group_data)
 
@@ -2556,6 +2655,7 @@ class f_extract_uuid_groups(BaseUDTF):
                     self.forward(_group[0],_group[1])
                     self.forward(_group[1],_group[0])
 
+
 @annotate('string,string->string')
 class f_group_uuids(BaseUDAF):
     '''
@@ -2654,6 +2754,10 @@ def to_project_json(projects):
             _proj["delete_uuid"] = ",".join(list_uuid[1:])
         else:
             _proj["keep_uuid"] = _proj.get("keep_uuid","")
+            to_delete = _proj.get("to_delete","")
+            if to_delete=="" and _proj.get("keep_uuid","")=="":
+                _uuid = uuid4()
+                _proj["keep_uuid_generated"] = str(_uuid)
             _proj["delete_uuid"] = _proj.get("delete_uuid","")
         list_proj.append(_proj)
         if project_uuid in _proj:
@@ -2710,18 +2814,19 @@ def dumplicate_document_in_merge(list_projects):
                             continue
                         if not check_page_time_dup(page_time,n_page_time):
                             continue
-                        if not is_multipack and not n_is_multipack:
-                            if extract_count>n_extract_count:
+                        if extract_count>n_extract_count:
+                            set_dup_docid.add(str(n_docid))
+                            dict_channel_proj[docchannel] = _d
+                        elif extract_count==n_extract_count:
+                            if int(n_docid)>int(docid):
                                 set_dup_docid.add(str(n_docid))
                                 dict_channel_proj[docchannel] = _d
-                            elif extract_count==n_extract_count:
-                                if int(n_docid)>int(docid):
-                                    set_dup_docid.add(str(n_docid))
-                                    dict_channel_proj[docchannel] = _d
-                                elif int(n_docid)<int(docid):
-                                    set_dup_docid.add(str(docid))
-                            else:
+                            elif int(n_docid)<int(docid):
                                 set_dup_docid.add(str(docid))
+                        else:
+                            set_dup_docid.add(str(docid))
+                        if not is_multipack and not n_is_multipack:
+                            pass
                     else:
                         dict_channel_proj[docchannel] = _d
 
@@ -2768,10 +2873,9 @@ class f_dumplicate_projects(BaseUDAF):
             list_data.append(json.loads(attrs_json))
             set_uuid.add(uuid_1)
 
-        list_projects = dumplicate_projects(list_data,True)
+        list_projects = dumplicate_projects(list_data,False)
         dumplicate_document_in_merge(list_projects)
 
-        log("===========2")
         project_json = to_project_json(list_projects)
 
         return project_json
@@ -2791,7 +2895,7 @@ class f_generate_project_with_attrs_json(BaseUDTF):
     def process(self,attrs_json):
         if attrs_json is not None:
             _group = json.loads(attrs_json)
-            self.forward(json.dumps([_group]),ensure_ascii=False)
+            self.forward(json.dumps([_group],ensure_ascii=False))
 
 @annotate('string -> string')
 class f_generate_project_with_delete_uuid(BaseUDTF):
@@ -2833,6 +2937,44 @@ def test_remerge():
 
     print(getSimilarityOfString('37168100014015220220012_40785671','SDGP371681000202201000912'))
 
+@annotate('string,bigint,bigint->string')
+class f_check_projects_by_num(BaseUDTF):
+
+    def process(self,json_projects,len_start,len_end):
+        if json_projects is not None:
+            list_projects = json.loads(json_projects)
+            for _proj in list_projects:
+                _num = _proj.get(project_docid_number,0)
+                if _num>=len_start and _num<=len_end:
+                    self.forward(json.dumps(_proj,ensure_ascii=False))
+
+@annotate('string->string')
+class f_check_projects_by_time(BaseUDTF):
+
+    def process(self,json_projects):
+        if json_projects is not None:
+            list_projects = json.loads(json_projects)
+            for _proj in list_projects:
+                zhaobiao = _proj.get(project_zhao_biao_page_time)
+                zhongbiao = _proj.get(project_zhong_biao_page_time)
+                if (zhongbiao is None or zhongbiao=="") and zhaobiao is not None and zhaobiao!="":
+                # if zhaobiao is not None and zhongbiao is not None and zhaobiao!="" and zhongbiao!="":
+                    self.forward(json.dumps(_proj,ensure_ascii=False))
+
+@annotate('string->string,string,double')
+class f_extract_year_win_and_price(BaseUDTF):
+
+    def process(self,json_projects):
+        if json_projects is not None:
+            list_projects = json.loads(json_projects)
+            for _proj in list_projects:
+                win_tenderer = _proj.get(project_win_tenderer,"")
+                win_bid_price = float(_proj.get(project_win_bid_price,0))
+                page_time = _proj.get(project_zhong_biao_page_time,"")
+                if win_tenderer!="":
+                    self.forward(page_time,win_tenderer,win_bid_price)
+
+
 
 def test_merge_rule():
     o_a = {
@@ -2933,4 +3075,7 @@ def test_merge_rule():
     print(check_merge_rule(o_a,o_b,True))
 
 if __name__ == '__main__':
-    test_merge_rule()
+    # test_merge_rule()
+    a = uuid4()
+    print(str(a))
+    print(to_project_json([{"keep_uuid":"123"}]))

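dumplicate_document_in_merge now prefers, within one docchannel, the document with the richer extraction and breaks ties by keeping the smaller (older) docid. A condensed sketch of just that keep/drop decision (the multipack and page_time guards from the real code are left out):

def choose_duplicate(kept, candidate):
    # each argument is (docid, extract_count); returns the docid to mark as duplicate
    docid, count = kept
    n_docid, n_count = candidate
    if count > n_count:
        return n_docid                   # keep the better-extracted document
    if count < n_count:
        return docid
    return max(docid, n_docid)           # tie: keep the smaller (older) docid
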
+ 1 - 1
BaseDataMaintenance/model/ots/BaseModel.py

@@ -42,7 +42,7 @@ class BaseModel():
             if _key=="all_columns":
                 continue
             _v = self.getProperties().get(_key)
-            if _v is not None:
+            if _v is not None and _v!="":
                 if isinstance(_v,list):
                     _v = json.dumps(_v)
                 _list.append((_key,_v))

+ 11 - 1
BaseDataMaintenance/model/ots/document.py

@@ -3,6 +3,8 @@ from tablestore import *
 from BaseDataMaintenance.common.Utils import *
 from bs4 import BeautifulSoup
 
+from BaseDataMaintenance.common.Utils import article_limit
+
 document_partitionkey = "partitionkey"
 document_docid = "docid"
 document_dochtmlcon = "dochtmlcon"
@@ -65,6 +67,9 @@ document_info_source = "info_source"
 
 document_nlp_enterprise = "nlp_enterprise"
 document_nlp_enterprise_attachment = "nlp_enterprise_attachment"
+
+
+document_total_tenderee_money = "total_tenderee_money"
 class Document(BaseModel):
 
     def __init__(self,_dict):
@@ -129,17 +134,22 @@ class Document(BaseModel):
                 _filemd5 = _ht.get("filemd5","")
                 _html = _ht.get("html","")
                 _text += '<div filemd5="%s">%s</div>'%(_filemd5,_html)
+        if len(_text)>50000:
+            _soup = BeautifulSoup(_text,"lxml")
+            _soup = article_limit(_soup,50000)
+            _text = re.sub("<html>|</html>|<body>|</body>","",str(_soup))
         return _text
 
     def updateAttachment(self,list_html):
         if len(list_html)>0:
+
             _dochtmlcon = self.getProperties().get(document_dochtmlcon,"")
             _dochtmlcon = re.sub("<html>|</html>|<body>|</body>","",_dochtmlcon)
             _dochtmlcon_len = len(bytes(_dochtmlcon,encoding="utf8"))
             fix_len = self.COLUMN_MAX_SIZE-_dochtmlcon_len-100
 
             # _text = '\n<div style="display:none;" class="richTextFetch">%s</div>'%("\n".join(list_html))
-            _text = '\n<div style="display:none;" class="richTextFetch">%s</div>'%(self.getRichTextFetch(list_html))
+            _text = '<div style="display:none;" class="richTextFetch">%s</div>'%(self.getRichTextFetch(list_html))
 
 
 

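getRichTextFetch now caps very large attachment HTML before it is appended to dochtmlcon. A rough sketch of that guard, assuming article_limit from BaseDataMaintenance.common.Utils trims the parsed soup down to roughly the given size:

import re
from bs4 import BeautifulSoup
from BaseDataMaintenance.common.Utils import article_limit

def cap_rich_text(html_text, limit=50000):
    if len(html_text) <= limit:
        return html_text
    soup = BeautifulSoup(html_text, "lxml")
    soup = article_limit(soup, limit)
    return re.sub("<html>|</html>|<body>|</body>", "", str(soup))
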
+ 1 - 0
BaseDataMaintenance/model/ots/document_tmp.py

@@ -26,6 +26,7 @@ document_tmp_extract_count = "extract_count"
 document_tmp_sub_docs_json = "sub_docs_json"
 document_tmp_save = "save"
 document_tmp_dup_docid = "dup_docid"
+document_tmp_best_docid = "best_docid"
 document_tmp_merge_uuid = "merge_uuid"
 document_tmp_projects = "projects"
 document_tmp_page_time = "page_time"

+ 5 - 1
BaseDataMaintenance/model/ots/major_project.py

@@ -2,13 +2,17 @@ from BaseDataMaintenance.model.ots.BaseModel import BaseModel
 
 major_project_id = "id"
 major_project_project_name = "project_name"
+
 major_project_area = "area"
 major_project_province = "province"
 major_project_city = "city"
 major_project_district = "district"
 major_project_construction_enterprise = "construction_enterprise"
 major_project_project_leader = "project_leader"
+major_project_project_leader_phone = "project_leader_phone"
 major_project_project_overview = "project_overview"
+
+
 major_project_construction_nature = "construction_nature"
 major_project_main_engineering = "main_engineering"
 major_project_plan_start_time = "plan_start_time"
@@ -45,7 +49,7 @@ project_dynamics_win_tenderer = "win_tenderer"
 project_dynamics_win_tenderer_manager = "win_tenderer_manager"
 project_dynamics_win_tenderer_phone = "win_tenderer_phone"
 
-major_project_stages = "stages"
+major_project_project_stages = "project_stages"
 
 
 

+ 25 - 0
BaseDataMaintenance/model/ots/project_process.py

@@ -0,0 +1,25 @@
+
+from BaseDataMaintenance.model.ots.BaseModel import BaseModel
+from BaseDataMaintenance.common.Utils import *
+import json
+from BaseDataMaintenance.dataSource.source import getConnect_ots
+from tablestore import *
+
+
+PROJECT_PROCESS_UUID = "uuid"
+PROJECT_PROCESS_CRTIME = "crtime"
+PROJECT_PROCESS_PROJECTS = "projects"
+
+class Project_process(BaseModel):
+
+    def __init__(self,_dict):
+
+        for k,v in _dict.items():
+            self.setValue(k,v,True)
+        self.table_name = "project_process"
+
+    def getPrimary_keys(self):
+        return ["uuid"]
+
+
+

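A hypothetical usage of the new Project_process model; the field values are made up, update_row comes from BaseModel, and getConnect_ots/getCurrent_date from the imported utility modules:

from uuid import uuid4

record = Project_process({
    PROJECT_PROCESS_UUID: str(uuid4()),
    PROJECT_PROCESS_CRTIME: getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
    PROJECT_PROCESS_PROJECTS: json.dumps([], ensure_ascii=False),
})
ots_client = getConnect_ots()
record.update_row(ots_client)
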
+ 1 - 1
BaseDataMaintenance/start_dataflow_dumplicate.py

@@ -7,5 +7,5 @@ from BaseDataMaintenance.maintenance.dataflow import *
 
 if __name__ == '__main__':
     # flow = Dataflow()
-    flow = Dataflow_dumplicate()
+    flow = Dataflow_dumplicate(start_delete_listener=True)
     flow.start_flow_dumplicate()

+ 16 - 0
BaseDataMaintenance/start_main.py

@@ -0,0 +1,16 @@
+
+import sys
+import os
+sys.path.append(os.path.dirname(__file__)+"/..")
+import argparse
+
+def main(args=None):
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--aA",dest="attachAttachment",action="store_true",help="start attachmentAttachment process")
+    args = parser.parse_args(args)
+    if args.attachAttachment:
+        from BaseDataMaintenance.maintenance.document.attachAttachment import start_attachAttachment
+        start_attachAttachment()
+
+if __name__ == '__main__':
+    main()
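
The new entry point is invoked with the flag that selects the maintenance process, e.g. python BaseDataMaintenance/start_main.py --aA, or programmatically:

main(["--aA"])   # starts the attachAttachment maintenance process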