Skip to content

Commit

Permalink
feat: check event schemas and values
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 23, 2024
1 parent 0c39cbe commit a0acec6
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 21 deletions.
15 changes: 12 additions & 3 deletions docs/events/CommittedTransactions.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@
"type": "array"
},
"Time": {
"properties": {},
"additionalProperties": false,
"type": "object"
"type": "string",
"format": "date-time",
"title": "Normalized date"
},
"Transaction": {
"properties": {
Expand Down Expand Up @@ -109,6 +109,15 @@
},
"postCommitEffectiveVolumes": {
"$ref": "#/$defs/PostCommitVolumes"
},
"reverted": {
"type": "boolean"
},
"preCommitVolumes": {
"$ref": "#/$defs/PostCommitVolumes"
},
"preCommitEffectiveVolumes": {
"$ref": "#/$defs/PostCommitVolumes"
}
},
"additionalProperties": false,
Expand Down
15 changes: 12 additions & 3 deletions docs/events/RevertedTransaction.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@
]
},
"Time": {
"properties": {},
"additionalProperties": false,
"type": "object"
"type": "string",
"format": "date-time",
"title": "Normalized date"
},
"Transaction": {
"properties": {
Expand Down Expand Up @@ -103,6 +103,15 @@
},
"postCommitEffectiveVolumes": {
"$ref": "#/$defs/PostCommitVolumes"
},
"reverted": {
"type": "boolean"
},
"preCommitVolumes": {
"$ref": "#/$defs/PostCommitVolumes"
},
"preCommitEffectiveVolumes": {
"$ref": "#/$defs/PostCommitVolumes"
}
},
"additionalProperties": false,
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,20 @@ require (
github.com/onsi/gomega v1.34.2
github.com/pborman/uuid v1.2.1
github.com/pkg/errors v0.9.1
github.com/r3labs/diff v1.1.0
github.com/shomali11/xsql v0.0.0-20190608141458-bf76292144df
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.9.0
github.com/uptrace/bun v1.2.3
github.com/uptrace/bun/dialect/pgdialect v1.2.3
github.com/xeipuuv/gojsonschema v1.2.0
go.opentelemetry.io/otel v1.30.0
go.opentelemetry.io/otel/trace v1.30.0
go.uber.org/fx v1.22.2
go.uber.org/mock v0.4.0
golang.org/x/oauth2 v0.23.0
gotest.tools/v3 v3.5.1
)

