Эх сурвалжийг харах

修复线上mq流程报错的问题

luojiehua 11 сар өмнө
parent
commit
09451ba0f9

+ 1 - 1
.idea/misc.xml

@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <project version="4">
-  <component name="ProjectRootManager" version="2" languageLevel="JDK_13" default="false" project-jdk-name="Python 3.7 (py37)" project-jdk-type="Python SDK">
+  <component name="ProjectRootManager" version="2" languageLevel="JDK_13" project-jdk-name="Python 3.7 (py37)" project-jdk-type="Python SDK">
     <output url="file://$PROJECT_DIR$/out" />
   </component>
 </project>

+ 3 - 9
BaseDataMaintenance/common/activateMQUtils.py

@@ -15,18 +15,12 @@ def send_msg_toacmq(pool_conn,msg,dest,retry_times=5):
         conn = pool_conn.getConnector()
         try:
             conn.send(body=str(msg), destination=dest, persistent='false')
+            pool_conn.putConnector(conn)
             return True
         except Exception as e:
             traceback.print_exc()
-            try:
-                conn.disconnect()
-            except Exception as e:
-                pass
-        finally:
-            if conn.is_connected():
-                pool_conn.putConnector(conn)
-            else:
-                del conn
+            time.sleep(2)
+            del conn
     return False
 
 class MyListener(object):

+ 4 - 3
BaseDataMaintenance/common/documentFingerprint.py

@@ -1,4 +1,4 @@
-
+#coding:utf8
 
 import hashlib
 import codecs
@@ -47,6 +47,7 @@ def getFingerprint(sourceHtml):
     return _fingerprint
 
 if __name__=="__main__":
-    sourceHtml = codecs.open("C:\\Users\\\Administrator\\Desktop\\2.html","rb",encoding="utf8").read()
-    # sourceHtml = "abcddafafffffffffffffffffffffffff你"
+    # sourceHtml = codecs.open("C:\\Users\\\Administrator\\Desktop\\2.html","rb",encoding="utf8").read()
+    sourceHtml = "天全县农村敬老院护理能力提升改造项目初步设计及概算审批公示"+'<div> <div> <div> 天全县农村敬老院护理能力提升改造项目初步设计及概算审批公示 </div> <div> <div> <p>一、办理事项:天全县农村敬老院护理能力提升改造项目初步设计及概算审批</p> <p>二、项目业主:天全县民政局</p> <p>三、项目代码:2107-511825-04-01-642123</p> <p>四、办理状态:办结。</p> <p>五、办理时间:2024年5月14日</p> </div> </div> </div> </div>'
+    sourceHtml = "天全县农村敬老院护理能力提升改造项目初步设计及概算审批公示"+'审批项目'
     print(getFingerprint(sourceHtml))

+ 4 - 4
BaseDataMaintenance/dataMonitor/data_monitor.py

@@ -284,7 +284,7 @@ class BaseDataMonitor():
                 for k,v in dict_type.items():
                     process_count += v.get("success",0)+v.get("fail",0)
                     process_succeed_count += v.get("success",0)
-                    _msg_type += "\n类型%s\n\t成功%s,消耗%s秒,%.2f秒/个,\n\t失败%s,消耗%s秒,%.2f秒/个,\n\t下载%s,消耗%s秒,%.2f秒/个,\n\t上传%s,消耗%s秒,%.2f秒/个"%(k,str(v.get("success")),str(v.get("success_costtime")),v.get("success_costtime")/max(1,v.get("success")),str(v.get("fail")),str(v.get("fail_costtime")),v.get("fail_costtime")/max(1,v.get("fail")),str(v.get("downcount")),str(v.get("downtime")),v.get("downtime")/max(1,v.get("downcount")),str(v.get("uploadcount")),str(v.get("uploadtime")),v.get("uploadtime")/max(1,v.get("uploadcount")))
+                    _msg_type += "\n类型%s\n\t成功%s,消耗%s秒,%.2f秒/个,\n\t失败%s,消耗%s秒,%.2f秒/个,\n\t下载%s,消耗%.2f秒,%.2f秒/个,\n\t上传%s,消耗%.2f秒,%.2f秒/个"%(k,str(v.get("success")),str(v.get("success_costtime")),v.get("success_costtime")/max(1,v.get("success")),str(v.get("fail")),str(v.get("fail_costtime")),v.get("fail_costtime")/max(1,v.get("fail")),str(v.get("downcount")),v.get("downtime"),v.get("downtime")/max(1,v.get("downcount")),str(v.get("uploadcount")),v.get("uploadtime"),v.get("uploadtime")/max(1,v.get("uploadcount")))
 
                 _msg = "附件提取队列报警:队列堆积%s条公告,最近十分钟处理公告附件数:%s,处理成功数:%s"%(str(total_count_todeal),str(process_count),str(process_succeed_count))
                 log(_msg)
