#encoding:GBK
# Ad-hoc export scripts: query the "project2" / "document" Tablestore indexes
# (plus a few MySQL lookups) and dump the results to Excel files under ../data/.
#
# NOTE: the import order below is kept exactly as in the original file because
# the star-imports may shadow one another; do not reorder without checking.
import sys
import os
import time
sys.path.append("../")
import pandas as pd
from dataSource.source import *
import json
from utils.multiThread import MultiThreadHandler
import queue
from utils.Utils import *
from dataSource.pool import ConnectorPool
import re
from tablestore import *
import traceback
from utils.hashUtil import aesCipher
from export.exportEnterprise import getDictEnterprise,getOneContact
from export.exportUtils import generateBoolShouldQuery

data_path = "../data/"

# Column names registered through set_dict_item, in first-seen order.
# list_df_columns is passed to DataFrame.to_excel to fix the column order.
set_columns = set()
list_df_columns = []


def set_dict_item(_dict, name, v):
    """Store ``v`` under ``name`` in ``_dict`` and register ``name`` as a column.

    Both the value and the registered column name are passed through
    ``getLegal_str`` (star-imported helper; presumably strips characters that
    are illegal in Excel cells - confirm against its definition).
    """
    _dict[name] = getLegal_str(v)
    if name not in set_columns:
        set_columns.add(name)
        list_df_columns.append(getLegal_str(name))


def getDict_docchannel():
    """Return a ``{channel_id: chnlname}`` dict read from ``sys_channel``."""
    conn = getConnection_mysql()
    try:
        cursor = conn.cursor()
        try:
            sql = "select channel_id,chnlname from sys_channel "
            cursor.execute(sql)
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # The original never released the connection.
        conn.close()
    return {row[0]: row[1] for row in rows}


