#coding:UTF8
from odps.udf import annotate
from odps.udf import BaseUDTF
from odps.udf import BaseUDAF

@annotate('string,string -> string,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,bigint')
class f_decode_extract(BaseUDTF):

    def __init__(self):
        import logging
        import json
        import time,re
        global json,logging,time,re
        self.time_pattern = r"\d{4}-\d{2}-\d{2}.*"
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        # mapping from the extracted channel name to its numeric channel id
        self.dict_channel = {"公告变更":51,
                             "招标公告":52,
                             "中标信息":101,
                             "招标预告":102,
                             "招标答疑":103,
                             "资审结果":105,
                             "法律法规":106,
                             "新闻资讯":107,
                             "采购意向":114,
                             "拍卖出让":115,
                             "土地矿产":116,
                             "产权交易":117,
                             "废标公告":118,
                             "候选人公示":119,
                             "合同公告":120}
    def process(self, extractjson, otherjson):
        if extractjson is not None:
            _extract = json.loads(extractjson)
        else:
            _extract = {}
        if otherjson is not None:
            _other = json.loads(otherjson)
        else:
            _other = {}
        project_code = ""
        project_name = ""
        tenderee = ""
        agency = ""
        win_tenderer = ""
        bidding_budget = ""
        win_bid_price = ""
        fingerprint = ""
        page_time_stamp = 0
        docchannel = 0
        extract_count = 0
        page_time = _other.get("pageTime", time.strftime('%Y-%m-%d', time.localtime()))
        doctitle = _other.get("doctitle", "")
        doctitle_refine = re.sub(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|合同', '', doctitle)
        area = _other.get("area", "")
        province = _other.get("province", "")
        city = _other.get("city", "")
        district = _other.get("district", "")
        web_source_no = _other.get("webSourceNo", "")
        # docchannel = _other.get("docchannel",0)
        docchannel = self.dict_channel.get(_extract.get("docchannel", ""), 0)
        if re.search(self.time_pattern, page_time) is not None:
            try:
                # parse only the first 10 characters ("%Y-%m-%d"); a longer
                # slice would make strptime fail on any trailing character
                timeArray = time.strptime(page_time[:10], "%Y-%m-%d")
                page_time_stamp = int(time.mktime(timeArray))
            except Exception:
                pass
        list_code = _extract.get("code", [])
        if len(list_code) > 0:
            project_code = list_code[0]
        project_name = _extract.get("name", "")
        fingerprint = _extract.get("fingerprint", "")
        dict_pack = _extract.get("prem", {})
        logging.info(dict_pack)
        for _key in dict_pack.keys():
            if dict_pack[_key]["tendereeMoney"] != '' and float(dict_pack[_key]["tendereeMoney"]) > 0:
                extract_count += 1
                if bidding_budget == "":
                    bidding_budget = str(float(dict_pack[_key]["tendereeMoney"]))
            for _role in dict_pack[_key]["roleList"]:
                extract_count += 1
                if _role[2] != '' and float(_role[2]) > 0:
                    extract_count += 1
                if _role[0] == "tenderee":
                    tenderee = _role[1]
                if _role[0] == "win_tenderer":
                    if win_tenderer == "":
                        win_tenderer = _role[1]
                    if _role[2] != '' and float(_role[2]) > 0:
                        extract_count += 1
                        if win_bid_price == "":
                            win_bid_price = str(float(_role[2]))
                if _role[0] == "agency":
                    agency = _role[1]
        if project_code != "":
            extract_count += 1
        if project_name != "":
            extract_count += 1
        logging.info(page_time + doctitle + doctitle_refine + area + province + city +
                     district + web_source_no + project_code + project_name + tenderee + agency + win_tenderer + bidding_budget + win_bid_price)
        self.forward(page_time, page_time_stamp, docchannel, doctitle, doctitle_refine, area, province, city,
                     district, web_source_no, fingerprint, project_code, project_name, tenderee, agency, win_tenderer, bidding_budget, win_bid_price, extract_count)
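
# Illustrative only: the shape of `extractjson` that f_decode_extract assumes,
# inferred from the parsing above (the field names are real, the values and the
# pack key "Project" are made up):
#   {
#     "code": ["ZB-2021-001"],
#     "name": "some project name",
#     "fingerprint": "md5=...",
#     "docchannel": "中标信息",
#     "prem": {
#       "Project": {
#         "tendereeMoney": "1000000",
#         "roleList": [["tenderee", "Buyer Co.", ""],
#                      ["win_tenderer", "Winner Co.", "980000"],
#                      ["agency", "Agency Co.", ""]]
#       }
#     }
#   }
# `otherjson` carries the raw document fields: pageTime, doctitle, area,
# province, city, district, webSourceNo.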

@annotate("string->bigint")
class f_get_extractCount(object):

    def __init__(self):
        import time
        global time
        import logging
        import json
        import re
        global json,logging,re
        self.time_pattern = r"\d{4}-\d{2}-\d{2}.*"
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, extractjson):
        if extractjson is not None:
            _extract = json.loads(extractjson)
        else:
            _extract = {}
        dict_pack = _extract.get("prem", {})
        extract_count = 0
        list_code = _extract.get("code", [])
        if len(list_code) > 0:
            project_code = list_code[0]
        else:
            project_code = ""
        project_name = _extract.get("name", "")
        bidding_budget = ""
        win_tenderer = ""
        win_bid_price = ""
        for _key in dict_pack.keys():
            if dict_pack[_key]["tendereeMoney"] != '' and float(dict_pack[_key]["tendereeMoney"]) > 0:
                extract_count += 1
                if bidding_budget == "":
                    bidding_budget = str(float(dict_pack[_key]["tendereeMoney"]))
            for _role in dict_pack[_key]["roleList"]:
                extract_count += 1
                if _role[2] != '' and float(_role[2]) > 0:
                    extract_count += 1
                if _role[0] == "tenderee":
                    tenderee = _role[1]
                if _role[0] == "win_tenderer":
                    if win_tenderer == "":
                        win_tenderer = _role[1]
                    if _role[2] != '' and float(_role[2]) > 0:
                        extract_count += 1
                        if win_bid_price == "":
                            win_bid_price = str(float(_role[2]))
                if _role[0] == "agency":
                    agency = _role[1]
        if project_code != "":
            extract_count += 1
        if project_name != "":
            extract_count += 1
        return extract_count

@annotate('string,string,string,string,string -> string,string,string,bigint')
class f_decode_sub_docs_json(BaseUDTF):

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, project_code, project_name, tenderee, agency, sub_docs_json):
        columns = {"win_tenderer":"", "bidding_budget":"", "win_bid_price":""}
        extract_count = 0
        if project_code is not None and project_code != "":
            extract_count += 1
        if project_name is not None and project_name != "":
            extract_count += 1
        if tenderee is not None and tenderee != "":
            extract_count += 1
        if agency is not None and agency != "":
            extract_count += 1
        if sub_docs_json is not None:
            for sub_docs in json.loads(sub_docs_json):
                for _key_sub_docs in sub_docs.keys():
                    extract_count += 1
                    if _key_sub_docs in columns:
                        if columns[_key_sub_docs] == "" and str(sub_docs[_key_sub_docs]) not in ["", "0"]:
                            if _key_sub_docs in ["bidding_budget", "win_bid_price"]:
                                if float(sub_docs[_key_sub_docs]) > 0:
                                    columns[_key_sub_docs] = str(float(sub_docs[_key_sub_docs]))
                            else:
                                columns[_key_sub_docs] = str(sub_docs[_key_sub_docs])
        self.forward(columns["win_tenderer"], columns["bidding_budget"], columns["win_bid_price"], extract_count)
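
# Illustrative only: a `sub_docs_json` value this UDTF would accept (one dict
# per sub-document; only the keys present in `columns` are picked up):
#   [{"win_tenderer": "Winner Co.", "win_bid_price": 980000.0},
#    {"bidding_budget": 1000000.0}]
# With all four scalar inputs non-empty this forwards
#   ("Winner Co.", "1000000.0", "980000.0", 7)
# since extract_count counts 4 non-empty scalars plus 3 sub-document keys.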

@annotate("string->bigint")
class totimestamp(object):

    def __init__(self):
        import time
        global time
        import logging
        import json
        import re
        global json,logging,re
        self.time_pattern = r"\d{4}-\d{2}-\d{2}.*"
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, str_time):
        try:
            logging.info(str_time)
            if str_time is not None and re.search(self.time_pattern, str_time) is not None:
                timeArray = time.strptime(str_time[:10], "%Y-%m-%d")
                timeStamp = int(time.mktime(timeArray))
                return timeStamp
            else:
                return 0
        except Exception:
            return 0

@annotate("string->string")
class refind_name(object):

    def __init__(self):
        import logging
        import re
        global logging,re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, title):
        if title is not None:
            return re.sub(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|\[|\]|【|】', '', title)
        return ""

@annotate('bigint,bigint,bigint,string,bigint,string->string')
class f_set_docid(BaseUDAF):
    '''
    Project code / winning bidder; len(project code) > 7, winning bidder <> ""
    '''

    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, docid, page_time_stamp, extract_count, defind_column, defind_count, tenderee):
        buffer[0].append({"docid":docid, "page_time_stamp":page_time_stamp, "extract_count":extract_count,
                          "defind_column":defind_column, "defind_count":defind_count, "tenderee":tenderee})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_docs = buffer[0]
        list_docs.sort(key=lambda x: x["page_time_stamp"])
        list_group = []
        _begin = 0
        defind_count = 0
        if len(list_docs) > 0:
            defind_count = list_docs[0]["defind_count"]
        for i in range(len(list_docs) - 1):
            # documents within two days of each other stay in the same time cluster
            if abs(list_docs[i]["page_time_stamp"] - list_docs[i+1]["page_time_stamp"]) <= 86400*2:
                continue
            else:
                _group = []
                _set_column = set()
                _set_tenderee = set()
                for j in range(_begin, i+1):
                    if list_docs[j]["tenderee"] is not None and list_docs[j]["tenderee"] != "":
                        _set_tenderee.add(list_docs[j]["tenderee"])
                    _set_column.add(list_docs[j]["defind_column"])
                    _group.append({"docid":list_docs[j]["docid"], "extract_count":list_docs[j]["extract_count"]})
                # a cluster of 3+ documents naming more than one tenderee is
                # not treated as a duplicate group
                if len(_group) >= 3 and len(_set_tenderee) > 1:
                    pass
                else:
                    if len(_group) > 1:
                        if defind_count == 2:
                            if len(_set_column) >= 2:
                                list_group.append(_group)
                        elif defind_count == 1:
                            if len(_set_column) == 1:
                                list_group.append(_group)
                        elif defind_count == 0:
                            list_group.append(_group)
                _begin = i + 1
        if len(list_docs) > 1:
            _set_column = set()
            _set_tenderee = set()
            _group = []
            for j in range(_begin, len(list_docs)):
                if list_docs[j]["tenderee"] is not None and list_docs[j]["tenderee"] != "":
                    _set_tenderee.add(list_docs[j]["tenderee"])
                _set_column.add(list_docs[j]["defind_column"])
                _group.append({"docid":list_docs[j]["docid"], "extract_count":list_docs[j]["extract_count"]})
            if len(_group) >= 3 and len(_set_tenderee) > 1:
                pass
            else:
                if len(_group) > 1:
                    if defind_count == 2:
                        if len(_set_column) >= 2:
                            list_group.append(_group)
                    elif defind_count == 1:
                        if len(_set_column) == 1:
                            list_group.append(_group)
                    elif defind_count == 0:
                        list_group.append(_group)
        return json.dumps(list_group)
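
# Illustrative only: terminate() returns the groups as JSON, e.g.
#   [[{"docid": 1, "extract_count": 5}, {"docid": 2, "extract_count": 3}]]
# The defind_count of the first document decides how many distinct
# defind_column values a group must contain: 2 -> at least two, 1 -> exactly
# one, 0 -> no constraint.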

def isEmpty(_str):
    if _str is None or _str == "":
        return True
    return False

@annotate('bigint,bigint,bigint,string,string,string,string,string,string,string,string->string')
class f_set_docid_binaryChart(BaseUDAF):
    '''
    Project code / winning bidder; len(project code) > 7, winning bidder <> ""
    '''

    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, docid, page_time_stamp, extract_count, project_code, project_name, tenderee, bidding_budget, win_tenderer, win_bid_price, agency, web_source_no):
        buffer[0].append({"docid":docid, "page_time_stamp":page_time_stamp, "extract_count":extract_count,
                          "project_code":project_code, "project_name":project_name, "tenderee":tenderee,
                          "bidding_budget":bidding_budget, "win_tenderer":win_tenderer, "win_bid_price":win_bid_price,
                          "agency":agency, "web_source_no":web_source_no})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_docs = buffer[0]
        list_timeGroups = split_with_time(list_docs, "page_time_stamp", 86400*2)
        list_group = []
        empty_key = ["project_code", "bidding_budget", "win_tenderer", "win_bid_price", "agency"]
        for _timeGroups in list_timeGroups:
            # split each time cluster into documents with and without key fields
            list_empty = []
            list_notEmpty = []
            for _item in _timeGroups:
                empty_flag = True
                for _key in empty_key:
                    if not isEmpty(_item[_key]):
                        empty_flag = False
                        break
                if empty_flag:
                    list_empty.append(_item)
                else:
                    list_notEmpty.append(_item)
            # pair every "empty" document with at most one "non-empty" document
            # from a web source it has not been paired with, provided the
            # tenderees do not conflict
            for _e in list_empty:
                _group = [{"docid":_e["docid"], "extract_count":_e["extract_count"]}]
                _e_tenderee = _e["tenderee"]
                for _ne in list_notEmpty:
                    if "set_webSource" not in _ne:
                        _ne["set_webSource"] = set()
                        _ne["set_webSource"].add(_ne["web_source_no"])
                    _suit = False
                    if not isEmpty(_e_tenderee) and _e_tenderee == _ne["tenderee"]:
                        _suit = True
                    elif isEmpty(_e_tenderee):
                        _suit = True
                    if _suit:
                        if _e["web_source_no"] not in _ne["set_webSource"]:
                            _ne["set_webSource"].add(_e["web_source_no"])
                            _group.append({"docid":_ne["docid"], "extract_count":_ne["extract_count"]})
                            break
                if len(_group) > 1:
                    list_group.append(_group)
        return json.dumps(list_group)

def split_with_time(list_dict, sort_key, timedelta=86400*2):
    if len(list_dict) > 0:
        if sort_key in list_dict[0]:
            list_dict.sort(key=lambda x: x[sort_key])
            list_group = []
            _begin = 0
            for i in range(len(list_dict) - 1):
                if abs(list_dict[i][sort_key] - list_dict[i+1][sort_key]) < timedelta:
                    continue
                else:
                    _group = []
                    for j in range(_begin, i+1):
                        _group.append(list_dict[j])
                    if len(_group) > 1:
                        list_group.append(_group)
                    _begin = i + 1
            if len(list_dict) > 1:
                _group = []
                for j in range(_begin, len(list_dict)):
                    _group.append(list_dict[j])
                if len(_group) > 1:
                    list_group.append(_group)
            return list_group
    return [list_dict]
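
# Illustrative only (timestamps in seconds): with the default two-day window,
#   split_with_time([{"t": 0}, {"t": 3600}, {"t": 10**7}], "t")
# returns [[{"t": 0}, {"t": 3600}]] -- the lone third record is dropped,
# because singleton clusters are never emitted.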

@annotate('bigint,bigint,bigint,string,string,string,string,string->string')
class f_set_docid_limitNum_contain(BaseUDAF):
    '''
    Project code / winning bidder; len(project code) > 7, winning bidder <> "",
    fewer than two distinct non-empty tenderees after merging, and identical
    non-empty amounts within the same notice type after merging
    '''

    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer, docid, page_time_stamp, extract_count, set_limit_column1, set_limit_column2, set_limit_column3, set_limit_column4, contain_column):
        buffer[0].append({"docid":docid, "page_time_stamp":page_time_stamp, "extract_count":extract_count, "set_limit_column1":set_limit_column1,
                          "set_limit_column2":set_limit_column2, "set_limit_column3":set_limit_column3, "set_limit_column4":set_limit_column4,
                          "contain_column":contain_column})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_split = split_with_time(buffer[0], "page_time_stamp")
        list_group = []
        for _split in list_split:
            flag = True
            keys = ["set_limit_column1", "set_limit_column2", "set_limit_column3", "set_limit_column4"]
            # every limit column must take a single value within the group
            for _key in keys:
                logging.info(_key + str(getSet(_split, _key)))
                if len(getSet(_split, _key)) > 1:
                    flag = False
                    break
            MAX_CONTAIN_COLUMN = None
            # check that the contain_column of every notice in the group is
            # contained in the longest one
            if flag:
                for _d in _split:
                    contain_column = _d["contain_column"]
                    if contain_column is not None and contain_column != "":
                        if MAX_CONTAIN_COLUMN is None:
                            MAX_CONTAIN_COLUMN = contain_column
                        else:
                            if len(MAX_CONTAIN_COLUMN) < len(contain_column):
                                if contain_column.find(MAX_CONTAIN_COLUMN) == -1:
                                    flag = False
                                    break
                                MAX_CONTAIN_COLUMN = contain_column
                            else:
                                if MAX_CONTAIN_COLUMN.find(contain_column) == -1:
                                    flag = False
                                    break
            if flag:
                if len(_split) > 1:
                    _group = []
                    for _item in _split:
                        _group.append({"docid":_item["docid"], "extract_count":_item["extract_count"]})
                    list_group.append(_group)
        return json.dumps(list_group)

@annotate('bigint->string')
class f_stamp_squence(BaseUDAF):
    '''
    Merge the collected page_time stamps into intervals, padding each
    interval by two days on both sides
    '''

    def __init__(self):
        import json
        global json
        import logging
        global logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [set()]

    def iterate(self, buffer, page_time_stamp):
        buffer[0].add(page_time_stamp)

    def merge(self, buffer, pbuffer):
        buffer[0] |= pbuffer[0]

    def terminate(self, buffer):
        if 0 in buffer[0]:
            buffer[0].remove(0)
        list_stamp = list(buffer[0])
        list_stamp.sort(key=lambda x: x)
        list_stamp_final = []
        _begin = 0
        _time_decase = 86400*2
        logging.info(str(list_stamp))
        for _index in range(len(list_stamp) - 1):
            if list_stamp[_index+1] - list_stamp[_index] < _time_decase:
                continue
            else:
                list_stamp_final.append([list_stamp[_begin] - _time_decase, list_stamp[_index] + _time_decase])
                _begin = _index + 1
        if len(list_stamp) > 0:
            list_stamp_final.append([list_stamp[_begin] - _time_decase, list_stamp[-1] + _time_decase])
        return json.dumps(list_stamp_final)
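
# Illustrative only: with stamps {day 1, day 1 + 1h, day 10} the output is
#   [[day1 - 2d, day1 + 1h + 2d], [day10 - 2d, day10 + 2d]]
# i.e. stamps closer than two days collapse into one padded interval, which
# in_stamp below then tests membership against.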

@annotate("bigint,string->bigint")
class in_stamp(object):

    def __init__(self):
        import logging
        import re
        import json
        global logging,re,json
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, page_time_stamp, json_stamp):
        list_stamp = json.loads(json_stamp)
        int_flag = 0
        for item in list_stamp:
            if page_time_stamp < item[0]:
                break
            if page_time_stamp > item[0] and page_time_stamp < item[1]:
                int_flag = 1
                break
        return int_flag

def getConfidence(rule_id):
    if rule_id == 0:
        return 30
    elif rule_id >= 1 and rule_id < 30:
        return 20
    else:
        return 10

@annotate('string,string -> string')
class f_splitStr(BaseUDTF):
    '''
    Split a string on the given separator and emit one row per part
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, str_split, _split):
        try:
            for _s in str_split.split(_split):
                self.forward(_s)
        except Exception:
            pass

@annotate('string,bigint -> bigint,bigint,bigint,bigint,bigint')
class f_split_group_single(BaseUDTF):
    '''
    Unpack each group into pairwise (docid1, docid2) records
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_set_docid, rule_id):
        list_group = json.loads(json_set_docid)
        for item in list_group:
            if len(item) > 100:
                # large groups: only pair every document with the newest docid
                # instead of producing the full cartesian product
                item.sort(key=lambda x: x["docid"], reverse=True)
                index_i = 0
                for index_j in range(1, len(item)):
                    if item[index_i]["docid"] != item[index_j]["docid"]:
                        self.forward(item[index_i]["docid"], item[index_j]["docid"], item[index_i]["extract_count"], item[index_j]["extract_count"], getConfidence(rule_id))
            else:
                for index_i in range(len(item)):
                    for index_j in range(len(item)):
                        if index_i != index_j and item[index_i]["docid"] != item[index_j]["docid"]:
                            self.forward(item[index_i]["docid"], item[index_j]["docid"], item[index_i]["extract_count"], item[index_j]["extract_count"], getConfidence(rule_id))

@annotate('bigint,string->string')
class group_document(BaseUDAF):
    '''
    Aggregate the (id, json_set_docid) rows of a group into one JSON list
    '''

    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, id, json_set_docid):
        buffer[0].append({"id":id, "json_set_docid":json.loads(json_set_docid)})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        return json.dumps(buffer[0])

@annotate('bigint,string,bigint,string -> bigint,bigint,string')
class decare_document(BaseUDTF):
    '''
    Cartesian-join two grouped document lists and merge groups that overlap
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, group_id1, json_list_doc1, group_id2, json_list_doc2):
        # only look at one side of the y=x diagonal, cutting nearly half the data
        if group_id1 >= group_id2:
            list_doc1 = json.loads(json_list_doc1)
            list_doc2 = json.loads(json_list_doc2)
            for _doc1 in list_doc1:
                for _doc2 in list_doc2:
                    # do not compare a duplicate group with itself
                    if _doc1["id"] != _doc2["id"]:
                        # check whether the two groups share any docid
                        _set1 = set()
                        for _item1 in _doc1["json_set_docid"]:
                            _set1.add(_item1["docid"])
                        _set2 = set()
                        for _item2 in _doc2["json_set_docid"]:
                            _set2.add(_item2["docid"])
                        if len(_set1 & _set2) > 0:
                            new_json_set_docid = _doc1["json_set_docid"]
                            for _item2 in _doc2["json_set_docid"]:
                                if _item2["docid"] not in _set1:
                                    new_json_set_docid.append(_item2)
                            self.forward(_doc1["id"], _doc2["id"], json.dumps(new_json_set_docid))

def getBestDocid(list_pair):
    # list_pair rows: [docid1, extract_count1, docid2, extract_count2]
    # list_pair.sort(key=lambda x:x[3],reverse=True)
    # _max_count = max(list_pair[0][3],list_pair[0][1])
    # set_candidate = set()
    # if list_pair[0][1]==_max_count:
    #     set_candidate.add(list_pair[0][0])
    # for item in list_pair:
    #     if item[3]==_max_count:
    #         set_candidate.add(item[2])
    #     else:
    #         break
    # list_candidate = list(set_candidate)
    # list_candidate.sort(key=lambda x:x)
    new_pair = []
    new_pair.append([list_pair[0][0], list_pair[0][0], list_pair[0][1]])
    for item in list_pair:
        new_pair.append([item[0], item[2], item[3]])
    new_pair.sort(key=lambda x: x[1])
    new_pair.sort(key=lambda x: x[2], reverse=True)
    return new_pair[0][1]
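
# Illustrative only: getBestDocid keeps the docid with the highest
# extract_count, breaking ties by the smaller docid. For
#   [[5, 2, 3, 4], [5, 2, 9, 4]]
# the candidates become (5, count 2), (3, count 4), (9, count 4); the two
# stable sorts leave (3, count 4) first, so 3 is returned.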

@annotate('bigint,bigint,bigint,bigint->string')
class choose_document(BaseUDAF):
    '''
    Decide whether the current docid is kept and collect its duplicates
    '''

    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, docid1, extract_count1, docid2, extract_count2):
        buffer[0].append([docid1, extract_count1, docid2, extract_count2])

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_pair = buffer[0]
        _set = set()
        for item in buffer[0]:
            _set.add(str(item[2]))
        list_dumplicate = list(_set)
        best_docid = getBestDocid(list_pair)
        if best_docid == list_pair[0][0]:
            save_flag = 1
        else:
            save_flag = 0
        return json.dumps({"save_flag":save_flag, "dumplicates":list_dumplicate})

@annotate('string -> bigint,string')
class f_get_choose_document(BaseUDTF):
    '''
    Unpack the choose_document JSON into (save_flag, dumplicates)
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_choose):
        if json_choose is None:
            self.forward(1, None)
        else:
            _choose = json.loads(json_choose)
            self.forward(_choose["save_flag"], ",".join(_choose["dumplicates"]))

@annotate('bigint,bigint,bigint,bigint->string')
class group_document_bestFirst(BaseUDAF):
    '''
    Put the best document of the group first
    '''

    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, docid1, extract_count1, docid2, extract_count2):
        buffer[0].append([docid1, extract_count1, docid2, extract_count2])

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_pair = buffer[0]
        _set = set()
        for item in buffer[0]:
            _set.add(item[2])
        _set.add(list_pair[0][0])
        best_docid = getBestDocid(list_pair)
        _set.remove(best_docid)
        list_dumplicate = list(_set)
        list_dumplicate.sort(key=lambda x: x)
        list_dumplicate.insert(0, best_docid)
        list_dumplicate_str = []
        for item in list_dumplicate:
            list_dumplicate_str.append(str(item))
        return ",".join(list_dumplicate_str)

@annotate('string -> bigint,string')
class f_get_best_dumplicates(BaseUDTF):
    '''
    Get the best record of each group together with its duplicate records
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, list_dumplicate_str):
        if list_dumplicate_str is None:
            pass
        else:
            list_dumplicate = list_dumplicate_str.split(",")
            if len(list_dumplicate) > 0:
                self.forward(int(list_dumplicate[0]), ",".join(list_dumplicate[1:]))
            else:
                pass

@annotate('bigint,bigint->string')
class bridge2group(BaseUDAF):
    '''
    Collect the docids of all pairs into one descending-sorted group
    '''

    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [set()]

    def iterate(self, buffer, docid1, docid2):
        buffer[0].add(docid1)
        buffer[0].add(docid2)

    def merge(self, buffer, pbuffer):
        buffer[0] |= pbuffer[0]

    def terminate(self, buffer):
        list_pair = list(buffer[0])
        list_pair.sort(key=lambda x: x, reverse=True)
        return json.dumps(list_pair)

@annotate('string -> bigint,bigint')
class group2bridge(BaseUDTF):
    '''
    Emit one (bridge docid, docid) row per group member
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_list_docid):
        list_docid = json.loads(json_list_docid)
        for _docid in list_docid:
            # the list is sorted descending, so its last element -- the
            # smallest docid -- serves as the bridge id of the group
            self.forward(list_docid[-1], _docid)
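
# Illustrative only: bridge2group turns pairs (9,5) and (5,3) that land in the
# same group into "[9, 5, 3]"; group2bridge then forwards (3, 9), (3, 5) and
# (3, 3), linking every docid to the group's smallest docid.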

@annotate('bigint,bigint,string -> bigint')
class f_get_dump_docid(BaseUDTF):
    '''
    Emit the docids that are to be dropped as duplicates
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, docid, save_flag, dumplicates):
        if save_flag == 0:
            self.forward(docid)
            if dumplicates is not None:
                list_docid = dumplicates.split(",")
                if len(list_docid) > 0:
                    for _docid in list_docid[1:]:
                        self.forward(int(_docid))
        else:
            if dumplicates is not None:
                list_docid = dumplicates.split(",")
                if len(list_docid) > 0:
                    for _docid in list_docid:
                        self.forward(int(_docid))

@annotate('string -> bigint,bigint')
class f_get_docid(BaseUDTF):
    '''
    Unpack multiple groups into one (team_id, docid) record per member
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_set_docid):
        team_id = 0
        if json_set_docid is not None:
            list_docses = json.loads(json_set_docid)
            for list_docs in list_docses:
                team_id += 1
                for item in list_docs:
                    self.forward(team_id, item["docid"])

@annotate("string->bigint")
class get_count_dump(object):

    def __init__(self):
        import logging
        import re
        global logging,re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, title):
        _count = 0
        if title is not None:
            _count = len(title.split(","))
        return _count

def getSet(list_dict, key):
    _set = set()
    for item in list_dict:
        if key in item:
            if item[key] != '' and item[key] is not None:
                # normalize numeric strings so "1000" and "1000.0" compare equal
                if re.search(r"^\d[\d\.]*$", item[key]) is not None:
                    _set.add(str(float(item[key])))
                else:
                    _set.add(str(item[key]))
    return _set

def getDiffIndex(list_dict, key, confidence=100):
    _set = set()
    for _i in range(len(list_dict)):
        item = list_dict[_i]
        if item["confidence"] >= confidence:
            continue
        if key in item:
            if item[key] != '' and item[key] is not None:
                if re.search(r"^\d+(\.\d+)?$", item[key]) is not None:
                    _set.add(str(float(item[key])))
                else:
                    _set.add(str(item[key]))
        if len(_set) > 1:
            return _i
    return len(list_dict)
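
# Illustrative only: getDiffIndex returns the index of the first element at
# which the key takes a second distinct value (items at or above the
# confidence cutoff are ignored). For values ["a", "a", "b"] it returns 2;
# if no second value ever appears it returns len(list_dict).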

@annotate('bigint,string -> bigint,bigint')
class f_getGroup_dumpFinal(BaseUDTF):
    '''
    Extract the groups from the final result
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, docid, dumplicates):
        self.forward(int(docid), int(docid))
        if dumplicates is not None:
            list_docids = dumplicates.split(",")
            for _docid in list_docids:
                self.forward(int(docid), int(_docid))

@annotate('bigint,bigint,string,string,string,string,bigint,bigint,bigint->string')
class f_redump_limit_num(BaseUDAF):
    '''
    Re-check groups after the dedup merge: with more than 5 members,
    doctitle, tenderee, win_tenderer and bidding_budget may each take only
    one value inside the group; with 5 or fewer members, only tenderee,
    win_tenderer and bidding_budget are constrained
    '''

    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer, main_docid, docid, doctitle, set_limit_column2, set_limit_column3, set_limit_column4, extract_count1, extract_count2, confidence):
        buffer[0].append({"main_docid":main_docid, "docid":docid, "doctitle":doctitle, "set_limit_column2":set_limit_column2,
                          "set_limit_column3":set_limit_column3, "set_limit_column4":set_limit_column4, "extract_count1":extract_count1,
                          "extract_count2":extract_count2, "confidence":confidence})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_group = []
        the_group = buffer[0]
        the_group.sort(key=lambda x: x["confidence"], reverse=True)
        if len(the_group) > 5:
            keys = ["doctitle", "set_limit_column2", "set_limit_column3", "set_limit_column4"]
        else:
            keys = ["set_limit_column2", "set_limit_column3", "set_limit_column4"]
        final_group = []
        # keep the longest confidence-ordered prefix in which every key is
        # still single-valued
        list_key_index = []
        for _k in keys:
            if _k == "doctitle":
                list_key_index.append(getDiffIndex(the_group, _k, confidence=30))
            else:
                list_key_index.append(getDiffIndex(the_group, _k))
        _index = min(list_key_index)
        if _index > 1:
            main_docid = the_group[0]["main_docid"]
            for item in the_group[:_index]:
                if item["docid"] != main_docid:
                    final_group.append({"docid1":main_docid, "docid2":item["docid"], "extract_count1":item["extract_count1"], "extract_count2":item["extract_count2"], "confidence":item["confidence"]})
        # stay = True
        # for _key in keys:
        #     if len(getSet(the_group,_key))>1:
        #         stay = False
        #         break
        #
        # if stay:
        #     main_docid = the_group[0]["main_docid"]
        #     for item in the_group:
        #         if item["docid"]!=main_docid:
        #             final_group.append({"docid1":main_docid,"docid2":item["docid"],"extract_count1":item["extract_count1"],"extract_count2":item["extract_count2"]})
        return json.dumps(final_group)

@annotate('string -> bigint,bigint,bigint,bigint,bigint')
class f_get_dumpFinal_checked(BaseUDTF):
    '''
    Extract the checked pairs from the final result
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, list_group):
        if list_group is not None:
            final_group = json.loads(list_group)
            for _group in final_group:
                self.forward(_group["docid1"], _group["docid2"], _group["extract_count1"], _group["extract_count2"], _group["confidence"])

@annotate('string -> bigint')
class f_getDumplicateDocids(BaseUDTF):
    '''
    Emit every docid listed in the comma-separated dumplicates string
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, dumplicates):
        # guard against null input, which would otherwise raise on split()
        if dumplicates is None:
            return
        list_docids = dumplicates.split(",")
        for _d in list_docids:
            self.forward(int(_d))