fix(cloud): Avoid rate limit issues on live streams #162

Merged (2 commits) on Oct 23, 2024
streamer/proxy_node.py (80 changes: 69 additions, 11 deletions)
@@ -16,16 +16,19 @@
 import abc
 import threading
+import time
 import traceback
 import urllib.parse
-from typing import Any, IO, Optional
 
 from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
+from typing import Any, BinaryIO, Optional
 
 from streamer.node_base import ProcessStatus, ThreadedNodeBase
 
 
 # HTTP status codes
 HTTP_STATUS_CREATED = 201
+HTTP_STATUS_ACCEPTED = 202
 HTTP_STATUS_FAILED = 500
 
 # S3 has a minimum chunk size for multipart uploads.
@@ -40,6 +43,11 @@
 ALL_SUPPORTED_PROTOCOLS: list[str] = ['gs', 's3']
 
 
+# Don't write the same file more than once per rate limiter period.
+# For live streams, this avoids HTTP 429 "Too Many Requests" errors.
+RATE_LIMITER_PERIOD_IN_SECONDS = 2
+
+
 # Optional: To support GCS, import Google Cloud Storage library.
 try:
   import google.cloud.storage as gcs  # type: ignore
@@ -55,12 +63,57 @@
   pass
 
 
+class RateLimiter(object):
+  """A rate limiter that tracks which files we have written to recently."""
+
+  def __init__(self) -> None:
+    self._reset()
+
+  def suppress(self, path: str) -> bool:
+    """Returns true if you should skip this upload."""
+
+    now = self._now()
+    if self._last_check != now:
+      self._reset()
+
+    if path in self._recent_files:
+      return True  # skip
+
+    self._recent_files.add(path)
+    return False  # upload
+
+  def _reset(self) -> None:
+    self._recent_files: set[str] = set()
+    self._last_check: int = self._now()
+
+  def _now(self) -> int:
+    """A timestamp used internally to track recency."""
+
+    return int(time.time() // RATE_LIMITER_PERIOD_IN_SECONDS)
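
The recency window comes from integer division of the wall clock: every timestamp inside the same RATE_LIMITER_PERIOD_IN_SECONDS-wide window maps to the same bucket number, and the set of recent paths is discarded whenever the bucket changes. A quick illustration of the arithmetic (not part of the diff):

    # Timestamps within one 2-second window share a bucket; crossing the
    # boundary changes the bucket, which resets the limiter's memory.
    PERIOD = 2  # mirrors RATE_LIMITER_PERIOD_IN_SECONDS above
    for t in (1000.3, 1001.9, 1002.0, 1004.1):
      print(t, '->', int(t // PERIOD))
    # 1000.3 -> 500, 1001.9 -> 500, 1002.0 -> 501, 1004.1 -> 502

One consequence: a file rewritten with genuinely new contents inside the same window is also skipped; for live streams, where Shaka Packager re-emits the same manifests and segments every few seconds anyway, the next window's write refreshes it, which appears to be the tradeoff the PR accepts.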


 class RequestHandlerBase(BaseHTTPRequestHandler):
   """A request handler that processes the PUT requests coming from
   Shaka Packager and pushes them to the destination.
   """
+  def __init__(self, rate_limiter: RateLimiter, *args, **kwargs):
+    self._rate_limiter: RateLimiter = rate_limiter
+
+    # The HTTP server passes *args and **kwargs that we need to pass along but
+    # don't otherwise care about. This must happen last: the base constructor
+    # handles the request immediately, so our members must be set first.
+    super().__init__(*args, **kwargs)
+
   def do_PUT(self) -> None:
     """Handle the PUT requests coming from Shaka Packager."""
 
+    if self._rate_limiter.suppress(self.path):
+      # Skip this upload.
+      self.rfile.close()
+      self.send_response(HTTP_STATUS_ACCEPTED)
+      self.end_headers()
+      return
+
     try:
       if self.headers.get('Transfer-Encoding', '').lower() == 'chunked':
         # Here we parse the chunked transfer encoding and delegate to the
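
Note that the suppressed branch still answers with a success code: Shaka Packager only needs a 2xx status to consider the segment delivered, so returning 202 Accepted without uploading keeps the packager oblivious (that reading is inferred from the code, not stated in the PR). A hypothetical smoke test against a running proxy, with the address and path invented for the example:

    import http.client

    def put_once() -> int:
      # Assumed address; the real proxy picks its own port at startup.
      conn = http.client.HTTPConnection('localhost', 9000)
      conn.request('PUT', '/live/seg_0001.m4s', body=b'segment data')
      status = conn.getresponse().status
      conn.close()
      return status

    print(put_once())  # first write this period: uploaded (e.g. 201 Created)
    print(put_once())  # repeat within the period: 202 Accepted, upload skipped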
@@ -100,7 +153,8 @@ def do_PUT(self) -> None:
     self.end_headers()
 
   @abc.abstractmethod
-  def handle_non_chunked(self, path: str, length: int, file: IO) -> None:
+  def handle_non_chunked(self, path: str, length: int,
+                         file: BinaryIO) -> None:
     """Write the non-chunked data stream from |file| to the destination."""
     pass
 
@@ -123,17 +177,18 @@ def end_chunked(self) -> None:
 class GCSHandler(RequestHandlerBase):
   # Can't annotate the bucket here as a parameter if we don't have the library.
   def __init__(self, bucket: Any, base_path: str,
-               *args, **kwargs) -> None:
+               rate_limiter: RateLimiter, *args, **kwargs) -> None:
     self._bucket: gcs.Bucket = bucket
     self._base_path: str = base_path
-    self._chunked_output: Optional[IO] = None
+    self._chunked_output: Optional[BinaryIO] = None
 
     # The HTTP server passes *args and **kwargs that we need to pass along but
     # don't otherwise care about. This must happen last: the base constructor
     # handles the request immediately, so our members must be set first.
-    super().__init__(*args, **kwargs)
+    super().__init__(rate_limiter, *args, **kwargs)
 
-  def handle_non_chunked(self, path: str, length: int, file: IO) -> None:
+  def handle_non_chunked(self, path: str, length: int,
+                         file: BinaryIO) -> None:
     # No leading slashes, or we get a blank folder name.
     full_path = (self._base_path + path).strip('/')
     blob = self._bucket.blob(full_path)
@@ -163,7 +218,7 @@ def end_chunked(self) -> None:
 class S3Handler(RequestHandlerBase):
   # Can't annotate the client here as a parameter if we don't have the library.
   def __init__(self, client: Any, bucket_name: str, base_path: str,
-               *args, **kwargs) -> None:
+               rate_limiter: RateLimiter, *args, **kwargs) -> None:
     self._client: aws.Client = client
     self._bucket_name: str = bucket_name
     self._base_path: str = base_path
@@ -178,9 +233,10 @@ def __init__(self, client: Any, bucket_name: str, base_path: str,
     # The HTTP server passes *args and **kwargs that we need to pass along but
     # don't otherwise care about. This must happen last: the base constructor
     # handles the request immediately, so our members must be set first.
-    super().__init__(*args, **kwargs)
+    super().__init__(rate_limiter, *args, **kwargs)
 
-  def handle_non_chunked(self, path: str, length: int, file: IO) -> None:
+  def handle_non_chunked(self, path: str, length: int,
+                         file: BinaryIO) -> None:
     # No leading slashes, or we get a blank folder name.
     full_path = (self._base_path + path).strip('/')
     # length is unused here.
@@ -253,6 +309,7 @@ def __init__(self) -> None:
     super().__init__(thread_name=self.__class__.__name__,
                      continue_on_exception=True,
                      sleep_time=3)
+    self._rate_limiter = RateLimiter()
 
   @abc.abstractmethod
   def create_handler(self, *args, **kwargs) -> BaseHTTPRequestHandler:
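
There is one RateLimiter per upload node, shared by every request handler, and create_handler is the hook that injects it. Because http.server constructs a new handler object per request, the server needs a factory rather than a bare handler class. A plausible wiring, with all concrete names invented for the sketch (the server setup itself is outside this diff):

    from functools import partial
    from http.server import ThreadingHTTPServer

    import google.cloud.storage as gcs  # assumes the GCS library is present

    # Illustrative values only; GCSHandler and RateLimiter come from this module.
    bucket = gcs.Client().bucket('my-bucket')
    limiter = RateLimiter()
    factory = partial(GCSHandler, bucket, 'live/', limiter)

    # socketserver calls the factory with (request, client_address, server);
    # those land in *args and are forwarded to BaseHTTPRequestHandler.
    server = ThreadingHTTPServer(('localhost', 0), factory)
    print('proxy listening on port', server.server_port)
    server.serve_forever()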
@@ -313,7 +370,8 @@ def __init__(self, upload_location: str) -> None:
   def create_handler(self, *args, **kwargs) -> BaseHTTPRequestHandler:
     """Returns a cloud-provider-specific request handler to upload to cloud."""
-    return GCSHandler(self._bucket, self._base_path, *args, **kwargs)
+    return GCSHandler(self._bucket, self._base_path,
+                      self._rate_limiter, *args, **kwargs)
 
 
 class S3Upload(HTTPUploadBase):
@@ -331,7 +389,7 @@ def __init__(self, upload_location: str) -> None:
   def create_handler(self, *args, **kwargs) -> BaseHTTPRequestHandler:
     """Returns a cloud-provider-specific request handler to upload to cloud."""
     return S3Handler(self._client, self._bucket_name, self._base_path,
-                     *args, **kwargs)
+                     self._rate_limiter, *args, **kwargs)
 
 
 class ProxyNode(object):
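
Because suppress() keys off wall-clock buckets, its behavior can be tested deterministically by stubbing the clock instead of sleeping through a period. A sketch of such a test (hypothetical, not in the PR), assuming RateLimiter is importable from streamer.proxy_node:

    import unittest.mock

    from streamer.proxy_node import RateLimiter

    def test_limiter_resets_each_period() -> None:
      # With a 2-second period, t=100.0 and t=102.0 fall in different buckets.
      with unittest.mock.patch('time.time', return_value=100.0):
        limiter = RateLimiter()
        assert limiter.suppress('/a.mpd') is False  # first sighting: upload
        assert limiter.suppress('/a.mpd') is True   # same bucket: skip
      with unittest.mock.patch('time.time', return_value=102.0):
        assert limiter.suppress('/a.mpd') is False  # new bucket: upload again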