Q-learning
Approach

Value function:

$$V^\pi(s) = \mathbb{E}_\pi\!\left[\sum_{t=0}^{\infty} \gamma^t r_t \,\middle|\, s_0 = s\right], \qquad Q^\pi(s,a) = r(s,a) + \gamma\,\mathbb{E}_{s'}\!\left[V^\pi(s')\right]$$

Q-learning obtains the optimal policy by directly choosing the action with the largest Q-value:

$$\pi^*(s) = \arg\max_a Q^*(s,a)$$

so that

$$V^*(s) = \max_a Q^*(s,a)$$

Substituting $V^*(s) = \max_a Q^*(s,a)$ into the action-value function gives:

$$Q^*(s,a) = r(s,a) + \gamma\,\mathbb{E}_{s'}\!\left[\max_{a'} Q^*(s',a')\right] \tag{1}$$
We want equation (1) to hold approximately, so the regression target is

$$y = r + \gamma \max_{a'} Q(s',a').$$

Updating the Q-table can therefore be cast as a regression problem, and each entry is updated by

$$Q(s,a) \leftarrow Q(s,a) + \alpha\!\left[r + \gamma \max_{a'} Q(s',a') - Q(s,a)\right].$$
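As a quick numerical illustration (the numbers are made up for this example, not taken from the code below): with $\alpha=0.1$, $\gamma=0.9$, reward $r=1$, current entry $Q(s,a)=0.5$ and $\max_{a'}Q(s',a')=2$, a single update gives

$$Q(s,a) \leftarrow 0.5 + 0.1\left[\,1 + 0.9\times 2 - 0.5\,\right] = 0.5 + 0.1\times 2.3 = 0.73.$$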
Note: early in training the agent is expected to explore the unknown environment as much as possible so that the Q-table gets updated. An $\epsilon$-greedy mechanism is therefore used: draw a random number with random(); if it is greater than $\epsilon$, pick an action at random, otherwise pick the action with the largest Q-value. Under this convention $\epsilon$ is the probability of acting greedily, so the exploration probability is $1-\epsilon$; it should shrink as training proceeds, since the agent should explore as much as possible early on (Exploration) and focus more on exploitation later (Exploitation).
- Two ways to implement the exploration mechanism (see the sketch after this list):
  - $\epsilon$-greedy
  - Boltzmann Exploration
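A minimal sketch of both schemes (illustrative only; the functions `epsilon_greedy` and `boltzmann`, the toy row `q_row`, and the decay schedule are made up here), selecting an action from a single row of a Q-table while the exploration probability decays across episodes:

```python
import numpy as np

def epsilon_greedy(q_row, explore_prob, rng=np.random):
    """With probability `explore_prob` pick a random action, otherwise the greedy one."""
    if rng.uniform() < explore_prob:
        return rng.randint(len(q_row))      # explore
    return int(np.argmax(q_row))            # exploit

def boltzmann(q_row, temperature=1.0, rng=np.random):
    """Sample an action with probability proportional to exp(Q / temperature)."""
    prefs = np.asarray(q_row, dtype=float) / temperature
    prefs -= prefs.max()                    # shift for numerical stability
    probs = np.exp(prefs) / np.exp(prefs).sum()
    return int(rng.choice(len(q_row), p=probs))

if __name__ == "__main__":
    q_row = [0.1, 0.5, 0.2]
    # Decay the exploration probability over episodes: explore early, exploit late.
    for episode in range(5):
        explore_prob = max(0.01, 0.9 * (0.5 ** episode))
        a1 = epsilon_greedy(q_row, explore_prob)
        a2 = boltzmann(q_row, temperature=0.5)
        print(episode, explore_prob, a1, a2)
```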
Algorithm flowchart:
Example code:
import numpy as np
import pandas as pd
import time

np.random.seed(2)  # reproducible pseudo-random numbers

N_STATES = 6                 # state space: length of the 1-D world
ACTIONS = ['left', 'right']  # action space
EPSILON = 0.9                # greedy policy: probability of acting greedily
ALPHA = 0.1                  # learning rate
LAMBDA = 0.9                 # discount rate for reward
MAX_EPISODES = 30            # maximum number of episodes
FRESH_TIME = 0.3             # refresh interval (seconds) of the environment display


def build_q_table(n_states, actions):  # create the Q-table
    table = pd.DataFrame(
        np.zeros((n_states, len(actions))),  # initialize all Q-values to 0
        columns=actions,                     # column labels are the actions
    )
    # print(table)    # show table
    return table
# build_q_table(N_STATES, ACTIONS)


def choose_action(state, q_table):
    state_actions = q_table.iloc[state, :]  # the row of Q-values for this state
    if (np.random.uniform() > EPSILON) or ((state_actions == 0).all()):
        # explore (non-greedy), or the state has never been visited (all Q-values are 0)
        action_name = np.random.choice(ACTIONS)
    else:
        action_name = state_actions.idxmax()  # label of the largest Q-value in this row
    return action_name  # the action name, i.e. 'left' or 'right'


def get_any_feedback(S, A):
    # This is how the agent interacts with the environment
    if A == 'right':
        if S == N_STATES - 2:  # one cell before the goal; moving right reaches it
            S_ = 'terminal'
            R = 1
        else:                  # move one cell to the right
            S_ = S + 1
            R = 0
    else:
        R = 0
        if S == 0:             # already in the first cell, cannot move further left
            S_ = S
        else:                  # move one cell to the left
            S_ = S - 1
    return S_, R


def update_env(S, episode, step_counter):  # refresh the environment display
    env_list = ['-'] * (N_STATES - 1) + ['T']
    if S == 'terminal':
        interaction = 'Episode %s: total_steps = %s' % (episode + 1, step_counter)
        print('\r{}'.format(interaction), end='')
        time.sleep(2)
        print('\r                                ', end='')
    else:
        env_list[S] = 'o'
        interaction = ''.join(env_list)
        print('\r{}'.format(interaction), end='')
        time.sleep(FRESH_TIME)


def RL():
    q_table = build_q_table(N_STATES, ACTIONS)  # build the Q-table
    # train
    for episode in range(MAX_EPISODES):
        step_counter = 0
        S = 0                  # initial position
        is_terminated = False  # flag: has the agent reached the terminal state?
        update_env(S, episode, step_counter)
        while not is_terminated:
            A = choose_action(S, q_table)  # choose an action
            S_, R = get_any_feedback(S, A)
            q_predict = q_table.loc[S, A]  # Q-value before the update
            if S_ != 'terminal':
                q_target = R + LAMBDA * q_table.iloc[S_, :].max()
            else:
                q_target = R           # reached the goal: no next state to bootstrap from
                is_terminated = True   # set the termination flag
            q_table.loc[S, A] += ALPHA * (q_target - q_predict)  # update the Q-table
            S = S_
            update_env(S, episode, step_counter + 1)
            step_counter += 1
    return q_table


if __name__ == "__main__":
    q_table = RL()
    print('\r\nQ-table:\n')
    print(q_table)
CliffWalking-v0
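Before the two implementations, a short look at the environment itself. The sketch below is not part of the original notes; it assumes the same Gym version as the code in this section (the pre-0.26 API where reset() returns an observation and step() returns a 4-tuple):

```python
import gym

# Quick inspection of CliffWalking-v0, the environment used in this section.
env = gym.make("CliffWalking-v0")
print(env.observation_space.n)   # 48 discrete states (a 4 x 12 grid)
print(env.action_space.n)        # 4 discrete actions

state = env.reset()                                 # start in the bottom-left corner
action = env.action_space.sample()                  # pick a random action
next_state, reward, done, info = env.step(action)   # every step costs -1, the cliff costs -100
print(state, action, next_state, reward, done)
env.close()
```

Because every step costs -1 and falling into the cliff costs -100, the episode returns printed during training below are negative, and less negative totals mean shorter, safer paths to the goal.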
1 Q-learning implementation
import numpy as np
import pandas as pd
import time
import gym


class QLearningAgent:
    def __init__(self, n_state, n_action, learning_rate, max_steps, epsilon, discounted_factor):
        self.EPSILON = epsilon          # exploration probability for epsilon-greedy
        self.ALPHA = learning_rate      # learning rate
        self.GAMMA = discounted_factor  # discount factor
        self.MAX_STEPS = max_steps
        # self.Q_table = np.zeros((n_state, n_action))
        self.Q_table = pd.DataFrame(
            np.zeros((n_state, n_action))  # Q-table initialized to 0
        )
        print(self.Q_table)

    def choose_action(self, observation):
        state_action = self.Q_table.iloc[observation, :]
        # explore with probability EPSILON, or if this state has never been updated;
        # note that the methods use the module-level `env` defined below
        if (np.random.uniform() > 1 - self.EPSILON) or ((state_action == 0).all()):
            action = np.random.choice(env.action_space.n)
        else:
            action = state_action.idxmax()
        return action

    def learn(self, state, observation, action, r, done):
        q_predict = self.Q_table.loc[state, action]
        if done:
            q_target = r
        else:
            q_target = r + self.GAMMA * self.Q_table.iloc[observation, :].max()
        self.Q_table.loc[state, action] += self.ALPHA * (q_target - q_predict)

    def train(self, episode):
        state = env.reset()  # pre-0.26 Gym API: reset() returns only the observation
        step_counter = 0
        total_reward = 0
        while True:
            action = self.choose_action(state)
            observation, reward, done, info = env.step(action)  # pre-0.26 4-tuple API
            self.learn(state, observation, action, reward, done)
            step_counter += 1
            state = observation
            total_reward += reward
            if done or step_counter == self.MAX_STEPS:
                break
        return step_counter, total_reward

    def test(self):
        state = env.reset()
        step_counter = 0
        total_reward = 0
        while True:
            env.render()
            action = self.choose_action(state)
            observation, reward, done, info = env.step(action)
            step_counter += 1
            state = observation
            total_reward += reward
            if done:
                break
        env.close()
        return step_counter, total_reward


env = gym.make("CliffWalking-v0")
np.random.seed(2)

if __name__ == "__main__":
    agent = QLearningAgent(
        n_state=env.observation_space.n,
        n_action=env.action_space.n,
        learning_rate=0.1,
        max_steps=500,
        epsilon=0.1,
        discounted_factor=0.95
    )
    print("\rStart training...\n")
    time.sleep(1)
    for episode_counter in range(1000):
        ep_steps, ep_reward = agent.train(episode_counter)
        print("\rEpisode: %d  Total reward: %.1f  Steps: %d" % (episode_counter, ep_reward, ep_steps))
    print("\rTraining Completed!!!\n")
    test_step, test_reward = agent.test()
    print("\rTest Completed.\n")
    print("\rTest reward: %.1f  Steps: %d" % (test_reward, test_step))
    print("\r\nQ_table :\n")
    print(agent.Q_table)
2 Sarsa implementation
class SarsaAgent:
    def __init__(self, learning_rate, e_greedy, gamma, n_actions, n_states, max_steps):
        self.N_ACTIONS = n_actions
        self.N_STATES = n_states
        self.ALPHA = learning_rate   # learning rate
        self.EPSILON = e_greedy      # exploration probability
        self.SarsaTable = np.zeros((n_states, n_actions))
        self.GAMMA = gamma           # discount factor
        self.MAX_STEPS = max_steps
        # print(self.SarsaTable)

    def choose_action(self, observation):
        state_action = self.SarsaTable[observation, :]
        # state_action = self.SarsaTable.iloc[observation, :]
        # explore with probability EPSILON, or if this state has never been updated
        if (np.random.uniform(0, 1) < self.EPSILON) or ((state_action == 0.0).all()):
            action = np.random.choice(env.action_space.n)
        else:
            maxS = np.max(state_action)
            action_list = np.where(state_action == maxS)[0]  # break ties randomly
            action = np.random.choice(action_list)
        return action

    def learn(self, cur_obs, cur_act, reward, next_obs, next_act, done):
        sarsa_predict = self.SarsaTable[cur_obs, cur_act]
        if done:
            sarsa_target = reward
        else:
            # Sarsa bootstraps from the action actually taken in the next state
            sarsa_target = reward + self.GAMMA * self.SarsaTable[next_obs, next_act]
        self.SarsaTable[cur_obs, cur_act] += self.ALPHA * (sarsa_target - sarsa_predict)

    def train(self):
        cur_obs = env.reset()
        cur_act = self.choose_action(cur_obs)
        step_counter = 0
        total_reward = 0
        while True:
            next_obs, reward, done, info = env.step(cur_act)
            next_act = self.choose_action(next_obs)
            self.learn(cur_obs, cur_act, reward, next_obs, next_act, done)
            cur_act = next_act
            cur_obs = next_obs
            step_counter += 1
            total_reward += reward
            if done or step_counter == self.MAX_STEPS:
                break
        return total_reward, step_counter

    def test(self):
        cur_obs = env.reset()
        cur_act = self.choose_action(cur_obs)
        step_counter = 0
        total_reward = 0
        while True:
            env.render()
            next_obs, reward, done, info = env.step(cur_act)
            next_act = self.choose_action(next_obs)
            cur_act = next_act
            cur_obs = next_obs
            step_counter += 1
            total_reward += reward
            if done or step_counter == self.MAX_STEPS:
                break
        return total_reward, step_counter


np.random.seed(1)
env = gym.make("CliffWalking-v0")

if __name__ == '__main__':
    agent = SarsaAgent(
        learning_rate=0.05,
        e_greedy=0.1,
        gamma=0.9,
        n_actions=env.action_space.n,
        n_states=env.observation_space.n,
        max_steps=500
    )
    print("Start to train...")
    for episode in range(500):
        ep_reward, ep_steps = agent.train()
        print("Episode: %d  Total reward: %.1f  Steps: %d\n" % (episode, ep_reward, ep_steps))
    print("Training Completed!!!")
    print("\rSarsaTable:\n")
    print(agent.SarsaTable)
    print("\nStart to test...\n")
    test_reward, test_steps = agent.test()
    print("Test Result:\n Test Reward: %.1f\n Test Steps: %d\n" % (test_reward, test_steps))
The difference between Q-learning and Sarsa: the former updates greedily, bootstrapping the Q-table from the largest Q-value of the next state, i.e. the update uses $(R,\ Q(s_t,a_t),\ \max_a Q(s_{t+1},a))$, and the action that attains this maximum is not necessarily the action actually chosen in the next state; the latter takes the conservative route and updates the value function with $(R,\ Q(s_t,a_t),\ Q(s_{t+1},a_{t+1}))$, where $a_{t+1}$ is exactly the action the agent chooses in the next state.
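A minimal sketch of this difference (illustrative only, not part of the original notes; `q_learning_target`, `sarsa_target` and the toy table `Q` are names made up here), computing the two update targets from the same transition:

```python
import numpy as np

def q_learning_target(Q, r, s_next, gamma, done):
    """Bootstrap from the greedy (maximum) Q-value of the next state."""
    return r if done else r + gamma * np.max(Q[s_next, :])

def sarsa_target(Q, r, s_next, a_next, gamma, done):
    """Bootstrap from the Q-value of the action actually taken next."""
    return r if done else r + gamma * Q[s_next, a_next]

if __name__ == "__main__":
    Q = np.array([[0.0, 1.0],    # toy 2-state, 2-action table
                  [0.5, 2.0]])
    r, s_next, a_next, gamma = -1.0, 1, 0, 0.9
    print(q_learning_target(Q, r, s_next, gamma, done=False))     # -1 + 0.9 * 2.0 = 0.8
    print(sarsa_target(Q, r, s_next, a_next, gamma, done=False))  # -1 + 0.9 * 0.5 = -0.55
```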