Skip to content

Commit

Permalink
Batch commit operations
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandros Filios <[email protected]>
  • Loading branch information
alexandrosfilios committed Sep 20, 2024
1 parent 6ac394e commit 82b3a26
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 77 deletions.
4 changes: 4 additions & 0 deletions pkg/runner/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ type batchRunner[V any] struct {
*batcher[V, error]
}

func NewSerialRunner[V any](runner func([]V) []error) BatchRunner[V] {
return NewBatchRunner(runner, 1, 1*time.Hour)
}

func NewBatchRunner[V any](runner func([]V) []error, capacity int, timeout time.Duration) BatchRunner[V] {
return &batchRunner[V]{batcher: newBatcher(runner, capacity, timeout)}
}
Expand Down
3 changes: 2 additions & 1 deletion platform/common/core/generic/vault/interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"testing"

"github.com/hyperledger-labs/fabric-smart-client/platform/common/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/assert"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
)
Expand Down Expand Up @@ -45,7 +46,7 @@ func (qe mockQE) Done() {
type mockTXIDStoreReader struct {
}

func (m mockTXIDStoreReader) Iterator(pos interface{}) (driver.TxIDIterator[int], error) {
func (m mockTXIDStoreReader) Iterator(interface{}) (collections.Iterator[*driver.ByNum[int]], error) {
panic("not implemented")
}
func (m mockTXIDStoreReader) Get(txID driver.TxID) (int, string, error) {
Expand Down
5 changes: 3 additions & 2 deletions platform/common/core/generic/vault/txidstore/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package txidstore

import (
"github.com/hyperledger-labs/fabric-smart-client/platform/common/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections"
)

type Logger interface {
Expand All @@ -30,7 +31,7 @@ type cache[V driver.ValidationCode] interface {
type txidStore[V driver.ValidationCode] interface {
Get(txID driver.TxID) (V, string, error)
Set(txID driver.TxID, code V, message string) error
Iterator(pos interface{}) (driver.TxIDIterator[V], error)
Iterator(pos interface{}) (collections.Iterator[*driver.ByNum[V]], error)
}

type CachedStore[V driver.ValidationCode] struct {
Expand Down Expand Up @@ -87,6 +88,6 @@ func (s *CachedStore[V]) Set(txID string, code V, message string) error {
return nil
}

func (s *CachedStore[V]) Iterator(pos interface{}) (driver.TxIDIterator[V], error) {
func (s *CachedStore[V]) Iterator(pos interface{}) (collections.Iterator[*driver.ByNum[V]], error) {
return s.backed.Iterator(pos)
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,12 @@ func (s *SimpleTXIDStore[V]) GetLastTxID() (driver.TxID, error) {
return string(v), nil
}

func (s *SimpleTXIDStore[V]) Iterator(pos interface{}) (driver.TxIDIterator[V], error) {
func (s *SimpleTXIDStore[V]) Iterator(pos interface{}) (collections.Iterator[*driver.ByNum[V]], error) {
var iterator collections.Iterator[*ByNum]
if ppos, ok := pos.(*driver.SeekSet); ok {
if len(ppos.TxIDs) == 0 {
return collections.NewEmptyIterator[*driver.ByNum[V]](), nil
}
keys := make([]string, len(ppos.TxIDs))
for i, txID := range ppos.TxIDs {
keys[i] = keyByTxID(txID)
Expand Down
190 changes: 127 additions & 63 deletions platform/common/core/generic/vault/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
"context"
errors2 "errors"
"sync"
"time"

"github.com/hyperledger-labs/fabric-smart-client/pkg/runner"
"github.com/hyperledger-labs/fabric-smart-client/pkg/utils/errors"
"github.com/hyperledger-labs/fabric-smart-client/platform/common/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections"
Expand All @@ -27,7 +29,7 @@ type Logger interface {
}

type TXIDStoreReader[V driver.ValidationCode] interface {
Iterator(pos interface{}) (driver.TxIDIterator[V], error)
Iterator(pos interface{}) (collections.Iterator[*driver.ByNum[V]], error)
Get(txID driver.TxID) (V, string, error)
}

Expand Down Expand Up @@ -91,6 +93,8 @@ type Vault[V driver.ValidationCode] struct {
newInterceptor NewInterceptorFunc[V]
populator Populator
metrics *Metrics

commitBatcher runner.BatchRunner[txCommitIndex]
}

// New returns a new instance of Vault
Expand All @@ -103,7 +107,7 @@ func New[V driver.ValidationCode](
populator Populator,
tracerProvider trace.TracerProvider,
) *Vault[V] {
return &Vault[V]{
v := &Vault[V]{
logger: logger,
Interceptors: make(map[driver.TxID]TxInterceptor),
store: store,
Expand All @@ -113,6 +117,8 @@ func New[V driver.ValidationCode](
populator: populator,
metrics: NewMetrics(tracerProvider),
}
v.commitBatcher = runner.NewSerialRunner(v.commitTXs)
return v
}

func (db *Vault[V]) NewQueryExecutor() (QueryExecutor, error) {
Expand Down Expand Up @@ -158,111 +164,169 @@ func (db *Vault[V]) DiscardTx(txID driver.TxID, message string) error {
}

func (db *Vault[V]) UnmapInterceptor(txID driver.TxID) (TxInterceptor, error) {
m, err := db.unmapInterceptors(txID)
if err != nil {
return nil, err
}
return m[txID], nil
}

func (db *Vault[V]) unmapInterceptors(txIDs ...driver.TxID) (map[driver.TxID]TxInterceptor, error) {
db.interceptorsLock.Lock()
defer db.interceptorsLock.Unlock()

i, in := db.Interceptors[txID]
result, notFound := collections.SubMap(db.Interceptors, txIDs...)

if !in {
vc, _, err := db.txIDStore.Get(txID)
vcs, err := db.txIDStore.Iterator(&driver.SeekSet{TxIDs: notFound})
if err != nil {
return nil, errors.Wrapf(err, "read-write set for txids [%v] could not be found", txIDs)
}

foundInStore := collections.NewSet[driver.TxID]()
for vc, err := vcs.Next(); vc != nil; vc, err = vcs.Next() {
if err != nil {
return nil, errors.Wrapf(err, "read-write set for txid %s could not be found", txID)
return nil, errors.Wrapf(err, "read-write set for txid %s could not be found", vc.TxID)
}
if vc == db.vcProvider.Unknown() {
return nil, errors.Errorf("read-write set for txid %s could not be found", txID)
if vc.Code == db.vcProvider.Unknown() {
return nil, errors.Errorf("read-write set for txid %s could not be found", vc.TxID)
}
return nil, nil
foundInStore.Add(vc.TxID)
}

if !i.IsClosed() {
return nil, errors.Errorf("attempted to retrieve read-write set for %s when done has not been called", txID)
if unknownTxIDs := collections.NewSet(notFound...).Minus(foundInStore); !unknownTxIDs.Empty() {
return nil, errors.Errorf("read-write set for txid %s could not be found", unknownTxIDs.ToSlice()[0])
}

for txID, i := range result {
if !i.IsClosed() {
return nil, errors.Errorf("attempted to retrieve read-write set for %s when done has not been called", txID)
}
delete(db.Interceptors, txID)
}

delete(db.Interceptors, txID)
return result, nil
}

return i, nil
type txCommitIndex struct {
ctx context.Context
txID driver.TxID
block driver.BlockNum
indexInBloc driver.TxNum
}

type commitInput struct {
txCommitIndex
rws *ReadWriteSet
}

func (db *Vault[V]) CommitTX(ctx context.Context, txID driver.TxID, block driver.BlockNum, indexInBloc driver.TxNum) error {
newCtx, span := db.metrics.Vault.Start(ctx, "commit")
defer span.End()
db.logger.Debugf("UnmapInterceptor [%s]", txID)
span.AddEvent("unmap_interceptor")
i, err := db.UnmapInterceptor(txID)
return db.commitBatcher.Run(txCommitIndex{
ctx: newCtx,
txID: txID,
block: block,
indexInBloc: indexInBloc,
})
}

func (db *Vault[V]) commitTXs(txs []txCommitIndex) []error {
txIDs := make([]driver.TxID, len(txs))
for i, tx := range txs {
txIDs[i] = tx.txID
}
db.logger.Debugf("UnmapInterceptors [%v]", txIDs)
interceptors, err := db.unmapInterceptors(txIDs...)
if err != nil {
return errors.Wrapf(err, "failed to unmap interceptor for [%s]", txID)
return collections.Repeat(errors.Wrapf(err, "failed to unmap interceptor for [%v]", txIDs), len(txs))
}
if len(interceptors) != len(txs) {
notFound := collections.Difference(txIDs, collections.Keys(interceptors))
errs := make([]error, len(notFound))
for i, txID := range notFound {
errs[i] = errors.Errorf("read-write set for txid %s could not be found", txID)
}
return errs
}
if i == nil {
return errors.Errorf("cannot find rwset for [%s]", txID)

inputs := make([]commitInput, len(txs))
for i, tx := range txs {
inputs[i] = commitInput{txCommitIndex: tx, rws: interceptors[tx.txID].RWs()}
}
rws := i.RWs()

db.logger.Debugf("[%s] commit in vault", txID)
for {
err := db.commitRWs(newCtx, txID, block, indexInBloc, rws)
err := db.commitRWs(inputs...)
if err == nil {
return nil
return collections.Repeat[error](nil, len(txs))
}
if !errors.HasCause(err, DeadlockDetected) {
span.RecordError(err)
// This should generate a panic
return err
return collections.Repeat(err, len(txs))
}
db.logger.Debugf("Deadlock detected. Retrying... [%v]", err)
}
}

func (db *Vault[V]) commitRWs(ctx context.Context, txID driver.TxID, block driver.BlockNum, indexInBloc driver.TxNum, rws *ReadWriteSet) error {
span := trace.SpanFromContext(ctx)
span.AddEvent("commit_rws")
db.logger.Debugf("get lock [%s][%d]", txID, db.counter.Load())
func (db *Vault[V]) commitRWs(inputs ...commitInput) error {
for _, input := range inputs {
trace.SpanFromContext(input.ctx).AddEvent("wait_store_lock")
}
db.storeLock.Lock()
defer db.storeLock.Unlock()

span.AddEvent("begin_update")
for _, input := range inputs {
trace.SpanFromContext(input.ctx).AddEvent("begin_update")
}
if err := db.store.BeginUpdate(); err != nil {
return errors.Wrapf(err, "begin update in store for txid '%s' failed", txID)
return errors.Wrapf(err, "begin update in store for txid %v failed", inputs)
}

span.AddEvent("set_tx_busy")
if err := db.txIDStore.Set(txID, db.vcProvider.Busy(), ""); err != nil {
if !errors.HasCause(err, UniqueKeyViolation) {
return err
for _, input := range inputs {
span := trace.SpanFromContext(input.ctx)

span.AddEvent("set_tx_busy")
if err := db.txIDStore.Set(input.txID, db.vcProvider.Busy(), ""); err != nil {
if !errors.HasCause(err, UniqueKeyViolation) {
return err
}
}
}

db.logger.Debugf("parse writes [%s]", txID)
span.AddEvent("store_writes")
if discarded, err := db.storeWrites(ctx, rws.Writes, block, indexInBloc); err != nil {
return errors.Wrapf(err, "failed storing writes")
} else if discarded {
db.logger.Infof("Discarded changes while storing writes as duplicates. Skipping...")
db.txIDStore.Invalidate(txID)
return nil
}
db.logger.Debugf("parse writes [%s]", input.txID)
span.AddEvent("store_writes")
if discarded, err := db.storeWrites(input.ctx, input.rws.Writes, input.block, input.indexInBloc); err != nil {
return errors.Wrapf(err, "failed storing writes")
} else if discarded {
db.logger.Infof("Discarded changes while storing writes as duplicates. Skipping...")
db.txIDStore.Invalidate(input.txID)
return nil
}

db.logger.Debugf("parse meta writes [%s]", txID)
span.AddEvent("store_meta_writes")
if discarded, err := db.storeMetaWrites(ctx, rws.MetaWrites, block, indexInBloc); err != nil {
return errors.Wrapf(err, "failed storing meta writes")
} else if discarded {
db.logger.Infof("Discarded changes while storing meta writes as duplicates. Skipping...")
db.txIDStore.Invalidate(txID)
return nil
}
db.logger.Debugf("parse meta writes [%s]", input.txID)
span.AddEvent("store_meta_writes")
if discarded, err := db.storeMetaWrites(input.ctx, input.rws.MetaWrites, input.block, input.indexInBloc); err != nil {
return errors.Wrapf(err, "failed storing meta writes")
} else if discarded {
db.logger.Infof("Discarded changes while storing meta writes as duplicates. Skipping...")
db.txIDStore.Invalidate(input.txID)
return nil
}

db.logger.Debugf("set state to valid [%s]", input.txID)
span.AddEvent("set_tx_valid")
if discarded, err := db.setTxValid(input.txID); err != nil {
return errors.Wrapf(err, "failed setting tx state to valid")
} else if discarded {
db.logger.Infof("Discarded changes while setting tx state to valid as duplicates. Skipping...")
return nil
}

db.logger.Debugf("set state to valid [%s]", txID)
span.AddEvent("set_tx_valid")
if discarded, err := db.setTxValid(txID); err != nil {
return errors.Wrapf(err, "failed setting tx state to valid")
} else if discarded {
db.logger.Infof("Discarded changes while setting tx state to valid as duplicates. Skipping...")
return nil
}

span.AddEvent("commit_update")
for _, input := range inputs {
trace.SpanFromContext(input.ctx).AddEvent("commit_update")
}
if err := db.store.Commit(); err != nil {
return errors.Wrapf(err, "committing tx for txid in store '%s' failed", txID)
return errors.Wrapf(err, "committing tx for txid in store [%v] failed", inputs)
}

return nil
Expand Down
5 changes: 0 additions & 5 deletions platform/common/driver/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,3 @@ type ByNum[V comparable] struct {
Code V
Message string
}

type TxIDIterator[V comparable] interface {
Next() (*ByNum[V], error)
Close()
}
13 changes: 13 additions & 0 deletions platform/common/utils/collections/maps.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,19 @@ func Keys[K comparable, V any](m map[K]V) []K {
return res
}

func SubMap[K comparable, V any](m map[K]V, ks ...K) (map[K]V, []K) {
found := make(map[K]V, len(ks))
notFound := make([]K, 0, len(ks))
for _, k := range ks {
if v, ok := m[k]; ok {
found[k] = v
} else {
notFound = append(notFound, k)
}
}
return found, notFound
}

func RepeatValue[K comparable, V any](keys []K, val V) map[K]V {
res := make(map[K]V, len(keys))
for _, k := range keys {
Expand Down
4 changes: 4 additions & 0 deletions platform/common/utils/collections/slices.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ func Remove[T comparable](items []T, toRemove T) ([]T, bool) {
return items, false
}

func Difference[V comparable](a, b []V) []V {
return NewSet(a...).Minus(NewSet(b...)).ToSlice()
}

func Intersection[V comparable](a, b []V) []V {
//if len(a) > len(b) {
// a, b = b, a
Expand Down
Loading

0 comments on commit 82b3a26

Please sign in to comment.