fangjiasheng 5 месяцев назад
Родитель
Commit
004ad92949

+ 1 - 1
BaseDataMaintenance/dataMonitor/data_monitor_fjs.py

@@ -218,7 +218,7 @@ class BaseDataMonitor():
                 os.mkdir(flow_init_check_dir)
 
             log_filename = os.path.join(flow_init_log_dir,"flow_init.log")
-            check_filename = os.path.join(flow_init_check_dir,"flow_init_2024-05-21.xlsx")
+            check_filename = os.path.join(flow_init_check_dir,"flow_init_2024-06-11.xlsx")
 
             list_uuid = []
             task_queue = Queue()

+ 82 - 0
BaseDataMaintenance/maintenance/proposedBuilding/compare_pb.py

@@ -0,0 +1,82 @@
+import json
+import os
+import sys
+
+with open('json.txt', 'r', encoding='utf-8') as f:
+    json_list = f.readlines()
+json_list = list(set(json_list))
+if '[]\n' in json_list:
+    json_list.remove('[]\n')
+data_list = [json.loads(x[:-1]) for x in json_list]
+data_list = [x[0] for x in data_list]
+
+with open('C:/Users/Administrator/Downloads/pb_json_list_240730.json', 'r', encoding='utf-8') as f:
+    json_str = f.read()
+json_list = json.loads(json_str)[1:]
+data_list2 = [json.loads(x[0]) for x in json_list]
+
+docid_data_list = []
+for data in data_list:
+    docid_list = [x.get('docid') for x in data]
+    docid_list = list(set(docid_list))
+    if None in docid_list:
+        docid_list.remove(None)
+    if 0 in docid_list:
+        docid_list.remove(0)
+    docid_list.sort(key=lambda x: x)
+    if data[0].get('docchannel') is not None:
+        data[0]['docchannel'] = int(data[0]['docchannel'])
+    docid_data_list.append([docid_list, data])
+
+docid_data_list2 = []
+for data in data_list2:
+    docid_list = [x.get('docid') for x in data]
+    docid_list = list(set(docid_list))
+    if None in docid_list:
+        docid_list.remove(None)
+    if 0 in docid_list:
+        docid_list.remove(0)
+    docid_list.sort(key=lambda x: x)
+    data[0]['pb_project_name'] = data[0]['project_name_refind']
+    if data[0].get('docchannel') is not None:
+        data[0]['docchannel'] = int(data[0]['docchannel'])
+    docid_data_list2.append([docid_list, data])
+
+equal_list = []
+not_equal_list = []
+for docid_list, data in docid_data_list:
+    for docid_list2, data2 in docid_data_list2:
+        if len(set(docid_list).intersection(set(docid_list2))) > 0:
+            if docid_list == docid_list2:
+                equal_list.append([data, data2])
+            else:
+                not_equal_list.append([data, data2])
+
+print('len(data_list)', len(data_list))
+print('len(equal_list)', len(equal_list))
+print('len(not_equal_list)', len(not_equal_list))
+print('='*30)
+print('='*30)
+
+for data, data2 in not_equal_list:
+    print('data', [[x.get('docid'), x.get('page_time'), x.get('stage')] for x in data[1:]])
+    print('data2', [[x.get('docid'), x.get('page_time'), x.get('stage')] for x in data2[1:]])
+
+    for key in set(list(data[0].keys()) + list(data2[0].keys())):
+        if key in ['partitionkey', 'has_stage', 'service_days', 'service_time',
+                   'nlp_enterprise_attachment', 'nlp_enterprise', 'show_name_refind',
+                   'project_name_refind', 'tenderee_contact', 'project2_uuid',
+                   'tenderee_phone']:
+            continue
+        if data[0].get(key) in [0., None, ""] and data2[0].get(key) in [0., None, ""]:
+            continue
+
+        if data[0].get(key) != data2[0].get(key):
+            print('data.get(key)', key, data[0].get(key))
+            print('data2.get(key)', key, data2[0].get(key))
+            # if key in ['page_time', 'stage']:
+            #     print('data', [[x.get('page_time'), x.get('stage')] for x in data[1:]])
+            #     print('data2', [[x.get('page_time'), x.get('stage')] for x in data2[1:]])
+
+    print('='*30)
+    input('next')

+ 1555 - 84
BaseDataMaintenance/maintenance/proposedBuilding/pb_project_production.py

@@ -1,124 +1,1595 @@
-#encoding:UTF8
-from BaseDataMaintenance.dataSource.pool import ConnectorPool
-from BaseDataMaintenance.dataSource.source import *
-from BaseDataMaintenance.common.Utils import *
+# encoding:UTF8
+import copy
+import inspect
+import json
+import logging
+import multiprocessing
+import os
+import re
+import sys
+import time
+import pandas as pd
+from concurrent.futures.thread import ThreadPoolExecutor
+from datetime import datetime, timedelta
 import queue
+import logging
 from tablestore import *
-from multiprocessing import RLock
-from threading import Thread
-from apscheduler.schedulers.blocking import BlockingScheduler
-from BaseDataMaintenance.common.multiThread import MultiThreadHandler
+sys.path.append(os.path.abspath(os.path.dirname(__file__)) + "/../../../")
+
+from BaseDataMaintenance.common.Utils import getRow_ots
+from BaseDataMaintenance.dataSource.source import getConnect_ots
 from BaseDataMaintenance.model.ots.proposedBuilding_tmp import proposedBuilding_tmp
 from BaseDataMaintenance.model.ots.designed_project import designed_project
 import traceback
 
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+
+
def log(*args):
    """Log args at INFO level, space-separated, prefixed with the
    module-global ``docid`` ("None" when unset)."""
    prefix = str(globals().get('docid')) + " -"
    message = " ".join([prefix] + [str(obj) for obj in args])
    logging.info(message)
 
-class DataProduction:
-    def __init__(self):
-        self.done_lock = RLock()
-        self.isDone = False
-        self.proposedBuilding_table = "proposedBuilding_tmp"
-        self.proposedBuilding_table_index = "proposedBuilding_tmp_index"
+
+class PBProduction:
def __init__(self, test_mode=0, update=1):
    """Set up the OTS connection, lookup tables and run-mode flags.

    :param test_mode: 1 - build the data set by looking up ``test_doc_ids``
        in the document table; 0 - pull increments from document_tmp newer
        than the opertime recorded in opertime.txt.
    :param update: 1 - write final results to the designed_project table;
        0 - do not update.
    """
    # (removed leftover debug print 'init111')
    self.ots_client = getConnect_ots()
    # province -> greater-region mapping used to fill the area field
    self.province_area_dict = {
        '北京': '华北',
        '天津': '华北',
        '河北': '华北',
        '山西': '华北',
        '内蒙古': '华北',
        '广东': '华南',
        '海南': '华南',
        '广西': '华南',
        '上海': '华东',
        '江苏': '华东',
        '浙江': '华东',
        '安徽': '华东',
        '福建': '华东',
        '江西': '华东',
        '山东': '华东',
        '河南': '华中',
        '湖北': '华中',
        '湖南': '华中',
        '重庆': '西南',
        '四川': '西南',
        '贵州': '西南',
        '云南': '西南',
        '西藏': '西南',
        '黑龙江': '东北',
        '辽宁': '东北',
        '吉林': '东北',
        '陕西': '西北',
        '甘肃': '西北',
        '青海': '西北',
        '宁夏': '西北',
        '新疆': '西北',
        '台湾': '港澳台',
        '香港': '港澳台',
        '澳门': '港澳台',
    }

    # last processed opertime; persisted next to this module
    self.opertime_txt = os.path.abspath(os.path.dirname(__file__)) + '/opertime.txt'
    with open(self.opertime_txt, 'r') as f:
        self.opertime_last = f.read()

    self.stage_pattern, self.stage_priority_dict = self.get_stage_pattern()

    # columns fetched from document_tmp and their sort key
    self.document_tmp_cols = ['docid', 'tenderee', 'project_code', 'project_name',
                              'agency', 'opertime', 'pb_project_name', 'proportion',
                              'doctitle', 'city', 'province', 'product']
    self.document_tmp_sort_col = 'opertime'

    # columns fetched from document and their sort key
    # (duplicate 'doctitle' entry removed)
    self.document_cols = ['docid', 'extract_json', 'tenderee', 'agency', 'doctitle',
                          'pb_project_name', 'proportion', 'project_code', 'page_time',
                          'area', 'province', 'city', 'district',
                          'docchannel', 'original_id',
                          'project_name']
    self.document_sort_col = 'page_time'

    self.test_doc_ids = [
        # 509852929,
        # 509807936,
        # 377270046,
        # 305681325,
        # 475235476,
        510627610
    ]
    self.test_sp_ids = [400063255442, ]

    # test_mode
    # 1 - search document using docids in test_doc_ids as the data set
    # 0 - search document_tmp for rows newer than the opertime in opertime.txt
    self.test_mode = test_mode

    # 1 - write final results to the designed_project table
    # 0 - do not update
    self.update_designed_project_flag = update
+
def get_stage_pattern(self):
    """Build the combined named-group regex for project stages plus the
    priority table used to rank simultaneous matches.

    :return: (pattern string, {stage name: priority}); lower priority wins.
    """
    stage_dict = {
        "立项阶段": "立项|项目投资",
        "可研阶段": "可行性研究|可研",
        "环评阶段": "环境评价|环境影响|环境评测|环评|(水保|环保|环境保护)(编制|验收|监测)",
        "稳评阶段": "稳定风险|社会稳定|风险评估",
        "咨询阶段": "(水影响|能源|交通影响|地质灾害|地址灾害|地震安全性|地震安全性|气象|雷击风险|安全|海洋|森林环境)(评[价估测])|水土保持|(水|交|灾|震|能|气|安|海|林)评",
        "造价阶段": "(决算书|预算|结算|造价|决算)(编制|咨询)",
        "设计阶段": "(施工图(纸|)|初步|项目|工程)(方案|)设计|测绘|规划设计",
        "勘察阶段": "(勘察|勘查)设计|勘察技术|勘查|勘察",
        "施工图审": "(施工图(纸|)|防雷|消防|人防)审查|施工图审",
        "施工许可": "施工许可证",
        "施工准备": "施工准备|监理|资格预审|资审",
        "施工在建": "施工",
        "竣工阶段": "竣工|验收",
        "EPC总承包": "总承包|EPC"
    }

    stage_priority_dict = {
        "立项阶段": 1,
        "可研阶段": 3,
        "环评阶段": 2,
        "稳评阶段": 3,
        "咨询阶段": 2,
        "造价阶段": 2,
        "设计阶段": 4,
        "勘察阶段": 4,
        "施工图审": 2,
        "施工许可": 2,
        "施工准备": 3,
        "施工在建": 5,
        "竣工阶段": 3,
        "EPC总承包": 4
    }

    # one named group per stage, joined into a single alternation
    group_parts = ["(?P<%s>%s)" % (name, expr) for name, expr in stage_dict.items()]
    return "|".join(group_parts), stage_priority_dict
