From 9a7e0ec1d5e0b5c3b67b2df97c68c0e43e0d86b7 Mon Sep 17 00:00:00 2001 From: mariocynicys Date: Wed, 4 Aug 2021 22:47:43 +0200 Subject: [PATCH] feat: Windows support (#85) Internally refactored to use Pipe objects instead of paths. The Pipe objects support Windows using an artificial FIFO, and every other platform using os.mkfifo. Closes #8 --- .gitignore | 3 +- run_end_to_end_tests.py | 12 ++- setup.py | 2 + streamer/controller_node.py | 56 ++++--------- streamer/input_configuration.py | 19 +---- streamer/output_stream.py | 80 ++++++++++++------ streamer/packager_node.py | 35 ++------ streamer/pipe.py | 144 ++++++++++++++++++++++++++++++++ streamer/transcoder_node.py | 10 +-- 9 files changed, 244 insertions(+), 117 deletions(-) create mode 100644 streamer/pipe.py diff --git a/.gitignore b/.gitignore index f57df4f..3646b1e 100644 --- a/.gitignore +++ b/.gitignore @@ -11,4 +11,5 @@ shaka_streamer.egg-info/ .idea/ venv/ dev/ -.vscode/ \ No newline at end of file +.vscode/ +packager.exe \ No newline at end of file diff --git a/run_end_to_end_tests.py b/run_end_to_end_tests.py index f72b01e..b1ae16d 100755 --- a/run_end_to_end_tests.py +++ b/run_end_to_end_tests.py @@ -125,7 +125,9 @@ def hlsStreamsReady(master_playlist_path): return False for playlist_path in playlist_list: - if playlist_path == master_playlist_path: + # Use os.path.samefile method instead of the == operator because + # this might be a windows machine. + if os.path.samefile(playlist_path, master_playlist_path): # Skip the master playlist continue @@ -180,6 +182,7 @@ def start(): elif isinstance(e, RuntimeError): body = json.dumps({ 'error_type': 'RuntimeError', + 'message': str(e), }) return createCrossOriginResponse( status=418, mimetype='application/json', body=body) @@ -275,7 +278,11 @@ def main(): return 1 # Install test dependencies. - subprocess.check_call(['npm', 'install']) + if os.name == 'nt': + install_deps_command = ['npm.cmd', 'install'] + else: + install_deps_command = ['npm', 'install'] + subprocess.check_call(install_deps_command) # Fetch streams used in tests. if not os.path.exists(TEST_DIR): @@ -296,6 +303,7 @@ def main(): for i in range(trials): # Start up karma. karma_args = [ + 'node', 'node_modules/karma/bin/karma', 'start', 'tests/karma.conf.js', diff --git a/setup.py b/setup.py index 47fa94e..12df0b5 100644 --- a/setup.py +++ b/setup.py @@ -31,6 +31,7 @@ packages=setuptools.find_packages(), install_requires=[ 'PyYAML', + 'pywin32;platform_system=="Windows"', ], scripts=['shaka-streamer'], classifiers=[ @@ -38,6 +39,7 @@ 'License :: OSI Approved :: Apache Software License', 'Operating System :: POSIX :: Linux', 'Operating System :: MacOS :: MacOS X', + 'Operating System :: Microsoft :: Windows', ], # Python 3.5 tested in Ubuntu 16.04 LTS (Xenial Xerus). python_requires='>=3.5', diff --git a/streamer/controller_node.py b/streamer/controller_node.py index a39f647..5247d74 100644 --- a/streamer/controller_node.py +++ b/streamer/controller_node.py @@ -24,11 +24,9 @@ import os import re import shutil -import string import subprocess import sys import tempfile -import uuid from typing import Any, Dict, List, Optional, Tuple, Union from streamer.cloud_node import CloudNode @@ -42,6 +40,7 @@ from streamer.transcoder_node import TranscoderNode from streamer.periodconcat_node import PeriodConcatNode from streamer.util import is_url +from streamer.pipe import Pipe class ControllerNode(object): @@ -68,35 +67,6 @@ def __enter__(self) -> 'ControllerNode': def __exit__(self, *unused_args) -> None: self.stop() - def _create_pipe(self, suffix = '') -> str: - """Create a uniquely-named named pipe in the node's temp directory. - - Args: - suffix (str): An optional suffix added to the pipe's name. Used to - indicate to the packager when it's getting a certain text - format in a pipe. - Raises: - RuntimeError: If the platform doesn't have mkfifo. - Returns: - The path to the named pipe, as a string. - """ - - # TODO(#8): mkfifo only works on Unix. We would need a special case for a - # Windows port some day. - - if not hasattr(os, 'mkfifo'): - raise RuntimeError('Platform not supported due to lack of mkfifo') - - # Since the tempfile module creates actual files, use uuid to generate a - # filename, then call mkfifo to create the named pipe. - unique_name = str(uuid.uuid4()) + suffix - path = os.path.join(self._temp_dir, unique_name) - - readable_by_owner_only = 0o600 # Unix permission bits - os.mkfifo(path, mode=readable_by_owner_only) - - return path - def start(self, output_location: str, input_config_dict: Dict[str, Any], pipeline_config_dict: Dict[str, Any], @@ -227,15 +197,17 @@ def _append_nodes_for_inputs_list(self, inputs: List[Input], output_location: st # that node to write to. TranscoderNode will then instruct FFmpeg to # read from that pipe for this input. if input.input_type == InputType.EXTERNAL_COMMAND: - command_output = self._create_pipe() + command_output = Pipe.create_ipc_pipe(self._temp_dir) self._nodes.append(ExternalCommandNode( - input.name, command_output)) - input.set_pipe(command_output) + input.name, command_output.write_end())) + # reset the name of the input to be the output pipe path - which the + # transcoder node will read from - instead of a shell command. + input.reset_name(command_output.read_end()) if input.media_type == MediaType.AUDIO: for audio_codec in self._pipeline_config.audio_codecs: - outputs.append(AudioOutputStream(self._create_pipe(), - input, + outputs.append(AudioOutputStream(input, + self._temp_dir, audio_codec, self._pipeline_config.channels)) @@ -247,8 +219,8 @@ def _append_nodes_for_inputs_list(self, inputs: List[Input], output_location: st if input.get_resolution() < output_resolution: continue - outputs.append(VideoOutputStream(self._create_pipe(), - input, + outputs.append(VideoOutputStream(input, + self._temp_dir, video_codec, output_resolution)) @@ -257,14 +229,16 @@ def _append_nodes_for_inputs_list(self, inputs: List[Input], output_location: st # If the input is a VTT or TTML file, pass it directly to the packager # without any intermediate processing or any named pipe. # TODO: Test TTML inputs - text_pipe = None # Bypass transcoder + skip_transcoding = True # Bypass transcoder else: # Otherwise, the input is something like an mkv file with text tracks # in it. These will be extracted by the transcoder and passed in a # pipe to the packager. - text_pipe = self._create_pipe('.vtt') + skip_transcoding = False - outputs.append(TextOutputStream(text_pipe, input)) + outputs.append(TextOutputStream(input, + self._temp_dir, + skip_transcoding)) self._nodes.append(TranscoderNode(inputs, self._pipeline_config, diff --git a/streamer/input_configuration.py b/streamer/input_configuration.py index 555ff2a..520a3cf 100644 --- a/streamer/input_configuration.py +++ b/streamer/input_configuration.py @@ -243,25 +243,12 @@ def disallow_field(name: str, reason: str) -> None: disallow_field('start_time', reason) disallow_field('end_time', reason) - # A path to a pipe into which this input's contents are fed. - # None for most input types. - self._pipe: Optional[str] = None - def set_pipe(self, pipe: str) -> None: - """Set the path to a pipe into which this input's contents are fed. - - If set, this is what TranscoderNode will read from instead of .name. - """ - - self._pipe = pipe - - def get_path_for_transcode(self) -> str: - """Get the path which the transcoder will use to read the input. - - For some input types, this is a named pipe. For others, this is .name. + def reset_name(self, pipe_path: str) -> None: + """Set the name to a pipe path into which this input's contents are fed. """ - return self._pipe or self.name + self.name = pipe_path def get_stream_specifier(self) -> str: """Get an FFmpeg stream specifier for this input. diff --git a/streamer/output_stream.py b/streamer/output_stream.py index 8a7c792..e02f9f3 100644 --- a/streamer/output_stream.py +++ b/streamer/output_stream.py @@ -16,7 +16,8 @@ from streamer.bitrate_configuration import AudioCodec, AudioChannelLayout, VideoCodec, VideoResolution from streamer.input_configuration import Input, MediaType -from typing import Dict, Optional, Union +from streamer.pipe import Pipe +from typing import Dict, Union class OutputStream(object): @@ -24,26 +25,24 @@ class OutputStream(object): def __init__(self, type: MediaType, - pipe: Optional[str], input: Input, - codec: Union[AudioCodec, VideoCodec, None]) -> None: + codec: Union[AudioCodec, VideoCodec, None], + pipe_dir: str, + skip_transcoding: bool = False, + pipe_suffix: str = '') -> None: + self.type: MediaType = type - # If "pipe" is None, then this will not be transcoded. - self.pipe: Optional[str] = pipe + self.skip_transcoding = skip_transcoding self.input: Input = input - self.codec: Union[AudioCodec, VideoCodec, None] = codec self._features: Dict[str, str] = {} + self.codec: Union[AudioCodec, VideoCodec, None] = codec - def fill_template(self, template: str, **kwargs) -> str: - """Fill in a template string using **kwargs and features of the output.""" - - value_map: Dict[str, str] = {} - # First take any feature values from this object. - value_map.update(self._features) - # Then fill in any values from kwargs. - value_map.update(kwargs) - # Now fill in the template with these values. - return template.format(**value_map) + if self.skip_transcoding: + # If skip_transcoding is specified, let the Packager read from a plain + # file instead of an IPC pipe. + self.ipc_pipe = Pipe.create_file_pipe(self.input.name, mode='r') + else: + self.ipc_pipe = Pipe.create_ipc_pipe(pipe_dir, pipe_suffix) def is_hardware_accelerated(self) -> bool: """Returns True if this output stream uses hardware acceleration.""" @@ -58,18 +57,47 @@ def get_ffmpeg_codec_string(self, hwaccel_api: str) -> str: def is_dash_only(self) -> bool: """Returns True if the output format is restricted to DASH protocol""" - assert self.codec is not None - return self.codec.get_output_format() is 'webm' + if self.codec is not None: + return self.codec.get_output_format() == 'webm' + return False + + def get_init_seg_file(self) -> Pipe: + INIT_SEGMENT = { + MediaType.AUDIO: 'audio_{language}_{channels}c_{bitrate}_{codec}_init.{format}', + MediaType.VIDEO: 'video_{resolution_name}_{bitrate}_{codec}_init.{format}', + MediaType.TEXT: 'text_{language}_init.{format}', + } + path_templ = INIT_SEGMENT[self.type].format(**self._features) + return Pipe.create_file_pipe(path_templ, mode='w') + + def get_media_seg_file(self) -> Pipe: + MEDIA_SEGMENT = { + MediaType.AUDIO: 'audio_{language}_{channels}c_{bitrate}_{codec}_$Number$.{format}', + MediaType.VIDEO: 'video_{resolution_name}_{bitrate}_{codec}_$Number$.{format}', + MediaType.TEXT: 'text_{language}_$Number$.{format}', + } + path_templ = MEDIA_SEGMENT[self.type].format(**self._features) + return Pipe.create_file_pipe(path_templ, mode='w') + + def get_single_seg_file(self) -> Pipe: + SINGLE_SEGMENT = { + MediaType.AUDIO: 'audio_{language}_{channels}c_{bitrate}_{codec}.{format}', + MediaType.VIDEO: 'video_{resolution_name}_{bitrate}_{codec}.{format}', + MediaType.TEXT: 'text_{language}.{format}', + } + path_templ = SINGLE_SEGMENT[self.type].format(**self._features) + return Pipe.create_file_pipe(path_templ, mode='w') + class AudioOutputStream(OutputStream): def __init__(self, - pipe: str, input: Input, + pipe_dir: str, codec: AudioCodec, channels: int) -> None: - super().__init__(MediaType.AUDIO, pipe, input, codec) + super().__init__(MediaType.AUDIO, input, codec, pipe_dir) # Override the codec type and specify that it's an audio codec self.codec: AudioCodec = codec @@ -105,11 +133,11 @@ def get_bitrate(self) -> str: class VideoOutputStream(OutputStream): def __init__(self, - pipe: str, input: Input, + pipe_dir: str, codec: VideoCodec, resolution: VideoResolution) -> None: - super().__init__(MediaType.VIDEO, pipe, input, codec) + super().__init__(MediaType.VIDEO, input, codec, pipe_dir) # Override the codec type and specify that it's an audio codec self.codec: VideoCodec = codec self.resolution = resolution @@ -130,14 +158,16 @@ def get_bitrate(self) -> str: class TextOutputStream(OutputStream): def __init__(self, - pipe: Optional[str], - input: Input): + input: Input, + pipe_dir: str, + skip_transcoding: bool): # We don't have a codec per se for text, but we'd like to generically # process OutputStream objects in ways that are easier with this attribute # set, so set it to None. codec = None - super().__init__(MediaType.TEXT, pipe, input, codec) + super().__init__(MediaType.TEXT, input, codec, pipe_dir, + skip_transcoding, pipe_suffix='.vtt') # The features that will be used to generate the output filename. self._features = { diff --git a/streamer/packager_node.py b/streamer/packager_node.py index f50844b..49006bb 100644 --- a/streamer/packager_node.py +++ b/streamer/packager_node.py @@ -27,30 +27,9 @@ from typing import List, Optional, Union # Alias a few classes to avoid repeating namespaces later. -MediaType = input_configuration.MediaType - ManifestFormat = pipeline_configuration.ManifestFormat StreamingMode = pipeline_configuration.StreamingMode - -INIT_SEGMENT = { - MediaType.AUDIO: 'audio_{language}_{channels}c_{bitrate}_{codec}_init.{format}', - MediaType.VIDEO: 'video_{resolution_name}_{bitrate}_{codec}_init.{format}', - MediaType.TEXT: 'text_{language}_init.{format}', -} - -MEDIA_SEGMENT = { - MediaType.AUDIO: 'audio_{language}_{channels}c_{bitrate}_{codec}_$Number$.{format}', - MediaType.VIDEO: 'video_{resolution_name}_{bitrate}_{codec}_$Number$.{format}', - MediaType.TEXT: 'text_{language}_$Number$.{format}', -} - -SINGLE_SEGMENT = { - MediaType.AUDIO: 'audio_{language}_{channels}c_{bitrate}_{codec}.{format}', - MediaType.VIDEO: 'video_{resolution_name}_{bitrate}_{codec}.{format}', - MediaType.TEXT: 'text_{language}.{format}', -} - class SegmentError(Exception): """Raise when segment is incompatible with format.""" pass @@ -141,10 +120,9 @@ def start(self) -> None: stdout=stdout) def _setup_stream(self, stream: OutputStream) -> str: + dict = { - # If pipe is None, this wasn't transcoded, so we take the input path - # directly. - 'in': stream.pipe or stream.input.name, + 'in': stream.ipc_pipe.read_end(), 'stream': stream.type.value, } @@ -162,12 +140,15 @@ def _setup_stream(self, stream: OutputStream) -> str: if self._pipeline_config.segment_per_file: dict['init_segment'] = build_path( - self._segment_dir, stream.fill_template(INIT_SEGMENT[stream.type])) + self._segment_dir, + stream.get_init_seg_file().write_end()) dict['segment_template'] = build_path( - self._segment_dir, stream.fill_template(MEDIA_SEGMENT[stream.type])) + self._segment_dir, + stream.get_media_seg_file().write_end()) else: dict['output'] = build_path( - self._segment_dir, stream.fill_template(SINGLE_SEGMENT[stream.type])) + self._segment_dir, + stream.get_single_seg_file().write_end()) if stream.is_dash_only(): dict['dash_only'] = '1' diff --git a/streamer/pipe.py b/streamer/pipe.py new file mode 100644 index 0000000..cf50d40 --- /dev/null +++ b/streamer/pipe.py @@ -0,0 +1,144 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""A module that encapsulates all the platform-specific logic related to creating +named pipes.""" + +import os +import uuid +from threading import Thread +from typing import Optional + +class Pipe: + """A class that represents a pipe.""" + + def __init__(self): + """Initializes a non-functioning pipe.""" + + self._read_pipe_name = '' + self._write_pipe_name = '' + self._thread: Optional[Thread] = None + + @staticmethod + def create_ipc_pipe(temp_dir: str, suffix: str = '') -> 'Pipe': + """A static method used to create a pipe between two processes. + + On POSIX systems, it creates a named pipe using `os.mkfifo`. + + On Windows platforms, it starts a backgroud thread that transfars data from the + writer to the reader process it is connected to. + """ + + unique_name = str(uuid.uuid4()) + suffix + pipe = Pipe() + + # New Technology, aka WindowsNT. + if os.name == 'nt': + import win32pipe # type: ignore + pipe_name = '-nt-shaka-' + unique_name + # The read pipe is connected to a writer process. + pipe._read_pipe_name = r'\\.\pipe\W' + pipe_name + # The write pipe is connected to a reader process. + pipe._write_pipe_name = r'\\.\pipe\R' + pipe_name + buf_size = 64 * 1024 + + read_side = win32pipe.CreateNamedPipe( + pipe._read_pipe_name, + win32pipe.PIPE_ACCESS_INBOUND, + win32pipe.PIPE_WAIT | win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_READMODE_BYTE, + 1, + buf_size, + buf_size, + 0, + None) + + write_side = win32pipe.CreateNamedPipe( + pipe._write_pipe_name, + win32pipe.PIPE_ACCESS_OUTBOUND, + win32pipe.PIPE_WAIT | win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_READMODE_BYTE, + 1, + buf_size, + buf_size, + 0, + None) + + pipe._thread = Thread( + target=Pipe._win_thread_fn, + args=(read_side, write_side, buf_size), + daemon=True) + # Start the thread. + pipe._thread.start() + elif hasattr(os, 'mkfifo'): + pipe_name = os.path.join(temp_dir, unique_name) + pipe._read_pipe_name = pipe_name + pipe._write_pipe_name = pipe_name + readable_by_owner_only = 0o600 # Unix permission bits + os.mkfifo(pipe_name, mode=readable_by_owner_only) # type: ignore + else: + raise RuntimeError('Platform not supported.') + return pipe + + @staticmethod + def create_file_pipe(path: str, mode: str) -> 'Pipe': + """Returns a Pipe object whose read or write end is a path to a file.""" + + pipe = Pipe() + # A process will write on the read pipe(file). + if mode == 'w': + pipe._read_pipe_name = path + # A process will read from the write pipe(file). + elif mode == 'r': + pipe._write_pipe_name = path + else: + raise RuntimeError("'{}' is not a valid mode for a Pipe.".format(mode)) + return pipe + + @staticmethod + def _win_thread_fn(read_side, write_side, buf_size): + """This method serves as a server that connects a writer client + to a reader client. + + This methods will run as a thread, and will only be called on Windows platforms. + """ + + import win32pipe, win32file, pywintypes # type: ignore + try: + # Connect to both ends of the pipe before starting the transfer. + # This funciton is blocking. If no process is connected yet, it will wait + # indefinitely. + win32pipe.ConnectNamedPipe(read_side) + win32pipe.ConnectNamedPipe(write_side) + while True: + # Writer -> read_side -> write_side -> Reader + _, data = win32file.ReadFile(read_side, buf_size) + win32file.WriteFile(write_side, data) + except Exception as ex: + # Remove the pipes from the system. + win32file.CloseHandle(read_side) + win32file.CloseHandle(write_side) + # If the error was due to one of the processes shutting down, just exit normally. + if isinstance(ex, pywintypes.error) and ex.args[0] in [109, 232]: + return 0 + # Otherwise, raise that error. + raise ex + + def read_end(self) -> str: + """Returns a pipe/file path that a reader process can read from.""" + assert self._write_pipe_name + return self._write_pipe_name + + def write_end(self) -> str: + """Returns a pipe/file path that a writer process can write to.""" + assert self._read_pipe_name + return self._read_pipe_name diff --git a/streamer/transcoder_node.py b/streamer/transcoder_node.py index 7ff8345..2f79ed6 100644 --- a/streamer/transcoder_node.py +++ b/streamer/transcoder_node.py @@ -106,7 +106,7 @@ def start(self) -> None: # The input name always comes after the applicable input arguments. args += [ # The input itself. - '-i', input.get_path_for_transcode(), + '-i', input.name, ] for i, input in enumerate(self._inputs): @@ -123,7 +123,7 @@ def start(self) -> None: if output_stream.input != input: # Skip outputs that don't match this exact input object. continue - if output_stream.pipe is None: + if output_stream.skip_transcoding: # This input won't be transcoded. This is common for VTT text input. continue @@ -140,8 +140,7 @@ def start(self) -> None: assert(isinstance(output_stream, TextOutputStream)) args += self._encode_text(output_stream, input) - # The output pipe. - args += [output_stream.pipe] + args += [output_stream.ipc_pipe.write_end()] env = {} if self._pipeline_config.debug_logs: @@ -227,7 +226,8 @@ def _encode_video(self, stream: VideoOutputStream, input: Input) -> List[str]: # https://github.com/google/shaka-streamer/issues/36 filters.append('setsar=1:1') - if stream.codec in {VideoCodec.H264, VideoCodec.HEVC}: + if (stream.codec in {VideoCodec.H264, VideoCodec.HEVC} + and not stream.is_hardware_accelerated()): # These presets are specifically recognized by the software encoder. if self._pipeline_config.streaming_mode == StreamingMode.LIVE: args += [