diff --git a/block.go b/block.go index 232f96e3..06305019 100644 --- a/block.go +++ b/block.go @@ -15,6 +15,7 @@ package tsdb import ( + "encoding/binary" "encoding/json" "io/ioutil" "os" @@ -252,6 +253,10 @@ type Block struct { dir string meta BlockMeta + // Symbol Table Size in bytes. + // We maintain this variable to avoid recalculation everytime. + symbolTableSize uint64 + chunkr ChunkReader indexr IndexReader tombstones TombstoneReader @@ -279,12 +284,23 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) { return nil, err } + // Calculating symbol table size. + tmp := make([]byte, 8) + symTblSize := uint64(0) + for _, v := range ir.SymbolTable() { + // Size of varint length of the symbol. + symTblSize += uint64(binary.PutUvarint(tmp, uint64(len(v)))) + // Size of the symbol. + symTblSize += uint64(len(v)) + } + pb := &Block{ - dir: dir, - meta: *meta, - chunkr: cr, - indexr: ir, - tombstones: tr, + dir: dir, + meta: *meta, + chunkr: cr, + indexr: ir, + tombstones: tr, + symbolTableSize: symTblSize, } return pb, nil } @@ -354,6 +370,11 @@ func (pb *Block) Tombstones() (TombstoneReader, error) { return blockTombstoneReader{TombstoneReader: pb.tombstones, b: pb}, nil } +// GetSymbolTableSize returns the Symbol Table Size in the index of this block. +func (pb *Block) GetSymbolTableSize() uint64 { + return pb.symbolTableSize +} + func (pb *Block) setCompactionFailed() error { pb.meta.Compaction.Failed = true return writeMetaFile(pb.dir, &pb.meta) @@ -487,10 +508,13 @@ Outer: func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) { numStones := 0 - pb.tombstones.Iter(func(id uint64, ivs Intervals) error { + if err := pb.tombstones.Iter(func(id uint64, ivs Intervals) error { numStones += len(ivs) return nil - }) + }); err != nil { + // This should never happen, as the iteration function only returns nil. + panic(err) + } if numStones == 0 { return nil, nil } diff --git a/checkpoint.go b/checkpoint.go index d988d356..f45f3791 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -109,6 +109,10 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo stats := &CheckpointStats{} var sr io.Reader + // We close everything explicitly because Windows needs files to be + // closed before being deleted. But we also have defer so that we close + // files if there is an error somewhere. + var closers []io.Closer { lastFn, k, err := LastCheckpoint(w.Dir()) if err != nil && err != ErrNotFound { @@ -126,6 +130,7 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo return nil, errors.Wrap(err, "open last checkpoint") } defer last.Close() + closers = append(closers, last) sr = last } @@ -134,6 +139,7 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo return nil, errors.Wrap(err, "create segment reader") } defer segsr.Close() + closers = append(closers, segsr) if sr != nil { sr = io.MultiReader(sr, segsr) @@ -263,6 +269,9 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo if err := fileutil.Replace(cpdirtmp, cpdir); err != nil { return nil, errors.Wrap(err, "rename checkpoint directory") } + if err := closeAll(closers...); err != nil { + return stats, errors.Wrap(err, "close opened files") + } if err := w.Truncate(n + 1); err != nil { // If truncating fails, we'll just try again at the next checkpoint. // Leftover segments will just be ignored in the future if there's a checkpoint diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 0a033e5a..cd24ad1f 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -145,7 +145,9 @@ func (b *writeBenchmark) run() { if err := b.storage.Close(); err != nil { exitWithError(err) } - b.stopProfiling() + if err := b.stopProfiling(); err != nil { + exitWithError(err) + } }) } @@ -248,7 +250,9 @@ func (b *writeBenchmark) startProfiling() { if err != nil { exitWithError(fmt.Errorf("bench: could not create cpu profile: %v", err)) } - pprof.StartCPUProfile(b.cpuprof) + if err := pprof.StartCPUProfile(b.cpuprof); err != nil { + exitWithError(fmt.Errorf("bench: could not start CPU profile: %v", err)) + } // Start memory profiling. b.memprof, err = os.Create(filepath.Join(b.outPath, "mem.prof")) @@ -271,29 +275,36 @@ func (b *writeBenchmark) startProfiling() { runtime.SetMutexProfileFraction(20) } -func (b *writeBenchmark) stopProfiling() { +func (b *writeBenchmark) stopProfiling() error { if b.cpuprof != nil { pprof.StopCPUProfile() b.cpuprof.Close() b.cpuprof = nil } if b.memprof != nil { - pprof.Lookup("heap").WriteTo(b.memprof, 0) + if err := pprof.Lookup("heap").WriteTo(b.memprof, 0); err != nil { + return fmt.Errorf("error writing mem profile: %v", err) + } b.memprof.Close() b.memprof = nil } if b.blockprof != nil { - pprof.Lookup("block").WriteTo(b.blockprof, 0) + if err := pprof.Lookup("block").WriteTo(b.blockprof, 0); err != nil { + return fmt.Errorf("error writing block profile: %v", err) + } b.blockprof.Close() b.blockprof = nil runtime.SetBlockProfileRate(0) } if b.mtxprof != nil { - pprof.Lookup("mutex").WriteTo(b.mtxprof, 0) + if err := pprof.Lookup("mutex").WriteTo(b.mtxprof, 0); err != nil { + return fmt.Errorf("error writing mutex profile: %v", err) + } b.mtxprof.Close() b.mtxprof = nil runtime.SetMutexProfileFraction(0) } + return nil } func measureTime(stage string, f func()) time.Duration { diff --git a/compact.go b/compact.go index d4a2dd30..6dc55295 100644 --- a/compact.go +++ b/compact.go @@ -99,7 +99,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { Buckets: prometheus.ExponentialBuckets(1, 2, 10), }) m.chunkSize = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "prometheus_tsdb_compaction_chunk_size", + Name: "prometheus_tsdb_compaction_chunk_size_bytes", Help: "Final size of chunks on their first compaction", Buckets: prometheus.ExponentialBuckets(32, 1.5, 12), }) @@ -109,7 +109,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { Buckets: prometheus.ExponentialBuckets(4, 1.5, 12), }) m.chunkRange = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "prometheus_tsdb_compaction_chunk_range", + Name: "prometheus_tsdb_compaction_chunk_range_seconds", Help: "Final time range of chunks on their first compaction", Buckets: prometheus.ExponentialBuckets(100, 4, 10), }) @@ -469,6 +469,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe if err != nil { return errors.Wrap(err, "open chunk writer") } + defer chunkw.Close() // Record written chunk sizes on level 1 compactions. if meta.Compaction.Level == 1 { chunkw = &instrumentedChunkWriter{ @@ -483,11 +484,15 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe if err != nil { return errors.Wrap(err, "open index writer") } + defer indexw.Close() if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil { return errors.Wrap(err, "write compaction") } - + // We are explicitly closing them here to check for error even + // though these are covered under defer. This is because in Windows, + // you cannot delete these unless they are closed and the defer is to + // make sure they are closed if the function exits due to an error above. if err = chunkw.Close(); err != nil { return errors.Wrap(err, "close chunk writer") } @@ -651,7 +656,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } for _, chk := range chks { - c.chunkPool.Put(chk.Chunk) + if err := c.chunkPool.Put(chk.Chunk); err != nil { + return errors.Wrap(err, "put chunk") + } } for _, l := range lset { diff --git a/db.go b/db.go index 66407a2b..b8bf3531 100644 --- a/db.go +++ b/db.go @@ -119,11 +119,13 @@ type DB struct { type dbMetrics struct { loadedBlocks prometheus.GaugeFunc + symbolTableSize prometheus.GaugeFunc reloads prometheus.Counter reloadsFailed prometheus.Counter compactionsTriggered prometheus.Counter cutoffs prometheus.Counter cutoffsFailed prometheus.Counter + startTime prometheus.GaugeFunc tombCleanTimer prometheus.Histogram } @@ -138,6 +140,19 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { defer db.mtx.RUnlock() return float64(len(db.blocks)) }) + m.symbolTableSize = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_symbol_table_size_bytes", + Help: "Size of symbol table on disk (in bytes)", + }, func() float64 { + db.mtx.RLock() + blocks := db.blocks[:] + db.mtx.RUnlock() + symTblSize := uint64(0) + for _, b := range blocks { + symTblSize += b.GetSymbolTableSize() + } + return float64(symTblSize) + }) m.reloads = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_reloads_total", Help: "Number of times the database reloaded block data from disk.", @@ -158,6 +173,17 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { Name: "prometheus_tsdb_retention_cutoffs_failures_total", Help: "Number of times the database failed to cut off block data from disk.", }) + m.startTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_lowest_timestamp", + Help: "Lowest timestamp value stored in the database.", + }, func() float64 { + db.mtx.RLock() + defer db.mtx.RUnlock() + if len(db.blocks) == 0 { + return float64(db.head.minTime) + } + return float64(db.blocks[0].meta.MinTime) + }) m.tombCleanTimer = prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "prometheus_tsdb_tombstone_cleanup_seconds", Help: "The time taken to recompact blocks to remove tombstones.", @@ -166,11 +192,13 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { if r != nil { r.MustRegister( m.loadedBlocks, + m.symbolTableSize, m.reloads, m.reloadsFailed, m.cutoffs, m.cutoffsFailed, m.compactionsTriggered, + m.startTime, m.tombCleanTimer, ) } @@ -192,7 +220,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db if err := repairBadIndexVersion(l, dir); err != nil { return nil, err } - // Migrate old WAL. + // Migrate old WAL if one exists. if err := MigrateWAL(l, filepath.Join(dir, "wal")); err != nil { return nil, errors.Wrap(err, "migrate WAL") } @@ -272,7 +300,7 @@ func (db *DB) run() { case <-db.compactc: db.metrics.compactionsTriggered.Inc() - _, err := db.compact() + err := db.compact() if err != nil { level.Error(db.logger).Log("msg", "compaction failed", "err", err) backoff = exponential(backoff, 1*time.Second, 1*time.Minute) @@ -338,12 +366,12 @@ func (a dbAppender) Commit() error { // this is sufficient to reliably delete old data. // Old blocks are only deleted on reload based on the new block's parent information. // See DB.reload documentation for further information. -func (db *DB) compact() (changes bool, err error) { +func (db *DB) compact() (err error) { db.cmtx.Lock() defer db.cmtx.Unlock() if !db.compactionsEnabled { - return false, nil + return nil } // Check whether we have pending head blocks that are ready to be persisted. @@ -351,7 +379,7 @@ func (db *DB) compact() (changes bool, err error) { for { select { case <-db.stopc: - return changes, nil + return nil default: } // The head has a compactable range if 1.5 level 0 ranges are between the oldest @@ -374,14 +402,13 @@ func (db *DB) compact() (changes bool, err error) { maxt: maxt - 1, } if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil { - return changes, errors.Wrap(err, "persist head block") + return errors.Wrap(err, "persist head block") } - changes = true runtime.GC() if err := db.reload(); err != nil { - return changes, errors.Wrap(err, "reload blocks") + return errors.Wrap(err, "reload blocks") } db.mtx.RLock() l := len(db.blocks) @@ -408,7 +435,7 @@ func (db *DB) compact() (changes bool, err error) { for { plan, err := db.compactor.Plan(db.dir) if err != nil { - return changes, errors.Wrap(err, "plan compaction") + return errors.Wrap(err, "plan compaction") } if len(plan) == 0 { break @@ -416,23 +443,22 @@ func (db *DB) compact() (changes bool, err error) { select { case <-db.stopc: - return changes, nil + return nil default: } if _, err := db.compactor.Compact(db.dir, plan...); err != nil { - return changes, errors.Wrapf(err, "compact %s", plan) + return errors.Wrapf(err, "compact %s", plan) } - changes = true runtime.GC() if err := db.reload(); err != nil { - return changes, errors.Wrap(err, "reload blocks") + return errors.Wrap(err, "reload blocks") } runtime.GC() } - return changes, nil + return nil } func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { diff --git a/db_test.go b/db_test.go index 83e30dcf..85a58431 100644 --- a/db_test.go +++ b/db_test.go @@ -110,7 +110,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) { testutil.Ok(t, err) seriesSet := query(t, querier, labels.NewEqualMatcher("foo", "bar")) - testutil.Equals(t, seriesSet, map[string][]sample{}) + testutil.Equals(t, map[string][]sample{}, seriesSet) testutil.Ok(t, querier.Close()) err = app.Commit() @@ -122,7 +122,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) { seriesSet = query(t, querier, labels.NewEqualMatcher("foo", "bar")) - testutil.Equals(t, seriesSet, map[string][]sample{`{foo="bar"}`: {{t: 0, v: 0}}}) + testutil.Equals(t, map[string][]sample{`{foo="bar"}`: {{t: 0, v: 0}}}, seriesSet) } func TestDataNotAvailableAfterRollback(t *testing.T) { @@ -143,7 +143,7 @@ func TestDataNotAvailableAfterRollback(t *testing.T) { seriesSet := query(t, querier, labels.NewEqualMatcher("foo", "bar")) - testutil.Equals(t, seriesSet, map[string][]sample{}) + testutil.Equals(t, map[string][]sample{}, seriesSet) } func TestDBAppenderAddRef(t *testing.T) { @@ -179,7 +179,7 @@ func TestDBAppenderAddRef(t *testing.T) { testutil.Ok(t, err) err = app2.AddFast(9999999, 1, 1) - testutil.Equals(t, errors.Cause(err), ErrNotFound) + testutil.Equals(t, ErrNotFound, errors.Cause(err)) testutil.Ok(t, app2.Commit()) @@ -412,7 +412,7 @@ func TestDB_Snapshot(t *testing.T) { testutil.Ok(t, series.Err()) } testutil.Ok(t, seriesSet.Err()) - testutil.Equals(t, sum, 1000.0) + testutil.Equals(t, 1000.0, sum) } func TestDB_SnapshotWithDelete(t *testing.T) { @@ -689,7 +689,7 @@ func TestWALFlushedOnDBClose(t *testing.T) { values, err := q.LabelValues("labelname") testutil.Ok(t, err) - testutil.Equals(t, values, []string{"labelvalue"}) + testutil.Equals(t, []string{"labelvalue"}, values) } func TestTombstoneClean(t *testing.T) { @@ -859,6 +859,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6 } block := createEmptyBlock(c.t, filepath.Join(dest, meta.ULID.String()), meta) + testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere. c.blocks = append(c.blocks, block) // Now check that all expected blocks are actually persisted on disk. @@ -1130,7 +1131,7 @@ func TestChunkAtBlockBoundary(t *testing.T) { err := app.Commit() testutil.Ok(t, err) - _, err = db.compact() + err = db.compact() testutil.Ok(t, err) for _, block := range db.blocks { @@ -1182,7 +1183,7 @@ func TestQuerierWithBoundaryChunks(t *testing.T) { err := app.Commit() testutil.Ok(t, err) - _, err = db.compact() + err = db.compact() testutil.Ok(t, err) testutil.Assert(t, len(db.blocks) >= 3, "invalid test, less than three blocks in DB") diff --git a/fileutil/fileutil.go b/fileutil/fileutil.go index 2158bfd2..1154e730 100644 --- a/fileutil/fileutil.go +++ b/fileutil/fileutil.go @@ -48,7 +48,7 @@ func Rename(from, to string) error { // It is not atomic. func Replace(from, to string) error { if err := os.RemoveAll(to); err != nil { - return nil + return err } if err := os.Rename(from, to); err != nil { return err diff --git a/head.go b/head.go index 4f0c2c95..bc8cdfbe 100644 --- a/head.go +++ b/head.go @@ -82,8 +82,8 @@ type headMetrics struct { seriesRemoved prometheus.Counter seriesNotFound prometheus.Counter chunks prometheus.Gauge - chunksCreated prometheus.Gauge - chunksRemoved prometheus.Gauge + chunksCreated prometheus.Counter + chunksRemoved prometheus.Counter gcDuration prometheus.Summary minTime prometheus.GaugeFunc maxTime prometheus.GaugeFunc @@ -102,27 +102,27 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { Name: "prometheus_tsdb_head_series", Help: "Total number of series in the head block.", }) - m.seriesCreated = prometheus.NewGauge(prometheus.GaugeOpts{ + m.seriesCreated = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_head_series_created_total", Help: "Total number of series created in the head", }) - m.seriesRemoved = prometheus.NewGauge(prometheus.GaugeOpts{ + m.seriesRemoved = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_head_series_removed_total", Help: "Total number of series removed in the head", }) m.seriesNotFound = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_head_series_not_found", + Name: "prometheus_tsdb_head_series_not_found_total", Help: "Total number of requests for series that were not found.", }) m.chunks = prometheus.NewGauge(prometheus.GaugeOpts{ Name: "prometheus_tsdb_head_chunks", Help: "Total number of chunks in the head block.", }) - m.chunksCreated = prometheus.NewGauge(prometheus.GaugeOpts{ + m.chunksCreated = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_head_chunks_created_total", Help: "Total number of chunks created in the head", }) - m.chunksRemoved = prometheus.NewGauge(prometheus.GaugeOpts{ + m.chunksRemoved = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_head_chunks_removed_total", Help: "Total number of chunks removed in the head", }) @@ -620,21 +620,22 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro } func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { - s := a.head.series.getByID(ref) + if t < a.minValidTime { + return ErrOutOfBounds + } + s := a.head.series.getByID(ref) if s == nil { return errors.Wrap(ErrNotFound, "unknown series") } s.Lock() - err := s.appendable(t, v) - s.Unlock() - - if err != nil { + if err := s.appendable(t, v); err != nil { + s.Unlock() return err } - if t < a.minValidTime { - return ErrOutOfBounds - } + s.pendingCommit = true + s.Unlock() + if t < a.mint { a.mint = t } @@ -694,6 +695,7 @@ func (a *headAppender) Commit() error { for _, s := range a.samples { s.series.Lock() ok, chunkCreated := s.series.append(s.T, s.V) + s.series.pendingCommit = false s.series.Unlock() if !ok { @@ -713,6 +715,11 @@ func (a *headAppender) Commit() error { func (a *headAppender) Rollback() error { a.head.metrics.activeAppenders.Dec() + for _, s := range a.samples { + s.series.Lock() + s.series.pendingCommit = false + s.series.Unlock() + } a.head.putAppendBuffer(a.samples) // Series are created in the head memory regardless of rollback. Thus we have @@ -786,7 +793,7 @@ func (h *Head) gc() { symbols := make(map[string]struct{}) values := make(map[string]stringset, len(h.values)) - h.postings.Iter(func(t labels.Label, _ index.Postings) error { + if err := h.postings.Iter(func(t labels.Label, _ index.Postings) error { symbols[t.Name] = struct{}{} symbols[t.Value] = struct{}{} @@ -797,7 +804,10 @@ func (h *Head) gc() { } ss.set(t.Value) return nil - }) + }); err != nil { + // This should never happen, as the iteration function only returns nil. + panic(err) + } h.symMtx.Lock() @@ -1165,7 +1175,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) { series.Lock() rmChunks += series.truncateChunksBefore(mint) - if len(series.chunks) > 0 { + if len(series.chunks) > 0 || series.pendingCommit { series.Unlock() continue } @@ -1256,9 +1266,10 @@ type memSeries struct { chunkRange int64 firstChunkID int - nextAt int64 // timestamp at which to cut the next chunk. - lastValue float64 - sampleBuf [4]sample + nextAt int64 // Timestamp at which to cut the next chunk. + lastValue float64 + sampleBuf [4]sample + pendingCommit bool // Whether there are samples waiting to be committed to this series. app chunkenc.Appender // Current appender for the chunk. } diff --git a/head_test.go b/head_test.go index b06a66c2..76a8b72e 100644 --- a/head_test.go +++ b/head_test.go @@ -17,6 +17,7 @@ import ( "io/ioutil" "math/rand" "os" + "sort" "testing" "github.com/prometheus/tsdb/chunkenc" @@ -28,13 +29,11 @@ import ( ) func BenchmarkCreateSeries(b *testing.B) { - lbls, err := labels.ReadLabels("testdata/all.series", b.N) + lbls, err := labels.ReadLabels("testdata/20kseries.json", b.N) testutil.Ok(b, err) h, err := NewHead(nil, nil, nil, 10000) - if err != nil { - testutil.Ok(b, err) - } + testutil.Ok(b, err) defer h.Close() b.ReportAllocs() @@ -386,231 +385,203 @@ Outer: } } -// func TestDeleteUntilCurMax(t *testing.T) { -// numSamples := int64(10) - -// dir, _ := ioutil.TempDir("", "test") -// defer os.RemoveAll(dir) - -// hb := createTestHead(t, dir, 0, 2*numSamples) -// app := hb.Appender() - -// smpls := make([]float64, numSamples) -// for i := int64(0); i < numSamples; i++ { -// smpls[i] = rand.Float64() -// app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) -// } - -// testutil.Ok(t, app.Commit()) -// testutil.Ok(t, hb.Delete(0, 10000, labels.NewEqualMatcher("a", "b"))) -// app = hb.Appender() -// _, err := app.Add(labels.Labels{{"a", "b"}}, 11, 1) -// testutil.Ok(t, err) -// testutil.Ok(t, app.Commit()) - -// q := hb.Querier(0, 100000) -// res := q.Select(labels.NewEqualMatcher("a", "b")) - -// require.True(t, res.Next()) -// exps := res.At() -// it := exps.Iterator() -// ressmpls, err := expandSeriesIterator(it) -// testutil.Ok(t, err) -// testutil.Equals(t, []sample{{11, 1}}, ressmpls) -// } - -// func TestDelete_e2e(t *testing.T) { -// numDatapoints := 1000 -// numRanges := 1000 -// timeInterval := int64(2) -// maxTime := int64(2 * 1000) -// minTime := int64(200) -// // Create 8 series with 1000 data-points of different ranges, delete and run queries. -// lbls := [][]labels.Label{ -// { -// {"a", "b"}, -// {"instance", "localhost:9090"}, -// {"job", "prometheus"}, -// }, -// { -// {"a", "b"}, -// {"instance", "127.0.0.1:9090"}, -// {"job", "prometheus"}, -// }, -// { -// {"a", "b"}, -// {"instance", "127.0.0.1:9090"}, -// {"job", "prom-k8s"}, -// }, -// { -// {"a", "b"}, -// {"instance", "localhost:9090"}, -// {"job", "prom-k8s"}, -// }, -// { -// {"a", "c"}, -// {"instance", "localhost:9090"}, -// {"job", "prometheus"}, -// }, -// { -// {"a", "c"}, -// {"instance", "127.0.0.1:9090"}, -// {"job", "prometheus"}, -// }, -// { -// {"a", "c"}, -// {"instance", "127.0.0.1:9090"}, -// {"job", "prom-k8s"}, -// }, -// { -// {"a", "c"}, -// {"instance", "localhost:9090"}, -// {"job", "prom-k8s"}, -// }, -// } - -// seriesMap := map[string][]sample{} -// for _, l := range lbls { -// seriesMap[labels.New(l...).String()] = []sample{} -// } - -// dir, _ := ioutil.TempDir("", "test") -// defer os.RemoveAll(dir) - -// hb := createTestHead(t, dir, minTime, maxTime) -// app := hb.Appender() - -// for _, l := range lbls { -// ls := labels.New(l...) -// series := []sample{} - -// ts := rand.Int63n(300) -// for i := 0; i < numDatapoints; i++ { -// v := rand.Float64() -// if ts >= minTime && ts <= maxTime { -// series = append(series, sample{ts, v}) -// } - -// _, err := app.Add(ls, ts, v) -// if ts >= minTime && ts <= maxTime { -// testutil.Ok(t, err) -// } else { -// testutil.EqualsError(t, err, ErrOutOfBounds.Error()) -// } - -// ts += rand.Int63n(timeInterval) + 1 -// } - -// seriesMap[labels.New(l...).String()] = series -// } - -// testutil.Ok(t, app.Commit()) - -// // Delete a time-range from each-selector. -// dels := []struct { -// ms []labels.Matcher -// drange Intervals -// }{ -// { -// ms: []labels.Matcher{labels.NewEqualMatcher("a", "b")}, -// drange: Intervals{{300, 500}, {600, 670}}, -// }, -// { -// ms: []labels.Matcher{ -// labels.NewEqualMatcher("a", "b"), -// labels.NewEqualMatcher("job", "prom-k8s"), -// }, -// drange: Intervals{{300, 500}, {100, 670}}, -// }, -// { -// ms: []labels.Matcher{ -// labels.NewEqualMatcher("a", "c"), -// labels.NewEqualMatcher("instance", "localhost:9090"), -// labels.NewEqualMatcher("job", "prometheus"), -// }, -// drange: Intervals{{300, 400}, {100, 6700}}, -// }, -// // TODO: Add Regexp Matchers. -// } - -// for _, del := range dels { -// // Reset the deletes everytime. -// writeTombstoneFile(hb.dir, newEmptyTombstoneReader()) -// hb.tombstones = newEmptyTombstoneReader() - -// for _, r := range del.drange { -// testutil.Ok(t, hb.Delete(r.Mint, r.Maxt, del.ms...)) -// } - -// matched := labels.Slice{} -// for _, ls := range lbls { -// s := labels.Selector(del.ms) -// if s.Matches(ls) { -// matched = append(matched, ls) -// } -// } - -// sort.Sort(matched) - -// for i := 0; i < numRanges; i++ { -// mint := rand.Int63n(200) -// maxt := mint + rand.Int63n(timeInterval*int64(numDatapoints)) - -// q := hb.Querier(mint, maxt) -// ss := q.Select(del.ms...) - -// // Build the mockSeriesSet. -// matchedSeries := make([]Series, 0, len(matched)) -// for _, m := range matched { -// smpls := boundedSamples(seriesMap[m.String()], mint, maxt) -// smpls = deletedSamples(smpls, del.drange) - -// // Only append those series for which samples exist as mockSeriesSet -// // doesn't skip series with no samples. -// // TODO: But sometimes SeriesSet returns an empty SeriesIterator -// if len(smpls) > 0 { -// matchedSeries = append(matchedSeries, newSeries( -// m.Map(), -// smpls, -// )) -// } -// } -// expSs := newListSeriesSet(matchedSeries) - -// // Compare both SeriesSets. -// for { -// eok, rok := expSs.Next(), ss.Next() - -// // Skip a series if iterator is empty. -// if rok { -// for !ss.At().Iterator().Next() { -// rok = ss.Next() -// if !rok { -// break -// } -// } -// } -// testutil.Equals(t, eok, rok, "next") - -// if !eok { -// break -// } -// sexp := expSs.At() -// sres := ss.At() - -// testutil.Equals(t, sexp.Labels(), sres.Labels(), "labels") - -// smplExp, errExp := expandSeriesIterator(sexp.Iterator()) -// smplRes, errRes := expandSeriesIterator(sres.Iterator()) - -// testutil.Equals(t, errExp, errRes, "samples error") -// testutil.Equals(t, smplExp, smplRes, "samples") -// } -// } -// } - -// return -// } +func TestDeleteUntilCurMax(t *testing.T) { + numSamples := int64(10) + hb, err := NewHead(nil, nil, nil, 1000000) + testutil.Ok(t, err) + app := hb.Appender() + smpls := make([]float64, numSamples) + for i := int64(0); i < numSamples; i++ { + smpls[i] = rand.Float64() + _, err := app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) + testutil.Ok(t, err) + } + testutil.Ok(t, app.Commit()) + testutil.Ok(t, hb.Delete(0, 10000, labels.NewEqualMatcher("a", "b"))) + + // Test the series have been deleted. + q, err := NewBlockQuerier(hb, 0, 100000) + testutil.Ok(t, err) + res, err := q.Select(labels.NewEqualMatcher("a", "b")) + testutil.Ok(t, err) + testutil.Assert(t, !res.Next(), "series didn't get deleted") + + // Add again and test for presence. + app = hb.Appender() + _, err = app.Add(labels.Labels{{"a", "b"}}, 11, 1) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + q, err = NewBlockQuerier(hb, 0, 100000) + testutil.Ok(t, err) + res, err = q.Select(labels.NewEqualMatcher("a", "b")) + testutil.Ok(t, err) + testutil.Assert(t, res.Next(), "series don't exist") + exps := res.At() + it := exps.Iterator() + ressmpls, err := expandSeriesIterator(it) + testutil.Ok(t, err) + testutil.Equals(t, []sample{{11, 1}}, ressmpls) +} +func TestDelete_e2e(t *testing.T) { + numDatapoints := 1000 + numRanges := 1000 + timeInterval := int64(2) + // Create 8 series with 1000 data-points of different ranges, delete and run queries. + lbls := [][]labels.Label{ + { + {"a", "b"}, + {"instance", "localhost:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "b"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "b"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prom-k8s"}, + }, + { + {"a", "b"}, + {"instance", "localhost:9090"}, + {"job", "prom-k8s"}, + }, + { + {"a", "c"}, + {"instance", "localhost:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "c"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prometheus"}, + }, + { + {"a", "c"}, + {"instance", "127.0.0.1:9090"}, + {"job", "prom-k8s"}, + }, + { + {"a", "c"}, + {"instance", "localhost:9090"}, + {"job", "prom-k8s"}, + }, + } + seriesMap := map[string][]sample{} + for _, l := range lbls { + seriesMap[labels.New(l...).String()] = []sample{} + } + dir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(dir) + hb, err := NewHead(nil, nil, nil, 100000) + testutil.Ok(t, err) + app := hb.Appender() + for _, l := range lbls { + ls := labels.New(l...) + series := []sample{} + ts := rand.Int63n(300) + for i := 0; i < numDatapoints; i++ { + v := rand.Float64() + _, err := app.Add(ls, ts, v) + testutil.Ok(t, err) + series = append(series, sample{ts, v}) + ts += rand.Int63n(timeInterval) + 1 + } + seriesMap[labels.New(l...).String()] = series + } + testutil.Ok(t, app.Commit()) + // Delete a time-range from each-selector. + dels := []struct { + ms []labels.Matcher + drange Intervals + }{ + { + ms: []labels.Matcher{labels.NewEqualMatcher("a", "b")}, + drange: Intervals{{300, 500}, {600, 670}}, + }, + { + ms: []labels.Matcher{ + labels.NewEqualMatcher("a", "b"), + labels.NewEqualMatcher("job", "prom-k8s"), + }, + drange: Intervals{{300, 500}, {100, 670}}, + }, + { + ms: []labels.Matcher{ + labels.NewEqualMatcher("a", "c"), + labels.NewEqualMatcher("instance", "localhost:9090"), + labels.NewEqualMatcher("job", "prometheus"), + }, + drange: Intervals{{300, 400}, {100, 6700}}, + }, + // TODO: Add Regexp Matchers. + } + for _, del := range dels { + // Reset the deletes everytime. + hb.tombstones = NewMemTombstones() + for _, r := range del.drange { + testutil.Ok(t, hb.Delete(r.Mint, r.Maxt, del.ms...)) + } + matched := labels.Slice{} + for _, ls := range lbls { + s := labels.Selector(del.ms) + if s.Matches(ls) { + matched = append(matched, ls) + } + } + sort.Sort(matched) + for i := 0; i < numRanges; i++ { + q, err := NewBlockQuerier(hb, 0, 100000) + testutil.Ok(t, err) + defer q.Close() + ss, err := q.Select(del.ms...) + testutil.Ok(t, err) + // Build the mockSeriesSet. + matchedSeries := make([]Series, 0, len(matched)) + for _, m := range matched { + smpls := seriesMap[m.String()] + smpls = deletedSamples(smpls, del.drange) + // Only append those series for which samples exist as mockSeriesSet + // doesn't skip series with no samples. + // TODO: But sometimes SeriesSet returns an empty SeriesIterator + if len(smpls) > 0 { + matchedSeries = append(matchedSeries, newSeries( + m.Map(), + smpls, + )) + } + } + expSs := newListSeriesSet(matchedSeries) + // Compare both SeriesSets. + for { + eok, rok := expSs.Next(), ss.Next() + // Skip a series if iterator is empty. + if rok { + for !ss.At().Iterator().Next() { + rok = ss.Next() + if !rok { + break + } + } + } + testutil.Equals(t, eok, rok) + if !eok { + break + } + sexp := expSs.At() + sres := ss.At() + testutil.Equals(t, sexp.Labels(), sres.Labels()) + smplExp, errExp := expandSeriesIterator(sexp.Iterator()) + smplRes, errRes := expandSeriesIterator(sres.Iterator()) + testutil.Equals(t, errExp, errRes) + testutil.Equals(t, smplExp, smplRes) + } + } + } + return +} func boundedSamples(full []sample, mint, maxt int64) []sample { for len(full) > 0 { @@ -808,6 +779,64 @@ func TestGCSeriesAccess(t *testing.T) { testutil.Equals(t, ErrNotFound, err) } +func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { + h, err := NewHead(nil, nil, nil, 1000) + testutil.Ok(t, err) + defer h.Close() + + h.initTime(0) + + app := h.appender() + lset := labels.FromStrings("a", "1") + _, err = app.Add(lset, 2100, 1) + testutil.Ok(t, err) + + testutil.Ok(t, h.Truncate(2000)) + testutil.Assert(t, nil != h.series.getByHash(lset.Hash(), lset), "series should not have been garbage collected") + + testutil.Ok(t, app.Commit()) + + q, err := NewBlockQuerier(h, 1500, 2500) + testutil.Ok(t, err) + defer q.Close() + + ss, err := q.Select(labels.NewEqualMatcher("a", "1")) + testutil.Ok(t, err) + + testutil.Equals(t, true, ss.Next()) +} + +func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { + h, err := NewHead(nil, nil, nil, 1000) + testutil.Ok(t, err) + defer h.Close() + + h.initTime(0) + + app := h.appender() + lset := labels.FromStrings("a", "1") + _, err = app.Add(lset, 2100, 1) + testutil.Ok(t, err) + + testutil.Ok(t, h.Truncate(2000)) + testutil.Assert(t, nil != h.series.getByHash(lset.Hash(), lset), "series should not have been garbage collected") + + testutil.Ok(t, app.Rollback()) + + q, err := NewBlockQuerier(h, 1500, 2500) + testutil.Ok(t, err) + defer q.Close() + + ss, err := q.Select(labels.NewEqualMatcher("a", "1")) + testutil.Ok(t, err) + + testutil.Equals(t, false, ss.Next()) + + // Truncate again, this time the series should be deleted + testutil.Ok(t, h.Truncate(2050)) + testutil.Equals(t, (*memSeries)(nil), h.series.getByHash(lset.Hash(), lset)) +} + func TestHead_LogRollback(t *testing.T) { dir, err := ioutil.TempDir("", "wal_rollback") testutil.Ok(t, err) @@ -829,5 +858,5 @@ func TestHead_LogRollback(t *testing.T) { series, ok := recs[0].([]RefSeries) testutil.Assert(t, ok, "expected series record but got %+v", recs[0]) - testutil.Equals(t, series, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}) + testutil.Equals(t, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, series) } diff --git a/index/index.go b/index/index.go index c58ff6ea..c75796d7 100644 --- a/index/index.go +++ b/index/index.go @@ -271,7 +271,9 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta } // We add padding to 16 bytes to increase the addressable space we get through 4 byte // series references. - w.addPadding(16) + if err := w.addPadding(16); err != nil { + return errors.Errorf("failed to write padding bytes: %v", err) + } if w.pos%16 != 0 { return errors.Errorf("series write not 16-byte aligned at %d", w.pos) diff --git a/querier_test.go b/querier_test.go index fd8c7dec..3515df3e 100644 --- a/querier_test.go +++ b/querier_test.go @@ -1243,11 +1243,11 @@ func BenchmarkMergedSeriesSet(b *testing.B) { 100, 1000, 10000, - 100000, + 20000, } { for _, j := range []int{1, 2, 4, 8, 16, 32} { b.Run(fmt.Sprintf("series=%d,blocks=%d", k, j), func(b *testing.B) { - lbls, err := labels.ReadLabels("testdata/1m.series", k) + lbls, err := labels.ReadLabels("testdata/20kseries.json", k) testutil.Ok(b, err) sort.Sort(labels.Slice(lbls)) diff --git a/repair_test.go b/repair_test.go index c8097600..b5ecca61 100644 --- a/repair_test.go +++ b/repair_test.go @@ -2,10 +2,10 @@ package tsdb import ( "os" - "reflect" "testing" "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" @@ -49,47 +49,32 @@ func TestRepairBadIndexVersion(t *testing.T) { // In its current state, lookups should fail with the fixed code. const dir = "testdata/repair_index_version/01BZJ9WJQPWHGNC2W4J9TA62KC/" meta, err := readMetaFile(dir) - if err == nil { - t.Fatal("error expected but got none") - } + testutil.NotOk(t, err) // Touch chunks dir in block. os.MkdirAll(dir+"chunks", 0777) r, err := index.NewFileReader(dir + "index") - if err != nil { - t.Fatal(err) - } + testutil.Ok(t, err) p, err := r.Postings("b", "1") - if err != nil { - t.Fatal(err) - } + testutil.Ok(t, err) for p.Next() { t.Logf("next ID %d", p.At()) var lset labels.Labels - if err := r.Series(p.At(), &lset, nil); err == nil { - t.Fatal("expected error but got none") - } - } - if p.Err() != nil { - t.Fatal(err) + testutil.NotOk(t, r.Series(p.At(), &lset, nil)) } + testutil.Ok(t, p.Err()) + testutil.Ok(t, r.Close()) // On DB opening all blocks in the base dir should be repaired. db, err := Open("testdata/repair_index_version", nil, nil, nil) - if err != nil { - t.Fatal(err) - } + testutil.Ok(t, err) db.Close() r, err = index.NewFileReader(dir + "index") - if err != nil { - t.Fatal(err) - } + testutil.Ok(t, err) p, err = r.Postings("b", "1") - if err != nil { - t.Fatal(err) - } + testutil.Ok(t, err) res := []labels.Labels{} for p.Next() { @@ -97,26 +82,17 @@ func TestRepairBadIndexVersion(t *testing.T) { var lset labels.Labels var chks []chunks.Meta - if err := r.Series(p.At(), &lset, &chks); err != nil { - t.Fatal(err) - } + testutil.Ok(t, r.Series(p.At(), &lset, &chks)) res = append(res, lset) } - if p.Err() != nil { - t.Fatal(err) - } - if !reflect.DeepEqual(res, []labels.Labels{ + + testutil.Ok(t, p.Err()) + testutil.Equals(t, []labels.Labels{ {{"a", "1"}, {"b", "1"}}, {{"a", "2"}, {"b", "1"}}, - }) { - t.Fatalf("unexpected result %v", res) - } + }, res) meta, err = readMetaFile(dir) - if err != nil { - t.Fatal(err) - } - if meta.Version != 1 { - t.Fatalf("unexpected meta version %d", meta.Version) - } + testutil.Ok(t, err) + testutil.Assert(t, meta.Version == 1, "unexpected meta version %d", meta.Version) } diff --git a/tombstones.go b/tombstones.go index d4a3d0ef..ad820a05 100644 --- a/tombstones.go +++ b/tombstones.go @@ -16,12 +16,13 @@ package tsdb import ( "encoding/binary" "fmt" - "github.com/pkg/errors" "io" "io/ioutil" "os" "path/filepath" "sync" + + "github.com/pkg/errors" ) const tombstoneFilename = "tombstones" @@ -72,7 +73,7 @@ func writeTombstoneFile(dir string, tr TombstoneReader) error { mw := io.MultiWriter(f, hash) - tr.Iter(func(ref uint64, ivs Intervals) error { + if err := tr.Iter(func(ref uint64, ivs Intervals) error { for _, iv := range ivs { buf.reset() @@ -86,7 +87,9 @@ func writeTombstoneFile(dir string, tr TombstoneReader) error { } } return nil - }) + }); err != nil { + return fmt.Errorf("error writing tombstones: %v", err) + } _, err = f.Write(hash.Sum(nil)) if err != nil { diff --git a/wal.go b/wal.go index 972fdea3..59206d8d 100644 --- a/wal.go +++ b/wal.go @@ -723,6 +723,13 @@ func (w *SegmentWAL) run(interval time.Duration) { // Close syncs all data and closes the underlying resources. func (w *SegmentWAL) Close() error { + // Make sure you can call Close() multiple times. + select { + case <-w.stopc: + return nil // Already closed. + default: + } + close(w.stopc) <-w.donec @@ -735,10 +742,12 @@ func (w *SegmentWAL) Close() error { // On opening, a WAL must be fully consumed once. Afterwards // only the current segment will still be open. if hf := w.head(); hf != nil { - return errors.Wrapf(hf.Close(), "closing WAL head %s", hf.Name()) + if err := hf.Close(); err != nil { + return errors.Wrapf(err, "closing WAL head %s", hf.Name()) + } } - return w.dirFile.Close() + return errors.Wrapf(w.dirFile.Close(), "closing WAL dir %s", w.dirFile.Name()) } const ( @@ -1212,38 +1221,44 @@ func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error { return nil } -// MigrateWAL rewrites the deprecated write ahead log into the new format. -func MigrateWAL(logger log.Logger, dir string) (err error) { - if logger == nil { - logger = log.NewNopLogger() - } +func deprecatedWALExists(logger log.Logger, dir string) (bool, error) { // Detect whether we still have the old WAL. fns, err := sequenceFiles(dir) if err != nil && !os.IsNotExist(err) { - return errors.Wrap(err, "list sequence files") + return false, errors.Wrap(err, "list sequence files") } if len(fns) == 0 { - return nil // No WAL at all yet. + return false, nil // No WAL at all yet. } // Check header of first segment to see whether we are still dealing with an // old WAL. f, err := os.Open(fns[0]) if err != nil { - return errors.Wrap(err, "check first existing segment") + return false, errors.Wrap(err, "check first existing segment") } defer f.Close() var hdr [4]byte if _, err := f.Read(hdr[:]); err != nil && err != io.EOF { - return errors.Wrap(err, "read header from first segment") + return false, errors.Wrap(err, "read header from first segment") } // If we cannot read the magic header for segments of the old WAL, abort. // Either it's migrated already or there's a corruption issue with which // we cannot deal here anyway. Subsequent attempts to open the WAL will error in that case. if binary.BigEndian.Uint32(hdr[:]) != WALMagic { - return nil + return false, nil } + return true, nil +} +// MigrateWAL rewrites the deprecated write ahead log into the new format. +func MigrateWAL(logger log.Logger, dir string) (err error) { + if logger == nil { + logger = log.NewNopLogger() + } + if exists, err := deprecatedWALExists(logger, dir); err != nil || !exists { + return err + } level.Info(logger).Log("msg", "migrating WAL format") tmpdir := dir + ".tmp" @@ -1254,6 +1269,7 @@ func MigrateWAL(logger log.Logger, dir string) (err error) { if err != nil { return errors.Wrap(err, "open new WAL") } + // It should've already been closed as part of the previous finalization. // Do it once again in case of prior errors. defer func() { @@ -1300,6 +1316,12 @@ func MigrateWAL(logger log.Logger, dir string) (err error) { if err != nil { return errors.Wrap(err, "write new entries") } + // We explicitly close even when there is a defer for Windows to be + // able to delete it. The defer is in place to close it in-case there + // are errors above. + if err := w.Close(); err != nil { + return errors.Wrap(err, "close old WAL") + } if err := repl.Close(); err != nil { return errors.Wrap(err, "close new WAL") } diff --git a/wal/wal.go b/wal/wal.go index 0e95ba2a..aa52738f 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -255,15 +255,17 @@ Loop: // Repair attempts to repair the WAL based on the error. // It discards all data after the corruption. -func (w *WAL) Repair(err error) error { +func (w *WAL) Repair(origErr error) error { // We could probably have a mode that only discards torn records right around // the corruption to preserve as data much as possible. // But that's not generally applicable if the records have any kind of causality. // Maybe as an extra mode in the future if mid-WAL corruptions become // a frequent concern. + err := errors.Cause(origErr) // So that we can pick up errors even if wrapped. + cerr, ok := err.(*CorruptionErr) if !ok { - return errors.New("cannot handle error") + return errors.Wrap(origErr, "cannot handle error") } if cerr.Segment < 0 { return errors.New("corruption error does not specify position") @@ -283,6 +285,15 @@ func (w *WAL) Repair(err error) error { if s.n <= cerr.Segment { continue } + if w.segment.i == s.n { + // The active segment needs to be removed, + // close it first (Windows!). Can be closed safely + // as we set the current segment to repaired file + // below. + if err := w.segment.Close(); err != nil { + return errors.Wrap(err, "close active segment") + } + } if err := os.Remove(filepath.Join(w.dir, s.s)); err != nil { return errors.Wrap(err, "delete segment") } @@ -310,6 +321,7 @@ func (w *WAL) Repair(err error) error { return errors.Wrap(err, "open segment") } defer f.Close() + r := NewReader(bufio.NewReader(f)) for r.Next() { @@ -317,8 +329,14 @@ func (w *WAL) Repair(err error) error { return errors.Wrap(err, "insert record") } } - // We expect an error here, so nothing to handle. + // We expect an error here from r.Err(), so nothing to handle. + // We explicitly close even when there is a defer for Windows to be + // able to delete it. The defer is in place to close it in-case there + // are errors above. + if err := f.Close(); err != nil { + return errors.Wrap(err, "close corrupted file") + } if err := os.Remove(tmpfn); err != nil { return errors.Wrap(err, "delete corrupted segment") } diff --git a/wal/wal_test.go b/wal/wal_test.go index d1b724c7..72f46253 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -23,6 +23,7 @@ import ( "os" "testing" + "github.com/pkg/errors" "github.com/prometheus/tsdb/testutil" ) @@ -286,9 +287,15 @@ func TestWAL_Repair(t *testing.T) { for r.Next() { } testutil.NotOk(t, r.Err()) - + testutil.Ok(t, sr.Close()) testutil.Ok(t, w.Repair(r.Err())) + // See https://github.com/prometheus/prometheus/issues/4603 + // We need to close w.segment because it needs to be deleted. + // But this is to mainly artificially test Repair() again. + testutil.Ok(t, w.segment.Close()) + testutil.Ok(t, w.Repair(errors.Wrap(r.Err(), "err"))) + sr, err = NewSegmentsReader(dir) testutil.Ok(t, err) r = NewReader(sr) diff --git a/wal_test.go b/wal_test.go index b16680a9..e145188d 100644 --- a/wal_test.go +++ b/wal_test.go @@ -66,7 +66,7 @@ func TestSegmentWAL_cut(t *testing.T) { et, flag, b, err := newWALReader(nil, nil).entry(f) testutil.Ok(t, err) testutil.Equals(t, WALEntrySeries, et) - testutil.Equals(t, flag, byte(walSeriesSimple)) + testutil.Equals(t, byte(walSeriesSimple), flag) testutil.Equals(t, []byte("Hello World!!"), b) } }