# documentDumplicate.py
#coding:UTF8
from odps.udf import annotate
from odps.udf import BaseUDTF
from odps.udf import BaseUDAF

@annotate('string,string -> string,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,bigint')
class f_decode_extract(BaseUDTF):

    def __init__(self):
        import logging
        import json
        import time,re
        global json,logging,time,re
        self.time_pattern = r"\d{4}-\d{2}-\d{2}.*"
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, extractjson, otherjson):
        if extractjson is not None:
            _extract = json.loads(extractjson)
        else:
            _extract = {}
        if otherjson is not None:
            _other = json.loads(otherjson)
        else:
            _other = {}
        project_code = ""
        project_name = ""
        tenderee = ""
        agency = ""
        win_tenderer = ""
        bidding_budget = ""
        win_bid_price = ""
        page_time_stamp = 0
        docchannel = 0
        extract_count = 0
        page_time = _other.get("pageTime", time.strftime('%Y-%m-%d', time.localtime()))
        doctitle = _other.get("doctitle", "")
        doctitle_refine = re.sub(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价', '', doctitle)
        area = _other.get("area", "")
        province = _other.get("province", "")
        city = _other.get("city", "")
        district = _other.get("district", "")
        web_source_no = _other.get("webSourceNo", "")
        docchannel = _other.get("docchannel", 0)
        if re.search(self.time_pattern, page_time) is not None:
            # parse only the leading "YYYY-MM-DD"; slicing to 10 characters keeps
            # strptime from choking on trailing text such as a time-of-day suffix
            timeArray = time.strptime(page_time[:10], "%Y-%m-%d")
            page_time_stamp = int(time.mktime(timeArray))
        list_code = _extract.get("code", [])
        if len(list_code) > 0:
            project_code = list_code[0]
        project_name = _extract.get("name", "")
        dict_pack = _extract.get("prem", {})
        logging.info(dict_pack)
        for _key in dict_pack.keys():
            if dict_pack[_key]["tendereeMoney"] != '' and float(dict_pack[_key]["tendereeMoney"]) > 0:
                extract_count += 1
                if bidding_budget == "":
                    bidding_budget = str(dict_pack[_key]["tendereeMoney"])
            for _role in dict_pack[_key]["roleList"]:
                extract_count += 1
                if _role[2] != '' and float(_role[2]) > 0:
                    extract_count += 1
                if _role[0] == "tenderee":
                    tenderee = _role[1]
                if _role[0] == "win_tenderer":
                    if win_tenderer == "":
                        win_tenderer = _role[1]
                    if _role[2] != '' and float(_role[2]) > 0:
                        if win_bid_price == "":
                            win_bid_price = str(_role[2])
                if _role[0] == "agency":
                    agency = _role[1]
        if project_code != "":
            extract_count += 1
        if project_name != "":
            extract_count += 1
        logging.info(page_time+doctitle+doctitle_refine+area+province+city+
                     district+web_source_no+project_code+project_name+tenderee+agency+win_tenderer+bidding_budget+win_bid_price)
        self.forward(page_time, page_time_stamp, docchannel, doctitle, doctitle_refine, area, province, city,
                     district, web_source_no, project_code, project_name, tenderee, agency, win_tenderer, bidding_budget, win_bid_price, extract_count)
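# Hedged sketch of the inputs f_decode_extract expects. This shape is inferred
# from the access pattern above and is an assumption, not a documented schema:
#   extractjson: {"code": ["<project code>"], "name": "<project name>",
#                 "prem": {"<package>": {"tendereeMoney": "100000",
#                                        "roleList": [["tenderee", "<org>", ""],
#                                                     ["win_tenderer", "<org>", "98000"]]}}}
#   otherjson:   {"pageTime": "2021-01-01", "doctitle": "...", "area": "...",
#                 "province": "...", "city": "...", "district": "...",
#                 "webSourceNo": "...", "docchannel": 52}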
@annotate('string,string,string,string,string -> string,string,string,bigint')
class f_decode_sub_docs_json(BaseUDTF):

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, project_code, project_name, tenderee, agency, sub_docs_json):
        columns = {"win_tenderer": "", "bidding_budget": "", "win_bid_price": ""}
        extract_count = 0
        if project_code != "":
            extract_count += 1
        if project_name != "":
            extract_count += 1
        if tenderee != "":
            extract_count += 1
        if agency != "":
            extract_count += 1
        if sub_docs_json is not None:
            for sub_docs in json.loads(sub_docs_json):
                for _key_sub_docs in sub_docs.keys():
                    extract_count += 1
                    if _key_sub_docs in columns:
                        if columns[_key_sub_docs] == "" and str(sub_docs[_key_sub_docs]) not in ["", "0"]:
                            if _key_sub_docs in ["bidding_budget", "win_bid_price"]:
                                if float(sub_docs[_key_sub_docs]) > 0:
                                    columns[_key_sub_docs] = str(sub_docs[_key_sub_docs])
                            else:
                                columns[_key_sub_docs] = str(sub_docs[_key_sub_docs])
        self.forward(columns["win_tenderer"], columns["bidding_budget"], columns["win_bid_price"], extract_count)
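# Hedged example (made-up values): for sub_docs_json
#   '[{"win_tenderer": "A公司", "win_bid_price": "98000"}, {"bidding_budget": "0"}]'
# the first non-empty value of each tracked column wins, money fields must
# parse as a positive float (so the "0" budget is skipped), and extract_count
# ends up as the number of keys seen plus one per non-empty scalar argument.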
@annotate("string->bigint")
class totimestamp(object):

    def __init__(self):
        import time
        import logging
        import json
        import re
        global time,json,logging,re
        self.time_pattern = r"\d{4}-\d{2}-\d{2}.*"
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, str_time):
        try:
            logging.info(str_time)
            if str_time is not None and re.search(self.time_pattern, str_time) is not None:
                timeArray = time.strptime(str_time[:10], "%Y-%m-%d")
                timeStamp = int(time.mktime(timeArray))
                return timeStamp
            else:
                return 0
        except Exception as e:
            return 0
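# Hedged example (local time zone assumed): evaluate("2021-01-02 10:00:00")
# parses only the leading "2021-01-02" and returns the epoch seconds of that
# date's midnight; anything that does not look like "YYYY-MM-DD..." (for
# example "02/01/2021") returns 0.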
@annotate("string->string")
class refind_name(object):

    def __init__(self):
        import logging
        import re
        global logging,re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, title):
        if title is not None:
            return re.sub(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价', '', title)
        return ""
@annotate('bigint,bigint,bigint,string,bigint,string->string')
class f_set_docid(BaseUDAF):
    '''
    project code, winning bidder, len(project code) > 7, winning bidder <> ""
    '''

    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, docid, page_time_stamp, extract_count, defind_column, defind_count, tenderee):
        buffer[0].append({"docid": docid, "page_time_stamp": page_time_stamp, "extract_count": extract_count,
                          "defind_column": defind_column, "defind_count": defind_count, "tenderee": tenderee})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_docs = buffer[0]
        list_docs.sort(key=lambda x: x["page_time_stamp"])
        list_group = []
        _begin = 0
        defind_count = 0
        if len(list_docs) > 0:
            defind_count = list_docs[0]["defind_count"]

        def _append_group(begin, end):
            # validate the time window [begin, end) and append it as a group if it passes
            _group = []
            _set_column = set()
            _set_tenderee = set()
            for j in range(begin, end):
                if list_docs[j]["tenderee"] is not None and list_docs[j]["tenderee"] != "":
                    _set_tenderee.add(list_docs[j]["tenderee"])
                _set_column.add(list_docs[j]["defind_column"])
                _group.append({"docid": list_docs[j]["docid"], "extract_count": list_docs[j]["extract_count"]})
            # three or more docs naming more than one tenderee is treated as a false merge
            if len(_group) >= 3 and len(_set_tenderee) > 1:
                return
            if len(_group) > 1:
                if defind_count == 2:
                    if len(_set_column) >= 2:
                        list_group.append(_group)
                elif defind_count == 1:
                    if len(_set_column) == 1:
                        list_group.append(_group)
                elif defind_count == 0:
                    list_group.append(_group)

        for i in range(len(list_docs) - 1):
            # split whenever two consecutive docs are more than two days apart
            if abs(list_docs[i]["page_time_stamp"] - list_docs[i + 1]["page_time_stamp"]) <= 86400 * 2:
                continue
            else:
                _append_group(_begin, i + 1)
                _begin = i + 1
        if len(list_docs) > 1:
            _append_group(_begin, len(list_docs))
        return json.dumps(list_group)
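# Hedged example of the windowing above (values are made up): docs stamped on
# day 1, day 2 and day 10 split into two windows, since only consecutive
# stamps within two days stay together. A two-doc window is kept when
# defind_count is 0, when defind_count is 1 and both docs share one
# defind_column value, or when defind_count is 2 and their defind_column
# values differ.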
@annotate('bigint->string')
class f_stamp_squence(BaseUDAF):
    '''
    collect page_time_stamp values and merge them into two-day-padded intervals
    '''

    def __init__(self):
        import json
        import logging
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [set()]

    def iterate(self, buffer, page_time_stamp):
        buffer[0].add(page_time_stamp)

    def merge(self, buffer, pbuffer):
        buffer[0] |= pbuffer[0]

    def terminate(self, buffer):
        if 0 in buffer[0]:
            buffer[0].remove(0)
        list_stamp = list(buffer[0])
        list_stamp.sort(key=lambda x: x)
        list_stamp_final = []
        _begin = 0
        _time_decase = 86400 * 2
        logging.info(str(list_stamp))
        for _index in range(len(list_stamp) - 1):
            if list_stamp[_index + 1] - list_stamp[_index] < _time_decase:
                continue
            else:
                list_stamp_final.append([list_stamp[_begin] - _time_decase, list_stamp[_index] + _time_decase])
                _begin = _index + 1
        if len(list_stamp) > 0:
            list_stamp_final.append([list_stamp[_begin] - _time_decase, list_stamp[-1] + _time_decase])
        return json.dumps(list_stamp_final)
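# Hedged example: for stamps {d, d + 86400, d + 864000} the first two are less
# than two days apart and merge, so terminate() returns two padded windows:
#   [[d - 172800, d + 86400 + 172800], [d + 864000 - 172800, d + 864000 + 172800]]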
@annotate("bigint,string->bigint")
class in_stamp(object):

    def __init__(self):
        import logging
        import re
        import json
        global logging,re,json
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, page_time_stamp, json_stamp):
        list_stamp = json.loads(json_stamp)
        int_flag = 0
        for item in list_stamp:
            if page_time_stamp < item[0]:
                break
            if page_time_stamp > item[0] and page_time_stamp < item[1]:
                int_flag = 1
                break
        return int_flag
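# Hedged example: in_stamp().evaluate(t, json_stamp) returns 1 only when t
# falls strictly inside one of the [begin, end] windows produced by
# f_stamp_squence; the early break assumes the windows are sorted by their
# lower bound, which that UDAF guarantees.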
@annotate('string -> bigint,bigint,bigint,bigint')
class f_split_group_single(BaseUDTF):
    '''
    expand each group into multiple records, one per ordered pair of distinct docids
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_set_docid):
        list_group = json.loads(json_set_docid)
        for item in list_group:
            for index_i in range(len(item)):
                for index_j in range(len(item)):
                    if index_i != index_j and item[index_i]["docid"] != item[index_j]["docid"]:
                        self.forward(item[index_i]["docid"], item[index_j]["docid"], item[index_i]["extract_count"], item[index_j]["extract_count"])
@annotate('bigint,string->string')
class group_document(BaseUDAF):
    '''
    aggregate the (id, json_set_docid) rows of one group into a single JSON list
    '''

    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, id, json_set_docid):
        buffer[0].append({"id": id, "json_set_docid": json.loads(json_set_docid)})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        return json.dumps(buffer[0])
@annotate('bigint,string,bigint,string -> bigint,bigint,string')
class decare_document(BaseUDTF):
    '''
    Cartesian-join the groups and merge any two that share a docid
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, group_id1, json_list_doc1, group_id2, json_list_doc2):
        # only keep the y<=x half of the Cartesian product, cutting nearly half the work
        if group_id1 >= group_id2:
            list_doc1 = json.loads(json_list_doc1)
            list_doc2 = json.loads(json_list_doc2)
            for _doc1 in list_doc1:
                for _doc2 in list_doc2:
                    # do not compare a duplicate group with itself
                    if _doc1["id"] != _doc2["id"]:
                        # check whether the two groups share any docid
                        _set1 = set()
                        for _item1 in _doc1["json_set_docid"]:
                            _set1.add(_item1["docid"])
                        _set2 = set()
                        for _item2 in _doc2["json_set_docid"]:
                            _set2.add(_item2["docid"])
                        if len(_set1 & _set2) > 0:
                            new_json_set_docid = _doc1["json_set_docid"]
                            for _item2 in _doc2["json_set_docid"]:
                                if _item2["docid"] not in _set1:
                                    new_json_set_docid.append(_item2)
                            self.forward(_doc1["id"], _doc2["id"], json.dumps(new_json_set_docid))
def getBestDocid(list_pair):
    # each pair is [docid1, extract_count1, docid2, extract_count2];
    # sort by extract_count2 descending, collect every docid whose extract
    # count ties the maximum, then return the smallest such docid
    list_pair.sort(key=lambda x: x[3], reverse=True)
    _max_count = max(list_pair[0][3], list_pair[0][1])
    set_candidate = set()
    if list_pair[0][1] == _max_count:
        set_candidate.add(list_pair[0][0])
    for item in list_pair:
        if item[3] == _max_count:
            set_candidate.add(item[2])
        else:
            break
    list_candidate = list(set_candidate)
    list_candidate.sort(key=lambda x: x)
    return list_candidate[0]
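# Hedged example (made-up pairs): getBestDocid([[101, 3, 205, 5],
# [101, 3, 309, 5], [101, 3, 412, 2]]) sorts by extract_count2 descending,
# skips 101 because 3 != 5, collects {205, 309} as ties for the maximum,
# and returns the smallest candidate, 205.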
@annotate('bigint,bigint,bigint,bigint->string')
class choose_document(BaseUDAF):
    '''
    decide whether docid1 is the record to keep and list its duplicates
    '''

    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, docid1, extract_count1, docid2, extract_count2):
        buffer[0].append([docid1, extract_count1, docid2, extract_count2])

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_pair = buffer[0]
        _set = set()
        for item in buffer[0]:
            _set.add(str(item[2]))
        list_dumplicate = list(_set)
        best_docid = getBestDocid(list_pair)
        if best_docid == list_pair[0][0]:
            save_flag = 1
        else:
            save_flag = 0
        return json.dumps({"save_flag": save_flag, "dumplicates": list_dumplicate})
@annotate('string -> bigint,string')
class f_get_choose_document(BaseUDTF):
    '''
    unpack the choose_document JSON into (save_flag, comma-joined duplicates)
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_choose):
        if json_choose is None:
            self.forward(1, None)
        else:
            _choose = json.loads(json_choose)
            self.forward(_choose["save_flag"], ",".join(_choose["dumplicates"]))
@annotate('bigint,bigint,bigint,bigint->string')
class group_document_bestFirst(BaseUDAF):
    '''
    put the best docid of each group first
    '''

    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, docid1, extract_count1, docid2, extract_count2):
        buffer[0].append([docid1, extract_count1, docid2, extract_count2])

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_pair = buffer[0]
        _set = set()
        for item in buffer[0]:
            _set.add(item[2])
        _set.add(list_pair[0][0])
        best_docid = getBestDocid(list_pair)
        _set.remove(best_docid)
        list_dumplicate = list(_set)
        list_dumplicate.sort(key=lambda x: x)
        list_dumplicate.insert(0, best_docid)
        list_dumplicate_str = []
        for item in list_dumplicate:
            list_dumplicate_str.append(str(item))
        return ",".join(list_dumplicate_str)
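# Hedged example (made-up pairs): for [[7, 2, 9, 5], [7, 2, 8, 1]] the docid
# set is {7, 8, 9}, getBestDocid picks 9, and terminate() returns "9,7,8" --
# the best docid first, the remaining ones in ascending order.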
@annotate('string -> bigint,string')
class f_get_best_dumplicates(BaseUDTF):
    '''
    emit the best record of each group together with its duplicate records
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, list_dumplicate_str):
        if list_dumplicate_str is None:
            return
        list_dumplicate = list_dumplicate_str.split(",")
        if len(list_dumplicate) > 0:
            self.forward(int(list_dumplicate[0]), ",".join(list_dumplicate[1:]))
@annotate('bigint,bigint->string')
class bridge2group(BaseUDAF):
    '''
    collect the docids of all pairs into one descending-sorted JSON list
    '''

    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [set()]

    def iterate(self, buffer, docid1, docid2):
        buffer[0].add(docid1)
        buffer[0].add(docid2)

    def merge(self, buffer, pbuffer):
        buffer[0] |= pbuffer[0]

    def terminate(self, buffer):
        list_pair = list(buffer[0])
        list_pair.sort(key=lambda x: x, reverse=True)
        return json.dumps(list_pair)
@annotate('string -> bigint,bigint')
class group2bridge(BaseUDTF):
    '''
    emit each docid together with the bridge docid of its group
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_list_docid):
        list_docid = json.loads(json_list_docid)
        for _docid in list_docid:
            # the list is sorted descending, so the last element is the
            # smallest docid, which serves as the bridge id
            self.forward(list_docid[-1], _docid)
@annotate('bigint,bigint,string -> bigint')
class f_get_dump_docid(BaseUDTF):
    '''
    emit every docid that should be marked as a duplicate
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, docid, save_flag, dumplicates):
        if save_flag == 0:
            # the record itself is dropped, so it counts as a duplicate too
            self.forward(docid)
            if dumplicates is not None:
                list_docid = dumplicates.split(",")
                if len(list_docid) > 0:
                    for _docid in list_docid[1:]:
                        self.forward(int(_docid))
        else:
            if dumplicates is not None:
                list_docid = dumplicates.split(",")
                if len(list_docid) > 0:
                    for _docid in list_docid:
                        self.forward(int(_docid))
@annotate('string -> bigint,bigint')
class f_get_docid(BaseUDTF):
    '''
    number the groups and emit one (team_id, docid) row per member
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_set_docid):
        team_id = 0
        if json_set_docid is not None:
            list_docses = json.loads(json_set_docid)
            for list_docs in list_docses:
                team_id += 1
                for item in list_docs:
                    self.forward(team_id, item["docid"])
@annotate("string->bigint")
class get_count_dump(object):

    def __init__(self):
        import logging
        import re
        global logging,re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, title):
        # count the comma-separated entries in the duplicates string
        _count = 0
        if title is not None:
            _count = len(title.split(","))
        return _count
def getSet(list_dict, key):
    # collect the distinct values of `key`, normalizing numeric-looking
    # strings through float() so "100" and "100.0" compare equal
    import re
    _set = set()
    for item in list_dict:
        if key in item:
            if item[key] != '' and item[key] is not None:
                if re.search(r"^\d[\d\.]*$", item[key]) is not None:
                    _set.add(str(float(item[key])))
                else:
                    _set.add(str(item[key]))
    return _set
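# Hedged example: getSet([{"m": "100"}, {"m": "100.0"}, {"m": "abc"}], "m")
# returns {"100.0", "abc"} -- "100" and "100.0" collapse to one value through
# float(), while non-numeric strings are kept verbatim.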
@annotate('bigint,string -> bigint,bigint')
class f_getGroup_dumpFinal(BaseUDTF):
    '''
    recover the groups from the final result
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, docid, dumplicates):
        # every document is a member of its own group
        self.forward(int(docid), int(docid))
        if dumplicates is not None:
            list_docids = dumplicates.split(",")
            for _docid in list_docids:
                self.forward(int(docid), int(_docid))
@annotate('bigint,bigint,string,string,string,string,bigint,bigint->string')
class f_redump_limit_num(BaseUDAF):
    '''
    re-check a group after dedup merging: when the group has more than 5
    members, doctitle, tenderee, win_tenderer and bidding_budget must each
    take a single value inside the group; with 5 members or fewer, only
    tenderee, win_tenderer and bidding_budget must be single-valued
    '''

    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer, main_docid, docid, set_limit_column1, set_limit_column2, set_limit_column3, set_limit_column4, extract_count1, extract_count2):
        buffer[0].append({"main_docid": main_docid, "docid": docid, "set_limit_column1": set_limit_column1, "set_limit_column2": set_limit_column2,
                          "set_limit_column3": set_limit_column3, "set_limit_column4": set_limit_column4, "extract_count1": extract_count1, "extract_count2": extract_count2})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        the_group = buffer[0]
        if len(the_group) > 5:
            keys = ["set_limit_column1", "set_limit_column2", "set_limit_column3", "set_limit_column4"]
        else:
            keys = ["set_limit_column2", "set_limit_column3", "set_limit_column4"]
        stay = True
        for _key in keys:
            if len(getSet(the_group, _key)) > 1:
                stay = False
                break
        final_group = []
        if stay:
            main_docid = the_group[0]["main_docid"]
            for item in the_group:
                if item["docid"] != main_docid:
                    final_group.append({"docid1": main_docid, "docid2": item["docid"], "extract_count1": item["extract_count1"], "extract_count2": item["extract_count2"]})
        return json.dumps(final_group)
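# Hedged example: a 6-member group whose doctitles differ is dropped (empty
# JSON list), because with more than 5 members set_limit_column1 (doctitle)
# joins the single-value check; the same titles in a 4-member group would
# pass, since small groups only check columns 2-4.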
@annotate('string -> bigint,bigint,bigint,bigint')
class f_get_dumpFinal_checked(BaseUDTF):
    '''
    unpack the re-checked groups into (docid1, docid2, extract_count1, extract_count2) rows
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, list_group):
        if list_group is not None:
            final_group = json.loads(list_group)
            for _group in final_group:
                self.forward(_group["docid1"], _group["docid2"], _group["extract_count1"], _group["extract_count2"])