import random

import gym
import torch
from torch.utils.tensorboard import SummaryWriter

from model import PolicyGradientAgent


class Config:
    """Hyper-parameters and environment-derived dimensions for the agent."""

    def __init__(self, lr, discount_factor, hidden_dim, batch_size, seed):
        self.device = torch.device("cpu")
        self.lr = lr
        self.gamma = discount_factor
        self.hidden_dim = hidden_dim
        self.batch_size = batch_size
        if seed != 0:
            env.seed(seed)
            random.seed(seed)
            torch.manual_seed(seed)  # also seed torch so the network initialisation is reproducible
        self.train_eps = 320
        self.test_eps = 30
        self.train_steps = 400  # step limit per training episode
        self.test_steps = 500   # step limit per test episode
        self.state_dim = env.observation_space.shape[0]


def train(config, agent):
    ma_reward = 0
    reward_tra = []
    state_tra = []
    action_tra = []
    print("-------------- Training started... ---------------")
    for eps in range(config.train_eps):
        state = env.reset()
        train_steps = 0
        total_reward = 0
        while True:
            action = agent.choose_action(state)
            next_state, reward, done, _ = env.step(action)
            train_steps += 1
            if done:
                reward = 0  # a stored reward of 0 marks the episode boundary for agent.update()
            reward_tra.append(reward)
            state_tra.append(state)
            action_tra.append(float(action))
            state = next_state
            total_reward += reward
            if done or train_steps == config.train_steps:
                break
        # Update the policy every batch_size episodes on the trajectories collected since the last update.
        if eps > 0 and eps % agent.batch_size == 0:
            agent.update(reward_tra, action_tra, state_tra)
            reward_tra = []
            action_tra = []
            state_tra = []
        if ma_reward == 0:
            ma_reward = total_reward
        else:
            ma_reward = 0.9 * ma_reward + 0.1 * total_reward  # exponential moving average of the episode reward
        eps_reward = total_reward
        writer.add_scalar("episode_train_reward", eps_reward, eps)
        writer.add_scalar("episode_train_ma_reward", ma_reward, eps)  # log the moving average under its own tag
        print("Episode: {} Reward: {} Total_steps: {}".format(eps, eps_reward, train_steps))
    print("-------------- Training finished!!! ---------------")


def test(config, agent):
    ma_reward = 0
    print("-------------- Testing started... ---------------")
    for eps in range(config.test_eps):
        total_reward = 0
        with torch.no_grad():
            state = env.reset()
            test_steps = 0
            while True:
                env.render()
                action = agent.choose_action(state)
                next_state, reward, done, _ = env.step(action)
                test_steps += 1
                total_reward += reward
                state = next_state
                if done or test_steps == config.test_steps:
                    break
        if ma_reward == 0:
            ma_reward = total_reward
        else:
            ma_reward = 0.9 * ma_reward + 0.1 * total_reward
        eps_reward = total_reward
        writer.add_scalar("episode_test_reward", eps_reward, eps)
        writer.add_scalar("episode_test_ma_reward", ma_reward, eps)  # moving-average reward gets its own tag
        print("Episode: {} Reward: {} Total_steps: {}".format(eps, eps_reward, test_steps))
    print("-------------- Testing finished!!! ---------------")
    env.close()


# Module-level TensorBoard writer and environment, shared by Config, train() and test().
writer = SummaryWriter("logs")
env = gym.make("CartPole-v1")

if __name__ == "__main__":
    cfg = Config(lr=0.001,
                 discount_factor=0.99,
                 hidden_dim=128,
                 batch_size=8,
                 seed=1)
    agent = PolicyGradientAgent(cfg)
    train(cfg, agent)
    agent.save()

    cfg = Config(lr=0.001,
                 discount_factor=0.99,
                 hidden_dim=128,
                 batch_size=8,
                 seed=10)
    agent = PolicyGradientAgent(cfg)
    agent.load()
    test(cfg, agent)
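

# ---------------------------------------------------------------------------
# model.py -- the module imported above via `from model import PolicyGradientAgent`.
# It defines the Bernoulli-policy network and the REINFORCE agent.
# ---------------------------------------------------------------------------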
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F
from torch.distributions import Bernoulli


class PolicyGradient(nn.Module):
    """MLP policy network: maps a state to the probability of taking action 1."""

    def __init__(self, state_dim, hidden_dim):
        super(PolicyGradient, self).__init__()
        self.linear1 = nn.Linear(state_dim, hidden_dim)
        self.linear2 = nn.Linear(hidden_dim, hidden_dim)
        self.linear3 = nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = F.relu(self.linear1(x))
        x = F.relu(self.linear2(x))
        return torch.sigmoid(self.linear3(x))  # single sigmoid output: P(action = 1)


class PolicyGradientAgent:
    """REINFORCE agent with a Bernoulli policy over CartPole's two actions."""

    def __init__(self, cfg):
        self.state_dim = cfg.state_dim
        self.hidden_dim = cfg.hidden_dim
        self.device = cfg.device
        self.batch_size = cfg.batch_size
        self.policy_net = PolicyGradient(self.state_dim, self.hidden_dim).to(self.device)
        self.lr = cfg.lr
        self.gamma = cfg.gamma
        self.optimizer = torch.optim.RMSprop(self.policy_net.parameters(), lr=self.lr)

    def choose_action(self, state):
        state = torch.from_numpy(state).float()
        probs = self.policy_net(state)
        m = Bernoulli(probs)  # stochastic policy: sample action 0 or 1
        action = m.sample()
        return int(action.item())

    def update(self, reward_tra, action_tra, state_tra):
        # Compute the discounted return G_t = r_t + gamma * G_{t+1} for every step.
        # A stored reward of 0 marks an episode boundary, so the running return resets there.
        tmp = 0
        for i in reversed(range(len(reward_tra))):
            if reward_tra[i] == 0:
                tmp = 0
            else:
                tmp = tmp * self.gamma + reward_tra[i]
            reward_tra[i] = tmp
        # Standardise the returns to reduce the variance of the gradient estimate
        # (the small epsilon guards against a zero standard deviation).
        reward_mean = np.mean(reward_tra)
        reward_std = np.std(reward_tra)
        for i in range(len(reward_tra)):
            reward_tra[i] = (reward_tra[i] - reward_mean) / (reward_std + 1e-8)
        # REINFORCE update: gradients of -log pi(a_t | s_t) * G_t are accumulated over the
        # whole batch by repeated backward() calls, then applied in a single optimizer step.
        self.optimizer.zero_grad()
        for i in range(len(reward_tra)):
            state = torch.from_numpy(state_tra[i]).float().to(self.device)
            probs = self.policy_net(state)
            m = Bernoulli(probs)
            action = torch.FloatTensor([action_tra[i]]).to(self.device)
            loss = -m.log_prob(action) * reward_tra[i]
            loss.backward()
        self.optimizer.step()

    def save(self):
        torch.save(self.policy_net.state_dict(), "pg_net.pth")

    def load(self):
        self.policy_net.load_state_dict(torch.load("pg_net.pth"))
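

# Notes:
# - The gym calls used above (env.seed(), env.reset() returning only the observation,
#   env.step() returning a 4-tuple) follow the pre-0.26 gym API; with gym >= 0.26 the
#   reset/step signatures and seeding differ.
# - Training and test reward curves are written to ./logs and can be viewed with
#   `tensorboard --logdir logs`.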