-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathagent.py
More file actions
277 lines (212 loc) · 9.88 KB
/
agent.py
File metadata and controls
277 lines (212 loc) · 9.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
import time
import numpy as np
import torch
import collections
class ReplayBuffer:
    """Storage backing prioritised experience replay.

    Keeps transitions in a bounded FIFO deque together with a parallel
    deque of per-transition priority weights.  The sampling probabilities
    are not maintained incrementally: they are recomputed from the weights
    immediately before each batch is drawn (see the owning agent's
    ``rebalance_p``).
    """

    def __init__(self, maxlen=5000):
        # Sampling probabilities; stays None until first recomputed.
        self.p = None
        # Priority weight per stored transition, kept parallel to `buffer`.
        self.w = collections.deque(maxlen=maxlen)
        # Bounded FIFO of transitions; oldest entries are evicted first.
        self.buffer = collections.deque(maxlen=maxlen)
class duelNet(torch.nn.Module):
    """Dueling Q-network (DeepMind's dueling DQN architecture).

    A shared hidden layer feeds two streams: a scalar state-value stream
    V(s) and a per-action advantage stream A(s, a), aggregated as

        Q(s, a) = V(s) + A(s, a) - mean_a A(s, a)

    so that V and A are identifiable.
    """

    def __init__(self, input_dimension, dim_1=500, n_actions=4):
        # n_actions generalizes the previously hard-coded 4 discrete
        # actions; the default preserves existing behaviour.
        super(duelNet, self).__init__()
        # Shared feature layer.
        self.layer_in = torch.nn.Linear(in_features=input_dimension, out_features=dim_1)
        # Value stream: one scalar per state.
        self.layer_v = torch.nn.Linear(in_features=dim_1, out_features=1)
        # Advantage stream: one value per discrete action.
        self.layer_a = torch.nn.Linear(in_features=dim_1, out_features=n_actions)
        self.relu = torch.nn.ReLU()

    def forward(self, inp):
        """Return Q-values of shape (batch, n_actions) for a (batch, input_dimension) input."""
        # Shared hidden representation.
        out = self.relu(self.layer_in(inp))
        # Value stream: (batch, 1).
        values = self.layer_v(out)
        # Advantage stream: (batch, n_actions).
        adv = self.layer_a(out)
        # Mean advantage with keepdim so it broadcasts cleanly.  This
        # replaces the original unsqueeze/mean/squeeze sequence, whose
        # bare squeeze() dropped the batch dimension for batch size 1 and
        # relied on broadcasting to repair the shape; outputs are identical.
        adv_mean = adv.mean(dim=1, keepdim=True)
        # Aggregating layer.
        return values + adv - adv_mean
