# documentDumplicate.py — ODPS UDF/UDTF/UDAF collection for document de-duplication.
#coding:UTF8
# ODPS (MaxCompute) UDF base classes and the @annotate signature decorator.
from odps.udf import annotate
from odps.udf import BaseUDTF
from odps.udf import BaseUDAF
@annotate('string,string -> string,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string')
class f_decode_extract(BaseUDTF):
    """UDTF: decode the extraction json and the document-meta json into one flat row.

    Emits one row per input pair: page time (+timestamp), channel id, title
    (raw and refined), location fields, key roles/amounts from ``prem``, an
    ``extract_count`` quality score, and the assorted ``time_*`` milestones.
    """
    def __init__(self):
        import logging
        import json
        import time,re
        # ODPS workers import inside __init__ and publish via `global` so the
        # modules are available in process() without a module-level import.
        global json,logging,time,re
        # Matches strings that start with a YYYY-MM-DD date.
        self.time_pattern = "\d{4}\-\d{2}\-\d{2}.*"
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        # Channel name -> numeric docchannel id (unknown names map to 0).
        self.dict_channel = {"公告变更":51,
                             "招标公告":52,
                             "中标信息":101,
                             "招标预告":102,
                             "招标答疑":103,
                             "资审结果":105,
                             "法律法规":106,
                             "新闻资讯":107,
                             "采购意向":114,
                             "拍卖出让":115,
                             "土地矿产":116,
                             "产权交易":117,
                             "废标公告":118,
                             "候选人公示":119,
                             "合同公告":120}
    def process(self, extractjson,otherjson):
        """Flatten one (extract json, other-meta json) pair into a single forwarded row.

        Either argument may be NULL; missing fields default to "" / 0.
        """
        if extractjson is not None:
            _extract = json.loads(extractjson)
        else:
            _extract = {}
        if otherjson is not None:
            _other = json.loads(otherjson)
        else:
            _other = {}
        project_code = ""
        project_name = ""
        tenderee = ""
        agency = ""
        win_tenderer = ""
        bidding_budget = ""
        win_bid_price = ""
        fingerprint = ""
        page_time_stamp = 0
        docchannel = 0
        # extract_count counts how many informative fields were extracted; used
        # downstream to pick the "best" document among duplicates.
        extract_count = 0
        # Fall back to today's date when the page time is missing.
        page_time = _other.get("pageTime",time.strftime('%Y-%m-%d',time.localtime()))
        doctitle = _other.get("doctitle","")
        # Strip boilerplate announcement words from the title for fuzzy matching.
        doctitle_refine = re.sub(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|合同', '', doctitle)
        area = _other.get("area","")
        province = _other.get("province","")
        city = _other.get("city","")
        district = _other.get("district","")
        web_source_no = _other.get("webSourceNo","")
        time_bidclose = _extract.get("time_bidclose")
        time_bidopen = _extract.get("time_bidopen")
        time_bidstart = _extract.get("time_bidstart")
        time_commencement = _extract.get("time_commencement")
        time_completion = _extract.get("time_completion")
        time_earnest_money_end = _extract.get("time_earnestMoneyEnd")
        time_earnest_money_start = _extract.get("time_earnestMoneyStart")
        time_get_file_end = _extract.get("time_getFileEnd")
        time_get_file_start = _extract.get("time_getFileStart")
        time_publicity_end = _extract.get("time_publicityEnd")
        time_publicity_start = _extract.get("time_publicityStart")
        time_registration_end = _extract.get("time_registrationEnd")
        time_registration_start = _extract.get("time_registrationStart")
        time_release = _extract.get("time_release")
        # docchannel = _other.get("docchannel",0)
        # Channel now comes from the extraction result, not the raw meta.
        docchannel = self.dict_channel.get(_extract.get("docchannel",""),0)
        if re.search(self.time_pattern,page_time) is not None:
            try:
                timeArray = time.strptime(page_time[:11], "%Y-%m-%d")
                page_time_stamp = int(time.mktime(timeArray))
            except Exception as e:
                # Unparseable date: keep page_time_stamp = 0.
                pass
        list_code = _extract.get("code",[])
        if len(list_code)>0:
            project_code = list_code[0]
        project_name = _extract.get("name","")
        fingerprint = _extract.get("fingerprint","")
        # prem: package dict; each value holds "tendereeMoney" and a "roleList"
        # of [role_name, entity_name, money, ...] triples — TODO confirm schema.
        dict_pack = _extract.get("prem",{})
        logging.info(dict_pack)
        for _key in dict_pack.keys():
            if dict_pack[_key]["tendereeMoney"]!='' and float(dict_pack[_key]["tendereeMoney"])>0:
                extract_count += 1
                # Keep the first positive budget encountered.
                if bidding_budget=="":
                    bidding_budget = str(float(dict_pack[_key]["tendereeMoney"]))
            for _role in dict_pack[_key]["roleList"]:
                extract_count += 1
                if _role[2]!='' and float(_role[2])>0:
                    extract_count += 1
                if _role[0]=="tenderee":
                    tenderee = _role[1]
                if _role[0]=="win_tenderer":
                    # First winner and first positive win price are kept.
                    if win_tenderer=="":
                        win_tenderer = _role[1]
                    if _role[2]!='' and float(_role[2])>0:
                        extract_count += 1
                        if win_bid_price=="":
                            win_bid_price = str(float(_role[2]))
                if _role[0]=="agency":
                    agency = _role[1]
        if project_code!="":
            extract_count += 1
        if project_name!="":
            extract_count += 1
        logging.info(page_time+doctitle+doctitle_refine+area+province+city+
                     district+web_source_no+project_code+project_name+tenderee+agency+win_tenderer+bidding_budget+win_bid_price)
        self.forward(page_time,page_time_stamp,docchannel,doctitle,doctitle_refine,area,province,city,
                     district,web_source_no,fingerprint,project_code,project_name,tenderee,agency,win_tenderer,bidding_budget,win_bid_price,extract_count,
                     time_bidclose,time_bidopen,time_bidstart,time_commencement,time_completion,time_earnest_money_end,time_earnest_money_start,
                     time_get_file_end,time_get_file_start,time_publicity_end,time_publicity_start,time_registration_end,time_registration_start,time_release)
@annotate("string->bigint")
class f_get_extractCount(object):
    """Scalar UDF: score an extraction json by counting its informative fields.

    The counting scheme mirrors f_decode_extract.process so scores are
    comparable across both code paths.
    """
    def __init__(self):
        import time
        global time
        import logging
        import json
        import re
        # Publish imports as globals for evaluate() (ODPS worker idiom).
        global json,logging,re
        self.time_pattern = "\d{4}\-\d{2}\-\d{2}.*"
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    def evaluate(self, extractjson):
        """Return the extract_count score for one extraction json (NULL -> score of empty)."""
        if extractjson is not None:
            _extract = json.loads(extractjson)
        else:
            _extract = {}
        dict_pack = _extract.get("prem",{})
        extract_count = 0
        list_code = _extract.get("code",[])
        if len(list_code)>0:
            project_code = list_code[0]
        else:
            project_code = ""
        project_name = _extract.get("name","")
        bidding_budget = ""
        win_tenderer = ""
        win_bid_price = ""
        for _key in dict_pack.keys():
            if dict_pack[_key]["tendereeMoney"]!='' and float(dict_pack[_key]["tendereeMoney"])>0:
                extract_count += 1
                if bidding_budget=="":
                    bidding_budget = str(float(dict_pack[_key]["tendereeMoney"]))
            for _role in dict_pack[_key]["roleList"]:
                extract_count += 1
                if _role[2]!='' and float(_role[2])>0:
                    extract_count += 1
                if _role[0]=="tenderee":
                    # NOTE(review): tenderee/agency are assigned but never used here;
                    # kept for parity with f_decode_extract.
                    tenderee = _role[1]
                if _role[0]=="win_tenderer":
                    if win_tenderer=="":
                        win_tenderer = _role[1]
                    if _role[2]!='' and float(_role[2])>0:
                        extract_count += 1
                        if win_bid_price=="":
                            win_bid_price = str(float(_role[2]))
                if _role[0]=="agency":
                    agency = _role[1]
        if project_code!="":
            extract_count += 1
        if project_name!="":
            extract_count += 1
        return extract_count
@annotate('string,string,string,string,string -> string,string,string,bigint')
class f_decode_sub_docs_json(BaseUDTF):
    """UDTF: pull winner/budget/price out of sub_docs_json and score field coverage.

    Forwards (win_tenderer, bidding_budget, win_bid_price, extract_count)
    exactly once per input row.
    """
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    def process(self, project_code,project_name,tenderee,agency,sub_docs_json):
        # First-seen non-empty value wins for each of the three columns.
        columns = {"win_tenderer":"","bidding_budget":"","win_bid_price":""}
        extract_count = 0
        if project_code is not None and project_code!="":
            extract_count += 1
        if project_name is not None and project_name!="":
            extract_count += 1
        if tenderee is not None and tenderee!="":
            extract_count += 1
        if agency is not None and agency!="":
            extract_count += 1
        if sub_docs_json is not None:
            # sub_docs_json: json array of dicts, one per sub-document.
            for sub_docs in json.loads(sub_docs_json):
                for _key_sub_docs in sub_docs.keys():
                    # Every present key counts toward the score.
                    extract_count += 1
                    if _key_sub_docs in columns:
                        if columns[_key_sub_docs]=="" and str(sub_docs[_key_sub_docs]) not in ["","0"]:
                            if _key_sub_docs in ["bidding_budget","win_bid_price"]:
                                # Money fields are normalized through float();
                                # assumes the value is numeric — TODO confirm upstream.
                                if float(sub_docs[_key_sub_docs])>0:
                                    columns[_key_sub_docs] = str(float(sub_docs[_key_sub_docs]))
                            else:
                                columns[_key_sub_docs] = str(sub_docs[_key_sub_docs])
        self.forward(columns["win_tenderer"],columns["bidding_budget"],columns["win_bid_price"],extract_count)
  198. @annotate("string->bigint")
  199. class totimestamp(object):
  200. def __init__(self):
  201. import time
  202. global time
  203. import logging
  204. import json
  205. import re
  206. global json,logging,re
  207. self.time_pattern = "\d{4}\-\d{2}\-\d{2}.*"
  208. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  209. def evaluate(self, str_time):
  210. try:
  211. logging.info(str_time)
  212. if str_time is not None and re.search(self.time_pattern,str_time) is not None:
  213. timeArray = time.strptime(str_time[:10], "%Y-%m-%d")
  214. timeStamp = int(time.mktime(timeArray))
  215. return timeStamp
  216. else:
  217. return 0
  218. except Exception as e:
  219. return 0
  220. @annotate("string->string")
  221. class refind_name(object):
  222. def __init__(self):
  223. import logging
  224. import re
  225. global logging,re
  226. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  227. def evaluate(self, title):
  228. if title is not None:
  229. return re.sub(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|\[|\]|【|】', '', title)
  230. return ""
@annotate('bigint,bigint,bigint,string,bigint,string->string')
class f_set_docid(BaseUDAF):
    '''
    Group docids that fall within a 2-day page-time window, filtered by the
    user-defined column/count constraints (e.g. project code, winning bidder).
    '''
    def __init__(self):
        import json
        global json
    def new_buffer(self):
        # buffer[0]: list of per-doc dicts accumulated across iterate/merge.
        return [[]]
    def iterate(self, buffer,docid, page_time_stamp,extract_count,defind_column,defind_count,tenderee):
        buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"extract_count":extract_count,
                          "defind_column":defind_column,"defind_count":defind_count,"tenderee":tenderee})
    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])
    def terminate(self, buffer):
        """Return json groups of {"docid","extract_count"}; a group closes when
        the gap to the next doc exceeds 2 days (86400*2 seconds)."""
        list_docs = buffer[0]
        list_docs.sort(key=lambda x:x["page_time_stamp"])
        list_group = []
        _begin = 0
        defind_count = 0
        if len(list_docs)>0:
            # defind_count is assumed constant across the aggregate's rows.
            defind_count = list_docs[0]["defind_count"]
        for i in range(len(list_docs)-1):
            if abs(list_docs[i]["page_time_stamp"]-list_docs[i+1]["page_time_stamp"])<=86400*2:
                continue
            else:
                # Time gap found: close the window [_begin, i].
                _group = []
                _set_column = set()
                _set_tenderee = set()
                for j in range(_begin,i+1):
                    if list_docs[j]["tenderee"] is not None and list_docs[j]["tenderee"]!="":
                        _set_tenderee.add(list_docs[j]["tenderee"])
                    _set_column.add(list_docs[j]["defind_column"])
                    _group.append({"docid":list_docs[j]["docid"],"extract_count":list_docs[j]["extract_count"]})
                # Reject groups of 3+ docs naming more than one tenderee.
                if len(_group)>=3 and len(_set_tenderee)>1:
                    pass
                else:
                    if len(_group)>1:
                        # defind_count selects the column-agreement rule:
                        # 2 -> need >=2 distinct column values, 1 -> exactly one,
                        # 0 -> no column constraint.
                        if defind_count==2:
                            if len(_set_column)>=2:
                                list_group.append(_group)
                        elif defind_count==1:
                            if len(_set_column)==1:
                                list_group.append(_group)
                        elif defind_count==0:
                            list_group.append(_group)
                _begin = i+1
        # Tail window: same filtering as above for the last run of docs.
        if len(list_docs)>1:
            _set_column = set()
            _set_tenderee = set()
            _group = []
            for j in range(_begin,len(list_docs)):
                if list_docs[j]["tenderee"] is not None and list_docs[j]["tenderee"]!="":
                    _set_tenderee.add(list_docs[j]["tenderee"])
                _set_column.add(list_docs[j]["defind_column"])
                _group.append({"docid":list_docs[j]["docid"],"extract_count":list_docs[j]["extract_count"]})
            if len(_group)>=3 and len(_set_tenderee)>1:
                pass
            else:
                if len(_group)>1:
                    if defind_count==2:
                        if len(_set_column)>=2:
                            list_group.append(_group)
                    elif defind_count==1:
                        if len(_set_column)==1:
                            list_group.append(_group)
                    elif defind_count==0:
                        list_group.append(_group)
        return json.dumps(list_group)
  301. def isEmpty(_str):
  302. if _str is None or _str=="":
  303. return True
  304. return False
