Browse Source

Initial commit

luojiehua 4 years ago
Current commit
5d1cc0f663
32 files changed, with 1799 additions and 0 deletions
  1. +2 -0  .idea/.gitignore
  2. +15 -0  .idea/codeStyles/Project.xml
  3. +5 -0  .idea/codeStyles/codeStyleConfig.xml
  4. +8 -0  .idea/encodings.xml
  5. +6 -0  .idea/misc.xml
  6. +8 -0  .idea/modules.xml
  7. +6 -0  .idea/vcs.xml
  8. +9 -0  BaseDataMaintenance.iml
  9. +8 -0  BaseDataMaintenance/__init__.py
  10. +846 -0  BaseDataMaintenance/common/Utils.py
  11. +0 -0  BaseDataMaintenance/common/__init__.py
  12. +109 -0  BaseDataMaintenance/common/multiThread.py
  13. +0 -0  BaseDataMaintenance/dataSource/__init__.py
  14. +28 -0  BaseDataMaintenance/dataSource/pool.py
  15. +42 -0  BaseDataMaintenance/dataSource/setttings.py
  16. +94 -0  BaseDataMaintenance/dataSource/source.py
  17. +24 -0  BaseDataMaintenance/dataSource/账号.txt
  18. +0 -0  BaseDataMaintenance/maintenance/__init__.py
  19. +107 -0  BaseDataMaintenance/maintenance/proposedBuilding/DataSynchronization.py
  20. +0 -0  BaseDataMaintenance/maintenance/proposedBuilding/__init__.py
  21. +12 -0  BaseDataMaintenance/maintenance/proposedBuilding/readme.md
  22. +1 -0  BaseDataMaintenance/model/__init__.py
  23. +92 -0  BaseDataMaintenance/model/ots/BaseModel.py
  24. +0 -0  BaseDataMaintenance/model/ots/__init__.py
  25. +51 -0  BaseDataMaintenance/model/ots/designed_project.py
  26. +21 -0  BaseDataMaintenance/model/ots/enterprise.py
  27. +196 -0  BaseDataMaintenance/model/ots/proposedBuilding_tmp.py
  28. +0 -0  BaseDataMaintenance/primarykey/__init__.py
  29. +41 -0  BaseDataMaintenance/primarykey/startSnowflake.py
  30. +9 -0  BaseDataMaintenance/start_sychro_proposedBuilding.py
  31. +54 -0  nohup.out
  32. +5 -0  test.py

+ 2 - 0
.idea/.gitignore

@@ -0,0 +1,2 @@
+# Default ignored files
+/workspace.xml

+ 15 - 0
.idea/codeStyles/Project.xml

@@ -0,0 +1,15 @@
+<component name="ProjectCodeStyleConfiguration">
+  <code_scheme name="Project" version="173">
+    <codeStyleSettings language="JAVA">
+      <indentOptions>
+        <option name="TAB_SIZE" value="2" />
+      </indentOptions>
+    </codeStyleSettings>
+    <codeStyleSettings language="Python">
+      <indentOptions>
+        <option name="TAB_SIZE" value="2" />
+        <option name="SMART_TABS" value="true" />
+      </indentOptions>
+    </codeStyleSettings>
+  </code_scheme>
+</component>

+ 5 - 0
.idea/codeStyles/codeStyleConfig.xml

@@ -0,0 +1,5 @@
+<component name="ProjectCodeStyleConfiguration">
+  <state>
+    <option name="PREFERRED_PROJECT_CODE_STYLE" value="Default" />
+  </state>
+</component>

+ 8 - 0
.idea/encodings.xml

@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="Encoding">
+    <file url="file://$PROJECT_DIR$/common/Utils.py" charset="GBK" />
+    <file url="file://$PROJECT_DIR$/common/multiThread.py" charset="GBK" />
+    <file url="file://$PROJECT_DIR$/dataSource" charset="GBK" />
+  </component>
+</project>

+ 6 - 0
.idea/misc.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.5 (dl_nlp)" project-jdk-type="Python SDK">
+    <output url="file://$PROJECT_DIR$/out" />
+  </component>
+</project>

+ 8 - 0
.idea/modules.xml

@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="ProjectModuleManager">
+    <modules>
+      <module fileurl="file://$PROJECT_DIR$/BaseDataMaintenance.iml" filepath="$PROJECT_DIR$/BaseDataMaintenance.iml" />
+    </modules>
+  </component>
+</project>

+ 6 - 0
.idea/vcs.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="VcsDirectoryMappings">
+    <mapping directory="$PROJECT_DIR$" vcs="Git" />
+  </component>
+</project>

+ 9 - 0
BaseDataMaintenance.iml

@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="PYTHON_MODULE" version="4">
+  <component name="NewModuleRootManager" inherit-compiler-output="true">
+    <exclude-output />
+    <content url="file://$MODULE_DIR$" />
+    <orderEntry type="inheritedJdk" />
+    <orderEntry type="sourceFolder" forTests="false" />
+  </component>
+</module>

+ 8 - 0
BaseDataMaintenance/__init__.py

@@ -0,0 +1,8 @@
+
+import sys
+
+import os
+
+sys.path.append(os.path.dirname(__file__)+"/..")
+
+print(sys.path)

+ 846 - 0
BaseDataMaintenance/common/Utils.py

@@ -0,0 +1,846 @@
+'''
+Created on 2018-12-20
+
+@author: User
+'''
+
+import numpy as np
+import re
+import gensim
+from keras import backend as K
+import os
+
+from threading import RLock
+
+from pai_tf_predict_proto import tf_predict_pb2
+import requests
+
+import time
+
+
+model_w2v = None
+lock_model_w2v = RLock()
+
+USE_PAI_EAS = False
+
+Lazy_load = False
+
+ILLEGAL_CHARACTERS_RE = re.compile(r'[\000-\010]|[\013-\014]|[\016-\037]')
+
+def getLegal_str(_str):
+    if _str is not None:
+        return ILLEGAL_CHARACTERS_RE.sub("",str(_str))
+
+def getRow_ots_primary(row):
+    _dict = dict()
+    if row is None:
+        return _dict
+    for part in row.attribute_columns:
+        _dict[part[0]] = part[1]
+    for part in row.primary_key:
+        _dict[part[0]] = part[1]
+    return _dict
+
+def getRow_ots(rows):
+    list_dict = []
+    for row in rows:
+        _dict = dict()
+        for part in row:
+            for v in part:
+                _dict[v[0]] = v[1]
+        list_dict.append(_dict)
+    return list_dict
+
+def getw2vfilepath():
+    w2vfile = os.path.dirname(__file__)+"/../wiki_128_word_embedding_new.vector"
+    if os.path.exists(w2vfile):
+        return w2vfile
+    return "wiki_128_word_embedding_new.vector"
+
+def getLazyLoad():
+    global Lazy_load
+    return Lazy_load
+
+def get_file_name(url, headers):
+    filename = ''
+    if 'Content-Disposition' in headers and headers['Content-Disposition']:
+        disposition_split = headers['Content-Disposition'].split(';')
+        if len(disposition_split) > 1:
+            if disposition_split[1].strip().lower().startswith('filename='):
+                file_name = disposition_split[1].split('=')
+                if len(file_name) > 1:
+                    filename = file_name[1]
+    if not filename and os.path.basename(url):
+        filename = os.path.basename(url).split("?")[0]
+    if not filename:
+        return str(time.time())
+    return filename
+
+model_word_file = os.path.dirname(__file__)+"/../singlew2v_model.vector"
+model_word = None
+lock_model_word = RLock()
+
+from decimal import Decimal
+import logging
+logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
+import pickle
+import os
+
+import json
+
+# custom JSON encoder
+class MyEncoder(json.JSONEncoder):
+
+    def default(self, obj):
+        if isinstance(obj, np.ndarray):
+            return obj.tolist()
+        elif isinstance(obj, bytes):
+            return str(obj, encoding='utf-8')
+        elif isinstance(obj, (np.float_, np.float16, np.float32,
+                              np.float64)):
+            return float(obj)
+        elif isinstance(obj,(np.int64,np.int32)):
+            return int(obj)
+        return json.JSONEncoder.default(self, obj)
+
+vocab_word = None
+vocab_words = None
+
+file_vocab_word = "vocab_word.pk"
+file_vocab_words = "vocab_words.pk"
+
+selffool_authorization = "NjlhMWFjMjVmNWYyNzI0MjY1OGQ1M2Y0ZmY4ZGY0Mzg3Yjc2MTVjYg=="
+selffool_url = "http://pai-eas-vpc.cn-beijing.aliyuncs.com/api/predict/selffool_gpu"
+selffool_seg_authorization = "OWUwM2Q0ZmE3YjYxNzU4YzFiMjliNGVkMTA3MzJkNjQ2MzJiYzBhZg=="
+selffool_seg_url = "http://pai-eas-vpc.cn-beijing.aliyuncs.com/api/predict/selffool_seg_gpu"
+codename_authorization = "Y2M5MDUxMzU1MTU4OGM3ZDk2ZmEzYjkxYmYyYzJiZmUyYTgwYTg5NA=="
+codename_url = "http://pai-eas-vpc.cn-beijing.aliyuncs.com/api/predict/codename_gpu"
+
+form_item_authorization = "ODdkZWY1YWY0NmNhNjU2OTI2NWY4YmUyM2ZlMDg1NTZjOWRkYTVjMw=="
+form_item_url = "http://pai-eas-vpc.cn-beijing.aliyuncs.com/api/predict/form"
+person_authorization = "N2I2MDU2N2Q2MGQ0ZWZlZGM3NDkyNTA1Nzc4YmM5OTlhY2MxZGU1Mw=="
+person_url = "http://pai-eas-vpc.cn-beijing.aliyuncs.com/api/predict/person"
+role_authorization = "OWM1ZDg5ZDEwYTEwYWI4OGNjYmRlMmQ1NzYwNWNlZGZkZmRmMjE4OQ=="
+role_url = "http://pai-eas-vpc.cn-beijing.aliyuncs.com/api/predict/role"
+money_authorization = "MDQyNjc2ZDczYjBhYmM4Yzc4ZGI4YjRmMjc3NGI5NTdlNzJiY2IwZA=="
+money_url = "http://pai-eas-vpc.cn-beijing.aliyuncs.com/api/predict/money"
+codeclasses_authorization = "MmUyNWIxZjQ2NjAzMWJlMGIzYzkxMjMzNWY5OWI3NzJlMWQ1ZjY4Yw=="
+codeclasses_url = "http://pai-eas-vpc.cn-beijing.aliyuncs.com/api/predict/codeclasses"
+
+def viterbi_decode(score, transition_params):
+    """Decode the highest scoring sequence of tags outside of TensorFlow.
+
+    This should only be used at test time.
+
+    Args:
+      score: A [seq_len, num_tags] matrix of unary potentials.
+      transition_params: A [num_tags, num_tags] matrix of binary potentials.
+
+    Returns:
+      viterbi: A [seq_len] list of integers containing the highest scoring tag
+          indices.
+      viterbi_score: A float containing the score for the Viterbi sequence.
+    """
+    trellis = np.zeros_like(score)
+    backpointers = np.zeros_like(score, dtype=np.int32)
+    trellis[0] = score[0]
+
+    for t in range(1, score.shape[0]):
+        v = np.expand_dims(trellis[t - 1], 1) + transition_params
+        trellis[t] = score[t] + np.max(v, 0)
+        backpointers[t] = np.argmax(v, 0)
+
+    viterbi = [np.argmax(trellis[-1])]
+    for bp in reversed(backpointers[1:]):
+        viterbi.append(bp[viterbi[-1]])
+    viterbi.reverse()
+
+    viterbi_score = np.max(trellis[-1])
+    return viterbi, viterbi_score
+
+import ctypes
+import inspect
+
+def _async_raise(tid, exctype):
+    """raises the exception, performs cleanup if needed"""
+    tid = ctypes.c_long(tid)
+    if not inspect.isclass(exctype):
+        exctype = type(exctype)
+    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
+    if res == 0:
+        raise ValueError("invalid thread id")
+    elif res != 1:
+        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
+        raise SystemError("PyThreadState_SetAsyncExc failed")
+
+def stop_thread(thread):
+    _async_raise(thread.ident, SystemExit)
+
+def limitRun(sess,list_output,feed_dict,MAX_BATCH=1024):
+    len_sample = 0
+    if len(feed_dict.keys())>0:
+        len_sample = len(feed_dict[list(feed_dict.keys())[0]])
+    if len_sample>MAX_BATCH:
+        list_result = [[] for _ in range(len(list_output))]
+        _begin = 0
+        while(_begin<len_sample):
+            new_dict = dict()
+            for _key in feed_dict.keys():
+                new_dict[_key] = feed_dict[_key][_begin:_begin+MAX_BATCH]
+            _output = sess.run(list_output,feed_dict=new_dict)
+            for _index in range(len(list_output)):
+                list_result[_index].extend(_output[_index])
+            _begin += MAX_BATCH
+    else:
+        list_result = sess.run(list_output,feed_dict=feed_dict)
+    return list_result
+
+
+
+def get_values(response,output_name):
+        """
+        Get the value of a specified output tensor
+        :param output_name: name of the output tensor
+        :return: the content of the output tensor
+        """
+        output = response.outputs[output_name]
+        if output.dtype == tf_predict_pb2.DT_FLOAT:
+            _value = output.float_val
+        elif output.dtype == tf_predict_pb2.DT_INT8 or output.dtype == tf_predict_pb2.DT_INT16 or \
+                output.dtype == tf_predict_pb2.DT_INT32:
+            _value = output.int_val
+        elif output.dtype == tf_predict_pb2.DT_INT64:
+            _value = output.int64_val
+        elif output.dtype == tf_predict_pb2.DT_DOUBLE:
+            _value = output.double_val
+        elif output.dtype == tf_predict_pb2.DT_STRING:
+            _value = output.string_val
+        elif output.dtype == tf_predict_pb2.DT_BOOL:
+            _value = output.bool_val
+        return np.array(_value).reshape(response.outputs[output_name].array_shape.dim)
+
+def vpc_requests(url,authorization,request_data,list_outputs):
+    
+    
+    headers = {"Authorization": authorization}
+    dict_outputs = dict()
+    
+    response = tf_predict_pb2.PredictResponse()
+    resp = requests.post(url, data=request_data, headers=headers)
+    
+    
+    if resp.status_code != 200:
+        print(resp.status_code,resp.content)
+        log("pai-eas request failed, authorization:"+str(authorization))
+        return None
+    else:
+        response = tf_predict_pb2.PredictResponse()
+        response.ParseFromString(resp.content)
+        for _output in list_outputs:
+            dict_outputs[_output] = get_values(response, _output)
+        return dict_outputs
+
+def encodeInput(data,word_len,word_flag=True,userFool=False):
+    result = []
+    out_index = 0
+    for item in data:
+        if out_index in [0]:
+            list_word = item[-word_len:]
+        else:
+            list_word = item[:word_len]
+        temp = []
+        if word_flag:
+            for word in list_word:
+                if userFool:
+                    temp.append(getIndexOfWord_fool(word))
+                else:
+                    temp.append(getIndexOfWord(word))
+            list_append = []
+            temp_len = len(temp)
+            while(temp_len<word_len):
+                if userFool:
+                    list_append.append(0)
+                else:
+                    list_append.append(getIndexOfWord("<pad>"))
+                temp_len += 1
+            if out_index in [0]:
+                temp = list_append+temp
+            else:
+                temp = temp+list_append
+        else:
+            for words in list_word:
+                temp.append(getIndexOfWords(words))
+                
+            list_append = []
+            temp_len = len(temp)
+            while(temp_len<word_len):
+                list_append.append(getIndexOfWords("<pad>"))
+                temp_len += 1
+            if out_index in [0,1]:
+                temp = list_append+temp
+            else:
+                temp = temp+list_append
+        result.append(temp)
+        out_index += 1
+    return result
+
+def encodeInput_form(input,MAX_LEN=30):
+    x = np.zeros([MAX_LEN])
+    for i in range(len(input)):
+        if i>=MAX_LEN:
+            break
+        x[i] = getIndexOfWord(input[i])
+    return x
+    
+
+def getVocabAndMatrix(model,Embedding_size = 60):
+    '''
+    @summary: get the vocabulary and the embedding matrix of a vector model
+    '''
+    vocab = ["<pad>"]+model.index2word
+    
+    embedding_matrix = np.zeros((len(vocab),Embedding_size))
+    for i in range(1,len(vocab)):
+        embedding_matrix[i] = model[vocab[i]]
+    
+    return vocab,embedding_matrix
+
+def getIndexOfWord(word):
+    global vocab_word,file_vocab_word
+    if vocab_word is None:
+        if os.path.exists(file_vocab_word):
+            vocab = load(file_vocab_word)
+            vocab_word = dict((w, i) for i, w in enumerate(np.array(vocab)))
+        else:
+            model = getModel_word()
+            vocab,_ = getVocabAndMatrix(model, Embedding_size=60)
+            vocab_word = dict((w, i) for i, w in enumerate(np.array(vocab)))
+            save(vocab,file_vocab_word)
+    if word in vocab_word.keys():
+        return vocab_word[word]
+    else:
+        return vocab_word['<pad>']
+        
+def getIndexOfWords(words):
+    global vocab_words,file_vocab_words
+    if vocab_words is None:
+        if os.path.exists(file_vocab_words):
+            vocab = load(file_vocab_words)
+            vocab_words = dict((w, i) for i, w in enumerate(np.array(vocab)))
+        else:
+            model = getModel_w2v()
+            vocab,_ = getVocabAndMatrix(model, Embedding_size=128)
+            vocab_words = dict((w, i) for i, w in enumerate(np.array(vocab)))
+            save(vocab,file_vocab_words)
+    if words in vocab_words.keys():
+        return vocab_words[words]
+    else:
+        return vocab_words["<pad>"]
+
+def isCellphone(phone):
+    if re.search("^1\d{10}$",phone) is not None:
+        return True
+    return False
+
+def popNoneFromDict(_dict):
+    list_pop = []
+    for k,v in _dict.items():
+        if v is None or v=="":
+            list_pop.append(k)
+    for k in list_pop:
+        _dict.pop(k)
+    return _dict
+
+def getCurrent_date(format="%Y-%m-%d %H:%M:%S"):
+    _time = time.strftime(format,time.localtime())
+    return _time
+
+def log_tofile(filename):
+    logging.basicConfig(filename=filename,level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    logger = logging.getLogger(__name__)
+
+def log(msg):
+    '''
+    @summary: log a message
+    '''
+    logger.info(msg)
+
+def debug(msg):
+    '''
+    @summary: log a debug message
+    '''
+    logger.debug(msg)
+
+
+def save(object_to_save, path):
+    '''
+    save an object with pickle
+    @Args:
+        object_to_save: the object to save
+
+    @Return:
+        None (the object is written to path)
+    '''
+    with open(path, 'wb') as f:
+        pickle.dump(object_to_save, f)
+
+def load(path):
+    '''
+    load a pickled object
+    @Args:
+        path: the path to read from
+
+    @Return:
+        the loaded object
+    '''
+    with open(path, 'rb') as f:
+        object1 = pickle.load(f)
+        return object1
+    
+
+def getIndexOfWord_fool(word):
+    
+    if word in fool_char_to_id.keys():
+        return fool_char_to_id[word]
+    else:
+        return fool_char_to_id["[UNK]"]
+
+
+def find_index(list_tofind,text):
+    '''
+    @summary: find the first occurrence position of each token in a string
+    @param:
+        list_tofind: tokens to look for
+        text: the string to search
+    @return: list, the first index of each token (-1 if absent)
+    
+    '''
+    result = []
+    for item in list_tofind:
+        index = text.find(item)
+        if index>=0:
+            result.append(index)
+        else:
+            result.append(-1)
+    return result
+
+
+def combine(list1,list2):
+    '''
+    @summary: concatenate every string in list1 with every string in list2
+    @param:
+        list1: list of strings
+        list2: list of strings
+    @return: list of concatenated results
+    '''
+    result = []
+    for item1 in list1:
+        for item2 in list2:
+            result.append(str(item1)+str(item2))
+    return result
+
+
+def getDigitsDic(unit):
+    '''
+    @summary: map a Chinese digit character to its integer value
+    '''
+    DigitsDic = {"零":0, "壹":1, "贰":2, "叁":3, "肆":4, "伍":5, "陆":6, "柒":7, "捌":8, "玖":9,
+                 "〇":0, "一":1, "二":2, "三":3, "四":4, "五":5, "六":6, "七":7, "八":8, "九":9}
+    return DigitsDic.get(unit)
+
+def getMultipleFactor(unit):
+    '''
+    @summary: map a Chinese unit character to its multiplier
+    '''
+    MultipleFactor = {"兆":Decimal(1000000000000),"亿":Decimal(100000000),"万":Decimal(10000),"仟":Decimal(1000),"千":Decimal(1000),"佰":Decimal(100),"百":Decimal(100),"拾":Decimal(10),"十":Decimal(10),"元":Decimal(1),"角":round(Decimal(0.1),1),"分":round(Decimal(0.01),2)}
+    return MultipleFactor.get(unit)
+
+def getUnifyMoney(money):
+    '''
+    @summary: convert a money string in Chinese numerals to a numeric amount
+    @param:
+        money: money string in Chinese numerals
+    @return: Decimal, the numeric amount
+    '''
+
+
+    MAX_NUM = 12
+    # strip commas
+    money = re.sub("[,,]","",money)
+    money = re.sub("[^0-9.零壹贰叁肆伍陆柒捌玖拾佰仟萬億〇一二三四五六七八九十百千万亿元角分]","",money)
+    result = Decimal(0)
+    chnDigits = ["零", "壹", "贰", "叁", "肆", "伍", "陆", "柒", "捌", "玖"]
+    chnFactorUnits = ["兆", "亿", "万", "仟", "佰", "拾","元","角","分"]
+
+    LowMoneypattern = re.compile("^[\d,]+(\.\d+)?$")
+    BigMoneypattern = re.compile("^零?(?P<BigMoney>[%s])$"%("".join(chnDigits)))
+    if re.search(LowMoneypattern,money) is not None:
+        return Decimal(money)
+    elif re.search(BigMoneypattern,money) is not None:
+        return getDigitsDic(re.search(BigMoneypattern,money).group("BigMoney"))
+    for factorUnit in chnFactorUnits:
+        if re.search(re.compile(".*%s.*"%(factorUnit)),money) is not None:
+            subMoneys = re.split(re.compile("%s(?!.*%s.*)"%(factorUnit,factorUnit)),money)
+            if re.search(re.compile("^(\d+(,)?)+(\.\d+)?$"),subMoneys[0]) is not None:
+                result += Decimal(subMoneys[0])*(getMultipleFactor(factorUnit))
+            elif len(subMoneys[0])==1:
+                if re.search(re.compile("^[%s]$"%("".join(chnDigits))),subMoneys[0]) is not None:
+                    result += Decimal(getDigitsDic(subMoneys[0]))*(getMultipleFactor(factorUnit))
+            else:
+                result += Decimal(getUnifyMoney(subMoneys[0]))*(getMultipleFactor(factorUnit))
+
+            if len(subMoneys)>1:
+                if re.search(re.compile("^(\d+(,)?)+(\.\d+)?[百千万亿]?\s?(元)?$"),subMoneys[1]) is not None:
+                    result += Decimal(subMoneys[1])
+                elif len(subMoneys[1])==1:
+                    if re.search(re.compile("^[%s]$"%("".join(chnDigits))),subMoneys[1]) is not None:
+                        result += Decimal(getDigitsDic(subMoneys[1]))
+                else:
+                    result += Decimal(getUnifyMoney(subMoneys[1]))
+            break
+    return result
+
+
+
+
+def getModel_w2v():
+    '''
+    @summary: load the word-level word2vec model
+    '''
+    global model_w2v,lock_model_w2v
+    with lock_model_w2v:
+        if model_w2v is None:
+            model_w2v = gensim.models.KeyedVectors.load_word2vec_format(getw2vfilepath(),binary=True)
+        return model_w2v
+
+def getModel_word():
+    '''
+    @summary: load the character-level word2vec model
+    '''
+
+    global model_word,lock_model_word
+    with lock_model_word:
+        if model_word is None:
+            model_word = gensim.models.KeyedVectors.load_word2vec_format(model_word_file,binary=True)
+        return model_word
+
+# getModel_w2v()
+# getModel_word()
+
+def findAllIndex(substr,wholestr):
+    '''
+    @summary: find every begin_index of a substring in a string
+    @param:
+        substr: the substring
+        wholestr: the full string to search
+    @return: list, every begin_index of the substring
+    '''
+    copystr = wholestr
+    result = []
+    indexappend = 0
+    while(True):
+        index = copystr.find(substr)
+        if index<0:
+            break
+        else:
+            result.append(indexappend+index)
+            indexappend += index+len(substr)
+            copystr = copystr[index+len(substr):]
+    return result
+    
+  
+def spanWindow(tokens,begin_index,end_index,size,center_include=False,word_flag = False,use_text = False,text = None):
+    '''
+    @summary: get the context tokens around an entity
+    @param:
+        tokens: tokenized sentence as a list
+        begin_index: start index of the entity
+        end_index: end index of the entity
+        size: number of tokens to take on each side
+        center_include: whether to include the entity itself
+        word_flag: word/char granularity, word by default
+    @return: list, the context tokens of the entity
+    '''  
+    if use_text:
+        assert text is not None
+    length_tokens = len(tokens)
+    if begin_index>size:
+        begin = begin_index-size
+    else:
+        begin = 0
+    if end_index+size<length_tokens:
+        end = end_index+size+1
+    else:
+        end = length_tokens
+    result = []
+    if not word_flag:
+        result.append(tokens[begin:begin_index])
+        if center_include:
+            if use_text:
+                result.append(text)
+            else:
+                result.append(tokens[begin_index:end_index+1])
+        result.append(tokens[end_index+1:end])
+    else:
+        result.append("".join(tokens[begin:begin_index]))
+        if center_include:
+            if use_text:
+                result.append(text)
+            else:
+                result.append("".join(tokens[begin_index:end_index+1]))
+        result.append("".join(tokens[end_index+1:end]))
+    #print(result)
+    return result
+
+# complete unbalanced brackets around a code or name by rule
+def fitDataByRule(data):
+    symbol_dict = {"(":")",
+                   "(":")",
+                   "[":"]",
+                   "【":"】",
+                   ")":"(",
+                   ")":"(",
+                   "]":"[",
+                   "】":"【"}
+    leftSymbol_pattern = re.compile("[\((\[【]")
+    rightSymbol_pattern = re.compile("[\))\]】]")
+    leftfinds = re.findall(leftSymbol_pattern,data)
+    rightfinds = re.findall(rightSymbol_pattern,data)
+    result = data
+    if len(leftfinds)+len(rightfinds)==0:
+        return data
+    elif len(leftfinds)==len(rightfinds):
+        return data
+    elif abs(len(leftfinds)-len(rightfinds))==1:
+        if len(leftfinds)>len(rightfinds):
+            if symbol_dict.get(data[0]) is not None:
+                result = data[1:]
+            else:
+                #print(symbol_dict.get(leftfinds[0]))
+                result = data+symbol_dict.get(leftfinds[0])
+        else:
+            if symbol_dict.get(data[-1]) is not None:
+                result = data[:-1]
+            else:
+                result = symbol_dict.get(rightfinds[0])+data
+    result = re.sub("[。]","",result)
+    return  result
+
+def embedding(datas,shape):
+    '''
+    @summary: look up the word vectors for tokens
+    @param:
+        datas: list of token lists
+        shape: shape of the result
+    @return: array, word embeddings of the given shape
+    '''
+    model_w2v = getModel_w2v()
+    embed = np.zeros(shape)
+    length = shape[1]
+    out_index = 0
+    #print(datas)
+    for data in datas:
+        index = 0
+        for item in data:
+            item_not_space = re.sub("\s*","",item)
+            if index>=length:
+                break
+            if item_not_space in model_w2v.vocab:
+                embed[out_index][index] = model_w2v[item_not_space]
+                index += 1
+            else:
+                #embed[out_index][index] = model_w2v['unk']
+                index += 1
+        out_index += 1
+    return embed
+
+def embedding_word(datas,shape):
+    '''
+    @summary: look up the character vectors for tokens
+    @param:
+        datas: list of strings
+        shape: shape of the result
+    @return: array, character embeddings of the given shape
+    '''
+    model_w2v = getModel_word()
+    embed = np.zeros(shape)
+    length = shape[1]
+    out_index = 0
+    #print(datas)
+    for data in datas:
+        index = 0
+        for item in str(data)[-shape[1]:]:
+            if index>=length:
+                break
+            if item in model_w2v.vocab:
+                embed[out_index][index] = model_w2v[item]
+                index += 1
+            else:
+                # embed[out_index][index] = model_w2v['unk']
+                index += 1
+        out_index += 1
+    return embed
+
+def formEncoding(text,shape=(100,60),expand=False):
+    embedding = np.zeros(shape)
+    word_model = getModel_word()
+    for i in range(len(text)):
+        if i>=shape[0]:
+            break
+        if text[i] in word_model.vocab:
+            embedding[i] = word_model[text[i]]
+    if expand:
+        embedding = np.expand_dims(embedding,0)
+    return embedding
+
+def partMoney(entity_text,input2_shape = [7]):
+    '''
+    @summary: bucket a money amount into ranges
+    @param:
+        entity_text: numeric amount
+        input2_shape: number of buckets
+    @return: array, one-hot encoding of the bucket
+    '''
+    money = float(entity_text)
+    parts = np.zeros(input2_shape)
+    if money<100:
+        parts[0] = 1
+    elif money<1000:
+        parts[1] = 1
+    elif money<10000:
+        parts[2] = 1
+    elif money<100000:
+        parts[3] = 1
+    elif money<1000000:
+        parts[4] = 1
+    elif money<10000000:
+        parts[5] = 1
+    else:
+        parts[6] = 1
+    return parts
+
+def recall(y_true, y_pred):
+    '''
+    compute recall
+    @Args:
+        y_true: ground-truth labels
+        y_pred: predicted labels
+
+    @Return
+        recall
+    '''
+    c1 = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
+    c3 = K.sum(K.round(K.clip(y_true, 0, 1)))
+    if c3 == 0:
+        return 0
+    recall = c1 / c3
+    return recall
+
+
+def f1_score(y_true, y_pred):
+    '''
+    compute the F1 score
+
+    @Args:
+        y_true: ground-truth labels
+        y_pred: predicted labels
+
+    @Return
+        F1 score
+    '''
+
+    c1 = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
+    c2 = K.sum(K.round(K.clip(y_pred, 0, 1)))
+    c3 = K.sum(K.round(K.clip(y_true, 0, 1)))
+    precision = c1 / c2
+    if c3 == 0:
+        recall = 0
+    else:
+        recall = c1 / c3
+    f1_score = 2 * (precision * recall) / (precision + recall)
+    return f1_score
+
+
+def precision(y_true, y_pred):
+    '''
+    compute precision
+
+    @Args:
+        y_true: ground-truth labels
+        y_pred: predicted labels
+
+    @Return
+        precision
+    '''
+    c1 = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
+    c2 = K.sum(K.round(K.clip(y_pred, 0, 1)))
+    precision = c1 / c2
+    return precision
+
+# def print_metrics(history):
+#     '''
+#     plot how each metric changes across training epochs
+#
+#     @Args:
+#         history: the training history of the model
+#     '''
+#     import matplotlib.pyplot as plt
+#
+#     # loss plot
+#     loss = history.history['loss']
+#     val_loss = history.history['val_loss']
+#     epochs = range(1, len(loss) + 1)
+#     plt.subplot(2, 2, 1)
+#     plt.plot(epochs, loss, 'bo', label='Training loss')
+#     plt.plot(epochs, val_loss, 'b', label='Validation loss')
+#     plt.title('Training and validation loss')
+#     plt.xlabel('Epochs')
+#     plt.ylabel('Loss')
+#     plt.legend()
+#
+#     # f1 plot
+#     f1 = history.history['f1_score']
+#     val_f1 = history.history['val_f1_score']
+#     plt.subplot(2, 2, 2)
+#     plt.plot(epochs, f1, 'bo', label='Training f1')
+#     plt.plot(epochs, val_f1, 'b', label='Validation f1')
+#     plt.title('Training and validation f1')
+#     plt.xlabel('Epochs')
+#     plt.ylabel('F1')
+#     plt.legend()
+#
+#     # precision plot
+#     prec = history.history['precision']
+#     val_prec = history.history['val_precision']
+#     plt.subplot(2, 2, 3)
+#     plt.plot(epochs, prec, 'bo', label='Training precision')
+#     plt.plot(epochs, val_prec, 'b', label='Validation precision')
+#     plt.title('Training and validation precision')
+#     plt.xlabel('Epochs')
+#     plt.ylabel('Precision')
+#     plt.legend()
+#
+#     # recall plot
+#     recall = history.history['recall']
+#     val_recall = history.history['val_recall']
+#     plt.subplot(2, 2, 4)
+#     plt.plot(epochs, recall, 'bo', label='Training recall')
+#     plt.plot(epochs, val_recall, 'b', label='Validation recall')
+#     plt.title('Training and validation recall')
+#     plt.xlabel('Epochs')
+#     plt.ylabel('Recall')
+#     plt.legend()
+#
+#     plt.show()
+
+
+if __name__=="__main__":
+    print(fool_char_to_id[">"])
+    # model = getModel_w2v()
+    # vocab,matrix = getVocabAndMatrix(model, Embedding_size=128)
+    # save([vocab,matrix],"vocabMatrix_words.pk")
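
Some of the helpers above can be exercised directly; a minimal sketch, assuming the package root is on PYTHONPATH and the module's import-time dependencies (keras, gensim, pai_tf_predict_proto) are installed:

    from BaseDataMaintenance.common.Utils import getUnifyMoney, isCellphone, fitDataByRule

    print(getUnifyMoney("壹万贰仟元"))  # Decimal 12000
    print(isCellphone("13800138000"))   # True: 11 digits starting with 1
    print(fitDataByRule("(测试"))       # "测试": the unbalanced bracket is stripped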

+ 0 - 0
BaseDataMaintenance/common/__init__.py


+ 109 - 0
BaseDataMaintenance/common/multiThread.py

@@ -0,0 +1,109 @@
+
+import threading
+import queue
+import time
+import traceback
+
+import ctypes
+import inspect
+import sys
+
+def _async_raise(tid, exctype):
+    """raises the exception, performs cleanup if needed"""
+    tid = ctypes.c_long(tid)
+    if not inspect.isclass(exctype):
+        exctype = type(exctype)
+    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
+    if res == 0:
+        raise ValueError("invalid thread id")
+    elif res != 1:
+        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
+        raise SystemError("PyThreadState_SetAsyncExc failed")
+
+def stop_thread(thread):
+    _async_raise(thread.ident, SystemExit)
+
+class _taskHandler(threading.Thread):
+
+    def __init__(self,task_queue,task_handler,result_queue,*args,**kwargs):
+        threading.Thread.__init__(self)
+        self.task_queue = task_queue
+        self.task_handler = task_handler
+        self.result_queue = result_queue
+        self.args = args
+        self.kwargs = kwargs
+
+    def run(self):
+        while(True):
+            try:
+                # print("task queue size is %d"%(self.task_queue.qsize()))
+                item = self.task_queue.get(True,timeout=10)
+                self.task_handler(item,self.result_queue,*self.args,**self.kwargs)
+                # self.task_queue.task_done()
+            except queue.Empty as e:
+                # print("%s thread is done"%(self.name))
+                break
+            except Exception as e:
+                print("error: %s"%(e))
+                print(traceback.format_exc())
+
+class MultiThreadHandler(object):
+
+    def __init__(self,task_queue,task_handler,result_queue,thread_count=1,*args,**kwargs):
+        self.task_queue = task_queue
+        self.task_handler = task_handler
+        self.result_queue = result_queue
+        self.list_thread = []
+        self.thread_count = thread_count
+        self.args = args
+        self.kwargs = kwargs
+
+    def run(self):
+        for i in range(self.thread_count):
+            th = _taskHandler(self.task_queue,self.task_handler,self.result_queue,*self.args,**self.kwargs)
+            th.setDaemon(True)
+            self.list_thread.append(th)
+
+        for th in self.list_thread:
+            th.start()
+
+        while(not self._check_all_done()):
+            try:
+                time.sleep(1)
+                # could be stopped manually, e.g.:
+                # _quit = False
+                # line = sys.stdin.readline()
+                # if line.strip()=="quit":
+                #     _quit = True
+                # if _quit:
+                #     break
+            except KeyboardInterrupt:
+                print("interrupted by keyboard")
+                self.stop_all()
+                break
+        print("the whole task is done")
+
+
+
+    def _check_all_done(self):
+        bool_done = True
+        for th in self.list_thread:
+            if th.isAlive():
+                bool_done = False
+        return bool_done
+
+    def stop_all(self):
+        for th in self.list_thread:
+            if th.isAlive():
+                stop_thread(th)
+
+def test_handler(item,result_queue):
+    print(item)
+
+if __name__=="__main__":
+    task_queue = queue.Queue()
+    result_queue = queue.Queue()
+    for i in range(100):
+        task_queue.put(i)
+    a = MultiThreadHandler(task_queue=task_queue,task_handler=test_handler,result_queue=result_queue,thread_count=3)
+    a.run()
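
The __main__ demo above discards its results; below is a sketch of a handler that actually feeds result_queue (handlers take (item, result_queue, *args, **kwargs); note that run() only returns after the worker threads time out on the empty task queue, roughly 10 seconds):

    import queue
    from BaseDataMaintenance.common.multiThread import MultiThreadHandler

    def square_handler(item, result_queue):
        # each worker thread calls this once per task item
        result_queue.put(item * item)

    task_queue = queue.Queue()
    result_queue = queue.Queue()
    for i in range(10):
        task_queue.put(i)

    MultiThreadHandler(task_queue, square_handler, result_queue, thread_count=2).run()

    while not result_queue.empty():
        print(result_queue.get())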

+ 0 - 0
BaseDataMaintenance/dataSource/__init__.py


+ 28 - 0
BaseDataMaintenance/dataSource/pool.py

@@ -0,0 +1,28 @@
+
+from multiprocessing import RLock
+import queue
+
+
+class ConnectorPool():
+
+    def __init__(self,init_num,max_num,method_init,**kwargs):
+        self.connector_pool = queue.Queue()
+        for i in range(init_num):
+            self.connector_pool.put(method_init(**kwargs))
+        self.method_init = method_init
+        self.kwargs = kwargs
+        self._lock = RLock()
+        self.pool_size = init_num
+        self.max_num = max_num
+
+    def getConnector(self):
+        with self._lock:
+            if self.connector_pool.empty():
+                if self.pool_size<self.max_num:
+                    self.connector_pool.put(self.method_init(**self.kwargs))
+                    self.pool_size += 1
+            _conn = self.connector_pool.get(block=True)
+            return _conn
+
+    def putConnector(self,_conn):
+        with self._lock:
+            self.connector_pool.put(_conn)
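
A minimal usage sketch of the pool (getConnect_ots is the factory defined in source.py below; any callable compatible with the given kwargs works as method_init):

    from BaseDataMaintenance.dataSource.pool import ConnectorPool
    from BaseDataMaintenance.dataSource.source import getConnect_ots

    pool = ConnectorPool(init_num=2, max_num=10, method_init=getConnect_ots)
    conn = pool.getConnector()   # blocks if the pool is empty and already at max_num
    try:
        pass                     # use the OTS client here
    finally:
        pool.putConnector(conn)  # always return the connector to the pool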

+ 42 - 0
BaseDataMaintenance/dataSource/setttings.py

@@ -0,0 +1,42 @@
+# encoding: utf-8
+solr_collections = {"document":"http://47.97.221.63:8983/solr/",# document
+                    "company":"http://47.97.210.202:8983/solr/",# company
+                    "contact":"http://47.97.210.202:8983/solr/",# contact
+                    "designed_project":"http://47.97.210.202:8983/solr/",
+                    "exclusive_project":"http://47.97.210.202:8983/solr/",
+                    "keyword_dict":"http://47.97.210.202:8983/solr/",
+                    "shen_pi_xiang_mu":"http://47.97.210.202:8983/solr/",# approval project
+                    "t_company_qualification":"http://47.97.210.202:8983/solr/",
+                    "t_registrant":"http://47.97.210.202:8983/solr/"}
+
+mysql_host = "rm-bp1quo50k0q2ok73gi.mysql.rds.aliyuncs.com"
+mysql_port = 3306
+mysql_user = "bxkc_read"
+mysql_pass = "bxkc_20RE18AD"
+mysql_db = "bxkc"
+
+test_mysql_host = "192.168.2.170"
+test_mysql_port = 3306
+test_mysql_user = "root"
+test_mysql_pass = "pwdformysql0922"
+test_mysql_db = "exportDB"
+
+mongo_host = "47.98.60.3"
+mongo_port = 17017
+mongo_db = "bxkc"
+mongo_user = "bxkc_read"
+mongo_pass = "BidiReadOnly2017"
+
+elasticSearch_url = "http://47.97.210.202:9200/_search"
+
+# neo4j_host = "47.98.60.3"
+neo4j_host = "118.31.10.60"
+neo4j_port = 7687
+neo4j_user = "bxkc_web"
+neo4j_pass = "bxkc_web"
+
+oracle_host = "121.46.18.113"
+oracle_port = 10522
+oracle_user = "bxkc_data_readonly"
+oracle_pass = "P7WUrgcz0@#j8pjg"
+oracle_db = "yanphone"

+ 94 - 0
BaseDataMaintenance/dataSource/source.py

@@ -0,0 +1,94 @@
+#encoding:UTF8
+
+from BaseDataMaintenance.dataSource.setttings import *
+import requests
+import json
+import pymysql
+import pymongo
+from py2neo import Graph,NodeMatcher
+import tablestore
+
+
+
+def solrQuery(collection,args):
+    if collection in solr_collections:
+        _arg = ""
+        for k,v in args.items():
+            _arg += "&%s=%s"%(k,v)
+        _arg = _arg[1:]
+        url = "%s%s/select?%s"%(solr_collections[collection],collection,_arg)
+        resp = requests.get(url)
+        if resp.status_code==200:
+            return json.loads(resp.content.decode())
+        return None
+
+def solrQuery_url(url):
+    resp = requests.get(url)
+    if resp.status_code==200:
+        return json.loads(resp.content.decode())
+    return None
+
+def getConnection_mysql(db=None):
+    if db is None:
+        db = mysql_db
+    connect = pymysql.Connect(host=mysql_host, port=mysql_port, db=db, user=mysql_user, passwd=mysql_pass)
+    return connect
+
+def getConnection_testmysql(db=None):
+    if db is None:
+        db = test_mysql_db
+    connect = pymysql.Connect(host=test_mysql_host, port=test_mysql_port, db=db, user=test_mysql_user, passwd=test_mysql_pass)
+    return connect
+
+def getConnection_oracle():
+    import cx_Oracle
+    connect = cx_Oracle.connect(oracle_user,oracle_pass,'%s:%s/%s'%(oracle_host,oracle_port,oracle_db), encoding = "UTF-8", nencoding = "UTF-8")
+    # connect = cx_Oracle.connect('%s/%s@%s:%s/%s'%(oracle_user,oracle_pass,oracle_host,oracle_port,oracle_db))
+    return connect
+
+def getConnect_mongodb():
+    client = pymongo.MongoClient(mongo_host,mongo_port)
+    db = client[mongo_db]
+    db.authenticate(mongo_user,mongo_pass)
+    return db
+
+def make_elasticSearch(query):
+    resp = requests.post(elasticSearch_url,json=query)
+    if resp.status_code==200:
+        return json.loads(resp.content.decode())
+    return None
+
+def getConnect_neo4j():
+    graph = Graph(host=neo4j_host,auth=(neo4j_user,neo4j_pass))
+    return graph
+    # finded = graph.run("MATCH (n:Organization)-[R:ZhaoBiaoRelation]->(p:Project) where n.name='昆山市周市基础建设开发有限公司的昆山市恒迪服装辅料公司'  RETURN p LIMIT 25")
+    # print(json.loads(json.dumps(finded.data())))
+    # print(finded)
+
+def getConnect_ots():
+    ots_client = tablestore.client.OTSClient('https://bxkc-ots.cn-hangzhou.ots.aliyuncs.com', 'LTAI4G2bwraGDYQ4S5hhCxht', 'k6Llfa0S1KuvYyU2cWchExdQjPGJOY',
+                                             'bxkc-ots', logger_name = 'table_store.log',
+                                             retry_policy = tablestore.WriteRetryPolicy())
+    return ots_client
+
+def getConnect_gdb():
+    from gremlin_python.driver import client
+    client = client.Client('ws://gds-bp130d7rgd9m7n61150070pub.graphdb.rds.aliyuncs.com:3734/gremlin', 'g', username="bxkc", password="k0n1bxkc!0K^Em%j")
+    callback = client.submitAsync("g.V('北京赛克德利科贸有限公司').outE('ZhongBiaoRelation').inV().inE('ZhaoBiaoRelation').outV()")
+    for result in callback.result():
+        for item in result:
+            print(item.id)
+    return client
+
+
+if __name__=="__main__":
+    # solrQuery("document",{"q":"*:*"})
+    # getConnect_mongodb()
+    # data = solrQuery_url('http://47.97.221.63:8983/solr/document/select?fq=(publishtime:[2020-01-01T00:00:00Z%20TO%202020-08-12T23:59:59Z])&q=dochtmlcon:"防盗门"')
+    # data = solrQuery("document",{"q":'dochtmlcon:"防盗门"',"fq":'(publishtime:[2020-01-01T00:00:00Z%20TO%202020-08-12T23:59:59Z])',"fl":"city","rows":1})
+    # data = make_elasticSearch({"query":{"bool":{"must":[{"wildcard":{"nicknames.keyword":"*服装*"}}],"must_not":[],"should":[]}},"from":0,"size":10,"sort":[],"aggs":{}})
+    # print(data)
+    # getConnect_neo4j()
+    conn = getConnection_oracle()
+    # cursor = conn.cursor()
+    # getConnect_gdb()

+ 24 - 0
BaseDataMaintenance/dataSource/账号.txt

@@ -0,0 +1,24 @@
+Search system: http://47.97.221.63:8983/solr/#/document/query
+			http://47.97.210.202:8983/solr/#/designed_project/query
+Mongodb: host:121.46.18.113	port:17017    account:bxkc_read	 password:BidiReadOnly2017  product:47.98.60.3:17017
+http://47.98.60.3:7474/browser/ Neo4j: bolt://47.98.60.3/:7687  account:bxkc_web	 password:bxkc_web
+rm-bp1quo50k0q2ok73gi.mysql.rds.aliyuncs.com bxkc_read bxkc_20RE18AD
+elasticSearch http://47.97.210.202:9100/
+
+千里马
+A15013122808 biaoxun666@
+剑鱼
+14748340652	biaoxun666
+14748252604	biaoxun666
+13418079164	biaoxun666
+14748450674	biaoxun666
+14748342971	biaoxun666
+14745297952	biaoxun666
+14748311820	biaoxun666
+14748317104	biaoxun666
+14743601752	biaoxun666
+14743440732	14743440732
+中国导航网
+a15711848144 bidi8888@
+中国招标网
+xinchen2020 bidi8888@

+ 0 - 0
BaseDataMaintenance/maintenance/__init__.py


+ 107 - 0
BaseDataMaintenance/maintenance/proposedBuilding/DataSynchronization.py

@@ -0,0 +1,107 @@
+#encoding:UTF8
+from BaseDataMaintenance.dataSource.pool import ConnectorPool
+from BaseDataMaintenance.dataSource.source import *
+from BaseDataMaintenance.common.Utils import *
+import queue
+from tablestore import *
+from multiprocessing import RLock
+from threading import Thread
+from apscheduler.schedulers.blocking import BlockingScheduler
+from BaseDataMaintenance.common.multiThread import MultiThreadHandler
+from BaseDataMaintenance.model.ots.proposedBuilding_tmp import proposedBuilding_tmp
+from BaseDataMaintenance.model.ots.designed_project import designed_project
+
+class DataSynchronization():
+
+    def __init__(self):
+        self.done_lock = RLock()
+        self.isDone = False
+        self.proposedBuilding_table = "proposedBuilding_tmp"
+        self.proposedBuilding_table_index = "proposedBuilding_tmp_index"
+        self.pool_ots = ConnectorPool(init_num=10,max_num=40,method_init=getConnect_ots)
+
+    def producer(self,task_queue):
+        '''
+        :return: produce data
+        '''
+        ots_client = getConnect_ots()
+
+        bool_query = BoolQuery(must_queries=[ExistsQuery("crtime")])
+
+        columns = ["uuid","crtime","json_list_group"]
+        rows, next_token, total_count, is_all_succeed = ots_client.search(self.proposedBuilding_table, self.proposedBuilding_table_index,
+                                                                          SearchQuery(bool_query ,sort=Sort(sorters=[FieldSort("crtime",SortOrder.ASC)]), limit=100, get_total_count=True),
+                                                                          ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+        list_data = getRow_ots(rows)
+        for _data in list_data:
+            _proposed = proposedBuilding_tmp(_data)
+            task_queue.put(_proposed,True)
+        while next_token:
+            rows, next_token, total_count, is_all_succeed = ots_client.search(self.proposedBuilding_table, self.proposedBuilding_table_index,
+                                                                              SearchQuery(bool_query ,next_token=next_token, limit=100, get_total_count=True),
+                                                                              ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+            list_data = getRow_ots(rows)
+            for _data in list_data:
+                _proposed = proposedBuilding_tmp(_data)
+                task_queue.put(_proposed,True)
+
+    def consumer(self,task_queue):
+
+        def _handle(_proposed,result_queue,pool_ots):
+            ots_client = pool_ots.getConnector()
+
+            # update the designed_project record
+            _project_dict = _proposed.toDesigned_project(ots_client)
+
+            if _project_dict is not None:
+                # write the update
+                _designed_project = designed_project(_project_dict)
+                _designed_project.update_project(ots_client)
+
+            # delete the tmp row
+            _proposed.delete_row(ots_client)
+
+
+            pool_ots.putConnector(ots_client)
+
+
+        result_queue = queue.Queue()
+
+        mt = MultiThreadHandler(task_queue,_handle,result_queue,thread_count=30,pool_ots=self.pool_ots)
+        mt.run()
+
+
+    def waitTask(self,task_queue):
+        for i in range(60):
+            if task_queue.qsize()>0:
+                return True
+            else:
+                time.sleep(1)
+        return False
+
+    def maxcompute2ots(self):
+
+        task_queue = queue.Queue(maxsize=10000)
+
+        thread_producer = Thread(target=self.producer,args=([task_queue]))
+        thread_producer.start()
+
+        if self.waitTask(task_queue):
+            thread_consumer = Thread(target=self.consumer,args=([task_queue]))
+            thread_consumer.start()
+
+
+    def scheduler(self):
+        _scheduler = BlockingScheduler()
+        _scheduler.add_job(self.maxcompute2ots,"cron",minute="*/30")
+        _scheduler.start()
+
+def startSychro():
+    ds = DataSynchronization()
+    ds.scheduler()
+
+if __name__=="__main__":
+    ds = DataSynchronization()
+    ds.scheduler()
+
+

+ 0 - 0
BaseDataMaintenance/maintenance/proposedBuilding/__init__.py


+ 12 - 0
BaseDataMaintenance/maintenance/proposedBuilding/readme.md

@@ -0,0 +1,12 @@
+Proposed-construction ("拟在建") update notes:
+
+1. Maintain a full table of proposed-construction projects on MaxCompute
+
+2. Filter the incremental announcements on MaxCompute to keep those that qualify as proposed-construction projects
+
+3. Merge against the existing proposed-construction records, matching on tenderee + project code and on tenderee + project name
+
+4. Write the incremental announcements back to the proposedBuilding_tmp table
+
+5. A scheduled job periodically reads the tmp table and updates the designed_project table (see the invocation sketch after this diff)
+
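
A minimal invocation sketch for step 5 above, assuming the package root is on PYTHONPATH (this mirrors what start_sychro_proposedBuilding.py, whose diff is not shown in this section, is expected to do): startSychro builds a DataSynchronization and starts its BlockingScheduler, which fires maxcompute2ots every 30 minutes.

    from BaseDataMaintenance.maintenance.proposedBuilding.DataSynchronization import startSychro

    # Blocks the current process; the cron trigger runs maxcompute2ots at */30 minutes.
    startSychro()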

+ 1 - 0
BaseDataMaintenance/model/__init__.py

@@ -0,0 +1 @@
+

+ 92 - 0
BaseDataMaintenance/model/ots/BaseModel.py

@@ -0,0 +1,92 @@
+
+from BaseDataMaintenance.common.Utils import *
+from tablestore import *
+
+class BaseModel():
+
+    def __init__(self):
+        self.all_columns = []
+
+    def getProperties(self):
+        return self.__dict__
+
+    def setProperties(self,k,v):
+        if k in self.__dict__:
+            self.__dict__[k] = v
+
+    def getPrimary_keys(self):
+        raise NotImplementedError
+
+    def setValue(self,k,v,isColumn=False):
+        if "all_columns" not in self.__dict__:
+            self.all_columns = []
+        self.__dict__[k] = v
+        if isColumn:
+            self.all_columns.append(k)
+
+    def getAll_columns(self):
+        return list(self.__dict__.keys())
+
+    def getAttribute_keys(self):
+        return list(set(self.all_columns)-set(self.getPrimary_keys()))
+
+    def getAttribute_tuple(self):
+        _list = []
+        for _key in self.getAttribute_keys():
+            _v = self.getProperties().get(_key)
+            if _v is not None and _v!="":
+                _list.append((_key,self.getProperties().get(_key)))
+        return _list
+
+    def getPrimaryKey_tuple(self):
+        _list = []
+        for _key in self.getPrimary_keys():
+            _list.append((_key,self.getProperties().get(_key)))
+        return _list
+
+    @staticmethod
+    def search(ots_client,table_name,key_tuple,columns_to_get):
+        try:
+            # query via the get_row API; the last argument (1) means only one version of each value is returned.
+            consumed, return_row, next_token = ots_client.get_row(table_name, key_tuple, columns_to_get, None, 1)
+            if return_row is not None:
+                _dict = getRow_ots_primary(return_row)
+                return _dict
+            return None
+
+        # client-side error, usually a parameter error or a network error.
+        except OTSClientError as e:
+            log("get row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message()))
+        # server-side error, usually a parameter error or throttling.
+        except OTSServiceError as e:
+            log("get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id()))
+
+    def delete_row(self,ots_client):
+        primary_key = self.getPrimaryKey_tuple()
+        row = Row(primary_key)
+        try:
+            consumed, return_row = ots_client.delete_row(self.table_name, row, None)
+        # client-side error, usually a parameter error or a network error.
+        except OTSClientError as e:
+            log("delete row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message()))
+        # server-side error, usually a parameter error or throttling.
+        except OTSServiceError as e:
+            log("delete row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id()))
+        # log ('Delete succeed, consume %s write cu.' % consumed.write)
+
+    def update_row(self,ots_client):
+        primary_key = self.getPrimaryKey_tuple()
+        update_of_attribute_columns = {
+            'PUT' : self.getAttribute_tuple()
+        }
+        row = Row(primary_key,update_of_attribute_columns)
+        condition = Condition('IGNORE')
+        try:
+            consumed, return_row = ots_client.update_row(self.table_name, row, condition)
+        # client-side error, usually a parameter error or a network error.
+        except OTSClientError as e:
+            log("update row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message()))
+        # server-side error, usually a parameter error or throttling.
+        except OTSServiceError as e:
+            log("update row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id()))
+        # log ('update succeed, consume %s write cu.' % consumed.write)
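
A sketch of the subclass contract BaseModel expects (the table name demo_table and its columns are hypothetical; enterprise.py below follows the same pattern): register columns via setValue, name the primary-key columns in getPrimary_keys, and inherit update_row/delete_row:

    from BaseDataMaintenance.model.ots.BaseModel import BaseModel

    class demo_table(BaseModel):

        def __init__(self, _dict):
            # register every key as a column so the attribute tuple includes it
            for k, v in _dict.items():
                self.setValue(k, v, True)
            self.table_name = "demo_table"  # hypothetical table

        def getPrimary_keys(self):
            return ["uuid"]  # hypothetical primary key

    # row = demo_table({"uuid": 1, "name": "test"})
    # row.update_row(ots_client)  # upsert via the inherited helper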

+ 0 - 0
BaseDataMaintenance/model/ots/__init__.py


+ 51 - 0
BaseDataMaintenance/model/ots/designed_project.py

@@ -0,0 +1,51 @@
+#encoding:UTF8
+
+from BaseDataMaintenance.model.ots.BaseModel import BaseModel
+from tablestore import *
+from BaseDataMaintenance.common.Utils import *
+
+class designed_project(BaseModel):
+
+    def __init__(self,_dict):
+        for k,v in _dict.items():
+            self.setValue(k,v,True)
+        self.table_name = "designed_project"
+
+    def getPrimary_keys(self):
+        return ["partitionkey","id"]
+
+    def search_by_docids(self,ots_client,docids):
+        should_q = []
+        for _docid in docids.split(","):
+            should_q.append(TermQuery("docids",_docid))
+
+        bool_query = BoolQuery(should_queries=should_q)
+        columns = ["docids"]
+        rows, next_token, total_count, is_all_succeed = ots_client.search("designed_project", "designed_project_index",
+                                                                          SearchQuery(bool_query, limit=100,get_total_count=True),
+                                                                          ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+        list_dict = getRow_ots(rows)
+        return list_dict
+
+
+    def update_project(self,ots_client):
+        docids = self.__dict__.get("docids")
+
+        # check whether generated projects already exist; if so, update the first one and delete the extras
+        list_dict = self.search_by_docids(ots_client,docids)
+        if len(list_dict)>0:
+            for _dict in list_dict[1:]:
+                _designed_delete = designed_project(_dict)
+                _designed_delete.delete_row(ots_client)
+
+            _designed_update = designed_project(list_dict[0])
+            properties = _designed_update.getProperties()
+            _partitionkey = properties.get("partitionkey")
+            _id = properties.get("id")
+            self.setProperties("partitionkey",_partitionkey)
+            self.setProperties("id",_id)
+        self.update_row(ots_client)
+
+
+if __name__=="__main__":
+    pass

+ 21 - 0
BaseDataMaintenance/model/ots/enterprise.py

@@ -0,0 +1,21 @@
+
+from BaseDataMaintenance.model.ots.BaseModel import BaseModel
+
+
+class enterprise(BaseModel):
+
+    def __init__(self,_dict):
+
+        for k,v in _dict.items():
+            self.setValue(k,v,True)
+        self.table_name = "enterprise"
+
+    def getPrimary_keys(self):
+        return ["name"]
+
+
+
+
+if __name__=="__main__":
+    pass
+

+ 196 - 0
BaseDataMaintenance/model/ots/proposedBuilding_tmp.py

@@ -0,0 +1,196 @@
+
+from BaseDataMaintenance.model.ots.BaseModel import BaseModel
+from BaseDataMaintenance.common.Utils import *
+from BaseDataMaintenance.primarykey.startSnowflake import get_guid
+import json
+
+
+class proposedBuilding_tmp(BaseModel):
+
+    def __init__(self,_dict):
+
+        for k,v in _dict.items():
+            self.setValue(k,v,True)
+        self.table_name = "proposedBuilding_tmp"
+
+    def getPrimary_keys(self):
+        return ["uuid"]
+
+
+    def getFollows(self,list_group):
+        dict_stage = {}
+        for _group in list_group:
+            _stage = _group["stage"]
+            if _stage not in dict_stage:
+                dict_stage[_stage] = []
+            dict_stage[_stage].append(_group)
+        LIST_STAGES = ["施工在建","施工准备","环评阶段","设计阶段"]
+        for k,v in dict_stage.items():
+            v.sort(key=lambda x:x.get("page_time",""),reverse=True)
+        list_follows = []
+        last_stage = ""
+        last_pageTime = ""
+        for _STAGE_i in range(len(LIST_STAGES)):
+            _STAGE = LIST_STAGES[_STAGE_i]
+            for _group in dict_stage.get(_STAGE,[]):
+                current_stage = _group.get("stage","")
+                current_pageTime = _group.get("page_time","")
+                if last_stage=="":
+                    last_stage = current_stage
+                    last_pageTime = current_pageTime
+                if last_stage in LIST_STAGES[:_STAGE_i]:
+                    continue
+                if current_pageTime>last_pageTime:
+                    continue
+                last_stage = current_stage
+                last_pageTime = current_pageTime
+                list_follows.append(_group)
+        list_follows.reverse()
+        return list_follows
+
+
+    def getContacts(self,ots_client,_group):
+        _contacts = []
+        enterprise_dict = self.search(ots_client,"enterprise",[("name",_group["tenderee"])],["province","contacts","address"])
+
+        cellphone = ""
+        phone = ""
+        if isCellphone(_group["tenderee_phone"]):
+            cellphone = _group["tenderee_phone"]
+        else:
+            phone = _group["tenderee_phone"]
+
+        _dict_contact = {"address":enterprise_dict.get("address",""),"cellphone":cellphone,"phone":phone,
+                         "company_name":_group["tenderee"],"contact_name":_group["tenderee_contact"],
+                         "type":"业主单位","id":get_guid()}
+        _dict_contact = popNoneFromDict(_dict_contact)
+        _contacts.append(_dict_contact)
+        _docid = int(_group.get("docid"))
+        document_dict = self.search(ots_client,"document",[("partitionkey",_docid%500+1),("docid",_docid)],["sub_docs_json"])
+        win_tenderer = ""
+        if document_dict is not None:
+            sub_docs_json = document_dict.get("sub_docs_json",'[{}]')
+            for sub_docs in json.loads(sub_docs_json):
+                win_tenderer = sub_docs.get("win_tenderer","")
+        if win_tenderer!="":
+            enterprise_dict = self.search(ots_client,"enterprise",[("name",win_tenderer)],["province","contacts","address"])
+            if enterprise_dict is not None:
+                win_contacts = json.loads(enterprise_dict.get("contacts","[]"))
+                for _dict in win_contacts:
+                    cellphone = ""
+                    contact_name = ""
+                    phone = ""
+                    if _dict.get("contact_person","")!="" and (_dict.get("mobile_no","")!="" or _dict.get("phone_no","")!=""):
+                        contact_name = _dict.get("contact_person","")
+                        cellphone = _dict.get("mobile_no","")
+                        phone = _dict.get("phone_no","")
+                        _dict_contact = {"address":enterprise_dict.get("address",""),"cellphone":cellphone,
+                                         "phone":phone,"company_name":win_tenderer,"contact_name":contact_name,
+                                         "type":"%s单位"%(_group.get("stage","")[:2]),"id":get_guid()}
+                        _dict_contact = popNoneFromDict(_dict_contact)
+                        _contacts.append(_dict_contact)
+        return _contacts
+
+
+
+    def toDesigned_project(self,ots_client):
+        if self.json_list_group is None:
+            return None
+        list_group = json.loads(self.json_list_group)
+        for _group in list_group:
+            if "page_time" not in _group or _group["page_time"] is None:
+                _group["page_time"] = ""
+        list_group.sort(key=lambda x:x["page_time"])
+        set_docid = set()
+        set_contacts = set()
+        list_contacts = []
+        set_follows = set()
+        list_follows = []
+        _version_index = 1
+        crtime = getCurrent_date()
+        floor_space = ""
+        covered_area = ""
+        project_address = ""
+        begintime = ""
+        endtime = ""
+        project_description = ''
+        project_name = ""
+        high_project_name = ""
+        ordinary_name = ""
+        project_follow = ""
+        page_time = ""
+        progress = ""
+        des_project_type = "30"
+        status = "1"
+        _id = get_guid()
+        partitionkey = _id%200+1
+        update_status = "0"
+        update_time = getCurrent_date("%Y-%m-%d")
+        project_code = ""
+        project_type = ""
+        area = ""
+        for _group in self.getFollows(list_group):
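+            # each project-level field keeps the value from the earliest follow that provides one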
+            set_docid.add(str(_group["docid"]))
+            if floor_space=="":
+                floor_space = _group.get("proportion","")
+                covered_area = floor_space
+            if begintime=="" and _group.get("begin_time") is not None:
+                begintime = _group.get("begin_time","")
+            if endtime=="" and _group.get("endtime") is not None:
+                endtime = _group.get("endtime","")
+            if project_description=="":
+                project_description = _group.get("projectDigest","")
+            if project_code=="":
+                project_code = _group.get("project_code","")
+            if project_name=="":
+                project_name = _group.get("project_name","")
+                high_project_name = project_name
+                ordinary_name = project_name
+            if page_time =="":
+                page_time = _group.get("page_time","")
+            if project_type=="":
+                project_type = _group.get("industry","")
+            if area=="":
+                area = _group.get("city","")
+
+            list_contacts.extend(self.getContacts(ots_client,_group))
+
+            _stage = _group.get("stage","")
+            _page_time = _group.get("page_time","")
+            _follow = "%s-%s"%(_stage,_page_time)
+            if _follow not in set_follows:
+                list_follows.append({"id":get_guid(),"crtime":getCurrent_date(),"mdate":"%s 00:00:00"%_page_time,
+                                     "progress":_stage,"progress_remark":"截止%s,该项目处于%s阶段"%(_page_time,_stage),
+                                     "version":"跟进%d"%(_version_index)})
+                set_follows.add(_follow)
+                project_follow = "跟进%d"%(_version_index)
+                _version_index += 1
+                progress = _stage
+        # dedupe contacts with their own key set (set_contacts) instead of reusing set_follows
+        legal_contacts = []
+        for _c in list_contacts:
+            _line = "%s-%s-%s-%s"%(_c.get("company_name",""),_c.get("contact_name",""),_c.get("cellphone",""),_c.get("phone",""))
+            if _line not in set_contacts:
+                legal_contacts.append(_c)
+            set_contacts.add(_line)
+        project_dict = {"crtime":crtime,"floor_space":floor_space,"project_address":project_address,
+                        "begintime":begintime,"endtime":endtime,"project_description":project_description,
+                        "project_name":project_name,"ordinary_name":ordinary_name,"high_project_name":high_project_name,
+                        "project_follow":project_follow,"page_time":page_time,"progress":progress,"contacts":json.dumps(legal_contacts,ensure_ascii=False),
+                        "follows":json.dumps(list_follows,ensure_ascii=False),"partitionkey":partitionkey,"id":_id,
+                        "docids":",".join(list(set_docid)),"des_project_type":des_project_type,"status":status,"covered_area":covered_area,
+                        "update_status":update_status,"update_time":update_time,"project_code":project_code,
+                        "project_type":project_type,"area":area}
+        return project_dict
+
+
+if __name__=="__main__":
+    a = proposedBuilding_tmp("1","2","3")
+    print(dir(a))
+    print(a.getAttribute_keys())
+

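Note: toDesigned_project above merges a group of follow records into one designed_project row, deduplicating follows by a "stage-page_time" key and contacts by a "company-contact-cellphone-phone" key. A minimal standalone sketch of that dedupe-by-composite-key pattern (the records here are made up for illustration):

    seen = set()
    unique = []
    records = [{"stage": "stage-1", "page_time": "2021-01-01"},
               {"stage": "stage-1", "page_time": "2021-01-01"},
               {"stage": "stage-2", "page_time": "2021-02-01"}]
    for rec in records:
        key = "%s-%s" % (rec["stage"], rec["page_time"])
        if key not in seen:
            unique.append(rec)   # keep only the first record for each key
        seen.add(key)
    print(len(unique))  # 2: the exact duplicate is dropped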
+ 0 - 0
BaseDataMaintenance/primarykey/__init__.py


+ 41 - 0
BaseDataMaintenance/primarykey/startSnowflake.py

@@ -0,0 +1,41 @@
+
+import os
+import platform
+import time
+from threading import Thread
+
+import snowflake.client
+
+def get_guid():
+    # retry until the local snowflake server is reachable, without busy-spinning
+    while True:
+        try:
+            return snowflake.client.get_guid()
+        except Exception:
+            time.sleep(0.1)
+
+LINUX_PATH = "/home/python/interface_real_new/ENV"
+
+def startSnowflake(port=None,worker=None):
+    if platform.system()=="Windows":
+        _cmd = "snowflake_start_server"
+    else:
+        _cmd = LINUX_PATH+"/bin/snowflake_start_server"
+        if not os.path.exists(_cmd):
+            # fall back to the virtualenv bundled with the repository
+            _cmd = os.path.dirname(__file__)+"/../../ENV/bin/snowflake_start_server"
+    if worker is not None:
+        _cmd += " --worker=%d"%worker
+    if port is not None:
+        _cmd += " --port=%d"%port
+    os.system(_cmd)
+
+thread_startSnowflake = Thread(target=startSnowflake)
+thread_startSnowflake.start()
+time.sleep(2)
+
+if __name__=="__main__":
+    # the server thread is already started at import time above; starting a
+    # second instance would fail to bind the port (see nohup.out), so just wait
+    thread_startSnowflake.join()

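Note: startSnowflake.py launches the pysnowflake id server in a background thread at import time, and get_guid() then retries until the server answers. A minimal client-side sketch, assuming a server is already listening on localhost:8910 (the address the nohup.out logs below show); setup/get_guid are the pysnowflake client calls this module relies on:

    import snowflake.client

    # point the client at the locally started server before requesting ids
    snowflake.client.setup("localhost", 8910)
    print(snowflake.client.get_guid())  # one 64-bit time-ordered id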
+ 9 - 0
BaseDataMaintenance/start_sychro_proposedBuilding.py

@@ -0,0 +1,9 @@
+
+import sys
+import os
+sys.path.append(os.path.dirname(__file__)+"/..")
+
+
+from BaseDataMaintenance.maintenance.proposedBuilding.DataSynchronization import startSychro
+
+startSychro()

+ 54 - 0
nohup.out

@@ -0,0 +1,54 @@
+Traceback (most recent call last):
+  File "d:\anaconda3.4\envs\dl_nlp\lib\runpy.py", line 193, in _run_module_as_main
+    "__main__", mod_spec)
+  File "d:\anaconda3.4\envs\dl_nlp\lib\runpy.py", line 85, in _run_code
+    exec(code, run_globals)
+  File "D:\Anaconda3.4\envs\dl_nlp\Scripts\snowflake_start_server.exe\__main__.py", line 9, in <module>
+  File "d:\anaconda3.4\envs\dl_nlp\lib\site-packages\snowflake\server\__init__.py", line 75, in main
+    http_server.listen(options.port, options.address)
+  File "d:\anaconda3.4\envs\dl_nlp\lib\site-packages\tornado\tcpserver.py", line 143, in listen
+    sockets = bind_sockets(port, address=address)
+  File "d:\anaconda3.4\envs\dl_nlp\lib\site-packages\tornado\netutil.py", line 168, in bind_sockets
+    sock.bind(sockaddr)
+OSError: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted.
+Starting snowflake start at localhost:8910
+Traceback (most recent call last):
+  File "d:\anaconda3.4\envs\dl_nlp\lib\runpy.py", line 193, in _run_module_as_main
+    "__main__", mod_spec)
+  File "d:\anaconda3.4\envs\dl_nlp\lib\runpy.py", line 85, in _run_code
+    exec(code, run_globals)
+  File "D:\Anaconda3.4\envs\dl_nlp\Scripts\snowflake_start_server.exe\__main__.py", line 9, in <module>
+  File "d:\anaconda3.4\envs\dl_nlp\lib\site-packages\snowflake\server\__init__.py", line 78, in main
+    tornado.ioloop.IOLoop.instance().start()
+  File "d:\anaconda3.4\envs\dl_nlp\lib\site-packages\tornado\platform\asyncio.py", line 132, in start
+    self.asyncio_loop.run_forever()
+  File "d:\anaconda3.4\envs\dl_nlp\lib\asyncio\base_events.py", line 421, in run_forever
+    self._run_once()
+  File "d:\anaconda3.4\envs\dl_nlp\lib\asyncio\base_events.py", line 1389, in _run_once
+    event_list = self._selector.select(timeout)
+  File "d:\anaconda3.4\envs\dl_nlp\lib\selectors.py", line 323, in select
+    r, w, _ = self._select(self._readers, self._writers, [], timeout)
+  File "d:\anaconda3.4\envs\dl_nlp\lib\selectors.py", line 314, in _select
+    r, w, x = select.select(r, w, w, timeout)
+KeyboardInterrupt
+Starting snowflake start at localhost:8910
+Traceback (most recent call last):
+  File "d:\anaconda3.4\envs\dl_nlp\lib\runpy.py", line 193, in _run_module_as_main
+    "__main__", mod_spec)
+  File "d:\anaconda3.4\envs\dl_nlp\lib\runpy.py", line 85, in _run_code
+    exec(code, run_globals)
+  File "D:\Anaconda3.4\envs\dl_nlp\Scripts\snowflake_start_server.exe\__main__.py", line 9, in <module>
+  File "d:\anaconda3.4\envs\dl_nlp\lib\site-packages\snowflake\server\__init__.py", line 78, in main
+    tornado.ioloop.IOLoop.instance().start()
+  File "d:\anaconda3.4\envs\dl_nlp\lib\site-packages\tornado\platform\asyncio.py", line 132, in start
+    self.asyncio_loop.run_forever()
+  File "d:\anaconda3.4\envs\dl_nlp\lib\asyncio\base_events.py", line 421, in run_forever
+    self._run_once()
+  File "d:\anaconda3.4\envs\dl_nlp\lib\asyncio\base_events.py", line 1389, in _run_once
+    event_list = self._selector.select(timeout)
+  File "d:\anaconda3.4\envs\dl_nlp\lib\selectors.py", line 323, in select
+    r, w, _ = self._select(self._readers, self._writers, [], timeout)
+  File "d:\anaconda3.4\envs\dl_nlp\lib\selectors.py", line 314, in _select
+    r, w, x = select.select(r, w, w, timeout)
+KeyboardInterrupt
+Starting snowflake start at localhost:8910

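Note: the OSError [WinError 10048] above is the symptom of a second server instance trying to bind an already-used port, which the import-time start plus the __main__ start in startSnowflake.py could trigger. A small pre-flight probe one could run first (hypothetical helper, not part of this commit):

    import socket

    def port_in_use(port, host="localhost"):
        # connect_ex returns 0 when something already listens on (host, port)
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            return s.connect_ex((host, port)) == 0

    print(port_in_use(8910))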
+ 5 - 0
test.py

@@ -0,0 +1,5 @@
+
+import sys
+import os
+
+print(os.path.dirname(sys.argv[0]))