Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Document experiments on thread-safety #350

Merged
merged 2 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 202 additions & 0 deletions docs/is_thread_safe/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
# Is Stretch Body thread safe?

The short answer: yes, if you don't use asyncio. This tutorial builds up to a few kinds of threading experiments that prove this, and hopefully will serve as a launching point for further exploration.

First, let's define what it means to be thread-safe. If we create an instance of the `Robot` class, we'd like to know if we can safely use it in Python threads. Using the `Robot`, there's two ways we interact with the underlying hardware: reading sensor data and writing motion commands. Our experiments will cover:

1. Single threaded reading and writing
2. Single writer, but multithreaded readers
3. Multithreading for both reading and writing

Having multiple readers, even if only one thread can send commands, would be useful for visualization purposes.

## Writing

Let's create an instance of the `Robot`:

```python
import stretch_body.robot

r = stretch_body.robot.Robot()
assert r.startup()
assert r.is_homed()
```

Then create a "writer", that will send motion commands:

```python
import random

def write():
r.lift.move_to(random.uniform(0.3, 1.1))
r.arm.move_to(random.uniform(0.0, 0.4))
r.end_of_arm.move_to('wrist_roll', random.uniform(-1.0, 1.0))
r.head.move_to('head_pan', random.uniform(-1.0, 1.0))
r.push_command()
```

Then let's define a writing-only runner, that sends motion commands at 10hz:

```python
def wo_runner():
"""write-only runner"""
while True:
write()
time.sleep(0.1)
```

### Results

We can run `wo_runner()` in the main thread:

```python
wo_runner()
```

or launch it in a thread:

```python
import threading
threading.Thread(target=wo_runner).start()
```

When AsyncIO is turned on, the program exits with no error. Likely an exception is being thrown in the thread, but isn't being surfaced to the user. Surfacing this exception is the first step towards making this library thread-safe with AsyncIO.

