From c1ed6c81ec4bcf29d50d9618fc838ab83dcaf495 Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Tue, 12 Apr 2022 19:56:02 +0800
Subject: [PATCH 1/3] Java JNI to support Parquet field id

Signed-off-by: Chong Gao
---
 .../ai/rapids/cudf/ColumnWriterOptions.java   | 100 ++++++++++++++++--
 .../CompressionMetadataWriterOptions.java     |   5 +
 java/src/main/java/ai/rapids/cudf/Table.java  |   9 +-
 java/src/main/native/src/TableJni.cpp         |  31 ++++--
 4 files changed, 129 insertions(+), 16 deletions(-)

diff --git a/java/src/main/java/ai/rapids/cudf/ColumnWriterOptions.java b/java/src/main/java/ai/rapids/cudf/ColumnWriterOptions.java
index 78b3d5d52ec..b68c6631b5d 100644
--- a/java/src/main/java/ai/rapids/cudf/ColumnWriterOptions.java
+++ b/java/src/main/java/ai/rapids/cudf/ColumnWriterOptions.java
@@ -33,9 +33,13 @@ public class ColumnWriterOptions {
   private boolean isNullable;
   private boolean isMap = false;
   private String columnName;
+  // Parquet field id; only used when writing Parquet
+  private int parquetFieldId;
+
   private ColumnWriterOptions(AbstractStructBuilder builder) {
     this.columnName = builder.name;
     this.isNullable = builder.isNullable;
+    this.parquetFieldId = builder.parquetFieldId;
     this.childColumnOptions =
         (ColumnWriterOptions[]) builder.children.toArray(new ColumnWriterOptions[0]);
   }
@@ -67,6 +71,10 @@ public AbstractStructBuilder(String name, boolean isNullable) {
       super(name, isNullable);
     }
 
+    public AbstractStructBuilder(String name, boolean isNullable, int parquetFieldId) {
+      super(name, isNullable, parquetFieldId);
+    }
+
     protected AbstractStructBuilder() {
       super();
     }
@@ -84,6 +92,8 @@ public static abstract class NestedBuilder<T extends NestedBuilder, V extends C
     protected List<ColumnWriterOptions> children = new ArrayList<>();
     protected boolean isNullable = true;
     protected String name = "";
+    // Parquet field id for this column
+    protected int parquetFieldId;
 
     /**
      * Builder specific to build a Struct meta
@@ -93,22 +103,42 @@ protected NestedBuilder(String name, boolean isNullable) {
       this.isNullable = isNullable;
     }
 
+    protected NestedBuilder(String name, boolean isNullable, int parquetFieldId) {
+      this.name = name;
+      this.isNullable = isNullable;
+      this.parquetFieldId = parquetFieldId;
+    }
+
     protected NestedBuilder() {}
 
     protected ColumnWriterOptions withColumns(String name, boolean isNullable) {
       return new ColumnWriterOptions(name, isNullable);
     }
 
+    protected ColumnWriterOptions withColumns(String name, boolean isNullable, int parquetFieldId) {
+      return new ColumnWriterOptions(name, isNullable, parquetFieldId);
+    }
+
     protected ColumnWriterOptions withDecimal(String name, int precision, boolean isNullable) {
       return new ColumnWriterOptions(name, false, precision, isNullable);
     }
 
+    protected ColumnWriterOptions withDecimal(String name, int precision,
+                                              boolean isNullable, int parquetFieldId) {
+      return new ColumnWriterOptions(name, false, precision, isNullable, parquetFieldId);
+    }
+
     protected ColumnWriterOptions withTimestamp(String name, boolean isInt96, boolean isNullable) {
       return new ColumnWriterOptions(name, isInt96, UNKNOWN_PRECISION, isNullable);
     }
 
+    protected ColumnWriterOptions withTimestamp(String name, boolean isInt96,
+                                                boolean isNullable, int parquetFieldId) {
+      return new ColumnWriterOptions(name, isInt96, UNKNOWN_PRECISION, isNullable, parquetFieldId);
+    }
+
     /**
      * Set the list column meta.
      * Lists should have only one child in ColumnVector, but the metadata expects a
@@ -155,16 +185,16 @@ public T withStructColumn(StructColumnWriterOptions child) {
     /**
      * Set column name
      */
-    public T withNonNullableColumns(String... name) {
-      withColumns(false, name);
+    public T withNonNullableColumns(String... names) {
+      withColumns(false, names);
       return (T) this;
     }
 
     /**
      * Set nullable column meta data
      */
-    public T withNullableColumns(String... name) {
-      withColumns(true, name);
+    public T withNullableColumns(String... names) {
+      withColumns(true, names);
       return (T) this;
     }
 
@@ -172,13 +202,22 @@ public T withNullableColumns(String... name) {
      * Set a simple child meta data
      * @return this for chaining.
      */
-    public T withColumns(boolean nullable, String... name) {
-      for (String n : name) {
+    public T withColumns(boolean nullable, String... names) {
+      for (String n : names) {
         children.add(withColumns(n, nullable));
       }
       return (T) this;
     }
 
+    /**
+     * Set a simple child meta data with a Parquet field id
+     * @return this for chaining.
+     */
+    public T withColumns(boolean nullable, String name, int parquetFieldId) {
+      children.add(withColumns(name, nullable, parquetFieldId));
+      return (T) this;
+    }
+
     /**
      * Set a Decimal child meta data
      * @return this for chaining.
@@ -188,6 +227,15 @@ public T withDecimalColumn(String name, int precision, boolean nullable) {
       return (T) this;
     }
 
+    /**
+     * Set a Decimal child meta data with a Parquet field id
+     * @return this for chaining.
+     */
+    public T withDecimalColumn(String name, int precision, boolean nullable, int parquetFieldId) {
+      children.add(withDecimal(name, precision, nullable, parquetFieldId));
+      return (T) this;
+    }
+
     /**
      * Set a Decimal child meta data
      * @return this for chaining.
@@ -206,6 +254,15 @@ public T withDecimalColumn(String name, int precision) {
       return (T) this;
     }
 
+    /**
+     * Set a timestamp child meta data with a Parquet field id
+     * @return this for chaining.
+     */
+    public T withTimestampColumn(String name, boolean isInt96, boolean nullable, int parquetFieldId) {
+      children.add(withTimestamp(name, isInt96, nullable, parquetFieldId));
+      return (T) this;
+    }
+
     /**
      * Set a timestamp child meta data
      * @return this for chaining.
@@ -244,6 +301,12 @@ public ColumnWriterOptions(String columnName, boolean isTimestampTypeInt96, this.columnName = columnName; } + public ColumnWriterOptions(String columnName, boolean isTimestampTypeInt96, + int precision, boolean isNullable, int parquetFieldId) { + this(columnName, isTimestampTypeInt96, precision, isNullable); + this.parquetFieldId = parquetFieldId; + } + public ColumnWriterOptions(String columnName, boolean isNullable) { this.isTimestampTypeInt96 = false; this.precision = UNKNOWN_PRECISION; @@ -251,6 +314,11 @@ public ColumnWriterOptions(String columnName, boolean isNullable) { this.columnName = columnName; } + public ColumnWriterOptions(String columnName, boolean isNullable, int parquetFieldId) { + this(columnName, isNullable); + this.parquetFieldId = parquetFieldId; + } + public ColumnWriterOptions(String columnName) { this(columnName, true); } @@ -302,6 +370,15 @@ int[] getFlatPrecision() { } } + int[] getFlatParquetFieldId() { + int[] ret = {parquetFieldId}; + if (childColumnOptions.length > 0) { + return getFlatInts(ret, (opt) -> opt.getFlatParquetFieldId()); + } else { + return ret; + } + } + boolean[] getFlatIsNullable() { boolean[] ret = {isNullable}; if (childColumnOptions.length > 0) { @@ -418,6 +495,13 @@ public static StructBuilder structBuilder(String name, boolean isNullable) { return new StructBuilder(name, isNullable); } + /** + * Creates a StructBuilder for column called 'name' + */ + public static StructBuilder structBuilder(String name, boolean isNullable, int parquetFieldId) { + return new StructBuilder(name, isNullable, parquetFieldId); + } + /** * Creates a StructBuilder for column called 'name' */ @@ -477,6 +561,10 @@ public StructBuilder(String name, boolean isNullable) { super(name, isNullable); } + public StructBuilder(String name, boolean isNullable, int parquetFieldId) { + super(name, isNullable, parquetFieldId); + } + public StructColumnWriterOptions build() { return new StructColumnWriterOptions(this); } diff --git a/java/src/main/java/ai/rapids/cudf/CompressionMetadataWriterOptions.java b/java/src/main/java/ai/rapids/cudf/CompressionMetadataWriterOptions.java index 9292975d0ce..c1bf02e74b0 100644 --- a/java/src/main/java/ai/rapids/cudf/CompressionMetadataWriterOptions.java +++ b/java/src/main/java/ai/rapids/cudf/CompressionMetadataWriterOptions.java @@ -41,6 +41,11 @@ int[] getFlatPrecision() { return super.getFlatInts(new int[]{}, (opt) -> opt.getFlatPrecision()); } + @Override + int[] getFlatParquetFieldId() { + return super.getFlatInts(new int[]{}, (opt) -> opt.getFlatParquetFieldId()); + } + @Override int[] getFlatNumChildren() { return super.getFlatInts(new int[]{}, (opt) -> opt.getFlatNumChildren()); diff --git a/java/src/main/java/ai/rapids/cudf/Table.java b/java/src/main/java/ai/rapids/cudf/Table.java index ff966643866..8cc1251d4d6 100644 --- a/java/src/main/java/ai/rapids/cudf/Table.java +++ b/java/src/main/java/ai/rapids/cudf/Table.java @@ -289,7 +289,9 @@ private static native long writeParquetFileBegin(String[] columnNames, int statsFreq, boolean[] isInt96, int[] precisions, - boolean[] isMapValues, String filename) throws CudfException; + boolean[] isMapValues, + int[] parquetFieldIds, + String filename) throws CudfException; /** * Setup everything to write parquet formatted data to a buffer. 
@@ -319,6 +321,7 @@ private static native long writeParquetBufferBegin(String[] columnNames, boolean[] isInt96, int[] precisions, boolean[] isMapValues, + int[] parquetFieldIds, HostBufferConsumer consumer) throws CudfException; /** @@ -1201,6 +1204,7 @@ private ParquetTableWriter(ParquetWriterOptions options, File outputFile) { boolean[] timeInt96Values = options.getFlatIsTimeTypeInt96(); boolean[] isMapValues = options.getFlatIsMap(); int[] precisions = options.getFlatPrecision(); + int[] parquetFieldIds = options.getFlatParquetFieldId(); int[] flatNumChildren = options.getFlatNumChildren(); this.consumer = null; @@ -1215,6 +1219,7 @@ private ParquetTableWriter(ParquetWriterOptions options, File outputFile) { timeInt96Values, precisions, isMapValues, + parquetFieldIds, outputFile.getAbsolutePath()); } @@ -1224,6 +1229,7 @@ private ParquetTableWriter(ParquetWriterOptions options, HostBufferConsumer cons boolean[] timeInt96Values = options.getFlatIsTimeTypeInt96(); boolean[] isMapValues = options.getFlatIsMap(); int[] precisions = options.getFlatPrecision(); + int[] parquetFieldIds = options.getFlatParquetFieldId(); int[] flatNumChildren = options.getFlatNumChildren(); this.consumer = consumer; @@ -1238,6 +1244,7 @@ private ParquetTableWriter(ParquetWriterOptions options, HostBufferConsumer cons timeInt96Values, precisions, isMapValues, + parquetFieldIds, consumer); } diff --git a/java/src/main/native/src/TableJni.cpp b/java/src/main/native/src/TableJni.cpp index cebe476dd87..f0073281293 100644 --- a/java/src/main/native/src/TableJni.cpp +++ b/java/src/main/native/src/TableJni.cpp @@ -673,6 +673,7 @@ int set_column_metadata(cudf::io::column_in_metadata &column_metadata, cudf::jni::native_jbooleanArray &is_int96, cudf::jni::native_jintArray &precisions, cudf::jni::native_jbooleanArray &is_map, + cudf::jni::native_jintArray &parquetFieldIds, cudf::jni::native_jintArray &children, int num_children, int read_index) { int write_index = 0; for (int i = 0; i < num_children; i++, write_index++) { @@ -687,12 +688,15 @@ int set_column_metadata(cudf::io::column_in_metadata &column_metadata, if (is_map[read_index]) { child.set_list_column_as_map(); } + if (!parquetFieldIds.is_null()) { + child.set_parquet_field_id(parquetFieldIds[read_index]); + } column_metadata.add_child(child); int childs_children = children[read_index++]; if (childs_children > 0) { read_index = set_column_metadata(column_metadata.child(write_index), col_names, nullability, is_int96, - precisions, is_map, children, childs_children, read_index); + precisions, is_map, parquetFieldIds, children, childs_children, read_index); } } return read_index; @@ -701,12 +705,14 @@ int set_column_metadata(cudf::io::column_in_metadata &column_metadata, void createTableMetaData(JNIEnv *env, jint num_children, jobjectArray &j_col_names, jintArray &j_children, jbooleanArray &j_col_nullability, jbooleanArray &j_is_int96, jintArray &j_precisions, - jbooleanArray &j_is_map, cudf::io::table_input_metadata &metadata) { + jbooleanArray &j_is_map, cudf::io::table_input_metadata &metadata, + jintArray &j_parquetFieldIds) { cudf::jni::auto_set_device(env); cudf::jni::native_jstringArray col_names(env, j_col_names); cudf::jni::native_jbooleanArray col_nullability(env, j_col_nullability); cudf::jni::native_jbooleanArray is_int96(env, j_is_int96); cudf::jni::native_jintArray precisions(env, j_precisions); + cudf::jni::native_jintArray parquetFieldIds(env, j_parquetFieldIds); cudf::jni::native_jintArray children(env, j_children); cudf::jni::native_jbooleanArray 
is_map(env, j_is_map); @@ -729,11 +735,14 @@ void createTableMetaData(JNIEnv *env, jint num_children, jobjectArray &j_col_nam if (is_map[read_index]) { metadata.column_metadata[write_index].set_list_column_as_map(); } + if (!parquetFieldIds.is_null()) { + metadata.column_metadata[write_index].set_parquet_field_id(parquetFieldIds[read_index]); + } int childs_children = children[read_index++]; if (childs_children > 0) { read_index = set_column_metadata(metadata.column_metadata[write_index], cpp_names, col_nullability, - is_int96, precisions, is_map, children, childs_children, read_index); + is_int96, precisions, is_map, parquetFieldIds, children, childs_children, read_index); } } } @@ -1539,7 +1548,7 @@ JNIEXPORT long JNICALL Java_ai_rapids_cudf_Table_writeParquetBufferBegin( JNIEnv *env, jclass, jobjectArray j_col_names, jint j_num_children, jintArray j_children, jbooleanArray j_col_nullability, jobjectArray j_metadata_keys, jobjectArray j_metadata_values, jint j_compression, jint j_stats_freq, jbooleanArray j_isInt96, jintArray j_precisions, - jbooleanArray j_is_map, jobject consumer) { + jbooleanArray j_is_map, jintArray j_parquetFieldIds, jobject consumer) { JNI_NULL_CHECK(env, j_col_names, "null columns", 0); JNI_NULL_CHECK(env, j_col_nullability, "null nullability", 0); JNI_NULL_CHECK(env, j_metadata_keys, "null metadata keys", 0); @@ -1554,7 +1563,7 @@ JNIEXPORT long JNICALL Java_ai_rapids_cudf_Table_writeParquetBufferBegin( sink_info sink{data_sink.get()}; table_input_metadata metadata; createTableMetaData(env, j_num_children, j_col_names, j_children, j_col_nullability, j_isInt96, - j_precisions, j_is_map, metadata); + j_precisions, j_is_map, metadata, j_parquetFieldIds); auto meta_keys = cudf::jni::native_jstringArray{env, j_metadata_keys}.as_cpp_vector(); auto meta_values = cudf::jni::native_jstringArray{env, j_metadata_values}.as_cpp_vector(); @@ -1583,7 +1592,7 @@ JNIEXPORT long JNICALL Java_ai_rapids_cudf_Table_writeParquetFileBegin( JNIEnv *env, jclass, jobjectArray j_col_names, jint j_num_children, jintArray j_children, jbooleanArray j_col_nullability, jobjectArray j_metadata_keys, jobjectArray j_metadata_values, jint j_compression, jint j_stats_freq, jbooleanArray j_isInt96, jintArray j_precisions, - jbooleanArray j_is_map, jstring j_output_path) { + jbooleanArray j_is_map, jintArray j_parquetFieldIds, jstring j_output_path) { JNI_NULL_CHECK(env, j_col_names, "null columns", 0); JNI_NULL_CHECK(env, j_col_nullability, "null nullability", 0); JNI_NULL_CHECK(env, j_metadata_keys, "null metadata keys", 0); @@ -1596,7 +1605,7 @@ JNIEXPORT long JNICALL Java_ai_rapids_cudf_Table_writeParquetFileBegin( using namespace cudf::jni; table_input_metadata metadata; createTableMetaData(env, j_num_children, j_col_names, j_children, j_col_nullability, j_isInt96, - j_precisions, j_is_map, metadata); + j_precisions, j_is_map, metadata, j_parquetFieldIds); auto meta_keys = cudf::jni::native_jstringArray{env, j_metadata_keys}.as_cpp_vector(); auto meta_values = cudf::jni::native_jstringArray{env, j_metadata_values}.as_cpp_vector(); @@ -1721,8 +1730,10 @@ JNIEXPORT long JNICALL Java_ai_rapids_cudf_Table_writeORCBufferBegin( table_input_metadata metadata; // ORC has no `j_is_int96`, but `createTableMetaData` needs a lvalue. jbooleanArray j_is_int96 = NULL; + // ORC has no `j_parquetFieldIds`, but `createTableMetaData` needs a lvalue. 
+ jintArray j_parquetFieldIds = NULL; createTableMetaData(env, j_num_children, j_col_names, j_children, j_col_nullability, j_is_int96, - j_precisions, j_is_map, metadata); + j_precisions, j_is_map, metadata, j_parquetFieldIds); auto meta_keys = cudf::jni::native_jstringArray{env, j_metadata_keys}.as_cpp_vector(); auto meta_values = cudf::jni::native_jstringArray{env, j_metadata_values}.as_cpp_vector(); @@ -1766,8 +1777,10 @@ JNIEXPORT long JNICALL Java_ai_rapids_cudf_Table_writeORCFileBegin( table_input_metadata metadata; // ORC has no `j_is_int96`, but `createTableMetaData` needs a lvalue. jbooleanArray j_is_int96 = NULL; + // ORC has no `j_parquetFieldIds`, but `createTableMetaData` needs a lvalue. + jintArray j_parquetFieldIds = NULL; createTableMetaData(env, j_num_children, j_col_names, j_children, j_col_nullability, j_is_int96, - j_precisions, j_is_map, metadata); + j_precisions, j_is_map, metadata, j_parquetFieldIds); auto meta_keys = cudf::jni::native_jstringArray{env, j_metadata_keys}.as_cpp_vector(); auto meta_values = cudf::jni::native_jstringArray{env, j_metadata_values}.as_cpp_vector(); From 2f7560fa77a797724daf29c4820df14a8bd2d681 Mon Sep 17 00:00:00 2001 From: Chong Gao Date: Wed, 13 Apr 2022 17:52:10 +0800 Subject: [PATCH 2/3] Fix code style Signed-off-by: Chong Gao --- java/src/main/native/src/TableJni.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/java/src/main/native/src/TableJni.cpp b/java/src/main/native/src/TableJni.cpp index f0073281293..f715f8308e0 100644 --- a/java/src/main/native/src/TableJni.cpp +++ b/java/src/main/native/src/TableJni.cpp @@ -694,9 +694,9 @@ int set_column_metadata(cudf::io::column_in_metadata &column_metadata, column_metadata.add_child(child); int childs_children = children[read_index++]; if (childs_children > 0) { - read_index = - set_column_metadata(column_metadata.child(write_index), col_names, nullability, is_int96, - precisions, is_map, parquetFieldIds, children, childs_children, read_index); + read_index = set_column_metadata(column_metadata.child(write_index), col_names, nullability, + is_int96, precisions, is_map, parquetFieldIds, children, + childs_children, read_index); } } return read_index; @@ -740,9 +740,9 @@ void createTableMetaData(JNIEnv *env, jint num_children, jobjectArray &j_col_nam } int childs_children = children[read_index++]; if (childs_children > 0) { - read_index = - set_column_metadata(metadata.column_metadata[write_index], cpp_names, col_nullability, - is_int96, precisions, is_map, parquetFieldIds, children, childs_children, read_index); + read_index = set_column_metadata(metadata.column_metadata[write_index], cpp_names, + col_nullability, is_int96, precisions, is_map, + parquetFieldIds, children, childs_children, read_index); } } } From 31fe9923de62b1f9102a8a220757afa0ea077ef2 Mon Sep 17 00:00:00 2001 From: Chong Gao Date: Wed, 13 Apr 2022 19:43:41 +0800 Subject: [PATCH 3/3] Add test case Signed-off-by: Chong Gao --- .../test/java/ai/rapids/cudf/TableTest.java | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/java/src/test/java/ai/rapids/cudf/TableTest.java b/java/src/test/java/ai/rapids/cudf/TableTest.java index 7be1ca2118b..eb11e241bd0 100644 --- a/java/src/test/java/ai/rapids/cudf/TableTest.java +++ b/java/src/test/java/ai/rapids/cudf/TableTest.java @@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.schema.GroupType; import 
org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.OriginalType;
 import org.junit.jupiter.api.Test;
@@ -7899,6 +7900,54 @@ void testParquetWriteToFileUncompressedNoStats() throws IOException {
     }
   }
 
+  @Test
+  void testParquetWriteWithFieldId() throws IOException {
+    ColumnWriterOptions.StructBuilder sBuilder =
+        structBuilder("c3", true, 3)
+            .withColumns(true, "c31", 31)
+            .withColumns(true, "c32", 32);
+    ParquetWriterOptions options = ParquetWriterOptions.builder()
+        .withColumns(true, "c1", 1)
+        .withDecimalColumn("c2", 9, true, 2)
+        .withStructColumn(sBuilder.build())
+        .withTimestampColumn("c4", true, true, 4)
+        .build();
+
+    File tempFile = File.createTempFile("test-field-id", ".parquet");
+    try {
+      HostColumnVector.StructType structType = new HostColumnVector.StructType(
+          true,
+          new HostColumnVector.BasicType(true, DType.STRING),
+          new HostColumnVector.BasicType(true, DType.STRING));
+
+      try (Table table0 = new Table.TestBuilder()
+          .column(true, false) // c1
+          .decimal32Column(0, 298, 2473) // c2
+          .column(structType, // c3
+              new HostColumnVector.StructData("a", "b"), new HostColumnVector.StructData("a", "b"))
+          .timestampMicrosecondsColumn(1000L, 2000L) // c4
+          .build()) {
+        try (TableWriter writer = Table.writeParquetChunked(options, tempFile.getAbsoluteFile())) {
+          writer.write(table0);
+        }
+      }
+
+      try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(
+          new Path(tempFile.getAbsolutePath()),
+          new Configuration()))) {
+        MessageType schema = reader.getFooter().getFileMetaData().getSchema();
+        assertEquals(1, schema.getFields().get(0).getId().intValue());
+        assertEquals(2, schema.getFields().get(1).getId().intValue());
+        assertEquals(3, schema.getFields().get(2).getId().intValue());
+        assertEquals(31, ((GroupType) schema.getFields().get(2)).getFields().get(0).getId().intValue());
+        assertEquals(32, ((GroupType) schema.getFields().get(2)).getFields().get(1).getId().intValue());
+        assertEquals(4, schema.getFields().get(3).getId().intValue());
+      }
+    } finally {
+      tempFile.delete();
+    }
+  }
+
   /** Return a column where DECIMAL64 has been up-casted to DECIMAL128 */
   private ColumnVector castDecimal64To128(ColumnView c) {
     DType dtype = c.getType();
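
Usage sketch (reviewer reference, not part of the patch series): a minimal
example of the new field-id overloads, mirroring the schema exercised by
testParquetWriteWithFieldId above. The output file name and the `table`
variable are illustrative placeholders, not part of the change.

    // Tag every column, including nested children, with an explicit Parquet field id.
    ParquetWriterOptions options = ParquetWriterOptions.builder()
        .withColumns(true, "c1", 1)                     // BOOL8 column, field id 1
        .withDecimalColumn("c2", 9, true, 2)            // precision-9 decimal, field id 2
        .withStructColumn(ColumnWriterOptions.structBuilder("c3", true, 3)
            .withColumns(true, "c31", 31)               // struct children carry their own ids
            .withColumns(true, "c32", 32)
            .build())
        .withTimestampColumn("c4", true, true, 4)       // INT96 timestamp, field id 4
        .build();
    try (TableWriter writer = Table.writeParquetChunked(options, new File("example.parquet"))) {
      writer.write(table); // `table` is assumed to match the declared schema
    }

Before crossing JNI, the options tree is flattened in pre-order, one entry per
column: for the schema above getFlatParquetFieldId() yields {1, 2, 3, 31, 32, 4}
and getFlatNumChildren() yields {0, 0, 2, 0, 0, 0}. createTableMetaData and
set_column_metadata in TableJni.cpp rebuild the tree from these parallel arrays
with the shared read_index cursor, and skip set_parquet_field_id altogether when
the id array is null, which is what the ORC entry points pass.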