Skip to content

Commit

Permalink
tsdb/tsm1: Clean up TSM filename format/parse
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob Marble committed May 29, 2018
1 parent 4e1228f commit bb31376
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 51 deletions.
55 changes: 37 additions & 18 deletions tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ type CompactionPlanner interface {
// to minimize the number of TSM files on disk while rolling up a bounded number
// of files.
type DefaultPlanner struct {
FileStore fileStore
ParseFileName ParseFileNameFunc
FileStore fileStore

// compactFullWriteColdDuration specifies the length of time after
// which if no writes have been committed to the WAL, the engine will
Expand Down Expand Up @@ -125,6 +124,7 @@ type fileStore interface {
Stats() []FileStat
LastModified() time.Time
BlockCount(path string, idx int) int
ParseFileName(path string) (int, int, error)
}

func NewDefaultPlanner(fs fileStore, writeColdDuration time.Duration) *DefaultPlanner {
Expand All @@ -139,8 +139,16 @@ func NewDefaultPlanner(fs fileStore, writeColdDuration time.Duration) *DefaultPl
// 000001-01.tsm, 000001-02.tsm would be in the same generation
// 000001 each with different sequence numbers.
type tsmGeneration struct {
id int
files []FileStat
id int
files []FileStat
parseFileName ParseFileNameFunc
}

// newTsmGeneration returns a tsmGeneration with the given generation id and no
// files. parseFileNameFunc is stored on the generation and is the function it
// later uses to parse the paths of its files.
func newTsmGeneration(id int, parseFileNameFunc ParseFileNameFunc) *tsmGeneration {
	gen := new(tsmGeneration)
	gen.id = id
	gen.parseFileName = parseFileNameFunc
	return gen
}

// size returns the total size of the files in the generation.
Expand All @@ -158,7 +166,7 @@ func (t *tsmGeneration) level() int {
// 1 file with a sequence num of 1. Level 2 is generated by compacting multiple
// level 1 files. Level 3 is generated by compacting multiple level 2 files. Level
// 4 is for anything else.
_, seq, _ := ParseFileName(t.files[0].Path)
_, seq, _ := t.parseFileName(t.files[0].Path)
if seq < 4 {
return seq
}
Expand All @@ -185,6 +193,10 @@ func (c *DefaultPlanner) SetFileStore(fs *FileStore) {
c.FileStore = fs
}

// ParseFileName parses a TSM file path by delegating to the planner's
// FileStore. It returns the generation and sequence numbers encoded in the
// file name, or the error reported by the FileStore.
func (c *DefaultPlanner) ParseFileName(path string) (int, int, error) {
	generation, sequence, err := c.FileStore.ParseFileName(path)
	return generation, sequence, err
}

// FullyCompacted returns true if the shard is fully compacted.
func (c *DefaultPlanner) FullyCompacted() bool {
gens := c.findGenerations(false)
Expand Down Expand Up @@ -603,7 +615,7 @@ func (c *DefaultPlanner) findGenerations(skipInUse bool) tsmGenerations {
tsmStats := c.FileStore.Stats()
generations := make(map[int]*tsmGeneration, len(tsmStats))
for _, f := range tsmStats {
gen, _, _ := ParseFileName(f.Path)
gen, _, _ := c.ParseFileName(f.Path)

// Skip any files that are assigned to a current compaction plan
if _, ok := c.filesInUse[f.Path]; skipInUse && ok {
Expand All @@ -612,9 +624,7 @@ func (c *DefaultPlanner) findGenerations(skipInUse bool) tsmGenerations {

group := generations[gen]
if group == nil {
group = &tsmGeneration{
id: gen,
}
group = newTsmGeneration(gen, c.ParseFileName)
generations[gen] = group
}
group.files = append(group.files, f)
Expand Down Expand Up @@ -682,8 +692,8 @@ type Compactor struct {
// RateLimit is the limit for disk writes for all concurrent compactions.
RateLimit limiter.Rate

// Function used for generating TSM filenames during compaction.
FormatFileName FormatFileNameFunc
formatFileName FormatFileNameFunc
parseFileName ParseFileNameFunc

mu sync.RWMutex
snapshotsEnabled bool
Expand All @@ -705,10 +715,19 @@ type Compactor struct {
// NewCompactor returns a new instance of Compactor.
func NewCompactor() *Compactor {
return &Compactor{
FormatFileName: DefaultFormatFileName,
formatFileName: DefaultFormatFileName,
parseFileName: DefaultParseFileName,
}
}

// WithFormatFileNameFunc replaces the function the Compactor uses to build
// the names of the TSM files it writes.
func (c *Compactor) WithFormatFileNameFunc(formatFileNameFunc FormatFileNameFunc) {
c.formatFileName = formatFileNameFunc
}

// WithParseFileNameFunc replaces the function the Compactor uses to extract
// the generation and sequence numbers from TSM file names during compaction.
func (c *Compactor) WithParseFileNameFunc(parseFileNameFunc ParseFileNameFunc) {
c.parseFileName = parseFileNameFunc
}

// Open initializes the Compactor.
func (c *Compactor) Open() {
c.mu.Lock()
Expand Down Expand Up @@ -786,7 +805,7 @@ func (c *Compactor) EnableCompactions() {
}

// WriteSnapshot writes a Cache snapshot to one or more new TSM files.
func (c *Compactor) WriteSnapshot(cache *Cache, formatFileName FormatFileNameFunc) ([]string, error) {
func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
c.mu.RLock()
enabled := c.snapshotsEnabled
intC := c.snapshotsInterrupt
Expand Down Expand Up @@ -825,7 +844,7 @@ func (c *Compactor) WriteSnapshot(cache *Cache, formatFileName FormatFileNameFun
for i := 0; i < concurrency; i++ {
go func(sp *Cache) {
iter := NewCacheKeyIterator(sp, tsdb.DefaultMaxPointsPerBlock, intC)
files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, nil, iter, throttle, formatFileName)
files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, nil, iter, throttle)
resC <- res{files: files, err: err}

}(splits[i])
Expand Down Expand Up @@ -874,7 +893,7 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
// number to ensure we write to the next unique location.
var maxGeneration, maxSequence int
for _, f := range tsmFiles {
gen, seq, err := ParseFileName(f)
gen, seq, err := c.parseFileName(f)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -918,7 +937,7 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
return nil, err
}

return c.writeNewFiles(maxGeneration, maxSequence, tsmFiles, tsm, true, c.FormatFileName)
return c.writeNewFiles(maxGeneration, maxSequence, tsmFiles, tsm, true)
}

// CompactFull writes multiple smaller TSM files into 1 or more larger files.
Expand Down Expand Up @@ -999,15 +1018,15 @@ func (c *Compactor) removeTmpFiles(files []string) error {

// writeNewFiles writes from the iterator into new TSM files, rotating
// to a new file once it has reached the max TSM file size.
func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter KeyIterator, throttle bool, formatFileName FormatFileNameFunc) ([]string, error) {
func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter KeyIterator, throttle bool) ([]string, error) {
// These are the new TSM files written
var files []string

for {
sequence++

// New TSM files are written to a temp file and renamed when fully completed.
fileName := filepath.Join(c.Dir, c.FormatFileName(generation, sequence)+"."+TSMFileExtension+"."+TmpTSMFileExtension)
fileName := filepath.Join(c.Dir, c.formatFileName(generation, sequence)+"."+TSMFileExtension+"."+TmpTSMFileExtension)

// Write as much as possible to this file
err := c.write(fileName, iter, throttle)
Expand Down
32 changes: 18 additions & 14 deletions tsdb/engine/tsm1/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestCompactor_Snapshot(t *testing.T) {
compactor.Dir = dir
compactor.FileStore = &fakeFileStore{}

files, err := compactor.WriteSnapshot(c, tsm1.DefaultFormatFileName)
files, err := compactor.WriteSnapshot(c)
if err == nil {
t.Fatalf("expected error writing snapshot: %v", err)
}
Expand All @@ -49,7 +49,7 @@ func TestCompactor_Snapshot(t *testing.T) {

compactor.Open()

files, err = compactor.WriteSnapshot(c, tsm1.DefaultFormatFileName)
files, err = compactor.WriteSnapshot(c)
if err != nil {
t.Fatalf("unexpected error writing snapshot: %v", err)
}
Expand Down Expand Up @@ -142,13 +142,13 @@ func TestCompactor_CompactFull(t *testing.T) {
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
}

expGen, expSeq, err := tsm1.ParseFileName(f3)
expGen, expSeq, err := tsm1.DefaultParseFileName(f3)
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
expSeq = expSeq + 1

gotGen, gotSeq, err := tsm1.ParseFileName(files[0])
gotGen, gotSeq, err := tsm1.DefaultParseFileName(files[0])
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
Expand Down Expand Up @@ -527,13 +527,13 @@ func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) {
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
}

expGen, expSeq, err := tsm1.ParseFileName(f3)
expGen, expSeq, err := tsm1.DefaultParseFileName(f3)
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
expSeq = expSeq + 1

gotGen, gotSeq, err := tsm1.ParseFileName(files[0])
gotGen, gotSeq, err := tsm1.DefaultParseFileName(files[0])
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
Expand Down Expand Up @@ -629,13 +629,13 @@ func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) {
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
}

expGen, expSeq, err := tsm1.ParseFileName(f3)
expGen, expSeq, err := tsm1.DefaultParseFileName(f3)
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
expSeq = expSeq + 1

gotGen, gotSeq, err := tsm1.ParseFileName(files[0])
gotGen, gotSeq, err := tsm1.DefaultParseFileName(files[0])
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
Expand Down Expand Up @@ -732,13 +732,13 @@ func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) {
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
}

expGen, expSeq, err := tsm1.ParseFileName(f3)
expGen, expSeq, err := tsm1.DefaultParseFileName(f3)
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
expSeq = expSeq + 1

gotGen, gotSeq, err := tsm1.ParseFileName(files[0])
gotGen, gotSeq, err := tsm1.DefaultParseFileName(files[0])
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
Expand Down Expand Up @@ -840,13 +840,13 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) {
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
}

expGen, expSeq, err := tsm1.ParseFileName(f3)
expGen, expSeq, err := tsm1.DefaultParseFileName(f3)
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
expSeq = expSeq + 1

gotGen, gotSeq, err := tsm1.ParseFileName(files[0])
gotGen, gotSeq, err := tsm1.DefaultParseFileName(files[0])
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
Expand Down Expand Up @@ -956,13 +956,13 @@ func TestCompactor_CompactFull_MaxKeys(t *testing.T) {
t.Fatalf("files length mismatch: got %v, exp %v", got, exp)
}

expGen, expSeq, err := tsm1.ParseFileName(f2Name)
expGen, expSeq, err := tsm1.DefaultParseFileName(f2Name)
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
expSeq = expSeq + 1

gotGen, gotSeq, err := tsm1.ParseFileName(files[0])
gotGen, gotSeq, err := tsm1.DefaultParseFileName(files[0])
if err != nil {
t.Fatalf("unexpected error parsing file name: %v", err)
}
Expand Down Expand Up @@ -2885,3 +2885,7 @@ func (w *fakeFileStore) Close() {
}
w.readers = nil
}

// ParseFileName parses path with tsm1.DefaultParseFileName so the fake file
// store provides the ParseFileName method.
func (w *fakeFileStore) ParseFileName(path string) (int, int, error) {
	generation, sequence, err := tsm1.DefaultParseFileName(path)
	return generation, sequence, err
}
35 changes: 20 additions & 15 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ type Engine struct {
WALEnabled bool

// Invoked when creating a backup file "as new".
GenerateFormatFileNameFunc func() FormatFileNameFunc
formatFileName FormatFileNameFunc

// Controls whether to enable compactions when the engine is open
enableCompactionsOnOpen bool
Expand Down Expand Up @@ -242,11 +242,11 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
CacheFlushWriteColdDuration: time.Duration(opt.Config.CacheSnapshotWriteColdDuration),
enableCompactionsOnOpen: true,
WALEnabled: opt.WALEnabled,
GenerateFormatFileNameFunc: func() FormatFileNameFunc { return DefaultFormatFileName },
stats: stats,
compactionLimiter: opt.CompactionLimiter,
scheduler: newScheduler(stats, opt.CompactionLimiter.Capacity()),
seriesIDSets: opt.SeriesIDSets,
formatFileName: DefaultFormatFileName,
stats: stats,
compactionLimiter: opt.CompactionLimiter,
scheduler: newScheduler(stats, opt.CompactionLimiter.Capacity()),
seriesIDSets: opt.SeriesIDSets,
}

// Feature flag to enable per-series type checking, by default this is off and
Expand All @@ -265,6 +265,16 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts
return e
}

// WithFormatFileNameFunc sets the function used to format TSM file names.
// The function is installed on the engine's Compactor and also stored on the
// engine itself, so both use the same naming scheme.
func (e *Engine) WithFormatFileNameFunc(formatFileNameFunc FormatFileNameFunc) {
e.Compactor.WithFormatFileNameFunc(formatFileNameFunc)
e.formatFileName = formatFileNameFunc
}

// WithParseFileNameFunc sets the function used to parse TSM file names.
// The function is installed on both the engine's FileStore and its Compactor.
func (e *Engine) WithParseFileNameFunc(parseFileNameFunc ParseFileNameFunc) {
e.FileStore.WithParseFileNameFunc(parseFileNameFunc)
e.Compactor.WithParseFileNameFunc(parseFileNameFunc)
}

// Digest returns a reader for the shard's digest.
func (e *Engine) Digest() (io.ReadCloser, int64, error) {
digestPath := filepath.Join(e.path, "digest.tsd")
Expand Down Expand Up @@ -1157,8 +1167,7 @@ func (e *Engine) readFileFromBackup(tr *tar.Reader, shardRelativePath string, as
}

if asNew {
formatFileName := e.GenerateFormatFileNameFunc()
filename = formatFileName(e.FileStore.NextGeneration(), 1) + "." + TSMFileExtension
filename = e.formatFileName(e.FileStore.NextGeneration(), 1) + "." + TSMFileExtension
}

tmp := fmt.Sprintf("%s.%s", filepath.Join(e.path, filename), TmpTSMFileExtension)
Expand Down Expand Up @@ -1759,7 +1768,6 @@ func (e *Engine) WriteSnapshot() error {
logEnd()
}()

var formatFileName FormatFileNameFunc
closedFiles, snapshot, err := func() (segments []string, snapshot *Cache, err error) {
e.mu.Lock()
defer e.mu.Unlock()
Expand All @@ -1780,9 +1788,6 @@ func (e *Engine) WriteSnapshot() error {
return
}

// Determine filename generation function under lock.
formatFileName = e.GenerateFormatFileNameFunc()

return
}()

Expand All @@ -1804,7 +1809,7 @@ func (e *Engine) WriteSnapshot() error {
zap.String("path", e.path),
zap.Duration("duration", time.Since(dedup)))

return e.writeSnapshotAndCommit(log, closedFiles, snapshot, formatFileName)
return e.writeSnapshotAndCommit(log, closedFiles, snapshot)
}

// CreateSnapshot will create a temp directory that holds
Expand All @@ -1826,15 +1831,15 @@ func (e *Engine) CreateSnapshot() (string, error) {
}

// writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments.
func (e *Engine) writeSnapshotAndCommit(log *zap.Logger, closedFiles []string, snapshot *Cache, formatFileName FormatFileNameFunc) (err error) {
func (e *Engine) writeSnapshotAndCommit(log *zap.Logger, closedFiles []string, snapshot *Cache) (err error) {
defer func() {
if err != nil {
e.Cache.ClearSnapshot(false)
}
}()

// write the new snapshot files
newFiles, err := e.Compactor.WriteSnapshot(snapshot, formatFileName)
newFiles, err := e.Compactor.WriteSnapshot(snapshot)
if err != nil {
log.Info("Error writing snapshot from compactor", zap.Error(err))
return err
Expand Down
Loading

0 comments on commit bb31376

Please sign in to comment.