From 1c7431f8bd7168deb79174a4f150735dd6883bd7 Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Tue, 5 Jul 2022 16:37:57 -0700 Subject: [PATCH] [azservicebus] Updating to handle disposition hang fix (#18530) Applying patch from https://github.com/Azure/go-amqp/pull/171 The disposition functions, which we use for message settlement, can hang if the link is closed. The issue here is message disposition's initial send is done through the session and connection, which means a dead link won't be detected. When this happens we end up blocking on a channel that won't be closed since the disposition response will never come back. --- sdk/messaging/azservicebus/CHANGELOG.md | 3 +- .../azservicebus/internal/amqpLinks_test.go | 45 +++++++++++++++++++ .../azservicebus/internal/go-amqp/receiver.go | 9 +++- 3 files changed, 54 insertions(+), 3 deletions(-) diff --git a/sdk/messaging/azservicebus/CHANGELOG.md b/sdk/messaging/azservicebus/CHANGELOG.md index 5b55ff84a344..7be6e7b4b724 100644 --- a/sdk/messaging/azservicebus/CHANGELOG.md +++ b/sdk/messaging/azservicebus/CHANGELOG.md @@ -1,9 +1,10 @@ # Release History -## 1.0.2 (2022-07-05) +## 1.0.2 (2022-07-07) ### Bugs Fixed +- Settlement of a message could hang if the link had been detached/closed. (#18530) - Cancelling link creation could leak a goroutine or, in rare conditions, a link. (#18479) ## 1.0.1 (2022-06-07) diff --git a/sdk/messaging/azservicebus/internal/amqpLinks_test.go b/sdk/messaging/azservicebus/internal/amqpLinks_test.go index d2194cc07f9b..c0e9f9f47397 100644 --- a/sdk/messaging/azservicebus/internal/amqpLinks_test.go +++ b/sdk/messaging/azservicebus/internal/amqpLinks_test.go @@ -97,6 +97,51 @@ func TestAMQPLinksBasic(t *testing.T) { require.EqualValues(t, entityPath, links.EntityPath()) } +func TestAMQPLinksSettleOnClosedLink(t *testing.T) { + // we're not going to use this client for these tests. + entityPath, cleanup := test.CreateExpiringQueue(t, nil) + defer cleanup() + + cs := test.GetConnectionString(t) + ns, err := NewNamespace(NamespaceWithConnectionString(cs)) + require.NoError(t, err) + + defer func() { _ = ns.Close(context.Background(), false) }() + + amqpLinks := NewAMQPLinks(NewAMQPLinksArgs{ + NS: ns, + EntityPath: entityPath, + CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) { + return newLinksForAMQPLinksTest(entityPath, session) + }, + GetRecoveryKindFunc: GetRecoveryKind, + }) + + lwid, err := amqpLinks.Get(context.Background()) + require.NoError(t, err) + + err = lwid.Sender.Send(context.Background(), &amqp.Message{ + Data: [][]byte{[]byte("hello world")}, + }) + require.NoError(t, err) + + require.NoError(t, lwid.Receiver.IssueCredit(1)) + + msg, err := lwid.Receiver.Receive(context.Background()) + require.NoError(t, err) + + receiverCloser := lwid.Receiver.(amqpwrap.AMQPReceiverCloser) + + err = receiverCloser.Close(context.Background()) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err = lwid.Receiver.AcceptMessage(ctx, msg) + require.ErrorIs(t, err, amqp.ErrLinkClosed) +} + func TestAMQPLinksLive(t *testing.T) { // we're not going to use this client for tehse tests. entityPath, cleanup := test.CreateExpiringQueue(t, nil) diff --git a/sdk/messaging/azservicebus/internal/go-amqp/receiver.go b/sdk/messaging/azservicebus/internal/go-amqp/receiver.go index 17d0ef05f505..0075d7f514ed 100644 --- a/sdk/messaging/azservicebus/internal/go-amqp/receiver.go +++ b/sdk/messaging/azservicebus/internal/go-amqp/receiver.go @@ -290,8 +290,13 @@ func (r *Receiver) sendDisposition(first uint32, last *uint32, state encoding.De State: state, } - debug(1, "TX (sendDisposition): %s", fr) - return r.link.Session.txFrame(fr, nil) + select { + case <-r.link.Detached: + return r.link.err + default: + debug(1, "TX (sendDisposition): %s", fr) + return r.link.Session.txFrame(fr, nil) + } } func (r *Receiver) messageDisposition(ctx context.Context, msg *Message, state encoding.DeliveryState) error {