浏览代码

增加漏数据及修复数据;附件处理时如果历史已经处理过,则跳过

luojiehua 2 年之前
父节点
当前提交
2ddc0bf313
共有 1 个文件被更改,包括 25 次插入和 28 次删除
  1. 25 28
      BaseDataMaintenance/maintenance/dataflow_mq.py

+ 25 - 28
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -164,44 +164,41 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
             _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)
 
 
-            if item.get("docid")<280749583:
-                send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(dtmp.getProperties(),cls=MyEncoder),self.mq_extract)
-                if send_succeed:
-                    _to_ack = True
-            else:
 
-                _to_ack = False
-                if not _succeed and _retry_times<self.retry_times:
-                    item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
-                    item["retry_times"] = _retry_times+1
-                    #失败次数大于5次就放入失败队列,此队列的数据会在空闲时间重新处理一次
-                    if item["retry_times"]>=self.retry_times:
-                        send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment_failed)
 
-                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment)
+            _to_ack = False
+            if not _succeed and _retry_times<self.retry_times:
+                item[document_tmp_status] = random.randint(*flow_attachment_status_failed_to)
+                item["retry_times"] = _retry_times+1
+                #失败次数大于5次就放入失败队列,此队列的数据会在空闲时间重新处理一次
+                if item["retry_times"]>=self.retry_times:
+                    send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment_failed)
 
-                    #失败保存
+                send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,cls=MyEncoder,ensure_ascii=False),self.mq_attachment)
+
+                #失败保存
+                if _retry_times==0:
                     dtmp.setValue(document_tmp_dochtmlcon,"",False)
                     dtmp.setValue(document_tmp_status,0,True)
                     if not dtmp.exists_row(self.ots_client):
                         dtmp.update_row(self.ots_client)
                         dhtml.update_row(self.ots_client)
-                    if send_succeed:
-                        _to_ack = True
+                if send_succeed:
+                    _to_ack = True
 
-                else:
-                    try:
-                        log("docid:%d,retry:%d swf_urls:%s list_html:%s"%(dhtml.getProperties().get(document_docid),_retry_times,str(swf_urls),str(len(list_html))))
-                        dhtml.updateSWFImages(swf_urls)
-                        dhtml.updateAttachment(list_html)
+            else:
+                try:
+                    log("docid:%d,retry:%d swf_urls:%s list_html:%s"%(dhtml.getProperties().get(document_docid),_retry_times,str(swf_urls),str(len(list_html))))
+                    dhtml.updateSWFImages(swf_urls)
+                    dhtml.updateAttachment(list_html)
 
-                        dtmp.setValue(document_tmp_attachment_extract_status,1,True)
-                        dtmp.setValue(document_tmp_dochtmlcon,dhtml.getProperties().get(document_tmp_dochtmlcon),True)
-                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(dtmp.getProperties(),cls=MyEncoder),self.mq_extract)
-                        if send_succeed:
-                            _to_ack = True
-                    except Exception as e:
-                        traceback.print_exc()
+                    dtmp.setValue(document_tmp_attachment_extract_status,1,True)
+                    dtmp.setValue(document_tmp_dochtmlcon,dhtml.getProperties().get(document_tmp_dochtmlcon),True)
+                    send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(dtmp.getProperties(),cls=MyEncoder),self.mq_extract)
+                    if send_succeed:
+                        _to_ack = True
+                except Exception as e:
+                    traceback.print_exc()
 
             if _to_ack:
                 ackMsg(conn,message_id)