Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP url output support #82

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion run_end_to_end_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def start():

controller = ControllerNode()
try:
controller.start(OUTPUT_DIR,
controller.start(configs['output_location'],
configs['input_config'],
configs['pipeline_config'],
configs['bitrate_config'],
Expand All @@ -177,6 +177,12 @@ def start():
})
return createCrossOriginResponse(
status=418, mimetype='application/json', body=body)
elif isinstance(e, RuntimeError):
body = json.dumps({
'error_type': 'RuntimeError',
})
return createCrossOriginResponse(
status=418, mimetype='application/json', body=body)
else:
traceback.print_exc()
return createCrossOriginResponse(status=500, body=str(e))
Expand Down
3 changes: 2 additions & 1 deletion shaka-streamer
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ def main():
'upload to. (Starts with gs:// or s3://)')
parser.add_argument('-o', '--output',
default='output_files',
help='The output folder to write files to. ' +
help='The output folder to write files to, or an HTTP ' +
'or HTTPS URL where files will be PUT.' +
'Used even if uploading to cloud storage.')
parser.add_argument('--skip_deps_check',
action='store_true',
Expand Down
55 changes: 36 additions & 19 deletions streamer/controller_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from streamer.pipeline_configuration import PipelineConfig, StreamingMode
from streamer.transcoder_node import TranscoderNode
from streamer.periodconcat_node import PeriodConcatNode
from streamer.util import is_url


class ControllerNode(object):
Expand Down Expand Up @@ -96,7 +97,7 @@ def _create_pipe(self, suffix = '') -> str:

return path

def start(self, output_dir: str,
def start(self, output_location: str,
input_config_dict: Dict[str, Any],
pipeline_config_dict: Dict[str, Any],
bitrate_config_dict: Dict[Any, Any] = {},
Expand Down Expand Up @@ -137,14 +138,6 @@ def start(self, output_dir: str,
# the destination, independent of the version check above.
CloudNode.check_access(bucket_url)


self._output_dir = output_dir
# Check if the directory for outputted Packager files exists, and if it
# does, delete it and remake a new one.
if os.path.exists(self._output_dir):
shutil.rmtree(self._output_dir)
os.mkdir(self._output_dir)

# Define resolutions and bitrates before parsing other configs.
bitrate_config = BitrateConfig(bitrate_config_dict)

Expand All @@ -157,29 +150,55 @@ def start(self, output_dir: str,
self._input_config = InputConfig(input_config_dict)
self._pipeline_config = PipelineConfig(pipeline_config_dict)

if not is_url(output_location):
# Check if the directory for outputted Packager files exists, and if it
# does, delete it and remake a new one.
if os.path.exists(output_location):
shutil.rmtree(output_location)
os.mkdir(output_location)
else:
# Check some restrictions and other details on HTTP output.
if not self._pipeline_config.segment_per_file:
raise RuntimeError(
'For HTTP PUT uploads, the pipeline segment_per_file setting ' +
'must be set to True!')

if bucket_url:
raise RuntimeError(
'Cloud bucket upload is incompatible with HTTP PUT support.')

if self._input_config.multiperiod_inputs_list:
# TODO: Edit Multiperiod input list implementation to support HTTP outputs
raise RuntimeError(
'Multiperiod input list support is incompatible with HTTP outputs.')

# Note that we remove the trailing slash from the output location, because
# otherwise GCS would create a subdirectory whose name is "".
output_location = output_location.rstrip('/')

if self._input_config.inputs:
# InputConfig contains inputs only.
self._append_nodes_for_inputs_list(self._input_config.inputs)
self._append_nodes_for_inputs_list(self._input_config.inputs, output_location)
else:
# InputConfig contains multiperiod_inputs_list only.
# Create one Transcoder node and one Packager node for each period.
for i, singleperiod in enumerate(self._input_config.multiperiod_inputs_list):
sub_dir_name = 'period_' + str(i)
self._append_nodes_for_inputs_list(singleperiod.inputs, sub_dir_name)
self._append_nodes_for_inputs_list(singleperiod.inputs, output_location, sub_dir_name)

if self._pipeline_config.streaming_mode == StreamingMode.VOD:
packager_nodes = [node for node in self._nodes if isinstance(node, PackagerNode)]
self._nodes.append(PeriodConcatNode(
self._pipeline_config,
packager_nodes,
self._output_dir))
output_location))

if bucket_url:
cloud_temp_dir = os.path.join(self._temp_dir, 'cloud')
os.mkdir(cloud_temp_dir)

packager_nodes = [node for node in self._nodes if isinstance(node, PackagerNode)]
self._nodes.append(CloudNode(self._output_dir,
self._nodes.append(CloudNode(output_location,
bucket_url,
cloud_temp_dir,
packager_nodes,
Expand All @@ -190,7 +209,7 @@ def start(self, output_dir: str,

return self

def _append_nodes_for_inputs_list(self, inputs: List[Input],
def _append_nodes_for_inputs_list(self, inputs: List[Input], output_location: str,
period_dir: Optional[str] = None) -> None:
"""A common method that creates Transcoder and Packager nodes for a list of Inputs passed to it.

Expand Down Expand Up @@ -250,17 +269,15 @@ def _append_nodes_for_inputs_list(self, inputs: List[Input],
self._nodes.append(TranscoderNode(inputs,
self._pipeline_config,
outputs))

output_dir = self._output_dir

# If the inputs list was a period in multiperiod_inputs_list, create a nested directory
# and put that period in it.
if period_dir:
output_dir = os.path.join(output_dir, period_dir)
os.mkdir(output_dir)
output_location = os.path.join(output_location, period_dir)
os.mkdir(output_location)

self._nodes.append(PackagerNode(self._pipeline_config,
output_dir,
output_location,
outputs))

def check_status(self) -> ProcessStatus:
Expand Down
64 changes: 41 additions & 23 deletions streamer/packager_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from streamer.output_stream import OutputStream
from streamer.pipeline_configuration import EncryptionMode, PipelineConfig
from streamer.util import is_url
from typing import List, Optional, Union

# Alias a few classes to avoid repeating namespaces later.
Expand All @@ -33,38 +34,58 @@


INIT_SEGMENT = {
MediaType.AUDIO: '{dir}/audio_{language}_{channels}c_{bitrate}_{codec}_init.{format}',
MediaType.VIDEO: '{dir}/video_{resolution_name}_{bitrate}_{codec}_init.{format}',
MediaType.TEXT: '{dir}/text_{language}_init.{format}',
MediaType.AUDIO: 'audio_{language}_{channels}c_{bitrate}_{codec}_init.{format}',
MediaType.VIDEO: 'video_{resolution_name}_{bitrate}_{codec}_init.{format}',
MediaType.TEXT: 'text_{language}_init.{format}',
}

MEDIA_SEGMENT = {
MediaType.AUDIO: '{dir}/audio_{language}_{channels}c_{bitrate}_{codec}_$Number$.{format}',
MediaType.VIDEO: '{dir}/video_{resolution_name}_{bitrate}_{codec}_$Number$.{format}',
MediaType.TEXT: '{dir}/text_{language}_$Number$.{format}',
MediaType.AUDIO: 'audio_{language}_{channels}c_{bitrate}_{codec}_$Number$.{format}',
MediaType.VIDEO: 'video_{resolution_name}_{bitrate}_{codec}_$Number$.{format}',
MediaType.TEXT: 'text_{language}_$Number$.{format}',
}

SINGLE_SEGMENT = {
MediaType.AUDIO: '{dir}/audio_{language}_{channels}c_{bitrate}_{codec}.{format}',
MediaType.VIDEO: '{dir}/video_{resolution_name}_{bitrate}_{codec}.{format}',
MediaType.TEXT: '{dir}/text_{language}.{format}',
MediaType.AUDIO: 'audio_{language}_{channels}c_{bitrate}_{codec}.{format}',
MediaType.VIDEO: 'video_{resolution_name}_{bitrate}_{codec}.{format}',
MediaType.TEXT: 'text_{language}.{format}',
}

class SegmentError(Exception):
"""Raise when segment is incompatible with format."""
pass

def build_path(output_location, sub_path):
"""Handle annoying edge cases with paths for cloud upload.
If a path has two slashes, GCS will create an intermediate directory named "".
So we have to be careful in how we construct paths to avoid this.
"""
# ControllerNode should have already stripped trailing slashes from the output
# location.

# Sometimes the segment dir is empty. This handles that special case.
if not sub_path:
return output_location

if is_url(output_location):
# Don't use os.path.join, since URLs must use forward slashes and Streamer
# could be used on Windows.
return output_location + '/' + sub_path

return os.path.join(output_location, sub_path)


class PackagerNode(node_base.PolitelyWaitOnFinish):

def __init__(self,
pipeline_config: PipelineConfig,
output_dir: str,
output_location: str,
output_streams: List[OutputStream]) -> None:
super().__init__()
self._pipeline_config: PipelineConfig = pipeline_config
self.output_dir: str = output_dir
self._segment_dir: str = os.path.join(output_dir, pipeline_config.segment_folder)
self.output_location: str = output_location
self._segment_dir: str = build_path(
output_location, pipeline_config.segment_folder)
self._output_streams: List[OutputStream] = output_streams

def start(self) -> None:
Expand Down Expand Up @@ -140,16 +161,13 @@ def _setup_stream(self, stream: OutputStream) -> str:
dict['language'] = stream.input.language

if self._pipeline_config.segment_per_file:
dict['init_segment'] = stream.fill_template(
INIT_SEGMENT[stream.type],
dir=self._segment_dir)
dict['segment_template'] = stream.fill_template(
MEDIA_SEGMENT[stream.type],
dir=self._segment_dir)
dict['init_segment'] = build_path(
self._segment_dir, stream.fill_template(INIT_SEGMENT[stream.type]))
dict['segment_template'] = build_path(
self._segment_dir, stream.fill_template(MEDIA_SEGMENT[stream.type]))
else:
dict['output'] = stream.fill_template(
SINGLE_SEGMENT[stream.type],
dir=self._segment_dir)
dict['output'] = build_path(
self._segment_dir, stream.fill_template(SINGLE_SEGMENT[stream.type]))

if stream.is_dash_only():
dict['dash_only'] = '1'
Expand All @@ -168,7 +186,7 @@ def _setup_manifest_format(self) -> List[str]:
args += [
# Generate DASH manifest file.
'--mpd_output',
os.path.join(self.output_dir, self._pipeline_config.dash_output),
os.path.join(self.output_location, self._pipeline_config.dash_output),
]
if ManifestFormat.HLS in self._pipeline_config.manifest_format:
if self._pipeline_config.streaming_mode == StreamingMode.LIVE:
Expand All @@ -182,7 +200,7 @@ def _setup_manifest_format(self) -> List[str]:
args += [
# Generate HLS playlist file(s).
'--hls_master_playlist_output',
os.path.join(self.output_dir, self._pipeline_config.hls_output),
os.path.join(self.output_location, self._pipeline_config.hls_output),
]
return args

Expand Down
16 changes: 8 additions & 8 deletions streamer/periodconcat_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@

class PeriodConcatNode(ThreadedNodeBase):
"""A node that concatenates multiple DASH manifests and/or HLS playlists
when the input is a multiperiod_inputs_list.
when the input is a multiperiod_inputs_list and the output is to the local the system.
"""

def __init__(self,
pipeline_config: PipelineConfig,
packager_nodes: List[PackagerNode],
output_dir: str) -> None:
output_location: str) -> None:
"""Stores all relevant information needed for the period concatenation."""
super().__init__(thread_name='periodconcat', continue_on_exception=False, sleep_time=3)
self._pipeline_config = pipeline_config
self._output_dir = output_dir
self._output_location = output_location
self._packager_nodes: List[PackagerNode] = packager_nodes

def _thread_single_pass(self) -> None:
Expand Down Expand Up @@ -81,7 +81,7 @@ def find(elem: ElementTree.Element, *args: str) -> ElementTree.Element:

# Get the root of an MPD file that we will concatenate periods into.
concat_mpd = ElementTree.ElementTree(file=os.path.join(
self._packager_nodes[0].output_dir,
self._packager_nodes[0].output_location,
self._pipeline_config.dash_output)).getroot()

# Get the default namespace.
Expand All @@ -97,24 +97,24 @@ def find(elem: ElementTree.Element, *args: str) -> ElementTree.Element:
for packager_node in self._packager_nodes:

mpd = ElementTree.ElementTree(file=os.path.join(
packager_node.output_dir,
packager_node.output_location,
self._pipeline_config.dash_output)).getroot()
period = find(mpd, 'Period')
period.attrib['duration'] = mpd.attrib['mediaPresentationDuration']

# A BaseURL that will have the relative path to media file.
base_url = ElementTree.Element('BaseURL')
base_url.text = os.path.relpath(packager_node.output_dir, self._output_dir) + '/'
base_url.text = os.path.relpath(packager_node.output_location, self._output_location) + '/'
period.insert(0, base_url)

periods.append(period)

# Add the periods collected from all the files.
concat_mpd.extend(periods)

# Write the period concat to the output_dir.
# Write the period concat to the output_location.
with open(os.path.join(
self._output_dir,
self._output_location,
self._pipeline_config.dash_output), 'w') as master_dash:

contents = "<?xml version='1.0' encoding='UTF-8'?>\n"
Expand Down
20 changes: 20 additions & 0 deletions streamer/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Utility functions used by multiple modules."""

def is_url(output_location: str) -> bool:
"""Returns True if the output location is a URL."""
return (output_location.startswith('http:') or
output_location.startswith('https:'))
Loading