123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451 |
- #encoding:GBK
- import sys
- import os
- sys.path.append("../")
- import pandas as pd
- from dataSource.source import *
- import json
- from utils.multiThread import MultiThreadHandler
- import queue
- from utils.Utils import *
- from dataSource.pool import ConnectorPool
- import re
- from tablestore import *
- import traceback
- from utils.hashUtil import aesCipher
- from export.exportEnterprise import getDictEnterprise,getOneContact
- from export.exportUtils import generateBoolShouldQuery
# Relative directory for data files (the exporters below also spell this path out inline).
data_path = "../data/"

# Export column bookkeeping shared by set_dict_item():
#   set_columns      - names already registered, for fast membership tests
#   list_df_columns  - the same names in first-seen order; passed to
#                      DataFrame.to_excel(columns=...) to fix the column order
set_columns = set()
list_df_columns = list()
def set_dict_item(_dict, name, v):
    """Store ``v`` under ``name`` in ``_dict`` and register ``name`` as an export column.

    The value is passed through ``getLegal_str`` before being stored
    (imported helper; presumably strips characters that are illegal in
    Excel cells - confirm against utils.Utils).

    The first time a given ``name`` is seen it is recorded in the module-level
    ``set_columns`` and appended (also via ``getLegal_str``) to
    ``list_df_columns``, so that list keeps columns in first-seen order.

    Args:
        _dict: Row dictionary that is mutated in place.
        name: Column name / dictionary key.
        v: Raw value for the column.
    """
    _dict[name] = getLegal_str(v)

    # Already registered: nothing more to do.
    if name in set_columns:
        return

    set_columns.add(name)
    list_df_columns.append(getLegal_str(name))
def getDict_docchannel():
    """Load the channel lookup table from MySQL.

    Runs ``select channel_id,chnlname from sys_channel`` on a connection
    obtained from ``getConnection_mysql()``.

    Returns:
        dict: ``channel_id`` -> ``chnlname`` for every row returned. If the
        same ``channel_id`` appears more than once, the last row wins.
    """
    conn = getConnection_mysql()
    try:
        cursor = conn.cursor()
        try:
            sql = "select channel_id,chnlname from sys_channel "
            cursor.execute(sql)
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # The connection is only used by this call, so release it as soon as
        # the rows have been read instead of leaking it.
        conn.close()

    return {row[0]: row[1] for row in rows}
