Skip to content

Commit

Permalink
Wait for transcoder and packager to finish
Browse files Browse the repository at this point in the history
Instead of forcefully shutting down all nodes once any one node
completes, let certain nodes (TranscoderNode and PackagerNode) wait
politely for their subprocesses to finish.  The forceful shutdown will
still occur if one of the other nodes stops in the "Errored" state.

Closes #32

Change-Id: Ica65a9b1c3d0047e6bb2fecd9d2734d1a8d52579
  • Loading branch information
joeyparrish committed Oct 17, 2019
1 parent 75d8466 commit 6f66126
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- #30: Fixed cloud upload for VOD
- #29: Added webcam support on macOS
- Make common errors easier to read
- #32: Fix early shutdown and missing files


## 0.2.0 (2019-10-14)
Expand Down
3 changes: 2 additions & 1 deletion streamer/controller_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,9 @@ def check_status(self):

def stop(self):
"""Stop all nodes."""
status = self.check_status()
for node in self._nodes:
node.stop()
node.stop(status)
self._nodes = []

def is_vod(self):
Expand Down
24 changes: 21 additions & 3 deletions streamer/node_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import sys
import threading
import time
import traceback


class ProcessStatus(enum.Enum):
Expand All @@ -47,7 +48,7 @@ def __init__(self):
def __del__(self):
# If the process isn't stopped by now, stop it here. It is preferable to
# explicitly call stop().
self.stop()
self.stop(None)

@abc.abstractmethod
def start(self):
Expand Down Expand Up @@ -109,7 +110,7 @@ def check_status(self):
else:
return ProcessStatus.Errored

def stop(self):
def stop(self, status):
"""Stop the subprocess if it's still running."""
if self._process:
# Slightly more polite than kill. Try this first.
Expand All @@ -127,6 +128,23 @@ def stop(self):
# this, it can create a zombie process.
self._process.wait()

class PolitelyWaitOnFinishMixin(object):
"""A mixin that makes stop() wait for the subprocess if status is Finished.
This is as opposed to the base class behavior, in which stop() forces
the subprocesses of a node to terminate.
"""

def stop(self, status):
if status == ProcessStatus.Finished:
try:
print('Waiting for', self.__class__.__name__)
self._process.wait(timeout=300) # 5m timeout
except subprocess.TimeoutExpired:
traceback.print_exc() # print the exception
# Fall through.

super().stop(status)

class ThreadedNodeBase(NodeBase):
"""A base class for nodes that run a thread.
Expand Down Expand Up @@ -175,7 +193,7 @@ def start(self):
self._status = ProcessStatus.Running
self._thread.start()

def stop(self):
def stop(self, status):
self._status = ProcessStatus.Finished
self._thread.join()

Expand Down
2 changes: 1 addition & 1 deletion streamer/packager_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class SegmentError(Exception):
pass


class PackagerNode(node_base.NodeBase):
class PackagerNode(node_base.PolitelyWaitOnFinishMixin, node_base.NodeBase):

def __init__(self, pipeline_config, output_dir, output_streams):
super().__init__()
Expand Down
2 changes: 1 addition & 1 deletion streamer/transcoder_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
StreamingMode = pipeline_configuration.StreamingMode


class TranscoderNode(node_base.NodeBase):
class TranscoderNode(node_base.PolitelyWaitOnFinishMixin, node_base.NodeBase):

def __init__(self, input_config, pipeline_config, outputs):
super().__init__()
Expand Down

0 comments on commit 6f66126

Please sign in to comment.