Skip to content

Commit

Permalink
builtins: add memory accounting for ST_MakeLine/ST_Union aggregates
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
otan committed Sep 28, 2020
1 parent 38cc898 commit 8d518ef
Showing 1 changed file with 32 additions and 9 deletions.
41 changes: 32 additions & 9 deletions pkg/sql/sem/builtins/aggregate_builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,9 @@ var aggregates = map[string]builtinDefinition{
func(
params []*types.T, evalCtx *tree.EvalContext, arguments tree.Datums,
) tree.AggregateFunc {
return &stMakeLineAgg{}
return &stMakeLineAgg{
acc: evalCtx.Mon.MakeBoundAccount(),
}
},
infoBuilder{
info: "Forms a LineString from Point, MultiPoint or LineStrings. Other shapes will be ignored.",
Expand Down Expand Up @@ -627,7 +629,9 @@ func makeSTUnionBuiltin() builtinDefinition {
func(
params []*types.T, evalCtx *tree.EvalContext, arguments tree.Datums,
) tree.AggregateFunc {
return &stUnionAgg{}
return &stUnionAgg{
acc: evalCtx.Mon.MakeBoundAccount(),
}
},
infoBuilder{
info: "Applies a spatial union to the geometries provided.",
Expand All @@ -640,11 +644,12 @@ func makeSTUnionBuiltin() builtinDefinition {
type stMakeLineAgg struct {
flatCoords []float64
layout geom.Layout
acc mon.BoundAccount
}

// Add implements the AggregateFunc interface.
func (agg *stMakeLineAgg) Add(
_ context.Context, firstArg tree.Datum, otherArgs ...tree.Datum,
ctx context.Context, firstArg tree.Datum, otherArgs ...tree.Datum,
) error {
if firstArg == tree.DNull {
return nil
Expand All @@ -667,6 +672,9 @@ func (agg *stMakeLineAgg) Add(
}
switch g.(type) {
case *geom.Point, *geom.LineString, *geom.MultiPoint:
if err := agg.acc.Grow(ctx, int64(len(g.FlatCoords())*8)); err != nil {
return err
}
agg.flatCoords = append(agg.flatCoords, g.FlatCoords()...)
}
return nil
Expand All @@ -685,12 +693,15 @@ func (agg *stMakeLineAgg) Result() (tree.Datum, error) {
}

// Reset implements the AggregateFunc interface.
func (agg *stMakeLineAgg) Reset(context.Context) {
func (agg *stMakeLineAgg) Reset(ctx context.Context) {
agg.flatCoords = agg.flatCoords[:0]
agg.acc.Empty(ctx)
}

// Close implements the AggregateFunc interface.
func (agg *stMakeLineAgg) Close(context.Context) {}
func (agg *stMakeLineAgg) Close(ctx context.Context) {
agg.acc.Close(ctx)
}

// Size implements the AggregateFunc interface.
func (agg *stMakeLineAgg) Size() int64 {
Expand All @@ -701,11 +712,14 @@ type stUnionAgg struct {
srid geopb.SRID
// TODO(#geo): store the current union object in C memory, to avoid the EWKB round trips.
ewkb geopb.EWKB
acc mon.BoundAccount
set bool
}

// Add implements the AggregateFunc interface.
func (agg *stUnionAgg) Add(_ context.Context, firstArg tree.Datum, otherArgs ...tree.Datum) error {
func (agg *stUnionAgg) Add(
ctx context.Context, firstArg tree.Datum, otherArgs ...tree.Datum,
) error {
if firstArg == tree.DNull {
return nil
}
Expand All @@ -723,12 +737,18 @@ func (agg *stUnionAgg) Add(_ context.Context, firstArg tree.Datum, otherArgs ...
}
return geo.NewMismatchingSRIDsError(geomArg.Geometry.SpatialObject(), c.SpatialObject())
}
if err := agg.acc.Grow(ctx, int64(len(geomArg.EWKB()))); err != nil {
return err
}
var err error
// TODO(#geo):We are allocating a slice for the result each time we
// call geos.Union in cStringToSafeGoBytes.
// We could change geos.Union to accept the existing slice.
agg.ewkb, err = geos.Union(agg.ewkb, geomArg.EWKB())
return err
if err != nil {
return err
}
return agg.acc.ResizeTo(ctx, int64(len(agg.ewkb)))
}

// Result implements the AggregateFunc interface.
Expand All @@ -744,13 +764,16 @@ func (agg *stUnionAgg) Result() (tree.Datum, error) {
}

// Reset implements the AggregateFunc interface.
func (agg *stUnionAgg) Reset(context.Context) {
func (agg *stUnionAgg) Reset(ctx context.Context) {
agg.ewkb = nil
agg.set = false
agg.acc.Empty(ctx)
}

// Close implements the AggregateFunc interface.
func (agg *stUnionAgg) Close(context.Context) {}
func (agg *stUnionAgg) Close(ctx context.Context) {
agg.acc.Close(ctx)
}

// Size implements the AggregateFunc interface.
func (agg *stUnionAgg) Size() int64 {
Expand Down

0 comments on commit 8d518ef

Please sign in to comment.