Skip to content

Commit

Permalink
Merge pull request #1902 from influxdb/rp-1hour-min
Browse files Browse the repository at this point in the history
Enforce retention policy minimum (currently 1 hour)
  • Loading branch information
corylanou committed Mar 10, 2015
2 parents 031ea1d + 410e357 commit bb35021
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 22 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## v0.9.0-rc11 [unreleased]

### Features
- [#1902](https://github.com/influxdb/influxdb/pull/1902): Enforce retention policies to have a minimum duraton.

## v0.9.0-rc10 [2015-03-09]

### Bugfixes
Expand Down
2 changes: 1 addition & 1 deletion httpd/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func TestHandler_UpdateRetentionPolicy(t *testing.T) {
s := NewHTTPServer(srvr)
defer s.Close()

query := map[string]string{"q": "ALTER RETENTION POLICY bar ON foo REPLICATION 42 DURATION 1m DEFAULT"}
query := map[string]string{"q": "ALTER RETENTION POLICY bar ON foo REPLICATION 42 DURATION 2h DEFAULT"}
status, body := MustHTTP("GET", s.URL+`/query`, query, nil, "")

// Verify updated policy.
Expand Down
3 changes: 3 additions & 0 deletions influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ var (
// ErrRetentionPolicyNameRequired is returned using a blank shard space name.
ErrRetentionPolicyNameRequired = errors.New("retention policy name required")

// ErrRetentionPolicyMinDuration is returned when creating replication policy with a duration smaller than RetenionPolicyMinDuration.
ErrRetentionPolicyMinDuration = fmt.Errorf("retention policy duration needs to be at least %s", retentionPolicyMinDuration)

// ErrDefaultRetentionPolicyNotFound is returned when using the default
// policy on a database but the default has not been set.
ErrDefaultRetentionPolicyNotFound = errors.New("default retention policy not found")
Expand Down
27 changes: 20 additions & 7 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ const (

// DefaultShardRetention is the length of time before a shard is dropped.
DefaultShardRetention = 7 * (24 * time.Hour)

// Defines the minimum duration allowed for all retention policies
retentionPolicyMinDuration = time.Hour
)

// Server represents a collection of metadata and raw metric data.
Expand Down Expand Up @@ -1269,6 +1272,11 @@ func (s *Server) RetentionPolicies(database string) ([]*RetentionPolicy, error)

// CreateRetentionPolicy creates a retention policy for a database.
func (s *Server) CreateRetentionPolicy(database string, rp *RetentionPolicy) error {
// Enforce duration of at least retentionPolicyMinDuration
if rp.Duration < retentionPolicyMinDuration {
return ErrRetentionPolicyMinDuration
}

const (
day = time.Hour * 24
month = day * 30
Expand Down Expand Up @@ -1335,6 +1343,11 @@ type RetentionPolicyUpdate struct {

// UpdateRetentionPolicy updates an existing retention policy on a database.
func (s *Server) UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate) error {
// Enforce duration of at least retentionPolicyMinDuration
if *rpu.Duration < retentionPolicyMinDuration {
return ErrRetentionPolicyMinDuration
}

c := &updateRetentionPolicyCommand{Database: database, Name: name, Policy: rpu}
_, err := s.broadcast(updateRetentionPolicyMessageType, c)
return err
Expand Down Expand Up @@ -2558,20 +2571,20 @@ func (s *Server) executeShowUsersStatement(q *influxql.ShowUsersStatement, user
return &Result{Series: []*influxql.Row{row}}
}

func (s *Server) executeCreateRetentionPolicyStatement(q *influxql.CreateRetentionPolicyStatement, user *User) *Result {
rp := NewRetentionPolicy(q.Name)
rp.Duration = q.Duration
rp.ReplicaN = uint32(q.Replication)
func (s *Server) executeCreateRetentionPolicyStatement(stmt *influxql.CreateRetentionPolicyStatement, user *User) *Result {
rp := NewRetentionPolicy(stmt.Name)
rp.Duration = stmt.Duration
rp.ReplicaN = uint32(stmt.Replication)

// Create new retention policy.
err := s.CreateRetentionPolicy(q.Database, rp)
err := s.CreateRetentionPolicy(stmt.Database, rp)
if err != nil {
return &Result{Err: err}
}

// If requested, set new policy as the default.
if q.Default {
err = s.SetDefaultRetentionPolicy(q.Database, q.Name)
if stmt.Default {
err = s.SetDefaultRetentionPolicy(stmt.Database, stmt.Name)
}

return &Result{Err: err}
Expand Down
91 changes: 77 additions & 14 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func TestServer_CreateRetentionPolicy(t *testing.T) {
func TestServer_CreateRetentionPolicy_ErrDatabaseNotFound(t *testing.T) {
s := OpenServer(NewMessagingClient())
defer s.Close()
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != influxdb.ErrDatabaseNotFound {
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}); err != influxdb.ErrDatabaseNotFound {
t.Fatal(err)
}
}
Expand All @@ -566,7 +566,7 @@ func TestServer_CreateRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing.
s := OpenServer(NewMessagingClient())
defer s.Close()
s.CreateDatabase("foo")
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: ""}); err != influxdb.ErrRetentionPolicyNameRequired {
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "", Duration: time.Hour}); err != influxdb.ErrRetentionPolicyNameRequired {
t.Fatal(err)
}
}
Expand All @@ -576,8 +576,18 @@ func TestServer_CreateRetentionPolicy_ErrRetentionPolicyExists(t *testing.T) {
s := OpenServer(NewMessagingClient())
defer s.Close()
s.CreateDatabase("foo")
s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"})
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != influxdb.ErrRetentionPolicyExists {
s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour})
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}); err != influxdb.ErrRetentionPolicyExists {
t.Fatal(err)
}
}

// Ensure the server returns an error when creating a retention policy with a duration less than one hour.
func TestServer_CreateRetentionPolicy_ErrRetentionPolicyMinDuration(t *testing.T) {
s := OpenServer(NewMessagingClient())
defer s.Close()
s.CreateDatabase("foo")
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Minute}); err != influxdb.ErrRetentionPolicyMinDuration {
t.Fatal(err)
}
}
Expand All @@ -603,7 +613,7 @@ func TestServer_AlterRetentionPolicy(t *testing.T) {
}

