Skip to content

Commit

Permalink
Merge branch 'branch-24.10' into parquet-nrows
Browse files Browse the repository at this point in the history
  • Loading branch information
lithomas1 authored Aug 1, 2024
2 parents 6447f12 + 211dbe4 commit 9ea93ba
Show file tree
Hide file tree
Showing 38 changed files with 1,049 additions and 122 deletions.
3 changes: 0 additions & 3 deletions ci/run_cudf_memcheck_ctests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ export LIBCUDF_MEMCHECK_ENABLED=1
for gt in ./*_TEST ; do
test_name=$(basename ${gt})
# Run gtests with compute-sanitizer
if [[ "$test_name" == "ERROR_TEST" ]] || [[ "$test_name" == "STREAM_IDENTIFICATION_TEST" ]]; then
continue
fi
echo "Running compute-sanitizer on $test_name"
compute-sanitizer --tool memcheck ${gt} "$@"
done
Expand Down
1 change: 1 addition & 0 deletions conda/environments/all_cuda-118_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies:
- dlpack>=0.8,<1.0
- doxygen=1.9.1
- fastavro>=0.22.9
- flatbuffers==24.3.25
- fmt>=10.1.1,<11
- fsspec>=0.6.0
- gcc_linux-64=11.*
Expand Down
1 change: 1 addition & 0 deletions conda/environments/all_cuda-125_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ dependencies:
- dlpack>=0.8,<1.0
- doxygen=1.9.1
- fastavro>=0.22.9
- flatbuffers==24.3.25
- fmt>=10.1.1,<11
- fsspec>=0.6.0
- gcc_linux-64=11.*
Expand Down
3 changes: 3 additions & 0 deletions conda/recipes/libcudf/conda_build_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ librdkafka_version:
fmt_version:
- ">=10.1.1,<11"

flatbuffers_version:
- "=24.3.25"

spdlog_version:
- ">=1.12.0,<1.13"

Expand Down
1 change: 1 addition & 0 deletions conda/recipes/libcudf/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ requirements:
- dlpack {{ dlpack_version }}
- librdkafka {{ librdkafka_version }}
- fmt {{ fmt_version }}
- flatbuffers {{ flatbuffers_version }}
- spdlog {{ spdlog_version }}
- zlib {{ zlib_version }}

Expand Down
5 changes: 4 additions & 1 deletion cpp/src/column/column_view.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ void prefetch_col_data(ColumnView& col, void const* data_ptr, std::string_view k
key, data_ptr, col.size() * size_of(col.type()), cudf::get_default_stream());
} else if (col.type().id() == type_id::STRING) {
strings_column_view scv{col};

if (data_ptr == nullptr) {
// Do not call chars_size if the data_ptr is nullptr.
return;
}
cudf::experimental::prefetch::detail::prefetch_noexcept(
key,
data_ptr,
Expand Down
26 changes: 12 additions & 14 deletions cpp/src/io/parquet/compact_protocol_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,10 @@ class parquet_field_bool : public parquet_field {
struct parquet_field_bool_list : public parquet_field_list<bool, FieldType::BOOLEAN_TRUE> {
parquet_field_bool_list(int f, std::vector<bool>& v) : parquet_field_list(f, v)
{
auto const read_value = [this](uint32_t i, CompactProtocolReader* cpr) {
auto const read_value = [&val = v](uint32_t i, CompactProtocolReader* cpr) {
auto const current_byte = cpr->getb();
assert_bool_field_type(current_byte);
this->val[i] = current_byte == static_cast<int>(FieldType::BOOLEAN_TRUE);
val[i] = current_byte == static_cast<int>(FieldType::BOOLEAN_TRUE);
};
bind_read_func(read_value);
}
Expand Down Expand Up @@ -188,8 +188,8 @@ template <typename T, FieldType EXPECTED_TYPE>
struct parquet_field_int_list : public parquet_field_list<T, EXPECTED_TYPE> {
parquet_field_int_list(int f, std::vector<T>& v) : parquet_field_list<T, EXPECTED_TYPE>(f, v)
{
auto const read_value = [this](uint32_t i, CompactProtocolReader* cpr) {
this->val[i] = cpr->get_zigzag<T>();
auto const read_value = [&val = v](uint32_t i, CompactProtocolReader* cpr) {
val[i] = cpr->get_zigzag<T>();
};
this->bind_read_func(read_value);
}
Expand Down Expand Up @@ -229,11 +229,11 @@ class parquet_field_string : public parquet_field {
struct parquet_field_string_list : public parquet_field_list<std::string, FieldType::BINARY> {
parquet_field_string_list(int f, std::vector<std::string>& v) : parquet_field_list(f, v)
{
auto const read_value = [this](uint32_t i, CompactProtocolReader* cpr) {
auto const read_value = [&val = v](uint32_t i, CompactProtocolReader* cpr) {
auto const l = cpr->get_u32();
CUDF_EXPECTS(l < static_cast<size_t>(cpr->m_end - cpr->m_cur), "string length mismatch");

this->val[i].assign(reinterpret_cast<char const*>(cpr->m_cur), l);
val[i].assign(reinterpret_cast<char const*>(cpr->m_cur), l);
cpr->m_cur += l;
};
bind_read_func(read_value);
Expand Down Expand Up @@ -269,8 +269,8 @@ struct parquet_field_enum_list : public parquet_field_list<Enum, FieldType::I32>
parquet_field_enum_list(int f, std::vector<Enum>& v)
: parquet_field_list<Enum, FieldType::I32>(f, v)
{
auto const read_value = [this](uint32_t i, CompactProtocolReader* cpr) {
this->val[i] = static_cast<Enum>(cpr->get_i32());
auto const read_value = [&val = v](uint32_t i, CompactProtocolReader* cpr) {
val[i] = static_cast<Enum>(cpr->get_i32());
};
this->bind_read_func(read_value);
}
Expand Down Expand Up @@ -354,8 +354,8 @@ struct parquet_field_struct_list : public parquet_field_list<T, FieldType::STRUC
parquet_field_struct_list(int f, std::vector<T>& v)
: parquet_field_list<T, FieldType::STRUCT>(f, v)
{
auto const read_value = [this](uint32_t i, CompactProtocolReader* cpr) {
cpr->read(&this->val[i]);
auto const read_value = [&val = v](uint32_t i, CompactProtocolReader* cpr) {
cpr->read(&val[i]);
};
this->bind_read_func(read_value);
}
Expand Down Expand Up @@ -395,7 +395,7 @@ struct parquet_field_binary_list
: public parquet_field_list<std::vector<uint8_t>, FieldType::BINARY> {
parquet_field_binary_list(int f, std::vector<std::vector<uint8_t>>& v) : parquet_field_list(f, v)
{
auto const read_value = [this](uint32_t i, CompactProtocolReader* cpr) {
auto const read_value = [&val = v](uint32_t i, CompactProtocolReader* cpr) {
auto const l = cpr->get_u32();
CUDF_EXPECTS(l <= static_cast<size_t>(cpr->m_end - cpr->m_cur), "binary length mismatch");

Expand Down Expand Up @@ -482,9 +482,7 @@ void CompactProtocolReader::skip_struct_field(int t, int depth)
skip_struct_field(t, depth + 1);
}
break;
default:
// printf("unsupported skip for type %d\n", t);
break;
default: break;
}
}

Expand Down
14 changes: 14 additions & 0 deletions cpp/src/utilities/prefetch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,20 @@ cudaError_t prefetch_noexcept(std::string_view key,
rmm::cuda_stream_view stream,
rmm::cuda_device_id device_id) noexcept
{
// Don't try to prefetch nullptrs or empty data. Sometimes libcudf has column
// views that use nullptrs with a nonzero size as an optimization.
if (ptr == nullptr) {
if (prefetch_config::instance().debug) {
std::cerr << "Skipping prefetch of nullptr" << std::endl;
}
return cudaSuccess;
}
if (size == 0) {
if (prefetch_config::instance().debug) {
std::cerr << "Skipping prefetch of size 0" << std::endl;
}
return cudaSuccess;
}
if (prefetch_config::instance().get(key)) {
if (prefetch_config::instance().debug) {
std::cerr << "Prefetching " << size << " bytes for key " << key << " at location " << ptr
Expand Down
4 changes: 4 additions & 0 deletions cpp/tests/error/error_handling_test.cu
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ CUDF_KERNEL void test_kernel(int* data) { data[threadIdx.x] = threadIdx.x; }
// calls.
TEST(StreamCheck, FailedKernel)
{
if (getenv("LIBCUDF_MEMCHECK_ENABLED")) { GTEST_SKIP(); }

rmm::cuda_stream stream;
int a;
test_kernel<<<0, 0, 0, stream.value()>>>(&a);
Expand All @@ -61,6 +63,8 @@ TEST(StreamCheck, FailedKernel)

TEST(StreamCheck, CatchFailedKernel)
{
if (getenv("LIBCUDF_MEMCHECK_ENABLED")) { GTEST_SKIP(); }

rmm::cuda_stream stream;
int a;
test_kernel<<<0, 0, 0, stream.value()>>>(&a);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ void test_cudaLaunchKernel()
} catch (std::runtime_error&) {
return;
}
if (getenv("LIBCUDF_MEMCHECK_ENABLED")) { return; }
throw std::runtime_error("No exception raised for kernel on default stream!");
}

Expand Down
3 changes: 2 additions & 1 deletion dependencies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ dependencies:
- fmt>=10.1.1,<11
- librmm==24.10.*,>=0.0.0a0
- libkvikio==24.10.*,>=0.0.0a0
- flatbuffers==24.3.25
- librdkafka>=1.9.0,<1.10.0a0
# Align nvcomp version with rapids-cmake
- nvcomp==3.0.6
Expand Down Expand Up @@ -630,7 +631,7 @@ dependencies:
common:
- output_types: [conda, requirements, pyproject]
packages:
- polars>=1.0
- polars>=1.0,<1.3
run_dask_cudf:
common:
- output_types: [conda, requirements, pyproject]
Expand Down
17 changes: 12 additions & 5 deletions docs/cudf/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -556,16 +556,23 @@ def on_missing_reference(app, env, node, contnode):
("py:class", "Dtype"),
# The following are erroneously warned due to
# https://github.com/sphinx-doc/sphinx/issues/11225
("py:obj", "cudf.DatetimeIndex.time"),
("py:obj", "cudf.DatetimeIndex.date"),
("py:obj", "cudf.Index.values_host"),
("py:class", "pa.Array"),
("py:class", "ScalarLike"),
("py:class", "ParentType"),
("py:class", "ColumnLike"),
("py:class", "ColumnLike"),
("py:obj", "cudf.Index.transpose"),
("py:obj", "cudf.Index.T"),
("py:obj", "cudf.Index.to_flat_index"),
("py:obj", "cudf.MultiIndex.to_flat_index"),
("py:meth", "pyarrow.Table.to_pandas"),
("py:class", "pa.Array"),
("py:class", "ScalarLike"),
("py:class", "ParentType"),
("py:class", "pyarrow.lib.DataType"),
("py:class", "pyarrow.lib.Table"),
("py:class", "pyarrow.lib.Scalar"),
("py:class", "pyarrow.lib.ChunkedArray"),
("py:class", "pyarrow.lib.Array"),
("py:class", "ColumnLike"),
# TODO: Remove this when we figure out why typing_extensions doesn't seem
# to map types correctly for intersphinx
("py:class", "typing_extensions.Self"),
Expand Down
16 changes: 16 additions & 0 deletions docs/cudf/source/cudf_pandas/how-it-works.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,19 @@ transfers.
When using `cudf.pandas`, cuDF's [pandas compatibility
mode](api.options) is automatically enabled, ensuring consistency with
pandas-specific semantics like default sort ordering.

`cudf.pandas` uses a managed memory pool by default. This allows `cudf.pandas` to process datasets larger than the memory of the GPU it is running on. Managed memory prefetching is also enabled by default to improve memory access performance. For more information on CUDA Unified Memory (managed memory), performance, and prefetching, see [this NVIDIA Developer blog post](https://developer.nvidia.com/blog/improving-gpu-memory-oversubscription-performance/).

Pool allocators improve allocation performance. Without using one, memory
allocation may be a bottleneck depending on the workload. Managed memory
enables oversubscribing GPU memory. This allows `cudf.pandas` to process
data larger than GPU memory in many cases, without CPU (pandas) fallback.

Other memory allocators can be used by changing the environment
variable `CUDF_PANDAS_RMM_MODE` to one of the following.

1. "managed_pool" (default): CUDA Unified Memory (managed memory) with RMM's asynchronous pool allocator.
2. "managed": CUDA Unified Memory (managed memory) with no pool allocator.
3. "async": CUDA's built-in asynchronous pool allocator with normal CUDA device memory.
4. "pool": RMM's asynchronous pool allocator with normal CUDA device memory.
5. "cuda": normal CUDA device memory with no pool allocator.
52 changes: 35 additions & 17 deletions java/src/main/java/ai/rapids/cudf/ColumnView.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -5034,8 +5034,8 @@ private static NestedColumnVector createNestedColumnVector(DType type, long rows
// DATA MOVEMENT
/////////////////////////////////////////////////////////////////////////////

private static HostColumnVectorCore copyToHostNestedHelper(
ColumnView deviceCvPointer, HostMemoryAllocator hostMemoryAllocator) {
private static HostColumnVectorCore copyToHostAsyncNestedHelper(
Cuda.Stream stream, ColumnView deviceCvPointer, HostMemoryAllocator hostMemoryAllocator) {
if (deviceCvPointer == null) {
return null;
}
Expand All @@ -5056,20 +5056,20 @@ private static HostColumnVectorCore copyToHostNestedHelper(
currValidity = deviceCvPointer.getValid();
if (currData != null) {
hostData = hostMemoryAllocator.allocate(currData.length);
hostData.copyFromDeviceBuffer(currData);
hostData.copyFromDeviceBufferAsync(currData, stream);
}
if (currValidity != null) {
hostValid = hostMemoryAllocator.allocate(currValidity.length);
hostValid.copyFromDeviceBuffer(currValidity);
hostValid.copyFromDeviceBufferAsync(currValidity, stream);
}
if (currOffsets != null) {
hostOffsets = hostMemoryAllocator.allocate(currOffsets.length);
hostOffsets.copyFromDeviceBuffer(currOffsets);
hostOffsets.copyFromDeviceBufferAsync(currOffsets, stream);
}
int numChildren = deviceCvPointer.getNumChildren();
for (int i = 0; i < numChildren; i++) {
try(ColumnView childDevPtr = deviceCvPointer.getChildColumnView(i)) {
children.add(copyToHostNestedHelper(childDevPtr, hostMemoryAllocator));
children.add(copyToHostAsyncNestedHelper(stream, childDevPtr, hostMemoryAllocator));
}
}
currNullCount = deviceCvPointer.getNullCount();
Expand Down Expand Up @@ -5103,11 +5103,20 @@ private static HostColumnVectorCore copyToHostNestedHelper(
}
}

/** Copy the data to the host synchronously. */
public HostColumnVector copyToHost(HostMemoryAllocator hostMemoryAllocator) {
HostColumnVector result = copyToHostAsync(Cuda.DEFAULT_STREAM, hostMemoryAllocator);
Cuda.DEFAULT_STREAM.sync();
return result;
}