# Boilerplate announcement words removed from a title to build "公告标题_refine".
_TITLE_NOISE_RE = re.compile(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|合同')

# Columns that must exist on every exported row even when neither the project
# row nor its sub documents supplied a value. Order matters: it decides the
# first-seen column order recorded by set_dict_item().
_PROJECT_DEFAULT_COLUMNS = ("招标金额", "中标金额", "中标单位", "中标单位联系人", "中标单位联系电话")


def _is_positive_amount(value):
    """Return True when ``value`` converts to a float greater than zero ("" counts as 0)."""
    return float(0 if value == "" else value) > 0


def _build_project_item(row):
    """Convert one project row (a dict) into the export row dictionary.

    Top-level fields are copied first; values found in ``sub_docs_json``
    (a JSON list of dicts) then override the winner / amount columns.
    """
    item = {}
    set_dict_item(item, "docids", row.get("docids", ""))
    set_dict_item(item, "项目名称", row.get("project_name", ""))
    set_dict_item(item, "项目编号", row.get("project_code", ""))
    set_dict_item(item, "省份", row.get("province", ""))
    set_dict_item(item, "城市", row.get("city", ""))
    set_dict_item(item, "发布时间", row.get("page_time", ""))
    set_dict_item(item, "公告标题_refine", _TITLE_NOISE_RE.sub('', row.get("doctitle", "")))
    set_dict_item(item, "招标单位", row.get("tenderee", ""))
    set_dict_item(item, "招标联系人", row.get("tenderee_contact", ""))
    set_dict_item(item, "招标联系人电话", row.get("tenderee_phone", ""))
    set_dict_item(item, "代理单位", row.get("agency", ""))
    set_dict_item(item, "代理联系人", row.get("agency_contact", ""))
    set_dict_item(item, "代理联系人电话", row.get("agency_phone", ""))
    set_dict_item(item, "招标金额", row.get("bidding_budget", ""))
    set_dict_item(item, "中标金额", row.get("win_bid_price", ""))
    set_dict_item(item, "中标单位", row.get("win_tenderer", ""))

    sub_docs_json = row.get("sub_docs_json")
    if sub_docs_json is not None:
        # Later sub documents overwrite earlier ones for the same column.
        for _doc in json.loads(sub_docs_json):
            if "win_tenderer" in _doc:
                set_dict_item(item, "中标单位", _doc["win_tenderer"])
            if "win_tenderee_manager" in _doc:
                set_dict_item(item, "中标单位联系人", _doc["win_tenderee_manager"])
            if "win_tenderee_phone" in _doc:
                set_dict_item(item, "中标单位联系电话", _doc["win_tenderee_phone"])
            # Amounts only override when they are strictly positive.
            if "win_bid_price" in _doc and _is_positive_amount(_doc["win_bid_price"]):
                set_dict_item(item, "中标金额", _doc["win_bid_price"])
            if "bidding_budget" in _doc and _is_positive_amount(_doc["bidding_budget"]):
                set_dict_item(item, "招标金额", _doc["bidding_budget"])

    for name in _PROJECT_DEFAULT_COLUMNS:
        if name not in item:
            set_dict_item(item, name, "")
    return item


def _collect_project_rows(df_data, rows):
    """Append every row of one search page to the column-oriented ``df_data`` dict."""
    for row in getRow_ots(rows):
        for k, v in _build_project_item(row).items():
            df_data.setdefault(k, []).append(v)


def exportProject_by_pagetime():
    """Export matching rows of the ``project2`` index to a timestamped Excel file.

    The query requires all of:
      * ``doctextcon`` matching the phrase "家具",
      * ``province`` being one of the listed provinces,
      * ``win_tenderer`` matching the wildcard ``*``.
    (The first two are built with ``generateBoolShouldQuery``; presumably an
    OR over the given fields/keywords - confirm in export.exportUtils.)

    Results are read page by page (100 rows per request, sorted by
    ``page_time`` ascending) and written to
    ``../data/<timestamp>_数据导出.xlsx`` using the column order accumulated in
    the module-level ``list_df_columns``.

    Only the fields listed in ``columns`` are fetched; every other export
    column is therefore written as an empty string.

    If the search returns no rows, a message is printed and no file is
    written (previously this case raised ``KeyError``).
    """
    ots_client = getConnect_ots()

    columns = ["page_time", "province", "city", "win_tenderer"]

    bool_query = BoolQuery(must_queries=[
        generateBoolShouldQuery(["doctextcon"], ["家具"], MatchPhraseQuery),
        generateBoolShouldQuery(["province"], ["广东", "安徽", "江苏", "浙江", "四川", "北京"], TermQuery),
        WildcardQuery("win_tenderer", "*"),
    ])

    table_name = "project2"
    index_name = "%s_index" % table_name

    def _search(search_query):
        # One request against the index, always asking for the same columns.
        return ots_client.search(table_name, index_name, search_query,
                                 ColumnsToGet(columns, return_type=ColumnReturnType.SPECIFIED))

    df_data = {}

    # First page carries the sort order; follow-up pages only pass next_token.
    rows, next_token, total_count, _ = _search(
        SearchQuery(bool_query, sort=Sort(sorters=[FieldSort("page_time", SortOrder.ASC)]),
                    limit=100, get_total_count=True))
    print(total_count)
    _count = len(rows)
    _collect_project_rows(df_data, rows)

    while next_token:
        print("%d/%d" % (_count, total_count))
        rows, next_token, total_count, _ = _search(
            SearchQuery(bool_query, next_token=next_token, limit=100, get_total_count=True))
        _collect_project_rows(df_data, rows)
        _count += len(rows)

    if not df_data:
        # Nothing matched: there are no columns to select, so skip the export.
        print("no rows matched the query; nothing exported")
        return

    df1 = pd.DataFrame(df_data)
    df1.to_excel("../data/%s_数据导出.xlsx" % (getCurrent_date('%Y-%m-%d_%H%M%S')), columns=list_df_columns)
def exportProjectWithOneDocid():
    """Export projects of 2021-05-28 that consist of a single document.

    Steps:
      1. Page through ``project2`` (``page_time`` == "2021-05-28") and keep the
         rows whose ``docids`` field contains exactly one comma-separated id.
      2. For each kept project, search the ``document`` index for up to 10
         other documents (status 201..300, newest first) whose ``doctextcon``
         phrase-matches the project name, and record their ``docid`` values
         under ``candidate`` plus the hit count under ``total_count``.
      3. Write all kept projects to ``../data/<timestamp>_未合并.xlsx``.
    """
    ots_client = getConnect_ots()
    bool_query = BoolQuery(must_queries=[TermQuery("page_time","2021-05-28")])
    columns = ["docids","project_name"]

    def _only_single_docid(projects):
        # A project is "unmerged" when its docids string holds one id only.
        return [p for p in projects if len(p["docids"].split(","))==1]

    # First page: establishes the sort order and the paging token.
    rows,next_token,total_count,is_all_succeed = ots_client.search(
        "project2","project2_index",
        SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("page_time",SortOrder.ASC)]),get_total_count=True,limit=100),
        columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
    page = getRow_ots(rows)
    list_data = _only_single_docid(page)
    _count = len(page)

    # Remaining pages.
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search(
            "project2","project2_index",
            SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
            columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
        page = getRow_ots(rows)
        _count += len(page)
        print("%d/%d"%(_count,total_count))
        list_data.extend(_only_single_docid(page))

    task_queue = queue.Queue()
    for project in list_data:
        task_queue.put(project)

    def _handle(_dict,result_queue):
        # Worker: annotates the project dict in place; result_queue is unused.
        docid = _dict["docids"]
        project_name = _dict["project_name"]
        _dict["candidate"] = []
        _dict["total_count"] = 0

        # Without a project name there is nothing to search for.
        if len(project_name)==0:
            return

        doc_query = BoolQuery(
            must_queries=[MatchPhraseQuery("doctextcon",project_name),
                          RangeQuery("status",201,300,True,True)],
            must_not_queries=[TermQuery("docid",docid)])
        rows,next_token,total_count,is_all_succeed = ots_client.search(
            "document","document_index",
            SearchQuery(doc_query,sort=Sort(sorters=[FieldSort("page_time",SortOrder.DESC)]),limit=10,get_total_count=True),
            columns_to_get=ColumnsToGet(["doctitle"],ColumnReturnType.SPECIFIED))
        for hit in getRow_ots(rows):
            _dict["candidate"].append(hit["docid"])
        _dict["total_count"] = total_count

    mt = MultiThreadHandler(task_queue,_handle,None,30)
    mt.run()

    # Pivot the list of row dicts into a column -> values mapping.
    df_data = {}
    for project in list_data:
        for k,v in project.items():
            df_data.setdefault(k,[]).append(v)

    df = pd.DataFrame(df_data)
    df.to_excel("../data/%s_未合并.xlsx"%(getCurrent_date("%Y-%m-%d %H%M%S")))
def getPayStaffName():
    """Load staff/member records that have a member level other than 81.

    Reads ``company, userid, phone, contactname, aftermarket`` from
    ``bxkc.b2c_mall_staff_basic_info`` where ``MEMBERLEVELID`` is set and is
    not 81.

    Returns:
        dict: company name -> ``{"userid", "phone", "contactname",
        "aftermarket"}``. Rows with a NULL company are skipped; when a company
        appears several times the last row wins.
    """
    conn = getConnection_mysql()
    try:
        cursor = conn.cursor()
        try:
            sql = " select company,userid,phone,contactname,aftermarket from bxkc.b2c_mall_staff_basic_info where MEMBERLEVELID is not null and MEMBERLEVELID <> 81"
            cursor.execute(sql)
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # The connection is private to this call; close it rather than leak it.
        conn.close()

    dict_staff = {}
    for company, userid, phone, contactname, aftermarket in rows:
        if company is not None:
            dict_staff[company] = {"userid": userid, "phone": phone,
                                   "contactname": contactname, "aftermarket": aftermarket}
    return dict_staff
def exportCompanyByCycleProduct():
    """Build a potential-customer sheet from the periodic-project CSV.

    Input: ``../data/周期项目识别.csv`` (GBK) with the columns ``tenderee``,
    ``product``, ``last_time``, ``avg_period``, ``min_period``,
    ``max_period`` and ``json_docid`` (a JSON list of document ids).

    For each CSV row a worker:
      * looks up a contact for the tenderee in the ``enterprise`` table,
      * searches ``project2`` for projects containing any of the listed
        document ids and takes their ``win_tenderer`` / ``second_tenderer`` /
        ``third_tenderer`` values as potential customers,
      * for each potential customer looks up a contact and, when the company
        appears in ``getPayStaffName()``, the assigned service staff and the
        membership expiry date from MySQL.

    Output: ``../data/<timestamp>_周期项目.xlsx`` with one row per distinct
    (tenderee, product, potential customer) combination.

    A potential customer whose MySQL lookup fails is logged via
    ``traceback.print_exc()`` and left out of the output.
    """
    # Imported locally so the expiry comparison does not depend on ``time``
    # happening to be re-exported by one of the star imports.
    import time

    filename = "../data/周期项目识别.csv"
    df = pd.read_csv(filename,encoding='gbk')

    task_queue = queue.Queue()
    result_queue = queue.Queue()
    pool_conn = ConnectorPool(init_num=10,max_num=30,method_init=getConnection_mysql)

    for tenderee,product,last_time,avg_period,min_period,max_period,json_docid in zip(
            df["tenderee"],df["product"],df["last_time"],df["avg_period"],
            df["min_period"],df["max_period"],df["json_docid"]):
        task_queue.put({"tenderee":tenderee,"product":product,"last_time":last_time,
                        "avg_period":avg_period,"min_period":min_period,
                        "max_period":max_period,"json_docid":json_docid})

    sstr_staff = getPayStaffName()
    ots_client = getConnect_ots()

    def _lookup_contact(ots_client,name):
        # Fetch the "contacts" column of one enterprise and pick one (person, phone).
        consumed, return_row, next_token = ots_client.get_row("enterprise",[("name",name)], ["contacts"], None, 1)
        dict_tmp = getRow_ots_primary(return_row)
        return getOneContact(dict_tmp.get("contacts"))

    def _comsumer(_dict,result_queue,ots_client,sstr_staff,pool_conn):
        new_dict = {"招标人":_dict["tenderee"],"产品":_dict["product"],"上次招标":_dict["last_time"],
                    "预计招标范围":"%s-%s"%(timeAdd(_dict["last_time"],_dict["min_period"]),timeAdd(_dict["last_time"],_dict["max_period"])),
                    "周期":_dict["avg_period"],"历史招标":_dict["json_docid"]}
        aint_docid = json.loads(_dict["json_docid"])

        phone_person,phone_no = _lookup_contact(ots_client,_dict["tenderee"])
        new_dict["招标人联系人"] = phone_person
        new_dict["招标人联系电话"] = phone_no

        # Any project that contains one of the historical document ids.
        aobj_should_q_docid = [TermQuery("docids",int_docid) for int_docid in aint_docid]
        bool_query = BoolQuery(should_queries=aobj_should_q_docid)
        columns = ['win_tenderer','second_tenderer','third_tenderer']
        rows,next_token,total_count,is_all_succeed = ots_client.search("project2","project2_index",
                                                                       SearchQuery(bool_query,limit=100,get_total_count=True),
                                                                       ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
        adict_rows = getRow_ots(rows)
        for dict_row in adict_rows:
            for _k,_company in dict_row.items():
                # Only the bidder columns name potential customers.
                if _k not in columns or _company is None or _company=="":
                    continue

                _succeed = True
                new_dict1 = dict(new_dict)
                new_dict1["潜在客户"] = _company

                phone_person,phone_no = _lookup_contact(ots_client,_company)
                new_dict1["潜在客户联系人"] = phone_person
                new_dict1["潜在客户联系电话"] = phone_no

                if _company in sstr_staff:
                    company_info = sstr_staff[_company]
                    new_dict1["付费客户"] = "是"
                    conn = pool_conn.getConnector()
                    cursor = None
                    try:
                        cursor = conn.cursor()
                        # NOTE(review): both statements are built by string
                        # interpolation from values read out of MySQL; consider
                        # driver-side parameters once the driver's paramstyle
                        # (and its handling of the literal % in date_FORMAT) is confirmed.
                        sql = " select name from bxkc.b2c_mall_staff_basic_info where userid='%s'"%(company_info.get("aftermarket",""))
                        cursor.execute(sql)
                        sql_rows = cursor.fetchall()
                        if len(sql_rows)>0:
                            new_dict1["归属客服"] = sql_rows[0][0]
                        else:
                            new_dict1["归属客服"] = ""
                        new_dict1["付费客户联系人"] = company_info.get("contactname","")
                        new_dict1["付费客户电话"] = company_info.get("phone","")

                        sql = " select date_FORMAT(etiem,\'%Y-%m-%d\') from bxkc.bxkc_member_term where userid='"+company_info.get("userid","")+"' and memberlevelid<>81 order by etiem desc limit 1"
                        cursor.execute(sql)
                        sql_rows = cursor.fetchall()
                        if len(sql_rows)>0:
                            etime = sql_rows[0][0]
                            new_dict1["付费客户到期日"] = etime
                            # "否" (not expired) while the expiry date is still in the future.
                            if time.mktime(time.strptime(etime,"%Y-%m-%d"))>time.mktime(time.localtime()):
                                new_dict1["付费客户到期"] = "否"
                            else:
                                new_dict1["付费客户到期"] = "是"
                        else:
                            new_dict1["付费客户到期日"] = ""
                            new_dict1["付费客户到期"] = ""
                    except Exception as e:
                        # Best effort per customer: log and drop this row only.
                        traceback.print_exc()
                        _succeed = False
                    finally:
                        try:
                            if cursor is not None:
                                cursor.close()
                        finally:
                            # Always hand the connection back to the pool.
                            pool_conn.putConnector(conn)
                else:
                    new_dict1["付费客户"] = "否"
                    new_dict1["归属客服"] = ""
                    new_dict1["付费客户联系人"] = ""
                    new_dict1["付费客户电话"] = ""
                    new_dict1["付费客户到期日"] = ""
                    new_dict1["付费客户到期"] = ""

                if _succeed:
                    result_queue.put(new_dict1)

    mt = MultiThreadHandler(task_queue,_comsumer,result_queue,ots_client=ots_client,sstr_staff=sstr_staff,pool_conn=pool_conn,thread_count=30)
    mt.run()

    df_data = {}
    set_staff = set()
    while True:
        # Only an empty queue ends the drain; any other error should surface
        # instead of silently truncating the export.
        try:
            _dict = result_queue.get(timeout=1)
        except queue.Empty:
            break

        tenderee = _dict.get("招标人","")
        product = _dict.get("产品","")
        staff = _dict.get("潜在客户","")
        _s = "%s-%s-%s"%(tenderee,product,staff)
        if _s in set_staff:
            continue
        set_staff.add(_s)
        for k,v in _dict.items():
            df_data.setdefault(k,[]).append(v)

    df1 = pd.DataFrame(df_data)
    df1.to_excel("../data/%s_周期项目.xlsx"%(getCurrent_date("%Y-%m-%d_%H%M%S")))
def appendCellphones():
    """Unfinished placeholder: binds the data directory path and returns ``None``."""
    data_dir = "../data/"
if __name__ == "__main__":
    # Script entry point: run the project export. The other exporters in this
    # file (exportProjectWithOneDocid, exportCompanyByCycleProduct) can be
    # called here instead when that export is the one required.
    exportProject_by_pagetime()
|