- """
- Deep Deterministic Policy Gradient (DDPG), Reinforcement Learning.
- DDPG is Actor Critic based algorithm.
- Pendulum example.
- View more on my tutorial page: https://morvanzhou.github.io/tutorials/
- Using:
- tensorflow 1.0
- gym 0.8.0
- """
#######################################################################
# Copyright (C)                                                       #
# 2016 - 2019 Pinard Liu(liujianping-ok@163.com)                      #
# https://www.cnblogs.com/pinard                                      #
# Permission given to modify the code as long as you keep this        #
# declaration at the top                                              #
#######################################################################
## https://www.cnblogs.com/pinard/p/10345762.html ##
## Reinforcement Learning (16): Deep Deterministic Policy Gradient (DDPG) ##
import tensorflow as tf
import numpy as np
import gym
import time

##################### hyper parameters ####################

MAX_EPISODES = 2000
MAX_EP_STEPS = 200
LR_A = 0.001    # learning rate for actor
LR_C = 0.002    # learning rate for critic
GAMMA = 0.9     # reward discount
TAU = 0.01      # soft replacement
MEMORY_CAPACITY = 10000
BATCH_SIZE = 32

RENDER = False
ENV_NAME = 'Pendulum-v0'

############################### DDPG ####################################

class DDPG(object):
    def __init__(self, a_dim, s_dim, a_bound):
        self.memory = np.zeros((MEMORY_CAPACITY, s_dim * 2 + a_dim + 1), dtype=np.float32)
        self.pointer = 0
        self.sess = tf.Session()

        self.a_dim, self.s_dim, self.a_bound = a_dim, s_dim, a_bound
        self.S = tf.placeholder(tf.float32, [None, s_dim], 's')
        self.S_ = tf.placeholder(tf.float32, [None, s_dim], 's_')
        self.R = tf.placeholder(tf.float32, [None, 1], 'r')

        with tf.variable_scope('Actor'):
            self.a = self._build_a(self.S, scope='eval', trainable=True)
            a_ = self._build_a(self.S_, scope='target', trainable=False)
        with tf.variable_scope('Critic'):
            # feed the stored actions into self.a when computing q for the TD error;
            # otherwise self.a comes from the Actor when the Actor is being updated
            q = self._build_c(self.S, self.a, scope='eval', trainable=True)
            q_ = self._build_c(self.S_, a_, scope='target', trainable=False)

        # network parameters
        self.ae_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='Actor/eval')
        self.at_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='Actor/target')
        self.ce_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='Critic/eval')
        self.ct_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='Critic/target')

        # soft target net replacement
        self.soft_replace = [tf.assign(t, (1 - TAU) * t + TAU * e)
                             for t, e in zip(self.at_params + self.ct_params, self.ae_params + self.ce_params)]
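        # Soft (Polyak) update used above: theta_target <- (1 - TAU) * theta_target + TAU * theta_eval.
        # With TAU = 0.01 the target networks track the eval networks slowly; e.g. a target weight of 0.50
        # paired with an eval weight of 1.00 only moves to 0.505 after one update, which keeps the TD
        # targets below from shifting too quickly.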

        q_target = self.R + GAMMA * q_
        # in the feed_dict for the td_error, self.a must be replaced by the actions stored in memory
        td_error = tf.losses.mean_squared_error(labels=q_target, predictions=q)
        self.ctrain = tf.train.AdamOptimizer(LR_C).minimize(td_error, var_list=self.ce_params)
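        # q_target = r + GAMMA * Q'(s', mu'(s')) is built entirely from the non-trainable target
        # networks, and ctrain only minimizes over ce_params, so the TD target behaves as a fixed
        # label for the eval critic on each update.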

        a_loss = - tf.reduce_mean(q)    # maximize the q
        self.atrain = tf.train.AdamOptimizer(LR_A).minimize(a_loss, var_list=self.ae_params)
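        # Minimizing -Q(s, mu(s)) with var_list=self.ae_params lets TensorFlow backpropagate through
        # the eval critic into the actor only, which realizes the deterministic policy gradient
        # grad_theta J ~ E[ grad_a Q(s, a)|a=mu(s) * grad_theta mu(s) ].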

        self.sess.run(tf.global_variables_initializer())

    def choose_action(self, s):
        return self.sess.run(self.a, {self.S: s[np.newaxis, :]})[0]

    def learn(self):
        # soft target replacement
        self.sess.run(self.soft_replace)

        indices = np.random.choice(MEMORY_CAPACITY, size=BATCH_SIZE)
        bt = self.memory[indices, :]
        bs = bt[:, :self.s_dim]
        ba = bt[:, self.s_dim: self.s_dim + self.a_dim]
        br = bt[:, -self.s_dim - 1: -self.s_dim]
        bs_ = bt[:, -self.s_dim:]

        self.sess.run(self.atrain, {self.S: bs})
        self.sess.run(self.ctrain, {self.S: bs, self.a: ba, self.R: br, self.S_: bs_})

    def store_transition(self, s, a, r, s_):
        transition = np.hstack((s, a, [r], s_))
        index = self.pointer % MEMORY_CAPACITY  # replace the old memory with new memory
        self.memory[index, :] = transition
        self.pointer += 1
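    # Each replay-memory row is laid out as [s (s_dim) | a (a_dim) | r (1) | s_ (s_dim)],
    # i.e. s_dim * 2 + a_dim + 1 columns, which is exactly what the slices in learn() index into.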

    def _build_a(self, s, scope, trainable):
        with tf.variable_scope(scope):
            net = tf.layers.dense(s, 30, activation=tf.nn.relu, name='l1', trainable=trainable)
            a = tf.layers.dense(net, self.a_dim, activation=tf.nn.tanh, name='a', trainable=trainable)
            return tf.multiply(a, self.a_bound, name='scaled_a')

    def _build_c(self, s, a, scope, trainable):
        with tf.variable_scope(scope):
            n_l1 = 30
            w1_s = tf.get_variable('w1_s', [self.s_dim, n_l1], trainable=trainable)
            w1_a = tf.get_variable('w1_a', [self.a_dim, n_l1], trainable=trainable)
            b1 = tf.get_variable('b1', [1, n_l1], trainable=trainable)
            net = tf.nn.relu(tf.matmul(s, w1_s) + tf.matmul(a, w1_a) + b1)
            return tf.layers.dense(net, 1, trainable=trainable)  # Q(s,a)
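
# Note: the critic's first layer, relu(s @ w1_s + a @ w1_a + b1), is equivalent to one dense layer
# applied to the concatenation [s, a]; the actor squashes its output with tanh and rescales it by
# a_bound so that the chosen actions stay inside the environment's action range.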

############################### training ####################################

env = gym.make(ENV_NAME)
env = env.unwrapped
env.seed(1)

s_dim = env.observation_space.shape[0]
a_dim = env.action_space.shape[0]
a_bound = env.action_space.high

ddpg = DDPG(a_dim, s_dim, a_bound)

var = 3  # control exploration
t1 = time.time()
for episode in range(MAX_EPISODES):
    s = env.reset()
    ep_reward = 0
    for j in range(MAX_EP_STEPS):
        if RENDER:
            env.render()

        # Add exploration noise
        a = ddpg.choose_action(s)
        a = np.clip(np.random.normal(a, var), -2, 2)  # add randomness to action selection for exploration
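        # Exploration is plain Gaussian noise with stddev `var`, clipped to Pendulum's action
        # range [-2, 2] (which matches a_bound for this environment); `var` decays below once
        # the replay memory is full, so exploration shrinks as learning progresses.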
        s_, r, done, info = env.step(a)

        ddpg.store_transition(s, a, r / 10, s_)

        if ddpg.pointer > MEMORY_CAPACITY:
            var *= .9995  # decay the action randomness
            ddpg.learn()

        s = s_
        ep_reward += r
        if j == MAX_EP_STEPS - 1:
            print('Episode:', episode, ' Reward: %i' % int(ep_reward), 'Explore: %.2f' % var)
            # if ep_reward > -300: RENDER = True
            break

    # evaluate the current (noise-free) policy every 100 episodes
    if episode % 100 == 0:
        total_reward = 0
        for i in range(10):
            state = env.reset()
            for j in range(MAX_EP_STEPS):
                env.render()
                action = ddpg.choose_action(state)  # direct action for test
                state, reward, done, _ = env.step(action)
                total_reward += reward
                if done:
                    break
        ave_reward = total_reward / 10  # average return over the 10 evaluation episodes
        print('episode: ', episode, 'Evaluation Average Reward: ', ave_reward)

print('Running time: ', time.time() - t1)