
Dataflow optimization (data omission, discarding of overly long data); add a data omission check

luojiehua committed 3 years ago
commit cf2a71ef1b

+ 39 - 3
BaseDataMaintenance/common/2.py

@@ -5,8 +5,8 @@ import os
 
 base64_bytes = 'MEUCIAfMRvXbONh32lUBAq5CJRLk2+gDVOlL4cb5yVoZgfIjAiEAmTWQ1XZFspKdee0iiXYg4KQZZbBuFcCa3uJiMdGzKfM='
 
-image_bytes = base64.b64decode(base64_bytes)
-print(str(image_bytes,"GB18030"))
+# image_bytes = base64.b64decode(base64_bytes)
+# print(str(image_bytes,"GB18030"))
 # image_path = os.path.join("","swf_page_%d.png"%(1))
 # with open(image_path, 'wb') as f:
 #     f.write(image_bytes)
@@ -17,4 +17,40 @@ print(str(image_bytes,"GB18030"))
 # s.write(b'adfdafa')
 # s.seek(0,2)
 # a = s.tell()
-# print(a)
+# print(a)
+
+from PyPDF2 import PdfFileReader, PdfFileWriter
+from pdfminer.pdfparser import PDFParser
+from pdfminer.pdfdocument import PDFDocument
+from pdfminer.pdfpage import PDFPage
+from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter
+from pdfminer.converter import PDFPageAggregator
+from pdfminer.layout import LTTextBoxHorizontal, LAParams, LTFigure, LTImage, LTCurve, LTText, LTChar, LTRect, \
+    LTTextBoxVertical, LTLine
+from io import BytesIO
+_b = open("4f70ff9e095cd8fe0651a694a1976ebe.pdf","rb").read()
+bio = BytesIO()
+bio.write(_b)
+
+def generateFp(_bytes):
+    bio = BytesIO()
+    bio.write(_bytes)
+    return bio
+
+parser = PDFParser(bio)
+doc_pdfminer = PDFDocument(parser)
+rsrcmgr = PDFResourceManager()
+laparams = LAParams(line_overlap=0.01,
+                         char_margin=0.05,
+                         line_margin=0.01,
+                         word_margin=0.01,
+                         boxes_flow=0.1,)
+device = PDFPageAggregator(rsrcmgr, laparams=laparams)
+interpreter = PDFPageInterpreter(rsrcmgr, device)
+
+pages = PDFPage.create_pages(doc_pdfminer)
+pages = list(pages)
+page_count = len(pages)
+page_no = 0
+for page in pages:
+    print(1)
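
The loop above only counts pages. As a hedged sketch (not part of this commit), the PDFPageAggregator and PDFPageInterpreter created above would normally be driven like this to get per-page layout objects:

for page_no, page in enumerate(pages):
    interpreter.process_page(page)          # render the page through the resource manager
    layout = device.get_result()            # an LTPage containing LTTextBox/LTFigure/... children
    text_boxes = [obj for obj in layout
                  if isinstance(obj, (LTTextBoxHorizontal, LTTextBoxVertical))]
    print(page_no, len(text_boxes))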

+ 119 - 1
BaseDataMaintenance/common/Utils.py

@@ -83,9 +83,127 @@ def sendEmail(host,username,password,receivers,subject="数据导出",content=""
     finally:
         server.close()
 
+def article_limit(soup,limit_words=30000):
+    sub_space = re.compile("\s+")
+    def soup_limit(_soup,_count,max_count=30000,max_gap=500):
+        """
+        :param _soup: soup
+        :param _count: current character count
+        :param max_count: maximum allowed character count
+        :param max_gap: maximum allowed overshoot once the limit is exceeded
+        :return:
+        """
+        _gap = _count - max_count
+        _is_skip = False
+        next_soup = None
+        while len(_soup.find_all(recursive=False)) == 1 and \
+                _soup.get_text(strip=True) == _soup.find_all(recursive=False)[0].get_text(strip=True):
+            _soup = _soup.find_all(recursive=False)[0]
+        if len(_soup.find_all(recursive=False)) == 0:
+            _soup.string = str(_soup.get_text())[:max_count-_count]
+            _count += len(re.sub(sub_space, "", _soup.string))
+            _gap = _count - max_count
+            next_soup = None
+        else:
+            for _soup_part in _soup.find_all(recursive=False):
+                if not _is_skip:
+                    _count += len(re.sub(sub_space, "", _soup_part.get_text()))
+                    if _count >= max_count:
+                        _gap = _count - max_count
+                        if _gap <= max_gap:
+                            _is_skip = True
+                        else:
+                            _is_skip = True
+                            next_soup = _soup_part
+                            _count -= len(re.sub(sub_space, "", _soup_part.get_text()))
+                            continue
+                else:
+                    _soup_part.decompose()
+        return _count,_gap,next_soup
+
+    text_count = 0
+    have_attachment = False
+    attachment_part = None
+    for child in soup.find_all(recursive=True):
+        if child.name == 'div' and 'class' in child.attrs:
+            if "richTextFetch" in child['class']:
+                child.insert_before("##attachment##")
+                attachment_part = child
+                have_attachment = True
+                break
+    if not have_attachment:
+        # no attachment
+        if len(re.sub(sub_space, "", soup.get_text())) > limit_words:
+            text_count,gap,n_soup = soup_limit(soup,text_count,max_count=limit_words,max_gap=500)
+            while n_soup:
+                text_count, gap, n_soup = soup_limit(n_soup, text_count, max_count=limit_words, max_gap=500)
+
+    else:
+        # has an attachment
+        _text = re.sub(sub_space, "", soup.get_text())
+        _text_split = _text.split("##attachment##")
+        if len(_text_split[0])>limit_words:
+            main_soup = attachment_part.parent
+            main_text = main_soup.find_all(recursive=False)[0]
+            text_count, gap, n_soup = soup_limit(main_text, text_count, max_count=limit_words, max_gap=500)
+            while n_soup:
+                text_count, gap, n_soup = soup_limit(n_soup, text_count, max_count=limit_words, max_gap=500)
+        if len(_text_split[1])>limit_words:
+            # attachment html is plain text with no child structure
+            if len(attachment_part.find_all(recursive=False))==0:
+                attachment_part.string = str(attachment_part.get_text())[:limit_words]
+            else:
+                attachment_text_nums = 0
+                attachment_skip = False
+                for part in attachment_part.find_all(recursive=False):
+                    if not attachment_skip:
+                        last_attachment_text_nums = attachment_text_nums
+                        attachment_text_nums = attachment_text_nums + len(re.sub(sub_space, "", part.get_text()))
+                        if attachment_text_nums>=limit_words:
+                            part.string = str(part.get_text())[:limit_words-last_attachment_text_nums]
+                            attachment_skip = True
+                    else:
+                        part.decompose()
+
+    return soup
+
+def soup_limit(_soup,_count,max_count=30000,max_gap=500,sub_space = re.compile("\s+")):
+    """
+    :param _soup: soup
+    :param _count: current character count
+    :param max_count: maximum allowed character count
+    :param max_gap: maximum allowed overshoot once the limit is exceeded
+    :return:
+    """
+    _gap = _count - max_count
+    _is_skip = False
+    next_soup = None
+    while len(_soup.find_all(recursive=False)) == 1 and \
+            _soup.get_text(strip=True) == _soup.find_all(recursive=False)[0].get_text(strip=True):
+        _soup = _soup.find_all(recursive=False)[0]
+    if len(_soup.find_all(recursive=False)) == 0:
+        _soup.string = str(_soup.get_text())[:max_count-_count]
+        _count += len(re.sub(sub_space, "", _soup.string))
+        _gap = _count - max_count
+        next_soup = None
+    else:
+        for _soup_part in _soup.find_all(recursive=False):
+            if not _is_skip:
+                _count += len(re.sub(sub_space, "", _soup_part.get_text()))
+                if _count >= max_count:
+                    _gap = _count - max_count
+                    if _gap <= max_gap:
+                        _is_skip = True
+                    else:
+                        _is_skip = True
+                        next_soup = _soup_part
+                        _count -= len(re.sub(sub_space, "", _soup_part.get_text()))
+                        continue
+            else:
+                _soup_part.decompose()
+    return _count,_gap,next_soup
 
 def cut_str(text_list, only_text_list, max_bytes_length=2000000):
-    logging.info("into cut_str")
     try:
 
         # 计算有格式总字节数
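
For context, a hedged usage sketch of the new article_limit helper (the HTML and the 2000-character limit below are invented; bs4 and lxml are assumed to be installed):

from bs4 import BeautifulSoup
from BaseDataMaintenance.common.Utils import article_limit

html = "<div id='pcontent'>" + "<p>some paragraph text</p>" * 1000 + "</div>"
soup = BeautifulSoup(html, "lxml")
soup = article_limit(soup, limit_words=2000)   # drops trailing children once roughly 2000 characters are kept
print(len(soup.get_text()))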

+ 15 - 5
BaseDataMaintenance/common/activateMQUtils.py

@@ -10,11 +10,21 @@ import traceback
 from BaseDataMaintenance.common.Utils import log
 
 
-def send_msg_toacmq(conn,msg,dest):
-    try:
-        conn.send(body=str(msg), destination=dest, persistent='false')
-    except Exception as e:
-        traceback.print_exc()
+def send_msg_toacmq(pool_conn,msg,dest,retry_times=5):
+    for _ in range(retry_times):
+        conn = pool_conn.getConnector()
+        try:
+            conn.send(body=str(msg), destination=dest, persistent='false')
+            return True
+        except Exception as e:
+            traceback.print_exc()
+            conn.disconnect()
+        finally:
+            if conn.is_connected():
+                pool_conn.putConnector(conn)
+            else:
+                del conn
+    return False
 
 class MyListener(object):
     def __init__(self, conn,subscription):
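
A hedged sketch of how the reworked send_msg_toacmq is meant to be called with a connector pool; the import path of getConnect_activateMQ is assumed, and the pool sizes mirror other call sites in this commit:

from BaseDataMaintenance.dataSource.pool import ConnectorPool
from BaseDataMaintenance.dataSource.source import getConnect_activateMQ   # assumed import path
from BaseDataMaintenance.common.activateMQUtils import send_msg_toacmq

pool_mq = ConnectorPool(10, 30, getConnect_activateMQ)
if not send_msg_toacmq(pool_mq, '{"docid": 1}', "/queue/dataflow_extract"):
    # the function now reports failure instead of swallowing it,
    # so callers can skip acking and keep the message for a retry
    print("send failed after 5 retries")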

+ 1 - 1
BaseDataMaintenance/common/multiThread.py

@@ -85,7 +85,7 @@ class MultiThreadHandler(object):
 
     def run(self):
 
-
+        self.list_thread = []
         for i in range(self.thread_count):
             th = _taskHandler(self.task_queue,self.task_handler,self.result_queue,*self.args,**self.kwargs)
             # th.setDaemon(True)

+ 132 - 3
BaseDataMaintenance/dataMonitor/data_monitor.py

@@ -14,6 +14,9 @@ from queue import Queue
 from BaseDataMaintenance.common.multiThread import MultiThreadHandler
 
 from BaseDataMaintenance.java.MQInfo import getAllQueueSize,getQueueSize
+from BaseDataMaintenance.maintenance.dataflow_settings import *
+
+import pandas as pd
 
 
 flow_attachment_log_path = "/data/python/flow_attachment.log"
@@ -23,6 +26,10 @@ flow_extract_log_path = "/data/python/flow_extract.log"
 flow_init_path = "/data/python/flow_init.log"
 
 
+flow_init_log_dir = "/data/python/flow_init_log"
+flow_init_check_dir = "/data/python/flow_init_check"
+
+
 class BaseDataMonitor():
 
     def __init__(self):
@@ -40,6 +47,95 @@ class BaseDataMonitor():
 
         last_ten_minite_time = timeAdd(current_time,0,"%Y-%m-%d %H:%M:%S",-10)
         return last_ten_minite_time[:nums]
+
+    def monitor_init(self):
+
+        def _handle(_item,result_queue):
+            bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
+
+            rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
+                                                                           SearchQuery(bool_query,get_total_count=True),
+                                                                           columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+
+            _item["exists"] = total_count
+        try:
+            current_date = getCurrent_date("%Y-%m-%d")
+
+            last_date = timeAdd(current_date,-1,"%Y-%m-%d")
+
+            if not os.path.exists(flow_init_check_dir):
+                os.mkdir(flow_init_check_dir)
+
+            log_filename = os.path.join(flow_init_log_dir,"flow_init_%s.log"%(last_date))
+            check_filename = os.path.join(flow_init_check_dir,"flow_init_%s.xlsx"%(last_date))
+
+            list_uuid = []
+            task_queue = Queue()
+            dict_tolong = {}
+            if not os.path.exists(check_filename) and os.path.exists(log_filename):
+                _regrex = "delete\s+(?P<tablename>bxkc[^\s]+)\s+.*ID='(?P<uuid>.+)'"
+                _regrex_tolong = "msg too long:(?P<uuid>[^,]+),\d+"
+                with open(log_filename,"r",encoding="utf8") as f:
+                    while 1:
+                        _line = f.readline()
+                        if not _line:
+                            break
+                        _match = re.search(_regrex,_line)
+                        if _match is not None:
+                            _uuid = _match.groupdict().get("uuid")
+                            tablename = _match.groupdict().get("tablename")
+                            if _uuid is not None:
+                                list_uuid.append({"uuid":_uuid,"tablename":tablename})
+                        _match = re.search(_regrex_tolong,_line)
+                        if _match is not None:
+                            _uuid = _match.groupdict().get("uuid")
+                            dict_tolong[_uuid] = 1
+
+
+                if len(list_uuid)==0:
+                    _msg = "数据遗漏检查出错"
+                    sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+                    sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+
+                ots_client = getConnect_ots()
+
+                for _d in list_uuid:
+                    task_queue.put(_d)
+                mt = MultiThreadHandler(task_queue,_handle,None,30)
+                mt.run()
+                df_data = {"uuid":[],
+                           "tablename":[],
+                           "exists":[],
+                           "tolong":[]}
+
+                for _data in list_uuid:
+                    for k,v in df_data.items():
+                        if k!="tolong":
+                            v.append(_data.get(k))
+                    df_data["tolong"].append(dict_tolong.get(_data["uuid"],0))
+                df2 = pd.DataFrame(df_data)
+                df2.to_excel(check_filename)
+
+            counts = 0
+            df_data = pd.read_excel(check_filename)
+            for _exists,_tolong in zip(df_data["exists"],df_data["tolong"]):
+                if _exists==0 and _tolong==0:
+                    counts += 1
+            if counts>0:
+                _msg = "数据遗漏检查报警,%s有%s条公告遗漏,详见%s"%(last_date,str(counts),check_filename)
+                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+                sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+
+
+
+
+        except Exception as e:
+            _msg = "数据遗漏检查报错"
+            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+            sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+            traceback.print_exc()
+
+
     def monitor_attachment(self):
         try:
             # query = BoolQuery(must_queries=[
@@ -82,13 +178,12 @@ class BaseDataMonitor():
                 init_count = self.cmd_execute(_cmd)
 
 
-                _msg = "附件提取队列报警:队列堆积%s条公告,最近十分钟入库数:%s,最近十分钟处理公告附件数:%s,处理成功数:%s"%(str(total_count_todeal),str(init_count),str(process_count),str(process_succeed_count))
+                _msg = "附件提取队列报警:队列堆积%s条公告,最近十分钟处理公告附件数:%s,处理成功数:%s"%(str(total_count_todeal),str(process_count),str(process_succeed_count))
                 sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
                 sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
         except Exception as e:
             traceback.print_exc()
 
-
     def monitor_extract(self):
 
         try:
@@ -156,6 +251,25 @@ class BaseDataMonitor():
             sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
             sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
+
+    def monitor_sychr(self):
+        current_date = getCurrent_date("%Y-%m-%d")
+
+        last_date = timeAdd(current_date,-1,"%Y-%m-%d")
+
+        query = BoolQuery(must_queries=[
+            RangeQuery("status",*flow_sychro_status_from,True,True),
+        ])
+
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
+                                                                            SearchQuery(query,None,True),
+                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+
+        if total_count>=200:
+            _msg = "数据流报警:待同步到成品表公告数为:%d"%(total_count)
+            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+            sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+
     def monitor_preproject(self):
         current_date = getCurrent_date("%Y-%m-%d")
 
@@ -239,6 +353,19 @@ class BaseDataMonitor():
             sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
             sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
+        query = BoolQuery(must_queries=[
+            RangeQuery("status",*flow_dumplicate_status_from,True,True),
+        ])
+
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
+                                                                            SearchQuery(query,None,True),
+                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+
+        if total_count>=200:
+            _msg = "数据流报警:待去重公告数为:%d"%(total_count)
+            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+            sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+
 
     def monitor_merge(self):
         current_date = getCurrent_date("%Y-%m-%d")
@@ -331,12 +458,14 @@ class BaseDataMonitor():
 
         scheduler = BlockingScheduler()
 
-        scheduler.add_job(self.monitor_attachment,"cron",minute="*/10")
+        # scheduler.add_job(self.monitor_attachment,"cron",minute="*/10")
         scheduler.add_job(self.monitor_extract,"cron",minute="*/10")
         scheduler.add_job(self.monitor_proposedBuilding,"cron",hour="*/2")
         scheduler.add_job(self.monitor_dumplicate,"cron",minute="*/10")
+        scheduler.add_job(self.monitor_sychr,"cron",minute="*/10")
         scheduler.add_job(self.monitor_preproject,"cron",hour="8")
         scheduler.add_job(self.monitor_merge,"cron",hour="*/1")
+        scheduler.add_job(self.monitor_init,"cron",hour="*/3")
         scheduler.start()
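
For reference, a hedged sketch of the two patterns monitor_init scans for in the flow_init logs (the sample log lines are invented; the patterns mirror _regrex and _regrex_tolong above):

import re

_regrex = r"delete\s+(?P<tablename>bxkc[^\s]+)\s+.*ID='(?P<uuid>.+)'"
_regrex_tolong = r"msg too long:(?P<uuid>[^,]+),\d+"

line = "delete bxkc_zhaobiao_temp where ID='0a1b2c'"     # invented example
m = re.search(_regrex, line)
if m:
    print(m.groupdict())                                  # {'tablename': 'bxkc_zhaobiao_temp', 'uuid': '0a1b2c'}

line2 = "msg too long:0a1b2c,612345"                      # invented example
print(re.search(_regrex_tolong, line2).groupdict())       # {'uuid': '0a1b2c'}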
 
 

+ 12 - 4
BaseDataMaintenance/dataSource/interface.py

@@ -3,6 +3,7 @@ import requests
 import json
 
 from urllib import request
+from requests.exceptions import ConnectionError
 def check_net(testserver):
     try:
         ret = request.urlopen(url=testserver, timeout=10.0)
@@ -11,6 +12,7 @@ def check_net(testserver):
         return False
     return True
 interface_url = "http://192.168.0.115:15010/convert"
+# interface_url = "http://192.168.2.102:15010/convert"
 # if not check_net(interface_url):
 #     interface_url = "http://47.98.57.0:15015/convert"
 print(interface_url)
@@ -18,14 +20,18 @@ DEFAULT_TIMEOUT = 3000
 import traceback
 import base64
 
-def getAttachDealInterface(_data,_type,restry=1):
+def getAttachDealInterface(_data,_type,path="",restry=1):
     _succeed = False
     _html = ""
     swf_images = []
     for i in range(restry):
         try:
-            _json = {"file":_data,
-                    "type":_type}
+            if path!="":
+                _json = {"file_path":path,
+                         "type":_type}
+            else:
+                _json = {"file":_data,
+                        "type":_type}
             headers = {"Content-Type":"application/json"}
             _resp = requests.post(interface_url,data=_json,timeout=DEFAULT_TIMEOUT)
 
@@ -40,6 +46,8 @@ def getAttachDealInterface(_data,_type,restry=1):
                     return _succeed,_html,swf_images
                 else:
                     pass
+        except ConnectionError as e1:
+            raise e1
         except Exception as e:
             traceback.print_exc()
             _succeed = False
@@ -79,5 +87,5 @@ def sentMsgToDD(msg,access_token=ACCESS_TOKEN_SUANFA):
 
 
 if __name__=="__main__":
-    print(getAttachDealInterface(base64.b64encode(open("F://Workspace2016/BaseDataMaintenance/BaseDataMaintenance/maintenance/attachment\download\\2020年南通市职业学校教学大赛结果的通知(扫描件).pdf","rb").read()),"pdf"))
+    print(getAttachDealInterface(base64.b64encode(open("F://Workspace2016/BaseDataMaintenance/BaseDataMaintenance/maintenance/attachment/readme.md","rb").read()),"pdf"))
     # sentMsgToDD("测试消息")
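
A hedged sketch of the two calling modes the patched getAttachDealInterface now supports (the file name and path below are placeholders):

import base64
from BaseDataMaintenance.dataSource.interface import getAttachDealInterface

# legacy mode: ship the base64-encoded file content in the request body
_data = base64.b64encode(open("sample.pdf", "rb").read())                  # placeholder file
succeed, html, swf_images = getAttachDealInterface(_data, "pdf")

# new mode: pass a path the convert service can read directly, avoiding the base64 payload
succeed, html, swf_images = getAttachDealInterface(None, "pdf", path="/FileInfo/ab12/ab12cd34ef.pdf")  # placeholder path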

+ 8 - 3
BaseDataMaintenance/dataSource/pool.py

@@ -19,13 +19,18 @@ class ConnectorPool():
         with self._lock:
             if self.connector_pool.empty():
                 if self.pool_size<self.max_num:
-                    self.connector_pool.put(self.method_init(**self.kwargs))
+                    while 1:
+                        try:
+                            _conn = self.method_init(**self.kwargs)
+                            self.connector_pool.put(_conn)
+                            break
+                        except Exception as e:
+                            pass
             _conn = self.connector_pool.get(block=True)
             return _conn
 
     def putConnector(self,_conn):
-        with self._lock:
-            self.connector_pool.put(_conn)
+        self.connector_pool.put(_conn)
 
     def destory(self):
         while 1:

+ 5 - 2
BaseDataMaintenance/dataSource/setttings.py

@@ -43,8 +43,10 @@ oracle_host = "192.168.0.150"
 oracle_port = 1522
 # oracle_user = "bxkc_data_readonly"
 # oracle_pass = "P7WUrgcz0@#j8pjg"
-oracle_user = "bxkc_data"
-oracle_pass = "Z0rTLHo@nIu5Zk1Z"
+# oracle_user = "bxkc_data"
+# oracle_pass = "Z0rTLHo@nIu5Zk1Z"
+oracle_user = "bxkc_write"
+oracle_pass = "TKVF#3idC4UQlDVy"
 oracle_db = "yanphone"
 
 ots_AccessKeyId = 'LTAI5tFuoxHm8Uxrr5nT8wTZ'
@@ -65,6 +67,7 @@ activateMQ_pswd = "admin"
 
 # attach_postgres_host = "121.46.18.113"
 attach_postgres_host = "127.0.0.1"
+attach_postgres_host = "192.168.0.114"
 attach_postgres_port = 5432
 attach_postgres_user = "postgres"
 attach_postgres_pswd = "postgres"

+ 3 - 2
BaseDataMaintenance/fixDoc_to_queue_extract.py

@@ -3,8 +3,9 @@ import sys,os
 
 sys.path.append(os.path.dirname(__file__)+"/..")
 
-from BaseDataMaintenance.maintenance.dataflow_mq import fixDoc_to_queue_extract
+from BaseDataMaintenance.maintenance.dataflow_mq import fixDoc_to_queue_extract,fixDoc_to_queue_init
 
 
 if __name__ == '__main__':
-    fixDoc_to_queue_extract()
+    fixDoc_to_queue_extract()
+    # fixDoc_to_queue_init()

+ 13 - 2
BaseDataMaintenance/maintenance/2.py

@@ -4,7 +4,18 @@ import time
 import gunicorn
 
 import requests
+from bs4 import BeautifulSoup
+
+from BaseDataMaintenance.common.Utils import article_limit
+
+import codecs
 
 if __name__ == '__main__':
-    print(time.time())
-    print(time.time()*100)
+
+    text = codecs.open("C:\\Users\\Administrator\\Desktop\\2.html","r",encoding="utf8").read()
+    content = str(BeautifulSoup(text).find("div",id="pcontent"))
+    _soup = BeautifulSoup(content,"lxml")
+    print(len(str(_soup)))
+    _soup = article_limit(_soup,3000)
+    print(len(str(_soup)))
+    print(str(_soup))

+ 9 - 6
BaseDataMaintenance/maintenance/dataflow.py

@@ -411,7 +411,8 @@ class Dataflow():
         _dict["win_tenderer"] = win_tenderer
         _dict["bidding_budget"] = bidding_budget
         _dict["win_bid_price"] = win_bid_price
-        _dict["extract_count"] = extract_count
+        if "extract_count" not in _dict:
+            _dict["extract_count"] = extract_count
 
     def get_dump_columns(self,_dict):
         docchannel = _dict.get(document_tmp_docchannel,0)
@@ -480,7 +481,7 @@ class Dataflow():
             return _split
         return []
 
-    def search_data_by_query(self,item,_query,confidence,table_name="document_tmp",table_index="document_tmp_index",sort_column="docid",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json]):
+    def search_data_by_query(self,item,_query,confidence,table_name="document_tmp",table_index="document_tmp_index",sort_column="docid",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
 
         list_data = []
         if isinstance(_query,list):
@@ -510,7 +511,7 @@ class Dataflow():
         list_dict = self.f_set_docid_limitNum_contain(item,list_dict,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys)
         return list_dict
 
-    def add_data_by_query(self,item,base_list,set_docid,_query,confidence,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_save,document_tmp_status,document_tmp_product,document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json]):
+    def add_data_by_query(self,item,base_list,set_docid,_query,confidence,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_save,document_tmp_status,document_tmp_product,document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
 
         list_dict = self.search_data_by_query(item,_query,confidence,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys,columns=columns)
         for _dict in list_dict:
@@ -1214,9 +1215,9 @@ class Dataflow():
         the_group = base_list
         the_group.sort(key=lambda x:x["confidence"],reverse=True)
         if len(the_group)>10:
-            keys = ["tenderee","win_tenderer","win_bid_price","doctitle_refine"]
+            keys = ["tenderee","win_tenderer","win_bid_price","bidding_budget","doctitle_refine"]
         else:
-            keys = ["tenderee","win_tenderer","win_bid_price"]
+            keys = ["tenderee","win_tenderer","win_bid_price","bidding_budget"]
 
 
         #置信度
@@ -1363,10 +1364,12 @@ class Dataflow():
 
             final_list = self.dumplicate_fianl_check(base_list)
             best_docid = self.get_best_docid(final_list)
+            # log(str(final_list))
 
             _d = {"partitionkey":item["partitionkey"],
                   "docid":item["docid"],
                   "status":random.randint(*flow_dumplicate_status_to),
+                  document_tmp_opertime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")
                   }
             dtmp = Document_tmp(_d)
 
@@ -1598,7 +1601,7 @@ class Dataflow():
 
     def start_flow_remove(self):
         schedule = BlockingScheduler()
-        schedule.add_job(self.flow_remove,"cron",hour="*/2")
+        schedule.add_job(self.flow_remove,"cron",hour="20")
         schedule.start()
 
 def download_attachment():

+ 232 - 59
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -9,6 +9,9 @@ from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
 import os
 from BaseDataMaintenance.common.ossUtils import *
 from BaseDataMaintenance.dataSource.pool import ConnectorPool
+from BaseDataMaintenance.model.ots.document import Document
+
+from BaseDataMaintenance.common.Utils import article_limit
 
 class ActiveMQListener():
 
@@ -38,7 +41,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
         self.mq_attachment = "/queue/dataflow_attachment"
         self.mq_attachment_failed = "/queue/dataflow_attachment_failed"
         self.mq_extract = "/queue/dataflow_extract"
-        self.comsumer_count = 50
+        self.comsumer_count = 80
         self.retry_comsumer_count = 10
         self.retry_times = 5
         for _i in range(self.comsumer_count):
@@ -46,6 +49,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
             createComsumer(listener_attachment,self.mq_attachment)
         self.attach_pool = ConnectorPool(10,30,getConnection_postgres)
         self.conn_mq = getConnect_activateMQ()
+        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
 
 
     def process_failed_attachment(self):
@@ -98,6 +102,9 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
             if not _not_failed:
                 return False,list_html,swf_urls
             return True,list_html,swf_urls
+
+        except requests.ConnectionError as e1:
+            raise e1
         except Exception as e:
             return False,list_html,swf_urls
 
@@ -135,9 +142,9 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                 item["retry_times"] = _retry_times+1
                 #失败次数大于5次就放入失败队列,此队列的数据会在空闲时间重新处理一次
                 if item["retry_times"]>=self.retry_times:
-                    send_msg_toacmq(self.conn_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment_failed)
+                    send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment_failed)
 
-                send_msg_toacmq(self.conn_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment)
+                send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment)
 
                 #失败保存
                 dtmp.setValue(document_tmp_dochtmlcon,"",False)
@@ -145,7 +152,8 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                 if not dtmp.exists_row(self.ots_client):
                     dtmp.update_row(self.ots_client)
                     dhtml.update_row(self.ots_client)
-                _to_ack = True
+                if send_succeed:
+                    _to_ack = True
 
             else:
                 try:
@@ -155,8 +163,9 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
                     dtmp.setValue(document_tmp_attachment_extract_status,1,True)
                     dtmp.setValue(document_tmp_dochtmlcon,dhtml.getProperties().get(document_tmp_dochtmlcon),True)
-                    send_msg_toacmq(self.conn_mq,json.dumps(dtmp.getProperties(),cls=MyEncoder),self.mq_extract)
-                    _to_ack = True
+                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(dtmp.getProperties(),cls=MyEncoder),self.mq_extract)
+                    if send_succeed:
+                        _to_ack = True
                 except Exception as e:
                     traceback.print_exc()
 
@@ -164,8 +173,8 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                 ackMsg(conn,message_id)
             log("document:%d get attachments with result:%s %s retry_times:%d"%(item.get("docid"),str(_succeed),str(_to_ack),_retry_times))
         except Exception as e:
-            send_msg_toacmq(self.conn_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment)
-            ackMsg(conn,message_id)
+            if send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment):
+                ackMsg(conn,message_id)
 
 
 
@@ -179,7 +188,10 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
         objectPath = attach.getProperties().get(attachment_path)
         docids = attach.getProperties().get(attachment_docids)
 
-        relative_path = objectPath[5:]
+        if objectPath is None:
+            relative_path = "%s/%s"%(_uuid.hex[:4],_uuid.hex)
+        else:
+            relative_path = objectPath[5:]
         localpath = "/FileInfo/%s"%(relative_path)
         if not os.path.exists(localpath):
             if not os.path.exists(os.path.dirname(localpath)):
@@ -203,12 +215,13 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                 log("md5:%s path:%s exists"%(filemd5,objectPath[5:]))
             if not (local_exists or download_succeed):
                 _ots_attach = attachment(attach.getProperties_ots())
-                _ots_exists = _ots_attach.fix_columns(self.ots_client,[attachment_attachmenthtml,attachment_attachmentcon,attachment_path,attachment_status],True)
+                _ots_exists = _ots_attach.fix_columns(self.ots_client,[attachment_attachmenthtml,attachment_attachmentcon,attachment_path,attachment_status,attachment_filetype],True)
                 log("md5:%s path:%s file not in local or oss,search ots.attachment"%(filemd5,objectPath[5:]))
                 if _ots_attach.getProperties().get(attachment_attachmenthtml,"")!="":
                     attach.setValue(attachment_attachmenthtml,_ots_attach.getProperties().get(attachment_attachmenthtml,""))
                     attach.setValue(attachment_attachmentcon,_ots_attach.getProperties().get(attachment_attachmentcon,""))
                     attach.setValue(attachment_status,_ots_attach.getProperties().get(attachment_status,""))
+                    attach.setValue(attachment_filetype,_ots_attach.getProperties().get(attachment_filetype,""))
                     if attach.exists(self.attach_pool):
                         attach.update_row(self.attach_pool)
                     else:
@@ -253,10 +266,14 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                     return True
 
                 time_download = time.time()-d_start_time
-                _data_base64 = base64.b64encode(open(localpath,"rb").read())
+
                 #调用接口处理结果
                 start_time = time.time()
-                _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype)
+                _filetype = attach.getProperties().get(attachment_filetype)
+
+                # _data_base64 = base64.b64encode(open(localpath,"rb").read())
+                # _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype)
+                _success,_html,swf_images = getAttachDealInterface(None,_filetype,path=localpath)
                 log("process filemd5:%s %s of type:%s with size:%.3fM download:%ds recognize takes %ds,ret_size:%d"%(filemd5,str(_success),_filetype,round(_size/1024/1024,4),time_download,time.time()-start_time,len(_html)))
                 if _success:
                     if len(_html)<5:
@@ -331,6 +348,9 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
             else:
                 return True
 
+        except requests.ConnectionError as e1:
+            raise e1
+
         except oss2.exceptions.NotFound as e:
             return True
 
@@ -386,6 +406,15 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                                    attachment_path:"%s/%s"%(_filemd5[:4],_path),
                                    attachment_crtime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
                         list_attachment.append(Attachment_postgres(_attach))
+                    else:
+                        log("getAttachments search in ots:%s"%(_filemd5))
+                        _attach = {attachment_filemd5:_filemd5}
+                        _attach_ots = attachment(_attach)
+                        _attach_ots.fix_columns(self.ots_client,[attachment_status,attachment_path,attachment_attachmenthtml,attachment_attachmentcon,attachment_filetype,attachment_swfUrls],True)
+                        if _attach_ots.getProperties().get(attachment_status) is not None:
+                            log("getAttachments find in ots:%s"%(_filemd5))
+                            list_attachment.append(Attachment_postgres(_attach_ots.getProperties()))
+
             return list_attachment
         except Exception as e:
             log("attachProcess comsumer error %s"%str(e))
@@ -490,24 +519,42 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
 
         self.industy_url = "http://127.0.0.1:15000/industry_extract"
 
-        self.extract_interfaces = ["http://127.0.0.1:15030/content_extract",
-                                   "http://192.168.0.115:15030/content_extract"
+        self.extract_interfaces = [["http://127.0.0.1:15030/content_extract",11],
+                                   ["http://192.168.0.115:15030/content_extract",10]
                                    ]
 
 
         self.mq_extract = "/queue/dataflow_extract"
 
+        whole_weight = 0
+        for _url,weight in self.extract_interfaces:
+            whole_weight+= weight
+        current_weight = 0
+        for _i in range(len(self.extract_interfaces)):
+            current_weight += self.extract_interfaces[_i][1]
+            self.extract_interfaces[_i][1] = current_weight/whole_weight
+
 
-        self.comsumer_count = 25
+
+        self.comsumer_count = 20
         for _i in range(self.comsumer_count):
             listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle)
             createComsumer(listener_extract,self.mq_extract)
         self.conn_mq = getConnect_activateMQ()
+        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
+
+    def getExtract_url(self):
+        _r = random.random()
+        for _i in range(len(self.extract_interfaces)):
+            if _r<=self.extract_interfaces[_i][1]:
+                return self.extract_interfaces[_i][0]
 
     def request_extract_interface(self,json,headers):
-        _i = random.randint(0,len(self.extract_interfaces)-1)
+        # _i = random.randint(0,len(self.extract_interfaces)-1)
         # _i = 0
-        resp = requests.post(self.extract_interfaces[_i],json=json,headers=headers)
+        # _url = self.extract_interfaces[_i]
+        _url = self.getExtract_url()
+        resp = requests.post(_url,json=json,headers=headers)
         return resp
 
 
@@ -541,6 +588,13 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
                                    "docid":item.get("docid")})
 
             _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
+
+            if len(_dochtmlcon)>200000:
+                _soup = BeautifulSoup(_dochtmlcon,"lxml")
+                _soup = article_limit(_soup,200000)
+                _dochtmlcon = str(_soup)
+
+
             dhtml.setValue(document_tmp_dochtmlcon,_dochtmlcon,True)
             _extract = Document_extract({})
             _extract.setValue(document_extract2_partitionkey,item.get(document_partitionkey))
@@ -578,7 +632,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
             try:
                 if all_done!=1:
                     sentMsgToDD("要素提取失败:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
-                    send_msg_toacmq(self.conn_mq,frame.body,self.mq_extract)
+                    send_succeed = send_msg_toacmq(self.pool_mq,frame.body,self.mq_extract)
 
 
                     #失败保存
@@ -587,7 +641,8 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
                     if not dtmp.exists_row(self.ots_client):
                         dtmp.update_row(self.ots_client)
                         dhtml.update_row(self.ots_client)
-                    _to_ack = True
+                    if send_succeed:
+                        _to_ack = True
                 else:
 
                     dtmp.setValue(document_tmp_dochtmlcon,"",False)
@@ -606,14 +661,13 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
         except Exception as e:
             traceback.print_exc()
             log("process %s docid: failed message_id:%s"%(data["doc_id"],message_id))
-            send_msg_toacmq(self.conn_mq,frame.body,self.mq_extract)
-            ackMsg(conn,message_id,subscription)
+            if send_msg_toacmq(self.pool_mq,frame.body,self.mq_extract):
+                ackMsg(conn,message_id,subscription)
 
 
     def start_flow_extract(self):
         schedule = BlockingScheduler()
         schedule.add_job(self.flow_extract_producer,"cron",second="*/20")
-        # schedule.add_job(self.flow_extract,"cron",second="*/10")
         schedule.start()
 
 from multiprocessing import RLock
@@ -665,6 +719,7 @@ class Dataflow_init(Dataflow):
             self.begin_docid = None
             self.mq_attachment = "/queue/dataflow_attachment"
             self.mq_extract = "/queue/dataflow_extract"
+            self.pool_mq1 = ConnectorPool(1,4,getConnect_activateMQ)
 
         def on_error(self, headers):
             log('received an error %s' % headers.body)
@@ -689,25 +744,38 @@ class Dataflow_init(Dataflow):
             body[document_tmp_partitionkey] = partitionkey
             body[document_tmp_docid] = next_docid
             page_attachments = body.get(document_tmp_attachment_path,"[]")
+            _uuid = body.get(document_tmp_uuid,"")
             if page_attachments!="[]":
                 status = random.randint(1,10)
                 body[document_tmp_status] = status
-                send_msg_toacmq(self.conn,json.dumps(body,cls=MyEncoder),self.mq_attachment)
-                ackMsg(self.conn,message_id)
+                if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_attachment):
+                    log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
+                    ackMsg(self.conn,message_id)
+                else:
+                    log("send_msg_error on init listener")
             else:
                 status = random.randint(11,50)
                 body[document_tmp_status] = status
-                send_msg_toacmq(self.conn,json.dumps(body,cls=MyEncoder),self.mq_extract)
-                ackMsg(self.conn,message_id)
+                if send_msg_toacmq(self.pool_mq1,json.dumps(body,cls=MyEncoder),self.mq_extract):
+                    log("uuid:%s with docid:%s"%(str(_uuid),str(next_docid)))
+                    ackMsg(self.conn,message_id)
+                else:
+                    log("send_msg_error on init listener")
 
         def __del__(self):
             self.conn.disconnect()
+            del self.pool_mq1
 
     def __init__(self):
         Dataflow.__init__(self)
         self.mq_init = "/queue/dataflow_init"
+
+        self.mq_attachment = "/queue/dataflow_attachment"
+        self.mq_extract = "/queue/dataflow_extract"
         self.pool_oracle = ConnectorPool(10,15,getConnection_oracle)
-        self.pool_mq = ConnectorPool(10,15,getConnect_activateMQ)
+        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
+
+        self.ots_capacity = getConnect_ots_capacity()
 
         self.init_comsumer_counts = 2
         for i in range(self.init_comsumer_counts):
@@ -717,24 +785,90 @@ class Dataflow_init(Dataflow):
 
 
     def temp2mq(self,object):
-        conn_mq = self.pool_mq.getConnector()
         conn_oracle = self.pool_oracle.getConnector()
 
         try:
             list_obj = object.select_rows(conn_oracle,type(object),object.table_name,[],limit=1000)
             for _obj in list_obj:
                 ots_dict = _obj.getProperties_ots()
-                send_msg_toacmq(conn_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_init)
 
-                # delete the data; enable when going live
-                _obj.delete_row(conn_oracle)
+                if len(ots_dict.get("dochtmlcon",""))>500000:
+                    _obj.delete_row(conn_oracle)
+                    log("msg too long:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
+                    continue
+
+                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_init):
+                    # delete the data; enable when going live
+                    _obj.delete_row(conn_oracle)
+                else:
+                    log("send_msg_error111:%s,%d"%(ots_dict.get("uuid"),len(ots_dict.get("dochtmlcon",""))))
 
         except Exception as e:
             traceback.print_exc()
         finally:
-            self.pool_mq.putConnector(conn_mq)
             self.pool_oracle.putConnector(conn_oracle)
 
+    def ots2mq(self):
+        try:
+            bool_query = BoolQuery(must_queries=[RangeQuery("status",1,51)])
+
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
+                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
+                                                                                ColumnsToGet(return_type=ColumnReturnType.ALL))
+            list_data = getRow_ots(rows)
+            for _data in list_data:
+                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
+                      document_tmp_docid:_data.get(document_tmp_docid),
+                      document_tmp_status:0}
+                _document = Document(_d)
+                page_attachments = _data.get(document_tmp_attachment_path,"[]")
+
+                _document_html = Document(_data)
+                _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True)
+
+                if page_attachments!="[]":
+                    status = random.randint(1,10)
+                    _data[document_tmp_status] = status
+                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
+                else:
+                    status = random.randint(11,50)
+                    _data[document_tmp_status] = status
+                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
+                if send_succeed:
+                    _document.update_row(self.ots_client)
+                else:
+                    log("send_msg_error2222")
+            while next_token:
+                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
+                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
+                                                                                    ColumnsToGet(return_type=ColumnReturnType.ALL))
+                list_data = getRow_ots(rows)
+                for _data in list_data:
+                    _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
+                          document_tmp_docid:_data.get(document_tmp_docid),
+                          document_tmp_status:0}
+                    _document = Document(_d)
+                    page_attachments = _data.get(document_tmp_attachment_path,"[]")
+
+                    _document_html = Document(_data)
+                    _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True)
+
+                    if page_attachments!="[]":
+                        status = random.randint(1,10)
+                        _data[document_tmp_status] = status
+                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
+                    else:
+                        status = random.randint(11,50)
+                        _data[document_tmp_status] = status
+                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
+                    if send_succeed:
+                        _document.update_row(self.ots_client)
+                    else:
+                        log("send_msg_error2222")
+        except Exception as e:
+            traceback.print_exc()
+
+
     def test_dump_docid(self):
         class TestDumpListener(ActiveMQListener):
             def on_message(self, headers):
@@ -792,6 +926,7 @@ class Dataflow_init(Dataflow):
         schedule.add_job(self.temp2mq,"cron",args=(TuDiKuangChanTemp({}),),second="*/10")
         schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoDaYiTemp({}),),second="*/10")
         schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoWenJianTemp({}),),second="*/10")
+        schedule.add_job(self.ots2mq,"cron",second="*/10")
         schedule.start()
 
 
@@ -886,13 +1021,17 @@ def del_test_doc():
             _html.delete_row(ots_client)
 
 def fixDoc_to_queue_extract():
+    pool_mq = ConnectorPool(10,20,getConnect_activateMQ)
     try:
         ots_client = getConnect_ots()
-        conn_mq = getConnect_activateMQ()
-        bool_query = BoolQuery(must_queries=[RangeQuery("status",range_to=66)])
+        ots_capacity = getConnect_ots_capacity()
+        bool_query = BoolQuery(must_queries=[
+            RangeQuery("crtime","2022-05-31"),
+            TermQuery("docchannel",114)
+        ])
 
         list_data = []
-        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
+        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                        SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
                                                                        columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
         print(total_count)
@@ -900,7 +1039,7 @@ def fixDoc_to_queue_extract():
         list_data.extend(list_row)
         _count = len(list_row)
         while next_token:
-            rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
+            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                            SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
 
@@ -912,39 +1051,51 @@ def fixDoc_to_queue_extract():
         for _row in list_data:
             if "all_columns" in _row:
                 _row.pop("all_columns")
-            _html = Document_html(_row)
+            _html = Document(_row)
             task_queue.put(_html)
         def _handle(item,result_queue):
             _html = item
-            _html.fix_columns(ots_client,["dochtmlcon"],True)
+            _html.fix_columns(ots_capacity,["dochtmlcon"],True)
             print(_html.getProperties().get(document_tmp_docid))
-            send_msg_toacmq(conn_mq,json.dumps(_html.getProperties()),"/queue/dataflow_extract")
+            send_msg_toacmq(pool_mq,json.dumps(_html.getProperties()),"/queue/dataflow_extract")
         mt = MultiThreadHandler(task_queue,_handle,None,30)
         mt.run()
     except Exception as e:
         traceback.print_exc()
+    finally:
+        pool_mq.destory()
 
 def check_data_synchronization():
-    filepath = "C:\\Users\\Administrator\\Desktop\\to_check.log"
-    list_uuid = []
-    _regrex = "ID='(?P<uuid>.+)'"
-    with open(filepath,"r",encoding="utf8") as f:
-        while 1:
-            _line = f.readline()
-            if not _line:
-                break
-            _match = re.search(_regrex,_line)
-            if _match is not None:
-                _uuid = _match.groupdict().get("uuid")
-                if _uuid is not None:
-                    list_uuid.append(_uuid)
-    print(len(list_uuid))
+    # filepath = "C:\\Users\\Administrator\\Desktop\\to_check.log"
+    # list_uuid = []
+    # _regrex = "delete\s+(?P<tablename>[^\s]+)\s+.*ID='(?P<uuid>.+)'"
+    # with open(filepath,"r",encoding="utf8") as f:
+    #     while 1:
+    #         _line = f.readline()
+    #         if not _line:
+    #             break
+    #         _match = re.search(_regrex,_line)
+    #         if _match is not None:
+    #             _uuid = _match.groupdict().get("uuid")
+    #             tablename = _match.groupdict.get("tablename")
+    #             if _uuid is not None:
+    #                 list_uuid.append({"uuid":_uuid,"tablename":tablename})
+    # print("total_count:",len(list_uuid))
+
+    import pandas as pd
+    from BaseDataMaintenance.common.Utils import load
+
+
     task_queue = Queue()
     list_data = []
-    for _uuid in list_uuid:
-        _dict = {"uuid":_uuid}
+    df_data = load("uuid.pk")
+    # df_data = pd.read_excel("check.xlsx")
+    for uuid,tablename in zip(df_data["uuid"],df_data["tablename"]):
+        _dict = {"uuid":uuid,
+                 "tablename":tablename}
         list_data.append(_dict)
         task_queue.put(_dict)
+    print("qsize:",task_queue.qsize())
     ots_client = getConnect_ots()
     def _handle(_item,result_queue):
         bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
@@ -957,14 +1108,35 @@ def check_data_synchronization():
     mt = MultiThreadHandler(task_queue,_handle,None,30)
     mt.run()
     df_data = {"uuid":[],
+               "tablename":[],
                "exists":[]}
     for _data in list_data:
-        for k,v in df_data.items():
-            v.append(_data.get(k))
+        if _data["exists"]==0:
+            for k,v in df_data.items():
+                v.append(_data.get(k))
     import pandas as pd
     df2 = pd.DataFrame(df_data)
-    df2.to_excel("check.xlsx")
+    df2.to_excel("check1.xlsx")
+
+current_path = os.path.abspath(os.path.dirname(__file__))
 
+def fixDoc_to_queue_init(filename=""):
+    import pandas as pd
+    from BaseDataMaintenance.model.oracle.GongGaoTemp import dict_oracle2ots
+    if filename=="":
+        filename = os.path.join(current_path,"check.xlsx")
+    df = pd.read_excel(filename)
+    dict_oracle2ots.pop("docchannel")
+    row_name = ",".join(list(dict_oracle2ots.keys()))
+    conn = getConnection_oracle()
+    cursor = conn.cursor()
+    for uuid,tablename,_exists in zip(df["uuid"],df["tablename"],df["exists"]):
+        if _exists==0:
+            _source = str(tablename).replace("_TEMP","")
+            sql = " insert into %s(%s) select %s from %s where id='%s' "%(tablename,row_name,row_name,_source,uuid)
+            cursor.execute(sql)
+            log(sql)
+    conn.commit()
 
 if __name__ == '__main__':
     # di = Dataflow_init()
@@ -974,4 +1146,5 @@ if __name__ == '__main__':
     # de = Dataflow_ActivteMQ_extract()
     # de.start_flow_extract()
     # fixDoc_to_queue_extract()
-    check_data_synchronization()
+    # check_data_synchronization()
+    fixDoc_to_queue_init()
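
The extract flow now spreads requests across endpoints by weight; a hedged standalone sketch of the cumulative-weight selection used by getExtract_url (same endpoints and weights as in the diff):

import random

extract_interfaces = [["http://127.0.0.1:15030/content_extract", 11],
                      ["http://192.168.0.115:15030/content_extract", 10]]

# turn the weights into a cumulative distribution: [11/21, 21/21]
whole_weight = sum(weight for _url, weight in extract_interfaces)
current_weight = 0
for _i in range(len(extract_interfaces)):
    current_weight += extract_interfaces[_i][1]
    extract_interfaces[_i][1] = current_weight / whole_weight

def getExtract_url():
    _r = random.random()
    for _url, cumulative in extract_interfaces:
        if _r <= cumulative:
            return _url

print(getExtract_url())   # ~52% localhost, ~48% 192.168.0.115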

+ 32 - 18
BaseDataMaintenance/maintenance/preproject/fillColumns.py

@@ -77,33 +77,47 @@ def start_fill_preproject():
     preprojectFill.schedule()
 
 def delete_wrong_data():
+    import pandas as pd
     list_data = []
     task_queue = Queue()
     ots_client = getConnect_ots()
-    q1 = BoolQuery(must_queries=[
-        TermQuery("type",2)
-    ])
-    rows,next_token,total_count,is_all_succeed = ots_client.search("preproject","preproject_index",
-                                                                        SearchQuery(q1,sort=Sort(sorters=[FieldSort("uuid")]),get_total_count=True,limit=100),
-                                                                        ColumnsToGet(return_type=ColumnReturnType.NONE))
-    dict_rows = getRow_ots(rows)
-    list_data.extend(dict_rows)
-    _count = len(dict_rows)
-    while next_token:
-        rows,next_token,total_count,is_all_succeed = ots_client.search("preproject","preproject_index",
-                                                                       SearchQuery(q1,next_token=next_token,get_total_count=True,limit=100),
-                                                                       ColumnsToGet(return_type=ColumnReturnType.NONE))
-        dict_rows = getRow_ots(rows)
-        list_data.extend(dict_rows)
-        _count += len(dict_rows)
+    # q1 = BoolQuery(must_queries=[
+    #     TermQuery("type",2),
+    #     RangeQuery("crtime",'2022-05-24')
+    # ])
+    # rows,next_token,total_count,is_all_succeed = ots_client.search("preproject","preproject_index",
+    #                                                                     SearchQuery(q1,sort=Sort(sorters=[FieldSort("uuid")]),get_total_count=True,limit=100),
+    #                                                                     ColumnsToGet(return_type=ColumnReturnType.NONE))
+    # dict_rows = getRow_ots(rows)
+    # list_data.extend(dict_rows)
+    # _count = len(dict_rows)
+    # while next_token:
+    #     rows,next_token,total_count,is_all_succeed = ots_client.search("preproject","preproject_index",
+    #                                                                    SearchQuery(q1,next_token=next_token,get_total_count=True,limit=100),
+    #                                                                    ColumnsToGet(return_type=ColumnReturnType.NONE))
+    #     dict_rows = getRow_ots(rows)
+    #     list_data.extend(dict_rows)
+    #     _count += len(dict_rows)
+    #     print("%d/%d"%(_count,total_count))
+    #     # if _count>10000:
+    #     #     break
+    df = pd.read_csv("fa45de36-6e47-4817-b2d6-e79ea8c154601.csv")
+    for tenderee,product,may_begin,may_end in zip(df["tenderee"],df["product"],df["may_begin"],df["may_end"]):
+        _data = {"tenderee":tenderee,
+                 "product":product,
+                 "may_begin":str(may_begin),
+                 "may_end":str(may_end)}
+        list_data.append(_data)
     for _data in list_data:
         may_begin = _data.get("may_begin")
         may_end = _data.get("may_end")
-        if len(may_begin)!=10 or len(may_end)!=10:
-            task_queue.put(_data)
+        # if len(may_begin)!=10 or len(may_end)!=10:
+        #     task_queue.put(_data)
+        task_queue.put(_data)
     def _handle(item,result_queue):
         _preproject = Preproject(item)
         _preproject.delete_row(ots_client)
+    log("====%d"%task_queue.qsize())
     mt = MultiThreadHandler(task_queue,_handle,None,30)
     mt.run()
 

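About the hunk above: delete_wrong_data now takes its delete targets from an exported CSV instead of paging the preproject index, queues every row, and deletes with 30 worker threads. Below is a minimal, self-contained sketch of that CSV -> queue -> threaded-delete pattern using only the standard library; the column names come from the hunk, while delete_row() is a hypothetical stand-in for Preproject(item).delete_row(ots_client).

import csv
import threading
from queue import Queue, Empty

def delete_row(item):
    # hypothetical stand-in for Preproject(item).delete_row(ots_client)
    print("deleting", item)

def delete_from_csv(path, worker_count=30):
    # load every exported row into the queue, keyed by the preproject primary key
    task_queue = Queue()
    with open(path, newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            task_queue.put({
                "tenderee": row["tenderee"],
                "product": row["product"],
                "may_begin": str(row["may_begin"]),
                "may_end": str(row["may_end"]),
            })
    print("====%d" % task_queue.qsize())

    def worker():
        # drain the queue until empty, deleting one row per item
        while True:
            try:
                item = task_queue.get_nowait()
            except Empty:
                return
            try:
                delete_row(item)
            finally:
                task_queue.task_done()

    threads = [threading.Thread(target=worker) for _ in range(worker_count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
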
+ 50 - 0
BaseDataMaintenance/maintenance/preproject/remove_dump.py

@@ -0,0 +1,50 @@
+
+
+from queue import Queue
+from BaseDataMaintenance.dataSource.source import getConnect_ots
+from tablestore import *
+from BaseDataMaintenance.model.ots.Preproject import *
+from BaseDataMaintenance.model.ots.Preproject_dump import *
+from BaseDataMaintenance.common.Utils import *
+from BaseDataMaintenance.common.multiThread import MultiThreadHandler
+from apscheduler.schedulers.blocking import BlockingScheduler
+
+def drop_dump_data():
+
+    def drop_data(item,result_queue):
+        preproject_dump = Preproject_dump(item)
+        preproject = Preproject(item)
+        if preproject.delete_row(ots_client):
+            preproject_dump.delete_row(ots_client)
+
+
+    task_queue = Queue()
+    ots_client = getConnect_ots()
+    bool_query = BoolQuery(must_queries=[ExistsQuery(preproject_tenderee)])
+    rows,next_token,total_count,is_all_succeed = ots_client.search("preproject_dump","preproject_dump_index",
+                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(preproject_tenderee)]),get_total_count=True,limit=100),
+                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
+    list_data = getRow_ots(rows)
+    log("to drop preproject dump data:%d"%total_count)
+    for _data in list_data:
+        task_queue.put(_data)
+    mt = MultiThreadHandler(task_queue,drop_data,None,30)
+    mt.run()
+    while next_token:
+        rows,next_token,total_count,is_all_succeed = ots_client.search("preproject_dump","preproject_dump_index",
+                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
+                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
+        list_data = getRow_ots(rows)
+        for _data in list_data:
+            task_queue.put(_data)
+        mt.run()
+
+def start_drop_preproject_dump():
+
+    scheduler = BlockingScheduler()
+    scheduler.add_job(drop_dump_data,"cron",minute="*/1")
+    scheduler.start()
+
+if __name__ == '__main__':
+    # drop_dump_data()
+    start_drop_preproject_dump()

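remove_dump.py above walks the whole preproject_dump index with the next_token cursor and re-runs the thread pool for every page. A hedged sketch of that pagination step, written as a generator so paging and per-row handling stay separate; the search() calls mirror the ones in the file, and getRow_ots/getConnect_ots are assumed to live in the project's common helpers as wildcard-imported there.

from tablestore import BoolQuery, ExistsQuery, SearchQuery, Sort, FieldSort, ColumnsToGet, ColumnReturnType
from BaseDataMaintenance.dataSource.source import getConnect_ots
from BaseDataMaintenance.common.Utils import getRow_ots  # assumed location of the row helper

def iter_index_rows(ots_client, table="preproject_dump", index="preproject_dump_index",
                    sort_column="tenderee", page_size=100):
    # first page carries the sort; continuation pages only pass next_token
    bool_query = BoolQuery(must_queries=[ExistsQuery(sort_column)])
    rows, next_token, total_count, _ = ots_client.search(
        table, index,
        SearchQuery(bool_query, sort=Sort(sorters=[FieldSort(sort_column)]),
                    get_total_count=True, limit=page_size),
        columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
    yield from getRow_ots(rows)
    while next_token:
        rows, next_token, total_count, _ = ots_client.search(
            table, index,
            SearchQuery(bool_query, next_token=next_token,
                        get_total_count=True, limit=page_size),
            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        yield from getRow_ots(rows)

With a generator like this, the paging loop and the per-row delete can be exercised and tested separately; feeding a single queue from it (instead of calling mt.run() once per page) would also work, provided the handler keeps consuming while rows are still being enqueued.
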
+ 50 - 0
BaseDataMaintenance/model/ots/Preproject_dump.py

@@ -0,0 +1,50 @@
+from BaseDataMaintenance.model.ots.BaseModel import BaseModel
+
+preproject_tenderee = "tenderee"
+preproject_product = "product"
+preproject_may_begin = "may_begin" # predicted start time
+preproject_may_end = "may_end" # predicted end time
+preproject_crtime = "crtime" # creation time
+preproject_type = "type" # "0 - periodic project, 1 - procurement intention"
+preproject_prob = "prob" # probability
+preproject_avg_period = "avg_period" # average period
+preproject_min_period = "min_period" # minimum period
+preproject_max_period = "max_period" # maximum period
+preproject_periods = "periods" # number of periods
+preproject_json_docids = "json_docids" # historical projects, format ["1,2,3","4,5,6"]; each string groups the docids of one project
+preproject_province = "province"
+preproject_city = "city"
+preproject_district = "district"
+preproject_last_project_name = "last_project_name"
+preproject_project_name = "project_name" # name of this project
+preproject_last_page_time = "last_page_time"
+preproject_last_bidding_budget = "last_bidding_budget" # previous bidding budget
+preproject_bidding_budget = "bidding_budget" # budget for this project
+preproject_last_bidway = "last_bidway"
+preproject_last_win_tenderer = "last_win_tenderer" # previous winning bidder
+preproject_last_win_bid_price = "last_win_bid_price" # previous winning bid price
+preproject_last_agency = "last_agency" # previous agency
+preproject_demand = "demand" # demand for this project
+preproject_last_tenderee_contact = "last_tenderee_contact" # previous tenderee contact person
+preproject_last_tenderee_phone = "last_tenderee_phone"
+preproject_uuid = "uuid"
+preproject_has_bidfile="has_bidfile"
+
+
+
+class Preproject_dump(BaseModel):
+
+    def __init__(self,_dict):
+
+        for k,v in _dict.items():
+            self.setValue(k,v,True)
+        self.table_name = "preproject_dump"
+
+    def getPrimary_keys(self):
+        return [preproject_tenderee,preproject_product,preproject_may_begin,preproject_may_end]
+
+
+
+
+if __name__=="__main__":
+    pass

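Preproject_dump shares its primary key (tenderee, product, may_begin, may_end) with Preproject, which is what lets drop_dump_data build both models from the same row dict. A minimal usage sketch along the lines of that function; imports mirror remove_dump.py and the key values are made up for illustration.

from BaseDataMaintenance.dataSource.source import getConnect_ots
from BaseDataMaintenance.model.ots.Preproject import Preproject
from BaseDataMaintenance.model.ots.Preproject_dump import Preproject_dump

ots_client = getConnect_ots()
# illustrative primary-key values; real rows come from the preproject_dump index
_key = {"tenderee": "example tenderee", "product": "example product",
        "may_begin": "2022-06-01", "may_end": "2022-06-30"}
# delete the live preproject row first, then its dump record, as in drop_data above
if Preproject(_key).delete_row(ots_client):
    Preproject_dump(_key).delete_row(ots_client)
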
+ 12 - 4
BaseDataMaintenance/model/ots/document.py

@@ -22,6 +22,10 @@ document_fingerprint = "fingerprint"
 document_opertime = "opertime"
 document_docchannel = "docchannel"
 document_original_docchannel = "original_docchannel"
+document_area = "area"
+document_province = "province"
+document_city = "city"
+document_district = "district"
 
 class Document(BaseModel):
 
@@ -231,7 +235,7 @@ def turn_document_status():
 
         bool_query = BoolQuery(
             must_queries=[
-                WildcardQuery("web_source_no","DX004354*"),
+                MatchPhraseQuery("doctitle","珠海城市职业技术学院2022年05月至2022年06月政府采购意向"),
                 # BoolQuery(should_queries=[
                 #                           # TermQuery("tenderee","山西利民工业有限责任公司"),
                 #                           # MatchPhraseQuery("doctitle","中国电信"),
@@ -249,7 +253,7 @@ def turn_document_status():
 
         rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                        SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
-                                                                       columns_to_get=ColumnsToGet(["original_docchannel"],return_type=ColumnReturnType.SPECIFIED))
+                                                                       columns_to_get=ColumnsToGet([document_area],return_type=ColumnReturnType.SPECIFIED))
         list_data = getRow_ots(rows)
         print(total_count)
         _count = len(list_data)
@@ -259,7 +263,7 @@ def turn_document_status():
         while next_token:
             rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                            SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
-                                                                           columns_to_get=ColumnsToGet(["original_docchannel"],return_type=ColumnReturnType.SPECIFIED))
+                                                                           columns_to_get=ColumnsToGet([document_area],return_type=ColumnReturnType.SPECIFIED))
             list_data = getRow_ots(rows)
             _count += len(list_data)
             print("%d/%d"%(_count,total_count))
@@ -302,8 +306,12 @@ def turn_document_status():
         # item.all_columns.remove(document_dochtmlcon)
 
         #change status
-        item.setValue(document_docchannel,item.getProperties().get(document_original_docchannel),True)
+        # item.setValue(document_docchannel,item.getProperties().get(document_original_docchannel),True)
         # item.setValue(document_status,random.randint(151,171),True)
+        item.setValue(document_area,"华南",True)
+        item.setValue(document_province,"广东",True)
+        item.setValue(document_city,"珠海",True)
+        item.setValue(document_district,"金湾区",True)
         item.update_row(ots_client)
         log("update %d status done"%(item.getProperties().get(document_docid)))
         pass

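The document.py hunk above repurposes turn_document_status as a one-off region backfill: the query now matches a single notice by title, only document_area is fetched, and the handler hard-codes area/province/city/district before update_row. If the same backfill is needed again, the per-row step could be pulled into a small parameterised helper; this is only a sketch under that assumption, not the project's actual method.

from BaseDataMaintenance.model.ots.document import (
    document_area, document_province, document_city, document_district)

def set_region(item, ots_client, area, province, city, district):
    # item is a Document built from a row returned by getRow_ots, as in _handle above
    item.setValue(document_area, area, True)
    item.setValue(document_province, province, True)
    item.setValue(document_city, city, True)
    item.setValue(document_district, district, True)
    item.update_row(ots_client)

Calling set_region(item, ots_client, "华南", "广东", "珠海", "金湾区") reproduces the hard-coded values in the hunk.
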
+ 1 - 0
BaseDataMaintenance/model/ots/document_tmp.py

@@ -13,6 +13,7 @@ document_tmp_attachment_path = "page_attachments"
 document_tmp_attachment_path_filemd5 = "fileMd5"
 document_tmp_attachment_path_fileTitle = "fileTitle"
 document_tmp_attachment_path_fileLink = "fileLink"
+document_tmp_uuid = "uuid"
 document_tmp_crtime = "crtime"
 document_tmp_status = "status"
 document_tmp_tenderee = "tenderee"

+ 9 - 0
BaseDataMaintenance/start_drop_preproject_dump.py

@@ -0,0 +1,9 @@
+import sys
+import os
+sys.path.append(os.path.dirname(__file__)+"/..")
+
+from BaseDataMaintenance.maintenance.preproject.remove_dump import start_drop_preproject_dump
+
+
+if __name__ == '__main__':
+    start_drop_preproject_dump()