Skip to content

Commit

Permalink
Merge pull request #7024 from influxdata/jw-drop
Browse files Browse the repository at this point in the history
Bug fixes
  • Loading branch information
jwilder authored Jul 18, 2016
2 parents 27650da + b692ef4 commit d8fcd9f
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 38 deletions.
18 changes: 18 additions & 0 deletions pkg/limiter/fixed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package limiter

// Fixed is a simple channel based concurrency limiter. It uses a fixed
// size channel to limit callers from proceeding until there is a value avalable
// in the channel. If all are in-use, the caller blocks until one is freed.
type Fixed chan struct{}

func NewFixed(limit int) Fixed {
return make(Fixed, limit)
}

func (t Fixed) Take() {
t <- struct{}{}
}

func (t Fixed) Release() {
<-t
}
40 changes: 37 additions & 3 deletions tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,18 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
}

iter := NewCacheKeyIterator(cache, tsdb.DefaultMaxPointsPerBlock)
return c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter)
files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter)

// See if we were closed while writing a snapshot
c.mu.RLock()
opened = c.opened
c.mu.RUnlock()

if !opened {
return nil, errSnapshotsDisabled
}

return files, err
}

// Compact will write multiple smaller TSM files into 1 or more larger files
Expand Down Expand Up @@ -601,7 +612,18 @@ func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) {
return nil, errCompactionsDisabled
}

return c.compact(false, tsmFiles)
files, err := c.compact(false, tsmFiles)

// See if we were closed while writing a snapshot
c.mu.RLock()
opened = c.opened
c.mu.RUnlock()

if !opened {
return nil, errCompactionsDisabled
}

return files, err
}

// Compact will write multiple smaller TSM files into 1 or more larger files
Expand All @@ -614,7 +636,19 @@ func (c *Compactor) CompactFast(tsmFiles []string) ([]string, error) {
return nil, errCompactionsDisabled
}

return c.compact(true, tsmFiles)
files, err := c.compact(true, tsmFiles)

// See if we were closed while writing a snapshot
c.mu.RLock()
opened = c.opened
c.mu.RUnlock()

if !opened {
return nil, errCompactionsDisabled
}

return files, err

}

