Skip to content

Commit

Permalink
core/rawdb, ethdb: introduce batched/atomic reads from ancients (ethe…
Browse files Browse the repository at this point in the history
…reum#23566)

This PR adds a new accessor method to the freezer database. This new view offers a consistent interface, guaranteeing that all individual tables (headers, bodies etc) are all on the same number, and that this number is not changes (added/truncated) while the operation is performing.
  • Loading branch information
holiman authored and jagdeep sidhu committed Oct 25, 2021
1 parent d53a14d commit 39750c9
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 138 deletions.
199 changes: 76 additions & 123 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,15 @@ import (

// ReadCanonicalHash retrieves the hash assigned to a canonical block number.
func ReadCanonicalHash(db ethdb.Reader, number uint64) common.Hash {
data, _ := db.Ancient(freezerHashTable, number)
if len(data) == 0 {
data, _ = db.Get(headerHashKey(number))
// In the background freezer is moving data from leveldb to flatten files.
// So during the first check for ancient db, the data is not yet in there,
// but when we reach into leveldb, the data was already moved. That would
// result in a not found error.
var data []byte
db.ReadAncients(func(reader ethdb.AncientReader) error {
data, _ = reader.Ancient(freezerHashTable, number)
if len(data) == 0 {
data, _ = db.Ancient(freezerHashTable, number)
// Get it by hash from leveldb
data, _ = db.Get(headerHashKey(number))
}
}
if len(data) == 0 {
return common.Hash{}
}
return nil
})
return common.BytesToHash(data)
}

Expand Down Expand Up @@ -304,32 +299,25 @@ func WriteFastTxLookupLimit(db ethdb.KeyValueWriter, number uint64) {

// ReadHeaderRLP retrieves a block header in its raw RLP database encoding.
func ReadHeaderRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
// First try to look up the data in ancient database. Extra hash
// comparison is necessary since ancient database only maintains
// the canonical data.
data, _ := db.Ancient(freezerHeaderTable, number)
if len(data) > 0 && crypto.Keccak256Hash(data) == hash {
return data
}
// Then try to look up the data in leveldb.
data, _ = db.Get(headerKey(number, hash))
if len(data) > 0 {
return data
}
// In the background freezer is moving data from leveldb to flatten files.
// So during the first check for ancient db, the data is not yet in there,
// but when we reach into leveldb, the data was already moved. That would
// result in a not found error.
data, _ = db.Ancient(freezerHeaderTable, number)
if len(data) > 0 && crypto.Keccak256Hash(data) == hash {
return data
}
return nil // Can't find the data anywhere.
var data []byte
db.ReadAncients(func(reader ethdb.AncientReader) error {
// First try to look up the data in ancient database. Extra hash
// comparison is necessary since ancient database only maintains
// the canonical data.
data, _ = reader.Ancient(freezerHeaderTable, number)
if len(data) > 0 && crypto.Keccak256Hash(data) == hash {
return nil
}
// If not, try reading from leveldb
data, _ = db.Get(headerKey(number, hash))
return nil
})
return data
}

// HasHeader verifies the existence of a block header corresponding to the hash.
func HasHeader(db ethdb.Reader, hash common.Hash, number uint64) bool {
if has, err := db.Ancient(freezerHashTable, number); err == nil && common.BytesToHash(has) == hash {
if isCanon(db, number, hash) {
return true
}
if has, err := db.Has(headerKey(number, hash)); !has || err != nil {
Expand Down Expand Up @@ -389,53 +377,48 @@ func deleteHeaderWithoutNumber(db ethdb.KeyValueWriter, hash common.Hash, number
}
}

// isCanon is an internal utility method, to check whether the given number/hash
// is part of the ancient (canon) set.
func isCanon(reader ethdb.AncientReader, number uint64, hash common.Hash) bool {
h, err := reader.Ancient(freezerHashTable, number)
if err != nil {
return false
}
return bytes.Equal(h, hash[:])
}

// ReadBodyRLP retrieves the block body (transactions and uncles) in RLP encoding.
func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
// First try to look up the data in ancient database. Extra hash
// comparison is necessary since ancient database only maintains
// the canonical data.
data, _ := db.Ancient(freezerBodiesTable, number)
if len(data) > 0 {
h, _ := db.Ancient(freezerHashTable, number)
if common.BytesToHash(h) == hash {
return data
}
}
// Then try to look up the data in leveldb.
data, _ = db.Get(blockBodyKey(number, hash))
if len(data) > 0 {
return data
}
// In the background freezer is moving data from leveldb to flatten files.
// So during the first check for ancient db, the data is not yet in there,
// but when we reach into leveldb, the data was already moved. That would
// result in a not found error.
data, _ = db.Ancient(freezerBodiesTable, number)
if len(data) > 0 {
h, _ := db.Ancient(freezerHashTable, number)
if common.BytesToHash(h) == hash {
return data
var data []byte
db.ReadAncients(func(reader ethdb.AncientReader) error {
// Check if the data is in ancients
if isCanon(reader, number, hash) {
data, _ = reader.Ancient(freezerBodiesTable, number)
return nil
}
}
return nil // Can't find the data anywhere.
// If not, try reading from leveldb
data, _ = db.Get(blockBodyKey(number, hash))
return nil
})
return data
}

// ReadCanonicalBodyRLP retrieves the block body (transactions and uncles) for the canonical
// block at number, in RLP encoding.
func ReadCanonicalBodyRLP(db ethdb.Reader, number uint64) rlp.RawValue {
// If it's an ancient one, we don't need the canonical hash
data, _ := db.Ancient(freezerBodiesTable, number)
if len(data) == 0 {
// Need to get the hash
data, _ = db.Get(blockBodyKey(number, ReadCanonicalHash(db, number)))
// In the background freezer is moving data from leveldb to flatten files.
// So during the first check for ancient db, the data is not yet in there,
// but when we reach into leveldb, the data was already moved. That would
// result in a not found error.
if len(data) == 0 {
data, _ = db.Ancient(freezerBodiesTable, number)
var data []byte
db.ReadAncients(func(reader ethdb.AncientReader) error {
data, _ = reader.Ancient(freezerBodiesTable, number)
if len(data) > 0 {
return nil
}
}
// Get it by hash from leveldb
data, _ = db.Get(blockBodyKey(number, ReadCanonicalHash(db, number)))
return nil
})
return data
}

Expand All @@ -448,7 +431,7 @@ func WriteBodyRLP(db ethdb.KeyValueWriter, hash common.Hash, number uint64, rlp

// HasBody verifies the existence of a block body corresponding to the hash.
func HasBody(db ethdb.Reader, hash common.Hash, number uint64) bool {
if has, err := db.Ancient(freezerHashTable, number); err == nil && common.BytesToHash(has) == hash {
if isCanon(db, number, hash) {
return true
}
if has, err := db.Has(blockBodyKey(number, hash)); !has || err != nil {
Expand Down Expand Up @@ -489,33 +472,18 @@ func DeleteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {

// ReadTdRLP retrieves a block's total difficulty corresponding to the hash in RLP encoding.
func ReadTdRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
// First try to look up the data in ancient database. Extra hash
// comparison is necessary since ancient database only maintains
// the canonical data.
data, _ := db.Ancient(freezerDifficultyTable, number)
if len(data) > 0 {
h, _ := db.Ancient(freezerHashTable, number)
if common.BytesToHash(h) == hash {
return data
}
}
// Then try to look up the data in leveldb.
data, _ = db.Get(headerTDKey(number, hash))
if len(data) > 0 {
return data
}
// In the background freezer is moving data from leveldb to flatten files.
// So during the first check for ancient db, the data is not yet in there,
// but when we reach into leveldb, the data was already moved. That would
// result in a not found error.
data, _ = db.Ancient(freezerDifficultyTable, number)
if len(data) > 0 {
h, _ := db.Ancient(freezerHashTable, number)
if common.BytesToHash(h) == hash {
return data
var data []byte
db.ReadAncients(func(reader ethdb.AncientReader) error {
// Check if the data is in ancients
if isCanon(reader, number, hash) {
data, _ = reader.Ancient(freezerDifficultyTable, number)
return nil
}
}
return nil // Can't find the data anywhere.
// If not, try reading from leveldb
data, _ = db.Get(headerTDKey(number, hash))
return nil
})
return data
}

// ReadTd retrieves a block's total difficulty corresponding to the hash.
Expand Down Expand Up @@ -553,7 +521,7 @@ func DeleteTd(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
// HasReceipts verifies the existence of all the transaction receipts belonging
// to a block.
func HasReceipts(db ethdb.Reader, hash common.Hash, number uint64) bool {
if has, err := db.Ancient(freezerHashTable, number); err == nil && common.BytesToHash(has) == hash {
if isCanon(db, number, hash) {
return true
}
if has, err := db.Has(blockReceiptsKey(number, hash)); !has || err != nil {
Expand All @@ -564,33 +532,18 @@ func HasReceipts(db ethdb.Reader, hash common.Hash, number uint64) bool {

// ReadReceiptsRLP retrieves all the transaction receipts belonging to a block in RLP encoding.
func ReadReceiptsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
// First try to look up the data in ancient database. Extra hash
// comparison is necessary since ancient database only maintains
// the canonical data.
data, _ := db.Ancient(freezerReceiptTable, number)
if len(data) > 0 {
h, _ := db.Ancient(freezerHashTable, number)
if common.BytesToHash(h) == hash {
return data
}
}
// Then try to look up the data in leveldb.
data, _ = db.Get(blockReceiptsKey(number, hash))
if len(data) > 0 {
return data
}
// In the background freezer is moving data from leveldb to flatten files.
// So during the first check for ancient db, the data is not yet in there,
// but when we reach into leveldb, the data was already moved. That would
// result in a not found error.
data, _ = db.Ancient(freezerReceiptTable, number)
if len(data) > 0 {
h, _ := db.Ancient(freezerHashTable, number)
if common.BytesToHash(h) == hash {
return data
var data []byte
db.ReadAncients(func(reader ethdb.AncientReader) error {
// Check if the data is in ancients
if isCanon(reader, number, hash) {
data, _ = reader.Ancient(freezerReceiptTable, number)
return nil
}
}
return nil // Can't find the data anywhere.
// If not, try reading from leveldb
data, _ = db.Get(blockReceiptsKey(number, hash))
return nil
})
return data
}

// ReadRawReceipts retrieves all the transaction receipts belonging to a block.
Expand Down
20 changes: 18 additions & 2 deletions core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ func (db *nofreezedb) Ancient(kind string, number uint64) ([]byte, error) {
return nil, errNotSupported
}

// ReadAncients returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) ReadAncients(kind string, start, max, maxByteSize uint64) ([][]byte, error) {
// AncientRange returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) AncientRange(kind string, start, max, maxByteSize uint64) ([][]byte, error) {
return nil, errNotSupported
}

Expand Down Expand Up @@ -119,6 +119,22 @@ func (db *nofreezedb) Sync() error {
return errNotSupported
}

func (db *nofreezedb) ReadAncients(fn func(reader ethdb.AncientReader) error) (err error) {
// Unlike other ancient-related methods, this method does not return
// errNotSupported when invoked.
// The reason for this is that the caller might want to do several things:
// 1. Check if something is in freezer,
// 2. If not, check leveldb.
//
// This will work, since the ancient-checks inside 'fn' will return errors,
// and the leveldb work will continue.
//
// If we instead were to return errNotSupported here, then the caller would
// have to explicitly check for that, having an extra clause to do the
// non-ancient operations.
return fn(db)
}

// NewDatabase creates a high level database on top of a given key-value data
// store without a freezer moving immutable chain segments into cold storage.
func NewDatabase(db ethdb.KeyValueStore) ethdb.Database {
Expand Down
21 changes: 15 additions & 6 deletions core/rawdb/freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ type freezer struct {
frozen uint64 // Number of blocks already frozen
threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests)

// This lock synchronizes writers and the truncate operation.
writeLock sync.Mutex
// This lock synchronizes writers and the truncate operation, as well as
// the "atomic" (batched) read operations.
writeLock sync.RWMutex
writeBatch *freezerBatch

readonly bool
Expand Down Expand Up @@ -201,12 +202,12 @@ func (f *freezer) Ancient(kind string, number uint64) ([]byte, error) {
return nil, errUnknownTable
}

// ReadAncients retrieves multiple items in sequence, starting from the index 'start'.
// AncientRange retrieves multiple items in sequence, starting from the index 'start'.
// It will return
// - at most 'max' items,
// - at least 1 item (even if exceeding the maxByteSize), but will otherwise
// return as many items as fit into maxByteSize.
func (f *freezer) ReadAncients(kind string, start, count, maxBytes uint64) ([][]byte, error) {
func (f *freezer) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
if table := f.tables[kind]; table != nil {
return table.RetrieveItems(start, count, maxBytes)
}
Expand All @@ -222,15 +223,23 @@ func (f *freezer) Ancients() (uint64, error) {
func (f *freezer) AncientSize(kind string) (uint64, error) {
// This needs the write lock to avoid data races on table fields.
// Speed doesn't matter here, AncientSize is for debugging.
f.writeLock.Lock()
defer f.writeLock.Unlock()
f.writeLock.RLock()
defer f.writeLock.RUnlock()

if table := f.tables[kind]; table != nil {
return table.size()
}
return 0, errUnknownTable
}

// ReadAncients runs the given read operation while ensuring that no writes take place
// on the underlying freezer.
func (f *freezer) ReadAncients(fn func(ethdb.AncientReader) error) (err error) {
f.writeLock.RLock()
defer f.writeLock.RUnlock()
return fn(f)
}

// ModifyAncients runs the given write operation.
func (f *freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize int64, err error) {
if f.readonly {
Expand Down
10 changes: 7 additions & 3 deletions core/rawdb/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ func (t *table) Ancient(kind string, number uint64) ([]byte, error) {
return t.db.Ancient(kind, number)
}

// ReadAncients is a noop passthrough that just forwards the request to the underlying
// AncientRange is a noop passthrough that just forwards the request to the underlying
// database.
func (t *table) ReadAncients(kind string, start, count, maxBytes uint64) ([][]byte, error) {
return t.db.ReadAncients(kind, start, count, maxBytes)
func (t *table) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
return t.db.AncientRange(kind, start, count, maxBytes)
}

// Ancients is a noop passthrough that just forwards the request to the underlying
Expand All @@ -85,6 +85,10 @@ func (t *table) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (int64, erro
return t.db.ModifyAncients(fn)
}

func (t *table) ReadAncients(fn func(reader ethdb.AncientReader) error) (err error) {
return t.db.ReadAncients(fn)
}

// TruncateAncients is a noop passthrough that just forwards the request to the underlying
// database.
func (t *table) TruncateAncients(items uint64) error {
Expand Down
Loading

0 comments on commit 39750c9

Please sign in to comment.