diff --git a/internal/api/router.go b/internal/api/router.go index 7e08bbfd4..439fbe623 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -17,6 +17,7 @@ import ( "github.com/go-chi/chi/v5" ) +// todo: refine textual errors func NewRouter( systemController system.Controller, healthController *health.HealthController, diff --git a/internal/controller/ledger/log_process.go b/internal/controller/ledger/log_process.go index 78e0ead70..03c35c7e2 100644 --- a/internal/controller/ledger/log_process.go +++ b/internal/controller/ledger/log_process.go @@ -9,6 +9,8 @@ import ( "github.com/formancehq/go-libs/pointer" ledger "github.com/formancehq/ledger/internal" "github.com/formancehq/ledger/internal/tracing" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) func runTx[INPUT any, OUTPUT ledger.LogPayload](ctx context.Context, store Store, parameters Parameters[INPUT], fn func(ctx context.Context, sqlTX TX, input INPUT) (*OUTPUT, error)) (*OUTPUT, error) { @@ -41,26 +43,19 @@ func runTx[INPUT any, OUTPUT ledger.LogPayload](ctx context.Context, store Store return payload, err } +// todo: metrics, add deadlocks // todo: handle too many clients error // notes(gfyrag): how? // By retrying? Is the server already overloaded? Add a limit on the retries number? // Ask the client to retry later? func forgeLog[INPUT any, OUTPUT ledger.LogPayload](ctx context.Context, store Store, parameters Parameters[INPUT], fn func(ctx context.Context, sqlTX TX, input INPUT) (*OUTPUT, error)) (*OUTPUT, error) { if parameters.IdempotencyKey != "" { - log, err := store.ReadLogWithIdempotencyKey(ctx, parameters.IdempotencyKey) - if err != nil && !errors.Is(err, postgres.ErrNotFound) { + output, err := fetchLogWithIK[INPUT, OUTPUT](ctx, store, parameters) + if err != nil { return nil, err } - if err == nil { - // notes(gfyrag): idempotency hash should never be empty in this case, but data from previous - // ledger version does not have this field and it cannot be recomputed - if log.IdempotencyHash != "" { - if computedHash := ledger.ComputeIdempotencyHash(parameters.Input); log.IdempotencyHash != computedHash { - return nil, newErrInvalidIdempotencyInputs(log.IdempotencyKey, log.IdempotencyHash, computedHash) - } - } - - return pointer.For(log.Data.(OUTPUT)), nil + if output != nil { + return output, nil } } @@ -69,20 +64,22 @@ func forgeLog[INPUT any, OUTPUT ledger.LogPayload](ctx context.Context, store St if err != nil { switch { case errors.Is(err, postgres.ErrDeadlockDetected): + trace.SpanFromContext(ctx).SetAttributes(attribute.Bool("deadlock", true)) logging.FromContext(ctx).Info("deadlock detected, retrying...") continue // A log with the IK could have been inserted in the meantime, read again the database to retrieve it case errors.Is(err, ErrIdempotencyKeyConflict{}): - log, err := store.ReadLogWithIdempotencyKey(ctx, parameters.IdempotencyKey) - if err != nil && !errors.Is(err, postgres.ErrNotFound) { + output, err := fetchLogWithIK[INPUT, OUTPUT](ctx, store, parameters) + if err != nil { return nil, err } - if errors.Is(err, postgres.ErrNotFound) { - logging.FromContext(ctx).Errorf("incoherent error, received duplicate IK but log not found in database") + if output == nil { + err = errors.New("incoherent error, received duplicate IK but log not found in database") + trace.SpanFromContext(ctx).RecordError(err) return nil, err } - return pointer.For(log.Data.(OUTPUT)), nil + return output, nil default: return nil, fmt.Errorf("unexpected error while forging log: %w", err) } @@ -91,3 +88,22 @@ func forgeLog[INPUT any, OUTPUT ledger.LogPayload](ctx context.Context, store St return output, nil } } + +func fetchLogWithIK[INPUT any, OUTPUT ledger.LogPayload](ctx context.Context, store Store, parameters Parameters[INPUT]) (*OUTPUT, error) { + log, err := store.ReadLogWithIdempotencyKey(ctx, parameters.IdempotencyKey) + if err != nil && !errors.Is(err, postgres.ErrNotFound) { + return nil, err + } + if err == nil { + // notes(gfyrag): idempotency hash should never be empty in this case, but data from previous + // ledger version does not have this field and it cannot be recomputed + if log.IdempotencyHash != "" { + if computedHash := ledger.ComputeIdempotencyHash(parameters.Input); log.IdempotencyHash != computedHash { + return nil, newErrInvalidIdempotencyInputs(log.IdempotencyKey, log.IdempotencyHash, computedHash) + } + } + + return pointer.For(log.Data.(OUTPUT)), nil + } + return nil, nil +} \ No newline at end of file