This issue was moved to a discussion.

You can continue the conversation there. Go to discussion →


Delay until Docker Swarm recognizes finished service #9310

Closed
CodingJonas opened this issue Jun 15, 2020 · 9 comments
Labels
area:providers kind:bug This is a clearly a bug

Comments

@CodingJonas
Contributor

CodingJonas commented Jun 15, 2020

Apache Airflow version: 2.0.0dev

  • OS (e.g. from /etc/os-release): Ubuntu 18.04
  • Install tools: pip

What happened:
When running a DockerSwarmOperator, the service finishes, but Airflow detects the finished service only about 60 seconds after it has completed.

The issue lies in these lines:

    def _stream_logs_to_output(self):
        logs = self.cli.service_logs(
            self.service['ID'], follow=True, stdout=True, stderr=True, is_tty=self.tty
        )
        line = ''
        while True:
            try:
                log = next(logs)

When this is removed, the operator works as expected. next(logs) is a blocking call, and for some reason the docker-py library, which is behind this call, does not recognize that the service has finished. After what appears to be exactly 60 seconds this call crashes, which allows the operator to continue.
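To illustrate why a blocking next() hides the state change, here is a small stdlib-only sketch (not Airflow code; `iter_with_timeout` is a hypothetical helper) that pulls from a generator with a bounded wait, so the caller periodically regains control and could re-check the service state instead of hanging indefinitely:

```python
# Hypothetical sketch: consume a possibly-blocking generator via a worker
# thread and a queue, so each read waits at most `timeout` seconds.
import queue
import threading


def iter_with_timeout(gen, timeout):
    """Yield items from `gen`, giving up after `timeout` seconds of silence."""
    q = queue.Queue()
    done = object()  # sentinel marking normal exhaustion of the generator

    def pump():
        for item in gen:
            q.put(item)
        q.put(done)

    threading.Thread(target=pump, daemon=True).start()
    while True:
        try:
            item = q.get(timeout=timeout)
        except queue.Empty:
            return  # no output for `timeout` seconds: let the caller re-check state
        if item is done:
            return
        yield item
```

With this shape, a stalled log stream costs at most `timeout` seconds per read rather than blocking until the connection drops.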

How to reproduce it:
Any DAG using the DockerSwarmOperator works, e.g.:

    task1 = DockerSwarmOperator(
        task_id='docker_swarm_validator',
        image='alpine:3.11.5',
        api_version='auto',
        command='echo X',
        tty=True,
    )

Anything else we need to know:

A workaround I found was to execute the logging in a separate process, while checking for the current status of the service in the main process. Once the service has finished, the logging process can simply be terminated.
The workaround would look something like this:

        if self.enable_logging:
            # Since this subprocess is daemonized, it will automatically be terminated once the main script finishes
            p = Process(target=_stream_logs_to_output,
                        args=(self.log,
                              self.cli.service_logs(self.service['ID'], follow=True, stdout=True, stderr=True, is_tty=self.tty)
                              ),
                        daemon=True
                        )
            p.start()

I removed the _stream_logs_to_output function from the class to better separate the resources used.
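As a toy demonstration of why this works (assumptions: plain stdlib only, a threading.Thread standing in for the multiprocessing.Process above, and time.sleep standing in for the blocked next(logs) call), a daemonized worker stuck on a blocking call does not keep the main flow from finishing:

```python
# Toy sketch, not Airflow code: a daemon worker blocked for 60 s does not
# delay the "operator", which returns as soon as its own work is done.
import threading
import time


def blocked_log_reader():
    time.sleep(60)  # stands in for a next(logs) call that never returns


def run_service():
    start = time.monotonic()
    t = threading.Thread(target=blocked_log_reader, daemon=True)
    t.start()
    # Main flow: the service is "finished", so return immediately
    # instead of waiting out the 60-second connection timeout.
    return time.monotonic() - start


elapsed = run_service()
```

The daemon flag is what makes this safe to abandon: daemon threads (and daemonized processes) are terminated automatically when the main program exits.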

@CodingJonas CodingJonas added the kind:bug This is a clearly a bug label Jun 15, 2020
@mik-laj
Member

mik-laj commented Jun 15, 2020

@nullhack @retornam @CodingJonas @CaptainCuddleCube @nalepae @akki Can you look at it? It may be interesting for you.

@nullhack
Contributor

Sure, I'll put it on my list after reviewing the other PR.

@CodingJonas
Contributor Author

I'm currently using the workaround with a subprocess without issues. I'm not saying it is perfect, but I can make a pull request out of it, since it seems to me like a sufficient solution.

@akki
Contributor

akki commented Jul 23, 2020

@CodingJonas Just curious if you asked upstream why docker-py behaves that way?
It might be worth asking them to know if they can fix the root cause itself.

@CodingJonas
Contributor Author

That's a good point. I'll see that I open an issue on their side today and get around to polishing my changes into at least a WIP merge request, to see if my current solution is feasible :)

@retornam
Contributor

@CodingJonas you are hitting a known bug in docker-py: when is_tty is set and there isn't any streaming output for about 60 seconds, the connection is closed, see docker/docker-py#931 (comment). There is a fix for it in docker/docker-py#1959.

@eladkal
Contributor

eladkal commented May 6, 2021

I'm not quite sure this is an Airflow issue?

@enima2684
Contributor

The issue docker/docker-py#931 is still not fixed, and the DockerSwarmOperator still has the 60s delay before finishing a completed service.

Is there any updated workaround you are using to prevent this delay?

@CodingJonas
Contributor Author

CodingJonas commented Sep 6, 2021

Sorry for not following up on my workaround; we moved to Kubernetes, so we never finished polishing it properly. I can still add the main code parts we used to try to fix it. Perhaps it helps you!

    def _run_service(self):
        ...
        if self.enable_logging:
            service_log_stream = self.cli.service_logs(self.service['ID'], follow=True, stdout=True, stderr=True, is_tty=self.tty)
            _start_logging_async(self.log, service_log_stream)
        ...

    from threading import Thread

    import requests


    def _start_logging_async(logger, service_log_stream):
        """
        The logging task is blocking and thus stops the operator from recognizing in time when the service finishes.
        Since the logging thread is daemonized, it will automatically be terminated once the main script finishes.
        """
        t = Thread(target=_stream_logs_to_output,
                   args=(logger, service_log_stream),
                   daemon=True)
        t.start()


    def _stream_logs_to_output(logger, logs):
        line = ''
        while True:
            try:
                log = next(logs)
            # TODO: Remove this clause once https://github.com/docker/docker-py/issues/931 is fixed
            except requests.exceptions.ConnectionError:
                logger.info("Connection Error while fetching log")
                # If the service log stream stopped sending messages, check whether
                # the service has terminated.
                break
            except StopIteration:
                logger.info("StopIteration while fetching log")
                # If the service log stream terminated, stop fetching further logs.
                break
            else:
                try:
                    log = log.decode()
                except UnicodeDecodeError:
                    continue
                if log == '\n':
                    logger.info(line)
                    line = ''
                else:
                    line += log
        # Flush any remaining partial line.
        if line:
            logger.info(line)

The only addition we made is wrapping the _stream_logs_to_output function in a Thread.
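To spell out the polling idea from the earlier workaround, here is a hedged stdlib-only sketch (`get_state` is a hypothetical stand-in for querying the Swarm API, e.g. via a self.cli.tasks lookup): while the daemon thread consumes logs, the main process loops on the service state until it reaches a terminal state.

```python
# Hypothetical sketch: poll a state-returning callable until the service
# reaches a terminal Swarm task state, instead of blocking on the log stream.
import time

TERMINAL_STATES = {"complete", "failed", "shutdown"}


def wait_for_service(get_state, poll_interval=0.1, max_wait=30.0):
    """Poll `get_state()` until it returns a terminal state or `max_wait` passes."""
    deadline = time.monotonic() + max_wait
    while time.monotonic() < deadline:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_interval)
    raise TimeoutError("service did not reach a terminal state in time")
```

Once this returns, the daemonized logging thread can simply be abandoned; it is torn down when the main script exits.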

@apache apache locked and limited conversation to collaborators Sep 15, 2021
@eladkal eladkal closed this as completed Sep 15, 2021

