Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core, les: implement general atomic db #5

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1570,7 +1570,7 @@ func RegisterEthService(stack *node.Node, cfg *eth.Config) {
err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
fullNode, err := eth.New(ctx, cfg)
if fullNode != nil && cfg.LightServ > 0 {
ls, _ := les.NewLesServer(fullNode, cfg)
ls, _ := les.NewLesServer(ctx, fullNode, cfg)
fullNode.AddLesServer(ls)
}
return fullNode, err
Expand Down
200 changes: 200 additions & 0 deletions core/rawdb/atomic_database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package rawdb

import (
"sync"

"github.com/ethereum/go-ethereum/ethdb"
)

// AtomicDatabase is a wrapper of underlying ethdb.Dababase but offers more
// additional atomicity guarantee.
//
// The typical usage of atomic database is: the db handler is used by different
// modules but atomicity guarantee is required when update some data across the
// modules. In this case, call `OpenTransaction` to switch db to atomic mode, do
// whatever operation needed and call `CloseTransaction` to commit the change.
type AtomicDatabase struct {
db ethdb.Database
lock sync.Mutex
ref int
batch ethdb.Batch // It's not thread safe!!
}

func NewAtomicDatabase(db ethdb.Database) ethdb.Database {
return &AtomicDatabase{db: db}
}

// OpenTransaction creates an underlying batch to contain all
// following changes. Commit the previous batch if it's not nil.
func (db *AtomicDatabase) OpenTransaction() {
db.lock.Lock()
defer db.lock.Unlock()

// If the atomic mode is already activated, increase
// the batch reference.
if db.batch != nil {
db.ref += 1
return
}
db.ref, db.batch = 1, db.db.NewBatch()
}

// CloseTransaction commits all changes in the current batch and
// finishes the atomic db updating.
func (db *AtomicDatabase) CloseTransaction() {
db.lock.Lock()
defer db.lock.Unlock()

if db.batch == nil {
return
}
db.ref -= 1
if db.ref != 0 {
return
}
db.batch.Write()
db.batch = nil
}

// Close implements the Database interface and closes the underlying database.
func (db *AtomicDatabase) Close() error {
return db.db.Close()
}

// Has is a noop passthrough that just forwards the request to the underlying
// database.
func (db *AtomicDatabase) Has(key []byte) (bool, error) {
return db.db.Has(key)
}

// Get is a noop passthrough that just forwards the request to the underlying
// database.
func (db *AtomicDatabase) Get(key []byte) ([]byte, error) {
return db.db.Get(key)
}

// HasAncient is a noop passthrough that just forwards the request to the underlying
// database.
func (db *AtomicDatabase) HasAncient(kind string, number uint64) (bool, error) {
return db.db.HasAncient(kind, number)
}

// Ancient is a noop passthrough that just forwards the request to the underlying
// database.
func (db *AtomicDatabase) Ancient(kind string, number uint64) ([]byte, error) {
return db.db.Ancient(kind, number)
}

// Ancients is a noop passthrough that just forwards the request to the underlying
// database.
func (db *AtomicDatabase) Ancients() (uint64, error) {
return db.db.Ancients()
}

// AncientSize is a noop passthrough that just forwards the request to the underlying
// database.
func (db *AtomicDatabase) AncientSize(kind string) (uint64, error) {
return db.db.AncientSize(kind)
}

// AppendAncient is a noop passthrough that just forwards the request to the underlying
// database.
func (db *AtomicDatabase) AppendAncient(number uint64, hash, header, body, receipts, td []byte) error {
return db.db.AppendAncient(number, hash, header, body, receipts, td)
}

// TruncateAncients is a noop passthrough that just forwards the request to the underlying
// database.
func (db *AtomicDatabase) TruncateAncients(items uint64) error {
return db.db.TruncateAncients(items)
}

// Sync is a noop passthrough that just forwards the request to the underlying
// database.
func (db *AtomicDatabase) Sync() error {
return db.db.Sync()
}

// Put inserts the given value into the database. If the transaction
// is opened, put the value in batch, otherwise just forwards the
// request to the underlying database.
func (db *AtomicDatabase) Put(key []byte, value []byte) error {
db.lock.Lock()
defer db.lock.Unlock()

if db.batch != nil {
return db.batch.Put(key, value)
}
return db.db.Put(key, value)
}

// Delete removes the specified entry from the database. If the transaction
// is opened, put the deletion in batch, otherwise just forwards the request
// to the underlying database.
func (db *AtomicDatabase) Delete(key []byte) error {
db.lock.Lock()
defer db.lock.Unlock()

if db.batch != nil {
return db.batch.Delete(key)
}
return db.db.Delete(key)
}

// NewIterator creates a binary-alphabetical iterator over the entire keyspace
// contained within the database.
func (db *AtomicDatabase) NewIterator() ethdb.Iterator {
return db.NewIteratorWithPrefix(nil)
}

// NewIteratorWithStart creates a binary-alphabetical iterator over a subset of
// database content starting at a particular initial key (or after, if it does
// not exist).
func (db *AtomicDatabase) NewIteratorWithStart(start []byte) ethdb.Iterator {
return db.db.NewIteratorWithStart(start)
}

// NewIteratorWithPrefix creates a binary-alphabetical iterator over a subset
// of database content with a particular key prefix.
func (db *AtomicDatabase) NewIteratorWithPrefix(prefix []byte) ethdb.Iterator {
return db.db.NewIteratorWithPrefix(prefix)
}

// Stat returns a particular internal stat of the database.
func (db *AtomicDatabase) Stat(property string) (string, error) {
return db.db.Stat(property)
}

// Compact flattens the underlying data store for the given key range. In essence,
// deleted and overwritten versions are discarded, and the data is rearranged to
// reduce the cost of operations needed to access them.
//
// A nil start is treated as a key before all keys in the data store; a nil limit
// is treated as a key after all keys in the data store. If both is nil then it
// will compact entire data store.
func (db *AtomicDatabase) Compact(start []byte, limit []byte) error {
return db.db.Compact(start, limit)
}

// NewBatch creates a write-only database that buffers changes to its host db
// until a final write is called, each operation prefixing all keys with the
// pre-configured string.
func (db *AtomicDatabase) NewBatch() ethdb.Batch {
return db.db.NewBatch()
}
2 changes: 1 addition & 1 deletion les/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func newLesServerService(ctx *adapters.ServiceContext) (node.Service, error) {
if err != nil {
return nil, err
}
server, err := NewLesServer(ethereum, &config)
server, err := NewLesServer(ctx.NodeContext, ethereum, &config)
if err != nil {
return nil, err
}
Expand Down
88 changes: 25 additions & 63 deletions les/clientdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"bytes"
"encoding/binary"
"io"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -96,27 +95,23 @@ const (
)

var (
posBalancePrefix = []byte("pb:") // dbVersion(uint16 big endian) + posBalancePrefix + id -> positive balance
negBalancePrefix = []byte("nb:") // dbVersion(uint16 big endian) + negBalancePrefix + ip -> negative balance
curBalancePrefix = []byte("cb:") // dbVersion(uint16 big endian) + curBalancePrefix + id -> currency balance
paymentReceiverPrefix = []byte("pr:") // dbVersion(uint16 big endian) + paymentReceiverPrefix + id + "receiverName:" -> receiver namespace
expirationKey = []byte("expiration:") // dbVersion(uint16 big endian) + expirationKey -> posExp, negExp
posBalancePrefix = []byte("pb:") // dbVersion(uint16 big endian) + posBalancePrefix + id -> positive balance
negBalancePrefix = []byte("nb:") // dbVersion(uint16 big endian) + negBalancePrefix + ip -> negative balance
curBalancePrefix = []byte("cb:") // dbVersion(uint16 big endian) + curBalancePrefix + id -> currency balance
expirationKey = []byte("expiration:") // dbVersion(uint16 big endian) + expirationKey -> posExp, negExp
)

type atomicWriteLock struct {
released chan struct{}
batch ethdb.Batch
}

type nodeDB struct {
db ethdb.Database
cache *lru.Cache
clock mclock.Clock
closeCh chan struct{}
evictCallBack func(mclock.AbsTime, bool, tokenBalance) bool // Callback to determine whether the balance can be evicted.
idLockMutex sync.Mutex
idLocks map[string]atomicWriteLock
cleanupHook func() // Test hook used for testing
cleanupHook func() // Test hook used for testing

// Atomicity APIs, may nil there is no atomicity guarantee.
openTransactionFn func()
closeTransactionFn func()
}

func newNodeDB(db ethdb.Database, clock mclock.Clock) *nodeDB {
Expand All @@ -129,7 +124,13 @@ func newNodeDB(db ethdb.Database, clock mclock.Clock) *nodeDB {
cache: cache,
clock: clock,
closeCh: make(chan struct{}),
idLocks: make(map[string]atomicWriteLock),
}
// If it's a atomic database which can guarantee the atomicity
// when the db handler is shared by different modules, get the
// APIs.
if sdb, ok := db.(*rawdb.AtomicDatabase); ok {
ndb.openTransactionFn = sdb.OpenTransaction
ndb.closeTransactionFn = sdb.CloseTransaction
}
go ndb.expirer()
return ndb
Expand All @@ -139,43 +140,16 @@ func (db *nodeDB) close() {
close(db.closeCh)
}

func (db *nodeDB) atomicWriteLock(id []byte) ethdb.KeyValueWriter {
db.idLockMutex.Lock()
for {
ch := db.idLocks[string(id)].released
if ch == nil {
break
}
db.idLockMutex.Unlock()
<-ch
db.idLockMutex.Lock()
}
batch := db.db.NewBatch()
db.idLocks[string(id)] = atomicWriteLock{
released: make(chan struct{}),
batch: batch,
func (db *nodeDB) openTransaction() {
if db.openTransactionFn != nil {
db.openTransactionFn()
}
db.idLockMutex.Unlock()
return batch
}

func (db *nodeDB) atomicWriteUnlock(id []byte) {
db.idLockMutex.Lock()
awl := db.idLocks[string(id)]
awl.batch.Write()
close(awl.released)
delete(db.idLocks, string(id))
db.idLockMutex.Unlock()
}

func (db *nodeDB) writer(id []byte) ethdb.KeyValueWriter {
db.idLockMutex.Lock()
batch := db.idLocks[string(id)].batch
db.idLockMutex.Unlock()
if batch == nil {
return db.db
func (db *nodeDB) closeTransaction() {
if db.closeTransactionFn != nil {
db.closeTransactionFn()
}
return batch
}

func idKey(id []byte, neg bool) []byte {
Expand All @@ -186,10 +160,6 @@ func idKey(id []byte, neg bool) []byte {
return append(prefix, id...)
}

func receiverPrefix(id enode.ID, receiver string) []byte {
return append(append(paymentReceiverPrefix, id.Bytes()...), []byte(receiver+":")...)
}

func (db *nodeDB) getExpiration() (fixed64, fixed64) {
blob, err := db.db.Get(expirationKey)
if err != nil || len(blob) != 16 {
Expand Down Expand Up @@ -222,7 +192,7 @@ func (db *nodeDB) setCurrencyBalance(id enode.ID, b currencyBalance) {
if err != nil {
log.Crit("Failed to encode currency balance", "err", err)
}
db.writer(id.Bytes()).Put(append(curBalancePrefix, id.Bytes()...), enc)
db.db.Put(append(curBalancePrefix, id.Bytes()...), enc)
}

func (db *nodeDB) getOrNewBalance(id []byte, neg bool) tokenBalance {
Expand All @@ -249,21 +219,13 @@ func (db *nodeDB) setBalance(id []byte, neg bool, b tokenBalance) {
if err != nil {
log.Crit("Failed to encode positive balance", "err", err)
}
if neg {
db.db.Put(key, enc)
} else {
db.writer(id).Put(key, enc)
}
db.db.Put(key, enc)
db.cache.Add(string(key), b)
}

func (db *nodeDB) delBalance(id []byte, neg bool) {
key := idKey(id, neg)
if neg {
db.db.Delete(key)
} else {
db.writer(id).Delete(key)
}
db.db.Delete(key)
db.cache.Remove(string(key))
}

Expand Down
Loading