# documentMerge.py

#coding:UTF8
# module-level imports so the helper functions below can also run outside the UDF lifecycle
import json
import logging
import re

from odps.udf import annotate
from odps.udf import BaseUDAF
from odps.udf import BaseUDTF
def getSet(list_dict,key):
    '''collect the distinct non-empty values of key, normalising purely numeric strings via float()'''
    _set = set()
    for item in list_dict:
        if key in item:
            if item[key]!='' and item[key] is not None:
                if re.search(r"^[\d\.]+$",item[key]) is not None:
                    _set.add(str(float(item[key])))
                else:
                    _set.add(str(item[key]))
    return _set
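
# A minimal usage sketch: getSet collects the distinct non-empty values of a key and normalises
# purely numeric strings through float(), so "100" and "100.00" collapse to one value. The key
# name and records below are invented for illustration.
def _example_getSet():
    _docs = [{"win_price": "100"}, {"win_price": "100.00"}, {"win_price": ""}]
    return getSet(_docs, "win_price")  # {"100.0"}
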
def split_with_time(list_dict,sort_key,timedelta=86400*120):
    '''sort by sort_key and split the list wherever neighbouring records are more than timedelta seconds apart'''
    if len(list_dict)>0:
        if sort_key in list_dict[0]:
            list_dict.sort(key=lambda x:x[sort_key])
            list_group = []
            _begin = 0
            for i in range(len(list_dict)-1):
                if abs(list_dict[i][sort_key]-list_dict[i+1][sort_key])<timedelta:
                    continue
                else:
                    _group = []
                    for j in range(_begin,i+1):
                        _group.append(list_dict[j])
                    if len(_group)>1:
                        list_group.append(_group)
                    _begin = i + 1
            if len(list_dict)>1:
                _group = []
                for j in range(_begin,len(list_dict)):
                    _group.append(list_dict[j])
                if len(_group)>1:
                    list_group.append(_group)
            return list_group
    return [list_dict]
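
# A minimal usage sketch: split_with_time sorts by the timestamp key and cuts the list wherever
# two neighbouring records are more than timedelta seconds apart; groups with fewer than two
# records are dropped. The timestamps below are invented.
def _example_split_with_time():
    _docs = [{"page_time_stamp": 0},
             {"page_time_stamp": 3600},
             {"page_time_stamp": 86400 * 200}]
    # the first two records fall inside the default 120-day window, the third is isolated
    return split_with_time(_docs, "page_time_stamp")  # [[{...: 0}, {...: 3600}]]
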
@annotate('bigint,bigint,string,string,string,string,string,string,bigint->string')
class f_merge_rule_limit_num_contain_greater(BaseUDAF):
    '''
    Merge rule: same project number and winning bidder, len(project number)>7, winning bidder<>"",
    fewer than 2 distinct non-empty tendering units after merging, and identical non-empty amounts
    for announcements of the same type after merging
    '''
    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer,docid,page_time_stamp,set_limit_column1,set_limit_column2,set_limit_column3,set_limit_column4,contain_column,greater_column,MAX_NUM):
        buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"set_limit_column1":set_limit_column1,
                          "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,
                          "contain_column":contain_column,"greater_column":greater_column,"MAX_NUM":MAX_NUM})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        MAX_NUM = 5
        if len(buffer[0])>0:
            MAX_NUM = buffer[0][0]["MAX_NUM"]
        list_split = split_with_time(buffer[0],"page_time_stamp")
        list_group = []
        for _split in list_split:
            flag = True
            keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"]
            dict_set = {}
            for _key in keys:
                dict_set[_key] = set()
            if len(_split)>MAX_NUM:
                flag = False
            else:
                # every limit column must collapse to a single distinct value inside the group
                for _key in keys:
                    logging.info(_key+str(getSet(_split,_key)))
                    if len(getSet(_split,_key))>1:
                        flag = False
                        break
            MAX_CONTAIN_COLUMN = None
            # check whether each announcement's contain_column is contained in the longest one
            if flag:
                for _d in _split:
                    contain_column = _d["contain_column"]
                    if contain_column is not None and contain_column !="":
                        if MAX_CONTAIN_COLUMN is None:
                            MAX_CONTAIN_COLUMN = contain_column
                        else:
                            if len(MAX_CONTAIN_COLUMN)<len(contain_column):
                                if contain_column.find(MAX_CONTAIN_COLUMN)==-1:
                                    flag = False
                                    break
                                MAX_CONTAIN_COLUMN = contain_column
                            else:
                                # the shorter contain_column must be a substring of the longest one
                                if MAX_CONTAIN_COLUMN.find(contain_column)==-1:
                                    flag = False
                                    break
                    # drop the group when the "greater" column collapses to a single non-empty value
                    if len(getSet(_split,"greater_column"))==1:
                        flag = False
                        break
            if flag:
                _set_docid = set()
                for item in _split:
                    _set_docid.add(item["docid"])
                if len(_set_docid)>1:
                    list_group.append(list(_set_docid))
        return json.dumps(list_group)
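
# A minimal local sketch of the aggregation above, assuming the class can be driven outside the
# MaxCompute runtime by calling new_buffer/iterate/terminate directly. All column values below are
# invented: the limit columns agree, the contain columns are substring-compatible and the
# greater_column values pass the check as written, so the two docids end up in one group.
def _example_merge_rule_limit_num_contain_greater():
    _agg = f_merge_rule_limit_num_contain_greater()
    _buffer = _agg.new_buffer()
    _agg.iterate(_buffer, 1, 0, "a", "b", "c", "d", "ProjectA", "100", 5)
    _agg.iterate(_buffer, 2, 3600, "a", "b", "c", "d", "ProjectA-Phase1", "200", 5)
    return _agg.terminate(_buffer)  # e.g. '[[1, 2]]'
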
@annotate('bigint,string,string,string,string,string,string->string')
class f_remege_limit_num_contain(BaseUDAF):
    '''
    Re-merge rule: same project number and winning bidder, len(project number)>7, winning bidder<>"",
    fewer than 2 distinct non-empty tendering units after merging, and identical non-empty amounts
    for announcements of the same type after merging
    '''
    def __init__(self):
        import logging
        import json,re
        global json,logging,re
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def new_buffer(self):
        return [list()]

    def iterate(self, buffer,docid,set_limit_column1,set_limit_column2,set_limit_column3,set_limit_column4,contain_column1,contain_column2):
        buffer[0].append({"docid":docid,"set_limit_column1":set_limit_column1,
                          "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,
                          "contain_column1":contain_column1,"contain_column2":contain_column2})

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_group = []
        the_group = buffer[0]
        keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"]
        re_merge = False
        for _key in keys:
            if len(getSet(the_group,_key))>1:
                re_merge = True
                break
        contain_keys = ["contain_column1","contain_column2"]
        logging.info(the_group)
        if re_merge:
            dict_docid_doc = {}
            for _doc in the_group:
                dict_docid_doc[_doc["docid"]] = _doc
            for _doc in the_group:
                merge_flag = False
                for _index in range(len(list_group)):
                    _g = list_group[_index]
                    hit_count = 0
                    dict_temp = dict()
                    for _c_key in contain_keys:
                        if _g[_c_key] is not None and _doc[_c_key] is not None:
                            if len(_g[_c_key])>len(_doc[_c_key]):
                                if str(_g[_c_key]).find(str(_doc[_c_key]))>=0:
                                    dict_temp[_c_key] = _g[_c_key]
                                    hit_count += 1
                            else:
                                if str(_doc[_c_key]).find(str(_g[_c_key]))>=0:
                                    dict_temp[_c_key] = _doc[_c_key]
                                    hit_count += 1
                    logging.info(_doc["docid"])
                    logging.info(_g)
                    logging.info(str(hit_count))
                    if hit_count==len(contain_keys):
                        # commit the longer contain values only once the document hits on every key
                        for _c_key in contain_keys:
                            _g[_c_key] = dict_temp[_c_key]
                        _g["docid"].append(_doc["docid"])
                        merge_flag = True
                        break
                if not merge_flag:
                    _dict = dict()
                    _dict["docid"] = [_doc["docid"]]
                    for _c_key in contain_keys:
                        _dict[_c_key] = _doc[_c_key]
                    list_group.append(_dict)
            final_group = []
            # check whether every regrouped set still agrees on a single value per limit column
            for _group in list_group:
                _split = []
                for _docid in _group["docid"]:
                    _split.append(dict_docid_doc[_docid])
                _flag = True
                for _key in keys:
                    if len(getSet(_split,_key))>1:
                        _flag = False
                        break
                if not _flag:
                    for _docid in _group["docid"]:
                        final_group.append([_docid])
                else:
                    final_group.append(list(set(_group["docid"])))
        else:
            final_group = [list(set([item["docid"] for item in the_group]))]
        return json.dumps(final_group)
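
# A minimal local sketch of the re-merge aggregation, again assuming the buffer methods can be
# called directly outside the MaxCompute runtime. The invented records share compatible contain
# columns but disagree on set_limit_column1, so the group is split back into per-docid groups.
def _example_remege_limit_num_contain():
    _agg = f_remege_limit_num_contain()
    _buffer = _agg.new_buffer()
    _agg.iterate(_buffer, 1, "a", "b", "c", "d", "ProjectA", "TendererA")
    _agg.iterate(_buffer, 2, "x", "b", "c", "d", "ProjectA", "TendererA")
    return _agg.terminate(_buffer)  # e.g. '[[1], [2]]'
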
@annotate('string -> string')
class f_get_remerge_group(BaseUDTF):
    '''
    expand the merged groups into one record per group
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,json_remerge):
        if json_remerge is not None:
            list_group = json.loads(json_remerge)
            for _group in list_group:
                l_g = list(set(_group))
                list_docid = [str(_docid) for _docid in l_g]
                self.forward(",".join(list_docid))
@annotate('string -> bigint,bigint')
class f_check_remerge(BaseUDTF):
    '''
    expand each group into one record per docid, keyed by a representative docid of the group
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,json_remerge):
        if json_remerge is not None:
            list_group = json.loads(json_remerge)
            for _group in list_group:
                for _docid in _group:
                    self.forward(_group[-1],_docid)
@annotate('string -> bigint,bigint')
class f_arrange_group_single(BaseUDTF):
    '''
    expand each group into ordered (docid,docid) pairs
    '''
    def __init__(self):
        import logging
        import json
        global json,logging
        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    def process(self,json_set_docid):
        if json_set_docid is not None:
            list_group = json.loads(json_set_docid)
            for _group in list_group:
                for index_i in range(len(_group)):
                    for index_j in range(len(_group)):
                        # if index_i!=index_j and _group[index_i]!=_group[index_j]:
                        if index_i!=index_j:
                            self.forward(_group[index_i],_group[index_j])
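
# A minimal local sketch of the pair expansion, assuming forward() can be replaced with a plain
# collector when running outside the MaxCompute runtime. A single invented group of three docids
# yields every ordered pair of distinct positions.
def _example_arrange_group_single():
    _rows = []
    _udtf = f_arrange_group_single()
    _udtf.forward = lambda a, b: _rows.append((a, b))  # stand-in for the runtime-provided forward()
    _udtf.process(json.dumps([[1, 2, 3]]))
    return _rows  # [(1, 2), (1, 3), (2, 1), (2, 3), (3, 1), (3, 2)]
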
@annotate('bigint,bigint->string')
class f_get_merge_docids(BaseUDAF):
    '''
    merge a group of docid pairs into a single record
    '''
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [set()]

    def iterate(self, buffer,docid1,docid2):
        buffer[0].add(docid1)
        buffer[0].add(docid2)

    def merge(self, buffer, pbuffer):
        buffer[0] |= pbuffer[0]

    def terminate(self, buffer):
        set_docid = buffer[0]
        list_docid = list(set_docid)
        list_docid.sort()
        list_docid_str = []
        for _docid in list_docid:
            list_docid_str.append(str(_docid))
        return ",".join(list_docid_str)
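
# A minimal local sketch of the final aggregation: docid pairs, such as those emitted by
# f_arrange_group_single, are folded into one sorted comma-separated string. Values are invented.
def _example_get_merge_docids():
    _agg = f_get_merge_docids()
    _buffer = _agg.new_buffer()
    _agg.iterate(_buffer, 3, 1)
    _agg.iterate(_buffer, 1, 2)
    return _agg.terminate(_buffer)  # "1,2,3"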