+
def pb_data_start(self, show=1):
    """Fetch the incremental document batch; optionally log timing stats.

    :param show: 1 to log count / cost / average cost per row
    :return: list of increment rows
    """
    t0 = time.time()
    data_list = self.get_data_from_document_tmp()
    if show:
        elapsed = time.time() - t0
        log('get_data_from_document_tmp len', len(data_list),
            'cost:', round(elapsed, 2),
            'avg:', round(elapsed / max(len(data_list), 1), 2))
    return data_list
+
def pb_data_flow(self, data_list, show=1):
    """Run the full pipeline on one batch: rule-based document merge ->
    project2 expansion -> pb-project grouping -> designed_project update.

    :param data_list: increment rows from document_tmp
    :param show: 1 to log per-step timings and group sizes
    :return: the grouped pb project list (None when data_list is empty)
    """
    flow_t0 = time.time()

    if not data_list:
        return

    t0 = time.time()
    all_merge_data_list = self.get_data_from_document_by_rules(data_list, threads=1)
    if show:
        elapsed = time.time() - t0
        log('get_data_from_document_by_rules len', len(all_merge_data_list),
            'cost:', round(elapsed, 2),
            'avg:', round(elapsed / max(len(all_merge_data_list), 1), 2))
        for data in all_merge_data_list:
            print('len(data)', len(data))

    t0 = time.time()
    all_merge_data_list = self.get_data_from_project2(all_merge_data_list)
    if show:
        elapsed = time.time() - t0
        log('get_data_from_project2 len', len(all_merge_data_list),
            'cost:', round(elapsed, 2),
            'avg:', round(elapsed / max(len(all_merge_data_list), 1), 2))

    t0 = time.time()
    pb_group_list = self.data_to_pb_project(all_merge_data_list)
    if show:
        elapsed = time.time() - t0
        log('data_to_pb_project len', len(pb_group_list),
            'cost:', round(elapsed, 2),
            'avg:', round(elapsed / max(len(pb_group_list), 1), 2))
        for pb_group in pb_group_list:
            if pb_group:
                log('1 follows len', len(pb_group) - 1, pb_group[0].get('pb_project_name'))

    t0 = time.time()
    update_pb_group_list = self.pb_project_to_designed_project(pb_group_list)
    if show:
        elapsed = time.time() - t0
        log('pb_project_to_designed_project len', len(update_pb_group_list),
            'cost:', round(elapsed, 2),
            'avg:', round(elapsed / max(len(update_pb_group_list), 1), 2))
        for pb_group in update_pb_group_list:
            if pb_group:
                follows = json.loads(pb_group.get('follows'))
                log('2 follows len', len(follows), pb_group.get("project_name"))

    # self.update_opertime(data_list)

    flow_elapsed = time.time() - flow_t0
    log('process data num:', len(data_list), 'update pb num:', len(update_pb_group_list),
        'cost:', round(flow_elapsed, 2),
        'avg:', round(flow_elapsed / len(data_list), 2))
    return pb_group_list
+
def search_util(self, table, table_index, bool_query, columns, sort_col, limit=999999, show_total=0, while_next=1, asc=0):
    """Query an OTS search index and return rows as dicts.

    :param sort_col: column to sort on; None for no sort
    :param limit: hard cap on returned rows
    :param show_total: 1 to print the index's total hit count
    :param while_next: 1 to keep paging with next_token until limit/exhausted
    :param asc: 1 for ascending sort, 0 for descending
    :return: list of row dicts (truncated to ``limit``)
    """
    if sort_col:
        order = SortOrder.ASC if asc else SortOrder.DESC
        sort = Sort(sorters=[FieldSort(sort_col, order)])
    else:
        sort = None
    return_type = ColumnReturnType.SPECIFIED

    query = SearchQuery(bool_query, sort=sort, limit=100, get_total_count=True)
    rows, next_token, total_count, is_all_succeed = self.ots_client.search(
        table, table_index, query, ColumnsToGet(columns, return_type))
    list_data = getRow_ots(rows)
    if show_total:
        print('search total_count', total_count)
    if len(list_data) >= limit:
        print('limit ', limit, len(list_data))
        return list_data[:limit]

    if while_next:
        # page through remaining results until exhausted or limit reached
        while next_token and len(list_data) < limit:
            query = SearchQuery(bool_query, next_token=next_token, limit=100, get_total_count=True)
            rows, next_token, total_count, is_all_succeed = self.ots_client.search(
                table, table_index, query, ColumnsToGet(columns, return_type))
            list_data += getRow_ots(rows)
    return list_data
+
def multi_thread_util(self, _list, task, threads):
    """Run ``task`` over every item of ``_list`` on a thread pool.

    :param _list: items, each passed individually to ``task``
    :param task: callable taking a single item
    :param threads: pool size
    :return: task results in the same order as ``_list``
    """
    # executor.map preserves input order and re-raises task exceptions on
    # consumption — identical semantics to the old submit/result index loop,
    # without the manual range(len(...)) bookkeeping.
    with ThreadPoolExecutor(max_workers=threads) as executor:
        return list(executor.map(task, _list))
+
def get_data_from_document_tmp(self, max_cnt=3000):
    """Fetch the increment batch.

    Test mode: look up each docid in ``test_doc_ids`` in the document table.
    Normal mode: pull up to ``max_cnt`` saved rows from document_tmp whose
    opertime is newer than the last recorded opertime.

    :param max_cnt: row cap for the normal-mode query
    :return: list of row dicts
    """
    # NOTE(review): this helper is currently unused (test mode uses the
    # document-table lookup below).
    def fetch_from_tmp(docid):
        query = BoolQuery(
            must_queries=[BoolQuery(must_queries=[TermQuery('docid', docid)])
                          ])
        return self.search_util('document_tmp', 'document_tmp_index',
                                query, self.document_tmp_cols,
                                self.document_tmp_sort_col, show_total=0)

    def fetch_from_document(docid):
        query = BoolQuery(
            must_queries=[BoolQuery(must_queries=[TermQuery('docid', docid)])
                          ])
        # document has page_time instead of document_tmp's opertime
        cols = copy.deepcopy(self.document_tmp_cols)
        cols.remove('opertime')
        cols.append('page_time')
        return self.search_util('document', 'document_index',
                                query, cols,
                                self.document_sort_col, show_total=0)

    if self.test_mode:
        nested = self.multi_thread_util(self.test_doc_ids, fetch_from_document, 20)
        data_list = [row for batch in nested for row in batch]
    else:
        # saved rows, status >= 81, channel < 123 or exactly '302' (approvals)
        query = BoolQuery(must_queries=[RangeQuery("opertime", self.opertime_last, None),
                                        RangeQuery('status', 81, None),
                                        TermQuery('save', 1),
                                        BoolQuery(should_queries=[RangeQuery('docchannel', None, 123),
                                                                  TermQuery('docchannel', '302')
                                                                  ])
                                        ])
        data_list = self.search_util('document_tmp', 'document_tmp_index',
                                     query, self.document_tmp_cols,
                                     self.document_tmp_sort_col, show_total=1,
                                     limit=max_cnt, asc=1)
    return data_list
 
