Skip to content

Commit

Permalink
Consistently use msgpack module, added tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Akm0d committed Dec 17, 2019
1 parent 3fe5ded commit 89b01b5
Show file tree
Hide file tree
Showing 14 changed files with 562 additions and 207 deletions.
26 changes: 2 additions & 24 deletions salt/log/handlers/fluent_mod.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,36 +86,14 @@
# Import salt libs
from salt.log.setup import LOG_LEVELS
from salt.log.mixins import NewStyleClassMixIn
import salt.utils.msgpack
import salt.utils.network

# Import Third party libs
from salt.ext import six

log = logging.getLogger(__name__)

try:
# Attempt to import msgpack
import msgpack
import salt.utils.msgpack
# There is a serialization issue on ARM and potentially other platforms
# for some msgpack bindings, check for it
if msgpack.loads(msgpack.dumps([1, 2, 3]), use_list=True) is None:
raise ImportError
import salt.utils.msgpack
except ImportError:
# Fall back to msgpack_pure
try:
import msgpack_pure as msgpack
import salt.utils.msgpack
except ImportError:
# TODO: Come up with a sane way to get a configured logfile
# and write to the logfile when this error is hit also
LOG_FORMAT = '[%(levelname)-8s] %(message)s'
salt.log.setup_console_logger(log_format=LOG_FORMAT)
log.fatal('Unable to import msgpack or msgpack_pure python modules')
# Don't exit if msgpack is not available, this is to make local mode
# work without msgpack
#sys.exit(salt.exitcodes.EX_GENERIC)

# Define the module's virtual name
__virtualname__ = 'fluent'
Expand Down Expand Up @@ -458,7 +436,7 @@ def _make_packet(self, label, timestamp, data):
packet = (tag, timestamp, data)
if self.verbose:
print(packet)
return salt.utils.msgpack.packb(packet, _msgpack_module=msgpack)
return salt.utils.msgpack.packb(packet)

def _send(self, bytes_):
self.lock.acquire()
Expand Down
92 changes: 12 additions & 80 deletions salt/payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import salt.log
import salt.transport.frame
import salt.utils.immutabletypes as immutabletypes
import salt.utils.msgpack
import salt.utils.stringutils
from salt.exceptions import SaltReqTimeoutError, SaltDeserializationError
from salt.utils.data import CaseInsensitiveDict
Expand All @@ -30,68 +31,20 @@

log = logging.getLogger(__name__)

HAS_MSGPACK = False
try:
# Attempt to import msgpack
import msgpack
# There is a serialization issue on ARM and potentially other platforms
# for some msgpack bindings, check for it
if msgpack.version >= (0, 4, 0):
if msgpack.loads(msgpack.dumps([1, 2, 3], use_bin_type=False), use_list=True) is None:
raise ImportError
else:
if msgpack.loads(msgpack.dumps([1, 2, 3]), use_list=True) is None:
raise ImportError
HAS_MSGPACK = True
except ImportError:
# Fall back to msgpack_pure
try:
import msgpack_pure as msgpack # pylint: disable=import-error
HAS_MSGPACK = True
except ImportError:
# TODO: Come up with a sane way to get a configured logfile
# and write to the logfile when this error is hit also
LOG_FORMAT = '[%(levelname)-8s] %(message)s'
salt.log.setup_console_logger(log_format=LOG_FORMAT)
log.fatal('Unable to import msgpack or msgpack_pure python modules')
# Don't exit if msgpack is not available, this is to make local mode
# work without msgpack
#sys.exit(salt.defaults.exitcodes.EX_GENERIC)


if HAS_MSGPACK:
import salt.utils.msgpack


if HAS_MSGPACK and not hasattr(msgpack, 'exceptions'):
class PackValueError(Exception):
'''
older versions of msgpack do not have PackValueError
'''

class exceptions(object):
'''
older versions of msgpack do not have an exceptions module
'''
PackValueError = PackValueError()

msgpack.exceptions = exceptions()


def package(payload):
'''
This method for now just wraps msgpack.dumps, but it is here so that
we can make the serialization a custom option in the future with ease.
'''
return salt.utils.msgpack.dumps(payload, _msgpack_module=msgpack)
return salt.utils.msgpack.dumps(payload)


def unpackage(package_):
'''
Unpackages a payload
'''
return salt.utils.msgpack.loads(package_, use_list=True,
_msgpack_module=msgpack)
return salt.utils.msgpack.loads(package_, use_list=True)


