Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Stream objects between buckets #276

Merged
merged 4 commits into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Added `S3Bucket.stream_from` to copy objects between buckets - [#276](https://github.com/PrefectHQ/prefect-aws/pull/276)

### Changed

### Deprecated
Expand Down
65 changes: 65 additions & 0 deletions prefect_aws/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import boto3
from botocore.paginate import PageIterator
from botocore.response import StreamingBody
from prefect import get_run_logger, task
from prefect.blocks.abstract import ObjectStorageBlock
from prefect.filesystems import WritableDeploymentStorage, WritableFileSystem
Expand Down Expand Up @@ -786,6 +787,70 @@ async def download_folder_to_path(

return Path(to_folder)

@sync_compatible
async def stream_from(
    self,
    bucket: "S3Bucket",
    from_path: str,
    to_path: Optional[str] = None,
    **upload_kwargs: Dict[str, Any],
) -> str:
    """Streams an object from another bucket to this bucket.

    The source key is resolved with the *source* block's
    `_join_bucket_folder`, and the destination key with this block's, so
    each path is interpreted relative to the block it belongs to.

    Args:
        bucket: The bucket to stream from.
        from_path: The path of the object to stream, as seen by `bucket`.
        to_path: The path to stream the object to. Defaults to the object's name.
        **upload_kwargs: Additional keyword arguments to pass to
            `Client.upload_fileobj`.

    Returns:
        The path that the object was uploaded to.

    Examples:
        Stream notes.txt from your-bucket/notes.txt to my-bucket/landed/notes.txt.

        ```python
        from prefect_aws.s3 import S3Bucket

        your_s3_bucket = S3Bucket.load("your-bucket")
        my_s3_bucket = S3Bucket.load("my-bucket")

        my_s3_bucket.stream_from(
            your_s3_bucket,
            "notes.txt",
            to_path="landed/notes.txt"
        )
        ```

    """
    if to_path is None:
        to_path = Path(from_path).name

    # Get the source object's StreamingBody. The key must be built by the
    # source block (not ``self``): the object is fetched from
    # ``bucket.bucket_name`` with ``bucket``'s client, so this block's folder
    # settings must not leak into the source key.
    source_key: str = bucket._join_bucket_folder(from_path)
    from_client = bucket.credentials.get_s3_client()
    obj = await run_sync_in_worker_thread(
        from_client.get_object, Bucket=bucket.bucket_name, Key=source_key
    )
    body: StreamingBody = obj["Body"]

    try:
        # Upload the StreamingBody to this bucket
        bucket_path = str(self._join_bucket_folder(to_path))
        to_client = self.credentials.get_s3_client()
        await run_sync_in_worker_thread(
            to_client.upload_fileobj,
            Fileobj=body,
            Bucket=self.bucket_name,
            Key=bucket_path,
            **upload_kwargs,
        )
    finally:
        # Always release the underlying HTTP response, even if the upload fails.
        body.close()

    self.logger.info(
        f"Streamed s3://{bucket.bucket_name}/{source_key} to the bucket "
        f"{self.bucket_name!r} path {bucket_path!r}."
    )
    return bucket_path

@sync_compatible
async def upload_from_path(
self,
Expand Down
13 changes: 13 additions & 0 deletions tests/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,19 @@ def test_download_folder_to_path(
to_path = Path(to_path)
assert (to_path / "object").read_text() == "TEST OBJECT IN FOLDER"

@pytest.mark.parametrize("to_path", ["to_path", None])
@pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True)
def test_stream_from(
    self,
    s3_bucket_with_object: S3Bucket,
    s3_bucket_empty: S3Bucket,
    client_parameters,
    to_path,
):
    """Stream "object" into the empty bucket and read it back.

    Runs once with an explicit destination path and once with ``None``.
    """
    uploaded_path = s3_bucket_empty.stream_from(
        s3_bucket_with_object, "object", to_path
    )
    streamed_bytes: bytes = s3_bucket_empty.read_path(uploaded_path)
    assert streamed_bytes == b"TEST"

@pytest.mark.parametrize("to_path", ["new_object", None])
@pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True)
def test_upload_from_path(
Expand Down