generated from ansible-collections/collection_template
-
Notifications
You must be signed in to change notification settings - Fork 122
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add docker_container_exec module (#105)
* Move some code from plugin_utils to module_utils. * First version of docker_container_exec module. * Linting. * Avoid using display. * Move common socket_handler code to module_utils. * Add module support, use it in docker_container_exec. * Add tests. * Fix copyright lines. * Add this and other 'new' modules to README. * Apply suggestions from code review Co-authored-by: Andrew Klychkov <[email protected]> * Update plugins/modules/docker_container_exec.py Co-authored-by: Andrew Klychkov <[email protected]>
- Loading branch information
1 parent
6722435
commit ff503d9
Showing
9 changed files
with
742 additions
and
212 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,232 @@ | ||
# Copyright (c) 2019-2021, Felix Fontein <[email protected]> | ||
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) | ||
|
||
from __future__ import (absolute_import, division, print_function) | ||
__metaclass__ = type | ||
|
||
|
||
import os | ||
import os.path | ||
import socket as pysocket | ||
|
||
from ansible.module_utils.basic import missing_required_lib | ||
from ansible.module_utils.six import PY3 | ||
|
||
try: | ||
from docker.utils import socket as docker_socket | ||
import struct | ||
except Exception: | ||
# missing Docker SDK for Python handled in ansible_collections.community.docker.plugins.module_utils.common | ||
pass | ||
|
||
from ansible_collections.community.docker.plugins.module_utils.socket_helper import ( | ||
make_unblocking, | ||
shutdown_writing, | ||
write_to_socket, | ||
) | ||
|
||
|
||
PARAMIKO_POLL_TIMEOUT = 0.01 # 10 milliseconds | ||
|
||
|
||
class DockerSocketHandlerBase(object): | ||
def __init__(self, sock, selectors, log=None): | ||
make_unblocking(sock) | ||
|
||
self._selectors = selectors | ||
if log is not None: | ||
self._log = log | ||
else: | ||
self._log = lambda msg: True | ||
self._paramiko_read_workaround = hasattr(sock, 'send_ready') and 'paramiko' in str(type(sock)) | ||
|
||
self._sock = sock | ||
self._block_done_callback = None | ||
self._block_buffer = [] | ||
self._eof = False | ||
self._read_buffer = b'' | ||
self._write_buffer = b'' | ||
self._end_of_writing = False | ||
|
||
self._current_stream = None | ||
self._current_missing = 0 | ||
self._current_buffer = b'' | ||
|
||
self._selector = self._selectors.DefaultSelector() | ||
self._selector.register(self._sock, self._selectors.EVENT_READ) | ||
|
||
def __enter__(self): | ||
return self | ||
|
||
def __exit__(self, type, value, tb): | ||
self._selector.close() | ||
|
||
def set_block_done_callback(self, block_done_callback): | ||
self._block_done_callback = block_done_callback | ||
if self._block_done_callback is not None: | ||
while self._block_buffer: | ||
elt = self._block_buffer.remove(0) | ||
self._block_done_callback(*elt) | ||
|
||
def _add_block(self, stream_id, data): | ||
if self._block_done_callback is not None: | ||
self._block_done_callback(stream_id, data) | ||
else: | ||
self._block_buffer.append((stream_id, data)) | ||
|
||
def _read(self): | ||
if self._eof: | ||
return | ||
if hasattr(self._sock, 'recv'): | ||
try: | ||
data = self._sock.recv(262144) | ||
except Exception as e: | ||
# After calling self._sock.shutdown(), OpenSSL's/urllib3's | ||
# WrappedSocket seems to eventually raise ZeroReturnError in | ||
# case of EOF | ||
if 'OpenSSL.SSL.ZeroReturnError' in str(type(e)): | ||
self._eof = True | ||
return | ||
else: | ||
raise | ||
elif PY3 and isinstance(self._sock, getattr(pysocket, 'SocketIO')): | ||
data = self._sock.read() | ||
else: | ||
data = os.read(self._sock.fileno()) | ||
if data is None: | ||
# no data available | ||
return | ||
self._log('read {0} bytes'.format(len(data))) | ||
if len(data) == 0: | ||
# Stream EOF | ||
self._eof = True | ||
return | ||
self._read_buffer += data | ||
while len(self._read_buffer) > 0: | ||
if self._current_missing > 0: | ||
n = min(len(self._read_buffer), self._current_missing) | ||
self._current_buffer += self._read_buffer[:n] | ||
self._read_buffer = self._read_buffer[n:] | ||
self._current_missing -= n | ||
if self._current_missing == 0: | ||
self._add_block(self._current_stream, self._current_buffer) | ||
self._current_buffer = b'' | ||
if len(self._read_buffer) < 8: | ||
break | ||
self._current_stream, self._current_missing = struct.unpack('>BxxxL', self._read_buffer[:8]) | ||
self._read_buffer = self._read_buffer[8:] | ||
if self._current_missing < 0: | ||
# Stream EOF (as reported by docker daemon) | ||
self._eof = True | ||
break | ||
|
||
def _handle_end_of_writing(self): | ||
if self._end_of_writing and len(self._write_buffer) == 0: | ||
self._end_of_writing = False | ||
self._log('Shutting socket down for writing') | ||
shutdown_writing(self._sock, self._log) | ||
|
||
def _write(self): | ||
if len(self._write_buffer) > 0: | ||
written = write_to_socket(self._sock, self._write_buffer) | ||
self._write_buffer = self._write_buffer[written:] | ||
self._log('wrote {0} bytes, {1} are left'.format(written, len(self._write_buffer))) | ||
if len(self._write_buffer) > 0: | ||
self._selector.modify(self._sock, self._selectors.EVENT_READ | self._selectors.EVENT_WRITE) | ||
else: | ||
self._selector.modify(self._sock, self._selectors.EVENT_READ) | ||
self._handle_end_of_writing() | ||
|
||
def select(self, timeout=None, _internal_recursion=False): | ||
if not _internal_recursion and self._paramiko_read_workaround and len(self._write_buffer) > 0: | ||
# When the SSH transport is used, docker-py internally uses Paramiko, whose | ||
# Channel object supports select(), but only for reading | ||
# (https://github.com/paramiko/paramiko/issues/695). | ||
if self._sock.send_ready(): | ||
self._write() | ||
return True | ||
while timeout is None or timeout > PARAMIKO_POLL_TIMEOUT: | ||
result = self.select(PARAMIKO_POLL_TIMEOUT, _internal_recursion=True) | ||
if self._sock.send_ready(): | ||
self._read() | ||
result += 1 | ||
if result > 0: | ||
return True | ||
if timeout is not None: | ||
timeout -= PARAMIKO_POLL_TIMEOUT | ||
self._log('select... ({0})'.format(timeout)) | ||
events = self._selector.select(timeout) | ||
for key, event in events: | ||
if key.fileobj == self._sock: | ||
self._log( | ||
'select event read:{0} write:{1}'.format( | ||
event & self._selectors.EVENT_READ != 0, | ||
event & self._selectors.EVENT_WRITE != 0)) | ||
if event & self._selectors.EVENT_READ != 0: | ||
self._read() | ||
if event & self._selectors.EVENT_WRITE != 0: | ||
self._write() | ||
result = len(events) | ||
if self._paramiko_read_workaround and len(self._write_buffer) > 0: | ||
if self._sock.send_ready(): | ||
self._write() | ||
result += 1 | ||
return result > 0 | ||
|
||
def is_eof(self): | ||
return self._eof | ||
|
||
def end_of_writing(self): | ||
self._end_of_writing = True | ||
self._handle_end_of_writing() | ||
|
||
def consume(self): | ||
stdout = [] | ||
stderr = [] | ||
|
||
def append_block(stream_id, data): | ||
if stream_id == docker_socket.STDOUT: | ||
stdout.append(data) | ||
elif stream_id == docker_socket.STDERR: | ||
stderr.append(data) | ||
else: | ||
raise ValueError('{0} is not a valid stream ID'.format(stream_id)) | ||
|
||
self.end_of_writing() | ||
|
||
self.set_block_done_callback(append_block) | ||
while not self._eof: | ||
self.select() | ||
return b''.join(stdout), b''.join(stderr) | ||
|
||
def write(self, str): | ||
self._write_buffer += str | ||
if len(self._write_buffer) == len(str): | ||
self._write() | ||
|
||
|
||
class DockerSocketHandlerModule(DockerSocketHandlerBase): | ||
def __init__(self, sock, module, selectors): | ||
super(DockerSocketHandlerModule, self).__init__(sock, selectors, module.debug) | ||
|
||
|
||
def find_selectors(module): | ||
try: | ||
# ansible-base 2.10+ has selectors a compat version of selectors, which a bundled fallback: | ||
from ansible.module_utils.compat import selectors | ||
return selectors | ||
except ImportError: | ||
pass | ||
try: | ||
# Python 3.4+ | ||
import selectors | ||
return selectors | ||
except ImportError: | ||
pass | ||
try: | ||
# backport package installed in the system | ||
import selectors2 | ||
return selectors2 | ||
except ImportError: | ||
pass | ||
module.fail_json(msg=missing_required_lib('selectors2', reason='for handling stdin')) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
# Copyright (c) 2019-2021, Felix Fontein <[email protected]> | ||
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) | ||
|
||
from __future__ import (absolute_import, division, print_function) | ||
__metaclass__ = type | ||
|
||
|
||
import fcntl | ||
import os | ||
import os.path | ||
import socket as pysocket | ||
|
||
from ansible.module_utils.six import PY3 | ||
|
||
|
||
def make_unblocking(sock): | ||
if hasattr(sock, '_sock'): | ||
sock._sock.setblocking(0) | ||
elif hasattr(sock, 'setblocking'): | ||
sock.setblocking(0) | ||
else: | ||
fcntl.fcntl(sock.fileno(), fcntl.F_SETFL, fcntl.fcntl(sock.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) | ||
|
||
|
||
def _empty_writer(msg): | ||
pass | ||
|
||
|
||
def shutdown_writing(sock, log=_empty_writer): | ||
if hasattr(sock, 'shutdown_write'): | ||
sock.shutdown_write() | ||
elif hasattr(sock, 'shutdown'): | ||
try: | ||
sock.shutdown(pysocket.SHUT_WR) | ||
except TypeError as e: | ||
# probably: "TypeError: shutdown() takes 1 positional argument but 2 were given" | ||
log('Shutting down for writing not possible; trying shutdown instead: {0}'.format(e)) | ||
sock.shutdown() | ||
elif PY3 and isinstance(sock, getattr(pysocket, 'SocketIO')): | ||
sock._sock.shutdown(pysocket.SHUT_WR) | ||
else: | ||
log('No idea how to signal end of writing') | ||
|
||
|
||
def write_to_socket(sock, data): | ||
if hasattr(sock, '_send_until_done'): | ||
# WrappedSocket (urllib3/contrib/pyopenssl) doesn't have `send`, but | ||
# only `sendall`, which uses `_send_until_done` under the hood. | ||
return sock._send_until_done(data) | ||
elif hasattr(sock, 'send'): | ||
return sock.send(data) | ||
else: | ||
return os.write(sock.fileno(), data) |
Oops, something went wrong.