Implement JNI for chunked Parquet reader #11961

Merged: 211 commits, Nov 18, 2022
Changes from 208 commits
Commits (211)
c2e8e87
Fix an issue where using num_rows and skip_rows on a parquet file con…
nvdbaranec Sep 23, 2022
f330431
Merge branch 'branch-22.12' into reader_preprocess_fix_and_opt
nvdbaranec Sep 23, 2022
eadfd63
Fixed an issue with the tests: input columns cannot have unsanitary …
nvdbaranec Sep 27, 2022
c4de038
Merge branch 'branch-22.12' into reader_preprocess_fix_and_opt
nvdbaranec Sep 27, 2022
222c9fe
Copy `parquet_reader_*` into `chunked_parquet_reader_*`
ttnghia Sep 30, 2022
f49cfed
Modify `chunked_parquet_reader_options`
ttnghia Sep 30, 2022
dd39804
Exploit inheritance to extend the options and options_builder classes
ttnghia Oct 4, 2022
81bc68f
Remove unnecessary variable
ttnghia Oct 5, 2022
f8126be
Misc
ttnghia Oct 5, 2022
0e7692c
Add docs
ttnghia Oct 5, 2022
9f9eeb0
PR feedback changes.
nvdbaranec Oct 5, 2022
9b3ea62
Merge branch 'branch-22.12' into reader_preprocess_fix_and_opt
nvdbaranec Oct 5, 2022
d2e409a
Fixed some compile errors from merging.
nvdbaranec Oct 5, 2022
ed41ac1
Add `chunked_parquet_reader`
ttnghia Oct 5, 2022
be782f2
Add empty implementation
ttnghia Oct 5, 2022
7908b66
Merge branch 'branch-22.12' into parquet_reader
ttnghia Oct 5, 2022
a7175c8
Add a destructor and `close`
ttnghia Oct 5, 2022
63a7bd6
Update docs
ttnghia Oct 6, 2022
16c12d9
Fix comment
ttnghia Oct 6, 2022
cd85385
Construct `chunked_parquet_reader`
ttnghia Oct 6, 2022
5944beb
Add comment
ttnghia Oct 6, 2022
7cfa72a
Rename function and implementing
ttnghia Oct 7, 2022
4696bd3
MISC
ttnghia Oct 7, 2022
99dc786
Bare bones implementation. Many types still not working.
nvdbaranec Oct 10, 2022
ad9c399
Merge branch 'branch-22.12' into parquet_reader
ttnghia Oct 11, 2022
ecf225d
Merge branch 'chunked_reader_gpu' into parquet_reader
ttnghia Oct 11, 2022
583a7ef
Add test
ttnghia Oct 11, 2022
b250d6f
Cleanup
ttnghia Oct 11, 2022
e7a9e3e
Modify docs
ttnghia Oct 11, 2022
811354a
Cleanup
ttnghia Oct 11, 2022
12ba72e
Add TODO
ttnghia Oct 11, 2022
45668ff
Add `read_intermediate_data`
ttnghia Oct 11, 2022
1bb8254
Use `read_intermediate_data`
ttnghia Oct 11, 2022
b1c44dd
Merge branch 'branch-22.12' into parquet_reader
ttnghia Oct 12, 2022
56715ef
Fix bug
ttnghia Oct 12, 2022
a7e7e93
Simplify code
ttnghia Oct 12, 2022
8fe87b1
Implement `file_intermediate_data`
ttnghia Oct 12, 2022
464f4f9
Add `make_output`
ttnghia Oct 12, 2022
56756d6
Implement `read_chunk`
ttnghia Oct 12, 2022
3044ac5
Cleanup
ttnghia Oct 12, 2022
ffb8a19
Fix bug when `skip_rows` and `num_rows` are modified inside a called …
ttnghia Oct 12, 2022
baf3603
Fix comment
ttnghia Oct 13, 2022
8bdab44
Store preprocess data
ttnghia Oct 13, 2022
ec4abfb
Implement `chunked_reader` detail class
ttnghia Oct 13, 2022
cb1dea4
Refactoring
ttnghia Oct 13, 2022
a8dfd82
Rename structs
ttnghia Oct 13, 2022
7889e5a
Increment `current_read_chunk`
ttnghia Oct 13, 2022
63a6511
Call preprocessing in `read_chunk`
ttnghia Oct 13, 2022
c1269d1
Fix `has_next`
ttnghia Oct 14, 2022
95e6c1d
Refactoring
ttnghia Oct 14, 2022
bd7b510
Fix errors
ttnghia Oct 14, 2022
eb78526
Merge branch 'branch-22.12' into chunked_reader_gpu. Also: work to…
nvdbaranec Oct 17, 2022
66aeaf4
Change param
ttnghia Oct 18, 2022
1d700e3
Rename variables
ttnghia Oct 18, 2022
4af948b
Remove intermediate variables
ttnghia Oct 18, 2022
28cfc6f
Modify tests
ttnghia Oct 18, 2022
8135ed5
First pass of string support.
nvdbaranec Oct 18, 2022
fbeabfc
Fix bug
ttnghia Oct 18, 2022
df074e0
Remove debug print
ttnghia Oct 18, 2022
5653090
Merge branch 'chunked_reader_gpu' into parquet_reader
ttnghia Oct 18, 2022
3071fd7
Merge branch 'branch-22.12' into parquet_reader
ttnghia Oct 18, 2022
631eff1
Fix tests
ttnghia Oct 18, 2022
d1b4e4c
Fix chunk size limit
ttnghia Oct 18, 2022
3f2f8a4
Turn back to do preprocess once
ttnghia Oct 18, 2022
974e7ef
The read limit parameter is now no longer const but truly runtime pa…
ttnghia Oct 19, 2022
0be096b
Add new test file
ttnghia Oct 19, 2022
f7018fe
Reverse `parquet_test.cpp`
ttnghia Oct 19, 2022
81097eb
Modify `read` to add exception and preprocess once
ttnghia Oct 19, 2022
fcffac8
Rewrite tests
ttnghia Oct 19, 2022
43dd802
Store `decomp_page_data`
ttnghia Oct 19, 2022
eeec023
Rewrite tests
ttnghia Oct 19, 2022
14dfd3f
Simple test
ttnghia Oct 19, 2022
66e9f09
Store `raw_page_data`
ttnghia Oct 19, 2022
669b8cf
Cleanup test
ttnghia Oct 19, 2022
001c6c7
Fix empty output
ttnghia Oct 19, 2022
f50603a
Add `preprocess_file_and_columns`
ttnghia Oct 19, 2022
66976aa
Misc
ttnghia Oct 19, 2022
0b0040a
Fixed some incorrect logic in preprocess step.
nvdbaranec Oct 19, 2022
467de78
Removed debug stuff. Added some comments.
nvdbaranec Oct 19, 2022
d68bf80
Merge branch 'chunked_reader_gpu' into parquet_reader
ttnghia Oct 20, 2022
1888aa3
Merge branch 'branch-22.12' into parquet_reader
ttnghia Oct 20, 2022
5861747
Change function
ttnghia Oct 20, 2022
721c052
Disable debug printing
ttnghia Oct 20, 2022
dbdfc74
Implement `ParquetChunkedReader`
ttnghia Oct 20, 2022
89e8ce7
Implement C++ JNI for `ParquetChunkedReader`
ttnghia Oct 20, 2022
50e1a81
Fix off-by-one memory access bug
ttnghia Oct 20, 2022
5adb11a
Cleanup
ttnghia Oct 20, 2022
7cda8c2
Fixed an issue with non-first reads in the chunked reader. Made an a…
nvdbaranec Oct 20, 2022
6bc073d
Merge branch 'chunked_reader_gpu' into parquet_reader
ttnghia Oct 20, 2022
2aef5cc
Fix off-by-one bug
ttnghia Oct 20, 2022
89ca1a6
Merge branch 'branch-22.12' into parquet_reader
ttnghia Oct 20, 2022
a3008c9
Merge branch 'parquet_reader' into jni_parquet_reader
ttnghia Oct 20, 2022
5245b9b
Fix an issue related to aliased output pointers in the chunked read c…
nvdbaranec Oct 20, 2022
89336c2
Merge branch 'parquet_reader' into jni_parquet_reader
ttnghia Oct 21, 2022
1be7f91
Do not keep reference---copy object instead
ttnghia Oct 21, 2022
62283c7
Do not keep reference---copy object instead
ttnghia Oct 21, 2022
44424a2
Optimization: don't do any decoding or page size computation for pag…
nvdbaranec Oct 24, 2022
95df356
Merge branch 'branch-22.12' into chunked_reader_gpu
nvdbaranec Oct 24, 2022
445db9b
Fix build issue for spark-rapids-jni
nvdbaranec Oct 24, 2022
b50b563
Cleanup: Remove `chunked_parquet_reader_options` and `chunked_parquet…
ttnghia Oct 24, 2022
a9f556a
Merge branch 'parquet_reader' into jni_parquet_reader
ttnghia Oct 24, 2022
db56908
Cleanup: Remove `chunked_parquet_reader_options` and `chunked_parquet…
ttnghia Oct 24, 2022
ebbab7f
Merge branch 'parquet_reader' into jni_parquet_reader
ttnghia Oct 24, 2022
daa3d7e
Fix compile errors
ttnghia Oct 24, 2022
bb2fbc0
Load native deps and make the constructors public
ttnghia Oct 24, 2022
ef5eaee
Move `preprocess_file` into `reader_preprocess.cu`
ttnghia Oct 24, 2022
d19260d
Move common implementation into `reader_impl_helpers.*`
ttnghia Oct 24, 2022
6569d62
Cleanup
ttnghia Oct 24, 2022
0a1e2c3
Merge branch 'branch-22.12' into parquet_reader
ttnghia Oct 25, 2022
7176381
Merge branch 'parquet_reader' into jni_parquet_reader
ttnghia Oct 25, 2022
c3e1d53
Cleanup
ttnghia Oct 25, 2022
04e1320
More cleanup
ttnghia Oct 25, 2022
395413d
Rewrite docs for `parquet.hpp` files
ttnghia Oct 25, 2022
e3e19e8
Extract functions for `reader` and `chunked_reader`
ttnghia Oct 25, 2022
52339da
Fix issues with string length computation.
nvdbaranec Oct 25, 2022
f7e8694
Merge branch 'nghia_parquet_reader' into chunked_reader_gpu
nvdbaranec Oct 25, 2022
83fa31a
Remove redundant changes
ttnghia Oct 25, 2022
02ccdec
Add simple structs test
ttnghia Oct 25, 2022
ee0ffad
Rewrite tests
ttnghia Oct 25, 2022
09a89e4
Merge branch 'chunked_reader_gpu' into parquet_reader
ttnghia Oct 25, 2022
034a5b7
Merge branch 'branch-22.12' into parquet_reader
ttnghia Oct 25, 2022
3d362bc
Merge branch 'parquet_reader' into jni_parquet_reader
ttnghia Oct 25, 2022
c149d64
Add lists test
ttnghia Oct 25, 2022
9335bb7
MISC
ttnghia Oct 25, 2022
3769fff
Cleanup comments
ttnghia Oct 25, 2022
dc9ef5c
Construct output table metadata just once
ttnghia Oct 25, 2022
0366b7a
Construct `_output_columns` just once
ttnghia Oct 26, 2022
ea2fe9c
Remove `options` member variable
ttnghia Oct 26, 2022
1804056
Make the chunked_read_limit a soft limit - if we can't find a split, …
nvdbaranec Oct 26, 2022
5671c36
Merge branch 'chunked_reader_gpu' into parquet_reader
ttnghia Oct 26, 2022
957e32d
Merge branch 'parquet_reader' into jni_parquet_reader
ttnghia Oct 26, 2022
3d6e13b
Add tests for structs of lists and lists of structs
ttnghia Oct 26, 2022
826c46f
Fixed an issue in split generation code causing indexing off the end …
nvdbaranec Oct 26, 2022
36c1972
Merge branch 'chunked_reader_gpu' into parquet_reader
ttnghia Oct 26, 2022
d32c602
Merge branch 'nghia_parquet_reader' into chunked_reader_gpu
nvdbaranec Oct 26, 2022
88ca034
Just reformat
ttnghia Oct 26, 2022
b046c07
Merge branch 'parquet_reader' into jni_parquet_reader
ttnghia Oct 26, 2022
d76ee06
Change variable names in tests
ttnghia Oct 26, 2022
cf7b786
Merge branch 'nghia_parquet_reader' into chunked_reader_gpu
nvdbaranec Oct 27, 2022
b806323
Optimization: store off global nesting sizes per page so that during…
nvdbaranec Oct 27, 2022
0c2178d
Adding doxygen, refactoring and cleaning up
ttnghia Oct 27, 2022
f759103
Merge branch 'branch-22.12' into parquet_reader
ttnghia Oct 27, 2022
c2bf7f5
Fixed issues with list, and validity size calculations.
nvdbaranec Oct 27, 2022
e7e74c5
More refactoring
ttnghia Oct 27, 2022
ee9edbc
Merge branch 'chunked_reader_gpu' into parquet_reader
ttnghia Oct 27, 2022
6fd3e90
Add more tests
ttnghia Oct 27, 2022
337982a
Merge branch 'parquet_reader' into jni_parquet_reader
ttnghia Oct 28, 2022
9dda980
Add test with empty data
ttnghia Oct 28, 2022
bb35f9f
Add tests
ttnghia Oct 28, 2022
59166cf
Rewrite null tests
ttnghia Oct 28, 2022
eb6e996
Add more extreme tests
ttnghia Oct 28, 2022
1c56794
Rewrite tests to generate input files just once
ttnghia Oct 28, 2022
4777419
Fix tests with structs of lists
ttnghia Oct 28, 2022
aedc37a
Handle nulls for more complex types
ttnghia Oct 28, 2022
c00eb3c
Fix another nulls handling bug for strings
ttnghia Oct 28, 2022
4d24f88
Simplify the null purging process
ttnghia Oct 28, 2022
0f0f5c0
Merge branch 'parquet_reader' into jni_parquet_reader
ttnghia Oct 29, 2022
b15bb39
Cleanup.
nvdbaranec Oct 31, 2022
7e38a56
Merge branch 'nghia_parquet_reader' into chunked_reader_gpu
nvdbaranec Oct 31, 2022
321815d
Fleshed out list-of-structs and struct-of-lists tests.
nvdbaranec Oct 31, 2022
8cf95e8
Docs and cleanup.
nvdbaranec Oct 31, 2022
5bb755e
Update Javadoc
ttnghia Oct 31, 2022
af35c4d
Update doxygen
ttnghia Oct 31, 2022
fb1bd73
Cleaning up
ttnghia Nov 2, 2022
842f9ea
Add doxygen
ttnghia Nov 2, 2022
34e3777
Clean up `reader_impl.hpp`
ttnghia Nov 2, 2022
40e463c
More cleanup
ttnghia Nov 2, 2022
7252b0d
Cleanup `allocate_nesting_info`
ttnghia Nov 2, 2022
696182c
Reformat
ttnghia Nov 2, 2022
4c353fd
Further cleanup
ttnghia Nov 2, 2022
31590cb
Rename `compute_chunk_read_info` into `preprocess_pages`
ttnghia Nov 3, 2022
8671ed6
Make `hasNext` return `true` at least once
ttnghia Nov 4, 2022
8ba10b8
Add doxygen for `handle` variable
ttnghia Nov 4, 2022
d91c690
Re-adding an optimization that somehow got nuked during a merge.
nvdbaranec Nov 10, 2022
c5e73ce
Optimization: store off global nesting sizes per page so that during…
ttnghia Nov 10, 2022
0b31b95
Merge branch 'branch-22.12' into parquet_reader
ttnghia Nov 10, 2022
c8e34ba
Merge branch 'chunked_reader_gpu' into parquet_reader
ttnghia Nov 10, 2022
450e6d5
Merge branch 'branch-22.12' into parquet_reader
ttnghia Nov 14, 2022
4fca0c0
Fix several warnings that show up in the spark-rapids-jni build.
nvdbaranec Nov 14, 2022
fd9280c
Merge branch 'chunked_reader_gpu' into parquet_reader
ttnghia Nov 14, 2022
cedcc07
Several changes from PR review.
nvdbaranec Nov 15, 2022
16f78b6
Merge branch 'chunked_reader_gpu' into parquet_reader
ttnghia Nov 15, 2022
73503bb
Fix typo
ttnghia Nov 15, 2022
e9905b8
Fix test
ttnghia Nov 15, 2022
4072a80
Address review comments
ttnghia Nov 15, 2022
95a97fb
Small optimization
ttnghia Nov 15, 2022
8390d4b
Implement `column_buffer::empty_like`
ttnghia Nov 15, 2022
b4b131f
Merge branch 'branch-22.12' into parquet_reader
ttnghia Nov 15, 2022
5035bf4
Optimize unit tests: Only call `cudf::concatenate` once
ttnghia Nov 15, 2022
912d86c
Remove redundant check
ttnghia Nov 15, 2022
bb2e26e
Fix `cudaLaunchKernel` error in `DecodePageData`
ttnghia Nov 15, 2022
36c7ec2
Add assertion to make sure not to decode/parse empty page array
ttnghia Nov 15, 2022
7203c67
Address some review comments
ttnghia Nov 16, 2022
520448b
Merge branch 'branch-22.12' into parquet_reader
ttnghia Nov 16, 2022
96eed8e
Fix `#endif`
ttnghia Nov 16, 2022
6697e3b
Merge branch 'branch-22.12' into parquet_reader
ttnghia Nov 16, 2022
70f4fde
PR review changes. Updated some incorrect/incomplete function docs.
nvdbaranec Nov 17, 2022
4547483
Made the logic in the row_total_size functor much more readable.
nvdbaranec Nov 17, 2022
fe7d6d1
Merge branch 'branch-22.12' into parquet_reader
ttnghia Nov 17, 2022
db21bc3
Fix the tests
ttnghia Nov 17, 2022
36043d8
Variable renaming for clarity.
nvdbaranec Nov 17, 2022
83f0703
Merge branch 'chunked_reader_gpu' into parquet_reader
ttnghia Nov 17, 2022
3499bda
Merge branch 'branch-22.12' into parquet_reader
ttnghia Nov 17, 2022
198d65c
Merge branch 'parquet_reader' into jni_parquet_reader
ttnghia Nov 17, 2022
6c6e769
Merge branch 'branch-22.12' into jni_parquet_reader
ttnghia Nov 18, 2022
52f3b4e
Check for null handle
ttnghia Nov 18, 2022
a3f467b
Add comments for unused variable
ttnghia Nov 18, 2022
8e84cfd
Fix comment
ttnghia Nov 18, 2022
a48c9a3
Change header include style
ttnghia Nov 18, 2022
3bbf5d0
Update java/src/main/java/ai/rapids/cudf/ParquetChunkedReader.java
ttnghia Nov 18, 2022
6591704
Check for null handle
ttnghia Nov 18, 2022
151 changes: 151 additions & 0 deletions java/src/main/java/ai/rapids/cudf/ParquetChunkedReader.java
@@ -0,0 +1,151 @@
/*
*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package ai.rapids.cudf;

import java.io.File;

/**
* Provide an interface for reading a Parquet file in an iterative manner.
*/
public class ParquetChunkedReader implements AutoCloseable {
static {
NativeDepsLoader.loadNativeDeps();
}

/**
* Construct the reader instance from a read limit and a file path.
*
* @param chunkSizeByteLimit Limit on total number of bytes to be returned per read,
* or 0 if there is no limit.
* @param filePath Full path of the input Parquet file to read.
*/
public ParquetChunkedReader(long chunkSizeByteLimit, File filePath) {
this(chunkSizeByteLimit, ParquetOptions.DEFAULT, filePath);
}

/**
* Construct the reader instance from a read limit, a ParquetOptions object, and a file path.
*
* @param chunkSizeByteLimit Limit on total number of bytes to be returned per read,
* or 0 if there is no limit.
* @param opts The options for Parquet reading.
* @param filePath Full path of the input Parquet file to read.
*/
public ParquetChunkedReader(long chunkSizeByteLimit, ParquetOptions opts, File filePath) {
handle = create(chunkSizeByteLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(),
filePath.getAbsolutePath(), 0, 0, opts.timeUnit().typeId.getNativeId());

if(handle == 0) {
throw new IllegalStateException("Cannot create native chunked Parquet reader object.");
}
}

/**
* Construct the reader instance from a read limit and a file already read in a memory buffer.
*
* @param chunkSizeByteLimit Limit on total number of bytes to be returned per read,
* or 0 if there is no limit.
* @param opts The options for Parquet reading.
* @param buffer Raw Parquet file content.
* @param offset The starting offset into buffer.
* @param len The number of bytes to parse from the given buffer.
*/
public ParquetChunkedReader(long chunkSizeByteLimit, ParquetOptions opts, HostMemoryBuffer buffer,
long offset, long len) {
handle = create(chunkSizeByteLimit, opts.getIncludeColumnNames(), opts.getReadBinaryAsString(), null,
buffer.getAddress() + offset, len, opts.timeUnit().typeId.getNativeId());

if(handle == 0) {
throw new IllegalStateException("Cannot create native chunked Parquet reader object.");
}
}

/**
* Check if the given file has anything left to read.
*
* @return A boolean value indicating if there is more data to read from the file.
*/
public boolean hasNext() {
if (firstCall) {
// This function needs to return true at least once, so that an empty table
// (with empty columns rather than no columns) can be returned by readChunk()
// if the input file has no rows.
firstCall = false;
return true;
}
return hasNext(handle);
}

/**
* Read a chunk of rows from the given Parquet file such that the total size of the returned data
* does not exceed the given read limit. If the given file has no data, or all of its data has
* already been read by previous calls to this function, a null Table will be returned.
*
* @return A table of new rows read from the given file.
*/
public Table readChunk() {
long[] columnPtrs = readChunk(handle);
if (columnPtrs == null) {
return null;
} else {
return new Table(columnPtrs);
}
}

@Override
public void close() {
if (handle != 0) {
close(handle);
handle = 0;
}
}


/**
* Auxiliary variable to help {@link #hasNext()} return true at least once.
*/
private boolean firstCall = true;

/**
* Handle for memory address of the native Parquet chunked reader class.
*/
private long handle;


/**
* Create a native chunked Parquet reader object on heap and return its memory address.
*
* @param chunkSizeByteLimit Limit on total number of bytes to be returned per read,
* or 0 if there is no limit.
* @param filterColumnNames Names of the columns to read, or an empty array to read all columns.
* @param binaryToString Whether to convert the corresponding column to String if it is binary.
* @param filePath Full path of the file to read, or null if reading from a buffer.
* @param bufferAddrs The address of the buffer to read from, or 0 if not reading from a buffer.
* @param length The length of the buffer to read from.
* @param timeUnit Return type of time unit for timestamps.
*/
private static native long create(long chunkSizeByteLimit, String[] filterColumnNames,
boolean[] binaryToString, String filePath, long bufferAddrs, long length, int timeUnit);

private static native boolean hasNext(long handle);

private static native long[] readChunk(long handle);

private static native void close(long handle);
}
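
Note: the buffer-based constructor above is not covered by the new unit test in TableTest.java below. A minimal usage sketch follows; the file path and chunk limit are illustrative, and the HostMemoryBuffer.allocate/setBytes calls are assumed from the existing cudf Java API rather than anything added in this PR.

import ai.rapids.cudf.HostMemoryBuffer;
import ai.rapids.cudf.ParquetChunkedReader;
import ai.rapids.cudf.ParquetOptions;
import ai.rapids.cudf.Table;
import java.io.File;
import java.nio.file.Files;

public class ChunkedReadFromBufferExample {
  public static void main(String[] args) throws Exception {
    // Read the whole Parquet file into host memory first (the path is hypothetical).
    byte[] fileBytes = Files.readAllBytes(new File("/tmp/example.parquet").toPath());
    try (HostMemoryBuffer buffer = HostMemoryBuffer.allocate(fileBytes.length)) {
      buffer.setBytes(0, fileBytes, 0, fileBytes.length);
      // A 240000-byte chunk limit mirrors the value used by testChunkedReadParquet below.
      try (ParquetChunkedReader reader = new ParquetChunkedReader(240000,
          ParquetOptions.DEFAULT, buffer, 0, fileBytes.length)) {
        long totalRows = 0;
        while (reader.hasNext()) {
          // Guard against a null chunk, which readChunk() can return when there is no data.
          try (Table chunk = reader.readChunk()) {
            if (chunk != null) {
              totalRows += chunk.getRowCount();
            }
          }
        }
        System.out.println("Total rows read: " + totalRows);
      }
    }
  }
}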
1 change: 1 addition & 0 deletions java/src/main/native/CMakeLists.txt
@@ -130,6 +130,7 @@ add_library(
cudfjni
src/Aggregation128UtilsJni.cpp
src/AggregationJni.cpp
src/ChunkedReaderJni.cpp
src/CudfJni.cpp
src/CudaJni.cpp
src/ColumnVectorJni.cpp
125 changes: 125 additions & 0 deletions java/src/main/native/src/ChunkedReaderJni.cpp
@@ -0,0 +1,125 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <memory>
#include <vector>

#include <cudf/column/column.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/table/table.hpp>

#include "../include/jni_utils.hpp"

#include "cudf_jni_apis.hpp"

// This function is defined in `TableJni.cpp`.
jlongArray
cudf::jni::convert_table_for_return(JNIEnv *env, std::unique_ptr<cudf::table> &&table_result,
std::vector<std::unique_ptr<cudf::column>> &&extra_columns);

// This file is for the code related to the chunked reader (Parquet, ORC, etc.).

extern "C" {

// This function should take all the parameters that `Table.readParquet` takes,
// plus one more parameter `long chunkSizeByteLimit`.
JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_create(
JNIEnv *env, jclass, jlong chunk_read_limit, jobjectArray filter_col_names,
jbooleanArray j_col_binary_read, jstring inp_file_path, jlong buffer, jlong buffer_length,
jint unit) {
JNI_NULL_CHECK(env, j_col_binary_read, "Null col_binary_read", 0);
bool read_buffer = true;
if (buffer == 0) {
JNI_NULL_CHECK(env, inp_file_path, "Input file or buffer must be supplied", 0);
read_buffer = false;
} else if (inp_file_path != nullptr) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException",
"Cannot pass in both a buffer and an inp_file_path", 0);
} else if (buffer_length <= 0) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "An empty buffer is not supported", 0);
}

try {
cudf::jni::auto_set_device(env);
cudf::jni::native_jstring filename(env, inp_file_path);
if (!read_buffer && filename.is_empty()) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "inp_file_path cannot be empty", 0);
}

cudf::jni::native_jstringArray n_filter_col_names(env, filter_col_names);

// TODO: This variable is unused now, but we still don't know what to do with it yet.
// As such, it needs to stay here for a little more time before we decide to use it again,
// or remove it completely.
cudf::jni::native_jbooleanArray n_col_binary_read(env, j_col_binary_read);
(void)n_col_binary_read;

auto const source = read_buffer ?
cudf::io::source_info(reinterpret_cast<char *>(buffer),
static_cast<std::size_t>(buffer_length)) :
cudf::io::source_info(filename.get());

auto opts_builder = cudf::io::parquet_reader_options::builder(source);
if (n_filter_col_names.size() > 0) {
opts_builder = opts_builder.columns(n_filter_col_names.as_cpp_vector());
}
auto const read_opts = opts_builder.convert_strings_to_categories(false)
.timestamp_type(cudf::data_type(static_cast<cudf::type_id>(unit)))
.build();

return reinterpret_cast<jlong>(new cudf::io::chunked_parquet_reader(
static_cast<std::size_t>(chunk_read_limit), read_opts));
}
CATCH_STD(env, 0);
}

JNIEXPORT jboolean JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_hasNext(JNIEnv *env, jclass,
jlong handle) {
JNI_NULL_CHECK(env, handle, "handle is null", false);

try {
cudf::jni::auto_set_device(env);
auto const reader_ptr = reinterpret_cast<cudf::io::chunked_parquet_reader *const>(handle);
return reader_ptr->has_next();
}
CATCH_STD(env, false);
}

JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_readChunk(JNIEnv *env, jclass,
jlong handle) {
JNI_NULL_CHECK(env, handle, "handle is null", 0);

try {
cudf::jni::auto_set_device(env);
auto const reader_ptr = reinterpret_cast<cudf::io::chunked_parquet_reader *const>(handle);
auto chunk = reader_ptr->read_chunk();
return chunk.tbl ? cudf::jni::convert_table_for_return(env, chunk.tbl) : nullptr;
}
CATCH_STD(env, 0);
}

JNIEXPORT void JNICALL Java_ai_rapids_cudf_ParquetChunkedReader_close(JNIEnv *env, jclass,
jlong handle) {
JNI_NULL_CHECK(env, handle, "handle is null", );

try {
cudf::jni::auto_set_device(env);
delete reinterpret_cast<cudf::io::chunked_parquet_reader *>(handle);
}
CATCH_STD(env, );
}

} // extern "C"
18 changes: 18 additions & 0 deletions java/src/test/java/ai/rapids/cudf/TableTest.java
@@ -75,6 +75,7 @@

public class TableTest extends CudfTestBase {
private static final File TEST_PARQUET_FILE = TestUtils.getResourceAsFile("acq.parquet");
private static final File TEST_PARQUET_FILE_CHUNKED_READ = TestUtils.getResourceAsFile("splittable.parquet");
private static final File TEST_PARQUET_FILE_BINARY = TestUtils.getResourceAsFile("binary.parquet");
private static final File TEST_ORC_FILE = TestUtils.getResourceAsFile("TestOrcFile.orc");
private static final File TEST_ORC_TIMESTAMP_DATE_FILE = TestUtils.getResourceAsFile("timestamp-date-test.orc");
@@ -725,6 +726,23 @@ void testReadParquetContainsDecimalData() {
}
}

@Test
void testChunkedReadParquet() {
try (ParquetChunkedReader reader = new ParquetChunkedReader(240000,
TEST_PARQUET_FILE_CHUNKED_READ)) {
int numChunks = 0;
long totalRows = 0;
while(reader.hasNext()) {
++numChunks;
try(Table chunk = reader.readChunk()) {
totalRows += chunk.getRowCount();
}
}
assertEquals(2, numChunks);
assertEquals(40000, totalRows);
}
}

@Test
void testReadAvro() {
AvroOptions opts = AvroOptions.builder()
Binary file added java/src/test/resources/splittable.parquet