Skip to content

Commit

Permalink
fix retention policy creation inconsistencies
Browse files Browse the repository at this point in the history
  • Loading branch information
corylanou committed Oct 11, 2016
1 parent 3116951 commit 5c7bb73
Show file tree
Hide file tree
Showing 13 changed files with 128 additions and 62 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
- [#7226](https://github.com/influxdata/influxdb/issues/7226): Fix database locked up when deleting shards
- [#7382](https://github.com/influxdata/influxdb/issues/7382): Shard stats include wal path tag so disk bytes make more sense.
- [#7385](https://github.com/influxdata/influxdb/pull/7385): Reduce query planning allocations
- [#7448](https://github.com/influxdata/influxdb/pull/7448): Fix Retention Policy Inconsistencies

## v1.0.2 [2016-10-05]

Expand Down
2 changes: 1 addition & 1 deletion cmd/influxd/run/server_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (s *Server) URL() string {
func (s *Server) CreateDatabaseAndRetentionPolicy(db string, rp *meta.RetentionPolicySpec) error {
if _, err := s.MetaClient.CreateDatabase(db); err != nil {
return err
} else if _, err := s.MetaClient.CreateRetentionPolicy(db, rp); err != nil {
} else if _, err := s.MetaClient.CreateRetentionPolicy(db, rp, true); err != nil {
return err
}
return nil
Expand Down
6 changes: 6 additions & 0 deletions cmd/influxd/run/server_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,12 @@ func init() {
exp: `{"results":[{}]}`,
once: true,
},
&Query{
name: "create retention policy with default on",
command: `CREATE RETENTION POLICY rp3 ON db0 DURATION 1h REPLICATION 1 SHARD DURATION 30m DEFAULT`,
exp: `{"results":[{"error":"retention policy conflicts with an existing policy"}]}`,
once: true,
},
&Query{
name: "show retention policy should show both with custom shard",
command: `SHOW RETENTION POLICIES ON db0`,
Expand Down
2 changes: 1 addition & 1 deletion coordinator/meta_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type MetaClient interface {
CreateContinuousQuery(database, name, query string) error
CreateDatabase(name string) (*meta.DatabaseInfo, error)
CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error)
CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error)
CreateSubscription(database, rp, name, mode string, destinations []string) error
CreateUser(name, password string, admin bool) (*meta.UserInfo, error)
Database(name string) *meta.DatabaseInfo
Expand Down
6 changes: 3 additions & 3 deletions coordinator/meta_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type MetaClient struct {
CreateContinuousQueryFn func(database, name, query string) error
CreateDatabaseFn func(name string) (*meta.DatabaseInfo, error)
CreateDatabaseWithRetentionPolicyFn func(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
CreateRetentionPolicyFn func(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error)
CreateRetentionPolicyFn func(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error)
CreateSubscriptionFn func(database, rp, name, mode string, destinations []string) error
CreateUserFn func(name, password string, admin bool) (*meta.UserInfo, error)
DatabaseFn func(name string) *meta.DatabaseInfo
Expand Down Expand Up @@ -52,8 +52,8 @@ func (c *MetaClient) CreateDatabaseWithRetentionPolicy(name string, spec *meta.R
return c.CreateDatabaseWithRetentionPolicyFn(name, spec)
}

func (c *MetaClient) CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error) {
return c.CreateRetentionPolicyFn(database, spec)
func (c *MetaClient) CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error) {
return c.CreateRetentionPolicyFn(database, spec, makeDefault)
}

func (c *MetaClient) DropShard(id uint64) error {
Expand Down
8 changes: 1 addition & 7 deletions coordinator/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,17 +271,11 @@ func (e *StatementExecutor) executeCreateRetentionPolicyStatement(stmt *influxql
}

// Create new retention policy.
rp, err := e.MetaClient.CreateRetentionPolicy(stmt.Database, &spec)
_, err := e.MetaClient.CreateRetentionPolicy(stmt.Database, &spec, stmt.Default)
if err != nil {
return err
}

// If requested, set new policy as the default.
if stmt.Default {
if err := e.MetaClient.SetDefaultRetentionPolicy(stmt.Database, rp.Name); err != nil {
return err
}
}
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions services/graphite/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type Service struct {
MetaClient interface {
CreateDatabase(name string) (*meta.DatabaseInfo, error)
CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error)
CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error)
Database(name string) *meta.DatabaseInfo
RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error)
}
Expand Down Expand Up @@ -140,7 +140,7 @@ func (s *Service) Open() error {
if db := s.MetaClient.Database(s.database); db != nil {
if rp, _ := s.MetaClient.RetentionPolicy(s.database, s.retentionPolicy); rp == nil {
spec := meta.RetentionPolicySpec{Name: s.retentionPolicy}
if _, err := s.MetaClient.CreateRetentionPolicy(s.database, &spec); err != nil {
if _, err := s.MetaClient.CreateRetentionPolicy(s.database, &spec, true); err != nil {
s.logger.Printf("Failed to ensure target retention policy %s exists: %s", s.database, err.Error())
}
}
Expand Down
2 changes: 1 addition & 1 deletion services/graphite/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (d *DatabaseCreator) CreateDatabase(name string) (*meta.DatabaseInfo, error
return nil, nil
}

func (d *DatabaseCreator) CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error) {
func (d *DatabaseCreator) CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error) {
return nil, nil
}

Expand Down
8 changes: 4 additions & 4 deletions services/meta/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (c *Client) CreateDatabase(name string) (*DatabaseInfo, error) {
// create default retention policy
if c.retentionAutoCreate {
rpi := DefaultRetentionPolicyInfo()
if err := data.CreateRetentionPolicy(name, rpi); err != nil {
if err := data.CreateRetentionPolicy(name, rpi, true); err != nil {
return nil, err
}
if err := data.SetDefaultRetentionPolicy(name, rpi.Name); err != nil {
Expand Down Expand Up @@ -224,7 +224,7 @@ func (c *Client) CreateDatabaseWithRetentionPolicy(name string, spec *RetentionP

rpi := spec.NewRetentionPolicyInfo()
if rp := db.RetentionPolicy(rpi.Name); rp == nil {
if err := data.CreateRetentionPolicy(name, rpi); err != nil {
if err := data.CreateRetentionPolicy(name, rpi, true); err != nil {
return nil, err
}
} else if !spec.Matches(rp) {
Expand Down Expand Up @@ -273,7 +273,7 @@ func (c *Client) DropDatabase(name string) error {
}

// CreateRetentionPolicy creates a retention policy on the specified database.
func (c *Client) CreateRetentionPolicy(database string, spec *RetentionPolicySpec) (*RetentionPolicyInfo, error) {
func (c *Client) CreateRetentionPolicy(database string, spec *RetentionPolicySpec, makeDefault bool) (*RetentionPolicyInfo, error) {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -284,7 +284,7 @@ func (c *Client) CreateRetentionPolicy(database string, spec *RetentionPolicySpe
}

rp := spec.NewRetentionPolicyInfo()
if err := data.CreateRetentionPolicy(database, rp); err != nil {
if err := data.CreateRetentionPolicy(database, rp, makeDefault); err != nil {
return nil, err
}

Expand Down
14 changes: 7 additions & 7 deletions services/meta/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func TestMetaClient_CreateRetentionPolicy(t *testing.T) {
ReplicaN: &rp0.ReplicaN,
Duration: &rp0.Duration,
ShardGroupDuration: rp0.ShardGroupDuration,
}); err != nil {
}, true); err != nil {
t.Fatal(err)
}

Expand All @@ -265,7 +265,7 @@ func TestMetaClient_CreateRetentionPolicy(t *testing.T) {
ReplicaN: &rp0.ReplicaN,
Duration: &rp0.Duration,
ShardGroupDuration: rp0.ShardGroupDuration,
}); err != nil {
}, true); err != nil {
t.Fatal(err)
} else if actual, err = c.RetentionPolicy("db0", "rp0"); err != nil {
t.Fatal(err)
Expand All @@ -283,7 +283,7 @@ func TestMetaClient_CreateRetentionPolicy(t *testing.T) {
ReplicaN: &rp1.ReplicaN,
Duration: &rp1.Duration,
ShardGroupDuration: rp1.ShardGroupDuration,
})
}, true)
if exp := meta.ErrRetentionPolicyExists; got != exp {
t.Fatalf("got error %v, expected error %v", got, exp)
}
Expand All @@ -298,7 +298,7 @@ func TestMetaClient_CreateRetentionPolicy(t *testing.T) {
ReplicaN: &rp1.ReplicaN,
Duration: &rp1.Duration,
ShardGroupDuration: rp1.ShardGroupDuration,
})
}, true)
if exp := meta.ErrRetentionPolicyExists; got != exp {
t.Fatalf("got error %v, expected error %v", got, exp)
}
Expand All @@ -313,7 +313,7 @@ func TestMetaClient_CreateRetentionPolicy(t *testing.T) {
ReplicaN: &rp1.ReplicaN,
Duration: &rp1.Duration,
ShardGroupDuration: rp1.ShardGroupDuration,
})
}, true)
if exp := meta.ErrRetentionPolicyExists; got != exp {
t.Fatalf("got error %v, expected error %v", got, exp)
}
Expand All @@ -329,7 +329,7 @@ func TestMetaClient_CreateRetentionPolicy(t *testing.T) {
ReplicaN: &rp1.ReplicaN,
Duration: &rp1.Duration,
ShardGroupDuration: rp1.ShardGroupDuration,
})
}, true)
if exp := meta.ErrIncompatibleDurations; got != exp {
t.Fatalf("got error %v, expected error %v", got, exp)
}
Expand Down Expand Up @@ -481,7 +481,7 @@ func TestMetaClient_DropRetentionPolicy(t *testing.T) {
Name: "rp0",
Duration: &duration,
ReplicaN: &replicaN,
}); err != nil {
}, true); err != nil {
t.Fatal(err)
}

Expand Down
12 changes: 11 additions & 1 deletion services/meta/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (data *Data) RetentionPolicy(database, name string) (*RetentionPolicyInfo,

// CreateRetentionPolicy creates a new retention policy on a database.
// Returns an error if name is blank or if a database does not exist.
func (data *Data) CreateRetentionPolicy(database string, rpi *RetentionPolicyInfo) error {
func (data *Data) CreateRetentionPolicy(database string, rpi *RetentionPolicyInfo, makeDefault bool) error {
// Validate retention policy.
if rpi == nil {
return ErrRetentionPolicyRequired
Expand Down Expand Up @@ -163,11 +163,21 @@ func (data *Data) CreateRetentionPolicy(database string, rpi *RetentionPolicyInf
if rp.ReplicaN != rpi.ReplicaN || rp.Duration != rpi.Duration || rp.ShardGroupDuration != rpi.ShardGroupDuration {
return ErrRetentionPolicyExists
}
// if they want to make it default, and it's not the default, it's not an identical command so it's an error
if makeDefault && di.DefaultRetentionPolicy != rpi.Name {
return ErrRetentionPolicyConflict
}
return nil
}

// Append copy of new policy.
di.RetentionPolicies = append(di.RetentionPolicies, *rpi)

// Set the default if needed
if makeDefault {
di.DefaultRetentionPolicy = rpi.Name
}

return nil
}

Expand Down
56 changes: 56 additions & 0 deletions services/meta/data_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package meta

import (
"reflect"
"sort"
"time"

"testing"
)

func Test_newShardOwner(t *testing.T) {
// An error is returned if there are no data nodes available.
_, err := NewShardOwner(ShardInfo{}, map[int]int{})
if err == nil {
t.Error("got no error, but expected one")
}

ownerFreqs := map[int]int{1: 15, 2: 11, 3: 12}
id, err := NewShardOwner(ShardInfo{ID: 4}, ownerFreqs)
if err != nil {
t.Fatal(err)
}

// The ID that owns the fewest shards is returned.
if got, exp := id, uint64(2); got != exp {
t.Errorf("got id %d, expected id %d", got, exp)
}

// The ownership frequencies are updated.
if got, exp := ownerFreqs, map[int]int{1: 15, 2: 12, 3: 12}; !reflect.DeepEqual(got, exp) {
t.Errorf("got owner frequencies %v, expected %v", got, exp)
}
}

func TestShardGroupSort(t *testing.T) {
sg1 := ShardGroupInfo{
ID: 1,
StartTime: time.Unix(1000, 0),
EndTime: time.Unix(1100, 0),
TruncatedAt: time.Unix(1050, 0),
}

sg2 := ShardGroupInfo{
ID: 2,
StartTime: time.Unix(1000, 0),
EndTime: time.Unix(1100, 0),
}

sgs := ShardGroupInfos{sg2, sg1}

sort.Sort(sgs)

if sgs[len(sgs)-1].ID != 2 {
t.Fatal("unstable sort for ShardGroupInfos")
}
}
69 changes: 34 additions & 35 deletions services/meta/data_test.go
Original file line number Diff line number Diff line change
@@ -1,56 +1,55 @@
package meta
package meta_test

import (
"reflect"
"sort"
"testing"
"time"

"testing"
"github.com/influxdata/influxdb/services/meta"
)

func Test_newShardOwner(t *testing.T) {
// An error is returned if there are no data nodes available.
_, err := NewShardOwner(ShardInfo{}, map[int]int{})
if err == nil {
t.Error("got no error, but expected one")
}
func Test_Data_CreateRetentionPolicy(t *testing.T) {
data := meta.Data{}

ownerFreqs := map[int]int{1: 15, 2: 11, 3: 12}
id, err := NewShardOwner(ShardInfo{ID: 4}, ownerFreqs)
err := data.CreateDatabase("foo")
if err != nil {
t.Fatal(err)
}

// The ID that owns the fewest shards is returned.
if got, exp := id, uint64(2); got != exp {
t.Errorf("got id %d, expected id %d", got, exp)
err = data.CreateRetentionPolicy("foo", &meta.RetentionPolicyInfo{
Name: "bar",
ReplicaN: 1,
Duration: 24 * time.Hour,
}, false)
if err != nil {
t.Fatal(err)
}

// The ownership frequencies are updated.
if got, exp := ownerFreqs, map[int]int{1: 15, 2: 12, 3: 12}; !reflect.DeepEqual(got, exp) {
t.Errorf("got owner frequencies %v, expected %v", got, exp)
rp, err := data.RetentionPolicy("foo", "bar")
if err != nil {
t.Fatal(err)
}
}

func TestShardGroupSort(t *testing.T) {
sg1 := ShardGroupInfo{
ID: 1,
StartTime: time.Unix(1000, 0),
EndTime: time.Unix(1100, 0),
TruncatedAt: time.Unix(1050, 0),
if rp == nil {
t.Fatal("creation of retention policy failed")
}

sg2 := ShardGroupInfo{
ID: 2,
StartTime: time.Unix(1000, 0),
EndTime: time.Unix(1100, 0),
// Try to recreate the same RP with default set to true, should fail
err = data.CreateRetentionPolicy("foo", &meta.RetentionPolicyInfo{
Name: "bar",
ReplicaN: 1,
Duration: 24 * time.Hour,
}, true)
if err == nil || err != meta.ErrRetentionPolicyConflict {
t.Fatalf("unexpected error. got: %v, exp: %s", err, meta.ErrRetentionPolicyConflict)
}

sgs := ShardGroupInfos{sg2, sg1}

sort.Sort(sgs)

if sgs[len(sgs)-1].ID != 2 {
t.Fatal("unstable sort for ShardGroupInfos")
// Creating the same RP with the same specifications should succeed
err = data.CreateRetentionPolicy("foo", &meta.RetentionPolicyInfo{
Name: "bar",
ReplicaN: 1,
Duration: 24 * time.Hour,
}, false)
if err != nil {
t.Fatal(err)
}
}

0 comments on commit 5c7bb73

Please sign in to comment.