def format_payload(enc, **kwargs):
Expand Down Expand Up @@ -147,32 +100,27 @@ def ext_type_decoder(code, data):
gc.disable() # performance optimization for msgpack
loads_kwargs = {'use_list': True,
'ext_hook': ext_type_decoder}
if msgpack.version >= (0, 4, 0):
if salt.utils.msgpack.version >= (0, 4, 0):
# msgpack only supports 'encoding' starting in 0.4.0.
# Due to this, if we don't need it, don't pass it at all so
# that under Python 2 we can still work with older versions
# of msgpack.
if msgpack.version >= (0, 5, 2):
if salt.utils.msgpack.version >= (0, 5, 2):
if encoding is None:
loads_kwargs['raw'] = True
else:
loads_kwargs['raw'] = False
else:
loads_kwargs['encoding'] = encoding
try:
ret = salt.utils.msgpack.loads(msg, use_list=True,
ext_hook=ext_type_decoder,
encoding=encoding,
_msgpack_module=msgpack)
ret = salt.utils.msgpack.unpackb(msg, **loads_kwargs)
except UnicodeDecodeError:
# msg contains binary data
loads_kwargs.pop('raw', None)
loads_kwargs.pop('encoding', None)
ret = msgpack.loads(msg, **loads_kwargs)
ret = salt.utils.msgpack.loads(msg, **loads_kwargs)
else:
ret = salt.utils.msgpack.loads(msg, use_list=True,
ext_hook=ext_type_decoder,
_msgpack_module=msgpack)
ret = salt.utils.msgpack.loads(msg, **loads_kwargs)
if six.PY3 and encoding is None and not raw:
ret = salt.transport.frame.decode_embedded_strs(ret)
except Exception as exc:
Expand Down Expand Up @@ -226,7 +174,7 @@ def ext_type_encoder(obj):
# msgpack doesn't support datetime.datetime and datetime.date datatypes.
# So here we have converted these types to custom datatype
# This is msgpack Extended types numbered 78
return msgpack.ExtType(78, salt.utils.stringutils.to_bytes(
return salt.utils.msgpack.ExtType(78, salt.utils.stringutils.to_bytes(
obj.strftime('%Y%m%dT%H:%M:%S.%f')))
# The same for immutable types
elif isinstance(obj, immutabletypes.ImmutableDict):
Expand All @@ -242,18 +190,8 @@ def ext_type_encoder(obj):
return obj

try:
if msgpack.version >= (0, 4, 0):
# msgpack only supports 'use_bin_type' starting in 0.4.0.
# Due to this, if we don't need it, don't pass it at all so
# that under Python 2 we can still work with older versions
# of msgpack.
return salt.utils.msgpack.dumps(msg, default=ext_type_encoder,
use_bin_type=use_bin_type,
_msgpack_module=msgpack)
else:
return salt.utils.msgpack.dumps(msg, default=ext_type_encoder,
_msgpack_module=msgpack)
except (OverflowError, msgpack.exceptions.PackValueError):
return salt.utils.msgpack.packb(msg, default=ext_type_encoder, use_bin_type=use_bin_type)
except (OverflowError, salt.utils.msgpack.exceptions.PackValueError):
# msgpack<=0.4.6 don't call ext encoder on very long integers raising the error instead.
# Convert any very long longs to strings and call dumps again.
def verylong_encoder(obj, context):
Expand All @@ -280,13 +218,7 @@ def verylong_encoder(obj, context):
return obj

msg = verylong_encoder(msg, set())
if msgpack.version >= (0, 4, 0):
return salt.utils.msgpack.dumps(msg, default=ext_type_encoder,
use_bin_type=use_bin_type,
_msgpack_module=msgpack)
else:
return salt.utils.msgpack.dumps(msg, default=ext_type_encoder,
_msgpack_module=msgpack)
return salt.utils.msgpack.packb(msg, default=ext_type_encoder, use_bin_type=use_bin_type)

def dump(self, msg, fn_):
'''
Expand Down
45 changes: 7 additions & 38 deletions salt/serializers/msgpack.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,43 +12,17 @@
import logging

# Import Salt Libs
from salt.log import setup_console_logger
import salt.utils.msgpack
from salt.serializers import DeserializationError, SerializationError

# Import 3rd-party libs
from salt.ext import six

log = logging.getLogger(__name__)


try:
# Attempt to import msgpack
import msgpack
import salt.utils.msgpack
# There is a serialization issue on ARM and potentially other platforms
# for some msgpack bindings, check for it
if msgpack.loads(msgpack.dumps([1, 2, 3]), use_list=True) is None:
raise ImportError
available = True
except ImportError:
# Fall back to msgpack_pure
try:
import msgpack_pure as msgpack # pylint: disable=import-error
import salt.utils.msgpack
except ImportError:
# TODO: Come up with a sane way to get a configured logfile
# and write to the logfile when this error is hit also
LOG_FORMAT = '[%(levelname)-8s] %(message)s'
setup_console_logger(log_format=LOG_FORMAT)
log.fatal('Unable to import msgpack or msgpack_pure python modules')
# Don't exit if msgpack is not available, this is to make local mode
# work without msgpack
#sys.exit(salt.defaults.exitcodes.EX_GENERIC)
available = False
available = salt.utils.msgpack.HAS_MSGPACK


if not available:

def _fail():
raise RuntimeError('msgpack is not available')

Expand All @@ -58,22 +32,19 @@ def _serialize(obj, **options):
def _deserialize(stream_or_string, **options):
_fail()

elif msgpack.version >= (0, 2, 0):
elif salt.utils.msgpack.version >= (0, 2, 0):

def _serialize(obj, **options):
try:
return salt.utils.msgpack.dumps(obj, _msgpack_module=msgpack,
**options)
return salt.utils.msgpack.dumps(obj, **options)
except Exception as error:
raise SerializationError(error)

def _deserialize(stream_or_string, **options):
try:
options.setdefault('use_list', True)
options.setdefault('encoding', 'utf-8')
return salt.utils.msgpack.loads(stream_or_string,
_msgpack_module=msgpack,
**options)
return salt.utils.msgpack.loads(stream_or_string, **options)
except Exception as error:
raise DeserializationError(error)

Expand All @@ -100,16 +71,14 @@ def _decoder(obj):
def _serialize(obj, **options):
try:
obj = _encoder(obj)
return salt.utils.msgpack.dumps(obj, _msgpack_module=msgpack,
**options)
return salt.utils.msgpack.dumps(obj, **options)
except Exception as error:
raise SerializationError(error)

def _deserialize(stream_or_string, **options):
options.setdefault('use_list', True)
try:
obj = salt.utils.msgpack.loads(stream_or_string,
_msgpack_module=msgpack)
obj = salt.utils.msgpack.loads(stream_or_string)
return _decoder(obj)
except Exception as error:
raise DeserializationError(error)
Expand Down
4 changes: 2 additions & 2 deletions salt/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import salt.utils.files
import salt.utils.hashutils
import salt.utils.immutabletypes as immutabletypes
import salt.utils.msgpack as msgpack
import salt.utils.msgpack
import salt.utils.platform
import salt.utils.process
import salt.utils.url
Expand Down Expand Up @@ -2260,7 +2260,7 @@ def check_pause(self, low):
with salt.utils.files.fopen(pause_path, 'rb') as fp_:
try:
pdat = msgpack_deserialize(fp_.read())
except msgpack.UnpackValueError:
except salt.utils.msgpack.exceptions.UnpackValueError:
# Reading race condition
if tries > 10:
# Break out if there are a ton of read errors
Expand Down
1 change: 1 addition & 0 deletions salt/states/pkg.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
# The following imports are used by the namespaced win_pkg funcs
# and need to be included in their globals.
# pylint: disable=import-error,unused-import
import salt.utils.msgpack as msgpack
from salt.utils.versions import LooseVersion
# pylint: enable=import-error,unused-import
# pylint: enable=invalid-name
Expand Down
12 changes: 5 additions & 7 deletions salt/transport/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
import socket
import time

# Import 3rd-party libs
import msgpack

# Import Tornado libs
import tornado
import tornado.gen
Expand All @@ -23,6 +20,7 @@
from tornado.ioloop import IOLoop, TimeoutError as TornadoTimeoutError
from tornado.iostream import IOStream, StreamClosedError
# Import Salt libs
import salt.utils.msgpack
import salt.transport.client
import salt.transport.frame
from salt.ext import six
Expand Down Expand Up @@ -166,15 +164,15 @@ def return_message(msg):
else:
return _null
# msgpack deprecated `encoding` starting with version 0.5.2
if msgpack.version >= (0, 5, 2):
if salt.utils.msgpack.version >= (0, 5, 2):
# Under Py2 we still want raw to be set to True
msgpack_kwargs = {'raw': six.PY2}
else:
if six.PY2:
msgpack_kwargs = {'encoding': None}
else:
msgpack_kwargs = {'encoding': 'utf-8'}
unpacker = msgpack.Unpacker(**msgpack_kwargs)
unpacker = salt.utils.msgpack.Unpacker(**msgpack_kwargs)
while not stream.closed():
try:
wire_bytes = yield stream.read_bytes(4096, partial=True)
Expand Down Expand Up @@ -263,15 +261,15 @@ def __init__(self, socket_path, io_loop=None):
self._closing = False
self.stream = None
# msgpack deprecated `encoding` starting with version 0.5.2
if msgpack.version >= (0, 5, 2):
if salt.utils.msgpack.version >= (0, 5, 2):
# Under Py2 we still want raw to be set to True
msgpack_kwargs = {'raw': six.PY2}
else:
if six.PY2:
msgpack_kwargs = {'encoding': None}
else:
msgpack_kwargs = {'encoding': 'utf-8'}
self.unpacker = msgpack.Unpacker(**msgpack_kwargs)
self.unpacker = salt.utils.msgpack.Unpacker(**msgpack_kwargs)

def connected(self):
return self.stream is not None and not self.stream.closed()
Expand Down
Loading

0 comments on commit 89b01b5

Please sign in to comment.