Browse source code

Merge remote-tracking branch 'origin/master'

# Conflicts:
#	BaseDataMaintenance/maintenance/dataflow.py
luojiehua 1 week ago
parent
commit
315a490db9

+ 344 - 8
BaseDataMaintenance/maintenance/dataflow.py

@@ -2229,7 +2229,7 @@ class Dataflow_dumplicate(Dataflow):
         return dict_time
 
 
-    def get_attrs_before_dump(self,docid,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type]):
+    def get_attrs_before_dump(self,docid,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type,"detail_link",'products']):
 
         bool_query = BoolQuery(must_queries=[
             TermQuery("docid",docid)
@@ -2491,7 +2491,7 @@ class Dataflow_dumplicate(Dataflow):
                     check_result["pass"] = 0
                 else:
                     check_result["docchannel"] = 2
-        if not check_doctitle(doctitle_refine_less,doctitle_refine_greater,project_codes_less,project_codes_greater):
+        if not check_doctitle(doctitle_refine_less,doctitle_refine_greater,project_codes_less,project_codes_greater,page_time_less,page_time_greater):
             check_result["doctitle"] = 0
             check_result["pass"] = 0
             if b_log:
@@ -2689,7 +2689,8 @@ class Dataflow_dumplicate(Dataflow):
 
         if table_name in {"document_tmp","document"}:
 
-            if page_time>=timeAdd(current_date,-7) and item.get("is_special_bonds")!=1:
+            # if page_time>=timeAdd(current_date,-7) and item.get("is_special_bonds")!=1:
+            if page_time>=timeAdd(current_date,-7) and item.get("is_special_bonds")!=1 and not get_all:
                 table_name = "document_tmp"
                 table_index = "document_tmp_index"
                 base_dict = {
@@ -2939,7 +2940,7 @@ class Dataflow_dumplicate(Dataflow):
 
         return list_rules,table_name,table_index
 
-    def producer_flow_dumplicate(self,process_count,status_from,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type,"detail_link"]):
+    def producer_flow_dumplicate(self,process_count,status_from,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type,"detail_link",'products']):
         q_size = self.queue_dumplicate.qsize()
         log("dumplicate queue size %d"%(q_size))
 
@@ -4463,7 +4464,7 @@ class Dataflow_dumplicate(Dataflow):
                 singleNum_keys = _rule["singleNum_keys"]
                 contain_keys = _rule["contain_keys"]
                 multiNum_keys = _rule["multiNum_keys"]
-                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle,document_tmp_attachment_path,document_tmp_source_stage,document_tmp_source_type,document_update_document,document_tmp_web_source_name,'detail_link'],b_log=b_log)
+                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle,document_tmp_attachment_path,document_tmp_source_stage,document_tmp_source_type,document_update_document,document_tmp_web_source_name,'detail_link','products'],b_log=b_log)
                 _i += step
 
 
@@ -4872,6 +4873,340 @@ class Dataflow_dumplicate(Dataflow):
         if res:
             log("send_daily_check_data2,sent data len: %d"%(res_json['count']))
 
+    # fix document announcements based on their project data
+    def fix_doc_by_project2(self):
+        import datetime
+        from itertools import groupby
+        from collections import Counter
+        label2key = {
+            '公告变更': 51,
+            '招标公告': 52,
+            '中标信息': 101,
+            '招标预告': 102,
+            '招标答疑': 103,
+            '招标文件': 104,
+            '资审结果': 105,
+            '法律法规': 106,
+            '新闻资讯': 107,
+            '采购意向': 114,
+            '拍卖出让': 115,
+            '土地矿产': 116,
+            '产权交易': 117,
+            '废标公告': 118,
+            '候选人公示': 119,
+            '合同公告': 120,
+            '开标记录': 121,
+            '验收合同': 122,
+            # the following channels are excluded
+            '拟在建数据': 301,
+            '审批项目数据': 302,
+            '投诉处罚': 303
+        }
+        key2label = dict((i[1], i[0]) for i in label2key.items())
+
+        today = str(datetime.date.today())
+        yesterday = str(datetime.date.today() - datetime.timedelta(days=1))
+        front_year = str(datetime.date.today() - datetime.timedelta(days=365))
+        bool_query = BoolQuery(must_queries=[RangeQuery("update_time", yesterday + " 00:00:00", today + " 00:00:00"),
+                                             RangeQuery("page_time", front_year, today),
+                                             RangeQuery("status", 201, 301),
+                                             RangeQuery("docid_number", 4, 30)]
+                               )
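+        # page through project2_index with next_token, collecting the uuid and docids of every matching project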
+        all_rows = []
+        rows, next_token, total_count, is_all_succeed = self.ots_client.search("project2", "project2_index",
+                                                                          SearchQuery(bool_query, sort=Sort(sorters=[
+                                                                              FieldSort("update_time", SortOrder.ASC)]),
+                                                                                      limit=100, get_total_count=True),
+                                                                          ColumnsToGet(['uuid', 'docids', 'update_time','docid_number'],
+                                                                                       return_type=ColumnReturnType.SPECIFIED))
+        all_rows.extend(rows)
+        while next_token:
+            rows, next_token, total_count, is_all_succeed = self.ots_client.search("project2", "project2_index",
+                                                                              SearchQuery(bool_query,
+                                                                                          next_token=next_token,
+                                                                                          sort=Sort(sorters=[
+                                                                                              FieldSort("update_time",SortOrder.ASC)]),
+                                                                                          limit=100,get_total_count=True),
+                                                                              ColumnsToGet(['uuid', 'docids', 'update_time','docid_number'],
+                                                                                  return_type=ColumnReturnType.SPECIFIED))
+            all_rows.extend(rows)
+
+        list_dict = getRow_ots(all_rows)
+        docids_list = []
+        for _dict in list_dict:
+            _uuid = _dict.get("uuid", "")
+            _docids = _dict.get("docids", "")
+            _docids = _docids.split(",")
+            for docid in _docids:
+                docids_list.append([_uuid, int(docid)])
+        # print('docids_list len:', len(docids_list))
+
+        ots_query_res = []
+        doc_columns_list = ['page_time', 'tenderee', 'tenderee_phone', 'agency', 'agency_phone', 'extract_count',
+                            "sub_docs_json",'extract_json', 'extract_json1', 'extract_json2', 'extract_json3']
+
+        def extract_json_process(res_json):
+            # parse the document's extract data
+            extract_json = res_json.pop("extract_json")
+            extract_json = extract_json if extract_json else "{}"
+            if 'extract_json1' in res_json:
+                extract_json1 = res_json.pop("extract_json1")
+                extract_json1 = extract_json1 if extract_json1 else ""
+                extract_json = extract_json + extract_json1
+            if 'extract_json2' in res_json:
+                extract_json2 = res_json.pop("extract_json2")
+                extract_json2 = extract_json2 if extract_json2 else ""
+                extract_json = extract_json + extract_json2
+            if 'extract_json3' in res_json:
+                extract_json3 = res_json.pop("extract_json3")
+                extract_json3 = extract_json3 if extract_json3 else ""
+                extract_json = extract_json + extract_json3
+            try:
+                extract_json = json.loads(extract_json)
+            except:
+                return None
+
+            docchannel_dict = extract_json.get('docchannel', {})
+            res_json['docchannel'] = docchannel_dict.get('docchannel', "")
+            res_json['life_docchannel'] = docchannel_dict.get('life_docchannel', "")
+
+            district_dict = extract_json.get('district', {})
+            res_json['province'] = district_dict.get('province', "")
+            res_json['city'] = district_dict.get('city', "")
+            res_json['district'] = district_dict.get('district', "")
+            res_json['area'] = district_dict.get('area', "")
+
+            prem = extract_json.get('prem', {})
+            res_json['prem'] = prem
+
+            return res_json
+
+        def _handle(item, _):
+            # query the document and parse its extract data
+            _uuid = item[0]  # project uuid
+            _docid = item[1]
+            for i in range(3):
+                try:
+                    bool_query = BoolQuery(must_queries=[TermQuery('docid', _docid)]
+                                           )
+                    rows, next_token, total_count, is_all_succeed = self.ots_client.search("document", "document_index",
+                                                                                      SearchQuery(bool_query,
+                                                                                                  sort=Sort(sorters=[FieldSort("page_time",SortOrder.ASC)]),
+                                                                                                  limit=None,get_total_count=True),
+                                                                                      ColumnsToGet(doc_columns_list,
+                                                                                                   return_type=ColumnReturnType.SPECIFIED))
+                    res = getRow_ots(rows)
+                    if res:
+                        # use extract_count to filter out announcements with low relevance
+                        if res[0].get('extract_count', 0) > 5:
+                            ots_query_res.append([_uuid, _docid, extract_json_process(res[0])])
+                        break
+                except Exception as e:
+                    # print('error:',e)
+                    pass
+
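+        # fetch the documents concurrently: flush the queue with 20 worker threads every 10000 docids, then once more at the end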
+        task_queue = Queue()
+        for item in docids_list:
+            task_queue.put(item)
+            if task_queue.qsize() >= 10000:
+                _mt = MultiThreadHandler(task_queue, _handle, None, 20)
+                _mt.run()
+        if task_queue.qsize() > 0:
+            _mt = MultiThreadHandler(task_queue, _handle, None, 20)
+            _mt.run()
+
+        # print('ots_query_res len:', len(ots_query_res))
+
+        # process the repair data
+        ots_query_res.sort(key=lambda x: x[0])
+        # tendering-related channels
+        zb_type = [51, 52, 101, 102, 103, 104, 105, 114, 118, 119, 120, 121, 122]
+        zb_type = [key2label[i] for i in zb_type]
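+        # zb_type now holds channel label strings, matching the docchannel values parsed from extract_json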
+
+        change_res = []
+        for key, group in groupby(ots_query_res, lambda x: (x[0])):
+            uuid = key
+            project_data = list(group)
+            all_len = len(project_data)
+            if all_len < 4:
+                continue
+            zb_len = sum([1 if i[2].get('docchannel') in zb_type else 0 for i in project_data])
+            # share of tendering-type announcements in the project
+            # if zb_len / all_len <= 0.5:
+            if zb_len / all_len <= 0.7:
+                # the project is not tendering-related
+                continue
+            # most frequent province within the project
+            province_list = [i[2].get('province', '') for i in project_data]
+            province_sort = Counter(province_list).most_common()
+            change_province = ""
+            change_city = ""
+            change_district = ""
+            change_area = ""
+            # if province_sort[0][1]/all_len > 0.5:
+            if province_sort[0][1] / all_len > 0.7:
+                if province_sort[0][0] and province_sort[0][0] not in ["全国", "未知"]:
+                    change_province = province_sort[0][0]
+            if change_province:
+                # only fill down to city level; district falls back to "未知" (unknown)
+                change_province_data = [(i[2].get('province', ''), i[2].get('city', ''), i[2].get('area', '')) for i in
+                                        project_data if i[2].get('province', '') == change_province]
+                change_province_data_sort = Counter(change_province_data).most_common()
+                change_city = change_province_data_sort[0][0][1]
+                change_area = change_province_data_sort[0][0][2]
+                change_district = "未知"
+
+            # contact information statistics
+            phone_dict = {}
+            for d in project_data:
+                tenderee = d[2].get("tenderee", "")
+                agency = d[2].get("agency", "")
+                prem = d[2].get("prem", {})
+
+                if len(prem) > 0:
+                    for name, project in prem.items():
+                        roleList = project.get("roleList", [])
+                        for role in roleList:
+                            role_name = role.get("role_name", "")
+                            role_text = role.get("role_text", "")
+                            if role_name in ['tenderee', 'agency', 'win_tenderer']:
+                                linklist = role.get("linklist", [])
+                                for _contact in linklist:
+                                    if _contact[1] not in phone_dict:
+                                        phone_dict[_contact[1]] = {}
+                                    if role_text not in phone_dict[_contact[1]]:
+                                        phone_dict[_contact[1]][role_text] = 0
+                                    phone_dict[_contact[1]][role_text] += 1
+            # for each phone, keep the entity name(s) it is most frequently associated with
+            new_phone_dict = dict((phone, []) for phone in phone_dict)
+            for phone, value in phone_dict.items():
+                phone_name = [(name, count) for name, count in value.items()]
+                phone_name.sort(key=lambda x: x[1], reverse=True)
+                max_count = phone_name[0][1]
+                max_name = [name for name, count in value.items() if count == max_count and max_count > 0]
+                new_phone_dict[phone] = max_name
+
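+            # a contact whose phone is mostly associated with a different entity is treated as wrongly attached and queued in contactsByDelete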
+            for item in project_data:
+                change_json = {"partitionkey": item[2].get("partitionkey"),
+                               'docid': item[1],
+                               'contactsByDelete': []}
+                tenderee = item[2].get("tenderee", "")
+                agency = item[2].get("agency", "")
+                # docchannel fix
+                docchannel = item[2].get('docchannel', "")
+                life_docchannel = item[2].get('life_docchannel', "")
+                if docchannel and docchannel not in zb_type:
+                    if life_docchannel in zb_type and docchannel != '采招数据':
+                        change_json['docchannel'] = label2key.get(life_docchannel)
+                # province fix
+                province = item[2].get('province', "")
+                if change_province:
+                    if province != change_province and province in ["全国", "未知", '']:  # only fix when the province was not identified
+                        change_json['province'] = change_province
+                        change_json['city'] = change_city
+                        change_json['district'] = change_district
+                        change_json['area'] = change_area
+
+                # contact information fix
+                tenderee_phone = item[2].get("tenderee_phone", "")
+                agency_phone = item[2].get("agency_phone", "")
+                prem = item[2].get("prem", {})
+                sub_docs_json = item[2].get("sub_docs_json", "[]")
+                try:
+                    sub_docs_json = json.loads(sub_docs_json)
+                except:
+                    sub_docs_json = []
+                for name, project in prem.items():
+                    roleList = project.get("roleList", [])
+                    for role in roleList:
+                        role_name = role.get("role_name", "")
+                        role_text = role.get("role_text", "")
+                        if role_name == 'tenderee' and role_text == tenderee:
+                            linklist = role.get("linklist", [])
+                            need_change = False
+                            right_contact = []
+                            for _contact in linklist:
+                                if _contact[1] and new_phone_dict.get(_contact[1]) and role_text not in new_phone_dict[_contact[1]]:
+                                    change_json['contactsByDelete'].append({"enterpriseName": role_text, "phoneNo": _contact[1]})
+                                    if _contact[1] == tenderee_phone:
+                                        need_change = True
+                                else:
+                                    right_contact.append([_contact[0], _contact[1]])
+                            if need_change:
+                                if right_contact:
+                                    right_contact.sort(reverse=True)
+                                    change_json['tendereeContact'] = right_contact[0][0]
+                                    change_json['tendereePhone'] = right_contact[0][1]
+                        elif role_name == 'agency' and role_text == agency:
+                            linklist = role.get("linklist", [])
+                            need_change = False
+                            right_contact = []
+                            for _contact in linklist:
+                                if _contact[1] and new_phone_dict.get(_contact[1]) and role_text not in new_phone_dict[_contact[1]]:
+                                    change_json['contactsByDelete'].append({"enterpriseName": role_text, "phoneNo": _contact[1]})
+                                    if _contact[1] == agency_phone:
+                                        need_change = True
+                                else:
+                                    right_contact.append([_contact[0], _contact[1]])
+                            if need_change:
+                                if right_contact:
+                                    right_contact.sort(reverse=True)
+                                    change_json['agencyContact'] = right_contact[0][0]
+                                    change_json['agencyPhone'] = right_contact[0][1]
+                        elif role_name == 'win_tenderer':
+                            linklist = role.get("linklist", [])
+                            for _contact in linklist:
+                                if _contact[1] and new_phone_dict.get(_contact[1]) and role_text not in new_phone_dict[_contact[1]]:
+                                    change_json['contactsByDelete'].append({"enterpriseName": role_text, "phoneNo": _contact[1]})
+
+                sub_docs_json_change = False
+                if sub_docs_json:
+                    for _project in sub_docs_json:
+                        win_tenderer = _project.get("win_tenderer", "")
+                        win_tenderer_phone = _project.get("win_tenderer_phone", "")
+                        if win_tenderer_phone and new_phone_dict.get(win_tenderer_phone) and win_tenderer not in new_phone_dict[win_tenderer_phone]:
+                            _project["win_tenderer_phone"] = ""
+                            _project["win_tenderer_manager"] = ""
+                            sub_docs_json_change = True
+                if sub_docs_json_change:
+                    change_json['subDocsJson'] = sub_docs_json
+
+                new_contact_json = []
+                for _contact in change_json['contactsByDelete']:
+                    if _contact not in new_contact_json:
+                        new_contact_json.append(_contact)
+                change_json['contactsByDelete'] = new_contact_json
+                if len(change_json) > 3 or len(change_json['contactsByDelete']) > 0:
+                    # when the region was not changed, pass along the originally extracted region
+                    if not change_json.get("province"):
+                        change_json['area'] = item[2].get("area", "")
+                        change_json['province'] = item[2].get("province", "")
+                        change_json['city'] = item[2].get("city", "")
+                        change_json['district'] = item[2].get("district", "")
+                    change_res.append({"document": change_json})
+
+        # post result
+        headers = {'Content-Type': 'application/json',
+                   "Authorization": "Bearer eyJhbGciOiJIUzUxMiJ9.eyJ1c2VySWQiOjEsInVzZXJuYW1lIjoiYWRtaW4iLCJ1dWlkIjoiNGQwYzA0ODYtMzVmZi00MDJhLTk4OWQtNWEwNTE3YTljMDNiIiwic3ViIjoiMSIsImlhdCI6MTY3OTk5MTcxNywiZXhwIjo0ODMzNTkxNzE3fQ.ESDDnEDYP5ioK4ouHOYXsZbLayGRNVI9ugpbxDx_3fPIceD1KIjlDeopBmeATLoz8VYQihd8qO-UzP5pDsaUmQ"}
+        # url = "http://192.168.2.26:8002/document/updateAreaAndContact"
+        url = "http://data-api.bidizhaobiao.com/document/updateAreaAndContact"
+        for _data in change_res:
+            post_success = False
+            for i in range(3):
+                if not post_success:
+                    try:
+                        # send the POST request with the JSON payload
+                        response = requests.post(url, json=_data,headers=headers)
+                        # print(response.status_code,response.json())
+                        # check the response status code
+                        if response.status_code == 200:
+                            post_success = True
+                    except requests.exceptions.RequestException as e:
+                        # log("fix doc by project2,post error reason: %s"%(str(e)))
+                        pass
+
+        log("fix doc by project2, change doc nums:%d"%len(change_res))
+
 
     def start_flow_dumplicate(self):
         schedule = BlockingScheduler()
@@ -4881,6 +5216,7 @@ class Dataflow_dumplicate(Dataflow):
         schedule.add_job(self.flow_remove,"cron",hour="20")
         schedule.add_job(self.send_daily_check_data,"cron",hour='9', minute='10')
         schedule.add_job(self.send_daily_check_data2,"cron",hour='9', minute='10')
+        schedule.add_job(self.fix_doc_by_project2,"cron",hour='8', minute='10')
         schedule.add_job(self.flow_remove_project_tmp,"cron",hour="20")
         schedule.add_job(self.fix_doc_which_not_in_project,"cron",minute="*/10")
         schedule.start()
@@ -4913,13 +5249,13 @@ class Dataflow_dumplicate(Dataflow):
 
     def test_dumplicate(self,docid):
         # columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status]
-        columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type,'detail_link']
+        columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type,'detail_link','products']
         # print('columns',columns)
         item = self.get_attrs_before_dump(docid,columns)
 
         if item:
             log("start dumplicate_comsumer_handle")
-            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=True,upgrade=False)
+            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=False,upgrade=False)
             return
 
     def test_merge(self,list_docid_less,list_docid_greater):
@@ -5158,7 +5494,7 @@ if __name__ == '__main__':
     # test_attachment_interface()
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
-    df_dump.test_dumplicate(628170641
+    df_dump.test_dumplicate(626076001
                             )
     # df_dump.dumplicate_comsumer_handle_interface(603504420,document_table="document_0000",document_table_index="document_0000_index",project_table="project_0000",project_table_index="project_0000_index_formerge")
     # compare_dumplicate_check()

+ 106 - 30
BaseDataMaintenance/maxcompute/documentDumplicate.py

@@ -1002,12 +1002,12 @@ def check_demand():
 package_number_pattern = re.compile("(?P<name>(((([^承]|^)包|标[段号的包]|分?包|包组|包件)编?号?|子项目|项目类型|项目)[::]?[0-9A-Za-z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦ]{1,4}[^\.]?)[^至]?|((?![\.])第?[ⅠⅡⅢⅣⅤⅥⅦ0-9A-Za-z一二三四五六七八九十]{1,4}(包号|标[段号的包]|分?包)))")  # 第? 去掉问号 修复 纯木浆8包/箱复印 这种作为包号
 code_pattern = re.compile("[A-Za-z0-9\-\(\)()【】\.-]+")
 num_pattern = re.compile("^\d+(?:\.\d+)?$")
-num1_pattern = re.compile("[一二三四五六七八九A-Za-z]+")
+num1_pattern = re.compile("[一二三四五六七八九A-Za-z]+")
 location_pattern = re.compile("[^\[【\(]{1,2}[市区镇县村路]")
-building_pattern = "工程招标代理|工程设计|暂停|继续|工程造价咨询|施工图设计文件审查|咨询|环评|设计|施工监理|施工|监理|EPC|epc|总承包|水土保持|选址论证|勘界|勘察|预算编制|预算审核|结算审计|招标代理|设备类|第?[\((]?[一二三四五六七八九1-9][)\)]?[次批]"
+building_pattern = "工程招标代理|工程设计|暂停|继续|工程造价咨询|施工图设计文件审查|咨询|环评|设计|施工监理|施工|监理|EPC|epc|总承包|水土保持|选址论证|勘界|勘察|预算编制|预算审核|结算审计|招标代理|设备类|第?[\((]?[一二三四五六七八九1-9]+[)\)]?[次批]"
 rebid_pattern = "再次|重新招标|[一二三四五六七八九十]+次"
 date_pattern = re.compile("\d{2,4}[\-\./年]\d{1,2}[\-\./月]\d{1,2}")
-def check_doctitle(doctitle_refind_less, doctitle_refind_greater, codes_less=[], code_greater=[]):
+def check_doctitle(doctitle_refind_less, doctitle_refind_greater, codes_less=[], code_greater=[],page_time_less="",page_time_greater=""):
     if code_greater is None:
         code_greater = []
     doctitle_refind_less = str(doctitle_refind_less).replace("(","(").replace(")",")")
@@ -1027,8 +1027,9 @@ def check_doctitle(doctitle_refind_less, doctitle_refind_greater, codes_less=[],
     _pack1 = None
     _pack2 = None
     #if contain then pass
-    if doctitle_refind_less.find(doctitle_refind_greater)>=0 or doctitle_refind_greater.find(doctitle_refind_less)>=0:
-        return True
+    if page_time_less and page_time_less == page_time_greater:
+        if doctitle_refind_less.find(doctitle_refind_greater)>=0 or doctitle_refind_greater.find(doctitle_refind_less)>=0:
+            return True
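+    # the containment shortcut above only applies when both documents share the same page_time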
     #check the package in title
 
     _match = re.search(package_number_pattern,doctitle_refind_less)
@@ -1067,7 +1068,16 @@ def check_doctitle(doctitle_refind_less, doctitle_refind_greater, codes_less=[],
                 return False
 
     #check location and keywords
-    for _p in [num1_pattern,building_pattern,rebid_pattern]:
+    for _p in [num1_pattern,building_pattern]:
+        num_all_l = re.findall(_p,doctitle_refind_less)
+        num_all_g = re.findall(_p,doctitle_refind_greater)
+        set_num_l = set(num_all_l)
+        set_num_g = set(num_all_g)
+        if len(set_num_l)==len(set_num_g):
+            if len(set_num_l&set_num_g)!=len(set_num_l):
+                return False
+    # keywords for re-tendering (repeated rounds)
+    for _p in [rebid_pattern]:
         num_all_l = re.findall(_p,doctitle_refind_less)
         num_all_g = re.findall(_p,doctitle_refind_greater)
         set_num_l = set(num_all_l)
@@ -1075,6 +1085,8 @@ def check_doctitle(doctitle_refind_less, doctitle_refind_greater, codes_less=[],
         if len(set_num_l)==len(set_num_g):
             if len(set_num_l&set_num_g)!=len(set_num_l):
                 return False
+        elif (len(set_num_l) and not len(set_num_g)) or (len(set_num_g) and not len(set_num_l)):
+            return False
 
     #check the location has conflict
     for _p in [location_pattern]:
@@ -1164,7 +1176,8 @@ def check_product(product_less,product_greater,split_char=",",doctitle_refine_le
             #         return False
         for _l in _product_l:
             for _g in _product_g:
-                if getSimilarityOfString(_l,_g)>=0.8 or doctitle_refine_greater.find(_l)>=0 or doctitle_refine_less.find(_g)>=0:
+                # if getSimilarityOfString(_l,_g)>=0.8 or doctitle_refine_greater.find(_l)>=0 or doctitle_refine_less.find(_g)>=0:
+                if getSimilarityOfString(_l,_g)>=0.8 or doctitle_refine_greater.find(_l)>=0:
                     same_count += 1
                     break
         if same_count/len(_product_l)>=0.5:
@@ -1216,6 +1229,40 @@ def check_time(json_time_less,json_time_greater):
         return 0
     return 1
 
+def check_products(products_less,products_greater):
+    if not isinstance(products_less, list):
+        products_less = json.loads(products_less) if products_less else []
+    if not isinstance(products_greater, list):
+        products_greater = json.loads(products_greater) if products_greater else []
+    # if len(products_less)>0 and len(products_greater)>0:
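+    # only compared when both documents list at least 4 products; after dedup, if fewer than half of the shorter list finds a >=0.8-similarity match in the other list, the documents are treated as different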
+    if len(products_less)>=4 and len(products_greater)>=4:
+        products_less_list = [p['product'] for p in products_less]
+        products_less_list = product_dump(products_less_list)
+        products_greater_list = [p['product'] for p in products_greater]
+        products_greater_list = product_dump(products_greater_list)
+        if len(products_less_list)>len(products_greater_list):
+            a = products_greater_list
+            products_greater_list = products_less_list
+            products_less_list = a
+        # print('products_less_list',products_less_list)
+        # print('products_greater_list',products_greater_list)
+        same_count = 0
+        for _l in products_less_list:
+            for _g in products_greater_list:
+                if getSimilarityOfString(_l,_g)>=0.8:
+                    same_count += 1
+                    break
+        if same_count/len(products_less_list)<0.5:
+            # print('check_products false')
+            return False
+
+    return True
+
+
 def check_dumplicate_rule(document_less,document_greater,min_counts,b_log=False,hard_level=1):
 
     docid_less = document_less["docid"]
@@ -1236,6 +1283,7 @@ def check_dumplicate_rule(document_less,document_greater,min_counts,b_log=False,
     fingerprint_less = document_less.get("fingerprint")
     extract_count_less = document_less.get("extract_count",0)
     web_source_no_less = document_less.get("web_source_no")
+    web_source_name_less = document_less.get("web_source_name")
     province_less = document_less.get("province")
     city_less = document_less.get("city")
     district_less = document_less.get("district")
@@ -1247,6 +1295,7 @@ def check_dumplicate_rule(document_less,document_greater,min_counts,b_log=False,
     source_type_less = document_less.get("source_type")
     detail_link_less = document_less.get("detail_link")
     is_special_bonds_less = document_less.get("is_special_bonds")
+    products_less = document_less.get("products")
 
 
     docid_greater = document_greater["docid"]
@@ -1267,11 +1316,13 @@ def check_dumplicate_rule(document_less,document_greater,min_counts,b_log=False,
     fingerprint_greater = document_greater.get("fingerprint")
     extract_count_greater = document_greater.get("extract_count",0)
     web_source_no_greater = document_greater.get("web_source_no")
+    web_source_name_greater = document_greater.get("web_source_name")
     province_greater = document_greater.get("province")
     city_greater = document_greater.get("city")
     district_greater = document_greater.get("district")
     detail_link_greater = document_greater.get("detail_link")
     is_special_bonds_greater = document_greater.get("is_special_bonds")
+    products_greater = document_greater.get("products")
 
     moneys_greater = document_greater.get("moneys")
     moneys_attachment_greater = document_greater.get("moneys_attachment")
@@ -1281,10 +1332,54 @@ def check_dumplicate_rule(document_less,document_greater,min_counts,b_log=False,
     approval_greater = document_greater.get("approval",[])
     source_type_greater = document_greater.get("source_type")
 
-
+    # print('docid:',docid_less,docid_greater)
     if fingerprint_less==fingerprint_greater and getLength(fingerprint_less)>0:
+        # print('fingerprint same')
         return 1
 
+    # special-purpose bond deduplication
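+    # e.g. detail_links ending in bondId=101,102 and bondId=101 share a bond-id subset and are treated as duplicates (hypothetical ids)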
+    if is_special_bonds_greater==is_special_bonds_less==1:
+        detail_link_less = detail_link_less.strip() if detail_link_less else ""
+        detail_link_greater = detail_link_greater.strip() if detail_link_greater else ""
+        if "bondId=" in detail_link_less:
+            bondId_less = detail_link_less.split("bondId=")[1]
+            bondId_less = bondId_less.split(",") if bondId_less else []
+        else:
+            bondId_less = []
+        if "bondId=" in detail_link_greater:
+            bondId_greater = detail_link_greater.split("bondId=")[1]
+            bondId_greater = bondId_greater.split(",") if bondId_greater else []
+        else:
+            bondId_greater = []
+        # print('bondId_less',bondId_less)
+        # print('bondId_greater',bondId_greater)
+        if bondId_less and bondId_greater:
+            bondId_less = set(bondId_less)
+            bondId_greater = set(bondId_greater)
+            if bondId_less.issubset(bondId_greater) or bondId_greater.issubset(bondId_less):
+                return 1
+
+    # when the web source is the same, do not dedup unless fingerprint or detail_link are identical
+    if web_source_no_less==web_source_no_greater and getLength(web_source_no_less)>0:
+        if getLength(detail_link_less)>0 and getLength(detail_link_greater)>0:
+            if detail_link_less != detail_link_greater:
+                # print('站源相同时,detail_link不一样,直接不去重')
+                return 0
+            else: # the links are identical: check whether the link is a homepage or a list page
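+                # e.g. a bare domain like http://example.com or a path ending in index.html / list.do is treated as a homepage or list page and not deduped (hypothetical URLs)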
+                detail_link_split_less = re.sub("https?://","",detail_link_less.strip())
+                detail_link_split_less = re.split("/",detail_link_split_less)
+                detail_link_split_less = [i for i in detail_link_split_less if i]
+                if len(detail_link_split_less)==1: # the link is only the source's homepage domain
+                    # print('站源相同时,detail_link一样,链接为站源主页域名')
+                    return 0
+                elif re.search("(index|list)(\.html?|\.do)?$",detail_link_split_less[-1],re.I): # the link is a list page
+                    # print('站源相同时,detail_link一样,链接为列表页')
+                    return 0
+
+    # compare the procurement product lists
+    if getLength(products_less)>0 and getLength(products_greater)>0:
+        if not check_products(products_less,products_greater):
+            return 0
 
     #一篇要素都在附件,且两篇附件md5有重叠
     set_md5_less = set()
@@ -1386,27 +1481,6 @@ def check_dumplicate_rule(document_less,document_greater,min_counts,b_log=False,
             if demand_info_greater and len(demand_info_greater)==len(demand_info_less):# demand_info完全相同
                 return 1
 
-    # 专项债去重
-    if is_special_bonds_greater==is_special_bonds_less==1:
-        detail_link_less = detail_link_less.strip() if detail_link_less else ""
-        detail_link_greater = detail_link_greater.strip() if detail_link_greater else ""
-        if "bondId=" in detail_link_less:
-            bondId_less = detail_link_less.split("bondId=")[1]
-            bondId_less = bondId_less.split(",") if bondId_less else []
-        else:
-            bondId_less = []
-        if "bondId=" in detail_link_greater:
-            bondId_greater = detail_link_greater.split("bondId=")[1]
-            bondId_greater = bondId_greater.split(",") if bondId_greater else []
-        else:
-            bondId_greater = []
-        # print('bondId_less',bondId_less)
-        # print('bondId_greater',bondId_greater)
-        if bondId_less and bondId_greater:
-            bondId_less = set(bondId_less)
-            bondId_greater = set(bondId_greater)
-            if bondId_less.issubset(bondId_greater) or bondId_greater.issubset(bondId_less):
-                return 1
 
     same_count = 0
     all_count = 8
@@ -1463,6 +1537,7 @@ def check_dumplicate_rule(document_less,document_greater,min_counts,b_log=False,
     else:
         base_prob = 0.6
     _prob = base_prob*same_count/all_count
+    # print('base_prob',base_prob,'min_counts',min_counts,'same_count',same_count,'all_count',all_count)
     if min(extract_count_less,extract_count_greater)<=3 and max(extract_count_less,extract_count_greater)>=5:
         if _prob<0.1 and str(page_time_less)==str(page_time_greater):
             if str(docchannel_less) not in ("302","303"):
@@ -1484,7 +1559,7 @@ def check_dumplicate_rule(document_less,document_greater,min_counts,b_log=False,
                 check_result["pass"] = 0
             else:
                 check_result["docchannel"] = 2
-    if not check_doctitle(doctitle_refine_less,doctitle_refine_greater,project_codes_less,project_codes_greater):
+    if not check_doctitle(doctitle_refine_less,doctitle_refine_greater,project_codes_less,project_codes_greater,page_time_less,page_time_greater):
         check_result["doctitle"] = 0
         check_result["pass"] = 0
         if b_log:
@@ -1596,6 +1671,7 @@ def check_dumplicate_rule(document_less,document_greater,min_counts,b_log=False,
         if b_log:
             logging.info("hard_level %s and check_product less than 2"%(str(hard_level)))
         return 0
+    # print('check_result',check_result,'_prob',_prob)
     if check_result.get("pass",0)==0:
         if b_log:
             logging.info(str(check_result))