# extract_check.py
#coding=utf-8
# "evaluate" is the entry point of a UDF and must use exactly that name
from odps.udf import annotate
from odps.distcache import get_cache_archive
from odps.distcache import get_cache_file
from odps.udf import BaseUDTF
from odps.udf import BaseUDAF

# Add an archive resource (e.g. the pandas dependency package) to sys.path
def include_package_path(res_name):
    import os, sys
    archive_files = get_cache_archive(res_name)
    dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files
                        if '.dist_info' not in f.name], key=lambda v: len(v))
    sys.path.append(dir_names[0])
    return os.path.dirname(dir_names[0])
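
# A usage sketch (hypothetical resource name: a zipped pandas package uploaded to the
# project as an archive resource "pandas.zip"):
#   def __init__(self):
#       include_package_path("pandas.zip")
#       import pandas as pd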

# A RuntimeError like "xxx has been blocked by sandbox" may be raised here: libraries
# containing C extensions are blocked by the sandbox. Set
# "set odps.isolation.session.enable = true" in the session to allow them.
def include_file(file_name):
    import os, sys
    so_file = get_cache_file(file_name)
    sys.path.append(os.path.dirname(os.path.abspath(so_file.name)))

def include_so(file_name):
    import os, sys
    so_file = get_cache_file(file_name)
    with open(so_file.name, 'rb') as fp:
        content = fp.read()
    so = open(file_name, "wb")
    so.write(content)
    so.flush()
    so.close()

# Initialize the business data package. Because of upload size limits, Python version
# mismatches and inconsistent archive extraction, the package has to be unpacked and
# imported manually.
def init_env(list_files, package_name):
    import os, sys
    if len(list_files) == 1:
        so_file = get_cache_file(list_files[0])
        cmd_line = os.path.abspath(so_file.name)
        os.system("unzip -o %s -d %s" % (cmd_line, package_name))
    elif len(list_files) > 1:
        cmd_line = "cat"
        for _file in list_files:
            so_file = get_cache_file(_file)
            cmd_line += " " + os.path.abspath(so_file.name)
        cmd_line += " > temp.zip"
        os.system(cmd_line)
        os.system("unzip -o temp.zip -d %s" % (package_name))
    # os.system("rm -rf %s/*.dist-info"%(package_name))
    # return os.listdir(os.path.abspath("local_package"))
    # os.system("echo export LD_LIBRARY_PATH=%s >> ~/.bashrc"%(os.path.abspath("local_package")))
    # os.system("source ~/.bashrc")
    sys.path.insert(0, os.path.abspath(package_name))
    # sys.path.append(os.path.join(os.path.abspath("local_package"),"interface_real"))
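
# A usage sketch (hypothetical resource names: a zip archive split into two parts to fit
# the per-resource upload limit, re-assembled and unpacked into "local_package"):
#   init_env(["mypkg.zip.001", "mypkg.zip.002"], "local_package")
#   import mypkg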

import json

class MyEncoder(json.JSONEncoder):
    # note: np is injected into the module globals by f_json_extract_online.__init__ below
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        elif isinstance(obj, (np.float_, np.float16, np.float32,
                              np.float64)):
            return float(obj)
        elif isinstance(obj, np.int64):
            return int(obj)
        return json.JSONEncoder.default(self, obj)
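
# A minimal sketch of how MyEncoder is used (numpy must already be importable):
#   import numpy as np
#   json.dumps({"price": np.float64(1.5), "ids": np.array([1, 2])}, cls=MyEncoder)
#   # -> '{"price": 1.5, "ids": [1, 2]}'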

@annotate("string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string->string")
class f_json_extract_online(BaseUDTF):

    def __init__(self):
        import uuid
        global uuid
        import logging
        import datetime
        import numpy as np
        # json and MyEncoder are defined at module level; np is made global here so that
        # MyEncoder can serialise numpy values produced in process()
        global json, MyEncoder, np

    def process(self, page_time, doctitle,
                tenderee, tenderee_contact, tenderee_phone, agency,
                agency_contact, agency_phone, sub_docs_json, project_code,
                project_name, product, time_bidclose, time_bidopen, time_release,
                moneysource, person_review, bidway, punish, serviceTime):
        _dict = {}
        _dict["code"] = project_code if project_code is not None else ""
        _dict["name"] = project_name if project_name is not None else ""
        if product is not None and product != "":
            _dict["product"] = product.split(",")
        else:
            _dict["product"] = []
        _dict["time_bidclose"] = time_bidclose if time_bidclose is not None else ""
        _dict["time_bidopen"] = time_bidopen if time_bidopen is not None else ""
        _dict["time_release"] = time_release if time_release is not None else ""
        _dict["moneysource"] = moneysource if moneysource is not None else ""
        if person_review not in (None, ''):
            _dict["person_review"] = person_review.split(",")
        else:
            _dict["person_review"] = []
        _dict["bidway"] = bidway if bidway is not None else ""
        _dict["serviceTime"] = serviceTime if serviceTime is not None else ""
        if punish not in (None, ''):
            _punish = json.loads(punish)
        else:
            _punish = {}
        for k, v in _punish.items():
            _dict[k] = v
        if sub_docs_json not in (None, ''):
            _docs = json.loads(sub_docs_json)
        else:
            _docs = [{}]
        set_comp_contact = set()
        if tenderee not in (None, "") and tenderee_contact not in (None, ""):
            set_comp_contact.add("%s-%s-%s-%s" % ("tenderee", tenderee, tenderee_contact, tenderee_phone))
        if agency not in (None, "") and agency_contact not in (None, ""):
            set_comp_contact.add("%s-%s-%s-%s" % ("agency", agency, agency_contact, agency_phone))
        set_pack_comp = set()
        if tenderee not in (None, ""):
            set_pack_comp.add("%s-%s-%s" % ("Project", "tenderee", tenderee))
        if agency not in (None, ""):
            set_pack_comp.add("%s-%s-%s" % ("Project", "agency", agency))
        set_pack_money = set()
        for _d in _docs:
            if len(_d.keys()) > 0:
                sub_project_name = _d.get("sub_project_name", "Project")
                bidding_budget = float(_d.get("bidding_budget", 0))
                win_tenderer = _d.get("win_tenderer", "")
                win_bid_price = float(_d.get("win_bid_price", 0))
                win_tenderer_manager = _d.get("win_tenderer_manager", "")
                win_tenderer_phone = _d.get("win_tenderer_phone", "")
                second_tenderer = _d.get("second_tenderer", "")
                second_bid_price = float(_d.get("second_bid_price", 0))
                second_tenderer_manager = _d.get("second_tenderer_manager", "")
                second_tenderer_phone = _d.get("second_tenderer_phone", "")
                third_tenderer = _d.get("third_tenderer", "")
                third_bid_price = float(_d.get("third_bid_price", 0))
                third_tenderer_manager = _d.get("third_tenderer_manager", "")
                third_tenderer_phone = _d.get("third_tenderer_phone", "")
                if win_tenderer not in (None, "") and win_tenderer_manager not in (None, ""):
                    set_comp_contact.add("%s-%s-%s-%s" % ("win_tenderee", win_tenderer, win_tenderer_manager, win_tenderer_phone))
                if second_tenderer not in (None, "") and second_tenderer_manager not in (None, ""):
                    set_comp_contact.add("%s-%s-%s-%s" % ("second_tenderer", second_tenderer, second_tenderer_manager, second_tenderer_phone))
                if third_tenderer not in (None, "") and third_tenderer_manager not in (None, ""):
                    set_comp_contact.add("%s-%s-%s-%s" % ("third_tenderer", third_tenderer, third_tenderer_manager, third_tenderer_phone))
                if win_tenderer not in (None, ""):
                    set_pack_comp.add("%s-%s-%s" % (sub_project_name, "win_tenderer", win_tenderer))
                if second_tenderer not in (None, ""):
                    set_pack_comp.add("%s-%s-%s" % (sub_project_name, "second_tenderer", second_tenderer))
                if third_tenderer not in (None, ""):
                    set_pack_comp.add("%s-%s-%s" % (sub_project_name, "third_tenderer", third_tenderer))
                if bidding_budget > 0:
                    set_pack_money.add("%s-%s-%2f" % (sub_project_name, "bidding_budget", bidding_budget))
                if win_bid_price > 0:
                    set_pack_money.add("%s-%s-%2f" % (sub_project_name, "win_tenderer", win_bid_price))
                if second_bid_price > 0:
                    set_pack_money.add("%s-%s-%2f" % (sub_project_name, "second_tenderer", second_bid_price))
                if third_bid_price > 0:
                    set_pack_money.add("%s-%s-%2f" % (sub_project_name, "third_tenderer", third_bid_price))
        _dict["set_comp_contact"] = list(set_comp_contact)
        _dict["set_pack_comp"] = list(set_pack_comp)
        _dict["set_pack_money"] = list(set_pack_money)
        self.forward(json.dumps(_dict, cls=MyEncoder, sort_keys=True, indent=4, ensure_ascii=False))
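
# As a UDTF, it is invoked from MaxCompute SQL roughly like the sketch below
# (hypothetical table name; all 20 input columns are strings, in the order of process()):
#   -- select f_json_extract_online(page_time, doctitle, tenderee, tenderee_contact, ...,
#   --                              punish, serviceTime) as (json_online)
#   -- from t_announcement;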

@annotate("string,string->string")
class f_compair_extract(object):

    def __init__(self):
        import logging
        import re
        import json
        global logging, re, json
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, json_online, json_result):
        dict_online = json.loads(json_online)
        dict_result = json.loads(json_result)
        logging.info(json_online)
        dict_test = {}
        set_comp_contact = set()
        set_pack_comp = set()
        set_pack_money = set()
        logging.info("1")
        for k, v in dict_result.items():
            if k in ["bidway", "moneysource", "time_bidclose", "serviceTime", "time_bidopen", "time_release", "name"]:
                dict_test[k] = v
            elif k in ["code"]:
                if len(v) > 0:
                    dict_test["code"] = v[0]
                else:
                    dict_test["code"] = ""
            elif k in ["person_review", "product"]:
                list_temp = v
                list_temp.sort(key=lambda x: x)
                dict_test[k] = list_temp
            elif k in ["punish"]:
                for k1, v1 in v.items():
                    dict_test[k1] = v1
            elif k in ["prem"]:
                for _pack, _prem in v.items():
                    bidding_budget = float(_prem.get("tendereeMoney", 0))
                    role_lists = _prem.get("roleList", [])
                    if bidding_budget > 0:
                        set_pack_money.add("%s-%s-%2f" % (_pack, "bidding_budget", bidding_budget))
                    for _role in role_lists:
                        role_type = _role[0]
                        role_name = _role[1]
                        role_money = 0 if _role[2] == "" else float(_role[2])
                        contact_list = _role[3]
                        for _person, _phone in contact_list:
                            set_comp_contact.add("%s-%s-%s-%s" % (role_type, role_name, _person, _phone))
                        set_pack_comp.add("%s-%s-%s" % (_pack, role_type, role_name))
                        if role_money > 0:
                            set_pack_money.add("%s-%s-%2f" % (_pack, role_type, role_money))
        dict_test["set_comp_contact"] = list(set_comp_contact)
        dict_test["set_pack_comp"] = list(set_pack_comp)
        dict_test["set_pack_money"] = list(set_pack_money)
        logging.info(dict_test)
        logging.info("2")
        dict_compair = {}
        set_keys_online = set(dict_online.keys())
        set_keys_test = set(dict_test.keys())
        union_keys = list(set_keys_online | set_keys_test)
        logging.info(str(union_keys))
        for _key in union_keys:
            logging.info(_key)
            v_online = dict_online.get(_key, "")
            v_test = dict_test.get(_key, "")
            logging.info(v_online)
            logging.info(v_test)
            if isinstance(v_online, list) or isinstance(v_test, list):
                logging.info("3")
                if v_online == "":
                    v_online = []
                if v_test == "":
                    v_test = []
                v_online.sort(key=lambda x: x)
                v_test.sort(key=lambda x: x)
                s_online = set(v_online)
                s_test = set(v_test)
                diff_count = len(s_online - s_test) + len(s_test - s_online)
                dict_compair[_key + "_diff"] = diff_count
                dict_compair[_key + "_online"] = v_online
                dict_compair[_key + "_test"] = v_test
            elif isinstance(v_online, str):
                logging.info("4")
                if v_online == v_test:
                    diff_count = 0
                else:
                    diff_count = 1
                dict_compair[_key + "_diff"] = diff_count
                dict_compair[_key + "_online"] = v_online
                dict_compair[_key + "_test"] = v_test
        return json.dumps(dict_compair, sort_keys=True, indent=4, ensure_ascii=False)
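
# A minimal local sketch of the comparison (hypothetical inputs: the "online" JSON is
# what f_json_extract_online emits, the "result" JSON is the new extraction output):
#   _f = f_compair_extract()
#   print(_f.evaluate('{"name": "proj-A"}', '{"name": "proj-A", "code": ["C1", "C2"]}'))
#   # name_diff is 0; code_diff is 1 because the online side has no "code" key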

import hashlib

def getMD5(sourceHtml):
    if sourceHtml is not None and len(sourceHtml) > 0:
        if isinstance(sourceHtml, str):
            bs = sourceHtml.encode()
        elif isinstance(sourceHtml, bytes):
            bs = sourceHtml
        else:
            return ""
        md5 = hashlib.md5()
        md5.update(bs)
        return md5.hexdigest()
    return ""

def getFingerprint(sourceHtml):
    md5 = getMD5(sourceHtml)
    if md5 != "":
        _fingerprint = "md5=%s" % (md5)
    else:
        _fingerprint = ""
    return _fingerprint
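
# Quick illustration (plain-string input, as f_getFingerprint below passes doctitle + dochtmlcon):
#   getFingerprint("abc")  # -> "md5=900150983cd24fb0d6963f7d28e17f72"
#   getFingerprint("")     # -> ""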

@annotate("string,string->string")
class f_getFingerprint(object):

    def __init__(self):
        import logging
        import re
        import json
        global logging, re, json
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, doctitle, dochtmlcon):
        fingerprint = getFingerprint(doctitle + dochtmlcon)
        return fingerprint

@annotate('bigint,string,string,string,string,string,string,string,string->string')
class f_check_dumplicate(BaseUDAF):
    '''
    Re-check after the dedup merge: when a group holds more than 5 records, doctitle,
    tenderee, win_tenderer and bidding_budget may each take only one value within the
    group; when it holds 5 or fewer, tenderee, win_tenderer and bidding_budget may each
    take only one value.
    '''
    def __init__(self):
        import logging
        import json, re
        global json, logging, re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer, docid, doctitle, project_code, project_name, tenderee, agency, win_tenderer, bidding_budget, win_bid_price):
        buffer[0].append({"docid": docid, "doctitle": doctitle, "project_code": project_code, "project_name": project_name,
                          "tenderee": tenderee, "agency": agency, "win_tenderer": win_tenderer, "bidding_budget": bidding_budget, "win_bid_price": win_bid_price})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_group = []
        list_group.append(buffer[0])
        return json.dumps(list_group, ensure_ascii=False)
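
# Intended use is as an aggregate over a candidate duplicate group, roughly (a sketch
# with hypothetical table/column names):
#   -- select group_id,
#   --        f_check_dumplicate(docid, doctitle, project_code, project_name, tenderee,
#   --                           agency, win_tenderer, bidding_budget, win_bid_price) as json_group
#   -- from t_candidates
#   -- group by group_id;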

@annotate('string -> bigint,bigint,string,string,string,string,string,string,string,string')
class f_check_dumplicate_group(BaseUDTF):
    '''
    Expand the groups from the final aggregated result, one output row per group member.
    '''
    def __init__(self):
        import logging
        import json
        global json, logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, list_group):
        if list_group is not None:
            final_group = json.loads(list_group)
            logging.info(list_group)
            for _groups in final_group:
                for _group in _groups:
                    self.forward(_groups[0]["docid"], _group["docid"], _group["doctitle"], _group["project_code"], _group["project_name"], _group["tenderee"], _group["agency"], _group["win_tenderer"], _group["bidding_budget"], _group["win_bid_price"])

@annotate('string->bigint')
class f_is_contain(BaseUDAF):
    '''
    Return 1 when the doctitles in a group form a containment chain (each title contains,
    or is contained by, the longest title seen so far); otherwise return 0.
    '''
    def __init__(self):
        import logging
        import json, re
        global json, logging, re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer, doctitle):
        buffer[0].append(doctitle)

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        is_contain = 1
        list_doctitle = buffer[0]
        main_doctitle = ""
        for _doctitle in list_doctitle:
            if _doctitle in main_doctitle or main_doctitle in _doctitle:
                if len(_doctitle) > len(main_doctitle):
                    main_doctitle = _doctitle
            else:
                is_contain = 0
                break
        return is_contain
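
# Illustration of the containment rule (a sketch): a group whose titles are
# ["ABC tender", "ABC tender - correction"] yields 1, because the shorter title is a
# substring of the longer one, while ["ABC tender", "XYZ tender"] yields 0.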

def getSet(list_dict, key):
    _set = set()
    for item in list_dict:
        if key in item:
            if item[key] != '' and item[key] is not None:
                if re.search(r"^[\d\.]+$", item[key]) is not None:
                    _set.add(str(float(item[key])))
                else:
                    _set.add(str(item[key]))
    return _set
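
# For example (a sketch, with re imported): numeric strings are normalised through
# float() so that "100" and "100.0" count as the same value:
#   getSet([{"m": "100"}, {"m": "100.0"}, {"m": ""}], "m")  # -> {"100.0"}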

def split_with_time(list_dict, sort_key, timedelta=86400 * 2):
    if len(list_dict) > 0:
        if sort_key in list_dict[0]:
            list_dict.sort(key=lambda x: x[sort_key])
            list_group = []
            _begin = 0
            for i in range(len(list_dict) - 1):
                if abs(list_dict[i][sort_key] - list_dict[i + 1][sort_key]) < timedelta:
                    continue
                else:
                    _group = []
                    for j in range(_begin, i + 1):
                        _group.append(list_dict[j])
                    if len(_group) > 1:
                        list_group.append(_group)
                    _begin = i + 1
            if len(list_dict) > 1:
                _group = []
                for j in range(_begin, len(list_dict)):
                    _group.append(list_dict[j])
                if len(_group) > 1:
                    list_group.append(_group)
            return list_group
    return [list_dict]
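
# For example (a sketch): with the default two-day window, records whose consecutive
# gaps on the sort key stay below 86400*2 end up in the same group, and single-record
# groups are dropped:
#   split_with_time([{"t": 0}, {"t": 3600}, {"t": 86400 * 10}], "t")
#   # -> [[{"t": 0}, {"t": 3600}]]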

@annotate('bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string->string')
class f_check_dumplicate_1(BaseUDAF):
    '''
    Dedup rule: same project code and winning bidder, len(project code) > 7,
    winning bidder <> "", fewer than two distinct non-empty tenderees after merging,
    and identical non-empty amounts for the same announcement type after merging.
    '''
    def __init__(self):
        import logging
        import json, re
        global json, logging, re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer, docid, page_time_stamp, set_limit_column1, set_limit_column2, set_limit_column3, set_limit_column4, contain_column, doctitle, project_code, project_name, tenderee, agency, win_tenderer, bidding_budget, win_bid_price):
        buffer[0].append({"docid": docid, "page_time_stamp": page_time_stamp, "set_limit_column1": set_limit_column1,
                          "set_limit_column2": set_limit_column2, "set_limit_column3": set_limit_column3, "set_limit_column4": set_limit_column4,
                          "contain_column": contain_column, "doctitle": doctitle, "project_code": project_code, "project_name": project_name,
                          "tenderee": tenderee, "agency": agency, "win_tenderer": win_tenderer, "bidding_budget": bidding_budget, "win_bid_price": win_bid_price})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_split = split_with_time(buffer[0], "page_time_stamp")
        list_group = []
        for _split in list_split:
            flag = True
            keys = ["set_limit_column1", "set_limit_column2", "set_limit_column3", "set_limit_column4"]
            for _key in keys:
                logging.info(_key + str(getSet(_split, _key)))
                if len(getSet(_split, _key)) > 1:
                    flag = False
                    break
            MAX_CONTAIN_COLUMN = None
            # check whether every announcement in the group is contained in the longest one
            if flag:
                for _d in _split:
                    contain_column = _d["contain_column"]
                    if contain_column is not None and contain_column != "":
                        if MAX_CONTAIN_COLUMN is None:
                            MAX_CONTAIN_COLUMN = contain_column
                        else:
                            if len(MAX_CONTAIN_COLUMN) < len(contain_column):
                                if contain_column.find(MAX_CONTAIN_COLUMN) == -1:
                                    flag = False
                                    break
                                MAX_CONTAIN_COLUMN = contain_column
                            else:
                                if MAX_CONTAIN_COLUMN.find(contain_column) == -1:
                                    flag = False
                                    break
            if flag:
                if len(_split) > 1:
                    list_group.append(_split)
        return json.dumps(list_group)
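
# A small local smoke test (a sketch for debugging outside ODPS; it does not run when
# the script is loaded as a UDF resource):
if __name__ == "__main__":
    print(getFingerprint("title" + "<html>body</html>"))
    _docs = [{"docid": 1, "page_time_stamp": 0},
             {"docid": 2, "page_time_stamp": 3600},
             {"docid": 3, "page_time_stamp": 86400 * 10}]
    print(split_with_time(_docs, "page_time_stamp"))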