# enterprise2Redis.py (13 KB)


  1. import os,sys
  2. from apscheduler.schedulers.blocking import BlockingScheduler
  3. from BaseDataMaintenance.dataSource.source import getConnect_ots,getConnect_activateMQ
  4. from BaseDataMaintenance.dataSource.interface import *
  5. from BaseDataMaintenance.common.Utils import *
  6. from tablestore import *
  7. from BaseDataMaintenance.dataSource.setttings import *
  8. from queue import Queue
  9. from BaseDataMaintenance.dataSource.source import getConnect_redis_baseline
  10. from BaseDataMaintenance.dataSource.pool import ConnectorPool
  11. from BaseDataMaintenance.common.multiThread import MultiThreadHandler
  12. import pandas as pd
  13. from BaseDataMaintenance.maintenance.dataflow_settings import *
  14. from elasticsearch import Elasticsearch
  15. # flow_enterprise2redis_path = "/data/python/flow_enterprise2redis.log"
  16. log_enterprise2redis_create_time_path = "/data/python/enterprise2redis_createTime.log"
  17. # 线上流程 enterprise Redis表维护
  18. class enterprise2Redis():
  19. def __init__(self):
  20. self.ots_client = getConnect_ots()
  21. self.recieviers = ["1175730271@qq.com","531870502@qq.com"]
  22. self.list_proposed_count = []
  23. self.current_path = os.path.dirname(__file__)
  24. def cmd_execute(self,_cmd):
  25. return os.popen(_cmd).read()
  26. def get_last_tenmin_time(self,nums=15):
  27. current_time = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
  28. last_ten_minite_time = timeAdd(current_time,0,"%Y-%m-%d %H:%M:%S",-10)
  29. return last_ten_minite_time[:nums]
  30. @staticmethod
  31. def update2redis(last_create_time, now_time):
  32. if not last_create_time:
  33. return 0, 0
  34. def handle(item, result_queue):
  35. _db = pool_db.getConnector()
  36. try:
  37. # item: (name,sign:0/1)
  38. _db.set(item[0], item[1])
  39. except Exception as e:
  40. traceback.print_exc()
  41. finally:
  42. pool_db.putConnector(_db)
  43. # es_url = "http://es-cn-lbj3cjmy3000djxak.elasticsearch.aliyuncs.com" # 内网
  44. es_url = "http://es-cn-lbj3cjmy3000djxak.public.elasticsearch.aliyuncs.com" # 外网
  45. es_client = Elasticsearch([es_url],
  46. http_auth=('elastic', 'WWBu9#1HWHo$$gJm'),
  47. port=9200)
  48. body = {
  49. "_source": ["name", "history_names", 'legal_person', 'reg_capital', 'credit_code', 'tax_number',
  50. 'reg_number', 'org_number',
  51. "zhao_biao_number", "zhong_biao_number", "dai_li_number", "bid_number"],
  52. 'query': { # 查询命令
  53. "bool": {
  54. 'must': [
  55. {'range': {"update_time": {
  56. "gte": last_create_time, # >= 大于等于
  57. "lt": now_time # < 小于
  58. }}},
  59. # {'range': {"create_time": {
  60. # "gte": last_create_time, # >= 大于等于
  61. # "lt": now_time # < 小于
  62. # }}},
  63. {'range': {"status": {
  64. "gte": 201, # >= 大于等于
  65. "lt": 301 # < 小于
  66. }}}
  67. ]
  68. }
  69. },
  70. "sort": [
  71. {"create_time": "desc"}
  72. ]
  73. }
  74. query = es_client.search(index='enterprise', body=body, scroll='10m', size=5000)
  75. scroll_id = query['_scroll_id'] # 游标用于输出es查询出的所有结果
  76. query_result = query['hits']['hits']
  77. result = query_result
  78. while len(query_result) > 0:
  79. try:
  80. query_scroll = es_client.scroll(scroll_id=scroll_id, scroll='10m')
  81. scroll_id = query_scroll['_scroll_id']
  82. query_result = query_scroll['hits']['hits']
  83. if len(query_result) > 0:
  84. result += query_result
  85. else:
  86. break
  87. except:
  88. pass
  89. es_client.clear_scroll(scroll_id=scroll_id)
  90. legal_name_num = 0
  91. not_legal_name_num = 0
  92. add_redis_list = []
  93. for item in result:
  94. item = item['_source']
  95. name = item['name']
  96. history_names = item.get("history_names", "")
  97. legal_person = item.get("legal_person", "")
  98. reg_capital = item.get("reg_capital", "")
  99. credit_code = item.get("credit_code", "")
  100. tax_number = item.get("tax_number", "")
  101. reg_number = item.get("reg_number", "")
  102. org_number = item.get("org_number", "")
  103. zhao_biao_number = item.get("zhao_biao_number", 0)
  104. zhong_biao_number = item.get("zhong_biao_number", 0)
  105. dai_li_number = item.get("dai_li_number", 0)
  106. bid_number = item.get("bid_number", 0)
  107. num = 0
  108. for business in [history_names, legal_person, reg_capital, credit_code, tax_number, reg_number, org_number]:
  109. if len(str(business).replace("-", "")) > 1:
  110. num += 1
  111. isLegal = isLegalNewName(name)
  112. if isLegal >= 0:
  113. if num >= 1 and len(name) > 4:
  114. legal_name_num += 1
  115. _json = {"have_business": 1, "zhao_biao_number": zhao_biao_number,
  116. "zhong_biao_number": zhong_biao_number,
  117. "dai_li_number": dai_li_number, "bid_number": bid_number,
  118. "credit_code":credit_code}
  119. _json = json.dumps(_json, ensure_ascii=False)
  120. add_redis_list.append((name, _json))
  121. elif num == 0 and bid_number > 0 and len(name) > 4:
  122. legal_name_num += 1
  123. _json = {"have_business": 0, "zhao_biao_number": zhao_biao_number,
  124. "zhong_biao_number": zhong_biao_number,
  125. "dai_li_number": dai_li_number, "bid_number": bid_number,
  126. "credit_code":credit_code}
  127. _json = json.dumps(_json, ensure_ascii=False)
  128. add_redis_list.append((name, _json))
  129. pool_db = ConnectorPool(10, 30, getConnect_redis_baseline)
  130. # _start_time = time.time()
  131. task_queue = Queue()
  132. for legal_name in add_redis_list:
  133. task_queue.put(legal_name)
  134. if task_queue.qsize() >= 100 * 10000:
  135. _mt = MultiThreadHandler(task_queue, handle, None, 30)
  136. _mt.run()
  137. if task_queue.qsize() >= 0:
  138. _mt = MultiThreadHandler(task_queue, handle, None, 30)
  139. _mt.run()
  140. return legal_name_num, not_legal_name_num
  141. # Redis新增 合法实体和不合法实体
  142. def monitor_enterprise2redis(self):
  143. # ots_client = getConnect_ots()
  144. log_path = log_enterprise2redis_create_time_path
  145. all_log_msg = []
  146. with open(log_path, mode='r', encoding='utf-8') as f:
  147. all_log_msg = f.readlines()
  148. # all_log_msg.remove('\n')
  149. if all_log_msg[-1]=='\n':
  150. all_log_msg = all_log_msg[:-1]
  151. last_create_time = ""
  152. for _log in all_log_msg[::-1]:
  153. if "success" in _log:
  154. try:
  155. last_create_time = re.search('(?:process time:)(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})',_log).group(1)
  156. break
  157. except:
  158. continue
  159. now_time = time.time()-60*60 # now_time:前一小时
  160. now_time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(now_time))
  161. legal_name_num,not_legal_name_num = self.update2redis(last_create_time=last_create_time,now_time=now_time)
  162. last_process_time = now_time
  163. if legal_name_num or not_legal_name_num:
  164. log_msg = "%s Enterprise2redis status:success, add legal name num: %d, add not legal name num: %d, last process time:%s"%\
  165. ((time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()),legal_name_num,not_legal_name_num,last_process_time))
  166. else:
  167. log_msg = "%s Enterprise2redis status:failed, add legal name num: %d, add not legal name num: %d, last process time:%s" % \
  168. ((time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()),legal_name_num, not_legal_name_num, last_process_time))
  169. log(log_msg)
  170. # 限制写入log最大记录数量
  171. all_log_msg = all_log_msg[-2000:]
  172. all_log_msg.append(log_msg + '\n')
  173. if all_log_msg:
  174. with open(log_path,mode='w',encoding='utf-8') as f:
  175. f.writelines(all_log_msg)
  176. sentMsgToDD(log_msg , ACCESS_TOKEN_DATAWORKS)
  177. # 修复(删除 或 置value为0)Redis中错误的数据
  178. @staticmethod
  179. def fix_up_redis(data_iter):
  180. def handle(item, result_queue):
  181. _db = pool_db.getConnector()
  182. try:
  183. # _db.set(item, 0)
  184. _db.delete(item)
  185. except Exception as e:
  186. traceback.print_exc()
  187. finally:
  188. pool_db.putConnector(_db)
  189. data_len = len(data_iter)
  190. _start_time = time.time()
  191. pool_db = ConnectorPool(10, 30, getConnect_redis_baseline)
  192. task_queue = Queue()
  193. # 置value为0
  194. for key in data_iter:
  195. task_queue.put(key)
  196. if task_queue.qsize() >= 100 * 10000:
  197. _mt = MultiThreadHandler(task_queue, handle, None, 30)
  198. _mt.run()
  199. if task_queue.qsize() >= 0:
  200. _mt = MultiThreadHandler(task_queue, handle, None, 30)
  201. _mt.run()
  202. cost_time = time.time() - _start_time
  203. log("Fix up %d not legal name sucess, cost time %s s"%(data_len,str(cost_time)))
  204. def start_monitor(self):
  205. #数据监控
  206. scheduler = BlockingScheduler()
  207. # self.monitor_enterprise2redis()
  208. # scheduler.add_job(self.monitor_enterprise2redis,"cron",minute="*/10")
  209. scheduler.add_job(self.monitor_enterprise2redis,"cron",hour="*/3")
  210. scheduler.start()
  211. # 新实体合法判断
  212. def isLegalNewName(enterprise_name):
  213. # 名称开头判断
  214. if re.search("^[\da-zA-Z][^\da-zA-Z]|"
  215. "^[^\da-zA-Z\u4e00-\u9fa5\[【((]|"
  216. "^[\[【((].{,1}[\]】))]|"
  217. "^[0〇]|"
  218. "^(20[0-2][0-9]|[0-2]?[0-9]年|[0-1]?[0-9]月|[0-3]?[0-9]日)", enterprise_name):
  219. return -1
  220. if len(re.findall("[\u4e00-\u9fa5]", enterprise_name)) < 2:
  221. return -1
  222. if len(re.findall("[\u4e00-\u9fa5]", enterprise_name)) / len(enterprise_name) < 0.5:
  223. return -1
  224. if re.search("╳|*|\*|×|xx|XX|\s", enterprise_name):
  225. return -1
  226. if re.search("[区市镇乡县洲州路街]$", enterprise_name) and not re.search("(超市|门市|保护区|园区|景区|校区|社区|服务区|工区|小区|集市|花市|夜市|学区|旅游区|矿区|林区|度假区|示范区|菜市)$", enterprise_name):
  227. return -1
  228. if re.search("^个人|^个体|测试$", enterprise_name):
  229. return -1
  230. if re.search("个人|个体", enterprise_name):
  231. _split = re.split("个人|个体", enterprise_name)
  232. if len(_split[0]) <= 5:
  233. return -1
  234. if re.search("测试", enterprise_name) and len(enterprise_name) < 8:
  235. return -1
  236. if re.search("^(省|自治[县州区]|市|县|区|镇|乡|街道)", enterprise_name) and not re.search(
  237. "^(镇江|乡宁|镇原|镇海|镇安|镇巴|镇坪|镇赉|镇康|镇沅|镇雄|镇远|镇宁|乡城|镇平|市中|市南|市北)", enterprise_name):
  238. return -1
  239. if re.search("\d{1,2}:\d{2}(:\d{2})?|(rar|xlsx|zip|png|jpg|swf|docx|txt|pdf|PDF|doc|xls|bmp|&?nbsp)",
  240. enterprise_name):
  241. return -1
  242. if re.search("(招标|代理)(人|机构)|联系(人|方式)|中标|候选|第.名|^(项目|业主)", enterprise_name):
  243. return -1
  244. if re.search("评[标选委审]", enterprise_name) and not re.search("评[标选委审].{0,2}中心", enterprise_name):
  245. return -1
  246. if re.search("[a-zA-Z\d一二三四五六七八九十]{1,2}(标段?)|第.批$", enterprise_name):
  247. return 0
  248. return 1
