From 5207eff8a3eebee60db441a7c64c945d12f96d67 Mon Sep 17 00:00:00 2001 From: Remzi Yang <59198230+HaoYang670@users.noreply.github.com> Date: Tue, 8 Mar 2022 08:01:56 +0800 Subject: [PATCH] Support Java bindings for Avro reader (#10373) In this PR, we add Avro reader JNI and some reader options in cudf java. Re https://github.com/NVIDIA/spark-rapids/issues/4831 Authors: - Remzi Yang (https://github.com/HaoYang670) Approvers: - Bobby Wang (https://github.com/wbo4958) - Jason Lowe (https://github.com/jlowe) URL: https://github.com/rapidsai/cudf/pull/10373 --- .../main/java/ai/rapids/cudf/AvroOptions.java | 41 +++++++++ java/src/main/java/ai/rapids/cudf/Table.java | 87 ++++++++++++++++++ java/src/main/native/src/TableJni.cpp | 39 ++++++++ .../test/java/ai/rapids/cudf/TableTest.java | 60 ++++++++++++ java/src/test/resources/alltypes_plain.avro | Bin 0 -> 868 bytes 5 files changed, 227 insertions(+) create mode 100644 java/src/main/java/ai/rapids/cudf/AvroOptions.java create mode 100644 java/src/test/resources/alltypes_plain.avro diff --git a/java/src/main/java/ai/rapids/cudf/AvroOptions.java b/java/src/main/java/ai/rapids/cudf/AvroOptions.java new file mode 100644 index 00000000000..973f729ab5b --- /dev/null +++ b/java/src/main/java/ai/rapids/cudf/AvroOptions.java @@ -0,0 +1,41 @@ +/* + * + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package ai.rapids.cudf; + +/** + * Options for reading an Avro file + */ +public class AvroOptions extends ColumnFilterOptions { + + public static AvroOptions DEFAULT = new AvroOptions(new Builder()); + + private AvroOptions(Builder builder) { + super(builder); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder extends ColumnFilterOptions.Builder { + public AvroOptions build() { + return new AvroOptions(this); + } + } +} diff --git a/java/src/main/java/ai/rapids/cudf/Table.java b/java/src/main/java/ai/rapids/cudf/Table.java index 17e10933b65..ff966643866 100644 --- a/java/src/main/java/ai/rapids/cudf/Table.java +++ b/java/src/main/java/ai/rapids/cudf/Table.java @@ -251,6 +251,17 @@ private static native long[] readJSON(String[] columnNames, private static native long[] readParquet(String[] filterColumnNames, String filePath, long address, long length, int timeUnit) throws CudfException; + /** + * Read in Avro formatted data. + * @param filterColumnNames name of the columns to read, or an empty array if we want to read + * all of them + * @param filePath the path of the file to read, or null if no path should be read. + * @param address the address of the buffer to read from or 0 if we should not. + * @param length the length of the buffer to read from. + */ + private static native long[] readAvro(String[] filterColumnNames, String filePath, + long address, long length) throws CudfException; + /** * Setup everything to write parquet formatted data to a file. * @param columnNames names that correspond to the table columns @@ -1020,6 +1031,82 @@ public static Table readParquet(ParquetOptions opts, HostMemoryBuffer buffer, null, buffer.getAddress() + offset, len, opts.timeUnit().typeId.getNativeId())); } + /** + * Read an Avro file using the default AvroOptions. + * @param path the local file to read. + * @return the file parsed as a table on the GPU. + */ + public static Table readAvro(File path) { + return readAvro(AvroOptions.DEFAULT, path); + } + + /** + * Read an Avro file. + * @param opts various Avro parsing options. + * @param path the local file to read. + * @return the file parsed as a table on the GPU. + */ + public static Table readAvro(AvroOptions opts, File path) { + return new Table(readAvro(opts.getIncludeColumnNames(), + path.getAbsolutePath(), 0, 0)); + } + + /** + * Read Avro formatted data. + * @param buffer raw Avro formatted bytes. + * @return the data parsed as a table on the GPU. + */ + public static Table readAvro(byte[] buffer) { + return readAvro(AvroOptions.DEFAULT, buffer, 0, buffer.length); + } + + /** + * Read Avro formatted data. + * @param opts various Avro parsing options. + * @param buffer raw Avro formatted bytes. + * @return the data parsed as a table on the GPU. + */ + public static Table readAvro(AvroOptions opts, byte[] buffer) { + return readAvro(opts, buffer, 0, buffer.length); + } + + /** + * Read Avro formatted data. + * @param opts various Avro parsing options. + * @param buffer raw Avro formatted bytes. + * @param offset the starting offset into buffer. + * @param len the number of bytes to parse. + * @return the data parsed as a table on the GPU. + */ + public static Table readAvro(AvroOptions opts, byte[] buffer, long offset, long len) { + assert offset >= 0 && offset < buffer.length; + assert len <= buffer.length - offset; + len = len > 0 ? len : buffer.length - offset; + + try (HostMemoryBuffer newBuf = HostMemoryBuffer.allocate(len)) { + newBuf.setBytes(0, buffer, offset, len); + return readAvro(opts, newBuf, 0, len); + } + } + + /** + * Read Avro formatted data. + * @param opts various Avro parsing options. + * @param buffer raw Avro formatted bytes. + * @param offset the starting offset into buffer. + * @param len the number of bytes to parse. + * @return the data parsed as a table on the GPU. + */ + public static Table readAvro(AvroOptions opts, HostMemoryBuffer buffer, + long offset, long len) { + assert offset >= 0 && offset < buffer.length; + assert len <= buffer.length - offset; + len = len > 0 ? len : buffer.length - offset; + + return new Table(readAvro(opts.getIncludeColumnNames(), + null, buffer.getAddress() + offset, len)); + } + /** * Read a ORC file using the default ORCOptions. * @param path the local file to read. diff --git a/java/src/main/native/src/TableJni.cpp b/java/src/main/native/src/TableJni.cpp index 1cf56da35da..11609155ba3 100644 --- a/java/src/main/native/src/TableJni.cpp +++ b/java/src/main/native/src/TableJni.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -1496,6 +1497,44 @@ JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readParquet(JNIEnv *env, CATCH_STD(env, NULL); } +JNIEXPORT jlongArray JNICALL Java_ai_rapids_cudf_Table_readAvro(JNIEnv *env, jclass, + jobjectArray filter_col_names, + jstring inputfilepath, jlong buffer, + jlong buffer_length, jint unit) { + + const bool read_buffer = (buffer != 0); + if (!read_buffer) { + JNI_NULL_CHECK(env, inputfilepath, "input file or buffer must be supplied", NULL); + } else if (inputfilepath != NULL) { + JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", + "cannot pass in both a buffer and an inputfilepath", NULL); + } else if (buffer_length <= 0) { + JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "An empty buffer is not supported", + NULL); + } + + try { + cudf::jni::auto_set_device(env); + cudf::jni::native_jstring filename(env, inputfilepath); + if (!read_buffer && filename.is_empty()) { + JNI_THROW_NEW(env, "java/lang/IllegalArgumentException", "inputfilepath can't be empty", + NULL); + } + + cudf::jni::native_jstringArray n_filter_col_names(env, filter_col_names); + + auto source = read_buffer ? cudf::io::source_info(reinterpret_cast(buffer), + static_cast(buffer_length)) : + cudf::io::source_info(filename.get()); + + cudf::io::avro_reader_options opts = cudf::io::avro_reader_options::builder(source) + .columns(n_filter_col_names.as_cpp_vector()) + .build(); + return convert_table_for_return(env, cudf::io::read_avro(opts).tbl); + } + CATCH_STD(env, NULL); +} + JNIEXPORT long JNICALL Java_ai_rapids_cudf_Table_writeParquetBufferBegin( JNIEnv *env, jclass, jobjectArray j_col_names, jint j_num_children, jintArray j_children, jbooleanArray j_col_nullability, jobjectArray j_metadata_keys, jobjectArray j_metadata_values, diff --git a/java/src/test/java/ai/rapids/cudf/TableTest.java b/java/src/test/java/ai/rapids/cudf/TableTest.java index f309b1ee703..269c9d7eda1 100644 --- a/java/src/test/java/ai/rapids/cudf/TableTest.java +++ b/java/src/test/java/ai/rapids/cudf/TableTest.java @@ -80,6 +80,7 @@ public class TableTest extends CudfTestBase { private static final File TEST_ORC_FILE = TestUtils.getResourceAsFile("TestOrcFile.orc"); private static final File TEST_ORC_TIMESTAMP_DATE_FILE = TestUtils.getResourceAsFile("timestamp-date-test.orc"); private static final File TEST_DECIMAL_PARQUET_FILE = TestUtils.getResourceAsFile("decimal.parquet"); + private static final File TEST_ALL_TYPES_PLAIN_AVRO_FILE = TestUtils.getResourceAsFile("alltypes_plain.avro"); private static final File TEST_SIMPLE_CSV_FILE = TestUtils.getResourceAsFile("simple.csv"); private static final File TEST_SIMPLE_JSON_FILE = TestUtils.getResourceAsFile("people.json"); @@ -642,6 +643,65 @@ void testReadParquetContainsDecimalData() { } } + @Test + void testReadAvro() { + AvroOptions opts = AvroOptions.builder() + .includeColumn("bool_col") + .includeColumn("int_col") + .includeColumn("timestamp_col") + .build(); + + try (Table expected = new Table.TestBuilder() + .column(true, false, true, false, true, false, true, false) + .column(0, 1, 0, 1, 0, 1, 0, 1) + .column(1235865600000000L, 1235865660000000L, 1238544000000000L, 1238544060000000L, + 1233446400000000L, 1233446460000000L, 1230768000000000L, 1230768060000000L) + .build(); + Table table = Table.readAvro(opts, TEST_ALL_TYPES_PLAIN_AVRO_FILE)) { + assertTablesAreEqual(expected, table); + } + } + + @Test + void testReadAvroBuffer() throws IOException{ + AvroOptions opts = AvroOptions.builder() + .includeColumn("bool_col") + .includeColumn("timestamp_col") + .build(); + + byte[] buffer = Files.readAllBytes(TEST_ALL_TYPES_PLAIN_AVRO_FILE.toPath()); + int bufferLen = buffer.length; + try (Table expected = new Table.TestBuilder() + .column(true, false, true, false, true, false, true, false) + .column(1235865600000000L, 1235865660000000L, 1238544000000000L, 1238544060000000L, + 1233446400000000L, 1233446460000000L, 1230768000000000L, 1230768060000000L) + .build(); + Table table = Table.readAvro(opts, buffer, 0, bufferLen)) { + assertTablesAreEqual(expected, table); + } + } + + @Test + void testReadAvroFull() { + try (Table expected = new Table.TestBuilder() + .column(4, 5, 6, 7, 2, 3, 0, 1) + .column(true, false, true, false, true, false, true, false) + .column(0, 1, 0, 1, 0, 1, 0, 1) + .column(0, 1, 0, 1, 0, 1, 0, 1) + .column(0, 1, 0, 1, 0, 1, 0, 1) + .column(0L, 10L, 0L, 10L, 0L, 10L, 0L, 10L) + .column(0.0f, 1.100000023841858f, 0.0f, 1.100000023841858f, 0.0f, 1.100000023841858f, 0.0f, 1.100000023841858f) + .column(0.0d, 10.1d, 0.0d, 10.1d, 0.0d, 10.1d, 0.0d, 10.1d) + .column("03/01/09", "03/01/09", "04/01/09", "04/01/09", "02/01/09", "02/01/09", "01/01/09", "01/01/09") + .column("0", "1", "0", "1", "0", "1", "0", "1") + .column(1235865600000000L, 1235865660000000L, 1238544000000000L, 1238544060000000L, + 1233446400000000L, 1233446460000000L, 1230768000000000L, 1230768060000000L) + .build(); + Table table = Table.readAvro(TEST_ALL_TYPES_PLAIN_AVRO_FILE)) { + assertTablesAreEqual(expected, table); + } + } + @Test void testReadORC() { ORCOptions opts = ORCOptions.builder() diff --git a/java/src/test/resources/alltypes_plain.avro b/java/src/test/resources/alltypes_plain.avro new file mode 100644 index 0000000000000000000000000000000000000000..d60c628227a470bff6f83f357e2b7872fda881c2 GIT binary patch literal 868 zcmeZI%3@>_ODrqO*DFrWNX<>0!&$9VQdy9yWTjM;nw(#hqNJmgmzWFUm*f}tq?V=T z1i{49GE;L>ij}OQt6@qqLCPW8qm?rANRPs{|T^rU8f1#kq+&IYg@_T47RVI;Ps3{JeB*4o=I-PegYvSPYxul>E}9oK!T= zK?JcXPb^7|FD@y{%u7eplT=xfT8vE#X{t*yb5n~;5_1bsjfW=*sHb!C(=(G3b3!2L z1g=RpH#4~?zgVdj9_z8S2KhzldWi*zz{I6jT##6ltyh*>RGgWg$7QT%sAnVsPI}4t zDXGak#d(PZ1(gi<#4|ViPK@ufae9_CQ}DnlfjdkM%mNG?42%q6j0_n92FCgZhWZAU z3`_LR!obDAz`(@HBqPFb_DqjGgE1>3i;9Du0fUGkL&Kasw^$7rc(M{0 z7)l2$ literal 0 HcmV?d00001