Skip to content

Commit

Permalink
Add Java methods to split and write column views (#8546)
Browse files Browse the repository at this point in the history
This PR pertains to making a couple of optimizations needed to support cases when the formation of a vector isn't needed for operations. 
- Split returns a ColumnView in cases where we don't need to own the underlying buffers
- Added a method to write ColumnView to parquet directly, circumventing the formation of Table with ColumnVectors

Authors:
  - Raza Jafri (https://github.com/razajafri)

Approvers:
  - Jason Lowe (https://github.com/jlowe)

URL: #8546
  • Loading branch information
razajafri authored Jun 18, 2021
1 parent 45626b6 commit 0099f11
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 11 deletions.
74 changes: 71 additions & 3 deletions java/src/main/java/ai/rapids/cudf/ColumnView.java
Original file line number Diff line number Diff line change
Expand Up @@ -594,12 +594,80 @@ public final ColumnVector subVector(int start, int end) {
* @return A new ColumnVector array with slices from the original ColumnVector
*/
public final ColumnVector[] split(int... indices) {
ColumnView[] views = splitAsViews(indices);
ColumnVector[] columnVectors = new ColumnVector[views.length];
try {
for (int i = 0; i < views.length; i++) {
columnVectors[i] = views[i].copyToColumnVector();
}
return columnVectors;
} catch (Throwable t) {
for (ColumnVector cv : columnVectors) {
if (cv != null) {
cv.close();
}
}
throw t;
} finally {
for (ColumnView view : views) {
view.close();
}
}
}

/**
* Splits a ColumnView (including null values) into a set of ColumnViews
* according to a set of indices. No data is moved or copied.
*
* IMPORTANT NOTE: Nothing is copied out from the vector and the slices will only be relevant for
* the lifecycle of the underlying ColumnVector.
*
* The "split" function divides the input column into multiple intervals
* of rows using the splits indices values and it stores the intervals into the
* output columns. Regarding the interval of indices, a pair of values are taken
* from the indices array in a consecutive manner. The pair of indices are
* left-closed and right-open.
*
* The indices array ('splits') is required to be a monotonic non-decreasing set.
* The indices in the array are required to comply with the following conditions:
* a, b belongs to Range[0, input column size]
* a <= b, where the position of 'a' is less or equal to the position of 'b'.
*
* The split function will take a pair of indices from the indices array
* ('splits') in a consecutive manner. For the first pair, the function will
* take the value 0 and the first element of the indices array. For the last pair,
* the function will take the last element of the indices array and the size of
* the input column.
*
* Exceptional cases for the indices array are:
* When the values in the pair are equal, the function return an empty column.
* When the values in the pair are 'strictly decreasing', the outcome is
* undefined.
* When any of the values in the pair don't belong to the range[0, input column
* size), the outcome is undefined.
* When the indices array is empty, an empty array of ColumnViews is returned.
*
* The output columns may have different sizes. The number of
* columns must be equal to the number of indices in the array plus one.
*
* Example:
* input: {10, 12, 14, 16, 18, 20, 22, 24, 26, 28}
* splits: {2, 5, 9}
* output: {{10, 12}, {14, 16, 18}, {20, 22, 24, 26}, {28}}
*
* Note that this is very similar to the output from a PartitionedTable.
*
*
* @param indices the indices to split with
* @return A new ColumnView array with slices from the original ColumnView
*/
public ColumnView[] splitAsViews(int... indices) {
long[] nativeHandles = split(this.getNativeView(), indices);
ColumnVector[] columnVectors = new ColumnVector[nativeHandles.length];
ColumnView[] columnViews = new ColumnView[nativeHandles.length];
for (int i = 0; i < nativeHandles.length; i++) {
columnVectors[i] = new ColumnVector(nativeHandles[i]);
columnViews[i] = new ColumnView(nativeHandles[i]);
}
return columnVectors;
return columnViews;
}

/**
Expand Down
40 changes: 40 additions & 0 deletions java/src/main/java/ai/rapids/cudf/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,46 @@ public static TableWriter writeParquetChunked(ParquetWriterOptions options,
return new ParquetTableWriter(options, consumer);
}

/**
* This is an evolving API and most likely be removed in future releases. Please use with the
* caveat that this will not exist in the near future.
* @param options the Parquet writer options.
* @param consumer a class that will be called when host buffers are ready with Parquet
* formatted data in them.
* @param columnViews ColumnViews to write to Parquet
*/
public static void writeColumnViewsToParquet(ParquetWriterOptions options,
HostBufferConsumer consumer,
ColumnView... columnViews) {
assert columnViews != null && columnViews.length > 0 : "ColumnViews can't be null or empty";
long rows = columnViews[0].getRowCount();

for (ColumnView columnView : columnViews) {
assert (null != columnView) : "ColumnViews can't be null";
assert (rows == columnView.getRowCount()) : "All columns should have the same number of " +
"rows " + columnView.getType();
}

// Since Arrays are mutable objects make a copy
long[] viewPointers = new long[columnViews.length];
for (int i = 0; i < columnViews.length; i++) {
viewPointers[i] = columnViews[i].getNativeView();
}

long nativeHandle = createCudfTableView(viewPointers);
try {
try (ParquetTableWriter writer = new ParquetTableWriter(options, consumer)) {
long total = 0;
for (ColumnView cv : columnViews) {
total += cv.getDeviceMemorySize();
}
writeParquetChunk(writer.handle, nativeHandle, total);
}
} finally {
deleteCudfTable(nativeHandle);
}
}

/**
* Writes this table to a Parquet file on the host
*
Expand Down
14 changes: 12 additions & 2 deletions java/src/main/native/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ message(VERBOSE "CUDF_JNI: Build with GPUDirect Storage support: ${USE_GDS}")
message(VERBOSE "CUDF_JNI: Build with static Arrow library: ${CUDF_JNI_ARROW_STATIC}")

set(CUDF_SOURCE_DIR "${PROJECT_SOURCE_DIR}/../../../../cpp")
set(CUDF_CPP_BUILD_DIR "${CUDF_SOURCE_DIR}/build")
if (DEFINED ENV{CUDF_CPP_BUILD_DIR})
set(CUDF_CPP_BUILD_DIR "$ENV{CUDF_CPP_BUILD_DIR}")
else()
set(CUDF_CPP_BUILD_DIR "${CUDF_SOURCE_DIR}/build")
endif()

set(CMAKE_MODULE_PATH
"${CMAKE_CURRENT_SOURCE_DIR}/cmake/Modules/"
Expand Down Expand Up @@ -111,27 +115,33 @@ endif(CUDA_STATIC_RUNTIME)

###################################################################################################
# - Thrust/CUB/libcudacxx ------------------------------------------------------------------------------------

find_path(THRUST_INCLUDE "thrust"
HINTS "$ENV{CUDF_ROOT}/_deps/thrust-src"
"${CUDF_CPP_BUILD_DIR}/_deps/thrust-src"
"$ENV{CONDA_PREFIX}/include")

message(STATUS "THRUST: THRUST_INCLUDE set to ${THRUST_INCLUDE}")

find_path(CUB_INCLUDE "cub"
HINTS "$ENV{CUDF_ROOT}/_deps/thrust-src"
"${CUDF_CPP_BUILD_DIR}/_deps/thrust-src"
"$ENV{CONDA_PREFIX}/include")

message(STATUS "CUB: CUB_INCLUDE set to ${CUB_INCLUDE}")

find_path(LIBCUDACXX_INCLUDE "cuda"
HINTS "$ENV{CUDF_ROOT}/_deps/libcudacxx-src/include"
"${CUDF_CPP_BUILD_DIR}/_deps/libcudacxx-src/include")

message(STATUS "LIBCUDACXX: LIBCUDACXX_INCLUDE set to ${LIBCUDACXX_INCLUDE}")

find_path(SPDLOG_INCLUDE "spdlog"
HINTS "${CUDF_CPP_BUILD_DIR}/_deps/spdlog-src/include"
"$ENV{RMM_ROOT}/_deps/spdlog-src/include"
"$ENV{RMM_ROOT}/include"
"$ENV{CONDA_PREFIX}/include")

message(STATUS "SPDLOG: SPDLOG_INCLUDE set to ${SPDLOG_INCLUDE}")
###################################################################################################
# - CUDF ------------------------------------------------------------------------------------------

Expand Down
9 changes: 3 additions & 6 deletions java/src/main/native/src/ColumnViewJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,14 +471,11 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_ColumnView_split(JNIEnv *env, j
}

std::vector<cudf::column_view> result = cudf::split(*n_column, indices);

cudf::jni::native_jlongArray n_result(env, result.size());
std::vector<std::unique_ptr<cudf::column>> column_result(result.size());
for (size_t i = 0; i < result.size(); i++) {
column_result[i].reset(new cudf::column(result[i]));
n_result[i] = reinterpret_cast<jlong>(column_result[i].get());
}
for (size_t i = 0; i < result.size(); i++) {
column_result[i].release();
cudf::column_view const * c = new cudf::column_view(result[i]);
n_result[i] = reinterpret_cast<jlong>(c);
}
return n_result.get_jArray();
}
Expand Down

0 comments on commit 0099f11

Please sign in to comment.