def exportProject_by_pagetime():
    """Export projects matching a hard-coded query to ``../data/<ts>_数据导出.xlsx``.

    Pages through the ``project2_index`` search index 100 rows at a time,
    flattens every row into the Chinese-named report columns and writes one
    Excel file per entry of ``list_province`` (currently a single entry).
    """
    # filename = "../data/重复公告.xlsx"
    # df = pd.read_excel(filename)
    ots_client = getConnect_ots()

    # Whitelist of winning bidders; only used by the disabled filter in getData.
    set_enter = set()
    str_enter = '''
    成都四方伟业软件股份有限公司
    北京数字冰雹信息技术有限公司
    北京睿呈时代信息科技有限公司
    北京五一视界数字孪生科技股份有限公司
    易达云图(深圳)科技有限公司
    北京优锘科技有限公司
    深圳市鸿普森科技股份有限公司
    厦门图扑软件科技有限公司
    四川相数科技有限公司
    '''
    for a in re.split(r"\s+", str_enter):
        if a.strip() != "":
            set_enter.add(a.strip())

    # Full column list kept for reference; the narrower list below is the one
    # actually requested (the original assigned both, the second one winning).
    # columns = ["docids","doctitle","docchannel","bidway","province","city","district","info_type","page_time","crtime","project_code","tenderee","project_name","agency","sub_docs_json","tenderee_contact","tenderee_phone","doctextcon","product","moneysource","win_bid_price","win_tenderer","bidding_budget"]
    columns = ["page_time","province","city","win_tenderer"]

    # Only needed by the disabled "公告类别" column; the lookup is kept so the
    # function's external behaviour (one MySQL query) is unchanged.
    dict_channel = getDict_docchannel()

    # Boilerplate words removed from the title to build "公告标题_refine".
    title_noise = re.compile(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|合同')

    def getData(df_data, rows, set_line):
        """Flatten search ``rows`` and append them column-wise to ``df_data``.

        ``set_line`` is only used by the disabled de-duplication below.
        """
        list_data = getRow_ots(rows)
        for _dict in list_data:
            item = {}
            set_dict_item(item,"docids",_dict.get("docids",""))
            set_dict_item(item,"项目名称",_dict.get("project_name",""))
            set_dict_item(item,"项目编号",_dict.get("project_code",""))
            # set_dict_item(item,"公告标题",_dict.get("doctitle",""))
            # set_dict_item(item,"公告类别",dict_channel.get(_dict.get("docchannel",""),""))
            set_dict_item(item,"省份",_dict.get("province",""))
            # item["区域"] = "%s-%s-%s"%(_dict.get("province",""),_dict.get("city",""),_dict.get("district",""))
            set_dict_item(item,"城市",_dict.get("city",""))
            set_dict_item(item,"发布时间",_dict.get("page_time",""))
            set_dict_item(item,"公告标题_refine",title_noise.sub('', _dict.get("doctitle","")))
            set_dict_item(item,"招标单位",_dict.get("tenderee",""))
            set_dict_item(item,"招标联系人",_dict.get("tenderee_contact",""))
            set_dict_item(item,"招标联系人电话",_dict.get("tenderee_phone",""))
            set_dict_item(item,"代理单位",_dict.get("agency",""))
            set_dict_item(item,"代理联系人",_dict.get("agency_contact",""))
            set_dict_item(item,"代理联系人电话",_dict.get("agency_phone",""))
            # set_dict_item(item,"比地招标公告地址","http://www.bidizhaobiao.com/excel_detail.do?code=%s"%(str(aesCipher.encrypt('{"docid":%d}'%_dict.get("docid")))))
            set_dict_item(item,"招标金额",_dict.get("bidding_budget",""))
            set_dict_item(item,"中标金额",_dict.get("win_bid_price",""))
            set_dict_item(item,"中标单位",_dict.get("win_tenderer",""))

            # Per-lot details override the project-level values; amounts are
            # only taken when they parse to a positive number.
            sub_docs_json = _dict.get("sub_docs_json")
            if sub_docs_json is not None:
                for _doc in json.loads(sub_docs_json):
                    if "win_tenderer" in _doc:
                        set_dict_item(item,"中标单位",_doc["win_tenderer"])
                    if "win_tenderee_manager" in _doc:
                        set_dict_item(item,"中标单位联系人",_doc["win_tenderee_manager"])
                    if "win_tenderee_phone" in _doc:
                        set_dict_item(item,"中标单位联系电话",_doc["win_tenderee_phone"])
                    if "win_bid_price" in _doc and float(0 if _doc["win_bid_price"]=="" else _doc["win_bid_price"])>0:
                        set_dict_item(item,"中标金额",_doc["win_bid_price"])
                    if "bidding_budget" in _doc and float(0 if _doc["bidding_budget"]=="" else _doc["bidding_budget"])>0:
                        set_dict_item(item,"招标金额",_doc["bidding_budget"])

            # Every row must carry every column so the per-column lists in
            # df_data stay the same length.
            for _name in ("招标金额","中标金额","中标单位","中标单位联系人","中标单位联系电话"):
                if _name not in item:
                    set_dict_item(item,_name,"")

            # Disabled filters / de-duplication, kept for reference:
            # if item["中标单位"] not in set_enter:
            #     continue
            # _line = "%s-%s-%s-%s-%s-%s"%(item["省份"],item["城市"],item["项目编号"],item["招标单位"],item["招标联系人"],str(item["招标金额"]))
            # if _line in set_line:
            #     continue
            # if item["招标金额"]=="":
            #     continue
            # set_line.add(_line)

            for k, v in item.items():
                df_data.setdefault(k, []).append(v)

    # list_province = ["江西","湖南","四川","安徽"]
    list_province = ["全国"]
    for _province in list_province:
        df_data = {}

        # The three should-queries below are built but not part of bool_query;
        # they are kept as ready-made alternatives for manual query tweaking.
        str_p = '''
        家具
        '''
        # str_p = '''
        # 教育信息化 教学设备 智慧校园 互联网教育
        # '''
        list_prov = re.split(r"\s|、", str_p)
        list_mu = []
        for _p in list_prov:
            if _p.strip() == "":
                continue
            print(_p)
            list_mu.append(MatchPhraseQuery('doctextcon', '%s'%_p.strip()))

        s_tenderee = '教育局、中学、小学'
        list_should_ten = []
        for _p in re.split("、", s_tenderee):
            # Fixed: the original tested `_p.split()==""`, which compares a
            # list with a string and can never be true.
            if _p.strip() == "":
                continue
            list_should_ten.append(WildcardQuery("tenderee","*%s*"%_p.strip()))
            # list_should_ten.append(MatchPhraseQuery('doctextcon', '%s'%_p.strip()))

        list_should_chan = []
        list_should_chan.append(TermQuery("docchannel",101))
        # list_should_chan.append(TermQuery("docchannel",101))
        # list_should_chan.append(TermQuery("docchannel",102))

        should_q1 = BoolQuery(should_queries=list_mu)
        should_q2 = BoolQuery(should_queries=list_should_ten)
        should_q3 = BoolQuery(should_queries=list_should_chan)

        bool_query = BoolQuery(must_queries=[
            generateBoolShouldQuery(["doctextcon"],["家具"],MatchPhraseQuery),
            generateBoolShouldQuery(["province"],["广东","安徽","江苏","浙江","四川","北京"],TermQuery),
            WildcardQuery("win_tenderer","*"),
        ])

        table_name = "project2"
        index_name = "%s_index"%table_name
        columns_to_get = ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED)

        # First page carries the sort; follow-up pages only carry next_token.
        rows, next_token, total_count, is_all_succeed = ots_client.search(
            table_name, index_name,
            SearchQuery(bool_query ,sort=Sort(sorters=[FieldSort("page_time",SortOrder.ASC)]), limit=100, get_total_count=True),
            columns_to_get)
        print(total_count)
        set_line = set()
        _count = len(rows)
        getData(df_data,rows,set_line)
        while next_token:
            print("%d/%d"%(_count,total_count))
            rows, next_token, total_count, is_all_succeed = ots_client.search(
                table_name, index_name,
                SearchQuery(bool_query ,next_token=next_token, limit=100, get_total_count=True),
                columns_to_get)
            getData(df_data,rows,set_line)
            _count += len(rows)
            # if len(df_data[list(df_data.keys())[0]])>=300:
            #     break

        # With zero hits df_data has no columns; the original crashed with a
        # KeyError on df_data["招标单位"] below.
        if not df_data:
            print("no rows matched, nothing to export")
            continue

        # Distinct company names; only consumed by the disabled contact
        # back-fill below.
        set_enterprise = set()
        for _tenderee,_agency,_win_tenderer in zip(df_data["招标单位"],df_data["代理单位"],df_data["中标单位"]):
            set_enterprise.add(_tenderee)
            set_enterprise.add(_agency)
            set_enterprise.add(_win_tenderer)
        set_enterprise.discard("")
        set_enterprise.discard(None)

        # Disabled: fill missing contacts from the enterprise table.
        # dict_enterprise = getDictEnterprise(list(set_enterprise))
        # if len(set_enterprise)>0:
        #     for _i in range(len(df_data["招标单位"])):
        #         _enterprise_name = df_data["招标单位"][_i]
        #         if df_data["招标联系人电话"][_i]=="":
        #             contacts = dict_enterprise.get(_enterprise_name,{}).get("contacts")
        #             if contacts is not None:
        #                 _person,_phone = getOneContact(contacts)
        #                 df_data["招标联系人"][_i] = _person
        #                 df_data["招标联系人电话"][_i] = _phone
        #
        #         _enterprise_name = df_data["代理单位"][_i]
        #         if df_data["代理联系人电话"][_i]=="":
        #             contacts = dict_enterprise.get(_enterprise_name,{}).get("contacts")
        #             if contacts is not None:
        #                 _person,_phone = getOneContact(contacts)
        #                 df_data["代理联系人"][_i] = _person
        #                 df_data["代理联系人电话"][_i] = _phone
        #
        #         _enterprise_name = df_data["中标单位"][_i]
        #         if df_data["中标单位联系电话"][_i]=="":
        #             contacts = dict_enterprise.get(_enterprise_name,{}).get("contacts")
        #             if contacts is not None:
        #                 _person,_phone = getOneContact(contacts)
        #                 df_data["中标单位联系人"][_i] = _person
        #                 df_data["中标单位联系电话"][_i] = _phone

        # print(df_data)
        df1 = pd.DataFrame(df_data)
        df1.to_excel("../data/%s_数据导出.xlsx"%(getCurrent_date('%Y-%m-%d_%H%M%S')),columns=list_df_columns)


