Skip to content

Commit

Permalink
fix: improve delete speed when a measurement is part of the predicate (
Browse files Browse the repository at this point in the history
…#23786)

* fix: improve delete speed when a measurement is part of the predicate

* test: add test for deleting measurement by predicate

* chore: improve error messaging and capturing

* chore: set goland to use the right formatting style
  • Loading branch information
jeffreyssmith2nd authored Oct 14, 2022
1 parent 4ed184d commit 2ad8995
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 38 deletions.
6 changes: 4 additions & 2 deletions cmd/influxd/launcher/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync"
"time"

"github.com/influxdata/influxql"

"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/http"
"github.com/influxdata/influxdb/v2/kit/platform"
Expand Down Expand Up @@ -119,8 +121,8 @@ func (t *TemporaryEngine) SeriesCardinality(ctx context.Context, bucketID platfo
}

// DeleteBucketRangePredicate will delete a bucket from the range and predicate.
func (t *TemporaryEngine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate) error {
return t.engine.DeleteBucketRangePredicate(ctx, orgID, bucketID, min, max, pred)
func (t *TemporaryEngine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate, measurement influxql.Expr) error {
return t.engine.DeleteBucketRangePredicate(ctx, orgID, bucketID, min, max, pred, measurement)
}

func (t *TemporaryEngine) CreateBucket(ctx context.Context, b *influxdb.Bucket) error {
Expand Down
4 changes: 3 additions & 1 deletion delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package influxdb
import (
"context"

"github.com/influxdata/influxql"

"github.com/influxdata/influxdb/v2/kit/platform"
)

Expand All @@ -15,5 +17,5 @@ type Predicate interface {

// DeleteService will delete a bucket from the range and predict.
type DeleteService interface {
DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred Predicate) error
DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred Predicate, measurement influxql.Expr) error
}
75 changes: 66 additions & 9 deletions http/delete_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ import (
"context"
"encoding/json"
"fmt"
"io"
http "net/http"
"time"

"github.com/influxdata/influxql"

"github.com/influxdata/httprouter"
"github.com/influxdata/influxdb/v2"
pcontext "github.com/influxdata/influxdb/v2/context"
Expand Down Expand Up @@ -91,7 +94,7 @@ func (h *DeleteHandler) handleDelete(w http.ResponseWriter, r *http.Request) {
return
}

dr, err := decodeDeleteRequest(
dr, measurement, err := decodeDeleteRequest(
ctx, r,
h.OrganizationService,
h.BucketService,
Expand Down Expand Up @@ -121,7 +124,7 @@ func (h *DeleteHandler) handleDelete(w http.ResponseWriter, r *http.Request) {
return
}

if err := h.DeleteService.DeleteBucketRangePredicate(r.Context(), dr.Org.ID, dr.Bucket.ID, dr.Start, dr.Stop, dr.Predicate); err != nil {
if err := h.DeleteService.DeleteBucketRangePredicate(r.Context(), dr.Org.ID, dr.Bucket.ID, dr.Start, dr.Stop, dr.Predicate, measurement); err != nil {
h.HandleHTTPError(ctx, &errors.Error{
Code: errors.EInternal,
Op: "http/handleDelete",
Expand All @@ -139,24 +142,78 @@ func (h *DeleteHandler) handleDelete(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}

func decodeDeleteRequest(ctx context.Context, r *http.Request, orgSvc influxdb.OrganizationService, bucketSvc influxdb.BucketService) (*deleteRequest, error) {
func decodeDeleteRequest(ctx context.Context, r *http.Request, orgSvc influxdb.OrganizationService, bucketSvc influxdb.BucketService) (*deleteRequest, influxql.Expr, error) {
dr := new(deleteRequest)
err := json.NewDecoder(r.Body).Decode(dr)
buf, err := io.ReadAll(r.Body)
if err != nil {
return nil, &errors.Error{
je := &errors.Error{
Code: errors.EInvalid,
Msg: "invalid request; error parsing request json",
Msg: "error reading json body",
Err: err,
}
return nil, nil, je
}
buffer := bytes.NewBuffer(buf)
err = json.NewDecoder(buffer).Decode(dr)
if err != nil {
je := &errors.Error{
Code: errors.EInvalid,
Msg: "error decoding json body",
Err: err,
}
return nil, nil, je
}

var drd deleteRequestDecode
err = json.Unmarshal(buf, &drd)
if err != nil {
je := &errors.Error{
Code: errors.EInvalid,
Msg: "error decoding json body for predicate",
Err: err,
}
return nil, nil, je
}
var measurementExpr influxql.Expr
if drd.Predicate != "" {
expr, err := influxql.ParseExpr(drd.Predicate)
if err != nil {
return nil, nil, &errors.Error{
Code: errors.EInvalid,
Msg: "invalid request; error parsing predicate",
Err: err,
}
}
measurementExpr, _, err = influxql.PartitionExpr(influxql.CloneExpr(expr), func(e influxql.Expr) (bool, error) {
switch e := e.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if ok && tag.Val == "_measurement" {
return true, nil
}
}
}
return false, nil
})
if err != nil {
return nil, nil, &errors.Error{
Code: errors.EInvalid,
Msg: "invalid request; error partitioning predicate",
Err: err,
}
}
}

if dr.Org, err = queryOrganization(ctx, r, orgSvc); err != nil {
return nil, err
return nil, nil, err
}

if dr.Bucket, err = queryBucket(ctx, dr.Org.ID, r, bucketSvc); err != nil {
return nil, err
return nil, nil, err
}
return dr, nil
return dr, measurementExpr, nil
}

type deleteRequest struct {
Expand Down
66 changes: 60 additions & 6 deletions http/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestDelete(t *testing.T) {
contentType: "application/json; charset=utf-8",
body: `{
"code": "invalid",
"message": "invalid request; error parsing request json: invalid RFC3339Nano for field start, please format your time with RFC3339Nano format, example: 2009-01-02T23:00:00Z"
"message": "error decoding json body: invalid RFC3339Nano for field start, please format your time with RFC3339Nano format, example: 2009-01-02T23:00:00Z"
}`,
},
},
Expand All @@ -89,7 +89,7 @@ func TestDelete(t *testing.T) {
contentType: "application/json; charset=utf-8",
body: `{
"code": "invalid",
"message": "invalid request; error parsing request json: invalid RFC3339Nano for field stop, please format your time with RFC3339Nano format, example: 2009-01-01T23:00:00Z"
"message": "error decoding json body: invalid RFC3339Nano for field stop, please format your time with RFC3339Nano format, example: 2009-01-01T23:00:00Z"
}`,
},
},
Expand All @@ -106,7 +106,7 @@ func TestDelete(t *testing.T) {
contentType: "application/json; charset=utf-8",
body: fmt.Sprintf(`{
"code": "invalid",
"message": "invalid request; error parsing request json: %s"
"message": "error decoding json body: %s"
}`, msgStartTooSoon),
},
},
Expand All @@ -123,7 +123,7 @@ func TestDelete(t *testing.T) {
contentType: "application/json; charset=utf-8",
body: fmt.Sprintf(`{
"code": "invalid",
"message": "invalid request; error parsing request json: %s"
"message": "error decoding json body: %s"
}`, msgStopTooLate),
},
},
Expand Down Expand Up @@ -321,7 +321,61 @@ func TestDelete(t *testing.T) {
statusCode: http.StatusBadRequest,
body: `{
"code": "invalid",
"message": "invalid request; error parsing request json: the logical operator OR is not supported yet at position 25"
"message": "error decoding json body: the logical operator OR is not supported yet at position 25"
}`,
},
},
{
name: "unsupported delete measurements",
args: args{
queryParams: map[string][]string{
"org": {"org1"},
"bucket": {"buck1"},
},
body: []byte(`{
"start":"2009-01-01T23:00:00Z",
"stop":"2019-11-10T01:00:00Z",
"predicate": "_measurement=\"cpu\" or _measurement=\"mem\""
}`),
authorizer: &influxdb.Authorization{
UserID: user1ID,
Status: influxdb.Active,
Permissions: []influxdb.Permission{
{
Action: influxdb.WriteAction,
Resource: influxdb.Resource{
Type: influxdb.BucketsResourceType,
ID: influxtesting.IDPtr(platform.ID(2)),
OrgID: influxtesting.IDPtr(platform.ID(1)),
},
},
},
},
},
fields: fields{
DeleteService: mock.NewDeleteService(),
BucketService: &mock.BucketService{
FindBucketFn: func(ctx context.Context, f influxdb.BucketFilter) (*influxdb.Bucket, error) {
return &influxdb.Bucket{
ID: platform.ID(2),
Name: "bucket1",
}, nil
},
},
OrganizationService: &mock.OrganizationService{
FindOrganizationF: func(ctx context.Context, f influxdb.OrganizationFilter) (*influxdb.Organization, error) {
return &influxdb.Organization{
ID: platform.ID(1),
Name: "org1",
}, nil
},
},
},
wants: wants{
statusCode: http.StatusBadRequest,
body: `{
"code": "invalid",
"message": "error decoding json body: the logical operator OR is not supported yet at position 19"
}`,
},
},
Expand All @@ -335,7 +389,7 @@ func TestDelete(t *testing.T) {
body: []byte(`{
"start":"2009-01-01T23:00:00Z",
"stop":"2019-11-10T01:00:00Z",
"predicate": "tag1=\"v1\" and (tag2=\"v2\" and tag3=\"v3\")"
"predicate": "_measurement=\"testing\" and tag1=\"v1\" and (tag2=\"v2\" and tag3=\"v3\")"
}`),
authorizer: &influxdb.Authorization{
UserID: user1ID,
Expand Down
12 changes: 7 additions & 5 deletions mock/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package mock
import (
"context"

"github.com/influxdata/influxql"

"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
)
Expand All @@ -11,20 +13,20 @@ var _ influxdb.DeleteService = &DeleteService{}

// DeleteService is a mock delete server.
type DeleteService struct {
DeleteBucketRangePredicateF func(tx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate) error
DeleteBucketRangePredicateF func(tx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate, measurement influxql.Expr) error
}

// NewDeleteService returns a mock DeleteService where its methods will return
// zero values.
func NewDeleteService() DeleteService {
return DeleteService{
DeleteBucketRangePredicateF: func(tx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate) error {
DeleteBucketRangePredicateF: func(tx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate, measurement influxql.Expr) error {
return nil
},
}
}

//DeleteBucketRangePredicate calls DeleteBucketRangePredicateF.
func (s DeleteService) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate) error {
return s.DeleteBucketRangePredicateF(ctx, orgID, bucketID, min, max, pred)
// DeleteBucketRangePredicate calls DeleteBucketRangePredicateF.
func (s DeleteService) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate, measurement influxql.Expr) error {
return s.DeleteBucketRangePredicateF(ctx, orgID, bucketID, min, max, pred, measurement)
}
4 changes: 2 additions & 2 deletions storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (e *Engine) DeleteBucket(ctx context.Context, orgID, bucketID platform.ID)

// DeleteBucketRangePredicate deletes data within a bucket from the storage engine. Any data
// deleted must be in [min, max], and the key must match the predicate if provided.
func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate) error {
func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate, measurement influxql.Expr) error {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()

Expand All @@ -336,7 +336,7 @@ func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID
if e.closing == nil {
return ErrEngineClosed
}
return e.tsdbStore.DeleteSeriesWithPredicate(ctx, bucketID.String(), min, max, pred)
return e.tsdbStore.DeleteSeriesWithPredicate(ctx, bucketID.String(), min, max, pred, measurement)
}

// RLockKVStore locks the KV store as well as the engine in preparation for doing a backup.
Expand Down
Loading

0 comments on commit 2ad8995

Please sign in to comment.