Skip to content

Commit

Permalink
Add a boolean sentinel for field ID
Browse files Browse the repository at this point in the history
  • Loading branch information
Chong Gao committed Apr 14, 2022
1 parent 31fe992 commit de3d17a
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 19 deletions.
25 changes: 20 additions & 5 deletions java/src/main/java/ai/rapids/cudf/ColumnWriterOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ public class ColumnWriterOptions {
private boolean isMap = false;
private String columnName;
// only for Parquet
private boolean hasParquetFieldId;
private int parquetFieldId;

private ColumnWriterOptions(AbstractStructBuilder builder) {
this.columnName = builder.name;
this.isNullable = builder.isNullable;
this.hasParquetFieldId = builder.hasParquetFieldId;
this.parquetFieldId = builder.parquetFieldId;
this.childColumnOptions =
(ColumnWriterOptions[]) builder.children.toArray(new ColumnWriterOptions[0]);
Expand Down Expand Up @@ -93,6 +95,7 @@ public static abstract class NestedBuilder<T extends NestedBuilder, V extends Co
protected boolean isNullable = true;
protected String name = "";
// Parquet structure needs
protected boolean hasParquetFieldId;
protected int parquetFieldId;

/**
Expand All @@ -106,16 +109,17 @@ protected NestedBuilder(String name, boolean isNullable) {
protected NestedBuilder(String name, boolean isNullable, int parquetFieldId) {
this.name = name;
this.isNullable = isNullable;
this.hasParquetFieldId = true;
this.parquetFieldId = parquetFieldId;
}

protected NestedBuilder() {}

protected ColumnWriterOptions withColumns(String name, boolean isNullable) {
protected ColumnWriterOptions withColumn(String name, boolean isNullable) {
return new ColumnWriterOptions(name, isNullable);
}

protected ColumnWriterOptions withColumns(String name, boolean isNullable, int parquetFieldId) {
protected ColumnWriterOptions withColumn(String name, boolean isNullable, int parquetFieldId) {
return new ColumnWriterOptions(name, isNullable, parquetFieldId);
}

Expand Down Expand Up @@ -204,7 +208,7 @@ public T withNullableColumns(String... names) {
*/
public T withColumns(boolean nullable, String... names) {
for (String n : names) {
children.add(withColumns(n, nullable));
children.add(withColumn(n, nullable));
}
return (T) this;
}
Expand All @@ -213,8 +217,8 @@ public T withColumns(boolean nullable, String... names) {
* Set a simple child meta data
* @return this for chaining.
*/
public T withColumns(boolean nullable, String name, int parquetFieldId) {
children.add(withColumns(name, nullable, parquetFieldId));
public T withColumn(boolean nullable, String name, int parquetFieldId) {
children.add(withColumn(name, nullable, parquetFieldId));
return (T) this;
}

Expand Down Expand Up @@ -304,6 +308,7 @@ public ColumnWriterOptions(String columnName, boolean isTimestampTypeInt96,
public ColumnWriterOptions(String columnName, boolean isTimestampTypeInt96,
int precision, boolean isNullable, int parquetFieldId) {
this(columnName, isTimestampTypeInt96, precision, isNullable);
this.hasParquetFieldId = true;
this.parquetFieldId = parquetFieldId;
}

Expand All @@ -316,6 +321,7 @@ public ColumnWriterOptions(String columnName, boolean isNullable) {

public ColumnWriterOptions(String columnName, boolean isNullable, int parquetFieldId) {
this(columnName, isNullable);
this.hasParquetFieldId = true;
this.parquetFieldId = parquetFieldId;
}

Expand Down Expand Up @@ -370,6 +376,15 @@ int[] getFlatPrecision() {
}
}

/**
 * Flattens the hasParquetFieldId sentinel for this column and, recursively, all of its
 * descendants into a single boolean array (this column's flag first, then children via
 * getFlatBooleans). Positions in this array line up with the other getFlat* arrays
 * (e.g. getFlatParquetFieldId), so the native writer can tell which entries of the
 * field-ID array were actually set by the user — presumably to allow 0/negative IDs;
 * TODO confirm against the JNI consumer in TableJni.cpp.
 */
boolean[] getFlatHasParquetFieldId() {
// Seed with this column's own flag.
boolean[] ret = {hasParquetFieldId};
if (childColumnOptions.length > 0) {
// Recurse depth-first into child column options and append their flags.
return getFlatBooleans(ret, (opt) -> opt.getFlatHasParquetFieldId());
} else {
return ret;
}
}

int[] getFlatParquetFieldId() {
int[] ret = {parquetFieldId};
if (childColumnOptions.length > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ int[] getFlatPrecision() {
return super.getFlatInts(new int[]{}, (opt) -> opt.getFlatPrecision());
}

@Override
boolean[] getFlatHasParquetFieldId() {
// The top-level writer options object is not itself a column: seed the recursion with an
// empty array (rather than this object's own flag) and collect only the children's flags.
return super.getFlatBooleans(new boolean[]{}, (opt) -> opt.getFlatHasParquetFieldId());
}

@Override
int[] getFlatParquetFieldId() {
return super.getFlatInts(new int[]{}, (opt) -> opt.getFlatParquetFieldId());
Expand Down
6 changes: 6 additions & 0 deletions java/src/main/java/ai/rapids/cudf/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ private static native long writeParquetFileBegin(String[] columnNames,
boolean[] isInt96,
int[] precisions,
boolean[] isMapValues,
boolean[] hasParquetFieldIds,
int[] parquetFieldIds,
String filename) throws CudfException;

Expand Down Expand Up @@ -321,6 +322,7 @@ private static native long writeParquetBufferBegin(String[] columnNames,
boolean[] isInt96,
int[] precisions,
boolean[] isMapValues,
boolean[] hasParquetFieldIds,
int[] parquetFieldIds,
HostBufferConsumer consumer) throws CudfException;

Expand Down Expand Up @@ -1204,6 +1206,7 @@ private ParquetTableWriter(ParquetWriterOptions options, File outputFile) {
boolean[] timeInt96Values = options.getFlatIsTimeTypeInt96();
boolean[] isMapValues = options.getFlatIsMap();
int[] precisions = options.getFlatPrecision();
boolean[] hasParquetFieldIds = options.getFlatHasParquetFieldId();
int[] parquetFieldIds = options.getFlatParquetFieldId();
int[] flatNumChildren = options.getFlatNumChildren();

Expand All @@ -1219,6 +1222,7 @@ private ParquetTableWriter(ParquetWriterOptions options, File outputFile) {
timeInt96Values,
precisions,
isMapValues,
hasParquetFieldIds,
parquetFieldIds,
outputFile.getAbsolutePath());
}
Expand All @@ -1229,6 +1233,7 @@ private ParquetTableWriter(ParquetWriterOptions options, HostBufferConsumer cons
boolean[] timeInt96Values = options.getFlatIsTimeTypeInt96();
boolean[] isMapValues = options.getFlatIsMap();
int[] precisions = options.getFlatPrecision();
boolean[] hasParquetFieldIds = options.getFlatHasParquetFieldId();
int[] parquetFieldIds = options.getFlatParquetFieldId();
int[] flatNumChildren = options.getFlatNumChildren();

Expand All @@ -1244,6 +1249,7 @@ private ParquetTableWriter(ParquetWriterOptions options, HostBufferConsumer cons
timeInt96Values,
precisions,
isMapValues,
hasParquetFieldIds,
parquetFieldIds,
consumer);
}
Expand Down
22 changes: 14 additions & 8 deletions java/src/main/native/src/TableJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -706,12 +706,13 @@ void createTableMetaData(JNIEnv *env, jint num_children, jobjectArray &j_col_nam
jintArray &j_children, jbooleanArray &j_col_nullability,
jbooleanArray &j_is_int96, jintArray &j_precisions,
jbooleanArray &j_is_map, cudf::io::table_input_metadata &metadata,
jintArray &j_parquetFieldIds) {
jbooleanArray &j_hasParquetFieldIds, jintArray &j_parquetFieldIds) {
cudf::jni::auto_set_device(env);
cudf::jni::native_jstringArray col_names(env, j_col_names);
cudf::jni::native_jbooleanArray col_nullability(env, j_col_nullability);
cudf::jni::native_jbooleanArray is_int96(env, j_is_int96);
cudf::jni::native_jintArray precisions(env, j_precisions);
cudf::jni::native_jbooleanArray hasParquetFieldIds(env, j_hasParquetFieldIds);
cudf::jni::native_jintArray parquetFieldIds(env, j_parquetFieldIds);
cudf::jni::native_jintArray children(env, j_children);
cudf::jni::native_jbooleanArray is_map(env, j_is_map);
Expand All @@ -735,7 +736,7 @@ void createTableMetaData(JNIEnv *env, jint num_children, jobjectArray &j_col_nam
if (is_map[read_index]) {
metadata.column_metadata[write_index].set_list_column_as_map();
}
if (!parquetFieldIds.is_null()) {
if (!parquetFieldIds.is_null() && hasParquetFieldIds[read_index]) {
metadata.column_metadata[write_index].set_parquet_field_id(parquetFieldIds[read_index]);
}
int childs_children = children[read_index++];
Expand Down Expand Up @@ -1548,7 +1549,8 @@ JNIEXPORT long JNICALL Java_ai_rapids_cudf_Table_writeParquetBufferBegin(
JNIEnv *env, jclass, jobjectArray j_col_names, jint j_num_children, jintArray j_children,
jbooleanArray j_col_nullability, jobjectArray j_metadata_keys, jobjectArray j_metadata_values,
jint j_compression, jint j_stats_freq, jbooleanArray j_isInt96, jintArray j_precisions,
jbooleanArray j_is_map, jintArray j_parquetFieldIds, jobject consumer) {
jbooleanArray j_is_map, jbooleanArray j_hasParquetFieldIds, jintArray j_parquetFieldIds,
jobject consumer) {
JNI_NULL_CHECK(env, j_col_names, "null columns", 0);
JNI_NULL_CHECK(env, j_col_nullability, "null nullability", 0);
JNI_NULL_CHECK(env, j_metadata_keys, "null metadata keys", 0);
Expand All @@ -1563,7 +1565,7 @@ JNIEXPORT long JNICALL Java_ai_rapids_cudf_Table_writeParquetBufferBegin(
sink_info sink{data_sink.get()};
table_input_metadata metadata;
createTableMetaData(env, j_num_children, j_col_names, j_children, j_col_nullability, j_isInt96,
j_precisions, j_is_map, metadata, j_parquetFieldIds);
j_precisions, j_is_map, metadata, j_hasParquetFieldIds, j_parquetFieldIds);

auto meta_keys = cudf::jni::native_jstringArray{env, j_metadata_keys}.as_cpp_vector();
auto meta_values = cudf::jni::native_jstringArray{env, j_metadata_values}.as_cpp_vector();
Expand Down Expand Up @@ -1592,7 +1594,8 @@ JNIEXPORT long JNICALL Java_ai_rapids_cudf_Table_writeParquetFileBegin(
JNIEnv *env, jclass, jobjectArray j_col_names, jint j_num_children, jintArray j_children,
jbooleanArray j_col_nullability, jobjectArray j_metadata_keys, jobjectArray j_metadata_values,
jint j_compression, jint j_stats_freq, jbooleanArray j_isInt96, jintArray j_precisions,
jbooleanArray j_is_map, jintArray j_parquetFieldIds, jstring j_output_path) {
jbooleanArray j_is_map, jbooleanArray j_hasParquetFieldIds, jintArray j_parquetFieldIds,
jstring j_output_path) {
JNI_NULL_CHECK(env, j_col_names, "null columns", 0);
JNI_NULL_CHECK(env, j_col_nullability, "null nullability", 0);
JNI_NULL_CHECK(env, j_metadata_keys, "null metadata keys", 0);
Expand All @@ -1605,7 +1608,7 @@ JNIEXPORT long JNICALL Java_ai_rapids_cudf_Table_writeParquetFileBegin(
using namespace cudf::jni;
table_input_metadata metadata;
createTableMetaData(env, j_num_children, j_col_names, j_children, j_col_nullability, j_isInt96,
j_precisions, j_is_map, metadata, j_parquetFieldIds);
j_precisions, j_is_map, metadata, j_hasParquetFieldIds, j_parquetFieldIds);

auto meta_keys = cudf::jni::native_jstringArray{env, j_metadata_keys}.as_cpp_vector();
auto meta_values = cudf::jni::native_jstringArray{env, j_metadata_values}.as_cpp_vector();
Expand Down Expand Up @@ -1731,9 +1734,11 @@ JNIEXPORT long JNICALL Java_ai_rapids_cudf_Table_writeORCBufferBegin(
// ORC has no `j_is_int96`, but `createTableMetaData` needs a lvalue.
jbooleanArray j_is_int96 = NULL;
// ORC has no `j_parquetFieldIds`, but `createTableMetaData` needs a lvalue.
jbooleanArray j_hasParquetFieldIds = NULL;
jintArray j_parquetFieldIds = NULL;

createTableMetaData(env, j_num_children, j_col_names, j_children, j_col_nullability, j_is_int96,
j_precisions, j_is_map, metadata, j_parquetFieldIds);
j_precisions, j_is_map, metadata, j_hasParquetFieldIds, j_parquetFieldIds);

auto meta_keys = cudf::jni::native_jstringArray{env, j_metadata_keys}.as_cpp_vector();
auto meta_values = cudf::jni::native_jstringArray{env, j_metadata_values}.as_cpp_vector();
Expand Down Expand Up @@ -1778,9 +1783,10 @@ JNIEXPORT long JNICALL Java_ai_rapids_cudf_Table_writeORCFileBegin(
// ORC has no `j_is_int96`, but `createTableMetaData` needs a lvalue.
jbooleanArray j_is_int96 = NULL;
// ORC has no `j_parquetFieldIds`, but `createTableMetaData` needs a lvalue.
jbooleanArray j_hasParquetFieldIds = NULL;
jintArray j_parquetFieldIds = NULL;
createTableMetaData(env, j_num_children, j_col_names, j_children, j_col_nullability, j_is_int96,
j_precisions, j_is_map, metadata, j_parquetFieldIds);
j_precisions, j_is_map, metadata, j_hasParquetFieldIds, j_parquetFieldIds);

auto meta_keys = cudf::jni::native_jstringArray{env, j_metadata_keys}.as_cpp_vector();
auto meta_values = cudf::jni::native_jstringArray{env, j_metadata_values}.as_cpp_vector();
Expand Down
17 changes: 11 additions & 6 deletions java/src/test/java/ai/rapids/cudf/TableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7902,15 +7902,18 @@ void testParquetWriteToFileUncompressedNoStats() throws IOException {

@Test
void testParquetWriteWithFieldId() throws IOException {
// field IDs are:
// c1: -1, c2: 2, c3: 3, c31: 31, c32: 32, c4: -4, c5: not specified
ColumnWriterOptions.StructBuilder sBuilder =
structBuilder("c3", true, 3)
.withColumns(true, "c31", 31)
.withColumns(true, "c32", 32);
.withColumn(true, "c31", 31)
.withColumn(true, "c32", 32);
ParquetWriterOptions options = ParquetWriterOptions.builder()
.withColumns(true, "c1", 1)
.withColumn(true, "c1", -1)
.withDecimalColumn("c2", 9, true, 2)
.withStructColumn(sBuilder.build())
.withTimestampColumn("c4", true, true, 4)
.withTimestampColumn("c4", true, true, -4)
.withColumns( true, "c5")
.build();

File tempFile = File.createTempFile("test-field-id", ".parquet");
Expand All @@ -7926,6 +7929,7 @@ void testParquetWriteWithFieldId() throws IOException {
.column(structType, // c3
new HostColumnVector.StructData("a", "b"), new HostColumnVector.StructData("a", "b"))
.timestampMicrosecondsColumn(1000L, 2000L) // c4
.column("a", "b") // c5
.build()) {
try (TableWriter writer = Table.writeParquetChunked(options, tempFile.getAbsoluteFile())) {
writer.write(table0);
Expand All @@ -7936,12 +7940,13 @@ void testParquetWriteWithFieldId() throws IOException {
new Path(tempFile.getAbsolutePath()),
new Configuration()))) {
MessageType schema = reader.getFooter().getFileMetaData().getSchema();
assert (schema.getFields().get(0).getId().intValue() == 1);
assert (schema.getFields().get(0).getId().intValue() == -1);
assert (schema.getFields().get(1).getId().intValue() == 2);
assert (schema.getFields().get(2).getId().intValue() == 3);
assert (((GroupType) schema.getFields().get(2)).getFields().get(0).getId().intValue() == 31);
assert (((GroupType) schema.getFields().get(2)).getFields().get(1).getId().intValue() == 32);
assert (schema.getFields().get(3).getId().intValue() == 4);
assert (schema.getFields().get(3).getId().intValue() == -4);
assert (schema.getFields().get(4).getId() == null);
}
} finally {
tempFile.delete();
Expand Down

0 comments on commit de3d17a

Please sign in to comment.