/**
* Copy the data to the host.
* Copy the data to the host asynchronously. The caller MUST synchronize on the stream
* before examining the result.
*/
public HostColumnVector copyToHost(HostMemoryAllocator hostMemoryAllocator) {
try (NvtxRange toHost = new NvtxRange("ensureOnHost", NvtxColor.BLUE)) {
public HostColumnVector copyToHostAsync(Cuda.Stream stream,
HostMemoryAllocator hostMemoryAllocator) {
try (NvtxRange toHost = new NvtxRange("toHostAsync", NvtxColor.BLUE)) {
HostMemoryBuffer hostDataBuffer = null;
HostMemoryBuffer hostValidityBuffer = null;
HostMemoryBuffer hostOffsetsBuffer = null;
Expand All @@ -5127,16 +5136,16 @@ public HostColumnVector copyToHost(HostMemoryAllocator hostMemoryAllocator) {
if (!type.isNestedType()) {
if (valid != null) {
hostValidityBuffer = hostMemoryAllocator.allocate(valid.getLength());
hostValidityBuffer.copyFromDeviceBuffer(valid);
hostValidityBuffer.copyFromDeviceBufferAsync(valid, stream);
}
if (offsets != null) {
hostOffsetsBuffer = hostMemoryAllocator.allocate(offsets.length);
hostOffsetsBuffer.copyFromDeviceBuffer(offsets);
hostOffsetsBuffer.copyFromDeviceBufferAsync(offsets, stream);
}
// If a strings column is all null values there is no data buffer allocated
if (data != null) {
hostDataBuffer = hostMemoryAllocator.allocate(data.length);
hostDataBuffer.copyFromDeviceBuffer(data);
hostDataBuffer.copyFromDeviceBufferAsync(data, stream);
}
HostColumnVector ret = new HostColumnVector(type, rows, Optional.of(nullCount),
hostDataBuffer, hostValidityBuffer, hostOffsetsBuffer);
Expand All @@ -5145,21 +5154,21 @@ public HostColumnVector copyToHost(HostMemoryAllocator hostMemoryAllocator) {
} else {
if (data != null) {
hostDataBuffer = hostMemoryAllocator.allocate(data.length);
hostDataBuffer.copyFromDeviceBuffer(data);
hostDataBuffer.copyFromDeviceBufferAsync(data, stream);
}

if (valid != null) {
hostValidityBuffer = hostMemoryAllocator.allocate(valid.getLength());
hostValidityBuffer.copyFromDeviceBuffer(valid);
hostValidityBuffer.copyFromDeviceBufferAsync(valid, stream);
}
if (offsets != null) {
hostOffsetsBuffer = hostMemoryAllocator.allocate(offsets.getLength());
hostOffsetsBuffer.copyFromDeviceBuffer(offsets);
hostOffsetsBuffer.copyFromDeviceBufferAsync(offsets, stream);
}
List<HostColumnVectorCore> children = new ArrayList<>();
for (int i = 0; i < getNumChildren(); i++) {
try (ColumnView childDevPtr = getChildColumnView(i)) {
children.add(copyToHostNestedHelper(childDevPtr, hostMemoryAllocator));
children.add(copyToHostAsyncNestedHelper(stream, childDevPtr, hostMemoryAllocator));
}
}
HostColumnVector ret = new HostColumnVector(type, rows, Optional.of(nullCount),
Expand Down Expand Up @@ -5192,10 +5201,19 @@ public HostColumnVector copyToHost(HostMemoryAllocator hostMemoryAllocator) {
}
}

/** Copy the data to host memory synchronously */
public HostColumnVector copyToHost() {
return copyToHost(DefaultHostMemoryAllocator.get());
}

/**
* Copy the data to the host asynchronously. The caller MUST synchronize on the stream
* before examining the result.
*/
public HostColumnVector copyToHostAsync(Cuda.Stream stream) {
return copyToHostAsync(stream, DefaultHostMemoryAllocator.get());
}

/**
* Calculate the total space required to copy the data to the host. This should be padded to
* the alignment that the CPU requires.
Expand Down
Loading

0 comments on commit 9ea93ba

Please sign in to comment.