Browse Source

增加对tmp表数据的重新处理

luojiehua 2 years ago
parent
commit
26ae8aa433
1 changed files with 61 additions and 0 deletions
  1. 61 0
      BaseDataMaintenance/maintenance/dataflow_mq.py

+ 61 - 0
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -1151,6 +1151,66 @@ class Dataflow_init(Dataflow):
         except Exception as e:
             traceback.print_exc()
 
+    def otstmp2mq(self):
+        try:
+            bool_query = BoolQuery(must_queries=[TermQuery("status",0)])
+
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
+                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
+                                                                                ColumnsToGet(return_type=ColumnReturnType.ALL))
+            list_data = getRow_ots(rows)
+            for _data in list_data:
+                _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
+                      document_tmp_docid:_data.get(document_tmp_docid),
+                      document_tmp_status:0}
+                _document = Document(_d)
+                page_attachments = _data.get(document_tmp_attachment_path,"[]")
+
+                _document_html = Document_html(_data)
+                _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
+
+                if page_attachments!="[]":
+                    status = random.randint(1,10)
+                    _data[document_tmp_status] = status
+                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
+                else:
+                    status = random.randint(11,50)
+                    _data[document_tmp_status] = status
+                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
+                if send_succeed:
+                    _document.update_row(self.ots_client)
+                else:
+                    log("send_msg_error2222")
+            while next_token:
+                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
+                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
+                                                                                    ColumnsToGet(return_type=ColumnReturnType.ALL))
+                list_data = getRow_ots(rows)
+                for _data in list_data:
+                    _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
+                          document_tmp_docid:_data.get(document_tmp_docid),
+                          document_tmp_status:0}
+                    _document = Document(_d)
+                    page_attachments = _data.get(document_tmp_attachment_path,"[]")
+
+                    _document_html = Document_html(_data)
+                    _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
+
+                    if page_attachments!="[]":
+                        status = random.randint(1,10)
+                        _data[document_tmp_status] = status
+                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
+                    else:
+                        status = random.randint(11,50)
+                        _data[document_tmp_status] = status
+                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
+                    if send_succeed:
+                        _document.update_row(self.ots_client)
+                    else:
+                        log("send_msg_error2222")
+        except Exception as e:
+            traceback.print_exc()
+
 
     def test_dump_docid(self):
         class TestDumpListener(ActiveMQListener):
@@ -1210,6 +1270,7 @@ class Dataflow_init(Dataflow):
         schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoDaYiTemp({}),),second="*/10")
         schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoWenJianTemp({}),),second="*/10")
         schedule.add_job(self.ots2mq,"cron",second="*/10")
+        schedule.add_job(self.otstmp2mq,"cron",second="*/10")
         schedule.add_job(self.monitor_listener,"cron",minute="*/1")
         schedule.start()