Skip to content

Commit

Permalink
feat: remove sequence of accounts
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 16, 2024
1 parent f606b88 commit a14530e
Show file tree
Hide file tree
Showing 11 changed files with 149 additions and 120 deletions.
79 changes: 66 additions & 13 deletions internal/storage/bucket/migrations/11-stateless.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,18 @@ add column post_commit_effective_volumes_jsonb jsonb;
alter table "{{.Bucket}}".moves
drop column transactions_seq;

alter table "{{.Bucket}}".moves
drop column accounts_seq;

alter table "{{.Bucket}}".moves
add column transactions_id bigint not null ;

alter table "{{.Bucket}}".moves
rename column account_address to accounts_address;

alter table "{{.Bucket}}".moves
rename column account_address_array to accounts_address_array;

alter table "{{.Bucket}}".moves
drop column post_commit_volumes;

Expand All @@ -45,11 +54,19 @@ drop not null,
alter column post_commit_effective_volumes
drop not null;

-- todo: need migrate
alter table "{{.Bucket}}".transactions_metadata
drop column transactions_seq;

alter table "{{.Bucket}}".transactions_metadata
add column transactions_id bigint;
add column transactions_id bigint not null;

-- todo: need migrate
alter table "{{.Bucket}}".accounts_metadata
drop column accounts_seq;

alter table "{{.Bucket}}".accounts_metadata
add column accounts_address varchar not null;

alter table "{{.Bucket}}".transactions
alter column id
Expand All @@ -71,6 +88,12 @@ type json;
--drop index transactions_metadata_ledger;
--drop index transactions_metadata_revisions;

--drop index accounts_metadata_ledger;
--drop index accounts_metadata_revisions;

create unique index accounts_metadata_ledger on "{{.Bucket}}".accounts_metadata (ledger, accounts_address, revision);
create index accounts_metadata_revisions on "{{.Bucket}}".accounts_metadata(accounts_address asc, revision desc) include (metadata, date);

create unique index transactions_metadata_ledger on "{{.Bucket}}".transactions_metadata (ledger, transactions_id, revision);
create index transactions_metadata_revisions on "{{.Bucket}}".transactions_metadata(transactions_id asc, revision desc) include (metadata, date);

Expand All @@ -89,24 +112,22 @@ create index transactions_metadata_revisions on "{{.Bucket}}".transactions_metad

create table "{{.Bucket}}".accounts_volumes (
ledger varchar not null,
accounts_seq int not null,
account varchar not null,
accounts_address varchar not null,
asset varchar not null,
input numeric not null,
output numeric not null,

primary key (ledger, account, asset)
primary key (ledger, accounts_address, asset)
);

create view "{{.Bucket}}".balances as
select ledger, accounts_seq, account, asset, input - output as balance
select ledger, accounts_address, asset, input - output as balance
from "{{.Bucket}}".accounts_volumes;

insert into "{{.Bucket}}".accounts_volumes (ledger, accounts_seq, account, asset, input, output)
select distinct on (ledger, accounts_seq, account_address, asset)
insert into "{{.Bucket}}".accounts_volumes (ledger, accounts_address, asset, input, output)
select distinct on (ledger, accounts_address, asset)
ledger,
accounts_seq,
account_address as account,
accounts_address,
asset,
(moves.post_commit_volumes->>'input')::numeric as input,
(moves.post_commit_volumes->>'output')::numeric as output
Expand All @@ -116,8 +137,8 @@ from (
order by seq desc
) moves;

drop index moves_post_commit_volumes;
drop index moves_effective_post_commit_volumes;
--drop index moves_post_commit_volumes;
--drop index moves_effective_post_commit_volumes;

drop trigger "insert_account" on "{{.Bucket}}".accounts;
drop trigger "update_account" on "{{.Bucket}}".accounts;
Expand Down Expand Up @@ -179,7 +200,7 @@ begin
'output', (post_commit_effective_volumes->>'output')::numeric + case when new.is_source then new.amount else 0 end
)
from "{{.Bucket}}".moves
where accounts_seq = new.accounts_seq
where accounts_address = new.accounts_address
and asset = new.asset
and ledger = new.ledger
and (effective_date < new.effective_date or (effective_date = new.effective_date and seq < new.seq))
Expand All @@ -206,7 +227,7 @@ begin
'input', (post_commit_effective_volumes->>'input')::numeric + case when new.is_source then 0 else new.amount end,
'output', (post_commit_effective_volumes->>'output')::numeric + case when new.is_source then new.amount else 0 end
)
where accounts_seq = new.accounts_seq
where accounts_address = new.accounts_address
and asset = new.asset
and effective_date > new.effective_date
and ledger = new.ledger;
Expand Down Expand Up @@ -286,4 +307,36 @@ begin

