Implement JNI for chunked ORC reader (#15446)
This adds a JNI implementation for the chunked ORC reader, allowing ORC files to be read in an iterative manner; a minimal usage sketch follows below.

Depends on: 
 * #15094

Closes #12228.
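
For orientation, here is a minimal usage sketch of the new reader (not part of this commit). ORCOptions.DEFAULT and the HostMemoryBuffer helpers are assumed from the existing cudf Java API, and the file path is illustrative:

import ai.rapids.cudf.*;
import java.nio.file.Files;
import java.nio.file.Paths;

// Load the whole ORC file into host memory; the chunked reader parses from a buffer.
byte[] fileBytes = Files.readAllBytes(Paths.get("example.orc"));
try (HostMemoryBuffer buffer = HostMemoryBuffer.allocate(fileBytes.length)) {
  buffer.setBytes(0, fileBytes, 0, fileBytes.length);
  // chunkReadLimit = 0: no per-chunk byte cap; passReadLimit = 1 GiB working-set cap.
  try (ORCChunkedReader reader = new ORCChunkedReader(
      0, 1024L * 1024 * 1024, ORCOptions.DEFAULT, buffer, 0, fileBytes.length)) {
    while (reader.hasNext()) {
      try (Table chunk = reader.readChunk()) {
        if (chunk != null) {
          System.out.println("Read chunk with " + chunk.getRowCount() + " rows");
        }
      }
    }
  }
}

Each iteration returns rows not yet seen by earlier calls; for an empty input, hasNext() still returns true once and readChunk() yields a table with empty columns.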

Authors:
  - Nghia Truong (https://github.com/ttnghia)

Approvers:
  - Jason Lowe (https://github.com/jlowe)

URL: #15446
ttnghia authored May 6, 2024
1 parent d3c4cf4 commit 4dc6162
Showing 4 changed files with 352 additions and 14 deletions.
169 changes: 169 additions & 0 deletions java/src/main/java/ai/rapids/cudf/ORCChunkedReader.java
@@ -0,0 +1,169 @@
/*
*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package ai.rapids.cudf;

/**
* Provide an interface for reading an ORC file in an iterative manner.
*/
public class ORCChunkedReader implements AutoCloseable {
static {
NativeDepsLoader.loadNativeDeps();
}

/**
* Construct the reader instance from read limits and a file already loaded into a memory buffer.
*
* @param chunkReadLimit Limit on total number of bytes to be returned per read,
* or 0 if there is no limit.
* @param passReadLimit Limit on the amount of memory used by the chunked reader,
* or 0 if there is no limit.
* @param opts The options for ORC reading.
* @param buffer Raw ORC file content.
* @param offset The starting offset into buffer.
* @param len The number of bytes to parse from the given buffer.
*/
public ORCChunkedReader(long chunkReadLimit, long passReadLimit,
ORCOptions opts, HostMemoryBuffer buffer, long offset, long len) {
handle = createReader(chunkReadLimit, passReadLimit,
opts.getIncludeColumnNames(), buffer.getAddress() + offset, len,
opts.usingNumPyTypes(), opts.timeUnit().typeId.getNativeId(),
opts.getDecimal128Columns());
if (handle == 0) {
throw new IllegalStateException("Cannot create native chunked ORC reader object.");
}
}

/**
* Construct a chunked ORC reader instance, similar to
* {@link ORCChunkedReader#ORCChunkedReader(long, long, ORCOptions, HostMemoryBuffer, long, long)},
* with an additional parameter to control the granularity of the output table.
* When reading a table chunk, a subset of stripes may be loaded, decompressed, and decoded
* into a large intermediate table, subject to the given size limits. The reader then
* subdivides that table into smaller tables for final output, using
* {@code outputRowSizingGranularity} as the subdivision step. If the chunked reader is
* constructed without this parameter, a default value of 10k rows is used.
*
* @param outputRowSizingGranularity The step size, in rows, used when subdividing the
* intermediate table into output tables.
* @see ORCChunkedReader#ORCChunkedReader(long, long, ORCOptions, HostMemoryBuffer, long, long)
*/
public ORCChunkedReader(long chunkReadLimit, long passReadLimit, long outputRowSizingGranularity,
ORCOptions opts, HostMemoryBuffer buffer, long offset, long len) {
handle = createReaderWithOutputGranularity(chunkReadLimit, passReadLimit, outputRowSizingGranularity,
opts.getIncludeColumnNames(), buffer.getAddress() + offset, len,
opts.usingNumPyTypes(), opts.timeUnit().typeId.getNativeId(),
opts.getDecimal128Columns());
if (handle == 0) {
throw new IllegalStateException("Cannot create native chunked ORC reader object.");
}
}

/**
* Check if the given file has anything left to read.
*
* @return A boolean value indicating whether there is more data to read from the file.
*/
public boolean hasNext() {
if (handle == 0) {
throw new IllegalStateException("Native chunked ORC reader object may have been closed.");
}

if (firstCall) {
// This function needs to return true at least once, so that an empty table
// (one with empty columns rather than no columns) can be returned by readChunk()
// if the input file has no rows.
firstCall = false;
return true;
}
return hasNext(handle);
}

/**
* Read a chunk of rows from the given ORC file such that the total size of the returned
* data does not exceed the given read limit. If the file has no data, or all of its data
* has already been read by previous calls to this function, a null Table will be returned.
*
* @return A table of new rows read from the given file.
*/
public Table readChunk() {
if (handle == 0) {
throw new IllegalStateException("Native chunked ORC reader object may have been closed.");
}

long[] columnPtrs = readChunk(handle);
return columnPtrs != null ? new Table(columnPtrs) : null;
}

@Override
public void close() {
if (handle != 0) {
close(handle);
handle = 0;
}
}


/**
* Auxiliary variable to help {@link #hasNext()} return true at least once.
*/
private boolean firstCall = true;

/**
* Handle for memory address of the native ORC chunked reader class.
*/
private long handle;

/**
* Create a native chunked ORC reader object on heap and return its memory address.
*
* @param chunkReadLimit Limit on total number of bytes to be returned per read,
* or 0 if there is no limit.
* @param passReadLimit Limit on the amount of memory used by the chunked reader,
* or 0 if there is no limit.
* @param filterColumnNames Names of the columns to read, or an empty array to read all columns.
* @param bufferAddrs The address of a buffer to read from, or 0 if we are not using that buffer.
* @param length The length of the buffer to read from.
* @param usingNumPyTypes Whether the parser should implicitly promote TIMESTAMP
* columns to TIMESTAMP_MILLISECONDS for compatibility with NumPy.
* @param timeUnit The native type id of the unit to use for returned timestamp columns.
* @param decimal128Columns Names of the columns to read as Decimal128 rather than Decimal64.
*/
private static native long createReader(long chunkReadLimit, long passReadLimit,
String[] filterColumnNames, long bufferAddrs, long length,
boolean usingNumPyTypes, int timeUnit, String[] decimal128Columns);

/**
* Create a native chunked ORC reader object, similar to
* {@link ORCChunkedReader#createReader(long, long, String[], long, long, boolean, int, String[])},
* with an additional parameter to control the granularity of the output table.
*
* @param outputRowSizingGranularity The step size, in rows, used when subdividing the
* intermediate table into output tables.
* @see ORCChunkedReader#createReader(long, long, String[], long, long, boolean, int, String[])
*/
private static native long createReaderWithOutputGranularity(
long chunkReadLimit, long passReadLimit, long outputRowSizingGranularity,
String[] filterColumnNames, long bufferAddrs, long length,
boolean usingNumPyTypes, int timeUnit, String[] decimal128Columns);

private static native boolean hasNext(long handle);

private static native long[] readChunk(long handle);

private static native void close(long handle);
}
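
A hedged sketch of the granularity overload above; the limit and granularity values are illustrative, buffer is an already-populated HostMemoryBuffer, and the ORCOptions builder calls are assumed from the existing cudf Java API:

// Cap each returned chunk at 512 MiB and the pass working set at 2 GiB, and
// subdivide the decoded intermediate table in 100,000-row steps.
ORCOptions opts = ORCOptions.builder().includeColumn("id").includeColumn("name").build();
try (ORCChunkedReader reader = new ORCChunkedReader(
    512L * 1024 * 1024,      // chunkReadLimit
    2L * 1024 * 1024 * 1024, // passReadLimit
    100_000,                 // outputRowSizingGranularity
    opts, buffer, 0, buffer.getLength())) {
  while (reader.hasNext()) {
    try (Table chunk = reader.readChunk()) {
      // each output table is carved from the intermediate table in 100k-row steps
    }
  }
}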
173 changes: 159 additions & 14 deletions java/src/main/native/src/ChunkedReaderJni.cpp
@@ -18,22 +18,22 @@
#include "jni_utils.hpp"

#include <cudf/column/column.hpp>
#include <cudf/io/orc.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/table/table.hpp>

#include <memory>
#include <optional>
#include <vector>

// This function is defined in `TableJni.cpp`.
jlongArray cudf::jni::convert_table_for_return(
JNIEnv* env,
std::unique_ptr<cudf::table>&& table_result,
std::vector<std::unique_ptr<cudf::column>>&& extra_columns);

// This file is for the code related to chunked reader (Parquet, ORC, etc.).

extern "C" {

//
// Chunked Parquet reader JNI
//

// This function should take all the parameters that `Table.readParquet` takes,
// plus one more parameter `long chunkSizeByteLimit`.
JNIEXPORT jlong JNICALL
@@ -54,19 +54,17 @@ Java_ai_rapids_cudf_ParquetChunkedReader_create(JNIEnv* env,
JNI_NULL_CHECK(env, inp_file_path, "Input file or buffer must be supplied", 0);
read_buffer = false;
} else if (inp_file_path != nullptr) {
JNI_THROW_NEW(
env, cudf::jni::ILLEGAL_ARG_CLASS, "Cannot pass in both a buffer and an inp_file_path", 0);
} else if (buffer_length <= 0) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "An empty buffer is not supported", 0);
JNI_THROW_NEW(env, cudf::jni::ILLEGAL_ARG_CLASS, "An empty buffer is not supported", 0);
}

try {
cudf::jni::auto_set_device(env);
cudf::jni::native_jstring filename(env, inp_file_path);
if (!read_buffer && filename.is_empty()) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "inp_file_path cannot be empty", 0);
JNI_THROW_NEW(env, cudf::jni::ILLEGAL_ARG_CLASS, "inp_file_path cannot be empty", 0);
}

cudf::jni::native_jstringArray n_filter_col_names(env, filter_col_names);
@@ -155,15 +153,15 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_readChunk(
jclass,
jlong handle)
{
JNI_NULL_CHECK(env, handle, "handle is null", nullptr);

try {
cudf::jni::auto_set_device(env);
auto const reader_ptr = reinterpret_cast<cudf::io::chunked_parquet_reader* const>(handle);
auto chunk = reader_ptr->read_chunk();
return chunk.tbl ? cudf::jni::convert_table_for_return(env, chunk.tbl) : nullptr;
}
CATCH_STD(env, nullptr);
}

JNIEXPORT void JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_close(JNIEnv* env,
@@ -179,4 +177,151 @@ JNIEXPORT void JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_close(JNIEnv* env,
CATCH_STD(env, );
}

//
// Chunked ORC reader JNI
//

namespace {
jlong create_chunked_orc_reader(JNIEnv* env,
jlong chunk_read_limit,
jlong pass_read_limit,
std::optional<jlong> output_granularity,
jobjectArray filter_col_names,
jlong buffer,
jlong buffer_length,
jboolean using_numpy_Types,
jint unit,
jobjectArray dec128_col_names)
{
JNI_NULL_CHECK(env, buffer, "buffer is null", 0);
if (buffer_length <= 0) {
JNI_THROW_NEW(env, cudf::jni::ILLEGAL_ARG_CLASS, "An empty buffer is not supported", 0);
}

try {
cudf::jni::auto_set_device(env);
cudf::jni::native_jstringArray n_filter_col_names(env, filter_col_names);
cudf::jni::native_jstringArray n_dec128_col_names(env, dec128_col_names);

auto const source = cudf::io::source_info(reinterpret_cast<char*>(buffer),
static_cast<std::size_t>(buffer_length));
auto opts_builder = cudf::io::orc_reader_options::builder(source);
if (n_filter_col_names.size() > 0) {
opts_builder = opts_builder.columns(n_filter_col_names.as_cpp_vector());
}
auto const read_opts = opts_builder.use_index(false)
.use_np_dtypes(static_cast<bool>(using_numpy_Types))
.timestamp_type(cudf::data_type(static_cast<cudf::type_id>(unit)))
.decimal128_columns(n_dec128_col_names.as_cpp_vector())
.build();

if (output_granularity) {
return reinterpret_cast<jlong>(
new cudf::io::chunked_orc_reader(static_cast<std::size_t>(chunk_read_limit),
static_cast<std::size_t>(pass_read_limit),
static_cast<std::size_t>(output_granularity.value()),
read_opts));
}
return reinterpret_cast<jlong>(
new cudf::io::chunked_orc_reader(static_cast<std::size_t>(chunk_read_limit),
static_cast<std::size_t>(pass_read_limit),
read_opts));
}
CATCH_STD(env, 0);
}
} // namespace

// This function should take all the parameters that `Table.readORC` takes,
// plus two more parameters: `chunk_read_limit` and `pass_read_limit`.
JNIEXPORT jlong JNICALL
Java_ai_rapids_cudf_ORCChunkedReader_createReader(JNIEnv* env,
jclass,
jlong chunk_read_limit,
jlong pass_read_limit,
jobjectArray filter_col_names,
jlong buffer,
jlong buffer_length,
jboolean using_numpy_Types,
jint unit,
jobjectArray dec128_col_names)
{
return create_chunked_orc_reader(env,
chunk_read_limit,
pass_read_limit,
std::nullopt,
filter_col_names,
buffer,
buffer_length,
using_numpy_Types,
unit,
dec128_col_names);
}

// This function should take all the parameters that `Table.readORC` takes,
// plus three more parameters: `chunk_read_limit`, `pass_read_limit`, `output_granularity`.
JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_ORCChunkedReader_createReaderWithOutputGranularity(
JNIEnv* env,
jclass,
jlong chunk_read_limit,
jlong pass_read_limit,
jlong output_granularity,
jobjectArray filter_col_names,
jlong buffer,
jlong buffer_length,
jboolean using_numpy_Types,
jint unit,
jobjectArray dec128_col_names)
{
return create_chunked_orc_reader(env,
chunk_read_limit,
pass_read_limit,
output_granularity,
filter_col_names,
buffer,
buffer_length,
using_numpy_Types,
unit,
dec128_col_names);
}

JNIEXPORT jboolean JNICALL Java_ai_rapids_cudf_ORCChunkedReader_hasNext(JNIEnv* env,
jclass,
jlong handle)
{
JNI_NULL_CHECK(env, handle, "handle is null", false);

try {
cudf::jni::auto_set_device(env);
auto const reader_ptr = reinterpret_cast<cudf::io::chunked_orc_reader* const>(handle);
return reader_ptr->has_next();
}
CATCH_STD(env, false);
}

JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_ORCChunkedReader_readChunk(JNIEnv* env,
jclass,
jlong handle)
{
JNI_NULL_CHECK(env, handle, "handle is null", nullptr);

try {
cudf::jni::auto_set_device(env);
auto const reader_ptr = reinterpret_cast<cudf::io::chunked_orc_reader* const>(handle);
auto chunk = reader_ptr->read_chunk();
return chunk.tbl ? cudf::jni::convert_table_for_return(env, chunk.tbl) : nullptr;
}
CATCH_STD(env, nullptr);
}

JNIEXPORT void JNICALL Java_ai_rapids_cudf_ORCChunkedReader_close(JNIEnv* env, jclass, jlong handle)
{
JNI_NULL_CHECK(env, handle, "handle is null", );

try {
cudf::jni::auto_set_device(env);
delete reinterpret_cast<cudf::io::chunked_orc_reader*>(handle);
}
CATCH_STD(env, );
}

} // extern "C"
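
Java_ai_rapids_cudf_ORCChunkedReader_hasNext above simply forwards to the native reader's has_next(); the first-call guarantee lives in the Java wrapper. A sketch of the resulting iteration contract, with reader assumed to be an already-constructed ORCChunkedReader:

// hasNext() returns true at least once, so even a zero-row file produces exactly
// one chunk: a 0-row table that still carries the (empty) columns of the schema.
long totalRows = 0;
int numChunks = 0;
while (reader.hasNext()) {
  try (Table chunk = reader.readChunk()) {
    if (chunk != null) {
      totalRows += chunk.getRowCount();
      numChunks++;
    }
  }
}
System.out.println(numChunks + " chunk(s), " + totalRows + " rows in total");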