Browse Source

Deploy the EAS interface; adjust the statistics criteria; modify the entity replacement rules

luojiehua 3 years ago
parent
commit
f4f969d952

+ 1 - 13
.idea/sonarlint/issuestore/index.pb

@@ -17,8 +17,6 @@ O
 BiddingKG/dl/common/nerUtils.py,8\2\82c3c87116c1da9281790ac9c71f57821e9207cf
 H
 BiddingKG/dl/__init__.py,a\c\ac12bb80834a26e34df8eaf4c762410dfcfc0a27
-U
-%BiddingKG/dl/metrics/extractMetric.py,f\e\fed725bbe7e61499dcc542a2cd6279850a62cb79
 W
 'BiddingKG/maxcompute/article_extract.py,1\d\1d533d48614eebe6b6e03d0bf64b381cdf4beca0
 ]
@@ -39,21 +37,11 @@ Q
 !BiddingKG/dl/form/generateData.py,7\e\7e590aa47c1871cc7d75ac844d5769bff50a6e70
 U
 %BiddingKG/maxcompute/attachmentRec.py,b\e\be8f50b8961bc8ae61e105517763f21c707ea3ec
-P
- BiddingKG/maxcompute/cycleRec.py,b\d\bdbd92638e7f5983e655c67b07bb464d62021b36
-P
- BiddingKG/dl/bidway/re_bidway.py,4\b\4bbee1b8e2177ffd4dbcc10e26686b81b38db517
 G
 BiddingKG/dl/test/12.py,5\c\5c99d16b0fcfaac86fa00d720a060d38778939c6
 J
 BiddingKG/dl/form/train.py,9\0\9092cdc516e6529e04ba5c9e245978a9778f1457
 U
 %BiddingKG/maxcompute/extract_check.py,c\b\cb469c6b5dccfb880cb12b739ba36dd30aa17830
-Q
-!BiddingKG/dl/LEGAL_ENTERPRISE.txt,6\8\685bd49ae2f5f0de419c93a217a0e57564d705ab
 U
-%BiddingKG/maxcompute/documentMerge.py,0\2\0281d029b2e1edefc27911179172143779deed49
-Q
-!BiddingKG/maxcompute/evaluates.py,4\b\4bf6acd495095b59143b1620d2f455e5e651a071
-L
-BiddingKG/dl/common/Utils.py,f\4\f4c35e30342829a2fc89108259e28edc0a425cce
+%BiddingKG/maxcompute/documentMerge.py,0\2\0281d029b2e1edefc27911179172143779deed49

+ 8 - 5
BiddingKG/app.py

@@ -7,6 +7,7 @@ Created on 2019-12-03
 import allspark
 import sys
 import os
+sys.path.append(os.path.dirname(__file__)+"/..")
 os.environ["KERAS_BACKEND"] = "tensorflow"
 import json
 import re
@@ -130,17 +131,17 @@ class MyProcessor(allspark.BaseProcessor):
                 # data_res["success"] = True
                 #return json.dumps(Preprocessing.union_result(codeName, prem)[0][1],cls=MyEncoder,sort_keys=True,indent=4,ensure_ascii=False)
             else:
-                data_res = {"success":False,"msg":"content not passed"}
+                data_res = json.dumps({"success":False,"msg":"content not passed"})
 
 
         except Exception as e:
             traceback.print_exc()
-            data_res = {"success":False,"msg":str(e)}
+            data_res = json.dumps({"success":False,"msg":str(e)})
         # return the result as JSON
         #_resp = json.dumps(data_res,cls=MyEncoder)
         #log(str(data["flag"])+str(data))
         log("done for doc_id:%s with result:%s"%(_doc_id,str(data_res)))
-        list_result.append(json.dumps(data_res,ensure_ascii=False))
+        list_result.append(data_res)
 
     def initialize(self):
         """ load module, executed once at the start of the service
@@ -197,10 +198,12 @@ class MyProcessor(allspark.BaseProcessor):
 if __name__ == '__main__':
     # paramter worker_threads indicates concurrency of processing
    # run locally
-    allspark.default_properties().put("rpc.keepalive", 60000)
-    runner = MyProcessor(worker_threads=5,worker_processes=1,endpoint="0.0.0.0:15030")
+    allspark.default_properties().put("rpc.keepalive", 120000)
 
 
+    runner = MyProcessor(worker_threads=5,worker_processes=1,endpoint="0.0.0.0:15030")
     # run on the PAI platform
     # runner = MyProcessor()
+
+
     runner.run()
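
With this change every branch serializes its own result, so list_result always receives a JSON string encoded exactly once; the shared json.dumps that used to sit at the end would presumably re-encode any branch whose value was already a JSON string, producing a quoted string literal instead of an object. A minimal standalone sketch of that failure mode:

import json

payload = json.dumps({"success": False, "msg": "content not passed"})
print(json.dumps(payload, ensure_ascii=False))  # "{\"success\": false, ...}" -- double-encoded string literal
print(payload)                                  # {"success": false, "msg": "content not passed"} -- encoded once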

+ 5 - 4
BiddingKG/dl/entityLink/entityLink.py

@@ -38,7 +38,7 @@ def jaccard_score(source,target):
 
 
 def get_place_list():
-    path = os.path.abspath(__file__) + '/../../place_info.csv'
+    path = os.path.dirname(__file__) + '/../place_info.csv'
     place_df = pd.read_csv(path)
 
     place_list = []
@@ -66,9 +66,10 @@ def get_place_list():
 
 
 place_list = get_place_list()
+place_pattern = "|".join(place_list)
 
 
-def link_entitys(list_entitys,on_value=0.8):
+def link_entitys(list_entitys,on_value=0.81):
     for list_entity in list_entitys:
         range_entity = []
         for _entity in list_entity:
@@ -82,7 +83,7 @@ def link_entitys(list_entitys,on_value=0.8):
                 # 2021/5/21 update: skip when the two entity labels are mutually exclusive (one is the tenderee, the other the agency) and their entity_text values differ
                 if _entity.entity_text != _ent.entity_text and _entity.label != _ent.label and _entity.label in [0,1] and _ent.label in [0, 1]:
                     continue
-                _score = jaccard_score(re.sub("股份|责任|有限|公司","",_entity.entity_text), re.sub("股份|责任|有限|公司","",_ent.entity_text))
+                _score = jaccard_score(re.sub("%s|%s"%("股份|责任|有限|公司",place_pattern),"",_entity.entity_text), re.sub("%s|%s"%("股份|责任|有限|公司",place_pattern),"",_ent.entity_text))
                 if _entity.entity_text!=_ent.entity_text and _score>=on_value:
                     _entity.linked_entitys.append(_ent)
                     _ent.linked_entitys.append(_entity)
@@ -123,7 +124,7 @@ def link_entitys(list_entitys,on_value=0.8):
 
 
 def getEnterprisePath():
-    filename = "../LEGAL_ENTERPRISE.txt"
+    filename = "LEGAL_ENTERPRISE.txt"
     real_path = getFileFromSysPath(filename)
     if real_path is None:
         real_path = filename
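
The new place_pattern strips province/city names (along with the boilerplate tokens 股份/责任/有限/公司) from both entity texts before scoring, so entities that differ only in a location prefix can still be linked; the threshold moves from 0.8 to 0.81 to compensate for the shorter strings. A rough sketch of the comparison step, assuming jaccard_score is a character-set Jaccard and place_info.csv yields names like 北京:

import re

place_list = ["北京", "上海", "浙江"]             # assumption: loaded via get_place_list()
place_pattern = "|".join(place_list)
strip_pattern = "%s|%s" % ("股份|责任|有限|公司", place_pattern)

def jaccard_score(source, target):                # hedged stand-in for the real helper
    a, b = set(source), set(target)
    return len(a & b) / len(a | b) if a | b else 0.0

e1 = re.sub(strip_pattern, "", "北京恒大建设有限公司")   # -> "恒大建设"
e2 = re.sub(strip_pattern, "", "恒大建设公司")           # -> "恒大建设"
print(jaccard_score(e1, e2) >= 0.81)                     # True -> the two entities get linked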

+ 5 - 5
BiddingKG/dl/interface/extract.py

@@ -52,11 +52,11 @@ def predict(doc_id,text,title="",page_time="",**kwargs):
     cost_time["preprocess"] = round(time.time()-start_time,2)
     cost_time.update(_cost_time)
 
-    for list_entity in list_entitys:
-        for _entity in list_entity:
-            log("type:%s,text:%s,label:%s,values:%s,sentence:%s,begin_index:%s,end_index:%s"%
-                (str(_entity.entity_type),str(_entity.entity_text),str(_entity.label),str(_entity.values),str(_entity.sentence_index),
-                 str(_entity.begin_index),str(_entity.end_index)))
+    # for list_entity in list_entitys:
+    #     for _entity in list_entity:
+    #         log("type:%s,text:%s,label:%s,values:%s,sentence:%s,begin_index:%s,end_index:%s"%
+    #             (str(_entity.entity_type),str(_entity.entity_text),str(_entity.label),str(_entity.values),str(_entity.sentence_index),
+    #              str(_entity.begin_index),str(_entity.end_index)))
 
 # depends on sentence order
 start_time = time.time() # extract announcement type / lifecycle

+ 49 - 3
BiddingKG/dl/metrics/extractMetric.py

@@ -2,9 +2,38 @@
 import psycopg2
 from BiddingKG.dl.interface.extract import predict,test
 from BiddingKG.dl.common.Utils import getUnifyMoney,timeFormat
+from BiddingKG.dl.entityLink.entityLink import jaccard_score
 import re
 import json
 
+
+bidway_dict = {'询价': '询价', '竞争性谈判': '竞争性谈判',
+               '公开比选': '其他', '国内竞争性磋商': '竞争性磋商',
+               '招标方式:t公开': '公开招标', '竞价': '竞价',
+               '竞标': '竞价', '电子竞价': '竞价',
+               '电子书面竞投': '竞价', '单一来源': '单一来源',
+               '网上竞价': '竞价', '公开招标': '公开招标',
+               '询比': '询价', '定点采购': '其他',
+               '招标方式:■公开': '公开招标', '交易其他,付款其他': '其他',
+               '竞争性评审': '竞争性磋商', '公开招租': '其他', '\\N': '',
+               '比选': '其他', '比质比价': '其他', '分散采购': '其他',
+               '内部邀标': '邀请招标', '邀请招标': '邀请招标',
+               '网上招标': '公开招标', '非定向询价': '询价',
+               '网络竞价': '竞价', '公开询价': '询价',
+               '定点采购议价': '其他', '询单': '询价',
+               '网上挂牌': '其他', '网上直购': '其他',
+               '定向询价': '询价', '采购方式:公开': '公开招标',
+               '磋商': '竞争性磋商', '公开招投标': '公开招标',
+               '招标方式:√公开': '公开招标', '公开选取': '公开招标',
+               '网上电子投标': '公开招标', '公开竞谈': '竞争性谈判',
+               '竞争性磋商': '竞争性磋商', '采购方式:邀请': '邀请招标',
+               '公开竞价': '竞价', '其他': '其他', '公开招募': '其他',
+               '网上询价': '询价'}
+# normalize raw bidway names to a canonical set
+def bidway_integrate(bidway):
+    integrate_name = bidway_dict.get(bidway,"其他")
+    return integrate_name
+
 class ExtractMetric():
 
     def __init__(self):
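
bidway_dict collapses the many raw bidway strings seen in announcements into a small canonical set (公开招标, 邀请招标, 竞价, 询价, 竞争性谈判, 竞争性磋商, 单一来源, 其他); any value not in the table falls back to 其他, and the \N null marker maps to the empty string. For example:

bidway_integrate("网上竞价")        # -> "竞价"
bidway_integrate("国内竞争性磋商")  # -> "竞争性磋商"
bidway_integrate("从未见过的方式")  # -> "其他"  (default for unknown values)
bidway_integrate("\\N")             # -> ""      (null marker)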
@@ -91,6 +120,8 @@ class ExtractMetric():
                     else:
                         _t = v.get("text")
                     dict_result[v.get("type")] = _t
+                if v.get("type")=="bidway":
+                    dict_result[v.get("type")] = bidway_integrate(v.get("text"))
             _split = v.get("type").split("_")
             if len(_split)>1:
                 if _split[1]=="tenderee":
@@ -191,7 +222,7 @@ class ExtractMetric():
             _user = _payroll[2]
             doc_count = _payroll[3]
             print(_user,_begin_time,_end_time,doc_count)
-            _sql = "select document_id,value from brat_bratannotation where document_id in (select human_identifier from corpus_iedocument where edituser='%s' and to_char(edittime,'yyyy-mm-dd')>='%s' and to_char(edittime,'yyyy-mm-dd')<='%s' limit 100)  order by document_id"%(_user,_begin_time,_end_time)
+            _sql = "select document_id,value from brat_bratannotation where document_id in (select human_identifier from corpus_iedocument where edituser='%s' and to_char(edittime,'yyyy-mm-dd')>='%s' and to_char(edittime,'yyyy-mm-dd')<='%s' limit 10)  order by document_id"%(_user,_begin_time,_end_time)
             cursor.execute(_sql)
             rows = cursor.fetchall()
             if len(rows)>0:
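
The query is assembled with % string formatting; since _user and the date bounds come from internal tables this works, but psycopg2's parameter binding sidesteps quoting issues entirely. A hedged sketch of the same statement in parameterized form:

_sql = ("select document_id,value from brat_bratannotation "
        "where document_id in (select human_identifier from corpus_iedocument "
        "where edituser=%s and to_char(edittime,'yyyy-mm-dd')>=%s "
        "and to_char(edittime,'yyyy-mm-dd')<=%s limit 10) order by document_id")
cursor.execute(_sql, (_user, _begin_time, _end_time))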
@@ -224,6 +255,7 @@ class ExtractMetric():
     def extractFromInterface(self,content):
         return json.loads(test("",content))
 
+
     def getDiff(self,_inter,_inter2):
         _dict = {}
         for k in ["code","product","person_review"]:
@@ -238,7 +270,14 @@ class ExtractMetric():
             _k2 = _inter2.get(k,"")
             len_k1 = 0 if _k1=="" else 1
             len_k2 = 0 if _k2=="" else 1
-            len_union = 1 if _k1==_k2 and len_k1==1 else 0
+            if k in ["name","serviceTime"]:
+                _score = jaccard_score(_k1,_k2)
+                if len_k1 and len_k2 and _score>0.9:
+                    len_union = 1
+                else:
+                    len_union = 0
+            else:
+                len_union = 1 if _k1==_k2 and len_k1==1 else 0
             _dict["%s_inter"%k] = len_k1
             _dict["%s_inter2"%k] = len_k2
             _dict["%s_union"%k] = len_union
@@ -296,7 +335,14 @@ class ExtractMetric():
             k_other = "%s_%s"%(base_key,k2)
             _dict[k] = len(v)
             _dict[k_other] = len(dict_project.get(k_other,[]))
-            _dict["%s_union"%base_key] = len(set(v)&set(dict_project.get(k_other,[])))
+            if base_key=="tenderee":
+                _dict["%s_union"%base_key] = 0
+                if _dict[k]>0 and _dict[k_other]>0:
+                    _score = jaccard_score(dict_project.get(k),dict_project.get(k_other))
+                    if _score>0.9:
+                        _dict["%s_union"%base_key] = 1
+            else:
+                _dict["%s_union"%base_key] = len(set(v)&set(dict_project.get(k_other,[])))
             set_k.add(base_key)
         print("=========================")
         print(_inter)

+ 3 - 0
BiddingKG/dl/test/test4.py

@@ -34,6 +34,9 @@ def test(name,content):
     _resp = requests.post(_url, json=user, headers=myheaders, verify=True)
     # _resp = requests.post("http://192.168.2.102:15000" + '/article_extract', json=user, headers=myheaders, verify=True)
     resp_json = _resp.content.decode("utf-8")
+    print("===",json.loads(resp_json))
+
+    print("====",json.dumps(json.loads(resp_json)))
     print(resp_json)
     return resp_json
 

+ 5 - 5
BiddingKG/app.json → BiddingKG/extract.app.json

@@ -1,11 +1,11 @@
 {
   "generate_token": "true",
   "metadata": {
-    "cpu": 5,
-    "instance": 5,
-    "memory": 20000,
+    "cpu": 7,
+    "instance": 4,
+    "memory": 18000,
     "region": "cn-hangzhou",
-    "resource": "EAS-SrhheASr495",
+    "resource": "eas-r-9oq7xupatg8yoiyuvk",
     "rpc": {
       "batching": "true",
       "keepalive": 60000,
@@ -14,7 +14,7 @@
   },
   "workers":7,
   "name": "content_extract",
-  "processor_entry": "./app.py",
+  "processor_entry": "./BiddingKG/app.py",
   "processor_path": "oss://eas-model-hangzhou/1255640119316927/BiddingKG_eas.zip",
   "data_image": "registry-vpc.cn-hangzhou.aliyuncs.com/bxkc/eas-service:latest",
   "processor_type": "python",

+ 11 - 1
BiddingKG/maxcompute/evaluates.py

@@ -160,6 +160,7 @@ class Extract(BaseUDTF):
         self.deal_process = Process(target=self.f_queue_process,args=(self.task_queue,self.result_queue))
         self.deal_process.start()
         import numpy as np
+        self.last_timeout = False
 
 
         global predictor,Entitys,getAttributes,entityLink,json,MyEncoder,Preprocessing,MyEncoder,np,predict
@@ -202,16 +203,25 @@ class Extract(BaseUDTF):
 
             try:
                 _timeout = 60*4
+                if self.last_timeout:
+                    _timeout += 60*2
+                    self.last_timeout = False
                 if not self.deal_process.is_alive():
                     log("deal process is down")
                     self.task_queue = Queue()
                     self.deal_process = Process(target=self.f_queue_process,args=(self.task_queue,self.result_queue))
                     self.deal_process.start()
-                    _timeout += 60*4
+                    _timeout += 60*2
                 log("putting item to task_queue with docid:%s"%(str(_doc_id)))
                 self.task_queue.put(_item)
                 result_json = self.result_queue.get(timeout=_timeout)
                 self.forward(page_time,int(_doc_id),result_json)
             except Exception as e:
                 log("dealing docid %s failed by timeout"%(str(_doc_id)))
+                self.last_timeout = True
                 self.deal_process.kill()
+                time.sleep(5)
+                self.task_queue = Queue()
+                self.deal_process = Process(target=self.f_queue_process,args=(self.task_queue,self.result_queue))
+                self.deal_process.start()
+
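
The pattern here is a watchdog around the worker process: each timeout kills and respawns both the queue and the process, and last_timeout grants the next document a larger budget. A self-contained sketch of the loop (simplified; do_work stands in for the real extraction):

from multiprocessing import Process, Queue
from queue import Empty
import time

def do_work(item):                     # hypothetical stand-in for predict()
    return item

def worker(task_q, result_q):
    while True:
        result_q.put(do_work(task_q.get()))

class Supervisor:
    def __init__(self):
        self.result_queue = Queue()
        self.last_timeout = False
        self._spawn()

    def _spawn(self):
        self.task_queue = Queue()
        self.proc = Process(target=worker, args=(self.task_queue, self.result_queue))
        self.proc.start()

    def handle(self, item):
        _timeout = 60*4 + (60*2 if self.last_timeout else 0)
        self.last_timeout = False
        if not self.proc.is_alive():   # worker died: rebuild and extend the budget
            self._spawn()
            _timeout += 60*2
        self.task_queue.put(item)
        try:
            return self.result_queue.get(timeout=_timeout)
        except Empty:
            self.last_timeout = True   # remember, so the retry gets more time
            self.proc.kill()
            time.sleep(5)
            self._spawn()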

+ 683 - 0
BiddingKG/maxcompute/exportdata.py

@@ -0,0 +1,683 @@
+#coding=utf-8
+# evaluate is the entry-point function of this method and must use exactly this name
+
+from odps.udf import annotate
+from odps.distcache import get_cache_archive
+from odps.distcache import get_cache_file
+from odps.udf import BaseUDTF
+from odps.udf import BaseUDAF
+from odps.distcache import get_cache_archive
+from odps.distcache import get_cache_file
+
+
+# set up the pandas dependency package
+def include_package_path(res_name):
+    import os, sys
+    archive_files = get_cache_archive(res_name)
+    dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files
+                        if '.dist_info' not in f.name], key=lambda v: len(v))
+    sys.path.append(dir_names[0])
+
+    return os.path.dirname(dir_names[0])
+
+# A RuntimeError like "xxx has been blocked by sandbox" may occur:
+# libraries with C extensions are blocked by the sandbox; set odps.isolation.session.enable = true to allow them
+def include_file(file_name):
+    import os, sys
+    so_file = get_cache_file(file_name)
+    sys.path.append(os.path.dirname(os.path.abspath(so_file.name)))
+
+def include_so(file_name):
+    import os, sys
+    so_file = get_cache_file(file_name)
+
+    with open(so_file.name, 'rb') as fp:
+        content=fp.read()
+        so = open(file_name, "wb")
+        so.write(content)
+        so.flush()
+        so.close()
+
+# Initialize the business data packages; upload limits, mismatched Python versions, inconsistent archive unpacking etc. make manual import necessary
+def init_env(list_files,package_name):
+    import os,sys
+
+    if len(list_files)==1:
+        so_file = get_cache_file(list_files[0])
+        cmd_line = os.path.abspath(so_file.name)
+        os.system("unzip -o %s -d %s"%(cmd_line,package_name))
+    elif len(list_files)>1:
+        cmd_line = "cat"
+        for _file in list_files:
+            so_file = get_cache_file(_file)
+            cmd_line += " "+os.path.abspath(so_file.name)
+        cmd_line += " > temp.zip"
+        os.system(cmd_line)
+        os.system("unzip -o temp.zip -d %s"%(package_name))
+    # os.system("rm -rf %s/*.dist-info"%(package_name))
+    # return os.listdir(os.path.abspath("local_package"))
+    # os.system("echo export LD_LIBRARY_PATH=%s >> ~/.bashrc"%(os.path.abspath("local_package")))
+    # os.system("source ~/.bashrc")
+    sys.path.insert(0,os.path.abspath(package_name))
+
+    # sys.path.append(os.path.join(os.path.abspath("local_package"),"interface_real"))
+
+def load_project():
+    start_time = time.time()
+    init_env(["BiddingKG.zip.env.baseline"],str(uuid.uuid4()))
+    # init_env(["BiddingKG.zip.env.backup"],str(uuid.uuid4()))
+    logging.info("init biddingkg.zip.env.line cost %d"%(time.time()-start_time))
+
+def load_vector():
+    start_time = time.time()
+    init_env(["wiki_128_word_embedding_new.vector.env"],".")
+    logging.info("init wiki_128_word_embedding_new cost %d"%(time.time()-start_time))
+
+    start_time = time.time()
+    init_env(["enterprise.zip.env"],".")
+    # init_env(["LEGAL_ENTERPRISE.zip.env"],".")
+    logging.info("init legal_enterprise.zip.env cost %d"%(time.time()-start_time))
+
+    start_time = time.time()
+    init_env(["so.env"],".")
+    logging.info("init so.env cost %d"%(time.time()-start_time))
+
+def load_py():
+    start_time = time.time()
+    # self.out = init_env(["envs_py37.zip.env"],str(uuid.uuid4()))
+    include_package_path("envs_py37.env.zip")
+    logging.info("init envs_py37 cost %d"%(time.time()-start_time))
+
+def multiLoadEnv():
+    load_project()
+    load_vector()
+    load_py()
+
+import json
+class MyEncoder(json.JSONEncoder):
+
+    def default(self, obj):
+        if isinstance(obj, np.ndarray):
+            return obj.tolist()
+        elif isinstance(obj, bytes):
+            return str(obj, encoding='utf-8')
+        elif isinstance(obj, (np.float_, np.float16, np.float32,
+                              np.float64)):
+            return float(obj)
+        elif isinstance(obj,(np.int64)):
+            return int(obj)
+        return json.JSONEncoder.default(self, obj)
+
+
+@annotate("string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string->string")
+class f_json_extract_online(BaseUDTF):
+
+    def __init__(self):
+
+        import uuid
+        global uuid
+
+        import logging
+        import datetime
+        import numpy as np
+
+
+
+        global json,MyEncoder,time,log,MyEncoder,np
+
+
+    def process(self,page_time,doctitle,
+                tenderee,tenderee_contact,tenderee_phone,agency,
+                agency_contact,agency_phone,sub_docs_json,project_code,
+                project_name,product,time_bidclose,time_bidopen,time_release,
+                moneysource,person_review,bidway,punish,serviceTime):
+        _dict = {}
+        _dict["code"] = project_code if project_code is not None else ""
+        _dict["name"] = project_name if project_name is not None else ""
+        if product is not None and product!="":
+            _dict["product"] = product.split(",")
+        else:
+            _dict["product"] = []
+        _dict["time_bidclose"] = time_bidclose if time_bidclose is not None else ""
+        _dict["time_bidopen"] = time_bidopen if time_bidopen is not None else ""
+        _dict["time_release"] = time_release if time_release is not None else ""
+        _dict["moneysource"] = moneysource if moneysource is not None else ""
+        if person_review not in (None,''):
+            _dict["person_review"] = person_review.split(",")
+        else:
+            _dict["person_review"] = []
+        _dict["bidway"] = bidway if bidway is not None else ""
+        _dict["serviceTime"] = serviceTime if serviceTime is not None else ""
+        if punish not in (None,''):
+            _punish = json.loads(punish)
+        else:
+            _punish = {}
+        for k,v in _punish.items():
+            _dict[k] = v
+
+        if sub_docs_json not in (None,''):
+            _docs = json.loads(sub_docs_json)
+        else:
+            _docs = [{}]
+        set_comp_contact = set()
+        if tenderee not in (None,"") and tenderee_contact not in (None,""):
+            set_comp_contact.add("%s-%s-%s-%s"%("tenderee",tenderee,tenderee_contact,tenderee_phone))
+        if agency not in (None,"") and agency_contact not in (None,""):
+            set_comp_contact.add("%s-%s-%s-%s"%("agency",agency,agency_contact,agency_phone))
+        set_pack_comp = set()
+        if tenderee not in (None,""):
+            set_pack_comp.add("%s-%s-%s"%("Project","tenderee",tenderee))
+        if agency not in (None,""):
+            set_pack_comp.add("%s-%s-%s"%("Project","agency",agency))
+        set_pack_money = set()
+        for _d in _docs:
+            if len(_d.keys())>0:
+                sub_project_name = _d.get("sub_project_name","Project")
+                bidding_budget = float(_d.get("bidding_budget",0))
+                win_tenderer = _d.get("win_tenderer","")
+                win_bid_price = float(_d.get("win_bid_price",0))
+                win_tenderer_manager = _d.get("win_tenderer_manager","")
+                win_tenderer_phone = _d.get("win_tenderer_phone","")
+                second_tenderer = _d.get("second_tenderer","")
+                second_bid_price = float(_d.get("second_bid_price",0))
+                second_tenderer_manager = _d.get("second_tenderer_manager","")
+                second_tenderer_phone = _d.get("second_tenderer_phone","")
+                third_tenderer = _d.get("third_tenderer","")
+                third_bid_price = float(_d.get("third_bid_price",0))
+                third_tenderer_manager = _d.get("third_tenderer_manager","")
+                third_tenderer_phone = _d.get("third_tenderer_phone","")
+                if win_tenderer not in (None,"") and win_tenderer_manager not in (None,""):
+                    set_comp_contact.add("%s-%s-%s-%s"%("win_tenderee",win_tenderer,win_tenderer_manager,win_tenderer_phone))
+                if second_tenderer not in (None,"") and second_tenderer_manager not in (None,""):
+                    set_comp_contact.add("%s-%s-%s-%s"%("second_tenderer",second_tenderer,second_tenderer_manager,second_tenderer_phone))
+                if third_tenderer not in (None,"") and third_tenderer_manager not in (None,""):
+                    set_comp_contact.add("%s-%s-%s-%s"%("third_tenderer",third_tenderer,third_tenderer_manager,third_tenderer_phone))
+
+                if win_tenderer not in (None,""):
+                    set_pack_comp.add("%s-%s-%s"%(sub_project_name,"win_tenderer",win_tenderer))
+                if second_tenderer not in (None,""):
+                    set_pack_comp.add("%s-%s-%s"%(sub_project_name,"second_tenderer",second_tenderer))
+                if third_tenderer not in (None,""):
+                    set_pack_comp.add("%s-%s-%s"%(sub_project_name,"third_tenderer",third_tenderer))
+
+                if bidding_budget>0:
+                    set_pack_money.add("%s-%s-%2f"%(sub_project_name,"bidding_budget",bidding_budget))
+                if win_bid_price>0:
+                    set_pack_money.add("%s-%s-%2f"%(sub_project_name,"win_tenderer",win_bid_price))
+                if second_bid_price>0:
+                    set_pack_money.add("%s-%s-%2f"%(sub_project_name,"second_tenderer",second_bid_price))
+                if third_bid_price>0:
+                    set_pack_money.add("%s-%s-%2f"%(sub_project_name,"third_tenderer",third_bid_price))
+        _dict["set_comp_contact"] = list(set_comp_contact)
+        _dict["set_pack_comp"] = list(set_pack_comp)
+        _dict["set_pack_money"] = list(set_pack_money)
+        self.forward(json.dumps(_dict,cls=MyEncoder,sort_keys=True,indent=4,ensure_ascii=False))
+
+@annotate("string,string->string")
+class f_compair_extract(object):
+
+    def __init__(self):
+        import logging
+        import re
+        import json
+        global logging,re,json
+        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+
+    def evaluate(self, json_online,json_result):
+        dict_online = json.loads(json_online)
+        dict_result = json.loads(json_result)
+
+        logging.info(json_online)
+
+        dict_test = {}
+        set_comp_contact = set()
+        set_pack_comp = set()
+        set_pack_money = set()
+        logging.info("1")
+        for k,v in dict_result.items():
+            if k in ["bidway","moneysource","time_bidclose","serviceTime","time_bidopen","time_release","name"]:
+                dict_test[k] = v
+            elif k in ["code"]:
+                if len(v)>0:
+                    dict_test["code"] = v[0]
+                else:
+                    dict_test["code"] = ""
+            elif k in ["person_review","product"]:
+                list_temp = v
+                list_temp.sort(key=lambda x:x)
+                dict_test[k] = list_temp
+            elif k in ["punish"]:
+                for k1,v1 in v.items():
+                    dict_test[k1] = v1
+            elif k in ["prem"]:
+                for _pack,_prem in v.items():
+                    bidding_budget = float(_prem.get("tendereeMoney",0))
+                    role_lists = _prem.get("roleList",[])
+                    if bidding_budget>0:
+                        set_pack_money.add("%s-%s-%2f"%(_pack,"bidding_budget",bidding_budget))
+                    for _role in role_lists:
+                        role_type = _role[0]
+                        role_name = _role[1]
+                        role_money = 0 if _role[2]=="" else float(_role[2])
+                        contact_list = _role[3]
+                        for _person,_phone in contact_list:
+                            set_comp_contact.add("%s-%s-%s-%s"%(role_type,role_name,_person,_phone))
+                        set_pack_comp.add("%s-%s-%s"%(_pack,role_type,role_name))
+                        if role_money >0:
+                            set_pack_money.add("%s-%s-%2f"%(_pack,role_type,role_money))
+        dict_test["set_comp_contact"] = list(set_comp_contact)
+        dict_test["set_pack_comp"] = list(set_pack_comp)
+        dict_test["set_pack_money"] = list(set_pack_money)
+
+        logging.info(dict_test)
+        logging.info("2")
+        dict_compair = {}
+        set_keys_online = set(dict_online.keys())
+        set_keys_test = set(dict_test.keys())
+        union_keys = list(set_keys_online|set_keys_test)
+        logging.info(str(union_keys))
+        for _key in union_keys:
+            logging.info(_key)
+            v_online = dict_online.get(_key,"")
+            v_test = dict_test.get(_key,"")
+            logging.info(v_online)
+            logging.info(v_test)
+            if isinstance(v_online,list) or isinstance(v_test,list):
+                logging.info("3")
+                if v_online=="":
+                    v_online = []
+                if v_test=="":
+                    v_test = []
+                v_online.sort(key=lambda x:x)
+                v_test.sort(key=lambda x:x)
+                s_online = set(v_online)
+                s_test = set(v_test)
+                diff_count = len(s_online-s_test)+len(s_test-s_online)
+                dict_compair[_key+"_diff"] = diff_count
+                dict_compair[_key+"_online"] = v_online
+                dict_compair[_key+"_test"] = v_test
+            elif isinstance(v_online,str):
+                logging.info("4")
+                if v_online==v_test:
+                    diff_count = 0
+                else:
+                    diff_count = 1
+                dict_compair[_key+"_diff"] = diff_count
+                dict_compair[_key+"_online"] = v_online
+                dict_compair[_key+"_test"] = v_test
+
+        return json.dumps(dict_compair,sort_keys=True,indent=4,ensure_ascii=False)
+
+import hashlib
+def getMD5(sourceHtml):
+    if sourceHtml is not None and len(sourceHtml)>0:
+        if isinstance(sourceHtml,str):
+            bs = sourceHtml.encode()
+        elif isinstance(sourceHtml,bytes):
+            bs = sourceHtml
+        else:
+            return ""
+        md5 = hashlib.md5()
+        md5.update(bs)
+        return md5.hexdigest()
+    return ""
+
+def getFingerprint(sourceHtml):
+    md5 = getMD5(sourceHtml)
+    if md5!="":
+        _fingerprint = "md5=%s"%(md5)
+    else:
+        _fingerprint = ""
+    return _fingerprint
+
+@annotate("string,string->string")
+class f_getFingerprint(object):
+
+    def __init__(self):
+        import logging
+        import re
+        import json
+        global logging,re,json
+        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+
+    def evaluate(self, doctitle,dochtmlcon):
+        fingerprint = getFingerprint(doctitle+dochtmlcon)
+        return fingerprint
+
+@annotate('bigint,string,string,string,string,string,string,string,string->string')
+class f_check_dumplicate(BaseUDAF):
+    '''
+    Re-check after dedup merging: when a group holds more than 5 records, doctitle, tenderee, win_tenderer and bidding_budget may each take only one value within the group;
+    with 5 or fewer records, tenderee, win_tenderer and bidding_budget may each take only one value
+    '''
+    def __init__(self):
+        import logging
+        import json,re
+        global json,logging,re
+        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+
+    def new_buffer(self):
+        return [list()]
+
+    def iterate(self, buffer,docid,doctitle,project_code,project_name,tenderee,agency,win_tenderer,bidding_budget,win_bid_price):
+        buffer[0].append({"docid":docid,"doctitle":doctitle,"project_code":project_code,"project_name":project_name,
+                          "tenderee":tenderee,"agency":agency,"win_tenderer":win_tenderer,"bidding_budget":bidding_budget,"win_bid_price":win_bid_price})
+
+    def merge(self, buffer, pbuffer):
+        buffer[0].extend(pbuffer[0])
+
+    def terminate(self, buffer):
+        list_group = []
+        list_group.append(buffer[0])
+        return json.dumps(list_group,ensure_ascii=False)
+
+
+@annotate('string -> bigint,bigint,string,string,string,string,string,string,string,string')
+class f_check_dumplicate_group(BaseUDTF):
+    '''
+    Extract the groups from the final result
+    '''
+
+    def __init__(self):
+        import logging
+        import json
+        global json,logging
+        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+
+    def process(self,list_group):
+        if list_group is not None:
+            final_group = json.loads(list_group)
+            logging.info(list_group)
+            for _groups in final_group:
+                for _group in _groups:
+                    self.forward(_groups[0]["docid"],_group["docid"],_group["doctitle"],_group["project_code"],_group["project_name"],_group["tenderee"],_group["agency"],_group["win_tenderer"],_group["bidding_budget"],_group["win_bid_price"])
+
+@annotate('string->bigint')
+class f_is_contain(BaseUDAF):
+    '''
+    Returns 1 when every doctitle in the group is contained in (or contains) the longest title, else 0
+    '''
+    def __init__(self):
+        import logging
+        import json,re
+        global json,logging,re
+        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+
+    def new_buffer(self):
+        return [list()]
+
+    def iterate(self, buffer,doctitle):
+        buffer[0].append(doctitle)
+
+    def merge(self, buffer, pbuffer):
+        buffer[0].extend(pbuffer[0])
+
+    def terminate(self, buffer):
+        is_contain = 1
+        list_doctitle = buffer[0]
+        main_doctitle = ""
+        for _doctitle in list_doctitle:
+            if _doctitle in main_doctitle or main_doctitle in _doctitle:
+                if len(_doctitle)>len(main_doctitle):
+                    main_doctitle = _doctitle
+            else:
+                is_contain = 0
+                break
+        return is_contain
+
+
+
+def getSet(list_dict,key):
+    _set = set()
+    for item in list_dict:
+        if key in item:
+            if item[key]!='' and item[key] is not None:
+                if re.search("^[\d\.]+$",item[key]) is not None:
+                    _set.add(str(float(item[key])))
+                else:
+                    _set.add(str(item[key]))
+    return _set
+
+def split_with_time(list_dict,sort_key,timedelta=86400*2):
+    if len(list_dict)>0:
+        if sort_key in list_dict[0]:
+            list_dict.sort(key=lambda x:x[sort_key])
+            list_group = []
+            _begin = 0
+            for i in range(len(list_dict)-1):
+                if abs(list_dict[i][sort_key]-list_dict[i+1][sort_key])<timedelta:
+                    continue
+                else:
+                    _group = []
+                    for j in range(_begin,i+1):
+                        _group.append(list_dict[j])
+                    if len(_group)>1:
+                        list_group.append(_group)
+                    _begin = i + 1
+            if len(list_dict)>1:
+                _group = []
+                for j in range(_begin,len(list_dict)):
+                    _group.append(list_dict[j])
+                if len(_group)>1:
+                    list_group.append(_group)
+            return list_group
+    return [list_dict]
+
+@annotate('bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string->string')
+class f_check_dumplicate_1(BaseUDAF):
+    '''
+    Dedup rule: matching project code and winning bidder, len(project code) > 7, winning bidder <> "", fewer than 2 distinct non-empty tenderees after merging, and identical non-empty amounts for the same announcement type after merging
+    '''
+    def __init__(self):
+        import logging
+        import json,re
+        global json,logging,re
+        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+
+    def new_buffer(self):
+        return [list()]
+
+    def iterate(self, buffer,docid,page_time_stamp,set_limit_column1,set_limit_column2,set_limit_column3,set_limit_column4,contain_column,doctitle,project_code,project_name,tenderee,agency,win_tenderer,bidding_budget,win_bid_price):
+        buffer[0].append({"docid":docid,"page_time_stamp":page_time_stamp,"set_limit_column1":set_limit_column1,
+                          "set_limit_column2":set_limit_column2,"set_limit_column3":set_limit_column3,"set_limit_column4":set_limit_column4,
+                          "contain_column":contain_column,"doctitle":doctitle,"project_code":project_code,"project_name":project_name,
+                          "tenderee":tenderee,"agency":agency,"win_tenderer":win_tenderer,"bidding_budget":bidding_budget,"win_bid_price":win_bid_price})
+
+    def merge(self, buffer, pbuffer):
+        buffer[0].extend(pbuffer[0])
+
+    def terminate(self, buffer):
+        list_split = split_with_time(buffer[0],"page_time_stamp")
+        list_group = []
+        for _split in list_split:
+            flag = True
+            keys = ["set_limit_column1","set_limit_column2","set_limit_column3","set_limit_column4"]
+            for _key in keys:
+                logging.info(_key+str(getSet(_split,_key)))
+                if len(getSet(_split,_key))>1:
+                    flag = False
+                    break
+
+            MAX_CONTAIN_COLUMN = None
+            # check that every announcement in the group is contained in (or contains) the longest one
+            if flag:
+                for _d in _split:
+                    contain_column = _d["contain_column"]
+                    if contain_column is not None and contain_column !="":
+                        if MAX_CONTAIN_COLUMN is None:
+                            MAX_CONTAIN_COLUMN = contain_column
+                        else:
+                            if len(MAX_CONTAIN_COLUMN)<len(contain_column):
+                                if contain_column.find(MAX_CONTAIN_COLUMN)==-1:
+                                    flag = False
+                                    break
+                                MAX_CONTAIN_COLUMN = contain_column
+                            else:
+                                if MAX_CONTAIN_COLUMN.find(contain_column)==-1:
+                                    flag = False
+                                    break
+            if flag:
+                if len(_split)>1:
+                    list_group.append(_split)
+        return json.dumps(list_group)
+
+
+@annotate('string->string,string')
+class f_splitAttach(BaseUDTF):
+
+    def __init__(self):
+        import logging
+        import time
+        global time,logging
+        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+        logging.info("start init env")
+        load_py()
+        logging.info("init env done")
+        from bs4 import BeautifulSoup
+        global BeautifulSoup
+
+    def process(self,dochtmlcon):
+        doctextcon = ""
+        attachmenttextcon = ""
+
+        if dochtmlcon is not None:
+            _soup = BeautifulSoup(dochtmlcon,"lxml")
+
+            _find = _soup.find("div",attrs={"class":"richTextFetch"})
+            if _find is not None:
+                attachmenttextcon = _find.get_text()
+                _find.decompose()
+            doctextcon = _soup.get_text()
+        self.forward(doctextcon,attachmenttextcon)
+
+def getTitleFromHtml(filemd5,_html):
+    _soup = BeautifulSoup(_html,"lxml")
+
+    _find = _soup.find("a",attrs={"data":filemd5})
+    _title = ""
+    if _find is not None:
+        _title = _find.get_text()
+    return _title
+
+def getSourceLinkFromHtml(filemd5,_html):
+    _soup = BeautifulSoup(_html,"lxml")
+
+    _find = _soup.find("a",attrs={"filelink":filemd5})
+    filelink = ""
+    if _find is None:
+        _find = _soup.find("img",attrs={"filelink":filemd5})
+        if _find is not None:
+            filelink = _find.attrs.get("src","")
+    else:
+        filelink = _find.attrs.get("href","")
+    return filelink
+
+def turnAttachmentsFromHtml(dochtmlcon,page_attachments):
+    new_attachments = json.loads(page_attachments)
+    for _atta in new_attachments:
+        fileMd5 = _atta.get("fileMd5")
+        if fileMd5 is not None:
+            fileTitle = getTitleFromHtml(fileMd5,dochtmlcon)
+            fileLink = getSourceLinkFromHtml(fileMd5,dochtmlcon)
+            _atta["fileTitle"] = fileTitle
+            _atta["fileLink"] = fileLink
+    print(new_attachments)
+    return json.dumps(new_attachments,ensure_ascii=False)
+
+@annotate('string,string->string')
+class f_turnPageattachments(object):
+
+
+    def evaluate(self,dochtmlcon,page_attachments):
+        new_page_attachments = None
+        if page_attachments is not None:
+            if "fileMd5" in page_attachments:
+                new_page_attachments = turnAttachmentsFromHtml(dochtmlcon,page_attachments)
+        return new_page_attachments
+
+@annotate("string->string")
+class f_getRoles(BaseUDTF):
+
+    def __init__(self):
+        self.columns = ["win_tenderer","second_tenderer","third_tenderer"]
+        pass
+
+    # forward the win/second/third tenderer names found in sub_docs_json
+    def bidway_integrate(self,sub_docs_json):
+        if sub_docs_json is not None:
+            _docs = json.loads(sub_docs_json)
+            for _doc in _docs:
+                for _c in self.columns:
+                    if _doc.get(_c) is not None:
+                        self.forward(_doc.get(_c))
+
+    def process(self,sub_docs_json):
+        self.bidway_integrate(sub_docs_json)
+
+@annotate("string->string")
+class turn_bidway(BaseUDTF):
+
+    def __init__(self):
+        self.bidway_dict = {'询价': '询价', '竞争性谈判': '竞争性谈判',
+                       '公开比选': '其他', '国内竞争性磋商': '竞争性磋商',
+                       '招标方式:t公开': '公开招标', '竞价': '竞价',
+                       '竞标': '竞价', '电子竞价': '竞价',
+                       '电子书面竞投': '竞价', '单一来源': '单一来源',
+                       '网上竞价': '竞价', '公开招标': '公开招标',
+                       '询比': '询价', '定点采购': '其他',
+                       '招标方式:■公开': '公开招标', '交易其他,付款其他': '其他',
+                       '竞争性评审': '竞争性磋商', '公开招租': '其他', '\\N': '',
+                       '比选': '其他', '比质比价': '其他', '分散采购': '其他',
+                       '内部邀标': '邀请招标', '邀请招标': '邀请招标',
+                       '网上招标': '公开招标', '非定向询价': '询价',
+                       '网络竞价': '竞价', '公开询价': '询价',
+                       '定点采购议价': '其他', '询单': '询价',
+                       '网上挂牌': '其他', '网上直购': '其他',
+                       '定向询价': '询价', '采购方式:公开': '公开招标',
+                       '磋商': '竞争性磋商', '公开招投标': '公开招标',
+                       '招标方式:√公开': '公开招标', '公开选取': '公开招标',
+                       '网上电子投标': '公开招标', '公开竞谈': '竞争性谈判',
+                       '竞争性磋商': '竞争性磋商', '采购方式:邀请': '邀请招标',
+                       '公开竞价': '竞价', '其他': '其他', '公开招募': '其他',
+                       '网上询价': '询价'}
+    # normalize raw bidway names to a canonical set
+    def bidway_integrate(self,bidway):
+        integrate_name = self.bidway_dict.get(bidway,"其他")
+        return integrate_name
+
+    def process(self,bidway):
+        new_bidway =self.bidway_integrate(bidway)
+        self.forward(new_bidway)
+
+
+@annotate('string,double->string')
+class f_getLimit(BaseUDAF):
+    '''
+    Aggregate win_tenderee / win_bid_price pairs and keep only the top 100 by win_bid_price
+    '''
+    def __init__(self):
+        import logging
+        import json,re
+        global json,logging,re
+        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+
+    def new_buffer(self):
+        return [list()]
+
+    def iterate(self, buffer,win_tenderee,win_bid_price):
+        buffer[0].append({"win_tenderee":win_tenderee,
+                          "win_bid_price":win_bid_price})
+
+    def merge(self, buffer, pbuffer):
+        buffer[0].extend(pbuffer[0])
+        buffer[0].sort(key=lambda x:x["win_bid_price"],reverse=True)
+        buffer[0] = buffer[0][:100]
+
+    def terminate(self, buffer):
+
+        buffer[0].sort(key=lambda x:x["win_bid_price"],reverse=True)
+        buffer[0] = buffer[0][:100]
+
+        return json.dumps(buffer[0],ensure_ascii=False)
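
Among the helpers above, split_with_time is what bounds candidate duplicate groups to a time window: it sorts the records by the timestamp field, cuts the list wherever two neighbours are more than timedelta seconds apart (two days by default), and keeps only groups with more than one record. A quick worked example:

docs = [{"docid": 1, "page_time_stamp": 0},
        {"docid": 2, "page_time_stamp": 3600},        # one hour after docid 1
        {"docid": 3, "page_time_stamp": 86400*10}]    # far outside the window
split_with_time(docs, "page_time_stamp")
# -> [[{"docid": 1, ...}, {"docid": 2, ...}]]
# docid 3 is more than two days away, and singleton groups are dropped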