diff --git a/pkg/limiter/fixed.go b/pkg/limiter/fixed.go
new file mode 100644
index 00000000000..a1b2e999cd3
--- /dev/null
+++ b/pkg/limiter/fixed.go
@@ -0,0 +1,18 @@
+package limiter
+
+// Fixed is a simple channel-based concurrency limiter. It uses a fixed
+// size channel to limit callers from proceeding until there is a value available
+// in the channel. If all are in-use, the caller blocks until one is freed.
+type Fixed chan struct{}
+
+func NewFixed(limit int) Fixed {
+	return make(Fixed, limit)
+}
+
+func (t Fixed) Take() {
+	t <- struct{}{}
+}
+
+func (t Fixed) Release() {
+	<-t
+}
diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go
index b4989ece1bb..b40019a0364 100644
--- a/tsdb/engine/tsm1/compact.go
+++ b/tsdb/engine/tsm1/compact.go
@@ -534,7 +534,18 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
 	}
 
 	iter := NewCacheKeyIterator(cache, tsdb.DefaultMaxPointsPerBlock)
-	return c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter)
+	files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter)
+
+	// See if we were closed while writing a snapshot
+	c.mu.RLock()
+	opened = c.opened
+	c.mu.RUnlock()
+
+	if !opened {
+		return nil, errSnapshotsDisabled
+	}
+
+	return files, err
 }
 
 // Compact will write multiple smaller TSM files into 1 or more larger files
@@ -601,7 +612,18 @@ func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) {
 		return nil, errCompactionsDisabled
 	}
 
-	return c.compact(false, tsmFiles)
+	files, err := c.compact(false, tsmFiles)
+
+	// See if we were closed while compacting
+	c.mu.RLock()
+	opened = c.opened
+	c.mu.RUnlock()
+
+	if !opened {
+		return nil, errCompactionsDisabled
+	}
+
+	return files, err
 }
 
 // Compact will write multiple smaller TSM files into 1 or more larger files
@@ -614,7 +636,19 @@ func (c *Compactor) CompactFast(tsmFiles []string) ([]string, error) {
 		return nil, errCompactionsDisabled
 	}
 
-	return c.compact(true, tsmFiles)
+	files, err := c.compact(true, tsmFiles)
+
+	// See if we were closed while compacting
+	c.mu.RLock()
+	opened = c.opened
+	c.mu.RUnlock()
+
+	if !opened {
+		return nil, errCompactionsDisabled
+	}
+
+	return files, err
+
 }
 
 // writeNewFiles will write from the iterator into new TSM files, rotating
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index 116ce0c13f2..7063b69b3bb 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -159,8 +159,6 @@ func (e *Engine) SetCompactionsEnabled(enabled bool) {
 		go e.compactTSMLevel(true, 1)
 		go e.compactTSMLevel(true, 2)
 		go e.compactTSMLevel(false, 3)
-
-		e.logger.Printf("compactions enabled for: %v", e.path)
 	} else {
 		e.mu.Lock()
 		if !e.compactionsEnabled {
@@ -179,8 +177,6 @@ func (e *Engine) SetCompactionsEnabled(enabled bool) {
 
 		// Wait for compaction goroutines to exit
 		e.wg.Wait()
-
-		e.logger.Printf("compactions disabled for: %v", e.path)
 	}
 }
 
@@ -600,6 +596,12 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error {
 		return nil
 	}
 
+	// Disable and abort running compactions so that tombstones added to existing tsm
+	// files don't get removed. This would cause deleted measurements/series to
+	// re-appear once the compaction completed.
+	e.SetCompactionsEnabled(false)
+	defer e.SetCompactionsEnabled(true)
+
 	e.mu.RLock()
 	defer e.mu.RUnlock()
 
@@ -771,7 +773,7 @@ func (e *Engine) compactCache() {
 		if e.ShouldCompactCache(e.WAL.LastWriteTime()) {
 			start := time.Now()
 			err := e.WriteSnapshot()
-			if err != nil {
+			if err != nil && err != errCompactionsDisabled {
 				e.logger.Printf("error writing snapshot: %v", err)
 				atomic.AddInt64(&e.stats.CacheCompactionErrors, 1)
 			} else {
@@ -832,7 +834,7 @@ func (e *Engine) compactTSMLevel(fast bool, level int) {
 
 			if fast {
 				files, err = e.Compactor.CompactFast(group)
-				if err != nil {
+				if err != nil && err != errCompactionsDisabled {
 					e.logger.Printf("error compacting TSM files: %v", err)
 					atomic.AddInt64(&e.stats.TSMCompactionErrors[level-1], 1)
 					time.Sleep(time.Second)
@@ -840,7 +842,7 @@ func (e *Engine) compactTSMLevel(fast bool, level int) {
 				}
 			} else {
 				files, err = e.Compactor.CompactFull(group)
-				if err != nil {
+				if err != nil && err != errCompactionsDisabled {
 					e.logger.Printf("error compacting TSM files: %v", err)
 					atomic.AddInt64(&e.stats.TSMCompactionErrors[level-1], 1)
 					time.Sleep(time.Second)
@@ -915,7 +917,7 @@ func (e *Engine) compactTSMFull() {
 				)
 				if optimize {
 					files, err = e.Compactor.CompactFast(group)
-					if err != nil {
+					if err != nil && err != errCompactionsDisabled {
 						e.logger.Printf("error compacting TSM files: %v", err)
 						atomic.AddInt64(&e.stats.TSMOptimizeCompactionErrors, 1)
 
@@ -924,7 +926,7 @@ func (e *Engine) compactTSMFull() {
 					}
 				} else {
 					files, err = e.Compactor.CompactFull(group)
-					if err != nil {
+					if err != nil && err != errCompactionsDisabled {
 						e.logger.Printf("error compacting TSM files: %v", err)
 						atomic.AddInt64(&e.stats.TSMFullCompactionErrors, 1)
 
diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go
index 3926f93ef10..f5d94ee20c3 100644
--- a/tsdb/engine/tsm1/wal.go
+++ b/tsdb/engine/tsm1/wal.go
@@ -18,6 +18,7 @@ import (
 
 	"github.com/golang/snappy"
 	"github.com/influxdata/influxdb/models"
+	"github.com/influxdata/influxdb/pkg/limiter"
 )
 
 const (
@@ -62,8 +63,9 @@ var (
 
 // Statistics gathered by the WAL.
 const (
-	statWALOldBytes     = "oldSegmentsDiskBytes"
-	statWALCurrentBytes = "currentSegmentDiskBytes"
+	statWALOldBytes         = "oldSegmentsDiskBytes"
+	statWALCurrentBytes     = "currentSegmentDiskBytes"
+	defaultWaitingWALWrites = 10
 )
 
 type WAL struct {
@@ -90,7 +92,8 @@ type WAL struct {
 	LoggingEnabled bool
 
 	// statistics for the WAL
-	stats *WALStatistics
+	stats   *WALStatistics
+	limiter limiter.Fixed
 }
 
 func NewWAL(path string) *WAL {
@@ -103,6 +106,7 @@ func NewWAL(path string) *WAL {
 		logger:  log.New(os.Stderr, "[tsm1wal] ", log.LstdFlags),
 		closing: make(chan struct{}),
 		stats:   &WALStatistics{},
+		limiter: limiter.NewFixed(defaultWaitingWALWrites),
 	}
 }
 
@@ -277,6 +281,12 @@ func (l *WAL) LastWriteTime() time.Time {
 }
 
 func (l *WAL) writeToLog(entry WALEntry) (int, error) {
+	// limit how many concurrent encodings can be in flight. Since we can only
+	// write one at a time to disk, a slow disk can cause the allocations below
+	// to increase quickly. If we're backed up, wait until others have completed.
+	l.limiter.Take()
+	defer l.limiter.Release()
+
 	// encode and compress the entry while we're not locked
 	bytes := getBuf(walEncodeBufSize)
 	defer putBuf(bytes)
diff --git a/tsdb/shard.go b/tsdb/shard.go
index e537686096d..0747484f91f 100644
--- a/tsdb/shard.go
+++ b/tsdb/shard.go
@@ -172,6 +172,10 @@ type ShardStatistics struct {
 
 // Statistics returns statistics for periodic monitoring.
 func (s *Shard) Statistics(tags map[string]string) []models.Statistic {
+	if err := s.ready(); err != nil {
+		return nil
+	}
+
 	tags = s.statTags.Merge(tags)
 	statistics := []models.Statistic{{
 		Name: "shard",
diff --git a/tsdb/store.go b/tsdb/store.go
index 58895776146..91ecf41f950 100644
--- a/tsdb/store.go
+++ b/tsdb/store.go
@@ -16,6 +16,7 @@ import (
 
 	"github.com/influxdata/influxdb/influxql"
 	"github.com/influxdata/influxdb/models"
+	"github.com/influxdata/influxdb/pkg/limiter"
 )
 
 var (
@@ -145,7 +146,7 @@ func (s *Store) loadShards() error {
 		err error
 	}
 
-	throttle := newthrottle(runtime.GOMAXPROCS(0))
+	t := limiter.NewFixed(runtime.GOMAXPROCS(0))
 	resC := make(chan *res)
 	var n int
 
@@ -171,8 +172,8 @@ func (s *Store) loadShards() error {
 	for _, sh := range shards {
 		n++
 		go func(index *DatabaseIndex, db, rp, sh string) {
-			throttle.take()
-			defer throttle.release()
+			t.Take()
+			defer t.Release()
 
 			start := time.Now()
 			path := filepath.Join(s.path, db, rp, sh)
@@ -514,7 +515,7 @@ func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error {
 		err error
 	}
 
-	throttle := newthrottle(runtime.GOMAXPROCS(0))
+	t := limiter.NewFixed(runtime.GOMAXPROCS(0))
 	resC := make(chan res)
 	var n int
 
@@ -523,8 +524,8 @@ func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error {
 		n++
 
 		go func(sh *Shard) {
-			throttle.take()
-			defer throttle.release()
+			t.Take()
+			defer t.Release()
 
 			if err := fn(sh); err != nil {
 				resC <- res{err: fmt.Errorf("shard %d: %s", sh.id, err)}
@@ -914,20 +915,3 @@ func measurementsFromSourcesOrDB(db *DatabaseIndex, sources ...influxql.Source)
 
 	return measurements, nil
 }
-
-// throttle is a simple channel based concurrency limiter. It uses a fixed
-// size channel to limit callers from proceeding until there is a value avalable
-// in the channel. If all are in-use, the caller blocks until one is freed.
-type throttle chan struct{}
-
-func newthrottle(limit int) throttle {
-	return make(throttle, limit)
-}
-
-func (t throttle) take() {
-	t <- struct{}{}
-}
-
-func (t throttle) release() {
-	<-t
-}
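
For reviewers, a minimal standalone sketch of how the new limiter.Fixed type is used (not part of the patch; the task count and printed output are illustrative only). It bounds concurrent work the same way Store.loadShards, Store.walkShards, and WAL.writeToLog now do: Take blocks once all slots are taken, Release frees a slot for the next caller.

package main

import (
	"fmt"
	"runtime"
	"sync"

	"github.com/influxdata/influxdb/pkg/limiter"
)

func main() {
	// Allow at most GOMAXPROCS tasks to run at once, mirroring loadShards/walkShards.
	limit := limiter.NewFixed(runtime.GOMAXPROCS(0))

	var wg sync.WaitGroup
	for i := 0; i < 16; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()

			limit.Take()          // blocks while every slot is in use
			defer limit.Release() // frees the slot for the next caller

			fmt.Println("processing task", i)
		}(i)
	}
	wg.Wait()
}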
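
The compact.go changes all follow one pattern: finish the long write, re-check the open flag under the read lock, and discard the result if the Compactor was closed mid-flight; the engine.go callers then treat errCompactionsDisabled as a deliberate abort rather than a failure worth logging. Below is a hedged, standalone sketch of that pattern only (worker, doWork, and errDisabled are hypothetical stand-ins, not names from the patch).

package main

import (
	"errors"
	"fmt"
	"sync"
)

// errDisabled stands in for errCompactionsDisabled / errSnapshotsDisabled.
var errDisabled = errors.New("compactions disabled")

// worker stands in for the Compactor: a mutex-guarded "opened" flag plus slow work.
type worker struct {
	mu     sync.RWMutex
	opened bool
}

func (w *worker) doWork() ([]string, error) {
	files := []string{"example.tsm.tmp"} // stand-in for writeNewFiles/compact output

	// Re-check the flag after the slow step: Close may have run while we were
	// writing, and the results must then be thrown away so they never reach the
	// file store after a concurrent delete or shutdown.
	w.mu.RLock()
	opened := w.opened
	w.mu.RUnlock()

	if !opened {
		return nil, errDisabled
	}
	return files, nil
}

func main() {
	w := &worker{opened: true}
	files, err := w.doWork()
	// Callers treat errDisabled as "aborted on purpose", not an error to report.
	if err != nil && err != errDisabled {
		fmt.Println("unexpected error:", err)
		return
	}
	fmt.Println("files:", files, "err:", err)
}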