
Merge branch 'master' of http://192.168.2.103:3000/luojiehua/BaseDataMaintenance

znj 1 year ago
parent commit f45d0c6d88

+ 2 - 1
BaseDataMaintenance/common/Utils.py

@@ -26,7 +26,7 @@ USE_PAI_EAS = False
 
 Lazy_load = False
 
-ILLEGAL_CHARACTERS_RE = re.compile(r'[\000-\010]|[\013-\014]|[\016-\037]')
+ILLEGAL_CHARACTERS_RE = re.compile(r'[\000-\010]|[\013-\014]|[\016-\037]|\x00')
 
 import smtplib
 from email.mime.application import MIMEApplication
@@ -257,6 +257,7 @@ def cut_str(text_list, only_text_list, max_bytes_length=2000000):
 def getLegal_str(_str):
     if _str is not None:
         return ILLEGAL_CHARACTERS_RE.sub("",str(_str))
+    return ""
 
 def getRow_ots_primary(row):
     _dict = dict()
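
Note on the Utils.py change: besides extending ILLEGAL_CHARACTERS_RE, getLegal_str now returns an empty string instead of implicitly returning None when called with None. A minimal sketch of the resulting behavior (regex copied from the diff above):

    import re

    ILLEGAL_CHARACTERS_RE = re.compile(r'[\000-\010]|[\013-\014]|[\016-\037]|\x00')

    def getLegal_str(_str):
        if _str is not None:
            return ILLEGAL_CHARACTERS_RE.sub("", str(_str))
        return ""

    assert getLegal_str("a\x01b\x00c") == "abc"  # control characters stripped
    assert getLegal_str(None) == ""              # None no longer yields an implicit None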

+ 8 - 5
BaseDataMaintenance/dataMonitor/data_monitor.py

@@ -30,7 +30,7 @@ flow_init_log_dir = "/data/python/flow_init_log"
 flow_init_check_dir = "/data/python/flow_init_check"
 
 
-flow_dumplicate_log_path = "/python_log/flow_dumplicate.log"
+flow_dumplicate_log_path = "/home/appuser/python/flow_dumplicate.log"
 
 
 class BaseDataMonitor():
@@ -209,7 +209,7 @@ class BaseDataMonitor():
             #                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
             total_count_todeal = getQueueSize("dataflow_attachment")
 
-            if total_count_todeal>100:
+            if total_count_todeal>500:
                 # query = BoolQuery(must_queries=[
                 #     RangeQuery("crtime",self.get_last_tenmin_time(16))
                 # ])
@@ -277,6 +277,8 @@ class BaseDataMonitor():
                     _msg_type += "\n类型%s\n\t成功%s,消耗%s秒,%.2f秒/个,\n\t失败%s,消耗%s秒,%.2f秒/个"%(k,str(v.get("success")),str(v.get("success_costtime")),v.get("success_costtime")/max(1,v.get("success")),str(v.get("fail")),str(v.get("fail_costtime")),v.get("fail_costtime")/max(1,v.get("fail")))
 
                 _msg = "附件提取队列报警:队列堆积%s条公告,最近十分钟处理公告附件数:%s,处理成功数:%s"%(str(total_count_todeal),str(process_count),str(process_succeed_count))
+                log(_msg)
+                log(_msg_type)
                 sentMsgToDD(_msg+_msg_type,ACCESS_TOKEN_DATAWORKS)
                 # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
         except Exception as e:
@@ -301,7 +303,7 @@ class BaseDataMonitor():
 
             total_count_todeal = getQueueSize("dataflow_extract")
 
-            if total_count_todeal>100:
+            if total_count_todeal>500:
                 _cmd = 'cat %s | grep "%s" | grep -c "process.*docid"'%(flow_extract_log_path,self.get_last_tenmin_time())
                 log(_cmd)
                 process_count = self.cmd_execute(_cmd)
@@ -336,6 +338,7 @@ class BaseDataMonitor():
                 #                                                                              columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
 
                 _msg = "要素提取队列报警:队列堆积%s条公告,最近十分钟入库数:%s,最近十分钟处理公告数:%s,其中成功处理数:%s,查库免提取数:%s"%(str(total_count_todeal),str(init_count),str(process_count),str(success_count),str(exists_count))
+                log(_msg)
                 atAll=False
                 if success_count==0:
                     atAll=True
@@ -615,11 +618,11 @@ class BaseDataMonitor():
 
         # scheduler.add_job(self.monitor_attachment,"cron",minute="*/10")
         scheduler.add_job(self.monitor_extract,"cron",minute="*/10")
-        scheduler.add_job(self.monitor_proposedBuilding,"cron",hour="*/3")
+        scheduler.add_job(self.monitor_proposedBuilding,"cron",hour="*/11")
         # scheduler.add_job(self.monitor_dumplicate,"cron",minute="*/10")
         scheduler.add_job(self.monitor_sychr,"cron",minute="*/10")
         scheduler.add_job(self.monitor_preproject,"cron",hour="8")
-        scheduler.add_job(self.monitor_merge,"cron",minute="*/30")
+        scheduler.add_job(self.monitor_merge,"cron",minute="*/60")
         scheduler.add_job(self.monitor_init,"cron",hour="*/3")
         scheduler.start()
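
Scheduling note: in APScheduler's cron trigger a step is applied within the field's own range, so minute="*/60" can only match minute 0 and the new monitor_merge schedule is effectively hourly, while hour="*/11" fires at hours 0, 11 and 22. A hedged sketch of the equivalent jobs, assuming the standard BackgroundScheduler API:

    from apscheduler.schedulers.background import BackgroundScheduler

    scheduler = BackgroundScheduler()
    # minute="*/60" matches only minute 0, i.e. the same as minute="0"
    scheduler.add_job(lambda: print("monitor_merge"), "cron", minute="0")
    # hour="*/11" matches hours 0, 11 and 22 of each day
    scheduler.add_job(lambda: print("monitor_proposedBuilding"), "cron", hour="*/11")
    scheduler.start()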
 

+ 25 - 9
BaseDataMaintenance/maintenance/dataflow.py

@@ -1351,7 +1351,7 @@ class Dataflow():
 
 
     def flow_dumplicate(self,process_count=flow_process_count,status_from=flow_dumplicate_status_from):
-        def producer(columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json]):
+        def producer(columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_web_source_name]):
             bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True)])
             rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                 SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
@@ -2854,7 +2854,7 @@ class Dataflow_dumplicate(Dataflow):
 
         return list_rules,table_name,table_index
 
-    def producer_flow_dumplicate(self,process_count,status_from,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path]):
+    def producer_flow_dumplicate(self,process_count,status_from,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_name]):
         q_size = self.queue_dumplicate.qsize()
         log("dumplicate queue size %d"%(q_size))
 
@@ -2913,7 +2913,7 @@ class Dataflow_dumplicate(Dataflow):
 
     def flow_dumpcate_comsumer(self):
         from multiprocessing import Process
-        process_count = 3
+        process_count = 6
         thread_count = 12
         list_process = []
         def start_thread():
@@ -3956,6 +3956,7 @@ class Dataflow_dumplicate(Dataflow):
         page_time = item.get(document_page_time,"")
         has_before = False
         has_after = False
+
         if len(page_time)>0:
             l_page_time = timeAdd(page_time,days=-90)
             dict_time = item.get("dict_time",{})
@@ -3965,9 +3966,22 @@ class Dataflow_dumplicate(Dataflow):
                         has_before = True
                     if v>page_time:
                         has_after = True
-        if not has_after and has_before:
-            log("check page_time false %s==%s-%s"%(l_page_time,k,v))
-            return False
+        if has_before:
+            _query = BoolQuery(must_queries=[MatchPhraseQuery(document_doctitle,item.get(document_doctitle,""))],
+                               must_not_queries=[TermQuery(document_docid,item.get(document_docid,0))])
+            if not has_after:
+                log("check page_time false %s==%s-%s"%(l_page_time,k,v))
+
+                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
+                                       SearchQuery(_query,get_total_count=True,limit=1))
+                if total_count>0:
+                    return False
+            if item.get(document_web_source_name,"")=="中国政府采购网":
+                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
+                                                                                    SearchQuery(_query,get_total_count=True,limit=1))
+                if total_count>0:
+                    return False
+
         return True
 
 
@@ -4071,11 +4085,13 @@ class Dataflow_dumplicate(Dataflow):
                 dtmp.setValue(document_tmp_save,1,True)
 
             list_merge_dump = []
-            if exist_finterprint and dtmp.getProperties().get(document_tmp_save)==0:
+            if (exist_finterprint and dtmp.getProperties().get(document_tmp_save)==0) or item.get(document_docchannel,0) in (301,302):
                 log("exist_finterprint %s"%(str(item.get(document_tmp_docid))))
                 dtmp.setValue(document_tmp_projects,"[]",True)
             else:
                 project_json,list_merge_dump = self.merge_document_real(item,list_docids,table_name,dtmp.getProperties().get(document_tmp_save),flow_dumplicate_status_to,b_log)
+                if list_merge_dump is not None and str(item.get(document_tmp_docid)) in list_merge_dump:
+                    dtmp.setValue(document_tmp_save,0,True)
                 dtmp.setValue(document_tmp_projects,project_json,True)
             log("upgrate %s save:%s:docid:%d,final_list:%d,rules:%d,best_docid:%s,dmp_docid:%s"%(str(upgrade),dtmp.getProperties().get(document_tmp_save),item.get(document_tmp_docid),len(final_list),len(list_rules),str(best_docid),dmp_docid))
 
@@ -4205,7 +4221,7 @@ class Dataflow_dumplicate(Dataflow):
 
     def test_dumplicate(self,docid):
         # columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status]
-        columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path]
+        columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_name]
         bool_query = BoolQuery(must_queries=[
             TermQuery("docid",docid)
         ])
@@ -4396,7 +4412,7 @@ if __name__ == '__main__':
     # test_attachment_interface()
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
-    df_dump.test_dumplicate(378760606
+    df_dump.test_dumplicate(332439629
                             )
     # compare_dumplicate_check()
     # df_dump.test_merge([391898061
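
On the page_time check rewritten above: when an earlier related date exists (has_before), the code now also queries the document index for another document with the same title before rejecting, and it applies the same title lookup to documents from 中国政府采购网. A hedged sketch of that title-duplicate lookup, assuming the tablestore client and query classes already used in this diff:

    # Hedged sketch of the title-duplicate lookup (names follow the diff;
    # BoolQuery/MatchPhraseQuery/TermQuery/SearchQuery are the tablestore SDK classes).
    def has_other_doc_with_same_title(ots_client, doctitle, docid):
        _query = BoolQuery(must_queries=[MatchPhraseQuery("doctitle", doctitle)],
                           must_not_queries=[TermQuery("docid", docid)])
        rows, next_token, total_count, is_all_succeed = ots_client.search(
            "document", "document_index",
            SearchQuery(_query, get_total_count=True, limit=1))
        return total_count > 0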

+ 61 - 2
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -14,6 +14,7 @@ from BaseDataMaintenance.model.ots.document import Document
 from BaseDataMaintenance.common.Utils import article_limit
 from BaseDataMaintenance.common.documentFingerprint import getFingerprint
 from BaseDataMaintenance.model.postgres.document_extract import *
+from BaseDataMaintenance.model.oracle.T_SHEN_PI_XIANG_MU import *
 
 import sys
 sys.setrecursionlimit(1000000)
@@ -57,7 +58,8 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                 log("get message of idx:%s"%(str(self._idx)))
                 message_id = headers.headers["message-id"]
                 body = headers.body
-                _dict = {"frame":headers,"conn":self.conn}
+
+                _dict = {"frame":headers,"conn":self.conn,"idx":self._idx}
                 self._func(_dict=_dict)
             except Exception as e:
                 traceback.print_exc()
@@ -115,7 +117,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                 if self.list_attachment_comsumer[i].conn.is_connected():
                     continue
                 else:
-                    listener = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,_i)
+                    listener = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,i)
                     createComsumer(listener,self.mq_attachment)
                     self.list_attachment_comsumer[i] = listener
             time.sleep(5)
@@ -155,9 +157,16 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
             conn = _dict["conn"]
             message_id = frame.headers["message-id"]
             item = json.loads(frame.body)
+            _idx = _dict.get("idx",1)
             page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
             _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
 
+            if random.random()<0.2:
+                log("jump by random")
+                if send_msg_toacmq(self.pool_mq,frame.body,self.mq_attachment):
+                    ackMsg(conn,message_id)
+                    return
+
             if len(page_attachments)==0:
                 newitem ={"item":item,"list_attach":[],"message_id":message_id,"conn":conn}
             else:
@@ -799,6 +808,8 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
 
     def start_extract_listener(self):
 
+        self.list_extract_comsumer = []
+
         for _i in range(self.comsumer_count):
             listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
             createComsumer(listener_extract,self.mq_extract)
@@ -1296,6 +1307,8 @@ class Dataflow_init(Dataflow):
 
     def __init__(self):
         Dataflow.__init__(self)
+        self.max_shenpi_id = None
+        self.base_shenpi_id = 400000000000
         self.mq_init = "/queue/dataflow_init"
 
         self.mq_attachment = "/queue/dataflow_attachment"
@@ -1347,6 +1360,52 @@ class Dataflow_init(Dataflow):
             traceback.print_exc()
             self.pool_oracle.decrease()
 
+    def shengpi2mq(self):
+
+        conn_oracle = self.pool_oracle.getConnector()
+
+        try:
+            if self.max_shenpi_id is None:
+                # get the max_shenpi_id
+                _query = BoolQuery(must_queries=[ExistsQuery("id")])
+                rows,next_token,total_count,is_all_succeed = self.ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index",
+                                                                                    SearchQuery(_query,sort=Sort(sorters=[FieldSort("id",SortOrder.DESC)]),limit=1))
+                list_data = getRow_ots(rows)
+                if len(list_data)>0:
+                    max_shenpi_id = list_data[0].get("id")
+                    if max_shenpi_id>self.base_shenpi_id:
+                        max_shenpi_id -= self.base_shenpi_id
+                    self.max_shenpi_id = max_shenpi_id
+            if self.max_shenpi_id is not None:
+                # select data in order
+                list_data = T_SHEN_PI_XIANG_MU.select_rows(conn_oracle,self.max_shenpi_id,)
+
+                # send data to mq one by one with max_shenpi_id updated
+                for _data in list_data:
+                    _id = _data.getProperties().get(T_SHEN_PI_XIANG_MU_ID)
+
+                    ots_dict = _data.getProperties_ots()
+                    if ots_dict["docid"]<self.base_shenpi_id:
+                        ots_dict["docid"] += self.base_shenpi_id
+
+                    if ots_dict.get(T_SHEN_PI_XIANG_MU_PAGE_ATTACHMENTS,"") !='[]':
+                        if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_attachment):
+                            self.max_shenpi_id = _id
+                        else:
+                            log("sent shenpi message to mq failed %s"%(_id))
+                            break
+                    else:
+                        if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_extract):
+                            self.max_shenpi_id = _id
+                        else:
+                            log("sent shenpi message to mq failed %s"%(_id))
+                            break
+
+        except Exception as e:
+            traceback.print_exc()
+            self.pool_oracle.decrease()
+
+
 
     def ots2mq(self):
         try:

+ 146 - 0
BaseDataMaintenance/maintenance/product/extract_data.py

@@ -0,0 +1,146 @@
+
+
+from BaseDataMaintenance.maintenance.product.htmlparser import *
+from BaseDataMaintenance.common.Utils import getLegal_str
+
+
+requirement_pattern = "(采购需求|需求分析|项目说明|合同内容|招标项目技术要求|采购内容及其它要求|服务要求|招标内容[及与和]要求|服务需求|采购内容|项目概况|项目目标|项目服务内容|项目服务范围|需求内容如下)([::]|$)"
+policy_pattern = "《.+?》"
+not_policy_pattern = "(表|函|书|证)》$|采购合同|响应方须知|响应文件格式|营业执照|开标一览|采购需求"
+aptitude_pattern = "(资格要求|资质要求)([::]|$)"
+def extract_parameters(_html):
+
+
+    pd = ParseDocument(_html,True)
+
+    list_data = pd.tree
+    list_result = []
+
+
+
+    requirement_text = ''
+    list_policy = []
+    aptitude_text = ''
+
+    _find_count = 0
+    _data_i = -1
+    while _data_i<len(list_data)-1:
+        _data_i += 1
+        _data = list_data[_data_i]
+        _type = _data["type"]
+        _text = _data["text"].strip()
+        # print(_data.keys())
+        if _type=="sentence":
+            if _data["sentence_title"] is not None:
+                if re.search(requirement_pattern,_text) is not None:
+
+                    childs = get_childs([_data])
+                    for c in childs:
+                        requirement_text += c["text"]+"\n"
+                    _data_i += len(childs)
+    _data_i = -1
+    while _data_i<len(list_data)-1:
+        _data_i += 1
+        _data = list_data[_data_i]
+        _type = _data["type"]
+        _text = _data["text"].strip()
+        # print(_data.keys())
+        if _type=="sentence":
+            if _data["sentence_title"] is not None:
+                print("aptitude_pattern",_text)
+                if re.search(aptitude_pattern,_text) is not None:
+                    childs = get_childs([_data])
+
+                    for c in childs:
+                        aptitude_text += c["text"]+"\n"
+                    _data_i += len(childs)
+
+        if _type=="table":
+            list_table = _data["list_table"]
+            parent_title = _data["parent_title"]
+            if list_table is not None:
+                for line in list_table[:2]:
+                    for cell_i in range(len(line)):
+                        cell = line[cell_i]
+                        cell_text = cell[0]
+                        if len(cell_text)>120 and re.search(aptitude_pattern,cell_text) is not None:
+                            aptitude_text += cell_text+"\n"
+
+    _data_i = -1
+    while _data_i<len(list_data)-1:
+        _data_i += 1
+        _data = list_data[_data_i]
+        _type = _data["type"]
+        _text = _data["text"].strip()
+        # print(_data.keys())
+        if _type=="sentence":
+            if re.search(policy_pattern,_text) is not None:
+                for t in re.findall(policy_pattern,_text):
+                    if len(t)>0:
+                        list_policy.append(t)
+
+
+    list_policy = list(set(list_policy))
+    new_list_policy = []
+    for _policy in list_policy:
+        if re.search(not_policy_pattern,_policy) is None:
+            new_list_policy.append(_policy)
+    return requirement_text,new_list_policy,aptitude_text
+
+
+def valid_xml_char_ordinal(c):
+    codepoint = ord(c)
+    # conditions ordered by presumed frequency
+    return (
+            0x20 <= codepoint <= 0xD7FF or
+            codepoint in (0x9, 0xA, 0xD) or
+            0xE000 <= codepoint <= 0xFFFD or
+            0x10000 <= codepoint <= 0x10FFFF
+    )
+
+
+def wash_data(text):
+    cleaned_string = ''.join(c for c in text if valid_xml_char_ordinal(c))
+    return cleaned_string
+
+def extract_datas():
+    import glob
+    import pandas as pd
+
+    list_data = []
+    for file in glob.glob(r"C:\Users\Administrator\Desktop\html_output\*.html"):
+        _content = open(file,"r",encoding="utf-8").read()
+        filename = file.split("\\")[-1]
+        # if filename!='2023年泗门镇公共视频图像通信链路技术服务采购项目.html':
+        #     continue
+        requirement_text,list_policy,aptitude_text = extract_parameters(_content)
+        print("requirement_text",requirement_text)
+        print("list_policy",list_policy)
+        print("aptitude_text",aptitude_text)
+        list_data.append([wash_data(filename),wash_data(requirement_text),wash_data(",".join(list_policy)),wash_data(aptitude_text)])
+    df = pd.DataFrame(list_data,columns=["文件名","采购需求","采购政策","资质要求"])
+    df.to_excel("提取结果.xlsx",columns=["文件名","采购需求","采购政策","资质要求"])
+
+def extract_datas_1():
+    import glob
+    import pandas as pd
+
+    list_data = []
+    for file in glob.glob(r"C:\Users\Administrator\Desktop\html_output1\*.html"):
+        _content = open(file,"r",encoding="utf-8").read()
+        filename = file.split("\\")[-1].split(".")[0]
+        # if filename!='2023年泗门镇公共视频图像通信链路技术服务采购项目.html':
+        #     continue
+        requirement_text,list_policy,aptitude_text = extract_parameters(_content)
+        print("requirement_text",requirement_text)
+        print("list_policy",list_policy)
+        print("aptitude_text",aptitude_text)
+        # list_data.append([wash_data(filename),wash_data(requirement_text),wash_data(",".join(list_policy)),wash_data(aptitude_text)])
+        list_data.append([wash_data(filename),wash_data(",".join(list_policy))])
+    df = pd.DataFrame(list_data,columns=["文件名","采购政策"])
+    df.to_excel("提取结果1.xlsx",columns=["文件名","采购政策"])
+
+if __name__ == '__main__':
+    # extract_datas()
+    extract_datas_1()
+    # print(re.search(aptitude_pattern,'二、申请人的资格要求:'))
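
The new extract_data.py walks the parsed tree three times — once for requirement sections, once for aptitude sections (sentence titles plus long table cells), once for policy titles — and then filters the collected 《…》 titles with not_policy_pattern. A self-contained sketch of that filtering step, using the patterns from the new file:

    import re

    policy_pattern = "《.+?》"
    not_policy_pattern = "(表|函|书|证)》$|采购合同|响应方须知|响应文件格式|营业执照|开标一览|采购需求"

    text = "依据《中华人民共和国政府采购法》执行,报价填入《开标一览表》。"
    candidates = set(re.findall(policy_pattern, text))
    policies = [t for t in candidates if re.search(not_policy_pattern, t) is None]
    # keeps 《中华人民共和国政府采购法》 and drops 《开标一览表》 (matches 开标一览 and ends with 表》)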

+ 5 - 5
BaseDataMaintenance/maintenance/product/htmlparser.py

@@ -139,6 +139,7 @@ class ParseDocument():
         # #识别目录树
         # if self.parseTree:
         #     self.parseTree.printParseTree()
+        # self.print_tree(self.tree,"-|")
 
     def get_soup_objs(self,soup,list_obj=None):
         if list_obj is None:
@@ -158,18 +159,17 @@ class ParseDocument():
             self.tree = self.buildParsetree(self.list_obj,products,self.auto_merge_table)
 
     def print_tree(self,tree,append=""):
+        self.set_tree_id = set()
         if append=="":
-            self.set_tree_id = set()
-
-            # for t in tree:
-            #     logger.debug("%s text:%s title:%s title_text:%s before:%s after%s product:%s"%("==>",t["text"][:50],t["sentence_title"],t["sentence_title_text"],t["title_before"],t["title_after"],t["has_product"]))
+            for t in tree:
+                logger.debug("%s text:%s title:%s title_text:%s before:%s after%s product:%s"%("==>",t["text"][:50],t["sentence_title"],t["sentence_title_text"],t["title_before"],t["title_after"],t["has_product"]))
 
         for t in tree:
             _id = id(t)
             if _id in self.set_tree_id:
                 continue
             self.set_tree_id.add(_id)
-            logger.debug("%s text:%s title:%s title_text:%s before:%s after%s product:%s"%(append,t["text"][:50],t["sentence_title"],t["sentence_title_text"],t["title_before"],t["title_after"],t["has_product"]))
+            logger.info("%s text:%s title:%s title_text:%s before:%s after%s product:%s"%(append,t["text"][:50],t["sentence_title"],t["sentence_title_text"],t["title_before"],t["title_after"],t["has_product"]))
             childs = t["child_title"]
             self.print_tree(childs,append=append+"-|")
 

+ 98 - 0
BaseDataMaintenance/model/oracle/T_SHEN_PI_XIANG_MU.py

@@ -0,0 +1,98 @@
+
+
+from BaseDataMaintenance.model.oracle.BaseModel import BaseModel
+from datetime import datetime
+from BaseDataMaintenance.common.Utils import getCurrent_date,log
+
+T_SHEN_PI_XIANG_MU_ID = "id"
+T_SHEN_PI_XIANG_MU_WEB_SOURCE_NO = "web_source_no"
+T_SHEN_PI_XIANG_MU_AREA = "area"
+T_SHEN_PI_XIANG_MU_PROVINCE = "province"
+T_SHEN_PI_XIANG_MU_CITY = "city"
+T_SHEN_PI_XIANG_MU_DISTRICT = "district"
+T_SHEN_PI_XIANG_MU_WEB_SOURCE_NAME = "web_source_name"
+T_SHEN_PI_XIANG_MU_SP_TYPE = "sp_type"
+T_SHEN_PI_XIANG_MU_DETAILLINK = "detaillink"
+T_SHEN_PI_XIANG_MU_RECORD_ID = "record_id"
+T_SHEN_PI_XIANG_MU_PAGE_TITLE = "page_title"
+T_SHEN_PI_XIANG_MU_PAGE_TIME = "page_time"
+T_SHEN_PI_XIANG_MU_PAGE_CONTENT = "page_content"
+T_SHEN_PI_XIANG_MU_PAGE_CODE = "page_code"
+T_SHEN_PI_XIANG_MU_CREATE_TIME = "create_time"
+T_SHEN_PI_XIANG_MU_SOURCE_TYPE = "source_type"
+T_SHEN_PI_XIANG_MU_SOURCE_STAGE = "source_stage"
+T_SHEN_PI_XIANG_MU_ATTACHMENT_PATH = "attachment_path"
+T_SHEN_PI_XIANG_MU_PAGE_ATTACHMENTS = "page_attachments"
+
+
+class T_SHEN_PI_XIANG_MU(BaseModel):
+
+
+    def __init__(self,_dict):
+        self.all_columns = []
+        for k,v in _dict.items():
+            self.setValue(k,v,True)
+
+
+
+    def getProperties(self):
+        return self.__dict__
+
+    def getProperties_ots(self):
+        new_dict = {}
+        for k,v in self.__dict__.items():
+            if v is not None:
+                if isinstance(v,(str,int,float)):
+                    pass
+                elif isinstance(v,(datetime)):
+                    v = v.strftime("%Y-%m-%d %H:%M:%S")
+                else:
+                    v = str(v)
+                new_dict[k] = v
+        docid = int(new_dict.get("id",0))
+        partition_key = docid%500+1
+
+        new_dict["partition_key"] = partition_key
+        new_dict["docid"] = docid
+        new_dict.pop(T_SHEN_PI_XIANG_MU_ID)
+
+        new_dict["crtime"] = new_dict.get(T_SHEN_PI_XIANG_MU_CREATE_TIME)
+        new_dict["docchannel"] = 302
+
+        new_dict["doctitle"] = new_dict.get(T_SHEN_PI_XIANG_MU_PAGE_TITLE,"")
+        new_dict.pop(T_SHEN_PI_XIANG_MU_PAGE_TITLE)
+
+        new_dict["dochtmlcon"] = new_dict.get(T_SHEN_PI_XIANG_MU_PAGE_CONTENT)
+        new_dict.pop(T_SHEN_PI_XIANG_MU_PAGE_CONTENT)
+
+        new_dict[T_SHEN_PI_XIANG_MU_PAGE_ATTACHMENTS] = new_dict.get(T_SHEN_PI_XIANG_MU_ATTACHMENT_PATH,"[]")
+
+        opertime = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
+        publishtime = "%s %s"%(new_dict.get("page_time",""),opertime.split(" ")[1])
+        new_dict["opertime"] = opertime
+        new_dict["publishtime"] = publishtime
+        if "docchannel" in new_dict:
+            new_dict["original_docchannel"] = new_dict["docchannel"]
+        return new_dict
+
+    def select_rows(conn,max_shenpi_id,limit=500):
+        list_result = []
+        s_limit = ""
+        if limit is not None:
+            s_limit = "limit %d"%limit
+        s_where = " where id>%d "%(max_shenpi_id)
+
+        cursor = conn.cursor()
+        sql = "select %s from %s %s %s order by id asc"%("*","t_shen_pi_xiang_mu_new",s_where,s_limit)
+        log("select rows:%s"%(sql))
+        cursor.execute(sql)
+
+        vol = cursor.description
+        rows = cursor.fetchall()
+        for row in rows:
+            _dict = {}
+            for _vol,_val in zip(vol,row):
+                _name = _vol[0]
+                _dict[_name] = _val
+            list_result.append(T_SHEN_PI_XIANG_MU(_dict))
+        return list_result
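
Taken together with shengpi2mq above, this model maps Oracle approval rows into OTS documents: getProperties_ots derives partition_key as id % 500 + 1, and shengpi2mq shifts docids by base_shenpi_id = 400000000000 so they do not collide with ordinary documents. A minimal sketch of that id arithmetic, assuming it mirrors the code in this diff:

    BASE_SHENPI_ID = 400000000000  # base_shenpi_id in Dataflow_init

    def shenpi_id_to_docid(shenpi_id):
        # shengpi2mq adds the offset only while the id is still below the base
        return shenpi_id + BASE_SHENPI_ID if shenpi_id < BASE_SHENPI_ID else shenpi_id

    def docid_to_shenpi_id(docid):
        # recovering max_shenpi_id from the OTS index reverses the offset
        return docid - BASE_SHENPI_ID if docid > BASE_SHENPI_ID else docid

    def partition_key(shenpi_id):
        # getProperties_ots: partition_key = docid % 500 + 1, computed from the raw id
        return shenpi_id % 500 + 1

    assert docid_to_shenpi_id(shenpi_id_to_docid(123456)) == 123456
    assert partition_key(123456) == 123456 % 500 + 1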

+ 39 - 35
BaseDataMaintenance/model/ots/document.py

@@ -20,6 +20,7 @@ document_status = "status"
 document_page_time = "page_time"
 document_attachment_extract_status = "attachment_extract_status"
 document_web_source_no = "web_source_no"
+document_web_source_name = "web_source_name"
 document_fingerprint = "fingerprint"
 document_opertime = "opertime"
 document_docchannel = "docchannel"
@@ -301,13 +302,16 @@ def turn_document_status():
     from BaseDataMaintenance.model.ots.attachment import attachment_filemd5,attachment_file_title,attachment_file_link
     ots_client = getConnect_ots()
     def producer(task_queue,ots_client):
+        from BaseDataMaintenance.model.ots.document_tmp import Document_tmp
 
 
         bool_query = BoolQuery(
             must_queries=[
                 # MatchPhraseQuery("doctitle","珠海城市职业技术学院2022年05月至2022年06月政府采购意向"),
-                # WildcardQuery("web_source_no","14763*"),
-                RangeQuery("status",0,1),
+                WildcardQuery("web_source_no","03716-*"),
+                RangeQuery("page_time","2024-04-24"),
+                TermQuery("save",1)
+                # RangeQuery("status",0,1),
                 # NestedQuery("page_attachments",ExistsQuery("page_attachments.fileMd5")),
                 # TermQuery("docid",397656324)
                 # BoolQuery(should_queries=[
@@ -337,25 +341,25 @@ def turn_document_status():
         #
         # )
 
-        # rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
-        #                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
-        #                                                                columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
-        # list_data = getRow_ots(rows)
-        # print(total_count)
-        # _count = len(list_data)
-        # for _data in list_data:
-        #     _document = Document(_data)
-        #     task_queue.put(_document)
-        # while next_token:
-        #     rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
-        #                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
-        #                                                                    columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
-        #     list_data = getRow_ots(rows)
-        #     _count += len(list_data)
-        #     print("%d/%d"%(_count,total_count))
-        #     for _data in list_data:
-        #         _document = Document(_data)
-        #         task_queue.put(_document)
+        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
+                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
+                                                                       columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
+        list_data = getRow_ots(rows)
+        print(total_count)
+        _count = len(list_data)
+        for _data in list_data:
+            _document = Document_tmp(_data)
+            task_queue.put(_document)
+        while next_token:
+            rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
+                                                                           SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                           columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
+            list_data = getRow_ots(rows)
+            _count += len(list_data)
+            print("%d/%d"%(_count,total_count))
+            for _data in list_data:
+                _document = Document_tmp(_data)
+                task_queue.put(_document)
 
         # docids = [223820830,224445409]
         # for docid in docids:
@@ -363,19 +367,19 @@ def turn_document_status():
         #              document_partitionkey:int(docid)%500+1,
         #              }
         #     task_queue.put(Document(_dict))
-        import pandas as pd
-        df = pd.read_excel(r"F:\Workspace2016\DataMining\export\abc1.xlsx")
-        for docid in df["docid1"]:
-            _dict = {document_docid:int(docid),
-                     document_partitionkey:int(docid)%500+1,
-                     }
-            task_queue.put(Document(_dict))
-        for docid in df["docid2"]:
-            _dict = {document_docid:int(docid),
-                     document_partitionkey:int(docid)%500+1,
-                     }
-            task_queue.put(Document(_dict))
-        log("task_queue size:%d"%(task_queue.qsize()))
+        # import pandas as pd
+        # df = pd.read_excel(r"F:\Workspace2016\DataMining\export\abc1.xlsx")
+        # for docid in df["docid1"]:
+        #     _dict = {document_docid:int(docid),
+        #              document_partitionkey:int(docid)%500+1,
+        #              }
+        #     task_queue.put(Document(_dict))
+        # for docid in df["docid2"]:
+        #     _dict = {document_docid:int(docid),
+        #              document_partitionkey:int(docid)%500+1,
+        #              }
+        #     task_queue.put(Document(_dict))
+        # log("task_queue size:%d"%(task_queue.qsize()))
 
     def _handle(item,result_queue,ots_client):
         #change attach value
@@ -401,7 +405,7 @@ def turn_document_status():
         # item.setValue(document_province,"广东",True)
         # item.setValue(document_city,"珠海",True)
         # item.setValue(document_district,"金湾区",True)
-        item.setValue(document_status,1,True)
+        item.setValue(document_status,66,True)
         # print(item.getProperties())
         item.update_row(ots_client)
         # log("update %d status done"%(item.getProperties().get(document_docid)))