Переглянути джерело

修复附件识别和要素提取消费不足问题

luojiehua 1 рік тому
батько
коміт
d4bf291905
1 змінених файлів з 75 додано та 22 видалено
  1. 75 22
      BaseDataMaintenance/maintenance/dataflow_mq.py

+ 75 - 22
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -88,6 +88,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
         #     self.list_attachment_comsumer.append(listener_attachment)
 
         self.attach_pool = ConnectorPool(10,30,getConnection_postgres)
+        self.redis_pool = ConnectorPool(10,30,getConnect_redis_doc)
         self.conn_mq = getConnect_activateMQ()
         self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
 
@@ -344,10 +345,12 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                     attach.setValue(attachment_status,_ots_attach.getProperties().get(attachment_status,""))
                     attach.setValue(attachment_filetype,_ots_attach.getProperties().get(attachment_filetype,""))
                     attach.setValue(attachment_classification,_ots_attach.getProperties().get(attachment_classification,""))
-                    if attach.exists(self.attach_pool):
-                        attach.update_row(self.attach_pool)
-                    else:
-                        attach.insert_row(self.attach_pool)
+                    # if attach.exists(self.attach_pool):
+                    #     attach.update_row(self.attach_pool)
+                    # else:
+                    #     attach.insert_row(self.attach_pool)
+                    self.putAttach_json_toRedis(filemd5,json.dumps(attach.getProperties()))
+
 
                     try:
                         if os.exists(localpath):
@@ -374,11 +377,12 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                     _ots_attach = attachment(attach.getProperties_ots())
                     _ots_attach.update_row(self.ots_client)
 
-                    #更新postgres
-                    if attach.exists(self.attach_pool):
-                        attach.update_row(self.attach_pool)
-                    else:
-                        attach.insert_row(self.attach_pool)
+                    # #更新postgres
+                    # if attach.exists(self.attach_pool):
+                    #     attach.update_row(self.attach_pool)
+                    # else:
+                    #     attach.insert_row(self.attach_pool)
+                    self.putAttach_json_toRedis(filemd5,json.dumps(attach.getProperties()))
 
 
                     if local_exists:
@@ -453,11 +457,12 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                 _ots_attach = attachment(attach.getProperties_ots())
                 _ots_attach.update_row(self.ots_client) #线上再开放更新
 
-                #更新postgres
-                if attach.exists(self.attach_pool):
-                    attach.update_row(self.attach_pool)
-                else:
-                    attach.insert_row(self.attach_pool)
+                # #更新postgres
+                # if attach.exists(self.attach_pool):
+                #     attach.update_row(self.attach_pool)
+                # else:
+                #     attach.insert_row(self.attach_pool)
+                self.putAttach_json_toRedis(filemd5,json.dumps(attach.getProperties()))
 
 
                 if local_exists:
@@ -503,6 +508,43 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
         if len(_path)>1:
             return _path[1]
 
+    def getAttach_json_fromRedis(self,filemd5):
+        db = self.redis_pool.getConnector()
+        try:
+
+            _key = "attach-%s"%(filemd5)
+            _attach_json = db.get(_key)
+            return _attach_json
+        except Exception as e:
+            log("getAttach_json_fromRedis error %s"%(str(e)))
+        finally:
+            try:
+                if db.connection.check_health():
+                    self.redis_pool.putConnector(db)
+            except Exception as e:
+                pass
+        return None
+
+    def putAttach_json_toRedis(self,filemd5,extract_json):
+
+        db = self.redis_pool.getConnector()
+        try:
+            _key = "attach-%s"%(filemd5)
+            _extract_json = db.set(str(_key),extract_json)
+            db.expire(_key,3600*3)
+            return _extract_json
+        except Exception as e:
+            log("putExtract_json_toRedis error%s"%(str(e)))
+            traceback.print_exc()
+        finally:
+            try:
+                if db.connection.check_health():
+                    self.redis_pool.putConnector(db)
+            except Exception as e:
+                pass
+
+
+
     def getAttachments(self,list_filemd5,_dochtmlcon):
         conn = self.attach_pool.getConnector()
         #搜索postgres
@@ -513,15 +555,26 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                     to_find_md5.append(_filemd5)
 
             conditions = ["filemd5 in ('%s')"%("','".join(to_find_md5))]
-            list_attachment =  Attachment_postgres.select_rows(conn,Attachment_postgres,"attachment",conditions)
-            log("select localpath database %d/%d"%(len(list_attachment),len(to_find_md5)))
+            list_attachment = []
+
             set_md5 = set()
-            for _attach in list_attachment:
-                set_md5.add(_attach.getProperties().get(attachment_filemd5))
-            list_not_in_md5 = []
+            # list_attachment =  Attachment_postgres.select_rows(conn,Attachment_postgres,"attachment",conditions)
+            # for _attach in list_attachment:
+            #     set_md5.add(_attach.getProperties().get(attachment_filemd5))
+
             for _filemd5 in to_find_md5:
+                _json = self.getAttach_json_fromRedis(_filemd5)
+
+                if _json is not None:
+                    set_md5.add(_filemd5)
+                    list_attachment.append(Attachment_postgres(json.loads(_json)))
+
+            log("select localpath database %d/%d"%(len(set_md5),len(to_find_md5)))
+
+            for _filemd5 in to_find_md5:
+
                 if _filemd5 not in set_md5:
-                    list_not_in_md5.append(_filemd5)
+
                     _path = self.getAttachPath(_filemd5,_dochtmlcon)
 
 
@@ -689,7 +742,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
             self.extract_interfaces[_i][1] = current_weight/self.whole_weight
 
         self.comsumer_count = 50
-        self.pool_postgres = ConnectorPool(10,self.comsumer_count,getConnection_postgres)
+        # self.pool_postgres = ConnectorPool(10,self.comsumer_count,getConnection_postgres)
         self.pool_redis_doc = ConnectorPool(10,self.comsumer_count,getConnect_redis_doc)
         self.conn_mq = getConnect_activateMQ()
         self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
@@ -1086,7 +1139,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
         schedule = BlockingScheduler()
         schedule.add_job(self.flow_extract_producer,"cron",second="*/20")
         schedule.add_job(self.process_extract_failed,"cron",minute="*/5")
-        schedule.add_job(self.delete_document_extract,"cron",hour="*/5")
+        # schedule.add_job(self.delete_document_extract,"cron",hour="*/5")
         # schedule.add_job(self.monitor_listener,"cron",minute="*/5")
         schedule.start()