import os

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import gymnasium as gym
import numpy as np
import cv2

import radar_wrapper as radar


class ActorCritic(nn.Module):
    LENGTH_RAY = 70
    N_RAYS = 5
    N_ACTIONS = 5
    GAMMA = 0.95

    def __init__(self, device=torch.device('cpu')):
        super(ActorCritic, self).__init__()
        # Input: N_RAYS normalized radar distances plus the car's scaled speed.
        self.fc1 = nn.Linear(self.N_RAYS + 1, 128)
        self.fc2 = nn.Linear(128, 128)
        self.policy_head = nn.Linear(128, self.N_ACTIONS)
        self.value_head = nn.Linear(128, 1)
        self.optimizer = optim.Adam(self.parameters(), lr=0.001)
        self.to(device)
        self.device = device

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        probs = F.softmax(self.policy_head(x), dim=1)
        value = self.value_head(x).squeeze(1)
        return probs, value

    def training_step(self, state, action, reward, state_, done):
        state = torch.tensor(np.array([state]), dtype=torch.float32, device=self.device)
        action = torch.tensor([action], dtype=torch.int64, device=self.device)
        reward = torch.tensor([reward], dtype=torch.float32, device=self.device)
        state_ = torch.tensor(np.array([state_]), dtype=torch.float32, device=self.device)

        probs, state_value = self.forward(state)
        dist = torch.distributions.Categorical(probs)
        log_prob = dist.log_prob(action)

        # One-step bootstrapped TD target; no gradient flows through the next-state value.
        with torch.no_grad():
            _, state_value_ = self.forward(state_)
        target = reward + self.GAMMA * state_value_ * (1 - int(done))
        delta = target - state_value

        # Actor: policy gradient weighted by the TD error, plus an entropy bonus.
        # Critic: squared TD error.
        entropy = dist.entropy().mean()
        actor_loss = -log_prob * delta.detach() - 0.01 * entropy
        critic_loss = delta ** 2
        loss = actor_loss + critic_loss

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()

    def train_agent(self, epochs):
        # Resume from an earlier run if a final checkpoint exists.
        if os.path.exists("mlp_final.pt"):
            print("Loading existing smart brain...")
            self.load_state_dict(torch.load("mlp_final.pt", map_location=self.device))
        else:
            print("Starting from scratch...")

        env = gym.make("CarRacing-v3", render_mode="rgb_array", domain_randomize=False, continuous=False)
        for epoch in range(epochs):
            obs, info = env.reset()
            # State: N_RAYS radar distances normalized to [0, 1], plus scaled speed.
            readings = radar.get_radar_readings(obs, self.N_RAYS, self.LENGTH_RAY) / self.LENGTH_RAY
            raw_speed = env.unwrapped.car.hull.linearVelocity.length
            readings = np.append(readings, raw_speed / 10.0)

            terminated = False
            truncated = False
            total_reward = 0
            offroad_count = 0
            while not (terminated or truncated):
                state_tensor = torch.tensor(np.array([readings]), dtype=torch.float32).to(self.device)
                probs, _ = self.forward(state_tensor)
                dist = torch.distributions.Categorical(probs)
                action = dist.sample().item()
                obs, reward, terminated, truncated, info = env.step(action)

                # If every ray is short, the car is off the track: penalize the step
                # and end the episode after 20 consecutive off-road steps.
                if all(readings[:-1] < 0.1):
                    offroad_count += 1
                    reward = -1
                    if offroad_count > 20:
                        terminated = True
                else:
                    offroad_count = 0

                next_readings = radar.get_radar_readings(obs, self.N_RAYS, self.LENGTH_RAY) / self.LENGTH_RAY
                raw_speed = env.unwrapped.car.hull.linearVelocity.length
                next_readings = np.append(next_readings, raw_speed / 10.0)

                self.training_step(readings, action, reward, next_readings, terminated)
                readings = next_readings
                total_reward += reward

                # Every 20th epoch, print the policy distribution and render the frame.
                if epoch % 20 == 0:
                    print(probs)
                    cv2.imshow("Game", cv2.cvtColor(obs, cv2.COLOR_RGB2BGR))
                    cv2.waitKey(1)

            print(f"Epoch: {epoch} | Reward: {total_reward:.2f}")
            # Periodic checkpoint during training.
            if epoch % 20 == 0 and epoch > 0:
                torch.save(self.state_dict(), 'mlp.pt')
        # Final checkpoint once all epochs are done.
        torch.save(self.state_dict(), 'mlp_final.pt')

    def play(self):
        self.load_state_dict(torch.load("mlp_final.pt", map_location=self.device))
        self.eval()
        env = gym.make("CarRacing-v3", render_mode="rgb_array", domain_randomize=False, continuous=False)
        obs, info = env.reset()
        readings = radar.get_radar_readings(obs, self.N_RAYS, self.LENGTH_RAY) / self.LENGTH_RAY
        readings = np.append(readings, env.unwrapped.car.hull.linearVelocity.length / 10.0)

        terminated = False
        truncated = False
        total_reward = 0
        while not (terminated or truncated):
            state_tensor = torch.tensor(np.array([readings]), dtype=torch.float32).to(self.device)
            probs, _ = self.forward(state_tensor)
            dist = torch.distributions.Categorical(probs)
            action = dist.sample().item()
            print(action)
            obs, reward, terminated, truncated, info = env.step(action)
            next_readings = radar.get_radar_readings(obs, self.N_RAYS, self.LENGTH_RAY) / self.LENGTH_RAY
            next_readings = np.append(next_readings, env.unwrapped.car.hull.linearVelocity.length / 10.0)
            readings = next_readings
            total_reward += reward
            cv2.imshow("Game", cv2.cvtColor(obs, cv2.COLOR_RGB2BGR))
            cv2.waitKey(1)
        print(f"Total reward: {total_reward:.2f}")
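

# A minimal usage sketch: the epoch count and device choice below are
# illustrative assumptions, not values taken from the original code.
if __name__ == "__main__":
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    agent = ActorCritic(device=device)
    agent.train_agent(epochs=500)  # saves mlp.pt checkpoints and mlp_final.pt
    agent.play()                   # reloads mlp_final.pt and runs one episode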