Переглянути джерело

数据质量检查定期发送

znj 4 місяців тому
батько
коміт
63044fae75

+ 41 - 8
BaseDataMaintenance/dataSource/interface.py

@@ -73,7 +73,32 @@ ACCESS_TOKEN_SUANFA = "https://oapi.dingtalk.com/robot/send?access_token=eec7d42
 ACCESS_TOKEN_DATAWORKS = "https://oapi.dingtalk.com/robot/send?access_token=9489f01c4ab9f0c3f87e2ff5c3e35eb9fb0d17afb6244de4683596df1111daea"
 
 
-def sentMsgToDD(msg,access_token=ACCESS_TOKEN_SUANFA,atAll=False):
+# def sentMsgToDD(msg,access_token=ACCESS_TOKEN_SUANFA,atAll=False):
+#     timestamp = str(round(time.time() * 1000))
+#     secret = 'SECb1c5d36f73fb7cd36f91c71cb05441a7bbdad872e051234a626c7d7ceba6ee6a'
+#     secret_enc = secret.encode('utf-8')
+#     string_to_sign = '{}\n{}'.format(timestamp, secret)
+#     string_to_sign_enc = string_to_sign.encode('utf-8')
+#     hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
+#     sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
+#     # print(timestamp)
+#     # print(sign)
+#
+#        #导入依赖库
+#     headers={'Content-Type': 'application/json'}   #定义数据类型
+#     webhook = "%s&timestamp=%s&sign=%s"%(access_token,timestamp,sign)
+#     #定义要发送的数据
+#     #"at": {"atMobiles": "['"+ mobile + "']"
+#     data = {
+#         "msgtype": "text",
+#         "text": {"content": msg},
+#         "isAtAll": False,
+#         "at":{"isAtAll": atAll}
+#     }
+#     res = requests.post(webhook, data=json.dumps(data), headers=headers)   #发送post请求
+#     # print(res.status_code)
+
+def sentMsgToDD(msg,access_token=ACCESS_TOKEN_SUANFA,atAll=False,atMobiles=[]):
     timestamp = str(round(time.time() * 1000))
     secret = 'SECb1c5d36f73fb7cd36f91c71cb05441a7bbdad872e051234a626c7d7ceba6ee6a'
     secret_enc = secret.encode('utf-8')
@@ -89,17 +114,25 @@ def sentMsgToDD(msg,access_token=ACCESS_TOKEN_SUANFA,atAll=False):
     webhook = "%s&timestamp=%s&sign=%s"%(access_token,timestamp,sign)
     #定义要发送的数据
     #"at": {"atMobiles": "['"+ mobile + "']"
-    data = {
-        "msgtype": "text",
-        "text": {"content": msg},
-        "isAtAll": False,
-        "at":{"isAtAll": atAll}
-    }
+    if atMobiles: # at 特定人群手机号
+        data = {
+            "msgtype": "text",
+            "text": {"content": msg},
+            "isAtAll": False,
+            "at": {'atMobiles':atMobiles
+                    ,"isAtAll": False}
+        }
+    else:
+        data = {
+            "msgtype": "text",
+            "text": {"content": msg},
+            "isAtAll": False,
+            "at":{"isAtAll": atAll}
+        }
     res = requests.post(webhook, data=json.dumps(data), headers=headers)   #发送post请求
     # print(res.status_code)
 
 
-
 if __name__=="__main__":
     # print(getAttachDealInterface(base64.b64encode(open("F://Workspace2016/BaseDataMaintenance/BaseDataMaintenance/maintenance/attachment/readme.md","rb").read()),"pdf"))
     # sentMsgToDD("测试消息")

+ 51 - 0
BaseDataMaintenance/maintenance/dataflow.py

@@ -4291,6 +4291,56 @@ class Dataflow_dumplicate(Dataflow):
             mt.run()
 
 
+    def send_daily_check_data(self):
+        import datetime
+        def get_download_url(bucket, ObjectName, timeout):
+            url = ""
+            exist = bucket.object_exists(ObjectName)
+            if exist:
+                url = bucket.sign_url('GET', ObjectName, timeout)
+            return url
+
+        file_timeout = 60 * 60 * 24 * 5 # 文件下载链接保存 5 天
+        # 获取昨天的日期
+        date = str(datetime.date.today() - datetime.timedelta(days=1))
+        oss_path = 'tmp_document_quality_data/'
+        object_path = oss_path + date + '/'
+        msg = "每日数据质量检查结果(报警):"
+
+        csv_name = "数据质量监控检查结果.csv"
+        ObjectName = object_path + csv_name
+        url = get_download_url(self.bucket,ObjectName,file_timeout)
+        if url:
+            msg += "\n文件名:\"%s\",链接:%s"%(csv_name,url)
+
+        csv_name = "公告重复量大的编号.csv"
+        ObjectName = object_path + csv_name
+        url = get_download_url(self.bucket, ObjectName, file_timeout)
+        if url:
+            msg += "\n文件名:\"%s\",链接:%s" % (csv_name, url)
+
+        csv_name = "公告附件重复量大的编号.csv"
+        ObjectName = object_path + csv_name
+        url = get_download_url(self.bucket, ObjectName, file_timeout)
+        if url:
+            msg += "\n文件名:\"%s\",链接:%s" % (csv_name, url)
+
+        csv_name = "附件识别异常的站源.csv"
+        ObjectName = object_path + csv_name
+        url = get_download_url(self.bucket, ObjectName, file_timeout)
+        if url:
+            msg += "\n文件名:\"%s\",链接:%s" % (csv_name, url)
+
+        csv_name = "报名时间,截止时间在发布时间之前的公告.csv"
+        ObjectName = object_path + csv_name
+        url = get_download_url(self.bucket, ObjectName, file_timeout)
+        if url:
+            msg += "\n文件名:\"%s\",链接:%s" % (csv_name, url)
+
+        atMobiles = ['18813973429'] # 维阵
+        ACCESS_TOKEN_DATAWORKS = "https://oapi.dingtalk.com/robot/send?access_token=9489f01c4ab9f0c3f87e2ff5c3e35eb9fb0d17afb6244de4683596df1111daea"
+        sentMsgToDD(msg,ACCESS_TOKEN_DATAWORKS,atMobiles=atMobiles)
+
 
     def start_flow_dumplicate(self):
         schedule = BlockingScheduler()
@@ -4298,6 +4348,7 @@ class Dataflow_dumplicate(Dataflow):
         schedule.add_job(self.flow_dumpcate_comsumer,"cron",second="*/30")
         schedule.add_job(self.bdm.monitor_dumplicate,"cron",minute="*/10")
         schedule.add_job(self.flow_remove,"cron",hour="20")
+        schedule.add_job(self.send_daily_check_data,"cron",hour='9', minute='10')
         schedule.add_job(self.flow_remove_project_tmp,"cron",hour="20")
         schedule.add_job(self.fix_doc_which_not_in_project,"cron",minute="*/10")
         schedule.start()