Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Single pass serialization #4699

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
176ed15
Implementing single pass serialization
madsbk Apr 15, 2021
0a4433a
Added dumps/loads cache
madsbk Apr 15, 2021
ff0a542
Fixed test_batched test
madsbk Apr 16, 2021
084680c
Fixed test_turn_off_pickle
madsbk Apr 16, 2021
6548968
fixed test_pickle_safe
madsbk Apr 16, 2021
ca6d0f6
fixed test_executor_offload
madsbk Apr 16, 2021
f951260
Merge branch 'main' of github.com:dask/distributed into single_pass_s…
madsbk Apr 22, 2021
2da2476
delay exceptions when loads_function
madsbk Apr 22, 2021
9717ae8
fixed test_rpc_serialization
madsbk Apr 22, 2021
ec3d4ad
Making sure to also unpack remotes in MsgpackList
madsbk Apr 22, 2021
f9b6f7c
Merge branch 'main' of github.com:dask/distributed into single_pass_s…
madsbk Apr 27, 2021
845b290
Revert "fixed test_rpc_serialization"
madsbk Apr 27, 2021
cd937b5
Checking the deserialises before unpickle
madsbk Apr 27, 2021
7fb8a26
Rewritten test_robust_unserializable()
madsbk Apr 27, 2021
79f8d24
typo
madsbk Apr 27, 2021
9aa4bbe
Revert "Fixed test_turn_off_pickle"
madsbk Apr 28, 2021
f700725
Checking the serialises before pickle
madsbk Apr 28, 2021
13ddcec
Revert "fixed test_pickle_safe"
madsbk Apr 28, 2021
2fa9f12
test_robust_unserializable(): catching CancelledError
madsbk Apr 28, 2021
e081747
Merge branch 'main' of github.com:dask/distributed into single_pass_s…
madsbk May 4, 2021
f544006
Merge branch 'main' of github.com:dask/distributed into single_pass_s…
madsbk May 5, 2021
6a2a915
MsgpackList: support isinstance(x, list)
madsbk May 5, 2021
be143ba
SerializedCallable: accept args
madsbk May 5, 2021
29baab1
to_serialize = msgpack_persist_lists
madsbk May 5, 2021
e7d5105
replaced Serialize and to_serialize with TaskGraphValue
madsbk May 5, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 113 additions & 22 deletions distributed/protocol/core.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import logging
import threading

import msgpack

from ..utils import LRU
from . import pickle
from .compression import decompress, maybe_compress
from .serialize import (
MsgpackList,
Serialize,
Serialized,
SerializedCallable,
TaskGraphValue,
merge_and_deserialize,
msgpack_decode_default,
msgpack_encode_default,
Expand All @@ -15,6 +21,39 @@

logger = logging.getLogger(__name__)

cache_dumps = LRU(maxsize=100)
cache_loads = LRU(maxsize=100)
cache_dumps_lock = threading.Lock()
cache_loads_lock = threading.Lock()
Comment on lines +24 to +27
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are discussing getting rid of dumps_function, will these still be needed or will they go away as well?



def dumps_function(func):
""" Dump a function to bytes, cache functions """
try:
with cache_dumps_lock:
result = cache_dumps[func]
except KeyError:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't have a good feeling on what timescales where currently optimizing or whether this is a particularly performance critical section. Therefore, this comment might be irrelevant.
However, exception handling is relatively expensive and if we encounter a lot of cache misses a isin should be faster. That's ns level optimization. I could imagine the pickling is usually an order of magnitude slower and this doesn't matter at all

result = pickle.dumps(func, protocol=4)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why protocol=4 is hard coded?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also curious about this

Copy link
Contributor Author

@madsbk madsbk Apr 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious too :)
This is taken directly from

result = pickle.dumps(func, protocol=4)

@jakirkham do you know?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That line comes from @mrocklin's PR ( #4019 ), which allowed connections to dynamically determine what compression and pickle protocols are supported and then use them in communication. In a few places I think Matt found it easier to simply force pickle protocol 4 than allow it to be configurable. So if this is coming from that worker code, that is the history

if len(result) < 100000:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can be a bit more generous with the cache size. currently we're at 100 (LRU maxsize) * 100_000 B (result) ~ 1MB. Considering how much stuff we're logging without taking size into account too much, I would suggest to be more generous with this upper limit since large results are the juicy cache hits.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related: in loads_function, what if we used hash(bytes_object) as the key instead of bytes_object itself? Then we wouldn't have to hang onto references to those large bytestrings that we won't look at again.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sounds like that was just copied and moved over from here

if len(result) < 100000:

Perhaps we can make a new issue and revisit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we can make a new issue and revisit?

My plan is to remove worker.dumps_function() completely, it shouldn't be required to call it explicitly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok in that case I don't think the protocol=4 bit above will be needed

with cache_dumps_lock:
cache_dumps[func] = result
except TypeError: # Unhashable function
result = pickle.dumps(func, protocol=4)
return result


def loads_function(bytes_object):
""" Load a function from bytes, cache bytes """
if len(bytes_object) < 100000:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