def exportProjectWithOneDocid():
    """Export projects of 2021-05-28 that consist of exactly one document.

    For each such project the ``document`` index is searched for other
    documents (status 201-300) whose text contains the project name; up to 10
    candidate docids and the total hit count are attached, and everything is
    written to ``../data/<ts>_未合并.xlsx``.
    """
    ots_client = getConnect_ots()
    list_data = []

    bool_query = BoolQuery(must_queries=[TermQuery("page_time","2021-05-28")])
    columns = ["docids","project_name"]
    columns_to_get = ColumnsToGet(columns,ColumnReturnType.SPECIFIED)

    # First page carries the sort; follow-up pages only carry next_token.
    search_query = SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("page_time",SortOrder.ASC)]),get_total_count=True,limit=100)
    _count = 0
    first_page = True
    while True:
        rows,next_token,total_count,is_all_succeed = ots_client.search(
            "project2","project2_index",search_query,columns_to_get=columns_to_get)
        list_dict = getRow_ots(rows)
        _count += len(list_dict)
        if not first_page:
            print("%d/%d"%(_count,total_count))
        first_page = False
        # Keep only projects whose comma-separated docids hold a single id.
        for _dict in list_dict:
            if len(_dict["docids"].split(","))==1:
                list_data.append(_dict)
        if not next_token:
            break
        search_query = SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True)

    task_queue = queue.Queue()
    for _dict in list_data:
        task_queue.put(_dict)

    def _handle(_dict,result_queue):
        """Attach candidate docids / hit count to ``_dict`` in place."""
        docid = _dict["docids"]
        # A missing or None project name is treated as empty (the original
        # raised inside the worker thread in that case).
        project_name = _dict.get("project_name") or ""
        _dict["candidate"] = []
        _dict["total_count"] = 0
        if len(project_name)>0:
            doc_query = BoolQuery(must_queries=[MatchPhraseQuery("doctextcon",project_name),
                                                RangeQuery("status",201,300,True,True)],
                                  must_not_queries=[TermQuery("docid",docid)])
            rows,next_token,total_count,is_all_succeed = ots_client.search(
                "document","document_index",
                SearchQuery(doc_query,sort=Sort(sorters=[FieldSort("page_time",SortOrder.DESC)]),limit=10,get_total_count=True),
                columns_to_get=ColumnsToGet(["doctitle"],ColumnReturnType.SPECIFIED))
            # NOTE(review): relies on getRow_ots including the "docid" key in
            # each row although only "doctitle" is requested - confirm.
            for _d in getRow_ots(rows):
                _dict["candidate"].append(_d["docid"])
            _dict["total_count"] = total_count

    # The workers mutate the dicts held in list_data, so no result queue is needed.
    mt = MultiThreadHandler(task_queue,_handle,None,30)
    mt.run()

    df_data = {}
    for _d in list_data:
        for k,v in _d.items():
            df_data.setdefault(k, []).append(v)
    df = pd.DataFrame(df_data)
    df.to_excel("../data/%s_未合并.xlsx"%(getCurrent_date("%Y-%m-%d %H%M%S")))


