From 04487a25dc221c050a2cba47db9ce681b08d370a Mon Sep 17 00:00:00 2001 From: simonsobs Date: Wed, 19 Jan 2022 21:51:13 +0100 Subject: [PATCH 1/9] OCSAgent: make process stop() report it's been run Also catch errors in the stop function. Document the signatures for process/task start/stop functions. --- ocs/ocs_agent.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/ocs/ocs_agent.py b/ocs/ocs_agent.py index 42f0a5ff..ce6ee56d 100644 --- a/ocs/ocs_agent.py +++ b/ocs/ocs_agent.py @@ -386,6 +386,12 @@ def register_task(self, name, func, blocking=True, startup=False): launched on startup. If the ``startup`` argument is a dictionary, this is passed to the Operation's start function. + + Notes: + The function func will be called with arguments (session, + params) where session is the active OpSession and params + is passed from the client. + """ self.tasks[name] = AgentTask(func, blocking=blocking) self.sessions[name] = None @@ -412,6 +418,14 @@ def register_process(self, name, start_func, stop_func, blocking=True, startup=F dictionary, this is passed to the Operation's start function. + Notes: + The functions start_func and stop_func will be called with + arguments (session, params) where session is the active + OpSession and params is passed from the client. + + (Passing params to the stop_func might not be supported in + the client library so don't count on that being useful.) + """ self.processes[name] = AgentProcess(start_func, stop_func, blocking=blocking) @@ -756,15 +770,20 @@ def stop(self, op_name, params=None): ocs.OK: the Process stop routine has been launched. """ + print('stop called for {}'.format(op_name)) if op_name in self.tasks: return (ocs.ERROR, 'No implementation for "%s" because it is a task.' % op_name, {}) elif op_name in self.processes: + def _errback(self, *args, **kw): + print(f'Error calling stopper for "{op_name}"; args:', + args, kw) session = self.sessions.get(op_name) if session is None: return (ocs.ERROR, 'No session active.', {}) proc = self.processes[op_name] d2 = threads.deferToThread(proc.stopper, session, params) + d2.addErrback(_errback) return (ocs.OK, 'Requested stop on process "%s".' % op_name, session.encoded()) else: return (ocs.ERROR, 'No process called "%s".' % op_name, {}) From 1134fd0275f6d32ec62fdc67398efe8d5ca81555 Mon Sep 17 00:00:00 2001 From: Matthew Hasselfield Date: Fri, 1 Jul 2022 18:39:57 -0400 Subject: [PATCH 2/9] ocs_agent: support non-blocking (inlineCallbacks) stop functions By default, the code will expect the stop function to run the same way as the start function. Some existing agents have non-blocking start functions, but the stoppers are not written with inlineCallbacks; the current code will notice this and print a warning message. --- ocs/agents/fake_data/agent.py | 23 +++++++++++++++++++++- ocs/ocs_agent.py | 37 ++++++++++++++++++++++++++--------- tests/agents/test_fakedata.py | 6 ++++-- 3 files changed, 54 insertions(+), 12 deletions(-) diff --git a/ocs/agents/fake_data/agent.py b/ocs/agents/fake_data/agent.py index f070661a..d9114096 100644 --- a/ocs/agents/fake_data/agent.py +++ b/ocs/agents/fake_data/agent.py @@ -7,7 +7,7 @@ from os import environ import numpy as np from autobahn.wamp.exception import ApplicationError -from twisted.internet.defer import inlineCallbacks +from twisted.internet.defer import inlineCallbacks, returnValue from autobahn.twisted.util import sleep as dsleep # For logging @@ -23,6 +23,8 @@ def __init__(self, agent, self.log = agent.log self.lock = threading.Semaphore() self.job = None + self._counter_running = None + self.channel_names = ['channel_%02i' % i for i in range(num_channels)] self.sample_rate = max(1e-6, sample_rate) # #nozeros @@ -164,6 +166,23 @@ def _stop_acq(self, session, params): return (ok, {True: 'Requested process stop.', False: 'Failed to request process stop.'}[ok]) + @inlineCallbacks + def count_seconds(self, session, params): + # This process runs entirely in the reactor, as does its stop function. + session.data = {'counter': 0, + 'last_update': time.time()} + session.set_status('running') + while session.status == 'running': + yield dsleep(1) + session.data['last_update'] = time.time() + session.data['counter'] += 1 + return True, 'Exited on request.' + + @inlineCallbacks + def _stop_count_seconds(self, session, params): + yield # Make this a generator. + session.set_status('stopping') + # Tasks @ocs_agent.param('heartbeat', default=True, type=bool) @@ -262,6 +281,8 @@ def main(args=None): frame_length=args.frame_length) agent.register_process('acq', fdata.acq, fdata._stop_acq, blocking=True, startup=startup) + agent.register_process('count', fdata.count_seconds, fdata._stop_count_seconds, + blocking=False, startup=startup) agent.register_task('set_heartbeat', fdata.set_heartbeat) agent.register_task('delay_task', fdata.delay_task, blocking=False) diff --git a/ocs/ocs_agent.py b/ocs/ocs_agent.py index ce6ee56d..0c1d3194 100644 --- a/ocs/ocs_agent.py +++ b/ocs/ocs_agent.py @@ -398,7 +398,8 @@ def register_task(self, name, func, blocking=True, startup=False): if startup is not False: self.startup_ops.append(('task', name, startup)) - def register_process(self, name, start_func, stop_func, blocking=True, startup=False): + def register_process(self, name, start_func, stop_func, blocking=True, + stopper_blocking=None, startup=False): """Register a Process for this agent. Args: @@ -407,9 +408,13 @@ def register_process(self, name, start_func, stop_func, blocking=True, startup=F handle the "start" operation of the Process. stop_func (callable): The function that will be called to handle the "stop" operation of the Process. - blocking (bool): Indicates that ``func`` should be - launched in a worker thread, rather than running in the - main reactor thread. + blocking (bool): Indicates that ``start_func`` should be + launched in a worker thread, rather than running in + the reactor. + stopper_blocking (bool or None): Indicates that + ``stop_func`` should be launched in a worker thread, + rather than running in the reactor. Defaults to the + value of ``blocking``. startup (bool or dict): Controls if and how the Operation is launched when the Agent successfully starts up and connects to the WAMP realm. If False, the Operation @@ -427,8 +432,9 @@ def register_process(self, name, start_func, stop_func, blocking=True, startup=F the client library so don't count on that being useful.) """ - self.processes[name] = AgentProcess(start_func, stop_func, - blocking=blocking) + self.processes[name] = AgentProcess( + start_func, stop_func, blocking=blocking, + stopper_blocking=stopper_blocking) self.sessions[name] = None if startup is not False: self.startup_ops.append(('process', name, startup)) @@ -782,8 +788,18 @@ def _errback(self, *args, **kw): if session is None: return (ocs.ERROR, 'No session active.', {}) proc = self.processes[op_name] - d2 = threads.deferToThread(proc.stopper, session, params) - d2.addErrback(_errback) + if proc.stop_blocking: + d2 = threads.deferToThread(proc.stopper, session, params) + d2.addErrback(_errback) + else: + # Assume it returns a deferred. + d2 = proc.stopper(session, params) + if not isinstance(d2, Deferred): + # Warn but let it slide. + print(f'WARNING: process {op_name} needs to be ' + 'registered with stop_blocking=True.') + else: + d2.addErrback(_errback) return (ocs.OK, 'Requested stop on process "%s".' % op_name, session.encoded()) else: return (ocs.ERROR, 'No process called "%s".' % op_name, {}) @@ -839,10 +855,13 @@ def encoded(self): } class AgentProcess: - def __init__(self, launcher, stopper, blocking=None): + def __init__(self, launcher, stopper, blocking=None, stopper_blocking=None): self.launcher = launcher self.stopper = stopper self.blocking = blocking + if stopper_blocking is None: + stopper_blocking = blocking + self.stopper_blocking = stopper_blocking self.docstring = launcher.__doc__ def encoded(self): diff --git a/tests/agents/test_fakedata.py b/tests/agents/test_fakedata.py index 7b3d6ace..4ea43530 100644 --- a/tests/agents/test_fakedata.py +++ b/tests/agents/test_fakedata.py @@ -30,18 +30,20 @@ def test_fake_data_acq(agent): class TestStopAcq: + @pytest_twisted.inlineCallbacks def test_fake_data_stop_acq_not_running(self, agent): session = create_session('acq') - res = agent._stop_acq(session, params=None) + res = yield agent._stop_acq(session, params=None) assert res[0] is False + @pytest_twisted.inlineCallbacks def test_fake_data_stop_acq_while_running(self, agent): session = create_session('acq') # set running job to 'acq' agent.job = 'acq' - res = agent._stop_acq(session, params=None) + res = yield agent._stop_acq(session, params=None) assert res[0] is True From 50ad87430991ea3d0c7dc89d7d1f1022244d3b57 Mon Sep 17 00:00:00 2001 From: Matthew Hasselfield Date: Fri, 1 Jul 2022 19:14:22 -0400 Subject: [PATCH 3/9] Modify host_manager and registry stopper functions They should run in the reactor now. --- ocs/agents/host_manager/agent.py | 2 ++ ocs/agents/registry/agent.py | 2 ++ tests/agents/test_registry_agent.py | 6 ++++-- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/ocs/agents/host_manager/agent.py b/ocs/agents/host_manager/agent.py index bc7ad8ce..666eb1fa 100644 --- a/ocs/agents/host_manager/agent.py +++ b/ocs/agents/host_manager/agent.py @@ -507,7 +507,9 @@ def manager(self, session, params): yield dsleep(max(min(sleep_times), .001)) return True, 'Exited.' + @inlineCallbacks def _stop_manager(self, session, params): + yield if session.status == 'done': return session.set_status('stopping') diff --git a/ocs/agents/registry/agent.py b/ocs/agents/registry/agent.py index 87c4d45b..aba1be6e 100644 --- a/ocs/agents/registry/agent.py +++ b/ocs/agents/registry/agent.py @@ -187,8 +187,10 @@ def main(self, session: ocs_agent.OpSession, params): return True, "Stopped registry main process" + @inlineCallbacks def _stop_main(self, session, params): """Stop function for the 'main' process.""" + yield if self._run: session.set_status('stopping') self._run = False diff --git a/tests/agents/test_registry_agent.py b/tests/agents/test_registry_agent.py index d7becb29..7c11f1f8 100644 --- a/tests/agents/test_registry_agent.py +++ b/tests/agents/test_registry_agent.py @@ -70,18 +70,20 @@ def test_registry_main_expire_agent(self, agent): class TestStopMain: + @pytest_twisted.inlineCallbacks def test_registry_stop_main_while_running(self, agent): session = create_session('main') # Fake run main process agent._run = True - res = agent._stop_main(session, params=None) + res = yield agent._stop_main(session, params=None) assert res[0] is True + @pytest_twisted.inlineCallbacks def test_registry_stop_main_not_running(self, agent): session = create_session('main') - res = agent._stop_main(session, params=None) + res = yield agent._stop_main(session, params=None) assert res[0] is False From 4eace3db3d27cb181e911eb11330ab997420c10d Mon Sep 17 00:00:00 2001 From: Matthew Hasselfield Date: Tue, 5 Jul 2022 13:07:29 -0400 Subject: [PATCH 4/9] ocs_agent: enable Task "abort" API call. This functions the same way as Process "stop", except that it is entirely optional and should normally cause the operation to exit with error. --- docs/developer/clients.rst | 3 +- ocs/ocs_agent.py | 138 +++++++++++++++++++++++++++---------- 2 files changed, 101 insertions(+), 40 deletions(-) diff --git a/docs/developer/clients.rst b/docs/developer/clients.rst index b4650afe..17016063 100644 --- a/docs/developer/clients.rst +++ b/docs/developer/clients.rst @@ -40,8 +40,7 @@ Process (referred to generally "Operation") exposed by the OCS Agent with the specified ``agent-instance-id``. Each of these attributes has a set of methods associated with them for controlling the Operation. The methods for running an Agent's Operations are described in :ref:`agent_ops`. They are "start", -"status", "wait", and "stop" ("abort" is not implemented at the time of this -writing.) +"status", "wait", "stop" (Process only) and "abort" (Task only). Once the Client is instantiated, Operations can be commanded, for example, to start a Process called 'acq' (a common Process name for beginning data diff --git a/ocs/ocs_agent.py b/ocs/ocs_agent.py index 0c1d3194..18294f19 100644 --- a/ocs/ocs_agent.py +++ b/ocs/ocs_agent.py @@ -369,16 +369,23 @@ def publish_status(self, message, session): self.log.error('Unable to publish status. TransportLost. ' + 'crossbar server likely unreachable.') - def register_task(self, name, func, blocking=True, startup=False): + def register_task(self, name, func, aborter=None, blocking=True, + aborter_blocking=None, startup=False): """Register a Task for this agent. Args: name (string): The name of the Task. func (callable): The function that will be called to handle the "start" operation of the Task. + aborter (callable): The function that will be called to + handle the "abort" operation of the Task (optional). blocking (bool): Indicates that ``func`` should be launched in a worker thread, rather than running in the main reactor thread. + aborter_blocking(bool or None): Indicates that ``aborter`` + should be run in a worked thread, rather than running + in the main reactor thread. Defaults to value of + ``blocking``. startup (bool or dict): Controls if and how the Operation is launched when the Agent successfully starts up and connects to the WAMP realm. If False, the Operation @@ -388,12 +395,18 @@ def register_task(self, name, func, blocking=True, startup=False): function. Notes: - The function func will be called with arguments (session, - params) where session is the active OpSession and params - is passed from the client. + + The functions func and aborter will be called with + arguments (session, params) where session is the active + OpSession and params is passed from the client. + + (Passing params to the aborter might not be supported in + the client library so don't count on that being useful.) """ - self.tasks[name] = AgentTask(func, blocking=blocking) + self.tasks[name] = AgentTask( + func, blocking=blocking, aborter=aborter, + aborter_blocking=aborter_blocking) self.sessions[name] = None if startup is not False: self.startup_ops.append(('task', name, startup)) @@ -763,9 +776,75 @@ def wait(self, op_name, timeout=None): return (ocs.TIMEOUT, 'Operation "%s" still running; wait timed out.' % op_name, session.encoded()) + def _stop_helper(self, stop_type, op_name, params): + """Common stopper/aborter code for Process stop and Task + abort. + + Args: + stop_type (str): either 'stop' or 'abort'. + op_name (str): the op_name. + params (dict or None): Params to be passed to stopper + function. + + """ + print(f'{stop_type} called for {op_name}') + + # Find the op and populate op_type, op, stopper, stopper_blocking. + if op_name in self.tasks: + op_type = 'task' + op = self.tasks[op_name] + stopper = op.aborter + stopper_blocking = op.aborter_blocking + elif op_name in self.processes: + op_type = 'process' + op = self.processes[op_name] + stopper = op.stopper + stopper_blocking = op.stopper_blocking + else: + return (ocs.ERROR, 'No operation called "%s".' % op_name, {}) + + # Make sure the API function matches the op_type ... + if (stop_type == 'stop' and op_type == 'task') or \ + (stop_type == 'abort' and op_type == 'process'): + return (ocs.ERROR, f'Cannot "{stop_type}" "{op_name}" because ' + 'it is a "{op_type}".', {}) + + def _errback(self, *args, **kw): + print(f'Error calling stopper for "{op_name}"; args:', + args, kw) + session = self.sessions.get(op_name) + if session is None: + return (ocs.ERROR, 'No session active.', {}) + + if session.status in ['stopping', 'done']: + return (ocs.ERROR, f'The operation is already {session.status}', {}) + + if stopper_blocking: + # Launch the code in a thread. + d2 = threads.deferToThread(stopper, session, params) + d2.addErrback(_errback) + else: + # Assume the stopper returns a deferred (and will soon run + # in the reactor). + d2 = stopper(session, params) + if not isinstance(d2, Deferred): + # Warn but let it slide. in the past the default was + # to run the stopper in a worker thread. Most + # stoppers run very quickly so it is probably not + # going to break much to have them run in the reactor. + # Change this to an error after all Agents have been + # updated for a while. + print(f'WARNING: {op_type} {op_name} needs to be ' + 'registered with stopper_blocking=True.') + else: + d2.addErrback(_errback) + + return (ocs.OK, f'Requested {stop_type} on {op_type} {op_name}".', + session.encoded()) + def stop(self, op_name, params=None): """ - Launch a Process stop routine. + Initiate a Process stop routine. Returns (status, message, session). @@ -776,46 +855,23 @@ def stop(self, op_name, params=None): ocs.OK: the Process stop routine has been launched. """ - print('stop called for {}'.format(op_name)) - if op_name in self.tasks: - return (ocs.ERROR, 'No implementation for "%s" because it is a task.' % op_name, - {}) - elif op_name in self.processes: - def _errback(self, *args, **kw): - print(f'Error calling stopper for "{op_name}"; args:', - args, kw) - session = self.sessions.get(op_name) - if session is None: - return (ocs.ERROR, 'No session active.', {}) - proc = self.processes[op_name] - if proc.stop_blocking: - d2 = threads.deferToThread(proc.stopper, session, params) - d2.addErrback(_errback) - else: - # Assume it returns a deferred. - d2 = proc.stopper(session, params) - if not isinstance(d2, Deferred): - # Warn but let it slide. - print(f'WARNING: process {op_name} needs to be ' - 'registered with stop_blocking=True.') - else: - d2.addErrback(_errback) - return (ocs.OK, 'Requested stop on process "%s".' % op_name, session.encoded()) - else: - return (ocs.ERROR, 'No process called "%s".' % op_name, {}) + return self._stop_helper('stop', op_name, params) def abort(self, op_name, params=None): """ - Initiate a Task abort routine. This function is not currently - implemented in any useful way. + Initiate a Task abort routine. Returns (status, message, session). Possible values for status: - ocs.ERROR: you called a function that does not do anything. + ocs.ERROR: the specified op_name is not known, or refers to + a Process. Also returned if Task is known but not running. + + ocs.OK: the Process stop routine has been launched. + """ - return (ocs.ERROR, 'No implementation of abort() for operation "%s"' % op_name, {}) + return self._stop_helper('abort', op_name, params) def status(self, op_name, params=None): """ @@ -841,15 +897,21 @@ def status(self, op_name, params=None): class AgentTask: - def __init__(self, launcher, blocking=None): + def __init__(self, launcher, blocking=None, aborter=None, + aborter_blocking=None): self.launcher = launcher self.blocking = blocking + self.aborter = aborter + if aborter_blocking is None: + aborter_blocking = blocking + self.aborter_blocking = aborter_blocking self.docstring = launcher.__doc__ def encoded(self): """Dict of static info for API self-description.""" return { 'blocking': self.blocking, + 'abortable': (self.aborter is not None), 'docstring': self.docstring, 'op_type': 'task', } From 8140836bddc96d1918a8033c1f2056122824bbf4 Mon Sep 17 00:00:00 2001 From: Matthew Hasselfield Date: Tue, 5 Jul 2022 13:09:01 -0400 Subject: [PATCH 5/9] FakeDataAgent: make delay_task abortable. --- ocs/agents/fake_data/agent.py | 16 +++++++++++++--- tests/agents/test_fakedata.py | 9 +++++++++ 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/ocs/agents/fake_data/agent.py b/ocs/agents/fake_data/agent.py index d9114096..d1d23a78 100644 --- a/ocs/agents/fake_data/agent.py +++ b/ocs/agents/fake_data/agent.py @@ -23,7 +23,6 @@ def __init__(self, agent, self.log = agent.log self.lock = threading.Semaphore() self.job = None - self._counter_running = None self.channel_names = ['channel_%02i' % i for i in range(num_channels)] self.sample_rate = max(1e-6, sample_rate) # #nozeros @@ -235,14 +234,24 @@ def delay_task(self, session, params): 'delay_so_far': 0} session.set_status('running') t0 = time.time() - while True: + while session.status == 'running': session.data['delay_so_far'] = time.time() - t0 sleep_time = min(0.5, delay - session.data['delay_so_far']) if sleep_time < 0: break yield dsleep(sleep_time) + + if session.status != 'running': + return False, 'Aborted after %.1f seconds' % session.data['delay_so_far'] + return succeed, 'Exited after %.1f seconds' % session.data['delay_so_far'] + @inlineCallbacks + def _abort_delay_task(self, session, params): + if session.status == 'running': + session.set_status('stopping') + yield + def add_agent_args(parser_in=None): if parser_in is None: @@ -284,7 +293,8 @@ def main(args=None): agent.register_process('count', fdata.count_seconds, fdata._stop_count_seconds, blocking=False, startup=startup) agent.register_task('set_heartbeat', fdata.set_heartbeat) - agent.register_task('delay_task', fdata.delay_task, blocking=False) + agent.register_task('delay_task', fdata.delay_task, blocking=False, + aborter=fdata._abort_delay_task) runner.run(agent, auto_reconnect=True) diff --git a/tests/agents/test_fakedata.py b/tests/agents/test_fakedata.py index 4ea43530..cd05abcd 100644 --- a/tests/agents/test_fakedata.py +++ b/tests/agents/test_fakedata.py @@ -54,6 +54,15 @@ def test_fake_data_delay_task(agent): res = yield agent.delay_task(session, params=params) assert res[0] is True +@pytest_twisted.inlineCallbacks +def test_fake_data_delay_task_abort(agent): + session = create_session('delay_task') + params = {'delay': 0.001, 'succeed': True} + D1 = agent.delay_task(session, params=params) + D2 = yield agent._abort_delay_task(session, params=None) + res = yield D1 + assert res[0] is False + def test_fake_data_try_set_job_running_job(agent): # set running job to 'acq' From 276d3540fb44addd40df517291057a0d94d7d044 Mon Sep 17 00:00:00 2001 From: Matthew Hasselfield Date: Tue, 5 Jul 2022 16:37:58 -0400 Subject: [PATCH 6/9] OCSAgent: log the completion of stop/abort In addition to errback there is now also a callback to print something to the log on success. The errback had a typo so it probably never worked in the past. --- ocs/ocs_agent.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/ocs/ocs_agent.py b/ocs/ocs_agent.py index 18294f19..5a2d2bc9 100644 --- a/ocs/ocs_agent.py +++ b/ocs/ocs_agent.py @@ -809,9 +809,6 @@ def _stop_helper(self, stop_type, op_name, params): return (ocs.ERROR, f'Cannot "{stop_type}" "{op_name}" because ' 'it is a "{op_type}".', {}) - def _errback(self, *args, **kw): - print(f'Error calling stopper for "{op_name}"; args:', - args, kw) session = self.sessions.get(op_name) if session is None: return (ocs.ERROR, 'No session active.', {}) @@ -819,10 +816,22 @@ def _errback(self, *args, **kw): if session.status in ['stopping', 'done']: return (ocs.ERROR, f'The operation is already {session.status}', {}) + # Use callback/errback to print message to logs. + def _callback(*args, **kw): + try: + ok, msg = args + except: + ok, msg = True, str(args) + print(f'Stopper for "{op_name}" terminated with ok={ok} and ' + f'message {msg}') + def _errback(*args, **kw): + print(f'Error calling stopper for "{op_name}"; args:', + args, kw) + if stopper_blocking: # Launch the code in a thread. d2 = threads.deferToThread(stopper, session, params) - d2.addErrback(_errback) + d2.addCallback(_callback).addErrback(_errback) else: # Assume the stopper returns a deferred (and will soon run # in the reactor). @@ -837,7 +846,7 @@ def _errback(self, *args, **kw): print(f'WARNING: {op_type} {op_name} needs to be ' 'registered with stopper_blocking=True.') else: - d2.addErrback(_errback) + d2.addCallback(_callback).addErrback(_errback) return (ocs.OK, f'Requested {stop_type} on {op_type} {op_name}".', session.encoded()) From 5ce46264bccb838077502c41d3a326ac6d50863f Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Fri, 14 Oct 2022 15:08:04 -0400 Subject: [PATCH 7/9] Address review comments --- ocs/agents/fake_data/agent.py | 2 +- ocs/ocs_agent.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ocs/agents/fake_data/agent.py b/ocs/agents/fake_data/agent.py index d1d23a78..a1d22127 100644 --- a/ocs/agents/fake_data/agent.py +++ b/ocs/agents/fake_data/agent.py @@ -7,7 +7,7 @@ from os import environ import numpy as np from autobahn.wamp.exception import ApplicationError -from twisted.internet.defer import inlineCallbacks, returnValue +from twisted.internet.defer import inlineCallbacks from autobahn.twisted.util import sleep as dsleep # For logging diff --git a/ocs/ocs_agent.py b/ocs/ocs_agent.py index 5a2d2bc9..9f46e5e0 100644 --- a/ocs/ocs_agent.py +++ b/ocs/ocs_agent.py @@ -383,7 +383,7 @@ def register_task(self, name, func, aborter=None, blocking=True, launched in a worker thread, rather than running in the main reactor thread. aborter_blocking(bool or None): Indicates that ``aborter`` - should be run in a worked thread, rather than running + should be run in a worker thread, rather than running in the main reactor thread. Defaults to value of ``blocking``. startup (bool or dict): Controls if and how the Operation From d36c161c3abdf2f3fb065eef60ecccdf6b392099 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Fri, 14 Oct 2022 15:07:04 -0400 Subject: [PATCH 8/9] docs: Add abort task section to agent guide --- docs/developer/writing_an_agent/task.rst | 53 ++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/docs/developer/writing_an_agent/task.rst b/docs/developer/writing_an_agent/task.rst index a789bfa1..ca97ee08 100644 --- a/docs/developer/writing_an_agent/task.rst +++ b/docs/developer/writing_an_agent/task.rst @@ -81,6 +81,59 @@ the Task with crossbar. agent.register_task('print', barebone.print) +Aborting a Task +``````````````` +'print' is a very short Task that runs very quickly, however if we have a long +running Task, we might need the ability to stop it before it would normally +complete. OCS supports aborting a Task, however this mechanism needs to be +implemented within the Agent code. This will require adding an aborter +function, which typically will look like this: + +.. code-block:: python + + def _abort_print(self, session, params): + if session.status == 'running': + session.set_status('stopping') + +Within the Task function, at points that are reasonable to request an abort, +you must add a check of the ``session.status`` that then exits the Task if the +status is no longer running. For example: + +.. code-block:: python + + if session.status != 'running': + return False, 'Aborted print' + +Where you insert this interrupt code will vary from Agent to Agent. Tasks that +run quickly do not need an abort to be implemented at all. However, for long +running Tasks abort should be implemented. + +When registering the Task, the aborter must be specified: + +.. code-block:: python + + agent.register_task('print', barebone.print, aborter=barebone._abort_print) + +.. note:: + + By default the aborter will run in the same threading pattern as the task. + If your Task runs in the main reactor (i.e. is decorated with + ``@inlineCallbacks``), then the aborter should also run in the reactor, and so + needs to ``yield`` at the end of the method. In our example this would look + like: + + .. code-block:: python + + @inlineCallbacks + def _abort_print(self, session, params): + if session.status == 'running': + session.set_status('stopping') + yield + +Again, since 'print' runs quickly, we do not implement an aborter for it here. +For an example of an abortable task, see +:func:`ocs.agents.fake_data.agent.FakeDataAgent.delay_task`. + Agent Code `````````` From 09fed8be5cefc5613218ee0caa730e5533d0f5a0 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Mon, 17 Oct 2022 10:31:39 -0400 Subject: [PATCH 9/9] docs: Address comments on abort docs --- docs/developer/writing_an_agent/process.rst | 2 ++ docs/developer/writing_an_agent/task.rst | 8 +++++--- ocs/agents/fake_data/agent.py | 3 ++- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/docs/developer/writing_an_agent/process.rst b/docs/developer/writing_an_agent/process.rst index e13f416a..854c1460 100644 --- a/docs/developer/writing_an_agent/process.rst +++ b/docs/developer/writing_an_agent/process.rst @@ -1,3 +1,5 @@ +.. _adding_a_process: + Adding a Process ---------------- diff --git a/docs/developer/writing_an_agent/task.rst b/docs/developer/writing_an_agent/task.rst index ca97ee08..d5cb48b7 100644 --- a/docs/developer/writing_an_agent/task.rst +++ b/docs/developer/writing_an_agent/task.rst @@ -96,8 +96,8 @@ function, which typically will look like this: session.set_status('stopping') Within the Task function, at points that are reasonable to request an abort, -you must add a check of the ``session.status`` that then exits the Task if the -status is no longer running. For example: +you must add a check of the ``session.status`` that then exits the Task with an +error (i.e. returns ``False``) if the status is no longer running. For example: .. code-block:: python @@ -106,7 +106,9 @@ status is no longer running. For example: Where you insert this interrupt code will vary from Agent to Agent. Tasks that run quickly do not need an abort to be implemented at all. However, for long -running Tasks abort should be implemented. +running Tasks abort should be implemented. (We will see this interruption +implementation again in the next step where we discuss +:ref:`adding_a_process`.) When registering the Task, the aborter must be specified: diff --git a/ocs/agents/fake_data/agent.py b/ocs/agents/fake_data/agent.py index a1d22127..730acef0 100644 --- a/ocs/agents/fake_data/agent.py +++ b/ocs/agents/fake_data/agent.py @@ -207,7 +207,8 @@ def set_heartbeat(self, session, params): def delay_task(self, session, params): """delay_task(delay=5, succeed=True) - **Task** - Sleep (delay) for the requested number of seconds. + **Task** (abortable) - Sleep (delay) for the requested number of + seconds. This can run simultaneously with the acq Process. This Task should run in the reactor thread.