luojiehua 4 лет назад
Родитель
Commit
7b9d284cd0

+ 27 - 0
BaseDataMaintenance/common/Utils.py

@@ -16,6 +16,7 @@ from pai_tf_predict_proto import tf_predict_pb2
 import requests
 
 import time
+from bs4 import BeautifulSoup
 
 
 model_w2v = None
@@ -356,6 +357,32 @@ def popNoneFromDict(_dict):
         _dict.pop(k)
     return _dict
 
+
+pattern_attachment = re.compile("\.(?P<attachment>jpg|jpeg|png|swf|tif|pdf|doc|docx|xls|xlsx|zip|rar|tar|7z|wim)$")
+def getAttachmentTypeFromUrl(url):
+    _match = re.search(pattern_attachment,url)
+    if _match is not None:
+        return _match.groupdict().get("attachment")
+    return None
+
+def getAttachmentUrls(sourceHtml):
+    list_urls = []
+    _soup = BeautifulSoup(sourceHtml,"lxml")
+    set_types = set()
+    list_a = _soup.find_all("a")
+    for _a in list_a:
+        _url = _a.attrs.get("href","")
+        _type = getAttachmentTypeFromUrl(_url)
+        if _type is not None:
+            list_urls.append({"url":_url,"type":_type})
+    list_img = _soup.find_all("img")
+    for _img in list_img:
+        _url = _img.attrs.get("src","")
+        _type = getAttachmentTypeFromUrl(_url)
+        if _type is not None:
+            list_urls.append({"url":_url,"type":_type})
+    return list_urls
+
 def getCurrent_date(format="%Y-%m-%d %H:%M:%S"):
     _time = time.strftime(format,time.localtime())
     return _time

+ 12 - 0
BaseDataMaintenance/dataSource/download.py

@@ -0,0 +1,12 @@
+
+import requests
+
+def download(_url):
+    try:
+        _res = requests.get(_url,stream=True,timeout=100)
+        if _res.status_code==200:
+            return True,_res.content
+        else:
+            return False,""
+    except Exception as e:
+        return False,""

+ 16 - 0
BaseDataMaintenance/dataSource/interface.py

@@ -0,0 +1,16 @@
+
+import requests
+import json
+
+interface_url = ""
+DEFAULT_TIMEOUT = 300
+
+def getAttachDealInterface(_data,_type):
+    _json = {"data":_data,
+            "type":_type}
+    headers = {"Content-Type":"application/json"}
+    _resp = requests.post(interface_url,json=_json,headers=headers,timeout=DEFAULT_TIMEOUT)
+    if _resp.status_code==200:
+        _result = json.loads(_resp.content)
+        return True,_result["html"]
+    return False,""

+ 1 - 1
BaseDataMaintenance/dataSource/source.py

@@ -67,7 +67,7 @@ def getConnect_neo4j():
 
 import platform
 
-if platform.system()=="windows":
+if platform.system()=="Windows":
     OTS_URL = "https://bxkc-ots.cn-hangzhou.ots.aliyuncs.com"
 else:
     OTS_URL = "https://bxkc-ots.cn-hangzhou.vpc.tablestore.aliyuncs.com"

+ 259 - 0
BaseDataMaintenance/maintenance/attachment/DataSynchronization.py

