Skip to content

Commit

Permalink
builtins: use Youngs-Cramer algorithm for aggregation functions
Browse files Browse the repository at this point in the history
This commit replaces the existing algorithm for correlation calculation with the Youngs-Cramer algorithm ported from PostgreSQL to reduce rounding errors. It introduces the base structure for aggregate functions for statistics.
It also amends aggregates builtin tests so functions with several arguments can be tested.

Release note: None
  • Loading branch information
mneverov committed Oct 16, 2020
1 parent 88b4341 commit 6b24abc
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 101 deletions.
26 changes: 13 additions & 13 deletions pkg/sql/logictest/testdata/logic_test/aggregate
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ CREATE TABLE kv (
)

# Aggregate functions return NULL if there are no rows.
query IIIIRRRRRRRRBBTII
SELECT min(1), max(1), count(1), sum_int(1), avg(1), sum(1), stddev(1), stddev_samp(1), stddev_pop(1), var_samp(1), variance(1), var_pop(1), bool_and(true), bool_and(false), xor_agg(b'\x01'), bit_and(1), bit_or(1) FROM kv
query IIIIRRRRRRRRBBTIIR
SELECT min(1), max(1), count(1), sum_int(1), avg(1), sum(1), stddev(1), stddev_samp(1), stddev_pop(1), var_samp(1), variance(1), var_pop(1), bool_and(true), bool_and(false), xor_agg(b'\x01'), bit_and(1), bit_or(1), corr(1, 1) FROM kv
----
NULL NULL 0 NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL
NULL NULL 0 NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL

# Regression test for #29695
query T
Expand Down Expand Up @@ -42,10 +42,10 @@ SELECT min(i), avg(i), max(i), sum(i) FROM kv
----
NULL NULL NULL NULL

query IIIIRRRRRRBBT
SELECT min(v), max(v), count(v), sum_int(1), avg(v), sum(v), stddev(v), stddev_pop(v), variance(v), var_pop(v), bool_and(v = 1), bool_and(v = 1), xor_agg(s::bytes) FROM kv
query IIIIRRRRRRBBTR
SELECT min(v), max(v), count(v), sum_int(1), avg(v), sum(v), stddev(v), stddev_pop(v), variance(v), var_pop(v), bool_and(v = 1), bool_and(v = 1), xor_agg(s::bytes), corr(v,k) FROM kv
----
NULL NULL 0 NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL
NULL NULL 0 NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL

query T
SELECT array_agg(v) FROM kv
Expand All @@ -63,10 +63,10 @@ SELECT jsonb_agg(v) FROM kv
NULL

# Aggregate functions triggers aggregation and computation when there is no source.
query IIIIRRRRRRBBT
SELECT min(1), count(1), max(1), sum_int(1), avg(1)::float, sum(1), stddev_samp(1), stddev_pop(1), variance(1), var_pop(1), bool_and(true), bool_or(true), to_hex(xor_agg(b'\x01'))
query IIIIRRRRRRBBTR
SELECT min(1), count(1), max(1), sum_int(1), avg(1)::float, sum(1), stddev_samp(1), stddev_pop(1), variance(1), var_pop(1), bool_and(true), bool_or(true), to_hex(xor_agg(b'\x01')), corr(1, 2)
----
1 1 1 1 1 1 NULL 0 NULL 0 true true 01
1 1 1 1 1 1 NULL 0 NULL 0 true true 01 NULL

# Aggregate functions triggers aggregation and computation when there is no source.
query T
Expand Down Expand Up @@ -1325,17 +1325,17 @@ INSERT INTO statistics_agg_test (y, x, int_y, int_x) VALUES
(4.0, 100.0, 4, 100),
(NULL, NULL, NULL, NULL)

query RRRR
query FFFF
SELECT corr(y, x)::decimal, corr(int_y, int_x)::decimal, corr(y, int_x)::decimal, corr(int_y, x)::decimal FROM statistics_agg_test
----
0.933007822647968 0.933007822647968 0.933007822647968 0.933007822647968
0.9330078226479681 0.9330078226479681 0.9330078226479681 0.9330078226479681

query R
query F
SELECT corr(DISTINCT y, x)::decimal FROM statistics_agg_test
----
0.9326733179802503

query R
query F
SELECT CAST(corr(DISTINCT y, x) FILTER (WHERE x > 3 AND y < 30) AS decimal) FROM statistics_agg_test
----
0.9326733179802503
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/distsql_agg
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ CREATE TABLE statistics_agg_test (y INT, x INT)
statement ok
INSERT INTO statistics_agg_test SELECT y, y%10 FROM generate_series(1, 100) AS y

