The REINFORCE Algorithm
REINFORCE is a policy-based algorithm introduced by Ronald J. Williams in the 1992 paper "Simple Statistical Gradient-Following Algorithms for Connectionist Reinforcement Learning".
The idea behind REINFORCE is intuitive: during learning, actions that led to high returns should be taken more often, and actions that led to poor outcomes should be taken less often. If learning succeeds, the policy converges to one that performs as well as possible in the environment. The action probabilities of the policy are updated via the policy gradient, which is why REINFORCE is known as a policy gradient algorithm.
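In standard notation, REINFORCE maximizes the expected return of trajectories sampled from the policy and estimates the gradient of that objective from the log-probabilities of the actions taken:

\[
J(\pi_\theta) = \mathbb{E}_{\tau \sim \pi_\theta}\left[ R(\tau) \right],
\qquad
\nabla_\theta J(\pi_\theta) = \mathbb{E}_{\tau \sim \pi_\theta}\left[ \sum_{t=0}^{T} R_t(\tau)\, \nabla_\theta \log \pi_\theta(a_t \mid s_t) \right],
\]

where $\tau = (s_0, a_0, r_0, s_1, \ldots)$ is a trajectory and $R_t(\tau) = \sum_{t'=t}^{T} \gamma^{t'-t} r_{t'}$ is the discounted return from time step $t$. In practice the expectation is approximated with a single sampled trajectory per update, which is exactly what the implementation below does.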
Following this idea, the algorithm we are about to build has three main components (the full per-episode procedure is sketched after this list):
- a parameterized policy function, whose parameters are updated during learning;
- an objective to be maximized, which serves as feedback on the policy;
- a method for updating the policy parameters.
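Putting these components together, one update of REINFORCE proceeds roughly as follows:
- sample a trajectory τ = (s_0, a_0, r_0, s_1, a_1, r_1, ...) by acting in the environment with the current policy π_θ;
- compute the discounted return R_t for every time step t of the trajectory;
- form the gradient estimate ∇_θ J(π_θ) ≈ Σ_t R_t ∇_θ log π_θ(a_t | s_t);
- update the parameters by gradient ascent, θ ← θ + α ∇_θ J(π_θ), where α is the learning rate.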
from torch.distributions import Categorical
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
gamma = 0.99  # discount factor
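
# The Pi class below is the policy network pi_theta: a small MLP that maps a
# CartPole state (4 numbers) to logits over the 2 discrete actions. It also
# stores the log-probabilities and rewards gathered during the current
# episode, since REINFORCE is on-policy and trains only on that fresh data.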
class Pi(nn.Module):
    def __init__(self, in_dim, out_dim):
        super(Pi, self).__init__()
        layers = [
            nn.Linear(in_dim, 64),
            nn.ReLU(),
            nn.Linear(64, out_dim),
        ]
        self.model = nn.Sequential(*layers)
        self.onpolicy_reset()
        self.train()  # set training mode

    def onpolicy_reset(self):
        self.log_probs = []
        self.rewards = []

    def forward(self, x):
        pdparam = self.model(x)
        return pdparam

    def act(self, state):
        x = torch.from_numpy(state.astype(np.float32))  # to tensor
        pdparam = self.forward(x)  # forward pass
        pd = Categorical(logits=pdparam)  # probability distribution
        action = pd.sample()  # pi(a|s) in action via pd
        log_prob = pd.log_prob(action)  # log_prob of pi(a|s)
        self.log_probs.append(log_prob)  # store for training
        return action.item()
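
# train() below performs one REINFORCE update on the episode just collected:
# it computes the discounted return rets[t] for every time step, then
# minimizes -sum_t log_prob[t] * rets[t], which is gradient ascent on the
# single-trajectory estimate of the policy gradient.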
def train(pi, optimizer):
    # Inner gradient-ascent loop of REINFORCE algorithm
    T = len(pi.rewards)
    rets = np.empty(T, dtype=np.float32)  # the returns
    future_ret = 0.0
    # compute the returns efficiently
    for t in reversed(range(T)):
        future_ret = pi.rewards[t] + gamma * future_ret
        rets[t] = future_ret
    rets = torch.tensor(rets)
    log_probs = torch.stack(pi.log_probs)
    loss = - log_probs * rets  # gradient term; negative for maximizing
    loss = torch.sum(loss)
    optimizer.zero_grad()
    loss.backward()  # backpropagate, compute gradients
    optimizer.step()  # gradient-ascent, update the weights
    return loss
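
# main() ties everything together: build the CartPole environment, create the
# policy network and an Adam optimizer, then run 300 episodes, calling
# train() once per episode and clearing the on-policy memory afterwards.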
def main():
    env = gym.make('CartPole-v0')
    in_dim = env.observation_space.shape[0]  # 4
    out_dim = env.action_space.n  # 2
    pi = Pi(in_dim, out_dim)  # policy pi_theta for REINFORCE
    optimizer = optim.Adam(pi.parameters(), lr=0.01)
    for epi in range(300):
        state = env.reset()
        for t in range(200):  # cartpole max timestep is 200
            action = pi.act(state)
            state, reward, done, _ = env.step(action)
            pi.rewards.append(reward)
            env.render()
            if done:
                break
        loss = train(pi, optimizer)  # train per episode
        total_reward = sum(pi.rewards)
        solved = total_reward > 195.0
        pi.onpolicy_reset()  # onpolicy: clear memory after training
        print(f'Episode {epi}, loss: {loss}, '
              f'total_reward: {total_reward}, solved: {solved}')
if __name__ == '__main__':
    main()
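
As a quick, standalone illustration of the sampling step inside Pi.act() (the logits here are made up; this snippet is not part of the listing above):

import torch
from torch.distributions import Categorical

pdparam = torch.tensor([2.0, 0.5])  # made-up logits for the two CartPole actions
pd = Categorical(logits=pdparam)    # softmax over the logits gives the action probabilities
print(pd.probs)                     # approximately tensor([0.82, 0.18])
action = pd.sample()                # sample an action index according to pd.probs
print(action.item(), pd.log_prob(action).item())  # chosen action and its log-probability

Finally, note that the listing targets the classic gym API, in which env.reset() returns just the observation and env.step() returns four values; newer gym releases (0.26+) and gymnasium changed these signatures, so the environment loop would need small adjustments there.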