HH should not process dropped nodes #4377

Merged 3 commits on Oct 9, 2015
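In brief: the hinted-handoff processor now asks the meta store whether a node still exists before draining its queue, and a background pass eventually deletes queues for dropped nodes. A condensed sketch of the check the diffs below introduce, where a nil *meta.NodeInfo is the "dropped" signal and process is a hypothetical stand-in for the per-queue write loop:

for nodeID, q := range p.queues {
	ni, err := p.metastore.Node(nodeID)
	if err != nil {
		return err
	}
	if ni == nil {
		// Dropped node: skip it now; the hourly purge pass reclaims
		// the queue once it is older than the configured MaxAge.
		continue
	}
	process(nodeID, q) // hypothetical: drain queued writes to the node
}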
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -47,6 +47,7 @@
- [#4344](https://github.com/influxdb/influxdb/issues/4344): Make client.Write default to client.precision if none is given.
- [#3429](https://github.com/influxdb/influxdb/issues/3429): Incorrect parsing of regex containing '/'
- [#4374](https://github.com/influxdb/influxdb/issues/4374): Add tsm1 quickcheck tests
- [#4377](https://github.com/influxdb/influxdb/pull/4377): Hinted handoff should not process dropped nodes

## v0.9.4 [2015-09-14]

2 changes: 1 addition & 1 deletion cmd/influxd/run/server.go
@@ -125,7 +125,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
s.ShardWriter.MetaStore = s.MetaStore

// Create the hinted handoff service
s.HintedHandoff = hh.NewService(c.HintedHandoff, s.ShardWriter)
s.HintedHandoff = hh.NewService(c.HintedHandoff, s.ShardWriter, s.MetaStore)

// Initialize points writer.
s.PointsWriter = cluster.NewPointsWriter()
74 changes: 67 additions & 7 deletions services/hh/processor.go
@@ -36,9 +36,11 @@ type Processor struct {
maxAge time.Duration
retryRateLimit int64

queues map[uint64]*queue
writer shardWriter
Logger *log.Logger
queues map[uint64]*queue
meta metaStore
writer shardWriter
metastore metaStore
Logger *log.Logger

// Shard-level and node-level HH stats.
shardStatMaps map[uint64]*expvar.Map
@@ -50,11 +52,12 @@ type ProcessorOptions struct {
RetryRateLimit int64
}

func NewProcessor(dir string, writer shardWriter, options ProcessorOptions) (*Processor, error) {
func NewProcessor(dir string, writer shardWriter, metastore metaStore, options ProcessorOptions) (*Processor, error) {
p := &Processor{
dir: dir,
queues: map[uint64]*queue{},
writer: writer,
metastore: metastore,
Logger: log.New(os.Stderr, "[handoff] ", log.LstdFlags),
shardStatMaps: make(map[uint64]*expvar.Map),
nodeStatMaps: make(map[uint64]*expvar.Map),
@@ -164,8 +167,13 @@ func (p *Processor) Process() error {
p.mu.RLock()
defer p.mu.RUnlock()

res := make(chan error, len(p.queues))
for nodeID, q := range p.queues {
activeQueues, err := p.activeQueues()
if err != nil {
return err
}

res := make(chan error, len(activeQueues))
for nodeID, q := range activeQueues {
go func(nodeID uint64, q *queue) {

// Log how many writes we successfully sent at the end
@@ -234,7 +242,7 @@ }
}(nodeID, q)
}

for range p.queues {
for range activeQueues {
err := <-res
if err != nil {
return err
@@ -273,6 +281,20 @@ func (p *Processor) updateShardStats(shardID uint64, stat string, inc int64) {
m.Add(stat, inc)
}

func (p *Processor) activeQueues() (map[uint64]*queue, error) {
queues := make(map[uint64]*queue)
for id, q := range p.queues {
ni, err := p.metastore.Node(id)
if err != nil {
return nil, err
}
if ni != nil {
queues[id] = q
}
}
return queues, nil
}

func (p *Processor) PurgeOlderThan(when time.Duration) error {
p.mu.Lock()
defer p.mu.Unlock()
@@ -284,3 +306,41 @@ func (p *Processor) PurgeOlderThan(when time.Duration) error {
}
return nil
}

func (p *Processor) PurgeInactiveOlderThan(when time.Duration) error {
p.mu.Lock()
defer p.mu.Unlock()

deletedQueues := make([]uint64, 0)
for nodeID, queue := range p.queues {
// Only delete queues for inactive nodes.
ni, err := p.metastore.Node(nodeID)
if err != nil {
return err
}
if ni != nil {
continue
}

last, err := queue.LastModified()
if err != nil {
return err
}
if last.Before(time.Now().Add(-when)) {
// Close and remove the queue.
if err := queue.Close(); err != nil {
return err
}
if err := queue.Remove(); err != nil {
return err
}

deletedQueues = append(deletedQueues, nodeID)
}
}

for _, id := range deletedQueues {
delete(p.queues, id)
}
return nil
}
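Read together, the two entry points split responsibility for a dropped node's queue: Process skips it via activeQueues, and the purge pass eventually reclaims it. A minimal usage sketch, assuming p is a *Processor created with NewProcessor and maxAge is the configured retention window:

// Phase 1: Process() filters through activeQueues(), so the queue for a
// dropped node is skipped for as long as metastore.Node returns nil for it.
if err := p.Process(); err != nil {
	log.Fatal(err)
}

// Phase 2: once an inactive queue's last modification is older than maxAge,
// PurgeInactiveOlderThan closes it, deletes its directory via Remove, and
// drops it from p.queues. Queues for live nodes are never purged here.
if err := p.PurgeInactiveOlderThan(maxAge); err != nil {
	log.Fatal(err)
}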
77 changes: 70 additions & 7 deletions services/hh/processor_test.go
@@ -5,6 +5,7 @@ import (
"testing"
"time"

"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/models"
)

@@ -16,14 +17,22 @@ func (f *fakeShardWriter) WriteShard(shardID, nodeID uint64, points []models.Point) error {
return f.ShardWriteFn(shardID, nodeID, points)
}

type fakeMetaStore struct {
NodeFn func(nodeID uint64) (*meta.NodeInfo, error)
}

func (f *fakeMetaStore) Node(nodeID uint64) (*meta.NodeInfo, error) {
return f.NodeFn(nodeID)
}

func TestProcessorProcess(t *testing.T) {
dir, err := ioutil.TempDir("", "processor_test")
if err != nil {
t.Fatalf("failed to create temp dir: %v", err)
}

// expected data to be queued and sent to the shardWriter
var expShardID, expNodeID, count = uint64(100), uint64(200), 0
var expShardID, activeNodeID, inactiveNodeID, count = uint64(100), uint64(200), uint64(300), 0
pt := models.NewPoint("cpu", models.Tags{"foo": "bar"}, models.Fields{"value": 1.0}, time.Unix(0, 0))

sh := &fakeShardWriter{
@@ -32,8 +41,8 @@ func TestProcessorProcess(t *testing.T) {
if shardID != expShardID {
t.Errorf("Process() shardID mismatch: got %v, exp %v", shardID, expShardID)
}
if nodeID != expNodeID {
t.Errorf("Process() nodeID mismatch: got %v, exp %v", nodeID, expNodeID)
if nodeID != activeNodeID {
t.Errorf("Process() nodeID mismatch: got %v, exp %v", nodeID, activeNodeID)
}

if exp := 1; len(points) != exp {
@@ -47,14 +56,27 @@
return nil
},
}
metastore := &fakeMetaStore{
NodeFn: func(nodeID uint64) (*meta.NodeInfo, error) {
if nodeID == activeNodeID {
return &meta.NodeInfo{}, nil
}
return nil, nil
},
}

p, err := NewProcessor(dir, sh, ProcessorOptions{MaxSize: 1024})
p, err := NewProcessor(dir, sh, metastore, ProcessorOptions{MaxSize: 1024})
if err != nil {
t.Fatalf("Process() failed to create processor: %v", err)
}

// This should queue the writes
if err := p.WriteShard(expShardID, expNodeID, []models.Point{pt}); err != nil {
// This should queue a write for the active node.
if err := p.WriteShard(expShardID, activeNodeID, []models.Point{pt}); err != nil {
t.Fatalf("Process() failed to write points: %v", err)
}

// This should queue a write for the inactive node.
if err := p.WriteShard(expShardID, inactiveNodeID, []models.Point{pt}); err != nil {
t.Fatalf("Process() failed to write points: %v", err)
}

@@ -67,7 +89,7 @@
t.Fatalf("Process() write count mismatch: got %v, exp %v", count, exp)
}

// Queue should be empty so no writes should be send again
// All active nodes should have been handled so no writes should be sent again
if err := p.Process(); err != nil {
t.Fatalf("Process() failed to write points: %v", err)
}
@@ -77,4 +99,45 @@
t.Fatalf("Process() write count mismatch: got %v, exp %v", count, exp)
}

// Make the inactive node active.
sh.ShardWriteFn = func(shardID, nodeID uint64, points []models.Point) error {
count += 1
if shardID != expShardID {
t.Errorf("Process() shardID mismatch: got %v, exp %v", shardID, expShardID)
}
if nodeID != inactiveNodeID {
t.Errorf("Process() nodeID mismatch: got %v, exp %v", nodeID, inactiveNodeID)
}

if exp := 1; len(points) != exp {
t.Fatalf("Process() points mismatch: got %v, exp %v", len(points), exp)
}

if points[0].String() != pt.String() {
t.Fatalf("Process() points mismatch:\n got %v\n exp %v", points[0].String(), pt.String())
}

return nil
}
metastore.NodeFn = func(nodeID uint64) (*meta.NodeInfo, error) {
return &meta.NodeInfo{}, nil
}

// This should send the final write to the shard writer
if err := p.Process(); err != nil {
t.Fatalf("Process() failed to write points: %v", err)
}

if exp := 2; count != exp {
t.Fatalf("Process() write count mismatch: got %v, exp %v", count, exp)
}

// All queues should have been handled, so no more writes should result.
if err := p.Process(); err != nil {
t.Fatalf("Process() failed to write points: %v", err)
}

if exp := 2; count != exp {
t.Fatalf("Process() write count mismatch: got %v, exp %v", count, exp)
}
}
24 changes: 24 additions & 0 deletions services/hh/queue.go
@@ -134,6 +134,19 @@ func (l *queue) Close() error {
return nil
}

// Remove removes all underlying file-based resources for the queue.
// It is an error to call this on an open queue.
func (l *queue) Remove() error {
l.mu.Lock()
defer l.mu.Unlock()

if l.head != nil || l.tail != nil || l.segments != nil {
return fmt.Errorf("queue is open")
}

return os.RemoveAll(l.dir)
}

// SetMaxSegmentSize updates the max segment size for new and existing
// segments.
func (l *queue) SetMaxSegmentSize(size int64) error {
@@ -181,6 +194,17 @@ func (l *queue) PurgeOlderThan(when time.Time) error {
}
}

// LastModified returns the last time the queue was modified.
func (l *queue) LastModified() (time.Time, error) {
l.mu.RLock()
defer l.mu.RUnlock()

if l.tail != nil {
return l.tail.lastModified()
}
return time.Time{}, nil
}

// diskUsage returns the total size on disk used by the queue
func (l *queue) diskUsage() int64 {
var size int64
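Note the ordering Remove enforces: deleting an open queue is an error, which is why PurgeInactiveOlderThan calls Close first. A minimal sketch of the contract, assuming the package's unexported newQueue constructor (its exact signature is an assumption):

q, err := newQueue(dir, 1024*1024) // assumed: newQueue(dir string, maxSize int64)
if err != nil {
	return err
}
if err := q.Open(); err != nil {
	return err
}
// ... append and advance past hinted writes ...
if err := q.Close(); err != nil { // sets head, tail, and segments to nil
	return err
}
if err := q.Remove(); err != nil { // now passes the "queue is open" guard
	return err
}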
32 changes: 25 additions & 7 deletions services/hh/service.go
@@ -11,6 +11,7 @@ import (
"time"

"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/models"
)

@@ -38,15 +39,20 @@ type Service struct {
WriteShard(shardID, ownerID uint64, points []models.Point) error
Process() error
PurgeOlderThan(when time.Duration) error
PurgeInactiveOlderThan(when time.Duration) error
}
}

type shardWriter interface {
WriteShard(shardID, ownerID uint64, points []models.Point) error
}

type metaStore interface {
Node(id uint64) (ni *meta.NodeInfo, err error)
}

// NewService returns a new instance of Service.
func NewService(c Config, w shardWriter) *Service {
func NewService(c Config, w shardWriter, m metaStore) *Service {
key := strings.Join([]string{"hh", c.Dir}, ":")
tags := map[string]string{"path": c.Dir}

@@ -55,7 +61,7 @@ func NewService(c Config, w shardWriter) *Service {
statMap: influxdb.NewStatistics(key, "hh", tags),
Logger: log.New(os.Stderr, "[handoff] ", log.LstdFlags),
}
processor, err := NewProcessor(c.Dir, w, ProcessorOptions{
processor, err := NewProcessor(c.Dir, w, m, ProcessorOptions{
MaxSize: c.MaxSize,
RetryRateLimit: c.RetryRateLimit,
})
@@ -83,9 +89,10 @@ func (s *Service) Open() error {

s.Logger.Printf("Using data dir: %v", s.cfg.Dir)

s.wg.Add(2)
s.wg.Add(3)
go s.retryWrites()
go s.expireWrites()
go s.deleteInactiveQueues()

return nil
}
@@ -165,8 +172,19 @@ func (s *Service) expireWrites() {
}
}

// purgeWrites will cause the handoff queues to remove writes that are no longer
// valid. e.g. queued writes for a node that has been removed
func (s *Service) purgeWrites() {
panic("not implemented")
// deleteInactiveQueues will cause the service to remove queues for inactive nodes.
func (s *Service) deleteInactiveQueues() {
defer s.wg.Done()
ticker := time.NewTicker(time.Hour)
defer ticker.Stop()
for {
select {
case <-s.closing:
return
case <-ticker.C:
if err := s.HintedHandoff.PurgeInactiveOlderThan(time.Duration(s.cfg.MaxAge)); err != nil {
s.Logger.Printf("delete queues failed: %v", err)
}
}
}
}
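For completeness, constructing the service with the new third argument mirrors the server.go change at the top of the diff. A minimal sketch, where hh.NewConfig is assumed to be the package's default-config helper and writer and store are values satisfying the shardWriter and metaStore interfaces:

c := hh.NewConfig() // assumed default-config helper in this package
svc := hh.NewService(c, writer, store) // writer: shardWriter, store: metaStore
if err := svc.Open(); err != nil {
	log.Fatalf("failed to open hinted handoff: %v", err)
}
defer svc.Close()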