From ebd9f69afc7f8a9839c285d37746084532ed2cf4 Mon Sep 17 00:00:00 2001 From: yashi Date: Tue, 30 Apr 2024 14:56:38 +0530 Subject: [PATCH 1/2] Fix the view handler to ensure, messages are viewed from RMQ in correct FCFS order --- mw/rabbitmq/int_test.go | 10 ++++++++++ mw/rabbitmq/retry.go | 13 +++++++++---- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/mw/rabbitmq/int_test.go b/mw/rabbitmq/int_test.go index c874ec8..679dd96 100644 --- a/mw/rabbitmq/int_test.go +++ b/mw/rabbitmq/int_test.go @@ -112,6 +112,7 @@ func Test_view(t *testing.T) { viewCount int expectedViewCount int name string + expectedMessages []string } cases := []test{ @@ -121,6 +122,7 @@ func Test_view(t *testing.T) { publishCount: 5, viewCount: 5, expectedViewCount: 5, + expectedMessages: []string{"bar-0", "bar-1", "bar-2", "bar-3", "bar-4"}, }, { name: "read excess number of messages than there are in the queue", @@ -128,6 +130,7 @@ func Test_view(t *testing.T) { publishCount: 5, viewCount: 10, expectedViewCount: 5, + expectedMessages: []string{"bar-0", "bar-1", "bar-2", "bar-3", "bar-4"}, }, { name: "read negative number of messages", @@ -135,6 +138,7 @@ func Test_view(t *testing.T) { publishCount: 5, viewCount: -1, expectedViewCount: 0, + expectedMessages: []string{}, }, { name: "read zero messages", @@ -142,6 +146,7 @@ func Test_view(t *testing.T) { viewCount: 0, publishCount: 5, expectedViewCount: 0, + expectedMessages: []string{}, }} for _, c := range cases { @@ -168,6 +173,11 @@ func Test_view(t *testing.T) { if len(events) != c.expectedViewCount { t.Errorf("expected to read %d messages but read %d", c.expectedViewCount, len(events)) } + for idx, event := range events { + if string(event.Value) != c.expectedMessages[idx] { + t.Errorf("expected message %s but got %s", c.expectedMessages[0], string(event.Value)) + } + } err = ar.DeleteQueuesAndExchanges(context.Background(), c.qname) if err != nil { t.Errorf("error deleting queues:%v", err) diff --git a/mw/rabbitmq/retry.go b/mw/rabbitmq/retry.go index 1600bfc..74695b8 100644 --- a/mw/rabbitmq/retry.go +++ b/mw/rabbitmq/retry.go @@ -246,9 +246,11 @@ func (r *ARetry) view(ctx context.Context, qnameWithType string, count int, ack actualCount = q.Messages } events := make([]*ziggurat.Event, actualCount) + var consumedDeliveries []amqp.Delivery for i := 0; i < actualCount; i++ { msg, _, err := ch.Get(qnameWithType, false) + if err != nil { return []*ziggurat.Event{}, err } @@ -259,15 +261,18 @@ func (r *ARetry) view(ctx context.Context, qnameWithType string, count int, ack return []*ziggurat.Event{}, err } + events[i] = &e + consumedDeliveries = append(consumedDeliveries, msg) + } + for _, delivery := range consumedDeliveries { var ackErr error if ack { - ackErr = msg.Ack(true) + ackErr = delivery.Ack(true) } else { - ackErr = msg.Reject(true) + ackErr = delivery.Reject(true) } - r.ogLogger.Error("", ackErr) - events[i] = &e + } r.ogLogger.Error("auto retry view: channel close error:", ch.Close()) return events, nil From b24b8e28339c40310aad6b7a68bf2ae00b358ee6 Mon Sep 17 00:00:00 2001 From: yashipro13 Date: Thu, 2 May 2024 12:44:57 +0530 Subject: [PATCH 2/2] Skip reject message explicitly --- mw/rabbitmq/int_test.go | 13 +++++++------ mw/rabbitmq/retry.go | 13 +++---------- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/mw/rabbitmq/int_test.go b/mw/rabbitmq/int_test.go index 679dd96..10ac9c7 100644 --- a/mw/rabbitmq/int_test.go +++ b/mw/rabbitmq/int_test.go @@ -166,16 +166,17 @@ func Test_view(t *testing.T) { } } queueName := fmt.Sprintf("%s_%s_%s", c.qname, "dlq", "queue") - events, err := ar.view(ctx, queueName, c.viewCount, false) + viewEventsOnce, err := ar.view(ctx, queueName, c.viewCount, false) + viewEventsTwice, err := ar.view(ctx, queueName, c.viewCount, false) if err != nil { t.Errorf("error viewing messages: %v", err) } - if len(events) != c.expectedViewCount { - t.Errorf("expected to read %d messages but read %d", c.expectedViewCount, len(events)) + if len(viewEventsOnce) != c.expectedViewCount { + t.Errorf("expected to read %d messages but read %d", c.expectedViewCount, len(viewEventsOnce)) } - for idx, event := range events { - if string(event.Value) != c.expectedMessages[idx] { - t.Errorf("expected message %s but got %s", c.expectedMessages[0], string(event.Value)) + for idx, event := range c.expectedMessages { + if string(viewEventsOnce[idx].Value) != event || string(viewEventsTwice[idx].Value) != event { + t.Errorf("expected message %s but got %s", event, string(viewEventsOnce[idx].Value)) } } err = ar.DeleteQueuesAndExchanges(context.Background(), c.qname) diff --git a/mw/rabbitmq/retry.go b/mw/rabbitmq/retry.go index 74695b8..42ff933 100644 --- a/mw/rabbitmq/retry.go +++ b/mw/rabbitmq/retry.go @@ -246,11 +246,9 @@ func (r *ARetry) view(ctx context.Context, qnameWithType string, count int, ack actualCount = q.Messages } events := make([]*ziggurat.Event, actualCount) - var consumedDeliveries []amqp.Delivery for i := 0; i < actualCount; i++ { msg, _, err := ch.Get(qnameWithType, false) - if err != nil { return []*ziggurat.Event{}, err } @@ -261,18 +259,13 @@ func (r *ARetry) view(ctx context.Context, qnameWithType string, count int, ack return []*ziggurat.Event{}, err } - events[i] = &e - consumedDeliveries = append(consumedDeliveries, msg) - } - for _, delivery := range consumedDeliveries { var ackErr error if ack { - ackErr = delivery.Ack(true) - } else { - ackErr = delivery.Reject(true) + ackErr = msg.Ack(true) } - r.ogLogger.Error("", ackErr) + r.ogLogger.Error("", ackErr) + events[i] = &e } r.ogLogger.Error("auto retry view: channel close error:", ch.Close()) return events, nil