documentDumplicate.py

#coding:UTF8
from odps.udf import annotate
from odps.udf import BaseUDTF
from odps.udf import BaseUDAF

@annotate('string,string -> string,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string')
class f_decode_extract(BaseUDTF):

    def __init__(self):
        import logging
        import json
        import time,re
        global json,logging,time,re
        self.time_pattern = r"\d{4}\-\d{2}\-\d{2}.*"
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        self.dict_channel = {"公告变更":51,
                             "招标公告":52,
                             "中标信息":101,
                             "招标预告":102,
                             "招标答疑":103,
                             "资审结果":105,
                             "法律法规":106,
                             "新闻资讯":107,
                             "采购意向":114,
                             "拍卖出让":115,
                             "土地矿产":116,
                             "产权交易":117,
                             "废标公告":118,
                             "候选人公示":119,
                             "合同公告":120}

    def process(self, extractjson,otherjson):
        if extractjson is not None:
            _extract = json.loads(extractjson)
        else:
            _extract = {}
        if otherjson is not None:
            _other = json.loads(otherjson)
        else:
            _other = {}
        project_code = ""
        project_name = ""
        tenderee = ""
        agency = ""
        win_tenderer = ""
        bidding_budget = ""
        win_bid_price = ""
        fingerprint = ""
        page_time_stamp = 0
        docchannel = 0
        extract_count = 0
        page_time = _other.get("pageTime",time.strftime('%Y-%m-%d',time.localtime()))
        doctitle = _other.get("doctitle","")
        doctitle_refine = re.sub(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|合同', '', doctitle)
        area = _other.get("area","")
        province = _other.get("province","")
        city = _other.get("city","")
        district = _other.get("district","")
        web_source_no = _other.get("webSourceNo","")
        time_bidclose = _extract.get("time_bidclose")
        time_bidopen = _extract.get("time_bidopen")
        time_bidstart = _extract.get("time_bidstart")
        time_commencement = _extract.get("time_commencement")
        time_completion = _extract.get("time_completion")
        time_earnest_money_end = _extract.get("time_earnestMoneyEnd")
        time_earnest_money_start = _extract.get("time_earnestMoneyStart")
        time_get_file_end = _extract.get("time_getFileEnd")
        time_get_file_start = _extract.get("time_getFileStart")
        time_publicity_end = _extract.get("time_publicityEnd")
        time_publicity_start = _extract.get("time_publicityStart")
        time_registration_end = _extract.get("time_registrationEnd")
        time_registration_start = _extract.get("time_registrationStart")
        time_release = _extract.get("time_release")
        # docchannel = _other.get("docchannel",0)
        docchannel_name = _extract.get("docchannel",{}).get("docchannel")
        doctype_name = _extract.get("docchannel",{}).get("doctype")
        if doctype_name in ["法律法规","新闻资讯","拍卖出让","土地矿产"]:
            docchannel_name = doctype_name
        docchannel = self.dict_channel.get(docchannel_name,0)
        if re.search(self.time_pattern,page_time) is not None:
            try:
                # "%Y-%m-%d" parses exactly 10 characters
                timeArray = time.strptime(page_time[:10], "%Y-%m-%d")
                page_time_stamp = int(time.mktime(timeArray))
            except Exception as e:
                pass
        list_code = _extract.get("code",[])
        if len(list_code)>0:
            project_code = list_code[0]
        project_name = _extract.get("name","")
        fingerprint = _extract.get("fingerprint","")
        dict_pack = _extract.get("prem",{})
        logging.info(dict_pack)
        for _key in dict_pack.keys():
            # guard against packs missing tendereeMoney (mirrors f_get_extractCount below)
            if dict_pack[_key].get("tendereeMoney",'')!='' and float(dict_pack[_key]["tendereeMoney"])>0:
                extract_count += 1
                if bidding_budget=="":
                    bidding_budget = str(float(dict_pack[_key]["tendereeMoney"]))
            for _role in dict_pack[_key]["roleList"]:
                if isinstance(_role,list):
                    extract_count += 1
                    if _role[2]!='' and float(_role[2])>0:
                        extract_count += 1
                    if _role[0]=="tenderee":
                        tenderee = _role[1]
                    if _role[0]=="win_tenderer":
                        if win_tenderer=="":
                            win_tenderer = _role[1]
                        if _role[2]!='' and float(_role[2])>0:
                            extract_count += 1
                            if win_bid_price=="":
                                win_bid_price = str(float(_role[2]))
                    if _role[0]=="agency":
                        agency = _role[1]
                if isinstance(_role,dict):
                    extract_count += 1
                    if str(_role["role_money"]["money"])!='' and float(_role["role_money"]["money"])>0:
                        extract_count += 1
                    if _role["role_name"]=="tenderee":
                        tenderee = _role["role_text"]
                    if _role["role_name"]=="win_tenderer":
                        if win_tenderer=="":
                            win_tenderer = _role["role_text"]
                        if str(_role["role_money"]["money"])!='' and float(_role["role_money"]["money"])>0:
                            extract_count += 1
                            if win_bid_price=="":
                                win_bid_price = str(float(_role["role_money"]["money"]))
                    if _role["role_name"]=="agency":
                        agency = _role["role_text"]
        if project_code!="":
            extract_count += 1
        if project_name!="":
            extract_count += 1
        logging.info(page_time+doctitle+doctitle_refine+area+province+city+
                     district+web_source_no+project_code+project_name+tenderee+agency+win_tenderer+bidding_budget+win_bid_price)
        self.forward(page_time,page_time_stamp,docchannel,doctitle,doctitle_refine,area,province,city,
                     district,web_source_no,fingerprint,project_code,project_name,tenderee,agency,win_tenderer,bidding_budget,win_bid_price,extract_count,
                     time_bidclose,time_bidopen,time_bidstart,time_commencement,time_completion,time_earnest_money_end,time_earnest_money_start,
                     time_get_file_end,time_get_file_start,time_publicity_end,time_publicity_start,time_registration_end,time_registration_start,time_release)
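
# Illustrative note (not part of the pipeline): the "prem" packs consumed above
# are expected to look roughly like the sketch below, with each role either a
# list [role_name, role_text, money] or a dict carrying "role_money". The field
# names are taken from the code above; the concrete values are invented.
#
#   {"Project": {"tendereeMoney": "1000000",
#                "roleList": [["tenderee", "某某单位", ""],
#                             {"role_name": "win_tenderer",
#                              "role_text": "某某公司",
#                              "role_money": {"money": "980000"}}]}}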

@annotate("string->bigint")
class f_get_extractCount(object):

    def __init__(self):
        import time
        global time
        import logging
        import json
        import re
        global json,logging,re
        self.time_pattern = r"\d{4}\-\d{2}\-\d{2}.*"
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, extractjson):
        if extractjson is not None:
            _extract = json.loads(extractjson)
        else:
            _extract = {}
        dict_pack = _extract.get("prem",{})
        extract_count = 0
        list_code = _extract.get("code",[])
        if len(list_code)>0:
            project_code = list_code[0]
        else:
            project_code = ""
        project_name = _extract.get("name","")
        bidding_budget = ""
        win_tenderer = ""
        win_bid_price = ""
        for _key in dict_pack.keys():
            if "tendereeMoney" in dict_pack[_key] and dict_pack[_key]["tendereeMoney"]!='' and float(dict_pack[_key]["tendereeMoney"])>0:
                extract_count += 1
                if bidding_budget=="":
                    bidding_budget = str(float(dict_pack[_key]["tendereeMoney"]))
            for _role in dict_pack[_key]["roleList"]:
                if isinstance(_role,list):
                    extract_count += 1
                    if _role[2]!='' and float(_role[2])>0:
                        extract_count += 1
                    if _role[0]=="tenderee":
                        tenderee = _role[1]
                    if _role[0]=="win_tenderer":
                        if win_tenderer=="":
                            win_tenderer = _role[1]
                        if _role[2]!='' and float(_role[2])>0:
                            extract_count += 1
                            if win_bid_price=="":
                                win_bid_price = str(float(_role[2]))
                    if _role[0]=="agency":
                        agency = _role[1]
                if isinstance(_role,dict):
                    extract_count += 1
                    if "role_money" in _role:
                        if str(_role["role_money"].get("money",""))!='' and float(_role["role_money"].get("money",""))>0:
                            extract_count += 1
                    if _role.get("role_name")=="tenderee":
                        tenderee = _role["role_text"]
                    if _role.get("role_name")=="win_tenderer":
                        if win_tenderer=="":
                            win_tenderer = _role["role_text"]
                        if "role_money" in _role:
                            if str(_role["role_money"]["money"])!='' and float(_role["role_money"]["money"])>0:
                                extract_count += 1
                                if win_bid_price=="":
                                    win_bid_price = str(float(_role["role_money"]["money"]))
                    if _role.get("role_name")=="agency":
                        agency = _role["role_text"]
        if project_code!="":
            extract_count += 1
        if project_name!="":
            extract_count += 1
        return extract_count

@annotate('string,string,string,string,string -> string,string,string,bigint')
class f_decode_sub_docs_json(BaseUDTF):

    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, project_code,project_name,tenderee,agency,sub_docs_json):
        columns = {"win_tenderer":"","bidding_budget":"","win_bid_price":""}
        extract_count = 0
        if project_code is not None and project_code!="":
            extract_count += 1
        if project_name is not None and project_name!="":
            extract_count += 1
        if tenderee is not None and tenderee!="":
            extract_count += 1
        if agency is not None and agency!="":
            extract_count += 1
        if sub_docs_json is not None:
            for sub_docs in json.loads(sub_docs_json):
                for _key_sub_docs in sub_docs.keys():
                    extract_count += 1
                    if _key_sub_docs in columns:
                        if columns[_key_sub_docs]=="" and str(sub_docs[_key_sub_docs]) not in ["","0"]:
                            if _key_sub_docs in ["bidding_budget","win_bid_price"]:
                                if float(sub_docs[_key_sub_docs])>0:
                                    columns[_key_sub_docs] = str(float(sub_docs[_key_sub_docs]))
                            else:
                                columns[_key_sub_docs] = str(sub_docs[_key_sub_docs])
        self.forward(columns["win_tenderer"],columns["bidding_budget"],columns["win_bid_price"],extract_count)

@annotate("string->bigint")
class totimestamp(object):

    def __init__(self):
        import time
        global time
        import logging
        import json
        import re
        global json,logging,re
        self.time_pattern = r"\d{4}\-\d{2}\-\d{2}.*"
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, str_time):
        try:
            logging.info(str_time)
            if str_time is not None and re.search(self.time_pattern,str_time) is not None:
                timeArray = time.strptime(str_time[:10], "%Y-%m-%d")
                timeStamp = int(time.mktime(timeArray))
                return timeStamp
            else:
                return 0
        except Exception as e:
            return 0

@annotate("string->string")
class refind_name(object):

    def __init__(self):
        import logging
        import re
        global logging,re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, title):
        if title is not None:
            return re.sub(r'工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|\[|\]|【|】', '', title)
        return ""

@annotate('bigint,bigint,bigint,string,bigint,string->string')
class f_set_docid(BaseUDAF):
    '''
    Grouping rule: project code and winning bidder, with len(project code) > 7
    and winning bidder <> "".
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer,docid, page_time_stamp,extract_count,defind_column,defind_count,tenderee):
        buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"extract_count":extract_count,
                          "defind_column":defind_column,"defind_count":defind_count,"tenderee":tenderee})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_docs = buffer[0]
        list_docs.sort(key=lambda x:x["page_time_stamp"])
        list_group = []
        _begin = 0
        defind_count = 0
        if len(list_docs)>0:
            defind_count = list_docs[0]["defind_count"]
        for i in range(len(list_docs)-1):
            if abs(list_docs[i]["page_time_stamp"]-list_docs[i+1]["page_time_stamp"])<=86400*2:
                continue
            else:
                _group = []
                _set_column = set()
                _set_tenderee = set()
                for j in range(_begin,i+1):
                    if list_docs[j]["tenderee"] is not None and list_docs[j]["tenderee"]!="":
                        _set_tenderee.add(list_docs[j]["tenderee"])
                    _set_column.add(list_docs[j]["defind_column"])
                    _group.append({"docid":list_docs[j]["docid"],"extract_count":list_docs[j]["extract_count"]})
                if len(_group)>=3 and len(_set_tenderee)>1:
                    pass
                else:
                    if len(_group)>1:
                        if defind_count==2:
                            if len(_set_column)>=2:
                                list_group.append(_group)
                        elif defind_count==1:
                            if len(_set_column)==1:
                                list_group.append(_group)
                        elif defind_count==0:
                            list_group.append(_group)
                _begin = i+1
        if len(list_docs)>1:
            _set_column = set()
            _set_tenderee = set()
            _group = []
            for j in range(_begin,len(list_docs)):
                if list_docs[j]["tenderee"] is not None and list_docs[j]["tenderee"]!="":
                    _set_tenderee.add(list_docs[j]["tenderee"])
                _set_column.add(list_docs[j]["defind_column"])
                _group.append({"docid":list_docs[j]["docid"],"extract_count":list_docs[j]["extract_count"]})
            if len(_group)>=3 and len(_set_tenderee)>1:
                pass
            else:
                if len(_group)>1:
                    if defind_count==2:
                        if len(_set_column)>=2:
                            list_group.append(_group)
                    elif defind_count==1:
                        if len(_set_column)==1:
                            list_group.append(_group)
                    elif defind_count==0:
                        list_group.append(_group)
        return json.dumps(list_group)

def isEmpty(_str):
    if _str is None or _str=="":
        return True
    return False

@annotate('bigint,bigint,bigint,string,string,string,string,string,string,string,string->string')
class f_set_docid_binaryChart(BaseUDAF):
    '''
    Grouping rule: project code and winning bidder, with len(project code) > 7
    and winning bidder <> "".
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer,docid, page_time_stamp,extract_count,project_code,project_name,tenderee,bidding_budget,win_tenderer,win_bid_price,agency,web_source_no):
        buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"extract_count":extract_count,
                          "project_code":project_code,"project_name":project_name,"tenderee":tenderee,
                          "bidding_budget":bidding_budget,"win_tenderer":win_tenderer,"win_bid_price":win_bid_price,
                          "agency":agency,"web_source_no":web_source_no})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_docs = buffer[0]
        list_timeGroups = split_with_time(list_docs,"page_time_stamp",86400*2)
        list_group = []
        empty_key = ["project_code","bidding_budget","win_tenderer","win_bid_price","agency"]
        for _timeGroups in list_timeGroups:
            list_empty = []
            list_notEmpty = []
            for _item in _timeGroups:
                empty_flag = True
                for _key in empty_key:
                    if not isEmpty(_item[_key]):
                        empty_flag = False
                        break
                if empty_flag:
                    list_empty.append(_item)
                else:
                    list_notEmpty.append(_item)
            # pair every all-empty doc with the first compatible non-empty doc from a different web source
            for _e in list_empty:
                _group = [{"docid":_e["docid"],"extract_count":_e["extract_count"]}]
                _e_tenderee = _e["tenderee"]
                for _ne in list_notEmpty:
                    if "set_webSource" not in _ne:
                        _ne["set_webSource"] = set()
                        _ne["set_webSource"].add(_ne["web_source_no"])
                    _suit = False
                    if not isEmpty(_e_tenderee) and _e_tenderee==_ne["tenderee"]:
                        _suit = True
                    elif isEmpty(_e_tenderee):
                        _suit = True
                    if _suit:
                        if _e["web_source_no"] not in _ne["set_webSource"]:
                            _ne["set_webSource"].add(_e["web_source_no"])
                            _group.append({"docid":_ne["docid"],"extract_count":_ne["extract_count"]})
                            break
                if len(_group)>1:
                    list_group.append(_group)
        return json.dumps(list_group)

def split_with_time(list_dict,sort_key,timedelta=86400*2):
    if len(list_dict)>0:
        if sort_key in list_dict[0]:
            list_dict.sort(key=lambda x:x[sort_key])
            list_group = []
            _begin = 0
            for i in range(len(list_dict)-1):
                if abs(list_dict[i][sort_key]-list_dict[i+1][sort_key])<timedelta:
                    continue
                else:
                    _group = []
                    for j in range(_begin,i+1):
                        _group.append(list_dict[j])
                    if len(_group)>1:
                        list_group.append(_group)
                    _begin = i + 1
            if len(list_dict)>1:
                _group = []
                for j in range(_begin,len(list_dict)):
                    _group.append(list_dict[j])
                if len(_group)>1:
                    list_group.append(_group)
            return list_group
    return [list_dict]
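
# A minimal sketch of split_with_time behaviour (illustrative only): records are
# sorted by sort_key, a new group starts wherever two neighbours are at least
# `timedelta` apart, and single-record groups are dropped.
#   split_with_time([{"t": 0}, {"t": 86400}, {"t": 86400*10}], "t")
#   -> [[{"t": 0}, {"t": 86400}]]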

@annotate('bigint,bigint,bigint,string,string,string,string,string->string')
class f_set_docid_limitNum_contain(BaseUDAF):
    '''
    Grouping rule: project code and winning bidder, with len(project code) > 7 and
    winning bidder <> ""; after merging there must be fewer than 2 distinct non-empty
    tenderees, and non-empty amounts of the same announcement type must match.
    '''
    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer,docid,page_time_stamp,extract_count,set_limit_column1,set_limit_column2,set_limit_column3,set_limit_column4,contain_column):
        buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"extract_count":extract_count,"set_limit_column1":set_limit_column1,
                          "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,
                          "contain_column":contain_column})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_split = split_with_time(buffer[0],"page_time_stamp")
        list_group = []
        for _split in list_split:
            flag = True
            keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"]
            for _key in keys:
                logging.info(_key+str(getSet(_split,_key)))
                if len(getSet(_split,_key))>1:
                    flag = False
                    break
            MAX_CONTAIN_COLUMN = None
            # check whether every announcement in the group contains the others
            if flag:
                for _d in _split:
                    contain_column = _d["contain_column"]
                    if contain_column is not None and contain_column !="":
                        if MAX_CONTAIN_COLUMN is None:
                            MAX_CONTAIN_COLUMN = contain_column
                        else:
                            if len(MAX_CONTAIN_COLUMN)<len(contain_column):
                                if contain_column.find(MAX_CONTAIN_COLUMN)==-1:
                                    flag = False
                                    break
                                MAX_CONTAIN_COLUMN = contain_column
                            else:
                                if MAX_CONTAIN_COLUMN.find(contain_column)==-1:
                                    flag = False
                                    break
            if flag:
                if len(_split)>1:
                    _group = []
                    for _item in _split:
                        _group.append({"docid":_item["docid"],"extract_count":_item["extract_count"]})
                    list_group.append(_group)
        return json.dumps(list_group)

@annotate('bigint->string')
class f_stamp_squence(BaseUDAF):
    '''
    Merge the collected page_time_stamps into sorted covering intervals,
    widened by two days on each side.
    '''
    def __init__(self):
        import json
        global json
        import logging
        global logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [set()]

    def iterate(self, buffer,page_time_stamp):
        buffer[0].add(page_time_stamp)

    def merge(self, buffer, pbuffer):
        buffer[0] |= pbuffer[0]

    def terminate(self, buffer):
        if 0 in buffer[0]:
            buffer[0].remove(0)
        list_stamp = list(buffer[0])
        list_stamp.sort(key=lambda x:x)
        list_stamp_final = []
        _begin = 0
        _time_decase = 86400*2
        logging.info(str(list_stamp))
        for _index in range(len(list_stamp)-1):
            if list_stamp[_index+1]-list_stamp[_index]<_time_decase:
                continue
            else:
                list_stamp_final.append([list_stamp[_begin]-_time_decase,list_stamp[_index]+_time_decase])
                _begin = _index+1
        if len(list_stamp)>0:
            list_stamp_final.append([list_stamp[_begin]-_time_decase,list_stamp[-1]+_time_decase])
        return json.dumps(list_stamp_final)
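
# Illustrative note: f_stamp_squence turns a set of day stamps into widened
# covering intervals. E.g. stamps {d, d+1day, d+10days} (in seconds) yield
# [[d-2days, (d+1day)+2days], [(d+10days)-2days, (d+10days)+2days]], which
# in_stamp below then tests membership against.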

@annotate("bigint,string->bigint")
class in_stamp(object):

    def __init__(self):
        import logging
        import re
        import json
        global logging,re,json
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, page_time_stamp,json_stamp):
        list_stamp = json.loads(json_stamp)
        int_flag = 0
        for item in list_stamp:
            if page_time_stamp<item[0]:
                break
            if page_time_stamp>item[0] and page_time_stamp<item[1]:
                int_flag = 1
                break
        return int_flag

def getConfidence(rule_id):
    if rule_id==0:
        return 30
    elif rule_id>=1 and rule_id<30:
        return 20
    else:
        return 10

@annotate('string,string -> string')
class f_splitStr(BaseUDTF):
    '''
    Split a string on a delimiter and emit one record per part.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, str_split,_split):
        try:
            for _s in str_split.split(_split):
                self.forward(_s)
        except Exception as e:
            pass

@annotate('string,bigint -> bigint,bigint,bigint,bigint,bigint')
class f_split_group_single(BaseUDTF):
    '''
    Unpack every group into pairwise (docid1, docid2) records.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_set_docid,rule_id):
        list_group = json.loads(json_set_docid)
        for item in list_group:
            if len(item)>100:
                # large groups: only pair the largest docid against the rest
                item.sort(key=lambda x:x["docid"],reverse=True)
                index_i = 0
                for index_j in range(1,len(item)):
                    if item[index_i]["docid"]!=item[index_j]["docid"]:
                        self.forward(item[index_i]["docid"],item[index_j]["docid"],item[index_i]["extract_count"],item[index_j]["extract_count"],getConfidence(rule_id))
            else:
                for index_i in range(len(item)):
                    for index_j in range(len(item)):
                        if index_i!=index_j and item[index_i]["docid"]!=item[index_j]["docid"]:
                            self.forward(item[index_i]["docid"],item[index_j]["docid"],item[index_i]["extract_count"],item[index_j]["extract_count"],getConfidence(rule_id))

@annotate('bigint,string->string')
class group_document(BaseUDAF):
    '''
    Aggregate the (id, json_set_docid) rows of a group into one JSON list.
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer,id,json_set_docid):
        buffer[0].append({"id":id,"json_set_docid":json.loads(json_set_docid)})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        return json.dumps(buffer[0])

@annotate('bigint,string,bigint,string -> bigint,bigint,string')
class decare_document(BaseUDTF):
    '''
    Cartesian-join two lists of groups and merge groups that share docids.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,group_id1, json_list_doc1,group_id2,json_list_doc2):
        # only evaluate the half-plane group_id1 >= group_id2, cutting nearly half the pairs
        if group_id1>=group_id2:
            list_doc1 = json.loads(json_list_doc1)
            list_doc2 = json.loads(json_list_doc2)
            for _doc1 in list_doc1:
                for _doc2 in list_doc2:
                    # skip comparing a duplicate group with itself
                    if _doc1["id"]!=_doc2["id"]:
                        # check whether the two groups overlap
                        _set1 = set()
                        for _item1 in _doc1["json_set_docid"]:
                            _set1.add(_item1["docid"])
                        _set2 = set()
                        for _item2 in _doc2["json_set_docid"]:
                            _set2.add(_item2["docid"])
                        if len(_set1&_set2)>0:
                            new_json_set_docid = _doc1["json_set_docid"]
                            for _item2 in _doc2["json_set_docid"]:
                                if _item2["docid"] not in _set1:
                                    new_json_set_docid.append(_item2)
                            self.forward(_doc1["id"],_doc2["id"],json.dumps(new_json_set_docid))

def getBestDocid(list_pair):
    # list_pair rows: [docid1, extract_count1, docid2, extract_count2]
    # list_pair.sort(key=lambda x:x[3],reverse=True)
    # _max_count = max(list_pair[0][3],list_pair[0][1])
    # set_candidate = set()
    # if list_pair[0][1]==_max_count:
    #     set_candidate.add(list_pair[0][0])
    # for item in list_pair:
    #     if item[3]==_max_count:
    #         set_candidate.add(item[2])
    #     else:
    #         break
    # list_candidate = list(set_candidate)
    # list_candidate.sort(key=lambda x:x)
    new_pair = []
    new_pair.append([list_pair[0][0],list_pair[0][0],list_pair[0][1]])
    for item in list_pair:
        new_pair.append([item[0],item[2],item[3]])
    new_pair.sort(key=lambda x:x[1])
    new_pair.sort(key=lambda x:x[2],reverse=True)
    return new_pair[0][1]
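
# Worked example (illustrative): for pairs [[1,3,2,5],[1,3,4,5]] the candidate
# rows become [1,1,3], [1,2,5], [1,4,5]; sorting by docid ascending and then by
# extract_count descending (both sorts are stable) puts [1,2,5] first, so docid 2
# is chosen: the highest extract_count wins and the smallest docid breaks ties.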

@annotate('bigint,bigint,bigint,bigint->string')
class choose_document(BaseUDAF):
    '''
    Choose the best docid among a document's duplicate pairs;
    save_flag is 1 when the document itself is the best.
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer,docid1,extract_count1,docid2,extract_count2):
        buffer[0].append([docid1,extract_count1,docid2,extract_count2])

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_pair = buffer[0]
        _set = set()
        for item in buffer[0]:
            _set.add(str(item[2]))
        list_dumplicate = list(_set)
        best_docid = getBestDocid(list_pair)
        if best_docid==list_pair[0][0]:
            save_flag = 1
        else:
            save_flag = 0
        return json.dumps({"save_flag":save_flag,"dumplicates":list_dumplicate})

@annotate('string -> bigint,string')
class f_get_choose_document(BaseUDTF):
    '''
    Unpack the choose_document JSON into (save_flag, dumplicates) records.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,json_choose):
        if json_choose is None:
            self.forward(1,None)
        else:
            _choose = json.loads(json_choose)
            self.forward(_choose["save_flag"],",".join(_choose["dumplicates"]))

@annotate('bigint,bigint,bigint,bigint->string')
class group_document_bestFirst(BaseUDAF):
    '''
    Put the best docid of each group first.
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer,docid1,extract_count1,docid2,extract_count2):
        buffer[0].append([docid1,extract_count1,docid2,extract_count2])

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_pair = buffer[0]
        _set = set()
        for item in buffer[0]:
            _set.add(item[2])
        _set.add(list_pair[0][0])
        best_docid = getBestDocid(list_pair)
        _set.remove(best_docid)
        list_dumplicate = list(_set)
        list_dumplicate.sort(key=lambda x:x)
        list_dumplicate.insert(0,best_docid)
        list_dumplicate_str = []
        for item in list_dumplicate:
            list_dumplicate_str.append(str(item))
        return ",".join(list_dumplicate_str)

@annotate('string -> bigint,string')
class f_get_best_dumplicates(BaseUDTF):
    '''
    Get the best record of each group together with its duplicate records.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,list_dumplicate_str):
        if list_dumplicate_str is None:
            pass
        else:
            list_dumplicate = list_dumplicate_str.split(",")
            if len(list_dumplicate)>0:
                self.forward(int(list_dumplicate[0]),",".join(list_dumplicate[1:]))
            else:
                pass

@annotate('bigint,bigint->string')
class bridge2group(BaseUDAF):
    '''
    Collect the docids of duplicate pairs into one descending-sorted list.
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [set()]

    def iterate(self, buffer,docid1,docid2):
        buffer[0].add(docid1)
        buffer[0].add(docid2)

    def merge(self, buffer, pbuffer):
        buffer[0] |= pbuffer[0]

    def terminate(self, buffer):
        list_pair = list(buffer[0])
        list_pair.sort(key=lambda x:x,reverse=True)
        return json.dumps(list_pair)

@annotate('string -> bigint,bigint')
class group2bridge(BaseUDTF):
    '''
    Emit (bridge_docid, docid) pairs; the smallest docid of the group acts as the bridge.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,json_list_docid):
        list_docid = json.loads(json_list_docid)
        for _docid in list_docid:
            self.forward(list_docid[-1],_docid)

@annotate('bigint,bigint,string -> bigint')
class f_get_dump_docid(BaseUDTF):
    '''
    Emit the docids that should be marked as duplicates.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,docid,save_flag,dumplicates):
        if save_flag==0:
            self.forward(docid)
            if dumplicates is not None:
                list_docid = dumplicates.split(",")
                if len(list_docid)>0:
                    for _docid in list_docid[1:]:
                        self.forward(int(_docid))
        else:
            if dumplicates is not None:
                list_docid = dumplicates.split(",")
                if len(list_docid)>0:
                    for _docid in list_docid:
                        self.forward(int(_docid))

@annotate('string -> bigint,bigint')
class f_get_docid(BaseUDTF):
    '''
    Emit (team_id, docid) for every docid of every group.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,json_set_docid):
        team_id = 0
        if json_set_docid is not None:
            list_docses = json.loads(json_set_docid)
            for list_docs in list_docses:
                team_id += 1
                for item in list_docs:
                    self.forward(team_id,item["docid"])

@annotate("string->bigint")
class get_count_dump(object):

    def __init__(self):
        import logging
        import re
        global logging,re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, title):
        _count = 0
        if title is not None:
            _count = len(title.split(","))
        return _count

def getSet(list_dict,key):
    _set = set()
    for item in list_dict:
        if key in item:
            if item[key]!='' and item[key] is not None:
                if re.search(r"^\d[\d\.]*$",item[key]) is not None:
                    _set.add(str(float(item[key])))
                else:
                    _set.add(str(item[key]))
    return _set

def getDiffIndex(list_dict,key,confidence=100):
    _set = set()
    for _i in range(len(list_dict)):
        item = list_dict[_i]
        if item["confidence"]>=confidence:
            continue
        if key in item:
            if item[key]!='' and item[key] is not None:
                if re.search(r"^\d+(\.\d+)?$",item[key]) is not None:
                    _set.add(str(float(item[key])))
                else:
                    _set.add(str(item[key]))
        if len(_set)>1:
            return _i
    return len(list_dict)
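
# Illustrative note: getDiffIndex walks the (confidence-sorted) group and returns
# the index of the first record whose value for `key` introduces a second distinct
# non-empty value; records at or above the `confidence` threshold are skipped.
# Numeric strings are normalised via float(), so "100" and "100.0" count as one value.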

@annotate('bigint,string -> bigint,bigint')
class f_getGroup_dumpFinal(BaseUDTF):
    '''
    Recover (main_docid, docid) group rows from the final result.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,docid,dumplicates):
        self.forward(int(docid),int(docid))
        if dumplicates is not None:
            list_docids = dumplicates.split(",")
            for _docid in list_docids:
                self.forward(int(docid),int(_docid))

@annotate('bigint,bigint,string,string,string,string,bigint,bigint,bigint->string')
class f_redump_limit_num(BaseUDAF):
    '''
    Re-check groups after dedup merging: when a group holds more than 5 records,
    doctitle, tenderee, win_tenderer and bidding_budget may each take only one
    value inside the group; with 5 or fewer records the constraint applies to
    tenderee, win_tenderer and bidding_budget only.
    '''
    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer,main_docid,docid,doctitle,set_limit_column2,set_limit_column3,set_limit_column4,extract_count1,extract_count2,confidence):
        buffer[0].append({"main_docid":main_docid,"docid":docid,"doctitle":doctitle,"set_limit_column2":set_limit_column2,
                          "set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,"extract_count1":extract_count1,
                          "extract_count2":extract_count2,"confidence":confidence})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_group = []
        the_group = buffer[0]
        the_group.sort(key=lambda x:x["confidence"],reverse=True)
        if len(the_group)>5:
            keys = ["doctitle","set_limit_column2","set_limit_column3","set_limit_column4"]
        else:
            keys = ["set_limit_column2","set_limit_column3","set_limit_column4"]
        final_group = []
        # confidence-ordered scan: keep the prefix before the first conflicting value
        list_key_index = []
        for _k in keys:
            if _k=="doctitle":
                list_key_index.append(getDiffIndex(the_group,_k,confidence=30))
            else:
                list_key_index.append(getDiffIndex(the_group,_k))
        _index = min(list_key_index)
        if _index>1:
            main_docid = the_group[0]["main_docid"]
            for item in the_group[:_index]:
                if item["docid"]!=main_docid:
                    final_group.append({"docid1":main_docid,"docid2":item["docid"],"extract_count1":item["extract_count1"],"extract_count2":item["extract_count2"],"confidence":item["confidence"]})
        # stay = True
        # for _key in keys:
        #     if len(getSet(the_group,_key))>1:
        #         stay = False
        #         break
        #
        # if stay:
        #     main_docid = the_group[0]["main_docid"]
        #     for item in the_group:
        #         if item["docid"]!=main_docid:
        #             final_group.append({"docid1":main_docid,"docid2":item["docid"],"extract_count1":item["extract_count1"],"extract_count2":item["extract_count2"]})
        return json.dumps(final_group)

@annotate('string -> bigint,bigint,bigint,bigint,bigint')
class f_get_dumpFinal_checked(BaseUDTF):
    '''
    Unpack the checked pairs from the final result.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,list_group):
        if list_group is not None:
            final_group = json.loads(list_group)
            for _group in final_group:
                self.forward(_group["docid1"],_group["docid2"],_group["extract_count1"],_group["extract_count2"],_group["confidence"])

@annotate('string -> bigint')
class f_getDumplicateDocids(BaseUDTF):
    '''
    Emit each docid from a comma-separated dumplicates string.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,dumplicates):
        # guard against NULL input
        if dumplicates is None:
            return
        list_docids = dumplicates.split(",")
        for _d in list_docids:
            self.forward(int(_d))

def getSimilarityOfString(str1,str2):
    _set1 = set()
    _set2 = set()
    if str1 is not None:
        for i in range(1,len(str1)):
            _set1.add(str1[i-1:i+1])
    if str2 is not None:
        for i in range(1,len(str2)):
            _set2.add(str2[i-1:i+1])
    _len = max(1,min(len(_set1),len(_set2)))
    return len(_set1&_set2)/_len
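
# Illustrative note: getSimilarityOfString compares character-bigram sets, e.g.
# "ABCD" vs "ABCE" gives {AB,BC,CD} vs {AB,BC,CE}: 2 shared bigrams / min size 3
# = 0.667. f_is_legal below treats 0.7 < sim < 1 as "similar but not identical"
# project codes and rejects the pair.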

@annotate("string,string,string,string,string,string,string,string,string,string->bigint")
class f_is_legal(object):

    def __init__(self):
        import logging
        import re
        global logging,re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def evaluate(self, tenderee1,tenderee2,bidding_budget1,bidding_budget2,win_tenderer1,win_tenderer2,win_bid_price1,win_bid_price2,project_code1,project_code2):
        if tenderee1 is not None and tenderee1!="" and tenderee2 is not None and tenderee2!="" and tenderee1!=tenderee2:
            return 0
        if bidding_budget1 is not None and bidding_budget1!="" and bidding_budget2 is not None and bidding_budget2!="" and bidding_budget1!=bidding_budget2:
            return 0
        if win_tenderer1 is not None and win_tenderer1!="" and win_tenderer2 is not None and win_tenderer2!="" and win_tenderer1!=win_tenderer2:
            return 0
        if win_bid_price1 is not None and win_bid_price1!="" and win_bid_price2 is not None and win_bid_price2!="" and win_bid_price1!=win_bid_price2:
            return 0
        _sim = getSimilarityOfString(project_code1,project_code2)
        if _sim>0.7 and _sim<1:
            return 0
        return 1

@annotate('bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,bigint,bigint,bigint->string')
class f_autorule_group(BaseUDAF):
    '''
    Re-check merged groups (same single-value constraints as f_redump_limit_num)
    and, for every surviving pair of records, emit the set of fields on which
    the two records agree.
    '''
    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer,main_docid,docid,docchannel,doctitle,doctitle_refine,area,province,city,district,web_source_no,fingerprint,
                project_code,project_name,tenderee,agency,win_tenderer,bidding_budget,win_bid_price,extract_count1,extract_count2,confidence):
        buffer[0].append({"main_docid":main_docid,"docid":docid,"docchannel":docchannel,"doctitle":doctitle,
                          "doctitle_refine":doctitle_refine,"area":area,"province":province,
                          "city":city,"district":district,"web_source_no":web_source_no,"fingerprint":fingerprint,
                          "project_code":project_code,"project_name":project_name,"tenderee":tenderee,"agency":agency,
                          "win_tenderer":win_tenderer,"bidding_budget":bidding_budget,"win_bid_price":win_bid_price,
                          "extract_count1":extract_count1,"extract_count2":extract_count2,"confidence":confidence})

    def merge(self, buffer, pbuffer):
        # cap the merged buffer at 100 records to bound memory
        buffer[0].extend(pbuffer[0][:100])
        buffer[0] = buffer[0][:100]

    def getSameKeys(self,_dict1,_dict2):
        list_keys = []
        for k,v in _dict1.items():
            if k in ["area","city","confidence","district","extract_count1","extract_count2","main_docid","province"]:
                continue
            v2 = _dict2.get(k,"")
            if v is not None and v!="" and v2 is not None and v2!="" and v==v2:
                list_keys.append(k)
        list_keys.sort(key=lambda x:x)
        return "=".join(list_keys)

    def terminate(self, buffer):
        list_group = []
        the_group = buffer[0]
        the_group.sort(key=lambda x:x["confidence"],reverse=True)
        if len(the_group)>5:
            keys = ["doctitle","tenderee","win_tenderer","bidding_budget","win_bid_price"]
        else:
            keys = ["tenderee","win_tenderer","bidding_budget","win_bid_price"]
        # confidence-ordered scan: keep the prefix before the first conflicting value
        list_key_index = []
        for _k in keys:
            if _k=="doctitle":
                list_key_index.append(getDiffIndex(the_group,_k,confidence=30))
            else:
                list_key_index.append(getDiffIndex(the_group,_k))
        final_group = []
        _index = min(list_key_index)
        if _index>1:
            for item in the_group[:_index]:
                final_group.append(item)
        list_rules = []
        for i in range(len(final_group)):
            for j in range(i+1,len(final_group)):
                _dict1 = final_group[i]
                _dict2 = final_group[j]
                _rule = self.getSameKeys(_dict1,_dict2)
                list_rules.append([_rule,_dict1.get("docid"),_dict2.get("docid")])
        return json.dumps(list_rules)

@annotate('string -> string,bigint,bigint')
class f_autorule_group_extract(BaseUDTF):
    '''
    Unpack the (rule, docid1, docid2) triples produced by f_autorule_group.
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,rules_json):
        list_rules = json.loads(rules_json)
        for _rule in list_rules:
            self.forward(_rule[0],_rule[1],_rule[2])
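
# A minimal local sanity check for the pure helper functions above (illustrative
# only: the UDF/UDTF/UDAF classes need the ODPS/MaxCompute runtime, and the
# assertions assume Python 3 division semantics).
if __name__ == "__main__":
    assert isEmpty("") and isEmpty(None) and not isEmpty("x")
    # stamps one day apart fall into one group; the isolated stamp is dropped
    docs = [{"page_time_stamp": 0}, {"page_time_stamp": 86400}, {"page_time_stamp": 86400*10}]
    assert split_with_time(docs, "page_time_stamp") == [docs[:2]]
    # docid 2 wins: highest extract_count, smallest docid as tie-break
    assert getBestDocid([[1,3,2,5],[1,3,4,5]]) == 2
    # rule_id 0 -> 30, 1..29 -> 20, otherwise 10
    assert getConfidence(0)==30 and getConfidence(5)==20 and getConfidence(50)==10
    # "ABCD" vs "ABCE": bigrams {AB,BC,CD} vs {AB,BC,CE} -> 2/3
    assert abs(getSimilarityOfString("ABCD","ABCE")-2.0/3)<1e-6
    print("helper smoke tests passed")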