Skip to content

Commit

Permalink
feat: add IdempotencyHash on Log model
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 16, 2024
1 parent 635b3c9 commit ebfc7bd
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 13 deletions.
25 changes: 21 additions & 4 deletions internal/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,18 @@ func LogTypeFromString(logType string) LogType {
}

// Log represents atomic actions made on the ledger.
// notes(gfyrag): Keep keys ordered! the order matter when hashing the log.
type Log struct {
bun.BaseModel `bun:"table:logs,alias:logs"`

Type LogType `json:"type" bun:"type,type:log_type"`
Data any `json:"data" bun:"data,type:jsonb"`
Date time.Time `json:"date" bun:"date,type:timestamptz"`
IdempotencyKey string `json:"idempotencyKey" bun:"idempotency_key,type:varchar(256),unique,nullzero"`
ID int `json:"id" bun:"id,unique,type:numeric"`
Hash []byte `json:"hash" bun:"hash,type:bytea,scanonly"`
// IdempotencyHash is a signature used when using IdempotencyKey.
// It allows to check if the usage of IdempotencyKey match inputs given on the first idempotency key usage.
IdempotencyHash string `json:"idempotencyHash" bun:"idempotency_hash,unique,nullzero"`
ID int `json:"id" bun:"id,unique,type:numeric"`
Hash []byte `json:"hash" bun:"hash,type:bytea,scanonly"`
}

func (l Log) WithIdempotencyKey(key string) Log {
Expand Down Expand Up @@ -138,7 +140,22 @@ func (l *Log) ComputeHash(previous *Log) {
}
}

if err := enc.Encode(l); err != nil {
if err := enc.Encode(struct {
// notes(gfyrag): Keep keys ordered! the order matter when hashing the log.
Type LogType `json:"type"`
Data any `json:"data"`
Date time.Time `json:"date"`
IdempotencyKey string `json:"idempotencyKey"`
ID int `json:"id"`
Hash []byte `json:"hash"`
}{
Type: l.Type,
Data: l.Data,
Date: l.Date,
IdempotencyKey: l.IdempotencyKey,
ID: l.ID,
Hash: l.Hash,
}); err != nil {
panic(err)
}

Expand Down
3 changes: 3 additions & 0 deletions internal/storage/bucket/migrations/11-stateless.sql
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ alter table "{{.Bucket}}".logs
alter column data
type json;

alter table "{{.Bucket}}".logs
add column idempotency_hash bytea;

create unique index accounts_metadata_ledger on "{{.Bucket}}".accounts_metadata (ledger, accounts_address, revision);
create index accounts_metadata_revisions on "{{.Bucket}}".accounts_metadata(accounts_address asc, revision desc) include (metadata, date);

Expand Down
23 changes: 14 additions & 9 deletions internal/storage/ledger/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ package ledger_test
import (
"context"
"database/sql"
"golang.org/x/sync/errgroup"
"math/big"
"testing"

"github.com/alitto/pond"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
"github.com/pkg/errors"

Expand Down Expand Up @@ -80,24 +80,29 @@ func TestInsertLog(t *testing.T) {
})

t.Run("hash consistency over high concurrency", func(t *testing.T) {
wp := pond.New(10, 10)
const countLogs = 100
errGroup, _ := errgroup.WithContext(ctx)
const countLogs = 50
for range countLogs {
wp.Submit(func() {
errGroup.Go(func() error {
tx, err := store.GetDB().BeginTx(ctx, &sql.TxOptions{})
require.NoError(t, err)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
store := store.WithDB(tx)

logTx := ledger.NewTransactionLog(ledger.NewTransaction(), map[string]metadata.Metadata{})
err = store.InsertLog(ctx, &logTx)
require.NoError(t, err)
require.NoError(t, tx.Commit())
if err != nil {
return err
}
return tx.Commit()
})
}
wp.StopAndWait()
err := errGroup.Wait()
require.NoError(t, err)

logs, err := store.ListLogs(ctx, ledgercontroller.NewListLogsQuery(ledgercontroller.PaginatedQueryOptions[any]{
PageSize: countLogs,
Expand Down Expand Up @@ -179,4 +184,4 @@ func TestGetLogs(t *testing.T) {
// Should get only the second log, as StartTime is inclusive and EndTime exclusive.
require.Len(t, cursor.Data, 1)
require.EqualValues(t, 2, cursor.Data[0].ID)
}
}

0 comments on commit ebfc7bd

Please sign in to comment.