Эх сурвалжийг харах

同步时mysql连接重置策略,重点项目、实时公告合并

luojiehua 2 жил өмнө
parent
commit
5210e6dd0a

+ 52 - 0
BaseDataMaintenance/common/otsUtils.py

@@ -0,0 +1,52 @@
+
+from tablestore import *
+import traceback
+
def getRow_ots_primary(row):
    """Flatten a single OTS row into a plain dict.

    Both attribute columns and primary-key columns are merged in; primary-key
    values are applied last so they win on a key collision. Returns an empty
    dict for a missing row.
    """
    result = dict()
    if row is None:
        return result
    result.update({col[0]: col[1] for col in row.attribute_columns})
    result.update({col[0]: col[1] for col in row.primary_key})
    return result
+
def getRow_ots(rows):
    """Convert a list of OTS rows into a list of dicts.

    Each row is an iterable of column groups (primary key, attributes), and
    each group is an iterable of (name, value, ...) tuples; all groups of a
    row are merged into one dict, later groups overwriting earlier ones.
    """
    list_dict = []
    for row in rows:
        merged = dict()
        for column_group in row:
            merged.update((col[0], col[1]) for col in column_group)
        list_dict.append(merged)
    return list_dict
+
def getDocument(_query,ots_client,columns,sort="page_time",table_name="document",table_index="document_index"):
    """Run a paged OTS search and return the matching rows as dicts.

    :param _query: dict with "query" (a tablestore query object), optional
                   "limit" (rows to collect, default 100) and optional
                   "keyword" (copied into every returned row under "keyword")
    :param ots_client: tablestore client used to execute the search
    :param columns: attribute columns to fetch
    :param sort: index field used to sort the first page ascending
    :param table_name: table to search
    :param table_index: search index to use
    :return: list of row dicts; may be partial (or empty) if an error occurred
    """
    # defined before the try so an early failure still returns a list
    # (the original referenced l_rows in the return after defining it inside
    # the try, which could raise NameError)
    l_rows = []
    try:
        item = _query.get("query")
        _limit = _query.get("limit",100)
        keyword = _query.get("keyword")
        rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
                                                                       SearchQuery(item,sort=Sort(sorters=[FieldSort(sort,SortOrder.ASC)]),limit=min(_limit,100),get_total_count=True),
                                                                       ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        dict_row = getRow_ots(rows)
        for _row in dict_row:
            _row["keyword"] = keyword
        l_rows.extend(dict_row)
        # keep paging until the token is exhausted or the limit is reached;
        # the limit is now also honoured right after the first page
        while next_token and not (_limit and len(l_rows)>=_limit):
            rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
                                                                           SearchQuery(item,next_token=next_token,limit=min(_limit,100),get_total_count=True),
                                                                           ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
            dict_row = getRow_ots(rows)
            for _row in dict_row:
                _row["keyword"] = keyword
            l_rows.extend(dict_row)
    except Exception as e:
        traceback.print_exc()
    return l_rows

+ 158 - 24
BaseDataMaintenance/maintenance/dataflow.py

@@ -14,6 +14,7 @@ from BaseDataMaintenance.model.ots.document_tmp import *
 from BaseDataMaintenance.model.ots.attachment import *
 from BaseDataMaintenance.model.ots.attachment import *
 from BaseDataMaintenance.model.ots.document_html import *
 from BaseDataMaintenance.model.ots.document_html import *
 from BaseDataMaintenance.model.ots.document_extract2 import *
 from BaseDataMaintenance.model.ots.document_extract2 import *
+from BaseDataMaintenance.model.ots.project import *
 
 
 import base64
 import base64
 from BaseDataMaintenance.dataSource.interface import getAttachDealInterface,sentMsgToDD
 from BaseDataMaintenance.dataSource.interface import getAttachDealInterface,sentMsgToDD
@@ -28,6 +29,8 @@ from threading import Thread
 import oss2
 import oss2
 from BaseDataMaintenance.maintenance.documentDumplicate import *
 from BaseDataMaintenance.maintenance.documentDumplicate import *
 
 
+from BaseDataMaintenance.common.otsUtils import *
+
 def getSet(list_dict,key):
 def getSet(list_dict,key):
     _set = set()
     _set = set()
     for item in list_dict:
     for item in list_dict:
@@ -1414,10 +1417,12 @@ class Dataflow():
         producer()
         producer()
         comsumer()
         comsumer()
 
 
+
     def merge_document(self,item,status_to=None):
     def merge_document(self,item,status_to=None):
         self.post_extract(item)
         self.post_extract(item)
         docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
         docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
 
 
+
         _d = {"partitionkey":item["partitionkey"],
         _d = {"partitionkey":item["partitionkey"],
               "docid":item["docid"],
               "docid":item["docid"],
               }
               }
@@ -1439,28 +1444,28 @@ class Dataflow():
                 if len(list_data)==1:
                 if len(list_data)==1:
                     dtmp.setValue("merge_uuid",list_data[0]["uuid"],True)
                     dtmp.setValue("merge_uuid",list_data[0]["uuid"],True)
                     print(item["docid"],list_data[0]["uuid"])
                     print(item["docid"],list_data[0]["uuid"])
