diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7610fd50c8e..e5540d93419 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -47,6 +47,7 @@
 - [#4344](https://github.com/influxdb/influxdb/issues/4344): Make client.Write default to client.precision if none is given.
 - [#3429](https://github.com/influxdb/influxdb/issues/3429): Incorrect parsing of regex containing '/'
 - [#4374](https://github.com/influxdb/influxdb/issues/4374): Add tsm1 quickcheck tests
+- [#4377](https://github.com/influxdb/influxdb/pull/4377): Hinted handoff should not process dropped nodes
 
 ## v0.9.4 [2015-09-14]
 
diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go
index 0d23c0f1aa8..9931b9902a7 100644
--- a/cmd/influxd/run/server.go
+++ b/cmd/influxd/run/server.go
@@ -125,7 +125,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
 	s.ShardWriter.MetaStore = s.MetaStore
 
 	// Create the hinted handoff service
-	s.HintedHandoff = hh.NewService(c.HintedHandoff, s.ShardWriter)
+	s.HintedHandoff = hh.NewService(c.HintedHandoff, s.ShardWriter, s.MetaStore)
 
 	// Initialize points writer.
 	s.PointsWriter = cluster.NewPointsWriter()
diff --git a/services/hh/processor.go b/services/hh/processor.go
index 1b3b5b323ad..86d3913b78a 100644
--- a/services/hh/processor.go
+++ b/services/hh/processor.go
@@ -36,9 +36,10 @@ type Processor struct {
 	maxAge         time.Duration
 	retryRateLimit int64
 
-	queues map[uint64]*queue
-	writer shardWriter
-	Logger *log.Logger
+	queues    map[uint64]*queue
+	writer    shardWriter
+	metastore metaStore
+	Logger    *log.Logger
 
 	// Shard-level and node-level HH stats.
 	shardStatMaps map[uint64]*expvar.Map
@@ -50,11 +51,12 @@
 	RetryRateLimit int64
 }
 
-func NewProcessor(dir string, writer shardWriter, options ProcessorOptions) (*Processor, error) {
+func NewProcessor(dir string, writer shardWriter, metastore metaStore, options ProcessorOptions) (*Processor, error) {
 	p := &Processor{
 		dir:           dir,
 		queues:        map[uint64]*queue{},
 		writer:        writer,
+		metastore:     metastore,
 		Logger:        log.New(os.Stderr, "[handoff] ", log.LstdFlags),
 		shardStatMaps: make(map[uint64]*expvar.Map),
 		nodeStatMaps:  make(map[uint64]*expvar.Map),
@@ -164,8 +166,13 @@ func (p *Processor) Process() error {
 	p.mu.RLock()
 	defer p.mu.RUnlock()
 
-	res := make(chan error, len(p.queues))
-	for nodeID, q := range p.queues {
+	activeQueues, err := p.activeQueues()
+	if err != nil {
+		return err
+	}
+
+	res := make(chan error, len(activeQueues))
+	for nodeID, q := range activeQueues {
 		go func(nodeID uint64, q *queue) {
 
 			// Log how many writes we successfully sent at the end
@@ -234,7 +241,7 @@
 		}(nodeID, q)
 	}
 
-	for range p.queues {
+	for range activeQueues {
 		err := <-res
 		if err != nil {
 			return err
@@ -273,6 +280,20 @@ func (p *Processor) updateShardStats(shardID uint64, stat string, inc int64) {
 	m.Add(stat, inc)
 }
 
+func (p *Processor) activeQueues() (map[uint64]*queue, error) {
+	queues := make(map[uint64]*queue)
+	for id, q := range p.queues {
+		ni, err := p.metastore.Node(id)
+		if err != nil {
+			return nil, err
+		}
+		if ni != nil {
+			queues[id] = q
+		}
+	}
+	return queues, nil
+}
+
 func (p *Processor) PurgeOlderThan(when time.Duration) error {
 	p.mu.Lock()
 	defer p.mu.Unlock()
@@ -284,3 +305,41 @@
 	}
 	return nil
 }
+
+func (p *Processor) PurgeInactiveOlderThan(when time.Duration) error {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	deletedQueues := make([]uint64, 0)
+	for nodeID, queue := range p.queues {
+		// Only delete queues for inactive nodes.
+		ni, err := p.metastore.Node(nodeID)
+		if err != nil {
+			return err
+		}
+		if ni != nil {
+			continue
+		}
+
+		last, err := queue.LastModified()
+		if err != nil {
+			return err
+		}
+		if last.Before(time.Now().Add(-when)) {
+			// Close and remove the queue.
+			if err := queue.Close(); err != nil {
+				return err
+			}
+			if err := queue.Remove(); err != nil {
+				return err
+			}
+
+			deletedQueues = append(deletedQueues, nodeID)
+		}
+	}
+
+	for _, id := range deletedQueues {
+		delete(p.queues, id)
+	}
+	return nil
+}
diff --git a/services/hh/processor_test.go b/services/hh/processor_test.go
index 6ce93b971ed..fa74c83388d 100644
--- a/services/hh/processor_test.go
+++ b/services/hh/processor_test.go
@@ -5,6 +5,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/influxdb/influxdb/meta"
 	"github.com/influxdb/influxdb/models"
 )
 
@@ -16,6 +17,14 @@ func (f *fakeShardWriter) WriteShard(shardID, nodeID uint64, points []models.Poi
 	return f.ShardWriteFn(shardID, nodeID, points)
 }
 
+type fakeMetaStore struct {
+	NodeFn func(nodeID uint64) (*meta.NodeInfo, error)
+}
+
+func (f *fakeMetaStore) Node(nodeID uint64) (*meta.NodeInfo, error) {
+	return f.NodeFn(nodeID)
+}
+
 func TestProcessorProcess(t *testing.T) {
 	dir, err := ioutil.TempDir("", "processor_test")
 	if err != nil {
@@ -23,7 +32,7 @@
 	}
 
 	// expected data to be queued and sent to the shardWriter
-	var expShardID, expNodeID, count = uint64(100), uint64(200), 0
+	var expShardID, activeNodeID, inactiveNodeID, count = uint64(100), uint64(200), uint64(300), 0
 	pt := models.NewPoint("cpu", models.Tags{"foo": "bar"}, models.Fields{"value": 1.0}, time.Unix(0, 0))
 
 	sh := &fakeShardWriter{
@@ -32,8 +41,8 @@
 			if shardID != expShardID {
 				t.Errorf("Process() shardID mismatch: got %v, exp %v", shardID, expShardID)
 			}
-			if nodeID != expNodeID {
-				t.Errorf("Process() nodeID mismatch: got %v, exp %v", nodeID, expNodeID)
+			if nodeID != activeNodeID {
+				t.Errorf("Process() nodeID mismatch: got %v, exp %v", nodeID, activeNodeID)
 			}
 
 			if exp := 1; len(points) != exp {
@@ -47,14 +56,27 @@
 			return nil
 		},
 	}
+	metastore := &fakeMetaStore{
+		NodeFn: func(nodeID uint64) (*meta.NodeInfo, error) {
+			if nodeID == activeNodeID {
+				return &meta.NodeInfo{}, nil
+			}
+			return nil, nil
+		},
+	}
 
-	p, err := NewProcessor(dir, sh, ProcessorOptions{MaxSize: 1024})
+	p, err := NewProcessor(dir, sh, metastore, ProcessorOptions{MaxSize: 1024})
 	if err != nil {
 		t.Fatalf("Process() failed to create processor: %v", err)
 	}
 
-	// This should queue the writes
-	if err := p.WriteShard(expShardID, expNodeID, []models.Point{pt}); err != nil {
+	// This should queue a write for the active node.
+	if err := p.WriteShard(expShardID, activeNodeID, []models.Point{pt}); err != nil {
+		t.Fatalf("Process() failed to write points: %v", err)
+	}
+
+	// This should queue a write for the inactive node.
+	if err := p.WriteShard(expShardID, inactiveNodeID, []models.Point{pt}); err != nil {
 		t.Fatalf("Process() failed to write points: %v", err)
 	}
 
@@ -67,7 +89,7 @@
 		t.Fatalf("Process() write count mismatch: got %v, exp %v", count, exp)
 	}
 
-	// Queue should be empty so no writes should be send again
+	// All active nodes should have been handled so no writes should be sent again
 	if err := p.Process(); err != nil {
 		t.Fatalf("Process() failed to write points: %v", err)
 	}
@@ -77,4 +99,45 @@
 		t.Fatalf("Process() write count mismatch: got %v, exp %v", count, exp)
 	}
 
+	// Make the inactive node active.
+	sh.ShardWriteFn = func(shardID, nodeID uint64, points []models.Point) error {
+		count += 1
+		if shardID != expShardID {
+			t.Errorf("Process() shardID mismatch: got %v, exp %v", shardID, expShardID)
+		}
+		if nodeID != inactiveNodeID {
+			t.Errorf("Process() nodeID mismatch: got %v, exp %v", nodeID, inactiveNodeID)
+		}
+
+		if exp := 1; len(points) != exp {
+			t.Fatalf("Process() points mismatch: got %v, exp %v", len(points), exp)
+		}
+
+		if points[0].String() != pt.String() {
+			t.Fatalf("Process() points mismatch:\n got %v\n exp %v", points[0].String(), pt.String())
+		}
+
+		return nil
+	}
+	metastore.NodeFn = func(nodeID uint64) (*meta.NodeInfo, error) {
+		return &meta.NodeInfo{}, nil
+	}
+
+	// This should send the final write to the shard writer
+	if err := p.Process(); err != nil {
+		t.Fatalf("Process() failed to write points: %v", err)
+	}
+
+	if exp := 2; count != exp {
+		t.Fatalf("Process() write count mismatch: got %v, exp %v", count, exp)
+	}
+
+	// All queues should have been handled, so no more writes should result.
+	if err := p.Process(); err != nil {
+		t.Fatalf("Process() failed to write points: %v", err)
+	}
+
+	if exp := 2; count != exp {
+		t.Fatalf("Process() write count mismatch: got %v, exp %v", count, exp)
+	}
 }
diff --git a/services/hh/queue.go b/services/hh/queue.go
index c4b1966fccf..e940e7fa8a5 100644
--- a/services/hh/queue.go
+++ b/services/hh/queue.go
@@ -134,6 +134,19 @@
 	return nil
 }
 
+// Remove removes all underlying file-based resources for the queue.
+// It is an error to call this on an open queue.
+func (l *queue) Remove() error {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
+	if l.head != nil || l.tail != nil || l.segments != nil {
+		return fmt.Errorf("queue is open")
+	}
+
+	return os.RemoveAll(l.dir)
+}
+
 // SetMaxSegmentSize updates the max segment size for new and existing
 // segments.
 func (l *queue) SetMaxSegmentSize(size int64) error {
@@ -181,6 +194,17 @@
 	}
 }
 
+// LastModified returns the last time the queue was modified.
+func (l *queue) LastModified() (time.Time, error) {
+	l.mu.RLock()
+	defer l.mu.RUnlock()
+
+	if l.tail != nil {
+		return l.tail.lastModified()
+	}
+	return time.Time{}, nil
+}
+
 // diskUsage returns the total size on disk used by the queue
 func (l *queue) diskUsage() int64 {
 	var size int64
diff --git a/services/hh/service.go b/services/hh/service.go
index 88860350eb5..455428a2608 100644
--- a/services/hh/service.go
+++ b/services/hh/service.go
@@ -11,6 +11,7 @@
 	"time"
 
 	"github.com/influxdb/influxdb"
+	"github.com/influxdb/influxdb/meta"
 	"github.com/influxdb/influxdb/models"
 )
 
@@ -38,6 +39,7 @@
 		WriteShard(shardID, ownerID uint64, points []models.Point) error
 		Process() error
 		PurgeOlderThan(when time.Duration) error
+		PurgeInactiveOlderThan(when time.Duration) error
 	}
 }
 
@@ -45,8 +47,12 @@
 type shardWriter interface {
 	WriteShard(shardID, ownerID uint64, points []models.Point) error
 }
 
+type metaStore interface {
+	Node(id uint64) (ni *meta.NodeInfo, err error)
+}
+
 // NewService returns a new instance of Service.
-func NewService(c Config, w shardWriter) *Service {
+func NewService(c Config, w shardWriter, m metaStore) *Service {
 	key := strings.Join([]string{"hh", c.Dir}, ":")
 	tags := map[string]string{"path": c.Dir}
@@ -55,7 +61,7 @@
 		statMap: influxdb.NewStatistics(key, "hh", tags),
 		Logger:  log.New(os.Stderr, "[handoff] ", log.LstdFlags),
 	}
-	processor, err := NewProcessor(c.Dir, w, ProcessorOptions{
+	processor, err := NewProcessor(c.Dir, w, m, ProcessorOptions{
 		MaxSize:        c.MaxSize,
 		RetryRateLimit: c.RetryRateLimit,
 	})
@@ -83,9 +89,10 @@
 
 	s.Logger.Printf("Using data dir: %v", s.cfg.Dir)
 
-	s.wg.Add(2)
+	s.wg.Add(3)
 	go s.retryWrites()
 	go s.expireWrites()
+	go s.deleteInactiveQueues()
 
 	return nil
 }
@@ -165,8 +172,19 @@
 	}
 }
 
-// purgeWrites will cause the handoff queues to remove writes that are no longer
-// valid. e.g. queued writes for a node that has been removed
-func (s *Service) purgeWrites() {
-	panic("not implemented")
+// deleteInactiveQueues will cause the service to remove queues for inactive nodes.
+func (s *Service) deleteInactiveQueues() {
+	defer s.wg.Done()
+	ticker := time.NewTicker(time.Hour)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-s.closing:
+			return
+		case <-ticker.C:
+			if err := s.HintedHandoff.PurgeInactiveOlderThan(time.Duration(s.cfg.MaxAge)); err != nil {
+				s.Logger.Printf("delete queues failed: %v", err)
+			}
+		}
+	}
 }
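
Reviewer note: below is a minimal sketch of how the pieces in this patch fit together; it is not part of the change. `nopWriter`, `droppedNodeStore`, the shard/node IDs, and the temp directory are hypothetical stand-ins (in production the writer is `cluster.ShardWriter` and the meta store is the real `MetaStore` wired up in `server.go`); only `NewProcessor`, `WriteShard`, `Process`, and `PurgeInactiveOlderThan` come from the code above.

```go
package main

import (
	"io/ioutil"
	"log"
	"os"
	"time"

	"github.com/influxdb/influxdb/meta"
	"github.com/influxdb/influxdb/models"
	"github.com/influxdb/influxdb/services/hh"
)

// nopWriter satisfies hh's shardWriter interface and drops every write.
type nopWriter struct{}

func (nopWriter) WriteShard(shardID, ownerID uint64, points []models.Point) error { return nil }

// droppedNodeStore satisfies hh's metaStore interface and reports every node
// as unknown, which is what the meta store does once a node has been dropped.
type droppedNodeStore struct{}

func (droppedNodeStore) Node(id uint64) (*meta.NodeInfo, error) { return nil, nil }

func main() {
	dir, err := ioutil.TempDir("", "hh_sketch") // hypothetical queue directory
	if err != nil {
		log.Fatal(err)
	}
	defer os.RemoveAll(dir)

	p, err := hh.NewProcessor(dir, nopWriter{}, droppedNodeStore{}, hh.ProcessorOptions{MaxSize: 1024})
	if err != nil {
		log.Fatal(err)
	}

	// Hint a write for node 42, which the meta store no longer knows about.
	pt := models.NewPoint("cpu", models.Tags{"host": "a"}, models.Fields{"value": 1.0}, time.Unix(0, 0))
	if err := p.WriteShard(1, 42, []models.Point{pt}); err != nil {
		log.Fatal(err)
	}

	// Process no longer retries this queue: activeQueues() filters out
	// any node the meta store returns nil for.
	if err := p.Process(); err != nil {
		log.Fatal(err)
	}

	// The purge (run hourly by the service) then closes the queue and deletes
	// its directory once it has been idle longer than the given age (zero
	// here, so effectively right away).
	if err := p.PurgeInactiveOlderThan(0); err != nil {
		log.Fatal(err)
	}
}
```

Returning `nil, nil` from `Node` (rather than an error) is what distinguishes a dropped node from a meta-store failure here: an error aborts the pass, while a nil `NodeInfo` marks the queue inactive so it is skipped by `Process` and eventually removed by the purge.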