Delay until Docker Swarm recognizes finished service #18270
-
@nullhack @retornam @CodingJonas @CaptainCuddleCube @nalepae @akki Can you take a look? It may be interesting to you.
-
Sure, I'll put it on my list after reviewing the other PR.
-
I'm currently using the subprocess workaround without issues. I'm not saying it is perfect, but I can make a pull request out of it, since it seems like a sufficient solution to me.
-
@CodingJonas Just curious if you asked upstream why …
-
That's a good point. I'll make sure to open an issue on their side today and get around to polishing my changes into at least a WIP merge request, to see if my current solution is feasible :)
-
@CodingJonas you are hitting a known bug in docker-py …
-
I'm not quite sure if this is an Airflow issue?
-
The issue docker/docker-py#931 is still not fixed, and the DockerSwarmOperator still has the 60 s delay before finishing a completed service. Is there an updated workaround you are using to prevent this delay?
-
Sorry for not following up on my workaround; we moved to Kubernetes, so we never finished polishing it properly. I can still share the main code parts we used to try to fix it. Perhaps it helps you!

import requests
from threading import Thread


def _run_service(self):
    ...
    if self.enable_logging:
        service_log_stream = self.cli.service_logs(
            self.service['ID'], follow=True, stdout=True, stderr=True, is_tty=self.tty
        )
        _start_logging_async(self.log, service_log_stream)
    ...


def _start_logging_async(logger, service_log_stream):
    """
    The logging task is blocking and thus stops the operator from recognizing
    in time when the service finishes. Since the logging thread is daemonized,
    it will automatically be terminated once the main script finishes.
    """
    p = Thread(target=_stream_logs_to_output,
               args=(logger, service_log_stream),
               daemon=True)
    p.start()


def _stream_logs_to_output(logger, logs):
    line = ''
    while True:
        try:
            log = next(logs)
        # TODO: Remove this clause once https://github.com/docker/docker-py/issues/931 is fixed
        except requests.exceptions.ConnectionError:
            logger.info("Connection Error while fetching log")
            # The service log stream stopped sending messages; check whether the
            # service has terminated.
            break
        except StopIteration:
            logger.info("StopIteration while fetching log")
            # The service log stream terminated, so stop fetching further logs.
            break
        else:
            try:
                log = log.decode()
            except UnicodeDecodeError:
                continue
            if log == '\n':
                logger.info(line)
                line = ''
            else:
                line += log
    # Flush any remaining partial line.
    if line:
        logger.info(line)

The only addition we made was wrapping the …
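(Not part of the original comment.) The snippet above leaves out how the main thread notices that the service has finished while the daemonized thread streams logs in the background. A minimal sketch of such a check, assuming a docker-py APIClient and the service ID returned when the service was created; the helper name wait_for_service_to_finish is illustrative, not from the original code:

import time

import docker

# States in which a Swarm task no longer makes progress.
TERMINAL_STATES = {'complete', 'failed', 'shutdown', 'rejected', 'orphaned', 'remove'}


def wait_for_service_to_finish(cli: docker.APIClient, service_id: str, poll_interval: float = 1.0) -> bool:
    """Return True if all tasks of the service completed successfully."""
    while True:
        tasks = cli.tasks(filters={'service': service_id})
        states = {task['Status']['State'] for task in tasks}
        if tasks and states <= TERMINAL_STATES:
            return states == {'complete'}
        time.sleep(poll_interval)

Because the logging thread is a daemon, it never needs to be joined; it dies with the operator once a check like this returns.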
-
FYI, docker/docker-py#931 has been fixed. Also, we can remove this exception handling, as per the TODO comment.
-
docker-py version 5.0.3 was released with the fix for docker/docker-py#931: https://github.com/docker/docker-py/releases/tag/5.0.3
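(Not part of the original comment.) A quick, purely illustrative way to confirm that an environment already contains the fixed client, assuming docker-py is importable as docker and packaging is installed:

import docker
from packaging.version import Version

# docker-py >= 5.0.3 includes the fix for docker/docker-py#931.
assert Version(docker.__version__) >= Version('5.0.3'), 'upgrade docker-py to >= 5.0.3'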
-
I opened #18867 as a follow-up.
-
Apache Airflow version: 2.0.0dev
What happened:
When running a DockerSwarmOperator, the service itself finishes, but Airflow only detects the finished service about 60 seconds after it has completed.
The issue lies in this line in airflow/airflow/providers/docker/operators/docker_swarm.py (line 171 at 832593a):

    next(logs)

When removing this line, it works as expected. next(logs) is a blocking call, and for some reason the docker-py library behind this call does not recognize the finished service. After what I think is exactly 60 seconds, the call crashes, which allows the operator to continue.
How to reproduce it:
Any DAG using the DockerSwarmOperator reproduces it, e.g.:
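(The original example DAG is not preserved here.) A minimal sketch that should reproduce the delay; dag_id, image, command, and schedule are placeholders, not from the original report:

from datetime import datetime

from airflow import DAG
from airflow.providers.docker.operators.docker_swarm import DockerSwarmOperator

# Placeholder DAG; any short-lived service should show the ~60 s delay.
with DAG(
    dag_id='docker_swarm_delay_repro',
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    DockerSwarmOperator(
        task_id='short_lived_service',
        image='alpine:3',
        command='echo hello',
        docker_url='unix://var/run/docker.sock',
        auto_remove=True,
        enable_logging=True,  # the delay shows up when log streaming is enabled
    )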
Anything else we need to know:
A workaround I found was to execute the logging in a separate process, while checking for the current status of the service in the main process. Once the service has finished, the logging process can simply be terminated.
The workaround would look something like this:
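(The snippet itself was not preserved in this thread; the following is a rough reconstruction of the described idea, not the reporter's actual code.) It reuses the _stream_logs_to_output helper shown earlier in this thread, and _has_service_terminated stands in for whatever status check the operator performs; a tasks()-based poll like the one sketched earlier would also do:

import time
from multiprocessing import Process

import docker


def _log_worker(logger, docker_url, service_id, tty):
    # The child process opens its own API client so no connection objects are
    # shared with the main process.
    cli = docker.APIClient(base_url=docker_url)
    logs = cli.service_logs(service_id, follow=True, stdout=True, stderr=True, is_tty=tty)
    _stream_logs_to_output(logger, logs)


def _run_service(self):
    ...
    log_process = None
    if self.enable_logging:
        log_process = Process(
            target=_log_worker,
            args=(self.log, self.docker_url, self.service['ID'], self.tty),
        )
        log_process.start()

    # Poll the service state in the main process instead of blocking on next(logs).
    while not self._has_service_terminated():
        time.sleep(1)

    if log_process is not None:
        # Once the service has finished, the blocking log reader can simply be terminated.
        log_process.terminate()
        log_process.join()
    ...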
I removed the _stream_logs_to_output function from the class to better separate the resources used.