-        else:
-            list_should_q = []
-            if bidding_budget!="" and project_code!="":
-                _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
-                                             TermQuery("bidding_budget",float(bidding_budget))])
-                list_should_q.append(_q)
-            if tenderee!="" and bidding_budget!="" and project_name!="":
-                _q = BoolQuery(must_queries=[MatchQuery("tenderee",tenderee),
-                                             TermQuery("bidding_budget",float(bidding_budget)),
-                                             TermQuery("project_name",project_name)])
-                list_should_q.append(_q)
-            if tenderee!="" and win_bid_price!="" and project_name!="":
-                _q = BoolQuery(must_queries=[MatchQuery("tenderee",tenderee),
-                                             TermQuery("win_bid_price",float(win_bid_price)),
-                                             TermQuery("project_name",project_name)])
-                list_should_q.append(_q)
-                if len(list_should_q)>0:
-                    list_data = self.search_data_by_query(item,list_should_q,100,table_name="project2",table_index="project2_index_formerge",sort_column="tenderee",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=["tenderee","win_tenderer"])
-
-                    if len(list_data)==1:
-                        dtmp.setValue("merge_uuid",list_data[0]["uuid"],True)
-                        print(item["docid"],list_data[0]["uuid"])
+            else:
+                list_should_q = []
+                if bidding_budget!="" and project_code!="":
+                    _q = BoolQuery(must_queries=[MatchQuery("project_code",project_code),
+                                                 TermQuery("bidding_budget",float(bidding_budget))])
+                    list_should_q.append(_q)
+                if tenderee!="" and bidding_budget!="" and project_name!="":
+                    _q = BoolQuery(must_queries=[MatchQuery("tenderee",tenderee),
+                                                 TermQuery("bidding_budget",float(bidding_budget)),
+                                                 TermQuery("project_name",project_name)])
+                    list_should_q.append(_q)
+                if tenderee!="" and win_bid_price!="" and project_name!="":
+                    _q = BoolQuery(must_queries=[MatchQuery("tenderee",tenderee),
+                                                 TermQuery("win_bid_price",float(win_bid_price)),
+                                                 TermQuery("project_name",project_name)])
+                    list_should_q.append(_q)
+                    if len(list_should_q)>0:
+                        list_data = self.search_data_by_query(item,list_should_q,100,table_name="project2",table_index="project2_index_formerge",sort_column="tenderee",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=["tenderee","win_tenderer"])
+
+                        if len(list_data)==1:
+                            dtmp.setValue("merge_uuid",list_data[0]["uuid"],True)
+                            print(item["docid"],list_data[0]["uuid"])
 
 
         return dtmp.getProperties().get("merge_uuid","")
         return dtmp.getProperties().get("merge_uuid","")
         # dtmp.update_row(self.ots_client)
         # dtmp.update_row(self.ots_client)
@@ -2653,6 +2658,131 @@ class Dataflow_dumplicate(Dataflow):
         producer()
         producer()
         comsumer()
         comsumer()
 
 
+    def search_docs(self,list_docids):
+        # TODO(review): stub — presumably meant to fetch the document rows for
+        # the given docid list; currently returns None, so callers that expect
+        # a list (e.g. generate_projects_from_document) must handle that
+        pass
+
+    def update_projects_by_document(self,docid,projects,action):
+        '''
+        Update the document-derived attributes inside the given projects.
+        (Stub — not implemented yet.)
+        :param docid: docid of the document driving the update
+        :param projects: collection of project dicts to update
+        :param action: "add"/"delete" — add attaches the unique attributes, delete removes them
+        :return: None
+        '''
+
+    def generate_projects_from_document(self,list_docs):
+        '''
+        Generate new project records from the given documents.
+        (Stub — not implemented yet; currently returns None.)
+        :param list_docs: document records to build projects from
+        :return: None
+        '''
+        pass
+
+    def search_projects_with_document(self,list_docids):
+        '''
+        通过docid集合查询对应的projects
+        :param list_docids:
+        :return:
+        '''
+        print("==",list_docids)
+        list_should_q = []
+        for _docid in list_docids:
+            list_should_q.append(TermQuery("docids",_docid))
+        bool_query = BoolQuery(should_queries=list_should_q)
+        _query = {"query":bool_query,"limit":20}
+        list_project_dict = getDocument(_query,self.ots_client,[project_uuid,project_docids,project_zhao_biao_page_time,
+        project_zhong_biao_page_time,
+        project_page_time,
+        project_area,
+        project_province,
+        project_city,
+        project_district,
+        project_info_type,
+        project_industry,
+        project_qcodes,
+        project_project_name,
+        project_project_code,
+        project_project_addr,
+        project_tenderee,
+        project_tenderee_addr,
+        project_tenderee_phone,
+        project_tenderee_contact,
+        project_agency,
+        project_agency_phone,
+        project_agency_contact,
+        project_sub_project_name,
+        project_sub_project_code,
+        project_bidding_budget,
+        project_win_tenderer,
+        project_win_bid_price,
+        project_win_tenderer_manager,
+        project_win_tenderer_phone,
+        project_second_tenderer,
+        project_second_bid_price,
+        project_second_tenderer_manager,
+        project_second_tenderer_phone,
+        project_third_tenderer,
+        project_third_bid_price,
+        project_third_tenderer_manager,
+        project_third_tenderer_phone,
+        project_procurement_system,
+        project_bidway,
+        project_dup_data,
+        project_docid_number,
+        project_dynamics],sort="page_time",table_name="project2",table_index="project2_index")
+        print(list_project_dict)
+
+
+        pass
+
+    def dumplicate_projects(self,list_projects):
+        '''
+        Deduplicate the given projects.
+        (Stub — not implemented yet; currently returns None.)
+        :return: None
+        '''
+        pass
+
+    def merge_projects(self,list_projects):
+        '''
+        Merge the given projects together.
+        (Stub — not implemented yet; currently returns None.)
+        :return: None
+        '''
+        pass
+
+    def to_project_json(self,list_projects):
+        # TODO(review): stub — presumably meant to serialize the merged projects
+        # to a JSON string for the document_tmp "projects" column; returns None for now
+        pass
+
+
+    def merge_document(self,item,dup_docid,table_name,status_to=None):
+        '''
+        实时项目合并
+        :param item:
+        :param dup_docid:重复的公告集合
+        :param status_to:
+        :return:
+        '''
+        print("1",dup_docid)
+        list_docids = []
+        _docid = item.get(document_tmp_docid)
+        list_docids.append(_docid)
+        if isinstance(dup_docid,list):
+            list_docids.extend(dup_docid)
+
+        list_projects = self.search_projects_with_document(list_docids)
+        if len(list_projects)==0:
+            list_docs = self.search_docs(list_docids)
+            list_projects = self.generate_projects_from_document(list_docs)
+        else:
+            self.update_projects_by_document(_docid,list_projects,"add")
+        list_projects = self.dumplicate_projects(list_projects)
+        list_projects = self.merge_projects(list_projects)
+
+        project_json = self.to_project_json(list_projects)
+
+        return project_json
+
+        pass
+
     def dumplicate_comsumer_handle(self,item,result_queue,ots_client,get_all=False,upgrade=True):
     def dumplicate_comsumer_handle(self,item,result_queue,ots_client,get_all=False,upgrade=True):
         start_time = time.time()
         start_time = time.time()
         self.post_extract(item)
         self.post_extract(item)
