remove_dump.py 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. from queue import Queue
  2. from BaseDataMaintenance.dataSource.source import getConnect_ots
  3. from tablestore import *
  4. from BaseDataMaintenance.model.ots.Preproject import *
  5. from BaseDataMaintenance.model.ots.Preproject_dump import *
  6. from BaseDataMaintenance.common.Utils import *
  7. from BaseDataMaintenance.common.multiThread import MultiThreadHandler
  8. from apscheduler.schedulers.blocking import BlockingScheduler
  9. def drop_dump_data():
  10. def drop_data(item,result_queue):
  11. preproject_dump = Preproject_dump(item)
  12. preproject = Preproject(item)
  13. if preproject.delete_row(ots_client):
  14. preproject_dump.delete_row(ots_client)
  15. task_queue = Queue()
  16. ots_client = getConnect_ots()
  17. bool_query = BoolQuery(must_queries=[ExistsQuery(preproject_tenderee)])
  18. rows,next_token,total_count,is_all_succeed = ots_client.search("preproject_dump","preproject_dump_index",
  19. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(preproject_tenderee)]),get_total_count=True,limit=100),
  20. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
  21. list_data = getRow_ots(rows)
  22. log("to drop preproject dump data:%d"%total_count)
  23. for _data in list_data:
  24. task_queue.put(_data)
  25. mt = MultiThreadHandler(task_queue,drop_data,None,30)
  26. mt.run()
  27. while next_token:
  28. rows,next_token,total_count,is_all_succeed = ots_client.search("preproject_dump","preproject_dump_index",
  29. SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
  30. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
  31. list_data = getRow_ots(rows)
  32. for _data in list_data:
  33. task_queue.put(_data)
  34. mt.run()
  35. def start_drop_preproject_dump():
  36. scheduler = BlockingScheduler()
  37. scheduler.add_job(drop_dump_data,"cron",minute="*/1")
  38. scheduler.start()
  39. if __name__ == '__main__':
  40. drop_dump_data()
  41. # start_drop_preproject_dump()