if __name__ == '__main__':
    # Manual entry points for ad-hoc runs. Everything here is commented out,
    # so running this file directly currently does nothing.
    # NOTE(review): BaseDataMonitor and EnterpriseMonitor are not defined in the
    # visible source of this file; the snippets using them look stale.
    # dm = BaseDataMonitor()
    # # dm.start_monitor()
    # log_filename = "C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-03.log"
    # dm.check_document_uuid(log_filename)
    # sentMsgToDD("报警test_msg",ACCESS_TOKEN_DATAWORKS)
    # dm.monitor_proposedBuilding()
    # print(dm.get_last_tenmin_time(16))
    # Initial full push to Redis, starting from create_time 2022-12-16:
    # em = EnterpriseMonitor()
    # last_create_time = "2022-12-16 00:00:01"
    # now_time = time.time()
    # now_time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(now_time))
    # legal_name_num,not_legal_name_num = em.update2redis(last_create_time=last_create_time,now_time=now_time)
    # Delete entities that are already online (fix_up_redis deletes the keys from Redis):
    # _enterprise2Redis = enterprise2Redis()
    # drop_list = ["个体工商户"]
    # _enterprise2Redis.fix_up_redis(drop_list)
    # One-off incremental sync run:
    # e = enterprise2Redis()
    # e.monitor_enterprise2redis()
    pass