
Policy Gradient와 Actor-Critic
(수정: 2026년 1월 3일 오전 04:13)
Policy Gradient와 Actor-Critic
DQN 계열 알고리즘은 가치 함수를 학습하여 간접적으로 정책을 도출합니다. 반면 Policy Gradient는 정책을 직접 학습합니다. 이 글에서는 Policy Gradient의 원리와 이를 개선한 Actor-Critic 방법을 다룹니다.
1. Value-Based vs Policy-Based
Value-Based (DQN 계열)
상태 → Q(s,a) 학습 → argmax로 행동 선택
- 이산 행동 공간에 적합
- 결정론적 정책
Policy-Based
상태 → π(a|s) 직접 학습 → 확률적으로 행동 선택
- 연속 행동 공간 가능
- 확률적 정책 (탐험 내장)
- 고차원 행동 공간 처리 가능
언제 Policy-Based를 사용하는가?
| 상황 | 적합한 방법 |
|---|---|
| 이산 행동, 작은 공간 | Value-Based (DQN) |
| 연속 행동 (로봇 제어) | Policy-Based |
| 고차원 행동 공간 | Policy-Based |
| 확률적 정책이 최적일 때 | Policy-Based |
2. Policy Gradient 정리 (Policy Gradient Theorem)
목표 함수
정책 π_θ의 성능을 측정하는 목표 함수:
여기서 τ는 궤적(trajectory): s_0, a_0, r_0, s_1, a_1, r_1, ...
Policy Gradient Theorem
- G_t: 시점 t부터의 누적 보상 (return)
- ∇log π: 정책의 로그 확률의 그래디언트
직관적 이해
- 좋은 결과를 낸 행동 (G_t > 0): 해당 행동의 확률 증가
- 나쁜 결과를 낸 행동 (G_t < 0): 해당 행동의 확률 감소
3. REINFORCE 알고리즘
가장 기본적인 Policy Gradient 알고리즘입니다.
알고리즘
1. 정책 π_θ로 에피소드 전체 수집: τ = (s_0, a_0, r_0, ..., s_T) 2. 각 시점의 return 계산: G_t = Σ γ^k * r_{t+k} 3. 정책 업데이트: θ ← θ + α * Σ ∇log π_θ(a_t|s_t) * G_t 4. 반복
PyTorch 구현
import torch import torch.nn as nn import torch.optim as optim import torch.nn.functional as F from torch.distributions import Categorical import gymnasium as gym import numpy as np class PolicyNetwork(nn.Module): def __init__(self, state_dim, action_dim): super().__init__() self.network = nn.Sequential( nn.Linear(state_dim, 128), nn.ReLU(), nn.Linear(128, 128), nn.ReLU(), nn.Linear(128, action_dim) ) def forward(self, x): logits = self.network(x) return F.softmax(logits, dim=-1) class REINFORCEAgent: def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99): self.gamma = gamma self.policy = PolicyNetwork(state_dim, action_dim) self.optimizer = optim.Adam(self.policy.parameters(), lr=lr) # 에피소드 기록 self.log_probs = [] self.rewards = [] def select_action(self, state): state_tensor = torch.FloatTensor(state).unsqueeze(0) probs = self.policy(state_tensor) dist = Categorical(probs) action = dist.sample() # 로그 확률 저장 (나중에 업데이트용) self.log_probs.append(dist.log_prob(action)) return action.item() def store_reward(self, reward): self.rewards.append(reward) def update(self): # Return 계산 returns = [] G = 0 for r in reversed(self.rewards): G = r + self.gamma * G returns.insert(0, G) returns = torch.tensor(returns) # 정규화 (분산 감소) returns = (returns - returns.mean()) / (returns.std() + 1e-8) # Policy Gradient 손실 policy_loss = [] for log_prob, G in zip(self.log_probs, returns): policy_loss.append(-log_prob * G) loss = torch.stack(policy_loss).sum() self.optimizer.zero_grad() loss.backward() self.optimizer.step() # 기록 초기화 self.log_probs = [] self.rewards = [] return loss.item() def train_reinforce(env_name='CartPole-v1', episodes=1000): env = gym.make(env_name) state_dim = env.observation_space.shape[0] action_dim = env.action_space.n agent = REINFORCEAgent(state_dim, action_dim) episode_rewards = [] for episode in range(episodes): state, _ = env.reset() total_reward = 0 while True: action = agent.select_action(state) next_state, reward, terminated, truncated, _ = env.step(action) done = terminated or truncated agent.store_reward(reward) total_reward += reward state = next_state if done: break # 에피소드 종료 후 업데이트 agent.update() episode_rewards.append(total_reward) if episode % 100 == 0: avg_reward = np.mean(episode_rewards[-100:]) print(f"Episode {episode}, Avg Reward: {avg_reward:.1f}") env.close() return episode_rewards if __name__ == "__main__": rewards = train_reinforce()
REINFORCE의 문제점
- 높은 분산: 에피소드마다 return이 크게 다름
- 샘플 비효율성: 에피소드가 끝나야 업데이트 가능
- credit assignment 어려움: 어떤 행동이 보상에 기여했는지 불분명
4. Baseline을 사용한 분산 감소
아이디어
Return 대신 Advantage 사용:
이론적으로 baseline V(s)를 빼도 그래디언트의 기대값은 변하지 않지만, 분산은 크게 줄어듭니다.
REINFORCE with Baseline
class REINFORCEWithBaseline: def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99): self.gamma = gamma # 정책 네트워크 self.policy = PolicyNetwork(state_dim, action_dim) # 가치 네트워크 (Baseline) self.value = nn.Sequential( nn.Linear(state_dim, 128), nn.ReLU(), nn.Linear(128, 1) ) self.policy_optimizer = optim.Adam(self.policy.parameters(), lr=lr) self.value_optimizer = optim.Adam(self.value.parameters(), lr=lr) self.log_probs = [] self.values = [] self.rewards = [] def select_action(self, state): state_tensor = torch.FloatTensor(state).unsqueeze(0) # 정책에서 행동 샘플링 probs = self.policy(state_tensor) dist = Categorical(probs) action = dist.sample() # 가치 추정 value = self.value(state_tensor) self.log_probs.append(dist.log_prob(action)) self.values.append(value) return action.item() def update(self): # Return 계산 returns = [] G = 0 for r in reversed(self.rewards): G = r + self.gamma * G returns.insert(0, G) returns = torch.tensor(returns) values = torch.cat(self.values).squeeze() # Advantage = Return - Value (Baseline) advantages = returns - values.detach() # Policy 손실 policy_loss = [] for log_prob, advantage in zip(self.log_probs, advantages): policy_loss.append(-log_prob * advantage) policy_loss = torch.stack(policy_loss).sum() # Value 손실 (MSE) value_loss = F.mse_loss(values, returns) # 업데이트 self.policy_optimizer.zero_grad() policy_loss.backward() self.policy_optimizer.step() self.value_optimizer.zero_grad() value_loss.backward() self.value_optimizer.step() # 초기화 self.log_probs = [] self.values = [] self.rewards = []
5. Actor-Critic
구조
- Actor (정책 네트워크): 행동 선택
- Critic (가치 네트워크): 행동 평가
상태 ↓ ┌────┴────┐ ↓ ↓ Actor Critic ↓ ↓ π(a|s) V(s) ↓ ↓ 행동 TD 오차
TD 오차 (Temporal Difference Error)
매 스텝마다 업데이트 가능:
이 TD 오차가 Advantage의 추정치가 됩니다.
구현
class ActorCritic(nn.Module): def __init__(self, state_dim, action_dim): super().__init__() # 공유 레이어 self.shared = nn.Sequential( nn.Linear(state_dim, 128), nn.ReLU() ) # Actor head self.actor = nn.Sequential( nn.Linear(128, 128), nn.ReLU(), nn.Linear(128, action_dim) ) # Critic head self.critic = nn.Sequential( nn.Linear(128, 128), nn.ReLU(), nn.Linear(128, 1) ) def forward(self, x): shared_out = self.shared(x) policy_logits = self.actor(shared_out) value = self.critic(shared_out) return F.softmax(policy_logits, dim=-1), value class A2CAgent: """Advantage Actor-Critic (A2C)""" def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99, value_coef=0.5, entropy_coef=0.01): self.gamma = gamma self.value_coef = value_coef self.entropy_coef = entropy_coef self.model = ActorCritic(state_dim, action_dim) self.optimizer = optim.Adam(self.model.parameters(), lr=lr) def select_action(self, state): state_tensor = torch.FloatTensor(state).unsqueeze(0) probs, value = self.model(state_tensor) dist = Categorical(probs) action = dist.sample() return action.item(), dist.log_prob(action), value, dist.entropy() def compute_returns(self, rewards, dones, next_value): """GAE 없이 간단한 return 계산""" returns = [] R = next_value for r, done in zip(reversed(rewards), reversed(dones)): R = r + self.gamma * R * (1 - done) returns.insert(0, R) return torch.tensor(returns) def update(self, log_probs, values, rewards, dones, next_value, entropies): returns = self.compute_returns(rewards, dones, next_value) values = torch.cat(values).squeeze() log_probs = torch.stack(log_probs) entropies = torch.stack(entropies) # Advantage advantages = returns - values.detach() # Actor 손실 actor_loss = -(log_probs * advantages).mean() # Critic 손실 critic_loss = F.mse_loss(values, returns) # 엔트로피 보너스 (탐험 촉진) entropy_loss = -entropies.mean() # 총 손실 total_loss = (actor_loss + self.value_coef * critic_loss + self.entropy_coef * entropy_loss) self.optimizer.zero_grad() total_loss.backward() torch.nn.utils.clip_grad_norm_(self.model.parameters(), 0.5) self.optimizer.step() return actor_loss.item(), critic_loss.item() def train_a2c(env_name='CartPole-v1', episodes=1000, n_steps=5): """n-step A2C 학습""" env = gym.make(env_name) state_dim = env.observation_space.shape[0] action_dim = env.action_space.n agent = A2CAgent(state_dim, action_dim) episode_rewards = [] state, _ = env.reset() episode_reward = 0 for step in range(episodes * 200): # 총 스텝 수 log_probs = [] values = [] rewards = [] dones = [] entropies = [] # n-step 수집 for _ in range(n_steps): action, log_prob, value, entropy = agent.select_action(state) next_state, reward, terminated, truncated, _ = env.step(action) done = terminated or truncated log_probs.append(log_prob) values.append(value) rewards.append(reward) dones.append(float(done)) entropies.append(entropy) episode_reward += reward state = next_state if done: episode_rewards.append(episode_reward) if len(episode_rewards) % 100 == 0: avg = np.mean(episode_rewards[-100:]) print(f"Episode {len(episode_rewards)}, Avg: {avg:.1f}") episode_reward = 0 state, _ = env.reset() # 다음 상태의 가치 with torch.no_grad(): _, next_value = agent.model(torch.FloatTensor(state).unsqueeze(0)) next_value = next_value.item() * (1 - dones[-1]) # 업데이트 agent.update(log_probs, values, rewards, dones, next_value, entropies) env.close() return episode_rewards
6. A3C (Asynchronous Advantage Actor-Critic)
A3C는 여러 환경을 병렬로 실행하여 샘플 효율성과 학습 속도를 높입니다.
구조
Global Network ↑ ┌────────────┼────────────┐ ↓ ↓ ↓ Worker 1 Worker 2 Worker 3 ↓ ↓ ↓ Env 1 Env 2 Env 3
핵심 특징
- 비동기 업데이트: 각 워커가 독립적으로 그래디언트 계산
- 글로벌 네트워크: 모든 워커가 공유
- 다양한 경험: 병렬 환경으로 다양한 상태 탐험
import torch.multiprocessing as mp def worker(global_model, optimizer, rank, env_name): """개별 워커 프로세스""" env = gym.make(env_name) local_model = ActorCritic(...) while True: # 글로벌 모델 동기화 local_model.load_state_dict(global_model.state_dict()) # 경험 수집 states, actions, rewards, dones = collect_experience(env, local_model) # 손실 계산 loss = compute_loss(local_model, states, actions, rewards, dones) # 그래디언트 계산 및 글로벌 모델 업데이트 optimizer.zero_grad() loss.backward() # 로컬 그래디언트를 글로벌 모델에 적용 for local_param, global_param in zip(local_model.parameters(), global_model.parameters()): global_param.grad = local_param.grad optimizer.step() def train_a3c(num_workers=4): global_model = ActorCritic(...) global_model.share_memory() # 프로세스 간 공유 optimizer = optim.Adam(global_model.parameters()) processes = [] for rank in range(num_workers): p = mp.Process(target=worker, args=(global_model, optimizer, rank, 'CartPole-v1')) p.start() processes.append(p) for p in processes: p.join()
7. 연속 행동 공간
로봇 제어 등 연속 행동 공간에서는 가우시안 정책을 사용합니다.
가우시안 정책
class ContinuousPolicy(nn.Module): def __init__(self, state_dim, action_dim): super().__init__() self.network = nn.Sequential( nn.Linear(state_dim, 128), nn.ReLU(), nn.Linear(128, 128), nn.ReLU() ) # 평균과 로그 표준편차 출력 self.mean = nn.Linear(128, action_dim) self.log_std = nn.Parameter(torch.zeros(action_dim)) def forward(self, state): features = self.network(state) mean = self.mean(features) std = self.log_std.exp() return mean, std def sample(self, state): mean, std = self(state) dist = torch.distributions.Normal(mean, std) action = dist.sample() # 행동 범위 제한 (예: -1 ~ 1) action = torch.tanh(action) log_prob = dist.log_prob(action).sum(dim=-1) return action, log_prob def evaluate(self, state, action): mean, std = self(state) dist = torch.distributions.Normal(mean, std) log_prob = dist.log_prob(action).sum(dim=-1) entropy = dist.entropy().sum(dim=-1) return log_prob, entropy
8. GAE (Generalized Advantage Estimation)
Advantage 추정의 bias-variance 트레이드오프를 조절합니다.
공식
- λ = 0: 1-step TD (low variance, high bias)
- λ = 1: Monte Carlo (high variance, low bias)
- 보통 λ = 0.95 사용
구현
def compute_gae(rewards, values, dones, next_value, gamma=0.99, lam=0.95): """Generalized Advantage Estimation 계산""" advantages = [] gae = 0 # 마지막 값 추가 values = values + [next_value] for t in reversed(range(len(rewards))): delta = rewards[t] + gamma * values[t + 1] * (1 - dones[t]) - values[t] gae = delta + gamma * lam * (1 - dones[t]) * gae advantages.insert(0, gae) returns = [adv + val for adv, val in zip(advantages, values[:-1])] return advantages, returns
9. 핵심 비교
| 알고리즘 | 특징 | 장점 | 단점 |
|---|---|---|---|
| REINFORCE | 에피소드 단위 업데이트 | 단순함 | 높은 분산 |
| REINFORCE + Baseline | V(s) baseline 사용 | 분산 감소 | 여전히 에피소드 단위 |
| Actor-Critic | 매 스텝 업데이트 | 빠른 학습 | 구현 복잡 |
| A2C | 동기 병렬 | 안정적 | 리소스 필요 |
| A3C | 비동기 병렬 | 빠름 | 구현 복잡 |
Quiz
Policy Gradient에서 baseline을 사용하는 이유는?