enterprise2Redis.py 13 KB


  1. import os,sys
  2. from apscheduler.schedulers.blocking import BlockingScheduler
  3. from BaseDataMaintenance.dataSource.source import getConnect_ots,getConnect_activateMQ
  4. from BaseDataMaintenance.dataSource.interface import *
  5. from BaseDataMaintenance.common.Utils import *
  6. from tablestore import *
  7. from BaseDataMaintenance.dataSource.setttings import *
  8. from queue import Queue
  9. from BaseDataMaintenance.dataSource.source import getConnect_redis_baseline
  10. from BaseDataMaintenance.dataSource.pool import ConnectorPool
  11. from BaseDataMaintenance.common.multiThread import MultiThreadHandler
  12. import pandas as pd
  13. from BaseDataMaintenance.maintenance.dataflow_settings import *
  14. from elasticsearch import Elasticsearch
  15. # flow_enterprise2redis_path = "/data/python/flow_enterprise2redis.log"
  16. log_enterprise2redis_create_time_path = "/data/python/enterprise2redis_createTime.log"
  17. # 线上流程 enterprise Redis表维护
  18. class enterprise2Redis():
  19. def __init__(self):
  20. self.ots_client = getConnect_ots()
  21. self.recieviers = ["1175730271@qq.com","531870502@qq.com"]
  22. self.list_proposed_count = []
  23. self.current_path = os.path.dirname(__file__)
  24. def cmd_execute(self,_cmd):
  25. return os.popen(_cmd).read()
  26. def get_last_tenmin_time(self,nums=15):
  27. current_time = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
  28. last_ten_minite_time = timeAdd(current_time,0,"%Y-%m-%d %H:%M:%S",-10)
  29. return last_ten_minite_time[:nums]
  30. @staticmethod
  31. def update2redis(last_create_time, now_time):
  32. if not last_create_time:
  33. return 0, 0
  34. def handle(item, result_queue):
  35. _db = pool_db.getConnector()
  36. try:
  37. # item: (name,sign:0/1)
  38. _db.set(item[0], item[1])
  39. except Exception as e:
  40. traceback.print_exc()
  41. finally:
  42. pool_db.putConnector(_db)
  43. # es_url = "http://es-cn-lbj3cjmy3000djxak.elasticsearch.aliyuncs.com" # 内网
  44. es_url = "http://es-cn-lbj3cjmy3000djxak.public.elasticsearch.aliyuncs.com" # 外网
  45. es_client = Elasticsearch([es_url],
  46. http_auth=('elastic', 'WWBu9#1HWHo$$gJm'),
  47. port=9200)
  48. body = {
  49. "_source": ["name", "history_names", 'legal_person', 'reg_capital', 'credit_code', 'tax_number',
  50. 'reg_number', 'org_number',
  51. "zhao_biao_number", "zhong_biao_number", "dai_li_number", "bid_number"],
  52. 'query': { # 查询命令
  53. "bool": {
  54. 'must': [
  55. {'range': {"update_time": {
  56. "gte": last_create_time, # >= 大于等于
  57. "lt": now_time # < 小于
  58. }}},
  59. # {'range': {"create_time": {
  60. # "gte": last_create_time, # >= 大于等于
  61. # "lt": now_time # < 小于
  62. # }}},
  63. {'range': {"status": {
  64. "gte": 201, # >= 大于等于
  65. "lt": 301 # < 小于
  66. }}}
  67. ]
  68. }
  69. },
  70. "sort": [
  71. {"create_time": "desc"}
  72. ]
  73. }
  74. query = es_client.search(index='enterprise', body=body, scroll='10m', size=5000)
  75. scroll_id = query['_scroll_id'] # 游标用于输出es查询出的所有结果
  76. query_result = query['hits']['hits']
  77. result = query_result
  78. while len(query_result) > 0:
  79. try:
  80. query_scroll = es_client.scroll(scroll_id=scroll_id, scroll='10m')
  81. scroll_id = query_scroll['_scroll_id']
  82. query_result = query_scroll['hits']['hits']
  83. if len(query_result) > 0:
  84. result += query_result
  85. else:
  86. break
  87. except:
  88. pass
  89. es_client.clear_scroll(scroll_id=scroll_id)
  90. legal_name_num = 0
  91. not_legal_name_num = 0
  92. add_redis_list = []
  93. for item in result:
  94. item = item['_source']
  95. name = item['name']
  96. history_names = item.get("history_names", "")
  97. legal_person = item.get("legal_person", "")
  98. reg_capital = item.get("reg_capital", "")
  99. credit_code = item.get("credit_code", "")
  100. credit_code = credit_code if len(credit_code)>2 else ""
  101. tax_number = item.get("tax_number", "")
  102. reg_number = item.get("reg_number", "")
  103. org_number = item.get("org_number", "")
  104. zhao_biao_number = item.get("zhao_biao_number", 0)
  105. zhong_biao_number = item.get("zhong_biao_number", 0)
  106. dai_li_number = item.get("dai_li_number", 0)
  107. bid_number = item.get("bid_number", 0)
  108. num = 0
  109. for business in [history_names, legal_person, reg_capital, credit_code, tax_number, reg_number, org_number]:
  110. if len(str(business).replace("-", "")) > 1:
  111. num += 1
  112. isLegal = isLegalNewName(name)
  113. if isLegal >= 0:
  114. if num >= 1 and len(name) > 4:
  115. legal_name_num += 1
  116. _json = {"have_business": 1, "zhao_biao_number": zhao_biao_number,
  117. "zhong_biao_number": zhong_biao_number,
  118. "dai_li_number": dai_li_number, "bid_number": bid_number,
  119. "credit_code":credit_code}
  120. _json = json.dumps(_json, ensure_ascii=False)
  121. add_redis_list.append((name, _json))
  122. elif num == 0 and bid_number > 0 and len(name) > 4:
  123. legal_name_num += 1
  124. _json = {"have_business": 0, "zhao_biao_number": zhao_biao_number,
  125. "zhong_biao_number": zhong_biao_number,
  126. "dai_li_number": dai_li_number, "bid_number": bid_number,
  127. "credit_code":credit_code}
  128. _json = json.dumps(_json, ensure_ascii=False)
  129. add_redis_list.append((name, _json))
  130. pool_db = ConnectorPool(10, 30, getConnect_redis_baseline)
  131. # _start_time = time.time()
  132. task_queue = Queue()
  133. for legal_name in add_redis_list:
  134. task_queue.put(legal_name)
  135. if task_queue.qsize() >= 100 * 10000:
  136. _mt = MultiThreadHandler(task_queue, handle, None, 30)
  137. _mt.run()
  138. if task_queue.qsize() >= 0:
  139. _mt = MultiThreadHandler(task_queue, handle, None, 30)
  140. _mt.run()
  141. return legal_name_num, not_legal_name_num
  142. # Redis新增 合法实体和不合法实体
  143. def monitor_enterprise2redis(self):
  144. # ots_client = getConnect_ots()
  145. log_path = log_enterprise2redis_create_time_path
  146. all_log_msg = []
  147. with open(log_path, mode='r', encoding='utf-8') as f:
  148. all_log_msg = f.readlines()
  149. # all_log_msg.remove('\n')
  150. if all_log_msg[-1]=='\n':
  151. all_log_msg = all_log_msg[:-1]
  152. last_create_time = ""
  153. for _log in all_log_msg[::-1]:
  154. if "success" in _log:
  155. try:
  156. last_create_time = re.search('(?:process time:)(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})',_log).group(1)
  157. break
  158. except:
  159. continue
  160. now_time = time.time()-60*60 # now_time:前一小时
  161. now_time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(now_time))
  162. legal_name_num,not_legal_name_num = self.update2redis(last_create_time=last_create_time,now_time=now_time)
  163. last_process_time = now_time
  164. if legal_name_num or not_legal_name_num:
  165. log_msg = "%s Enterprise2redis status:success, add legal name num: %d, add not legal name num: %d, last process time:%s"%\
  166. ((time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()),legal_name_num,not_legal_name_num,last_process_time))
  167. else:
  168. log_msg = "%s Enterprise2redis status:failed, add legal name num: %d, add not legal name num: %d, last process time:%s" % \
  169. ((time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()),legal_name_num, not_legal_name_num, last_process_time))
  170. log(log_msg)
  171. # 限制写入log最大记录数量
  172. all_log_msg = all_log_msg[-2000:]
  173. all_log_msg.append(log_msg + '\n')
  174. if all_log_msg:
  175. with open(log_path,mode='w',encoding='utf-8') as f:
  176. f.writelines(all_log_msg)
  177. sentMsgToDD(log_msg , ACCESS_TOKEN_DATAWORKS)
  178. # 修复(删除 或 置value为0)Redis中错误的数据
  179. @staticmethod
  180. def fix_up_redis(data_iter):
  181. def handle(item, result_queue):
  182. _db = pool_db.getConnector()
  183. try:
  184. # _db.set(item, 0)
  185. _db.delete(item)
  186. except Exception as e:
  187. traceback.print_exc()
  188. finally:
  189. pool_db.putConnector(_db)
  190. data_len = len(data_iter)
  191. _start_time = time.time()
  192. pool_db = ConnectorPool(10, 30, getConnect_redis_baseline)
  193. task_queue = Queue()
  194. # 置value为0
  195. for key in data_iter:
  196. task_queue.put(key)
  197. if task_queue.qsize() >= 100 * 10000:
  198. _mt = MultiThreadHandler(task_queue, handle, None, 30)
  199. _mt.run()
  200. if task_queue.qsize() >= 0:
  201. _mt = MultiThreadHandler(task_queue, handle, None, 30)
  202. _mt.run()
  203. cost_time = time.time() - _start_time
  204. log("Fix up %d not legal name sucess, cost time %s s"%(data_len,str(cost_time)))
  205. def start_monitor(self):
  206. #数据监控
  207. scheduler = BlockingScheduler()
  208. # self.monitor_enterprise2redis()
  209. # scheduler.add_job(self.monitor_enterprise2redis,"cron",minute="*/10")
  210. scheduler.add_job(self.monitor_enterprise2redis,"cron",hour="*/3")
  211. scheduler.start()
  212. # 新实体合法判断
  213. def isLegalNewName(enterprise_name):
  214. # 名称开头判断
  215. if re.search("^[\da-zA-Z][^\da-zA-Z]|"
  216. "^[^\da-zA-Z\u4e00-\u9fa5\[【((]|"
  217. "^[\[【((].{,1}[\]】))]|"
  218. "^[0〇]|"
  219. "^(20[0-2][0-9]|[0-2]?[0-9]年|[0-1]?[0-9]月|[0-3]?[0-9]日)", enterprise_name):
  220. return -1
  221. if len(re.findall("[\u4e00-\u9fa5]", enterprise_name)) < 2:
  222. return -1
  223. if len(re.findall("[\u4e00-\u9fa5]", enterprise_name)) / len(enterprise_name) < 0.5:
  224. return -1
  225. if re.search("╳|*|\*|×|xx|XX|\s", enterprise_name):
  226. return -1
  227. if re.search("[区市镇乡县洲州路街]$", enterprise_name) and not re.search("(超市|门市|保护区|园区|景区|校区|社区|服务区|工区|小区|集市|花市|夜市|学区|旅游区|矿区|林区|度假区|示范区|菜市)$", enterprise_name):
  228. return -1
  229. if re.search("^个人|^个体|测试$", enterprise_name):
  230. return -1
  231. if re.search("个人|个体", enterprise_name):
  232. _split = re.split("个人|个体", enterprise_name)
  233. if len(_split[0]) <= 5:
  234. return -1
  235. if re.search("测试", enterprise_name) and len(enterprise_name) < 8:
  236. return -1
  237. if re.search("^(省|自治[县州区]|市|县|区|镇|乡|街道)", enterprise_name) and not re.search(
  238. "^(镇江|乡宁|镇原|镇海|镇安|镇巴|镇坪|镇赉|镇康|镇沅|镇雄|镇远|镇宁|乡城|镇平|市中|市南|市北)", enterprise_name):
  239. return -1
  240. if re.search("\d{1,2}:\d{2}(:\d{2})?|(rar|xlsx|zip|png|jpg|swf|docx|txt|pdf|PDF|doc|xls|bmp|&?nbsp)",
  241. enterprise_name):
  242. return -1
  243. if re.search("(招标|代理)(人|机构)|联系(人|方式)|中标|候选|第.名|^(项目|业主)", enterprise_name):
  244. return -1
  245. if re.search("评[标选委审]", enterprise_name) and not re.search("评[标选委审].{0,2}中心", enterprise_name):
  246. return -1
  247. if re.search("[a-zA-Z\d一二三四五六七八九十]{1,2}(标段?)|第.批$", enterprise_name):
  248. return 0
  249. return 1
if __name__ == '__main__':
    # Script entry point. Nothing runs by default; the commented-out snippets
    # below are manual maintenance recipes kept for reference.
    # dm = BaseDataMonitor()
    # # dm.start_monitor()
    # log_filename = "C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-03.log"
    # dm.check_document_uuid(log_filename)
    # sentMsgToDD("报警test_msg",ACCESS_TOKEN_DATAWORKS)
    # dm.monitor_proposedBuilding()
    # print(dm.get_last_tenmin_time(16))
    # First push to Redis, create_time: 2022-12-16
    # em = EnterpriseMonitor()
    # last_create_time = "2022-12-16 00:00:01"
    # now_time = time.time()
    # now_time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(now_time))
    # legal_name_num,not_legal_name_num = em.update2redis(last_create_time=last_create_time,now_time=now_time)
    # Delete entities from the online Redis
    # _enterprise2Redis = enterprise2Redis()
    # drop_list = ["个体工商户"]
    # _enterprise2Redis.fix_up_redis(drop_list)
    # e = enterprise2Redis()
    # e.monitor_enterprise2redis()
    pass