From 6f661261beb375464d5487b53c28edff976f9aa8 Mon Sep 17 00:00:00 2001 From: Joey Parrish Date: Wed, 16 Oct 2019 15:11:48 -0700 Subject: [PATCH] Wait for transcoder and packager to finish Instead of forcefully shutting down all nodes once any one node completes, let certain nodes (TranscoderNode and PackagerNode) wait politely for their subprocesses to finish. The forceful shutdown will still occur if one of the other nodes stops in the "Errored" state. Closes #32 Change-Id: Ica65a9b1c3d0047e6bb2fecd9d2734d1a8d52579 --- CHANGELOG.md | 1 + streamer/controller_node.py | 3 ++- streamer/node_base.py | 24 +++++++++++++++++++++--- streamer/packager_node.py | 2 +- streamer/transcoder_node.py | 2 +- 5 files changed, 26 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index acd49ae..46f2744 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ - #30: Fixed cloud upload for VOD - #29: Added webcam support on macOS - Make common errors easier to read + - #32: Fix early shutdown and missing files ## 0.2.0 (2019-10-14) diff --git a/streamer/controller_node.py b/streamer/controller_node.py index 9ac4912..8fb52a2 100644 --- a/streamer/controller_node.py +++ b/streamer/controller_node.py @@ -214,8 +214,9 @@ def check_status(self): def stop(self): """Stop all nodes.""" + status = self.check_status() for node in self._nodes: - node.stop() + node.stop(status) self._nodes = [] def is_vod(self): diff --git a/streamer/node_base.py b/streamer/node_base.py index faadda7..e900368 100644 --- a/streamer/node_base.py +++ b/streamer/node_base.py @@ -22,6 +22,7 @@ import sys import threading import time +import traceback class ProcessStatus(enum.Enum): @@ -47,7 +48,7 @@ def __init__(self): def __del__(self): # If the process isn't stopped by now, stop it here. It is preferable to # explicitly call stop(). - self.stop() + self.stop(None) @abc.abstractmethod def start(self): @@ -109,7 +110,7 @@ def check_status(self): else: return ProcessStatus.Errored - def stop(self): + def stop(self, status): """Stop the subprocess if it's still running.""" if self._process: # Slightly more polite than kill. Try this first. @@ -127,6 +128,23 @@ def stop(self): # this, it can create a zombie process. self._process.wait() +class PolitelyWaitOnFinishMixin(object): + """A mixin that makes stop() wait for the subprocess if status is Finished. + + This is as opposed to the base class behavior, in which stop() forces + the subprocesses of a node to terminate. + """ + + def stop(self, status): + if status == ProcessStatus.Finished: + try: + print('Waiting for', self.__class__.__name__) + self._process.wait(timeout=300) # 5m timeout + except subprocess.TimeoutExpired: + traceback.print_exc() # print the exception + # Fall through. + + super().stop(status) class ThreadedNodeBase(NodeBase): """A base class for nodes that run a thread. @@ -175,7 +193,7 @@ def start(self): self._status = ProcessStatus.Running self._thread.start() - def stop(self): + def stop(self, status): self._status = ProcessStatus.Finished self._thread.join() diff --git a/streamer/packager_node.py b/streamer/packager_node.py index 9ebc433..73db68b 100644 --- a/streamer/packager_node.py +++ b/streamer/packager_node.py @@ -51,7 +51,7 @@ class SegmentError(Exception): pass -class PackagerNode(node_base.NodeBase): +class PackagerNode(node_base.PolitelyWaitOnFinishMixin, node_base.NodeBase): def __init__(self, pipeline_config, output_dir, output_streams): super().__init__() diff --git a/streamer/transcoder_node.py b/streamer/transcoder_node.py index 90d52dd..a818038 100644 --- a/streamer/transcoder_node.py +++ b/streamer/transcoder_node.py @@ -29,7 +29,7 @@ StreamingMode = pipeline_configuration.StreamingMode -class TranscoderNode(node_base.NodeBase): +class TranscoderNode(node_base.PolitelyWaitOnFinishMixin, node_base.NodeBase): def __init__(self, input_config, pipeline_config, outputs): super().__init__()