Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pre transactions balance to the log #223

Merged
merged 14 commits into from
Jun 14, 2022
5 changes: 2 additions & 3 deletions pkg/api/controllers/account_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,9 @@ func TestGetAccount(t *testing.T) {
Balances: map[string]int64{
"USD": 100,
},
Volumes: map[string]map[string]int64{
Volumes: core.Volumes{
"USD": {
"input": 100,
"output": 0,
Input: 100,
},
},
Metadata: core.Metadata{
Expand Down
48 changes: 48 additions & 0 deletions pkg/api/controllers/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,10 @@ components:
format: date-time
txid:
type: integer
preCommitVolumes:
$ref: '#/components/schemas/AggregatedVolumes'
postCommitVolumes:
$ref: '#/components/schemas/AggregatedVolumes'
required:
- postings
- timestamp
Expand Down Expand Up @@ -864,6 +868,50 @@ components:
type: object
required:
- data
Volume:
type: object
properties:
input:
type: number
output:
type: number
balance:
type: number
required:
- input
- output
example:
input: 100
output: 20
balance: 80
Volumes:
type: object
additionalProperties:
$ref: '#/components/schemas/Volume'
example:
USD:
input: 100
output: 10
balance: 90
EUR:
input: 100
output: 10
balance: 90
AggregatedVolumes:
gfyrag marked this conversation as resolved.
Show resolved Hide resolved
type: object
additionalProperties:
$ref: '#/components/schemas/Volumes'
example:
"orders:1":
"USD":
input: 100
output: 10
balance: 90
"orders:2":
"USD":
input: 100
output: 10
balance: 90
ErrorCode:
type: string
enum:
Expand Down
49 changes: 45 additions & 4 deletions pkg/api/controllers/transaction_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,25 +191,66 @@ func TestGetTransaction(t *testing.T) {
})
require.Equal(t, http.StatusOK, rsp.Result().StatusCode)

tx, _ := internal.DecodeSingleResponse[[]core.Transaction](t, rsp.Body)
txs, _ := internal.DecodeSingleResponse[[]core.Transaction](t, rsp.Body)
tx := txs[0]
assert.EqualValues(t, core.AggregatedVolumes{
"world": core.Volumes{
"USD": {},
},
"central_bank": core.Volumes{
"USD": {},
},
}, tx.PreCommitVolumes)
assert.EqualValues(t, core.AggregatedVolumes{
"world": core.Volumes{
"USD": {
Output: 1000,
},
},
"central_bank": core.Volumes{
"USD": {
Input: 1000,
},
},
}, tx.PostCommitVolumes)

rsp = internal.GetTransaction(api, tx[0].ID)
rsp = internal.GetTransaction(api, tx.ID)
assert.Equal(t, http.StatusOK, rsp.Result().StatusCode)

ret, _ := internal.DecodeSingleResponse[core.Transaction](t, rsp.Body)

assert.EqualValues(t, ret.Postings, core.Postings{
assert.EqualValues(t, core.Postings{
{
Source: "world",
Destination: "central_bank",
Amount: 1000,
Asset: "USD",
},
})
}, ret.Postings)
assert.EqualValues(t, 0, ret.ID)
assert.EqualValues(t, core.Metadata{}, ret.Metadata)
assert.EqualValues(t, "ref", ret.Reference)
assert.NotEmpty(t, ret.Timestamp)
assert.EqualValues(t, core.AggregatedVolumes{
"world": core.Volumes{
"USD": {},
},
"central_bank": core.Volumes{
"USD": {},
},
}, ret.PreCommitVolumes)
assert.EqualValues(t, core.AggregatedVolumes{
"world": core.Volumes{
"USD": {
Output: 1000,
},
},
"central_bank": core.Volumes{
"USD": {
Input: 1000,
},
},
}, ret.PostCommitVolumes)
return nil
},
})
Expand Down
7 changes: 5 additions & 2 deletions pkg/bus/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ type baseEvent struct {
}

