# documentDumplicate.py

#coding:UTF8
from odps.udf import annotate
from odps.udf import BaseUDTF
from odps.udf import BaseUDAF

@annotate('string,string -> string,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,bigint')
class f_decode_extract(BaseUDTF):

    def __init__(self):
        import logging
        import json
        import time,re
        global json,logging,time,re
        self.time_pattern = r"\d{4}-\d{2}-\d{2}.*"
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    def process(self, extractjson,otherjson):
        if extractjson is not None:
            _extract = json.loads(extractjson)
        else:
            _extract = {}
        if otherjson is not None:
            _other = json.loads(otherjson)
        else:
            _other = {}
        project_code = ""
        project_name = ""
        tenderee = ""
        agency = ""
        win_tenderer = ""
        bidding_budget = ""
        win_bid_price = ""
        fingerprint = ""
        page_time_stamp = 0
        docchannel = 0
        extract_count = 0
        page_time = _other.get("pageTime",time.strftime('%Y-%m-%d',time.localtime()))
        doctitle = _other.get("doctitle","")
        doctitle_refine = re.sub(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|合同', '', doctitle)
        area = _other.get("area","")
        province = _other.get("province","")
        city = _other.get("city","")
        district = _other.get("district","")
        web_source_no = _other.get("webSourceNo","")
        docchannel = _other.get("docchannel",0)
        if re.search(self.time_pattern,page_time) is not None:
            # "%Y-%m-%d" consumes exactly 10 characters; slicing 11 would leave
            # unconverted data and make strptime raise on longer page_time values
            timeArray = time.strptime(page_time[:10], "%Y-%m-%d")
            page_time_stamp = int(time.mktime(timeArray))
        list_code = _extract.get("code",[])
        if len(list_code)>0:
            project_code = list_code[0]
        project_name = _extract.get("name","")
        fingerprint = _extract.get("fingerprint","")
        dict_pack = _extract.get("prem",{})
        logging.info(dict_pack)
        for _key in dict_pack.keys():
            if dict_pack[_key]["tendereeMoney"]!='' and float(dict_pack[_key]["tendereeMoney"])>0:
                extract_count += 1
                if bidding_budget=="":
                    bidding_budget = str(float(dict_pack[_key]["tendereeMoney"]))
            for _role in dict_pack[_key]["roleList"]:
                extract_count += 1
                if _role[2]!='' and float(_role[2])>0:
                    extract_count += 1
                if _role[0]=="tenderee":
                    tenderee = _role[1]
                if _role[0]=="win_tenderer":
                    if win_tenderer=="":
                        win_tenderer = _role[1]
                    if _role[2]!='' and float(_role[2])>0:
                        extract_count += 1
                        if win_bid_price=="":
                            win_bid_price = str(float(_role[2]))
                if _role[0]=="agency":
                    agency = _role[1]
        if project_code!="":
            extract_count += 1
        if project_name!="":
            extract_count += 1
        logging.info(page_time+doctitle+doctitle_refine+area+province+city+
                     district+web_source_no+project_code+project_name+tenderee+agency+win_tenderer+bidding_budget+win_bid_price)
        self.forward(page_time,page_time_stamp,docchannel,doctitle,doctitle_refine,area,province,city,
                     district,web_source_no,fingerprint,project_code,project_name,tenderee,agency,win_tenderer,bidding_budget,win_bid_price,extract_count)
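
# A minimal sketch (hypothetical field values, assuming a local stub for odps.udf) of the
# JSON shapes f_decode_extract consumes; the keys mirror the .get() calls above:
#   _extract = {"code": ["HNZB-2020-001"], "name": "...", "fingerprint": "...",
#               "prem": {"Project": {"tendereeMoney": "100.0",
#                                    "roleList": [["tenderee", "某采购单位", ""],
#                                                 ["win_tenderer", "某中标公司", "98.5"]]}}}
#   _other = {"pageTime": "2020-01-01", "doctitle": "...", "webSourceNo": "..."}
#   f_decode_extract().process(json.dumps(_extract), json.dumps(_other))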

@annotate("string->bigint")
class f_get_extractCount(object):

    def __init__(self):
        import time
        global time
        import logging
        import json
        import re
        global json,logging,re
        self.time_pattern = r"\d{4}-\d{2}-\d{2}.*"
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, extractjson):
        if extractjson is not None:
            _extract = json.loads(extractjson)
        else:
            _extract = {}
        dict_pack = _extract.get("prem",{})
        extract_count = 0
        list_code = _extract.get("code",[])
        if len(list_code)>0:
            project_code = list_code[0]
        else:
            project_code = ""
        project_name = _extract.get("name","")
        bidding_budget = ""
        win_tenderer = ""
        win_bid_price = ""
        for _key in dict_pack.keys():
            if dict_pack[_key]["tendereeMoney"]!='' and float(dict_pack[_key]["tendereeMoney"])>0:
                extract_count += 1
                if bidding_budget=="":
                    bidding_budget = str(float(dict_pack[_key]["tendereeMoney"]))
            for _role in dict_pack[_key]["roleList"]:
                extract_count += 1
                if _role[2]!='' and float(_role[2])>0:
                    extract_count += 1
                if _role[0]=="tenderee":
                    tenderee = _role[1]
                if _role[0]=="win_tenderer":
                    if win_tenderer=="":
                        win_tenderer = _role[1]
                    if _role[2]!='' and float(_role[2])>0:
                        extract_count += 1
                        if win_bid_price=="":
                            win_bid_price = str(float(_role[2]))
                if _role[0]=="agency":
                    agency = _role[1]
        if project_code!="":
            extract_count += 1
        if project_name!="":
            extract_count += 1
        return extract_count
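
# Quick check of the counting rule (hypothetical input): a project code and a name alone
# yield a count of 2, since the empty "prem" pack contributes nothing:
#   f_get_extractCount().evaluate(json.dumps({"code": ["A"], "name": "B", "prem": {}})) == 2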

@annotate('string,string,string,string,string -> string,string,string,bigint')
class f_decode_sub_docs_json(BaseUDTF):

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, project_code,project_name,tenderee,agency,sub_docs_json):
        columns = {"win_tenderer":"","bidding_budget":"","win_bid_price":""}
        extract_count = 0
        if project_code is not None and project_code!="":
            extract_count += 1
        if project_name is not None and project_name!="":
            extract_count += 1
        if tenderee is not None and tenderee!="":
            extract_count += 1
        if agency is not None and agency!="":
            extract_count += 1
        if sub_docs_json is not None:
            for sub_docs in json.loads(sub_docs_json):
                for _key_sub_docs in sub_docs.keys():
                    extract_count += 1
                    if _key_sub_docs in columns:
                        if columns[_key_sub_docs]=="" and str(sub_docs[_key_sub_docs]) not in ["","0"]:
                            if _key_sub_docs in ["bidding_budget","win_bid_price"]:
                                if float(sub_docs[_key_sub_docs])>0:
                                    columns[_key_sub_docs] = str(float(sub_docs[_key_sub_docs]))
                            else:
                                columns[_key_sub_docs] = str(sub_docs[_key_sub_docs])
        self.forward(columns["win_tenderer"],columns["bidding_budget"],columns["win_bid_price"],extract_count)
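
# Sketch of the expected sub_docs_json shape (hypothetical values): a JSON list of dicts,
# where every key of every sub-doc counts toward extract_count but only the three columns
# above are kept, e.g.
#   sub_docs_json = '[{"win_tenderer": "某公司", "win_bid_price": "98.5"}]'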

@annotate("string->bigint")
class totimestamp(object):

    def __init__(self):
        import time
        global time
        import logging
        import json
        import re
        global json,logging,re
        self.time_pattern = r"\d{4}-\d{2}-\d{2}.*"
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, str_time):
        try:
            logging.info(str_time)
            if str_time is not None and re.search(self.time_pattern,str_time) is not None:
                timeArray = time.strptime(str_time[:10], "%Y-%m-%d")
                timeStamp = int(time.mktime(timeArray))
                return timeStamp
            else:
                return 0
        except Exception as e:
            return 0
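
# Example (result depends on the local time zone): totimestamp().evaluate("2020-01-01 00:00")
# returns the epoch seconds of 2020-01-01 local midnight; malformed or NULL input returns 0.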

@annotate("string->string")
class refind_name(object):

    def __init__(self):
        import logging
        import re
        global logging,re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, title):
        if title is not None:
            return re.sub(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|\[|\]|【|】', '', title)
        return ""
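
# Example: refind_name().evaluate("某单位办公楼工程招标公告") == "某单位办公楼"
# (the alternation above strips keywords such as 工程/招标/公告 plus bracket characters).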

@annotate('bigint,bigint,bigint,string,bigint,string->string')
class f_set_docid(BaseUDAF):
    '''
    Grouping criteria: project code, winning bidder, len(project code)>7, winning bidder <> ""
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer,docid, page_time_stamp,extract_count,defind_column,defind_count,tenderee):
        buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"extract_count":extract_count,
                          "defind_column":defind_column,"defind_count":defind_count,"tenderee":tenderee})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_docs = buffer[0]
        list_docs.sort(key=lambda x:x["page_time_stamp"])
        list_group = []
        _begin = 0
        defind_count = 0
        if len(list_docs)>0:
            defind_count = list_docs[0]["defind_count"]
        for i in range(len(list_docs)-1):
            # split the time-ordered docs wherever the gap exceeds two days
            if abs(list_docs[i]["page_time_stamp"]-list_docs[i+1]["page_time_stamp"])<=86400*2:
                continue
            else:
                _group = []
                _set_column = set()
                _set_tenderee = set()
                for j in range(_begin,i+1):
                    if list_docs[j]["tenderee"] is not None and list_docs[j]["tenderee"]!="":
                        _set_tenderee.add(list_docs[j]["tenderee"])
                    _set_column.add(list_docs[j]["defind_column"])
                    _group.append({"docid":list_docs[j]["docid"],"extract_count":list_docs[j]["extract_count"]})
                # three or more docs naming more than one tenderee are not duplicates
                if len(_group)>=3 and len(_set_tenderee)>1:
                    pass
                else:
                    if len(_group)>1:
                        if defind_count==2:
                            if len(_set_column)>=2:
                                list_group.append(_group)
                        elif defind_count==1:
                            if len(_set_column)==1:
                                list_group.append(_group)
                        elif defind_count==0:
                            list_group.append(_group)
                _begin = i+1
        # flush the trailing time window with the same rules
        if len(list_docs)>1:
            _set_column = set()
            _set_tenderee = set()
            _group = []
            for j in range(_begin,len(list_docs)):
                if list_docs[j]["tenderee"] is not None and list_docs[j]["tenderee"]!="":
                    _set_tenderee.add(list_docs[j]["tenderee"])
                _set_column.add(list_docs[j]["defind_column"])
                _group.append({"docid":list_docs[j]["docid"],"extract_count":list_docs[j]["extract_count"]})
            if len(_group)>=3 and len(_set_tenderee)>1:
                pass
            else:
                if len(_group)>1:
                    if defind_count==2:
                        if len(_set_column)>=2:
                            list_group.append(_group)
                    elif defind_count==1:
                        if len(_set_column)==1:
                            list_group.append(_group)
                    elif defind_count==0:
                        list_group.append(_group)
        return json.dumps(list_group)
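
# A minimal sketch (hypothetical values) of the rule above: with defind_count==1 a group is
# kept only when all docs share a single defind_column value, e.g. two docs posted a minute
# apart, both with defind_column "A", terminate to
#   '[[{"docid": 1, "extract_count": 5}, {"docid": 2, "extract_count": 3}]]'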

def isEmpty(_str):
    if _str is None or _str=="":
        return True
    return False

@annotate('bigint,bigint,bigint,string,string,string,string,string,string,string,string->string')
class f_set_docid_binaryChart(BaseUDAF):
    '''
    Pair documents that carry none of the key fields (code/budget/winner/price/agency)
    with one matching non-empty document from a different web source in the same time window.
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer,docid, page_time_stamp,extract_count,project_code,project_name,tenderee,bidding_budget,win_tenderer,win_bid_price,agency,web_source_no):
        buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"extract_count":extract_count,
                          "project_code":project_code,"project_name":project_name,"tenderee":tenderee,
                          "bidding_budget":bidding_budget,"win_tenderer":win_tenderer,"win_bid_price":win_bid_price,
                          "agency":agency,"web_source_no":web_source_no})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_docs = buffer[0]
        list_timeGroups = split_with_time(list_docs,"page_time_stamp",86400*2)
        list_group = []
        empty_key = ["project_code","bidding_budget","win_tenderer","win_bid_price","agency"]
        for _timeGroups in list_timeGroups:
            list_empty = []
            list_notEmpty = []
            for _item in _timeGroups:
                empty_flag = True
                for _key in empty_key:
                    if not isEmpty(_item[_key]):
                        empty_flag = False
                        break
                if empty_flag:
                    list_empty.append(_item)
                else:
                    list_notEmpty.append(_item)
            for _e in list_empty:
                _group = [{"docid":_e["docid"],"extract_count":_e["extract_count"]}]
                _e_tenderee = _e["tenderee"]
                for _ne in list_notEmpty:
                    if "set_webSource" not in _ne:
                        _ne["set_webSource"] = set()
                        _ne["set_webSource"].add(_ne["web_source_no"])
                    _suit = False
                    if not isEmpty(_e_tenderee) and _e_tenderee==_ne["tenderee"]:
                        _suit = True
                    elif isEmpty(_e_tenderee):
                        _suit = True
                    if _suit:
                        if _e["web_source_no"] not in _ne["set_webSource"]:
                            _ne["set_webSource"].add(_e["web_source_no"])
                            _group.append({"docid":_ne["docid"],"extract_count":_ne["extract_count"]})
                            break
                if len(_group)>1:
                    list_group.append(_group)
        return json.dumps(list_group)

def split_with_time(list_dict,sort_key,timedelta=86400*2):
    if len(list_dict)>0:
        if sort_key in list_dict[0]:
            list_dict.sort(key=lambda x:x[sort_key])
            list_group = []
            _begin = 0
            for i in range(len(list_dict)-1):
                if abs(list_dict[i][sort_key]-list_dict[i+1][sort_key])<timedelta:
                    continue
                else:
                    _group = []
                    for j in range(_begin,i+1):
                        _group.append(list_dict[j])
                    if len(_group)>1:
                        list_group.append(_group)
                    _begin = i + 1
            if len(list_dict)>1:
                _group = []
                for j in range(_begin,len(list_dict)):
                    _group.append(list_dict[j])
                if len(_group)>1:
                    list_group.append(_group)
            return list_group
    return [list_dict]
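
# Doctest-style check: records 1s apart stay together, one 3 days away is cut off, and
# singleton groups are dropped:
#   split_with_time([{"t": 0}, {"t": 1}, {"t": 86400*3}], "t") == [[{"t": 0}, {"t": 1}]]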

@annotate('bigint,bigint,bigint,string,string,string,string,string->string')
class f_set_docid_limitNum_contain(BaseUDAF):
    '''
    Grouping criteria: project code, winning bidder, len(project code)>7, winning bidder <> "",
    fewer than two distinct non-empty tenderees after merging, and identical non-empty amounts
    for the same announcement type after merging.
    '''
    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer,docid,page_time_stamp,extract_count,set_limit_column1,set_limit_column2,set_limit_column3,set_limit_column4,contain_column):
        buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"extract_count":extract_count,"set_limit_column1":set_limit_column1,
                          "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,
                          "contain_column":contain_column})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_split = split_with_time(buffer[0],"page_time_stamp")
        list_group = []
        for _split in list_split:
            flag = True
            keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"]
            for _key in keys:
                logging.info(_key+str(getSet(_split,_key)))
                if len(getSet(_split,_key))>1:
                    flag = False
                    break
            MAX_CONTAIN_COLUMN = None
            # check whether the contain_columns inside the group nest into one another
            if flag:
                for _d in _split:
                    contain_column = _d["contain_column"]
                    if contain_column is not None and contain_column!="":
                        if MAX_CONTAIN_COLUMN is None:
                            MAX_CONTAIN_COLUMN = contain_column
                        else:
                            if len(MAX_CONTAIN_COLUMN)<len(contain_column):
                                if contain_column.find(MAX_CONTAIN_COLUMN)==-1:
                                    flag = False
                                    break
                                MAX_CONTAIN_COLUMN = contain_column
                            else:
                                if MAX_CONTAIN_COLUMN.find(contain_column)==-1:
                                    flag = False
                                    break
            if flag:
                if len(_split)>1:
                    _group = []
                    for _item in _split:
                        _group.append({"docid":_item["docid"],"extract_count":_item["extract_count"]})
                    list_group.append(_group)
        return json.dumps(list_group)
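
# Sketch of the contain_column chain check (hypothetical values): "12,34" contains "12",
# so a group with contain_columns ["12", "12,34"] passes, while ["12", "34"] fails.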

@annotate('bigint->string')
class f_stamp_squence(BaseUDAF):
    '''
    Merge the collected page_time stamps into windows padded by two days on each side.
    '''
    def __init__(self):
        import json
        global json
        import logging
        global logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [set()]

    def iterate(self, buffer,page_time_stamp):
        buffer[0].add(page_time_stamp)

    def merge(self, buffer, pbuffer):
        buffer[0] |= pbuffer[0]

    def terminate(self, buffer):
        if 0 in buffer[0]:
            buffer[0].remove(0)
        list_stamp = list(buffer[0])
        list_stamp.sort(key=lambda x:x)
        list_stamp_final = []
        _begin = 0
        _time_decase = 86400*2
        logging.info(str(list_stamp))
        for _index in range(len(list_stamp)-1):
            if list_stamp[_index+1]-list_stamp[_index]<_time_decase:
                continue
            else:
                list_stamp_final.append([list_stamp[_begin]-_time_decase,list_stamp[_index]+_time_decase])
                _begin = _index+1
        if len(list_stamp)>0:
            list_stamp_final.append([list_stamp[_begin]-_time_decase,list_stamp[-1]+_time_decase])
        return json.dumps(list_stamp_final)
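
# Example: two stamps more than two days apart yield two widened windows,
#   iterating 100 and 200000 terminates to '[[-172700, 172900], [27200, 372800]]'
# (each window is padded by 86400*2 seconds on both sides).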

@annotate("bigint,string->bigint")
class in_stamp(object):

    def __init__(self):
        import logging
        import re
        import json
        global logging,re,json
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, page_time_stamp,json_stamp):
        list_stamp = json.loads(json_stamp)
        int_flag = 0
        for item in list_stamp:
            if page_time_stamp<item[0]:
                break
            if page_time_stamp>item[0] and page_time_stamp<item[1]:
                int_flag = 1
                break
        return int_flag
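
# Example: in_stamp().evaluate(150, "[[100, 200]]") == 1; note that the bounds themselves
# are exclusive, so in_stamp().evaluate(100, "[[100, 200]]") == 0.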

def getConfidence(rule_id):
    if rule_id==0:
        return 30
    elif rule_id>=1 and rule_id<30:
        return 20
    else:
        return 10

@annotate('string,bigint -> bigint,bigint,bigint,bigint,bigint')
class f_split_group_single(BaseUDTF):
    '''
    Split each group into individual duplicate-pair records.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_set_docid,rule_id):
        list_group = json.loads(json_set_docid)
        for item in list_group:
            if len(item)>100:
                item.sort(key=lambda x:x["docid"],reverse=True)
                index_i = 0
                for index_j in range(1,len(item)):
                    if item[index_i]["docid"]!=item[index_j]["docid"]:
                        self.forward(item[index_i]["docid"],item[index_j]["docid"],item[index_i]["extract_count"],item[index_j]["extract_count"],getConfidence(rule_id))
            else:
                for index_i in range(len(item)):
                    for index_j in range(len(item)):
                        if index_i!=index_j and item[index_i]["docid"]!=item[index_j]["docid"]:
                            self.forward(item[index_i]["docid"],item[index_j]["docid"],item[index_i]["extract_count"],item[index_j]["extract_count"],getConfidence(rule_id))
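
# For oversized groups (>100 docs) the first branch above pairs only the largest docid
# with every other doc (a star), avoiding the full O(n^2) Cartesian pairing used for
# small groups.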

@annotate('bigint,string->string')
class group_document(BaseUDAF):
    '''
    Collect the (id, json_set_docid) records of a group into one JSON list.
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer,id,json_set_docid):
        buffer[0].append({"id":id,"json_set_docid":json.loads(json_set_docid)})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        return json.dumps(buffer[0])

@annotate('bigint,string,bigint,string -> bigint,bigint,string')
class decare_document(BaseUDTF):
    '''
    Cartesian product over grouped documents; merge groups that share docids.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,group_id1, json_list_doc1,group_id2,json_list_doc2):
        # only pair group_id1 >= group_id2 (the y=x diagonal), cutting the work nearly in half
        if group_id1>=group_id2:
            list_doc1 = json.loads(json_list_doc1)
            list_doc2 = json.loads(json_list_doc2)
            for _doc1 in list_doc1:
                for _doc2 in list_doc2:
                    # skip comparison within the same duplicate group
                    if _doc1["id"]!=_doc2["id"]:
                        # check whether the two groups share any docid
                        _set1 = set()
                        for _item1 in _doc1["json_set_docid"]:
                            _set1.add(_item1["docid"])
                        _set2 = set()
                        for _item2 in _doc2["json_set_docid"]:
                            _set2.add(_item2["docid"])
                        if len(_set1&_set2)>0:
                            new_json_set_docid = _doc1["json_set_docid"]
                            for _item2 in _doc2["json_set_docid"]:
                                if _item2["docid"] not in _set1:
                                    new_json_set_docid.append(_item2)
                            self.forward(_doc1["id"],_doc2["id"],json.dumps(new_json_set_docid))

def getBestDocid(list_pair):
    # list_pair rows are [docid1, extract_count1, docid2, extract_count2]
    list_pair.sort(key=lambda x:x[3],reverse=True)
    _max_count = max(list_pair[0][3],list_pair[0][1])
    set_candidate = set()
    if list_pair[0][1]==_max_count:
        set_candidate.add(list_pair[0][0])
    for item in list_pair:
        if item[3]==_max_count:
            set_candidate.add(item[2])
        else:
            break
    # among the docids with the highest extract_count, take the smallest
    list_candidate = list(set_candidate)
    list_candidate.sort(key=lambda x:x)
    return list_candidate[0]
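
# Example: the docid with the highest extract_count wins, smallest docid breaks ties:
#   getBestDocid([[5, 3, 9, 4], [5, 3, 7, 4]]) == 7
#   getBestDocid([[5, 3, 9, 4], [5, 3, 7, 2]]) == 9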

@annotate('bigint,bigint,bigint,bigint->string')
class choose_document(BaseUDAF):
    '''
    Decide whether the current doc (docid1 of every pair) is the best of its duplicate
    pairs: save_flag=1 keeps it, and dumplicates lists the docids it was paired with.
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer,docid1,extract_count1,docid2,extract_count2):
        buffer[0].append([docid1,extract_count1,docid2,extract_count2])

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_pair = buffer[0]
        _set = set()
        for item in buffer[0]:
            _set.add(str(item[2]))
        list_dumplicate = list(_set)
        best_docid = getBestDocid(list_pair)
        if best_docid==list_pair[0][0]:
            save_flag = 1
        else:
            save_flag = 0
        return json.dumps({"save_flag":save_flag,"dumplicates":list_dumplicate})

@annotate('string -> bigint,string')
class f_get_choose_document(BaseUDTF):
    '''
    Unpack the choose_document JSON into (save_flag, comma-joined dumplicates).
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,json_choose):
        if json_choose is None:
            self.forward(1,None)
        else:
            _choose = json.loads(json_choose)
            self.forward(_choose["save_flag"],",".join(_choose["dumplicates"]))

@annotate('bigint,bigint,bigint,bigint->string')
class group_document_bestFirst(BaseUDAF):
    '''
    Put the best record of the group first.
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer,docid1,extract_count1,docid2,extract_count2):
        buffer[0].append([docid1,extract_count1,docid2,extract_count2])

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_pair = buffer[0]
        _set = set()
        for item in buffer[0]:
            _set.add(item[2])
        _set.add(list_pair[0][0])
        best_docid = getBestDocid(list_pair)
        _set.remove(best_docid)
        list_dumplicate = list(_set)
        list_dumplicate.sort(key=lambda x:x)
        list_dumplicate.insert(0,best_docid)
        list_dumplicate_str = []
        for item in list_dumplicate:
            list_dumplicate_str.append(str(item))
        return ",".join(list_dumplicate_str)
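
# Example: pairs [[1, 5, 2, 3], [1, 5, 3, 2]] terminate to "1,2,3" -- docid 1 carries the
# highest extract_count (5), so it moves to the front and the rest are sorted ascending.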

@annotate('string -> bigint,string')
class f_get_best_dumplicates(BaseUDTF):
    '''
    Get the best record of each group together with its duplicate records.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,list_dumplicate_str):
        if list_dumplicate_str is None:
            return
        list_dumplicate = list_dumplicate_str.split(",")
        if len(list_dumplicate)>0:
            self.forward(int(list_dumplicate[0]),",".join(list_dumplicate[1:]))

@annotate('bigint,bigint->string')
class bridge2group(BaseUDAF):
    '''
    Collect both docids of every duplicate pair into one descending-sorted list.
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [set()]

    def iterate(self, buffer,docid1,docid2):
        buffer[0].add(docid1)
        buffer[0].add(docid2)

    def merge(self, buffer, pbuffer):
        buffer[0] |= pbuffer[0]

    def terminate(self, buffer):
        list_pair = list(buffer[0])
        list_pair.sort(key=lambda x:x,reverse=True)
        return json.dumps(list_pair)

@annotate('string -> bigint,bigint')
class group2bridge(BaseUDTF):
    '''
    Emit a (bridge, docid) pair for every docid in the list; the list arrives sorted
    descending, so the bridge is its last (smallest) docid.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,json_list_docid):
        list_docid = json.loads(json_list_docid)
        for _docid in list_docid:
            self.forward(list_docid[-1],_docid)

@annotate('bigint,bigint,string -> bigint')
class f_get_dump_docid(BaseUDTF):
    '''
    Emit the docids to be marked as duplicates, depending on save_flag.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,docid,save_flag,dumplicates):
        if save_flag==0:
            self.forward(docid)
            if dumplicates is not None:
                list_docid = dumplicates.split(",")
                if len(list_docid)>0:
                    for _docid in list_docid[1:]:
                        self.forward(int(_docid))
        else:
            if dumplicates is not None:
                list_docid = dumplicates.split(",")
                if len(list_docid)>0:
                    for _docid in list_docid:
                        self.forward(int(_docid))

@annotate('string -> bigint,bigint')
class f_get_docid(BaseUDTF):
    '''
    Split multiple groups into multiple records: emit (team_id, docid) per member.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,json_set_docid):
        team_id = 0
        if json_set_docid is not None:
            list_docses = json.loads(json_set_docid)
            for list_docs in list_docses:
                team_id += 1
                for item in list_docs:
                    self.forward(team_id,item["docid"])

@annotate("string->bigint")
class get_count_dump(object):

    def __init__(self):
        import logging
        import re
        global logging,re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, title):
        _count = 0
        if title is not None:
            _count = len(title.split(","))
        return _count

def getSet(list_dict,key):
    _set = set()
    for item in list_dict:
        if key in item:
            if item[key]!='' and item[key] is not None:
                if re.search(r"^\d[\d\.]*$",item[key]) is not None:
                    _set.add(str(float(item[key])))
                else:
                    _set.add(str(item[key]))
    return _set
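
# Doctest-style check (assumes re has been imported by one of the UDF classes above):
#   getSet([{"a": "1"}, {"a": "1.0"}, {"a": ""}], "a") == {"1.0"}
# numeric strings are normalized through float(), so "1" and "1.0" collapse to one value.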

def getDiffIndex(list_dict,key,confidence=100):
    # scan rows in confidence order; return the first index where the values diverge,
    # skipping rows at or above the confidence cutoff
    _set = set()
    for _i in range(len(list_dict)):
        item = list_dict[_i]
        if item["confidence"]>=confidence:
            continue
        if key in item:
            if item[key]!='' and item[key] is not None:
                if re.search(r"^\d[\d\.]*$",item[key]) is not None:
                    _set.add(str(float(item[key])))
                else:
                    _set.add(str(item[key]))
        if len(_set)>1:
            return _i
    return len(list_dict)
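
# Doctest-style check: two rows below the cutoff with differing values diverge at index 1;
# if the values never diverge, the full length is returned.
#   getDiffIndex([{"a": "x", "confidence": 10}, {"a": "y", "confidence": 10}], "a") == 1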

@annotate('bigint,string -> bigint,bigint')
class f_getGroup_dumpFinal(BaseUDTF):
    '''
    Recover the groups from the final result.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,docid,dumplicates):
        self.forward(int(docid),int(docid))
        if dumplicates is not None:
            list_docids = dumplicates.split(",")
            for _docid in list_docids:
                self.forward(int(docid),int(_docid))

@annotate('bigint,bigint,string,string,string,string,bigint,bigint,bigint->string')
class f_redump_limit_num(BaseUDAF):
    '''
    Re-check groups after dedup merging: when a group holds more than 5 records,
    doctitle, tenderee, win_tenderer and bidding_budget must each take a single value
    within the group; with 5 or fewer records only tenderee, win_tenderer and
    bidding_budget are checked.
    '''
    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer,main_docid,docid,doctitle,set_limit_column2,set_limit_column3,set_limit_column4,extract_count1,extract_count2,confidence):
        buffer[0].append({"main_docid":main_docid,"docid":docid,"doctitle":doctitle,"set_limit_column2":set_limit_column2,
                          "set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,"extract_count1":extract_count1,
                          "extract_count2":extract_count2,"confidence":confidence})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        the_group = buffer[0]
        the_group.sort(key=lambda x:x["confidence"],reverse=True)
        if len(the_group)>5:
            keys = ["doctitle","set_limit_column2","set_limit_column3","set_limit_column4"]
        else:
            keys = ["set_limit_column2","set_limit_column3","set_limit_column4"]
        final_group = []
        # per-key confidence cutoff: doctitle differences are tolerated at confidence >= 30
        list_key_index = []
        for _k in keys:
            if _k=="doctitle":
                list_key_index.append(getDiffIndex(the_group,_k,confidence=30))
            else:
                list_key_index.append(getDiffIndex(the_group,_k))
        _index = min(list_key_index)
        if _index>1:
            main_docid = the_group[0]["main_docid"]
            for item in the_group[:_index]:
                if item["docid"]!=main_docid:
                    final_group.append({"docid1":main_docid,"docid2":item["docid"],"extract_count1":item["extract_count1"],"extract_count2":item["extract_count2"],"confidence":item["confidence"]})
        # earlier all-or-nothing variant, kept for reference:
        # stay = True
        # for _key in keys:
        #     if len(getSet(the_group,_key))>1:
        #         stay = False
        #         break
        # if stay:
        #     main_docid = the_group[0]["main_docid"]
        #     for item in the_group:
        #         if item["docid"]!=main_docid:
        #             final_group.append({"docid1":main_docid,"docid2":item["docid"],"extract_count1":item["extract_count1"],"extract_count2":item["extract_count2"]})
        return json.dumps(final_group)

@annotate('string -> bigint,bigint,bigint,bigint,bigint')
class f_get_dumpFinal_checked(BaseUDTF):
    '''
    Recover the checked groups from the final result.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,list_group):
        if list_group is not None:
            final_group = json.loads(list_group)
            for _group in final_group:
                self.forward(_group["docid1"],_group["docid2"],_group["extract_count1"],_group["extract_count2"],_group["confidence"])

@annotate('string -> bigint')
class f_getDumplicateDocids(BaseUDTF):
    '''
    Emit every docid from a comma-joined dumplicates string.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,dumplicates):
        # guard against NULL input, which would otherwise crash on split()
        if dumplicates is None:
            return
        list_docids = dumplicates.split(",")
        for _d in list_docids:
            self.forward(int(_d))