return new;
end;
$$;

create or replace function "{{.Bucket}}".update_account_metadata_history() returns trigger
	security definer
	language plpgsql
as
$$
begin
	-- Append a new metadata revision whenever an account row is updated.
	-- The revision lookup MUST be scoped to the same ledger: accounts_metadata
	-- holds rows for every ledger of the bucket, and the unique index is on
	-- (ledger, accounts_address, revision). Without the ledger predicate, an
	-- account with the same address in another ledger would produce a wrong
	-- (and possibly colliding) revision number.
	-- coalesce(max(revision), 0) + 1 also handles the case where no prior
	-- revision exists yet, which would otherwise insert a NULL revision.
	insert into "{{.Bucket}}".accounts_metadata (ledger, accounts_address, revision, date, metadata)
	values (new.ledger, new.address, (
		select coalesce(max(revision), 0) + 1
		from "{{.Bucket}}".accounts_metadata
		where accounts_metadata.ledger = new.ledger
		  and accounts_metadata.accounts_address = new.address
	), new.updated_at, new.metadata);

	return new;
end;
$$;

create or replace function "{{.Bucket}}".insert_account_metadata_history() returns trigger
	security definer
	language plpgsql
as
$$
begin
	-- A freshly inserted account starts its metadata history at revision 1,
	-- stamped with the account's insertion date.
	insert into "{{.Bucket}}".accounts_metadata (
		ledger,
		accounts_address,
		revision,
		date,
		metadata
	)
	values (
		new.ledger,
		new.address,
		1,
		new.insertion_date,
		new.metadata
	);

	return new;
end;
$$;
26 changes: 12 additions & 14 deletions internal/storage/ledger/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ type Account struct {

PostCommitVolumes AggregatedAccountVolumes `bun:"pcv,scanonly"`
PostCommitEffectiveVolumes AggregatedAccountVolumes `bun:"pcev,scanonly"`
Seq int `bun:"seq,scanonly"`
}