However, if we [disable AsyncIO](https://github.com/hello-robot/stretch_body/blob/master/docs/robot_parameters.md#use_asyncio), we'll see the robot start to jitter and it tracks random motion goals.

Next, we can run multiple threads with no problem:

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again dynamixels don't run any of it's methods in async event loop threads. They just run on regular threads which is to be noted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think asyncio could be the source of the silent error?

Copy link
Contributor

@hello-fazil hello-fazil Aug 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be. I think we should try your tests with the async param disabled to see what happens and another case have the exception hooks commented out to see what happens.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow! Disabling use_asyncio works!

```python
import threading

threading.Thread(target=wo_runner).start()
threading.Thread(target=wo_runner).start()
```

### High-Low Competition

In the previous experiment, it's hard to tell the two-thread jitter apart from the one-thread jitter. So let's write two competing writers:

```python
def low_write():
r.lift.move_to(0.3)
r.arm.move_to(0.0)
r.end_of_arm.move_to('wrist_roll', -1.0)
r.head.move_to('head_pan', -1.0)
r.push_command()

def high_write():
r.lift.move_to(1.1)
r.arm.move_to(0.4)
r.end_of_arm.move_to('wrist_roll', 1.0)
r.head.move_to('head_pan', 1.0)
r.push_command()
```

and create runners for each:

```python
def low_rw_runner():
"""read-and-write runner"""
pl=NBPlot()
while True:
low_write()
read(pl)
time.sleep(0.1)

def high_rw_runner():
"""read-and-write runner"""
pl=NBPlot()
s = time.time()
while time.time() - s < 10:
high_write()
read(pl)
time.sleep(0.1)
```

Then, launch them with:

```python
if __name__ == "__main__":
import threading
threading.Thread(target=low_rw_runner).start()
threading.Thread(target=high_rw_runner).start()
```

Running this, we'll see the threads compete with one another, and the joints will oscillate between their limits, depending on which thread is given more processing time by the operating system at that moment. After 10 seconds, the high writer dies, and the robot moves to its lower joint limits.

## Reading

Let's create a "reader", that will plot the joint states for the four joints:

```python
def read(plotter):
lift_pos = r.lift.status['pos']
arm_pos = r.arm.status['pos']
roll_pos = r.end_of_arm.get_joint('wrist_roll').status['pos']
pan_pos = r.head.get_joint('head_pan').status['pos']
plotter.plot(lift_pos, arm_pos, roll_pos, pan_pos)
```

Then let's define a reading-only runner, that will plot joint state at 10hz:

```python
def ro_runner():
"""read-only runner"""
pl=NBPlot()
while True:
read(pl)
time.sleep(0.1)
```

`NBPlot` is defined within `multiprocessing_plotter.py` in this folder. It enables Matplotlib to plot within threads.

### Results

We can run `ro_runner()` in the main thread without any issues, but if we launch `ro_runner()` in a thread:

```python
import threading
threading.Thread(target=ro_runner).start()
```

Once again, the program exits with no error if we use asyncio and works fine if we disable it. Surfacing the underlying error will be helpful to debug.

## Reading and Writing

For completion, we have `wr_runner()`:

```python
def rw_runner():
"""read-and-write runner"""
pl=NBPlot()
while True:
write()
read(pl)
time.sleep(0.1)
```

We run it with:

```python
import threading

threading.Thread(target=rw_runner).start()
threading.Thread(target=rw_runner).start()
```

And we'll see two live plots pop up and show the robot tracking the motion commands being sent by the two threads.

![live plots](./liveplots.png)

## Code

The code for this experiment lives in two files within this same directory:

- `read_write.py`: which defines the reading and writing runners
- `low_high_competition.py`: which defines the low & high writers that compete in two threads
- `multiprocessing_plotter.py`: which is a helper library for plotting within Python threads. The logic does not impact these experiments.

## Takeaway

This document captures a few experiments around Stretch Body's thread safety. It shows that the library is currently thread safe only if you disable AsyncIO, and highlights some possible ways to start debugging how to make the library thread-safe with AsyncIO.
Binary file added docs/is_thread_safe/liveplots.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
53 changes: 53 additions & 0 deletions docs/is_thread_safe/low_high_competition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import time
import random
import stretch_body.robot
# Required Hack whenever importing cv2 (stretch_body does it implicitly). Details: https://forum.hello-robot.com/t/1047
import os; os.environ['QT_QPA_PLATFORM_PLUGIN_PATH'] = '/usr/lib/x86_64-linux-gnu/qt5/plugins/platforms/libqxcb.so'
from multiprocessing_plotter import NBPlot

r = stretch_body.robot.Robot()
assert r.startup()
assert r.is_homed()

def low_write():
r.lift.move_to(0.3)
r.arm.move_to(0.0)
r.end_of_arm.move_to('wrist_roll', -1.0)
r.head.move_to('head_pan', -1.0)
r.push_command()

def high_write():
r.lift.move_to(1.1)
r.arm.move_to(0.4)
r.end_of_arm.move_to('wrist_roll', 1.0)
r.head.move_to('head_pan', 1.0)
r.push_command()

def read(plotter):
lift_pos = r.lift.status['pos']
arm_pos = r.arm.status['pos']
roll_pos = r.end_of_arm.get_joint('wrist_roll').status['pos']
pan_pos = r.head.get_joint('head_pan').status['pos']
plotter.plot(lift_pos, arm_pos, roll_pos, pan_pos)

def low_rw_runner():
"""read-and-write runner"""
pl=NBPlot()
while True:
low_write()
read(pl)
time.sleep(0.1)

def high_rw_runner():
"""read-and-write runner"""
pl=NBPlot()
s = time.time()
while time.time() - s < 10:
high_write()
read(pl)
time.sleep(0.1)

if __name__ == "__main__":
import threading
threading.Thread(target=low_rw_runner).start()
threading.Thread(target=high_rw_runner).start()
103 changes: 103 additions & 0 deletions docs/is_thread_safe/multiprocessing_plotter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Based on the multiprocessing tutorial here:
# https://matplotlib.org/stable/gallery/misc/multiprocess_sgskip.html

import threading
import multiprocessing as mp
import time
import random

import matplotlib.pyplot as plt
import numpy as np


class ProcessPlotter:
def __init__(self):
self.time = []
self.lift = []
self.arm = []
self.wrist_roll = []
self.head_pan = []

def terminate(self):
plt.close('all')

def call_back(self):
while self.pipe.poll():
command = self.pipe.recv()
if command is None:
self.terminate()
return False
else:
self.time.append(command[0])
self.lift.append(command[1])
self.arm.append(command[2])
self.wrist_roll.append(command[3])
self.head_pan.append(command[4])
self.ax[0].plot(self.time, self.lift, color='red')
self.ax[1].plot(self.time, self.arm, color='green')
self.ax[2].plot(self.time, self.wrist_roll, color='blue')
self.ax[3].plot(self.time, self.head_pan, color='purple')
self.fig.canvas.draw()
return True

def __call__(self, pipe):
self.pipe = pipe
self.fig, self.ax = plt.subplots(4)
self.ax[0].set_ylim([0.0, 1.1])
self.ax[1].set_ylim([0.0, 0.5])
self.ax[2].set_ylim([-1.57, 1.57])
self.ax[3].set_ylim([-4.04, 1.73])
timer = self.fig.canvas.new_timer(interval=250)
timer.add_callback(self.call_back)
timer.start()

plt.show()


class NBPlot:
def __init__(self):
self.plot_pipe, plotter_pipe = mp.Pipe()
self.plotter = ProcessPlotter()
self.plot_process = mp.Process(
target=self.plotter, args=(plotter_pipe,), daemon=True)
self.plot_process.start()
self.start_time = time.time()

def plot(self, lift_pos, arm_pos, roll_pos, pan_pos, finished=False):
send = self.plot_pipe.send
if finished:
send(None)
else:
data = np.array([time.time() - self.start_time, lift_pos, arm_pos, roll_pos, pan_pos])
send(data)


def main(flag):
pl=NBPlot()
while not flag.is_set():
pl.plot(random.uniform(0.0, 1.1), random.uniform(0.0, 0.5), random.uniform(-1.57, 1.57), random.uniform(-4.04, 1.5))
time.sleep(0.25)
pl.plot(None, None, None, None, finished=True)


# if __name__ == '__main__':
# shutdown_flag = threading.Event()
# threading.Thread(target=main, args=(shutdown_flag,)).start()
# threading.Thread(target=main, args=(shutdown_flag,)).start()
# print('Press Ctrl-C to exit')
# try:
# while True:
# pass
# except:
# pass
# finally:
# shutdown_flag.set()
# time.sleep(1)


if __name__ == '__main__':
shutdown_flag = threading.Event()
threading.Thread(target=main, args=(shutdown_flag,)).start()
threading.Thread(target=main, args=(shutdown_flag,)).start()
time.sleep(5)
shutdown_flag.set()
Loading