From 86929113488c4160ade17f9f15dc2b2e8915d367 Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Thu, 6 Apr 2023 14:31:10 -0400 Subject: [PATCH] db: add Compact.Duration metric Add a new Compact.Duration metric that measures the cumulative time spent in compactions since Open. This may be used in combination with Metrics.Uptime to compute the effective compaction concurrency (#1934). Callers may compute effective compaction concurrency over an interval by measuring both Uptime and Compact.Duration at the beginning and end of the interval and subtracting. --- compaction.go | 29 +++++++++++++++++++++-------- compaction_picker_test.go | 5 +++-- compaction_test.go | 4 ++-- data_test.go | 3 ++- db.go | 10 ++++++++++ metrics.go | 3 +++ open.go | 3 ++- testdata/event_listener | 12 ++++++------ 8 files changed, 49 insertions(+), 20 deletions(-) diff --git a/compaction.go b/compaction.go index 330e5126bf..d7c34eddc7 100644 --- a/compaction.go +++ b/compaction.go @@ -555,6 +555,7 @@ type compaction struct { logger Logger version *version stats base.InternalIteratorStats + beganAt time.Time score float64 @@ -683,7 +684,7 @@ func (c *compaction) makeInfo(jobID int) CompactionInfo { return info } -func newCompaction(pc *pickedCompaction, opts *Options) *compaction { +func newCompaction(pc *pickedCompaction, opts *Options, beganAt time.Time) *compaction { c := &compaction{ kind: compactionKindDefault, cmp: pc.cmp, @@ -696,6 +697,7 @@ func newCompaction(pc *pickedCompaction, opts *Options) *compaction { largest: pc.largest, logger: opts.Logger, version: pc.version, + beganAt: beganAt, maxOutputFileSize: pc.maxOutputFileSize, maxOverlapBytes: pc.maxOverlapBytes, l0SublevelInfo: pc.l0SublevelInfo, @@ -727,7 +729,9 @@ func newCompaction(pc *pickedCompaction, opts *Options) *compaction { return c } -func newDeleteOnlyCompaction(opts *Options, cur *version, inputs []compactionLevel) *compaction { +func newDeleteOnlyCompaction( + opts *Options, cur *version, inputs []compactionLevel, beganAt time.Time, +) *compaction { c := &compaction{ kind: compactionKindDeleteOnly, cmp: opts.Comparer.Compare, @@ -736,6 +740,7 @@ func newDeleteOnlyCompaction(opts *Options, cur *version, inputs []compactionLev formatKey: opts.Comparer.FormatKey, logger: opts.Logger, version: cur, + beganAt: beganAt, inputs: inputs, } @@ -815,7 +820,9 @@ func adjustGrandparentOverlapBytesForFlush(c *compaction, flushingBytes uint64) } } -func newFlush(opts *Options, cur *version, baseLevel int, flushing flushableList) *compaction { +func newFlush( + opts *Options, cur *version, baseLevel int, flushing flushableList, beganAt time.Time, +) *compaction { c := &compaction{ kind: compactionKindFlush, cmp: opts.Comparer.Compare, @@ -824,6 +831,7 @@ func newFlush(opts *Options, cur *version, baseLevel int, flushing flushableList formatKey: opts.Comparer.FormatKey, logger: opts.Logger, version: cur, + beganAt: beganAt, inputs: []compactionLevel{{level: -1}, {level: 0}}, maxOutputFileSize: math.MaxUint64, maxOverlapBytes: math.MaxUint64, @@ -1662,6 +1670,11 @@ func (d *DB) removeInProgressCompaction(c *compaction, rollback bool) { } } delete(d.mu.compact.inProgress, c) + // Add this compaction's duration to the cumulative duration. We do this + // here to ensure it's atomic with the removal of the compaction from + // in-progress compactions. This ensures Metrics.Compact.Duration does not + // miss or double count a completing compaction's duration. + d.mu.compact.duration += d.timeNow().Sub(c.beganAt) l0InProgress := inProgressL0Compactions(d.getInProgressCompactionInfoLocked(c)) d.mu.versions.currentVersion().L0Sublevels.InitCompactingFileInfo(l0InProgress) @@ -1953,7 +1966,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { } c := newFlush(d.opts, d.mu.versions.currentVersion(), - d.mu.versions.picker.getBaseLevel(), d.mu.mem.queue[:n]) + d.mu.versions.picker.getBaseLevel(), d.mu.mem.queue[:n], d.timeNow()) d.addInProgressCompaction(c) jobID := d.mu.nextJobID @@ -2176,7 +2189,7 @@ func (d *DB) maybeScheduleCompactionPicker( d.mu.compact.deletionHints = unresolvedHints if len(inputs) > 0 { - c := newDeleteOnlyCompaction(d.opts, v, inputs) + c := newDeleteOnlyCompaction(d.opts, v, inputs, d.timeNow()) d.mu.compact.compactingCount++ d.addInProgressCompaction(c) go d.compact(c, nil) @@ -2188,7 +2201,7 @@ func (d *DB) maybeScheduleCompactionPicker( env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil) pc, retryLater := d.mu.versions.picker.pickManual(env, manual) if pc != nil { - c := newCompaction(pc, d.opts) + c := newCompaction(pc, d.opts, d.timeNow()) d.mu.compact.manual = d.mu.compact.manual[1:] d.mu.compact.compactingCount++ d.addInProgressCompaction(c) @@ -2215,7 +2228,7 @@ func (d *DB) maybeScheduleCompactionPicker( if pc == nil { break } - c := newCompaction(pc, d.opts) + c := newCompaction(pc, d.opts, d.timeNow()) d.mu.compact.compactingCount++ d.addInProgressCompaction(c) go d.compact(c, nil) @@ -2566,7 +2579,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) { d.mu.versions.incrementCompactions(c.kind, c.extraLevels) d.mu.versions.incrementCompactionBytes(-c.bytesWritten) - info.TotalDuration = d.timeNow().Sub(startTime) + info.TotalDuration = d.timeNow().Sub(c.beganAt) d.opts.EventListener.CompactionEnd(info) // Update the read state before deleting obsolete files because the diff --git a/compaction_picker_test.go b/compaction_picker_test.go index b10ab0c95c..863ec54bc1 100644 --- a/compaction_picker_test.go +++ b/compaction_picker_test.go @@ -12,6 +12,7 @@ import ( "strconv" "strings" "testing" + "time" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" @@ -530,7 +531,7 @@ func TestCompactionPickerL0(t *testing.T) { var result strings.Builder if pc != nil { checkClone(t, pc) - c := newCompaction(pc, opts) + c := newCompaction(pc, opts, time.Now()) fmt.Fprintf(&result, "L%d -> L%d\n", pc.startLevel.level, pc.outputLevel.level) fmt.Fprintf(&result, "L%d: %s\n", pc.startLevel.level, fileNums(pc.startLevel.files)) if !pc.outputLevel.files.Empty() { @@ -772,7 +773,7 @@ func TestCompactionPickerConcurrency(t *testing.T) { }) var result strings.Builder if pc != nil { - c := newCompaction(pc, opts) + c := newCompaction(pc, opts, time.Now()) fmt.Fprintf(&result, "L%d -> L%d\n", pc.startLevel.level, pc.outputLevel.level) fmt.Fprintf(&result, "L%d: %s\n", pc.startLevel.level, fileNums(pc.startLevel.files)) if !pc.outputLevel.files.Empty() { diff --git a/compaction_test.go b/compaction_test.go index 50176938ca..bbf661d876 100644 --- a/compaction_test.go +++ b/compaction_test.go @@ -525,7 +525,7 @@ func TestPickCompaction(t *testing.T) { vs.picker = &tc.picker pc, got := vs.picker.pickAuto(compactionEnv{}), "" if pc != nil { - c := newCompaction(pc, opts) + c := newCompaction(pc, opts, time.Now()) got0 := fileNums(c.startLevel.files) got1 := fileNums(c.outputLevel.files) got2 := fileNums(c.grandparents) @@ -1852,7 +1852,7 @@ func TestCompactionOutputLevel(t *testing.T) { d.ScanArgs(t, "start", &start) d.ScanArgs(t, "base", &base) pc := newPickedCompaction(opts, version, start, defaultOutputLevel(start, base), base) - c := newCompaction(pc, opts) + c := newCompaction(pc, opts, time.Now()) return fmt.Sprintf("output=%d\nmax-output-file-size=%d\n", c.outputLevel.level, c.maxOutputFileSize) diff --git a/data_test.go b/data_test.go index 6118fd30d1..18e4beafd7 100644 --- a/data_test.go +++ b/data_test.go @@ -14,6 +14,7 @@ import ( "strconv" "strings" "testing" + "time" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" @@ -850,7 +851,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) { flushed: make(chan struct{}), }} c := newFlush(d.opts, d.mu.versions.currentVersion(), - d.mu.versions.picker.getBaseLevel(), toFlush) + d.mu.versions.picker.getBaseLevel(), toFlush, time.Now()) c.disableSpanElision = true // NB: define allows the test to exactly specify which keys go // into which sstables. If the test has a small target file diff --git a/db.go b/db.go index 2aafd6c6b1..475a4649bd 100644 --- a/db.go +++ b/db.go @@ -413,6 +413,9 @@ type DB struct { // compactions which we might have to perform. readCompactions readCompactionQueue + // The cumulative duration of all completed compactions since Open. + // Does not include flushes. + duration time.Duration // Flush throughput metric. flushWriteThroughput ThroughputMetric // The idle start time for the flush "loop", i.e., when the flushing @@ -1718,6 +1721,13 @@ func (d *DB) Metrics() *Metrics { metrics.Compact.InProgressBytes = d.mu.versions.atomicInProgressBytes.Load() metrics.Compact.NumInProgress = int64(d.mu.compact.compactingCount) metrics.Compact.MarkedFiles = vers.Stats.MarkedForCompaction + metrics.Compact.Duration = d.mu.compact.duration + for c := range d.mu.compact.inProgress { + if c.kind != compactionKindFlush { + metrics.Compact.Duration += d.timeNow().Sub(c.beganAt) + } + } + for _, m := range d.mu.mem.queue { metrics.MemTable.Size += m.totalBytes() } diff --git a/metrics.go b/metrics.go index 326e3d8b76..cd90d03515 100644 --- a/metrics.go +++ b/metrics.go @@ -179,6 +179,9 @@ type Metrics struct { // compaction. Such files are compacted in a rewrite compaction // when no other compactions are picked. MarkedFiles int + // Duration records the cumulative duration of all compactions since the + // database was opened. + Duration time.Duration } Flush struct { diff --git a/open.go b/open.go index cd5c91d562..ef6ac38fdb 100644 --- a/open.go +++ b/open.go @@ -713,7 +713,7 @@ func (d *DB) replayWAL( // TODO(bananabrick): See if we can use the actual base level here, // instead of using 1. c := newFlush(d.opts, d.mu.versions.currentVersion(), - 1 /* base level */, toFlush) + 1 /* base level */, toFlush, d.timeNow()) newVE, _, _, err := d.runCompaction(jobID, c) if err != nil { return errors.Wrapf(err, "running compaction during WAL replay") @@ -842,6 +842,7 @@ func (d *DB) replayWAL( d.opts, d.mu.versions.currentVersion(), 1, /* base level */ []*flushableEntry{entry}, + d.timeNow(), ) for _, file := range c.flushing[0].flushable.(*ingestedFlushable).files { ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: 0, Meta: file.FileMetadata}) diff --git a/testdata/event_listener b/testdata/event_listener index ed4fff0ffe..f9562b3be7 100644 --- a/testdata/event_listener +++ b/testdata/event_listener @@ -121,7 +121,7 @@ close: db/marker.manifest.000002.MANIFEST-000006 remove: db/marker.manifest.000001.MANIFEST-000001 sync: db [JOB 5] MANIFEST created 000006 -[JOB 5] flushed 1 memtable to L0 [000005] (770 B), in 1.0s (2.0s total), output rate 770 B/s +[JOB 5] flushed 1 memtable to L0 [000005] (770 B), in 1.0s (3.0s total), output rate 770 B/s compact ---- @@ -145,7 +145,7 @@ close: db/marker.manifest.000003.MANIFEST-000009 remove: db/marker.manifest.000002.MANIFEST-000006 sync: db [JOB 7] MANIFEST created 000009 -[JOB 7] flushed 1 memtable to L0 [000008] (770 B), in 1.0s (2.0s total), output rate 770 B/s +[JOB 7] flushed 1 memtable to L0 [000008] (770 B), in 1.0s (3.0s total), output rate 770 B/s remove: db/MANIFEST-000001 [JOB 7] MANIFEST deleted 000001 [JOB 8] compacting(default) L0 [000005 000008] (1.5 K) + L6 [] (0 B) @@ -162,7 +162,7 @@ close: db/marker.manifest.000004.MANIFEST-000011 remove: db/marker.manifest.000003.MANIFEST-000009 sync: db [JOB 8] MANIFEST created 000011 -[JOB 8] compacted(default) L0 [000005 000008] (1.5 K) + L6 [] (0 B) -> L6 [000010] (770 B), in 1.0s (2.0s total), output rate 770 B/s +[JOB 8] compacted(default) L0 [000005 000008] (1.5 K) + L6 [] (0 B) -> L6 [000010] (770 B), in 1.0s (4.0s total), output rate 770 B/s remove: db/000005.sst [JOB 8] sstable deleted 000005 remove: db/000008.sst @@ -195,7 +195,7 @@ close: db/marker.manifest.000005.MANIFEST-000014 remove: db/marker.manifest.000004.MANIFEST-000011 sync: db [JOB 10] MANIFEST created 000014 -[JOB 10] flushed 1 memtable to L0 [000013] (770 B), in 1.0s (2.0s total), output rate 770 B/s +[JOB 10] flushed 1 memtable to L0 [000013] (770 B), in 1.0s (3.0s total), output rate 770 B/s enable-file-deletions ---- @@ -282,7 +282,7 @@ sync-data: db/000022.sst close: db/000022.sst sync: db sync: db/MANIFEST-000016 -[JOB 17] flushed 1 memtable to L0 [000022] (770 B), in 1.0s (2.0s total), output rate 770 B/s +[JOB 17] flushed 1 memtable to L0 [000022] (770 B), in 1.0s (3.0s total), output rate 770 B/s [JOB 18] flushing 2 ingested tables create: db/MANIFEST-000023 close: db/MANIFEST-000016 @@ -292,7 +292,7 @@ close: db/marker.manifest.000007.MANIFEST-000023 remove: db/marker.manifest.000006.MANIFEST-000016 sync: db [JOB 18] MANIFEST created 000023 -[JOB 18] flushed 2 ingested flushables L0:000017 (826 B) + L6:000018 (826 B) in 1.0s (2.0s total), output rate 1.6 K/s +[JOB 18] flushed 2 ingested flushables L0:000017 (826 B) + L6:000018 (826 B) in 1.0s (3.0s total), output rate 1.6 K/s remove: db/MANIFEST-000014 [JOB 18] MANIFEST deleted 000014 [JOB 19] flushing 1 memtable to L0