Python强化学习中的Off-Policy与On-Policy算法:DQN与PPO的采样效率对比

Python强化学习中的Off-Policy与On-Policy算法:DQN与PPO的采样效率对比

大家好,今天我们来探讨强化学习中两个重要的概念:Off-Policy学习和On-Policy学习,并以DQN(Deep Q-Network)和PPO(Proximal Policy Optimization)为例,深入比较它们的采样效率。采样效率是衡量强化学习算法优劣的重要指标,尤其是在样本获取成本高昂的环境中。理解这两种策略类型及其采样效率差异,对于我们在实际应用中选择合适的算法至关重要。

1. 强化学习基础与Policy的含义

在深入探讨Off-Policy和On-Policy之前,我们先简单回顾一下强化学习的基础概念。强化学习的目标是训练一个智能体(Agent)在某个环境(Environment)中做出最优决策,以最大化累积奖励(Cumulative Reward)。智能体通过与环境交互,观察状态(State),执行动作(Action),并获得奖励(Reward)。

Policy 是强化学习的核心概念之一。Policy定义了在给定状态下,智能体应该采取的动作的概率分布。它可以是确定性的(Deterministic Policy),即在每个状态下选择一个固定的动作;也可以是随机性的(Stochastic Policy),即在每个状态下,根据概率分布选择动作。

数学上,Policy可以表示为:

  • Deterministic Policy: π(s) = a (在状态s下,选择动作a)
  • Stochastic Policy: π(a|s) = P(A = a | S = s) (在状态s下,选择动作a的概率)

强化学习的目标就是找到一个最优的Policy,记为π*,它能够最大化智能体的累积奖励。

2. On-Policy与Off-Policy学习

On-Policy学习和Off-Policy学习是强化学习中两种不同的学习范式,其主要区别在于生成数据的Policy学习的Policy 是否相同。

  • On-Policy Learning: 智能体使用当前Policy(即正在学习的Policy)与环境交互,生成样本数据,并用这些数据来改进 同一个 Policy。换句话说,生成数据的Policy和学习的Policy是相同的。

  • Off-Policy Learning: 智能体使用一个Policy(称为行为Policy,Behavior Policy)与环境交互,生成样本数据,然后使用这些数据来学习 另一个 Policy(称为目标Policy,Target Policy)。行为Policy和目标Policy可以不同。

简单来说,On-Policy学习是“自己走,自己学”,而Off-Policy学习是“别人走,我来学”。

对比表格:

特性 On-Policy Learning Off-Policy Learning
数据来源 当前Policy (Target Policy) 行为Policy (Behavior Policy)
Policy关系 生成数据的Policy = 学习的Policy 生成数据的Policy ≠ 学习的Policy
例子 PPO, A2C, Sarsa DQN, DDPG
优点 通常更稳定,方差较小 可以利用历史数据和探索性策略
缺点 探索性可能受限,采样效率可能较低 容易受到数据分布差异的影响

3. DQN:Off-Policy学习的代表

DQN是一种经典的Off-Policy强化学习算法,它结合了Q-Learning和深度神经网络。 DQN使用经验回放(Experience Replay)机制存储智能体与环境交互产生的样本数据,然后从经验回放缓冲区中随机采样数据来更新Q函数。

DQN的核心思想:

DQN的目标是学习一个最优的Q函数,Q(s, a),它表示在状态s下采取动作a所能获得的期望累积奖励。DQN通过迭代更新Q函数来逼近最优Q函数。

DQN的算法流程:

  1. 初始化: 初始化Q网络参数 θ,经验回放缓冲区 D。
  2. 循环:
    • 选择动作: 使用 ε-greedy策略选择动作 a:以概率 ε 随机选择动作,以概率 1-ε 选择 Q(s, a; θ) 值最大的动作。
    • 执行动作: 在环境中执行动作 a,获得奖励 r 和下一个状态 s’。
    • 存储经验: 将经验 (s, a, r, s’) 存储到经验回放缓冲区 D 中。
    • 采样: 从经验回放缓冲区 D 中随机采样一批经验 (si, ai, ri, s’i)。
    • 计算目标Q值: yi = ri + γ maxa’ Q(s’i, a’; θ) (θ 是目标Q网络的参数,定期从 θ 更新)。
    • 更新Q网络: 通过最小化损失函数 L(θ) = E[(yi – Q(si, ai; θ))2] 来更新Q网络参数 θ。
    • 更新目标Q网络: 定期将Q网络参数 θ 复制到目标Q网络参数 θ

