Bläddra i källkod

Merge remote-tracking branch 'origin/master'

luojiehua 2 veckor sedan
förälder
incheckning
26204fd286
1 ändrade filer med 335 tillägg och 0 borttagningar
  1. 335 0
      BaseDataMaintenance/maintenance/dataflow.py

+ 335 - 0
BaseDataMaintenance/maintenance/dataflow.py

@@ -4872,6 +4872,340 @@ class Dataflow_dumplicate(Dataflow):
         if res:
             log("send_daily_check_data2,sent data len: %d"%(res_json['count']))
 
+    # 根据项目修复doc公告数据
+    def fix_doc_by_project2(self):
+        import datetime
+        from itertools import groupby
+        from collections import Counter
+        label2key = {
+            '公告变更': 51,
+            '招标公告': 52,
+            '中标信息': 101,
+            '招标预告': 102,
+            '招标答疑': 103,
+            '招标文件': 104,
+            '资审结果': 105,
+            '法律法规': 106,
+            '新闻资讯': 107,
+            '采购意向': 114,
+            '拍卖出让': 115,
+            '土地矿产': 116,
+            '产权交易': 117,
+            '废标公告': 118,
+            '候选人公示': 119,
+            '合同公告': 120,
+            '开标记录': 121,
+            '验收合同': 122,
+            # 以下排除
+            '拟在建数据': 301,
+            '审批项目数据': 302,
+            '投诉处罚': 303
+        }
+        key2label = dict((i[1], i[0]) for i in label2key.items())
+
+        today = str(datetime.date.today())
+        yesterday = str(datetime.date.today() - datetime.timedelta(days=1))
+        front_year = str(datetime.date.today() - datetime.timedelta(days=365))
+        bool_query = BoolQuery(must_queries=[RangeQuery("update_time", yesterday + " 00:00:00", today + " 00:00:00"),
+                                             RangeQuery("page_time", front_year, today),
+                                             RangeQuery("status", 201, 301),
+                                             RangeQuery("docid_number", 4, 30)]
+                               )
+        all_rows = []
+        rows, next_token, total_count, is_all_succeed = self.ots_client.search("project2", "project2_index",
+                                                                          SearchQuery(bool_query, sort=Sort(sorters=[
+                                                                              FieldSort("update_time", SortOrder.ASC)]),
+                                                                                      limit=100, get_total_count=True),
+                                                                          ColumnsToGet(['uuid', 'docids', 'update_time','docid_number'],
+                                                                                       return_type=ColumnReturnType.SPECIFIED))
+        all_rows.extend(rows)
+        while next_token:
+            rows, next_token, total_count, is_all_succeed = self.ots_client.search("project2", "project2_index",
+                                                                              SearchQuery(bool_query,
+                                                                                          next_token=next_token,
+                                                                                          sort=Sort(sorters=[
+                                                                                              FieldSort("update_time",SortOrder.ASC)]),
+                                                                                          limit=100,get_total_count=True),
+                                                                              ColumnsToGet(['uuid', 'docids', 'update_time','docid_number'],
+                                                                                  return_type=ColumnReturnType.SPECIFIED))
+            all_rows.extend(rows)
+
+        list_dict = getRow_ots(all_rows)
+        docids_list = []
+        for _dict in list_dict:
+            _uuid = _dict.get("uuid", "")
+            _docids = _dict.get("docids", "")
+            _docids = _docids.split(",")
+            for docid in _docids:
+                docids_list.append([_uuid, int(docid)])
+        # print('docids_list len:', len(docids_list))
+
+        ots_query_res = []
+        doc_columns_list = ['page_time', 'tenderee', 'tenderee_phone', 'agency', 'agency_phone', 'extract_count',
+                            "sub_docs_json",'extract_json', 'extract_json1', 'extract_json2', 'extract_json3']
+
+        def extract_json_process(res_json):
+            # 解析document数据
+            extract_json = res_json.pop("extract_json")
+            extract_json = extract_json if extract_json else "{}"
+            if 'extract_json1' in res_json:
+                extract_json1 = res_json.pop("extract_json1")
+                extract_json1 = extract_json1 if extract_json1 else ""
+                extract_json = extract_json + extract_json1
+            if 'extract_json2' in res_json:
+                extract_json2 = res_json.pop("extract_json2")
+                extract_json2 = extract_json2 if extract_json2 else ""
+                extract_json = extract_json + extract_json2
+            if 'extract_json3' in res_json:
+                extract_json3 = res_json.pop("extract_json3")
+                extract_json3 = extract_json3 if extract_json3 else ""
+                extract_json = extract_json + extract_json3
+            try:
+                extract_json = json.loads(extract_json)
+            except:
+                return None
+
+            docchannel_dict = extract_json.get('docchannel', {})
+            res_json['docchannel'] = docchannel_dict.get('docchannel', "")
+            res_json['life_docchannel'] = docchannel_dict.get('life_docchannel', "")
+
+            district_dict = extract_json.get('district', {})
+            res_json['province'] = district_dict.get('province', "")
+            res_json['city'] = district_dict.get('city', "")
+            res_json['district'] = district_dict.get('district', "")
+            res_json['area'] = district_dict.get('area', "")
+
+            prem = extract_json.get('prem', {})
+            res_json['prem'] = prem
+
+            return res_json
+
+        def _handle(item, _):
+            # 查询解析document数据
+            _uuid = item[0]  # project uuid
+            _docid = item[1]
+            for i in range(3):
+                try:
+                    bool_query = BoolQuery(must_queries=[TermQuery('docid', _docid)]
+                                           )
+                    rows, next_token, total_count, is_all_succeed = self.ots_client.search("document", "document_index",
+                                                                                      SearchQuery(bool_query,
+                                                                                                  sort=Sort(sorters=[FieldSort("page_time",SortOrder.ASC)]),
+                                                                                                  limit=None,get_total_count=True),
+                                                                                      ColumnsToGet(doc_columns_list,
+                                                                                                   return_type=ColumnReturnType.SPECIFIED))
+                    res = getRow_ots(rows)
+                    if res:
+                        # 通过extract_count过滤掉相关性不大的公告
+                        if res[0].get('extract_count', 0) > 5:
+                            ots_query_res.append([_uuid, _docid, extract_json_process(res[0])])
+                        break
+                except Exception as e:
+                    # print('error:',e)
+                    pass
+
+        task_queue = Queue()
+        for item in docids_list:
+            task_queue.put(item)
+            if task_queue.qsize() >= 10000:
+                _mt = MultiThreadHandler(task_queue, _handle, None, 20)
+                _mt.run()
+        if task_queue.qsize() >= 0:
+            _mt = MultiThreadHandler(task_queue, _handle, None, 20)
+            _mt.run()
+
+        # print('ots_query_res len:', len(ots_query_res))
+
+        # 处理修复数据
+        ots_query_res.sort(key=lambda x: x[0])
+        # 招标类别
+        zb_type = [51, 52, 101, 102, 103, 104, 105, 114, 118, 119, 120, 121, 122]
+        zb_type = [key2label[i] for i in zb_type]
+
+        change_res = []
+        for key, group in groupby(ots_query_res, lambda x: (x[0])):
+            uuid = key
+            project_data = list(group)
+            all_len = len(project_data)
+            if all_len < 4:
+                continue
+            zb_len = sum([1 if i[2].get('docchannel') in zb_type else 0 for i in project_data])
+            # 招标类公告占比
+            # if zb_len / all_len <= 0.5:
+            if zb_len / all_len <= 0.7:
+                # 项目不是招标相关项目
+                continue
+            # 项目里最多的省份
+            province_list = [i[2].get('province', '') for i in project_data]
+            province_sort = Counter(province_list).most_common()
+            change_province = ""
+            change_city = ""
+            change_district = ""
+            change_area = ""
+            # if province_sort[0][1]/all_len > 0.5:
+            if province_sort[0][1] / all_len > 0.7:
+                if province_sort[0][0] and province_sort[0][0] not in ["全国", "未知"]:
+                    change_province = province_sort[0][0]
+            if change_province:
+                # 只替换到city,district 取"未知"
+                change_province_data = [(i[2].get('province', ''), i[2].get('city', ''), i[2].get('area', '')) for i in
+                                        project_data if i[2].get('province', '') == change_province]
+                change_province_data_sort = Counter(change_province_data).most_common()
+                change_city = change_province_data_sort[0][0][1]
+                change_area = change_province_data_sort[0][0][2]
+                change_district = "未知"
+
+            # 联系方式统计
+            phone_dict = {}
+            for d in project_data:
+                tenderee = d[2].get("tenderee", "")
+                agency = d[2].get("agency", "")
+                prem = d[2].get("prem", {})
+
+                if len(prem) > 0:
+                    for name, project in prem.items():
+                        roleList = project.get("roleList", [])
+                        for role in roleList:
+                            role_name = role.get("role_name", "")
+                            role_text = role.get("role_text", "")
+                            if role_name in ['tenderee', 'agency', 'win_tenderer']:
+                                linklist = role.get("linklist", [])
+                                for _contact in linklist:
+                                    if _contact[1] not in phone_dict:
+                                        phone_dict[_contact[1]] = {}
+                                    if role_text not in phone_dict[_contact[1]]:
+                                        phone_dict[_contact[1]][role_text] = 0
+                                    phone_dict[_contact[1]][role_text] += 1
+            # 汇总电话对应的实体
+            new_phone_dict = dict((phone, []) for phone in phone_dict)
+            for phone, value in phone_dict.items():
+                phone_name = [(name, count) for name, count in value.items()]
+                phone_name.sort(key=lambda x: x[1], reverse=True)
+                max_count = phone_name[0][1]
+                max_name = [name for name, count in value.items() if count == max_count and max_count > 0]
+                new_phone_dict[phone] = max_name
+
+            for item in project_data:
+                change_json = {"partitionkey": item[2].get("partitionkey"),
+                               'docid': item[1],
+                               'contactsByDelete': []}
+                tenderee = item[2].get("tenderee", "")
+                agency = item[2].get("agency", "")
+                # docchannel修复
+                docchannel = item[2].get('docchannel', "")
+                life_docchannel = item[2].get('life_docchannel', "")
+                if docchannel and docchannel not in zb_type:
+                    if life_docchannel in zb_type and docchannel != '采招数据':
+                        change_json['docchannel'] = label2key.get(life_docchannel)
+                # province修复
+                province = item[2].get('province', "")
+                if change_province:
+                    if province != change_province and province in ["全国", "未知", '']:  # province未识别时才修复
+                        change_json['province'] = change_province
+                        change_json['city'] = change_city
+                        change_json['district'] = change_district
+                        change_json['area'] = change_area
+
+                # 联系方式修复
+                tenderee_phone = item[2].get("tenderee_phone", "")
+                agency_phone = item[2].get("agency_phone", "")
+                prem = item[2].get("prem", {})
+                sub_docs_json = item[2].get("sub_docs_json", "[]")
+                try:
+                    sub_docs_json = json.loads(sub_docs_json)
+                except:
+                    sub_docs_json = []
+                for name, project in prem.items():
+                    roleList = project.get("roleList", [])
+                    for role in roleList:
+                        role_name = role.get("role_name", "")
+                        role_text = role.get("role_text", "")
+                        if role_name == 'tenderee' and role_text == tenderee:
+                            linklist = role.get("linklist", [])
+                            need_change = False
+                            right_contact = []
+                            for _contact in linklist:
+                                if _contact[1] and new_phone_dict.get(_contact[1]) and role_text not in new_phone_dict[_contact[1]]:
+                                    change_json['contactsByDelete'].append({"enterpriseName": role_text, "phoneNo": _contact[1]})
+                                    if _contact[1] == tenderee_phone:
+                                        need_change = True
+                                else:
+                                    right_contact.append([_contact[0], _contact[1]])
+                            if need_change:
+                                if right_contact:
+                                    right_contact.sort(reverse=True)
+                                    change_json['tendereeContact'] = right_contact[0][0]
+                                    change_json['tendereePhone'] = right_contact[0][1]
+                        elif role_name == 'agency' and role_text == agency:
+                            linklist = role.get("linklist", [])
+                            need_change = False
+                            right_contact = []
+                            for _contact in linklist:
+                                if _contact[1] and new_phone_dict.get(_contact[1]) and role_text not in new_phone_dict[_contact[1]]:
+                                    change_json['contactsByDelete'].append({"enterpriseName": role_text, "phoneNo": _contact[1]})
+                                    if _contact[1] == agency_phone:
+                                        need_change = True
+                                else:
+                                    right_contact.append([_contact[0], _contact[1]])
+                            if need_change:
+                                if right_contact:
+                                    right_contact.sort(reverse=True)
+                                    change_json['agencyContact'] = right_contact[0][0]
+                                    change_json['agencyPhone'] = right_contact[0][1]
+                        elif role_name == 'win_tenderer':
+                            linklist = role.get("linklist", [])
+                            for _contact in linklist:
+                                if _contact[1] and new_phone_dict.get(_contact[1]) and role_text not in new_phone_dict[_contact[1]]:
+                                    change_json['contactsByDelete'].append({"enterpriseName": role_text, "phoneNo": _contact[1]})
+
+                sub_docs_json_change = False
+                if sub_docs_json:
+                    for _project in sub_docs_json:
+                        win_tenderer = _project.get("win_tenderer", "")
+                        win_tenderer_phone = _project.get("win_tenderer_phone", "")
+                        if win_tenderer_phone and new_phone_dict.get(win_tenderer_phone) and win_tenderer not in new_phone_dict[win_tenderer_phone]:
+                            _project["win_tenderer_phone"] = ""
+                            _project["win_tenderer_manager"] = ""
+                            sub_docs_json_change = True
+                if sub_docs_json_change:
+                    change_json['subDocsJson'] = sub_docs_json
+
+                new_contact_json = []
+                for _contact in change_json['contactsByDelete']:
+                    if _contact not in new_contact_json:
+                        new_contact_json.append(_contact)
+                change_json['contactsByDelete'] = new_contact_json
+                if len(change_json) > 3 or len(change_json['contactsByDelete']) > 0:
+                    # 没有修改地区时,传输原来提取的地区
+                    if not change_json.get("province"):
+                        change_json['area'] = item[2].get("area", "")
+                        change_json['province'] = item[2].get("province", "")
+                        change_json['city'] = item[2].get("city", "")
+                        change_json['district'] = item[2].get("district", "")
+                    change_res.append({"document": change_json})
+
+        # post result
+        headers = {'Content-Type': 'application/json',
+                   "Authorization": "Bearer eyJhbGciOiJIUzUxMiJ9.eyJ1c2VySWQiOjEsInVzZXJuYW1lIjoiYWRtaW4iLCJ1dWlkIjoiNGQwYzA0ODYtMzVmZi00MDJhLTk4OWQtNWEwNTE3YTljMDNiIiwic3ViIjoiMSIsImlhdCI6MTY3OTk5MTcxNywiZXhwIjo0ODMzNTkxNzE3fQ.ESDDnEDYP5ioK4ouHOYXsZbLayGRNVI9ugpbxDx_3fPIceD1KIjlDeopBmeATLoz8VYQihd8qO-UzP5pDsaUmQ"}
+        # url = "http://192.168.2.26:8002/document/updateAreaAndContact"
+        url = "http://data-api.bidizhaobiao.com/document/updateAreaAndContact"
+        for _data in change_res:
+            post_sucess = False
+            for i in range(3):
+                if not post_sucess:
+                    try:
+                        # 发送POST请求,传输JSON数据
+                        response = requests.post(url, json=_data,headers=headers)
+                        # print(response.status_code,response.json())
+                        # 检查响应状态码
+                        if response.status_code == 200:
+                            post_sucess = True
+                    except requests.exceptions.RequestException as e:
+                        # log("fix doc by project2,post error reason: %s"%(str(e)))
+                        pass
+
+        log("fix doc by project2, change doc nums:%d"%len(change_res))
+
 
     def start_flow_dumplicate(self):
         schedule = BlockingScheduler()
@@ -4881,6 +5215,7 @@ class Dataflow_dumplicate(Dataflow):
         schedule.add_job(self.flow_remove,"cron",hour="20")
         schedule.add_job(self.send_daily_check_data,"cron",hour='9', minute='10')
         schedule.add_job(self.send_daily_check_data2,"cron",hour='9', minute='10')
+        schedule.add_job(self.fix_doc_by_project2,"cron",hour='8', minute='10')
         schedule.add_job(self.flow_remove_project_tmp,"cron",hour="20")
         schedule.add_job(self.fix_doc_which_not_in_project,"cron",minute="*/10")
         schedule.start()