documentMerge.py 24 KB

#coding:UTF8
from odps.udf import annotate
from odps.udf import BaseUDAF
from odps.udf import BaseUDTF
import re   # used by the module-level helpers getSet/getDiffIndex below

def getSet(list_dict,key):
    _set = set()
    for item in list_dict:
        if key in item:
            if item[key]!='' and item[key] is not None:
                if re.search(r"^[\d\.]+$",item[key]) is not None:
                    _set.add(str(float(item[key])))
                else:
                    _set.add(str(item[key]))
    return _set
def split_with_time(list_dict,sort_key,timedelta=86400*120):
    if len(list_dict)>0:
        if sort_key in list_dict[0]:
            list_dict.sort(key=lambda x:x[sort_key])
            list_group = []
            _begin = 0
            for i in range(len(list_dict)-1):
                if abs(list_dict[i][sort_key]-list_dict[i+1][sort_key])<timedelta:
                    continue
                else:
                    _group = []
                    for j in range(_begin,i+1):
                        _group.append(list_dict[j])
                    if len(_group)>1:
                        list_group.append(_group)
                    _begin = i + 1
            if len(list_dict)>1:
                _group = []
                for j in range(_begin,len(list_dict)):
                    _group.append(list_dict[j])
                if len(_group)>1:
                    list_group.append(_group)
            return list_group
    return [list_dict]
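
# --- Usage sketch (illustrative only, not part of the MaxCompute pipeline): the
# records below are invented, but they show how getSet() normalizes numeric-looking
# strings through float() and how split_with_time() cuts a sorted list wherever two
# neighbouring timestamps are more than `timedelta` seconds apart.
def _demo_getSet_and_split_with_time():
    docs = [{"docid": 1, "page_time_stamp": 0,           "win_bid_price": "100"},
            {"docid": 2, "page_time_stamp": 86400,       "win_bid_price": "100.00"},
            {"docid": 3, "page_time_stamp": 86400 * 300, "win_bid_price": "200"}]
    # "100" and "100.00" collapse into one element -> {'100.0', '200.0'}
    print(getSet(docs, "win_bid_price"))
    # docid 3 is more than 120 days away from the others and sub-groups of size 1
    # are dropped, so only [docid 1, docid 2] survives
    print(split_with_time(docs, "page_time_stamp"))
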
@annotate('bigint,bigint,string,string,string,string,string,string,bigint->string')
class f_merge_rule_limit_num_contain_greater(BaseUDAF):
    '''
    Merge rule: project code and winning bidder agree, len(project code)>7, winning bidder <> "",
    fewer than 2 distinct non-empty tenderees after merging, and identical non-empty amounts
    for the same announcement type after merging.
    '''
    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer,docid,page_time_stamp,set_limit_column1,set_limit_column2,set_limit_column3,set_limit_column4,contain_column,greater_column,MAX_NUM):
        buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"set_limit_column1":set_limit_column1,
                          "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,
                          "contain_column":contain_column,"greater_column":greater_column,"MAX_NUM":MAX_NUM})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        MAX_NUM = 5
        if len(buffer[0])>0:
            MAX_NUM = buffer[0][0]["MAX_NUM"]
        list_split = split_with_time(buffer[0],"page_time_stamp")
        list_group = []
        for _split in list_split:
            flag = True
            keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"]
            dict_set = {}
            for _key in keys:
                dict_set[_key] = set()
            if len(_split)>MAX_NUM:
                flag = False
            else:
                for _key in keys:
                    logging.info(_key+str(getSet(_split,_key)))
                    if len(getSet(_split,_key))>1:
                        flag = False
                        break
                MAX_CONTAIN_COLUMN = None
                # check whether every announcement in the group is contained by the longest contain_column
                if flag:
                    for _d in _split:
                        contain_column = _d["contain_column"]
                        if contain_column is not None and contain_column !="":
                            if MAX_CONTAIN_COLUMN is None:
                                MAX_CONTAIN_COLUMN = contain_column
                            else:
                                if len(MAX_CONTAIN_COLUMN)<len(contain_column):
                                    if contain_column.find(MAX_CONTAIN_COLUMN)==-1:
                                        flag = False
                                        break
                                    MAX_CONTAIN_COLUMN = contain_column
                                else:
                                    if MAX_CONTAIN_COLUMN.find(contain_column)==-1:
                                        flag = False
                                        break
                        if len(getSet(_split,"greater_column"))==1:
                            flag = False
                            break
            if flag:
                _set_docid = set()
                for item in _split:
                    _set_docid.add(item["docid"])
                if len(_set_docid)>1:
                    list_group.append(list(_set_docid))
        return json.dumps(list_group)
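
# --- Note on the return value (hypothetical docids): terminate() above serializes
# the candidate merge groups as a JSON array of docid lists; downstream UDTFs such
# as f_get_remerge_group and f_arrange_group_single parse this same shape.
def _demo_merge_group_shape():
    import json   # imported locally so the sketch stands alone outside the UDAF
    list_group = [[101, 102], [205, 206, 207]]
    return json.dumps(list_group)   # -> '[[101, 102], [205, 206, 207]]'
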
def getDiffIndex(list_dict,key):
    _set = set()
    for _i in range(len(list_dict)):
        item = list_dict[_i]
        if key in item:
            if item[key]!='' and item[key] is not None:
                if re.search(r"^\d[\d\.]*$",item[key]) is not None:
                    _set.add(str(float(item[key])))
                else:
                    _set.add(str(item[key]))
        if len(_set)>1:
            return _i
    return len(list_dict)
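
# --- Usage sketch (invented records): getDiffIndex() scans the list in order and
# returns the index of the first record that introduces a second distinct
# (normalized) value for `key`; if every record agrees it returns len(list_dict).
def _demo_getDiffIndex():
    rows = [{"win_tenderer": "Company A"},
            {"win_tenderer": "Company A"},
            {"win_tenderer": "Company B"}]
    return getDiffIndex(rows, "win_tenderer")   # -> 2: the third record breaks the consensus
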
@annotate('bigint,bigint,string,string,string,string,string,string,string,bigint->string')
class f_remege_limit_num_contain(BaseUDAF):
    '''
    Merge rule: project code and winning bidder agree, len(project code)>7, winning bidder <> "",
    fewer than 2 distinct non-empty tenderees after merging, and identical non-empty amounts
    for the same announcement type after merging.
    '''
    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer,docid,page_time_stamp,set_limit_column1,set_limit_column2,set_limit_column3,set_limit_column4,contain_column1,contain_column2,notLike_column,confidence):
        buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"set_limit_column1":set_limit_column1,
                          "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,
                          "contain_column1":contain_column1,"contain_column2":contain_column2,"notLike_column":notLike_column,"confidence":confidence})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def getNotLikeSet(self,_dict,column_name):
        column_value = _dict.get(column_name,None)
        _set = set()
        if column_value is not None:
            for _i in range(1,len(column_value)):
                _set.add(column_value[_i-1:_i+1])
        _dict["notLike_set"] = _set

    def getSimilarity(self,_set1,_set2):
        _sum = max([1,min([len(_set1),len(_set2)])])
        return len(_set1&_set2)/_sum

    def terminate(self, buffer):
        list_group = []
        the_group = buffer[0]
        SIM_PROB = 0.6
        for _d in the_group:
            self.getNotLikeSet(_d,"notLike_column")
        # check whether any limit column takes more than one value
        keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"]
        re_merge = False
        for _key in keys:
            if len(getSet(the_group,_key))>1:
                re_merge = True
                break
        # check for notLike columns that are similar but not identical
        re_merge_sim = False
        for _i1 in range(0,len(the_group)):
            for _j1 in range(_i1+1,len(the_group)):
                _set1 = the_group[_i1]["notLike_set"]
                _set2 = the_group[_j1]["notLike_set"]
                _sim = self.getSimilarity(_set1,_set2)
                if _sim>SIM_PROB and _sim<1:
                    re_merge_sim = True
                    break
        contain_keys = ["contain_column1","contain_column2"]
        logging.info(the_group)
        if re_merge or re_merge_sim:
            the_group.sort(key=lambda x:x["confidence"],reverse=True)
            the_group.sort(key=lambda x:x["page_time_stamp"])
            # regroup
            dict_docid_doc = {}
            for _doc in the_group:
                dict_docid_doc[_doc["docid"]] = _doc
            for _doc in the_group:
                merge_flag = False
                for _index in range(len(list_group)):
                    _g = list_group[_index]
                    hit_count = 0
                    dict_temp = dict()
                    # anomaly: a limit column with multiple values
                    if re_merge:
                        for _c_key in contain_keys:
                            dict_temp[_c_key] = _g[_c_key]
                            if _g[_c_key] is not None and _doc[_c_key] is not None:
                                if len(_g[_c_key])>len(_doc[_c_key]):
                                    if str(_g[_c_key]).find(str(_doc[_c_key]))>=0:
                                        dict_temp[_c_key] = _g[_c_key]
                                        hit_count += 1
                                else:
                                    if str(_doc[_c_key]).find(str(_g[_c_key]))>=0:
                                        dict_temp[_c_key] = _doc[_c_key]
                                        _g[_c_key] = _doc[_c_key]
                                        hit_count += 1
                    else:
                        hit_count = 1
                    # if hit_count==len(contain_keys):
                    if hit_count>0:
                        _flag_sim = False
                        # anomaly: similar but not identical
                        if re_merge_sim:
                            for _docid in _g["docid"]:
                                tmp_d = dict_docid_doc[_docid]
                                _sim = self.getSimilarity(tmp_d["notLike_set"],_doc["notLike_set"])
                                if _sim>SIM_PROB and _sim<1:
                                    _flag_sim = True
                        if not _flag_sim:
                            for _c_key in dict_temp.keys():
                                _g[_c_key] = dict_temp[_c_key]
                            _g["docid"].append(_doc["docid"])
                            merge_flag = True
                            break
                if not merge_flag:
                    _dict = dict()
                    _dict["docid"] = [_doc["docid"]]
                    for _c_key in contain_keys:
                        _dict[_c_key] = _doc[_c_key]
                    list_group.append(_dict)
            final_group = []
            # check that every limit column resolves to a single value within each group
            for _group in list_group:
                _split = []
                for _docid in _group["docid"]:
                    _split.append(dict_docid_doc[_docid])
                # sort by confidence so that as much of the group as possible is kept
                _split.sort(key=lambda x:x["confidence"],reverse=True)
                # confidence cut-off
                list_key_index = []
                for _k in keys:
                    list_key_index.append(getDiffIndex(_split,_k))
                _index = min(list_key_index)
                final_group.append([_c["docid"] for _c in _split[:_index]])
                for _c in _split[_index:]:
                    final_group.append([_c["docid"]])
                # if more than one distinct value is found, split everything into
                # singleton groups, otherwise keep a single group
                # _flag = True
                # for _key in keys:
                #     if len(getSet(_split,_key))>1:
                #         _flag = False
                #         break
                # if not _flag:
                #     for _docid in _group["docid"]:
                #         final_group.append([_docid])
                # else:
                #     final_group.append(list(set(_group["docid"])))
        else:
            final_group = [list(set([item["docid"] for item in the_group]))]
        return json.dumps(final_group)
@annotate('string -> string')
class f_get_remerge_group(BaseUDTF):
    '''
    Split multiple groups into multiple records
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,json_remerge):
        if json_remerge is not None:
            list_group = json.loads(json_remerge)
            for _group in list_group:
                l_g = list(set(_group))
                l_g.sort(key=lambda x:x)
                list_docid = [str(_docid) for _docid in l_g]
                self.forward(",".join(list_docid))
@annotate('bigint,bigint,string->string')
class f_merge_probability(BaseUDAF):
    '''
    Merge a group into a single record
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer,docid,page_time_stamp,_type):
        buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"type":_type})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_dict = buffer[0]
        list_dict = list_dict[:10000]
        list_group = split_with_time(list_dict,sort_key="page_time_stamp",timedelta=86400*120)
        return json.dumps(list_group)
@annotate('string -> bigint,bigint,bigint,bigint,string')
class f_split_merge_probability(BaseUDTF):
    def __init__(self):
        import logging
        import json
        global logging,json
        logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,list_group_str):
        logging.info("0")
        logging.info(list_group_str)
        if list_group_str is not None:
            logging.info("1")
            try:
                list_group = json.loads(list_group_str)
                logging.info("2")
                for _group in list_group:
                    if len(_group)>0:
                        _type = _group[0].get("type","")
                        logging.info("3%d"%len(list_group))
                        # _group.sort(key=lambda x:x["page_time_stamp"])
                        _len = min(100,len(_group))
                        for _index_i in range(_len):
                            _count = 0
                            for _index_j in range(_index_i+1,_len):
                                if abs(_group[_index_j]["page_time_stamp"]-_group[_index_i]["page_time_stamp"])>86400*120:
                                    break
                                _count += 1
                                _docid1 = _group[_index_i]["docid"]
                                _docid2 = _group[_index_j]["docid"]
                                if _docid1<_docid2:
                                    self.forward(_docid1,_docid2,1,_len,_type)
                                else:
                                    self.forward(_docid2,_docid1,1,_len,_type)
            except Exception as e:
                logging.info(str(e))
@annotate('bigint,bigint,string->string')
class f_merge_groupPairs(BaseUDAF):
    '''
    Merge a group into a single record
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer,is_exists,counts,_type):
        buffer[0].append({"is_exists":is_exists,"counts":counts,"_type":_type})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_dict = buffer[0]
        list_dict = list_dict[:10000]
        return json.dumps(list_dict)
@annotate("string -> bigint,bigint,bigint")
class f_merge_getLabel(BaseUDTF):
    def __init__(self):
        import logging
        import json
        global logging,json

    def process(self,str_docids):
        if str_docids is not None:
            list_docids = [int(i) for i in str_docids.split(",")]
            list_docids.sort(key=lambda x:x)
            _len = min(100,len(list_docids))
            for index_i in range(_len):
                docid_less = list_docids[index_i]
                for index_j in range(index_i+1,_len):
                    docid_greater = list_docids[index_j]
                    self.forward(docid_less,docid_greater,1)
def getSimilarityOfString(str1,str2):
    _set1 = set()
    _set2 = set()
    if str1 is not None:
        for i in range(1,len(str1)):
            _set1.add(str1[i-1:i+1])
    if str2 is not None:
        for i in range(1,len(str2)):
            _set2.add(str2[i-1:i+1])
    _len = max(1,min(len(_set1),len(_set2)))
    return len(_set1&_set2)/_len
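
# --- Usage sketch (hypothetical project codes, assuming Python 3 true division):
# getSimilarityOfString() compares two strings by their character-bigram sets and
# normalizes by the smaller set, so near-duplicates score high but below 1.
def _demo_getSimilarityOfString():
    print(getSimilarityOfString("ABCD-001", "ABCD-777"))   # 4 shared bigrams / 6 -> ~0.67
    print(getSimilarityOfString("ABCD-001", "ABCD-001"))   # identical -> 1.0
    # check_columns() below treats 0.6 < similarity < 1 as conflicting project codes
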
def check_columns(tenderee_less,tenderee_greater,
                  agency_less,agency_greater,project_code_less,project_code_greater,project_name_less,project_name_greater,
                  win_tenderer_less,win_tenderer_greater,win_bid_price_less,win_bid_price_greater,
                  bidding_budget_less,bidding_budget_greater,doctitle_refine_less,doctitle_refine_greater):
    flag = True
    _set_tenderee = set()
    if tenderee_less is not None and tenderee_less!="":
        _set_tenderee.add(tenderee_less)
    if tenderee_greater is not None and tenderee_greater!="":
        _set_tenderee.add(tenderee_greater)
    if len(_set_tenderee)>1:
        return False
    code_sim = getSimilarityOfString(project_code_less,project_code_greater)
    if code_sim>0.6 and code_sim<1:
        return False
    _set_win_tenderer = set()
    if win_tenderer_less is not None and win_tenderer_less!="":
        _set_win_tenderer.add(win_tenderer_less)
    if win_tenderer_greater is not None and win_tenderer_greater!="":
        _set_win_tenderer.add(win_tenderer_greater)
    if len(_set_win_tenderer)>1:
        return False
    _set_win_bid_price = set()
    if win_bid_price_less is not None and win_bid_price_less!="":
        _set_win_bid_price.add(win_bid_price_less)
    if win_bid_price_greater is not None and win_bid_price_greater!="":
        _set_win_bid_price.add(win_bid_price_greater)
    if len(_set_win_bid_price)>1:
        return False
    _set_bidding_budget = set()
    if bidding_budget_less is not None and bidding_budget_less!="":
        _set_bidding_budget.add(bidding_budget_less)
    if bidding_budget_greater is not None and bidding_budget_greater!="":
        _set_bidding_budget.add(bidding_budget_greater)
    if len(_set_bidding_budget)>1:
        return False
    return True
def getSimLevel(str1,str2):
    str1_null = False
    str2_null = False
    _v = 0
    if str1 is None or str1=="":
        str1_null = True
    if str2 is None or str2=="":
        str2_null = True
    if str1_null and str2_null:
        _v = 2
    elif str1_null and not str2_null:
        _v = 4
    elif not str1_null and str2_null:
        _v = 6
    elif not str1_null and not str2_null:
        if str1==str2:
            _v = 10
        else:
            _v = 0
    return _v
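
# --- Usage sketch: getSimLevel() encodes the null/equality relationship of two
# field values as a small integer (2 = both empty, 4 = only the left empty,
# 6 = only the right empty, 10 = equal and non-empty, 0 = non-empty but different);
# f_merge_featureMatrix below divides it by 10 to map it into [0, 1].
def _demo_getSimLevel():
    assert getSimLevel("", None) == 2
    assert getSimLevel(None, "Company A") == 4
    assert getSimLevel("Company A", "") == 6
    assert getSimLevel("Company A", "Company A") == 10
    assert getSimLevel("Company A", "Company B") == 0
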
import math

def featurnCount(_count,max_count=100):
    return max(0,min(1,_count))*(1/math.sqrt(max(1,_count-1)))
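
# --- Usage sketch: featurnCount() (name kept as in the original) turns a raw
# co-occurrence count into a bounded feature: 0 for a count of 0, 1 for counts of
# 1 or 2, then decaying as 1/sqrt(count-1); the max_count parameter is unused.
def _demo_featurnCount():
    assert featurnCount(0) == 0
    assert featurnCount(2) == 1
    assert abs(featurnCount(5) - 0.5) < 1e-9   # 1/sqrt(4)
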
@annotate("string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string->string")
class f_merge_featureMatrix(BaseUDTF):
    def __init__(self):
        import logging
        import json
        global logging,json

    def process(self,json_context,tenderee_less,tenderee_greater,
                agency_less,agency_greater,project_code_less,project_code_greater,project_name_less,project_name_greater,
                win_tenderer_less,win_tenderer_greater,win_bid_price_less,win_bid_price_greater,
                bidding_budget_less,bidding_budget_greater,doctitle_refine_less,doctitle_refine_greater):
        # if not check_columns(tenderee_less,tenderee_greater,
        #                      agency_less,agency_greater,project_code_less,project_code_greater,project_name_less,project_name_greater,
        #                      win_tenderer_less,win_tenderer_greater,win_bid_price_less,win_bid_price_greater,
        #                      bidding_budget_less,bidding_budget_greater,doctitle_refine_less,doctitle_refine_greater):
        #     return
        _context = json.loads(json_context)
        dict_context = {}
        for item in _context:
            dict_context[item["_type"]] = [item["is_exists"],item["counts"]]
        context_key = ["tenderee","agency","project_code","project_name","win_tenderer","win_bid_price","bidding_budget","doctitle_refine"]
        list_matrix = []
        for index_i in range(len(context_key)):
            for index_j in range(index_i+1,len(context_key)):
                _key = "%s&%s"%(context_key[index_i],context_key[index_j])
                _v = featurnCount(dict_context.get(_key,[0,0])[1])
                list_matrix.append(_v)
        context3_key = ["tenderee","agency","win_tenderer","win_bid_price","bidding_budget"]
        for index_i in range(len(context3_key)):
            for index_j in range(index_i+1,len(context3_key)):
                for index_k in range(index_j+1,len(context3_key)):
                    _key = "%s&%s&%s"%(context3_key[index_i],context3_key[index_j],context3_key[index_k])
                    _v = featurnCount(dict_context.get(_key,[0,0])[1])
                    list_matrix.append(_v)
        list_matrix.append(getSimLevel(tenderee_less,tenderee_greater)/10)
        list_matrix.append(getSimLevel(agency_less,agency_greater)/10)
        list_matrix.append(getSimilarityOfString(project_code_less,project_code_greater))
        list_matrix.append(getSimilarityOfString(project_name_less,project_name_greater))
        list_matrix.append(getSimLevel(win_tenderer_less,win_tenderer_greater)/10)
        list_matrix.append(getSimLevel(win_bid_price_less,win_bid_price_greater)/10)
        list_matrix.append(getSimLevel(bidding_budget_less,bidding_budget_greater)/10)
        list_matrix.append(getSimilarityOfString(doctitle_refine_less,doctitle_refine_greater))
        self.forward(json.dumps(list_matrix))
@annotate('string -> bigint,bigint')
class f_check_remerge(BaseUDTF):
    '''
    Split multiple groups into multiple records
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,json_remerge):
        if json_remerge is not None:
            list_group = json.loads(json_remerge)
            for _group in list_group:
                for _docid in _group:
                    self.forward(_group[-1],_docid)
def getConfidence(rule_id):
    if rule_id >=1 and rule_id <=20:
        return 30
    elif rule_id>=31 and rule_id<=50:
        return 20
    else:
        return 10
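
# --- Usage sketch: getConfidence() maps a merge-rule id onto the confidence score
# forwarded by f_arrange_group_single below; note that ids 21-30 fall through to
# the default branch and score 10.
def _demo_getConfidence():
    assert getConfidence(5) == 30    # rules 1-20
    assert getConfidence(40) == 20   # rules 31-50
    assert getConfidence(25) == 10   # everything else, including 21-30
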
@annotate('string,bigint -> bigint,bigint,bigint')
class f_arrange_group_single(BaseUDTF):
    '''
    Split multiple groups into multiple records
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,json_set_docid,rule_id):
        if json_set_docid is not None:
            list_group = json.loads(json_set_docid)
            for _group in list_group:
                for index_i in range(len(_group)):
                    for index_j in range(len(_group)):
                        # if index_i!=index_j and _group[index_i]!=_group[index_j]:
                        if index_i!=index_j:
                            self.forward(_group[index_i],_group[index_j],getConfidence(rule_id))
@annotate('bigint,bigint->string')
class f_get_merge_docids(BaseUDAF):
    '''
    Merge a group into a single record
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [set()]

    def iterate(self, buffer,docid1,docid2):
        buffer[0].add(docid1)
        buffer[0].add(docid2)

    def merge(self, buffer, pbuffer):
        buffer[0] |= pbuffer[0]

    def terminate(self, buffer):
        set_docid = buffer[0]
        list_docid = list(set_docid)
        list_docid.sort(key=lambda x:x)
        list_docid_str = []
        for _docid in list_docid:
            list_docid_str.append(str(_docid))
        return ",".join(list_docid_str)
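
# --- Local smoke test (hypothetical data): the UDAF/UDTF classes above need the
# MaxCompute runtime, but the plain helper functions can be exercised directly,
# e.g. with `python documentMerge.py`. This block is only a sketch and is skipped
# when the module is loaded as a UDF resource.
if __name__ == "__main__":
    docs = [{"docid": 1, "page_time_stamp": 0,        "win_tenderer": "Company A"},
            {"docid": 2, "page_time_stamp": 3600,     "win_tenderer": "Company A"},
            {"docid": 3, "page_time_stamp": 3600 * 2, "win_tenderer": "Company B"}]
    print(split_with_time(docs, "page_time_stamp"))        # one group holding all three docs
    print(getDiffIndex(docs, "win_tenderer"))              # 2: docid 3 breaks the consensus
    print(getSimilarityOfString("ABCD-001", "ABCD-777"))   # ~0.67
    print(getSimLevel("Company A", "Company B"))           # 0: both present but different
    print(featurnCount(5))                                 # 0.5
    print(getConfidence(12))                               # 30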