Simple Example of Q-learning


Q-learning

Approach

  • Value function (expected discounted return):

    $$V^\pi(s) = \mathbb{E}_\pi\left[\sum_{t=0}^{\infty}\gamma^t r_t \,\middle|\, s_0 = s\right],\qquad Q^\pi(s,a) = \mathbb{E}_\pi\left[\sum_{t=0}^{\infty}\gamma^t r_t \,\middle|\, s_0 = s,\ a_0 = a\right]$$

  • Q-learning obtains the optimal policy by directly picking the action with the largest Q-value, i.e.

    $$\pi^*(s) = \arg\max_a Q^\pi(s,a)$$

    It follows that

    $$V^*(s) = \max_a Q^\pi(s,a)$$

    Substituting $V^*(s) = \max_a Q^\pi(s,a)$ into the action-value function gives:

    $$Q^\pi(s_t, a_t) = r_t + \gamma \max_a Q^\pi(s_{t+1}, a) \tag{1}$$

We want equation (1) to hold at least approximately, so the objective function is:

$$L = \Big(r_t + \gamma \max_a Q(s_{t+1}, a) - Q(s_t, a_t)\Big)^2$$

Updating the Q-table can therefore be treated as a regression problem, and each entry is updated as:

$$Q(s_t, a_t) \leftarrow Q(s_t, a_t) + \alpha \Big[r_t + \gamma \max_a Q(s_{t+1}, a) - Q(s_t, a_t)\Big]$$
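As a concrete illustration of this update rule, the following minimal sketch performs a single tabular update; the table size, indices and reward are made-up values for illustration and are not taken from the example code below.

import numpy as np

ALPHA, GAMMA = 0.1, 0.9                   # learning rate and discount factor
Q = np.zeros((6, 2))                      # toy Q-table: 6 states x 2 actions

s, a, r, s_next = 4, 1, 1.0, 5            # hypothetical transition (illustrative values)
td_target = r + GAMMA * Q[s_next].max()   # r + gamma * max_a' Q(s', a')
Q[s, a] += ALPHA * (td_target - Q[s, a])  # move Q(s, a) a small step towards the target
print(Q[s, a])                            # 0.1 after this first update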

Note: early in training the agent should explore the unknown environment as much as possible in order to populate the Q-table, so an $\epsilon$-greedy mechanism is used: draw a random number with random(); if it is greater than $\epsilon$, pick an action at random, and if it is smaller than $\epsilon$, pick the action with the largest Q-value. Under this convention $\epsilon$ is the probability of acting greedily, so it should grow as training proceeds: the early phase should explore as much as possible (exploration), while the later phase should focus more on exploitation.
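The example code below keeps $\epsilon$ fixed; one possible decaying schedule is sketched here, written with an explicit exploration probability (the function name, schedule and constants are illustrative assumptions, not part of the original code):

import numpy as np

def choose_action_decaying(q_row, episode, n_actions,
                           eps_start=1.0, eps_end=0.05, decay=0.99):
    # exploration probability shrinks geometrically per episode, floored at eps_end
    explore_prob = max(eps_end, eps_start * decay ** episode)
    if np.random.uniform() < explore_prob:      # explore: random action
        return np.random.randint(n_actions)
    return int(np.argmax(q_row))                # exploit: greedy action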

  • Two ways to implement the exploration mechanism (a sketch of the second follows this list):
    • $\epsilon$-greedy
    • Boltzmann exploration
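Boltzmann exploration is only named above; a minimal sketch of one way to implement it is given here (the helper name and default temperature are illustrative assumptions):

import numpy as np

def boltzmann_action(q_row, temperature=1.0):
    # sample an action with probability proportional to exp(Q / temperature)
    prefs = np.asarray(q_row, dtype=float) / temperature
    prefs -= prefs.max()                          # subtract the max for numerical stability
    probs = np.exp(prefs) / np.exp(prefs).sum()
    return np.random.choice(len(probs), p=probs)

A high temperature makes the choice nearly uniform (more exploration), while a temperature close to zero approaches greedy selection.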

Algorithm flowchart:

Example code:

import numpy as np
import pandas as pd
import time

np.random.seed(2)               # make the pseudo-random numbers reproducible

N_STATES = 6                    # length of the 1-D world, i.e. the number of states
ACTIONS = ['left', 'right']     # action space
EPSILON = 0.9                   # greedy policy: probability of exploiting the Q-table
ALPHA = 0.1                     # learning rate
LAMBDA = 0.9                    # discount factor for future reward
MAX_EPISODES = 30               # maximum number of episodes to play
FRESH_TIME = 0.3                # pause (in seconds) between environment refreshes at each step

def build_q_table(n_states, actions):           # create the Q-table
    table = pd.DataFrame(
        np.zeros((n_states, len(actions))),     # initialise all Q-values to 0
        columns=actions,                        # column labels are the actions
    )
    # print(table)   # show table
    return table


#build_q_table(N_STATES, ACTIONS)


def choose_action(state, q_table):
    state_actions = q_table.iloc[state, :]          # the row of q_table for the current state
    if (np.random.uniform() > EPSILON) or (state_actions == 0).all():
        # explore with probability 1 - EPSILON, or when this state has never been updated
        action_name = np.random.choice(ACTIONS)
    else:
        action_name = state_actions.idxmax()    # label of the largest Q-value in this row
    return action_name      # return the action name, i.e. 'left' or 'right'


def get_env_feedback(S, A):
    # This is how the agent interacts with the environment
    if A == 'right':
        if S == N_STATES - 2:       # one cell left of the terminal; moving right reaches it
            S_ = 'terminal'
            R = 1
        else:                       # move one cell to the right
            S_ = S + 1
            R = 0
    else:
        R = 0
        if S == 0:                  # already in the leftmost cell, cannot move further left
            S_ = S
        else:                       # move one cell to the left
            S_ = S - 1
    return S_, R


def update_env(S, episode, step_counter):   # render the current state of the environment
    env_list = ['-']*(N_STATES-1) + ['T']
    if S == 'terminal':
        interaction = 'Episode %s: total_steps = %s' % (episode+1, step_counter)
        print('\r{}'.format(interaction), end='')
        time.sleep(2)
        print('\r                            ', end='')
    else:
        env_list[S] = 'o'
        interaction = ''.join(env_list)
        print('\r{}'.format(interaction), end='')
        time.sleep(FRESH_TIME)


def RL():
    q_table = build_q_table(N_STATES, ACTIONS)  # build q table
    # train
    for episode in range(MAX_EPISODES): 
        step_counter = 0
        S = 0                                   # initial position
        is_terminated = False                   # flag: has the agent reached the terminal state?
        update_env(S, episode, step_counter)
        while not is_terminated:
            A = choose_action(S, q_table)   # choose an action
            S_, R = get_env_feedback(S, A)
            q_predict = q_table.loc[S, A]   # Q-value before the update
            if S_ != 'terminal':
                q_target = R + LAMBDA * q_table.iloc[S_, :].max()
            else:
                q_target = R                        # terminal state reached, nothing to bootstrap from
                is_terminated = True                # set the termination flag

            q_table.loc[S, A] += ALPHA * (q_target - q_predict)      # update q_table
            S = S_

            update_env(S, episode, step_counter+1)
            step_counter += 1
    return q_table


if __name__ == "__main__":
    q_table = RL()
    print('\r\nQ-table:\n')
    print(q_table)

CliffWalking-v0

1 Q-learning implementation

import numpy as np
import pandas as pd
import time
import gym

class QLearningAgent:
    def __init__(self, n_state, n_action, learning_rate, max_steps, epsilon, discounted_factor):
        self.EPSILON = epsilon           # exploration rate for epsilon-greedy
        self.ALPHA = learning_rate       # learning rate
        self.GAMMA = discounted_factor   # discount factor
        self.MAX_STEPS = max_steps
        self.N_ACTION = n_action         # number of actions, avoids relying on the global env
        # self.Q_table = np.zeros((n_state, n_action))
        self.Q_table = pd.DataFrame(
            np.zeros((n_state, n_action))  # initialise all Q-values to 0
        )
        print(self.Q_table)

    def choose_action(self, observation):
        state_action = self.Q_table.iloc[observation, :]
        if (np.random.uniform() < self.EPSILON) or (state_action == 0).all():
            action = np.random.choice(self.N_ACTION)    # explore, or this state has never been updated
        else:
            action = state_action.idxmax()              # exploit: pick the greedy action
        return action

    def learn(self, state, observation, action, r, done):
        q_predict = self.Q_table.loc[state, action]
        if done:
            q_target = r                # terminal: no bootstrapping from a next state
        else:
            # off-policy: bootstrap from the maximum Q-value of the next state
            q_target = r + self.GAMMA * self.Q_table.iloc[observation, :].max()
        self.Q_table.loc[state, action] += self.ALPHA * (q_target - q_predict)

    def train(self, episode):
        state = env.reset()
        step_counter = 0
        total_reward = 0
        while True:
            action = self.choose_action(state)
            observation, reward, done, info = env.step(action)
            self.learn(state, observation, action, reward, done)
            step_counter += 1
            state = observation
            total_reward += reward
            if done or step_counter == self.MAX_STEPS:
                break
        return step_counter, total_reward

    def test(self):
        state = env.reset()
        step_counter = 0
        total_reward = 0
        while True:
            env.render()
            action = self.choose_action(state)
            observation, reward, done, info = env.step(action)
            step_counter += 1
            state = observation
            total_reward += reward
            if done:
                break
        env.close()
        return step_counter, total_reward


env = gym.make("CliffWalking-v0")
np.random.seed(2)

if __name__ == "__main__":
    agent = QLearningAgent(
        n_state=env.observation_space.n,
        n_action=env.action_space.n,
        learning_rate=0.1,
        max_steps=500,
        epsilon=0.1,
        discounted_factor=0.95
    )
    print("\rStart training...\n")
    time.sleep(1)
    for episode_counter in range(1000):
        ep_steps, ep_reward = agent.train(episode_counter)
        print("\rEpisode: %d     Total reward: %.1f     Steps: %d" % (episode_counter, ep_reward, ep_steps))

    print("\rTraining Completed!!!\n")
    test_step, test_reward = agent.test()
    print("\rTest Completed.\n")
    print("\rTest reward: %.1f     Steps: %d" % (test_reward, test_step))
    print("\r\nQ_table :\n")
    print(agent.Q_table)

2 Sarsa implementation

import numpy as np
import gym

class SarsaAgent:

    def __init__(self,  learning_rate, e_greedy, gamma, n_actions, n_states, max_steps):
        self.N_ACTIONS = n_actions
        self.N_STATES = n_states
        self.ALPHA = learning_rate
        self.EPSILON = e_greedy
        self.SarsaTable = np.zeros((n_states, n_actions))
        self.GAMMA = gamma
        self.MAX_STEPS = max_steps
        # print(self.SarsaTable)

    def choose_action(self, observation):
        state_action = self.SarsaTable[observation, :]
        if (np.random.uniform(0, 1) < self.EPSILON) or (state_action == 0).all():
            action = np.random.choice(self.N_ACTIONS)       # explore, or this state has never been updated
        else:
            maxS = np.max(state_action)
            action_list = np.where(state_action == maxS)[0] # all greedy actions (break ties randomly)
            action = np.random.choice(action_list)
        return action

    def learn(self, cur_obs, cur_act, reward, next_obs, next_act, done):
        sarsa_predict = self.SarsaTable[cur_obs, cur_act]
        if done:
            sarsa_target = reward       # terminal: no bootstrapping from a next state
        else:
            # on-policy: bootstrap from the action actually chosen at the next state
            sarsa_target = reward + self.GAMMA * self.SarsaTable[next_obs, next_act]
        self.SarsaTable[cur_obs, cur_act] += self.ALPHA * (sarsa_target - sarsa_predict)

    def train(self):
        cur_obs = env.reset()
        cur_act = self.choose_action(cur_obs)
        step_counter = 0
        total_reward = 0
        while True:
            next_obs, reward, done, info = env.step(cur_act)
            next_act = self.choose_action(next_obs)
            self.learn(cur_obs, cur_act, reward, next_obs, next_act, done)
            cur_act = next_act
            cur_obs = next_obs
            step_counter += 1
            total_reward += reward
            if done or step_counter == self.MAX_STEPS:
                break
        return total_reward, step_counter

    def test(self):
        cur_obs = env.reset()
        cur_act = self.choose_action(cur_obs)
        step_counter = 0
        total_reward = 0
        while True:
            env.render()
            next_obs, reward, done, info  = env.step(cur_act)
            next_act = self.choose_action(next_obs)
            cur_act = next_act
            cur_obs = next_obs
            step_counter += 1
            total_reward += reward
            if done or step_counter == self.MAX_STEPS:
                break
        return total_reward, step_counter


np.random.seed(1)
env = gym.make("CliffWalking-v0")

if __name__ == '__main__':
    agent = SarsaAgent(
        learning_rate=0.05,
        e_greedy=0.1,
        gamma=0.9,
        n_actions=env.action_space.n,
        n_states=env.observation_space.n,
        max_steps=500
    )

    print("Start to train...")
    for episode in range(500):
        ep_reward, ep_steps = agent.train()
        print("Episode:  %d   Total reward:  %.1f   Steps: %d\n" %(episode, ep_reward, ep_steps))

    print("Training Completed!!!")
    print("\rSarsaTable:\n")
    print(agent.SarsaTable)
    print("\nStart to train...\n")

    test_reward, test_steps = agent.test()
    print("Test Result:\n     Test Reward:  %.1f\n     Test Steps: %d\n" %(test_reward, test_steps))

Difference between Q-learning and Sarsa: the former is greedy and updates the Q-table using the largest Q-value of the next state, i.e. the update uses $\big(R,\ Q(s_t, a_t),\ \max_a Q(s_{t+1}, a)\big)$, and the action that attains $\max_a Q(s_{t+1}, a)$ is not necessarily the action actually chosen in the next state (off-policy); the latter is more conservative and updates the value function with $\big(R,\ Q(s_t, a_t),\ Q(s_{t+1}, a_{t+1})\big)$, where $a_{t+1}$ is the action the agent actually chooses in the next state (on-policy).
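To make the difference concrete, here is a minimal side-by-side sketch of the two update targets; the table shape matches CliffWalking-v0 (48 states, 4 actions), and the function names are illustrative rather than taken from the classes above.

import numpy as np

GAMMA, ALPHA = 0.9, 0.1
Q = np.zeros((48, 4))         # CliffWalking-sized table: 48 states x 4 actions

def q_learning_update(s, a, r, s_next, done):
    # off-policy: bootstrap from the best next action, regardless of what is actually taken
    target = r if done else r + GAMMA * Q[s_next].max()
    Q[s, a] += ALPHA * (target - Q[s, a])

def sarsa_update(s, a, r, s_next, a_next, done):
    # on-policy: bootstrap from the action the agent actually chooses next
    target = r if done else r + GAMMA * Q[s_next, a_next]
    Q[s, a] += ALPHA * (target - Q[s, a])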


Author: Vyron Su
Copyright notice: Unless otherwise stated, all posts on this blog are licensed under CC BY 4.0. Please credit Vyron Su when reposting!