From 8d518efaabdafd016a703ffe7f30a8a76049d5e5 Mon Sep 17 00:00:00 2001 From: Oliver Tan Date: Mon, 28 Sep 2020 13:31:13 -0700 Subject: [PATCH] builtins: add memory accounting for ST_MakeLine/ST_Union aggregates Release note: None --- pkg/sql/sem/builtins/aggregate_builtins.go | 41 +++++++++++++++++----- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/pkg/sql/sem/builtins/aggregate_builtins.go b/pkg/sql/sem/builtins/aggregate_builtins.go index ae5f0475a6b5..de976922d790 100644 --- a/pkg/sql/sem/builtins/aggregate_builtins.go +++ b/pkg/sql/sem/builtins/aggregate_builtins.go @@ -372,7 +372,9 @@ var aggregates = map[string]builtinDefinition{ func( params []*types.T, evalCtx *tree.EvalContext, arguments tree.Datums, ) tree.AggregateFunc { - return &stMakeLineAgg{} + return &stMakeLineAgg{ + acc: evalCtx.Mon.MakeBoundAccount(), + } }, infoBuilder{ info: "Forms a LineString from Point, MultiPoint or LineStrings. Other shapes will be ignored.", @@ -627,7 +629,9 @@ func makeSTUnionBuiltin() builtinDefinition { func( params []*types.T, evalCtx *tree.EvalContext, arguments tree.Datums, ) tree.AggregateFunc { - return &stUnionAgg{} + return &stUnionAgg{ + acc: evalCtx.Mon.MakeBoundAccount(), + } }, infoBuilder{ info: "Applies a spatial union to the geometries provided.", @@ -640,11 +644,12 @@ func makeSTUnionBuiltin() builtinDefinition { type stMakeLineAgg struct { flatCoords []float64 layout geom.Layout + acc mon.BoundAccount } // Add implements the AggregateFunc interface. func (agg *stMakeLineAgg) Add( - _ context.Context, firstArg tree.Datum, otherArgs ...tree.Datum, + ctx context.Context, firstArg tree.Datum, otherArgs ...tree.Datum, ) error { if firstArg == tree.DNull { return nil @@ -667,6 +672,9 @@ func (agg *stMakeLineAgg) Add( } switch g.(type) { case *geom.Point, *geom.LineString, *geom.MultiPoint: + if err := agg.acc.Grow(ctx, int64(len(g.FlatCoords())*8)); err != nil { + return err + } agg.flatCoords = append(agg.flatCoords, g.FlatCoords()...) } return nil @@ -685,12 +693,15 @@ func (agg *stMakeLineAgg) Result() (tree.Datum, error) { } // Reset implements the AggregateFunc interface. -func (agg *stMakeLineAgg) Reset(context.Context) { +func (agg *stMakeLineAgg) Reset(ctx context.Context) { agg.flatCoords = agg.flatCoords[:0] + agg.acc.Empty(ctx) } // Close implements the AggregateFunc interface. -func (agg *stMakeLineAgg) Close(context.Context) {} +func (agg *stMakeLineAgg) Close(ctx context.Context) { + agg.acc.Close(ctx) +} // Size implements the AggregateFunc interface. func (agg *stMakeLineAgg) Size() int64 { @@ -701,11 +712,14 @@ type stUnionAgg struct { srid geopb.SRID // TODO(#geo): store the current union object in C memory, to avoid the EWKB round trips. ewkb geopb.EWKB + acc mon.BoundAccount set bool } // Add implements the AggregateFunc interface. -func (agg *stUnionAgg) Add(_ context.Context, firstArg tree.Datum, otherArgs ...tree.Datum) error { +func (agg *stUnionAgg) Add( + ctx context.Context, firstArg tree.Datum, otherArgs ...tree.Datum, +) error { if firstArg == tree.DNull { return nil } @@ -723,12 +737,18 @@ func (agg *stUnionAgg) Add(_ context.Context, firstArg tree.Datum, otherArgs ... } return geo.NewMismatchingSRIDsError(geomArg.Geometry.SpatialObject(), c.SpatialObject()) } + if err := agg.acc.Grow(ctx, int64(len(geomArg.EWKB()))); err != nil { + return err + } var err error // TODO(#geo):We are allocating a slice for the result each time we // call geos.Union in cStringToSafeGoBytes. // We could change geos.Union to accept the existing slice. agg.ewkb, err = geos.Union(agg.ewkb, geomArg.EWKB()) - return err + if err != nil { + return err + } + return agg.acc.ResizeTo(ctx, int64(len(agg.ewkb))) } // Result implements the AggregateFunc interface. @@ -744,13 +764,16 @@ func (agg *stUnionAgg) Result() (tree.Datum, error) { } // Reset implements the AggregateFunc interface. -func (agg *stUnionAgg) Reset(context.Context) { +func (agg *stUnionAgg) Reset(ctx context.Context) { agg.ewkb = nil agg.set = false + agg.acc.Empty(ctx) } // Close implements the AggregateFunc interface. -func (agg *stUnionAgg) Close(context.Context) {} +func (agg *stUnionAgg) Close(ctx context.Context) { + agg.acc.Close(ctx) +} // Size implements the AggregateFunc interface. func (agg *stUnionAgg) Size() int64 {