Java expose parquet pass_read_limit (#14564)
This exposes the chunked Parquet reader parameter pass_read_limit to Java.
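
For context, a minimal usage sketch of the new file-path constructor follows. The constructor signature is taken from this change; ParquetOptions.builder(), hasNext(), readChunk(), getRowCount(), and the sample file path are assumptions drawn from the existing cudf Java API and are not part of this commit.

import ai.rapids.cudf.ParquetChunkedReader;
import ai.rapids.cudf.ParquetOptions;
import ai.rapids.cudf.Table;

import java.io.File;

public class ChunkedReadFromFile {
  public static void main(String[] args) {
    // Read only the needed column; builder API assumed from the existing cudf bindings.
    ParquetOptions opts = ParquetOptions.builder()
        .includeColumn("a")
        .build();

    long chunkSizeByteLimit = 512L * 1024 * 1024;   // cap on bytes returned per chunk, 0 = no limit
    long passReadLimit = 2L * 1024 * 1024 * 1024;   // cap on read/decompress memory, 0 = no limit

    try (ParquetChunkedReader reader = new ParquetChunkedReader(
            chunkSizeByteLimit, passReadLimit, opts, new File("example.parquet"))) {
      while (reader.hasNext()) {
        try (Table chunk = reader.readChunk()) {
          System.out.println("read " + chunk.getRowCount() + " rows");
        }
      }
    }
  }
}

Passing 0 for passReadLimit preserves the old behavior, which is why the pre-existing constructors in this change delegate with a value of 0.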

Authors:
  - Robert (Bobby) Evans (https://github.com/revans2)

Approvers:
  - Jason Lowe (https://github.com/jlowe)

URL: #14564
revans2 authored Dec 5, 2023
1 parent 4054f3e commit 1c46d7d
Showing 2 changed files with 46 additions and 10 deletions.
45 changes: 40 additions & 5 deletions java/src/main/java/ai/rapids/cudf/ParquetChunkedReader.java
@@ -48,7 +48,21 @@ public ParquetChunkedReader(long chunkSizeByteLimit, File filePath) {
* @param filePath Full path of the input Parquet file to read.
*/
public ParquetChunkedReader(long chunkSizeByteLimit, ParquetOptions opts, File filePath) {
handle = create(chunkSizeByteLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(),
this(chunkSizeByteLimit, 0, opts, filePath);
}

/**
* Construct the reader instance from a read limit, a ParquetOptions object, and a file path.
*
* @param chunkSizeByteLimit Limit on total number of bytes to be returned per read,
* or 0 if there is no limit.
   * @param passReadLimit Limit on the amount of memory used for reading and decompressing data or
   *                      0 if there is no limit.
* @param opts The options for Parquet reading.
* @param filePath Full path of the input Parquet file to read.
*/
public ParquetChunkedReader(long chunkSizeByteLimit, long passReadLimit, ParquetOptions opts, File filePath) {
handle = create(chunkSizeByteLimit, passReadLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(),
filePath.getAbsolutePath(), 0, 0, opts.timeUnit().typeId.getNativeId());

if (handle == 0) {
@@ -67,8 +81,26 @@ public ParquetChunkedReader(long chunkSizeByteLimit, ParquetOptions opts, File f
* @param len The number of bytes to parse the given buffer.
*/
public ParquetChunkedReader(long chunkSizeByteLimit, ParquetOptions opts, HostMemoryBuffer buffer,
long offset, long len) {
handle = create(chunkSizeByteLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(), null,
long offset, long len) {
this(chunkSizeByteLimit, 0L, opts, buffer, offset, len);
}

/**
* Construct the reader instance from a read limit and a file already read in a memory buffer.
*
* @param chunkSizeByteLimit Limit on total number of bytes to be returned per read,
* or 0 if there is no limit.
   * @param passReadLimit Limit on the amount of memory used for reading and decompressing data or
   *                      0 if there is no limit.
* @param opts The options for Parquet reading.
* @param buffer Raw Parquet file content.
* @param offset The starting offset into buffer.
* @param len The number of bytes to parse the given buffer.
*/
public ParquetChunkedReader(long chunkSizeByteLimit, long passReadLimit,
ParquetOptions opts, HostMemoryBuffer buffer,
long offset, long len) {
handle = create(chunkSizeByteLimit, passReadLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(), null,
buffer.getAddress() + offset, len, opts.timeUnit().typeId.getNativeId());

if (handle == 0) {
@@ -169,15 +201,18 @@ public void close() {
*
* @param chunkSizeByteLimit Limit on total number of bytes to be returned per read,
* or 0 if there is no limit.
* @param passReadLimit Limit on the amount of memory used for reading and decompressing
* data or 0 if there is no limit.
* @param filterColumnNames Name of the columns to read, or an empty array if we want to read all.
* @param binaryToString Whether to convert the corresponding column to String if it is binary.
* @param filePath Full path of the file to read, or given as null if reading from a buffer.
* @param bufferAddrs The address of a buffer to read from, or 0 if we are not using that buffer.
* @param length The length of the buffer to read from.
* @param timeUnit Return type of time unit for timestamps.
*/
private static native long create(long chunkSizeByteLimit, String[] filterColumnNames,
boolean[] binaryToString, String filePath, long bufferAddrs, long length, int timeUnit);
private static native long create(long chunkSizeByteLimit, long passReadLimit,
String[] filterColumnNames, boolean[] binaryToString,
String filePath, long bufferAddrs, long length, int timeUnit);

private static native long createWithDataSource(long chunkedSizeByteLimit,
String[] filterColumnNames, boolean[] binaryToString, int timeUnit, long dataSourceHandle);
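
A similar hedged sketch for the new buffer-based overload added above: HostMemoryBuffer.allocate() and setBytes() are assumed from the existing cudf Java API, and the input path is illustrative only.

import ai.rapids.cudf.HostMemoryBuffer;
import ai.rapids.cudf.ParquetChunkedReader;
import ai.rapids.cudf.ParquetOptions;
import ai.rapids.cudf.Table;

import java.nio.file.Files;
import java.nio.file.Paths;

public class ChunkedReadFromBuffer {
  public static void main(String[] args) throws Exception {
    ParquetOptions opts = ParquetOptions.builder().includeColumn("a").build();

    // Load the whole Parquet file into host memory first.
    byte[] raw = Files.readAllBytes(Paths.get("example.parquet"));
    try (HostMemoryBuffer buf = HostMemoryBuffer.allocate(raw.length)) {
      buf.setBytes(0, raw, 0, raw.length);

      // Same limits as in the file-path example; offset 0 and the full length cover the buffer.
      try (ParquetChunkedReader reader = new ParquetChunkedReader(
              512L * 1024 * 1024, 2L * 1024 * 1024 * 1024, opts, buf, 0, raw.length)) {
        while (reader.hasNext()) {
          try (Table chunk = reader.readChunk()) {
            // process each chunk while it is in scope
          }
        }
      }
    }
  }
}
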
11 changes: 6 additions & 5 deletions java/src/main/native/src/ChunkedReaderJni.cpp
@@ -36,9 +36,9 @@ extern "C" {
// This function should take all the parameters that `Table.readParquet` takes,
// plus one more parameter `long chunkSizeByteLimit`.
JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_create(
JNIEnv *env, jclass, jlong chunk_read_limit, jobjectArray filter_col_names,
jbooleanArray j_col_binary_read, jstring inp_file_path, jlong buffer, jlong buffer_length,
jint unit) {
JNIEnv *env, jclass, jlong chunk_read_limit, jlong pass_read_limit,
jobjectArray filter_col_names, jbooleanArray j_col_binary_read, jstring inp_file_path,
jlong buffer, jlong buffer_length, jint unit) {
JNI_NULL_CHECK(env, j_col_binary_read, "Null col_binary_read", 0);
bool read_buffer = true;
if (buffer == 0) {
@@ -79,8 +79,9 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_create(
.timestamp_type(cudf::data_type(static_cast<cudf::type_id>(unit)))
.build();

return reinterpret_cast<jlong>(new cudf::io::chunked_parquet_reader(
static_cast<std::size_t>(chunk_read_limit), read_opts));
return reinterpret_cast<jlong>(
new cudf::io::chunked_parquet_reader(static_cast<std::size_t>(chunk_read_limit),
static_cast<std::size_t>(pass_read_limit), read_opts));
}
CATCH_STD(env, 0);
}