Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cloud): Add cloud delete support #164

Merged
merged 1 commit into from
Oct 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 45 additions & 9 deletions streamer/proxy_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
# HTTP status codes
HTTP_STATUS_CREATED = 201
HTTP_STATUS_ACCEPTED = 202
HTTP_STATUS_NO_CONTENT = 204
HTTP_STATUS_FAILED = 500

# S3 has a minimum chunk size for multipart uploads.
Expand All @@ -50,14 +51,15 @@

# Optional: To support GCS, import Google Cloud Storage library.
try:
import google.cloud.storage as gcs # type: ignore
import google.cloud.storage # type: ignore
import google.api_core.exceptions # type: ignore
SUPPORTED_PROTOCOLS.append('gs')
except:
pass

# Optional: To support S3, import AWS's boto3 library.
try:
import boto3 as aws # type: ignore
import boto3 # type: ignore
SUPPORTED_PROTOCOLS.append('s3')
except:
pass
Expand Down Expand Up @@ -92,9 +94,10 @@ def _reset(self, now: float) -> None:


class RequestHandlerBase(BaseHTTPRequestHandler):
"""A request handler that processes the PUT requests coming from
Shaka Packager and pushes them to the destination.
"""A request handler that processes requests coming from Shaka Packager and
relays them to the destination.
"""

def __init__(self, rate_limiter: RateLimiter, *args, **kwargs):
self._rate_limiter: RateLimiter = rate_limiter

Expand All @@ -104,7 +107,7 @@ def __init__(self, rate_limiter: RateLimiter, *args, **kwargs):
super().__init__(*args, **kwargs)

def do_PUT(self) -> None:
"""Handle the PUT requests coming from Shaka Packager."""
"""Handle PUT requests coming from Shaka Packager."""

if self._rate_limiter.suppress(self.path):
# Skip this upload.
Expand Down Expand Up @@ -151,6 +154,20 @@ def do_PUT(self) -> None:
# "returned nothing".
self.end_headers()

def do_DELETE(self) -> None:
  """Handle DELETE requests coming from Shaka Packager.

  Delegates to the subclass's handle_delete() implementation, then reports
  204 ("No Content", the standard success status for DELETE) on success or
  500 on failure.
  """
  try:
    self.handle_delete(self.path)
    self.send_response(HTTP_STATUS_NO_CONTENT)
  except Exception as ex:
    # Fixed: this said 'Upload failure', copy-pasted from do_PUT, which was
    # misleading in logs for a failed delete.
    print('Delete failure: ' + str(ex))
    traceback.print_exc()
    self.send_response(HTTP_STATUS_FAILED)

  # If we don't call this at the end of the handler, Packager says we
  # "returned nothing".
  self.end_headers()

@abc.abstractmethod
def handle_non_chunked(self, path: str, length: int,
file: BinaryIO) -> None:
Expand All @@ -172,12 +189,16 @@ def end_chunked(self) -> None:
"""End the chunked transfer."""
pass

@abc.abstractmethod
def handle_delete(self, path: str) -> None:
  """Delete the file from cloud storage.

  Args:
    path: The request path from Shaka Packager (e.g. "/foo/seg1.m4s").
      Implementations are expected to resolve this against their configured
      base path to locate the object to delete (see GCSHandler).
  """
  pass

class GCSHandler(RequestHandlerBase):
# Can't annotate the bucket here as a parameter if we don't have the library.
def __init__(self, bucket: Any, base_path: str,
rate_limiter: RateLimiter, *args, **kwargs) -> None:
self._bucket: gcs.Bucket = bucket
self._bucket: google.cloud.storage.Bucket = bucket
self._base_path: str = base_path
self._chunked_output: Optional[BinaryIO] = None

Expand Down Expand Up @@ -213,12 +234,23 @@ def end_chunked(self) -> None:
self._chunked_output.close()
self._chunked_output = None

def handle_delete(self, path: str) -> None:
  """Delete |path| from the GCS bucket, tolerating "not found" errors."""
  # Strip any leading/trailing slashes; a leading slash would produce a
  # blank folder name in the bucket.
  object_name = (self._base_path + path).strip('/')
  try:
    self._bucket.blob(object_name).delete()
  except google.api_core.exceptions.NotFound:
    # Some delete calls seem to throw "not found", but the files still get
    # deleted.  So ignore these and don't fail the request.
    pass


class S3Handler(RequestHandlerBase):
# Can't annotate the client here as a parameter if we don't have the library.
def __init__(self, client: Any, bucket_name: str, base_path: str,
rate_limiter: RateLimiter, *args, **kwargs) -> None:
self._client: aws.Client = client
self._client: boto3.Client = client
self._bucket_name: str = bucket_name
self._base_path: str = base_path

Expand Down Expand Up @@ -292,6 +324,10 @@ def end_chunked(self) -> None:
self._next_part_number = 0
self._part_info = []

def handle_delete(self, path: str) -> None:
  """Delete |path| from the S3 bucket.

  Fixed: the original passed Key=self._upload_path, the key of the most
  recent chunked upload, ignoring the |path| argument entirely.  That
  deleted the wrong object for any DELETE that didn't immediately follow
  the matching upload (and broke if no upload had happened yet).  Derive
  the key from |path|, mirroring GCSHandler.handle_delete.
  """
  # No leading slashes, or we get a blank folder name.
  full_path = (self._base_path + path).strip('/')
  self._client.delete_object(
      Bucket=self._bucket_name, Key=full_path)


class HTTPUploadBase(ThreadedNodeBase):
"""Runs an HTTP server at `self.server_location` to upload to cloud.
Expand Down Expand Up @@ -362,7 +398,7 @@ def __init__(self, upload_location: str) -> None:
super().__init__()

url = urllib.parse.urlparse(upload_location)
self._client = gcs.Client()
self._client = google.cloud.storage.Client()
self._bucket = self._client.bucket(url.netloc)
# Strip both left and right slashes. Otherwise, we get a blank folder name.
self._base_path = url.path.strip('/')
Expand All @@ -380,7 +416,7 @@ def __init__(self, upload_location: str) -> None:
super().__init__()

url = urllib.parse.urlparse(upload_location)
self._client = aws.client('s3')
self._client = boto3.client('s3')
self._bucket_name = url.netloc
# Strip both left and right slashes. Otherwise, we get a blank folder name.
self._base_path = url.path.strip('/')
Expand Down