require (
Expand Down Expand Up @@ -159,7 +162,6 @@ require (
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
github.com/xo/dburl v0.23.2 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
github.com/zitadel/oidc/v2 v2.12.2 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 h1:o4JXh1EVt
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/puzpuzpuz/xsync/v3 v3.4.0 h1:DuVBAdXuGFHv8adVXjWWZ63pJq+NRXOWVXlKDBZ+mJ4=
github.com/puzpuzpuz/xsync/v3 v3.4.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA=
github.com/r3labs/diff v1.1.0 h1:V53xhrbTHrWFWq3gI4b94AjgEJOerO1+1l0xyHOBi8M=
github.com/r3labs/diff v1.1.0/go.mod h1:7WjXasNzi0vJetRcB/RqNl5dlIsmXcTTLmF5IoH6Xig=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/riandyrn/otelchi v0.10.0 h1:QMbR/FMDWBOkej6dfyWteYefUKqIFxnyrpaoWRJ9RPQ=
Expand Down Expand Up @@ -294,6 +296,7 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
Expand Down
20 changes: 9 additions & 11 deletions internal/api/v2/controllers_bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,10 @@ func TestBulk(t *testing.T) {
"asset": "USD/2",
},
},
"timestamp": now.Format(time.RFC3339Nano),
"metadata": map[string]any{},
"reverted": false,
"revertedAt": nil,
"id": float64(0),
"timestamp": now.Format(time.RFC3339Nano),
"metadata": map[string]any{},
"reverted": false,
"id": float64(0),
},
ResponseType: ActionCreateTransaction,
}},
Expand Down Expand Up @@ -152,12 +151,11 @@ func TestBulk(t *testing.T) {
},
expectResults: []Result{{
Data: map[string]any{
"id": float64(0),
"metadata": nil,
"postings": nil,
"reverted": false,
"revertedAt": nil,
"timestamp": "0001-01-01T00:00:00Z",
"id": float64(0),
"metadata": nil,
"postings": nil,
"reverted": false,
"timestamp": "0001-01-01T00:00:00Z",
},
ResponseType: ActionRevertTransaction,
}},
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/bucket/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func getMigrator(name string) *migrations.Migrator {
buf := bytes.NewBufferString("")

t := template.Must(template.New("migration").Parse(s))
if err := t.Execute(buf, map[string]interface{}{
if err := t.Execute(buf, map[string]any{
"Bucket": name,
}); err != nil {
panic(err)
Expand Down
15 changes: 14 additions & 1 deletion internal/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ledger

import (
"encoding/json"
"github.com/invopop/jsonschema"
"math/big"
"slices"
"sort"
Expand Down Expand Up @@ -44,11 +45,23 @@ type Transaction struct {

TransactionData
ID int `json:"id" bun:"id,type:numeric"`
RevertedAt *time.Time `json:"revertedAt" bun:"reverted_at,type:timestamp without time zone"`
RevertedAt *time.Time `json:"revertedAt,omitempty" bun:"reverted_at,type:timestamp without time zone"`
PostCommitVolumes PostCommitVolumes `json:"postCommitVolumes,omitempty" bun:"post_commit_volumes,type:jsonb"`
PostCommitEffectiveVolumes PostCommitVolumes `json:"postCommitEffectiveVolumes,omitempty" bun:"post_commit_effective_volumes,type:jsonb,scanonly"`
}

func (Transaction) JSONSchemaExtend(schema *jsonschema.Schema) {
schema.Properties.Set("reverted", &jsonschema.Schema{
Type: "boolean",
})
postCommitVolumesSchema, present := schema.Properties.Get("postCommitVolumes")
if !present {
panic("missing postCommitVolumes schema")
}
schema.Properties.Set("preCommitVolumes", postCommitVolumesSchema)
schema.Properties.Set("preCommitEffectiveVolumes", postCommitVolumesSchema)
}

func (tx Transaction) Reverse(atEffectiveDate bool) Transaction {
ret := NewTransaction().WithPostings(tx.Postings.Reverse()...)
if atEffectiveDate {
Expand Down
6 changes: 6 additions & 0 deletions internal/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ledger

import (
"encoding/json"
"github.com/invopop/jsonschema"
"math/big"
)

Expand All @@ -10,6 +11,11 @@ type Volumes struct {
Output *big.Int `json:"output"`
}

func (Volumes) JSONSchemaExtend(schema *jsonschema.Schema) {
inputProperty, _ := schema.Properties.Get("input")
schema.Properties.Set("balance", inputProperty)
}

func (v Volumes) Copy() Volumes {
return Volumes{
Input: new(big.Int).Set(v.Input),
Expand Down
99 changes: 99 additions & 0 deletions pkg/testserver/matchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,19 @@ package testserver

import (
"context"
"encoding/json"
"fmt"
"github.com/formancehq/go-libs/pointer"
"github.com/formancehq/go-libs/publish"
"github.com/formancehq/stack/ledger/client/models/operations"
"github.com/google/go-cmp/cmp"
"github.com/invopop/jsonschema"
"github.com/nats-io/nats.go"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/types"
"github.com/xeipuuv/gojsonschema"
"math/big"
"reflect"
)

type HaveCoherentStateMatcher struct{}
Expand Down Expand Up @@ -61,3 +68,95 @@ var _ types.GomegaMatcher = (*HaveCoherentStateMatcher)(nil)
func HaveCoherentState() *HaveCoherentStateMatcher {
return &HaveCoherentStateMatcher{}
}

// todo(libs): move in shared libs
type EventMatcher struct {
eventName string
expected any
validationErrors []gojsonschema.ResultError
diff string
}

func (e *EventMatcher) Match(actual any) (success bool, err error) {
msg, ok := actual.(*nats.Msg)
if !ok {
return false, fmt.Errorf("expected type %t", actual)
}

ev := publish.EventMessage{}
if err := json.Unmarshal(msg.Data, &ev); err != nil {
return false, fmt.Errorf("unable to unmarshal msg: %s", err)
}

rawSchema := jsonschema.Reflect(e.expected)
data, err := json.Marshal(rawSchema)
if err != nil {
return false, fmt.Errorf("unable to marshal schema: %s", err)
}

schemaJsonLoader := gojsonschema.NewStringLoader(string(data))
schema, err := gojsonschema.NewSchema(schemaJsonLoader)
if err != nil {
return false, fmt.Errorf("unable to load json schema: %s", err)
}

dataJsonLoader := gojsonschema.NewRawLoader(ev.Payload)

validate, err := schema.Validate(dataJsonLoader)
if err != nil {
return false, err
}

if !validate.Valid() {
e.validationErrors = validate.Errors()
return false, fmt.Errorf("%s", validate.Errors())
}

marshaledPayload, err := json.Marshal(ev.Payload)
if err != nil {
return false, fmt.Errorf("unable to marshal payload: %s", err)
}

unmarshalledPayload := reflect.New(reflect.TypeOf(e.expected)).Interface()
if err := json.Unmarshal(marshaledPayload, unmarshalledPayload); err != nil {
return false, fmt.Errorf("unable to unmarshal payload: %s", err)
}

// unmarshalledPayload is actually a pointer
// as it is seen as "any" by the code, we use reflection to get the targeted valud
unmarshalledPayload = reflect.ValueOf(unmarshalledPayload).Elem().Interface()

diff := cmp.Diff(unmarshalledPayload, e.expected, cmp.Comparer(func(v1 *big.Int, v2 *big.Int) bool {
return v1.String() == v2.String()
}))
if diff != "" {
e.diff = diff
return false, nil
}

return true, nil
}

func (e *EventMatcher) FailureMessage(_ any) (message string) {
ret := "event does not match expectations"
for _, validationError := range e.validationErrors {
ret += validationError.String() + "\n"
}
if e.diff != "" {
ret += e.diff
ret += "\n"
}
return ret
}

func (e *EventMatcher) NegatedFailureMessage(_ any) (message string) {
return fmt.Sprintf("event should not match expected type %T", e.expected)
}

var _ types.GomegaMatcher = (*EventMatcher)(nil)

func Event(expected any) types.GomegaMatcher {
return &EventMatcher{
expected: expected,
}
}
35 changes: 34 additions & 1 deletion test/e2e/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"database/sql"
"encoding/json"
"fmt"
"github.com/formancehq/go-libs/metadata"
"github.com/formancehq/go-libs/time"
"github.com/formancehq/ledger/internal/bus"
"github.com/nats-io/nats.go"
"io"
"math/big"
Expand Down Expand Up @@ -141,7 +144,37 @@ var _ = Context("Ledger integration tests", func() {
msgs = testServer.GetValue().Subscribe()
})
It("should receive an event", func() {
Eventually(msgs).Should(Receive())
Eventually(msgs).Should(Receive(Event(bus.CommittedTransactions{
Ledger: "foo",
Transactions: []ledger.Transaction{{
TransactionData: ledger.TransactionData{
Postings: []ledger.Posting{
ledger.NewPosting("world", "bank", "USD/2", big.NewInt(100)),
},
Timestamp: time.New(tx.Timestamp),
InsertedAt: time.New(tx.Timestamp),
Metadata: metadata.Metadata{},
},
ID: 1,
PostCommitVolumes: ledger.PostCommitVolumes{
"world": {
"USD/2": ledger.NewVolumesInt64(0, 100),
},
"bank": {
"USD/2": ledger.NewVolumesInt64(100, 0),
},
},
PostCommitEffectiveVolumes: ledger.PostCommitVolumes{
"world": {
"USD/2": ledger.NewVolumesInt64(0, 100),
},
"bank": {
"USD/2": ledger.NewVolumesInt64(100, 0),
},
},
}},
AccountMetadata: ledger.AccountMetadata{},
})))
})
})
It("should be listable on api", func() {
Expand Down

0 comments on commit a0acec6

Please sign in to comment.