Sfoglia il codice sorgente

将匹配中向量生成和搜索用redis缓存

luojiehua 1 anno fa
parent
commit
2a03d0b6f7

+ 2 - 2
BaseDataMaintenance/common/milvusUtil.py

@@ -2,7 +2,6 @@ import traceback
 
 from pymilvus import connections,utility,FieldSchema,CollectionSchema,DataType,Collection
 
-
 def init_milvus(host):
     connections.connect("default",host=host,port=19530)
 
@@ -36,10 +35,11 @@ def insert_embedding(coll,entities):
 
 def search_embedding(coll,index_name,vector,search_params,output_fields,limit=3):
     list_result = []
-    result = coll.search(vector,index_name,search_params,output_fields=output_fields,limit=limit)
+    result = coll.search(vector,index_name,search_params,top_k=limit,output_fields=output_fields,limit=limit)
     for hits in result:
         for hit in hits:
             list_result.append(hit)
+
     return list_result
 
 if __name__ == '__main__':

+ 5 - 0
BaseDataMaintenance/dataSource/source.py

@@ -153,6 +153,11 @@ def getConnect_redis_doc():
                            db=7,password=REDIS_PASS)
     return db
 
def getConnect_redis_product():
    """Return a StrictRedis client on db 8 (product match/embedding cache)."""
    return redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT,
                             db=8, password=REDIS_PASS)
+
 if __name__=="__main__":
     # solrQuery("document",{"q":"*:*"})
     # getConnect_mongodb()

+ 11 - 1
BaseDataMaintenance/maintenance/3.py

@@ -1,4 +1,14 @@
 
 objectPath = "a//b/c"
 a = str(objectPath).replace("//","/")
-print(a)
+print(a)
+
+import pyautogui
+
+import time
+
+start_time =  time.time()
+for _ in range(10000):
+    pyautogui.size()
+print(time.time()-start_time)
+print(pyautogui.size())

+ 1 - 1
BaseDataMaintenance/maintenance/dataflow.py

@@ -2842,7 +2842,7 @@ class Dataflow_dumplicate(Dataflow):
 
     def flow_dumpcate_comsumer(self):
         from multiprocessing import Process
-        process_count = 2
+        process_count = 3
         thread_count = 30
         list_process = []
         def start_thread():

+ 0 - 1
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -710,7 +710,6 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
             log("getExtract_json_fromRedis error %s"%(str(e)))
         finally:
             try:
-
                 if db.connection.check_health():
                     self.pool_redis_doc.putConnector(db)
             except Exception as e:

+ 82 - 0
BaseDataMaintenance/maintenance/product/productUtils.py

@@ -5,6 +5,88 @@ from BaseDataMaintenance.maintenance.product.product_setting import *
 import Levenshtein
 import re
 # 判断是不是入参字符串为全中文
+
+from BaseDataMaintenance.dataSource.source import getConnect_redis_product
+from BaseDataMaintenance.dataSource.pool import ConnectorPool
+from BaseDataMaintenance.common.Utils import log
+from BaseDataMaintenance.common.documentFingerprint import getMD5
+from BaseDataMaintenance.common.milvusUtil import search_embedding
+
+pool_product = ConnectorPool(10,30,getConnect_redis_product)
+
+
def get_milvus_search(coll, index_name, name, vector, search_params, output_fields, limit=3):
    """Search Milvus with a redis cache in front, keyed by md5(name)+"_milvus".

    Results are cached for 2 minutes as a JSON list of plain dicts (one per
    hit, restricted to ``output_fields``), so cache hits and misses both
    return JSON-serializable data instead of pymilvus Hit objects.

    :param coll: pymilvus Collection to search.
    :param index_name: vector field name (anns_field) to search on.
    :param name: source of the cache key; None/"" short-circuits to None.
    :param vector: list of query vectors.
    :param search_params: Milvus search parameter dict.
    :param output_fields: entity fields kept in each result dict.
    :param limit: maximum hits per query vector.
    :return: list of dicts, or None when ``name`` is empty.
    :raises RuntimeError: when the cache lookup or the search fails.
    """
    if name is None or name == "":
        return None
    db = pool_product.getConnector()
    try:
        _md5 = getMD5(str(name)) + "_milvus"
        _search_list = db.get(_md5)
        if _search_list is not None:
            # cache hit: stored value is the JSON list built below
            return json.loads(_search_list)
        list_result = []
        # `top_k` is not a Collection.search parameter; `limit` already caps results
        result = coll.search(vector, index_name, search_params,
                             output_fields=output_fields, limit=limit)
        for hits in result:
            for hit in hits:
                list_result.append(hit)
        final_list = []
        for _search in list_result:
            _d = {}
            for k in output_fields:
                _d[k] = _search.entity.get(k)
            final_list.append(_d)
        db.set(_md5, json.dumps(final_list))
        db.expire(_md5, 2 * 60)
        return final_list
    except Exception as e:
        # was a copy-pasted "getExtract_json_fromRedis error" message
        log("get_milvus_search error %s" % (str(e)))
        raise RuntimeError("get milvus search error")
    finally:
        try:
            # hand the redis connector back to the pool only if it is healthy
            if db.connection.check_health():
                pool_product.putConnector(db)
        except Exception as e:
            pass
