diff --git a/internal/bus/listener.go b/internal/bus/listener.go index 25403f478..23e35fa55 100644 --- a/internal/bus/listener.go +++ b/internal/bus/listener.go @@ -64,7 +64,12 @@ func (lis *LedgerListener) DeletedMetadata(ctx context.Context, l string, target } func (lis *LedgerListener) publish(ctx context.Context, topic string, ev publish.EventMessage) { - if err := lis.publisher.Publish(topic, publish.NewMessage(ctx, ev)); err != nil { + msg := publish.NewMessage(ctx, ev) + logging.FromContext(ctx).WithFields(map[string]any{ + "payload": string(msg.Payload), + "topic": topic, + }).Debugf("send event %s", ev.Type) + if err := lis.publisher.Publish(topic, msg); err != nil { logging.FromContext(ctx).Errorf("publishing message: %s", err) return } diff --git a/pkg/testserver/server.go b/pkg/testserver/server.go index 2a0ff86b9..618c19124 100644 --- a/pkg/testserver/server.go +++ b/pkg/testserver/server.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "github.com/formancehq/go-libs/publish" + "github.com/google/uuid" + "github.com/nats-io/nats.go" "io" "net/http" "strings" @@ -43,6 +45,7 @@ type Server struct { cancel func() ctx context.Context errorChan chan error + id string } func (s *Server) Start() { @@ -82,6 +85,7 @@ func (s *Server) Start() { args, "--" + publish.PublisherNatsEnabledFlag, "--" + publish.PublisherNatsURLFlag, s.configuration.NatsURL, + "--" + publish.PublisherTopicMappingFlag, fmt.Sprintf("*:%s", s.id), ) } @@ -182,10 +186,30 @@ func (s *Server) Database() *bun.DB { return db } +func (s *Server) Subscribe() chan *nats.Msg { + if s.configuration.NatsURL == "" { + require.Fail(s.t, "NATS URL must be set") + } + + ret := make(chan *nats.Msg) + conn, err := nats.Connect(s.configuration.NatsURL) + require.NoError(s.t, err) + + subscription, err := conn.Subscribe(s.id, func(msg *nats.Msg) { + ret <- msg + }) + require.NoError(s.t, err) + s.t.Cleanup(func() { + require.NoError(s.t, subscription.Unsubscribe()) + }) + return ret +} + func New(t T, configuration Configuration) *Server { srv := &Server{ t: t, configuration: configuration, + id: uuid.NewString()[:8], } t.Logf("Start testing server") srv.Start() diff --git a/test/e2e/integration_test.go b/test/e2e/integration_test.go index f1222f354..111b28d98 100644 --- a/test/e2e/integration_test.go +++ b/test/e2e/integration_test.go @@ -6,6 +6,7 @@ import ( "database/sql" "encoding/json" "fmt" + "github.com/nats-io/nats.go" "io" "math/big" @@ -130,6 +131,15 @@ var _ = Context("Ledger integration tests", func() { Expect(err).To(BeNil()) checkTx() }) + Context("when listening on event", func() { + var msgs chan *nats.Msg + BeforeEach(func() { + msgs = testServer.GetValue().Subscribe() + }) + It("should receive an event", func() { + Eventually(msgs).Should(Receive()) + }) + }) It("should be listable on api", func() { txs, err := ListTransactions(ctx, testServer.GetValue(), operations.V2ListTransactionsRequest{ Ledger: createLedgerRequest.Ledger,