Skip to content

Commit

Permalink
rebase fix
Browse files Browse the repository at this point in the history
Signed-off-by: Angelo De Caro <[email protected]>
  • Loading branch information
adecaro committed Nov 5, 2024
1 parent 120fb7a commit ae5cc81
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 60 deletions.
16 changes: 0 additions & 16 deletions pkg/runner/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,9 @@ import (
"sync/atomic"
"time"

"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
"github.com/pkg/errors"
)

var logger = flogging.MustGetLogger("batch-executor")

type BatchExecutor[I any, O any] interface {
Execute(input I) (O, error)
}

type BatchRunner[V any] interface {
Run(v V) error
}

type Output[O any] struct {
Val O
Err error
}

type batcher[I any, O any] struct {
idx uint32
inputs []chan I
Expand Down
47 changes: 3 additions & 44 deletions platform/common/core/generic/vault/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections"
dbdriver "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
)
Expand Down Expand Up @@ -68,18 +67,6 @@ type (
QueryExecutor = dbdriver.QueryExecutor
)

type txCommitIndex struct {
ctx context.Context
txID driver.TxID
block driver.BlockNum
indexInBloc driver.TxNum
}

type commitInput struct {
txCommitIndex
rws *ReadWriteSet
}

type VersionBuilder interface {
VersionedValues(rws *ReadWriteSet, ns driver.Namespace, writes NamespaceWrites, block driver.BlockNum, indexInBloc driver.TxNum) (map[driver.PKey]VersionedValue, error)
VersionedMetaValues(rws *ReadWriteSet, ns driver.Namespace, writes KeyedMetaWrites, block driver.BlockNum, indexInBloc driver.TxNum) (map[driver.PKey]driver.VersionedMetadataValue, error)
Expand Down Expand Up @@ -306,15 +293,6 @@ func (db *Vault[V]) commitRWs(inputs ...commitInput) error {
return errors.Wrapf(err, "begin update in store for txid %v failed", inputs)
}

for _, input := range inputs {
span := trace.SpanFromContext(input.ctx)

span.AddEvent("set_tx_busy")
if err := db.txIDStore.Set(input.txID, db.vcProvider.Busy(), ""); err != nil {
if !errors.HasCause(err, UniqueKeyViolation) {
return err
}
}
if _, err := db.setStatuses(inputs, db.vcProvider.Busy()); err != nil {
return err
}
Expand All @@ -335,15 +313,6 @@ func (db *Vault[V]) commitRWs(inputs ...commitInput) error {
}
}

db.logger.Debugf("parse writes [%s]", input.txID)
span.AddEvent("store_writes")
if discarded, err := db.storeWrites(input.ctx, input.rws.Writes, input.block, input.indexInBloc); err != nil {
return errors.Wrapf(err, "failed storing writes")
} else if discarded {
db.logger.Infof("Discarded changes while storing writes as duplicates. Skipping...")
db.txIDStore.Invalidate(input.txID)
return nil
}
if errs := db.storeAllWrites(writes); len(errs) == 0 {
db.logger.Debugf("Successfully stored writes for %d namespaces", len(writes))
} else if discarded, err := db.discard("", 0, 0, errs); err != nil {
Expand All @@ -356,15 +325,6 @@ func (db *Vault[V]) commitRWs(inputs ...commitInput) error {
return nil
}

db.logger.Debugf("parse meta writes [%s]", input.txID)
span.AddEvent("store_meta_writes")
if discarded, err := db.storeMetaWrites(input.ctx, input.rws.MetaWrites, input.block, input.indexInBloc); err != nil {
return errors.Wrapf(err, "failed storing meta writes")
} else if discarded {
db.logger.Infof("Discarded changes while storing meta writes as duplicates. Skipping...")
db.txIDStore.Invalidate(input.txID)
return nil
}
db.logger.Debugf("parse meta writes")
metaWrites := make(map[driver.Namespace]map[driver.PKey]driver.VersionedMetadataValue)
for _, input := range inputs {
Expand Down Expand Up @@ -408,10 +368,9 @@ func (db *Vault[V]) commitRWs(inputs ...commitInput) error {
return nil
}

for _, input := range inputs {
trace.SpanFromContext(input.ctx).AddEvent("commit_update")
}
span.AddEvent("commit_update")
for _, input := range inputs {
trace.SpanFromContext(input.ctx).AddEvent("commit_update")
}
if err := db.store.Commit(); err != nil {
return errors.Wrapf(err, "committing tx for txid in store [%v] failed", inputs)
}
Expand Down

0 comments on commit ae5cc81

Please sign in to comment.