Skip to content

Commit

Permalink
Merge pull request #11791 from influxdata/bj-revert-limit-full-compac…
Browse files Browse the repository at this point in the history
…tion-1.8

Revert "Limit force-full and cold compaction size."
  • Loading branch information
benbjohnson authored Feb 11, 2019
2 parents 8b5b0fc + b87605f commit aa3dfc0
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 80 deletions.
28 changes: 3 additions & 25 deletions tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ const (
TSMFileExtension = "tsm"
)

// ColdCompactionSplitSize is the maximum number of files to compact together
// when performing a compaction on a cold shard.
const ColdCompactionSplitSize = 8

var (
errMaxFileExceeded = fmt.Errorf("max file exceeded")
errSnapshotsDisabled = fmt.Errorf("snapshots disabled")
Expand Down Expand Up @@ -457,16 +453,11 @@ func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
return nil
}

var groups []CompactionGroup
if forceFull {
groups = []CompactionGroup{tsmFiles}
} else {
groups = splitCompactionGroups(tsmFiles, ColdCompactionSplitSize)
}
if !c.acquire(groups) {
group := []CompactionGroup{tsmFiles}
if !c.acquire(group) {
return nil
}
return groups
return group
}

// don't plan if nothing has changed in the filestore
Expand Down Expand Up @@ -2107,16 +2098,3 @@ func (l *latencies) avg() time.Duration {
}
return time.Duration(0)
}

// splitCompactionGroups returns groups with a maximum size of sz.
func splitCompactionGroups(tsmFiles []string, sz int) []CompactionGroup {
var groups []CompactionGroup
for i := 0; i < len(tsmFiles); i += sz {
var group CompactionGroup
for j := 0; j < sz && i+j < len(tsmFiles); j++ {
group = append(group, tsmFiles[i+j])
}
groups = append(groups, group)
}
return groups
}
60 changes: 8 additions & 52 deletions tsdb/engine/tsm1/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2325,7 +2325,7 @@ func TestDefaultPlanner_PlanOptimize_Tombstones(t *testing.T) {

}

// Ensure that the planner will compact files in groups if no writes
// Ensure that the planner will compact all files if no writes
// have happened in some interval
func TestDefaultPlanner_Plan_FullOnCold(t *testing.T) {
data := []tsm1.FileStat{
Expand Down Expand Up @@ -2353,26 +2353,6 @@ func TestDefaultPlanner_Plan_FullOnCold(t *testing.T) {
Path: "06-01.tsm1",
Size: 2 * 1024 * 1024,
},
{
Path: "07-01.tsm1",
Size: 2 * 1024 * 1024,
},
{
Path: "08-01.tsm1",
Size: 2 * 1024 * 1024,
},
{
Path: "09-01.tsm1",
Size: 2 * 1024 * 1024,
},
{
Path: "10-01.tsm1",
Size: 2 * 1024 * 1024,
},
{
Path: "11-01.tsm1",
Size: 2 * 1024 * 1024,
},
}

cp := tsm1.NewDefaultPlanner(
Expand All @@ -2385,38 +2365,14 @@ func TestDefaultPlanner_Plan_FullOnCold(t *testing.T) {
)

tsm := cp.Plan(time.Now().Add(-time.Second))
if exp, got := 2, len(tsm); got != exp {
t.Fatalf("tsm groups length mismatch: got %v, exp %v", got, exp)
if exp, got := len(data), len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}

if exp, got := 8, len(tsm[0]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if got, exp := tsm[0][0], data[0].Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
} else if got, exp := tsm[0][1], data[1].Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
} else if got, exp := tsm[0][2], data[2].Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
} else if got, exp := tsm[0][3], data[3].Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
} else if got, exp := tsm[0][4], data[4].Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
} else if got, exp := tsm[0][5], data[5].Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
} else if got, exp := tsm[0][6], data[6].Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
} else if got, exp := tsm[0][7], data[7].Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
}

if exp, got := 3, len(tsm[1]); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
} else if got, exp := tsm[1][0], data[8].Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
} else if got, exp := tsm[1][1], data[9].Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
} else if got, exp := tsm[1][2], data[10].Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
for i, p := range data {
if got, exp := tsm[0][i], p.Path; got != exp {
t.Fatalf("tsm file mismatch: got %v, exp %v", got, exp)
}
}
}

Expand Down Expand Up @@ -2596,7 +2552,7 @@ func TestDefaultPlanner_Plan_TwoGenLevel3(t *testing.T) {
time.Hour)

tsm := cp.Plan(time.Now().Add(-24 * time.Hour))
if exp, got := 2, len(tsm); got != exp {
if exp, got := 1, len(tsm); got != exp {
t.Fatalf("tsm file length mismatch: got %v, exp %v", got, exp)
}
}
Expand Down
5 changes: 4 additions & 1 deletion tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1492,10 +1492,13 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
max = math.MaxInt64
}

overlapsTimeRangeMinMax := false
var overlapsTimeRangeMinMax bool
var overlapsTimeRangeMinMaxLock sync.Mutex
e.FileStore.Apply(func(r TSMFile) error {
if r.OverlapsTimeRange(min, max) {
overlapsTimeRangeMinMaxLock.Lock()
overlapsTimeRangeMinMax = true
overlapsTimeRangeMinMaxLock.Unlock()
}
return nil
})
Expand Down
7 changes: 5 additions & 2 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1373,11 +1373,14 @@ func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
s.mu.RUnlock()
return ErrShardNotFound
}

epoch := s.epochs[shardID]

s.mu.RUnlock()

// enter the epoch tracker
guards, gen := s.epochs[shardID].StartWrite()
defer s.epochs[shardID].EndWrite(gen)
guards, gen := epoch.StartWrite()
defer epoch.EndWrite(gen)

// wait for any guards before writing the points.
for _, guard := range guards {
Expand Down

0 comments on commit aa3dfc0

Please sign in to comment.