Enable passing --xcoms to tasks test CLI command #29608
Conversation
Using JSON for this feels cumbersome to me. I don’t have a much better idea though. Also we need to find a way to support custom backends that may not store values as JSON.
@uranusjr thank you for your feedback!
I think it does for me as well. At the same time, thinking out loud about which alternative options of passing things through we may also have:
1. A simplified format in the CLI: (task_id, xcom_key, value) triplets joined with a delimiter (a comma below). Probably all the escaping and ad-hoc handling would make things even worse.
2. Values passed through a file.
Sorry for the ".json" extension - yeah, again, the contents of the file should have some structure; we may go with YAML :) What generally doesn't look convenient here, whatever the structure of the file, is requiring the user to create a file even for simple scenarios (if passing through a file is the only option).
Let me clarify to make sure that I understand it correctly. You mean that generally the
Though from the usability perspective I wouldn't stick to this option as the only available one. What do you think?
Regarding the JSON direction, potentially a better way would be to make it more explicitly structured, like:
or just
It may look a bit more wordy and cumbersome, but at the same time clearer and more extensible in the future. Before adjusting the implementation, I would like to get more eyes on these to see if there are any concerns or if there are good alternatives to the JSON way.
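The concrete JSON snippets from this comment did not survive the page scrape, but the explicit triplet structure the PR description later settles on can be illustrated like this (hypothetical shapes; only the "task_id"/"key"/"value" form is confirmed by the final description):

```python
import json

# Explicit form: every field spelled out.
explicit = '[{"task_id": "my_task", "key": "return_value", "value": "foo"}]'
# Shorthand form: "key" falls back to "return_value" when omitted.
shorthand = '[{"task_id": "my_task", "value": "foo"}]'

for raw in (explicit, shorthand):
    entry = json.loads(raw)[0]
    print(entry["task_id"], entry.get("key", "return_value"), entry["value"])
```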
Hi @vemikhaylov, thank you for setting up this PR. This is a pretty important missing piece of the local dev story. It would make sense to offer the option to pass in a JSON/YAML file path. For more complex items I imagine this would be easier than manually adding the JSON object in the CLI arguments. I think that both file paths and passing the straight JSON object can have value. Another question is whether we should allow people to point at pickle files for pickled objects. I know we generally discourage people from turning on pickling, but it is a thing people do, so we might want to allow those folks to test locally. cc: @uranusjr WDYT?
Allowing a path makes sense to me. Since a JSON string has a pretty distinctive shape (starts and ends with either brackets or braces depending on which format we decide to use), I think we can allow both with one option. While a path to a pickle object also makes sense in theory, generating that pickle file would be pretty cumbersome and I’m not sure people would want to use that. I would probably leave that out for now until someone asks for it.
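The "distinctive shape" heuristic could look something like this (an illustrative sketch; resolve_xcoms_arg is a hypothetical helper name, not code from the PR):

```python
import json


def resolve_xcoms_arg(value: str):
    """Treat the argument as inline JSON when it starts with a bracket or
    brace, otherwise as a path to a JSON file on disk."""
    stripped = value.strip()
    if stripped.startswith(("[", "{")):
        return json.loads(stripped)
    # Fall back to interpreting the argument as a file path.
    with open(value) as f:
        return json.load(f)
```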
By the way, the argument name can probably be shortened to just
I think this is possible by setting
I love it! Though technically the "task" declaration operates with the
It looks really great! Though there are the following issues I see with the format:
Please let me know if those are valid concerns. I personally prefer more explicit interfaces and sometimes lean towards overly explicit ones, so it all may be rather subjective, feel free to push back :) But so far I would probably stick to:
and
In terms of execution, I wanted to ask if it's fine if I move iteratively here and break it down into two PRs: 1) CLI value passing; 2) file value passing? Among other things, it should make the review process a bit easier. The first part should be self-contained as well, so probably a safe change to commit separately?
I think your plan makes sense. So we’ll use this PR to implement value-passing, and add path support in a later PR.
airflow/cli/cli_parser.py
@@ -36,13 +38,24 @@
from airflow.exceptions import AirflowException
from airflow.executors.executor_constants import CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR
from airflow.executors.executor_loader import ExecutorLoader
from airflow.models import XCOM_RETURN_KEY
Are we fine with this dependency on the model layer here, or is it better to refactor the constant out?
Better to refactor the constant out.
Created it as a separate PR to keep this one more focused (#30180).
def inject_xcoms(self, xcoms: list[dict[str, Any]]):
    """
    To inject upstream dependencies' output instead of trying
    to read from DB when testing a task individually.
    """
To be honest, I'm not quite sure about the approach, so I would ask for feedback. Potential risks and questions around the current one:
- The users of the framework may start to use it as a public API, but we may want to change how it works at some point in the future. Is that a valid concern or not?
- Can users use XCom directly, without the APIs exposed by TaskInstance, or are the models not available in the DAGs, so that covering the interactions with XCom on the TaskInstance layer is enough?
Alternative options which I thought of:
- Pass xcoms down the call stack, e.g. ti.run(injected_xcoms=xcoms). Extending the ti.run signature didn't look quite attractive to me, though it may be clearer and more explicit than a preliminary "injection". But injection is similar to what we already do with params (airflow/cli/commands/task_command.py, line 590 in 4e3b5ae): task.params.update(passed_in_params)
- Have a module with a global variable, which we could initialise in task_command and then read either in TaskInstance.xcom_pull or in BaseXCom.get_one / .get_many. Global state, though...
- Monkeypatch ti.xcom_pull (dirty?)
- Implement a custom XCom backend to be used in xcom.py, which would read the injected data from memory.
- If we are able to run everything inside a common transaction that is rolled back at the end of the command execution, then we could just "put" the corresponding XComs in the DB before running the task.
Since I wasn't sure about the feasibility and validity of some of the approaches above (especially 4 and 5), I implemented the most straightforward way, similar to what we already have (the params reference above), and am seeking feedback.
Can the users use XCom directly without the API
They can, via XCom.get_one or get_many. So I feel the only fully reliable way is to actually write the data to the database so it is available directly from the XCom interface.
We already do something similar for DagRun (if you specify a DagRun that does not actually exist, we would create a temporary one in the database for the test run and delete it afterwards), so I can think of a couple of approaches:
- Write the custom XCom data to a separate table, and read them on XCom. Delete them after the test run.
- Write the custom data to the real XCom table, and delete them afterwards. Emit an error (and refuse to execute) if there is already existing data matching the custom input in the table. This feels reasonable since it would only be possible if you test a task in an existing DAG run but don’t want to use the actual upstream XComs, which feels very niche and not a necessary feature to me.
Write the custom data to the real XCom table, and delete them afterwards. Emit an error (and refuse to execute) if there is already existing data matching the custom input in the table. This feels reasonable since it would only be possible if you test a task in an existing DAG run but don’t want to use the actual upstream XComs, which feels very niche and not a necessary feature to me.
For this one I have a concern that, in general, the script may exit in a tricky way, even skipping finally blocks (https://stackoverflow.com/questions/49262379/does-finally-always-execute-in-python). When we're dealing with a temporary DAG run, that's relatively fine and is visible from the run id:
https://github.com/apache/airflow/blob/main/airflow/cli/commands/task_command.py#L74-L80
But for a normal DAG run it may leave the data in a strange state. Also, if this is an ongoing DAG run, then, if I'm not mistaken, there might be race conditions like "write XCom in test command" -> "write the same XCom in a real task executed in the DAG run" -> "delete XCom in test command (clean up)".
What do you think?
@@ -596,6 +597,10 @@ def task_test(args, dag=None):
    task, args.map_index, exec_date_or_run_id=args.execution_date_or_run_id, create_if_necessary="db"
)

xcoms = args.xcoms or []
_validate_injected_xcoms(xcoms, task)
If we stick to the ti.inject_xcoms approach, then we may consider moving the validation logic into ti.inject_xcoms itself. It would imply that if inject_xcoms is called, it should inject all the required XComs at once, but probably that's fine.
Also, the exception raised from there will lack the "Please, pass them via the --xcoms argument (see --help for more details)." part (inside the method we won't know where it's called from). It will either need to be caught and enriched in task_command, or omitted.
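The validation step being discussed could look roughly like this (a hypothetical helper, not the PR's actual _validate_injected_xcoms; `required` would be gathered from something like XComArg.iter_xcom_references(task)):

```python
def validate_injected_xcoms(required, injected):
    """Check that every (task_id, key) pair the task depends on was
    supplied via --xcoms; raise on any missing pair."""
    provided = {(x["task_id"], x.get("key", "return_value")) for x in injected}
    missing = sorted(set(required) - provided)
    if missing:
        # A caller such as task_command could catch this and append
        # the "--xcoms" usage hint before surfacing it to the user.
        raise ValueError(f"Missing XComs for: {missing}")
```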
Also the exception raised from there will lack the [context]
You can raise something other than AirflowException (I don’t see why you need to raise that exception), and catch it here to provide the additional context.
I don’t see why you need to raise that exception
Let me clarify: are you questioning why we should raise an exception at all (rather than, say, returning a bool flag), or are you not sure we need the AirflowException class here? If it's the latter, what is the intended scenario for that class? I just noticed that it had already been used for args validation in this module.
But generally I'm absolutely aligned! I also mentioned above that the exception could be intercepted and enriched; definitely not a blocker, just a consideration.
)
BashOperator(
    task_id="run_bash_with_manually_pulled_command",
    bash_command='{{ task_instance.xcom_pull(task_ids="manually_push_bash_command", key="command") }}',
We don't currently cover this case with the upfront validation of the passed XComs, since XComArg.iter_xcom_references(task) won't include it; it's at a lower level. We may try to identify it at runtime, when the XCom is queried, and return a meaningful error.
But generally the DAG doesn't identify it as a dependency either, because it seems hard to do, so probably it's relatively acceptable and this use case is mostly on the user?
def iter_xcom_dependencies(self) -> Iterator[tuple[Operator, str]]:
    """Upstream dependencies that provide XComs used by this operator."""
    from airflow.models.xcom_arg import XComArg

    yield from XComArg.iter_xcom_references(self)
Why is this needed?
Is the question why we need to define a method for that instead of calling XComArg.iter_xcom_references directly? As I see it, when we look at a task and want to understand which XComs it depends on, XComArg.iter_xcom_references(task) seems like a lower-level implementation detail. Something similar happens in the mapped operator (airflow/airflow/models/mappedoperator.py, lines 638 to 643 in 3f6b557):

def iter_mapped_dependencies(self) -> Iterator[Operator]:
    """Upstream dependencies that provide XComs used by this task for task mapping."""
    from airflow.models.xcom_arg import XComArg

    for operator, _ in XComArg.iter_xcom_references(self._get_specified_expand_input()):
        yield operator
iter_xcom_references is an internal interface that Airflow internals are free to use. MappedOperator has a wrapper for it because the wrapper also references a private member; this is not needed for BaseOperator.
So the recommendation is to call XComArg.iter_xcom_references directly then, right?
FYI: Fix to failing test in #30239
You should rebase now.
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
Context
During the discussion under #28287 it was decided that it would be useful to be able to pass XCom args to a task when it's being tested with the tasks test CLI command, and to notify the user if the task depends on an XCom arg that hasn't been passed.
Solution
Introducing the --xcoms argument for the tasks test command, which accepts a JSON list of XCom objects. The JSON has the following structure: [{"task_id": "my_task", "value": "foo"}, {"task_id": "another_task", "key": "custom_key", "value": 42}]. This allows the user to cover both the implicit XCom used by the TaskFlow API (i.e., xcom_key == "return_value") and XCom args with custom keys (e.g., manually pushed and/or used in templated kwargs). If "key" is omitted for any of the XCom objects, "return_value" is used for it by default. The format of the passed argument is validated with a corresponding JSON schema.
Hence, a full CLI call may look like:
airflow tasks test example_passing_xcoms_via_test_command python_echo --xcoms [{"task_id": "get_python_echo_message", "value": "test xcom arg"}]
or
airflow tasks test example_passing_xcoms_via_test_command run_bash --xcoms [{"task_id": "get_bash_command", "key": "return_value", "value": "echo \'test bash\'"}]
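The JSON-schema validation mentioned in the description could be approximated in plain Python like this (a stdlib-only sketch; the exact shape of the PR's real schema is an assumption here):

```python
import json


def check_xcoms_payload(raw: str) -> list:
    """Parse and loosely validate a --xcoms argument: a JSON list of
    objects, each requiring "task_id" and "value", with "key"
    defaulting to "return_value"."""
    data = json.loads(raw)
    if not isinstance(data, list):
        raise ValueError("--xcoms must be a JSON list")
    for item in data:
        if not isinstance(item, dict) or "task_id" not in item or "value" not in item:
            raise ValueError(f"invalid XCom entry: {item!r}")
        item.setdefault("key", "return_value")  # default key, per the description
    return data
```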
If a user misses any XCom args in their call, they'll be able to see a message like:
closes: #28287
closes: #24232