// writeNewFiles will write from the iterator into new TSM files, rotating
Expand Down
20 changes: 11 additions & 9 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,6 @@ func (e *Engine) SetCompactionsEnabled(enabled bool) {
go e.compactTSMLevel(true, 1)
go e.compactTSMLevel(true, 2)
go e.compactTSMLevel(false, 3)

e.logger.Printf("compactions enabled for: %v", e.path)
} else {
e.mu.Lock()
if !e.compactionsEnabled {
Expand All @@ -179,8 +177,6 @@ func (e *Engine) SetCompactionsEnabled(enabled bool) {

// Wait for compaction goroutines to exit
e.wg.Wait()

e.logger.Printf("compactions disabled for: %v", e.path)
}
}

Expand Down Expand Up @@ -600,6 +596,12 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error {
return nil
}

// Disable and abort running compactions so that tombstones added existing tsm
// files don't get removed. This would cause deleted measurements/series to
// re-appear once the compaction completed.
e.SetCompactionsEnabled(false)
defer e.SetCompactionsEnabled(true)

e.mu.RLock()
defer e.mu.RUnlock()

Expand Down Expand Up @@ -771,7 +773,7 @@ func (e *Engine) compactCache() {
if e.ShouldCompactCache(e.WAL.LastWriteTime()) {
start := time.Now()
err := e.WriteSnapshot()
if err != nil {
if err != nil && err != errCompactionsDisabled {
e.logger.Printf("error writing snapshot: %v", err)
atomic.AddInt64(&e.stats.CacheCompactionErrors, 1)
} else {
Expand Down Expand Up @@ -832,15 +834,15 @@ func (e *Engine) compactTSMLevel(fast bool, level int) {

if fast {
files, err = e.Compactor.CompactFast(group)
if err != nil {
if err != nil && err != errCompactionsDisabled {
e.logger.Printf("error compacting TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMCompactionErrors[level-1], 1)
time.Sleep(time.Second)
return
}
} else {
files, err = e.Compactor.CompactFull(group)
if err != nil {
if err != nil && err != errCompactionsDisabled {
e.logger.Printf("error compacting TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMCompactionErrors[level-1], 1)
time.Sleep(time.Second)
Expand Down Expand Up @@ -915,7 +917,7 @@ func (e *Engine) compactTSMFull() {
)
if optimize {
files, err = e.Compactor.CompactFast(group)
if err != nil {
if err != nil && err != errCompactionsDisabled {
e.logger.Printf("error compacting TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMOptimizeCompactionErrors, 1)

Expand All @@ -924,7 +926,7 @@ func (e *Engine) compactTSMFull() {
}
} else {
files, err = e.Compactor.CompactFull(group)
if err != nil {
if err != nil && err != errCompactionsDisabled {
e.logger.Printf("error compacting TSM files: %v", err)
atomic.AddInt64(&e.stats.TSMFullCompactionErrors, 1)

Expand Down
16 changes: 13 additions & 3 deletions tsdb/engine/tsm1/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/golang/snappy"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/limiter"
)

const (
Expand Down Expand Up @@ -62,8 +63,9 @@ var (

// Statistics gathered by the WAL.
const (
statWALOldBytes = "oldSegmentsDiskBytes"
statWALCurrentBytes = "currentSegmentDiskBytes"
statWALOldBytes = "oldSegmentsDiskBytes"
statWALCurrentBytes = "currentSegmentDiskBytes"
defaultWaitingWALWrites = 10
)

type WAL struct {
Expand All @@ -90,7 +92,8 @@ type WAL struct {
LoggingEnabled bool

// statistics for the WAL
stats *WALStatistics
stats *WALStatistics
limiter limiter.Fixed
}

func NewWAL(path string) *WAL {
Expand All @@ -103,6 +106,7 @@ func NewWAL(path string) *WAL {
logger: log.New(os.Stderr, "[tsm1wal] ", log.LstdFlags),
closing: make(chan struct{}),
stats: &WALStatistics{},
limiter: limiter.NewFixed(defaultWaitingWALWrites),
}
}

Expand Down Expand Up @@ -277,6 +281,12 @@ func (l *WAL) LastWriteTime() time.Time {
}

func (l *WAL) writeToLog(entry WALEntry) (int, error) {
// limit how many concurrent encodings can be in flight. Since we can only
// write one at a time to disk, a slow disk can cause the allocations below
// to increase quickly. If we're backed up, wait until others have completed.
l.limiter.Take()
defer l.limiter.Release()

// encode and compress the entry while we're not locked
bytes := getBuf(walEncodeBufSize)
defer putBuf(bytes)
Expand Down
4 changes: 4 additions & 0 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ type ShardStatistics struct {

// Statistics returns statistics for periodic monitoring.
func (s *Shard) Statistics(tags map[string]string) []models.Statistic {
if err := s.ready(); err != nil {
return nil
}

tags = s.statTags.Merge(tags)
statistics := []models.Statistic{{
Name: "shard",
Expand Down
30 changes: 7 additions & 23 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/limiter"
)

var (
Expand Down Expand Up @@ -145,7 +146,7 @@ func (s *Store) loadShards() error {
err error
}

throttle := newthrottle(runtime.GOMAXPROCS(0))
t := limiter.NewFixed(runtime.GOMAXPROCS(0))

resC := make(chan *res)
var n int
Expand All @@ -171,8 +172,8 @@ func (s *Store) loadShards() error {
for _, sh := range shards {
n++
go func(index *DatabaseIndex, db, rp, sh string) {
throttle.take()
defer throttle.release()
t.Take()
defer t.Release()

start := time.Now()
path := filepath.Join(s.path, db, rp, sh)
Expand Down Expand Up @@ -514,7 +515,7 @@ func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error {
err error
}

throttle := newthrottle(runtime.GOMAXPROCS(0))
t := limiter.NewFixed(runtime.GOMAXPROCS(0))

resC := make(chan res)
var n int
Expand All @@ -523,8 +524,8 @@ func (s *Store) walkShards(shards []*Shard, fn func(sh *Shard) error) error {
n++

go func(sh *Shard) {
throttle.take()
defer throttle.release()
t.Take()
defer t.Release()

if err := fn(sh); err != nil {
resC <- res{err: fmt.Errorf("shard %d: %s", sh.id, err)}
Expand Down Expand Up @@ -914,20 +915,3 @@ func measurementsFromSourcesOrDB(db *DatabaseIndex, sources ...influxql.Source)

return measurements, nil
}

// throttle is a simple channel based concurrency limiter. It uses a fixed
// size channel to limit callers from proceeding until there is a value avalable
// in the channel. If all are in-use, the caller blocks until one is freed.
type throttle chan struct{}

func newthrottle(limit int) throttle {
return make(throttle, limit)
}

func (t throttle) take() {
t <- struct{}{}
}

func (t throttle) release() {
<-t
}

0 comments on commit d8fcd9f

Please sign in to comment.