|
@@ -17,12 +17,13 @@ from BaseDataMaintenance.dataSource.pool import ConnectorPool
|
|
|
from BaseDataMaintenance.common.multiThread import MultiThreadHandler
|
|
|
import pandas as pd
|
|
|
from BaseDataMaintenance.maintenance.dataflow_settings import *
|
|
|
-
|
|
|
+from elasticsearch import Elasticsearch
|
|
|
|
|
|
# flow_enterprise2redis_path = "/data/python/flow_enterprise2redis.log"
|
|
|
log_enterprise2redis_create_time_path = "/data/python/enterprise2redis_createTime.log"
|
|
|
|
|
|
|
|
|
+
|
|
|
# 线上流程 enterprise Redis表维护
|
|
|
class enterprise2Redis():
|
|
|
|
|
@@ -57,100 +58,108 @@ class enterprise2Redis():
|
|
|
finally:
|
|
|
pool_db.putConnector(_db)
|
|
|
|
|
|
- ots_client = getConnect_ots()
|
|
|
- bool_query = BoolQuery(must_queries=[
|
|
|
- RangeQuery("create_time", range_from=last_create_time, range_to=now_time),
|
|
|
- RangeQuery("status", range_from=201, range_to=301),
|
|
|
- ])
|
|
|
- rows, next_token, total_count, is_all_succeed = ots_client.search("enterprise", "enterprise_index",
|
|
|
- SearchQuery(bool_query, get_total_count=True),
|
|
|
- columns_to_get=ColumnsToGet(
|
|
|
- return_type=ColumnReturnType.NONE))
|
|
|
-
|
|
|
- if total_count > 0:
|
|
|
- column_list = ["nicknames", "history_names", 'tyc_id', 'legal_person', 'reg_capital', 'found_date',
|
|
|
- 'credit_code', 'tax_number', 'reg_number', 'org_number']
|
|
|
- all_rows = []
|
|
|
- first_query = False
|
|
|
- # 第一次查询
|
|
|
- while not first_query:
|
|
|
- try:
|
|
|
- rows, next_token, total_count, is_all_succeed = ots_client.search(
|
|
|
- "enterprise", "enterprise_index",
|
|
|
- SearchQuery(
|
|
|
- bool_query,
|
|
|
- limit=100,
|
|
|
- sort=Sort(sorters=[FieldSort('bidi_id', SortOrder.DESC)]),
|
|
|
- get_total_count=True),
|
|
|
- ColumnsToGet(return_type=ColumnReturnType.SPECIFIED,
|
|
|
- column_names=column_list)
|
|
|
- )
|
|
|
- first_query = True
|
|
|
- except:
|
|
|
- print("~ first_query ots error ~")
|
|
|
- # next_token后续查询
|
|
|
- if first_query:
|
|
|
- while next_token:
|
|
|
- try:
|
|
|
- rows, next_token, total_count, is_all_succeed = ots_client.search(
|
|
|
- "enterprise", "enterprise_index",
|
|
|
- SearchQuery(
|
|
|
- bool_query,
|
|
|
- next_token=next_token,
|
|
|
- limit=100,
|
|
|
- get_total_count=True),
|
|
|
- ColumnsToGet(return_type=ColumnReturnType.SPECIFIED,
|
|
|
- column_names=column_list)
|
|
|
- )
|
|
|
- all_rows.extend(rows)
|
|
|
-
|
|
|
- except:
|
|
|
- print("~ ots query error, try again ~")
|
|
|
-
|
|
|
- # 清洗过滤
|
|
|
- # legal_name_list = []
|
|
|
- # not_legal_name_list = []
|
|
|
- legal_name_num = 0
|
|
|
- not_legal_name_num = 0
|
|
|
- name_sign_list = []
|
|
|
- for row in all_rows:
|
|
|
- # 索引字段
|
|
|
- index_field = row[1]
|
|
|
- row_dict = dict((item[0], item[1]) for item in index_field)
|
|
|
- name = ""
|
|
|
- num = 0
|
|
|
- for key,value in row_dict.items():
|
|
|
- if key=='nicknames':
|
|
|
- name = value
|
|
|
- else:
|
|
|
- if len(str(value).replace("-",""))>1:
|
|
|
- num += 1
|
|
|
- isLegal = isLegalNewName(name)
|
|
|
- if isLegal>=0:
|
|
|
- if num>=1 and len(name)>4:
|
|
|
- legal_name_num += 1
|
|
|
- name_sign_list.append((name,1))
|
|
|
- # elif num>=1:
|
|
|
- # pass
|
|
|
- # else:
|
|
|
- # not_legal_name_num += 1
|
|
|
- # # name_sign_list.append((name,0))
|
|
|
- # pass
|
|
|
-
|
|
|
- pool_db = ConnectorPool(10, 30, getConnect_redis_baseline)
|
|
|
- # _start_time = time.time()
|
|
|
- task_queue = Queue()
|
|
|
- for legal_name in name_sign_list:
|
|
|
- task_queue.put(legal_name)
|
|
|
- if task_queue.qsize() >= 100 * 10000:
|
|
|
- _mt = MultiThreadHandler(task_queue, handle, None, 30)
|
|
|
- _mt.run()
|
|
|
- if task_queue.qsize() >= 0:
|
|
|
+ # es_url = "http://es-cn-lbj3cjmy3000djxak.elasticsearch.aliyuncs.com" # 内网
|
|
|
+ es_url = "http://es-cn-lbj3cjmy3000djxak.public.elasticsearch.aliyuncs.com" # 外网
|
|
|
+ es_client = Elasticsearch([es_url],
|
|
|
+ http_auth=('elastic', 'WWBu9#1HWHo$$gJm'),
|
|
|
+ port=9200)
|
|
|
+ body = {
|
|
|
+ "_source": ["name", "history_names", 'legal_person', 'reg_capital', 'credit_code', 'tax_number',
|
|
|
+ 'reg_number', 'org_number',
|
|
|
+ "zhao_biao_number", "zhong_biao_number", "dai_li_number", "bid_number"],
|
|
|
+ 'query': { # 查询命令
|
|
|
+ "bool": {
|
|
|
+ 'must': [
|
|
|
+ {'range': {"update_time": {
|
|
|
+ "gte": last_create_time, # >= 大于等于
|
|
|
+ "lt": now_time # < 小于
|
|
|
+ }}},
|
|
|
+ # {'range': {"create_time": {
|
|
|
+ # "gte": last_create_time, # >= 大于等于
|
|
|
+ # "lt": now_time # < 小于
|
|
|
+ # }}},
|
|
|
+ {'range': {"status": {
|
|
|
+ "gte": 201, # >= 大于等于
|
|
|
+ "lt": 301 # < 小于
|
|
|
+ }}}
|
|
|
+ ]
|
|
|
+ }
|
|
|
+ },
|
|
|
+ "sort": [
|
|
|
+ {"create_time": "desc"}
|
|
|
+ ]
|
|
|
+ }
|
|
|
+
|
|
|
+ query = es_client.search(index='enterprise', body=body, scroll='10m', size=5000)
|
|
|
+ scroll_id = query['_scroll_id'] # 游标用于输出es查询出的所有结果
|
|
|
+ query_result = query['hits']['hits']
|
|
|
+ result = query_result
|
|
|
+ while len(query_result) > 0:
|
|
|
+ try:
|
|
|
+ query_scroll = es_client.scroll(scroll_id=scroll_id, scroll='10m')
|
|
|
+ scroll_id = query_scroll['_scroll_id']
|
|
|
+ query_result = query_scroll['hits']['hits']
|
|
|
+ if len(query_result) > 0:
|
|
|
+ result += query_result
|
|
|
+ else:
|
|
|
+ break
|
|
|
+ except:
|
|
|
+ pass
|
|
|
+
|
|
|
+ es_client.clear_scroll(scroll_id=scroll_id)
|
|
|
+
|
|
|
+ legal_name_num = 0
|
|
|
+ not_legal_name_num = 0
|
|
|
+ add_redis_list = []
|
|
|
+ for item in result:
|
|
|
+ item = item['_source']
|
|
|
+ name = item['name']
|
|
|
+ history_names = item.get("history_names", "")
|
|
|
+ legal_person = item.get("legal_person", "")
|
|
|
+ reg_capital = item.get("reg_capital", "")
|
|
|
+ credit_code = item.get("credit_code", "")
|
|
|
+ tax_number = item.get("tax_number", "")
|
|
|
+ reg_number = item.get("reg_number", "")
|
|
|
+ org_number = item.get("org_number", "")
|
|
|
+ zhao_biao_number = item.get("zhao_biao_number", 0)
|
|
|
+ zhong_biao_number = item.get("zhong_biao_number", 0)
|
|
|
+ dai_li_number = item.get("dai_li_number", 0)
|
|
|
+ bid_number = item.get("bid_number", 0)
|
|
|
+
|
|
|
+ num = 0
|
|
|
+ for business in [history_names, legal_person, reg_capital, credit_code, tax_number, reg_number, org_number]:
|
|
|
+ if len(str(business).replace("-", "")) > 1:
|
|
|
+ num += 1
|
|
|
+ isLegal = isLegalNewName(name)
|
|
|
+ if isLegal >= 0:
|
|
|
+ if num >= 1 and len(name) > 4:
|
|
|
+ legal_name_num += 1
|
|
|
+ _json = {"have_business": 1, "zhao_biao_number": zhao_biao_number,
|
|
|
+ "zhong_biao_number": zhong_biao_number,
|
|
|
+ "dai_li_number": dai_li_number, "bid_number": bid_number}
|
|
|
+ _json = json.dumps(_json, ensure_ascii=False)
|
|
|
+ add_redis_list.append((name, _json))
|
|
|
+ elif num == 0 and bid_number > 0 and len(name) > 4:
|
|
|
+ legal_name_num += 1
|
|
|
+ _json = {"have_business": 0, "zhao_biao_number": zhao_biao_number,
|
|
|
+ "zhong_biao_number": zhong_biao_number,
|
|
|
+ "dai_li_number": dai_li_number, "bid_number": bid_number}
|
|
|
+ _json = json.dumps(_json, ensure_ascii=False)
|
|
|
+ add_redis_list.append((name, _json))
|
|
|
+
|
|
|
+ pool_db = ConnectorPool(10, 30, getConnect_redis_baseline)
|
|
|
+ # _start_time = time.time()
|
|
|
+ task_queue = Queue()
|
|
|
+ for legal_name in add_redis_list:
|
|
|
+ task_queue.put(legal_name)
|
|
|
+ if task_queue.qsize() >= 100 * 10000:
|
|
|
_mt = MultiThreadHandler(task_queue, handle, None, 30)
|
|
|
_mt.run()
|
|
|
- return legal_name_num,not_legal_name_num
|
|
|
+ if task_queue.qsize() >= 0:
|
|
|
+ _mt = MultiThreadHandler(task_queue, handle, None, 30)
|
|
|
+ _mt.run()
|
|
|
|
|
|
- return 0,0
|
|
|
+ return legal_name_num, not_legal_name_num
|
|
|
|
|
|
# Redis新增 合法实体和不合法实体
|
|
|
def monitor_enterprise2redis(self):
|
|
@@ -201,7 +210,8 @@ class enterprise2Redis():
|
|
|
def handle(item, result_queue):
|
|
|
_db = pool_db.getConnector()
|
|
|
try:
|
|
|
- _db.set(item, 0)
|
|
|
+ # _db.set(item, 0)
|
|
|
+ _db.delete(item)
|
|
|
except Exception as e:
|
|
|
traceback.print_exc()
|
|
|
finally:
|
|
@@ -281,6 +291,13 @@ if __name__ == '__main__':
|
|
|
# now_time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(now_time))
|
|
|
# legal_name_num,not_legal_name_num = em.update2redis(last_create_time=last_create_time,now_time=now_time)
|
|
|
|
|
|
+ # 删除线上实体
|
|
|
+ # _enterprise2Redis = enterprise2Redis()
|
|
|
+ # drop_list = ["个体工商户"]
|
|
|
+ # _enterprise2Redis.fix_up_redis(drop_list)
|
|
|
+
|
|
|
+ # e = enterprise2Redis()
|
|
|
+ # e.monitor_enterprise2redis()
|
|
|
pass
|
|
|
|
|
|
|