@annotate('bigint,bigint,bigint,string,string,string,string,string,string,string,string->string')
class f_set_docid_binaryChart(BaseUDAF):
    '''
    Pair "empty" docs (no code/budget/winner/price/agency extracted) with one
    compatible "non-empty" doc from the same 2-day time window, at most one
    pairing per distinct web source.
    '''
    def __init__(self):
        import json
        global json
    def new_buffer(self):
        return [[]]
    def iterate(self, buffer,docid, page_time_stamp,extract_count,project_code,project_name,tenderee,bidding_budget,win_tenderer,win_bid_price,agency,web_source_no):
        buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"extract_count":extract_count,
                          "project_code":project_code,"project_name":project_name,"tenderee":tenderee,
                          "bidding_budget":bidding_budget,"win_tenderer":win_tenderer,"win_bid_price":win_bid_price,
                          "agency":agency,"web_source_no":web_source_no})
    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])
    def terminate(self, buffer):
        """Return json groups of {"docid","extract_count"} pairs (empty doc + its match)."""
        list_docs = buffer[0]
        # Bucket docs into 2-day windows first (module-level helper).
        list_timeGroups = split_with_time(list_docs,"page_time_stamp",86400*2)
        list_group = []
        # A doc is "empty" when ALL of these extracted fields are blank.
        empty_key = ["project_code","bidding_budget","win_tenderer","win_bid_price","agency"]
        for _timeGroups in list_timeGroups:
            list_empty = []
            list_notEmpty = []
            for _item in _timeGroups:
                empty_flag = True
                for _key in empty_key:
                    if not isEmpty(_item[_key]):
                        empty_flag = False
                        break
                if empty_flag:
                    list_empty.append(_item)
                else:
                    list_notEmpty.append(_item)
            for _e in list_empty:
                _group = [{"docid":_e["docid"],"extract_count":_e["extract_count"]}]
                _e_tenderee = _e["tenderee"]
                for _ne in list_notEmpty:
                    # Track which web sources this non-empty doc has already
                    # been paired with (state is kept on the dict itself).
                    if "set_webSource" not in _ne:
                        _ne["set_webSource"] = set()
                        _ne["set_webSource"].add(_ne["web_source_no"])
                    _suit = False
                    # Compatible when tenderees match, or the empty doc has none.
                    if not isEmpty(_e_tenderee) and _e_tenderee==_ne["tenderee"]:
                        _suit = True
                    elif isEmpty(_e_tenderee):
                        _suit = True
                    if _suit:
                        # Only pair once per web source; stop at the first match.
                        if _e["web_source_no"] not in _ne["set_webSource"]:
                            _ne["set_webSource"].add(_e["web_source_no"])
                            _group.append({"docid":_ne["docid"],"extract_count":_ne["extract_count"]})
                            break
                if len(_group)>1:
                    list_group.append(_group)
        return json.dumps(list_group)
  360. def split_with_time(list_dict,sort_key,timedelta=86400*2):
  361. if len(list_dict)>0:
  362. if sort_key in list_dict[0]:
  363. list_dict.sort(key=lambda x:x[sort_key])
  364. list_group = []
  365. _begin = 0
  366. for i in range(len(list_dict)-1):
  367. if abs(list_dict[i][sort_key]-list_dict[i+1][sort_key])<timedelta:
  368. continue
  369. else:
  370. _group = []
  371. for j in range(_begin,i+1):
  372. _group.append(list_dict[j])
  373. if len(_group)>1:
  374. list_group.append(_group)
  375. _begin = i + 1
  376. if len(list_dict)>1:
  377. _group = []
  378. for j in range(_begin,len(list_dict)):
  379. _group.append(list_dict[j])
  380. if len(_group)>1:
  381. list_group.append(_group)
  382. return list_group
  383. return [list_dict]