@@ -2714,7 +2844,6 @@ class Dataflow_dumplicate(Dataflow):
 
 
         if len(final_list)==0 or best_docid==item.get(document_tmp_docid):
         if len(final_list)==0 or best_docid==item.get(document_tmp_docid):
             dtmp.setValue(document_tmp_save,1,True)
             dtmp.setValue(document_tmp_save,1,True)
-            dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
             dmp_docid = ",".join([str(a) for a in list(dup_docid)])
             dmp_docid = ",".join([str(a) for a in list(dup_docid)])
             for _dict in final_list:
             for _dict in final_list:
                 if _dict.get(document_tmp_docid) in dup_docid:
                 if _dict.get(document_tmp_docid) in dup_docid:
@@ -2733,6 +2862,11 @@ class Dataflow_dumplicate(Dataflow):
                 for _dict in final_list:
                 for _dict in final_list:
                     if _dict.get(document_tmp_docid) in dup_docid:
                     if _dict.get(document_tmp_docid) in dup_docid:
                         remove_list.append(_dict)
                         remove_list.append(_dict)
+
+        list_docids = list(dup_docid)
+        list_docids.append(best_docid)
+        dtmp.setValue(document_tmp_projects,self.merge_document(item,list_docids,table_name,flow_dumplicate_status_to),True)
+
         log("save:%s:docid:%d,final_list:%d,rules:%d,best_docid:%s,dmp_docid:%s"%(dtmp.getProperties().get(document_tmp_save),item.get(document_tmp_docid),len(final_list),len(list_rules),str(best_docid),dmp_docid))
         log("save:%s:docid:%d,final_list:%d,rules:%d,best_docid:%s,dmp_docid:%s"%(dtmp.getProperties().get(document_tmp_save),item.get(document_tmp_docid),len(final_list),len(list_rules),str(best_docid),dmp_docid))
         if upgrade:
         if upgrade:
             if table_name=="document_tmp":
             if table_name=="document_tmp":
@@ -2844,4 +2978,4 @@ if __name__ == '__main__':
     # test_attachment_interface()
     # test_attachment_interface()
     df_dump = Dataflow_dumplicate()
     df_dump = Dataflow_dumplicate()
     # df_dump.start_flow_dumplicate()
     # df_dump.start_flow_dumplicate()
-    df_dump.test_dumplicate(263649800)
+    df_dump.test_dumplicate(266576922)

+ 17 - 13
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -887,20 +887,24 @@ conn_mysql = None
 
 
 def generateRangeDocid(nums):
 def generateRangeDocid(nums):
     global conn_mysql
     global conn_mysql
-    with docid_lock:
-        if conn_mysql is None:
+    while 1:
+        try:
+            with docid_lock:
+                if conn_mysql is None:
+                    conn_mysql = getConnection_mysql()
+                cursor = conn_mysql.cursor()
+                sql = "select serial_value from b2c_serial_no where serial_name='DocumentIdSerial'"
+                cursor.execute(sql)
+                rows = cursor.fetchall()
+                current_docid = rows[0][0]
+                next_docid = current_docid+1
+                update_docid = current_docid+nums
+                sql = " update b2c_serial_no set serial_value=%d where serial_name='DocumentIdSerial'"%(update_docid)
+                cursor.execute(sql)
+                conn_mysql.commit()
+                return next_docid
+        except Exception as e:
             conn_mysql = getConnection_mysql()
             conn_mysql = getConnection_mysql()
