documentDumplicate.py 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010
  1. #coding:UTF8
  2. from odps.udf import annotate
  3. from odps.udf import BaseUDTF
  4. from odps.udf import BaseUDAF
  5. @annotate('string,string -> string,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,bigint')
  6. class f_decode_extract(BaseUDTF):
  7. def __init__(self):
  8. import logging
  9. import json
  10. import time,re
  11. global json,logging,time,re
  12. self.time_pattern = "\d{4}\-\d{2}\-\d{2}.*"
  13. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  14. def process(self, extractjson,otherjson):
  15. if extractjson is not None:
  16. _extract = json.loads(extractjson)
  17. else:
  18. _extract = {}
  19. if otherjson is not None:
  20. _other = json.loads(otherjson)
  21. else:
  22. _other = {}
  23. project_code = ""
  24. project_name = ""
  25. tenderee = ""
  26. agency = ""
  27. win_tenderer = ""
  28. bidding_budget = ""
  29. win_bid_price = ""
  30. fingerprint = ""
  31. page_time_stamp = 0
  32. docchannel = 0
  33. extract_count = 0
  34. page_time = _other.get("pageTime",time.strftime('%Y-%m-%d',time.localtime()))
  35. doctitle = _other.get("doctitle","")
  36. doctitle_refine = re.sub(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|合同', '', doctitle)
  37. area = _other.get("area","")
  38. province = _other.get("province","")
  39. city = _other.get("city","")
  40. district = _other.get("district","")
  41. web_source_no = _other.get("webSourceNo","")
  42. docchannel = _other.get("docchannel",0)
  43. if re.search(self.time_pattern,page_time) is not None:
  44. try:
  45. timeArray = time.strptime(page_time[:11], "%Y-%m-%d")
  46. page_time_stamp = int(time.mktime(timeArray))
  47. except Exception as e:
  48. pass
  49. list_code = _extract.get("code",[])
  50. if len(list_code)>0:
  51. project_code = list_code[0]
  52. project_name = _extract.get("name","")
  53. fingerprint = _extract.get("fingerprint","")
  54. dict_pack = _extract.get("prem",{})
  55. logging.info(dict_pack)
  56. for _key in dict_pack.keys():
  57. if dict_pack[_key]["tendereeMoney"]!='' and float(dict_pack[_key]["tendereeMoney"])>0:
  58. extract_count += 1
  59. if bidding_budget=="":
  60. bidding_budget = str(float(dict_pack[_key]["tendereeMoney"]))
  61. for _role in dict_pack[_key]["roleList"]:
  62. extract_count += 1
  63. if _role[2]!='' and float(_role[2])>0:
  64. extract_count += 1
  65. if _role[0]=="tenderee":
  66. tenderee = _role[1]
  67. if _role[0]=="win_tenderer":
  68. if win_tenderer=="":
  69. win_tenderer = _role[1]
  70. if _role[2]!='' and float(_role[2])>0:
  71. extract_count += 1
  72. if win_bid_price=="":
  73. win_bid_price = str(float(_role[2]))
  74. if _role[0]=="agency":
  75. agency = _role[1]
  76. if project_code!="":
  77. extract_count += 1
  78. if project_name!="":
  79. extract_count += 1
  80. logging.info(page_time+doctitle+doctitle_refine+area+province+city+
  81. district+web_source_no+project_code+project_name+tenderee+agency+win_tenderer+bidding_budget+win_bid_price)
  82. self.forward(page_time,page_time_stamp,docchannel,doctitle,doctitle_refine,area,province,city,
  83. district,web_source_no,fingerprint,project_code,project_name,tenderee,agency,win_tenderer,bidding_budget,win_bid_price,extract_count)
  84. @annotate("string->bigint")
  85. class f_get_extractCount(object):
  86. def __init__(self):
  87. import time
  88. global time
  89. import logging
  90. import json
  91. import re
  92. global json,logging,re
  93. self.time_pattern = "\d{4}\-\d{2}\-\d{2}.*"
  94. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  95. def evaluate(self, extractjson):
  96. if extractjson is not None:
  97. _extract = json.loads(extractjson)
  98. else:
  99. _extract = {}
  100. dict_pack = _extract.get("prem",{})
  101. extract_count = 0
  102. list_code = _extract.get("code",[])
  103. if len(list_code)>0:
  104. project_code = list_code[0]
  105. else:
  106. project_code = ""
  107. project_name = _extract.get("name","")
  108. bidding_budget = ""
  109. win_tenderer = ""
  110. win_bid_price = ""
  111. for _key in dict_pack.keys():
  112. if dict_pack[_key]["tendereeMoney"]!='' and float(dict_pack[_key]["tendereeMoney"])>0:
  113. extract_count += 1
  114. if bidding_budget=="":
  115. bidding_budget = str(float(dict_pack[_key]["tendereeMoney"]))
  116. for _role in dict_pack[_key]["roleList"]:
  117. extract_count += 1
  118. if _role[2]!='' and float(_role[2])>0:
  119. extract_count += 1
  120. if _role[0]=="tenderee":
  121. tenderee = _role[1]
  122. if _role[0]=="win_tenderer":
  123. if win_tenderer=="":
  124. win_tenderer = _role[1]
  125. if _role[2]!='' and float(_role[2])>0:
  126. extract_count += 1
  127. if win_bid_price=="":
  128. win_bid_price = str(float(_role[2]))
  129. if _role[0]=="agency":
  130. agency = _role[1]
  131. if project_code!="":
  132. extract_count += 1
  133. if project_name!="":
  134. extract_count += 1
  135. return extract_count
  136. @annotate('string,string,string,string,string -> string,string,string,bigint')
  137. class f_decode_sub_docs_json(BaseUDTF):
  138. def __init__(self):
  139. import logging
  140. import json
  141. global json,logging
  142. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  143. def process(self, project_code,project_name,tenderee,agency,sub_docs_json):
  144. columns = {"win_tenderer":"","bidding_budget":"","win_bid_price":""}
  145. extract_count = 0
  146. if project_code is not None and project_code!="":
  147. extract_count += 1
  148. if project_name is not None and project_name!="":
  149. extract_count += 1
  150. if tenderee is not None and tenderee!="":
  151. extract_count += 1
  152. if agency is not None and agency!="":
  153. extract_count += 1
  154. if sub_docs_json is not None:
  155. for sub_docs in json.loads(sub_docs_json):
  156. for _key_sub_docs in sub_docs.keys():
  157. extract_count += 1
  158. if _key_sub_docs in columns:
  159. if columns[_key_sub_docs]=="" and str(sub_docs[_key_sub_docs]) not in ["","0"]:
  160. if _key_sub_docs in ["bidding_budget","win_bid_price"]:
  161. if float(sub_docs[_key_sub_docs])>0:
  162. columns[_key_sub_docs] = str(float(sub_docs[_key_sub_docs]))
  163. else:
  164. columns[_key_sub_docs] = str(sub_docs[_key_sub_docs])
  165. self.forward(columns["win_tenderer"],columns["bidding_budget"],columns["win_bid_price"],extract_count)
  166. @annotate("string->bigint")
  167. class totimestamp(object):
  168. def __init__(self):
  169. import time
  170. global time
  171. import logging
  172. import json
  173. import re
  174. global json,logging,re
  175. self.time_pattern = "\d{4}\-\d{2}\-\d{2}.*"
  176. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  177. def evaluate(self, str_time):
  178. try:
  179. logging.info(str_time)
  180. if str_time is not None and re.search(self.time_pattern,str_time) is not None:
  181. timeArray = time.strptime(str_time[:10], "%Y-%m-%d")
  182. timeStamp = int(time.mktime(timeArray))
  183. return timeStamp
  184. else:
  185. return 0
  186. except Exception as e:
  187. return 0
  188. @annotate("string->string")
  189. class refind_name(object):
  190. def __init__(self):
  191. import logging
  192. import re
  193. global logging,re
  194. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  195. def evaluate(self, title):
  196. if title is not None:
  197. return re.sub(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|\[|\]|【|】', '', title)
  198. return ""
  199. @annotate('bigint,bigint,bigint,string,bigint,string->string')
  200. class f_set_docid(BaseUDAF):
  201. '''
  202. 项目编号、中标单位、len(项目编号)>7、中标单位<> ""
  203. '''
  204. def __init__(self):
  205. import json
  206. global json
  207. def new_buffer(self):
  208. return [[]]
  209. def iterate(self, buffer,docid, page_time_stamp,extract_count,defind_column,defind_count,tenderee):
  210. buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"extract_count":extract_count,
  211. "defind_column":defind_column,"defind_count":defind_count,"tenderee":tenderee})
  212. def merge(self, buffer, pbuffer):
  213. buffer[0].extend(pbuffer[0])
  214. def terminate(self, buffer):
  215. list_docs = buffer[0]
  216. list_docs.sort(key=lambda x:x["page_time_stamp"])
  217. list_group = []
  218. _begin = 0
  219. defind_count = 0
  220. if len(list_docs)>0:
  221. defind_count = list_docs[0]["defind_count"]
  222. for i in range(len(list_docs)-1):
  223. if abs(list_docs[i]["page_time_stamp"]-list_docs[i+1]["page_time_stamp"])<=86400*2:
  224. continue
  225. else:
  226. _group = []
  227. _set_column = set()
  228. _set_tenderee = set()
  229. for j in range(_begin,i+1):
  230. if list_docs[j]["tenderee"] is not None and list_docs[j]["tenderee"]!="":
  231. _set_tenderee.add(list_docs[j]["tenderee"])
  232. _set_column.add(list_docs[j]["defind_column"])
  233. _group.append({"docid":list_docs[j]["docid"],"extract_count":list_docs[j]["extract_count"]})
  234. if len(_group)>=3 and len(_set_tenderee)>1:
  235. pass
  236. else:
  237. if len(_group)>1:
  238. if defind_count==2:
  239. if len(_set_column)>=2:
  240. list_group.append(_group)
  241. elif defind_count==1:
  242. if len(_set_column)==1:
  243. list_group.append(_group)
  244. elif defind_count==0:
  245. list_group.append(_group)
  246. _begin = i+1
  247. if len(list_docs)>1:
  248. _set_column = set()
  249. _set_tenderee = set()
  250. _group = []
  251. for j in range(_begin,len(list_docs)):
  252. if list_docs[j]["tenderee"] is not None and list_docs[j]["tenderee"]!="":
  253. _set_tenderee.add(list_docs[j]["tenderee"])
  254. _set_column.add(list_docs[j]["defind_column"])
  255. _group.append({"docid":list_docs[j]["docid"],"extract_count":list_docs[j]["extract_count"]})
  256. if len(_group)>=3 and len(_set_tenderee)>1:
  257. pass
  258. else:
  259. if len(_group)>1:
  260. if defind_count==2:
  261. if len(_set_column)>=2:
  262. list_group.append(_group)
  263. elif defind_count==1:
  264. if len(_set_column)==1:
  265. list_group.append(_group)
  266. elif defind_count==0:
  267. list_group.append(_group)
  268. return json.dumps(list_group)
  269. def isEmpty(_str):
  270. if _str is None or _str=="":
  271. return True
  272. return False
  273. @annotate('bigint,bigint,bigint,string,string,string,string,string,string,string,string->string')
  274. class f_set_docid_binaryChart(BaseUDAF):
  275. '''
  276. 项目编号、中标单位、len(项目编号)>7、中标单位<> ""
  277. '''
  278. def __init__(self):
  279. import json
  280. global json
  281. def new_buffer(self):
  282. return [[]]
  283. def iterate(self, buffer,docid, page_time_stamp,extract_count,project_code,project_name,tenderee,bidding_budget,win_tenderer,win_bid_price,agency,web_source_no):
  284. buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"extract_count":extract_count,
  285. "project_code":project_code,"project_name":project_name,"tenderee":tenderee,
  286. "bidding_budget":bidding_budget,"win_tenderer":win_tenderer,"win_bid_price":win_bid_price,
  287. "agency":agency,"web_source_no":web_source_no})
  288. def merge(self, buffer, pbuffer):
  289. buffer[0].extend(pbuffer[0])
  290. def terminate(self, buffer):
  291. list_docs = buffer[0]
  292. list_timeGroups = split_with_time(list_docs,"page_time_stamp",86400*2)
  293. list_group = []
  294. empty_key = ["project_code","bidding_budget","win_tenderer","win_bid_price","agency"]
  295. for _timeGroups in list_timeGroups:
  296. list_empty = []
  297. list_notEmpty = []
  298. for _item in _timeGroups:
  299. empty_flag = True
  300. for _key in empty_key:
  301. if not isEmpty(_item[_key]):
  302. empty_flag = False
  303. break
  304. if empty_flag:
  305. list_empty.append(_item)
  306. else:
  307. list_notEmpty.append(_item)
  308. for _e in list_empty:
  309. _group = [{"docid":_e["docid"],"extract_count":_e["extract_count"]}]
  310. _e_tenderee = _e["tenderee"]
  311. for _ne in list_notEmpty:
  312. if "set_webSource" not in _ne:
  313. _ne["set_webSource"] = set()
  314. _ne["set_webSource"].add(_ne["web_source_no"])
  315. _suit = False
  316. if not isEmpty(_e_tenderee) and _e_tenderee==_ne["tenderee"]:
  317. _suit = True
  318. elif isEmpty(_e_tenderee):
  319. _suit = True
  320. if _suit:
  321. if _e["web_source_no"] not in _ne["set_webSource"]:
  322. _ne["set_webSource"].add(_e["web_source_no"])
  323. _group.append({"docid":_ne["docid"],"extract_count":_ne["extract_count"]})
  324. break
  325. if len(_group)>1:
  326. list_group.append(_group)
  327. return json.dumps(list_group)
  328. def split_with_time(list_dict,sort_key,timedelta=86400*2):
  329. if len(list_dict)>0:
  330. if sort_key in list_dict[0]:
  331. list_dict.sort(key=lambda x:x[sort_key])
  332. list_group = []
  333. _begin = 0
  334. for i in range(len(list_dict)-1):
  335. if abs(list_dict[i][sort_key]-list_dict[i+1][sort_key])<timedelta:
  336. continue
  337. else:
  338. _group = []
  339. for j in range(_begin,i+1):
  340. _group.append(list_dict[j])
  341. if len(_group)>1:
  342. list_group.append(_group)
  343. _begin = i + 1
  344. if len(list_dict)>1:
  345. _group = []
  346. for j in range(_begin,len(list_dict)):
  347. _group.append(list_dict[j])
  348. if len(_group)>1:
  349. list_group.append(_group)
  350. return list_group
  351. return [list_dict]
  352. @annotate('bigint,bigint,bigint,string,string,string,string,string->string')
  353. class f_set_docid_limitNum_contain(BaseUDAF):
  354. '''
  355. 项目编号、中标单位、len(项目编号)>7、中标单位<> ""、合并后非空招标单位数<2、合并后同公告类型非空金额相同
  356. '''
  357. def __init__(self):
  358. import logging
  359. import json,re
  360. global json,logging,re
  361. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  362. def new_buffer(self):
  363. return [list()]
  364. def iterate(self, buffer,docid,page_time_stamp,extract_count,set_limit_column1,set_limit_column2,set_limit_column3,set_limit_column4,contain_column):
  365. buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"extract_count":extract_count,"set_limit_column1":set_limit_column1,
  366. "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,
  367. "contain_column":contain_column})
  368. def merge(self, buffer, pbuffer):
  369. buffer[0].extend(pbuffer[0])
  370. def terminate(self, buffer):
  371. list_split = split_with_time(buffer[0],"page_time_stamp")
  372. list_group = []
  373. for _split in list_split:
  374. flag = True
  375. keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"]
  376. for _key in keys:
  377. logging.info(_key+str(getSet(_split,_key)))
  378. if len(getSet(_split,_key))>1:
  379. flag = False
  380. break
  381. MAX_CONTAIN_COLUMN = None
  382. #判断组内每条公告是否包含
  383. if flag:
  384. for _d in _split:
  385. contain_column = _d["contain_column"]
  386. if contain_column is not None and contain_column !="":
  387. if MAX_CONTAIN_COLUMN is None:
  388. MAX_CONTAIN_COLUMN = contain_column
  389. else:
  390. if len(MAX_CONTAIN_COLUMN)<len(contain_column):
  391. if contain_column.find(MAX_CONTAIN_COLUMN)==-1:
  392. flag = False
  393. break
  394. MAX_CONTAIN_COLUMN = contain_column
  395. else:
  396. if MAX_CONTAIN_COLUMN.find(contain_column)==-1:
  397. flag = False
  398. break
  399. if flag:
  400. if len(_split)>1:
  401. _group = []
  402. for _item in _split:
  403. _group.append({"docid":_item["docid"],"extract_count":_item["extract_count"]})
  404. list_group.append(_group)
  405. return json.dumps(list_group)
  406. @annotate('bigint->string')
  407. class f_stamp_squence(BaseUDAF):
  408. '''
  409. 项目编号、中标单位、len(项目编号)>7、中标单位<> ""
  410. '''
  411. def __init__(self):
  412. import json
  413. global json
  414. import logging
  415. global logging
  416. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  417. def new_buffer(self):
  418. return [set()]
  419. def iterate(self, buffer,page_time_stamp):
  420. buffer[0].add(page_time_stamp)
  421. def merge(self, buffer, pbuffer):
  422. buffer[0] |= pbuffer[0]
  423. def terminate(self, buffer):
  424. if 0 in buffer[0]:
  425. buffer[0].remove(0)
  426. list_stamp = list(buffer[0])
  427. list_stamp.sort(key=lambda x:x)
  428. list_stamp_final = []
  429. _begin = 0
  430. _time_decase = 86400*2
  431. logging.info(str(list_stamp))
  432. for _index in range(len(list_stamp)-1):
  433. if list_stamp[_index+1]-list_stamp[_index]<_time_decase:
  434. continue
  435. else:
  436. list_stamp_final.append([list_stamp[_begin]-_time_decase,list_stamp[_index]+_time_decase])
  437. _begin = _index+1
  438. if len(list_stamp)>0:
  439. list_stamp_final.append([list_stamp[_begin]-_time_decase,list_stamp[-1]+_time_decase])
  440. return json.dumps(list_stamp_final)
  441. @annotate("bigint,string->bigint")
  442. class in_stamp(object):
  443. def __init__(self):
  444. import logging
  445. import re
  446. import json
  447. global logging,re,json
  448. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  449. def evaluate(self, page_time_stamp,json_stamp):
  450. list_stamp = json.loads(json_stamp)
  451. int_flag = 0
  452. for item in list_stamp:
  453. if page_time_stamp <item[0]:
  454. break
  455. if page_time_stamp>item[0] and page_time_stamp<item[1]:
  456. int_flag = 1
  457. break
  458. return int_flag
  459. def getConfidence(rule_id):
  460. if rule_id ==0:
  461. return 30
  462. elif rule_id >=1 and rule_id <30:
  463. return 20
  464. else:
  465. return 10
  466. @annotate('string,string -> string')
  467. class f_splitStr(BaseUDTF):
  468. '''
  469. 将多个组拆解成多条记录
  470. '''
  471. def __init__(self):
  472. import logging
  473. import json
  474. global json,logging
  475. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  476. def process(self, str_split,_split):
  477. try:
  478. for _s in str_split.split(_split):
  479. self.forward(_s)
  480. except Exception as e:
  481. pass
  482. @annotate('string,bigint -> bigint,bigint,bigint,bigint,bigint')
  483. class f_split_group_single(BaseUDTF):
  484. '''
  485. 将多个组拆解成多条记录
  486. '''
  487. def __init__(self):
  488. import logging
  489. import json
  490. global json,logging
  491. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  492. def process(self, json_set_docid,rule_id):
  493. list_group = json.loads(json_set_docid)
  494. for item in list_group:
  495. if len(item)>100:
  496. item.sort(key=lambda x:x["docid"],reverse=True)
  497. index_i = 0
  498. for index_j in range(1,len(item)):
  499. if item[index_i]["docid"]!=item[index_j]["docid"]:
  500. self.forward(item[index_i]["docid"],item[index_j]["docid"],item[index_i]["extract_count"],item[index_j]["extract_count"],getConfidence(rule_id))
  501. else:
  502. for index_i in range(len(item)):
  503. for index_j in range(len(item)):
  504. if index_i!=index_j and item[index_i]["docid"]!=item[index_j]["docid"]:
  505. self.forward(item[index_i]["docid"],item[index_j]["docid"],item[index_i]["extract_count"],item[index_j]["extract_count"],getConfidence(rule_id))
  506. @annotate('bigint,string->string')
  507. class group_document(BaseUDAF):
  508. '''
  509. 项目编号、中标单位、len(项目编号)>7、中标单位<> ""
  510. '''
  511. def __init__(self):
  512. import json
  513. global json
  514. def new_buffer(self):
  515. return [[]]
  516. def iterate(self, buffer,id,json_set_docid):
  517. buffer[0].append({"id":id,"json_set_docid":json.loads(json_set_docid)})
  518. def merge(self, buffer, pbuffer):
  519. buffer[0].extend(pbuffer[0])
  520. def terminate(self, buffer):
  521. return json.dumps(buffer[0])
  522. @annotate('bigint,string,bigint,string -> bigint,bigint,string')
  523. class decare_document(BaseUDTF):
  524. '''
  525. 将多个组拆解成多条记录
  526. '''
  527. def __init__(self):
  528. import logging
  529. import json
  530. global json,logging
  531. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  532. def process(self,group_id1, json_list_doc1,group_id2,json_list_doc2):
  533. #y=x,少掉近一半的数据
  534. if group_id1>=group_id2:
  535. list_doc1 = json.loads(json_list_doc1)
  536. list_doc2 = json.loads(json_list_doc2)
  537. for _doc1 in list_doc1:
  538. for _doc2 in list_doc2:
  539. #同一个重复group不做判断
  540. if _doc1["id"]!=_doc2["id"]:
  541. #判断两个group是否有重复
  542. _set1 = set()
  543. for _item1 in _doc1["json_set_docid"]:
  544. _set1.add(_item1["docid"])
  545. _set2 = set()
  546. for _item2 in _doc2["json_set_docid"]:
  547. _set2.add(_item2["docid"])
  548. if len(_set1&_set2)>0:
  549. new_json_set_docid = _doc1["json_set_docid"]
  550. for _item2 in _doc2["json_set_docid"]:
  551. if _item2["docid"] not in _set1:
  552. new_json_set_docid.append(_item2)
  553. self.forward(_doc1["id"],_doc2["id"],json.dumps(new_json_set_docid))
  554. def getBestDocid(list_pair):
  555. list_pair.sort(key=lambda x:x[3],reverse=True)
  556. _max_count = max(list_pair[0][3],list_pair[0][1])
  557. set_candidate = set()
  558. if list_pair[0][1]==_max_count:
  559. set_candidate.add(list_pair[0][0])
  560. for item in list_pair:
  561. if item[3]==_max_count:
  562. set_candidate.add(item[2])
  563. else:
  564. break
  565. list_candidate = list(set_candidate)
  566. list_candidate.sort(key=lambda x:x)
  567. return list_candidate[0]
  568. @annotate('bigint,bigint,bigint,bigint->string')
  569. class choose_document(BaseUDAF):
  570. '''
  571. 项目编号、中标单位、len(项目编号)>7、中标单位<> ""
  572. '''
  573. def __init__(self):
  574. import json
  575. global json
  576. def new_buffer(self):
  577. return [[]]
  578. def iterate(self, buffer,docid1,extract_count1,docid2,extract_count2):
  579. buffer[0].append([docid1,extract_count1,docid2,extract_count2])
  580. def merge(self, buffer, pbuffer):
  581. buffer[0].extend(pbuffer[0])
  582. def terminate(self, buffer):
  583. list_pair = buffer[0]
  584. _set = set()
  585. for item in buffer[0]:
  586. _set.add(str(item[2]))
  587. list_dumplicate = list(_set)
  588. best_docid = getBestDocid(list_pair)
  589. if best_docid==list_pair[0][0]:
  590. save_flag = 1
  591. else:
  592. save_flag = 0
  593. return json.dumps({"save_flag":save_flag,"dumplicates":list_dumplicate})
  594. @annotate('string -> bigint,string')
  595. class f_get_choose_document(BaseUDTF):
  596. '''
  597. 将多个组拆解成多条记录
  598. '''
  599. def __init__(self):
  600. import logging
  601. import json
  602. global json,logging
  603. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  604. def process(self,json_choose):
  605. if json_choose is None:
  606. self.forward(1,None)
  607. else:
  608. _choose = json.loads(json_choose)
  609. self.forward(_choose["save_flag"],",".join(_choose["dumplicates"]))
  610. @annotate('bigint,bigint,bigint,bigint->string')
  611. class group_document_bestFirst(BaseUDAF):
  612. '''
  613. 将组里面最优的放在前面
  614. '''
  615. def __init__(self):
  616. import json
  617. global json
  618. def new_buffer(self):
  619. return [[]]
  620. def iterate(self, buffer,docid1,extract_count1,docid2,extract_count2):
  621. buffer[0].append([docid1,extract_count1,docid2,extract_count2])
  622. def merge(self, buffer, pbuffer):
  623. buffer[0].extend(pbuffer[0])
  624. def terminate(self, buffer):
  625. list_pair = buffer[0]
  626. _set = set()
  627. for item in buffer[0]:
  628. _set.add(item[2])
  629. _set.add(list_pair[0][0])
  630. best_docid = getBestDocid(list_pair)
  631. _set.remove(best_docid)
  632. list_dumplicate = list(_set)
  633. list_dumplicate.sort(key=lambda x:x)
  634. list_dumplicate.insert(0,best_docid)
  635. list_dumplicate_str = []
  636. for item in list_dumplicate:
  637. list_dumplicate_str.append(str(item))
  638. return ",".join(list_dumplicate_str)
  639. @annotate('string -> bigint,string')
  640. class f_get_best_dumplicates(BaseUDTF):
  641. '''
  642. 得到每个分组中最优的那一条及其重复记录
  643. '''
  644. def __init__(self):
  645. import logging
  646. import json
  647. global json,logging
  648. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  649. def process(self,list_dumplicate_str):
  650. if list_dumplicate_str is None:
  651. pass
  652. else:
  653. list_dumplicate = list_dumplicate_str.split(",")
  654. if len(list_dumplicate)>0:
  655. self.forward(int(list_dumplicate[0]),",".join(list_dumplicate[1:]))
  656. else:
  657. pass
  658. @annotate('bigint,bigint->string')
  659. class bridge2group(BaseUDAF):
  660. '''
  661. 项目编号、中标单位、len(项目编号)>7、中标单位<> ""
  662. '''
  663. def __init__(self):
  664. import json
  665. global json
  666. def new_buffer(self):
  667. return [set()]
  668. def iterate(self, buffer,docid1,docid2):
  669. buffer[0].add(docid1)
  670. buffer[0].add(docid2)
  671. def merge(self, buffer, pbuffer):
  672. buffer[0] |= pbuffer[0]
  673. def terminate(self, buffer):
  674. list_pair = list(buffer[0])
  675. list_pair.sort(key=lambda x:x,reverse=True)
  676. return json.dumps(list_pair)
  677. @annotate('string -> bigint,bigint')
  678. class group2bridge(BaseUDTF):
  679. '''
  680. 将多个组拆解成多条记录
  681. '''
  682. def __init__(self):
  683. import logging
  684. import json
  685. global json,logging
  686. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  687. def process(self,json_list_docid):
  688. list_docid = json.loads(json_list_docid)
  689. for _docid in list_docid:
  690. self.forward(list_docid[-1],_docid)
  691. @annotate('bigint,bigint,string -> bigint')
  692. class f_get_dump_docid(BaseUDTF):
  693. '''
  694. 将多个组拆解成多条记录
  695. '''
  696. def __init__(self):
  697. import logging
  698. import json
  699. global json,logging
  700. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  701. def process(self,docid,save_flag,dumplicates):
  702. if save_flag==0:
  703. self.forward(docid)
  704. if dumplicates is not None:
  705. list_docid = dumplicates.split(",")
  706. if len(list_docid)>0:
  707. for _docid in list_docid[1:]:
  708. self.forward(int(_docid))
  709. else:
  710. if dumplicates is not None:
  711. list_docid = dumplicates.split(",")
  712. if len(list_docid)>0:
  713. for _docid in list_docid:
  714. self.forward(int(_docid))
  715. @annotate('string -> bigint,bigint')
  716. class f_get_docid(BaseUDTF):
  717. '''
  718. 将多个组拆解成多条记录
  719. '''
  720. def __init__(self):
  721. import logging
  722. import json
  723. global json,logging
  724. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  725. def process(self,json_set_docid):
  726. team_id = 0
  727. if json_set_docid is not None:
  728. list_docses = json.loads(json_set_docid)
  729. for list_docs in list_docses:
  730. team_id += 1
  731. for item in list_docs:
  732. self.forward(team_id,item["docid"])
  733. @annotate("string->bigint")
  734. class get_count_dump(object):
  735. def __init__(self):
  736. import logging
  737. import re
  738. global logging,re
  739. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  740. def evaluate(self, title):
  741. _count = 0
  742. if title is not None:
  743. _count = len(title.split(","))
  744. return _count
  745. def getSet(list_dict,key):
  746. _set = set()
  747. for item in list_dict:
  748. if key in item:
  749. if item[key]!='' and item[key] is not None:
  750. if re.search("^\d[\d\.]*$",item[key]) is not None:
  751. _set.add(str(float(item[key])))
  752. else:
  753. _set.add(str(item[key]))
  754. return _set
  755. def getDiffIndex(list_dict,key,confidence=100):
  756. _set = set()
  757. for _i in range(len(list_dict)):
  758. item = list_dict[_i]
  759. if item["confidence"]>=confidence:
  760. continue
  761. if key in item:
  762. if item[key]!='' and item[key] is not None:
  763. if re.search("^\d+(\.\d+)?$",item[key]) is not None:
  764. _set.add(str(float(item[key])))
  765. else:
  766. _set.add(str(item[key]))
  767. if len(_set)>1:
  768. return _i
  769. return len(list_dict)
  770. @annotate('bigint,string -> bigint,bigint')
  771. class f_getGroup_dumpFinal(BaseUDTF):
  772. '''
  773. 从最后的结果中获取组
  774. '''
  775. def __init__(self):
  776. import logging
  777. import json
  778. global json,logging
  779. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  780. def process(self,docid,dumplicates):
  781. self.forward(int(docid),int(docid))
  782. if dumplicates is not None:
  783. list_docids = dumplicates.split(",")
  784. for _docid in list_docids:
  785. self.forward(int(docid),int(_docid))
  786. @annotate('bigint,bigint,string,string,string,string,bigint,bigint,bigint->string')
  787. class f_redump_limit_num(BaseUDAF):
  788. '''
  789. 去重合并后重新判断,组内个数大于5时,dottitle、tenderee、win_tenderer、bidding_budget组内只能有一个取值
  790. 组内个数小于等于5时,tenderee、win_tenderer、bidding_budget组内只能有一个取值
  791. '''
  792. def __init__(self):
  793. import logging
  794. import json,re
  795. global json,logging,re
  796. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  797. def new_buffer(self):
  798. return [list()]
  799. def iterate(self, buffer,main_docid,docid,doctitle,set_limit_column2,set_limit_column3,set_limit_column4,extract_count1,extract_count2,confidence):
  800. buffer[0].append({"main_docid":main_docid,"docid":docid,"doctitle":doctitle,"set_limit_column2":set_limit_column2,
  801. "set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,"extract_count1":extract_count1,
  802. "extract_count2":extract_count2,"confidence":confidence})
  803. def merge(self, buffer, pbuffer):
  804. buffer[0].extend(pbuffer[0])
  805. def terminate(self, buffer):
  806. list_group = []
  807. the_group = buffer[0]
  808. the_group.sort(key=lambda x:x["confidence"],reverse=True)
  809. if len(the_group)>5:
  810. keys = ["doctitle","set_limit_column2","set_limit_column3","set_limit_column4"]
  811. else:
  812. keys = ["set_limit_column2","set_limit_column3","set_limit_column4"]
  813. final_group = []
  814. #置信度
  815. list_key_index = []
  816. for _k in keys:
  817. if _k=="doctitle":
  818. list_key_index.append(getDiffIndex(the_group,_k,confidence=30))
  819. else:
  820. list_key_index.append(getDiffIndex(the_group,_k))
  821. _index = min(list_key_index)
  822. if _index>1:
  823. main_docid = the_group[0]["main_docid"]
  824. for item in the_group[:_index]:
  825. if item["docid"]!=main_docid:
  826. final_group.append({"docid1":main_docid,"docid2":item["docid"],"extract_count1":item["extract_count1"],"extract_count2":item["extract_count2"],"confidence":item["confidence"]})
  827. # stay = True
  828. # for _key in keys:
  829. # if len(getSet(the_group,_key))>1:
  830. # stay = False
  831. # break
  832. #
  833. # if stay:
  834. # main_docid = the_group[0]["main_docid"]
  835. # for item in the_group:
  836. # if item["docid"]!=main_docid:
  837. # final_group.append({"docid1":main_docid,"docid2":item["docid"],"extract_count1":item["extract_count1"],"extract_count2":item["extract_count2"]})
  838. return json.dumps(final_group)
  839. @annotate('string -> bigint,bigint,bigint,bigint,bigint')
  840. class f_get_dumpFinal_checked(BaseUDTF):
  841. '''
  842. 从最后的结果中获取组
  843. '''
  844. def __init__(self):
  845. import logging
  846. import json
  847. global json,logging
  848. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  849. def process(self,list_group):
  850. if list_group is not None:
  851. final_group = json.loads(list_group)
  852. for _group in final_group:
  853. self.forward(_group["docid1"],_group["docid2"],_group["extract_count1"],_group["extract_count2"],_group["confidence"])
  854. @annotate('string -> bigint')
  855. class f_getDumplicateDocids(BaseUDTF):
  856. '''
  857. 从最后的结果中获取组
  858. '''
  859. def __init__(self):
  860. import logging
  861. import json
  862. global json,logging
  863. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  864. def process(self,dumplicates):
  865. list_docids = dumplicates.split(",")
  866. for _d in list_docids:
  867. self.forward(int(_d))