Develop Your First AI Agent: Deep Q-Learning
Constructing The Reinforcement Learning Gym

1. Initial Setup

Before we start coding our AI agent, it is strongly recommended that you have a solid understanding of Object-Oriented Programming (OOP) principles in Python.

If you don't have Python installed already, below is a straightforward tutorial by Bhargav Bachina to get you started. The version I will be using is 3.11.6.

The only dependency you will need is TensorFlow, an open-source machine learning library by Google that we'll use to build and train our neural network. It can be installed through pip in the terminal. My version is 2.14.0.

pip install tensorflow

Or if that doesn’t work:

pip3 install tensorflow

You will also need the NumPy package, but it should be included with TensorFlow. If you run into issues there, run pip install numpy.

It is also advisable to create a new file for each class (e.g., environment.py). This will keep you from being overwhelmed and make it easier to troubleshoot any errors you run into.

For your reference, here is the GitHub repository with the finished code: https://github.com/HestonCV/rl-gym-from-scratch. Feel free to clone, explore, and use it as a reference point!

2. The Big Picture

To really understand the concepts rather than simply copying code, it's crucial to get a handle on the different parts we're going to build and how they fit together. This way, each piece will have a place in the bigger picture.

Below is the code for one training loop with 5000 episodes. An episode is basically one complete round of interaction between the agent and the environment, from start to finish.

This doesn't need to be implemented or fully understood at this point. As we build out each part, if you want to see how a particular class or method will be used, refer back to this.

from environment import Environment
from agent import Agent
from experience_replay import ExperienceReplay
import time

if __name__ == '__main__':

    grid_size = 5

    environment = Environment(grid_size=grid_size, render_on=True)
    agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)
    # agent.load(f'models/model_{grid_size}.h5')

    experience_replay = ExperienceReplay(capacity=10000, batch_size=32)

    # Number of episodes to run before training stops
    episodes = 5000
    # Max number of steps in each episode
    max_steps = 200

    for episode in range(episodes):

        # Get the initial state of the environment and set done to False
        state = environment.reset()

        # Loop until the episode finishes
        for step in range(max_steps):
            print('Episode:', episode)
            print('Step:', step)
            print('Epsilon:', agent.epsilon)

            # Get the action choice from the agent's policy
            action = agent.get_action(state)

            # Take a step in the environment and save the experience
            reward, next_state, done = environment.step(action)
            experience_replay.add_experience(state, action, reward, next_state, done)

            # If the experience replay has enough memory to provide a sample, train the agent
            if experience_replay.can_provide_sample():
                experiences = experience_replay.sample_batch()
                agent.learn(experiences)

            # Set the state to the next_state
            state = next_state

            if done:
                break
            # time.sleep(0.5)

        agent.save(f'models/model_{grid_size}.h5')

Each iteration of the inner loop is considered one step.

Diagram: 'Agent' sends 'Action' to 'Environment,' which sends 'State' feedback to 'Neural Network,' which informs the agent with 'Q-Values.' The cycle is encompassed by 'Training Loop.'
Training process through Agent-Environment interaction — Image by author

In each step:

  • The state is retrieved from the environment.
  • The agent chooses an action based on this state.
  • The environment is acted on, returning the reward, the resulting state after taking the action, and whether the episode is done.
  • The initial state, action, reward, next_state, and done are then saved into experience_replay as a kind of long-term memory (experience).
  • The agent is then trained on a random sample of those experiences.

At the end of each episode, or however often you'd like, the model weights are saved to the models folder. These can later be preloaded to avoid training from scratch every time. The environment is then reset at the beginning of the next episode.

This basic structure is just about all it takes to create an intelligent agent that can solve a large variety of problems!

As stated in the introduction, the problem for our agent is quite simple: get from its initial position in the grid to the designated goal position.

3. The Environment: Initial Foundations

The most obvious place to begin developing this system is the environment.

To have a functioning RL gym, the environment must do a few things:

  • Maintain the present state of the world.
  • Keep track of the goal and agent.
  • Allow the agent to make changes to the world.
  • Return the state in a form the model can understand.
  • Render the world in a way we can understand, so we can observe the agent.

This will be the place the agent spends its entire life. We'll define the environment as a simple square matrix/2D array, or a list of lists in Python.

This environment will have a discrete state-space, meaning that the possible states the agent can encounter are distinct and countable. Each state is a separate, specific condition or scenario in the environment, unlike a continuous state-space where the states can vary in an infinite, fluid manner. Think of chess versus driving a car.

DQL is specifically designed for discrete action-spaces (a finite number of actions), which is what we will be focusing on. Other methods are used for continuous action-spaces.

In the grid, empty space will be represented by 0s, the agent will be represented by a 1, and the goal will be represented by a -1. The size of the environment can be whatever you like, but as the environment grows larger, the set of all possible states (the state-space) grows rapidly. This can slow training significantly.

The grid will look something like this when rendered:

[0, 1, 0, 0, 0]
[0, 0, 0, 0, 0]
[0, 0, 0, 0, 0]
[0, 0, 0, -1, 0]
[0, 0, 0, 0, 0]

Building the Environment class and reset method
We'll begin by implementing the Environment class and a way to initialize the environment. For now, it will take an integer, grid_size, but we'll expand on this shortly.

import numpy as np

class Environment:
    def __init__(self, grid_size):
        self.grid_size = grid_size
        self.grid = []

    def reset(self):
        # Initialize the empty grid as a 2D array of 0s
        self.grid = np.zeros((self.grid_size, self.grid_size))

When a new instance is created, Environment saves grid_size and initializes an empty grid.

The reset method populates the grid using np.zeros((self.grid_size, self.grid_size)), which takes a shape tuple and outputs a 2D NumPy array of that shape consisting only of zeros.

A NumPy array is a grid-like data structure that behaves much like a list in Python, except that it enables us to efficiently store and manipulate numerical data. It allows for vectorized operations, meaning that operations are automatically applied to all elements in the array without the need for explicit loops.

This makes computations on large datasets much faster and more efficient than standard Python lists. Not only that, but it is also the data structure that our agent's neural network will expect!
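
As a small illustration of what vectorization buys us (the array values here are just example numbers, not part of our gym):

import numpy as np

# A plain Python list needs an explicit loop (or comprehension) to scale every element
plain_list = [0, 1, 0, -1]
scaled_list = [value * 2 for value in plain_list]

# A NumPy array applies the operation to every element at once
array = np.array([0, 1, 0, -1])
scaled_array = array * 2  # array([ 0,  2,  0, -2])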

Why the name reset? Well, this method will be called to reset the environment, and it will eventually return the initial state of the grid.

Adding the agent and goal
Next, we'll build the methods for adding the agent and the goal to the grid.

import random

def add_agent(self):
    # Select a random location
    location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))

    # Agent is represented by a 1
    self.grid[location[0]][location[1]] = 1

    return location

def add_goal(self):
    # Select a random location
    location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))

    # Get a random location until it is not occupied
    while self.grid[location[0]][location[1]] == 1:
        location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))

    # Goal is represented by a -1
    self.grid[location[0]][location[1]] = -1

    return location

The locations of the agent and the goal will be represented by tuples (x, y). Both methods select random values within the boundaries of the grid and return the location. The main difference is that add_goal ensures it doesn't select a location already occupied by the agent.

We place the agent and goal at random starting locations to introduce variability into each episode, which helps the agent learn to navigate the environment from different starting points rather than memorizing one route.

Finally, we'll add a method to render the world in the console, enabling us to see the interactions between the agent and the environment.

def render(self):
    # Convert to a list of ints to improve formatting
    grid = self.grid.astype(int).tolist()

    for row in grid:
        print(row)
    print('')  # Add some space between renders for each step

render does three things: casts the elements of self.grid to type int, converts the array into a Python list, and prints each row.

The only reason we don't print each row of the NumPy array directly is that it simply doesn't look as nice.

Tying it all together...

import numpy as np
import random

class Environment:
    def __init__(self, grid_size):
        self.grid_size = grid_size
        self.grid = []

    def reset(self):
        # Initialize the empty grid as a 2D array of 0s
        self.grid = np.zeros((self.grid_size, self.grid_size))

    def add_agent(self):
        # Select a random location
        location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))

        # Agent is represented by a 1
        self.grid[location[0]][location[1]] = 1

        return location

    def add_goal(self):
        # Select a random location
        location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))

        # Get a random location until it is not occupied
        while self.grid[location[0]][location[1]] == 1:
            location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))

        # Goal is represented by a -1
        self.grid[location[0]][location[1]] = -1

        return location

    def render(self):
        # Convert to a list of ints to improve formatting
        grid = self.grid.astype(int).tolist()

        for row in grid:
            print(row)
        print('')  # Add some space between renders for each step

# Test Environment
env = Environment(5)
env.reset()
agent_location = env.add_agent()
goal_location = env.add_goal()
env.render()

print(f'Agent Location: {agent_location}')
print(f'Goal Location: {goal_location}')

>>>
[0, 0, 0, 0, 0]
[0, 0, -1, 0, 0]
[0, 0, 0, 0, 0]
[0, 0, 0, 1, 0]
[0, 0, 0, 0, 0]

Agent Location: (3, 3)
Goal Location: (1, 2)

Looking at the locations, it may appear there was some error, but they should be read as (row, column) from the top left to the bottom right. Also, keep in mind that the coordinates are zero-indexed.

Okay, so the environment is defined. What next?

Expanding on reset
Let’s edit the reset method to handle placing the agent and goal for us. While we’re at it, let’s automate render as well.

class Environment:
    def __init__(self, grid_size, render_on=False):
        self.grid_size = grid_size
        self.grid = []
        # Make sure to add the new attributes
        self.render_on = render_on
        self.agent_location = None
        self.goal_location = None

    def reset(self):
        # Initialize the empty grid as a 2D array of 0s
        self.grid = np.zeros((self.grid_size, self.grid_size))

        # Add the agent and the goal to the grid
        self.agent_location = self.add_agent()
        self.goal_location = self.add_goal()

        if self.render_on:
            self.render()

Now, when reset is called, the agent and goal are added to the grid, their initial locations are saved, and if render_on is set to True, the grid is rendered.

...

# Test Environment
env = Environment(5, render_on=True)
env.reset()

# Now, to access the agent and goal locations, you can use Environment's attributes
print(f'Agent Location: {env.agent_location}')
print(f'Goal Location: {env.goal_location}')

>>>
[0, 0, 0, 0, 0]
[0, 0, 0, 0, 0]
[0, 0, 0, 0, 0]
[0, 0, 0, 0, -1]
[1, 0, 0, 0, 0]

Agent Location: (4, 0)
Goal Location: (3, 4)

Defining the state of the environment
The last method we'll implement for now is get_state. At first glance, it seems the state could simply be the grid itself, but the problem with this approach is that it isn't what the neural network will expect.

Neural networks typically need one-dimensional input, not the two-dimensional shape that the grid currently has. We can fix this by flattening the grid using NumPy's built-in flatten method. This will place each row into the same one-dimensional array.

def get_state(self):
    # Flatten the grid from 2D to 1D
    state = self.grid.flatten()
    return state

This will transform:

[0, 0, 0, 0, 0]
[0, 0, 0, 1, 0]
[0, 0, 0, 0, 0]
[0, 0, 0, 0, -1]
[0, 0, 0, 0, 0]

Into:

[0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0]

As you can see, it's not immediately obvious which cell is which, but this will be no problem for a deep neural network.
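
If you ever want to check which cell a flattened index corresponds to, you can recover the (row, column) pair with divmod. A quick illustration, not part of the Environment class:

grid_size = 5
flat_index = 19  # Position of the -1 in the flattened state above

# divmod gives (row, column), since the grid was flattened row by row
row, column = divmod(flat_index, grid_size)
print(row, column)  # 3 4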

Now we can update reset to return the state right after the grid is populated. Nothing else will change.

def reset(self):
    ...

    # Return the initial state of the grid
    return self.get_state()

Full code up to this point...

import random
import numpy as np

class Environment:
    def __init__(self, grid_size, render_on=False):
        self.grid_size = grid_size
        self.grid = []
        self.render_on = render_on
        self.agent_location = None
        self.goal_location = None

    def reset(self):
        # Initialize the empty grid as a 2D array of 0s
        self.grid = np.zeros((self.grid_size, self.grid_size))

        # Add the agent and the goal to the grid
        self.agent_location = self.add_agent()
        self.goal_location = self.add_goal()

        if self.render_on:
            self.render()

        # Return the initial state of the grid
        return self.get_state()

    def add_agent(self):
        # Select a random location
        location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))

        # Agent is represented by a 1
        self.grid[location[0]][location[1]] = 1

        return location

    def add_goal(self):
        # Select a random location
        location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))

        # Get a random location until it is not occupied
        while self.grid[location[0]][location[1]] == 1:
            location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))

        # Goal is represented by a -1
        self.grid[location[0]][location[1]] = -1

        return location

    def render(self):
        # Convert to a list of ints to improve formatting
        grid = self.grid.astype(int).tolist()

        for row in grid:
            print(row)
        print('')  # Add some space between renders for each step

    def get_state(self):
        # Flatten the grid from 2D to 1D
        state = self.grid.flatten()
        return state

You have now successfully implemented the foundation of the environment! Although, in case you haven't noticed, we can't interact with it yet. The agent is stuck in place.

We'll return to this problem after the Agent class has been coded, to provide better context.

4. Implement The Agent's Neural Architecture and Policy

As stated previously, the agent is the entity that is given the state of its environment, in this case a flattened version of the world grid, and decides what action to take from the action-space.

Just to reiterate, the action-space is the set of all possible actions. In this scenario the agent can move up, down, left, and right, so the size of the action-space is 4.

The state-space is the set of all possible states. This can be a massive number depending on the environment and the perspective of the agent. In our case, if the world is a 5×5 grid there are 600 possible states, but if the world is a 25×25 grid there are 390,000, wildly increasing the training time.
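
The 600 and 390,000 figures come from counting the ways to place the agent and the goal on two distinct cells, i.e. grid_size² × (grid_size² − 1). A quick sanity check:

def count_states(grid_size):
    # One cell for the agent, a different cell for the goal
    cells = grid_size ** 2
    return cells * (cells - 1)

print(count_states(5))   # 600
print(count_states(25))  # 390000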

For an agent to effectively learn to complete a goal, it needs a few things:

  • A neural network to approximate the Q-values (the estimated total future reward for an action), in the case of DQL.
  • A policy, or a strategy the agent follows to choose an action.
  • Reward signals from the environment to tell the agent how well it's doing.
  • The ability to train on past experiences.

There are two different policies one can implement:

  • Greedy Policy: Select the action with the highest Q-value in the current state.
  • Epsilon-Greedy Policy: Select the action with the highest Q-value in the current state, but with a small probability, epsilon (commonly denoted as ϵ), choose a random action instead. If epsilon = 0.02, there is a 2% chance that the action will be random.

What we’ll implement is the Epsilon-Greedy Policy.

Why would random actions help the agent learn? Exploration.

When the agent starts out, it may learn a suboptimal path to the goal and continue to make this choice without ever changing or learning a new route.

Starting with a large epsilon value and slowly decreasing it allows the agent to thoroughly explore the environment as it updates its Q-values, before exploiting the learned strategies. The amount we decrease epsilon by over time is called epsilon decay, which will make more sense soon.

As we did with the environment, we'll represent the agent with a class.

Now, before we implement the policy, we need a way to get Q-values. That is where our agent's brain, the neural network, comes in.

The neural network
Without straying too far here, a neural network is essentially a giant function. Values go in, get passed through each layer and transformed, and different values come out at the end. Nothing more than that. The magic comes in when training begins.

The idea is to give the NN large amounts of labeled data like, "here is an input, and here is what you should output". It slowly adjusts the weights between neurons with each training step, trying to get as close as possible to the given outputs, finding patterns within the data, and hopefully helping us predict for inputs the network has never seen.

Diagram: Neural network with an input layer receiving ‘State,’ hidden layers in the middle, and an output layer delivering ‘Action Q-Values.’
Transformation of State to Q-Values through a neural network — Image by author

The Agent class and defining the neural architecture
For now, we'll define the neural architecture using TensorFlow and focus on the "forward pass" of the data.

from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential

class Agent:
    def __init__(self, grid_size):
        self.grid_size = grid_size
        self.model = self.build_model()

    def build_model(self):
        # Create a sequential model with 3 layers
        model = Sequential([
            # Input layer expects a flattened grid, hence the input shape is grid_size squared
            Dense(128, activation='relu', input_shape=(self.grid_size**2,)),
            Dense(64, activation='relu'),
            # Output layer with 4 units for the possible actions (up, down, left, right)
            Dense(4, activation='linear')
        ])

        model.compile(optimizer='adam', loss='mse')

        return model

Again, if you are unfamiliar with neural networks, don't get too caught up in this section. While we use activations like 'relu' and 'linear' in our model, a detailed exploration of activation functions is beyond the scope of this article.

All you really need to know is that the model takes the state as input, the values are transformed at each layer in the model, and the 4 Q-values corresponding to each action are output.

In building our agent's neural network, we start with an input layer that processes the state of the grid, represented as a one-dimensional array of size grid_size². This is because we've flattened the grid to simplify the input. This layer is the input itself and doesn't need to be defined as a separate layer in our architecture, since it performs no computation.

Next, we have two hidden layers. We don't see their values directly, but as our model learns, they are essential for getting a closer approximation of the Q-value function:

  1. The first hidden layer has 128 neurons, Dense(128, activation='relu'), and takes the flattened grid as its input.
  2. The second hidden layer consists of 64 neurons, Dense(64, activation='relu'), and further processes the information.

Finally, the output layer, Dense(4, activation='linear'), contains 4 neurons, corresponding to the 4 possible actions (up, down, left, right). This layer outputs the Q-values, estimates of the future reward for each action.

Typically, the more complex the problem you have to solve, the more hidden layers and neurons you will need. Two hidden layers should be plenty for our simple use-case.

Neurons and layers can and should be experimented with to find a balance between speed and results, with each addition increasing the network's ability to capture and learn from the nuances of the data. As with the state-space, the larger the neural network, the slower training will be.
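
If you want to experiment with the architecture, one option is to parameterize the layer sizes. This is just a sketch, not part of the article's final Agent class; hidden_layers is a hypothetical argument:

from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential

def build_model(grid_size, hidden_layers=(128, 64)):
    # Build a configurable stack of Dense layers ending in 4 Q-value outputs
    model = Sequential()
    model.add(Dense(hidden_layers[0], activation='relu', input_shape=(grid_size**2,)))
    for units in hidden_layers[1:]:
        model.add(Dense(units, activation='relu'))
    model.add(Dense(4, activation='linear'))
    model.compile(optimizer='adam', loss='mse')
    return model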

Greedy Policy
Using this neural network, we are now able to get a Q-value prediction, albeit not a very good one yet, and make a decision.

import numpy as np

def get_action(self, state):
    # Add an extra dimension to the state to create a batch with one instance
    state = np.expand_dims(state, axis=0)

    # Use the model to predict the Q-values (action values) for the given state
    q_values = self.model.predict(state, verbose=0)

    # Select and return the action with the highest Q-value
    action = np.argmax(q_values[0])  # Take the action from the first (and only) entry

    return action

The TensorFlow model requires its input, the state, to be in batches. This is very useful when you have many inputs and want a full batch of outputs, but it can be a little confusing when you only have one input to predict for.

state = np.expand_dims(state, axis=0)

We can fix this by using NumPy's expand_dims method with axis=0. All this does is turn the state into a batch of one input. For example, the state of a 5×5 grid:

[0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0]

Becomes:

[[0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0]]

When training the model, you'll typically use batches of size 32 or more. That will look something like this:

[[0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, -1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
...
[0, 0, 0, 0, 0, 0, 0, 0, 1, -1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]

Now that we have prepared the input for the model in the correct format, we can predict the Q-values for each action and select the highest one.

...

# Use the model to predict the Q-values (action values) for the given state
q_values = self.model.predict(state, verbose=0)

# Select and return the action with the highest Q-value
action = np.argmax(q_values[0])  # Take the action from the first (and only) entry

...

We simply give the model the state, and it outputs a batch of predictions. Remember, because we're feeding the network a batch of one, it will return a batch of one. Additionally, verbose=0 keeps the console clear of routine debug messages every time the predict function is called.

Finally, we choose and return the index of the action with the highest value, using np.argmax on the first and only entry in the batch.

In our case, the indices 0, 1, 2, and 3 will be mapped to up, down, left, and right, respectively.

The Greedy Policy always picks the action that has the highest reward according to the current Q-values, which may not always lead to the best long-term outcomes.

Epsilon-Greedy Policy
We have implemented the Greedy Policy, but what we want is the Epsilon-Greedy Policy. This introduces randomness into the agent's choices to allow for exploration of the state-space.

Just to recap, epsilon is the probability that a random action will be chosen. We also want some way to decrease this over time as the agent learns, allowing exploitation of its learned policy. As briefly mentioned before, this is called epsilon decay.

The epsilon decay value should be set to a decimal number slightly lower than 1, and it is used to progressively reduce epsilon after each step the agent takes.

Typically, epsilon will start at 1, and epsilon decay will be some value very close to 1, like 0.998. After each step in the training process, you multiply epsilon by the epsilon decay.

To illustrate this, below is how epsilon changes over the training process.

Initialize Values:
epsilon = 1
epsilon_decay = 0.998

-----------------

Step 1:
epsilon = 1

epsilon = 1 * 0.998 = 0.998

-----------------

Step 2:
epsilon = 0.998

epsilon = 0.998 * 0.998 = 0.996

-----------------

Step 3:
epsilon = 0.996

epsilon = 0.996 * 0.998 = 0.994

-----------------

Step 4:
epsilon = 0.994

epsilon = 0.994 * 0.998 = 0.992

-----------------

...

-----------------

Step 1000:
epsilon = 1 * (0.998)^1000 = 0.135

-----------------

...and so forth

As you can see, epsilon slowly approaches zero with each step. By step 1000, there's a 13.5% chance that a random action will be chosen. Epsilon decay is a value that will need to be tweaked based on the state-space. With a large state-space, more exploration may be necessary, and therefore a higher epsilon decay.

Graph: Epsilon value starts at 1.0, decreases to 0.1 over steps, illustrating epsilon-greedy strategy’s shift from exploration to exploitation.
Decay of epsilon over steps — Image by author

Even when the agent is trained well, it is beneficial to keep a small epsilon value. We should define a stopping point where epsilon doesn't get any lower, the epsilon end. This can be 0.1, 0.01, or even 0.001 depending on the use-case and the complexity of the task.

In the figure above, you'll notice epsilon stops decreasing at 0.1, the predefined epsilon end.
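
If you want to estimate how long exploration lasts for a given decay rate, a couple of lines of math reproduce the numbers above (0.135 at step 1000) and show roughly when the epsilon end from the figure is reached:

import math

epsilon, epsilon_decay, epsilon_end = 1.0, 0.998, 0.1

# Epsilon after 1000 steps of multiplicative decay
print(epsilon * epsilon_decay ** 1000)  # roughly 0.135

# First step at which epsilon drops below epsilon_end
steps_to_end = math.log(epsilon_end / epsilon) / math.log(epsilon_decay)
print(math.ceil(steps_to_end))  # 1151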

Let’s update our Agent class to include epsilon.

import numpy as np

class Agent:
    def __init__(self, grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01):
        self.grid_size = grid_size
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_end = epsilon_end
        ...

    ...

    def get_action(self, state):

        # rand() returns a random value between 0 and 1
        if np.random.rand() <= self.epsilon:
            # Exploration: random action
            action = np.random.randint(0, 4)
        else:
            # Add an extra dimension to the state to create a batch with one instance
            state = np.expand_dims(state, axis=0)

            # Use the model to predict the Q-values (action values) for the given state
            q_values = self.model.predict(state, verbose=0)

            # Select and return the action with the highest Q-value
            action = np.argmax(q_values[0])  # Take the action from the first (and only) entry

        # Decay the epsilon value to reduce exploration over time
        if self.epsilon > self.epsilon_end:
            self.epsilon *= self.epsilon_decay

        return action

We’ve given epsilon, epsilon_decay, and epsilon_end default values of 1, 0.998, and 0.01, respectively.

Remember, epsilon and its associated values are hyper-parameters, parameters used to control the training process. They can and should be experimented with to achieve the best results.

The get_action method has been updated to incorporate epsilon. If the random value given by np.random.rand is less than or equal to epsilon, a random action is chosen. Otherwise, the process is the same as before.

Finally, if epsilon has not reached epsilon_end, we update it by multiplying by epsilon_decay: self.epsilon *= self.epsilon_decay.

Agent up to this point:

from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential
import numpy as np

class Agent:
    def __init__(self, grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01):
        self.grid_size = grid_size
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_end = epsilon_end
        self.model = self.build_model()

    def build_model(self):
        # Create a sequential model with 3 layers
        model = Sequential([
            # Input layer expects a flattened grid, hence the input shape is grid_size squared
            Dense(128, activation='relu', input_shape=(self.grid_size**2,)),
            Dense(64, activation='relu'),
            # Output layer with 4 units for the possible actions (up, down, left, right)
            Dense(4, activation='linear')
        ])

        model.compile(optimizer='adam', loss='mse')

        return model

    def get_action(self, state):

        # rand() returns a random value between 0 and 1
        if np.random.rand() <= self.epsilon:
            # Exploration: random action
            action = np.random.randint(0, 4)
        else:
            # Add an extra dimension to the state to create a batch with one instance
            state = np.expand_dims(state, axis=0)

            # Use the model to predict the Q-values (action values) for the given state
            q_values = self.model.predict(state, verbose=0)

            # Select and return the action with the highest Q-value
            action = np.argmax(q_values[0])  # Take the action from the first (and only) entry

        # Decay the epsilon value to reduce exploration over time
        if self.epsilon > self.epsilon_end:
            self.epsilon *= self.epsilon_decay

        return action

We have effectively implemented the Epsilon-Greedy Policy, and we're almost ready to enable the agent to learn!

5. Affect The Environment: Finishing Up

Environment currently has methods for resetting the grid, adding the agent and goal, providing the current state, and printing the grid to the console.

For the environment to be complete, we need to be able not only to let the agent affect it, but also to provide feedback in the form of rewards.

Defining the reward structure
Coming up with a good reward structure is the main challenge of reinforcement learning. Your problem can be perfectly within the capabilities of the model, but if the reward structure isn't set up correctly, it may never learn.

The purpose of the rewards is to encourage specific behavior. In our case, we want to guide the agent towards the goal cell, marked by -1.

Like the layers and neurons in the network, and epsilon and its associated values, there can be many right (and many wrong) ways to define the reward structure.

The two main types of reward structures:

  • Sparse: When rewards are only given in a handful of states.
  • Dense: When rewards are common throughout the state-space.

With sparse rewards, the agent has very little feedback to guide it. This would be like simply giving a fixed penalty for each step and providing one large reward if the agent reaches the goal.

The agent can definitely learn to reach the goal, but depending on the size of the state-space, it can take much longer and may get stuck on a suboptimal strategy.

This is in contrast with dense reward structures, which allow the agent to train more quickly and behave more predictably.

Dense reward structures either

  • have more than one goal.
  • give hints throughout an episode.

The agent then has more opportunities to learn desired behavior.

For instance, pretend you're training an agent to use a body to walk, and the only reward you give it is for reaching a goal. The agent may learn to get there by simply inching or rolling along the ground, or not learn at all.

Instead, if you reward the agent for heading towards the goal, staying on its feet, putting one foot in front of the other, and standing up straight, you will get a much more natural and interesting gait while also improving learning.

Allowing the agent to affect the environment
To have rewards at all, the agent must be allowed to interact with its world. Let's revisit the Environment class to define this interaction.

...

def move_agent(self, action):
    # Map agent action to the correct movement
    moves = {
        0: (-1, 0),  # Up
        1: (1, 0),   # Down
        2: (0, -1),  # Left
        3: (0, 1)    # Right
    }

    previous_location = self.agent_location

    # Determine the new location after applying the action
    move = moves[action]
    new_location = (previous_location[0] + move[0], previous_location[1] + move[1])

    # Check for a valid move
    if self.is_valid_location(new_location):
        # Remove agent from old location
        self.grid[previous_location[0]][previous_location[1]] = 0

        # Add agent to new location
        self.grid[new_location[0]][new_location[1]] = 1

        # Update agent's location
        self.agent_location = new_location

def is_valid_location(self, location):
    # Check if the location is within the boundaries of the grid
    if (0 <= location[0] < self.grid_size) and (0 <= location[1] < self.grid_size):
        return True
    else:
        return False

The above code first defines the change in coordinates associated with each action value. If action 0 is chosen, the coordinates change by (-1, 0).

Remember, in this scenario the coordinates are interpreted as (row, column). If the row decreases by one, the agent moves up one cell, and if the column decreases by one, the agent moves left one cell.

It then calculates the new location based on the move. If the new location is valid, agent_location is updated. Otherwise, agent_location is left the same.

Also, is_valid_location simply checks whether the new location is within the grid boundaries.

That’s fairly clear-cut, but what are we missing? Feedback!

Providing feedback
The environment needs to provide an appropriate reward and indicate whether the episode is complete.

Let's incorporate the done flag first, to indicate when an episode is finished.

...

def move_agent(self, action):
    ...
    done = False  # The episode is not done by default

    # Check for a valid move
    if self.is_valid_location(new_location):
        # Remove agent from old location
        self.grid[previous_location[0]][previous_location[1]] = 0

        # Add agent to new location
        self.grid[new_location[0]][new_location[1]] = 1

        # Update agent's location
        self.agent_location = new_location

        # Check if the new location is the goal location
        if self.agent_location == self.goal_location:
            # Episode is complete
            done = True

    return done

...

We've set done to False by default. If the new agent_location is the same as goal_location, done is set to True. Finally, we return this value.

We're ready for our reward structure. First, I'll show the implementation of the sparse reward structure. This would be satisfactory for a grid of around 5×5, but we'll update it to allow for a larger environment.

Sparse rewards
Implementing sparse rewards is quite simple. We mainly need to give a reward for landing on the goal.

Let's also give a small negative reward for each step that doesn't land on the goal, and a larger one for hitting the boundary. This will encourage our agent to prioritize the shortest path.

...

def move_agent(self, action):
    ...
    done = False  # The episode is not done by default
    reward = 0    # Initialize reward

    # Check for a valid move
    if self.is_valid_location(new_location):
        # Remove agent from old location
        self.grid[previous_location[0]][previous_location[1]] = 0

        # Add agent to new location
        self.grid[new_location[0]][new_location[1]] = 1

        # Update agent's location
        self.agent_location = new_location

        # Check if the new location is the goal location
        if self.agent_location == self.goal_location:
            # Reward for reaching the goal
            reward = 100

            # Episode is complete
            done = True
        else:
            # Small punishment for a valid move that did not reach the goal
            reward = -1
    else:
        # Slightly larger punishment for an invalid move
        reward = -3

    return reward, done

...

Make sure to initialize reward so that it can be accessed after the if blocks. Also, check carefully for each case: a valid move that reaches the goal, a valid move that doesn't, and an invalid move.

Dense rewards
Putting our dense reward system into practice is still quite simple; it just involves providing feedback more often.

What would be a good way to reward the agent for moving towards the goal more incrementally?

The first way is to return the negative of the Manhattan distance. The Manhattan distance is the distance in the row direction plus the distance in the column direction, rather than as the crow flies. Here is what that looks like in code:

reward = -(np.abs(self.goal_location[0] - new_location[0]) +
           np.abs(self.goal_location[1] - new_location[1]))

So, the number of steps in the row direction plus the number of steps in the column direction, negated.

The other way we can do this is to provide a reward based on the direction the agent moves: if it moves away from the goal, provide a negative reward, and if it moves toward it, provide a positive reward.

We can calculate this by subtracting the new Manhattan distance from the previous Manhattan distance. It will be either 1 or -1, since the agent can only move one cell per step.

In our case, it makes the most sense to choose the second option. This should provide better results, since it gives immediate feedback based on that step rather than a more general reward.

The code for this option:

...

def move_agent(self, action):
    ...
    if self.agent_location == self.goal_location:
        ...
    else:
        # Calculate the distance before the move
        previous_distance = np.abs(self.goal_location[0] - previous_location[0]) + \
                            np.abs(self.goal_location[1] - previous_location[1])

        # Calculate the distance after the move
        new_distance = np.abs(self.goal_location[0] - new_location[0]) + \
                       np.abs(self.goal_location[1] - new_location[1])

        # If new_location is closer to the goal, reward = 1, if further, reward = -1
        reward = (previous_distance - new_distance)
...

As you can see, if the agent didn't reach the goal, we calculate previous_distance and new_distance, and then define reward as the difference between them.

Depending on performance, it may be appropriate to scale this reward, or any reward in the system. You can do this by simply multiplying it by a number (e.g., 0.01, 2, 100). The proportions of the rewards have to effectively guide the agent to the goal. For instance, a reward of 1 for moving closer to the goal and a reward of 0.1 for reaching the goal itself wouldn't make much sense.

Rewards are proportional. If you scale every positive and negative reward by the same factor, it shouldn't generally affect training, apart from very large or very small values.

In summary, if the agent is 10 steps away from the goal and it moves to a space 11 steps away, then reward will be -1.

Here is the updated move_agent.

def move_agent(self, action):
    # Map agent action to the correct movement
    moves = {
        0: (-1, 0),  # Up
        1: (1, 0),   # Down
        2: (0, -1),  # Left
        3: (0, 1)    # Right
    }

    previous_location = self.agent_location

    # Determine the new location after applying the action
    move = moves[action]
    new_location = (previous_location[0] + move[0], previous_location[1] + move[1])

    done = False  # The episode is not done by default
    reward = 0    # Initialize reward

    # Check for a valid move
    if self.is_valid_location(new_location):
        # Remove agent from old location
        self.grid[previous_location[0]][previous_location[1]] = 0

        # Add agent to new location
        self.grid[new_location[0]][new_location[1]] = 1

        # Update agent's location
        self.agent_location = new_location

        # Check if the new location is the goal location
        if self.agent_location == self.goal_location:
            # Reward for reaching the goal
            reward = 100

            # Episode is complete
            done = True
        else:
            # Calculate the distance before the move
            previous_distance = np.abs(self.goal_location[0] - previous_location[0]) + \
                                np.abs(self.goal_location[1] - previous_location[1])

            # Calculate the distance after the move
            new_distance = np.abs(self.goal_location[0] - new_location[0]) + \
                           np.abs(self.goal_location[1] - new_location[1])

            # If new_location is closer to the goal, reward = 1, if further, reward = -1
            reward = (previous_distance - new_distance)
    else:
        # Slightly larger punishment for an invalid move
        reward = -3

    return reward, done

The rewards for achieving the goal and for attempting an invalid move remain the same with this structure.

Step penalty
There’s only one thing we’re missing.

The agent is currently not penalized for how long it takes to reach the goal. Our implemented reward structure has many net-neutral loops. It could go back and forth between two locations forever and accumulate no penalty. We can fix this by subtracting a small value each step, so the penalty for moving away is greater than the reward for moving closer. This illustration should make it much clearer.

Diagram: Two vertically stacked images with three circles representing states, with arrows pointing to and from each. The top image is labeled 'Without Step Penalty' with the circles labeled '-1', '+1', and '+100' respectively. The bottom image is labeled 'With Step Penalty' with the circles labeled '-1.1', '+0.9', and '+100' respectively.
Reward paths with and without a step penalty — Image by author

Imagine the agent starts at the left-most node and must make a decision. Without a step penalty, it could choose to go forward, then back, as many times as it wants, and its total reward would still be 1 before finally moving to the goal.

So mathematically, looping 1000 times and then moving to the goal is just as valid as moving straight there.

Try to imagine looping in either case and see how the penalty is accumulated (or not accumulated).
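
As a quick sanity check of the numbers in the diagram, here is the total return for looping n times before heading to the goal, with and without the 0.1 step penalty (a small illustration, not gym code):

def total_reward(loops, step_penalty=0.0):
    # Each loop is one step toward the goal (+1) and one step away (-1)
    loop_reward = loops * ((1 - step_penalty) + (-1 - step_penalty))
    # Then one final step toward the goal (+1) and the goal reward itself (+100)
    return loop_reward + (1 - step_penalty) + 100

print(total_reward(0))                        # 101.0
print(total_reward(1000))                     # 101.0 -> looping costs nothing
print(total_reward(0, step_penalty=0.1))      # roughly 100.9
print(total_reward(1000, step_penalty=0.1))   # roughly -99.1 -> looping is heavily penalized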

Let’s implement this.

...

# If new_location is closer to the goal, reward = 0.9, if further, reward = -1.1
reward = (previous_distance - new_distance) - 0.1

...

That's it. The agent should now be incentivized to take the shortest path, preventing looping behavior.

Okay, but what’s the point?
At this point, you may be thinking it's a waste of time to define a reward system and train an agent for a task that could be accomplished with much simpler algorithms.

And you would be correct.

The reason we're doing this is to learn how to think about guiding your agent to its goal. In this case it may seem trivial, but what if the agent's environment included items to pick up, enemies to battle, obstacles to pass through, and more?

Or a robot in the real world with dozens of sensors and motors that it needs to coordinate in sequence to navigate complex and varied environments?

Designing a system to do these things using traditional programming would be quite difficult, and it most certainly wouldn't behave anywhere near as organically or generally as an agent trained with RL and a good reward structure to learn optimal strategies.

Reinforcement learning is most useful in applications where defining the exact sequence of steps required to complete the task is difficult or impossible due to the complexity and variability of the environment. The only thing you need for RL to work is to be able to define what behavior is useful and what behavior should be discouraged.

The final Environment method: step
With each component of Environment in place, we can now define the heart of the interaction between the agent and the environment.

Thankfully, it is quite simple.

def step(self, action):
    # Apply the action to the environment, record the observations
    reward, done = self.move_agent(action)
    next_state = self.get_state()

    # Render the grid at each step
    if self.render_on:
        self.render()

    return reward, next_state, done

step first moves the agent in the environment and records reward and done. Then it gets the state immediately following this interaction, next_state. Then, if render_on is set to True, the grid is rendered.

Finally, step returns the recorded values: reward, next_state, and done.

These will be essential for constructing the experiences our agent will learn from.

Congratulations! You have officially completed the construction of the environment for your DRL gym.

Below is the finished Environment class.

import random
import numpy as np

class Environment:
    def __init__(self, grid_size, render_on=False):
        self.grid_size = grid_size
        self.render_on = render_on
        self.grid = []
        self.agent_location = None
        self.goal_location = None

    def reset(self):
        # Initialize the empty grid as a 2D array of 0s
        self.grid = np.zeros((self.grid_size, self.grid_size))

        # Add the agent and the goal to the grid
        self.agent_location = self.add_agent()
        self.goal_location = self.add_goal()

        # Render the initial grid
        if self.render_on:
            self.render()

        # Return the initial state
        return self.get_state()

    def add_agent(self):
        # Select a random location
        location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))

        # Agent is represented by a 1
        self.grid[location[0]][location[1]] = 1
        return location

    def add_goal(self):
        # Select a random location
        location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))

        # Get a random location until it is not occupied
        while self.grid[location[0]][location[1]] == 1:
            location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1))

        # Goal is represented by a -1
        self.grid[location[0]][location[1]] = -1

        return location

    def move_agent(self, action):
        # Map agent action to the correct movement
        moves = {
            0: (-1, 0),  # Up
            1: (1, 0),   # Down
            2: (0, -1),  # Left
            3: (0, 1)    # Right
        }

        previous_location = self.agent_location

        # Determine the new location after applying the action
        move = moves[action]
        new_location = (previous_location[0] + move[0], previous_location[1] + move[1])

        done = False  # The episode is not done by default
        reward = 0    # Initialize reward

        # Check for a valid move
        if self.is_valid_location(new_location):
            # Remove agent from old location
            self.grid[previous_location[0]][previous_location[1]] = 0

            # Add agent to new location
            self.grid[new_location[0]][new_location[1]] = 1

            # Update agent's location
            self.agent_location = new_location

            # Check if the new location is the goal location
            if self.agent_location == self.goal_location:
                # Reward for reaching the goal
                reward = 100

                # Episode is complete
                done = True
            else:
                # Calculate the distance before the move
                previous_distance = np.abs(self.goal_location[0] - previous_location[0]) + \
                                    np.abs(self.goal_location[1] - previous_location[1])

                # Calculate the distance after the move
                new_distance = np.abs(self.goal_location[0] - new_location[0]) + \
                               np.abs(self.goal_location[1] - new_location[1])

                # If new_location is closer to the goal, reward = 0.9, if further, reward = -1.1
                reward = (previous_distance - new_distance) - 0.1
        else:
            # Slightly larger punishment for an invalid move
            reward = -3

        return reward, done

    def is_valid_location(self, location):
        # Check if the location is within the boundaries of the grid
        if (0 <= location[0] < self.grid_size) and (0 <= location[1] < self.grid_size):
            return True
        else:
            return False

    def get_state(self):
        # Flatten the grid from 2D to 1D
        state = self.grid.flatten()
        return state

    def render(self):
        # Convert to a list of ints to improve formatting
        grid = self.grid.astype(int).tolist()
        for row in grid:
            print(row)
        print('')  # Add some space between renders for each step

    def step(self, action):
        # Apply the action to the environment, record the observations
        reward, done = self.move_agent(action)
        next_state = self.get_state()

        # Render the grid at each step
        if self.render_on:
            self.render()

        return reward, next_state, done

We have gone through a lot at this point. It may be helpful to return to the big picture at the beginning and re-evaluate how each part interacts using your new knowledge before moving on.

6. Learn From Experiences: Experience Replay

The agent's model and policy, along with the environment's reward structure and mechanism for taking steps, have all been completed, but we need some way to remember the past so that the agent can learn from it.

This can be done by saving the experiences.

Each experience consists of a few things:

  • State: The state before an action is taken.
  • Action: The action taken in this state.
  • Reward: The positive or negative feedback the agent received from the environment based on its action.
  • Next State: The state immediately following the action, allowing the agent to act based not only on the consequences of the current state, but many states in advance.
  • Done: Indicates the end of an episode, letting the agent know whether the task has been completed. It can be either true or false at each step.

These terms should not be new to you, but it never hurts to see them again!

Each experience is associated with exactly one step from the agent. This will provide all the context needed to train it.

The ExperienceReplay class
To keep track of and serve these experiences when needed, we'll define one last class, ExperienceReplay.

from collections import deque, namedtuple

class ExperienceReplay:
    def __init__(self, capacity, batch_size):
        # Memory stores the experiences in a deque, so if capacity is exceeded it removes
        # the oldest item efficiently
        self.memory = deque(maxlen=capacity)

        # Batch size specifies the number of experiences that will be sampled at once
        self.batch_size = batch_size

        # Experience is a namedtuple that stores the relevant information for training
        self.Experience = namedtuple('Experience', ['state', 'action', 'reward', 'next_state', 'done'])

This class will take capacity, an integer value that defines the maximum number of experiences we'll save at a time, and batch_size, an integer value that determines how many experiences we sample at a time for training.

Batching the experiences
If you remember, the neural network in the Agent class takes batches of input. While we only used a batch of size one to predict, this would be incredibly inefficient for training. Typically, batches of size 32 or larger are more common.

Batching the input for training does two things:

  • Increases efficiency, since it allows parallel processing of multiple data points, reducing computational overhead and making better use of GPU or CPU resources.
  • Helps the model learn more consistently, since it is learning from a variety of examples at once, which can make it better at handling new, unseen data.

Memory
The memory will be a deque (short for double-ended queue). This allows us to append new experiences, and once the max length defined by capacity is reached, the deque removes the oldest ones without having to shift each element as you would with a Python list. This can greatly improve speed when capacity is set to 10,000 or more.

Experience
Each experience will be defined as a namedtuple. Although many other data structures would work, this will improve readability as we extract each part as needed in training.
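
A tiny standalone demonstration of both choices (the values here are just placeholders):

from collections import deque, namedtuple

Experience = namedtuple('Experience', ['state', 'action', 'reward', 'next_state', 'done'])
memory = deque(maxlen=3)

# Once maxlen is reached, appending pushes the oldest experience out automatically
for i in range(5):
    memory.append(Experience(state=i, action=0, reward=-1, next_state=i + 1, done=False))

print(len(memory))        # 3
print(memory[0].state)    # 2 -- experiences 0 and 1 were dropped
print(memory[-1].reward)  # -1, accessed by field name rather than index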

add_experience and sample_batch implementation
Adding a new experience and sampling a batch are rather straightforward.

import random

def add_experience(self, state, action, reward, next_state, done):
    # Create a new experience and store it in memory
    experience = self.Experience(state, action, reward, next_state, done)
    self.memory.append(experience)

def sample_batch(self):
    # Batch will be a random sample of experiences from memory of size batch_size
    batch = random.sample(self.memory, self.batch_size)
    return batch

The add_experience method creates a namedtuple with each part of an experience, state, action, reward, next_state, and done, and appends it to memory.

sample_batch is just as simple. It gets and returns a random sample of size batch_size from memory.

Diagram: Experience Replay system storing individual ‘Experience’ units, each comprising state, action, reward, next state, and done status. A subset of these experiences is compiled into a ‘Batch’ that the Agent uses in its learning process to update its decision-making strategy.
Experience Replay storing experiences for the Agent to batch and learn from — Image by author

The last method needed: can_provide_sample
Finally, it would be useful to be able to check whether memory contains enough experiences to provide a full sample before attempting to get a batch for training.

def can_provide_sample(self):
    # Determines if the length of memory has reached batch_size
    return len(self.memory) >= self.batch_size

Completed ExperienceReplay class...

import random
from collections import deque, namedtuple

class ExperienceReplay:
    def __init__(self, capacity, batch_size):
        # Memory stores the experiences in a deque, so if capacity is exceeded it removes
        # the oldest item efficiently
        self.memory = deque(maxlen=capacity)

        # Batch size specifies the number of experiences that will be sampled at once
        self.batch_size = batch_size

        # Experience is a namedtuple that stores the relevant information for training
        self.Experience = namedtuple('Experience', ['state', 'action', 'reward', 'next_state', 'done'])

    def add_experience(self, state, action, reward, next_state, done):
        # Create a new experience and store it in memory
        experience = self.Experience(state, action, reward, next_state, done)
        self.memory.append(experience)

    def sample_batch(self):
        # Batch will be a random sample of experiences from memory of size batch_size
        batch = random.sample(self.memory, self.batch_size)
        return batch

    def can_provide_sample(self):
        # Determines if the length of memory has reached batch_size
        return len(self.memory) >= self.batch_size

With the mechanism for saving experiences and sampling from them in place, we can return to the Agent class to finally enable learning.

7. Define The Agent’s Learning Process: Fitting The NN

The goal when training the neural network is to get the Q-values it produces to accurately represent the future reward each choice will provide.

Essentially, we want the network to learn to predict how valuable each decision is, considering not only the immediate reward, but also the rewards it may lead to in the future.

Incorporating future rewards
To achieve this, we incorporate the Q-values of the next state into the training process.

When the agent takes an action and moves to a new state, we look at the Q-values in this new state to help inform the value of the previous action. In other words, potential future rewards influence the perceived value of current decisions.

The learn method

import numpy as np

def learn(self, experiences):
    states = np.array([experience.state for experience in experiences])
    actions = np.array([experience.action for experience in experiences])
    rewards = np.array([experience.reward for experience in experiences])
    next_states = np.array([experience.next_state for experience in experiences])
    dones = np.array([experience.done for experience in experiences])

    # Predict the Q-values (action values) for the given state batch
    current_q_values = self.model.predict(states, verbose=0)

    # Predict the Q-values for the next_state batch
    next_q_values = self.model.predict(next_states, verbose=0)
    ...

Using the provided batch, experiences, we extract each part using list comprehensions and the namedtuple fields we defined earlier in ExperienceReplay. Then we convert each one into a NumPy array to improve efficiency and to align with what the model expects, as explained previously.

Finally, we use the model to predict the Q-values of the current state the action was taken in, and of the state immediately following it.

Before continuing with the learn method, I need to explain something called the discount factor.

Discounting future rewards — the role of gamma
Intuitively, we know that immediate rewards are generally prioritized when all else is equal. (Would you rather have your paycheck today or next week?)

Representing this mathematically can seem much less intuitive. When considering the future, we don’t want it to be weighted as heavily as the present. How much we discount the future, or lower its effect on each decision, is defined by gamma (commonly denoted by the Greek letter γ).

Gamma can be adjusted, with higher values encouraging planning and lower values encouraging more short-sighted behavior. We’ll use a default value of 0.99.

The discount factor will almost always be between 0 and 1. A discount factor greater than 1, prioritizing the future over the present, would introduce unstable behavior and has little to no practical application.
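
To see what this means numerically, here is a quick illustration using a reward of 100 (the same goal reward we use later in the examples) viewed from different distances:

# Illustrative: how gamma shrinks a reward of 100 seen from further away
gamma = 0.99
goal_reward = 100

print(goal_reward * gamma**1)    # 99.0
print(goal_reward * gamma**5)    # ~95.1
print(goal_reward * gamma**20)   # ~81.8

# With a lower gamma of 0.9, the 20-step value drops to about 12.2,
# which is why lower values encourage more short-sighted behavior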

Implementing gamma and defining the goal Q-values
Recall that in the context of training a neural network, the process hinges on two key elements: the input data we provide and the corresponding outputs we want the network to learn to predict.

We’ll need to provide the network with target Q-values that are updated based on the reward given by the environment for this specific state and action, plus the discounted (by gamma) predicted reward of the best action at the next state.

I know that’s a lot to absorb, but it is best explained through implementation and example.

import numpy as np
...

class Agent:
    def __init__(self, grid_size, epsilon=1, epsilon_decay=0.995, epsilon_end=0.01, gamma=0.99):
        ...
        self.gamma = gamma
        ...
    ...

    def learn(self, experiences):
        ...

        # Initialize the target Q-values as the current Q-values
        target_q_values = current_q_values.copy()

        # Loop through each experience in the batch
        for i in range(len(experiences)):
            if dones[i]:
                # If the episode is done, there is no next Q-value
                # [i, actions[i]] is the NumPy equivalent of [i][actions[i]]
                target_q_values[i, actions[i]] = rewards[i]
            else:
                # The updated Q-value is the reward plus the discounted max Q-value for the next state
                # [i, actions[i]] is the NumPy equivalent of [i][actions[i]]
                target_q_values[i, actions[i]] = rewards[i] + self.gamma * np.max(next_q_values[i])
        ...

We’ve defined the instance attribute, gamma, with a default value of 0.99.

Then, after getting the predictions for state and next_state that we implemented above, we initialize target_q_values to the current Q-values. These will be updated in the following loop.

Updating target_q_values
We loop through each experience in the batch with two cases for updating the values:

  • If the episode is done, the target_q_value for that action is simply the reward given, because there is no relevant next_q_value.
  • Otherwise, the episode isn’t done, and the target_q_value for that action becomes the reward given, plus the discounted max Q-value from next_q_values.

Update if done is true:

target_q_values[i, actions[i]] = rewards[i]

Update if done is false:

target_q_values[i, actions[i]] = rewards[i] + self.gamma * np.max(next_q_values[i])

The syntax here, target_q_values[i, actions[i]], can seem confusing, but it is simply the Q-value of the i-th experience for the action actions[i].

# i          -> index of the experience in the batch
# actions[i] -> index of the action chosen
# rewards[i] -> reward from the environment
target_q_values[i, actions[i]] = rewards[i]

This is NumPy’s equivalent of [i][actions[i]] with nested Python lists. Remember each action is an index (0 to 3).
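
If the indexing still feels abstract, this tiny demonstration (with illustrative values) shows that both forms point at the same element:

import numpy as np

target_q_values = np.array([[2.0, 5.0, -2.0, -3.0],
                            [1.0, 3.0, 4.0, -1.0]])
actions = [1, 2]

i = 0
target_q_values[i, actions[i]] = 10.0   # same element as target_q_values[i][actions[i]]
print(target_q_values[0])               # [ 2. 10. -2. -3.]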

How target_q_values is updated
Just to illustrate this more clearly, I’ll show how target_q_values comes to more closely align with the actual rewards given as we train. Keep in mind that we are working with a batch. This will be a batch of three with example values for simplicity.

Also, make sure you understand that the entries in experiences are independent. This is not a sequence of steps, but a random sample from a collection of individual experiences.

Pretend the values of actions, rewards, dones, current_q_values, and next_q_values are as follows.

gamma = 0.99
actions = [1, 2, 2] # (down, left, left)
rewards = [1, -1, 100] # Rewards given by the environment for the action
dones = [False, False, True] # Indicating whether the episode is complete

current_q_values = [
    [2, 5, -2, -3],  # In this state, action 1 (down) looks best so far
    [1, 3, 4, -1],   # Here, action 2 (left) is currently favored
    [-3, 2, 6, 1]    # Action 2 (left) has the highest Q-value in this state
]

next_q_values = [
    [1, 4, -1, -2],  # Future Q-values after taking each action from the first state
    [2, 2, 5, 0],    # Future Q-values from the second state
    [-2, 3, 7, 2]    # Future Q-values from the third state
]

We then copy current_q_values into target_q_values to be updated.

target_q_values = current_q_values.copy()

Then, for each experience in the batch, we can show the associated values.

This is not code, just an example of the values at each stage. If you get lost, refer back to the initial values to see where each one comes from.

Entry 1

i = 0 # This is the first entry in the batch (first loop)

# First entries of associated values
actions[i] = 1
rewards[i] = 1
dones[i] = False
target_q_values[i] = [2, 5, -2, -3]
next_q_values[i] = [1, 4, -1, -2]

Because dones[i] is false for this experience, we need to consider next_q_values and apply gamma (0.99).

target_q_values[i, actions[i]] = rewards[i] + 0.99 * max(next_q_values[i])

Why take the largest of next_q_values[i]? Because that would be the next action chosen, and we want its estimated reward (Q-value).

Then we update the i-th row of target_q_values, at the index corresponding to actions[i], to the reward for this state/action pair plus the discounted reward for the next state/action pair.

Here are the target values for this experience after being updated.

# Updated target_q_values[i], where i = 0 and actions[i] = 1
target_q_values[i] = [2, 4.96, -2, -3]

As you can see, for the current state, choosing 1 (down) is now even more desirable because the value is higher, and this behavior has been reinforced.

It may help to calculate these yourself to really make it clear.
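
For reference, the arithmetic for this entry works out as follows:

target_q_values[0, 1] = rewards[0] + 0.99 * max(next_q_values[0])
                      = 1 + 0.99 * 4
                      = 4.96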

Entry 2

i = 1 # This is the second entry in the batch

# Second entries of associated values
actions[i] = 2
rewards[i] = -1
dones[i] = False
target_q_values[i] = [1, 3, 4, -1]
next_q_values[i] = [2, 2, 5, 0]

dones[i] is also false here, so we do need to consider next_q_values.

target_q_values[i, actions[i]] = rewards[i] + 0.99 * max(next_q_values[i])

Again, we update the i-th experience’s target_q_values at the index actions[i].

# Updated target_q_values[i], where i = 1 and actions[i] = 2
target_q_values[i] = [1, 3, 3.95, -1]

Choosing 2 (left) is now less desirable because the Q-value is lower, and this behavior is discouraged.

Entry 3

Finally, the last entry in the batch.

i = 2 # This is the third and final entry in the batch

# Third entries of associated values
actions[i] = 2
rewards[i] = 100
dones[i] = True
target_q_values[i] = [-3, 2, 6, 1]
next_q_values[i] = [-2, 3, 7, 2]

dones[i] for this entry is true, indicating that the episode is complete and no further actions will be taken. This means we don’t consider next_q_values in our update.

target_q_values[i, actions[i]] = rewards[i]

Notice that we simply set target_q_values[i, actions[i]] to the value of rewards[i], because no more actions will be taken; there is no future to consider.

# Updated target_q_values[i], where i = 2 and actions[i] = 2
target_q_values[i] = [-3, 2, 100, 1]

Choosing 2 (left) in this and similar states will now be much more desirable.

This is the state where the goal was to the left of the agent, so when that action was chosen the full reward was given.

Although it can seem a little confusing, the idea is simply to produce updated Q-values that accurately represent the rewards given by the environment, and to provide those to the neural network. That is what the NN is supposed to approximate.

Try to picture it in reverse. Since the reward for reaching the goal is substantial, it creates a propagation effect throughout the states leading to the one where the agent achieves the goal. This is the power of gamma: by considering the next state, it lets reward values ripple backward through the state-space.

Diagram: ‘Rippling Effect’ of Rewards across the State-Space in a Q-learning environment. The central square, representing the highest reward, is surrounded by other squares with progressively decreasing values, illustrating how the reward’s impact diminishes over distance due to the discount factor. Arrows point from high-value squares to adjacent lower-value squares, visually demonstrating the concept of reward propagation through the state-space.
Rippling effect of rewards across the state-space — Image by creator

Above is a simplified version of the Q-values and the effect of the discount factor, only considering the reward for the goal, not the incremental rewards or penalties.

Pick any cell in the grid and repeatedly move to the highest-valued adjacent cell. You will notice that this always provides an optimal path to the goal.

This effect isn’t immediate. It requires the agent to explore the state and action-space to progressively learn and adjust its strategy, building an understanding of how different actions lead to different rewards over time.

If the reward structure is carefully crafted, it will slowly guide our agent toward taking more advantageous actions.
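
To put rough numbers on the ripple, here is a minimal sketch that only tracks how the goal reward shrinks by one factor of gamma per step away from the goal (it ignores the incremental rewards and assumes the agent always acts optimally):

gamma = 0.99
goal_reward = 100

value = goal_reward
for steps_from_goal in range(1, 6):
    value *= gamma
    print(f'{steps_from_goal} step(s) from goal: ~{value:.2f}')

# 1 step(s) from goal: ~99.00
# 2 step(s) from goal: ~98.01
# 3 step(s) from goal: ~97.03
# 4 step(s) from goal: ~96.06
# 5 step(s) from goal: ~95.10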

Fitting the neural network
For the learn method, the last thing to do is provide the agent’s neural network with states and their associated target_q_values. TensorFlow will then handle updating the weights to more closely predict these values on similar states.

...

def learn(self, experiences):
    states = np.array([experience.state for experience in experiences])
    actions = np.array([experience.action for experience in experiences])
    rewards = np.array([experience.reward for experience in experiences])
    next_states = np.array([experience.next_state for experience in experiences])
    dones = np.array([experience.done for experience in experiences])

    # Predict the Q-values (action values) for the given state batch
    current_q_values = self.model.predict(states, verbose=0)

    # Predict the Q-values for the next_state batch
    next_q_values = self.model.predict(next_states, verbose=0)

    # Initialize the target Q-values as the current Q-values
    target_q_values = current_q_values.copy()

    # Loop through each experience in the batch
    for i in range(len(experiences)):
        if dones[i]:
            # If the episode is done, there is no next Q-value
            target_q_values[i, actions[i]] = rewards[i]
        else:
            # The updated Q-value is the reward plus the discounted max Q-value for the next state
            # [i, actions[i]] is the NumPy equivalent of [i][actions[i]]
            target_q_values[i, actions[i]] = rewards[i] + self.gamma * np.max(next_q_values[i])

    # Train the model
    self.model.fit(states, target_q_values, epochs=1, verbose=0)

The only new part is self.model.fit(states, target_q_values, epochs=1, verbose=0). fit takes two main arguments: the input data and the target values we want. In this case, our input is a batch of states and the target values are the updated Q-values for each state.

epochs=1 simply sets the number of times you want the network to try to fit to the data. One is enough because we want it to generalize well, not to overfit to this specific batch. verbose=0 tells TensorFlow not to print debug messages like progress bars.

The Agent class is now equipped with the ability to learn from experiences, but it needs two more simple methods: save and load.

Saving and loading trained models
Saving and loading the model prevents us from having to completely retrain each time we need it. We can use the simple TensorFlow methods that take only one argument, file_path.

from tensorflow.keras.models import load_model

def load(self, file_path):
    self.model = load_model(file_path)

def save(self, file_path):
    self.model.save(file_path)

Make a directory called models, or whatever you like, and then you can save your trained model at set intervals. These files end in .h5. Whenever you want to save your model, simply call agent.save('models/model_name.h5'). The same goes for when you want to load one.
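
For example, once the training loop from the next section is in place, you could save a checkpoint every so often rather than only at the end. A sketch (the interval of 100 episodes is arbitrary):

# Inside the episode loop of the next section, after the step loop finishes
if episode % 100 == 0:
    agent.save(f'models/model_{grid_size}.h5')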

Full Agent class

from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential, load_model
import numpy as np

class Agent:
    def __init__(self, grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01, gamma=0.99):
        self.grid_size = grid_size
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_end = epsilon_end
        self.gamma = gamma
        # Build the neural network the agent uses to estimate Q-values
        self.model = self.build_model()

    def build_model(self):
        # Create a sequential model with 3 layers
        model = Sequential([
            # Input layer expects a flattened grid, hence the input shape is grid_size squared
            Dense(128, activation='relu', input_shape=(self.grid_size**2,)),
            Dense(64, activation='relu'),
            # Output layer with 4 units for the possible actions (up, down, left, right)
            Dense(4, activation='linear')
        ])

        model.compile(optimizer='adam', loss='mse')

        return model

    def get_action(self, state):

        # rand() returns a random value between 0 and 1
        if np.random.rand() <= self.epsilon:
            # Exploration: random action
            action = np.random.randint(0, 4)
        else:
            # Add an extra dimension to the state to create a batch with one instance
            state = np.expand_dims(state, axis=0)

            # Use the model to predict the Q-values (action values) for the given state
            q_values = self.model.predict(state, verbose=0)

            # Select and return the action with the highest Q-value
            action = np.argmax(q_values[0])  # Take the action from the first (and only) entry

        # Decay the epsilon value to reduce exploration over time
        if self.epsilon > self.epsilon_end:
            self.epsilon *= self.epsilon_decay

        return action

    def learn(self, experiences):
        states = np.array([experience.state for experience in experiences])
        actions = np.array([experience.action for experience in experiences])
        rewards = np.array([experience.reward for experience in experiences])
        next_states = np.array([experience.next_state for experience in experiences])
        dones = np.array([experience.done for experience in experiences])

        # Predict the Q-values (action values) for the given state batch
        current_q_values = self.model.predict(states, verbose=0)

        # Predict the Q-values for the next_state batch
        next_q_values = self.model.predict(next_states, verbose=0)

        # Initialize the target Q-values as the current Q-values
        target_q_values = current_q_values.copy()

        # Loop through each experience in the batch
        for i in range(len(experiences)):
            if dones[i]:
                # If the episode is done, there is no next Q-value
                target_q_values[i, actions[i]] = rewards[i]
            else:
                # The updated Q-value is the reward plus the discounted max Q-value for the next state
                # [i, actions[i]] is the NumPy equivalent of [i][actions[i]]
                target_q_values[i, actions[i]] = rewards[i] + self.gamma * np.max(next_q_values[i])

        # Train the model
        self.model.fit(states, target_q_values, epochs=1, verbose=0)

    def load(self, file_path):
        self.model = load_model(file_path)

    def save(self, file_path):
        self.model.save(file_path)

Each class of your deep reinforcement learning gym is now complete! You have successfully coded Agent, Environment, and ExperienceReplay. The only thing left is the main training loop.

8. Executing The Training Loop: Putting It All Together

We’re at the final stretch of the project! Each piece we have coded, Agent, Environment, and ExperienceReplay, needs some way to interact.

This will be the main program where each episode is run and where we define our hyper-parameters like epsilon.

Although it is fairly simple, I’ll break down each part as we code it to make it clearer.

Initialize each part
First, we set grid_size and use the classes we have made to initialize each instance.

from environment import Environment
from agent import Agent
from experience_replay import ExperienceReplay

if __name__ == '__main__':

    grid_size = 5

    environment = Environment(grid_size=grid_size, render_on=True)
    agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)
    experience_replay = ExperienceReplay(capability=10000, batch_size=32)
    ...

Now we have each piece we need for the main training loop.

Episode and step cap
Next, we’ll define the number of episodes we want the training to run, and the max number of steps allowed in each episode.

Capping the number of steps helps ensure our agent doesn’t get stuck in a loop and encourages shorter paths. We will be fairly generous and, for a 5×5 grid, set the max to 200. This will need to be increased for larger environments.

from environment import Environment
from agent import Agent
from experience_replay import ExperienceReplay

if __name__ == '__main__':

    grid_size = 5

    environment = Environment(grid_size=grid_size, render_on=True)
    agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)
    experience_replay = ExperienceReplay(capability=10000, batch_size=32)

    # Number of episodes to run before training stops
    episodes = 5000
    # Max number of steps in each episode
    max_steps = 200
    ...

Episode loop
In each episode we reset the environment and save the initial state. Then we perform steps until either done is true or max_steps is reached. Finally, we save the model. The logic for each step has not been implemented quite yet.

from environment import Environment
from agent import Agent
from experience_replay import ExperienceReplay

if __name__ == '__main__':

    grid_size = 5

    environment = Environment(grid_size=grid_size, render_on=True)
    agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)
    experience_replay = ExperienceReplay(capability=10000, batch_size=32)

    # Number of episodes to run before training stops
    episodes = 5000
    # Max number of steps in each episode
    max_steps = 200

    for episode in range(episodes):
        # Get the initial state of the environment and set done to False
        state = environment.reset()

        # Loop until the episode finishes
        for step in range(max_steps):
            # Logic for each step
            ...
            if done:
                break

    agent.save(f'models/model_{grid_size}.h5')

Notice we name the model using grid_size because the NN architecture will be different for each input size. Attempting to load a 5×5 model into a 10×10 architecture will throw an error.

Step logic
Finally, inside the step loop we lay out the interaction between each piece as discussed before.

from environment import Environment
from agent import Agent
from experience_replay import ExperienceReplay

if __name__ == '__main__':

    grid_size = 5

    environment = Environment(grid_size=grid_size, render_on=True)
    agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)
    experience_replay = ExperienceReplay(capability=10000, batch_size=32)

    # Number of episodes to run before training stops
    episodes = 5000
    # Max number of steps in each episode
    max_steps = 200

    for episode in range(episodes):
        # Get the initial state of the environment and set done to False
        state = environment.reset()

        # Loop until the episode finishes
        for step in range(max_steps):
            print('Episode:', episode)
            print('Step:', step)
            print('Epsilon:', agent.epsilon)

            # Get the action choice from the agent's policy
            action = agent.get_action(state)

            # Take a step in the environment and save the experience
            reward, next_state, done = environment.step(action)
            experience_replay.add_experience(state, action, reward, next_state, done)

            # If the experience replay has enough memory to provide a sample, train the agent
            if experience_replay.can_provide_sample():
                experiences = experience_replay.sample_batch()
                agent.learn(experiences)

            # Set the state to the next_state
            state = next_state

            if done:
                break

    agent.save(f'models/model_{grid_size}.h5')

For each step of the episode, we start by printing the episode and step number to give us some information about where we are in training. Additionally, printing epsilon shows what fraction of the agent’s actions are random. It also helps because, if you want to stop for any reason, you can restart the agent at the same epsilon value.

After printing this information, we use the agent’s policy to get action from this state, take a step in environment, and record the returned values.

Then we save state, action, reward, next_state, and done as an experience. If experience_replay has enough memory, we train agent on a random batch of experiences.

Finally, we set state to next_state and check if the episode is done.

Once you’ve run at least one episode, you’ll have a saved model you can load to either continue where you left off or evaluate the performance.

After you initialize agent, simply use its load method, similar to how we saved: agent.load(f'models/model_{grid_size}.h5')

You can also add a slight delay at each step when evaluating the model using time.sleep(0.5). This causes each step to pause for half a second. Make sure you include import time.

Completed training loop

from environment import Environment
from agent import Agent
from experience_replay import ExperienceReplay
import time

if __name__ == '__main__':

    grid_size = 5

    environment = Environment(grid_size=grid_size, render_on=True)
    agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)
    # agent.load(f'models/model_{grid_size}.h5')

    experience_replay = ExperienceReplay(capability=10000, batch_size=32)

    # Number of episodes to run before training stops
    episodes = 5000
    # Max number of steps in each episode
    max_steps = 200

    for episode in range(episodes):

        # Get the initial state of the environment and set done to False
        state = environment.reset()

        # Loop until the episode finishes
        for step in range(max_steps):
            print('Episode:', episode)
            print('Step:', step)
            print('Epsilon:', agent.epsilon)

            # Get the action choice from the agent's policy
            action = agent.get_action(state)

            # Take a step in the environment and save the experience
            reward, next_state, done = environment.step(action)
            experience_replay.add_experience(state, action, reward, next_state, done)

            # If the experience replay has enough memory to provide a sample, train the agent
            if experience_replay.can_provide_sample():
                experiences = experience_replay.sample_batch()
                agent.learn(experiences)

            # Set the state to the next_state
            state = next_state

            if done:
                break

            # Optionally, pause for half a second to evaluate the model
            # time.sleep(0.5)

    agent.save(f'models/model_{grid_size}.h5')

Whenever you need time.sleep or agent.load, simply uncomment them.

Running the program
Give it a run! You should be able to successfully train the agent to complete the goal in up to roughly an 8×8 grid environment. For grid sizes much larger than this, training begins to struggle.

Try to see how large you can make the environment. You can do a few things, such as adding layers and neurons to the neural network, changing epsilon_decay, or giving it more time to train. Doing this will solidify your understanding of each part.

For instance, you may notice epsilon reaches epsilon_end rather quickly. Don’t be afraid to change epsilon_decay to values like 0.9998 or 0.99998 if you would like.

As the grid size grows, the state vector the network is fed grows quadratically (grid_size² values), and the number of possible states grows even faster.

I’ve included a short bonus section at the end to fix this and to demonstrate that there are many ways you can represent the environment for the agent.

9. Wrapping It Up

Congratulations on completing this comprehensive journey through the world of Reinforcement and Deep Q-Learning!

Although there is always more to cover, you can walk away having acquired essential insights and skills.

In this guide you:

  • Were introduced to the core concepts of reinforcement learning and why it’s an important area in AI.
  • Built a simple environment, laying the groundwork for agent interaction and learning.
  • Defined the agent’s neural network architecture for use with Deep Q-Learning, enabling your agent to make decisions in more complex environments than traditional Q-Learning allows.
  • Understood why exploration is vital before exploiting the learned strategy and implemented the Epsilon-Greedy policy.
  • Implemented the reward system to guide the agent to the goal and learned the differences between sparse and dense rewards.
  • Designed the experience replay mechanism, allowing the agent to learn from past experiences.
  • Gained hands-on experience in fitting the neural network, a critical process where the agent improves its performance based on feedback from the environment.
  • Put all these pieces together in a training loop, witnessed the agent’s learning process in action, and tweaked it for optimal performance.

By now, you should feel confident in your understanding of Reinforcement Learning and Deep Q-Learning. You’ve built a solid foundation, not only in theory but also in practical application, by constructing a DRL gym from scratch.

This knowledge equips you to tackle more complex RL problems and paves the way for further exploration in this exciting field of AI.

Gif: Grid displays multicolored circles playing a game inspired by Agar.io. Each circle is labeled with its respective size. You can see them collect small circles before eventually eating one another until a single circle is left as the winner.
Agar.io inspired game where agents are encouraged to eat each other to win — GIF by creator

Above is a grid game inspired by Agar.io where agents are encouraged to grow in size, often by eating each other. At each step the environment was plotted using the Python library Matplotlib. The boxes around the agents are their field of view. This is fed to them as their state from the environment as a flattened grid, similar to what we’ve done in our system.

Games like this, and a myriad of other applications, can be crafted with simple modifications to what you have made here.

Remember though, Deep Q-Learning is only suitable for a discrete action-space, one that has a finite number of distinct actions. For a continuous action-space, like in a physics-based environment, you will need to explore other methods in the world of DRL.

10. Bonus: Optimize State Representation

Believe it or not, the way we have been representing state is not optimal for this use case.

It is actually incredibly inefficient.

For a 100×100 grid there are 99,990,000 possible states (10,000 possible agent positions times 9,999 remaining goal positions). Not only would the model need to be quite large considering the size of the input (10,000 values), it would also require a large volume of training data. Depending on the computational resources available, this could take days or weeks.

Another downfall is flexibility. The model is currently stuck at one grid size. If you want to use a differently sized grid, you need to train another model completely from scratch.

We need a way to represent the state that significantly reduces the state-space and translates well to any grid size.

The better way
While there are several ways to do this, the best, and probably simplest, is to use the relative distance from the goal.

Rather than the state for a 5×5 grid looking like this:

[0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0]

It can be represented with only two values:

[-2, -1]

Using this method would lower the state-space of a 100×100 grid from 99,990,000 to 39,601! (Each relative distance ranges from -99 to 99, which is 199 possible values per axis, and 199 × 199 = 39,601.)

Not only that, but it generalizes much better. It simply has to learn that moving down is the right choice when the first value is negative, and moving right is appropriate when the second value is negative, with the opposite actions applying for positive values.

This allows the model to only explore a fraction of the state-space.

Gif: Labeled ‘Learning Progression Across Episodes’. Legend shows the color white as ‘Goal’, blue as ‘Up’, red as ‘Down’, green as ‘Left’ and yellow as ‘right’. The grid shows the agents choice at each cell if the ‘Goal’ is in the center. The agents choice slowly changes to optimal as the ‘Episode’ count at the bottom increases — eventually settling on an optimal strategy around episode 9.
25×25 heat-map of agent’s decisions at each cell with the goal in the middle—GIF by creator

Above is the progression of a model’s learning, trained on a 25×25 grid. It shows the agent’s choice color-coded at each cell, with the goal in the center.

At first, during the exploration stage, the agent’s strategy is completely off. You can see that it chooses to go up when it is above the goal, down when it is below, and so on.

But in under 10 episodes it learns a strategy that allows it to reach the goal in the shortest number of steps from any cell.

This also applies with the goal at any location.

Diagram: Labeled ‘Varied Goal Locations’. Legend shows the color white as ‘Goal’, blue as ‘Up’, red as ‘Down’, green as ‘Left’ and yellow as ‘right’. There are four grids showing the optimal choice for the agent at each cell with the goal at different locations.
4 25×25 heat-maps of the model applied to varied goal locations — Image by creator

And finally, it generalizes its learning incredibly well.

Diagram: Labeled ‘Model Strategy For 201x201 Grid’. Legend shows the color white as ‘Goal’, blue as ‘Up’, red as ‘Down’, green as ‘Left’ and yellow as ‘right’. The grid shows the agents optimal choice at each cell if the ‘Goal’ is in the center. Blue under the goal, green to the right, etc.
201×201 heat-map of the 25×25 model’s decisions, showing generalization — Image by creator

This model has only ever seen a 25×25 grid, yet it can use its strategy on a far larger environment, 201×201. With an environment this size there are 1,632,200,400 agent-goal permutations!

Let’s update our code with this radical improvement.

Implementation
There really isn’t much we need to do to get this working, thankfully.

The first thing is to update get_state in Environment.

def get_state(self):
    # Calculate row distance and column distance
    relative_distance = (self.agent_location[0] - self.goal_location[0],
                         self.agent_location[1] - self.goal_location[1])

    # Unpack tuple into numpy array
    state = np.array([*relative_distance])
    return state

Rather than a flattened version of the grid, we calculate the distance from the goal and return it as a NumPy array. The * operator simply unpacks the tuple into its individual components. It has the same effect as writing state = np.array([relative_distance[0], relative_distance[1]]).

Also, in move_agent we can update the penalty for hitting the boundary to be the same as the penalty for moving away from the goal. This is so that when you change the grid size, the agent isn’t discouraged from moving outside the area where it was originally trained.

def move_agent(self, action):
    ...
    else:
        # Same punishment for an invalid move
        reward = -1.1

    return reward, done

Updating the neural architecture
Currently our TensorFlow model looks like this. I’ve excluded everything else for simplicity.

class Agent:
    def __init__(self, grid_size, ...):
        self.grid_size = grid_size
        ...
        self.model = self.build_model()

    def build_model(self):
        # Create a sequential model with 3 layers
        model = Sequential([
            # Input layer expects a flattened grid, hence the input shape is grid_size squared
            Dense(128, activation='relu', input_shape=(self.grid_size**2,)),
            Dense(64, activation='relu'),
            # Output layer with 4 units for the possible actions (up, down, left, right)
            Dense(4, activation='linear')
        ])

        model.compile(optimizer='adam', loss='mse')

        return model
    ...

If you remember, our model architecture needs a consistent input. In this case, the input size relied on grid_size.

With our updated state representation, each state will only have two values, no matter what grid_size is. We can update the model to expect this. Also, we can remove self.grid_size altogether, since the Agent class no longer relies on it.

class Agent:
    def __init__(self, ...):
        ...
        self.model = self.build_model()

    def build_model(self):
        # Create a sequential model with 3 layers
        model = Sequential([
            # Input layer now expects the two relative-distance values
            Dense(64, activation='relu', input_shape=(2,)),
            Dense(32, activation='relu'),
            # Output layer with 4 units for the possible actions (up, down, left, right)
            Dense(4, activation='linear')
        ])

        model.compile(optimizer='adam', loss='mse')

        return model
    ...

The input_shape parameter expects a tuple representing the shape of the input.

(2,) specifies a one-dimensional array with two values. Looking something like this:

[-2, 0]

While (2,1), for example, specifies a two-dimensional array with two rows and one column, looking something like this:

[[-2],
[0]]
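
If you ever want to double-check, NumPy will report the shape directly:

import numpy as np

print(np.array([-2, 0]).shape)      # (2,)   one-dimensional, two values
print(np.array([[-2], [0]]).shape)  # (2, 1) two rows, one column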

Finally, we’ve lowered the number of neurons in our hidden layers to 64 and 32 respectively. With this simple state representation it is still probably overkill, but it should run plenty fast.

When you start training, try to see how few neurons you need for the model to learn effectively. You can even try removing the second layer if you like.

Fixing the main training loop
The training loop requires only a few adjustments. Let’s update it to match our changes.

from environment import Environment
from agent import Agent
from experience_replay import ExperienceReplay
import time

if __name__ == '__main__':

    grid_size = 5

    environment = Environment(grid_size=grid_size, render_on=True)
    agent = Agent(epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)
    # agent.load(f'models/model.h5')

    experience_replay = ExperienceReplay(capability=10000, batch_size=32)

    # Number of episodes to run before training stops
    episodes = 5000
    # Max number of steps in each episode
    max_steps = 200

    for episode in range(episodes):

        # Get the initial state of the environment and set done to False
        state = environment.reset()

        # Loop until the episode finishes
        for step in range(max_steps):
            print('Episode:', episode)
            print('Step:', step)
            print('Epsilon:', agent.epsilon)

            # Get the action choice from the agent's policy
            action = agent.get_action(state)

            # Take a step in the environment and save the experience
            reward, next_state, done = environment.step(action)
            experience_replay.add_experience(state, action, reward, next_state, done)

            # If the experience replay has enough memory to provide a sample, train the agent
            if experience_replay.can_provide_sample():
                experiences = experience_replay.sample_batch()
                agent.learn(experiences)

            # Set the state to the next_state
            state = next_state

            if done:
                break

            # Optionally, pause for half a second to evaluate the model
            # time.sleep(0.5)

    agent.save(f'models/model.h5')

Because agent no longer needs grid_size, we remove it to prevent any errors.

We also no longer have to give the model different names for each grid_size, since one model now works on any size.

If you’re wondering about ExperienceReplay, it remains the same.

Please note that there is no one-size-fits-all state representation. In some cases it may make sense to provide the full grid like we did, or a subsection of it like I did with the multi-agent system in section 9. The goal is to find a balance between simplifying the state-space and providing adequate information for the agent to learn.

Hyper-parameters
Even a simple environment like ours requires adjusting the hyper-parameters. Keep in mind that these are the values we can change that affect training.

The ones we have discussed include:

  • epsilon, epsilon_decay, epsilon_end (exploration/exploitation)
  • gamma (discount factor)
  • number of neurons and layers
  • batch_size, capability (experience replay)
  • max_steps

There are many others, but there is just one more we’ll discuss that is critical for learning.

Learning rate
The Learning Rate (LR) is a hyper-parameter of the neural network model.

It essentially tells the neural network how much to adjust its weights (the values used to transform the input) each time it is fit to the data.

LR values typically range from 1 down to 0.0000001, with the most common being values like 0.01, 0.001, and 0.0001.

Diagram: Labeled ‘Learning Rate — Too Small’, displaying an arrow repeatedly bouncing down one side of a v shaped line with ‘Optimal Strategy’ labeled at the bottom.
Sub-optimal learning rate which will never converge on an optimal strategy — Image by creator

If the learning rate is too low, it may not update the Q-values quickly enough to learn an optimal strategy, a process known as convergence. If you notice that learning seems to stagnate, or doesn’t happen at all, this could be a sign that the learning rate isn’t high enough.

While these learning-rate diagrams are greatly simplified, they should get the basic idea across.

Diagram: Labeled ‘Learning Rate — Too Large’, displaying an arrow repeatedly bouncing higher and higher up a v shaped line with ‘Optimal Strategy’ labeled at the bottom.
Sub-optimal learning rate that causes the Q-values to continue to grow exponentially — Image by creator

On the other side, a learning rate that is too high can cause your values to “explode”, or become increasingly large. The adjustments the model makes are too great, causing it to diverge, or get worse over time.

What’s the perfect learning rate?
How long is a piece of string?

In many cases you simply have to use trial and error. A good way to determine whether your learning rate is the issue is to check the output of the model.

This is exactly the issue I was facing when training this model. After switching to the simplified state representation, it refused to learn. The agent would simply keep heading to the bottom right of the grid, even after I extensively tested each hyper-parameter.

It didn’t make sense to me, so I decided to look at the Q-values output by the model in the Agent’s get_action method.
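
If you want to do the same, a temporary print inside get_action is enough; this is only a debug sketch, so remove it once you are finished.

def get_action(self, state):
    ...
    else:
        state = np.expand_dims(state, axis=0)
        q_values = self.model.predict(state, verbose=0)

        # Temporary debug output to watch for exploding Q-values
        print(q_values)

        action = np.argmax(q_values[0])
    ...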

Step 10
[[ 0.29763165 0.28393078 -0.01633328 -0.45749056]]

Step 50
[[ 7.173178 6.3558702 -0.48632553 -3.1968129 ]]

Step 100
[[ 33.015953 32.89661 33.11674 -14.883122]]

Step 200
[[573.52844 590.95685 592.3647 531.27576]]

...

Step 5000
[[37862352. 34156752. 35527612. 37821140.]]

This is an example of exploding Q-values.

In TensorFlow, the optimizer we’re using to adjust the weights, Adam, has a default learning rate of 0.001. For this specific case it happened to be much too high.

Diagram: Labeled ‘Learning Rate — Balanced’, displaying an arrow repeatedly bouncing down a v shaped line with ‘Optimal Strategy’ labeled at the bottom.
Balanced learning rate, eventually converging to the Optimal Strategy — Image by creator

After testing various values, the sweet spot appears to be 0.00001.

Let’s implement this.

from tensorflow.keras.optimizers import Adam

def build_model(self):
    # Create a sequential model with 3 layers
    model = Sequential([
        # Input layer expects the two relative-distance values
        Dense(64, activation='relu', input_shape=(2,)),
        Dense(32, activation='relu'),
        # Output layer with 4 units for the possible actions (up, down, left, right)
        Dense(4, activation='linear')
    ])

    # Update the learning rate
    optimizer = Adam(learning_rate=0.00001)

    # Compile the model with the custom optimizer
    model.compile(optimizer=optimizer, loss='mse')

    return model

Feel free to adjust this and observe how the Q-values are affected. Also, be sure to import Adam.

Finally, you can once again begin training!

Heat-map code
Below is the code for plotting your own heat-map, as shown previously, if you are interested.

import matplotlib.pyplot as plt
import numpy as np
from tensorflow.keras.models import load_model

def generate_heatmap(episode, grid_size, model_path):
    # Load the model
    model = load_model(model_path)

    goal_location = (grid_size // 2, grid_size // 2)  # Center of the grid

    # Initialize an array to store the color values
    heatmap_data = np.zeros((grid_size, grid_size, 3))

    # Define colours for each action
    colours = {
        0: np.array([0, 0, 1]),  # Blue for up
        1: np.array([1, 0, 0]),  # Red for down
        2: np.array([0, 1, 0]),  # Green for left
        3: np.array([1, 1, 0])   # Yellow for right
    }

    # Calculate Q-values for each state and determine the colour
    for x in range(grid_size):
        for y in range(grid_size):
            relative_distance = (x - goal_location[0], y - goal_location[1])
            state = np.array([*relative_distance]).reshape(1, -1)
            q_values = model.predict(state)
            best_action = np.argmax(q_values)
            if (x, y) == goal_location:
                heatmap_data[x, y] = np.array([1, 1, 1])
            else:
                heatmap_data[x, y] = colours[best_action]

    # Plotting the heatmap
    plt.imshow(heatmap_data, interpolation='nearest')
    plt.xlabel(f'Episode: {episode}')
    plt.axis('off')
    plt.tight_layout(pad=0)
    plt.savefig(f'./figures/heatmap_{grid_size}_{episode}', bbox_inches='tight')

Simply import it into your training loop and run it however often you’d like.
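
For example, assuming you saved the function above in a file called heatmap.py, a call every tenth episode might look like the sketch below; the interval, grid size, and paths are arbitrary, and the figures/ directory must exist for savefig to work.

from heatmap import generate_heatmap

# Inside the episode loop, after the step loop finishes
if episode % 10 == 0:
    agent.save('models/model.h5')
    generate_heatmap(episode, grid_size=25, model_path='models/model.h5')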

Next steps
Once you have effectively trained your model and experimented with the hyper-parameters, I encourage you to truly make it your own.

Some ideas for expanding the system:

  • Add obstacles between the agent and goal
  • Create a more varied environment, possibly with randomly generated rooms and pathways
  • Implement a multi-agent cooperation/competition system — hide and seek
  • Create a Pong inspired game
  • Implement resource management, such as a hunger or energy system where the agent needs to collect food on the way to the goal

Here is an example that goes beyond our simple grid system:

Gif: A red square controlled by the agent moves between green rectangles as it plays a game inspired by Flappy Bird.
Flappy Bird inspired game where the agent must avoid the pipes to survive — GIF by creator

Using Pygame, a popular Python library for making 2D games, I constructed a Flappy Bird clone. Then I defined the interactions, constraints, and reward structure in our prebuilt Environment class.

I represented the state as the agent’s current velocity and location, the distance to the closest pipe, and the location of the opening.

For the Agent class I simply updated the input size to (4,), added more layers to the NN, and updated the network to output only two values: jump or don’t jump.

You can find and run this in the flappy_bird directory of the GitHub repo. Make sure to pip install pygame.

This shows that what you’ve built is applicable to a wide variety of environments. You can even have the agent explore a 3D environment or perform more abstract tasks like stock trading.

While expanding your system, don’t be afraid to get creative with your environment, state representation, and reward system. Like the agent, we learn best by exploration!

I hope constructing a DRL gym from scratch has opened your eyes to the beauty of AI and has inspired you to dive deeper.
