|
- import os,sys
- from apscheduler.schedulers.blocking import BlockingScheduler
- from BaseDataMaintenance.dataSource.source import getConnect_ots,getConnect_activateMQ
- from BaseDataMaintenance.dataSource.interface import *
- from BaseDataMaintenance.common.Utils import *
- from tablestore import *
- from BaseDataMaintenance.dataSource.setttings import *
- from queue import Queue
- from BaseDataMaintenance.dataSource.source import getConnect_redis_baseline
- from BaseDataMaintenance.dataSource.pool import ConnectorPool
- from BaseDataMaintenance.common.multiThread import MultiThreadHandler
- import pandas as pd
- from BaseDataMaintenance.maintenance.dataflow_settings import *
- from elasticsearch import Elasticsearch
# flow_enterprise2redis_path = "/data/python/flow_enterprise2redis.log"
# Run log of the sync job. monitor_enterprise2redis reads the newest "success"
# line in this file to find the timestamp at which the previous run stopped.
log_enterprise2redis_create_time_path = "/data/python/enterprise2redis_createTime.log"


# Online flow: maintenance of the enterprise Redis table.
class enterprise2Redis():
    """Incrementally push enterprise names from the ``enterprise`` ES index to Redis.

    ``monitor_enterprise2redis`` is the scheduled entry point: it reads the end
    of the previous successful window from the run log, asks ``update2redis``
    to sync everything updated since then, appends the outcome to the run log
    and reports it through ``sentMsgToDD``.
    """

    # Consecutive ES scroll failures tolerated before a sync run is aborted.
    _MAX_SCROLL_FAILURES = 3
    # Queue size at which queued Redis operations are handed to worker threads.
    _FLUSH_THRESHOLD = 100 * 10000
    # Number of worker threads used for Redis operations.
    _THREAD_COUNT = 30
    # Fields whose presence counts as "business information" for an enterprise.
    _BUSINESS_FIELDS = ("history_names", "legal_person", "reg_capital", "credit_code",
                        "tax_number", "reg_number", "org_number")

    def __init__(self):
        self.ots_client = getConnect_ots()
        self.recieviers = ["1175730271@qq.com","531870502@qq.com"]
        self.list_proposed_count = []
        self.current_path = os.path.dirname(__file__)

    def cmd_execute(self, _cmd):
        """Run ``_cmd`` through ``os.popen`` and return everything it wrote to stdout."""
        # Close the pipe explicitly instead of leaving it to garbage collection.
        with os.popen(_cmd) as pipe:
            return pipe.read()

    def get_last_tenmin_time(self, nums=15):
        """Return the shifted timestamp from ``timeAdd``, cut to ``nums`` characters.

        NOTE(review): the offset passed to ``timeAdd`` is -10; judging by the
        method name this means "ten minutes ago" -- confirm against ``timeAdd``.
        """
        current_time = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
        last_ten_minite_time = timeAdd(current_time, 0, "%Y-%m-%d %H:%M:%S", -10)
        return last_ten_minite_time[:nums]

    @staticmethod
    def _fetch_updated_hits(last_create_time, now_time):
        """Return all ES hits with ``last_create_time <= update_time < now_time``.

        Only documents with ``201 <= status < 301`` are requested. Results are
        collected page by page through the scroll API.

        Raises:
            Exception: whatever the ES client raised, once ``scroll`` has failed
                ``_MAX_SCROLL_FAILURES`` times in a row.
        """
        # es_url = "http://es-cn-lbj3cjmy3000djxak.elasticsearch.aliyuncs.com"  # intranet
        es_url = "http://es-cn-lbj3cjmy3000djxak.public.elasticsearch.aliyuncs.com"  # public network
        # NOTE(review): credentials are hard-coded here; consider moving them to settings.
        es_client = Elasticsearch([es_url],
                                  http_auth=('elastic', 'WWBu9#1HWHo$$gJm'),
                                  port=9200)
        body = {
            "_source": ["name", "history_names", 'legal_person', 'reg_capital', 'credit_code', 'tax_number',
                        'reg_number', 'org_number',
                        "zhao_biao_number", "zhong_biao_number", "dai_li_number", "bid_number"],
            'query': {
                "bool": {
                    'must': [
                        {'range': {"update_time": {
                            "gte": last_create_time,  # >=
                            "lt": now_time  # <
                        }}},
                        # {'range': {"create_time": {
                        #     "gte": last_create_time,
                        #     "lt": now_time
                        # }}},
                        {'range': {"status": {
                            "gte": 201,  # >=
                            "lt": 301  # <
                        }}}
                    ]
                }
            },
            "sort": [
                {"create_time": "desc"}
            ]
        }
        query = es_client.search(index='enterprise', body=body, scroll='10m', size=5000)
        scroll_id = query['_scroll_id']  # cursor used to page through the full result set
        page = query['hits']['hits']
        hits = list(page)
        failures = 0
        try:
            while page:
                try:
                    query_scroll = es_client.scroll(scroll_id=scroll_id, scroll='10m')
                except Exception:
                    # Retry a bounded number of times; retrying forever would hang the
                    # job, and giving up silently would drop part of the window.
                    failures += 1
                    if failures >= enterprise2Redis._MAX_SCROLL_FAILURES:
                        raise
                    traceback.print_exc()
                    time.sleep(1)
                    continue
                failures = 0
                scroll_id = query_scroll['_scroll_id']
                page = query_scroll['hits']['hits']
                hits.extend(page)
        finally:
            # Release the server-side scroll context even when paging failed.
            try:
                es_client.clear_scroll(scroll_id=scroll_id)
            except Exception:
                traceback.print_exc()
        return hits

    @staticmethod
    def _build_redis_entries(hits):
        """Turn ES hits into ``(name, json_value)`` pairs for the accepted names.

        A name is accepted when ``isLegalNewName`` returns >= 0, it is longer
        than 4 characters, and the record either carries business information
        (``have_business`` = 1) or has a positive ``bid_number``
        (``have_business`` = 0).
        """
        entries = []
        for hit in hits:
            source = hit['_source']
            name = source.get('name')
            if not name:
                continue
            # A field counts when more than one character is left after removing "-".
            num = 0
            for field in enterprise2Redis._BUSINESS_FIELDS:
                if len(str(source.get(field, "")).replace("-", "")) > 1:
                    num += 1
            bid_number = source.get("bid_number", 0)
            if len(name) <= 4:
                continue
            # A null bid_number from ES is compared as 0 instead of raising TypeError.
            if num < 1 and not (bid_number or 0) > 0:
                continue
            if isLegalNewName(name) < 0:
                continue
            _json = {"have_business": 1 if num >= 1 else 0,
                     "zhao_biao_number": source.get("zhao_biao_number", 0),
                     "zhong_biao_number": source.get("zhong_biao_number", 0),
                     "dai_li_number": source.get("dai_li_number", 0),
                     "bid_number": bid_number,
                     "credit_code": source.get("credit_code", "")}
            entries.append((name, json.dumps(_json, ensure_ascii=False)))
        return entries

    @staticmethod
    def _run_in_threads(items, handle):
        """Queue ``items`` and process them with ``handle`` on worker threads.

        The queue is flushed whenever it reaches ``_FLUSH_THRESHOLD`` entries
        and once more at the end if anything is left.
        """
        task_queue = Queue()
        for item in items:
            task_queue.put(item)
            if task_queue.qsize() >= enterprise2Redis._FLUSH_THRESHOLD:
                _mt = MultiThreadHandler(task_queue, handle, None, enterprise2Redis._THREAD_COUNT)
                _mt.run()
        # Only start the workers when there is something left to process.
        if task_queue.qsize() > 0:
            _mt = MultiThreadHandler(task_queue, handle, None, enterprise2Redis._THREAD_COUNT)
            _mt.run()

    @staticmethod
    def update2redis(last_create_time, now_time):
        """Sync enterprises updated in ``[last_create_time, now_time)`` into Redis.

        Args:
            last_create_time: lower bound of the window ("%Y-%m-%d %H:%M:%S");
                a falsy value means there is nothing to resume from.
            now_time: upper bound of the window, same format.

        Returns:
            ``(legal_name_num, not_legal_name_num)``: the number of names queued
            for Redis and a second counter that is always 0 here.
            ``(0, 0)`` when ``last_create_time`` is falsy.

        Raises:
            Exception: propagated from the ES client when the query or the
                scroll paging fails.
        """
        if not last_create_time:
            return 0, 0

        hits = enterprise2Redis._fetch_updated_hits(last_create_time, now_time)
        add_redis_list = enterprise2Redis._build_redis_entries(hits)
        legal_name_num = len(add_redis_list)
        not_legal_name_num = 0

        if add_redis_list:
            pool_db = ConnectorPool(10, 30, getConnect_redis_baseline)

            def handle(item, result_queue):
                # item: (name, json_value)
                _db = pool_db.getConnector()
                try:
                    _db.set(item[0], item[1])
                except Exception:
                    traceback.print_exc()
                finally:
                    pool_db.putConnector(_db)

            enterprise2Redis._run_in_threads(add_redis_list, handle)
        return legal_name_num, not_legal_name_num

    # Add legal and not-legal entities to Redis.
    def monitor_enterprise2redis(self):
        """Run one incremental sync and record the outcome in the run log.

        The window starts at the "process time" of the newest "success" line in
        the run log and ends one hour before now. A run that added nothing, or
        that raised, is logged as "failed", so the next run starts from the
        same lower bound again.
        """
        # ots_client = getConnect_ots()
        log_path = log_enterprise2redis_create_time_path
        try:
            with open(log_path, mode='r', encoding='utf-8') as f:
                all_log_msg = f.readlines()
        except FileNotFoundError:
            # No history yet: treat it like an empty log.
            all_log_msg = []
        if all_log_msg and all_log_msg[-1] == '\n':
            all_log_msg = all_log_msg[:-1]

        # Walk the log backwards to find the newest successful run.
        last_create_time = ""
        for _log in reversed(all_log_msg):
            if "success" not in _log:
                continue
            matched = re.search(r'(?:process time:)(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})', _log)
            if matched:
                last_create_time = matched.group(1)
                break

        # Upper bound of the window: one hour ago.
        now_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time() - 60 * 60))
        try:
            legal_name_num, not_legal_name_num = self.update2redis(last_create_time=last_create_time,
                                                                   now_time=now_time)
        except Exception:
            # Report the run as failed instead of letting the job die unreported.
            traceback.print_exc()
            legal_name_num, not_legal_name_num = 0, 0
        last_process_time = now_time

        status = "success" if (legal_name_num or not_legal_name_num) else "failed"
        log_msg = "%s Enterprise2redis status:%s, add legal name num: %d, add not legal name num: %d, last process time:%s" % \
                  (time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()), status,
                   legal_name_num, not_legal_name_num, last_process_time)
        log(log_msg)

        # Cap the number of records kept in the run log.
        all_log_msg = all_log_msg[-2000:]
        all_log_msg.append(log_msg + '\n')
        with open(log_path, mode='w', encoding='utf-8') as f:
            f.writelines(all_log_msg)
        sentMsgToDD(log_msg, ACCESS_TOKEN_DATAWORKS)

    # Repair wrong data in Redis (delete the key, or set its value to 0).
    @staticmethod
    def fix_up_redis(data_iter):
        """Delete every key in ``data_iter`` from Redis and log how long it took.

        Args:
            data_iter: iterable of Redis keys to delete.
        """
        keys = list(data_iter)
        _start_time = time.time()
        if keys:
            pool_db = ConnectorPool(10, 30, getConnect_redis_baseline)

            def handle(item, result_queue):
                _db = pool_db.getConnector()
                try:
                    # _db.set(item, 0)
                    _db.delete(item)
                except Exception:
                    traceback.print_exc()
                finally:
                    pool_db.putConnector(_db)

            enterprise2Redis._run_in_threads(keys, handle)
        cost_time = time.time() - _start_time
        log("Fix up %d not legal name sucess, cost time %s s" % (len(keys), str(cost_time)))

    def start_monitor(self):
        """Start a blocking scheduler that runs the sync on the cron rule ``hour="*/3"``."""
        # Data monitoring.
        scheduler = BlockingScheduler()
        # self.monitor_enterprise2redis()
        # scheduler.add_job(self.monitor_enterprise2redis,"cron",minute="*/10")
        scheduler.add_job(self.monitor_enterprise2redis, "cron", hour="*/3")
        scheduler.start()
