瀏覽代碼

增加提取失败队列,提取重试2次

luojiehua 2 年之前
父節點
當前提交
20a81901bc

+ 49 - 12
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -525,6 +525,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
 
 
         self.mq_extract = "/queue/dataflow_extract"
+        self.mq_extract_failed = "/queue/dataflow_extract_failed"
 
         whole_weight = 0
         for _url,weight in self.extract_interfaces:
@@ -587,6 +588,9 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
             dhtml = Document_html({"partitionkey":item.get("partitionkey"),
                                    "docid":item.get("docid")})
 
+            extract_times = item.get("extract_times",0)+1
+            item["extract_times"] = extract_times
+
             _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
 
             if len(_dochtmlcon)>200000:
@@ -633,19 +637,35 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
             try:
                 if all_done!=1:
                     sentMsgToDD("要素提取失败:docid:%d with result:%d"%(item.get(document_tmp_docid),all_done))
-                    send_succeed = send_msg_toacmq(self.pool_mq,frame.body,self.mq_extract)
+                    if extract_times>2:
+                        #transform to the extract_failed queue
+                        if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
+                            #process as succeed
+                            dtmp.setValue(document_tmp_dochtmlcon,"",False)
+                            dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
+                            dtmp.update_row(self.ots_client)
+                            dhtml.update_row(self.ots_client)
+
+                            #replace as {}
+                            _extract.setValue(document_extract2_extract_json,"{}",True)
+                            _extract.setValue(document_extract2_industry_json,"{}",True)
+                            _extract.setValue(document_extract2_status,random.randint(1,50),True)
+                            _extract.update_row(self.ots_client)
+                            _to_ack = True
+                    else:
 
+                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract)
 
-                    #失败保存
-                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
-                    dtmp.setValue(document_tmp_status,60,True)
-                    if not dtmp.exists_row(self.ots_client):
-                        dtmp.update_row(self.ots_client)
-                        dhtml.update_row(self.ots_client)
-                    if send_succeed:
-                        _to_ack = True
+                        #失败保存
+                        dtmp.setValue(document_tmp_dochtmlcon,"",False)
+                        dtmp.setValue(document_tmp_status,60,True)
+                        if not dtmp.exists_row(self.ots_client):
+                            dtmp.update_row(self.ots_client)
+                            dhtml.update_row(self.ots_client)
+                        if send_succeed:
+                            _to_ack = True
                 else:
-
+                    #process succeed
                     dtmp.setValue(document_tmp_dochtmlcon,"",False)
                     dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
                     dtmp.update_row(self.ots_client)
@@ -661,9 +681,26 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
             log("process %s docid:%d %s"%(str(_to_ack),data["doc_id"],str(all_done)))
         except Exception as e:
             traceback.print_exc()
+            sentMsgToDD("要素提取失败:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e)))
             log("process %s docid: failed message_id:%s"%(data["doc_id"],message_id))
-            if send_msg_toacmq(self.pool_mq,frame.body,self.mq_extract):
-                ackMsg(conn,message_id,subscription)
+            if extract_times>2:
+                #transform to the extract_failed queue
+                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract_failed):
+                    #process as succeed
+                    dtmp.setValue(document_tmp_dochtmlcon,"",False)
+                    dtmp.setValue(document_tmp_status,random.randint(*flow_extract_status_succeed_to),True)
+                    dtmp.update_row(self.ots_client)
+                    dhtml.update_row(self.ots_client)
+                    #replace as {}
+                    _extract.setValue(document_extract2_extract_json,"{}",True)
+                    _extract.setValue(document_extract2_industry_json,"{}",True)
+                    _extract.setValue(document_extract2_status,random.randint(1,50),True)
+                    _extract.update_row(self.ots_client)
+                    ackMsg(conn,message_id,subscription)
+            else:
+                #transform to the extract queue
+                if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
+                    ackMsg(conn,message_id,subscription)
 
 
     def start_flow_extract(self):

+ 80 - 0
BaseDataMaintenance/maintenance/document_extract/fixDocFromTmp.py

