diff --git a/pkg/sql/logictest/testdata/logic_test/aggregate b/pkg/sql/logictest/testdata/logic_test/aggregate index 9b2a879b9840..0ac8d0cfdb96 100644 --- a/pkg/sql/logictest/testdata/logic_test/aggregate +++ b/pkg/sql/logictest/testdata/logic_test/aggregate @@ -10,10 +10,10 @@ CREATE TABLE kv ( ) # Aggregate functions return NULL if there are no rows. -query IIIIRRRRRRRRBBTII -SELECT min(1), max(1), count(1), sum_int(1), avg(1), sum(1), stddev(1), stddev_samp(1), stddev_pop(1), var_samp(1), variance(1), var_pop(1), bool_and(true), bool_and(false), xor_agg(b'\x01'), bit_and(1), bit_or(1) FROM kv +query IIIIRRRRRRRRBBTIIR +SELECT min(1), max(1), count(1), sum_int(1), avg(1), sum(1), stddev(1), stddev_samp(1), stddev_pop(1), var_samp(1), variance(1), var_pop(1), bool_and(true), bool_and(false), xor_agg(b'\x01'), bit_and(1), bit_or(1), corr(1, 1) FROM kv ---- -NULL NULL 0 NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL +NULL NULL 0 NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL # Regression test for #29695 query T @@ -42,10 +42,10 @@ SELECT min(i), avg(i), max(i), sum(i) FROM kv ---- NULL NULL NULL NULL -query IIIIRRRRRRBBT -SELECT min(v), max(v), count(v), sum_int(1), avg(v), sum(v), stddev(v), stddev_pop(v), variance(v), var_pop(v), bool_and(v = 1), bool_and(v = 1), xor_agg(s::bytes) FROM kv +query IIIIRRRRRRBBTR +SELECT min(v), max(v), count(v), sum_int(1), avg(v), sum(v), stddev(v), stddev_pop(v), variance(v), var_pop(v), bool_and(v = 1), bool_and(v = 1), xor_agg(s::bytes), corr(v,k) FROM kv ---- -NULL NULL 0 NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL +NULL NULL 0 NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL query T SELECT array_agg(v) FROM kv @@ -63,10 +63,10 @@ SELECT jsonb_agg(v) FROM kv NULL # Aggregate functions triggers aggregation and computation when there is no source. -query IIIIRRRRRRBBT -SELECT min(1), count(1), max(1), sum_int(1), avg(1)::float, sum(1), stddev_samp(1), stddev_pop(1), variance(1), var_pop(1), bool_and(true), bool_or(true), to_hex(xor_agg(b'\x01')) +query IIIIRRRRRRBBTR +SELECT min(1), count(1), max(1), sum_int(1), avg(1)::float, sum(1), stddev_samp(1), stddev_pop(1), variance(1), var_pop(1), bool_and(true), bool_or(true), to_hex(xor_agg(b'\x01')), corr(1, 2) ---- -1 1 1 1 1 1 NULL 0 NULL 0 true true 01 +1 1 1 1 1 1 NULL 0 NULL 0 true true 01 NULL # Aggregate functions triggers aggregation and computation when there is no source. query T @@ -1325,17 +1325,17 @@ INSERT INTO statistics_agg_test (y, x, int_y, int_x) VALUES (4.0, 100.0, 4, 100), (NULL, NULL, NULL, NULL) -query RRRR +query FFFF SELECT corr(y, x)::decimal, corr(int_y, int_x)::decimal, corr(y, int_x)::decimal, corr(int_y, x)::decimal FROM statistics_agg_test ---- -0.933007822647968 0.933007822647968 0.933007822647968 0.933007822647968 +0.9330078226479681 0.9330078226479681 0.9330078226479681 0.9330078226479681 -query R +query F SELECT corr(DISTINCT y, x)::decimal FROM statistics_agg_test ---- 0.9326733179802503 -query R +query F SELECT CAST(corr(DISTINCT y, x) FILTER (WHERE x > 3 AND y < 30) AS decimal) FROM statistics_agg_test ---- 0.9326733179802503 diff --git a/pkg/sql/logictest/testdata/logic_test/distsql_agg b/pkg/sql/logictest/testdata/logic_test/distsql_agg index ebe6e2315f9c..28baa700fe0b 100644 --- a/pkg/sql/logictest/testdata/logic_test/distsql_agg +++ b/pkg/sql/logictest/testdata/logic_test/distsql_agg @@ -572,7 +572,7 @@ CREATE TABLE statistics_agg_test (y INT, x INT) statement ok INSERT INTO statistics_agg_test SELECT y, y%10 FROM generate_series(1, 100) AS y -query R +query F SELECT corr(y, x)::decimal FROM statistics_agg_test ---- 0.045228963191363145 diff --git a/pkg/sql/sem/builtins/aggregate_builtins.go b/pkg/sql/sem/builtins/aggregate_builtins.go index de976922d790..0e3462bed75b 100644 --- a/pkg/sql/sem/builtins/aggregate_builtins.go +++ b/pkg/sql/sem/builtins/aggregate_builtins.go @@ -1736,38 +1736,32 @@ func (a *boolOrAggregate) Size() int64 { return sizeOfBoolOrAggregate } -// corrAggregate represents SQL:2003 correlation coefficient. +// regressionAccumulateBase is a base struct for the aggregate functions for statistics. +// It represents a transition datatype for these functions. +// Ported from Postgresql (see https://github.com/postgres/postgres/blob/bc1fbc960bf5efbb692f4d1bf91bf9bc6390425a/src/backend/utils/adt/float.c#L3277). // -// n be count of rows. -// sx be the sum of the column of values of -// sx2 be the sum of the squares of values in the column -// sy be the sum of the column of values of -// sy2 be the sum of the squares of values in the column -// sxy be the sum of the row-wise products of the value in the -// column times the value in the column. +// The Youngs-Cramer algorithm is used to reduce rounding errors in the aggregate final functions. // -// result: -// 1) If n*sx2 equals sx*sx, then the result is the null value. -// 2) If n*sy2 equals sy*sy, then the result is the null value. -// 3) Otherwise, the resut is SQRT(POWER(n*sxy-sx*sy,2) / ((n*sx2-sx*sx)*(n*sy2-sy*sy))). -// If the exponent of the approximate mathematical result of the operation is not within -// the implementation-defined exponent range for the result data type, then the result -// is the null value. -type corrAggregate struct { - n int +// Note that Y is the first argument to all these aggregates! +// +// It might seem attractive to optimize this by having multiple accumulator +// functions that only calculate the sums actually needed. But on most +// modern machines, a couple of extra floating-point multiplies will be +// insignificant compared to the other per-tuple overhead, so I've chosen +// to minimize code space instead. +type regressionAccumulateBase struct { + n float64 sx float64 - sx2 float64 + sxx float64 sy float64 - sy2 float64 + syy float64 sxy float64 } -func newCorrAggregate([]*types.T, *tree.EvalContext, tree.Datums) tree.AggregateFunc { - return &corrAggregate{} -} - // Add implements tree.AggregateFunc interface. -func (a *corrAggregate) Add(_ context.Context, datumY tree.Datum, otherArgs ...tree.Datum) error { +func (a *regressionAccumulateBase) Add( + _ context.Context, datumY tree.Datum, otherArgs ...tree.Datum, +) error { if datumY == tree.DNull { return nil } @@ -1787,77 +1781,100 @@ func (a *corrAggregate) Add(_ context.Context, datumY tree.Datum, otherArgs ...t return err } - a.n++ - a.sx += x - a.sy += y - a.sx2 += x * x - a.sy2 += y * y - a.sxy += x * y - - if math.IsInf(a.sx, 0) || - math.IsInf(a.sx2, 0) || - math.IsInf(a.sy, 0) || - math.IsInf(a.sy2, 0) || - math.IsInf(a.sxy, 0) { - return tree.ErrFloatOutOfRange - } - - return nil -} - -// Result implements tree.AggregateFunc interface. -func (a *corrAggregate) Result() (tree.Datum, error) { - if a.n < 1 { - return tree.DNull, nil - } - - if a.sx2 == 0 || a.sy2 == 0 { - return tree.DNull, nil - } - - floatN := float64(a.n) - - numeratorX := floatN*a.sx2 - a.sx*a.sx - if math.IsInf(numeratorX, 0) { - return tree.DNull, pgerror.New(pgcode.NumericValueOutOfRange, "float out of range") - } - - numeratorY := floatN*a.sy2 - a.sy*a.sy - if math.IsInf(numeratorY, 0) { - return tree.DNull, pgerror.New(pgcode.NumericValueOutOfRange, "float out of range") - } - - numeratorXY := floatN*a.sxy - a.sx*a.sy - if math.IsInf(numeratorXY, 0) { - return tree.DNull, pgerror.New(pgcode.NumericValueOutOfRange, "float out of range") - } - - if numeratorX <= 0 || numeratorY <= 0 { - return tree.DNull, nil - } - - return tree.NewDFloat(tree.DFloat(numeratorXY / math.Sqrt(numeratorX*numeratorY))), nil + return a.add(y, x) } // Reset implements tree.AggregateFunc interface. -func (a *corrAggregate) Reset(context.Context) { +func (a *regressionAccumulateBase) Reset(context.Context) { a.n = 0 a.sx = 0 - a.sx2 = 0 + a.sxx = 0 a.sy = 0 - a.sy2 = 0 + a.syy = 0 a.sxy = 0 } // Close implements tree.AggregateFunc interface. -func (a *corrAggregate) Close(context.Context) {} +func (a *regressionAccumulateBase) Close(context.Context) {} // Size implements tree.AggregateFunc interface. -func (a *corrAggregate) Size() int64 { +func (a *regressionAccumulateBase) Size() int64 { return sizeOfCorrAggregate } -func (a *corrAggregate) float64Val(datum tree.Datum) (float64, error) { +func (a *regressionAccumulateBase) add(y float64, x float64) error { + n := a.n + sx := a.sx + sxx := a.sxx + sy := a.sy + syy := a.syy + sxy := a.sxy + + // Use the Youngs-Cramer algorithm to incorporate the new values into the + // transition values. + n++ + sx += x + sy += y + + if a.n > 0 { + tmpX := x*n - sx + tmpY := y*n - sy + scale := 1.0 / (n * a.n) + sxx += tmpX * tmpX * scale + syy += tmpY * tmpY * scale + sxy += tmpX * tmpY * scale + + // Overflow check. We only report an overflow error when finite + // inputs lead to infinite results. Note also that sxx, syy and Sxy + // should be NaN if any of the relevant inputs are infinite, so we + // intentionally prevent them from becoming infinite. + if math.IsInf(sx, 0) || math.IsInf(sxx, 0) || math.IsInf(sy, 0) || math.IsInf(syy, 0) || math.IsInf(sxy, 0) { + if ((math.IsInf(sx, 0) || math.IsInf(sxx, 0)) && + !math.IsInf(a.sx, 0) && !math.IsInf(x, 0)) || + ((math.IsInf(sy, 0) || math.IsInf(syy, 0)) && + !math.IsInf(a.sy, 0) && !math.IsInf(y, 0)) || + (math.IsInf(sxy, 0) && + !math.IsInf(a.sx, 0) && !math.IsInf(x, 0) && + !math.IsInf(a.sy, 0) && !math.IsInf(y, 0)) { + return tree.ErrFloatOutOfRange + } + + if math.IsInf(sxx, 0) { + sxx = math.NaN() + } + if math.IsInf(syy, 0) { + syy = math.NaN() + } + if math.IsInf(sxy, 0) { + sxy = math.NaN() + } + } + } else { + // At the first input, we normally can leave Sxx et al as 0. However, + // if the first input is Inf or NaN, we'd better force the dependent + // sums to NaN; otherwise we will falsely report variance zero when + // there are no more inputs. + if math.IsNaN(x) || math.IsInf(x, 0) { + a.sxx = math.NaN() + a.sxy = math.NaN() + } + if math.IsNaN(y) || math.IsInf(y, 0) { + a.syy = math.NaN() + a.sxy = math.NaN() + } + } + + a.n = n + a.sx = sx + a.sy = sy + a.sxx = sxx + a.syy = syy + a.sxy = sxy + + return nil +} + +func (a *regressionAccumulateBase) float64Val(datum tree.Datum) (float64, error) { switch val := datum.(type) { case *tree.DFloat: return float64(*val), nil @@ -1868,6 +1885,27 @@ func (a *corrAggregate) float64Val(datum tree.Datum) (float64, error) { } } +// corrAggregate represents SQL:2003 correlation coefficient. +type corrAggregate struct { + regressionAccumulateBase +} + +func newCorrAggregate([]*types.T, *tree.EvalContext, tree.Datums) tree.AggregateFunc { + return &corrAggregate{} +} + +// Result implements tree.AggregateFunc interface. +func (a *corrAggregate) Result() (tree.Datum, error) { + if a.n < 1 { + return tree.DNull, nil + } + + if a.sxx == 0 || a.syy == 0 { + return tree.DNull, nil + } + return tree.NewDFloat(tree.DFloat(a.sxy / math.Sqrt(a.sxx*a.syy))), nil +} + type countAggregate struct { count int } diff --git a/pkg/sql/sem/builtins/aggregate_builtins_test.go b/pkg/sql/sem/builtins/aggregate_builtins_test.go index 566710f3f133..679f77bd5261 100644 --- a/pkg/sql/sem/builtins/aggregate_builtins_test.go +++ b/pkg/sql/sem/builtins/aggregate_builtins_test.go @@ -34,16 +34,25 @@ import ( func testAggregateResultDeepCopy( t *testing.T, aggFunc func([]*types.T, *tree.EvalContext, tree.Datums) tree.AggregateFunc, - vals []tree.Datum, + firstArgs []tree.Datum, + otherArgs ...[]tree.Datum, ) { evalCtx := tree.NewTestingEvalContext(cluster.MakeTestingClusterSettings()) defer evalCtx.Stop(context.Background()) - aggImpl := aggFunc([]*types.T{vals[0].ResolvedType()}, evalCtx, nil) + argTypes := []*types.T{firstArgs[0].ResolvedType()} + otherArgs = pivotArgs(otherArgs...) + if len(otherArgs) == 0 { + otherArgs = make([][]tree.Datum, len(firstArgs)) + } + for i := range otherArgs[0] { + argTypes = append(argTypes, otherArgs[0][i].ResolvedType()) + } + aggImpl := aggFunc(argTypes, evalCtx, nil) defer aggImpl.Close(context.Background()) - runningDatums := make([]tree.Datum, len(vals)) - runningStrings := make([]string, len(vals)) - for i := range vals { - if err := aggImpl.Add(context.Background(), vals[i]); err != nil { + runningDatums := make([]tree.Datum, len(firstArgs)) + runningStrings := make([]string, len(firstArgs)) + for i := range firstArgs { + if err := aggImpl.Add(context.Background(), firstArgs[i], otherArgs[i]...); err != nil { t.Fatal(err) } res, err := aggImpl.Result() @@ -53,7 +62,7 @@ func testAggregateResultDeepCopy( runningDatums[i] = res runningStrings[i] = res.String() } - finalStrings := make([]string, len(vals)) + finalStrings := make([]string, len(firstArgs)) for i, d := range runningDatums { finalStrings[i] = d.String() } @@ -63,6 +72,21 @@ func testAggregateResultDeepCopy( } } +func pivotArgs(args ...[]tree.Datum) [][]tree.Datum { + if len(args) == 0 { + return nil + } + res := make([][]tree.Datum, len(args[0])) + + for i := range args { + for j := range args[i] { + res[j] = append(res[j], args[i][j]) + } + } + + return res +} + func TestAvgIntResultDeepCopy(t *testing.T) { defer leaktest.AfterTest(t)() testAggregateResultDeepCopy(t, newIntAvgAggregate, makeIntTestDatum(10)) @@ -277,6 +301,39 @@ func TestStdDevPopDecimalResultDeepCopy(t *testing.T) { testAggregateResultDeepCopy(t, newDecimalStdDevPopAggregate, makeDecimalTestDatum(10)) } +func TestCorrFloatFloatResultDeepCopy(t *testing.T) { + defer leaktest.AfterTest(t)() + testAggregateResultDeepCopy(t, newCorrAggregate, makeFloatTestDatum(10), makeFloatTestDatum(10)) +} + +func TestCorrIntIntResultDeepCopy(t *testing.T) { + defer leaktest.AfterTest(t)() + testAggregateResultDeepCopy(t, newCorrAggregate, makeIntTestDatum(10), makeIntTestDatum(10)) +} + +func TestCorrFloatIntResultDeepCopy(t *testing.T) { + defer leaktest.AfterTest(t)() + testAggregateResultDeepCopy(t, newCorrAggregate, makeFloatTestDatum(10), makeIntTestDatum(10)) +} + +func TestCorrIntFloatResultDeepCopy(t *testing.T) { + defer leaktest.AfterTest(t)() + testAggregateResultDeepCopy(t, newCorrAggregate, makeIntTestDatum(10), makeFloatTestDatum(10)) +} + +func TestCorrNullResultDeepCopy(t *testing.T) { + defer leaktest.AfterTest(t)() + t.Run("all null", func(t *testing.T) { + testAggregateResultDeepCopy(t, newCorrAggregate, makeNullTestDatum(10), makeNullTestDatum(10)) + }) + t.Run("with first arg null", func(t *testing.T) { + testAggregateResultDeepCopy(t, newCorrAggregate, makeTestWithNullDatum(10, makeIntTestDatum), makeIntTestDatum(10)) + }) + t.Run("with other arg null", func(t *testing.T) { + testAggregateResultDeepCopy(t, newCorrAggregate, makeIntTestDatum(10), makeTestWithNullDatum(10, makeIntTestDatum)) + }) +} + // makeNullTestDatum will create an array of only DNull // values to make sure the aggregation handles only nulls. func makeNullTestDatum(count int) []tree.Datum {