diff --git a/posthog/storage/object_storage.py b/posthog/storage/object_storage.py
index 7dc7890253713..06629b26c81cf 100644
--- a/posthog/storage/object_storage.py
+++ b/posthog/storage/object_storage.py
@@ -1,61 +1,105 @@
+import abc
+from typing import Optional
+
 import structlog
 from boto3 import client
 from botocore.client import Config
+from django.conf import settings
 
 logger = structlog.get_logger(__name__)
 
-from django.conf import settings
 
-s3_client = None
+class ObjectStorageError(Exception):
+    pass
+
+
+class S3(metaclass=abc.ABCMeta):
+    """Just because the full S3 API is available doesn't mean we should use it all"""
+
+    @abc.abstractmethod
+    def head_bucket(self, bucket: str) -> bool:
+        pass
+
+    @abc.abstractmethod
+    def read(self, bucket: str, key: str) -> Optional[str]:
+        pass
+
+    @abc.abstractmethod
+    def write(self, bucket: str, key: str, content: str) -> None:
+        pass
+
+
+class UnavailableStorage(S3):
+    def read(self, bucket: str, key: str) -> Optional[str]:
+        pass
+
+    def write(self, bucket: str, key: str, content: str) -> None:
+        pass
+
+    def head_bucket(self, bucket: str) -> bool:
+        return False
+
+
+class ObjectStorage(S3):
+    def __init__(self, aws_client, bucket: str) -> None:
+        self.aws_client = aws_client
+        self.bucket = bucket
+
+    def head_bucket(self, bucket: str) -> bool:
+        try:
+            return bool(self.aws_client.head_bucket(Bucket=bucket))
+        except Exception as e:
+            logger.warn("object_storage.health_check_failed", bucket=bucket, error=e)
+            return False
+
+    def read(self, bucket: str, key: str) -> Optional[str]:
+        s3_response = {}
+        try:
+            s3_response = self.aws_client.get_object(Bucket=bucket, Key=key)
+            content = s3_response["Body"].read()
+            return content.decode("utf-8")
+        except Exception as e:
+            logger.error("object_storage.read_failed", bucket=bucket, file_name=key, error=e, s3_response=s3_response)
+            raise ObjectStorageError("read failed") from e
+
+    def write(self, bucket: str, key: str, content: str) -> None:
+        s3_response = {}
+        try:
+            s3_response = self.aws_client.put_object(Bucket=bucket, Body=content, Key=key)
+        except Exception as e:
+            logger.error("object_storage.write_failed", bucket=bucket, file_name=key, error=e, s3_response=s3_response)
+            raise ObjectStorageError("write failed") from e
+
+
+s3_client: S3 = UnavailableStorage()
 
-# boto doing some magic and gets confused if this is hinted as BaseClient
-# noinspection PyMissingTypeHints
-def storage_client():
+
+def storage_client() -> S3:
     global s3_client
-    if settings.OBJECT_STORAGE_ENABLED and not s3_client:
-        s3_client = client(
-            "s3",
-            endpoint_url=settings.OBJECT_STORAGE_ENDPOINT,
-            aws_access_key_id=settings.OBJECT_STORAGE_ACCESS_KEY_ID,
-            aws_secret_access_key=settings.OBJECT_STORAGE_SECRET_ACCESS_KEY,
-            config=Config(signature_version="s3v4", connect_timeout=1, retries={"max_attempts": 1}),
-            region_name="us-east-1",
+
+    if settings.OBJECT_STORAGE_ENABLED and isinstance(s3_client, UnavailableStorage):
+        s3_client = ObjectStorage(
+            client(
+                "s3",
+                endpoint_url=settings.OBJECT_STORAGE_ENDPOINT,
+                aws_access_key_id=settings.OBJECT_STORAGE_ACCESS_KEY_ID,
+                aws_secret_access_key=settings.OBJECT_STORAGE_SECRET_ACCESS_KEY,
+                config=Config(signature_version="s3v4", connect_timeout=1, retries={"max_attempts": 1}),
+                region_name="us-east-1",
+            ),
+            bucket=settings.OBJECT_STORAGE_BUCKET,
         )
 
     return s3_client
 
 
-class ObjectStorageError(Exception):
-    pass
+def write(file_name: str, content: str) -> None:
+    return storage_client().write(bucket=settings.OBJECT_STORAGE_BUCKET, key=file_name, content=content)
 
 
-def write(file_name: str, content: str):
-    s3_response = {}
-    try:
-        s3_response = storage_client().put_object(Bucket=OBJECT_STORAGE_BUCKET, Body=content, Key=file_name)
-    except Exception as e:
-        logger.error("object_storage.write_failed", file_name=file_name, error=e, s3_response=s3_response)
-        raise ObjectStorageError("write failed") from e
-
-
-def read(file_name: str):
-    s3_response = {}
-    try:
-        s3_response = storage_client().get_object(Bucket=OBJECT_STORAGE_BUCKET, Key=file_name)
-        content = s3_response["Body"].read()
-        return content.decode("utf-8")
-    except Exception as e:
-        logger.error("object_storage.read_failed", file_name=file_name, error=e, s3_response=s3_response)
-        raise ObjectStorageError("read failed") from e
+def read(file_name: str) -> Optional[str]:
+    return storage_client().read(bucket=settings.OBJECT_STORAGE_BUCKET, key=file_name)
 
 
 def health_check() -> bool:
-    # noinspection PyBroadException
-    try:
-        client = storage_client()
-        response = client.head_bucket(Bucket=settings.OBJECT_STORAGE_BUCKET) if client else False
-        return bool(response)
-    except Exception as e:
-        logger.warn("object_storage.health_check_failed", error=e)
-        return False
+    return storage_client().head_bucket(settings.OBJECT_STORAGE_BUCKET)
diff --git a/posthog/storage/test/test_object_storage.py b/posthog/storage/test/test_object_storage.py
index 32de1228097ab..35dcfad29e395 100644
--- a/posthog/storage/test/test_object_storage.py
+++ b/posthog/storage/test/test_object_storage.py
@@ -1,12 +1,45 @@
+import uuid
 from unittest.mock import patch
 
-from posthog.storage.object_storage import health_check
+from boto3 import resource
+from botocore.client import Config
+
+from posthog.settings import (
+    OBJECT_STORAGE_ACCESS_KEY_ID,
+    OBJECT_STORAGE_BUCKET,
+    OBJECT_STORAGE_ENDPOINT,
+    OBJECT_STORAGE_SECRET_ACCESS_KEY,
+)
+from posthog.storage.object_storage import health_check, read, write
 from posthog.test.base import APIBaseTest
 
+TEST_BUCKET = "test_storage_bucket"
+
 
 class TestStorage(APIBaseTest):
+    def teardown_method(self, method) -> None:
+        s3 = resource(
+            "s3",
+            endpoint_url=OBJECT_STORAGE_ENDPOINT,
+            aws_access_key_id=OBJECT_STORAGE_ACCESS_KEY_ID,
+            aws_secret_access_key=OBJECT_STORAGE_SECRET_ACCESS_KEY,
+            config=Config(signature_version="s3v4"),
+            region_name="us-east-1",
+        )
+        bucket = s3.Bucket(OBJECT_STORAGE_BUCKET)
+        bucket.objects.filter(Prefix=TEST_BUCKET).delete()
+
     @patch("posthog.storage.object_storage.client")
     def test_does_not_create_client_if_storage_is_disabled(self, patched_s3_client) -> None:
         with self.settings(OBJECT_STORAGE_ENABLED=False):
             self.assertFalse(health_check())
             patched_s3_client.assert_not_called()
+
+    def test_write_and_read_works_with_known_content(self) -> None:
+        with self.settings(OBJECT_STORAGE_ENABLED=True):
+            session_id = str(uuid.uuid4())
+            chunk_id = uuid.uuid4()
+            name = f"{session_id}/{0}-{chunk_id}"
+            file_name = f"{TEST_BUCKET}/test_write_and_read_works_with_known_content/{name}"
+            write(file_name, "my content")
+            self.assertEqual(read(file_name), "my content")
diff --git a/posthog/storage/test/test_storage.py b/posthog/storage/test/test_storage.py
deleted file mode 100644
index e3c2f322d17b8..0000000000000
--- a/posthog/storage/test/test_storage.py
+++ /dev/null
@@ -1,37 +0,0 @@
-import unittest
-import uuid
-
-from boto3 import resource
-from botocore.client import Config
-
-from posthog.settings import (
-    OBJECT_STORAGE_ACCESS_KEY_ID,
-    OBJECT_STORAGE_BUCKET,
-    OBJECT_STORAGE_ENDPOINT,
-    OBJECT_STORAGE_SECRET_ACCESS_KEY,
-)
-from posthog.storage.object_storage import read, write
-
-TEST_BUCKET = "test_storage_bucket"
-
-
-class TestStorage(unittest.TestCase):
-    def teardown_method(self, method):
-        s3 = resource(
-            "s3",
-            endpoint_url=OBJECT_STORAGE_ENDPOINT,
-            aws_access_key_id=OBJECT_STORAGE_ACCESS_KEY_ID,
-            aws_secret_access_key=OBJECT_STORAGE_SECRET_ACCESS_KEY,
-            config=Config(signature_version="s3v4"),
-            region_name="us-east-1",
-        )
-        bucket = s3.Bucket(OBJECT_STORAGE_BUCKET)
-        bucket.objects.filter(Prefix=TEST_BUCKET).delete()
-
-    def test_write_and_read_works_with_known_content(self):
-        session_id = str(uuid.uuid4())
-        chunk_id = uuid.uuid4()
-        name = f"{session_id}/{0}-{chunk_id}"
-        file_name = f"{TEST_BUCKET}/test_write_and_read_works_with_known_content/{name}"
-        write(file_name, "my content")
-        self.assertEqual(read(file_name), "my content")
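
Usage sketch (reviewer note, not part of the patch): after this change callers go through the module-level helpers and never hold a raw boto3 client; when OBJECT_STORAGE_ENABLED is false, calls land on UnavailableStorage instead of relying on a None check. A minimal illustration, assuming the OBJECT_STORAGE_* settings point at a reachable S3/MinIO endpoint; the key names are made up for the example:

    from posthog.storage.object_storage import ObjectStorageError, health_check, read, write

    if health_check():
        # both helpers target settings.OBJECT_STORAGE_BUCKET internally
        write("recordings/example-session/0-chunk", "my content")
        assert read("recordings/example-session/0-chunk") == "my content"

    try:
        read("recordings/missing-key")
    except ObjectStorageError:
        # boto3/botocore errors are wrapped, so callers catch a single exception type
        pass

Note the asymmetry the patch keeps from the old module: read and write raise ObjectStorageError on failure, while health_check swallows errors inside head_bucket and returns a plain bool.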