feat: Added support to write vector/array of big decimals to parquet #6022

Merged (2 commits) on Sep 6, 2024
@@ -5,13 +5,7 @@

import io.deephaven.util.SafeCloseable;

import java.util.Iterator;
import java.util.PrimitiveIterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
* Interface for {@link SafeCloseable closeable} {@link PrimitiveIterator primitive iterators}.
150 changes: 107 additions & 43 deletions engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java
@@ -3,31 +3,37 @@
//
package io.deephaven.engine.util;

import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.primitive.iterator.CloseableIterator;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.ChunkSource;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.vectors.ObjectVectorColumnWrapper;
import io.deephaven.vector.ObjectVector;
import io.deephaven.vector.Vector;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.math.BigDecimal;
import java.util.Iterator;
import java.util.Properties;

/**
* Utilities to support computing precision and scale for BigDecimal columns.
*
* <p>
* Parquet and Avro decimal types fix a single precision and scale for an entire column, whereas BigDecimal values in a
* Deephaven column are arbitrary precision: each value carries its own precision and scale.
*
* <p>
* For static tables, it is possible to compute an overall precision and scale that fit every existing value. For
* refreshing tables, the user must supply them.
*/
public class BigDecimalUtils {
private static final PrecisionAndScale EMPTY_TABLE_PRECISION_AND_SCALE = new PrecisionAndScale(1, 1);
private static final int TARGET_CHUNK_SIZE = 4096;

public static final int INVALID_PRECISION_OR_SCALE = -1;

private static final PrecisionAndScale EMPTY_TABLE_PRECISION_AND_SCALE = new PrecisionAndScale(1, 1);
private static final int INIT_MAX_PRECISION_MINUS_SCALE = -1;
private static final int INIT_MAX_SCALE = -1;

/**
* Immutable way to store and pass precision and scale values.
*/
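Before the diff continues, it may help to make the precision/scale arithmetic concrete: the utility tracks the maximum digit counts on each side of the decimal point across all values. A minimal, self-contained sketch of that idea (sample values are illustrative, not from the PR):

```java
import java.math.BigDecimal;

public class PrecisionScaleDemo {
    public static void main(String[] args) {
        final BigDecimal[] values = {
                new BigDecimal("123.45"), // precision 5, scale 2 -> 3 digits left of the point
                new BigDecimal("0.6789"), // precision 4, scale 4 -> 0 digits left of the point
        };
        int maxPrecisionMinusScale = -1; // max digits left of the decimal point
        int maxScale = -1; // max digits right of the decimal point
        for (final BigDecimal value : values) {
            maxPrecisionMinusScale = Math.max(maxPrecisionMinusScale, value.precision() - value.scale());
            maxScale = Math.max(maxScale, value.scale());
        }
        // The combined column type needs 3 integer digits and 4 fractional digits:
        // precision = 3 + 4 = 7, scale = 4, i.e. DECIMAL(7, 4) fits both 123.4500 and 0.6789
        System.out.println("precision=" + (maxPrecisionMinusScale + maxScale) + ", scale=" + maxScale);
    }
}
```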
@@ -44,67 +44,50 @@ public PrecisionAndScale(final int precision, final int scale) {
/**
* Compute an overall precision and scale that would fit all existing values in a table.
*
* @param t a Deephaven table
* @param colName a Column for {@code t}, which should be of {@code BigDecimal} type
* @return a {@code PrecisionAndScale} object result.
* @param table A Deephaven table
* @param colName Column for {@code table}, which should be of {@code BigDecimal} {@link ColumnSource#getType type}
* or {@link ColumnSource#getComponentType component type}
* @return A {@link PrecisionAndScale} object result.
*/
public static PrecisionAndScale computePrecisionAndScale(
final Table t, final String colName) {
final ColumnSource<BigDecimal> src = t.getColumnSource(colName, BigDecimal.class);
return computePrecisionAndScale(t.getRowSet(), src);
final Table table,
final String colName) {
final ColumnSource<?> src = table.getColumnSource(colName);
return computePrecisionAndScale(table.getRowSet(), src);
}

/**
* Compute an overall precision and scale that would fit all existing values in a column source. Note that this
* requires a full table scan to ensure the correct values are determined.
*
* @param rowSet The rowset for the provided column
* @param source a {@code ColumnSource} of {@code BigDecimal} type
* @return a {@code PrecisionAndScale} object result.
* @param columnSource A {@code ColumnSource} of {@code BigDecimal} {@link ColumnSource#getType type} or
* {@link ColumnSource#getComponentType component type}
* @return A {@link PrecisionAndScale} object result.
*/
public static PrecisionAndScale computePrecisionAndScale(
final RowSet rowSet,
final ColumnSource<BigDecimal> source) {
final ColumnSource<?> columnSource) {
if (rowSet.isEmpty()) {
return EMPTY_TABLE_PRECISION_AND_SCALE;
}

// We will walk the entire table to determine max(precision - scale) and max(scale), which correspond to
// max(digits left of the decimal point) and max(digits right of the decimal point). Then we convert back to
// (precision, scale) before returning.
int maxPrecisionMinusScale = -1;
int maxScale = -1;
try (final ChunkSource.GetContext context = source.makeGetContext(TARGET_CHUNK_SIZE);
final RowSequence.Iterator it = rowSet.getRowSequenceIterator()) {
while (it.hasMore()) {
final RowSequence rowSeq = it.getNextRowSequenceWithLength(TARGET_CHUNK_SIZE);
final ObjectChunk<BigDecimal, ? extends Values> chunk =
source.getChunk(context, rowSeq).asObjectChunk();
for (int i = 0; i < chunk.size(); ++i) {
final BigDecimal x = chunk.get(i);
if (x == null) {
continue;
}

final int precision = x.precision();
final int scale = x.scale();
final int precisionMinusScale = precision - scale;
if (precisionMinusScale > maxPrecisionMinusScale) {
maxPrecisionMinusScale = precisionMinusScale;
}
if (scale > maxScale) {
maxScale = scale;
}
final BigDecimalParameters result = new BigDecimalParameters(INIT_MAX_PRECISION_MINUS_SCALE, INIT_MAX_SCALE);
final ObjectVector<?> columnVector = new ObjectVectorColumnWrapper<>(columnSource, rowSet);
try (final CloseableIterator<?> columnIterator = columnVector.iterator()) {
final Class<?> columnType = columnSource.getType();
if (columnType == BigDecimal.class) {
// noinspection unchecked
processFlatColumn((Iterator<BigDecimal>) columnIterator, result);
} else if (columnSource.getComponentType() == BigDecimal.class) {
if (columnType.isArray()) {
// noinspection unchecked
processArrayColumn((Iterator<BigDecimal[]>) columnIterator, result);
} else if (Vector.class.isAssignableFrom(columnType)) {
// noinspection unchecked
processVectorColumn((Iterator<ObjectVector<BigDecimal>>) columnIterator, result);
}
} else {
throw new IllegalArgumentException("Column source is not of type BigDecimal or an array/vector of " +
"BigDecimal, but of type " + columnType + " and component type " +
columnSource.getComponentType());
}
}

// If these are < 0, then every value we visited was null
if (maxPrecisionMinusScale < 0 && maxScale < 0) {
// If these are still the initial values, then every value we visited was null
if (result.maxPrecisionMinusScale == INIT_MAX_PRECISION_MINUS_SCALE && result.maxScale == INIT_MAX_SCALE) {
return EMPTY_TABLE_PRECISION_AND_SCALE;
}

return new PrecisionAndScale(maxPrecisionMinusScale + maxScale, maxScale);
return new PrecisionAndScale(result.maxPrecisionMinusScale + result.maxScale, result.maxScale);
}

private static class BigDecimalParameters {
private int maxPrecisionMinusScale;
private int maxScale;

private BigDecimalParameters(final int maxPrecisionMinusScale, final int maxScale) {
this.maxPrecisionMinusScale = maxPrecisionMinusScale;
this.maxScale = maxScale;
}

/**
* Update the maximum values for the parameters based on the given value.
*/
private void updateMaximum(@Nullable final BigDecimal value) {
if (value == null) {
return;
}
final int precision = value.precision();
final int scale = value.scale();
final int precisionMinusScale = precision - scale;
if (precisionMinusScale > maxPrecisionMinusScale) {
maxPrecisionMinusScale = precisionMinusScale;
}
if (scale > maxScale) {
maxScale = scale;
}
}
}

private static void processFlatColumn(
@NotNull final Iterator<BigDecimal> columnIterator,
@NotNull final BigDecimalParameters result) {
columnIterator.forEachRemaining(result::updateMaximum);
}

private static void processVectorColumn(
@NotNull final Iterator<ObjectVector<BigDecimal>> columnIterator,
@NotNull final BigDecimalParameters result) {
columnIterator.forEachRemaining(values -> {
if (values == null) {
return;
}
try (final CloseableIterator<BigDecimal> valuesIterator = values.iterator()) {
valuesIterator.forEachRemaining(result::updateMaximum);
}
});
}

private static void processArrayColumn(
@NotNull final Iterator<BigDecimal[]> columnIterator,
@NotNull final BigDecimalParameters result) {
columnIterator.forEachRemaining(values -> {
if (values == null) {
return;
}
for (final BigDecimal value : values) {
result.updateMaximum(value);
}
});
}
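Note the asymmetry between the two helpers above: processVectorColumn opens each ObjectVector's CloseableIterator in a try-with-resources block, since a vector iterator may hold chunk-backed resources that need releasing, while a plain BigDecimal[] can be walked with an ordinary for-each loop.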

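With the rework in place, a single entry point covers flat, array, and vector columns alike. A hedged usage sketch (column names and contents are illustrative, and it is assumed to run in a Deephaven script session where update formulas resolve):

```java
import io.deephaven.engine.table.Table;
import io.deephaven.engine.util.BigDecimalUtils;
import io.deephaven.engine.util.TableTools;

// A static table with a flat BigDecimal column and a BigDecimal[] column
final Table table = TableTools.emptyTable(100).update(
        "Flat = java.math.BigDecimal.valueOf(ii, 2)",
        "Arr = new java.math.BigDecimal[] {java.math.BigDecimal.valueOf(ii, 3)}");

// Both calls now succeed; before this PR the utility only handled flat BigDecimal columns
final BigDecimalUtils.PrecisionAndScale flat = BigDecimalUtils.computePrecisionAndScale(table, "Flat");
final BigDecimalUtils.PrecisionAndScale arr = BigDecimalUtils.computePrecisionAndScale(table, "Arr");
```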
@@ -124,7 +124,7 @@ public static PrecisionAndScale getPrecisionAndScale(
@NotNull final Map<String, Map<ParquetCacheTags, Object>> computedCache,
@NotNull final String columnName,
@NotNull final RowSet rowSet,
@NotNull Supplier<ColumnSource<BigDecimal>> columnSourceSupplier) {
@NotNull final Supplier<ColumnSource<?>> columnSourceSupplier) {
return (PrecisionAndScale) computedCache
.computeIfAbsent(columnName, unusedColumnName -> new HashMap<>())
.computeIfAbsent(ParquetCacheTags.DECIMAL_ARGS,
@@ -152,7 +152,7 @@ static TypeInfo bigDecimalTypeInfo(
final String columnName = column.getName();
// noinspection unchecked
final PrecisionAndScale precisionAndScale = getPrecisionAndScale(
computedCache, columnName, rowSet, () -> (ColumnSource<BigDecimal>) columnSourceMap.get(columnName));
computedCache, columnName, rowSet, () -> columnSourceMap.get(columnName));
final Set<Class<?>> clazzes = Set.of(BigDecimal.class);
return new TypeInfo() {
@Override
@@ -175,14 +175,9 @@ static TypeInfo getTypeInfo(
final RowSet rowSet,
final Map<String, ? extends ColumnSource<?>> columnSourceMap,
@NotNull final ParquetInstructions instructions) {
if (BigDecimal.class.equals(column.getDataType())) {
if (column.getDataType() == BigDecimal.class || column.getComponentType() == BigDecimal.class) {
return bigDecimalTypeInfo(computedCache, column, rowSet, columnSourceMap);
}
if (BigDecimal.class.equals(column.getComponentType())) {
throw new UnsupportedOperationException("Writing arrays/vector columns for big decimals is currently not " +
"supported");
// TODO(deephaven-core#4612): Add support for this
}
return lookupTypeInfo(column, instructions);
}

@@ -39,7 +39,7 @@ static <DATA_TYPE> TransferObject<?> create(
@NotNull final Map<String, Map<ParquetCacheTags, Object>> computedCache,
@NotNull final String columnName,
@NotNull final ColumnSource<DATA_TYPE> columnSource) {
Class<DATA_TYPE> columnType = columnSource.getType();
final Class<DATA_TYPE> columnType = columnSource.getType();
if (columnType == int.class) {
return IntTransfer.create(columnSource, tableRowSet, instructions.getTargetPageSize());
}
@@ -84,13 +84,11 @@ static <DATA_TYPE> TransferObject<?> create(
return new CodecTransfer<>(columnSource, codec, tableRowSet, instructions.getTargetPageSize());
}
if (columnType == BigDecimal.class) {
// noinspection unchecked
final ColumnSource<BigDecimal> bigDecimalColumnSource = (ColumnSource<BigDecimal>) columnSource;
final BigDecimalUtils.PrecisionAndScale precisionAndScale = TypeInfos.getPrecisionAndScale(
computedCache, columnName, tableRowSet, () -> bigDecimalColumnSource);
computedCache, columnName, tableRowSet, () -> columnSource);
final ObjectCodec<BigDecimal> codec = new BigDecimalParquetBytesCodec(
precisionAndScale.precision, precisionAndScale.scale);
return new CodecTransfer<>(bigDecimalColumnSource, codec, tableRowSet, instructions.getTargetPageSize());
return new CodecTransfer<>(columnSource, codec, tableRowSet, instructions.getTargetPageSize());
}
if (columnType == BigInteger.class) {
return new CodecTransfer<>(columnSource, new BigIntegerParquetBytesCodec(), tableRowSet,
@@ -136,6 +134,13 @@ static <DATA_TYPE> TransferObject<?> create(
if (componentType == String.class) {
return new StringArrayTransfer(columnSource, tableRowSet, instructions.getTargetPageSize());
}
if (componentType == BigDecimal.class) {
final BigDecimalUtils.PrecisionAndScale precisionAndScale = TypeInfos.getPrecisionAndScale(
computedCache, columnName, tableRowSet, () -> columnSource);
final ObjectCodec<BigDecimal> codec = new BigDecimalParquetBytesCodec(
precisionAndScale.precision, precisionAndScale.scale);
return new CodecArrayTransfer<>(columnSource, codec, tableRowSet, instructions.getTargetPageSize());
}
if (componentType == BigInteger.class) {
return new CodecArrayTransfer<>(columnSource, new BigIntegerParquetBytesCodec(),
tableRowSet, instructions.getTargetPageSize());
@@ -152,7 +157,7 @@ static <DATA_TYPE> TransferObject<?> create(
if (componentType == LocalDateTime.class) {
return new LocalDateTimeArrayTransfer(columnSource, tableRowSet, instructions.getTargetPageSize());
}
// TODO(deephaven-core#4612): Handle arrays of BigDecimal and if explicit codec provided
// TODO(deephaven-core#4612): Handle if explicit codec provided
}
if (Vector.class.isAssignableFrom(columnType)) {
if (componentType == int.class) {
@@ -182,6 +187,13 @@ static <DATA_TYPE> TransferObject<?> create(
if (componentType == String.class) {
return new StringVectorTransfer(columnSource, tableRowSet, instructions.getTargetPageSize());
}
if (componentType == BigDecimal.class) {
final BigDecimalUtils.PrecisionAndScale precisionAndScale = TypeInfos.getPrecisionAndScale(
computedCache, columnName, tableRowSet, () -> columnSource);
final ObjectCodec<BigDecimal> codec = new BigDecimalParquetBytesCodec(
precisionAndScale.precision, precisionAndScale.scale);
return new CodecVectorTransfer<>(columnSource, codec, tableRowSet, instructions.getTargetPageSize());
}
if (componentType == BigInteger.class) {
return new CodecVectorTransfer<>(columnSource, new BigIntegerParquetBytesCodec(),
tableRowSet, instructions.getTargetPageSize());
@@ -198,7 +210,7 @@ static <DATA_TYPE> TransferObject<?> create(
if (componentType == LocalDateTime.class) {
return new LocalDateTimeVectorTransfer(columnSource, tableRowSet, instructions.getTargetPageSize());
}
// TODO(deephaven-core#4612): Handle vectors of BigDecimal and if explicit codec provided
// TODO(deephaven-core#4612): Handle if explicit codec provided
}

// Go with the default
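Taken together with the TypeInfos change above, these two new branches are what turn the old UnsupportedOperationException into a working write path. A minimal end-to-end sketch of what the PR enables (file path is illustrative; assumed to run in a Deephaven script session):

```java
import io.deephaven.engine.table.Table;
import io.deephaven.engine.util.TableTools;
import io.deephaven.parquet.table.ParquetTools;

final Table table = TableTools.emptyTable(10_000).update(
        "SomeBigDecimals = new java.math.BigDecimal[] {" +
                "i % 10 == 0 ? null : java.math.BigDecimal.valueOf(ii).stripTrailingZeros()}");

// Previously threw UnsupportedOperationException (see the removed test below); now writes the
// column with a precision and scale inferred by BigDecimalUtils over the whole table
ParquetTools.writeTable(table, "/tmp/bigDecimalArray.parquet");
final Table readBack = ParquetTools.readTable("/tmp/bigDecimalArray.parquet");
```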
@@ -1825,29 +1825,6 @@ private static Table arrayToVectorTable(final Table table) {
return arrayToVectorFormulas.isEmpty() ? table : table.updateView(arrayToVectorFormulas);
}

@Test
public void testBigDecimalArrayColumn() {
final Table bdArrayTable = TableTools.emptyTable(10000).select(Selectable.from(List.of(
"someBigDecimalArrayColumn = new java.math.BigDecimal[] {i % 10 == 0 ? null : " +
"java.math.BigDecimal.valueOf(ii).stripTrailingZeros()}")));
final File dest = new File(rootFile + File.separator + "testBigDecimalArrayColumn.parquet");
try {
ParquetTools.writeTable(bdArrayTable, dest.getPath());
fail("Expected exception because writing arrays of big decimal column types is not supported");
} catch (final RuntimeException e) {
assertTrue(e.getCause() instanceof UnsupportedOperationException);
}

// Convert array to vector table
final Table bdVectorTable = arrayToVectorTable(bdArrayTable);
try {
ParquetTools.writeTable(bdVectorTable, dest.getPath());
fail("Expected exception because writing vectors of big decimal column types is not supported");
} catch (final RuntimeException e) {
assertTrue(e.getCause() instanceof UnsupportedOperationException);
}
}

@Test
public void testArrayColumns() {
ArrayList<String> columns =
@@ -1862,6 +1839,7 @@ public void testArrayColumns() {
"someByteArrayColumn = new byte[] {i % 10 == 0 ? null : (byte)i}",
"someCharArrayColumn = new char[] {i % 10 == 0 ? null : (char)i}",
"someTimeArrayColumn = new Instant[] {i % 10 == 0 ? null : (Instant)DateTimeUtils.now() + i}",
"someBigDecimalArrayColumn = new java.math.BigDecimal[] {i % 10 == 0 ? null : java.math.BigDecimal.valueOf(ii).stripTrailingZeros()}",
"someBiArrayColumn = new java.math.BigInteger[] {i % 10 == 0 ? null : java.math.BigInteger.valueOf(i)}",
"someDateArrayColumn = new java.time.LocalDate[] {i % 10 == 0 ? null : java.time.LocalDate.ofEpochDay(i)}",
"someTimeArrayColumn = new java.time.LocalTime[] {i % 10 == 0 ? null : java.time.LocalTime.of(i%24, i%60, (i+10)%60)}",
5 changes: 4 additions & 1 deletion py/server/tests/test_parquet.py
@@ -230,6 +230,8 @@ def get_table_with_array_data(self):
"someCharArrayColumn = new char[] {i % 10 == 0 ? null : (char)i}",
"someTimeArrayColumn = new Instant[] {i % 10 == 0 ? null : (Instant)DateTimeUtils.now() + i}",
"someBiColumn = new java.math.BigInteger[] {i % 10 == 0 ? null : java.math.BigInteger.valueOf(i)}",
"someBdColumn = new java.math.BigDecimal[] {i % 10 == 0 ? null : " +
"java.math.BigDecimal.valueOf(ii).stripTrailingZeros()}",
"nullStringArrayColumn = new String[] {(String)null}",
"nullIntArrayColumn = new int[] {(int)null}",
"nullLongArrayColumn = new long[] {(long)null}",
@@ -240,7 +242,8 @@ def get_table_with_array_data(self):
"nullByteArrayColumn = new byte[] {(byte)null}",
"nullCharArrayColumn = new char[] {(char)null}",
"nullTimeArrayColumn = new Instant[] {(Instant)null}",
"nullBiColumn = new java.math.BigInteger[] {(java.math.BigInteger)null}"
"nullBiColumn = new java.math.BigInteger[] {(java.math.BigInteger)null}",
"nullBdColumn = new java.math.BigDecimal[] {(java.math.BigDecimal)null}"
])
return dh_table
