Skip to content

Commit

Permalink
Refactor, simplify, revive PR
Browse files Browse the repository at this point in the history
 - Add convenience script for linting and testing
 - Revert -c for compatibility
 - Remove -H (use case unclear)
 - Do not treat proxy node as special
 - Simplify interface to proxy node
 - Simplify proxy node internals to use Python libraries for GCS and S3
 - Disable multiperiod with cloud upload (for now)
 - Revert changes to multiperiod node
  • Loading branch information
joeyparrish committed Oct 22, 2024
1 parent e6d4db8 commit 71642c1
Show file tree
Hide file tree
Showing 8 changed files with 239 additions and 507 deletions.
4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
{
"scripts": {
"lint": "python3 -m mypy streamer/",
"test": "python3 run_end_to_end_tests.py"
},
"devDependencies": {
"karma": "^6.3.16",
"karma-chrome-launcher": "^3.1.0",
Expand Down
17 changes: 5 additions & 12 deletions shaka-streamer
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,15 @@ def main():
'bitrates and resolutions for transcoding. ' +
'(optional, see example in ' +
'config_files/bitrate_config.yaml)')
parser.add_argument('-c', '--cloud-url',
default=None,
help='The Google Cloud Storage or Amazon S3 URL to ' +
'upload to. (Starts with gs:// or s3://)')
parser.add_argument('-o', '--output',
default='output_files',
help='The output folder to write files to, or an HTTP ' +
'or HTTPS URL where files will be PUT.' +
'Used even if uploading to cloud storage.')
parser.add_argument('-H', '--add-header',
action='append',
metavar='key=value',
help='Headers to include when sending PUT requests ' +
'to the specified output location.')
parser.add_argument('--skip-deps-check',
action='store_true',
help='Skip checks for dependencies and their versions. ' +
Expand All @@ -97,15 +96,9 @@ def main():
with open(args.bitrate_config) as f:
bitrate_config_dict = yaml.safe_load(f)

extra_headers = {}
if args.add_header:
for header in args.add_header:
key, value = tuple(header.split('=', 1))
extra_headers[key] = value

try:
with controller.start(args.output, input_config_dict, pipeline_config_dict,
bitrate_config_dict, extra_headers,
bitrate_config_dict, args.cloud_url,
not args.skip_deps_check,
not args.use_system_binaries):
# Sleep so long as the pipeline is still running.
Expand Down
69 changes: 24 additions & 45 deletions streamer/controller_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
from streamer import __version__
from streamer import autodetect
from streamer import min_versions
from streamer import proxy_node
from streamer.bitrate_configuration import BitrateConfig, AudioChannelLayout, VideoResolution
from streamer.external_command_node import ExternalCommandNode
from streamer.input_configuration import InputConfig, InputType, MediaType, Input
Expand All @@ -42,7 +41,7 @@
from streamer.pipeline_configuration import ManifestFormat, PipelineConfig, StreamingMode
from streamer.transcoder_node import TranscoderNode
from streamer.periodconcat_node import PeriodConcatNode
from streamer.proxy_node import HTTPUpload
from streamer.proxy_node import ProxyNode
import streamer.subprocessWindowsPatch # side-effects only
from streamer.util import is_url
from streamer.pipe import Pipe
Expand All @@ -61,7 +60,6 @@ def __init__(self) -> None:
dir=global_temp_dir, prefix='shaka-live-', suffix='')

self._nodes: List[NodeBase] = []
self.upload_proxy: Optional[HTTPUpload] = None

def __del__(self) -> None:
# Clean up named pipes by removing the temp directory we placed them in.
Expand All @@ -77,7 +75,7 @@ def start(self, output_location: str,
input_config_dict: Dict[str, Any],
pipeline_config_dict: Dict[str, Any],
bitrate_config_dict: Dict[Any, Any] = {},
extra_headers: Dict[str, str] = {},
bucket_url: Union[str, None] = None,
check_deps: bool = True,
use_hermetic: bool = True) -> 'ControllerNode':
"""Create and start all other nodes.
Expand Down Expand Up @@ -168,15 +166,22 @@ def next_short_version(version: str) -> str:
self._input_config = InputConfig(input_config_dict)
self._pipeline_config = PipelineConfig(pipeline_config_dict)

# Note that we remove the trailing slash from the output location, because
# otherwise GCS would create a subdirectory whose name is "".
output_location = output_location.rstrip('/')
if (proxy_node.is_supported_protocol(output_location)
and self._pipeline_config.use_local_proxy):
self.upload_proxy = self.get_upload_node(output_location, extra_headers)
if bucket_url is not None:
if not ProxyNode.is_understood(bucket_url):
url_prefixes = [
protocol + '://' for protocol in ProxyNode.ALL_SUPPORTED_PROTOCOLS]
raise RuntimeError(
'Invalid cloud URL! Only these are supported: ' +
', '.join(url_prefixes))

if not ProxyNode.is_supported(bucket_url):
raise RuntimeError('Missing libraries for cloud URL: ' + bucket_url)

upload_proxy = ProxyNode.create(bucket_url)

# All the outputs now should be sent to the proxy server instead.
output_location = self.upload_proxy.server_location
self._nodes.append(self.upload_proxy)
output_location = upload_proxy.server_location
self._nodes.append(upload_proxy)

if not is_url(output_location):
# Check if the directory for outputted Packager files exists, and if it
Expand Down Expand Up @@ -208,6 +213,10 @@ def next_short_version(version: str) -> str:
output_location)
else:
# InputConfig contains multiperiod_inputs_list only.
if bucket_url:
raise RuntimeError(
'Direct cloud upload is incompatible with multiperiod support.')

# Create one Transcoder node and one Packager node for each period.
for i, singleperiod in enumerate(self._input_config.multiperiod_inputs_list):
sub_dir_name = 'period_' + str(i + 1)
Expand All @@ -220,8 +229,7 @@ def next_short_version(version: str) -> str:
self._nodes.append(PeriodConcatNode(
self._pipeline_config,
packager_nodes,
output_location,
self.upload_proxy))
output_location))

for node in self._nodes:
node.start()
Expand Down Expand Up @@ -309,8 +317,7 @@ def _append_nodes_for_inputs_list(self, inputs: List[Input],
# and put that period in it.
if period_dir:
output_location = os.path.join(output_location, period_dir)
if not is_url(output_location):
os.mkdir(output_location)
os.mkdir(output_location)

self._nodes.append(PackagerNode(self._pipeline_config,
output_location,
Expand All @@ -324,15 +331,11 @@ def check_status(self) -> ProcessStatus:
If one node is errored, this returns Errored; otherwise if one node is running,
this returns Running; this only returns Finished if all nodes are finished.
If there are no nodes, this returns Finished.
:rtype: ProcessStatus
"""
if not self._nodes:
return ProcessStatus.Finished

value = max(node.check_status().value for node in self._nodes
# We don't check the upload node.
if node != self.upload_proxy)
value = max(node.check_status().value for node in self._nodes)
return ProcessStatus(value)

def stop(self) -> None:
Expand All @@ -358,30 +361,6 @@ def is_low_latency_dash_mode(self) -> bool:

return self._pipeline_config.low_latency_dash_mode

def get_upload_node(self, upload_location: str,
extra_headers: Dict[str, str]) -> HTTPUpload:
"""
Args:
upload_location (str): The location where media content will be uploaded.
extra_headers (Dict[str, str]): Extra headers to be added when
sending the PUT request to `upload_location`.
:rtype: HTTPUpload
:raises: `RuntimeError` if the protocol used in `upload_location` was not
recognized.
"""

# We need to pass a temporary directory when working with multi-period input
# and using HTTP PUT for uploading. This is so that the HTTPUpload
# keeps a copy of the manifests in the temporary directory so we can use them
# later to assemble the multi-period manifests.
upload_temp_dir = None
if self._input_config.multiperiod_inputs_list:
upload_temp_dir = os.path.join(self._temp_dir, 'multiperiod_manifests')
os.mkdir(upload_temp_dir)
return proxy_node.get_upload_node(upload_location, extra_headers,
upload_temp_dir)

class VersionError(Exception):
"""A version error for one of Shaka Streamer's external dependencies.
Expand Down
66 changes: 10 additions & 56 deletions streamer/periodconcat_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,14 @@
import os
import re
import time
from typing import List, Optional
from typing import List
from xml.etree import ElementTree
from http.client import HTTPConnection, CREATED
from streamer import __version__
from streamer.node_base import ProcessStatus, ThreadedNodeBase
from streamer.packager_node import PackagerNode
from streamer.pipeline_configuration import PipelineConfig, ManifestFormat
from streamer.output_stream import AudioOutputStream, VideoOutputStream
from streamer.m3u8_concater import HLSConcater
from streamer.proxy_node import HTTPUpload
from streamer.util import is_url


class PeriodConcatNode(ThreadedNodeBase):
Expand All @@ -38,20 +35,17 @@ class PeriodConcatNode(ThreadedNodeBase):
def __init__(self,
pipeline_config: PipelineConfig,
packager_nodes: List[PackagerNode],
output_location: str,
upload_proxy: Optional[HTTPUpload]) -> None:
output_location: str) -> None:
"""Stores all relevant information needed for the period concatenation."""
super().__init__(thread_name='periodconcat', continue_on_exception=False, sleep_time=1)
super().__init__(thread_name='periodconcat', continue_on_exception=False, sleep_time=3)
self._pipeline_config = pipeline_config
self._output_location = output_location
self._packager_nodes: List[PackagerNode] = packager_nodes
self._proxy_node = upload_proxy
self._concat_will_fail = self._check_failed_concatenation()

def _check_failed_concatenation(self) -> bool:
self._concat_will_fail = False

# Determine whether the first period has video and audio or not.
fp_has_vid, fp_has_aud = False, False
for output_stream in self._packager_nodes[0].output_streams:
for output_stream in packager_nodes[0].output_streams:
if isinstance(output_stream, VideoOutputStream):
fp_has_vid = True
elif isinstance(output_stream, AudioOutputStream):
Expand All @@ -65,6 +59,7 @@ def _check_failed_concatenation(self) -> bool:
elif isinstance(output_stream, AudioOutputStream):
has_aud = True
if has_vid != fp_has_vid or has_aud != fp_has_aud:
self._concat_will_fail = True
print("\nWARNING: Stopping period concatenation.")
print("Period#{} has {}video and has {}audio while Period#1 "
"has {}video and has {}audio.".format(i + 1,
Expand All @@ -77,21 +72,8 @@ def _check_failed_concatenation(self) -> bool:
"\tperiods with other periods that have video.\n"
"\tThis is necessary for the concatenation to be performed successfully.\n")
time.sleep(5)
return True

if self._proxy_node is None and is_url(self._output_location):
print("\nWARNING: Stopping period concatenation.")
print("Shaka Packager is using HTTP PUT but not using"
" Shaka Streamer's upload proxy.")
print("\nHINT:\n\tShaka Streamer's upload proxy stores the manifest files\n"
"\ttemporarily in the local filesystem to use them for period concatenation.\n"
"\tSet use_local_proxy to True in the pipeline config to enable the"
" upload proxy.\n")
time.sleep(5)
return True
# Otherwise, we don't have a reason to fail.
return False

break

def _thread_single_pass(self) -> None:
"""Watches all the PackagerNode(s), if at least one of them is running it skips this
_thread_single_pass, if all of them are finished, it starts period concatenation, if one of
Expand All @@ -108,41 +90,13 @@ def _thread_single_pass(self) -> None:
'to an error in PackagerNode#{}.'.format(i + 1))

