Extraction flow no longer makes an extra call to the industry classification interface; optimize proposed-building (拟在建) data ingestion

luojiehua 2 years ago
parent
commit
b5a35ca598

+ 7 - 7
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -619,13 +619,13 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
                 else:
                     log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
                     all_done = -2
-            if all_done>0:
-                resp = self.request_industry_interface(json=data,headers=self.header)
-                if (resp.status_code >=200 and resp.status_code<=213):
-                    _extract.setValue(document_extract2_industry_json,resp.content.decode("utf8"),True)
-                else:
-                    log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
-                    all_done = -3
+            # if all_done>0:
+            #     resp = self.request_industry_interface(json=data,headers=self.header)
+            #     if (resp.status_code >=200 and resp.status_code<=213):
+            #         _extract.setValue(document_extract2_industry_json,resp.content.decode("utf8"),True)
+            #     else:
+            #         log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
+            #         all_done = -3
             _to_ack = False
             if all_done>0 and len(_extract.getProperties().get(document_extract2_extract_json,""))<=2 or len(_extract.getProperties().get(document_extract2_industry_json,""))<=2:
                 all_done = -4
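
Note: with the industry-interface call commented out, document_extract2_industry_json is no longer populated in this block, yet the retained guard above still tests its length, so all_done drops to -4 unless industry_json is filled in elsewhere in the pipeline. Python's `and` also binds tighter than `or`, so the industry_json check stands alone. A minimal sketch (not from the repo, values hypothetical) of how that condition evaluates:

    # Hypothetical values; illustrates operator precedence only.
    all_done = 1
    extract_json = '{"code":1,"success":true}'  # len > 2, left conjunct is False
    industry_json = ""                          # never set once the call is skipped

    # Parses as: (all_done > 0 and len(extract_json) <= 2) or len(industry_json) <= 2
    if all_done > 0 and len(extract_json) <= 2 or len(industry_json) <= 2:
        all_done = -4
    print(all_done)  # -4: the empty industry_json alone trips the guard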

+ 64 - 0
BaseDataMaintenance/maintenance/proposedBuilding/dataRepair.py

@@ -0,0 +1,64 @@
+
+
+from tablestore import *
+from BaseDataMaintenance.model.ots.designed_project import designed_project
+
+from BaseDataMaintenance.dataSource.source import getConnect_ots
+from BaseDataMaintenance.common.Utils import *
+from queue import Queue
+
+from BaseDataMaintenance.common.multiThread import MultiThreadHandler
+
+import json
+
+def turnPageTime():
+    ots_client = getConnect_ots()
+
+    task_queue = Queue()
+
+    bool_query = BoolQuery(must_queries=[RangeQuery("update_time","2022-07-22")])
+    rows,next_token,total_count,is_all_succeed = ots_client.search("designed_project","designed_project_index",
+                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("id")]),limit=100,get_total_count=True),
+                                                                   columns_to_get=ColumnsToGet(["update_time","page_time","follows"],return_type=ColumnReturnType.SPECIFIED))
+    list_data = getRow_ots(rows)
+    for _data in list_data:
+        task_queue.put(_data)
+    _count = len(list_data)
+    log("count/total_count:%d/%d"%(_count,total_count))
+
+
+    while next_token:
+        rows,next_token,total_count,is_all_succeed = ots_client.search("designed_project","designed_project_index",
+                                                                       SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                       columns_to_get=ColumnsToGet(["update_time","page_time","follows"],return_type=ColumnReturnType.SPECIFIED))
+        list_data = getRow_ots(rows)
+        for _data in list_data:
+            task_queue.put(_data)
+
+        _count += len(list_data)
+        log("count/total_count:%d/%d"%(_count,total_count))
+
+    def _handle(item,result_queue):
+
+
+        list_follow = json.loads(item.get("follows","[]"))
+
+        item.pop("follows")
+        dp = designed_project(item)
+        if len(list_follow)==0:
+            dp.setValue("page_time",dp.getProperties().get("update_time"))
+        else:
+            list_follow.sort(key=lambda x:x.get("mdate",0),reverse=True)
+            _page_time = list_follow[0].get("mdate")[:10]
+            dp.setValue("page_time",_page_time)
+
+
+
+        dp.update_row(ots_client)
+    mt = MultiThreadHandler(task_queue,_handle,None,30)
+    mt.run()
+
+
+
+if __name__ == '__main__':
+    turnPageTime()
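
One caveat in _handle: the sort key x.get("mdate", 0) falls back to an int while mdate values are strings (the [:10] slice implies "YYYY-MM-DD..." values), so under Python 3 a follow record without mdate would raise TypeError during the sort. A hardening sketch, not part of the commit, assuming string-typed mdate values:

    # Sketch only: keep the sort key homogeneous and guard the slice.
    list_follow.sort(key=lambda x: x.get("mdate", ""), reverse=True)
    _page_time = (list_follow[0].get("mdate") or "")[:10]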

+ 12 - 115
BaseDataMaintenance/model/ots/document_tmp.py

@@ -167,7 +167,7 @@ def turn_extract_status():
         print(total_count)
         _count = len(list_data)
         for _data in list_data:
-            _document = Document(_data)
+            _document = Document_tmp(_data)
             task_queue.put(_document)
         while next_token:
             rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_tmp_index",
@@ -177,7 +177,7 @@ def turn_extract_status():
             _count += len(list_data)
             print("%d/%d"%(_count,total_count))
             for _data in list_data:
-                _document = Document(_data)
+                _document = Document_tmp(_data)
                 task_queue.put(_document)
 
     def _handle(item,result_queue,ots_client):
@@ -259,27 +259,29 @@ def turn_document_tmp_status():
             #                                      ]
             #     )
             # ],
-            must_not_queries=[ExistsQuery("fingerprint")]
+            must_not_queries=[ExistsQuery("status"),
+                              ExistsQuery("page_time"),
+                              ]
         )
 
-        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_tmp_index",
+        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                        SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
                                                                        columns_to_get=ColumnsToGet(["extract_json"],return_type=ColumnReturnType.SPECIFIED))
         list_data = getRow_ots(rows)
         print(total_count)
         _count = len(list_data)
         for _data in list_data:
-            _document = Document(_data)
+            _document = Document_tmp(_data)
             task_queue.put(_document)
         while next_token:
-            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_tmp_index",
+            rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                            SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                            columns_to_get=ColumnsToGet(["extract_json"],return_type=ColumnReturnType.SPECIFIED))
             list_data = getRow_ots(rows)
             _count += len(list_data)
             print("%d/%d"%(_count,total_count))
             for _data in list_data:
-                _document = Document(_data)
+                _document = Document_tmp(_data)
                 task_queue.put(_document)
 
         # docids = [223820830,224445409]
@@ -318,114 +320,9 @@ def turn_document_tmp_status():
 
         #change status
         # item.setValue(document_tmp_docchannel,item.getProperties().get(document_tmp_original_docchannel),True)
-        item.setValue(document_tmp_status,random.randint(151,171),True)
-        item.update_row(ots_client)
-        log("update %d status done"%(item.getProperties().get(document_tmp_docid)))
-        pass
-
-
-    t_producer = Thread(target=producer,kwargs={"task_queue":task_queue,"ots_client":ots_client})
-    t_producer.start()
-    t_producer.join()
-    mt = MultiThreadHandler(task_queue,_handle,None,30,ots_client=ots_client)
-    mt.run()
-
-def drop_extract2():
-    from BaseDataMaintenance.dataSource.source import getConnect_ots
-    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
-    import queue
-    from threading import Thread
-    import json
-    task_queue = queue.Queue()
-    from BaseDataMaintenance.model.ots.attachment import attachment_filemd5,attachment_file_title,attachment_file_link
-    ots_client = getConnect_ots()
-    from BaseDataMaintenance.model.ots.document_tmp_extract2 import document_tmp_extract2
-
-    def producer(task_queue,ots_client):
-
-
-        bool_query = BoolQuery(must_queries=[
-            BoolQuery(should_queries=[
-                # TermQuery("tenderee","山西利民工业有限责任公司"),
-                # MatchPhraseQuery("doctitle","中国电信"),
-                # MatchPhraseQuery("doctextcon","中国电信"),
-                # MatchPhraseQuery("attachmenttextcon","中国电信")]),
-                RangeQuery("status",1,1000,True,True),
-                # RangeQuery("page_time","2021-12-20","2022-01-05",True,False),
-                #,TermQuery(document_tmp_docid,171146519)
-                ]
-            ),
-            # TermQuery("docid",228359000)
-        ],
-            # must_not_queries=[NestedQuery("sub_docs_json",WildcardQuery("sub_docs_json.win_tenderer","*"))]
-        )
-
-        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp_extract2","document_tmp_extract2_index",
-                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
-                                                                       columns_to_get=ColumnsToGet(["status"],return_type=ColumnReturnType.SPECIFIED))
-        list_data = getRow_ots(rows)
-        print(total_count)
-        _count = len(list_data)
-        for _data in list_data:
-            task_queue.put(_data)
-        while next_token:
-            rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp_extract2","document_tmp_extract2_index",
-                                                                           SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
-                                                                           columns_to_get=ColumnsToGet(["status"],return_type=ColumnReturnType.SPECIFIED))
-            list_data = getRow_ots(rows)
-            _count += len(list_data)
-            print("%d/%d"%(_count,total_count))
-            for _data in list_data:
-                task_queue.put(_data)
-
-        # docids = [223820830,224445409]
-        # for docid in docids:
-        #     _dict = {document_tmp_docid:int(docid),
-        #              document_tmp_partitionkey:int(docid)%500+1,
-        #              }
-        #     task_queue.put(Document(_dict))
-        # import pandas as pd
-        # df = pd.read_excel("2022-01-19_214304_export11.xlsx")
-        # for docid,tenderee,win in zip(df["docid"],df["招标单位"],df["中标单位"]):
-        #     if not isinstance(tenderee,(str)) or not isinstance(win,(str)) or win=="" or tenderee=="":
-        #         # print(docid)
-        #         _dict = {document_tmp_docid:int(docid),
-        #                  document_tmp_partitionkey:int(docid)%500+1,
-        #                  }
-        #         task_queue.put(Document(_dict))
-        log("task_queue size:%d"%(task_queue.qsize()))
-
-    def _handle(item,result_queue,ots_client):
-        #change attach value
-        # list_attachment = json.loads(item.getProperties().get(document_tmp_attachment_path))
-        # print("docid",item.getProperties().get(document_tmp_docid))
-        # for attach in list_attachment:
-        #
-        #     filemd5 = attach.get(document_tmp_attachment_path_filemd5,"")
-        #     _document_tmp_html = item.getProperties().get(document_tmp_dochtmlcon,"")
-        #
-        #     _file_title = item.getTitleFromHtml(filemd5,_document_tmp_html)
-        #     filelink = item.getSourceLinkFromHtml(filemd5,_document_tmp_html)
-        #     attach[document_tmp_attachment_path_fileTitle] = _file_title
-        #     attach[document_tmp_attachment_path_fileLink] = filelink
-        #
-        # item.setValue(document_tmp_attachment_path,json.dumps(list_attachment,ensure_ascii=False),True)
-        # item.all_columns.remove(document_tmp_dochtmlcon)
-
-        #change status
-        # item.setValue(document_tmp_docchannel,item.getProperties().get(document_tmp_original_docchannel),True)
-        # item.setValue(document_tmp_status,random.randint(151,170),True)
+        # item.setValue(document_tmp_status,random.randint(151,171),True)
         # item.update_row(ots_client)
         # log("update %d status done"%(item.getProperties().get(document_tmp_docid)))
-        _dict = {}
-        _dict.update(item)
-        _dict.pop("status")
-        _dict["status"] = 1
-        print(_dict)
-        _document = Document(_dict)
-        _document.update_row(ots_client)
-        _d_extract = document_tmp_extract2(_dict)
-        _d_extract.delete_row(ots_client)
         pass
 
 
@@ -436,7 +333,7 @@ def drop_extract2():
     mt.run()
 
 
+
 if __name__=="__main__":
     # turn_extract_status()
-    turn_document_tmp_status()
-    # drop_extract2()
+    turn_document_tmp_status()
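
These hunks fix a table/model mismatch: the queries now target the document_tmp table and wrap each row in Document_tmp rather than Document. With every mutation in _handle now commented out, running turn_document_tmp_status only drains the queue. The producers here and in dataRepair.py repeat the same OTS paging idiom; a minimal generator sketch of that pattern, assuming only the tablestore calls and helpers already used above:

    def iter_search(ots_client, table, index, bool_query, columns):
        # First page: sorted query; later pages: follow next_token.
        columns_to_get = ColumnsToGet(columns, return_type=ColumnReturnType.SPECIFIED)
        rows, next_token, total_count, _ = ots_client.search(
            table, index,
            SearchQuery(bool_query, sort=Sort(sorters=[FieldSort("docid", SortOrder.DESC)]),
                        limit=100, get_total_count=True),
            columns_to_get=columns_to_get)
        yield from getRow_ots(rows)
        while next_token:
            rows, next_token, total_count, _ = ots_client.search(
                table, index,
                SearchQuery(bool_query, next_token=next_token, limit=100, get_total_count=True),
                columns_to_get=columns_to_get)
            yield from getRow_ots(rows)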

+ 2 - 2
BaseDataMaintenance/model/ots/proposedBuilding_tmp.py

@@ -452,7 +452,7 @@ class proposedBuilding_tmp(BaseModel):
                 project_name = _group.get("project_name","")
                 high_project_name = project_name
                 ordinary_name = project_name
-            if page_time =="":
+            if _group.get("page_time","")>page_time:
                 page_time = _group.get("page_time","")
             if project_type=="":
                 project_type = _group.get("industry","")
@@ -480,7 +480,7 @@ class proposedBuilding_tmp(BaseModel):
                         "crtime":crtime,"floor_space":floor_space,"project_address":project_address,
                         "begintime":begintime,"endtime":endtime,"project_description":project_description,
                         "project_name":project_name,"ordinary_name":ordinary_name,"high_project_name":high_project_name,
-                        "project_follow":project_follow,"page_time":page_time,"progress":progress,"contacts":json.dumps(legal_contacts,ensure_ascii=False),
+                        "project_follow":project_follow,"page_time":update_time,"progress":progress,"contacts":json.dumps(legal_contacts,ensure_ascii=False),
                         "follows":json.dumps(list_follows,ensure_ascii=False),
                         "docids":",".join(list(set_docid)),"spids":",".join(list(set_spid)),"des_project_type":des_project_type,"status":status,"covered_area":covered_area,
                         "update_status":update_status,"update_time":update_time,"project_code":project_code,