Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task abort and other stop fix-ups #253

Merged
merged 9 commits into from
Oct 17, 2022
3 changes: 1 addition & 2 deletions docs/developer/clients.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ Process (referred to generally "Operation") exposed by the OCS Agent with the
specified ``agent-instance-id``. Each of these attributes has a set of methods
associated with them for controlling the Operation. The methods for running an
Agent's Operations are described in :ref:`agent_ops`. They are "start",
"status", "wait", and "stop" ("abort" is not implemented at the time of this
writing.)
"status", "wait", "stop" (Process only) and "abort" (Task only).

Once the Client is instantiated, Operations can be commanded, for example, to
start a Process called 'acq' (a common Process name for beginning data
Expand Down
2 changes: 2 additions & 0 deletions docs/developer/writing_an_agent/process.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.. _adding_a_process:

Adding a Process
----------------

Expand Down
55 changes: 55 additions & 0 deletions docs/developer/writing_an_agent/task.rst
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,61 @@ the Task with crossbar.

agent.register_task('print', barebone.print)

Aborting a Task
```````````````
'print' is a very short Task that runs very quickly, however if we have a long
running Task, we might need the ability to stop it before it would normally
complete. OCS supports aborting a Task, however this mechanism needs to be
implemented within the Agent code. This will require adding an aborter
function, which typically will look like this:

.. code-block:: python

def _abort_print(self, session, params):
if session.status == 'running':
session.set_status('stopping')

Within the Task function, at points that are reasonable to request an abort,
you must add a check of the ``session.status`` that then exits the Task with an
error (i.e. returns ``False``) if the status is no longer running. For example:

.. code-block:: python

if session.status != 'running':
return False, 'Aborted print'

Where you insert this interrupt code will vary from Agent to Agent. Tasks that
run quickly do not need an abort to be implemented at all. However, for long
running Tasks abort should be implemented. (We will see this interruption
implementation again in the next step where we discuss
:ref:`adding_a_process`.)

When registering the Task, the aborter must be specified:

.. code-block:: python

agent.register_task('print', barebone.print, aborter=barebone._abort_print)

.. note::

By default the aborter will run in the same threading pattern as the task.
If your Task runs in the main reactor (i.e. is decorated with
``@inlineCallbacks``), then the aborter should also run in the reactor, and so
needs to ``yield`` at the end of the method. In our example this would look
like:

.. code-block:: python

@inlineCallbacks
def _abort_print(self, session, params):
if session.status == 'running':
session.set_status('stopping')
yield

Again, since 'print' runs quickly, we do not implement an aborter for it here.
For an example of an abortable task, see
:func:`ocs.agents.fake_data.agent.FakeDataAgent.delay_task`.

Agent Code
``````````

Expand Down
38 changes: 35 additions & 3 deletions ocs/agents/fake_data/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def __init__(self, agent,
self.log = agent.log
self.lock = threading.Semaphore()
self.job = None

self.channel_names = ['channel_%02i' % i for i in range(num_channels)]
self.sample_rate = max(1e-6, sample_rate) # #nozeros

Expand Down Expand Up @@ -164,6 +165,23 @@ def _stop_acq(self, session, params):
return (ok, {True: 'Requested process stop.',
False: 'Failed to request process stop.'}[ok])

@inlineCallbacks
def count_seconds(self, session, params):
# This process runs entirely in the reactor, as does its stop function.
session.data = {'counter': 0,
'last_update': time.time()}
session.set_status('running')
while session.status == 'running':
yield dsleep(1)
session.data['last_update'] = time.time()
session.data['counter'] += 1
return True, 'Exited on request.'

@inlineCallbacks
def _stop_count_seconds(self, session, params):
yield # Make this a generator.
session.set_status('stopping')

# Tasks

@ocs_agent.param('heartbeat', default=True, type=bool)
Expand All @@ -189,7 +207,8 @@ def set_heartbeat(self, session, params):
def delay_task(self, session, params):
"""delay_task(delay=5, succeed=True)

**Task** - Sleep (delay) for the requested number of seconds.
**Task** (abortable) - Sleep (delay) for the requested number of
seconds.

This can run simultaneously with the acq Process. This Task
should run in the reactor thread.
Expand All @@ -216,14 +235,24 @@ def delay_task(self, session, params):
'delay_so_far': 0}
session.set_status('running')
t0 = time.time()
while True:
while session.status == 'running':
session.data['delay_so_far'] = time.time() - t0
sleep_time = min(0.5, delay - session.data['delay_so_far'])
if sleep_time < 0:
break
yield dsleep(sleep_time)

if session.status != 'running':
return False, 'Aborted after %.1f seconds' % session.data['delay_so_far']

return succeed, 'Exited after %.1f seconds' % session.data['delay_so_far']

@inlineCallbacks
def _abort_delay_task(self, session, params):
if session.status == 'running':
session.set_status('stopping')
yield


def add_agent_args(parser_in=None):
if parser_in is None:
Expand Down Expand Up @@ -262,8 +291,11 @@ def main(args=None):
frame_length=args.frame_length)
agent.register_process('acq', fdata.acq, fdata._stop_acq,
blocking=True, startup=startup)
agent.register_process('count', fdata.count_seconds, fdata._stop_count_seconds,
blocking=False, startup=startup)
agent.register_task('set_heartbeat', fdata.set_heartbeat)
agent.register_task('delay_task', fdata.delay_task, blocking=False)
agent.register_task('delay_task', fdata.delay_task, blocking=False,
aborter=fdata._abort_delay_task)

runner.run(agent, auto_reconnect=True)

Expand Down
2 changes: 2 additions & 0 deletions ocs/agents/host_manager/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,9 @@ def manager(self, session, params):
yield dsleep(max(min(sleep_times), .001))
return True, 'Exited.'

@inlineCallbacks
def _stop_manager(self, session, params):
yield
if session.status == 'done':
return
session.set_status('stopping')
Expand Down
2 changes: 2 additions & 0 deletions ocs/agents/registry/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,10 @@ def main(self, session: ocs_agent.OpSession, params):

return True, "Stopped registry main process"

@inlineCallbacks
def _stop_main(self, session, params):
"""Stop function for the 'main' process."""
yield
if self._run:
session.set_status('stopping')
self._run = False
Expand Down
Loading