Эх сурвалжийг харах

Merge remote-tracking branch 'origin/master'

luojiehua 1 жил өмнө
parent
commit
3d63936d35

+ 110 - 93
BaseDataMaintenance/maintenance/enterprise/enterprise2Redis.py

@@ -17,12 +17,13 @@ from BaseDataMaintenance.dataSource.pool import ConnectorPool
 from BaseDataMaintenance.common.multiThread import MultiThreadHandler
 import pandas as pd
 from BaseDataMaintenance.maintenance.dataflow_settings import *
-
+from elasticsearch import Elasticsearch
 
 # flow_enterprise2redis_path = "/data/python/flow_enterprise2redis.log"
 log_enterprise2redis_create_time_path = "/data/python/enterprise2redis_createTime.log"
 
 
+
 # 线上流程 enterprise Redis表维护
 class enterprise2Redis():
 
@@ -57,100 +58,108 @@ class enterprise2Redis():
             finally:
                 pool_db.putConnector(_db)
 
-        ots_client = getConnect_ots()
-        bool_query = BoolQuery(must_queries=[
-            RangeQuery("create_time", range_from=last_create_time, range_to=now_time),
-            RangeQuery("status", range_from=201, range_to=301),
-        ])
-        rows, next_token, total_count, is_all_succeed = ots_client.search("enterprise", "enterprise_index",
-                                                                          SearchQuery(bool_query, get_total_count=True),
-                                                                          columns_to_get=ColumnsToGet(
-                                                                              return_type=ColumnReturnType.NONE))
-
-        if total_count > 0:
-            column_list = ["nicknames", "history_names", 'tyc_id', 'legal_person', 'reg_capital', 'found_date',
-                           'credit_code', 'tax_number', 'reg_number', 'org_number']
-            all_rows = []
-            first_query = False
-            # 第一次查询
-            while not first_query:
-                try:
-                    rows, next_token, total_count, is_all_succeed = ots_client.search(
-                        "enterprise", "enterprise_index",
-                        SearchQuery(
-                            bool_query,
-                            limit=100,
-                            sort=Sort(sorters=[FieldSort('bidi_id', SortOrder.DESC)]),
-                            get_total_count=True),
-                            ColumnsToGet(return_type=ColumnReturnType.SPECIFIED,
-                                     column_names=column_list)
-                        )
-                    first_query = True
-                except:
-                    print("~ first_query ots error ~")
-            # next_token后续查询
-            if first_query:
-                while next_token:
-                    try:
-                        rows, next_token, total_count, is_all_succeed = ots_client.search(
-                            "enterprise", "enterprise_index",
-                            SearchQuery(
-                                bool_query,
-                                next_token=next_token,
-                                limit=100,
-                                get_total_count=True),
-                            ColumnsToGet(return_type=ColumnReturnType.SPECIFIED,
-                                         column_names=column_list)
-                        )
-                        all_rows.extend(rows)
-
-                    except:
-                        print("~ ots query error, try again ~")
-
-            # 清洗过滤
-            # legal_name_list = []
-            # not_legal_name_list = []
-            legal_name_num = 0
-            not_legal_name_num = 0
-            name_sign_list = []
-            for row in all_rows:
-                # 索引字段
-                index_field = row[1]
-                row_dict = dict((item[0], item[1]) for item in index_field)
-                name = ""
-                num = 0
-                for key,value in row_dict.items():
-                    if key=='nicknames':
-                        name = value
-                    else:
-                        if len(str(value).replace("-",""))>1:
-                            num += 1
-                isLegal = isLegalNewName(name)
-                if isLegal>=0:
-                    if num>=1 and len(name)>4:
-                        legal_name_num += 1
-                        name_sign_list.append((name,1))
-                # elif num>=1:
-                #     pass
-                # else:
-                #     not_legal_name_num += 1
-                #     # name_sign_list.append((name,0))
-                #     pass
-
-            pool_db = ConnectorPool(10, 30, getConnect_redis_baseline)
-            # _start_time = time.time()
-            task_queue = Queue()
-            for legal_name in name_sign_list:
-                task_queue.put(legal_name)
-                if task_queue.qsize() >= 100 * 10000:
-                    _mt = MultiThreadHandler(task_queue, handle, None, 30)
-                    _mt.run()
-            if task_queue.qsize() >= 0:
+        # es_url = "http://es-cn-lbj3cjmy3000djxak.elasticsearch.aliyuncs.com"  # 内网
+        es_url = "http://es-cn-lbj3cjmy3000djxak.public.elasticsearch.aliyuncs.com" # 外网
+        es_client = Elasticsearch([es_url],
+                                  http_auth=('elastic', 'WWBu9#1HWHo$$gJm'),
+                                  port=9200)
+        body = {
+            "_source": ["name", "history_names", 'legal_person', 'reg_capital', 'credit_code', 'tax_number',
+                        'reg_number', 'org_number',
+                        "zhao_biao_number", "zhong_biao_number", "dai_li_number", "bid_number"],
+            'query': {  # 查询命令
+                "bool": {
+                    'must': [
+                        {'range': {"update_time": {
+                            "gte": last_create_time,  # >= 大于等于
+                            "lt": now_time  # < 小于
+                        }}},
+                        # {'range': {"create_time": {
+                        #     "gte": last_create_time,  # >= 大于等于
+                        #     "lt": now_time  # < 小于
+                        # }}},
+                        {'range': {"status": {
+                            "gte": 201,  # >= 大于等于
+                            "lt": 301  # < 小于
+                        }}}
+                    ]
+                }
+            },
+            "sort": [
+                {"create_time": "desc"}
+            ]
+        }
+
+        query = es_client.search(index='enterprise', body=body, scroll='10m', size=5000)
+        scroll_id = query['_scroll_id']  # 游标用于输出es查询出的所有结果
+        query_result = query['hits']['hits']
+        result = query_result
+        while len(query_result) > 0:
+            try:
+                query_scroll = es_client.scroll(scroll_id=scroll_id, scroll='10m')
+                scroll_id = query_scroll['_scroll_id']
+                query_result = query_scroll['hits']['hits']
+                if len(query_result) > 0:
+                    result += query_result
+                else:
+                    break
+            except:
+                pass
+
+        es_client.clear_scroll(scroll_id=scroll_id)
+
+        legal_name_num = 0
+        not_legal_name_num = 0
+        add_redis_list = []
+        for item in result:
+            item = item['_source']
+            name = item['name']
+            history_names = item.get("history_names", "")
+            legal_person = item.get("legal_person", "")
+            reg_capital = item.get("reg_capital", "")
+            credit_code = item.get("credit_code", "")
+            tax_number = item.get("tax_number", "")
+            reg_number = item.get("reg_number", "")
+            org_number = item.get("org_number", "")
+            zhao_biao_number = item.get("zhao_biao_number", 0)
+            zhong_biao_number = item.get("zhong_biao_number", 0)
+            dai_li_number = item.get("dai_li_number", 0)
+            bid_number = item.get("bid_number", 0)
+
+            num = 0
+            for business in [history_names, legal_person, reg_capital, credit_code, tax_number, reg_number, org_number]:
+                if len(str(business).replace("-", "")) > 1:
+                    num += 1
+            isLegal = isLegalNewName(name)
+            if isLegal >= 0:
+                if num >= 1 and len(name) > 4:
+                    legal_name_num += 1
+                    _json = {"have_business": 1, "zhao_biao_number": zhao_biao_number,
+                             "zhong_biao_number": zhong_biao_number,
+                             "dai_li_number": dai_li_number, "bid_number": bid_number}
+                    _json = json.dumps(_json, ensure_ascii=False)
+                    add_redis_list.append((name, _json))
+                elif num == 0 and bid_number > 0 and len(name) > 4:
+                    legal_name_num += 1
+                    _json = {"have_business": 0, "zhao_biao_number": zhao_biao_number,
+                             "zhong_biao_number": zhong_biao_number,
+                             "dai_li_number": dai_li_number, "bid_number": bid_number}
+                    _json = json.dumps(_json, ensure_ascii=False)
+                    add_redis_list.append((name, _json))
+
+        pool_db = ConnectorPool(10, 30, getConnect_redis_baseline)
+        # _start_time = time.time()
+        task_queue = Queue()
+        for legal_name in add_redis_list:
+            task_queue.put(legal_name)
+            if task_queue.qsize() >= 100 * 10000:
                 _mt = MultiThreadHandler(task_queue, handle, None, 30)
                 _mt.run()
