-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
null object pattern in object storage
- Loading branch information
1 parent
308f914
commit b46c460
Showing
3 changed files
with
118 additions
and
79 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,61 +1,104 @@ | ||
import abc | ||
from typing import Optional | ||
|
||
import structlog | ||
from boto3 import client | ||
from botocore.client import Config | ||
from django.conf import settings | ||
|
||
logger = structlog.get_logger(__name__) | ||
|
||
from django.conf import settings | ||
|
||
s3_client = None | ||
class ObjectStorageError(Exception):
    """Raised when reading from or writing to object storage fails."""
|
||
|
||
class S3(metaclass=abc.ABCMeta):
    """Deliberately narrow storage interface.

    Just because the full S3 API is available doesn't mean we should use it
    all: implementations expose only a bucket check, a read and a write.
    """

    @abc.abstractmethod
    def head_bucket(self, bucket: str) -> bool:
        """Report whether ``bucket`` can be reached."""

    @abc.abstractmethod
    def read(self, bucket: str, key: str) -> Optional[str]:
        """Return the content stored under ``key`` in ``bucket``, if any."""

    @abc.abstractmethod
    def write(self, bucket: str, key: str, content: str) -> None:
        """Store ``content`` under ``key`` in ``bucket``."""
|
||
|
||
class UnavailableStorage(S3):
    """Null-object implementation of ``S3``.

    Every operation is a harmless no-op, so callers never need to check
    whether a real client exists.
    """

    def read(self, bucket: str, key: str) -> Optional[str]:
        """Return ``None``: there is nothing to read."""
        return None

    def write(self, bucket: str, key: str, content: str) -> None:
        """Discard ``content`` without storing it."""
        return None

    def head_bucket(self, bucket: str):
        """Always report the bucket as unreachable."""
        return False
|
||
|
||
class ObjectStorage(S3):
    """``S3`` implementation that delegates to a boto3-style client.

    ``head_bucket`` never raises: any failure is logged and reported as
    ``False``. ``read`` and ``write`` log the failure and re-raise it as
    ``ObjectStorageError`` with the original exception chained as the cause.
    """

    def __init__(self, aws_client, bucket: str) -> None:
        """Wrap ``aws_client``.

        ``bucket`` is accepted so existing call sites keep working, but it is
        not stored: every operation receives its bucket explicitly.
        """
        self.aws_client = aws_client

    def head_bucket(self, bucket: str) -> bool:
        """Return True when ``bucket`` answers a HEAD request, False otherwise."""
        try:
            # boto3 operation parameters are case-sensitive. Passing `bucket=`
            # is rejected during parameter validation, which the except below
            # would swallow, so the check could never succeed.
            return bool(self.aws_client.head_bucket(Bucket=bucket))
        except Exception as e:
            logger.warn("object_storage.health_check_failed", bucket=bucket, error=e)
            return False

    def read(self, bucket: str, key: str) -> Optional[str]:
        """Fetch ``key`` from ``bucket`` and return its body decoded as UTF-8.

        Raises:
            ObjectStorageError: if the fetch, the body read or the decode fails.
        """
        # Pre-initialised so the log line has a value even when get_object raises.
        s3_response = {}
        try:
            s3_response = self.aws_client.get_object(Bucket=bucket, Key=key)
            content = s3_response["Body"].read()
            return content.decode("utf-8")
        except Exception as e:
            logger.error("object_storage.read_failed", bucket=bucket, file_name=key, error=e, s3_response=s3_response)
            raise ObjectStorageError("read failed") from e

    def write(self, bucket: str, key: str, content: str) -> None:
        """Store ``content`` under ``key`` in ``bucket``.

        Raises:
            ObjectStorageError: if the upload fails.
        """
        # Pre-initialised so the log line has a value even when put_object raises.
        s3_response = {}
        try:
            s3_response = self.aws_client.put_object(Bucket=bucket, Body=content, Key=key)
        except Exception as e:
            logger.error("object_storage.write_failed", bucket=bucket, file_name=key, error=e, s3_response=s3_response)
            raise ObjectStorageError("write failed") from e
