|
@@ -4872,340 +4872,6 @@ class Dataflow_dumplicate(Dataflow):
|
|
|
if res:
|
|
|
log("send_daily_check_data2,sent data len: %d"%(res_json['count']))
|
|
|
|
|
|
- # 根据项目修复doc公告数据
|
|
|
- def fix_doc_by_project2(self):
|
|
|
- import datetime
|
|
|
- from itertools import groupby
|
|
|
- from collections import Counter
|
|
|
- label2key = {
|
|
|
- '公告变更': 51,
|
|
|
- '招标公告': 52,
|
|
|
- '中标信息': 101,
|
|
|
- '招标预告': 102,
|
|
|
- '招标答疑': 103,
|
|
|
- '招标文件': 104,
|
|
|
- '资审结果': 105,
|
|
|
- '法律法规': 106,
|
|
|
- '新闻资讯': 107,
|
|
|
- '采购意向': 114,
|
|
|
- '拍卖出让': 115,
|
|
|
- '土地矿产': 116,
|
|
|
- '产权交易': 117,
|
|
|
- '废标公告': 118,
|
|
|
- '候选人公示': 119,
|
|
|
- '合同公告': 120,
|
|
|
- '开标记录': 121,
|
|
|
- '验收合同': 122,
|
|
|
- # 以下排除
|
|
|
- '拟在建数据': 301,
|
|
|
- '审批项目数据': 302,
|
|
|
- '投诉处罚': 303
|
|
|
- }
|
|
|
- key2label = dict((i[1], i[0]) for i in label2key.items())
|
|
|
-
|
|
|
- today = str(datetime.date.today())
|
|
|
- yesterday = str(datetime.date.today() - datetime.timedelta(days=1))
|
|
|
- front_year = str(datetime.date.today() - datetime.timedelta(days=365))
|
|
|
- bool_query = BoolQuery(must_queries=[RangeQuery("update_time", yesterday + " 00:00:00", today + " 00:00:00"),
|
|
|
- RangeQuery("page_time", front_year, today),
|
|
|
- RangeQuery("status", 201, 301),
|
|
|
- RangeQuery("docid_number", 4, 30)]
|
|
|
- )
|
|
|
- all_rows = []
|
|
|
- rows, next_token, total_count, is_all_succeed = self.ots_client.search("project2", "project2_index",
|
|
|
- SearchQuery(bool_query, sort=Sort(sorters=[
|
|
|
- FieldSort("update_time", SortOrder.ASC)]),
|
|
|
- limit=100, get_total_count=True),
|
|
|
- ColumnsToGet(['uuid', 'docids', 'update_time','docid_number'],
|
|
|
- return_type=ColumnReturnType.SPECIFIED))
|
|
|
- all_rows.extend(rows)
|
|
|
- while next_token:
|
|
|
- rows, next_token, total_count, is_all_succeed = self.ots_client.search("project2", "project2_index",
|
|
|
- SearchQuery(bool_query,
|
|
|
- next_token=next_token,
|
|
|
- sort=Sort(sorters=[
|
|
|
- FieldSort("update_time",SortOrder.ASC)]),
|
|
|
- limit=100,get_total_count=True),
|
|
|
- ColumnsToGet(['uuid', 'docids', 'update_time','docid_number'],
|
|
|
- return_type=ColumnReturnType.SPECIFIED))
|
|
|
- all_rows.extend(rows)
|
|
|
-
|
|
|
- list_dict = getRow_ots(all_rows)
|
|
|
- docids_list = []
|
|
|
- for _dict in list_dict:
|
|
|
- _uuid = _dict.get("uuid", "")
|
|
|
- _docids = _dict.get("docids", "")
|
|
|
- _docids = _docids.split(",")
|
|
|
- for docid in _docids:
|
|
|
- docids_list.append([_uuid, int(docid)])
|
|
|
- # print('docids_list len:', len(docids_list))
|
|
|
-
|
|
|
- ots_query_res = []
|
|
|
- doc_columns_list = ['page_time', 'tenderee', 'tenderee_phone', 'agency', 'agency_phone', 'extract_count',
|
|
|
- "sub_docs_json",'extract_json', 'extract_json1', 'extract_json2', 'extract_json3']
|
|
|
-
|
|
|
- def extract_json_process(res_json):
|
|
|
- # 解析document数据
|
|
|
- extract_json = res_json.pop("extract_json")
|
|
|
- extract_json = extract_json if extract_json else "{}"
|
|
|
- if 'extract_json1' in res_json:
|
|
|
- extract_json1 = res_json.pop("extract_json1")
|
|
|
- extract_json1 = extract_json1 if extract_json1 else ""
|
|
|
- extract_json = extract_json + extract_json1
|
|
|
- if 'extract_json2' in res_json:
|
|
|
- extract_json2 = res_json.pop("extract_json2")
|
|
|
- extract_json2 = extract_json2 if extract_json2 else ""
|
|
|
- extract_json = extract_json + extract_json2
|
|
|
- if 'extract_json3' in res_json:
|
|
|
- extract_json3 = res_json.pop("extract_json3")
|
|
|
- extract_json3 = extract_json3 if extract_json3 else ""
|
|
|
- extract_json = extract_json + extract_json3
|
|
|
- try:
|
|
|
- extract_json = json.loads(extract_json)
|
|
|
- except:
|
|
|
- return None
|
|
|
-
|
|
|
- docchannel_dict = extract_json.get('docchannel', {})
|
|
|
- res_json['docchannel'] = docchannel_dict.get('docchannel', "")
|
|
|
- res_json['life_docchannel'] = docchannel_dict.get('life_docchannel', "")
|
|
|
-
|
|
|
- district_dict = extract_json.get('district', {})
|
|
|
- res_json['province'] = district_dict.get('province', "")
|
|
|
- res_json['city'] = district_dict.get('city', "")
|
|
|
- res_json['district'] = district_dict.get('district', "")
|
|
|
- res_json['area'] = district_dict.get('area', "")
|
|
|
-
|
|
|
- prem = extract_json.get('prem', {})
|
|
|
- res_json['prem'] = prem
|
|
|
-
|
|
|
- return res_json
|
|
|
-
|
|
|
- def _handle(item, _):
|
|
|
- # 查询解析document数据
|
|
|
- _uuid = item[0] # project uuid
|
|
|
- _docid = item[1]
|
|
|
- for i in range(3):
|
|
|
- try:
|
|
|
- bool_query = BoolQuery(must_queries=[TermQuery('docid', _docid)]
|
|
|
- )
|
|
|
- rows, next_token, total_count, is_all_succeed = self.ots_client.search("document", "document_index",
|
|
|
- SearchQuery(bool_query,
|
|
|
- sort=Sort(sorters=[FieldSort("page_time",SortOrder.ASC)]),
|
|
|
- limit=None,get_total_count=True),
|
|
|
- ColumnsToGet(doc_columns_list,
|
|
|
- return_type=ColumnReturnType.SPECIFIED))
|
|
|
- res = getRow_ots(rows)
|
|
|
- if res:
|
|
|
- # 通过extract_count过滤掉相关性不大的公告
|
|
|
- if res[0].get('extract_count', 0) > 5:
|
|
|
- ots_query_res.append([_uuid, _docid, extract_json_process(res[0])])
|
|
|
- break
|
|
|
- except Exception as e:
|
|
|
- # print('error:',e)
|
|
|
- pass
|
|
|
-
|
|
|
- task_queue = Queue()
|
|
|
- for item in docids_list:
|
|
|
- task_queue.put(item)
|
|
|
- if task_queue.qsize() >= 10000:
|
|
|
- _mt = MultiThreadHandler(task_queue, _handle, None, 20)
|
|
|
- _mt.run()
|
|
|
- if task_queue.qsize() >= 0:
|
|
|
- _mt = MultiThreadHandler(task_queue, _handle, None, 20)
|
|
|
- _mt.run()
|
|
|
-
|
|
|
- # print('ots_query_res len:', len(ots_query_res))
|
|
|
-
|
|
|
- # 处理修复数据
|
|
|
- ots_query_res.sort(key=lambda x: x[0])
|
|
|
- # 招标类别
|
|
|
- zb_type = [51, 52, 101, 102, 103, 104, 105, 114, 118, 119, 120, 121, 122]
|
|
|
- zb_type = [key2label[i] for i in zb_type]
|
|
|
-
|
|
|
- change_res = []
|
|
|
- for key, group in groupby(ots_query_res, lambda x: (x[0])):
|
|
|
- uuid = key
|
|
|
- project_data = list(group)
|
|
|
- all_len = len(project_data)
|
|
|
- if all_len < 4:
|
|
|
- continue
|
|
|
- zb_len = sum([1 if i[2].get('docchannel') in zb_type else 0 for i in project_data])
|
|
|
- # 招标类公告占比
|
|
|
- # if zb_len / all_len <= 0.5:
|
|
|
- if zb_len / all_len <= 0.7:
|
|
|
- # 项目不是招标相关项目
|
|
|
- continue
|
|
|
- # 项目里最多的省份
|
|
|
- province_list = [i[2].get('province', '') for i in project_data]
|
|
|
- province_sort = Counter(province_list).most_common()
|
|
|
- change_province = ""
|
|
|
- change_city = ""
|
|
|
- change_district = ""
|
|
|
- change_area = ""
|
|
|
- # if province_sort[0][1]/all_len > 0.5:
|
|
|
- if province_sort[0][1] / all_len > 0.7:
|
|
|
- if province_sort[0][0] and province_sort[0][0] not in ["全国", "未知"]:
|
|
|
- change_province = province_sort[0][0]
|
|
|
- if change_province:
|
|
|
- # 只替换到city,district 取"未知"
|
|
|
- change_province_data = [(i[2].get('province', ''), i[2].get('city', ''), i[2].get('area', '')) for i in
|
|
|
- project_data if i[2].get('province', '') == change_province]
|
|
|
- change_province_data_sort = Counter(change_province_data).most_common()
|
|
|
- change_city = change_province_data_sort[0][0][1]
|
|
|
- change_area = change_province_data_sort[0][0][2]
|
|
|
- change_district = "未知"
|
|
|
-
|
|
|
- # 联系方式统计
|
|
|
- phone_dict = {}
|
|
|
- for d in project_data:
|
|
|
- tenderee = d[2].get("tenderee", "")
|
|
|
- agency = d[2].get("agency", "")
|
|
|
- prem = d[2].get("prem", {})
|
|
|
-
|
|
|
- if len(prem) > 0:
|
|
|
- for name, project in prem.items():
|
|
|
- roleList = project.get("roleList", [])
|
|
|
- for role in roleList:
|
|
|
- role_name = role.get("role_name", "")
|
|
|
- role_text = role.get("role_text", "")
|
|
|
- if role_name in ['tenderee', 'agency', 'win_tenderer']:
|
|
|
- linklist = role.get("linklist", [])
|
|
|
- for _contact in linklist:
|
|
|
- if _contact[1] not in phone_dict:
|
|
|
- phone_dict[_contact[1]] = {}
|
|
|
- if role_text not in phone_dict[_contact[1]]:
|
|
|
- phone_dict[_contact[1]][role_text] = 0
|
|
|
- phone_dict[_contact[1]][role_text] += 1
|
|
|
- # 汇总电话对应的实体
|
|
|
- new_phone_dict = dict((phone, []) for phone in phone_dict)
|
|
|
- for phone, value in phone_dict.items():
|
|
|
- phone_name = [(name, count) for name, count in value.items()]
|
|
|
- phone_name.sort(key=lambda x: x[1], reverse=True)
|
|
|
- max_count = phone_name[0][1]
|
|
|
- max_name = [name for name, count in value.items() if count == max_count and max_count > 0]
|
|
|
- new_phone_dict[phone] = max_name
|
|
|
-
|
|
|
- for item in project_data:
|
|
|
- change_json = {"partitionkey": item[2].get("partitionkey"),
|
|
|
- 'docid': item[1],
|
|
|
- 'contactsByDelete': []}
|
|
|
- tenderee = item[2].get("tenderee", "")
|
|
|
- agency = item[2].get("agency", "")
|
|
|
- # docchannel修复
|
|
|
- docchannel = item[2].get('docchannel', "")
|
|
|
- life_docchannel = item[2].get('life_docchannel', "")
|
|
|
- if docchannel and docchannel not in zb_type:
|
|
|
- if life_docchannel in zb_type and docchannel != '采招数据':
|
|
|
- change_json['docchannel'] = label2key.get(life_docchannel)
|
|
|
- # province修复
|
|
|
- province = item[2].get('province', "")
|
|
|
- if change_province:
|
|
|
- if province != change_province and province in ["全国", "未知", '']: # province未识别时才修复
|
|
|
- change_json['province'] = change_province
|
|
|
- change_json['city'] = change_city
|
|
|
- change_json['district'] = change_district
|
|
|
- change_json['area'] = change_area
|
|
|
-
|
|
|
- # 联系方式修复
|
|
|
- tenderee_phone = item[2].get("tenderee_phone", "")
|
|
|
- agency_phone = item[2].get("agency_phone", "")
|
|
|
- prem = item[2].get("prem", {})
|
|
|
- sub_docs_json = item[2].get("sub_docs_json", "[]")
|
|
|
- try:
|
|
|
- sub_docs_json = json.loads(sub_docs_json)
|
|
|
- except:
|
|
|
- sub_docs_json = []
|
|
|
- for name, project in prem.items():
|
|
|
- roleList = project.get("roleList", [])
|
|
|
- for role in roleList:
|
|
|
- role_name = role.get("role_name", "")
|
|
|
- role_text = role.get("role_text", "")
|
|
|
- if role_name == 'tenderee' and role_text == tenderee:
|
|
|
- linklist = role.get("linklist", [])
|
|
|
- need_change = False
|
|
|
- right_contact = []
|
|
|
- for _contact in linklist:
|
|
|
- if _contact[1] and new_phone_dict.get(_contact[1]) and role_text not in new_phone_dict[_contact[1]]:
|
|
|
- change_json['contactsByDelete'].append({"enterpriseName": role_text, "phoneNo": _contact[1]})
|
|
|
- if _contact[1] == tenderee_phone:
|
|
|
- need_change = True
|
|
|
- else:
|
|
|
- right_contact.append([_contact[0], _contact[1]])
|
|
|
- if need_change:
|
|
|
- if right_contact:
|
|
|
- right_contact.sort(reverse=True)
|
|
|
- change_json['tendereeContact'] = right_contact[0][0]
|
|
|
- change_json['tendereePhone'] = right_contact[0][1]
|
|
|
- elif role_name == 'agency' and role_text == agency:
|
|
|
- linklist = role.get("linklist", [])
|
|
|
- need_change = False
|
|
|
- right_contact = []
|
|
|
- for _contact in linklist:
|
|
|
- if _contact[1] and new_phone_dict.get(_contact[1]) and role_text not in new_phone_dict[_contact[1]]:
|
|
|
- change_json['contactsByDelete'].append({"enterpriseName": role_text, "phoneNo": _contact[1]})
|
|
|
- if _contact[1] == agency_phone:
|
|
|
- need_change = True
|
|
|
- else:
|
|
|
- right_contact.append([_contact[0], _contact[1]])
|
|
|
- if need_change:
|
|
|
- if right_contact:
|
|
|
- right_contact.sort(reverse=True)
|
|
|
- change_json['agencyContact'] = right_contact[0][0]
|
|
|
- change_json['agencyPhone'] = right_contact[0][1]
|
|
|
- elif role_name == 'win_tenderer':
|
|
|
- linklist = role.get("linklist", [])
|
|
|
- for _contact in linklist:
|
|
|
- if _contact[1] and new_phone_dict.get(_contact[1]) and role_text not in new_phone_dict[_contact[1]]:
|
|
|
- change_json['contactsByDelete'].append({"enterpriseName": role_text, "phoneNo": _contact[1]})
|
|
|
-
|
|
|
- sub_docs_json_change = False
|
|
|
- if sub_docs_json:
|
|
|
- for _project in sub_docs_json:
|
|
|
- win_tenderer = _project.get("win_tenderer", "")
|
|
|
- win_tenderer_phone = _project.get("win_tenderer_phone", "")
|
|
|
- if win_tenderer_phone and new_phone_dict.get(win_tenderer_phone) and win_tenderer not in new_phone_dict[win_tenderer_phone]:
|
|
|
- _project["win_tenderer_phone"] = ""
|
|
|
- _project["win_tenderer_manager"] = ""
|
|
|
- sub_docs_json_change = True
|
|
|
- if sub_docs_json_change:
|
|
|
- change_json['subDocsJson'] = sub_docs_json
|
|
|
-
|
|
|
- new_contact_json = []
|
|
|
- for _contact in change_json['contactsByDelete']:
|
|
|
- if _contact not in new_contact_json:
|
|
|
- new_contact_json.append(_contact)
|
|
|
- change_json['contactsByDelete'] = new_contact_json
|
|
|
- if len(change_json) > 3 or len(change_json['contactsByDelete']) > 0:
|
|
|
- # 没有修改地区时,传输原来提取的地区
|
|
|
- if not change_json.get("province"):
|
|
|
- change_json['area'] = item[2].get("area", "")
|
|
|
- change_json['province'] = item[2].get("province", "")
|
|
|
- change_json['city'] = item[2].get("city", "")
|
|
|
- change_json['district'] = item[2].get("district", "")
|
|
|
- change_res.append({"document": change_json})
|
|
|
-
|
|
|
- # post result
|
|
|
- headers = {'Content-Type': 'application/json',
|
|
|
- "Authorization": "Bearer eyJhbGciOiJIUzUxMiJ9.eyJ1c2VySWQiOjEsInVzZXJuYW1lIjoiYWRtaW4iLCJ1dWlkIjoiNGQwYzA0ODYtMzVmZi00MDJhLTk4OWQtNWEwNTE3YTljMDNiIiwic3ViIjoiMSIsImlhdCI6MTY3OTk5MTcxNywiZXhwIjo0ODMzNTkxNzE3fQ.ESDDnEDYP5ioK4ouHOYXsZbLayGRNVI9ugpbxDx_3fPIceD1KIjlDeopBmeATLoz8VYQihd8qO-UzP5pDsaUmQ"}
|
|
|
- # url = "http://192.168.2.26:8002/document/updateAreaAndContact"
|
|
|
- url = "http://data-api.bidizhaobiao.com/document/updateAreaAndContact"
|
|
|
- for _data in change_res:
|
|
|
- post_sucess = False
|
|
|
- for i in range(3):
|
|
|
- if not post_sucess:
|
|
|
- try:
|
|
|
- # 发送POST请求,传输JSON数据
|
|
|
- response = requests.post(url, json=_data,headers=headers)
|
|
|
- # print(response.status_code,response.json())
|
|
|
- # 检查响应状态码
|
|
|
- if response.status_code == 200:
|
|
|
- post_sucess = True
|
|
|
- except requests.exceptions.RequestException as e:
|
|
|
- # log("fix doc by project2,post error reason: %s"%(str(e)))
|
|
|
- pass
|
|
|
-
|
|
|
- log("fix doc by project2, change doc nums:%d"%len(change_res))
|
|
|
-
|
|
|
|
|
|
def start_flow_dumplicate(self):
|
|
|
schedule = BlockingScheduler()
|
|
@@ -5215,7 +4881,6 @@ class Dataflow_dumplicate(Dataflow):
|
|
|
schedule.add_job(self.flow_remove,"cron",hour="20")
|
|
|
schedule.add_job(self.send_daily_check_data,"cron",hour='9', minute='10')
|
|
|
schedule.add_job(self.send_daily_check_data2,"cron",hour='9', minute='10')
|
|
|
- schedule.add_job(self.fix_doc_by_project2,"cron",hour='8', minute='10')
|
|
|
schedule.add_job(self.flow_remove_project_tmp,"cron",hour="20")
|
|
|
schedule.add_job(self.fix_doc_which_not_in_project,"cron",minute="*/10")
|
|
|
schedule.start()
|
|
@@ -5254,7 +4919,7 @@ class Dataflow_dumplicate(Dataflow):
|
|
|
|
|
|
if item:
|
|
|
log("start dumplicate_comsumer_handle")
|
|
|
- self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=True,upgrade=False)
|
|
|
+ self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=False,upgrade=False)
|
|
|
return
|
|
|
|
|
|
def test_merge(self,list_docid_less,list_docid_greater):
|