# PAI-TF_app.py
  1. import tensorflow as tf
  2. import sys
  3. import os
  4. import json
  5. import re
  6. import time
  7. import uuid
  8. from BiddingKG.dl.common.Utils import log
  9. import BiddingKG.dl.interface.predictor as predictor
  10. import BiddingKG.dl.interface.Preprocessing as Preprocessing
  11. import BiddingKG.dl.interface.getAttributes as getAttributes
  12. import BiddingKG.dl.entityLink.entityLink as entityLink
  13. import numpy as np
  14. import ctypes
  15. import inspect
  16. from threading import Thread
  17. import traceback
  18. os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
  19. os.environ["CUDA_VISIBLE_DEVICES"] = ""
  20. sys.path.append(os.path.abspath(".."))
  21. tf.app.flags.DEFINE_string("tables", "", "tables info")
  22. FLAGS = tf.app.flags.FLAGS
  23. print("tables:" + FLAGS.tables)
  24. tables = [FLAGS.tables]
  25. filename_queue = tf.train.string_input_producer(tables, num_epochs=1)
  26. reader = tf.TableRecordReader()
  27. key, value = reader.read(filename_queue)
  28. record_defaults = [[1.0], [1.0], [1.0], [1.0], ["Iris-virginica"]]
  29. col1, col2, col3, col4, col5 = tf.decode_csv(value, record_defaults = record_defaults)
  30. # line 9 and 10 can be written like below for short. It will be helpful when too many columns exist.
  31. # record_defaults = [[1.0]] * 4 + [["Iris-virginica"]]
  32. # value_list = tf.decode_csv(value, record_defaults = record_defaults)
  33. writer = tf.TableRecordWriter("odps://demo_show/tables/iris_output")
  34. write_to_table = writer.write([0, 1, 2, 3, 4], [col1, col2, col3, col4, col5])
  35. # line 16 can be written like below for short. It will be helpful when too many columns exist.
  36. # write_to_table = writer.write(range(5), value_list)
  37. close_table = writer.close()
  38. init = tf.global_variables_initializer()
  39. with tf.Session() as sess:
  40. sess.run(init)
  41. sess.run(tf.local_variables_initializer())
  42. coord = tf.train.Coordinator()
  43. threads = tf.train.start_queue_runners(coord=coord)
  44. try:
  45. step = 0
  46. while not coord.should_stop():
  47. step += 1
  48. sess.run(write_to_table)
  49. except tf.errors.OutOfRangeError:
  50. print('%d records copied' % step)
  51. finally:
  52. sess.run(close_table)
  53. coord.request_stop()
  54. coord.join(threads)