Skip to content

Commit

Permalink
Bump Go stream client to v0.13-alpha (#756)
Browse files Browse the repository at this point in the history
* merge

* use builder instead of record

* merge

* Use the direct connection

* Re-Enable tests

* Trigger GitHub action

* merge

* Update to the 0.13-alpha stream client

* Run go mod tidy

* Minor formatting

Co-authored-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
ansd and Gsantomaggio authored Sep 3, 2021
1 parent bad691c commit 581e6c1
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 33 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/mikefarah/yq/v4 v4.12.1
github.com/onsi/ginkgo v1.16.4
github.com/onsi/gomega v1.16.0
github.com/rabbitmq/rabbitmq-stream-go-client v0.0.0-20210422170636-520637be5dde
github.com/rabbitmq/rabbitmq-stream-go-client v0.0.0-20210811090309-627299932bac
github.com/sclevine/yj v0.0.0-20200815061347-554173e71934
github.com/streadway/amqp v1.0.0
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781
Expand Down
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -695,8 +695,8 @@ github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rabbitmq/rabbitmq-stream-go-client v0.0.0-20210422170636-520637be5dde h1:mqccUEJlP1qGNDztZx7svxQ9c4nUMgkk/SaIunGbOKk=
github.com/rabbitmq/rabbitmq-stream-go-client v0.0.0-20210422170636-520637be5dde/go.mod h1:smigXgSjihJBEh9pqwqh+vWhNr2JNg7NN/3pX0qYWRQ=
github.com/rabbitmq/rabbitmq-stream-go-client v0.0.0-20210811090309-627299932bac h1:yzD97s5U78mUFEEC1qTFmBiQuViNUnBQaBhs6qjKWm8=
github.com/rabbitmq/rabbitmq-stream-go-client v0.0.0-20210811090309-627299932bac/go.mod h1:dZtRwYizmzN1EwkcW7/CoN9rFbGSEA7i/889iakOJZE=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uYEyJGbgTkfkS4+E/PavXkNJcbFIpEtjt2B0KDQ5+9M=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
Expand Down Expand Up @@ -1328,7 +1328,6 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las=
k8s.io/api v0.0.0-20190918155943-95b840bb6a1f/go.mod h1:uWuOHnjmNrtQomJrvEBg0c0HRNyQ+8KTEERVsK0PW48=
k8s.io/api v0.21.3 h1:cblWILbLO8ar+Fj6xdDGr603HRsf8Wu9E9rngJeprZQ=
k8s.io/api v0.21.3/go.mod h1:hUgeYHUbBp23Ue4qdX9tR8/ANi/g3ehylAqDn9NWVOg=
Expand Down
2 changes: 1 addition & 1 deletion system_tests/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ CONSOLE_LOG=new`
if strings.Contains(cluster.Spec.Image, ":3.8") || strings.HasSuffix(cluster.Spec.Image, "tanzu-rabbitmq:1") {
Skip("rabbitmq_stream plugin is not supported by RabbitMQ image " + cluster.Spec.Image)
}
publishAndConsumeStreamMsg(ctx, hostname, rabbitmqNodePort(ctx, clientSet, cluster, "stream"), username, password)
publishAndConsumeStreamMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "stream"), username, password)
})
})
})
72 changes: 44 additions & 28 deletions system_tests/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ import (
"github.com/go-stomp/stomp"
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
streamamqp "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/streaming"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
"github.com/streadway/amqp"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -943,40 +944,55 @@ func publishAndConsumeSTOMPMsg(hostname, port, username, password string, tlsCon
ExpectWithOffset(1, conn.Disconnect()).To(Succeed())
}

func publishAndConsumeStreamMsg(ctx context.Context, hostname, port, username, password string) {
uri := fmt.Sprintf("rabbitmq-streaming://%s:%s@%s:%s/%%2f", username, password, hostname, port)
client, err := streaming.NewClientCreator().Uri(uri).Connect()
func publishAndConsumeStreamMsg(host, port, username, password string) {
portInt, err := strconv.Atoi(port)
Expect(err).ToNot(HaveOccurred())

streamName := "system-test-stream"
Expect(client.StreamCreator().Stream(streamName).Create()).To(Succeed())

var msgReceived []byte
consumer, err := client.ConsumerCreator().
Stream(streamName).
Name("system-test-consumer").
MessagesHandler(func(context streaming.ConsumerContext, message *streamamqp.Message) {
Expect(message.Data).To(HaveLen(1))
msgReceived = message.Data[0]
}).Build()
env, err := stream.NewEnvironment(stream.NewEnvironmentOptions().
SetHost(host).
SetPort(portInt).
SetPassword(password).
SetUser(username).
SetAddressResolver(stream.AddressResolver{
Host: host,
Port: portInt,
}))
Expect(err).ToNot(HaveOccurred())

msgSent := []byte("test message stream")
producer, err := client.ProducerCreator().Stream(streamName).Build()
Expect(err).ToNot(HaveOccurred())
_, err = producer.BatchPublish(ctx, []*streamamqp.Message{
streamamqp.NewMessage(msgSent)},
)
const streamName = "system-test-stream"
Expect(env.DeclareStream(
streamName,
&stream.StreamOptions{
MaxLengthBytes: stream.ByteCapacity{}.KB(1),
},
)).To(Succeed())

producer, err := env.NewProducer(streamName, nil)
Expect(err).ToNot(HaveOccurred())
chPublishConfirm := producer.NotifyPublishConfirmation()
const msgSent = "test message"
Expect(producer.BatchSend(
[]message.StreamMessage{streamamqp.NewMessage([]byte(msgSent))})).To(Succeed())
Eventually(chPublishConfirm).Should(Receive())
Expect(producer.Close()).To(Succeed())

Eventually(func() []byte {
var msgReceived string
handleMessages := func(consumerContext stream.ConsumerContext, message *streamamqp.Message) {
Expect(message.Data).To(HaveLen(1))
msgReceived = string(message.Data[0][:])
}
consumer, err := env.NewConsumer(
streamName,
handleMessages,
stream.NewConsumerOptions().
SetOffset(stream.OffsetSpecification{}.First()))
Expect(err).ToNot(HaveOccurred())
Eventually(func() string {
return msgReceived
}, 5*time.Second).Should(Equal(msgSent), "consumer should receive message sent by producer")

Expect(producer.Close()).To(Succeed())
Expect(consumer.UnSubscribe()).To(Succeed())
Expect(client.DeleteStream(streamName)).To(Succeed())
Expect(client.Close()).To(Succeed())
}).Should(Equal(msgSent), "consumer should receive message")
Expect(consumer.Close()).To(Succeed())
Expect(env.DeleteStream(streamName)).To(Succeed())
Expect(env.Close()).To(Succeed())
}

func pod(ctx context.Context, clientSet *kubernetes.Clientset, r *rabbitmqv1beta1.RabbitmqCluster, i int) *corev1.Pod {
Expand Down

0 comments on commit 581e6c1

Please sign in to comment.