Emit periodic keepalive events from Worker #1191

Merged · 18 commits · Mar 1, 2023
ansible_runner/__main__.py (10 additions, 0 deletions)
@@ -611,6 +611,15 @@ def main(sys_args=None):
"Using this will also assure that the directory is deleted when the job finishes."
)
)
worker_subparser.add_argument(
"--keepalive-seconds",
dest="keepalive_seconds",
default=0,
type=int,
help=(
"Emit a synthetic keepalive event every N seconds of idle. (default=0, disabled)"
)
)
process_subparser = subparser.add_parser(
'process',
help="Receive the output of remote ansible-runner work and distribute the results"
@@ -859,6 +868,7 @@ def main(sys_args=None):
limit=vargs.get('limit'),
streamer=streamer,
suppress_env_files=vargs.get("suppress_env_files"),
keepalive_seconds=vargs.get("keepalive_seconds"),
)
try:
res = run(**run_options)
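For orientation, here is a minimal sketch (not part of this diff) of driving the new option through the Worker class directly; the stream file names are illustrative placeholders:

# A hedged sketch, assuming a pre-recorded transmit stream on disk.
from ansible_runner.streaming import Worker

with open("transmit.stream", "rb") as job_in, open("results.stream", "wb") as results_out:
    # With keepalive_seconds=30, the worker injects a synthetic keepalive
    # event whenever the results stream has been idle for 30 seconds.
    worker = Worker(_input=job_in, _output=results_out, keepalive_seconds=30)
    status, rc = worker.run()

The equivalent CLI entry point is ansible-runner worker --keepalive-seconds 30, which per the Worker defaults reads the transmit stream from stdin and writes results to stdout.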
ansible_runner/config/_base.py (2 additions, 1 deletion)
@@ -67,7 +67,7 @@ def __init__(self,
process_isolation=False, process_isolation_executable=None,
container_image=None, container_volume_mounts=None, container_options=None, container_workdir=None, container_auth_data=None,
ident=None, rotate_artifacts=0, timeout=None, ssh_key=None, quiet=False, json_mode=False,
check_job_event_data=False, suppress_env_files=False):
check_job_event_data=False, suppress_env_files=False, keepalive_seconds=0):
# common params
self.host_cwd = host_cwd
self.envvars = envvars
@@ -95,6 +95,7 @@ def __init__(self,
self.timeout = timeout
self.check_job_event_data = check_job_event_data
self.suppress_env_files = suppress_env_files
self.keepalive_seconds = keepalive_seconds

# setup initial environment
if private_data_dir:
ansible_runner/streaming.py (118 additions, 40 deletions)
@@ -1,3 +1,5 @@
from __future__ import annotations # allow newer type syntax until 3.10 is our minimum

import codecs
import json
import os
@@ -6,17 +8,17 @@
import tempfile
import uuid
import traceback
try:
from collections.abc import Mapping
except ImportError:
from collections import Mapping


import ansible_runner
from ansible_runner.exceptions import ConfigurationError
from ansible_runner.loader import ArtifactLoader
import ansible_runner.plugins
from ansible_runner.utils import register_for_cleanup
from ansible_runner.utils.streaming import stream_dir, unstream_dir
from collections.abc import Mapping
from functools import wraps
from threading import Event, RLock, Thread


class UUIDEncoder(json.JSONEncoder):
@@ -60,12 +62,18 @@ def run(self):
return self.status, self.rc


class Worker(object):
def __init__(self, _input=None, _output=None, **kwargs):
class Worker:
def __init__(self, _input=None, _output=None, keepalive_seconds: int = 0, **kwargs):
if _input is None:
_input = sys.stdin.buffer
if _output is None:
_output = sys.stdout.buffer

self._keepalive_interval_sec = keepalive_seconds
self._keepalive_thread: Thread | None = None
self._output_event = Event()
self._output_lock = RLock()

self._input = _input
self._output = _output

@@ -81,6 +89,64 @@ def __init__(self, _input=None, _output=None, **kwargs):
self.status = "unstarted"
self.rc = None

def _begin_keepalive(self):
"""Starts a keepalive thread at most once"""
if not self._keepalive_thread:
self._keepalive_thread = Thread(target=self._keepalive_loop)
self._keepalive_thread.start()

def _end_keepalive(self):
"""Disable the keepalive interval and notify the keepalive thread to shut down"""
self._keepalive_interval_sec = 0
self._output_event.set()

def _keepalive_loop(self):
"""Main loop for keepalive injection thread; exits when keepalive interval is <= 0"""
while self._keepalive_interval_sec > 0:
# block until output has occurred or keepalive interval elapses
if self._output_event.wait(timeout=self._keepalive_interval_sec):
# output was sent before keepalive timeout; reset the event and start waiting again
self._output_event.clear()
continue

# keepalive interval elapsed; try to send a keepalive...
# pre-acquire the output lock without blocking
if not self._output_lock.acquire(blocking=False):
# something else has the lock; output is imminent, so just skip this keepalive
# NB: a long-running operation under an event handler that's holding this lock but not actually moving
# output could theoretically block keepalives long enough to cause problems, but it's probably not
# worth the added locking hassle to be pedantic about it
continue

try:
# were keepalives recently disabled?
if self._keepalive_interval_sec <= 0:
# we're probably shutting down; don't risk corrupting output by writing now, just bail out
return
# output a keepalive event
# FIXME: this could be a lot smaller (even just `{}`) if a short-circuit discard was guaranteed in
# Processor or if other layers were more defensive about missing event keys and/or unknown dictionary
# values...
self.event_handler(dict(event='keepalive', counter=0, uuid=0))
finally:
# always release the output lock
self._output_lock.release()

def _synchronize_output_reset_keepalive(wrapped_method):
"""
Utility decorator to synchronize event writes and flushes, so that a keepalive cannot land in the middle of
a partially written event, and to reset the keepalive interval once the write completes.
"""
@wraps(wrapped_method)
def wrapper(self, *args, **kwargs):
with self._output_lock:
ret = wrapped_method(self, *args, **kwargs)
# signal the keepalive thread last, so the timeout restarts after the last write, not before the first
self._output_event.set()
return ret

return wrapper

def update_paths(self, kwargs):
if kwargs.get('envvars'):
if 'ANSIBLE_ROLES_PATH' in kwargs['envvars']:
@@ -93,62 +159,70 @@ def update_paths(self, kwargs):
return kwargs

def run(self):
while True:
try:
line = self._input.readline()
data = json.loads(line)
except (json.decoder.JSONDecodeError, IOError):
self.status_handler({'status': 'error', 'job_explanation': 'Failed to JSON parse a line from transmit stream.'}, None)
self.finished_callback(None) # send eof line
return self.status, self.rc

if 'kwargs' in data:
self.job_kwargs = self.update_paths(data['kwargs'])
elif 'zipfile' in data:
self._begin_keepalive()
try:
while True:
try:
unstream_dir(self._input, data['zipfile'], self.private_data_dir)
except Exception:
self.status_handler({
'status': 'error',
'job_explanation': 'Failed to extract private data directory on worker.',
'result_traceback': traceback.format_exc()
}, None)
line = self._input.readline()
data = json.loads(line)
except (json.decoder.JSONDecodeError, IOError):
self.status_handler({'status': 'error', 'job_explanation': 'Failed to JSON parse a line from transmit stream.'}, None)
self.finished_callback(None) # send eof line
return self.status, self.rc
elif 'eof' in data:
break

self.kwargs.update(self.job_kwargs)
self.kwargs['quiet'] = True
self.kwargs['suppress_ansible_output'] = True
self.kwargs['private_data_dir'] = self.private_data_dir
self.kwargs['status_handler'] = self.status_handler
self.kwargs['event_handler'] = self.event_handler
self.kwargs['artifacts_handler'] = self.artifacts_handler
self.kwargs['finished_callback'] = self.finished_callback

r = ansible_runner.interface.run(**self.kwargs)
self.status, self.rc = r.status, r.rc

# FIXME: do cleanup on the tempdir
if 'kwargs' in data:
self.job_kwargs = self.update_paths(data['kwargs'])
elif 'zipfile' in data:
try:
unstream_dir(self._input, data['zipfile'], self.private_data_dir)
except Exception:
self.status_handler({
'status': 'error',
'job_explanation': 'Failed to extract private data directory on worker.',
'result_traceback': traceback.format_exc()
}, None)
self.finished_callback(None) # send eof line
return self.status, self.rc
elif 'eof' in data:
break

self.kwargs.update(self.job_kwargs)
self.kwargs['quiet'] = True
self.kwargs['suppress_ansible_output'] = True
self.kwargs['private_data_dir'] = self.private_data_dir
self.kwargs['status_handler'] = self.status_handler
self.kwargs['event_handler'] = self.event_handler
self.kwargs['artifacts_handler'] = self.artifacts_handler
self.kwargs['finished_callback'] = self.finished_callback

r = ansible_runner.interface.run(**self.kwargs)
self.status, self.rc = r.status, r.rc

# FIXME: do cleanup on the tempdir
finally:
self._end_keepalive()

return self.status, self.rc

@_synchronize_output_reset_keepalive
def status_handler(self, status_data, runner_config):
self.status = status_data['status']
self._output.write(json.dumps(status_data).encode('utf-8'))
self._output.write(b'\n')
self._output.flush()

@_synchronize_output_reset_keepalive
def event_handler(self, event_data):
self._output.write(json.dumps(event_data).encode('utf-8'))
self._output.write(b'\n')
self._output.flush()

@_synchronize_output_reset_keepalive
def artifacts_handler(self, artifact_dir):
stream_dir(artifact_dir, self._output)
self._output.flush()

@_synchronize_output_reset_keepalive
def finished_callback(self, runner_obj):
self._output.write(json.dumps({'eof': True}).encode('utf-8'))
self._output.write(b'\n')
@@ -210,6 +284,7 @@ def status_callback(self, status_data):
self.status_handler(status_data, runner_config=self.config)

def event_callback(self, event_data):
# FIXME: this needs to be more defensive to not blow up on "malformed" events or new values it doesn't recognize
full_filename = os.path.join(self.artifact_dir,
'job_events',
'{}-{}.json'.format(event_data['counter'],
@@ -254,6 +329,9 @@ def run(self):
self.artifacts_callback(data)
elif 'eof' in data:
break
# FIXME: add a short-circuit here to minimize the overhead of keepalives?
# elif data.get('event') == 'keepalive':
# continue
else:
self.event_callback(data)

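On the consuming side, keepalives arrive as ordinary line-delimited JSON with event set to 'keepalive' (see the Worker's _keepalive_loop above). A sketch of the short-circuit the FIXME contemplates, written as a standalone helper rather than a change to Processor itself; iter_events and stream are hypothetical names:

import json

def iter_events(stream):
    # stream is assumed to be a binary file-like object carrying the
    # Worker's line-delimited JSON output
    for line in stream:
        data = json.loads(line)
        # discard synthetic keepalives up front; they carry no job data
        # (event='keepalive', counter=0, uuid=0)
        if data.get('event') == 'keepalive':
            continue
        if 'eof' in data:
            break
        yield data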