Skip to content

Commit

Permalink
fix: fixes an error querying virtual dbrps (#23731)
Browse files Browse the repository at this point in the history
* fix: fixes an error querying virtual dbrps

When the virtual pointer was set to false, the mappings were being ignored.

* fix: missed part in a rebase

* test: add test for shard mapping virtual dbrps

* fix: do not create virtual mappings for equivalent physical mappings
  • Loading branch information
jeffreyssmith2nd authored Oct 13, 2022
1 parent a0f1184 commit 4ed184d
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 100 deletions.
10 changes: 7 additions & 3 deletions dbrp/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,16 +366,20 @@ func (s *Service) FindMany(ctx context.Context, filter influxdb.DBRPMappingFilte
// we were unable to find any virtual mappings, so return what physical mappings we have
return ms, len(ms), nil
}
OUTER:
for _, bucket := range buckets {
if bucket == nil {
continue
}
newMapping := bucketToMapping(bucket)
// if any bucket already exists that is default for this database,
// this virtual mapping should not be the default
if newMapping.Default {
for _, m := range ms {
if m.Database == newMapping.Database && m.Default {
for _, m := range ms {
if m.Database == newMapping.Database {
if newMapping.Virtual && m.RetentionPolicy == newMapping.RetentionPolicy {
continue OUTER
}
if m.Default && newMapping.Default {
newMapping.Default = false
break
}
Expand Down
14 changes: 12 additions & 2 deletions testing/dbrp_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,17 @@ func FindManyDBRPMappingsV2(
{ID: 500, Name: "testdb4", OrgID: MustIDBase16(dbrpOrg2ID)},
}, 0, nil
}},
DBRPMappingsV2: []*influxdb.DBRPMapping{},
DBRPMappingsV2: []*influxdb.DBRPMapping{
{
ID: 500,
Database: "testdb4",
RetentionPolicy: "autogen",
Default: true,
Virtual: false,
OrganizationID: MustIDBase16(dbrpOrg2ID),
BucketID: 500,
},
},
},
args: args{
filter: influxdb.DBRPMappingFilter{
Expand All @@ -715,7 +725,7 @@ func FindManyDBRPMappingsV2(
Database: "testdb4",
RetentionPolicy: "autogen",
Default: true,
Virtual: true,
Virtual: false,
OrganizationID: MustIDBase16(dbrpOrg2ID),
BucketID: 500,
},
Expand Down
3 changes: 1 addition & 2 deletions v1/coordinator/shard_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"io"
"time"

"github.com/influxdata/influx-cli/v2/api"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/influxql/query"
"github.com/influxdata/influxdb/v2/kit/platform"
Expand Down Expand Up @@ -67,7 +66,7 @@ func (e *LocalShardMapper) mapShards(ctx context.Context, a *LocalShardMapping,
OrgID: &orgID,
Database: &s.Database,
RetentionPolicy: &s.RetentionPolicy,
Virtual: api.PtrBool(false),
Virtual: nil,
})
if err != nil {
return fmt.Errorf("finding DBRP mappings: %v", err)
Expand Down
197 changes: 107 additions & 90 deletions v1/coordinator/shard_mapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package coordinator_test

import (
"context"
"github.com/influxdata/influx-cli/v2/api"
"reflect"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/influxdata/influx-cli/v2/api"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/dbrp/mocks"
"github.com/influxdata/influxdb/v2/influxql/query"
Expand All @@ -20,107 +20,124 @@ import (
)

func TestLocalShardMapper(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

dbrp := mocks.NewMockDBRPMappingService(ctrl)
orgID := platform.ID(0xff00)
bucketID := platform.ID(0xffee)
db := "db0"
rp := "rp0"
filt := influxdb.DBRPMappingFilter{OrgID: &orgID, Database: &db, RetentionPolicy: &rp, Virtual: api.PtrBool(false)}
res := []*influxdb.DBRPMapping{{Database: db, RetentionPolicy: rp, OrganizationID: orgID, BucketID: bucketID}}
dbrp.EXPECT().
FindMany(gomock.Any(), filt).
Times(2).
Return(res, 1, nil)

var metaClient MetaClient
metaClient.ShardGroupsByTimeRangeFn = func(database, policy string, min, max time.Time) ([]meta.ShardGroupInfo, error) {
if database != bucketID.String() {
t.Errorf("unexpected database: %s", database)
}
if policy != meta.DefaultRetentionPolicyName {
t.Errorf("unexpected retention policy: %s", policy)
}
return []meta.ShardGroupInfo{
{ID: 1, Shards: []meta.ShardInfo{
{ID: 1, Owners: []meta.ShardOwner{{NodeID: 0}}},
{ID: 2, Owners: []meta.ShardOwner{{NodeID: 0}}},
}},
{ID: 2, Shards: []meta.ShardInfo{
{ID: 3, Owners: []meta.ShardOwner{{NodeID: 0}}},
{ID: 4, Owners: []meta.ShardOwner{{NodeID: 0}}},
}},
}, nil

tests := []struct {
name string
db string
rp string
filt influxdb.DBRPMappingFilter
mapping []*influxdb.DBRPMapping
}{
{
name: "Physical DBRP Mapping",
db: "db0",
rp: "rp0",
filt: influxdb.DBRPMappingFilter{OrgID: &orgID, Database: api.PtrString("db0"), RetentionPolicy: api.PtrString("rp0"), Virtual: nil},
mapping: []*influxdb.DBRPMapping{{Database: "db0", RetentionPolicy: "rp0", OrganizationID: orgID, BucketID: bucketID}},
},
}

tsdbStore := &internal.TSDBStoreMock{}
tsdbStore.ShardGroupFn = func(ids []uint64) tsdb.ShardGroup {
if !reflect.DeepEqual(ids, []uint64{1, 2, 3, 4}) {
t.Errorf("unexpected shard ids: %#v", ids)
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

dbrp := mocks.NewMockDBRPMappingService(ctrl)
dbrp.EXPECT().
FindMany(gomock.Any(), tc.filt).
Times(2).
Return(tc.mapping, len(tc.mapping), nil)

var sh MockShard
sh.CreateIteratorFn = func(ctx context.Context, measurement *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) {
if measurement.Name != "cpu" {
t.Errorf("unexpected measurement: %s", measurement.Name)
var metaClient MetaClient
metaClient.ShardGroupsByTimeRangeFn = func(database, policy string, min, max time.Time) ([]meta.ShardGroupInfo, error) {
if database != bucketID.String() {
t.Errorf("unexpected database: %s", database)
}
if policy != meta.DefaultRetentionPolicyName {
t.Errorf("unexpected retention policy: %s", policy)
}
return []meta.ShardGroupInfo{
{ID: 1, Shards: []meta.ShardInfo{
{ID: 1, Owners: []meta.ShardOwner{{NodeID: 0}}},
{ID: 2, Owners: []meta.ShardOwner{{NodeID: 0}}},
}},
{ID: 2, Shards: []meta.ShardInfo{
{ID: 3, Owners: []meta.ShardOwner{{NodeID: 0}}},
{ID: 4, Owners: []meta.ShardOwner{{NodeID: 0}}},
}},
}, nil
}
return &FloatIterator{}, nil
}
return &sh
}

// Initialize the shard mapper.
shardMapper := &coordinator.LocalShardMapper{
MetaClient: &metaClient,
TSDBStore: tsdbStore,
DBRP: dbrp,
}
tsdbStore := &internal.TSDBStoreMock{}
tsdbStore.ShardGroupFn = func(ids []uint64) tsdb.ShardGroup {
if !reflect.DeepEqual(ids, []uint64{1, 2, 3, 4}) {
t.Errorf("unexpected shard ids: %#v", ids)
}

// Normal measurement.
measurement := &influxql.Measurement{
Database: db,
RetentionPolicy: rp,
Name: "cpu",
}
ic, err := shardMapper.MapShards(context.Background(), []influxql.Source{measurement}, influxql.TimeRange{}, query.SelectOptions{OrgID: orgID})
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
var sh MockShard
sh.CreateIteratorFn = func(ctx context.Context, measurement *influxql.Measurement, opt query.IteratorOptions) (query.Iterator, error) {
if measurement.Name != "cpu" {
t.Errorf("unexpected measurement: %s", measurement.Name)
}
return &FloatIterator{}, nil
}
return &sh
}

// This should be a LocalShardMapping.
m, ok := ic.(*coordinator.LocalShardMapping)
if !ok {
t.Fatalf("unexpected mapping type: %T", ic)
} else if len(m.ShardMap) != 1 {
t.Fatalf("unexpected number of shard mappings: %d", len(m.ShardMap))
}
// Initialize the shard mapper.
shardMapper := &coordinator.LocalShardMapper{
MetaClient: &metaClient,
TSDBStore: tsdbStore,
DBRP: dbrp,
}

if _, err := ic.CreateIterator(context.Background(), measurement, query.IteratorOptions{OrgID: orgID}); err != nil {
t.Fatalf("unexpected error: %s", err)
}
// Normal measurement.
measurement := &influxql.Measurement{
Database: tc.db,
RetentionPolicy: tc.rp,
Name: "cpu",
}
ic, err := shardMapper.MapShards(context.Background(), []influxql.Source{measurement}, influxql.TimeRange{}, query.SelectOptions{OrgID: orgID})
if err != nil {
t.Fatalf("unexpected error: %s", err)
}

// Subquery.
subquery := &influxql.SubQuery{
Statement: &influxql.SelectStatement{
Sources: []influxql.Source{measurement},
},
}
ic, err = shardMapper.MapShards(context.Background(), []influxql.Source{subquery}, influxql.TimeRange{}, query.SelectOptions{OrgID: orgID})
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
// This should be a LocalShardMapping.
m, ok := ic.(*coordinator.LocalShardMapping)
if !ok {
t.Fatalf("unexpected mapping type: %T", ic)
} else if len(m.ShardMap) != 1 {
t.Fatalf("unexpected number of shard mappings: %d", len(m.ShardMap))
}

// This should be a LocalShardMapping.
m, ok = ic.(*coordinator.LocalShardMapping)
if !ok {
t.Fatalf("unexpected mapping type: %T", ic)
} else if len(m.ShardMap) != 1 {
t.Fatalf("unexpected number of shard mappings: %d", len(m.ShardMap))
}
if _, err := ic.CreateIterator(context.Background(), measurement, query.IteratorOptions{OrgID: orgID}); err != nil {
t.Fatalf("unexpected error: %s", err)
}

// Subquery.
subquery := &influxql.SubQuery{
Statement: &influxql.SelectStatement{
Sources: []influxql.Source{measurement},
},
}
ic, err = shardMapper.MapShards(context.Background(), []influxql.Source{subquery}, influxql.TimeRange{}, query.SelectOptions{OrgID: orgID})
if err != nil {
t.Fatalf("unexpected error: %s", err)
}

if _, err := ic.CreateIterator(context.Background(), measurement, query.IteratorOptions{OrgID: orgID}); err != nil {
t.Fatalf("unexpected error: %s", err)
// This should be a LocalShardMapping.
m, ok = ic.(*coordinator.LocalShardMapping)
if !ok {
t.Fatalf("unexpected mapping type: %T", ic)
} else if len(m.ShardMap) != 1 {
t.Fatalf("unexpected number of shard mappings: %d", len(m.ShardMap))
}

if _, err := ic.CreateIterator(context.Background(), measurement, query.IteratorOptions{OrgID: orgID}); err != nil {
t.Fatalf("unexpected error: %s", err)
}
})
}
}
5 changes: 2 additions & 3 deletions v1/coordinator/statement_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/davecgh/go-spew/spew"
"github.com/golang/mock/gomock"
"github.com/influxdata/influx-cli/v2/api"
"github.com/influxdata/influxdb/v2"
icontext "github.com/influxdata/influxdb/v2/context"
"github.com/influxdata/influxdb/v2/dbrp/mocks"
Expand Down Expand Up @@ -46,7 +45,7 @@ func TestQueryExecutor_ExecuteQuery_SelectStatement(t *testing.T) {
dbrp := mocks.NewMockDBRPMappingService(ctrl)
orgID := platform.ID(0xff00)
empty := ""
filt := influxdb.DBRPMappingFilter{OrgID: &orgID, Database: &empty, RetentionPolicy: &empty, Virtual: api.PtrBool(false)}
filt := influxdb.DBRPMappingFilter{OrgID: &orgID, Database: &empty, RetentionPolicy: &empty, Virtual: nil}
res := []*influxdb.DBRPMapping{{}}
dbrp.EXPECT().
FindMany(gomock.Any(), filt).
Expand Down Expand Up @@ -112,7 +111,7 @@ func TestQueryExecutor_ExecuteQuery_MaxSelectBucketsN(t *testing.T) {
dbrp := mocks.NewMockDBRPMappingService(ctrl)
orgID := platform.ID(0xff00)
empty := ""
filt := influxdb.DBRPMappingFilter{OrgID: &orgID, Database: &empty, RetentionPolicy: &empty, Virtual: api.PtrBool(false)}
filt := influxdb.DBRPMappingFilter{OrgID: &orgID, Database: &empty, RetentionPolicy: &empty, Virtual: nil}
res := []*influxdb.DBRPMapping{{}}
dbrp.EXPECT().
FindMany(gomock.Any(), filt).
Expand Down

0 comments on commit 4ed184d

Please sign in to comment.