From fcef6180eeb8e7a57da298123d9f69253a0483d6 Mon Sep 17 00:00:00 2001 From: sperlingxx Date: Mon, 14 Mar 2022 13:37:21 +0800 Subject: [PATCH] JNI support of Collect in Reduction Signed-off-by: sperlingxx --- .../java/ai/rapids/cudf/ColumnVector.java | 11 + .../ai/rapids/cudf/ReductionAggregation.java | 65 ++- .../java/ai/rapids/cudf/ReductionTest.java | 470 +++++++++++++----- 3 files changed, 409 insertions(+), 137 deletions(-) diff --git a/java/src/main/java/ai/rapids/cudf/ColumnVector.java b/java/src/main/java/ai/rapids/cudf/ColumnVector.java index 11b654ccec6..aab8e7dd475 100644 --- a/java/src/main/java/ai/rapids/cudf/ColumnVector.java +++ b/java/src/main/java/ai/rapids/cudf/ColumnVector.java @@ -1206,6 +1206,17 @@ public static ColumnVector emptyStructs(HostColumnVector.DataType dataType, long } } + /** + * Create a new BOOL8 vector from the given primitive booleans; each value is staged as one byte (1 for true, 0 for false) before being appended. + */ + public static ColumnVector fromBooleans(boolean... values) { + byte[] bytes = new byte[values.length]; + for (int i = 0; i < values.length; i++) { + bytes[i] = values[i] ? (byte) 1 : (byte) 0; + } + return build(DType.BOOL8, values.length, (b) -> b.appendArray(bytes)); + } + /** * Create a new vector from the given values. */ diff --git a/java/src/main/java/ai/rapids/cudf/ReductionAggregation.java b/java/src/main/java/ai/rapids/cudf/ReductionAggregation.java index 7eff85dcd0d..e18098b7832 100644 --- a/java/src/main/java/ai/rapids/cudf/ReductionAggregation.java +++ b/java/src/main/java/ai/rapids/cudf/ReductionAggregation.java @@ -1,6 +1,6 @@ /* * - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -209,4 +209,67 @@ public static ReductionAggregation nth(int offset) { public static ReductionAggregation nth(int offset, NullPolicy nullPolicy) { return new ReductionAggregation(Aggregation.nth(offset, nullPolicy)); } + + /** + * Collect the values into a list. Null values are skipped. + */ + public static ReductionAggregation collectList() { + return new ReductionAggregation(Aggregation.collectList()); + } + + /** + * Collect the values into a list. + * + * @param nullPolicy Indicates whether nulls are included in or excluded from the collected list. + */ + public static ReductionAggregation collectList(NullPolicy nullPolicy) { + return new ReductionAggregation(Aggregation.collectList(nullPolicy)); + } + + /** + * Collect the distinct values into a set. All null values are excluded, and every NaN value is regarded as a + * unique instance. + */ + public static ReductionAggregation collectSet() { + return new ReductionAggregation(Aggregation.collectSet()); + } + + /** + * Collect the distinct values into a set. + * + * @param nullPolicy Indicates whether nulls are included in or excluded from the collected set. + * @param nullEquality Flag to specify whether null entries should be considered equal to each other. + * @param nanEquality Flag to specify whether NaN values in a floating point column should be considered equal. + */ + public static ReductionAggregation collectSet(NullPolicy nullPolicy, + NullEquality nullEquality, NaNEquality nanEquality) { + return new ReductionAggregation(Aggregation.collectSet(nullPolicy, nullEquality, nanEquality)); + } + + /** + * Merge the partial lists produced by multiple CollectListAggregations into a single list. + * NOTICE: The column of partial lists should NOT contain any null list (a null row), although the lists themselves may contain null entries. + */ + public static ReductionAggregation mergeLists() { + return new ReductionAggregation(Aggregation.mergeLists()); + } + + /** + * Merge the partial sets produced by multiple CollectSetAggregations. 
Each null/NaN value will be regarded as + * a unique instance. + */ + public static ReductionAggregation mergeSets() { + return new ReductionAggregation(Aggregation.mergeSets()); + } + + /** + * Merge the partial sets produced by multiple CollectSetAggregations. + * + * @param nullEquality Flag to specify whether null entries within the partial sets should be considered equal. + * @param nanEquality Flag to specify whether NaN values in a floating point column should be considered equal. + */ + public static ReductionAggregation mergeSets(NullEquality nullEquality, NaNEquality nanEquality) { + return new ReductionAggregation(Aggregation.mergeSets(nullEquality, nanEquality)); + } + } diff --git a/java/src/test/java/ai/rapids/cudf/ReductionTest.java b/java/src/test/java/ai/rapids/cudf/ReductionTest.java index 2b26597c8f7..3daa3db9218 100644 --- a/java/src/test/java/ai/rapids/cudf/ReductionTest.java +++ b/java/src/test/java/ai/rapids/cudf/ReductionTest.java @@ -1,6 +1,6 @@ /* * - * Copyright (c) 2019, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -17,12 +17,14 @@ */ package ai.rapids.cudf; +import com.google.common.collect.Lists; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import java.util.EnumSet; +import java.util.List; import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -43,12 +45,14 @@ class ReductionTest extends CudfTestBase { Aggregation.Kind.ANY, Aggregation.Kind.ALL); - private static Scalar buildExpectedScalar(ReductionAggregation op, DType baseType, Object expectedObject) { + private static Scalar buildExpectedScalar(ReductionAggregation op, + HostColumnVector.DataType dataType, Object expectedObject) { + if (expectedObject == null) { - return Scalar.fromNull(baseType); + return Scalar.fromNull(dataType.getType()); } if (FLOAT_REDUCTIONS.contains(op.getWrapped().kind)) { - if (baseType.equals(DType.FLOAT32)) { + if (dataType.getType().equals(DType.FLOAT32)) { return Scalar.fromFloat((Float) expectedObject); } return Scalar.fromDouble((Double) expectedObject); @@ -56,7 +60,7 @@ private static Scalar buildExpectedScalar(ReductionAggregation op, DType baseTyp if (BOOL_REDUCTIONS.contains(op.getWrapped().kind)) { return Scalar.fromBool((Boolean) expectedObject); } - switch (baseType.typeId) { + switch (dataType.getType().typeId) { case BOOL8: return Scalar.fromBool((Boolean) expectedObject); case INT8: @@ -77,177 +81,346 @@ private static Scalar buildExpectedScalar(ReductionAggregation op, DType baseTyp case TIMESTAMP_MILLISECONDS: case TIMESTAMP_MICROSECONDS: case TIMESTAMP_NANOSECONDS: - return Scalar.timestampFromLong(baseType, (Long) expectedObject); + return Scalar.timestampFromLong(dataType.getType(), (Long) expectedObject); case STRING: return Scalar.fromString((String) expectedObject); + case LIST: + HostColumnVector.DataType et = dataType.getChild(0); + ColumnVector col = null; + try { + switch 
(et.getType().typeId) { + case BOOL8: + col = et.isNullable() ? ColumnVector.fromBoxedBooleans((Boolean[]) expectedObject) : + ColumnVector.fromBooleans((boolean[]) expectedObject); + return Scalar.listFromColumnView(col); + case INT8: + col = et.isNullable() ? ColumnVector.fromBoxedBytes((Byte[]) expectedObject) : + ColumnVector.fromBytes((byte[]) expectedObject); + return Scalar.listFromColumnView(col); + case INT16: + col = et.isNullable() ? ColumnVector.fromBoxedShorts((Short[]) expectedObject) : + ColumnVector.fromShorts((short[]) expectedObject); + return Scalar.listFromColumnView(col); + case INT32: + col = et.isNullable() ? ColumnVector.fromBoxedInts((Integer[]) expectedObject) : + ColumnVector.fromInts((int[]) expectedObject); + return Scalar.listFromColumnView(col); + case INT64: + col = et.isNullable() ? ColumnVector.fromBoxedLongs((Long[]) expectedObject) : + ColumnVector.fromLongs((long[]) expectedObject); + return Scalar.listFromColumnView(col); + case FLOAT32: + col = et.isNullable() ? ColumnVector.fromBoxedFloats((Float[]) expectedObject) : + ColumnVector.fromFloats((float[]) expectedObject); + return Scalar.listFromColumnView(col); + case FLOAT64: + col = et.isNullable() ? 
ColumnVector.fromBoxedDoubles((Double[]) expectedObject) : + ColumnVector.fromDoubles((double[]) expectedObject); + return Scalar.listFromColumnView(col); + case STRING: + col = ColumnVector.fromStrings((String[]) expectedObject); + return Scalar.listFromColumnView(col); + default: + throw new IllegalArgumentException("Unexpected element type of List: " + et); + } + } finally { + if (col != null) { + col.close(); + } + } default: - throw new IllegalArgumentException("Unexpected type: " + baseType); + throw new IllegalArgumentException("Unexpected type: " + dataType); } } private static Stream createBooleanParams() { Boolean[] vals = new Boolean[]{true, true, null, false, true, false, null}; + HostColumnVector.DataType bool = new HostColumnVector.BasicType(true, DType.BOOL8); return Stream.of( - Arguments.of(ReductionAggregation.sum(), new Boolean[0], null, 0.), - Arguments.of(ReductionAggregation.sum(), new Boolean[]{null, null, null}, null, 0.), - Arguments.of(ReductionAggregation.sum(), vals, true, 0.), - Arguments.of(ReductionAggregation.min(), vals, false, 0.), - Arguments.of(ReductionAggregation.max(), vals, true, 0.), - Arguments.of(ReductionAggregation.product(), vals, false, 0.), - Arguments.of(ReductionAggregation.sumOfSquares(), vals, true, 0.), - Arguments.of(ReductionAggregation.mean(), vals, 0.6, DELTAD), - Arguments.of(ReductionAggregation.standardDeviation(), vals, 0.5477225575051662, DELTAD), - Arguments.of(ReductionAggregation.variance(), vals, 0.3, DELTAD), - Arguments.of(ReductionAggregation.any(), vals, true, 0.), - Arguments.of(ReductionAggregation.all(), vals, false, 0.) 
+ Arguments.of(ReductionAggregation.sum(), new Boolean[0], bool, null, 0.), + Arguments.of(ReductionAggregation.sum(), new Boolean[]{null, null, null}, bool, null, 0.), + Arguments.of(ReductionAggregation.sum(), vals, bool, true, 0.), + Arguments.of(ReductionAggregation.min(), vals, bool, false, 0.), + Arguments.of(ReductionAggregation.max(), vals, bool, true, 0.), + Arguments.of(ReductionAggregation.product(), vals, bool, false, 0.), + Arguments.of(ReductionAggregation.sumOfSquares(), vals, bool, true, 0.), + Arguments.of(ReductionAggregation.mean(), vals, bool, 0.6, DELTAD), + Arguments.of(ReductionAggregation.standardDeviation(), vals, bool, 0.5477225575051662, DELTAD), + Arguments.of(ReductionAggregation.variance(), vals, bool, 0.3, DELTAD), + Arguments.of(ReductionAggregation.any(), vals, bool, true, 0.), + Arguments.of(ReductionAggregation.all(), vals, bool, false, 0.) ); } private static Stream createByteParams() { Byte[] vals = new Byte[]{-1, 7, 123, null, 50, 60, 100}; + HostColumnVector.DataType int8 = new HostColumnVector.BasicType(true, DType.INT8); return Stream.of( - Arguments.of(ReductionAggregation.sum(), new Byte[0], null, 0.), - Arguments.of(ReductionAggregation.sum(), new Byte[]{null, null, null}, null, 0.), - Arguments.of(ReductionAggregation.sum(), vals, (byte) 83, 0.), - Arguments.of(ReductionAggregation.min(), vals, (byte) -1, 0.), - Arguments.of(ReductionAggregation.max(), vals, (byte) 123, 0.), - Arguments.of(ReductionAggregation.product(), vals, (byte) 160, 0.), - Arguments.of(ReductionAggregation.sumOfSquares(), vals, (byte) 47, 0.), - Arguments.of(ReductionAggregation.mean(), vals, 56.5, DELTAD), - Arguments.of(ReductionAggregation.standardDeviation(), vals, 49.24530434467839, DELTAD), - Arguments.of(ReductionAggregation.variance(), vals, 2425.1, DELTAD), - Arguments.of(ReductionAggregation.any(), vals, true, 0.), - Arguments.of(ReductionAggregation.all(), vals, true, 0.) 
+ Arguments.of(ReductionAggregation.sum(), new Byte[0], int8, null, 0.), + Arguments.of(ReductionAggregation.sum(), new Byte[]{null, null, null}, int8, null, 0.), + Arguments.of(ReductionAggregation.sum(), vals, int8, (byte) 83, 0.), + Arguments.of(ReductionAggregation.min(), vals, int8, (byte) -1, 0.), + Arguments.of(ReductionAggregation.max(), vals, int8, (byte) 123, 0.), + Arguments.of(ReductionAggregation.product(), vals, int8, (byte) 160, 0.), + Arguments.of(ReductionAggregation.sumOfSquares(), vals, int8, (byte) 47, 0.), + Arguments.of(ReductionAggregation.mean(), vals, int8, 56.5, DELTAD), + Arguments.of(ReductionAggregation.standardDeviation(), vals, int8, 49.24530434467839, DELTAD), + Arguments.of(ReductionAggregation.variance(), vals, int8, 2425.1, DELTAD), + Arguments.of(ReductionAggregation.any(), vals, int8, true, 0.), + Arguments.of(ReductionAggregation.all(), vals, int8, true, 0.) ); } private static Stream createShortParams() { Short[] vals = new Short[]{-1, 7, 123, null, 50, 60, 100}; + HostColumnVector.DataType int16 = new HostColumnVector.BasicType(true, DType.INT16); return Stream.of( - Arguments.of(ReductionAggregation.sum(), new Short[0], null, 0.), - Arguments.of(ReductionAggregation.sum(), new Short[]{null, null, null}, null, 0.), - Arguments.of(ReductionAggregation.sum(), vals, (short) 339, 0.), - Arguments.of(ReductionAggregation.min(), vals, (short) -1, 0.), - Arguments.of(ReductionAggregation.max(), vals, (short) 123, 0.), - Arguments.of(ReductionAggregation.product(), vals, (short) -22624, 0.), - Arguments.of(ReductionAggregation.sumOfSquares(), vals, (short) 31279, 0.), - Arguments.of(ReductionAggregation.mean(), vals, 56.5, DELTAD), - Arguments.of(ReductionAggregation.standardDeviation(), vals, 49.24530434467839, DELTAD), - Arguments.of(ReductionAggregation.variance(), vals, 2425.1, DELTAD), - Arguments.of(ReductionAggregation.any(), vals, true, 0.), - Arguments.of(ReductionAggregation.all(), vals, true, 0.) 
+ Arguments.of(ReductionAggregation.sum(), new Short[0], int16, null, 0.), + Arguments.of(ReductionAggregation.sum(), new Short[]{null, null, null}, int16, null, 0.), + Arguments.of(ReductionAggregation.sum(), vals, int16, (short) 339, 0.), + Arguments.of(ReductionAggregation.min(), vals, int16, (short) -1, 0.), + Arguments.of(ReductionAggregation.max(), vals, int16, (short) 123, 0.), + Arguments.of(ReductionAggregation.product(), vals, int16, (short) -22624, 0.), + Arguments.of(ReductionAggregation.sumOfSquares(), vals, int16, (short) 31279, 0.), + Arguments.of(ReductionAggregation.mean(), vals, int16, 56.5, DELTAD), + Arguments.of(ReductionAggregation.standardDeviation(), vals, int16, 49.24530434467839, DELTAD), + Arguments.of(ReductionAggregation.variance(), vals, int16, 2425.1, DELTAD), + Arguments.of(ReductionAggregation.any(), vals, int16, true, 0.), + Arguments.of(ReductionAggregation.all(), vals, int16, true, 0.) ); } private static Stream createIntParams() { Integer[] vals = new Integer[]{-1, 7, 123, null, 50, 60, 100}; + HostColumnVector.BasicType int32 = new HostColumnVector.BasicType(true, DType.INT32); return Stream.of( - Arguments.of(ReductionAggregation.sum(), new Integer[0], null, 0.), - Arguments.of(ReductionAggregation.sum(), new Integer[]{null, null, null}, null, 0.), - Arguments.of(ReductionAggregation.sum(), vals, 339, 0.), - Arguments.of(ReductionAggregation.min(), vals, -1, 0.), - Arguments.of(ReductionAggregation.max(), vals, 123, 0.), - Arguments.of(ReductionAggregation.product(), vals, -258300000, 0.), - Arguments.of(ReductionAggregation.sumOfSquares(), vals, 31279, 0.), - Arguments.of(ReductionAggregation.mean(), vals, 56.5, DELTAD), - Arguments.of(ReductionAggregation.standardDeviation(), vals, 49.24530434467839, DELTAD), - Arguments.of(ReductionAggregation.variance(), vals, 2425.1, DELTAD), - Arguments.of(ReductionAggregation.any(), vals, true, 0.), - Arguments.of(ReductionAggregation.all(), vals, true, 0.) 
+ Arguments.of(ReductionAggregation.sum(), new Integer[0], int32, null, 0.), + Arguments.of(ReductionAggregation.sum(), new Integer[]{null, null, null}, int32, null, 0.), + Arguments.of(ReductionAggregation.sum(), vals, int32, 339, 0.), + Arguments.of(ReductionAggregation.min(), vals, int32, -1, 0.), + Arguments.of(ReductionAggregation.max(), vals, int32, 123, 0.), + Arguments.of(ReductionAggregation.product(), vals, int32, -258300000, 0.), + Arguments.of(ReductionAggregation.sumOfSquares(), vals, int32, 31279, 0.), + Arguments.of(ReductionAggregation.mean(), vals, int32, 56.5, DELTAD), + Arguments.of(ReductionAggregation.standardDeviation(), vals, int32, 49.24530434467839, DELTAD), + Arguments.of(ReductionAggregation.variance(), vals, int32, 2425.1, DELTAD), + Arguments.of(ReductionAggregation.any(), vals, int32, true, 0.), + Arguments.of(ReductionAggregation.all(), vals, int32, true, 0.) ); } private static Stream createLongParams() { Long[] vals = new Long[]{-1L, 7L, 123L, null, 50L, 60L, 100L}; + HostColumnVector.BasicType int64 = new HostColumnVector.BasicType(true, DType.INT64); return Stream.of( - Arguments.of(ReductionAggregation.sum(), new Long[0], null, 0.), - Arguments.of(ReductionAggregation.sum(), new Long[]{null, null, null}, null, 0.), - Arguments.of(ReductionAggregation.sum(), vals, 339L, 0.), - Arguments.of(ReductionAggregation.min(), vals, -1L, 0.), - Arguments.of(ReductionAggregation.max(), vals, 123L, 0.), - Arguments.of(ReductionAggregation.product(), vals, -258300000L, 0.), - Arguments.of(ReductionAggregation.sumOfSquares(), vals, 31279L, 0.), - Arguments.of(ReductionAggregation.mean(), vals, 56.5, DELTAD), - Arguments.of(ReductionAggregation.standardDeviation(), vals, 49.24530434467839, DELTAD), - Arguments.of(ReductionAggregation.variance(), vals, 2425.1, DELTAD), - Arguments.of(ReductionAggregation.any(), vals, true, 0.), - Arguments.of(ReductionAggregation.all(), vals, true, 0.), - Arguments.of(ReductionAggregation.quantile(0.5), vals, 
55.0, DELTAD), - Arguments.of(ReductionAggregation.quantile(0.9), vals, 111.5, DELTAD) + Arguments.of(ReductionAggregation.sum(), new Long[0], int64, null, 0.), + Arguments.of(ReductionAggregation.sum(), new Long[]{null, null, null}, int64, null, 0.), + Arguments.of(ReductionAggregation.sum(), vals, int64, 339L, 0.), + Arguments.of(ReductionAggregation.min(), vals, int64, -1L, 0.), + Arguments.of(ReductionAggregation.max(), vals, int64, 123L, 0.), + Arguments.of(ReductionAggregation.product(), vals, int64, -258300000L, 0.), + Arguments.of(ReductionAggregation.sumOfSquares(), vals, int64, 31279L, 0.), + Arguments.of(ReductionAggregation.mean(), vals, int64, 56.5, DELTAD), + Arguments.of(ReductionAggregation.standardDeviation(), vals, int64, 49.24530434467839, DELTAD), + Arguments.of(ReductionAggregation.variance(), vals, int64, 2425.1, DELTAD), + Arguments.of(ReductionAggregation.any(), vals, int64, true, 0.), + Arguments.of(ReductionAggregation.all(), vals, int64, true, 0.), + Arguments.of(ReductionAggregation.quantile(0.5), vals, int64, 55.0, DELTAD), + Arguments.of(ReductionAggregation.quantile(0.9), vals, int64, 111.5, DELTAD) ); } private static Stream createFloatParams() { Float[] vals = new Float[]{-1f, 7f, 123f, null, 50f, 60f, 100f}; + Float[] notNulls = new Float[]{-1f, 7f, 123f, 50f, 60f, 100f}; + Float[] repeats = new Float[]{Float.MIN_VALUE, 7f, 7f, null, null, Float.NaN, Float.NaN, 50f, 50f, 100f}; + HostColumnVector.BasicType fp32 = new HostColumnVector.BasicType(true, DType.FLOAT32); + HostColumnVector.DataType listOfFloat = new HostColumnVector.ListType( + true, new HostColumnVector.BasicType(true, DType.FLOAT32)); return Stream.of( - Arguments.of(ReductionAggregation.sum(), new Float[0], null, 0f), - Arguments.of(ReductionAggregation.sum(), new Float[]{null, null, null}, null, 0f), - Arguments.of(ReductionAggregation.sum(), vals, 339f, 0f), - Arguments.of(ReductionAggregation.min(), vals, -1f, 0f), - Arguments.of(ReductionAggregation.max(), vals, 
123f, 0f), - Arguments.of(ReductionAggregation.product(), vals, -258300000f, 0f), - Arguments.of(ReductionAggregation.sumOfSquares(), vals, 31279f, 0f), - Arguments.of(ReductionAggregation.mean(), vals, 56.5f, DELTAF), - Arguments.of(ReductionAggregation.standardDeviation(), vals, 49.24530434467839f, DELTAF), - Arguments.of(ReductionAggregation.variance(), vals, 2425.1f, DELTAF), - Arguments.of(ReductionAggregation.any(), vals, true, 0f), - Arguments.of(ReductionAggregation.all(), vals, true, 0f) + Arguments.of(ReductionAggregation.sum(), new Float[0], fp32, null, 0f), + Arguments.of(ReductionAggregation.sum(), new Float[]{null, null, null}, fp32, null, 0f), + Arguments.of(ReductionAggregation.sum(), vals, fp32, 339f, 0f), + Arguments.of(ReductionAggregation.min(), vals, fp32, -1f, 0f), + Arguments.of(ReductionAggregation.max(), vals, fp32, 123f, 0f), + Arguments.of(ReductionAggregation.product(), vals, fp32, -258300000f, 0f), + Arguments.of(ReductionAggregation.sumOfSquares(), vals, fp32, 31279f, 0f), + Arguments.of(ReductionAggregation.mean(), vals, fp32, 56.5f, DELTAF), + Arguments.of(ReductionAggregation.standardDeviation(), vals, fp32, 49.24530434467839f, DELTAF), + Arguments.of(ReductionAggregation.variance(), vals, fp32, 2425.1f, DELTAF), + Arguments.of(ReductionAggregation.any(), vals, fp32, true, 0f), + Arguments.of(ReductionAggregation.all(), vals, fp32, true, 0f), + Arguments.of(ReductionAggregation.collectList(NullPolicy.INCLUDE), vals, listOfFloat, vals, 0f), + Arguments.of(ReductionAggregation.collectList(), vals, listOfFloat, notNulls, 0f), + Arguments.of(ReductionAggregation.collectSet( + NullPolicy.EXCLUDE, NullEquality.EQUAL, NaNEquality.ALL_EQUAL), + repeats, listOfFloat, + new Float[]{Float.MIN_VALUE, 7f, 50f, 100f, Float.NaN}, 0f), + Arguments.of(ReductionAggregation.collectSet( + NullPolicy.INCLUDE, NullEquality.EQUAL, NaNEquality.ALL_EQUAL), + repeats, listOfFloat, + new Float[]{Float.MIN_VALUE, 7f, 50f, 100f, Float.NaN, null}, 0f), + 
Arguments.of(ReductionAggregation.collectSet( + NullPolicy.INCLUDE, NullEquality.UNEQUAL, NaNEquality.ALL_EQUAL), + repeats, listOfFloat, + new Float[]{Float.MIN_VALUE, 7f, 50f, 100f, Float.NaN, null, null}, 0f), + Arguments.of(ReductionAggregation.collectSet( + NullPolicy.INCLUDE, NullEquality.EQUAL, NaNEquality.UNEQUAL), + repeats, listOfFloat, + new Float[]{Float.MIN_VALUE, 7f, 50f, 100f, Float.NaN, Float.NaN, null}, 0f), + Arguments.of(ReductionAggregation.collectSet( + NullPolicy.INCLUDE, NullEquality.UNEQUAL, NaNEquality.UNEQUAL), + repeats, listOfFloat, + new Float[]{Float.MIN_VALUE, 7f, 50f, 100f, Float.NaN, Float.NaN, null, null}, 0f), + Arguments.of(ReductionAggregation.collectSet(), + repeats, listOfFloat, + new Float[]{Float.MIN_VALUE, 7f, 50f, 100f, Float.NaN, Float.NaN}, 0f) ); } private static Stream createDoubleParams() { Double[] vals = new Double[]{-1., 7., 123., null, 50., 60., 100.}; + Double[] notNulls = new Double[]{-1., 7., 123., 50., 60., 100.}; + Double[] repeats = new Double[]{Double.MIN_VALUE, 7., 7., null, null, Double.NaN, Double.NaN, 50., 50., 100.}; + HostColumnVector.BasicType fp64 = new HostColumnVector.BasicType(true, DType.FLOAT64); + HostColumnVector.DataType listOfDouble = new HostColumnVector.ListType( + true, new HostColumnVector.BasicType(true, DType.FLOAT64)); return Stream.of( - Arguments.of(ReductionAggregation.sum(), new Double[0], null, 0.), - Arguments.of(ReductionAggregation.sum(), new Double[]{null, null, null}, null, 0.), - Arguments.of(ReductionAggregation.sum(), vals, 339., 0.), - Arguments.of(ReductionAggregation.min(), vals, -1., 0.), - Arguments.of(ReductionAggregation.max(), vals, 123., 0.), - Arguments.of(ReductionAggregation.product(), vals, -258300000., 0.), - Arguments.of(ReductionAggregation.sumOfSquares(), vals, 31279., 0.), - Arguments.of(ReductionAggregation.mean(), vals, 56.5, DELTAD), - Arguments.of(ReductionAggregation.standardDeviation(), vals, 49.24530434467839, DELTAD), - 
Arguments.of(ReductionAggregation.variance(), vals, 2425.1, DELTAD), - Arguments.of(ReductionAggregation.any(), vals, true, 0.), - Arguments.of(ReductionAggregation.all(), vals, true, 0.), - Arguments.of(ReductionAggregation.quantile(0.5), vals, 55.0, DELTAD), - Arguments.of(ReductionAggregation.quantile(0.9), vals, 111.5, DELTAD) + Arguments.of(ReductionAggregation.sum(), new Double[0], fp64, null, 0.), + Arguments.of(ReductionAggregation.sum(), new Double[]{null, null, null}, fp64, null, 0.), + Arguments.of(ReductionAggregation.sum(), vals, fp64, 339., 0.), + Arguments.of(ReductionAggregation.min(), vals, fp64, -1., 0.), + Arguments.of(ReductionAggregation.max(), vals, fp64, 123., 0.), + Arguments.of(ReductionAggregation.product(), vals, fp64, -258300000., 0.), + Arguments.of(ReductionAggregation.sumOfSquares(), vals, fp64, 31279., 0.), + Arguments.of(ReductionAggregation.mean(), vals, fp64, 56.5, DELTAD), + Arguments.of(ReductionAggregation.standardDeviation(), vals, fp64, 49.24530434467839, DELTAD), + Arguments.of(ReductionAggregation.variance(), vals, fp64, 2425.1, DELTAD), + Arguments.of(ReductionAggregation.any(), vals, fp64, true, 0.), + Arguments.of(ReductionAggregation.all(), vals, fp64, true, 0.), + Arguments.of(ReductionAggregation.quantile(0.5), vals, fp64, 55.0, DELTAD), + Arguments.of(ReductionAggregation.quantile(0.9), vals, fp64, 111.5, DELTAD), + Arguments.of(ReductionAggregation.collectList(NullPolicy.INCLUDE), vals, listOfDouble, vals, 0.), + Arguments.of(ReductionAggregation.collectList(NullPolicy.EXCLUDE), vals, listOfDouble, notNulls, 0.), + Arguments.of(ReductionAggregation.collectSet( + NullPolicy.EXCLUDE, NullEquality.EQUAL, NaNEquality.ALL_EQUAL), + repeats, listOfDouble, + new Double[]{Double.MIN_VALUE, 7., 50., 100., Double.NaN}, 0.), + Arguments.of(ReductionAggregation.collectSet( + NullPolicy.INCLUDE, NullEquality.EQUAL, NaNEquality.ALL_EQUAL), + repeats, listOfDouble, + new Double[]{Double.MIN_VALUE, 7., 50., 100., Double.NaN, null}, 
0.), + Arguments.of(ReductionAggregation.collectSet( + NullPolicy.INCLUDE, NullEquality.UNEQUAL, NaNEquality.ALL_EQUAL), + repeats, listOfDouble, + new Double[]{Double.MIN_VALUE, 7., 50., 100., Double.NaN, null, null}, 0.), + Arguments.of(ReductionAggregation.collectSet( + NullPolicy.INCLUDE, NullEquality.EQUAL, NaNEquality.UNEQUAL), + repeats, listOfDouble, + new Double[]{Double.MIN_VALUE, 7., 50., 100., Double.NaN, Double.NaN, null}, 0.), + Arguments.of(ReductionAggregation.collectSet( + NullPolicy.INCLUDE, NullEquality.UNEQUAL, NaNEquality.UNEQUAL), + repeats, listOfDouble, + new Double[]{Double.MIN_VALUE, 7., 50., 100., Double.NaN, Double.NaN, null, null}, 0.), + Arguments.of(ReductionAggregation.collectSet(), + repeats, listOfDouble, + new Double[]{Double.MIN_VALUE, 7., 50., 100., Double.NaN, Double.NaN}, 0.) ); } private static Stream createTimestampDaysParams() { Integer[] vals = new Integer[]{-1, 7, 123, null, 50, 60, 100}; + HostColumnVector.BasicType tsDay = new HostColumnVector.BasicType(true, DType.TIMESTAMP_DAYS); return Stream.of( - Arguments.of(ReductionAggregation.max(), new Integer[0], null), - Arguments.of(ReductionAggregation.max(), new Integer[]{null, null, null}, null), - Arguments.of(ReductionAggregation.max(), vals, 123), - Arguments.of(ReductionAggregation.min(), vals, -1) + Arguments.of(ReductionAggregation.max(), new Integer[0], tsDay, null), + Arguments.of(ReductionAggregation.max(), new Integer[]{null, null, null}, tsDay, null), + Arguments.of(ReductionAggregation.max(), vals, tsDay, 123), + Arguments.of(ReductionAggregation.min(), vals, tsDay, -1) ); } - private static Stream createTimestampResolutionParams() { + private static Stream createTimestampResolutionParams(HostColumnVector.BasicType tpe) { Long[] vals = new Long[]{-1L, 7L, 123L, null, 50L, 60L, 100L}; return Stream.of( - Arguments.of(ReductionAggregation.max(), new Long[0], null), - Arguments.of(ReductionAggregation.max(), new Long[]{null, null, null}, null), - 
Arguments.of(ReductionAggregation.min(), vals, -1L), - Arguments.of(ReductionAggregation.max(), vals, 123L) + Arguments.of(ReductionAggregation.max(), new Long[0], tpe, null), + Arguments.of(ReductionAggregation.max(), new Long[]{null, null, null}, tpe, null), + Arguments.of(ReductionAggregation.min(), vals, tpe, -1L), + Arguments.of(ReductionAggregation.max(), vals, tpe, 123L) + ); + } + + private static Stream createTimestampSecondsParams() { + return createTimestampResolutionParams( + new HostColumnVector.BasicType(true, DType.TIMESTAMP_SECONDS)); + } + + private static Stream createTimestampMilliSecondsParams() { + return createTimestampResolutionParams( + new HostColumnVector.BasicType(true, DType.TIMESTAMP_MILLISECONDS)); + } + + private static Stream createTimestampMicroSecondsParams() { + return createTimestampResolutionParams( + new HostColumnVector.BasicType(true, DType.TIMESTAMP_MICROSECONDS)); + } + + private static Stream createTimestampNanoSecondsParams() { + return createTimestampResolutionParams( + new HostColumnVector.BasicType(true, DType.TIMESTAMP_NANOSECONDS)); + } + + private static Stream createFloatArrayParams() { + List[] inputs = new List[]{ + Lists.newArrayList(-1f, 7f, null), + Lists.newArrayList(7f, 50f, 60f, Float.NaN), + Lists.newArrayList(), + Lists.newArrayList(60f, 100f, Float.NaN, null) + }; + HostColumnVector.DataType fpList = new HostColumnVector.ListType( + true, new HostColumnVector.BasicType(true, DType.FLOAT32)); + return Stream.of( + Arguments.of(ReductionAggregation.mergeLists(), inputs, fpList, + new Float[]{-1f, 7f, null, + 7f, 50f, 60f, Float.NaN, + 60f, 100f, Float.NaN, null}, 0f), + Arguments.of(ReductionAggregation.mergeSets(NullEquality.EQUAL, NaNEquality.ALL_EQUAL), + inputs, fpList, + new Float[]{-1f, 7f, 50f, 60f, 100f, Float.NaN, null}, 0f), + Arguments.of(ReductionAggregation.mergeSets(NullEquality.UNEQUAL, NaNEquality.ALL_EQUAL), + inputs, fpList, + new Float[]{-1f, 7f, 50f, 60f, 100f, Float.NaN, null, null}, 
0f), + Arguments.of(ReductionAggregation.mergeSets(NullEquality.EQUAL, NaNEquality.UNEQUAL), + inputs, fpList, + new Float[]{-1f, 7f, 50f, 60f, 100f, Float.NaN, Float.NaN, null}, 0f), + Arguments.of(ReductionAggregation.mergeSets(), + inputs, fpList, + new Float[]{-1f, 7f, 50f, 60f, 100f, Float.NaN, Float.NaN, null, null}, 0f) ); } private static void assertEqualsDelta(ReductionAggregation op, Scalar expected, Scalar result, - Double percentage) { + Double percentage) { if (FLOAT_REDUCTIONS.contains(op.getWrapped().kind)) { assertEqualsWithinPercentage(expected.getDouble(), result.getDouble(), percentage); + } else if (expected.getType().typeId == DType.DTypeEnum.LIST) { + try (ColumnView e = expected.getListAsColumnView(); + ColumnView r = result.getListAsColumnView()) { + AssertUtils.assertColumnsAreEqual(e, r); + } } else { assertEquals(expected, result); } } private static void assertEqualsDelta(ReductionAggregation op, Scalar expected, Scalar result, - Float percentage) { + Float percentage) { if (FLOAT_REDUCTIONS.contains(op.getWrapped().kind)) { assertEqualsWithinPercentage(expected.getFloat(), result.getFloat(), percentage); + } else if (expected.getType().typeId == DType.DTypeEnum.LIST) { + try (ColumnView e = expected.getListAsColumnView(); + ColumnView r = result.getListAsColumnView()) { + AssertUtils.assertColumnsAreEqual(e, r); + } } else { assertEquals(expected, result); } @@ -255,8 +428,9 @@ private static void assertEqualsDelta(ReductionAggregation op, Scalar expected, @ParameterizedTest @MethodSource("createBooleanParams") - void testBoolean(ReductionAggregation op, Boolean[] values, Object expectedObject, Double delta) { - try (Scalar expected = buildExpectedScalar(op, DType.BOOL8, expectedObject); + void testBoolean(ReductionAggregation op, Boolean[] values, + HostColumnVector.DataType tpe, Object expectedObject, Double delta) { + try (Scalar expected = buildExpectedScalar(op, tpe, expectedObject); ColumnVector v = 
ColumnVector.fromBoxedBooleans(values); Scalar result = v.reduce(op, expected.getType())) { assertEqualsDelta(op, expected, result, delta); @@ -265,8 +439,9 @@ void testBoolean(ReductionAggregation op, Boolean[] values, Object expectedObjec @ParameterizedTest @MethodSource("createByteParams") - void testByte(ReductionAggregation op, Byte[] values, Object expectedObject, Double delta) { - try (Scalar expected = buildExpectedScalar(op, DType.INT8, expectedObject); + void testByte(ReductionAggregation op, Byte[] values, + HostColumnVector.DataType tpe, Object expectedObject, Double delta) { + try (Scalar expected = buildExpectedScalar(op, tpe, expectedObject); ColumnVector v = ColumnVector.fromBoxedBytes(values); Scalar result = v.reduce(op, expected.getType())) { assertEqualsDelta(op, expected, result, delta); @@ -275,8 +450,9 @@ void testByte(ReductionAggregation op, Byte[] values, Object expectedObject, Dou @ParameterizedTest @MethodSource("createShortParams") - void testShort(ReductionAggregation op, Short[] values, Object expectedObject, Double delta) { - try (Scalar expected = buildExpectedScalar(op, DType.INT16, expectedObject); + void testShort(ReductionAggregation op, Short[] values, + HostColumnVector.DataType tpe, Object expectedObject, Double delta) { + try (Scalar expected = buildExpectedScalar(op, tpe, expectedObject); ColumnVector v = ColumnVector.fromBoxedShorts(values); Scalar result = v.reduce(op, expected.getType())) { assertEqualsDelta(op, expected, result, delta); @@ -285,8 +461,9 @@ void testShort(ReductionAggregation op, Short[] values, Object expectedObject, D @ParameterizedTest @MethodSource("createIntParams") - void testInt(ReductionAggregation op, Integer[] values, Object expectedObject, Double delta) { - try (Scalar expected = buildExpectedScalar(op, DType.INT32, expectedObject); + void testInt(ReductionAggregation op, Integer[] values, + HostColumnVector.DataType tpe, Object expectedObject, Double delta) { + try (Scalar expected = 
buildExpectedScalar(op, tpe, expectedObject); ColumnVector v = ColumnVector.fromBoxedInts(values); Scalar result = v.reduce(op, expected.getType())) { assertEqualsDelta(op, expected, result, delta); @@ -295,8 +472,9 @@ void testInt(ReductionAggregation op, Integer[] values, Object expectedObject, D @ParameterizedTest @MethodSource("createLongParams") - void testLong(ReductionAggregation op, Long[] values, Object expectedObject, Double delta) { - try (Scalar expected = buildExpectedScalar(op, DType.INT64, expectedObject); + void testLong(ReductionAggregation op, Long[] values, + HostColumnVector.DataType tpe, Object expectedObject, Double delta) { + try (Scalar expected = buildExpectedScalar(op, tpe, expectedObject); ColumnVector v = ColumnVector.fromBoxedLongs(values); Scalar result = v.reduce(op, expected.getType())) { assertEqualsDelta(op, expected, result, delta); @@ -305,8 +483,9 @@ void testLong(ReductionAggregation op, Long[] values, Object expectedObject, Dou @ParameterizedTest @MethodSource("createFloatParams") - void testFloat(ReductionAggregation op, Float[] values, Object expectedObject, Float delta) { - try (Scalar expected = buildExpectedScalar(op, DType.FLOAT32, expectedObject); + void testFloat(ReductionAggregation op, Float[] values, + HostColumnVector.DataType tpe, Object expectedObject, Float delta) { + try (Scalar expected = buildExpectedScalar(op, tpe, expectedObject); ColumnVector v = ColumnVector.fromBoxedFloats(values); Scalar result = v.reduce(op, expected.getType())) { assertEqualsDelta(op, expected, result, delta); @@ -315,8 +494,9 @@ void testFloat(ReductionAggregation op, Float[] values, Object expectedObject, F @ParameterizedTest @MethodSource("createDoubleParams") - void testDouble(ReductionAggregation op, Double[] values, Object expectedObject, Double delta) { - try (Scalar expected = buildExpectedScalar(op, DType.FLOAT64, expectedObject); + void testDouble(ReductionAggregation op, Double[] values, + HostColumnVector.DataType tpe, 
Object expectedObject, Double delta) { + try (Scalar expected = buildExpectedScalar(op, tpe, expectedObject); ColumnVector v = ColumnVector.fromBoxedDoubles(values); Scalar result = v.reduce(op, expected.getType())) { assertEqualsDelta(op, expected, result, delta); @@ -325,8 +505,9 @@ void testDouble(ReductionAggregation op, Double[] values, Object expectedObject, @ParameterizedTest @MethodSource("createTimestampDaysParams") - void testTimestampDays(ReductionAggregation op, Integer[] values, Object expectedObject) { - try (Scalar expected = buildExpectedScalar(op, DType.TIMESTAMP_DAYS, expectedObject); + void testTimestampDays(ReductionAggregation op, Integer[] values, + HostColumnVector.DataType tpe, Object expectedObject) { + try (Scalar expected = buildExpectedScalar(op, tpe, expectedObject); ColumnVector v = ColumnVector.timestampDaysFromBoxedInts(values); Scalar result = v.reduce(op, expected.getType())) { assertEquals(expected, result); @@ -334,9 +515,10 @@ void testTimestampDays(ReductionAggregation op, Integer[] values, Object expecte } @ParameterizedTest - @MethodSource("createTimestampResolutionParams") - void testTimestampSeconds(ReductionAggregation op, Long[] values, Object expectedObject) { - try (Scalar expected = buildExpectedScalar(op, DType.TIMESTAMP_SECONDS, expectedObject); + @MethodSource("createTimestampSecondsParams") + void testTimestampSeconds(ReductionAggregation op, Long[] values, + HostColumnVector.DataType tpe, Object expectedObject) { + try (Scalar expected = buildExpectedScalar(op, tpe, expectedObject); ColumnVector v = ColumnVector.timestampSecondsFromBoxedLongs(values); Scalar result = v.reduce(op, expected.getType())) { assertEquals(expected, result); @@ -344,9 +526,10 @@ void testTimestampSeconds(ReductionAggregation op, Long[] values, Object expecte } @ParameterizedTest - @MethodSource("createTimestampResolutionParams") - void testTimestampMilliseconds(ReductionAggregation op, Long[] values, Object expectedObject) { - try (Scalar 
expected = buildExpectedScalar(op, DType.TIMESTAMP_MILLISECONDS, expectedObject); + @MethodSource("createTimestampMilliSecondsParams") + void testTimestampMilliseconds(ReductionAggregation op, Long[] values, + HostColumnVector.DataType tpe, Object expectedObject) { + try (Scalar expected = buildExpectedScalar(op, tpe, expectedObject); ColumnVector v = ColumnVector.timestampMilliSecondsFromBoxedLongs(values); Scalar result = v.reduce(op, expected.getType())) { assertEquals(expected, result); @@ -354,9 +537,10 @@ void testTimestampMilliseconds(ReductionAggregation op, Long[] values, Object ex } @ParameterizedTest - @MethodSource("createTimestampResolutionParams") - void testTimestampMicroseconds(ReductionAggregation op, Long[] values, Object expectedObject) { - try (Scalar expected = buildExpectedScalar(op, DType.TIMESTAMP_MICROSECONDS, expectedObject); + @MethodSource("createTimestampMicroSecondsParams") + void testTimestampMicroseconds(ReductionAggregation op, Long[] values, + HostColumnVector.DataType tpe, Object expectedObject) { + try (Scalar expected = buildExpectedScalar(op, tpe, expectedObject); ColumnVector v = ColumnVector.timestampMicroSecondsFromBoxedLongs(values); Scalar result = v.reduce(op, expected.getType())) { assertEquals(expected, result); @@ -364,15 +548,29 @@ void testTimestampMicroseconds(ReductionAggregation op, Long[] values, Object ex } @ParameterizedTest - @MethodSource("createTimestampResolutionParams") - void testTimestampNanoseconds(ReductionAggregation op, Long[] values, Object expectedObject) { - try (Scalar expected = buildExpectedScalar(op, DType.TIMESTAMP_NANOSECONDS, expectedObject); + @MethodSource("createTimestampNanoSecondsParams") + void testTimestampNanoseconds(ReductionAggregation op, Long[] values, + HostColumnVector.DataType tpe, Object expectedObject) { + try (Scalar expected = buildExpectedScalar(op, tpe, expectedObject); ColumnVector v = ColumnVector.timestampNanoSecondsFromBoxedLongs(values); Scalar result = 
v.reduce(op, expected.getType())) { assertEquals(expected, result); } } + @ParameterizedTest + @MethodSource("createFloatArrayParams") + void testFloatArray(ReductionAggregation op, List<Float>[] values, + HostColumnVector.DataType tpe, Object expectedObject, Float delta) { + HostColumnVector.DataType listType = new HostColumnVector.ListType( + true, new HostColumnVector.BasicType(true, DType.FLOAT32)); + try (Scalar expected = buildExpectedScalar(op, tpe, expectedObject); + ColumnVector v = ColumnVector.fromLists(listType, values); + Scalar result = v.reduce(op, expected.getType())) { + assertEqualsDelta(op, expected, result, delta); + } + } + @Test void testWithSetOutputType() { try (Scalar expected = Scalar.fromLong(1 * 2 * 3 * 4L); @@ -387,13 +585,13 @@ void testWithSetOutputType() { assertEquals(expected, result); } - try (Scalar expected = Scalar.fromLong((1*1L) + (2*2L) + (3*3L) + (4*4L)); + try (Scalar expected = Scalar.fromLong((1 * 1L) + (2 * 2L) + (3 * 3L) + (4 * 4L)); ColumnVector cv = ColumnVector.fromBytes(new byte[]{1, 2, 3, 4}); Scalar result = cv.sumOfSquares(DType.INT64)) { assertEquals(expected, result); } - try (Scalar expected = Scalar.fromFloat((1 + 2 + 3 + 4f)/4); + try (Scalar expected = Scalar.fromFloat((1 + 2 + 3 + 4f) / 4); ColumnVector cv = ColumnVector.fromBytes(new byte[]{1, 2, 3, 4}); Scalar result = cv.mean(DType.FLOAT32)) { assertEquals(expected, result);