diff --git a/TODO b/TODO index 54266bdd6..585f8e373 100644 --- a/TODO +++ b/TODO @@ -1,3 +1,12 @@ doc: * technical endpoints - ** some example around pprof? \ No newline at end of file + ** some example around pprof? + +* graph metrics: + ** process.runtime.go.mem.heap_alloc (Bytes of allocated heap objects) + ** process.runtime.go.mem.heap_objects (Number of allocated heap objects) + ** process.runtime.go.gc.count (Number of completed garbage collection cycles) + ** process.runtime.go.gc.pause_total_ns (Cumulative nanoseconds in GC stop-the-world pauses since the program started) + ** system.network.io (Bytes transferred attributed by direction (Transmit, Receive)) + ** system.memory.utilization (Memory utilization of this process attributed by memory state (Used, Available)) + ** process.cpu.time (Accumulated CPU time spent by this process attributed by state (User, System, ...)) \ No newline at end of file diff --git a/internal/README.md b/internal/README.md index e71f34dc7..cf66d7449 100644 --- a/internal/README.md +++ b/internal/README.md @@ -702,8 +702,8 @@ type Move struct { Account string `bun:"accounts_address,type:varchar"` Amount *bunpaginate.BigInt `bun:"amount,type:numeric"` Asset string `bun:"asset,type:varchar"` - InsertionDate time.Time `bun:"insertion_date,type:timestamp"` - EffectiveDate time.Time `bun:"effective_date,type:timestamp"` + InsertionDate time.Time `bun:"insertion_date,type:timestamp,nullzero"` + EffectiveDate time.Time `bun:"effective_date,type:timestamp,nullzero"` PostCommitVolumes *Volumes `bun:"post_commit_volumes,type:jsonb"` PostCommitEffectiveVolumes *Volumes `bun:"post_commit_effective_volumes,type:jsonb,scanonly"` } diff --git a/internal/controller/ledger/controller_default.go b/internal/controller/ledger/controller_default.go index 56d07c306..a723af15b 100644 --- a/internal/controller/ledger/controller_default.go +++ b/internal/controller/ledger/controller_default.go @@ -35,19 +35,28 @@ type DefaultController struct { ledger ledger.Ledger tracer trace.Tracer - meter metric.Meter + meter metric.Meter + executeMachineHistogram metric.Int64Histogram + deadLockCounter metric.Int64Counter + + createTransactionLp *logProcessor[RunScript, ledger.CreatedTransaction] + revertTransactionLp *logProcessor[RevertTransaction, ledger.RevertedTransaction] + saveTransactionMetadataLp *logProcessor[SaveTransactionMetadata, ledger.SavedMetadata] + saveAccountMetadataLp *logProcessor[SaveAccountMetadata, ledger.SavedMetadata] + deleteTransactionMetadata *logProcessor[DeleteTransactionMetadata, ledger.DeletedMetadata] + deleteAccountMetadata *logProcessor[DeleteAccountMetadata, ledger.DeletedMetadata] } func NewDefaultController( - ledger ledger.Ledger, + l ledger.Ledger, store Store, machineFactory MachineFactory, opts ...DefaultControllerOption, ) *DefaultController { ret := &DefaultController{ store: store, - ledger: ledger, + ledger: l, machineFactory: machineFactory, } @@ -55,12 +64,22 @@ func NewDefaultController( opt(ret) } - histogram, err := ret.meter.Int64Histogram("numscript.run") + var err error + ret.executeMachineHistogram, err = ret.meter.Int64Histogram("numscript.run") + if err != nil { + panic(err) + } + ret.deadLockCounter, err = ret.meter.Int64Counter("deadlocks") if err != nil { - return nil + panic(err) } - ret.executeMachineHistogram = histogram + ret.createTransactionLp = newLogProcessor[RunScript, ledger.CreatedTransaction]("CreateTransaction", ret.deadLockCounter) + ret.revertTransactionLp = newLogProcessor[RevertTransaction, ledger.RevertedTransaction]("RevertTransaction", ret.deadLockCounter) + ret.saveTransactionMetadataLp = newLogProcessor[SaveTransactionMetadata, ledger.SavedMetadata]("SaveTransactionMetadata", ret.deadLockCounter) + ret.saveAccountMetadataLp = newLogProcessor[SaveAccountMetadata, ledger.SavedMetadata]("SaveAccountMetadata", ret.deadLockCounter) + ret.deleteTransactionMetadata = newLogProcessor[DeleteTransactionMetadata, ledger.DeletedMetadata]("DeleteTransactionMetadata", ret.deadLockCounter) + ret.deleteAccountMetadata = newLogProcessor[DeleteAccountMetadata, ledger.DeletedMetadata]("DeleteAccountMetadata", ret.deadLockCounter) return ret } @@ -236,7 +255,7 @@ func (ctrl *DefaultController) CreateTransaction(ctx context.Context, parameters return nil, fmt.Errorf("failed to compile script: %w", err) } - output, err := forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input RunScript) (*ledger.CreatedTransaction, error) { + output, err := ctrl.createTransactionLp.forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input RunScript) (*ledger.CreatedTransaction, error) { result, err := tracing.TraceWithMetric( ctx, "ExecuteMachine", @@ -294,7 +313,7 @@ func (ctrl *DefaultController) CreateTransaction(ctx context.Context, parameters } func (ctrl *DefaultController) RevertTransaction(ctx context.Context, parameters Parameters[RevertTransaction]) (*ledger.RevertedTransaction, error) { - return forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input RevertTransaction) (*ledger.RevertedTransaction, error) { + return ctrl.revertTransactionLp.forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input RevertTransaction) (*ledger.RevertedTransaction, error) { var ( hasBeenReverted bool @@ -359,7 +378,7 @@ func (ctrl *DefaultController) RevertTransaction(ctx context.Context, parameters } func (ctrl *DefaultController) SaveTransactionMetadata(ctx context.Context, parameters Parameters[SaveTransactionMetadata]) error { - _, err := forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input SaveTransactionMetadata) (*ledger.SavedMetadata, error) { + _, err := ctrl.saveTransactionMetadataLp.forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input SaveTransactionMetadata) (*ledger.SavedMetadata, error) { if _, _, err := sqlTX.UpdateTransactionMetadata(ctx, input.TransactionID, input.Metadata); err != nil { return nil, err } @@ -374,7 +393,7 @@ func (ctrl *DefaultController) SaveTransactionMetadata(ctx context.Context, para } func (ctrl *DefaultController) SaveAccountMetadata(ctx context.Context, parameters Parameters[SaveAccountMetadata]) error { - _, err := forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input SaveAccountMetadata) (*ledger.SavedMetadata, error) { + _, err := ctrl.saveAccountMetadataLp.forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input SaveAccountMetadata) (*ledger.SavedMetadata, error) { if _, err := sqlTX.UpsertAccount(ctx, &ledger.Account{ Address: input.Address, Metadata: input.Metadata, @@ -393,7 +412,7 @@ func (ctrl *DefaultController) SaveAccountMetadata(ctx context.Context, paramete } func (ctrl *DefaultController) DeleteTransactionMetadata(ctx context.Context, parameters Parameters[DeleteTransactionMetadata]) error { - _, err := forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input DeleteTransactionMetadata) (*ledger.DeletedMetadata, error) { + _, err := ctrl.deleteTransactionMetadata.forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input DeleteTransactionMetadata) (*ledger.DeletedMetadata, error) { _, modified, err := sqlTX.DeleteTransactionMetadata(ctx, input.TransactionID, input.Key) if err != nil { return nil, err @@ -414,7 +433,7 @@ func (ctrl *DefaultController) DeleteTransactionMetadata(ctx context.Context, pa } func (ctrl *DefaultController) DeleteAccountMetadata(ctx context.Context, parameters Parameters[DeleteAccountMetadata]) error { - _, err := forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input DeleteAccountMetadata) (*ledger.DeletedMetadata, error) { + _, err := ctrl.deleteAccountMetadata.forgeLog(ctx, ctrl.store, parameters, func(ctx context.Context, sqlTX TX, input DeleteAccountMetadata) (*ledger.DeletedMetadata, error) { err := sqlTX.DeleteAccountMetadata(ctx, input.Address, input.Key) if err != nil { return nil, err @@ -447,4 +466,4 @@ func WithTracer(tracer trace.Tracer) DefaultControllerOption { return func(controller *DefaultController) { controller.tracer = tracer } -} \ No newline at end of file +} diff --git a/internal/controller/ledger/log_process.go b/internal/controller/ledger/log_process.go index dd2b0f4cf..5b164f00a 100644 --- a/internal/controller/ledger/log_process.go +++ b/internal/controller/ledger/log_process.go @@ -9,12 +9,23 @@ import ( "github.com/formancehq/go-libs/pointer" ledger "github.com/formancehq/ledger/internal" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" - "math/rand" - "time" ) -func runTx[INPUT any, OUTPUT ledger.LogPayload](ctx context.Context, store Store, parameters Parameters[INPUT], fn func(ctx context.Context, sqlTX TX, input INPUT) (*OUTPUT, error)) (*OUTPUT, error) { +type logProcessor[INPUT any, OUTPUT ledger.LogPayload] struct { + deadLockCounter metric.Int64Counter + operation string +} + +func newLogProcessor[INPUT any, OUTPUT ledger.LogPayload](operation string, deadlockCounter metric.Int64Counter) *logProcessor[INPUT, OUTPUT] { + return &logProcessor[INPUT, OUTPUT]{ + operation: operation, + deadLockCounter: deadlockCounter, + } +} + +func (lp *logProcessor[INPUT, OUTPUT]) runTx(ctx context.Context, store Store, parameters Parameters[INPUT], fn func(ctx context.Context, sqlTX TX, input INPUT) (*OUTPUT, error)) (*OUTPUT, error) { var payload *OUTPUT err := store.WithTX(ctx, nil, func(tx TX) (commit bool, err error) { payload, err = fn(ctx, tx, parameters.Input) @@ -40,10 +51,14 @@ func runTx[INPUT any, OUTPUT ledger.LogPayload](ctx context.Context, store Store return payload, err } -// todo: metrics, add deadlocks -func forgeLog[INPUT any, OUTPUT ledger.LogPayload](ctx context.Context, store Store, parameters Parameters[INPUT], fn func(ctx context.Context, sqlTX TX, input INPUT) (*OUTPUT, error)) (*OUTPUT, error) { +func (lp *logProcessor[INPUT, OUTPUT]) forgeLog( + ctx context.Context, + store Store, + parameters Parameters[INPUT], + fn func(ctx context.Context, sqlTX TX, input INPUT) (*OUTPUT, error), +) (*OUTPUT, error) { if parameters.IdempotencyKey != "" { - output, err := fetchLogWithIK[INPUT, OUTPUT](ctx, store, parameters) + output, err := lp.fetchLogWithIK(ctx, store, parameters) if err != nil { return nil, err } @@ -53,18 +68,19 @@ func forgeLog[INPUT any, OUTPUT ledger.LogPayload](ctx context.Context, store St } for { - output, err := runTx(ctx, store, parameters, fn) + output, err := lp.runTx(ctx, store, parameters, fn) if err != nil { switch { case errors.Is(err, postgres.ErrDeadlockDetected): trace.SpanFromContext(ctx).SetAttributes(attribute.Bool("deadlock", true)) logging.FromContext(ctx).Info("deadlock detected, retrying...") - // todo: keep ? / set configurable? - <-time.After(time.Duration(rand.Intn(100)) * time.Millisecond) + lp.deadLockCounter.Add(ctx, 1, metric.WithAttributes( + attribute.String("operation", lp.operation), + )) continue // A log with the IK could have been inserted in the meantime, read again the database to retrieve it case errors.Is(err, ErrIdempotencyKeyConflict{}): - output, err := fetchLogWithIK[INPUT, OUTPUT](ctx, store, parameters) + output, err := lp.fetchLogWithIK(ctx, store, parameters) if err != nil { return nil, err } @@ -82,7 +98,7 @@ func forgeLog[INPUT any, OUTPUT ledger.LogPayload](ctx context.Context, store St } } -func fetchLogWithIK[INPUT any, OUTPUT ledger.LogPayload](ctx context.Context, store Store, parameters Parameters[INPUT]) (*OUTPUT, error) { +func (lp *logProcessor[INPUT, OUTPUT]) fetchLogWithIK(ctx context.Context, store Store, parameters Parameters[INPUT]) (*OUTPUT, error) { log, err := store.ReadLogWithIdempotencyKey(ctx, parameters.IdempotencyKey) if err != nil && !errors.Is(err, postgres.ErrNotFound) { return nil, err diff --git a/internal/controller/ledger/log_process_test.go b/internal/controller/ledger/log_process_test.go index 60272f97b..4157b96f5 100644 --- a/internal/controller/ledger/log_process_test.go +++ b/internal/controller/ledger/log_process_test.go @@ -5,6 +5,7 @@ import ( "github.com/formancehq/go-libs/platform/postgres" ledger "github.com/formancehq/ledger/internal" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/metric/noop" "go.uber.org/mock/gomock" "testing" ) @@ -30,7 +31,8 @@ func TestForgeLogWithIKConflict(t *testing.T) { Data: ledger.CreatedTransaction{}, }, nil) - _, err := forgeLog[RunScript, ledger.CreatedTransaction](ctx, store, Parameters[RunScript]{ + lp := newLogProcessor[RunScript, ledger.CreatedTransaction]("foo", noop.Int64Counter{}) + _, err := lp.forgeLog(ctx, store, Parameters[RunScript]{ IdempotencyKey: "foo", }, nil) require.NoError(t, err) @@ -53,6 +55,7 @@ func TestForgeLogWithDeadlock(t *testing.T) { WithTX(gomock.Any(), gomock.Any(), gomock.Any()). Return(nil) - _, err := forgeLog[RunScript, ledger.CreatedTransaction](ctx, store, Parameters[RunScript]{}, nil) + lp := newLogProcessor[RunScript, ledger.CreatedTransaction]("foo", noop.Int64Counter{}) + _, err := lp.forgeLog(ctx, store, Parameters[RunScript]{}, nil) require.NoError(t, err) } diff --git a/test/performance/env_testserver_test.go b/test/performance/env_testserver_test.go index b1d447dea..ed6d828e9 100644 --- a/test/performance/env_testserver_test.go +++ b/test/performance/env_testserver_test.go @@ -64,7 +64,8 @@ func (f *TestServerEnvFactory) Create(ctx context.Context, b *testing.B, ledger Output: output, OTLPConfig: &testserver.OTLPConfig{ Metrics: &otlpmetrics.ModuleConfig{ - Exporter: "memory", + Exporter: "memory", + RuntimeMetrics: true, }, }, })