|
@@ -2,7 +2,7 @@
|
|
|
|
|
|
from BaseDataMaintenance.maintenance.dataflow import *
|
|
|
from BaseDataMaintenance.common.activateMQUtils import *
|
|
|
-from BaseDataMaintenance.dataSource.source import getConnect_activateMQ,getConnection_postgres,getConnection_mysql,getConnection_oracle,getConnect_ots_capacity
|
|
|
+from BaseDataMaintenance.dataSource.source import getConnect_activateMQ,getConnection_postgres,getConnection_mysql,getConnection_oracle,getConnect_ots_capacity,getConnect_redis_doc
|
|
|
from BaseDataMaintenance.dataSource.setttings import *
|
|
|
|
|
|
from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
|
|
@@ -583,6 +583,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
|
|
|
self.list_extract_comsumer.append(listener_extract)
|
|
|
|
|
|
self.pool_postgres = ConnectorPool(10,self.comsumer_count,getConnection_postgres)
|
|
|
+ self.pool_redis_doc = ConnectorPool(10,self.comsumer_count,getConnect_redis_doc)
|
|
|
self.conn_mq = getConnect_activateMQ()
|
|
|
self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
|
|
|
|
|
@@ -685,6 +686,41 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
|
|
|
document_extract_extract_json:extract_json})
|
|
|
_de.insert_row(self.pool_postgres,1)
|
|
|
|
|
|
+ def getExtract_json_fromRedis(self,_fingerprint):
|
|
|
+ db = self.pool_redis_doc.getConnector()
|
|
|
+ try:
|
|
|
+ _extract_json = db.get(_fingerprint)
|
|
|
+ return _extract_json
|
|
|
+ except Exception as e:
|
|
|
+ log("getExtract_json_fromRedis error %s"%(str(e)))
|
|
|
+ finally:
|
|
|
+ try:
|
|
|
+
|
|
|
+ if db.connection.check_health():
|
|
|
+ self.pool_redis_doc.putConnector(db)
|
|
|
+ except Exception as e:
|
|
|
+ pass
|
|
|
+ return None
|
|
|
+
|
|
|
+ def putExtract_json_toRedis(self,fingerprint,extract_json):
|
|
|
+
|
|
|
+ db = self.pool_redis_doc.getConnector()
|
|
|
+ try:
|
|
|
+ _extract_json = db.set(str(fingerprint),extract_json)
|
|
|
+ db.expire(fingerprint,3600*2)
|
|
|
+ return _extract_json
|
|
|
+ except Exception as e:
|
|
|
+ log("putExtract_json_toRedis error%s"%(str(e)))
|
|
|
+ traceback.print_exc()
|
|
|
+ finally:
|
|
|
+ try:
|
|
|
+
|
|
|
+ if db.connection.check_health():
|
|
|
+ self.pool_redis_doc.putConnector(db)
|
|
|
+ except Exception as e:
|
|
|
+ pass
|
|
|
+
|
|
|
+
|
|
|
|
|
|
|
|
|
def comsumer_handle(self,_dict,result_queue):
|
|
@@ -734,7 +770,8 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
|
|
|
if all_done>0:
|
|
|
_time = time.time()
|
|
|
extract_json = self.getExtract_json_fromDB(_fingerprint)
|
|
|
- log("get json from db takes %.2f"%(time.time()-_time))
|
|
|
+ # extract_json = self.getExtract_json_fromRedis(_fingerprint)
|
|
|
+ log("get json from db takes %.4f"%(time.time()-_time))
|
|
|
# extract_json = None
|
|
|
_docid = int(data["doc_id"])
|
|
|
if extract_json is not None:
|