[gpuCI] Forward-merge branch-22.12 to branch-23.02 [skip gpuci] #12204

Merged — 1 commit merged on Nov 18, 2022
155 changes: 155 additions & 0 deletions java/src/main/java/ai/rapids/cudf/ParquetChunkedReader.java
@@ -0,0 +1,155 @@
/*
*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package ai.rapids.cudf;

import java.io.File;

/**
* Provide an interface for reading a Parquet file in an iterative manner.
*/
public class ParquetChunkedReader implements AutoCloseable {
static {
NativeDepsLoader.loadNativeDeps();
}

/**
* Construct the reader instance from a read limit and a file path.
*
* @param chunkSizeByteLimit Limit on total number of bytes to be returned per read,
* or 0 if there is no limit.
* @param filePath Full path of the input Parquet file to read.
*/
public ParquetChunkedReader(long chunkSizeByteLimit, File filePath) {
this(chunkSizeByteLimit, ParquetOptions.DEFAULT, filePath);
}

/**
* Construct the reader instance from a read limit, a ParquetOptions object, and a file path.
*
* @param chunkSizeByteLimit Limit on total number of bytes to be returned per read,
* or 0 if there is no limit.
* @param opts The options for Parquet reading.
* @param filePath Full path of the input Parquet file to read.
*/
public ParquetChunkedReader(long chunkSizeByteLimit, ParquetOptions opts, File filePath) {
handle = create(chunkSizeByteLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(),
filePath.getAbsolutePath(), 0, 0, opts.timeUnit().typeId.getNativeId());

if(handle == 0) {
throw new IllegalStateException("Cannot create native chunked Parquet reader object.");
}
}

/**
* Construct the reader instance from a read limit and a file already loaded into a memory buffer.
*
* @param chunkSizeByteLimit Limit on total number of bytes to be returned per read,
* or 0 if there is no limit.
* @param opts The options for Parquet reading.
* @param buffer Raw Parquet file content.
* @param offset The starting offset into buffer.
* @param len The number of bytes to parse from the given buffer.
*/
public ParquetChunkedReader(long chunkSizeByteLimit, ParquetOptions opts, HostMemoryBuffer buffer,
long offset, long len) {
handle = create(chunkSizeByteLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(), null,
buffer.getAddress() + offset, len, opts.timeUnit().typeId.getNativeId());

if(handle == 0) {
throw new IllegalStateException("Cannot create native chunked Parquet reader object.");
}
}

/**
* Check if the given file has anything left to read.
*
* @return A boolean value indicating whether there is more data to read from the file.
*/
public boolean hasNext() {
if(handle == 0) {
throw new IllegalStateException("Native chunked Parquet reader object may have been closed.");
}

if (firstCall) {
// This function needs to return true at least once so that an empty table
// (with empty columns rather than no columns) can be returned by readChunk()
// if the input file has no rows.
firstCall = false;
return true;
}
return hasNext(handle);
}

/**
* Read a chunk of rows from the given Parquet file such that the total size of the returned data
* does not exceed the given read limit. If the file has no data, or all of its data has already
* been read by previous calls to this function, a null Table is returned.
*
* @return A table of new rows read from the given file.
*/
public Table readChunk() {
if(handle == 0) {
throw new IllegalStateException("Native chunked Parquet reader object may have been closed.");
}

long[] columnPtrs = readChunk(handle);
return columnPtrs != null ? new Table(columnPtrs) : null;
}

@Override
public void close() {
if (handle != 0) {
close(handle);
handle = 0;
}
}


/**
* Auxiliary variable to help {@link #hasNext()} return true at least once.
*/
private boolean firstCall = true;

/**
* Handle for memory address of the native Parquet chunked reader class.
*/
private long handle;


/**
* Create a native chunked Parquet reader object on heap and return its memory address.
*
* @param chunkSizeByteLimit Limit on total number of bytes to be returned per read,
* or 0 if there is no limit.
* @param filterColumnNames Names of the columns to read, or an empty array to read all columns.
* @param binaryToString Whether to convert the corresponding column to String if it is binary.
* @param filePath Full path of the file to read, or null if reading from a buffer.
* @param bufferAddrs The address of the buffer to read from, or 0 if not reading from a buffer.
* @param length The length of the buffer to read from.
* @param timeUnit Return type of time unit for timestamps.
*/
private static native long create(long chunkSizeByteLimit, String[] filterColumnNames,
boolean[] binaryToString, String filePath, long bufferAddrs, long length, int timeUnit);

private static native boolean hasNext(long handle);

private static native long[] readChunk(long handle);

private static native void close(long handle);
}
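A minimal usage sketch of the new reader (not part of this diff; the file name "example.parquet" and the 1 MB read limit are illustrative assumptions):

import ai.rapids.cudf.ParquetChunkedReader;
import ai.rapids.cudf.Table;
import java.io.File;

public class ChunkedReadExample {
  public static void main(String[] args) {
    // Read the file in chunks of at most ~1 MB of output data each.
    try (ParquetChunkedReader reader =
             new ParquetChunkedReader(1024 * 1024, new File("example.parquet"))) {
      while (reader.hasNext()) {
        try (Table chunk = reader.readChunk()) {
          if (chunk != null) {
            System.out.println("Read a chunk with " + chunk.getRowCount() + " rows");
          }
        }
      }
    }
  }
}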
1 change: 1 addition & 0 deletions java/src/main/native/CMakeLists.txt
@@ -130,6 +130,7 @@ add_library(
cudfjni
src/Aggregation128UtilsJni.cpp
src/AggregationJni.cpp
src/ChunkedReaderJni.cpp
src/CudfJni.cpp
src/CudaJni.cpp
src/ColumnVectorJni.cpp
124 changes: 124 additions & 0 deletions java/src/main/native/src/ChunkedReaderJni.cpp
@@ -0,0 +1,124 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <memory>
#include <vector>

#include <cudf/column/column.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/table/table.hpp>

#include "cudf_jni_apis.hpp"
#include "jni_utils.hpp"

// This function is defined in `TableJni.cpp`.
jlongArray
cudf::jni::convert_table_for_return(JNIEnv *env, std::unique_ptr<cudf::table> &&table_result,
std::vector<std::unique_ptr<cudf::column>> &&extra_columns);

// This file contains the code related to the chunked readers (Parquet, ORC, etc.).

extern "C" {

// This function should take all the parameters that `Table.readParquet` takes,
// plus one more parameter `long chunkSizeByteLimit`.
JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_create(
JNIEnv *env, jclass, jlong chunk_read_limit, jobjectArray filter_col_names,
jbooleanArray j_col_binary_read, jstring inp_file_path, jlong buffer, jlong buffer_length,
jint unit) {
JNI_NULL_CHECK(env, j_col_binary_read, "Null col_binary_read", 0);
bool read_buffer = true;
if (buffer == 0) {
JNI_NULL_CHECK(env, inp_file_path, "Input file or buffer must be supplied", 0);
read_buffer = false;
} else if (inp_file_path != nullptr) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException",
"Cannot pass in both a buffer and an inp_file_path", 0);
} else if (buffer_length <= 0) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "An empty buffer is not supported", 0);
}

try {
cudf::jni::auto_set_device(env);
cudf::jni::native_jstring filename(env, inp_file_path);
if (!read_buffer && filename.is_empty()) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "inp_file_path cannot be empty", 0);
}

cudf::jni::native_jstringArray n_filter_col_names(env, filter_col_names);

// TODO: This variable is currently unused, but we have not yet decided what to do with it.
// Keep it here for now, until we decide whether to use it again or remove it completely.
cudf::jni::native_jbooleanArray n_col_binary_read(env, j_col_binary_read);
(void)n_col_binary_read;

auto const source = read_buffer ?
cudf::io::source_info(reinterpret_cast<char *>(buffer),
static_cast<std::size_t>(buffer_length)) :
cudf::io::source_info(filename.get());

auto opts_builder = cudf::io::parquet_reader_options::builder(source);
if (n_filter_col_names.size() > 0) {
opts_builder = opts_builder.columns(n_filter_col_names.as_cpp_vector());
}
auto const read_opts = opts_builder.convert_strings_to_categories(false)
.timestamp_type(cudf::data_type(static_cast<cudf::type_id>(unit)))
.build();

return reinterpret_cast<jlong>(new cudf::io::chunked_parquet_reader(
static_cast<std::size_t>(chunk_read_limit), read_opts));
}
CATCH_STD(env, 0);
}

JNIEXPORT jboolean JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_hasNext(JNIEnv *env, jclass,
jlong handle) {
JNI_NULL_CHECK(env, handle, "handle is null", false);

try {
cudf::jni::auto_set_device(env);
auto const reader_ptr = reinterpret_cast<cudf::io::chunked_parquet_reader *const>(handle);
return reader_ptr->has_next();
}
CATCH_STD(env, false);
}

JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_readChunk(JNIEnv *env, jclass,
jlong handle) {
JNI_NULL_CHECK(env, handle, "handle is null", 0);

try {
cudf::jni::auto_set_device(env);
auto const reader_ptr = reinterpret_cast<cudf::io::chunked_parquet_reader *const>(handle);
auto chunk = reader_ptr->read_chunk();
return chunk.tbl ? cudf::jni::convert_table_for_return(env, chunk.tbl) : nullptr;
}
CATCH_STD(env, 0);
}

JNIEXPORT void JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_close(JNIEnv *env, jclass,
jlong handle) {
JNI_NULL_CHECK(env, handle, "handle is null", );

try {
cudf::jni::auto_set_device(env);
delete reinterpret_cast<cudf::io::chunked_parquet_reader *>(handle);
}
CATCH_STD(env, );
}

} // extern "C"
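A minimal sketch of how the buffer-based path above might be exercised from Java (not part of this diff; the input path is an illustrative assumption, and HostMemoryBuffer.allocate/setBytes are taken from the existing cudf Java API):

import ai.rapids.cudf.HostMemoryBuffer;
import ai.rapids.cudf.ParquetChunkedReader;
import ai.rapids.cudf.ParquetOptions;
import ai.rapids.cudf.Table;
import java.nio.file.Files;
import java.nio.file.Paths;

public class ChunkedReadFromBufferExample {
  public static void main(String[] args) throws Exception {
    // Load the whole Parquet file into a host-side buffer first.
    byte[] raw = Files.readAllBytes(Paths.get("example.parquet"));
    try (HostMemoryBuffer buffer = HostMemoryBuffer.allocate(raw.length)) {
      buffer.setBytes(0, raw, 0, raw.length);
      // Read from the buffer in chunks of at most ~1 MB of output data each.
      try (ParquetChunkedReader reader = new ParquetChunkedReader(
               1024 * 1024, ParquetOptions.DEFAULT, buffer, 0, raw.length)) {
        while (reader.hasNext()) {
          try (Table chunk = reader.readChunk()) {
            if (chunk != null) {
              System.out.println("Read a chunk with " + chunk.getRowCount() + " rows");
            }
          }
        }
      }
    }
  }
}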
18 changes: 18 additions & 0 deletions java/src/test/java/ai/rapids/cudf/TableTest.java
@@ -75,6 +75,7 @@

public class TableTest extends CudfTestBase {
private static final File TEST_PARQUET_FILE = TestUtils.getResourceAsFile("acq.parquet");
private static final File TEST_PARQUET_FILE_CHUNKED_READ = TestUtils.getResourceAsFile("splittable.parquet");
private static final File TEST_PARQUET_FILE_BINARY = TestUtils.getResourceAsFile("binary.parquet");
private static final File TEST_ORC_FILE = TestUtils.getResourceAsFile("TestOrcFile.orc");
private static final File TEST_ORC_TIMESTAMP_DATE_FILE = TestUtils.getResourceAsFile("timestamp-date-test.orc");
@@ -725,6 +726,23 @@ void testReadParquetContainsDecimalData() {
}
}

@Test
void testChunkedReadParquet() {
try (ParquetChunkedReader reader = new ParquetChunkedReader(240000,
TEST_PARQUET_FILE_CHUNKED_READ)) {
int numChunks = 0;
long totalRows = 0;
while(reader.hasNext()) {
++numChunks;
try(Table chunk = reader.readChunk()) {
totalRows += chunk.getRowCount();
}
}
assertEquals(2, numChunks);
assertEquals(40000, totalRows);
}
}

@Test
void testReadAvro() {
AvroOptions opts = AvroOptions.builder()
Binary file added java/src/test/resources/splittable.parquet
Binary file not shown.