# documentDumplicate.py
# coding: utf-8
from odps.udf import annotate
from odps.udf import BaseUDTF
from odps.udf import BaseUDAF

@annotate('string,string -> string,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,bigint')
class f_decode_extract(BaseUDTF):

    def __init__(self):
        # ODPS UDF idiom: import inside __init__ and publish via global so the
        # imports run on the worker, not on the client
        import logging
        import json
        import time, re
        global json, logging, time, re
        self.time_pattern = r"\d{4}-\d{2}-\d{2}.*"
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, extractjson, otherjson):
        if extractjson is not None:
            _extract = json.loads(extractjson)
        else:
            _extract = {}
        if otherjson is not None:
            _other = json.loads(otherjson)
        else:
            _other = {}
        project_code = ""
        project_name = ""
        tenderee = ""
        agency = ""
        win_tenderer = ""
        bidding_budget = ""
        win_bid_price = ""
        fingerprint = ""
        page_time_stamp = 0
        docchannel = 0
        extract_count = 0
        page_time = _other.get("pageTime", time.strftime('%Y-%m-%d', time.localtime()))
        doctitle = _other.get("doctitle", "")
        # strip generic procurement words from the title to get a comparable refined title
        doctitle_refine = re.sub(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|竞价|合同', '', doctitle)
        area = _other.get("area", "")
        province = _other.get("province", "")
        city = _other.get("city", "")
        district = _other.get("district", "")
        web_source_no = _other.get("webSourceNo", "")
        docchannel = _other.get("docchannel", 0)
        if re.search(self.time_pattern, page_time) is not None:
            # "%Y-%m-%d" consumes exactly 10 characters, so slice the date prefix only
            timeArray = time.strptime(page_time[:10], "%Y-%m-%d")
            page_time_stamp = int(time.mktime(timeArray))
        list_code = _extract.get("code", [])
        if len(list_code) > 0:
            project_code = list_code[0]
        project_name = _extract.get("name", "")
        fingerprint = _extract.get("fingerprint", "")
        dict_pack = _extract.get("prem", {})
        logging.info(dict_pack)
        for _key in dict_pack.keys():
            if dict_pack[_key]["tendereeMoney"] != '' and float(dict_pack[_key]["tendereeMoney"]) > 0:
                extract_count += 1
                if bidding_budget == "":
                    bidding_budget = str(float(dict_pack[_key]["tendereeMoney"]))
            for _role in dict_pack[_key]["roleList"]:
                extract_count += 1
                if _role[2] != '' and float(_role[2]) > 0:
                    extract_count += 1
                if _role[0] == "tenderee":
                    tenderee = _role[1]
                if _role[0] == "win_tenderer":
                    if win_tenderer == "":
                        win_tenderer = _role[1]
                    if _role[2] != '' and float(_role[2]) > 0:
                        if win_bid_price == "":
                            win_bid_price = str(float(_role[2]))
                if _role[0] == "agency":
                    agency = _role[1]
        if project_code != "":
            extract_count += 1
        if project_name != "":
            extract_count += 1
        logging.info(page_time + doctitle + doctitle_refine + area + province + city +
                     district + web_source_no + project_code + project_name + tenderee + agency + win_tenderer + bidding_budget + win_bid_price)
        self.forward(page_time, page_time_stamp, docchannel, doctitle, doctitle_refine, area, province, city,
                     district, web_source_no, fingerprint, project_code, project_name, tenderee, agency, win_tenderer, bidding_budget, win_bid_price, extract_count)
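
# Hedged sketch of the two JSON inputs f_decode_extract consumes. The exact
# upstream schema is not documented here; the shapes below are inferred from
# the fields process() reads ("prem" packs with "tendereeMoney" and "roleList"
# triples of [role_name, entity_name, money]). All values are illustrative only.
def _demo_decode_extract_inputs():
    import json
    extractjson = json.dumps({
        "code": ["HB-2021-001"],
        "name": "某某采购项目",
        "fingerprint": "abc123",
        "prem": {
            "Project": {
                "tendereeMoney": "100000",
                "roleList": [
                    ["tenderee", "某某单位", ""],
                    ["win_tenderer", "某某公司", "98000"],
                ],
            }
        },
    })
    otherjson = json.dumps({"pageTime": "2021-01-01", "doctitle": "某某采购中标公告"})
    return extractjson, otherjson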
@annotate('string,string,string,string,string -> string,string,string,bigint')
class f_decode_sub_docs_json(BaseUDTF):

    def __init__(self):
        import logging
        import json
        global json, logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, project_code, project_name, tenderee, agency, sub_docs_json):
        columns = {"win_tenderer": "", "bidding_budget": "", "win_bid_price": ""}
        extract_count = 0
        if project_code != "":
            extract_count += 1
        if project_name != "":
            extract_count += 1
        if tenderee != "":
            extract_count += 1
        if agency != "":
            extract_count += 1
        if sub_docs_json is not None:
            for sub_docs in json.loads(sub_docs_json):
                for _key_sub_docs in sub_docs.keys():
                    extract_count += 1
                    if _key_sub_docs in columns:
                        # keep the first non-empty, non-zero value for each column
                        if columns[_key_sub_docs] == "" and str(sub_docs[_key_sub_docs]) not in ["", "0"]:
                            if _key_sub_docs in ["bidding_budget", "win_bid_price"]:
                                if float(sub_docs[_key_sub_docs]) > 0:
                                    columns[_key_sub_docs] = str(float(sub_docs[_key_sub_docs]))
                            else:
                                columns[_key_sub_docs] = str(sub_docs[_key_sub_docs])
        self.forward(columns["win_tenderer"], columns["bidding_budget"], columns["win_bid_price"], extract_count)
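
# Hedged sketch of a sub_docs_json payload matching the keys consumed above;
# the real upstream format may carry more fields per sub-document.
def _demo_sub_docs_json():
    import json
    return json.dumps([
        {"win_tenderer": "某某公司", "win_bid_price": "98000"},
        {"bidding_budget": "100000"},
    ])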
@annotate("string->bigint")
class totimestamp(object):

    def __init__(self):
        import time
        global time
        import logging
        import json
        import re
        global json, logging, re
        self.time_pattern = r"\d{4}-\d{2}-\d{2}.*"
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, str_time):
        try:
            logging.info(str_time)
            if str_time is not None and re.search(self.time_pattern, str_time) is not None:
                timeArray = time.strptime(str_time[:10], "%Y-%m-%d")
                timeStamp = int(time.mktime(timeArray))
                return timeStamp
            else:
                return 0
        except Exception:
            return 0
@annotate("string->string")
class refind_name(object):

    def __init__(self):
        import logging
        import re
        global logging, re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, title):
        if title is not None:
            return re.sub(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|竞价', '', title)
        return ""
@annotate('bigint,bigint,bigint,string,bigint,string->string')
class f_set_docid(BaseUDAF):
    '''
    Group candidate duplicates by project code / winning bidder
    (len(project code) > 7, winning bidder <> "").
    '''

    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, docid, page_time_stamp, extract_count, defind_column, defind_count, tenderee):
        buffer[0].append({"docid": docid, "page_time_stamp": page_time_stamp, "extract_count": extract_count,
                          "defind_column": defind_column, "defind_count": defind_count, "tenderee": tenderee})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_docs = buffer[0]
        list_docs.sort(key=lambda x: x["page_time_stamp"])
        list_group = []
        _begin = 0
        defind_count = 0
        if len(list_docs) > 0:
            defind_count = list_docs[0]["defind_count"]
        for i in range(len(list_docs) - 1):
            # documents within two days of each other stay in the same window
            if abs(list_docs[i]["page_time_stamp"] - list_docs[i + 1]["page_time_stamp"]) <= 86400 * 2:
                continue
            else:
                _group = []
                _set_column = set()
                _set_tenderee = set()
                for j in range(_begin, i + 1):
                    if list_docs[j]["tenderee"] is not None and list_docs[j]["tenderee"] != "":
                        _set_tenderee.add(list_docs[j]["tenderee"])
                    _set_column.add(list_docs[j]["defind_column"])
                    _group.append({"docid": list_docs[j]["docid"], "extract_count": list_docs[j]["extract_count"]})
                # a window of three or more documents with conflicting tenderees is rejected
                if len(_group) >= 3 and len(_set_tenderee) > 1:
                    pass
                else:
                    if len(_group) > 1:
                        if defind_count == 2:
                            if len(_set_column) >= 2:
                                list_group.append(_group)
                        elif defind_count == 1:
                            if len(_set_column) == 1:
                                list_group.append(_group)
                        elif defind_count == 0:
                            list_group.append(_group)
                _begin = i + 1
        # flush the trailing window with the same rules
        if len(list_docs) > 1:
            _set_column = set()
            _set_tenderee = set()
            _group = []
            for j in range(_begin, len(list_docs)):
                if list_docs[j]["tenderee"] is not None and list_docs[j]["tenderee"] != "":
                    _set_tenderee.add(list_docs[j]["tenderee"])
                _set_column.add(list_docs[j]["defind_column"])
                _group.append({"docid": list_docs[j]["docid"], "extract_count": list_docs[j]["extract_count"]})
            if len(_group) >= 3 and len(_set_tenderee) > 1:
                pass
            else:
                if len(_group) > 1:
                    if defind_count == 2:
                        if len(_set_column) >= 2:
                            list_group.append(_group)
                    elif defind_count == 1:
                        if len(_set_column) == 1:
                            list_group.append(_group)
                    elif defind_count == 0:
                        list_group.append(_group)
        return json.dumps(list_group)
def split_with_time(list_dict, sort_key, timedelta=86400 * 2):
    # split a list of dicts into groups whose consecutive sort_key values are
    # less than `timedelta` apart; only groups with more than one element are kept
    if len(list_dict) > 0:
        if sort_key in list_dict[0]:
            list_dict.sort(key=lambda x: x[sort_key])
            list_group = []
            _begin = 0
            for i in range(len(list_dict) - 1):
                if abs(list_dict[i][sort_key] - list_dict[i + 1][sort_key]) < timedelta:
                    continue
                else:
                    _group = []
                    for j in range(_begin, i + 1):
                        _group.append(list_dict[j])
                    if len(_group) > 1:
                        list_group.append(_group)
                    _begin = i + 1
            if len(list_dict) > 1:
                _group = []
                for j in range(_begin, len(list_dict)):
                    _group.append(list_dict[j])
                if len(_group) > 1:
                    list_group.append(_group)
            return list_group
    return [list_dict]
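
# Hedged, illustrative check of split_with_time: the first two timestamps fall
# within the two-day window and the third is far away, so only the first two
# form a group and the singleton is dropped. Values are made up for the example.
def _demo_split_with_time():
    docs = [
        {"page_time_stamp": 1609459200},               # 2021-01-01
        {"page_time_stamp": 1609459200 + 86400},       # one day later: same window
        {"page_time_stamp": 1609459200 + 86400 * 30},  # a month later: split off
    ]
    groups = split_with_time(docs, "page_time_stamp")
    assert len(groups) == 1 and len(groups[0]) == 2
    return groups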
@annotate('bigint,bigint,bigint,string,string,string,string,string->string')
class f_set_docid_limitNum_contain(BaseUDAF):
    '''
    Group by project code / winning bidder (len(project code) > 7,
    winning bidder <> ""); after merging, the group must have fewer than two
    distinct non-empty tenderees and identical non-empty amounts for the same
    announcement type.
    '''

    def __init__(self):
        import logging
        import json, re
        global json, logging, re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer, docid, page_time_stamp, extract_count, set_limit_column1, set_limit_column2, set_limit_column3, set_limit_column4, contain_column):
        buffer[0].append({"docid": docid, "page_time_stamp": page_time_stamp, "extract_count": extract_count, "set_limit_column1": set_limit_column1,
                          "set_limit_column2": set_limit_column2, "set_limit_column3": set_limit_column3, "set_limit_column4": set_limit_column4,
                          "contain_column": contain_column})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_split = split_with_time(buffer[0], "page_time_stamp")
        list_group = []
        for _split in list_split:
            flag = True
            keys = ["set_limit_column1", "set_limit_column2", "set_limit_column3", "set_limit_column4"]
            for _key in keys:
                logging.info(_key + str(getSet(_split, _key)))
                if len(getSet(_split, _key)) > 1:
                    flag = False
                    break
            MAX_CONTAIN_COLUMN = None
            # check that every announcement's contain_column in the group is a
            # substring of the longest one
            if flag:
                for _d in _split:
                    contain_column = _d["contain_column"]
                    if contain_column is not None and contain_column != "":
                        if MAX_CONTAIN_COLUMN is None:
                            MAX_CONTAIN_COLUMN = contain_column
                        else:
                            if len(MAX_CONTAIN_COLUMN) < len(contain_column):
                                if contain_column.find(MAX_CONTAIN_COLUMN) == -1:
                                    flag = False
                                    break
                                MAX_CONTAIN_COLUMN = contain_column
                            else:
                                if MAX_CONTAIN_COLUMN.find(contain_column) == -1:
                                    flag = False
                                    break
            if flag:
                if len(_split) > 1:
                    _group = []
                    for _item in _split:
                        _group.append({"docid": _item["docid"], "extract_count": _item["extract_count"]})
                    list_group.append(_group)
        return json.dumps(list_group)
@annotate('bigint->string')
class f_stamp_squence(BaseUDAF):
    '''
    Collect page_time_stamps and merge them into time windows, each padded by
    two days on both sides.
    '''

    def __init__(self):
        import json
        global json
        import logging
        global logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [set()]

    def iterate(self, buffer, page_time_stamp):
        buffer[0].add(page_time_stamp)

    def merge(self, buffer, pbuffer):
        buffer[0] |= pbuffer[0]

    def terminate(self, buffer):
        if 0 in buffer[0]:
            buffer[0].remove(0)
        list_stamp = list(buffer[0])
        list_stamp.sort(key=lambda x: x)
        list_stamp_final = []
        _begin = 0
        _time_decase = 86400 * 2
        logging.info(str(list_stamp))
        for _index in range(len(list_stamp) - 1):
            if list_stamp[_index + 1] - list_stamp[_index] < _time_decase:
                continue
            else:
                list_stamp_final.append([list_stamp[_begin] - _time_decase, list_stamp[_index] + _time_decase])
                _begin = _index + 1
        if len(list_stamp) > 0:
            list_stamp_final.append([list_stamp[_begin] - _time_decase, list_stamp[-1] + _time_decase])
        return json.dumps(list_stamp_final)
@annotate("bigint,string->bigint")
class in_stamp(object):

    def __init__(self):
        import logging
        import re
        import json
        global logging, re, json
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, page_time_stamp, json_stamp):
        list_stamp = json.loads(json_stamp)
        int_flag = 0
        for item in list_stamp:
            # windows are sorted, so stop as soon as the stamp falls before one
            if page_time_stamp < item[0]:
                break
            if page_time_stamp > item[0] and page_time_stamp < item[1]:
                int_flag = 1
                break
        return int_flag
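
# Hedged, illustrative round-trip of the time-window logic: f_stamp_squence
# pads each run of stamps by two days, and in_stamp accepts timestamps that
# fall strictly inside a window. The stamp values are made up for the example.
def _demo_stamp_windows():
    import json
    windows = json.dumps([[1609286400, 1609718400]])   # one padded window
    checker = in_stamp()
    assert checker.evaluate(1609459200, windows) == 1  # inside the window
    assert checker.evaluate(1612051200, windows) == 0  # a month later: outside
    return windows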
def getConfidence(rule_id):
    # map a dedup rule id to a confidence score
    if rule_id == 0:
        return 30
    elif rule_id >= 1 and rule_id <= 26:
        return 20
    else:
        return 10
@annotate('string,bigint -> bigint,bigint,bigint,bigint,bigint')
class f_split_group_single(BaseUDTF):
    '''
    Expand each group into one record per ordered pair of distinct docids.
    '''

    def __init__(self):
        import logging
        import json
        global json, logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_set_docid, rule_id):
        list_group = json.loads(json_set_docid)
        for item in list_group:
            for index_i in range(len(item)):
                for index_j in range(len(item)):
                    if index_i != index_j and item[index_i]["docid"] != item[index_j]["docid"]:
                        self.forward(item[index_i]["docid"], item[index_j]["docid"], item[index_i]["extract_count"], item[index_j]["extract_count"], getConfidence(rule_id))
@annotate('bigint,string->string')
class group_document(BaseUDAF):
    '''
    Aggregate (id, json_set_docid) rows into a single JSON list.
    '''

    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, id, json_set_docid):
        buffer[0].append({"id": id, "json_set_docid": json.loads(json_set_docid)})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        return json.dumps(buffer[0])
@annotate('bigint,string,bigint,string -> bigint,bigint,string')
class decare_document(BaseUDTF):
    '''
    Cartesian-join two grouped document lists and merge groups that share a docid.
    '''

    def __init__(self):
        import logging
        import json
        global json, logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, group_id1, json_list_doc1, group_id2, json_list_doc2):
        # only keep pairs on one side of the diagonal (group_id1 >= group_id2),
        # cutting nearly half of the data
        if group_id1 >= group_id2:
            list_doc1 = json.loads(json_list_doc1)
            list_doc2 = json.loads(json_list_doc2)
            for _doc1 in list_doc1:
                for _doc2 in list_doc2:
                    # skip comparing a duplicate group with itself
                    if _doc1["id"] != _doc2["id"]:
                        # check whether the two groups share any docid
                        _set1 = set()
                        for _item1 in _doc1["json_set_docid"]:
                            _set1.add(_item1["docid"])
                        _set2 = set()
                        for _item2 in _doc2["json_set_docid"]:
                            _set2.add(_item2["docid"])
                        if len(_set1 & _set2) > 0:
                            new_json_set_docid = _doc1["json_set_docid"]
                            for _item2 in _doc2["json_set_docid"]:
                                if _item2["docid"] not in _set1:
                                    new_json_set_docid.append(_item2)
                            self.forward(_doc1["id"], _doc2["id"], json.dumps(new_json_set_docid))
def getBestDocid(list_pair):
    # each pair is [docid1, extract_count1, docid2, extract_count2]; pick the
    # docid with the highest extract_count, breaking ties by smallest docid
    list_pair.sort(key=lambda x: x[3], reverse=True)
    _max_count = max(list_pair[0][3], list_pair[0][1])
    set_candidate = set()
    if list_pair[0][1] == _max_count:
        set_candidate.add(list_pair[0][0])
    for item in list_pair:
        if item[3] == _max_count:
            set_candidate.add(item[2])
        else:
            break
    list_candidate = list(set_candidate)
    list_candidate.sort(key=lambda x: x)
    return list_candidate[0]
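
# Hedged worked example of getBestDocid: docid 5 (count 3) is compared against
# docids 9, 7 and 8; the maximum count is 4, shared by 9 and 7, and the smaller
# docid wins. The numbers are made up for the example.
def _demo_getBestDocid():
    pairs = [[5, 3, 9, 4], [5, 3, 7, 4], [5, 3, 8, 2]]
    assert getBestDocid(pairs) == 7
    return getBestDocid(pairs)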
@annotate('bigint,bigint,bigint,bigint->string')
class choose_document(BaseUDAF):
    '''
    Decide whether the current document is the best of its pair group
    (save_flag) and collect the docids of its duplicates.
    '''

    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, docid1, extract_count1, docid2, extract_count2):
        buffer[0].append([docid1, extract_count1, docid2, extract_count2])

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_pair = buffer[0]
        _set = set()
        for item in buffer[0]:
            _set.add(str(item[2]))
        list_dumplicate = list(_set)
        best_docid = getBestDocid(list_pair)
        if best_docid == list_pair[0][0]:
            save_flag = 1
        else:
            save_flag = 0
        return json.dumps({"save_flag": save_flag, "dumplicates": list_dumplicate})
@annotate('string -> bigint,string')
class f_get_choose_document(BaseUDTF):
    '''
    Unpack the choose_document result into save_flag and a comma-joined
    duplicate list.
    '''

    def __init__(self):
        import logging
        import json
        global json, logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_choose):
        if json_choose is None:
            self.forward(1, None)
        else:
            _choose = json.loads(json_choose)
            self.forward(_choose["save_flag"], ",".join(_choose["dumplicates"]))
@annotate('bigint,bigint,bigint,bigint->string')
class group_document_bestFirst(BaseUDAF):
    '''
    Put the best docid of the group first, followed by its duplicates in
    ascending order.
    '''

    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, docid1, extract_count1, docid2, extract_count2):
        buffer[0].append([docid1, extract_count1, docid2, extract_count2])

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_pair = buffer[0]
        _set = set()
        for item in buffer[0]:
            _set.add(item[2])
        _set.add(list_pair[0][0])
        best_docid = getBestDocid(list_pair)
        _set.remove(best_docid)
        list_dumplicate = list(_set)
        list_dumplicate.sort(key=lambda x: x)
        list_dumplicate.insert(0, best_docid)
        list_dumplicate_str = []
        for item in list_dumplicate:
            list_dumplicate_str.append(str(item))
        return ",".join(list_dumplicate_str)
@annotate('string -> bigint,string')
class f_get_best_dumplicates(BaseUDTF):
    '''
    Get the best record of each group along with its duplicate records.
    '''

    def __init__(self):
        import logging
        import json
        global json, logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, list_dumplicate_str):
        if list_dumplicate_str is None:
            pass
        else:
            list_dumplicate = list_dumplicate_str.split(",")
            if len(list_dumplicate) > 0:
                self.forward(int(list_dumplicate[0]), ",".join(list_dumplicate[1:]))
            else:
                pass
@annotate('bigint,bigint->string')
class bridge2group(BaseUDAF):
    '''
    Collect the docids on both ends of each bridge pair into one descending
    sorted list.
    '''

    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [set()]

    def iterate(self, buffer, docid1, docid2):
        buffer[0].add(docid1)
        buffer[0].add(docid2)

    def merge(self, buffer, pbuffer):
        buffer[0] |= pbuffer[0]

    def terminate(self, buffer):
        list_pair = list(buffer[0])
        list_pair.sort(key=lambda x: x, reverse=True)
        return json.dumps(list_pair)
@annotate('string -> bigint,bigint')
class group2bridge(BaseUDTF):
    '''
    Emit one record per docid, keyed by the smallest docid of the group
    (the list is sorted descending, so the last element is the key).
    '''

    def __init__(self):
        import logging
        import json
        global json, logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_list_docid):
        list_docid = json.loads(json_list_docid)
        for _docid in list_docid:
            self.forward(list_docid[-1], _docid)
@annotate('bigint,bigint,string -> bigint')
class f_get_dump_docid(BaseUDTF):
    '''
    Emit the docids to be marked as duplicates: when save_flag is 0 the
    document itself is dumped and the first duplicate is kept; otherwise all
    duplicates are dumped.
    '''

    def __init__(self):
        import logging
        import json
        global json, logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, docid, save_flag, dumplicates):
        if save_flag == 0:
            self.forward(docid)
            if dumplicates is not None:
                list_docid = dumplicates.split(",")
                if len(list_docid) > 0:
                    for _docid in list_docid[1:]:
                        self.forward(int(_docid))
        else:
            if dumplicates is not None:
                list_docid = dumplicates.split(",")
                if len(list_docid) > 0:
                    for _docid in list_docid:
                        self.forward(int(_docid))
@annotate('string -> bigint,bigint')
class f_get_docid(BaseUDTF):
    '''
    Expand the grouped docid JSON into (team_id, docid) records.
    '''

    def __init__(self):
        import logging
        import json
        global json, logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_set_docid):
        team_id = 0
        if json_set_docid is not None:
            list_docses = json.loads(json_set_docid)
            for list_docs in list_docses:
                team_id += 1
                for item in list_docs:
                    self.forward(team_id, item["docid"])
@annotate("string->bigint")
class get_count_dump(object):

    def __init__(self):
        import logging
        import re
        global logging, re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, title):
        _count = 0
        if title is not None:
            _count = len(title.split(","))
        return _count
def getSet(list_dict, key):
    # collect the distinct values of `key`, normalizing numeric strings so that
    # e.g. "100" and "100.0" compare equal
    _set = set()
    for item in list_dict:
        if key in item:
            if item[key] != '' and item[key] is not None:
                if re.search(r"^\d[\d\.]*$", item[key]) is not None:
                    _set.add(str(float(item[key])))
                else:
                    _set.add(str(item[key]))
    return _set
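
# Hedged, illustrative check of getSet's numeric normalization: "100" and
# "100.0" collapse to one value while text stays distinct. Unlike the UDFs
# above, which import lazily in __init__, this sketch binds `re` at the module
# level itself so it is self-contained.
def _demo_getSet():
    global re
    import re
    values = [{"price": "100"}, {"price": "100.0"}, {"price": "面议"}]
    result = getSet(values, "price")
    assert result == {"100.0", "面议"}
    return result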
def getDiffIndex(list_dict, key):
    # return the index of the first element at which the (numerically
    # normalized) values of `key` stop agreeing; if they never diverge,
    # return the full length
    _set = set()
    for _i in range(len(list_dict)):
        item = list_dict[_i]
        if key in item:
            if item[key] != '' and item[key] is not None:
                if re.search(r"^\d[\d\.]*$", item[key]) is not None:
                    _set.add(str(float(item[key])))
                else:
                    _set.add(str(item[key]))
        if len(_set) > 1:
            return _i
    return len(list_dict)
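
# Hedged worked example of getDiffIndex: the first two prices normalize to the
# same value, the third diverges, so the function returns index 2. Data is
# made up for the example.
def _demo_getDiffIndex():
    global re
    import re
    rows = [{"price": "100"}, {"price": "100.0"}, {"price": "200"}]
    assert getDiffIndex(rows, "price") == 2
    return getDiffIndex(rows, "price")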
@annotate('bigint,string -> bigint,bigint')
class f_getGroup_dumpFinal(BaseUDTF):
    '''
    Extract the groups from the final result: one record per (main docid,
    member docid), including the main docid itself.
    '''

    def __init__(self):
        import logging
        import json
        global json, logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, docid, dumplicates):
        self.forward(int(docid), int(docid))
        if dumplicates is not None:
            list_docids = dumplicates.split(",")
            for _docid in list_docids:
                self.forward(int(docid), int(_docid))
@annotate('bigint,bigint,string,string,string,string,bigint,bigint,bigint->string')
class f_redump_limit_num(BaseUDAF):
    '''
    Re-check groups after dedup merging. When a group has more than 5 members,
    doctitle, tenderee, win_tenderer and bidding_budget must each take a single
    value within the group; with 5 or fewer members, only tenderee,
    win_tenderer and bidding_budget must be single-valued.
    '''

    def __init__(self):
        import logging
        import json, re
        global json, logging, re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer, main_docid, docid, set_limit_column1, set_limit_column2, set_limit_column3, set_limit_column4, extract_count1, extract_count2, confidence):
        buffer[0].append({"main_docid": main_docid, "docid": docid, "set_limit_column1": set_limit_column1, "set_limit_column2": set_limit_column2,
                          "set_limit_column3": set_limit_column3, "set_limit_column4": set_limit_column4, "extract_count1": extract_count1,
                          "extract_count2": extract_count2, "confidence": confidence})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        the_group = buffer[0]
        the_group.sort(key=lambda x: x["confidence"], reverse=True)
        if len(the_group) > 5:
            keys = ["set_limit_column1", "set_limit_column2", "set_limit_column3", "set_limit_column4"]
        else:
            keys = ["set_limit_column2", "set_limit_column3", "set_limit_column4"]
        final_group = []
        # with the group sorted by confidence, keep only the prefix of records
        # before the first value conflict on any limit column
        list_key_index = []
        for _k in keys:
            list_key_index.append(getDiffIndex(the_group, _k))
        _index = min(list_key_index)
        if _index > 1:
            main_docid = the_group[0]["main_docid"]
            for item in the_group[:_index]:
                if item["docid"] != main_docid:
                    final_group.append({"docid1": main_docid, "docid2": item["docid"], "extract_count1": item["extract_count1"], "extract_count2": item["extract_count2"], "confidence": item["confidence"]})
        # earlier all-or-nothing variant, kept for reference:
        # stay = True
        # for _key in keys:
        #     if len(getSet(the_group, _key)) > 1:
        #         stay = False
        #         break
        #
        # if stay:
        #     main_docid = the_group[0]["main_docid"]
        #     for item in the_group:
        #         if item["docid"] != main_docid:
        #             final_group.append({"docid1": main_docid, "docid2": item["docid"], "extract_count1": item["extract_count1"], "extract_count2": item["extract_count2"]})
        return json.dumps(final_group)
@annotate('string -> bigint,bigint,bigint,bigint,bigint')
class f_get_dumpFinal_checked(BaseUDTF):
    '''
    Expand the checked final groups into individual (docid1, docid2) records
    with their extract counts and confidence.
    '''

    def __init__(self):
        import logging
        import json
        global json, logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, list_group):
        if list_group is not None:
            final_group = json.loads(list_group)
            for _group in final_group:
                self.forward(_group["docid1"], _group["docid2"], _group["extract_count1"], _group["extract_count2"], _group["confidence"])