#coding:UTF8
# documentMerge.py
from odps.udf import annotate
from odps.distcache import get_cache_archive
from odps.distcache import get_cache_file
from odps.udf import BaseUDTF,BaseUDAF
import threading
import logging
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
import time
import json

def log(msg):
    logging.info(msg)

# set up sys.path for dependency packages such as pandas that are uploaded as archive resources
def include_package_path(res_name):
    import os, sys
    archive_files = get_cache_archive(res_name)
    dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files
                        if '.dist_info' not in f.name], key=lambda v: len(v))
    _path = dir_names[0].split(".zip/files")[0]+".zip/files"
    log("add path:%s"%(_path))
    sys.path.append(_path)
    return os.path.dirname(dir_names[0])
# A RuntimeError like "xxx has been blocked by sandbox" may occur here, because libraries
# containing C extensions are blocked by the sandbox; set odps.isolation.session.enable = true to allow them.
def include_file(file_name):
    import os, sys
    so_file = get_cache_file(file_name)
    sys.path.append(os.path.dirname(os.path.abspath(so_file.name)))

def include_so(file_name):
    import os, sys
    so_file = get_cache_file(file_name)
    with open(so_file.name, 'rb') as fp:
        content = fp.read()
    so = open(file_name, "wb")
    so.write(content)
    so.flush()
    so.close()

# Initialize the business data packages. Because of upload size limits and inconsistencies in
# Python versions and archive extraction, the packages have to be unpacked and imported manually.
def init_env(list_files, package_name):
    import os, sys
    if len(list_files) == 1:
        so_file = get_cache_file(list_files[0])
        cmd_line = os.path.abspath(so_file.name)
        os.system("unzip -o %s -d %s" % (cmd_line, package_name))
    elif len(list_files) > 1:
        cmd_line = "cat"
        for _file in list_files:
            so_file = get_cache_file(_file)
            cmd_line += " " + os.path.abspath(so_file.name)
        cmd_line += " > temp.zip"
        os.system(cmd_line)
        os.system("unzip -o temp.zip -d %s" % (package_name))
    # os.system("rm -rf %s/*.dist-info"%(package_name))
    # return os.listdir(os.path.abspath("local_package"))
    # os.system("echo export LD_LIBRARY_PATH=%s >> ~/.bashrc"%(os.path.abspath("local_package")))
    # os.system("source ~/.bashrc")
    sys.path.insert(0, os.path.abspath(package_name))
    # sys.path.append(os.path.join(os.path.abspath("local_package"),"interface_real"))

import platform

def getSet(list_dict, key):
    _set = set()
    for item in list_dict:
        if key in item:
            if item[key] != '' and item[key] is not None:
                if re.search(r"^[\d\.]+$", item[key]) is not None:
                    _set.add(str(float(item[key])))
                else:
                    _set.add(str(item[key]))
    return _set

def split_with_time(list_dict, sort_key, timedelta=86400*120, more_than_one=True):
    group_num = 1
    if more_than_one:
        group_num = 2
    if len(list_dict) > 0:
        if sort_key in list_dict[0]:
            list_dict.sort(key=lambda x: x[sort_key])
            list_group = []
            _begin = 0
            for i in range(len(list_dict)-1):
                if abs(list_dict[i][sort_key]-list_dict[i+1][sort_key]) < timedelta:
                    continue
                else:
                    _group = []
                    for j in range(_begin, i+1):
                        _group.append(list_dict[j])
                    if len(_group) > 1:
                        list_group.append(_group)
                    _begin = i + 1
            if len(list_dict) >= group_num:
                _group = []
                for j in range(_begin, len(list_dict)):
                    _group.append(list_dict[j])
                if len(_group) > 0:
                    list_group.append(_group)
            return list_group
    return [list_dict]
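
# A minimal usage sketch of split_with_time (hypothetical records, not production data):
# documents are sorted by page_time_stamp and cut into groups wherever the gap between two
# neighbouring documents reaches `timedelta` seconds.
def _example_split_with_time():
    _records = [{"docid": 1, "page_time_stamp": 0},
                {"docid": 2, "page_time_stamp": 86400},         # one day later -> same group
                {"docid": 3, "page_time_stamp": 86400 * 200}]   # 200 days later -> new group
    # with the default 120-day window the first two records stay together and the third
    # ends up in a separate trailing group
    return split_with_time(_records, "page_time_stamp", timedelta=86400 * 120)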

@annotate('bigint,bigint,string,string,string,string,string,string,bigint->string')
class f_merge_rule_limit_num_contain_greater(BaseUDAF):
    '''
    Merge rule: project code, winning bidder, len(project code)>7, winning bidder <> "",
    fewer than 2 distinct non-empty tenderees after merging, and identical non-empty amounts
    for announcements of the same type after merging.
    '''
    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer, docid, page_time_stamp, set_limit_column1, set_limit_column2, set_limit_column3, set_limit_column4, contain_column, greater_column, MAX_NUM):
        buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"set_limit_column1":set_limit_column1,
                          "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,
                          "contain_column":contain_column,"greater_column":greater_column,"MAX_NUM":MAX_NUM})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        MAX_NUM = 5
        if len(buffer[0]) > 0:
            MAX_NUM = buffer[0][0]["MAX_NUM"]
        list_split = split_with_time(buffer[0], "page_time_stamp")
        list_group = []
        for _split in list_split:
            flag = True
            keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"]
            dict_set = {}
            for _key in keys:
                dict_set[_key] = set()
            if len(_split) > MAX_NUM:
                flag = False
            else:
                for _key in keys:
                    logging.info(_key+str(getSet(_split,_key)))
                    if len(getSet(_split,_key)) > 1:
                        flag = False
                        break
                MAX_CONTAIN_COLUMN = None
                # check whether every announcement in the group is contained by the longest contain_column
                if flag:
                    for _d in _split:
                        contain_column = _d["contain_column"]
                        if contain_column is not None and contain_column != "":
                            if MAX_CONTAIN_COLUMN is None:
                                MAX_CONTAIN_COLUMN = contain_column
                            else:
                                if len(MAX_CONTAIN_COLUMN) < len(contain_column):
                                    if contain_column.find(MAX_CONTAIN_COLUMN) == -1:
                                        flag = False
                                        break
                                    MAX_CONTAIN_COLUMN = contain_column
                                else:
                                    if MAX_CONTAIN_COLUMN.find(contain_column) == -1:
                                        flag = False
                                        break
                        if len(getSet(_split,"greater_column")) == 1:
                            flag = False
                            break
            if flag:
                _set_docid = set()
                for item in _split:
                    _set_docid.add(item["docid"])
                if len(_set_docid) > 1:
                    list_group.append(list(_set_docid))
        return json.dumps(list_group)

def getDiffIndex(list_dict, key):
    _set = set()
    for _i in range(len(list_dict)):
        item = list_dict[_i]
        if key in item:
            if item[key] != '' and item[key] is not None:
                if re.search(r"^\d[\d\.]*$", item[key]) is not None:
                    _set.add(str(float(item[key])))
                else:
                    _set.add(str(item[key]))
        if len(_set) > 1:
            return _i
    return len(list_dict)

@annotate('bigint,bigint,string,string,string,string,string,string,string,bigint->string')
class f_remege_limit_num_contain(BaseUDAF):
    '''
    Merge rule: project code, winning bidder, len(project code)>7, winning bidder <> "",
    fewer than 2 distinct non-empty tenderees after merging, and identical non-empty amounts
    for announcements of the same type after merging.
    '''
    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer, docid, page_time_stamp, set_limit_column1, set_limit_column2, set_limit_column3, set_limit_column4, contain_column1, contain_column2, notLike_column, confidence):
        buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"set_limit_column1":set_limit_column1,
                          "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,
                          "contain_column1":contain_column1,"contain_column2":contain_column2,"notLike_column":notLike_column,"confidence":confidence})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def getNotLikeSet(self, _dict, column_name):
        column_value = _dict.get(column_name, None)
        _set = set()
        if column_value is not None:
            for _i in range(1, len(column_value)):
                _set.add(column_value[_i-1:_i+1])
        _dict["notLike_set"] = _set

    def getSimilarity(self, _set1, _set2):
        _sum = max([1, min([len(_set1), len(_set2)])])
        return len(_set1&_set2)/_sum

    def terminate(self, buffer):
        list_group = []
        the_group = buffer[0]
        SIM_PROB = 0.6
        for _d in the_group:
            self.getNotLikeSet(_d, "notLike_column")
        # check whether any limit column has more than one distinct value
        keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"]
        re_merge = False
        for _key in keys:
            if len(getSet(the_group,_key)) > 1:
                re_merge = True
                break
        # check whether any two documents are similar but not identical
        re_merge_sim = False
        for _i1 in range(0, len(the_group)):
            for _j1 in range(_i1+1, len(the_group)):
                _set1 = the_group[_i1]["notLike_set"]
                _set2 = the_group[_j1]["notLike_set"]
                _sim = self.getSimilarity(_set1, _set2)
                if _sim > SIM_PROB and _sim < 1:
                    re_merge_sim = True
                    break
        contain_keys = ["contain_column1","contain_column2"]
        logging.info(the_group)
        logging.info(str(re_merge)+str(re_merge_sim))
        if re_merge or re_merge_sim:
            the_group.sort(key=lambda x: x["confidence"], reverse=True)
            the_group.sort(key=lambda x: x["page_time_stamp"])
            # regroup the documents
            dict_docid_doc = {}
            for _doc in the_group:
                dict_docid_doc[_doc["docid"]] = _doc
            for _doc in the_group:
                merge_flag = False
                for _index in range(len(list_group)):
                    _g = list_group[_index]
                    hit_count = 0
                    dict_temp = dict()
                    # more than one distinct value: only merge when the contain columns contain each other
                    if re_merge:
                        for _c_key in contain_keys:
                            dict_temp[_c_key] = _g[_c_key]
                            if _g[_c_key] is not None and _doc[_c_key] is not None:
                                if len(_g[_c_key]) > len(_doc[_c_key]):
                                    if str(_g[_c_key]).find(str(_doc[_c_key])) >= 0:
                                        dict_temp[_c_key] = _g[_c_key]
                                        hit_count += 1
                                else:
                                    if str(_doc[_c_key]).find(str(_g[_c_key])) >= 0:
                                        dict_temp[_c_key] = _doc[_c_key]
                                        _g[_c_key] = _doc[_c_key]
                                        hit_count += 1
                    else:
                        hit_count = 1
                    # if hit_count==len(contain_keys):
                    if hit_count > 0:
                        _flag_sim = False
                        # similar but not identical documents must not be merged
                        if re_merge_sim:
                            for _docid in _g["docid"]:
                                tmp_d = dict_docid_doc[_docid]
                                _sim = self.getSimilarity(tmp_d["notLike_set"], _doc["notLike_set"])
                                if _sim > SIM_PROB and _sim < 1:
                                    _flag_sim = True
                        if not _flag_sim:
                            for _c_key in dict_temp.keys():
                                _g[_c_key] = dict_temp[_c_key]
                            _g["docid"].append(_doc["docid"])
                            merge_flag = True
                            break
                if not merge_flag:
                    _dict = dict()
                    _dict["docid"] = [_doc["docid"]]
                    for _c_key in contain_keys:
                        _dict[_c_key] = _doc[_c_key]
                    list_group.append(_dict)
            final_group = []
            # check whether each group keeps a single value per limit column
            for _group in list_group:
                _split = []
                for _docid in _group["docid"]:
                    _split.append(dict_docid_doc[_docid])
                # sort by confidence so that as many documents as possible stay merged
                _split.sort(key=lambda x: x["confidence"], reverse=True)
                list_key_index = []
                for _k in keys:
                    list_key_index.append(getDiffIndex(_split, _k))
                _index = min(list_key_index)
                final_group.append([_c["docid"] for _c in _split[:_index]])
                for _c in _split[_index:]:
                    final_group.append([_c["docid"]])
                # if more than two distinct values are found, every document becomes its own group,
                # otherwise they stay in one group
                # _flag = True
                # for _key in keys:
                #     if len(getSet(_split,_key))>1:
                #         _flag = False
                #         break
                # if not _flag:
                #     for _docid in _group["docid"]:
                #         final_group.append([_docid])
                # else:
                #     final_group.append(list(set(_group["docid"])))
        else:
            final_group = [list(set([item["docid"] for item in the_group]))]
        log(str(final_group))
        return json.dumps(final_group)

def getCurrent_date(format="%Y-%m-%d %H:%M:%S"):
    _time = time.strftime(format, time.localtime())
    return _time

@annotate('bigint->string')
class f_get_single_merged_bychannel(BaseUDTF):
    def process(self, docid):
        _d = {"data":{str(docid):[]},"process_time":getCurrent_date()}
        self.forward(json.dumps(_d))

@annotate('bigint,bigint,bigint,string,string,string,string,string,string,string,bigint,bigint,string->string')
class f_remege_limit_num_contain_bychannel(BaseUDAF):
    '''f_remege_limit_num_contain_bychannel
    Merge rule: project code, winning bidder, len(project code)>7, winning bidder <> "",
    fewer than 2 distinct non-empty tenderees after merging, and identical non-empty amounts
    for announcements of the same type after merging.
    '''
    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer, docid, docchannel, page_time_stamp, set_limit_column1, set_limit_column2, set_limit_column3, set_limit_column4, contain_column1, contain_column2, notLike_column, confidence, extract_count, json_dicttime):
        _dict = {"docid":docid,"docchannel":docchannel,"page_time_stamp":page_time_stamp,"set_limit_column1":set_limit_column1,
                 "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,
                 "contain_column1":contain_column1,"contain_column2":contain_column2,"notLike_column":notLike_column,"confidence":confidence,
                 "extract_count":extract_count,"json_dicttime":json_dicttime}
        buffer[0].append(_dict)

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def getNotLikeSet(self, _dict, column_name):
        column_value = _dict.get(column_name, None)
        _set = set()
        if column_value is not None:
            for _i in range(1, len(column_value)):
                _set.add(column_value[_i-1:_i+1])
        _dict["notLike_set"] = _set

    def getSimilarity(self, _set1, _set2):
        _sum = max([1, min([len(_set1), len(_set2)])])
        return len(_set1&_set2)/_sum

    def difftimecount(self, _dict1, _dict2):
        # count the date fields that are non-empty in both documents but differ
        _count = 0
        for k, v in _dict1.items():
            if v is not None and v != "":
                v1 = _dict2.get(k)
                if v1 is not None and v1 != "":
                    if v != v1:
                        _count += 1
        return _count

    def splitByTimezone(self, list_dict, _key):
        cluster_docid = []
        dict_docid_key = {}
        dict_docid = {}
        for _dict in list_dict:
            if _dict.get(_key, "") is None or _dict.get(_key, "") == "":
                dict_docid_key[_dict.get("docid")] = {}
            else:
                dict_docid_key[_dict.get("docid")] = json.loads(_dict.get(_key))
            dict_docid[_dict.get("docid")] = _dict
        for _dict in list_dict:
            _find = False
            for _cl in cluster_docid:
                _legal = True
                for _c in _cl:
                    if self.difftimecount(dict_docid_key.get(_c), dict_docid_key.get(_dict.get("docid"))) > 0:
                        _legal = False
                        break
                if _legal:
                    _cl.append(_dict.get("docid"))
                    _find = True
            if not _find:
                cluster_docid.append([_dict.get("docid")])
        _result = []
        for _cl in cluster_docid:
            _r = []
            for _c in _cl:
                _r.append(dict_docid.get(_c))
            _result.append(_r)
        return _result

    def terminate(self, buffer):
        list_group = []
        the_group = buffer[0]
        SIM_PROB = 0.6
        for _d in the_group:
            self.getNotLikeSet(_d, "notLike_column")
        # check whether any limit column has more than one distinct value
        keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"]
        re_merge = False
        for _key in keys:
            if len(getSet(the_group,_key)) > 1:
                log("has_more_than_one:%s"%str(getSet(the_group,_key)))
                re_merge = True
                break
        # check whether any two documents are similar but not identical
        re_merge_sim = False
        for _i1 in range(0, len(the_group)):
            for _j1 in range(_i1+1, len(the_group)):
                _set1 = the_group[_i1]["notLike_set"]
                _set2 = the_group[_j1]["notLike_set"]
                _sim = self.getSimilarity(_set1, _set2)
                if _sim > SIM_PROB and _sim < 1:
                    re_merge_sim = True
                    break
        contain_keys = ["contain_column1","contain_column2"]
        logging.info(the_group)
        logging.info(str(re_merge)+str(re_merge_sim))
        # regroup the documents
        dict_docid_doc = {}
        for _doc in the_group:
            dict_docid_doc[_doc["docid"]] = _doc
        if re_merge or re_merge_sim:
            the_group.sort(key=lambda x: x["confidence"], reverse=True)
            the_group.sort(key=lambda x: x["page_time_stamp"])
            for _doc in the_group:
                merge_flag = False
                for _index in range(len(list_group)):
                    _g = list_group[_index]
                    hit_count = 0
                    dict_temp = dict()
                    # more than one distinct value: only merge when the contain columns contain each other
                    if re_merge:
                        for _c_key in contain_keys:
                            dict_temp[_c_key] = _g[_c_key]
                            if _g[_c_key] is not None and _doc[_c_key] is not None:
                                if len(_g[_c_key]) > len(_doc[_c_key]):
                                    if str(_g[_c_key]).find(str(_doc[_c_key])) >= 0:
                                        dict_temp[_c_key] = _g[_c_key]
                                        hit_count += 1
                                else:
                                    if str(_doc[_c_key]).find(str(_g[_c_key])) >= 0:
                                        dict_temp[_c_key] = _doc[_c_key]
                                        _g[_c_key] = _doc[_c_key]
                                        hit_count += 1
                    else:
                        hit_count = 1
                    # if hit_count==len(contain_keys):
                    if hit_count > 0:
                        _flag_sim = False
                        # similar but not identical documents must not be merged
                        if re_merge_sim:
                            for _docid in _g["docid"]:
                                tmp_d = dict_docid_doc[_docid]
                                _sim = self.getSimilarity(tmp_d["notLike_set"], _doc["notLike_set"])
                                if _sim > SIM_PROB and _sim < 1:
                                    _flag_sim = True
                        if not _flag_sim:
                            for _c_key in dict_temp.keys():
                                _g[_c_key] = dict_temp[_c_key]
                            _g["docid"].append(_doc["docid"])
                            merge_flag = True
                            break
                if not merge_flag:
                    _dict = dict()
                    _dict["docid"] = [_doc["docid"]]
                    for _c_key in contain_keys:
                        _dict[_c_key] = _doc[_c_key]
                    list_group.append(_dict)
            final_group = []
            # check whether each group keeps a single value per limit column
            for _group in list_group:
                _split = []
                for _docid in _group["docid"]:
                    _split.append(dict_docid_doc[_docid])
                # sort by confidence so that as many documents as possible stay merged
                _split.sort(key=lambda x: x["confidence"], reverse=True)
                list_key_index = []
                for _k in keys:
                    list_key_index.append(getDiffIndex(_split, _k))
                _index = min(list_key_index)
                final_group.append([_c["docid"] for _c in _split[:_index]])
                for _c in _split[_index:]:
                    final_group.append([_c["docid"]])
                # if more than two distinct values are found, every document becomes its own group,
                # otherwise they stay in one group
                # _flag = True
                # for _key in keys:
                #     if len(getSet(_split,_key))>1:
                #         _flag = False
                #         break
                # if not _flag:
                #     for _docid in _group["docid"]:
                #         final_group.append([_docid])
                # else:
                #     final_group.append(list(set(_group["docid"])))
        else:
            final_group = [list(set([item["docid"] for item in the_group]))]
        log("%s--%s"%("final_group",str(final_group)))
        # pick one announcement per channel
        final_group_channel = []
        for _group in final_group:
            dict_channel_id = {}
            otherChannel = 10000
            for _docid in _group:
                _channel = dict_docid_doc[_docid].get("docchannel")
                if _channel in [114,115,116,117]:
                    otherChannel += 1
                    _channel = otherChannel
                if _channel not in dict_channel_id:
                    dict_channel_id[_channel] = []
                dict_channel_id[_channel].append({"docid":_docid,"page_time_stamp":dict_docid_doc[_docid].get("page_time_stamp"),
                                                  "extract_count":dict_docid_doc[_docid].get("extract_count"),
                                                  "json_dicttime":dict_docid_doc[_docid].get("json_dicttime")})
            # split each channel further by date and by the encoded time fields
            new_dict_channel_id = {}
            log("%s:%s"%("dict_channel_id",str(dict_channel_id)))
            for k, v in dict_channel_id.items():
                list_time_docids = split_with_time(v, "page_time_stamp", 86400*6, more_than_one=False)
                log(list_time_docids)
                for _l in list_time_docids:
                    list_t = self.splitByTimezone(_l, "json_dicttime")
                    for _t in list_t:
                        otherChannel += 1
                        new_dict_channel_id[otherChannel] = _t
            log("%s:%s"%("new_dict_channel_id",str(new_dict_channel_id)))
            channel_dict = {}
            for k, v in new_dict_channel_id.items():
                v.sort(key=lambda x: x["docid"])
                v.sort(key=lambda x: x["extract_count"], reverse=True)
                channel_dict[v[0]["docid"]] = []
                for _docs in v[1:]:
                    channel_dict[v[0]["docid"]].append(_docs["docid"])
            _d = {"data":channel_dict,"process_time":getCurrent_date()}
            final_group_channel.append(_d)
        return json.dumps(final_group_channel)

@annotate('string -> string')
class f_get_remerge_group_channel(BaseUDTF):
    '''
    split the merged groups into one record per group
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_remerge):
        if json_remerge is not None:
            list_group = json.loads(json_remerge)
            for _group in list_group:
                self.forward(json.dumps(_group))

@annotate('string -> string')
class f_get_remerge_group(BaseUDTF):
    '''
    split the merged groups into one record per group
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_remerge):
        if json_remerge is not None:
            list_group = json.loads(json_remerge)
            for _group in list_group:
                l_g = list(set(_group))
                l_g.sort(key=lambda x: x)
                list_docid = [str(_docid) for _docid in l_g]
                self.forward(",".join(list_docid))

@annotate('bigint,bigint,string->string')
class f_merge_probability(BaseUDAF):
    '''
    merge a group into a single record
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, docid, page_time_stamp, _type):
        buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"type":_type})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_dict = buffer[0]
        list_dict = list_dict[:10000]
        list_group = split_with_time(list_dict, sort_key="page_time_stamp", timedelta=86400*120)
        return json.dumps(list_group)

@annotate('string -> bigint,bigint,bigint,bigint,string')
class f_split_merge_probability(BaseUDTF):
    def __init__(self):
        import logging
        import json
        global logging,json
        logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, list_group_str):
        logging.info("0")
        logging.info(list_group_str)
        if list_group_str is not None:
            logging.info("1")
            try:
                list_group = json.loads(list_group_str)
                logging.info("2")
                for _group in list_group:
                    if len(_group) > 0:
                        _type = _group[0].get("type","")
                        logging.info("3%d"%len(list_group))
                        # _group.sort(key=lambda x:x["page_time_stamp"])
                        _len = min(100, len(_group))
                        for _index_i in range(_len):
                            _count = 0
                            for _index_j in range(_index_i+1, _len):
                                if abs(_group[_index_j]["page_time_stamp"]-_group[_index_i]["page_time_stamp"]) > 86400*120:
                                    break
                                _count += 1
                                _docid1 = _group[_index_i]["docid"]
                                _docid2 = _group[_index_j]["docid"]
                                if _docid1 < _docid2:
                                    self.forward(_docid1, _docid2, 1, _len, _type)
                                else:
                                    self.forward(_docid2, _docid1, 1, _len, _type)
            except Exception as e:
                logging.info(str(e))

@annotate('bigint,bigint,string->string')
class f_merge_groupPairs(BaseUDAF):
    '''
    merge a group into a single record
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, is_exists, counts, _type):
        buffer[0].append({"is_exists":is_exists,"counts":counts,"_type":_type})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_dict = buffer[0]
        list_dict = list_dict[:10000]
        return json.dumps(list_dict)

@annotate("string -> bigint,bigint,bigint")
class f_merge_getLabel(BaseUDTF):
    def __init__(self):
        import logging
        import json
        global logging,json

    def process(self, str_docids):
        if str_docids is not None:
            list_docids = [int(i) for i in str_docids.split(",")]
            list_docids.sort(key=lambda x: x)
            _len = min(100, len(list_docids))
            for index_i in range(_len):
                docid_less = list_docids[index_i]
                for index_j in range(index_i+1, _len):
                    docid_greater = list_docids[index_j]
                    self.forward(docid_less, docid_greater, 1)

def getSimilarityOfString(str1, str2):
    _set1 = set()
    _set2 = set()
    if str1 is not None:
        for i in range(1, len(str1)):
            _set1.add(str1[i-1:i+1])
    if str2 is not None:
        for i in range(1, len(str2)):
            _set2.add(str2[i-1:i+1])
    _len = max(1, min(len(_set1), len(_set2)))
    return len(_set1&_set2)/_len
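
# Quick illustration of the character-bigram similarity above (hypothetical strings): both
# inputs are decomposed into 2-grams and the overlap is divided by the size of the smaller set,
# so "ABCD" and "ABCE" share {"AB", "BC"} out of three bigrams each, giving 2/3.
def _example_similarity():
    return getSimilarityOfString("ABCD", "ABCE")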

def check_columns(tenderee_less, tenderee_greater,
                  agency_less, agency_greater, project_code_less, project_code_greater, project_name_less, project_name_greater,
                  win_tenderer_less, win_tenderer_greater, win_bid_price_less, win_bid_price_greater,
                  bidding_budget_less, bidding_budget_greater, doctitle_refine_less, doctitle_refine_greater):
    flag = True
    _set_tenderee = set()
    if tenderee_less is not None and tenderee_less != "":
        _set_tenderee.add(tenderee_less)
    if tenderee_greater is not None and tenderee_greater != "":
        _set_tenderee.add(tenderee_greater)
    if len(_set_tenderee) > 1:
        return False
    code_sim = getSimilarityOfString(project_code_less, project_code_greater)
    if code_sim > 0.6 and code_sim < 1:
        return False
    # same batch but different project codes
    if getLength(project_code_less) > 0 and getLength(project_code_greater) > 0:
        _split_code_less = project_code_less.split("-")
        _split_code_greater = project_code_greater.split("-")
        if len(_split_code_less) > 1 and len(_split_code_greater) > 1:
            if _split_code_less[0] == _split_code_greater[0] and project_code_less != project_code_greater:
                return False
    _set_win_tenderer = set()
    if win_tenderer_less is not None and win_tenderer_less != "":
        _set_win_tenderer.add(win_tenderer_less)
    if win_tenderer_greater is not None and win_tenderer_greater != "":
        _set_win_tenderer.add(win_tenderer_greater)
    if len(_set_win_tenderer) > 1:
        return False
    _set_win_bid_price = set()
    if win_bid_price_less is not None and win_bid_price_less != "":
        _set_win_bid_price.add(float(win_bid_price_less))
    if win_bid_price_greater is not None and win_bid_price_greater != "":
        _set_win_bid_price.add(float(win_bid_price_greater))
    if len(_set_win_bid_price) > 1:
        return False
    _set_bidding_budget = set()
    if bidding_budget_less is not None and bidding_budget_less != "":
        _set_bidding_budget.add(float(bidding_budget_less))
    if bidding_budget_greater is not None and bidding_budget_greater != "":
        _set_bidding_budget.add(float(bidding_budget_greater))
    if len(_set_bidding_budget) > 1:
        return False
    return True

def getSimLevel(str1, str2):
    str1_null = False
    str2_null = False
    _v = 0
    if str1 is None or str1 == "":
        str1_null = True
    if str2 is None or str2 == "":
        str2_null = True
    if str1_null and str2_null:
        _v = 2
    elif str1_null and not str2_null:
        _v = 4
    elif not str1_null and str2_null:
        _v = 6
    elif not str1_null and not str2_null:
        if str1 == str2:
            _v = 10
        else:
            _v = 0
    return _v

import math

def featurnCount(_count, max_count=100):
    return max(0, min(1, _count))*(1/math.sqrt(max(1, _count-1)))

def getLength(_str):
    return len(_str if _str is not None else "")

@annotate("string->bigint")
class f_get_min_counts(object):
    def evaluate(self, json_context):
        _context = json.loads(json_context)
        min_counts = 100
        for item in _context:
            if item["counts"] < min_counts:
                min_counts = item["counts"]
        return min_counts

@annotate("string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string->string,double")
class f_merge_featureMatrix(BaseUDTF):
    def __init__(self):
        import logging
        import json
        global logging,json

    def process(self, json_context, tenderee_less, tenderee_greater,
                agency_less, agency_greater, project_code_less, project_code_greater, project_name_less, project_name_greater,
                win_tenderer_less, win_tenderer_greater, win_bid_price_less, win_bid_price_greater,
                bidding_budget_less, bidding_budget_greater, doctitle_refine_less, doctitle_refine_greater):
        if not check_columns(tenderee_less, tenderee_greater,
                             agency_less, agency_greater, project_code_less, project_code_greater, project_name_less, project_name_greater,
                             win_tenderer_less, win_tenderer_greater, win_bid_price_less, win_bid_price_greater,
                             bidding_budget_less, bidding_budget_greater, doctitle_refine_less, doctitle_refine_greater):
            return
        _context = json.loads(json_context)
        min_counts = 100
        dict_context = {}
        for item in _context:
            if item["counts"] < min_counts:
                min_counts = item["counts"]
            dict_context[item["_type"]] = [item["is_exists"], item["counts"]]
        context_key = ["tenderee","agency","project_code","project_name","win_tenderer","win_bid_price","bidding_budget","doctitle_refine"]
        list_matrix = []
        for index_i in range(len(context_key)):
            for index_j in range(index_i+1, len(context_key)):
                _key = "%s&%s"%(context_key[index_i],context_key[index_j])
                _v = featurnCount(dict_context.get(_key,[0,0])[1])
                list_matrix.append(_v)
        context3_key = ["tenderee","agency","win_tenderer","win_bid_price","bidding_budget"]
        for index_i in range(len(context3_key)):
            for index_j in range(index_i+1, len(context3_key)):
                for index_k in range(index_j+1, len(context3_key)):
                    _key = "%s&%s&%s"%(context3_key[index_i],context3_key[index_j],context3_key[index_k])
                    _v = featurnCount(dict_context.get(_key,[0,0])[1])
                    list_matrix.append(_v)
        list_matrix.append(getSimLevel(tenderee_less,tenderee_greater)/10)
        list_matrix.append(getSimLevel(agency_less,agency_greater)/10)
        list_matrix.append(getSimilarityOfString(project_code_less,project_code_greater))
        list_matrix.append(getSimilarityOfString(project_name_less,project_name_greater))
        list_matrix.append(getSimLevel(win_tenderer_less,win_tenderer_greater)/10)
        list_matrix.append(getSimLevel(win_bid_price_less,win_bid_price_greater)/10)
        list_matrix.append(getSimLevel(bidding_budget_less,bidding_budget_greater)/10)
        list_matrix.append(getSimilarityOfString(doctitle_refine_less,doctitle_refine_greater))
        # set_tenderer = set()
        # if tenderee_less is not None and tenderee_less!="":
        #     set_tenderer.add(tenderee_less)
        # if tenderee_greater is not None and tenderee_greater!="":
        #     set_tenderer.add(tenderee_greater)
        #
        # set_win_tenderer = set()
        # if win_tenderer_less is not None and win_tenderer_less!="":
        #     set_win_tenderer.add(win_tenderer_less)
        # if win_tenderer_greater is not None and win_tenderer_greater!="":
        #     set_win_tenderer.add(win_tenderer_greater)
        #
        # set_bidding_budget = set()
        # if bidding_budget_less is not None and bidding_budget_less!="":
        #     set_bidding_budget.add(bidding_budget_less)
        # if bidding_budget_greater is not None and bidding_budget_greater!="":
        #     set_bidding_budget.add(bidding_budget_greater)
        #
        # set_win_bid_price = set()
        # if win_bid_price_less is not None and win_bid_price_less!="":
        #     set_win_bid_price.add(win_bid_price_less)
        # if win_bid_price_greater is not None and win_bid_price_greater!="":
        #     set_win_bid_price.add(win_bid_price_greater)
        json_matrix = json.dumps(list_matrix)
        same_project_code = False
        if project_code_less==project_code_greater and getLength(project_code_less)>0:
            same_project_code = True
        same_project_name = False
        if project_name_less==project_name_greater and getLength(project_name_less)>0:
            same_project_name = True
        same_doctitle_refine = False
        if doctitle_refine_less==doctitle_refine_greater and getLength(doctitle_refine_less)>0:
            same_doctitle_refine = True
        same_tenderee = False
        if tenderee_less==tenderee_greater and getLength(tenderee_less)>0:
            same_tenderee = True
        same_agency = False
        if agency_less==agency_greater and getLength(agency_less)>0:
            same_agency = True
        same_bidding_budget = False
        if bidding_budget_less==bidding_budget_greater and getLength(bidding_budget_less)>0:
            same_bidding_budget = True
        same_win_tenderer = False
        if win_tenderer_less==win_tenderer_greater and getLength(win_tenderer_less)>0:
            same_win_tenderer = True
        same_win_bid_price = False
        if win_bid_price_less==win_bid_price_greater and getLength(win_bid_price_less)>0:
            same_win_bid_price = True
        contain_doctitle = False
        if getLength(doctitle_refine_less)>0 and getLength(doctitle_refine_greater)>0 and (doctitle_refine_less in doctitle_refine_greater or doctitle_refine_greater in doctitle_refine_less):
            contain_doctitle = True
        contain_project_name = False
        if getLength(project_name_less)>0 and getLength(project_name_greater)>0 and (project_name_less in project_name_greater or project_name_greater in project_name_less):
            contain_project_name = True
        total_money_less = 0 if getLength(bidding_budget_less)==0 else float(bidding_budget_less)+0 if getLength(win_bid_price_less)==0 else float(win_bid_price_less)
        total_money_greater = 0 if getLength(bidding_budget_greater)==0 else float(bidding_budget_greater)+0 if getLength(win_bid_price_greater)==0 else float(win_bid_price_greater)
        if min_counts < 10:
            _prob = 0.9
            if same_project_code and same_win_tenderer and same_tenderee:
                self.forward(json_matrix, _prob)
                return
            if same_tenderee and same_project_name and same_win_tenderer:
                self.forward(json_matrix, _prob)
                return
            if same_tenderee and same_doctitle_refine and same_win_tenderer:
                self.forward(json_matrix, _prob)
                return
            if same_tenderee and same_win_bid_price and same_win_tenderer:
                self.forward(json_matrix, _prob)
                return
            if same_project_code and same_win_bid_price and same_win_tenderer:
                self.forward(json_matrix, _prob)
                return
            if same_project_name and same_win_bid_price and same_win_tenderer:
                self.forward(json_matrix, _prob)
                return
            if same_doctitle_refine and same_win_bid_price and same_win_tenderer:
                self.forward(json_matrix, _prob)
                return
            if same_doctitle_refine and same_bidding_budget and same_win_tenderer:
                self.forward(json_matrix, _prob)
                return
            if same_tenderee and same_doctitle_refine and same_win_tenderer:
                self.forward(json_matrix, _prob)
                return
            if same_tenderee and same_project_code and same_project_name:
                self.forward(json_matrix, _prob)
                return
            if same_tenderee and same_project_code and same_doctitle_refine:
                self.forward(json_matrix, _prob)
                return
            if same_tenderee and same_bidding_budget and same_project_code:
                self.forward(json_matrix, _prob)
                return
            if same_tenderee and same_bidding_budget and same_doctitle_refine:
                self.forward(json_matrix, _prob)
                return
            if same_tenderee and same_bidding_budget and same_project_name:
                self.forward(json_matrix, _prob)
                return
            if same_doctitle_refine and same_project_code and same_project_name:
                self.forward(json_matrix, _prob)
                return
        if min_counts <= 5:
            _prob = 0.8
            if same_project_code and same_tenderee:
                self.forward(json_matrix, _prob)
                return
            if same_project_code and same_win_tenderer:
                self.forward(json_matrix, _prob)
                return
            if same_project_name and same_project_code:
                self.forward(json_matrix, _prob)
                return
            if same_project_code and same_doctitle_refine:
                self.forward(json_matrix, _prob)
                return
            if total_money_less==total_money_greater and total_money_less>100000:
                if same_win_tenderer and (same_win_bid_price or same_bidding_budget):
                    self.forward(json_matrix, _prob)
                    return
            if same_project_code and same_bidding_budget:
                self.forward(json_matrix, _prob)
                return
            if same_project_code and same_win_bid_price:
                self.forward(json_matrix, _prob)
                return
            if same_bidding_budget and same_win_bid_price and (contain_project_name or contain_doctitle):
                self.forward(json_matrix, _prob)
                return
        if min_counts <= 3:
            _prob = 0.7
            if same_project_name or same_project_code or same_doctitle_refine or contain_doctitle or contain_project_name:
                self.forward(json_matrix, _prob)
                return
        self.forward(json_matrix, 0)

class MergePredictor():
    def __init__(self):
        self.input_size = 46
        self.output_size = 2
        self.matrix = np.array([[-5.817399024963379, 3.367797374725342], [-18.3098201751709, 17.649206161499023], [-7.115952014923096, 9.236002922058105], [-5.054129123687744, 1.8316771984100342], [6.391637325286865, -7.57396125793457], [-2.8721542358398438, 6.826520919799805], [-5.426159858703613, 10.235260009765625], [-4.240962982177734, -0.32092899084091187], [-0.6378090381622314, 0.4834124445915222], [-1.7574478387832642, -0.17846578359603882], [4.325063228607178, -2.345501661300659], [0.6086963415145874, 0.8325914740562439], [2.5674285888671875, 1.8432368040084839], [-11.195490837097168, 17.4630184173584], [-11.334247589111328, 10.294097900390625], [2.639320135116577, -8.072785377502441], [-2.2689898014068604, -3.6194612979888916], [-11.129570960998535, 18.907018661499023], [4.526485919952393, 4.57423210144043], [-3.170452356338501, -1.3847776651382446], [-0.03280467540025711, -3.0471489429473877], [-6.601675510406494, -10.05613899230957], [-2.9116673469543457, 4.819308280944824], [1.4398306608200073, -0.6549674272537231], [7.091512203216553, -0.142232745885849], [-0.14478975534439087, 0.06628061085939407], [-6.775437831878662, 9.279582023620605], [-0.006781991105526686, 1.6472798585891724], [3.83730149269104, 1.4072834253311157], [1.2229349613189697, -2.1653425693511963], [1.445560336112976, -0.8397432565689087], [-11.325132369995117, 11.231744766235352], [2.3229124546051025, -4.623719215393066], [0.38562265038490295, -1.2645516395568848], [-1.3670002222061157, 2.4323790073394775], [-3.6994268894195557, 0.7515658736228943], [-0.11617227643728256, -0.820703387260437], [4.089913368225098, -4.693605422973633], [-0.4959050714969635, 1.5272167921066284], [-2.7135870456695557, -0.5120691657066345], [0.573157548904419, -1.9375460147857666], [-4.262857437133789, 0.6375582814216614], [-1.8825865983963013, 2.427532911300659], [-4.565115451812744, 4.0269083976745605], [-4.339804649353027, 6.754288196563721], [-4.31907320022583, 0.28193211555480957]])
        self.bias = np.array([16.79706382751465, -13.713337898254395])
        # self.model = load_model("model/merge.h5",custom_objects={"precision":precision,"recall":recall,"f1_score":f1_score})

    def activation(self, vec, _type):
        if _type == "relu":
            _vec = np.array(vec)
            return _vec*(_vec>0)
        if _type == "tanh":
            return np.tanh(vec)
        if _type == "softmax":
            _vec = np.array(vec)
            _exp = np.exp(_vec)
            return _exp/np.sum(_exp)

    def predict(self, input):
        _out = self.activation(self.activation(np.matmul(np.array(input).reshape(-1,self.input_size),self.matrix)+self.bias,"tanh"),"softmax")
        # print(self.model.predict(np.array(input).reshape(-1,46)))
        return _out
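
# Sketch of how MergePredictor is used (assuming numpy has already been made importable as the
# global `np`, e.g. via include_package_path("numpy-1.18.zip") as in f_getMergeProb below):
# a 46-dimensional feature vector passes through one tanh layer followed by softmax, and
# column 1 of the output row is read as the merge probability.
def _example_merge_predictor():
    _features = [0.0] * 46  # hypothetical all-zero feature vector
    return MergePredictor().predict(_features)[0][1]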

@annotate('string,double -> double')
class f_getMergeProb(BaseUDTF):
    def __init__(self):
        import json
        include_package_path("numpy-1.18.zip")
        import numpy as np
        global json,np
        self.mp = MergePredictor()

    def process(self, json_matrix, pre_prob):
        if not pre_prob > 0.5:
            _matrix = json.loads(json_matrix)
            _prob = self.mp.predict(_matrix)[0][1]
        else:
            _prob = pre_prob
        if _prob > 0.5:
            self.forward(float(_prob))

@annotate('string -> bigint,bigint')
class f_check_remerge_channel(BaseUDTF):
    '''
    split the merged groups into one record per group
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_remerge):
        if json_remerge is not None:
            list_group = json.loads(json_remerge)
            for _group in list_group:
                _keys = _group.get("data").keys()
                if len(_keys) > 0:
                    main_docid = int(list(_keys)[0])
                    for k, v in _group.get("data",{}).items():
                        self.forward(main_docid, int(k))
                        for _v in v:
                            self.forward(main_docid, int(_v))

@annotate('string -> bigint,bigint')
class f_check_remerge(BaseUDTF):
    '''
    split the merged groups into one record per group
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_remerge):
        if json_remerge is not None:
            list_group = json.loads(json_remerge)
            for _group in list_group:
                for _docid in _group:
                    self.forward(_group[-1], _docid)

def getConfidence(rule_id):
    if rule_id >= 1 and rule_id <= 20:
        return 30
    elif rule_id >= 31 and rule_id <= 50:
        return 20
    else:
        return 10

@annotate('string,bigint -> bigint,bigint,bigint')
class f_arrange_group_single(BaseUDTF):
    '''
    split the merged groups into one record per group
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, json_set_docid, rule_id):
        if json_set_docid is not None:
            list_group = json.loads(json_set_docid)
            for _group in list_group:
                for index_i in range(len(_group)):
                    for index_j in range(len(_group)):
                        # if index_i!=index_j and _group[index_i]!=_group[index_j]:
                        if index_i != index_j:
                            self.forward(_group[index_i], _group[index_j], getConfidence(rule_id))

@annotate('bigint,bigint->string')
class f_get_merge_docids(BaseUDAF):
    '''
    merge a group into a single record
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [set()]

    def iterate(self, buffer, docid1, docid2):
        buffer[0].add(docid1)
        buffer[0].add(docid2)

    def merge(self, buffer, pbuffer):
        buffer[0] |= pbuffer[0]

    def terminate(self, buffer):
        set_docid = buffer[0]
        list_docid = list(set_docid)
        list_docid.sort(key=lambda x: x)
        list_docid_str = []
        for _docid in list_docid:
            list_docid_str.append(str(_docid))
        return ",".join(list_docid_str)

@annotate("string,string,string,string,string,string,string,string,string,string,string,string,string,string->string")
class f_encode_time(object):
    def evaluate(self, time_bidclose, time_bidopen, time_bidstart, time_commencement, time_completion, time_earnest_money_end, time_earnest_money_start, time_get_file_end, time_get_file_start, time_publicity_end, time_publicity_start, time_registration_end, time_registration_start, time_release):
        _dict = {"time_bidclose":time_bidclose,"time_bidopen":time_bidopen,"time_bidstart":time_bidstart,
                 "time_commencement":time_commencement,"time_completion":time_completion,"time_earnest_money_end":time_earnest_money_end,
                 "time_earnest_money_start":time_earnest_money_start,"time_get_file_end":time_get_file_end,"time_get_file_start":time_get_file_start,
                 "time_publicity_end":time_publicity_end,"time_publicity_start":time_publicity_start,"time_registration_end":time_registration_end,
                 "time_registration_start":time_registration_start,"time_release":time_release}
        _encode = json.dumps(_dict)
        return _encode

@annotate('string,string -> string,string')
class f_decode_ruwei(BaseUDTF):
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self, page_time, sub_docs_json):
        if sub_docs_json is not None:
            for sub_docs in json.loads(sub_docs_json):
                if sub_docs.get("win_tenderer","") != "":
                    self.forward(page_time, sub_docs.get("win_tenderer",""))
                if sub_docs.get("second_tenderer","") != "":
                    self.forward(page_time, sub_docs.get("second_tenderer",""))
                if sub_docs.get("third_tenderer","") != "":
                    self.forward(page_time, sub_docs.get("third_tenderer",""))

if __name__ == '__main__':
    a = f_remege_limit_num_contain_bychannel()
    buffer = a.new_buffer()
    tmp_s = '''
234858920 229011768 2022-03-25 1648137600 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工招标文件.pdf 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化施工文件.pdf 珠海大横琴公共设施建设管理有限公司 珠海德联工程咨询有限公司 103 0 7 "{"time_bidclose": "", "time_bidopen": "", "time_bidstart": "", "time_commencement": "2022-04-29", "time_completion": "", "time_earnest_money_end": "", "time_earnest_money_start": "", "time_get_file_end": "", "time_get_file_start": "", "time_publicity_end": "", "time_publicity_start": "", "time_registration_end": "", "time_registration_start": "", "time_release": ""}"
234858920 232745950 2022-04-12 1649692800 E4404000001002779001 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工招标答疑 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化施工答疑 珠海大横琴公共设施建设管理有限公司 珠海德联工程咨询有限公司 103 0 8 "{"time_bidclose": "", "time_bidopen": "", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_earnest_money_end": "", "time_earnest_money_start": "", "time_get_file_end": "", "time_get_file_start": "", "time_publicity_end": "", "time_publicity_start": "", "time_registration_end": "", "time_registration_start": "", "time_release": ""}"
234858920 234858920 2022-04-21 1650470400 E4404000001002779001001 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化施工 101 1 2 "{"time_bidclose": "", "time_bidopen": "", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_earnest_money_end": "", "time_earnest_money_start": "", "time_get_file_end": "", "time_get_file_start": "", "time_publicity_end": "", "time_publicity_start": "", "time_registration_end": "", "time_registration_start": "", "time_release": ""}"
234858920 234595980 2022-04-20 1650384000 E4404000001002779001001 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化施工 珠海大横琴公共设施建设管理有限公司 珠海德联工程咨询有限公司 105 0 10 "{"time_bidclose": "", "time_bidopen": "", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_earnest_money_end": "", "time_earnest_money_start": "", "time_get_file_end": "", "time_get_file_start": "", "time_publicity_end": "2022-04-22", "time_publicity_start": "2022-04-21", "time_registration_end": "", "time_registration_start": "", "time_release": ""}"
234858920 228908786 2022-03-25 1648137600 E4404000001002779001001 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化施工 珠海大横琴公共设施建设管理有限公司 珠海德联工程咨询有限公司 1795743.68 52 0 8 "{"time_bidclose": "2022-04-20", "time_bidopen": "2022-04-20", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_earnest_money_end": "2022-04-20", "time_earnest_money_start": "", "time_get_file_end": "", "time_get_file_start": "2022-03-26", "time_publicity_end": "2022-04-26", "time_publicity_start": "", "time_registration_end": "", "time_registration_start": "", "time_release": ""}"
234858920 234523333 2022-04-20 1650384000 E4404000001002779001001 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化施工 101 0 2 "{"time_bidclose": "", "time_bidopen": "", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_earnest_money_end": "", "time_earnest_money_start": "", "time_get_file_end": "", "time_get_file_start": "", "time_publicity_end": "", "time_publicity_start": "", "time_registration_end": "", "time_registration_start": "", "time_release": ""}"
234858920 234787082 2022-04-20 1650384000 E4404000001002779001001 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工开标记录表 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化施工开标记录表 1795743.68 101 0 6 "{"time_bidclose": "", "time_bidopen": "2022-04-20", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_earnest_money_end": "", "time_earnest_money_start": "", "time_get_file_end": "", "time_get_file_start": "", "time_publicity_end": "", "time_publicity_start": "", "time_registration_end": "", "time_registration_start": "", "time_release": ""}"
234858920 235240618 2022-04-22 1650556800 E4404000001002779001001 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化工程施工 横琴新区2号消防站暨新区消防宣教培训体验中心项目智能化施工 广东博思信息技术股份有限公司 1775136.23 101 0 12 "{"time_bidclose": "", "time_bidopen": "", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_earnest_money_end": "", "time_earnest_money_start": "", "time_get_file_end": "", "time_get_file_start": "", "time_publicity_end": "2022-04-26", "time_publicity_start": "2022-04-24", "time_registration_end": "", "time_registration_start": "", "time_release": ""}"
    '''
    for _s in tmp_s.split("\n"):
        ls = _s.split("\t")
        if len(ls) != 17:
            continue
        _confid = 1 if ls[14] == "" else ls[14]
        a.iterate(buffer, ls[1], ls[13], int(ls[3]), ls[8], ls[10], ls[11], ls[12], ls[7], ls[5], ls[4], _confid, ls[15], ls[16][1:-1])
    # a.iterate(buffer,219957825,101,86400*4,"1","1","1","1","1","1","1",0,5,'{"time_bidclose": "", "time_bidopen": "2022-02-10", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_earnest_money_end": "", "time_earnest_money_start": "", "time_get_file_end": "", "time_get_file_start": "", "time_publicity_end": "2022-02-21", "time_publicity_start": "2022-02-11", "time_registration_end": "", "time_registration_start": "", "time_release": ""}')
    # a.iterate(buffer,219957825,101,86400*4,"1","1","1","1","1","1","1",0,5,'{"time_bidclose": "", "time_bidopen": "2022-02-10", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_earnest_money_end": "", "time_earnest_money_start": "", "time_get_file_end": "", "time_get_file_start": "", "time_publicity_end": "2022-02-21", "time_publicity_start": "2022-02-11", "time_registration_end": "", "time_registration_start": "", "time_release": ""}')
    # a.iterate(buffer,219957825,101,86400*4,"1","1","1","1","1","1","1",0,5,'{"time_bidclose": "", "time_bidopen": "2022-02-10", "time_bidstart": "", "time_commencement": "", "time_completion": "", "time_earnest_money_end": "", "time_earnest_money_start": "", "time_get_file_end": "", "time_get_file_start": "", "time_publicity_end": "2022-02-22", "time_publicity_start": "2022-02-11", "time_registration_end": "", "time_registration_start": "", "time_release": ""}')
    print(a.terminate(buffer))
    print(1)