@@ -818,10 +818,32 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
                     ackMsg(conn,message_id,subscription)
 
+    def delete_document_extract(self,save_count=50*10000):
+        conn = self.pool_postgres.getConnector()
+        try:
+            cursor = conn.cursor()
+            sql = " select max(docid),min(docid) from document_extract "
+            cursor.execute(sql)
+            rows = cursor.fetchall()
+            if len(rows)>0:
+                maxdocid,mindocid = rows[0]
+
+                d_mindocid = int(maxdocid)-save_count
+                if mindocid<d_mindocid:
+                    sql = " delete from document_extract where docid<%d"%d_mindocid
+                    cursor.execute(sql)
+                    conn.commit()
+
+        except Exception as e:
+            traceback.print_exc()
+        finally:
+            self.pool_postgres.putConnector(conn)
+
     def start_flow_extract(self):
         schedule = BlockingScheduler()
         schedule.add_job(self.flow_extract_producer,"cron",second="*/20")
-        schedule.add_job(self.process_extract_failed,"cron",hour="20")
+        schedule.add_job(self.process_extract_failed,"cron",hour="*/5")
+        schedule.add_job(self.delete_document_extract,"cron",hour="*/5")
         schedule.start()
 
 
 from multiprocessing import RLock
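
The new delete_document_extract keeps roughly the newest save_count rows of
document_extract by comparing the table's min/max docid. Below is a minimal
standalone sketch of the same retention logic, assuming a psycopg2-style
connection (the pool_postgres wiring is omitted, and the delete_old_extracts
name is made up here). It departs from the diff in two hedged ways: the cutoff
is bound as a query parameter instead of %-formatted into the SQL string, and
an explicit None check covers the empty-table case, where max(docid) comes
back NULL and int(maxdocid) would raise a TypeError that the diff only
swallows via its broad except.

import traceback

def delete_old_extracts(conn, save_count=50 * 10000):
    """Keep roughly the newest save_count rows of document_extract.

    Assumes docid grows monotonically, so "newest" means "largest docid".
    """
    try:
        cursor = conn.cursor()
        cursor.execute("select max(docid), min(docid) from document_extract")
        maxdocid, mindocid = cursor.fetchone()
        if maxdocid is None:
            return  # empty table: max(docid) is NULL, nothing to trim
        cutoff = int(maxdocid) - save_count
        if mindocid < cutoff:
            # bind the cutoff as a parameter rather than %-formatting the SQL
            cursor.execute("delete from document_extract where docid < %s",
                           (cutoff,))
            conn.commit()
    except Exception:
        conn.rollback()  # leave the connection clean before returning it to the pool
        traceback.print_exc()

The %d formatting in the diff is not exploitable as written, since d_mindocid
is always an int, but parameter binding is the conventional psycopg2 pattern
and avoids quoting mistakes if the predicate ever grows.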
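On the scheduling side, the diff moves process_extract_failed from a fixed
nightly run (hour="20", once a day at 20:00) to every five hours, and
registers the new cleanup job on the same cadence. With APScheduler's cron
trigger, fields finer than the finest explicitly given field default to their
minimum, so hour="*/5" fires at minute 0, second 0 of hours 0, 5, 10, 15
and 20. A quick sanity check, assuming APScheduler is installed:

from apscheduler.triggers.cron import CronTrigger

# hour="*/5" expands to hours 0, 5, 10, 15, 20; the omitted minute and
# second fields default to 0, so the job fires five times a day on the hour.
trigger = CronTrigger(hour="*/5")
print(trigger)  # prints something like cron[hour='*/5']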