DQN的优势:

  • 稳定性: 经验回放机制打破了样本之间的相关性,提高了学习的稳定性。
  • 利用历史数据: 能够利用智能体过去积累的经验,提高学习效率。

DQN的劣势:

  • 采样效率较低: 需要大量的样本数据才能训练出一个好的Q函数。 由于使用epsilon-greedy策略,很多采样是探索性的,并没有直接用于学习最优策略。
  • 容易过估计: Q函数容易过估计,导致性能下降。
  • 不适合连续动作空间: DQN只能处理离散动作空间,无法直接应用于连续动作空间。

DQN的Python代码示例 (简化版,使用PyTorch):

import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np

# 定义Q网络
class QNetwork(nn.Module):
    def __init__(self, state_size, action_size):
        super(QNetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_size)

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

# 定义经验回放缓冲区
class ReplayBuffer:
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, state, action, reward, next_state, done):
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = (state, action, reward, next_state, done)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

# DQN Agent
class DQNAgent:
    def __init__(self, state_size, action_size, learning_rate=1e-3, gamma=0.99, epsilon_start=1.0, epsilon_end=0.01, epsilon_decay=0.995, buffer_size=10000, batch_size=32):
        self.state_size = state_size
        self.action_size = action_size
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.epsilon = epsilon_start
        self.epsilon_start = epsilon_start
        self.epsilon_end = epsilon_end
        self.epsilon_decay = epsilon_decay
        self.buffer_size = buffer_size
        self.batch_size = batch_size

        self.q_network = QNetwork(state_size, action_size)
        self.target_network = QNetwork(state_size, action_size)
        self.target_network.load_state_dict(self.q_network.state_dict())  # 初始时同步参数
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=self.learning_rate)
        self.replay_buffer = ReplayBuffer(buffer_size)

    def select_action(self, state):
        if random.random() < self.epsilon:
            return random.randrange(self.action_size)
        else:
            state = torch.FloatTensor(state)
            q_values = self.q_network(state)
            return np.argmax(q_values.detach().numpy())

    def update_model(self):
        if len(self.replay_buffer) < self.batch_size:
            return

        batch = self.replay_buffer.sample(self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.FloatTensor(states)
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        next_states = torch.FloatTensor(next_states)
        dones = torch.FloatTensor(dones)

        q_values = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        next_q_values = self.target_network(next_states).max(1)[0]
        expected_q_values = rewards + self.gamma * next_q_values * (1 - dones)

        loss = nn.MSELoss()(q_values, expected_q_values)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # 软更新目标网络
        for target_param, param in zip(self.target_network.parameters(), self.q_network.parameters()):
            target_param.data.copy_(0.005 * param.data + (1 - 0.005) * target_param.data)

    def update_epsilon(self):
        self.epsilon = max(self.epsilon_end, self.epsilon_decay * self.epsilon)

# 训练循环 (伪代码,需要根据具体环境进行修改)
# env = YourEnvironment() # 你的环境
# state_size = env.observation_space.shape[0]
# action_size = env.action_space.n
# agent = DQNAgent(state_size, action_size)

# num_episodes = 1000
# for episode in range(num_episodes):
#     state = env.reset()
#     done = False
#     while not done:
#         action = agent.select_action(state)
#         next_state, reward, done, _ = env.step(action)
#         agent.replay_buffer.push(state, action, reward, next_state, done)
#         agent.update_model()
#         agent.update_epsilon()
#         state = next_state
#     print(f"Episode: {episode}, Epsilon: {agent.epsilon}")

4. PPO:On-Policy学习的代表

PPO是一种先进的On-Policy强化学习算法,它在Trust Region Policy Optimization (TRPO) 的基础上进行了简化,更容易实现和调整。 PPO通过限制新Policy和旧Policy之间的差异,来保证学习的稳定性,避免Policy更新过大而导致性能下降。

PPO的核心思想:

PPO的目标是找到一个Policy,它能够在最大化期望累积奖励的同时,与之前的Policy保持一定的相似性。PPO通过引入一个Clipping机制来限制Policy的更新幅度。

PPO的算法流程:

  1. 收集数据: 使用当前Policy πθ 与环境交互,收集一批经验 (st, at, rt, s’t)。
  2. 计算优势函数: 使用收集到的数据,计算每个状态-动作对的优势函数 At。优势函数表示在状态 st 下采取动作 at 相对于平均水平的优势。
  3. 计算Policy Ratio: 计算新Policy πθ’ 与旧Policy πθ 的概率比率: rt(θ’) = πθ’(at|st) / πθ(at|st)。
  4. 定义目标函数: 定义PPO的目标函数:
    L(θ’) = Et[min(rt(θ’)At, clip(rt(θ’), 1-ε, 1+ε)At)]
    其中,clip(rt(θ’), 1-ε, 1+ε) 表示将 rt(θ’) 限制在 [1-ε, 1+ε] 范围内,ε 是一个超参数,用于控制Policy的更新幅度。
  5. 更新Policy: 使用梯度上升法最大化目标函数 L(θ’),更新Policy参数 θ’。
  6. 重复步骤1-5,直到Policy收敛。

PPO的优势:

  • 采样效率较高: PPO是On-Policy算法,它使用当前Policy生成的数据来更新Policy,因此采样效率较高。 虽然每次迭代只能使用当前策略采集的数据,但是这些数据都用于改进当前策略,避免了DQN中大量探索性采样带来的浪费。
  • 稳定性: Clipping机制限制了Policy的更新幅度,保证了学习的稳定性。
  • 易于实现和调整: PPO相对TRPO更简单,更容易实现和调整。
  • 适用于连续动作空间: 可以直接应用于连续动作空间。

PPO的劣势:

  • 对超参数敏感: PPO的性能对超参数的选择比较敏感,需要仔细调整。
  • 可能陷入局部最优: 由于Policy更新幅度受限,PPO可能陷入局部最优。

PPO的Python代码示例 (简化版,使用PyTorch):

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.distributions import Categorical

# 定义Actor网络 (Policy网络)
class Actor(nn.Module):
    def __init__(self, state_size, action_size):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_size)

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        action_probs = torch.softmax(self.fc3(x), dim=-1)  # 输出动作概率
        return action_probs