@@ -0,0 +1,80 @@
+
+
+from tablestore import *
+from BaseDataMaintenance.dataSource.source import getConnect_ots,getConnect_ots_capacity
+
+from BaseDataMaintenance.model.ots.document import Document
+from BaseDataMaintenance.model.ots.document_tmp import Document_tmp
+from BaseDataMaintenance.model.ots.document_html import Document_html
+
+from BaseDataMaintenance.common.Utils import *
+from BaseDataMaintenance.common.multiThread import MultiThreadHandler
+
+from queue import Queue
+
+def fixDoc():
+    """Repair job: find rows in `document_tmp` created in a fixed crtime window
+    and, for any docid missing from the main `document` table, re-create the
+    document row (plus its html body) from the tmp copy.
+
+    NOTE(review): the crtime window is hard-coded — this is a one-off backfill
+    script, re-run with an edited range as needed.
+    """
+    # Fixed one-hour window of documents to inspect.
+    bool_query = BoolQuery(must_queries=[RangeQuery("crtime","2022-08-02 17:00:00","2022-08-02 18:00:00")])
+    ots_client =getConnect_ots()
+    # Separate (capacity) instance holds the large html bodies.
+    capacity_client =getConnect_ots_capacity()
+    # First page, sorted by docid so next_token paging is stable.
+    rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
+                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
+                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
+    task_queue = Queue()
+    list_data = getRow_ots(rows)
+
+    for _data in list_data:
+        task_queue.put(_data)
+    # Drain the remaining pages via next_token until the search is exhausted.
+    while next_token:
+        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
+                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
+                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
+        list_data = getRow_ots(rows)
+
+        for _data in list_data:
+            task_queue.put(_data)
+        # if task_queue.qsize()>4000:
+        #     break
+        # Progress: queued / total matched.
+        log("%d/%d"%(task_queue.qsize(),total_count))
+
+    def _handle(item,result_queue):
+        # Worker: re-save one tmp row into `document` if it is missing there.
+        docid = item.get("docid")
+        # Model bookkeeping key must not leak into the row payload.
+        if "all_columns" in item:
+            item.pop("all_columns")
+        _d = Document(item)
+        _exists = _d.exists_row(ots_client)
+
+        if not _exists:
+            # Mark freshly restored rows with status 1 (re-process downstream).
+            _d.setValue("status",1,True)
+            _d_html = Document_html({"partitionkey":_d.getProperties().get("partitionkey"),
+                                "docid":_d.getProperties().get("docid")})
+            # Pull the html body from OTS into the model before re-writing it.
+            _d_html.fix_columns(ots_client,["dochtmlcon"],True)
+
+            # Strip model-internal keys so only real columns are written back.
+            _dict_d = _d.getProperties()
+            listk = []
+            for k,v in _dict_d.items():
+                if k in ("COLUMN_MAX_SIZE","all_columns","table_name","prefixs"):
+                    listk.append(k)
+            for k in listk:
+                _dict_d.pop(k)
+
+            # For the html row keep only the primary key plus the body column.
+            listk = []
+            _dict_html = _d_html.getProperties()
+            for k,v in _dict_html.items():
+                if k not in ("partitionkey","docid","dochtmlcon"):
+                    listk.append(k)
+            for k in listk:
+                _dict_html.pop(k)
+
+            # Document row goes to the main instance; the html payload goes to
+            # the capacity instance.
+            # NOTE(review): the html dict is wrapped in Document (not
+            # Document_html) — presumably both models share the same table
+            # mapping on the capacity side; verify before reuse.
+            _d1 = Document(_dict_d)
+            _d1.update_row(ots_client)
+            _d2 = Document(_dict_html)
+            _d2.update_row(capacity_client)
+
+            log("docid:%s not exists"%(str(docid)))
+
+    # 30 worker threads drain the queue concurrently.
+    mt = MultiThreadHandler(task_queue,_handle,None,30)
+    mt.run()
+
+
+if __name__ == '__main__':
+    fixDoc()

+ 6 - 6
BaseDataMaintenance/maintenance/proposedBuilding/DataSynchronization.py

@@ -96,13 +96,13 @@ class DataSynchronization():
 
         task_queue = queue.Queue()
 
-        self.producer(task_queue)
+        # self.producer(task_queue)
 
