diff --git a/internal/log.go b/internal/log.go index 7a92ca035..653ffc798 100644 --- a/internal/log.go +++ b/internal/log.go @@ -92,7 +92,7 @@ type Log struct { // It allows to check if the usage of IdempotencyKey match inputs given on the first idempotency key usage. IdempotencyHash string `json:"idempotencyHash" bun:"idempotency_hash,unique,nullzero"` ID int `json:"id" bun:"id,unique,type:numeric"` - Hash []byte `json:"hash" bun:"hash,type:bytea,scanonly"` + Hash []byte `json:"hash" bun:"hash,type:bytea"` } func (l Log) WithIdempotencyKey(key string) Log { @@ -367,4 +367,4 @@ func ComputeIdempotencyHash(inputs any) string { } return base64.URLEncoding.EncodeToString(digest.Sum(nil)) -} \ No newline at end of file +} diff --git a/internal/storage/bucket/migrations/11-stateless.sql b/internal/storage/bucket/migrations/11-stateless.sql index 9cdc0c141..f83c75665 100644 --- a/internal/storage/bucket/migrations/11-stateless.sql +++ b/internal/storage/bucket/migrations/11-stateless.sql @@ -231,47 +231,6 @@ begin end; $$; -create function "{{.Bucket}}".set_log_hash() - returns trigger - security definer - language plpgsql -as -$$ -declare - previousHash bytea; - marshalledAsJSON varchar; -begin - select hash into previousHash - from "{{.Bucket}}".logs - where ledger = new.ledger - order by seq desc - limit 1; - - -- select only fields participating in the hash on the backend and format json representation the same way - select public.json_compact(json_build_object( - 'type', new.type, - 'data', new.data, - 'date', to_json(new.date::timestamp)#>>'{}' || 'Z', - 'idempotencyKey', coalesce(new.idempotency_key, ''), - 'id', 0, - 'hash', null - )) into marshalledAsJSON; - - new.hash = ( - select public.digest( - case - when previousHash is null - then marshalledAsJSON::bytea - else '"' || encode(previousHash::bytea, 'base64')::bytea || E'"\n' || convert_to(marshalledAsJSON, 'LATIN1')::bytea - end || E'\n', 'sha256'::text - ) - ); - - return new; -end; -$$; - - create or replace function "{{.Bucket}}".update_transaction_metadata_history() returns trigger security definer language plpgsql diff --git a/internal/storage/driver/migrations.go b/internal/storage/driver/migrations.go index 52d5fa705..957c175ef 100644 --- a/internal/storage/driver/migrations.go +++ b/internal/storage/driver/migrations.go @@ -99,13 +99,6 @@ func GetMigrator() *migrations.Migrator { return err }, }, - migrations.Migration{ - Name: "Add json_compact pg func", - UpWithContext: func(ctx context.Context, tx bun.Tx) error { - _, err := tx.ExecContext(ctx, jsonCompactPGFunc) - return err - }, - }, migrations.Migration{ Name: "Rename ledger column to name", UpWithContext: func(ctx context.Context, tx bun.Tx) error { @@ -163,72 +156,6 @@ func Migrate(ctx context.Context, db bun.IDB) error { return GetMigrator().Up(ctx, db) } -// Postgres is able to compact json natively. -// But we need to have the same serialized format as go, including whitespaces, to be able to hash logs -// on the core AND the db with the same results. -// notes(gfyrag): this code has been "stolen" from a postgres wiki or something else, -// I didn't try to understand it, it just works as expected. -const jsonCompactPGFunc = ` -CREATE EXTENSION IF NOT EXISTS "pgcrypto" SCHEMA public; - -CREATE OR REPLACE FUNCTION public.json_compact(p_json JSON, p_step INTEGER DEFAULT 0) -RETURNS JSON -AS $$ -DECLARE - v_type TEXT; - v_text TEXT := ''; - v_indent INTEGER; - v_key TEXT; - v_object JSON; - v_count INTEGER; -BEGIN - p_step := coalesce(p_step, 0); - -- Object or array? - v_type := json_typeof(p_json); - - IF v_type = 'object' THEN - -- Start object - v_text := '{'; - SELECT count(*) - 1 INTO v_count - FROM json_object_keys(p_json); - -- go through keys, add them and recurse over value - FOR v_key IN (SELECT json_object_keys(p_json)) - LOOP - v_text := v_text || to_json(v_key)::TEXT || ':' || public.json_compact(p_json->v_key, p_step + 1); - IF v_count > 0 THEN - v_text := v_text || ','; - v_count := v_count - 1; - END IF; - --v_text := v_text || E'\n'; - END LOOP; - -- Close object - v_text := v_text || '}'; - ELSIF v_type = 'array' THEN - -- Start array - v_text := '['; - v_count := json_array_length(p_json) - 1; - -- go through elements and add them through recursion - FOR v_object IN (SELECT json_array_elements(p_json)) - LOOP - v_text := v_text || public.json_compact(v_object, p_step + 1); - IF v_count > 0 THEN - v_text := v_text || ','; - v_count := v_count - 1; - END IF; - --v_text := v_text || E'\n'; - END LOOP; - -- Close array - v_text := v_text || ']'; - ELSE -- A simple value - v_text := v_text || p_json::TEXT; - END IF; - IF p_step > 0 THEN RETURN v_text; - ELSE RETURN v_text::JSON; - END IF; -END; -$$ LANGUAGE plpgsql; -` - const jsonbMerge = ` create or replace function public.jsonb_concat(a jsonb, b jsonb) returns jsonb as 'select $1 || $2' diff --git a/internal/storage/ledger/logs.go b/internal/storage/ledger/logs.go index 9c19be57f..298f0b706 100644 --- a/internal/storage/ledger/logs.go +++ b/internal/storage/ledger/logs.go @@ -2,10 +2,10 @@ package ledger import ( "context" + "database/sql" "database/sql/driver" "encoding/json" "fmt" - "github.com/formancehq/ledger/internal/tracing" "github.com/formancehq/go-libs/bun/bunpaginate" @@ -51,12 +51,27 @@ func (s *Store) InsertLog(ctx context.Context, log *ledger.Log) error { if s.ledger.HasFeature(ledger.FeatureHashLogs, "SYNC") { _, err := s.db.NewRaw(`select pg_advisory_xact_lock(hashtext(?))`, s.ledger.Name).Exec(ctx) if err != nil { - return postgres.ResolveError(err) + return err + } + lastLog := &ledger.Log{} + err = s.db.NewSelect(). + Model(lastLog). + ModelTableExpr(s.GetPrefixedRelationName("logs")). + Order("seq desc"). + Where("ledger = ?", s.ledger.Name). + Limit(1). + Scan(ctx) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + return errors.Wrap(err, "retrieving last log") + } + log.ComputeHash(nil) + } else { + log.ComputeHash(lastLog) } } _, err := tracing.TraceWithLatency(ctx, "InsertLog", tracing.NoResult(func(ctx context.Context) error { - data, err := json.Marshal(log.Data) if err != nil { return errors.Wrap(err, "failed to marshal log data") diff --git a/internal/storage/ledger/logs_test.go b/internal/storage/ledger/logs_test.go index 7b351a64f..eb8c7e5a7 100644 --- a/internal/storage/ledger/logs_test.go +++ b/internal/storage/ledger/logs_test.go @@ -23,43 +23,12 @@ import ( "github.com/stretchr/testify/require" ) -// todo: add log hash test with ledger v2 - func TestInsertLog(t *testing.T) { t.Parallel() store := newLedgerStore(t) ctx := logging.TestingContext() - t.Run("check hash against core", func(t *testing.T) { - // Insert a first tx (we don't have any previous hash to use at this moment) - log1 := ledger.NewTransactionLog(ledger.NewTransaction(), map[string]metadata.Metadata{}) - log1Copy := log1 - - err := store.InsertLog(ctx, &log1) - require.NoError(t, err) - - require.Equal(t, 1, log1.ID) - require.NotZero(t, log1.Hash) - - // Ensure than the database hashing is the same as the go hashing - chainedLog1 := log1Copy.ChainLog(nil) - require.Equal(t, chainedLog1.Hash, log1.Hash) - - // Insert a new log to test the hash when a previous hash exists - // We also addi an idempotency key to check for conflicts - log2 := ledger.NewTransactionLog(ledger.NewTransaction(), map[string]metadata.Metadata{}) - log2Copy := log2 - err = store.InsertLog(ctx, &log2) - require.NoError(t, err) - require.Equal(t, 2, log2.ID) - require.NotZero(t, log2.Hash) - - // Ensure than the database hashing is the same as the go hashing - chainedLog2 := log2Copy.ChainLog(&log1) - require.Equal(t, chainedLog2.Hash, log2.Hash) - }) - t.Run("duplicate IK", func(t *testing.T) { // Insert a first tx (we don't have any previous hash to use at this moment) logTx := ledger.NewTransactionLog(ledger.NewTransaction(), map[string]metadata.Metadata{}). diff --git a/internal/storage/ledger/migrations/0-add-sequences.sql b/internal/storage/ledger/migrations/0-add-sequences.sql index aa709231b..a4b16186f 100644 --- a/internal/storage/ledger/migrations/0-add-sequences.sql +++ b/internal/storage/ledger/migrations/0-add-sequences.sql @@ -41,19 +41,6 @@ when ( execute procedure "{{.Bucket}}".update_effective_volumes(); {{ end }} --- logs hash - -{{ if .HasFeature "HASH_LOGS" "SYNC" }} -create trigger "set_log_hash_{{.ID}}" -before insert -on "{{.Bucket}}"."logs" -for each row -when ( - new.ledger = '{{.Name}}' -) -execute procedure "{{.Bucket}}".set_log_hash(); -{{ end }} - {{ if .HasFeature "ACCOUNT_METADATA_HISTORY" "SYNC" }} create trigger "update_account_metadata_history_{{.ID}}" after update