def getPayStaffName():
    """Return paying members keyed by company name.

    Reads ``bxkc.b2c_mall_staff_basic_info`` rows whose MEMBERLEVELID is set and
    not 81, and maps each non-null ``company`` to a dict with the keys
    ``userid``, ``phone``, ``contactname`` and ``aftermarket``. If a company
    occurs more than once, the last fetched row wins.
    """
    conn = getConnection_mysql()
    try:
        cursor = conn.cursor()
        try:
            sql = " select company,userid,phone,contactname,aftermarket from bxkc.b2c_mall_staff_basic_info where MEMBERLEVELID is not null and MEMBERLEVELID <> 81"
            cursor.execute(sql)
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # The original never released the connection.
        conn.close()

    dict_staff = {}
    for company,userid,phone,contactname,aftermarket in rows:
        if company is not None:
            dict_staff[company] = {"userid":userid,"phone":phone,"contactname":contactname,"aftermarket":aftermarket}
    return dict_staff


def exportCompanyByCycleProduct():
    """Export potential customers for recurring tenders.

    Reads ``../data/周期项目识别.csv`` (GBK), and for every row looks up the
    bidders (first/second/third) of the historical documents, enriches them
    with contact and paying-member information and writes the de-duplicated
    result to ``../data/<ts>_周期项目.xlsx``.
    """
    filename = "../data/周期项目识别.csv"
    df = pd.read_csv(filename,encoding='gbk')

    task_queue = queue.Queue()
    result_queue = queue.Queue()
    pool_conn = ConnectorPool(init_num=10,max_num=30,method_init=getConnection_mysql)

    for tenderee,product,last_time,avg_period,min_period,max_period,json_docid in zip(df["tenderee"],df["product"],df["last_time"],df["avg_period"],df["min_period"],df["max_period"],df["json_docid"]):
        task_queue.put({"tenderee":tenderee,"product":product,"last_time":last_time,"avg_period":avg_period,"min_period":min_period,
                        "max_period":max_period,"json_docid":json_docid})

    sstr_staff = getPayStaffName()
    ots_client = getConnect_ots()

    def _comsumer(_dict,result_queue,ots_client,sstr_staff,pool_conn):
        """Emit one result dict per (task row, bidder) pair into ``result_queue``."""
        new_dict = {"招标人":_dict["tenderee"],"产品":_dict["product"],"上次招标":_dict["last_time"],
                    "预计招标范围":"%s-%s"%(timeAdd(_dict["last_time"],_dict["min_period"]),timeAdd(_dict["last_time"],_dict["max_period"])),
                    "周期":_dict["avg_period"],"历史招标":_dict["json_docid"]}
        aint_docid = json.loads(_dict["json_docid"])

        # Contact of the tenderee itself.
        consumed, return_row, next_token = ots_client.get_row("enterprise",[("name",_dict["tenderee"])], ["contacts"], None, 1)
        dict_tmp = getRow_ots_primary(return_row)
        contacts = dict_tmp.get("contacts")
        phone_person,phone_no = getOneContact(contacts)
        new_dict["招标人联系人"] = phone_person
        new_dict["招标人联系电话"] = phone_no

        # Projects containing any of the historical docids.
        aobj_should_q_docid = [TermQuery("docids",int_docid) for int_docid in aint_docid]
        bool_query = BoolQuery(should_queries=aobj_should_q_docid)
        columns = ['win_tenderer','second_tenderer','third_tenderer']
        rows,next_token,total_count,is_all_succeed = ots_client.search("project2","project2_index",
                                                                       SearchQuery(bool_query,limit=100,get_total_count=True),
                                                                       ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
        adict_rows = getRow_ots(rows)
        for dict_row in adict_rows:
            for _k,_company in dict_row.items():
                if _k not in columns or _company is None or _company=="":
                    continue
                _succeed = True
                new_dict1 = dict(new_dict)
                new_dict1["潜在客户"] = _company

                consumed, return_row, next_token = ots_client.get_row("enterprise",[("name",_company)], ["contacts"], None, 1)
                dict_tmp = getRow_ots_primary(return_row)
                contacts = dict_tmp.get("contacts")
                phone_person,phone_no = getOneContact(contacts)
                new_dict1["潜在客户联系人"] = phone_person
                new_dict1["潜在客户联系电话"] = phone_no

                if _company in sstr_staff:
                    company_info = sstr_staff[_company]
                    new_dict1["付费客户"] = "是"
                    conn = pool_conn.getConnector()
                    try:
                        cursor = conn.cursor()
                        # NOTE(review): both statements are built by string
                        # interpolation. The values come from our own staff
                        # table, but parameterized queries would be safer;
                        # mind the literal '%' in date_FORMAT when converting.
                        sql = " select name from bxkc.b2c_mall_staff_basic_info where userid='%s'"%(company_info.get("aftermarket",""))
                        cursor.execute(sql)
                        staff_rows = cursor.fetchall()
                        if len(staff_rows)>0:
                            new_dict1["归属客服"] = staff_rows[0][0]
                        else:
                            new_dict1["归属客服"] = ""
                        new_dict1["付费客户联系人"] = company_info.get("contactname","")
                        new_dict1["付费客户电话"] = company_info.get("phone","")

                        sql = " select date_FORMAT(etiem,\'%Y-%m-%d\') from bxkc.bxkc_member_term where userid='"+company_info.get("userid","")+"' and memberlevelid<>81 order by etiem desc limit 1"
                        cursor.execute(sql)
                        term_rows = cursor.fetchall()
                        if len(term_rows)>0:
                            etime = term_rows[0][0]
                            new_dict1["付费客户到期日"] = etime
                            # "否" (not expired) while the end date is still in the future.
                            if time.mktime(time.strptime(etime,"%Y-%m-%d"))>time.mktime(time.localtime()):
                                new_dict1["付费客户到期"] = "否"
                            else:
                                new_dict1["付费客户到期"] = "是"
                        else:
                            new_dict1["付费客户到期日"] = ""
                            new_dict1["付费客户到期"] = ""
                    except Exception as e:
                        # Best effort: log and drop this record, keep the worker alive.
                        traceback.print_exc()
                        _succeed = False
                    finally:
                        pool_conn.putConnector(conn)
                else:
                    new_dict1["付费客户"] = "否"
                    new_dict1["归属客服"] = ""
                    new_dict1["付费客户联系人"] = ""
                    new_dict1["付费客户电话"] = ""
                    new_dict1["付费客户到期日"] = ""
                    new_dict1["付费客户到期"] = ""

                if _succeed:
                    result_queue.put(new_dict1)

    mt = MultiThreadHandler(task_queue,_comsumer,result_queue,ots_client=ots_client,sstr_staff=sstr_staff,pool_conn=pool_conn,thread_count=30)
    mt.run()

    # Drain the results, keeping the first record per (tenderee, product, customer).
    df_data = {}
    set_staff = set()
    while True:
        try:
            _dict = result_queue.get(timeout=1)
        except queue.Empty:
            # Only an empty queue ends the loop; the original caught every
            # exception here, which hid genuine errors.
            break
        _s = "%s-%s-%s"%(_dict.get("招标人",""),_dict.get("产品",""),_dict.get("潜在客户",""))
        if _s in set_staff:
            continue
        set_staff.add(_s)
        for k,v in _dict.items():
            df_data.setdefault(k, []).append(v)

    df1 = pd.DataFrame(df_data)
    df1.to_excel("../data/%s_周期项目.xlsx"%(getCurrent_date("%Y-%m-%d_%H%M%S")))


def appendCellphones():
    """Unfinished stub: only binds a local path and returns ``None``."""
    file = "../data/"


if __name__=="__main__":
    exportProject_by_pagetime()
    # exportProjectWithOneDocid()
    # exportCompanyByCycleProduct()