ソースを参照

新增 合法实体添加到Redis定时流程

znj 1 年間 前
コミット
e93e0cfc17
1 ファイル変更280 行追加0 行削除
  1. 280 0
      BaseDataMaintenance/maintenance/enterprise/enterprise2Redis.py

+ 280 - 0
BaseDataMaintenance/maintenance/enterprise/enterprise2Redis.py

@@ -0,0 +1,280 @@
+
+
+import os,sys
+
+from apscheduler.schedulers.blocking import BlockingScheduler
+
+from BaseDataMaintenance.dataSource.source import getConnect_ots,getConnect_activateMQ
+
+from BaseDataMaintenance.dataSource.interface import *
+from BaseDataMaintenance.common.Utils import *
+
+from tablestore import *
+from BaseDataMaintenance.dataSource.setttings import *
+from queue import Queue
+from BaseDataMaintenance.dataSource.source import getConnect_redis_baseline
+from BaseDataMaintenance.dataSource.pool import ConnectorPool
+from BaseDataMaintenance.common.multiThread import MultiThreadHandler
+import pandas as pd
+from BaseDataMaintenance.maintenance.dataflow_settings import *
+
+
+# flow_enterprise2redis_path = "/data/python/flow_enterprise2redis.log"
+log_enterprise2redis_create_time_path = "/data/python/enterprise2redis_createTime.log"
+
+
+# 线上流程 enterprise Redis表维护
+class enterprise2Redis():
+
+    def __init__(self):
+        self.ots_client = getConnect_ots()
+        self.recieviers = ["1175730271@qq.com","531870502@qq.com"]
+
+        self.list_proposed_count = []
+        self.current_path = os.path.dirname(__file__)
+
+    def cmd_execute(self,_cmd):
+        return os.popen(_cmd).read()
+
+    def get_last_tenmin_time(self,nums=15):
+        current_time = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
+
+        last_ten_minite_time = timeAdd(current_time,0,"%Y-%m-%d %H:%M:%S",-10)
+        return last_ten_minite_time[:nums]
+
+    @staticmethod
+    def update2redis(last_create_time, now_time):
+        if not last_create_time:
+            return 0, 0
+
+        def handle(item, result_queue):
+            _db = pool_db.getConnector()
+            try:
+                # item: (name,sign:0/1)
+                _db.set(item[0], item[1])
+            except Exception as e:
+                traceback.print_exc()
+            finally:
+                pool_db.putConnector(_db)
+
+        ots_client = getConnect_ots()
+        bool_query = BoolQuery(must_queries=[
+            RangeQuery("create_time", range_from=last_create_time, range_to=now_time),
+            RangeQuery("status", range_from=201, range_to=301),
+        ])
+        rows, next_token, total_count, is_all_succeed = ots_client.search("enterprise", "enterprise_index",
+                                                                          SearchQuery(bool_query, get_total_count=True),
+                                                                          columns_to_get=ColumnsToGet(
+                                                                              return_type=ColumnReturnType.NONE))
+
+        if total_count > 0:
+            column_list = ["nicknames", "history_names", 'tyc_id', 'legal_person', 'reg_capital', 'found_date',
+                           'credit_code', 'tax_number', 'reg_number', 'org_number']
+            all_rows = []
+            first_query = False
+            # 第一次查询
+            while not first_query:
+                try:
+                    rows, next_token, total_count, is_all_succeed = ots_client.search(
+                        "enterprise", "enterprise_index",
+                        SearchQuery(
+                            bool_query,
+                            limit=100,
+                            sort=Sort(sorters=[FieldSort('bidi_id', SortOrder.DESC)]),
+                            get_total_count=True),
+                            ColumnsToGet(return_type=ColumnReturnType.SPECIFIED,
+                                     column_names=column_list)
+                        )
+                    first_query = True
+                except:
+                    print("~ first_query ots error ~")
+            # next_token后续查询
+            if first_query:
+                while next_token:
+                    try:
+                        rows, next_token, total_count, is_all_succeed = ots_client.search(
+                            "enterprise", "enterprise_index",
+                            SearchQuery(
+                                bool_query,
+                                next_token=next_token,
+                                limit=100,
+                                get_total_count=True),
+                            ColumnsToGet(return_type=ColumnReturnType.SPECIFIED,
+                                         column_names=column_list)
+                        )
+                        all_rows.extend(rows)
+
+                    except:
+                        print("~ ots query error, try again ~")
+
+            # 清洗过滤
+            # legal_name_list = []
+            # not_legal_name_list = []
+            legal_name_num = 0
+            not_legal_name_num = 0
+            name_sign_list = []
+            for row in all_rows:
+                # 索引字段
+                index_field = row[1]
+                row_dict = dict((item[0], item[1]) for item in index_field)
+                name = ""
+                num = 0
+                for key,value in row_dict.items():
+                    if key=='nicknames':
+                        name = value
+                    else:
+                        if len(str(value).replace("-",""))>1:
+                            num += 1
+                isLegal = isLegalNewName(name)
+                if isLegal>=0:
+                    if num>=1 and len(name)>4:
+                        legal_name_num += 1
+                        name_sign_list.append((name,1))
+                # elif num>=1:
+                #     pass
+                # else:
+                #     not_legal_name_num += 1
+                #     # name_sign_list.append((name,0))
+                #     pass
+
+            pool_db = ConnectorPool(10, 30, getConnect_redis_baseline)
+            # _start_time = time.time()
+            task_queue = Queue()
+            for legal_name in name_sign_list:
+                task_queue.put(legal_name)
+                if task_queue.qsize() >= 100 * 10000:
+                    _mt = MultiThreadHandler(task_queue, handle, None, 30)
+                    _mt.run()
+            if task_queue.qsize() >= 0:
+                _mt = MultiThreadHandler(task_queue, handle, None, 30)
+                _mt.run()
+            return legal_name_num,not_legal_name_num
+
+        return 0,0
+
+    # Redis新增 合法实体和不合法实体
+    def monitor_enterprise2redis(self):
+        # ots_client = getConnect_ots()
+        log_path = log_enterprise2redis_create_time_path
+        all_log_msg = []
+        with open(log_path, mode='r', encoding='utf-8') as f:
+            all_log_msg = f.readlines()
+            all_log_msg.remove('\n')
+
+        last_create_time = ""
+        for _log in all_log_msg[::-1]:
+            if "success" in _log:
+                try:
+                    last_create_time = re.search('(?:process time:)(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})',_log).group(1)
+                    break
+                except:
+                    continue
+
+        now_time = time.time()-60*60 # now_time:前一小时
+        now_time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(now_time))
+
+        legal_name_num,not_legal_name_num = self.update2redis(last_create_time=last_create_time,now_time=now_time)
+
+        last_process_time = now_time
+
+        if legal_name_num or not_legal_name_num:
+            log_msg = "%s Enterprise2redis status:success, add legal name num: %d, add not legal name num: %d, last process time:%s"%\
+                      ((time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()),legal_name_num,not_legal_name_num,last_process_time))
+        else:
+            log_msg = "%s Enterprise2redis status:failed, add legal name num: %d, add not legal name num: %d, last process time:%s" % \
+                      ((time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()),legal_name_num, not_legal_name_num, last_process_time))
+
+        # 限制写入log最大记录数量
+        all_log_msg = all_log_msg[-2000:]
+        all_log_msg.append(log_msg + '\n')
+        if all_log_msg:
+            with open(log_path,mode='w',encoding='utf-8') as f:
+                f.writelines(all_log_msg)
+
+        sentMsgToDD(log_msg , ACCESS_TOKEN_DATAWORKS)
+
+    # 修复(删除 或 置value为0)Redis中错误的数据
+    @staticmethod
+    def fix_up_redis(data_iter):
+        def handle(item, result_queue):
+            _db = pool_db.getConnector()
+            try:
+                _db.set(item, 0)
+            except Exception as e:
+                traceback.print_exc()
+            finally:
+                pool_db.putConnector(_db)
+        data_len = len(data_iter)
+        _start_time = time.time()
+        pool_db = ConnectorPool(10, 30, getConnect_redis_baseline)
+        task_queue = Queue()
+        # 置value为0
+        for key in data_iter:
+            task_queue.put(key)
+            if task_queue.qsize() >= 100 * 10000:
+                _mt = MultiThreadHandler(task_queue, handle, None, 30)
+                _mt.run()
+        if task_queue.qsize() >= 0:
+            _mt = MultiThreadHandler(task_queue, handle, None, 30)
+            _mt.run()
+        cost_time = time.time() - _start_time
+        log("Fix up %d not legal name sucess, cost time %s s"%(data_len,str(cost_time)))
+
+
+    def start_monitor(self):
+        #数据监控
+
+        scheduler = BlockingScheduler()
+
+        # scheduler.add_job(self.monitor_enterprise2redis,"cron",minute="*/10")
+        scheduler.add_job(self.monitor_enterprise2redis,"cron",hour="*/3")
+        scheduler.start()
+
+# 新实体合法判断
+def isLegalNewName(enterprise_name):
+    # head_character_list = ["[",'【',"(",'(']
+    # tail_character_list = ["]",'】',")",')']
+    # 名称开头判断
+    if re.search("^[\da-zA-Z][^\da-zA-Z]|"
+                 "^[^\da-zA-Z\u4e00-\u9fa5\[【((]|"
+                 "^[\[【((].{,1}[\]】))]|"
+                 "^[0〇]|"
+                 "^(20[0-2][0-9]|[0-2]?[0-9]年|[0-1]?[0-9]月|[0-3]?[0-9]日)",enterprise_name):
+        return -1
+    if len(re.findall("[\u4e00-\u9fa5]",enterprise_name))<2:
+        return -1
+    if re.search("╳|*|\*|×|xx|XX",enterprise_name):
+        return -1
+    if re.search("^(省|自治[县州区]|市|县|区|镇|乡|街道)",enterprise_name) and not re.search("^(镇江|乡宁|镇原|镇海|镇安|镇巴|镇坪|镇赉|镇康|镇沅|镇雄|镇远|镇宁|乡城|镇平|市中|市南|市北)",enterprise_name):
+        return -1
+    if re.search("\d{1,2}:\d{2}(:\d{2})?|(rar|xlsx|zip|png|jpg|swf|docx|txt|pdf|PDF|doc|xls|bmp|&?nbsp)",enterprise_name):
+        return -1
+    if re.search("(招标|代理)(人|机构)|联系(人|方式)|中标|评[标审选委]|候选|第.名|^(项目|业主)",enterprise_name):
+        return -1
+    if re.search("[a-zA-Z\d一二三四五六七八九十]{1,2}(包|标段?)|第.批",enterprise_name):
+        return 0
+    return 1
+
+
+
+if __name__ == '__main__':
+
+    # dm = BaseDataMonitor()
+    # # dm.start_monitor()
+    # log_filename = "C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-03.log"
+    # dm.check_document_uuid(log_filename)
+
+    # sentMsgToDD("报警test_msg",ACCESS_TOKEN_DATAWORKS)
+    # dm.monitor_proposedBuilding()
+    # print(dm.get_last_tenmin_time(16))
+
+    # 第一次推送到Redis , create_time:2022-12-16
+    em = EnterpriseMonitor()
+    last_create_time = "2022-12-16 00:00:01"
+    now_time = time.time()
+    now_time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(now_time))
+    legal_name_num,not_legal_name_num = em.update2redis(last_create_time=last_create_time,now_time=now_time)
+
+    pass
+
+