Skip to content

Commit

Permalink
feat: add deadlocks metric
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 16, 2024
1 parent e1dbb19 commit cf1e25e
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 30 deletions.
11 changes: 10 additions & 1 deletion TODO
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
doc:
* technical endpoints
** some example around pprof?
** some example around pprof?

* graph metrics:
** process.runtime.go.mem.heap_alloc (Bytes of allocated heap objects)
** process.runtime.go.mem.heap_objects (Number of allocated heap objects)
** process.runtime.go.gc.count (Number of completed garbage collection cycles)
** process.runtime.go.gc.pause_total_ns (Cumulative nanoseconds in GC stop-the-world pauses since the program started)
** system.network.io (Bytes transferred attributed by direction (Transmit, Receive))
** system.memory.utilization (Memory utilization of this process attributed by memory state (Used, Available))
** process.cpu.time (Accumulated CPU time spent by this process attributed by state (User, System, ...))
4 changes: 2 additions & 2 deletions internal/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,8 @@ type Move struct {
Account string `bun:"accounts_address,type:varchar"`
Amount *bunpaginate.BigInt `bun:"amount,type:numeric"`
Asset string `bun:"asset,type:varchar"`
InsertionDate time.Time `bun:"insertion_date,type:timestamp"`
EffectiveDate time.Time `bun:"effective_date,type:timestamp"`
InsertionDate time.Time `bun:"insertion_date,type:timestamp,nullzero"`
EffectiveDate time.Time `bun:"effective_date,type:timestamp,nullzero"`
PostCommitVolumes *Volumes `bun:"post_commit_volumes,type:jsonb"`
PostCommitEffectiveVolumes *Volumes `bun:"post_commit_effective_volumes,type:jsonb,scanonly"`
}
Expand Down
45 changes: 32 additions & 13 deletions internal/controller/ledger/controller_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,51 @@ type DefaultController struct {
ledger ledger.Ledger

tracer trace.Tracer
meter metric.Meter
meter metric.Meter

executeMachineHistogram metric.Int64Histogram
deadLockCounter metric.Int64Counter

createTransactionLp *logProcessor[RunScript, ledger.CreatedTransaction]
revertTransactionLp *logProcessor[RevertTransaction, ledger.RevertedTransaction]
saveTransactionMetadataLp *logProcessor[SaveTransactionMetadata, ledger.SavedMetadata]
saveAccountMetadataLp *logProcessor[SaveAccountMetadata, ledger.SavedMetadata]
deleteTransactionMetadata *logProcessor[DeleteTransactionMetadata, ledger.DeletedMetadata]
deleteAccountMetadata *logProcessor[DeleteAccountMetadata, ledger.DeletedMetadata]
}

func NewDefaultController(
ledger ledger.Ledger,
l ledger.Ledger,
store Store,
machineFactory MachineFactory,
opts ...DefaultControllerOption,
) *DefaultController {
ret := &DefaultController{
store: store,
ledger: ledger,
ledger: l,
machineFactory: machineFactory,
}

for _, opt := range append(defaultOptions, opts...) {
opt(ret)
}

histogram, err := ret.meter.Int64Histogram("numscript.run")
var err error
ret.executeMachineHistogram, err = ret.meter.Int64Histogram("numscript.run")
if err != nil {
panic(err)
}
ret.deadLockCounter, err = ret.meter.Int64Counter("deadlocks")
if err != nil {
return nil
panic(err)
}

ret.executeMachineHistogram = histogram
ret.createTransactionLp = newLogProcessor[RunScript, ledger.CreatedTransaction]("CreateTransaction", ret.deadLockCounter)
ret.revertTransactionLp = newLogProcessor[RevertTransaction, ledger.RevertedTransaction]("RevertTransaction", ret.deadLockCounter)
ret.saveTransactionMetadataLp = newLogProcessor[SaveTransactionMetadata, ledger.SavedMetadata]("SaveTransactionMetadata", ret.deadLockCounter)
ret.saveAccountMetadataLp = newLogProcessor[SaveAccountMetadata, ledger.SavedMetadata]("SaveAccountMetadata", ret.deadLockCounter)
ret.deleteTransactionMetadata = newLogProcessor[DeleteTransactionMetadata, ledger.DeletedMetadata]("DeleteTransactionMetadata", ret.deadLockCounter)
ret.deleteAccountMetadata = newLogProcessor[DeleteAccountMetadata, ledger.DeletedMetadata]("DeleteAccountMetadata", ret.deadLockCounter)

return ret
}
Expand Down Expand Up @@ -236,7 +255,7 @@ func (ctrl *DefaultController) CreateTransaction(ctx context.Context, parameters
return nil, fmt.Errorf("failed to compile script: %w", err)
}

