Ver código fonte

数据遗漏检查后自动同步

luojiehua 6 meses atrás
pai
commit
24632dfe69

+ 42 - 1
BaseDataMaintenance/dataMonitor/data_monitor.py

@@ -20,7 +20,6 @@ from BaseDataMaintenance.common.multiThread import MultiThreadHandler
 
 from BaseDataMaintenance.maintenance.dataflow_settings import *
 
-from BaseDataMaintenance.maintenance.dataflow_mq import fixDoc_to_queue_extract,fixDoc_to_queue_init
 
 import pandas as pd
 
@@ -37,6 +36,48 @@ flow_init_check_dir = "/data/python/flow_init_check"
 flow_dumplicate_log_path = "/home/appuser/python/flow_dumplicate.log"
 
 
+def fixDoc_to_queue_init(filename=""):
+    import pandas as pd
+    from BaseDataMaintenance.model.oracle.GongGaoTemp import dict_oracle2ots
+    from BaseDataMaintenance.model.oracle.TouSuTemp import dict_oracle2ots as dict_oracle2ots_tousu
+
+    from BaseDataMaintenance.dataSource.source import getConnection_oracle
+    current_path = os.path.abspath(os.path.dirname(__file__))
+    if filename=="":
+        filename = os.path.join(current_path,"check.xlsx")
+    df = pd.read_excel(filename)
+    if "docchannel" in dict_oracle2ots:
+        dict_oracle2ots.pop("docchannel")
+    row_name = ",".join(list(dict_oracle2ots.keys()))
+
+    list_tousu_keys = []
+    for k,v in dict_oracle2ots_tousu.items():
+        if str(k).isupper():
+            list_tousu_keys.append(k)
+    row_name_tousu = ",".join(list(list_tousu_keys))
+    conn = getConnection_oracle()
+    cursor = conn.cursor()
+    _count = 0
+    for uuid,tablename,_exists,_toolong in zip(df["uuid"],df["tablename"],df["exists"],df["tolong"]):
+        if _exists==0 and _toolong==0:
+            _count += 1
+            is_tousu = False
+            if tablename in ('bxkc.t_wei_fa_ji_lu_temp','bxkc.t_tou_su_chu_li_temp','bxkc.t_qi_ta_shi_xin_temp'):
+                is_tousu = True
+            _source = str(tablename).replace("_TEMP","")
+            if is_tousu:
+                _source = str(tablename).replace("_temp","")
+            _rowname = row_name_tousu if is_tousu else row_name
+
+            sql = " insert into %s(%s) select %s from %s where id='%s' "%(tablename,_rowname,_rowname,_source,uuid)
+            log("%d:%s"%(_count,sql))
+            cursor.execute(sql)
+
+    conn.commit()
+    conn.close()
+
+    return _count
+
 class BaseDataMonitor():
 
     def __init__(self):

+ 2 - 1
BaseDataMaintenance/fixDoc_to_queue_extract.py

@@ -3,7 +3,8 @@ import sys,os
 
 sys.path.append(os.path.dirname(__file__)+"/..")
 
-from BaseDataMaintenance.maintenance.dataflow_mq import fixDoc_to_queue_extract,fixDoc_to_queue_init
+from BaseDataMaintenance.maintenance.dataflow_mq import fixDoc_to_queue_extract
+from BaseDataMaintenance.dataMonitor.data_monitor import fixDoc_to_queue_init
 
 
 if __name__ == '__main__':

+ 0 - 37
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -1948,44 +1948,7 @@ def check_data_synchronization():
 
 current_path = os.path.abspath(os.path.dirname(__file__))
 
-def fixDoc_to_queue_init(filename=""):
-    import pandas as pd
-    from BaseDataMaintenance.model.oracle.GongGaoTemp import dict_oracle2ots
-    from BaseDataMaintenance.model.oracle.TouSuTemp import dict_oracle2ots as dict_oracle2ots_tousu
-    if filename=="":
-        filename = os.path.join(current_path,"check.xlsx")
-    df = pd.read_excel(filename)
-    if "docchannel" in dict_oracle2ots:
-        dict_oracle2ots.pop("docchannel")
-    row_name = ",".join(list(dict_oracle2ots.keys()))
-
-    list_tousu_keys = []
-    for k,v in dict_oracle2ots_tousu.items():
-        if str(k).isupper():
-            list_tousu_keys.append(k)
-    row_name_tousu = ",".join(list(list_tousu_keys))
-    conn = getConnection_oracle()
-    cursor = conn.cursor()
-    _count = 0
-    for uuid,tablename,_exists,_toolong in zip(df["uuid"],df["tablename"],df["exists"],df["tolong"]):
-        if _exists==0 and _toolong==0:
-            _count += 1
-            is_tousu = False
-            if tablename in ('bxkc.t_wei_fa_ji_lu_temp','bxkc.t_tou_su_chu_li_temp','bxkc.t_qi_ta_shi_xin_temp'):
-                is_tousu = True
-            _source = str(tablename).replace("_TEMP","")
-            if is_tousu:
-                _source = str(tablename).replace("_temp","")
-            _rowname = row_name_tousu if is_tousu else row_name
-
-            sql = " insert into %s(%s) select %s from %s where id='%s' "%(tablename,_rowname,_rowname,_source,uuid)
-            log("%d:%s"%(_count,sql))
-            cursor.execute(sql)
-
-    conn.commit()
-    conn.close()
 
-    return _count
 
 if __name__ == '__main__':
     # di = Dataflow_init()