Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enforce retention policy minimum (currently 1 hour) #1902

Merged
merged 6 commits into from
Mar 10, 2015
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## v0.9.0-rc11 [unreleased]

### Features
- [#1902](https://github.com/influxdb/influxdb/pull/1902): Enforce retention policies to have a minimum duraton.

## v0.9.0-rc10 [2015-03-09]

### Bugfixes
Expand Down
2 changes: 1 addition & 1 deletion httpd/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func TestHandler_UpdateRetentionPolicy(t *testing.T) {
s := NewHTTPServer(srvr)
defer s.Close()

query := map[string]string{"q": "ALTER RETENTION POLICY bar ON foo REPLICATION 42 DURATION 1m DEFAULT"}
query := map[string]string{"q": "ALTER RETENTION POLICY bar ON foo REPLICATION 42 DURATION 2h DEFAULT"}
status, body := MustHTTP("GET", s.URL+`/query`, query, nil, "")

// Verify updated policy.
Expand Down
7 changes: 7 additions & 0 deletions influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (
"github.com/influxdb/influxdb/client"
)

const (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel this could be defined in a better place -- server.go, with the other constants there. It makes more sense to group it with those, as opposed to here. influxdb.go is in the same package as server.go so you can still reference the constant in this file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think I had a "duh" moment there. I thought I looked for constants but I clearly missed them. Moved.

retentionPolicyMinDuration = time.Hour
)

var (
// ErrServerOpen is returned when opening an already open server.
ErrServerOpen = errors.New("server already open")
Expand Down Expand Up @@ -74,6 +78,9 @@ var (
// ErrRetentionPolicyNameRequired is returned using a blank shard space name.
ErrRetentionPolicyNameRequired = errors.New("retention policy name required")

// ErrRetentionPolicyMinDuration is returned when creating replicaiton policy with a duration smaller than RetenionPolicyMinDuration.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: typo in comment.

ErrRetentionPolicyMinDuration = fmt.Errorf("retention policy duration needs to be at least %s", retentionPolicyMinDuration)

// ErrDefaultRetentionPolicyNotFound is returned when using the default
// policy on a database but the default has not been set.
ErrDefaultRetentionPolicyNotFound = errors.New("default retention policy not found")
Expand Down
24 changes: 17 additions & 7 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1269,6 +1269,11 @@ func (s *Server) RetentionPolicies(database string) ([]*RetentionPolicy, error)

// CreateRetentionPolicy creates a retention policy for a database.
func (s *Server) CreateRetentionPolicy(database string, rp *RetentionPolicy) error {
// Enforce duration of at least retentionPolicyMinDuration
if rp.Duration < retentionPolicyMinDuration {
return ErrRetentionPolicyMinDuration
}

const (
day = time.Hour * 24
month = day * 30
Expand Down Expand Up @@ -1335,6 +1340,11 @@ type RetentionPolicyUpdate struct {

// UpdateRetentionPolicy updates an existing retention policy on a database.
func (s *Server) UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate) error {
// Enforce duration of at least retentionPolicyMinDuration
if *rpu.Duration < retentionPolicyMinDuration {
return ErrRetentionPolicyMinDuration
}

c := &updateRetentionPolicyCommand{Database: database, Name: name, Policy: rpu}
_, err := s.broadcast(updateRetentionPolicyMessageType, c)
return err
Expand Down Expand Up @@ -2558,20 +2568,20 @@ func (s *Server) executeShowUsersStatement(q *influxql.ShowUsersStatement, user
return &Result{Series: []*influxql.Row{row}}
}

func (s *Server) executeCreateRetentionPolicyStatement(q *influxql.CreateRetentionPolicyStatement, user *User) *Result {
rp := NewRetentionPolicy(q.Name)
rp.Duration = q.Duration
rp.ReplicaN = uint32(q.Replication)
func (s *Server) executeCreateRetentionPolicyStatement(stmt *influxql.CreateRetentionPolicyStatement, user *User) *Result {
rp := NewRetentionPolicy(stmt.Name)
rp.Duration = stmt.Duration
rp.ReplicaN = uint32(stmt.Replication)

// Create new retention policy.
err := s.CreateRetentionPolicy(q.Database, rp)
err := s.CreateRetentionPolicy(stmt.Database, rp)
if err != nil {
return &Result{Err: err}
}

// If requested, set new policy as the default.
if q.Default {
err = s.SetDefaultRetentionPolicy(q.Database, q.Name)
if stmt.Default {
err = s.SetDefaultRetentionPolicy(stmt.Database, stmt.Name)
}

return &Result{Err: err}
Expand Down
91 changes: 77 additions & 14 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func TestServer_CreateRetentionPolicy(t *testing.T) {
func TestServer_CreateRetentionPolicy_ErrDatabaseNotFound(t *testing.T) {
s := OpenServer(NewMessagingClient())
defer s.Close()
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != influxdb.ErrDatabaseNotFound {
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}); err != influxdb.ErrDatabaseNotFound {
t.Fatal(err)
}
}
Expand All @@ -566,7 +566,7 @@ func TestServer_CreateRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing.
s := OpenServer(NewMessagingClient())
defer s.Close()
s.CreateDatabase("foo")
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: ""}); err != influxdb.ErrRetentionPolicyNameRequired {
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "", Duration: time.Hour}); err != influxdb.ErrRetentionPolicyNameRequired {
t.Fatal(err)
}
}
Expand All @@ -576,8 +576,18 @@ func TestServer_CreateRetentionPolicy_ErrRetentionPolicyExists(t *testing.T) {
s := OpenServer(NewMessagingClient())
defer s.Close()
s.CreateDatabase("foo")
s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"})
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != influxdb.ErrRetentionPolicyExists {
s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour})
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}); err != influxdb.ErrRetentionPolicyExists {
t.Fatal(err)
}
}

// Ensure the server returns an error when creating a retention policy with a duration less than one hour.
func TestServer_CreateRetentionPolicy_ErrRetentionPolicyMinDuration(t *testing.T) {
s := OpenServer(NewMessagingClient())
defer s.Close()
s.CreateDatabase("foo")
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Minute}); err != influxdb.ErrRetentionPolicyMinDuration {
t.Fatal(err)
}
}
Expand All @@ -603,7 +613,7 @@ func TestServer_AlterRetentionPolicy(t *testing.T) {
}

// Alter the retention policy.
duration := time.Minute
duration := 2 * time.Hour
replicaN := uint32(3)
rp2 := &influxdb.RetentionPolicyUpdate{
Duration: &duration,
Expand Down Expand Up @@ -646,14 +656,67 @@ func TestServer_AlterRetentionPolicy(t *testing.T) {
}
}

// Ensure the server an error is returned if trying to alter a retention policy with a duration too small.
func TestServer_AlterRetentionPolicy_Minduration(t *testing.T) {
s := OpenServer(NewMessagingClient())
defer s.Close()

// Create a database.
if err := s.CreateDatabase("foo"); err != nil {
t.Fatal(err)
}

// Create a retention policy on the database.
rp := &influxdb.RetentionPolicy{
Name: "bar",
Duration: time.Hour,
ReplicaN: 2,
}
if err := s.CreateRetentionPolicy("foo", rp); err != nil {
t.Fatal(err)
}

// Alter the retention policy.
duration := 2 * time.Hour
replicaN := uint32(3)
rp2 := &influxdb.RetentionPolicyUpdate{
Duration: &duration,
ReplicaN: &replicaN,
}
if err := s.UpdateRetentionPolicy("foo", "bar", rp2); err != nil {
t.Fatal(err)
}

// Restart the server to make sure the changes persist afterwards.
s.Restart()

// Verify that the policy exists.
if o, err := s.RetentionPolicy("foo", "bar"); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if o == nil {
t.Fatalf("retention policy not found")
} else if o.Duration != *rp2.Duration {
t.Fatalf("retention policy mismatch:\n\texp Duration = %s\n\tgot Duration = %s\n", rp2.Duration, o.Duration)
} else if o.ReplicaN != *rp2.ReplicaN {
t.Fatalf("retention policy mismatch:\n\texp ReplicaN = %d\n\tgot ReplicaN = %d\n", rp2.ReplicaN, o.ReplicaN)
}

// Test update duration only.
duration = time.Hour
results := s.ExecuteQuery(MustParseQuery(`ALTER RETENTION POLICY bar ON foo DURATION 1m`), "foo", nil)
if results.Error() == nil {
t.Fatalf("unexpected error: %s", results.Error())
}
}

// Ensure the server can delete an existing retention policy.
func TestServer_DeleteRetentionPolicy(t *testing.T) {
s := OpenServer(NewMessagingClient())
defer s.Close()

// Create a database and retention policy.
s.CreateDatabase("foo")
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != nil {
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}); err != nil {
t.Fatal(err)
} else if rp, _ := s.RetentionPolicy("foo", "bar"); rp == nil {
t.Fatal("retention policy not created")
Expand Down Expand Up @@ -707,7 +770,7 @@ func TestServer_SetDefaultRetentionPolicy(t *testing.T) {
defer s.Close()
s.CreateDatabase("foo")

rp := &influxdb.RetentionPolicy{Name: "bar", ShardGroupDuration: 7 * time.Hour * 24}
rp := &influxdb.RetentionPolicy{Name: "bar", ShardGroupDuration: time.Hour, Duration: time.Hour}
if err := s.CreateRetentionPolicy("foo", rp); err != nil {
t.Fatal(err)
} else if rp, _ := s.RetentionPolicy("foo", "bar"); rp == nil {
Expand Down Expand Up @@ -1630,7 +1693,7 @@ func TestServer_CreateShardGroupIfNotExist(t *testing.T) {
defer s.Close()
s.CreateDatabase("foo")

if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != nil {
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}); err != nil {
t.Fatal(err)
}

Expand All @@ -1653,7 +1716,7 @@ func TestServer_DeleteShardGroup(t *testing.T) {
defer s.Close()
s.CreateDatabase("foo")

if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != nil {
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -1759,13 +1822,13 @@ func TestServer_NormalizeMeasurement(t *testing.T) {

// Default database with one policy.
s.CreateDatabase("db0")
s.CreateRetentionPolicy("db0", &influxdb.RetentionPolicy{Name: "rp0"})
s.CreateRetentionPolicy("db0", &influxdb.RetentionPolicy{Name: "rp0", Duration: time.Hour})
s.SetDefaultRetentionPolicy("db0", "rp0")

// Another database with two policies.
s.CreateDatabase("db1")
s.CreateRetentionPolicy("db1", &influxdb.RetentionPolicy{Name: "rp1"})
s.CreateRetentionPolicy("db1", &influxdb.RetentionPolicy{Name: "rp2"})
s.CreateRetentionPolicy("db1", &influxdb.RetentionPolicy{Name: "rp1", Duration: time.Hour})
s.CreateRetentionPolicy("db1", &influxdb.RetentionPolicy{Name: "rp2", Duration: time.Hour})
s.SetDefaultRetentionPolicy("db1", "rp1")

// Another database with no policies.
Expand Down Expand Up @@ -1804,7 +1867,7 @@ func TestServer_NormalizeQuery(t *testing.T) {
s := OpenServer(NewMessagingClient())
defer s.Close()
s.CreateDatabase("db0")
s.CreateRetentionPolicy("db0", &influxdb.RetentionPolicy{Name: "rp0"})
s.CreateRetentionPolicy("db0", &influxdb.RetentionPolicy{Name: "rp0", Duration: time.Hour})
s.SetDefaultRetentionPolicy("db0", "rp0")

// Execute the tests
Expand All @@ -1828,7 +1891,7 @@ func TestServer_CreateContinuousQuery(t *testing.T) {
if err := s.CreateDatabase("foo"); err != nil {
t.Fatal(err)
}
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != nil {
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}); err != nil {
t.Fatal(err)
}
s.SetDefaultRetentionPolicy("foo", "bar")
Expand Down