diff --git a/supervisor/rpcinterface.py b/supervisor/rpcinterface.py index b76d841dd..0cb402c00 100644 --- a/supervisor/rpcinterface.py +++ b/supervisor/rpcinterface.py @@ -2,6 +2,7 @@ import time import datetime import errno +import types from supervisor.compat import as_string from supervisor.compat import unicode @@ -27,7 +28,10 @@ from supervisor.states import getSupervisorStateDescription from supervisor.states import ProcessStates from supervisor.states import getProcessStateDescription -from supervisor.states import RUNNING_STATES +from supervisor.states import ( + RUNNING_STATES, + STOPPED_STATES, + ) API_VERSION = '3.0' @@ -273,49 +277,56 @@ def startProcess(self, name, wait=True): except (NotExecutable, NoPermission) as why: raise RPCError(Faults.NOT_EXECUTABLE, why.args[0]) - started = [] + if process.get_state() in RUNNING_STATES: + raise RPCError(Faults.ALREADY_STARTED, name) - startsecs = process.config.startsecs + process.spawn() - def startit(): - if not started: + # We call reap() in order to more quickly obtain the side effects of + # process.finish(), which reap() eventually ends up calling. This + # might be the case if the spawn() was successful but then the process + # died before its startsecs elapsed or it exited with an unexpected + # exit code. In particular, finish() may set spawnerr, which we can + # check and immediately raise an RPCError, avoiding the need to + # defer by returning a callback. - if process.get_state() in RUNNING_STATES: - raise RPCError(Faults.ALREADY_STARTED, name) + self.supervisord.reap() - process.spawn() + if process.spawnerr: + raise RPCError(Faults.SPAWN_ERROR, name) + # We call process.transition() in order to more quickly obtain its + # side effects. In particular, it might set the process' state from + # STARTING->RUNNING if the process has a startsecs==0. + process.transition() + + if wait and process.get_state() != ProcessStates.RUNNING: + # by default, this branch will almost always be hit for processes + # with default startsecs configurations, because the default number + # of startsecs for a process is "1", and the process will not have + # entered the RUNNING state yet even though we've called + # transition() on it. This is because a process is not considered + # RUNNING until it has stayed up > startsecs. + + def onwait(): if process.spawnerr: raise RPCError(Faults.SPAWN_ERROR, name) - # we use a list here to fake out lexical scoping; - # using a direct assignment to 'started' in the - # function appears to not work (symptom: 2nd or 3rd - # call through, it forgets about 'started', claiming - # it's undeclared). - started.append(time.time()) + state = process.get_state() - if not wait or not startsecs: - return True + if state not in (ProcessStates.STARTING, ProcessStates.RUNNING): + raise RPCError(Faults.ABNORMAL_TERMINATION, name) - t = time.time() - runtime = (t - started[0]) - state = process.get_state() - - if state not in (ProcessStates.STARTING, ProcessStates.RUNNING): - raise RPCError(Faults.ABNORMAL_TERMINATION, name) + if state == ProcessStates.RUNNING: + return True - if runtime < startsecs: return NOT_DONE_YET - if state == ProcessStates.RUNNING: - return True - - raise RPCError(Faults.ABNORMAL_TERMINATION, name) + onwait.delay = 0.05 + onwait.rpcinterface = self + return onwait # deferred - startit.delay = 0.05 - startit.rpcinterface = self - return startit # deferred + return True def startProcessGroup(self, name, wait=True): """ Start all processes in the group named 'name' @@ -373,36 +384,43 @@ def stopProcess(self, name, wait=True): group_name, process_name = split_namespec(name) return self.stopProcessGroup(group_name, wait) - stopped = [] - called = [] - - def killit(): - if not called: - if process.get_state() not in RUNNING_STATES: - raise RPCError(Faults.NOT_RUNNING) - # use a mutable for lexical scoping; see startProcess - called.append(1) - - if not stopped: - msg = process.stop() - if msg is not None: - raise RPCError(Faults.FAILED, msg) - stopped.append(1) - - if wait: + if process.get_state() not in RUNNING_STATES: + raise RPCError(Faults.NOT_RUNNING) + + msg = process.stop() + if msg is not None: + raise RPCError(Faults.FAILED, msg) + + # We'll try to reap any killed child. FWIW, reap calls waitpid, and + # then, if waitpid returns a pid, calls finish() on the process with + # that pid, which drains any I/O from the process' dispatchers and + # changes the process' state. I chose to call reap without once=True + # because we don't really care if we reap more than one child. Even if + # we only reap one child. we may not even be reaping the child that we + # just stopped (this is all async, and process.stop() may not work, and + # we'll need to wait for SIGKILL during process.transition() as the + # result of normal select looping). + + self.supervisord.reap() + + if wait and process.get_state() not in STOPPED_STATES: + + def onwait(): + # process will eventually enter a stopped state by + # virtue of the supervisord.reap() method being called + # during normal operations + self.supervisord.options.logger.info( + 'waiting for %s to stop' % process.config.name + ) + if process.get_state() not in STOPPED_STATES: return NOT_DONE_YET - else: - return True - - if process.get_state() not in (ProcessStates.STOPPED, - ProcessStates.EXITED): - return NOT_DONE_YET - else: return True - killit.delay = 0.2 - killit.rpcinterface = self - return killit # deferred + onwait.delay = 0 + onwait.rpcinterface = self + return onwait # deferred + + return True def stopProcessGroup(self, name, wait=True): """ Stop all processes in the process group named 'name' @@ -792,6 +810,7 @@ def allfunc( callbacks=callbacks, # used only to fool scoping, never passed by caller results=results, # used only to fool scoping, never passed by caller ): + if not callbacks: for group, process in processes: @@ -799,39 +818,47 @@ def allfunc( if predicate(process): try: callback = func(name, **extra_kwargs) - callbacks.append((group, process, callback)) except RPCError as e: results.append({'name':process.config.name, 'group':group.config.name, 'status':e.code, 'description':e.text}) continue + if isinstance(callback, types.FunctionType): + callbacks.append((group, process, callback)) + else: + results.append( + {'name':process.config.name, + 'group':group.config.name, + 'status':Faults.SUCCESS, + 'description':'OK'} + ) if not callbacks: return results - group, process, callback = callbacks.pop(0) + for struct in callbacks[:]: - try: - value = callback() - except RPCError as e: - results.append( - {'name':process.config.name, - 'group':group.config.name, - 'status':e.code, - 'description':e.text}) - return NOT_DONE_YET + group, process, cb = struct - if value is NOT_DONE_YET: - # push it back into the queue; it will finish eventually - callbacks.append((group, process, callback)) - else: - results.append( - {'name':process.config.name, - 'group':group.config.name, - 'status':Faults.SUCCESS, - 'description':'OK'} - ) + try: + value = cb() + except RPCError as e: + results.append( + {'name':process.config.name, + 'group':group.config.name, + 'status':e.code, + 'description':e.text}) + value = None + + if value is not NOT_DONE_YET: + results.append( + {'name':process.config.name, + 'group':group.config.name, + 'status':Faults.SUCCESS, + 'description':'OK'} + ) + callbacks.remove(struct) if callbacks: return NOT_DONE_YET diff --git a/supervisor/tests/base.py b/supervisor/tests/base.py index c6321765b..1ecdd6895 100644 --- a/supervisor/tests/base.py +++ b/supervisor/tests/base.py @@ -1022,6 +1022,9 @@ def set_procattr(self, process_name, attr_name, val, group_name=None): process = self.process_groups[group_name].processes[process_name] setattr(process, attr_name, val) + def reap(self): + self.reaped = True + class DummyDispatcher: write_event_handled = False read_event_handled = False diff --git a/supervisor/tests/test_rpcinterfaces.py b/supervisor/tests/test_rpcinterfaces.py index a0f479f4e..1f680ede1 100644 --- a/supervisor/tests/test_rpcinterfaces.py +++ b/supervisor/tests/test_rpcinterfaces.py @@ -316,9 +316,10 @@ def test_startProcess_already_started(self): supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) supervisord.set_procattr('foo', 'pid', 10) interface = self._makeOne(supervisord) - callback = interface.startProcess('foo') - self._assertRPCError(xmlrpc.Faults.ALREADY_STARTED, - callback) + self._assertRPCError( + xmlrpc.Faults.ALREADY_STARTED, + interface.startProcess, 'foo' + ) def test_startProcess_bad_group_name(self): options = DummyOptions() @@ -372,11 +373,13 @@ def test_startProcess_spawnerr(self): process = supervisord.process_groups['foo'].processes['foo'] process.spawnerr = 'abc' interface = self._makeOne(supervisord) - callback = interface.startProcess('foo') - self._assertRPCError(xmlrpc.Faults.SPAWN_ERROR, callback) + self._assertRPCError( + xmlrpc.Faults.SPAWN_ERROR, + interface.startProcess, + 'foo' + ) def test_startProcess(self): - from supervisor import http options = DummyOptions() pconfig = DummyPConfig(options, 'foo', __file__, autostart=False, startsecs=.01) @@ -384,14 +387,11 @@ def test_startProcess(self): supervisord = PopulatedDummySupervisor(options, 'foo', pconfig) supervisord.set_procattr('foo', 'state', ProcessStates.STOPPED) interface = self._makeOne(supervisord) - callback = interface.startProcess('foo') - self.assertEqual(callback(), http.NOT_DONE_YET) + result = interface.startProcess('foo') process = supervisord.process_groups['foo'].processes['foo'] self.assertEqual(process.spawned, True) self.assertEqual(interface.update_text, 'startProcess') process.state = ProcessStates.RUNNING - time.sleep(.02) - result = callback() self.assertEqual(result, True) def test_startProcess_nowait(self):