Skip to content

Commit

Permalink
Merge pull request #2776 from influxdb/enforce_retention_2_phase
Browse files Browse the repository at this point in the history
Enforce retention policies
  • Loading branch information
otoolep committed Jun 5, 2015
2 parents 4ba7c5d + f3fa2b8 commit ca9f231
Show file tree
Hide file tree
Showing 11 changed files with 279 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- [2751](https://github.com/influxdb/influxdb/pull/2751): Add UDP input. UPD only supports the line protocol now.

### Bugfixes
- [2776](https://github.com/influxdb/influxdb/issues/2776): Re-implement retention policy enforcement.
- [2635](https://github.com/influxdb/influxdb/issues/2635): Fix querying against boolean field in WHERE clause.
- [2644](https://github.com/influxdb/influxdb/issues/2644): Make SHOW queries work with FROM /<regex>/.
- [2501](https://github.com/influxdb/influxdb/issues/2501): Name the FlagSet for the shell and add a version flag. Thanks @neonstalwart
Expand Down
8 changes: 5 additions & 3 deletions cmd/influxd/run/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/influxdb/influxdb/services/httpd"
"github.com/influxdb/influxdb/services/monitor"
"github.com/influxdb/influxdb/services/opentsdb"
"github.com/influxdb/influxdb/services/retention"
"github.com/influxdb/influxdb/services/udp"
"github.com/influxdb/influxdb/tsdb"
)
Expand Down Expand Up @@ -69,9 +70,10 @@ type Config struct {
JoinURLs string `toml:"join-urls"`
} `toml:"initialization"`

Meta meta.Config `toml:"meta"`
Data tsdb.Config `toml:"data"`
Cluster cluster.Config `toml:"cluster"`
Meta meta.Config `toml:"meta"`
Data tsdb.Config `toml:"data"`
Cluster cluster.Config `toml:"cluster"`
Retention retention.Config `toml:"retention"`

Admin admin.Config `toml:"admin"`
HTTPD httpd.Config `toml:"api"`
Expand Down
12 changes: 12 additions & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/influxdb/influxdb/services/graphite"
"github.com/influxdb/influxdb/services/httpd"
"github.com/influxdb/influxdb/services/opentsdb"
"github.com/influxdb/influxdb/services/retention"
"github.com/influxdb/influxdb/services/udp"
"github.com/influxdb/influxdb/tsdb"
)
Expand Down Expand Up @@ -57,6 +58,7 @@ func NewServer(c *Config, joinURLs string) *Server {
s.appendCollectdService(c.Collectd)
s.appendOpenTSDBService(c.OpenTSDB)
s.appendUDPService(c.UDP)
s.appendRetentionPolicyService(c.Retention)
for _, g := range c.Graphites {
s.appendGraphiteService(g)
}
Expand All @@ -70,6 +72,16 @@ func (s *Server) appendClusterService(c cluster.Config) {
s.Services = append(s.Services, srv)
}

func (s *Server) appendRetentionPolicyService(c retention.Config) {
if !c.Enabled {
return
}
srv := retention.NewService(c)
srv.MetaStore = s.MetaStore
srv.TSDBStore = s.TSDBStore
s.Services = append(s.Services, srv)
}

func (s *Server) appendAdminService(c admin.Config) {
srv := admin.NewService(c)
s.Services = append(s.Services, srv)
Expand Down
48 changes: 43 additions & 5 deletions meta/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,14 @@ func (data *Data) ShardGroups(database, policy string) ([]ShardGroupInfo, error)
} else if rpi == nil {
return nil, ErrRetentionPolicyNotFound
}
return rpi.ShardGroups, nil
groups := make([]ShardGroupInfo, 0, len(rpi.ShardGroups))
for _, g := range rpi.ShardGroups {
if g.Deleted() {
continue
}
groups = append(groups, g)
}
return groups, nil
}

// ShardGroupByTimestamp returns the shard group on a database and policy for a given timestamp.
Expand Down Expand Up @@ -331,10 +338,10 @@ func (data *Data) DeleteShardGroup(database, policy string, id uint64) error {
return ErrRetentionPolicyNotFound
}

// Find shard group by ID and remove it.
// Find shard group by ID and set its deletion timestamp.
for i := range rpi.ShardGroups {
if rpi.ShardGroups[i].ID == id {
rpi.ShardGroups = append(rpi.ShardGroups[:i], rpi.ShardGroups[i+1:]...)
rpi.ShardGroups[i].DeletedAt = time.Now().UTC()
return nil
}
}
Expand Down Expand Up @@ -568,13 +575,35 @@ func NewRetentionPolicyInfo(name string) *RetentionPolicyInfo {
// ShardGroupByTimestamp returns the shard group in the policy that contains the timestamp.
func (rpi *RetentionPolicyInfo) ShardGroupByTimestamp(timestamp time.Time) *ShardGroupInfo {
for i := range rpi.ShardGroups {
if rpi.ShardGroups[i].Contains(timestamp) {
if rpi.ShardGroups[i].Contains(timestamp) && !rpi.ShardGroups[i].Deleted() {
return &rpi.ShardGroups[i]
}
}
return nil
}

// ExpiredShardGroups returns the Shard Groups which are considered expired, for the given time.
func (rpi *RetentionPolicyInfo) ExpiredShardGroups(t time.Time) []*ShardGroupInfo {
groups := make([]*ShardGroupInfo, 0)
for i := range rpi.ShardGroups {
if rpi.Duration != 0 && rpi.ShardGroups[i].EndTime.Add(rpi.Duration).Before(t) {
groups = append(groups, &rpi.ShardGroups[i])
}
}
return groups
}

// DeletedShardGroups returns the Shard Groups which are marked as deleted.
func (rpi *RetentionPolicyInfo) DeletedShardGroups() []*ShardGroupInfo {
groups := make([]*ShardGroupInfo, 0)
for i := range rpi.ShardGroups {
if rpi.ShardGroups[i].Deleted() {
groups = append(groups, &rpi.ShardGroups[i])
}
}
return groups
}

// protobuf returns a protocol buffers object.
func (rpi *RetentionPolicyInfo) protobuf() *internal.RetentionPolicyInfo {
return &internal.RetentionPolicyInfo{
Expand Down Expand Up @@ -609,12 +638,16 @@ func shardGroupDuration(d time.Duration) time.Duration {
return 1 * time.Hour
}

// ShardGroupInfo represents metadata about a shard group.
// ShardGroupInfo represents metadata about a shard group. The DeletedAt field is important
// because it makes it clear that a ShardGroup has been marked as deleted, and allow the system
// to be sure that a ShardGroup is not simply missing. If the DeletedAt is set, the system can
// safely delete any associated shards.
type ShardGroupInfo struct {
ID uint64
StartTime time.Time
EndTime time.Time
Shards []ShardInfo
DeletedAt time.Time
}

// Contains return true if the shard group contains data for the timestamp.
Expand All @@ -627,6 +660,11 @@ func (sgi *ShardGroupInfo) Overlaps(min, max time.Time) bool {
return !sgi.StartTime.After(max) && sgi.EndTime.After(min)
}

// Deleted returns whether this ShardGroup has been deleted.
func (sgi *ShardGroupInfo) Deleted() bool {
return !sgi.DeletedAt.IsZero()
}

// clone returns a deep copy of sgi.
func (sgi ShardGroupInfo) clone() ShardGroupInfo {
other := sgi
Expand Down
5 changes: 3 additions & 2 deletions meta/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,9 @@ func TestData_DeleteShardGroup(t *testing.T) {

if err := data.DeleteShardGroup("db0", "rp0", 1); err != nil {
t.Fatal(err)
} else if len(data.Databases[0].RetentionPolicies[0].ShardGroups) != 0 {
t.Fatalf("unexpected shard groups: %#v", data.Databases[0].RetentionPolicies[0].ShardGroups)
}
if sg := data.Databases[0].RetentionPolicies[0].ShardGroups[0]; !sg.Deleted() {
t.Fatalf("shard group not correctly flagged as deleted")
}
}

Expand Down
39 changes: 38 additions & 1 deletion meta/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,15 @@ func (s *Store) close() error {
return nil
}

// IsLeader returns whether this node is a leader of the cluster.
func (s *Store) IsLeader() bool {
// Check if store has already been closed.
if !s.opened {
return false
}
return s.raft.State() == raft.Leader
}

// readID reads the local node ID from the ID file.
func (s *Store) readID() error {
b, err := ioutil.ReadFile(s.IDPath())
Expand Down Expand Up @@ -516,7 +525,7 @@ func (s *Store) CreateShardGroupIfNotExists(database, policy string, timestamp t
// Try to find shard group locally first.
if sgi, err := s.ShardGroupByTimestamp(database, policy, timestamp); err != nil {
return nil, err
} else if sgi != nil {
} else if sgi != nil && !sgi.Deleted() {
return sgi, nil
}

Expand Down Expand Up @@ -551,6 +560,34 @@ func (s *Store) ShardGroups(database, policy string) (a []ShardGroupInfo, err er
return
}

// VisitRetentionPolicies calls the given function with full retention policy details.
func (s *Store) VisitRetentionPolicies(f func(d DatabaseInfo, r RetentionPolicyInfo)) {
s.read(func(data *Data) error {
for _, di := range data.Databases {
for _, rp := range di.RetentionPolicies {
f(di, rp)
}
}
return nil
})
return
}

// VisitShardGroups calls the given function with full shard group details.
func (s *Store) VisitShardGroups(f func(d DatabaseInfo, r RetentionPolicyInfo, s ShardGroupInfo)) {
s.read(func(data *Data) error {
for _, di := range data.Databases {
for _, rp := range di.RetentionPolicies {
for _, sg := range rp.ShardGroups {
f(di, rp, sg)
}
}
}
return nil
})
return
}

// ShardGroupByTimestamp returns a shard group for a policy by timestamp.
func (s *Store) ShardGroupByTimestamp(database, policy string, timestamp time.Time) (sgi *ShardGroupInfo, err error) {
err = s.read(func(data *Data) error {
Expand Down
10 changes: 10 additions & 0 deletions services/retention/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package retention

import (
"github.com/influxdb/influxdb/toml"
)

type Config struct {
Enabled bool `toml:"enabled"`
CheckInterval toml.Duration `toml:"check-interval"`
}
27 changes: 27 additions & 0 deletions services/retention/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package retention_test

import (
"testing"
"time"

"github.com/BurntSushi/toml"
"github.com/influxdb/influxdb/services/retention"
)

func TestConfig_Parse(t *testing.T) {
// Parse configuration.
var c retention.Config
if _, err := toml.Decode(`
enabled = true
check-interval = "1s"
`, &c); err != nil {
t.Fatal(err)
}

// Validate configuration.
if c.Enabled != true {
t.Fatalf("unexpected enabled state: %v", c.Enabled)
} else if time.Duration(c.CheckInterval) != time.Second {
t.Fatalf("unexpected check interval: %v", c.CheckInterval)
}
}
126 changes: 126 additions & 0 deletions services/retention/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package retention

import (
"log"
"os"
"sync"
"time"
)

import (
"github.com/influxdb/influxdb/meta"
)

// Service represents the retention policy enforcement service.
type Service struct {
MetaStore interface {
IsLeader() bool
VisitRetentionPolicies(f func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo))
DeleteShardGroup(database, policy string, id uint64) error
}
TSDBStore interface {
ShardIDs() []uint64
DeleteShard(shardID uint64) error
}

enabled bool
checkInterval time.Duration
wg sync.WaitGroup
done chan struct{}

logger *log.Logger
}

// NewService returns a configure retention policy enforcement service.
func NewService(c Config) *Service {
return &Service{
checkInterval: time.Duration(c.CheckInterval),
done: make(chan struct{}),
logger: log.New(os.Stderr, "[retention] ", log.LstdFlags),
}
}

// Open starts retention policy enforcement.
func (s *Service) Open() error {
s.wg.Add(2)
go s.deleteShardGroups()
go s.deleteShards()
return nil
}

// Close stops retention policy enforcement.
func (s *Service) Close() error {
close(s.done)
s.wg.Wait()
return nil
}

func (s *Service) deleteShardGroups() {
defer s.wg.Done()

ticker := time.NewTicker(s.checkInterval)
defer ticker.Stop()
for {
select {
case <-s.done:
s.logger.Println("retention policy enforcement terminating")
return

case <-ticker.C:
// Only run this on the leader, but always allow the loop to check
// as the leader can change.
if !s.MetaStore.IsLeader() {
continue
}
s.logger.Println("retention policy enforcement commencing")

s.MetaStore.VisitRetentionPolicies(func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo) {
for _, g := range r.ExpiredShardGroups(time.Now().UTC()) {
if err := s.MetaStore.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil {
s.logger.Printf("failed to delete shard group %d from database %s, retention policy %s: %s",
g.ID, d.Name, r.Name, err.Error())
} else {
s.logger.Printf("deleted shard group %d from database %s, retention policy %s",
g.ID, d.Name, r.Name)
}
}
})
}
}
}

func (s *Service) deleteShards() {
defer s.wg.Done()

ticker := time.NewTicker(s.checkInterval)
defer ticker.Stop()
for {
select {
case <-s.done:
s.logger.Println("retention policy enforcement terminating")
return

case <-ticker.C:
s.logger.Println("retention policy shard deletion commencing")

deletedShardIDs := make(map[uint64]struct{}, 0)
s.MetaStore.VisitRetentionPolicies(func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo) {
for _, g := range r.DeletedShardGroups() {
for _, sh := range g.Shards {
deletedShardIDs[sh.ID] = struct{}{}
}
}
})

for _, id := range s.TSDBStore.ShardIDs() {
if _, ok := deletedShardIDs[id]; ok {
if err := s.TSDBStore.DeleteShard(id); err != nil {
s.logger.Printf("failed to delete shard ID %d: %s", id, err.Error())
continue
}
s.logger.Printf("shard ID% deleted", id)
}
}
}
}
}
Loading

0 comments on commit ca9f231

Please sign in to comment.