Skip to content

Commit

Permalink
Add custom serialization support for pyarrow
Browse files Browse the repository at this point in the history
Closes dask#2103
  • Loading branch information
dhirschf committed Jul 13, 2018
1 parent c0f842d commit 5bac442
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 0 deletions.
5 changes: 5 additions & 0 deletions distributed/protocol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,8 @@ def _register_keras():
@partial(register_serialization_lazy, "sparse")
def _register_sparse():
from . import sparse


@partial(register_serialization_lazy, "arrow")
def _register_arrow():
from . import arrow
53 changes: 53 additions & 0 deletions distributed/protocol/arrow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from __future__ import print_function, division, absolute_import

from .serialize import register_serialization


def serialize_batch(batch):
import pyarrow as pa
sink = pa.BufferOutputStream()
writer = pa.RecordBatchStreamWriter(sink, batch.schema)
writer.write_batch(batch)
writer.close()
buf = sink.get_result()
header = {}
frames = [buf.to_pybytes()]
return header, frames


def deserialize_batch(header, frames):
import pyarrow as pa
blob = frames[0]
reader = pa.RecordBatchStreamReader(pa.BufferReader(blob))
return reader.read_next_batch()


def serialize_table(tbl):
import pyarrow as pa
sink = pa.BufferOutputStream()
writer = pa.RecordBatchStreamWriter(sink, tbl.schema)
writer.write_table(tbl)
writer.close()
buf = sink.get_result()
header = {}
frames = [buf.to_pybytes()]
return header, frames


def deserialize_table(header, frames):
import pyarrow as pa
blob = frames[0]
reader = pa.RecordBatchStreamReader(pa.BufferReader(blob))
return reader.read_all()


register_serialization(
'pyarrow.lib.RecordBatch',
serialize_batch,
deserialize_batch
)
register_serialization(
'pyarrow.lib.Table',
serialize_batch,
deserialize_table
)

0 comments on commit 5bac442

Please sign in to comment.