diff --git a/CHANGELOG.md b/CHANGELOG.md index 46cd8d32..399c5844 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added `S3Bucket.stream_from` to copy objects between buckets - [#276](https://github.com/PrefectHQ/prefect-aws/pull/276) + ### Changed ### Deprecated diff --git a/prefect_aws/s3.py b/prefect_aws/s3.py index eebd96df..cb5f4b64 100644 --- a/prefect_aws/s3.py +++ b/prefect_aws/s3.py @@ -8,6 +8,7 @@ import boto3 from botocore.paginate import PageIterator +from botocore.response import StreamingBody from prefect import get_run_logger, task from prefect.blocks.abstract import ObjectStorageBlock from prefect.filesystems import WritableDeploymentStorage, WritableFileSystem @@ -786,6 +787,70 @@ async def download_folder_to_path( return Path(to_folder) + @sync_compatible + async def stream_from( + self, + bucket: "S3Bucket", + from_path: str, + to_path: Optional[str] = None, + **upload_kwargs: Dict[str, Any], + ) -> str: + """Streams an object from another bucket to this bucket. + + Args: + bucket: The bucket to stream from. + from_path: The path of the object to stream. + to_path: The path to stream the object to. Defaults to the object's name. + **upload_kwargs: Additional keyword arguments to pass to + `Client.upload_fileobj`. + + Returns: + The path that the object was uploaded to. + + Examples: + Stream notes.txt from your-bucket/notes.txt to my-bucket/landed/notes.txt. + + ```python + from prefect_aws.s3 import S3Bucket + + your_s3_bucket = S3Bucket.load("your-bucket") + my_s3_bucket = S3Bucket.load("my-bucket") + + my_s3_bucket.stream_from( + your_s3_bucket, + "notes.txt", + to_path="landed/notes.txt" + ) + ``` + + """ + if to_path is None: + to_path = Path(from_path).name + + # Get the source object's StreamingBody + from_path: str = self._join_bucket_folder(from_path) + from_client = bucket.credentials.get_s3_client() + obj = await run_sync_in_worker_thread( + from_client.get_object, Bucket=bucket.bucket_name, Key=from_path + ) + body: StreamingBody = obj["Body"] + + # Upload the StreamingBody to this bucket + bucket_path = str(self._join_bucket_folder(to_path)) + to_client = self.credentials.get_s3_client() + await run_sync_in_worker_thread( + to_client.upload_fileobj, + Fileobj=body, + Bucket=self.bucket_name, + Key=bucket_path, + **upload_kwargs, + ) + self.logger.info( + f"Streamed s3://{bucket.bucket_name}/{from_path} to the bucket " + f"{self.bucket_name!r} path {bucket_path!r}." + ) + return bucket_path + @sync_compatible async def upload_from_path( self, diff --git a/tests/test_s3.py b/tests/test_s3.py index 342f46e8..f57b900e 100644 --- a/tests/test_s3.py +++ b/tests/test_s3.py @@ -715,6 +715,19 @@ def test_download_folder_to_path( to_path = Path(to_path) assert (to_path / "object").read_text() == "TEST OBJECT IN FOLDER" + @pytest.mark.parametrize("to_path", ["to_path", None]) + @pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) + def test_stream_from( + self, + s3_bucket_with_object: S3Bucket, + s3_bucket_empty: S3Bucket, + client_parameters, + to_path, + ): + path = s3_bucket_empty.stream_from(s3_bucket_with_object, "object", to_path) + data: bytes = s3_bucket_empty.read_path(path) + assert data == b"TEST" + @pytest.mark.parametrize("to_path", ["new_object", None]) @pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) def test_upload_from_path(