1
import os

import cv2
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import radar_wrapper as radar

class ActorCritic(nn.Module):
    """One-step actor-critic agent for the discrete ``CarRacing-v3`` environment.

    A shared two-layer MLP feeds a softmax policy head and a scalar value
    head. The input vector is ``N_RAYS`` radar readings (divided by
    ``LENGTH_RAY``) followed by the car's hull speed (divided by
    ``SPEED_SCALE``). The model owns its Adam optimizer and is updated
    online, one transition at a time.
    """

    LENGTH_RAY = 70
    N_RAYS = 5
    N_ACTIONS = 5
    GAMMA = 0.95

    # Divisor applied to the raw hull speed before it is appended to the state.
    SPEED_SCALE = 10.0
    # Weight of the entropy bonus subtracted from the actor loss.
    ENTROPY_COEF = 0.01
    # A step counts as off-road when every normalized ray is below this value.
    OFFROAD_THRESHOLD = 0.1
    # Reward substituted for the environment reward on an off-road step.
    OFFROAD_PENALTY = -1
    # Consecutive off-road steps tolerated before the episode is ended early.
    MAX_OFFROAD_STEPS = 20
    # Every this many epochs: on-screen preview and intermediate checkpoint.
    REPORT_EVERY = 20

    def __init__(self, device=torch.device('cpu')):
        """Build the network and its optimizer, then move the model to *device*."""
        super().__init__()
        self.fc1 = nn.Linear(self.N_RAYS + 1, 128)
        self.fc2 = nn.Linear(128, 128)
        self.policy_head = nn.Linear(128, self.N_ACTIONS)
        self.value_head = nn.Linear(128, 1)
        self.optimizer = optim.Adam(self.parameters(), lr=0.001)
        # Not used by training_step; kept so the attribute stays available.
        self.loss_fn = nn.MSELoss()
        self.to(device)
        self.device = device

    def forward(self, x):
        """Return ``(probs, value)`` for a batch of states.

        Args:
            x: float tensor of shape ``(batch, N_RAYS + 1)``.

        Returns:
            ``probs`` of shape ``(batch, N_ACTIONS)`` (softmax over actions)
            and ``value`` of shape ``(batch,)``.
        """
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        probs = F.softmax(self.policy_head(x), dim=1)
        value = self.value_head(x).squeeze(1)
        return probs, value

    def training_step(self, state, action, reward, state_, done):
        """Run one TD(0) actor-critic update on a single transition.

        Args:
            state: state vector the action was taken from.
            action: index of the action that was taken.
            reward: reward received for the transition.
            state_: state vector observed after the action.
            done: truthy when the episode ended, which disables bootstrapping.

        Returns:
            The combined actor + critic loss as a Python float.
        """
        state = torch.tensor(np.array([state]), dtype=torch.float32, device=self.device)
        action = torch.tensor([action], dtype=torch.int64, device=self.device)
        reward = torch.tensor([reward], dtype=torch.float32, device=self.device)
        state_ = torch.tensor(np.array([state_]), dtype=torch.float32, device=self.device)

        probs, state_value = self.forward(state)
        dist = torch.distributions.Categorical(probs)
        log_prob = dist.log_prob(action)

        # The bootstrap target is a constant for this update: no gradient
        # flows through the next-state value estimate.
        with torch.no_grad():
            _, state_value_ = self.forward(state_)
            target = reward + self.GAMMA * state_value_ * (1 - int(done))

        delta = target - state_value
        entropy = dist.entropy().mean()
        # The TD error is detached so the policy gradient does not train the critic.
        actor_loss = -log_prob * delta.detach() - self.ENTROPY_COEF * entropy
        critic_loss = delta**2

        loss = actor_loss + critic_loss

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return loss.item()

    def _build_state(self, env, obs):
        """Return the network input for *obs*: normalized rays plus scaled speed."""
        rays = radar.get_radar_readings(obs, self.N_RAYS, self.LENGTH_RAY) / self.LENGTH_RAY
        raw_speed = env.unwrapped.car.hull.linearVelocity.length
        return np.append(rays, raw_speed / self.SPEED_SCALE)

    def _sample_action(self, state):
        """Sample an action for *state*; return ``(action, probs)``."""
        state_tensor = torch.tensor(np.array([state]), dtype=torch.float32, device=self.device)
        # Acting needs no autograd graph; training_step recomputes the forward pass.
        with torch.no_grad():
            probs, _ = self.forward(state_tensor)
        action = torch.distributions.Categorical(probs).sample().item()
        return action, probs

    @staticmethod
    def _show_frame(obs):
        """Display one environment frame in the "Game" window."""
        # CarRacing frames are documented as RGB while cv2.imshow interprets
        # arrays as BGR, so convert to avoid swapped red/blue channels.
        cv2.imshow("Game", cv2.cvtColor(obs, cv2.COLOR_RGB2BGR))
        cv2.waitKey(1)

    def train_agent(self, epochs):
        """Train online for *epochs* episodes and save the final weights.

        Resumes from ``mlp_final.pt`` when that file exists. Every
        ``REPORT_EVERY`` epochs the episode is previewed on screen and, except
        at epoch 0, a checkpoint is written to ``mlp.pt``. The final weights
        are written to ``mlp_final.pt`` after the last epoch.

        Args:
            epochs: number of episodes to run.
        """
        if os.path.exists("mlp_final.pt"):
            print("Loading existing smart brain...")
            self.load_state_dict(torch.load("mlp_final.pt", map_location=self.device))
        else:
            print("Starting from scratch...")

        env = gym.make("CarRacing-v3", render_mode="rgb_array", domain_randomize=False, continuous=False)
        try:
            for epoch in range(epochs):
                preview = epoch % self.REPORT_EVERY == 0
                obs, _ = env.reset()
                readings = self._build_state(env, obs)
                terminated = False
                truncated = False
                total_reward = 0
                offroad_count = 0
                while not (terminated or truncated):
                    action, probs = self._sample_action(readings)

                    obs, reward, terminated, truncated, _ = env.step(action)
                    # NOTE(review): the off-road test inspects the readings the
                    # action was chosen from (pre-step), not the new frame.
                    # Kept as in the original; confirm this is intended.
                    if (readings[:-1] < self.OFFROAD_THRESHOLD).all():
                        offroad_count += 1
                        reward = self.OFFROAD_PENALTY
                        if offroad_count > self.MAX_OFFROAD_STEPS:
                            terminated = True
                    else:
                        offroad_count = 0

                    next_readings = self._build_state(env, obs)
                    # Only `terminated` disables bootstrapping; a truncated
                    # episode still bootstraps from the next state.
                    self.training_step(readings, action, reward, next_readings, terminated)

                    readings = next_readings
                    total_reward += reward

                    if preview:
                        print(probs)
                        self._show_frame(obs)

                print(f"Epoch: {epoch} | Reward: {total_reward:.2f}")

                # Periodic intermediate checkpoint.
                if preview and epoch > 0:
                    torch.save(self.state_dict(), 'mlp.pt')
        finally:
            # Release the simulator even if training is interrupted.
            env.close()

        torch.save(self.state_dict(), 'mlp_final.pt')

    def play(self):
        """Load ``mlp_final.pt`` and run one rendered episode.

        Actions are sampled from the policy distribution and printed.

        Returns:
            The sum of the environment rewards collected during the episode.
        """
        self.load_state_dict(torch.load("mlp_final.pt", map_location=self.device))
        self.eval()
        env = gym.make("CarRacing-v3", render_mode="rgb_array", domain_randomize=False, continuous=False)
        total_reward = 0
        try:
            obs, _ = env.reset()
            readings = self._build_state(env, obs)
            terminated = False
            truncated = False

            while not (terminated or truncated):
                action, _ = self._sample_action(readings)
                print(action)

                obs, reward, terminated, truncated, _ = env.step(action)
                readings = self._build_state(env, obs)
                total_reward += reward

                self._show_frame(obs)
        finally:
            # Release the simulator even if the episode is interrupted.
            env.close()

        return total_reward

For immediate assistance, please email our customer support: [email protected]

Download RAW File