#coding:UTF8 import sys import os sys.path.append("../") import pandas as pd from dataSource.source import * import json from utils.multiThread import MultiThreadHandler import queue from utils.Utils import * from dataSource.pool import ConnectorPool import re from tablestore import * import traceback from export.exportUtils import generateBoolShouldQuery,splitIntoList data_path = "../data/" def getCompanys(): list_company = [] keywords = ["环境","生态","再生","回收","环保"] provinces = ["广东"] for _name in keywords: for _prov in provinces: data = make_elasticSearch({ "query": { "bool": { "must": [ { "wildcard": { "name.keyword": "*%s*"%_name } } # , # { # "term": { # "province.keyword": "%s"%_prov # } # } # , # { # "range": { # "zhongBiaoNumber": { # "gt": "0" # } # } # } ], "must_not": [ ], "should": [ ] } }, "from": 0, "size": 1000000, "sort": [ ], "aggs": { } }) print("--",data["hits"]["total"]) for item in data["hits"]["hits"]: _company = {"enterprise_name":"","regCapital":"","legal_person":"","phone":"","industry":"","province":""} _company["enterprise_name"] = item["_source"].get("name","") _company["regCapital"] = item["_source"].get("regCapital","") _company["zhongBiaoNumber"] = item["_source"].get("zhongBiaoNumber","0") list_company.append(_company) # data = make_elasticSearch({ # "query": { # "bool": { # "must": [ # { # "wildcard": { # "name.keyword": "*电商*" # } # } # , # { # "term": { # "province.keyword": "北京" # } # } # , # { # "range": { # "zhongBiaoNumber": { # "gt": "0" # } # } # } # ], # "must_not": [ ], # "should": [ ] # } # }, # "from": 0, # "size": 10000, # "sort": [ ], # "aggs": { } # }) # # for item in data["hits"]["hits"]: # _company = {"enterprise_name":"","regCapital":"","legal_person":"","phone":"","industry":"","province":""} # _company["enterprise_name"] = item["_source"].get("name","") # _company["regCapital"] = item["_source"].get("regCapital","") # list_company.append(_company) print(len(list_company)) return list_company def exportFactory(): def _handle(item,result_queue,pool_mongo,pool_neo4j): company_name = item["enterprise_name"] mongo = pool_mongo.getConnector() coll_zb = mongo.enterprise_profile rows = coll_zb.find({"enterprise_name":item["enterprise_name"]},{"enterprise_name":1, "actualCapital":1,"estiblishTime":1,"legal_person":1,"phone":1 }) _flag = False for row in rows: actualCapital = row.get("actualCapital","0") estiblishTime = row.get("estiblishTime","2020-01-01") _captial = re.match("\d+[亿万]+",actualCapital) # if _captial is not None: # if getUnifyMoney(_captial.group())>getUnifyMoney("5000万"): # if estiblishTime<="2015-10-09": item["legal_person"] = row.get("legal_person","") item["phone"] = row.get("phone","") item["actualCapital"] = actualCapital item["estiblishTime"] = row.get("estiblishTime","") _flag = True break if _flag: result_queue.put(item) cql = "MATCH (n:Organization)-[r:ZhongBiaoRelation]->(p:Project) where n.name='%s' RETURN count(p) as _c "%(company_name) graph = pool_neo4j.getConnector() finded = graph.run(cql) data = json.loads(json.dumps(finded.data())) _count = data[0]["_c"] # list_project = [] # for _data in data: # if _count<=3: # if "zhong_biao_page_time" in _data and _data["zhong_biao_page_time"]>"2019-01-01": # if _data["project_name"] is not None: # list_project.append(_data["project_name"]) # _count += 1 item["count"] = _count pool_mongo.putConnector(mongo) pool_neo4j.putConnector(graph) # list_company = getCompanys() list_company = [] filename = "../data/天眼查1(1).xlsx" df1 = pd.read_excel(filename) for item in df1["公司名称"]: list_company.append({"enterprise_name":item,"regCapital":"","legal_person":"","phone":"","industry":"","province":""}) task_queue = queue.Queue() result_queue = queue.Queue() for item in list_company: task_queue.put(item) pool_mongo = ConnectorPool(init_num=10,max_num=50,method_init=getConnect_mongodb) pool_neo4j = ConnectorPool(init_num=10,max_num=50,method_init=getConnect_neo4j) _mult = MultiThreadHandler(task_queue=task_queue,task_handler=_handle,result_queue=result_queue,thread_count=70,pool_mongo=pool_mongo,pool_neo4j=pool_neo4j) _mult.run() list_name = [] list_actualCapital = [] list_estiblishTime = [] list_legal_person = [] list_phone = [] list_zb = [] while(True): try: item = result_queue.get(False) list_name.append(item["enterprise_name"]) list_actualCapital.append(item["actualCapital"]) list_estiblishTime.append(item["estiblishTime"]) list_legal_person.append(item["legal_person"]) list_phone.append(item["phone"]) list_zb.append(item["count"]) except: break df = pd.DataFrame({"公司":list_name,"实缴":list_actualCapital, "注册时间":list_estiblishTime,"联系人":list_legal_person,"联系电话":list_phone, "中标次数":list_zb}) df.to_excel("%s"%filename+"_export.xlsx",columns=["公司","实缴","注册时间","联系人","联系电话","中标次数"]) def deal(): def _handle(item,result_queue): graph = getConnect_neo4j() company_name = item["enterprise_name"] cql = "MATCH (n:Organization)-[r:ZhongBiaoRelation]->(p:Project) where n.name='%s' RETURN p.zhong_biao_page_time as zhong_biao_page_time,p.project_name as project_name order by p.zhong_biao_page_time desc limit 3"%(company_name) finded = graph.run(cql) data = json.loads(json.dumps(finded.data())) _count = 1 list_project = [] for _data in data: if _count<=3: if "zhong_biao_page_time" in _data and _data["zhong_biao_page_time"]>"2019-01-01": list_project.append(_data["project_name"]) _count += 1 item["project"] = str(list_project) result_queue.put(item) file = "../data/北京行业_export.xls" df = pd.read_excel(file) list_company = [] for _company,rep,industry,project,count,person,phone in zip(df["公司名字"],df["注册资金"],df["行业"],df["中标项目"],df["中标次数"],df["联系人"],df["联系电话"]): list_company.append({"enterprise_name":_company,"regCapital":rep,"legal_person":person,"phone":phone,"industry":industry,"province":"","count":count}) task_queue = queue.Queue() result_queue = queue.Queue() for item in list_company: task_queue.put(item) _mult = MultiThreadHandler(task_queue=task_queue,task_handler=_handle,result_queue=result_queue,thread_count=30) _mult.run() list_name = [] list_regCapital = [] list_industry = [] list_count = [] list_person = [] list_phone = [] list_project = [] while(True): try: _result = result_queue.get(False) list_name.append(_result["enterprise_name"]) list_regCapital.append(_result["regCapital"]) list_industry.append(_result["industry"]) list_count.append(_result["count"]) list_person.append(_result["legal_person"]) list_phone.append(_result["phone"]) list_project.append(_result["project"]) except Exception as e: print(e) break df1 = pd.DataFrame({"公司名字":list_name,"注册资金":list_regCapital,"行业":list_industry,"中标项目":list_project,"中标次数":list_count,"联系人":list_person,"联系电话":list_phone}) df1.to_excel("%s_export1.xls"%("北京行业"),columns=["公司名字","注册资金","行业","中标项目","中标次数","联系人","联系电话"]) def deal1(): def _handle(item,result_queue): graph = getConnect_neo4j() company_name = item["enterprise_name"] cql = "MATCH (n:Organization)-[r:ZhongBiaoRelation]->(p:Project) where n.name='%s' RETURN p.zhong_biao_page_time as zhong_biao_page_time,p.project_name as project_name order by p.zhong_biao_page_time desc "%(company_name) finded = graph.run(cql) data = json.loads(json.dumps(finded.data())) _count = 0 list_project = [] for _data in data: if _count<=2: if "zhong_biao_page_time" in _data and _data["zhong_biao_page_time"]>"2019-01-01": list_project.append(_data["project_name"]) _count += 1 item["count"] = _count item["project"] = str(list_project) cql = "MATCH (n:Organization)-[r:ZhongBiaoRelation]->(p:Project) where n.name='%s' RETURN r.price"%(company_name) print(cql) finded = graph.run(cql) finded_money = json.loads(json.dumps(finded.data())) whole_money = 0 for _item in finded_money: if _item["r.price"] is not None: whole_money += getUnifyMoney(_item["r.price"]) item["whole_money"] = str(whole_money) result_queue.put(item) # filename = "数据导出需求9.11(1)(1).xlsx" filename = "../data/新建 XLSX 工作表(1).xlsx" df = pd.read_excel(filename) list_company = [] for _key in df.keys(): print(_key,len(df[_key])) for _company in df["公司名称"]: list_company.append({"enterprise_name":_company,"regCapital":"","legal_person":"","phone":"","industry":"","province":"","count":0}) task_queue = queue.Queue() result_queue = queue.Queue() for item in list_company: task_queue.put(item) _mult = MultiThreadHandler(task_queue=task_queue,task_handler=_handle,result_queue=result_queue,thread_count=30) _mult.run() _dict_item = {} while(True): try: item = result_queue.get(False) if item["enterprise_name"]!="": _dict_item[item["enterprise_name"]] = item except Exception as e: print(str(e)) break list_count = [] list_project = [] list_money = [] list_zb = [] for _company in df["公司名称"]: if _company in _dict_item: list_count.append(_dict_item[_company]["count"]) list_project.append(_dict_item[_company]["project"]) list_money.append(_dict_item[_company]["whole_money"]) list_zb.append("是" if _dict_item[_company]["count"]>0 else "否") else: print(_company) list_count.append(0) list_project.append("") list_money.append("0") list_zb.append("否") print(len(list_count),len(list_project),len(list_money),len(list_zb)) df2 = pd.DataFrame({"公司名称":df["公司名称"],"次数":list_count}) df2.to_excel("%s_export.xls"%filename) # df1 = pd.DataFrame({"月份":df["月份"],"电话":df["电话"],"公司名字":df["公司名字"],"开通时间":df["开通时间"], # "到期时间":df["到期时间"],"客户公司注册时间":df["客户公司注册时间"],"客户公司注册资金":df["客户公司注册资金"], # "实际缴费资金":df["实际缴费资金"],"天眼查行业分类":df["天眼查行业分类"],"是否中标":list_zb, # "中标次数":list_count,"中标项目|3个":list_project,"中标金额":list_money,"客户设置关键词":df["客户设置关键词"],"客户搜索词":df["客户搜索词"].xls}) # df1.to_excel("%s_补充.xls"%filename,columns=["月份","电话","公司名字", "开通时间" ,"到期时间" ,"客户公司注册时间" ,"客户公司注册资金" ,"实际缴费资金" ,"天眼查行业分类" ,"是否中标" ,"中标次数" ,"中标项目|3个" ,"中标金额" ,"客户设置关键词" ,"客户搜索词"]) def deal3(): filename = "../data/导出工厂.xlsx" df = pd.DataFrame(filename) count = 0 for item in df["实缴"]: if getUnifyMoney(item)>getUnifyMoney("5000万"): count += 1 print(count) def exportEnterpriseByName(): df = pd.read_csv("../data/中标家具公司.csv",encoding="GBK") def _handle(item,result_queue,pool_ots): ots_client = pool_ots.getConnector() primary_key = [('name',str(item["name"]))] columns_to_get = ["reg_capital","actual_capital","contacts","industry","estiblish_time","social_staff_num","business_scope","zhong_biao_number"] consumed, return_row, next_token = ots_client.get_row("enterprise",primary_key, columns_to_get, None, 1) print(return_row) for _item in return_row.attribute_columns: if _item[0]=="contacts": a = json.loads(_item[1]) for i in a: if i.get("mobile_no","")==item["phone"] or i.get("phone_no","")==item["phone"]: item["contact_person"] = i.get("contact_person","") else: item[_item[0]] = _item[1] list_dict = [] for name,phone in zip(df["name"],df["phone"]): list_dict.append({"name":name,"phone":phone}) task_queue = queue.Queue() for item in list_dict: task_queue.put(item) result_queue = queue.Queue() pool_ots = ConnectorPool(init_num=10,max_num=30,method_init=getConnect_ots) mt = MultiThreadHandler(task_queue=task_queue,task_handler=_handle,result_queue=result_queue,thread_count=70,pool_ots=pool_ots) mt.run() columns = ["name","contact_person","phone","reg_capital","actual_capital","industry","estiblish_time","social_staff_num","business_scope","zhong_biao_number"] df_data = {} for _c in columns: df_data[_c] = [] for item in list_dict: for _key in columns: df_data[_key].append(item.get(_key,"")) df1 = pd.DataFrame(df_data) df1.to_csv("中标家具公司1.csv") def getCompanys(): conn = getConnection_mysql() cursor = conn.cursor() sql = '''select C.login_id as 登陆名,B.company ,B.contactname as 联系人,B.phone as 联系电话 ,(select MLEVELNAME from sys_memberlevel where id =A.memberlevelid) as 会员等级,( select name from b2c_mall_staff_basic_info where userid=B.aftermarket) as 售后客服 from bxkc.bxkc_member_term A,bxkc.b2c_mall_staff_basic_info B,bxkc.b2c_user_login_info C where A.USERID=B.USERID and B.USERID=C.USERID and B.innerOrg like '广州%' and A.memberlevelid!=81 and A.status='01' and str_to_date('2020-11-20','%Y-%m-%d') between A.stime and A.etiem ; ''' cursor.execute(sql) vol = cursor.description list_company = [] rows = cursor.fetchall() for row in rows: _company = {} for _vol,_value in zip(vol,row): _name = _vol[0] _company[_name] = _value list_company.append(_company) return list_company def exportEnterprise_byindustry(page_time, columns = ["name","address","business_scope","province","city","district","reg_capital","phone","estiblish_time"], keywords = ["钢材","水泥","五金","水电","暖通","暖气","电缆"]): list_should_q = [] for _key in keywords: list_should_q.append(WildcardQuery("industry","*%s*"%_key)) list_should_q.append(WildcardQuery("nicknames","*%s*"%_key)) key_query = BoolQuery(should_queries=list_should_q) #WildcardQuery("industry","*建筑*") ots_client = getConnect_ots() bool_query = BoolQuery(must_queries=[RangeQuery("bidi_id",0,include_lower=True), key_query, RangeQuery("estiblish_time",range_to="2017-01-01")]) rows, next_token, total_count, is_all_succeed = ots_client.search("enterprise", "enterprise_index", SearchQuery(bool_query, limit=100, get_total_count=True), ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED)) all_rows = 0 df_data = {} for key in columns: df_data[key] = [] for row in rows: _dict = dict() for part in row: for item in part: _dict[item[0]] = item[1] for key in columns: df_data[key].append(_dict.get(key,"")) # if "reg_capital" in _dict: # _money = re.match("\d+[万亿千百十]",_dict["reg_capital"]) # if _money is not None: # if getUnifyMoney(_money.group())>2000000: # for key in columns: # df_data[key].append(_dict.get(key,"")) all_rows += len(rows) # print(next_token) while(next_token): rows, next_token, total_count, is_all_succeed = ots_client.search("enterprise", "enterprise_index", SearchQuery(bool_query, next_token=next_token,limit=100, get_total_count=True), ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED)) for row in rows: _dict = dict() for part in row: for item in part: _dict[item[0]] = item[1] for key in columns: df_data[key].append(_dict.get(key,"")) # if "reg_capital" in _dict: # _money = re.match("\d+[万亿千百十]",_dict["reg_capital"]) # if _money is not None: # if getUnifyMoney(_money.group())>2000000: # for key in columns: # df_data[key].append(_dict.get(key,"")) all_rows += len(rows) print(all_rows,total_count,len(df_data[columns[0]])) df = pd.DataFrame(df_data) df.to_csv("../data/enterprise_2017_a.csv",columns=columns) def getTyc_company(): root_path = ["G:/文档/tyc国企","G:/文档/tyc机构"] list_files = [] for _path in root_path: for file in os.listdir(_path): list_files.append(os.path.join(_path,file)) list_files = ["G:/文档/tyc机构\\高级搜索导出数据结果—自定义条件—天眼查(W20011656561610789770227).xlsx"] pool_mysql = ConnectorPool(method_init=getConnection_testmysql,init_num=10,max_num=30) task_queue = queue.Queue() result_queue = queue.Queue() for _file in list_files: task_queue.put(_file) def _handle(_file,task_queue,pool_mysql): print("handle",_file) conn = pool_mysql.getConnector() cursor = conn.cursor() df = pd.read_excel(_file,header=2) for name,social_credit,identification,regist_num,organization_code in zip(df["公司名称"],df["统一社会信用代码"],df["纳税人识别号"],df["注册号"],df["组织机构代码"]): try: sql = " insert into Enterprise(name,social_credit,identification,regist_num,organization_code) values ('%s','%s','%s','%s','%s')"%(name,social_credit,identification,regist_num,organization_code) cursor.execute(sql) except Exception as e: print("error") conn.commit() pool_mysql.putConnector(conn) mt = MultiThreadHandler(task_queue,_handle,result_queue,20,pool_mysql=pool_mysql) mt.run() set_columns = set() list_df_columns = [] def set_dict_item(_dict,name,v): _dict[name] = getLegal_str(v) if name not in set_columns: set_columns.add(name) list_df_columns.append(getLegal_str(name)) def exportEnterprise_by_bidNum(): columns = ["name","contacts","province","city","address","reg_location"] list_data = [] ots_client = getConnect_ots() bool_query = BoolQuery(must_not_queries=[ ExistsQuery("tyc_id"), RangeQuery("bid_number",1), RangeQuery("status",401,451), BoolQuery(should_queries=[NestedQuery("contacts",ExistsQuery("contacts.phone_no")), NestedQuery("contacts",ExistsQuery("contacts.mobile_no"))]) ]) for _prov in ["北京","天津"]: bool_query = BoolQuery(must_queries=[BoolQuery(should_queries=[TermQuery("province",_prov)]), BoolQuery(should_queries=[MatchPhraseQuery("nicknames","地产"),MatchPhraseQuery("nicknames","酒店")]), NestedQuery("contacts",WildcardQuery("contacts.mobile_no","1*"))]) # # bool_query = BoolQuery(must_queries=[MatchPhraseQuery("nicknames","物资回收"), # TermQuery("province","贵州")] # ,must_not_queries=[ExistsQuery("tyc_id"),NestedQuery("contacts",ExistsQuery("contacts"))] # ) rows, next_token, total_count, is_all_succeed = ots_client.search("enterprise", "enterprise_index", SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("tyc_id",SortOrder.ASC)]), limit=100, get_total_count=True), ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED)) def getData(df_data,rows): list_dict = getRow_ots(rows) for _dict in list_dict: print(_dict) for mobile_person,mobile_no in getMobiles(_dict.get("contacts","[]")): # for contact_person,mobile_no in getMobiles(_dict.get("contacts","[{}]")): _d = {} set_dict_item(_d,"名称",_dict.get("name","")) set_dict_item(_d,"省份",_dict.get("province","")) set_dict_item(_d,"城市",_dict.get("city","")) set_dict_item(_d,"联系人",mobile_person) set_dict_item(_d,"手机",mobile_no) list_data.append(_d) # _d = {} # set_dict_item(_d,"名称",_dict.get("name","")) # set_dict_item(_d,"省份",_dict.get("province","")) # set_dict_item(_d,"城市",_dict.get("city","")) # list_data.append(_d) # mobile_person,mobile_no = getOneContact(_dict.get("contacts")) # if mobile_no!="": # set_dict_item(_d,"联系人",mobile_person) # set_dict_item(_d,"手机",mobile_no) # # _address = _dict.get("address","") # # reg_location = _dict.get("reg_location","") # # if _address=="": # # _address = reg_location # # set_dict_item(_d,"地址",_address) # list_data.append(_d) getData(df_data,rows) _count = len(rows) while(next_token): print("%d/%d"%(_count,total_count)) rows, next_token, total_count, is_all_succeed = ots_client.search("enterprise", "enterprise_index", SearchQuery(bool_query, next_token=next_token,limit=100, get_total_count=True), ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED)) getData(df_data,rows) _count += len(rows) if _count>=300: break df_data = {} for item in list_data: for k in list_df_columns: if k not in df_data: df_data[k] = [] df_data[k].append(item.get(k)) df = pd.DataFrame(df_data) df.to_excel("../data/%s_enterprise_bidinum.xlsx"%getCurrent_date("%Y-%m-%d_%H%M%S"),columns=list_df_columns) def make_Legal_enterprise(): import codecs def format(_e): if _e is None: return None if not isinstance(_e,str): return None if re.search("^[a-zA-Z0-9]+$",_e) is not None: return None if re.search("[<《]>-。\-\.\?]",_e) is not None: return None _e1 = re.sub("\s+","",_e.replace("(","(").replace(")",")")) if re.search("[省市区县乡镇]$",_e) is not None: return None if len(_e1)>=4: return _e1 return None set_enterprise = set() df = pd.read_csv("../data/other/enterprise_bidinum.csv", encoding="GBK") _count = 0 for _e in df["name"]: _count += 1 if _count%10000==0: print(_count) _e1 = format(_e) if _e1 is not None: set_enterprise.add(_e1) conn = getConnection_testmysql() cursor = conn.cursor() sql = " select name from Enterprise " cursor.execute(sql) rows = cursor.fetchmany(10000) while rows: for row in rows: _count += 1 if _count%10000==0: print(_count) _e = row[0] _e1 = format(_e) if _e1 is not None: set_enterprise.add(_e1) rows = cursor.fetchmany(10000) with codecs.open("../data/other/LEGAL_ENTERPRISE.txt", "w", encoding="UTF8") as f: for _e in list(set_enterprise): f.write(_e+"\n") def getDictEnterprise(list_enterprise,columns_to_get = ["reg_capital","actual_capital","industry","estiblish_time","social_staff_num","zhong_biao_number","tou_biao_number","credit_code"]): task_queue = queue.Queue() result_queue= queue.Queue() for _enterprise in list_enterprise: task_queue.put(str(_enterprise)) def _handle(item,result_queue,pool_ots): ots_client = pool_ots.getConnector() try: primary_key = [("name",item)] consumed,return_row,next_token = ots_client.get_row("enterprise",primary_key,columns_to_get,None,1) dict_data = getRow_ots_primary(return_row) if dict_data is not None: result_queue.put({item:dict_data}) except Exception as e: traceback.print_exc() pool_ots.putConnector(ots_client) pool_ots = ConnectorPool(init_num=10,max_num=50,method_init=getConnect_ots) mt = MultiThreadHandler(task_queue=task_queue,task_handler=_handle,result_queue=result_queue,thread_count=50,pool_ots=pool_ots) mt.run() dict_enterprise = {} while True: try: _dict = result_queue.get(False) for k,v in _dict.items(): dict_enterprise[k] = v except Exception as e: break return dict_enterprise def getOneContact(contacts,tojson=True,mobile_first=True,mobile_only=True): mobile_person = "" mobile_no = '' phone_person = "" phone_no = '' if contacts is None: return "","" try: if tojson: list_contacts = json.loads(contacts) else: list_contacts = contacts for _contact in list_contacts: if _contact.get("mobile_no","")!="": mobile_person = _contact.get("contact_person","") mobile_no = _contact.get("mobile_no","") if _contact.get("phone_no","")!="": phone_person = _contact.get("contact_person","") phone_no = _contact.get("phone_no","") if mobile_first: if mobile_no!="": return mobile_person,mobile_no else: if mobile_only: return mobile_person,mobile_no except Exception as e: pass return phone_person,phone_no def getMobiles(contacts,to_json=True): if to_json: list_contacts = json.loads(contacts) else: list_contacts = contacts list_result = [] for _c in list_contacts: if _c.get("mobile_no","")!="": list_result.append([_c.get("contact_person",""),_c.get("mobile_no")]) return list_result def getEnterpriseData(list_enterprise,df_data): def getEnterpriseData(list_enterprise,df_data): for _e in list_enterprise: _dict = {} set_dict_item(_dict,"公司名称",_e.get("name")) set_dict_item(_dict,"省份",_e.get("province")) set_dict_item(_dict,"城市",_e.get("city")) set_dict_item(_dict,"法人",_e.get("legal_person")) set_dict_item(_dict,"法人电话",_e.get("phone")) _match = re.search("^1\d{10}",_e.get("phone","")) set_dict_item(_dict,"是否手机","是" if _match is not None else "否") # set_dict_item(_dict,"企业属性",v.get("business_scope","")) # set_dict_item(_dict,"行业",v.get("industry","")) # contact_person,mobile_no = getOneContact(v.get("contacts",'[]')) # set_dict_item(_dict,"所有联系方式",v.get("contacts")) # set_dict_item(_dict,"联系人",contact_person) # set_dict_item(_dict,"手机号",mobile_no) # set_dict_item(_dict,"注册时间",v.get("estiblish_time","")) # set_dict_item(_dict,"注册资金",v.get("reg_capital","")) # set_dict_item(_dict,"bid_number",v.get("bid_number",0)) # set_dict_item(_dict,"招标次数",v.get("zhao_biao_number",0)) # set_dict_item(_dict,"投标次数",v.get("tou_biao_number",0)) # set_dict_item(_dict,"中标次数",v.get("zhong_biao_number",0)) # set_dict_item(_dict,"主营产品",v.get("products","")) for k,v in _dict.items(): if k not in df_data: df_data[k] = [] df_data[k].append(v) def exportEnterprise(): def getEnterpriseData(list_enterprise,df_data): for _e in list_enterprise: _dict = {} set_dict_item(_dict,"公司名称",_e.get("name")) bool_query = BoolQuery(must_queries=[ TermQuery("enterprise_name",_e.get("name")) ]) rows,next_token,total_count,is_all_succeed = ots_client.search("enterprise_contact","enterprise_contact_index", SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("score",SortOrder.DESC)]),limit=5,get_total_count=False), ColumnsToGet(["contact_person","position","phone_no"],ColumnReturnType.SPECIFIED)) list_row = getRow_ots(rows) for _i in range(1,6): if _i-1=200: break # dict_enterprise = getDictEnterprise(data[_name_c][:1050000], df_data = {} getEnterpriseData(list_enterprise,df_data) df = pd.DataFrame(df_data) df.to_excel("../data/%s企业导出.xlsx"%getCurrent_date("%Y-%m-%d_%H%M%S"),columns=list_df_columns) import numpy as np def exportEnterprise_by_phone(): ots_client = getConnect_ots() filename = "C:\\Users\\Administrator\\Desktop\\用户数据0910.xlsx" df = pd.read_excel(filename) astr_phone = df["手机"] all_count = 0 _begin = 0 int_count = 0 while _begin<5582: # should_q = [] # print("-=") # for str_phone,str_enter,int_throw,int_search in zip(astr_phone[_begin:_begin+100],df["公司名称"][_begin:_begin+100],df["浏览条数"][_begin:_begin+100],df["搜索次数"][_begin:_begin+100]): # if str(str_phone) !="nan" and str(str_enter)!="nan" and str(int_search)=="nan" and str(int_throw)!="nan": # int_count += 1 # print(str_phone,str_enter,int_throw,int_search) # _phone = str(int(str_phone)) # # should_q.append(NestedQuery("contacts",TermQuery("contacts.mobile_no",_phone))) # should_q.append(MatchPhraseQuery("nicknames",str(str_enter))) # _begin += 100 # if should_q: # bool_query = BoolQuery(should_queries=should_q) # rows,next_token,total_count,is_all_succeed = ots_client.search("enterprise","enterprise_index", # SearchQuery(bool_query,get_total_count=True), # columns_to_get=ColumnsToGet(["nicknames"],ColumnReturnType.SPECIFIED)) try: str_enter = str(df["公司名称"][_begin]) consumed, return_row, next_token = ots_client.get_row("enterprise",[('name',str_enter)], ["nicknames"], None, 1) rows = getRow_ots_primary(return_row) total_count = len(rows) _begin += 1 int_count += 1 if total_count>0: all_count += total_count print("===",str_enter,int_count,all_count) except Exception as e: pass print("===",int_count,all_count) def attachColumn(): filename = "../data/中标单位.xlsx" list_data = {} list_enterprise = [] df1 = pd.read_excel(filename) for _name in df1["中标单位"]: list_enterprise.append(_name) d_e = getDictEnterprise(list_enterprise,["legal_person","phone"]) df_data = {} columns = ["name","legal_person","phone"] for _name in list_enterprise: for _c in columns: if _c not in df_data: df_data[_c] = [] df_data[_c].append(d_e.get(_name).get(_c)) df = pd.DataFrame(df_data) df.to_excel("%s.attach.xlsx"%(filename)) def transform_enterprise(): conn_source = getConnection_testmysql() conn_target = getConnection_oracle() cursor_source = conn_source.cursor() cursor_target = conn_target.cursor() sql = " select name,province,city,credit_code,org_number,tax_number from enterprise_build " cursor_source.execute(sql) rows_source = cursor_source.fetchmany(10) excepted = False _index = 0 while True: try: if excepted: print("==") for _r in rows_source: _sql = " insert into BXKC.COMPANY_NAME_INFO(COMPANY_NAME,PROVINCE,CITY,TAX_NUM,ORG_NUM,CREDIT_CODE) values ('%s','%s','%s','%s','%s','%s')"%(_r[0],_r[1],_r[2],_r[5],_r[4],_r[3]) _sql = _sql.replace("None","") cursor_target.execute(_sql) conn_target.commit() excepted = False else: _sql = " INSERT ALL" for _r in rows_source: _sql += " into BXKC.COMPANY_NAME_INFO(COMPANY_NAME,PROVINCE,CITY,TAX_NUM,ORG_NUM,CREDIT_CODE) values ('%s','%s','%s','%s','%s','%s') "%(_r[0],_r[1],_r[2],_r[5],_r[4],_r[3]) _sql = _sql +" select 1 from dual " _sql = _sql.replace("None","") cursor_target.execute(_sql) conn_target.commit() excepted = False except Exception as e: excepted = True traceback.print_exc() rows_source = cursor_source.fetchmany(1000) _index += 1 print(_index,excepted) if not rows_source or len(rows_source)==0: break def exportEnterprise_GMV(): task_queue = queue.Queue() ots_client = getConnect_ots() bool_query = BoolQuery(must_queries=[ RangeQuery("zhong_biao_number",20,100) ]) rows,next_token,total_count,is_all_succeed = ots_client.search("enterprise","enterprise_index", SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("zhong_biao_number")]),limit=100,get_total_count=True), ColumnsToGet(["zhao_biao_number"],ColumnReturnType.SPECIFIED)) list_dict = getRow_ots(rows) for _dict in list_dict: task_queue.put(_dict) while next_token: rows,next_token,total_count,is_all_succeed = ots_client.search("enterprise","enterprise_index", SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True), ColumnsToGet(["zhao_biao_number"],ColumnReturnType.SPECIFIED)) list_dict = getRow_ots(rows) for _dict in list_dict: task_queue.put(_dict) if task_queue.qsize()>=10000: break def _handle(_dict,result_queue,ots_client): name = _dict.get("name") bool_query = BoolQuery(must_queries=[ RangeQuery("page_time","2020-01-01","2021-12-31",True,True), TermQuery("win_tenderer",name) ]) rows,next_token,total_count,is_all_succeed = ots_client.search("project2","project2_index", SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("page_time",SortOrder.DESC)]),limit=100,get_total_count=True), ColumnsToGet(["page_time","win_bid_price"],ColumnReturnType.SPECIFIED)) list_rows = getRow_ots(rows) _dict["c3"] = 0 _dict["c6"] = 0 _dict["c12"] = 0 _dict["c24"] = 0 for _row in list_rows: page_time = _row.get("page_time") win_bid_price = _row.get("win_bid_price",0) if page_time>="2021-10-01": _dict["c3"] += win_bid_price _dict["c6"] += win_bid_price _dict["c12"] += win_bid_price _dict["c24"] += win_bid_price elif page_time>="2021-07-01": _dict["c6"] += win_bid_price _dict["c12"] += win_bid_price _dict["c24"] += win_bid_price elif page_time>="2021-01-01": _dict["c12"] += win_bid_price _dict["c24"] += win_bid_price else: _dict["c24"] += win_bid_price while next_token: ows,next_token,total_count,is_all_succeed = ots_client.search("project2","project2_index", SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("page_time",SortOrder.DESC)]),limit=100,get_total_count=True), ColumnsToGet(["page_time","win_bid_price"],ColumnReturnType.SPECIFIED)) list_rows = getRow_ots(rows) for _row in list_rows: page_time = _row.get("page_time") win_bid_price = _row.get("win_bid_price",0) if page_time>="2021-10-01": _dict["c3"] += win_bid_price _dict["c6"] += win_bid_price _dict["c12"] += win_bid_price _dict["c24"] += win_bid_price elif page_time>="2021-07-01": _dict["c6"] += win_bid_price _dict["c12"] += win_bid_price _dict["c24"] += win_bid_price elif page_time>="2021-01-01": _dict["c12"] += win_bid_price _dict["c24"] += win_bid_price else: _dict["c24"] += win_bid_price result_queue.put(_dict) result_queue = queue.Queue() mt = MultiThreadHandler(task_queue,_handle,result_queue,30,ots_client=ots_client) mt.run() list_item = [] while True: try: _dict = result_queue.get(False) list_item.append(_dict) except Exception as e: break df_data = {"公司名称":[], "近3个月营收":[], "近6个月营收":[], "近12个月营收":[], "近24个月营收":[]} for _dict in list_item: df_data["公司名称"].append(_dict.get("name")) df_data["近3个月营收"].append(_dict.get("c3")) df_data["近6个月营收"].append(_dict.get("c6")) df_data["近12个月营收"].append(_dict.get("c12")) df_data["近24个月营收"].append(_dict.get("c24")) df = pd.DataFrame(df_data) df.to_excel("蚂蚁测试数据.xlsx",columns=["公司名称","近3个月营收","近6个月营收","近12个月营收","近24个月营收"]) def attachColumn1(): filename = "全国剩下数据16570-1(2).xlsx" df = pd.read_excel(filename) list_enter = list(set(df["公司名"])) dict_en = getDictEnterprise(list_enter) list_zhongbiao = [] for company in df["公司名"]: _zb = dict_en.get(company,{}).get("zhong_biao_number",0) if _zb>0: _c = "是" else: _c = "否" list_zhongbiao.append(_c) df["是否中标"] = list_zhongbiao df.to_excel("全国剩下数据16570-1(2)11.xlsx") def exportContact(): filename = "../data/2023-03-06_190109_to_excel.xlsx" df = pd.read_excel(filename) list_ename = df["_id"] list_dict = [] for _en in list_ename: if isinstance(_en,(str)) and _en!="": _dict = {"enterprise_name":_en} list_dict.append(_dict) task_queue = queue.Queue() for _d in list_dict: task_queue.put(_d) ots_client = getConnect_ots() def _handle(_d,result_queue): _name = _d["enterprise_name"] bool_query = BoolQuery(must_queries=[TermQuery("name",_name)]) rows,next_token,total_count,is_all_succeed = ots_client.search("enterprise","enterprise_index", SearchQuery(bool_query,limit=1), columns_to_get=ColumnsToGet(["reg_location"],return_type=ColumnReturnType.SPECIFIED)) l_data = getRow_ots(rows) if len(l_data)>0: _d.update(l_data[0]) bool_query = BoolQuery(must_queries=[TermQuery("enterprise_name",_name), BoolQuery(should_queries=[TermQuery("is_legal_person",1), TermQuery("is_mobile",1)])]) rows,next_token,total_count,is_all_succeed = ots_client.search("enterprise_contact","enterprise_contact_index", SearchQuery(bool_query,limit=5), columns_to_get=ColumnsToGet(["enterprise_name","contact_person","phone_no","position"],return_type=ColumnReturnType.SPECIFIED)) l_data = getRow_ots(rows) if len(l_data)>0: _d.update(l_data[0]) mt = MultiThreadHandler(task_queue,_handle,None,60) mt.run() df_data= {} columns = ["name","contact_person","phone_no","reg_location"] for _d in list_dict: if "phone_no" in _d: for c in columns: if c not in df_data: df_data[c] = [] df_data[c].append(getLegal_str(_d.get(c,""))) df = pd.DataFrame(df_data) df.to_excel("../data/%s_export_enterprise.xlsx"%(getCurrent_date(format="%Y-%m-%d_%H%M%S")),encoding="utf",columns=columns) def getTycCompany(): filename = "公司地址(1).xlsx" df = pd.read_excel(filename) list_name = df["name"] task_queue = queue.Queue() list_data = [] for _i in range(len(list_name)): _name = list_name[_i] _d = {"企业名称":_name, "地址":df["address"][_i], "注册地址":df["reg_location"][_i]} task_queue.put(_d) list_data.append(_d) ots_client = getConnect_ots() columns = ["legal_person","phone_number"] def _handle(item,result_queue): try: bool_query = BoolQuery(must_queries=[TermQuery("name",item.get("企业名称"))]) rows, next_token,total_count,is_all_succeed = ots_client.search("enterprise","enterprise_index", SearchQuery(bool_query,limit=1),columns_to_get=ColumnsToGet(column_names=columns,return_type=ColumnReturnType.SPECIFIED)) item["count"] = len(getRow_ots(rows)) if item["count"]==1: _d = getRow_ots(rows)[0] item["法人"] = _d.get("legal_person") item["法人电话"] = _d.get("phone_number") # item["简称"] = _d.get("alias") item["营业状态"] = _d.get("reg_status") bool_query = BoolQuery(must_queries=[ RangeQuery("status",201,301), generateBoolShouldQuery(["doctitle","doctextcon","attachmenttextcon"],[item.get("企业名称")],MatchPhraseQuery) ]) rows, next_token,total_count,is_all_succeed = ots_client.search("document","document_index", SearchQuery(bool_query,limit=1,get_total_count=True),columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE)) item["公告数量"] = total_count bool_query = BoolQuery(must_queries=[ TermQuery("status",1), TermQuery("enterprise_name",item.get("企业名称")), ]) rows, next_token,total_count,is_all_succeed = ots_client.search("enterprise_contact","enterprise_contact_index", SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("score",SortOrder.DESC)]),limit=10,get_total_count=True), columns_to_get=ColumnsToGet(["contact_person","phone_no"],return_type=ColumnReturnType.SPECIFIED)) list_concat = getRow_ots(rows) concat = "" for data_i in range(len(list_concat)): data = list_concat[data_i] concat += "联系人%d%s(%s)\n"%(data_i+1,data.get("contact_person",""),data.get("phone_no","")) item["联系人"] = concat except Exception: traceback.print_exc() mt = MultiThreadHandler(task_queue,_handle,None,30) mt.run() columns = ["企业名称","法人","法人电话","地址","注册地址","公告数量","联系人"] df_data = {} for data in list_data: for c in columns: if c not in df_data: df_data[c] = [] df_data[c].append(data.get(c)) df = pd.DataFrame(df_data) df.to_excel("%s.xlsx"%filename,columns=columns) if __name__=="__main__": # getTyc_company() getTycCompany() # exportEnterprise_by_bidNum() # print(getDictEnterprise(["南宁宏基建筑工程有限责任公司"],["phone"])) # exportEnterprise_by_phone() # make_Legal_enterprise() # transform_enterprise() # exportEnterprise() # exportContact() # attachColumn() # attachColumn() # ots_client = getConnect_ots() # bool_query = BoolQuery(must_queries=[RangeQuery("tyc_id",1,include_lower=True), # RangeQuery("bid_number",4,include_lower=True) # ]) # bool_query = BoolQuery(must_queries=[TermQuery("bid_number",0)], # must_not_queries=[ExistsQuery("tyc_id"),NestedQuery("contacts",ExistsQuery("contacts"))]) # # # columns = ["name","contacts","province","city","address","reg_location"] # rows, next_token, total_count, is_all_succeed = ots_client.search("enterprise", "enterprise_index", # SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("tyc_id",SortOrder.ASC)]), limit=100, get_total_count=True), # ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED)) # print(total_count) # exportEnterprise_GMV()