Skip to content

Commit

Permalink
feat: move segments generation on a trigger
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 23, 2024
1 parent fe96683 commit d302f70
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 54 deletions.
15 changes: 15 additions & 0 deletions internal/storage/bucket/migrations/11-stateless.sql
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ type bigint;
alter table "{{.Bucket}}".transactions
drop column seq;

alter table "{{.Bucket}}".accounts
alter column address_array drop not null;

alter table "{{.Bucket}}".logs
alter column hash
drop not null;
Expand Down Expand Up @@ -394,4 +397,16 @@ begin

return new;
end
$$;

create or replace function "{{.Bucket}}".set_address_array_for_account() returns trigger
security definer
language plpgsql
as
$$
begin
new.address_array = to_json(string_to_array(new.address, ':'));

return new;
end
$$;
20 changes: 8 additions & 12 deletions internal/storage/ledger/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ import (
"context"
"database/sql"
"fmt"
"math/big"
"regexp"
"strings"

. "github.com/formancehq/go-libs/bun/bunpaginate"
"github.com/formancehq/ledger/internal/tracing"
"math/big"
"regexp"

"github.com/formancehq/go-libs/logging"
"github.com/formancehq/go-libs/metadata"
Expand Down Expand Up @@ -62,7 +60,6 @@ type Account struct {

Ledger string `bun:"ledger"`
Address string `bun:"address"`
AddressArray []string `bun:"address_array"`
Metadata metadata.Metadata `bun:"metadata,type:jsonb"`
InsertionDate time.Time `bun:"insertion_date"`
UpdatedAt time.Time `bun:"updated_at"`
Expand All @@ -74,11 +71,11 @@ type Account struct {

func (account Account) toCore() ledger.Account {
return ledger.Account{
Address: account.Address,
Metadata: account.Metadata,
FirstUsage: account.FirstUsage,
InsertionDate: account.InsertionDate,
UpdatedAt: account.UpdatedAt,
Address: account.Address,
Metadata: account.Metadata,
FirstUsage: account.FirstUsage,
InsertionDate: account.InsertionDate,
UpdatedAt: account.UpdatedAt,
Volumes: account.PostCommitVolumes.toCore(),
EffectiveVolumes: account.PostCommitEffectiveVolumes.toCore(),
}
Expand Down Expand Up @@ -330,7 +327,6 @@ func (s *Store) UpdateAccountsMetadata(ctx context.Context, m map[string]metadat
accounts = append(accounts, Account{
Ledger: s.ledger.Name,
Address: account,
AddressArray: strings.Split(account, ":"),
Metadata: accountMetadata,
InsertionDate: now,
UpdatedAt: now,
Expand Down Expand Up @@ -366,7 +362,6 @@ func (s *Store) DeleteAccountMetadata(ctx context.Context, account, key string)
func (s *Store) UpsertAccount(ctx context.Context, account *ledger.Account) error {
mappedAccount := &Account{
Ledger: s.ledger.Name,
AddressArray: strings.Split(account.Address, ":"),
Address: account.Address,
FirstUsage: account.FirstUsage,
InsertionDate: account.InsertionDate,
Expand Down Expand Up @@ -406,6 +401,7 @@ func (s *Store) upsertAccount(ctx context.Context, account *Account) (bool, erro
Set("first_usage = case when ? < excluded.first_usage then ? else excluded.first_usage end", account.FirstUsage, account.FirstUsage).
Set("metadata = accounts.metadata || excluded.metadata").
Set("updated_at = ?", account.UpdatedAt).
Value("ledger", "?", s.ledger.Name).
Returning("*").
Where("(? < accounts.first_usage) or not accounts.metadata @> excluded.metadata", account.FirstUsage),
).
Expand Down
6 changes: 2 additions & 4 deletions internal/storage/ledger/accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,6 @@ func TestUpsertAccount(t *testing.T) {
account := Account{
Ledger: store.Name(),
Address: "foo",
AddressArray: []string{"foo"},
FirstUsage: now,
InsertionDate: now,
UpdatedAt: now,
Expand All @@ -443,9 +442,8 @@ func TestUpsertAccount(t *testing.T) {

// reset the account model
account = Account{
Ledger: store.Name(),
Address: "foo",
AddressArray: []string{"foo"},
Ledger: store.Name(),
Address: "foo",
// the account will be upserted on the timeline after its initial usage
// the upsert should not modify anything
// but, it should retrieve and load the account entity
Expand Down
1 change: 0 additions & 1 deletion internal/storage/ledger/balances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ func TestBalancesGet(t *testing.T) {
world := &Account{
Ledger: store.ledger.Name,
Address: "world",
AddressArray: []string{"world"},
InsertionDate: time.Now(),
UpdatedAt: time.Now(),
FirstUsage: time.Now(),
Expand Down
9 changes: 9 additions & 0 deletions internal/storage/ledger/migrations/0-add-sequences.sql
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,15 @@ execute procedure "{{.Bucket}}".set_transaction_addresses();
create index "accounts_address_array_{{.ID}}" on "{{.Bucket}}".accounts using gin (address_array jsonb_ops) where ledger = '{{.Name}}';
create index "accounts_address_array_length_{{.ID}}" on "{{.Bucket}}".accounts (jsonb_array_length(address_array)) where ledger = '{{.Name}}';

create trigger "accounts_set_address_array_{{.ID}}"
before insert
on "{{.Bucket}}"."accounts"
for each row
when (
new.ledger = '{{.Name}}'
)
execute procedure "{{.Bucket}}".set_address_array_for_account();

{{ if .HasFeature "INDEX_TRANSACTION_ACCOUNTS" "ON" }}
create index "transactions_sources_arrays_{{.ID}}" on "{{.Bucket}}".transactions using gin (sources_arrays jsonb_path_ops) where ledger = '{{.Name}}';
create index "transactions_destinations_arrays_{{.ID}}" on "{{.Bucket}}".transactions using gin (destinations_arrays jsonb_path_ops) where ledger = '{{.Name}}';
Expand Down
74 changes: 38 additions & 36 deletions internal/storage/ledger/moves_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ func TestMovesInsert(t *testing.T) {
)
require.NoError(t, store.insertTransaction(ctx, &tx))

account := &Account{}
account := &Account{
Address: "world",
}
_, err := store.upsertAccount(ctx, account)
require.NoError(t, err)

Expand All @@ -51,13 +53,13 @@ func TestMovesInsert(t *testing.T) {

// insert a first tx at t0
m1 := Move{
Ledger: store.ledger.Name,
IsSource: true,
Account: "world",
Amount: (*bunpaginate.BigInt)(big.NewInt(100)),
Asset: "USD",
InsertionDate: t0,
EffectiveDate: t0,
Ledger: store.ledger.Name,
IsSource: true,
Account: "world",
Amount: (*bunpaginate.BigInt)(big.NewInt(100)),
Asset: "USD",
InsertionDate: t0,
EffectiveDate: t0,
}
require.NoError(t, store.insertMoves(ctx, &m1))
require.NotNil(t, m1.PostCommitEffectiveVolumes)
Expand All @@ -68,13 +70,13 @@ func TestMovesInsert(t *testing.T) {

// add a second move at t3
m2 := Move{
Ledger: store.ledger.Name,
IsSource: false,
Account: "world",
Amount: (*bunpaginate.BigInt)(big.NewInt(50)),
Asset: "USD",
InsertionDate: t3,
EffectiveDate: t3,
Ledger: store.ledger.Name,
IsSource: false,
Account: "world",
Amount: (*bunpaginate.BigInt)(big.NewInt(50)),
Asset: "USD",
InsertionDate: t3,
EffectiveDate: t3,
}
require.NoError(t, store.insertMoves(ctx, &m2))
require.NotNil(t, m2.PostCommitEffectiveVolumes)
Expand All @@ -85,13 +87,13 @@ func TestMovesInsert(t *testing.T) {

// add a third move at t1
m3 := Move{
Ledger: store.ledger.Name,
IsSource: true,
Account: "world",
Amount: (*bunpaginate.BigInt)(big.NewInt(200)),
Asset: "USD",
InsertionDate: t1,
EffectiveDate: t1,
Ledger: store.ledger.Name,
IsSource: true,
Account: "world",
Amount: (*bunpaginate.BigInt)(big.NewInt(200)),
Asset: "USD",
InsertionDate: t1,
EffectiveDate: t1,
}
require.NoError(t, store.insertMoves(ctx, &m3))
require.NotNil(t, m3.PostCommitEffectiveVolumes)
Expand All @@ -102,13 +104,13 @@ func TestMovesInsert(t *testing.T) {

// add a fourth move at t2
m4 := Move{
Ledger: store.ledger.Name,
IsSource: false,
Account: "world",
Amount: (*bunpaginate.BigInt)(big.NewInt(50)),
Asset: "USD",
InsertionDate: t2,
EffectiveDate: t2,
Ledger: store.ledger.Name,
IsSource: false,
Account: "world",
Amount: (*bunpaginate.BigInt)(big.NewInt(50)),
Asset: "USD",
InsertionDate: t2,
EffectiveDate: t2,
}
require.NoError(t, store.insertMoves(ctx, &m4))
require.NotNil(t, m4.PostCommitEffectiveVolumes)
Expand All @@ -119,13 +121,13 @@ func TestMovesInsert(t *testing.T) {

// add a fifth move at t4
m5 := Move{
Ledger: store.ledger.Name,
IsSource: false,
Account: "world",
Amount: (*bunpaginate.BigInt)(big.NewInt(50)),
Asset: "USD",
InsertionDate: t4,
EffectiveDate: t4,
Ledger: store.ledger.Name,
IsSource: false,
Account: "world",
Amount: (*bunpaginate.BigInt)(big.NewInt(50)),
Asset: "USD",
InsertionDate: t4,
EffectiveDate: t4,
}
require.NoError(t, store.insertMoves(ctx, &m5))
require.NotNil(t, m5.PostCommitEffectiveVolumes)
Expand Down
1 change: 0 additions & 1 deletion internal/storage/ledger/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,6 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e
for _, address := range tx.InvolvedAccounts() {
_, err := s.upsertAccount(ctx, &Account{
Ledger: s.ledger.Name,
AddressArray: strings.Split(address, ":"),
Address: address,
FirstUsage: tx.Timestamp,
InsertionDate: tx.InsertedAt,
Expand Down

0 comments on commit d302f70

Please sign in to comment.