Skip to content

Commit

Permalink
Merge branch 'parquet-field-id-writing' of github.com:PointKernel/cud…
Browse files Browse the repository at this point in the history
…f into parquet-field-id-writing
  • Loading branch information
PointKernel committed Apr 13, 2022
2 parents 648445f + 31fe992 commit f657407
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 20 deletions.
100 changes: 94 additions & 6 deletions java/src/main/java/ai/rapids/cudf/ColumnWriterOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@ public class ColumnWriterOptions {
private boolean isNullable;
private boolean isMap = false;
private String columnName;
// only for Parquet
private int parquetFieldId;

private ColumnWriterOptions(AbstractStructBuilder builder) {
this.columnName = builder.name;
this.isNullable = builder.isNullable;
this.parquetFieldId = builder.parquetFieldId;
this.childColumnOptions =
(ColumnWriterOptions[]) builder.children.toArray(new ColumnWriterOptions[0]);
}
Expand Down Expand Up @@ -67,6 +71,10 @@ public AbstractStructBuilder(String name, boolean isNullable) {
super(name, isNullable);
}

public AbstractStructBuilder(String name, boolean isNullable, int parquetFieldId) {
super(name, isNullable, parquetFieldId);
}

protected AbstractStructBuilder() {
super();
}
Expand All @@ -84,6 +92,8 @@ public static abstract class NestedBuilder<T extends NestedBuilder, V extends Co
protected List<ColumnWriterOptions> children = new ArrayList<>();
protected boolean isNullable = true;
protected String name = "";
// Parquet structure needs
protected int parquetFieldId;

/**
* Builder specific to build a Struct meta
Expand All @@ -93,22 +103,42 @@ protected NestedBuilder(String name, boolean isNullable) {
this.isNullable = isNullable;
}

protected NestedBuilder(String name, boolean isNullable, int parquetFieldId) {
this.name = name;
this.isNullable = isNullable;
this.parquetFieldId = parquetFieldId;
}

protected NestedBuilder() {}

protected ColumnWriterOptions withColumns(String name, boolean isNullable) {
return new ColumnWriterOptions(name, isNullable);
}

protected ColumnWriterOptions withColumns(String name, boolean isNullable, int parquetFieldId) {
return new ColumnWriterOptions(name, isNullable, parquetFieldId);
}

protected ColumnWriterOptions withDecimal(String name, int precision,
boolean isNullable) {
return new ColumnWriterOptions(name, false, precision, isNullable);
}

protected ColumnWriterOptions withDecimal(String name, int precision,
boolean isNullable, int parquetFieldId) {
return new ColumnWriterOptions(name, false, precision, isNullable, parquetFieldId);
}

protected ColumnWriterOptions withTimestamp(String name, boolean isInt96,
boolean isNullable) {
return new ColumnWriterOptions(name, isInt96, UNKNOWN_PRECISION, isNullable);
}

protected ColumnWriterOptions withTimestamp(String name, boolean isInt96,
boolean isNullable, int parquetFieldId) {
return new ColumnWriterOptions(name, isInt96, UNKNOWN_PRECISION, isNullable, parquetFieldId);
}

/**
* Set the list column meta.
* Lists should have only one child in ColumnVector, but the metadata expects a
Expand Down Expand Up @@ -155,30 +185,39 @@ public T withStructColumn(StructColumnWriterOptions child) {
/**
* Set column name
*/
public T withNonNullableColumns(String... name) {
withColumns(false, name);
public T withNonNullableColumns(String... names) {
withColumns(false, names);
return (T) this;
}

/**
* Set nullable column meta data
*/
public T withNullableColumns(String... name) {
withColumns(true, name);
public T withNullableColumns(String... names) {
withColumns(true, names);
return (T) this;
}

/**
* Set a simple child meta data
* @return this for chaining.
*/
public T withColumns(boolean nullable, String... name) {
for (String n : name) {
public T withColumns(boolean nullable, String... names) {
for (String n : names) {
children.add(withColumns(n, nullable));
}
return (T) this;
}

/**
* Set a simple child meta data
* @return this for chaining.
*/
public T withColumns(boolean nullable, String name, int parquetFieldId) {
children.add(withColumns(name, nullable, parquetFieldId));
return (T) this;
}

/**
* Set a Decimal child meta data
* @return this for chaining.
Expand All @@ -188,6 +227,15 @@ public T withDecimalColumn(String name, int precision, boolean nullable) {
return (T) this;
}

/**
* Set a Decimal child meta data
* @return this for chaining.
*/
public T withDecimalColumn(String name, int precision, boolean nullable, int parquetFieldId) {
children.add(withDecimal(name, precision, nullable, parquetFieldId));
return (T) this;
}

/**
* Set a Decimal child meta data
* @return this for chaining.
Expand All @@ -206,6 +254,15 @@ public T withDecimalColumn(String name, int precision) {
return (T) this;
}

/**
* Set a timestamp child meta data
* @return this for chaining.
*/
public T withTimestampColumn(String name, boolean isInt96, boolean nullable, int parquetFieldId) {
children.add(withTimestamp(name, isInt96, nullable, parquetFieldId));
return (T) this;
}

/**
* Set a timestamp child meta data
* @return this for chaining.
Expand Down Expand Up @@ -244,13 +301,24 @@ public ColumnWriterOptions(String columnName, boolean isTimestampTypeInt96,
this.columnName = columnName;
}

public ColumnWriterOptions(String columnName, boolean isTimestampTypeInt96,
int precision, boolean isNullable, int parquetFieldId) {
this(columnName, isTimestampTypeInt96, precision, isNullable);
this.parquetFieldId = parquetFieldId;
}

public ColumnWriterOptions(String columnName, boolean isNullable) {
this.isTimestampTypeInt96 = false;
this.precision = UNKNOWN_PRECISION;
this.isNullable = isNullable;
this.columnName = columnName;
}

public ColumnWriterOptions(String columnName, boolean isNullable, int parquetFieldId) {
this(columnName, isNullable);
this.parquetFieldId = parquetFieldId;
}

public ColumnWriterOptions(String columnName) {
this(columnName, true);
}
Expand Down Expand Up @@ -302,6 +370,15 @@ int[] getFlatPrecision() {
}
}

int[] getFlatParquetFieldId() {
int[] ret = {parquetFieldId};
if (childColumnOptions.length > 0) {
return getFlatInts(ret, (opt) -> opt.getFlatParquetFieldId());
} else {
return ret;
}
}

boolean[] getFlatIsNullable() {
boolean[] ret = {isNullable};
if (childColumnOptions.length > 0) {
Expand Down Expand Up @@ -418,6 +495,13 @@ public static StructBuilder structBuilder(String name, boolean isNullable) {
return new StructBuilder(name, isNullable);
}

/**
* Creates a StructBuilder for column called 'name'
*/
public static StructBuilder structBuilder(String name, boolean isNullable, int parquetFieldId) {
return new StructBuilder(name, isNullable, parquetFieldId);
}

/**
* Creates a StructBuilder for column called 'name'
*/
Expand Down Expand Up @@ -477,6 +561,10 @@ public StructBuilder(String name, boolean isNullable) {
super(name, isNullable);
}

public StructBuilder(String name, boolean isNullable, int parquetFieldId) {
super(name, isNullable, parquetFieldId);
}

public StructColumnWriterOptions build() {
return new StructColumnWriterOptions(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ int[] getFlatPrecision() {
return super.getFlatInts(new int[]{}, (opt) -> opt.getFlatPrecision());
}

@Override
int[] getFlatParquetFieldId() {
return super.getFlatInts(new int[]{}, (opt) -> opt.getFlatParquetFieldId());
}

@Override
int[] getFlatNumChildren() {
return super.getFlatInts(new int[]{}, (opt) -> opt.getFlatNumChildren());
Expand Down
9 changes: 8 additions & 1 deletion java/src/main/java/ai/rapids/cudf/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,9 @@ private static native long writeParquetFileBegin(String[] columnNames,
int statsFreq,
boolean[] isInt96,
int[] precisions,
boolean[] isMapValues, String filename) throws CudfException;
boolean[] isMapValues,
int[] parquetFieldIds,
String filename) throws CudfException;

/**
* Setup everything to write parquet formatted data to a buffer.
Expand Down Expand Up @@ -319,6 +321,7 @@ private static native long writeParquetBufferBegin(String[] columnNames,
boolean[] isInt96,
int[] precisions,
boolean[] isMapValues,
int[] parquetFieldIds,
HostBufferConsumer consumer) throws CudfException;

/**
Expand Down Expand Up @@ -1201,6 +1204,7 @@ private ParquetTableWriter(ParquetWriterOptions options, File outputFile) {
boolean[] timeInt96Values = options.getFlatIsTimeTypeInt96();
boolean[] isMapValues = options.getFlatIsMap();
int[] precisions = options.getFlatPrecision();
int[] parquetFieldIds = options.getFlatParquetFieldId();
int[] flatNumChildren = options.getFlatNumChildren();

this.consumer = null;
Expand All @@ -1215,6 +1219,7 @@ private ParquetTableWriter(ParquetWriterOptions options, File outputFile) {
timeInt96Values,
precisions,
isMapValues,
parquetFieldIds,
outputFile.getAbsolutePath());
}

Expand All @@ -1224,6 +1229,7 @@ private ParquetTableWriter(ParquetWriterOptions options, HostBufferConsumer cons
boolean[] timeInt96Values = options.getFlatIsTimeTypeInt96();
boolean[] isMapValues = options.getFlatIsMap();
int[] precisions = options.getFlatPrecision();
int[] parquetFieldIds = options.getFlatParquetFieldId();
int[] flatNumChildren = options.getFlatNumChildren();

this.consumer = consumer;
Expand All @@ -1238,6 +1244,7 @@ private ParquetTableWriter(ParquetWriterOptions options, HostBufferConsumer cons
timeInt96Values,
precisions,
isMapValues,
parquetFieldIds,
consumer);
}

Expand Down
Loading

0 comments on commit f657407

Please sign in to comment.