// Alter the retention policy.
duration := time.Minute
duration := 2 * time.Hour
replicaN := uint32(3)
rp2 := &influxdb.RetentionPolicyUpdate{
Duration: &duration,
Expand Down Expand Up @@ -646,14 +656,67 @@ func TestServer_AlterRetentionPolicy(t *testing.T) {
}
}

// Ensure the server an error is returned if trying to alter a retention policy with a duration too small.
func TestServer_AlterRetentionPolicy_Minduration(t *testing.T) {
s := OpenServer(NewMessagingClient())
defer s.Close()

// Create a database.
if err := s.CreateDatabase("foo"); err != nil {
t.Fatal(err)
}

// Create a retention policy on the database.
rp := &influxdb.RetentionPolicy{
Name: "bar",
Duration: time.Hour,
ReplicaN: 2,
}
if err := s.CreateRetentionPolicy("foo", rp); err != nil {
t.Fatal(err)
}

// Alter the retention policy.
duration := 2 * time.Hour
replicaN := uint32(3)
rp2 := &influxdb.RetentionPolicyUpdate{
Duration: &duration,
ReplicaN: &replicaN,
}
if err := s.UpdateRetentionPolicy("foo", "bar", rp2); err != nil {
t.Fatal(err)
}

// Restart the server to make sure the changes persist afterwards.
s.Restart()

// Verify that the policy exists.
if o, err := s.RetentionPolicy("foo", "bar"); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if o == nil {
t.Fatalf("retention policy not found")
} else if o.Duration != *rp2.Duration {
t.Fatalf("retention policy mismatch:\n\texp Duration = %s\n\tgot Duration = %s\n", rp2.Duration, o.Duration)
} else if o.ReplicaN != *rp2.ReplicaN {
t.Fatalf("retention policy mismatch:\n\texp ReplicaN = %d\n\tgot ReplicaN = %d\n", rp2.ReplicaN, o.ReplicaN)
}

// Test update duration only.
duration = time.Hour
results := s.ExecuteQuery(MustParseQuery(`ALTER RETENTION POLICY bar ON foo DURATION 1m`), "foo", nil)
if results.Error() == nil {
t.Fatalf("unexpected error: %s", results.Error())
}
}