# Validity check for a newly seen entity name.
def isLegalNewName(enterprise_name):
    """Classify ``enterprise_name`` as an acceptable enterprise name or not.

    Args:
        enterprise_name: candidate name; ``None``, non-string and empty values
            are rejected.

    Returns:
        -1 when the name is rejected by one of the rules below,
        0 when it passes them but contains a bid-section / batch marker
        (e.g. "一标段", or ends with "第X批"),
        1 otherwise. The caller in this file accepts any value >= 0.
    """
    if not isinstance(enterprise_name, str) or not enterprise_name:
        return -1

    # Bad start: an ASCII letter/digit followed by something else, a leading
    # character that is not alphanumeric/CJK/opening bracket, a bracket pair
    # with at most one character inside, a leading zero, or a leading date part.
    # NOTE(review): the source listed each round bracket twice in these classes;
    # presumably a fullwidth variant was lost in transcoding -- TODO confirm.
    if re.search(r"^[\da-zA-Z][^\da-zA-Z]|"
                 r"^[^\da-zA-Z\u4e00-\u9fa5\[【(]|"
                 r"^[\[【(].{,1}[\]】)]|"
                 r"^[0〇]|"
                 r"^(20[0-2][0-9]|[0-2]?[0-9]年|[0-1]?[0-9]月|[0-3]?[0-9]日)", enterprise_name):
        return -1

    # At least two CJK characters, and they must make up at least half the name.
    cjk_count = len(re.findall(r"[\u4e00-\u9fa5]", enterprise_name))
    if cjk_count < 2:
        return -1
    if cjk_count / len(enterprise_name) < 0.5:
        return -1

    # Masking characters or whitespace. The previous pattern had a bare "*" as
    # its own alternative, which is an invalid regex ("nothing to repeat") and
    # made this check raise re.error; a character class matches it literally.
    # NOTE(review): that alternative may originally have been a fullwidth
    # asterisk -- TODO confirm whether U+FF0A should be rejected as well.
    if re.search(r"[╳*×]|xx|XX|\s", enterprise_name):
        return -1

    # Ends like a place name, unless the ending is one of the allowed compounds.
    if re.search(r"[区市镇乡县洲州路街]$", enterprise_name) and not re.search(
            r"(超市|门市|保护区|园区|景区|校区|社区|服务区|工区|小区|集市|花市|夜市|学区|旅游区|矿区|林区|度假区|示范区|菜市)$",
            enterprise_name):
        return -1

    if re.search(r"^个人|^个体|测试$", enterprise_name):
        return -1
    # "个人"/"个体" is only tolerated when more than five characters precede it.
    if re.search(r"个人|个体", enterprise_name):
        _split = re.split(r"个人|个体", enterprise_name)
        if len(_split[0]) <= 5:
            return -1
    if re.search(r"测试", enterprise_name) and len(enterprise_name) < 8:
        return -1

    # Starts with a bare administrative-level word, except listed place names.
    if re.search(r"^(省|自治[县州区]|市|县|区|镇|乡|街道)", enterprise_name) and not re.search(
            r"^(镇江|乡宁|镇原|镇海|镇安|镇巴|镇坪|镇赉|镇康|镇沅|镇雄|镇远|镇宁|乡城|镇平|市中|市南|市北)", enterprise_name):
        return -1

    # Clock times, file extensions or an HTML "nbsp" remnant.
    if re.search(r"\d{1,2}:\d{2}(:\d{2})?|(rar|xlsx|zip|png|jpg|swf|docx|txt|pdf|PDF|doc|xls|bmp|&?nbsp)",
                 enterprise_name):
        return -1

    # Tender-document wording rather than a company name.
    if re.search(r"(招标|代理)(人|机构)|联系(人|方式)|中标|候选|第.名|^(项目|业主)", enterprise_name):
        return -1
    if re.search(r"评[标选委审]", enterprise_name) and not re.search(r"评[标选委审].{0,2}中心", enterprise_name):
        return -1

    # Passed every rejection rule but carries a bid-section / batch marker.
    if re.search(r"[a-zA-Z\d一二三四五六七八九十]{1,2}(标段?)|第.批$", enterprise_name):
        return 0
    return 1
if __name__ == '__main__':
    # Running this file as a script currently does nothing; the snippets below
    # are manual usage examples that are kept commented out.
    #
    # Leftover monitor debugging snippets:
    #     dm = BaseDataMonitor()
    #     # dm.start_monitor()
    #     log_filename = "C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-03.log"
    #     dm.check_document_uuid(log_filename)
    #     sentMsgToDD("报警test_msg",ACCESS_TOKEN_DATAWORKS)
    #     dm.monitor_proposedBuilding()
    #     print(dm.get_last_tenmin_time(16))
    #
    # First push to Redis, create_time: 2022-12-16:
    #     em = EnterpriseMonitor()
    #     last_create_time = "2022-12-16 00:00:01"
    #     now_time = time.time()
    #     now_time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(now_time))
    #     legal_name_num,not_legal_name_num = em.update2redis(last_create_time=last_create_time,now_time=now_time)
    #
    # Delete entities from the online Redis:
    #     _enterprise2Redis = enterprise2Redis()
    #     drop_list = ["个体工商户"]
    #     _enterprise2Redis.fix_up_redis(drop_list)
    #
    # Run one sync by hand:
    #     e = enterprise2Redis()
    #     e.monitor_enterprise2redis()
    pass
|