+
def get_embedding_request(sentence, retry_times=3):
    """Fetch the embedding for ``sentence``, caching the result in redis.

    Cache key is md5(sentence)+"_embedding"; the embedding is stored as JSON.
    NOTE(review): unlike get_milvus_search, entries get no expiry — presumably
    intentional since embeddings are deterministic; confirm with the author.

    :param sentence: text to embed; None/"" short-circuits to None.
    :param retry_times: retry count forwarded to request_embedding.
    :return: the embedding (as decoded from JSON) or None.
    :raises RuntimeError: when the cache lookup or the embedding call fails.
    """
    if sentence is None or sentence == "":
        return None
    db = pool_product.getConnector()
    try:
        _md5 = getMD5(str(sentence)) + "_embedding"
        _embedding = db.get(_md5)
        if _embedding is not None:
            return json.loads(_embedding)
        _embedding = request_embedding(sentence, retry_times=retry_times)
        if _embedding is not None:
            db.set(_md5, json.dumps(_embedding))
        return _embedding
    except Exception as e:
        # was a copy-pasted "getExtract_json_fromRedis error" message
        log("get_embedding_request error %s" % (str(e)))
        raise RuntimeError("get embedding request error")
    finally:
        try:
            if db.connection.check_health():
                pool_product.putConnector(db)
        except Exception as e:
            pass
