@@ -2237,7 +2237,7 @@ class Dataflow_dumplicate(Dataflow):
return dict_time


- def get_attrs_before_dump(self,docid,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type,"detail_link",'products']):
+ def get_attrs_before_dump(self,docid,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type,"detail_link",'products','crtime']):
bool_query = BoolQuery(must_queries=[
TermQuery("docid",docid)
@@ -2642,6 +2642,25 @@ class Dataflow_dumplicate(Dataflow):
ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
list_dict = getRow_ots(rows)
list_data = []
+
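+ # docids already returned by the primary query, used below to skip duplicates when supplementing from document_tmp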
+ set_docids = set([doc.get(document_tmp_docid) for doc in list_dict])
+ current_date = getCurrent_date("%Y-%m-%d")
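+ # default missing page_time/crtime to today's date before checking the 7-day window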
+ page_time = item.get("page_time","")
+ crtime = item.get("crtime","")[:10]
+ if page_time == '':
+ page_time = current_date
+ if crtime == '':
+ crtime = current_date
+ # when deduplicating newly crawled historical data, the document table has no data yet, so supplement with rows from the document_tmp table
+ if table_name=='document' and page_time<timeAdd(current_date,-7) and crtime>=timeAdd(current_date,-7):
+ rows, next_token, total_count, is_all_succeed = self.ots_client.search("document_tmp", "document_tmp_index",
+ SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(sort_column)]),limit=100,get_total_count=True),
+ ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+ list_dict_add = getRow_ots(rows)
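+ # add document_tmp rows whose docid is not already in the primary result set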
+ for doc in list_dict_add:
+ if doc.get(document_tmp_docid) not in set_docids:
+ list_dict.append(doc)
+
for _dict in list_dict:
self.post_extract(_dict)
_docid = _dict.get(document_tmp_docid)
@@ -2959,7 +2978,7 @@ class Dataflow_dumplicate(Dataflow):
return list_rules,table_name,table_index
- def producer_flow_dumplicate(self,process_count,status_from,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type,"detail_link",'products']):
+ def producer_flow_dumplicate(self,process_count,status_from,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type,"detail_link",'products','crtime']):
q_size = self.queue_dumplicate.qsize()
log("dumplicate queue size %d"%(q_size))
@@ -4516,7 +4535,7 @@ class Dataflow_dumplicate(Dataflow):
singleNum_keys = _rule["singleNum_keys"]
contain_keys = _rule["contain_keys"]
multiNum_keys = _rule["multiNum_keys"]
- self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle,document_tmp_attachment_path,document_tmp_source_stage,document_tmp_source_type,document_update_document,document_tmp_web_source_name,'detail_link','products'],b_log=b_log)
+ self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle,document_tmp_attachment_path,document_tmp_source_stage,document_tmp_source_type,document_update_document,document_tmp_web_source_name,'detail_link','products','crtime'],b_log=b_log)
_i += step
@@ -5336,7 +5355,7 @@ class Dataflow_dumplicate(Dataflow):
# project_uuids are the uuids of the target projects; manually merge the docid into the projects that correspond to project_uuids
def test_dumplicate(self,docid,project_uuids=[]):
# columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status]
- columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type,'detail_link','products']
+ columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type,'detail_link','products','crtime']
# print('columns',columns)
item = self.get_attrs_before_dump(docid,columns)
# whether the attributes need to be copied to the doc_tmp table