@@ -313,7 +313,7 @@ class BaseDataMonitor():
 
             total_count_todeal = getQueueSize("dataflow_extract")
 
-            if total_count_todeal>500:
+            if total_count_todeal>5000:
                 _cmd = 'cat %s | grep "%s" | grep -c "process.*docid"'%(flow_extract_log_path,self.get_last_tenmin_time())
                 log(_cmd)
                 process_count = self.cmd_execute(_cmd)
@@ -407,7 +407,7 @@ class BaseDataMonitor():
                                                                             SearchQuery(query,None,True),
                                                                             columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
 
-        if total_count>=200:
+        if total_count>=2000:
             _msg = "数据流报警:待同步到成品表公告数为:%d"%(total_count)
             sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
             # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
@@ -632,7 +632,7 @@ class BaseDataMonitor():
         # scheduler.add_job(self.monitor_dumplicate,"cron",minute="*/10")
         scheduler.add_job(self.monitor_sychr,"cron",minute="*/10")
         scheduler.add_job(self.monitor_preproject,"cron",hour="8")
-        scheduler.add_job(self.monitor_merge,"cron",minute="*/60")
+        scheduler.add_job(self.monitor_merge,"cron",hour="*/2")
         scheduler.add_job(self.monitor_init,"cron",hour="*/3")
         scheduler.start()
 

+ 1 - 1
BaseDataMaintenance/maintenance/dataflow.py

@@ -4413,7 +4413,7 @@ if __name__ == '__main__':
     # test_attachment_interface()
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
-    df_dump.test_dumplicate(455485514
+    df_dump.test_dumplicate(483183339
                             )
     # compare_dumplicate_check()
     # df_dump.test_merge([391898061

+ 58 - 28
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -108,7 +108,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
     def start_attachment_listener(self):
         for _i in range(self.comsumer_count):
-            listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,_i)
+            listener_attachment = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler ,_i)
             createComsumer(listener_attachment,self.mq_attachment)
             self.list_attachment_comsumer.append(listener_attachment)
 
