Skip to content

Commit

Permalink
[azservicebus] Remove pager interfaces, use pager structs, some bug f…
Browse files Browse the repository at this point in the history
…ixes (#16141)

- Fixed a bug when _not_ specifying a page size where you would infinitely get results

Addressing some feedback jhendrixmsft had:
- The Azure SDK guidelines were a bit outdated - we're not using interfaces for our returned Pagers. Removing, replacing them with their underlying structs
- Found a few spots where I was supposed to be passing in options in the admin.Client, those have been added in.
- Changed message batch so that it uses uint64 in any spot that references max message size. This doesn't quite match up yet with go-amqp, but it will in the future and it'll be fine in the interim.
  • Loading branch information
richardpark-msft authored Nov 12, 2021
1 parent def4692 commit 9587539
Show file tree
Hide file tree
Showing 10 changed files with 315 additions and 357 deletions.
10 changes: 8 additions & 2 deletions sdk/messaging/azservicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
# Release History

## 0.3.0 (Unreleased)
## 0.3.0 (2021-11-12)

### Features Added

- AbandonMessage and DeferMessage now take an additional `PropertiesToModify` option, allowing
the message properties to be modified when they are settled.
- Missing fields for entities in the admin.Client have been added (UserMetadata, etc..)

### Breaking Changes

- AdminClient has been moved into the `admin` subpackage.
- ReceivedMessage.Body is now a function that returns a ([]byte, error), rather than being a field.
This protects against a potential data-loss scenario where a message is received with a payload
encoded in the sequence or value sections of an AMQP message, which cannot be prpoerly represented
Expand All @@ -18,7 +20,11 @@
- MessageBatch.Add() has been renamed to MessageBatch.AddMessage(). AddMessage() now returns only an `error`,
with a sentinel error (ErrMessageTooLarge) signaling that the batch cannot fit a new message.
- Sender.SendMessages() has been removed in favor of simplifications made in MessageBatch.
- AdminClient has been moved into the `admin` subpackage.

### Bugs Fixed

- ReceiveMessages has been tuned to match the .NET limits (which has worked well in practice). This partly addresses #15963,
as our default limit was far higher than needed.

## 0.2.0 (2021-11-02)

Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azservicebus/admin/admin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (ac *Client) newPagerFunc(baseFragment string, maxPageSize int32, lenV func
eof = true
}

skip += maxPageSize
skip += int32(lenV(pv))
return resp, nil
}
}
96 changes: 39 additions & 57 deletions sdk/messaging/azservicebus/admin/admin_client_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,12 @@ type GetQueueResponse struct {
RawResponse *http.Response
}

type GetQueueOptions struct {
// For future expansion
}