-        rows, next_token, total_count, is_all_succeed = ots_client.search(self.proposedBuilding_table, self.proposedBuilding_table_index,
-                                                                          SearchQuery(bool_query, sort=Sort(sorters=[FieldSort("crtime", SortOrder.DESC)]), limit=100, get_total_count=True),
-                                                                          ColumnsToGet(columns, return_type=ColumnReturnType.SPECIFIED))
-        list_data = getRow_ots(rows)
-        for _data in list_data:
-            _proposed = proposedBuilding_tmp(_data)
-            task_queue.put(_proposed,True)
-        _count = len(list_data)
-        while next_token:
-            rows, next_token, total_count, is_all_succeed = ots_client.search(self.proposedBuilding_table, self.proposedBuilding_table_index,
-                                                                              SearchQuery(bool_query ,next_token=next_token, limit=100, get_total_count=True),
-                                                                              ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
-            list_data = getRow_ots(rows)
-            for _data in list_data:
-                _proposed = proposedBuilding_tmp(_data)
-                task_queue.put(_proposed,True)
-            _count += len(list_data)
-            if _count>3000:
def extract_stage(self, doctitle, project_name, _pattern, priority_dict, product='', tenderee='', agency=''):
    """Extract the project stage from the title and project name.

    :param _pattern: combined named-group regex (see get_stage_pattern)
    :param priority_dict: stage name -> priority, lower wins
    :param product: used to veto "竣工阶段" for non-construction products
    :param tenderee: tenderee name, removed from the text before matching
    :param agency: agency name, removed from the text before matching
    :return: stage name, or None when no stage applies
    """
    if not project_name:
        project_name = ''
    if not doctitle:
        doctitle = ''
    content = project_name + '\t' + doctitle

    # these notice types never carry a build stage
    if not content:
        return None
    if re.search("拍卖|转让|产权|出让|租赁|招租", content) is not None:
        return None
    # remove tenderee / agency names; re.escape so regex metacharacters in
    # company names cannot raise re.error or distort the substitution
    content = re.sub(re.escape(str(tenderee)), '', content)
    content = re.sub(re.escape(str(agency)), '', content)

    # completion-stage markers: fire protection, property mgmt, etc.
    # if re.search("消防|物业|安保|装修|通风系统|排烟|第三方检测", content) is not None:
    #     return '竣工阶段'

    # strip confusable substrings (design institutes, cost consultants)
    _content = re.sub("设计院|设计总院|造价咨询有限公司", "", content)

    list_stage = []
    for stage_search in re.finditer(_pattern, _content):
        for k, v in stage_search.groupdict().items():
            if v is not None:
                list_stage.append([k, priority_dict.get(k)])
    if len(list_stage) > 0:
        list_stage.sort(key=lambda x: x[1])
        stage = list_stage[0][0]

        # a non-construction product invalidates the completion stage
        if product:
            if not re.search('施工|工程|建设', product):
                stage = None
                for s in list_stage:
                    if s[0] != '竣工阶段':
                        stage = s[0]
                        break

        # "立项" must not come from the word "立项目": re-scan with it removed.
        # Bug fix: previously the old matches were kept in list_stage, so the
        # re-scan could never change the result.
        if stage == '立项阶段':
            sub_content = re.sub('立项目', '', _content)
            list_stage = []
            for stage_search in re.finditer(_pattern, sub_content):
                for k, v in stage_search.groupdict().items():
                    if v is not None:
                        list_stage.append([k, priority_dict.get(k)])
            if len(list_stage) > 0:
                list_stage.sort(key=lambda x: x[1])
                stage = list_stage[0][0]

        return stage
    return None
+
def judge_from_designed_project(self, pb_project_name):
    """Check whether ``pb_project_name`` fuzzy-matches any live record in
    the designed_project table (used to validate names without a stage).

    :return: True when at least one non-404 record matches, else False.
    """
    if not pb_project_name:
        return False

    query = BoolQuery(must_queries=[
        MatchPhraseQuery("project_name", pb_project_name),
        BoolQuery(must_not_queries=[TermQuery('status', '404')]),
    ])
    matches = self.search_util('designed_project', 'designed_project_index', query,
                               ['docid'], None, limit=2, while_next=0)
    print('judge_from_designed_project', pb_project_name, len(matches))
    return len(matches) > 0
+
def get_data_from_document_by_rules(self, data_list, threads=1, show=0):
    """Collect candidate documents for each increment row via three search
    rules, then normalize the hits with data_process_document.

    Rules (tagged 0/1/2 on each hit for counting):
      0: same pb_project_name + same province/city
      1: same tenderee + same project_code
      2: same tenderee + same proportion

    Rows with no extractable stage that also fail the designed_project
    fuzzy check are dropped entirely.

    :param data_list: increment rows from document_tmp
    :param threads: thread-pool size for the per-row task
    :param show: 1 to print per-rule hit details
    :return: one normalized candidate list per surviving input row
    """
    def task(data_dict):
        # per-row search task executed on the thread pool
        pb_project_name = data_dict.get('pb_project_name')
        tenderee = data_dict.get('tenderee')
        project_code = data_dict.get('project_code')
        proportion = data_dict.get('proportion')
        agency = data_dict.get('agency')
        doctitle = data_dict.get('doctitle')
        project_name = data_dict.get('project_name')
        province = data_dict.get('province')
        city = data_dict.get('city')
        product = data_dict.get('product')

        # choose the location column/value for the search: prefer city,
        # fall back to province
        if city in ['', None, '-', '未知']:
            if province in ['', None, '-', '未知']:
                # NOTE(review): both unknown leaves location_val = None, so
                # rule 1 queries TermQuery('province', None) — confirm this
                # is intended rather than skipping the location clause.
                location_col = 'province'
                location_val = None
            else:
                location_col = 'province'
                location_val = province
        else:
            location_col = 'city'
            location_val = city

        # extract the project stage from title / project name
        stage = self.extract_stage(doctitle, project_name, self.stage_pattern, self.stage_priority_dict,
                                   product, tenderee, agency)

        # drop rows with no stage that also have no fuzzy match in the
        # designed_project table
        if stage is None and not self.judge_from_designed_project(pb_project_name):
            return []

        merge_data_list = []
        # rule 0: pb_project_name + location
        if pb_project_name and len(pb_project_name) >= 3:
            bool_query = BoolQuery(must_queries=[TermQuery("pb_project_name", pb_project_name),
                                                 TermQuery(location_col, location_val),
                                                 RangeQuery('status', 201, 301)])
            # log('bool_query1', 'pb_project_name', pb_project_name, location_col, location_val, )
            sub_data_list = self.search_util('document', 'document_index', bool_query,
                                             self.document_cols, self.document_sort_col,
                                             limit=500)
            sub_data_list = [[x, 0] for x in sub_data_list]
            merge_data_list += sub_data_list
            if show:
                print('rule 1', pb_project_name, len(sub_data_list), [x[0].get('docid') for x in sub_data_list])

        # rule 1: tenderee + project_code (codes ending in '-' are partial)
        if tenderee and project_code and len(project_code) > 6 and project_code[-1] != '-':
            bool_query = BoolQuery(must_queries=[TermQuery("tenderee", tenderee),
                                                 TermQuery("project_codes", project_code),
                                                 RangeQuery('status', 201, 301)
                                                 ])
            sub_data_list = self.search_util('document', 'document_index', bool_query,
                                             self.document_cols, self.document_sort_col,
                                             limit=500)
            sub_data_list = [[x, 1] for x in sub_data_list]
            merge_data_list += sub_data_list
            if show:
                print('rule 2', pb_project_name, len(sub_data_list), [x[0].get('docid') for x in sub_data_list])

        # rule 2: tenderee + proportion
        if tenderee and proportion and len(proportion) >= 2:
            bool_query = BoolQuery(must_queries=[TermQuery("tenderee", tenderee),
                                                 TermQuery("proportion", proportion),
                                                 RangeQuery('status', 201, 301)
                                                 ])
            sub_data_list = self.search_util('document', 'document_index', bool_query,
                                             self.document_cols, self.document_sort_col,
                                             limit=500)
            sub_data_list = [[x, 2] for x in sub_data_list]
            merge_data_list += sub_data_list
            if show:
                print('rule 3', pb_project_name, len(sub_data_list), [x[0].get('docid') for x in sub_data_list])

        return merge_data_list

    all_merge_data_list = self.multi_thread_util(data_list, task, threads)
    if show:
        for data in all_merge_data_list:
            print('multi_thread_util len(data)', len(data))

    # count hits per rule, strip the rule tags, and normalize each group
    temp_list = []
    rule_0_cnt, rule_1_cnt, rule_2_cnt = 0, 0, 0
    for data in all_merge_data_list:
        if data:
            # print('data', data)
            for d in data:
                if d[1] == 0:
                    rule_0_cnt += 1
                elif d[1] == 1:
                    rule_1_cnt += 1
                elif d[1] == 2:
                    rule_2_cnt += 1
            new_data = [x[0] for x in data]
            # if show:
            #     print('data_process_document len(new_data)0', len(new_data))
            new_data = self.data_process_document(new_data)
            # if show:
            #     print('data_process_document len(new_data)', len(new_data))
            temp_list.append(new_data)
    all_merge_data_list = temp_list
    # if show:
    #     for data in all_merge_data_list:
    #         print('data_process_document len(data)', len(data))
    # all_merge_data_list = [x[0] for x in all_merge_data_list]
    log('rule_0_cnt, rule_1_cnt, rule_2_cnt', rule_0_cnt, rule_1_cnt, rule_2_cnt)
    return all_merge_data_list
+
def get_data_from_project2(self, all_merge_data_list, show=1):
    """Expand each merged document group with the extra documents that the
    project2 table links to the group's docids.

    For every group: collect its docids, find the project2 rows containing
    any of them, take the union of those rows' docids minus the ones
    already present, fetch those documents, normalize them, and append.

    :param all_merge_data_list: list of document groups
    :param show: currently unused; kept for interface compatibility
    :return: groups extended with newly discovered documents
    """
    def task1(docid):
        # find project2 rows whose docids field contains this docid
        bool_query = BoolQuery(must_queries=[TermQuery("docids", docid)])
        sub_data_list = self.search_util('project2', 'project2_index',
                                         bool_query, ['docids', 'uuid'], None)
        return sub_data_list

    def task2(docid):
        # fetch a document row by docid
        bool_query = BoolQuery(must_queries=[TermQuery("docid", docid)])
        sub_data_list = self.search_util('document', 'document_index',
                                         bool_query, self.document_cols,
                                         self.document_sort_col)
        return sub_data_list

    new_all_merge_data_list = []
    for merge_data_list in all_merge_data_list:
        # unique docids already in this group
        docid_list = []
        # pb_project_name = merge_data_list[0].get('pb_project_name')
        for data in merge_data_list:
            docid = data.get('docid')
            if docid:
                docid_list.append(docid)
        docid_list = list(set(docid_list))

        data_list = self.multi_thread_util(docid_list, task1, 1)
        data_list = [y for x in data_list for y in x]
        # data_list = []
        # for docid in docid_list:
        #     bool_query = BoolQuery(must_queries=[TermQuery("docids", docid)])
        #     data_list += self.search_util('project2', 'project2_index',
        #                                   bool_query, ['docids'], None)

        # union of docids referenced by the matched project2 rows
        project2_docid_list = []
        for data_dict in data_list:
            docids = data_dict.get('docids')
            if docids:
                project2_docid_list += docids.split(',')
        project2_docid_list = list(set([int(x) for x in project2_docid_list]))
        # NOTE(review): uuid_list is computed but never used below
        uuid_list = [x.get('uuid') for x in data_list]
        uuid_list = list(set(uuid_list))

        # only the docids not already in the group
        new_docid_list = list(set(project2_docid_list) - (set(docid_list)))
        # print('docid_list', docid_list)
        # print('project2_docid_list', project2_docid_list)
        # print('new_docid_list', new_docid_list)

        new_data_list = self.multi_thread_util(new_docid_list, task2, 3)
        new_data_list = [y for x in new_data_list for y in x]
        # print('rule 4', pb_project_name, len(new_data_list), [x.get('docid') for x in new_data_list])

        # new_data_list = []
        # for docid in new_docid_list:
        #     bool_query = BoolQuery(must_queries=[TermQuery("docid", docid)])
        #     new_data_list += self.search_util('document', 'document_index',
        #                                       bool_query, self.document_cols,
        #                                       self.document_sort_col)
        new_data_list = self.data_process_document(new_data_list)
        new_all_merge_data_list.append(merge_data_list + new_data_list)
        # print('project2 num', len(merge_data_list), len(new_data_list))
    return new_all_merge_data_list
+
def get_data_from_designed_project(self):
    """Placeholder — not implemented yet; always returns None."""
    return None
+
def data_process_document(self, data_list):
    """Normalize raw document rows: merge the ``pb`` section of
    extract_json into the row, map approval documents (docchannel 302)
    onto shenpi_id, and assign a signed type_id.

    Rows missing page_time or extract_json, or whose extract_json has no
    usable 'pb' section, are dropped. Input dicts are mutated in place.

    :param data_list: row dicts from the document table
    :return: filtered, enriched list of row dicts
    """
    new_data_list = []
    for data_dict in data_list:
        if not data_dict.get('page_time'):
            print('no page_time! docid:', data_dict.get('docid'))
            continue
        if not data_dict.get('extract_json'):
            print('no extract_json! docid:', data_dict.get('docid'))
            continue
        try:
            extract_json = json.loads(data_dict.get('extract_json'))
        except Exception:
            # narrowed from a bare except; log the offending row, then re-raise
            print(data_dict)
            raise
        data_dict['shenpi_id'] = None
        data_dict.pop('extract_json')
        if extract_json:
            pb_value = extract_json.get('pb')
            if not pb_value or pb_value == 'error':
                print('extract_json pb is None! docid:', data_dict.get('docid'))
                continue
            # copy pb fields that do not clash with existing columns
            for key, value in pb_value.items():
                if key not in data_dict:
                    data_dict[key] = value

        # approval-channel documents carry the approval id, not a docid
        if data_dict.get('docchannel') == 302:
            data_dict['docid'] = None
            data_dict['shenpi_id'] = int(data_dict.get('original_id'))

        # type_id: positive docid for documents, negative shenpi_id for approvals
        if data_dict['docid'] not in [None, ""] and data_dict['shenpi_id'] in [None, ""]:
            data_dict['type_id'] = data_dict['docid']
        elif data_dict['shenpi_id'] not in [None, ""] and data_dict['docid'] in [None, ""]:
            data_dict['type_id'] = -data_dict['shenpi_id']
        new_data_list.append(data_dict)
    return new_data_list
+
+    def add_main_project(self, pbs, show=0):
+        # 判断是否已存在main_project,删掉
+        for p in pbs:
+            if p.get('type_id') and p.get('type_id') == 0:
+                pbs.pop(p)
+
+        # 获取数量最多的project_name_refind作为main_project_name
+        refind_dict = {}
+        for p in pbs:
+            project_name_refind = p.get('project_name_refind')
+            if project_name_refind:
+                pass
+            elif p.get('doctitle'):
+                project_name_refind = p.get('doctitle')
+            elif p.get('project_name'):
+                project_name_refind = p.get('project_name')
+            else:
+                continue
+
+            if project_name_refind in refind_dict.keys():
+                refind_dict[project_name_refind] += [p]
+            else:
+                refind_dict[project_name_refind] = [p]
+
+        refind_list = []
+        for key in refind_dict.keys():
+            refind_list.append([key, refind_dict.get(key)])
+        if not refind_list:
+            main_project_name = ''
+            main_project = None
+        else:
+            refind_list.sort(key=lambda x: len(x[1]))
+            main_project_name = refind_list[-1][0]
+            p_list = refind_list[-1][1]
+            p_list.sort(key=lambda x: len(str(x)))
+            main_project = copy.deepcopy(p_list[-1])
+
+            # 所有字段合到main_project中
+            col_list = ["province", "city", "district", "tenderee", "tenderee_contact", "tenderee_phone",
+                        "agency", "project_code", "project_name", "proportion", "projectDigest", "projectAddress",
+                        "begin_time", "end_time", "project_name_refind", "industry", "location", 'doctitle',
+                        'service_time', 'service_days', 'docchannel', 'project2_uuid', 'stage']
+            # 合并
+            for col in col_list:
+                if main_project.get(col) is None:
+                    for p in pbs:
+                        if p.get(col) is not None:
+                            main_project[col] = p.get(col)
+                            break
+
+        if main_project is not None:
+            # 设置main_project特有属性
+            main_project["project_name"] = main_project_name
+            main_project["docid"] = 0
+            main_project["shenpi_id"] = 0
+            main_project["type_id"] = 0
+            main_project["main"] = 1
+            pbs = [main_project] + pbs
+            if show:
+                print('add_main_project main_project', main_project)
+        else:
+            # print("can't find main_project", pbs)
+            pass
+        return pbs
+
    def remerge_by_district(self, pbs):
        """Re-split one merged project group by administrative region.

        pbs[0] is assumed to be the synthetic main project; pbs[1:] are the
        member records.  Returns a list of sub-groups (each re-fitted with a
        fresh main project via add_main_project), or None when a single city
        spans too many districts — callers must treat a None result as
        "drop this group entirely".
        """
        # Bucket members (main project excluded) by normalized province /
        # city / district; unknown or nationwide values collapse to 'None'.
        province_dict = {}
        city_dict = {}
        district_dict = {}
        for g in pbs[1:]:
            province = g.get('province')
            city = g.get('city')
            district = g.get('district')

            if province in ['', 'None', '-', None, '未知', '全国']:
                province = 'None'
            if city in ['', 'None', '-', None, '未知', '全国']:
                city = 'None'
            if district in ['', 'None', '-', None, '未知', '全国']:
                district = 'None'

            # Strip administrative suffixes so that e.g. "广东省" == "广东".
            province = re.sub('省', '', province)
            city = re.sub('市', '', city)
            district = re.sub('[区县]', '', district)

            if province in province_dict.keys():
                province_dict[province] += [g]
            else:
                province_dict[province] = [g]
            if city in city_dict.keys():
                city_dict[city] += [g]
            else:
                city_dict[city] = [g]
            if district in district_dict.keys():
                district_dict[district] += [g]
            else:
                district_dict[district] = [g]

        # One city but >= 5 distinct districts: considered noise, emit nothing.
        if len(district_dict.keys()) >= 5 and len(city_dict.keys()) == 1:
            return

        # --- split by province ---
        if 'None' in province_dict.keys():
            province_list = list(province_dict.keys())
            province_list.remove('None')
            province_list = [[x, len(province_dict.get(x))] for x in province_list]
            none_cnt = len(province_dict.get('None'))
        else:
            province_list = [[x, len(province_dict.get(x))] for x in province_dict.keys()]
            none_cnt = 0

        if province_list:
            province_list.sort(key=lambda x: x[1])
            max_cnt = province_list[-1][1]
        else:
            max_cnt = 0
        # Split when the dominant province covers < 85% of the records that
        # carry a usable province.
        # NOTE(review): the denominator uses len(pbs), which still counts
        # pbs[0] (the main project) — confirm this offset is intended.
        split_flag = 0
        if none_cnt != len(pbs) and max_cnt / (len(pbs) - none_cnt) < 0.85:
            split_flag = 1

        new_list = []
        if len(province_list) >= 2 and split_flag:
            for key in province_dict.keys():
                new_group = province_dict.get(key)
                new_group = self.add_main_project(new_group)
                new_list.append(new_group)
        else:
            # --- split by city ---
            if 'None' in city_dict.keys():
                city_list = list(city_dict.keys())
                city_list.remove('None')
                city_list = [[x, len(city_dict.get(x))] for x in city_list]
            else:
                city_list = [[x, len(city_dict.get(x))] for x in city_dict.keys()]

            if 'None' in district_dict.keys():
                district_dict.pop('None')
            # NOTE(review): district_dict is not read after this point, so the
            # pop above has no effect — confirm whether a district-level split
            # was intended here.

            if len(city_list) >= 4:
                for key in city_dict.keys():
                    new_group = city_dict.get(key)
                    new_group = self.add_main_project(new_group)
                    new_list.append(new_group)
            else:
                new_list = [pbs]
        return new_list
+
+    def judge_project(self, pbs, show=0):
+        # print('pbs1', pbs)
+        pbs.sort(key=lambda x: x.get("main", 0), reverse=True)
+        main_project = pbs[0]
+        if show:
+            # print('judge_project pbs', pbs)
+            # print('judge_project main_project', main_project)
+            pass
+
+        new_need_col = ['projectDigest', 'stage', 'tenderee', 'page_time']
+        follow_need_col = ['page_time']
+
+        new_list = []
+        new_enough_flag = 1
+        for col in new_need_col:
+            if not main_project.get(col):
+                new_enough_flag = 0
                 break
+        if show:
+            print('judge_project new_enough_flag', new_enough_flag)
+
+        # 只有一个竣工阶段的也不要
+        if len(pbs) == 2 and main_project.get('stage') == '竣工阶段':
+            new_enough_flag = 0
+
+        if new_enough_flag:
+            main_project["new_enough"] = new_enough_flag
+            main_project["follow_enough"] = 1
+            new_list = [main_project]
+            # print('pbs', pbs)
+            # print(len(pbs), pbs[0])
+            for project in pbs[1:]:
+                follow_enough_flag = 1
+                # if project.get('new_enough'):
+                # print('project', project)
+                for col in follow_need_col:
+                    if not project.get(col):
+                        follow_enough_flag = 0
+                        break
+                if follow_enough_flag:
+                    project["new_enough"] = 0
+                    project["follow_enough"] = follow_enough_flag
+                    new_list.append(project)
+        # print('new_list', new_list)
+        # print('+'*20)
+        return new_list
+
+    def get_project_name_refind(self, project_name, doctitle, tenderee='', agency='', min_len=3):
+        # 跳过部分
+        re_str11 = '网上超市|服务市场采购|印刷服务|复印纸|车辆维修和保养|商品房预售|办公家具定点|直接订购|定点议价' \
+                   '|政府采购意向|信息技术服务定点议价|信息技术服务定点采购|法人章刻制中介机构|专用设备|办公设备采购' \
+                   '|线上摇号选取'
+        re_str12 = '物业'
+        re_str13 = '公共资源交易平台'
+        re_str19 = '环境影响评价(文件|)(审批|审核|受理)|拟作出的建设'
+
+        # 干扰部分
+        re_str1 = "<.*?>[.*?]|{.*?}|〔.*?〕|《.*?》|【.*?】|\(.*?\)|\[.*?\]|(.*?)|\d{1,2}月\d{1,2}[日号]|\d{1,2}:\d{2}"
+
+        re_str4 = '[,.:;,。:;\'\"“”‘’\-/<>#@!$%&*+=·¥|??|-+#"﹝﹒!]'
+
+        re_str5 = '[工程项目建设拟对年批第作出的个及在已]|标段|EPC|总承包|招标|文件|开标|记录|公示|验收|勘察|编制|公开选取|准予|论证|各单位|附件|建筑业' \
+                  '|责任|诚挚|拟作出审批意见|生产|涉及|消防|政府|投资|方案|技术支持|文件|研发|申请报告|出具|现代|产业|依法|报批|行政|审批|许可|开展' \
+                  '|活动|开放日|系列|某部|零星工程|某(地产|型号|单位)|权限内|办理了|外商|我院|召开|我市|启动|我单位|我(县|区|会)|成功|举办|举行' \
+                  '|被评为|征(求|询)|包括|不包括|层层分包|合同估算价|万元以内|组织|全(市|区|县)|承接|积极|针对|企业|小规模|安全|助推|装修|改造' \
+                  '|新建|居住|技术|建设|建筑|安装|园林|绿化|信息化|采购|商品房|预售|许可|房产|测量|报告|业务|零星|维修|水土保持|扩建|夜间|工地' \
+                  '|整治|高速公路|备案|加油站|设施|环境|保护|合同|履约|在线|询价|面积|受理|经济中心|服务|食品|加工|利用|公开|选取|动物|疫苗|框架' \
+                  '|协议|房屋|中国|不动产|实验室|限额|以下|单位|入围|审查|合格|意见|新能源|常规|许可|申请|加工|制品|建议书|可研|结算|审核|遴选' \
+                  ''
+
+        re_str6 = '总承包|设计|环评|监理|施工|竣工|项目|工程|EPC|验收|勘察设计|全过程造价咨询|造价咨询|勘察|可行性研究报告|初步设计|社会稳定风险评估|测绘' \
+                  '|(地震安全性|环境影响|水土保持)评(价估)'
+
+        re_str7 = '许可信息公开表|办理结果公示|审批信息公开表|验收公示表|信息披露|备案登记表|验收结果公示|' \
+                  '审批意见公开|受理公示|施工许可|情况说明|合同纠纷调解|施工许可双公示|施工许可证|政策'
+
+        re_str8 = '[〔〕()\[\]《》()【】{}{}[]<>]'
+
+        re_str14 = '[〔(\[(【{{[<](采购结果|设计|环评|监理|施工|竣工|工程|EPC|验收|勘察设计|全过程造价咨询|造价咨询|勘察)[〕)\]》)】}}]>]'
+
+        # 截取部分
+        re_str2 = '机场快线|科技园|产业园|工业园|工程|项目|施工|竣工|总承包|改造|监理|可研|验收|勘察设计|全过程造价咨询|造价咨询|勘察|可行性研究报告|EPC|初步设计|社会稳定风险评估'
+        re_str3 = '关于(公布核准的|一种用于|核发|作出|同意|开展|调整|请求|规范|要求|进一步|遴选|领取)|关于公[布开示]|关于[为对]|关于|公司|集团|局|委托'
+        re_str9 = '改扩建|建设|迁改|土建|测绘|(地震安全性|环境影响|水土保持)评(价估)'
+
+        # 混淆部分
+        re_str10 = '局部'
+        re_str17 = '(工程|信息|)有限|公司|集团|局|大学|院|学校|学院|中心'
+        re_str18 = '(设计|造价|咨询|建设|项目|管理|工程)+有限|(信息|职业|技术|管理)+(大学|学校|学院|中心|院|所)'
+        re_str26 = '服务类|(设计|造价|咨询|建设|项目|管理|工程)+(大学|学校|学院|中心|院|所|集团|局)'
+
+        # 需判断删除部分
+        re_str15 = '[ 、#※.?<|①=-_—]|=-|##|\*+|[\((](001|[0-9]|[一二三四五六七八九十]+|)[\))]|(0001|0000|001|002|01|02)+'
+        re_str16 = '[0-9][.、]'
+
+        # 删除特定表达
+        re_str20 = '公共资源交易中心.*关于'
+        re_str21 = '[\u4e00-\u9fff]{2,}市[\u4e00-\u9fff]{2,}区'
+        re_str22 = '[\u4e00-\u9fff]{2,4}区[^至]'
+        re_str23 = '.{1,2}招标公告|(PDF|pdf)(版|)'
+        re_str25 = '(小区)$'
+        re_str27 = '[\u4e00-\u9fff]{2,3}省|[\u4e00-\u9fff]{2,3}市'
+
+        re_str_area = '华北|华南|华东|华中|西南|东北|西北'
+        re_str_province = '北京|天津|河北|山西|内蒙古|广东|海南|广西|上海|江苏|浙江|安徽|福建|江西|山东|河南|湖北|湖南|重庆|四川|贵州|云南|西藏|黑龙江|辽宁|吉林|陕西|甘肃|青海|宁夏|新疆|台湾|香港|澳门'
+        re_str_city = '东城|和平|石家庄|唐山|秦皇岛|邯郸|邢台|保定|张家口|承德|沧州|廊坊|衡水|太原|大同|阳泉|长治' \
+                      '|晋城|朔州|晋中|运城|忻州|临汾|吕梁|呼和浩特|包头|乌海|赤峰|通辽|鄂尔多斯|呼伦贝尔|巴彦淖尔' \
+                      '|乌兰察布|兴安盟|锡林郭勒盟|阿拉善盟|广州|韶关|深圳|珠海|汕头|佛山|江门|湛江|茂名|肇庆|惠州' \
+                      '|梅州|汕尾|河源|阳江|清远|潮州|揭阳|云浮|海口|三亚|南宁|柳州|桂林|梧州|北海|防城港|钦州|贵港' \
+                      '|玉林|百色|贺州|河池|来宾|崇左|黄浦|南京|无锡|徐州|常州|苏州|南通|连云港|淮安|盐城|扬州|镇江' \
+                      '|泰州|宿迁|杭州|宁波|温州|嘉兴|湖州|绍兴|金华|衢州|舟山|台州|丽水|合肥|芜湖|蚌埠|淮南|马鞍山' \
+                      '|淮北|铜陵|安庆|黄山|滁州|阜阳|宿州|六安|亳州|池州|宣城|福州|厦门|莆田|三明|泉州|漳州|南平' \
+                      '|龙岩|宁德|南昌|景德镇|萍乡|九江|新余|鹰潭|赣州|吉安|宜春|抚州|上饶|济南|青岛|淄博|枣庄' \
+                      '|东营|烟台|潍坊|济宁|泰安|威海|日照|临沂|德州|聊城|滨州|菏泽|郑州|开封|洛阳|平顶山|安阳|鹤壁' \
+                      '|新乡|焦作|濮阳|许昌|漯河|三门峡|南阳|商丘|信阳|周口|驻马店|武汉|黄石|十堰|宜昌|襄阳|鄂州' \
+                      '|荆门|孝感|荆州|黄冈|咸宁|随州|恩施土家族|长沙|株洲|湘潭|衡阳|邵阳|岳阳|常德|张家界|益阳' \
+                      '|郴州|永州|怀化|娄底|湘西土家族|万州|成都|自贡|攀枝花|泸州|德阳|绵阳|广元|遂宁|内江|乐山' \
+                      '|南充|眉山|宜宾|广安|达州|雅安|巴中|资阳|阿坝藏族羌族|甘孜藏族|凉山彝族|贵阳|六盘水|遵义' \
+                      '|安顺|铜仁|黔西南布依族|毕节|黔东南苗族|黔南布依族|昆明|曲靖|玉溪|保山|昭通|丽江|普洱|临沧' \
+                      '|楚雄彝族|红河哈尼族|文山|西双版纳傣族|大理白族|德宏傣族景颇族|怒江傈僳族|迪庆藏族|拉萨|昌都' \
+                      '|山南|日喀则|那曲|阿里地区|林芝|哈尔滨|齐齐哈尔|鸡西|鹤岗|双鸭山|大庆|伊春|佳木斯|七台河' \
+                      '|牡丹江|黑河|绥化|大兴安岭|沈阳|大连|鞍山|抚顺|本溪|丹东|锦州|营口|阜新|辽阳|盘锦|铁岭' \
+                      '|朝阳|葫芦岛|长春|吉林|四平|辽源|通化|白山|松原|白城|延边朝鲜族|西安|铜川|宝鸡|咸阳|渭南' \
+                      '|延安|汉中|榆林|安康|商洛|兰州|嘉峪关|金昌|白银|天水|武威|张掖|平凉|酒泉|庆阳|定西|陇南' \
+                      '|临夏回族自治州|甘南藏族|西宁|海东|海北藏族|黄南藏族|海南藏族|果洛藏族|玉树藏族|海西蒙古族' \
+                      '|银川|石嘴山|吴忠|固原|中卫|乌鲁木齐|克拉玛依|吐鲁番|哈密|昌吉|博尔塔拉蒙古|巴音郭楞蒙古' \
+                      '|阿克苏|克孜勒苏柯尔克孜|喀什|和田地区|伊犁|伊犁哈萨克|塔城地区|阿勒泰|中山|东莞|天门|仙桃|潜江' \
+                      '|石河子|五家渠|阿拉尔|图木舒克|三沙|儋州|涪陵|永川|西城|朝阳|丰台|石景山|海淀|门头沟' \
+                      '|房山|通州|顺义|昌平|大兴|怀柔|平谷|密云|延庆|河东|河西|河北区|红桥|东丽|西青|津南|北辰' \
+                      '|武清|宝坻|滨海|宁河|静海|蓟州|渝中|大渡口|江北|沙坪坝|九龙坡|南岸|北碚|綦江|大足|渝北' \
+                      '|巴南|黔江|长寿|江津|合川|南川|璧山|铜梁|潼南|荣昌|开州|徐汇|长宁|静安|普陀|虹口|杨浦' \
+                      '|闵行|宝山|嘉定|浦东新|金山|松江|青浦|奉贤|崇明|济源|神农架林区|五指山|文昌|琼海|万宁' \
+                      '|东方|定安|屯昌|澄迈|临高|白沙黎族|昌江黎族|乐东黎族|陵水黎族|保亭黎族|琼中黎族|梁平' \
+                      '|丰都|城口|垫江|忠县|云阳|奉节|巫山|巫溪|石柱|秀山|武隆|酉阳|彭水|南开|北屯|铁门关' \
+                      '|双河|可克达拉|昆玉|胡杨河'
+
+        re_str28 = '({})(地区)?|({})省?|({})[区市]?'.format(re_str_area, re_str_province, re_str_city)
+        re_str29 = '(({})(地区)?({})省?)|(({})省?({})[区市]?)'.format(re_str_area, re_str_province, re_str_province,
+                                                                re_str_city)
+
+        # 直接删除部分
+        re_str24 = '(的|)(竞争性谈判|竞争性磋商|磋商|中标|单一来源|招标|更正)(采购|)(公告|)'
+
+        add_col = project_name if project_name else '' + doctitle if doctitle else ''
+        if re.search(re_str11, add_col) and not re.search(re_str12, add_col):
+            return '', ''
+
+        from_col_list = [project_name, doctitle]
+        name_refind_flag_dict = {'True': [], 'False': []}
+        for col in from_col_list:
+            name_refind = ""
+            match_flag = False
+            if col is not None and len(col) > 0:
+                name_refind = col
+
+                # 部分跳过
+                if re.search(re_str13, name_refind):
+                    continue
+
+                # 替换特定表达
+                match = re.search(re_str20, name_refind)
+                if match:
+                    name_refind = name_refind[match.span()[1]:]
+
+                # 去掉干扰
+                name_refind = re.sub('年度', '年', name_refind)
+                name_refind = re.sub(re_str4, '', name_refind)
+                name_refind = re.sub(re_str14, '', name_refind)
+
+                # print('name_refind', name_refind)
+
+                # 连续截取工程前的,看哪一部分最适合当refind
+                match = re.finditer(re_str2, name_refind)
+                prob_name_list = []
+                last_index = 0
+                project_word_in_org = []
+                for m in match:
+                    # 混淆词,设施工程中的施工
+                    if m.span()[0] > 0 and name_refind[m.span()[0] - 1] in ['设']:
+                        continue
+
+                    # 判断是不是公司名里的工程
+                    if re.search(re_str26, name_refind[m.span()[1]:]):
+                        project_word_in_org.append(
+                            name_refind[max(0, m.span()[0] - 1):min(m.span()[1] + 1, len(name_refind))])
+                        continue
+                    if re.search(re_str17, name_refind[m.span()[1]:m.span()[1] + 3]):
+                        project_word_in_org.append(
+                            name_refind[max(0, m.span()[0] - 1):min(m.span()[1] + 1, len(name_refind))])
+                        continue
+                    if re.search(re_str18, name_refind[m.span()[1]:]):
+                        project_word_in_org.append(
+                            name_refind[max(0, m.span()[0] - 1):min(m.span()[1] + 1, len(name_refind))])
+                        continue
+
+                    match_flag = True
+                    prob_name_list.append(name_refind[last_index:m.span()[1]])
+                    last_index = m.span()[1]
+                # print('match_flag', match_flag, name_refind)
+
+                # 找不到则用第二套截取
+                if not prob_name_list:
+                    match = re.finditer(re_str9, name_refind)
+                    last_index = 0
+                    for m in match:
+                        # 混淆词,设施工程中的施工
+                        if m.span()[0] > 0 and name_refind[m.span()[0] - 1] in ['设']:
+                            continue
+
+                        # 判断是不是公司名里的工程
+                        if re.search(re_str26, name_refind[m.span()[1]:]):
+                            project_word_in_org.append(
+                                name_refind[max(0, m.span()[0] - 1):min(m.span()[1] + 1, len(name_refind))])
+                            continue
+                        if re.search(re_str17, name_refind[m.span()[1]:m.span()[1] + 3]):
+                            project_word_in_org.append(
+                                name_refind[max(0, m.span()[0] - 1):min(m.span()[1] + 1, len(name_refind))])
+                            continue
+                        if re.search(re_str18, name_refind[m.span()[1]:]):
+                            project_word_in_org.append(
+                                name_refind[max(0, m.span()[0] - 1):min(m.span()[1] + 1, len(name_refind))])
+                            continue
+                        match_flag = True
+                        prob_name_list.append(name_refind[last_index:m.span()[1]])
+                        last_index = m.span()[1]
+                if not prob_name_list:
+                    prob_name_list = [name_refind]
+
+                # 一开始不去掉括号里的内容,截取后再去掉
+                for i, name in enumerate(prob_name_list):
+                    # 括号内容大于一半字数,则不去掉括号中的字
+                    match = re.search(re_str1, name)
+                    # print('name', name)
+                    # print('match', match)
+                    if match and len(match.group()) < len(name) / 2:
+                        name = re.sub(re_str1, "", name)
+                    name = re.sub(re_str8, "", name)
+                    prob_name_list[i] = name
+
+                # 判断refind是否合法
+                # print('prob_name_list2', prob_name_list)
+                name_refind = ''
+                for name in prob_name_list:
+                    # 截取公司后的
+                    match = re.finditer(re_str3, name)
+                    prob_name_list2 = []
+                    for m in match:
+                        # 排除混淆的情况
+                        if m.group() in re_str10 and re.search(re_str10, name):
+                            continue
+                        prob_name_list2.append(name[m.span()[1]:])
+                    if prob_name_list2:
+                        name = prob_name_list2[-1]
+
+                    # 剔除工程类判断词
+                    match1 = re.finditer(re_str6, name)
+                    for m1 in match1:
+                        # 混淆词,设施工程中的施工
+                        if m1.span()[0] > 0 and name[m1.span()[0] - 1] in ['设']:
+                            continue
+                        s_index, e_index = m1.span()
+                        word = name[s_index:e_index]
+                        s_index = s_index - 1 if s_index > 0 else 0
+                        e_index = e_index + 1 if e_index < len(name) else len(name)
+                        word1 = name[s_index:e_index]
+                        if word1 in project_word_in_org:
+                            continue
+                        name = re.sub(re.escape(word), '=' * len(word), name)
+                    name = re.sub('={2,}', "", name)
+
+                    # 剔除一些无关词占用长度
+                    if len(re.findall('[\u4e00-\u9fff]', name)) >= min_len \
+                            and len(re.findall('[\u4e00-\u9fff]', re.sub(re_str5, '', name))) >= min_len:
+                        name_refind = name
+                        break
+
+            if match_flag:
+                name_refind_flag_dict['True'] += [name_refind]
+            else:
+                name_refind_flag_dict['False'] += [name_refind]
+
+        true_list = name_refind_flag_dict.get('True')
+        false_list = name_refind_flag_dict.get('False')
+        name_refind_candidate_list = []
+        if true_list:
+            true_list.sort(key=lambda x: len(x), reverse=True)
+            name_refind = true_list[0]
+            name_refind_candidate_list += true_list
+        # else:
+        #     name_refind = ''
+        if false_list:
+            false_list.sort(key=lambda x: len(x), reverse=True)
+            name_refind_candidate_list += false_list
+
+        # 对候选name_refind循环
+        name_refind = ''
+        show_name_refind = ''
+        for name_refind in name_refind_candidate_list:
+            # 直接判断删除数字
+            match = re.match(re_str16, name_refind)
+            if match and not re.match('[0-9]', name_refind[match.span()[1]:match.span()[1] + 1]):
+                name_refind = name_refind[match.span()[1]:]
+
+            # 删除开头奇怪数字
+            match = re.match(re_str15, name_refind)
+            if match and not re.match('[a-zA-Z地块号]', name_refind[match.span()[1]:match.span()[1] + 1]):
+                name_refind = name_refind[match.span()[1]:]
+
+            # 删除期数
+            name_refind = re.sub('[1-9一二三四五六七八九十]期', '', name_refind)
 
-    def comsumer(self,task_queue):
+            # 跳过'xx省xx市'
+            if re.search(re_str21, name_refind):
+                sub_word = re.sub(re_str21, '', name_refind)
+                sub_word = re.sub(re_str2 + '|' + re_str9, '', sub_word)
+                if len(sub_word) <= 1:
+                    name_refind = ''
+                    continue
+            match27 = re.search(re_str27, name_refind)
+            if match27 and len(match27.group()) == len(name_refind):
+                name_refind = ''
+                continue
+            match28 = re.search(re_str28, name_refind)
+            if match28 and len(match28.group()) == len(name_refind):
+                name_refind = ''
+                continue
+            match29 = re.search(re_str29, name_refind)
+            if match29 and len(match29.group()) == len(name_refind):
+                name_refind = ''
+                continue
 
-        def _handle(_proposed,result_queue,ots_client):
+            # 删除类似'招标公告'表达
+            match2 = re.match(re_str23, name_refind)
+            if match2:
+                name_refind = name_refind[match2.span()[1]:]
+            name_refind = re.sub(re_str24, '', name_refind)
 
-            print(_proposed)
+            # 跳过文件审批
+            if re.search(re_str19, name_refind):
+                name_refind = ''
+                continue
 
-            #修改designed_project
-            _time = time.time()
-            _project_dict = _proposed.toDesigned_project(ots_client)
-            log("toDesigned_project takes %.2fs"%(time.time()-_time))
+            # 跳过网上超市
+            if re.search(re_str11, name_refind):
+                name_refind = ''
+                continue
 
+            show_name_refind = copy.deepcopy(name_refind)
+
+            # 删除区
+            match2 = re.match(re_str22, name_refind)
+            if match2:
+                name_refind = name_refind[match2.span()[1] - 1:]
+
+            # 删除'小区表达'
+            if len(name_refind) >= min_len + 2:
+                name_refind = re.sub(re_str25, '', name_refind)
+
+            # 判断name_refind是否是从公司中来的,过滤
+            if tenderee in [None, 'None', '-', '']:
+                tenderee = ''
+            if agency in [None, 'None', '-', '']:
+                agency = ''
             try:
-                _time = time.time()
-                if _project_dict is not None:
-                    #更新数据
-                    _designed_project = designed_project(_project_dict)
-                    _designed_project.update_project(ots_client)
+                if len(name_refind) >= 4 and (
+                        re.search(re.escape(name_refind[-4:]), tenderee) or re.search(re.escape(name_refind[-4:]),
+                                                                                      agency)):
+                    name_refind = ''
+                    show_name_refind = ''
+            except:
+                pass
+
+            # 判断长度
+            if len(name_refind) < min_len:
+                name_refind = ''
+                show_name_refind = ''
+                continue
+
+            break
+
+        return name_refind, show_name_refind
+
+    def get_proper_main_cols(self, pbs):
+        list_group = pbs
+
+        # 过滤特定project_name_refind
+        project_name_refind = list_group[0].get('project_name')
+        if project_name_refind == '':
+            return
+
+        filter_list = ['信息技术服务定点议价', '信息技术服务定点采购', '法人章刻制中介机构', '政府采购意向']
+        re_str11 = '网上超市|服务市场采购|印刷服务|复印纸|车辆维修和保养|商品房预售|办公家具定点|直接订购|定点议价' \
+                   '|政府采购意向|信息技术服务定点议价|信息技术服务定点采购|法人章刻制中介机构|专用设备|办公设备采购' \
+                   '|线上摇号选取'
+        for s in filter_list:
+            if s in project_name_refind:
+                return
+        if re.search(re_str11, project_name_refind):
+            return
+
+        # 特殊情况可以短一点
+        if project_name_refind and re.search('科技园|产业园|工业园', project_name_refind):
+            if len(project_name_refind) <= 5:
+                return
+        else:
+            if len(project_name_refind) <= 6:
+                return
+
+        # 过滤stage过少的大组
+        if len(list_group) >= 100:
+            stage_cnt = 0
+            for g in list_group[1:]:
+                if g.get('stage') and len(g.get('stage', '')) > 0:
+                    stage_cnt += 1
+            if stage_cnt <= 10:
+                logging.info('stage_cnt <= 10 ' + list_group[0].get('project_name', ''))
+                return
+
+        # 按page_time排序
+        main_project = list_group[0]
+        temp_list = list_group[1:]
+        temp_list.sort(key=lambda x: datetime.strptime(x.get('page_time'), '%Y-%m-%d'), reverse=True)
+        main_project['page_time'] = temp_list[0].get('page_time')
+
+        # 项目概况取最新的
+        for g in temp_list:
+            digest = g.get('projectDigest')
+            if digest and len(digest) >= 10:
+                main_project['projectDigest'] = digest
+                break
+
+        cols = ['industry', 'province']
+        dict_list = [{}, {}]
+        district_no_list = ['全国', '未知', '-', '', None]
+        for g in temp_list:
+            for i, col in enumerate(cols):
+                col_value = g.get(col)
+                if col_value and len(col_value) >= 2:
+                    # if col in ['province'] and col_value in district_no_list:
+                    #     continue
+                    if col_value in dict_list[i].keys():
+                        dict_list[i][col_value] += 1
+                    else:
+                        dict_list[i][col_value] = 1
+
+        # 行业分类取众数
+        industry_list = [[x, dict_list[0].get(x)] for x in dict_list[0].keys()]
+        if industry_list:
+            industry_list.sort(key=lambda x: (x[1], x[0]), reverse=True)
+            industry = industry_list[0][0]
+        else:
+            industry = ''
+        main_project['industry'] = industry
+
+        # 区域省市区取众数
+        province_list = [[x, dict_list[1].get(x)] for x in dict_list[1].keys()]
+        if province_list:
+            province_list.sort(key=lambda x: (x[1], x[0]), reverse=True)
+            province = None
+            for item in province_list:
+                if item[0] not in ['全国', '未知']:
+                    province = item[0]
+                    break
+                if item[0] in ['全国', '未知'] and item[0] is None:
+                    province = item[0]
+            if province in ['全国', '未知']:
+                area = province
+            else:
+                area = self.province_area_dict.get(re.sub('省', '', province_list[0][0]))
+        else:
+            province = ''
+            area = ''
+        main_project['province'] = province
+        main_project['area'] = area
+
+        city_dict = {}
+        for g in temp_list:
+            city = g.get('city')
+            if g.get('province') == province and city and len(city) >= 2:
+                if city in city_dict.keys():
+                    city_dict[city] += 1
+                else:
+                    city_dict[city] = 1
+
+        city_list = [[x, city_dict.get(x)] for x in city_dict.keys()]
+        if city_list:
+            city_list.sort(key=lambda x: (x[1], x[0]), reverse=True)
+            city = None
+            for item in city_list:
+                if item[0] not in ['全国', '未知']:
+                    city = item[0]
+                    break
+                if item[0] in ['全国', '未知'] and item[0] is None:
+                    city = item[0]
+        else:
+            city = ''
+        main_project['city'] = city
+
+        district_dict = {}
+        for g in temp_list:
+            district = g.get('district')
+            if g.get('city') == city and district and len(district) >= 2:
+                if district in district_dict.keys():
+                    district_dict[district] += 1
+                else:
+                    district_dict[district] = 1
+        district_list = [[x, district_dict.get(x)] for x in district_dict.keys()]
+        if district_list:
+            district_list.sort(key=lambda x: (x[1], x[0]), reverse=True)
+            district = None
+            for item in district_list:
+                if item[0] not in ['全国', '未知']:
+                    district = item[0]
+                    break
+                if item[0] in ['全国', '未知'] and item[0] is None:
+                    district = item[0]
+        else:
+            district = ''
+        main_project['district'] = district
+
+        # 重新生成展示用的project_name_refind, 取众数
+        show_name_refind_dict = {}
+        for g in temp_list:
+            project_name = g.get('project_name')
+            doctitlle = g.get('doctitlle')
+            tenderee = g.get('tenderee', '')
+            agency = g.get('agency', '')
+            _, show_name_refind = self.get_project_name_refind(project_name, doctitlle, tenderee, agency)
+            if not show_name_refind:
+                continue
+            if show_name_refind in show_name_refind_dict.keys():
+                show_name_refind_dict[show_name_refind] += 1
+            else:
+                show_name_refind_dict[show_name_refind] = 1
+        show_name_refind_list = [[x, show_name_refind_dict.get(x)] for x in show_name_refind_dict.keys()]
+        if show_name_refind_list:
+            show_name_refind_list.sort(key=lambda x: x[1], reverse=True)
+            show_name_refind = show_name_refind_list[0][0]
+            main_project['show_name_refind'] = show_name_refind
+            main_project['project_name'] = show_name_refind
+
+        if not main_project.get('project_name') or not main_project.get('project_name_refind'):
+            return
+        if len(main_project['project_name']) <= 6 or len(main_project['project_name_refind']) <= 6:
+            return
+
+        # 建筑面积分开数字和单位,取最新有值的
+        for i, g in enumerate(temp_list):
+            proportion = g.get('proportion')
+            if proportion:
+                proportion_unit = proportion[-1]
+                try:
+                    proportion = float(proportion[:-1])
+                except:
+                    proportion = None
+                    proportion_unit = None
+            else:
+                proportion = None
+                proportion_unit = None
+            temp_list[i]['proportion'] = proportion
+            temp_list[i]['proportion_unit'] = proportion_unit
+
+        for g in temp_list:
+            proportion = g.get('proportion')
+            proportion_unit = g.get('proportion_unit')
+            if proportion is not None and proportion_unit is not None:
+                main_project['proportion'] = proportion
+                main_project['proportion_unit'] = proportion_unit
+                break
+        if main_project.get('proportion') in ['', None] or main_project.get('proportion_unit') in ['', None]:
+            main_project['proportion'] = None
+            main_project['proportion_unit'] = None
+
+        # 有无钢结构、有无电梯 有一篇有就是1
+        main_project['has_steel'] = 0
+        for g in temp_list:
+            if g.get('has_steel') == 1:
+                main_project['has_steel'] = 1
+                break
+        main_project['has_elevator'] = 0
+        for g in temp_list:
+            if g.get('has_elevator') == 1:
+                main_project['has_elevator'] = 1
+                break
+
+        # 工程性质 取最新的
+        for g in temp_list:
+            project_property = g.get('project_property')
+            if project_property:
+                main_project['project_property'] = project_property
+                break
 
-                #删除tmp
-                _proposed.delete_row(ots_client)
-                log("update designed takes %.2fs"%(time.time()-_time))
-            except Exception as e:
-                log("comsumer failed cause of %s"%(str(e)))
-                log(traceback.format_exc())
+        # 最高层 取最大的
+        for key in ['max_floor']:
+            max_value = 0
+            for g in temp_list:
+                value = g.get(key)
+                if value and value > max_value:
+                    max_value = value
+            if max_value > 0:
+                main_project[key] = max_value
 
+        # 总投资、工程造价、建安费 去重相加
+        for key in ['total_invest', 'construct_install_fee', 'engineer_cost']:
+            total_value_list = []
+            for g in temp_list:
+                value = g.get(key)
+                if value:
+                    total_value_list += [value]
+            total_value_list = list(set(total_value_list))
+            total_value = sum(total_value_list)
+            if total_value > 0:
+                main_project[key] = total_value
 
-        result_queue = queue.Queue()
+        # 结构类型、外墙类型 合并去重
+        for key in ['structure', 'wall_type', 'wall_type2']:
+            value_list = []
+            for g in temp_list:
+                value = g.get(key)
+                if value:
+                    value_list.append(value)
+            if value_list:
+                value_list = list(set(value_list))
+                value_list.sort(key=lambda x: x)
+                main_project[key] = ','.join(value_list)
 
-        mt = MultiThreadHandler(task_queue,_handle,result_queue,thread_count=30,ots_client=self.ots_client)
-        mt.run()
+        list_group = [main_project] + temp_list
+        return list_group
 
-    def waitTask(self,task_queue):
-        for i in range(60):
-            if task_queue.qsize()>0:
-                return True
+    def filter_cols(self, pbs):
+        list_group = pbs
+
+        # follow项目保留stage, page_time, docid, shenpi_id, type_id, phase, section
+        remain_col_follow = ["docid", "shenpi_id", 'type_id', "stage", "page_time", "location",
+                             'phase', "section", 'new_enough', 'follow_enough']
+        for i in range(len(list_group)):
+            new_dict = {}
+            if i == 0:
+                continue
             else:
-                time.sleep(1)
-        return False
+                for col in remain_col_follow:
+                    new_dict[col] = list_group[i].get(col)
+            list_group[i] = new_dict
+        return list_group
+
+    def data_to_pb_project(self, all_merge_data_list, show=0):
+        pb_group_list = []
+        cols = ['docid', 'extract_json', 'tenderee', 'agency', 'doctitle', 'project_name', 'project_code', 'page_time']
+        for merge_data_list in all_merge_data_list:
+            pbs = merge_data_list
+            if show:
+                if pbs:
+                    print([x.get('docid') for x in pbs])
+
+            # 去重排序限制
+            pbs = list(set([json.dumps(x) for x in pbs]))
+            pbs = [json.loads(x) for x in pbs]
+
+            temp_list = []
+            for pb in pbs:
+                try:
+                    datetime.strptime(pb.get('page_time'), '%Y-%m-%d')
+                    temp_list.append(pb)
+                except:
+                    continue
+            pbs = temp_list
+
+            pbs.sort(key=lambda x: datetime.strptime(x.get('page_time'), '%Y-%m-%d'), reverse=True)
+            pbs = pbs[:500]
+            # print('data_to_pb_project len(pbs) 0', len(pbs))
+
+            # 生成main_project
+            pbs = self.add_main_project(pbs)
+            pb_group_list.append(pbs)
+        if show:
+            print('data_to_pb_project len(pb_group_list) 0', len(pb_group_list))
+            if pb_group_list:
+                for pbs in pb_group_list:
+                    if pbs:
+                        print('docid', [x.get('docid') for x in pbs])
+                        print('shenpi_id', [x.get('shenpi_id') for x in pbs])
+                        print('total_invest', [x.get('total_invest') for x in pbs])
+                        print('project_name', [x.get('project_name') for x in pbs])
+
+        # 根据省市地区重新分组
+        temp_pb_group_list = []
+        delete_pb_group_list = []
+        for pb_group in pb_group_list:
+            if not pb_group:
+                continue
+            # 得到新的分组
+            sub_pb_group_list = self.remerge_by_district(pb_group)
+            if not sub_pb_group_list:
+                continue
+            if len(sub_pb_group_list) > 1:
+                temp_pb_group_list += sub_pb_group_list
+                delete_pb_group_list.append(pb_group)
+        for pb_group in delete_pb_group_list:
+            pb_group_list.remove(pb_group)
+        pb_group_list += temp_pb_group_list
+        if show:
+            print('data_to_pb_project len(pb_group_list) 1', len(pb_group_list))
+            if pb_group_list:
+                for pbs in pb_group_list:
+                    if pbs:
+                        print('docid', [x.get('docid') for x in pbs])
+                        print('shenpi_id', [x.get('shenpi_id') for x in pbs])
+                        print('total_invest', [x.get('total_invest') for x in pbs])
+                        print('project_name', [x.get('project_name') for x in pbs])
+
+        # 判断组是否符合拟在建条件
+        temp_pb_group_list = []
+        for pb_group in pb_group_list:
+            if not pb_group:
+                continue
+            new_pb_group = self.judge_project(pb_group)
+            if new_pb_group:
+                temp_pb_group_list.append(new_pb_group)
+        pb_group_list = temp_pb_group_list
+        if show:
+            print('data_to_pb_project len(pb_group_list) 2', len(pb_group_list))
+            if pb_group_list:
+                for pbs in pb_group_list:
+                    if pbs:
+                        print('docid', [x.get('docid') for x in pbs])
+                        print('shenpi_id', [x.get('shenpi_id') for x in pbs])
+                        print('total_invest', [x.get('total_invest') for x in pbs])
+                        print('project_name', [x.get('project_name') for x in pbs])
+
+        # main_project的每个字段选取合适的值
+        temp_pb_group_list = []
+        for pb_group in pb_group_list:
+            if not pb_group:
+                continue
+            new_pb_group = self.get_proper_main_cols(pb_group)
+            if new_pb_group:
+                temp_pb_group_list.append(new_pb_group)
+        pb_group_list = temp_pb_group_list
+        if show:
+            print('data_to_pb_project len(pb_group_list) 3', len(pb_group_list))
+            if pb_group_list:
+                for pbs in pb_group_list:
+                    if pbs:
+                        print('docid', [x.get('docid') for x in pbs])
+                        print('total_invest', [x.get('total_invest') for x in pbs])
+                        print('project_name', [x.get('project_name') for x in pbs])
+
+        # 筛选需输出字段
+        temp_pb_group_list = []
+        for pb_group in pb_group_list:
+            new_pb_group = self.filter_cols(pb_group)
+            temp_pb_group_list.append(new_pb_group)
+        pb_group_list = temp_pb_group_list
+        if show:
+            print('data_to_pb_project len(pb_group_list) 4', len(pb_group_list))
+        return pb_group_list
+
+    def pb_project_to_designed_project(self, pb_project_list):
+        update_project_list = []
+        for pb_project in pb_project_list:
+            pb_temp = proposedBuilding_tmp({"json_list_group": json.dumps(pb_project, ensure_ascii=False)})
+            # print('before to_designed_project_online main_project', pb_project[0])
+            project_dict = pb_temp.to_designed_project_online(self.ots_client)
+            if project_dict:
+                # print('project_dict', project_dict)
+                if self.update_designed_project_flag:
+                    _designed_project = designed_project(project_dict)
+                    _designed_project.update_project(self.ots_client)
+                    log('update to designed_project success!')
+                update_project_list.append(project_dict)
+        return update_project_list
 
-    def maxcompute2ots(self):
+    def update_opertime(self, data_list):
+        data_format = "%Y-%m-%d %H:%M:%S"
+        opertime_last = self.opertime_last
+        opertime_last = datetime.strptime(opertime_last, data_format)
+        for data_dict in data_list:
+            opertime = data_dict.get('opertime')
+            opertime = datetime.strptime(opertime, data_format)
+            if opertime > opertime_last:
+                opertime_last = opertime
+        opertime_last = opertime_last.strftime(data_format)
+        with open(self.opertime_txt, 'w') as f:
+            f.write(opertime_last)
+        return opertime_last
 
-        task_queue = queue.Queue()
 
-        self.producer(task_queue)
+def run_flow(data, test_mode=0, update=1):
+    ds = PBProduction(test_mode, update)
+    globals()['docid'] = data.get('docid')
+    try:
+        pbs = ds.pb_data_flow([data])
+    except:
+        traceback.print_exc()
+        print(data.get('docid'))
+        pbs = []
+    return pbs
 
-        # _dict = {"uuid":"12313","crtime":123,
-        #                 "json_list_group":'''
-        #                 [{"docid": 218170163, "shenpi_id": null, "type_id": "218170163", "page_time": "2022-01-26", "province": "广东", "city": "深圳", "district": "罗湖", "tenderee": "深圳市城市建设开发(集团)有限公司", "tenderee_contact": "孙工", "tenderee_phone": "18998998087", "agency": "", "project_code": null, "project_name": "成都市白鹭湾(住宅用地)项目可行性研究", "doctitle": "成都市白鹭湾(住宅用地)项目可行性研究服务招标公告", "docchannel": "52", "stage": "可研阶段", "proportion": "建筑面积为32388.83㎡", "projectDigest": "招标信息),六、开标地点:深圳市罗湖区桂园街道滨河东路1011号鹿丹大厦12层会议室;(鉴于疫情原因,投标人自行决定是否到开标现场开标;如投标人需要到开标现场,请投标人关注、执行深圳市关于疫情的相关规定,并提前2天与招标人进行沟通协商。),七、投标人资格标准等详细内容详见招标文件。招标联系人:孙工;联系电话:18998998087;邮箱:sundh@szcjjt.com;张工;联系电话:13928429353;邮箱:zli@szcjjt.com", "projectAddress": null, "begin_time": null, "end_time": null, "project_name_refind": "成都市白鹭湾(住宅用地)项目可行性研究", "industry": "办公楼", "location": null, "section_num": "-1", "new_enough": 1, "follow_enough": 1}]
-        #                 '''}
-        # task_queue.put(proposedBuilding_tmp(_dict))
 
-        self.comsumer(task_queue)
+if __name__ == "__main__":
+    # df = pd.read_csv('pb_docid_240730.csv')
+    # docid_list = df['docid'].tolist()
 
-    def scheduler(self):
-        _scheduler = BlockingScheduler()
-        _scheduler.add_job(self.maxcompute2ots,"cron",minute="*/1")
-        _scheduler.start()
+    process_num = 4
 
+    sleep_sec = 100
+    min_data_cnt = 20
+    while True:
+        sleep_flag = 1
+        ds = PBProduction()
+        ds.test_mode = 0
+        ds.test_doc_ids = [514054154]
+        # ds.test_doc_ids = docid_list[:1000]
 
-def startSychro():
-    ds = DataProduction()
-    ds.scheduler()
+        tmp_data_list = ds.pb_data_start()
+        log('len(tmp_data_list)', len(tmp_data_list))
+        if len(tmp_data_list) <= 0:
+            print('sleep for', sleep_sec)
+            time.sleep(sleep_sec)
+            continue
 
+        mp_start_time = time.time()
+        with multiprocessing.Pool(processes=process_num) as pool:
+            results = pool.map(run_flow, tmp_data_list)
+            str_list = [json.dumps(x, ensure_ascii=False) + '\n' for x in results]
+            # with open('json.txt', 'w', encoding='utf-8') as f:
+            #     f.writelines(str_list)
+            len_list = [len(x) for x in results]
+            all_pb_num = sum(len_list)
 
-if __name__=="__main__":
-    ds = DataProduction()
-    # ds.scheduler()
-    ds.maxcompute2ots()
+            # 更新数据时间
+            if not ds.test_mode:
+                last_time_str = ds.update_opertime(tmp_data_list)
 
+                # 判断时间
+                data_format = "%Y-%m-%d %H:%M:%S"
+                last_time = datetime.strptime(last_time_str, data_format)
+                now = datetime.now()
+                print('*' * 30, last_time_str)
+                if now - last_time >= timedelta(seconds=sleep_sec):
+                    sleep_flag = 0
 
+            pool.close()
+            pool.join()
+        log('mp process data num:', len(tmp_data_list), 'update pb num:', all_pb_num,
+            'cost:', round(time.time() - mp_start_time, 2),
+            'avg:', round((time.time() - mp_start_time) / len(tmp_data_list), 2))
 
+        if sleep_flag:
+            print('sleep for', sleep_sec)
+            time.sleep(sleep_sec)

+ 88 - 0
BaseDataMaintenance/maintenance/proposedBuilding/repair_pb_project.py

@@ -0,0 +1,88 @@
+from tablestore import BoolQuery, TermQuery, Sort, FieldSort, SortOrder, ColumnReturnType, SearchQuery, ColumnsToGet
+
+from BaseDataMaintenance.common.Utils import getRow_ots
+from BaseDataMaintenance.dataSource.source import getConnect_ots
+from BaseDataMaintenance.maintenance.proposedBuilding.pb_project_production import run_flow, PBProduction
+from BaseDataMaintenance.model.ots.designed_project import designed_project
+
+
+def repair_pb_project(pb_project_id, pb_project_pk):
+    # 修改原来错的拟在建项目status为404, update_time为now
+    ots_client = getConnect_ots()
+    obj = designed_project({'id': pb_project_id, 'partitionkey': pb_project_pk, 'status': '404'})
+    obj.update_row(ots_client, delete=0)
+
+    # 把错的拟在建的docids,spids拿回来,用单进程重新跑一遍
+    bool_query = BoolQuery(must_queries=[TermQuery("id", pb_project_id)])
+    data_list = search_util(ots_client,
+                            'designed_project', 'designed_project_index',
+                            bool_query, ['docids', 'spids'], 'update_time',
+                            limit=1, while_next=0)
+    id_list = []
+    if data_list:
+        docids = data_list[0].get('docids')
+        spids = data_list[0].get('spids')
+        if docids:
+            id_list += docids.split(',')
+        if spids:
+            id_list += spids.split(',')
+    id_list = list(set(id_list))
+    print('id_list', id_list)
+
+    # 重跑
+    for docid in id_list:
+        ds = PBProduction(test_mode=1, update=1)
+        ds.test_doc_ids = [docid]
+        tmp_data_list = ds.pb_data_start()
+        print('len(tmp_data_list)', len(tmp_data_list))
+        if tmp_data_list:
+            run_flow(tmp_data_list[0], test_mode=1, update=1)
+
+    # 修改update_time,同步document
+    # obj = designed_project({'id': _id, 'partitionkey': pk,
+    #                         'status': '404', 'update_time': '2024-07-31 14:35:31'})
+    # obj.update_row(ots_client, delete=0)
+    return
+
+
+def search_util(ots_client, table, table_index, bool_query, columns, sort_col,
+                limit=999999, show_total=0, while_next=1, asc=0):
+    if sort_col:
+        if asc:
+            sort = Sort(sorters=[FieldSort(sort_col, SortOrder.ASC)])
+        else:
+            sort = Sort(sorters=[FieldSort(sort_col, SortOrder.DESC)])
+    else:
+        sort = None
+    return_type = ColumnReturnType.SPECIFIED
+    rows, next_token, total_count, is_all_succeed = ots_client.search(table, table_index,
+                                                                      SearchQuery(bool_query, sort=sort,
+                                                                                  limit=100,
+                                                                                  get_total_count=True),
+                                                                      ColumnsToGet(columns, return_type))
+    list_data = getRow_ots(rows)
+    if show_total:
+        print('search total_count', total_count)
+    if len(list_data) >= limit:
+        print('limit ', limit, len(list_data))
+        return list_data[:limit]
+
+    if while_next:
+        while next_token and len(list_data) < limit:
+            rows, next_token, total_count, is_all_succeed = ots_client.search(table, table_index,
+                                                                              SearchQuery(bool_query,
+                                                                                          next_token=next_token,
+                                                                                          limit=100,
+                                                                                          get_total_count=True),
+                                                                              ColumnsToGet(columns,
+                                                                                           return_type))
+            list_data += getRow_ots(rows)
+    return list_data
+
+
+if __name__ == '__main__':
+    _id = 4915783839244091393
+    pk = 194
+
+    repair_pb_project(_id, pk)
+

+ 7 - 3
BaseDataMaintenance/model/ots/designed_project.py

@@ -1,6 +1,7 @@
 #encoding:UTF8
 
 from BaseDataMaintenance.model.ots.BaseModel import BaseModel
+from datetime import datetime
 from tablestore import *
 from BaseDataMaintenance.common.Utils import *
 
@@ -109,7 +110,7 @@ class designed_project(BaseModel):
                 _list.append((_key,_v))
         return _list
 
-    def update_row(self,ots_client,retrytimes=3):
+    def update_row(self,ots_client,retrytimes=3, delete=1):
         primary_key = self.getPrimaryKey_turple()
         update_of_attribute_columns = {
             'PUT' : self.getAttribute_turple()
@@ -118,7 +119,7 @@ class designed_project(BaseModel):
         condition = Condition('IGNORE')
         for i in range(retrytimes):
             try:
-                if self.exists_row(ots_client):
+                if self.exists_row(ots_client) and delete:
                     self.delete_row(ots_client)
                 consumed, return_row = ots_client.update_row(self.table_name, row, condition)
                 return True
@@ -149,7 +150,10 @@ class designed_project(BaseModel):
             for _dict in list_dict[1:]:
                 _designed_delete = designed_project(_dict)
 
-                _designed_delete.setValue("status","404",True)
+                _designed_delete.setValue("status", "404", True)
+                current_time = datetime.now()
+                current_time = current_time.strftime('%Y-%m-%d %H:%M:%S')
+                _designed_delete.setValue("update_time", current_time, True)
                 _designed_delete.update_project(ots_client)
                 # _designed_delete.delete_row(ots_client)
 

Разница между файлами не показана из-за своего большого размера
+ 898 - 62
BaseDataMaintenance/model/ots/proposedBuilding_tmp.py


Некоторые файлы не были показаны из-за большого количества измененных файлов