Support Java bindings for Avro reader #10373

Merged 10 commits on Mar 8, 2022
41 changes: 41 additions & 0 deletions java/src/main/java/ai/rapids/cudf/AvroOptions.java
@@ -0,0 +1,41 @@
/*
*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package ai.rapids.cudf;

/**
* Options for reading an Avro file
*/
public class AvroOptions extends ColumnFilterOptions {

public static AvroOptions DEFAULT = new AvroOptions(new Builder());

private AvroOptions(Builder builder) {
super(builder);
}

public static Builder builder() {
return new Builder();
}

public static class Builder extends ColumnFilterOptions.Builder<Builder> {
public AvroOptions build() {
return new AvroOptions(this);
}
}
}
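For context, a minimal usage sketch of the new options class (mirroring the builder calls exercised in TableTest later in this diff; includeColumn is inherited from ColumnFilterOptions.Builder):

// Minimal sketch: select a subset of columns to read.
AvroOptions opts = AvroOptions.builder()
    .includeColumn("bool_col")
    .includeColumn("int_col")
    .build();
// AvroOptions.DEFAULT can be used instead when all columns should be read.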
87 changes: 87 additions & 0 deletions java/src/main/java/ai/rapids/cudf/Table.java
@@ -251,6 +251,17 @@ private static native long[] readJSON(String[] columnNames,
private static native long[] readParquet(String[] filterColumnNames, String filePath,
long address, long length, int timeUnit) throws CudfException;

/**
* Read in Avro formatted data.
* @param filterColumnNames names of the columns to read, or an empty array to read all of them
* @param filePath the path of the file to read, or null if reading from a buffer.
* @param address the address of the buffer to read from, or 0 if reading from a file.
* @param length the length of the buffer to read from.
*/
private static native long[] readAvro(String[] filterColumnNames, String filePath,
long address, long length) throws CudfException;

/**
* Setup everything to write parquet formatted data to a file.
* @param columnNames names that correspond to the table columns
@@ -1020,6 +1031,82 @@ public static Table readParquet(ParquetOptions opts, HostMemoryBuffer buffer,
null, buffer.getAddress() + offset, len, opts.timeUnit().typeId.getNativeId()));
}

/**
* Read an Avro file using the default AvroOptions.
* @param path the local file to read.
* @return the file parsed as a table on the GPU.
*/
public static Table readAvro(File path) {
return readAvro(AvroOptions.DEFAULT, path);
}

/**
* Read an Avro file.
* @param opts various Avro parsing options.
* @param path the local file to read.
* @return the file parsed as a table on the GPU.
*/
public static Table readAvro(AvroOptions opts, File path) {
return new Table(readAvro(opts.getIncludeColumnNames(),
path.getAbsolutePath(), 0, 0));
}

/**
* Read Avro formatted data.
* @param buffer raw Avro formatted bytes.
* @return the data parsed as a table on the GPU.
*/
public static Table readAvro(byte[] buffer) {
return readAvro(AvroOptions.DEFAULT, buffer, 0, buffer.length);
}

/**
* Read Avro formatted data.
* @param opts various Avro parsing options.
* @param buffer raw Avro formatted bytes.
* @return the data parsed as a table on the GPU.
*/
public static Table readAvro(AvroOptions opts, byte[] buffer) {
return readAvro(opts, buffer, 0, buffer.length);
}

/**
* Read Avro formatted data.
* @param opts various Avro parsing options.
* @param buffer raw Avro formatted bytes.
* @param offset the starting offset into buffer.
* @param len the number of bytes to parse.
* @return the data parsed as a table on the GPU.
*/
public static Table readAvro(AvroOptions opts, byte[] buffer, long offset, long len) {
assert offset >= 0 && offset < buffer.length;
assert len <= buffer.length - offset;
len = len > 0 ? len : buffer.length - offset;

try (HostMemoryBuffer newBuf = HostMemoryBuffer.allocate(len)) {
newBuf.setBytes(0, buffer, offset, len);
return readAvro(opts, newBuf, 0, len);
}
}

/**
* Read Avro formatted data.
* @param opts various Avro parsing options.
* @param buffer HostMemoryBuffer containing raw Avro formatted bytes.
* @param offset the starting offset into buffer.
* @param len the number of bytes to parse.
* @return the data parsed as a table on the GPU.
*/
public static Table readAvro(AvroOptions opts, HostMemoryBuffer buffer,
long offset, long len) {
assert offset >= 0 && offset < buffer.length;
assert len <= buffer.length - offset;
len = len > 0 ? len : buffer.length - offset;

return new Table(readAvro(opts.getIncludeColumnNames(),
null, buffer.getAddress() + offset, len));
}

/**
* Read a ORC file using the default ORCOptions.
* @param path the local file to read.
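A short usage sketch of the new public entry points added above. The file path is hypothetical, and the snippet assumes it runs inside a method that declares throws IOException, with java.io.File, java.nio.file.Files, ai.rapids.cudf.AvroOptions, and ai.rapids.cudf.Table imported:

// Read a local Avro file with the default options.
File avroFile = new File("/tmp/alltypes_plain.avro");   // hypothetical path
try (Table fromFile = Table.readAvro(avroFile)) {
  // ... consume the GPU table
}

// Read the same data from an in-memory byte[] with a column filter.
AvroOptions opts = AvroOptions.builder()
    .includeColumn("bool_col")
    .build();
byte[] raw = Files.readAllBytes(avroFile.toPath());
try (Table fromBuffer = Table.readAvro(opts, raw)) {    // parses the whole byte[]
  // ... consume the GPU table
}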
38 changes: 38 additions & 0 deletions java/src/main/native/src/TableJni.cpp
@@ -28,6 +28,7 @@
#include <cudf/io/avro.hpp>
#include <cudf/io/csv.hpp>
#include <cudf/io/data_sink.hpp>
#include <cudf/io/json.hpp>
#include <cudf/io/orc.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/join.hpp>
@@ -1496,6 +1497,43 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readParquet(JNIEnv *env,
CATCH_STD(env, NULL);
}

JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readAvro(
JNIEnv *env, jclass, jobjectArray filter_col_names, jstring inputfilepath, jlong buffer,
jlong buffer_length) {

const bool read_buffer = (buffer != 0);
if (!read_buffer) {
JNI_NULL_CHECK(env, inputfilepath, "input file or buffer must be supplied", NULL);
} else if (inputfilepath != NULL) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException",
"cannot pass in both a buffer and an inputfilepath", NULL);
} else if (buffer_length <= 0) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "An empty buffer is not supported",
NULL);
}

try {
cudf::jni::auto_set_device(env);
cudf::jni::native_jstring filename(env, inputfilepath);
if (!read_buffer && filename.is_empty()) {
JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "inputfilepath can't be empty",
NULL);
}

cudf::jni::native_jstringArray n_filter_col_names(env, filter_col_names);

auto source = read_buffer ? cudf::io::source_info(reinterpret_cast<char *>(buffer),
static_cast<std::size_t>(buffer_length)) :
cudf::io::source_info(filename.get());

cudf::io::avro_reader_options opts =
cudf::io::avro_reader_options::builder(source)
.columns(n_filter_col_names.as_cpp_vector())
.build();
return convert_table_for_return(env, cudf::io::read_avro(opts).tbl);
}
CATCH_STD(env, NULL);
}

JNIEXPORT long JNICALL Java_ai_rapids_cudf_Table_writeParquetBufferBegin(
JNIEnv *env, jclass, jobjectArray j_col_names, jint j_num_children, jintArray j_children,
jbooleanArray j_col_nullability, jobjectArray j_metadata_keys, jobjectArray j_metadata_values,
60 changes: 60 additions & 0 deletions java/src/test/java/ai/rapids/cudf/TableTest.java
@@ -80,6 +80,7 @@ public class TableTest extends CudfTestBase {
private static final File TEST_ORC_FILE = TestUtils.getResourceAsFile("TestOrcFile.orc");
private static final File TEST_ORC_TIMESTAMP_DATE_FILE = TestUtils.getResourceAsFile("timestamp-date-test.orc");
private static final File TEST_DECIMAL_PARQUET_FILE = TestUtils.getResourceAsFile("decimal.parquet");
private static final File TEST_ALL_TYPES_PLAIN_AVRO_FILE = TestUtils.getResourceAsFile("alltypes_plain.avro");
private static final File TEST_SIMPLE_CSV_FILE = TestUtils.getResourceAsFile("simple.csv");
private static final File TEST_SIMPLE_JSON_FILE = TestUtils.getResourceAsFile("people.json");

@@ -642,6 +643,65 @@ void testReadParquetContainsDecimalData() {
}
}

@Test
void testReadAvro() {
AvroOptions opts = AvroOptions.builder()
.includeColumn("bool_col")
.includeColumn("int_col")
.includeColumn("timestamp_col")
.build();

try (Table expected = new Table.TestBuilder()
.column(true, false, true, false, true, false, true, false)
.column(0, 1, 0, 1, 0, 1, 0, 1)
.column(1235865600000000L, 1235865660000000L, 1238544000000000L, 1238544060000000L,
1233446400000000L, 1233446460000000L, 1230768000000000L, 1230768060000000L)
.build();
Table table = Table.readAvro(opts, TEST_ALL_TYPES_PLAIN_AVRO_FILE)) {
assertTablesAreEqual(expected, table);
}
}

@Test
void testReadAvroBuffer() throws IOException {
AvroOptions opts = AvroOptions.builder()
.includeColumn("bool_col")
.includeColumn("timestamp_col")
.build();

byte[] buffer = Files.readAllBytes(TEST_ALL_TYPES_PLAIN_AVRO_FILE.toPath());
int bufferLen = buffer.length;
try (Table expected = new Table.TestBuilder()
.column(true, false, true, false, true, false, true, false)
.column(1235865600000000L, 1235865660000000L, 1238544000000000L, 1238544060000000L,
1233446400000000L, 1233446460000000L, 1230768000000000L, 1230768060000000L)
.build();
Table table = Table.readAvro(opts, buffer, 0, bufferLen)) {
assertTablesAreEqual(expected, table);
}
}

@Test
void testReadAvroFull() {
try (Table expected = new Table.TestBuilder()
.column(4, 5, 6, 7, 2, 3, 0, 1)
.column(true, false, true, false, true, false, true, false)
.column(0, 1, 0, 1, 0, 1, 0, 1)
.column(0, 1, 0, 1, 0, 1, 0, 1)
.column(0, 1, 0, 1, 0, 1, 0, 1)
.column(0L, 10L, 0L, 10L, 0L, 10L, 0L, 10L)
.column(0.0f, 1.100000023841858f, 0.0f, 1.100000023841858f, 0.0f, 1.100000023841858f, 0.0f, 1.100000023841858f)
.column(0.0d, 10.1d, 0.0d, 10.1d, 0.0d, 10.1d, 0.0d, 10.1d)
.column("03/01/09", "03/01/09", "04/01/09", "04/01/09", "02/01/09", "02/01/09", "01/01/09", "01/01/09")
.column("0", "1", "0", "1", "0", "1", "0", "1")
.column(1235865600000000L, 1235865660000000L, 1238544000000000L, 1238544060000000L,
1233446400000000L, 1233446460000000L, 1230768000000000L, 1230768060000000L)
.build();
Table table = Table.readAvro(TEST_ALL_TYPES_PLAIN_AVRO_FILE)) {
assertTablesAreEqual(expected, table);
}
}

@Test
void testReadORC() {
ORCOptions opts = ORCOptions.builder()
Binary file added java/src/test/resources/alltypes_plain.avro
Binary file not shown.