Adjust compaction planning
Increase the level 1 minimum-generation criteria, fix an issue where only fast
compactions were getting run, and stop very large generations from being
included in optimize plans.
jwilder committed Dec 15, 2017
1 parent 749c9d2 commit 2d85ff1
Showing 3 changed files with 145 additions and 116 deletions.
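For quick reference, the sketch below condenses the three planning criteria this commit changes into standalone helpers. It is illustrative only: the function names and packaging are hypothetical, not the planner's API, but the thresholds mirror the diff that follows.

package main

import "fmt"

// minGenerationsForLevel mirrors the new PlanLevel criteria: level 1 now waits
// for 8 generations before forming a compaction group; other levels keep 4.
func minGenerationsForLevel(level int) int {
	if level == 1 {
		return 8
	}
	return 4
}

// isCompactable mirrors the raised threshold in Plan: groups with fewer than 4
// generations are skipped unless they carry tombstones.
func isCompactable(generations int, hasTombstones bool) bool {
	return generations >= 4 || hasTombstones
}

// skipInOptimize mirrors the new PlanOptimize check: a generation that is
// already over the max TSM file size, whose first block is full, and which has
// no tombstones gains nothing from further optimization.
func skipInOptimize(fileCount int, size, maxFileSize uint64, firstBlockPoints, maxPointsPerBlock int, hasTombstones bool) bool {
	return fileCount > 2 && size > maxFileSize && firstBlockPoints == maxPointsPerBlock && !hasTombstones
}

func main() {
	fmt.Println(minGenerationsForLevel(1))                          // 8
	fmt.Println(minGenerationsForLevel(3))                          // 4
	fmt.Println(isCompactable(3, false))                            // false: too few generations
	fmt.Println(isCompactable(3, true))                             // true: tombstones force a compaction
	fmt.Println(skipInOptimize(3, 3<<30, 2<<30, 1000, 1000, false)) // true: already fully optimized
}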
109 changes: 71 additions & 38 deletions tsdb/engine/tsm1/compact.go
@@ -254,6 +254,10 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
}

minGenerations := 4
if level == 1 {
minGenerations = 8
}

var cGroups []CompactionGroup
for _, group := range levelGroups {
for _, chunk := range group.chunk(minGenerations) {
@@ -314,6 +318,11 @@ func (c *DefaultPlanner) PlanOptimize() []CompactionGroup {
for i := 0; i < len(generations); i++ {
cur := generations[i]

// Skip the file if it's over the max size, contains a full block, and does not have any tombstones
if cur.count() > 2 && cur.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(cur.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !cur.hasTombstones() {
continue
}

// See if this generation is orphaned, which would prevent it from being further
// compacted until a final full compaction runs.
if i < len(generations)-1 {
@@ -542,7 +551,7 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
compactable := []tsmGenerations{}
for _, group := range groups {
// If we don't have enough generations to compact, skip it
if len(group) < 2 && !group.hasTombstones() {
if len(group) < 4 && !group.hasTombstones() {
continue
}
compactable = append(compactable, group)
@@ -673,8 +682,7 @@ type Compactor struct {
// lastSnapshotDuration is the amount of time the last snapshot took to complete.
lastSnapshotDuration time.Duration

// snapshotConcurrency is the amount of parallelism used to snapshot the cache.
snapshotConcurrency int
snapshotLatencies *latencies

// The channel to signal that any in progress snapshots should be aborted.
snapshotsInterrupt chan struct{}
@@ -696,7 +704,7 @@ func (c *Compactor) Open() {
c.compactionsEnabled = true
c.snapshotsInterrupt = make(chan struct{})
c.compactionsInterrupt = make(chan struct{})
c.snapshotConcurrency = 1
c.snapshotLatencies = &latencies{values: make([]time.Duration, 4)}

c.files = make(map[string]struct{})
}
@@ -765,14 +773,29 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
c.mu.RLock()
enabled := c.snapshotsEnabled
intC := c.snapshotsInterrupt
concurrency := c.snapshotConcurrency
c.mu.RUnlock()

if !enabled {
return nil, errSnapshotsDisabled
}

start := time.Now()
card := cache.Count()

// Enable throttling if cardinality is relatively low and recent snapshots have been fast.
throttle := card < 3e6 && c.snapshotLatencies.avg() < 15*time.Second

// Write snapshots concurrently if cardinality is relatively high.
concurrency := card / 2e6
if concurrency < 1 {
concurrency = 1
}

// Special case: for very high cardinality, use max concurrency and don't throttle writes.
if card >= 3e6 {
concurrency = 4
throttle = false
}

splits := cache.Split(concurrency)
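The hunk above replaces the old adaptive snapshotConcurrency field with values computed per snapshot from cache size and recent latency. A minimal sketch of that decision, pulled out as a standalone helper (the name and packaging are illustrative, not the Compactor API): roughly one concurrent writer per 2 million cached values, throttling only when the cache is small and recent snapshots averaged under 15 seconds, and a hard override to 4 unthrottled writers at 3 million values or more.

package main

import (
	"fmt"
	"time"
)

// snapshotSettings reproduces the concurrency/throttle selection shown above.
func snapshotSettings(card int, avgLatency time.Duration) (concurrency int, throttle bool) {
	// Throttle only when the cache is small and recent snapshots were fast.
	throttle = card < 3e6 && avgLatency < 15*time.Second

	// Roughly one concurrent writer per 2M cached values, at least one.
	concurrency = card / 2e6
	if concurrency < 1 {
		concurrency = 1
	}

	// Very high cardinality: max out concurrency and never throttle.
	if card >= 3e6 {
		concurrency = 4
		throttle = false
	}
	return concurrency, throttle
}

func main() {
	fmt.Println(snapshotSettings(500_000, 5*time.Second))    // 1 true
	fmt.Println(snapshotSettings(2_500_000, 20*time.Second)) // 1 false
	fmt.Println(snapshotSettings(6_000_000, 5*time.Second))  // 4 false
}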

@@ -785,7 +808,7 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
for i := 0; i < concurrency; i++ {
go func(sp *Cache) {
iter := NewCacheKeyIterator(sp, tsdb.DefaultMaxPointsPerBlock, intC)
files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter)
files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, iter, throttle)
resC <- res{files: files, err: err}

}(splits[i])
@@ -802,35 +825,13 @@ func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
}

dur := time.Since(start).Truncate(time.Second)
maxConcurrency := runtime.GOMAXPROCS(0) / 2
if maxConcurrency < 1 {
maxConcurrency = 1
}
if maxConcurrency > 4 {
maxConcurrency = 4
}

c.mu.Lock()

// See if we were disabled while writing a snapshot
enabled = c.snapshotsEnabled

// See if we need to adjust our snapshot concurrency
if dur > 30*time.Second && dur > c.lastSnapshotDuration {
// Increase snapshot concurrency if they are running slow
c.snapshotConcurrency++
if c.snapshotConcurrency > maxConcurrency {
c.snapshotConcurrency = maxConcurrency
}
} else if dur < 30*time.Second && dur < c.lastSnapshotDuration {
// Decrease snapshot concurrency if they are running too fast
c.snapshotConcurrency--
if c.snapshotConcurrency < 1 {
c.snapshotConcurrency = 1
}
}

c.lastSnapshotDuration = dur
c.snapshotLatencies.add(time.Since(start))
c.mu.Unlock()

if !enabled {
@@ -899,7 +900,7 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
return nil, err
}

return c.writeNewFiles(maxGeneration, maxSequence, tsm)
return c.writeNewFiles(maxGeneration, maxSequence, tsm, true)
}

// CompactFull writes multiple smaller TSM files into 1 or more larger files.
@@ -980,7 +981,7 @@ func (c *Compactor) removeTmpFiles(files []string) error {

// writeNewFiles writes from the iterator into new TSM files, rotating
// to a new file once it has reached the max TSM file size.
func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([]string, error) {
func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator, throttle bool) ([]string, error) {
// These are the new TSM files written
var files []string

@@ -990,7 +991,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([
fileName := filepath.Join(c.Dir, fmt.Sprintf("%09d-%09d.%s.%s", generation, sequence, TSMFileExtension, TmpTSMFileExtension))

// Write as much as possible to this file
err := c.write(fileName, iter)
err := c.write(fileName, iter, throttle)

// We've hit the max file limit and there is more to write. Create a new file
// and continue.
@@ -1029,17 +1030,19 @@ func (c *Compactor) writeNewFiles(generation, sequence int, iter KeyIterator) ([
return files, nil
}

func (c *Compactor) write(path string, iter KeyIterator) (err error) {
func (c *Compactor) write(path string, iter KeyIterator, throttle bool) (err error) {
fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
if err != nil {
return errCompactionInProgress{err: err}
}

// Create the writer for the new TSM file.
var w TSMWriter
var (
w TSMWriter
limitWriter io.Writer = fd
)

var limitWriter io.Writer = fd
if c.RateLimit != nil {
if c.RateLimit != nil && throttle {
limitWriter = limiter.NewWriterWithRate(fd, c.RateLimit)
}

@@ -1549,8 +1552,11 @@ func NewCacheKeyIterator(cache *Cache, size int, interrupt chan struct{}) KeyIte
}

func (c *cacheKeyIterator) EstimatedIndexSize() int {
// We return 0 here since we already have all the entries in memory to write an index.
return 0
// Sum the lengths of the cached keys as a rough estimate of the index size.
var n int
for _, v := range c.order {
n += len(v)
}
return n
}

func (c *cacheKeyIterator) encode() {
@@ -1724,3 +1730,30 @@ func (a tsmGenerations) IsSorted() bool {
}
return true
}

// latencies is a small fixed-size ring buffer of recent snapshot write durations.
type latencies struct {
i int
values []time.Duration
}

func (l *latencies) add(t time.Duration) {
l.values[l.i%len(l.values)] = t
l.i++
}

func (l *latencies) avg() time.Duration {
var n int64
var sum time.Duration
for _, v := range l.values {
if v == 0 {
continue
}
sum += v
n++
}

if n > 0 {
return time.Duration(int64(sum) / n)
}
return time.Duration(0)
}
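A small usage sketch of the latencies helper added above (the surrounding main and sample values are illustrative): add overwrites the oldest sample once the fixed-size ring wraps, and avg ignores slots that have never been written, so the average reflects only durations actually recorded.

package main

import (
	"fmt"
	"time"
)

// latencies is copied from above so this example is self-contained.
type latencies struct {
	i      int
	values []time.Duration
}

func (l *latencies) add(t time.Duration) {
	l.values[l.i%len(l.values)] = t
	l.i++
}

func (l *latencies) avg() time.Duration {
	var n int64
	var sum time.Duration
	for _, v := range l.values {
		if v == 0 {
			continue
		}
		sum += v
		n++
	}
	if n > 0 {
		return time.Duration(int64(sum) / n)
	}
	return time.Duration(0)
}

func main() {
	// Same construction as Compactor.Open: room for the last four durations.
	l := &latencies{values: make([]time.Duration, 4)}
	fmt.Println(l.avg()) // 0s: nothing recorded yet

	l.add(10 * time.Second)
	l.add(20 * time.Second)
	fmt.Println(l.avg()) // 15s: empty slots are ignored

	// After the buffer wraps, the oldest samples are overwritten.
	for i := 0; i < 4; i++ {
		l.add(40 * time.Second)
	}
	fmt.Println(l.avg()) // 40s
}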