diff --git a/java/src/main/java/ai/rapids/cudf/ORCChunkedReader.java b/java/src/main/java/ai/rapids/cudf/ORCChunkedReader.java
new file mode 100644
index 00000000000..2f46c8d1825
--- /dev/null
+++ b/java/src/main/java/ai/rapids/cudf/ORCChunkedReader.java
@@ -0,0 +1,169 @@
+/*
+ *
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package ai.rapids.cudf;
+
+/**
+ * Provides an interface for reading an ORC file in an iterative manner.
+ */
+public class ORCChunkedReader implements AutoCloseable {
+  static {
+    NativeDepsLoader.loadNativeDeps();
+  }
+
+  /**
+   * Construct the reader instance from read limits, output row granularity,
+   * and a file already loaded in a memory buffer.
+   *
+   * @param chunkReadLimit Limit on total number of bytes to be returned per read,
+   *                       or 0 if there is no limit.
+   * @param passReadLimit Limit on the amount of memory used by the chunked reader,
+   *                      or 0 if there is no limit.
+   * @param opts The options for ORC reading.
+   * @param buffer Raw ORC file content.
+   * @param offset The starting offset into buffer.
+   * @param len The number of bytes to parse from the given buffer.
+   */
+  public ORCChunkedReader(long chunkReadLimit, long passReadLimit,
+      ORCOptions opts, HostMemoryBuffer buffer, long offset, long len) {
+    handle = createReader(chunkReadLimit, passReadLimit,
+        opts.getIncludeColumnNames(), buffer.getAddress() + offset, len,
+        opts.usingNumPyTypes(), opts.timeUnit().typeId.getNativeId(),
+        opts.getDecimal128Columns());
+    if (handle == 0) {
+      throw new IllegalStateException("Cannot create native chunked ORC reader object.");
+    }
+  }
+
+  /**
+   * Construct a chunked ORC reader instance, similar to
+   * {@link ORCChunkedReader#ORCChunkedReader(long, long, ORCOptions, HostMemoryBuffer, long, long)},
+   * with an additional parameter to control the granularity of the output table.
+   * When reading a chunk, a subset of stripes may be loaded, decompressed and decoded into
+   * a large intermediate table while respecting the given size limits. The reader then
+   * subdivides that table into smaller tables for final output, using
+   * {@code outputRowSizingGranularity} as the subdivision step. If the chunked reader is
+   * constructed without this parameter, a default value of 10k rows is used.
+   *
+   * @param outputRowSizingGranularity The subdivision step, in number of rows, of the output table.
+   * @see ORCChunkedReader#ORCChunkedReader(long, long, ORCOptions, HostMemoryBuffer, long, long)
+   */
+  public ORCChunkedReader(long chunkReadLimit, long passReadLimit, long outputRowSizingGranularity,
+      ORCOptions opts, HostMemoryBuffer buffer, long offset, long len) {
+    handle = createReaderWithOutputGranularity(chunkReadLimit, passReadLimit, outputRowSizingGranularity,
+        opts.getIncludeColumnNames(), buffer.getAddress() + offset, len,
+        opts.usingNumPyTypes(), opts.timeUnit().typeId.getNativeId(),
+        opts.getDecimal128Columns());
+    if (handle == 0) {
+      throw new IllegalStateException("Cannot create native chunked ORC reader object.");
+    }
+  }
+
+  /**
+   * Check if the given file has anything left to read.
+   *
+   * @return A boolean value indicating if there is more data to read from the file.
+   */
+  public boolean hasNext() {
+    if (handle == 0) {
+      throw new IllegalStateException("Native chunked ORC reader object may have been closed.");
+    }
+
+    if (firstCall) {
+      // This function needs to return true at least once, so that readChunk() can return
+      // an empty table (with empty columns rather than no columns) if the input file
+      // has no rows.
+      firstCall = false;
+      return true;
+    }
+    return hasNext(handle);
+  }
+
+  /**
+   * Read a chunk of rows from the given ORC file such that the total size of the returned data
+   * does not exceed the given read limit. If the given file has no data, or all of its data has
+   * already been read by previous calls to this function, a null Table will be returned.
+   *
+   * @return A table of new rows read from the given file.
+   */
+  public Table readChunk() {
+    if (handle == 0) {
+      throw new IllegalStateException("Native chunked ORC reader object may have been closed.");
+    }
+
+    long[] columnPtrs = readChunk(handle);
+    return columnPtrs != null ? new Table(columnPtrs) : null;
+  }
+
+  @Override
+  public void close() {
+    if (handle != 0) {
+      close(handle);
+      handle = 0;
+    }
+  }
+
+
+  /**
+   * Auxiliary variable to help {@link #hasNext()} return true at least once.
+   */
+  private boolean firstCall = true;
+
+  /**
+   * Handle for memory address of the native ORC chunked reader class.
+   */
+  private long handle;
+
+  /**
+   * Create a native chunked ORC reader object on heap and return its memory address.
+   *
+   * @param chunkReadLimit Limit on total number of bytes to be returned per read,
+   *                       or 0 if there is no limit.
+   * @param passReadLimit Limit on the amount of memory used by the chunked reader,
+   *                      or 0 if there is no limit.
+   * @param filterColumnNames Names of the columns to read, or an empty array to read all columns.
+   * @param bufferAddrs The address of the buffer to read from, or 0 if we are not using that buffer.
+   * @param length The length of the buffer to read from.
+   * @param usingNumPyTypes Whether the parser should implicitly promote TIMESTAMP
+   *                        columns to TIMESTAMP_MILLISECONDS for compatibility with NumPy.
+   * @param timeUnit The time unit of the returned timestamp columns.
+   * @param decimal128Columns Names of the columns to be read as DECIMAL128 rather than DECIMAL64.
+   */
+  private static native long createReader(long chunkReadLimit, long passReadLimit,
+      String[] filterColumnNames, long bufferAddrs, long length,
+      boolean usingNumPyTypes, int timeUnit, String[] decimal128Columns);
+
+  /**
+   * Create a native chunked ORC reader object, similar to
+   * {@link ORCChunkedReader#createReader(long, long, String[], long, long, boolean, int, String[])},
+   * with an additional parameter to control the granularity of the output table.
+   *
+   * @param outputRowSizingGranularity The subdivision step, in number of rows, of the output table.
+   * @see ORCChunkedReader#createReader(long, long, String[], long, long, boolean, int, String[])
+   */
+  private static native long createReaderWithOutputGranularity(
+      long chunkReadLimit, long passReadLimit, long outputRowSizingGranularity,
+      String[] filterColumnNames, long bufferAddrs, long length,
+      boolean usingNumPyTypes, int timeUnit, String[] decimal128Columns);
+
+  private static native boolean hasNext(long handle);
+
+  private static native long[] readChunk(long handle);
+
+  private static native void close(long handle);
+}
diff --git a/java/src/main/native/src/ChunkedReaderJni.cpp b/java/src/main/native/src/ChunkedReaderJni.cpp
index 7681008f584..cf04a87262f 100644
--- a/java/src/main/native/src/ChunkedReaderJni.cpp
+++ b/java/src/main/native/src/ChunkedReaderJni.cpp
@@ -18,22 +18,22 @@
 #include "jni_utils.hpp"
 #include
+#include
 #include
 #include
 #include
+#include
 #include
-// This function is defined in `TableJni.cpp`.
-jlongArray cudf::jni::convert_table_for_return(
-  JNIEnv* env,
-  std::unique_ptr<cudf::table>&& table_result,
-  std::vector<std::unique_ptr<cudf::column>>&& extra_columns);
-
 // This file is for the code related to chunked reader (Parquet, ORC, etc.).
 extern "C" {
+//
+// Chunked Parquet reader JNI
+//
+
 // This function should take all the parameters that `Table.readParquet` takes,
 // plus one more parameter `long chunkSizeByteLimit`.
 JNIEXPORT jlong JNICALL
 Java_ai_rapids_cudf_ParquetChunkedReader_create(JNIEnv* env,
@@ -54,19 +54,17 @@ Java_ai_rapids_cudf_ParquetChunkedReader_create(JNIEnv* env,
     JNI_NULL_CHECK(env, inp_file_path, "Input file or buffer must be supplied", 0);
     read_buffer = false;
   } else if (inp_file_path != nullptr) {
-    JNI_THROW_NEW(env,
-                  "java/lang/IllegalArgumentException",
-                  "Cannot pass in both a buffer and an inp_file_path",
-                  0);
+    JNI_THROW_NEW(
+      env, cudf::jni::ILLEGAL_ARG_CLASS, "Cannot pass in both a buffer and an inp_file_path", 0);
   } else if (buffer_length <= 0) {
-    JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "An empty buffer is not supported", 0);
+    JNI_THROW_NEW(env, cudf::jni::ILLEGAL_ARG_CLASS, "An empty buffer is not supported", 0);
   }
   try {
     cudf::jni::auto_set_device(env);
     cudf::jni::native_jstring filename(env, inp_file_path);
     if (!read_buffer && filename.is_empty()) {
-      JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "inp_file_path cannot be empty", 0);
+      JNI_THROW_NEW(env, cudf::jni::ILLEGAL_ARG_CLASS, "inp_file_path cannot be empty", 0);
     }
     cudf::jni::native_jstringArray n_filter_col_names(env, filter_col_names);
@@ -155,7 +153,7 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_readChunk(
                                                                                  jclass,
                                                                                  jlong handle)
 {
-  JNI_NULL_CHECK(env, handle, "handle is null", 0);
+  JNI_NULL_CHECK(env, handle, "handle is null", nullptr);
   try {
     cudf::jni::auto_set_device(env);
@@ -163,7 +161,7 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_readChunk(
     auto chunk = reader_ptr->read_chunk();
     return chunk.tbl ?
              cudf::jni::convert_table_for_return(env, chunk.tbl) : nullptr;
   }
-  CATCH_STD(env, 0);
+  CATCH_STD(env, nullptr);
 }
 JNIEXPORT void JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_close(JNIEnv* env,
@@ -179,4 +177,151 @@ JNIEXPORT void JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_close(JNIEnv* en
   CATCH_STD(env, );
 }
 
+//
+// Chunked ORC reader JNI
+//
+
+namespace {
+jlong create_chunked_orc_reader(JNIEnv* env,
+                                jlong chunk_read_limit,
+                                jlong pass_read_limit,
+                                std::optional<jlong> output_granularity,
+                                jobjectArray filter_col_names,
+                                jlong buffer,
+                                jlong buffer_length,
+                                jboolean using_numpy_Types,
+                                jint unit,
+                                jobjectArray dec128_col_names)
+{
+  JNI_NULL_CHECK(env, buffer, "buffer is null", 0);
+  if (buffer_length <= 0) {
+    JNI_THROW_NEW(env, cudf::jni::ILLEGAL_ARG_CLASS, "An empty buffer is not supported", 0);
+  }
+
+  try {
+    cudf::jni::auto_set_device(env);
+    cudf::jni::native_jstringArray n_filter_col_names(env, filter_col_names);
+    cudf::jni::native_jstringArray n_dec128_col_names(env, dec128_col_names);
+
+    auto const source = cudf::io::source_info(reinterpret_cast<char const*>(buffer),
+                                              static_cast<std::size_t>(buffer_length));
+    auto opts_builder = cudf::io::orc_reader_options::builder(source);
+    if (n_filter_col_names.size() > 0) {
+      opts_builder = opts_builder.columns(n_filter_col_names.as_cpp_vector());
+    }
+    auto const read_opts = opts_builder.use_index(false)
+                             .use_np_dtypes(static_cast<bool>(using_numpy_Types))
+                             .timestamp_type(cudf::data_type(static_cast<cudf::type_id>(unit)))
+                             .decimal128_columns(n_dec128_col_names.as_cpp_vector())
+                             .build();
+
+    if (output_granularity) {
+      return reinterpret_cast<jlong>(
+        new cudf::io::chunked_orc_reader(static_cast<std::size_t>(chunk_read_limit),
+                                         static_cast<std::size_t>(pass_read_limit),
+                                         static_cast<cudf::size_type>(output_granularity.value()),
+                                         read_opts));
+    }
+    return reinterpret_cast<jlong>(
+      new cudf::io::chunked_orc_reader(static_cast<std::size_t>(chunk_read_limit),
+                                       static_cast<std::size_t>(pass_read_limit),
+                                       read_opts));
+  }
+  CATCH_STD(env, 0);
+}
+}  // namespace
+
+// This function should take all the parameters that `Table.readORC` takes,
+// plus two more parameters: `chunk_read_limit` and `pass_read_limit`.
+JNIEXPORT jlong JNICALL
+Java_ai_rapids_cudf_ORCChunkedReader_createReader(JNIEnv* env,
+                                                  jclass,
+                                                  jlong chunk_read_limit,
+                                                  jlong pass_read_limit,
+                                                  jobjectArray filter_col_names,
+                                                  jlong buffer,
+                                                  jlong buffer_length,
+                                                  jboolean using_numpy_Types,
+                                                  jint unit,
+                                                  jobjectArray dec128_col_names)
+{
+  return create_chunked_orc_reader(env,
+                                   chunk_read_limit,
+                                   pass_read_limit,
+                                   std::nullopt,
+                                   filter_col_names,
+                                   buffer,
+                                   buffer_length,
+                                   using_numpy_Types,
+                                   unit,
+                                   dec128_col_names);
+}
+
+// This function should take all the parameters that `Table.readORC` takes,
+// plus three more parameters: `chunk_read_limit`, `pass_read_limit`, `output_granularity`.
+JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_ORCChunkedReader_createReaderWithOutputGranularity(
+  JNIEnv* env,
+  jclass,
+  jlong chunk_read_limit,
+  jlong pass_read_limit,
+  jlong output_granularity,
+  jobjectArray filter_col_names,
+  jlong buffer,
+  jlong buffer_length,
+  jboolean using_numpy_Types,
+  jint unit,
+  jobjectArray dec128_col_names)
+{
+  return create_chunked_orc_reader(env,
+                                   chunk_read_limit,
+                                   pass_read_limit,
+                                   output_granularity,
+                                   filter_col_names,
+                                   buffer,
+                                   buffer_length,
+                                   using_numpy_Types,
+                                   unit,
+                                   dec128_col_names);
+}
+
+JNIEXPORT jboolean JNICALL Java_ai_rapids_cudf_ORCChunkedReader_hasNext(JNIEnv* env,
+                                                                        jclass,
+                                                                        jlong handle)
+{
+  JNI_NULL_CHECK(env, handle, "handle is null", false);
+
+  try {
+    cudf::jni::auto_set_device(env);
+    auto const reader_ptr = reinterpret_cast<cudf::io::chunked_orc_reader*>(handle);
+    return reader_ptr->has_next();
+  }
+  CATCH_STD(env, false);
+}
+
+JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_ORCChunkedReader_readChunk(JNIEnv* env,
                                                                            jclass,
+                                                                            jlong handle)
+{
+  JNI_NULL_CHECK(env, handle, "handle is null", nullptr);
+
+  try {
+    cudf::jni::auto_set_device(env);
+    auto const reader_ptr = reinterpret_cast<cudf::io::chunked_orc_reader*>(handle);
+    auto chunk = reader_ptr->read_chunk();
+    return chunk.tbl ? cudf::jni::convert_table_for_return(env, chunk.tbl) : nullptr;
+  }
+  CATCH_STD(env, nullptr);
+}
+
+JNIEXPORT void JNICALL Java_ai_rapids_cudf_ORCChunkedReader_close(JNIEnv* env, jclass, jlong handle)
+{
+  JNI_NULL_CHECK(env, handle, "handle is null", );
+
+  try {
+    cudf::jni::auto_set_device(env);
+    delete reinterpret_cast<cudf::io::chunked_orc_reader*>(handle);
+  }
+  CATCH_STD(env, );
+}
+
 }  // extern "C"
diff --git a/java/src/test/java/ai/rapids/cudf/TableTest.java b/java/src/test/java/ai/rapids/cudf/TableTest.java
index 8560a9caad7..dc6eb55fc6a 100644
--- a/java/src/test/java/ai/rapids/cudf/TableTest.java
+++ b/java/src/test/java/ai/rapids/cudf/TableTest.java
@@ -81,6 +81,7 @@ public class TableTest extends CudfTestBase {
   private static final File TEST_PARQUET_FILE_CHUNKED_READ = TestUtils.getResourceAsFile("splittable.parquet");
   private static final File TEST_PARQUET_FILE_BINARY = TestUtils.getResourceAsFile("binary.parquet");
   private static final File TEST_ORC_FILE = TestUtils.getResourceAsFile("TestOrcFile.orc");
+  private static final File TEST_ORC_FILE_CHUNKED_READ = TestUtils.getResourceAsFile("splittable.orc");
   private static final File TEST_ORC_TIMESTAMP_DATE_FILE = TestUtils.getResourceAsFile("timestamp-date-test.orc");
   private static final File TEST_DECIMAL_PARQUET_FILE = TestUtils.getResourceAsFile("decimal.parquet");
   private static final File TEST_ALL_TYPES_PLAIN_AVRO_FILE = TestUtils.getResourceAsFile("alltypes_plain.avro");
@@ -1699,6 +1700,29 @@ void testReadORCTimeUnit() {
     }
   }
 
+  @Test
+  void testORCChunkedReader() throws IOException {
+    byte[] buffer = Files.readAllBytes(TEST_ORC_FILE_CHUNKED_READ.toPath());
+    long len = buffer.length;
+
+    try (HostMemoryBuffer hostBuf = hostMemoryAllocator.allocate(len)) {
+      hostBuf.setBytes(0, buffer, 0, len);
+      try (ORCChunkedReader reader = new ORCChunkedReader(0, 2 * 1024 * 1024, 10000,
+          ORCOptions.DEFAULT, hostBuf, 0, len)) {
+        int numChunks = 0;
+        long totalRows = 0;
+        while (reader.hasNext()) {
+          ++numChunks;
+          try (Table chunk = reader.readChunk()) {
+            totalRows += chunk.getRowCount();
+          }
+        }
+        assertEquals(10, numChunks);
+        assertEquals(1000000, totalRows);
+      }
+    }
+  }
+
   @Test
   void testCrossJoin() {
     try (Table leftTable = new Table.TestBuilder()
diff --git a/java/src/test/resources/splittable.orc b/java/src/test/resources/splittable.orc
new file mode 100644
index 00000000000..1f5e094534f
Binary files /dev/null and b/java/src/test/resources/splittable.orc differ
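
Usage note (not part of the diff): a minimal sketch of how the new ORCChunkedReader API is intended to be driven, mirroring the testORCChunkedReader case above. The file path and the limit/granularity values are illustrative assumptions, not values mandated by the API.

    // Load the whole ORC file into host memory, then pull it back out chunk by chunk.
    byte[] orcBytes = java.nio.file.Files.readAllBytes(java.nio.file.Paths.get("/path/to/data.orc")); // hypothetical path
    try (HostMemoryBuffer hostBuf = HostMemoryBuffer.allocate(orcBytes.length)) {
      hostBuf.setBytes(0, orcBytes, 0, orcBytes.length);
      // passReadLimit of 2 MiB and a 10k-row output granularity are example values;
      // a chunkReadLimit of 0 means there is no per-chunk byte limit.
      try (ORCChunkedReader reader = new ORCChunkedReader(0, 2 * 1024 * 1024, 10000,
          ORCOptions.DEFAULT, hostBuf, 0, orcBytes.length)) {
        while (reader.hasNext()) {
          try (Table chunk = reader.readChunk()) {
            // Consume the chunk here; each chunk is closed before the next one is read.
          }
        }
      }
    }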