Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid compaction queue stats flutter #22245

Merged
merged 3 commits into from
Aug 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
### Bug Fixes

1. [22242](https://github.com/influxdata/influxdb/pull/22242): Preserve comments in flux queries when saving task definitions
1. [22245](https://github.com/influxdata/influxdb/pull/22245): Avoid compaction queue stats flutter
1. [22236](https://github.com/influxdata/influxdb/pull/22236): influxdb2 packages should depend on curl

## v2.0.8 [2021-08-13]
Expand Down
44 changes: 22 additions & 22 deletions tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ type CompactionGroup []string
// CompactionPlanner determines what TSM files and WAL segments to include in a
// given compaction run.
type CompactionPlanner interface {
Plan(lastWrite time.Time) []CompactionGroup
PlanLevel(level int) []CompactionGroup
PlanOptimize() []CompactionGroup
Plan(lastWrite time.Time) ([]CompactionGroup, int64)
PlanLevel(level int) ([]CompactionGroup, int64)
PlanOptimize() ([]CompactionGroup, int64)
Release(group []CompactionGroup)
FullyCompacted() (bool, string)

Expand Down Expand Up @@ -234,13 +234,13 @@ func (c *DefaultPlanner) ForceFull() {
}

// PlanLevel returns a set of TSM files to rewrite for a specific level.
func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
func (c *DefaultPlanner) PlanLevel(level int) ([]CompactionGroup, int64) {
// If a full plan has been requested, don't plan any levels which will prevent
// the full plan from acquiring them.
c.mu.RLock()
if c.forceFull {
c.mu.RUnlock()
return nil
return nil, 0
}
c.mu.RUnlock()

Expand All @@ -252,7 +252,7 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
// If there is only one generation and no tombstones, then there's nothing to
// do.
if len(generations) <= 1 && !generations.hasTombstones() {
return nil
return nil, 0
}

// Group each generation by level such that two adjacent generations in the same
Expand Down Expand Up @@ -321,22 +321,22 @@ func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
}

if !c.acquire(cGroups) {
return nil
return nil, int64(len(cGroups))
}

return cGroups
return cGroups, int64(len(cGroups))
}

// PlanOptimize returns all TSM files if they are in different generations in order
// to optimize the index across TSM files. Each returned compaction group can be
// compacted concurrently.
func (c *DefaultPlanner) PlanOptimize() []CompactionGroup {
func (c *DefaultPlanner) PlanOptimize() ([]CompactionGroup, int64) {
// If a full plan has been requested, don't plan any levels which will prevent
// the full plan from acquiring them.
c.mu.RLock()
if c.forceFull {
c.mu.RUnlock()
return nil
return nil, 0
}
c.mu.RUnlock()

Expand All @@ -348,7 +348,7 @@ func (c *DefaultPlanner) PlanOptimize() []CompactionGroup {
// If there is only one generation and no tombstones, then there's nothing to
// do.
if len(generations) <= 1 && !generations.hasTombstones() {
return nil
return nil, 0
}

// Group each generation by level such that two adjacent generations in the same
Expand Down Expand Up @@ -413,15 +413,15 @@ func (c *DefaultPlanner) PlanOptimize() []CompactionGroup {
}

if !c.acquire(cGroups) {
return nil
return nil, int64(len(cGroups))
}

return cGroups
return cGroups, int64(len(cGroups))
}

// Plan returns a set of TSM files to rewrite for level 4 or higher. The planning returns
// multiple groups if possible to allow compactions to run concurrently.
func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
func (c *DefaultPlanner) Plan(lastWrite time.Time) ([]CompactionGroup, int64) {
generations := c.findGenerations(true)

c.mu.RLock()
Expand Down Expand Up @@ -471,27 +471,27 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {

// Make sure we have more than 1 file and more than 1 generation
if len(tsmFiles) <= 1 || genCount <= 1 {
return nil
return nil, 0
}

group := []CompactionGroup{tsmFiles}
if !c.acquire(group) {
return nil
return nil, int64(len(group))
}
return group
return group, int64(len(group))
}

// don't plan if nothing has changed in the filestore
if c.lastPlanCheck.After(c.FileStore.LastModified()) && !generations.hasTombstones() {
return nil
return nil, 0
}

c.lastPlanCheck = time.Now()

// If there is only one generation, return early to avoid re-compacting the same file
// over and over again.
if len(generations) <= 1 && !generations.hasTombstones() {
return nil
return nil, 0
}

// Need to find the ending point for level 4 files. They will be the oldest files. We scan
Expand Down Expand Up @@ -584,7 +584,7 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
}

if len(groups) == 0 {
return nil
return nil, 0
}

// With the groups, we need to evaluate whether the group as a whole can be compacted
Expand Down Expand Up @@ -612,9 +612,9 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
}

if !c.acquire(tsmFiles) {
return nil
return nil, int64(len(tsmFiles))
}
return tsmFiles
return tsmFiles, int64(len(tsmFiles))
}

// findGenerations groups all the TSM files by generation based
Expand Down
Loading