Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[azservicebus] Final API changes for beta.3 #16110

Merged
merged 27 commits into from
Nov 10, 2021
Merged
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
11fb208
- Use the new Prefetched() API in go-amqp's receiver, which ensures …
richardpark-msft Nov 7, 2021
f226bb7
Removing function that's not quite ready for prime time.
richardpark-msft Nov 7, 2021
0d71b5a
Cleaning up the message a little. It'll become more prescriptive once…
richardpark-msft Nov 7, 2021
ab0edde
Updating example to be simpler - really the important stuff is that t…
richardpark-msft Nov 7, 2021
8c35ba0
Updating migration guide to be accurate against the API surface, upda…
richardpark-msft Nov 7, 2021
c6fce94
moving admin client into a sub-package
richardpark-msft Nov 7, 2021
81cbf4b
- AdminClient moved into it's own sub-package.
richardpark-msft Nov 7, 2021
2b23de8
Move stress into the internal package
richardpark-msft Nov 7, 2021
9edc8ee
Fixing some linting failures, updating the package level comment for …
richardpark-msft Nov 7, 2021
2ca93bc
Adding in getting namespace properties to the admin.Client
richardpark-msft Nov 8, 2021
edc56f1
Used the wrong format for the forwarding entity
richardpark-msft Nov 8, 2021
b4e4392
Update copyright header
richardpark-msft Nov 8, 2021
aaa3517
This is going to be a beta release, so we'll bump the minor revision.
Nov 8, 2021
c5d0d01
Updating tests and options name
Nov 8, 2021
7fe92cc
- propertiesToModify is already a nullable value, no need to make it …
Nov 9, 2021
1af6e79
Troubleshooting flaky test
Nov 9, 2021
fe1e854
Unintended URL inconsistency
Nov 9, 2021
8bc310c
Fixing note to mention 'azservicebus.Client'
Nov 9, 2021
f3bc0f9
Changing 'an Client' to 'a Client'
Nov 9, 2021
02a1e90
NamespaceProperties.RawResult => NamespaceProperties.RawResponse
Nov 9, 2021
5e6f237
Remove unneeded .Sequence comment
Nov 9, 2021
798aecf
Alter our counting to always use maxPageSize as the increment.
Nov 10, 2021
1a33990
Fixing tests
Nov 10, 2021
af06fed
Adding some comments and explanation for a bit that Charles mentioned…
Nov 10, 2021
5ab8104
Remove dead comment about AMQPAnnotatedMessage
Nov 10, 2021
a981633
Updating changelog with a note that we've moved the admin client
Nov 10, 2021
52e7a04
Updating all pagers to match the logic in the .NET client, where if a…
Nov 10, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Adding some comments and explanation for a bit that Charles mentioned…
… was unclear/confusing.
Richard Park committed Nov 10, 2021
commit af06fed326706b63d99742ae017cdb676543c837
14 changes: 8 additions & 6 deletions sdk/messaging/azservicebus/receiver.go
Original file line number Diff line number Diff line change
@@ -368,20 +368,22 @@ func (r *Receiver) receiveMessagesImpl(ctx context.Context, maxMessages int, opt

// drainLink initiates a drainLink on the link. Service Bus will send whatever messages it might have still had and
// set our link credit to 0.
func (r *Receiver) drainLink(ctx context.Context, receiver internal.AMQPReceiver, messages []*ReceivedMessage) ([]*ReceivedMessage, error) {
// ctxForLoggingOnly is literally only used for when we need to extract context for logging. This function will always attempt
// to complete, ignoring cancellation, otherwise we can leave the link with messages that haven't been returned to the user.
func (r *Receiver) drainLink(ctxForLoggingOnly context.Context, receiver internal.AMQPReceiver, messages []*ReceivedMessage) ([]*ReceivedMessage, error) {
// start the drain asynchronously. Note that we ignore the user's context at this point
// since draining makes sure we don't get messages when nobody is receiving.
if err := receiver.DrainCredit(context.Background()); err != nil {
tab.For(ctx).Debug(fmt.Sprintf("Draining of credit failed. link will be closed and will re-open on next receive: %s", err.Error()))
tab.For(ctxForLoggingOnly).Debug(fmt.Sprintf("Draining of credit failed. link will be closed and will re-open on next receive: %s", err.Error()))

// if the drain fails we just close the link so it'll re-open at the next receive.
if err := r.amqpLinks.Close(context.Background(), false); err != nil {
tab.For(ctx).Debug(fmt.Sprintf("Failed to close links on ReceiveMessages cleanup. Not fatal: %s", err.Error()))
tab.For(ctxForLoggingOnly).Debug(fmt.Sprintf("Failed to close links on ReceiveMessages cleanup. Not fatal: %s", err.Error()))
}
}

// Receive until the drain completes, at which point it'll cancel
// our context.
// Draining data from the receiver's prefetched queue. This won't wait for new messages to
// arrive, so it'll only receive messages that arrived prior to the drain.
for {
am, err := receiver.Prefetched(context.Background())
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved

@@ -400,7 +402,7 @@ func (r *Receiver) drainLink(ctx context.Context, receiver internal.AMQPReceiver
}
}

messages = append(messages, newReceivedMessage(ctx, am))
messages = append(messages, newReceivedMessage(ctxForLoggingOnly, am))
}

return messages, nil