personal preference: I would put the size of the cache into a constant s.t. the two function don't drift apart

with cache_dumps_lock:
if bytes_object in cache_loads:
return cache_loads[bytes_object]
else:
result = pickle.loads(bytes_object)
cache_loads[bytes_object] = result
return result
return pickle.loads(bytes_object)


def dumps(msg, serializers=None, on_error="message", context=None) -> list:
"""Transform Python message to bytestream suitable for communication
Expand Down Expand Up @@ -47,25 +86,32 @@ def _inplace_compress_frames(header, frames):

def _encode_default(obj):
typ = type(obj)
if typ is Serialize or typ is Serialized:
offset = len(frames)
if typ is Serialized:
sub_header, sub_frames = obj.header, obj.frames
else:
sub_header, sub_frames = serialize_and_split(
obj, serializers=serializers, on_error=on_error, context=context
)
_inplace_compress_frames(sub_header, sub_frames)
sub_header["num-sub-frames"] = len(sub_frames)
frames.append(
msgpack.dumps(
sub_header, default=msgpack_encode_default, use_bin_type=True
)
)
frames.extend(sub_frames)
return {"__Serialized__": offset}

ret = msgpack_encode_default(obj)
if ret is not obj:
return ret

if typ is Serialize:
obj = obj.data # TODO: remove Serialize/to_serialize completely

offset = len(frames)
if typ in (Serialized, SerializedCallable):
sub_header, sub_frames = obj.header, obj.frames
elif callable(obj):
sub_header, sub_frames = {"callable": dumps_function(obj)}, []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functions can be quite large sometimes, for example if users close over large variables out of function scope. Msgpack may not handle this well in some cases

x = np.arange(1000000000)

def f(y):
    return y + x.sum()

Obviously users shouldn't do this, but they will.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we're bypassing the list of serializers here. This allows users to get past configurations where users specifically turn off pickle.

else:
return msgpack_encode_default(obj)
sub_header, sub_frames = serialize_and_split(
obj, serializers=serializers, on_error=on_error, context=context
)
_inplace_compress_frames(sub_header, sub_frames)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The inplace stuff always makes me uncomfortable. Thoughts on making new header/frames dict/lists here instead?

For reference, it was these sorts of inplace operations that previously caused us to run into the msgpack tuple vs list difference. I think that avoiding them when we can is useful, unless there is a large performance boost (which I wouldn't expect here).

sub_header["num-sub-frames"] = len(sub_frames)
frames.append(
msgpack.dumps(
sub_header, default=msgpack_encode_default, use_bin_type=True
)
)
frames.extend(sub_frames)
return {"__Serialized__": offset}

frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
return frames
Expand All @@ -75,9 +121,18 @@ def _encode_default(obj):
raise


class DelayedExceptionRaise:
def __init__(self, err):
self.err = err


def loads(frames, deserialize=True, deserializers=None):
""" Transform bytestream back into Python value """

if deserializers is None:
# TODO: get from configuration both here and in protocol.serialize()
deserializers = ("dask", "pickle")

try:

def _decode_default(obj):
Expand All @@ -87,19 +142,55 @@ def _decode_default(obj):
frames[offset],
object_hook=msgpack_decode_default,
use_list=False,
**msgpack_opts
**msgpack_opts,
)
offset += 1
sub_frames = frames[offset : offset + sub_header["num-sub-frames"]]
if "callable" in sub_header:
if deserialize:
try:
if "pickle" not in deserializers:
raise TypeError(
f"Cannot deserialize {sub_header['callable']}, "
"pickle isn't in deserializers"
)
return loads_function(sub_header["callable"])
except Exception as e:
if deserialize == "delay-exception":
return DelayedExceptionRaise(e)
Comment on lines +168 to +169
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused about when this is necessary and why it wasn't before. I'm wary of creating new systems like this if we can avoid it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I understand this now that I've seen the c.submit(identity, Foo()) test below

else:
raise
else:
return SerializedCallable(sub_header, sub_frames)
if deserialize:
if "compression" in sub_header:
sub_frames = decompress(sub_header, sub_frames)
return merge_and_deserialize(
sub_header, sub_frames, deserializers=deserializers
)
try:
return merge_and_deserialize(
sub_header, sub_frames, deserializers=deserializers
)
except Exception as e:
if deserialize == "delay-exception":
return DelayedExceptionRaise(e)
else:
raise
else:
return Serialized(sub_header, sub_frames)
else:
# Notice, even though `msgpack_decode_default()` supports
# `__MsgpackList__`, we decode it here explicitly. This way
# we can delay the convertion to a regular `list` until it
# gets to a worker.
if "__MsgpackList__" in obj:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the type of obj here? Is in the right test here, or is this special value in a more specific place?

if deserialize:
return list(obj["as-tuple"])
else:
return MsgpackList(obj["as-tuple"])
if "__TaskGraphValue__" in obj:
if deserialize:
return obj["data"]
else:
return TaskGraphValue(obj["data"])
return msgpack_decode_default(obj)

return msgpack.loads(
Expand Down
Loading