-        cursor = conn_mysql.cursor()
-        sql = "select serial_value from b2c_serial_no where serial_name='DocumentIdSerial'"
-        cursor.execute(sql)
-        rows = cursor.fetchall()
-        current_docid = rows[0][0]
-        next_docid = current_docid+1
-        update_docid = current_docid+nums
-        sql = " update b2c_serial_no set serial_value=%d where serial_name='DocumentIdSerial'"%(update_docid)
-        cursor.execute(sql)
-        conn_mysql.commit()
-        return next_docid
 
 
 
 
 # 自定义jsonEncoder
 # 自定义jsonEncoder

+ 0 - 0
BaseDataMaintenance/maintenance/major_project/__init__.py


BIN
BaseDataMaintenance/maintenance/major_project/major_project_industry_keyword.xlsx


+ 452 - 0
BaseDataMaintenance/maintenance/major_project/unionDocument.py

@@ -0,0 +1,452 @@
+
+from BaseDataMaintenance.dataSource.source import getConnect_ots
+from tablestore import *
+from BaseDataMaintenance.model.ots.major_project import *
+from BaseDataMaintenance.common.Utils import *
+
+from queue import Queue
+
+from BaseDataMaintenance.common.multiThread import MultiThreadHandler
+import pandas as pd
+from BaseDataMaintenance.model.ots.enterprise import Enterprise
+import random
+from apscheduler.schedulers.blocking import BlockingScheduler
+
def get_stage_pattern():
    """Build the project-stage extraction regex and its priority map.

    Returns a tuple of:
      * a regex string of named groups, one group per stage name, joined with "|"
      * a dict mapping stage name -> priority (lower value wins when several match)
    """
    stage_dict = {
        "立项阶段": "立项|项目投资",
        "可研阶段": "可行性研究|可研",
        "环评阶段": "环境评价|环境影响|环境评测|环评",
        "稳评阶段": "稳定风险|社会稳定|风险评估",
        "咨询阶段": "(水影响|能源|交通影响|地质灾害|地址灾害|地震安全性|地震安全性|气象|雷击风险|安全|海洋|森林环境)(评[价估测])|水土保持|(水|交|灾|震|能|气|安|海|林)评",
        "造价阶段": "(决算书|预算|结算|造价|决算)(编制|咨询)",
        "设计阶段": "(施工图(纸|)|初步|项目|工程|工程方案)设计|测绘",
        "施工图审": "(施工图(纸|)|防雷|消防|人防)审查",
        "施工许可": "施工许可证",
        "施工准备": "施工准备|监理|资格预审|资审",
        "施工在建": "施工",
        "竣工阶段": "竣工公告|验收公告",
    }

    # the priority map also covers stages (勘察阶段, EPC总承包) that are not in
    # stage_dict right now, so they can be re-enabled without touching this map
    stage_priority_dict = {
        "立项阶段": 1,
        "可研阶段": 3,
        "环评阶段": 3,
        "稳评阶段": 3,
        "咨询阶段": 2,
        "造价阶段": 2,
        "设计阶段": 4,
        "勘察阶段": 4,
        "施工图审": 2,
        "施工许可": 2,
        "施工准备": 3,
        "施工在建": 5,
        "竣工阶段": 3,
        "EPC总承包": 4
    }

    stage_pattern = "|".join("(?P<%s>%s)" % (stage, regex) for stage, regex in stage_dict.items())
    return stage_pattern, stage_priority_dict
+
def extract_legal_stage(content, _pattern, priority_dict):
    """Extract the highest-priority project stage mentioned in *content*.

    Returns the stage whose priority value is smallest among all named groups
    of *_pattern* that match, or None for empty text and for notices about
    auctions/transfers/leases, which never describe a construction stage.
    """
    if not content:
        return None
    # these notice types are excluded outright
    if re.search("拍卖|转让|产权|出让|租赁|招租", content) is not None:
        return None
    # strip institute names that would otherwise look like the design stage
    _content = re.sub("设计院|设计总院", "", content)

    best = None
    for match in re.finditer(_pattern, _content):
        for stage, hit in match.groupdict().items():
            if hit is None:
                continue
            rank = priority_dict.get(stage)
            if best is None or rank < best[1]:
                best = (stage, rank)
    return None if best is None else best[0]
+
def read_industry_keyword(_path):
    """Load keyword/industry/idf triples from the Excel sheet at *_path*.

    The sheet must have the columns 关键词 (keyword), 行业 (industry) and
    权重 (idf weight); one dict is returned per row.
    """
    df = pd.read_excel(_path)
    return [
        {"keyword": keyword, "industry": industry, "idf": idf}
        for keyword, industry, idf in zip(df["关键词"], df["行业"], df["权重"])
    ]
+
def getKeywordPattern(list_industry_keyword):
    """Join all keyword strings into one alternation regex, escaping each one."""
    return "|".join(re.escape(record.get("keyword", "")) for record in list_industry_keyword)
+
def getDict_keyword(list_industry_keywrod):
    """Index the keyword records by their keyword string for O(1) lookup."""
    return {record.get("keyword"): record for record in list_industry_keywrod}
