Skip to content

Commit

Permalink
Added metrics for batcher
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandros Filios <[email protected]>
  • Loading branch information
alexandrosfilios committed Sep 23, 2024
1 parent 82b3a26 commit d8c305a
Show file tree
Hide file tree
Showing 18 changed files with 130 additions and 46 deletions.
2 changes: 1 addition & 1 deletion docs/fabric/fabricdev/core/fabricdev/channelprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (p *provider) NewChannel(nw driver.FabricNetworkService, channelName string
}

// Vault
vault, txIDStore, err := p.newVault(nw.ConfigService(), channelName, p.drivers, p.tracerProvider)
vault, txIDStore, err := p.newVault(nw.ConfigService(), channelName, p.drivers, p.metricsProvider, p.tracerProvider)
if err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions docs/fabric/fabricdev/core/fabricdev/vault/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db"
dbdriver "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/trace"
)

var logger = flogging.MustGetLogger("fabric-sdk.core.vault")

func New(configService driver.ConfigService, channel string, drivers []dbdriver.NamedDriver, tracerProvider trace.TracerProvider) (*Vault, driver.TXIDStore, error) {
func New(configService driver.ConfigService, channel string, drivers []dbdriver.NamedDriver, metricsProvider metrics.Provider, tracerProvider trace.TracerProvider) (*Vault, driver.TXIDStore, error) {
var d dbdriver.Driver
for _, driver := range drivers {
if driver.Name == configService.VaultPersistenceType() {
Expand Down Expand Up @@ -51,10 +52,10 @@ func New(configService driver.ConfigService, channel string, drivers []dbdriver.
if txIDStoreCacheSize > 0 {
logger.Debugf("creating txID store second cache with size [%d]", txIDStoreCacheSize)
c := txidstore.NewCache(txidStore, secondcache.NewTyped[*txidstore.Entry](txIDStoreCacheSize), logger)
return NewVault(persistence, c, tracerProvider), c, nil
return NewVault(persistence, c, metricsProvider, tracerProvider), c, nil
} else {
logger.Debugf("txID store without cache selected")
c := txidstore.NewNoCache(txidStore)
return NewVault(persistence, c, tracerProvider), c, nil
return NewVault(persistence, c, metricsProvider, tracerProvider), c, nil
}
}
4 changes: 3 additions & 1 deletion docs/fabric/fabricdev/core/fabricdev/vault/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/common/driver"
fdriver "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics"
"go.opentelemetry.io/otel/trace"
)

Expand All @@ -23,14 +24,15 @@ type (
)

// NewVault returns a new instance of Vault
func NewVault(store vault.VersionedPersistence, txIDStore TXIDStore, tracerProvider trace.TracerProvider) *Vault {
func NewVault(store vault.VersionedPersistence, txIDStore TXIDStore, metricsProvider metrics.Provider, tracerProvider trace.TracerProvider) *Vault {
return vault.New[fdriver.ValidationCode](
flogging.MustGetLogger("fabric-sdk.generic.vault"),
store,
txIDStore,
&fdriver.ValidationCodeProvider{},
newInterceptor,
&populator{},
metricsProvider,
tracerProvider,
)
}
Expand Down
24 changes: 2 additions & 22 deletions pkg/runner/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,16 @@ import (
"sync/atomic"
"time"

"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
"github.com/pkg/errors"
)

var logger = flogging.MustGetLogger("batch-executor")

type BatchExecutor[I any, O any] interface {
Execute(input I) (O, error)
}

type BatchRunner[V any] interface {
Run(v V) error
}

type Output[O any] struct {
Val O
Err error
}

type batcher[I any, O any] struct {
idx uint32
inputs []chan I
outputs []chan O
locks []sync.Mutex
len uint32
executor func([]I) []O
executor ExecuteFunc[I, O]
timeout time.Duration
}

Expand Down Expand Up @@ -120,7 +104,7 @@ type batchExecutor[I any, O any] struct {
*batcher[I, Output[O]]
}

func NewBatchExecutor[I any, O any](executor func([]I) []Output[O], capacity int, timeout time.Duration) BatchExecutor[I, O] {
func NewBatchExecutor[I any, O any](executor ExecuteFunc[I, Output[O]], capacity int, timeout time.Duration) BatchExecutor[I, O] {
return &batchExecutor[I, O]{batcher: newBatcher(executor, capacity, timeout)}
}

Expand All @@ -133,10 +117,6 @@ type batchRunner[V any] struct {
*batcher[V, error]
}

func NewSerialRunner[V any](runner func([]V) []error) BatchRunner[V] {
return NewBatchRunner(runner, 1, 1*time.Hour)
}

func NewBatchRunner[V any](runner func([]V) []error, capacity int, timeout time.Duration) BatchRunner[V] {
return &batchRunner[V]{batcher: newBatcher(runner, capacity, timeout)}
}
Expand Down
26 changes: 26 additions & 0 deletions pkg/runner/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package runner

import "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"

var logger = flogging.MustGetLogger("batch-executor")

type BatchExecutor[I any, O any] interface {
Execute(input I) (O, error)
}

type BatchRunner[V any] interface {
Run(v V) error
}

type Output[O any] struct {
Val O
Err error
}

type ExecuteFunc[I any, O any] func([]I) []O
32 changes: 32 additions & 0 deletions pkg/runner/serial.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package runner

func NewSerialRunner[V any](runner ExecuteFunc[V, error]) BatchRunner[V] {
return &serialRunner[V]{executor: runner}
}

type serialRunner[V any] struct {
executor ExecuteFunc[V, error]
}

func (r *serialRunner[V]) Run(val V) error {
return r.executor([]V{val})[0]
}

func NewSerialExecutor[I any, O any](executor ExecuteFunc[I, Output[O]]) BatchExecutor[I, O] {
return &serialExecutor[I, O]{executor: executor}
}

type serialExecutor[I any, O any] struct {
executor ExecuteFunc[I, Output[O]]
}

func (r *serialExecutor[I, O]) Execute(input I) (O, error) {
res := r.executor([]I{input})[0]
return res.Val, res.Err
}
21 changes: 20 additions & 1 deletion platform/common/core/generic/vault/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,35 @@ SPDX-License-Identifier: Apache-2.0
package vault

import (
"time"

"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/tracing"
"go.opentelemetry.io/otel/trace"
)

type Metrics struct {
CommitDuration metrics.Histogram
BatchedCommitDuration metrics.Histogram

Vault trace.Tracer
}

func NewMetrics(p trace.TracerProvider) *Metrics {
func NewMetrics(m metrics.Provider, p trace.TracerProvider) *Metrics {
return &Metrics{
CommitDuration: m.NewHistogram(metrics.HistogramOpts{
Namespace: "vault",
Name: "commit",
Help: "Histogram for the duration of commit",
Buckets: utils.ExponentialBucketTimeRange(0, 5*time.Second, 15),
}),
BatchedCommitDuration: m.NewHistogram(metrics.HistogramOpts{
Namespace: "vault",
Name: "batched_commit",
Help: "Histogram for the duration of commit with the batching overhead",
Buckets: utils.ExponentialBucketTimeRange(0, 5*time.Second, 15),
}),
Vault: p.Tracer("vault", tracing.WithMetricsOpts(tracing.MetricsOpts{
Namespace: "coresdk",
LabelNames: []tracing.LabelName{},
Expand Down
12 changes: 10 additions & 2 deletions platform/common/core/generic/vault/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/common/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections"
dbdriver "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
)
Expand Down Expand Up @@ -105,6 +106,7 @@ func New[V driver.ValidationCode](
vcProvider driver.ValidationCodeProvider[V],
newInterceptor NewInterceptorFunc[V],
populator Populator,
metricsProvider metrics.Provider,
tracerProvider trace.TracerProvider,
) *Vault[V] {
v := &Vault[V]{
Expand All @@ -115,7 +117,7 @@ func New[V driver.ValidationCode](
vcProvider: vcProvider,
newInterceptor: newInterceptor,
populator: populator,
metrics: NewMetrics(tracerProvider),
metrics: NewMetrics(metricsProvider, tracerProvider),
}
v.commitBatcher = runner.NewSerialRunner(v.commitTXs)
return v
Expand Down Expand Up @@ -220,17 +222,22 @@ type commitInput struct {
}

func (db *Vault[V]) CommitTX(ctx context.Context, txID driver.TxID, block driver.BlockNum, indexInBloc driver.TxNum) error {
start := time.Now()
newCtx, span := db.metrics.Vault.Start(ctx, "commit")
defer span.End()
return db.commitBatcher.Run(txCommitIndex{
err := db.commitBatcher.Run(txCommitIndex{
ctx: newCtx,
txID: txID,
block: block,
indexInBloc: indexInBloc,
})
db.metrics.BatchedCommitDuration.Observe(time.Since(start).Seconds())
return err
}

func (db *Vault[V]) commitTXs(txs []txCommitIndex) []error {
db.logger.Debugf("Commit %d transactions", len(txs))
start := time.Now()
txIDs := make([]driver.TxID, len(txs))
for i, tx := range txs {
txIDs[i] = tx.txID
Expand All @@ -257,6 +264,7 @@ func (db *Vault[V]) commitTXs(txs []txCommitIndex) []error {
for {
err := db.commitRWs(inputs...)
if err == nil {
db.metrics.CommitDuration.Observe(time.Since(start).Seconds() / float64(len(txs)))
return collections.Repeat[error](nil, len(txs))
}
if !errors.HasCause(err, DeadlockDetected) {
Expand Down
3 changes: 3 additions & 0 deletions platform/common/core/generic/vault/vault_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/cache/secondcache"
db2 "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics/disabled"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/trace/noop"
"golang.org/x/exp/slices"
Expand All @@ -40,6 +41,7 @@ func (p *testArtifactProvider) NewCachedVault(ddb VersionedPersistence) (*Vault[
&VCProvider{},
newInterceptor,
&populator{},
&disabled.Provider{},
&noop.TracerProvider{},
), nil
}
Expand All @@ -56,6 +58,7 @@ func (p *testArtifactProvider) NewNonCachedVault(ddb VersionedPersistence) (*Vau
&VCProvider{},
newInterceptor,
&populator{},
&disabled.Provider{},
&noop.TracerProvider{},
), nil
}
Expand Down
4 changes: 2 additions & 2 deletions platform/fabric/core/generic/channelprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"go.opentelemetry.io/otel/trace"
)

type VaultConstructor = func(configService driver.ConfigService, channel string, drivers []driver2.NamedDriver, tracerProvider trace.TracerProvider) (*vault.Vault, driver.TXIDStore, error)
type VaultConstructor = func(configService driver.ConfigService, channel string, drivers []driver2.NamedDriver, metricsProvider metrics.Provider, tracerProvider trace.TracerProvider) (*vault.Vault, driver.TXIDStore, error)

type ChannelProvider interface {
NewChannel(nw driver.FabricNetworkService, name string, quiet bool) (driver.Channel, error)
Expand Down Expand Up @@ -88,7 +88,7 @@ func (p *provider) NewChannel(nw driver.FabricNetworkService, channelName string
}

// Vault
vault, txIDStore, err := p.newVault(nw.ConfigService(), channelName, p.drivers, p.tracerProvider)
vault, txIDStore, err := p.newVault(nw.ConfigService(), channelName, p.drivers, p.metricsProvider, p.tracerProvider)
if err != nil {
return nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions platform/fabric/core/generic/vault/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db"
driver2 "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/trace"
)

var logger = flogging.MustGetLogger("fabric-sdk.core.vault")

func New(configService driver.ConfigService, channel string, drivers []driver2.NamedDriver, tracerProvider trace.TracerProvider) (*Vault, driver.TXIDStore, error) {
func New(configService driver.ConfigService, channel string, drivers []driver2.NamedDriver, metricsProvider metrics.Provider, tracerProvider trace.TracerProvider) (*Vault, driver.TXIDStore, error) {
var d driver2.Driver
for _, driver := range drivers {
if driver.Name == configService.VaultPersistenceType() {
Expand Down Expand Up @@ -50,10 +51,10 @@ func New(configService driver.ConfigService, channel string, drivers []driver2.N
if txIDStoreCacheSize > 0 {
logger.Debugf("creating txID store second cache with size [%d]", txIDStoreCacheSize)
c := txidstore.NewCache(txidStore, secondcache.NewTyped[*txidstore.Entry](txIDStoreCacheSize), logger)
return NewVault(persistence, c, tracerProvider), c, nil
return NewVault(persistence, c, metricsProvider, tracerProvider), c, nil
} else {
logger.Debugf("txID store without cache selected")
c := txidstore.NewNoCache(txidStore)
return NewVault(persistence, c, tracerProvider), c, nil
return NewVault(persistence, c, metricsProvider, tracerProvider), c, nil
}
}
4 changes: 3 additions & 1 deletion platform/fabric/core/generic/vault/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections"
fdriver "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/flogging"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics"
"github.com/hyperledger/fabric-protos-go/ledger/rwset"
"github.com/hyperledger/fabric-protos-go/ledger/rwset/kvrwset"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
Expand All @@ -37,14 +38,15 @@ func NewTXIDStore(persistence txidstore.UnversionedPersistence) (*SimpleTXIDStor
}

// NewVault returns a new instance of Vault
func NewVault(store vault.VersionedPersistence, txIDStore TXIDStore, tracerProvider trace.TracerProvider) *Vault {
func NewVault(store vault.VersionedPersistence, txIDStore TXIDStore, metricsProvider metrics.Provider, tracerProvider trace.TracerProvider) *Vault {
return vault.New[fdriver.ValidationCode](
flogging.MustGetLogger("fabric-sdk.generic.vault"),
store,
txIDStore,
&fdriver.ValidationCodeProvider{},
newInterceptor,
&populator{},
metricsProvider,
tracerProvider,
)
}
Expand Down
5 changes: 3 additions & 2 deletions platform/fabric/core/generic/vault/vault_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/cache/secondcache"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db"
dbdriver "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics/disabled"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/trace/noop"
"golang.org/x/exp/slices"
Expand All @@ -31,15 +32,15 @@ func (p *artifactsProvider) NewCachedVault(ddb VersionedPersistence) (*Vault, er
if err != nil {
return nil, err
}
return NewVault(ddb, txidstore.NewCache(txidStore, secondcache.NewTyped[*txidstore.Entry](100), logger), &noop.TracerProvider{}), nil
return NewVault(ddb, txidstore.NewCache(txidStore, secondcache.NewTyped[*txidstore.Entry](100), logger), &disabled.Provider{}, &noop.TracerProvider{}), nil
}

func (p *artifactsProvider) NewNonCachedVault(ddb VersionedPersistence) (*Vault, error) {
txidStore, err := NewTXIDStore(db.Unversioned(ddb))
if err != nil {
return nil, err
}
return NewVault(ddb, txidstore.NewNoCache(txidStore), &noop.TracerProvider{}), nil
return NewVault(ddb, txidstore.NewNoCache(txidStore), &disabled.Provider{}, &noop.TracerProvider{}), nil
}

func (p *artifactsProvider) NewMarshaller() vault.Marshaller {
Expand Down
Loading

0 comments on commit d8c305a

Please sign in to comment.