-            return legal_name_num,not_legal_name_num
+        if task_queue.qsize() >= 0:
+            _mt = MultiThreadHandler(task_queue, handle, None, 30)
+            _mt.run()
 
-        return 0,0
+        return legal_name_num, not_legal_name_num
 
     # Redis新增 合法实体和不合法实体
     def monitor_enterprise2redis(self):
@@ -201,7 +210,8 @@ class enterprise2Redis():
         def handle(item, result_queue):
             _db = pool_db.getConnector()
             try:
-                _db.set(item, 0)
+                # _db.set(item, 0)
+                _db.delete(item)
             except Exception as e:
                 traceback.print_exc()
             finally:
@@ -281,6 +291,13 @@ if __name__ == '__main__':
     # now_time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(now_time))
     # legal_name_num,not_legal_name_num = em.update2redis(last_create_time=last_create_time,now_time=now_time)
 
+    # 删除线上实体
+    # _enterprise2Redis = enterprise2Redis()
+    # drop_list = ["个体工商户"]
+    # _enterprise2Redis.fix_up_redis(drop_list)
+
+    # e = enterprise2Redis()
+    # e.monitor_enterprise2redis()
     pass
 
 

+ 21 - 5
BaseDataMaintenance/maintenance/preproject/fillColumns.py

