Skip to content

Commit

Permalink
feat: compute hash at core level
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 16, 2024
1 parent 0228fef commit d084b92
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 163 deletions.
4 changes: 2 additions & 2 deletions internal/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type Log struct {
// It allows to check if the usage of IdempotencyKey match inputs given on the first idempotency key usage.
IdempotencyHash string `json:"idempotencyHash" bun:"idempotency_hash,unique,nullzero"`
ID int `json:"id" bun:"id,unique,type:numeric"`
Hash []byte `json:"hash" bun:"hash,type:bytea,scanonly"`
Hash []byte `json:"hash" bun:"hash,type:bytea"`
}

func (l Log) WithIdempotencyKey(key string) Log {
Expand Down Expand Up @@ -367,4 +367,4 @@ func ComputeIdempotencyHash(inputs any) string {
}

return base64.URLEncoding.EncodeToString(digest.Sum(nil))
}
}
41 changes: 0 additions & 41 deletions internal/storage/bucket/migrations/11-stateless.sql
Original file line number Diff line number Diff line change
Expand Up @@ -231,47 +231,6 @@ begin
end;
$$;

create function "{{.Bucket}}".set_log_hash()
returns trigger
security definer
language plpgsql
as
$$
declare
previousHash bytea;
marshalledAsJSON varchar;
begin
select hash into previousHash
from "{{.Bucket}}".logs
where ledger = new.ledger
order by seq desc
limit 1;

-- select only fields participating in the hash on the backend and format json representation the same way
select public.json_compact(json_build_object(
'type', new.type,
'data', new.data,
'date', to_json(new.date::timestamp)#>>'{}' || 'Z',
'idempotencyKey', coalesce(new.idempotency_key, ''),
'id', 0,
'hash', null
)) into marshalledAsJSON;

new.hash = (
select public.digest(
case
when previousHash is null
then marshalledAsJSON::bytea
else '"' || encode(previousHash::bytea, 'base64')::bytea || E'"\n' || convert_to(marshalledAsJSON, 'LATIN1')::bytea
end || E'\n', 'sha256'::text
)
);

return new;
end;
$$;


