From f5abcf784f328d531595707178563a4941834802 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Wed, 8 Jun 2022 10:31:05 +0200 Subject: [PATCH 01/14] Update lib to use generic cursor. --- pkg/opentelemetry/opentelemetrytraces/storage.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/opentelemetry/opentelemetrytraces/storage.go b/pkg/opentelemetry/opentelemetrytraces/storage.go index 51dd4cfc8..e72687276 100644 --- a/pkg/opentelemetry/opentelemetrytraces/storage.go +++ b/pkg/opentelemetry/opentelemetrytraces/storage.go @@ -151,9 +151,9 @@ func (o *openTelemetryStorage) CountAccounts(ctx context.Context, q query.Accoun return } -func (o *openTelemetryStorage) GetAccounts(ctx context.Context, q query.Accounts) (c sharedapi.Cursor[core.Account], err error) { +func (o *openTelemetryStorage) GetAccounts(ctx context.Context, query query.Accounts) (c sharedapi.Cursor[core.Account], err error) { handlingErr := o.handle(ctx, "GetAccounts", func(ctx context.Context) error { - c, err = o.underlying.GetAccounts(ctx, q) + c, err = o.underlying.GetAccounts(ctx, query) return err }) if handlingErr != nil { From c7a62dde496b519395a8b0661b26d22dedfcee97 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Wed, 8 Jun 2022 10:52:49 +0200 Subject: [PATCH 02/14] Introduce core.Volume structure. --- .../controllers/account_controller_test.go | 5 ++-- pkg/core/account.go | 9 ++++-- pkg/ledger/ledger.go | 30 +++++++++++-------- pkg/ledger/ledger_test.go | 22 ++++++-------- pkg/storage/sqlstorage/aggregations.go | 8 ++--- pkg/storage/sqlstorage/migrations_test.go | 4 +-- pkg/storage/sqlstorage/store_test.go | 5 ++-- 7 files changed, 43 insertions(+), 40 deletions(-) diff --git a/pkg/api/controllers/account_controller_test.go b/pkg/api/controllers/account_controller_test.go index 73fc271d2..8122e6b23 100644 --- a/pkg/api/controllers/account_controller_test.go +++ b/pkg/api/controllers/account_controller_test.go @@ -178,10 +178,9 @@ func TestGetAccount(t *testing.T) { Balances: map[string]int64{ "USD": 100, }, - Volumes: map[string]map[string]int64{ + Volumes: core.Volumes{ "USD": { - "input": 100, - "output": 0, + Input: 100, }, }, Metadata: core.Metadata{ diff --git a/pkg/core/account.go b/pkg/core/account.go index 68c2997e5..3b6abf64a 100644 --- a/pkg/core/account.go +++ b/pkg/core/account.go @@ -4,13 +4,18 @@ const ( WORLD = "world" ) +type Volume struct { + Input int64 `json:"input"` + Output int64 `json:"output"` +} + type Balances map[string]int64 -type Volumes map[string]map[string]int64 +type Volumes map[string]Volume func (v Volumes) Balances() Balances { balances := Balances{} for asset, vv := range v { - balances[asset] = vv["input"] - vv["output"] + balances[asset] = vv.Input - vv.Output } return balances } diff --git a/pkg/ledger/ledger.go b/pkg/ledger/ledger.go index bdfc16c48..29625041d 100644 --- a/pkg/ledger/ledger.go +++ b/pkg/ledger/ledger.go @@ -101,22 +101,24 @@ func (l *Ledger) processTx(ctx context.Context, ts []core.TransactionData) (core return nil, nil, nil, NewTransactionCommitError(i, NewValidationError("invalid asset")) } if _, ok := rf[p.Source]; !ok { - rf[p.Source] = map[string]map[string]int64{} + rf[p.Source] = core.Volumes{} } if _, ok := rf[p.Source][p.Asset]; !ok { - rf[p.Source][p.Asset] = map[string]int64{"input": 0, "output": 0} + rf[p.Source][p.Asset] = core.Volume{} } - - rf[p.Source][p.Asset]["output"] += p.Amount + volume := rf[p.Source][p.Asset] + volume.Output += p.Amount + rf[p.Source][p.Asset] = volume if _, ok := rf[p.Destination]; !ok { - rf[p.Destination] = map[string]map[string]int64{} + rf[p.Destination] = core.Volumes{} } if _, ok := rf[p.Destination][p.Asset]; !ok { - rf[p.Destination][p.Asset] = map[string]int64{"input": 0, "output": 0} + rf[p.Destination][p.Asset] = core.Volume{} } - - rf[p.Destination][p.Asset]["input"] += p.Amount + volume = rf[p.Destination][p.Asset] + volume.Input += p.Amount + rf[p.Destination][p.Asset] = volume } for addr := range rf { @@ -127,12 +129,12 @@ func (l *Ledger) processTx(ctx context.Context, ts []core.TransactionData) (core } } - for asset, volumes := range rf[addr] { + for asset, transfer := range rf[addr] { if _, ok := aggregatedVolumes[addr][asset]; !ok { - aggregatedVolumes[addr][asset] = map[string]int64{"input": 0, "output": 0} + aggregatedVolumes[addr][asset] = core.Volume{} } if addr != "world" { - expectedBalance := aggregatedVolumes[addr][asset]["input"] - aggregatedVolumes[addr][asset]["output"] + volumes["input"] - volumes["output"] + expectedBalance := aggregatedVolumes[addr][asset].Input - aggregatedVolumes[addr][asset].Output + transfer.Input - transfer.Output for _, contract := range contracts { if contract.Match(addr) { account, ok := accounts[addr] @@ -157,8 +159,10 @@ func (l *Ledger) processTx(ctx context.Context, ts []core.TransactionData) (core } } } - aggregatedVolumes[addr][asset]["input"] += volumes["input"] - aggregatedVolumes[addr][asset]["output"] += volumes["output"] + volume := aggregatedVolumes[addr][asset] + volume.Input += transfer.Input + volume.Output += transfer.Output + aggregatedVolumes[addr][asset] = volume } } diff --git a/pkg/ledger/ledger_test.go b/pkg/ledger/ledger_test.go index 3645625f9..75846c5e3 100644 --- a/pkg/ledger/ledger_test.go +++ b/pkg/ledger/ledger_test.go @@ -274,30 +274,26 @@ func TestTransactionExpectedVolumes(t *testing.T) { assert.NoError(t, err) assert.EqualValues(t, volumes, core.AggregatedVolumes{ - "world": map[string]map[string]int64{ + "world": core.Volumes{ "USD": { - "input": 0, - "output": 100, + Output: 100, }, "EUR": { - "input": 0, - "output": 200, + Output: 200, }, }, - "player": map[string]map[string]int64{ + "player": core.Volumes{ "USD": { - "input": 100, - "output": 0, + Input: 100, }, "EUR": { - "input": 100, - "output": 50, + Input: 100, + Output: 50, }, }, - "player2": map[string]map[string]int64{ + "player2": core.Volumes{ "EUR": { - "input": 150, - "output": 0, + Input: 150, }, }, }) diff --git a/pkg/storage/sqlstorage/aggregations.go b/pkg/storage/sqlstorage/aggregations.go index 63dcb4dba..0ee0498aa 100644 --- a/pkg/storage/sqlstorage/aggregations.go +++ b/pkg/storage/sqlstorage/aggregations.go @@ -57,7 +57,7 @@ func (s *Store) aggregateVolumes(ctx context.Context, exec executor, address str } }(rows) - volumes := make(map[string]map[string]int64) + volumes := core.Volumes{} for rows.Next() { var ( asset string @@ -68,9 +68,9 @@ func (s *Store) aggregateVolumes(ctx context.Context, exec executor, address str if err != nil { return nil, s.error(err) } - volumes[asset] = map[string]int64{ - "input": input, - "output": output, + volumes[asset] = core.Volume{ + Input: input, + Output: output, } } if err := rows.Err(); err != nil { diff --git a/pkg/storage/sqlstorage/migrations_test.go b/pkg/storage/sqlstorage/migrations_test.go index 34fa7eed9..a90011c3f 100644 --- a/pkg/storage/sqlstorage/migrations_test.go +++ b/pkg/storage/sqlstorage/migrations_test.go @@ -257,8 +257,8 @@ var postMigrate = map[string]func(t *testing.T, store *sqlstorage.Store){ } if !assert.Equal(t, core.Volumes{ "USD": { - "input": 100, - "output": 1, + Input: 100, + Output: 1, }, }, volumes) { return diff --git a/pkg/storage/sqlstorage/store_test.go b/pkg/storage/sqlstorage/store_test.go index 4992fca16..1058ff7c8 100644 --- a/pkg/storage/sqlstorage/store_test.go +++ b/pkg/storage/sqlstorage/store_test.go @@ -214,9 +214,8 @@ func testAggregateVolumes(t *testing.T, store *sqlstorage.Store) { volumes, err := store.AggregateVolumes(context.Background(), "central_bank") assert.NoError(t, err) assert.Len(t, volumes, 1) - assert.Len(t, volumes["USD"], 2) - assert.EqualValues(t, 100, volumes["USD"]["input"]) - assert.EqualValues(t, 0, volumes["USD"]["output"]) + assert.EqualValues(t, 100, volumes["USD"].Input) + assert.EqualValues(t, 0, volumes["USD"].Output) } func testGetAccounts(t *testing.T, store *sqlstorage.Store) { From 3057dbf7ad86c5965023d6a8af189ae7f381b77d Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Wed, 8 Jun 2022 10:58:16 +0200 Subject: [PATCH 03/14] Rename AggregateVolumes to GetAccountVolumes --- pkg/ledger/ledger.go | 4 ++-- pkg/opentelemetry/opentelemetrytraces/storage.go | 8 ++++---- pkg/opentelemetry/opentelemetrytraces/storage_test.go | 4 ++-- pkg/storage/sqlstorage/aggregations.go | 2 +- pkg/storage/sqlstorage/migrations_test.go | 2 +- pkg/storage/sqlstorage/store_bench_test.go | 4 ++-- pkg/storage/sqlstorage/store_test.go | 4 ++-- pkg/storage/storage.go | 4 ++-- 8 files changed, 16 insertions(+), 16 deletions(-) diff --git a/pkg/ledger/ledger.go b/pkg/ledger/ledger.go index 29625041d..f3d7f05ab 100644 --- a/pkg/ledger/ledger.go +++ b/pkg/ledger/ledger.go @@ -123,7 +123,7 @@ func (l *Ledger) processTx(ctx context.Context, ts []core.TransactionData) (core for addr := range rf { if _, ok := aggregatedVolumes[addr]; !ok { - aggregatedVolumes[addr], err = l.store.AggregateVolumes(ctx, addr) + aggregatedVolumes[addr], err = l.store.GetAccountVolumes(ctx, addr) if err != nil { return nil, nil, nil, err } @@ -296,7 +296,7 @@ func (l *Ledger) GetAccount(ctx context.Context, address string) (core.Account, return core.Account{}, err } - volumes, err := l.store.AggregateVolumes(ctx, address) + volumes, err := l.store.GetAccountVolumes(ctx, address) if err != nil { return account, err } diff --git a/pkg/opentelemetry/opentelemetrytraces/storage.go b/pkg/opentelemetry/opentelemetrytraces/storage.go index e72687276..892383374 100644 --- a/pkg/opentelemetry/opentelemetrytraces/storage.go +++ b/pkg/opentelemetry/opentelemetrytraces/storage.go @@ -129,13 +129,13 @@ func (o *openTelemetryStorage) GetAccount(ctx context.Context, s string) (tx cor return } -func (o *openTelemetryStorage) AggregateVolumes(ctx context.Context, s string) (volumes core.Volumes, err error) { - handlingErr := o.handle(ctx, "AggregateVolumes", func(ctx context.Context) error { - volumes, err = o.underlying.AggregateVolumes(ctx, s) +func (o *openTelemetryStorage) GetAccountVolumes(ctx context.Context, s string) (volumes core.Volumes, err error) { + handlingErr := o.handle(ctx, "GetAccountVolumes", func(ctx context.Context) error { + volumes, err = o.underlying.GetAccountVolumes(ctx, s) return err }) if handlingErr != nil { - sharedlogging.Errorf("opentelemetry AggregateVolumes: %s", handlingErr) + sharedlogging.Errorf("opentelemetry GetAccountVolumes: %s", handlingErr) } return } diff --git a/pkg/opentelemetry/opentelemetrytraces/storage_test.go b/pkg/opentelemetry/opentelemetrytraces/storage_test.go index 79dfb6201..e960a3d27 100644 --- a/pkg/opentelemetry/opentelemetrytraces/storage_test.go +++ b/pkg/opentelemetry/opentelemetrytraces/storage_test.go @@ -36,7 +36,7 @@ func TestStore(t *testing.T) { fn: testCountAccounts, }, { - name: "AggregateVolumes", + name: "GetAccountVolumes", fn: testAggregateVolumes, }, { @@ -89,7 +89,7 @@ func testCountAccounts(t *testing.T, store storage.Store) { } func testAggregateVolumes(t *testing.T, store storage.Store) { - _, err := store.AggregateVolumes(context.Background(), "central_bank") + _, err := store.GetAccountVolumes(context.Background(), "central_bank") assert.NoError(t, err) } diff --git a/pkg/storage/sqlstorage/aggregations.go b/pkg/storage/sqlstorage/aggregations.go index 0ee0498aa..f7d632269 100644 --- a/pkg/storage/sqlstorage/aggregations.go +++ b/pkg/storage/sqlstorage/aggregations.go @@ -80,6 +80,6 @@ func (s *Store) aggregateVolumes(ctx context.Context, exec executor, address str return volumes, nil } -func (s *Store) AggregateVolumes(ctx context.Context, address string) (core.Volumes, error) { +func (s *Store) GetAccountVolumes(ctx context.Context, address string) (core.Volumes, error) { return s.aggregateVolumes(ctx, s.schema, address) } diff --git a/pkg/storage/sqlstorage/migrations_test.go b/pkg/storage/sqlstorage/migrations_test.go index a90011c3f..bf3e74628 100644 --- a/pkg/storage/sqlstorage/migrations_test.go +++ b/pkg/storage/sqlstorage/migrations_test.go @@ -251,7 +251,7 @@ var postMigrate = map[string]func(t *testing.T, store *sqlstorage.Store){ return } - volumes, err := store.AggregateVolumes(context.Background(), "player1") + volumes, err := store.GetAccountVolumes(context.Background(), "player1") if !assert.NoError(t, err) { return } diff --git a/pkg/storage/sqlstorage/store_bench_test.go b/pkg/storage/sqlstorage/store_bench_test.go index 5e35035bf..16815e0bc 100644 --- a/pkg/storage/sqlstorage/store_bench_test.go +++ b/pkg/storage/sqlstorage/store_bench_test.go @@ -71,7 +71,7 @@ func BenchmarkStore(b *testing.B) { fn: testBenchmarkLastLog, }, { - name: "AggregateVolumes", + name: "GetAccountVolumes", fn: testBenchmarkAggregateVolumes, }, { @@ -222,7 +222,7 @@ func testBenchmarkAggregateVolumes(b *testing.B, store *sqlstorage.Store) { b.ResetTimer() for n := 0; n < b.N; n++ { - _, err := store.AggregateVolumes(context.Background(), "world") + _, err := store.GetAccountVolumes(context.Background(), "world") assert.NoError(b, err) } diff --git a/pkg/storage/sqlstorage/store_test.go b/pkg/storage/sqlstorage/store_test.go index 1058ff7c8..cf721e1b7 100644 --- a/pkg/storage/sqlstorage/store_test.go +++ b/pkg/storage/sqlstorage/store_test.go @@ -54,7 +54,7 @@ func TestStore(t *testing.T) { fn: testCountAccounts, }, { - name: "AggregateVolumes", + name: "GetAccountVolumes", fn: testAggregateVolumes, }, { @@ -211,7 +211,7 @@ func testAggregateVolumes(t *testing.T, store *sqlstorage.Store) { err := store.AppendLog(context.Background(), core.NewTransactionLog(nil, tx)) assert.NoError(t, err) - volumes, err := store.AggregateVolumes(context.Background(), "central_bank") + volumes, err := store.GetAccountVolumes(context.Background(), "central_bank") assert.NoError(t, err) assert.Len(t, volumes, 1) assert.EqualValues(t, 100, volumes["USD"].Input) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index ad0d2411b..f938ccaf5 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -65,7 +65,7 @@ type Store interface { GetTransactions(context.Context, query.Transactions) (sharedapi.Cursor[core.Transaction], error) GetTransaction(context.Context, uint64) (core.Transaction, error) GetAccount(context.Context, string) (core.Account, error) - AggregateVolumes(context.Context, string) (core.Volumes, error) + GetAccountVolumes(context.Context, string) (core.Volumes, error) CountAccounts(context.Context, query.Accounts) (uint64, error) GetAccounts(context.Context, query.Accounts) (sharedapi.Cursor[core.Account], error) @@ -115,7 +115,7 @@ func (n noOpStore) GetAccount(ctx context.Context, s string) (core.Account, erro return core.Account{}, nil } -func (n noOpStore) AggregateVolumes(ctx context.Context, s string) (core.Volumes, error) { +func (n noOpStore) GetAccountVolumes(ctx context.Context, s string) (core.Volumes, error) { return nil, nil } From 986359eb05b31f26957a443a90bfe71df8d841c5 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Wed, 8 Jun 2022 11:04:02 +0200 Subject: [PATCH 04/14] Add GetAccountVolume on store. --- .../opentelemetrytraces/storage.go | 11 ++++++ pkg/storage/sqlstorage/aggregations.go | 34 +++++++++++++++++-- pkg/storage/storage.go | 5 +++ 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/pkg/opentelemetry/opentelemetrytraces/storage.go b/pkg/opentelemetry/opentelemetrytraces/storage.go index 892383374..d9be0c332 100644 --- a/pkg/opentelemetry/opentelemetrytraces/storage.go +++ b/pkg/opentelemetry/opentelemetrytraces/storage.go @@ -140,6 +140,17 @@ func (o *openTelemetryStorage) GetAccountVolumes(ctx context.Context, s string) return } +func (o *openTelemetryStorage) GetAccountVolume(ctx context.Context, account, asset string) (volume core.Volume, err error) { + handlingErr := o.handle(ctx, "GetAccountVolume", func(ctx context.Context) error { + volume, err = o.underlying.GetAccountVolume(ctx, account, asset) + return err + }) + if handlingErr != nil { + sharedlogging.Errorf("opentelemetry GetAccountVolumes: %s", handlingErr) + } + return +} + func (o *openTelemetryStorage) CountAccounts(ctx context.Context, q query.Accounts) (count uint64, err error) { handlingErr := o.handle(ctx, "CountAccounts", func(ctx context.Context) error { count, err = o.underlying.CountAccounts(ctx, q) diff --git a/pkg/storage/sqlstorage/aggregations.go b/pkg/storage/sqlstorage/aggregations.go index f7d632269..3a3052126 100644 --- a/pkg/storage/sqlstorage/aggregations.go +++ b/pkg/storage/sqlstorage/aggregations.go @@ -39,7 +39,7 @@ func (s *Store) CountAccounts(ctx context.Context, q query.Accounts) (uint64, er return s.countAccounts(ctx, s.schema, q.Params) } -func (s *Store) aggregateVolumes(ctx context.Context, exec executor, address string) (core.Volumes, error) { +func (s *Store) getAccountVolumes(ctx context.Context, exec executor, address string) (core.Volumes, error) { sb := sqlbuilder.NewSelectBuilder() sb.Select("asset", "input", "output") sb.From(s.schema.Table("volumes")) @@ -81,5 +81,35 @@ func (s *Store) aggregateVolumes(ctx context.Context, exec executor, address str } func (s *Store) GetAccountVolumes(ctx context.Context, address string) (core.Volumes, error) { - return s.aggregateVolumes(ctx, s.schema, address) + return s.getAccountVolumes(ctx, s.schema, address) +} + +func (s *Store) getAccountVolume(ctx context.Context, exec executor, address, asset string) (core.Volume, error) { + sb := sqlbuilder.NewSelectBuilder() + sb.Select("input", "output") + sb.From(s.schema.Table("volumes")) + sb.Where(sb.And(sb.E("account", address), sb.E("asset", asset))) + + q, args := sb.BuildWithFlavor(s.schema.Flavor()) + row := exec.QueryRowContext(ctx, q, args...) + if row.Err() != nil { + return core.Volume{}, s.error(row.Err()) + } + + var ( + input int64 + output int64 + ) + err := row.Scan(&asset, &input, &output) + if err != nil { + return core.Volume{}, s.error(err) + } + return core.Volume{ + Input: input, + Output: output, + }, nil +} + +func (s *Store) GetAccountVolume(ctx context.Context, address, asset string) (core.Volume, error) { + return s.getAccountVolume(ctx, s.schema, address, asset) } diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index f938ccaf5..90f058528 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -66,6 +66,7 @@ type Store interface { GetTransaction(context.Context, uint64) (core.Transaction, error) GetAccount(context.Context, string) (core.Account, error) GetAccountVolumes(context.Context, string) (core.Volumes, error) + GetAccountVolume(ctx context.Context, account, asset string) (core.Volume, error) CountAccounts(context.Context, query.Accounts) (uint64, error) GetAccounts(context.Context, query.Accounts) (sharedapi.Cursor[core.Account], error) @@ -83,6 +84,10 @@ type Store interface { // A no op store. Useful for testing. type noOpStore struct{} +func (n noOpStore) GetAccountVolume(ctx context.Context, account, asset string) (core.Volume, error) { + return core.Volume{}, nil +} + func (n noOpStore) GetLastTransaction(ctx context.Context) (*core.Transaction, error) { return &core.Transaction{}, nil } From 2e03863b83d7a3c361bb08552a8b0f196dd45c41 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Wed, 8 Jun 2022 15:59:12 +0200 Subject: [PATCH 05/14] Add pre_commit_volumes and post_commit_volumes column on "transaction" table. Both are written on api response and bus events. --- pkg/api/controllers/swagger.yaml | 22 +++ .../transaction_controller_test.go | 49 ++++- pkg/bus/message.go | 7 +- pkg/bus/monitor.go | 8 +- pkg/bus/monitor_test.go | 3 +- pkg/core/account.go | 18 -- pkg/core/transaction.go | 6 +- pkg/core/volumes.go | 50 +++++ pkg/ledger/executor_test.go | 24 +-- pkg/ledger/ledger.go | 150 +++++++------- pkg/ledger/ledger_test.go | 101 +++++----- pkg/ledger/monitor.go | 4 +- pkg/ledger/stats_test.go | 2 +- pkg/ledger/verification_test.go | 2 +- pkg/ledger/volume_agg.go | 132 +++++++++++++ pkg/ledger/volume_agg_test.go | 184 ++++++++++++++++++ pkg/storage/sqlstorage/aggregations.go | 5 +- .../sqlstorage/migrations/postgresql/8.sql | 43 ++++ .../sqlstorage/migrations/sqlite/2.sql | 29 +++ pkg/storage/sqlstorage/migrations_test.go | 7 +- pkg/storage/sqlstorage/transactions.go | 12 +- 21 files changed, 681 insertions(+), 177 deletions(-) create mode 100644 pkg/core/volumes.go create mode 100644 pkg/ledger/volume_agg.go create mode 100644 pkg/ledger/volume_agg_test.go create mode 100644 pkg/storage/sqlstorage/migrations/postgresql/8.sql create mode 100644 pkg/storage/sqlstorage/migrations/sqlite/2.sql diff --git a/pkg/api/controllers/swagger.yaml b/pkg/api/controllers/swagger.yaml index 1e4e91bcb..4677ebc54 100644 --- a/pkg/api/controllers/swagger.yaml +++ b/pkg/api/controllers/swagger.yaml @@ -781,6 +781,10 @@ components: format: date-time txid: type: integer + preCommitVolumes: + $ref: '#/components/schemas/AggregatedVolumes' + postCommitVolumes: + $ref: '#/components/schemas/AggregatedVolumes' required: - postings - timestamp @@ -864,6 +868,24 @@ components: type: object required: - data + Volume: + type: object + properties: + input: + type: number + output: + type: number + required: + - input + - output + Volumes: + type: object + additionalProperties: + $ref: '#/components/schemas/Volume' + AggregatedVolumes: + type: object + additionalProperties: + $ref: '#/components/schemas/Volumes' ErrorCode: type: string enum: diff --git a/pkg/api/controllers/transaction_controller_test.go b/pkg/api/controllers/transaction_controller_test.go index 6272aa019..9feba3b84 100644 --- a/pkg/api/controllers/transaction_controller_test.go +++ b/pkg/api/controllers/transaction_controller_test.go @@ -191,25 +191,66 @@ func TestGetTransaction(t *testing.T) { }) require.Equal(t, http.StatusOK, rsp.Result().StatusCode) - tx, _ := internal.DecodeSingleResponse[[]core.Transaction](t, rsp.Body) + txs, _ := internal.DecodeSingleResponse[[]core.Transaction](t, rsp.Body) + tx := txs[0] + assert.EqualValues(t, core.AggregatedVolumes{ + "world": core.Volumes{ + "USD": {}, + }, + "central_bank": core.Volumes{ + "USD": {}, + }, + }, tx.PreCommitVolumes) + assert.EqualValues(t, core.AggregatedVolumes{ + "world": core.Volumes{ + "USD": { + Output: 1000, + }, + }, + "central_bank": core.Volumes{ + "USD": { + Input: 1000, + }, + }, + }, tx.PostCommitVolumes) - rsp = internal.GetTransaction(api, tx[0].ID) + rsp = internal.GetTransaction(api, tx.ID) assert.Equal(t, http.StatusOK, rsp.Result().StatusCode) ret, _ := internal.DecodeSingleResponse[core.Transaction](t, rsp.Body) - assert.EqualValues(t, ret.Postings, core.Postings{ + assert.EqualValues(t, core.Postings{ { Source: "world", Destination: "central_bank", Amount: 1000, Asset: "USD", }, - }) + }, ret.Postings) assert.EqualValues(t, 0, ret.ID) assert.EqualValues(t, core.Metadata{}, ret.Metadata) assert.EqualValues(t, "ref", ret.Reference) assert.NotEmpty(t, ret.Timestamp) + assert.EqualValues(t, core.AggregatedVolumes{ + "world": core.Volumes{ + "USD": {}, + }, + "central_bank": core.Volumes{ + "USD": {}, + }, + }, ret.PreCommitVolumes) + assert.EqualValues(t, core.AggregatedVolumes{ + "world": core.Volumes{ + "USD": { + Output: 1000, + }, + }, + "central_bank": core.Volumes{ + "USD": { + Input: 1000, + }, + }, + }, ret.PostCommitVolumes) return nil }, }) diff --git a/pkg/bus/message.go b/pkg/bus/message.go index c79ce3542..835ed9cf2 100644 --- a/pkg/bus/message.go +++ b/pkg/bus/message.go @@ -14,8 +14,11 @@ type baseEvent struct { } type committedTransactions struct { - Transactions []core.Transaction `json:"transactions"` - Volumes core.AggregatedVolumes `json:"volumes"` + Transactions []core.Transaction `json:"transactions"` + // Deprecated (use postCommitVolumes) + Volumes core.AggregatedVolumes `json:"volumes"` + PostCommitVolumes core.AggregatedVolumes `json:"postCommitVolumes"` + PreCommitVolumes core.AggregatedVolumes `json:"preCommitVolumes"` } type savedMetadata struct { diff --git a/pkg/bus/monitor.go b/pkg/bus/monitor.go index 0eeb8b59d..0cddba491 100644 --- a/pkg/bus/monitor.go +++ b/pkg/bus/monitor.go @@ -35,10 +35,12 @@ func (l *ledgerMonitor) publish(ctx context.Context, ledger string, et string, d } } -func (l *ledgerMonitor) CommittedTransactions(ctx context.Context, ledger string, results []core.Transaction, volumes core.AggregatedVolumes) { +func (l *ledgerMonitor) CommittedTransactions(ctx context.Context, ledger string, result *ledger.CommitmentResult) { l.publish(ctx, ledger, CommittedTransactions, committedTransactions{ - Transactions: results, - Volumes: volumes, + Transactions: result.GeneratedTransactions, + Volumes: result.PostCommitVolumes, + PostCommitVolumes: result.PostCommitVolumes, + PreCommitVolumes: result.PreCommitVolumes, }) } diff --git a/pkg/bus/monitor_test.go b/pkg/bus/monitor_test.go index 38df52c79..7b9e90740 100644 --- a/pkg/bus/monitor_test.go +++ b/pkg/bus/monitor_test.go @@ -2,6 +2,7 @@ package bus import ( "context" + "github.com/numary/ledger/pkg/ledger" "testing" "time" @@ -28,7 +29,7 @@ func TestMonitor(t *testing.T) { "*": "testing", }) m := NewLedgerMonitor(p) - go m.CommittedTransactions(context.Background(), uuid.New(), nil, nil) + go m.CommittedTransactions(context.Background(), uuid.New(), &ledger.CommitmentResult{}) select { case m := <-messages: diff --git a/pkg/core/account.go b/pkg/core/account.go index 3b6abf64a..ddbc2ffc1 100644 --- a/pkg/core/account.go +++ b/pkg/core/account.go @@ -4,24 +4,6 @@ const ( WORLD = "world" ) -type Volume struct { - Input int64 `json:"input"` - Output int64 `json:"output"` -} - -type Balances map[string]int64 -type Volumes map[string]Volume - -func (v Volumes) Balances() Balances { - balances := Balances{} - for asset, vv := range v { - balances[asset] = vv.Input - vv.Output - } - return balances -} - -type AggregatedVolumes map[string]Volumes - type Account struct { Address string `json:"address" example:"users:001"` Type string `json:"type,omitempty" example:"virtual"` diff --git a/pkg/core/transaction.go b/pkg/core/transaction.go index 333206406..07c716948 100644 --- a/pkg/core/transaction.go +++ b/pkg/core/transaction.go @@ -32,8 +32,10 @@ func (t *TransactionData) Reverse() TransactionData { type Transaction struct { TransactionData - ID uint64 `json:"txid"` - Timestamp string `json:"timestamp"` + ID uint64 `json:"txid"` + Timestamp string `json:"timestamp"` + PreCommitVolumes AggregatedVolumes `json:"preCommitVolumes,omitempty"` // Keep omitempty to keep consistent hash + PostCommitVolumes AggregatedVolumes `json:"postCommitVolumes,omitempty"` // Keep omitempty to keep consistent hash } func (t *Transaction) AppendPosting(p Posting) { diff --git a/pkg/core/volumes.go b/pkg/core/volumes.go new file mode 100644 index 000000000..38fc7f74f --- /dev/null +++ b/pkg/core/volumes.go @@ -0,0 +1,50 @@ +package core + +import ( + "database/sql/driver" + "encoding/json" +) + +type Volume struct { + Input int64 `json:"input"` + Output int64 `json:"output"` +} + +func (v Volume) Balance() int64 { + return v.Input - v.Output +} + +type Balances map[string]int64 +type Volumes map[string]Volume + +func (v Volumes) Balances() Balances { + balances := Balances{} + for asset, vv := range v { + balances[asset] = vv.Input - vv.Output + } + return balances +} + +type AggregatedVolumes map[string]Volumes + +// Scan - Implement the database/sql scanner interface +func (m *AggregatedVolumes) Scan(value interface{}) error { + if value == nil { + return nil + } + + v, err := driver.String.ConvertValue(value) + if err != nil { + return err + } + + *m = AggregatedVolumes{} + switch vv := v.(type) { + case []uint8: + return json.Unmarshal(vv, m) + case string: + return json.Unmarshal([]byte(vv), m) + default: + panic("not handled type") + } +} diff --git a/pkg/ledger/executor_test.go b/pkg/ledger/executor_test.go index b11699798..78653cf24 100644 --- a/pkg/ledger/executor_test.go +++ b/pkg/ledger/executor_test.go @@ -24,7 +24,7 @@ func assertBalance(t *testing.T, l *Ledger, account, asset string, amount int64) } func TestNoScript(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { script := core.Script{} _, err := l.Execute(context.Background(), script) @@ -34,7 +34,7 @@ func TestNoScript(t *testing.T) { } func TestCompilationError(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { script := core.Script{ Plain: "willnotcompile", } @@ -46,7 +46,7 @@ func TestCompilationError(t *testing.T) { } func TestTransactionInvalidScript(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { script := core.Script{ Plain: "this is not a valid script", } @@ -59,7 +59,7 @@ func TestTransactionInvalidScript(t *testing.T) { } func TestTransactionFail(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { script := core.Script{ Plain: "fail", } @@ -72,7 +72,7 @@ func TestTransactionFail(t *testing.T) { } func TestSend(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { defer func(l *Ledger, ctx context.Context) { require.NoError(t, l.Close(ctx)) }(l, context.Background()) @@ -92,7 +92,7 @@ func TestSend(t *testing.T) { } func TestNoVariables(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { var script core.Script err := json.Unmarshal( []byte(`{ @@ -110,7 +110,7 @@ func TestNoVariables(t *testing.T) { } func TestVariables(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { defer func(l *Ledger, ctx context.Context) { require.NoError(t, l.Close(ctx)) }(l, context.Background()) @@ -141,7 +141,7 @@ func TestVariables(t *testing.T) { } func TestEnoughFunds(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { defer func(l *Ledger, ctx context.Context) { require.NoError(t, l.Close(ctx)) }(l, context.Background()) @@ -174,7 +174,7 @@ func TestEnoughFunds(t *testing.T) { } func TestNotEnoughFunds(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { defer func(l *Ledger, ctx context.Context) { require.NoError(t, l.Close(ctx)) }(l, context.Background()) @@ -207,7 +207,7 @@ func TestNotEnoughFunds(t *testing.T) { } func TestMissingMetadata(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { defer func(l *Ledger, ctx context.Context) { require.NoError(t, l.Close(ctx)) }(l, context.Background()) @@ -237,7 +237,7 @@ func TestMissingMetadata(t *testing.T) { } func TestMetadata(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { defer func(l *Ledger, ctx context.Context) { require.NoError(t, l.Close(ctx)) }(l, context.Background()) @@ -308,7 +308,7 @@ func TestMetadata(t *testing.T) { } func TestSetTxMeta(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { defer func(l *Ledger, ctx context.Context) { require.NoError(t, l.Close(ctx)) }(l, context.Background()) diff --git a/pkg/ledger/ledger.go b/pkg/ledger/ledger.go index f3d7f05ab..2772d7c86 100644 --- a/pkg/ledger/ledger.go +++ b/pkg/ledger/ledger.go @@ -51,28 +51,36 @@ func (l *Ledger) Close(ctx context.Context) error { return nil } -func (l *Ledger) processTx(ctx context.Context, ts []core.TransactionData) (core.AggregatedVolumes, []core.Transaction, []core.Log, error) { +type CommitmentResult struct { + PreCommitVolumes core.AggregatedVolumes + PostCommitVolumes core.AggregatedVolumes + GeneratedTransactions []core.Transaction + GeneratedLogs []core.Log +} + +func (l *Ledger) processTx(ctx context.Context, ts []core.TransactionData) (*CommitmentResult, error) { mapping, err := l.store.LoadMapping(ctx) if err != nil { - return nil, nil, nil, errors.Wrap(err, "loading mapping") + return nil, errors.Wrap(err, "loading mapping") } lastLog, err := l.store.LastLog(ctx) if err != nil { - return nil, nil, nil, err + return nil, err } var nextTxId uint64 lastTx, err := l.store.GetLastTransaction(ctx) if err != nil { - return nil, nil, nil, err + return nil, err } if lastTx != nil { nextTxId = lastTx.ID + 1 } - txs := make([]core.Transaction, 0) - aggregatedVolumes := core.AggregatedVolumes{} + volumeAggregator := NewVolumeAggregator(l.store) + + generatedTxs := make([]core.Transaction, 0) accounts := make(map[string]core.Account, 0) logs := make([]core.Log, 0) contracts := make([]core.Contract, 0) @@ -83,102 +91,83 @@ func (l *Ledger) processTx(ctx context.Context, ts []core.TransactionData) (core for i, t := range ts { if len(t.Postings) == 0 { - return nil, nil, nil, NewTransactionCommitError(i, NewValidationError("transaction has no postings")) + return nil, NewTransactionCommitError(i, NewValidationError("transaction has no postings")) } - rf := core.AggregatedVolumes{} + txVolumeAggregator := volumeAggregator.NextTx() + for _, p := range t.Postings { if p.Amount < 0 { - return nil, nil, nil, NewTransactionCommitError(i, NewValidationError("negative amount")) + return nil, NewTransactionCommitError(i, NewValidationError("negative amount")) } if !core.ValidateAddress(p.Source) { - return nil, nil, nil, NewTransactionCommitError(i, NewValidationError("invalid source address")) + return nil, NewTransactionCommitError(i, NewValidationError("invalid source address")) } if !core.ValidateAddress(p.Destination) { - return nil, nil, nil, NewTransactionCommitError(i, NewValidationError("invalid destination address")) + return nil, NewTransactionCommitError(i, NewValidationError("invalid destination address")) } if !core.AssetIsValid(p.Asset) { - return nil, nil, nil, NewTransactionCommitError(i, NewValidationError("invalid asset")) + return nil, NewTransactionCommitError(i, NewValidationError("invalid asset")) } - if _, ok := rf[p.Source]; !ok { - rf[p.Source] = core.Volumes{} + err := txVolumeAggregator.Transfer(ctx, p.Source, p.Destination, p.Asset, uint64(p.Amount)) + if err != nil { + return nil, NewTransactionCommitError(i, err) } - if _, ok := rf[p.Source][p.Asset]; !ok { - rf[p.Source][p.Asset] = core.Volume{} - } - volume := rf[p.Source][p.Asset] - volume.Output += p.Amount - rf[p.Source][p.Asset] = volume - - if _, ok := rf[p.Destination]; !ok { - rf[p.Destination] = core.Volumes{} - } - if _, ok := rf[p.Destination][p.Asset]; !ok { - rf[p.Destination][p.Asset] = core.Volume{} - } - volume = rf[p.Destination][p.Asset] - volume.Input += p.Amount - rf[p.Destination][p.Asset] = volume } - for addr := range rf { - if _, ok := aggregatedVolumes[addr]; !ok { - aggregatedVolumes[addr], err = l.store.GetAccountVolumes(ctx, addr) - if err != nil { - return nil, nil, nil, err + for addr, volumes := range txVolumeAggregator.PostCommitVolumes() { + for asset, volume := range volumes { + if addr == "world" { + continue } - } - for asset, transfer := range rf[addr] { - if _, ok := aggregatedVolumes[addr][asset]; !ok { - aggregatedVolumes[addr][asset] = core.Volume{} - } - if addr != "world" { - expectedBalance := aggregatedVolumes[addr][asset].Input - aggregatedVolumes[addr][asset].Output + transfer.Input - transfer.Output - for _, contract := range contracts { - if contract.Match(addr) { - account, ok := accounts[addr] - if !ok { - account, err = l.store.GetAccount(ctx, addr) - if err != nil { - return nil, nil, nil, err - } - accounts[addr] = account + expectedBalance := volume.Balance() + for _, contract := range contracts { + if contract.Match(addr) { + account, ok := accounts[addr] + if !ok { + account, err = l.store.GetAccount(ctx, addr) + if err != nil { + return nil, err } + accounts[addr] = account + } - if ok = contract.Expr.Eval(core.EvalContext{ - Variables: map[string]interface{}{ - "balance": float64(expectedBalance), - }, - Metadata: account.Metadata, - Asset: asset, - }); !ok { - return nil, nil, nil, NewTransactionCommitError(i, NewInsufficientFundError(asset)) - } - break + if ok = contract.Expr.Eval(core.EvalContext{ + Variables: map[string]interface{}{ + "balance": float64(expectedBalance), + }, + Metadata: account.Metadata, + Asset: asset, + }); !ok { + return nil, NewTransactionCommitError(i, NewInsufficientFundError(asset)) } + break } } - volume := aggregatedVolumes[addr][asset] - volume.Input += transfer.Input - volume.Output += transfer.Output - aggregatedVolumes[addr][asset] = volume } } tx := core.Transaction{ - TransactionData: t, - ID: nextTxId, - Timestamp: time.Now().UTC().Format(time.RFC3339), + TransactionData: t, + ID: nextTxId, + Timestamp: time.Now().UTC().Format(time.RFC3339), + PostCommitVolumes: txVolumeAggregator.PostCommitVolumes(), + PreCommitVolumes: txVolumeAggregator.PreCommitVolumes(), } - txs = append(txs, tx) + generatedTxs = append(generatedTxs, tx) newLog := core.NewTransactionLog(lastLog, tx) lastLog = &newLog logs = append(logs, newLog) nextTxId++ } - return aggregatedVolumes, txs, logs, nil + return &CommitmentResult{ + PreCommitVolumes: volumeAggregator.AggregatedPreCommitVolumes(), + PostCommitVolumes: volumeAggregator.AggregatedPostCommitVolumes(), + GeneratedTransactions: generatedTxs, + GeneratedLogs: logs, + }, nil } func (l *Ledger) Commit(ctx context.Context, ts []core.TransactionData) (core.AggregatedVolumes, []core.Transaction, error) { @@ -188,12 +177,12 @@ func (l *Ledger) Commit(ctx context.Context, ts []core.TransactionData) (core.Ag } defer unlock(ctx) - volumes, txs, logs, err := l.processTx(ctx, ts) + result, err := l.processTx(ctx, ts) if err != nil { return nil, nil, err } - if err = l.store.AppendLog(ctx, logs...); err != nil { + if err = l.store.AppendLog(ctx, result.GeneratedLogs...); err != nil { switch { case storage.IsErrorCode(err, storage.ConstraintFailed): return nil, nil, NewConflictError() @@ -202,9 +191,9 @@ func (l *Ledger) Commit(ctx context.Context, ts []core.TransactionData) (core.Ag } } - l.monitor.CommittedTransactions(ctx, l.name, txs, volumes) + l.monitor.CommittedTransactions(ctx, l.name, result) - return volumes, txs, nil + return result.PostCommitVolumes, result.GeneratedTransactions, nil } func (l *Ledger) CommitPreview(ctx context.Context, ts []core.TransactionData) (core.AggregatedVolumes, []core.Transaction, error) { @@ -214,8 +203,8 @@ func (l *Ledger) CommitPreview(ctx context.Context, ts []core.TransactionData) ( } defer unlock(ctx) - volumes, txs, _, err := l.processTx(ctx, ts) - return volumes, txs, err + result, err := l.processTx(ctx, ts) + return result.PostCommitVolumes, result.GeneratedTransactions, err } func (l *Ledger) GetTransactions(ctx context.Context, m ...query.TxModifier) (sharedapi.Cursor[core.Transaction], error) { @@ -261,23 +250,24 @@ func (l *Ledger) RevertTransaction(ctx context.Context, id uint64) (*core.Transa } defer unlock(ctx) - _, txs, logs, err := l.processTx(ctx, []core.TransactionData{rt}) + result, err := l.processTx(ctx, []core.TransactionData{rt}) if err != nil { return nil, err } + logs := result.GeneratedLogs logs = append(logs, core.NewSetMetadataLog(&logs[len(logs)-1], core.SetMetadata{ TargetType: core.MetaTargetTypeTransaction, TargetID: id, - Metadata: core.RevertedMetadata(txs[0].ID), + Metadata: core.RevertedMetadata(result.GeneratedTransactions[0].ID), })) err = l.store.AppendLog(ctx, logs...) if err != nil { return nil, err } - l.monitor.RevertedTransaction(ctx, l.name, tx, txs[0]) - return &txs[0], nil + l.monitor.RevertedTransaction(ctx, l.name, tx, result.GeneratedTransactions[0]) + return &result.GeneratedTransactions[0], nil } func (l *Ledger) CountAccounts(ctx context.Context, m ...query.AccModifier) (uint64, error) { diff --git a/pkg/ledger/ledger_test.go b/pkg/ledger/ledger_test.go index 75846c5e3..9bd77efce 100644 --- a/pkg/ledger/ledger_test.go +++ b/pkg/ledger/ledger_test.go @@ -22,39 +22,21 @@ import ( "go.uber.org/fx" ) -func with(f func(l *Ledger)) { +func withContainer(options ...fx.Option) { done := make(chan struct{}) - app := fx.New( + opts := append([]fx.Option{ fx.NopLogger, ledgertesting.ProvideStorageDriver(), - fx.Invoke(func(lc fx.Lifecycle, storageDriver storage.Driver) { - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - defer func() { - close(done) - }() - name := uuid.New() - store, _, err := storageDriver.GetStore(context.Background(), name, true) - if err != nil { - return err - } - _, err = store.Initialize(context.Background()) - if err != nil { - return err - } - l, err := NewLedger(name, store, NewInMemoryLocker(), &noOpMonitor{}) - if err != nil { - panic(err) - } - lc.Append(fx.Hook{ - OnStop: l.Close, - }) - f(l) - return nil - }, - }) - }), - ) + }, options...) + opts = append(opts, fx.Invoke(func(lc fx.Lifecycle) { + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + close(done) + return nil + }, + }) + })) + app := fx.New(opts...) go func() { if err := app.Start(context.Background()); err != nil { panic(err) @@ -74,6 +56,33 @@ func with(f func(l *Ledger)) { } } +func runOnLedger(f func(l *Ledger)) { + withContainer(fx.Invoke(func(lc fx.Lifecycle, storageDriver storage.Driver) { + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + name := uuid.New() + store, _, err := storageDriver.GetStore(context.Background(), name, true) + if err != nil { + return err + } + _, err = store.Initialize(context.Background()) + if err != nil { + return err + } + l, err := NewLedger(name, store, NewInMemoryLocker(), &noOpMonitor{}) + if err != nil { + panic(err) + } + lc.Append(fx.Hook{ + OnStop: l.Close, + }) + f(l) + return nil + }, + }) + })) +} + func TestMain(m *testing.M) { var code int defer func() { @@ -89,7 +98,7 @@ func TestMain(m *testing.M) { } func TestTransaction(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { testsize := 1e4 total := 0 batch := []core.TransactionData{} @@ -140,7 +149,7 @@ func TestTransaction(t *testing.T) { } func TestTransactionBatchWithIntermediateWrongState(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { batch := []core.TransactionData{ { Postings: []core.Posting{ @@ -182,7 +191,7 @@ func TestTransactionBatchWithIntermediateWrongState(t *testing.T) { } func TestTransactionBatchWithConflictingReference(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { batch := []core.TransactionData{ { Postings: []core.Posting{ @@ -226,7 +235,7 @@ func TestTransactionBatchWithConflictingReference(t *testing.T) { } func TestTransactionExpectedVolumes(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { batch := []core.TransactionData{ { Postings: []core.Posting{ @@ -301,7 +310,7 @@ func TestTransactionExpectedVolumes(t *testing.T) { } func TestBalance(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { _, _, err := l.Commit(context.Background(), []core.TransactionData{ { Postings: []core.Posting{ @@ -320,7 +329,7 @@ func TestBalance(t *testing.T) { } func TestReference(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { tx := core.TransactionData{ Reference: "payment_processor_id_01", Postings: []core.Posting{ @@ -342,7 +351,7 @@ func TestReference(t *testing.T) { } func TestAccountMetadata(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { err := l.SaveMeta(context.Background(), core.MetaTargetTypeAccount, "users:001", core.Metadata{ "a random metadata": json.RawMessage(`"old value"`), @@ -398,7 +407,7 @@ func TestAccountMetadata(t *testing.T) { } func TestTransactionMetadata(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { _, _, err := l.Commit(context.Background(), []core.TransactionData{{ Postings: []core.Posting{ { @@ -438,7 +447,7 @@ func TestTransactionMetadata(t *testing.T) { } func TestSaveTransactionMetadata(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { _, _, err := l.Commit(context.Background(), []core.TransactionData{{ Postings: []core.Posting{ { @@ -469,7 +478,7 @@ func TestSaveTransactionMetadata(t *testing.T) { } func TestGetTransaction(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { _, _, err := l.Commit(context.Background(), []core.TransactionData{{ Reference: "bar", Postings: []core.Posting{ @@ -494,7 +503,7 @@ func TestGetTransaction(t *testing.T) { } func TestGetTransactions(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { tx := core.TransactionData{ Postings: []core.Posting{ { @@ -517,7 +526,7 @@ func TestGetTransactions(t *testing.T) { } func TestRevertTransaction(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { revertAmt := int64(100) _, txs, err := l.Commit(context.Background(), []core.TransactionData{{ @@ -574,7 +583,7 @@ func TestRevertTransaction(t *testing.T) { } func BenchmarkTransaction1(b *testing.B) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { for n := 0; n < b.N; n++ { txs := []core.TransactionData{} @@ -596,7 +605,7 @@ func BenchmarkTransaction1(b *testing.B) { } func BenchmarkTransaction_20_1k(b *testing.B) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { for n := 0; n < b.N; n++ { for i := 0; i < 20; i++ { txs := []core.TransactionData{} @@ -622,7 +631,7 @@ func BenchmarkTransaction_20_1k(b *testing.B) { } func BenchmarkGetAccount(b *testing.B) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { for i := 0; i < b.N; i++ { _, err := l.GetAccount(context.Background(), "users:013") require.NoError(b, err) @@ -631,7 +640,7 @@ func BenchmarkGetAccount(b *testing.B) { } func BenchmarkGetTransactions(b *testing.B) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { for i := 0; i < b.N; i++ { _, err := l.GetTransactions(context.Background()) require.NoError(b, err) diff --git a/pkg/ledger/monitor.go b/pkg/ledger/monitor.go index 29ae30f83..428929b14 100644 --- a/pkg/ledger/monitor.go +++ b/pkg/ledger/monitor.go @@ -7,7 +7,7 @@ import ( ) type Monitor interface { - CommittedTransactions(context.Context, string, []core.Transaction, core.AggregatedVolumes) + CommittedTransactions(context.Context, string, *CommitmentResult) SavedMetadata(ctx context.Context, ledger string, targetType string, id string, metadata core.Metadata) UpdatedMapping(context.Context, string, core.Mapping) RevertedTransaction(ctx context.Context, ledger string, reverted core.Transaction, revert core.Transaction) @@ -15,7 +15,7 @@ type Monitor interface { type noOpMonitor struct{} -func (n noOpMonitor) CommittedTransactions(ctx context.Context, s string, results []core.Transaction, volumes core.AggregatedVolumes) { +func (n noOpMonitor) CommittedTransactions(ctx context.Context, s string, result *CommitmentResult) { } func (n noOpMonitor) SavedMetadata(ctx context.Context, ledger string, targetType string, id string, metadata core.Metadata) { } diff --git a/pkg/ledger/stats_test.go b/pkg/ledger/stats_test.go index b30067a04..9ae5b158b 100644 --- a/pkg/ledger/stats_test.go +++ b/pkg/ledger/stats_test.go @@ -8,7 +8,7 @@ import ( ) func TestStats(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { _, err := l.Stats(context.Background()) assert.NoError(t, err) }) diff --git a/pkg/ledger/verification_test.go b/pkg/ledger/verification_test.go index 795bbc577..7d2ebf93e 100644 --- a/pkg/ledger/verification_test.go +++ b/pkg/ledger/verification_test.go @@ -5,7 +5,7 @@ import ( ) func TestVerify(t *testing.T) { - with(func(l *Ledger) { + runOnLedger(func(l *Ledger) { err := l.Verify() if err != nil { diff --git a/pkg/ledger/volume_agg.go b/pkg/ledger/volume_agg.go new file mode 100644 index 000000000..77c2a6f26 --- /dev/null +++ b/pkg/ledger/volume_agg.go @@ -0,0 +1,132 @@ +package ledger + +import ( + "context" + "github.com/numary/ledger/pkg/core" + "github.com/numary/ledger/pkg/storage" +) + +type TransactionVolumeAggregator struct { + agg *VolumeAggregator + postVolumes core.AggregatedVolumes + preVolumes core.AggregatedVolumes + previous *TransactionVolumeAggregator +} + +func (tva *TransactionVolumeAggregator) PostCommitVolumes() core.AggregatedVolumes { + return tva.postVolumes +} + +func (tva *TransactionVolumeAggregator) PreCommitVolumes() core.AggregatedVolumes { + return tva.preVolumes +} + +func (tva *TransactionVolumeAggregator) Transfer(ctx context.Context, from, to, asset string, amount uint64) error { + if tva.preVolumes == nil { + tva.preVolumes = core.AggregatedVolumes{} + } + if tva.postVolumes == nil { + tva.postVolumes = core.AggregatedVolumes{} + } + for _, addr := range []string{from, to} { + if _, ok := tva.preVolumes[addr][asset]; !ok { + current := tva.previous + found := false + if _, ok := tva.preVolumes[addr]; !ok { + tva.preVolumes[addr] = core.Volumes{} + } + for current != nil { + if v, ok := current.postVolumes[addr][asset]; ok { + tva.preVolumes[addr][asset] = v + found = true + break + } + current = current.previous + } + if !found { + v, err := tva.agg.store.GetAccountVolume(ctx, addr, asset) + if err != nil { + return err + } + tva.preVolumes[addr][asset] = v + } + } + if _, ok := tva.postVolumes[addr][asset]; !ok { + if _, ok := tva.postVolumes[addr]; !ok { + tva.postVolumes[addr] = core.Volumes{} + } + tva.postVolumes[addr][asset] = tva.preVolumes[addr][asset] + } + } + v := tva.postVolumes[from][asset] + v.Output += int64(amount) + tva.postVolumes[from][asset] = v + + v = tva.postVolumes[to][asset] + v.Input += int64(amount) + tva.postVolumes[to][asset] = v + + return nil +} + +type VolumeAggregator struct { + store storage.Store + txs []*TransactionVolumeAggregator +} + +func (agg *VolumeAggregator) NextTx() *TransactionVolumeAggregator { + var previous *TransactionVolumeAggregator + if len(agg.txs) > 0 { + previous = agg.txs[len(agg.txs)-1] + } + tva := &TransactionVolumeAggregator{ + agg: agg, + previous: previous, + } + agg.txs = append(agg.txs, tva) + return tva +} + +func (agg *VolumeAggregator) AggregatedPostCommitVolumes() core.AggregatedVolumes { + ret := core.AggregatedVolumes{} + for i := len(agg.txs) - 1; i >= 0; i-- { + tx := agg.txs[i] + postVolumes := tx.PostCommitVolumes() + for account, volumes := range postVolumes { + for asset, volume := range volumes { + if _, ok := ret[account]; !ok { + ret[account] = core.Volumes{} + } + if _, ok := ret[account][asset]; !ok { + ret[account][asset] = volume + } + } + } + } + return ret +} + +func (agg *VolumeAggregator) AggregatedPreCommitVolumes() core.AggregatedVolumes { + ret := core.AggregatedVolumes{} + for i := 0; i < len(agg.txs); i++ { + tx := agg.txs[i] + preVolumes := tx.PreCommitVolumes() + for account, volumes := range preVolumes { + for asset, volume := range volumes { + if _, ok := ret[account]; !ok { + ret[account] = core.Volumes{} + } + if _, ok := ret[account][asset]; !ok { + ret[account][asset] = volume + } + } + } + } + return ret +} + +func NewVolumeAggregator(store storage.Store) *VolumeAggregator { + return &VolumeAggregator{ + store: store, + } +} diff --git a/pkg/ledger/volume_agg_test.go b/pkg/ledger/volume_agg_test.go new file mode 100644 index 000000000..4a3ab8dc7 --- /dev/null +++ b/pkg/ledger/volume_agg_test.go @@ -0,0 +1,184 @@ +package ledger + +import ( + "context" + "github.com/numary/ledger/pkg/core" + "github.com/numary/ledger/pkg/storage" + "github.com/pborman/uuid" + "github.com/stretchr/testify/require" + "go.uber.org/fx" + "testing" +) + +func TestVolumeAggregator(t *testing.T) { + withContainer(fx.Invoke(func(lc fx.Lifecycle, storageDriver storage.Driver) { + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + name := uuid.New() + + store, _, err := storageDriver.GetStore(context.Background(), name, true) + if err != nil { + return err + } + + _, err = store.Initialize(context.Background()) + if err != nil { + return err + } + + firstTxLog := core.NewTransactionLog(nil, core.Transaction{ + ID: 0, + TransactionData: core.TransactionData{ + Postings: []core.Posting{ + { + Source: "bob", + Destination: "zozo", + Amount: 100, + Asset: "USD", + }, + }, + }, + }) + secondTxLog := core.NewTransactionLog(&firstTxLog, core.Transaction{ + ID: 1, + TransactionData: core.TransactionData{ + Postings: []core.Posting{ + { + Source: "zozo", + Destination: "alice", + Amount: 100, + Asset: "USD", + }, + }, + }, + }) + require.NoError(t, store.AppendLog(context.Background(), firstTxLog, secondTxLog)) + + volumeAggregator := NewVolumeAggregator(store) + firstTx := volumeAggregator.NextTx() + require.NoError(t, firstTx.Transfer(context.Background(), "bob", "alice", "USD", 100)) + require.NoError(t, firstTx.Transfer(context.Background(), "bob", "zoro", "USD", 50)) + + require.Equal(t, core.AggregatedVolumes{ + "bob": core.Volumes{ + "USD": { + Output: 250, + }, + }, + "alice": core.Volumes{ + "USD": { + Input: 200, + }, + }, + "zoro": { + "USD": { + Input: 50, + }, + }, + }, firstTx.PostCommitVolumes()) + require.Equal(t, core.AggregatedVolumes{ + "bob": core.Volumes{ + "USD": { + Output: 100, + }, + }, + "alice": core.Volumes{ + "USD": { + Input: 100, + }, + }, + "zoro": core.Volumes{ + "USD": { + Input: 0, + }, + }, + }, firstTx.PreCommitVolumes()) + + secondTx := volumeAggregator.NextTx() + require.NoError(t, secondTx.Transfer(context.Background(), "alice", "fred", "USD", 50)) + require.NoError(t, secondTx.Transfer(context.Background(), "bob", "fred", "USD", 25)) + require.Equal(t, core.AggregatedVolumes{ + "bob": core.Volumes{ + "USD": { + Output: 275, + }, + }, + "alice": core.Volumes{ + "USD": { + Input: 200, + Output: 50, + }, + }, + "fred": core.Volumes{ + "USD": { + Input: 75, + }, + }, + }, secondTx.PostCommitVolumes()) + require.Equal(t, core.AggregatedVolumes{ + "bob": core.Volumes{ + "USD": { + Output: 250, + }, + }, + "alice": core.Volumes{ + "USD": { + Input: 200, + }, + }, + "fred": core.Volumes{ + "USD": {}, + }, + }, secondTx.PreCommitVolumes()) + + aggregatedPostVolumes := volumeAggregator.AggregatedPostCommitVolumes() + require.Equal(t, core.AggregatedVolumes{ + "bob": core.Volumes{ + "USD": { + Output: 275, + }, + }, + "alice": core.Volumes{ + "USD": { + Input: 200, + Output: 50, + }, + }, + "fred": core.Volumes{ + "USD": { + Input: 75, + }, + }, + "zoro": core.Volumes{ + "USD": { + Input: 50, + Output: 0, + }, + }, + }, aggregatedPostVolumes) + + aggregatedPreVolumes := volumeAggregator.AggregatedPreCommitVolumes() + require.Equal(t, core.AggregatedVolumes{ + "bob": core.Volumes{ + "USD": { + Output: 100, + }, + }, + "alice": core.Volumes{ + "USD": { + Input: 100, + }, + }, + "fred": core.Volumes{ + "USD": {}, + }, + "zoro": core.Volumes{ + "USD": {}, + }, + }, aggregatedPreVolumes) + + return nil + }, + }) + })) +} diff --git a/pkg/storage/sqlstorage/aggregations.go b/pkg/storage/sqlstorage/aggregations.go index 3a3052126..5bfb44719 100644 --- a/pkg/storage/sqlstorage/aggregations.go +++ b/pkg/storage/sqlstorage/aggregations.go @@ -100,8 +100,11 @@ func (s *Store) getAccountVolume(ctx context.Context, exec executor, address, as input int64 output int64 ) - err := row.Scan(&asset, &input, &output) + err := row.Scan(&input, &output) if err != nil { + if err == sql.ErrNoRows { + return core.Volume{}, nil + } return core.Volume{}, s.error(err) } return core.Volume{ diff --git a/pkg/storage/sqlstorage/migrations/postgresql/8.sql b/pkg/storage/sqlstorage/migrations/postgresql/8.sql new file mode 100644 index 000000000..4a239391e --- /dev/null +++ b/pkg/storage/sqlstorage/migrations/postgresql/8.sql @@ -0,0 +1,43 @@ +--statement +ALTER TABLE "VAR_LEDGER_NAME".transactions +ADD COLUMN "pre_commit_volumes" jsonb; +--statement +ALTER TABLE "VAR_LEDGER_NAME".transactions +ADD COLUMN "post_commit_volumes" jsonb; +--statement +CREATE OR REPLACE FUNCTION "VAR_LEDGER_NAME".handle_log_entry() + RETURNS TRIGGER + LANGUAGE PLPGSQL +AS +$$ +BEGIN + if NEW.type = 'NEW_TRANSACTION' THEN + INSERT INTO "VAR_LEDGER_NAME".transactions(id, timestamp, reference, postings, metadata, pre_commit_volumes, post_commit_volumes) + VALUES ( + (NEW.data ->> 'txid')::bigint, + (NEW.data ->> 'timestamp')::varchar, + CASE + WHEN (NEW.data ->> 'reference')::varchar = '' THEN NULL + ELSE (NEW.data ->> 'reference')::varchar END, + (NEW.data ->> 'postings')::jsonb, + CASE WHEN (NEW.data ->> 'metadata')::jsonb IS NULL THEN '{}' ELSE (NEW.data ->> 'metadata')::jsonb END, + (NEW.data ->> 'preCommitVolumes')::jsonb, + (NEW.data ->> 'postCommitVolumes')::jsonb + ); + END IF; + if NEW.type = 'SET_METADATA' THEN + if NEW.data ->> 'targetType' = 'TRANSACTION' THEN + UPDATE "VAR_LEDGER_NAME".transactions + SET metadata = metadata || (NEW.data ->> 'metadata')::jsonb + WHERE id = (NEW.data ->> 'targetId')::bigint; + END IF; + if NEW.data ->> 'targetType' = 'ACCOUNT' THEN + INSERT INTO "VAR_LEDGER_NAME".accounts (address, metadata) + VALUES ((NEW.data ->> 'targetId')::varchar, + (NEW.data ->> 'metadata')::jsonb) + ON CONFLICT (address) DO UPDATE SET metadata = accounts.metadata || (NEW.data ->> 'metadata')::jsonb; + END IF; + END IF; + RETURN NEW; +END; +$$; diff --git a/pkg/storage/sqlstorage/migrations/sqlite/2.sql b/pkg/storage/sqlstorage/migrations/sqlite/2.sql new file mode 100644 index 000000000..e8e05badd --- /dev/null +++ b/pkg/storage/sqlstorage/migrations/sqlite/2.sql @@ -0,0 +1,29 @@ +--statement +ALTER TABLE transactions +ADD COLUMN pre_commit_volumes varchar; +--statement +ALTER TABLE transactions +ADD COLUMN post_commit_volumes varchar; +--statement +DROP TRIGGER new_log_transaction; +--statement +CREATE TRIGGER new_log_transaction +AFTER INSERT +ON log +WHEN new.type = 'NEW_TRANSACTION' +BEGIN +INSERT INTO transactions (id, reference, timestamp, postings, metadata, pre_commit_volumes, post_commit_volumes) +VALUES ( + json_extract(new.data, '$.txid'), + CASE + WHEN json_extract(new.data, '$.reference') = '' THEN NULL + ELSE json_extract(new.data, '$.reference') END, + json_extract(new.data, '$.timestamp'), + json_extract(new.data, '$.postings'), + CASE + WHEN json_extract(new.data, '$.metadata') IS NULL THEN '{}' + ELSE json_extract(new.data, '$.metadata') END, + json_extract(new.data, '$.preCommitVolumes'), + json_extract(new.data, '$.postCommitVolumes') +); +END; diff --git a/pkg/storage/sqlstorage/migrations_test.go b/pkg/storage/sqlstorage/migrations_test.go index bf3e74628..43ac894ad 100644 --- a/pkg/storage/sqlstorage/migrations_test.go +++ b/pkg/storage/sqlstorage/migrations_test.go @@ -187,7 +187,8 @@ var postMigrate = map[string]func(t *testing.T, store *sqlstorage.Store){ return } }, - "1.sql": func(t *testing.T, store *sqlstorage.Store) { + "1.sql": func(t *testing.T, store *sqlstorage.Store) {}, + "2.sql": func(t *testing.T, store *sqlstorage.Store) { count, err := store.CountTransactions(context.Background(), query.Transactions{}) if !assert.NoError(t, err) { @@ -449,6 +450,10 @@ var postMigrate = map[string]func(t *testing.T, store *sqlstorage.Store){ func TestMigrates(t *testing.T) { + if ledgertesting.StorageDriverName() != "sqlite" { + return // Migration file does not match between both drivers + } + if testing.Verbose() { l := logrus.New() l.SetLevel(logrus.DebugLevel) diff --git a/pkg/storage/sqlstorage/transactions.go b/pkg/storage/sqlstorage/transactions.go index 8d9f31ab3..7e16f3dd2 100644 --- a/pkg/storage/sqlstorage/transactions.go +++ b/pkg/storage/sqlstorage/transactions.go @@ -14,7 +14,7 @@ import ( func (s *Store) buildTransactionsQuery(p map[string]interface{}) *sqlbuilder.SelectBuilder { sb := sqlbuilder.NewSelectBuilder() - sb.Select("id", "timestamp", "reference", "metadata", "postings") + sb.Select("id", "timestamp", "reference", "metadata", "postings", "pre_commit_volumes", "post_commit_volumes") sb.From(s.schema.Table("transactions")) if account, ok := p["account"]; ok && account.(string) != "" { arg := sb.Args.Add(account.(string)) @@ -81,6 +81,8 @@ func (s *Store) getTransactions(ctx context.Context, exec executor, q query.Tran &ref, &tx.Metadata, &tx.Postings, + &tx.PreCommitVolumes, + &tx.PostCommitVolumes, ); err != nil { return sharedapi.Cursor[core.Transaction]{}, err } @@ -115,7 +117,7 @@ func (s *Store) GetTransactions(ctx context.Context, q query.Transactions) (shar func (s *Store) getTransaction(ctx context.Context, exec executor, txid uint64) (core.Transaction, error) { sb := sqlbuilder.NewSelectBuilder() - sb.Select("id", "timestamp", "reference", "metadata", "postings") + sb.Select("id", "timestamp", "reference", "metadata", "postings", "pre_commit_volumes", "post_commit_volumes") sb.From(s.schema.Table("transactions")) sb.Where(sb.Equal("id", txid)) sb.OrderBy("id desc") @@ -138,6 +140,8 @@ func (s *Store) getTransaction(ctx context.Context, exec executor, txid uint64) &ref, &tx.Metadata, &tx.Postings, + &tx.PreCommitVolumes, + &tx.PostCommitVolumes, ) if err != nil { if err == sql.ErrNoRows { @@ -165,7 +169,7 @@ func (s *Store) GetTransaction(ctx context.Context, txId uint64) (tx core.Transa func (s *Store) getLastTransaction(ctx context.Context, exec executor) (*core.Transaction, error) { sb := sqlbuilder.NewSelectBuilder() - sb.Select("id", "timestamp", "reference", "metadata", "postings") + sb.Select("id", "timestamp", "reference", "metadata", "postings", "pre_commit_volumes", "post_commit_volumes") sb.From(s.schema.Table("transactions")) sb.OrderBy("id desc") sb.Limit(1) @@ -188,6 +192,8 @@ func (s *Store) getLastTransaction(ctx context.Context, exec executor) (*core.Tr &ref, &tx.Metadata, &tx.Postings, + &tx.PreCommitVolumes, + &tx.PostCommitVolumes, ) if err != nil { if err == sql.ErrNoRows { From 274619218e5b38ab113142b1870166e1d0190a8c Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Thu, 9 Jun 2022 11:30:29 +0200 Subject: [PATCH 06/14] Lint. --- pkg/bus/monitor_test.go | 2 +- pkg/ledger/volume_agg.go | 1 + pkg/ledger/volume_agg_test.go | 3 ++- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/bus/monitor_test.go b/pkg/bus/monitor_test.go index 7b9e90740..7a7b28ae2 100644 --- a/pkg/bus/monitor_test.go +++ b/pkg/bus/monitor_test.go @@ -2,13 +2,13 @@ package bus import ( "context" - "github.com/numary/ledger/pkg/ledger" "testing" "time" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" "github.com/numary/go-libs/sharedpublish" + "github.com/numary/ledger/pkg/ledger" "github.com/pborman/uuid" "github.com/stretchr/testify/assert" ) diff --git a/pkg/ledger/volume_agg.go b/pkg/ledger/volume_agg.go index 77c2a6f26..53a04667b 100644 --- a/pkg/ledger/volume_agg.go +++ b/pkg/ledger/volume_agg.go @@ -2,6 +2,7 @@ package ledger import ( "context" + "github.com/numary/ledger/pkg/core" "github.com/numary/ledger/pkg/storage" ) diff --git a/pkg/ledger/volume_agg_test.go b/pkg/ledger/volume_agg_test.go index 4a3ab8dc7..eaa014ed2 100644 --- a/pkg/ledger/volume_agg_test.go +++ b/pkg/ledger/volume_agg_test.go @@ -2,12 +2,13 @@ package ledger import ( "context" + "testing" + "github.com/numary/ledger/pkg/core" "github.com/numary/ledger/pkg/storage" "github.com/pborman/uuid" "github.com/stretchr/testify/require" "go.uber.org/fx" - "testing" ) func TestVolumeAggregator(t *testing.T) { From 50987cf087c52256cea90ead9dd4b4d1ccb11998 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Thu, 9 Jun 2022 12:25:37 +0200 Subject: [PATCH 07/14] Compute balances when serializing core.Volume. --- pkg/core/volumes.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pkg/core/volumes.go b/pkg/core/volumes.go index 38fc7f74f..cd4f2d22b 100644 --- a/pkg/core/volumes.go +++ b/pkg/core/volumes.go @@ -10,6 +10,17 @@ type Volume struct { Output int64 `json:"output"` } +func (v Volume) MarshalJSON() ([]byte, error) { + type volume Volume + return json.Marshal(struct { + volume + Balance int64 `json:"balance"` + }{ + volume: volume(v), + Balance: v.Input - v.Output, + }) +} + func (v Volume) Balance() int64 { return v.Input - v.Output } From cc3020f102eb3adeb7e986e1296f94d41c87142f Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Fri, 10 Jun 2022 15:00:01 +0200 Subject: [PATCH 08/14] Add missing property on swagger spec. --- pkg/api/controllers/swagger.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/api/controllers/swagger.yaml b/pkg/api/controllers/swagger.yaml index 4677ebc54..bc22921f9 100644 --- a/pkg/api/controllers/swagger.yaml +++ b/pkg/api/controllers/swagger.yaml @@ -875,6 +875,8 @@ components: type: number output: type: number + balance: + type: number required: - input - output From 6a478269c1cb0001f6288950a33b90c6e3544277 Mon Sep 17 00:00:00 2001 From: Ragot Geoffrey Date: Mon, 13 Jun 2022 19:07:47 +0200 Subject: [PATCH 09/14] Update pkg/storage/sqlstorage/aggregations.go Co-authored-by: Antoine Gelloz --- pkg/storage/sqlstorage/aggregations.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/storage/sqlstorage/aggregations.go b/pkg/storage/sqlstorage/aggregations.go index 5bfb44719..6121c369e 100644 --- a/pkg/storage/sqlstorage/aggregations.go +++ b/pkg/storage/sqlstorage/aggregations.go @@ -100,8 +100,7 @@ func (s *Store) getAccountVolume(ctx context.Context, exec executor, address, as input int64 output int64 ) - err := row.Scan(&input, &output) - if err != nil { + if err := row.Scan(&input, &output); err != nil { if err == sql.ErrNoRows { return core.Volume{}, nil } From 6a44a0ff9bd499a403f012f7bb2d64c2be3682a7 Mon Sep 17 00:00:00 2001 From: Ragot Geoffrey Date: Mon, 13 Jun 2022 19:09:16 +0200 Subject: [PATCH 10/14] Update pkg/storage/sqlstorage/aggregations.go Co-authored-by: Antoine Gelloz --- pkg/storage/sqlstorage/aggregations.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/storage/sqlstorage/aggregations.go b/pkg/storage/sqlstorage/aggregations.go index 6121c369e..ec483abcc 100644 --- a/pkg/storage/sqlstorage/aggregations.go +++ b/pkg/storage/sqlstorage/aggregations.go @@ -96,10 +96,7 @@ func (s *Store) getAccountVolume(ctx context.Context, exec executor, address, as return core.Volume{}, s.error(row.Err()) } - var ( - input int64 - output int64 - ) + var input, output int64 if err := row.Scan(&input, &output); err != nil { if err == sql.ErrNoRows { return core.Volume{}, nil From d3663d15f506dea77ff1316dd93bb150bbb733ad Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Mon, 13 Jun 2022 19:16:53 +0200 Subject: [PATCH 11/14] Some renaming. --- pkg/bus/monitor.go | 2 +- pkg/bus/monitor_test.go | 2 +- pkg/ledger/ledger.go | 14 ++++++------ pkg/ledger/monitor.go | 4 ++-- pkg/ledger/volume_agg.go | 40 +++++++++++++++++------------------ pkg/ledger/volume_agg_test.go | 2 +- 6 files changed, 32 insertions(+), 32 deletions(-) diff --git a/pkg/bus/monitor.go b/pkg/bus/monitor.go index 0cddba491..afd4fa056 100644 --- a/pkg/bus/monitor.go +++ b/pkg/bus/monitor.go @@ -35,7 +35,7 @@ func (l *ledgerMonitor) publish(ctx context.Context, ledger string, et string, d } } -func (l *ledgerMonitor) CommittedTransactions(ctx context.Context, ledger string, result *ledger.CommitmentResult) { +func (l *ledgerMonitor) CommittedTransactions(ctx context.Context, ledger string, result *ledger.CommitResult) { l.publish(ctx, ledger, CommittedTransactions, committedTransactions{ Transactions: result.GeneratedTransactions, Volumes: result.PostCommitVolumes, diff --git a/pkg/bus/monitor_test.go b/pkg/bus/monitor_test.go index 7a7b28ae2..54b466063 100644 --- a/pkg/bus/monitor_test.go +++ b/pkg/bus/monitor_test.go @@ -29,7 +29,7 @@ func TestMonitor(t *testing.T) { "*": "testing", }) m := NewLedgerMonitor(p) - go m.CommittedTransactions(context.Background(), uuid.New(), &ledger.CommitmentResult{}) + go m.CommittedTransactions(context.Background(), uuid.New(), &ledger.CommitResult{}) select { case m := <-messages: diff --git a/pkg/ledger/ledger.go b/pkg/ledger/ledger.go index 2772d7c86..b0eec8279 100644 --- a/pkg/ledger/ledger.go +++ b/pkg/ledger/ledger.go @@ -51,14 +51,14 @@ func (l *Ledger) Close(ctx context.Context) error { return nil } -type CommitmentResult struct { +type CommitResult struct { PreCommitVolumes core.AggregatedVolumes PostCommitVolumes core.AggregatedVolumes GeneratedTransactions []core.Transaction GeneratedLogs []core.Log } -func (l *Ledger) processTx(ctx context.Context, ts []core.TransactionData) (*CommitmentResult, error) { +func (l *Ledger) processTx(ctx context.Context, ts []core.TransactionData) (*CommitResult, error) { mapping, err := l.store.LoadMapping(ctx) if err != nil { return nil, errors.Wrap(err, "loading mapping") @@ -78,11 +78,11 @@ func (l *Ledger) processTx(ctx context.Context, ts []core.TransactionData) (*Com nextTxId = lastTx.ID + 1 } - volumeAggregator := NewVolumeAggregator(l.store) + volumeAggregator := newVolumeAggregator(l.store) generatedTxs := make([]core.Transaction, 0) accounts := make(map[string]core.Account, 0) - logs := make([]core.Log, 0) + generatedLogs := make([]core.Log, 0) contracts := make([]core.Contract, 0) if mapping != nil { contracts = append(contracts, mapping.Contracts...) @@ -158,15 +158,15 @@ func (l *Ledger) processTx(ctx context.Context, ts []core.TransactionData) (*Com generatedTxs = append(generatedTxs, tx) newLog := core.NewTransactionLog(lastLog, tx) lastLog = &newLog - logs = append(logs, newLog) + generatedLogs = append(generatedLogs, newLog) nextTxId++ } - return &CommitmentResult{ + return &CommitResult{ PreCommitVolumes: volumeAggregator.AggregatedPreCommitVolumes(), PostCommitVolumes: volumeAggregator.AggregatedPostCommitVolumes(), GeneratedTransactions: generatedTxs, - GeneratedLogs: logs, + GeneratedLogs: generatedLogs, }, nil } diff --git a/pkg/ledger/monitor.go b/pkg/ledger/monitor.go index 428929b14..1367b5175 100644 --- a/pkg/ledger/monitor.go +++ b/pkg/ledger/monitor.go @@ -7,7 +7,7 @@ import ( ) type Monitor interface { - CommittedTransactions(context.Context, string, *CommitmentResult) + CommittedTransactions(context.Context, string, *CommitResult) SavedMetadata(ctx context.Context, ledger string, targetType string, id string, metadata core.Metadata) UpdatedMapping(context.Context, string, core.Mapping) RevertedTransaction(ctx context.Context, ledger string, reverted core.Transaction, revert core.Transaction) @@ -15,7 +15,7 @@ type Monitor interface { type noOpMonitor struct{} -func (n noOpMonitor) CommittedTransactions(ctx context.Context, s string, result *CommitmentResult) { +func (n noOpMonitor) CommittedTransactions(ctx context.Context, s string, result *CommitResult) { } func (n noOpMonitor) SavedMetadata(ctx context.Context, ledger string, targetType string, id string, metadata core.Metadata) { } diff --git a/pkg/ledger/volume_agg.go b/pkg/ledger/volume_agg.go index 53a04667b..b5b49053b 100644 --- a/pkg/ledger/volume_agg.go +++ b/pkg/ledger/volume_agg.go @@ -7,22 +7,22 @@ import ( "github.com/numary/ledger/pkg/storage" ) -type TransactionVolumeAggregator struct { - agg *VolumeAggregator +type transactionVolumeAggregator struct { + agg *volumeAggregator postVolumes core.AggregatedVolumes preVolumes core.AggregatedVolumes - previous *TransactionVolumeAggregator + previousTx *transactionVolumeAggregator } -func (tva *TransactionVolumeAggregator) PostCommitVolumes() core.AggregatedVolumes { +func (tva *transactionVolumeAggregator) PostCommitVolumes() core.AggregatedVolumes { return tva.postVolumes } -func (tva *TransactionVolumeAggregator) PreCommitVolumes() core.AggregatedVolumes { +func (tva *transactionVolumeAggregator) PreCommitVolumes() core.AggregatedVolumes { return tva.preVolumes } -func (tva *TransactionVolumeAggregator) Transfer(ctx context.Context, from, to, asset string, amount uint64) error { +func (tva *transactionVolumeAggregator) Transfer(ctx context.Context, from, to, asset string, amount uint64) error { if tva.preVolumes == nil { tva.preVolumes = core.AggregatedVolumes{} } @@ -31,7 +31,7 @@ func (tva *TransactionVolumeAggregator) Transfer(ctx context.Context, from, to, } for _, addr := range []string{from, to} { if _, ok := tva.preVolumes[addr][asset]; !ok { - current := tva.previous + current := tva.previousTx found := false if _, ok := tva.preVolumes[addr]; !ok { tva.preVolumes[addr] = core.Volumes{} @@ -42,7 +42,7 @@ func (tva *TransactionVolumeAggregator) Transfer(ctx context.Context, from, to, found = true break } - current = current.previous + current = current.previousTx } if !found { v, err := tva.agg.store.GetAccountVolume(ctx, addr, asset) @@ -70,25 +70,25 @@ func (tva *TransactionVolumeAggregator) Transfer(ctx context.Context, from, to, return nil } -type VolumeAggregator struct { +type volumeAggregator struct { store storage.Store - txs []*TransactionVolumeAggregator + txs []*transactionVolumeAggregator } -func (agg *VolumeAggregator) NextTx() *TransactionVolumeAggregator { - var previous *TransactionVolumeAggregator +func (agg *volumeAggregator) NextTx() *transactionVolumeAggregator { + var previousTx *transactionVolumeAggregator if len(agg.txs) > 0 { - previous = agg.txs[len(agg.txs)-1] + previousTx = agg.txs[len(agg.txs)-1] } - tva := &TransactionVolumeAggregator{ - agg: agg, - previous: previous, + tva := &transactionVolumeAggregator{ + agg: agg, + previousTx: previousTx, } agg.txs = append(agg.txs, tva) return tva } -func (agg *VolumeAggregator) AggregatedPostCommitVolumes() core.AggregatedVolumes { +func (agg *volumeAggregator) AggregatedPostCommitVolumes() core.AggregatedVolumes { ret := core.AggregatedVolumes{} for i := len(agg.txs) - 1; i >= 0; i-- { tx := agg.txs[i] @@ -107,7 +107,7 @@ func (agg *VolumeAggregator) AggregatedPostCommitVolumes() core.AggregatedVolume return ret } -func (agg *VolumeAggregator) AggregatedPreCommitVolumes() core.AggregatedVolumes { +func (agg *volumeAggregator) AggregatedPreCommitVolumes() core.AggregatedVolumes { ret := core.AggregatedVolumes{} for i := 0; i < len(agg.txs); i++ { tx := agg.txs[i] @@ -126,8 +126,8 @@ func (agg *VolumeAggregator) AggregatedPreCommitVolumes() core.AggregatedVolumes return ret } -func NewVolumeAggregator(store storage.Store) *VolumeAggregator { - return &VolumeAggregator{ +func newVolumeAggregator(store storage.Store) *volumeAggregator { + return &volumeAggregator{ store: store, } } diff --git a/pkg/ledger/volume_agg_test.go b/pkg/ledger/volume_agg_test.go index eaa014ed2..5bd956f52 100644 --- a/pkg/ledger/volume_agg_test.go +++ b/pkg/ledger/volume_agg_test.go @@ -55,7 +55,7 @@ func TestVolumeAggregator(t *testing.T) { }) require.NoError(t, store.AppendLog(context.Background(), firstTxLog, secondTxLog)) - volumeAggregator := NewVolumeAggregator(store) + volumeAggregator := newVolumeAggregator(store) firstTx := volumeAggregator.NextTx() require.NoError(t, firstTx.Transfer(context.Background(), "bob", "alice", "USD", 100)) require.NoError(t, firstTx.Transfer(context.Background(), "bob", "zoro", "USD", 50)) From 9fab781f7fa2f7be75722bf1acbc714f3702104b Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Mon, 13 Jun 2022 19:20:08 +0200 Subject: [PATCH 12/14] Add examples to the openapi spec. --- pkg/api/controllers/swagger.yaml | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/pkg/api/controllers/swagger.yaml b/pkg/api/controllers/swagger.yaml index bc22921f9..b2273700c 100644 --- a/pkg/api/controllers/swagger.yaml +++ b/pkg/api/controllers/swagger.yaml @@ -880,14 +880,38 @@ components: required: - input - output + example: + input: 100 + output: 20 + balance: 80 Volumes: type: object additionalProperties: $ref: '#/components/schemas/Volume' + example: + USD: + input: 100 + output: 10 + balance: 90 + EUR: + input: 100 + output: 10 + balance: 90 AggregatedVolumes: type: object additionalProperties: $ref: '#/components/schemas/Volumes' + example: + "orders:1": + "USD": + input: 100 + output: 10 + balance: 90 + "orders:2": + "USD": + input: 100 + output: 10 + balance: 90 ErrorCode: type: string enum: From 58b25832395848c0bcf44a1c389676ccf8ff9b3d Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Mon, 13 Jun 2022 19:22:56 +0200 Subject: [PATCH 13/14] Unexpose some functions. --- pkg/ledger/ledger.go | 14 +++++++------- pkg/ledger/volume_agg.go | 16 ++++++++-------- pkg/ledger/volume_agg_test.go | 24 ++++++++++++------------ 3 files changed, 27 insertions(+), 27 deletions(-) diff --git a/pkg/ledger/ledger.go b/pkg/ledger/ledger.go index b0eec8279..04deb92eb 100644 --- a/pkg/ledger/ledger.go +++ b/pkg/ledger/ledger.go @@ -94,7 +94,7 @@ func (l *Ledger) processTx(ctx context.Context, ts []core.TransactionData) (*Com return nil, NewTransactionCommitError(i, NewValidationError("transaction has no postings")) } - txVolumeAggregator := volumeAggregator.NextTx() + txVolumeAggregator := volumeAggregator.nextTx() for _, p := range t.Postings { if p.Amount < 0 { @@ -109,13 +109,13 @@ func (l *Ledger) processTx(ctx context.Context, ts []core.TransactionData) (*Com if !core.AssetIsValid(p.Asset) { return nil, NewTransactionCommitError(i, NewValidationError("invalid asset")) } - err := txVolumeAggregator.Transfer(ctx, p.Source, p.Destination, p.Asset, uint64(p.Amount)) + err := txVolumeAggregator.transfer(ctx, p.Source, p.Destination, p.Asset, uint64(p.Amount)) if err != nil { return nil, NewTransactionCommitError(i, err) } } - for addr, volumes := range txVolumeAggregator.PostCommitVolumes() { + for addr, volumes := range txVolumeAggregator.postCommitVolumes() { for asset, volume := range volumes { if addr == "world" { continue @@ -152,8 +152,8 @@ func (l *Ledger) processTx(ctx context.Context, ts []core.TransactionData) (*Com TransactionData: t, ID: nextTxId, Timestamp: time.Now().UTC().Format(time.RFC3339), - PostCommitVolumes: txVolumeAggregator.PostCommitVolumes(), - PreCommitVolumes: txVolumeAggregator.PreCommitVolumes(), + PostCommitVolumes: txVolumeAggregator.postCommitVolumes(), + PreCommitVolumes: txVolumeAggregator.preCommitVolumes(), } generatedTxs = append(generatedTxs, tx) newLog := core.NewTransactionLog(lastLog, tx) @@ -163,8 +163,8 @@ func (l *Ledger) processTx(ctx context.Context, ts []core.TransactionData) (*Com } return &CommitResult{ - PreCommitVolumes: volumeAggregator.AggregatedPreCommitVolumes(), - PostCommitVolumes: volumeAggregator.AggregatedPostCommitVolumes(), + PreCommitVolumes: volumeAggregator.aggregatedPreCommitVolumes(), + PostCommitVolumes: volumeAggregator.aggregatedPostCommitVolumes(), GeneratedTransactions: generatedTxs, GeneratedLogs: generatedLogs, }, nil diff --git a/pkg/ledger/volume_agg.go b/pkg/ledger/volume_agg.go index b5b49053b..e1ef191bc 100644 --- a/pkg/ledger/volume_agg.go +++ b/pkg/ledger/volume_agg.go @@ -14,15 +14,15 @@ type transactionVolumeAggregator struct { previousTx *transactionVolumeAggregator } -func (tva *transactionVolumeAggregator) PostCommitVolumes() core.AggregatedVolumes { +func (tva *transactionVolumeAggregator) postCommitVolumes() core.AggregatedVolumes { return tva.postVolumes } -func (tva *transactionVolumeAggregator) PreCommitVolumes() core.AggregatedVolumes { +func (tva *transactionVolumeAggregator) preCommitVolumes() core.AggregatedVolumes { return tva.preVolumes } -func (tva *transactionVolumeAggregator) Transfer(ctx context.Context, from, to, asset string, amount uint64) error { +func (tva *transactionVolumeAggregator) transfer(ctx context.Context, from, to, asset string, amount uint64) error { if tva.preVolumes == nil { tva.preVolumes = core.AggregatedVolumes{} } @@ -75,7 +75,7 @@ type volumeAggregator struct { txs []*transactionVolumeAggregator } -func (agg *volumeAggregator) NextTx() *transactionVolumeAggregator { +func (agg *volumeAggregator) nextTx() *transactionVolumeAggregator { var previousTx *transactionVolumeAggregator if len(agg.txs) > 0 { previousTx = agg.txs[len(agg.txs)-1] @@ -88,11 +88,11 @@ func (agg *volumeAggregator) NextTx() *transactionVolumeAggregator { return tva } -func (agg *volumeAggregator) AggregatedPostCommitVolumes() core.AggregatedVolumes { +func (agg *volumeAggregator) aggregatedPostCommitVolumes() core.AggregatedVolumes { ret := core.AggregatedVolumes{} for i := len(agg.txs) - 1; i >= 0; i-- { tx := agg.txs[i] - postVolumes := tx.PostCommitVolumes() + postVolumes := tx.postCommitVolumes() for account, volumes := range postVolumes { for asset, volume := range volumes { if _, ok := ret[account]; !ok { @@ -107,11 +107,11 @@ func (agg *volumeAggregator) AggregatedPostCommitVolumes() core.AggregatedVolume return ret } -func (agg *volumeAggregator) AggregatedPreCommitVolumes() core.AggregatedVolumes { +func (agg *volumeAggregator) aggregatedPreCommitVolumes() core.AggregatedVolumes { ret := core.AggregatedVolumes{} for i := 0; i < len(agg.txs); i++ { tx := agg.txs[i] - preVolumes := tx.PreCommitVolumes() + preVolumes := tx.preCommitVolumes() for account, volumes := range preVolumes { for asset, volume := range volumes { if _, ok := ret[account]; !ok { diff --git a/pkg/ledger/volume_agg_test.go b/pkg/ledger/volume_agg_test.go index 5bd956f52..377124954 100644 --- a/pkg/ledger/volume_agg_test.go +++ b/pkg/ledger/volume_agg_test.go @@ -56,9 +56,9 @@ func TestVolumeAggregator(t *testing.T) { require.NoError(t, store.AppendLog(context.Background(), firstTxLog, secondTxLog)) volumeAggregator := newVolumeAggregator(store) - firstTx := volumeAggregator.NextTx() - require.NoError(t, firstTx.Transfer(context.Background(), "bob", "alice", "USD", 100)) - require.NoError(t, firstTx.Transfer(context.Background(), "bob", "zoro", "USD", 50)) + firstTx := volumeAggregator.nextTx() + require.NoError(t, firstTx.transfer(context.Background(), "bob", "alice", "USD", 100)) + require.NoError(t, firstTx.transfer(context.Background(), "bob", "zoro", "USD", 50)) require.Equal(t, core.AggregatedVolumes{ "bob": core.Volumes{ @@ -76,7 +76,7 @@ func TestVolumeAggregator(t *testing.T) { Input: 50, }, }, - }, firstTx.PostCommitVolumes()) + }, firstTx.postCommitVolumes()) require.Equal(t, core.AggregatedVolumes{ "bob": core.Volumes{ "USD": { @@ -93,11 +93,11 @@ func TestVolumeAggregator(t *testing.T) { Input: 0, }, }, - }, firstTx.PreCommitVolumes()) + }, firstTx.preCommitVolumes()) - secondTx := volumeAggregator.NextTx() - require.NoError(t, secondTx.Transfer(context.Background(), "alice", "fred", "USD", 50)) - require.NoError(t, secondTx.Transfer(context.Background(), "bob", "fred", "USD", 25)) + secondTx := volumeAggregator.nextTx() + require.NoError(t, secondTx.transfer(context.Background(), "alice", "fred", "USD", 50)) + require.NoError(t, secondTx.transfer(context.Background(), "bob", "fred", "USD", 25)) require.Equal(t, core.AggregatedVolumes{ "bob": core.Volumes{ "USD": { @@ -115,7 +115,7 @@ func TestVolumeAggregator(t *testing.T) { Input: 75, }, }, - }, secondTx.PostCommitVolumes()) + }, secondTx.postCommitVolumes()) require.Equal(t, core.AggregatedVolumes{ "bob": core.Volumes{ "USD": { @@ -130,9 +130,9 @@ func TestVolumeAggregator(t *testing.T) { "fred": core.Volumes{ "USD": {}, }, - }, secondTx.PreCommitVolumes()) + }, secondTx.preCommitVolumes()) - aggregatedPostVolumes := volumeAggregator.AggregatedPostCommitVolumes() + aggregatedPostVolumes := volumeAggregator.aggregatedPostCommitVolumes() require.Equal(t, core.AggregatedVolumes{ "bob": core.Volumes{ "USD": { @@ -158,7 +158,7 @@ func TestVolumeAggregator(t *testing.T) { }, }, aggregatedPostVolumes) - aggregatedPreVolumes := volumeAggregator.AggregatedPreCommitVolumes() + aggregatedPreVolumes := volumeAggregator.aggregatedPreCommitVolumes() require.Equal(t, core.AggregatedVolumes{ "bob": core.Volumes{ "USD": { From b11fa827806003a177a637e339642fc961a1c540 Mon Sep 17 00:00:00 2001 From: antoinegelloz Date: Tue, 14 Jun 2022 09:49:03 +0200 Subject: [PATCH 14/14] lint --- pkg/storage/sqlstorage/aggregations.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/sqlstorage/aggregations.go b/pkg/storage/sqlstorage/aggregations.go index ec483abcc..de94384f3 100644 --- a/pkg/storage/sqlstorage/aggregations.go +++ b/pkg/storage/sqlstorage/aggregations.go @@ -97,7 +97,7 @@ func (s *Store) getAccountVolume(ctx context.Context, exec executor, address, as } var input, output int64 - if err := row.Scan(&input, &output); err != nil { + if err := row.Scan(&input, &output); err != nil { if err == sql.ErrNoRows { return core.Volume{}, nil }