Single pass serialization #4699
Conversation
@jrbourbeau @mrocklin @jakirkham, this is ready for the first round of reviews :)
try:
    with cache_dumps_lock:
        result = cache_dumps[func]
except KeyError:
I still don't have a good feeling for what timescales we're currently optimizing for, or whether this is a particularly performance-critical section, so this comment might be irrelevant. However, exception handling is relatively expensive, and if we encounter a lot of cache misses an `in` check should be faster. That's ns-level optimization; I could imagine the pickling is usually an order of magnitude slower and this doesn't matter at all.
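For illustration, a minimal sketch of the membership-test variant being suggested (a plain dict stands in for the LRU here, and the locking layout is only indicative):

```python
import pickle
import threading

cache_dumps = {}  # stands in for LRU(maxsize=100)
cache_dumps_lock = threading.Lock()

def dumps_function(func):
    # Membership test: cheap for both hits and misses, whereas the
    # try/except version pays for raising/catching KeyError on a miss.
    with cache_dumps_lock:
        if func in cache_dumps:
            return cache_dumps[func]
    result = pickle.dumps(func, protocol=4)
    with cache_dumps_lock:
        cache_dumps[func] = result
    return result
```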
    with cache_dumps_lock:
        result = cache_dumps[func]
except KeyError:
    result = pickle.dumps(func, protocol=4)
Any reason why `protocol=4` is hard-coded?
I'm also curious about this
I am curious too :)
This is taken directly from distributed/worker.py, line 3527 at fa5d993:

result = pickle.dumps(func, protocol=4)
@jakirkham do you know?
That line comes from @mrocklin's PR ( #4019 ), which allowed connections to dynamically determine what compression and pickle protocols are supported and then use them in communication. In a few places I think Matt found it easier to simply force pickle protocol 4 than allow it to be configurable. So if this is coming from that worker code, that is the history
        result = cache_dumps[func]
except KeyError:
    result = pickle.dumps(func, protocol=4)
    if len(result) < 100000:
I think we can be a bit more generous with the cache size. Currently we're capped at 100 (LRU maxsize) * 100_000 B (result) ≈ 10 MB. Considering how much stuff we're logging without taking size into account too much, I would suggest being more generous with this upper limit, since large results are the juicy cache hits.
Related: in `loads_function`, what if we used `hash(bytes_object)` as the key instead of `bytes_object` itself? Then we wouldn't have to hang onto references to those large bytestrings that we won't look at again.
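A rough sketch of that suggestion (the wiring is hypothetical; note that `hash()` can collide, so a real implementation might prefer a content digest):

```python
import pickle
import threading

cache_loads = {}  # keyed on hash(bytes) rather than the bytes themselves
cache_loads_lock = threading.Lock()

def loads_function(bytes_object):
    # Keying on the hash means the cache never holds a reference to the
    # (potentially large) bytestring, so it can be garbage collected.
    key = hash(bytes_object)
    with cache_loads_lock:
        if key in cache_loads:
            return cache_loads[key]
    result = pickle.loads(bytes_object)
    with cache_loads_lock:
        cache_loads[key] = result
    return result
```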
It sounds like that was just copied and moved over from distributed/worker.py, line 3528 at fa5d993:

if len(result) < 100000:

Perhaps we can make a new issue and revisit?
> Perhaps we can make a new issue and revisit?
My plan is to remove `worker.dumps_function()` completely; it shouldn't be necessary to call it explicitly.
Ah, OK. In that case I don't think the `protocol=4` bit above will be needed.
def loads_function(bytes_object):
    """ Load a function from bytes, cache bytes """
    if len(bytes_object) < 100000:
Personal preference: I would put the size limit of the cache into a constant so that the two functions don't drift apart.
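For example (the constant name is made up for illustration):

```python
import pickle

# Single shared threshold so dumps_function and loads_function cannot drift.
FUNC_CACHE_BYTE_LIMIT = 100_000

cache_dumps = {}
cache_loads = {}

def dumps_function(func):
    result = pickle.dumps(func, protocol=4)
    if len(result) < FUNC_CACHE_BYTE_LIMIT:
        cache_dumps[func] = result
    return result

def loads_function(bytes_object):
    if len(bytes_object) < FUNC_CACHE_BYTE_LIMIT and bytes_object in cache_loads:
        return cache_loads[bytes_object]
    result = pickle.loads(bytes_object)
    if len(bytes_object) < FUNC_CACHE_BYTE_LIMIT:
        cache_loads[bytes_object] = result
    return result
```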
@fjetter good points! When we settle on an overall design I will incorporate your suggestions. Right now I am waiting on @jrbourbeau @mrocklin to review the overall design before continuing :)
This does seem a bit cleaner and simpler to me, without being a fundamental change, which is nice. I haven't thought more carefully about the implications yet though.
@@ -20,6 +21,8 @@
 dask_deserialize = dask.utils.Dispatch("dask_deserialize")

+_cached_allowed_modules = {}
+non_list_collection_types = (tuple, set, frozenset)
Are `set` and `frozenset` necessary here, since they can't contain lists or dicts, even recursively within tuples?
>>> {([])}
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: unhashable type: 'list'
and (
    "pickle" not in serializers
    or serializers.index("pickle") > serializers.index("msgpack")
)
Do we still care about whether pickle is used or not, now that we have `msgpack_persist_lists`?
Related: what happens if a `MsgpackList` gets pickled? Won't it be passed on (in a task, say) as a `MsgpackList`, not a plain `list`? Whereas `msgpack_decode_default` returns them as plain lists.
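To illustrate the second question, a self-contained sketch using a minimal stand-in for the PR's `MsgpackList` (not the actual class):

```python
import pickle

class MsgpackList:
    # Minimal stand-in for the PR's wrapper type, for illustration only.
    def __init__(self, data):
        self.data = data

wrapped = MsgpackList([1, 2, 3])

# A pickle round-trip preserves the wrapper type...
assert type(pickle.loads(pickle.dumps(wrapped))) is MsgpackList

# ...whereas the msgpack path decodes back to a plain list, so downstream
# code could observe different types depending on which codec was used.
```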
return {"__Set__": True, "as-tuple": tuple(obj)} | ||
|
||
if typ is MsgpackList: | ||
return {"__MsgpackList__": True, "as-tuple": tuple(obj.data)} |
return {"__MsgpackList__": True, "as-tuple": tuple(obj.data)} | |
return {"__MsgpackList__": True, "as-tuple": obj.data} |
What would happen if we did this instead? obj.data
should already be a list, so I'm wondering if the extra copy to a tuple is necessary.
In general things here seem ok. There are issues around passing through the list of serializers. We need to make sure that we can turn pickle off.
# With <https://github.com/dask/distributed/pull/4699>,
# deserialization is done as part of communication.
@jrbourbeau I think that you might want to be aware of this change
if typ in (Serialized, SerializedCallable):
    sub_header, sub_frames = obj.header, obj.frames
elif callable(obj):
    sub_header, sub_frames = {"callable": dumps_function(obj)}, []
Functions can be quite large sometimes, for example if users close over large variables outside the function's scope. Msgpack may not handle this well in some cases:

import numpy as np

x = np.arange(1000000000)

def f(y):
    return y + x.sum()

Obviously users shouldn't do this, but they will.
It looks like we're bypassing the list of serializers here. This lets users get past configurations that specifically turn off pickle.
sub_header, sub_frames = serialize_and_split(
    obj, serializers=serializers, on_error=on_error, context=context
)
_inplace_compress_frames(sub_header, sub_frames)
The inplace stuff always makes me uncomfortable. Thoughts on making new header/frames dict/lists here instead?
For reference, it was these sorts of inplace operations that previously caused us to run into the msgpack tuple vs list difference. I think that avoiding them when we can is useful, unless there is a large performance boost (which I wouldn't expect here).
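A sketch of a copy-based alternative, assuming `maybe_compress` from `distributed.protocol.compression` returns a `(compression, payload)` pair; the function name and header key here are illustrative:

```python
from distributed.protocol.compression import maybe_compress

def compress_frames(header, frames):
    # Build fresh outputs instead of mutating the caller's header/frames.
    compression, out_frames = [], []
    for frame in frames:
        c, f = maybe_compress(frame)
        compression.append(c)
        out_frames.append(f)
    new_header = dict(header)
    new_header["compression"] = tuple(compression)
    return new_header, out_frames
```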
if deserialize == "delay-exception":
    return DelayedExceptionRaise(e)
I am confused about when this is necessary and why it wasn't needed before. I'm wary of creating new systems like this if we can avoid it.
I think I understand this now that I've seen the `c.submit(identity, Foo())` test below.
# `__MsgpackList__`, we decode it here explicitly. This way
# we can delay the conversion to a regular `list` until it
# gets to a worker.
if "__MsgpackList__" in obj:
What is the type of `obj` here? Is `in` the right test, or is this special value in a more specific place?
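For illustration, a more defensive version of that check (the helper name is hypothetical):

```python
def maybe_decode_msgpack_list(obj):
    # Only dicts produced by the encode hook carry this sentinel key, so
    # guard on the type before using `in` (which would also accept e.g. sets).
    if isinstance(obj, dict) and "__MsgpackList__" in obj:
        return list(obj["as-tuple"])
    return obj
```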
header, frames = serialize([[[x]]])
assert "dask" in str(header)
assert len(frames) == 1
assert x.data == np.frombuffer(frames[0]).data
I'm curious, why did we drop this test?
@@ -4628,8 +4624,6 @@ async def test_recreate_error_futures(c, s, a, b):

     function, args, kwargs = await c._recreate_error_locally(f)
-    assert f.status == "error"
-    assert function.__name__ == "div"
     assert args == (1, 0)
I'm curious, what happened here?
distributed/tests/test_client.py (outdated)
assert results == list(map(inc, range(10)))
assert a.data and b.data
assert results == list(map(inc, range(10)))
assert a.data and b.data
Hrm, you mentioned this in a meeting a couple of weeks ago. I see now how this is unfortunate.
I would expect this test to now be written as:

with pytest.raises(CancelledError):
    await c.submit(identity, Foo())

I wouldn't expect the other lines here to be indented. In general, seeing assert statements under a `raises` context manager is a sign that something is unclean :)
I have changed it to make it clearer what is going on:

# Notice, because serialization is delayed until `distributed.batched`
# we don't get an exception immediately. The exception is raised and logged
# when the ongoing communication between the client and the scheduler
# encounters the `Foo` class. Before <https://github.com/dask/distributed/pull/4699>
# the serialization happened immediately in `submit()`, which would raise
# `MyException`.
with captured_logger("distributed") as caplog:
    future = c.submit(identity, Foo())
    # We sleep to make sure that a `BatchedSend.interval` has passed.
    await asyncio.sleep(c.scheduler_comm.interval)
# Check that the serialization error was logged
assert "Failed to serialize" in caplog.getvalue()

I'm curious, what do you think of the approach? We cannot easily catch the exception because it happens as part of the ongoing communication and not in the `submit()` call, but at least we log the exception.
distributed/tests/test_client.py (outdated)
-with pytest.raises(TypeError):
-    await c.run_on_scheduler(lambda: inc)
+await c.run(lambda: inc)
+await c.run_on_scheduler(lambda: inc)
If the user has specified that they don't want to allow serialization with pickle then these should continue to fail. Probably we need to feed the list of serializers down wherever serialize is being called. I expect that this might be awkward to do when going through msgpack machinery. Maybe there is some global that we can misuse?
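One shape such a "global" could take is a context variable; the names and wiring below are hypothetical, not from the PR:

```python
import contextvars
import pickle

# Set by the comm layer for the duration of one message, readable by hooks
# called deep inside the msgpack machinery.
allowed_serializers = contextvars.ContextVar(
    "allowed_serializers", default=("dask", "pickle", "msgpack")
)

def encode_callable(obj):
    if "pickle" not in allowed_serializers.get():
        raise TypeError(f"cannot serialize {obj!r}: pickle is disabled")
    return pickle.dumps(obj, protocol=4)

# Usage: disable pickle for one message, then restore.
token = allowed_serializers.set(("msgpack",))
try:
    encode_callable(len)  # raises TypeError: pickle is disabled
except TypeError:
    pass
finally:
    allowed_serializers.reset(token)
```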
distributed/tests/test_core.py (outdated)
@@ -771,8 +771,7 @@ async def f():
     await server.listen("tcp://")

     async with rpc(server.address, serializers=["msgpack"]) as r:
-        with pytest.raises(TypeError):
-            await r.echo(x=to_serialize(inc))
+        await r.echo(x=to_serialize(inc))
These sorts of changes are probably not ok. They fundamentally change the intent of the test, which is to ensure that things like this can be disallowed.
This has been fixed.
cache_dumps = LRU(maxsize=100)
cache_loads = LRU(maxsize=100)
cache_dumps_lock = threading.Lock()
cache_loads_lock = threading.Lock()
Since we are discussing getting rid of `dumps_function`, will these still be needed or will they go away as well?
This PR streamlines serialization in Distributed by relying on msgpack, falling back to the `serialize()`/`deserialize()` infrastructure only when encountering objects that msgpack does not support.

- Passes `black distributed` / `flake8 distributed` / `isort distributed`