@@ -3802,7 +3802,7 @@ class Dataflow_dumplicate(Dataflow):
 
             list_dynamic = json.loads(_dynamic)
             for _d in list_dynamic:
                 _title = _d.get("doctitle","")
-                if re.search("验收公[示告]",_title) is not None:
+                if re.search("验收公[示告]|验收结果",_title) is not None or _d.get("docchannel")==122:
                     is_yanshou = True
                     break
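For reference, a minimal standalone sketch of the widened acceptance check: the pattern now also matches 验收结果 ("acceptance result") in titles, and docchannel 122 is treated as an acceptance dynamic regardless of title. The sample entries below are hypothetical.

import re

def is_acceptance(list_dynamic):
    # Mirrors the loop above: a dynamic counts as acceptance when its
    # title matches the widened pattern or its docchannel is 122.
    for _d in list_dynamic:
        _title = _d.get("doctitle", "")
        if re.search("验收公[示告]|验收结果", _title) is not None or _d.get("docchannel") == 122:
            return True
    return False

print(is_acceptance([{"doctitle": "XX项目验收结果公告"}]))                 # True
print(is_acceptance([{"doctitle": "XX项目招标公告", "docchannel": 122}]))  # True
print(is_acceptance([{"doctitle": "XX项目招标公告"}]))                     # False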
@@ -4038,42 +4038,48 @@ class Dataflow_dumplicate(Dataflow):
 
 
        bidclose_time = page_time
        web_source_name = item.get(document_tmp_web_source_name,"")
+        docchannel = item.get(document_tmp_docchannel,"0")
-
-
-        if len(page_time)>0:
-            l_page_time = timeAdd(page_time,days=-90)
-            dict_time = item.get("dict_time",{})
-            for k,v in dict_time.items():
-                if v is not None and len(v)>0:
-                    if l_page_time>v:
-                        has_before = True
-                    if v>page_time:
-                        has_after = True
-                    if k==document_tmp_time_bidclose:
-                        bidclose_time = v
-
-        set_web_source = {"中国招标投标公共服务平台","比地招标"}
-
-        if web_source_name in set_web_source and bidclose_time<page_time:
-            return False
-
-        log("check page_time has_before %s has_after %s"%(str(has_before),str(has_after)))
-        if has_before:
-            _query = BoolQuery(must_queries=[MatchPhraseQuery(document_doctitle,item.get(document_doctitle,""))],
-                               must_not_queries=[TermQuery(document_docid,item.get(document_docid,0))])
-            if not has_after:
-                log("check page_time false %s==%s-%s"%(l_page_time,k,v))
-
-                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
-                                                                                    SearchQuery(_query,get_total_count=True,limit=1))
-                if total_count>0:
-                    return False
-            if item.get(document_web_source_name,"")=="中国政府采购网":
-                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
-                                                                                    SearchQuery(_query,get_total_count=True,limit=1))
-                if total_count>0:
-                    return False
+        try:
+            docchannel = int(docchannel)
+        except:
+            docchannel = 0
+
+        if docchannel<200:
+
+            if len(page_time)>0:
+                l_page_time = timeAdd(page_time,days=-90)
+                dict_time = item.get("dict_time",{})
+                for k,v in dict_time.items():
+                    if v is not None and len(v)>0:
+                        if l_page_time>v:
+                            has_before = True
+                        if v>page_time:
+                            has_after = True
+                        if k==document_tmp_time_bidclose:
+                            bidclose_time = v
+
+            set_web_source = {"中国招标投标公共服务平台","比地招标"}
+
+            if web_source_name in set_web_source and bidclose_time<page_time:
+                return False
+
+            log("check page_time has_before %s has_after %s"%(str(has_before),str(has_after)))
+            if has_before:
+                _query = BoolQuery(must_queries=[MatchPhraseQuery(document_doctitle,item.get(document_doctitle,""))],
+                                   must_not_queries=[TermQuery(document_docid,item.get(document_docid,0))])
+                if not has_after:
+                    log("check page_time false %s==%s-%s"%(l_page_time,k,v))
+
+                    rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
+                                                                                        SearchQuery(_query,get_total_count=True,limit=1))
+                    if total_count>0:
+                        return False
+                if item.get(document_web_source_name,"")=="中国政府采购网":
+                    rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
+                                                                                        SearchQuery(_query,get_total_count=True,limit=1))
+                    if total_count>0:
+                        return False
 
        return True
 
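The new gate can be read in isolation as follows. This is a minimal sketch, assuming docchannel is stored as a string, that page_time and the dict_time values are "YYYY-MM-DD" strings (so lexicographic comparison equals date order), and with timeAdd replaced by datetime arithmetic; the downstream OTS title lookup and the bidclose check are omitted.

import datetime

def page_time_flags(item):
    # Normalize docchannel: it may arrive as a string such as "101".
    try:
        docchannel = int(item.get("docchannel", "0"))
    except (TypeError, ValueError):
        docchannel = 0

    has_before = has_after = False
    if docchannel < 200:  # only ordinary channels get the date sanity check
        page_time = item.get("page_time", "")
        if page_time:
            # 90 days before the publish date; ISO date strings compare like dates
            l_page_time = str(datetime.date.fromisoformat(page_time) - datetime.timedelta(days=90))
            for k, v in item.get("dict_time", {}).items():
                if v:
                    has_before |= l_page_time > v  # some date long before publication
                    has_after |= v > page_time     # some date after publication
    return has_before, has_after

print(page_time_flags({"docchannel": "101", "page_time": "2024-06-01",
                       "dict_time": {"time_bidclose": "2023-01-01"}}))  # (True, False)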
@@ -4285,6 +4291,65 @@ class Dataflow_dumplicate(Dataflow):
        mt.run()
 
 
+    def send_daily_check_data(self):
+        import datetime
+        def get_download_url(bucket, ObjectName, timeout):
+            url = ""
+            exist = bucket.object_exists(ObjectName)
+            if exist:
+                get_url = False
+                for i in range(3):
+                    try:
+                        url = bucket.sign_url('GET', ObjectName, timeout)
+                        url = url.replace("-internal", "")  # strip the internal-network marker from the address
+                        get_url = True
+                    except:
+                        pass
+                    if get_url:
+                        break
+            return url
+
+        file_timeout = 60 * 60 * 24 * 5  # download links stay valid for 5 days
+        # get yesterday's date
+        date = str(datetime.date.today() - datetime.timedelta(days=1))
+        oss_path = 'tmp_document_quality_data/'
+        object_path = oss_path + date + '/'
+        msg = "每日数据质量检查结果(报警):"
+
+        csv_name = "数据质量监控检查结果.xlsx"
+        ObjectName = object_path + csv_name
+        url = get_download_url(self.bucket,ObjectName,file_timeout)
+        if url:
+            msg += "\n文件名:\"%s\",链接:%s" % (csv_name, url)
+
+        csv_name = "公告重复量大的编号.xlsx"
+        ObjectName = object_path + csv_name
+        url = get_download_url(self.bucket, ObjectName, file_timeout)
+        if url:
+            msg += "\n文件名:\"%s\",链接:%s" % (csv_name, url)
+
+        csv_name = "公告附件重复量大的编号.xlsx"
+        ObjectName = object_path + csv_name
+        url = get_download_url(self.bucket, ObjectName, file_timeout)
+        if url:
+            msg += "\n文件名:\"%s\",链接:%s" % (csv_name, url)
+
+        csv_name = "附件识别异常的站源.xlsx"
+        ObjectName = object_path + csv_name
+        url = get_download_url(self.bucket, ObjectName, file_timeout)
+        if url:
+            msg += "\n文件名:\"%s\",链接:%s" % (csv_name, url)
+
+        csv_name = "报名时间,截止时间在发布时间之前的公告.xlsx"
+        ObjectName = object_path + csv_name
+        url = get_download_url(self.bucket, ObjectName, file_timeout)
+        if url:
+            msg += "\n文件名:\"%s\",链接:%s" % (csv_name, url)
+
+        atMobiles = ['18813973429']  # 维阵
+        ACCESS_TOKEN_DATAWORKS = "https://oapi.dingtalk.com/robot/send?access_token=9489f01c4ab9f0c3f87e2ff5c3e35eb9fb0d17afb6244de4683596df1111daea"
+        sentMsgToDD(msg,ACCESS_TOKEN_DATAWORKS,atMobiles=atMobiles)
+
 
    def start_flow_dumplicate(self):
        schedule = BlockingScheduler()
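sentMsgToDD is a project-internal helper; for readers without the codebase, a minimal stand-in built on DingTalk's documented robot webhook (a JSON "text" message with @-mentions) could look like the sketch below. The webhook URL and mobile number in the usage comment are placeholders.

import json
import urllib.request

def sent_msg_to_dd(msg, webhook_url, atMobiles=None):
    # DingTalk robot webhook: POST a JSON "text" message; atMobiles lists
    # the phone numbers to @-mention in the group chat.
    payload = {
        "msgtype": "text",
        "text": {"content": msg},
        "at": {"atMobiles": atMobiles or [], "isAtAll": False},
    }
    req = urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        return resp.read()

# sent_msg_to_dd("每日数据质量检查结果(报警):...",
#                "https://oapi.dingtalk.com/robot/send?access_token=<token>",
#                atMobiles=["<mobile>"])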
@@ -4292,6 +4357,7 @@ class Dataflow_dumplicate(Dataflow):
        schedule.add_job(self.flow_dumpcate_comsumer,"cron",second="*/30")
        schedule.add_job(self.bdm.monitor_dumplicate,"cron",minute="*/10")
        schedule.add_job(self.flow_remove,"cron",hour="20")
+        schedule.add_job(self.send_daily_check_data,"cron",hour='9', minute='10')
        schedule.add_job(self.flow_remove_project_tmp,"cron",hour="20")
        schedule.add_job(self.fix_doc_which_not_in_project,"cron",minute="*/10")
        schedule.start()
 
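The new job uses the same APScheduler cron style as its neighbours; a self-contained example of that scheduling pattern, with a stub in place of the real report method, is:

from apscheduler.schedulers.blocking import BlockingScheduler

def send_daily_check_data():
    print("assemble and send the daily data-quality report")

schedule = BlockingScheduler()
# Fire once a day at 09:10, matching hour='9', minute='10' above.
schedule.add_job(send_daily_check_data, "cron", hour="9", minute="10")
schedule.start()  # blocks the current thread and runs jobs on schedule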
@@ -4339,7 +4405,7 @@ class Dataflow_dumplicate(Dataflow):
        list_dict = getRow_ots(rows)
 
        for item in list_dict:
-            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=True,upgrade=False)
+            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=True,upgrade=True)
        return
 
    def test_merge(self,list_docid_less,list_docid_greater):
@@ -4515,7 +4581,7 @@ if __name__ == '__main__':
    # test_attachment_interface()
    df_dump = Dataflow_dumplicate(start_delete_listener=False)
    # df_dump.start_flow_dumplicate()
-    df_dump.test_dumplicate(576859812
+    df_dump.test_dumplicate(583564377
                            )
    # compare_dumplicate_check()
    # df_dump.test_merge([391898061