Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cloud): Upload through HTTP proxy node #103

Merged
merged 15 commits
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
{
"scripts": {
"lint": "python3 -m mypy streamer/",
"test": "python3 run_end_to_end_tests.py"
},
"devDependencies": {
"karma": "^6.3.16",
"karma-chrome-launcher": "^3.1.0",
Expand Down
8 changes: 1 addition & 7 deletions shaka-streamer
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,6 @@ def main():
with open(args.bitrate_config) as f:
bitrate_config_dict = yaml.safe_load(f)

if args.cloud_url:
if (not args.cloud_url.startswith('gs://') and
not args.cloud_url.startswith('s3://')):
parser.error('Invalid cloud URL! Only gs:// and s3:// URLs are supported')

try:
with controller.start(args.output, input_config_dict, pipeline_config_dict,
bitrate_config_dict, args.cloud_url,
Expand All @@ -114,8 +109,7 @@ def main():

time.sleep(1)
except (streamer.controller_node.VersionError,
streamer.configuration.ConfigError,
streamer.cloud_node.CloudAccessError) as e:
streamer.configuration.ConfigError) as e:
# These are common errors meant to give the user specific, helpful
# information. Format these errors in a relatively friendly way, with no
# backtrace or other Python-specific information.
Expand Down
190 changes: 0 additions & 190 deletions streamer/cloud_node.py

This file was deleted.

72 changes: 27 additions & 45 deletions streamer/controller_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
from streamer import __version__
from streamer import autodetect
from streamer import min_versions
from streamer.cloud_node import CloudNode
from streamer.bitrate_configuration import BitrateConfig, AudioChannelLayout, VideoResolution
from streamer.external_command_node import ExternalCommandNode
from streamer.input_configuration import InputConfig, InputType, MediaType, Input
Expand All @@ -42,6 +41,7 @@
from streamer.pipeline_configuration import ManifestFormat, PipelineConfig, StreamingMode
from streamer.transcoder_node import TranscoderNode
from streamer.periodconcat_node import PeriodConcatNode
from streamer.proxy_node import ProxyNode
import streamer.subprocessWindowsPatch # side-effects only
from streamer.util import is_url
from streamer.pipe import Pipe
Expand Down Expand Up @@ -147,21 +147,6 @@ def next_short_version(version: str) -> str:
_check_command_version('Shaka Packager', ['packager', '-version'],
min_versions.PACKAGER)

if bucket_url:
# Check that the Google Cloud SDK is at least v212, which introduced
# gsutil 4.33 with an important rsync bug fix.
# https://cloud.google.com/sdk/docs/release-notes
# https://github.com/GoogleCloudPlatform/gsutil/blob/master/CHANGES.md
# This is only required if the user asked for upload to cloud storage.
_check_command_version('Google Cloud SDK', ['gcloud', '--version'],
(212, 0, 0))


if bucket_url:
# If using cloud storage, make sure the user is logged in and can access
# the destination, independent of the version check above.
CloudNode.check_access(bucket_url)

self.hermetic_ffmpeg: Optional[str] = None
self.hermetic_packager: Optional[str] = None
if use_hermetic:
Expand All @@ -181,27 +166,35 @@ def next_short_version(version: str) -> str:
self._input_config = InputConfig(input_config_dict)
self._pipeline_config = PipelineConfig(pipeline_config_dict)

if not is_url(output_location):
# Check if the directory for outputted Packager files exists, and if it
# does, delete it and remake a new one.
if os.path.exists(output_location):
shutil.rmtree(output_location)
os.mkdir(output_location)
else:
if bucket_url is not None:
# Check some restrictions and other details on HTTP output.
if not ProxyNode.is_understood(bucket_url):
url_prefixes = [
protocol + '://' for protocol in ProxyNode.ALL_SUPPORTED_PROTOCOLS]
raise RuntimeError(
'Invalid cloud URL! Only these are supported: ' +
', '.join(url_prefixes))

if not ProxyNode.is_supported(bucket_url):
raise RuntimeError('Missing libraries for cloud URL: ' + bucket_url)

if not self._pipeline_config.segment_per_file:
raise RuntimeError(
'For HTTP PUT uploads, the pipeline segment_per_file setting ' +
'must be set to True!')

if bucket_url:
raise RuntimeError(
'Cloud bucket upload is incompatible with HTTP PUT support.')
upload_proxy = ProxyNode.create(bucket_url)
upload_proxy.start()

if self._input_config.multiperiod_inputs_list:
# TODO: Edit Multiperiod input list implementation to support HTTP outputs
raise RuntimeError(
'Multiperiod input list support is incompatible with HTTP outputs.')
# All the outputs now should be sent to the proxy server instead.
output_location = upload_proxy.server_location
self._nodes.append(upload_proxy)
else:
# Check if the directory for outputted Packager files exists, and if it
# does, delete it and remake a new one.
if os.path.exists(output_location):
shutil.rmtree(output_location)
os.mkdir(output_location)

if self._pipeline_config.low_latency_dash_mode:
# Check some restrictions on LL-DASH packaging.
Expand All @@ -214,16 +207,16 @@ def next_short_version(version: str) -> str:
raise RuntimeError(
'For low_latency_dash_mode, the utc_timings must be set.')

# Note that we remove the trailing slash from the output location, because
# otherwise GCS would create a subdirectory whose name is "".
output_location = output_location.rstrip('/')

if self._input_config.inputs:
# InputConfig contains inputs only.
self._append_nodes_for_inputs_list(self._input_config.inputs,
output_location)
else:
# InputConfig contains multiperiod_inputs_list only.
if bucket_url:
raise RuntimeError(
'Direct cloud upload is incompatible with multiperiod support.')

# Create one Transcoder node and one Packager node for each period.
for i, singleperiod in enumerate(self._input_config.multiperiod_inputs_list):
sub_dir_name = 'period_' + str(i + 1)
Expand All @@ -238,17 +231,6 @@ def next_short_version(version: str) -> str:
packager_nodes,
output_location))

