Pseudocode:
model.py:
import torch
from torch import nn
import torch.nn.functional as F
import numpy as np
import random
from torch.nn import MSELoss
import math
import os
# DQN network definition
class network(nn.Module):
    # Fully connected network with two hidden layers
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(network, self).__init__()
        self.linear1 = nn.Linear(state_dim, hidden_dim)
        self.linear2 = nn.Linear(hidden_dim, hidden_dim)
        self.linear3 = nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.linear1(x))  # ReLU non-linearity
        x = F.relu(self.linear2(x))
        return self.linear3(x)
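# Shape note (added for clarity): the network maps a batch of states of shape
# (batch, state_dim) to Q-values of shape (batch, action_dim), one value per
# discrete action.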
# Replay buffer: effectively a circular queue
class ReplayBuffer:
    def __init__(self, capacity):
        self.position = 0          # write pointer
        self.capacity = capacity   # buffer capacity
        self.buffer = []           # underlying storage

    # Store a transition (s_t, a_t, r_t, s_{t+1}, done) in the buffer
    def push(self, state, action, reward, next_state, done):
        if len(self.buffer) < self.capacity:  # grow the list until it reaches capacity
            self.buffer.append(None)
        self.buffer[self.position] = (state, action, reward, next_state, done)  # insert the transition
        self.position = (self.position + 1) % self.capacity

    # Sample a random batch of transitions to feed the network
    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = zip(*batch)  # unzip into separate tuples
        return state, action, reward, next_state, done

    def __len__(self):
        return len(self.buffer)
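# Illustrative usage (not part of the original listing): once full, the buffer
# overwrites its oldest entries through the modulo pointer, e.g. with a
# hypothetical capacity of 3:
#   buf = ReplayBuffer(capacity=3)
#   for i in range(5):
#       buf.push(state=i, action=0, reward=0.0, next_state=i + 1, done=False)
#   # len(buf) == 3; only the transitions for i = 2, 3, 4 remain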
class DQN_Agent:
    def __init__(self, config):
        self.frame_idx = 0
        # epsilon decays from epsilon_max toward epsilon_min as training progresses
        self.epsilon = lambda frame_idx: config.epsilon_min + \
            (config.epsilon_max - config.epsilon_min) * \
            math.exp(-1. * frame_idx / config.epsilon_decay)
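        # Illustrative values (added, assuming the defaults used in agent.py:
        # epsilon_max=0.9, epsilon_min=0.01, epsilon_decay=500):
        #   epsilon(0) = 0.9, epsilon(500) ≈ 0.34, epsilon(2000) ≈ 0.03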
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # select device
        self.action_dim = config.action_dim
        self.state_dim = config.state_dim
        self.hidden_dim = config.hidden_dim
        self.memory = ReplayBuffer(config.buffer_capacity)  # replay buffer
        self.batch_size = config.batch_size
        self.learning_rate = config.learning_rate  # learning rate
        self.gamma = config.gamma  # discount factor
        self.target_net = network(self.state_dim,
                                  self.hidden_dim,
                                  self.action_dim).to(self.device)  # target Q-network
        self.policy_net = network(self.state_dim,
                                  self.hidden_dim,
                                  self.action_dim).to(self.device)  # policy Q-network
        for target_param, param in zip(self.target_net.parameters(), self.policy_net.parameters()):
            target_param.data.copy_(param.data)  # copy the policy-net parameters into target_net
        self.optimizer = torch.optim.Adam(self.policy_net.parameters(),
                                          lr=self.learning_rate)
    def update(self):
        # Too few transitions in the buffer: skip the parameter update
        if len(self.memory) < self.batch_size:
            return
        state_batch, action_batch, reward_batch, next_state_batch, done_batch = self.memory.sample(self.batch_size)
        # Convert the batch to tensors
        state_batch = torch.tensor(np.array(state_batch), device=self.device, dtype=torch.float)
        action_batch = torch.tensor(action_batch, device=self.device).unsqueeze(1)
        reward_batch = torch.tensor(reward_batch, device=self.device, dtype=torch.float)
        next_state_batch = torch.tensor(np.array(next_state_batch), device=self.device, dtype=torch.float)
        done_batch = torch.tensor(np.float32(done_batch), device=self.device)
        # gather along dim=1 picks the Q-value of the action actually taken
        q_val = self.policy_net(state_batch).gather(dim=1, index=action_batch)
        # .max(1) returns (values, indices) per row; [0] takes the max Q-values, detach() blocks gradients
        next_q_val = self.target_net(next_state_batch).max(1)[0].detach()
        q_predict = q_val
        q_target = reward_batch + self.gamma * next_q_val * (1 - done_batch)
        loss_fn = MSELoss()
        self.optimizer.zero_grad()  # clear gradients
        loss = loss_fn(q_predict, q_target.unsqueeze(1))  # compute the loss
        loss.backward()  # backpropagate the loss
        self.optimizer.step()  # update the policy-network parameters
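    # For reference (added, not in the original listing), update() performs the
    # standard DQN regression:
    #   y_t  = r_t + gamma * (1 - done_t) * max_a Q_target(s_{t+1}, a)
    #   loss = MSE(Q_policy(s_t, a_t), y_t)
    # where (1 - done_t) removes the bootstrap term for terminal transitions.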
    def choose_action(self, state):
        self.frame_idx += 1
        if random.random() < self.epsilon(self.frame_idx):
            action = random.randrange(self.action_dim)  # explore: random action
        else:
            with torch.no_grad():
                state = torch.tensor([state], device=self.device, dtype=torch.float32)
                state_action = self.policy_net(state)
                action = state_action.max(1)[1].item()  # exploit: greedy action

        return action

    def save(self, root):
        torch.save(self.target_net.state_dict(), os.path.join(root, "target_net.pth"))

    def load(self, path):
        self.target_net.load_state_dict(torch.load(os.path.join(path, "target_net.pth")))
        self.policy_net.load_state_dict(self.target_net.state_dict())
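# Note (added for clarity): save() stores only the target network's weights;
# load() restores them and copies the same weights into the policy network,
# so both networks share the saved parameters at test time.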
agent.py:
import numpy as np
import gym
from model import DQN_Agent
from torch.utils.tensorboard import SummaryWriter
import os
import sys
curr_path = os.path.dirname(os.path.abspath(__file__))  # absolute path of this file's directory
parent_path = os.path.dirname(curr_path)                # parent directory
sys.path.append(parent_path)                            # add the parent directory to sys.path
class Agent_Config:
    def __init__(self, lr, eps_max, eps_min, hidden_dim, gamma, batch_size):
        self.epsilon_max = eps_max      # epsilon-greedy upper bound
        self.epsilon_min = eps_min      # epsilon-greedy lower bound
        self.epsilon_decay = 500        # epsilon-greedy decay constant
        self.learning_rate = lr         # learning rate
        self.batch_size = batch_size
        self.gamma = gamma              # discount factor
        self.action_dim = env.action_space.n
        self.state_dim = env.observation_space.shape[0]
        self.hidden_dim = hidden_dim
        self.buffer_capacity = 100000
        self.train_episode = 300
        self.test_episode = 30
        self.update_freq = 5            # target-network update frequency (in episodes)
        self.model_path = curr_path     # directory for saving the model
def train(config, agent):
    train_reward = []
    ma_reward = []
    print("------------- Training started --------------")
    for episode in range(config.train_episode):
        state = env.reset()
        eps_reward = 0
        eps_steps = 0
        while True:
            action = agent.choose_action(state)
            next_state, reward, done, info = env.step(action)
            agent.memory.push(state, action, reward, next_state, done)
            agent.update()
            state = next_state
            eps_reward += reward
            eps_steps += 1
            if done or eps_steps > 200:
                break
        if (episode + 1) % config.update_freq == 0:
            # Refresh the target Q-network parameters
            agent.target_net.load_state_dict(agent.policy_net.state_dict())
        train_reward.append(eps_reward)
        if ma_reward:
            ma_reward.append(ma_reward[-1] * 0.9 + eps_reward * 0.1)
        else:
            ma_reward.append(eps_reward)
        writer.add_scalar("train_reward", eps_reward, episode)
        writer.add_scalar("train_ma_reward", ma_reward[episode], episode)
        print("Training episode: {}  steps: {}  reward: {}".format(episode + 1, eps_steps, eps_reward))
    print("------------- Training finished --------------")
    return train_reward, ma_reward
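# Note (added for clarity): ma_reward is an exponential moving average of the
# per-episode reward, ma_t = 0.9 * ma_{t-1} + 0.1 * r_t, kept as a smoothed
# companion curve to the raw episode reward.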
def test(config, agent):
    test_reward = []
    ma_reward = []
    print("------------- Testing started --------------")
    for episode in range(config.test_episode):
        state = env.reset()
        eps_reward = 0
        eps_steps = 0
        while True:
            env.render()
            action = agent.choose_action(state)
            next_state, reward, done, info = env.step(action)
            state = next_state
            eps_reward += reward
            eps_steps += 1
            if done or eps_steps > 200:
                break
        test_reward.append(eps_reward)
        if ma_reward:
            ma_reward.append(ma_reward[-1] * 0.9 + eps_reward * 0.1)
        else:
            ma_reward.append(eps_reward)
        writer.add_scalar("test_reward", eps_reward, episode)
        writer.add_scalar("test_ma_reward", ma_reward[episode], episode)
        print("Test episode: {}  steps: {}  reward: {}".format(episode + 1, eps_steps, eps_reward))
    print("------------- Testing finished --------------")
    env.close()
    return test_reward, ma_reward
env_name = "CartPole-v1"
writer = SummaryWriter("logs")
env = gym.make(env_name)
env.seed(2)
np.random.seed(2)
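# Note (added): env.seed() and the 4-tuple return of env.step() follow the
# classic Gym API, so this script assumes a gym version earlier than 0.26.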
if __name__ == "__main__":
    config = Agent_Config(lr=0.001,
                          eps_max=0.9,
                          eps_min=0.01,
                          hidden_dim=256,
                          gamma=0.95,
                          batch_size=64)
    agent = DQN_Agent(config)
    train_rewards, train_ma_rewards = train(config, agent)
    agent.save(config.model_path)
    agent = DQN_Agent(config)
    agent.load(config.model_path)
    test_rewards, test_ma_rewards = test(config, agent)
    writer.close()
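# To view the logged reward curves afterwards (assuming TensorBoard is installed):
#   tensorboard --logdir logs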