Skip to content

Commit

Permalink
work towards faster start and stop (#131)
Browse files Browse the repository at this point in the history
Conflicts:
	supervisor/rpcinterface.py
  • Loading branch information
mcdonc authored and mnaberez committed Sep 8, 2014
1 parent dad9f8d commit c5d5cad
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 88 deletions.
183 changes: 105 additions & 78 deletions supervisor/rpcinterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import time
import datetime
import errno
import types

from supervisor.options import readFile
from supervisor.options import tailFile
Expand All @@ -23,7 +24,10 @@
from supervisor.states import getSupervisorStateDescription
from supervisor.states import ProcessStates
from supervisor.states import getProcessStateDescription
from supervisor.states import RUNNING_STATES
from supervisor.states import (
RUNNING_STATES,
STOPPED_STATES,
)

API_VERSION = '3.0'

Expand Down Expand Up @@ -269,49 +273,56 @@ def startProcess(self, name, wait=True):
except (NotExecutable, NoPermission), why:
raise RPCError(Faults.NOT_EXECUTABLE, why.args[0])

started = []
if process.get_state() in RUNNING_STATES:
raise RPCError(Faults.ALREADY_STARTED, name)

startsecs = process.config.startsecs
process.spawn()

def startit():
if not started:
# We call reap() in order to more quickly obtain the side effects of
# process.finish(), which reap() eventually ends up calling. This
# might be the case if the spawn() was successful but then the process
# died before its startsecs elapsed or it exited with an unexpected
# exit code. In particular, finish() may set spawnerr, which we can
# check and immediately raise an RPCError, avoiding the need to
# defer by returning a callback.

if process.get_state() in RUNNING_STATES:
raise RPCError(Faults.ALREADY_STARTED, name)
self.supervisord.reap()

process.spawn()
if process.spawnerr:
raise RPCError(Faults.SPAWN_ERROR, name)

# We call process.transition() in order to more quickly obtain its
# side effects. In particular, it might set the process' state from
# STARTING->RUNNING if the process has a startsecs==0.
process.transition()

if wait and process.get_state() != ProcessStates.RUNNING:
# by default, this branch will almost always be hit for processes
# with default startsecs configurations, because the default number
# of startsecs for a process is "1", and the process will not have
# entered the RUNNING state yet even though we've called
# transition() on it. This is because a process is not considered
# RUNNING until it has stayed up > startsecs.

def onwait():
if process.spawnerr:
raise RPCError(Faults.SPAWN_ERROR, name)

# we use a list here to fake out lexical scoping;
# using a direct assignment to 'started' in the
# function appears to not work (symptom: 2nd or 3rd
# call through, it forgets about 'started', claiming
# it's undeclared).
started.append(time.time())
state = process.get_state()

if not wait or not startsecs:
return True
if state not in (ProcessStates.STARTING, ProcessStates.RUNNING):
raise RPCError(Faults.ABNORMAL_TERMINATION, name)

t = time.time()
runtime = (t - started[0])
state = process.get_state()

if state not in (ProcessStates.STARTING, ProcessStates.RUNNING):
raise RPCError(Faults.ABNORMAL_TERMINATION, name)
if state == ProcessStates.RUNNING:
return True

if runtime < startsecs:
return NOT_DONE_YET

if state == ProcessStates.RUNNING:
return True

raise RPCError(Faults.ABNORMAL_TERMINATION, name)
onwait.delay = 0.05
onwait.rpcinterface = self
return onwait # deferred

startit.delay = 0.05
startit.rpcinterface = self
return startit # deferred
return True

def startProcessGroup(self, name, wait=True):
""" Start all processes in the group named 'name'
Expand Down Expand Up @@ -369,36 +380,43 @@ def stopProcess(self, name, wait=True):
group_name, process_name = split_namespec(name)
return self.stopProcessGroup(group_name, wait)

stopped = []
called = []

def killit():
if not called:
if process.get_state() not in RUNNING_STATES:
raise RPCError(Faults.NOT_RUNNING)
# use a mutable for lexical scoping; see startProcess
called.append(1)

if not stopped:
msg = process.stop()
if msg is not None:
raise RPCError(Faults.FAILED, msg)
stopped.append(1)

if wait:
if process.get_state() not in RUNNING_STATES:
raise RPCError(Faults.NOT_RUNNING)

msg = process.stop()
if msg is not None:
raise RPCError(Faults.FAILED, msg)

# We'll try to reap any killed child. FWIW, reap calls waitpid, and
# then, if waitpid returns a pid, calls finish() on the process with
# that pid, which drains any I/O from the process' dispatchers and
# changes the process' state. I chose to call reap without once=True
# because we don't really care if we reap more than one child. Even if
# we only reap one child. we may not even be reaping the child that we
# just stopped (this is all async, and process.stop() may not work, and
# we'll need to wait for SIGKILL during process.transition() as the
# result of normal select looping).

self.supervisord.reap()

if wait and process.get_state() not in STOPPED_STATES:

def onwait():
# process will eventually enter a stopped state by
# virtue of the supervisord.reap() method being called
# during normal operations
self.supervisord.options.logger.info(
'waiting for %s to stop' % process.config.name
)
if process.get_state() not in STOPPED_STATES:
return NOT_DONE_YET
else:
return True

if process.get_state() not in (ProcessStates.STOPPED,
ProcessStates.EXITED):
return NOT_DONE_YET
else:
return True

killit.delay = 0.2
killit.rpcinterface = self
return killit # deferred
onwait.delay = 0
onwait.rpcinterface = self
return onwait # deferred

return True

def stopProcessGroup(self, name, wait=True):
""" Stop all processes in the process group named 'name'
Expand Down Expand Up @@ -794,46 +812,55 @@ def allfunc(
callbacks=callbacks, # used only to fool scoping, never passed by caller
results=results, # used only to fool scoping, never passed by caller
):

if not callbacks:

for group, process in processes:
name = make_namespec(group.config.name, process.config.name)
if predicate(process):
try:
callback = func(name, **extra_kwargs)
callbacks.append((group, process, callback))
except RPCError, e:
results.append({'name':process.config.name,
'group':group.config.name,
'status':e.code,
'description':e.text})
continue
if isinstance(callback, types.FunctionType):
callbacks.append((group, process, callback))
else:
results.append(
{'name':process.config.name,
'group':group.config.name,
'status':Faults.SUCCESS,
'description':'OK'}
)

if not callbacks:
return results

group, process, callback = callbacks.pop(0)
for struct in callbacks[:]:

try:
value = callback()
except RPCError, e:
results.append(
{'name':process.config.name,
'group':group.config.name,
'status':e.code,
'description':e.text})
return NOT_DONE_YET
group, process, cb = struct

if value is NOT_DONE_YET:
# push it back into the queue; it will finish eventually
callbacks.append((group, process, callback))
else:
results.append(
{'name':process.config.name,
'group':group.config.name,
'status':Faults.SUCCESS,
'description':'OK'}
)
try:
value = cb()
except RPCError, e:
results.append(
{'name':process.config.name,
'group':group.config.name,
'status':e.code,
'description':e.text})
value = None

if value is not NOT_DONE_YET:
results.append(
{'name':process.config.name,
'group':group.config.name,
'status':Faults.SUCCESS,
'description':'OK'}
)
callbacks.remove(struct)

if callbacks:
return NOT_DONE_YET
Expand Down
3 changes: 3 additions & 0 deletions supervisor/tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,9 @@ def set_procattr(self, process_name, attr_name, val, group_name=None):
process = self.process_groups[group_name].processes[process_name]
setattr(process, attr_name, val)

def reap(self):
self.reaped = True

class DummyDispatcher:
write_event_handled = False
read_event_handled = False
Expand Down
20 changes: 10 additions & 10 deletions supervisor/tests/test_rpcinterfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,10 @@ def test_startProcess_already_started(self):
supervisord = PopulatedDummySupervisor(options, 'foo', pconfig)
supervisord.set_procattr('foo', 'pid', 10)
interface = self._makeOne(supervisord)
callback = interface.startProcess('foo')
self._assertRPCError(xmlrpc.Faults.ALREADY_STARTED,
callback)
self._assertRPCError(
xmlrpc.Faults.ALREADY_STARTED,
interface.startProcess, 'foo'
)

def test_startProcess_bad_group_name(self):
options = DummyOptions()
Expand Down Expand Up @@ -371,26 +372,25 @@ def test_startProcess_spawnerr(self):
process = supervisord.process_groups['foo'].processes['foo']
process.spawnerr = 'abc'
interface = self._makeOne(supervisord)
callback = interface.startProcess('foo')
self._assertRPCError(xmlrpc.Faults.SPAWN_ERROR, callback)
self._assertRPCError(
xmlrpc.Faults.SPAWN_ERROR,
interface.startProcess,
'foo'
)

def test_startProcess(self):
from supervisor import http
options = DummyOptions()
pconfig = DummyPConfig(options, 'foo', __file__, autostart=False,
startsecs=.01)
from supervisor.process import ProcessStates
supervisord = PopulatedDummySupervisor(options, 'foo', pconfig)
supervisord.set_procattr('foo', 'state', ProcessStates.STOPPED)
interface = self._makeOne(supervisord)
callback = interface.startProcess('foo')
self.assertEqual(callback(), http.NOT_DONE_YET)
result = interface.startProcess('foo')
process = supervisord.process_groups['foo'].processes['foo']
self.assertEqual(process.spawned, True)
self.assertEqual(interface.update_text, 'startProcess')
process.state = ProcessStates.RUNNING
time.sleep(.02)
result = callback()
self.assertEqual(result, True)

def test_startProcess_nowait(self):
Expand Down

0 comments on commit c5d5cad

Please sign in to comment.