create or replace function "{{.Bucket}}".update_transaction_metadata_history() returns trigger
security definer
language plpgsql
Expand Down
73 changes: 0 additions & 73 deletions internal/storage/driver/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,6 @@ func GetMigrator() *migrations.Migrator {
return err
},
},
migrations.Migration{
Name: "Add json_compact pg func",
UpWithContext: func(ctx context.Context, tx bun.Tx) error {
_, err := tx.ExecContext(ctx, jsonCompactPGFunc)
return err
},
},
migrations.Migration{
Name: "Rename ledger column to name",
UpWithContext: func(ctx context.Context, tx bun.Tx) error {
Expand Down Expand Up @@ -163,72 +156,6 @@ func Migrate(ctx context.Context, db bun.IDB) error {
return GetMigrator().Up(ctx, db)
}

// Postgres is able to compact json natively.
// But we need to have the same serialized format as go, including whitespaces, to be able to hash logs
// on the core AND the db with the same results.
// notes(gfyrag): this code has been "stolen" from a postgres wiki or something else,
// I didn't try to understand it, it just works as expected.
const jsonCompactPGFunc = `
CREATE EXTENSION IF NOT EXISTS "pgcrypto" SCHEMA public;
CREATE OR REPLACE FUNCTION public.json_compact(p_json JSON, p_step INTEGER DEFAULT 0)
RETURNS JSON
AS $$
DECLARE
v_type TEXT;
v_text TEXT := '';
v_indent INTEGER;
v_key TEXT;
v_object JSON;
v_count INTEGER;
BEGIN
p_step := coalesce(p_step, 0);
-- Object or array?
v_type := json_typeof(p_json);
IF v_type = 'object' THEN
-- Start object
v_text := '{';
SELECT count(*) - 1 INTO v_count
FROM json_object_keys(p_json);
-- go through keys, add them and recurse over value
FOR v_key IN (SELECT json_object_keys(p_json))
LOOP
v_text := v_text || to_json(v_key)::TEXT || ':' || public.json_compact(p_json->v_key, p_step + 1);
IF v_count > 0 THEN
v_text := v_text || ',';
v_count := v_count - 1;
END IF;
--v_text := v_text || E'\n';
END LOOP;
-- Close object
v_text := v_text || '}';
ELSIF v_type = 'array' THEN
-- Start array
v_text := '[';
v_count := json_array_length(p_json) - 1;
-- go through elements and add them through recursion
FOR v_object IN (SELECT json_array_elements(p_json))
LOOP
v_text := v_text || public.json_compact(v_object, p_step + 1);
IF v_count > 0 THEN
v_text := v_text || ',';
v_count := v_count - 1;
END IF;
--v_text := v_text || E'\n';
END LOOP;
-- Close array
v_text := v_text || ']';
ELSE -- A simple value
v_text := v_text || p_json::TEXT;
END IF;
IF p_step > 0 THEN RETURN v_text;
ELSE RETURN v_text::JSON;
END IF;
END;
$$ LANGUAGE plpgsql;
`

const jsonbMerge = `
create or replace function public.jsonb_concat(a jsonb, b jsonb) returns jsonb
as 'select $1 || $2'
Expand Down
21 changes: 18 additions & 3 deletions internal/storage/ledger/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package ledger

import (
"context"
"database/sql"
"database/sql/driver"
"encoding/json"
"fmt"

"github.com/formancehq/ledger/internal/tracing"

"github.com/formancehq/go-libs/bun/bunpaginate"
Expand Down Expand Up @@ -51,12 +51,27 @@ func (s *Store) InsertLog(ctx context.Context, log *ledger.Log) error {
if s.ledger.HasFeature(ledger.FeatureHashLogs, "SYNC") {
_, err := s.db.NewRaw(`select pg_advisory_xact_lock(hashtext(?))`, s.ledger.Name).Exec(ctx)
if err != nil {
return postgres.ResolveError(err)
return err
}
lastLog := &ledger.Log{}
err = s.db.NewSelect().
Model(lastLog).
ModelTableExpr(s.GetPrefixedRelationName("logs")).
Order("seq desc").
Where("ledger = ?", s.ledger.Name).
Limit(1).
Scan(ctx)
if err != nil {
if !errors.Is(err, sql.ErrNoRows) {
return errors.Wrap(err, "retrieving last log")
}
log.ComputeHash(nil)
} else {
log.ComputeHash(lastLog)
}
}

_, err := tracing.TraceWithLatency(ctx, "InsertLog", tracing.NoResult(func(ctx context.Context) error {

data, err := json.Marshal(log.Data)
if err != nil {
return errors.Wrap(err, "failed to marshal log data")
Expand Down
31 changes: 0 additions & 31 deletions internal/storage/ledger/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,43 +23,12 @@ import (
"github.com/stretchr/testify/require"
)

// todo: add log hash test with ledger v2

func TestInsertLog(t *testing.T) {
t.Parallel()

store := newLedgerStore(t)
ctx := logging.TestingContext()

t.Run("check hash against core", func(t *testing.T) {
// Insert a first tx (we don't have any previous hash to use at this moment)
log1 := ledger.NewTransactionLog(ledger.NewTransaction(), map[string]metadata.Metadata{})
log1Copy := log1

err := store.InsertLog(ctx, &log1)
require.NoError(t, err)

require.Equal(t, 1, log1.ID)
require.NotZero(t, log1.Hash)

// Ensure than the database hashing is the same as the go hashing
chainedLog1 := log1Copy.ChainLog(nil)
require.Equal(t, chainedLog1.Hash, log1.Hash)

// Insert a new log to test the hash when a previous hash exists
// We also addi an idempotency key to check for conflicts
log2 := ledger.NewTransactionLog(ledger.NewTransaction(), map[string]metadata.Metadata{})
log2Copy := log2
err = store.InsertLog(ctx, &log2)
require.NoError(t, err)
require.Equal(t, 2, log2.ID)
require.NotZero(t, log2.Hash)

// Ensure than the database hashing is the same as the go hashing
chainedLog2 := log2Copy.ChainLog(&log1)
require.Equal(t, chainedLog2.Hash, log2.Hash)
})

t.Run("duplicate IK", func(t *testing.T) {
// Insert a first tx (we don't have any previous hash to use at this moment)
logTx := ledger.NewTransactionLog(ledger.NewTransaction(), map[string]metadata.Metadata{}).
Expand Down
13 changes: 0 additions & 13 deletions internal/storage/ledger/migrations/0-add-sequences.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,6 @@ when (
execute procedure "{{.Bucket}}".update_effective_volumes();
{{ end }}

-- logs hash

{{ if .HasFeature "HASH_LOGS" "SYNC" }}
create trigger "set_log_hash_{{.ID}}"
before insert
on "{{.Bucket}}"."logs"
for each row
when (
new.ledger = '{{.Name}}'
)
execute procedure "{{.Bucket}}".set_log_hash();
{{ end }}

{{ if .HasFeature "ACCOUNT_METADATA_HISTORY" "SYNC" }}
create trigger "update_account_metadata_history_{{.ID}}"
after update
Expand Down

0 comments on commit d084b92

Please sign in to comment.