diff --git a/streamer/proxy_node.py b/streamer/proxy_node.py
index fdbaf20..dbaeec5 100644
--- a/streamer/proxy_node.py
+++ b/streamer/proxy_node.py
@@ -29,6 +29,7 @@
 # HTTP status codes
 HTTP_STATUS_CREATED = 201
 HTTP_STATUS_ACCEPTED = 202
+HTTP_STATUS_NO_CONTENT = 204
 HTTP_STATUS_FAILED = 500
 
 # S3 has a minimum chunk size for multipart uploads.
@@ -50,14 +51,15 @@
 # Optional: To support GCS, import Google Cloud Storage library.
 try:
-  import google.cloud.storage as gcs  # type: ignore
+  import google.cloud.storage  # type: ignore
+  import google.api_core.exceptions  # type: ignore
   SUPPORTED_PROTOCOLS.append('gs')
 except:
   pass
 
 # Optional: To support S3, import AWS's boto3 library.
 try:
-  import boto3 as aws  # type: ignore
+  import boto3  # type: ignore
   SUPPORTED_PROTOCOLS.append('s3')
 except:
   pass
 
@@ -92,9 +94,10 @@ def _reset(self, now: float) -> None:
 
 
 class RequestHandlerBase(BaseHTTPRequestHandler):
-  """A request handler that processes the PUT requests coming from
-  Shaka Packager and pushes them to the destination.
+  """A request handler that processes requests coming from Shaka Packager and
+  relays them to the destination.
   """
+
   def __init__(self, rate_limiter: RateLimiter, *args, **kwargs):
     self._rate_limiter: RateLimiter = rate_limiter
 
@@ -120,7 +123,7 @@ def log_request(self, code: str = '-', size: str = '-') -> None:
     return super().log_request(code, size)
 
   def do_PUT(self) -> None:
-    """Handle the PUT requests coming from Shaka Packager."""
+    """Handle PUT requests coming from Shaka Packager."""
 
     if self._rate_limiter.suppress(self.path):
       # Skip this upload.
@@ -167,6 +170,20 @@ def do_PUT(self) -> None:
     # "returned nothing".
     self.end_headers()
 
+  def do_DELETE(self) -> None:
+    """Handle DELETE requests coming from Shaka Packager."""
+    try:
+      self.handle_delete(self.path)
+      self.send_response(HTTP_STATUS_NO_CONTENT)
+    except Exception as ex:
+      print('Delete failure: ' + str(ex))
+      traceback.print_exc()
+      self.send_response(HTTP_STATUS_FAILED)
+
+    # If we don't call this at the end of the handler, Packager says we
+    # "returned nothing".
+    self.end_headers()
+
   @abc.abstractmethod
   def handle_non_chunked(self, path: str, length: int,
                          file: BinaryIO) -> None:
@@ -188,12 +205,16 @@ def end_chunked(self) -> None:
     """End the chunked transfer."""
     pass
 
+  @abc.abstractmethod
+  def handle_delete(self, path: str) -> None:
+    """Delete the file from cloud storage."""
+    pass
 
 
 class GCSHandler(RequestHandlerBase):
   # Can't annotate the bucket here as a parameter if we don't have the library.
   def __init__(self, bucket: Any, base_path: str,
                rate_limiter: RateLimiter, *args, **kwargs) -> None:
-    self._bucket: gcs.Bucket = bucket
+    self._bucket: google.cloud.storage.Bucket = bucket
     self._base_path: str = base_path
     self._chunked_output: Optional[BinaryIO] = None
 
@@ -229,12 +250,23 @@ def end_chunked(self) -> None:
     self._chunked_output.close()
     self._chunked_output = None
 
+  def handle_delete(self, path: str) -> None:
+    # No leading slashes, or we get a blank folder name.
+    full_path = (self._base_path + path).strip('/')
+    blob = self._bucket.blob(full_path)
+    try:
+      blob.delete()
+    except google.api_core.exceptions.NotFound:
+      # Some delete calls seem to throw "not found", but the files still get
+      # deleted. So ignore these and don't fail the request.
+      pass
+
 
 class S3Handler(RequestHandlerBase):
   # Can't annotate the client here as a parameter if we don't have the library.
   def __init__(self, client: Any, bucket_name: str, base_path: str,
                rate_limiter: RateLimiter, *args, **kwargs) -> None:
-    self._client: aws.Client = client
+    self._client: boto3.Client = client
     self._bucket_name: str = bucket_name
     self._base_path: str = base_path
 
@@ -308,6 +340,10 @@ def end_chunked(self) -> None:
     self._next_part_number = 0
     self._part_info = []
 
+  def handle_delete(self, path: str) -> None:
+    # No leading slashes, or we get a blank folder name.
+    full_path = (self._base_path + path).strip('/')
+    self._client.delete_object(Bucket=self._bucket_name, Key=full_path)
 
 class HTTPUploadBase(ThreadedNodeBase):
   """Runs an HTTP server at `self.server_location` to upload to cloud.
@@ -378,7 +414,7 @@ def __init__(self, upload_location: str) -> None:
     super().__init__()
     url = urllib.parse.urlparse(upload_location)
 
-    self._client = gcs.Client()
+    self._client = google.cloud.storage.Client()
     self._bucket = self._client.bucket(url.netloc)
     # Strip both left and right slashes. Otherwise, we get a blank folder name.
     self._base_path = url.path.strip('/')
@@ -396,7 +432,7 @@ def __init__(self, upload_location: str) -> None:
     super().__init__()
     url = urllib.parse.urlparse(upload_location)
 
-    self._client = aws.client('s3')
+    self._client = boto3.client('s3')
     self._bucket_name = url.netloc
     # Strip both left and right slashes. Otherwise, we get a blank folder name.
     self._base_path = url.path.strip('/')
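
For reference, a minimal sketch (not part of the patch) of how the new DELETE
path can be exercised by hand once the proxy is running; in normal operation
Shaka Packager issues these requests itself as old live segments expire. The
host, port, and segment path below are assumptions for illustration only:

    import http.client

    # Connect to the proxy server run by HTTPUploadBase (assumed here to be
    # listening on localhost:9000).
    conn = http.client.HTTPConnection('localhost', 9000)

    # Ask the proxy to delete an object. This is routed to do_DELETE(), which
    # calls the storage-specific handle_delete() implementation.
    conn.request('DELETE', '/live/segment_0001.m4s')
    resp = conn.getresponse()

    # 204 (HTTP_STATUS_NO_CONTENT) on success; any exception raised inside
    # handle_delete() is reported as 500 (HTTP_STATUS_FAILED).
    print(resp.status)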