[REVIEW] Expose the HOST_BUFFER parquet sink to python #5094
Expose the HOST_BUFFER sink type to python for parquet files. (rapidsai#5061 (comment))
Please update the changelog in order to start CI tests. View the gpuCI docs here.
Codecov Report
@@ Coverage Diff @@
## branch-0.15 #5094 +/- ##
==============================================
Coverage ? 88.44%
==============================================
Files ? 54
Lines ? 10267
Branches ? 0
==============================================
Hits ? 9081
Misses ? 1186
Partials ? 0
Continue to review full report at Codecov.
@@ -13,3 +14,5 @@
    read_parquet_metadata,
    write_to_dataset,
)

HostBuffer = cudf._lib.io.utils.HostBuffer
Could we just do `from cudf._lib.io.utils import HostBuffer`?
I could go with `from cudf._lib.io.utils import HostBuffer  # noqa`, but I think we'd need the noqa tag; otherwise flake8 will complain about unused imports. (I was trying to be consistent here with #4870 (comment) =)
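For readers comparing the two re-export styles discussed here, a minimal sketch follows. It uses `collections.OrderedDict` purely as a stand-in for `cudf._lib.io.utils.HostBuffer`, and the names `Alias` and `DirectImport` are made up for illustration; this is not the code from the PR.

```python
import collections
# Direct import kept only so the name can be re-exported. Without the
# marker, flake8 would report F401 (imported but unused) for a module
# that never references the name itself.
from collections import OrderedDict as DirectImport  # noqa: F401

# Attribute assignment: the module is used by the assignment, so there
# is no unused import for flake8 to flag.
Alias = collections.OrderedDict
```

Both names end up bound to the same class, so the choice is mostly about lint noise and consistency with the rest of the package.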
def test_parquet_writer_host_buffer(tmpdir, simple_gdf):
    buffer = cudf.io.HostBuffer()
    simple_gdf.to_parquet(buffer)

    assert_eq(cudf.read_parquet(buffer), simple_gdf)

    gdf_fname = tmpdir.join("gdf.parquet")
    with open(gdf_fname, "wb") as o:
        o.write(buffer.read())
    assert_eq(cudf.read_parquet(gdf_fname), simple_gdf)
Could we also add a test that reserves the size of the `HostBuffer()` up front, just to make sure that doesn't break things? :)
    # Write HostBuffer to disk
    shutil.copyfileobj(buffer, open("output.parquet", "wb"))
    """
    cdef vector[char] buf
Just a note: RMM will have the ability to allocate pinned host memory in the future which will be faster for moving device <--> host as well as allow things to remain asynchronous.
        if initial_capacity:
            self.buf.reserve(initial_capacity)
No need to check it here; passing 0 should have no effect. Suggested change:
-        if initial_capacity:
-            self.buf.reserve(initial_capacity)
+        self.buf.reserve(initial_capacity)

    def read(self, int n=-1):
        if self.pos >= self.buf.size():
            return b""
It's a little bit awkward that this returns an empty bytes object while below returns a MemoryView. Can we return an empty memoryview?
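To illustrate the suggestion, here is a minimal pure-Python sketch of a reader whose `read()` returns a `memoryview` on every path, including at end of data. `ByteReader` is a hypothetical stand-in backed by a `bytearray`, not the Cython class from this PR.

```python
import io
import shutil


class ByteReader:
    """Hypothetical stand-in for the reader under review."""

    def __init__(self, data: bytes):
        self._buf = bytearray(data)
        self._pos = 0

    def read(self, n: int = -1) -> memoryview:
        remaining = len(self._buf) - self._pos
        if remaining <= 0:
            # Same return type as the non-empty path below.
            return memoryview(b"")
        count = remaining if n < 0 else min(n, remaining)
        start = self._pos
        self._pos += count
        return memoryview(self._buf)[start:start + count]


reader = ByteReader(b"PAR1 example payload PAR1")
out = io.BytesIO()
# copyfileobj stops on the first falsy chunk; an empty memoryview is falsy.
shutil.copyfileobj(reader, out, length=8)
final_read = reader.read()
```

Because an empty `memoryview` has length zero, loops such as the one inside `shutil.copyfileobj` terminate the same way they would with `b""`.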
@@ -67,3 +72,61 @@ cdef cppclass iobase_data_sink(data_sink):

    size_t bytes_written() with gil:
        return buf.tell()


cdef class HostBuffer:
Do we want to consider exposing the buffer protocol for this class?
Should we use this name here? Thought this was going to be reserved for the pinned memory allocation made by RMM (once that arrives). ( rapidsai/rmm#260 )
Ah, good call. Yes, we shouldn't use `HostBuffer` here.
        self.pos += count

        return PyMemoryView_FromMemory(self.buf.data() + start, count,
                                       PyBUF_READ)
Seems better to support the buffer protocol on this class instead. Otherwise this could refer to an invalid memory segment that Python has already garbage collected.
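As a rough illustration of why going through the buffer protocol is safer, the sketch below uses Python's built-in `bytearray`, which already implements the protocol, as a stand-in for the class under review. A view obtained this way records its owner and keeps it alive, and the owner refuses to reallocate its storage while a view is outstanding.

```python
payload = bytearray(b"parquet bytes")
view = memoryview(payload)

# The view holds a reference to the object that exported the buffer.
owner_is_tracked = view.obj is payload

# While the export is live, resizing the owner is rejected, so the view
# can never be left pointing at freed or moved storage.
try:
    payload.extend(b"!")
    resize_blocked = False
except BufferError:
    resize_blocked = True

# Once the view is released, the owner may grow again.
view.release()
payload.extend(b"!")
```

A view built from a bare pointer and length carries no such link to its owner, which is the hazard raised in the comment above.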
Sounds good.
We have code already that adapts a c++ byte vector to the buffer protocol here:
cudf/python/cudf/cudf/_lib/parquet.pyx
Lines 49 to 86 in 85ce695
cdef class BufferArrayFromVector:
    cdef Py_ssize_t length
    cdef unique_ptr[vector[uint8_t]] in_vec

    # these two things declare part of the buffer interface
    cdef Py_ssize_t shape[1]
    cdef Py_ssize_t strides[1]

    @staticmethod
    cdef BufferArrayFromVector from_unique_ptr(
        unique_ptr[vector[uint8_t]] in_vec
    ):
        cdef BufferArrayFromVector buf = BufferArrayFromVector()
        buf.in_vec = move(in_vec)
        buf.length = dereference(buf.in_vec).size()
        return buf

    def __getbuffer__(self, Py_buffer *buffer, int flags):
        cdef Py_ssize_t itemsize = sizeof(uint8_t)

        self.shape[0] = self.length
        self.strides[0] = 1

        buffer.buf = dereference(self.in_vec).data()

        buffer.format = NULL  # byte
        buffer.internal = NULL
        buffer.itemsize = itemsize
        buffer.len = self.length * itemsize  # product(shape) * itemsize
        buffer.ndim = 1
        buffer.obj = self
        buffer.readonly = 0
        buffer.shape = self.shape
        buffer.strides = self.strides
        buffer.suboffsets = NULL

    def __releasebuffer__(self, Py_buffer *buffer):
        pass
Should we use that class instead of creating a new one? The only real difference is between the `vector<char>` expected by the HOST_BUFFER sink and the `vector<uint8_t>` in this class (and I think we could either cast the C++ vector or use Cython fused types to get around it).
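As a loose Python analogy for the cast idea (not the Cython or C++ change itself), `memoryview.cast` reinterprets the same bytes as unsigned (`'B'`) or signed (`'b'`) without copying. The sample values are arbitrary, and treating `'b'` as the `char` side is an assumption, since the signedness of `char` is platform-dependent.

```python
raw = bytearray([0x50, 0x41, 0x52, 0x31, 0xFF])

as_uint8 = memoryview(raw)     # format 'B': unsigned bytes
as_char = as_uint8.cast("b")   # format 'b': same memory, signed bytes

last_unsigned = as_uint8[4]
last_signed = as_char[4]

# Writes through either view land in the same underlying storage.
as_char[0] = 0x51
```

The two views differ only in how each byte is interpreted, which is why a cast or a fused type could plausibly let one class serve both element types.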
I think we actually have 2 instances of classes like this one where ideally we could standardize them.
No updates in a while, retargeting to 0.15.
@benfred I'm closing this for now since it's gone stale, feel free to reopen if / when you're looking to work on it further.