-
Notifications
You must be signed in to change notification settings - Fork 38
AWS S3 copy and move tasks and S3Bucket
methods
#316
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution! Let me know if you have any questions about any of my comments!
prefect_aws/s3.py
Outdated
"""Uses S3's internal | ||
[CopyObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html) | ||
to copy objects within or between buckets. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We also have a stream_from
method on this class, but this method looks like it is different in subtle ways. Could you update the docstring to explicitly callout when you would use the function vs. stream_from
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah so the stream_from
is good when there isn't a single set of credentials that can read from the source and write to the destination. The example I gave in #276 was with a public bucket like NOAA's GHCN-D bucket, which must be read via an unsigned request. Its shortcoming is that the object has to be downloaded to the Python process, then uploaded again. Sometimes that is necessary, but sometimes we can just use S3's internal copy operation, which completes large copies in seconds.
Will add context to the docstrings of both methods.
prefect_aws/s3.py
Outdated
source_bucket_name: str, | ||
aws_credentials: AwsCredentials, | ||
target_bucket_name: Optional[str] = None, | ||
aws_client_parameters: AwsClientParameters = AwsClientParameters(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can set client parameters on a AwsCredntials
object, so I think we can remove this parameter from the task.
prefect_aws/s3.py
Outdated
source_bucket_name: str, | ||
aws_credentials: AwsCredentials, | ||
target_bucket_name: Optional[str] = None, | ||
aws_client_parameters: AwsClientParameters = AwsClientParameters(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can set client parameters on a AwsCredntials
object, so I think we can remove this parameter from the task.
prefect_aws/s3.py
Outdated
s3_client = aws_credentials.get_boto3_session().client( | ||
"s3", **aws_client_parameters.get_params_override() | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd recommend using get_client
to ensure that the client parameters on aws_credentials
are respected.
s3_client = aws_credentials.get_boto3_session().client( | |
"s3", **aws_client_parameters.get_params_override() | |
) | |
s3_client = aws_credentials.get_client("s3") |
prefect_aws/s3.py
Outdated
s3_client = aws_credentials.get_boto3_session().client( | ||
"s3", **aws_client_parameters.get_params_override() | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd recommend using get_client
to ensure that the client parameters on aws_credentials
are respected.
s3_client = aws_credentials.get_boto3_session().client( | |
"s3", **aws_client_parameters.get_params_override() | |
) | |
s3_client = aws_credentials.get_client("s3") |
tests/test_s3.py
Outdated
self, | ||
s3_bucket_with_object: S3Bucket, | ||
s3_bucket_2_empty: S3Bucket, | ||
client_parameters, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like the client_parameter
fixture isn't used in this test.
tests/test_s3.py
Outdated
def test_move_object_within_bucket( | ||
self, | ||
s3_bucket_with_object: S3Bucket, | ||
client_parameters, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like the client_parameter
fixture isn't used in this test. Can it be removed?
tests/test_s3.py
Outdated
def test_move_object_to_nonexistent_bucket_fails( | ||
self, | ||
s3_bucket_with_object: S3Bucket, | ||
client_parameters, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like the client_parameter
fixture isn't used in this test. Can it be removed?
tests/test_s3.py
Outdated
def test_move_object_onto_itself_fails( | ||
self, | ||
s3_bucket_with_object: S3Bucket, | ||
client_parameters, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like the client_parameter
fixture isn't used in this test. Can it be removed?
tests/test_s3.py
Outdated
self, | ||
s3_bucket_with_object: S3Bucket, | ||
s3_bucket_2_empty: S3Bucket, | ||
client_parameters, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like the client_parameter
fixture isn't used in this test. Can it be removed?
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for making the requested update @dominictarro! I left some comments on some fixtures that aren't being used, but once those are removed this PR is good to go!
s3_bucket_with_object.move_object("object", "object") | ||
assert s3_bucket_with_object.read_path("object") == b"TEST" | ||
|
||
@pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This fixture is unused and can be removed.
@pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) |
with pytest.raises(ClientError): | ||
assert s3_bucket_with_object.read_path("object") == b"TEST" | ||
|
||
@pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This fixture is unused and can be removed.
@pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) |
s3_bucket_with_object.copy_object("object", "object_copy_4", s3_bucket_2_empty) | ||
assert s3_bucket_2_empty.read_path("object_copy_4") == b"TEST" | ||
|
||
@pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This fixture is unused and can be removed.
@pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) |
@@ -811,3 +975,71 @@ def test_upload_from_folder( | |||
break | |||
else: | |||
raise AssertionError("Files did upload") | |||
|
|||
@pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This fixture is unused and can be removed.
@pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) |
) | ||
assert s3_bucket_with_object.read_path("object") == b"TEST" | ||
|
||
@pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This fixture is unused and can be removed.
@pytest.mark.parametrize("client_parameters", aws_clients[-1:], indirect=True) |
assert read(bucket, "object") == b"TEST" | ||
|
||
|
||
@pytest.mark.parametrize("client_parameters", aws_clients, indirect=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This fixture is unused and can be removed.
@pytest.mark.parametrize("client_parameters", aws_clients, indirect=True) |
read(bucket, "subfolder/object_copy") | ||
|
||
|
||
@pytest.mark.parametrize("client_parameters", aws_clients, indirect=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This fixture is unused and can be removed.
@pytest.mark.parametrize("client_parameters", aws_clients, indirect=True) |
assert read(bucket, "subfolder/new_object") == b"TEST" | ||
|
||
|
||
@pytest.mark.parametrize("client_parameters", aws_clients, indirect=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This fixture is unused and can be removed
@pytest.mark.parametrize("client_parameters", aws_clients, indirect=True) |
@@ -205,6 +224,151 @@ async def test_flow(): | |||
assert output == b"NEW OBJECT" | |||
|
|||
|
|||
@pytest.mark.parametrize("client_parameters", aws_clients, indirect=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This fixture is unused and can be removed.
@pytest.mark.parametrize("client_parameters", aws_clients, indirect=True) |
The fixture is required by the
|
Ah, that's right. I forgot about the logic that populates |
Added
prefect_aws.s3.s3_copy
prefect_aws.s3.s3_move
prefect_aws.s3.S3Bucket.copy_object
prefect_aws.s3.S3Bucket.move_object
Closes #315
Example
Screenshots
Checklist
pre-commit
checks.pre-commit install && pre-commit run --all
locally for formatting and linting.mkdocs serve
view documentation locally.