
Add retrieve output docker swarm operator #41531

Merged

Conversation


@rgriffier (Contributor) commented on Aug 16, 2024

Closes: #41445 (DockerSwarmOperator retrieve_output XCom not working)

Updates the DockerSwarmOperator to include the same retrieve_output functionality as the DockerOperator: retrieving the contents of a file as an XCom at the end of a task, before the container is destroyed. To account for possible container replication at deployment time, a task and container retrieval step has been added on top of the retrieve_output functionality itself.
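
For context, this is roughly how retrieve_output behaves in the DockerOperator (a minimal sketch using the docker SDK's low-level APIClient, not the exact operator code): the file is pulled out of the finished container as a tar archive via get_archive, then unpickled and returned as the task's XCom value.

import io
import pickle
import tarfile

import docker


def retrieve_output_sketch(client: docker.APIClient, container_id: str, path: str):
    # get_archive returns the requested path wrapped in a tar stream,
    # together with stat information about the file.
    archive_stream, _stat = client.get_archive(container_id, path)
    archive = io.BytesIO(b"".join(archive_stream))
    with tarfile.open(fileobj=archive) as tar:
        # The archive contains a single member: the requested file.
        file_obj = tar.extractfile(tar.getmembers()[0])
        # The unpickled object becomes the task's return_value XCom.
        return pickle.load(file_obj)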

The modifications were tested with the DAG provided below:

from airflow import DAG
from airflow.utils.dates import days_ago
from datetime import timedelta

from docker import types
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.docker.operators.docker_swarm import DockerSwarmOperator

params = {
    'dag_id': 'test_xcom_docker_docker_swarm',
    'catchup': False,
    'max_active_runs': 1,
    'default_args': {
        'owner': 'airflow',
        'start_date': days_ago(1),
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    }
}

with DAG(**params) as dag:
    write_xcom_docker = DockerOperator(
        task_id='write_xcom_docker',
        image='python:latest',
        api_version='auto',
        command="""python -c '

import os
import pickle

capitals = {
  "Canada": "Ottawa", 
  "England": "London",
  "France": "Paris",
  "Germany": "Berlin", 
}

file_path = "/tmp/variable.pickle"
with open(file_path, "wb+") as file:
    pickle.dump(capitals, file)

    '
    """,
        retrieve_output=True,
        retrieve_output_path='/tmp/variable.pickle')

    write_xcom_docker_swarm = DockerSwarmOperator(
        task_id='write_xcom_docker_swarm',
        image='python:latest',
        api_version='auto',
        command="""python -c '

import os
import pickle

capitals = {
  "Canada": "Ottawa", 
  "England": "London",
  "France": "Paris",
  "Germany": "Berlin", 
}

file_path = "/tmp/variable.pickle"
with open(file_path, "wb+") as file:
    pickle.dump(capitals, file)

    '
    """,
        retrieve_output=True,
        retrieve_output_path='/tmp/variable.pickle')

    write_xcom_docker_swarm_replicas = DockerSwarmOperator(
        task_id='write_xcom_docker_swarm_replicas',
        image='python:latest',
        api_version='auto',
        command="""python -c '

import os
import pickle

capitals = {
  "Canada": "Ottawa", 
  "England": "London",
  "France": "Paris",
  "Germany": "Berlin", 
}

file_path = "/tmp/variable.pickle"
with open(file_path, "wb+") as file:
    pickle.dump(capitals, file)

    '
    """,
        retrieve_output=True,
        retrieve_output_path='/tmp/variable.pickle',
        mode=types.ServiceMode(mode="replicated", replicas=2))

write_xcom_docker >> write_xcom_docker_swarm >> write_xcom_docker_swarm_replicas

Here are the results:

  • DockerOperator: (screenshot of the return_value XCom)
  • DockerSwarmOperator without replication: (screenshot of the return_value XCom)
  • DockerSwarmOperator with replicas=2: (screenshot of the return_value XCom)
    Since replicas=2 in the service configuration inside the DAG, two files were loaded into the return_value XCom (see the sketch below).
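
For reference, the task and container retrieval step can enumerate the containers backing each replica roughly as follows (a sketch of the approach, not the exact PR code; it assumes the docker SDK's low-level APIClient and a known service ID):

import docker


def get_service_container_ids(client: docker.APIClient, service_id: str) -> list[str]:
    # Each swarm task reports the ID of the container it runs in once that
    # container has been created; there is one task per replica.
    container_ids = []
    for task in client.tasks(filters={"service": service_id}):
        container_status = task.get("Status", {}).get("ContainerStatus", {})
        if "ContainerID" in container_status:
            container_ids.append(container_status["ContainerID"])
    return container_ids

With mode=types.ServiceMode(mode="replicated", replicas=2), this yields two container IDs, which is why two pickled files end up in the XCom.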

If needed, I can create a specific PR for the v2-10-test branch!




boring-cyborg bot commented Aug 16, 2024

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide and consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally; it's a heavy Docker image, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or the final approval from Committers.
  • Please follow the ASF Code of Conduct for all communication, including (but not limited to) comments on Pull Requests, the mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased; otherwise your build might fail due to changes not related to your commits.

Apache Airflow is a community-driven project, and together we are making it better 🚀.
In case of doubts, contact the developers at:
Mailing List: [email protected]
Slack: https://s.apache.org/airflow-slack

@rgriffier force-pushed the add-retrieve-output-docker-swarm-operator branch 12 times, most recently from 906897c to bde1589 on August 21, 2024 at 16:19
@rgriffier force-pushed the add-retrieve-output-docker-swarm-operator branch from bde1589 to 651d9d5 on August 22, 2024 at 06:08
@potiuk merged commit 16f0073 into apache:main on Aug 22, 2024 (53 checks passed)

boring-cyborg bot commented Aug 22, 2024

Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.

@rgriffier (Contributor, Author) commented:

@potiuk Thank you for validating the PR. Do I need to make a specific one for 2.10 so that the feature is available in the next 2.x release?

@potiuk (Member) commented on Aug 22, 2024

This is a provider-only change - it will be released with the next docker provider release.

Note! The current state of Airflow development is that, technically, there will be no new 2.* releases any more (just 2.11 as a bridge release for Airflow 3) - but users will be able to upgrade providers on 2.* for quite some time, and we will keep releasing them.

@spoutin commented on Oct 13, 2024

Unless I'm missing something, the docker SDK function get_archive is only available on the worker that hosted the docker container, since it uses the container ID. If you have a multi-node setup, how are you able to retrieve the output if the Airflow worker that executed the docker container is not on the same docker swarm worker node?

I also noticed that you're calling inspect_container; this has the same limitation: you can only inspect containers that are on the same node.

The docker "tasks" APIs are what we should be using to access Docker Service tasks, not the "container" APIs.

What's confusing me is that this was tested, but I'm not sure how. Unless I'm confused about the Docker Swarm APIs, or there is some form of a Docker API gateway/proxy routing the calls to the correct nodes, I'm not sure how this is working.
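
To illustrate the distinction (a sketch; the service name is hypothetical): swarm-scoped calls are answered by the manager for the whole cluster, while container-scoped calls only see containers local to the daemon the client is connected to.

import docker

client = docker.APIClient()  # connected to a manager node

# Swarm-scoped: the manager knows about every task in the cluster.
tasks = client.tasks(filters={"service": "my_service"})

# Service logs are likewise aggregated by the manager across all nodes.
logs = client.service_logs("my_service", stdout=True, stderr=True)

# Container-scoped: this only succeeds if the container happens to run on
# the node this client is connected to.
container_id = tasks[0]["Status"]["ContainerStatus"]["ContainerID"]
info = client.inspect_container(container_id)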

@potiuk (Member) commented on Oct 14, 2024

Any comments on the above, @rgriffier?

@rgriffier (Contributor, Author) commented:

I'll have to check. I did test the code in Swarm mode, but I'm not sure whether I was testing with a container deployed on a node other than the manager node... I'll have a look at it this weekend and get back to you (though I'm fairly convinced by @spoutin's arguments; I'm afraid I tested in a configuration far from real-world use)...

@rgriffier (Contributor, Author) commented:

I've just checked, and it doesn't work if the deployed container is not on the same host as the manager.
My bad: I wasn't on my usual infrastructure and didn't notice that I only had one node. @spoutin's comment was right on the mark. I've seen in your PR that you're proposing to delete what I'd added, which is actually a good idea. I think it's impossible to do what I wanted with the Docker API at the moment: Swarm mode does not allow containers deployed on worker nodes to be queried from the manager. Sorry for the bad testing.
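
For anyone hitting this later, the failure mode looks roughly like this (a sketch assuming a multi-node swarm, a client connected to the manager, and a hypothetical service name):

import docker

client = docker.APIClient()  # manager node

# The task list is visible cluster-wide...
task = client.tasks(filters={"service": "my_service"})[0]
container_id = task["Status"]["ContainerStatus"]["ContainerID"]

try:
    # ...but get_archive is container-scoped and is answered only by the
    # daemon that actually hosts the container.
    client.get_archive(container_id, "/tmp/variable.pickle")
except docker.errors.NotFound:
    print("Container runs on another node; the manager cannot reach its filesystem.")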