@@ -254,11 +254,15 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
         '''
 
         try:
+            start_time = time.time()
+
             item = _dict.get("item")
             list_attach = _dict.get("list_attach")
             conn = _dict["conn"]
             message_id = _dict.get("message_id")
 
+            if "retry_times" not in item:
+                item["retry_times"] = 5
             _retry_times = item.get("retry_times",0)
             dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                    "docid":item.get("docid")})
@@ -269,7 +273,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
             dtmp = Document_tmp(item)
 
 
-            start_time = time.time()
+
             #调用识别接口
             _succeed,list_html,swf_urls = self.rec_attachments_by_interface(list_attach,_dochtmlcon,save=True)
 
@@ -978,6 +982,8 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
     def comsumer_handle(self,_dict,result_queue):
         try:
             log("start handle")
+            data = {}
+
             frame = _dict["frame"]
             conn = _dict["conn"]
             message_id = frame.headers["message-id"]
@@ -1026,7 +1032,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
             _extract.setValue(document_extract2_docid,item.get(document_docid))
             all_done = 1
 
-            data = {}
+
             for k,v in item.items():
                 data[k] = v
             data["timeout"] = 440
@@ -1043,7 +1049,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
             data["web_source_name"] = item.get(document_tmp_web_source_name,"")
             data["original_docchannel"] = item.get(document_tmp_original_docchannel,"")
 
-            _fingerprint = getFingerprint(str(data["title"])+str(data["content"]))
+            _fingerprint = getFingerprint(str(data["title"])+str(data["content"]))+str(data["original_docchannel"])
 
             if all_done>0:
                 _time = time.time()
@@ -1078,6 +1084,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
             # if all_done>0 and len(_extract.getProperties().get(document_extract2_extract_json,""))<=2:
             #     all_done = -4
             _extract.setValue(document_extract2_industry_json,"{}",True)
+            _to_ack = True
             try:
                 if all_done!=1:
                     sentMsgToDD("要素提取失败:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
@@ -1138,7 +1145,11 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
 
             if _to_ack:
                 ackMsg(conn,message_id,subscription)
-            log("process %s docid:%d %s"%(str(_to_ack),data["doc_id"],str(all_done)))
+            else:
+                item["extract_times"] -= 1
+                send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract)
+                ackMsg(conn,message_id,subscription)
+            log("process %s docid:%d %s"%(str(_to_ack),data.get("doc_id"),str(all_done)))
         except requests.ConnectionError as e1:
             item["extract_times"] -= 1
             if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
@@ -1146,7 +1157,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
         except Exception as e:
             traceback.print_exc()
             sentMsgToDD("要素提取失败:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e)))
-            log("process %s docid: failed message_id:%s"%(data["doc_id"],message_id))
+            log("process %s docid: failed message_id:%s"%(data.get("doc_id"),message_id))
             if extract_times>=10:
                 #process as succeed
                 dtmp.setValue(document_tmp_dochtmlcon,"",False)
@@ -1379,7 +1390,7 @@ class Dataflow_init(Dataflow):
             traceback.print_exc()
             self.pool_oracle.decrease()
 
-    def shengpi2mq(self):
+    def shenpi2mq(self):
 
         conn_oracle = self.pool_oracle.getConnector()
 
@@ -1395,32 +1406,46 @@ class Dataflow_init(Dataflow):
                     if max_shenpi_id>self.base_shenpi_id:
                         max_shenpi_id -= self.base_shenpi_id
                     self.max_shenpi_id = max_shenpi_id
+
+                if self.max_shenpi_id<60383953:
+                    self.max_shenpi_id = 60383953
+
+
             if self.max_shenpi_id is not None:
                 # select data in order
-                list_data = T_SHEN_PI_XIANG_MU.select_rows(conn_oracle,self.max_shenpi_id,)
 
-                # send data to mq one by one with max_shenpi_id updated
-                for _data in list_data:
-                    _id = _data.getProperties().get(T_SHEN_PI_XIANG_MU_ID)
-
-                    ots_dict = _data.getProperties_ots()
-                    if ots_dict["docid"]<self.base_shenpi_id:
-                        ots_dict["docid"] += self.base_shenpi_id
-
-                    if ots_dict.get(T_SHEN_PI_XIANG_MU_PAGE_ATTACHMENTS,"") !='[]':
-                        if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_attachment):
-                            self.max_shenpi_id = _id
-                        else:
-                            log("sent shenpi message to mq failed %s"%(_id))
-                            break
-                    else:
-                        if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_extract):
-                            self.max_shenpi_id = _id
-                        else:
-                            log("sent shenpi message to mq failed %s"%(_id))
-                            break
+                origin_max_shenpi_id = T_SHEN_PI_XIANG_MU.get_max_id(conn_oracle)
+
+                if origin_max_shenpi_id is not None:
+                    log("shenpi origin_max_shenpi_id:%d current_id:%d"%(origin_max_shenpi_id,self.max_shenpi_id))
+                    for _id_i in range(self.max_shenpi_id+1,origin_max_shenpi_id+1):
+                        list_data = T_SHEN_PI_XIANG_MU.select_rows(conn_oracle,_id_i)
+
+                        # send data to mq one by one with max_shenpi_id updated
+                        for _data in list_data:
+
+                            _id = _data.getProperties().get(T_SHEN_PI_XIANG_MU_ID)
+
+                            ots_dict = _data.getProperties_ots()
+                            if ots_dict["docid"]<self.base_shenpi_id:
+                                ots_dict["docid"] += self.base_shenpi_id
+
+                            if ots_dict.get(T_SHEN_PI_XIANG_MU_PAGE_ATTACHMENTS,"") !='[]':
+                                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_attachment):
+                                    self.max_shenpi_id = _id
+                                else:
+                                    log("sent shenpi message to mq failed %s"%(_id))
+                                    break
+                            else:
+                                if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_extract):
+                                    self.max_shenpi_id = _id
+                                else:
+                                    log("sent shenpi message to mq failed %s"%(_id))
+                                    break
+            self.pool_oracle.putConnector(conn_oracle)
 
         except Exception as e:
+            log("shenpi error")
             traceback.print_exc()
             self.pool_oracle.decrease()
 
@@ -1594,6 +1619,7 @@ class Dataflow_init(Dataflow):
         from BaseDataMaintenance.model.oracle.ZhaoBiaoDaYiTemp import ZhaoBiaoDaYiTemp
         from BaseDataMaintenance.model.oracle.ZhaoBiaoWenJianTemp import ZhaoBiaoWenJianTemp
         schedule = BlockingScheduler()
+
         schedule.add_job(self.temp2mq,"cron",args=(CaiGouYiXiangTemp({}),),second="*/10")
         schedule.add_job(self.temp2mq,"cron",args=(PaiMaiChuRangTemp({}),),second="*/10")
         schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoGongGaoTemp({}),),second="*/10")
@@ -1609,11 +1635,15 @@ class Dataflow_init(Dataflow):
         schedule.add_job(self.ots2mq,"cron",second="*/10")
         schedule.add_job(self.otstmp2mq,"cron",second="*/10")
         schedule.add_job(self.monitor_listener,"cron",minute="*/1")
+
+        schedule.add_job(self.shenpi2mq,"cron",minute="*/1")
         schedule.start()
 
 
 
 
+
+
 def transform_attachment():
     from BaseDataMaintenance.model.ots.attachment import attachment
     from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres

+ 36 - 10
BaseDataMaintenance/model/oracle/T_SHEN_PI_XIANG_MU.py

@@ -41,6 +41,8 @@ class T_SHEN_PI_XIANG_MU(BaseModel):
     def getProperties_ots(self):
         new_dict = {}
         for k,v in self.__dict__.items():
+            if k=="all_columns":
+                continue
             if v is not None:
                 if isinstance(v,(str,int,float)):
                     pass
@@ -52,12 +54,18 @@ class T_SHEN_PI_XIANG_MU(BaseModel):
         docid = int(new_dict.get("id",0))
         partition_key = docid%500+1
 
-        new_dict["partition_key"] = partition_key
+        new_dict["partitionkey"] = partition_key
         new_dict["docid"] = docid
         new_dict["original_id"] = str(new_dict.get(T_SHEN_PI_XIANG_MU_ID))
+        new_dict["uuid"] = str(new_dict.get(T_SHEN_PI_XIANG_MU_ID))
         new_dict.pop(T_SHEN_PI_XIANG_MU_ID)
 
-        new_dict["uuid"] = str(new_dict.get(T_SHEN_PI_XIANG_MU_ID))
+        try:
+            new_dict[T_SHEN_PI_XIANG_MU_SOURCE_STAGE] = int(new_dict.get(T_SHEN_PI_XIANG_MU_SOURCE_STAGE,0))
+            new_dict[T_SHEN_PI_XIANG_MU_SOURCE_TYPE] = int(new_dict.get(T_SHEN_PI_XIANG_MU_SOURCE_TYPE,0))
+        except Exception as e:
+            pass
+
 
         new_dict["crtime"] = new_dict.get(T_SHEN_PI_XIANG_MU_CREATE_TIME)
         new_dict["docchannel"] = 302
@@ -65,11 +73,13 @@ class T_SHEN_PI_XIANG_MU(BaseModel):
         new_dict["doctitle"] = new_dict.get(T_SHEN_PI_XIANG_MU_PAGE_TITLE,"")
         new_dict.pop(T_SHEN_PI_XIANG_MU_PAGE_TITLE)
 
-        new_dict["dochtmlcon"] = new_dict.get(T_SHEN_PI_XIANG_MU_PAGE_CONTENT)
-        new_dict.pop(T_SHEN_PI_XIANG_MU_PAGE_CONTENT)
+        new_dict["dochtmlcon"] = new_dict.get(T_SHEN_PI_XIANG_MU_PAGE_CONTENT,"")
+        if T_SHEN_PI_XIANG_MU_PAGE_CONTENT in new_dict:
+            new_dict.pop(T_SHEN_PI_XIANG_MU_PAGE_CONTENT)
 
-        new_dict["detail_link"] = new_dict.get(T_SHEN_PI_XIANG_MU_DETAILLINK)
-        new_dict.pop(T_SHEN_PI_XIANG_MU_DETAILLINK)
+        new_dict["detail_link"] = new_dict.get(T_SHEN_PI_XIANG_MU_DETAILLINK,"")
+        if T_SHEN_PI_XIANG_MU_DETAILLINK in new_dict:
+            new_dict.pop(T_SHEN_PI_XIANG_MU_DETAILLINK)
 
         new_dict[T_SHEN_PI_XIANG_MU_PAGE_ATTACHMENTS] = new_dict.get(T_SHEN_PI_XIANG_MU_ATTACHMENT_PATH,"[]")
 
@@ -81,15 +91,31 @@ class T_SHEN_PI_XIANG_MU(BaseModel):
             new_dict["original_docchannel"] = new_dict["docchannel"]
         return new_dict
 
-    def select_rows(conn,max_shenpi_id,limit=500):
+    @staticmethod
+    def get_max_id(conn):
+        cursor = conn.cursor()
+        sql = "select max(id) from %s"%("bxkc.t_shen_pi_xiang_mu_new")
+
+        cursor.execute(sql)
+        rows = cursor.fetchall()
+
+        if len(rows)>0:
+            max_id = rows[0][0]
+            log("selext_max_id:%d"%(max_id))
+            return max_id
+        return None
+
+
+    @staticmethod
+    def select_rows(conn,_id,limit=500):
         list_result = []
         s_limit = ""
         if limit is not None:
             s_limit = "limit %d"%limit
-        s_where = " where id>%d "%(max_shenpi_id)
+        s_where = " where id=%d "%(_id)
 
         cursor = conn.cursor()
-        sql = "select %s from %s %s %s order by id asc"%("*","t_shen_pi_xiang_mu_new",s_where,s_limit)
+        sql = "select %s from %s %s "%("*","bxkc.t_shen_pi_xiang_mu_new",s_where)
         log("select rows:%s"%(sql))
         cursor.execute(sql)
 
@@ -98,7 +124,7 @@ class T_SHEN_PI_XIANG_MU(BaseModel):
         for row in rows:
             _dict = {}
             for _vol,_val in zip(vol,row):
-                _name = _vol[0]
+                _name = str(_vol[0]).lower()
                 _dict[_name] = _val
             list_result.append(T_SHEN_PI_XIANG_MU(_dict))
         return list_result

+ 3 - 0
BaseDataMaintenance/model/ots/BaseModel.py

@@ -20,9 +20,12 @@ class BaseModel():
         raise NotImplementedError
 
     def setValue(self,k,v,isColumn=True):
+        if k=="all_columns":
+            return
         if "all_columns" not in self.__dict__ or not isinstance(self.__dict__["all_columns"],(list)):
             self.all_columns = []
         self.__dict__[k] = v
+
         if isColumn:
             if k not in (set(self.all_columns)):
                 self.all_columns.append(k)

+ 19 - 17
BaseDataMaintenance/model/ots/document_tmp.py

@@ -254,6 +254,7 @@ def turn_document_tmp_status():
     ots_client = getConnect_ots()
 
     def producer1(task_queue,ots_client):
+        a = ''
         for l_a in a.split("\n"):
             l_a = l_a.strip()
             if l_a !="":
@@ -266,8 +267,8 @@ def turn_document_tmp_status():
         bool_query = BoolQuery(
             must_queries=[
                 # TermQuery("fingerprint","md5=2cc044b81ec13acddcc970b71b780365")
-                TermQuery("save",1),
-                RangeQuery("status",72),
+                # TermQuery("save",66),
+                RangeQuery("status",66),
                 # BoolQuery(should_queries=[
                 #                           # TermQuery("tenderee","山西利民工业有限责任公司"),
                 #                           # MatchPhraseQuery("doctitle","中国电信"),
@@ -280,11 +281,11 @@ def turn_document_tmp_status():
                 #                                  ]
                 # )
             ],
-            # must_not_queries=[
-            #     TermQuery("docid",288599518)
-            #     # ExistsQuery("status"),
-            #     # ExistsQuery("page_time"),
-            #                   ]
+            must_not_queries=[
+                # TermQuery("docid",288599518)
+                ExistsQuery("doctitle"),
+                # ExistsQuery("page_time"),
+                              ]
         )
 
         rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
@@ -297,6 +298,7 @@ def turn_document_tmp_status():
         for _data in list_data:
             _document = Document_tmp(_data)
             task_queue.put(_document)
+        print(list_data)
         while next_token:
             rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                            SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
@@ -355,16 +357,16 @@ def turn_document_tmp_status():
         # print(item.getProperties())
         # item.update_row(ots_client)
         # log("update %d status done"%(item.getProperties().get(document_tmp_docid)))
-        # item.delete_row(ots_client)
-        from BaseDataMaintenance.model.ots.document import Document
-
-        Doc = Document(item.getProperties())
-        if Doc.fix_columns(ots_client,["status"],True):
-            if Doc.getProperties().get("status",0)>=401:
-                print(Doc.getProperties().get("docid"),"redo")
-                item.setValue("status",66,True)
-                item.update_row(ots_client)
-        pass
+        item.delete_row(ots_client)
+        # from BaseDataMaintenance.model.ots.document import Document
+        #
+        # Doc = Document(item.getProperties())
+        # if Doc.fix_columns(ots_client,["status"],True):
+        #     if Doc.getProperties().get("status",0)>=401:
+        #         print(Doc.getProperties().get("docid"),"redo")
+        #         item.setValue("status",66,True)
+        #         item.update_row(ots_client)
+        # pass
 
     t_producer = Thread(target=producer,kwargs={"task_queue":task_queue,"ots_client":ots_client})
     t_producer.start()