ethdb/pebble: several updates for pebble #49

Merged · 3 commits · Jan 31, 2023
169 changes: 87 additions & 82 deletions ethdb/pebble/pebble.go
@@ -36,15 +36,15 @@ import (
)

const (
// minCache is the minimum amount of memory in megabytes to allocate to leveldb
// minCache is the minimum amount of memory in megabytes to allocate to pebble
// read and write caching, split half and half.
minCache = 16

// minHandles is the minimum number of files handles to allocate to the open
// database files.
minHandles = 16

// metricsGatheringInterval specifies the interval to retrieve leveldb database
// metricsGatheringInterval specifies the interval to retrieve pebble database
// compaction, io and pause stats to report to the user.
metricsGatheringInterval = 3 * time.Second
)
@@ -78,7 +78,6 @@ type Database struct {
activeComp int // current number of active compactions
compStartTime time.Time // the start time of the earliest currently-active compaction
compTime int64 // total time spent in compaction in ns
seekCompCount int64 // total number of compactions caused by reads
level0Comp uint32 // total number of level-zero compactions
nonLevel0Comp uint32 // total number of non level-zero compactions
writeDelayStartTime time.Time // the start time of the latest write stall
@@ -90,16 +89,11 @@ func (d *Database) onCompactionBegin(info pebble.CompactionInfo) {
if d.activeComp == 0 {
d.compStartTime = time.Now()
}
if info.Reason == "read" {
atomic.AddInt64(&d.seekCompCount, 1)
}

for _, level := range info.Input {
if level.Level == 0 {
atomic.AddUint32(&d.level0Comp, 1)
} else {
atomic.AddUint32(&d.nonLevel0Comp, 1)
}
l0 := info.Input[0]
if l0.Level == 0 {
atomic.AddUint32(&d.level0Comp, 1)
} else {
atomic.AddUint32(&d.nonLevel0Comp, 1)
}
d.activeComp++
}
@@ -110,7 +104,6 @@ func (d *Database) onCompactionEnd(info pebble.CompactionInfo) {
} else if d.activeComp == 0 {
panic("should not happen")
}

d.activeComp--
}
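For orientation, the two hooks above are handed to pebble through its EventListener option (visible further down in this diff). A minimal, self-contained sketch of that wiring, assuming the pebble API vintage this PR builds against (a value-typed Options.EventListener with CompactionBegin/CompactionEnd function fields):

package main

import (
	"fmt"

	"github.com/cockroachdb/pebble"
)

// tracker mirrors just the compaction bookkeeping from the diff above.
type tracker struct {
	activeComp int
}

func (t *tracker) onCompactionBegin(info pebble.CompactionInfo) {
	// Input[0] is the shallowest input level of the compaction, so a
	// Level of 0 identifies a level-zero compaction.
	fmt.Println("compaction begin, first input level:", info.Input[0].Level)
	t.activeComp++
}

func (t *tracker) onCompactionEnd(info pebble.CompactionInfo) {
	t.activeComp--
	fmt.Println("compaction end, still active:", t.activeComp)
}

func main() {
	t := new(tracker)
	db, err := pebble.Open("demo-db", &pebble.Options{
		EventListener: pebble.EventListener{
			CompactionBegin: t.onCompactionBegin,
			CompactionEnd:   t.onCompactionEnd,
		},
	})
	if err != nil {
		panic(err)
	}
	defer db.Close()
}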

@@ -154,25 +147,33 @@ func New(file string, cache int, handles int, namespace string, readonly bool) (
// internal/arenaskl.node, DeferredBatchOp, and flushableBatchEntry.
// Taken from https://github.com/cockroachdb/pebble/blob/master/open.go#L38
maxMemTableSize := 4 << 30 // 4 GB
memTableSize := cache * 1024 * 1024 / 4

// Two memory tables are configured, which is identical to leveldb:
// a frozen memory table and another live one.
memTableLimit := 2
memTableSize := cache * 1024 * 1024 / 2 / memTableLimit
if memTableSize > maxMemTableSize {
memTableSize = maxMemTableSize
}

// Open the db and recover any potential corruptions
db, err := pebble.Open(file, &pebble.Options{
opt := &pebble.Options{
// Pebble has a single combined cache area and the write
// buffers are taken from this too. Assign all available
// memory allowance for cache.
Cache: pebble.NewCache(int64(cache * 1024 * 1024)),
MaxOpenFiles: handles,

// The size of each memory table (as well as the write buffer).
// Note, there may be more than two memory tables in the system at once.
// MemTableStopWritesThreshold can be configured to avoid memory abuse.
MemTableSize: memTableSize,

// MemTableStopWritesThreshold places a hard limit on the total size
// of the existing MemTables (including the frozen one).
MemTableStopWritesThreshold: memTableLimit * memTableSize,

// The default compaction concurrency is 1 thread;
// here all available CPUs are used for faster compaction.
MaxConcurrentCompactions: func() int { return runtime.NumCPU() },

// Per-level options. Options for at least one level must be specified. The
// options for the last level are used for all subsequent levels.
Levels: []pebble.LevelOptions{
@@ -186,7 +187,13 @@ func New(file string, cache int, handles int, namespace string, readonly bool) (
},
ReadOnly: readonly,
EventListener: eventListener,
})
}
// Disable seek compaction explicitly. Check https://github.com/ethereum/go-ethereum/pull/20130
// for more details.
opt.Experimental.ReadSamplingMultiplier = -1

// Open the db and recover any potential corruptions
db, err := pebble.Open(file, opt)
if err != nil {
return nil, err
}
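To make the sizing logic above concrete, here is the arithmetic worked for a hypothetical 1024 MB cache allowance (editor's sketch, not part of the PR; the stop-writes figure follows this PR's byte-based reading of MemTableStopWritesThreshold):

package main

import "fmt"

func main() {
	cache := 1024              // MB of cache allowance, hypothetical
	memTableLimit := 2         // a frozen memtable plus a live one, as in leveldb
	maxMemTableSize := 4 << 30 // pebble's per-table ceiling noted above

	memTableSize := cache * 1024 * 1024 / 2 / memTableLimit
	if memTableSize > maxMemTableSize {
		memTableSize = maxMemTableSize
	}
	fmt.Println(memTableSize)                 // 268435456 (256 MB per table)
	fmt.Println(memTableLimit * memTableSize) // 536870912 (512 MB stop-writes limit)
}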
@@ -218,24 +225,24 @@ func New(file string, cache int, handles int, namespace string, readonly bool) (

// Close stops the metrics collection, flushes any pending data to disk and closes
// all io accesses to the underlying key-value store.
func (db *Database) Close() error {
db.quitLock.Lock()
defer db.quitLock.Unlock()
func (d *Database) Close() error {
d.quitLock.Lock()
defer d.quitLock.Unlock()

if db.quitChan != nil {
if d.quitChan != nil {
errc := make(chan error)
db.quitChan <- errc
d.quitChan <- errc
if err := <-errc; err != nil {
db.log.Error("Metrics collection failed", "err", err)
d.log.Error("Metrics collection failed", "err", err)
}
db.quitChan = nil
d.quitChan = nil
}
return db.db.Close()
return d.db.Close()
}
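Close and meter coordinate through a small handshake: Close pushes a fresh error channel down quitChan, and the metering goroutine replies on it once it has finished its final pass. The pattern in isolation (editor's sketch; the nil reply stands in for whatever final error meter reports):

package main

import "log"

func main() {
	quitChan := make(chan chan error)

	// Metering side: block until asked to quit, then acknowledge.
	go func() {
		errc := <-quitChan
		errc <- nil // the final metrics error would be sent here
	}()

	// Close side: request shutdown and wait for the acknowledgement.
	errc := make(chan error)
	quitChan <- errc
	if err := <-errc; err != nil {
		log.Println("metrics collection failed:", err)
	}
}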

// Has retrieves if a key is present in the key-value store.
func (db *Database) Has(key []byte) (bool, error) {
_, closer, err := db.db.Get(key)
func (d *Database) Has(key []byte) (bool, error) {
_, closer, err := d.db.Get(key)
if err == pebble.ErrNotFound {
return false, nil
} else if err != nil {
@@ -246,8 +253,8 @@ func (db *Database) Has(key []byte) (bool, error) {
}

// Get retrieves the given key if it's present in the key-value store.
func (db *Database) Get(key []byte) ([]byte, error) {
dat, closer, err := db.db.Get(key)
func (d *Database) Get(key []byte) ([]byte, error) {
dat, closer, err := d.db.Get(key)
if err != nil {
return nil, err
}
@@ -258,28 +265,28 @@ func (db *Database) Get(key []byte) ([]byte, error) {
}

// Put inserts the given value into the key-value store.
func (db *Database) Put(key []byte, value []byte) error {
return db.db.Set(key, value, pebble.NoSync)
func (d *Database) Put(key []byte, value []byte) error {
return d.db.Set(key, value, pebble.NoSync)
}

// Delete removes the key from the key-value store.
func (db *Database) Delete(key []byte) error {
return db.db.Delete(key, nil)
func (d *Database) Delete(key []byte) error {
return d.db.Delete(key, nil)
}

// NewBatch creates a write-only key-value store that buffers changes to its host
// database until a final write is called.
func (db *Database) NewBatch() ethdb.Batch {
func (d *Database) NewBatch() ethdb.Batch {
return &batch{
b: db.db.NewBatch(),
b: d.db.NewBatch(),
}
}

// NewBatchWithSize creates a write-only database batch with pre-allocated buffer.
// TODO can't do this with pebble. Batches are allocated in a pool so maybe this doesn't matter?
func (db *Database) NewBatchWithSize(_ int) ethdb.Batch {
func (d *Database) NewBatchWithSize(_ int) ethdb.Batch {
return &batch{
b: db.db.NewBatch(),
b: d.db.NewBatch(),
}
}

@@ -327,17 +334,17 @@ func (snap *snapshot) Release() {
// happened on the database.
// Note: don't forget to release the snapshot once it's used up, otherwise
// the stale data will never be cleaned up by the underlying compactor.
func (db *Database) NewSnapshot() (ethdb.Snapshot, error) {
snap := db.db.NewSnapshot()
func (d *Database) NewSnapshot() (ethdb.Snapshot, error) {
snap := d.db.NewSnapshot()
return &snapshot{db: snap}, nil
}
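A usage sketch for the snapshot API above (editor's fragment, assuming an open *Database named db inside a function that returns error; Get and Release come from the ethdb.Snapshot interface, and the key is made up):

snap, err := db.NewSnapshot()
if err != nil {
	return err
}
// Release is mandatory: a held snapshot pins stale data against compaction.
defer snap.Release()

// Reads observe the database as of NewSnapshot, regardless of later writes.
val, err := snap.Get([]byte("some-key"))
if err != nil {
	return err
}
_ = val
return nil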

// NewIterator creates a binary-alphabetical iterator over a subset
// of database content with a particular key prefix, starting at a particular
// initial key (or after, if it does not exist).
func (db *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
func (d *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
iterRange := bytesPrefixRange(prefix, start)
iter := db.db.NewIter(&pebble.IterOptions{
iter := d.db.NewIter(&pebble.IterOptions{
LowerBound: iterRange.Start,
UpperBound: iterRange.Limit,
})
@@ -346,7 +353,7 @@ func (db *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
}

// Stat returns a particular internal stat of the database.
func (db *Database) Stat(property string) (string, error) {
func (d *Database) Stat(property string) (string, error) {
return "", nil
}

@@ -357,18 +364,18 @@ func (db *Database) Stat(property string) (string, error) {
// A nil start is treated as a key before all keys in the data store; a nil limit
// is treated as a key after all keys in the data store. If both are nil, the
// entire data store will be compacted.
func (db *Database) Compact(start []byte, limit []byte) error {
return db.db.Compact(start, limit, false)
func (d *Database) Compact(start []byte, limit []byte) error {
return d.db.Compact(start, limit, false)
}

// Path returns the path to the database directory.
func (db *Database) Path() string {
return db.fn
func (d *Database) Path() string {
return d.fn
}

// meter periodically retrieves internal pebble counters and reports them to
// the metrics subsystem.
func (db *Database) meter(refresh time.Duration) {
func (d *Database) meter(refresh time.Duration) {
var errc chan error
timer := time.NewTimer(refresh)
defer timer.Stop()
@@ -392,14 +399,12 @@ func (db *Database) meter(refresh time.Duration) {
nWrite int64
)

metrics := db.db.Metrics()

compTime := atomic.LoadInt64(&db.compTime)
writeDelayCount := atomic.LoadInt64(&db.writeDelayCount)
writeDelayTime := atomic.LoadInt64(&db.writeDelayTime)
seekCompCount := atomic.LoadInt64(&db.seekCompCount)
nonLevel0CompCount := int64(atomic.LoadUint32(&db.nonLevel0Comp))
level0CompCount := int64(atomic.LoadUint32(&db.level0Comp))
metrics := d.db.Metrics()
compTime := atomic.LoadInt64(&d.compTime)
writeDelayCount := atomic.LoadInt64(&d.writeDelayCount)
writeDelayTime := atomic.LoadInt64(&d.writeDelayTime)
nonLevel0CompCount := int64(atomic.LoadUint32(&d.nonLevel0Comp))
level0CompCount := int64(atomic.LoadUint32(&d.level0Comp))

writeDelayTimes[i%2] = writeDelayTime
writeDelayCounts[i%2] = writeDelayCount
@@ -418,41 +423,41 @@ func (db *Database) meter(refresh time.Duration) {
compReads[i%2] = compRead
nWrites[i%2] = nWrite

if db.writeDelayNMeter != nil {
db.writeDelayNMeter.Mark(writeDelayCounts[i%2] - writeDelayCounts[(i-1)%2])
if d.writeDelayNMeter != nil {
d.writeDelayNMeter.Mark(writeDelayCounts[i%2] - writeDelayCounts[(i-1)%2])
}
if db.writeDelayMeter != nil {
db.writeDelayMeter.Mark(writeDelayTimes[i%2] - writeDelayTimes[(i-1)%2])
if d.writeDelayMeter != nil {
d.writeDelayMeter.Mark(writeDelayTimes[i%2] - writeDelayTimes[(i-1)%2])
}
if db.compTimeMeter != nil {
db.compTimeMeter.Mark(compTimes[i%2] - compTimes[(i-1)%2])
if d.compTimeMeter != nil {
d.compTimeMeter.Mark(compTimes[i%2] - compTimes[(i-1)%2])
}
if db.compReadMeter != nil {
db.compReadMeter.Mark(compReads[i%2] - compReads[(i-1)%2])
if d.compReadMeter != nil {
d.compReadMeter.Mark(compReads[i%2] - compReads[(i-1)%2])
}
if db.compWriteMeter != nil {
db.compWriteMeter.Mark(compWrites[i%2] - compWrites[(i-1)%2])
if d.compWriteMeter != nil {
d.compWriteMeter.Mark(compWrites[i%2] - compWrites[(i-1)%2])
}
if db.diskSizeGauge != nil {
db.diskSizeGauge.Update(int64(metrics.DiskSpaceUsage()))
if d.diskSizeGauge != nil {
d.diskSizeGauge.Update(int64(metrics.DiskSpaceUsage()))
}
if db.diskReadMeter != nil {
db.diskReadMeter.Mark(0) // pebble doesn't track non-compaction reads
if d.diskReadMeter != nil {
d.diskReadMeter.Mark(0) // pebble doesn't track non-compaction reads
}
if db.diskWriteMeter != nil {
db.diskWriteMeter.Mark(nWrites[i%2] - nWrites[(i-1)%2])
if d.diskWriteMeter != nil {
d.diskWriteMeter.Mark(nWrites[i%2] - nWrites[(i-1)%2])
}
// See https://github.com/cockroachdb/pebble/pull/1628#pullrequestreview-1026664054
manuallyAllocated := metrics.BlockCache.Size + int64(metrics.MemTable.Size) + int64(metrics.MemTable.ZombieSize)
db.manualMemAllocGauge.Update(manuallyAllocated)
db.memCompGauge.Update(metrics.Flush.Count)
db.nonlevel0CompGauge.Update(nonLevel0CompCount)
db.level0CompGauge.Update(level0CompCount)
db.seekCompGauge.Update(seekCompCount)
d.manualMemAllocGauge.Update(manuallyAllocated)
d.memCompGauge.Update(metrics.Flush.Count)
d.nonlevel0CompGauge.Update(nonLevel0CompCount)
d.level0CompGauge.Update(level0CompCount)
d.seekCompGauge.Update(metrics.Compact.ReadCount)

// Sleep a bit, then repeat the stats collection
select {
case errc = <-db.quitChan:
case errc = <-d.quitChan:
// Quit requesting, stop hammering the database
case <-timer.C:
timer.Reset(refresh)
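The i%2 indexing above is a two-slot ring: every pass stores the current cumulative counter and Marks the difference from the previous pass, so each meter reading covers exactly one refresh interval. The same pattern in isolation (editor's sketch; readCounter and meter are hypothetical stand-ins for a cumulative pebble statistic and a metrics meter):

var samples [2]int64
for i := 1; ; i++ {
	samples[i%2] = readCounter()                // cumulative total so far
	meter.Mark(samples[i%2] - samples[(i-1)%2]) // growth during this interval
	time.Sleep(refresh)
}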
Expand All @@ -472,14 +477,14 @@ type batch struct {
// Put inserts the given value into the batch for later committing.
func (b *batch) Put(key, value []byte) error {
b.b.Set(key, value, nil)
b.size += len(value)
b.size += len(key) + len(value)
return nil
}

// Delete inserts a key removal into the batch for later committing.
func (b *batch) Delete(key []byte) error {
b.b.Delete(key, nil)
b.size++
b.size += len(key)
return nil
}
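Counting key bytes matters because callers flush once the accumulated size crosses a threshold; the old accounting ignored keys on Put and charged a single byte per Delete, so key-heavy batches could balloon far past the intended limit. A typical flush loop against the ethdb.Batch interface (editor's sketch; pairs and idealBatchSize are hypothetical):

batch := db.NewBatch()
for _, kv := range pairs {
	if err := batch.Put(kv.key, kv.value); err != nil {
		return err
	}
	if batch.ValueSize() >= idealBatchSize {
		if err := batch.Write(); err != nil {
			return err
		}
		batch.Reset()
	}
}
return batch.Write()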