|
||
|
||
s3_client: S3 = UnavailableStorage() | ||
|
||
# boto doing some magic and gets confused if this is hinted as BaseClient | ||
# noinspection PyMissingTypeHints | ||
def storage_client(): | ||
|
||
def storage_client() -> S3: | ||
global s3_client | ||
if settings.OBJECT_STORAGE_ENABLED and not s3_client: | ||
s3_client = client( | ||
"s3", | ||
endpoint_url=settings.OBJECT_STORAGE_ENDPOINT, | ||
aws_access_key_id=settings.OBJECT_STORAGE_ACCESS_KEY_ID, | ||
aws_secret_access_key=settings.OBJECT_STORAGE_SECRET_ACCESS_KEY, | ||
config=Config(signature_version="s3v4", connect_timeout=1, retries={"max_attempts": 1}), | ||
region_name="us-east-1", | ||
|
||
if settings.OBJECT_STORAGE_ENABLED and isinstance(s3_client, UnavailableStorage): | ||
s3_client = ObjectStorage( | ||
client( | ||
"s3", | ||
endpoint_url=settings.OBJECT_STORAGE_ENDPOINT, | ||
aws_access_key_id=settings.OBJECT_STORAGE_ACCESS_KEY_ID, | ||
aws_secret_access_key=settings.OBJECT_STORAGE_SECRET_ACCESS_KEY, | ||
config=Config(signature_version="s3v4", connect_timeout=1, retries={"max_attempts": 1}), | ||
region_name="us-east-1", | ||
), | ||
bucket=settings.OBJECT_STORAGE_BUCKET, | ||
) | ||
|
||
return s3_client | ||
|
||
|
||
class ObjectStorageError(Exception): | ||
pass | ||
def write(file_name: str, content: str) -> None:
    """Store ``content`` under ``file_name`` in the configured storage bucket."""
    storage = storage_client()
    return storage.write(bucket=settings.OBJECT_STORAGE_BUCKET, key=file_name, content=content)
|
||
|
||
def write(file_name: str, content: str): | ||
s3_response = {} | ||
try: | ||
s3_response = storage_client().put_object(Bucket=OBJECT_STORAGE_BUCKET, Body=content, Key=file_name) | ||
except Exception as e: | ||
logger.error("object_storage.write_failed", file_name=file_name, error=e, s3_response=s3_response) | ||
raise ObjectStorageError("write failed") from e | ||
|
||
|
||
def read(file_name: str): | ||
s3_response = {} | ||
try: | ||
s3_response = storage_client().get_object(Bucket=OBJECT_STORAGE_BUCKET, Key=file_name) | ||
content = s3_response["Body"].read() | ||
return content.decode("utf-8") | ||
except Exception as e: | ||
logger.error("object_storage.read_failed", file_name=file_name, error=e, s3_response=s3_response) | ||
raise ObjectStorageError("read failed") from e | ||
def read(file_name: str) -> Optional[str]:
    """Return what is stored under ``file_name`` in the configured storage bucket."""
    storage = storage_client()
    return storage.read(bucket=settings.OBJECT_STORAGE_BUCKET, key=file_name)
|
||
|
||
def health_check() -> bool: | ||
# noinspection PyBroadException | ||
try: | ||
client = storage_client() | ||
response = client.head_bucket(Bucket=settings.OBJECT_STORAGE_BUCKET) if client else False | ||
return bool(response) | ||
except Exception as e: | ||
logger.warn("object_storage.health_check_failed", error=e) | ||
return False | ||
return storage_client().head_bucket(settings.OBJECT_STORAGE_BUCKET) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,45 @@ | ||
import uuid | ||
from unittest.mock import patch | ||
|
||
from posthog.storage.object_storage import health_check | ||
from boto3 import resource | ||
from botocore.client import Config | ||
|
||
from posthog.settings import ( | ||
OBJECT_STORAGE_ACCESS_KEY_ID, | ||
OBJECT_STORAGE_BUCKET, | ||
OBJECT_STORAGE_ENDPOINT, | ||
OBJECT_STORAGE_SECRET_ACCESS_KEY, | ||
) | ||
from posthog.storage.object_storage import health_check, read, write | ||
from posthog.test.base import APIBaseTest | ||
|
||
TEST_BUCKET = "test_storage_bucket" | ||
|
||
|
||
class TestStorage(APIBaseTest):
    """Tests for the module-level object storage helpers."""

    def teardown_method(self, method) -> None:
        """Delete every object stored under the ``TEST_BUCKET`` key prefix."""
        s3_resource = resource(
            "s3",
            endpoint_url=OBJECT_STORAGE_ENDPOINT,
            aws_access_key_id=OBJECT_STORAGE_ACCESS_KEY_ID,
            aws_secret_access_key=OBJECT_STORAGE_SECRET_ACCESS_KEY,
            config=Config(signature_version="s3v4"),
            region_name="us-east-1",
        )
        s3_resource.Bucket(OBJECT_STORAGE_BUCKET).objects.filter(Prefix=TEST_BUCKET).delete()

    @patch("posthog.storage.object_storage.client")
    def test_does_not_create_client_if_storage_is_disabled(self, patched_s3_client) -> None:
        """With storage disabled, the health check fails and no client is built."""
        with self.settings(OBJECT_STORAGE_ENABLED=False):
            self.assertFalse(health_check())
            patched_s3_client.assert_not_called()

    def test_write_and_read_works_with_known_content(self) -> None:
        """Content written under a unique key is read back unchanged."""
        with self.settings(OBJECT_STORAGE_ENABLED=True):
            # Random ids keep each run's key unique.
            session_id = str(uuid.uuid4())
            chunk_id = uuid.uuid4()
            name = f"{session_id}/{0}-{chunk_id}"
            file_name = f"{TEST_BUCKET}/test_write_and_read_works_with_known_content/{name}"

            write(file_name, "my content")
            stored = read(file_name)

            self.assertEqual(stored, "my content")
This file was deleted.
Oops, something went wrong.