-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Http url output support #2
Changes from all commits
2227485
905eb26
fe0e2c1
4142edd
e5a9fb1
3969e56
a142d07
5768e5f
a3610bd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,7 @@ | |
from streamer.pipeline_configuration import PipelineConfig, StreamingMode | ||
from streamer.transcoder_node import TranscoderNode | ||
from streamer.periodconcat_node import PeriodConcatNode | ||
from streamer.util import is_url | ||
|
||
|
||
class ControllerNode(object): | ||
|
@@ -96,7 +97,7 @@ def _create_pipe(self, suffix = '') -> str: | |
|
||
return path | ||
|
||
def start(self, output_dir: str, | ||
def start(self, output_location: str, | ||
input_config_dict: Dict[str, Any], | ||
pipeline_config_dict: Dict[str, Any], | ||
bitrate_config_dict: Dict[Any, Any] = {}, | ||
|
@@ -137,14 +138,6 @@ def start(self, output_dir: str, | |
# the destination, independent of the version check above. | ||
CloudNode.check_access(bucket_url) | ||
|
||
|
||
self._output_dir = output_dir | ||
# Check if the directory for outputted Packager files exists, and if it | ||
# does, delete it and remake a new one. | ||
if os.path.exists(self._output_dir): | ||
shutil.rmtree(self._output_dir) | ||
os.mkdir(self._output_dir) | ||
|
||
# Define resolutions and bitrates before parsing other configs. | ||
bitrate_config = BitrateConfig(bitrate_config_dict) | ||
|
||
|
@@ -157,29 +150,55 @@ def start(self, output_dir: str, | |
self._input_config = InputConfig(input_config_dict) | ||
self._pipeline_config = PipelineConfig(pipeline_config_dict) | ||
|
||
if not is_url(output_location): | ||
# Check if the directory for outputted Packager files exists, and if it | ||
# does, delete it and remake a new one. | ||
if os.path.exists(output_location): | ||
shutil.rmtree(output_location) | ||
os.mkdir(output_location) | ||
else: | ||
# Check some restrictions and other details on HTTP output. | ||
if not self._pipeline_config.segment_per_file: | ||
raise RuntimeError( | ||
'For HTTP PUT uploads, the pipeline segment_per_file setting ' + | ||
'must be set to True!') | ||
|
||
if bucket_url: | ||
raise RuntimeError( | ||
'Cloud bucket upload is incompatible with HTTP PUT support.') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You have my permission to kill CloudNode once we have a built-in auth proxy that can handle GCS. We're currently using CloudNode internally for this stream: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Woohoo! I look forward to the day when we can kill it. :D |
||
|
||
if self._input_config.multiperiod_inputs_list: | ||
# TODO: Edit Multiperiod input list implementation to support HTTP outputs | ||
raise RuntimeError( | ||
'Multiperiod input list support is incompatible with HTTP outputs.') | ||
|
||
# Note that we remove the trailing slash from the output location, because | ||
# otherwise GCS would create a subdirectory whose name is "". | ||
output_location = output_location.rstrip('/') | ||
|
||
if self._input_config.inputs: | ||
# InputConfig contains inputs only. | ||
self._append_nodes_for_inputs_list(self._input_config.inputs) | ||
self._append_nodes_for_inputs_list(self._input_config.inputs, output_location) | ||
else: | ||
# InputConfig contains multiperiod_inputs_list only. | ||
# Create one Transcoder node and one Packager node for each period. | ||
for i, singleperiod in enumerate(self._input_config.multiperiod_inputs_list): | ||
sub_dir_name = 'period_' + str(i) | ||
self._append_nodes_for_inputs_list(singleperiod.inputs, sub_dir_name) | ||
self._append_nodes_for_inputs_list(singleperiod.inputs, output_location, sub_dir_name) | ||
|
||
if self._pipeline_config.streaming_mode == StreamingMode.VOD: | ||
packager_nodes = [node for node in self._nodes if isinstance(node, PackagerNode)] | ||
self._nodes.append(PeriodConcatNode( | ||
self._pipeline_config, | ||
packager_nodes, | ||
self._output_dir)) | ||
output_location)) | ||
|
||
if bucket_url: | ||
cloud_temp_dir = os.path.join(self._temp_dir, 'cloud') | ||
os.mkdir(cloud_temp_dir) | ||
|
||
packager_nodes = [node for node in self._nodes if isinstance(node, PackagerNode)] | ||
self._nodes.append(CloudNode(self._output_dir, | ||
self._nodes.append(CloudNode(output_location, | ||
bucket_url, | ||
cloud_temp_dir, | ||
packager_nodes, | ||
|
@@ -190,7 +209,7 @@ def start(self, output_dir: str, | |
|
||
return self | ||
|
||
def _append_nodes_for_inputs_list(self, inputs: List[Input], | ||
def _append_nodes_for_inputs_list(self, inputs: List[Input], output_location: str, | ||
period_dir: Optional[str] = None) -> None: | ||
"""A common method that creates Transcoder and Packager nodes for a list of Inputs passed to it. | ||
|
||
|
@@ -250,17 +269,15 @@ def _append_nodes_for_inputs_list(self, inputs: List[Input], | |
self._nodes.append(TranscoderNode(inputs, | ||
self._pipeline_config, | ||
outputs)) | ||
|
||
output_dir = self._output_dir | ||
|
||
# If the inputs list was a period in multiperiod_inputs_list, create a nested directory | ||
# and put that period in it. | ||
if period_dir: | ||
output_dir = os.path.join(output_dir, period_dir) | ||
os.mkdir(output_dir) | ||
output_location = os.path.join(output_location, period_dir) | ||
os.mkdir(output_location) | ||
|
||
self._nodes.append(PackagerNode(self._pipeline_config, | ||
output_dir, | ||
output_location, | ||
outputs)) | ||
|
||
def check_status(self) -> ProcessStatus: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
# Copyright 2019 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# https://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
"""Utility functions used by multiple modules.""" | ||
|
||
def is_url(output_location: str) -> bool: | ||
"""Returns True if the output location is a URL.""" | ||
return (output_location.startswith('http:') or | ||
output_location.startswith('https:')) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we wish to use a more specific error like ConflictingFields or MalformedField, found in
configuration.py
? I feel that these errors are more for configuration inputs though, and the output is its own entity rather than a configuration. We can also create a new error type ifRuntime Error
does not suffice?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're using RuntimeError in a few similar places, so I think this is okay for now. I'd be fine with a new error type, if you like, and it may become necessary or useful to have one in the future. But I'm not worried about it at the moment.