diff --git a/sdk/messaging/azservicebus/CHANGELOG.md b/sdk/messaging/azservicebus/CHANGELOG.md index 5b55ff84a344..7be6e7b4b724 100644 --- a/sdk/messaging/azservicebus/CHANGELOG.md +++ b/sdk/messaging/azservicebus/CHANGELOG.md @@ -1,9 +1,10 @@ # Release History -## 1.0.2 (2022-07-05) +## 1.0.2 (2022-07-07) ### Bugs Fixed +- Settlement of a message could hang if the link had been detached/closed. (#18530) - Cancelling link creation could leak a goroutine or, in rare conditions, a link. (#18479) ## 1.0.1 (2022-06-07) diff --git a/sdk/messaging/azservicebus/internal/amqpLinks_test.go b/sdk/messaging/azservicebus/internal/amqpLinks_test.go index d2194cc07f9b..c0e9f9f47397 100644 --- a/sdk/messaging/azservicebus/internal/amqpLinks_test.go +++ b/sdk/messaging/azservicebus/internal/amqpLinks_test.go @@ -97,6 +97,51 @@ func TestAMQPLinksBasic(t *testing.T) { require.EqualValues(t, entityPath, links.EntityPath()) } +func TestAMQPLinksSettleOnClosedLink(t *testing.T) { + // we're not going to use this client for these tests. + entityPath, cleanup := test.CreateExpiringQueue(t, nil) + defer cleanup() + + cs := test.GetConnectionString(t) + ns, err := NewNamespace(NamespaceWithConnectionString(cs)) + require.NoError(t, err) + + defer func() { _ = ns.Close(context.Background(), false) }() + + amqpLinks := NewAMQPLinks(NewAMQPLinksArgs{ + NS: ns, + EntityPath: entityPath, + CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) { + return newLinksForAMQPLinksTest(entityPath, session) + }, + GetRecoveryKindFunc: GetRecoveryKind, + }) + + lwid, err := amqpLinks.Get(context.Background()) + require.NoError(t, err) + + err = lwid.Sender.Send(context.Background(), &amqp.Message{ + Data: [][]byte{[]byte("hello world")}, + }) + require.NoError(t, err) + + require.NoError(t, lwid.Receiver.IssueCredit(1)) + + msg, err := lwid.Receiver.Receive(context.Background()) + require.NoError(t, err) + + receiverCloser := lwid.Receiver.(amqpwrap.AMQPReceiverCloser) + + err = receiverCloser.Close(context.Background()) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err = lwid.Receiver.AcceptMessage(ctx, msg) + require.ErrorIs(t, err, amqp.ErrLinkClosed) +} + func TestAMQPLinksLive(t *testing.T) { // we're not going to use this client for tehse tests. entityPath, cleanup := test.CreateExpiringQueue(t, nil) diff --git a/sdk/messaging/azservicebus/internal/go-amqp/receiver.go b/sdk/messaging/azservicebus/internal/go-amqp/receiver.go index 17d0ef05f505..0075d7f514ed 100644 --- a/sdk/messaging/azservicebus/internal/go-amqp/receiver.go +++ b/sdk/messaging/azservicebus/internal/go-amqp/receiver.go @@ -290,8 +290,13 @@ func (r *Receiver) sendDisposition(first uint32, last *uint32, state encoding.De State: state, } - debug(1, "TX (sendDisposition): %s", fr) - return r.link.Session.txFrame(fr, nil) + select { + case <-r.link.Detached: + return r.link.err + default: + debug(1, "TX (sendDisposition): %s", fr) + return r.link.Session.txFrame(fr, nil) + } } func (r *Receiver) messageDisposition(ctx context.Context, msg *Message, state encoding.DeliveryState) error {