-        # _dict = {"uuid":"12313","crtime":123,
-        #                 "json_list_group":'''
-        #                 [{"docid": 254681903, "shenpi_id": null, "type_id": "254681903", "page_time": "2022-07-25", "province": "云南", "city": null, "district": null, "tenderee": "景谷傣族彝族自治县永平镇人民政府", "tenderee_contact": "", "tenderee_phone": "0879-5311786", "agency": "云南赛林工程管理咨询有限公司", "project_code": "云赛招字2022-245", "project_name": "永平镇芒腊村芒腊组组内道路建设及基础设施建设扶持项目", "doctitle": "云赛招字2022-245:景谷傣族彝族自治县永平镇人民政府永平镇芒腊村芒腊组组内道路建设及基础设施建设扶持项目施工竞争性磋商成交公告", "docchannel": "101", "stage": "施工在建", "proportion": "全长1959.8m", "projectDigest": "标的信息,工程类:标段名称:永平镇芒腊村芒腊组组内道路建设及基础设施建设扶持项目。工程类:名称:永平镇芒腊村芒腊组组内道路建设及基础设施建设扶持项目。工程类:施工工期:90日历天", "projectAddress": null, "begin_time": null, "end_time": null, "project_name_refind": "永平镇芒腊村芒腊组组内道路建设及基础设施建设扶持项目", "industry": "司法建筑", "new_enough": 1, "follow_enough": 1}]
-        #                 '''}
-        # task_queue.put(proposedBuilding_tmp(_dict))
+        _dict = {"uuid":"12313","crtime":123,
+                        "json_list_group":'''
+                        [{"docid": 218170163, "shenpi_id": null, "type_id": "218170163", "page_time": "2022-01-26", "province": "广东", "city": "深圳", "district": "罗湖", "tenderee": "深圳市城市建设开发(集团)有限公司", "tenderee_contact": "孙工", "tenderee_phone": "18998998087", "agency": "", "project_code": null, "project_name": "成都市白鹭湾(住宅用地)项目可行性研究", "doctitle": "成都市白鹭湾(住宅用地)项目可行性研究服务招标公告", "docchannel": "52", "stage": "可研阶段", "proportion": "建筑面积为32388.83㎡", "projectDigest": "招标信息),六、开标地点:深圳市罗湖区桂园街道滨河东路1011号鹿丹大厦12层会议室;(鉴于疫情原因,投标人自行决定是否到开标现场开标;如投标人需要到开标现场,请投标人关注、执行深圳市关于疫情的相关规定,并提前2天与招标人进行沟通协商。),七、投标人资格标准等详细内容详见招标文件。招标联系人:孙工;联系电话:18998998087;邮箱:sundh@szcjjt.com;张工;联系电话:13928429353;邮箱:zli@szcjjt.com", "projectAddress": null, "begin_time": null, "end_time": null, "project_name_refind": "成都市白鹭湾(住宅用地)项目可行性研究", "industry": "办公楼", "location": null, "section_num": "-1", "new_enough": 1, "follow_enough": 1}]
+                        '''}
+        task_queue.put(proposedBuilding_tmp(_dict))
 
         self.comsumer(task_queue)
 

+ 1 - 1
BaseDataMaintenance/maintenance/proposedBuilding/dataRepair.py

@@ -16,7 +16,7 @@ def turnPageTime():
 
     task_queue = Queue()
 
-    bool_query = BoolQuery(must_queries=[RangeQuery("update_time","2022-07-22")])
+    bool_query = BoolQuery(must_queries=[RangeQuery("update_time","2022-08-03")])
     rows,next_token,total_count,is_all_succeed = ots_client.search("designed_project","designed_project_index",
                                                                    SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("id")]),limit=100,get_total_count=True),
                                                                    columns_to_get=ColumnsToGet(["update_time","page_time","follows"],return_type=ColumnReturnType.SPECIFIED))

+ 5 - 2
BaseDataMaintenance/model/ots/proposedBuilding_tmp.py

@@ -416,13 +416,14 @@ class proposedBuilding_tmp(BaseModel):
         high_project_name = ""
         ordinary_name = ""
         project_follow = ""
-        page_time = ""
+
         progress = ""
         des_project_type = "30"
         status = "1"
 
         update_status = "0"
         update_time = getCurrent_date("%Y-%m-%d")
+        page_time = ""
         project_code = ""
         project_type = ""
         area = ""
@@ -469,6 +470,8 @@ class proposedBuilding_tmp(BaseModel):
             #     _version_index += 1
             progress = str(self.getStage(_group.get("stage","")))
 
+        if page_time=="":
+            page_time = update_time
         legal_contacts = []
         list_follows = self.getFollows(list_group,legal_contacts,set_enterprise)
         self.getContacts(ots_client,list_group,set_enterprise,legal_contacts)
@@ -480,7 +483,7 @@ class proposedBuilding_tmp(BaseModel):
                         "crtime":crtime,"floor_space":floor_space,"project_address":project_address,
                         "begintime":begintime,"endtime":endtime,"project_description":project_description,
                         "project_name":project_name,"ordinary_name":ordinary_name,"high_project_name":high_project_name,
-                        "project_follow":project_follow,"page_time":update_time,"progress":progress,"contacts":json.dumps(legal_contacts,ensure_ascii=False),
+                        "project_follow":project_follow,"page_time":page_time,"progress":progress,"contacts":json.dumps(legal_contacts,ensure_ascii=False),
                         "follows":json.dumps(list_follows,ensure_ascii=False),
                         "docids":",".join(list(set_docid)),"spids":",".join(list(set_spid)),"des_project_type":des_project_type,"status":status,"covered_area":covered_area,
                         "update_status":update_status,"update_time":update_time,"project_code":project_code,