+
+
+
+
 def judge_pur_chinese(keyword):
     """
     中文字符的编码范围为: u'\u4e00' -- u'\u9fff:只要在此范围内就可以判断为中文字符串

+ 67 - 19
BaseDataMaintenance/maintenance/product/product_dict.py

@@ -362,7 +362,10 @@ class Product_Dict_Manager():
 
         #update document_product_dict
         if original_id is None or original_id=="":
-            original_id = get_document_product_dict_id(parent_id,name)
+            if parent_id is None:
+                original_id = get_document_product_dict_id("",name)
+            else:
+                original_id = get_document_product_dict_id(parent_id,name)
         if parent_id is not None and parent_id!="":
             _d = {DOCUMENT_PRODUCT_DICT_ID:original_id,
                   DOCUMENT_PRODUCT_DICT_ALIAS:alias,
@@ -434,7 +437,7 @@ class Product_Dict_Manager():
             return
         _d = {DOCUMENT_PRODUCT_DICT_ID:original_id}
         dpd = Document_product_dict(_d)
-        if not dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_ALIAS,DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS,DOCUMENT_PRODUCT_DICT_PARENT_ID],True):
+        if not dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_ALIAS,DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS,DOCUMENT_PRODUCT_DICT_PARENT_ID,DOCUMENT_PRODUCT_DICT_CREATE_TIME,DOCUMENT_PRODUCT_DICT_UPDATE_TIME,DOCUMENT_PRODUCT_DICT_STATUS],True):
             return
 
         if parent_id is None or parent_id=="":
@@ -524,6 +527,7 @@ class Product_Dict_Manager():
         # update document_product_dict
         _d = {DOCUMENT_PRODUCT_DICT_ID:new_id,
               DOCUMENT_PRODUCT_DICT_NAME:name,
+              DOCUMENT_PRODUCT_DICT_GRADE:grade,
               DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
               DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS:standard_alias}
 
@@ -531,6 +535,12 @@ class Product_Dict_Manager():
             _d[DOCUMENT_PRODUCT_DICT_ALIAS] = alias
         if parent_id is not None and parent_id!="":
             _d[DOCUMENT_PRODUCT_DICT_PARENT_ID] = parent_id
+        if old_name!=name:
+            _d[DOCUMENT_PRODUCT_DICT_CREATE_TIME] = dpd.getProperties().get(DOCUMENT_PRODUCT_DICT_CREATE_TIME)
+            _d[DOCUMENT_PRODUCT_DICT_UPDATE_TIME] = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
+            _d[DOCUMENT_PRODUCT_DICT_STATUS] = dpd.getProperties().get(DOCUMENT_PRODUCT_DICT_STATUS)
+
+
 
         dpd = Document_product_dict(_d)
         dpd.update_row(self.ots_client)
@@ -539,30 +549,41 @@ class Product_Dict_Manager():
             # in the case of name changed ,delete the old name row
             _d = {DOCUMENT_PRODUCT_DICT_ID:original_id}
             dpd = Document_product_dict(_d)
-            dpd.delete_row(self.ots_client)
 
+            self.recurse_update_dict(original_id,new_id)
+
+            dpd.delete_row(self.ots_client)
             # change the next level parent_id
 
-            bool_query = BoolQuery(must_queries=[
-                TermQuery(DOCUMENT_PRODUCT_DICT_PARENT_ID,original_id)
-            ])
-            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
-                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_PARENT_ID)]),limit=100,get_total_count=True),
-                                                                                columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
 
-            list_data = getRow_ots(rows)
-            while next_token:
-                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
-                                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
-                                                                                    columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+    def recurse_update_dict(self,parent_id,new_parent_id):
+        bool_query = BoolQuery(must_queries=[
+            TermQuery(DOCUMENT_PRODUCT_DICT_PARENT_ID,parent_id)
+        ])
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict","document_product_dict_index",
+                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_PARENT_ID)]),limit=100,get_total_count=True),
+                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
 
-                list_data.extend(getRow_ots(rows))
-            for _data in list_data:
-                dpd = Document_product_dict(_data)
-                dpd.setValue(DOCUMENT_PRODUCT_DICT_PARENT_ID,new_id,True)
-                dpd.update_row(self.ots_client)
+        list_data = getRow_ots(rows)
+        while next_token:
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict","document_product_dict_index",
+                                                                                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                                columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
 
+            list_data.extend(getRow_ots(rows))
+        for _data in list_data:
+            dpd = Document_product_dict(_data)
+            old_id = dpd.getProperties().get(DOCUMENT_PRODUCT_DICT_ID)
+            new_id = get_document_product_dict_id(new_parent_id,dpd.getProperties().get(DOCUMENT_PRODUCT_DICT_NAME))
+
+            self.recurse_update_dict(old_id,new_id)
+
+            dpd.setValue(DOCUMENT_PRODUCT_DICT_PARENT_ID,new_parent_id,True)
+            dpd.setValue(DOCUMENT_PRODUCT_DICT_ID,new_id,True)
+            dpd.update_row(self.ots_client)
 
+            dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:old_id})
+            dpd.delete_row(self.ots_client)
 
     def act_delete(self,name,alias,grade,original_id,parent_id,standard_alias,create_time):
         #search records which name=name and grade=grade
@@ -597,10 +618,37 @@ class Product_Dict_Manager():
         log("delete document_product_dict name:%s grade:%s count:%s"%(str(name),str(grade),str(len(list_data))))
         for _data in list_data:
             id = _data.get(DOCUMENT_PRODUCT_DICT_ID)
+
+            self.recurse_delete_dict(id)
+
             _d = {DOCUMENT_PRODUCT_DICT_ID:id}
             dpd = Document_product_dict(_d)
             dpd.delete_row(self.ots_client)
 
+
+
+    def recurse_delete_dict(self,id):
+        bool_query = BoolQuery(must_queries=[
+            TermQuery(DOCUMENT_PRODUCT_DICT_PARENT_ID,id)
+        ])
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict","document_product_dict_index",
+                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_PARENT_ID)]),limit=100,get_total_count=True),
+                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+
+        list_data = getRow_ots(rows)
+        while next_token:
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict","document_product_dict_index",
+                                                                                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                                columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+
+            list_data.extend(getRow_ots(rows))
+        for _data in list_data:
+            _id = _data.get(DOCUMENT_PRODUCT_DICT_ID)
+            self.recurse_delete_dict(_id)
+            dpd = Document_product_dict(_data)
+            dpd.delete_row(self.ots_client)
+
+
     def embedding_interface_producer(self):
 
         bool_query = BoolQuery(must_queries=[

+ 128 - 77
BaseDataMaintenance/maintenance/product/products.py

@@ -113,53 +113,6 @@ class Product_Manager(Product_Dict_Manager):
         self.standardize(item)
 
 
-    def match_specs(self,specs):
-        bool_query = BoolQuery(must_queries=[
-            TermQuery(DOCUMENT_PRODUCT_DICT_NAME,specs),
-            TermQuery(DOCUMENT_PRODUCT_DICT_GRADE,SPECS_GRADE)
-        ])
-        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict","document_product_dict_index",
-                                                                            SearchQuery(bool_query,get_total_count=True))
-        if total_count>0:
-            new_specs = specs
-            return new_specs
-        else:
-            debug("getting sepcs %s"%(specs))
-            list_specs = []
-            c_specs = clean_product_specs(specs)
-            list_specs.append(c_specs)
-
-            for s in re.split("[\u4e00-\u9fff]",specs):
-                if s!="" and len(s)>4:
-                    list_specs.append(s)
-            similar_flag = None
-            _index = 0
-            break_flag = False
-            for c_specs in list_specs:
-                if break_flag:
-                    break
-                _index += 1
-                specs_vector = request_embedding(c_specs)
-
-                if specs_vector is not None:
-                    Coll,_ = self.get_collection(SPECS_GRADE)
-                    search_list = search_embedding(Coll,embedding_index_name,[specs_vector],self.search_params,output_fields,limit=60)
-
-                    for _search in search_list:
-
-                        ots_id = _search.entity.get("standard_name_id")
-                        ots_name = _search.entity.get("standard_name")
-                        ots_parent_id = _search.entity.get("ots_parent_id")
-
-                        debug("checking specs %s and %s"%(specs,ots_name))
-                        if is_similar(specs,ots_name):
-                            # log("specs is_similar")
-                            if check_specs(c_specs,ots_name):
-                                break_flag = True
-                                new_specs = ots_name
-                                return new_specs
-
-
 
     def standardize(self,tmp_dict,output_fields = ['ots_id','ots_name',"ots_parent_id","standard_name","standard_name_id"]):
         '''
@@ -229,7 +182,7 @@ class Product_Manager(Product_Dict_Manager):
                 name_vector = request_embedding(name)
                 if name_vector is not None:
                     Coll,_ = self.get_collection(NAME_GRADE)
-                    search_list = search_embedding(Coll,embedding_index_name,[name_vector],self.search_params,output_fields,limit=60)
+                    search_list = search_embedding(Coll,embedding_index_name,[name_vector],self.search_params,output_fields,limit=20)
 
                     for _search in search_list:
                         ots_id = _search.entity.get("standard_name_id")
@@ -352,24 +305,33 @@ class Product_Manager(Product_Dict_Manager):
                     if _find:
                         break
                     l_brand = [brand]
-                    l_brand.append(clean_product_brand(s_brand))
+                    l_brand.append(clean_product_brand(brand))
                     brand_ch = get_chinese_string(brand)
                     l_brand.extend(brand_ch)
 
                     for brand in l_brand:
                         if _find:
                             break
-                        brand_vector = request_embedding(brand)
+                        start_time = time.time()
+                        # brand_vector = request_embedding(brand)
+                        brand_vector = get_embedding_request(brand)
+                        log("get embedding for brand %s takes %.4fs"%(brand,time.time()-start_time))
                         if brand_vector is not None:
                             Coll,_ = self.get_collection(BRAND_GRADE)
-                            search_list = search_embedding(Coll,embedding_index_name,[brand_vector],self.search_params,output_fields,limit=60)
-
+                            start_time = time.time()
+                            # search_list = search_embedding(Coll,embedding_index_name,[brand_vector],self.search_params,output_fields,limit=10)
+                            search_list = get_milvus_search(Coll,embedding_index_name,brand,[brand_vector],self.search_params,output_fields,limit=10)
+                            log("get search_list for brand %s takes %.4fs"%(brand,time.time()-start_time))
                             # log("search brand %s"%(brand))
                             for _search in search_list:
 
-                                ots_id = _search.entity.get("standard_name_id")
-                                ots_name = _search.entity.get("standard_name")
-                                ots_parent_id = _search.entity.get("ots_parent_id")
+
+                                # ots_id = _search.entity.get("standard_name_id")
+                                # ots_name = _search.entity.get("standard_name")
+                                # ots_parent_id = _search.entity.get("ots_parent_id")
+                                ots_id = _search.get("standard_name_id")
+                                ots_name = _search.get("standard_name")
+                                ots_parent_id = _search.get("ots_parent_id")
 
                                 # log("check brand %s and %s"%(brand,ots_name))
                                 if is_similar(brand,ots_name,_radio=95) or check_brand(brand,ots_name):
@@ -419,6 +381,26 @@ class Product_Manager(Product_Dict_Manager):
                                                                                     SearchQuery(bool_query,get_total_count=True))
                 if total_count>0:
                     new_specs = specs
+
+                    if brand_ots_id is not None:
+                        # judge if the specs which parent_id is brand_ots_id exists,insert one if not exists else update alias
+                        specs_ots_id = get_document_product_dict_id(brand_ots_id,new_specs)
+
+                        _d_specs = {DOCUMENT_PRODUCT_DICT_ID:specs_ots_id,
+                                    DOCUMENT_PRODUCT_DICT_NAME:new_specs,
+                                    DOCUMENT_PRODUCT_DICT_ALIAS:"%s"%(str(specs).lower()),
+                                    DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
+                                    DOCUMENT_PRODUCT_DICT_STATUS:1,
+                                    DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
+                                    DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
+                                    DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                    DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                    }
+                        _dpd_specs = Document_product_dict(_d_specs)
+                        # _dpd_specs.updateAlias(str(new_specs).lower())
+                        if not _dpd_specs.exists_row(self.ots_client):
+                            _dpd_specs.update_row(self.ots_client)
+                            # user interface to add
                 else:
                     debug("getting sepcs %s"%(specs))
                     list_specs = []
@@ -566,9 +548,15 @@ class Product_Manager(Product_Dict_Manager):
             for specs in list_candidates:
                 if _find:
                     break
-                s = self.match_specs(specs)
-                if s is not None:
-                    new_specs = s
+                bool_query = BoolQuery(must_queries=[
+                    TermQuery(DOCUMENT_PRODUCT_DICT_NAME,specs),
+                    TermQuery(DOCUMENT_PRODUCT_DICT_GRADE,SPECS_GRADE)
+                ])
+                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict","document_product_dict_index",
+                                                                                    SearchQuery(bool_query,get_total_count=True))
+                if total_count>0:
+                    new_specs = specs
+                    _find = True
                     if brand_ots_id is not None:
                         # judge if the specs which parent_id is brand_ots_id exists,insert one if not exists else update alias
                         specs_ots_id = get_document_product_dict_id(brand_ots_id,new_specs)
@@ -587,8 +575,62 @@ class Product_Manager(Product_Dict_Manager):
                         # _dpd_specs.updateAlias(str(new_specs).lower())
                         if not _dpd_specs.exists_row(self.ots_client):
                             _dpd_specs.update_row(self.ots_client)
-                        _find = True
-                        break
+                            # user interface to add
+                else:
+                    debug("getting sepcs %s"%(specs))
+                    list_specs = []
+                    c_specs = clean_product_specs(specs)
+                    list_specs.append(c_specs)
+
+                    for s in re.split("[\u4e00-\u9fff]",specs):
+                        if s!="" and len(s)>4:
+                            list_specs.append(s)
+                    similar_flag = None
+                    _index = 0
+                    for c_specs in list_specs:
+                        if _find:
+                            break
+                        _index += 1
+                        specs_vector = request_embedding(c_specs)
+
+                        if specs_vector is not None:
+                            Coll,_ = self.get_collection(SPECS_GRADE)
+                            search_list = search_embedding(Coll,embedding_index_name,[specs_vector],self.search_params,output_fields,limit=20)
+
+                            for _search in search_list:
+                                if _find:
+                                    break
+
+                                ots_id = _search.entity.get("standard_name_id")
+                                ots_name = _search.entity.get("standard_name")
+                                ots_parent_id = _search.entity.get("ots_parent_id")
+
+                                debug("checking specs %s and %s"%(specs,ots_name))
+                                if is_similar(specs,ots_name):
+                                    # log("specs is_similar")
+                                    if check_specs(c_specs,ots_name):
+                                        break_flag = True
+                                        new_specs = ots_name
+                                        if brand_ots_id is not None:
+                                            # judge if the specs which parent_id is brand_ots_id exists,insert one if not exists else update alias
+                                            specs_ots_id = get_document_product_dict_id(brand_ots_id,new_specs)
+
+                                            _d_specs = {DOCUMENT_PRODUCT_DICT_ID:specs_ots_id,
+                                                        DOCUMENT_PRODUCT_DICT_NAME:new_specs,
+                                                        DOCUMENT_PRODUCT_DICT_ALIAS:"%s"%(str(specs).lower()),
+                                                        DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
+                                                        DOCUMENT_PRODUCT_DICT_STATUS:1,
+                                                        DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
+                                                        DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
+                                                        DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                                        DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                                        }
+                                            _dpd_specs = Document_product_dict(_d_specs)
+                                            # _dpd_specs.updateAlias(str(new_specs).lower())
+                                            if not _dpd_specs.exists_row(self.ots_client):
+                                                _dpd_specs.update_row(self.ots_client)
+                                            _find = True
+                                            break
 
         # judge if the product matches the standard product
         if name_ots_id is not None:
@@ -612,27 +654,34 @@ class Product_Manager(Product_Dict_Manager):
 
 
             if isinstance(unit_price,(float,int)) and isinstance(quantity,(float,int)) and isinstance(total_price,(float,int)):
-                new_quantity = total_price/unit_price
-                if new_quantity!=quantity:
-                    if new_quantity==total_price//unit_price:
-                        quantity = int(new_quantity)
-                        _product.setValue(DOCUMENT_PRODUCT_QUANTITY,quantity,True)
-                    else:
-                        is_legal_data = False
+                if unit_price>0:
+                    new_quantity = total_price/unit_price
+                    if new_quantity!=quantity:
+                        if new_quantity==total_price//unit_price:
+                            quantity = int(new_quantity)
+                            _product.setValue(DOCUMENT_PRODUCT_QUANTITY,quantity,True)
+                        else:
+                            is_legal_data = False
+                elif quantity>0:
+                    unit_price = total_price/quantity
+                    _product.setValue(DOCUMENT_PRODUCT_UNIT_PRICE,unit_price,True)
+
             elif isinstance(unit_price,(float,int)) and isinstance(quantity,(float,int)):
                 total_price = float("%.2f"%(unit_price*quantity))
                 _product.setValue(DOCUMENT_PRODUCT_TOTAL_PRICE,total_price,True)
             elif isinstance(unit_price,(float,int)) and isinstance(total_price,(float,int)):
-                quantity = int(total_price//unit_price)
-                _product.setValue(DOCUMENT_PRODUCT_QUANTITY,quantity,True)
+                if unit_price>0:
+                    quantity = int(total_price//unit_price)
+                    _product.setValue(DOCUMENT_PRODUCT_QUANTITY,quantity,True)
             elif isinstance(quantity,(float,int)) and isinstance(total_price,(float,int)):
-                unit_price = float("%.2f"%(total_price/quantity))
-                _product.setValue(DOCUMENT_PRODUCT_UNIT_PRICE,unit_price,True)
+                if quantity>0:
+                    unit_price = float("%.2f"%(total_price/quantity))
+                    _product.setValue(DOCUMENT_PRODUCT_UNIT_PRICE,unit_price,True)
             elif isinstance(quantity,(float,int)) and quantity>10000:
                 is_legal_data = False
 
             if isinstance(_product.getProperties().get(DOCUMENT_PRODUCT_TOTAL_PRICE),(float,int)) and isinstance(win_bid_price,(float,int)):
-                if _product.getProperties().get(DOCUMENT_PRODUCT_TOTAL_PRICE)>win_bid_price:
+                if _product.getProperties().get(DOCUMENT_PRODUCT_TOTAL_PRICE)>win_bid_price*10:
                     is_legal_data = False
 
             if isinstance(_product.getProperties().get(DOCUMENT_PRODUCT_UNIT_PRICE),(float,int)) and _product.getProperties().get(DOCUMENT_PRODUCT_UNIT_PRICE)>100000000:
@@ -956,8 +1005,10 @@ def fix_product_data():
     :return:
     '''
     ots_client = getConnect_ots()
-    bool_query = BoolQuery(must_queries=[RangeQuery("status",1)
-                                         ])
+    bool_query = BoolQuery(must_queries=[
+        # RangeQuery("status",1)
+        TermQuery("docid",246032980)
+    ])
 
     rows,next_token,total_count,is_all_succeed = ots_client.search("document_product","document_product_index",
                                                                    SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
@@ -1022,7 +1073,7 @@ def fix_product_data():
     def handle(item,result_queue):
         print("handle")
 
-    mt = MultiThreadHandler(task_queue,handle,None,30,1)
+    mt = MultiThreadHandler(task_queue,deleteAndReprocess,None,30,1)
     mt.run()
 
 def test_check_brand():
@@ -1089,9 +1140,9 @@ def test_match():
 def test():
     # pm = Product_Manager()
     # pm.test()
-    # fix_product_data()
+    fix_product_data()
     # test_check_brand()
-    test_match()
+    # test_match()
 
 if __name__ == '__main__':