|
@@ -5,7 +5,8 @@ from apscheduler.schedulers.blocking import BlockingScheduler
|
|
from tablestore import *
|
|
from tablestore import *
|
|
from BaseDataMaintenance.dataSource.source import getConnect_ots,getAuth,is_internal
|
|
from BaseDataMaintenance.dataSource.source import getConnect_ots,getAuth,is_internal
|
|
from BaseDataMaintenance.dataSource.interface import *
|
|
from BaseDataMaintenance.dataSource.interface import *
|
|
-from multiprocessing import Queue as PQueue,Process
|
|
|
|
|
|
+from multiprocessing import Queue as PQueue
|
|
|
|
+from multiprocessing import Process
|
|
from BaseDataMaintenance.model.ots.document_product import *
|
|
from BaseDataMaintenance.model.ots.document_product import *
|
|
from BaseDataMaintenance.model.ots.attachment import *
|
|
from BaseDataMaintenance.model.ots.attachment import *
|
|
from BaseDataMaintenance.common.Utils import *
|
|
from BaseDataMaintenance.common.Utils import *
|
|
@@ -52,7 +53,9 @@ class Product_Attachment_Processor():
|
|
attachment_size = getQueueSize("dataflow_attachment")
|
|
attachment_size = getQueueSize("dataflow_attachment")
|
|
if attachment_size<100:
|
|
if attachment_size<100:
|
|
|
|
|
|
- if self.product_attachment_queue.qsize()>self.product_attachment_queue_size/3:
|
|
|
|
|
|
+ _qsize = self.product_attachment_queue.qsize()
|
|
|
|
+ log("product_attachment_queue %d"%(_qsize))
|
|
|
|
+ if _qsize>self.product_attachment_queue_size/3:
|
|
return
|
|
return
|
|
bool_query = BoolQuery(must_queries=[
|
|
bool_query = BoolQuery(must_queries=[
|
|
TermQuery("parameter_status",parameter_status_to_process)
|
|
TermQuery("parameter_status",parameter_status_to_process)
|
|
@@ -67,6 +70,7 @@ class Product_Attachment_Processor():
|
|
_count = 0
|
|
_count = 0
|
|
for data in list_data:
|
|
for data in list_data:
|
|
_id = data.get(DOCUMENT_PRODUCT_ID)
|
|
_id = data.get(DOCUMENT_PRODUCT_ID)
|
|
|
|
+ list_id.append(_id)
|
|
if _id in self.set_product_attachment:
|
|
if _id in self.set_product_attachment:
|
|
continue
|
|
continue
|
|
docid = data.get(DOCUMENT_PRODUCT_DOCID)
|
|
docid = data.get(DOCUMENT_PRODUCT_DOCID)
|
|
@@ -74,7 +78,6 @@ class Product_Attachment_Processor():
|
|
dict_docid_list[docid] = []
|
|
dict_docid_list[docid] = []
|
|
dict_docid_list[docid].append(data)
|
|
dict_docid_list[docid].append(data)
|
|
|
|
|
|
- list_id.append(_id)
|
|
|
|
_count += 1
|
|
_count += 1
|
|
while next_token:
|
|
while next_token:
|
|
if len(dict_docid_list.keys())>=self.product_attachment_queue_size:
|
|
if len(dict_docid_list.keys())>=self.product_attachment_queue_size:
|
|
@@ -85,17 +88,20 @@ class Product_Attachment_Processor():
|
|
list_data = getRow_ots(rows)
|
|
list_data = getRow_ots(rows)
|
|
for data in list_data:
|
|
for data in list_data:
|
|
_id = data.get(DOCUMENT_PRODUCT_ID)
|
|
_id = data.get(DOCUMENT_PRODUCT_ID)
|
|
|
|
+ list_id.append(_id)
|
|
if _id in self.set_product_attachment:
|
|
if _id in self.set_product_attachment:
|
|
continue
|
|
continue
|
|
docid = data.get(DOCUMENT_PRODUCT_DOCID)
|
|
docid = data.get(DOCUMENT_PRODUCT_DOCID)
|
|
if docid not in dict_docid_list:
|
|
if docid not in dict_docid_list:
|
|
dict_docid_list[docid] = []
|
|
dict_docid_list[docid] = []
|
|
dict_docid_list[docid].append(data)
|
|
dict_docid_list[docid].append(data)
|
|
- list_id.append(_id)
|
|
|
|
|
|
+
|
|
_count += 1
|
|
_count += 1
|
|
for k,v in dict_docid_list.items():
|
|
for k,v in dict_docid_list.items():
|
|
self.product_attachment_queue.put(v)
|
|
self.product_attachment_queue.put(v)
|
|
- self.set_product_attachment = set(list_id)
|
|
|
|
|
|
+ _qsize = self.product_attachment_queue.qsize()
|
|
|
|
+ log("after product_attachment_queue %d"%(_qsize))
|
|
|
|
+ self.set_product_attachment = set(list_id)
|
|
|
|
|
|
def get_whole_html(self,_filemd5):
|
|
def get_whole_html(self,_filemd5):
|
|
atta = attachment({attachment_filemd5:_filemd5})
|
|
atta = attachment({attachment_filemd5:_filemd5})
|
|
@@ -251,7 +257,7 @@ class Product_Attachment_Processor():
|
|
mt.run()
|
|
mt.run()
|
|
|
|
|
|
def process_parameters_comsumer(self,):
|
|
def process_parameters_comsumer(self,):
|
|
- # process_count = 2
|
|
|
|
|
|
+ # process_count = 3
|
|
# list_process = []
|
|
# list_process = []
|
|
# for i in range(process_count):
|
|
# for i in range(process_count):
|
|
# p = Process(target=self.start_process)
|
|
# p = Process(target=self.start_process)
|