
Add more filter options to list_keys of S3Hook #16821

Closed

wants to merge 7 commits into from

Conversation

sunank200 (Collaborator)

Add more filter options to list_keys of S3Hook

This commit adds the following filters to list_keys of S3Hook (see the usage sketch below):
- start_after_key: returns only keys after the specified key; start_after_key can be any key in the bucket.
- start_after_datetime: returns only keys with a last-modified datetime greater than or equal to start_after_datetime.
- to_datetime: returns only keys with a last-modified datetime less than or equal to to_datetime.
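
For illustration, a minimal usage sketch with the proposed parameters (the bucket name, prefix, and dates are made up for the example):

from datetime import datetime, timezone

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

hook = S3Hook(aws_conn_id='aws_default')
keys = hook.list_keys(
    bucket_name='my-bucket',
    prefix='logs/',
    # proposed filters:
    start_after_key='logs/2021-06-30.log',
    start_after_datetime=datetime(2021, 7, 1, tzinfo=timezone.utc),
    to_datetime=datetime(2021, 7, 14, tzinfo=timezone.utc),
)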

closes: #16627

This change does not affect dependent operators such as S3DeleteObjectsOperator, S3ListOperator, and S3KeysUnchangedSensor, or the S3Hook methods get_wildcard_key and delete_bucket.

Corresponding unit tests have been added to test_s3.py.
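
A sketch of the shape such a test can take with moto's mock_s3 (the test name and assertions here are illustrative, not the PR's exact test):

import boto3
from moto import mock_s3

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

@mock_s3
def test_list_keys_with_start_after_key():
    # set up a fake bucket with two keys, then check that start_after_key skips the first
    client = boto3.client('s3', region_name='us-east-1')
    client.create_bucket(Bucket='test-bucket')
    client.put_object(Bucket='test-bucket', Key='a', Body=b'x')
    client.put_object(Bucket='test-bucket', Key='b', Body=b'x')

    hook = S3Hook()
    assert hook.list_keys(bucket_name='test-bucket', start_after_key='a') == ['b']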



boring-cyborg bot added the area:providers and provider:amazon (AWS/Amazon - related issues) labels Jul 5, 2021
boring-cyborg bot commented Jul 5, 2021

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (flake8, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project, and together we are making it better 🚀.
    In case of doubt, contact the developers at:
    Mailing List: [email protected]
    Slack: https://s.apache.org/airflow-slack

@sunank200 sunank200 changed the title Add more filter options to list_keys of S3Hoo Add more filter options to list_keys of S3Hook Jul 5, 2021
ferruzzi (Contributor) commented Jul 6, 2021

Is this functionality we want in the hooks? Per the discussion here, it sounds like it should be in the operator, maybe?

@kaxil kaxil closed this Jul 6, 2021
@kaxil kaxil reopened this Jul 6, 2021
kaxil (Member) commented Jul 6, 2021

Re-triggered CI

dstandish (Contributor) left a comment

Let me offer a recommendation.

Rather than adding start_after_datetime and to_datetime, just add a single param called object_filter that takes a function (obj: dict) -> bool.

Then this function can be applied like so:

        for page in response:
            if 'Contents' in page:
                for k in page['Contents']:
                    # evaluate the object filter here and only append conditionally
                    if object_filter is None or object_filter(k):
                        keys.append(k['Key'])

With an object filter, people can apply whatever logic they want, and they don't have to understand what start_after_datetime / to_datetime mean or how to use them (since they aren't part of the S3 API).

What do y'all think?
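
For instance, a hypothetical call with such a parameter could look like:

# keep only non-empty objects (object_filter as proposed above, not an existing API)
keys = hook.list_keys(
    bucket_name='my-bucket',
    prefix='logs/',
    object_filter=lambda obj: obj['Size'] > 0,
)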

:param start_after_key: returns keys after this specified key in the bucket.
:type start_after_key: str
:param start_after_datetime: returns keys with last modified datetime greater than the specified datetime.
:type start_after_datetime: datetime , ISO8601: '%Y-%m-%dT%H:%M:%S%z', e.g. 2021-02-20T05:20:20+0000
Contributor

If the type is datetime, then why do you need to specify string formatting here? It makes it look like the type is actually string.

Collaborator (Author)

@dstandish I don't think we need to specify the string formatting here. A datetime object should be passed as the argument, and it will work fine; a string passed as an argument won't work. I was just trying to give an example. I will update the example to:

e.g.: to_datetime = datetime.strptime('2024-08-19T09:55:48+0000', '%Y-%m-%dT%H:%M:%S%z')

@@ -269,6 +270,9 @@ def list_keys(
delimiter: Optional[str] = None,
page_size: Optional[int] = None,
max_items: Optional[int] = None,
start_after_key: Optional[str] = '',
dstandish (Contributor) Jul 6, 2021

I think None is the standard default for an optional string, and the most intuitive approach compared with an empty string.

Then you can add the parameter to paginate only conditionally, as sketched below. To me it makes sense to specify parameters only when you actually want to use them. None is a good default meaning "do not use this param" (unless supplying StartAfter=None in paginate has the same effect as omitting it, in which case leaving it in is fine).
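
A sketch of that conditional approach (surrounding variables assumed from list_keys; pagination keys per boto3's list_objects_v2 paginator):

params = {
    'Bucket': bucket_name,
    'Prefix': prefix or '',
    'Delimiter': delimiter or '',
    'PaginationConfig': {'PageSize': page_size, 'MaxItems': max_items},
}
if start_after_key is not None:
    # only send StartAfter when the caller actually set it
    params['StartAfter'] = start_after_key

paginator = self.get_conn().get_paginator('list_objects_v2')
response = paginator.paginate(**params)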

pateash (Contributor) Jul 9, 2021

Completely agree with @dstandish; I have rarely seen a default string as a literal '' in the Airflow codebase.

Contributor

Suggested change
start_after_key: Optional[str] = '',
start_after_key: Optional[str] = None,

Collaborator (Author)

This makes sense. I will add a condition to use StartAfter only when it's not None. Also, I will make the default None, in line with the Airflow codebase.

Collaborator (Author)

Done with the changes.

StartAfter=start_after_key,
)
# JMESPath to query directly on paginated results
filtered_response = response.search(
dstandish (Contributor) Jul 6, 2021

Why are you filtering even when StartAfter is None?

sunank200 (Collaborator, Author) Jul 12, 2021

When StartAfter is None, paginator.paginate() doesn't use it as a filter; it filters on StartAfter only when it is not None, so filtering works whether or not it is set. But as you suggested above, I will add a condition to use the parameters only when they are passed by the user.

filtered_response = response.search(
"Contents[?to_string("
"LastModified)<='\"{}\"' && "
"to_string(LastModified)>='\"{"
Contributor

If it's going to be called start_after_datetime, then this should be strictly greater than, I think.

Contributor

But I recommend just from_datetime and to_datetime.

Collaborator (Author)

Sure, from_datetime and to_datetime make more sense. I will make the change.

Collaborator (Author)

I have removed from_datetime and to_datetime and am now using a ResponseFilter class with a filter method that supports the operations defined by the user: commit

dstandish (Contributor) commented Jul 7, 2021

> Let me offer a recommendation.
>
> Rather than adding start_after_datetime and to_datetime, just add a single param called object_filter that takes a function (obj: dict) -> bool.
>
> Then this function can be applied like so:
>
>         for page in response:
>             if 'Contents' in page:
>                 for k in page['Contents']:
>                     # evaluate the object filter here and only append conditionally
>                     if object_filter is None or object_filter(k):
>                         keys.append(k['Key'])
>
> With an object filter, people can apply whatever logic they want, and they don't have to understand what start_after_datetime / to_datetime mean or how to use them (since they aren't part of the S3 API).
>
> What do y'all think?

Maybe this isn't the greatest idea... I thought there were more attributes available in the response.

Here's what you get in the response:

{'Key': 'tmp/hello/1.txt',
 'LastModified': datetime.datetime(2021, 7, 6, 3, 13, 41, tzinfo=tzutc()),
 'ETag': '"49f68a5c8493ec2c0bf489821c21fc3b"',
 'Size': 2,
 'StorageClass': 'STANDARD'}

So I guess there isn't much else in that response that you'd want to filter on.

Still, putting the filtering in the user's hands (through the use of an object_filter callable) would remove the ambiguity of strictly greater than vs. greater than or equal (and the same with less than).

Also there's the problem of naming.

If we include both parameters from and to, we should probably call them from_datetime and to_datetime, where from is >= and to is <.

One other thing: I am not sure why we use JMESPath when we already have the datetime object available in the for loop in the existing code. I think it is best to compare LastModified directly with the user-supplied datetime rather than converting everything to strings and doing a JSON search. For one, the extra transformation could introduce errors through formatting differences; I also suspect the JSON search is significantly less efficient, though I'm not sure. Bottom line: if you already have a datetime on both sides of the comparison, why convert both to strings to compare them?
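
Concretely, a sketch of the direct comparison (with from_datetime inclusive and to_datetime exclusive, per the naming above):

keys = []
for page in response:
    for obj in page.get('Contents', []):
        last_modified = obj['LastModified']  # boto3 already returns a tz-aware datetime
        if from_datetime is not None and last_modified < from_datetime:
            continue
        if to_datetime is not None and last_modified >= to_datetime:
            continue
        keys.append(obj['Key'])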

Comment on lines 316 to 317
"LastModified)<='\"{}\"' && "
"to_string(LastModified)>='\"{"
Contributor

This does not handle the case where you have exactly one of to_datetime and start_after_datetime (i.e., not both).

Also, pendulum and datetime are formatted differently. Observe:

>>> import pendulum
>>> '{}'.format(pendulum.now())
'2021-07-06T21:22:11.514220-07:00'
>>> from datetime import datetime
>>> '{}'.format(datetime.now())
'2021-07-06 21:22:35.418199'

If you compare the objects directly, your feature could support both pendulum (which is commonly used with Airflow) and datetime; with string formatting, you really cannot support both.
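
For what it's worth, direct comparison does work across the two, since pendulum.DateTime subclasses datetime:

>>> import pendulum
>>> from datetime import datetime, timezone
>>> pendulum.datetime(2021, 7, 6, tz='UTC') < datetime(2021, 7, 7, tzinfo=timezone.utc)
True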

Collaborator (Author)

@potiuk @dstandish @pateash I have pushed the changes just now.

I created a ResponseFilter class to parse and filter the S3 boto3 response based on operations defined by the user. The class can be extended as needed to parse and filter the response with different object-filter operations. For now, I have added the following allowed operations:

 allowed_operations = {
        "lte": operator.le,
        "gte": operator.ge,
        "gt": operator.gt,
        "lt": operator.lt,
        "eq": operator.eq,
    }

ResponseFilter.filter(object_filter=object_filter) puts the filtering in the user's hands (through the use of a filter) and removes the ambiguity of strictly greater than vs. greater than or equal (and the same with less than). Please refer to the commits in this thread and the test changes.
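
To illustrate the idea, a simplified sketch of how such "field__op" keys can be applied (matches() is a hypothetical helper, not the PR's exact code; it relies on the allowed_operations map above):

def matches(obj: dict, object_filter: dict) -> bool:
    # interpret each key like 'LastModified__lt' as a (field, operation) pair
    for key, target in object_filter.items():
        field, _, op_name = key.partition('__')
        if not allowed_operations[op_name](obj[field], target):
            return False
    return True

# e.g. keep objects modified before now:
# matches(obj, {'LastModified__lt': datetime.now(timezone.utc)})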

I have added comments on the implementation details in the code.

dstandish (Contributor) Jul 12, 2021

I'm not sure adding this ResponseFilter is the right approach; I am not sure we need this abstraction layer.

Why not just let the user pass an arbitrary callable that does whatever filtering the user needs?

For example, your docstring shows this example:

object_filter={"LastModified__lt": datetime.now()},

But with an arbitrary callable, this can be implemented about as simply:

object_filter=lambda x: x['LastModified'] < datetime.now(),

This is roughly as compact and roughly as simple, but the benefit is that there's no need to understand the options available within the class and how it works, and if the user wants to do arbitrary filtering, they can.


That said, I still think it's probably best to go with your original thought, i.e. from_datetime and to_datetime (though with the updated naming / logic).

(I know I initially suggested adding an object filter callable, but I subsequently walked back that suggestion; since there aren't other useful filter attributes in the object metadata, it's not as useful.)

And your approach of allowing only a certain set of operations doesn't really provide the flexibility of the callable.

Simply adding the two optional datetime params, from and to, is simple, easy to understand, and useful. It's true that with simple from / to params we can't provide flexibility with interval edge inclusivity, but to me that seems like an acceptable tradeoff for the simplicity and usability. If we go with this approach, I would probably vote for >= for from and < for to.

In conclusion, I think going with from and to seems best; if not that, then a simple arbitrary callable instead of the ResponseFilter apparatus. Interested in what others think.

sunank200 (Collaborator, Author) Jul 14, 2021

Sure @dstandish, I think the to and from approach keeps the code simple and doesn't add additional complexity. ResponseFilter and the object_filter callable, in my opinion, make things more generic for any future filter operations, but they do add complexity. I'd be happy to make the change once others have commented and we conclude on the approach.

@potiuk what do you think?


potiuk (Member) commented Jul 10, 2021

Hey @sunank200, how about applying the comments from the reviews? I am preparing to release the next wave of providers and it would be great to merge this one in.

sunank200 (Collaborator, Author)

@potiuk sure. I was planning to comment on the review and make the changes today; some situation at home, hence the delay. I will resolve the issues today.

potiuk (Member) commented Jul 11, 2021

> @potiuk sure. I was planning to comment on the review and make the changes today; some situation at home, hence the delay. I will resolve the issues today.

No worries. Hope all is good!

sunank200 (Collaborator, Author) commented Jul 12, 2021

@potiuk @dstandish @pateash I have made the required changes. I would be more than happy to accommodate further feedback.

closes: #16627

potiuk (Member) commented Jul 14, 2021

Can you please rebase to the latest main, @sunank200? We had some failures in main recently and they are all fixed now.

This commit adds the following filters to list_keys of S3Hook:
- start_after_key: returns only keys after the specified key; it can be any key in the bucket.
- start_after_datetime: returns only keys with a last-modified datetime greater than or equal to start_after_datetime.
- to_datetime: returns only keys with a last-modified datetime less than or equal to to_datetime.
Tests to filter keys after a specified key and to filter keys based on last-modified datetime.
… object_filter callable.

Response filter to filter objects based on operations defined by the user. It is written in a generic way so that it can be extended in the future for different operations.
Fixes for a Flake8 error, and an indentation bug fix to return the result when object_filter is None.
sunank200 (Collaborator, Author)

@potiuk I have rebased to the latest main.

@potiuk potiuk closed this Jul 17, 2021
@potiuk potiuk reopened this Jul 17, 2021
potiuk (Member) commented Jul 17, 2021

Reopened it to re-trigger the build

potiuk (Member) commented Jul 18, 2021

There are some static checks and a docs build that need fixes (I recommend installing pre-commit to fix the static checks).

github-actions bot commented Sep 2, 2021

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

github-actions bot added the stale label (Stale PRs per the .github/workflows/stale.yml policy file) Sep 2, 2021
@github-actions github-actions bot closed this Sep 8, 2021
Labels
area:providers, provider:amazon (AWS/Amazon - related issues), stale (Stale PRs per the .github/workflows/stale.yml policy file)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

add more filter options to list_keys of S3Hook
6 participants