###### tags: `IAR`
# IAR - Homework 1 - Reinforcement Learning with Pacman
The aim of Pacman is to collect all the food in his environment (a maze). The game is won when Pacman has collected all the food.
He has to interact with the following objects: walls, ghosts, food and capsules.
Pacman and the ghosts cannot cross walls. In general, ghosts should be avoided, since the game ends (and is lost) when Pacman collides with one of them. The only exception is a collision within a limited amount of time after Pacman has eaten a capsule, during which Pacman is able to eat the ghosts.
## Videos
- classic_pacman_50_trainings : ApproximateQLearning with classic features, 50 training episodes
- mean_pacman_50_trainings : ApproximateQLearning with enhanced features, 50 training episodes
- deep_pacman : Pacman trained with deep learning, 500 training episodes
- QAgentPacman : Pacman with a Q-learning agent, 2000 training episodes
## Pseudo-codes
In this section, we will present pseudo-codes for each algorithm we implemented during the project.
### Question 1
In this case, we know the Markov Decision Process (MDP), so it is possible to compute the value function $V_k(s)$ exactly from the Q-values at each iteration.
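The pseudo-code and code below implement the standard batch value-iteration update (Bellman optimality backup):

$V_{k+1}(s) = \max_{a} Q_k(s,a) = \max_{a} \sum_{s'} P(s'|s,a) \cdot [R(s,a,s') + \gamma \cdot V_k(s')]$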
```
Pseudo-code :
For each iteration :
    For each state :
        For each possible action :
            Compute the Q-Value for the state-action tuple
        If there are Q-Values, then the new state value is the maximum of the Q-Values
    For each state :
        Update state values with the new values
```
```python=
for k in range(iterations):
    new_values = util.Counter()
    for state in mdp.getStates():
        q_values = util.Counter()
        # compute q_values for all actions of the state
        for action in mdp.getPossibleActions(state):
            q_values[action] = self.computeQValueFromValues(state, action)
        if len(q_values) != 0:
            # compute next state's value
            new_values[state] = max(q_values.values())
    for state in mdp.getStates():
        # update the value of the state
        self.values[state] = new_values[state]
```
```
Pseudo-code :
If it is a terminal state :
    Return None
Else :
    For each action in the action space :
        Compute the Q-Value
    Return the action associated with the maximum Q-Value
```
```computeActionFromValues(state)```
```python=
def computeActionFromValues(self, state):
    """
    The policy is the best action in the given state
    according to the values currently stored in self.values.
    You may break ties any way you see fit. Note that if
    there are no legal actions, which is the case at the
    terminal state, you should return None.
    """
    q_values = util.Counter()
    if self.mdp.isTerminal(state):
        return None
    for action in self.mdp.getPossibleActions(state):
        q_values[action] = self.computeQValueFromValues(state, action)
    return q_values.argMax()
```
$Q(s,a) = \sum_{s'} P(s'|s,a) \cdot [R(s,a,s') + \gamma \cdot V(s')]$
```
Pseudo-code :
Compute the Q-Value based on the formula above
```
```computeQValueFromValues(state, action)```
```python=
def computeQValueFromValues(self, state, action):
    """
    Compute the Q-value of action in state from the
    value function stored in self.values.
    """
    sub_q_values = util.Counter()
    for next_state, prob in self.mdp.getTransitionStatesAndProbs(state, action):
        sub_q_values[next_state] = prob * (self.mdp.getReward(state, action, next_state) +
                                           self.discount * self.values[next_state])
    return sub_q_values.totalCount()
```
### Question 2
The BridgeCrossing test passes with the following parameters in analysis.py:
```python=
def question2():
    answerDiscount = 0.9
    answerNoise = 0.01
    return answerDiscount, answerNoise
```
### Question 3
All the test cases pass with these parameters:
```python=
def question3a():
    answerDiscount = 0.1
    answerNoise = 0
    answerLivingReward = 0
    return answerDiscount, answerNoise, answerLivingReward
    # If not possible, return 'NOT POSSIBLE'

def question3b():
    answerDiscount = 0.2
    answerNoise = 0.2
    answerLivingReward = 0.1
    return answerDiscount, answerNoise, answerLivingReward
    # If not possible, return 'NOT POSSIBLE'

def question3c():
    answerDiscount = 0.9
    answerNoise = 0
    answerLivingReward = 0
    return answerDiscount, answerNoise, answerLivingReward
    # If not possible, return 'NOT POSSIBLE'

def question3d():
    answerDiscount = 0.9
    answerNoise = 0.2
    answerLivingReward = 0
    return answerDiscount, answerNoise, answerLivingReward
    # If not possible, return 'NOT POSSIBLE'

def question3e():
    answerDiscount = 0.9
    answerNoise = 0
    answerLivingReward = 1
    return answerDiscount, answerNoise, answerLivingReward
    # If not possible, return 'NOT POSSIBLE'
```
### Question 4
*Note: here we have no knowledge of the MDP. Instead, the Q-values are learned from experience and used to determine the best possible action.*
```
Pseudo-code :
We initialize all Q-Values as Q(s,a)=0.0
```
```__init__```
```python=
def __init__(self, **args):
    "You can initialize Q-values here..."
    ReinforcementAgent.__init__(self, **args)
    "*** YOUR CODE HERE ***"
    self.q_values = util.Counter()
```
$Q(s,a) = (1-\alpha) \cdot Q(s,a) + \alpha \cdot [R+\gamma \cdot \max_{a'} Q(s',a')]$ (1)
```
Pseudo-code :
Update the Q-Value based on the formula above
```
```update```
```python=
def update(self, state, action, nextState, reward):
    """
    The parent class calls this to observe a
    state = action => nextState and reward transition.
    You should do your Q-Value update here
    NOTE: You should never call this function,
    it will be called on your behalf
    """
    "*** YOUR CODE HERE ***"
    self.q_values[state, action] = (1 - self.alpha) * self.getQValue(state, action) + \
        self.alpha * (reward + self.discount * self.getValue(nextState))
```
```
Pseudo-code :
Get all possible legal actions
If no legal action is found:
    Return 0.0
Else:
    Compute the Q-Value for each action in the action space
    Return the maximum of these Q-Values
```
```computeValueFromQValues```
```python=
def computeValueFromQValues(self, state):
    """
    Returns max_action Q(state,action)
    where the max is over legal actions. Note that if
    there are no legal actions, which is the case at the
    terminal state, you should return a value of 0.0.
    """
    "*** YOUR CODE HERE ***"
    max_action_Q = util.Counter()
    legalActions = self.getLegalActions(state)
    if not legalActions:
        return 0.0
    for action in legalActions:
        max_action_Q[action] = self.getQValue(state, action)
    return max(max_action_Q.values())
```
```
Pseudo-code :
Return the Q-Value for given state and action
```
```getQValue```
```python=
def getQValue(self, state, action):
    """
    Returns Q(state,action)
    Should return 0.0 if we have never seen a state
    or the Q node value otherwise
    """
    "*** YOUR CODE HERE ***"
    # util.Counter returns 0 for unseen (state, action) pairs
    if self.q_values[state, action] == 0:
        return 0.0
    return self.q_values[state, action]
```
```
Pseudo-code :
Get all possible legal actions for the given state
If no legal action is possible:
    Return None
Else:
    Get the Q-Values for all actions in the action space
    If all Q-Values equal zero:
        Choose a random action
    Else:
        Choose the action with the highest Q-Value
```
```computeActionFromQValues```
```python=
def computeActionFromQValues(self, state):
    """
    Compute the best action to take in a state. Note that if there
    are no legal actions, which is the case at the terminal state,
    you should return None.
    """
    "*** YOUR CODE HERE ***"
    legalActions = self.getLegalActions(state)
    if not legalActions:
        return None
    actions_q_values = util.Counter()
    for action in legalActions:
        actions_q_values[action] = self.getQValue(state, action)
    # If all actions have a Q-value of 0, break the tie with a random choice
    all_zeros = True
    for action in legalActions:
        if actions_q_values[action] != 0:
            all_zeros = False
            break
    if all_zeros:
        return random.choice(legalActions)
    return actions_q_values.argMax()
```
### Question 5
```
Pseudo-code :
Action space = all possible legal actions for the given state
With probability ε:
    Choose a random action from the action space
Else:
    Action = the action maximizing Q(s,·) (the greedy policy)
Return the selected action
```
```getAction```
```python=
def getAction(self, state):
    """
    Compute the action to take in the current state. With
    probability self.epsilon, we should take a random action and
    take the best policy action otherwise. Note that if there are
    no legal actions, which is the case at the terminal state, you
    should choose None as the action.
    HINT: You might want to use util.flipCoin(prob)
    HINT: To pick randomly from a list, use random.choice(list)
    """
    # Pick Action
    legalActions = self.getLegalActions(state)
    action = None
    # no legal actions (terminal state): return None
    if not legalActions:
        return action
    if util.flipCoin(self.epsilon):
        action = random.choice(legalActions)
    else:
        action = self.getPolicy(state)
    return action
```
### Question 6
Passes without changing anything in the code: the tie-breaking mechanism when multiple Q-values are equal to 0 is already implemented.
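For reference, here is a minimal sketch of a generic tie-breaking strategy among the best Q-values (the helper name and the `q_values` / `legal_actions` arguments are hypothetical, for illustration only, not the project's exact code):

```python
import random

def best_action_with_random_tie_break(q_values, legal_actions):
    # q_values: dict mapping each legal action to its current Q-value
    best_value = max(q_values[a] for a in legal_actions)
    # keep every action whose Q-value reaches the maximum, then pick one at random
    best_actions = [a for a in legal_actions if q_values[a] == best_value]
    return random.choice(best_actions)
```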
### Question 7
This question also passes without any code changes.
### Question 8
$Q(s,a;w) = \sum\nolimits_{i=1}^nf_{i}(s,a) \cdot w_{i}$
```
Pseudocode:
sum = 0
foreach feature in features:
    sum += weight(feature) ⋅ feature
return sum
```
```getQValue```
```python=
def getQValue(self, state, action):
    """
    Should return Q(state,action) = w * featureVector
    where * is the dotProduct operator
    """
    "*** YOUR CODE HERE ***"
    q_value = util.Counter()
    features = self.featExtractor.getFeatures(state, action)
    for feature in features:
        q_value[feature] = self.weights[feature] * features[feature]
    return q_value.totalCount()
```
$w_{i} = w_{i} + \alpha\cdot(r+\gamma\cdot\max_{a'}Q(s',a')-Q(s,a))\cdot f_{i}(s,a)$
```
Pseudocode:
Update weights based on the formula above
```
```update```
```python=
def update(self, state, action, nextState, reward):
    """
    Should update your weights based on transition
    """
    "*** YOUR CODE HERE ***"
    difference = (reward + self.discount *
                  self.getValue(nextState)) - self.getQValue(state, action)
    features = self.featExtractor.getFeatures(state, action)
    for feature in features:
        self.weights[feature] = self.weights[feature] + \
            self.alpha * difference * features[feature]
```
## Features choice
### Existing features
Features capture important properties of the state. The following features already existed for the computation of the Q-value:
1. Whether a ghost is one step away: the number of ghosts that are one step away from Pacman's position after he takes the action
    - This serves to prioritise between food collection and self-preservation
2. How far away the closest food is: the distance between Pacman and the closest food after he takes the action
    - This feature is important to win the game since it motivates Pacman to take the shortest path to food
3. Whether food will be eaten
    - This feature is also really important to win the game since it encourages Pacman to eat food when it is next to him.

Depending on the different features and their weights, Pacman chooses his next action. Features 1 and 3 are essential for winning a game: they teach the agent to avoid dangerous ghosts and to collect rewards, as sketched below.
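As an illustration, here is a minimal sketch of how a feature extractor along these lines could compute such values. It is a simplified reconstruction under assumptions, not necessarily the project's exact `featureExtractors.py`: the feature names `eats-food` and `closest-food`, as well as the `closestFood`, `Actions.directionToVector` and `Actions.getLegalNeighbors` helpers, are assumed from the Pacman framework.

```python
def getFeatures(self, state, action):
    # information about the current layout
    food = state.getFood()
    walls = state.getWalls()
    ghosts = state.getGhostPositions()

    features = util.Counter()

    # position of Pacman after taking the action
    x, y = state.getPacmanPosition()
    dx, dy = Actions.directionToVector(action)
    next_x, next_y = int(x + dx), int(y + dy)

    # feature 1: number of ghosts one step away from the new position
    features["#-of-ghosts-1-step-away"] = sum(
        (next_x, next_y) in Actions.getLegalNeighbors(g, walls) for g in ghosts)

    # feature 3: will some food be eaten? (only rewarded if no ghost is nearby)
    if not features["#-of-ghosts-1-step-away"] and food[next_x][next_y]:
        features["eats-food"] = 1.0

    # feature 2: distance to the closest food, scaled to stay below 1
    dist = closestFood((next_x, next_y), food, walls)
    if dist is not None:
        features["closest-food"] = float(dist) / (walls.width * walls.height)

    features.divideAll(10.0)
    return features
```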
### New features
Seeing the trained Pacman avoid scared ghosts when he could eat them was very frustrating, so we set ourselves an objective: make Pacman chase ghosts while they are scared.
We first modified feature 1 so that Pacman does not avoid ghosts when they are scared. We created new functions to retrieve ghost positions based on their state.
```game.py```
```python=
def getScaredGhostPositions(self):
    return [g.getPosition() for g in self.getGhostStates() if g.scaredTimer > 0]

def getActiveGhostPositions(self):
    return [g.getPosition() for g in self.getGhostStates() if g.scaredTimer == 0]
```
Then we modified the feature ```#-of-ghosts-1-step-away```
```featuresExtractor.py```
```python=
# count the number of active (non-scared) ghosts 1-step away
active_ghosts = state.getActiveGhostPositions()
features["#-of-ghosts-1-step-away"] = sum(
    (next_x, next_y) in Actions.getLegalNeighbors(g, walls) for g in active_ghosts)
```
At this point, he was not chasing ghosts yet, but when a scared ghost was on his path he was able to eat it.
Based on the food features, we created two new features to help Pacman chase scared ghosts: ```eats-scared-ghost``` and ```closest-scared-ghost```.
```featuresExtractor.py```
```python=
scared_ghosts = state.getScaredGhostPositions()
if not features["#-of-ghosts-1-step-away"] and scared_ghosts:
    features["eats-scared-ghost"] = 10.0
distScaredGhosts = closestScaredGhost((next_x, next_y), scared_ghosts, walls)
if distScaredGhosts is not None:
    # make the distance a number less than one otherwise the update
    # will diverge wildly
    features["closest-scared-ghost"] = float(distScaredGhosts) / (walls.width * walls.height)

def closestScaredGhost(pos, ghosts, walls):
    fringe = [(pos[0], pos[1], 0)]
    expanded = set()
    while fringe:
        pos_x, pos_y, dist = fringe.pop(0)
        if (pos_x, pos_y) in expanded:
            continue
        expanded.add((pos_x, pos_y))
        # if we find a scared ghost at this location then exit
        for g in ghosts:
            ghost_x = int(g[0])
            ghost_y = int(g[1])
            if (ghost_x, ghost_y) == (pos_x, pos_y):
                return dist
        # otherwise spread out from the location to its neighbours
        nbrs = Actions.getLegalNeighbors((pos_x, pos_y), walls)
        for nbr_x, nbr_y in nbrs:
            fringe.append((nbr_x, nbr_y, dist + 1))
    # no scared ghost found
    return None
```
It was not easy to pull all these parameters together, but with these new features our Pacman is now able to chase scared ghosts and win games with many more points than the old one.
## Neural network architecture
For the deep-learning Pacman agent, we used a classic MLP network. The one we used comes from IAT-TP2, in the `networks.py` file.
```python
class MLP(nn.Module):
    def __init__(self, ny, nx, nf, na):
        """
        This constructor creates a Multi Layer Perceptron (MLP) neural network.
        The architecture must be chosen so that it captures the full complexity
        of the problem without becoming intractable (too many learnable parameters).
        :param na: The number of actions
        :type na: int
        """
        super(MLP, self).__init__()
        self.flatten = nn.Flatten()
        self.layers = nn.Sequential(
            nn.Linear(ny*nx*nf, 128),
            nn.ReLU(),
            nn.Linear(128, na)
        )

    def forward(self, x):
        """This function performs a forward pass through the network.
        :param x: The state
        :return: The vector of action values (one value per action)
        """
        x = self.flatten(x)
        qvalues = self.layers(x)
        return qvalues
```
To retrieve a state that we can feed to the network, we added several functions to the GameState class in pacman.py.
```python
def getStateMatrices(self):
    # Create the observation matrix as a combination of
    # wall, pacman, ghost, food and capsule matrices
    width, height = self.data.layout.width, self.data.layout.height
    observation = np.zeros((6, height, width))
    observation[0] = self.getWallMatrix()
    observation[1] = self.getPacmanMatrix()
    observation[2] = self.getGhostMatrix()
    observation[3] = self.getScaredGhostMatrix()
    observation[4] = self.getFoodMatrix()
    observation[5] = self.getCapsulesMatrix()
    return observation

# The following methods return the wall, pacman, ghost, food and capsule matrices
def getWallMatrix(self):
    """ Return matrix with wall coordinates set to 1 """
    width, height = self.data.layout.width, self.data.layout.height
    grid = self.data.layout.walls
    matrix = np.zeros((height, width), dtype=np.int8)
    for i in range(grid.height):
        for j in range(grid.width):
            # Put cell vertically reversed in matrix
            cell = 1 if grid[j][i] else 0
            matrix[-1-i][j] = cell
    return matrix

def getPacmanMatrix(self):
    """ Return matrix with pacman coordinates set to 1 """
    width, height = self.data.layout.width, self.data.layout.height
    matrix = np.zeros((height, width), dtype=np.int8)
    for agentState in self.data.agentStates:
        if agentState.isPacman:
            pos = agentState.configuration.getPosition()
            matrix[-1-int(pos[1])][int(pos[0])] = 1
    return matrix

def getGhostMatrix(self):
    """ Return matrix with active ghost coordinates set to 1 """
    width, height = self.data.layout.width, self.data.layout.height
    matrix = np.zeros((height, width), dtype=np.int8)
    for agentState in self.data.agentStates:
        if not agentState.isPacman:
            if not agentState.scaredTimer > 0:
                pos = agentState.configuration.getPosition()
                matrix[-1-int(pos[1])][int(pos[0])] = 1
    return matrix

def getScaredGhostMatrix(self):
    """ Return matrix with scared ghost coordinates set to 1 """
    width, height = self.data.layout.width, self.data.layout.height
    matrix = np.zeros((height, width), dtype=np.int8)
    for agentState in self.data.agentStates:
        if not agentState.isPacman:
            if agentState.scaredTimer > 0:
                pos = agentState.configuration.getPosition()
                matrix[-1-int(pos[1])][int(pos[0])] = 1
    return matrix

def getFoodMatrix(self):
    """ Return matrix with food coordinates set to 1 """
    width, height = self.data.layout.width, self.data.layout.height
    grid = self.data.food
    matrix = np.zeros((height, width), dtype=np.int8)
    for i in range(grid.height):
        for j in range(grid.width):
            # Put cell vertically reversed in matrix
            cell = 1 if grid[j][i] else 0
            matrix[-1-i][j] = cell
    return matrix

def getCapsulesMatrix(self):
    """ Return matrix with capsule coordinates set to 1 """
    width, height = self.data.layout.width, self.data.layout.height
    capsules = self.data.layout.capsules
    matrix = np.zeros((height, width), dtype=np.int8)
    for i in capsules:
        # Insert capsule cells vertically reversed into matrix
        matrix[-1-i[1], i[0]] = 1
    return matrix
```
This code returns a NumPy array of shape (6, y, x), where 6 is nf, the number of planes.
Having several planes separates each type of data (one plane each for walls, Pacman, ghosts, scared ghosts, food and capsules) while keeping the values normalized (0 if nothing, 1 if something).
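As an illustration, here is a minimal sketch of how such a state could be fed to the MLP defined above. The layout sizes `ny`, `nx` and the dummy state are placeholders; in the real agent the matrices would come from `gameState.getStateMatrices()`.

```python
import numpy as np
import torch

# hypothetical sizes for a small layout: nf = 6 planes, na = 5 Pacman actions
ny, nx, nf, na = 7, 7, 6, 5
net = MLP(ny, nx, nf, na)

# in the agent, this would be gameState.getStateMatrices(); here we use a dummy state
state_matrices = np.zeros((nf, ny, nx), dtype=np.float32)

# add a batch dimension: (1, nf, ny, nx) -> flatten -> (1, na) action values
x = torch.from_numpy(state_matrices).unsqueeze(0)
q_values = net(x)
print(q_values.shape)  # torch.Size([1, 5])
```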
Once the state is available, we can implement a DQNAgent using the LearningAgent structure of the project.
```
PseudoCode
def __init__:
    # initialize all variables
    nf
    ny
    nx
    na
    epsilon, epsilon_init, epsilon_last_episode, epsilon_max_episode
    n_episodes
    tau
    gamma
    alpha
    criterion
    optimizer
    replay_memory(size):
        DState
        DNextState
        DReward
        DTerminal
        DAction
    d   # counter (write position in the replay memory)
    ds  # counter of steps
    ...

def updateQ(state, action, next_state, reward):
    target position d in the memory arrays
    store state in DState
    store next_state in DNextState
    store reward in DReward
    store terminal boolean in DTerminal
    store action in DAction
    if ds > size of memory:  # we have enough samples
        optimizer.zero_grad()
        batch = get a batch of data
        compute the current value
        compute the target value
        loss = criterion(target, current)
        # backpropagation
        loss.backward()
        optimizer.step()

def select_action:
    if random(0, 1) < epsilon:
        pick a random action among the legal ones
    else:
        pick the best action according to the network values (= use the policy)

# final is called after the last action of each episode
def final():
    updateQ
    update epsilon  # decreasing linearly from epsilon_init to the final exploration epsilon
    if tau == 1:
        hard_update()
    else:
        soft_update()
    stopEpisode()
    call the Pacman parent class final function
```
The complete code is available in the `qLearningAgent.py` file.
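As an illustration, here is a minimal sketch of what the `hard_update`, `soft_update` and linear epsilon-decay steps of the pseudo-code could look like. The helper names and signatures are assumptions (two PyTorch networks, `policy_net` and `target_net`); this is not the project's exact code.

```python
import torch

def hard_update(target_net, policy_net):
    # copy all weights from the online (policy) network into the target network
    target_net.load_state_dict(policy_net.state_dict())

def soft_update(target_net, policy_net, tau):
    # Polyak averaging: target <- tau * policy + (1 - tau) * target
    for t_param, p_param in zip(target_net.parameters(), policy_net.parameters()):
        t_param.data.copy_(tau * p_param.data + (1.0 - tau) * t_param.data)

def linear_epsilon(episode, epsilon_init, epsilon_final, epsilon_last_episode):
    # decrease epsilon linearly from epsilon_init to epsilon_final,
    # then keep it constant after epsilon_last_episode
    fraction = min(float(episode) / epsilon_last_episode, 1.0)
    return epsilon_init + fraction * (epsilon_final - epsilon_init)
```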
Despite several tests, modifications of the network structure (adding hidden layers or changing their size) and tweaking of every parameter, the learning process did not converge to a satisfying decision behavior.
We did not end up with good results even with a large number of episodes (> 30000) on a very small layout such as `testClassic`.
We also did not find a clean solution to prevent the network from choosing illegal actions.
Since we were getting too many "Stop" actions, even after increasing the food and capsule rewards, we also tried to forbid the network from choosing this action. This experiment mostly resulted in Pacman running back and forth between two directions, which is behavior very similar to the "Stop" action...
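One possible way to handle illegal actions (which we did not try in this exact form) is to mask the network's output before taking the argmax. Here is a minimal sketch; the helper name `masked_best_action` and the fixed action ordering are assumptions.

```python
import torch

# fixed ordering of the na outputs of the network (assumed here)
ALL_ACTIONS = ["North", "South", "East", "West", "Stop"]

def masked_best_action(q_values, legal_actions):
    # q_values: tensor of shape (1, na) produced by the network
    # set the value of every illegal action to -inf so argmax never selects it
    mask = torch.full_like(q_values, float("-inf"))
    for i, action in enumerate(ALL_ACTIONS):
        if action in legal_actions:
            mask[0, i] = 0.0
    best_index = torch.argmax(q_values + mask, dim=1).item()
    return ALL_ACTIONS[best_index]
```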
## Comparison between Q-Learning with linear approximation and Deep Q-Learning
For a game with simple rules like this one, where features are easy to define, linear approximation seems to converge to a good behavior much more quickly. However, for more complex games with features that are hard to hand-craft, the neural network can determine them itself and thus becomes interesting.