Workflow for pre-downloading attachments and setting status

fangjiasheng 1 year ago
parent
commit
1fc79ae011

+ 1 - 1
BaseDataMaintenance/common/multiThread.py

@@ -46,7 +46,7 @@ class _taskHandler(threading.Thread):
     def run(self):
         while(True):
             try:
-                logging.info("handler task queue size is %d need_stop %s thread_id:%d-%d"%(self.task_queue.qsize(),str(self.need_stop),os.getpid(),threading.get_ident()))
+                logging.info("%s - handler task queue size is %d need_stop %s thread_id:%d-%d"%(self.task_handler.__name__, self.task_queue.qsize(),str(self.need_stop),os.getpid(),threading.get_ident()))
                 item = self.task_queue.get(True,timeout=5)
 
                 self.task_handler(item,self.result_queue,*self.args,**self.kwargs)
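A minimal, self-contained sketch of what the new log prefix buys (the handler function here is hypothetical; only the format string comes from this hunk). When several _taskHandler threads run different task_handler functions at once, the __name__ prefix is what tells their queue logs apart:

    import logging, os, threading

    logging.basicConfig(level=logging.INFO)

    def set_status_document(item, result_queue):  # hypothetical handler
        pass

    task_handler = set_status_document
    logging.info("%s - handler task queue size is %d need_stop %s thread_id:%d-%d" % (
        task_handler.__name__, 0, str(False), os.getpid(), threading.get_ident()))
    # e.g. INFO:root:set_status_document - handler task queue size is 0 need_stop False thread_id:4242-139651...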

+ 10 - 10
BaseDataMaintenance/maintenance/document/download_attachment_and_set_status_rerun.py

@@ -7,14 +7,13 @@ import time
 import pandas as pd
 import numpy as np
 
-from BaseDataMaintenance.common.multiThread import MultiThreadHandler
-
 sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../../")
 from BaseDataMaintenance.common.ossUtils import downloadFile
 from BaseDataMaintenance.dataSource.source import getConnect_ots
 from BaseDataMaintenance.maintenance.dataflow import Dataflow
 from BaseDataMaintenance.model.ots.attachment import attachment, attachment_filemd5, attachment_status
 from BaseDataMaintenance.model.ots.document import Document, document_docid, document_partitionkey, document_status
+from BaseDataMaintenance.common.multiThread import MultiThreadHandler
 
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 
@@ -211,14 +210,14 @@ def set_status_document(args, queue):
     docid, p_key = args
     # print(docid, p_key)
     ots_doc = Document({document_docid: docid, document_partitionkey: p_key, document_status: 1})
-    # ots_doc.update_row(ots_client)
+    ots_doc.update_row(ots_client)
 
 
 def set_status_attachment(args, queue):
     md5, path = args
     # print(md5, path)
     ots_attach = attachment({attachment_filemd5: md5, attachment_status: 0})
-    # ots_attach.update_row(ots_client)
+    ots_attach.update_row(ots_client)
 
 
 def rerun_main(file_path, set_docid=True, set_md5=True):
@@ -240,11 +239,12 @@ def rerun_main(file_path, set_docid=True, set_md5=True):
         print(md5_path_list[:10])
 
     # Run with multiple threads
-    batch = 1000
+    batch = 100
+    start_index = 1800
     used_md5_list = []
     if md5_path_list and docid_key_list:
         start_time = time.time()
-        for i in range(0, len(docid_key_list), batch):
+        for i in range(start_index, len(docid_key_list), batch):
             logging.info('Start batch ' + str(i) + ' to ' + str(i+batch))
 
             # Take one batch of docids
@@ -272,7 +272,7 @@ def rerun_main(file_path, set_docid=True, set_md5=True):
             for k in sub_md5_path_list:
                 task_queue.put(k)
             a = MultiThreadHandler(task_queue=task_queue, task_handler=download_attachment_mp,
-                                   result_queue=result_queue, thread_count=1)
+                                   result_queue=result_queue, thread_count=4)
             a.run()
 
             # set attachment status
@@ -287,14 +287,14 @@ def rerun_main(file_path, set_docid=True, set_md5=True):
             # set document status
             task_queue = queue.Queue()
             result_queue = queue.Queue()
-            for k in docid_key_list:
+            for k in sub_docid_key_list:
                 task_queue.put(k)
             a = MultiThreadHandler(task_queue=task_queue, task_handler=set_status_document,
                                    result_queue=result_queue, thread_count=10)
             a.run()
             logging.info('Finish batch ' + str(i) + ' to ' + str(i+batch) + ' ' + str(time.time()-start_time))
-            time.sleep(5)
-            break
+            time.sleep(10)
+
 
 
 if __name__ == '__main__':
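A small sketch of the resumable batching pattern this hunk switches to (iter_batches is a hypothetical helper, not part of the repo): start_index lets a rerun resume mid-list after a crash instead of restarting at 0, and the smaller batch keeps each status-setting pass short:

    def iter_batches(items, batch=100, start_index=0):
        # yields (offset, slice) pairs starting at start_index
        for i in range(start_index, len(items), batch):
            yield i, items[i:i + batch]

    for i, chunk in iter_batches(list(range(250)), batch=100, start_index=100):
        print('batch', i, 'size', len(chunk))
    # batch 100 size 100
    # batch 200 size 50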

+ 136 - 0
BaseDataMaintenance/maintenance/proposedBuilding/one_col_format_unify.py

@@ -0,0 +1,136 @@
+import time
+import pandas as pd
+from sqlalchemy import create_engine
+from tablestore import INF_MIN, INF_MAX, CompositeColumnCondition, LogicalOperator, SingleColumnCondition, \
+    ComparatorType, Direction, OTSClientError, OTSServiceError
+from BaseDataMaintenance.dataSource.source import getConnect_ots
+
+ots_client = getConnect_ots()
+
+
+def get_one_col_all_data():
+    table_name = 'designed_project'
+
+    # Set the start primary key for the range read.
+    inclusive_start_primary_key = [('partitionkey', INF_MIN), ('id', INF_MIN)]
+
+    # Set the end primary key for the range read.
+    exclusive_end_primary_key = [('partitionkey', INF_MAX), ('id', INF_MAX)]
+
+    # Only fetch the project_investment column.
+    columns_to_get = ['project_investment']
+
+    # Return at most 90 rows per call. With 100 total results and limit=90, the first call
+    # returns at most 90 rows (and possibly as few as 0), yet next_start_primary_key is not None.
+    limit = 90
+
+    # Filter with column conditions (address == 'China' AND age < 50); the sub-conditions
+    # are left commented out below, so no filter is actually applied.
+    cond = CompositeColumnCondition(LogicalOperator.AND)
+
+    # When a row lacks the filtered column, pass_if_missing decides whether that row matches.
+    # Unset, or set to True: a row missing the column satisfies the filter condition.
+    # Set to False: a row missing the column does not satisfy the filter condition.
+    # cond.add_sub_condition(SingleColumnCondition("address", 'China', ComparatorType.EQUAL, pass_if_missing=False))
+    # cond.add_sub_condition(SingleColumnCondition("age", 50, ComparatorType.LESS_THAN, pass_if_missing=False))
+
+    all_rows = []
+    start_time1 = time.time()
+    try:
+        # Call the get_range API.
+        consumed, next_start_primary_key, row_list, next_token = ots_client.get_range(
+            table_name,
+            Direction.FORWARD,
+            inclusive_start_primary_key,
+            exclusive_end_primary_key,
+            columns_to_get,
+            limit,
+            # column_filter=cond,
+            max_version=1,
+            # time_range=(1557125059000, 1557129059000)  # start_time >= 1557125059000, end_time < 1557129059000.
+        )
+        all_rows.extend(row_list)
+
+        # Keep reading while next_start_primary_key is not None.
+        index = 0
+        start_time = time.time()
+        while next_start_primary_key is not None:
+            if index % 1000 == 0:
+                print('Loop', (index+2)*limit, time.time()-start_time)
+                start_time = time.time()
+            inclusive_start_primary_key = next_start_primary_key
+            consumed, next_start_primary_key, row_list, next_token = ots_client.get_range(
+                table_name,
+                Direction.FORWARD,
+                inclusive_start_primary_key,
+                exclusive_end_primary_key,
+                columns_to_get,
+                limit,
+                # column_filter=cond,
+                max_version=1
+            )
+            all_rows.extend(row_list)
+            index += 1
+
+    # Client-side error: usually a bad parameter or a network failure.
+    except OTSClientError as e:
+        print('get row failed, http_status:%d, error_message:%s' % (e.get_http_status(), e.get_error_message()))
+    # Server-side error: usually a bad parameter or throttling.
+    except OTSServiceError as e:
+        print('get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s' % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id()))
+
+    # Collect primary keys and the attribute column value for each row.
+    result_rows = []
+    for row in all_rows:
+        # print(row.primary_key, row.attribute_columns)
+        result_rows.append([row.primary_key[0][1], row.primary_key[1][1], row.attribute_columns[0][1]])
+    print('Total rows: ', len(all_rows), 'Total time: ', time.time() - start_time1)
+    print(result_rows[0])
+    print(result_rows[1])
+
+    return result_rows
+
+
+def list_to_xlsx(data_list):
+    # Despite the name, this writes a CSV (the xlsx export was switched to csv).
+    df = pd.DataFrame(data_list)
+    df.columns = ['partitionkey', 'id', 'project_investment']
+    df.to_csv('D:\\BIDI_DOC\\比地_文档\\统一格式_project_investment.csv', index=False)
+
+
+def csv_to_mysql():
+    mysql_host = '192.168.2.170:3306'
+    mysql_db = 'exportdb'
+    mysql_user = 'root'
+    mysql_pwd = 'pwdformysql0922'
+    mysql_table = 'bdm_one_col_format_unify'
+    xlsx_path = r'D:\BIDI_DOC\比地_文档\统一格式_project_investment.csv'
+
+    engine = create_engine('mysql+pymysql://{}:{}@{}/{}?charset=utf8'.format(mysql_user, mysql_pwd, mysql_host, mysql_db))
+    # df = pd.read_excel(xlsx_path)
+    df = pd.read_csv(xlsx_path)
+    # Write to the target table; the DataFrame index is dropped via index=False.
+    df.to_sql(mysql_table, con=engine, if_exists='append', index=False)
+    """
+        to_sql参数:(比较重要)
+            if_exists:表如果存在怎么处理
+                    append:追加
+                    replace:删除原表,建立新表再添加
+                    fail:什么都不干
+             chunksize: 默认的话是一次性导入, 给值的话是批量导入,一批次导入多少
+             index=False:不插入索引index
+             dtype 创建表结构
+               需要导入 import sqlalchemy
+               dtype = {'id': sqlalchemy.types.BigInteger(),
+                 'name': sqlalchemy.types.String(length=20),
+                 'sex': sqlalchemy.types.String(length=20),
+                 'age': sqlalchemy.types.BigInteger(),
+                 })
+             
+    """
+
+
+if __name__ == '__main__':
+    # _list = get_one_col_all_data()
+    # list_to_xlsx(_list)
+    csv_to_mysql()
+
+    # Fetch all values of the single column, upload them to MaxCompute to normalize the format, then write the updates back
+    # Data is pulled locally because reading via MaxCompute requires a declared column type; a column holding mixed types raises an error
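The same get_range pagination pattern, condensed into a generator as a sketch (it relies only on names already imported in this file; the behaviour mirrors the loop above): rows keep coming until the server returns no next_start_primary_key:

    def iter_range(client, table, start_pk, end_pk, columns, limit=90):
        # start_pk begins as the inclusive start key and is replaced by each
        # next_start_primary_key; None signals the end of the range.
        while start_pk is not None:
            _, start_pk, rows, _ = client.get_range(
                table, Direction.FORWARD, start_pk, end_pk, columns, limit, max_version=1)
            for row in rows:
                yield row

    # e.g. list(iter_range(ots_client, 'designed_project',
    #                      [('partitionkey', INF_MIN), ('id', INF_MIN)],
    #                      [('partitionkey', INF_MAX), ('id', INF_MAX)],
    #                      ['project_investment']))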

+ 53 - 0
BaseDataMaintenance/maintenance/proposedBuilding/update_col.py

@@ -0,0 +1,53 @@
+from BaseDataMaintenance.model.ots.designed_project import designed_project, designed_project_id, \
+    designed_project_partitionkey, designed_project_project_investment, designed_project_engineer_cost, \
+    designed_project_proportion_unit, designed_project_facade_type, designed_project_construct_install_fee, \
+    designed_project_project_structure, designed_project_has_steel, designed_project_covered_area, \
+    designed_project_floor_space, designed_project_elevator, designed_project_project_nature, \
+    designed_project_floors_num
+from BaseDataMaintenance.dataSource.source import getConnect_ots
+
+
+ots_client = getConnect_ots()
+
+
+def update():
+    _dict1 = {
+        designed_project_partitionkey: 2,
+        designed_project_id: 4624623048274739201,
+        designed_project_project_investment: 100000.12354,
+        designed_project_engineer_cost: 20000.222,
+        designed_project_construct_install_fee: 10000.4,
+        designed_project_floor_space: '',
+        designed_project_covered_area: 900,
+        designed_project_proportion_unit: 'm',
+        designed_project_project_structure: '框架结构',
+        designed_project_facade_type: '玻璃幕墙',
+        designed_project_has_steel: 1,
+        designed_project_elevator: 1,
+        designed_project_project_nature: '新建',
+        designed_project_floors_num: 0,
+    }
+
+    _dict2 = {
+        designed_project_partitionkey: 2,
+        designed_project_id: 4647683317091532801,
+        designed_project_project_investment: None,
+        designed_project_engineer_cost: 20.222,
+        designed_project_construct_install_fee: 100,
+        designed_project_floor_space: '',
+        designed_project_covered_area: 5465.36,
+        # designed_project_proportion_unit: '㎡',
+        designed_project_proportion_unit: '',
+        designed_project_project_structure: None,
+        designed_project_facade_type: '',
+        designed_project_has_steel: 0,
+        designed_project_elevator: 0,
+        designed_project_project_nature: None,
+        designed_project_floors_num: 2,
+    }
+    ots_dp = designed_project(_dict2)
+    ots_dp.update_row(ots_client)
+
+
+if __name__ == '__main__':
+    update()
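A parameterized sketch of the same write, assuming (not verified here) that designed_project.update_row only touches the keys present in the dict; the floors value below is a hypothetical example:

    def update_one(pkey, row_id, **cols):
        _dict = {designed_project_partitionkey: pkey, designed_project_id: row_id}
        _dict.update(cols)
        designed_project(_dict).update_row(ots_client)

    update_one(2, 4647683317091532801, **{designed_project_floors_num: 3})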

+ 47 - 0
BaseDataMaintenance/model/ots/designed_project.py

@@ -4,6 +4,53 @@ from BaseDataMaintenance.model.ots.BaseModel import BaseModel
 from tablestore import *
 from BaseDataMaintenance.common.Utils import *
 
+
+designed_project_partitionkey = 'partitionkey'
+designed_project_id = 'id'
+designed_project_area = 'area'
+designed_project_begintime = 'begintime'
+designed_project_city = 'city'
+designed_project_construct_install_fee = 'construct_install_fee'
+designed_project_contacts = 'contacts'
+designed_project_covered_area = 'covered_area'
+designed_project_crtime = 'crtime'
+designed_project_des_project_type = 'des_project_type'
+designed_project_district = 'district'
+designed_project_docids = 'docids'
+designed_project_elevator = 'elevator'
+designed_project_endtime = 'endtime'
+designed_project_engineer_cost = 'engineer_cost'
+designed_project_facade_type = 'facade_type'
+designed_project_facade_type2 = 'facade_type2'
+designed_project_floor_space = 'floor_space'
+designed_project_floors_num = 'floors_num'
+designed_project_follow_number = 'follow_number'
+designed_project_follows = 'follows'
+designed_project_full_text = 'full_text'
+designed_project_has_steel = 'has_steel'
+designed_project_high_project_name = 'high_project_name'
+designed_project_json_list_group = 'json_list_group'
+designed_project_latest_service_time = 'latest_service_time'
+designed_project_ordinary_name = 'ordinary_name'
+designed_project_page_time = 'page_time'
+designed_project_progress = 'progress'
+designed_project_project_address = 'project_address'
+designed_project_project_code = 'project_code'
+designed_project_project_description = 'project_description'
+designed_project_project_follow = 'project_follow'
+designed_project_project_investment = 'project_investment'
+designed_project_project_name = 'project_name'
+designed_project_project_nature = 'project_nature'
+designed_project_project_structure = 'project_structure'
+designed_project_project_type = 'project_type'
+designed_project_proportion_unit = 'proportion_unit'
+designed_project_province = 'province'
+designed_project_spids = 'spids'
+designed_project_status = 'status'
+designed_project_update_status = 'update_status'
+designed_project_update_time = 'update_time'
+
+
 class designed_project(BaseModel):
 
     def __init__(self,_dict):

+ 25 - 13
BaseDataMaintenance/start_main.py

@@ -1,20 +1,29 @@
-
 import sys
 import os
-sys.path.append(os.path.dirname(__file__)+"/..")
+sys.path.append(os.path.dirname(__file__) + "/..")
 import argparse
 
+
 def main(args=None):
     parser = argparse.ArgumentParser()
-    parser.add_argument("--aA",dest="attachAttachment",action="store_true",help="start attachmentAttachment process")
-    parser.add_argument("--etr",dest="enterpriseToRedis",action="store_true",help="start attachmentAttachment process")
-    parser.add_argument("--filename",dest="filename",type=str,default=None,help="start attachmentAttachment process")
-    parser.add_argument("--delkey",dest="deleteEnterpriseKey",action="store_true",help="start attachmentAttachment process")
-    parser.add_argument("--keys",dest="keys",type=str,default=None,help="start attachmentAttachment process")
-    parser.add_argument("--rptc",dest="remove_processed_tyc_company",action="store_true",help="start attachmentAttachment process")
-    parser.add_argument("--product_dict_synchonize",dest="product_dict_synchonize",action="store_true",help="start product_dict_synchonize process")
-    parser.add_argument("--delete_product_collections",dest="delete_product_collections",action="store_true",help="start product_dict_synchonize process")
-    parser.add_argument("--search_similar",dest="search_similar",action="store_true",help="start product_dict_synchonize process")
+    parser.add_argument("--aA", dest="attachAttachment", action="store_true", help="start attachmentAttachment process")
+    parser.add_argument("--etr", dest="enterpriseToRedis", action="store_true",
+                        help="start attachmentAttachment process")
+    parser.add_argument("--filename", dest="filename", type=str, default=None,
+                        help="start attachmentAttachment process")
+    parser.add_argument("--delkey", dest="deleteEnterpriseKey", action="store_true",
+                        help="start attachmentAttachment process")
+    parser.add_argument("--keys", dest="keys", type=str, default=None, help="start attachmentAttachment process")
+    parser.add_argument("--rptc", dest="remove_processed_tyc_company", action="store_true",
+                        help="start attachmentAttachment process")
+    parser.add_argument("--product_dict_synchonize", dest="product_dict_synchonize", action="store_true",
+                        help="start product_dict_synchonize process")
+    parser.add_argument("--delete_product_collections", dest="delete_product_collections", action="store_true",
+                        help="start product_dict_synchonize process")
+    parser.add_argument("--search_similar", dest="search_similar", action="store_true",
+                        help="start product_dict_synchonize process")
+    parser.add_argument("--rerun", dest="rerun", type=str, default=None,
+                        help="start download_attachment_and_set_status_rerun process, with docid file path and md5 file path")
 
     args = parser.parse_args(args)
     if args.attachAttachment:
@@ -27,7 +36,7 @@ def main(args=None):
     if args.deleteEnterpriseKey:
         from BaseDataMaintenance.model.redis.enterprise import remove_enterprise_key
         if args.keys or args.filename:
-            remove_enterprise_key(args.filename,args.keys)
+            remove_enterprise_key(args.filename, args.keys)
     if args.remove_processed_tyc_company:
         from BaseDataMaintenance.maintenance.tyc_company.remove_processed import start_remove_processed_tyc_company
         start_remove_processed_tyc_company()
@@ -40,7 +49,10 @@ def main(args=None):
     if args.search_similar:
         from BaseDataMaintenance.maintenance.product.product_dict import search_similar
         search_similar()
+    if args.rerun:
+        from BaseDataMaintenance.maintenance.document.download_attachment_and_set_status_rerun import rerun_main
+        rerun_main(args.rerun)
 
 
 if __name__ == '__main__':
-    main()
+    main()
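A minimal sketch of exercising the new --rerun flag programmatically (the file path is a hypothetical placeholder); main() hands the path straight to rerun_main:

    from BaseDataMaintenance.start_main import main

    # same as: python BaseDataMaintenance/start_main.py --rerun /data/rerun_input.csv
    main(['--rerun', '/data/rerun_input.csv'])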