diff --git a/java/src/main/java/ai/rapids/cudf/ParquetChunkedReader.java b/java/src/main/java/ai/rapids/cudf/ParquetChunkedReader.java
index 17d59b757c3..53af52eff07 100644
--- a/java/src/main/java/ai/rapids/cudf/ParquetChunkedReader.java
+++ b/java/src/main/java/ai/rapids/cudf/ParquetChunkedReader.java
@@ -48,7 +48,21 @@ public ParquetChunkedReader(long chunkSizeByteLimit, File filePath) {
    * @param filePath Full path of the input Parquet file to read.
    */
   public ParquetChunkedReader(long chunkSizeByteLimit, ParquetOptions opts, File filePath) {
-    handle = create(chunkSizeByteLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(),
+    this(chunkSizeByteLimit, 0, opts, filePath);
+  }
+
+  /**
+   * Construct the reader instance from a read limit, a ParquetOptions object, and a file path.
+   *
+   * @param chunkSizeByteLimit Limit on total number of bytes to be returned per read,
+   *                           or 0 if there is no limit.
+   * @param passReadLimit Limit on the amount of memory used for reading and decompressing data,
+   *                      or 0 if there is no limit.
+   * @param opts The options for Parquet reading.
+   * @param filePath Full path of the input Parquet file to read.
+   */
+  public ParquetChunkedReader(long chunkSizeByteLimit, long passReadLimit, ParquetOptions opts, File filePath) {
+    handle = create(chunkSizeByteLimit, passReadLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(),
         filePath.getAbsolutePath(), 0, 0, opts.timeUnit().typeId.getNativeId());
 
     if (handle == 0) {
@@ -67,8 +81,26 @@ public ParquetChunkedReader(long chunkSizeByteLimit, ParquetOptions opts, File f
    * @param len The number of bytes to parse the given buffer.
    */
   public ParquetChunkedReader(long chunkSizeByteLimit, ParquetOptions opts, HostMemoryBuffer buffer,
-      long offset, long len) {
-    handle = create(chunkSizeByteLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(), null,
+                              long offset, long len) {
+    this(chunkSizeByteLimit, 0L, opts, buffer, offset, len);
+  }
+
+  /**
+   * Construct the reader instance from a read limit and a file already read in a memory buffer.
+   *
+   * @param chunkSizeByteLimit Limit on total number of bytes to be returned per read,
+   *                           or 0 if there is no limit.
+   * @param passReadLimit Limit on the amount of memory used for reading and decompressing data,
+   *                      or 0 if there is no limit.
+   * @param opts The options for Parquet reading.
+   * @param buffer Raw Parquet file content.
+   * @param offset The starting offset into buffer.
+   * @param len The number of bytes to parse the given buffer.
+   */
+  public ParquetChunkedReader(long chunkSizeByteLimit, long passReadLimit,
+                              ParquetOptions opts, HostMemoryBuffer buffer,
+                              long offset, long len) {
+    handle = create(chunkSizeByteLimit, passReadLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(), null,
         buffer.getAddress() + offset, len, opts.timeUnit().typeId.getNativeId());
 
     if (handle == 0) {
@@ -169,6 +201,8 @@ public void close() {
    *
    * @param chunkSizeByteLimit Limit on total number of bytes to be returned per read,
    *                           or 0 if there is no limit.
+   * @param passReadLimit Limit on the amount of memory used for reading and decompressing
+   *                      data, or 0 if there is no limit.
    * @param filterColumnNames Name of the columns to read, or an empty array if we want to read all.
    * @param binaryToString Whether to convert the corresponding column to String if it is binary.
    * @param filePath Full path of the file to read, or given as null if reading from a buffer.
@@ -176,8 +210,9 @@ public void close() {
    * @param length The length of the buffer to read from.
    * @param timeUnit Return type of time unit for timestamps.
    */
-  private static native long create(long chunkSizeByteLimit, String[] filterColumnNames,
-      boolean[] binaryToString, String filePath, long bufferAddrs, long length, int timeUnit);
+  private static native long create(long chunkSizeByteLimit, long passReadLimit,
+                                    String[] filterColumnNames, boolean[] binaryToString,
+                                    String filePath, long bufferAddrs, long length, int timeUnit);
 
   private static native long createWithDataSource(long chunkedSizeByteLimit,
       String[] filterColumnNames, boolean[] binaryToString, int timeUnit, long dataSourceHandle);
diff --git a/java/src/main/native/src/ChunkedReaderJni.cpp b/java/src/main/native/src/ChunkedReaderJni.cpp
index 0044385f267..5ce23bbe712 100644
--- a/java/src/main/native/src/ChunkedReaderJni.cpp
+++ b/java/src/main/native/src/ChunkedReaderJni.cpp
@@ -36,9 +36,9 @@ extern "C" {
 // This function should take all the parameters that `Table.readParquet` takes,
 // plus one more parameter `long chunkSizeByteLimit`.
 JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_create(
-    JNIEnv *env, jclass, jlong chunk_read_limit, jobjectArray filter_col_names,
-    jbooleanArray j_col_binary_read, jstring inp_file_path, jlong buffer, jlong buffer_length,
-    jint unit) {
+    JNIEnv *env, jclass, jlong chunk_read_limit, jlong pass_read_limit,
+    jobjectArray filter_col_names, jbooleanArray j_col_binary_read, jstring inp_file_path,
+    jlong buffer, jlong buffer_length, jint unit) {
   JNI_NULL_CHECK(env, j_col_binary_read, "Null col_binary_read", 0);
   bool read_buffer = true;
   if (buffer == 0) {
@@ -79,8 +79,9 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_create(
             .timestamp_type(cudf::data_type(static_cast<cudf::type_id>(unit)))
             .build();
 
-    return reinterpret_cast<jlong>(new cudf::io::chunked_parquet_reader(
-        static_cast<std::size_t>(chunk_read_limit), read_opts));
+    return reinterpret_cast<jlong>(
+        new cudf::io::chunked_parquet_reader(static_cast<std::size_t>(chunk_read_limit),
+                                             static_cast<std::size_t>(pass_read_limit), read_opts));
   }
   CATCH_STD(env, 0);
 }
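
For reference, a minimal usage sketch of the new two-limit constructor (not part of this diff). The file name and the byte limits below are illustrative placeholders, and the loop relies on the reader's existing hasNext()/readChunk() methods.

import java.io.File;
import ai.rapids.cudf.ParquetChunkedReader;
import ai.rapids.cudf.ParquetOptions;
import ai.rapids.cudf.Table;

public class ChunkedReadSketch {
  public static void main(String[] args) {
    ParquetOptions opts = ParquetOptions.DEFAULT;
    long chunkSizeByteLimit = 1L << 30; // cap each returned table chunk at ~1 GiB
    long passReadLimit = 4L << 30;      // cap memory used to read/decompress a pass at ~4 GiB
    // "example.parquet" is a hypothetical input file; substitute a real Parquet path.
    try (ParquetChunkedReader reader = new ParquetChunkedReader(
             chunkSizeByteLimit, passReadLimit, opts, new File("example.parquet"))) {
      while (reader.hasNext()) {
        try (Table chunk = reader.readChunk()) {
          // Process each chunk while staying within the configured limits.
        }
      }
    }
  }
}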