Skip to content

Commit

Permalink
add txn manager metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Jan 9, 2024
1 parent 65f3d54 commit 222569e
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 12 deletions.
53 changes: 45 additions & 8 deletions disperser/batcher/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,21 @@ type EncodingStreamerMetrics struct {
EncodedBlobs *prometheus.GaugeVec
}

type TxnManagerMetrics struct {
Latency prometheus.Summary
GasUsed prometheus.Gauge
SpeedUps prometheus.Gauge
}

type Metrics struct {
*EncodingStreamerMetrics
*TxnManagerMetrics

registry *prometheus.Registry

Blob *prometheus.CounterVec
Batch *prometheus.CounterVec
BatchProcLatency *prometheus.SummaryVec
GasUsed prometheus.Gauge
Attestation *prometheus.GaugeVec
BatchError *prometheus.CounterVec

Expand All @@ -67,8 +73,34 @@ func NewMetrics(httpPort string, logger common.Logger) *Metrics {
),
}

txnManagerMetrics := TxnManagerMetrics{
Latency: promauto.With(reg).NewSummary(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "txn_manager_latency_ms",
Help: "transaction confirmation latency summary in milliseconds",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
},
),
GasUsed: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "gas_used",
Help: "gas used for onchain batch confirmation",
},
),
SpeedUps: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "speed_ups",
Help: "number of times the gas price was increased",
},
),
}

metrics := &Metrics{
EncodingStreamerMetrics: &encodingStreamerMetrics,
TxnManagerMetrics: &txnManagerMetrics,
Blob: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Expand All @@ -94,13 +126,6 @@ func NewMetrics(httpPort string, logger common.Logger) *Metrics {
},
[]string{"stage"},
),
GasUsed: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "gas_used",
Help: "gas used for onchain batch confirmation",
},
),
Attestation: promauto.With(reg).NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Expand Down Expand Up @@ -181,3 +206,15 @@ func (e *EncodingStreamerMetrics) UpdateEncodedBlobs(count int, size uint64) {
e.EncodedBlobs.WithLabelValues("size").Set(float64(size))
e.EncodedBlobs.WithLabelValues("number").Set(float64(count))
}

func (t *TxnManagerMetrics) ObserveLatency(latencyMs float64) {
t.Latency.Observe(latencyMs)
}

func (t *TxnManagerMetrics) UpdateGasUsed(gasUsed uint64) {
t.GasUsed.Set(float64(gasUsed))
}

func (t *TxnManagerMetrics) UpdateSpeedUps(speedUps int) {
t.SpeedUps.Set(float64(speedUps))
}
9 changes: 8 additions & 1 deletion disperser/batcher/txn_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,20 @@ type txnManager struct {
receiptChan chan *ReceiptOrErr
queueSize int
txnRefreshInterval time.Duration
metrics *TxnManagerMetrics
}

var _ TxnManager = (*txnManager)(nil)

func NewTxnManager(ethClient common.EthClient, queueSize int, txnRefreshInterval time.Duration, logger common.Logger) TxnManager {
func NewTxnManager(ethClient common.EthClient, queueSize int, txnRefreshInterval time.Duration, logger common.Logger, metrics *TxnManagerMetrics) TxnManager {
return &txnManager{
ethClient: ethClient,
requestChan: make(chan *TxnRequest, queueSize),
logger: logger,
receiptChan: make(chan *ReceiptOrErr, queueSize),
queueSize: queueSize,
txnRefreshInterval: txnRefreshInterval,
metrics: metrics,
}
}

Expand Down Expand Up @@ -104,6 +106,8 @@ func (t *txnManager) Start(ctx context.Context) {
Err: nil,
}
}
t.metrics.ObserveLatency(time.Since(req.requestedAt).Seconds())
t.metrics.UpdateGasUsed(receipt.GasUsed)
}
}
}()
Expand Down Expand Up @@ -143,6 +147,7 @@ func (t *txnManager) ReceiptChan() chan *ReceiptOrErr {
// monitorTransaction monitors the transaction and resends it with a higher gas price if it is not mined without a timeout.
// It returns an error if the transaction fails to be sent for reasons other than timeouts.
func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*types.Receipt, error) {
numSpeedUps := 0
for {
ctxWithTimeout, cancel := context.WithTimeout(ctx, t.txnRefreshInterval)
defer cancel()
Expand All @@ -154,6 +159,7 @@ func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*
req.Tag,
)
if err == nil {
t.metrics.UpdateSpeedUps(numSpeedUps)
return receipt, nil
}

Expand All @@ -173,6 +179,7 @@ func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*
t.logger.Error("failed to send txn", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "err", err)
continue
}
numSpeedUps++
} else {
t.logger.Error("transaction failed", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "err", err)
return nil, err
Expand Down
6 changes: 4 additions & 2 deletions disperser/batcher/txn_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ func TestProcessTransaction(t *testing.T) {
ethClient := &mock.MockEthClient{}
logger, err := logging.GetLogger(logging.DefaultCLIConfig())
assert.NoError(t, err)
txnManager := batcher.NewTxnManager(ethClient, 5, 48*time.Second, logger)
metrics := batcher.NewMetrics("9100", logger)
txnManager := batcher.NewTxnManager(ethClient, 5, 48*time.Second, logger, metrics.TxnManagerMetrics)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
txnManager.Start(ctx)
Expand Down Expand Up @@ -68,7 +69,8 @@ func TestReplaceGasFee(t *testing.T) {
ethClient := &mock.MockEthClient{}
logger, err := logging.GetLogger(logging.DefaultCLIConfig())
assert.NoError(t, err)
txnManager := batcher.NewTxnManager(ethClient, 5, 48*time.Second, logger)
metrics := batcher.NewMetrics("9100", logger)
txnManager := batcher.NewTxnManager(ethClient, 5, 48*time.Second, logger, metrics.TxnManagerMetrics)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
txnManager.Start(ctx)
Expand Down
2 changes: 1 addition & 1 deletion disperser/cmd/batcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func RunBatcher(ctx *cli.Context) error {
return err
}
finalizer := batcher.NewFinalizer(config.TimeoutConfig.ChainReadTimeout, config.BatcherConfig.FinalizerInterval, queue, client, rpcClient, config.BatcherConfig.MaxNumRetriesPerBlob, logger)
txnManager := batcher.NewTxnManager(client, 20, config.TimeoutConfig.ChainWriteTimeout, logger)
txnManager := batcher.NewTxnManager(client, 20, config.TimeoutConfig.ChainWriteTimeout, logger, metrics.TxnManagerMetrics)
batcher, err := batcher.NewBatcher(config.BatcherConfig, config.TimeoutConfig, queue, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics)
if err != nil {
return err
Expand Down

0 comments on commit 222569e

Please sign in to comment.