Skip to content

Commit

Permalink
Fix un-merged frames (dask#4666)
Browse files Browse the repository at this point in the history
* Add test for un-merged frames

* Don't double-split/compress Serialized frames

Previously we would re-serialize an object (splitting and compressing its
frames a second time) even if it was already a Serialized object. Instead,
we now just unpack its existing header and frames and are done.

* Specify the "num-sub-frames" header entry in all cases
  • Loading branch information
mrocklin authored and douglasdavis committed Apr 2, 2021
1 parent cf492ed commit 072756d
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 5 deletions.
11 changes: 7 additions & 4 deletions distributed/protocol/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,14 @@ def _encode_default(obj):
if typ is Serialize:
obj = obj.data
offset = len(frames)
sub_header, sub_frames = serialize_and_split(
obj, serializers=serializers, on_error=on_error, context=context
)
if typ is Serialized:
sub_header, sub_frames = obj.header, obj.frames
else:
sub_header, sub_frames = serialize_and_split(
obj, serializers=serializers, on_error=on_error, context=context
)
_inplace_compress_frames(sub_header, sub_frames)
sub_header["num-sub-frames"] = len(sub_frames)
_inplace_compress_frames(sub_header, sub_frames)
frames.append(
msgpack.dumps(
sub_header, default=msgpack_encode_default, use_bin_type=True
Expand Down
16 changes: 15 additions & 1 deletion distributed/protocol/tests/test_serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from dask.utils_test import inc

from distributed import wait
from distributed import Nanny, wait
from distributed.comm.utils import from_frames, to_frames
from distributed.protocol import (
Serialize,
Expand Down Expand Up @@ -499,3 +499,17 @@ def test_ser_memoryview_object():
data_in = memoryview(np.array(["hello"], dtype=object))
with pytest.raises(TypeError):
serialize(data_in, on_error="raise")


@gen_cluster(client=True, Worker=Nanny)
async def test_large_pickled_object(c, s, a, b):
    """Move a large, locally defined object from worker ``a`` to worker ``b``.

    An instance of a class defined inside this test, holding a
    100_000_000-element ``u1`` NumPy array, is scattered to ``a`` and then
    passed to an identity task restricted to ``b``. The test passes when
    awaiting that task completes without raising.

    Skipped when NumPy is not installed.
    """
    np = pytest.importorskip("numpy")

    class Data:
        # Plain Python wrapper around the array; it is defined locally and
        # given no custom serialization hooks here.
        def __init__(self, n):
            self.data = np.empty(n, dtype="u1")

    payload = Data(100_000_000)

    # Place the object on the first worker only.
    on_a = [a.worker_address]
    scattered = await c.scatter(payload, workers=on_a)

    # Run an identity task on the other worker, using the scattered future
    # as its argument.
    on_b = [b.worker_address]
    echoed = c.submit(lambda x: x, scattered, workers=on_b)
    await echoed

0 comments on commit 072756d

Please sign in to comment.