Skip to content

Commit

Permalink
[azservicebus] Make ReceiveMessages overall timeout is controlled by …
Browse files Browse the repository at this point in the history
…ctx (#15980)

This PR changes the ReceiveMessages function so the overall timeout is controlled via the user's 'ctx' parameter, rather than having a separately specified 'MaxWaitTime' variable.
- (also updating to latest go-amqp with robustness fixes for ReceiveMessages)

Fixes #15981
  • Loading branch information
richardpark-msft authored Oct 29, 2021
1 parent 12210f6 commit ba76637
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 43 deletions.
16 changes: 7 additions & 9 deletions sdk/messaging/azservicebus/example_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,18 @@ func ExampleClient_NewReceiverForSubscription_deadLetterQueue() {
}

func ExampleReceiver_ReceiveMessages() {
// Receive a fixed set of messages. Note that the number of messages
// to receive and the amount of time to wait are upper bounds.
messages, err = receiver.ReceiveMessages(context.TODO(),
// ReceiveMessages respects the passed in context, and will gracefully stop
// receiving when 'ctx' is cancelled.
ctx, cancel := context.WithTimeout(context.TODO(), 60*time.Second)
defer cancel()

messages, err = receiver.ReceiveMessages(ctx,
// The number of messages to receive. Note this is merely an upper
// bound. It is possible to get fewer message (or zero), depending
// on the contents of the remote queue or subscription and network
// conditions.
1,
&azservicebus.ReceiveOptions{
// This configures the amount of time to wait for messages to arrive.
// Note that this is merely an upper bound. It is possible to get messages
// faster than the duration specified.
MaxWaitTime: 60 * time.Second,
},
nil,
)

exitOnError("Failed to receive messages", err)
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azservicebus/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.19.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.11.0
github.com/Azure/azure-sdk-for-go/sdk/internal v0.7.1
github.com/Azure/go-amqp v0.16.0
github.com/Azure/go-amqp v0.16.2
github.com/Azure/go-autorest/autorest v0.11.18
github.com/Azure/go-autorest/autorest/adal v0.9.13
github.com/Azure/go-autorest/autorest/date v0.3.0
Expand Down
3 changes: 2 additions & 1 deletion sdk/messaging/azservicebus/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.11.0/go.mod h1:HcM1YX14R7CJc
github.com/Azure/azure-sdk-for-go/sdk/internal v0.7.0/go.mod h1:yqy467j36fJxcRV2TzfVZ1pCb5vxm4BtZPUdYWe/Xo8=
github.com/Azure/azure-sdk-for-go/sdk/internal v0.7.1 h1:8XSiy/LSvjtFwpguk7m6yGLgGkWocluo8hLM5vtcpcg=
github.com/Azure/azure-sdk-for-go/sdk/internal v0.7.1/go.mod h1:KLF4gFr6DcKFZwSuH8w8yEK6DpFl3LP5rhdvAb7Yz5I=
github.com/Azure/go-amqp v0.16.0 h1:6mhxUxaKLjMtHlGqzeih/LKqjUPLZxbM6zwfz5/C4NQ=
github.com/Azure/go-amqp v0.16.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
github.com/Azure/go-amqp v0.16.2 h1:3w03tFlEZ9sYTS1jgapvvJaxD2/6dXr52W2ElRaIriE=
github.com/Azure/go-amqp v0.16.2/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest v0.11.18 h1:90Y4srNYrwOtAgVo3ndrQkTYn6kf1Eg/AjTFJ8Is2aM=
Expand Down
30 changes: 7 additions & 23 deletions sdk/messaging/azservicebus/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,12 @@ func newReceiver(ns internal.NamespaceWithNewAMQPLinks, entity *entity, cleanupO

// ReceiveOptions are options for the ReceiveMessages function.
type ReceiveOptions struct {
// MaxWaitTime configures how long to wait for the first
// message in a set of messages to arrive.
// Default: 60 seconds
MaxWaitTime time.Duration

maxWaitTimeAfterFirstMessage time.Duration
}

// ReceiveMessages receives a fixed number of messages, up to numMessages.
// There are two timeouts involved in receiving messages:
// 1. An explicit timeout set with `ReceiveOptions.MaxWaitTime` (default: 60 seconds)
// There are two ways to stop receiving messages:
// 1. Cancelling the `ctx` parameter.
// 2. An implicit timeout (default: 1 second) that starts after the first
// message has been received.
func (r *Receiver) ReceiveMessages(ctx context.Context, maxMessages int, options *ReceiveOptions) ([]*ReceivedMessage, error) {
Expand All @@ -171,7 +166,7 @@ func (r *Receiver) ReceiveMessages(ctx context.Context, maxMessages int, options
r.mu.Unlock()

if isReceiving {
return nil, errors.New("receiver is already receiving messages. ReceiveMessages() cannot be called concurrently.")
return nil, errors.New("receiver is already receiving messages. ReceiveMessages() cannot be called concurrently")
}

return r.receiveMessagesImpl(ctx, maxMessages, options)
Expand All @@ -186,13 +181,10 @@ func (r *Receiver) receiveMessagesImpl(ctx context.Context, maxMessages int, opt
// link is still valid.
// Phase 3. <drain the link and leave it in a good state>
localOpts := &ReceiveOptions{
MaxWaitTime: time.Minute,
maxWaitTimeAfterFirstMessage: time.Second,
}

if options != nil {
localOpts.MaxWaitTime = options.MaxWaitTime

if options.maxWaitTimeAfterFirstMessage != 0 {
localOpts.maxWaitTimeAfterFirstMessage = options.maxWaitTimeAfterFirstMessage
}
Expand Down Expand Up @@ -274,10 +266,7 @@ func (r *Receiver) drainLink(receiver internal.AMQPReceiver, messages []*Receive

// getMessages receives messages until a link failure, timeout or the user
// cancels their context.
func (r *Receiver) getMessages(theirCtx context.Context, receiver internal.AMQPReceiver, maxMessages int, ropts *ReceiveOptions) ([]*ReceivedMessage, error) {
ctx, cancel := context.WithTimeout(theirCtx, ropts.MaxWaitTime)
defer cancel()

func (r *Receiver) getMessages(ctx context.Context, receiver internal.AMQPReceiver, maxMessages int, ropts *ReceiveOptions) ([]*ReceivedMessage, error) {
var messages []*ReceivedMessage

for {
Expand Down Expand Up @@ -305,14 +294,9 @@ func (r *Receiver) getMessages(theirCtx context.Context, receiver internal.AMQPR
}

if len(messages) == 1 {
go func() {
select {
case <-time.After(ropts.maxWaitTimeAfterFirstMessage):
cancel()
case <-ctx.Done():
break
}
}()
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Second)
defer cancel()
}
}
}
Expand Down
21 changes: 12 additions & 9 deletions sdk/messaging/azservicebus/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ func TestReceiverForceTimeoutWithTooFewMessages(t *testing.T) {
require.NoError(t, err)

// there's only one message, requesting more messages will time out.
messages, err := receiver.ReceiveMessages(context.Background(), 1+1, &ReceiveOptions{
MaxWaitTime: 10 * time.Second,
})
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
messages, err := receiver.ReceiveMessages(ctx, 1+1, nil)
require.NoError(t, err)

require.EqualValues(t,
Expand Down Expand Up @@ -127,11 +127,13 @@ func TestReceiveWithEarlyFirstMessageTimeout(t *testing.T) {
receiver, err := serviceBusClient.NewReceiverForQueue(queueName, nil)
require.NoError(t, err)

// this is never meant to be hit since the first message time is so short.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()

startTime := time.Now()
messages, err := receiver.ReceiveMessages(context.Background(), 1,
messages, err := receiver.ReceiveMessages(ctx, 1,
&ReceiveOptions{
// this is never meant to be hit since the first message time is so short.
MaxWaitTime: 10 * time.Minute,
maxWaitTimeAfterFirstMessage: time.Millisecond,
})

Expand Down Expand Up @@ -164,9 +166,10 @@ func TestReceiverSendAndReceiveManyTimes(t *testing.T) {
var allMessages []*ReceivedMessage

for i := 0; i < 100; i++ {
messages, err := receiver.ReceiveMessages(context.Background(), 1, &ReceiveOptions{
MaxWaitTime: 10 * time.Second,
})
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

messages, err := receiver.ReceiveMessages(ctx, 1, nil)
require.NoError(t, err)
allMessages = append(allMessages, messages...)

Expand Down

0 comments on commit ba76637

Please sign in to comment.