if self._concat_will_fail:
raise RuntimeError('Unable to concatenate the inputs')

# If the packager was pushing HTTP requests to the stream's proxy server,
# the proxy server should have stored the manifest files in a temporary
# directory in the filesystem.
if self._proxy_node is not None:
assert self._proxy_node.temp_dir, ('There should be a proxy temp direcotry'
' when processing multi-period input')
self._output_location = self._proxy_node.temp_dir
# As the period concatenator node is the last to run, changing the
# output location at run time won't disturb any other node.
for packager_node in self._packager_nodes:
packager_node.output_location = packager_node.output_location.replace(
self._proxy_node.server_location,
self._proxy_node.temp_dir, 1)
raise RuntimeError('Unable to concatenate the inputs.')

if ManifestFormat.DASH in self._pipeline_config.manifest_format:
self._dash_concat()

if ManifestFormat.HLS in self._pipeline_config.manifest_format:
self._hls_concat()

# Push the concatenated manifests if a proxy is used.
if self._proxy_node is not None:
conn = HTTPConnection(self._proxy_node.server.server_name,
self._proxy_node.server.server_port)
# The concatenated manifest files where written in `self._output_location`.
for manifest_file_name in os.listdir(self._output_location):
if manifest_file_name.endswith(('.mpd', '.m3u8')):
manifest_file_path = os.path.join(self._output_location, manifest_file_name)
conn.request('PUT', '/' + manifest_file_name, open(manifest_file_path, 'r'))
res = conn.getresponse()
if res.status != CREATED:
print("Got unexpected status code: {}, Msg: {!r}".format(res.status,
res.read()))

self._status = ProcessStatus.Finished

Expand Down
2 changes: 1 addition & 1 deletion streamer/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
class Pipe:
"""A class that represents a pipe."""

def __init__(self):
def __init__(self) -> None:
"""Initializes a non-functioning pipe."""

self._read_pipe_name = ''
Expand Down
5 changes: 0 additions & 5 deletions streamer/pipeline_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,11 +320,6 @@ class PipelineConfig(configuration.Base):
default=EncryptionConfig({})).cast()
"""Encryption settings."""

use_local_proxy = configuration.Field(bool, default=True).cast()
"""Whether to use shaka streamer's local proxy when uploading to a remote
storage. This must be set to True when uploading to GCS or amazon S3 buckets.
"""

# TODO: Generalize this to low_latency_mode once LL-HLS is supported by Packager
low_latency_dash_mode = configuration.Field(bool, default=False).cast()
"""If true, stream in low latency mode for DASH."""
Expand Down
Loading

0 comments on commit 71642c1

Please sign in to comment.