Skip to content

Commit

Permalink
HTTP url output support (#82)
Browse files Browse the repository at this point in the history
## Feature
- Allow users to specify an HTTP URL as an output
- Segments and manifests will be uploaded to the specified url via HTTP PUT
- URLs ending with a '/' will have the slash removed to prevent issues with Google Cloud

## Testing
- Added unit tests to check for config compatibility errors, such as specifying an HTTP url output with a "multi_period_input" and with "segment_per_file" set to false

## Shortcomings
- The multi period input list feature is currently structured to use the local file system and does not easily support HTTP uploads. This must be addressed as the multi period input feature continues to evolve
- The HTTP upload feature does not make sense if used with a cloud url, and although there is an error check for this, there is no unit test. The cloud url unit test would have required a mock bucket to pass the cloud url validity checks. Because of the work involved in writing a mock bucket and [the future (and hopefully soon) removal of the CloudNode](CaitlinOCallaghan#2 (comment)), it was decided that forgoing this particular unit test was okay.
CaitlinOCallaghan authored Aug 2, 2021
1 parent b83afe9 commit 8cd8537
Showing 7 changed files with 145 additions and 53 deletions.
8 changes: 7 additions & 1 deletion run_end_to_end_tests.py
Original file line number Diff line number Diff line change
@@ -155,7 +155,7 @@ def start():

controller = ControllerNode()
try:
controller.start(OUTPUT_DIR,
controller.start(configs['output_location'],
configs['input_config'],
configs['pipeline_config'],
configs['bitrate_config'],
@@ -177,6 +177,12 @@ def start():
})
return createCrossOriginResponse(
status=418, mimetype='application/json', body=body)
elif isinstance(e, RuntimeError):
body = json.dumps({
'error_type': 'RuntimeError',
})
return createCrossOriginResponse(
status=418, mimetype='application/json', body=body)
else:
traceback.print_exc()
return createCrossOriginResponse(status=500, body=str(e))
3 changes: 2 additions & 1 deletion shaka-streamer
Original file line number Diff line number Diff line change
@@ -66,7 +66,8 @@ def main():
'upload to. (Starts with gs:// or s3://)')
parser.add_argument('-o', '--output',
default='output_files',
help='The output folder to write files to. ' +
help='The output folder to write files to, or an HTTP ' +
'or HTTPS URL where files will be PUT. ' +
'Used even if uploading to cloud storage.')
parser.add_argument('--skip_deps_check',
action='store_true',
55 changes: 36 additions & 19 deletions streamer/controller_node.py
Original file line number Diff line number Diff line change
@@ -41,6 +41,7 @@
from streamer.pipeline_configuration import PipelineConfig, StreamingMode
from streamer.transcoder_node import TranscoderNode
from streamer.periodconcat_node import PeriodConcatNode
from streamer.util import is_url


class ControllerNode(object):
@@ -96,7 +97,7 @@ def _create_pipe(self, suffix = '') -> str:

return path

def start(self, output_dir: str,
def start(self, output_location: str,
input_config_dict: Dict[str, Any],
pipeline_config_dict: Dict[str, Any],
bitrate_config_dict: Dict[Any, Any] = {},
@@ -137,14 +138,6 @@ def start(self, output_dir: str,
# the destination, independent of the version check above.
CloudNode.check_access(bucket_url)


self._output_dir = output_dir
# Check if the directory for outputted Packager files exists, and if it
# does, delete it and remake a new one.
if os.path.exists(self._output_dir):
shutil.rmtree(self._output_dir)
os.mkdir(self._output_dir)

# Define resolutions and bitrates before parsing other configs.
bitrate_config = BitrateConfig(bitrate_config_dict)

@@ -157,29 +150,55 @@ def start(self, output_dir: str,
self._input_config = InputConfig(input_config_dict)
self._pipeline_config = PipelineConfig(pipeline_config_dict)

if not is_url(output_location):
# Check if the directory for outputted Packager files exists, and if it
# does, delete it and remake a new one.
if os.path.exists(output_location):
shutil.rmtree(output_location)
os.mkdir(output_location)
else:
# Check some restrictions and other details on HTTP output.
if not self._pipeline_config.segment_per_file:
raise RuntimeError(
'For HTTP PUT uploads, the pipeline segment_per_file setting ' +
'must be set to True!')

if bucket_url:
raise RuntimeError(
'Cloud bucket upload is incompatible with HTTP PUT support.')

if self._input_config.multiperiod_inputs_list:
# TODO: Edit Multiperiod input list implementation to support HTTP outputs
raise RuntimeError(
'Multiperiod input list support is incompatible with HTTP outputs.')

# Note that we remove the trailing slash from the output location, because
# otherwise GCS would create a subdirectory whose name is "".
output_location = output_location.rstrip('/')

if self._input_config.inputs:
# InputConfig contains inputs only.
self._append_nodes_for_inputs_list(self._input_config.inputs)
self._append_nodes_for_inputs_list(self._input_config.inputs, output_location)
else:
# InputConfig contains multiperiod_inputs_list only.
# Create one Transcoder node and one Packager node for each period.
for i, singleperiod in enumerate(self._input_config.multiperiod_inputs_list):
sub_dir_name = 'period_' + str(i)
self._append_nodes_for_inputs_list(singleperiod.inputs, sub_dir_name)
self._append_nodes_for_inputs_list(singleperiod.inputs, output_location, sub_dir_name)

if self._pipeline_config.streaming_mode == StreamingMode.VOD:
packager_nodes = [node for node in self._nodes if isinstance(node, PackagerNode)]
self._nodes.append(PeriodConcatNode(
self._pipeline_config,
packager_nodes,
self._output_dir))
output_location))

if bucket_url:
cloud_temp_dir = os.path.join(self._temp_dir, 'cloud')
os.mkdir(cloud_temp_dir)

packager_nodes = [node for node in self._nodes if isinstance(node, PackagerNode)]
self._nodes.append(CloudNode(self._output_dir,
self._nodes.append(CloudNode(output_location,
bucket_url,
cloud_temp_dir,
packager_nodes,
@@ -190,7 +209,7 @@ def start(self, output_dir: str,

return self

def _append_nodes_for_inputs_list(self, inputs: List[Input],
def _append_nodes_for_inputs_list(self, inputs: List[Input], output_location: str,
period_dir: Optional[str] = None) -> None:
"""A common method that creates Transcoder and Packager nodes for a list of Inputs passed to it.
@@ -250,17 +269,15 @@ def _append_nodes_for_inputs_list(self, inputs: List[Input],
self._nodes.append(TranscoderNode(inputs,
self._pipeline_config,
outputs))

output_dir = self._output_dir

# If the inputs list was a period in multiperiod_inputs_list, create a nested directory
# and put that period in it.
if period_dir:
output_dir = os.path.join(output_dir, period_dir)
os.mkdir(output_dir)
output_location = os.path.join(output_location, period_dir)
os.mkdir(output_location)

self._nodes.append(PackagerNode(self._pipeline_config,
output_dir,
output_location,
outputs))

def check_status(self) -> ProcessStatus:
64 changes: 41 additions & 23 deletions streamer/packager_node.py
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@

from streamer.output_stream import OutputStream
from streamer.pipeline_configuration import EncryptionMode, PipelineConfig
from streamer.util import is_url
from typing import List, Optional, Union

# Alias a few classes to avoid repeating namespaces later.
@@ -33,38 +34,58 @@


INIT_SEGMENT = {
MediaType.AUDIO: '{dir}/audio_{language}_{channels}c_{bitrate}_{codec}_init.{format}',
MediaType.VIDEO: '{dir}/video_{resolution_name}_{bitrate}_{codec}_init.{format}',
MediaType.TEXT: '{dir}/text_{language}_init.{format}',
MediaType.AUDIO: 'audio_{language}_{channels}c_{bitrate}_{codec}_init.{format}',
MediaType.VIDEO: 'video_{resolution_name}_{bitrate}_{codec}_init.{format}',
MediaType.TEXT: 'text_{language}_init.{format}',
}

MEDIA_SEGMENT = {
MediaType.AUDIO: '{dir}/audio_{language}_{channels}c_{bitrate}_{codec}_$Number$.{format}',
MediaType.VIDEO: '{dir}/video_{resolution_name}_{bitrate}_{codec}_$Number$.{format}',
MediaType.TEXT: '{dir}/text_{language}_$Number$.{format}',
MediaType.AUDIO: 'audio_{language}_{channels}c_{bitrate}_{codec}_$Number$.{format}',
MediaType.VIDEO: 'video_{resolution_name}_{bitrate}_{codec}_$Number$.{format}',
MediaType.TEXT: 'text_{language}_$Number$.{format}',
}

SINGLE_SEGMENT = {
MediaType.AUDIO: '{dir}/audio_{language}_{channels}c_{bitrate}_{codec}.{format}',
MediaType.VIDEO: '{dir}/video_{resolution_name}_{bitrate}_{codec}.{format}',
MediaType.TEXT: '{dir}/text_{language}.{format}',
MediaType.AUDIO: 'audio_{language}_{channels}c_{bitrate}_{codec}.{format}',
MediaType.VIDEO: 'video_{resolution_name}_{bitrate}_{codec}.{format}',
MediaType.TEXT: 'text_{language}.{format}',
}

class SegmentError(Exception):
  """Raised when a segment is incompatible with the output format."""

def build_path(output_location: str, sub_path: str) -> str:
  """Join a sub-path onto an output location (local path or HTTP(S) URL).

  Handles annoying edge cases with paths for cloud upload: if a path
  contains two consecutive slashes, GCS will create an intermediate
  directory named "", so we have to be careful in how we construct paths
  to avoid this.  ControllerNode should have already stripped any trailing
  slash from the output location.
  """
  # Sometimes the segment dir is empty.  In that case, the output location
  # itself is the complete path.
  if not sub_path:
    return output_location

  if is_url(output_location):
    # Don't use os.path.join for URLs, since URLs must use forward slashes
    # and Streamer could be running on Windows.
    return output_location + '/' + sub_path

  return os.path.join(output_location, sub_path)


class PackagerNode(node_base.PolitelyWaitOnFinish):

def __init__(self,
pipeline_config: PipelineConfig,
output_dir: str,
output_location: str,
output_streams: List[OutputStream]) -> None:
super().__init__()
self._pipeline_config: PipelineConfig = pipeline_config
self.output_dir: str = output_dir
self._segment_dir: str = os.path.join(output_dir, pipeline_config.segment_folder)
self.output_location: str = output_location
self._segment_dir: str = build_path(
output_location, pipeline_config.segment_folder)
self._output_streams: List[OutputStream] = output_streams

def start(self) -> None:
@@ -140,16 +161,13 @@ def _setup_stream(self, stream: OutputStream) -> str:
dict['language'] = stream.input.language

if self._pipeline_config.segment_per_file:
dict['init_segment'] = stream.fill_template(
INIT_SEGMENT[stream.type],
dir=self._segment_dir)
dict['segment_template'] = stream.fill_template(
MEDIA_SEGMENT[stream.type],
dir=self._segment_dir)
dict['init_segment'] = build_path(
self._segment_dir, stream.fill_template(INIT_SEGMENT[stream.type]))
dict['segment_template'] = build_path(
self._segment_dir, stream.fill_template(MEDIA_SEGMENT[stream.type]))
else:
dict['output'] = stream.fill_template(
SINGLE_SEGMENT[stream.type],
dir=self._segment_dir)
dict['output'] = build_path(
self._segment_dir, stream.fill_template(SINGLE_SEGMENT[stream.type]))

if stream.is_dash_only():
dict['dash_only'] = '1'
@@ -168,7 +186,7 @@ def _setup_manifest_format(self) -> List[str]:
args += [
# Generate DASH manifest file.
'--mpd_output',
os.path.join(self.output_dir, self._pipeline_config.dash_output),
os.path.join(self.output_location, self._pipeline_config.dash_output),
]
if ManifestFormat.HLS in self._pipeline_config.manifest_format:
if self._pipeline_config.streaming_mode == StreamingMode.LIVE:
@@ -182,7 +200,7 @@ def _setup_manifest_format(self) -> List[str]:
args += [
# Generate HLS playlist file(s).
'--hls_master_playlist_output',
os.path.join(self.output_dir, self._pipeline_config.hls_output),
os.path.join(self.output_location, self._pipeline_config.hls_output),
]
return args

16 changes: 8 additions & 8 deletions streamer/periodconcat_node.py
Original file line number Diff line number Diff line change
@@ -26,17 +26,17 @@

class PeriodConcatNode(ThreadedNodeBase):
"""A node that concatenates multiple DASH manifests and/or HLS playlists
when the input is a multiperiod_inputs_list.
when the input is a multiperiod_inputs_list and the output is to the local file system.
"""

def __init__(self,
pipeline_config: PipelineConfig,
packager_nodes: List[PackagerNode],
output_dir: str) -> None:
output_location: str) -> None:
"""Stores all relevant information needed for the period concatenation."""
super().__init__(thread_name='periodconcat', continue_on_exception=False, sleep_time=3)
self._pipeline_config = pipeline_config
self._output_dir = output_dir
self._output_location = output_location
self._packager_nodes: List[PackagerNode] = packager_nodes

def _thread_single_pass(self) -> None:
@@ -81,7 +81,7 @@ def find(elem: ElementTree.Element, *args: str) -> ElementTree.Element:

# Get the root of an MPD file that we will concatenate periods into.
concat_mpd = ElementTree.ElementTree(file=os.path.join(
self._packager_nodes[0].output_dir,
self._packager_nodes[0].output_location,
self._pipeline_config.dash_output)).getroot()

# Get the default namespace.
@@ -97,24 +97,24 @@ def find(elem: ElementTree.Element, *args: str) -> ElementTree.Element:
for packager_node in self._packager_nodes:

mpd = ElementTree.ElementTree(file=os.path.join(
packager_node.output_dir,
packager_node.output_location,
self._pipeline_config.dash_output)).getroot()
period = find(mpd, 'Period')
period.attrib['duration'] = mpd.attrib['mediaPresentationDuration']

# A BaseURL that will have the relative path to media file.
base_url = ElementTree.Element('BaseURL')
base_url.text = os.path.relpath(packager_node.output_dir, self._output_dir) + '/'
base_url.text = os.path.relpath(packager_node.output_location, self._output_location) + '/'
period.insert(0, base_url)

periods.append(period)

# Add the periods collected from all the files.
concat_mpd.extend(periods)

# Write the period concat to the output_dir.
# Write the period concat to the output_location.
with open(os.path.join(
self._output_dir,
self._output_location,
self._pipeline_config.dash_output), 'w') as master_dash:

contents = "<?xml version='1.0' encoding='UTF-8'?>\n"
20 changes: 20 additions & 0 deletions streamer/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Utility functions used by multiple modules."""

def is_url(output_location: str) -> bool:
  """Returns True if the output location is an HTTP or HTTPS URL."""
  # str.startswith accepts a tuple of prefixes, checked left to right.
  return output_location.startswith(('http:', 'https:'))
32 changes: 31 additions & 1 deletion tests/tests.js
Original file line number Diff line number Diff line change
@@ -13,13 +13,15 @@
// limitations under the License.

const flaskServerUrl = 'http://localhost:5000/';
const outputHttpUrl = 'http://localhost:80/';
const dashManifestUrl = flaskServerUrl + 'output_files/dash.mpd';
const hlsManifestUrl = flaskServerUrl + 'output_files/hls.m3u8';
const OUTPUT_DIR = 'output_files/'
const TEST_DIR = 'test_assets/';
let player;
let video;

async function startStreamer(inputConfig, pipelineConfig, bitrateConfig={}) {
async function startStreamer(inputConfig, pipelineConfig, bitrateConfig={}, outputLocation=OUTPUT_DIR) {
// Send a request to flask server to start Shaka Streamer.
const response = await fetch(flaskServerUrl + 'start', {
method: 'POST',
@@ -30,6 +32,7 @@ async function startStreamer(inputConfig, pipelineConfig, bitrateConfig={}) {
'input_config': inputConfig,
'pipeline_config': pipelineConfig,
'bitrate_config': bitrateConfig,
'output_location': outputLocation
}),
});

@@ -351,6 +354,33 @@ function errorTests() {
field_name: 'inputs',
}));
});

it('fails when segment_per_file is false with a HTTP url output', async () => {
const inputConfig = getBasicInputConfig();
const pipelineConfig = {
streaming_mode: 'vod',
resolutions: [],
segment_per_file: false,
};

await expectAsync(startStreamer(inputConfig, pipelineConfig, {}, outputHttpUrl))
.toBeRejectedWith(jasmine.objectContaining({
error_type: 'RuntimeError',
}));
});

it('fails when multiperiod_inputs_list is used with a HTTP url output', async () => {
const inputConfig = {
'multiperiod_inputs_list': [
getBasicInputConfig(),
],
};

await expectAsync(startStreamer(inputConfig, minimalPipelineConfig, {}, outputHttpUrl))
.toBeRejectedWith(jasmine.objectContaining({
error_type: 'RuntimeError',
}));
});
}

function resolutionTests(manifestUrl, format) {

0 comments on commit 8cd8537

Please sign in to comment.