Skip to content

Commit

Permalink
db: add Compact.Duration metric
Browse files Browse the repository at this point in the history
Add a new Compact.Duration metric that measures the cumulative time spent in
compactions since Open. This may be used in combination with Metrics.Uptime to
compute the effective compaction concurrency (cockroachdb#1934). Callers may compute
effective compaction concurrency over an interval by sampling both Uptime and
Compact.Duration at the beginning and end of the interval, then dividing the
change in Compact.Duration by the change in Uptime.
  • Loading branch information
jbowens committed Apr 6, 2023
1 parent c2f8804 commit 8692911
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 20 deletions.
29 changes: 21 additions & 8 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,7 @@ type compaction struct {
logger Logger
version *version
stats base.InternalIteratorStats
beganAt time.Time

score float64

Expand Down Expand Up @@ -683,7 +684,7 @@ func (c *compaction) makeInfo(jobID int) CompactionInfo {
return info
}

func newCompaction(pc *pickedCompaction, opts *Options) *compaction {
func newCompaction(pc *pickedCompaction, opts *Options, beganAt time.Time) *compaction {
c := &compaction{
kind: compactionKindDefault,
cmp: pc.cmp,
Expand All @@ -696,6 +697,7 @@ func newCompaction(pc *pickedCompaction, opts *Options) *compaction {
largest: pc.largest,
logger: opts.Logger,
version: pc.version,
beganAt: beganAt,
maxOutputFileSize: pc.maxOutputFileSize,
maxOverlapBytes: pc.maxOverlapBytes,
l0SublevelInfo: pc.l0SublevelInfo,
Expand Down Expand Up @@ -727,7 +729,9 @@ func newCompaction(pc *pickedCompaction, opts *Options) *compaction {
return c
}

func newDeleteOnlyCompaction(opts *Options, cur *version, inputs []compactionLevel) *compaction {
func newDeleteOnlyCompaction(
opts *Options, cur *version, inputs []compactionLevel, beganAt time.Time,
) *compaction {
c := &compaction{
kind: compactionKindDeleteOnly,
cmp: opts.Comparer.Compare,
Expand All @@ -736,6 +740,7 @@ func newDeleteOnlyCompaction(opts *Options, cur *version, inputs []compactionLev
formatKey: opts.Comparer.FormatKey,
logger: opts.Logger,
version: cur,
beganAt: beganAt,
inputs: inputs,
}

Expand Down Expand Up @@ -815,7 +820,9 @@ func adjustGrandparentOverlapBytesForFlush(c *compaction, flushingBytes uint64)
}
}

func newFlush(opts *Options, cur *version, baseLevel int, flushing flushableList) *compaction {
func newFlush(
opts *Options, cur *version, baseLevel int, flushing flushableList, beganAt time.Time,
) *compaction {
c := &compaction{
kind: compactionKindFlush,
cmp: opts.Comparer.Compare,
Expand All @@ -824,6 +831,7 @@ func newFlush(opts *Options, cur *version, baseLevel int, flushing flushableList
formatKey: opts.Comparer.FormatKey,
logger: opts.Logger,
version: cur,
beganAt: beganAt,
inputs: []compactionLevel{{level: -1}, {level: 0}},
maxOutputFileSize: math.MaxUint64,
maxOverlapBytes: math.MaxUint64,
Expand Down Expand Up @@ -1662,6 +1670,11 @@ func (d *DB) removeInProgressCompaction(c *compaction, rollback bool) {
}
}
delete(d.mu.compact.inProgress, c)
// Add this compaction's duration to the cumulative duration. We do this
// here to ensure it's atomic with the removal of the compaction from
// in-progress compactions. This ensures Metrics.Compact.Duration does not
// miss or double count a completing compaction's duration.
d.mu.compact.duration += d.timeNow().Sub(c.beganAt)

l0InProgress := inProgressL0Compactions(d.getInProgressCompactionInfoLocked(c))
d.mu.versions.currentVersion().L0Sublevels.InitCompactingFileInfo(l0InProgress)
Expand Down Expand Up @@ -1953,7 +1966,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
}

c := newFlush(d.opts, d.mu.versions.currentVersion(),
d.mu.versions.picker.getBaseLevel(), d.mu.mem.queue[:n])
d.mu.versions.picker.getBaseLevel(), d.mu.mem.queue[:n], d.timeNow())
d.addInProgressCompaction(c)

jobID := d.mu.nextJobID
Expand Down Expand Up @@ -2176,7 +2189,7 @@ func (d *DB) maybeScheduleCompactionPicker(
d.mu.compact.deletionHints = unresolvedHints

if len(inputs) > 0 {
c := newDeleteOnlyCompaction(d.opts, v, inputs)
c := newDeleteOnlyCompaction(d.opts, v, inputs, d.timeNow())
d.mu.compact.compactingCount++
d.addInProgressCompaction(c)
go d.compact(c, nil)
Expand All @@ -2188,7 +2201,7 @@ func (d *DB) maybeScheduleCompactionPicker(
env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
pc, retryLater := d.mu.versions.picker.pickManual(env, manual)
if pc != nil {
c := newCompaction(pc, d.opts)
c := newCompaction(pc, d.opts, d.timeNow())
d.mu.compact.manual = d.mu.compact.manual[1:]
d.mu.compact.compactingCount++
d.addInProgressCompaction(c)
Expand All @@ -2215,7 +2228,7 @@ func (d *DB) maybeScheduleCompactionPicker(
if pc == nil {
break
}
c := newCompaction(pc, d.opts)
c := newCompaction(pc, d.opts, d.timeNow())
d.mu.compact.compactingCount++
d.addInProgressCompaction(c)
go d.compact(c, nil)
Expand Down Expand Up @@ -2566,7 +2579,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
d.mu.versions.incrementCompactions(c.kind, c.extraLevels)
d.mu.versions.incrementCompactionBytes(-c.bytesWritten)

info.TotalDuration = d.timeNow().Sub(startTime)
info.TotalDuration = d.timeNow().Sub(c.beganAt)
d.opts.EventListener.CompactionEnd(info)

// Update the read state before deleting obsolete files because the
Expand Down
5 changes: 3 additions & 2 deletions compaction_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -530,7 +531,7 @@ func TestCompactionPickerL0(t *testing.T) {
var result strings.Builder
if pc != nil {
checkClone(t, pc)
c := newCompaction(pc, opts)
c := newCompaction(pc, opts, time.Now())
fmt.Fprintf(&result, "L%d -> L%d\n", pc.startLevel.level, pc.outputLevel.level)
fmt.Fprintf(&result, "L%d: %s\n", pc.startLevel.level, fileNums(pc.startLevel.files))
if !pc.outputLevel.files.Empty() {
Expand Down Expand Up @@ -772,7 +773,7 @@ func TestCompactionPickerConcurrency(t *testing.T) {
})
var result strings.Builder
if pc != nil {
c := newCompaction(pc, opts)
c := newCompaction(pc, opts, time.Now())
fmt.Fprintf(&result, "L%d -> L%d\n", pc.startLevel.level, pc.outputLevel.level)
fmt.Fprintf(&result, "L%d: %s\n", pc.startLevel.level, fileNums(pc.startLevel.files))
if !pc.outputLevel.files.Empty() {
Expand Down
4 changes: 2 additions & 2 deletions compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ func TestPickCompaction(t *testing.T) {
vs.picker = &tc.picker
pc, got := vs.picker.pickAuto(compactionEnv{}), ""
if pc != nil {
c := newCompaction(pc, opts)
c := newCompaction(pc, opts, time.Now())
got0 := fileNums(c.startLevel.files)
got1 := fileNums(c.outputLevel.files)
got2 := fileNums(c.grandparents)
Expand Down Expand Up @@ -1852,7 +1852,7 @@ func TestCompactionOutputLevel(t *testing.T) {
d.ScanArgs(t, "start", &start)
d.ScanArgs(t, "base", &base)
pc := newPickedCompaction(opts, version, start, defaultOutputLevel(start, base), base)
c := newCompaction(pc, opts)
c := newCompaction(pc, opts, time.Now())
return fmt.Sprintf("output=%d\nmax-output-file-size=%d\n",
c.outputLevel.level, c.maxOutputFileSize)

Expand Down
3 changes: 2 additions & 1 deletion data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -850,7 +851,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
flushed: make(chan struct{}),
}}
c := newFlush(d.opts, d.mu.versions.currentVersion(),
d.mu.versions.picker.getBaseLevel(), toFlush)
d.mu.versions.picker.getBaseLevel(), toFlush, time.Now())
c.disableSpanElision = true
// NB: define allows the test to exactly specify which keys go
// into which sstables. If the test has a small target file
Expand Down
10 changes: 10 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,9 @@ type DB struct {
// compactions which we might have to perform.
readCompactions readCompactionQueue

// The cumulative duration of all completed compactions since Open.
// Does not include flushes.
duration time.Duration
// Flush throughput metric.
flushWriteThroughput ThroughputMetric
// The idle start time for the flush "loop", i.e., when the flushing
Expand Down Expand Up @@ -1718,6 +1721,13 @@ func (d *DB) Metrics() *Metrics {
metrics.Compact.InProgressBytes = d.mu.versions.atomicInProgressBytes.Load()
metrics.Compact.NumInProgress = int64(d.mu.compact.compactingCount)
metrics.Compact.MarkedFiles = vers.Stats.MarkedForCompaction
metrics.Compact.Duration = d.mu.compact.duration
for c := range d.mu.compact.inProgress {
if c.kind != compactionKindFlush {
metrics.Compact.Duration += d.timeNow().Sub(c.beganAt)
}
}

for _, m := range d.mu.mem.queue {
metrics.MemTable.Size += m.totalBytes()
}
Expand Down
3 changes: 3 additions & 0 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ type Metrics struct {
// compaction. Such files are compacted in a rewrite compaction
// when no other compactions are picked.
MarkedFiles int
// Duration records the cumulative duration of all compactions since the
// database was opened.
Duration time.Duration
}

Flush struct {
Expand Down
3 changes: 2 additions & 1 deletion open.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ func (d *DB) replayWAL(
// TODO(bananabrick): See if we can use the actual base level here,
// instead of using 1.
c := newFlush(d.opts, d.mu.versions.currentVersion(),
1 /* base level */, toFlush)
1 /* base level */, toFlush, d.timeNow())
newVE, _, _, err := d.runCompaction(jobID, c)
if err != nil {
return errors.Wrapf(err, "running compaction during WAL replay")
Expand Down Expand Up @@ -842,6 +842,7 @@ func (d *DB) replayWAL(
d.opts, d.mu.versions.currentVersion(),
1, /* base level */
[]*flushableEntry{entry},
d.timeNow(),
)
for _, file := range c.flushing[0].flushable.(*ingestedFlushable).files {
ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: 0, Meta: file.FileMetadata})
Expand Down
12 changes: 6 additions & 6 deletions testdata/event_listener
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ close: db/marker.manifest.000002.MANIFEST-000006
remove: db/marker.manifest.000001.MANIFEST-000001
sync: db
[JOB 5] MANIFEST created 000006
[JOB 5] flushed 1 memtable to L0 [000005] (770 B), in 1.0s (2.0s total), output rate 770 B/s
[JOB 5] flushed 1 memtable to L0 [000005] (770 B), in 1.0s (3.0s total), output rate 770 B/s

compact
----
Expand All @@ -145,7 +145,7 @@ close: db/marker.manifest.000003.MANIFEST-000009
remove: db/marker.manifest.000002.MANIFEST-000006
sync: db
[JOB 7] MANIFEST created 000009
[JOB 7] flushed 1 memtable to L0 [000008] (770 B), in 1.0s (2.0s total), output rate 770 B/s
[JOB 7] flushed 1 memtable to L0 [000008] (770 B), in 1.0s (3.0s total), output rate 770 B/s
remove: db/MANIFEST-000001
[JOB 7] MANIFEST deleted 000001
[JOB 8] compacting(default) L0 [000005 000008] (1.5 K) + L6 [] (0 B)
Expand All @@ -162,7 +162,7 @@ close: db/marker.manifest.000004.MANIFEST-000011
remove: db/marker.manifest.000003.MANIFEST-000009
sync: db
[JOB 8] MANIFEST created 000011
[JOB 8] compacted(default) L0 [000005 000008] (1.5 K) + L6 [] (0 B) -> L6 [000010] (770 B), in 1.0s (2.0s total), output rate 770 B/s
[JOB 8] compacted(default) L0 [000005 000008] (1.5 K) + L6 [] (0 B) -> L6 [000010] (770 B), in 1.0s (4.0s total), output rate 770 B/s
remove: db/000005.sst
[JOB 8] sstable deleted 000005
remove: db/000008.sst
Expand Down Expand Up @@ -195,7 +195,7 @@ close: db/marker.manifest.000005.MANIFEST-000014
remove: db/marker.manifest.000004.MANIFEST-000011
sync: db
[JOB 10] MANIFEST created 000014
[JOB 10] flushed 1 memtable to L0 [000013] (770 B), in 1.0s (2.0s total), output rate 770 B/s
[JOB 10] flushed 1 memtable to L0 [000013] (770 B), in 1.0s (3.0s total), output rate 770 B/s

enable-file-deletions
----
Expand Down Expand Up @@ -282,7 +282,7 @@ sync-data: db/000022.sst
close: db/000022.sst
sync: db
sync: db/MANIFEST-000016
[JOB 17] flushed 1 memtable to L0 [000022] (770 B), in 1.0s (2.0s total), output rate 770 B/s
[JOB 17] flushed 1 memtable to L0 [000022] (770 B), in 1.0s (3.0s total), output rate 770 B/s
[JOB 18] flushing 2 ingested tables
create: db/MANIFEST-000023
close: db/MANIFEST-000016
Expand All @@ -292,7 +292,7 @@ close: db/marker.manifest.000007.MANIFEST-000023
remove: db/marker.manifest.000006.MANIFEST-000016
sync: db
[JOB 18] MANIFEST created 000023
[JOB 18] flushed 2 ingested flushables L0:000017 (826 B) + L6:000018 (826 B) in 1.0s (2.0s total), output rate 1.6 K/s
[JOB 18] flushed 2 ingested flushables L0:000017 (826 B) + L6:000018 (826 B) in 1.0s (3.0s total), output rate 1.6 K/s
remove: db/MANIFEST-000014
[JOB 18] MANIFEST deleted 000014
[JOB 19] flushing 1 memtable to L0
Expand Down

0 comments on commit 8692911

Please sign in to comment.