class Agent:
    """Double-dueling DQN agent with prioritised experience replay.

    Trains with epsilon-greedy exploration; after one minute of wall-clock
    time it periodically runs short greedy "test" episodes, and permanently
    stops training (``self.stopped``) once a greedy episode reaches the goal.
    """

    def __init__(self, lr=0.005, duel=True):
        # Wall-clock start time, used to schedule greedy test episodes.
        self.s_time = time.time()
        # Current episode length in steps; shrinks as training progresses.
        self.episode_length = 500
        # Steps taken so far in the current episode.
        self.num_steps_taken = 0
        # Latest state and action, held until the environment reports the
        # resulting next state so the transition can be assembled.
        self.state = None
        self.action = None
        self.b = ReplayBuffer(maxlen=5000)
        # Online network and its periodically-synchronised target copy.
        # NOTE(review): the `duel` flag is accepted for interface
        # compatibility but a dueling network is always constructed.
        self.net = duelNet(2)
        self.target = duelNet(2)
        self.update_target()
        self.lr = lr
        # Epsilon for epsilon-greedy exploration.
        self.e = 1
        self.optimiser = torch.optim.Adam(self.net.parameters(), lr=self.lr)
        self.loss_fn = torch.nn.MSELoss()
        # True once a greedy test episode has reached the goal: all
        # training stops for good.
        self.stopped = False
        # True while running a greedy evaluation episode.
        self.test = False
        # Multiplicative decay applied to lr and epsilon per episode.
        self.decay = 0.97
        self.net.train()
        # Discrete action index -> continuous displacement (step size 0.02):
        # 0 right, 1 up, 2 left, 3 down.
        self.move_dict = {
            0: np.array([0.02, 0], dtype=np.float32),
            1: np.array([0, 0.02], dtype=np.float32),
            2: np.array([-0.02, 0], dtype=np.float32),
            3: np.array([0, -0.02], dtype=np.float32),
        }
        # No variables relating to past actions or states are stored,
        # other than the replay buffer.

    def update_target(self):
        """Copy the online network's weights into the target network."""
        self.target.load_state_dict(self.net.state_dict())

    def has_finished_episode(self):
        """Return True when the current episode has ended.

        On episode end this also resets the step counter, toggles
        greedy-test mode, and (while still training) decays the learning
        rate and epsilon, syncs the target network, and shortens the
        episode.  Test episodes are capped at 100 steps.
        """
        if self.num_steps_taken == self.episode_length or (
            self.test and self.num_steps_taken == 100
        ):
            # Reset the per-episode step counter.
            self.num_steps_taken = 0
            if self.test and not self.stopped:
                # Greedy test episode finished without reaching the goal:
                # go back to training.
                self.test = False
            elif (
                not self.test
                and (time.time() - self.s_time) > 60
                and self.episode_length % 10 == 0
            ):
                # After a minute, periodically switch into a greedy test
                # episode; skip the decay/target-sync below for it.
                self.test = True
                return True
            if not self.stopped:
                # Decay learning rate and epsilon, refresh the optimiser,
                # sync the target network, and shorten the episode.
                self.lr *= self.decay
                self.optimiser = torch.optim.Adam(self.net.parameters(), lr=self.lr)
                self.update_target()
                self.e *= self.decay
                self.episode_length = max(self.episode_length - 5, 100)
                if self.e < 0.1:
                    # Exploration has decayed too far without success:
                    # re-open exploration and raise the learning rate so
                    # the weights do not stagnate.
                    self.e = 0.45
                    self.decay = 0.98
                    self.episode_length = 525
                    self.lr = 0.002
                    self.optimiser = torch.optim.Adam(self.net.parameters(), lr=self.lr)
            return True
        else:
            return False

    def get_next_action(self, state):
        """Choose the next action for `state` and return it in continuous form.

        Greedy when testing or stopped; otherwise epsilon-greedy with an
        exploration probability that grows with steps into the episode.
        Records the state and discrete action for the upcoming transition.
        """
        if self.test or self.stopped:
            action = self.get_greedy_action(state, discrete=True)
        elif np.random.random() < (self.e * (1.003 ** self.num_steps_taken)):
            # Random exploration, biased against moving left (action 2).
            action = np.random.choice([0, 1, 2, 3], p=[0.3, 0.325, 0.05, 0.325])
        else:
            action = self.get_greedy_action(state, discrete=True)
        # Update the number of steps which the agent has taken.
        self.num_steps_taken += 1
        # Store the state and action; used later when storing the transition.
        self.state = state
        self.action = action
        return self._discrete_action_to_continuous(action)

    def _discrete_action_to_continuous(self, discrete_action):
        """Map a discrete action index to its continuous displacement vector."""
        return self.move_dict[discrete_action]

    def set_next_state_and_distance(self, next_state, distance_to_goal):
        """Record the transition (self.state, self.action -> next_state) and train.

        Handles early stopping: while testing, reaching the goal
        (distance < 0.03) permanently stops training.  Transitions from
        test episodes are stored but not trained on.
        """
        # Once early stopping has triggered there is nothing left to learn.
        if self.stopped:
            return
        if self.test:
            if distance_to_goal < 0.03:
                # Greedy policy reached the goal: stop all training.
                self.stopped = True
                self.e = 0
                print(
                    "successful policy! stopped training at",
                    np.round(time.time() - self.s_time, 3),
                )
                self.episode_length = 100
                return
        # Convert the distance to a reward, also rewarding rightward progress.
        reward = 0.5 * (1 - distance_to_goal) + 0.5 * next_state[0]
        # Penalise bumping into a wall: per-step travel is 0.02, so this
        # negates the max expected reward gain and prevents
        # over-generalisation when a wall suddenly blocks the right.
        # (Distance metric used in case of rounding errors.)
        if np.sum((self.state - next_state) ** 2) < 0.00005:
            reward -= 0.02
        # Store the transition with the current maximum priority weight
        # (or a 0.01 floor) so new samples are likely to be replayed.
        transition = (self.state, self.action, reward, next_state)
        self.b.buffer.append(np.array(transition, dtype=object))
        if len(self.b.w) != 0:
            self.b.w.append(np.max(self.b.w))
        else:
            self.b.w.append(0.01)
        # Don't train on test-episode experience.
        if self.test:
            return
        # Train on prioritised minibatches of 50 once enough data exists.
        if len(self.b.buffer) > 50:
            self.rebalance_p()
            minibatch_indices = np.random.choice(
                range(len(self.b.buffer)), 50, p=self.b.p, replace=False
            )
            train_batch = np.vstack([self.b.buffer[i] for i in minibatch_indices])
            self.train_batch(train_batch, minibatch_indices)

    def train_batch(self, batch, batch_indices, gamma=0.90):
        """Run one gradient step on `batch`; return the (clamped) loss value.

        Action selection for the bootstrap target uses the target network's
        argmax while the value comes from the online network; the target is
        computed under no_grad so gradients only flow through Q(s, a).
        """
        self.net.train()
        # Unpack the object-array batch into per-field tensors.
        state, action, reward, next_state = [
            torch.tensor(np.vstack(batch[:, i])).squeeze() for i in range(4)
        ]
        reward = reward.float()
        # Greedy next-state actions according to the target network.
        tn_actions = torch.argmax(self.target(next_state).detach(), dim=1)
        # FIX: the bootstrap target must not carry gradients; previously
        # q2 was differentiable, so the MSE also pushed the target toward
        # the prediction, corrupting the TD update.
        with torch.no_grad():
            q2 = self.net(next_state)
            q2 = torch.gather(q2, dim=1, index=tn_actions.unsqueeze(-1)).squeeze(-1)
            q2 = reward + gamma * q2
        # Q-values of the actions actually taken.
        q1 = self.net(state)
        q1 = torch.gather(q1, dim=1, index=action.unsqueeze(-1)).squeeze(-1)
        self.optimiser.zero_grad()
        # Clamp the loss between -1 and 1 for stability.
        # NOTE(review): clamping the (non-negative) MSE at 1 zeroes the
        # gradient whenever the loss exceeds 1; a Huber loss
        # (torch.nn.SmoothL1Loss) is the usual intent — confirm.
        loss = torch.clamp(self.loss_fn(q2, q1), -1, 1)
        loss.backward()
        self.optimiser.step()
        # Update priority weights (|TD error| + floor) for the sampled
        # indices: round-trip the deque through a numpy array for fancy
        # indexing, then back.
        new_w = np.abs((q2 - q1).detach().numpy()) + 0.01
        self.b.w = np.array(self.b.w)
        self.b.w[batch_indices] = new_w
        self.b.w = collections.deque(self.b.w, maxlen=5000)
        return loss.item()

    def get_greedy_action(self, state, discrete=False):
        """Return the argmax-Q action for `state`.

        Returns the discrete index when `discrete` is True, otherwise the
        continuous displacement vector.
        """
        self.net.eval()
        q_values = self.net(torch.tensor(state).float().unsqueeze(0)).detach().numpy()
        action = np.argmax(q_values)
        if discrete:
            return action
        else:
            return self._discrete_action_to_continuous(action)

    def rebalance_p(self, a=0.7):
        """Recompute sampling probabilities p_i proportional to w_i ** a."""
        w_power = np.array(self.b.w) ** a
        self.b.p = (w_power / np.sum(w_power)).tolist()