#coding:UTF8
from odps.udf import annotate
from odps.distcache import get_cache_archive
from odps.distcache import get_cache_file
from odps.udf import BaseUDTF,BaseUDAF

import threading
import logging
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
import time
import json
import re  # used by getSet/getDiffIndex below; the UDAFs also re-import it in __init__

def log(msg):
    logging.info(msg)
# Configure the pandas dependency package (add the archive resource to sys.path)
def include_package_path(res_name):
    import os, sys
    archive_files = get_cache_archive(res_name)
    dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files
                        if '.dist_info' not in f.name], key=lambda v: len(v))
    _path = dir_names[0].split(".zip/files")[0]+".zip/files"
    log("add path:%s"%(_path))
    sys.path.append(_path)
    return os.path.dirname(dir_names[0])

# A "RuntimeError: xxx has been blocked by sandbox" may be raised here, because
# libraries with C extensions are blocked by the sandbox; set
# "set odps.isolation.session.enable = true" to allow them.
def include_file(file_name):
    import os, sys
    so_file = get_cache_file(file_name)
    sys.path.append(os.path.dirname(os.path.abspath(so_file.name)))

def include_so(file_name):
    import os, sys
    so_file = get_cache_file(file_name)
    with open(so_file.name, 'rb') as fp:
        content = fp.read()
        so = open(file_name, "wb")
        so.write(content)
        so.flush()
        so.close()

# Initialize the business data package. Because of upload size limits and
# inconsistencies between Python versions and archive extraction, the package
# has to be unpacked and imported manually.
def init_env(list_files,package_name):
    import os,sys
    if len(list_files)==1:
        so_file = get_cache_file(list_files[0])
        cmd_line = os.path.abspath(so_file.name)
        os.system("unzip -o %s -d %s"%(cmd_line,package_name))
    elif len(list_files)>1:
        cmd_line = "cat"
        for _file in list_files:
            so_file = get_cache_file(_file)
            cmd_line += " "+os.path.abspath(so_file.name)
        cmd_line += " > temp.zip"
        os.system(cmd_line)
        os.system("unzip -o temp.zip -d %s"%(package_name))
    # os.system("rm -rf %s/*.dist-info"%(package_name))
    # return os.listdir(os.path.abspath("local_package"))
    # os.system("echo export LD_LIBRARY_PATH=%s >> ~/.bashrc"%(os.path.abspath("local_package")))
    # os.system("source ~/.bashrc")
    sys.path.insert(0,os.path.abspath(package_name))
    # sys.path.append(os.path.join(os.path.abspath("local_package"),"interface_real"))
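
# A minimal sketch of how the helpers above are typically wired into a UDF's
# __init__ (the split-archive names are hypothetical; "numpy-1.18.zip" is the
# resource actually used by f_getMergeProb further down):
#
#   include_package_path("numpy-1.18.zip")                                  # archive resource -> sys.path
#   init_env(["business_data.z01", "business_data.z02"], "local_package")   # hypothetical split zip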
import platform

def getSet(list_dict,key):
    _set = set()
    for item in list_dict:
        if key in item:
            if item[key]!='' and item[key] is not None:
                if re.search(r"^[\d\.]+$",item[key]) is not None:
                    _set.add(str(float(item[key])))
                else:
                    _set.add(str(item[key]))
    return _set

def split_with_time(list_dict,sort_key,timedelta=86400*120):
    if len(list_dict)>0:
        if sort_key in list_dict[0]:
            list_dict.sort(key=lambda x:x[sort_key])
            list_group = []
            _begin = 0
            for i in range(len(list_dict)-1):
                if abs(list_dict[i][sort_key]-list_dict[i+1][sort_key])<timedelta:
                    continue
                else:
                    _group = []
                    for j in range(_begin,i+1):
                        _group.append(list_dict[j])
                    if len(_group)>1:
                        list_group.append(_group)
                    _begin = i + 1
            if len(list_dict)>1:
                _group = []
                for j in range(_begin,len(list_dict)):
                    _group.append(list_dict[j])
                if len(_group)>0:
                    list_group.append(_group)
            return list_group
    return [list_dict]
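
# A minimal usage sketch for split_with_time (sample records are hypothetical):
# consecutive records closer than `timedelta` seconds stay in one group, so the
# first two records below form one group and the third forms its own.
def _demo_split_with_time():
    sample = [{"docid": 1, "page_time_stamp": 86400 * 1},
              {"docid": 2, "page_time_stamp": 86400 * 2},
              {"docid": 3, "page_time_stamp": 86400 * 200}]
    return split_with_time(sample, "page_time_stamp")  # [[docs 1,2], [doc 3]]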
@annotate('bigint,bigint,string,string,string,string,string,string,bigint->string')
class f_merge_rule_limit_num_contain_greater(BaseUDAF):
    '''
    Merge constraints: project code and winning bidder, len(project code)>7,
    winning bidder <> "", fewer than 2 distinct non-empty tenderees after the
    merge, and identical non-empty amounts per announcement type after the merge
    '''
    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer,docid,page_time_stamp,set_limit_column1,set_limit_column2,set_limit_column3,set_limit_column4,contain_column,greater_column,MAX_NUM):
        buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"set_limit_column1":set_limit_column1,
                          "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,
                          "contain_column":contain_column,"greater_column":greater_column,"MAX_NUM":MAX_NUM})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        MAX_NUM = 5
        if len(buffer[0])>0:
            MAX_NUM = buffer[0][0]["MAX_NUM"]
        list_split = split_with_time(buffer[0],"page_time_stamp")
        list_group = []
        for _split in list_split:
            flag = True
            keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"]
            dict_set = {}
            for _key in keys:
                dict_set[_key] = set()
            if len(_split)>MAX_NUM:
                flag = False
            else:
                for _key in keys:
                    logging.info(_key+str(getSet(_split,_key)))
                    if len(getSet(_split,_key))>1:
                        flag = False
                        break
                MAX_CONTAIN_COLUMN = None
                # check whether every announcement in the group is contained by the longest contain_column
                if flag:
                    for _d in _split:
                        contain_column = _d["contain_column"]
                        if contain_column is not None and contain_column !="":
                            if MAX_CONTAIN_COLUMN is None:
                                MAX_CONTAIN_COLUMN = contain_column
                            else:
                                if len(MAX_CONTAIN_COLUMN)<len(contain_column):
                                    if contain_column.find(MAX_CONTAIN_COLUMN)==-1:
                                        flag = False
                                        break
                                    MAX_CONTAIN_COLUMN = contain_column
                                else:
                                    if MAX_CONTAIN_COLUMN.find(contain_column)==-1:
                                        flag = False
                                        break
                        if len(getSet(_split,"greater_column"))==1:
                            flag = False
                            break
            if flag:
                _set_docid = set()
                for item in _split:
                    _set_docid.add(item["docid"])
                if len(_set_docid)>1:
                    list_group.append(list(_set_docid))
        return json.dumps(list_group)
def getDiffIndex(list_dict,key):
    _set = set()
    for _i in range(len(list_dict)):
        item = list_dict[_i]
        if key in item:
            if item[key]!='' and item[key] is not None:
                if re.search(r"^\d[\d\.]*$",item[key]) is not None:
                    _set.add(str(float(item[key])))
                else:
                    _set.add(str(item[key]))
        if len(_set)>1:
            return _i
    return len(list_dict)
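
# A minimal sketch for getDiffIndex (sample records are hypothetical): the
# first two records agree on "code", the third introduces a second distinct
# value, so the returned index is 2 and only records[:2] stay in one group.
def _demo_getDiffIndex():
    sample = [{"code": "A"}, {"code": "A"}, {"code": "B"}]
    return getDiffIndex(sample, "code")  # expected: 2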
@annotate('bigint,bigint,string,string,string,string,string,string,string,bigint->string')
class f_remege_limit_num_contain(BaseUDAF):
    '''
    Merge constraints: project code and winning bidder, len(project code)>7,
    winning bidder <> "", fewer than 2 distinct non-empty tenderees after the
    merge, and identical non-empty amounts per announcement type after the merge
    '''
    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer,docid,page_time_stamp,set_limit_column1,set_limit_column2,set_limit_column3,set_limit_column4,contain_column1,contain_column2,notLike_column,confidence):
        buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"set_limit_column1":set_limit_column1,
                          "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,
                          "contain_column1":contain_column1,"contain_column2":contain_column2,"notLike_column":notLike_column,"confidence":confidence})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def getNotLikeSet(self,_dict,column_name):
        column_value = _dict.get(column_name,None)
        _set = set()
        if column_value is not None:
            for _i in range(1,len(column_value)):
                _set.add(column_value[_i-1:_i+1])
        _dict["notLike_set"] = _set

    def getSimilarity(self,_set1,_set2):
        _sum = max([1,min([len(_set1),len(_set2)])])
        return len(_set1&_set2)/_sum

    def terminate(self, buffer):
        list_group = []
        the_group = buffer[0]
        SIM_PROB = 0.6
        for _d in the_group:
            self.getNotLikeSet(_d,"notLike_column")
        # check whether any key has more than one distinct value
        keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"]
        re_merge = False
        for _key in keys:
            if len(getSet(the_group,_key))>1:
                re_merge = True
                break
        # check whether any pair is similar but not identical
        re_merge_sim = False
        for _i1 in range(0,len(the_group)):
            for _j1 in range(_i1+1,len(the_group)):
                _set1 = the_group[_i1]["notLike_set"]
                _set2 = the_group[_j1]["notLike_set"]
                _sim = self.getSimilarity(_set1,_set2)
                if _sim>SIM_PROB and _sim<1:
                    re_merge_sim = True
                    break
        contain_keys = ["contain_column1","contain_column2"]
        logging.info(the_group)
        logging.info(str(re_merge)+str(re_merge_sim))
        if re_merge or re_merge_sim:
            the_group.sort(key=lambda x:x["confidence"],reverse=True)
            the_group.sort(key=lambda x:x["page_time_stamp"])
            # regroup
            dict_docid_doc = {}
            for _doc in the_group:
                dict_docid_doc[_doc["docid"]] = _doc
            for _doc in the_group:
                merge_flag = False
                for _index in range(len(list_group)):
                    _g = list_group[_index]
                    hit_count = 0
                    dict_temp = dict()
                    # anomaly: a key has multiple distinct values
                    if re_merge:
                        for _c_key in contain_keys:
                            dict_temp[_c_key] = _g[_c_key]
                            if _g[_c_key] is not None and _doc[_c_key] is not None:
                                if len(_g[_c_key])>len(_doc[_c_key]):
                                    if str(_g[_c_key]).find(str(_doc[_c_key]))>=0:
                                        dict_temp[_c_key] = _g[_c_key]
                                        hit_count += 1
                                else:
                                    if str(_doc[_c_key]).find(str(_g[_c_key]))>=0:
                                        dict_temp[_c_key] = _doc[_c_key]
                                        _g[_c_key] = _doc[_c_key]
                                        hit_count += 1
                    else:
                        hit_count = 1
                    # if hit_count==len(contain_keys):
                    if hit_count>0:
                        _flag_sim = False
                        # anomaly: similar but not identical
                        if re_merge_sim:
                            for _docid in _g["docid"]:
                                tmp_d = dict_docid_doc[_docid]
                                _sim = self.getSimilarity(tmp_d["notLike_set"],_doc["notLike_set"])
                                if _sim>SIM_PROB and _sim<1:
                                    _flag_sim = True
                        if not _flag_sim:
                            for _c_key in dict_temp.keys():
                                _g[_c_key] = dict_temp[_c_key]
                            _g["docid"].append(_doc["docid"])
                            merge_flag = True
                            break
                if not merge_flag:
                    _dict = dict()
                    _dict["docid"] = [_doc["docid"]]
                    for _c_key in contain_keys:
                        _dict[_c_key] = _doc[_c_key]
                    list_group.append(_dict)
            final_group = []
            # check that every key collapses to a single value inside each group
            for _group in list_group:
                _split = []
                for _docid in _group["docid"]:
                    _split.append(dict_docid_doc[_docid])
                # sort by confidence to keep as much of the group as possible
                _split.sort(key=lambda x:x["confidence"],reverse=True)
                # confidence
                list_key_index = []
                for _k in keys:
                    list_key_index.append(getDiffIndex(_split,_k))
                _index = min(list_key_index)
                final_group.append([_c["docid"] for _c in _split[:_index]])
                for _c in _split[_index:]:
                    final_group.append([_c["docid"]])
                # if more than one distinct value is found, put every doc in its own group; otherwise keep one group
                # _flag = True
                # for _key in keys:
                #     if len(getSet(_split,_key))>1:
                #         _flag = False
                #         break
                # if not _flag:
                #     for _docid in _group["docid"]:
                #         final_group.append([_docid])
                # else:
                #     final_group.append(list(set(_group["docid"])))
        else:
            final_group = [list(set([item["docid"] for item in the_group]))]
        log(str(final_group))
        return json.dumps(final_group)
def getCurrent_date(format="%Y-%m-%d %H:%M:%S"):
    _time = time.strftime(format,time.localtime())
    return _time
@annotate('bigint->string')
class f_get_single_merged_bychannel(BaseUDTF):
    def process(self,docid):
        _d = {"data":{str(docid):[]},"process_time":getCurrent_date()}
        self.forward(json.dumps(_d))
@annotate('bigint,bigint,bigint,string,string,string,string,string,string,string,bigint,bigint,string->string')
class f_remege_limit_num_contain_bychannel(BaseUDAF):
    '''f_remege_limit_num_contain_bychannel
    Merge constraints: project code and winning bidder, len(project code)>7,
    winning bidder <> "", fewer than 2 distinct non-empty tenderees after the
    merge, and identical non-empty amounts per announcement type after the merge
    '''
    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer,docid,docchannel,page_time_stamp,set_limit_column1,set_limit_column2,set_limit_column3,set_limit_column4,contain_column1,contain_column2,notLike_column,confidence,extract_count,json_dicttime):
        _dict = {"docid":docid,"docchannel":docchannel,"page_time_stamp":page_time_stamp,"set_limit_column1":set_limit_column1,
                 "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,
                 "contain_column1":contain_column1,"contain_column2":contain_column2,"notLike_column":notLike_column,"confidence":confidence,
                 "extract_count":extract_count,"json_dicttime":json_dicttime}
        buffer[0].append(_dict)

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def getNotLikeSet(self,_dict,column_name):
        column_value = _dict.get(column_name,None)
        _set = set()
        if column_value is not None:
            for _i in range(1,len(column_value)):
                _set.add(column_value[_i-1:_i+1])
        _dict["notLike_set"] = _set

    def getSimilarity(self,_set1,_set2):
        _sum = max([1,min([len(_set1),len(_set2)])])
        return len(_set1&_set2)/_sum

    def difftimecount(self,_dict1,_dict2):
        _count = 0
        for k,v in _dict1.items():
            if v is not None and v!="":
                v1 = _dict2.get(k)
                if v1 is not None and v1!="":
                    if v!=v1:
                        _count += 1
        return _count

    def splitByTimezone(self,list_dict,_key):
        cluster_docid = []
        dict_docid_key = {}
        dict_docid = {}
        for _dict in list_dict:
            if _dict.get(_key,"") is None or _dict.get(_key,"")=="":
                dict_docid_key[_dict.get("docid")] = {}
            else:
                dict_docid_key[_dict.get("docid")] = json.loads(_dict.get(_key))
            dict_docid[_dict.get("docid")] = _dict
        for _dict in list_dict:
            _find = False
            for _cl in cluster_docid:
                _legal = True
                for _c in _cl:
                    if self.difftimecount(dict_docid_key.get(_c),dict_docid_key.get(_dict.get("docid")))>0:
                        _legal = False
                        break
                if _legal:
                    _cl.append(_dict.get("docid"))
                    _find = True
            if not _find:
                cluster_docid.append([_dict.get("docid")])
        _result = []
        for _cl in cluster_docid:
            _r = []
            for _c in _cl:
                _r.append(dict_docid.get(_c))
            _result.append(_r)
        return _result

    def terminate(self, buffer):
        list_group = []
        the_group = buffer[0]
        SIM_PROB = 0.6
        for _d in the_group:
            self.getNotLikeSet(_d,"notLike_column")
        # check whether any key has more than one distinct value
        keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"]
        re_merge = False
        for _key in keys:
            if len(getSet(the_group,_key))>1:
                re_merge = True
                break
        # check whether any pair is similar but not identical
        re_merge_sim = False
        for _i1 in range(0,len(the_group)):
            for _j1 in range(_i1+1,len(the_group)):
                _set1 = the_group[_i1]["notLike_set"]
                _set2 = the_group[_j1]["notLike_set"]
                _sim = self.getSimilarity(_set1,_set2)
                if _sim>SIM_PROB and _sim<1:
                    re_merge_sim = True
                    break
        contain_keys = ["contain_column1","contain_column2"]
        logging.info(the_group)
        logging.info(str(re_merge)+str(re_merge_sim))
        # regroup
        dict_docid_doc = {}
        for _doc in the_group:
            dict_docid_doc[_doc["docid"]] = _doc
        if re_merge or re_merge_sim:
            the_group.sort(key=lambda x:x["confidence"],reverse=True)
            the_group.sort(key=lambda x:x["page_time_stamp"])
            for _doc in the_group:
                merge_flag = False
                for _index in range(len(list_group)):
                    _g = list_group[_index]
                    hit_count = 0
                    dict_temp = dict()
                    # anomaly: a key has multiple distinct values
                    if re_merge:
                        for _c_key in contain_keys:
                            dict_temp[_c_key] = _g[_c_key]
                            if _g[_c_key] is not None and _doc[_c_key] is not None:
                                if len(_g[_c_key])>len(_doc[_c_key]):
                                    if str(_g[_c_key]).find(str(_doc[_c_key]))>=0:
                                        dict_temp[_c_key] = _g[_c_key]
                                        hit_count += 1
                                else:
                                    if str(_doc[_c_key]).find(str(_g[_c_key]))>=0:
                                        dict_temp[_c_key] = _doc[_c_key]
                                        _g[_c_key] = _doc[_c_key]
                                        hit_count += 1
                    else:
                        hit_count = 1
                    # if hit_count==len(contain_keys):
                    if hit_count>0:
                        _flag_sim = False
                        # anomaly: similar but not identical
                        if re_merge_sim:
                            for _docid in _g["docid"]:
                                tmp_d = dict_docid_doc[_docid]
                                _sim = self.getSimilarity(tmp_d["notLike_set"],_doc["notLike_set"])
                                if _sim>SIM_PROB and _sim<1:
                                    _flag_sim = True
                        if not _flag_sim:
                            for _c_key in dict_temp.keys():
                                _g[_c_key] = dict_temp[_c_key]
                            _g["docid"].append(_doc["docid"])
                            merge_flag = True
                            break
                if not merge_flag:
                    _dict = dict()
                    _dict["docid"] = [_doc["docid"]]
                    for _c_key in contain_keys:
                        _dict[_c_key] = _doc[_c_key]
                    list_group.append(_dict)
            final_group = []
            # check that every key collapses to a single value inside each group
            for _group in list_group:
                _split = []
                for _docid in _group["docid"]:
                    _split.append(dict_docid_doc[_docid])
                # sort by confidence to keep as much of the group as possible
                _split.sort(key=lambda x:x["confidence"],reverse=True)
                # confidence
                list_key_index = []
                for _k in keys:
                    list_key_index.append(getDiffIndex(_split,_k))
                _index = min(list_key_index)
                final_group.append([_c["docid"] for _c in _split[:_index]])
                for _c in _split[_index:]:
                    final_group.append([_c["docid"]])
                # if more than one distinct value is found, put every doc in its own group; otherwise keep one group
                # _flag = True
                # for _key in keys:
                #     if len(getSet(_split,_key))>1:
                #         _flag = False
                #         break
                # if not _flag:
                #     for _docid in _group["docid"]:
                #         final_group.append([_docid])
                # else:
                #     final_group.append(list(set(_group["docid"])))
        else:
            final_group = [list(set([item["docid"] for item in the_group]))]
        log(str(final_group))
        # pick one announcement per channel
        final_group_channel = []
        for _group in final_group:
            dict_channel_id = {}
            otherChannel = 10000
            for _docid in _group:
                _channel = dict_docid_doc[_docid].get("docchannel")
                if _channel in [114,115,116,117]:
                    otherChannel += 1
                    _channel = otherChannel
                if _channel not in dict_channel_id:
                    dict_channel_id[_channel] = []
                dict_channel_id[_channel].append({"docid":_docid,"page_time_stamp":dict_docid_doc[_docid].get("page_time_stamp"),
                                                  "extract_count":dict_docid_doc[_docid].get("extract_count"),
                                                  "json_dicttime":dict_docid_doc[_docid].get("json_dicttime")})
            # split each channel's documents by date
            new_dict_channel_id = {}
            print(dict_channel_id)
            for k,v in dict_channel_id.items():
                list_time_docids = split_with_time(v,"page_time_stamp",86400*6)
                print(list_time_docids)
                for _l in list_time_docids:
                    list_t = self.splitByTimezone(_l,"json_dicttime")
                    for _t in list_t:
                        otherChannel += 1
                        new_dict_channel_id[otherChannel] = _t
            print(new_dict_channel_id)
            channel_dict = {}
            for k,v in new_dict_channel_id.items():
                v.sort(key=lambda x:x["docid"])
                v.sort(key=lambda x:x["extract_count"],reverse=True)
                channel_dict[v[0]["docid"]] = []
                for _docs in v[1:]:
                    channel_dict[v[0]["docid"]].append(_docs["docid"])
            _d = {"data":channel_dict,"process_time":getCurrent_date()}
            final_group_channel.append(_d)
        return json.dumps(final_group_channel)
@annotate('string -> string')
class f_get_remerge_group_channel(BaseUDTF):
    '''
    Split multiple groups into separate records
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,json_remerge):
        if json_remerge is not None:
            list_group = json.loads(json_remerge)
            for _group in list_group:
                self.forward(json.dumps(_group))
@annotate('string -> string')
class f_get_remerge_group(BaseUDTF):
    '''
    Split multiple groups into separate records
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,json_remerge):
        if json_remerge is not None:
            list_group = json.loads(json_remerge)
            for _group in list_group:
                l_g = list(set(_group))
                l_g.sort(key=lambda x:x)
                list_docid = [str(_docid) for _docid in l_g]
                self.forward(",".join(list_docid))
@annotate('bigint,bigint,string->string')
class f_merge_probability(BaseUDAF):
    '''
    Merge a group into a single record
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer,docid,page_time_stamp,_type):
        buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"type":_type})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_dict = buffer[0]
        list_dict = list_dict[:10000]
        list_group = split_with_time(list_dict,sort_key="page_time_stamp",timedelta=86400*120)
        return json.dumps(list_group)
@annotate('string -> bigint,bigint,bigint,bigint,string')
class f_split_merge_probability(BaseUDTF):
    def __init__(self):
        import logging
        import json
        global logging,json
        logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,list_group_str):
        logging.info("0")
        logging.info(list_group_str)
        if list_group_str is not None:
            logging.info("1")
            try:
                list_group = json.loads(list_group_str)
                logging.info("2")
                for _group in list_group:
                    if len(_group)>0:
                        _type = _group[0].get("type","")
                        logging.info("3%d"%len(list_group))
                        # _group.sort(key=lambda x:x["page_time_stamp"])
                        _len = min(100,len(_group))
                        for _index_i in range(_len):
                            _count = 0
                            for _index_j in range(_index_i+1,_len):
                                if abs(_group[_index_j]["page_time_stamp"]-_group[_index_i]["page_time_stamp"])>86400*120:
                                    break
                                _count += 1
                                _docid1 = _group[_index_i]["docid"]
                                _docid2 = _group[_index_j]["docid"]
                                if _docid1<_docid2:
                                    self.forward(_docid1,_docid2,1,_len,_type)
                                else:
                                    self.forward(_docid2,_docid1,1,_len,_type)
            except Exception as e:
                logging.info(str(e))
@annotate('bigint,bigint,string->string')
class f_merge_groupPairs(BaseUDAF):
    '''
    Merge a group into a single record
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer,is_exists,counts,_type):
        buffer[0].append({"is_exists":is_exists,"counts":counts,"_type":_type})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_dict = buffer[0]
        list_dict = list_dict[:10000]
        return json.dumps(list_dict)
@annotate("string -> bigint,bigint,bigint")
class f_merge_getLabel(BaseUDTF):
    def __init__(self):
        import logging
        import json
        global logging,json

    def process(self,str_docids):
        if str_docids is not None:
            list_docids = [int(i) for i in str_docids.split(",")]
            list_docids.sort(key=lambda x:x)
            _len = min(100,len(list_docids))
            for index_i in range(_len):
                docid_less = list_docids[index_i]
                for index_j in range(index_i+1,_len):
                    docid_greater = list_docids[index_j]
                    self.forward(docid_less,docid_greater,1)
def getSimilarityOfString(str1,str2):
    _set1 = set()
    _set2 = set()
    if str1 is not None:
        for i in range(1,len(str1)):
            _set1.add(str1[i-1:i+1])
    if str2 is not None:
        for i in range(1,len(str2)):
            _set2.add(str2[i-1:i+1])
    _len = max(1,min(len(_set1),len(_set2)))
    return len(_set1&_set2)/_len
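
# Worked example for getSimilarityOfString (hypothetical strings): "ABCD" and
# "ABCE" share the bigrams {"AB","BC"} out of three each, so the score is 2/3.
def _demo_similarity():
    return getSimilarityOfString("ABCD", "ABCE")  # expected: 0.666...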
def check_columns(tenderee_less,tenderee_greater,
                  agency_less,agency_greater,project_code_less,project_code_greater,project_name_less,project_name_greater,
                  win_tenderer_less,win_tenderer_greater,win_bid_price_less,win_bid_price_greater,
                  bidding_budget_less,bidding_budget_greater,doctitle_refine_less,doctitle_refine_greater):
    flag = True
    _set_tenderee = set()
    if tenderee_less is not None and tenderee_less!="":
        _set_tenderee.add(tenderee_less)
    if tenderee_greater is not None and tenderee_greater!="":
        _set_tenderee.add(tenderee_greater)
    if len(_set_tenderee)>1:
        return False
    code_sim = getSimilarityOfString(project_code_less,project_code_greater)
    if code_sim>0.6 and code_sim<1:
        return False
    # same batch but different project codes
    if getLength(project_code_less)>0 and getLength(project_code_greater)>0:
        _split_code_less = project_code_less.split("-")
        _split_code_greater = project_code_greater.split("-")
        if len(_split_code_less)>1 and len(_split_code_greater)>1:
            if _split_code_less[0]==_split_code_greater[0] and project_code_less!=project_code_greater:
                return False
    _set_win_tenderer = set()
    if win_tenderer_less is not None and win_tenderer_less!="":
        _set_win_tenderer.add(win_tenderer_less)
    if win_tenderer_greater is not None and win_tenderer_greater!="":
        _set_win_tenderer.add(win_tenderer_greater)
    if len(_set_win_tenderer)>1:
        return False
    _set_win_bid_price = set()
    if win_bid_price_less is not None and win_bid_price_less!="":
        _set_win_bid_price.add(float(win_bid_price_less))
    if win_bid_price_greater is not None and win_bid_price_greater!="":
        _set_win_bid_price.add(float(win_bid_price_greater))
    if len(_set_win_bid_price)>1:
        return False
    _set_bidding_budget = set()
    if bidding_budget_less is not None and bidding_budget_less!="":
        _set_bidding_budget.add(float(bidding_budget_less))
    if bidding_budget_greater is not None and bidding_budget_greater!="":
        _set_bidding_budget.add(float(bidding_budget_greater))
    if len(_set_bidding_budget)>1:
        return False
    return True
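
# A minimal sketch (hypothetical values): check_columns rejects a pair whose
# project codes are similar but not identical (bigram similarity 0.75 here),
# even when every other field is empty.
def _demo_check_columns():
    return check_columns(None, None, None, None,
                         "A-001", "A-002", None, None,
                         None, None, None, None,
                         None, None, None, None)  # expected: False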
def getSimLevel(str1,str2):
    str1_null = False
    str2_null = False
    _v = 0
    if str1 is None or str1=="":
        str1_null = True
    if str2 is None or str2=="":
        str2_null = True
    if str1_null and str2_null:
        _v = 2
    elif str1_null and not str2_null:
        _v = 4
    elif not str1_null and str2_null:
        _v = 6
    elif not str1_null and not str2_null:
        if str1==str2:
            _v = 10
        else:
            _v = 0
    return _v
import math

def featurnCount(_count,max_count=100):
    return max(0,min(1,_count))*(1/math.sqrt(max(1,_count-1)))
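
# Worked values for featurnCount (the count is clipped to [0,1] and damped by
# 1/sqrt(max(1,count-1))):
#   featurnCount(0) == 0.0, featurnCount(1) == 1.0,
#   featurnCount(5) == 0.5, featurnCount(10) ~= 0.333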

def getLength(_str):
    return len(_str if _str is not None else "")
@annotate("string->bigint")
class f_get_min_counts(object):
    def evaluate(self,json_context):
        _context = json.loads(json_context)
        min_counts = 100
        for item in _context:
            if item["counts"]<min_counts:
                min_counts = item["counts"]
        return min_counts
@annotate("string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string->string,double")
class f_merge_featureMatrix(BaseUDTF):
    def __init__(self):
        import logging
        import json
        global logging,json

    def process(self,json_context,tenderee_less,tenderee_greater,
                agency_less,agency_greater,project_code_less,project_code_greater,project_name_less,project_name_greater,
                win_tenderer_less,win_tenderer_greater,win_bid_price_less,win_bid_price_greater,
                bidding_budget_less,bidding_budget_greater,doctitle_refine_less,doctitle_refine_greater):
        if not check_columns(tenderee_less,tenderee_greater,
                             agency_less,agency_greater,project_code_less,project_code_greater,project_name_less,project_name_greater,
                             win_tenderer_less,win_tenderer_greater,win_bid_price_less,win_bid_price_greater,
                             bidding_budget_less,bidding_budget_greater,doctitle_refine_less,doctitle_refine_greater):
            return
        _context = json.loads(json_context)
        min_counts = 100
        dict_context = {}
        for item in _context:
            if item["counts"]<min_counts:
                min_counts = item["counts"]
            dict_context[item["_type"]] = [item["is_exists"],item["counts"]]
        context_key = ["tenderee","agency","project_code","project_name","win_tenderer","win_bid_price","bidding_budget","doctitle_refine"]
        list_matrix = []
        for index_i in range(len(context_key)):
            for index_j in range(index_i+1,len(context_key)):
                _key = "%s&%s"%(context_key[index_i],context_key[index_j])
                _v = featurnCount(dict_context.get(_key,[0,0])[1])
                list_matrix.append(_v)
        context3_key = ["tenderee","agency","win_tenderer","win_bid_price","bidding_budget"]
        for index_i in range(len(context3_key)):
            for index_j in range(index_i+1,len(context3_key)):
                for index_k in range(index_j+1,len(context3_key)):
                    _key = "%s&%s&%s"%(context3_key[index_i],context3_key[index_j],context3_key[index_k])
                    _v = featurnCount(dict_context.get(_key,[0,0])[1])
                    list_matrix.append(_v)
        list_matrix.append(getSimLevel(tenderee_less,tenderee_greater)/10)
        list_matrix.append(getSimLevel(agency_less,agency_greater)/10)
        list_matrix.append(getSimilarityOfString(project_code_less,project_code_greater))
        list_matrix.append(getSimilarityOfString(project_name_less,project_name_greater))
        list_matrix.append(getSimLevel(win_tenderer_less,win_tenderer_greater)/10)
        list_matrix.append(getSimLevel(win_bid_price_less,win_bid_price_greater)/10)
        list_matrix.append(getSimLevel(bidding_budget_less,bidding_budget_greater)/10)
        list_matrix.append(getSimilarityOfString(doctitle_refine_less,doctitle_refine_greater))
        # set_tenderer = set()
        # if tenderee_less is not None and tenderee_less!="":
        #     set_tenderer.add(tenderee_less)
        # if tenderee_greater is not None and tenderee_greater!="":
        #     set_tenderer.add(tenderee_greater)
        #
        # set_win_tenderer = set()
        # if win_tenderer_less is not None and win_tenderer_less!="":
        #     set_win_tenderer.add(win_tenderer_less)
        # if win_tenderer_greater is not None and win_tenderer_greater!="":
        #     set_win_tenderer.add(win_tenderer_greater)
        #
        # set_bidding_budget = set()
        # if bidding_budget_less is not None and bidding_budget_less!="":
        #     set_bidding_budget.add(bidding_budget_less)
        # if bidding_budget_greater is not None and bidding_budget_greater!="":
        #     set_bidding_budget.add(bidding_budget_greater)
        #
        # set_win_bid_price = set()
        # if win_bid_price_less is not None and win_bid_price_less!="":
        #     set_win_bid_price.add(win_bid_price_less)
        # if win_bid_price_greater is not None and win_bid_price_greater!="":
        #     set_win_bid_price.add(win_bid_price_greater)
        json_matrix = json.dumps(list_matrix)
        same_project_code = False
        if project_code_less==project_code_greater and getLength(project_code_less)>0:
            same_project_code = True
        same_project_name = False
        if project_name_less==project_name_greater and getLength(project_name_less)>0:
            same_project_name = True
        same_doctitle_refine = False
        if doctitle_refine_less==doctitle_refine_greater and getLength(doctitle_refine_less)>0:
            same_doctitle_refine = True
        same_tenderee = False
        if tenderee_less==tenderee_greater and getLength(tenderee_less)>0:
            same_tenderee = True
        same_agency = False
        if agency_less==agency_greater and getLength(agency_less)>0:
            same_agency = True
        same_bidding_budget = False
        if bidding_budget_less==bidding_budget_greater and getLength(bidding_budget_less)>0:
            same_bidding_budget = True
        same_win_tenderer = False
        if win_tenderer_less==win_tenderer_greater and getLength(win_tenderer_less)>0:
            same_win_tenderer = True
        same_win_bid_price = False
        if win_bid_price_less==win_bid_price_greater and getLength(win_bid_price_less)>0:
            same_win_bid_price = True
        contain_doctitle = False
        if getLength(doctitle_refine_less)>0 and getLength(doctitle_refine_greater)>0 and (doctitle_refine_less in doctitle_refine_greater or doctitle_refine_greater in doctitle_refine_less):
            contain_doctitle = True
        contain_project_name = False
        if getLength(project_name_less)>0 and getLength(project_name_greater)>0 and (project_name_less in project_name_greater or project_name_greater in project_name_less):
            contain_project_name = True
        # sum of bidding budget and winning bid price, treating missing values as 0
        total_money_less = (0 if getLength(bidding_budget_less)==0 else float(bidding_budget_less))+(0 if getLength(win_bid_price_less)==0 else float(win_bid_price_less))
        total_money_greater = (0 if getLength(bidding_budget_greater)==0 else float(bidding_budget_greater))+(0 if getLength(win_bid_price_greater)==0 else float(win_bid_price_greater))
        if min_counts<10:
            _prob = 0.9
            if same_project_code and same_win_tenderer and same_tenderee:
                self.forward(json_matrix,_prob)
                return
            if same_tenderee and same_project_name and same_win_tenderer:
                self.forward(json_matrix,_prob)
                return
            if same_tenderee and same_doctitle_refine and same_win_tenderer:
                self.forward(json_matrix,_prob)
                return
            if same_tenderee and same_win_bid_price and same_win_tenderer:
                self.forward(json_matrix,_prob)
                return
            if same_project_code and same_win_bid_price and same_win_tenderer:
                self.forward(json_matrix,_prob)
                return
            if same_project_name and same_win_bid_price and same_win_tenderer:
                self.forward(json_matrix,_prob)
                return
            if same_doctitle_refine and same_win_bid_price and same_win_tenderer:
                self.forward(json_matrix,_prob)
                return
            if same_doctitle_refine and same_bidding_budget and same_win_tenderer:
                self.forward(json_matrix,_prob)
                return
            if same_tenderee and same_doctitle_refine and same_win_tenderer:
                self.forward(json_matrix,_prob)
                return
            if same_tenderee and same_project_code and same_project_name:
                self.forward(json_matrix,_prob)
                return
            if same_tenderee and same_project_code and same_doctitle_refine:
                self.forward(json_matrix,_prob)
                return
            if same_tenderee and same_bidding_budget and same_project_code:
                self.forward(json_matrix,_prob)
                return
            if same_tenderee and same_bidding_budget and same_doctitle_refine:
                self.forward(json_matrix,_prob)
                return
            if same_tenderee and same_bidding_budget and same_project_name:
                self.forward(json_matrix,_prob)
                return
            if same_doctitle_refine and same_project_code and same_project_name:
                self.forward(json_matrix,_prob)
                return
        if min_counts<=5:
            _prob = 0.8
            if same_project_code and same_tenderee:
                self.forward(json_matrix,_prob)
                return
            if same_project_code and same_win_tenderer:
                self.forward(json_matrix,_prob)
                return
            if same_project_name and same_project_code:
                self.forward(json_matrix,_prob)
                return
            if same_project_code and same_doctitle_refine:
                self.forward(json_matrix,_prob)
                return
            if total_money_less==total_money_greater and total_money_less>100000:
                if same_win_tenderer and (same_win_bid_price or same_bidding_budget):
                    self.forward(json_matrix,_prob)
                    return
            if same_project_code and same_bidding_budget:
                self.forward(json_matrix,_prob)
                return
            if same_project_code and same_win_bid_price:
                self.forward(json_matrix,_prob)
                return
            if same_bidding_budget and same_win_bid_price and (contain_project_name or contain_doctitle):
                self.forward(json_matrix,_prob)
                return
        if min_counts<=3:
            _prob = 0.7
            if same_project_name or same_project_code or same_doctitle_refine or contain_doctitle or contain_project_name:
                self.forward(json_matrix,_prob)
                return
        self.forward(json_matrix,0)
class MergePredictor():
    def __init__(self):
        self.input_size = 46
        self.output_size = 2
        self.matrix = np.array([[-5.817399024963379, 3.367797374725342], [-18.3098201751709, 17.649206161499023], [-7.115952014923096, 9.236002922058105], [-5.054129123687744, 1.8316771984100342], [6.391637325286865, -7.57396125793457], [-2.8721542358398438, 6.826520919799805], [-5.426159858703613, 10.235260009765625], [-4.240962982177734, -0.32092899084091187], [-0.6378090381622314, 0.4834124445915222], [-1.7574478387832642, -0.17846578359603882], [4.325063228607178, -2.345501661300659], [0.6086963415145874, 0.8325914740562439], [2.5674285888671875, 1.8432368040084839], [-11.195490837097168, 17.4630184173584], [-11.334247589111328, 10.294097900390625], [2.639320135116577, -8.072785377502441], [-2.2689898014068604, -3.6194612979888916], [-11.129570960998535, 18.907018661499023], [4.526485919952393, 4.57423210144043], [-3.170452356338501, -1.3847776651382446], [-0.03280467540025711, -3.0471489429473877], [-6.601675510406494, -10.05613899230957], [-2.9116673469543457, 4.819308280944824], [1.4398306608200073, -0.6549674272537231], [7.091512203216553, -0.142232745885849], [-0.14478975534439087, 0.06628061085939407], [-6.775437831878662, 9.279582023620605], [-0.006781991105526686, 1.6472798585891724], [3.83730149269104, 1.4072834253311157], [1.2229349613189697, -2.1653425693511963], [1.445560336112976, -0.8397432565689087], [-11.325132369995117, 11.231744766235352], [2.3229124546051025, -4.623719215393066], [0.38562265038490295, -1.2645516395568848], [-1.3670002222061157, 2.4323790073394775], [-3.6994268894195557, 0.7515658736228943], [-0.11617227643728256, -0.820703387260437], [4.089913368225098, -4.693605422973633], [-0.4959050714969635, 1.5272167921066284], [-2.7135870456695557, -0.5120691657066345], [0.573157548904419, -1.9375460147857666], [-4.262857437133789, 0.6375582814216614], [-1.8825865983963013, 2.427532911300659], [-4.565115451812744, 4.0269083976745605], [-4.339804649353027, 6.754288196563721], [-4.31907320022583, 0.28193211555480957]])
        self.bias = np.array([16.79706382751465, -13.713337898254395])
        # self.model = load_model("model/merge.h5",custom_objects={"precision":precision,"recall":recall,"f1_score":f1_score})

    def activation(self,vec,_type):
        if _type=="relu":
            _vec = np.array(vec)
            return _vec*(_vec>0)
        if _type=="tanh":
            return np.tanh(vec)
        if _type=="softmax":
            _vec = np.array(vec)
            _exp = np.exp(_vec)
            return _exp/np.sum(_exp)

    def predict(self,input):
        _out = self.activation(self.activation(np.matmul(np.array(input).reshape(-1,self.input_size),self.matrix)+self.bias,"tanh"),"softmax")
        # print(self.model.predict(np.array(input).reshape(-1,46)))
        return _out
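
# A minimal sketch of how MergePredictor is used (it mirrors f_getMergeProb
# below): the input is the 46-dimensional feature vector built by
# f_merge_featureMatrix, and the second softmax output is read as the merge
# probability. numpy must already be importable as the global `np`, which
# f_getMergeProb arranges via include_package_path("numpy-1.18.zip").
#
#   mp = MergePredictor()
#   merge_prob = mp.predict([0.0] * 46)[0][1]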
@annotate('string,double -> double')
class f_getMergeProb(BaseUDTF):
    def __init__(self):
        import json
        include_package_path("numpy-1.18.zip")
        import numpy as np
        global json,np
        self.mp = MergePredictor()

    def process(self,json_matrix,pre_prob):
        if not pre_prob>0.5:
            _matrix = json.loads(json_matrix)
            _prob = self.mp.predict(_matrix)[0][1]
        else:
            _prob = pre_prob
        if _prob>0.5:
            self.forward(float(_prob))
@annotate('string -> bigint,bigint')
class f_check_remerge_channel(BaseUDTF):
    '''
    Split multiple groups into separate records
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,json_remerge):
        if json_remerge is not None:
            list_group = json.loads(json_remerge)
            for _group in list_group:
                _keys = _group.get("data").keys()
                if len(_keys)>0:
                    main_docid = int(list(_keys)[0])
                    for k,v in _group.get("data",{}).items():
                        self.forward(main_docid,int(k))
                        for _v in v:
                            self.forward(main_docid,int(_v))
@annotate('string -> bigint,bigint')
class f_check_remerge(BaseUDTF):
    '''
    Split multiple groups into separate records
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,json_remerge):
        if json_remerge is not None:
            list_group = json.loads(json_remerge)
            for _group in list_group:
                for _docid in _group:
                    self.forward(_group[-1],_docid)
def getConfidence(rule_id):
    if rule_id >=1 and rule_id <=20:
        return 30
    elif rule_id>=31 and rule_id<=50:
        return 20
    else:
        return 10
@annotate('string,bigint -> bigint,bigint,bigint')
class f_arrange_group_single(BaseUDTF):
    '''
    Split multiple groups into separate records
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,json_set_docid,rule_id):
        if json_set_docid is not None:
            list_group = json.loads(json_set_docid)
            for _group in list_group:
                for index_i in range(len(_group)):
                    for index_j in range(len(_group)):
                        # if index_i!=index_j and _group[index_i]!=_group[index_j]:
                        if index_i!=index_j:
                            self.forward(_group[index_i],_group[index_j],getConfidence(rule_id))
@annotate('bigint,bigint->string')
class f_get_merge_docids(BaseUDAF):
    '''
    Merge a group into a single record
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [set()]

    def iterate(self, buffer,docid1,docid2):
        buffer[0].add(docid1)
        buffer[0].add(docid2)

    def merge(self, buffer, pbuffer):
        buffer[0] |= pbuffer[0]

    def terminate(self, buffer):
        set_docid = buffer[0]
        list_docid = list(set_docid)
        list_docid.sort(key=lambda x:x)
        list_docid_str = []
        for _docid in list_docid:
            list_docid_str.append(str(_docid))
        return ",".join(list_docid_str)
@annotate("string,string,string,string,string,string,string,string,string,string,string,string,string,string->string")
class f_encode_time(object):
    def evaluate(self,time_bidclose,time_bidopen,time_bidstart,time_commencement,time_completion,time_earnest_money_end,time_earnest_money_start,time_get_file_end,time_get_file_start,time_publicity_end,time_publicity_start,time_registration_end,time_registration_start,time_release):
        _dict = {"time_bidclose":time_bidclose,"time_bidopen":time_bidopen,"time_bidstart":time_bidstart,
                 "time_commencement":time_commencement,"time_completion":time_completion,"time_earnest_money_end":time_earnest_money_end,
                 "time_earnest_money_start":time_earnest_money_start,"time_get_file_end":time_get_file_end,"time_get_file_start":time_get_file_start,
                 "time_publicity_end":time_publicity_end,"time_publicity_start":time_publicity_start,"time_registration_end":time_registration_end,
                 "time_registration_start":time_registration_start,"time_release":time_release}
        _encode = json.dumps(_dict)
        return _encode
if __name__ == '__main__':
    a = f_remege_limit_num_contain_bychannel()
    buffer = a.new_buffer()
    a.iterate(buffer,1,1,86400*1,"1","1","1","1","1","1","1",5,5,None)
    a.iterate(buffer,3,1,86400*4,"1","1","1","1","1","1","1",5,5,'{"a":"dbb"}')
    a.iterate(buffer,5,1,86400*10,"1","1","1","1","1","1","1",5,5,"{}")
    print(a.terminate(buffer))
    print(1)