query R
query F
SELECT corr(y, x)::decimal FROM statistics_agg_test
----
0.045228963191363145
Expand Down
198 changes: 118 additions & 80 deletions pkg/sql/sem/builtins/aggregate_builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -1736,38 +1736,32 @@ func (a *boolOrAggregate) Size() int64 {
return sizeOfBoolOrAggregate
}

// corrAggregate represents SQL:2003 correlation coefficient.
// regressionAccumulateBase is a base struct for the aggregate functions for statistics.
// It represents a transition datatype for these functions.
// Ported from PostgreSQL (see https://github.com/postgres/postgres/blob/bc1fbc960bf5efbb692f4d1bf91bf9bc6390425a/src/backend/utils/adt/float.c#L3277).
//
// n be count of rows.
// sx be the sum of the column of values of <independent variable expression>
// sx2 be the sum of the squares of values in the <independent variable expression> column
// sy be the sum of the column of values of <dependent variable expression>
// sy2 be the sum of the squares of values in the <dependent variable expression> column
// sxy be the sum of the row-wise products of the value in the <independent variable expression>
// column times the value in the <dependent variable expression> column.
// The Youngs-Cramer algorithm is used to reduce rounding errors in the aggregate final functions.
//
// result:
// 1) If n*sx2 equals sx*sx, then the result is the null value.
// 2) If n*sy2 equals sy*sy, then the result is the null value.
// 3) Otherwise, the result is SQRT(POWER(n*sxy-sx*sy,2) / ((n*sx2-sx*sx)*(n*sy2-sy*sy))).
// If the exponent of the approximate mathematical result of the operation is not within
// the implementation-defined exponent range for the result data type, then the result
// is the null value.
type corrAggregate struct {
n int
// Note that Y is the first argument to all these aggregates!
//
// It might seem attractive to optimize this by having multiple accumulator
// functions that only calculate the sums actually needed. But on most
// modern machines, a couple of extra floating-point multiplies will be
// insignificant compared to the other per-tuple overhead, so I've chosen
// to minimize code space instead.
type regressionAccumulateBase struct {
n float64
sx float64
sx2 float64
sxx float64
sy float64
sy2 float64
syy float64
sxy float64
}

func newCorrAggregate([]*types.T, *tree.EvalContext, tree.Datums) tree.AggregateFunc {
return &corrAggregate{}
}

// Add implements tree.AggregateFunc interface.
func (a *corrAggregate) Add(_ context.Context, datumY tree.Datum, otherArgs ...tree.Datum) error {
func (a *regressionAccumulateBase) Add(
_ context.Context, datumY tree.Datum, otherArgs ...tree.Datum,
) error {
if datumY == tree.DNull {
return nil
}
Expand All @@ -1787,77 +1781,100 @@ func (a *corrAggregate) Add(_ context.Context, datumY tree.Datum, otherArgs ...t
return err
}

a.n++
a.sx += x
a.sy += y
a.sx2 += x * x
a.sy2 += y * y
a.sxy += x * y

if math.IsInf(a.sx, 0) ||
math.IsInf(a.sx2, 0) ||
math.IsInf(a.sy, 0) ||
math.IsInf(a.sy2, 0) ||
math.IsInf(a.sxy, 0) {
return tree.ErrFloatOutOfRange
}

return nil
}

// Result implements tree.AggregateFunc interface.
func (a *corrAggregate) Result() (tree.Datum, error) {
if a.n < 1 {
return tree.DNull, nil
}

if a.sx2 == 0 || a.sy2 == 0 {
return tree.DNull, nil
}

floatN := float64(a.n)

numeratorX := floatN*a.sx2 - a.sx*a.sx
if math.IsInf(numeratorX, 0) {
return tree.DNull, pgerror.New(pgcode.NumericValueOutOfRange, "float out of range")
}

numeratorY := floatN*a.sy2 - a.sy*a.sy
if math.IsInf(numeratorY, 0) {
return tree.DNull, pgerror.New(pgcode.NumericValueOutOfRange, "float out of range")
}

numeratorXY := floatN*a.sxy - a.sx*a.sy
if math.IsInf(numeratorXY, 0) {
return tree.DNull, pgerror.New(pgcode.NumericValueOutOfRange, "float out of range")
}

if numeratorX <= 0 || numeratorY <= 0 {
return tree.DNull, nil
}

return tree.NewDFloat(tree.DFloat(numeratorXY / math.Sqrt(numeratorX*numeratorY))), nil
return a.add(y, x)
}

// Reset implements tree.AggregateFunc interface.
func (a *corrAggregate) Reset(context.Context) {
func (a *regressionAccumulateBase) Reset(context.Context) {
a.n = 0
a.sx = 0
a.sx2 = 0
a.sxx = 0
a.sy = 0
a.sy2 = 0
a.syy = 0
a.sxy = 0
}

// Close implements tree.AggregateFunc interface.
func (a *corrAggregate) Close(context.Context) {}
func (a *regressionAccumulateBase) Close(context.Context) {}

// Size implements tree.AggregateFunc interface.
func (a *corrAggregate) Size() int64 {
func (a *regressionAccumulateBase) Size() int64 {
return sizeOfCorrAggregate
}

func (a *corrAggregate) float64Val(datum tree.Datum) (float64, error) {
func (a *regressionAccumulateBase) add(y float64, x float64) error {
n := a.n
sx := a.sx
sxx := a.sxx
sy := a.sy
syy := a.syy
sxy := a.sxy

// Use the Youngs-Cramer algorithm to incorporate the new values into the
// transition values.
n++
sx += x
sy += y

if a.n > 0 {
tmpX := x*n - sx
tmpY := y*n - sy
scale := 1.0 / (n * a.n)
sxx += tmpX * tmpX * scale
syy += tmpY * tmpY * scale
sxy += tmpX * tmpY * scale

// Overflow check. We only report an overflow error when finite
// inputs lead to infinite results. Note also that sxx, syy and Sxy
// should be NaN if any of the relevant inputs are infinite, so we
// intentionally prevent them from becoming infinite.
if math.IsInf(sx, 0) || math.IsInf(sxx, 0) || math.IsInf(sy, 0) || math.IsInf(syy, 0) || math.IsInf(sxy, 0) {
if ((math.IsInf(sx, 0) || math.IsInf(sxx, 0)) &&
!math.IsInf(a.sx, 0) && !math.IsInf(x, 0)) ||
((math.IsInf(sy, 0) || math.IsInf(syy, 0)) &&
!math.IsInf(a.sy, 0) && !math.IsInf(y, 0)) ||
(math.IsInf(sxy, 0) &&
!math.IsInf(a.sx, 0) && !math.IsInf(x, 0) &&
!math.IsInf(a.sy, 0) && !math.IsInf(y, 0)) {
return tree.ErrFloatOutOfRange
}

if math.IsInf(sxx, 0) {
sxx = math.NaN()
}
if math.IsInf(syy, 0) {
syy = math.NaN()
}
if math.IsInf(sxy, 0) {
sxy = math.NaN()
}
}
} else {
// At the first input, we normally can leave Sxx et al as 0. However,
// if the first input is Inf or NaN, we'd better force the dependent
// sums to NaN; otherwise we will falsely report variance zero when
// there are no more inputs.
if math.IsNaN(x) || math.IsInf(x, 0) {
a.sxx = math.NaN()
a.sxy = math.NaN()
}
if math.IsNaN(y) || math.IsInf(y, 0) {
a.syy = math.NaN()
a.sxy = math.NaN()
}
}

a.n = n
a.sx = sx
a.sy = sy
a.sxx = sxx
a.syy = syy
a.sxy = sxy

return nil
}

func (a *regressionAccumulateBase) float64Val(datum tree.Datum) (float64, error) {
switch val := datum.(type) {
case *tree.DFloat:
return float64(*val), nil
Expand All @@ -1868,6 +1885,27 @@ func (a *corrAggregate) float64Val(datum tree.Datum) (float64, error) {
}
}

// corrAggregate represents SQL:2003 correlation coefficient.
type corrAggregate struct {
	regressionAccumulateBase
}

func newCorrAggregate([]*types.T, *tree.EvalContext, tree.Datums) tree.AggregateFunc {
	return &corrAggregate{}
}

// Result implements tree.AggregateFunc interface.
// It returns sxy / sqrt(sxx*syy), or NULL when no rows were accumulated or
// when either variable has zero variance (correlation is undefined there).
func (a *corrAggregate) Result() (tree.Datum, error) {
	undefined := a.n < 1 || a.sxx == 0 || a.syy == 0
	if undefined {
		return tree.DNull, nil
	}
	corr := a.sxy / math.Sqrt(a.sxx*a.syy)
	return tree.NewDFloat(tree.DFloat(corr)), nil
}

// countAggregate implements the count(expr) aggregate by keeping a running
// tally of non-NULL input rows.
type countAggregate struct {
	count int
}
Expand Down
Loading

0 comments on commit 6b24abc

Please sign in to comment.