
Automatically synchronize after checking for missing data

luojiehua 5 months ago
parent commit 9e5621cde3

+ 77 - 0
BaseDataMaintenance/chat/ERNIE_utils.py

@@ -0,0 +1,77 @@
+
+import requests
+import json
+
+def get_access_token():
+    """
+    使用 API Key,Secret Key 获取access_token,替换下列示例中的应用API Key、应用Secret Key
+    """
+
+    url = "https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id=gnwVXv96An9qMYqq9eWbeNqk&client_secret=mDsRQbCPsV4N7x28LbwkhTAaLmrrDnXk"
+    url = "https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id=Ok8QMe4qIQOAex0F9Gf1uns0&client_secret=6DjGGDdvhnBaEOMdSXAg02KxZnQhWpbd"
+
+    payload = json.dumps("")
+    headers = {
+        'Content-Type': 'application/json',
+        'Accept': 'application/json'
+    }
+
+    response = requests.request("POST", url, headers=headers, data=payload)
+    return response.json().get("access_token")
+
+def main():
+    _token = get_access_token()
+    # _token = "24.93c9d66ffc94ffaef6c6c9d35770a5f5.2592000.1701242081.282335-37357318"
+    url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions?access_token=" + _token
+
+    # url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/xuanyuan_70b_chat?access_token=" + _token
+
+    payload = json.dumps({
+        "messages": [
+            {
+                "role": "user",
+                "content": '''
+               今天是几号
+                '''
+            }
+        ]
+    })
+    headers = {
+        'Content-Type': 'application/json'
+    }
+
+    response = requests.request("POST", url, headers=headers, data=payload)
+
+    print(response.text)
+
+def chat(msg,token=None,api_url=None):
+    if token is None:
+        token = get_access_token()
+    # _token = "24.93c9d66ffc94ffaef6c6c9d35770a5f5.2592000.1701242081.282335-37357318"
+    if api_url is None:
+        api_url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions"
+        # api_url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/ernie-3.5-128k"
+    url =  api_url+"?access_token="+ token
+    payload = json.dumps({
+        "messages": [
+            {
+                "role": "user",
+                "content": '''
+               %s
+                '''%msg
+            }
+        ],
+        "stream":False
+    })
+    headers = {
+        'Content-Type': 'application/json'
+    }
+    response = requests.request("POST", url, headers=headers, data=payload)
+
+    return response
+
+if __name__ == '__main__':
+    main()
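
For reference, a minimal usage sketch of the module above (assuming the credentials in get_access_token() are valid; as the jsonchat() caller in gpt_extract.py below expects, the reply text sits under "result" in the response JSON):

from BaseDataMaintenance.chat.ERNIE_utils import chat

resp = chat("今天是几号")                    # returns a requests.Response
if resp.status_code == 200:
    print(resp.json().get("result", ""))    # the model's reply text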

+ 86 - 0
BaseDataMaintenance/chat/chatUtil.py

@@ -0,0 +1,86 @@
+#coding:utf8
+
+from bs4 import BeautifulSoup
+import re
+
+def html2text(_html):
+
+    if type(_html)==str:
+        _soup = BeautifulSoup(_html,"lxml")
+    else:
+        _soup = _html
+    list_table = _soup.find_all("table")
+    list_tbody = _soup.find_all("tbody")
+    if len(list_table)>0 or len(list_tbody)>0:
+        list_childs = _soup.find_all(recursive=False)
+        list_child_text = []
+        for child in list_childs:
+            list_child_text.append(html2text(child))
+        return "\n".join(list_child_text)
+
+    else:
+        if _soup.name=="table" or _soup.name=="tbody":
+            _table_text = ""
+            trs = _soup.find_all("tr")
+            list_tr_text = []
+            for tr in trs:
+                tds = tr.find_all("th")
+                if len(tds)>0:
+                    list_td_text = []
+                    for td in tds:
+                        list_td_text.append(re.sub('\s','',td.get_text()))
+                    list_tr_text.append("|".join(list_td_text))
+                tds = tr.find_all("td")
+                if len(tds)>0:
+                    list_td_text = []
+                    for td in tds:
+                        list_td_text.append(re.sub('\s','',td.get_text()))
+                    list_tr_text.append("|".join(list_td_text))
+            _table_text = "%s\n\n"%"\n".join(list_tr_text)
+            if _table_text == "":
+                _table_text = _soup.get_text()
+            _soup.decompose()
+            return _table_text
+        else:
+            _text = re.sub('\s','',_soup.get_text().strip())
+            _soup.decompose()
+            return _text
+
+def table2list(_html):
+    if type(_html)==str:
+        _soup = BeautifulSoup(_html,'lxml')
+        # a soup parsed from a string is rooted at "[document]"; descend to the table itself
+        _table = _soup.find("table")
+        if _table is not None:
+            _soup = _table
+    else:
+        _soup = _html
+    if _soup.name=="table" or _soup.name=="tbody":
+        _table_text = ""
+        trs = _soup.find_all("tr")
+        list_tr_text = []
+        for tr in trs:
+            tds = tr.find_all("th")
+            if len(tds)>0:
+                list_td_text = []
+                for td in tds:
+                    list_td_text.append(re.sub('\s','',td.get_text()))
+                if len(list_td_text)>0:
+                    list_tr_text.append(list_td_text)
+            tds = tr.find_all("td")
+            if len(tds)>0:
+                list_td_text = []
+                for td in tds:
+                    list_td_text.append(re.sub('\s','',td.get_text()))
+                if len(list_td_text)>0:
+                    list_tr_text.append(list_td_text)
+        return list_tr_text
+
+def tableList2text(table_list):
+    list_tr_text = []
+    for tr in table_list:
+        tds = tr
+        if len(tds)>0:
+            list_td_text = []
+            for td in tds:
+                list_td_text.append(re.sub('\s','',td))
+            list_tr_text.append("|".join(list_td_text))
+    _table_text = "%s\n\n"%"\n".join(list_tr_text)
+    return _table_text
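
For reference, a quick sketch of these helpers on a toy table (hypothetical input; rows come back pipe-joined, one line per tr):

sample = "<table><tr><th>名称</th><th>数量</th></tr><tr><td>A</td><td>1</td></tr></table>"
print(html2text(sample))                    # 名称|数量  /  A|1
print(table2list(sample))                   # [['名称', '数量'], ['A', '1']]
print(tableList2text(table2list(sample)))   # the same pipe-joined text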

+ 31 - 13
BaseDataMaintenance/maintenance/dataflow.py

@@ -2571,7 +2571,7 @@ class Dataflow_dumplicate(Dataflow):
                 else:
                     bool_query = _query
                 rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
-                                                                                    SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(sort_column)]),limit=60,get_total_count=True),
+                                                                                    SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(sort_column)]),limit=100,get_total_count=True),
                                                                                     ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                 list_dict = getRow_ots(rows)
                 list_data = []
@@ -3291,33 +3291,42 @@ class Dataflow_dumplicate(Dataflow):
             list_projects = dumplicate_projects(list_projects)
         list_projects.extend(list_delete_projects)
         project_json = to_project_json(list_projects)
-        print("delete_json",project_json)
         return project_json
 
 
     def delete_doc_handle(self,_dict,result_queue):
         headers = _dict.get("frame")
         conn = _dict.get("conn")
-        log("==========delete")
+
         if headers is not None:
             message_id = headers.headers["message-id"]
             body = headers.body
             item = json.loads(body)
             docid = item.get("docid")
+            log("==========start delete docid:%s"%(str(docid)))
             if docid is None:
-                return
+                ackMsg(conn,message_id)
+                return
             delete_result = self.delete_projects_by_document(docid)
 
+            log("1")
             _uuid = uuid4().hex
             _d = {PROJECT_PROCESS_UUID:_uuid,
                   PROJECT_PROCESS_CRTIME:1,
                   PROJECT_PROCESS_PROJECTS:delete_result}
             _pp = Project_process(_d)
-            if _pp.update_row(self.ots_client):
+            log("2")
+            try:
+                if _pp.update_row(self.ots_client):
+                    ackMsg(conn,message_id)
+            except Exception as e:
+                log("update project_process failed:%s"%(str(e)))
+                ackMsg(conn,message_id)
+            log("3")
             # no longer pushing the result to a queue; write it to the project_process table instead
             # if send_msg_toacmq(self.pool_mq_ali,delete_result,self.doc_delete_result):
             #     ackMsg(conn,message_id)
+            log("==========end delete docid:%s"%(str(docid)))
+        else:
+            log("has not headers")
 
     def generate_common_properties(self,list_docs):
         '''
@@ -3693,6 +3702,14 @@ class Dataflow_dumplicate(Dataflow):
                       should_q_cod]
             list_query.append([_query,2])
 
+        if win_tenderer!="" and sub_project_name!="":
+            _query = [TermQuery(project_win_tenderer,win_tenderer),
+                      TermQuery(project_sub_project_name,sub_project_name)
+                                             ]
+            list_query.append([_query,2])
+
         if win_tenderer!="" and float(win_bid_price)>0:
             _query = [TermQuery(project_win_tenderer,win_tenderer),
                                              TermQuery(project_win_bid_price,win_bid_price)]
@@ -3749,10 +3766,7 @@ class Dataflow_dumplicate(Dataflow):
                 _uuid = _proj.get("uuid")
                 if _uuid is not None:
                     set_uuid = set_uuid | set(_uuid.split(","))
-            must_not_q = []
-            for _uuid in list(set_uuid):
-                must_not_q.append(TermQuery("uuid",_uuid))
-                print("must_not_q uuid:%s"%(_uuid))
+
 
 
             projects_merge_count = 0
@@ -3768,6 +3782,10 @@ class Dataflow_dumplicate(Dataflow):
             docids = ""
             for _proj in list_projects[:30]:
 
+                must_not_q = []
+                for _uuid in list(set_uuid):
+                    must_not_q.append(TermQuery("uuid",_uuid))
+
                 docids = _proj.get(project_docids,"")
                 page_time = _proj.get(project_page_time,"")
                 project_codes = _proj.get(project_project_codes,"")
@@ -3872,7 +3890,8 @@ class Dataflow_dumplicate(Dataflow):
                 list_merge_data.sort(key=lambda x:x.get(project_page_time,""))
                 list_merge_data.sort(key=lambda x:x.get(project_bidding_budget,-1))
                 # log(page_time_less+"=="+page_time_greater)
-                # log("list_merge_data:%s"%(str(list_merge_data)))
+                if b_log:
+                    log("list_merge_data count:%d"%(len(list_merge_data)))
                 list_check_data = []
                 for _data in list_merge_data:
                     _time = time.time()
@@ -3933,10 +3952,9 @@ class Dataflow_dumplicate(Dataflow):
             list_docids = [a for a in list_docids if a is not None]
 
 
-
             _time = time.time()
             list_projects = self.search_projects_with_document(list_docids)
-            # log("search projects takes:%.3f"%(time.time()-_time))
+            log("search %d projects takes:%.3f"%(len(list_projects),time.time()-_time))
             if len(list_projects)==0:
                 # _time = time.time()
                 list_docs = self.search_docs(list_docids)
@@ -4497,7 +4515,7 @@ if __name__ == '__main__':
     # test_attachment_interface()
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
-    df_dump.test_dumplicate(562889387
+    df_dump.test_dumplicate(576859812
                             )
     # compare_dumplicate_check()
     # df_dump.test_merge([391898061

+ 646 - 0
BaseDataMaintenance/maintenance/document/ApprovalData.py

@@ -0,0 +1,646 @@
+
+from BaseDataMaintenance.common.Utils import *
+from BaseDataMaintenance.dataSource.source import getConnect_ots,getConnect_ots_capacity
+from tablestore import *
+import pandas as pd
+from queue import Queue
+from BaseDataMaintenance.common.multiThread import MultiThreadHandler
+from BaseDataMaintenance.model.ots.document import Document
+
+import json
+from uuid import uuid4
+from bs4 import BeautifulSoup
+import re
+from decimal import Decimal
+
+'''
+"approval": [
+        {
+            "approval_items": "", # approval item
+            "approval_result": "", # approval result
+            "approver": "", # approving department
+            "city": "深圳",
+            "construct_company": "深圳市赛孚电子科技有限公司", # construction company
+            "construction_scale": "", # construction scale
+            "declare_company": "", # declaring company
+            "district": "光明",
+            "doc_num": "", # approval document number
+            "evaluation_agency": "", # environmental-impact evaluation agency
+            "legal_person": "陈雷", # project legal person
+            "moneysource": "", # source of funds
+            "phone": "",
+            "pro_type": "", # declaration type
+            "project_addr": "广东省深圳市光明区玉塘街道田寮社区第七工业区26栋301",
+            "project_code": "",
+            "project_name": "深圳市赛孚电子科技有限公司销售医用射线装置项目",
+            "properties": "新建", # construction nature
+            "province": "广东",
+            "time_commencement": "", # commencement date
+            "time_completion": "", # completion date
+            "time_declare": "", # declaration date
+            "total_tendereeMoney": "200000", # total investment
+            "year_limit": "", # construction period
+            "compilation_unit": "编制单位", # compiling unit
+            "publisher": "发布单位", # publishing unit
+            "time_approval": "审批时间", # approval time
+            "time_release": "发布日期" # release date
+        }
+    ]
+'''
+
+
+key_trans = {
+    "doctitle":"公告标题",
+    "page_time":"公告时间",
+    "province": "省份",
+    "city": "城市",
+    "district": "地区",
+
+    "approval_items": "审批事项",
+    "approval_result": "审批结果",
+    "declare_company": "申报单位",
+    "construct_company": "建设单位",
+    "evaluation_agency": "环评机构",
+    "approver": "审批部门",
+    "compilation_unit": "编制单位",
+    "publisher": "发布单位",
+
+    "total_tendereeMoney": "总投资",
+    "construction_scale": "建设规模",
+    "proportion":"建筑面积",
+    "usearea":"用地面积",
+
+    "doc_num": "审批文号",
+
+    "legal_person": "项目法人",
+    "moneysource": "资金来源",
+    "moneyuse":"资金构成",
+    "env_invest":"环保投资",
+    "phone": "电话",
+    "pro_type": "申报类型",
+    "project_addr": "项目地址",
+    "project_code": "项目编号",
+    "project_name": "项目名称",
+    "properties": "建设性质",
+    "time_commencement": "开工时间",
+    "time_completion": "竣工时间",
+    "time_declare": "申报时间",
+
+    "year_limit": "建设年限",
+
+    "time_approval":"审批时间",
+    "time_release": "发布日期"
+}
+
+key_trans_d = {"docid":"公告id"}
+key_trans_d.update(key_trans)
+
+
+
+
+def extract_proportion(content, has_preffix=True):
+    if not content:
+        return "", ""
+    # log("content")
+    # log(content)
+    suffix = "[大概约为是::【\[\s]*[\d,]+(\.\d+)?[十百千万亿]*([\]】平方kK千万公㎡mM米里顷亩]+2?))"
+    reg_dict = {
+        0: "(?P<proportion>(总((建筑|建设)(面积|规模)|长|长度))" + suffix,
+        1: "(?P<proportion>((建筑|建设)(面积|规模)|全长)" + suffix,
+        2: "(?P<proportion>((建筑|建设|区域)?面积|全长|项目规模)" + suffix
+    }
+
+    if not has_preffix:
+        reg_dict[3] = "(?P<proportion>" + suffix
+
+    _proportion = ""
+    for i in range(len(list(reg_dict.keys()))):
+        if _proportion:
+            break
+        _pattern = reg_dict.get(i)
+        # logging.info('content ' + str(content))
+        match = re.search(_pattern, str(content))
+        if match:
+            _proportion = match.groupdict().get("proportion", "")
+
+    if not _proportion:
+        return "", ""
+
+    # normalize the matched value into a unified format
+    multiple_cnt = 1
+    digit = ""
+
+    # extract the numeric part
+    match = re.search('(?P<d1>[\d,]+)(?P<d2>(\.\d+)?)', _proportion)
+    if match:
+        # logging.info(str(_proportion) + '  ' + str(match.group()))
+        d1 = match.group('d1')
+        d2 = match.group('d2')
+        try:
+            d1 = int(re.sub(',', '', d1))
+        except:
+            return "", ""
+        if d2:
+            d2 = Decimal(d2[1:]) / Decimal(str(int(10 ** len(d2[1:]))))
+            # print('d1, d2', d1, d2)
+            d1 += d2
+        digit = d1
+    # print('digit', digit)
+
+    # determine the Chinese-numeral multiplier
+    _proportion2 = re.sub(re.escape(match.group()), '', _proportion)
+    match = re.search('[十百千万亿]+', _proportion2)
+    _dict = {'十': 10, '百': 100, '千': 1000, '万': 10000, '亿': 100000000}
+    if match:
+        for c in match.group():
+            multiple_cnt *= _dict.get(c)
+        _proportion3 = re.sub(re.escape(match.group()), '', _proportion2)
+    else:
+        _proportion3 = _proportion2
+    # print('multiple_cnt2', multiple_cnt)
+
+    # decide whether this is an area or a length
+    match = re.search('[平方㎡顷亩]+|[mM]2', _proportion3)
+    if match:
+        unit = '㎡'
+    else:
+        unit = 'm'
+
+    # determine the unit multiplier
+    match = re.search('[平方kK千万公㎡mM米里顷亩]+2?', _proportion3)
+    if match:
+        if unit == 'm':
+            if re.search('[kK千公]', match.group()):
+                multiple_cnt *= 1000
+            elif re.search('[里]', match.group()):
+                multiple_cnt *= Decimal(str(500))
+        else:
+            if '亩' in match.group():
+                multiple_cnt *= Decimal(str(666.67))
+            elif '顷' in match.group():
+                multiple_cnt *= 10000
+            elif re.search('千米|公里|k[mM㎡]', match.group()):
+                multiple_cnt *= 1000000
+    # print('multiple_cnt1', multiple_cnt)
+
+    # assemble the normalized result
+    digit = str(digit * multiple_cnt) + unit
+
+    return _proportion, digit
+
+def extract_usearea(content, has_preffix=True):
+    if not content:
+        return "", ""
+    # log("content")
+    # log(content)
+    suffix = "[大概约为是::【\[\s]*[\d,]+(\.\d+)?[十百千万亿]*([\]】平方kK千万公㎡mM米里顷亩]+2?))"
+    reg_dict = {
+        0: "(?P<proportion>(总((用地|占地|使用)(面积|规模)|长|长度))" + suffix,
+        1: "(?P<proportion>((用地|占地|使用)(面积|规模)|全长)" + suffix,
+        2: "(?P<proportion>((用地|占地|使用)?面积)" + suffix
+    }
+
+    if not has_preffix:
+        reg_dict[3] = "(?P<proportion>" + suffix
+
+    _proportion = ""
+    for i in range(len(list(reg_dict.keys()))):
+        if _proportion:
+            break
+        _pattern = reg_dict.get(i)
+        # logging.info('content ' + str(content))
+        match = re.search(_pattern, str(content))
+        if match:
+            _proportion = match.groupdict().get("proportion", "")
+
+    if not _proportion:
+        return "", ""
+
+    # normalize the matched value into a unified format
+    multiple_cnt = 1
+    digit = ""
+
+    # extract the numeric part
+    match = re.search('(?P<d1>[\d,]+)(?P<d2>(\.\d+)?)', _proportion)
+    if match:
+        # logging.info(str(_proportion) + '  ' + str(match.group()))
+        d1 = match.group('d1')
+        d2 = match.group('d2')
+        try:
+            d1 = int(re.sub(',', '', d1))
+        except:
+            return "", ""
+        if d2:
+            d2 = Decimal(d2[1:]) / Decimal(str(int(10 ** len(d2[1:]))))
+            # print('d1, d2', d1, d2)
+            d1 += d2
+        digit = d1
+    # print('digit', digit)
+
+    # determine the Chinese-numeral multiplier
+    _proportion2 = re.sub(re.escape(match.group()), '', _proportion)
+    match = re.search('[十百千万亿]+', _proportion2)
+    _dict = {'十': 10, '百': 100, '千': 1000, '万': 10000, '亿': 100000000}
+    if match:
+        for c in match.group():
+            multiple_cnt *= _dict.get(c)
+        _proportion3 = re.sub(re.escape(match.group()), '', _proportion2)
+    else:
+        _proportion3 = _proportion2
+    # print('multiple_cnt2', multiple_cnt)
+
+    # decide whether this is an area or a length
+    match = re.search('[平方㎡顷亩]+|[mM]2', _proportion3)
+    if match:
+        unit = '㎡'
+    else:
+        unit = 'm'
+
+    # determine the unit multiplier
+    match = re.search('[平方kK千万公㎡mM米里顷亩]+2?', _proportion3)
+    if match:
+        if unit == 'm':
+            if re.search('[kK千公]', match.group()):
+                multiple_cnt *= 1000
+            elif re.search('[里]', match.group()):
+                multiple_cnt *= Decimal(str(500))
+        else:
+            if '亩' in match.group():
+                multiple_cnt *= Decimal(str(666.67))
+            elif '顷' in match.group():
+                multiple_cnt *= 10000
+            elif re.search('千米|公里|k[mM㎡]', match.group()):
+                multiple_cnt *= 1000000
+    # print('multiple_cnt1', multiple_cnt)
+
+    # assemble the normalized result
+    digit = str(digit * multiple_cnt) + unit
+
+    return _proportion, digit
+
+def extract_env_invest(content):
+    pattern = "环保投资[大概约为是::]*(?P<invs>\d+(\.\d+)?万?元)"
+
+    match = re.search(pattern,content)
+    if match is not None:
+        invest =  match.groupdict().get("invs","")
+        money = getUnifyMoney(invest)
+        if money>0:
+            return money
+    return ""
+
+def extract_moneyuse(content):
+    list_sentences = re.split("[,，。]",content)
+    list_data = []
+    # use a group, not a character class, for the fee keywords
+    pattern = "^.{,20}(?:费用|预备费|费)[大概约为是::]*\d+(\.\d+)?万?元.{,20}$"
+    for sentence in list_sentences:
+        match = re.search(pattern,sentence)
+        if match is not None:
+            list_data.append(sentence)
+    return ",".join(list_data)
+
+def get_approval_data(ots_client,ots_capacity,docid):
+
+    bool_query = BoolQuery(must_queries=[
+        TermQuery("docid",docid)
+    ])
+    rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+                                                                   SearchQuery(bool_query),
+                                                                   ColumnsToGet(["doctitle","project_name","page_time","project_code","approval_json","extract_json"],return_type=ColumnReturnType.SPECIFIED))
+    list_data = getRow_ots(rows)
+    for _d in list_data:
+        approval_json = _d.get("approval_json")
+        partitionkey = _d.get("partitionkey")
+        docid = _d.get("docid")
+        doctitle = _d.get("doctitle")
+        project_name = _d.get("project_name")
+        page_time = _d.get("page_time")
+        extract_json = _d.get("extract_json")
+
+        _d_html = {"partitionkey":partitionkey,"docid":docid}
+        _html = Document(_d_html)
+        _html.fix_columns(ots_capacity,["dochtmlcon"],True)
+        dochtml = _html.getProperties().get("dochtmlcon","")
+        doctextcon = BeautifulSoup(dochtml,"lxml").get_text()
+        attachmenttextcon = ""
+        try:
+            _extract = json.loads(extract_json)
+        except Exception as e:
+            _extract = {}
+        proportion = _extract.get("pb",{}).get("proportion")
+        _,usearea = extract_usearea(doctextcon+attachmenttextcon)
+        env_invest = extract_env_invest(doctextcon+attachmenttextcon)
+        moneyuse = extract_moneyuse(doctextcon+attachmenttextcon)
+
+        if approval_json:
+            list_approval = json.loads(approval_json)
+            for _appr in list_approval:
+                _appr["partitionkey"] = partitionkey
+                _appr["docid"] = docid
+                _appr["doctitle"] = doctitle
+                _appr["page_time"] = page_time
+                _appr["proportion"] = proportion
+                _appr["usearea"] = usearea
+                _appr["env_invest"] = env_invest
+                _appr["moneyuse"] = moneyuse
+
+                fix_area(ots_client,_appr)
+
+                construction_scale = _appr.get("construction_scale","")
+                proportion,_ = extract_proportion(construction_scale)
+                if proportion!="":
+                    _appr["proportion"] = proportion
+                _,usearea = extract_usearea(construction_scale)
+                if usearea!="":
+                    _appr["usearea"] = usearea
+                env_invest = extract_env_invest(construction_scale)
+                if env_invest!="":
+                    _appr["env_invest"] = env_invest
+                moneyuse = extract_moneyuse(construction_scale)
+                if moneyuse!="":
+                    _appr["moneyuse"] = moneyuse
+
+            return list_approval
+
+
+def check_approval(appr1,appr2):
+    check_keys = ["declare_company","construct_company","total_tendereeMoney","proportion","usearea","doc_num","project_code"]
+    same_count = 0
+    for k in check_keys:
+        if k in appr1 and k in appr2:
+            if appr1[k]==appr2[k] and appr1[k] is not None and appr1[k]!="":
+                same_count += 1
+
+    if same_count>=1:
+        return True
+    return False
+
+
+def merge_approval_real(ots_client,ots_capacity,approval):
+    doc_num = approval.get("doc_num","")
+    doctitle = approval.get("doctitle","")
+    project_name = approval.get("project_name","")
+    project_code = approval.get("project_code","")
+
+    docid = approval.get("docid")
+    should_queries = []
+
+    if doc_num!="":
+        should_queries.append(MatchPhraseQuery("doctitle",doc_num))
+        should_queries.append(MatchPhraseQuery("doctextcon",doc_num))
+        should_queries.append(MatchPhraseQuery("attachmenttextcon",doc_num))
+    if doctitle!="":
+        should_queries.append(MatchPhraseQuery("doctitle",doctitle))
+        should_queries.append(MatchPhraseQuery("doctextcon",doctitle))
+        should_queries.append(MatchPhraseQuery("attachmenttextcon",doctitle))
+    if project_name!="":
+        should_queries.append(MatchPhraseQuery("doctitle",project_name))
+        should_queries.append(MatchPhraseQuery("doctextcon",project_name))
+        should_queries.append(MatchPhraseQuery("attachmenttextcon",project_name))
+    if project_code!="":
+        should_queries.append(MatchPhraseQuery("doctitle",project_code))
+        should_queries.append(MatchPhraseQuery("doctextcon",project_code))
+        should_queries.append(MatchPhraseQuery("attachmenttextcon",project_code))
+
+
+    _query = BoolQuery(should_queries=should_queries,must_not_queries=[TermQuery("docid",docid)])
+    bool_query = BoolQuery(must_queries=[
+        RangeQuery("status",201,301),
+        _query
+    ])
+    rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+                                                                   SearchQuery(bool_query),
+                                                                   ColumnsToGet(["doctitle","page_time","project_name","project_code","approval_json","extract_json"],return_type=ColumnReturnType.SPECIFIED))
+    list_data = getRow_ots(rows)
+    approvals = [approval]
+    for _d in list_data:
+        approval_json = _d.get("approval_json")
+        partitionkey = _d.get("partitionkey")
+        docid = _d.get("docid")
+        doctitle = _d.get("doctitle")
+        project_name = _d.get("project_name")
+        page_time = _d.get("page_time")
+        extract_json = _d.get("extract_json")
+
+
+        _d_html = {"partitionkey":partitionkey,"docid":docid}
+        _html = Document(_d_html)
+        _html.fix_columns(ots_capacity,["dochtmlcon"],True)
+        dochtml = _html.getProperties().get("dochtmlcon","")
+        doctextcon = BeautifulSoup(dochtml,"lxml").get_text()
+        attachmenttextcon = ""
+
+        try:
+            _extract = json.loads(extract_json)
+        except Exception as e:
+            _extract = {}
+        proportion = _extract.get("pb",{}).get("proportion")
+        _,usearea = extract_usearea(doctextcon+attachmenttextcon)
+        env_invest = extract_env_invest(doctextcon+attachmenttextcon)
+        moneyuse = extract_moneyuse(doctextcon+attachmenttextcon)
+        if approval_json:
+            list_approval = json.loads(approval_json)
+            for _appr in list_approval:
+                _appr["partitionkey"] = partitionkey
+                _appr["docid"] = docid
+                _appr["doctitle"] = doctitle
+                _appr["page_time"] = page_time
+                _appr["usearea"] = usearea
+                _appr["env_invest"] = env_invest
+                _appr["moneyuse"] = moneyuse
+
+                fix_area(ots_client,_appr)
+
+                construction_scale = _appr.get("construction_scale","")
+                proportion,_ = extract_proportion(construction_scale)
+                if proportion!="":
+                    _appr["proportion"] = proportion
+                _,usearea = extract_usearea(construction_scale)
+                if usearea!="":
+                    _appr["usearea"] = usearea
+                env_invest = extract_env_invest(construction_scale)
+                if env_invest!="":
+                    _appr["env_invest"] = env_invest
+                moneyuse = extract_moneyuse(construction_scale)
+                if moneyuse!="":
+                    _appr["moneyuse"] = moneyuse
+                if check_approval(approval,_appr):
+                    approvals.append(_appr)
+    return approvals
+
+def get_enterprise_area(ots_client,name):
+    bool_query = BoolQuery(must_queries=[
+        TermQuery("name",name)
+    ])
+    rows,next_token,total_count,is_all_succeed = ots_client.search("enterprise","enterprise_index",
+                                                                   SearchQuery(bool_query),
+                                                                   ColumnsToGet(["province","city","district"],return_type=ColumnReturnType.SPECIFIED))
+    list_data = getRow_ots(rows)
+    _d = {}
+    if len(list_data)>0:
+        _d["province"] = list_data[0].get("province","")
+        _d["city"] = list_data[0].get("city","")
+        _d["district"] = list_data[0].get("district","")
+    return _d
+
+def area_count(_d):
+    keys = ["province","city","district"]
+    return sum([1 if _d.get(k,"") not in ("","全国","未知") else 0 for k in keys])
+
+def fix_area(ots_client,appr):
+    if appr.get("district","")!="":
+        return
+    declare_company = appr.get("declare_company","")
+    _d = get_enterprise_area(ots_client,declare_company)
+    if area_count(_d)>area_count(appr):
+        appr.update(_d)
+
+    construct_company = appr.get("construct_company","")
+    _d = get_enterprise_area(ots_client,construct_company)
+    if area_count(_d)>area_count(appr):
+        appr.update(_d)
+
+    approver = appr.get("approver","")
+    _d = get_enterprise_area(ots_client,approver)
+    if area_count(_d)>area_count(appr):
+        appr.update(_d)
+
+    compilation_unit = appr.get("compilation_unit","")
+    _d = get_enterprise_area(ots_client,compilation_unit)
+    if area_count(_d)>area_count(appr):
+        appr.update(_d)
+
+    publisher = appr.get("publisher","")
+    _d = get_enterprise_area(ots_client,publisher)
+    if area_count(_d)>area_count(appr):
+        appr.update(_d)
+
+
+
+def generate_projects(approvals):
+    project_id = str(uuid4())
+    approvals.sort(key=lambda x:x.get("page_time",""),reverse=False)
+    _dict = {}
+    for appr in approvals:
+        _d = {}
+        _d_area = {}
+        for k,v in appr.items():
+            if v is not None and v!="":
+                if k in ("province","city","district"):
+                    _d_area[k] = v
+                else:
+                    _d[k] = v
+        if _dict.get("province","")=="" and _d_area.get("province","")!="":
+            _dict.update(_d_area)
+        if _dict.get("city","")=="" and _d_area.get("city","")!="":
+            _dict.update(_d_area)
+        if _dict.get("district","")=="" and _d_area.get("district","")!="":
+            _dict.update(_d_area)
+        _dict.update(_d)
+    _dict["id"] = project_id
+    return _dict
+
+
+def merge_approval():
+    ots_client = getConnect_ots()
+    ots_capacity = getConnect_ots_capacity()
+
+    list_data = []
+
+    # filename = r"G:\新建文件夹\WeChat Files\wxid_kluerlj8cn3b21\FileStorage\File\2024-11\20241104审批项目公告_审批要素.xlsx"
+    # df = pd.read_excel(filename)
+    # _count = 0
+    # for docid in df["公告id"]:
+    #     docid = int(docid)
+    #     _count += 1
+    #     # if _count>3000:
+    #     #     break
+    #     # if docid!=400066972170 and docid!=400066972181:
+    #     #     continue
+    #     # list_approval = get_approval_data(ots_client,docid)
+    #     # if list_approval:
+    #     #     list_data.extend(list_approval)
+    #     list_data.append(docid)
+
+    bool_query = BoolQuery(must_queries=[
+        RangeQuery("status",201,301),
+        TermQuery("page_time","2024-11-04"),
+        TermQuery("docchannel",302),
+    ])
+    rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("page_time")]),limit=100,get_total_count=True),
+                                                                   ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
+    list_row = getRow_ots(rows)
+    for _data in list_row:
+        list_data.append(_data.get("docid"))
+
+    while next_token:
+        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+                                                                   SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                   ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
+        list_row = getRow_ots(rows)
+        for _data in list_row:
+            list_data.append(_data.get("docid"))
+        print("%d/%d"%(len(list_data),total_count))
+        # if len(list_data)>=2000:
+        #     break
+
+    task_queue = Queue()
+    for _data in list_data:
+        task_queue.put(_data)
+
+    result_queue = Queue()
+
+    def merge_approval_handle(docid,result_queue):
+        print("docid",docid)
+        list_approval = get_approval_data(ots_client,ots_capacity,docid)
+        if list_approval:
+            for appr in list_approval:
+                approvals = merge_approval_real(ots_client,ots_capacity,appr)
+                result_queue.put(approvals)
+
+    mt = MultiThreadHandler(task_queue,merge_approval_handle,result_queue,30)
+    mt.run()
+
+    list_approvals = []
+    try:
+        while 1:
+            item = result_queue.get(timeout=1)
+            list_approvals.append(item)
+    except:
+        pass
+
+    data_approval = []
+    data_approvals_p = []
+    for approvals in list_approvals:
+        _project = generate_projects(approvals)
+        _project_id = _project.get("id")
+
+        for _approval in approvals:
+
+            _d = {"项目id":_project_id}
+            for k,v in key_trans_d.items():
+                if k in _approval:
+                    _d[v] = _approval[k]
+                else:
+                    _d[v] = ""
+            data_approval.append(_d)
+        _d = {"项目id":_project_id}
+        for k,v in key_trans.items():
+            if k in _project:
+                _d[v] = _project[k]
+            else:
+                _d[v] = ""
+        data_approvals_p.append(_d)
+
+
+
+    df_approval = pd.DataFrame(data_approval)
+    df_approvals_p = pd.DataFrame(data_approvals_p)
+    df_approval.to_excel("a.xlsx")
+    df_approvals_p.to_excel("b.xlsx")
+
+
+
+
+
+if __name__ == '__main__':
+    merge_approval()
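
For reference, a rough sketch of the extraction helpers above on a made-up sentence (hypothetical input; the money figure goes through getUnifyMoney, so the exact value depends on that helper):

text = "项目总建筑面积约为1.2万平方米,环保投资50万元,其中设备费100万元。"
print(extract_proportion(text))    # ('总建筑面积约为1.2万平方米', '12000.0㎡')
print(extract_env_invest(text))    # unified money for "50万元", e.g. 500000
print(extract_moneyuse(text))      # '其中设备费100万元'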

+ 164 - 0
BaseDataMaintenance/maintenance/gpt_extract.py

@@ -0,0 +1,164 @@
+#coding:utf8
+
+from BaseDataMaintenance.chat.ERNIE_utils import *
+
+from BaseDataMaintenance.dataSource.source import getConnect_ots
+from BaseDataMaintenance.chat.chatUtil import *
+
+from tablestore import *
+from BaseDataMaintenance.common.Utils import getRow_ots,getCurrent_date,timeAdd
+from bs4 import BeautifulSoup
+import json
+import re
+import pandas as pd
+import time
+
+
+
+def get_columns(ots_client,docid,columns):
+
+    bool_query = BoolQuery(must_queries=[TermQuery("docid",docid)])
+    rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+                                                                   SearchQuery(bool_query),
+                                                                   ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+    list_data = getRow_ots(rows)
+    _dict = {}
+    if len(list_data)==1:
+        _dict = list_data[0]
+    return _dict
+
+
+def jsonchat(msg,try_times):
+
+    try:
+        print(msg)
+    except Exception as e:
+        pass
+    while try_times>0:
+        try:
+            try_times -= 1
+            resp = chat(msg)
+            time.sleep(1)
+
+            if resp.status_code == 200:
+                result_dict = json.loads(resp.text)
+                result = result_dict.get("result", "")
+                error_msg = result_dict.get("error_msg")
+                if error_msg is not None:
+                    print("error_msg",error_msg)
+                    time.sleep(10)
+                    continue
+                _pattern = "```json(?P<json>.*)```"
+                _search = re.search(_pattern, result, re.DOTALL)
+                if _search is not None:
+                    _json = _search.groupdict().get("json")
+                    _d = json.loads(_json)
+                    return _json
+        except Exception as e:
+            pass
+
+
+def extract_tenderee():
+    filename = r'F:\Workspace2016\DataMining\data\2024-11-26_174430_数据导出.xlsx'
+    df = pd.read_excel(filename)
+
+    ots_client = getConnect_ots()
+
+    list_data = []
+
+    for docid in df["docid"]:
+        docid = int(docid)
+        # if docid!=559799502:
+        #     continue
+        _dict = get_columns(ots_client,docid,["doctextcon","attachmenttextcon","nlp_enterprise","nlp_enterprise_attachment"])
+        doctextcon = _dict.get("doctextcon","")
+        attachmenttextcon = _dict.get("attachmenttextcon","")
+        nlp_enterprise = _dict.get("nlp_enterprise","")
+        nlp_enterprise_attachment = _dict.get("nlp_enterprise_attachment","")
+
+        pre_tenderee = ""
+        if len(nlp_enterprise)>2:
+            _ent = json.loads(nlp_enterprise)
+            pre_tenderee = _ent[0]
+        if len(nlp_enterprise_attachment)>2:
+            _ent = json.loads(nlp_enterprise_attachment)
+            pre_tenderee = _ent[0]
+
+        msg = '''从内容中提取出招标人,招标人应该是公司实体,如果没有则返回"",返回结果为json格式{"tenderee":""}\n%s\n%s''' % (str(doctextcon),str(attachmenttextcon))
+        _json = jsonchat(msg,3)
+        new_tenderee = ""
+        if _json is not None:
+            _d = json.loads(_json)
+            new_tenderee = _d.get("tenderee")
+        new_d = {"docid":docid,"nlp_enterprise":nlp_enterprise,"nlp_enterprise_attachment":nlp_enterprise_attachment,
+                 "pre_tenderee":pre_tenderee,"new_tenderee":new_tenderee}
+        list_data.append(new_d)
+        print(new_d)
+    df1 = pd.DataFrame(list_data)
+    df1.to_excel("tenderee_extract.xlsx",columns=["docid","nlp_enterprise","nlp_enterprise_attachment","pre_tenderee","new_tenderee"])
+
+def prompt_tenderee():
+    _prompt = '招标人,招标人应该是公司实体,如果没有则返回""'
+    _ret = {"招标人":""}
+    return _prompt,_ret
+
+def prompt_budget():
+    _prompt = "预算金额,如果没有则默认0"
+    _ret = {"预算金额":0}
+    return _prompt,_ret
+
+def prompt_win_tenderer():
+    _prompt = '中标人及其中标金额,中标人应该是公司实体,中标金额没有则默认0,中标人与中标金额放到一个字典中,如果有多个,则在数组中分别返回,如果没有则返回空数组'
+    _ret = {"中标人及金额":[{"中标人":"","中标金额":0}]}
+    return _prompt,_ret
+
+def extract_bidding_budget():
+    pass
+
+def extract_win_tenderer():
+    pass
+
+
+def get_data_to_qualify(ots_client,count=-1):
+    current_date = getCurrent_date('%Y-%m-%d')
+    last_date = timeAdd(current_date,-1)
+    bool_query = BoolQuery(
+        must_queries=[
+            RangeQuery("crtime",last_date,current_date),
+            RangeQuery("status",201,301),
+            TermsQuery("docchannel",[52,101,119,120])
+        ]
+    )
+
+    list_data = []
+    rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+                                                                   SearchQuery(bool_query,limit=100,get_total_count=True),
+                                                                   ColumnsToGet(["extract_json"],return_type=ColumnReturnType.SPECIFIED))
+    list_data.extend(getRow_ots(rows))
+    while 1:
+        if next_token is None or len(list_data)>=30*10000:
+            break
+        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+                                                                   SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                   ColumnsToGet(["extract_json"],return_type=ColumnReturnType.SPECIFIED))
+        list_data.extend(getRow_ots(rows))
+        if count>0 and len(list_data)>=count:
+            break
+    return list_data
+
+
+def quality_inspection():
+    pass
+
+
+def merge_extract_json():
+    pass
+
+
+if __name__ == '__main__':
+    extract_tenderee()
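
For reference, a sketch of how the prompt_* helpers appear intended to pair with jsonchat() (the message template here is an assumption; extract_tenderee() above builds its prompt by hand):

_prompt, _ret = prompt_tenderee()
_content = "..."  # placeholder: the document text to extract from
msg = '从内容中提取出%s,返回结果为json格式%s\n%s' % (_prompt, json.dumps(_ret, ensure_ascii=False), _content)
_json = jsonchat(msg, 3)
if _json is not None:
    print(json.loads(_json).get("招标人"))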
+
+
+
+
+
+
+

+ 1 - 1
BaseDataMaintenance/maintenance/product/htmlparser.py

@@ -200,7 +200,7 @@ class ParseDocument():
                 if v is not None:
                     groups.append((k,v))
         if len(groups):
-            # groups.sort(key=lambda x:x[0])
+            groups.sort(key=lambda x:x[0])
             return groups
         return None
 

+ 4 - 3
BaseDataMaintenance/maxcompute/documentDumplicate.py

@@ -1328,9 +1328,10 @@ def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_
     else:
         base_prob = 0.6
     _prob = base_prob*same_count/all_count
-    if min(extract_count_less,extract_count_greater)<=3:
-        if _prob<0.1:
-            _prob = 0.15
+    if min(extract_count_less,extract_count_greater)<=3 and max(extract_count_less,extract_count_greater)>=5:
+        if _prob<0.1 and str(page_time_less)==str(page_time_greater):
+            if str(docchannel_less) not in ("302","303"):
+                _prob = 0.15
         if getLength(province_less)>0 and getLength(province_greater)>0 and province_less not in ("全国","未知") and province_greater not in ("全国","未知") and province_less!=province_greater:
             if b_log:
                 logging.info("province not same:%s-%s"%(province_less,province_greater))

+ 10 - 2
BaseDataMaintenance/maxcompute/documentMerge.py

@@ -2647,6 +2647,7 @@ def check_project_codes_merge(list_code,list_code_to_merge,b_log):
 
 
 def check_merge_rule(_proj,_dict,b_log=False,time_limit=86400*300,return_prob=False,simple_check=False):
+
     docids = _proj.get(project_docids,"")
     page_time = _proj.get(project_page_time,"")
     project_codes = _proj.get(project_project_codes,"")
@@ -2699,6 +2700,14 @@ def check_merge_rule(_proj,_dict,b_log=False,time_limit=86400*300,return_prob=Fa
 
     project_dynamics_to_merge = _dict.get(project_project_dynamics)
 
+    # if len(set([docids,docids_to_merge])&set(["576859812","545764033"]))==2:
+    #     if return_prob:
+    #         return True,1
+    #     return True
+
+    if b_log:
+        log("check %s-%s ,%s-%s"%(docids,docids_to_merge,sub_project_name,sub_project_name_to_merge))
+
     is_few = False
     if (0 if project_codes=="" else 1) + (0 if project_name=="" else 1) + (0 if bidding_budget<0 else 1) +(0 if tenderee=="" else 1) + (0 if win_bid_price<0 else 1) + (0 if win_tenderer=="" else 1)<=1:
         is_few = True
@@ -2815,8 +2824,7 @@ def check_merge_rule(_proj,_dict,b_log=False,time_limit=86400*300,return_prob=Fa
 
     _prob = prob_count/8
 
-    if b_log:
-        log("check %s-%s result%s"%(docids,docids_to_merge,str(check_dict)))
+
     if _prob<0.15:
         if b_log:
             log("prob less than 0.15 prob_count:%d"%(prob_count))

+ 17 - 0
BaseDataMaintenance/model/oracle/QiTaShiXinTemp.py

@@ -0,0 +1,17 @@
+
+import traceback
+from BaseDataMaintenance.model.oracle.TouSuTemp import SouSuTemp
+
+dict_replace = {""}
+
+class QiTaShiXin(SouSuTemp):
+
+    def __init__(self,_dict):
+        SouSuTemp.__init__(self,_dict)
+        self.table_name = "bxkc.t_qi_ta_shi_xin_temp"
+        self.setValue("docchannel",303,True)
+        self.setValue("original_type","qi_ta_shi_xin",True)
+
+    def getPrimary_keys(self):
+        return ["ID"]
+

+ 17 - 0
BaseDataMaintenance/model/oracle/TouSuChuLiTemp.py

@@ -0,0 +1,17 @@
+
+import traceback
+from BaseDataMaintenance.model.oracle.TouSuTemp import SouSuTemp
+
+dict_replace = {""}
+
+class TouSuChuLiTemp(SouSuTemp):
+
+    def __init__(self,_dict):
+        SouSuTemp.__init__(self,_dict)
+        self.table_name = "bxkc.t_tou_su_chu_li_temp"
+        self.setValue("docchannel",303,True)
+        self.setValue("original_type","tou_su_chu_li",True)
+
+    def getPrimary_keys(self):
+        return ["ID"]
+

+ 215 - 0
BaseDataMaintenance/model/oracle/TouSuTemp.py

@@ -0,0 +1,215 @@
+
+import traceback
+from BaseDataMaintenance.model.oracle.BaseModel import BaseModel
+from datetime import datetime
+from BaseDataMaintenance.common.Utils import getCurrent_date,log
+
+dict_oracle2ots = {"WEB_SOURCE_NO":"web_source_no",
+                    "AREA":"area",
+                    "PROVINCE":"province",
+                    "CITY":"city",
+                    "WEB_SOURCE_NAME":"web_source_name",
+                    "INFO_SOURCE":"info_source",
+                    "INFO_TYPE":"info_type",
+                    "INDUSTRY":"industry",
+                    "ID":"uuid",
+                    "PAGE_TITLE":"doctitle",
+                    "PAGE_TIME":"page_time",
+                    "PAGE_CONTENT":"dochtmlcon",
+                    "ATTACHMENT_PATH":"page_attachments",
+                    "CREATE_TIME":"crtime",
+                    "DISTRICT":"district",
+                    "DETAILLINK":"detail_link",
+                   "RECORD_ID":"record_id",
+                   "PUNISHNO":"punishno",
+                   "INSTITUTION":"institution",
+                   "PUNISHTIME":"punish_time",
+                   "PUNISHTYPE":"punish_type",
+                   "COMPLAINANT":"complainant",
+                   "PUNISHPERPLE":"punish_perple",
+                   "PUNISHWHETHER":"punish_whether",
+                   "PUNISHDECISION":"punish_decision",
+                   "docchannel":"docchannel",
+                   "original_type":"original_type"}
+
+
+class SouSuTemp(BaseModel):
+
+    def __init__(self,_dict):
+        self.all_columns = []
+        for k,v in _dict.items():
+            self.setValue(k,v,True)
+
+    def getPrimary_keys(self):
+        raise NotImplementedError()
+
+    def getProperties(self):
+        return self.__dict__
+
+    def getProperties_ots(self):
+        new_dict = {}
+        for k,v in self.__dict__.items():
+            if k in dict_oracle2ots:
+                n_k = dict_oracle2ots[k]
+                if v is not None:
+                    if isinstance(v,(str,int,float)):
+                        pass
+                    elif isinstance(v,(datetime)):
+                        v = v.strftime("%Y-%m-%d %H:%M:%S")
+                    else:
+                        v = str(v)
+                    new_dict[n_k] = v
+        opertime = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
+        publishtime = "%s %s"%(new_dict.get("page_time",""),opertime.split(" ")[1])
+        new_dict["opertime"] = opertime
+        new_dict["publishtime"] = publishtime
+        if "docchannel" in new_dict:
+            new_dict["original_docchannel"] = new_dict["docchannel"]
+        return new_dict
+
+    def setValue(self,k,v,isColumn=False):
+        if "all_columns" not in self.__dict__:
+            self.all_columns = []
+        self.__dict__[k] = v
+        if isColumn:
+            if k not in (set(self.all_columns)):
+                self.all_columns.append(k)
+
+    def delete_row(self,conn):
+        try:
+            cursor = conn.cursor()
+            sql = "delete %s  "%(self.table_name)
+            s_where = " where 1=1 "
+            _set_keys = set(self.getPrimary_keys())
+            has_key = False
+            if len(_set_keys)==0:
+                return
+            for k,v in self.__dict__.items():
+                if k in _set_keys:
+                    if v is None or str(v)=="":
+                        raise RuntimeError("主键%s为空"%k)
+                    s_where += " and %s="%k
+                    if isinstance(v,str):
+                        s_where += "'%s' "%v
+                    else:
+                        s_where += "%d "%v
+                    has_key = True
+            log("delete sql:%s-%s %s"%(str(has_key),sql,s_where))
+            if has_key:
+                sql = "%s %s"%(sql,s_where)
+                update_rows = cursor.execute(sql)
+                conn.commit()
+                return update_rows
+        except Exception as e:
+            traceback.print_exc()
+        return 0
+
+    def insert_row(self,conn):
+        try:
+            cursor = conn.cursor()
+            sql = "insert into %s"%(self.table_name)
+            s_columns = "("
+            s_values = "values("
+            _set_columns = set(self.all_columns)
+            for k,v in self.__dict__.items():
+                if k in _set_columns:
+                    if v is not None and str(v)!="":
+                        s_columns += "%s,"%k
+
+                        if isinstance(v,(int,)):
+                            s_values += "%d,"%v
+
+                        elif isinstance(v,(datetime)):
+                            s_values += "to_date('%s','yyyy-MM-dd HH24:mi:ss'),"%v.strftime("%Y-%m-%d %H:%M:%S")
+                        else:
+                            s_values += "'%s',"%str(v).replace("'","\'")
+            s_columns = "%s)"%s_columns[:-1]
+            s_values = "%s)"%s_values[:-1]
+            sql = "%s%s%s"%(sql,s_columns,s_values)
+            print("sql",sql)
+            cursor.execute(sql)
+            conn.commit()
+        except Exception as e:
+            traceback.print_exc()
+
+
+    def update_row(self,conn,conditions=[]):
+        cursor = conn.cursor()
+        sql = "update %s set "%(self.table_name)
+        s_columns = ""
+        s_where = " where 1=1 "
+        _set_columns = set(self.all_columns)
+        _set_keys = set(self.getPrimary_keys())
+        for k,v in self.__dict__.items():
+            if k in _set_columns and k not in _set_keys:
+                if v is not None and str(v)!="":
+                    s_columns += "%s="%k
+                    if isinstance(v,str):
+                        s_columns += "'%s',"%v
+                    else:
+                        s_columns += "%d,"%v
+            elif k in _set_keys:
+                if v is None or str(v)=="":
+                    raise RuntimeError("主键%s为空"%k)
+                s_where += " and %s="%k
+                if isinstance(v,str):
+                    s_where += "'%s' "%v
+                else:
+                    s_where += "%d "%v
+        s_columns = "%s"%s_columns[:-1]
+        sql = "%s%s%s"%(sql,s_columns,s_where)
+        update_rows = cursor.execute(sql)
+        conn.commit()
+        return update_rows
+
+
+
+    def exists(self,conn):
+        s_where = " where 1=1 "
+        _set_columns = set(self.all_columns)
+        _set_keys = set(self.getPrimary_keys())
+        for k,v in self.__dict__.items():
+            if k in _set_keys:
+                if v is None or str(v)=="":
+                    raise RuntimeError("主键%s为空"%k)
+                s_where += " and %s="%k
+                if isinstance(v,str):
+                    s_where += "'%s' "%v
+                else:
+                    s_where += "%d "%v
+        cursor = conn.cursor()
+        sql = "select count(1) from %s %s"%(self.table_name,s_where)
+        cursor.execute(sql)
+        rows = cursor.fetchall()
+        if rows[0][0]==0:
+            return False
+        return True
+
+    @staticmethod
+    def select_rows(conn,cls,table_name,conditions,rows_to_get="*",limit=60):
+        list_result = []
+        s_limit = ""
+        if limit is not None:
+            s_limit = " and rownum<=%d"%limit
+        if len(conditions)>0:
+            s_where = " where %s"%(" and ".join(conditions))
+        else:
+            s_where = " where 1=1 "
+
+        cursor = conn.cursor()
+        sql = "select %s from %s %s %s"%(rows_to_get,table_name,s_where,s_limit)
+        log(sql)
+        cursor.execute(sql)
+
+        vol = cursor.description
+        rows = cursor.fetchall()
+        for row in rows:
+            _dict = {}
+            for _vol,_val in zip(vol,row):
+                _name = _vol[0]
+                _dict[_name] = _val
+            list_result.append(cls(_dict))
+        return list_result
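
For reference, a minimal sketch of this base model in use (assuming an Oracle connection from getConnection_oracle, which the synchronize code below also imports; WeiFaJiLuTemp is the subclass defined below):

from BaseDataMaintenance.dataSource.source import getConnection_oracle
from BaseDataMaintenance.model.oracle.WeiFaJiLuTemp import WeiFaJiLuTemp

conn = getConnection_oracle()
rows = WeiFaJiLuTemp.select_rows(conn, WeiFaJiLuTemp, "bxkc.t_wei_fa_ji_lu_temp", [], limit=10)
for row in rows:
    print(row.getProperties_ots())   # oracle columns mapped via dict_oracle2ots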
+
+
+

+ 56 - 0
BaseDataMaintenance/model/oracle/WeiFaJiLuTemp.py

@@ -0,0 +1,56 @@
+
+import traceback
+from BaseDataMaintenance.model.oracle.TouSuTemp import SouSuTemp
+
+dict_replace = {""}
+
+class WeiFaJiLuTemp(SouSuTemp):
+
+    def __init__(self,_dict):
+        SouSuTemp.__init__(self,_dict)
+        self.table_name = "bxkc.t_wei_fa_ji_lu_temp"
+        self.setValue("docchannel",303,True)
+        self.setValue("original_type","wei_fa_ji_lu",True)
+
+    def getPrimary_keys(self):
+        return ["ID"]
+
+    @staticmethod
+    def synchronize():
+        try:
+            from BaseDataMaintenance.dataSource.source import getConnection_oracle
+            conn = getConnection_oracle()
+            cursor = conn.cursor()
+            has_commit = 0
+            while 1:
+                sql = '''
+                INSERT INTO bxkc.t_wei_fa_ji_lu_temp
+SELECT *
+FROM (
+         SELECT w.*
+         FROM bxkc.t_wei_fa_ji_lu w
+                  LEFT JOIN bxkc.id_wei_fa_ji_lu b ON w.id = b.id
+         WHERE b.id IS not NULL
+     ) res
+WHERE ROWNUM < 10001
+                '''
+                cursor.execute(sql)
+                row_effected = cursor.rowcount
+
+                if row_effected==0:
+                    break
+                print("row_effected",row_effected)
+                sql1 = '''
+                delete bxkc.id_wei_fa_ji_lu where id in (select id from bxkc.t_wei_fa_ji_lu_temp)
+                '''
+                cursor.execute(sql1)
+                conn.commit()
+
+
+        except Exception as e:
+            traceback.print_exc()
+
+
+if __name__ == '__main__':
+    WeiFaJiLuTemp.synchronize()

+ 31 - 28
BaseDataMaintenance/model/ots/document.py

@@ -342,25 +342,25 @@ def turn_document_status():
         #
         # )
 
-        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
-                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
-                                                                       columns_to_get=ColumnsToGet(["product","product_number"],return_type=ColumnReturnType.SPECIFIED))
-        list_data = getRow_ots(rows)
-        print(total_count)
-        _count = len(list_data)
-        for _data in list_data:
-            _document = Document(_data)
-            task_queue.put(_document)
-        while next_token:
-            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
-                                                                           SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
-                                                                           columns_to_get=ColumnsToGet(["product"],return_type=ColumnReturnType.SPECIFIED))
-            list_data = getRow_ots(rows)
-            _count += len(list_data)
-            print("%d/%d"%(_count,total_count))
-            for _data in list_data:
-                _document = Document(_data)
-                task_queue.put(_document)
+        # rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+        #                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
+        #                                                                columns_to_get=ColumnsToGet(["product","product_number"],return_type=ColumnReturnType.SPECIFIED))
+        # list_data = getRow_ots(rows)
+        # print(total_count)
+        # _count = len(list_data)
+        # for _data in list_data:
+        #     _document = Document(_data)
+        #     task_queue.put(_document)
+        # while next_token:
+        #     rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+        #                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+        #                                                                    columns_to_get=ColumnsToGet(["product"],return_type=ColumnReturnType.SPECIFIED))
+        #     list_data = getRow_ots(rows)
+        #     _count += len(list_data)
+        #     print("%d/%d"%(_count,total_count))
+        #     for _data in list_data:
+        #         _document = Document(_data)
+        #         task_queue.put(_document)
 
         # docids = [223820830,224445409]
         # for docid in docids:
@@ -368,9 +368,9 @@ def turn_document_status():
         #              document_partitionkey:int(docid)%500+1,
         #              }
         #     task_queue.put(Document(_dict))
-        # import pandas as pd
-        # df = pd.read_excel(r"F:\Workspace2016\DataMining\data\2024-07-24_143135_数据导出.xlsx")
-        # list_docid = df["docid"]
+        import pandas as pd
+        df = pd.read_csv(r"C:\Users\Administrator\Desktop\export_241224_6.csv")
+        list_docid = df["docid"]
         # list_docid = [519497468]
 
         # list_docid = []
@@ -385,11 +385,13 @@ def turn_document_status():
         #         if re.search("^\d+$",docid) is not None:
         #             list_docid.append(int(docid))
 
-        # for docid in list_docid:
-        #     _dict = {document_docid:int(docid),
-        #              document_partitionkey:int(docid)%500+1,
-        #              }
-        #     task_queue.put(Document(_dict))
+        for docid,construct_company,recall_flag in zip(list_docid,df["construct_company"],df["recall_flag"]):
+            if recall_flag == 1:
+                _dict = {document_docid:int(docid),
+                         document_partitionkey:int(docid)%500+1,
+                         "construct_company":construct_company
+                         }
+                task_queue.put(Document(_dict))
         # for docid in df["docid2"]:
         #     _dict = {document_docid:int(docid),
         #              document_partitionkey:int(docid)%500+1,
@@ -426,11 +428,12 @@ def turn_document_status():
         # item.setValue(document_district,"金湾区",True)
         # item.setValue(document_status,66,True)
         # print(item.getProperties())
-        item.setValue(document_status,1,True)
+        # item.setValue(document_status,1,True)
         # product = item.getProperties().get(document_product)
         # l_product = product.split(",")
         # n_product = ",".join(l_product[:500])
         # item.setValue(document_product,n_product,True)
+        # item.fix_columns(ots_client,["extract_json","doctitle",""],True)
         item.update_row(ots_client)
         # log("update %d status done"%(item.getProperties().get(document_docid)))
         pass