func (account Account) toCore() ledger.ExpandedAccount {
Expand Down Expand Up @@ -115,11 +114,11 @@ func (s *Store) selectBalance(date *time.Time) *bun.SelectQuery {

func (s *Store) selectDistinctAccountMetadataHistories(date *time.Time) *bun.SelectQuery {
ret := s.db.NewSelect().
DistinctOn("accounts_seq").
DistinctOn("accounts_address").
ModelTableExpr(s.GetPrefixedRelationName("accounts_metadata")).
Where("ledger = ?", s.ledger.Name).
Column("accounts_seq", "metadata").
Order("accounts_seq", "revision desc")
Column("accounts_address", "metadata").
Order("accounts_address", "revision desc")

if date != nil && !date.IsZero() {
ret = ret.Where("date <= ?", date)
Expand Down Expand Up @@ -174,7 +173,7 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo
if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistory, "SYNC") && date != nil && !date.IsZero() {
ret = ret.
Join(
`left join (?) accounts_metadata on accounts_metadata.accounts_seq = accounts.seq`,
`left join (?) accounts_metadata on accounts_metadata.accounts_address = accounts.address`,
s.selectDistinctAccountMetadataHistories(date),
).
ColumnExpr("coalesce(accounts_metadata.metadata, '{}'::jsonb) as metadata")
Expand All @@ -187,25 +186,25 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo
if s.ledger.HasFeature(ledger.FeatureMovesHistory, "ON") && needPCV {
ret = ret.
Join(
`left join (?) pcv on pcv.accounts_seq = accounts.seq`,
`left join (?) pcv on pcv.accounts_address = accounts.address`,
s.db.NewSelect().
TableExpr("(?) v", s.SelectDistinctMovesBySeq(date)).
Column("accounts_seq").
Column("accounts_address").
ColumnExpr(`to_json(array_agg(json_build_object('asset', v.asset, 'input', (v.post_commit_volumes->>'input')::numeric, 'output', (v.post_commit_volumes->>'output')::numeric))) as pcv`).
Group("accounts_seq"),
Group("accounts_address"),
).
ColumnExpr("pcv.*")
}

if s.ledger.HasFeature(ledger.FeatureMovesHistoryPostCommitEffectiveVolumes, "SYNC") && expandEffectiveVolumes {
ret = ret.
Join(
`left join (?) pcev on pcev.accounts_seq = accounts.seq`,
`left join (?) pcev on pcev.accounts_address = accounts.address`,
s.db.NewSelect().
TableExpr("(?) v", s.SelectDistinctMovesByEffectiveDate(date)).
Column("accounts_seq").
Column("accounts_address").
ColumnExpr(`to_json(array_agg(json_build_object('asset', v.asset, 'input', (v.post_commit_effective_volumes->>'input')::numeric, 'output', (v.post_commit_effective_volumes->>'output')::numeric))) as pcev`).
Group("accounts_seq"),
Group("accounts_address"),
).
ColumnExpr("pcev.*")
}
Expand All @@ -226,7 +225,7 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo
TableExpr(
"(?) balance",
s.selectBalance(date).
Where("asset = ? and moves.accounts_seq = accounts.seq", asset),
Where("asset = ? and moves.accounts_address = accounts.address", asset),
).
ColumnExpr(fmt.Sprintf("balance %s ?", convertOperatorToSQL(operator)), value).
String(), nil, nil
Expand All @@ -236,7 +235,7 @@ func (s *Store) selectAccounts(date *time.Time, expandVolumes, expandEffectiveVo
TableExpr(
"(?) balance",
s.selectBalance(date).
Where("moves.accounts_seq = accounts.seq"),
Where("moves.accounts_address = accounts.address"),
).
ColumnExpr(fmt.Sprintf("balance %s ?", convertOperatorToSQL(operator)), value).
String(), nil, nil
Expand Down Expand Up @@ -432,7 +431,6 @@ func (s *Store) upsertAccount(ctx context.Context, account *Account) (bool, erro
return err
}

account.Seq = upserted.Seq
account.FirstUsage = upserted.FirstUsage
account.InsertionDate = upserted.InsertionDate
account.UpdatedAt = upserted.UpdatedAt
Expand Down
2 changes: 0 additions & 2 deletions internal/storage/ledger/accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,6 @@ func TestUpsertAccount(t *testing.T) {
upserted, err := store.upsertAccount(ctx, &account)
require.NoError(t, err)
require.True(t, upserted)
require.NotZero(t, account.Seq)

// reset the account model
account = Account{
Expand All @@ -470,5 +469,4 @@ func TestUpsertAccount(t *testing.T) {
upserted, err = store.upsertAccount(ctx, &account)
require.NoError(t, err)
require.False(t, upserted)
require.NotZero(t, account.Seq)
}
27 changes: 13 additions & 14 deletions internal/storage/ledger/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type Balances struct {
bun.BaseModel `bun:"accounts_volumes"`

Ledger string `bun:"ledger,type:varchar"`
Account string `bun:"account,type:varchar"`
Account string `bun:"accounts_address,type:varchar"`
Asset string `bun:"asset,type:varchar"`
Balance *big.Int `bun:"balance,type:numeric"`
}
Expand Down Expand Up @@ -75,22 +75,21 @@ func (s *Store) selectAccountWithVolumes(date *time.Time, useInsertionDate bool,
}
selectAccountsWithVolumes = s.db.NewSelect().
TableExpr("(?) moves", s.SelectDistinctMovesBySeq(date)).
Column("asset", "accounts_seq", "account_address", "account_address_array").
Column("asset", "accounts_address", "accounts_address_array").
ColumnExpr("post_commit_volumes as volumes")
} else {
if !s.ledger.HasFeature(ledger.FeatureMovesHistoryPostCommitEffectiveVolumes, "SYNC") {
return ret.Err(ledgercontroller.NewErrMissingFeature(ledger.FeatureMovesHistoryPostCommitEffectiveVolumes))
}
selectAccountsWithVolumes = s.db.NewSelect().
TableExpr("(?) moves", s.SelectDistinctMovesByEffectiveDate(date)).
Column("asset", "accounts_seq", "account_address", "account_address_array").
Column("asset", "accounts_address", "accounts_address_array").
ColumnExpr("moves.post_commit_effective_volumes as volumes")
}
} else {
selectAccountsWithVolumes = s.db.NewSelect().
ModelTableExpr(s.GetPrefixedRelationName("accounts_volumes")).
Column("asset", "accounts_seq").
ColumnExpr("account as account_address").
Column("asset", "accounts_address").
ColumnExpr("json_build_object('input', input, 'output', output) as volumes").
Where("ledger = ?", s.ledger.Name)
}
Expand All @@ -103,20 +102,20 @@ func (s *Store) selectAccountWithVolumes(date *time.Time, useInsertionDate bool,
if s.ledger.HasFeature(ledger.FeatureAccountMetadataHistory, "SYNC") && date != nil && !date.IsZero() {
selectAccountsWithVolumes = selectAccountsWithVolumes.
Join(
`left join (?) accounts_metadata on accounts_metadata.accounts_seq = accounts_volumes.accounts_seq`,
`left join (?) accounts_metadata on accounts_metadata.accounts_address = accounts_volumes.accounts_address`,
s.selectDistinctAccountMetadataHistories(date),
).
ColumnExpr("coalesce(accounts_metadata.metadata, '{}'::jsonb) as metadata")

if needAddressSegment {
selectAccountsWithVolumes = selectAccountsWithVolumes.
Join("join " + s.GetPrefixedRelationName("accounts") + " on accounts.seq = accounts_volumes.accounts_seq").
Join("join " + s.GetPrefixedRelationName("accounts") + " on accounts.address = accounts_volumes.accounts_address").
Column("accounts.address_array")
}
} else {
selectAccountsWithVolumes = selectAccountsWithVolumes.
Join(
`join (?) accounts on accounts.seq = accounts_volumes.accounts_seq`,
`join (?) accounts on accounts.address = accounts_volumes.accounts_address`,
s.db.NewSelect().ModelTableExpr(s.GetPrefixedRelationName("accounts")),
)

Expand All @@ -131,8 +130,8 @@ func (s *Store) selectAccountWithVolumes(date *time.Time, useInsertionDate bool,
TableExpr(
"(?) accounts",
selectAccountsWithVolumes.
Join("join "+s.GetPrefixedRelationName("accounts")+" accounts on accounts.seq = accounts_volumes.accounts_seq").
ColumnExpr("accounts.address_array as account_address_array"),
Join("join "+s.GetPrefixedRelationName("accounts")+" accounts on accounts.address = accounts_volumes.accounts_address").
ColumnExpr("accounts.address_array as accounts_address_array"),
).
ColumnExpr("*")
}
Expand All @@ -143,7 +142,7 @@ func (s *Store) selectAccountWithVolumes(date *time.Time, useInsertionDate bool,
where, args, err := builder.Build(query.ContextFn(func(key, operator string, value any) (string, []any, error) {
switch {
case key == "address":
return filterAccountAddress(value.(string), "account_address"), nil, nil
return filterAccountAddress(value.(string), "accounts_address"), nil, nil
case metadataRegex.Match([]byte(key)):
match := metadataRegex.FindAllStringSubmatch(key, 3)

Expand Down Expand Up @@ -192,7 +191,7 @@ func (s *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQ
args := make([]any, 0)
for account, assets := range query {
for _, asset := range assets {
conditions = append(conditions, "account = ? and asset = ?")
conditions = append(conditions, "accounts_address = ? and asset = ?")
args = append(args, account, asset)
}
}
Expand All @@ -201,12 +200,12 @@ func (s *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQ
err := s.db.NewSelect().
Model(&balances).
ModelTableExpr(s.GetPrefixedRelationName("accounts_volumes")).
ColumnExpr("account, asset").
ColumnExpr("accounts_address, asset").
ColumnExpr("input - output as balance").
Where("("+strings.Join(conditions, ") OR (")+")", args...).
For("update").
// notes(gfyrag): keep order, it ensures consistent locking order and limit deadlocks
Order("account", "asset").
Order("accounts_address", "asset").
Scan(ctx)
if err != nil {
return nil, postgres.ResolveError(err)
Expand Down
3 changes: 0 additions & 3 deletions internal/storage/ledger/balances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func TestBalancesGet(t *testing.T) {
Asset: "USD",
Input: new(big.Int),
Output: big.NewInt(100),
AccountsSeq: world.Seq,
})
require.NoError(t, err)

Expand Down Expand Up @@ -278,7 +277,6 @@ func TestUpdateBalances(t *testing.T) {
Asset: "USD/2",
Input: big.NewInt(0),
Output: big.NewInt(100),
AccountsSeq: world.Seq,
})
require.NoError(t, err)
require.Equal(t, ledger.PostCommitVolumes{
Expand Down Expand Up @@ -307,7 +305,6 @@ func TestUpdateBalances(t *testing.T) {
Asset: "USD/2",
Input: big.NewInt(50),
Output: big.NewInt(50),
AccountsSeq: world.Seq,
})
require.NoError(t, err)
require.Equal(t, ledger.PostCommitVolumes{
Expand Down
Loading

0 comments on commit a14530e

Please sign in to comment.