type committedTransactions struct {
Transactions []core.Transaction `json:"transactions"`
Volumes core.AggregatedVolumes `json:"volumes"`
Transactions []core.Transaction `json:"transactions"`
// Deprecated (use postCommitVolumes)
Volumes core.AggregatedVolumes `json:"volumes"`
PostCommitVolumes core.AggregatedVolumes `json:"postCommitVolumes"`
PreCommitVolumes core.AggregatedVolumes `json:"preCommitVolumes"`
}

type savedMetadata struct {
Expand Down
8 changes: 5 additions & 3 deletions pkg/bus/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ func (l *ledgerMonitor) publish(ctx context.Context, ledger string, et string, d
}
}

func (l *ledgerMonitor) CommittedTransactions(ctx context.Context, ledger string, results []core.Transaction, volumes core.AggregatedVolumes) {
func (l *ledgerMonitor) CommittedTransactions(ctx context.Context, ledger string, result *ledger.CommitResult) {
l.publish(ctx, ledger, CommittedTransactions, committedTransactions{
Transactions: results,
Volumes: volumes,
Transactions: result.GeneratedTransactions,
Volumes: result.PostCommitVolumes,
PostCommitVolumes: result.PostCommitVolumes,
PreCommitVolumes: result.PreCommitVolumes,
})
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/bus/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
"github.com/numary/go-libs/sharedpublish"
"github.com/numary/ledger/pkg/ledger"
"github.com/pborman/uuid"
"github.com/stretchr/testify/assert"
)
Expand All @@ -28,7 +29,7 @@ func TestMonitor(t *testing.T) {
"*": "testing",
})
m := NewLedgerMonitor(p)
go m.CommittedTransactions(context.Background(), uuid.New(), nil, nil)
go m.CommittedTransactions(context.Background(), uuid.New(), &ledger.CommitResult{})