if bucket_url:
cloud_temp_dir = os.path.join(self._temp_dir, 'cloud')
os.mkdir(cloud_temp_dir)

packager_nodes = [node for node in self._nodes if isinstance(node, PackagerNode)]
self._nodes.append(CloudNode(output_location,
bucket_url,
cloud_temp_dir,
packager_nodes,
self.is_vod()))

for node in self._nodes:
node.start()

Expand Down
13 changes: 10 additions & 3 deletions streamer/node_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,16 @@ class ThreadedNodeBase(NodeBase):
The thread repeats some callback in a background thread.
"""

_thread: Optional[threading.Thread]

def __init__(self, thread_name: str, continue_on_exception: bool, sleep_time: float):
  """Initialize the threaded node without running any work yet.

  Args:
    thread_name: Name for the background thread when it is created.
    continue_on_exception: If True, the background loop keeps going after an
      exception in a single pass; if False, the first exception ends it.
    sleep_time: Seconds to wait between passes of the background loop.
  """
  super().__init__()
  self._status = ProcessStatus.Finished
  self._thread_name = thread_name
  self._thread = None
  self._continue_on_exception = continue_on_exception
  self._sleep_time = sleep_time
  # NOTE(review): this line re-binds self._thread right after it was set to
  # None above.  In this diff view it appears to be the *removed*
  # (pre-change) line rendered inline with the additions — confirm it is
  # absent from the merged code, since thread creation now happens in
  # start() instead of here.
  self._thread = threading.Thread(target=self._thread_main, name=thread_name)
  # Event used to interrupt the inter-pass wait so stop() returns promptly.
  self._sleep_waker_event = threading.Event()

def _thread_main(self) -> None:
while self._status == ProcessStatus.Running:
Expand All @@ -183,7 +186,7 @@ def _thread_main(self) -> None:
return

# Wait a little bit before performing the next pass.
time.sleep(self._sleep_time)
self._sleep_waker_event.wait(self._sleep_time)

@abc.abstractmethod
def _thread_single_pass(self) -> None:
Expand All @@ -200,11 +203,15 @@ def _thread_single_pass(self) -> None:

def start(self) -> None:
  """Mark the node as running and launch the background worker thread.

  The thread is created here rather than in __init__, and it runs
  _thread_main under the name given at construction time.
  """
  self._status = ProcessStatus.Running
  self._thread = threading.Thread(target=self._thread_main, name=self._thread_name)
  self._thread.start()

def stop(self, status: Optional[ProcessStatus]) -> None:
  """Signal the background thread to finish and wait for it to exit.

  Args:
    status: Accepted for interface compatibility; not read in this body —
      the node is unconditionally marked Finished.
  """
  self._status = ProcessStatus.Finished
  # NOTE(review): this unconditional join looks like the *removed*
  # (pre-change) diff line rendered inline; the guarded join below
  # supersedes it.  Confirm only the guarded join exists in the merged
  # code — an unconditional join would raise if start() was never called.
  self._thread.join()
  # If the thread was sleeping, wake it up.
  self._sleep_waker_event.set()
  if self._thread:
    self._thread.join()

def check_status(self) -> ProcessStatus:
  """Return the node's current status (e.g. Running or Finished)."""
  return self._status
Loading