#coding:UTF8
from odps.udf import annotate
from odps.udf import BaseUDTF
from odps.udf import BaseUDAF

@annotate('string,string -> string,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,bigint')
class f_decode_extract(BaseUDTF):

    def __init__(self):
        import logging
        import json
        import time,re
        global json,logging,time,re
        self.time_pattern = r"\d{4}\-\d{2}\-\d{2}.*"
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    def process(self, extractjson, otherjson):
        if extractjson is not None:
            _extract = json.loads(extractjson)
        else:
            _extract = {}
        if otherjson is not None:
            _other = json.loads(otherjson)
        else:
            _other = {}
        project_code = ""
        project_name = ""
        tenderee = ""
        agency = ""
        win_tenderer = ""
        bidding_budget = ""
        win_bid_price = ""
        fingerprint = ""
        page_time_stamp = 0
        docchannel = 0
        extract_count = 0
        page_time = _other.get("pageTime", time.strftime('%Y-%m-%d', time.localtime()))
        doctitle = _other.get("doctitle", "")
        doctitle_refine = re.sub(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|合同', '', doctitle)
        area = _other.get("area", "")
        province = _other.get("province", "")
        city = _other.get("city", "")
        district = _other.get("district", "")
        web_source_no = _other.get("webSourceNo", "")
        docchannel = _other.get("docchannel", 0)
        if re.search(self.time_pattern, page_time) is not None:
            try:
                # slice to 10 characters so "%Y-%m-%d" matches exactly
                timeArray = time.strptime(page_time[:10], "%Y-%m-%d")
                page_time_stamp = int(time.mktime(timeArray))
            except Exception as e:
                pass
        list_code = _extract.get("code", [])
        if len(list_code) > 0:
            project_code = list_code[0]
        project_name = _extract.get("name", "")
        fingerprint = _extract.get("fingerprint", "")
        dict_pack = _extract.get("prem", {})
        logging.info(dict_pack)
        for _key in dict_pack.keys():
            if dict_pack[_key]["tendereeMoney"] != '' and float(dict_pack[_key]["tendereeMoney"]) > 0:
                extract_count += 1
                if bidding_budget == "":
                    bidding_budget = str(float(dict_pack[_key]["tendereeMoney"]))
            for _role in dict_pack[_key]["roleList"]:
                extract_count += 1
                if _role[2] != '' and float(_role[2]) > 0:
                    extract_count += 1
                if _role[0] == "tenderee":
                    tenderee = _role[1]
                if _role[0] == "win_tenderer":
                    if win_tenderer == "":
                        win_tenderer = _role[1]
                    if _role[2] != '' and float(_role[2]) > 0:
                        extract_count += 1
                        if win_bid_price == "":
                            win_bid_price = str(float(_role[2]))
                if _role[0] == "agency":
                    agency = _role[1]
        if project_code != "":
            extract_count += 1
        if project_name != "":
            extract_count += 1
        logging.info(page_time + doctitle + doctitle_refine + area + province + city +
                     district + web_source_no + project_code + project_name + tenderee + agency + win_tenderer + bidding_budget + win_bid_price)
        self.forward(page_time, page_time_stamp, docchannel, doctitle, doctitle_refine, area, province, city,
                     district, web_source_no, fingerprint, project_code, project_name, tenderee, agency, win_tenderer, bidding_budget, win_bid_price, extract_count)
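
# Example invocation from MaxCompute SQL (a sketch with hypothetical table and
# column names, not taken from this repo — UDTF outputs are bound with AS):
#   SELECT f_decode_extract(extractjson, otherjson) AS
#     (page_time, page_time_stamp, docchannel, doctitle, doctitle_refine,
#      area, province, city, district, web_source_no, fingerprint,
#      project_code, project_name, tenderee, agency, win_tenderer,
#      bidding_budget, win_bid_price, extract_count)
#   FROM document_extract;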
@annotate("string->bigint")
class f_get_extractCount(object):

    def __init__(self):
        import time
        global time
        import logging
        import json
        import re
        global json,logging,re
        self.time_pattern = r"\d{4}\-\d{2}\-\d{2}.*"
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, extractjson):
        if extractjson is not None:
            _extract = json.loads(extractjson)
        else:
            _extract = {}
        dict_pack = _extract.get("prem", {})
        extract_count = 0
        list_code = _extract.get("code", [])
        if len(list_code) > 0:
            project_code = list_code[0]
        else:
            project_code = ""
        project_name = _extract.get("name", "")
        bidding_budget = ""
        win_tenderer = ""
        win_bid_price = ""
        for _key in dict_pack.keys():
            if dict_pack[_key]["tendereeMoney"] != '' and float(dict_pack[_key]["tendereeMoney"]) > 0:
                extract_count += 1
                if bidding_budget == "":
                    bidding_budget = str(float(dict_pack[_key]["tendereeMoney"]))
            for _role in dict_pack[_key]["roleList"]:
                extract_count += 1
                if _role[2] != '' and float(_role[2]) > 0:
                    extract_count += 1
                if _role[0] == "tenderee":
                    tenderee = _role[1]
                if _role[0] == "win_tenderer":
                    if win_tenderer == "":
                        win_tenderer = _role[1]
                    if _role[2] != '' and float(_role[2]) > 0:
                        extract_count += 1
                        if win_bid_price == "":
                            win_bid_price = str(float(_role[2]))
                if _role[0] == "agency":
                    agency = _role[1]
        if project_code != "":
            extract_count += 1
        if project_name != "":
            extract_count += 1
        return extract_count
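
# Worked example (hypothetical extract JSON): one budget (+1), a tenderee role
# (+1), a win_tenderer role with money (+1 role, +1 money, +1 win price), plus
# a non-empty code (+1) and name (+1) gives extract_count = 7:
#   f_get_extractCount().evaluate(
#       '{"code":["ABC-123"],"name":"n","prem":{"p":{"tendereeMoney":"100",'
#       '"roleList":[["tenderee","A",""],["win_tenderer","B","50"]]}}}')  # -> 7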
@annotate('string,string,string,string,string -> string,string,string,bigint')
class f_decode_sub_docs_json(BaseUDTF):

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, project_code, project_name, tenderee, agency, sub_docs_json):
        columns = {"win_tenderer": "", "bidding_budget": "", "win_bid_price": ""}
        extract_count = 0
        if project_code is not None and project_code != "":
            extract_count += 1
        if project_name is not None and project_name != "":
            extract_count += 1
        if tenderee is not None and tenderee != "":
            extract_count += 1
        if agency is not None and agency != "":
            extract_count += 1
        if sub_docs_json is not None:
            for sub_docs in json.loads(sub_docs_json):
                for _key_sub_docs in sub_docs.keys():
                    extract_count += 1
                    if _key_sub_docs in columns:
                        if columns[_key_sub_docs] == "" and str(sub_docs[_key_sub_docs]) not in ["", "0"]:
                            if _key_sub_docs in ["bidding_budget", "win_bid_price"]:
                                if float(sub_docs[_key_sub_docs]) > 0:
                                    columns[_key_sub_docs] = str(float(sub_docs[_key_sub_docs]))
                            else:
                                columns[_key_sub_docs] = str(sub_docs[_key_sub_docs])
        self.forward(columns["win_tenderer"], columns["bidding_budget"], columns["win_bid_price"], extract_count)
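
# Example (hypothetical inputs): project_code counts once, then every key in
# sub_docs_json counts once while the three tracked columns keep their first
# non-empty, non-zero value:
#   f_decode_sub_docs_json().process("P1", None, None, None,
#       '[{"win_tenderer":"B","win_bid_price":"50"},{"bidding_budget":"100"}]')
#   # forwards ("B", "100.0", "50.0", 4)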
@annotate("string->bigint")
class totimestamp(object):

    def __init__(self):
        import time
        global time
        import logging
        import json
        import re
        global json,logging,re
        self.time_pattern = r"\d{4}\-\d{2}\-\d{2}.*"
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, str_time):
        try:
            logging.info(str_time)
            if str_time is not None and re.search(self.time_pattern, str_time) is not None:
                timeArray = time.strptime(str_time[:10], "%Y-%m-%d")
                timeStamp = int(time.mktime(timeArray))
                return timeStamp
            else:
                return 0
        except Exception as e:
            return 0
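
# Example: totimestamp().evaluate("2021-01-01") -> 1609430400 when the worker
# runs in UTC+8 (time.mktime uses local time); malformed input returns 0.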
@annotate("string->string")
class refind_name(object):

    def __init__(self):
        import logging
        import re
        global logging,re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, title):
        if title is not None:
            return re.sub(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|\[|\]|【|】', '', title)
        return ""
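
# Example: refind_name().evaluate("某某工程招标公告") -> "某某"
# (the keyword and bracket tokens in the pattern are stripped from the title).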
@annotate('bigint,bigint,bigint,string,bigint,string->string')
class f_set_docid(BaseUDAF):
    '''
    Project code, winning bidder, len(project code) > 7, winning bidder <> ""
    '''

    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, docid, page_time_stamp, extract_count, defind_column, defind_count, tenderee):
        buffer[0].append({"docid": docid, "page_time_stamp": page_time_stamp, "extract_count": extract_count,
                          "defind_column": defind_column, "defind_count": defind_count, "tenderee": tenderee})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_docs = buffer[0]
        list_docs.sort(key=lambda x: x["page_time_stamp"])
        list_group = []
        _begin = 0
        defind_count = 0
        if len(list_docs) > 0:
            defind_count = list_docs[0]["defind_count"]
        for i in range(len(list_docs)-1):
            if abs(list_docs[i]["page_time_stamp"]-list_docs[i+1]["page_time_stamp"]) <= 86400*2:
                continue
            else:
                _group = []
                _set_column = set()
                _set_tenderee = set()
                for j in range(_begin, i+1):
                    if list_docs[j]["tenderee"] is not None and list_docs[j]["tenderee"] != "":
                        _set_tenderee.add(list_docs[j]["tenderee"])
                    _set_column.add(list_docs[j]["defind_column"])
                    _group.append({"docid": list_docs[j]["docid"], "extract_count": list_docs[j]["extract_count"]})
                if len(_group) >= 3 and len(_set_tenderee) > 1:
                    pass
                else:
                    if len(_group) > 1:
                        if defind_count == 2:
                            if len(_set_column) >= 2:
                                list_group.append(_group)
                        elif defind_count == 1:
                            if len(_set_column) == 1:
                                list_group.append(_group)
                        elif defind_count == 0:
                            list_group.append(_group)
                _begin = i+1
        if len(list_docs) > 1:
            _set_column = set()
            _set_tenderee = set()
            _group = []
            for j in range(_begin, len(list_docs)):
                if list_docs[j]["tenderee"] is not None and list_docs[j]["tenderee"] != "":
                    _set_tenderee.add(list_docs[j]["tenderee"])
                _set_column.add(list_docs[j]["defind_column"])
                _group.append({"docid": list_docs[j]["docid"], "extract_count": list_docs[j]["extract_count"]})
            if len(_group) >= 3 and len(_set_tenderee) > 1:
                pass
            else:
                if len(_group) > 1:
                    if defind_count == 2:
                        if len(_set_column) >= 2:
                            list_group.append(_group)
                    elif defind_count == 1:
                        if len(_set_column) == 1:
                            list_group.append(_group)
                    elif defind_count == 0:
                        list_group.append(_group)
        return json.dumps(list_group)
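
# Minimal local sketch (hypothetical values; assumes the class can be
# instantiated off-cluster). Two docs 50000s apart fall inside the two-day
# window, share one tenderee, and defind_count 0 imposes no column rule:
#   agg = f_set_docid()
#   buf = agg.new_buffer()
#   agg.iterate(buf, 1, 100000, 5, "A", 0, "tendereeX")
#   agg.iterate(buf, 2, 150000, 4, "A", 0, "tendereeX")
#   agg.terminate(buf)
#   # -> '[[{"docid": 1, "extract_count": 5}, {"docid": 2, "extract_count": 4}]]'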
def isEmpty(_str):
    if _str is None or _str == "":
        return True
    return False
@annotate('bigint,bigint,bigint,string,string,string,string,string,string,string,string->string')
class f_set_docid_binaryChart(BaseUDAF):
    '''
    Project code, winning bidder, len(project code) > 7, winning bidder <> ""
    '''

    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, docid, page_time_stamp, extract_count, project_code, project_name, tenderee, bidding_budget, win_tenderer, win_bid_price, agency, web_source_no):
        buffer[0].append({"docid": docid, "page_time_stamp": page_time_stamp, "extract_count": extract_count,
                          "project_code": project_code, "project_name": project_name, "tenderee": tenderee,
                          "bidding_budget": bidding_budget, "win_tenderer": win_tenderer, "win_bid_price": win_bid_price,
                          "agency": agency, "web_source_no": web_source_no})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_docs = buffer[0]
        list_timeGroups = split_with_time(list_docs, "page_time_stamp", 86400*2)
        list_group = []
        empty_key = ["project_code", "bidding_budget", "win_tenderer", "win_bid_price", "agency"]
        for _timeGroups in list_timeGroups:
            list_empty = []
            list_notEmpty = []
            for _item in _timeGroups:
                empty_flag = True
                for _key in empty_key:
                    if not isEmpty(_item[_key]):
                        empty_flag = False
                        break
                if empty_flag:
                    list_empty.append(_item)
                else:
                    list_notEmpty.append(_item)
            for _e in list_empty:
                _group = [{"docid": _e["docid"], "extract_count": _e["extract_count"]}]
                _e_tenderee = _e["tenderee"]
                for _ne in list_notEmpty:
                    if "set_webSource" not in _ne:
                        _ne["set_webSource"] = set()
                        _ne["set_webSource"].add(_ne["web_source_no"])
                    _suit = False
                    if not isEmpty(_e_tenderee) and _e_tenderee == _ne["tenderee"]:
                        _suit = True
                    elif isEmpty(_e_tenderee):
                        _suit = True
                    if _suit:
                        if _e["web_source_no"] not in _ne["set_webSource"]:
                            _ne["set_webSource"].add(_e["web_source_no"])
                            _group.append({"docid": _ne["docid"], "extract_count": _ne["extract_count"]})
                            break
                if len(_group) > 1:
                    list_group.append(_group)
        return json.dumps(list_group)
def split_with_time(list_dict, sort_key, timedelta=86400*2):
    if len(list_dict) > 0:
        if sort_key in list_dict[0]:
            list_dict.sort(key=lambda x: x[sort_key])
            list_group = []
            _begin = 0
            for i in range(len(list_dict)-1):
                if abs(list_dict[i][sort_key]-list_dict[i+1][sort_key]) < timedelta:
                    continue
                else:
                    _group = []
                    for j in range(_begin, i+1):
                        _group.append(list_dict[j])
                    if len(_group) > 1:
                        list_group.append(_group)
                    _begin = i + 1
            if len(list_dict) > 1:
                _group = []
                for j in range(_begin, len(list_dict)):
                    _group.append(list_dict[j])
                if len(_group) > 1:
                    list_group.append(_group)
            return list_group
    return [list_dict]
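
# Example: with the default two-day window, the first two stamps fall into one
# group and the lone trailing stamp is dropped (singleton groups are discarded):
#   split_with_time([{"t": 0}, {"t": 86400}, {"t": 864000}], "t")
#   # -> [[{"t": 0}, {"t": 86400}]]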
@annotate('bigint,bigint,bigint,string,string,string,string,string->string')
class f_set_docid_limitNum_contain(BaseUDAF):
    '''
    Project code, winning bidder, len(project code) > 7, winning bidder <> "",
    fewer than 2 distinct non-empty tenderees after merging, identical
    non-empty amounts for the same announcement type after merging
    '''

    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer, docid, page_time_stamp, extract_count, set_limit_column1, set_limit_column2, set_limit_column3, set_limit_column4, contain_column):
        buffer[0].append({"docid": docid, "page_time_stamp": page_time_stamp, "extract_count": extract_count, "set_limit_column1": set_limit_column1,
                          "set_limit_column2": set_limit_column2, "set_limit_column3": set_limit_column3, "set_limit_column4": set_limit_column4,
                          "contain_column": contain_column})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_split = split_with_time(buffer[0], "page_time_stamp")
        list_group = []
        for _split in list_split:
            flag = True
            keys = ["set_limit_column1", "set_limit_column2", "set_limit_column3", "set_limit_column4"]
            for _key in keys:
                logging.info(_key+str(getSet(_split, _key)))
                if len(getSet(_split, _key)) > 1:
                    flag = False
                    break
            MAX_CONTAIN_COLUMN = None
            # check whether each announcement in the group is contained by the longest one
            if flag:
                for _d in _split:
                    contain_column = _d["contain_column"]
                    if contain_column is not None and contain_column != "":
                        if MAX_CONTAIN_COLUMN is None:
                            MAX_CONTAIN_COLUMN = contain_column
                        else:
                            if len(MAX_CONTAIN_COLUMN) < len(contain_column):
                                if contain_column.find(MAX_CONTAIN_COLUMN) == -1:
                                    flag = False
                                    break
                                MAX_CONTAIN_COLUMN = contain_column
                            else:
                                if MAX_CONTAIN_COLUMN.find(contain_column) == -1:
                                    flag = False
                                    break
            if flag:
                if len(_split) > 1:
                    _group = []
                    for _item in _split:
                        _group.append({"docid": _item["docid"], "extract_count": _item["extract_count"]})
                    list_group.append(_group)
        return json.dumps(list_group)
@annotate('bigint->string')
class f_stamp_squence(BaseUDAF):
    '''
    Project code, winning bidder, len(project code) > 7, winning bidder <> ""
    '''

    def __init__(self):
        import json
        global json
        import logging
        global logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [set()]

    def iterate(self, buffer, page_time_stamp):
        buffer[0].add(page_time_stamp)

    def merge(self, buffer, pbuffer):
        buffer[0] |= pbuffer[0]

    def terminate(self, buffer):
        if 0 in buffer[0]:
            buffer[0].remove(0)
        list_stamp = list(buffer[0])
        list_stamp.sort(key=lambda x: x)
        list_stamp_final = []
        _begin = 0
        _time_decase = 86400*2
        logging.info(str(list_stamp))
        for _index in range(len(list_stamp)-1):
            if list_stamp[_index+1]-list_stamp[_index] < _time_decase:
                continue
            else:
                list_stamp_final.append([list_stamp[_begin]-_time_decase, list_stamp[_index]+_time_decase])
                _begin = _index+1
        if len(list_stamp) > 0:
            list_stamp_final.append([list_stamp[_begin]-_time_decase, list_stamp[-1]+_time_decase])
        return json.dumps(list_stamp_final)
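
# Example: stamps one, two and ten days in (day = 86400) merge into two padded
# windows, each widened by two days on both sides:
#   [day, 2*day, 10*day] -> [[-day, 4*day], [8*day, 12*day]]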
@annotate("bigint,string->bigint")
class in_stamp(object):

    def __init__(self):
        import logging
        import re
        import json
        global logging,re,json
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, page_time_stamp, json_stamp):
        list_stamp = json.loads(json_stamp)
        int_flag = 0
        for item in list_stamp:
            if page_time_stamp < item[0]:
                break
            if page_time_stamp > item[0] and page_time_stamp < item[1]:
                int_flag = 1
                break
        return int_flag
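
# Example: in_stamp().evaluate(150000, "[[100000, 200000]]") -> 1 (strictly
# inside the interval); boundary values and out-of-range stamps return 0.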
def getConfidence(rule_id):
    if rule_id == 0:
        return 30
    elif rule_id >= 1 and rule_id < 30:
        return 20
    else:
        return 10
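
# Confidence by rule id: getConfidence(0) -> 30, getConfidence(5) -> 20,
# getConfidence(30) -> 10.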
@annotate('string,bigint -> bigint,bigint,bigint,bigint,bigint')
class f_split_group_single(BaseUDTF):
    '''
    Expand each group into multiple pair records
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_set_docid, rule_id):
        list_group = json.loads(json_set_docid)
        for item in list_group:
            if len(item) > 100:
                # very large groups: pair each doc only with the group head
                item.sort(key=lambda x: x["docid"], reverse=True)
                index_i = 0
                for index_j in range(1, len(item)):
                    if item[index_i]["docid"] != item[index_j]["docid"]:
                        self.forward(item[index_i]["docid"], item[index_j]["docid"], item[index_i]["extract_count"], item[index_j]["extract_count"], getConfidence(rule_id))
            else:
                # smaller groups: emit the full Cartesian product of distinct docids
                for index_i in range(len(item)):
                    for index_j in range(len(item)):
                        if index_i != index_j and item[index_i]["docid"] != item[index_j]["docid"]:
                            self.forward(item[index_i]["docid"], item[index_j]["docid"], item[index_i]["extract_count"], item[index_j]["extract_count"], getConfidence(rule_id))
@annotate('bigint,string->string')
class group_document(BaseUDAF):
    '''
    Project code, winning bidder, len(project code) > 7, winning bidder <> ""
    '''

    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, id, json_set_docid):
        buffer[0].append({"id": id, "json_set_docid": json.loads(json_set_docid)})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        return json.dumps(buffer[0])
@annotate('bigint,string,bigint,string -> bigint,bigint,string')
class decare_document(BaseUDTF):
    '''
    Expand multiple groups into multiple records
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, group_id1, json_list_doc1, group_id2, json_list_doc2):
        # only take pairs with group_id1 >= group_id2 (cuts nearly half the data)
        if group_id1 >= group_id2:
            list_doc1 = json.loads(json_list_doc1)
            list_doc2 = json.loads(json_list_doc2)
            for _doc1 in list_doc1:
                for _doc2 in list_doc2:
                    # skip comparing a duplicate group with itself
                    if _doc1["id"] != _doc2["id"]:
                        # check whether the two groups share any docid
                        _set1 = set()
                        for _item1 in _doc1["json_set_docid"]:
                            _set1.add(_item1["docid"])
                        _set2 = set()
                        for _item2 in _doc2["json_set_docid"]:
                            _set2.add(_item2["docid"])
                        if len(_set1 & _set2) > 0:
                            new_json_set_docid = _doc1["json_set_docid"]
                            for _item2 in _doc2["json_set_docid"]:
                                if _item2["docid"] not in _set1:
                                    new_json_set_docid.append(_item2)
                            self.forward(_doc1["id"], _doc2["id"], json.dumps(new_json_set_docid))
def getBestDocid(list_pair):
    list_pair.sort(key=lambda x: x[3], reverse=True)
    _max_count = max(list_pair[0][3], list_pair[0][1])
    set_candidate = set()
    if list_pair[0][1] == _max_count:
        set_candidate.add(list_pair[0][0])
    for item in list_pair:
        if item[3] == _max_count:
            set_candidate.add(item[2])
        else:
            break
    list_candidate = list(set_candidate)
    list_candidate.sort(key=lambda x: x)
    return list_candidate[0]
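
# Worked example: pairs are (docid1, extract_count1, docid2, extract_count2).
# After sorting by extract_count2 descending, the max count is 7, so docids 2
# and 3 are candidates and the smallest wins:
#   getBestDocid([[1, 5, 2, 7], [1, 5, 3, 7], [1, 5, 4, 6]])  # -> 2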
@annotate('bigint,bigint,bigint,bigint->string')
class choose_document(BaseUDAF):
    '''
    Project code, winning bidder, len(project code) > 7, winning bidder <> ""
    '''

    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, docid1, extract_count1, docid2, extract_count2):
        buffer[0].append([docid1, extract_count1, docid2, extract_count2])

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_pair = buffer[0]
        _set = set()
        for item in buffer[0]:
            _set.add(str(item[2]))
        list_dumplicate = list(_set)
        best_docid = getBestDocid(list_pair)
        if best_docid == list_pair[0][0]:
            save_flag = 1
        else:
            save_flag = 0
        return json.dumps({"save_flag": save_flag, "dumplicates": list_dumplicate})
@annotate('string -> bigint,string')
class f_get_choose_document(BaseUDTF):
    '''
    Expand multiple groups into multiple records
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_choose):
        if json_choose is None:
            self.forward(1, None)
        else:
            _choose = json.loads(json_choose)
            self.forward(_choose["save_flag"], ",".join(_choose["dumplicates"]))
@annotate('bigint,bigint,bigint,bigint->string')
class group_document_bestFirst(BaseUDAF):
    '''
    Put the best record of each group first
    '''

    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, docid1, extract_count1, docid2, extract_count2):
        buffer[0].append([docid1, extract_count1, docid2, extract_count2])

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_pair = buffer[0]
        _set = set()
        for item in buffer[0]:
            _set.add(item[2])
        _set.add(list_pair[0][0])
        best_docid = getBestDocid(list_pair)
        _set.remove(best_docid)
        list_dumplicate = list(_set)
        list_dumplicate.sort(key=lambda x: x)
        list_dumplicate.insert(0, best_docid)
        list_dumplicate_str = []
        for item in list_dumplicate:
            list_dumplicate_str.append(str(item))
        return ",".join(list_dumplicate_str)
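
# Example: with pairs [[7, 2, 9, 5], [7, 2, 8, 5]] the best docid is 8 (max
# extract_count 5, smallest candidate), so terminate() returns "8,7,9" —
# the best docid first, then the remaining docids ascending.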
@annotate('string -> bigint,string')
class f_get_best_dumplicates(BaseUDTF):
    '''
    Get the best record of each group together with its duplicates
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, list_dumplicate_str):
        if list_dumplicate_str is None:
            pass
        else:
            list_dumplicate = list_dumplicate_str.split(",")
            if len(list_dumplicate) > 0:
                self.forward(int(list_dumplicate[0]), ",".join(list_dumplicate[1:]))
            else:
                pass
@annotate('bigint,bigint->string')
class bridge2group(BaseUDAF):
    '''
    Project code, winning bidder, len(project code) > 7, winning bidder <> ""
    '''

    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [set()]

    def iterate(self, buffer, docid1, docid2):
        buffer[0].add(docid1)
        buffer[0].add(docid2)

    def merge(self, buffer, pbuffer):
        buffer[0] |= pbuffer[0]

    def terminate(self, buffer):
        list_pair = list(buffer[0])
        list_pair.sort(key=lambda x: x, reverse=True)
        return json.dumps(list_pair)
@annotate('string -> bigint,bigint')
class group2bridge(BaseUDTF):
    '''
    Expand multiple groups into multiple records
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_list_docid):
        list_docid = json.loads(json_list_docid)
        for _docid in list_docid:
            self.forward(list_docid[-1], _docid)
@annotate('bigint,bigint,string -> bigint')
class f_get_dump_docid(BaseUDTF):
    '''
    Expand multiple groups into multiple records
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, docid, save_flag, dumplicates):
        if save_flag == 0:
            self.forward(docid)
            if dumplicates is not None:
                list_docid = dumplicates.split(",")
                if len(list_docid) > 0:
                    for _docid in list_docid[1:]:
                        self.forward(int(_docid))
        else:
            if dumplicates is not None:
                list_docid = dumplicates.split(",")
                if len(list_docid) > 0:
                    for _docid in list_docid:
                        self.forward(int(_docid))
@annotate('string -> bigint,bigint')
class f_get_docid(BaseUDTF):
    '''
    Expand multiple groups into multiple records
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_set_docid):
        team_id = 0
        if json_set_docid is not None:
            list_docses = json.loads(json_set_docid)
            for list_docs in list_docses:
                team_id += 1
                for item in list_docs:
                    self.forward(team_id, item["docid"])
@annotate("string->bigint")
class get_count_dump(object):

    def __init__(self):
        import logging
        import re
        global logging,re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, title):
        _count = 0
        if title is not None:
            _count = len(title.split(","))
        return _count
def getSet(list_dict, key):
    _set = set()
    for item in list_dict:
        if key in item:
            if item[key] != '' and item[key] is not None:
                if re.search(r"^\d[\d\.]*$", item[key]) is not None:
                    _set.add(str(float(item[key])))
                else:
                    _set.add(str(item[key]))
    return _set
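
# Example: numeric-looking strings are normalized through float() before being
# added, so "100" and "100.0" collapse to one value:
#   getSet([{"m": "100"}, {"m": "100.0"}, {"m": ""}], "m")  # -> {"100.0"}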
def getDiffIndex(list_dict, key, confidence=100):
    _set = set()
    for _i in range(len(list_dict)):
        item = list_dict[_i]
        if item["confidence"] >= confidence:
            continue
        if key in item:
            if item[key] != '' and item[key] is not None:
                if re.search(r"^\d[\d\.]*$", item[key]) is not None:
                    _set.add(str(float(item[key])))
                else:
                    _set.add(str(item[key]))
        if len(_set) > 1:
            return _i
    return len(list_dict)
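
# Example: returns the index of the first item whose value diverges from the
# ones before it (items at or above the confidence cutoff are skipped):
#   getDiffIndex([{"m": "1", "confidence": 10},
#                 {"m": "1.0", "confidence": 10},
#                 {"m": "2", "confidence": 10}], "m")  # -> 2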
@annotate('bigint,string -> bigint,bigint')
class f_getGroup_dumpFinal(BaseUDTF):
    '''
    Recover groups from the final result
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, docid, dumplicates):
        self.forward(int(docid), int(docid))
        if dumplicates is not None:
            list_docids = dumplicates.split(",")
            for _docid in list_docids:
                self.forward(int(docid), int(_docid))
@annotate('bigint,bigint,string,string,string,string,bigint,bigint,bigint->string')
class f_redump_limit_num(BaseUDAF):
    '''
    Re-check after dedup merging: when a group has more than 5 members,
    doctitle, tenderee, win_tenderer and bidding_budget must each take a single
    value within the group; when a group has 5 or fewer members, tenderee,
    win_tenderer and bidding_budget must each take a single value.
    '''

    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer, main_docid, docid, doctitle, set_limit_column2, set_limit_column3, set_limit_column4, extract_count1, extract_count2, confidence):
        buffer[0].append({"main_docid": main_docid, "docid": docid, "doctitle": doctitle, "set_limit_column2": set_limit_column2,
                          "set_limit_column3": set_limit_column3, "set_limit_column4": set_limit_column4, "extract_count1": extract_count1,
                          "extract_count2": extract_count2, "confidence": confidence})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_group = []
        the_group = buffer[0]
        the_group.sort(key=lambda x: x["confidence"], reverse=True)
        if len(the_group) > 5:
            keys = ["doctitle", "set_limit_column2", "set_limit_column3", "set_limit_column4"]
        else:
            keys = ["set_limit_column2", "set_limit_column3", "set_limit_column4"]
        final_group = []
        # per-key confidence cutoffs
        list_key_index = []
        for _k in keys:
            if _k == "doctitle":
                list_key_index.append(getDiffIndex(the_group, _k, confidence=30))
            else:
                list_key_index.append(getDiffIndex(the_group, _k))
        _index = min(list_key_index)
        if _index > 1:
            main_docid = the_group[0]["main_docid"]
            for item in the_group[:_index]:
                if item["docid"] != main_docid:
                    final_group.append({"docid1": main_docid, "docid2": item["docid"], "extract_count1": item["extract_count1"], "extract_count2": item["extract_count2"], "confidence": item["confidence"]})
        # stay = True
        # for _key in keys:
        #     if len(getSet(the_group,_key))>1:
        #         stay = False
        #         break
        #
        # if stay:
        #     main_docid = the_group[0]["main_docid"]
        #     for item in the_group:
        #         if item["docid"]!=main_docid:
        #             final_group.append({"docid1":main_docid,"docid2":item["docid"],"extract_count1":item["extract_count1"],"extract_count2":item["extract_count2"]})
        return json.dumps(final_group)
@annotate('string -> bigint,bigint,bigint,bigint,bigint')
class f_get_dumpFinal_checked(BaseUDTF):
    '''
    Recover groups from the final result
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, list_group):
        if list_group is not None:
            final_group = json.loads(list_group)
            for _group in final_group:
                self.forward(_group["docid1"], _group["docid2"], _group["extract_count1"], _group["extract_count2"], _group["confidence"])
@annotate('string -> bigint')
class f_getDumplicateDocids(BaseUDTF):
    '''
    Recover groups from the final result
    '''

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, dumplicates):
        # guard against NULL input, as the sibling UDTFs above do
        if dumplicates is None:
            return
        list_docids = dumplicates.split(",")
        for _d in list_docids:
            self.forward(int(_d))