Java expose parquet pass_read_limit #14564

Merged 2 commits on Dec 5, 2023
45 changes: 40 additions & 5 deletions java/src/main/java/ai/rapids/cudf/ParquetChunkedReader.java
@@ -48,7 +48,21 @@ public ParquetChunkedReader(long chunkSizeByteLimit, File filePath) {
* @param filePath Full path of the input Parquet file to read.
*/
public ParquetChunkedReader(long chunkSizeByteLimit, ParquetOptions opts, File filePath) {
handle = create(chunkSizeByteLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(),
this(chunkSizeByteLimit, 0, opts, filePath);
}

/**
* Construct the reader instance from a read limit, a ParquetOptions object, and a file path.
*
* @param chunkSizeByteLimit Limit on total number of bytes to be returned per read,
* or 0 if there is no limit.
* @param passReadLimit Limit on the amount of memory used for reading and decompressing data,
or 0 if there is no limit.
* @param opts The options for Parquet reading.
* @param filePath Full path of the input Parquet file to read.
*/
public ParquetChunkedReader(long chunkSizeByteLimit, long passReadLimit, ParquetOptions opts, File filePath) {
handle = create(chunkSizeByteLimit, passReadLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(),
filePath.getAbsolutePath(), 0, 0, opts.timeUnit().typeId.getNativeId());

if (handle == 0) {
@@ -67,8 +81,26 @@ public ParquetChunkedReader(long chunkSizeByteLimit, ParquetOptions opts, File filePath) {
* @param len The number of bytes to parse the given buffer.
*/
public ParquetChunkedReader(long chunkSizeByteLimit, ParquetOptions opts, HostMemoryBuffer buffer,
long offset, long len) {
handle = create(chunkSizeByteLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(), null,
long offset, long len) {
this(chunkSizeByteLimit, 0L, opts, buffer, offset, len);
}

/**
* Construct the reader instance from a read limit and a file already read in a memory buffer.
*
* @param chunkSizeByteLimit Limit on total number of bytes to be returned per read,
* or 0 if there is no limit.
* @param passReadLimit Limit on the amount of memory used for reading and decompressing data,
or 0 if there is no limit.
* @param opts The options for Parquet reading.
* @param buffer Raw Parquet file content.
* @param offset The starting offset into buffer.
* @param len The number of bytes to parse the given buffer.
*/
public ParquetChunkedReader(long chunkSizeByteLimit, long passReadLimit,
ParquetOptions opts, HostMemoryBuffer buffer,
long offset, long len) {
handle = create(chunkSizeByteLimit, passReadLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(), null,
buffer.getAddress() + offset, len, opts.timeUnit().typeId.getNativeId());

if (handle == 0) {
@@ -169,15 +201,18 @@ public void close() {
*
* @param chunkSizeByteLimit Limit on total number of bytes to be returned per read,
* or 0 if there is no limit.
* @param passReadLimit Limit on the amount of memory used for reading and decompressing
data, or 0 if there is no limit.
* @param filterColumnNames Name of the columns to read, or an empty array if we want to read all.
* @param binaryToString Whether to convert the corresponding column to String if it is binary.
* @param filePath Full path of the file to read, or given as null if reading from a buffer.
* @param bufferAddrs The address of a buffer to read from, or 0 if we are not using that buffer.
* @param length The length of the buffer to read from.
* @param timeUnit Return type of time unit for timestamps.
*/
private static native long create(long chunkSizeByteLimit, String[] filterColumnNames,
boolean[] binaryToString, String filePath, long bufferAddrs, long length, int timeUnit);
private static native long create(long chunkSizeByteLimit, long passReadLimit,
String[] filterColumnNames, boolean[] binaryToString,
String filePath, long bufferAddrs, long length, int timeUnit);

private static native long createWithDataSource(long chunkedSizeByteLimit,
String[] filterColumnNames, boolean[] binaryToString, int timeUnit, long dataSourceHandle);
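For reference, a minimal usage sketch of the new file-path constructor. This is hedged: the file path and limit values are placeholders, and `ParquetOptions.DEFAULT` plus the `hasNext()`/`readChunk()` loop follow the existing `ParquetChunkedReader` API rather than anything added by this PR.

```java
import ai.rapids.cudf.ParquetChunkedReader;
import ai.rapids.cudf.ParquetOptions;
import ai.rapids.cudf.Table;
import java.io.File;

public class ChunkedReadExample {
  public static void main(String[] args) {
    // Placeholder limits: return at most ~1 GiB per chunk, and cap the memory used
    // for reading/decompressing a pass at ~8 GiB (0 for either means "no limit").
    long chunkSizeByteLimit = 1L << 30;
    long passReadLimit = 8L << 30;
    try (ParquetChunkedReader reader = new ParquetChunkedReader(
        chunkSizeByteLimit, passReadLimit, ParquetOptions.DEFAULT,
        new File("/tmp/example.parquet"))) {
      while (reader.hasNext()) {
        try (Table chunk = reader.readChunk()) {
          // Each chunk is an independent Table; release it once processed.
          System.out.println("read chunk with " + chunk.getRowCount() + " rows");
        }
      }
    }
  }
}
```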
11 changes: 6 additions & 5 deletions java/src/main/native/src/ChunkedReaderJni.cpp
@@ -36,9 +36,9 @@ extern "C" {
// This function should take all the parameters that `Table.readParquet` takes,
// plus one more parameter `long chunkSizeByteLimit`.
JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_create(
JNIEnv *env, jclass, jlong chunk_read_limit, jobjectArray filter_col_names,
jbooleanArray j_col_binary_read, jstring inp_file_path, jlong buffer, jlong buffer_length,
jint unit) {
JNIEnv *env, jclass, jlong chunk_read_limit, jlong pass_read_limit,
jobjectArray filter_col_names, jbooleanArray j_col_binary_read, jstring inp_file_path,
jlong buffer, jlong buffer_length, jint unit) {
JNI_NULL_CHECK(env, j_col_binary_read, "Null col_binary_read", 0);
bool read_buffer = true;
if (buffer == 0) {
@@ -79,8 +79,9 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_create(
.timestamp_type(cudf::data_type(static_cast<cudf::type_id>(unit)))
.build();

return reinterpret_cast<jlong>(new cudf::io::chunked_parquet_reader(
static_cast<std::size_t>(chunk_read_limit), read_opts));
return reinterpret_cast<jlong>(
new cudf::io::chunked_parquet_reader(static_cast<std::size_t>(chunk_read_limit),
static_cast<std::size_t>(pass_read_limit), read_opts));
}
CATCH_STD(env, 0);
}
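On the native side, the extra pass_read_limit argument is simply forwarded into the cudf::io::chunked_parquet_reader constructor. The buffer-based Java constructor goes through this same JNI entry point; below is a sketch under the same assumptions as above (placeholder path and limit values, with `HostMemoryBuffer.allocate`/`setBytes` from the existing cudf Java API).

```java
import ai.rapids.cudf.HostMemoryBuffer;
import ai.rapids.cudf.ParquetChunkedReader;
import ai.rapids.cudf.ParquetOptions;
import ai.rapids.cudf.Table;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class ChunkedReadFromHostBuffer {
  public static void main(String[] args) throws IOException {
    // Load the whole Parquet file into a host buffer first.
    byte[] parquetBytes = Files.readAllBytes(Paths.get("/tmp/example.parquet"));
    try (HostMemoryBuffer buffer = HostMemoryBuffer.allocate(parquetBytes.length)) {
      buffer.setBytes(0, parquetBytes, 0, parquetBytes.length);
      // Placeholder limits: 512 MiB per returned chunk, 2 GiB per read/decompress pass.
      try (ParquetChunkedReader reader = new ParquetChunkedReader(
          512L << 20, 2L << 30, ParquetOptions.DEFAULT, buffer, 0, parquetBytes.length)) {
        while (reader.hasNext()) {
          try (Table chunk = reader.readChunk()) {
            System.out.println("rows in this chunk: " + chunk.getRowCount());
          }
        }
      }
    }
  }
}
```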