@@ -0,0 +1,259 @@
+#encoding:UTF8
+from BaseDataMaintenance.dataSource.pool import ConnectorPool
+from BaseDataMaintenance.dataSource.source import *
+from BaseDataMaintenance.common.Utils import *
+import queue
+from tablestore import *
+from multiprocessing import RLock
+from threading import Thread
+from apscheduler.schedulers.blocking import BlockingScheduler
+from BaseDataMaintenance.common.multiThread import MultiThreadHandler
+from BaseDataMaintenance.model.mysql.attach_document_richtext import attach_document_richtext
+from BaseDataMaintenance.model.mysql.BaseModel import BaseModel
+from BaseDataMaintenance.model.ots.document import document
+import traceback
+from BaseDataMaintenance.dataSource.download import download
+import base64
+from BaseDataMaintenance.dataSource.interface import getAttachDealInterface
+
+STATUS_TODEAL = 10
+STATUS_DEALING = 20
+STATUS_DONE = 30
+STATUS_FAILED = 40
+MAX_DEAL_COUNT = 5
+
+class Data_toDeal_Synchronization():
+
+    def __init__(self):
+        self.done_lock = RLock()
+        self.isDone = False
+        self.document_table = "document"
+        self.document_table_index = "document_index"
+        self.pool_ots = ConnectorPool(init_num=10,max_num=40,method_init=getConnect_ots)
+        self.pool_mysql = ConnectorPool(init_num=10,max_num=40,method_init=getConnection_mysql)
+
+    def producer(self,task_queue):
+        '''
+        :return:生产数据
+        '''
+        ots_client = self.pool_ots.getConnector()
+        try:
+            #获取最新的crtime
+            conn_mysql = getConnection_mysql()
+            cursor = conn_mysql.cursor()
+            sql = "select max(create_time) from attach_document_richtext "
+            cursor.execute(sql)
+            rows = cursor.fechall()
+            max_crtime = ""
+            if len(rows)>0:
+                max_crtime = rows[0][0]
+            conn_mysql.close()
+
+            bool_query = BoolQuery(must_queries=[RangeQuery("attachmentTypes","",include_lower=False),
+                                                 RangeQuery("crtime",max_crtime)])
+
+            columns = ["partitionkey","docid","crtime"]
+            rows, next_token, total_count, is_all_succeed = ots_client.search(self.document_table, self.document_table_index,
+                                                                              SearchQuery(bool_query ,sort=Sort(sorters=[FieldSort("crtime",SortOrder.ASC)]), limit=100, get_total_count=True),
+                                                                              ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+            list_data = getRow_ots(rows)
+            for _data in list_data:
+                _document = document(_data)
+                task_queue.put(_document,True)
+            while next_token:
+                rows, next_token, total_count, is_all_succeed = ots_client.search(self.document_table, self.document_table_index,
+                                                                                  SearchQuery(bool_query ,next_token=next_token, limit=100, get_total_count=True),
+                                                                                  ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+                list_data = getRow_ots(rows)
+                for _data in list_data:
+                    _document = document(_data)
+                    task_queue.put(_document,True)
+        except Exception as e:
+            pass
+        self.pool_ots.putConnector(ots_client)
+
+    def comsumer(self,task_queue):
+
+        def _handle(_document,result_queue,pool_mysql):
+            conn_mysql = pool_mysql.getConnector()
+            cursor = conn_mysql.cursor()
+            #插入mysql
+            _attach_document = attach_document_richtext({"docid":_document.docid,
+                                                         "create_time":_document.crtime,
+                                                         "insert_time":getCurrent_date(),
+                                                         "type":0})
+            _attach_document.insert_row(conn_mysql)
+            pool_mysql.putConnector(conn_mysql)
+
+
+        result_queue = queue.Queue()
+
+        mt = MultiThreadHandler(task_queue,_handle,result_queue,thread_count=30,pool_mysql=self.pool_mysql)
+        mt.run()
+
+
+    def waitTask(self,task_queue):
+        for i in range(60):
+            if task_queue.qsize()>0:
+                return True
+            else:
+                time.sleep(1)
+        return False
+
+    def maxcompute2ots(self):
+
+        task_queue = queue.Queue(maxsize=10000)
+
+        thread_producer = Thread(target=self.producer,args=([task_queue]))
+        thread_producer.start()
+
+        if self.waitTask(task_queue):
+            thread_comsumer = Thread(target=self.comsumer,args=([task_queue]))
+            thread_comsumer.start()
+
+
+    def scheduler(self):
+        _scheduler = BlockingScheduler()
+        _scheduler.add_job(self.maxcompute2ots,"cron",minute="*/10")
+        _scheduler.start()
+
+class Data_Dealing_Synchronization():
+
+    def __init__(self):
+        self.done_lock = RLock()
+        self.isDone = False
+        self.document_table = "document"
+        self.document_table_index = "document_index"
+        self.pool_ots = ConnectorPool(init_num=10,max_num=40,method_init=getConnect_ots)
+        self.pool_mysql = ConnectorPool(init_num=10,max_num=40,method_init=getConnection_mysql)
+        self.deal_type = 0#实时处理
+
+    def producer(self,task_queue):
+        '''
+        :return:生产数据
+        '''
+        conn = self.pool_mysql.getConnector()
+        list_attachDocument = BaseModel.select_rows(conn,attach_document_richtext,"attach_document_richtext",[("status",10),("type",0)],2000)
+        for _ad in list_attachDocument:
+            task_queue.put(_ad,True)
+
+    def comsumer(self,task_queue):
+
+        def _handle(_ad,result_queue,pool_ots,pool_mysql):
+            ots_client = pool_ots.getConnector()
+            conn = pool_mysql.getConnector()
+
+            try:
+                docid = _ad.docid
+                _ad.status = STATUS_DEALING
+                if _ad.getProperties().get("deal_count",0)>=MAX_DEAL_COUNT:
+                    _ad.status = STATUS_FAILED
+                    _ad.setValue("message",str(_ad.getProperties().get("message",""))+" 超过处理次数 ")
+                    _ad.update_row(conn)
+                else:
+                    _ad.setValue("deal_count",_ad.getProperties().get("deal_count",0)+1)
+                    if _ad.update_row(conn,[("status",STATUS_TODEAL)])==1:
+                        partitionkey = int(docid)%500+1
+                        #取出document的html,获得所有链接
+                        _dict = document.search(ots_client,"document",[("partitionkey",partitionkey),("docid",int(docid))],["dochtmlcon"])
+                        if _dict is not None:
+                            _document = document(_dict)
+                            _dochtmlcon = _document.getProperties().get("dochtmlcon","")
+
+                            list_url = getAttachmentUrls()
+                            list_html = []
+                            #对每个链接的内容进行下载
+                            message = ""
+
+                            if len(list_url)==0:
+                                message += " 无链接 "
+
+                            download_flag = False
+                            deal_flag = False
+                            for _url,_type in list_url:
+                                if not _document.isLegalUrl(_url,self.deal_type):
+                                    continue
+                                _success,_data = download(_url)
+                                if not _success:
+                                    message += " 链接%s下载失败 "%_url
+                                else:
+                                    download_flag = True
+                                    _data_base64 = base64.b64encode(_data)
+                                    #调用接口处理结果
+                                    _success,_html = getAttachDealInterface(_data_base64,_type)
+                                    if _success:
+                                        deal_flag = True
+                                        list_html.append(_html)
+                                    else:
+                                        message += " 链接%s处理失败 "%_url
+                            _attach_status = STATUS_DONE
+                            if download_flag and deal_flag:
+                                #更新document的html
+                                _document.updateAttachment(list_html)
+                                _document.update_row(ots_client)
+                            else:
+                                _attach_status = STATUS_FAILED
+
+                            _ad.setValue("message",message)
+                            _ad.setValue("status",_attach_status)
+                            _ad.setValue("update_time",getCurrent_date())
+                            _ad.update_row(conn)
+                        else:
+                            _ad.setValue("message"," 公告不存在 ")
+                            _ad.setValue("status",STATUS_FAILED)
+                            _ad.setValue("update_time",getCurrent_date())
+                            _ad.update_row(conn)
+
+            except Exception as e:
+                log("comsumer failed cause of %s"%(str(e)))
+                log(traceback.format_exc())
+                _ad.status = STATUS_TODEAL
+                _ad.update_row(conn)
+
+            pool_ots.putConnector(ots_client)
+            pool_mysql.putConnector(conn)
+
+
+        result_queue = queue.Queue()
+
+        mt = MultiThreadHandler(task_queue,_handle,result_queue,thread_count=30,pool_ots=self.pool_ots,pool_mysql=self.pool_mysql)
+        mt.run()
+
+
+    def waitTask(self,task_queue):
+        for i in range(60):
+            if task_queue.qsize()>0:
+                return True
+            else:
+                time.sleep(1)
+        return False
+
+    def maxcompute2ots(self):
+
+        task_queue = queue.Queue(maxsize=10000)
+
+        thread_producer = Thread(target=self.producer,args=([task_queue]))
+        thread_producer.start()
+
+        if self.waitTask(task_queue):
+            thread_comsumer = Thread(target=self.comsumer,args=([task_queue]))
+            thread_comsumer.start()
+
+
+    def scheduler(self):
+        _scheduler = BlockingScheduler()
+        _scheduler.add_job(self.maxcompute2ots,"cron",minute="*/10")
+        _scheduler.start()
+
+def startSychro_toDeal():
+    ds_toDeal = Data_toDeal_Synchronization()
+    ds_toDeal.scheduler()
+
+def startSychro_Dealing():
+    ds_Dealing = Data_Dealing_Synchronization()
+    ds_Dealing.scheduler()
+
+if __name__=="__main__":
+    pass
+
+

+ 0 - 0
BaseDataMaintenance/maintenance/attachment/__init__.py


+ 6 - 0
BaseDataMaintenance/maintenance/attachment/readme.md

@@ -0,0 +1,6 @@
+附件识别说明
+
+1.定时器查询数据导入到document_attach_rec_log表中
+
+2.定时器从document_attach_rec_log表中查询数据识别附件并修改状态
+

+ 51 - 0
BaseDataMaintenance/maintenance/document/DataSynchronization.py

@@ -0,0 +1,51 @@
+
+
+from tablestore import *
+from apscheduler.schedulers.blocking import BlockingScheduler
+from BaseDataMaintenance.dataSource.source import getConnect_ots
+from BaseDataMaintenance.common.Utils import *
+from BaseDataMaintenance.model.ots.document import document
+import random
+
+
+class TurnHandInputDocument():
+
+
+    def __init__(self):
+        self.document_table = "document"
+        self.document_table_index = "document_index"
+        self.ots_client = getConnect_ots()
+
+
+
+    def turn_status(self):
+
+
+
+        bool_query = BoolQuery(must_queries=[RangeQuery("status",101,120,include_lower=True,include_upper=True),
+                                             TermQuery("web_source_no",'000000')])
+
+        columns = ["partitionkey","docid","status"]
+        rows, next_token, total_count, is_all_succeed = self.ots_client.search(self.document_table, self.document_table_index,
+                                                                          SearchQuery(bool_query ,sort=Sort(sorters=[FieldSort("docid",SortOrder.ASC)]), limit=100, get_total_count=True),
+                                                                          ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+        list_data = getRow_ots(rows)
+
+        log("processed number %d"%(len(list_data)))
+        for _data in list_data:
+            _document = document(_data)
+            new_status = random.randint(201,300)
+            _document.setValue("status",new_status,True)
+            _document.update_row(self.ots_client)
+
+    def scheduler(self):
+        _scheduler = BlockingScheduler()
+        _scheduler.add_job(self.turn_status,"cron",second="*/30")
+        _scheduler.start()
+
+def startSychro_handInput():
+    turnStatus = TurnHandInputDocument()
+    turnStatus.scheduler()
+
+if __name__=="__main__":
+    startSychro_handInput()

+ 0 - 0
BaseDataMaintenance/maintenance/document/__init__.py


+ 125 - 0
BaseDataMaintenance/model/mysql/BaseModel.py

@@ -0,0 +1,125 @@
+
+
+class BaseModel():
+
+    def __init__(self):
+        raise NotImplementedError()
+
+    def getPrimary_keys(self):
+        raise NotImplementedError()
+
+    def getProperties(self):
+        return self.__dict__
+
+    def setValue(self,k,v,isColumn=False):
+        if "all_columns" not in self.__dict__:
+            self.all_columns = []
+        self.__dict__[k] = v
+        if isColumn:
+            if k not in (set(self.all_columns)):
+                self.all_columns.append(k)
+
+    def insert_row(self,conn):
+        cursor = conn.cursor()
+        sql = "insert into %s"%(self.table_name)
+        s_columns = "("
+        s_values = "values("
+        _set_columns = set(self.all_columns)
+        for k,v in self.__dict__.items():
+            if k in _set_columns:
+                if v is not None and str(v)!="":
+                    s_columns += "%s,"%k
+                    if isinstance(v,str):
+                        s_values += "'%s',"%v
+                    else:
+                        s_values += "%d,"%v
+        s_columns = "%s)"%s_columns[:-1]
+        s_values = "%s)"%s_values[:-1]
+        sql = "%s%s%s"%(sql,s_columns,s_values)
+        cursor.execute(sql)
+        conn.commit()
+
+
+    def update_row(self,conn,conditions=[]):
+        cursor = conn.cursor()
+        sql = "update %s set "%(self.table_name)
+        s_columns = ""
+        s_where = " where "
+        _set_columns = set(self.all_columns)
+        _set_keys = set(self.getPrimary_keys())
+        for k,v in self.__dict__.items():
+            if k in _set_columns and k not in _set_keys:
+                if v is not None and str(v)!="":
+                    s_columns += "%s="%k
+                    if isinstance(v,str):
+                        s_columns += "'%s',"%v
+                    else:
+                        s_columns += "%d,"%v
+            elif k in _set_keys:
+                if v is None or str(v)=="":
+                    raise RuntimeError("主键%s为空"%k)
+                s_where += "%s="%k
+                if isinstance(v,str):
+                    s_where += "'%s' and "%v
+                else:
+                    s_where += "%d and "%v
+        for k,v in conditions:
+            s_where += "%s="%k
+            if isinstance(v,str):
+                s_where += "'%s' and "%v
+            else:
+                s_where += "%d and "%v
+        s_columns = "%s"%s_columns[:-1]
+        s_where = "%s"%s_where[:-1]
+        sql = "%s%s%s"%(sql,s_columns,s_where)
+        update_rows = cursor.execute(sql)
+        conn.commit()
+        return update_rows
+
+    @staticmethod
+    def exists(conn,table_name,conditions):
+        s_where = ""
+        for k,v in conditions:
+            s_where += "%s="%k
+            if isinstance(v,str):
+                s_where += "'%s' and "%v
+            else:
+                s_where += "%d and "%v
+        cursor = conn.cursor()
+        sql = "select count(1) from %s %s"%(table_name,s_where)
+        cursor.execute(sql)
+        rows = cursor.fetchall()
+        if rows[0][0]==0:
+            return False
+        return True
+
+    @staticmethod
+    def select_rows(conn,cls,table_name,conditions,limit=None):
+        list_result = []
+        s_limit = ""
+        if limit is not None:
+            s_limit = "limit %d"%limit
+        s_where = ""
+        for k,v in conditions:
+            s_where += "%s="%k
+            if isinstance(v,str):
+                s_where += "'%s' and "%v
+            else:
+                s_where += "%d and "%v
+
+        cursor = conn.cursor()
+        sql = "select * from %s %s %s"%(table_name,s_where,s_limit)
+        cursor.execute(sql)
+
+        vol = cursor.description
+        rows = cursor.fetchall()
+        for row in rows:
+            _dict = {}
+            for _vol,_val in zip(vol,row):
+                _name = _vol[0]
+                _dict[_name] = _val
+            list_result.append(cls(_dict))
+        return list_result
+
+
+

+ 0 - 0
BaseDataMaintenance/model/mysql/__init__.py


+ 21 - 0
BaseDataMaintenance/model/mysql/attach_document_richtext.py

@@ -0,0 +1,21 @@
+from BaseDataMaintenance.model.mysql.BaseModel import BaseModel
+
+
+class attach_document_richtext(BaseModel):
+
+    def __init__(self,_dict):
+        self.setValue("id",_dict.get("id"),True)
+        self.setValue("docid",_dict.get("docid"),True)
+        self.setValue("status",_dict.get("status",10),True)
+        self.setValue("message",_dict.get("message",""),True)
+        self.setValue("create_time",_dict.get("create_time",""),True)
+        self.setValue("insert_time",_dict.get("insert_time",""),True)
+        self.setValue("update_time",_dict.get("update_time",""),True)
+        self.setValue("type",_dict.get("type",0),True)
+        self.setValue("deal_count",_dict.get("deal_count",0),True)
+
+        self.table_name = "attach_document_richtext"
+
+    def getPrimary_keys(self):
+        return ["id"]
+

+ 2 - 1
BaseDataMaintenance/model/ots/BaseModel.py

@@ -22,7 +22,8 @@ class BaseModel():
             self.all_columns = []
         self.__dict__[k] = v
         if isColumn:
-            self.all_columns.append(k)
+            if k not in (set(self.all_columns)):
+                self.all_columns.append(k)
 
     def getAll_columns(self):
         return list(self.__dict__.keys())

+ 49 - 0
BaseDataMaintenance/model/ots/document.py

@@ -0,0 +1,49 @@
+from BaseDataMaintenance.model.ots.BaseModel import BaseModel
+from tablestore import *
+from BaseDataMaintenance.common.Utils import *
+from bs4 import BeautifulSoup
+
+class document(BaseModel):
+
+    def __init__(self,_dict):
+        for k,v in _dict.items():
+            self.setValue(k,v,True)
+        self.table_name = "document"
+        self.prefixs = ["www.bidizhaobiao.com","bxkc.oss-cn-shanghai.aliyuncs.com"]
+
+    def getPrimary_keys(self):
+        return ["partitionkey","docid"]
+
+    def delete_row(self,ots_client):
+        raise NotImplementedError()
+
+    def isLegalUrl(self,_url,_type):
+        _flag = False
+        for _prefix in self.prefixs:
+            if _url.find(_prefix)>=0:
+                _flag = True
+        if _type==0:
+            if _flag:
+                return True
+            else:
+                return False
+        else:
+            if _flag:
+                return False
+            else:
+                return True
+
+    def updateAttachment(self,list_html):
+        if len(list_html)>0:
+            _text = '\n<div style="display:none;" class="richTextFetch">%s</div>'%("\n".join(list_html))
+            _dochtmlcon = self.getProperties().get("dochtmlcon")
+            if _dochtmlcon is not None:
+                _soup = BeautifulSoup(_dochtmlcon,"lxml")
+                _node = _soup.find("div",id="richTextFetch")
+                if _node is not None:
+                    _node.decompose()
+                self.setValue("dochtmlcon",str(_soup)+_text)
+
+
+
+

+ 5 - 0
BaseDataMaintenance/readme.md

@@ -0,0 +1,5 @@
+将执行环境ENV放在BaseDataMaintenance同级路径下，如/home/python
+
+1.执行cd /home/python
+2.执行 ./ENV/bin/python ./BaseDataMaintenance/start_*.py
+

+ 13 - 0
BaseDataMaintenance/start_sychro_attachRec.py

@@ -0,0 +1,13 @@
+
+import sys
+import os
+sys.path.append(os.path.dirname(__file__)+"/..")
+
+from BaseDataMaintenance.maintenance.attachment.DataSynchronization import startSychro_toDeal,startSychro_Dealing
+
+if __name__=="__main__":
+    for item in sys.argv:
+        if "todeal"==item:
+            startSychro_toDeal()
+        if "dealing"==item:
+            startSychro_Dealing()

+ 9 - 0
BaseDataMaintenance/start_sychro_handInput.py

@@ -0,0 +1,9 @@
+
+import sys
+import os
+sys.path.append(os.path.dirname(__file__)+"/..")
+
+from BaseDataMaintenance.maintenance.document.DataSynchronization import startSychro_handInput
+
+if __name__=="__main__":
+    startSychro_handInput()

+ 2 - 7
test.py

@@ -1,8 +1,3 @@
 
-def a():
-    try:
-        a = 1/0
-    except Exception as e:
-        pass
-
-a()
+print('265\u4e07')
+import urllib.request as request