Skip to content

Commit

Permalink
feat: factorize some code
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 23, 2024
1 parent fe480fc commit be4eec4
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 17 deletions.
1 change: 1 addition & 0 deletions internal/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/go-chi/chi/v5"
)

// todo: refine textual errors
func NewRouter(
systemController system.Controller,
healthController *health.HealthController,
Expand Down
50 changes: 33 additions & 17 deletions internal/controller/ledger/log_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/formancehq/go-libs/pointer"
ledger "github.com/formancehq/ledger/internal"
"github.com/formancehq/ledger/internal/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

func runTx[INPUT any, OUTPUT ledger.LogPayload](ctx context.Context, store Store, parameters Parameters[INPUT], fn func(ctx context.Context, sqlTX TX, input INPUT) (*OUTPUT, error)) (*OUTPUT, error) {
Expand Down Expand Up @@ -41,26 +43,19 @@ func runTx[INPUT any, OUTPUT ledger.LogPayload](ctx context.Context, store Store
return payload, err
}

// todo: metrics, add deadlocks
// todo: handle too many clients error
// notes(gfyrag): how?
// By retrying? Is the server already overloaded? Add a limit on the retries number?
// Ask the client to retry later?
func forgeLog[INPUT any, OUTPUT ledger.LogPayload](ctx context.Context, store Store, parameters Parameters[INPUT], fn func(ctx context.Context, sqlTX TX, input INPUT) (*OUTPUT, error)) (*OUTPUT, error) {
if parameters.IdempotencyKey != "" {
log, err := store.ReadLogWithIdempotencyKey(ctx, parameters.IdempotencyKey)
if err != nil && !errors.Is(err, postgres.ErrNotFound) {
output, err := fetchLogWithIK[INPUT, OUTPUT](ctx, store, parameters)
if err != nil {
return nil, err
}
if err == nil {
// notes(gfyrag): idempotency hash should never be empty in this case, but data from previous
// ledger version does not have this field and it cannot be recomputed
if log.IdempotencyHash != "" {
if computedHash := ledger.ComputeIdempotencyHash(parameters.Input); log.IdempotencyHash != computedHash {
return nil, newErrInvalidIdempotencyInputs(log.IdempotencyKey, log.IdempotencyHash, computedHash)
}
}

return pointer.For(log.Data.(OUTPUT)), nil
if output != nil {
return output, nil
}
}

Expand All @@ -69,20 +64,22 @@ func forgeLog[INPUT any, OUTPUT ledger.LogPayload](ctx context.Context, store St
if err != nil {
switch {
case errors.Is(err, postgres.ErrDeadlockDetected):
trace.SpanFromContext(ctx).SetAttributes(attribute.Bool("deadlock", true))
logging.FromContext(ctx).Info("deadlock detected, retrying...")
continue
// A log with the IK could have been inserted in the meantime, read again the database to retrieve it
case errors.Is(err, ErrIdempotencyKeyConflict{}):
log, err := store.ReadLogWithIdempotencyKey(ctx, parameters.IdempotencyKey)
if err != nil && !errors.Is(err, postgres.ErrNotFound) {
output, err := fetchLogWithIK[INPUT, OUTPUT](ctx, store, parameters)
if err != nil {
return nil, err
}
if errors.Is(err, postgres.ErrNotFound) {
logging.FromContext(ctx).Errorf("incoherent error, received duplicate IK but log not found in database")
if output == nil {
err = errors.New("incoherent error, received duplicate IK but log not found in database")
trace.SpanFromContext(ctx).RecordError(err)
return nil, err
}

return pointer.For(log.Data.(OUTPUT)), nil
return output, nil
default:
return nil, fmt.Errorf("unexpected error while forging log: %w", err)
}
Expand All @@ -91,3 +88,22 @@ func forgeLog[INPUT any, OUTPUT ledger.LogPayload](ctx context.Context, store St
return output, nil
}
}

func fetchLogWithIK[INPUT any, OUTPUT ledger.LogPayload](ctx context.Context, store Store, parameters Parameters[INPUT]) (*OUTPUT, error) {
log, err := store.ReadLogWithIdempotencyKey(ctx, parameters.IdempotencyKey)
if err != nil && !errors.Is(err, postgres.ErrNotFound) {
return nil, err
}
if err == nil {
// notes(gfyrag): idempotency hash should never be empty in this case, but data from previous
// ledger version does not have this field and it cannot be recomputed
if log.IdempotencyHash != "" {
if computedHash := ledger.ComputeIdempotencyHash(parameters.Input); log.IdempotencyHash != computedHash {
return nil, newErrInvalidIdempotencyInputs(log.IdempotencyKey, log.IdempotencyHash, computedHash)
}
}

return pointer.For(log.Data.(OUTPUT)), nil
}
return nil, nil
}

0 comments on commit be4eec4

Please sign in to comment.