
Element extraction: for duplicate documents already stored, read the extract result directly from the database to speed up extraction

luojiehua committed 2 years ago · commit 2f10a1211c
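
The core idea of the commit: before calling the extract interface, compute a fingerprint over title+content and look it up in a Postgres cache table; only on a miss is the interface called, and the fresh result is written back for the next duplicate. A minimal sketch of that pattern, assuming a psycopg2-style connection and an illustrative document_extract(fingerprint, docid, extract_json) table (the repo's actual access goes through the Document_extract_postgres model and getFingerprint, shown in the diff below):

    import hashlib

    def get_fingerprint(title, content):
        # Assumed stand-in for the repo's getFingerprint(): MD5 over title+content.
        return hashlib.md5((str(title) + str(content)).encode("utf8")).hexdigest()

    def extract_with_cache(conn, data, call_interface):
        fp = get_fingerprint(data.get("title", ""), data.get("content", ""))
        cur = conn.cursor()
        cur.execute("SELECT extract_json FROM document_extract WHERE fingerprint=%s", (fp,))
        row = cur.fetchone()
        if row is not None:
            return row[0]                    # cache hit: skip the extract interface
        extract_json = call_interface(data)  # cache miss: run the real extraction
        cur.execute("INSERT INTO document_extract (fingerprint, docid, extract_json) "
                    "VALUES (%s, %s, %s)", (fp, data.get("doc_id", 0), extract_json))
        conn.commit()
        return extract_json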

+ 1 - 1
BaseDataMaintenance/dataSource/setttings.py

@@ -66,7 +66,7 @@ activateMQ_pswd = "admin"
 
 
 # attach_postgres_host = "121.46.18.113"
-attach_postgres_host = "127.0.0.1"
+# attach_postgres_host = "127.0.0.1"
 attach_postgres_host = "192.168.0.114"
 attach_postgres_port = 5432
 attach_postgres_user = "postgres"

+ 1 - 1
BaseDataMaintenance/maintenance/dataflow.py

@@ -2843,4 +2843,4 @@ if __name__ == '__main__':
     # test_attachment_interface()
     df_dump = Dataflow_dumplicate()
     # df_dump.start_flow_dumplicate()
-    df_dump.test_dumplicate(25126084)
+    df_dump.test_dumplicate(258925781)

+ 66 - 13
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -12,6 +12,8 @@ from BaseDataMaintenance.dataSource.pool import ConnectorPool
 from BaseDataMaintenance.model.ots.document import Document
 
 from BaseDataMaintenance.common.Utils import article_limit
+from BaseDataMaintenance.common.documentFingerprint import getFingerprint
+from BaseDataMaintenance.model.postgres.document_extract import *
 
 class ActiveMQListener():
 
@@ -523,33 +525,45 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
 
         self.industy_url = "http://127.0.0.1:15000/industry_extract"
 
-        self.extract_interfaces = [["http://127.0.0.1:15030/content_extract",15],
-                                   ["http://192.168.0.115:15030/content_extract",9]
+        self.extract_interfaces = [["http://127.0.0.1:15030/content_extract",17],
+                                   ["http://192.168.0.115:15030/content_extract",3]
                                    ]
 
 
+
         self.mq_extract = "/queue/dataflow_extract"
         self.mq_extract_failed = "/queue/dataflow_extract_failed"
 
-        whole_weight = 0
+        self.whole_weight = 0
         for _url,weight in self.extract_interfaces:
-            whole_weight+= weight
+            self.whole_weight+= weight
         current_weight = 0
         for _i in range(len(self.extract_interfaces)):
             current_weight += self.extract_interfaces[_i][1]
-            self.extract_interfaces[_i][1] = current_weight/whole_weight
+            self.extract_interfaces[_i][1] = current_weight/self.whole_weight
 
 
+        self.block_url = RLock()
+        self.url_count = 0
 
         self.comsumer_count = 20
         for _i in range(self.comsumer_count):
             listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle)
             createComsumer(listener_extract,self.mq_extract)
+        self.pool_postgres = ConnectorPool(10,self.comsumer_count,getConnection_postgres)
         self.conn_mq = getConnect_activateMQ()
         self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
 
+
     def getExtract_url(self):
-        _r = random.random()
+        _url_num = 0
+        with self.block_url:
+            self.url_count += 1
+            self.url_count %= self.whole_weight
+            _url_num = self.url_count
+
+        # _r = random.random()
+        _r = _url_num/self.whole_weight
         for _i in range(len(self.extract_interfaces)):
             if _r<=self.extract_interfaces[_i][1]:
                 return self.extract_interfaces[_i][0]
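
The hunk above also replaces random weighted URL selection with a deterministic counter: getExtract_url increments a lock-protected counter, takes it modulo the summed weights, and maps the result through the same cumulative thresholds, so consecutive requests are spread across the interfaces roughly in proportion to their weights. A standalone sketch of that scheme (class and variable names are illustrative):

    from threading import RLock

    class WeightedRoundRobin:
        def __init__(self, interfaces):
            # interfaces: list of [url, weight]; precompute cumulative thresholds in (0, 1].
            self.whole_weight = sum(w for _, w in interfaces)
            self.thresholds = []
            acc = 0
            for url, w in interfaces:
                acc += w
                self.thresholds.append((url, acc / self.whole_weight))
            self.lock = RLock()
            self.count = 0

        def next_url(self):
            with self.lock:
                self.count = (self.count + 1) % self.whole_weight
                r = self.count / self.whole_weight
            for url, threshold in self.thresholds:
                if r <= threshold:
                    return url
            return self.thresholds[-1][0]

    urls = WeightedRoundRobin([["http://127.0.0.1:15030/content_extract", 17],
                               ["http://192.168.0.115:15030/content_extract", 3]])
    # Over any 20 consecutive calls the first URL is returned ~17 times and the
    # second ~3 times (off by one at the boundary, as in the original <= test).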
@@ -559,6 +573,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
         # _i = 0
         # _url = self.extract_interfaces[_i]
         _url = self.getExtract_url()
+        log("extract_url:%s"%(str(_url)))
         resp = requests.post(_url,json=json,headers=headers)
         return resp
 
@@ -604,9 +619,31 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
         self.comsumer()
 
     def comsumer(self):
-        mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,50,1,True)
+        mt = MultiThreadHandler(self.queue_extract,self.comsumer_handle,None,self.comsumer_count,1,True)
         mt.run()
 
+
+    def getExtract_json_fromDB(self,_fingerprint):
+        conn = self.pool_postgres.getConnector()
+        try:
+            list_extract = Document_extract_postgres.select_rows(conn,Document_extract_postgres,"document_extract",[" fingerprint='%s'"%_fingerprint])
+            if len(list_extract)>0:
+                _extract = list_extract[0]
+                return _extract.getProperties().get(document_extract_extract_json)
+        except Exception as e:
+            traceback.print_exc()
+        finally:
+            self.pool_postgres.putConnector(conn)
+        return None
+
+    def putExtract_json_toDB(self,fingerprint,docid,extract_json):
+        _de = Document_extract_postgres({document_extract_fingerprint:fingerprint,
+                                         document_extract_docid:docid,
+                                         document_extract_extract_json:extract_json})
+        _de.insert_row(self.pool_postgres,1)
+
+
+
     def comsumer_handle(self,_dict,result_queue):
         try:
             frame = _dict["frame"]
@@ -640,20 +677,32 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
             for k,v in item.items():
                 data[k] = v
             data["timeout"] = 440
-            data["doc_id"] = data.get(document_tmp_docid)
+            data["doc_id"] = data.get(document_tmp_docid,0)
             data["content"] = data.get(document_tmp_dochtmlcon,"")
             if document_tmp_dochtmlcon in data:
                 data.pop(document_tmp_dochtmlcon)
             data["title"] = data.get(document_tmp_doctitle,"")
             data["web_source_no"] = item.get(document_tmp_web_source_no,"")
             data["original_docchannel"] = item.get(document_tmp_original_docchannel,"")
+
+            _fingerprint = getFingerprint(str(data["title"])+str(data["content"]))
+
             if all_done>0:
-                resp = self.request_extract_interface(json=data,headers=self.header)
-                if (resp.status_code >=200 and resp.status_code<=213):
-                    _extract.setValue(document_extract2_extract_json,resp.content.decode("utf8"),True)
+                extract_json = self.getExtract_json_fromDB(_fingerprint)
+                # extract_json = None
+                _docid = int(data["doc_id"])
+                if extract_json is not None:
+                    log("fingerprint %s exists docid:%s"%(_fingerprint,str(_docid)))
+                    _extract.setValue(document_extract2_extract_json,extract_json,True)
                 else:
-                    log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
-                    all_done = -2
+                    resp = self.request_extract_interface(json=data,headers=self.header)
+                    if (resp.status_code >=200 and resp.status_code<=213):
+                        extract_json = resp.content.decode("utf8")
+                        _extract.setValue(document_extract2_extract_json,extract_json,True)
+                        self.putExtract_json_toDB(_fingerprint,_docid,extract_json)
+                    else:
+                        log("%s--%s"%(str(resp.status_code),resp.content.decode("utf8")))
+                        all_done = -2
             # if all_done>0:
             #     resp = self.request_industry_interface(json=data,headers=self.header)
             #     if (resp.status_code >=200 and resp.status_code<=213):
@@ -723,6 +772,10 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
             if _to_ack:
                 ackMsg(conn,message_id,subscription)
             log("process %s docid:%d %s"%(str(_to_ack),data["doc_id"],str(all_done)))
+        except requests.ConnectionError as e1:
+            item["extract_times"] -= 1
+            if send_msg_toacmq(self.pool_mq,json.dumps(item,ensure_ascii=False),self.mq_extract):
+                ackMsg(conn,message_id,subscription)
         except Exception as e:
             traceback.print_exc()
             sentMsgToDD("要素提取失败:docid:%d with result:%s"%(item.get(document_tmp_docid),str(e)))
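
The new except-branch above catches requests.ConnectionError separately: it decrements the message's extract_times (presumably refunding the attempt counted earlier in the handler) and re-publishes the message to the same queue, acking the original only if the re-publish succeeded so the message cannot be lost. A minimal sketch of that requeue-with-budget pattern, with the repo's helpers abstracted into callables:

    import json
    import requests

    def run_with_requeue(item, do_extract, requeue, ack):
        # do_extract(item): performs the HTTP extraction and may raise
        # requests.ConnectionError; requeue(msg) -> bool re-publishes the JSON
        # message to the extract queue; ack() acknowledges the original message.
        try:
            do_extract(item)
            ack()
        except requests.ConnectionError:
            # Refund the attempt so a transient network failure does not
            # consume one of the message's retries.
            item["extract_times"] -= 1
            # Ack only after a successful re-publish, so the message is
            # never dropped between the two operations.
            if requeue(json.dumps(item, ensure_ascii=False)):
                ack()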

+ 33 - 0
BaseDataMaintenance/model/ots/document_extract2.py

@@ -40,3 +40,36 @@ class Document_extract(BaseModel):
     #     raise NotImplementedError()
 
 
+if __name__ == '__main__':
+
+    from BaseDataMaintenance.dataSource.source import getConnect_ots
+    from tablestore import *
+    from BaseDataMaintenance.common.Utils import *
+
+    ots_client = getConnect_ots()
+    bool_query = BoolQuery(must_queries=[RangeQuery("docid",1)])
+    rows,next_token,total_count,is_all_succeed = ots_client.search("document_extract","document_extract_index",
+                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
+                                                                   ColumnsToGet(return_type=ColumnReturnType.ALL))
+    list_data = getRow_ots(rows)
+    for _data in list_data:
+        extract_json = _data["extract_json"]
+        extract_json = re.sub("\\\\n","",extract_json)
+        extract_json = re.sub("\\\\","",extract_json)
+        de = Document_extract(_data)
+        de.setValue("extract_json",extract_json,True)
+        de.update_row(ots_client)
+    while next_token:
+        rows,next_token,total_count,is_all_succeed = ots_client.search("document_extract","document_extract_index",
+                                                                       SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                       ColumnsToGet(return_type=ColumnReturnType.ALL))
+        list_data = getRow_ots(rows)
+        for _data in list_data:
+            extract_json = _data["extract_json"]
+            extract_json = re.sub("\\\\n","",extract_json)
+            extract_json = re.sub("\\\\","",extract_json)
+            de = Document_extract(_data)
+            de.setValue("extract_json",extract_json,True)
+            de.update_row(ots_client)
+
+
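
The new __main__ block is a one-off cleanup that pages through the document_extract index and strips escape sequences out of extract_json. The first page and the next_token loop duplicate the same body; a deduplicated sketch of the same pagination pattern, assuming the same tablestore imports and helpers used above (note the first page carries the sort, while pages fetched with next_token omit it):

    def iter_all_rows(ots_client, bool_query, page_size=100):
        # First page: sorted query, no next_token.
        rows, next_token, total_count, _ = ots_client.search(
            "document_extract", "document_extract_index",
            SearchQuery(bool_query, sort=Sort(sorters=[FieldSort("docid")]),
                        limit=page_size, get_total_count=True),
            ColumnsToGet(return_type=ColumnReturnType.ALL))
        yield from getRow_ots(rows)
        while next_token:
            # Follow-up pages: resume from next_token instead of the sort.
            rows, next_token, total_count, _ = ots_client.search(
                "document_extract", "document_extract_index",
                SearchQuery(bool_query, next_token=next_token,
                            limit=page_size, get_total_count=True),
                ColumnsToGet(return_type=ColumnReturnType.ALL))
            yield from getRow_ots(rows)

    for _data in iter_all_rows(ots_client, BoolQuery(must_queries=[RangeQuery("docid", 1)])):
        extract_json = re.sub("\\\\", "", re.sub("\\\\n", "", _data["extract_json"]))
        de = Document_extract(_data)
        de.setValue("extract_json", extract_json, True)
        de.update_row(ots_client)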