Skip to content

Commit

Permalink
[azservicebus] Updating to handle disposition hang fix (#18530)
Browse files Browse the repository at this point in the history
Applying patch from Azure/go-amqp#171

The disposition functions, which we use for message settlement, can hang if the link is closed. The issue here is message disposition's initial send is done through the session and connection, which means a dead link won't be detected. 

When this happens we end up blocking on a channel that won't be closed since the disposition response will never come back.
  • Loading branch information
richardpark-msft authored Jul 5, 2022
1 parent 97eee56 commit 1c7431f
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 3 deletions.
3 changes: 2 additions & 1 deletion sdk/messaging/azservicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# Release History

## 1.0.2 (2022-07-05)
## 1.0.2 (2022-07-07)

### Bugs Fixed

- Settlement of a message could hang if the link had been detached/closed. (#18530)
- Cancelling link creation could leak a goroutine or, in rare conditions, a link. (#18479)

## 1.0.1 (2022-06-07)
Expand Down
45 changes: 45 additions & 0 deletions sdk/messaging/azservicebus/internal/amqpLinks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,51 @@ func TestAMQPLinksBasic(t *testing.T) {
require.EqualValues(t, entityPath, links.EntityPath())
}

func TestAMQPLinksSettleOnClosedLink(t *testing.T) {
// we're not going to use this client for these tests.
entityPath, cleanup := test.CreateExpiringQueue(t, nil)
defer cleanup()

cs := test.GetConnectionString(t)
ns, err := NewNamespace(NamespaceWithConnectionString(cs))
require.NoError(t, err)

defer func() { _ = ns.Close(context.Background(), false) }()

amqpLinks := NewAMQPLinks(NewAMQPLinksArgs{
NS: ns,
EntityPath: entityPath,
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
return newLinksForAMQPLinksTest(entityPath, session)
},
GetRecoveryKindFunc: GetRecoveryKind,
})

lwid, err := amqpLinks.Get(context.Background())
require.NoError(t, err)

err = lwid.Sender.Send(context.Background(), &amqp.Message{
Data: [][]byte{[]byte("hello world")},
})
require.NoError(t, err)

require.NoError(t, lwid.Receiver.IssueCredit(1))

msg, err := lwid.Receiver.Receive(context.Background())
require.NoError(t, err)

receiverCloser := lwid.Receiver.(amqpwrap.AMQPReceiverCloser)

err = receiverCloser.Close(context.Background())
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

err = lwid.Receiver.AcceptMessage(ctx, msg)
require.ErrorIs(t, err, amqp.ErrLinkClosed)
}

func TestAMQPLinksLive(t *testing.T) {
// we're not going to use this client for tehse tests.
entityPath, cleanup := test.CreateExpiringQueue(t, nil)
Expand Down
9 changes: 7 additions & 2 deletions sdk/messaging/azservicebus/internal/go-amqp/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,13 @@ func (r *Receiver) sendDisposition(first uint32, last *uint32, state encoding.De
State: state,
}

debug(1, "TX (sendDisposition): %s", fr)
return r.link.Session.txFrame(fr, nil)
select {
case <-r.link.Detached:
return r.link.err
default:
debug(1, "TX (sendDisposition): %s", fr)
return r.link.Session.txFrame(fr, nil)
}
}

func (r *Receiver) messageDisposition(ctx context.Context, msg *Message, state encoding.DeliveryState) error {
Expand Down

0 comments on commit 1c7431f

Please sign in to comment.