-
-
Notifications
You must be signed in to change notification settings - Fork 719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Efficient Pandas serialization #614
Comments
I've spent a lot of time lately working on DataFrame serialization with as little overhead as possible. I'm happy to assist as best I can with development to support this use case and benchmarking to support the decision making. I recently did some rudimentary benchmarking on uncompressed numeric data, see slide 24 in http://www.slideshare.net/wesm/python-data-wrangling-preparing-for-the-future the benchmarking code (which I can clean up and publish if so desired) looks like: import pyarrow as pa
import pyarrow.io as io
import pyarrow.ipc as ipc
def write_arrow(df):
batch = pa.RecordBatch.from_pandas(df)
buf = BytesIO()
writer = ipc.ArrowFileWriter(buf, batch.schema)
writer.write_record_batch(batch)
writer.close()
return buf.getvalue()
def read_arrow(buf):
reader = ipc.ArrowFileReader(buf)
return [reader.get_record_batch(i).to_pandas()
for i in range(reader.num_record_batches)] Here, on the read path, this is all zero-copy up until converting to pandas.DataFrame, which introduces some overhead. This overhead could be reduced through the slightly messy process of deserializing directly into a 2D ndarray and then constructing pandas's internal BlockManager data structure. this will become much more favorable with pandas 2.0 (can be fully zero-copy if we use bitmaps for all null handling, see discussion in wesm/pandas2#46). For compressible data you can obviously add buffer-level compression -- see https://issues.apache.org/jira/browse/ARROW-300. So in other words, if you have an array with a null bitmap buffer and a contiguous buffer, these buffers would be compressed independently on going into the IPC blob. Dictionary encoding isn't yet implemented, but it's provided for in the IPC metadata. Maybe this is pie in the sky, but at some point it may be in dask's interests to adopt a language independent serialization protocol so there is the option to run task worker code written in C++, Julia, or other languages. |
Hi @wesm . Thanks for the read and write functions. These should be easy to benchmark and easy to incorporate as an optional dependency (hooray for Arrow being on conda-forge by the way). @jreback any interest in trying these out in the wild? You have some communication heavy computations. Zero copy sounds nice. So too does compression on different components. It'll be nice to try some workloads in the wild though to see how much impact these have. That might help us to prioritize or deprioritize them. I have not yet run into workloads that are limited by single memory copies. Network bandwidth is more commonly a larger issue for me, at least on Amazon's hardware.
It's decently possible and there is some slow momentum towards that direction. The custom serialization system is a step in that direction. There are other users clamoring for destination-specific serialization (e.g. gpu-gpu transfers) that would also benefit from this. Presumably you would say "OK, this data of type T has to move from worker A to B, what is the best serialization that we have for this case?" If there were a use case today the serialization work could happen in about a week. I put the cost of making a decent Julia/R/whatever worker/client at "a couple of weeks" by someone familiar with the networking stack in that language. See #586 |
sure, i'll give this a go. IIRC you are going to have this settable via though since on win-64.... will wait for @wesm to kindly make packages available for |
Ah, forgot about the Windows thing. You should wait to try this until I give you the go ahead. I'm fairly comfortable putting Arrow into an optional plugin after #606 gets merged. There will be a few of these for other formats as well. |
Initial timings comparing Pickle and Arrow. Corrections or counter-benchmarks welcome. Based on this very narrow benchmark custom serialization + blosc is winning out. In [1]: import pyarrow as pa
...: import pyarrow.io as io
...: import pyarrow.ipc as ipc
...:
...: def write_arrow(df):
...: batch = pa.RecordBatch.from_pandas(df)
...:
...: buf = BytesIO()
...: writer = ipc.ArrowFileWriter(buf, batch.schema)
...: writer.write_record_batch(batch)
...: writer.close()
...: return buf.getvalue()
...:
...:
...: def read_arrow(buf):
...: reader = ipc.ArrowFileReader(buf)
...: return [reader.get_record_batch(i).to_pandas()
...: for i in range(reader.num_record_batches)]
...:
In [2]: from io import BytesIO
In [3]: import string
In [4]: import numpy as np
In [5]: import pandas as pd
In [6]: n = 1000000
In [7]: df = pd.DataFrame({'x': np.random.randint(0, 10000, size=n, dtype='i4'), 'y': np.random.random(n),
...: 't': np.random.choice(list(string.ascii_lowercase), size=n)})
In [8]: import pickle
In [9]: %time len(write_arrow(df))
CPU times: user 120 ms, sys: 4 ms, total: 124 ms
Wall time: 121 ms
Out[9]: 17000630
In [10]: %time len(pickle.dumps(df))
CPU times: user 36 ms, sys: 8 ms, total: 44 ms
Wall time: 42.7 ms
Out[10]: 14003232
In [11]: %time len(write_arrow(df[['x']]))
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 3.4 ms
Out[11]: 4000334
In [12]: %time len(pickle.dumps(df[['x']]))
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 2.85 ms
Out[12]: 4000781
In [13]: import blosc
In [14]: x = df.x.values # There is an easier way to do this
In [15]: %time len(blosc.compress(x.data, typesize=x.dtype.itemsize, cname='lz4', clevel=5))
CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 3.72 ms
Out[15]: 2020787
In [16]: %time len(pickle.dumps(df[['t']]))
CPU times: user 44 ms, sys: 0 ns, total: 44 ms
Wall time: 45.5 ms
Out[16]: 2002878
In [17]: %time len(write_arrow(df[['t']]))
CPU times: user 96 ms, sys: 0 ns, total: 96 ms
Wall time: 95 ms
Out[17]: 5000406 |
How did you install pyarrow? I believe you are using a debug build (the conda-forge artifact is a debug build, I can fix this to allow you to do a more apples-to-apples comparison) |
I may have pip installed some thing in between
|
Yes, you're on a debug build of pyarrow. I'll let you know when there's a new build available built with optimizations turned on (this build is |
Digging into a memory leak bug (ARROW-362) but will get a release build up on conda-forge after that's fixed |
PyArrow release builds are up. You ideally should to disable multithreading in blosc to make the benchmarks comparable -- we could add multithreaded writes to Arrow, too, and get similar speedups
if you change the characteristics of the string data, you get different results:
there's some performance loss on the Arrow read path right now because we aren't constructing the precise pandas BlockManager. If we wanted to hyperoptimize for the pandas 1.x memory layout this would be a nice project for someone to tackle (multithreaded blockmanager read/write) note you can also go through arrow's native IO subsystem and avoid extra pybytes interactions -- this lets you interact with memory in C/C++ with zero copy
YMMV, worth double checking my work in case I made some mistakes |
OK, there is a trivial implementation using Arrow on the Dask side here: #643 Once questions there get resolved we should have enough in place to start doing more integrative benchmarks. In my mind there are two approaches, Blosc-and-custom-code or Arrow. Each has pros and cons. The Blosc solution can be implemented today and is likely to be near optimal in speed, at least for numeric data. The main cons here are that we need to write code around the block manager etc. and we need to maintain this going forward. We're likely to miss things like new dtypes, categorical index, etc.. The good news here is that we've already done a lot of this a couple times before and can probably steal ideas from previous implementations in systems like partd. The Arrow solution has two main things going for it:
However in the short term it is missing some things like per-column or dtype-specific compression, comprehensive support for the Pandas abstraction (see issues in #643, I'd love to be wrong about this) etc.. I would like to write up and tune some performance benchmarks for dask.dataframe sometime in the next month. Serialization and communication costs do contribute significantly in shuffling so this is likely to end up on my critical path soon-ish. |
It seems reasonable to have as many options available as possible (assuming the development burden is not too much), and as long as we have reproducible benchmarks to evaluate performance that can help drive performance engineering work. The Arrow<->Pandas conversion has a number of immaturities so it's not an turnkey solution at the moment -- it is an accessible codebase, though, so I invite others to get involved and I can help review patches and where help reach consensus w/ the other Arrow devs where design changes are needed (e.g. adding compression options). |
@mrocklin wholesale pickling of the frame
pickle in a dict-of-columns, about the same
but now can compress per-column
points of note:
|
I would like to avoid pickle if possible. It uses a couple of needless memory copies. Also if we don't compress (which happens sometimes) it's nice to be able to just pass along the memoryview directly. We could still do this when operating on columns (sliced memoryiews are just views). Why would compressing columns individually be better than compressing several at once? I think that most of the fast compressors are purely local. If I put up a minimal draft of this would you have time to review/suggest tests? |
This is a big assumption that only sometimes is True. IOW, you assume that 2 integer (or 2 float or whatever) are compressable just because they happen to be the same dtype. If they represent different data then they may or may not compress well as a 2-d array (not that actually know if blosc is smart about this, if it is, then what I am saying may be False!). I think its safer/better to simply compress single columns (e.g. this is what a column-major storage system does, e.g. redshift).
yes I don't think you actually need pickle at all. (assume that you only have strings for object dtypes, FYI, you can do a
sure |
@FrancescAlted can you shed some light? |
Question for @jreback: The hard part for me on this problem is to create a function that takes a dataframe and a column and produces a numpy array + some metadata without performing a copy? The metadata would be for timestamp, cateogorical columns, etc.. I suspect that it will take me a while to make a robust version of this function that is aware of all of the Pandas corner cases. |
so I'll point you to this: https://github.com/pandas-dev/pandas/blob/master/pandas/io/packers.py#L258 (see Actually the code that exists in msgpack is pretty robust for serializing/de-serialzing to bytes (which are then sent to msgpack), but you can almost copy these (again for testing). Though they do block-by-block (can easily be changed to do column-by-column instead). As a bonus, @llllllllll has done work to make the uncompression zero-copy (if possible, as its not always possible) |
@jreback Yes, my experience is also that the compression by columns normally brings the best results in large tables. The reason for that is that the shuffle (or bitshuffle) filter normally does a much better job at putting zeros together. When you have complex dtypes, shuffle can still do a good job, but it is slower (i.e. it does not use the SIMD instructions in CPUs), and in addition, Blosc has a limit (mainly for performance reasons) for applying the shuffle filter only for dtypes < 256 bytes. |
Regarding serializing/deserializing, have you guys ever tried Google Protocol Buffers? My experience is that they are really fast, specially when used in combination with streamed gRPC. The disadvantage is that both sides need to know the schema in advance, but perhaps this schema can be made flexible enough to transmit buffers of compressed chunks with the actual dtype encoded inside. |
@FrancescAlted the protobuf serialize/deserialize steps are fairly expensive. I have had a hard time breaking through the 500 MB/s range on in-memory serialized protobufs. By comparison, I've been able to get 10x or better performance through zero-copy in-memory columnar: http://wesmckinney.com/blog/arrow-streaming-columnar/ |
Oh wow, that's pretty cool :) |
If we add ZSTD/LZ4/Snappy compression steps, we can increase throughput on the wire depending on how fast our network is. I am more than happy to help if these tools are useful |
I started looking at Pandas serialization again recently. I'm curious about the near-term expected state of Arrow in the following regards:
@wesm do you have thoughts on either of these two issues? Is mostly-comprehensive coverage of the pandas dataframe abstraction in scope? What are your thoughts on optional per-column compression? Dask tends to compress a small sample of each frame to determine if it should or should not compress the entire dataset. |
Yes, comprehensive pandas coverage is definitely in scope. If folks here can't help with the development, you can surely help with requirements gathering and creating JIRAs. We can attach column metadata to indicate that a particular field is the index (we should generate some unique identifier to give the index "column" a unique name) On per-column compression: all-or-nothing would be simpler (on a per-batch basis), but if per-column has a lot of benefits, then we could discuss adding the appropriate metadata to support that. Implementing it for Python/C++ isn't especially difficult. |
I recently added a small set of tests here that I intend to increase to try to eventually define "comprehensive". We did this for NumPy and it was fairly effective at flushing out issues (or being a repository for new arrays as issues arose). I would be very happy to extend that or find a way to crowdsource it. For compression there are two topics:
For NumPy communication (which we've spent a bit more time optimizing) I've found that testing the effectiveness of compression on a small sample before compressing the whole has been useful. Short term Dask could also do this itself after Arrow passes data off. Given that we don't always want to compress I assume that per-column compression would be useful, but I don't have any concrete experience here. Currently we just pickle a dataframe, do the optional compression thing, and ship it down a wire, so it isn't hard to beat the current state. |
I think it would make sense to create something like: with we could even have optional dep on pyarrow (and handle fallbacks and such). |
@jreback is there a Pandas issue for this? This is the sort of thing that I would love to have in Pandas rather than in Dask. Also relevant links:
import io
import pandas as pd
import pyarrow as pa
df = pd.DataFrame({"a": [1, 2]})
batch = pa.RecordBatch.from_pandas(df)
sink = io.BytesIO()
writer = pa.StreamWriter(sink, batch.schema)
writer.write_batch(batch)
writer.close()
bytebuffer = sink.getvalue()
reader = pa.StreamReader(pa.BufferReader(bytebuffer))
batch_read = reader.get_next_batch()
df_read = batch_read.to_pandas() |
On in-memory data you will get slightly better performance (and no GIL issues -- with
|
I'm happy to add some convenience functions to Arrow. Opening some JIRAs now. Patches welcome also |
To be clear there are three opportunities for GIL issues:
Shuffle-like computations on numpy arrays are currently much faster than they are on Pandas dataframes, so there is definitely a fair bit of performance to squeeze out here. |
@mrocklin and of course happy to add a convenience function directly in pandas, now that |
@jreback I would advise adding such convenience functions in Arrow so we don't have to set up integration tests inside pandas just yet |
happy either way :> though I wouldn't find this a big burden on the current pandas code (nor testing). Literally this would be a direct wrapper to call a single arrow function. |
Yeah, totally. When ARROW-596/597 are done (let's do this month), they'll go out in the 0.3 release (this month). Probably will update conda artifacts before then |
I decided to pick this up again yesterday; sorry about the new JIRAs Wes :). My work is at master...TomAugspurger:arrow-serialization Heres a notebook with some benchmarks at http://nbviewer.jupyter.org/gist/TomAugspurger/ab01751275b8f5262dabc7fd07a0f19f/serialization.ipynb (scroll down for the plot) Arrow is pretty similar to pickle, other than An important is that pickle can use pandas' RangeIndex to avoid serializing the entire index if possible. With a All the tests pass, other than the I think using Arrow for serialization would be good to add as an option, and probably even the default in a future release.. |
No need to apologize for new issues, only way to make the software better! We should try to get an ASV setup rolling so that we can have microbenchmarks for all these cases to put some targets on the wall to optimize, then we can make sure that we don't have any perf regressions going forward. Maybe we can add these ASV benchmarks to the nightly runs on the pandas box at some point. I suspect that the perf difference in float64 data largely has to do with the null bitmap construction -- pickle writes the internal blocks unmodified. This can be partially mitigated through parallel processing when you have many columns (https://issues.apache.org/jira/browse/ARROW-1594) Not sure if you're using pickle with NumPy arrays anywhere, but if so I recommend you also take a look at using
Might seem crazy to beat pickle by 4x with an ndarray, but avoiding extra copies will do that |
I'm happy to do that. I'll also update the benchmarks to have versions with and without nulls. I believe the numpy code is also using pickle. I'll benchmark that as well. |
For NumPy we're actually aiming for zero-copy. We serialize the metadata and then just pass down the You might be interested in looking at our test suite for numpy arrays here: https://github.com/dask/distributed/blob/master/distributed/protocol/tests/test_numpy.py#L30-L65 A few corner cases have bit us in the past that are now recorded there. They might be of general value. |
The numpy code is not using pickle. |
Ah, good to know (I just looked at the fallback for object types). |
@mrocklin cool, that's helpful, thanks. I opened https://issues.apache.org/jira/browse/ARROW-1596 so that we can expand the test suite based on this Internally |
Adding a couple of thoughts here: First if Pandas objects support pickle protocol 5, which it sounds like they may already do ( pandas-dev/pandas#34244 ), and Dask supports pickle protocol 5, which PR ( #3784 ) implements, then it should be possible to get efficient serialization of Pandas objects just using pickle protocol 5. Second if Pandas objects simply consist of other objects that are already serializable with Dask (like NumPy arrays), then it should be possible to traverse objects and serialize everything we can. In fact this is how we added SciPy sparse matrix serialization recently. cc @TomAugspurger (for awareness) |
FYI this is now in Distributed 2.17.0+. Would be interesting to see what sort of mileage people get out of this 🙂 |
As of Dask + Distributed 2.21.0, we now support pickle protocol 5 on older versions of Python with the In [1]: import numpy
...: import pandas
...:
...: from distributed.protocol import serialize, deserialize
In [2]: df = pandas.DataFrame({
...: "a": [1, 2, 3],
...: "b": [0.0, 0.7, 1.3]
...: })
...: df
Out[2]:
a b
0 1 0.0
1 2 0.7
2 3 1.3
In [3]: header, frames = serialize(df)
In [4]: list(map(numpy.asarray, frames[1:]))
Out[4]: [array([[0. , 0.7, 1.3]]), array([[1, 2, 3]])]
In [5]: df2 = deserialize(header, frames); df2
Out[5]:
a b
0 1 0.0
1 2 0.7
2 3 1.3 |
Closing as this seems to be resolved with pickle protocol 5, but please feel free to reopen if I've missed something. |
We need an efficient way to serialize Pandas Dataframes. As of #606 we can now customize this beyond just pickle. There are a number of different data regimes here including pure numeric data, very compressible numeric data like time series, text data with repeats, categoricals, long text data etc.. We care both about fast encoding and about fast compression for larger results. We want something that has minimal overhead on small dataframes (important for shuffling).
Several options come to mind:
@jreback @wesm @shoyer
It would be good to make this decision with a benchmark in hand. It would be good both to get people's opinions on solutions we should consider as well as some benchmarks that are representative of data that they care about.
The text was updated successfully, but these errors were encountered: