Policy Gradient (REINFORCE)


  • Pseudocode:
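
A minimal sketch of the batched REINFORCE procedure that the code below implements (illustrative pseudocode, not runnable code; γ is the discount factor and G_t the discounted return):

    initialize the policy network π_θ
    repeat for each batch of episodes:
        collect trajectories (s_t, a_t, r_t) by sampling a_t ~ π_θ(·|s_t)
        for each step t, backwards within an episode: G_t = r_t + γ · G_{t+1}
        normalize the returns: G_t ← (G_t − mean(G)) / std(G)
        loss = − Σ_t log π_θ(a_t|s_t) · G_t
        take one gradient step on θ to minimize the loss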

  • agent.py
import random
import torch
from torch.utils.tensorboard import SummaryWriter
from model import PolicyGradientAgent
import gym


class config:
    def __init__(self, lr, discounted_factor, hidden_dim, batch_size, seed):
        # self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.device = torch.device("cpu")
        self.lr = lr
        self.gamma = discounted_factor
        self.hidden_dim = hidden_dim
        self.batch_size = batch_size
        if seed != 0:
            env.seed(seed)
            random.seed(seed)
            torch.manual_seed(seed)  # actions are sampled with torch, so seed torch as well
        self.train_eps = 320
        self.test_eps = 30
        self.train_steps = 400
        self.test_steps = 500
        self.state_dim = env.observation_space.shape[0]


def train(config, agent):
    ma_reward = 0
    reward_tra = []
    state_tra = []
    action_tra = []
    print("--------------开始训练...---------------")
    for eps in range(config.train_eps):
        state = env.reset()
        train_steps = 0
        total_reward = 0
        while True:
            action = agent.choose_action(state)
            next_state, reward, done, _ = env.step(action)
            train_steps += 1
            # CartPole gives +1 every step; the terminal reward is set to 0 so that
            # agent.update() can use reward == 0 to detect episode boundaries
            if done:
                reward = 0
            reward_tra.append(reward)
            state_tra.append(state)
            action_tra.append(float(action))
            state = next_state
            total_reward += reward
            if done or train_steps == config.train_steps:
                break
        # update the network only after a batch of trajectories has been collected
        if eps > 0 and eps % agent.batch_size == 0:
            agent.update(reward_tra, action_tra, state_tra)
            reward_tra = []
            action_tra = []
            state_tra = []
        if ma_reward == 0:
            ma_reward = total_reward
        else:
            ma_reward = 0.9 * ma_reward + total_reward * 0.1
        eps_reward = total_reward
        writer.add_scalar("episode_train_reward", eps_reward, eps)
        writer.add_scalar("episode_train_reward", ma_reward, eps)
        print("Episode: {}  Reward: {}  Total_steps: {}".format(eps, eps_reward, train_steps))
    print("--------------训练完成!!!---------------")


def test(config, agent):
    ma_reward = 0
    print("--------------开始测试...---------------")
    for eps in range(config.test_eps):
        total_reward = 0
        with torch.no_grad():
            state = env.reset()
            test_steps = 0
            while True:
                env.render()
                action = agent.choose_action(state)
                next_state, reward, done, _ = env.step(action)
                test_steps += 1
                total_reward += reward
                state = next_state
                if done or test_steps == config.test_steps:
                    break
        if ma_reward == 0:
            ma_reward = total_reward
        else:
            ma_reward = 0.9 * ma_reward + total_reward * 0.1
        eps_reward = total_reward
        writer.add_scalar("episode_test_reward", eps_reward, eps)
        writer.add_scalar("episode_test_reward", ma_reward, eps)
        print("Episode: {}  Reward: {}  Total_steps: {}".format(eps, eps_reward, test_steps))
    print("--------------测试完成!!!---------------")
    env.close()


writer = SummaryWriter("logs")
env = gym.make("CartPole-v1")
if __name__ == "__main__":
    cfg = config(lr=0.001,
                 discounted_factor=0.99,
                 hidden_dim=128,
                 batch_size=8,
                 seed=1)
    agent = PolicyGradientAgent(cfg)
    train(cfg, agent)
    agent.save()
    cfg = config(lr=0.001,
                 discounted_factor=0.99,
                 hidden_dim=128,
                 batch_size=8,
                 seed=10)
    agent = PolicyGradientAgent(cfg)
    agent.load()
    test(cfg, agent)
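
The reward curves logged above are written to the logs directory and can be viewed with tensorboard --logdir logs.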
  • model.py
import torch
from torch import nn
import torch.nn.functional as F
import numpy as np
from torch.distributions import Bernoulli

class PolicyGradient(nn.Module):
    def __init__(self, state_dim, hidden_dim):
        super(PolicyGradient, self).__init__()
        self.linear1 = nn.Linear(state_dim, hidden_dim)
        self.linear2 = nn.Linear(hidden_dim, hidden_dim)
        self.linear3 = nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = F.relu(self.linear1(x))
        x = F.relu(self.linear2(x))
        # sigmoid output: the probability of taking action 1 (CartPole-v1 has two discrete actions)
        return torch.sigmoid(self.linear3(x))


class PolicyGradientAgent:
    def __init__(self, cfg):
        self.state_dim = cfg.state_dim
        self.hidden_dim = cfg.hidden_dim
        self.device = cfg.device
        self.batch_size = cfg.batch_size
        self.policy_net = PolicyGradient(self.state_dim, self.hidden_dim).to(self.device)
        self.lr = cfg.lr        # learning rate
        self.gamma = cfg.gamma  # discount factor
        self.optimizer = torch.optim.RMSprop(self.policy_net.parameters(), lr=self.lr)  # optimizer

    def choose_action(self, state):
        state = torch.from_numpy(state).float().to(self.device)  # convert the numpy state to a tensor
        probs = self.policy_net(state)  # probability of taking action 1
        m = Bernoulli(probs)            # Bernoulli distribution over the two actions
        action = m.sample()             # sample an action according to that probability
        return int(action.item())       # convert to a plain int for env.step

    def update(self, reward_tra, action_tra, state_tra):
        # compute the discounted return G_t for every step, working backwards;
        # reward == 0 marks the end of an episode, so the running sum is reset there
        tmp = 0
        for i in reversed(range(len(reward_tra))):
            if reward_tra[i] == 0:
                tmp = 0
            else:
                tmp = tmp * self.gamma + reward_tra[i]
                reward_tra[i] = tmp

        reward_mean = np.mean(reward_tra)   # mean of the returns
        reward_std = np.std(reward_tra)     # standard deviation of the returns
        # normalize the returns to reduce the variance of the gradient estimate
        for i in range(len(reward_tra)):
            reward_tra[i] = (reward_tra[i] - reward_mean) / reward_std

        self.optimizer.zero_grad()  # clear the accumulated gradients
        for i in range(len(reward_tra)):
            state = torch.from_numpy(state_tra[i]).float().to(self.device)
            probs = self.policy_net(state)
            m = Bernoulli(probs)
            action = torch.tensor([action_tra[i]], dtype=torch.float32, device=self.device)
            # REINFORCE loss: -log pi(a|s) * G_t; backward() accumulates gradients over the whole batch
            loss = -m.log_prob(action) * reward_tra[i]
            loss.backward()
        self.optimizer.step()

    def save(self):
        torch.save(self.policy_net.state_dict(), "pg_net.pth")

    def load(self):
        self.policy_net.load_state_dict(torch.load("pg_net.pth"))
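
The per-step loss in update() is the Monte-Carlo REINFORCE estimator: minimizing −log π_θ(a_t|s_t) · G_t by gradient descent performs gradient ascent on the expected return. In the usual notation,

$$\nabla_\theta J(\theta) \approx \sum_t \nabla_\theta \log \pi_\theta(a_t \mid s_t)\, G_t, \qquad G_t = \sum_{k=t}^{T} \gamma^{\,k-t} r_k,$$

where the returns G_t are additionally normalized by their batch mean and standard deviation to reduce the variance of the estimate.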

Author: Vyron Su
Copyright notice: Unless otherwise stated, all posts on this blog are licensed under CC BY 4.0. Please credit Vyron Su when reposting!