@@ -20,7 +20,10 @@ class PreprojectFill():
 
         def comsumer_handle(_row,result_queue):
             if _row.get(preproject_uuid) is None:
-                _row[preproject_uuid] = uuid4().hex
+                _preproject = Preproject(_row)
+                # 删除无uuid数据
+                _preproject.delete_row(self.ots_client)
+                return
             if _row.get(preproject_has_bidfile) is None:
                 json_docids = _row.get(preproject_json_docids)
                 if json_docids is not None:
@@ -41,16 +44,27 @@ class PreprojectFill():
             _preproject = Preproject(_row)
             _preproject.update_row(self.ots_client)
 
+
         _mul = MultiThreadHandler(self.task_queue,comsumer_handle,None,10)
         _mul.run()
 
 
 
     def fill_producer(self):
-        q1 = BoolQuery(should_queries=[WildcardQuery("uuid","*"),
-                                          RangeQuery("has_bidfile",0)])
+        # 存在uuid数据,补充'has_bidfile'字段
+        q1 = BoolQuery(must_queries=[
+            ExistsQuery("uuid"),
+            BoolQuery(must_not_queries=[
+                ExistsQuery("has_bidfile")
+            ])
+        ])
+        # 无uuid数据,用于删除行数据
+        q2 = BoolQuery(must_not_queries=[
+                ExistsQuery("uuid")
+            ])
         columns = ["uuid","has_bidfile","json_docids"]
-        query = BoolQuery(must_not_queries=[q1])
+        query = BoolQuery(should_queries=[q1,
+                                          q2])
         rows,next_token,total_count,is_all_succeed = self.ots_client.search("preproject","preproject_index",
                                                                             SearchQuery(query,sort=Sort(sorters=[FieldSort("uuid")]),get_total_count=True,limit=100),
                                                                             ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
@@ -66,7 +80,9 @@ class PreprojectFill():
                                                                                 ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
 
     def fill_contact_producer(self):
-        q1 = BoolQuery(must_queries=[TermQuery("status",1),
+        q1 = BoolQuery(must_queries=[
+            TermQuery("status",1),
+            ExistsQuery("uuid")
                                        ])
         columns = ["status",preproject_tenderee,preproject_last_tenderee_contact,preproject_last_tenderee_phone,preproject_last_win_tenderer,preproject_last_win_tenderer_contact,preproject_last_win_tenderer_phone]
         query = q1