+
+
def extract_industry(dict_keyword, _pattern, content):
    """Classify *content* into an industry by summed keyword idf weights.

    Every keyword occurrence found by *_pattern* contributes its record's idf
    to that record's industry; the industry with the highest positive total
    wins, otherwise "未知" is returned.
    """
    scores = {}
    for word in re.findall(_pattern, content):
        if word == "":
            continue
        record = dict_keyword.get(word)
        if record is None:
            continue
        industry = record.get("industry")
        scores[industry] = scores.get(industry, 0) + record.get("idf", 0)

    best_industry = "未知"
    best_score = 0
    for industry, score in scores.items():
        if score > best_score:
            best_score = score
            best_industry = industry
    return best_industry
+
def getWinTenderer(sub_docs_json, ots_client):
    """Return (winner, contact_person, contact_phone) from a sub_docs_json string.

    The first sub-doc with a non-empty "win_tenderer" wins; empty strings are
    returned when the JSON is missing/empty or no winner is present.
    ots_client is currently unused (kept for interface compatibility).
    """
    if sub_docs_json is None or sub_docs_json == "":
        return "", "", ""
    for doc in json.loads(sub_docs_json):
        winner = doc.get("win_tenderer", "")
        if winner != "":
            return (winner,
                    doc.get("win_tenderer_manager", ""),
                    doc.get("win_tenderer_phone", ""))
    return "", "", ""
+
def getIndustry(dict_industry):
    """Pick the industry with the highest count, preferring a known one.

    If the top-ranked entry is "未知" but another industry exists, the
    runner-up is returned instead; an empty dict yields "未知".
    """
    ranked = sorted(dict_industry.items(), key=lambda kv: kv[1], reverse=True)
    if not ranked:
        return "未知"
    if ranked[0][0] == "未知" and len(ranked) > 1:
        return ranked[1][0]
    return ranked[0][0]
+
def dynamicDumplicate(list_dynamic):
    """Drop dynamics repeating the same (project_stage, page_time) pair, keeping the first."""
    seen = set()
    kept = []
    for dyn in list_dynamic:
        key = "%s-%s" % (dyn.get("project_stage", ""), dyn.get("page_time", ""))
        if key not in seen:
            seen.add(key)
            kept.append(dyn)
    return kept
+
def dynamicDumplicate2(list_dynamic):
    """Deduplicate dynamics by (stage, channel), preferring win/bid channels.

    The list is sorted by page_time ascending, then (stably) so that entries
    with docchannel in (101, 119, 120) come first; only the first dynamic per
    stage+channel key is kept, and entries with an empty key are dropped.
    Note: list_dynamic is sorted in place, as before.
    """
    _set = set()
    l_d = []
    list_dynamic.sort(key=lambda x: x.get("page_time", ""))
    list_dynamic.sort(key=lambda x: 1 if x.get("docchannel", "") in (101, 119, 120) else 0, reverse=True)
    for _dynamic in list_dynamic:
        # project_stage may be stored as None by the extractor
        _stage = _dynamic.get("project_stage") or ""
        # format both parts as strings: docchannel is an int for document
        # dynamics while sp_type is a str, and the original int+str
        # concatenation raised TypeError
        _channel = "%s%s" % (_dynamic.get("docchannel", ""), _dynamic.get("sp_type", ""))
        _key = _stage + _channel
        if _key == "" or _key in _set:
            continue
        _set.add(_key)
        l_d.append(_dynamic)
    return l_d
+
+
def getMaxStage(list_dynamic, stage_order):
    """Return the most advanced stage seen in list_dynamic per stage_order.

    Stages not listed in stage_order (and None) are ignored; "未知" is
    returned when nothing qualifies.
    """
    order = list(stage_order)
    best_index = -1
    result = "未知"
    for dyn in list_dynamic:
        stage = dyn.get("project_stage")
        if stage is None or stage not in order:
            continue
        idx = order.index(stage)
        if idx > best_index:
            best_index = idx
            result = stage
    return result
