Browse Source

Product data flow version

luojiehua 1 year ago
parent
commit
5d9340b7b8

+ 1 - 1
.idea/misc.xml

@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <project version="4">
-  <component name="ProjectRootManager" version="2" languageLevel="JDK_13" default="false" project-jdk-name="Python 3.5 (dl_nlp)" project-jdk-type="Python SDK">
+  <component name="ProjectRootManager" version="2" languageLevel="JDK_13" default="false" project-jdk-name="Python 3.7 (py37)" project-jdk-type="Python SDK">
     <output url="file://$PROJECT_DIR$/out" />
   </component>
 </project>

+ 48 - 0
BaseDataMaintenance/common/milvusUtil.py

@@ -0,0 +1,48 @@
+import traceback
+
+from pymilvus import connections,utility,FieldSchema,CollectionSchema,DataType,Collection
+
+
+def init_milvus(host):
+    connections.connect("default",host=host,port=19530)
+
+
+def drop_embedding_collection(collection_name):
+    print("drop collection:",collection_name)
+    utility.drop_collection(collection_name)
+
+
+def create_embedding_schema(collection_name,fields,index_name,index_params):
+    has = utility.has_collection(collection_name)
+    if not has:
+        print("creating collection")
+        coll_schema = CollectionSchema(fields,"this is the embedding schema")
+        coll = Collection(collection_name,coll_schema,consistency_level="Strong")
+
+        #create index for milvus_embedding
+        coll.create_index(index_name,index_params=index_params)
+
+def getCollection(collection_name):
+    coll = Collection(collection_name)
+    print(collection_name,"num_entities",coll.num_entities)
+    return coll
+
+
+def insert_embedding(coll,entities):
+    coll.insert(entities)
+    coll.load()
+    print("num_entities",coll.num_entities)
+
+
+def search_embedding(coll,index_name,vector,search_params,output_fields,limit=3):
+    list_result = []
+    result = coll.search(vector,index_name,search_params,output_fields=output_fields,limit=limit)
+    for hits in result:
+        for hit in hits:
+            list_result.append(hit)
+    return list_result
+
+if __name__ == '__main__':
+    # drop_embedding_collection()
+    # create_embedding_schema()  # requires collection_name, fields, index_name, index_params
+
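
A minimal sketch of how these wrappers combine; the host, collection name and the reduced two-field schema below are placeholders rather than values from this commit:

    from pymilvus import FieldSchema, DataType
    from BaseDataMaintenance.common.milvusUtil import (
        init_milvus, create_embedding_schema, getCollection, insert_embedding, search_embedding)

    init_milvus("127.0.0.1")                                   # placeholder host
    fields = [FieldSchema(name="ots_id", dtype=DataType.VARCHAR, max_length=32, is_primary=True),
              FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1024)]
    index_params = {"index_type": "IVF_SQ8", "metric_type": "IP", "params": {"nlist": 2048}}
    create_embedding_schema("demo_collection", fields, "embedding", index_params)

    coll = getCollection("demo_collection")
    insert_embedding(coll, [["id-1"], [[0.1] * 1024]])         # column-wise data, one list per schema field
    hits = search_embedding(coll, "embedding", [[0.1] * 1024],
                            {"metric_type": "IP", "params": {"nprobe": 10}},
                            output_fields=["ots_id"], limit=3)
    print([h.id for h in hits])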

+ 82 - 0
BaseDataMaintenance/common/sentencesUtil.py

@@ -0,0 +1,82 @@
+
+
+import torch
+from transformers import AutoTokenizer, AutoModel
+
+
+if torch.cuda.is_available():
+    device = torch.device("cuda")
+else:
+    device = torch.device("cpu")
+# device = torch.device("cpu")
+
+
+# load the pretrained model and tokenizer
+# from transformers import BertTokenizer,BertModel
+# tokenizer = BertTokenizer.from_pretrained('bert-large-chinese')
+# model = BertModel.from_pretrained('bert-large-chinese')
+
+
+model_path = "bert-base-chinese"
+model_path = "THUDM/glm-large-chinese"
+model_path = "E:/huggingface/hub/models--THUDM--glm-large-chinese/snapshots/230f54e413fab4bc8f29bd3508aab301d757ef3e"
+model_path = "/data/python/models--THUDM--glm-large-chinese/snapshots/230f54e413fab4bc8f29bd3508aab301d757ef3e/"
+tokenizer = AutoTokenizer.from_pretrained(model_path,trust_remote_code=True)
+model = AutoModel.from_pretrained(model_path,trust_remote_code=True)
+model = model.to(device)
+
+
+def getVectorOfSentence(sentence):
+    inputs = tokenizer.batch_encode_plus([sentence], return_tensors='pt', padding=True, truncation=True)["input_ids"]
+    inputs = inputs.to(device)
+
+    input_shape = inputs.size()
+    position_ids = torch.arange(0, input_shape[-1], dtype=torch.long, device=device)
+    block_position_ids = torch.zeros(input_shape[-1], dtype=torch.long, device=device)
+    position_ids = torch.stack((position_ids, block_position_ids), dim=0).unsqueeze(0)
+    with torch.no_grad():
+        outputs = model(inputs,position_ids=position_ids)
+    last_hidden_state = outputs.last_hidden_states
+    print(last_hidden_state.shape)
+    # cls_embeddings = last_hidden_state[:,-1,:]
+    cls_embeddings = torch.mean(last_hidden_state,dim=1)
+    return cls_embeddings
+
+def get_cosine_similarity(v1,v2):
+    return torch.dot(v1,v2)/(torch.norm(v1)*torch.norm(v2)+1e-8)
+
+
+def normalize_vector(v):
+    return v/torch.sqrt(torch.sum(v*v,dim=1,keepdim=True))
+
+def cosine_similarity(x1, x2, dim=1, eps=1e-8):
+    """
+    Compute the cosine similarity of two tensors along the given dimension.
+    """
+    w1 = torch.norm(x1, p=2, dim=dim, keepdim=True)
+    w2 = torch.norm(x2, p=2, dim=dim, keepdim=True)
+    dot = (x1 * x2).sum(dim=dim, keepdim=True)
+    cos = dot / (w1 * w2 + eps)
+
+    return cos.squeeze(dim=dim)
+
+def get_normalized_vector(sentence):
+    vector = normalize_vector(getVectorOfSentence(sentence)).tolist()[0]
+    return vector
+
+
+def test():
+    import time
+    a = time.time()
+    v1 = getVectorOfSentence("123456?")
+    print("get vector takes %.3f seconds" % (time.time() - a))
+    a = time.time()
+    v2 = getVectorOfSentence("12346")
+    print("get vector takes %.3f seconds" % (time.time() - a))
+    print(v1.shape)
+    a = time.time()
+    print(cosine_similarity(v1,v2))
+    print("cosine_similarity takes %.3f seconds" % (time.time() - a))
+
+if __name__ == '__main__':
+    test()
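
The Milvus schemas in this commit use FLOAT_VECTOR dim=1024 and productUtils.request_embedding passes these vectors straight through, so get_normalized_vector is expected to return a 1024-dimensional unit-length list (glm-large-chinese has a hidden size of 1024). A small sanity-check sketch:

    import math

    vec = get_normalized_vector("医用CT设备")
    print(len(vec))                                        # expected 1024, matching the FLOAT_VECTOR dim
    print(round(math.sqrt(sum(x * x for x in vec)), 4))    # expected ~1.0 after normalize_vector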

+ 53 - 13
BaseDataMaintenance/maintenance/attachment/attachmentFix.py

@@ -306,6 +306,16 @@ def fixAttachmentOfDoc(docid,new_files):
 
     list_attach = []
     new_attachments = []
+
+    partitionkey = docid%500+1
+    _document = Document({document_partitionkey:partitionkey,document_docid:docid})
+    _document.fix_columns(ots_client,[document_attachment_path,document_attachment_extract_status],True)
+    log(_document.getProperties().get(document_attachment_path,"[]"))
+    _old_attachments = _document.getProperties().get(document_attachment_path,"[]")
+    if _old_attachments=="":
+        _old_attachments = "[]"
+    _page_attachments = json.loads(_old_attachments)
+
     for _file in new_files:
         filepath = _file.get("filepath")
         filetype = _file.get("file_type")
@@ -334,14 +344,7 @@ def fixAttachmentOfDoc(docid,new_files):
         list_attach.append(_attach)
         new_attachments.append({document_attachment_path_filemd5:current_filemd5})
 
-    partitionkey = docid%500+1
-    _document = Document({document_partitionkey:partitionkey,document_docid:docid})
-    _document.fix_columns(ots_client,[document_attachment_path,document_attachment_extract_status],True)
-    log(_document.getProperties().get(document_attachment_path,"[]"))
-    _old_attachments = _document.getProperties().get(document_attachment_path,"[]")
-    if _old_attachments=="":
-        _old_attachments = "[]"
-    _page_attachments = json.loads(_old_attachments)
+
     print("fixing",docid,_page_attachments)
 
     for new_item in new_attachments:
@@ -390,6 +393,11 @@ def extract_pageAttachments(_html):
             if _text.find(suf)>=0 or _url.find(suf)>=0:
                 is_attach = True
                 file_type = suf.lower()
+        if not is_attach:
+            for suf in fileSuffix:
+                if _text.find(suf[1:])>=0 or _url.find(suf[1:])>=0:
+                    is_attach = True
+                    file_type = suf.lower()
         if is_attach:
             page_attachments.append({"fileLink":_url,"fileTitle":_text,"file_type":file_type[1:]})
     for _a in list_img:
@@ -423,7 +431,7 @@ def fixDoc(docid,ots_client=None):
     log("=1")
     if _document.getProperties().get(document_attachment_extract_status,0)!=1 or 1:
         log("=2")
-        if _document.getProperties().get(document_attachment_path,'[]')=="[]":
+        if _document.getProperties().get(document_attachment_path,'[]')=="[]" or 1:
             log("=3")
             new_attachments = extract_pageAttachments(_document.getProperties().get(document_dochtmlcon))
             log(str(new_attachments))
@@ -439,6 +447,17 @@ def fixDoc(docid,ots_client=None):
                     os.remove(_file.get("filepath"))
 
 
+def has_attachments(docid):
+    capacity_client = getConnect_ots_capacity()
+    partitionkey = docid%500+1
+    _document = Document({document_partitionkey:partitionkey,document_docid:docid})
+    _document.fix_columns(capacity_client,[document_dochtmlcon],True)
+
+    _html = _document.getProperties().get(document_dochtmlcon,"")
+    _soup = BeautifulSoup(_html,"lxml")
+    list_a = _soup.find_all("a")
+    return len(list_a)>0
+
 class FixDocument():
 
     def __init__(self):
@@ -506,12 +525,33 @@ def start_docFix():
     fd = FixDocument()
     fd.start()
 
+
+def export_doc_has_attachment(docs):
+    for _doc in docs:
+        if has_attachments(_doc):
+            df_data["docid"].append(_doc)
+
+    import pandas as pd
+    df = pd.DataFrame(df_data)
+    df.to_excel("2022-01-18_183521_export11.xlsx")
 if __name__=="__main__":
-    # docs = [156668514]
-    # for _doc in docs:
-    #     fixDoc(_doc)
 
-    start_docFix()
+    docs = []
+    a = '''
+    188178166
+
+
+    '''
+    for a in a.split("\n"):
+        b = a.strip()
+        if len(b)>0:
+            docs.append(int(b))
+    df_data = {"docid":[]}
+
+    for _doc in docs:
+        fixDoc(_doc)
+
+    # start_docFix()
 
     # af = AttachmentFix()
     # af.schedule()

+ 5 - 4
BaseDataMaintenance/maintenance/dataflow.py

@@ -1265,6 +1265,7 @@ class Dataflow():
                     to_reverse=True
         if len(base_list)>0:
             base_list.sort(key=lambda x:x["docid"],reverse=to_reverse)
+            base_list.sort(key=lambda x:x.get(document_attachment_extract_status,0),reverse=True)
             base_list.sort(key=lambda x:x["extract_count"],reverse=True)
             return base_list[0]["docid"]
 
@@ -2798,7 +2799,7 @@ class Dataflow_dumplicate(Dataflow):
 
 
     def flow_dumplicate(self,process_count=flow_process_count,status_from=flow_dumplicate_status_from):
-        def producer(columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json]):
+        def producer(columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status]):
             q_size = self.queue_dumplicate.qsize()
             log("dumplicate queue size %d"%(q_size))
             if q_size>flow_process_count//3:
@@ -3881,7 +3882,7 @@ class Dataflow_dumplicate(Dataflow):
                 singleNum_keys = _rule["singleNum_keys"]
                 contain_keys = _rule["contain_keys"]
                 multiNum_keys = _rule["multiNum_keys"]
-                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint])
+                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status])
                 _i += step
 
 
@@ -4050,7 +4051,7 @@ class Dataflow_dumplicate(Dataflow):
 
 
     def test_dumplicate(self,docid):
-        columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint]
+        columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status]
         bool_query = BoolQuery(must_queries=[
             TermQuery("docid",docid)
         ])
@@ -4152,7 +4153,7 @@ if __name__ == '__main__':
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
     a = time.time()
-    df_dump.test_dumplicate(330679217)
+    df_dump.test_dumplicate(335604520)
     # df_dump.test_merge([292315564],[287890754])
     # df_dump.flow_remove_project_tmp()
     print("takes",time.time()-a)

+ 22 - 0
BaseDataMaintenance/maintenance/product/1.py

@@ -0,0 +1,22 @@
+
+
+from fuzzywuzzy import fuzz
+import Levenshtein
+
+
+s1 = "abcd"
+s2 = "abcdefgh"
+
+print(fuzz.ratio(s1,s2))
+print(Levenshtein.ratio(s1,s2))
+
+
+print(Levenshtein.jaro("1abdd","1abbd"))
+
+print((4/5+4/5+4/4)/3)
+print((5/5+5/5+3/5)/3)
+
+
+
+
+

+ 0 - 0
BaseDataMaintenance/maintenance/product/__init__.py


+ 161 - 0
BaseDataMaintenance/maintenance/product/productUtils.py

@@ -0,0 +1,161 @@
+
+from BaseDataMaintenance.maintenance.product.product_setting import *
+
+import re
+# check whether the input string consists entirely of Chinese characters
+def judge_pur_chinese(keyword):
+    """
+    Chinese characters fall in the range u'\u4e00' -- u'\u9fff'; a string is treated as Chinese only if every character is in this range
+    @param keyword:
+    @return:
+    """
+    # punctuation characters to strip before the check
+    remove_chars = '[·’!"\#$%&\'()#!()*+,-./:;<=>?\@,:?¥★、….>【】[]《》?“”‘’\[\\]^_`{|}~]+'
+    # use re.sub to strip these punctuation characters
+    strings = re.sub(remove_chars, "", keyword)  # replace every punctuation character listed in remove_chars with the empty string
+    for ch in strings:
+        if u'\u4e00' <= ch <= u'\u9fff':
+            pass
+        else:
+            return False
+    return True
+
+
+
+from fuzzywuzzy import fuzz
+def is_similar(source,target):
+    source = str(source).lower()
+    target = str(target).lower()
+    max_len = max(len(source),len(target))
+    min_len = min(len(source),len(target))
+    # dis_len = abs(len(source)-len(target))
+    # min_dis = min(max_len*0.2,4)
+    if min_len==0 and max_len>0:
+        return False
+    if max_len<=4:
+        if source==target:
+            return True
+    else:
+        # check the fuzzy similarity ratio
+        similar = fuzz.ratio(source,target)
+        if similar>90:
+            return True
+        # if both are pure Chinese, treat containment as similar
+        if judge_pur_chinese(source) and judge_pur_chinese(target):
+            if len(source)==max_len:
+                if str(source).find(target)>=0:
+                    return True
+            else:
+                if target.find(source)>=0:
+                    return True
+
+    return False
+
+
+SPECS_CHECK_SET = set([i for i in 'abcdefghijklmnopqrstuvwxyz']) | set([i for i in '0123456789']) | set([i for i in 'IⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩⅪⅫ'])
+
+def check_specs(source,target):
+    '''
+    check if the source specs is the same as the target
+    same only if the chars in SPECS_CHECK_SET have the same counts
+    :param source:
+    :param target:
+    :return:
+    '''
+    source = str(source).lower()
+    target = str(target).lower()
+
+    dict_source = {}
+    dict_target = {}
+    for s in source:
+        if s in SPECS_CHECK_SET:
+            if s not in dict_source:
+                dict_source[s] = 0
+            dict_source[s] += 1
+    for s in target:
+        if s in SPECS_CHECK_SET:
+            if s not in dict_target:
+                dict_target[s] = 0
+            dict_target[s] += 1
+    union_keys = set(list(dict_source.keys())) & set(list(dict_target.keys()))
+    if len(dict_source.keys())!= len(union_keys):
+        return False
+    for k,v in dict_source.items():
+        if v!=dict_target.get(k):
+            return False
+    return True
+
+import json
+import requests
+
+def request_embedding(sentence,retry_times=3):
+    for _ in range(retry_times):
+        resp = requests.post(embedding_url,json={"sentence":sentence})
+        if resp.status_code==200:
+            content = resp.content.decode("utf-8")
+            _d = json.loads(content)
+            if _d.get("success"):
+                return _d.get("vector")
+    return None
+
+def clean_product_name(product_name):
+    '''
+    clean before insert
+    :param product_name:
+    :return:
+    '''
+    return product_name
+
+def clean_product_brand(product_brand):
+    '''
+    clean before insert
+    :param product_brand:
+    :return:
+    '''
+    return product_brand
+
+SPECS_PATTERN = re.compile("[^A-Za-z0-9-\\/()()]")
+def clean_product_specs(product_specs):
+    '''
+    clean before insert
+    :param product_specs:
+    :return:
+    '''
+    _specs = re.sub(SPECS_PATTERN,'',product_specs)
+    if len(_specs)>0:
+        return _specs
+    return product_specs
+
+
+def clean_product_unit_price(product_unit_price):
+    '''
+    clean before insert
+    :param product_unit_price:
+    :return:
+    '''
+    try:
+        if product_unit_price is not None and product_unit_price!="":
+            _price = float(product_unit_price)
+            return _price
+    except Exception as e:
+        return ""
+    return ""
+
+
+def clean_product_quantity(product_quantity):
+    '''
+
+    :param product_quantity:
+    :return:
+    '''
+    try:
+        if product_quantity is not None and product_quantity!="":
+            _quantity = int(product_quantity)
+            return _quantity
+
+    except Exception as e:
+        return ""
+    return ""
+
+if __name__ == '__main__':
+    print(clean_product_specs("XY-K-JLJ-3A"))
+    print(check_specs("佳士比F6",'佳士比”F6'))
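
A few illustrative calls against the matching rules above; the results in the comments follow from the rules rather than from captured output:

    print(is_similar("医用磁共振成像系统", "磁共振成像系统"))   # True: both are pure Chinese and the longer one contains the shorter
    print(is_similar("ABCD", "ABCE"))                           # False: strings of length <= 4 must match exactly
    print(check_specs("XY-K-JLJ-3A", "XY/K/JLJ/3A"))            # True: the letter/digit counts are identical
    print(check_specs("F16", "F6"))                             # False: the source contains a digit missing from the target
    print(clean_product_specs("规格:XY-K-JLJ-3A型"))             # "XY-K-JLJ-3A": non-spec characters are stripped
    print(clean_product_unit_price("12.50"), clean_product_quantity("3"))   # 12.5 3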

+ 252 - 0
BaseDataMaintenance/maintenance/product/product_dict.py

@@ -0,0 +1,252 @@
+
+from BaseDataMaintenance.common.milvusUtil import *
+from multiprocessing import Process,Queue
+from BaseDataMaintenance.common.Utils import *
+from BaseDataMaintenance.common.multiThread import MultiThreadHandler
+from apscheduler.schedulers.blocking import BlockingScheduler
+
+from BaseDataMaintenance.model.ots.document_product_dict import *
+
+from BaseDataMaintenance.dataSource.source import getConnect_ots
+
+from tablestore import *
+from BaseDataMaintenance.common.Utils import getRow_ots
+
+from BaseDataMaintenance.maintenance.product.productUtils import *
+
+import time
+import traceback
+import json
+import requests
+
+
+
+class Product_Dict_Manager():
+
+    def __init__(self):
+
+        self.search_params = {"metric_type":"IP",
+                              "params":{"nprobe":10}}
+
+        self.init_milvus()
+        self.ots_client = getConnect_ots()
+        self.queue_product_dict = Queue()
+
+        self.session = requests.Session()
+
+        self.collection_name_name = COLLECTION_NAME_NAME
+        self.collection_name_brand = COLLECTION_NAME_BRAND
+        self.collection_name_specs = COLLECTION_NAME_SPECS
+        self.Coll_name = getCollection(self.collection_name_name)
+        self.Coll_brand = getCollection(self.collection_name_brand)
+        self.Coll_specs = getCollection(self.collection_name_specs)
+
+
+    def init_milvus(self):
+        from pymilvus import connections,FieldSchema,DataType
+        fields = [
+            # FieldSchema(name="pk_id",dtype=DataType.INT64,is_primary=True,auto_id=True), # pk is the same as ots
+            FieldSchema(name="ots_id",dtype=DataType.VARCHAR,max_length=32,is_primary=True),
+            FieldSchema(name="ots_name",dtype=DataType.VARCHAR,max_length=MAX_NAME_LENGTH),
+            FieldSchema(name="embedding",dtype=DataType.FLOAT_VECTOR,dim=1024),
+            FieldSchema(name="ots_parent_id",dtype=DataType.VARCHAR,max_length=32),
+            FieldSchema(name="ots_grade",dtype=DataType.INT64)
+        ]
+
+        index_name = "embedding"
+        index_params = {"params":{"nlist":2048},
+                        "index_type":"IVF_SQ8",
+                        "metric_type":"IP"}
+
+        init_milvus(milvus_host)
+
+        # build the embedding collections for product name, brand and specs respectively
+        create_embedding_schema(self.collection_name_name,fields,index_name,index_params)
+        create_embedding_schema(self.collection_name_brand,fields,index_name,index_params)
+        create_embedding_schema(self.collection_name_specs,fields,index_name,index_params)
+
+
+
+
+
+
+    def embedding_producer(self,columns=[DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_PARENT_ID,DOCUMENT_PRODUCT_DICT_GRADE]):
+
+        bool_query = BoolQuery(
+            must_queries=[RangeQuery(DOCUMENT_PRODUCT_DICT_GRADE,3,5,True,True)],
+            must_not_queries=[TermQuery(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED,2)])
+
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict","document_product_dict_index",
+                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED)]),limit=100,get_total_count=True),
+                                                                            columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
+
+        list_dict = getRow_ots(rows)
+        for _d in list_dict:
+            self.queue_product_dict.put(_d)
+
+        while next_token:
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict","document_product_dict_index",
+                                                                                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                                columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
+            list_dict = getRow_ots(rows)
+            for _d in list_dict:
+                self.queue_product_dict.put(_d)
+            if self.queue_product_dict.qsize()>=1000:
+                break
+        log("product_dict embedding total_count:%d"%total_count)
+
+
+
+    def get_collection(self,grade):
+        Coll = None
+        Coll_name = None
+        if grade ==SPECS_GRADE:
+            Coll = self.Coll_specs
+            Coll_name = self.collection_name_specs
+        if grade == BRAND_GRADE:
+            Coll = self.Coll_brand
+            Coll_name = self.collection_name_brand
+        if grade == NAME_GRADE:
+            Coll = self.Coll_name
+            Coll_name = self.collection_name_name
+        return Coll,Coll_name
+
+    def embedding_comsumer(self):
+        def handle(item,result_queue):
+            try:
+                id = item.get(DOCUMENT_PRODUCT_DICT_ID)
+                name = str(item.get(DOCUMENT_PRODUCT_DICT_NAME))[:MAX_NAME_LENGTH]
+                vector = request_embedding(name)
+                parent_id = item.get(DOCUMENT_PRODUCT_DICT_PARENT_ID)
+                grade = item.get(DOCUMENT_PRODUCT_DICT_GRADE)
+                Coll,_ = self.get_collection(grade)
+                if vector is not None and Coll is not None:
+
+                    data = [[id],
+                            [name],
+                            [vector],
+                            [parent_id],
+                            [grade]]
+                    insert_embedding(Coll,data)
+                    _pd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:id,DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:2})
+                    _pd.update_row(self.ots_client)
+
+            except Exception as e:
+                traceback.print_exc()
+
+
+        self.embedding_producer()
+        start_time = time.time()
+        q_size = self.queue_product_dict.qsize()
+        mt = MultiThreadHandler(self.queue_product_dict,handle,None,5,1)
+        mt.run()
+        log("process embedding %d records cost %.2f s"%(q_size,time.time()-start_time))
+
+
+    def start_embedding_product_dict(self):
+        from apscheduler.schedulers.blocking import BlockingScheduler
+        scheduler = BlockingScheduler()
+        # scheduler.add_job(func=self.embedding_producer,trigger="cron",minute="*/1")
+        scheduler.add_job(func=self.embedding_comsumer,trigger="cron",second="*/5")
+        scheduler.start()
+
+    def delete_collections(self):
+        drop_embedding_collection(self.collection_name_name)
+        drop_embedding_collection(self.collection_name_brand)
+        drop_embedding_collection(self.collection_name_specs)
+
+def start_embedding_product_dict():
+    pdm = Product_Dict_Manager()
+    pdm.start_embedding_product_dict()
+
+def drop_product_dict_collections():
+    pdm = Product_Dict_Manager()
+    pdm.delete_collections()
+
+
+def search_similar():
+
+    task_queue = Queue()
+    ots_client = getConnect_ots()
+    pdm = Product_Dict_Manager()
+
+    list_data = []
+
+    columns=[DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_PARENT_ID,DOCUMENT_PRODUCT_DICT_GRADE]
+
+    bool_query = BoolQuery(
+        must_queries=[RangeQuery(DOCUMENT_PRODUCT_DICT_GRADE,5,5,True,True)]
+    )
+
+    rows,next_token,total_count,is_all_succeed = ots_client.search("document_product_dict","document_product_dict_index",
+                                                                        SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED)]),limit=100,get_total_count=True),
+                                                                        columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
+
+    list_dict = getRow_ots(rows)
+    for _d in list_dict:
+        list_data.append(_d)
+
+    while next_token:
+        rows,next_token,total_count,is_all_succeed = ots_client.search("document_product_dict","document_product_dict_index",
+                                                                            SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                            columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
+        list_dict = getRow_ots(rows)
+        for _d in list_dict:
+            list_data.append(_d)
+        if len(list_data)>=100000:
+            break
+    log("product_dict embedding total_count:%d"%total_count)
+    for _d in list_data:
+        task_queue.put(_d)
+
+    result_queue = Queue()
+
+    def handle(item,result_queue):
+        id = item.get(DOCUMENT_PRODUCT_DICT_ID)
+        name = item.get(DOCUMENT_PRODUCT_DICT_NAME)
+        vector = request_embedding(name)
+        parent_id = item.get(DOCUMENT_PRODUCT_DICT_PARENT_ID)
+        grade = item.get(DOCUMENT_PRODUCT_DICT_GRADE)
+        Coll,Coll_name = pdm.get_collection(grade)
+        output_fields = ['ots_id','ots_name',"ots_parent_id"]
+        if vector is not None and Coll is not None:
+            search_list = search_embedding(Coll,embedding_index_name,[vector],pdm.search_params,output_fields,limit=10)
+            for _item in search_list:
+                ots_id = _item.id
+                ots_name = _item.entity.get("ots_name")
+                ots_parent_id = _item.entity.get("ots_parent_id")
+                if name!=ots_name:
+                    if is_similar(name,ots_name):
+                        _d = {"source_id":id,"source_name":name,"grade":grade,"target_id":ots_id,"target_name":ots_name,"parent_id":parent_id,"target_parent_id":ots_parent_id}
+                        result_queue.put(_d)
+
+
+    mt = MultiThreadHandler(task_queue,handle,result_queue,5,1)
+    mt.run()
+
+    df_data = {}
+    _set = set()
+    df_columns = ["source_id","source_name","grade","parent_id","target_id","target_name","target_parent_id"]
+    while 1:
+        try:
+            item = result_queue.get(timeout=1)
+            source_name = item.get("source_name")
+            target_name = item.get("target_name")
+            _key1 = "%s-%s"%(source_name,target_name)
+            _key2 = "%s-%s"%(target_name,source_name)
+            for c in df_columns:
+                if c not in df_data:
+                    df_data[c] = []
+                df_data[c].append(getLegal_str(item.get(c)))
+
+        except Exception as e:
+            break
+    import pandas as pd
+    df = pd.DataFrame(df_data)
+    df.to_excel("search_similar1.xlsx",columns=df_columns)
+
+
+
+
+if __name__ == '__main__':
+    start_embedding_product_dict()
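
For reference, embedding_comsumer inserts data column-wise in the order of the schema fields (ots_id, ots_name, embedding, ots_parent_id, ots_grade), and get_collection maps grade 3/4/5 to the name/brand/specs collections. A hedged sketch of pushing a single dictionary entry by hand (the ids are placeholders):

    from BaseDataMaintenance.maintenance.product.product_dict import Product_Dict_Manager
    from BaseDataMaintenance.maintenance.product.productUtils import request_embedding
    from BaseDataMaintenance.maintenance.product.product_setting import NAME_GRADE
    from BaseDataMaintenance.common.milvusUtil import insert_embedding

    pdm = Product_Dict_Manager()                      # connects to OTS and Milvus in __init__
    Coll, Coll_name = pdm.get_collection(NAME_GRADE)  # grade 3 -> product_dict_embedding_name
    vector = request_embedding("医用CT设备")           # 1024-dim list from the embedding service
    if vector is not None and Coll is not None:
        data = [["demo-ots-id"],       # ots_id (placeholder)
                ["医用CT设备"],         # ots_name
                [vector],              # embedding
                ["demo-parent-id"],    # ots_parent_id (placeholder)
                [NAME_GRADE]]          # ots_grade
        insert_embedding(Coll, data)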

+ 20 - 0
BaseDataMaintenance/maintenance/product/product_setting.py

@@ -0,0 +1,20 @@
+
+
+milvus_host = "192.168.0.114"
+
+embedding_url = "http://localhost:17130/embedding"
+
+embedding_index_name = "embedding"
+
+PRODUCT_TMEP_STATUS_FROM = [1,50]
+PRODUCT_TMEP_STATUS_TO_SYNC = [201,300]
+PRODUCT_TEMP_STATUS_TO_NO_SYNC = [401,450]
+PRODUCT_TEMP_STATUS_TO_REPEATED = [451,500]
+
+COLLECTION_NAME_NAME = "product_dict_embedding_name"
+COLLECTION_NAME_BRAND = "product_dict_embedding_brand"
+COLLECTION_NAME_SPECS = "product_dict_embedding_specs"
+
+NAME_GRADE = 3
+BRAND_GRADE = 4
+SPECS_GRADE = 5

+ 431 - 0
BaseDataMaintenance/maintenance/product/products.py

@@ -0,0 +1,431 @@
+
+from BaseDataMaintenance.common.documentFingerprint import getMD5
+from BaseDataMaintenance.common.Utils import *
+from BaseDataMaintenance.common.milvusUtil import *
+from BaseDataMaintenance.common.multiThread import MultiThreadHandler
+
+from BaseDataMaintenance.maintenance.product.productUtils import *
+
+from BaseDataMaintenance.model.ots.document_product_tmp import *
+from BaseDataMaintenance.model.ots.document_product import *
+from BaseDataMaintenance.model.ots.document_product_dict import *
+
+from tablestore import *
+
+from BaseDataMaintenance.dataSource.source import getConnect_ots
+from multiprocessing import Process,Queue
+from random import randint
+
+from BaseDataMaintenance.maintenance.product.product_dict import Product_Dict_Manager
+
+
+class Product_Manager(Product_Dict_Manager):
+
+    def __init__(self):
+        super(Product_Manager, self).__init__()
+        self.process_queue = Queue()
+        self.ots_client = getConnect_ots()
+
+
+    def get_product_id(self,docid,name,brand,specs,unit_price,quantity):
+        if name is None:
+            name = ""
+        if brand is None:
+            brand = ""
+        if specs is None:
+            specs = ""
+        if quantity is None:
+            quantity = ""
+        if unit_price is None or unit_price=="":
+            unit_price = ""
+        else:
+            unit_price = "%.2f"%float(unit_price)
+        product_id = getMD5(str(docid))+str(name)+str(brand)+str(specs)+str(unit_price)+str(quantity)
+        return product_id
+
+    def producer(self,process_count=3000):
+        q_size = self.process_queue.qsize()
+        if q_size>process_count/6:
+            return
+        bool_query = BoolQuery(must_queries=[RangeQuery(DOCUMENT_PRODUCT_TMP_STATUS,1,51)])
+
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_temp","document_product_temp_index",
+                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
+                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
+        list_data = getRow_ots(rows)
+        _count = len(list_data)
+        for _d in list_data:
+            self.process_queue.put(_d)
+        while next_token:
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_temp","document_product_temp_index",
+                                                                                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                                columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
+            list_data = getRow_ots(rows)
+            for _d in list_data:
+                self.process_queue.put(_d)
+            _count += len(list_data)
+            if _count>=process_count:
+                break
+
+    def comsumer(self):
+        def start_thread(thread_count):
+            mt = MultiThreadHandler(self.process_queue,self.comsumer_handle,None,thread_count,1,False,True)
+            mt.run()
+
+        process_count = 3
+        thread_count = 10
+        list_process = []
+        for _i in range(process_count):
+            p = Process(target=start_thread,args=(thread_count,))
+            list_process.append(p)
+        for p in list_process:
+            p.start()
+        for p in list_process:
+            p.join()
+
+
+    def comsumer_handle(self,item,result_queue):
+        self.standardize(item)
+
+
+    def standardize(self,tmp_dict,output_fields = ['ots_id','ots_name',"ots_parent_id"]):
+        '''
+        Standardize the product data by matching it against the standard parameter dictionary.
+        The match is fuzzy rather than exact; the validation rules are still to be decided.
+        :return:
+        Only the standardized product is saved.
+        A temp record is treated as a standard product only if its name matches a standard name; conditioned on that,
+        if the brand is matched it is replaced by the standard brand (or a new brand is added), otherwise it is replaced with "",
+        and specs are handled the same way.
+        Name-brand and brand-specs links are added automatically because of the three-level tree structure.
+        '''
+        # todo: 1. automatically add new records to the product dictionary? add new connections between existing names. 2. add new specs
+        # when matching specs, the differing characters must not include digits, letters or Roman numerals,
+        # and differences in character counts must not be ignored
+
+
+        save_product_tmp = Document_product_tmp({DOCUMENT_PRODUCT_TMP_ID:tmp_dict.get(DOCUMENT_PRODUCT_TMP_ID)})
+
+        _status = 0
+
+        document_product_tmp = Document_product_tmp(tmp_dict)
+
+        name = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_NAME)
+        brand = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_BRAND)
+        specs = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_SPECS)
+
+        new_name = ""
+        new_brand = ""
+        new_specs = ""
+
+        name_ots_id = None
+        brand_ots_id = None
+        specs_ots_id = None
+        if name is not None and name!="":
+            name_vector = request_embedding(name)
+            if name_vector is not None:
+                Coll,_ = self.get_collection(NAME_GRADE)
+                search_list = search_embedding(Coll,embedding_index_name,[name_vector],self.search_params,output_fields,limit=60)
+
+                for _search in search_list:
+                    ots_id = _search.entity.get("ots_id")
+                    ots_name = _search.entity.get("ots_name")
+                    ots_parent_id = _search.entity.get("ots_parent_id")
+
+                    if is_similar(name,ots_name):
+                        name_ots_id = ots_id
+                        new_name = ots_name
+
+                        #update alias of name
+                        _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:name_ots_id})
+                        _flag = _dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_ALIAS],True)
+                        if _flag and _dpd.updateAlias(name):
+                            _dpd.update_row(self.ots_client)
+                        break
+        if name_ots_id is not None:
+            if brand is not None and brand!="":
+                brand_vector = request_embedding(brand)
+                if brand_vector is not None:
+                    Coll,_ = self.get_collection(BRAND_GRADE)
+                    search_list = search_embedding(Coll,embedding_index_name,[brand_vector],self.search_params,output_fields,limit=60)
+
+                    for _search in search_list:
+                        ots_id = _search.entity.get("ots_id")
+                        ots_name = _search.entity.get("ots_name")
+                        ots_parent_id = _search.entity.get("ots_parent_id")
+
+                        if is_similar(brand,ots_name):
+                            new_brand = ots_name
+
+                            # check whether a brand whose parent_id is name_ots_id already exists; insert it if not, otherwise update its alias
+
+                            if name_ots_id is not None:
+                                brand_ots_id = get_document_product_dict_id(name_ots_id,new_brand)
+
+                                _d_brand = {DOCUMENT_PRODUCT_DICT_ID:brand_ots_id,
+                                            DOCUMENT_PRODUCT_DICT_NAME:new_brand,
+                                            DOCUMENT_PRODUCT_DICT_ALIAS:"%s&&%s"%(brand,new_brand),
+                                            DOCUMENT_PRODUCT_DICT_GRADE:BRAND_GRADE,
+                                            DOCUMENT_PRODUCT_DICT_STATUS:1,
+                                            DOCUMENT_PRODUCT_DICT_PARENT_ID:name_ots_id,
+                                            DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                            DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                            }
+                                _dpd_brand = Document_product_dict(_d_brand)
+                                if not _dpd_brand.exists_row(self.ots_client):
+                                    _dpd_brand.update_row(self.ots_client)
+                                else:
+                                    #update alias
+                                    _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:brand_ots_id})
+                                    _flag = _dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_ALIAS],True)
+                                    if _flag:
+                                        if _dpd.updateAlias(brand):
+                                            _dpd.update_row(self.ots_client)
+                            break
+                        else:
+                            # add new brand?
+                            pass
+
+            if specs is not None and specs!="":
+                specs_vector = request_embedding(specs)
+                if specs_vector is not None:
+                    Coll,_ = self.get_collection(SPECS_GRADE)
+                    search_list = search_embedding(Coll,embedding_index_name,[specs_vector],self.search_params,output_fields,limit=60)
+
+                    for _search in search_list:
+                        ots_id = _search.entity.get("ots_id")
+                        ots_name = _search.entity.get("ots_name")
+                        ots_parent_id = _search.entity.get("ots_parent_id")
+
+                        if is_similar(specs,ots_name):
+                            if check_specs(specs,ots_name):
+                                new_specs = ots_name
+
+                                # update the document_product_dict which is built for search
+                                if brand_ots_id is not None:
+                                    # check whether specs whose parent_id is brand_ots_id already exist; insert one if not, otherwise update the alias
+                                    specs_ots_id = get_document_product_dict_id(brand_ots_id,new_specs)
+
+                                    _d_specs = {DOCUMENT_PRODUCT_DICT_ID:specs_ots_id,
+                                                DOCUMENT_PRODUCT_DICT_NAME:new_specs,
+                                                DOCUMENT_PRODUCT_DICT_ALIAS:"%s&&%s"%(specs,new_specs),
+                                                DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
+                                                DOCUMENT_PRODUCT_DICT_STATUS:1,
+                                                DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
+                                                DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                                DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                                }
+                                    _dpd_specs = Document_product_dict(_d_specs)
+                                    if not _dpd_specs.exists_row(self.ots_client):
+                                        _dpd_specs.update_row(self.ots_client)
+                                    else:
+                                        #update alias
+                                        _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:specs_ots_id})
+                                        _flag = _dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_ALIAS],True)
+                                        if _flag:
+                                            if _dpd.updateAlias(specs):
+                                                _dpd.update_row(self.ots_client)
+                            else:
+                                new_specs = clean_product_specs(specs)
+                                # insert into document_product_dict a new record
+                                # update the document_product_dict which is built for search
+                                # add new specs
+                                if brand_ots_id is not None and name_ots_id is not None:
+                                    _md5 = get_document_product_dict_id(brand_ots_id,new_specs)
+                                    _d = {DOCUMENT_PRODUCT_DICT_ID:_md5,
+                                          DOCUMENT_PRODUCT_DICT_NAME:new_specs,
+                                          DOCUMENT_PRODUCT_DICT_ALIAS:"%s&&%s"%(specs,new_specs),
+                                          DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
+                                          DOCUMENT_PRODUCT_DICT_STATUS:1,
+                                          DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
+                                          DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                          DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                          }
+                                    _dpd = Document_product_dict(_d)
+                                    _dpd.update_row(self.ots_client)
+                            break
+
+                        else:
+                            # add new specs?
+                            pass
+
+        # judge if the product matches the standard product
+        if name_ots_id is not None:
+
+            # standardize the product and save it to the document_product table
+            _product = Document_product(tmp_dict)
+            docid = _product.getProperties().get(DOCUMENT_PRODUCT_DOCID)
+            unit_price = _product.getProperties().get(DOCUMENT_PRODUCT_UNIT_PRICE)
+            quantity = _product.getProperties().get(DOCUMENT_PRODUCT_QUANTITY)
+
+            unit_price = clean_product_unit_price(unit_price)
+            quantity = clean_product_quantity(quantity)
+
+            _product.setValue(DOCUMENT_PRODUCT_UNIT_PRICE,unit_price,True)
+            _product.setValue(DOCUMENT_PRODUCT_QUANTITY,quantity,True)
+            if isinstance(unit_price,(float,int)) and isinstance(quantity,(float,int)):
+                total_price = "%.2f"%(unit_price*quantity)
+                _product.setValue(DOCUMENT_PRODUCT_TOTAL_PRICE,total_price,True)
+
+            new_id = self.get_product_id(docid,new_name,new_brand,new_specs,unit_price,quantity)
+
+            _product.setValue(DOCUMENT_PRODUCT_ID,new_id,True)
+            _product.setValue(DOCUMENT_PRODUCT_ORIGINAL_ID,tmp_dict.get(DOCUMENT_PRODUCT_TMP_ID),True)
+
+            if name_ots_id is not None:
+                _product.setValue(DOCUMENT_PRODUCT_DICT_NAME_ID,name_ots_id,True)
+            if brand_ots_id is not None:
+                _product.setValue(DOCUMENT_PRODUCT_DICT_BRAND_ID,brand_ots_id,True)
+            if specs_ots_id is not None:
+                _product.setValue(DOCUMENT_PRODUCT_DICT_SPECS_ID,specs_ots_id,True)
+
+            _product.setValue(DOCUMENT_PRODUCT_NAME,new_name,True)
+            _product.setValue(DOCUMENT_PRODUCT_BRAND,new_brand,True)
+            _product.setValue(DOCUMENT_PRODUCT_SPECS,new_specs,True)
+
+
+            _product.setValue(DOCUMENT_PRODUCT_BRANDSPECS,"%s&&%s"%(new_brand,new_specs),True)
+            _product.setValue(DOCUMENT_PRODUCT_FULL_NAME,"%s&&%s&&%s"%(new_name,new_brand,new_specs),True)
+
+
+            if self.dumplicate(_product):
+                _status = randint(201,301)
+                save_product_tmp.setValue(DOCUMENT_PRODUCT_TMP_NEW_ID,new_id,True)
+
+                _product.update_row(self.ots_client)
+
+            else:
+                _status = randint(451,500)
+
+        else:
+            _status = randint(401,450)
+
+        save_product_tmp.setValue(DOCUMENT_PRODUCT_TMP_STATUS,_status,True)
+        save_product_tmp.update_row(self.ots_client)
+
+    def get_value_count(self,name,brand,specs,unit_price,quantity):
+
+        value_count = 0
+        if len(name)>0:
+            value_count += 1
+        if len(brand)>0:
+            value_count += 1
+        if len(specs)>0:
+            value_count += 1
+        if isinstance(unit_price,(float,int)) and unit_price>0:
+            value_count += 1
+        if isinstance(quantity,(float,int)) and quantity>0:
+            value_count += 1
+        return value_count
+
+    def dumplicate_search_product(self,document_product):
+
+        docid = document_product.getProperties().get(DOCUMENT_PRODUCT_DOCID)
+        name = document_product.getProperties().get(DOCUMENT_PRODUCT_NAME)
+        brand = document_product.getProperties().get(DOCUMENT_PRODUCT_BRAND,"")
+        specs = document_product.getProperties().get(DOCUMENT_PRODUCT_SPECS,"")
+        unit_price = document_product.getProperties().get(DOCUMENT_PRODUCT_UNIT_PRICE,"")
+        quantity = document_product.getProperties().get(DOCUMENT_PRODUCT_QUANTITY,"")
+        page_time = document_product.getProperties().get(DOCUMENT_PRODUCT_PAGE_TIME)
+        tenderee = document_product.getProperties().get(DOCUMENT_PRODUCT_TENDEREE,"")
+        supplier = document_product.getProperties().get(DOCUMENT_PRODUCT_SUPPLIER,"")
+
+        page_time_before = page_time
+        page_time_after = page_time
+        try:
+            page_time_before = timeAdd(page_time,-30)
+            page_time_after = timeAdd(page_time,30)
+        except Exception as e:
+            pass
+
+        if len(name)>0 and len(brand)>0 and len(specs)>0 and isinstance(unit_price,(float,int)) and isinstance(quantity,(float,int)):
+            bool_query = BoolQuery(must_queries=[TermQuery("name",name),
+                                                 RangeQuery("page_time",page_time_before,page_time_after,True,True),
+                                                 TermQuery("brand",brand),
+                                                 TermQuery("specs",specs),
+                                                 TermQuery("unit_price",unit_price),
+                                                 TermQuery("quantity",quantity)
+                                                 ])
+
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
+                                                                                SearchQuery(bool_query,limit=1),
+                                                                                columns_to_get=ColumnsToGet(["name",'brand','specs'],return_type=ColumnReturnType.SPECIFIED))
+            list_data = getRow_ots(rows)
+            if len(list_data)>0:
+                return list_data[0].get(DOCUMENT_PRODUCT_ID),1
+
+        if len(name)>0 and len(brand)>0 and len(supplier)>0 and len(tenderee)>0:
+            bool_query = BoolQuery(must_queries=[TermQuery("name",name),
+                                                 RangeQuery("page_time",page_time_before,page_time_after,True,True),
+                                                 TermQuery("brand",brand),
+                                                 TermQuery("tenderee",tenderee),
+                                                 TermQuery("supplier",supplier),
+                                                 ])
+
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
+                                                                                SearchQuery(bool_query,limit=50),
+                                                                                columns_to_get=ColumnsToGet(["name",'brand','specs','unit_price','quantity'],return_type=ColumnReturnType.SPECIFIED))
+            list_data = getRow_ots(rows)
+            value_count = self.get_value_count(name,brand,specs,unit_price,quantity)
+
+            for _d in list_data:
+                s_id = _d.get(DOCUMENT_PRODUCT_ID)
+                s_name = _d.get(DOCUMENT_PRODUCT_NAME,"")
+                s_brand = _d.get(DOCUMENT_PRODUCT_BRAND,"")
+                s_specs = _d.get(DOCUMENT_PRODUCT_SPECS,"")
+                s_unit_price = _d.get(DOCUMENT_PRODUCT_UNIT_PRICE,"")
+                s_quantity = _d.get(DOCUMENT_PRODUCT_QUANTITY,"")
+                check_flag = True
+                value_count1 = self.get_value_count(s_name,s_brand,s_specs,s_unit_price,s_quantity)
+                if len(specs)>0 and len(s_specs)>0 and specs!=s_specs:
+                    check_flag = False
+                elif isinstance(unit_price,(float,int)) and isinstance(s_unit_price,(float,int)) and unit_price!=s_unit_price:
+                    check_flag = False
+                elif isinstance(quantity,(float,int)) and isinstance(s_quantity,(float,int)) and quantity!=s_quantity:
+                    check_flag = False
+
+                if check_flag:
+                    if value_count<value_count1:
+                        to_save = 0
+                    else:
+                        to_save = 1
+                    return s_id,to_save
+        return None,1
+
+
+
+    def dumplicate(self,document_product):
+        '''
+        Deduplicate the product data:
+        remove duplicate procurement-result publications of the same product, combined with the announcement itself.
+        :return: True if not repeated else False
+        '''
+
+        dump_id,to_save = self.dumplicate_search_product(document_product)
+
+        if dump_id is not None:
+            document_product.setValue(DOCUMENT_PRODUCT_DUMP_ID,dump_id,True)
+
+        if to_save==1:
+            if dump_id is not None:
+                _d = {DOCUMENT_PRODUCT_ID:dump_id}
+                _dp = Document_product(_d)
+                _dp.delete_row(self.ots_client)
+            return True
+        else:
+            return False
+
+
+
+
+if __name__ == '__main__':
+
+    a = str(1) + "CT设备" + "西门子" + str(5.5) + str(10)
+    print(a,getMD5(a))
+    a = str(1) + "CT设备" + "" + str(5.5) + str(10)
+    print(a,getMD5(a))
+    a = str(1) + "CT设备" + "" + "" + str(10)
+    print(a,getMD5(a))
+    a = str(1) + "CT设备" + "" + "" + ""
+    print(a,getMD5(a))
+    a = str(1) + "CT设备" + "" + str(5.5) + ""
+    print(a,getMD5(a))
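
A worked sketch of the field-count comparison that dumplicate_search_product performs when two records match loosely: the record with more filled values wins. The records below are illustrative, and constructing Product_Manager connects to OTS and Milvus:

    from BaseDataMaintenance.maintenance.product.products import Product_Manager

    pm = Product_Manager()
    # temp record:     name + brand + quantity filled         -> 3
    value_count  = pm.get_value_count("CT设备", "西门子", "", "", 10)
    # existing record: name + brand + specs + price filled    -> 4
    value_count1 = pm.get_value_count("CT设备", "西门子", "F6", 5.5, "")
    to_save = 0 if value_count < value_count1 else 1          # 0: drop the temp record, 1: keep it and delete the matched row
    print(value_count, value_count1, to_save)                 # expected: 3 4 0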

+ 56 - 0
BaseDataMaintenance/model/ots/document_product.py

@@ -0,0 +1,56 @@
+
+from BaseDataMaintenance.model.ots.BaseModel import BaseModel
+
+
+DOCUMENT_PRODUCT_ID = 'id'
+DOCUMENT_PRODUCT_NAME = 'name'
+DOCUMENT_PRODUCT_BRAND = 'brand'
+DOCUMENT_PRODUCT_SPECS = 'specs'
+DOCUMENT_PRODUCT_BRANDSPECS = 'brandSpecs'
+DOCUMENT_PRODUCT_FULL_NAME = 'full_name'
+DOCUMENT_PRODUCT_UNIT_PRICE = 'unit_price'
+DOCUMENT_PRODUCT_QUANTITY = 'quantity'
+DOCUMENT_PRODUCT_QUANTITY_UNIT = 'quantity_unit'
+DOCUMENT_PRODUCT_TOTAL_PRICE = 'total_price'
+DOCUMENT_PRODUCT_SUPPLIER = 'supplier'
+DOCUMENT_PRODUCT_PARAMETER = 'parameter'
+DOCUMENT_PRODUCT_CATEGORY = 'category'
+DOCUMENT_PRODUCT_DOCID = "docid"
+DOCUMENT_PRODUCT_DOCTITLE = "doctitle"
+DOCUMENT_PRODUCT_PROJECT_NAME = 'project_name'
+DOCUMENT_PRODUCT_PROJECT_CODE = 'project_code'
+DOCUMENT_PRODUCT_TENDEREE = 'tenderee'
+DOCUMENT_PRODUCT_TENDEREE_PHONE = 'tenderee_phone'
+DOCUMENT_PRODUCT_TENDEREE_CONTACT = 'tenderee_contact'
+DOCUMENT_PRODUCT_PROCUREMENT_SYSTEM = 'procurement_system'
+DOCUMENT_PRODUCT_BIDDING_BUDGET = 'bidding_budget'
+DOCUMENT_PRODUCT_WIN_TENDERER = 'win_tenderer'
+DOCUMENT_PRODUCT_PROVINCE = 'province'
+DOCUMENT_PRODUCT_CITY = 'city'
+DOCUMENT_PRODUCT_DISTRICT = 'district'
+DOCUMENT_PRODUCT_PAGE_TIME = 'page_time'
+DOCUMENT_PRODUCT_PAGE_TIME_YEAR = 'page_time_year'
+DOCUMENT_PRODUCT_INFO_TYPE = 'info_type'
+DOCUMENT_PRODUCT_INDUSTRY = 'industry'
+DOCUMENT_PRODUCT_STATUS = 'status'
+DOCUMENT_PRODUCT_CREATE_TIME = 'create_time'
+DOCUMENT_PRODUCT_UPDATE_TIME = 'update_time'
+
+DOCUMENT_PRODUCT_ORIGINAL_ID = 'original_id'
+DOCUMENT_PRODUCT_DICT_NAME_ID = 'dict_name_id'
+DOCUMENT_PRODUCT_DICT_BRAND_ID = "dict_brand_id"
+DOCUMENT_PRODUCT_DICT_SPECS_ID = "dict_specs_id"
+
+DOCUMENT_PRODUCT_DUMP_ID = "dump_id"
+
+class Document_product(BaseModel):
+
+    def initialize(self,dict):
+        BaseModel.__init__(self)
+        for k,v in dict.items():
+            self.setValue(k,v,True)
+
+        self.table_name = 'document_product'
+
+    def getPrimaryKey_turple(self):
+        return ['id']

+ 42 - 0
BaseDataMaintenance/model/ots/document_product_dict.py

@@ -0,0 +1,42 @@
+
+from BaseDataMaintenance.model.ots.BaseModel import BaseModel
+
+DOCUMENT_PRODUCT_DICT_ID = "id"
+DOCUMENT_PRODUCT_DICT_NAME = "name"
+DOCUMENT_PRODUCT_DICT_ALIAS = "alias"
+DOCUMENT_PRODUCT_DICT_GRADE = "grade"
+DOCUMENT_PRODUCT_DICT_STATUS = "status"
+DOCUMENT_PRODUCT_DICT_PARENT_ID = "parent_id"
+DOCUMENT_PRODUCT_DICT_CREATE_TIME = "create_time"
+DOCUMENT_PRODUCT_DICT_UPDATE_TIME = "update_time"
+DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED = "is_synchonized"
+
+MAX_NAME_LENGTH = 300
+
+class Document_product_dict(BaseModel):
+
+    def __init__(self,_dict):
+        BaseModel.__init__(self)
+        for k,v in _dict.items():
+            self.setValue(k,v,True)
+        self.table_name = "document_product_dict"
+
+    def getPrimary_keys(self):
+        return ["id"]
+
+
+
+    def updateAlias(self,name):
+        name = str(name).lower()
+        alias = self.getProperties().get(DOCUMENT_PRODUCT_DICT_ALIAS,"")
+        l_alias = alias.split("&&")
+        if name not in set(l_alias):
+            alias+="&&%s"%name
+            self.setValue(DOCUMENT_PRODUCT_DICT_ALIAS,alias,True)
+            return True
+        return False
+
+
+from BaseDataMaintenance.common.documentFingerprint import getMD5
+def get_document_product_dict_id(parent_md5,name):
+    return getMD5(parent_md5+"&&%s"%name)
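
A brief sketch of how the "&&"-separated alias list and the derived dictionary id behave; the id and parent md5 below are placeholders:

    from BaseDataMaintenance.model.ots.document_product_dict import (
        Document_product_dict, get_document_product_dict_id,
        DOCUMENT_PRODUCT_DICT_ID, DOCUMENT_PRODUCT_DICT_ALIAS)

    _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID: "demo-id",
                                  DOCUMENT_PRODUCT_DICT_ALIAS: "ct设备"})
    print(_dpd.updateAlias("CT设备"))     # False: the lowercased name is already in the alias list
    print(_dpd.updateAlias("西门子CT"))   # True: alias becomes "ct设备&&西门子ct"
    print(get_document_product_dict_id("0" * 32, "西门子"))   # md5 of "<parent_md5>&&西门子"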

+ 51 - 0
BaseDataMaintenance/model/ots/document_product_tmp.py

@@ -0,0 +1,51 @@
+
+from BaseDataMaintenance.model.ots.BaseModel import BaseModel
+
+
+DOCUMENT_PRODUCT_TMP_ID = 'id'
+DOCUMENT_PRODUCT_TMP_NAME = 'name'
+DOCUMENT_PRODUCT_TMP_BRAND = 'brand'
+DOCUMENT_PRODUCT_TMP_SPECS = 'specs'
+DOCUMENT_PRODUCT_TMP_BRANDSPECS = 'brandSpecs'
+DOCUMENT_PRODUCT_TMP_FULL_NAME = 'full_name'
+DOCUMENT_PRODUCT_TMP_UNIT_PRICE = 'unit_price'
+DOCUMENT_PRODUCT_TMP_QUANTITY = 'quantity'
+DOCUMENT_PRODUCT_TMP_QUANTITY_UNIT = 'quantity_unit'
+DOCUMENT_PRODUCT_TMP_TOTAL_PRICE = 'total_price'
+DOCUMENT_PRODUCT_TMP_SUPPLIER = 'supplier'
+DOCUMENT_PRODUCT_TMP_PARAMETER = 'parameter'
+DOCUMENT_PRODUCT_TMP_CATEGORY = 'category'
+DOCUMENT_PRODUCT_TMP_DOCID = "docid"
+DOCUMENT_PRODUCT_TMP_DOCTITLE = "doctitle"
+DOCUMENT_PRODUCT_TMP_PROJECT_NAME = 'project_name'
+DOCUMENT_PRODUCT_TMP_PROJECT_CODE = 'project_code'
+DOCUMENT_PRODUCT_TMP_TENDEREE = 'tenderee'
+DOCUMENT_PRODUCT_TMP_TENDEREE_PHONE = 'tenderee_phone'
+DOCUMENT_PRODUCT_TMP_TENDEREE_CONTACT = 'tenderee_contact'
+DOCUMENT_PRODUCT_TMP_PROCUREMENT_SYSTEM = 'procurement_system'
+DOCUMENT_PRODUCT_TMP_BIDDING_BUDGET = 'bidding_budget'
+DOCUMENT_PRODUCT_TMP_WIN_TENDERER = 'win_tenderer'
+DOCUMENT_PRODUCT_TMP_PROVINCE = 'province'
+DOCUMENT_PRODUCT_TMP_CITY = 'city'
+DOCUMENT_PRODUCT_TMP_DISTRICT = 'district'
+DOCUMENT_PRODUCT_TMP_PAGE_TIME = 'page_time'
+DOCUMENT_PRODUCT_TMP_PAGE_TIME_YEAR = 'page_time_year'
+DOCUMENT_PRODUCT_TMP_INFO_TYPE = 'info_type'
+DOCUMENT_PRODUCT_TMP_INDUSTRY = 'industry'
+DOCUMENT_PRODUCT_TMP_STATUS = 'status'
+DOCUMENT_PRODUCT_TMP_CREATE_TIME = 'create_time'
+DOCUMENT_PRODUCT_TMP_UPDATE_TIME = 'update_time'
+
+DOCUMENT_PRODUCT_TMP_NEW_ID = "new_id"
+
+
+class Document_product_tmp(BaseModel):
+
+    def initialize(self,dict):
+        BaseModel.__init__(self)
+        for k,v in dict.items():
+            self.setValue(k,v,True)
+        self.table_name = 'document_product_temp'
+
+    def getPrimaryKey_turple(self):
+        return ['id']

+ 0 - 0
BaseDataMaintenance/services/__init__.py


+ 26 - 0
BaseDataMaintenance/services/embedding_services.py

@@ -0,0 +1,26 @@
+
+
+from flask import Flask,request,jsonify
+app = Flask(__name__)
+
+from BaseDataMaintenance.common.sentencesUtil import *
+
+@app.route("/embedding",methods=["POST"])
+def embedding():
+    _r = {"success": True}
+    try:
+        sentence = request.json.get("sentence","")
+        vector = get_normalized_vector(sentence)
+        _r["vector"] = vector
+    except Exception as e:
+        _r["success"] = False
+
+
+    return jsonify(_r)
+
+## How to start
+## Place this file at the same level as the BaseDataMaintenance package
+## nohup /data/anaconda3/envs/py37/bin/gunicorn -w 1 --limit-request-fields 0 --limit-request-line 0 -t 1000 --keep-alive 600 -b 0.0.0.0:17130 embedding_services:app >> embedding.log &
+
+if __name__ == "__main__":
+    app.run(host="0.0.0.0",port=15010,debug=True)

+ 23 - 0
BaseDataMaintenance/services/test_embedding.py

@@ -0,0 +1,23 @@
+
+
+import requests
+
+import time
+import json
+session = requests.Session()
+def post_request():
+
+    for _ in range(100):
+        start_time = time.time()
+        _json = {"test":1,"sentence":"testtesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttest"}
+        resp = session.post("http://localhost:17130/embedding", json=_json)
+
+        if resp.status_code==200:
+            content = resp.content.decode("utf-8")
+            _d = json.loads(content)
+            if _d.get("success"):
+                print(resp.status_code,_d.get("success"),time.time()-start_time)
+
+
+if __name__ == '__main__':
+    post_request()

+ 13 - 0
BaseDataMaintenance/start_main.py

@@ -12,6 +12,10 @@ def main(args=None):
     parser.add_argument("--delkey",dest="deleteEnterpriseKey",action="store_true",help="start attachmentAttachment process")
     parser.add_argument("--keys",dest="keys",type=str,default=None,help="start attachmentAttachment process")
     parser.add_argument("--rptc",dest="remove_processed_tyc_company",action="store_true",help="start attachmentAttachment process")
+    parser.add_argument("--product_dict_synchonize",dest="product_dict_synchonize",action="store_true",help="start product_dict_synchonize process")
+    parser.add_argument("--delete_product_collections",dest="delete_product_collections",action="store_true",help="start product_dict_synchonize process")
+    parser.add_argument("--search_similar",dest="search_similar",action="store_true",help="start product_dict_synchonize process")
+
     args = parser.parse_args(args)
     if args.attachAttachment:
         from BaseDataMaintenance.maintenance.document.attachAttachment import start_attachAttachment
@@ -27,6 +31,15 @@ def main(args=None):
     if args.remove_processed_tyc_company:
         from BaseDataMaintenance.maintenance.tyc_company.remove_processed import start_remove_processed_tyc_company
         start_remove_processed_tyc_company()
+    if args.product_dict_synchonize:
+        from BaseDataMaintenance.maintenance.product.product_dict import start_embedding_product_dict
+        start_embedding_product_dict()
+    if args.delete_product_collections:
+        from BaseDataMaintenance.maintenance.product.product_dict import drop_product_dict_collections
+        drop_product_dict_collections()
+    if args.search_similar:
+        from BaseDataMaintenance.maintenance.product.product_dict import search_similar
+        search_similar()
 
 
 if __name__ == '__main__':
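
With these flags in place the new product jobs can be launched from start_main.py; assuming the py37 environment referenced elsewhere in this commit and module-style invocation from the repository root, the calls might look like:

    # synchronize the product dictionary into the Milvus embedding collections
    /data/anaconda3/envs/py37/bin/python -m BaseDataMaintenance.start_main --product_dict_synchonize
    # drop the three product embedding collections
    /data/anaconda3/envs/py37/bin/python -m BaseDataMaintenance.start_main --delete_product_collections
    # export candidate similar dictionary entries to search_similar1.xlsx
    /data/anaconda3/envs/py37/bin/python -m BaseDataMaintenance.start_main --search_similar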

+ 20 - 0
BaseDataMaintenance/test.py

@@ -0,0 +1,20 @@
+
+
+from BaseDataMaintenance.common.Utils import load,article_limit
+from bs4 import BeautifulSoup
+import re
+filename = "329546490.pk"
+
+item = load(filename)
+
+_dochtmlcon = item.get("dochtmlcon","")
+
+
+_dochtmlcon = re.sub("<html>|</html>|<body>|</body>", "", _dochtmlcon)
+_soup = BeautifulSoup(_dochtmlcon,"lxml")
+_soup = article_limit(_soup,200000)
+_dochtmlcon = str(_soup)
+
+print("done",len(_dochtmlcon))
+
+

+ 24 - 0
BaseDataMaintenance/test/test_detail_link.py

@@ -0,0 +1,24 @@
+
+
+import requests
+
+
+header={
+    "Accept": "text/html, application/xhtml+xml, image/jxr, */*",
+    "Referer": "http://uia.hnist.cn/sso/login?service=http%3A%2F%2Fportal.hnist.\
+                cn%2Fuser%2FsimpleSSOLogin",
+    "Accept-Language": "zh-Hans-CN,zh-Hans;q=0.8,en-US;q=0.5,en;q=0.3",
+    "Content-Type": "application/x-www-form-urlencoded",
+    "Connection": "Keep-Alive",
+    "User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) \
+     AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.162 Safari/537.36",
+    #"Accept-Encoding": "gzip, deflate",
+    "Origin": "http://uia.hnist.cn",
+    "Upgrade-Insecure-Requests": "1",
+}
+
+url = "http://www.gxzhzb.com/NewsView.aspx?R=3ba60a67-23e6-449f-b089-1272f6191bbe&TypeKeyID=e8b40a16-2a09-4a03-9d04-9352bb9b4e9c&NewsKeyID=5a17d526-7fd7-4244-ba23-9d6c7d633c68"
+resp = requests.get(url,headers=header)
+
+print(resp.status_code)
+print(resp.content.decode('utf-8'))