# 定义Critic网络 (价值网络)
class Critic(nn.Module):
    def __init__(self, state_size):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, 1)

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        value = self.fc3(x)
        return value

# PPO Agent
class PPOAgent:
    def __init__(self, state_size, action_size, learning_rate=1e-4, gamma=0.99, clip_epsilon=0.2, update_epochs=4, batch_size=64):
        self.state_size = state_size
        self.action_size = action_size
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.clip_epsilon = clip_epsilon
        self.update_epochs = update_epochs
        self.batch_size = batch_size

        self.actor = Actor(state_size, action_size)
        self.critic = Critic(state_size)
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=self.learning_rate)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=self.learning_rate)

    def select_action(self, state):
        state = torch.FloatTensor(state)
        action_probs = self.actor(state)
        dist = Categorical(action_probs)  # 创建分类分布
        action = dist.sample()  # 从分布中采样动作
        return action.item(), dist.log_prob(action)  # 返回动作和动作的log概率

    def compute_advantage(self, rewards, values, dones):
        advantages = torch.zeros_like(rewards)
        advantage = 0
        for t in reversed(range(len(rewards))):
            delta = rewards[t] + self.gamma * values[t+1] * (1 - dones[t]) - values[t]
            advantage = delta + self.gamma * 0.95 * (1 - dones[t]) * advantage  # GAE
            advantages[t] = advantage
        return advantages

    def update_model(self, states, actions, log_probs, rewards, next_states, dones):
        values = self.critic(torch.FloatTensor(states)).squeeze()
        next_values = self.critic(torch.FloatTensor(next_states)).squeeze()
        advantages = self.compute_advantage(torch.FloatTensor(rewards), torch.cat([values, next_values[-1:]]), torch.FloatTensor(dones))
        returns = advantages + values

        states = torch.FloatTensor(states)
        actions = torch.LongTensor(actions)
        old_log_probs = torch.FloatTensor(log_probs)
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8) # Normalize advantages

        # PPO Update Loop
        for _ in range(self.update_epochs):
            # Mini-batch GD
            for i in range(0, len(states), self.batch_size):
                batch_states = states[i:i+self.batch_size]
                batch_actions = actions[i:i+self.batch_size]
                batch_old_log_probs = old_log_probs[i:i+self.batch_size]
                batch_advantages = advantages[i:i+self.batch_size]
                batch_returns = returns[i:i+self.batch_size]

                # Actor Update
                action_probs = self.actor(batch_states)
                dist = Categorical(action_probs)
                new_log_probs = dist.log_prob(batch_actions)
                ratio = torch.exp(new_log_probs - batch_old_log_probs)
                surr1 = ratio * batch_advantages
                surr2 = torch.clamp(ratio, 1 - self.clip_epsilon, 1 + self.clip_epsilon) * batch_advantages
                actor_loss = -torch.min(surr1, surr2).mean()

                self.actor_optimizer.zero_grad()
                actor_loss.backward()
                self.actor_optimizer.step()

                # Critic Update
                critic_loss = nn.MSELoss()(self.critic(batch_states).squeeze(), batch_returns)
                self.critic_optimizer.zero_grad()
                critic_loss.backward()
                self.critic_optimizer.step()