+
+class MajorUnion():
+    """Enrich major_project rows by joining them with approval notices and bid documents.
+
+    For each major project it searches the t_shen_pi_xiang_mu (approval) and
+    document tables by project name + province, extracts stage/industry from
+    the hits, and writes the aggregated dynamics back onto the project row.
+    """
+
+    def __init__(self):
+        self.ots_client = getConnect_ots()
+        # columns fetched per major_project row by producer()
+        self.search_columns = [major_project_project_name,major_project_province,major_project_project_stage,major_project_plan_start_time]
+
+
+        self.stage_pattern,self.stage_priority_dict = get_stage_pattern()
+        self.current_path = os.path.abspath(os.path.dirname(__file__))
+
+        # industry keyword table shipped next to this module
+        self.list_industry_keyword = read_industry_keyword(os.path.join(self.current_path,"major_project_industry_keyword.xlsx"))
+        self.keyword_pattern = getKeywordPattern(self.list_industry_keyword)
+        self.dict_keyword = getDict_keyword(self.list_industry_keyword)
+
+        # stages ordered from earliest to latest (used with getMaxStage)
+        self.stage_order = ["立项阶段",
+                            "可研阶段",
+                            "环评阶段",
+                            "稳评阶段",
+                            "咨询阶段",
+                            "造价阶段",
+                            "设计阶段",
+                            "勘察阶段",
+                            "施工图审",
+                            "施工许可",
+                            "施工准备",
+                            "施工在建",
+                            "竣工阶段",
+                            "EPC总承包"
+                            ]
+
+
+    def producer(self):
+        """Collect major_project rows to process into a queue (capped around 1000)."""
+        # NOTE(review): the single hard-coded id below looks like a debug
+        # leftover; the commented RangeQuery is presumably the production
+        # filter — confirm before deploying
+        bool_query = BoolQuery(must_queries=[
+            # RangeQuery(major_project_status,1,50,True,True),
+            TermQuery(major_project_id,"013dbe9c24fcd80d155ec9e1d8d9eebe")
+            ]
+        )
+
+        task_queue = Queue()
+
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search("major_project","major_project_index",
+                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),get_total_count=True,limit=100),
+                                                                            ColumnsToGet(self.search_columns,ColumnReturnType.SPECIFIED))
+        log("major producer total count:%d"%(total_count))
+        list_data = getRow_ots(rows)
+        for _data in list_data:
+            task_queue.put(_data)
+
+        while next_token:
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search("major_project","major_project_index",
+                                                                                SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
+                                                                                ColumnsToGet(self.search_columns,ColumnReturnType.SPECIFIED))
+            list_data = getRow_ots(rows)
+            for _data in list_data:
+                task_queue.put(_data)
+            # cap the backlog per run
+            if task_queue.qsize()>=1000:
+                break
+        return task_queue
+
+    def set_status_to_adult(self,_major):
+        """Mark a project as processed by moving its status into the 201-300 range."""
+        _major.setValue(major_project_status,random.randint(201,300),True)
+
+    def comsumer(self):
+        """Process queued projects with a thread pool, enriching each row in OTS."""
+
+        def _handle(item,result_queue):
+            # _major = MajorProject(item)
+            # _major.setValue(major_project_status,1,True)
+            # _major.update_row(self.ots_client)
+            # return
+
+            project_name = item.get(major_project_project_name,"")
+            province = item.get(major_project_province,"")
+            _major = MajorProject(item)
+            if project_name=="":
+                # nothing to search for: just mark the row processed
+                self.set_status_to_adult(_major)
+                _major.update_row(self.ots_client)
+                return
+
+            list_dynamics = []
+
+            # short names are too ambiguous for full-text search: restrict to titles
+            if len(project_name)>6:
+                bool_query_sp = BoolQuery(must_queries=[
+                    MatchPhraseQuery("page_content",project_name),
+                    MatchPhraseQuery("province",province)
+                ])
+            else:
+                bool_query_sp = BoolQuery(must_queries=[
+                    MatchPhraseQuery("page_title",project_name),
+                    MatchPhraseQuery("province",province)
+                ])
+
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index",
+                                                                                SearchQuery(bool_query_sp,limit=100),
+                                                                                ColumnsToGet(["province","page_title","page_content","page_time","sp_type"],ColumnReturnType.SPECIFIED))
+            list_data = getRow_ots(rows)
+            dict_industry = {}
+            # approval notices: doctype 2 dynamics
+            for _data in list_data:
+                _content = _data.get("page_title","")+_data.get("page_content","")
+                _stage = extract_legal_stage(_content,self.stage_pattern,self.stage_priority_dict)
+                _dynamic = {"docid":str(_data.get("id")),
+                            "doctype":2,
+                            "doctitle":_data.get("page_title",""),
+                            "page_time":_data.get("page_time",""),
+                            "sp_type":str(_data.get("sp_type","")),
+                            "project_stage":_stage}
+                list_dynamics.append(_dynamic)
+                _industry = extract_industry(self.dict_keyword,self.keyword_pattern,_content)
+                if _industry not in dict_industry:
+                    dict_industry[_industry] = 0
+                dict_industry[_industry] += 1
+
+
+            log("%s search sp %d"%(item.get("id"),len(list_data)))
+
+
+            # bid documents: exclude channels 106/107, only released (status 201-300) docs
+            if len(project_name)>6:
+                bool_query_doc = BoolQuery(must_queries=[
+                    BoolQuery(should_queries=[
+                        MatchPhraseQuery("doctitle",project_name),
+                        MatchPhraseQuery("doctextcon",project_name),
+                        MatchPhraseQuery("attachmenttextcon",project_name),
+                    ]),
+                    WildcardQuery("province","%s*"%province),
+                    RangeQuery("status",201,300,True,True)
+
+                ],must_not_queries=[
+                    TermQuery("docchannel",106),
+                    TermQuery("docchannel",107)
+                ])
+            else:
+                bool_query_doc = BoolQuery(must_queries=[
+                    BoolQuery(should_queries=[
+                        MatchPhraseQuery("doctitle",project_name),
+                    ]),
+                    WildcardQuery("province","%s*"%province),
+                    RangeQuery("status",201,300,True,True)
+
+                ],must_not_queries=[
+                    TermQuery("docchannel",106),
+                    TermQuery("docchannel",107)
+                ])
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
+                                                                                SearchQuery(bool_query_doc,limit=100),
+                                                                                ColumnsToGet(["doctitle","doctextcon","attachmenttextcon","page_time","docchannel","bidway","sub_docs_json","project_name"],ColumnReturnType.SPECIFIED))
+            list_data = getRow_ots(rows)
+
+            log("%s search doc %d"%(item.get("id"),len(list_data)))
+            # bid documents: doctype 1 dynamics, stage taken from title+project_name only
+            for _data in list_data:
+                _content = _data.get("doctitle","")+_data.get("doctextcon","")+_data.get("attachmenttextcon","")
+                stage_content = _data.get("doctitle","")+_data.get("project_name","")
+                win_tenderer,win_tenderer_manager,win_tenderer_phone = getWinTenderer(_data.get("sub_docs_json"),self.ots_client)
+                _stage = extract_legal_stage(stage_content,self.stage_pattern,self.stage_priority_dict)
+                _dynamic = {"docid":str(_data.get("docid")),
+                            "doctype":1,
+                            "doctitle":_data.get("doctitle",""),
+                            "page_time":_data.get("page_time",""),
+                            "docchannel":_data.get("docchannel",""),
+                            "bidway":str(_data.get("bidway","")),
+                            "project_stage":_stage,
+                            "win_tenderer":win_tenderer,
+                            "win_tenderer_manager":win_tenderer_manager,
+                            "win_tenderer_phone":win_tenderer_phone}
+                list_dynamics.append(_dynamic)
+                _industry = extract_industry(self.dict_keyword,self.keyword_pattern,_content)
+                if _industry not in dict_industry:
+                    dict_industry[_industry] = 0
+                dict_industry[_industry] += 1
+            # print(list_data)
+            # print(list_dynamics)
+            list_dynamics_all =  dynamicDumplicate(list_dynamics)
+            industry = getIndustry(dict_industry)
+            all_project_dynamics = json.dumps(list_dynamics_all,ensure_ascii=False)
+            all_project_dynamic_number = len(list_dynamics_all)
+
+            # second pass: one dynamic per stage+channel, newest first
+            list_dynamics = dynamicDumplicate2(list_dynamics_all)
+            list_dynamics.sort(key=lambda x:x.get("page_time",""),reverse=True)
+            project_dynamic_number = len(list_dynamics)
+            project_dynamics = json.dumps(list_dynamics,ensure_ascii=False)
+            # project_stage = getMaxStage(list_dynamics,self.stage_order)
+            current_stage = item.get(major_project_project_stage,"")
+            project_stage = "未知"
+            latest_page_time = ""
+
+            # project stage is taken from the most recent dynamic
+            if len(list_dynamics)>0:
+                list_dynamics.sort(key=lambda x:x.get("page_time",""),reverse=True)
+                latest_page_time = list_dynamics[0].get("page_time","")
+                project_stage = list_dynamics[0].get("project_stage","未知")
+            current_date = getCurrent_date(format="%Y")
+            plan_start_time = item.get(major_project_plan_start_time,"")
+            # a reserve/early-phase project with no detected stage and a future
+            # planned start is labelled 预备阶段 (preparatory)
+            if (re.search("储备|前期|预备",current_stage) is not None or current_stage=="" or current_stage=="未知") and project_stage=="未知" and plan_start_time>=current_date:
+                project_stage = "预备阶段"
+
+            _major.setValue(major_project_industry,industry,True)
+            _major.setValue(major_project_project_dynamics,project_dynamics,True)
+            _major.setValue(major_project_project_dynamic_number,project_dynamic_number,True)
+            _major.setValue(major_project_project_stage,project_stage,True)
+            _major.setValue(major_project_latest_page_time,latest_page_time,True)
+            _major.setValue(major_project_update_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
+            _major.setValue(major_project_all_project_dynamics,all_project_dynamics,True)
+            _major.setValue(major_project_all_project_dynamic_number,all_project_dynamic_number,True)
+            self.set_status_to_adult(_major)
+            _major.update_row(self.ots_client)
+
+        task_queue = self.producer()
+
+        mt = MultiThreadHandler(task_queue,_handle,None,30)
+        mt.run()
+
+    def start_union(self):
+        """Run comsumer() every minute on a blocking cron scheduler."""
+        scheduler = BlockingScheduler()
+        scheduler.add_job(self.comsumer,"cron",minute="*/1")
+        scheduler.start()
+
def start_major_union():
    """Module entry point: run the major-project union job on its cron schedule."""
    union_job = MajorUnion()
    union_job.start_union()

if __name__ == '__main__':
    # one-off run (no scheduler) when executed directly
    union_job = MajorUnion()
    union_job.comsumer()

+ 2 - 0
BaseDataMaintenance/model/ots/BaseModel.py

@@ -39,6 +39,8 @@ class BaseModel():
    def getAttribute_turple(self):
        _list = []
        for _key in self.getAttribute_keys():
+            if _key=="all_columns":
+                continue
            _v = self.getProperties().get(_key)
            if _v is not None:
                if isinstance(_v,list):

+ 1 - 0
BaseDataMaintenance/model/ots/document_tmp.py

@@ -27,6 +27,7 @@ document_tmp_sub_docs_json = "sub_docs_json"
 document_tmp_save = "save"
 document_tmp_dup_docid = "dup_docid"
 document_tmp_merge_uuid = "merge_uuid"
+document_tmp_projects = "projects"
 document_tmp_page_time = "page_time"
 document_tmp_attachment_extract_status = "attachment_extract_status"
 document_tmp_web_source_no = "web_source_no"

+ 65 - 0
BaseDataMaintenance/model/ots/major_project.py

@@ -0,0 +1,65 @@
+from BaseDataMaintenance.model.ots.BaseModel import BaseModel
+
# Column-name constants for the "major_project" OTS table.
major_project_id = "id"  # primary key (see MajorProject.getPrimary_keys)
major_project_project_name = "project_name"
major_project_area = "area"
major_project_province = "province"
major_project_city = "city"
major_project_district = "district"
major_project_construction_enterprise = "construction_enterprise"
major_project_project_leader = "project_leader"
major_project_project_overview = "project_overview"
major_project_construction_nature = "construction_nature"
major_project_main_engineering = "main_engineering"
major_project_plan_start_time = "plan_start_time"
major_project_plan_end_time = "plan_end_time"
major_project_total_investment = "total_investment"
major_project_industry_first = "industry_first"
major_project_industry_second = "industry_second"
major_project_industry_third = "industry_third"
major_project_industry = "industry"
major_project_page_time = "page_time"
major_project_plan_year = "plan_year"
major_project_project_dynamics = "project_dynamics"
major_project_project_dynamic_number = "project_dynamic_number"
major_project_project_stage = "project_stage"
major_project_latest_page_time = "latest_page_time"
major_project_detail_link = "detail_link"
major_project_web_source_no = "web_source_no"
major_project_web_source = "web_source"
major_project_status = "status"
major_project_construction_period = "construction_period"
major_project_attachments = "attachments"
major_project_create_time = "create_time"
major_project_update_time = "update_time"

# Aggregated ("all_") dynamics columns written by the union/merge job.
major_project_all_project_dynamics = "all_project_dynamics"
major_project_all_project_dynamic_number = "all_project_dynamic_number"

# Field keys used inside one project-dynamic record — presumably entries of
# the project_dynamics column; confirm against the unionDocument producer.
project_dynamics_docid = "docid"
project_dynamics_doctype = "doctype"
project_dynamics_doctitle = "doctitle"
project_dynamics_page_time = "page_time"
project_dynamics_project_stage = "project_stage"
project_dynamics_win_tenderer = "win_tenderer"
project_dynamics_win_tenderer_manager = "win_tenderer_manager"
project_dynamics_win_tenderer_phone = "win_tenderer_phone"
+
+
+
class MajorProject(BaseModel):
    """Row model for the "major_project" OTS table."""

    def __init__(self, _dict):
        """Adopt every column in *_dict* as an attribute of this model."""
        for _key, _value in _dict.items():
            self.setValue(_key, _value, True)
        self.table_name = "major_project"

    def getPrimary_keys(self):
        # The table is keyed by the single "id" column.
        return [major_project_id]
+
if __name__ == "__main__":
    # Module is import-only; no standalone behaviour.
    pass

+ 43 - 0
BaseDataMaintenance/model/ots/project.py

@@ -1,6 +1,49 @@
 from BaseDataMaintenance.model.ots.BaseModel import BaseModel
 
 project_uuid = "uuid"
# Column-name constants for the merged "project" OTS table.
project_docids = "docids"
project_zhao_biao_page_time = "zhao_biao_page_time"
project_zhong_biao_page_time = "zhong_biao_page_time"
project_page_time = "page_time"
project_doctextcon = "doctextcon"
project_area = "area"
project_province = "province"
project_city = "city"
project_district = "district"
project_info_type = "info_type"
project_industry = "industry"
project_qcodes = "qcodes"
project_project_name = "project_name"
project_project_code = "project_code"
project_project_codes = "project_codes"
project_project_addr = "project_addr"
project_tenderee = "tenderee"
project_tenderee_addr = "tenderee_addr"
project_tenderee_phone = "tenderee_phone"
project_tenderee_contact = "tenderee_contact"
project_agency = "agency"
project_agency_phone = "agency_phone"
project_agency_contact = "agency_contact"
project_sub_project_name = "sub_project_name"
project_sub_project_code = "sub_project_code"
project_bidding_budget = "bidding_budget"
# Winning bidder and runners-up (first / second / third place).
project_win_tenderer = "win_tenderer"
project_win_bid_price = "win_bid_price"
project_win_tenderer_manager = "win_tenderer_manager"
project_win_tenderer_phone = "win_tenderer_phone"
project_second_tenderer = "second_tenderer"
project_second_bid_price = "second_bid_price"
project_second_tenderer_manager = "second_tenderer_manager"
project_second_tenderer_phone = "second_tenderer_phone"
project_third_tenderer = "third_tenderer"
project_third_bid_price = "third_bid_price"
project_third_tenderer_manager = "third_tenderer_manager"
project_third_tenderer_phone = "third_tenderer_phone"
project_procurement_system = "procurement_system"
project_bidway = "bidway"
project_dup_data = "dup_data"
project_docid_number = "docid_number"
# NOTE(review): constant name is plural but the column value is singular
# ("project_dynamic") — verify this matches the actual table schema.
project_dynamics = "project_dynamic"
 
 
 
 
 
 

+ 0 - 0
BaseDataMaintenance/model/testmysql/__init__.py


+ 11 - 0
BaseDataMaintenance/start_sychro_unionMajor.py

@@ -0,0 +1,11 @@
+
+import sys
+import os
+sys.path.append(os.path.dirname(__file__)+"/..")
+
+from BaseDataMaintenance.maintenance.major_project.unionDocument import start_major_union
+
+
+
if __name__ == "__main__":
    # Launch the blocking major-project union scheduler (never returns).
    start_major_union()