from queue import Queue
from BaseDataMaintenance.dataSource.source import getConnect_ots
from tablestore import *
from BaseDataMaintenance.model.ots.Preproject import *
from BaseDataMaintenance.model.ots.Preproject_dump import *
from BaseDataMaintenance.common.Utils import *
from BaseDataMaintenance.common.multiThread import MultiThreadHandler
from apscheduler.schedulers.blocking import BlockingScheduler


def drop_dump_data():
    """For every record in preproject_dump, delete the matching preproject row and then the dump record itself."""

    def drop_data(item, result_queue):
        preproject_dump = Preproject_dump(item)
        preproject = Preproject(item)
        # drop the dump record only after the main preproject row has been deleted
        if preproject.delete_row(ots_client):
            preproject_dump.delete_row(ots_client)

    task_queue = Queue()
    ots_client = getConnect_ots()

    # search preproject_dump for every row that has a tenderee field
    bool_query = BoolQuery(must_queries=[ExistsQuery(preproject_tenderee)])
    rows, next_token, total_count, is_all_succeed = ots_client.search(
        "preproject_dump", "preproject_dump_index",
        SearchQuery(bool_query, sort=Sort(sorters=[FieldSort(preproject_tenderee)]), get_total_count=True, limit=100),
        columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
    list_data = getRow_ots(rows)
    log("to drop preproject dump data:%d" % total_count)

    for _data in list_data:
        task_queue.put(_data)
    mt = MultiThreadHandler(task_queue, drop_data, None, 30)
    mt.run()

    # page through the remaining results with next_token, dropping them batch by batch
    while next_token:
        rows, next_token, total_count, is_all_succeed = ots_client.search(
            "preproject_dump", "preproject_dump_index",
            SearchQuery(bool_query, next_token=next_token, get_total_count=True, limit=100),
            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        list_data = getRow_ots(rows)
        for _data in list_data:
            task_queue.put(_data)
        mt.run()


def start_drop_preproject_dump():
    # run drop_dump_data once every minute
    scheduler = BlockingScheduler()
    scheduler.add_job(drop_dump_data, "cron", minute="*/1")
    scheduler.start()


if __name__ == '__main__':
    drop_dump_data()
    # start_drop_preproject_dump()