output, err := forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input RunScript) (*ledger.CreatedTransaction, error) {
output, err := ctrl.createTransactionLp.forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input RunScript) (*ledger.CreatedTransaction, error) {
result, err := tracing.TraceWithMetric(
ctx,
"ExecuteMachine",
Expand Down Expand Up @@ -294,7 +313,7 @@ func (ctrl *DefaultController) CreateTransaction(ctx context.Context, parameters
}

func (ctrl *DefaultController) RevertTransaction(ctx context.Context, parameters Parameters[RevertTransaction]) (*ledger.RevertedTransaction, error) {
return forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input RevertTransaction) (*ledger.RevertedTransaction, error) {
return ctrl.revertTransactionLp.forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input RevertTransaction) (*ledger.RevertedTransaction, error) {

var (
hasBeenReverted bool
Expand Down Expand Up @@ -359,7 +378,7 @@ func (ctrl *DefaultController) RevertTransaction(ctx context.Context, parameters
}

func (ctrl *DefaultController) SaveTransactionMetadata(ctx context.Context, parameters Parameters[SaveTransactionMetadata]) error {
_, err := forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input SaveTransactionMetadata) (*ledger.SavedMetadata, error) {
_, err := ctrl.saveTransactionMetadataLp.forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input SaveTransactionMetadata) (*ledger.SavedMetadata, error) {
if _, _, err := sqlTX.UpdateTransactionMetadata(ctx, input.TransactionID, input.Metadata); err != nil {
return nil, err
}
Expand All @@ -374,7 +393,7 @@ func (ctrl *DefaultController) SaveTransactionMetadata(ctx context.Context, para
}

func (ctrl *DefaultController) SaveAccountMetadata(ctx context.Context, parameters Parameters[SaveAccountMetadata]) error {
_, err := forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input SaveAccountMetadata) (*ledger.SavedMetadata, error) {
_, err := ctrl.saveAccountMetadataLp.forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input SaveAccountMetadata) (*ledger.SavedMetadata, error) {
if _, err := sqlTX.UpsertAccount(ctx, &ledger.Account{
Address: input.Address,
Metadata: input.Metadata,
Expand All @@ -393,7 +412,7 @@ func (ctrl *DefaultController) SaveAccountMetadata(ctx context.Context, paramete
}

func (ctrl *DefaultController) DeleteTransactionMetadata(ctx context.Context, parameters Parameters[DeleteTransactionMetadata]) error {
_, err := forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input DeleteTransactionMetadata) (*ledger.DeletedMetadata, error) {
_, err := ctrl.deleteTransactionMetadata.forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input DeleteTransactionMetadata) (*ledger.DeletedMetadata, error) {
_, modified, err := sqlTX.DeleteTransactionMetadata(ctx, input.TransactionID, input.Key)
if err != nil {
return nil, err
Expand All @@ -414,7 +433,7 @@ func (ctrl *DefaultController) DeleteTransactionMetadata(ctx context.Context, pa
}

func (ctrl *DefaultController) DeleteAccountMetadata(ctx context.Context, parameters Parameters[DeleteAccountMetadata]) error {
_, err := forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input DeleteAccountMetadata) (*ledger.DeletedMetadata, error) {
_, err := ctrl.deleteAccountMetadata.forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input DeleteAccountMetadata) (*ledger.DeletedMetadata, error) {
err := sqlTX.DeleteAccountMetadata(ctx, input.Address, input.Key)
if err != nil {
return nil, err
Expand Down Expand Up @@ -447,4 +466,4 @@ func WithTracer(tracer trace.Tracer) DefaultControllerOption {
return func(controller *DefaultController) {
controller.tracer = tracer
}
}
}
38 changes: 27 additions & 11 deletions internal/controller/ledger/log_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,23 @@ import (
"github.com/formancehq/go-libs/pointer"
ledger "github.com/formancehq/ledger/internal"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"math/rand"
"time"
)

func runTx[INPUT any, OUTPUT ledger.LogPayload](ctx context.Context, store Store, parameters Parameters[INPUT], fn func(ctx context.Context, sqlTX TX, input INPUT) (*OUTPUT, error)) (*OUTPUT, error) {
type logProcessor[INPUT any, OUTPUT ledger.LogPayload] struct {
deadLockCounter metric.Int64Counter
operation string
}

func newLogProcessor[INPUT any, OUTPUT ledger.LogPayload](operation string, deadlockCounter metric.Int64Counter) *logProcessor[INPUT, OUTPUT] {
return &logProcessor[INPUT, OUTPUT]{
operation: operation,
deadLockCounter: deadlockCounter,
}
}

func (lp *logProcessor[INPUT, OUTPUT]) runTx(ctx context.Context, store Store, parameters Parameters[INPUT], fn func(ctx context.Context, sqlTX TX, input INPUT) (*OUTPUT, error)) (*OUTPUT, error) {
var payload *OUTPUT
err := store.WithTX(ctx, nil, func(tx TX) (commit bool, err error) {
payload, err = fn(ctx, tx, parameters.Input)
Expand All @@ -40,10 +51,14 @@ func runTx[INPUT any, OUTPUT ledger.LogPayload](ctx context.Context, store Store
return payload, err
}

// todo: metrics, add deadlocks
func forgeLog[INPUT any, OUTPUT ledger.LogPayload](ctx context.Context, store Store, parameters Parameters[INPUT], fn func(ctx context.Context, sqlTX TX, input INPUT) (*OUTPUT, error)) (*OUTPUT, error) {
func (lp *logProcessor[INPUT, OUTPUT]) forgeLog(
ctx context.Context,
store Store,
parameters Parameters[INPUT],
fn func(ctx context.Context, sqlTX TX, input INPUT) (*OUTPUT, error),
) (*OUTPUT, error) {
if parameters.IdempotencyKey != "" {
output, err := fetchLogWithIK[INPUT, OUTPUT](ctx, store, parameters)
output, err := lp.fetchLogWithIK(ctx, store, parameters)
if err != nil {
return nil, err
}
Expand All @@ -53,18 +68,19 @@ func forgeLog[INPUT any, OUTPUT ledger.LogPayload](ctx context.Context, store St
}

for {
output, err := runTx(ctx, store, parameters, fn)
output, err := lp.runTx(ctx, store, parameters, fn)
if err != nil {
switch {
case errors.Is(err, postgres.ErrDeadlockDetected):
trace.SpanFromContext(ctx).SetAttributes(attribute.Bool("deadlock", true))
logging.FromContext(ctx).Info("deadlock detected, retrying...")
// todo: keep ? / set configurable?
<-time.After(time.Duration(rand.Intn(100)) * time.Millisecond)
lp.deadLockCounter.Add(ctx, 1, metric.WithAttributes(
attribute.String("operation", lp.operation),
))
continue
// A log with the IK could have been inserted in the meantime, read again the database to retrieve it
case errors.Is(err, ErrIdempotencyKeyConflict{}):
output, err := fetchLogWithIK[INPUT, OUTPUT](ctx, store, parameters)
output, err := lp.fetchLogWithIK(ctx, store, parameters)
if err != nil {
return nil, err
}
Expand All @@ -82,7 +98,7 @@ func forgeLog[INPUT any, OUTPUT ledger.LogPayload](ctx context.Context, store St
}
}

func fetchLogWithIK[INPUT any, OUTPUT ledger.LogPayload](ctx context.Context, store Store, parameters Parameters[INPUT]) (*OUTPUT, error) {
func (lp *logProcessor[INPUT, OUTPUT]) fetchLogWithIK(ctx context.Context, store Store, parameters Parameters[INPUT]) (*OUTPUT, error) {
log, err := store.ReadLogWithIdempotencyKey(ctx, parameters.IdempotencyKey)
if err != nil && !errors.Is(err, postgres.ErrNotFound) {
return nil, err
Expand Down
7 changes: 5 additions & 2 deletions internal/controller/ledger/log_process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/formancehq/go-libs/platform/postgres"
ledger "github.com/formancehq/ledger/internal"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/metric/noop"
"go.uber.org/mock/gomock"
"testing"
)
Expand All @@ -30,7 +31,8 @@ func TestForgeLogWithIKConflict(t *testing.T) {
Data: ledger.CreatedTransaction{},
}, nil)

_, err := forgeLog[RunScript, ledger.CreatedTransaction](ctx, store, Parameters[RunScript]{
lp := newLogProcessor[RunScript, ledger.CreatedTransaction]("foo", noop.Int64Counter{})
_, err := lp.forgeLog(ctx, store, Parameters[RunScript]{
IdempotencyKey: "foo",
}, nil)
require.NoError(t, err)
Expand All @@ -53,6 +55,7 @@ func TestForgeLogWithDeadlock(t *testing.T) {
WithTX(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil)

_, err := forgeLog[RunScript, ledger.CreatedTransaction](ctx, store, Parameters[RunScript]{}, nil)
lp := newLogProcessor[RunScript, ledger.CreatedTransaction]("foo", noop.Int64Counter{})
_, err := lp.forgeLog(ctx, store, Parameters[RunScript]{}, nil)
require.NoError(t, err)
}
3 changes: 2 additions & 1 deletion test/performance/env_testserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ func (f *TestServerEnvFactory) Create(ctx context.Context, b *testing.B, ledger
Output: output,
OTLPConfig: &testserver.OTLPConfig{
Metrics: &otlpmetrics.ModuleConfig{
Exporter: "memory",
Exporter: "memory",
RuntimeMetrics: true,
},
},
})
Expand Down

0 comments on commit cf1e25e

Please sign in to comment.