exportProject.py 21 KB


#encoding:GBK
import sys
import os
import time
sys.path.append("../")
# NOTE: the remaining import order is kept as-is on purpose: several are star
# imports, and a later one may shadow names from an earlier one.
import pandas as pd
from dataSource.source import *
import json
from utils.multiThread import MultiThreadHandler
import queue
from utils.Utils import *
from dataSource.pool import ConnectorPool
import re
from tablestore import *
import traceback
from utils.hashUtil import aesCipher
from export.exportEnterprise import getDictEnterprise,getOneContact
from export.exportUtils import generateBoolShouldQuery
  18. data_path = "../data/"
  19. set_columns = set()
  20. list_df_columns = []
  21. def set_dict_item(_dict,name,v):
  22. _dict[name] = getLegal_str(v)
  23. if name not in set_columns:
  24. set_columns.add(name)
  25. list_df_columns.append(getLegal_str(name))
  26. def getDict_docchannel():
  27. conn = getConnection_mysql()
  28. cursor = conn.cursor()
  29. sql = "select channel_id,chnlname from sys_channel "
  30. cursor.execute(sql)
  31. rows = cursor.fetchall()
  32. _dict = dict()
  33. for row in rows:
  34. _dict[row[0]] = row[1]
  35. return _dict
  36. def exportProject_by_pagetime():
  37. # filename = "../data/重复公告.xlsx"
  38. # df = pd.read_excel(filename)
  39. ots_client = getConnect_ots()
  40. set_enter = set()
  41. str_enter = '''
  42. 成都四方伟业软件股份有限公司
  43. 北京数字冰雹信息技术有限公司
  44. 北京睿呈时代信息科技有限公司
  45. 北京五一视界数字孪生科技股份有限公司
  46. 易达云图(深圳)科技有限公司
  47. 北京优锘科技有限公司
  48. 深圳市鸿普森科技股份有限公司
  49. 厦门图扑软件科技有限公司
  50. 四川相数科技有限公司
  51. '''
  52. for a in re.split("\s+",str_enter):
  53. if a.strip()!="":
  54. set_enter.add(a.strip())
  55. columns = ["docids","doctitle","docchannel","bidway","province","city","district","info_type","page_time","crtime","project_code","tenderee","project_name","agency","sub_docs_json","tenderee_contact","tenderee_phone","doctextcon","product","moneysource","win_bid_price","win_tenderer","bidding_budget"]
  56. columns = ["page_time","province","city","win_tenderer"]
  57. dict_channel = getDict_docchannel()
  58. def getData(df_data,rows,set_line):
  59. list_data = getRow_ots(rows)
  60. for row in list_data:
  61. item = {}
  62. _dict = row
  63. set_dict_item(item,"docids",_dict.get("docids",""))
  64. set_dict_item(item,"项目名称",_dict.get("project_name",""))
  65. set_dict_item(item,"项目编号",_dict.get("project_code",""))
  66. # set_dict_item(item,"公告标题",_dict.get("doctitle",""))
  67. # set_dict_item(item,"公告类别",dict_channel.get(_dict.get("docchannel",""),""))
  68. set_dict_item(item,"省份",_dict.get("province",""))
  69. # item["区域"] = "%s-%s-%s"%(_dict.get("province",""),_dict.get("city",""),_dict.get("district",""))
  70. set_dict_item(item,"城市",_dict.get("city",""))
  71. set_dict_item(item,"发布时间",_dict.get("page_time",""))
  72. set_dict_item(item,"公告标题_refine",re.sub(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|合同', '', _dict.get("doctitle","")))
  73. set_dict_item(item,"招标单位",_dict.get("tenderee",""))
  74. set_dict_item(item,"招标联系人",_dict.get("tenderee_contact",""))
  75. set_dict_item(item,"招标联系人电话",_dict.get("tenderee_phone",""))
  76. set_dict_item(item,"代理单位",_dict.get("agency",""))
  77. set_dict_item(item,"代理联系人",_dict.get("agency_contact",""))
  78. set_dict_item(item,"代理联系人电话",_dict.get("agency_phone",""))
  79. # set_dict_item(item,"比地招标公告地址","http://www.bidizhaobiao.com/excel_detail.do?code=%s"%(str(aesCipher.encrypt('{"docid":%d}'%_dict.get("docid")))))
  80. set_dict_item(item,"招标金额",_dict.get("bidding_budget",""))
  81. set_dict_item(item,"中标金额",_dict.get("win_bid_price",""))
  82. set_dict_item(item,"中标单位",_dict.get("win_tenderer",""))
  83. sub_docs_json = _dict.get("sub_docs_json")
  84. if sub_docs_json is not None:
  85. for _doc in json.loads(sub_docs_json):
  86. if "win_tenderer" in _doc:
  87. set_dict_item(item,"中标单位",_doc["win_tenderer"])
  88. if "win_tenderee_manager" in _doc:
  89. set_dict_item(item,"中标单位联系人",_doc["win_tenderee_manager"])
  90. if "win_tenderee_phone" in _doc:
  91. set_dict_item(item,"中标单位联系电话",_doc["win_tenderee_phone"])
  92. if "win_bid_price" in _doc and float(0 if _doc["win_bid_price"]=="" else _doc["win_bid_price"])>0:
  93. set_dict_item(item,"中标金额",_doc["win_bid_price"])
  94. if "bidding_budget" in _doc and float(0 if _doc["bidding_budget"]=="" else _doc["bidding_budget"])>0:
  95. set_dict_item(item,"招标金额",_doc["bidding_budget"])
  96. if "招标金额" not in item:
  97. set_dict_item(item,"招标金额","")
  98. if "中标金额" not in item:
  99. set_dict_item(item,"中标金额","")
  100. if "中标单位" not in item:
  101. set_dict_item(item,"中标单位","")
  102. if "中标单位联系人" not in item:
  103. set_dict_item(item,"中标单位联系人","")
  104. if "中标单位联系电话" not in item:
  105. set_dict_item(item,"中标单位联系电话","")
  106. # if item["中标单位"] not in set_enter:
  107. # continue
  108. _line = "%s-%s-%s-%s-%s-%s"%(item["省份"],item["城市"],item["项目编号"],item["招标单位"],item["招标联系人"],str(item["招标金额"]))
  109. # if _line in set_line:
  110. # continue
  111. # if item["招标金额"]=="":
  112. # continue
  113. # set_line.add(_line)
  114. for k,v in item.items():
  115. if k not in df_data:
  116. df_data[k] = []
  117. df_data[k].append(v)
  118. # list_province = ["江西","湖南","四川","安徽"]
  119. list_province = ["全国"]
  120. for _province in list_province:
  121. df_data = {}
  122. str_p = '''
  123. 家具
  124. '''
  125. # str_p = '''
  126. # 教育信息化 教学设备 智慧校园 互联网教育
  127. # '''
  128. list_prov = re.split("\s|、",str_p)
  129. list_mu = []
  130. for _p in list_prov:
  131. if _p.strip()=="":
  132. continue
  133. print(_p)
  134. list_mu.append(MatchPhraseQuery('doctextcon', '%s'%_p.strip()))
  135. s_tenderee = '教育局、中学、小学'
  136. list_should_ten = []
  137. for _p in re.split("、",s_tenderee):
  138. if _p.split()=="":
  139. continue
  140. list_should_ten.append(WildcardQuery("tenderee","*%s*"%_p.strip()))
  141. # list_should_ten.append(MatchPhraseQuery('doctextcon', '%s'%_p.strip()))
  142. list_should_chan = []
  143. list_should_chan.append(TermQuery("docchannel",101))
  144. # list_should_chan.append(TermQuery("docchannel",101))
  145. # list_should_chan.append(TermQuery("docchannel",102))
  146. should_q1 = BoolQuery(should_queries=list_mu)
  147. should_q2 = BoolQuery(should_queries=list_should_ten)
  148. should_q3 = BoolQuery(should_queries=list_should_chan)
  149. bool_query = BoolQuery(must_queries=[
  150. generateBoolShouldQuery(["doctextcon"],["家具"],MatchPhraseQuery),
  151. generateBoolShouldQuery(["province"],["广东","安徽","江苏","浙江","四川","北京"],TermQuery),
  152. WildcardQuery("win_tenderer","*"),
  153. ])
  154. table_name = "project2"
  155. rows, next_token, total_count, is_all_succeed = ots_client.search(table_name, "%s_index"%table_name,
  156. SearchQuery(bool_query ,sort=Sort(sorters=[FieldSort("page_time",SortOrder.ASC)]), limit=100, get_total_count=True),
  157. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  158. print(total_count)
  159. set_line = set()
  160. _count = len(rows)
  161. getData(df_data,rows,set_line)
  162. while next_token:
  163. print("%d/%d"%(_count,total_count))
  164. rows, next_token, total_count, is_all_succeed = ots_client.search(table_name, "%s_index"%table_name,
  165. SearchQuery(bool_query ,next_token=next_token, limit=100, get_total_count=True),
  166. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  167. getData(df_data,rows,set_line)
  168. _count += len(rows)
  169. # if len(df_data[list(df_data.keys())[0]])>=300:
  170. # break
  171. set_enterprise = set()
  172. for _tenderee,_agency,_win_tenderer in zip(df_data["招标单位"],df_data["代理单位"],df_data["中标单位"]):
  173. set_enterprise.add(_tenderee)
  174. set_enterprise.add(_agency)
  175. set_enterprise.add(_win_tenderer)
  176. if "" in set_enterprise:
  177. set_enterprise.remove("")
  178. if None in set_enterprise:
  179. set_enterprise.remove(None)
  180. # dict_enterprise = getDictEnterprise(list(set_enterprise))
  181. # if len(set_enterprise)>0:
  182. # for _i in range(len(df_data["招标单位"])):
  183. # _enterprise_name = df_data["招标单位"][_i]
  184. # if df_data["招标联系人电话"][_i]=="":
  185. # contacts = dict_enterprise.get(_enterprise_name,{}).get("contacts")
  186. # if contacts is not None:
  187. # _person,_phone = getOneContact(contacts)
  188. # df_data["招标联系人"][_i] = _person
  189. # df_data["招标联系人电话"][_i] = _phone
  190. #
  191. # _enterprise_name = df_data["代理单位"][_i]
  192. # if df_data["代理联系人电话"][_i]=="":
  193. # contacts = dict_enterprise.get(_enterprise_name,{}).get("contacts")
  194. # if contacts is not None:
  195. # _person,_phone = getOneContact(contacts)
  196. # df_data["代理联系人"][_i] = _person
  197. # df_data["代理联系人电话"][_i] = _phone
  198. #
  199. # _enterprise_name = df_data["中标单位"][_i]
  200. # if df_data["中标单位联系电话"][_i]=="":
  201. # contacts = dict_enterprise.get(_enterprise_name,{}).get("contacts")
  202. # if contacts is not None:
  203. # _person,_phone = getOneContact(contacts)
  204. # df_data["中标单位联系人"][_i] = _person
  205. # df_data["中标单位联系电话"][_i] = _phone
  206. # print(df_data)
  207. df1 = pd.DataFrame(df_data)
  208. df1.to_excel("../data/%s_数据导出.xlsx"%(getCurrent_date('%Y-%m-%d_%H%M%S')),columns=list_df_columns)
  209. def exportProjectWithOneDocid():
  210. ots_client = getConnect_ots()
  211. list_data = []
  212. bool_query = BoolQuery(must_queries=[TermQuery("page_time","2021-05-28")])
  213. columns = ["docids","project_name"]
  214. rows,next_token,total_count,is_all_succeed = ots_client.search("project2","project2_index",
  215. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("page_time",SortOrder.ASC)]),get_total_count=True,limit=100),
  216. columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
  217. list_dict = getRow_ots(rows)
  218. for _dict in list_dict:
  219. if len(_dict["docids"].split(","))==1:
  220. list_data.append(_dict)
  221. _count = len(list_dict)
  222. while True:
  223. if not next_token:
  224. break
  225. rows,next_token,total_count,is_all_succeed = ots_client.search("project2","project2_index",
  226. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  227. columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
  228. list_dict = getRow_ots(rows)
  229. _count += len(list_dict)
  230. print("%d/%d"%(_count,total_count))
  231. for _dict in list_dict:
  232. if len(_dict["docids"].split(","))==1:
  233. list_data.append(_dict)
  234. _index = 0
  235. task_queue = queue.Queue()
  236. for _dict in list_data:
  237. task_queue.put(_dict)
  238. def _handle(_dict,result_queue):
  239. docid = _dict["docids"]
  240. project_name = _dict["project_name"]
  241. _dict["candidate"] = []
  242. _dict["total_count"] = 0
  243. if len(project_name)>0:
  244. doc_query = BoolQuery(must_queries=[MatchPhraseQuery("doctextcon",project_name)
  245. ,RangeQuery("status",201,300,True,True)],
  246. must_not_queries=[TermQuery("docid",docid)])
  247. rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
  248. SearchQuery(doc_query,sort=Sort(sorters=[FieldSort("page_time",SortOrder.DESC)]),limit=10,get_total_count=True),
  249. columns_to_get=ColumnsToGet(["doctitle"],ColumnReturnType.SPECIFIED))
  250. l_d = getRow_ots(rows)
  251. for _d in l_d:
  252. _dict["candidate"].append(_d["docid"])
  253. _dict["total_count"] = total_count
  254. mt = MultiThreadHandler(task_queue,_handle,None,30)
  255. mt.run()
  256. df_data = {}
  257. for _d in list_data:
  258. for k,v in _d.items():
  259. if k not in df_data:
  260. df_data[k] = []
  261. df_data[k].append(v)
  262. df = pd.DataFrame(df_data)
  263. df.to_excel("../data/%s_未合并.xlsx"%(getCurrent_date("%Y-%m-%d %H%M%S")))
  264. def getPayStaffName():
  265. conn = getConnection_mysql()
  266. cursor = conn.cursor()
  267. sql = " select company,userid,phone,contactname,aftermarket from bxkc.b2c_mall_staff_basic_info where MEMBERLEVELID is not null and MEMBERLEVELID <> 81"
  268. cursor.execute(sql)
  269. rows = cursor.fetchall()
  270. dict_staff = {}
  271. for row in rows:
  272. company,userid,phone,contactname,aftermarket = row
  273. if company is not None:
  274. dict_staff[company] = {"userid":userid,"phone":phone,"contactname":contactname,"aftermarket":aftermarket}
  275. return dict_staff
  276. def exportCompanyByCycleProduct():
  277. filename = "../data/周期项目识别.csv"
  278. df = pd.read_csv(filename,encoding='gbk')
  279. task_queue = queue.Queue()
  280. result_queue = queue.Queue()
  281. pool_conn = ConnectorPool(init_num=10,max_num=30,method_init=getConnection_mysql)
  282. _count = 0
  283. for tenderee,product,last_time,avg_period,min_period,max_period,json_docid in zip(df["tenderee"],df["product"],df["last_time"],df["avg_period"],df["min_period"],df["max_period"],df["json_docid"]):
  284. _dict = {"tenderee":tenderee,"product":product,"last_time":last_time,"avg_period":avg_period,"min_period":min_period,
  285. "max_period":max_period,"json_docid":json_docid}
  286. task_queue.put(_dict)
  287. _count += 1
  288. sstr_staff = getPayStaffName()
  289. ots_client = getConnect_ots()
  290. def _comsumer(_dict,result_queue,ots_client,sstr_staff,pool_conn):
  291. new_dict = {"招标人":_dict["tenderee"],"产品":_dict["product"],"上次招标":_dict["last_time"],
  292. "预计招标范围":"%s-%s"%(timeAdd(_dict["last_time"],_dict["min_period"]),timeAdd(_dict["last_time"],_dict["max_period"])),
  293. "周期":_dict["avg_period"],"历史招标":_dict["json_docid"]}
  294. aint_docid = json.loads(_dict["json_docid"])
  295. aobj_should_q_docid = []
  296. consumed, return_row, next_token = ots_client.get_row("enterprise",[("name",_dict["tenderee"])], ["contacts"], None, 1)
  297. dict_tmp = getRow_ots_primary(return_row)
  298. contacts = dict_tmp.get("contacts")
  299. phone_person,phone_no = getOneContact(contacts)
  300. new_dict["招标人联系人"] = phone_person
  301. new_dict["招标人联系电话"] = phone_no
  302. for int_docid in aint_docid:
  303. aobj_should_q_docid.append(TermQuery("docids",int_docid))
  304. bool_query = BoolQuery(should_queries=aobj_should_q_docid)
  305. columns = ['win_tenderer','second_tenderer','third_tenderer']
  306. rows,next_token,total_count,is_all_succeed = ots_client.search("project2","project2_index",
  307. SearchQuery(bool_query,limit=100,get_total_count=True),
  308. ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
  309. adict_rows = getRow_ots(rows)
  310. for dict_row in adict_rows:
  311. for _k,_company in dict_row.items():
  312. if _k in columns and _company is not None and _company!="":
  313. _succeed = True
  314. new_dict1 = {}
  315. for k,v in new_dict.items():
  316. new_dict1[k] = v
  317. new_dict1["潜在客户"] = _company
  318. consumed, return_row, next_token = ots_client.get_row("enterprise",[("name",_company)], ["contacts"], None, 1)
  319. dict_tmp = getRow_ots_primary(return_row)
  320. contacts = dict_tmp.get("contacts")
  321. phone_person,phone_no = getOneContact(contacts)
  322. new_dict1["潜在客户联系人"] = phone_person
  323. new_dict1["潜在客户联系电话"] = phone_no
  324. if _company in sstr_staff:
  325. company_info = sstr_staff[_company]
  326. new_dict1["付费客户"] = "是"
  327. conn = pool_conn.getConnector()
  328. try:
  329. cursor = conn.cursor()
  330. sql = " select name from bxkc.b2c_mall_staff_basic_info where userid='%s'"%(company_info.get("aftermarket",""))
  331. cursor.execute(sql)
  332. rows = cursor.fetchall()
  333. if len(rows)>0:
  334. new_dict1["归属客服"] = rows[0][0]
  335. else:
  336. new_dict1["归属客服"] = ""
  337. new_dict1["付费客户联系人"] = company_info.get("contactname","")
  338. new_dict1["付费客户电话"] = company_info.get("phone","")
  339. sql = " select date_FORMAT(etiem,\'%Y-%m-%d\') from bxkc.bxkc_member_term where userid='"+company_info.get("userid","")+"' and memberlevelid<>81 order by etiem desc limit 1"
  340. cursor.execute(sql)
  341. rows = cursor.fetchall()
  342. if len(rows)>0:
  343. etime = rows[0][0]
  344. new_dict1["付费客户到期日"] = etime
  345. if time.mktime(time.strptime(etime,"%Y-%m-%d"))>time.mktime(time.localtime()):
  346. new_dict1["付费客户到期"] = "否"
  347. else:
  348. new_dict1["付费客户到期"] = "是"
  349. else:
  350. new_dict1["付费客户到期日"] = ""
  351. new_dict1["付费客户到期"] = ""
  352. except Exception as e:
  353. traceback.print_exc()
  354. _succeed = False
  355. finally:
  356. pool_conn.putConnector(conn)
  357. else:
  358. new_dict1["付费客户"] = "否"
  359. new_dict1["归属客服"] = ""
  360. new_dict1["付费客户联系人"] = ""
  361. new_dict1["付费客户电话"] = ""
  362. new_dict1["付费客户到期日"] = ""
  363. new_dict1["付费客户到期"] = ""
  364. if _succeed:
  365. result_queue.put(new_dict1)
  366. mt = MultiThreadHandler(task_queue,_comsumer,result_queue,ots_client=ots_client,sstr_staff=sstr_staff,pool_conn=pool_conn,thread_count=30)
  367. mt.run()
  368. df_data = {}
  369. set_staff = set()
  370. while True:
  371. try:
  372. _dict = result_queue.get(timeout=1)
  373. tenderee = _dict.get("招标人","")
  374. product = _dict.get("产品","")
  375. staff = _dict.get("潜在客户","")
  376. _s = "%s-%s-%s"%(tenderee,product,staff)
  377. if _s in set_staff:
  378. continue
  379. set_staff.add(_s)
  380. for k,v in _dict.items():
  381. if k not in df_data:
  382. df_data[k] = []
  383. df_data[k].append(v)
  384. except Exception as e:
  385. break
  386. df1 = pd.DataFrame(df_data)
  387. df1.to_excel("../data/%s_周期项目.xlsx"%(getCurrent_date("%Y-%m-%d_%H%M%S")))
  388. def appendCellphones():
  389. file = "../data/"
  390. if __name__=="__main__":
  391. exportProject_by_pagetime()
  392. # exportProjectWithOneDocid()
  393. # exportCompanyByCycleProduct()