
[REVIEW] Allow writing parquet to python file like objects #5061

Merged · 7 commits merged into rapidsai:branch-0.14 on May 4, 2020

Conversation

benfred (Member) commented Apr 30, 2020

Add code that allows writing parquet files to python file-like objects. This
lets you write cudf dataframes as parquet to BytesIO or SocketIO objects,
without having to write out to disk first.
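For illustration (not part of the PR itself), a minimal round-trip might look like the sketch below; the DataFrame contents are placeholders:

import io

import cudf

# Minimal sketch of the new behaviour: write a cudf DataFrame to an in-memory
# buffer instead of a path, then read it back. The example frame is a placeholder.
gdf = cudf.DataFrame({"a": [1, 2, 3], "b": [0.1, 0.2, 0.3]})

buffer = io.BytesIO()
gdf.to_parquet(buffer)      # file-like object instead of a filename
buffer.seek(0)              # rewind in case the reader consumes from the current position
roundtripped = cudf.read_parquet(buffer)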

@benfred requested a review from a team as a code owner on April 30, 2020 19:34
GPUtester (Collaborator)

Please update the changelog in order to start CI tests.


codecov bot commented May 1, 2020

Codecov Report

Merging #5061 into branch-0.14 will decrease coverage by 0.04%.
The diff coverage is n/a.


@@               Coverage Diff               @@
##           branch-0.14    #5061      +/-   ##
===============================================
- Coverage        88.50%   88.45%   -0.05%     
===============================================
  Files               54       54              
  Lines            10270    10270              
===============================================
- Hits              9089     9084       -5     
- Misses            1181     1186       +5     
Impacted Files                             Coverage Δ
python/cudf/cudf/core/buffer.py            82.92% <0.00%> (-3.66%) ⬇️
python/cudf/cudf/core/column/column.py     86.27% <0.00%> (-0.27%) ⬇️

Δ = absolute <relative> (impact), ø = not affected, ? = missing data

benfred (Member, Author) commented May 1, 2020

We've noticed that reading/writing parquet files is really fast with cudf, and this change lets you spill to host memory as parquet, without hitting disk, by writing to a python BytesIO object. It also lets you write directly to the network using SocketIO etc.

On a small test this conversion is almost twice as fast as converting to/from pandas:

In [4]: def convert_to_from_pandas(gdf):
   ...:     pdf = gdf.to_pandas()
   ...:     cudf.from_pandas(pdf)
   ...:

In [5]: def convert_to_from_parquet(gdf):
   ...:     buffer = io.BytesIO()
   ...:     gdf.to_parquet(buffer)
   ...:     cudf.read_parquet(buffer)
   ...:

In [6]: time convert_to_from_pandas(gdf)
CPU times: user 2.04 s, sys: 572 ms, total: 2.62 s
Wall time: 2.62 s

In [7]: time convert_to_from_parquet(gdf)
CPU times: user 753 ms, sys: 600 ms, total: 1.35 s
Wall time: 1.35 s

@benfred changed the title from "[WIP] Allow writing parquet to python file like objects" to "[REVIEW] Allow writing parquet to python file like objects" on May 1, 2020
kkraus14 (Collaborator) left a comment


I have concerns about tightly coupling this to io.IOBase here. It makes a write first generate a host buffer on the C++ side and then copy it into the io.IOBase.

From my perspective it would be better to generate an io.BytesIO object sized up front, and then write directly into the buffer underneath it via the getbuffer() API.
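Roughly, in pure Python the suggestion could look like the sketch below (nbytes is a hypothetical upper bound on the encoded output; the follow-up comments discuss why sizing a BytesIO up front is awkward in practice):

import io

nbytes = 1024                 # hypothetical upper bound on the encoded parquet size
buf = io.BytesIO()
buf.seek(nbytes - 1)
buf.write(b"\0")              # grow the internal buffer to nbytes
view = buf.getbuffer()        # zero-copy, writable memoryview over the BytesIO's buffer
view[0:4] = b"PAR1"           # a writer could fill `view` directly like this
del view                      # release the view before the BytesIO is resized again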

python/cudf/cudf/_lib/io/utils.pxd (review thread, resolved)
python/cudf/cudf/_lib/io/utils.pyx (review thread, outdated, resolved)
benfred (Member, Author) left a comment


Thanks for the review!

Having the buffer pre-allocated so the writer can write directly into that memory without any intermediate copies is a cool idea - it would remove a memcpy.

There are some problems with this though, and I'm not sure it will work with a BytesIO object. The big problem is that you can't pre-allocate buffer space on a BytesIO; the best we could do is create one from an 'initial_bytes' bytes array afaict, which BytesIO will then copy, negating any benefit - and allocating that byte array just to copy it means doubling the allocations. It's also hard to figure out how many bytes we need to allocate beforehand, especially given the compression options in the parquet writer.

We might be better off using the ‘HOST_BUFFER’ sink type, which writes to a std::vector<char>. We could expose this as an opaque type to Python without too much effort. I can add this if you and Olivier think it's the way to go.

I think there are some benefits to this IOBase implementation that outweigh the downside of an extra memcpy though: it more closely matches the to_parquet implementation in pandas, which supports writing to any file-like object, and it lets you do things like write directly to a python SocketIO or File. For example, this works (and is basically the same speed as passing the filename instead of a file object):

with open("output.parquet", "wb") as o:
    gdf.to_parquet(o)
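Writing straight to a socket would look similar; a hypothetical sketch (HOST and PORT are placeholders for a receiver that expects raw parquet bytes, and gdf is the dataframe from the example above):

import socket

HOST, PORT = "127.0.0.1", 9000
with socket.create_connection((HOST, PORT)) as sock:
    with sock.makefile("wb") as f:   # buffered file-like wrapper (SocketIO underneath)
        gdf.to_parquet(f)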

python/cudf/cudf/_lib/io/utils.pyx Outdated Show resolved Hide resolved
kkraus14 (Collaborator) commented May 1, 2020

We might be better off using the ‘HOST_BUFFER’ sink type, which writes to a std::vector<char>. We could expose this as an opaque type to Python without too much effort. I can add this if you and Olivier think it's the way to go.

I think this would likely be the highest performance and sets us up to do things more asynchronously moving forward. Additionally, we could likely wrap the returned buffer, zero-copy, in a class that inherits from IOBase to still make it a file-like object. What's also interesting about this approach is that it gives us more fine-grained control over synchronization: we only need to synchronize when non-CUDA-aware methods that don't respect stream ordering are used against it.
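A minimal sketch of what such a zero-copy wrapper might look like, assuming host_buffer stands in for whatever object the HOST_BUFFER sink would expose to Python (anything supporting the buffer protocol):

import io

class HostBufferReader(io.RawIOBase):
    """Read-only, zero-copy file-like view over an existing host buffer."""

    def __init__(self, host_buffer):
        # memoryview avoids copying the underlying bytes
        self._view = memoryview(host_buffer)
        self._pos = 0

    def readable(self):
        return True

    def readinto(self, b):
        # Copy the next chunk into the caller-provided buffer and advance
        n = min(len(b), len(self._view) - self._pos)
        b[:n] = self._view[self._pos:self._pos + n]
        self._pos += n
        return n

Something like io.BufferedReader(HostBufferReader(host_buffer)) could then be handed to any consumer expecting a file-like object - purely an illustration, not a proposed API.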

I think there are some benefits to this IOBase implementation that outweigh the downside of an extra memcpy though: it more closely matches the to_parquet implementation in pandas, which supports writing to any file-like object, and it lets you do things like write directly to a python SocketIO or File.

Agreed. We should keep this to allow people to write to an arbitrary file-like object that they provide.

@kkraus14 added the "0 - Waiting on Author (Waiting for author to respond to review)" and "Python (Affects Python cuDF API)" labels on May 1, 2020
@kkraus14 added "3 - Ready for Review (Ready for review by team)" and "4 - Needs cuDF (Python) Reviewer" and removed "0 - Waiting on Author" on May 1, 2020
@kkraus14 added "5 - Ready to Merge (Testing and reviews complete, ready to merge)" and removed "3 - Ready for Review" and "4 - Needs cuDF (Python) Reviewer" on May 1, 2020
rjzamora (Member) commented May 3, 2020

This is great @benfred - Thank you for all the work here!

@kkraus14 @madsbk - Note that the motivation for both this and #4870 (both by @benfred) is larger-than-memory shuffling on a single GPU. I can imagine these features being used in combination with Mads' explicit comms to dramatically reduce the size of the task graph needed for shuffling in dask_cudf (discussion here for those unfamiliar with the problem). The algorithm is not likely to get upstream dask/distributed support (since it is explicit-comms based, and resiliency would be a problem), but it could be useful in some cases. My idea goes something like this (a rough sketch of the caching steps follows the list):

  • The primary API would take in a dask_cudf.DataFrame (call it a) with npartitions partitions
  • Since explicit communication requires that the input ddf be persisted in memory, we would “iterate” through a in chunks that can fit comfortably in global worker device memory. For example, each iteration might correspond to n partitions each. For each iteration i, we simply define a new ddf (b_i), comprising a subset of n partitions.
  • For iteration i, we perform an explicit shuffle (call it _shuffle_to_pqcache) on b_i (corresponding to a single “super” task). The _shuffle_to_pqcache logic will split each partition into npartitions shards and send each shard to the worker responsible for it (using simple modulus mapping). After exchanging shards, each worker can create or append to a separate cached parquet file (in host memory) for each output partition that worker is responsible for. The actual output of the _shuffle_to_pqcache task can be a simple boolean confirmation that the exchange and caching are done.
  • After all iterations of _shuffle_to_pqcache are completed, we would need some kind of follow-up task(s) (called something like _worker_pqcache_to_ddf) to convert the in-memory parquet files, on each worker, into the new output partitions. I have not thought through this particular step much yet, so I apologize if I am failing to think of an obvious roadblock here.
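A rough, hypothetical sketch of those last two steps (the names _cache_shard, _pqcache_to_partition and the plain dict cache are illustrative only, not proposed API):

import io

import cudf

# Hypothetical per-worker cache: output partition id -> list of parquet blobs
# held in host memory (each blob is one shard serialized with to_parquet).
_pq_cache = {}

def _cache_shard(out_id, shard):
    # Serialize one shard to host memory using the file-like support from this PR
    buf = io.BytesIO()
    shard.to_parquet(buf)
    _pq_cache.setdefault(out_id, []).append(buf.getvalue())

def _pqcache_to_partition(out_id):
    # Rebuild one output partition from all of its cached shards
    frames = [cudf.read_parquet(io.BytesIO(blob)) for blob in _pq_cache.pop(out_id)]
    return cudf.concat(frames)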

@madsbk - Since you are most familiar with the nuts and bolts of explicit communication, do you think this general approach has any promise?

benfred (Member, Author) commented May 4, 2020

I think this would likely be the highest performance and sets us up to do things more asynchronously moving forward. Additionally, we could likely wrap the returned buffer, zero-copy, in a class that inherits from IOBase to still make it a file-like object. What's also interesting about this approach is that it gives us more fine-grained control over synchronization: we only need to synchronize when non-CUDA-aware methods that don't respect stream ordering are used against it.

That sounds like a good plan to me - I'll submit a new PR for this later today.

@kkraus14 merged commit 85ce695 into rapidsai:branch-0.14 on May 4, 2020
Labels: 5 - Ready to Merge (Testing and reviews complete, ready to merge), Python (Affects Python cuDF API)