From d5ab991cce4e8e4aa63cab25ec7075f3d19d1d48 Mon Sep 17 00:00:00 2001 From: LuBashQ Date: Thu, 31 Oct 2024 19:05:20 +0200 Subject: [PATCH 01/25] feat(executor/nats): add pub/sub Signed-off-by: LuBashQ --- executors/nats/nats.go | 212 +++++++++++++++++++++++++++++++++++++++++ executors/registry.go | 2 + go.mod | 8 +- go.sum | 8 ++ tests/nats.yml | 41 ++++++++ 5 files changed, 270 insertions(+), 1 deletion(-) create mode 100644 executors/nats/nats.go create mode 100644 tests/nats.yml diff --git a/executors/nats/nats.go b/executors/nats/nats.go new file mode 100644 index 00000000..8fb4cd3b --- /dev/null +++ b/executors/nats/nats.go @@ -0,0 +1,212 @@ +package nats + +import ( + "context" + "fmt" + "time" + + "github.com/mitchellh/mapstructure" + "github.com/nats-io/nats.go" + "github.com/ovh/venom" +) + +const Name = "nats" + +const ( + defaultUrl = "nats://localhost:4222" + defaultConnectTimeout = 5 * time.Second + defaultReconnectTime = 1 * time.Second + defaultClientName = "Venom" +) + +const ( + defaultMessageLimit = 1 + defaultDeadline = 5 +) + +type Executor struct { + Command string `json:"command,omitempty" yaml:"command,omitempty"` + Url string `json:"url,omitempty" yaml:"url,omitempty"` + Subject string `json:"subject,omitempty" yaml:"subject,omitempty"` + Payload string `json:"payload,omitempty" yaml:"payload,omitempty"` + Header map[string][]string `json:"header,omitempty" yaml:"header,omitempty"` + MessageLimit int `json:"message_limit,omitempty" yaml:"messageLimit,omitempty"` + Deadline int `json:"deadline,omitempty" yaml:"deadline,omitempty"` + ReplySubject string `json:"reply_subject,omitempty" yaml:"replySubject,omitempty"` +} + +type Message struct { + Data interface{} `json:"data,omitempty" yaml:"data,omitempty"` + Header map[string][]string `json:"header,omitempty" yaml:"header,omitempty"` + Subject string `json:"subject,omitempty" yaml:"subject,omitempty"` + ReplySubject string `json:"reply_subject,omitempty" yaml:"replySubject,omitempty"` +} + +type Result struct { + Messages []Message `json:"messages,omitempty" yaml:"messages,omitempty"` + Error string `json:"error,omitempty" yaml:"error,omitempty"` +} + +func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, error) { + var e Executor + if err := mapstructure.Decode(step, &e); err != nil { + return nil, err + } + + session, err := e.session(ctx) + if err != nil { + return nil, err + } + defer session.Close() + + result := Result{} + + var cmdErr error + switch e.Command { + case "publish": + _, cmdErr = e.publish(ctx, session, false) + if cmdErr != nil { + result.Error = cmdErr.Error() + } + case "subscribe": + msgs, cmdErr := e.subscribe(ctx, session) + if cmdErr != nil { + result.Error = cmdErr.Error() + } else { + result.Messages = msgs + } + case "request": + reply, cmdErr := e.publish(ctx, session, true) + if cmdErr != nil { + result.Error = cmdErr.Error() + } else { + result.Messages = []Message{*reply} + venom.Debug(ctx, "Received reply message %+v", result.Messages) + } + } + + return result, nil +} + +func New() venom.Executor { + return &Executor{ + MessageLimit: defaultMessageLimit, + Deadline: defaultDeadline, + } +} + +func (e Executor) session(ctx context.Context) (*nats.Conn, error) { + if e.Url == "" { + venom.Warning(ctx, "No URL provided, using default %q", defaultUrl) + e.Url = defaultUrl + } + + opts := []nats.Option{ + nats.Timeout(defaultConnectTimeout), + nats.Name(defaultClientName), + nats.MaxReconnects(-1), + nats.ReconnectWait(defaultReconnectTime), + } + + venom.Debug(ctx, "Connecting to NATS server %q", e.Url) + + nc, err := nats.Connect(e.Url, opts...) + if err != nil { + return nil, err + } + + venom.Debug(ctx, "Connected to NATS server %q", nc.ConnectedAddr()) + + return nc, nil +} + +func (e Executor) publish(ctx context.Context, session *nats.Conn, isRequest bool) (*Message, error) { + if e.Subject == "" { + return nil, fmt.Errorf("subject is required") + } + + venom.Debug(ctx, "Publishing message to subject %q with payload %q", e.Subject, e.Payload) + + msg := nats.Msg{ + Subject: e.Subject, + Data: []byte(e.Payload), + Header: e.Header, + } + + var result Message + if isRequest { + if e.ReplySubject == "" { + return nil, fmt.Errorf("reply subject is required for request command") + } + msg.Reply = e.ReplySubject + + replyMsg, err := session.RequestMsg(&msg, time.Duration(5)*time.Second) + if err != nil { + return nil, err + } + + result = Message{ + Data: string(replyMsg.Data), + Header: replyMsg.Header, + Subject: msg.Subject, + ReplySubject: replyMsg.Subject, + } + } else { + err := session.PublishMsg(&msg) + if err != nil { + return nil, err + } + } + + venom.Debug(ctx, "Message published to subject %q", e.Subject) + + return &result, nil +} + +func (e Executor) subscribe(ctx context.Context, session *nats.Conn) ([]Message, error) { + if e.Subject == "" { + return nil, fmt.Errorf("subject is required") + } + + venom.Debug(ctx, "Subscribing to subject %q", e.Subject) + + results := make([]Message, e.MessageLimit) + + ch := make(chan *nats.Msg) + msgCount := 0 + sub, err := session.ChanSubscribe(e.Subject, ch) + if err != nil { + return nil, err + } + + ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Duration(e.Deadline)*time.Second) + defer cancel() + + venom.Debug(ctx, "Subscribed to subject %q with timeout %v and max messages %d", e.Subject, e.Deadline, e.MessageLimit) + + for { + select { + case msg := <-ch: + venom.Debug(ctx, "Received message #%d from subject %q with data %q", msgCount, e.Subject, string(msg.Data)) + + results[msgCount] = Message{ + Data: string(msg.Data), + Header: msg.Header, + Subject: msg.Subject, + } + + msgCount++ + + if msgCount >= e.MessageLimit { + err = sub.Unsubscribe() + if err != nil { + return nil, err + } + return results, nil + } + case <-ctxWithTimeout.Done(): + _ = sub.Unsubscribe() // even it if fails, we are done anyway + return nil, fmt.Errorf("timeout reached while waiting for message #%d from subject %q", msgCount, e.Subject) + } + } +} diff --git a/executors/registry.go b/executors/registry.go index 851b40b0..b799ff26 100644 --- a/executors/registry.go +++ b/executors/registry.go @@ -11,6 +11,7 @@ import ( "github.com/ovh/venom/executors/kafka" "github.com/ovh/venom/executors/mongo" "github.com/ovh/venom/executors/mqtt" + "github.com/ovh/venom/executors/nats" "github.com/ovh/venom/executors/ovhapi" "github.com/ovh/venom/executors/rabbitmq" "github.com/ovh/venom/executors/readfile" @@ -42,4 +43,5 @@ var Registry map[string]Constructor = map[string]Constructor{ ssh.Name: ssh.New, mongo.Name: mongo.New, web.Name: web.New, + nats.Name: nats.New, } diff --git a/go.mod b/go.mod index 4e37505a..846948a8 100644 --- a/go.mod +++ b/go.mod @@ -53,6 +53,11 @@ require ( modernc.org/sqlite v1.26.0 ) +require ( + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect +) + require ( github.com/ClickHouse/ch-go v0.55.0 // indirect github.com/ClickHouse/clickhouse-go/v2 v2.9.1 // indirect @@ -84,11 +89,12 @@ require ( github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/kevinramage/venomWeb v0.0.0-20230530195848-87f9f752bcfb - github.com/klauspost/compress v1.17.1 // indirect + github.com/klauspost/compress v1.17.2 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/montanaflynn/stats v0.7.1 // indirect github.com/mxk/go-imap v0.0.0-20150429134902-531c36c3f12d // indirect + github.com/nats-io/nats.go v1.37.0 github.com/paulmach/orb v0.9.0 // indirect github.com/pierrec/lz4/v4 v4.1.18 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/go.sum b/go.sum index dbe9a992..d87c6c5f 100644 --- a/go.sum +++ b/go.sum @@ -941,6 +941,8 @@ github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47e github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/compress v1.17.1 h1:NE3C767s2ak2bweCZo3+rdP4U/HoyVXLv/X9f2gPS5g= github.com/klauspost/compress v1.17.1/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= +github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -999,6 +1001,12 @@ github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8 github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= github.com/mxk/go-imap v0.0.0-20150429134902-531c36c3f12d h1:+DgqA2tuWi/8VU+gVgBAa7+WZrnFbPKhQWbKBB54cVs= github.com/mxk/go-imap v0.0.0-20150429134902-531c36c3f12d/go.mod h1:xacC5qXZnL/ooiitVoe3BtI1OotFTqi5zICBs9J5Fyk= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/ovh/cds/sdk/interpolate v0.0.0-20231019155847-e738a974db8f h1:EVG6OeiGCWtvDOD9ALqz6T0T5UcNWZ0vU2qGqLSbHT8= github.com/ovh/cds/sdk/interpolate v0.0.0-20231019155847-e738a974db8f/go.mod h1:sHqEJq74yP3ylfc/aX5f7S0HX7NNsHnl8LSgiNX3sc8= github.com/ovh/go-ovh v1.4.3 h1:Gs3V823zwTFpzgGLZNI6ILS4rmxZgJwJCz54Er9LwD0= diff --git a/tests/nats.yml b/tests/nats.yml new file mode 100644 index 00000000..5f3d194f --- /dev/null +++ b/tests/nats.yml @@ -0,0 +1,41 @@ +name: NATS testsuite +vars: + url: 'nats://localhost:4222' + baseSubject: "nats.test" + message: '{"message": "hello world"}' + +testcases: + - name: NATS publish testcase + steps: + - type: nats + url: "{{.url}}" + command: publish + subject: "{{.baseSubject}}.publish" + payload: '{{.message}}' + headers: + timestamp: + - "{{.venom.timestamp}}" + + - name: NATS subscribe testcase with deadline + steps: + - type: nats + url: "{{.url}}" + command: subscribe + subject: "{{.baseSubject}}.>" + messageLimit: 1 + deadline: 1 + assertions: + - result.error ShouldNotBeEmpty + - result.error ShouldContainSubstring "timeout reached" + + - name: NATS request/reply testcase + steps: + - type: nats + url: "{{.url}}" + command: request + subject: "{{.baseSubject}}.request" + replySubject: "{{.baseSubject}}.reply" + payload: '{{.message}}' + assertions: + - result.error ShouldBeEmpty + - result.messages.__Len__ ShouldEqual 1 \ No newline at end of file From 8174a7c66d3ba1ef287272b844d7c03981d17036 Mon Sep 17 00:00:00 2001 From: LuBashQ Date: Thu, 31 Oct 2024 19:39:51 +0200 Subject: [PATCH 02/25] feat(executor/nats): add integration test for subscriber cmd Signed-off-by: LuBashQ --- tests/nats.yml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/nats.yml b/tests/nats.yml index 5f3d194f..4fc1c519 100644 --- a/tests/nats.yml +++ b/tests/nats.yml @@ -16,6 +16,17 @@ testcases: timestamp: - "{{.venom.timestamp}}" + - name: NATS subscribe testcase + steps: + - type: nats + command: subscribe + subject: "{{.baseSubject}}.>" + messageLimit: 1 + deadline: 10 + assertions: + - result.error ShouldBeEmpty + - result.messages.__Len__ ShouldEqual 1 + - name: NATS subscribe testcase with deadline steps: - type: nats From 4a09969a1a8105d197156185a989591fc6290ed2 Mon Sep 17 00:00:00 2001 From: LuBashQ Date: Thu, 31 Oct 2024 20:04:12 +0200 Subject: [PATCH 03/25] feat(executor/nats): remove request command Remove request command and replace it with a boolean parameter Signed-off-by: LuBashQ --- executors/nats/nats.go | 19 +++++++------------ tests/nats.yml | 3 ++- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/executors/nats/nats.go b/executors/nats/nats.go index 8fb4cd3b..4312fa50 100644 --- a/executors/nats/nats.go +++ b/executors/nats/nats.go @@ -33,6 +33,7 @@ type Executor struct { MessageLimit int `json:"message_limit,omitempty" yaml:"messageLimit,omitempty"` Deadline int `json:"deadline,omitempty" yaml:"deadline,omitempty"` ReplySubject string `json:"reply_subject,omitempty" yaml:"replySubject,omitempty"` + Request bool `json:"request,omitempty" yaml:"request,omitempty"` } type Message struct { @@ -61,12 +62,14 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro result := Result{} - var cmdErr error switch e.Command { case "publish": - _, cmdErr = e.publish(ctx, session, false) + reply, cmdErr := e.publish(ctx, session) if cmdErr != nil { result.Error = cmdErr.Error() + } else { + result.Messages = []Message{*reply} + venom.Debug(ctx, "Received reply message %+v", result.Messages) } case "subscribe": msgs, cmdErr := e.subscribe(ctx, session) @@ -75,14 +78,6 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro } else { result.Messages = msgs } - case "request": - reply, cmdErr := e.publish(ctx, session, true) - if cmdErr != nil { - result.Error = cmdErr.Error() - } else { - result.Messages = []Message{*reply} - venom.Debug(ctx, "Received reply message %+v", result.Messages) - } } return result, nil @@ -120,7 +115,7 @@ func (e Executor) session(ctx context.Context) (*nats.Conn, error) { return nc, nil } -func (e Executor) publish(ctx context.Context, session *nats.Conn, isRequest bool) (*Message, error) { +func (e Executor) publish(ctx context.Context, session *nats.Conn) (*Message, error) { if e.Subject == "" { return nil, fmt.Errorf("subject is required") } @@ -134,7 +129,7 @@ func (e Executor) publish(ctx context.Context, session *nats.Conn, isRequest boo } var result Message - if isRequest { + if e.Request { if e.ReplySubject == "" { return nil, fmt.Errorf("reply subject is required for request command") } diff --git a/tests/nats.yml b/tests/nats.yml index 4fc1c519..bb9d73e3 100644 --- a/tests/nats.yml +++ b/tests/nats.yml @@ -43,7 +43,8 @@ testcases: steps: - type: nats url: "{{.url}}" - command: request + command: publish + request: true subject: "{{.baseSubject}}.request" replySubject: "{{.baseSubject}}.reply" payload: '{{.message}}' From cf8350315182ab799808a9060cd74bee3e60a63c Mon Sep 17 00:00:00 2001 From: LuBashQ Date: Thu, 31 Oct 2024 21:20:39 +0200 Subject: [PATCH 04/25] feat(executor/nats): add jetstream consumer subscribe options Signed-off-by: LuBashQ --- executors/nats/nats.go | 103 +++++++++++++++++++++++++++++++++++++---- tests/nats.yml | 31 +++++++++++++ 2 files changed, 126 insertions(+), 8 deletions(-) diff --git a/executors/nats/nats.go b/executors/nats/nats.go index 4312fa50..df9a9f93 100644 --- a/executors/nats/nats.go +++ b/executors/nats/nats.go @@ -7,6 +7,7 @@ import ( "github.com/mitchellh/mapstructure" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" "github.com/ovh/venom" ) @@ -24,6 +25,13 @@ const ( defaultDeadline = 5 ) +type JetstreamOptions struct { + Enabled bool `json:"enabled,omitempty" yaml:"enabled,omitempty"` + Stream string `json:"stream,omitempty" yaml:"stream,omitempty"` + Consumer string `json:"consumer,omitempty" yaml:"consumer,omitempty"` // if set search for a durable consumer, otherwise use an ephemeral one + FilterSubjects []string `json:"filterSubjects,omitempty" yaml:"filterSubjects,omitempty"` +} + type Executor struct { Command string `json:"command,omitempty" yaml:"command,omitempty"` Url string `json:"url,omitempty" yaml:"url,omitempty"` @@ -34,6 +42,7 @@ type Executor struct { Deadline int `json:"deadline,omitempty" yaml:"deadline,omitempty"` ReplySubject string `json:"reply_subject,omitempty" yaml:"replySubject,omitempty"` Request bool `json:"request,omitempty" yaml:"request,omitempty"` + Jetstream JetstreamOptions `json:"jetstream,omitempty" yaml:"jetstream,omitempty"` } type Message struct { @@ -48,6 +57,10 @@ type Result struct { Error string `json:"error,omitempty" yaml:"error,omitempty"` } +func (Executor) ZeroValueMessage() Message { + return Message{} +} + func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, error) { var e Executor if err := mapstructure.Decode(step, &e); err != nil { @@ -68,11 +81,19 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro if cmdErr != nil { result.Error = cmdErr.Error() } else { - result.Messages = []Message{*reply} + result.Messages = []Message{reply} venom.Debug(ctx, "Received reply message %+v", result.Messages) } case "subscribe": - msgs, cmdErr := e.subscribe(ctx, session) + var msgs []Message + var cmdErr error + + if e.Jetstream.Enabled { + msgs, cmdErr = e.subscribeJetstream(ctx, session) + } else { + msgs, cmdErr = e.subscribe(ctx, session) + } + if cmdErr != nil { result.Error = cmdErr.Error() } else { @@ -115,9 +136,9 @@ func (e Executor) session(ctx context.Context) (*nats.Conn, error) { return nc, nil } -func (e Executor) publish(ctx context.Context, session *nats.Conn) (*Message, error) { +func (e Executor) publish(ctx context.Context, session *nats.Conn) (Message, error) { if e.Subject == "" { - return nil, fmt.Errorf("subject is required") + return e.ZeroValueMessage(), fmt.Errorf("subject is required") } venom.Debug(ctx, "Publishing message to subject %q with payload %q", e.Subject, e.Payload) @@ -131,13 +152,13 @@ func (e Executor) publish(ctx context.Context, session *nats.Conn) (*Message, er var result Message if e.Request { if e.ReplySubject == "" { - return nil, fmt.Errorf("reply subject is required for request command") + return e.ZeroValueMessage(), fmt.Errorf("reply subject is required for request command") } msg.Reply = e.ReplySubject replyMsg, err := session.RequestMsg(&msg, time.Duration(5)*time.Second) if err != nil { - return nil, err + return e.ZeroValueMessage(), err } result = Message{ @@ -149,13 +170,13 @@ func (e Executor) publish(ctx context.Context, session *nats.Conn) (*Message, er } else { err := session.PublishMsg(&msg) if err != nil { - return nil, err + return e.ZeroValueMessage(), err } } venom.Debug(ctx, "Message published to subject %q", e.Subject) - return &result, nil + return result, nil } func (e Executor) subscribe(ctx context.Context, session *nats.Conn) ([]Message, error) { @@ -205,3 +226,69 @@ func (e Executor) subscribe(ctx context.Context, session *nats.Conn) ([]Message, } } } + +func (e Executor) subscribeJetstream(ctx context.Context, session *nats.Conn) ([]Message, error) { + if e.Jetstream.Stream == "" { + return nil, fmt.Errorf("jetstream stream name is required") + } + + js, err := jetstream.New(session) + if err != nil { + return nil, err + } + + ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Duration(e.Deadline)*time.Second) + defer cancel() + + stream, err := js.Stream(ctxWithTimeout, e.Jetstream.Stream) + if err != nil { + return nil, err + } + + var consumer jetstream.Consumer + var consErr error + if e.Jetstream.Consumer != "" { + consumer, consErr = stream.Consumer(ctxWithTimeout, e.Jetstream.Consumer) + if consErr != nil { + return nil, err + } + } else { + consumer, consErr = stream.CreateConsumer(ctxWithTimeout, jetstream.ConsumerConfig{ + FilterSubjects: e.Jetstream.FilterSubjects, + AckPolicy: jetstream.AckAllPolicy, + }) + if consErr != nil { + return nil, err + } + } + + results := make([]Message, e.MessageLimit) + + msgCount := 0 + done := make(chan struct{}) + cc, err := consumer.Consume(func(msg jetstream.Msg) { + venom.Debug(ctx, "received message from %s[%s]: %+v", consumer.CachedInfo().Stream, msg.Subject(), string(msg.Data())) + results[msgCount] = Message{ + Data: string(msg.Data()), + Header: msg.Headers(), + Subject: msg.Subject(), + ReplySubject: msg.Reply(), + } + msgCount++ + if msgCount == e.MessageLimit { + done <- struct{}{} + } + }, jetstream.PullMaxMessages(e.MessageLimit)) + + defer cc.Drain() + defer cc.Stop() + + for { + select { + case <-ctxWithTimeout.Done(): + return nil, fmt.Errorf("timeout reached while waiting for message #%d from subjects %v", msgCount, e.Jetstream.FilterSubjects) + case <-done: + return results, nil + } + } +} diff --git a/tests/nats.yml b/tests/nats.yml index bb9d73e3..bab9aa0c 100644 --- a/tests/nats.yml +++ b/tests/nats.yml @@ -5,6 +5,7 @@ vars: message: '{"message": "hello world"}' testcases: + - name: NATS publish testcase steps: - type: nats @@ -16,6 +17,36 @@ testcases: timestamp: - "{{.venom.timestamp}}" + - name: NATS subscribe Jetstream consumer testcase + steps: + - type: exec + script: | + nats stream create TEST --subjects "nats.test.js.>" --defaults + + - type: exec + script: | + nats pub "{{.baseSubject}}.js.hello" '{{.message}}' + nats pub --count 3 "{{.baseSubject}}.js.world" '{{.message}}' + + - type: nats + command: subscribe + subject: "{{.baseSubject}}.>" + messageLimit: 2 + deadline: 10 + jetstream: + enabled: true + stream: TEST + filterSubjects: + - "{{.baseSubject}}.js.hello" + - "{{.baseSubject}}.js.world" + assertions: + - result.error ShouldBeEmpty + - result.messages.__Len__ ShouldEqual 2 + + - type: exec + script: | + nats stream rm TEST -f + - name: NATS subscribe testcase steps: - type: nats From 342dca08bf8579b16d32b122697326a3a1c8bc54 Mon Sep 17 00:00:00 2001 From: LuBashQ Date: Thu, 31 Oct 2024 21:27:53 +0200 Subject: [PATCH 05/25] refactor(executor/nats): extract consumer creation Signed-off-by: LuBashQ --- executors/nats/nats.go | 39 ++++++++++++++++++++++++++++----------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/executors/nats/nats.go b/executors/nats/nats.go index df9a9f93..4e59f71c 100644 --- a/executors/nats/nats.go +++ b/executors/nats/nats.go @@ -227,20 +227,21 @@ func (e Executor) subscribe(ctx context.Context, session *nats.Conn) ([]Message, } } -func (e Executor) subscribeJetstream(ctx context.Context, session *nats.Conn) ([]Message, error) { - if e.Jetstream.Stream == "" { - return nil, fmt.Errorf("jetstream stream name is required") - } - +func (e Executor) jetstreamSession(session *nats.Conn) (jetstream.JetStream, error) { js, err := jetstream.New(session) if err != nil { return nil, err } + return js, err +} - ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Duration(e.Deadline)*time.Second) - defer cancel() +func (e Executor) getConsumer(ctx context.Context, session *nats.Conn) (jetstream.Consumer, error) { + js, err := e.jetstreamSession(session) + if err != nil { + return nil, err + } - stream, err := js.Stream(ctxWithTimeout, e.Jetstream.Stream) + stream, err := js.Stream(ctx, e.Jetstream.Stream) if err != nil { return nil, err } @@ -248,12 +249,12 @@ func (e Executor) subscribeJetstream(ctx context.Context, session *nats.Conn) ([ var consumer jetstream.Consumer var consErr error if e.Jetstream.Consumer != "" { - consumer, consErr = stream.Consumer(ctxWithTimeout, e.Jetstream.Consumer) + consumer, consErr = stream.Consumer(ctx, e.Jetstream.Consumer) if consErr != nil { return nil, err } } else { - consumer, consErr = stream.CreateConsumer(ctxWithTimeout, jetstream.ConsumerConfig{ + consumer, consErr = stream.CreateConsumer(ctx, jetstream.ConsumerConfig{ FilterSubjects: e.Jetstream.FilterSubjects, AckPolicy: jetstream.AckAllPolicy, }) @@ -262,10 +263,26 @@ func (e Executor) subscribeJetstream(ctx context.Context, session *nats.Conn) ([ } } - results := make([]Message, e.MessageLimit) + return consumer, nil +} +func (e Executor) subscribeJetstream(ctx context.Context, session *nats.Conn) ([]Message, error) { + if e.Jetstream.Stream == "" { + return nil, fmt.Errorf("jetstream stream name is required") + } + + ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Duration(e.Deadline)*time.Second) + defer cancel() + + consumer, err := e.getConsumer(ctx, session) + if err != nil { + return nil, err + } + + results := make([]Message, e.MessageLimit) msgCount := 0 done := make(chan struct{}) + cc, err := consumer.Consume(func(msg jetstream.Msg) { venom.Debug(ctx, "received message from %s[%s]: %+v", consumer.CachedInfo().Stream, msg.Subject(), string(msg.Data())) results[msgCount] = Message{ From 859e775d633c9cdb71f5bda009cbc5286eb07faf Mon Sep 17 00:00:00 2001 From: LuBashQ Date: Thu, 31 Oct 2024 21:39:01 +0200 Subject: [PATCH 06/25] refactor(executor/nats): add default url on executor creation Signed-off-by: LuBashQ --- executors/nats/nats.go | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/executors/nats/nats.go b/executors/nats/nats.go index 4e59f71c..76a2c61a 100644 --- a/executors/nats/nats.go +++ b/executors/nats/nats.go @@ -18,11 +18,8 @@ const ( defaultConnectTimeout = 5 * time.Second defaultReconnectTime = 1 * time.Second defaultClientName = "Venom" -) - -const ( - defaultMessageLimit = 1 - defaultDeadline = 5 + defaultMessageLimit = 1 + defaultDeadline = 5 ) type JetstreamOptions struct { @@ -108,15 +105,11 @@ func New() venom.Executor { return &Executor{ MessageLimit: defaultMessageLimit, Deadline: defaultDeadline, + Url: defaultUrl, } } func (e Executor) session(ctx context.Context) (*nats.Conn, error) { - if e.Url == "" { - venom.Warning(ctx, "No URL provided, using default %q", defaultUrl) - e.Url = defaultUrl - } - opts := []nats.Option{ nats.Timeout(defaultConnectTimeout), nats.Name(defaultClientName), From e6b1b81ae093ff576829952866094b4978c6f00c Mon Sep 17 00:00:00 2001 From: LuBashQ Date: Sat, 2 Nov 2024 14:20:28 +0200 Subject: [PATCH 07/25] feat(executor/nats): add jetstream message publishing Signed-off-by: LuBashQ --- executors/nats/nats.go | 40 +++++++++++++++++++++++++++++- tests/nats.yml | 56 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+), 1 deletion(-) diff --git a/executors/nats/nats.go b/executors/nats/nats.go index 76a2c61a..cd4506ee 100644 --- a/executors/nats/nats.go +++ b/executors/nats/nats.go @@ -2,6 +2,7 @@ package nats import ( "context" + "errors" "fmt" "time" @@ -74,7 +75,15 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro switch e.Command { case "publish": - reply, cmdErr := e.publish(ctx, session) + var cmdErr error + var reply Message + + if e.Jetstream.Enabled { + cmdErr = e.publishJetstream(ctx, session) + } else { + reply, cmdErr = e.publish(ctx, session) + } + if cmdErr != nil { result.Error = cmdErr.Error() } else { @@ -172,6 +181,35 @@ func (e Executor) publish(ctx context.Context, session *nats.Conn) (Message, err return result, nil } +func (e Executor) publishJetstream(ctx context.Context, session *nats.Conn) error { + if e.Subject == "" { + return fmt.Errorf("subject is required") + } + + js, err := e.jetstreamSession(session) + if err != nil { + return err + } + + msg := nats.Msg{ + Subject: e.Subject, + Data: []byte(e.Payload), + Header: e.Header, + } + + ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Duration(e.Deadline)*time.Second) + defer cancel() + + _, err = js.PublishMsg(ctxWithTimeout, &msg) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + return fmt.Errorf("timeout reached while waiting for ACK from NATS server") + } + return err + } + return nil +} + func (e Executor) subscribe(ctx context.Context, session *nats.Conn) ([]Message, error) { if e.Subject == "" { return nil, fmt.Errorf("subject is required") diff --git a/tests/nats.yml b/tests/nats.yml index bab9aa0c..2b1bea0c 100644 --- a/tests/nats.yml +++ b/tests/nats.yml @@ -16,6 +16,62 @@ testcases: headers: timestamp: - "{{.venom.timestamp}}" + - name: NATS publish Jetstream testcase + steps: + - type: exec + script: | + nats stream create TEST --subjects "{{.baseSubject}}.js.>" --defaults + + - type: nats + command: publish + subject: "{{.baseSubject}}.js.hello" + deadline: 2 + jetstream: + enabled: true + assertions: + - result.error ShouldBeEmpty + + - type: exec + script: | + nats stream rm TEST -f + + - name: NATS publish Jetstream non-stream subject testcase + steps: + - type: exec + script: | + nats stream create TEST --subjects "{{.baseSubject}}.js.>" --defaults + + - type: nats + command: publish + subject: "some.other.subject" + deadline: 2 + jetstream: + enabled: true + assertions: + - result.error ShouldNotBeEmpty + + - type: exec + script: | + nats stream rm TEST -f + + - name: NATS publish Jetstream empty subject testcase + steps: + - type: exec + script: | + nats stream create TEST --subjects "{{.baseSubject}}.js.>" --defaults + + - type: nats + command: publish + subject: "" + deadline: 2 + jetstream: + enabled: true + assertions: + - result.error ShouldNotBeEmpty + + - type: exec + script: | + nats stream rm TEST -f - name: NATS subscribe Jetstream consumer testcase steps: From 5b7e821b437a1ba984f71c701871c0b69910b763 Mon Sep 17 00:00:00 2001 From: LuBashQ Date: Sat, 2 Nov 2024 14:23:07 +0200 Subject: [PATCH 08/25] refactor(executor/nats): integration tests Signed-off-by: LuBashQ --- tests/nats.yml | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/tests/nats.yml b/tests/nats.yml index 2b1bea0c..07ec7b02 100644 --- a/tests/nats.yml +++ b/tests/nats.yml @@ -16,6 +16,22 @@ testcases: headers: timestamp: - "{{.venom.timestamp}}" + assertions: + - result.error ShouldBeEmpty + + - name: NATS publish empty subject testcase + steps: + - type: nats + url: "{{.url}}" + command: publish + subject: "" + payload: '{{.message}}' + headers: + timestamp: + - "{{.venom.timestamp}}" + assertions: + - result.error ShouldNotBeEmpty + - name: NATS publish Jetstream testcase steps: - type: exec @@ -77,8 +93,8 @@ testcases: steps: - type: exec script: | - nats stream create TEST --subjects "nats.test.js.>" --defaults - + nats stream create TEST --subjects "{{.baseSubject}}.js.>" --defaults + - type: exec script: | nats pub "{{.baseSubject}}.js.hello" '{{.message}}' @@ -103,17 +119,6 @@ testcases: script: | nats stream rm TEST -f - - name: NATS subscribe testcase - steps: - - type: nats - command: subscribe - subject: "{{.baseSubject}}.>" - messageLimit: 1 - deadline: 10 - assertions: - - result.error ShouldBeEmpty - - result.messages.__Len__ ShouldEqual 1 - - name: NATS subscribe testcase with deadline steps: - type: nats From 8ae0d0c369e395855858c86f3068da88c6ba6c66 Mon Sep 17 00:00:00 2001 From: LuBashQ Date: Sat, 2 Nov 2024 14:24:32 +0200 Subject: [PATCH 09/25] refactor(executor/nats): add more debug logs Signed-off-by: LuBashQ --- executors/nats/nats.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/executors/nats/nats.go b/executors/nats/nats.go index cd4506ee..bde4a3e8 100644 --- a/executors/nats/nats.go +++ b/executors/nats/nats.go @@ -88,7 +88,6 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro result.Error = cmdErr.Error() } else { result.Messages = []Message{reply} - venom.Debug(ctx, "Received reply message %+v", result.Messages) } case "subscribe": var msgs []Message @@ -169,6 +168,8 @@ func (e Executor) publish(ctx context.Context, session *nats.Conn) (Message, err Subject: msg.Subject, ReplySubject: replyMsg.Subject, } + + venom.Debug(ctx, "Received reply message %+v", result) } else { err := session.PublishMsg(&msg) if err != nil { @@ -310,6 +311,8 @@ func (e Executor) subscribeJetstream(ctx context.Context, session *nats.Conn) ([ return nil, err } + venom.Debug(ctx, "got consumer for %s%v", consumer.CachedInfo().Stream, consumer.CachedInfo().Config.FilterSubjects) + results := make([]Message, e.MessageLimit) msgCount := 0 done := make(chan struct{}) From db411e4b3d967a8b70407f84796d7196b62f12e1 Mon Sep 17 00:00:00 2001 From: LuBashQ Date: Sat, 2 Nov 2024 15:37:14 +0200 Subject: [PATCH 10/25] feat(executor/nats): add nats server to test stack Signed-off-by: LuBashQ --- tests/Makefile | 3 +++ tests/nats/nats.conf | 4 ++++ 2 files changed, 7 insertions(+) create mode 100644 tests/nats/nats.conf diff --git a/tests/Makefile b/tests/Makefile index ee45b9c5..f6d8c1f1 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -53,7 +53,10 @@ venom-qpid.cid: venom-grpc.cid: docker build -t venom-grpc-greeter ./grpc $(call docker_run,venom-grpc-greeter,grpc,-p 50051:50051) +venom-nats.cid: + $(call docker_run,nats,nats,-p 4222:4222 -v $(shell realpath nats/nats.conf):/nats-server.conf:ro) +start-test-stack: venom-nats.cid start-test-stack: venom-postgres.cid start-test-stack: venom-mysql.cid start-test-stack: venom-mongo.cid diff --git a/tests/nats/nats.conf b/tests/nats/nats.conf new file mode 100644 index 00000000..ad514619 --- /dev/null +++ b/tests/nats/nats.conf @@ -0,0 +1,4 @@ +host: 0.0.0.0 +port: 4222 + +jetstream {} \ No newline at end of file From b21e24022516d3a1046a5c0507fd3f0afa680219 Mon Sep 17 00:00:00 2001 From: LuBashQ Date: Sat, 2 Nov 2024 15:57:01 +0200 Subject: [PATCH 11/25] feat(executor/nats): remove request/reply test Signed-off-by: LuBashQ --- tests/nats.yml | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/tests/nats.yml b/tests/nats.yml index 07ec7b02..464a2056 100644 --- a/tests/nats.yml +++ b/tests/nats.yml @@ -129,17 +129,4 @@ testcases: deadline: 1 assertions: - result.error ShouldNotBeEmpty - - result.error ShouldContainSubstring "timeout reached" - - - name: NATS request/reply testcase - steps: - - type: nats - url: "{{.url}}" - command: publish - request: true - subject: "{{.baseSubject}}.request" - replySubject: "{{.baseSubject}}.reply" - payload: '{{.message}}' - assertions: - - result.error ShouldBeEmpty - - result.messages.__Len__ ShouldEqual 1 \ No newline at end of file + - result.error ShouldContainSubstring "timeout reached" \ No newline at end of file From 4464b01802865eb8f2d53e6f1c4f8e3179b35b5a Mon Sep 17 00:00:00 2001 From: LuBashQ Date: Sat, 2 Nov 2024 19:07:36 +0200 Subject: [PATCH 12/25] test(Makefile): add commands and args to docker_run Add command and command argument to build_docker function to support *box images, like busybox Signed-off-by: LuBashQ --- tests/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/Makefile b/tests/Makefile index f6d8c1f1..e314e3ca 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -4,7 +4,7 @@ PKI_VAR_FILE = ./pki_variables.yml include ./pki.mk define docker_run - docker run --name venom-$(2) -d $(3) $(1) > venom-$2.cid + docker run --name venom-$(2) -d $(3) $(1) $(4) > venom-$2.cid endef docker-network := venom-test-net From 12da00f972620604d1ec8a6d6d8872d7c74c9a15 Mon Sep 17 00:00:00 2001 From: LuBashQ Date: Sat, 2 Nov 2024 19:26:22 +0200 Subject: [PATCH 13/25] Revert "feat(executor/nats): remove request/reply test" This reverts commit 18b41b3fa9ab035f4e15b1b5e55233dcec189ba8. Signed-off-by: LuBashQ --- tests/nats.yml | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/tests/nats.yml b/tests/nats.yml index 464a2056..07ec7b02 100644 --- a/tests/nats.yml +++ b/tests/nats.yml @@ -129,4 +129,17 @@ testcases: deadline: 1 assertions: - result.error ShouldNotBeEmpty - - result.error ShouldContainSubstring "timeout reached" \ No newline at end of file + - result.error ShouldContainSubstring "timeout reached" + + - name: NATS request/reply testcase + steps: + - type: nats + url: "{{.url}}" + command: publish + request: true + subject: "{{.baseSubject}}.request" + replySubject: "{{.baseSubject}}.reply" + payload: '{{.message}}' + assertions: + - result.error ShouldBeEmpty + - result.messages.__Len__ ShouldEqual 1 \ No newline at end of file From 1f56fdd1ea718590a5d22afa3ad0f11a13880465 Mon Sep 17 00:00:00 2001 From: LuBashQ Date: Sat, 2 Nov 2024 19:45:34 +0200 Subject: [PATCH 14/25] test(executor/nats): add more test cases Signed-off-by: LuBashQ --- tests/nats.yml | 43 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/tests/nats.yml b/tests/nats.yml index 07ec7b02..ba24aede 100644 --- a/tests/nats.yml +++ b/tests/nats.yml @@ -119,7 +119,36 @@ testcases: script: | nats stream rm TEST -f - - name: NATS subscribe testcase with deadline + - name: NATS subscribe Jetstream consumer testcase with deadline + steps: + - type: exec + script: | + nats stream create TEST --subjects "{{.baseSubject}}.js.>" --defaults + + - type: exec + script: | + nats pub "{{.baseSubject}}.js.hello" '{{.message}}' + nats pub --count 1 "{{.baseSubject}}.js.world" '{{.message}}' + + - type: nats + command: subscribe + subject: "{{.baseSubject}}.>" + messageLimit: 10 + deadline: 1 + jetstream: + enabled: true + stream: TEST + filterSubjects: + - "{{.baseSubject}}.js.hello" + - "{{.baseSubject}}.js.world" + assertions: + - result.error ShouldNotBeEmpty + + - type: exec + script: | + nats stream rm TEST -f + + - name: NATS subscribe testcase steps: - type: nats url: "{{.url}}" @@ -127,6 +156,18 @@ testcases: subject: "{{.baseSubject}}.>" messageLimit: 1 deadline: 1 + assertions: + - result.error ShouldBeEmpty + - result.messages.__Len__ ShouldEqual 1 + + - name: NATS subscribe testcase with deadline + steps: + - type: nats + url: "{{.url}}" + command: subscribe + subject: "{{.baseSubject}}.>" + messageLimit: 1000 + deadline: 1 assertions: - result.error ShouldNotBeEmpty - result.error ShouldContainSubstring "timeout reached" From 4d042c5418b825287b53db653d931bd3f20d09d7 Mon Sep 17 00:00:00 2001 From: LuBashQ Date: Sat, 2 Nov 2024 19:48:19 +0200 Subject: [PATCH 15/25] test(Makefile): add nats publisher and replier clients Replier and publisher nats clients are used to test the `subscriber` command Signed-off-by: LuBashQ --- tests/Makefile | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/tests/Makefile b/tests/Makefile index e314e3ca..2fa6e8c3 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -8,6 +8,7 @@ define docker_run endef docker-network := venom-test-net +nats-network := venom-nats-network COVER_FILES = $(shell find . -name "*.coverprofile") PKGS_COMMA_SEP = go list -f '{{ join .Deps "\n" }}{{"\n"}}{{.ImportPath}}' . | grep github.com/ovh/venom | grep -v vendor | tr '\n' ',' | sed 's/,$$//' @@ -53,8 +54,15 @@ venom-qpid.cid: venom-grpc.cid: docker build -t venom-grpc-greeter ./grpc $(call docker_run,venom-grpc-greeter,grpc,-p 50051:50051) + +nats-server-name := nats-server +nats-server-addr := -s nats://venom-$(nats-server-name):4222 +nats-msg-body := {"hello": "world"} venom-nats.cid: - $(call docker_run,nats,nats,-p 4222:4222 -v $(shell realpath nats/nats.conf):/nats-server.conf:ro) + docker network create --driver bridge $(nats-network) + $(call docker_run,nats:2.10,$(nats-server-name),--network $(nats-network) -p 4222:4222 -v $(shell realpath nats/nats.conf):/nats-server.conf:ro) + $(call docker_run,natsio/nats-box,nats-replier,--network $(nats-network), nats $(nats-server-addr) reply nats.test.request --echo) + $(call docker_run,natsio/nats-box,nats-publisher,--network $(nats-network), nats $(nats-server-addr) pub --count=1_000_000 --sleep=1s nats.test.message '$(nats-msg-body)') start-test-stack: venom-nats.cid start-test-stack: venom-postgres.cid @@ -71,7 +79,7 @@ start-test-stack: $(PKI_DIR) start-test-stack: venom-grpc.cid stop-test-stack: - @for f in `ls -1 *.cid`; do docker stop `cat $${f}`; docker rm `cat $${f}`; done; rm -f *.cid; docker network rm $(docker-network) + @for f in `ls -1 *.cid`; do docker stop `cat $${f}`; docker rm `cat $${f}`; done; rm -f *.cid; docker network rm $(docker-network); docker network rm $(nats-network) build-test-binary: cd ../cmd/venom; \ From 3576949ba9b5b37fc8c9824075da22dc99b543e3 Mon Sep 17 00:00:00 2001 From: LuBashQ Date: Sun, 3 Nov 2024 11:10:46 +0200 Subject: [PATCH 16/25] docs(executor/nats): document structs and add debug logs Signed-off-by: LuBashQ --- executors/nats/nats.go | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/executors/nats/nats.go b/executors/nats/nats.go index bde4a3e8..dd087311 100644 --- a/executors/nats/nats.go +++ b/executors/nats/nats.go @@ -25,24 +25,25 @@ const ( type JetstreamOptions struct { Enabled bool `json:"enabled,omitempty" yaml:"enabled,omitempty"` - Stream string `json:"stream,omitempty" yaml:"stream,omitempty"` - Consumer string `json:"consumer,omitempty" yaml:"consumer,omitempty"` // if set search for a durable consumer, otherwise use an ephemeral one + Stream string `json:"stream,omitempty" yaml:"stream,omitempty"` // Stream must exist before the command execution + Consumer string `json:"consumer,omitempty" yaml:"consumer,omitempty"` // If set search for a durable consumer, otherwise use an ephemeral one FilterSubjects []string `json:"filterSubjects,omitempty" yaml:"filterSubjects,omitempty"` } type Executor struct { - Command string `json:"command,omitempty" yaml:"command,omitempty"` + Command string `json:"command,omitempty" yaml:"command,omitempty"` // Must be publish or subscribe Url string `json:"url,omitempty" yaml:"url,omitempty"` Subject string `json:"subject,omitempty" yaml:"subject,omitempty"` Payload string `json:"payload,omitempty" yaml:"payload,omitempty"` Header map[string][]string `json:"header,omitempty" yaml:"header,omitempty"` MessageLimit int `json:"message_limit,omitempty" yaml:"messageLimit,omitempty"` - Deadline int `json:"deadline,omitempty" yaml:"deadline,omitempty"` + Deadline int `json:"deadline,omitempty" yaml:"deadline,omitempty"` // Describes the deadline in seconds from the start of the command ReplySubject string `json:"reply_subject,omitempty" yaml:"replySubject,omitempty"` - Request bool `json:"request,omitempty" yaml:"request,omitempty"` + Request bool `json:"request,omitempty" yaml:"request,omitempty"` // Describe that the publish command expects a reply from the NATS server Jetstream JetstreamOptions `json:"jetstream,omitempty" yaml:"jetstream,omitempty"` } +// Message describes a NATS message received from a consumer or a request publisher type Message struct { Data interface{} `json:"data,omitempty" yaml:"data,omitempty"` Header map[string][]string `json:"header,omitempty" yaml:"header,omitempty"` @@ -50,6 +51,7 @@ type Message struct { ReplySubject string `json:"reply_subject,omitempty" yaml:"replySubject,omitempty"` } +// Resuts describes a command result type Result struct { Messages []Message `json:"messages,omitempty" yaml:"messages,omitempty"` Error string `json:"error,omitempty" yaml:"error,omitempty"` @@ -142,8 +144,6 @@ func (e Executor) publish(ctx context.Context, session *nats.Conn) (Message, err return e.ZeroValueMessage(), fmt.Errorf("subject is required") } - venom.Debug(ctx, "Publishing message to subject %q with payload %q", e.Subject, e.Payload) - msg := nats.Msg{ Subject: e.Subject, Data: []byte(e.Payload), @@ -177,7 +177,7 @@ func (e Executor) publish(ctx context.Context, session *nats.Conn) (Message, err } } - venom.Debug(ctx, "Message published to subject %q", e.Subject) + venom.Debug(ctx, "Published message to subject %q with payload %q", e.Subject, e.Payload) return result, nil } @@ -187,7 +187,7 @@ func (e Executor) publishJetstream(ctx context.Context, session *nats.Conn) erro return fmt.Errorf("subject is required") } - js, err := e.jetstreamSession(session) + js, err := e.jetstreamSession(ctx, session) if err != nil { return err } @@ -208,6 +208,9 @@ func (e Executor) publishJetstream(ctx context.Context, session *nats.Conn) erro } return err } + + venom.Debug(ctx, "Published message to subject %q with payload %q", e.Subject, e.Payload) + return nil } @@ -259,16 +262,17 @@ func (e Executor) subscribe(ctx context.Context, session *nats.Conn) ([]Message, } } -func (e Executor) jetstreamSession(session *nats.Conn) (jetstream.JetStream, error) { +func (e Executor) jetstreamSession(ctx context.Context, session *nats.Conn) (jetstream.JetStream, error) { js, err := jetstream.New(session) if err != nil { return nil, err } + venom.Debug(ctx, "Jetstream session created") return js, err } func (e Executor) getConsumer(ctx context.Context, session *nats.Conn) (jetstream.Consumer, error) { - js, err := e.jetstreamSession(session) + js, err := e.jetstreamSession(ctx, session) if err != nil { return nil, err } @@ -278,6 +282,10 @@ func (e Executor) getConsumer(ctx context.Context, session *nats.Conn) (jetstrea return nil, err } + streamName := stream.CachedInfo().Config.Name + + venom.Debug(ctx, "Found stream %q", streamName) + var consumer jetstream.Consumer var consErr error if e.Jetstream.Consumer != "" { @@ -285,6 +293,7 @@ func (e Executor) getConsumer(ctx context.Context, session *nats.Conn) (jetstrea if consErr != nil { return nil, err } + venom.Debug(ctx, "Found existing consumer %s[%s]", streamName, e.Jetstream.Consumer) } else { consumer, consErr = stream.CreateConsumer(ctx, jetstream.ConsumerConfig{ FilterSubjects: e.Jetstream.FilterSubjects, @@ -293,6 +302,7 @@ func (e Executor) getConsumer(ctx context.Context, session *nats.Conn) (jetstrea if consErr != nil { return nil, err } + venom.Warn(ctx, "Consumer %s[%s] not found. Created ephemeral consumer", streamName, e.Jetstream.Consumer) } return consumer, nil @@ -311,8 +321,6 @@ func (e Executor) subscribeJetstream(ctx context.Context, session *nats.Conn) ([ return nil, err } - venom.Debug(ctx, "got consumer for %s%v", consumer.CachedInfo().Stream, consumer.CachedInfo().Config.FilterSubjects) - results := make([]Message, e.MessageLimit) msgCount := 0 done := make(chan struct{}) From 70479789045fbf601f368c0c8f71eea3caacc533 Mon Sep 17 00:00:00 2001 From: LuBashQ Date: Sun, 3 Nov 2024 13:01:54 +0200 Subject: [PATCH 17/25] feat(executor/nats): add TLS support Signed-off-by: LuBashQ --- executors/nats/nats.go | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/executors/nats/nats.go b/executors/nats/nats.go index dd087311..65b7177e 100644 --- a/executors/nats/nats.go +++ b/executors/nats/nats.go @@ -23,6 +23,15 @@ const ( defaultDeadline = 5 ) +// TlsOptions describes TLS authentication options to the NATS server. +type TlsOptions struct { + SelfSigned bool `json:"self_signed,omitempty" yaml:"selfSigned"` // Set to true if the NATS server uses self-signed certificates. Ca certificate is required if enabled. + ServerVerify bool `json:"server_verify,omitempty" yaml:"serverVerify"` // Set to true if the NATS server verifies the client identity. Certificate and Key are required if enabled. + CertificatePath string `json:"certificate_path,omitempty" yaml:"certificatePath"` + KeyPath string `json:"key_path,omitempty" yaml:"keyPath"` + CaPath string `json:"ca_certificate_path,omitempty" yaml:"caPath"` +} + type JetstreamOptions struct { Enabled bool `json:"enabled,omitempty" yaml:"enabled,omitempty"` Stream string `json:"stream,omitempty" yaml:"stream,omitempty"` // Stream must exist before the command execution @@ -41,6 +50,7 @@ type Executor struct { ReplySubject string `json:"reply_subject,omitempty" yaml:"replySubject,omitempty"` Request bool `json:"request,omitempty" yaml:"request,omitempty"` // Describe that the publish command expects a reply from the NATS server Jetstream JetstreamOptions `json:"jetstream,omitempty" yaml:"jetstream,omitempty"` + Tls *TlsOptions `json:"tls,omitempty" yaml:"tls"` } // Message describes a NATS message received from a consumer or a request publisher @@ -119,6 +129,29 @@ func New() venom.Executor { } } +func (tls TlsOptions) getTlsOptions() ([]nats.Option, error) { + opts := make([]nats.Option, 1, 2) + + if tls.SelfSigned { + if len(tls.CaPath) == 0 { + return nil, fmt.Errorf("TLS CA certificate is required if NATS server uses self signed CA") + } + opts = append(opts, nats.RootCAs(tls.CaPath)) + } + + if tls.ServerVerify { + if len(tls.CertificatePath) == 0 { + return nil, fmt.Errorf("TLS certificate is required if NATS server verifies clients") + } + if len(tls.KeyPath) == 0 { + return nil, fmt.Errorf("TLS key is required if NATS server vertifies clients") + } + opts = append(opts, nats.ClientCert(tls.CertificatePath, tls.KeyPath)) + } + + return opts, nil +} + func (e Executor) session(ctx context.Context) (*nats.Conn, error) { opts := []nats.Option{ nats.Timeout(defaultConnectTimeout), @@ -127,6 +160,14 @@ func (e Executor) session(ctx context.Context) (*nats.Conn, error) { nats.ReconnectWait(defaultReconnectTime), } + if e.Tls != nil { + tlsOpts, err := e.Tls.getTlsOptions() + if err != nil { + return nil, err + } + opts = append(opts, tlsOpts...) + } + venom.Debug(ctx, "Connecting to NATS server %q", e.Url) nc, err := nats.Connect(e.Url, opts...) From 7ec7c607eab768c94bab12bce8f2ba80afea5270 Mon Sep 17 00:00:00 2001 From: LuBashQ Date: Sun, 3 Nov 2024 13:02:26 +0200 Subject: [PATCH 18/25] test(executor/nats): generate PKI Signed-off-by: LuBashQ --- .gitignore | 1 + tests/nats/pki.mk | 61 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+) create mode 100644 tests/nats/pki.mk diff --git a/.gitignore b/.gitignore index 85eec240..f3d816df 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,4 @@ tests/pki_variables.yml *.xml test_results*.html !venom_output.html +tests/nats/pki \ No newline at end of file diff --git a/tests/nats/pki.mk b/tests/nats/pki.mk new file mode 100644 index 00000000..d2ec2130 --- /dev/null +++ b/tests/nats/pki.mk @@ -0,0 +1,61 @@ +$(NATS_PKI_DIR): + mkdir -p $(NATS_PKI_DIR) + + # create a certificate authority (CA) that both the client and server trust. + # The CA is just a public and private key with the public key wrapped up in a self-signed X.509 certificate. + openssl req \ + -new \ + -x509 \ + -nodes \ + -days 365 \ + -subj '/C=GB/O=Example/OU=TeamA/CN=ca.example.com' \ + -keyout $(NATS_PKI_DIR)/ca.key \ + -out $(NATS_PKI_DIR)/ca.crt + + # create the server’s key + openssl genrsa \ + -out $(NATS_PKI_DIR)/server.key 2048 + + # create a server Certificate Signing Request + openssl req \ + -new \ + -key $(NATS_PKI_DIR)/server.key \ + -subj '/C=GB/O=Example/OU=TeamA/CN=example.com' \ + -addext 'subjectAltName = DNS:venom-$(nats-server-name), DNS:localhost, IP:127.0.0.1, IP:::1' \ + -out $(NATS_PKI_DIR)/server.csr + + # creates the server signed certificate + openssl x509 \ + -req \ + -in $(NATS_PKI_DIR)/server.csr \ + -CA $(NATS_PKI_DIR)/ca.crt \ + -CAkey $(NATS_PKI_DIR)/ca.key \ + -CAcreateserial \ + -days 365 \ + -copy_extensions copy \ + -out $(NATS_PKI_DIR)/server.crt + + # create the client's key + openssl genrsa \ + -out $(NATS_PKI_DIR)/client.key 2048 + + # create a client Certificate Signing Request + openssl req \ + -new \ + -key $(NATS_PKI_DIR)/client.key \ + -subj '/CN=user1.example.com' \ + -out $(NATS_PKI_DIR)/client.csr + + # creates the client signed certificate + openssl x509 \ + -req \ + -in $(NATS_PKI_DIR)/client.csr \ + -CA $(NATS_PKI_DIR)/ca.crt \ + -CAkey $(NATS_PKI_DIR)/ca.key \ + -CAcreateserial \ + -days 365 \ + -out $(NATS_PKI_DIR)/client.crt + +.PHONY: clean-nats-pki +clean-nats-pki: + rm -fr $(NATS_PKI_DIR) \ No newline at end of file From 540a20b0df82a369653c61468e8a175c383cae5a Mon Sep 17 00:00:00 2001 From: LuBashQ Date: Sun, 3 Nov 2024 13:04:00 +0200 Subject: [PATCH 19/25] test(Makefile): enable TLS in NATS integration tests Signed-off-by: LuBashQ --- tests/Makefile | 27 ++++++++++--- tests/nats.yml | 92 +++++++++++++++++++++++++++++++++++++------- tests/nats/nats.conf | 9 ++++- 3 files changed, 108 insertions(+), 20 deletions(-) diff --git a/tests/Makefile b/tests/Makefile index 2fa6e8c3..036599e6 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -1,7 +1,10 @@ PKI_DIR := ./grpc/pki PKI_VAR_FILE = ./pki_variables.yml +NATS_PKI_DIR := ./nats/pki + include ./pki.mk +include ./nats/pki.mk define docker_run docker run --name venom-$(2) -d $(3) $(1) $(4) > venom-$2.cid @@ -56,14 +59,28 @@ venom-grpc.cid: $(call docker_run,venom-grpc-greeter,grpc,-p 50051:50051) nats-server-name := nats-server -nats-server-addr := -s nats://venom-$(nats-server-name):4222 +nats-server-addr := -s nats://venom-$(nats-server-name):4222 --tlscert=/pki/client.crt --tlskey=/pki/client.key --tlsca=/pki/ca.crt nats-msg-body := {"hello": "world"} venom-nats.cid: docker network create --driver bridge $(nats-network) - $(call docker_run,nats:2.10,$(nats-server-name),--network $(nats-network) -p 4222:4222 -v $(shell realpath nats/nats.conf):/nats-server.conf:ro) - $(call docker_run,natsio/nats-box,nats-replier,--network $(nats-network), nats $(nats-server-addr) reply nats.test.request --echo) - $(call docker_run,natsio/nats-box,nats-publisher,--network $(nats-network), nats $(nats-server-addr) pub --count=1_000_000 --sleep=1s nats.test.message '$(nats-msg-body)') + $(call docker_run,nats:2.10,$(nats-server-name),\ + --network $(nats-network)\ + -p 4222:4222\ + -v $(shell realpath nats/nats.conf):/nats-server.conf:ro\ + -v $(shell realpath nats/pki):/pki:ro\ + ) + $(call docker_run,natsio/nats-box,nats-replier,\ + --network $(nats-network)\ + -v $(shell realpath nats/pki):/pki:ro,\ + nats $(nats-server-addr) reply nats.test.request --echo\ + ) + $(call docker_run,natsio/nats-box,nats-publisher,\ + --network $(nats-network)\ + -v $(shell realpath nats/pki):/pki:ro,\ + nats $(nats-server-addr) pub --count=1_000_000 --sleep=1s nats.test.message '$(nats-msg-body)'\ + ) +start-test-stack: $(NATS_PKI_DIR) start-test-stack: venom-nats.cid start-test-stack: venom-postgres.cid start-test-stack: venom-mysql.cid @@ -100,7 +117,7 @@ wait-for-kafka: until curl --retry 30 --retry-connrefused --retry-max-time 100 --connect-timeout 10 -s http://localhost:8081/config/ -o /dev/null; do sleep 2; done ; \ echo "\n\033[0;32mdone\033[0m" -clean: +clean: clean-nats-pki @rm -f *.prof *.html *.xml *.log *.dump.json *.args.file *.error.out *.out *.test.out *.coverprofile merge-coverage: diff --git a/tests/nats.yml b/tests/nats.yml index ba24aede..1c03dbe6 100644 --- a/tests/nats.yml +++ b/tests/nats.yml @@ -3,6 +3,10 @@ vars: url: 'nats://localhost:4222' baseSubject: "nats.test" message: '{"message": "hello world"}' + certificatePath: "./nats/pki/client.crt" + keyPath: "./nats/pki/client.key" + caPath: "./nats/pki/ca.crt" + natsTlsConfig: "--tlscert {{.certificatePath}} --tlskey {{.keyPath}} --tlsca {{.caPath}}" testcases: @@ -10,6 +14,12 @@ testcases: steps: - type: nats url: "{{.url}}" + tls: + selfSigned: true + serverVerify: true + certificatePath: "{{.certificatePath}}" + keyPath: "{{.keyPath}}" + caPath: "{{.caPath}}" command: publish subject: "{{.baseSubject}}.publish" payload: '{{.message}}' @@ -23,6 +33,12 @@ testcases: steps: - type: nats url: "{{.url}}" + tls: + selfSigned: true + serverVerify: true + certificatePath: "{{.certificatePath}}" + keyPath: "{{.keyPath}}" + caPath: "{{.caPath}}" command: publish subject: "" payload: '{{.message}}' @@ -36,10 +52,16 @@ testcases: steps: - type: exec script: | - nats stream create TEST --subjects "{{.baseSubject}}.js.>" --defaults + nats {{.natsTlsConfig}} stream create TEST --subjects "{{.baseSubject}}.js.>" --defaults - type: nats command: publish + tls: + selfSigned: true + serverVerify: true + certificatePath: "{{.certificatePath}}" + keyPath: "{{.keyPath}}" + caPath: "{{.caPath}}" subject: "{{.baseSubject}}.js.hello" deadline: 2 jetstream: @@ -49,16 +71,22 @@ testcases: - type: exec script: | - nats stream rm TEST -f + nats {{.natsTlsConfig}} stream rm TEST -f - name: NATS publish Jetstream non-stream subject testcase steps: - type: exec script: | - nats stream create TEST --subjects "{{.baseSubject}}.js.>" --defaults + nats {{.natsTlsConfig}} stream create TEST --subjects "{{.baseSubject}}.js.>" --defaults - type: nats command: publish + tls: + selfSigned: true + serverVerify: true + certificatePath: "{{.certificatePath}}" + keyPath: "{{.keyPath}}" + caPath: "{{.caPath}}" subject: "some.other.subject" deadline: 2 jetstream: @@ -68,16 +96,22 @@ testcases: - type: exec script: | - nats stream rm TEST -f + nats {{.natsTlsConfig}} stream rm TEST -f - name: NATS publish Jetstream empty subject testcase steps: - type: exec script: | - nats stream create TEST --subjects "{{.baseSubject}}.js.>" --defaults + nats {{.natsTlsConfig}} stream create TEST --subjects "{{.baseSubject}}.js.>" --defaults - type: nats command: publish + tls: + selfSigned: true + serverVerify: true + certificatePath: "{{.certificatePath}}" + keyPath: "{{.keyPath}}" + caPath: "{{.caPath}}" subject: "" deadline: 2 jetstream: @@ -87,21 +121,27 @@ testcases: - type: exec script: | - nats stream rm TEST -f + nats {{.natsTlsConfig}} stream rm TEST -f - name: NATS subscribe Jetstream consumer testcase steps: - type: exec script: | - nats stream create TEST --subjects "{{.baseSubject}}.js.>" --defaults + nats {{.natsTlsConfig}} stream create TEST --subjects "{{.baseSubject}}.js.>" --defaults - type: exec script: | - nats pub "{{.baseSubject}}.js.hello" '{{.message}}' - nats pub --count 3 "{{.baseSubject}}.js.world" '{{.message}}' + nats {{.natsTlsConfig}} pub "{{.baseSubject}}.js.hello" '{{.message}}' + nats {{.natsTlsConfig}} pub --count 3 "{{.baseSubject}}.js.world" '{{.message}}' - type: nats command: subscribe + tls: + selfSigned: true + serverVerify: true + certificatePath: "{{.certificatePath}}" + keyPath: "{{.keyPath}}" + caPath: "{{.caPath}}" subject: "{{.baseSubject}}.>" messageLimit: 2 deadline: 10 @@ -117,21 +157,27 @@ testcases: - type: exec script: | - nats stream rm TEST -f + nats {{.natsTlsConfig}} stream rm TEST -f - name: NATS subscribe Jetstream consumer testcase with deadline steps: - type: exec script: | - nats stream create TEST --subjects "{{.baseSubject}}.js.>" --defaults + nats {{.natsTlsConfig}} stream create TEST --subjects "{{.baseSubject}}.js.>" --defaults - type: exec script: | - nats pub "{{.baseSubject}}.js.hello" '{{.message}}' - nats pub --count 1 "{{.baseSubject}}.js.world" '{{.message}}' + nats {{.natsTlsConfig}} pub "{{.baseSubject}}.js.hello" '{{.message}}' + nats {{.natsTlsConfig}} pub --count 1 "{{.baseSubject}}.js.world" '{{.message}}' - type: nats command: subscribe + tls: + selfSigned: true + serverVerify: true + certificatePath: "{{.certificatePath}}" + keyPath: "{{.keyPath}}" + caPath: "{{.caPath}}" subject: "{{.baseSubject}}.>" messageLimit: 10 deadline: 1 @@ -146,13 +192,19 @@ testcases: - type: exec script: | - nats stream rm TEST -f + nats {{.natsTlsConfig}} stream rm TEST -f - name: NATS subscribe testcase steps: - type: nats url: "{{.url}}" command: subscribe + tls: + selfSigned: true + serverVerify: true + certificatePath: "{{.certificatePath}}" + keyPath: "{{.keyPath}}" + caPath: "{{.caPath}}" subject: "{{.baseSubject}}.>" messageLimit: 1 deadline: 1 @@ -165,6 +217,12 @@ testcases: - type: nats url: "{{.url}}" command: subscribe + tls: + selfSigned: true + serverVerify: true + certificatePath: "{{.certificatePath}}" + keyPath: "{{.keyPath}}" + caPath: "{{.caPath}}" subject: "{{.baseSubject}}.>" messageLimit: 1000 deadline: 1 @@ -177,6 +235,12 @@ testcases: - type: nats url: "{{.url}}" command: publish + tls: + selfSigned: true + serverVerify: true + certificatePath: "{{.certificatePath}}" + keyPath: "{{.keyPath}}" + caPath: "{{.caPath}}" request: true subject: "{{.baseSubject}}.request" replySubject: "{{.baseSubject}}.reply" diff --git a/tests/nats/nats.conf b/tests/nats/nats.conf index ad514619..47957e19 100644 --- a/tests/nats/nats.conf +++ b/tests/nats/nats.conf @@ -1,4 +1,11 @@ host: 0.0.0.0 port: 4222 -jetstream {} \ No newline at end of file +jetstream {} + +tls { + cert_file: "/pki/server.crt" + key_file: "/pki/server.key" + ca_file: "/pki/ca.crt" + verify: true +} \ No newline at end of file From 5ac9cd4290a26d49a134515c170f7027c9c53382 Mon Sep 17 00:00:00 2001 From: LuBashQ Date: Sun, 3 Nov 2024 13:35:53 +0200 Subject: [PATCH 20/25] docs(executor/nats): add README.md Signed-off-by: LuBashQ --- executors/nats/README.md | 117 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) create mode 100644 executors/nats/README.md diff --git a/executors/nats/README.md b/executors/nats/README.md new file mode 100644 index 00000000..e8abf8af --- /dev/null +++ b/executors/nats/README.md @@ -0,0 +1,117 @@ +# Venom - Executor NATS + +Step to publish and subscribe to NATS subjects. + +## Input + +### Defaults + +This step includes some default values: + +- `url`: defaults to `nats://localhost:4222` +- `messageLimit`: defaults to 1 +- `deadline`: defaults to 1 second + +### Authentication + +This step allows for connection with and without TLS. Without TLS, the step does not require additional options. + +To connect to a NATS server with TLS, declare: + +```yaml +tls: + selfSigned: true + serverVerify: true + certificatePath: "/path/to/client_certificate" + keyPath: "/path/to/client_key" + caPath: ""/path/to/ca_certificate"" +``` + +Enable `selfSigned` only if the NATS server uses self-signed certificates. If enabled, `caPath` is mandatory. + +Enable `serverVerify` only if the NATS server verifies the client certificates. If enabled `certificatePath` and `keyPath` are mandatory. + +### publish + +The publish command allows to publish a payload to a specific NATS subject. Optionally it can wait for a reply. + +Full configuration example: + +```yaml +- type: nats + url: "{{.url}}" # defaults to nats://localhost:4222 if not set + command: publish + subject: "{{.subject}}" # mandatory + payload: '{{.message}}' + headers: + customHeader: + - "some-value" + assertions: + - result.error ShouldBeEmpty +``` + +Full configuration with reply example: + +```yaml +- type: nats + url: "{{.url}}" # defaults to nats://localhost:4222 if not set + command: publish + request: true + subject: "{{.subject}}" # mandatory + replySubject: "{{.subject}}.reply" # mandatory if `request = true` + payload: '{{.message}}' + assertions: + - result.error ShouldBeEmpty + - result.messages.__Len__ ShouldEqual 1 +``` + +It is possible to publish to a Jetstream stream by declaring `jetstream: true` in the step. + + For example: + +```yaml +- type: nats + command: publish + subject: "{{.subject}}.hello" # mandatory + deadline: 2 + jetstream: + enabled: true + assertions: + - result.error ShouldNotBeEmpty +``` + +### subscribe + +The subscribe command allows to receive messages from a subject or a stream. + +Full configuration example: + +```yaml +- type: nats + command: subscribe + subject: "{{.subject}}.>" # mandatory + messageLimit: 2 # defaults to 1 + deadline: 10 # in seconds, defaults to 1 + assertions: + - result.error ShouldBeEmpty + - result.messages.__Len__ ShouldEqual 2 +``` + +Full configuration example with Jetstream: + +```yaml +- type: nats + command: subscribe + subject: "{{.subject}}.>" # mandatory + messageLimit: 2 # defaults to 1 + deadline: 10 # in seconds, defaults to 1 + jetstream: + enabled: true + stream: TEST # mandatory, stream must exist + filterSubjects: + - "{{.subject}}.js.hello" + - "{{.subject}}.js.world" + assertions: + - result.error ShouldBeEmpty + - result.messages.__Len__ ShouldEqual 2 +``` From 309b8b45e30b16bb3d16f15deea95e3f40aa72f3 Mon Sep 17 00:00:00 2001 From: LuBashQ Date: Sun, 3 Nov 2024 13:38:39 +0200 Subject: [PATCH 21/25] docs(executor/nats): specify better available commands Signed-off-by: LuBashQ --- executors/nats/README.md | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/executors/nats/README.md b/executors/nats/README.md index e8abf8af..45105cf1 100644 --- a/executors/nats/README.md +++ b/executors/nats/README.md @@ -2,6 +2,11 @@ Step to publish and subscribe to NATS subjects. +Currently two commands are supported: + +- `publish` +- `subscribe` + ## Input ### Defaults @@ -31,9 +36,9 @@ Enable `selfSigned` only if the NATS server uses self-signed certificates. If en Enable `serverVerify` only if the NATS server verifies the client certificates. If enabled `certificatePath` and `keyPath` are mandatory. -### publish +### publish command -The publish command allows to publish a payload to a specific NATS subject. Optionally it can wait for a reply. +The `publish` command allows to publish a payload to a specific NATS subject. Optionally it can wait for a reply. Full configuration example: @@ -80,9 +85,9 @@ It is possible to publish to a Jetstream stream by declaring `jetstream: true` i - result.error ShouldNotBeEmpty ``` -### subscribe +### subscribe command -The subscribe command allows to receive messages from a subject or a stream. +The `subscribe` command allows to receive messages from a subject or a stream. Full configuration example: From 71038450855e07a7453e07960566d267ccfb6a12 Mon Sep 17 00:00:00 2001 From: LuBashQ Date: Fri, 8 Nov 2024 15:54:51 +0200 Subject: [PATCH 22/25] feat(executor/nats): add JSON support for message data Signed-off-by: LuBashQ --- executors/nats/nats.go | 57 ++++++++++++++++++++++++++++++++++-------- tests/Makefile | 2 +- tests/nats.yml | 8 +++++- 3 files changed, 54 insertions(+), 13 deletions(-) diff --git a/executors/nats/nats.go b/executors/nats/nats.go index 65b7177e..06468350 100644 --- a/executors/nats/nats.go +++ b/executors/nats/nats.go @@ -2,6 +2,7 @@ package nats import ( "context" + "encoding/json" "errors" "fmt" "time" @@ -55,18 +56,31 @@ type Executor struct { // Message describes a NATS message received from a consumer or a request publisher type Message struct { - Data interface{} `json:"data,omitempty" yaml:"data,omitempty"` - Header map[string][]string `json:"header,omitempty" yaml:"header,omitempty"` - Subject string `json:"subject,omitempty" yaml:"subject,omitempty"` - ReplySubject string `json:"reply_subject,omitempty" yaml:"replySubject,omitempty"` + Data string `json:"data,omitempty" yaml:"data,omitempty"` + DataJson map[string]interface{} `json:"datajson,omitempty" yaml:"dataJson,omitempty"` + Header map[string][]string `json:"header,omitempty" yaml:"header,omitempty"` + Subject string `json:"subject,omitempty" yaml:"subject,omitempty"` + ReplySubject string `json:"replysubject,omitempty" yaml:"replySubject,omitempty"` } -// Resuts describes a command result +// Result describes a command result type Result struct { Messages []Message `json:"messages,omitempty" yaml:"messages,omitempty"` Error string `json:"error,omitempty" yaml:"error,omitempty"` } +func tryDecodeToJSON(data []byte) (map[string]interface{}, error) { + if len(data) == 0 { + return nil, fmt.Errorf("empty data") + } + var result map[string]interface{} + err := json.Unmarshal(data, &result) + if err != nil { + return nil, err + } + return result, nil +} + func (Executor) ZeroValueMessage() Message { return Message{} } @@ -203,8 +217,14 @@ func (e Executor) publish(ctx context.Context, session *nats.Conn) (Message, err return e.ZeroValueMessage(), err } + dataJson, err := tryDecodeToJSON(replyMsg.Data) + if err != nil { + venom.Debug(ctx, "data %s is not valid JSON ", replyMsg.Data) + } + result = Message{ Data: string(replyMsg.Data), + DataJson: dataJson, Header: replyMsg.Header, Subject: msg.Subject, ReplySubject: replyMsg.Subject, @@ -279,12 +299,19 @@ func (e Executor) subscribe(ctx context.Context, session *nats.Conn) ([]Message, for { select { case msg := <-ch: - venom.Debug(ctx, "Received message #%d from subject %q with data %q", msgCount, e.Subject, string(msg.Data)) + msgData := msg.Data + venom.Debug(ctx, "Received message #%d from subject %q with data %q", msgCount, e.Subject, string(msgData)) + + dataJson, err := tryDecodeToJSON(msgData) + if err != nil { + venom.Debug(ctx, "data %s is not valid JSON ", msgData) + } results[msgCount] = Message{ - Data: string(msg.Data), - Header: msg.Header, - Subject: msg.Subject, + Data: string(msgData), + DataJson: dataJson, + Header: msg.Header, + Subject: msg.Subject, } msgCount++ @@ -367,9 +394,17 @@ func (e Executor) subscribeJetstream(ctx context.Context, session *nats.Conn) ([ done := make(chan struct{}) cc, err := consumer.Consume(func(msg jetstream.Msg) { - venom.Debug(ctx, "received message from %s[%s]: %+v", consumer.CachedInfo().Stream, msg.Subject(), string(msg.Data())) + msgData := msg.Data() + venom.Debug(ctx, "received message from %s[%s]: %+v", consumer.CachedInfo().Stream, msg.Subject(), string(msgData)) + + dataJson, err := tryDecodeToJSON(msgData) + if err != nil { + venom.Debug(ctx, "data %s is not valid JSON ", msgData) + } + results[msgCount] = Message{ - Data: string(msg.Data()), + Data: string(msgData), + DataJson: dataJson, Header: msg.Headers(), Subject: msg.Subject(), ReplySubject: msg.Reply(), diff --git a/tests/Makefile b/tests/Makefile index 036599e6..8d11853a 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -60,7 +60,7 @@ venom-grpc.cid: nats-server-name := nats-server nats-server-addr := -s nats://venom-$(nats-server-name):4222 --tlscert=/pki/client.crt --tlskey=/pki/client.key --tlsca=/pki/ca.crt -nats-msg-body := {"hello": "world"} +nats-msg-body := {"message": "hello world"} venom-nats.cid: docker network create --driver bridge $(nats-network) $(call docker_run,nats:2.10,$(nats-server-name),\ diff --git a/tests/nats.yml b/tests/nats.yml index 1c03dbe6..a795c493 100644 --- a/tests/nats.yml +++ b/tests/nats.yml @@ -154,6 +154,8 @@ testcases: assertions: - result.error ShouldBeEmpty - result.messages.__Len__ ShouldEqual 2 + - result.messages.messages0.datajson ShouldContainKey message + - result.messages.messages0.datajson.message ShouldEqual "hello world" - type: exec script: | @@ -211,6 +213,8 @@ testcases: assertions: - result.error ShouldBeEmpty - result.messages.__Len__ ShouldEqual 1 + - result.messages.messages0.datajson ShouldContainKey message + - result.messages.messages0.datajson.message ShouldEqual "hello world" - name: NATS subscribe testcase with deadline steps: @@ -247,4 +251,6 @@ testcases: payload: '{{.message}}' assertions: - result.error ShouldBeEmpty - - result.messages.__Len__ ShouldEqual 1 \ No newline at end of file + - result.messages.__Len__ ShouldEqual 1 + - result.messages.messages0.datajson ShouldContainKey message + - result.messages.messages0.datajson.message ShouldEqual "hello world" \ No newline at end of file From 56afb190fb44a6af77ddf448c09b843d6da5ca3b Mon Sep 17 00:00:00 2001 From: LuBashQ Date: Mon, 11 Nov 2024 10:34:25 +0200 Subject: [PATCH 23/25] refactor(executor/nats): use nil check to check if Jetstream is enabled Signed-off-by: LuBashQ --- executors/nats/nats.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/executors/nats/nats.go b/executors/nats/nats.go index 06468350..ae4a51bb 100644 --- a/executors/nats/nats.go +++ b/executors/nats/nats.go @@ -34,14 +34,13 @@ type TlsOptions struct { } type JetstreamOptions struct { - Enabled bool `json:"enabled,omitempty" yaml:"enabled,omitempty"` Stream string `json:"stream,omitempty" yaml:"stream,omitempty"` // Stream must exist before the command execution Consumer string `json:"consumer,omitempty" yaml:"consumer,omitempty"` // If set search for a durable consumer, otherwise use an ephemeral one FilterSubjects []string `json:"filterSubjects,omitempty" yaml:"filterSubjects,omitempty"` } type Executor struct { - Command string `json:"command,omitempty" yaml:"command,omitempty"` // Must be publish or subscribe + Command string `json:"command,omitempty" yaml:"command,omitempty"` // Must be `publish` or `subscribe` Url string `json:"url,omitempty" yaml:"url,omitempty"` Subject string `json:"subject,omitempty" yaml:"subject,omitempty"` Payload string `json:"payload,omitempty" yaml:"payload,omitempty"` @@ -50,7 +49,7 @@ type Executor struct { Deadline int `json:"deadline,omitempty" yaml:"deadline,omitempty"` // Describes the deadline in seconds from the start of the command ReplySubject string `json:"reply_subject,omitempty" yaml:"replySubject,omitempty"` Request bool `json:"request,omitempty" yaml:"request,omitempty"` // Describe that the publish command expects a reply from the NATS server - Jetstream JetstreamOptions `json:"jetstream,omitempty" yaml:"jetstream,omitempty"` + Jetstream *JetstreamOptions `json:"jetstream,omitempty" yaml:"jetstream,omitempty"` Tls *TlsOptions `json:"tls,omitempty" yaml:"tls"` } @@ -104,7 +103,7 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro var cmdErr error var reply Message - if e.Jetstream.Enabled { + if e.Jetstream != nil { cmdErr = e.publishJetstream(ctx, session) } else { reply, cmdErr = e.publish(ctx, session) @@ -119,7 +118,7 @@ func (Executor) Run(ctx context.Context, step venom.TestStep) (interface{}, erro var msgs []Message var cmdErr error - if e.Jetstream.Enabled { + if e.Jetstream != nil { msgs, cmdErr = e.subscribeJetstream(ctx, session) } else { msgs, cmdErr = e.subscribe(ctx, session) From e24c5df3b60e1b625cad470507c7463e1e8b80bd Mon Sep 17 00:00:00 2001 From: LuBashQ Date: Mon, 11 Nov 2024 10:42:01 +0200 Subject: [PATCH 24/25] feat(executor/nats): Add delivery option to Jetstream Signed-off-by: LuBashQ --- executors/nats/nats.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/executors/nats/nats.go b/executors/nats/nats.go index ae4a51bb..3127b2ef 100644 --- a/executors/nats/nats.go +++ b/executors/nats/nats.go @@ -37,6 +37,20 @@ type JetstreamOptions struct { Stream string `json:"stream,omitempty" yaml:"stream,omitempty"` // Stream must exist before the command execution Consumer string `json:"consumer,omitempty" yaml:"consumer,omitempty"` // If set search for a durable consumer, otherwise use an ephemeral one FilterSubjects []string `json:"filterSubjects,omitempty" yaml:"filterSubjects,omitempty"` + DeliveryPolicy string `json:"delivery_policy,omitempty" yaml:"deliveryPolicy,omitempty"` // Must be last, new or all. Other values will default to jetstream.DeliverLastPolicy +} + +func (js JetstreamOptions) deliveryPolicy() jetstream.DeliverPolicy { + switch js.DeliveryPolicy { + case "last": + return jetstream.DeliverLastPolicy + case "new": + return jetstream.DeliverNewPolicy + case "all": + return jetstream.DeliverAllPolicy + default: + return jetstream.DeliverAllPolicy + } } type Executor struct { @@ -365,6 +379,7 @@ func (e Executor) getConsumer(ctx context.Context, session *nats.Conn) (jetstrea consumer, consErr = stream.CreateConsumer(ctx, jetstream.ConsumerConfig{ FilterSubjects: e.Jetstream.FilterSubjects, AckPolicy: jetstream.AckAllPolicy, + DeliverPolicy: e.Jetstream.deliveryPolicy(), }) if consErr != nil { return nil, err From a4d8a3ef6ccaff52d56d1eb50ebcdb5daeba8d93 Mon Sep 17 00:00:00 2001 From: LuBashQ Date: Mon, 11 Nov 2024 10:45:28 +0200 Subject: [PATCH 25/25] feat(executor/nats): Add ACK option to Jetstream Signed-off-by: LuBashQ --- executors/nats/nats.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/executors/nats/nats.go b/executors/nats/nats.go index 3127b2ef..2fd566ac 100644 --- a/executors/nats/nats.go +++ b/executors/nats/nats.go @@ -38,6 +38,7 @@ type JetstreamOptions struct { Consumer string `json:"consumer,omitempty" yaml:"consumer,omitempty"` // If set search for a durable consumer, otherwise use an ephemeral one FilterSubjects []string `json:"filterSubjects,omitempty" yaml:"filterSubjects,omitempty"` DeliveryPolicy string `json:"delivery_policy,omitempty" yaml:"deliveryPolicy,omitempty"` // Must be last, new or all. Other values will default to jetstream.DeliverLastPolicy + AckPolicy string `json:"ack_policy,omitempty" yaml:"ackPolicy,omitempty"` // Must be all, explicit or none. Other values will default to jetstream.AckNonePolicy } func (js JetstreamOptions) deliveryPolicy() jetstream.DeliverPolicy { @@ -53,6 +54,19 @@ func (js JetstreamOptions) deliveryPolicy() jetstream.DeliverPolicy { } } +func (js JetstreamOptions) ackPolicy() jetstream.AckPolicy { + switch js.DeliveryPolicy { + case "none": + return jetstream.AckNonePolicy + case "all": + return jetstream.AckAllPolicy + case "explicit": + return jetstream.AckExplicitPolicy + default: + return jetstream.AckNonePolicy + } +} + type Executor struct { Command string `json:"command,omitempty" yaml:"command,omitempty"` // Must be `publish` or `subscribe` Url string `json:"url,omitempty" yaml:"url,omitempty"`