// Ensure the server can delete an existing retention policy.
func TestServer_DeleteRetentionPolicy(t *testing.T) {
s := OpenServer(NewMessagingClient())
defer s.Close()

// Create a database and retention policy.
s.CreateDatabase("foo")
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != nil {
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}); err != nil {
t.Fatal(err)
} else if rp, _ := s.RetentionPolicy("foo", "bar"); rp == nil {
t.Fatal("retention policy not created")
Expand Down Expand Up @@ -707,7 +770,7 @@ func TestServer_SetDefaultRetentionPolicy(t *testing.T) {
defer s.Close()
s.CreateDatabase("foo")

rp := &influxdb.RetentionPolicy{Name: "bar", ShardGroupDuration: 7 * time.Hour * 24}
rp := &influxdb.RetentionPolicy{Name: "bar", ShardGroupDuration: time.Hour, Duration: time.Hour}
if err := s.CreateRetentionPolicy("foo", rp); err != nil {
t.Fatal(err)
} else if rp, _ := s.RetentionPolicy("foo", "bar"); rp == nil {
Expand Down Expand Up @@ -1630,7 +1693,7 @@ func TestServer_CreateShardGroupIfNotExist(t *testing.T) {
defer s.Close()
s.CreateDatabase("foo")

if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != nil {
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}); err != nil {
t.Fatal(err)
}

Expand All @@ -1653,7 +1716,7 @@ func TestServer_DeleteShardGroup(t *testing.T) {
defer s.Close()
s.CreateDatabase("foo")

if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != nil {
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -1759,13 +1822,13 @@ func TestServer_NormalizeMeasurement(t *testing.T) {

// Default database with one policy.
s.CreateDatabase("db0")
s.CreateRetentionPolicy("db0", &influxdb.RetentionPolicy{Name: "rp0"})
s.CreateRetentionPolicy("db0", &influxdb.RetentionPolicy{Name: "rp0", Duration: time.Hour})
s.SetDefaultRetentionPolicy("db0", "rp0")

// Another database with two policies.
s.CreateDatabase("db1")
s.CreateRetentionPolicy("db1", &influxdb.RetentionPolicy{Name: "rp1"})
s.CreateRetentionPolicy("db1", &influxdb.RetentionPolicy{Name: "rp2"})
s.CreateRetentionPolicy("db1", &influxdb.RetentionPolicy{Name: "rp1", Duration: time.Hour})
s.CreateRetentionPolicy("db1", &influxdb.RetentionPolicy{Name: "rp2", Duration: time.Hour})
s.SetDefaultRetentionPolicy("db1", "rp1")

// Another database with no policies.
Expand Down Expand Up @@ -1804,7 +1867,7 @@ func TestServer_NormalizeQuery(t *testing.T) {
s := OpenServer(NewMessagingClient())
defer s.Close()
s.CreateDatabase("db0")
s.CreateRetentionPolicy("db0", &influxdb.RetentionPolicy{Name: "rp0"})
s.CreateRetentionPolicy("db0", &influxdb.RetentionPolicy{Name: "rp0", Duration: time.Hour})
s.SetDefaultRetentionPolicy("db0", "rp0")

// Execute the tests
Expand All @@ -1828,7 +1891,7 @@ func TestServer_CreateContinuousQuery(t *testing.T) {
if err := s.CreateDatabase("foo"); err != nil {
t.Fatal(err)
}
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != nil {
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}); err != nil {
t.Fatal(err)
}
s.SetDefaultRetentionPolicy("foo", "bar")
Expand Down

0 comments on commit bb35021

Please sign in to comment.