
Daily data reporting for data-quality checks

znj committed 3 months ago
Commit 1e8a9566ca

1 changed file with 182 additions and 0 deletions
BaseDataMaintenance/maintenance/dataflow.py  +182 −0

@@ -4350,6 +4350,187 @@ class Dataflow_dumplicate(Dataflow):
         ACCESS_TOKEN_DATAWORKS = "https://oapi.dingtalk.com/robot/send?access_token=9489f01c4ab9f0c3f87e2ff5c3e35eb9fb0d17afb6244de4683596df1111daea"
         sentMsgToDD(msg,ACCESS_TOKEN_DATAWORKS,atMobiles=atMobiles)
 
+    def send_daily_check_data2(self):
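+        """Collect yesterday's data-quality check results from OSS and post them to the quality-monitoring API."""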
+        import datetime
+        import pandas as pd
+        from itertools import groupby
+        dict_channel = {"公告变更": 51,
+                         "招标公告": 52,
+                         "中标信息": 101,
+                         "招标预告": 102,
+                         "招标答疑": 103,
+                         "资审结果": 105,
+                         "法律法规": 106,
+                         "新闻资讯": 107,
+                         "采购意向": 114,
+                         "拍卖出让": 115,
+                         "土地矿产": 116,
+                         "产权交易": 117,
+                         "废标公告": 118,
+                         "候选人公示": 119,
+                         "合同公告": 120}
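+        # reverse map: docchannel code -> channel label, used to fill WEBTYPE below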
+        label2channel = {v:k for k,v in dict_channel.items()}
+        def post_data(url,json_data):
+            # POST the JSON payload, retrying up to 3 times until a 200 response is received
+            post_success = False
+            for i in range(3):
+                if not post_success:
+                    try:
+                        # send the POST request with the JSON data
+                        response = requests.post(url, json=json_data)
+                        # check the response status code
+                        if response.status_code == 200:
+                            post_success = True
+                    except requests.exceptions.RequestException as e:
+                        log("send_daily_check_data2,post error reason: %s"%(str(e)))
+            return post_success
+
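+        # aggregated payload for the monitoring service: "data" holds one record
+        # per detected anomaly, "count" the number of records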
+        res_json = {
+            "data": [],
+            "count": 0
+        }
+
+        # get yesterday's date (used as the OSS folder for that day's check results)
+        date = str(datetime.date.today() - datetime.timedelta(days=1))
+        oss_path = 'tmp_document_quality_data/'
+        object_path = oss_path + date + '/'
+
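+        # 1) overall quality-check results: one record per (web_source_no, error_type),
+        #    with the sample docids joined into FILEMD5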
+        csv_name = "数据质量监控检查结果.xlsx"
+        ObjectName = object_path + csv_name
+        LocalPath = os.path.join(self.current_path,"download",csv_name)
+        down_res = downloadFile(self.bucket,ObjectName,LocalPath,retry=3)
+        if down_res:
+            df = pd.read_excel(LocalPath)
+            for web_source_no,original_docchannel,error_rule in zip(df['web_source_no'],df['original_docchannel'],df['error_rule']):
+                error_rule = json.loads(error_rule)
+                for error_type,error_sample in error_rule.items():
+                    tmp_data = {
+                          "WEB_SOURCE_NO": web_source_no,
+                          "TITLE": "",
+                          "COUNT": 5,
+                          "WEBTYPE": label2channel.get(original_docchannel,""),
+                          "TYPE": error_type,
+                          "FILEMD5": ",".join([str(docid) for docid in error_sample]),
+                          "PUBDATE": "",
+                          "REGDATE": "",
+                          "ENDDATE": ""
+                        }
+                    res_json['data'].append(tmp_data)
+                    res_json['count'] += 1
+            os.remove(LocalPath)
+
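+        # 2) sources with a large number of duplicate announcements, grouped by
+        #    web_source_no and capped at 5 records per source (LocalPath from the
+        #    first file is reused as the shared local download target)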
+        csv_name = "公告重复量大的编号.xlsx"
+        ObjectName = object_path + csv_name
+        down_res = downloadFile(self.bucket, ObjectName, LocalPath, retry=3)
+        if down_res:
+            df = pd.read_excel(LocalPath)
+            tmp_list = []
+            for web_source_no,fingerprint,original_docchannel,doctitle,cnt in zip(df['web_source_no'], df['fingerprint'],
+                                                                      df['original_docchannel'],df['doctitle'],df['cnt']):
+                tmp_data = {
+                    "WEB_SOURCE_NO": web_source_no,
+                    "TITLE": doctitle,
+                    "COUNT": cnt,
+                    "WEBTYPE": label2channel.get(original_docchannel, ""),
+                    "TYPE": "编号公告重复",
+                    "FILEMD5": fingerprint,
+                    "PUBDATE": "",
+                    "REGDATE": "",
+                    "ENDDATE": ""
+                }
+                tmp_list.append(tmp_data)
+            tmp_list.sort(key=lambda x: x['WEB_SOURCE_NO'])
+            for key, group in groupby(tmp_list, lambda x: (x['WEB_SOURCE_NO'])):
+                group = list(group)[:5]
+                res_json['data'].extend(group)
+                res_json['count'] += len(group)
+            os.remove(LocalPath)
+
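+        # 3) sources with a large number of duplicate attachments, same grouping
+        #    and 5-records-per-source cap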
+        csv_name = "公告附件重复量大的编号.xlsx"
+        ObjectName = object_path + csv_name
+        down_res = downloadFile(self.bucket, ObjectName, LocalPath, retry=3)
+        if down_res:
+            df = pd.read_excel(LocalPath)
+            tmp_list = []
+            for web_source_no,filemd5,original_docchannel,cnt in zip(df['web_source_no'],df['filemd5'],
+                                                                      df['original_docchannel'],df['cnt']):
+                tmp_data = {
+                    "WEB_SOURCE_NO": web_source_no,
+                    "TITLE": "",
+                    "COUNT": cnt,
+                    "WEBTYPE": label2channel.get(original_docchannel, ""),
+                    "TYPE": "编号附件重复",
+                    "FILEMD5": filemd5,
+                    "PUBDATE": "",
+                    "REGDATE": "",
+                    "ENDDATE": ""
+                }
+                tmp_list.append(tmp_data)
+            tmp_list.sort(key=lambda x: x['WEB_SOURCE_NO'])
+            for key, group in groupby(tmp_list, lambda x: (x['WEB_SOURCE_NO'])):
+                group = list(group)[:5]
+                res_json['data'].extend(group)
+                res_json['count'] += len(group)
+            os.remove(LocalPath)
+
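+        # 4) sources with abnormal attachment recognition; FILEMD5 carries up to
+        #    5 sample ids from error_sample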
+        csv_name = "附件识别异常的站源.xlsx"
+        ObjectName = object_path + csv_name
+        down_res = downloadFile(self.bucket, ObjectName, LocalPath, retry=3)
+        if down_res:
+            df = pd.read_excel(LocalPath)
+            for web_source_no,original_docchannel,error_ratio,error_sample in zip(df['web_source_no'], df['original_docchannel'],
+                                                                        df['error_ratio'], df['error_sample']):
+                tmp_data = {
+                    "WEB_SOURCE_NO": web_source_no,
+                    "TITLE": "",
+                    "COUNT": 1,
+                    "WEBTYPE": label2channel.get(original_docchannel, ""),
+                    "TYPE": "附件识别异常",
+                    "FILEMD5": ",".join(json.loads(error_sample)[:5]),
+                    "PUBDATE": "",
+                    "REGDATE": "",
+                    "ENDDATE": ""
+                }
+                res_json['data'].append(tmp_data)
+                res_json['count'] += 1
+            os.remove(LocalPath)
+
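+        # 5) announcements whose registration-end / bid-close time falls before the
+        #    publish time; NaN times become empty strings, capped at 5 records per source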
+        csv_name = "报名时间,截止时间在发布时间之前的公告.xlsx"
+        ObjectName = object_path + csv_name
+        down_res = downloadFile(self.bucket, ObjectName, LocalPath, retry=3)
+        if down_res:
+            df = pd.read_excel(LocalPath)
+            tmp_list = []
+            for docid,doctitle,web_source_no,original_docchannel,page_time,time_bidclose,time_registration_end in zip(df['docid'],
+                                                                                     df['doctitle'],df['web_source_no'],df['original_docchannel'],
+                                                                                     df['page_time'],df['time_bidclose'],df['time_registration_end']):
+                time_registration_end = time_registration_end if str(time_registration_end) and str(time_registration_end) != 'nan' else ""
+                time_bidclose = time_bidclose if str(time_bidclose) and str(time_bidclose) != 'nan' else ""
+                tmp_data = {
+                    "WEB_SOURCE_NO": web_source_no,
+                    "TITLE": doctitle,
+                    "COUNT": 1,
+                    "WEBTYPE": label2channel.get(original_docchannel, ""),
+                    "TYPE": "截止日期在发布日期之前",
+                    "FILEMD5": str(docid),
+                    "PUBDATE": page_time[:10],
+                    "REGDATE": time_registration_end[:10],
+                    "ENDDATE": time_bidclose[:10]
+                }
+                tmp_list.append(tmp_data)
+            tmp_list.sort(key=lambda x: x['WEB_SOURCE_NO'])
+            for key, group in groupby(tmp_list, lambda x: (x['WEB_SOURCE_NO'])):
+                group = list(group)[:5]
+                res_json['data'].extend(group)
+                res_json['count'] += len(group)
+            os.remove(LocalPath)
+
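+        # post the aggregated payload to the monitoring endpoint and log the record
+        # count on success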
+        # url = "http://120.132.118.205:17090/saveQualityListData"
+        url = "http://data-monitor.bidizhaobiao.com/oldApi/saveQualityListData"
+        res = post_data(url,res_json)
+        if res:
+            log("send_daily_check_data2,sent data len: %d"%(res_json['count']))
+
 
     def start_flow_dumplicate(self):
         schedule = BlockingScheduler()
@@ -4358,6 +4539,7 @@ class Dataflow_dumplicate(Dataflow):
         schedule.add_job(self.bdm.monitor_dumplicate,"cron",minute="*/10")
         schedule.add_job(self.flow_remove,"cron",hour="20")
         schedule.add_job(self.send_daily_check_data,"cron",hour='9', minute='10')
+        schedule.add_job(self.send_daily_check_data2,"cron",hour='9', minute='10')
         schedule.add_job(self.flow_remove_project_tmp,"cron",hour="20")
         schedule.add_job(self.fix_doc_which_not_in_project,"cron",minute="*/10")
         schedule.start()