SFTPSensor.newer_than not working with jinja logical ds/ts expression #36629

Closed
1 of 2 tasks
fpopic opened this issue Jan 6, 2024 · 11 comments · Fixed by #39056

@fpopic
Contributor

fpopic commented Jan 6, 2024

Apache Airflow Provider(s)

sftp

Versions of Apache Airflow Providers

apache-airflow-providers-sftp==4.4.0

Apache Airflow version

2.6.3

Operating System

Ubuntu 20.04.6 LTS

Deployment

Virtualenv installation

Deployment details

No response

What happened

I tried to use the newer_than parameter (of type datetime) with a Jinja expression for the logical execution date {{ ds }} or timestamp {{ ts }}, but couldn't find any way to satisfy the SFTPSensor.newer_than checks.

import datetime
import pendulum

from airflow import models
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.operators.empty import EmptyOperator


with models.DAG(
    "dag_example_sftp_sensor_newer_than_example",
    schedule_interval="@once",
    start_date=datetime.datetime(2024, 1, 1)
) as dag:

    start_dag = EmptyOperator(task_id="start_dag")
    end_dag = EmptyOperator(task_id="end_dag")

    wait_for_sftp_file = SFTPSensor(
        task_id=f"wait_for_sftp_file",
        sftp_conn_id="sftp_conn_id",
        path=f"some-other-jinja-expression-depending-on-airflow-{{{{ ds }}}}",
        newer_than=pendulum.from_format('{{ ds }}', fmt="YYYY-MM-DD")
    )

    start_dag >> wait_for_sftp_file >> end_dag

I get

  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/dagbag.py", line 346, in parse
    loader.exec_module(new_module)
  File "<frozen importlib._bootstrap_external>", line 843, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/home/airflow/gcs/dags/dag_example_sftp_sensor_newer_than_example.py", line XXX, in <module>
    newer_than=pendulum.from_format('{{ ds }}', fmt="YYYY-MM-DD")
  File "/opt/python3.8/lib/python3.8/site-packages/pendulum/__init__.py", line 259, in from_format
    parts = _formatter.parse(string, fmt, now(), locale=locale)
  File "/opt/python3.8/lib/python3.8/site-packages/pendulum/formatting/formatter.py", line 413, in parse
    raise ValueError("String does not match format {}".format(fmt))
ValueError: String does not match format YYYY-MM-DD

In case I hard-code the value (not using jinja) to something like

newer_than=pendulum.from_format('2024-01-01', fmt="YYYY-MM-DD")

everything works.
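Why the hard-coded value works and the template does not: module-level code in a DAG file runs when the file is parsed (the dagbag.py frame above), before any Jinja rendering happens, so pendulum.from_format receives the literal text '{{ ds }}'. The stdlib-only sketch below reproduces this with datetime.strptime swapped in for pendulum so it runs without extra packages; parse_at_import_time is a hypothetical helper, not Airflow code.

```python
from datetime import datetime
from typing import Optional


def parse_at_import_time(value: str) -> Optional[datetime]:
    """Parse a YYYY-MM-DD string the way module-level DAG code would.

    Returns None instead of raising so the two cases can be compared side by side.
    """
    try:
        return datetime.strptime(value, "%Y-%m-%d")
    except ValueError:
        # At parse time '{{ ds }}' is still an unrendered template string.
        return None


print(parse_at_import_time("{{ ds }}"))    # None
print(parse_at_import_time("2024-01-01"))  # 2024-01-01 00:00:00
```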

What you think should happen instead

The newer_than parameter should work with the Jinja templates {{ ts }} or {{ ds }}.

How to reproduce

airflow tasks test dag_example_sftp_sensor_newer_than_example wait_for_sftp_file 2024-01-01

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

fpopic added the area:providers, kind:bug, and needs-triage labels Jan 6, 2024

boring-cyborg bot commented Jan 6, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; there is no need to wait for approval.

fpopic changed the title from "SFTPSensor.newer_than not working with jinja logical date expression" to "[WIP] SFTPSensor.newer_than not working with jinja logical date expression" Jan 6, 2024
@vatsrahul1001
Collaborator

@fpopic instead of using pendulum.from_format('{{ ds }}', fmt="YYYY-MM-DD"), can you try newer_than='{{ ds }}'? The default format is YYYY-MM-DD anyway.

RNHTTR removed the needs-triage label Jan 7, 2024
@RNHTTR
Contributor

RNHTTR commented Jan 7, 2024

For what it's worth, SFTPSensor does support templating newer_than, so, to @vatsrahul1001's point, I wonder whether it has something to do with when the DateTime is parsed.
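A toy model of that timing (not Airflow's implementation): when the DAG is parsed, the operator only stores the raw template string, and fields listed in template_fields are rendered from the task context just before the task runs. The render function below is a hypothetical stand-in for Jinja that only substitutes bare {{ name }} placeholders.

```python
import re
from datetime import datetime, timezone


def render(template: str, context: dict) -> str:
    """Replace '{{ name }}' placeholders with str(context[name]).

    A deliberately tiny stand-in for Jinja: no expressions, filters or method calls.
    """
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", lambda m: str(context[m.group(1)]), template)


# DAG-parse time: the operator only keeps the raw template string.
newer_than_field = "{{ ds }}"

# Task run time: a context exists, so the field can be rendered.
logical_date = datetime(2024, 1, 1, tzinfo=timezone.utc)
context = {"ds": logical_date.strftime("%Y-%m-%d")}
rendered = render(newer_than_field, context)

print(rendered)        # 2024-01-01
print(type(rendered))  # <class 'str'>
```

Because template rendering produces text, what the sensor holds in newer_than at poke time is a string such as '2024-01-01', not a datetime.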

@fpopic
Contributor Author

fpopic commented Jan 7, 2024

@fpopic instead of using pendulum.from_format('{{ ds }}', fmt="YYYY-MM-DD"), can you try newer_than='{{ ds }}'? The default format is YYYY-MM-DD anyway.

@vatsrahul1001 this works.

I found a simpler way using pendulum's addition and subtraction (https://pendulum.eustace.io/docs/#addition-and-subtraction): newer_than='{{ logical_date.add(days=-1, hours=-2) }}'
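For readers without pendulum at hand, logical_date.add(days=-1, hours=-2) moves the logical date back by one day and two hours. A stdlib equivalent, assuming a timezone-aware UTC logical date of 2023-12-22T00:00 (the execution_date that appears in the log further down), is:

```python
from datetime import datetime, timedelta, timezone

logical_date = datetime(2023, 12, 22, tzinfo=timezone.utc)

# Same shift as pendulum's logical_date.add(days=-1, hours=-2).
newer_than = logical_date - timedelta(days=1, hours=2)

print(newer_than.isoformat())  # 2023-12-20T22:00:00+00:00
```

Inside {{ ... }} the result is converted to text, so the sensor receives a string similar to the one printed above rather than a datetime object.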

fpopic closed this as completed Jan 7, 2024
fpopic changed the title from "[WIP] SFTPSensor.newer_than not working with jinja logical date expression" to "SFTPSensor.newer_than not working with jinja logical date expression" Jan 7, 2024
fpopic changed the title from "SFTPSensor.newer_than not working with jinja logical date expression" to "SFTPSensor.newer_than not working with jinja logical ds/ts expression" Jan 7, 2024
@fpopic
Contributor Author

fpopic commented Jan 9, 2024

@RNHTTR @vatsrahul1001 Sorry, I closed this too early.

My path pattern pointed at a file that did not yet exist on the SFTP server, so no exception was raised: the newer_than check only runs later in the code, after the file is found. Once the file exists on the SFTP server, I get:

[2024-01-09T08:18:29.763+0000] {sftp.py:90} INFO - Found File XXXX_YYY_KW2351_ZZZZ.WWW last modified: 20231221050445
[2024-01-09T08:18:29.771+0000] {taskinstance.py:1826} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/sensors/base.py", line 225, in execute
    raise e
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/sensors/base.py", line 212, in execute
    poke_return = self.poke(context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/sftp/sensors/sftp.py", line 97, in poke
    _newer_than = convert_to_utc(self.newer_than)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/timezone.py", line 104, in convert_to_utc
    if not is_localized(value):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/timezone.py", line 39, in is_localized
    return value.utcoffset() is not None
AttributeError: 'str' object has no attribute 'utcoffset'
[2024-01-09T08:18:29.783+0000] {taskinstance.py:1346} INFO - Marking task as FAILED. dag_id=dag_XXX, task_id=wait_for_sftp_file, execution_date=20231222T000000, start_date=, end_date=20240109T081829
Traceback (most recent call last):
  File "/opt/python3.8/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/__main__.py", line 48, in main
    args.func(args)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/cli/cli_config.py", line 52, in command
    return func(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/cli.py", line 112, in wrapper
    return f(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 618, in task_test
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1723, in run
    self._run_raw_task(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1408, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1560, in _execute_task_with_callbacks
    result = self._execute_task(context, task_orig)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1630, in _execute_task
    result = execute_callable(context=context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/sensors/base.py", line 225, in execute
    raise e
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/sensors/base.py", line 212, in execute
    poke_return = self.poke(context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/sftp/sensors/sftp.py", line 97, in poke
    _newer_than = convert_to_utc(self.newer_than)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/timezone.py", line 104, in convert_to_utc
    if not is_localized(value):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/timezone.py", line 39, in is_localized
    return value.utcoffset() is not None
AttributeError: 'str' object has no attribute 'utcoffset'

this was with:

wait_for_sftp_file = SFTPSensor(
    task_id=f"wait_for_sftp_file",
    sftp_conn_id="sftp_conn_id",
    path=f"XXXX_YYY_KW{{{{ macros.ds_format(ds, '%Y-%m-%d', '%y%U') }}}}_ZZZZ.WWW",
    newer_than='{{ dag_run.logical_date.add(days=-1, hours=-2) }}',
)
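The path template uses macros.ds_format(ds, '%Y-%m-%d', '%y%U'), which re-formats ds from one strftime pattern to another (in the f-string, each {{{{ collapses to {{, so Jinja sees an ordinary template). A stdlib sketch of the same conversion, where ds_format is a local re-implementation assumed to behave like the Airflow macro, shows where the KW2351 in the found file name comes from for the 2023-12-22 run:

```python
from datetime import datetime


def ds_format(ds: str, input_format: str, output_format: str) -> str:
    # Parse with one pattern, then re-emit with another.
    return datetime.strptime(ds, input_format).strftime(output_format)


# %y is the two-digit year, %U the week of the year with Sunday as the first day.
week = ds_format("2023-12-22", "%Y-%m-%d", "%y%U")
print(f"XXXX_YYY_KW{week}_ZZZZ.WWW")  # XXXX_YYY_KW2351_ZZZZ.WWW
```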

fpopic reopened this Jan 9, 2024
@fpopic
Contributor Author

fpopic commented Jan 9, 2024

@AdamPaslawski @hussein-awala might you be able to help with

File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/timezone.py", line 39, in is_localized
    return value.utcoffset() is not None
AttributeError: 'str' object has no attribute 'utcoffset'

I am thinking of adding at least one unit test that uses a logical-date Jinja expression to https://github.com/apache/airflow/blob/main/tests/providers/sftp/sensors/test_sftp.py
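The bottom frames of the traceback show is_localized evaluating value.utcoffset() is not None. A local copy of that single expression (hypothetical, not imported from Airflow) shows why a rendered template string fails while datetime objects pass:

```python
from datetime import datetime, timezone


def is_localized(value) -> bool:
    # Same expression as the airflow/utils/timezone.py frame in the traceback.
    return value.utcoffset() is not None


print(is_localized(datetime(2023, 12, 20, 22, tzinfo=timezone.utc)))  # True
print(is_localized(datetime(2023, 12, 20, 22)))                       # False (naive)

try:
    # What the rendered template actually hands to the sensor: a str.
    is_localized("2023-12-20T22:00:00+00:00")
except AttributeError as exc:
    print(exc)  # 'str' object has no attribute 'utcoffset'
```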

@fpopic
Contributor Author

fpopic commented Jan 9, 2024

I think I reproduced the same problem in a unit test here:

https://github.com/apache/airflow/actions/runs/7459580855/job/20296049813#step:5:781.


This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.

github-actions bot added the stale label Jan 24, 2024
@fpopic
Contributor Author

fpopic commented Jan 31, 2024

How can I request help here?

hussein-awala removed the stale label Jan 31, 2024
hussein-awala self-assigned this Jan 31, 2024
@hussein-awala
Member

I will take a look tomorrow

@grrolland
Contributor

Hi,

I tried to fix this issue here: #39056

The newer_than parameter is not handled as a str when the sensor pokes: poke() passes the rendered string straight to convert_to_utc().

Hope it helps! Works for me.
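A minimal sketch of the idea behind the fix, assuming the rendered value is an ISO 8601 string: coerce newer_than to a timezone-aware datetime before comparing it with the file's modification time. coerce_newer_than and file_is_newer are hypothetical helpers written for this illustration, and datetime.fromisoformat is swapped in for whatever parser #39056 actually uses. The mtime format %Y%m%d%H%M%S matches the "last modified: 20231221050445" log line above; treating naive values as UTC is also an assumption.

```python
from datetime import datetime, timezone
from typing import Union


def coerce_newer_than(value: Union[str, datetime]) -> datetime:
    """Return an aware UTC datetime from a rendered template string or a datetime."""
    if isinstance(value, str):
        # Rendered Jinja output, e.g. '2023-12-20T22:00:00+00:00' or '2024-01-01'.
        value = datetime.fromisoformat(value)
    if value.tzinfo is None:
        # Assumption: naive values are interpreted as UTC.
        value = value.replace(tzinfo=timezone.utc)
    return value.astimezone(timezone.utc)


def file_is_newer(mod_time: str, newer_than: Union[str, datetime]) -> bool:
    """True if the SFTP mtime (YYYYMMDDhhmmss, assumed UTC) is at or after newer_than."""
    file_mtime = datetime.strptime(mod_time, "%Y%m%d%H%M%S").replace(tzinfo=timezone.utc)
    return coerce_newer_than(newer_than) <= file_mtime


print(file_is_newer("20231221050445", "2023-12-20T22:00:00+00:00"))  # True
print(file_is_newer("20231221050445", "2024-01-01"))                 # False
```

Accepting both str and datetime keeps DAGs that already pass a real datetime working, while template strings are parsed only at poke time, after rendering.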

grrolland pushed a commit to grrolland/airflow that referenced this issue Apr 19, 2024
Taragolis pushed a commit that referenced this issue May 6, 2024
…sion (#39056)

* Fixes #36629

* Fixes PR failed test

* Remove an parametrize duplicate tests

* Fix formatting

* Fix formatting

* Fixes #36629

* Fixes PR failed test

* Remove an parametrize duplicate tests

* update simple-salesforce type hints to support 1.12.6 (#39047)

* Fix formatting

* Add changelog for airflow python client 2.9.0 (#39060)

* Upgrade to latest hatchling as build dependency (#39044)

* Prepare docs 1st wave (RC3) + ad hoc April 2024 (#38995) (#39054)

* Prepare docs 1st wave (RC3) + ad hoc April 2024 (#38995)

* update databricks

* [docs] update `DagBag` class docstring to include all params (#38814)

* update docstring for DagBag class

* break long line

* fix space

Signed-off-by: kalyanr <[email protected]>

---------

Signed-off-by: kalyanr <[email protected]>

* Data aware scheduling docs edits (#38687)

* Moves airflow import in deprecated pod_generator to local (#39062)

The import might be invoked when K8S executor starts with sentry on
and it might lead to circular imports

Related: #31442

* KPO xcom sidecar PodDefault usage (#38951)

We should use the same, non deprecated, version of PodDefaults for the
xcom sidecar when creating and reading xcom.

* Fix formatting

* Change date/time parsing method for newer_than parameter in SFTPSensor

* Add examples in AWS auth manager documentation (#39040)

* update document (#39068)

* Update hatchling to version 1.24.0 (#39072)

* Check that the dataset<>task exists before trying to render graph (#39069)

* Change date/time parsing method for newer_than parameter in SFTPSensor

* Fix utc timezone in unit tests

* Fix utc timezone in unit tests

---------

Signed-off-by: kalyanr <[email protected]>
Co-authored-by: Grégoire Rolland <[email protected]>
Co-authored-by: Hussein Awala <[email protected]>
Co-authored-by: Ephraim Anierobi <[email protected]>
Co-authored-by: Jarek Potiuk <[email protected]>
Co-authored-by: Elad Kalif <[email protected]>
Co-authored-by: Kalyan <[email protected]>
Co-authored-by: Laura Zdanski <[email protected]>
Co-authored-by: Jed Cunningham <[email protected]>
Co-authored-by: Vincent <[email protected]>
Co-authored-by: humit <[email protected]>
Co-authored-by: Brent Bovenzi <[email protected]>
pateash pushed a commit to pateash/airflow that referenced this issue May 13, 2024
kosteev pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Nov 9, 2024
GitOrigin-RevId: 8965f2e6be8af3ca6fd12cccb91f6409bbbb6d7f
5 participants