// GetQueue gets a queue by name.
func (ac *Client) GetQueue(ctx context.Context, queueName string) (*GetQueueResponse, error) {
func (ac *Client) GetQueue(ctx context.Context, queueName string, options *GetQueueOptions) (*GetQueueResponse, error) {
var atomResp *atom.QueueEnvelope
resp, err := ac.em.Get(ctx, "/"+queueName, &atomResp)

Expand Down Expand Up @@ -215,8 +219,12 @@ type GetQueueRuntimePropertiesResponse struct {
RawResponse *http.Response
}

type GetQueueRuntimePropertiesOptions struct {
// For future expansion
}

// GetQueueRuntimeProperties gets runtime properties of a queue, like the SizeInBytes, or ActiveMessageCount.
func (ac *Client) GetQueueRuntimeProperties(ctx context.Context, queueName string) (*GetQueueRuntimePropertiesResponse, error) {
func (ac *Client) GetQueueRuntimeProperties(ctx context.Context, queueName string, options *GetQueueRuntimePropertiesOptions) (*GetQueueRuntimePropertiesResponse, error) {
var atomResp *atom.QueueEnvelope
resp, err := ac.em.Get(ctx, "/"+queueName, &atomResp)

Expand Down Expand Up @@ -259,19 +267,6 @@ type ListQueuesOptions struct {
MaxPageSize int32
}

// QueueItemPager provides iteration over ListQueueProperties pages.
type QueueItemPager interface {
// NextPage returns true if the pager advanced to the next page.
// Returns false if there are no more pages or an error occurred.
NextPage(context.Context) bool

// PageResponse returns the current QueueProperties.
PageResponse() *ListQueuesResponse

// Err returns the last error encountered while paging.
Err() error
}

type ListQueuesResult struct {
Items []*QueueItem
}
Expand All @@ -287,23 +282,18 @@ type QueueItem struct {
}

// ListQueues lists queues.
func (ac *Client) ListQueues(options *ListQueuesOptions) QueueItemPager {
func (ac *Client) ListQueues(options *ListQueuesOptions) *QueuePager {
var pageSize int32

if options != nil {
pageSize = options.MaxPageSize
}

return &queuePropertiesPager{
return &QueuePager{
innerPager: ac.newPagerFunc("/$Resources/Queues", pageSize, queueFeedLen),
}
}

func queueFeedLen(v interface{}) int {
feed := v.(**atom.QueueFeed)
return len((*feed).Entries)
}

// ListQueuesRuntimePropertiesOptions can be used to configure the ListQueuesRuntimeProperties method.
type ListQueuesRuntimePropertiesOptions struct {
// MaxPageSize is the maximum size of each page of results.
Expand All @@ -320,28 +310,15 @@ type QueueRuntimePropertiesItem struct {
QueueRuntimeProperties
}

// QueueRuntimePropertiesPager provides iteration over ListQueueRuntimeProperties pages.
type QueueRuntimePropertiesPager interface {
// NextPage returns true if the pager advanced to the next page.
// Returns false if there are no more pages or an error occurred.
NextPage(context.Context) bool

// PageResponse returns the current QueueRuntimeProperties.
PageResponse() *ListQueuesRuntimePropertiesResponse

// Err returns the last error encountered while paging.
Err() error
}

// ListQueuesRuntimeProperties lists runtime properties for queues.
func (ac *Client) ListQueuesRuntimeProperties(options *ListQueuesRuntimePropertiesOptions) QueueRuntimePropertiesPager {
func (ac *Client) ListQueuesRuntimeProperties(options *ListQueuesRuntimePropertiesOptions) *QueueRuntimePropertiesPager {
var pageSize int32

if options != nil {
pageSize = options.MaxPageSize
}

return &queueRuntimePropertiesPager{
return &QueueRuntimePropertiesPager{
innerPager: ac.newPagerFunc("/$Resources/Queues", pageSize, queueFeedLen),
}
}
Expand Down Expand Up @@ -379,8 +356,8 @@ func (ac *Client) createOrUpdateQueueImpl(ctx context.Context, queueName string,
return newProps, resp, nil
}

// queuePropertiesPager provides iteration over QueueProperties pages.
type queuePropertiesPager struct {
// QueuePager provides iteration over ListQueues pages.
type QueuePager struct {
innerPager pagerFunc

lastErr error
Expand All @@ -389,22 +366,22 @@ type queuePropertiesPager struct {

// NextPage returns true if the pager advanced to the next page.
// Returns false if there are no more pages or an error occurred.
func (p *queuePropertiesPager) NextPage(ctx context.Context) bool {
func (p *QueuePager) NextPage(ctx context.Context) bool {
p.lastResponse, p.lastErr = p.getNextPage(ctx)
return p.lastResponse != nil
}

// PageResponse returns the current page.
func (p *queuePropertiesPager) PageResponse() *ListQueuesResponse {
func (p *QueuePager) PageResponse() *ListQueuesResponse {
return p.lastResponse
}

// Err returns the last error encountered while paging.
func (p *queuePropertiesPager) Err() error {
func (p *QueuePager) Err() error {
return p.lastErr
}

func (p *queuePropertiesPager) getNextPage(ctx context.Context) (*ListQueuesResponse, error) {
func (p *QueuePager) getNextPage(ctx context.Context) (*ListQueuesResponse, error) {
var feed *atom.QueueFeed
resp, err := p.innerPager(ctx, &feed)

Expand Down Expand Up @@ -436,21 +413,31 @@ func (p *queuePropertiesPager) getNextPage(ctx context.Context) (*ListQueuesResp
}, nil
}

// queueRuntimePropertiesPager provides iteration over QueueRuntimeProperties pages.
type queueRuntimePropertiesPager struct {
// QueueRuntimePropertiesPager provides iteration over ListQueueRuntimeProperties pages.
type QueueRuntimePropertiesPager struct {
innerPager pagerFunc
lastErr error
lastResponse *ListQueuesRuntimePropertiesResponse
}

// NextPage returns true if the pager advanced to the next page.
// Returns false if there are no more pages or an error occurred.
func (p *queueRuntimePropertiesPager) NextPage(ctx context.Context) bool {
func (p *QueueRuntimePropertiesPager) NextPage(ctx context.Context) bool {
p.lastResponse, p.lastErr = p.getNextPage(ctx)
return p.lastResponse != nil
}

func (p *queueRuntimePropertiesPager) getNextPage(ctx context.Context) (*ListQueuesRuntimePropertiesResponse, error) {
// PageResponse returns the current page.
func (p *QueueRuntimePropertiesPager) PageResponse() *ListQueuesRuntimePropertiesResponse {
return p.lastResponse
}

// Err returns the last error encountered while paging.
func (p *QueueRuntimePropertiesPager) Err() error {
return p.lastErr
}

func (p *QueueRuntimePropertiesPager) getNextPage(ctx context.Context) (*ListQueuesRuntimePropertiesResponse, error) {
var feed *atom.QueueFeed
resp, err := p.innerPager(ctx, &feed)

Expand All @@ -473,16 +460,6 @@ func (p *queueRuntimePropertiesPager) getNextPage(ctx context.Context) (*ListQue
}, nil
}

// PageResponse returns the current page.
func (p *queueRuntimePropertiesPager) PageResponse() *ListQueuesRuntimePropertiesResponse {
return p.lastResponse
}

// Err returns the last error encountered while paging.
func (p *queueRuntimePropertiesPager) Err() error {
return p.lastErr
}

func newQueueEnvelope(props *QueueProperties, tokenProvider auth.TokenProvider) (*atom.QueueEnvelope, []atom.MiddlewareFunc) {
qpr := &atom.QueueDescription{
LockDuration: utils.DurationToStringPtr(props.LockDuration),
Expand Down Expand Up @@ -589,3 +566,8 @@ func int64OrZero(i *int64) int64 {

return *i
}

func queueFeedLen(v interface{}) int {
feed := v.(**atom.QueueFeed)
return len((*feed).Entries)
}
Loading

0 comments on commit 9587539

Please sign in to comment.