# 训练循环 (伪代码,需要根据具体环境进行修改)
# env = YourEnvironment() # 你的环境
# state_size = env.observation_space.shape[0]
# action_size = env.action_space.n
# agent = PPOAgent(state_size, action_size)

# num_episodes = 1000
# for episode in range(num_episodes):
#     states, actions, log_probs, rewards, next_states, dones = [], [], [], [], [], []
#     state = env.reset()
#     done = False
#     while not done:
#         action, log_prob = agent.select_action(state)
#         next_state, reward, done, _ = env.step(action)

#         states.append(state)
#         actions.append(action)
#         log_probs.append(log_prob.item())
#         rewards.append(reward)
#         next_states.append(next_state)
#         dones.append(done)

#         state = next_state

#     agent.update_model(states, actions, log_probs, rewards, next_states, dones)
#     print(f"Episode: {episode}")

5. DQN与PPO的采样效率对比

DQN和PPO在采样效率方面存在显著差异。DQN作为Off-Policy算法,虽然可以利用历史数据,但其探索性策略(如ε-greedy)导致大量采样并未直接用于学习最优策略。此外,经验回放缓冲区中的数据分布可能与当前Policy存在差异,这也会降低采样效率。

PPO作为On-Policy算法,使用当前Policy生成的数据来更新Policy,能够更有效地利用采样数据。虽然每次迭代只能使用当前策略采集的数据,但是这些数据都用于改进当前策略。PPO的Clipping机制也限制了Policy的更新幅度,避免了因Policy更新过大而导致性能下降的情况。

对比表格:

特性 DQN (Off-Policy) PPO (On-Policy)
采样效率 较低 较高
数据利用率 较低,大量探索性采样 较高,采样数据直接用于Policy改进
数据分布 经验回放缓冲区可能与当前Policy存在差异 数据分布与当前Policy一致
适用场景 样本获取成本较低,环境变化缓慢 样本获取成本较高,需要快速学习

总的来说,在样本获取成本较高的情况下,PPO通常比DQN更有效率。 但是,如果环境变化缓慢,DQN可以通过经验回放来利用历史数据,从而提高学习效率。

6. 提升采样效率的策略

无论是On-Policy还是Off-Policy算法,都有一些通用的策略可以提升采样效率:

  • 优先经验回放 (Prioritized Experience Replay): 对于DQN,可以优先采样那些TD误差较大的样本,因为这些样本包含更多的信息,更有助于学习。
  • 多步学习 (Multi-step Learning): 使用n步奖励来更新Q函数或Policy,可以减少方差,提高学习效率。
  • 模仿学习 (Imitation Learning): 利用专家数据来初始化Policy,可以加速学习过程。
  • 课程学习 (Curriculum Learning): 从简单到复杂地训练智能体,可以提高学习的稳定性和效率。
  • 奖励塑造 (Reward Shaping): 设计合适的奖励函数,可以引导智能体更快地学习到最优策略。

7. 总结:选择合适的算法,提升学习效率

DQN和PPO分别代表了Off-Policy和On-Policy强化学习的两种范式。DQN能够利用历史数据,但采样效率较低;PPO采样效率较高,但对超参数敏感。 在实际应用中,我们需要根据具体环境和任务的特点,选择合适的算法。 此外,还可以通过一些通用的策略来提升采样效率,提高强化学习算法的性能。 理解Off-Policy和On-Policy的差异,以及它们各自的优缺点,对于我们在强化学习领域取得成功至关重要。

更多IT精英技术系列讲座,到智猿学院

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注