select {
case m := <-messages:
Expand Down
13 changes: 0 additions & 13 deletions pkg/core/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,6 @@ const (
WORLD = "world"
)

type Balances map[string]int64
type Volumes map[string]map[string]int64

func (v Volumes) Balances() Balances {
balances := Balances{}
for asset, vv := range v {
balances[asset] = vv["input"] - vv["output"]
}
return balances
}

type AggregatedVolumes map[string]Volumes

type Account struct {
Address string `json:"address" example:"users:001"`
Type string `json:"type,omitempty" example:"virtual"`
Expand Down
6 changes: 4 additions & 2 deletions pkg/core/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ func (t *TransactionData) Reverse() TransactionData {

type Transaction struct {
TransactionData
ID uint64 `json:"txid"`
Timestamp string `json:"timestamp"`
ID uint64 `json:"txid"`
Timestamp string `json:"timestamp"`
PreCommitVolumes AggregatedVolumes `json:"preCommitVolumes,omitempty"` // Keep omitempty to keep consistent hash
PostCommitVolumes AggregatedVolumes `json:"postCommitVolumes,omitempty"` // Keep omitempty to keep consistent hash
}

func (t *Transaction) AppendPosting(p Posting) {
Expand Down
61 changes: 61 additions & 0 deletions pkg/core/volumes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package core

import (
"database/sql/driver"
"encoding/json"
)

type Volume struct {
Input int64 `json:"input"`
Output int64 `json:"output"`
}

func (v Volume) MarshalJSON() ([]byte, error) {
type volume Volume
return json.Marshal(struct {
volume
Balance int64 `json:"balance"`
}{
volume: volume(v),
Balance: v.Input - v.Output,
})
}

func (v Volume) Balance() int64 {
return v.Input - v.Output
}

type Balances map[string]int64
type Volumes map[string]Volume

func (v Volumes) Balances() Balances {
balances := Balances{}
for asset, vv := range v {
balances[asset] = vv.Input - vv.Output
}
return balances
}

type AggregatedVolumes map[string]Volumes

// Scan - Implement the database/sql scanner interface
func (m *AggregatedVolumes) Scan(value interface{}) error {
if value == nil {
return nil
}

v, err := driver.String.ConvertValue(value)
if err != nil {
return err
}

*m = AggregatedVolumes{}
switch vv := v.(type) {
case []uint8:
return json.Unmarshal(vv, m)
case string:
return json.Unmarshal([]byte(vv), m)
default:
panic("not handled type")
}
}
24 changes: 12 additions & 12 deletions pkg/ledger/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func assertBalance(t *testing.T, l *Ledger, account, asset string, amount int64)
}

func TestNoScript(t *testing.T) {
with(func(l *Ledger) {
runOnLedger(func(l *Ledger) {
script := core.Script{}

_, err := l.Execute(context.Background(), script)
Expand All @@ -34,7 +34,7 @@ func TestNoScript(t *testing.T) {
}

func TestCompilationError(t *testing.T) {
with(func(l *Ledger) {
runOnLedger(func(l *Ledger) {
script := core.Script{
Plain: "willnotcompile",
}
Expand All @@ -46,7 +46,7 @@ func TestCompilationError(t *testing.T) {
}

func TestTransactionInvalidScript(t *testing.T) {
with(func(l *Ledger) {
runOnLedger(func(l *Ledger) {
script := core.Script{
Plain: "this is not a valid script",
}
Expand All @@ -59,7 +59,7 @@ func TestTransactionInvalidScript(t *testing.T) {
}

func TestTransactionFail(t *testing.T) {
with(func(l *Ledger) {
runOnLedger(func(l *Ledger) {
script := core.Script{
Plain: "fail",
}
Expand All @@ -72,7 +72,7 @@ func TestTransactionFail(t *testing.T) {
}

func TestSend(t *testing.T) {
with(func(l *Ledger) {
runOnLedger(func(l *Ledger) {
defer func(l *Ledger, ctx context.Context) {
require.NoError(t, l.Close(ctx))
}(l, context.Background())
Expand All @@ -92,7 +92,7 @@ func TestSend(t *testing.T) {
}

func TestNoVariables(t *testing.T) {
with(func(l *Ledger) {
runOnLedger(func(l *Ledger) {
var script core.Script
err := json.Unmarshal(
[]byte(`{
Expand All @@ -110,7 +110,7 @@ func TestNoVariables(t *testing.T) {
}

func TestVariables(t *testing.T) {
with(func(l *Ledger) {
runOnLedger(func(l *Ledger) {
defer func(l *Ledger, ctx context.Context) {
require.NoError(t, l.Close(ctx))
}(l, context.Background())
Expand Down Expand Up @@ -141,7 +141,7 @@ func TestVariables(t *testing.T) {
}

func TestEnoughFunds(t *testing.T) {
with(func(l *Ledger) {
runOnLedger(func(l *Ledger) {
defer func(l *Ledger, ctx context.Context) {
require.NoError(t, l.Close(ctx))
}(l, context.Background())
Expand Down Expand Up @@ -174,7 +174,7 @@ func TestEnoughFunds(t *testing.T) {
}

func TestNotEnoughFunds(t *testing.T) {
with(func(l *Ledger) {
runOnLedger(func(l *Ledger) {
defer func(l *Ledger, ctx context.Context) {
require.NoError(t, l.Close(ctx))
}(l, context.Background())
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestNotEnoughFunds(t *testing.T) {
}

func TestMissingMetadata(t *testing.T) {
with(func(l *Ledger) {
runOnLedger(func(l *Ledger) {
defer func(l *Ledger, ctx context.Context) {
require.NoError(t, l.Close(ctx))
}(l, context.Background())
Expand Down Expand Up @@ -237,7 +237,7 @@ func TestMissingMetadata(t *testing.T) {
}

func TestMetadata(t *testing.T) {
with(func(l *Ledger) {
runOnLedger(func(l *Ledger) {
defer func(l *Ledger, ctx context.Context) {
require.NoError(t, l.Close(ctx))
}(l, context.Background())
Expand Down Expand Up @@ -308,7 +308,7 @@ func TestMetadata(t *testing.T) {
}

func TestSetTxMeta(t *testing.T) {
with(func(l *Ledger) {
runOnLedger(func(l *Ledger) {
defer func(l *Ledger, ctx context.Context) {
require.NoError(t, l.Close(ctx))
}(l, context.Background())
Expand Down
Loading