
Optimize merge rules; debug a production JSON format issue

luojiehua 2 years ago
commit a96ca40930

+ 17 - 12
BaseDataMaintenance/maintenance/dataflow.py

@@ -3597,9 +3597,12 @@ class Dataflow_dumplicate(Dataflow):
 
                 _step = 4
                 _begin = 0
-                must_queries = [RangeQuery(project_page_time,page_time_less,page_time_greater,True,True),
+                must_queries = []
+                if page_time_less is not None and page_time_greater is not None:
+                    must_queries = [RangeQuery(project_page_time,page_time_less,page_time_greater,True,True),
                                 ]
 
+                print("page_time,page_time_less,page_time_greater",page_time,page_time_less,page_time_greater)
                 #sub_project_name is not a mandatory condition
                 # if sub_project_q is not None:
                 #     must_queries.append(sub_project_q)
@@ -3654,16 +3657,18 @@ class Dataflow_dumplicate(Dataflow):
 
                 list_check_data.sort(key=lambda x:x[1],reverse=True)
                 for _data,_ in list_check_data:
-                        _time = time.time()
-                        _check,_prob = check_merge_rule(_proj,_data,b_log=b_log,return_prob=True)
-                        if _check:
-                            o_proj = Project(_data)
-                            o_proj.fix_columns(self.ots_client,fix_columns,True)
-                            for k in fix_columns:
-                                _data[k] = o_proj.getProperties().get(k)
+                    _time = time.time()
+                    _check,_prob = check_merge_rule(_proj,_data,b_log=b_log,return_prob=True)
+                    projects_check_rule_time += time.time()-_time
+                    _time = time.time()
+                    if _check:
+                        o_proj = Project(_data)
+                        o_proj.fix_columns(self.ots_client,fix_columns,True)
+                        for k in fix_columns:
+                            _data[k] = o_proj.getProperties().get(k)
 
-                            update_projects_by_project(_data,[_proj])
-                            projects_update_time += time.time()-_time
+                        update_projects_by_project(_data,[_proj])
+                        projects_update_time += time.time()-_time
 
             whole_time = time.time()-whole_time_start
             log("merge_project whole_time:%.3f projects_prepare_time:%.3f projects_query_time:%.3f projects_merge_count:%d rules%d projects_check_rule_time %.3f projects_update_time %.3f"%(whole_time,projects_prepare_time,projects_query_time,projects_merge_count,len(list_must_query),projects_check_rule_time,projects_update_time))
@@ -3718,7 +3723,7 @@ class Dataflow_dumplicate(Dataflow):
 
         _time = time.time()
         dumplicate_document_in_merge(list_projects)
-        # log("dumplicate document %d takes:%.3f"%(len(list_projects),time.time()-_time))
+        log("dumplicate document %d takes:%.3f"%(len(list_projects),time.time()-_time))
 
         _time = time.time()
         project_json = to_project_json(list_projects)
@@ -4029,7 +4034,7 @@ if __name__ == '__main__':
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
     a = time.time()
-    df_dump.test_dumplicate(292069783)
+    df_dump.test_dumplicate(292444835)
     print("takes",time.time()-a)
     # df_dump.fix_doc_which_not_in_project()
     # df_dump.delete_projects_by_document(16288036)
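
The dataflow.py changes harden the merge query against missing page_time bounds: the RangeQuery is now built only when both page_time_less and page_time_greater are known, a temporary print traces the values in production, and the per-document dedup timing log is re-enabled. A minimal sketch of that guard, assuming the tablestore SDK's RangeQuery(field_name, range_from, range_to, include_lower, include_upper) call used in the hunk; build_page_time_queries is a hypothetical helper name:

    from tablestore import RangeQuery

    def build_page_time_queries(field_name, page_time_less, page_time_greater):
        # A None bound means the project has no usable page_time range, so
        # skip the RangeQuery entirely instead of sending a None bound to
        # the search index; the remaining query terms still constrain the
        # search on their own.
        if page_time_less is not None and page_time_greater is not None:
            return [RangeQuery(field_name, page_time_less, page_time_greater, True, True)]
        return []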

+ 2 - 0
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -851,6 +851,8 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
                     dtmp.update_row(self.ots_client)
                     dhtml.update_row(self.ots_client)
                     _extract.setValue(document_extract2_status,random.randint(1,50),True)
+                    if _docid==290544305:
+                        _extract.setValue(document_extract2_status,-1,True)
                     _extract.update_row(self.ots_client)
                     _to_ack = True
             except Exception:
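
The two added lines in dataflow_mq.py are a temporary live-debugging hook: for the one document under investigation (docid 290544305) the extract status is forced to -1 instead of the usual random 1-50 success value, so the downstream flow picks that document up again. A standalone sketch of the same idea; DEBUG_DOCID and choose_extract_status are hypothetical names:

    import random

    DEBUG_DOCID = 290544305  # the single production document under investigation

    def choose_extract_status(docid):
        # Normal documents keep the usual random 1-50 success status; the
        # debug docid is forced to -1 so the downstream flow reprocesses it.
        if docid == DEBUG_DOCID:
            return -1
        return random.randint(1, 50)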

+ 34 - 32
BaseDataMaintenance/maxcompute/documentMerge.py

@@ -2822,37 +2822,39 @@ def dumplicate_document_in_merge(list_projects):
                 extract_count = _d.get(document_tmp_extract_count,0)
                 docchannel = _d.get(document_docchannel,0)
                 page_time = _d.get(document_page_time,"")
-                if docchannel>0:
-                    if docchannel in dict_channel_proj:
-                        n_d = dict_channel_proj[docchannel]
-                        n_docid = n_d.get(document_docid)
-                        n_is_multipack = n_d.get("is_multipack",True)
-                        n_extract_count = n_d.get(document_tmp_extract_count,0)
-                        n_page_time = n_d.get(document_page_time,"")
-                        if docid==n_docid:
-                            continue
-                        if not check_page_time_dup(page_time,n_page_time):
-                            continue
-
-                        if extract_count>n_extract_count:
-                            n_d[document_status] = 401
-                            set_dup_docid.add(str(n_docid))
-                            dict_channel_proj[docchannel] = _d
-                        elif extract_count==n_extract_count:
-                            if int(n_docid)>int(docid):
-                                n_d[document_status] = 401
-                                set_dup_docid.add(str(n_docid))
-                                dict_channel_proj[docchannel] = _d
-                            elif int(n_docid)<int(docid):
-                                _d[document_status] = 401
-                                set_dup_docid.add(str(docid))
-                        else:
-                            _d[document_status] = 401
-                            set_dup_docid.add(str(docid))
-                        if not is_multipack and not n_is_multipack:
-                            pass
-                    else:
-                        dict_channel_proj[docchannel] = _d
+                if _status>=401 and _status<=450:
+                    set_dup_docid.add(str(docid))
+                # if docchannel>0:
+                #     if docchannel in dict_channel_proj:
+                #         n_d = dict_channel_proj[docchannel]
+                #         n_docid = n_d.get(document_docid)
+                #         n_is_multipack = n_d.get("is_multipack",True)
+                #         n_extract_count = n_d.get(document_tmp_extract_count,0)
+                #         n_page_time = n_d.get(document_page_time,"")
+                #         if docid==n_docid:
+                #             continue
+                #         if not check_page_time_dup(page_time,n_page_time):
+                #             continue
+                #
+                #         if extract_count>n_extract_count:
+                #             n_d[document_status] = 401
+                #             set_dup_docid.add(str(n_docid))
+                #             dict_channel_proj[docchannel] = _d
+                #         elif extract_count==n_extract_count:
+                #             if int(n_docid)>int(docid):
+                #                 n_d[document_status] = 401
+                #                 set_dup_docid.add(str(n_docid))
+                #                 dict_channel_proj[docchannel] = _d
+                #             elif int(n_docid)<int(docid):
+                #                 _d[document_status] = 401
+                #                 set_dup_docid.add(str(docid))
+                #         else:
+                #             _d[document_status] = 401
+                #             set_dup_docid.add(str(docid))
+                #         if not is_multipack and not n_is_multipack:
+                #             pass
+                #     else:
+                #         dict_channel_proj[docchannel] = _d
 
             set_docids = set_docids-set_dup_docid
             if len(set_docids)==0:
@@ -2897,7 +2899,7 @@ class f_dumplicate_projects(BaseUDAF):
             set_uuid.add(uuid_1)
 
         list_projects = dumplicate_projects(list_data,False)
-        dumplicate_document_in_merge(list_projects)
+        # dumplicate_document_in_merge(list_projects)
 
         project_json = to_project_json(list_projects)
 
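
The documentMerge.py hunk replaces the per-channel winner selection with a much simpler rule: any document whose status already falls in the duplicate band (401-450) is collected as a duplicate, and the dedup call is removed from the UDAF so it runs only once in the flow (matching the log line re-enabled in dataflow.py above). A self-contained sketch of the new rule, with plain "docid"/"status" dict keys standing in for the document_* column constants and collect_dup_docids as a hypothetical name:

    def collect_dup_docids(list_docs):
        # Documents whose status was already set into the duplicate band
        # (401-450) by an earlier stage are collected as duplicates, instead
        # of re-deciding a winner per docchannel as the old block did.
        set_dup_docid = set()
        for _d in list_docs:
            _status = _d.get("status", 0)
            if 401 <= _status <= 450:
                set_dup_docid.add(str(_d.get("docid")))
        return set_dup_docid

    print(collect_dup_docids([{"docid": 1, "status": 401}, {"docid": 2, "status": 201}]))  # {'1'}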

+ 174 - 6
BaseDataMaintenance/test/ab.py

@@ -1,12 +1,180 @@
 
 
-from BaseDataMaintenance.model.ots.project import *
+from BaseDataMaintenance.model.ots.document_extract2 import *
 
 from BaseDataMaintenance.dataSource.source import getConnect_ots
 
+
+import json
 ots_client = getConnect_ots()
-_d = {project_uuid:"f9afc516-7ae1-4ddf-83f3-395f294b288a",
-      project_docids:"288256566,288133788,288157306,288137729",
-      project_docid_number:4}
-_p = Project(_d)
-_p.update_row(ots_client)
+docid = 290544305
+a = {"name":"双腿链条索具&DSL-WLL3TON\\载荷3t"}
+test_str = json.dumps(a,ensure_ascii=False)
+print(test_str)
+
+_d = {document_extract2_docid:docid,
+      document_extract2_partitionkey:docid%500+1,
+      "test_str":test_str}
+_extract = Document_extract(_d)
+
+
+_json = '''{
+    "attachmentTypes": "xls",
+    "bidway": "公开招标",
+    "candidate": "",
+    "code": [
+        "8110-XJ-2212-25925",
+        "GG-221215-01433",
+        "200000205415",
+        "200000205774"
+    ],
+    "cost_time": {
+        "attrs": 10.43,
+        "codename": 0.7,
+        "deposit": 0.0,
+        "district": 0.08,
+        "moneygrade": 0.0,
+        "nerToken": 0.76,
+        "person": 10.31,
+        "prem": 2.04,
+        "preprocess": 6.62,
+        "product": 0.35,
+        "product_attrs": 0.01,
+        "roleRuleFinal": 0.01,
+        "rolegrade": 0.0,
+        "rule": 0.01,
+        "rule_channel": 0.26,
+        "tableToText": 5.75,
+        "tendereeRuleRecall": 0.01,
+        "time": 0.41,
+        "total_unit_money": 0.0
+    },
+    "demand_info": {
+        "data": [],
+        "header": [],
+        "header_col": []
+    },
+    "deposit_patment_way": "",
+    "district": {
+        "area": "西南",
+        "city": "未知",
+        "district": "未知",
+        "is_in_text": false,
+        "province": "云南"
+    },
+    "docchannel": {
+        "docchannel": "招标公告",
+        "doctype": "采招数据",
+        "life_docchannel": "招标公告"
+    },
+    "docid": "",
+    "doctitle_refine": "",
+    "exist_table": 1,
+    "extract_count": 3,
+    "fail_reason": "",
+    "fingerprint": "md5=c0e93670c3c537c43429fce633536233",
+    "industry": {
+        "class": "零售批发",
+        "class_name": "其他交通运输设备",
+        "subclass": "专用设备"
+    },
+    "match_enterprise": [],
+    "match_enterprise_type": 0,
+    "moneysource": "",
+    "name": "双腿链条索具&DSL-WLL3TON\\\\载荷3t",
+    "nlp_enterprise": [
+        "云南锡业集团物流有限公司",
+        "云南锡业集团"
+    ],
+    "nlp_enterprise_attachment": [],
+    "person_review": [],
+    "prem": {
+        "Project": {
+            "code": "",
+            "roleList": [
+                {
+                    "address": "",
+                    "linklist": [
+                        [
+                            "郑雯",
+                            ""
+                        ]
+                    ],
+                    "role_money": {
+                        "discount_ratio": "",
+                        "downward_floating_ratio": "",
+                        "floating_ratio": "",
+                        "money": 0,
+                        "money_unit": ""
+                    },
+                    "role_name": "tenderee",
+                    "role_text": "云南锡业集团物流有限公司",
+                    "serviceTime": ""
+                }
+            ],
+            "tendereeMoney": 0,
+            "tendereeMoneyUnit": ""
+        }
+    },
+    "process_time": "2022-12-19 17:02:23",
+    "product": [
+        "吊链"
+    ],
+    "product_attrs": {
+        "data": [
+            {
+                "brand": "",
+                "product": "双腿链条索具&DSL-WLL3TON/载荷3t",
+                "quantity": "4",
+                "quantity_unit": "付",
+                "specs": "DSL-WLL3TON/载荷3T",
+                "unitPrice": ""
+            },
+            {
+                "brand": "",
+                "product": "四钩吊链&2t/链长500mm",
+                "quantity": "1",
+                "quantity_unit": "个",
+                "specs": "2T/链长500MM",
+                "unitPrice": ""
+            }
+        ],
+        "header": [
+            "标的名称_数量_单位___规格型号"
+        ],
+        "header_col": [
+            "标的编码_标的名称_规格型号_单位_数量"
+        ]
+    },
+    "serviceTime": "",
+    "success": true,
+    "time_bidclose": "",
+    "time_bidopen": "2022-12-19",
+    "time_bidstart": "",
+    "time_commencement": "",
+    "time_completion": "",
+    "time_earnestMoneyEnd": "",
+    "time_earnestMoneyStart": "",
+    "time_getFileEnd": "",
+    "time_getFileStart": "",
+    "time_listingEnd": "",
+    "time_listingStart": "",
+    "time_publicityEnd": "",
+    "time_publicityStart": "",
+    "time_registrationEnd": "2022-12-19",
+    "time_registrationStart": "",
+    "time_release": "2022-12-15",
+    "total_tendereeMoney": 0,
+    "total_tendereeMoneyUnit": "",
+    "version_date": "2022-12-13"
+}
+'''
+
+_extract.setValue(document_extract2_extract_json,"{}",True)
+_extract.setValue(document_extract2_industry_json,"{}",True)
+_extract.setValue(document_extract2_status,1,True)
+_extract.setValue(document_extract2_extract_json,_json,True)
+_extract.update_row(ots_client)
+
+#
+# _p.update_row(ots_client)
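
The rewritten ab.py reproduces the production JSON issue end to end by writing an extract record whose "name" value holds one literal backslash. Three escaping layers stack up: \\ in Python source yields one backslash in the value, json.dumps re-escapes it to \\ in the JSON text, and inside the triple-quoted _json literal that JSON escape must in turn be written as \\\\, which is why the "name" field above carries four backslashes. A quick standalone check of the round trip, using only the standard library:

    import json

    name = "双腿链条索具&DSL-WLL3TON\\载荷3t"   # one literal backslash in the value
    encoded = json.dumps({"name": name}, ensure_ascii=False)
    print(encoded)   # {"name": "双腿链条索具&DSL-WLL3TON\\载荷3t"} - backslash doubled in the JSON text
    assert json.loads(encoded)["name"] == name   # decodes back to the single backslash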