DQN


  • Pseudocode:
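
A rough sketch of the standard DQN training loop (ε-greedy exploration, experience replay, and a periodically synced target network); the implementation below follows this structure:

    initialize replay buffer D with capacity N
    initialize policy network Q(s, a; θ) and target network Q̂(s, a; θ⁻) with θ⁻ ← θ
    for each episode:
        observe the initial state s
        repeat until the episode ends:
            with probability ε pick a random action a, otherwise a = argmax_a Q(s, a; θ)
            execute a; observe reward r, next state s' and the done flag
            store (s, a, r, s', done) in D
            sample a random minibatch of transitions from D
            y = r                                   if done
            y = r + γ · max_a' Q̂(s', a'; θ⁻)        otherwise
            take a gradient step on (y − Q(s, a; θ))² with respect to θ
            s ← s'
        periodically copy θ into θ⁻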

  • model.py:

import torch
from torch import nn
import torch.nn.functional as F
import numpy as np
import random
from torch.nn import MSELoss
import math
import os

# Q-network used by DQN
class network(nn.Module):
    # fully connected network with two hidden layers
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(network, self).__init__()
        self.linear1 = nn.Linear(state_dim, hidden_dim)
        self.linear2 = nn.Linear(hidden_dim, hidden_dim)
        self.linear3 = nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.linear1(x)) # ReLU non-linearity
        x = F.relu(self.linear2(x))
        return self.linear3(x)


# Replay buffer, implemented as a circular queue
class ReplayBuffer:
    def __init__(self, capacity):
        self.position = 0           # write pointer
        self.capacity = capacity    # buffer capacity
        self.buffer = []            # underlying storage

    # store a transition (s_t, a_t, r_t, s_{t+1}, done) in the buffer
    def push(self, state, action, reward, next_state, done):
        if len(self.buffer) < self.capacity:    # grow the list until capacity is reached
            self.buffer.append(None)
        self.buffer[self.position] = (state, action, reward, next_state, done)  # write the transition
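        # advance the write pointer; once the buffer is full it wraps around
        # and the oldest transitions are overwritten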
        self.position = (self.position + 1) % self.capacity     

    # randomly sample a batch of transitions to feed to the network
    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = zip(*batch)   # unzip into separate tuples
        return state, action, reward, next_state, done

    def __len__(self):
        return len(self.buffer)


class DQN_Agent:
    def __init__(self, config):
        self.frame_idx = 0
        # epsilon decays exponentially from epsilon_max toward epsilon_min as frame_idx grows
        self.epsilon = lambda frame_idx: config.epsilon_min + \
                                         (config.epsilon_max - config.epsilon_min) * \
                                         math.exp(-1. * frame_idx / config.epsilon_decay)
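        # e.g. with epsilon_max=0.9, epsilon_min=0.01, epsilon_decay=500 (the values used
        # in agent.py): epsilon = 0.9 at frame 0, ≈ 0.34 after 500 frames, ≈ 0.02 after 2500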
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # select device
        self.action_dim = config.action_dim
        self.state_dim = config.state_dim
        self.hidden_dim = config.hidden_dim
        self.memory = ReplayBuffer(config.buffer_capacity)  # replay buffer
        self.batch_size = config.batch_size
        self.learning_rate = config.learning_rate   # learning rate
        self.gamma = config.gamma       # discount factor
        self.target_net = network(self.state_dim,
                                  self.hidden_dim,
                                  self.action_dim).to(self.device)     # target Q-network
        self.policy_net = network(self.state_dim,
                                  self.hidden_dim,
                                  self.action_dim).to(self.device)     # policy Q-network
        for target_param, param in zip(self.target_net.parameters(),self.policy_net.parameters()): 
            target_param.data.copy_(param.data)     # initialize target_net as a copy of policy_net
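        # target_net stays fixed between syncs and is refreshed from policy_net
        # every update_freq episodes (see agent.py)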
        self.optimizer = torch.optim.Adam(self.policy_net.parameters(),
                                          lr=self.learning_rate)

    def update(self):
        # not enough transitions in the buffer yet; skip the update
        if len(self.memory) < self.batch_size:
            return

        state_batch, action_batch, reward_batch, next_state_batch, done_batch = self.memory.sample(self.batch_size)
        # convert the batch to tensors on the chosen device
        state_batch = torch.tensor(state_batch, device=self.device, dtype=torch.float)
        action_batch = torch.tensor(action_batch, device=self.device).unsqueeze(1)
        reward_batch = torch.tensor(reward_batch, device=self.device, dtype=torch.float)
        next_state_batch = torch.tensor(next_state_batch, device=self.device, dtype=torch.float)
        done_batch = torch.tensor(np.float32(done_batch), device=self.device)
        # gather along dim=1 the Q-value of the action actually taken in each state
        q_val = self.policy_net(state_batch).gather(dim=1, index=action_batch)
        # .max(1) returns (values, indices); [0] keeps the max Q-value per next state,
        # detach() blocks gradients from flowing into the target network
        next_q_val = self.target_net(next_state_batch).max(1)[0].detach()
        q_predict = q_val
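        # TD target: y = r + gamma * max_a' Q_target(s', a'); multiplying by
        # (1 - done) drops the bootstrap term for terminal transitions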
        q_target = reward_batch + self.gamma * next_q_val * (1 - done_batch)
        loss_fn = MSELoss()
        self.optimizer.zero_grad()  # clear accumulated gradients
        loss = loss_fn(q_predict, q_target.unsqueeze(1))    # MSE loss between predicted and target Q-values
        loss.backward()             # backpropagate
        self.optimizer.step()       # take an optimization step

    def choose_action(self, state):
        self.frame_idx += 1
        if random.random() < self.epsilon(self.frame_idx):
            action = random.randrange(self.action_dim)
        else:
            with torch.no_grad():   
                state = torch.tensor([state], device=self.device, dtype=torch.float32)
                state_action = self.policy_net(state)
                action = state_action.max(1)[1].item()      
        return action

    def save(self, root):
        torch.save(self.target_net.state_dict(), os.path.join(root, "target_net.pth"))

    def load(self, path):
        self.target_net.load_state_dict(torch.load(os.path.join(path, "target_net.pth")))
        self.policy_net.load_state_dict(self.target_net.state_dict())
  • agent.py:
import numpy as np
import gym
from model import DQN_Agent
from torch.utils.tensorboard import SummaryWriter
import os
import sys

curr_path = os.path.dirname(os.path.abspath(__file__))  # absolute path of the directory containing this file
parent_path = os.path.dirname(curr_path)  # parent directory
sys.path.append(parent_path)  # add the parent directory to the module search path


class Agent_Config:
    def __init__(self, lr, eps_max, eps_min, hidden_dim, gamma, batch_size):
        self.epsilon_max = eps_max       # epsilon-greedy upper bound
        self.epsilon_min = eps_min       # epsilon-greedy lower bound
        self.epsilon_decay = 500         # epsilon-greedy decay constant (in frames)
        self.learning_rate = lr          # learning rate
        self.batch_size = batch_size
        self.gamma = gamma               # discount factor
        self.action_dim = env.action_space.n
        self.state_dim = env.observation_space.shape[0]
        self.hidden_dim = hidden_dim
        self.buffer_capacity = 100000
        self.train_episode = 300
        self.test_episode = 30
        self.update_freq = 5             # sync target_net every 5 episodes
        self.model_path = curr_path      # directory used to save the model


def train(config, agent):
    train_reward = []
    ma_reward = []
    print("------------- Training start --------------")
    for episode in range(config.train_episode):
        state = env.reset()
        eps_reward = 0
        eps_steps = 0
        while True:
            action = agent.choose_action(state)
            next_state, reward, done, info = env.step(action)
            agent.memory.push(state, action, reward, next_state, done)
            agent.update()
            state = next_state
            eps_reward += reward
            eps_steps += 1
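            # end the episode on a terminal state or after 200 steps
            # (CartPole-v1's built-in time limit is 500 steps)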
            if done or eps_steps > 200:
                break
        if (episode + 1) % config.update_freq == 0:
            # sync target Q-network parameters from the policy network
            agent.target_net.load_state_dict(agent.policy_net.state_dict())
        train_reward.append(eps_reward)
        if ma_reward:
            ma_reward.append(ma_reward[- 1] * 0.9 + eps_reward * 0.1)
        else:
            ma_reward.append(eps_reward)
        writer.add_scalar("train_reward", eps_reward, episode)
        writer.add_scalar("train_ma_reward", ma_reward[episode], episode)
        print("Training episode: {}  steps: {}  reward: {}".format(episode + 1, eps_steps, eps_reward))
    print("------------- Training finished --------------")
    return train_reward, ma_reward


def test(config, agent):
    test_reward = []
    ma_reward = []
    print("------------- Testing start --------------")
    for episode in range(config.test_episode):
        state = env.reset()
        eps_reward = 0
        eps_steps = 0
        while True:
            env.render()
            action = agent.choose_action(state)
            next_state, reward, done, info = env.step(action)
            state = next_state
            eps_reward += reward
            eps_steps += 1
            if done or eps_steps > 200:
                break
        test_reward.append(eps_reward)
        if ma_reward:
            ma_reward.append(ma_reward[-1] * 0.9 + eps_reward * 0.1)
        else:
            ma_reward.append(eps_reward)
        writer.add_scalar("test_reward", eps_reward, episode)
        writer.add_scalar("test_ma_reward", ma_reward[episode], episode)
        print("Test episode: {}  steps: {}  reward: {}".format(episode + 1, eps_steps, eps_reward))
    print("------------- Testing finished --------------")
    env.close()
    return test_reward, ma_reward


env_name = "CartPole-v1"
writer = SummaryWriter("logs")
env = gym.make(env_name)
env.seed(2)
np.random.seed(2)
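# Note: the classic Gym API is assumed throughout (env.reset() returns the state,
# env.step() returns a 4-tuple, and env.seed() exists), i.e. a gym release before 0.26.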
if __name__ == "__main__":
    config = Agent_Config(lr=0.001,
                          eps_max=0.9,
                          eps_min=0.01,
                          hidden_dim=256,
                          gamma=0.95,
                          batch_size=64)
    agent = DQN_Agent(config)
    train_rewards, train_ma_rewards = train(config, agent)
    agent.save(config.model_path)
    agent = DQN_Agent(config)
    agent.load(config.model_path)
    test_rewards, test_ma_rewards = test(config, agent)
    writer.close()