@annotate('bigint,bigint,bigint,string,string,string,string,string->string')
class f_set_docid_limitNum_contain(BaseUDAF):
    '''
    Group docs within a 2-day window when all four limit columns agree
    (at most one distinct non-empty value each) and the contain_column texts
    form a containment chain (each is a substring of the longest one).
    '''
    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    def new_buffer(self):
        return [list()]
    def iterate(self, buffer,docid,page_time_stamp,extract_count,set_limit_column1,set_limit_column2,set_limit_column3,set_limit_column4,contain_column):
        buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"extract_count":extract_count,"set_limit_column1":set_limit_column1,
                          "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,
                          "contain_column":contain_column})
    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])
    def terminate(self, buffer):
        """Return json groups of {"docid","extract_count"} for windows passing both checks."""
        list_split = split_with_time(buffer[0],"page_time_stamp")
        list_group = []
        for _split in list_split:
            flag = True
            keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"]
            for _key in keys:
                logging.info(_key+str(getSet(_split,_key)))
                # More than one distinct non-empty value disqualifies the window.
                if len(getSet(_split,_key))>1:
                    flag = False
                    break
            MAX_CONTAIN_COLUMN = None
            # Check that every doc's contain_column is contained in the group's longest one.
            if flag:
                for _d in _split:
                    contain_column = _d["contain_column"]
                    if contain_column is not None and contain_column !="":
                        if MAX_CONTAIN_COLUMN is None:
                            MAX_CONTAIN_COLUMN = contain_column
                        else:
                            if len(MAX_CONTAIN_COLUMN)<len(contain_column):
                                # Current text is longer: it must contain the previous max.
                                if contain_column.find(MAX_CONTAIN_COLUMN)==-1:
                                    flag = False
                                    break
                                MAX_CONTAIN_COLUMN = contain_column
                            else:
                                # Current text is shorter: the max must contain it.
                                if MAX_CONTAIN_COLUMN.find(contain_column)==-1:
                                    flag = False
                                    break
            if flag:
                if len(_split)>1:
                    _group = []
                    for _item in _split:
                        _group.append({"docid":_item["docid"],"extract_count":_item["extract_count"]})
                    list_group.append(_group)
        return json.dumps(list_group)
  438. @annotate('bigint->string')
  439. class f_stamp_squence(BaseUDAF):
  440. '''
  441. 项目编号、中标单位、len(项目编号)>7、中标单位<> ""
  442. '''
  443. def __init__(self):
  444. import json
  445. global json
  446. import logging
  447. global logging
  448. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  449. def new_buffer(self):
  450. return [set()]
  451. def iterate(self, buffer,page_time_stamp):
  452. buffer[0].add(page_time_stamp)
  453. def merge(self, buffer, pbuffer):
  454. buffer[0] |= pbuffer[0]
  455. def terminate(self, buffer):
  456. if 0 in buffer[0]:
  457. buffer[0].remove(0)
  458. list_stamp = list(buffer[0])
  459. list_stamp.sort(key=lambda x:x)
  460. list_stamp_final = []
  461. _begin = 0
  462. _time_decase = 86400*2
  463. logging.info(str(list_stamp))
  464. for _index in range(len(list_stamp)-1):
  465. if list_stamp[_index+1]-list_stamp[_index]<_time_decase:
  466. continue
  467. else:
  468. list_stamp_final.append([list_stamp[_begin]-_time_decase,list_stamp[_index]+_time_decase])
  469. _begin = _index+1
  470. if len(list_stamp)>0:
  471. list_stamp_final.append([list_stamp[_begin]-_time_decase,list_stamp[-1]+_time_decase])
  472. return json.dumps(list_stamp_final)
  473. @annotate("bigint,string->bigint")
  474. class in_stamp(object):
  475. def __init__(self):
  476. import logging
  477. import re
  478. import json
  479. global logging,re,json
  480. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  481. def evaluate(self, page_time_stamp,json_stamp):
  482. list_stamp = json.loads(json_stamp)
  483. int_flag = 0
  484. for item in list_stamp:
  485. if page_time_stamp <item[0]:
  486. break
  487. if page_time_stamp>item[0] and page_time_stamp<item[1]:
  488. int_flag = 1
  489. break
  490. return int_flag
  491. def getConfidence(rule_id):
  492. if rule_id ==0:
  493. return 30
  494. elif rule_id >=1 and rule_id <30:
  495. return 20
  496. else:
  497. return 10
  498. @annotate('string,string -> string')
  499. class f_splitStr(BaseUDTF):
  500. '''
  501. 将多个组拆解成多条记录
  502. '''
  503. def __init__(self):
  504. import logging
  505. import json
  506. global json,logging
  507. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  508. def process(self, str_split,_split):
  509. try:
  510. for _s in str_split.split(_split):
  511. self.forward(_s)
  512. except Exception as e:
  513. pass
  514. @annotate('string,bigint -> bigint,bigint,bigint,bigint,bigint')
  515. class f_split_group_single(BaseUDTF):
  516. '''
  517. 将多个组拆解成多条记录
  518. '''
  519. def __init__(self):
  520. import logging
  521. import json
  522. global json,logging
  523. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  524. def process(self, json_set_docid,rule_id):
  525. list_group = json.loads(json_set_docid)
  526. for item in list_group:
  527. if len(item)>100:
  528. item.sort(key=lambda x:x["docid"],reverse=True)
  529. index_i = 0
  530. for index_j in range(1,len(item)):
  531. if item[index_i]["docid"]!=item[index_j]["docid"]:
  532. self.forward(item[index_i]["docid"],item[index_j]["docid"],item[index_i]["extract_count"],item[index_j]["extract_count"],getConfidence(rule_id))
  533. else:
  534. for index_i in range(len(item)):
  535. for index_j in range(len(item)):
  536. if index_i!=index_j and item[index_i]["docid"]!=item[index_j]["docid"]:
  537. self.forward(item[index_i]["docid"],item[index_j]["docid"],item[index_i]["extract_count"],item[index_j]["extract_count"],getConfidence(rule_id))
  538. @annotate('bigint,string->string')
  539. class group_document(BaseUDAF):
  540. '''
  541. 项目编号、中标单位、len(项目编号)>7、中标单位<> ""
  542. '''
  543. def __init__(self):
  544. import json
  545. global json
  546. def new_buffer(self):
  547. return [[]]
  548. def iterate(self, buffer,id,json_set_docid):
  549. buffer[0].append({"id":id,"json_set_docid":json.loads(json_set_docid)})
  550. def merge(self, buffer, pbuffer):
  551. buffer[0].extend(pbuffer[0])
  552. def terminate(self, buffer):
  553. return json.dumps(buffer[0])
  554. @annotate('bigint,string,bigint,string -> bigint,bigint,string')
  555. class decare_document(BaseUDTF):
  556. '''
  557. 将多个组拆解成多条记录
  558. '''
  559. def __init__(self):
  560. import logging
  561. import json
  562. global json,logging
  563. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  564. def process(self,group_id1, json_list_doc1,group_id2,json_list_doc2):
  565. #y=x,少掉近一半的数据
  566. if group_id1>=group_id2:
  567. list_doc1 = json.loads(json_list_doc1)
  568. list_doc2 = json.loads(json_list_doc2)
  569. for _doc1 in list_doc1:
  570. for _doc2 in list_doc2:
  571. #同一个重复group不做判断
  572. if _doc1["id"]!=_doc2["id"]:
  573. #判断两个group是否有重复
  574. _set1 = set()
  575. for _item1 in _doc1["json_set_docid"]:
  576. _set1.add(_item1["docid"])
  577. _set2 = set()
  578. for _item2 in _doc2["json_set_docid"]:
  579. _set2.add(_item2["docid"])
  580. if len(_set1&_set2)>0:
  581. new_json_set_docid = _doc1["json_set_docid"]
  582. for _item2 in _doc2["json_set_docid"]:
  583. if _item2["docid"] not in _set1:
  584. new_json_set_docid.append(_item2)
  585. self.forward(_doc1["id"],_doc2["id"],json.dumps(new_json_set_docid))
  586. def getBestDocid(list_pair):
  587. # [docid1,extract_count1,docid2,extract_count2]
  588. # list_pair.sort(key=lambda x:x[3],reverse=True)
  589. # _max_count = max(list_pair[0][3],list_pair[0][1])
  590. # set_candidate = set()
  591. # if list_pair[0][1]==_max_count:
  592. # set_candidate.add(list_pair[0][0])
  593. # for item in list_pair:
  594. # if item[3]==_max_count:
  595. # set_candidate.add(item[2])
  596. # else:
  597. # break
  598. # list_candidate = list(set_candidate)
  599. # list_candidate.sort(key=lambda x:x)
  600. new_pair = []
  601. new_pair.append([list_pair[0][0],list_pair[0][0],list_pair[0][1]])
  602. for item in list_pair:
  603. new_pair.append([item[0],item[2],item[3]])
  604. new_pair.sort(key=lambda x:x[1])
  605. new_pair.sort(key=lambda x:x[2],reverse=True)
  606. return new_pair[0][1]
  607. @annotate('bigint,bigint,bigint,bigint->string')
  608. class choose_document(BaseUDAF):
  609. '''
  610. 项目编号、中标单位、len(项目编号)>7、中标单位<> ""
  611. '''
  612. def __init__(self):
  613. import json
  614. global json
  615. def new_buffer(self):
  616. return [[]]
  617. def iterate(self, buffer,docid1,extract_count1,docid2,extract_count2):
  618. buffer[0].append([docid1,extract_count1,docid2,extract_count2])
  619. def merge(self, buffer, pbuffer):
  620. buffer[0].extend(pbuffer[0])
  621. def terminate(self, buffer):
  622. list_pair = buffer[0]
  623. _set = set()
  624. for item in buffer[0]:
  625. _set.add(str(item[2]))
  626. list_dumplicate = list(_set)
  627. best_docid = getBestDocid(list_pair)
  628. if best_docid==list_pair[0][0]:
  629. save_flag = 1
  630. else:
  631. save_flag = 0
  632. return json.dumps({"save_flag":save_flag,"dumplicates":list_dumplicate})
  633. @annotate('string -> bigint,string')
  634. class f_get_choose_document(BaseUDTF):
  635. '''
  636. 将多个组拆解成多条记录
  637. '''
  638. def __init__(self):
  639. import logging
  640. import json
  641. global json,logging
  642. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  643. def process(self,json_choose):
  644. if json_choose is None:
  645. self.forward(1,None)
  646. else:
  647. _choose = json.loads(json_choose)
  648. self.forward(_choose["save_flag"],",".join(_choose["dumplicates"]))
  649. @annotate('bigint,bigint,bigint,bigint->string')
  650. class group_document_bestFirst(BaseUDAF):
  651. '''
  652. 将组里面最优的放在前面
  653. '''
  654. def __init__(self):
  655. import json
  656. global json
  657. def new_buffer(self):
  658. return [[]]
  659. def iterate(self, buffer,docid1,extract_count1,docid2,extract_count2):
  660. buffer[0].append([docid1,extract_count1,docid2,extract_count2])
  661. def merge(self, buffer, pbuffer):
  662. buffer[0].extend(pbuffer[0])
  663. def terminate(self, buffer):
  664. list_pair = buffer[0]
  665. _set = set()
  666. for item in buffer[0]:
  667. _set.add(item[2])
  668. _set.add(list_pair[0][0])
  669. best_docid = getBestDocid(list_pair)
  670. _set.remove(best_docid)
  671. list_dumplicate = list(_set)
  672. list_dumplicate.sort(key=lambda x:x)
  673. list_dumplicate.insert(0,best_docid)
  674. list_dumplicate_str = []
  675. for item in list_dumplicate:
  676. list_dumplicate_str.append(str(item))
  677. return ",".join(list_dumplicate_str)
  678. @annotate('string -> bigint,string')
  679. class f_get_best_dumplicates(BaseUDTF):
  680. '''
  681. 得到每个分组中最优的那一条及其重复记录
  682. '''
  683. def __init__(self):
  684. import logging
  685. import json
  686. global json,logging
  687. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  688. def process(self,list_dumplicate_str):
  689. if list_dumplicate_str is None:
  690. pass
  691. else:
  692. list_dumplicate = list_dumplicate_str.split(",")
  693. if len(list_dumplicate)>0:
  694. self.forward(int(list_dumplicate[0]),",".join(list_dumplicate[1:]))
  695. else:
  696. pass
  697. @annotate('bigint,bigint->string')
  698. class bridge2group(BaseUDAF):
  699. '''
  700. 项目编号、中标单位、len(项目编号)>7、中标单位<> ""
  701. '''
  702. def __init__(self):
  703. import json
  704. global json
  705. def new_buffer(self):
  706. return [set()]
  707. def iterate(self, buffer,docid1,docid2):
  708. buffer[0].add(docid1)
  709. buffer[0].add(docid2)
  710. def merge(self, buffer, pbuffer):
  711. buffer[0] |= pbuffer[0]
  712. def terminate(self, buffer):
  713. list_pair = list(buffer[0])
  714. list_pair.sort(key=lambda x:x,reverse=True)
  715. return json.dumps(list_pair)
  716. @annotate('string -> bigint,bigint')
  717. class group2bridge(BaseUDTF):
  718. '''
  719. 将多个组拆解成多条记录
  720. '''
  721. def __init__(self):
  722. import logging
  723. import json
  724. global json,logging
  725. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  726. def process(self,json_list_docid):
  727. list_docid = json.loads(json_list_docid)
  728. for _docid in list_docid:
  729. self.forward(list_docid[-1],_docid)
  730. @annotate('bigint,bigint,string -> bigint')
  731. class f_get_dump_docid(BaseUDTF):
  732. '''
  733. 将多个组拆解成多条记录
  734. '''
  735. def __init__(self):
  736. import logging
  737. import json
  738. global json,logging
  739. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  740. def process(self,docid,save_flag,dumplicates):
  741. if save_flag==0:
  742. self.forward(docid)
  743. if dumplicates is not None:
  744. list_docid = dumplicates.split(",")
  745. if len(list_docid)>0:
  746. for _docid in list_docid[1:]:
  747. self.forward(int(_docid))
  748. else:
  749. if dumplicates is not None:
  750. list_docid = dumplicates.split(",")
  751. if len(list_docid)>0:
  752. for _docid in list_docid:
  753. self.forward(int(_docid))
  754. @annotate('string -> bigint,bigint')
  755. class f_get_docid(BaseUDTF):
  756. '''
  757. 将多个组拆解成多条记录
  758. '''
  759. def __init__(self):
  760. import logging
  761. import json
  762. global json,logging
  763. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  764. def process(self,json_set_docid):
  765. team_id = 0
  766. if json_set_docid is not None:
  767. list_docses = json.loads(json_set_docid)
  768. for list_docs in list_docses:
  769. team_id += 1
  770. for item in list_docs:
  771. self.forward(team_id,item["docid"])
  772. @annotate("string->bigint")
  773. class get_count_dump(object):
  774. def __init__(self):
  775. import logging
  776. import re
  777. global logging,re
  778. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  779. def evaluate(self, title):
  780. _count = 0
  781. if title is not None:
  782. _count = len(title.split(","))
  783. return _count
  784. def getSet(list_dict,key):
  785. _set = set()
  786. for item in list_dict:
  787. if key in item:
  788. if item[key]!='' and item[key] is not None:
  789. if re.search("^\d[\d\.]*$",item[key]) is not None:
  790. _set.add(str(float(item[key])))
  791. else:
  792. _set.add(str(item[key]))
  793. return _set
  794. def getDiffIndex(list_dict,key,confidence=100):
  795. _set = set()
  796. for _i in range(len(list_dict)):
  797. item = list_dict[_i]
  798. if item["confidence"]>=confidence:
  799. continue
  800. if key in item:
  801. if item[key]!='' and item[key] is not None:
  802. if re.search("^\d+(\.\d+)?$",item[key]) is not None:
  803. _set.add(str(float(item[key])))
  804. else:
  805. _set.add(str(item[key]))
  806. if len(_set)>1:
  807. return _i
  808. return len(list_dict)
  809. @annotate('bigint,string -> bigint,bigint')
  810. class f_getGroup_dumpFinal(BaseUDTF):
  811. '''
  812. 从最后的结果中获取组
  813. '''
  814. def __init__(self):
  815. import logging
  816. import json
  817. global json,logging
  818. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  819. def process(self,docid,dumplicates):
  820. self.forward(int(docid),int(docid))
  821. if dumplicates is not None:
  822. list_docids = dumplicates.split(",")
  823. for _docid in list_docids:
  824. self.forward(int(docid),int(_docid))
  825. @annotate('bigint,bigint,string,string,string,string,bigint,bigint,bigint->string')
  826. class f_redump_limit_num(BaseUDAF):
  827. '''
  828. 去重合并后重新判断,组内个数大于5时,dottitle、tenderee、win_tenderer、bidding_budget组内只能有一个取值
  829. 组内个数小于等于5时,tenderee、win_tenderer、bidding_budget组内只能有一个取值
  830. '''
  831. def __init__(self):
  832. import logging
  833. import json,re
  834. global json,logging,re
  835. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  836. def new_buffer(self):
  837. return [list()]
  838. def iterate(self, buffer,main_docid,docid,doctitle,set_limit_column2,set_limit_column3,set_limit_column4,extract_count1,extract_count2,confidence):
  839. buffer[0].append({"main_docid":main_docid,"docid":docid,"doctitle":doctitle,"set_limit_column2":set_limit_column2,
  840. "set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,"extract_count1":extract_count1,
  841. "extract_count2":extract_count2,"confidence":confidence})
  842. def merge(self, buffer, pbuffer):
  843. buffer[0].extend(pbuffer[0])
  844. def terminate(self, buffer):
  845. list_group = []
  846. the_group = buffer[0]
  847. the_group.sort(key=lambda x:x["confidence"],reverse=True)
  848. if len(the_group)>5:
  849. keys = ["doctitle","set_limit_column2","set_limit_column3","set_limit_column4"]
  850. else:
  851. keys = ["set_limit_column2","set_limit_column3","set_limit_column4"]
  852. final_group = []
  853. #置信度
  854. list_key_index = []
  855. for _k in keys:
  856. if _k=="doctitle":
  857. list_key_index.append(getDiffIndex(the_group,_k,confidence=30))
  858. else:
  859. list_key_index.append(getDiffIndex(the_group,_k))
  860. _index = min(list_key_index)
  861. if _index>1:
  862. main_docid = the_group[0]["main_docid"]
  863. for item in the_group[:_index]:
  864. if item["docid"]!=main_docid:
  865. final_group.append({"docid1":main_docid,"docid2":item["docid"],"extract_count1":item["extract_count1"],"extract_count2":item["extract_count2"],"confidence":item["confidence"]})
  866. # stay = True
  867. # for _key in keys:
  868. # if len(getSet(the_group,_key))>1:
  869. # stay = False
  870. # break
  871. #
  872. # if stay:
  873. # main_docid = the_group[0]["main_docid"]
  874. # for item in the_group:
  875. # if item["docid"]!=main_docid:
  876. # final_group.append({"docid1":main_docid,"docid2":item["docid"],"extract_count1":item["extract_count1"],"extract_count2":item["extract_count2"]})
  877. return json.dumps(final_group)
  878. @annotate('string -> bigint,bigint,bigint,bigint,bigint')
  879. class f_get_dumpFinal_checked(BaseUDTF):
  880. '''
  881. 从最后的结果中获取组
  882. '''
  883. def __init__(self):
  884. import logging
  885. import json
  886. global json,logging
  887. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  888. def process(self,list_group):
  889. if list_group is not None:
  890. final_group = json.loads(list_group)
  891. for _group in final_group:
  892. self.forward(_group["docid1"],_group["docid2"],_group["extract_count1"],_group["extract_count2"],_group["confidence"])
  893. @annotate('string -> bigint')
  894. class f_getDumplicateDocids(BaseUDTF):
  895. '''
  896. 从最后的结果中获取组
  897. '''
  898. def __init__(self):
  899. import logging
  900. import json
  901. global json,logging
  902. logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  903. def process(self,dumplicates):
  904. list_docids = dumplicates.split(",")
  905. for _d in list_docids:
  906. self.forward(int(_d))