Skip to content

Commit

Permalink
feat: Windows support (#85)
Browse files Browse the repository at this point in the history
Internally refactored to use Pipe objects instead of paths.  The Pipe objects support Windows using an artificial FIFO, and every other platform using os.mkfifo.

Closes #8
  • Loading branch information
mariocynicys authored Aug 4, 2021
1 parent 8cd8537 commit 9a7e0ec
Show file tree
Hide file tree
Showing 9 changed files with 244 additions and 117 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ shaka_streamer.egg-info/
.idea/
venv/
dev/
.vscode/
.vscode/
packager.exe
12 changes: 10 additions & 2 deletions run_end_to_end_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ def hlsStreamsReady(master_playlist_path):
return False

for playlist_path in playlist_list:
if playlist_path == master_playlist_path:
# Use os.path.samefile method instead of the == operator because
# this might be a windows machine.
if os.path.samefile(playlist_path, master_playlist_path):
# Skip the master playlist
continue

Expand Down Expand Up @@ -180,6 +182,7 @@ def start():
elif isinstance(e, RuntimeError):
body = json.dumps({
'error_type': 'RuntimeError',
'message': str(e),
})
return createCrossOriginResponse(
status=418, mimetype='application/json', body=body)
Expand Down Expand Up @@ -275,7 +278,11 @@ def main():
return 1

# Install test dependencies.
subprocess.check_call(['npm', 'install'])
if os.name == 'nt':
install_deps_command = ['npm.cmd', 'install']
else:
install_deps_command = ['npm', 'install']
subprocess.check_call(install_deps_command)

# Fetch streams used in tests.
if not os.path.exists(TEST_DIR):
Expand All @@ -296,6 +303,7 @@ def main():
for i in range(trials):
# Start up karma.
karma_args = [
'node',
'node_modules/karma/bin/karma',
'start',
'tests/karma.conf.js',
Expand Down
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@
packages=setuptools.find_packages(),
install_requires=[
'PyYAML',
'pywin32;platform_system=="Windows"',
],
scripts=['shaka-streamer'],
classifiers=[
'Programming Language :: Python :: 3',
'License :: OSI Approved :: Apache Software License',
'Operating System :: POSIX :: Linux',
'Operating System :: MacOS :: MacOS X',
'Operating System :: Microsoft :: Windows',
],
# Python 3.5 tested in Ubuntu 16.04 LTS (Xenial Xerus).
python_requires='>=3.5',
Expand Down
56 changes: 15 additions & 41 deletions streamer/controller_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@
import os
import re
import shutil
import string
import subprocess
import sys
import tempfile
import uuid

from typing import Any, Dict, List, Optional, Tuple, Union
from streamer.cloud_node import CloudNode
Expand All @@ -42,6 +40,7 @@
from streamer.transcoder_node import TranscoderNode
from streamer.periodconcat_node import PeriodConcatNode
from streamer.util import is_url
from streamer.pipe import Pipe


class ControllerNode(object):
Expand All @@ -68,35 +67,6 @@ def __enter__(self) -> 'ControllerNode':
def __exit__(self, *unused_args) -> None:
self.stop()

def _create_pipe(self, suffix = '') -> str:
"""Create a uniquely-named named pipe in the node's temp directory.
Args:
suffix (str): An optional suffix added to the pipe's name. Used to
indicate to the packager when it's getting a certain text
format in a pipe.
Raises:
RuntimeError: If the platform doesn't have mkfifo.
Returns:
The path to the named pipe, as a string.
"""

# TODO(#8): mkfifo only works on Unix. We would need a special case for a
# Windows port some day.

if not hasattr(os, 'mkfifo'):
raise RuntimeError('Platform not supported due to lack of mkfifo')

# Since the tempfile module creates actual files, use uuid to generate a
# filename, then call mkfifo to create the named pipe.
unique_name = str(uuid.uuid4()) + suffix
path = os.path.join(self._temp_dir, unique_name)

readable_by_owner_only = 0o600 # Unix permission bits
os.mkfifo(path, mode=readable_by_owner_only)

return path

def start(self, output_location: str,
input_config_dict: Dict[str, Any],
pipeline_config_dict: Dict[str, Any],
Expand Down Expand Up @@ -227,15 +197,17 @@ def _append_nodes_for_inputs_list(self, inputs: List[Input], output_location: st
# that node to write to. TranscoderNode will then instruct FFmpeg to
# read from that pipe for this input.
if input.input_type == InputType.EXTERNAL_COMMAND:
command_output = self._create_pipe()
command_output = Pipe.create_ipc_pipe(self._temp_dir)
self._nodes.append(ExternalCommandNode(
input.name, command_output))
input.set_pipe(command_output)
input.name, command_output.write_end()))
# reset the name of the input to be the output pipe path - which the
# transcoder node will read from - instead of a shell command.
input.reset_name(command_output.read_end())

if input.media_type == MediaType.AUDIO:
for audio_codec in self._pipeline_config.audio_codecs:
outputs.append(AudioOutputStream(self._create_pipe(),
input,
outputs.append(AudioOutputStream(input,
self._temp_dir,
audio_codec,
self._pipeline_config.channels))

Expand All @@ -247,8 +219,8 @@ def _append_nodes_for_inputs_list(self, inputs: List[Input], output_location: st
if input.get_resolution() < output_resolution:
continue

outputs.append(VideoOutputStream(self._create_pipe(),
input,
outputs.append(VideoOutputStream(input,
self._temp_dir,
video_codec,
output_resolution))

Expand All @@ -257,14 +229,16 @@ def _append_nodes_for_inputs_list(self, inputs: List[Input], output_location: st
# If the input is a VTT or TTML file, pass it directly to the packager
# without any intermediate processing or any named pipe.
# TODO: Test TTML inputs
text_pipe = None # Bypass transcoder
skip_transcoding = True # Bypass transcoder
else:
# Otherwise, the input is something like an mkv file with text tracks
# in it. These will be extracted by the transcoder and passed in a
# pipe to the packager.
text_pipe = self._create_pipe('.vtt')
skip_transcoding = False

outputs.append(TextOutputStream(text_pipe, input))
outputs.append(TextOutputStream(input,
self._temp_dir,
skip_transcoding))

self._nodes.append(TranscoderNode(inputs,
self._pipeline_config,
Expand Down
19 changes: 3 additions & 16 deletions streamer/input_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,25 +243,12 @@ def disallow_field(name: str, reason: str) -> None:
disallow_field('start_time', reason)
disallow_field('end_time', reason)

# A path to a pipe into which this input's contents are fed.
# None for most input types.
self._pipe: Optional[str] = None

def set_pipe(self, pipe: str) -> None:
"""Set the path to a pipe into which this input's contents are fed.
If set, this is what TranscoderNode will read from instead of .name.
"""

self._pipe = pipe

def get_path_for_transcode(self) -> str:
"""Get the path which the transcoder will use to read the input.
For some input types, this is a named pipe. For others, this is .name.
def reset_name(self, pipe_path: str) -> None:
"""Set the name to a pipe path into which this input's contents are fed.
"""

return self._pipe or self.name
self.name = pipe_path

def get_stream_specifier(self) -> str:
"""Get an FFmpeg stream specifier for this input.
Expand Down
80 changes: 55 additions & 25 deletions streamer/output_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,33 @@

from streamer.bitrate_configuration import AudioCodec, AudioChannelLayout, VideoCodec, VideoResolution
from streamer.input_configuration import Input, MediaType
from typing import Dict, Optional, Union
from streamer.pipe import Pipe
from typing import Dict, Union


class OutputStream(object):
"""Base class for output streams."""

def __init__(self,
type: MediaType,
pipe: Optional[str],
input: Input,
codec: Union[AudioCodec, VideoCodec, None]) -> None:
codec: Union[AudioCodec, VideoCodec, None],
pipe_dir: str,
skip_transcoding: bool = False,
pipe_suffix: str = '') -> None:

self.type: MediaType = type
# If "pipe" is None, then this will not be transcoded.
self.pipe: Optional[str] = pipe
self.skip_transcoding = skip_transcoding
self.input: Input = input
self.codec: Union[AudioCodec, VideoCodec, None] = codec
self._features: Dict[str, str] = {}
self.codec: Union[AudioCodec, VideoCodec, None] = codec

def fill_template(self, template: str, **kwargs) -> str:
"""Fill in a template string using **kwargs and features of the output."""

value_map: Dict[str, str] = {}
# First take any feature values from this object.
value_map.update(self._features)
# Then fill in any values from kwargs.
value_map.update(kwargs)
# Now fill in the template with these values.
return template.format(**value_map)
if self.skip_transcoding:
# If skip_transcoding is specified, let the Packager read from a plain
# file instead of an IPC pipe.
self.ipc_pipe = Pipe.create_file_pipe(self.input.name, mode='r')
else:
self.ipc_pipe = Pipe.create_ipc_pipe(pipe_dir, pipe_suffix)

def is_hardware_accelerated(self) -> bool:
"""Returns True if this output stream uses hardware acceleration."""
Expand All @@ -58,18 +57,47 @@ def get_ffmpeg_codec_string(self, hwaccel_api: str) -> str:

def is_dash_only(self) -> bool:
"""Returns True if the output format is restricted to DASH protocol"""
assert self.codec is not None
return self.codec.get_output_format() is 'webm'
if self.codec is not None:
return self.codec.get_output_format() == 'webm'
return False

def get_init_seg_file(self) -> Pipe:
INIT_SEGMENT = {
MediaType.AUDIO: 'audio_{language}_{channels}c_{bitrate}_{codec}_init.{format}',
MediaType.VIDEO: 'video_{resolution_name}_{bitrate}_{codec}_init.{format}',
MediaType.TEXT: 'text_{language}_init.{format}',
}
path_templ = INIT_SEGMENT[self.type].format(**self._features)
return Pipe.create_file_pipe(path_templ, mode='w')

def get_media_seg_file(self) -> Pipe:
MEDIA_SEGMENT = {
MediaType.AUDIO: 'audio_{language}_{channels}c_{bitrate}_{codec}_$Number$.{format}',
MediaType.VIDEO: 'video_{resolution_name}_{bitrate}_{codec}_$Number$.{format}',
MediaType.TEXT: 'text_{language}_$Number$.{format}',
}
path_templ = MEDIA_SEGMENT[self.type].format(**self._features)
return Pipe.create_file_pipe(path_templ, mode='w')

def get_single_seg_file(self) -> Pipe:
SINGLE_SEGMENT = {
MediaType.AUDIO: 'audio_{language}_{channels}c_{bitrate}_{codec}.{format}',
MediaType.VIDEO: 'video_{resolution_name}_{bitrate}_{codec}.{format}',
MediaType.TEXT: 'text_{language}.{format}',
}
path_templ = SINGLE_SEGMENT[self.type].format(**self._features)
return Pipe.create_file_pipe(path_templ, mode='w')


class AudioOutputStream(OutputStream):

def __init__(self,
pipe: str,
input: Input,
pipe_dir: str,
codec: AudioCodec,
channels: int) -> None:

super().__init__(MediaType.AUDIO, pipe, input, codec)
super().__init__(MediaType.AUDIO, input, codec, pipe_dir)
# Override the codec type and specify that it's an audio codec
self.codec: AudioCodec = codec

Expand Down Expand Up @@ -105,11 +133,11 @@ def get_bitrate(self) -> str:
class VideoOutputStream(OutputStream):

def __init__(self,
pipe: str,
input: Input,
pipe_dir: str,
codec: VideoCodec,
resolution: VideoResolution) -> None:
super().__init__(MediaType.VIDEO, pipe, input, codec)
super().__init__(MediaType.VIDEO, input, codec, pipe_dir)
# Override the codec type and specify that it's an audio codec
self.codec: VideoCodec = codec
self.resolution = resolution
Expand All @@ -130,14 +158,16 @@ def get_bitrate(self) -> str:
class TextOutputStream(OutputStream):

def __init__(self,
pipe: Optional[str],
input: Input):
input: Input,
pipe_dir: str,
skip_transcoding: bool):
# We don't have a codec per se for text, but we'd like to generically
# process OutputStream objects in ways that are easier with this attribute
# set, so set it to None.
codec = None

super().__init__(MediaType.TEXT, pipe, input, codec)
super().__init__(MediaType.TEXT, input, codec, pipe_dir,
skip_transcoding, pipe_suffix='.vtt')

# The features that will be used to generate the output filename.
self._features = {
Expand Down
Loading

0 comments on commit 9a7e0ec

Please sign in to comment.