From bb313765e4524d9e8235073560eb56af802de5bf Mon Sep 17 00:00:00 2001 From: Jacob Marble Date: Fri, 25 May 2018 16:07:45 -0700 Subject: [PATCH] tsdb/tsm1: Clean up TSM filename format/parse --- tsdb/engine/tsm1/compact.go | 55 +++++++++++++++++++++----------- tsdb/engine/tsm1/compact_test.go | 32 +++++++++++-------- tsdb/engine/tsm1/engine.go | 35 +++++++++++--------- tsdb/engine/tsm1/file_store.go | 19 ++++++++--- 4 files changed, 90 insertions(+), 51 deletions(-) diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 47f13873004..dd125d9a321 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -92,8 +92,7 @@ type CompactionPlanner interface { // to minimize the number of TSM files on disk while rolling up a bounder number // of files. type DefaultPlanner struct { - FileStore fileStore - ParseFileName ParseFileNameFunc + FileStore fileStore // compactFullWriteColdDuration specifies the length of time after // which if no writes have been committed to the WAL, the engine will @@ -125,6 +124,7 @@ type fileStore interface { Stats() []FileStat LastModified() time.Time BlockCount(path string, idx int) int + ParseFileName(path string) (int, int, error) } func NewDefaultPlanner(fs fileStore, writeColdDuration time.Duration) *DefaultPlanner { @@ -139,8 +139,16 @@ func NewDefaultPlanner(fs fileStore, writeColdDuration time.Duration) *DefaultPl // 000001-01.tsm, 000001-02.tsm would be in the same generation // 000001 each with different sequence numbers. type tsmGeneration struct { - id int - files []FileStat + id int + files []FileStat + parseFileName ParseFileNameFunc +} + +func newTsmGeneration(id int, parseFileNameFunc ParseFileNameFunc) *tsmGeneration { + return &tsmGeneration{ + id: id, + parseFileName: parseFileNameFunc, + } } // size returns the total size of the files in the generation. @@ -158,7 +166,7 @@ func (t *tsmGeneration) level() int { // 1 file with a sequence num of 1. Level 2 is generated by compacting multiple // level 1 files. Level 3 is generate by compacting multiple level 2 files. Level // 4 is for anything else. - _, seq, _ := ParseFileName(t.files[0].Path) + _, seq, _ := t.parseFileName(t.files[0].Path) if seq < 4 { return seq } @@ -185,6 +193,10 @@ func (c *DefaultPlanner) SetFileStore(fs *FileStore) { c.FileStore = fs } +func (c *DefaultPlanner) ParseFileName(path string) (int, int, error) { + return c.FileStore.ParseFileName(path) +} + // FullyCompacted returns true if the shard is fully compacted. func (c *DefaultPlanner) FullyCompacted() bool { gens := c.findGenerations(false) @@ -603,7 +615,7 @@ func (c *DefaultPlanner) findGenerations(skipInUse bool) tsmGenerations { tsmStats := c.FileStore.Stats() generations := make(map[int]*tsmGeneration, len(tsmStats)) for _, f := range tsmStats { - gen, _, _ := ParseFileName(f.Path) + gen, _, _ := c.ParseFileName(f.Path) // Skip any files that are assigned to a current compaction plan if _, ok := c.filesInUse[f.Path]; skipInUse && ok { @@ -612,9 +624,7 @@ func (c *DefaultPlanner) findGenerations(skipInUse bool) tsmGenerations { group := generations[gen] if group == nil { - group = &tsmGeneration{ - id: gen, - } + group = newTsmGeneration(gen, c.ParseFileName) generations[gen] = group } group.files = append(group.files, f) @@ -682,8 +692,8 @@ type Compactor struct { // RateLimit is the limit for disk writes for all concurrent compactions. RateLimit limiter.Rate - // Function used for generating TSM filenames during compaction. - FormatFileName FormatFileNameFunc + formatFileName FormatFileNameFunc + parseFileName ParseFileNameFunc mu sync.RWMutex snapshotsEnabled bool @@ -705,10 +715,19 @@ type Compactor struct { // NewCompactor returns a new instance of Compactor. func NewCompactor() *Compactor { return &Compactor{ - FormatFileName: DefaultFormatFileName, + formatFileName: DefaultFormatFileName, + parseFileName: DefaultParseFileName, } } +func (c *Compactor) WithFormatFileNameFunc(formatFileNameFunc FormatFileNameFunc) { + c.formatFileName = formatFileNameFunc +} + +func (c *Compactor) WithParseFileNameFunc(parseFileNameFunc ParseFileNameFunc) { + c.parseFileName = parseFileNameFunc +} + // Open initializes the Compactor. func (c *Compactor) Open() { c.mu.Lock() @@ -786,7 +805,7 @@ func (c *Compactor) EnableCompactions() { } // WriteSnapshot writes a Cache snapshot to one or more new TSM files. -func (c *Compactor) WriteSnapshot(cache *Cache, formatFileName FormatFileNameFunc) ([]string, error) { +func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) { c.mu.RLock() enabled := c.snapshotsEnabled intC := c.snapshotsInterrupt @@ -825,7 +844,7 @@ func (c *Compactor) WriteSnapshot(cache *Cache, formatFileName FormatFileNameFun for i := 0; i < concurrency; i++ { go func(sp *Cache) { iter := NewCacheKeyIterator(sp, tsdb.DefaultMaxPointsPerBlock, intC) - files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, nil, iter, throttle, formatFileName) + files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, nil, iter, throttle) resC <- res{files: files, err: err} }(splits[i]) @@ -874,7 +893,7 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) { // number to ensure we write to the next unique location. var maxGeneration, maxSequence int for _, f := range tsmFiles { - gen, seq, err := ParseFileName(f) + gen, seq, err := c.parseFileName(f) if err != nil { return nil, err } @@ -918,7 +937,7 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) { return nil, err } - return c.writeNewFiles(maxGeneration, maxSequence, tsmFiles, tsm, true, c.FormatFileName) + return c.writeNewFiles(maxGeneration, maxSequence, tsmFiles, tsm, true) } // CompactFull writes multiple smaller TSM files into 1 or more larger files. @@ -999,7 +1018,7 @@ func (c *Compactor) removeTmpFiles(files []string) error { // writeNewFiles writes from the iterator into new TSM files, rotating // to a new file once it has reached the max TSM file size. -func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter KeyIterator, throttle bool, formatFileName FormatFileNameFunc) ([]string, error) { +func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter KeyIterator, throttle bool) ([]string, error) { // These are the new TSM files written var files []string @@ -1007,7 +1026,7 @@ func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter K sequence++ // New TSM files are written to a temp file and renamed when fully completed. - fileName := filepath.Join(c.Dir, c.FormatFileName(generation, sequence)+"."+TSMFileExtension+"."+TmpTSMFileExtension) + fileName := filepath.Join(c.Dir, c.formatFileName(generation, sequence)+"."+TSMFileExtension+"."+TmpTSMFileExtension) // Write as much as possible to this file err := c.write(fileName, iter, throttle) diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index 31567f5a37a..46483f94b19 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -38,7 +38,7 @@ func TestCompactor_Snapshot(t *testing.T) { compactor.Dir = dir compactor.FileStore = &fakeFileStore{} - files, err := compactor.WriteSnapshot(c, tsm1.DefaultFormatFileName) + files, err := compactor.WriteSnapshot(c) if err == nil { t.Fatalf("expected error writing snapshot: %v", err) } @@ -49,7 +49,7 @@ func TestCompactor_Snapshot(t *testing.T) { compactor.Open() - files, err = compactor.WriteSnapshot(c, tsm1.DefaultFormatFileName) + files, err = compactor.WriteSnapshot(c) if err != nil { t.Fatalf("unexpected error writing snapshot: %v", err) } @@ -142,13 +142,13 @@ func TestCompactor_CompactFull(t *testing.T) { t.Fatalf("files length mismatch: got %v, exp %v", got, exp) } - expGen, expSeq, err := tsm1.ParseFileName(f3) + expGen, expSeq, err := tsm1.DefaultParseFileName(f3) if err != nil { t.Fatalf("unexpected error parsing file name: %v", err) } expSeq = expSeq + 1 - gotGen, gotSeq, err := tsm1.ParseFileName(files[0]) + gotGen, gotSeq, err := tsm1.DefaultParseFileName(files[0]) if err != nil { t.Fatalf("unexpected error parsing file name: %v", err) } @@ -527,13 +527,13 @@ func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) { t.Fatalf("files length mismatch: got %v, exp %v", got, exp) } - expGen, expSeq, err := tsm1.ParseFileName(f3) + expGen, expSeq, err := tsm1.DefaultParseFileName(f3) if err != nil { t.Fatalf("unexpected error parsing file name: %v", err) } expSeq = expSeq + 1 - gotGen, gotSeq, err := tsm1.ParseFileName(files[0]) + gotGen, gotSeq, err := tsm1.DefaultParseFileName(files[0]) if err != nil { t.Fatalf("unexpected error parsing file name: %v", err) } @@ -629,13 +629,13 @@ func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) { t.Fatalf("files length mismatch: got %v, exp %v", got, exp) } - expGen, expSeq, err := tsm1.ParseFileName(f3) + expGen, expSeq, err := tsm1.DefaultParseFileName(f3) if err != nil { t.Fatalf("unexpected error parsing file name: %v", err) } expSeq = expSeq + 1 - gotGen, gotSeq, err := tsm1.ParseFileName(files[0]) + gotGen, gotSeq, err := tsm1.DefaultParseFileName(files[0]) if err != nil { t.Fatalf("unexpected error parsing file name: %v", err) } @@ -732,13 +732,13 @@ func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) { t.Fatalf("files length mismatch: got %v, exp %v", got, exp) } - expGen, expSeq, err := tsm1.ParseFileName(f3) + expGen, expSeq, err := tsm1.DefaultParseFileName(f3) if err != nil { t.Fatalf("unexpected error parsing file name: %v", err) } expSeq = expSeq + 1 - gotGen, gotSeq, err := tsm1.ParseFileName(files[0]) + gotGen, gotSeq, err := tsm1.DefaultParseFileName(files[0]) if err != nil { t.Fatalf("unexpected error parsing file name: %v", err) } @@ -840,13 +840,13 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) { t.Fatalf("files length mismatch: got %v, exp %v", got, exp) } - expGen, expSeq, err := tsm1.ParseFileName(f3) + expGen, expSeq, err := tsm1.DefaultParseFileName(f3) if err != nil { t.Fatalf("unexpected error parsing file name: %v", err) } expSeq = expSeq + 1 - gotGen, gotSeq, err := tsm1.ParseFileName(files[0]) + gotGen, gotSeq, err := tsm1.DefaultParseFileName(files[0]) if err != nil { t.Fatalf("unexpected error parsing file name: %v", err) } @@ -956,13 +956,13 @@ func TestCompactor_CompactFull_MaxKeys(t *testing.T) { t.Fatalf("files length mismatch: got %v, exp %v", got, exp) } - expGen, expSeq, err := tsm1.ParseFileName(f2Name) + expGen, expSeq, err := tsm1.DefaultParseFileName(f2Name) if err != nil { t.Fatalf("unexpected error parsing file name: %v", err) } expSeq = expSeq + 1 - gotGen, gotSeq, err := tsm1.ParseFileName(files[0]) + gotGen, gotSeq, err := tsm1.DefaultParseFileName(files[0]) if err != nil { t.Fatalf("unexpected error parsing file name: %v", err) } @@ -2885,3 +2885,7 @@ func (w *fakeFileStore) Close() { } w.readers = nil } + +func (w *fakeFileStore) ParseFileName(path string) (int, int, error) { + return tsm1.DefaultParseFileName(path) +} diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index c5ee56282cb..30367db31a8 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -176,7 +176,7 @@ type Engine struct { WALEnabled bool // Invoked when creating a backup file "as new". - GenerateFormatFileNameFunc func() FormatFileNameFunc + formatFileName FormatFileNameFunc // Controls whether to enabled compactions when the engine is open enableCompactionsOnOpen bool @@ -242,11 +242,11 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts CacheFlushWriteColdDuration: time.Duration(opt.Config.CacheSnapshotWriteColdDuration), enableCompactionsOnOpen: true, WALEnabled: opt.WALEnabled, - GenerateFormatFileNameFunc: func() FormatFileNameFunc { return DefaultFormatFileName }, - stats: stats, - compactionLimiter: opt.CompactionLimiter, - scheduler: newScheduler(stats, opt.CompactionLimiter.Capacity()), - seriesIDSets: opt.SeriesIDSets, + formatFileName: DefaultFormatFileName, + stats: stats, + compactionLimiter: opt.CompactionLimiter, + scheduler: newScheduler(stats, opt.CompactionLimiter.Capacity()), + seriesIDSets: opt.SeriesIDSets, } // Feature flag to enable per-series type checking, by default this is off and @@ -265,6 +265,16 @@ func NewEngine(id uint64, idx tsdb.Index, path string, walPath string, sfile *ts return e } +func (e *Engine) WithFormatFileNameFunc(formatFileNameFunc FormatFileNameFunc) { + e.Compactor.WithFormatFileNameFunc(formatFileNameFunc) + e.formatFileName = formatFileNameFunc +} + +func (e *Engine) WithParseFileNameFunc(parseFileNameFunc ParseFileNameFunc) { + e.FileStore.WithParseFileNameFunc(parseFileNameFunc) + e.Compactor.WithParseFileNameFunc(parseFileNameFunc) +} + // Digest returns a reader for the shard's digest. func (e *Engine) Digest() (io.ReadCloser, int64, error) { digestPath := filepath.Join(e.path, "digest.tsd") @@ -1157,8 +1167,7 @@ func (e *Engine) readFileFromBackup(tr *tar.Reader, shardRelativePath string, as } if asNew { - formatFileName := e.GenerateFormatFileNameFunc() - filename = formatFileName(e.FileStore.NextGeneration(), 1) + "." + TSMFileExtension + filename = e.formatFileName(e.FileStore.NextGeneration(), 1) + "." + TSMFileExtension } tmp := fmt.Sprintf("%s.%s", filepath.Join(e.path, filename), TmpTSMFileExtension) @@ -1759,7 +1768,6 @@ func (e *Engine) WriteSnapshot() error { logEnd() }() - var formatFileName FormatFileNameFunc closedFiles, snapshot, err := func() (segments []string, snapshot *Cache, err error) { e.mu.Lock() defer e.mu.Unlock() @@ -1780,9 +1788,6 @@ func (e *Engine) WriteSnapshot() error { return } - // Determine filename generation function under lock. - formatFileName = e.GenerateFormatFileNameFunc() - return }() @@ -1804,7 +1809,7 @@ func (e *Engine) WriteSnapshot() error { zap.String("path", e.path), zap.Duration("duration", time.Since(dedup))) - return e.writeSnapshotAndCommit(log, closedFiles, snapshot, formatFileName) + return e.writeSnapshotAndCommit(log, closedFiles, snapshot) } // CreateSnapshot will create a temp directory that holds @@ -1826,7 +1831,7 @@ func (e *Engine) CreateSnapshot() (string, error) { } // writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments. -func (e *Engine) writeSnapshotAndCommit(log *zap.Logger, closedFiles []string, snapshot *Cache, formatFileName FormatFileNameFunc) (err error) { +func (e *Engine) writeSnapshotAndCommit(log *zap.Logger, closedFiles []string, snapshot *Cache) (err error) { defer func() { if err != nil { e.Cache.ClearSnapshot(false) @@ -1834,7 +1839,7 @@ func (e *Engine) writeSnapshotAndCommit(log *zap.Logger, closedFiles []string, s }() // write the new snapshot files - newFiles, err := e.Compactor.WriteSnapshot(snapshot, formatFileName) + newFiles, err := e.Compactor.WriteSnapshot(snapshot) if err != nil { log.Info("Error writing snapshot from compactor", zap.Error(err)) return err diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index a48a7bd0e12..f58d08c5c31 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -181,6 +181,8 @@ type FileStore struct { currentTempDirID int + parseFileName ParseFileNameFunc + obs tsdb.FileStoreObserver } @@ -222,7 +224,8 @@ func NewFileStore(dir string) *FileStore { files: map[string]TSMFile{}, logger: logger, }, - obs: noFileStoreObserver{}, + obs: noFileStoreObserver{}, + parseFileName: DefaultParseFileName, } fs.purger.fileStore = fs return fs @@ -233,6 +236,14 @@ func (f *FileStore) WithObserver(obs tsdb.FileStoreObserver) { f.obs = obs } +func (f *FileStore) WithParseFileNameFunc(parseFileNameFunc ParseFileNameFunc) { + f.parseFileName = parseFileNameFunc +} + +func (f *FileStore) ParseFileName(path string) (int, int, error) { + return f.parseFileName(path) +} + // enableTraceLogging must be called before the FileStore is opened. func (f *FileStore) enableTraceLogging(enabled bool) { f.traceLogging = enabled @@ -485,7 +496,7 @@ func (f *FileStore) Open() error { readerC := make(chan *res) for i, fn := range files { // Keep track of the latest ID - generation, _, err := ParseFileName(fn) + generation, _, err := f.parseFileName(fn) if err != nil { return err } @@ -1048,8 +1059,8 @@ func DefaultFormatFileName(generation, sequence int) string { // ParseFileNameFunc is executed when parsing a TSM filename into generation & sequence. type ParseFileNameFunc func(name string) (generation, sequence int, err error) -// ParseFileName is used to parse the filenames of TSM files. -var ParseFileName ParseFileNameFunc = func(name string) (int, int, error) { +// DefaultParseFileName is used to parse the filenames of TSM files. +func DefaultParseFileName(name string) (int, int, error) { base := filepath.Base(name) idx := strings.Index(base, ".") if idx == -1 {