feat(lac): add finite-horizon Lyapunov Candidate (#328)
This commit adds the sum of cost over a finite horizon as the Lyapunov Candidate. For more
information see [Han et al. 2020](https://arxiv.org/abs/2004.14288).
rickstaa authored Aug 12, 2023
1 parent a69a7f6 commit ed2c85d
Showing 13 changed files with 434 additions and 65 deletions.
9 changes: 7 additions & 2 deletions docs/source/usage/algorithms/lac.rst
@@ -117,9 +117,14 @@ Where :math:`L_{target}` is the approximation target received from the `infinite
and :math:`\mathcal{D}` the set of collected transition pairs.

.. note::
.. important::
As explained by `Han et al., 2020`_ the sum of cost over a finite time horizon can also be used as the
approximation target. This version still needs to be implemented in the SLC framework.
approximation target (see `Han et al., 2020`_, eq. (9)):

.. math::
L_{target}(s,a) = \sum_{t'=t}^{t+N} \mathbb{E}\left[c_{t'}\right]
To use this Lyapunov candidate, supply the LAC algorithm with the ``horizon_length=N`` argument, where ``N`` is the length of the time horizon you want to use.

.. seealso::
The SLC package also contains a LAC implementation using a double Q-Critic (i.e., :ref:`Lyapunov Twin Critic <latc>`).
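A minimal sketch of how the new option might be used when calling the algorithm directly (the ``lac`` entry point, its module path, the ``env_fn`` convention, and the environment id are assumptions for illustration; only ``horizon_length`` is introduced by this commit):

import gymnasium as gym

from stable_learning_control.algos.pytorch.lac import lac

# Train LAC with a finite-horizon Lyapunov candidate summed over N=5 steps.
# NOTE: module path, `env_fn` convention, and environment id are assumed here;
# `horizon_length` is the argument documented in the docs change above.
lac(
    env_fn=lambda: gym.make("stable_gym:CartPoleCost-v1"),
    horizon_length=5,
)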
50 changes: 50 additions & 0 deletions sandbox/test_finite_horizon_replay_buffer.py
@@ -0,0 +1,50 @@
"""Script used for performing some quick tests on the FiniteHorizonReplayBuffer class.
"""
import gymnasium as gym

# from stable_learning_control.common.buffers import TrajectoryBuffer
from stable_learning_control.algos.common.buffers import FiniteHorizonReplayBuffer

if __name__ == "__main__":
    env = gym.make("stable_gym:CartPoleCost-v1")

    # Dummy algorithm settings.
    obs_dim = env.observation_space.shape[0]
    act_dim = env.action_space.shape[0]
    buffer_size = int(200)
    episodes = 10
    local_steps_per_epoch = 100

    # Create Memory Buffer.
    buffer = FiniteHorizonReplayBuffer(
        obs_dim=obs_dim,
        act_dim=act_dim,
        size=buffer_size,
        horizon_length=2,
    )

    # Create test dummy data.
    o, _ = env.reset()
    ep_ret, ep_len = 0, 0
    for episode in range(1, episodes + 1):
        print(f"Episode {episode}:")
        d, truncated = False, False
        t = 0
        while not d and not truncated:
            # Retrieve data from the environment.
            a = env.action_space.sample()
            o_, r, d, truncated, _ = env.step(a)
            r = episode + t / 100

            # Store data in buffer.
            buffer.store(o, a, r, o_, d, truncated)

            # Update obs (critical!)
            o = o_
            t += 1

            # Finish path.
            if d or truncated:
                print("Environment terminated or truncated. Resetting.")
                o, _ = env.reset()
                ep_ret, ep_len, t = 0, 0, 0
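As an additional sanity check, a quick sampling step could be appended to this script to inspect the extra ``horizon_rew`` entry; a small standalone sketch (reusing the ``buffer`` from the script above, key names taken from ``FiniteHorizonReplayBuffer.sample_batch`` in this commit):

# Sample a batch and inspect the expected cumulative finite-horizon reward.
batch = buffer.sample_batch(batch_size=8)
print("Batch keys:", list(batch.keys()))  # obs, obs_next, act, rew, horizon_rew, done
print("Finite-horizon rewards:", batch["horizon_rew"])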
45 changes: 22 additions & 23 deletions sandbox/test_gym_env.py
@@ -7,44 +7,43 @@
import numpy as np

RANDOM_STEP = True
ENV_NAME = "stable_gym:Oscillator-v1"
# ENV_NAME = "stable_gym:Oscillator-v1"
# ENV_NAME = "stable_gym:Ex3EKF-v1"
# ENV_NAME = "stable_Gym:CartPoleCost-v0"
ENV_NAME = "stable_gym:CartPoleCost-v1"
# ENV_NAME = "PandaReach-v1"

if __name__ == "__main__":
env = gym.make(ENV_NAME)
env = gym.make(ENV_NAME, render_mode="human")

# Take T steps in the environment.
T = 1000
tau = 0.1
# Retrieve time step.
tau = env.dt if hasattr(env, "dt") else env.tau if hasattr(env, "tau") else 0.01

# Take one episode in the environment.
d, truncated, t = False, False, 0
path = []
t1 = []
s = env.reset()
print(f"Taking {T} steps in the Cartpole environment.")
for i in range(int(T / tau)):
action = (
time = []
o, _ = env.reset()
print(f"Performing 1 episode in the '{ENV_NAME}' environment.")
while not d and not truncated:
a = (
env.action_space.sample()
if RANDOM_STEP
else np.zeros(env.action_space.shape)
)
s, r, done, info = env.step(action)
try:
env.render()
except NotImplementedError:
pass
path.append(s)
t1.append(i * tau)
print("Finished Cartpole environment simulation.")
o, r, d, truncated, _ = env.step(a)
t += tau
path.append(o)
time.append(t)
print(f"Finished '{ENV_NAME}' environment simulation.")

# Plot results.
print("Plot results.")
fig = plt.figure(figsize=(9, 6))
ax = fig.add_subplot(111)
ax.plot(t1, np.array(path)[:, 0], color="orange", label="x")
ax.plot(t1, np.array(path)[:, 1], color="magenta", label="x_dot")
ax.plot(t1, np.array(path)[:, 2], color="sienna", label="theta")
ax.plot(t1, np.array(path)[:, 3], color="blue", label="theta_dot1")
ax.plot(time, np.array(path)[:, 0], color="orange", label="x")
ax.plot(time, np.array(path)[:, 1], color="magenta", label="x_dot")
ax.plot(time, np.array(path)[:, 2], color="sienna", label="theta")
ax.plot(time, np.array(path)[:, 3], color="blue", label="theta_dot1")

handles, labels = ax.get_legend_handles_labels()
ax.legend(handles, labels, loc=2, fancybox=False, shadow=False)
46 changes: 46 additions & 0 deletions sandbox/test_replay_buffer.py
@@ -0,0 +1,46 @@
"""Script used for performing some quick tests on the ReplayBuffer class.
"""
import gymnasium as gym

# from stable_learning_control.common.buffers import TrajectoryBuffer
from stable_learning_control.algos.pytorch.common.buffers import ReplayBuffer

if __name__ == "__main__":
    env = gym.make("stable_gym:CartPoleCost-v1")

    # Dummy algorithm settings.
    obs_dim = env.observation_space.shape[0]
    act_dim = env.action_space.shape[0]
    buffer_size = int(200)
    episodes = 10
    local_steps_per_epoch = 100

    # Create Memory Buffer.
    buffer = ReplayBuffer(
        obs_dim=obs_dim,
        act_dim=act_dim,
        size=buffer_size,
    )

    # Create test dummy data.
    o, _ = env.reset()
    ep_ret, ep_len = 0, 0
    for episode in range(1, episodes + 1):
        print(f"Episode {episode}:")
        d, truncated = False, False
        while not d and not truncated:
            # Retrieve data from the environment.
            a = env.action_space.sample()
            o_, r, d, truncated, _ = env.step(a)

            # Store data in buffer.
            buffer.store(o, a, r, o_, d)

            # Update obs (critical!)
            o = o_

            # Finish path.
            if d or truncated:
                print("Environment terminated or truncated. Resetting.")
                o, _ = env.reset()
                ep_ret, ep_len = 0, 0
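For comparison, the plain ``ReplayBuffer`` can be sampled the same way; its batch is expected to contain only the standard transition keys, without the ``horizon_rew`` entry added by the finite-horizon variant (a hedged sketch reusing the ``buffer`` from the script above; the exact key names are assumed to match the common buffer implementation):

# Sample a batch from the base replay buffer; no 'horizon_rew' key is expected here.
batch = buffer.sample_batch(batch_size=8)
print("Batch keys:", list(batch.keys()))  # e.g. obs, obs_next, act, rew, done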
17 changes: 11 additions & 6 deletions sandbox/test_traj_buffer.py
@@ -1,12 +1,13 @@
"""Script used for testing the **NEW** trajectory buffer.
"""Script used for preforming some quick tests on the TrajectoryBuffer class. This
buffer was created for a new monte-carlo algorithm we had in mind. The buffer is
designed to store trajectories of variable length.
"""
import gymnasium as gym

# from stable_learning_control.common.buffers import TrajectoryBuffer
from stable_learning_control.algos.pytorch.common.buffers import TrajectoryBuffer

if __name__ == "__main__":
# Create dummy environment.
env = gym.make("stable_gym:CartPoleCost-v1")

# Dummy algorithm settings.
@@ -32,19 +33,23 @@
for t in range(local_steps_per_epoch):
# Retrieve data from the environment.
a = env.action_space.sample()
next_o, r, d, _, _ = env.step(a)
o_, r, d, truncated, _ = env.step(a)

# Store data in buffer.
buffer.store(o, a, r, next_o, d)
buffer.store(o, a, r, o_, d)

# Update obs (critical!)
o = next_o
o = o_

# Finish path.
if d:
if d or truncated:
print("Environment terminated or truncated. Resetting.")
buffer.finish_path()
o, _ = env.reset()
ep_ret, ep_len = 0, 0

# Retrieve data from buffer.
buffer_data = buffer.get(flat=False)

# Print data.
print(f"Epoch {epoch}:")
125 changes: 122 additions & 3 deletions stable_learning_control/algos/common/buffers.py
@@ -9,7 +9,7 @@


class ReplayBuffer:
"""A simple FIFO experience replay buffer.
"""A simple first-in-first-out (FIFO) experience replay buffer.
Attributes:
obs_buf (numpy.ndarray): Buffer containing the current state.
@@ -112,8 +112,127 @@ def sample_batch(self, batch_size=32):
return batch


class FiniteHorizonReplayBuffer(ReplayBuffer):
    """A first-in-first-out (FIFO) experience replay buffer that also stores the
    expected cumulative finite-horizon reward.

    .. note::
        The expected cumulative finite-horizon reward is calculated using the
        following formula:

        .. math::
            L_{target}(s,a) = \\sum_{t'=t}^{t+N} \\mathbb{E}\\left[c_{t'}\\right]

    Attributes:
        horizon_length (int): The length of the finite-horizon.
        horizon_rew_buf (numpy.ndarray): Buffer containing the expected cumulative
            finite-horizon reward.
    """

    def __init__(self, obs_dim, act_dim, size, horizon_length):
        """Initialise the FiniteHorizonReplayBuffer object.

        Args:
            obs_dim (tuple): The size of the observation space.
            act_dim (tuple): The size of the action space.
            size (int): The replay buffer size.
            horizon_length (int): The length of the finite-horizon.
        """
        super().__init__(obs_dim, act_dim, size)

        # Throw error if horizon size is larger than buffer size.
        if horizon_length > size:
            raise ValueError(
                f"Horizon size ({horizon_length}) cannot be larger than buffer size "
                f"({size})."
            )

        self.horizon_length = horizon_length
        self._path_start_ptr = 0
        self._path_length = 0

        # Preallocate memory for expected cumulative finite-horizon reward buffer.
        self.horizon_rew_buf = np.zeros(int(size), dtype=np.float32)

    def store(self, obs, act, rew, next_obs, done, truncated):
        """Add experience tuple to buffer and calculate the expected cumulative
        finite-horizon reward if the episode is done or truncated.

        Args:
            obs (numpy.ndarray): Start state (observation).
            act (numpy.ndarray): Action.
            rew (:obj:`numpy.float64`): Reward.
            next_obs (numpy.ndarray): Next state (observation).
            done (bool): Boolean specifying whether the terminal state was reached.
            truncated (bool): Boolean specifying whether the episode was truncated.
        """
        super().store(obs, act, rew, next_obs, done)
        self._path_length += 1

        # Throw error if path length is larger than buffer size.
        if self._path_length > self._max_size:
            raise ValueError(
                f"Path length ({self._path_length}) cannot be larger than buffer "
                f"size ({self._max_size})."
            )

        # Compute the expected cumulative finite-horizon reward if done or truncated.
        if done or truncated:
            if self.ptr < self._path_start_ptr:
                path_ptrs = np.concatenate(
                    [
                        np.arange(self._path_start_ptr, self._max_size),
                        np.arange(0, self.ptr % self._max_size),
                    ]
                )
            else:
                path_ptrs = np.arange(self._path_start_ptr, self.ptr)

            path_rew = self.rew_buf[path_ptrs]

            # Calculate the expected cumulative finite-horizon reward.
            path_rew = np.pad(path_rew, (0, self.horizon_length), mode="edge")
            horizon_rew = [
                np.sum(path_rew[i : i + self.horizon_length + 1])
                for i in range(len(path_rew) - self.horizon_length)
            ]

            # Store the expected cumulative finite-horizon reward.
            self.horizon_rew_buf[path_ptrs] = horizon_rew

            # Reset path tracking variables.
            self._path_length = 0
            self._path_start_ptr = (
                self.ptr
            )  # NOTE: Ptr was already increased by super().store().

    def sample_batch(self, batch_size=32):
        """Retrieve a batch of experiences and their expected cumulative
        finite-horizon reward from the buffer.

        Args:
            batch_size (int, optional): The batch size. Defaults to ``32``.

        Returns:
            dict: A batch of experiences.
        """
        idxs = np.random.randint(0, self.size, size=batch_size)
        batch = dict(
            obs=self.obs_buf[idxs],
            obs_next=self.obs_next_buf[idxs],
            act=self.act_buf[idxs],
            rew=self.rew_buf[idxs],
            horizon_rew=self.horizon_rew_buf[idxs],
            done=self.done_buf[idxs],
        )
        return batch


# NOTE: This buffer was created for a new Monte Carlo algorithm we had in mind but is
# currently not used.
class TrajectoryBuffer:
"""A simple FIFO trajectory buffer.
"""A simple FIFO trajectory buffer. It can store trajectories of varying lengths
for Monte Carlo or TD-N learning algorithms.
Attributes:
obs_buf (numpy.ndarray): Buffer containing the current state.
@@ -355,7 +474,7 @@ def get(self, flat=False):
if not self._min_traj_size_warn:
log_to_std_out(
(
"Trajectories shorter than {self._min_traj_size} have been "
f"Trajectories shorter than {self._min_traj_size} have been "
"removed from the buffer."
),
type="warning",
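To make the padding step in ``FiniteHorizonReplayBuffer.store`` concrete, the sliding-window sum can be reproduced on a toy reward sequence (a standalone sketch with made-up values; it mirrors the ``np.pad``/summation logic added in this commit):

import numpy as np

# Toy per-step rewards for one finished path and a horizon of N=2.
path_rew = np.array([1.0, 2.0, 3.0, 4.0], dtype=np.float32)
horizon_length = 2

# Pad with the last reward so steps near the episode end still sum over N+1 terms.
padded = np.pad(path_rew, (0, horizon_length), mode="edge")  # [1, 2, 3, 4, 4, 4]
horizon_rew = [
    np.sum(padded[i : i + horizon_length + 1])
    for i in range(len(padded) - horizon_length)
]
print(horizon_rew)  # [6.0, 9.0, 11.0, 12.0]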