-
Notifications
You must be signed in to change notification settings - Fork 0
/
continuous_agent.py
58 lines (48 loc) · 2.17 KB
/
continuous_agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import numpy as np
import torch
import abstract_agent
import continuous_dqns.dqn
import environments.abstract_environment
import helpers
class ContinuousAgent(abstract_agent.AbstractAgent):
    """Agent whose action is chosen as a continuous heading angle.

    The angle comes either from the wrapped DQN (greedy choice) or from a
    uniform draw over [-pi, pi) (exploration).  It is then converted to a
    two-component action ``stride * (cos(angle), sin(angle))``.

    ``self.state`` is read and updated here but is not initialised in this
    class -- presumably ``AbstractAgent`` sets it; confirm against the base
    class.
    """

    def __init__(self, environment: environments.abstract_environment.AbstractEnvironment,
                 dqn: continuous_dqns.dqn.ContinuousDQN, stride: float):
        """Store the DQN and stride, and compile the DQN's training step.

        Args:
            environment: Environment forwarded to ``AbstractAgent.__init__``.
            dqn: Network wrapper used to pick greedy angles.
            stride: Scale factor applied to the unit direction vector.
        """
        super().__init__(environment)
        self.dqn = dqn
        self.stride = stride
        # NOTE: this rebinds the attribute on the dqn object that was passed
        # in, so the caller's dqn is modified as a side effect.
        self.dqn.train_q_network = torch.compile(self.dqn.train_q_network)

    def sample_angle(self, epsilon: float) -> float:
        """Return an epsilon-greedy angle for the current state.

        Args:
            epsilon: Exploration probability.  With ``epsilon == 0`` the
                greedy angle is always returned, because the uniform draw
                is never below zero.

        Returns:
            The greedy angle, or a uniform sample from [-pi, pi).
        """
        if epsilon <= np.random.uniform():
            return self._get_greedy_angle(self.state)
        return np.random.uniform(-np.pi, np.pi)

    def step(self, epsilon: float = 0.) -> tuple[tuple, float]:
        """Take one epsilon-greedy step in the environment.

        Args:
            epsilon: Exploration probability passed to ``sample_angle``.

        Returns:
            ``(transition, distance_to_goal)`` where ``transition`` is
            ``(state, angle, reward, next_state)``.  The transition stores
            the angle, not the action vector derived from it.
        """
        angle = self.sample_angle(epsilon)
        action = self.angle_to_action(angle)
        next_state, distance_to_goal = self.environment.step(self.state, action)
        reward = self.compute_reward(distance_to_goal)
        transition = (self.state, angle, reward, next_state)
        self.state = next_state
        return transition, distance_to_goal

    def _get_greedy_angle(self, state: np.ndarray) -> float:
        """Return the DQN's greedy angle for a single state.

        Args:
            state: One state as a NumPy array; a leading batch dimension of
                size 1 is added before it is passed to the network.

        Returns:
            The greedy angle as a Python float.
        """
        # Pure inference: the result is converted to a Python float straight
        # away, so autograd tracking is wasted work.  This also matches how
        # get_batch_q_values calls the same network method.
        with torch.no_grad():
            batch = torch.from_numpy(state).unsqueeze(0).to(self.dqn.device)
            angle = self.dqn.get_greedy_continuous_angles(batch).cpu()
        return angle.squeeze(0).item()

    @staticmethod
    def compute_reward(distance_to_goal: np.ndarray) -> np.ndarray:
        """Return the reward, defined as the negated distance to the goal.

        Only unary negation is applied, so a scalar distance (as ``step``
        passes) works as well as an array.
        """
        return -distance_to_goal

    def get_greedy_action(self, state: np.ndarray) -> np.ndarray:
        """Return the action vector for the greedy angle in ``state``."""
        angle = self._get_greedy_angle(state)
        return self.angle_to_action(angle)

    def get_batch_q_values(self, states: torch.Tensor) -> torch.Tensor:
        """Evaluate the DQN on a batch of states without tracking gradients.

        NOTE(review): despite the method name, the value returned is the
        output of ``dqn.get_greedy_continuous_angles`` -- confirm whether
        angles or Q-values are intended here.

        Args:
            states: Batch of states; moved to the DQN's device before use.

        Returns:
            The network output, moved back to the CPU.
        """
        with torch.no_grad():
            states = states.to(self.dqn.device)
            q_values = self.dqn.get_greedy_continuous_angles(states).cpu()
        return q_values

    def angle_to_action(self, angle: float) -> np.ndarray:
        """Convert an angle into a two-component action scaled by the stride.

        Args:
            angle: Heading angle, passed to ``np.cos`` / ``np.sin``.

        Returns:
            ``stride * [cos(angle), sin(angle)]`` built as float32 and then
            passed through ``helpers.trunc(action, 7)``.
        """
        direction = np.array([np.cos(angle), np.sin(angle)], dtype=np.float32)
        action = self.stride * direction
        return helpers.trunc(action, 7)