diff --git a/sdk/messaging/azeventhubs/consumer_client.go b/sdk/messaging/azeventhubs/consumer_client.go index 757b7bb5b163..944ba250a746 100644 --- a/sdk/messaging/azeventhubs/consumer_client.go +++ b/sdk/messaging/azeventhubs/consumer_client.go @@ -5,24 +5,15 @@ package azeventhubs import ( "context" "crypto/tls" - "errors" "fmt" "net" - "time" "github.com/Azure/azure-sdk-for-go/sdk/azcore" - "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" - "github.com/Azure/azure-sdk-for-go/sdk/internal/log" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap" - "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp" ) -// DefaultConsumerGroup is the name of the default consumer group in the Event Hubs service. -const DefaultConsumerGroup = "$Default" - -// ConsumerClientOptions contains options for the `NewConsumerClient` and `NewConsumerClientFromConnectionString` -// functions. +// ConsumerClientOptions configures optional parameters for a ConsumerClient. type ConsumerClientOptions struct { // TLSConfig configures a client with a custom *tls.Config. TLSConfig *tls.Config @@ -37,77 +28,31 @@ type ConsumerClientOptions struct { // RetryOptions controls how often operations are retried from this client and any // Receivers and Senders created from this client. RetryOptions RetryOptions - - // StartPosition is the position we will start receiving events from, - // either an offset (inclusive) with Offset, or receiving events received - // after a specific time using EnqueuedTime. - StartPosition StartPosition - - // OwnerLevel is the priority for this consumer, also known as the 'epoch' level. - // When used, a consumer with a higher OwnerLevel will take ownership of a partition - // from consumers with a lower OwnerLevel. - // Default is off. - OwnerLevel *int64 -} - -// StartPosition indicates the position to start receiving events within a partition. -// The default position is Latest. -type StartPosition struct { - // Offset will start the consumer after the specified offset. Can be exclusive - // or inclusive, based on the Inclusive property. - // NOTE: offsets are not stable values, and might refer to different events over time - // as the Event Hub events reach their age limit and are discarded. - Offset *int64 - - // SequenceNumber will start the consumer after the specified sequence number. Can be exclusive - // or inclusive, based on the Inclusive property. - SequenceNumber *int64 - - // EnqueuedTime will start the consumer before events that were enqueued on or after EnqueuedTime. - // Can be exclusive or inclusive, based on the Inclusive property. - EnqueuedTime *time.Time - - // Inclusive configures whether the events directly at Offset, SequenceNumber or EnqueuedTime will be included (true) - // or excluded (false). - Inclusive bool - - // Earliest will start the consumer at the earliest event. - Earliest *bool - - // Latest will start the consumer after the last event. - Latest *bool } -// ConsumerClient is used to receive events from an Event Hub partition. +// ConsumerClient can create PartitionClient instances, which can read events from +// a partition. type ConsumerClient struct { + consumerGroup string + eventHub string retryOptions RetryOptions namespace *internal.Namespace - eventHub string - consumerGroup string - partitionID string - ownerLevel *int64 - - offsetExpression string - - links *internal.Links[amqpwrap.AMQPReceiverCloser] + links *internal.Links[amqpwrap.AMQPReceiverCloser] } // NewConsumerClient creates a ConsumerClient which uses an azcore.TokenCredential for authentication. -// The consumerGroup is the consumer group for this consumer. // The fullyQualifiedNamespace is the Event Hubs namespace name (ex: myeventhub.servicebus.windows.net) // The credential is one of the credentials in the `github.com/Azure/azure-sdk-for-go/sdk/azidentity` package. -func NewConsumerClient(fullyQualifiedNamespace string, eventHub string, partitionID string, consumerGroup string, credential azcore.TokenCredential, options *ConsumerClientOptions) (*ConsumerClient, error) { - return newConsumerClientImpl(consumerClientArgs{ +func NewConsumerClient(fullyQualifiedNamespace string, eventHub string, consumerGroup string, credential azcore.TokenCredential, options *ConsumerClientOptions) (*ConsumerClient, error) { + return newConsumerClient(consumerClientArgs{ + consumerGroup: consumerGroup, fullyQualifiedNamespace: fullyQualifiedNamespace, - credential: credential, eventHub: eventHub, - partitionID: partitionID, - consumerGroup: consumerGroup, + credential: credential, }, options) } // NewConsumerClientFromConnectionString creates a ConsumerClient from a connection string. -// The consumerGroup is the consumer group for this consumer. // // connectionString can be one of the following formats: // @@ -116,75 +61,43 @@ func NewConsumerClient(fullyQualifiedNamespace string, eventHub string, partitio // // Connection string, has EntityPath. In this case eventHub must be empty. // ex: Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=;SharedAccessKey=;EntityPath= -func NewConsumerClientFromConnectionString(connectionString string, eventHub string, partitionID string, consumerGroup string, options *ConsumerClientOptions) (*ConsumerClient, error) { +func NewConsumerClientFromConnectionString(connectionString string, eventHub string, consumerGroup string, options *ConsumerClientOptions) (*ConsumerClient, error) { parsedConn, err := parseConn(connectionString, eventHub) if err != nil { return nil, err } - return newConsumerClientImpl(consumerClientArgs{ + return newConsumerClient(consumerClientArgs{ + consumerGroup: consumerGroup, connectionString: connectionString, eventHub: parsedConn.HubName, - partitionID: partitionID, - consumerGroup: consumerGroup, }, options) } -// ReceiveEventsOptions contains optional parameters for the ReceiveEvents function -type ReceiveEventsOptions struct { - // For future expansion -} - -// ReceiveEvents receives events until the context has expired or been cancelled. -func (cc *ConsumerClient) ReceiveEvents(ctx context.Context, count int, options *ReceiveEventsOptions) ([]*ReceivedEventData, error) { - var events []*ReceivedEventData - - err := cc.links.Retry(ctx, EventConsumer, "ReceiveEvents", cc.partitionID, cc.retryOptions, func(ctx context.Context, lwid internal.LinkWithID[amqpwrap.AMQPReceiverCloser]) error { - events = nil - - outstandingCredits := lwid.Link.Credits() - - if count > int(outstandingCredits) { - newCredits := uint32(count) - outstandingCredits - - log.Writef(EventConsumer, "Have %d outstanding credit, only issuing %d credits", outstandingCredits, newCredits) - - if err := lwid.Link.IssueCredit(newCredits); err != nil { - return err - } - } - - for { - amqpMessage, err := lwid.Link.Receive(ctx) - - if err != nil { - prefetched := getAllPrefetched(lwid.Link, count-len(events)) - - for _, amqpMsg := range prefetched { - events = append(events, newReceivedEventData(amqpMsg)) - } - - // this lets cancel errors just return - return err - } - - receivedEvent := newReceivedEventData(amqpMessage) - events = append(events, receivedEvent) - - if len(events) == count { - return nil - } - } - }) +// NewPartitionClientOptions provides options for the Subscribe function. +type NewPartitionClientOptions struct { + // StartPosition is the position we will start receiving events from, + // either an offset (inclusive) with Offset, or receiving events received + // after a specific time using EnqueuedTime. + StartPosition StartPosition - if err != nil && len(events) == 0 { - // TODO: if we get a "partition ownership lost" we need to think about whether that's retryable. - return nil, internal.TransformError(err) - } + // OwnerLevel is the priority for this partition client, also known as the 'epoch' level. + // When used, a partition client with a higher OwnerLevel will take ownership of a partition + // from partition clients with a lower OwnerLevel. + // Default is off. + OwnerLevel *int64 +} - cc.offsetExpression = formatOffsetExpressionForSequence(">", events[len(events)-1].SequenceNumber) - return events, nil +// NewPartitionClient creates a client that can receive events from a partition. +func (cc *ConsumerClient) NewPartitionClient(partitionID string, options *NewPartitionClientOptions) (*PartitionClient, error) { + return newPartitionClient(partitionClientArgs{ + namespace: cc.namespace, + eventHub: cc.eventHub, + partitionID: partitionID, + consumerGroup: cc.consumerGroup, + retryOptions: cc.retryOptions, + }, options) } // GetEventHubProperties gets event hub properties, like the available partition IDs and when the Event Hub was created. @@ -210,108 +123,11 @@ func (cc *ConsumerClient) GetPartitionProperties(ctx context.Context, partitionI return getPartitionProperties(ctx, cc.namespace, rpcLink.Link, cc.eventHub, partitionID, options) } -// Close closes the consumer's link and the underlying AMQP connection. +// Close closes the connection for this client. func (cc *ConsumerClient) Close(ctx context.Context) error { - if err := cc.links.Close(ctx); err != nil { - log.Writef(EventConsumer, "Failed to close link (error might be cached): %s", err.Error()) - } return cc.namespace.Close(ctx, true) } -func getOffsetExpression(startPosition StartPosition) (string, error) { - lt := ">" - - if startPosition.Inclusive { - lt = ">=" - } - - var errMultipleFieldsSet = errors.New("only a single start point can be set: Earliest, EnqueuedTime, Latest, Offset, or SequenceNumber") - - offsetExpr := "" - - if startPosition.EnqueuedTime != nil { - // time-based, non-inclusive - offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-enqueued-time %s '%d'", lt, startPosition.EnqueuedTime.UnixMilli()) - } - - if startPosition.Offset != nil { - // offset-based, non-inclusive - // ex: amqp.annotation.x-opt-enqueued-time %s '165805323000' - if offsetExpr != "" { - return "", errMultipleFieldsSet - } - - offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-offset %s '%d'", lt, *startPosition.Offset) - } - - if startPosition.Latest != nil && *startPosition.Latest { - if offsetExpr != "" { - return "", errMultipleFieldsSet - } - - offsetExpr = "amqp.annotation.x-opt-offset > '@latest'" - } - - if startPosition.SequenceNumber != nil { - if offsetExpr != "" { - return "", errMultipleFieldsSet - } - - offsetExpr = formatOffsetExpressionForSequence(lt, *startPosition.SequenceNumber) - } - - if startPosition.Earliest != nil && *startPosition.Earliest { - if offsetExpr != "" { - return "", errMultipleFieldsSet - } - - return "amqp.annotation.x-opt-offset > '-1'", nil - } - - if offsetExpr != "" { - return offsetExpr, nil - } - - // default to the start - return "amqp.annotation.x-opt-offset > '@latest'", nil -} - -func formatOffsetExpressionForSequence(op string, sequenceNumber int64) string { - return fmt.Sprintf("amqp.annotation.x-opt-sequence-number %s '%d'", op, sequenceNumber) -} - -func (cc *ConsumerClient) getEntityPath(partitionID string) string { - return fmt.Sprintf("%s/ConsumerGroups/%s/Partitions/%s", cc.eventHub, cc.consumerGroup, partitionID) -} - -const defaultLinkRxBuffer = 2048 - -func (cc *ConsumerClient) newEventHubConsumerLink(ctx context.Context, session amqpwrap.AMQPSession, entityPath string) (internal.AMQPReceiverCloser, error) { - var receiverProps map[string]interface{} - - if cc.ownerLevel != nil { - receiverProps = map[string]interface{}{ - "com.microsoft:epoch": *cc.ownerLevel, - } - } - - receiver, err := session.NewReceiver(ctx, entityPath, &amqp.ReceiverOptions{ - SettlementMode: to.Ptr(amqp.ModeFirst), - ManualCredits: true, - Credit: defaultLinkRxBuffer, - Filters: []amqp.LinkFilter{ - amqp.LinkFilterSelector(cc.offsetExpression), - }, - Properties: receiverProps, - }) - - if err != nil { - return nil, err - } - - return receiver, nil -} - type consumerClientArgs struct { connectionString string @@ -319,29 +135,18 @@ type consumerClientArgs struct { fullyQualifiedNamespace string credential azcore.TokenCredential - eventHub string - partitionID string - consumerGroup string + eventHub string } -func newConsumerClientImpl(args consumerClientArgs, options *ConsumerClientOptions) (*ConsumerClient, error) { +func newConsumerClient(args consumerClientArgs, options *ConsumerClientOptions) (*ConsumerClient, error) { if options == nil { options = &ConsumerClientOptions{} } - offsetExpr, err := getOffsetExpression(options.StartPosition) - - if err != nil { - return nil, err - } - client := &ConsumerClient{ - eventHub: args.eventHub, - partitionID: args.partitionID, - ownerLevel: options.OwnerLevel, - consumerGroup: args.consumerGroup, - offsetExpression: offsetExpr, + consumerGroup: args.consumerGroup, + eventHub: args.eventHub, } var nsOptions []internal.NamespaceOption @@ -379,23 +184,7 @@ func newConsumerClientImpl(args consumerClientArgs, options *ConsumerClientOptio } client.namespace = tempNS - client.links = internal.NewLinks(tempNS, fmt.Sprintf("%s/$management", client.eventHub), client.getEntityPath, client.newEventHubConsumerLink) + client.links = internal.NewLinks[amqpwrap.AMQPReceiverCloser](tempNS, fmt.Sprintf("%s/$management", client.eventHub), nil, nil) return client, nil } - -func getAllPrefetched(receiver amqpwrap.AMQPReceiver, max int) []*amqp.Message { - var messages []*amqp.Message - - for i := 0; i < max; i++ { - msg := receiver.Prefetched() - - if msg == nil { - break - } - - messages = append(messages, msg) - } - - return messages -} diff --git a/sdk/messaging/azeventhubs/consumer_client_test.go b/sdk/messaging/azeventhubs/consumer_client_test.go index 691107fd93e2..c7834743aa71 100644 --- a/sdk/messaging/azeventhubs/consumer_client_test.go +++ b/sdk/messaging/azeventhubs/consumer_client_test.go @@ -21,7 +21,7 @@ func TestConsumerClient_DefaultAzureCredential(t *testing.T) { require.NoError(t, err) t.Run("EventHubProperties and PartitionProperties", func(t *testing.T) { - consumerClient, err := azeventhubs.NewConsumerClient(testParams.EventHubNamespace, testParams.EventHubName, "0", azeventhubs.DefaultConsumerGroup, dac, nil) + consumerClient, err := azeventhubs.NewConsumerClient(testParams.EventHubNamespace, testParams.EventHubName, azeventhubs.DefaultConsumerGroup, dac, nil) require.NoError(t, err) defer func() { @@ -66,10 +66,7 @@ func TestConsumerClient_DefaultAzureCredential(t *testing.T) { firstPartition, err := producerClient.GetPartitionProperties(context.Background(), "0", nil) require.NoError(t, err) - consumerClient, err := azeventhubs.NewConsumerClient(testParams.EventHubNamespace, testParams.EventHubName, firstPartition.PartitionID, azeventhubs.DefaultConsumerGroup, dac, - &azeventhubs.ConsumerClientOptions{ - StartPosition: getStartPosition(firstPartition), - }) + consumerClient, err := azeventhubs.NewConsumerClient(testParams.EventHubNamespace, testParams.EventHubName, azeventhubs.DefaultConsumerGroup, dac, nil) require.NoError(t, err) defer func() { @@ -90,12 +87,23 @@ func TestConsumerClient_DefaultAzureCredential(t *testing.T) { err = producerClient.SendEventBatch(context.Background(), eventDataBatch, nil) require.NoError(t, err) + subscription, err := consumerClient.NewPartitionClient(firstPartition.PartitionID, &azeventhubs.NewPartitionClientOptions{ + StartPosition: getStartPosition(firstPartition), + }) + require.NoError(t, err) + require.NotNil(t, subscription) + + defer func() { + err := subscription.Close(context.Background()) + require.NoError(t, err) + }() + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - events, err := consumerClient.ReceiveEvents(ctx, 1, nil) + events, err := subscription.ReceiveEvents(ctx, 1, nil) require.NoError(t, err) - require.NotEmpty(t, events) + require.Equal(t, "hello", string(events[0].Body)) consumerPart, err := consumerClient.GetPartitionProperties(context.Background(), firstPartition.PartitionID, nil) @@ -108,7 +116,7 @@ func TestConsumerClient_DefaultAzureCredential(t *testing.T) { }) t.Run("EventHubProperties and PartitionProperties after send", func(t *testing.T) { - consumerClient, err := azeventhubs.NewConsumerClient(testParams.EventHubNamespace, testParams.EventHubName, "0", azeventhubs.DefaultConsumerGroup, dac, nil) + consumerClient, err := azeventhubs.NewConsumerClient(testParams.EventHubNamespace, testParams.EventHubName, azeventhubs.DefaultConsumerGroup, dac, nil) require.NoError(t, err) defer func() { @@ -140,13 +148,12 @@ func TestConsumerClient_DefaultAzureCredential(t *testing.T) { require.Equal(t, producerPartProps, consumerPartProps) }) - } func TestConsumerClient_GetHubAndPartitionProperties(t *testing.T) { testParams := getConnectionParams(t) - consumer, err := azeventhubs.NewConsumerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, "0", azeventhubs.DefaultConsumerGroup, nil) + consumer, err := azeventhubs.NewConsumerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, azeventhubs.DefaultConsumerGroup, nil) require.NoError(t, err) defer func() { @@ -169,16 +176,14 @@ func TestConsumerClient_GetHubAndPartitionProperties(t *testing.T) { func TestConsumerClient_Concurrent_NoEpoch(t *testing.T) { testParams := getConnectionParams(t) - partitions := mustSendEventsToAllPartitions(t, testParams.ConnectionString, testParams.EventHubName, []*azeventhubs.EventData{ + partitions := mustSendEventsToAllPartitions(t, []*azeventhubs.EventData{ {Body: []byte("hello world")}, }) const simultaneousClients = 5 // max you can have with a single consumer group for a single partition for i := 0; i < simultaneousClients; i++ { - client, err := azeventhubs.NewConsumerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, partitions[0].PartitionID, "$Default", &azeventhubs.ConsumerClientOptions{ - StartPosition: getStartPosition(partitions[0]), - }) + client, err := azeventhubs.NewConsumerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, "$Default", nil) require.NoError(t, err) // We want all the clients open while this for loop is going. @@ -187,117 +192,124 @@ func TestConsumerClient_Concurrent_NoEpoch(t *testing.T) { require.NoError(t, err) }() + partitionClient, err := client.NewPartitionClient(partitions[0].PartitionID, &azeventhubs.NewPartitionClientOptions{ + StartPosition: getStartPosition(partitions[0]), + }) + require.NoError(t, err) + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - events, err := client.ReceiveEvents(ctx, 1, nil) + events, err := partitionClient.ReceiveEvents(ctx, 1, nil) require.NoError(t, err) require.Equal(t, 1, len(events)) } } -func TestConsumerClient_SameEpoch(t *testing.T) { - testParams := getConnectionParams(t) - - highestEpoch := int64(2) - - partitions := mustSendEventsToAllPartitions(t, testParams.ConnectionString, testParams.EventHubName, []*azeventhubs.EventData{ +func TestConsumerClient_SameEpoch_StealsLink(t *testing.T) { + partitions := mustSendEventsToAllPartitions(t, []*azeventhubs.EventData{ {Body: []byte("hello world 1")}, - {Body: []byte("hello world 2")}, }) - clientA, err := azeventhubs.NewConsumerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, partitions[0].PartitionID, "$Default", &azeventhubs.ConsumerClientOptions{ + ownerLevel := int64(2) + + origPartClient, cleanup := newPartitionClientForTest(t, partitions[0].PartitionID, azeventhubs.NewPartitionClientOptions{ StartPosition: getStartPosition(partitions[0]), - OwnerLevel: &highestEpoch, - // Today we treat the link stolen error as retryable. I've filed an issue to look at making this fatal - // instead since it's likely to be a configuration/runtime issue where the user has two consumers - // starting up with the same ownerlevel. Having them fight with retries is probably undesirable. - RetryOptions: azeventhubs.RetryOptions{ - MaxRetries: -1, - }, + OwnerLevel: &ownerLevel, }) - require.NoError(t, err) - - defer func() { - err := clientA.Close(context.Background()) - require.NoError(t, err) - }() + defer cleanup() ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - events, err := clientA.ReceiveEvents(ctx, 1, nil) + // open up a link, with an owner level of 2 + events, err := origPartClient.ReceiveEvents(ctx, 1, nil) require.NoError(t, err) - require.Equal(t, 1, len(events)) - - // we have an active link, using epoch/owner level 1 - - /* - It might be worth making this a terminal error since it's typically not something you'd consider normal - and probably indicates a configuration error. + require.NotEmpty(t, events) - (connlost): link detached, reason: *Error{Condition: amqp:link:stolen, Description: New receiver 'nil' with higher epoch of '1' is created hence current receiver 'nil' with epoch '1' is getting disconnected. If you are recreating the receiver, make sure a higher epoch is used. - */ + // link with owner level of 2 is alive, so now we'll steal it. - // now we'll take over with another client. - clientB, err := azeventhubs.NewConsumerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, partitions[0].PartitionID, "$Default", &azeventhubs.ConsumerClientOptions{ + thiefPartClient, cleanup := newPartitionClientForTest(t, partitions[0].PartitionID, azeventhubs.NewPartitionClientOptions{ StartPosition: getStartPosition(partitions[0]), - OwnerLevel: &highestEpoch, + OwnerLevel: &ownerLevel, }) - require.NoError(t, err) - - defer func() { - err := clientB.Close(context.Background()) - require.NoError(t, err) - }() + defer cleanup() ctx, cancel = context.WithTimeout(context.Background(), time.Minute) defer cancel() - // no need to timeout here, we know there's one event. - events, err = clientB.ReceiveEvents(ctx, 1, nil) + events, err = thiefPartClient.ReceiveEvents(ctx, 1, nil) require.NoError(t, err) require.NotEmpty(t, events) - // now if we attempt to use the first client we'll get an error that - // we've lost ownership. + // the link has been stolen at this point - 'stealerPartClient' owns the link since it's last-in-wins. + + // using the original link reports that it was stolen ctx, cancel = context.WithTimeout(context.Background(), time.Minute) defer cancel() - events, err = clientA.ReceiveEvents(ctx, 1, nil) + events, err = origPartClient.ReceiveEvents(ctx, 1, nil) require.Error(t, err) require.Contains(t, err.Error(), "amqp:link:stolen") require.Empty(t, events) +} - // now let's try to spin up another receiver, but with a _lower_ owner level. - lowerEpochClient, err := azeventhubs.NewConsumerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, partitions[0].PartitionID, "$Default", &azeventhubs.ConsumerClientOptions{ +func TestConsumerClient_LowerEpochsAreRejected(t *testing.T) { + partitions := mustSendEventsToAllPartitions(t, []*azeventhubs.EventData{ + {Body: []byte("hello world 1")}, + {Body: []byte("hello world 2")}, + }) + + highestOwnerLevel := int64(2) + + origPartClient, cleanup := newPartitionClientForTest(t, partitions[0].PartitionID, azeventhubs.NewPartitionClientOptions{ StartPosition: getStartPosition(partitions[0]), - OwnerLevel: to.Ptr(highestEpoch - 1), // lower owner level than the max - automatically loses. - // Today we treat the link stolen error as retryable. I've filed an issue to look at making this fatal - // instead since it's likely to be a configuration/runtime issue where the user has two consumers - // starting up with the same ownerlevel. Having them fight with retries is probably undesirable. - RetryOptions: azeventhubs.RetryOptions{ - MaxRetries: -1, - }, + OwnerLevel: &highestOwnerLevel, }) + defer cleanup() + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + events, err := origPartClient.ReceiveEvents(ctx, 1, nil) require.NoError(t, err) + require.NotEmpty(t, events) - defer func() { - err := lowerEpochClient.Close(context.Background()) - require.NoError(t, err) - }() + lowerOwnerLevels := []*int64{ + nil, // no owner level + to.Ptr(highestOwnerLevel - 1), + } - ctx, cancel = context.WithTimeout(context.Background(), time.Minute) - defer cancel() + for _, ownerLevel := range lowerOwnerLevels { + origPartClient, cleanup := newPartitionClientForTest(t, partitions[0].PartitionID, azeventhubs.NewPartitionClientOptions{ + StartPosition: getStartPosition(partitions[0]), + OwnerLevel: ownerLevel, + }) + defer cleanup() - _, err = lowerEpochClient.ReceiveEvents(ctx, 1, nil) - require.Error(t, err) - require.Contains(t, err.Error(), "amqp:link:stolen") + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() - // and lastly, one without an owner level at all. - noEpochClient, err := azeventhubs.NewConsumerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, partitions[0].PartitionID, "$Default", &azeventhubs.ConsumerClientOptions{ - StartPosition: getStartPosition(partitions[0]), + events, err := origPartClient.ReceiveEvents(ctx, 1, nil) + require.Error(t, err) + // The typical error message is like this: + // At least one receiver for the endpoint is created with epoch of '2', and so non-epoch receiver is not allowed. + // Either reconnect with a higher epoch, or make sure all epoch receivers are closed or disconnected. + require.Contains(t, err.Error(), "amqp:link:stolen") + require.Empty(t, events) + } + + // and the original client is unaffected + events, err = origPartClient.ReceiveEvents(ctx, 1, nil) + require.NoError(t, err) + require.NotEmpty(t, events) +} + +func newPartitionClientForTest(t *testing.T, partitionID string, subscribeOptions azeventhubs.NewPartitionClientOptions) (*azeventhubs.PartitionClient, func()) { + testParams := getConnectionParams(t) + + origClient, err := azeventhubs.NewConsumerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, "$Default", &azeventhubs.ConsumerClientOptions{ // Today we treat the link stolen error as retryable. I've filed an issue to look at making this fatal // instead since it's likely to be a configuration/runtime issue where the user has two consumers // starting up with the same ownerlevel. Having them fight with retries is probably undesirable. @@ -307,17 +319,16 @@ func TestConsumerClient_SameEpoch(t *testing.T) { }) require.NoError(t, err) - defer func() { - err := noEpochClient.Close(context.Background()) - require.NoError(t, err) - }() + partClient, err := origClient.NewPartitionClient(partitionID, &subscribeOptions) + require.NoError(t, err) - ctx, cancel = context.WithTimeout(context.Background(), time.Minute) - defer cancel() + return partClient, func() { + err := partClient.Close(context.Background()) + require.NoError(t, err) - _, err = noEpochClient.ReceiveEvents(ctx, 1, nil) - require.Error(t, err) - require.Contains(t, err.Error(), "amqp:link:stolen") + err = origClient.Close(context.Background()) + require.NoError(t, err) + } } func TestConsumerClient_StartPositions(t *testing.T) { @@ -354,7 +365,15 @@ func TestConsumerClient_StartPositions(t *testing.T) { require.NoError(t, err) t.Run("offset", func(t *testing.T) { - consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, "0", azeventhubs.DefaultConsumerGroup, &azeventhubs.ConsumerClientOptions{ + consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, azeventhubs.DefaultConsumerGroup, nil) + require.NoError(t, err) + + defer func() { + err := consumerClient.Close(context.Background()) + require.NoError(t, err) + }() + + subscription, err := consumerClient.NewPartitionClient("0", &azeventhubs.NewPartitionClientOptions{ StartPosition: azeventhubs.StartPosition{ Offset: &origPartProps.LastEnqueuedOffset, }, @@ -362,24 +381,20 @@ func TestConsumerClient_StartPositions(t *testing.T) { require.NoError(t, err) defer func() { - err := consumerClient.Close(context.Background()) + err := subscription.Close(context.Background()) require.NoError(t, err) }() ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - events, err := consumerClient.ReceiveEvents(ctx, 2, nil) + events, err := subscription.ReceiveEvents(ctx, 2, nil) require.NoError(t, err) require.Equal(t, []string{"message 1", "message 2"}, getSortedBodies(events)) }) t.Run("enqueuedTime", func(t *testing.T) { - consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, "0", azeventhubs.DefaultConsumerGroup, &azeventhubs.ConsumerClientOptions{ - StartPosition: azeventhubs.StartPosition{ - EnqueuedTime: &origPartProps.LastEnqueuedOn, - }, - }) + consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, azeventhubs.DefaultConsumerGroup, nil) require.NoError(t, err) defer func() { @@ -387,24 +402,38 @@ func TestConsumerClient_StartPositions(t *testing.T) { require.NoError(t, err) }() + subscription, err := consumerClient.NewPartitionClient("0", &azeventhubs.NewPartitionClientOptions{ + StartPosition: azeventhubs.StartPosition{ + EnqueuedTime: &origPartProps.LastEnqueuedOn, + }, + }) + require.NoError(t, err) + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - events, err := consumerClient.ReceiveEvents(ctx, 2, nil) + events, err := subscription.ReceiveEvents(ctx, 2, nil) require.NoError(t, err) require.Equal(t, []string{"message 1", "message 2"}, getSortedBodies(events)) }) t.Run("earliest", func(t *testing.T) { - consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, "0", azeventhubs.DefaultConsumerGroup, &azeventhubs.ConsumerClientOptions{ + consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, azeventhubs.DefaultConsumerGroup, nil) + require.NoError(t, err) + + defer func() { + err := consumerClient.Close(context.Background()) + require.NoError(t, err) + }() + + subscription, err := consumerClient.NewPartitionClient("0", &azeventhubs.NewPartitionClientOptions{ StartPosition: azeventhubs.StartPosition{ Earliest: to.Ptr(true), }, }) require.NoError(t, err) - defer func() { - err := consumerClient.Close(context.Background()) + err := subscription.Close(context.Background()) require.NoError(t, err) }() @@ -412,7 +441,7 @@ func TestConsumerClient_StartPositions(t *testing.T) { defer cancel() // we know there are _at_ two events but it's okay if they're just any events. - events, err := consumerClient.ReceiveEvents(ctx, 2, nil) + events, err := subscription.ReceiveEvents(ctx, 2, nil) require.NoError(t, err) require.Equal(t, 2, len(events)) }) @@ -421,12 +450,7 @@ func TestConsumerClient_StartPositions(t *testing.T) { func TestConsumerClient_StartPosition_Latest(t *testing.T) { testParams := getConnectionParams(t) - consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, "0", azeventhubs.DefaultConsumerGroup, - &azeventhubs.ConsumerClientOptions{ - StartPosition: azeventhubs.StartPosition{ - Latest: to.Ptr(true), - }, - }) + consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, azeventhubs.DefaultConsumerGroup, nil) require.NoError(t, err) defer func() { @@ -441,7 +465,19 @@ func TestConsumerClient_StartPosition_Latest(t *testing.T) { latestEventsCh := make(chan []*azeventhubs.ReceivedEventData, 1) go func() { - events, err := consumerClient.ReceiveEvents(context.Background(), 2, nil) + subscription, err := consumerClient.NewPartitionClient("0", &azeventhubs.NewPartitionClientOptions{ + StartPosition: azeventhubs.StartPosition{ + Latest: to.Ptr(true), + }, + }) + require.NoError(t, err) + + defer func() { + err := subscription.Close(context.Background()) + require.NoError(t, err) + }() + + events, err := subscription.ReceiveEvents(context.Background(), 2, nil) require.NoError(t, err) latestEventsCh <- events }() @@ -489,8 +525,9 @@ func TestConsumerClient_StartPosition_Latest(t *testing.T) { // NOTE: the message that's passed in does get altered so don't count on it being unchanged after calling // this function. Each message gets an additional property (DestPartitionID), set to the parttion ID that // we sent it to. -func mustSendEventsToAllPartitions(t *testing.T, cs string, eventHub string, events []*azeventhubs.EventData) []azeventhubs.PartitionProperties { - producer, err := azeventhubs.NewProducerClientFromConnectionString(cs, eventHub, nil) +func mustSendEventsToAllPartitions(t *testing.T, events []*azeventhubs.EventData) []azeventhubs.PartitionProperties { + testParams := getConnectionParams(t) + producer, err := azeventhubs.NewProducerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, nil) require.NoError(t, err) defer func() { diff --git a/sdk/messaging/azeventhubs/consumer_client_unit_test.go b/sdk/messaging/azeventhubs/consumer_client_unit_test.go index 6501e6bb34b6..900e46d4d37f 100644 --- a/sdk/messaging/azeventhubs/consumer_client_unit_test.go +++ b/sdk/messaging/azeventhubs/consumer_client_unit_test.go @@ -15,12 +15,12 @@ func TestUnitNewConsumerClient(t *testing.T) { t.Run("ConnectionStringNoEntityPath", func(t *testing.T) { connectionStringNoEntityPath := "Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=;SharedAccessKey=" - client, err := NewConsumerClientFromConnectionString(connectionStringNoEntityPath, "eventHubName", "0", "$Default", nil) + client, err := NewConsumerClientFromConnectionString(connectionStringNoEntityPath, "eventHubName", DefaultConsumerGroup, nil) require.NoError(t, err) require.NotNil(t, client) require.Equal(t, "eventHubName", client.eventHub) - client, err = NewConsumerClientFromConnectionString(connectionStringNoEntityPath, "", "0", "$Default", nil) + client, err = NewConsumerClientFromConnectionString(connectionStringNoEntityPath, "", DefaultConsumerGroup, nil) require.EqualError(t, err, "connection string does not contain an EntityPath. eventHub cannot be an empty string") require.Nil(t, client) }) @@ -28,19 +28,19 @@ func TestUnitNewConsumerClient(t *testing.T) { t.Run("ConnectionStringWithEntityPath", func(t *testing.T) { connectionStringWithEntityPath := "Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=;SharedAccessKey=;EntityPath=eventHubName" - client, err := NewConsumerClientFromConnectionString(connectionStringWithEntityPath, "", "0", "$Default", nil) + client, err := NewConsumerClientFromConnectionString(connectionStringWithEntityPath, "", DefaultConsumerGroup, nil) require.NoError(t, err) require.NotNil(t, client) require.Equal(t, "eventHubName", client.eventHub) - client, err = NewConsumerClientFromConnectionString(connectionStringWithEntityPath, "eventHubName", "0", "$Default", nil) + client, err = NewConsumerClientFromConnectionString(connectionStringWithEntityPath, "eventHubName", DefaultConsumerGroup, nil) require.EqualError(t, err, "connection string contains an EntityPath. eventHub must be an empty string") require.Nil(t, client) }) t.Run("TokenCredential", func(t *testing.T) { tokenCredential := fakeTokenCredential{} - client, err := NewConsumerClient("ripark.servicebus.windows.net", "eventHubName", "0", "$Default", tokenCredential, nil) + client, err := NewConsumerClient("ripark.servicebus.windows.net", "eventHubName", DefaultConsumerGroup, tokenCredential, nil) require.NoError(t, err) require.NotNil(t, client) require.Equal(t, "eventHubName", client.eventHub) diff --git a/sdk/messaging/azeventhubs/example_consumerclient_test.go b/sdk/messaging/azeventhubs/example_consumerclient_test.go index 660cf0efe789..ce70caccdb8f 100644 --- a/sdk/messaging/azeventhubs/example_consumerclient_test.go +++ b/sdk/messaging/azeventhubs/example_consumerclient_test.go @@ -11,7 +11,6 @@ import ( ) var consumerClient *azeventhubs.ConsumerClient -var consumerGroup string var err error func ExampleNewConsumerClient() { @@ -21,7 +20,7 @@ func ExampleNewConsumerClient() { panic(err) } - consumerClient, err = azeventhubs.NewConsumerClient("", "eventhub-name", "partition id", consumerGroup, defaultAzureCred, nil) + consumerClient, err = azeventhubs.NewConsumerClient("", "eventhub-name", azeventhubs.DefaultConsumerGroup, defaultAzureCred, nil) if err != nil { panic(err) @@ -32,21 +31,31 @@ func ExampleNewConsumerClientFromConnectionString() { // if the connection string contains an EntityPath // connectionString := "Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=;SharedAccessKey=;EntityPath=" - consumerClient, err = azeventhubs.NewConsumerClientFromConnectionString(connectionString, "", "partition id", consumerGroup, nil) + consumerClient, err = azeventhubs.NewConsumerClientFromConnectionString(connectionString, "", azeventhubs.DefaultConsumerGroup, nil) // or // if the connection string does not contain an EntityPath connectionString = "Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=;SharedAccessKey=" - consumerClient, err = azeventhubs.NewConsumerClientFromConnectionString(connectionString, "eventhub-name", "partition id", consumerGroup, nil) + consumerClient, err = azeventhubs.NewConsumerClientFromConnectionString(connectionString, "eventhub-name", azeventhubs.DefaultConsumerGroup, nil) if err != nil { panic(err) } } -func ExampleConsumerClient_ReceiveEvents() { - events, err := consumerClient.ReceiveEvents(context.TODO(), 100, nil) +func ExampleConsumerClient_NewPartitionClient_receiveEvents() { + const partitionID = "0" + + partitionClient, err := consumerClient.NewPartitionClient(partitionID, nil) + + if err != nil { + panic(err) + } + + defer partitionClient.Close(context.TODO()) + + events, err := partitionClient.ReceiveEvents(context.TODO(), 100, nil) if err != nil { panic(err) diff --git a/sdk/messaging/azeventhubs/example_consuming_events_test.go b/sdk/messaging/azeventhubs/example_consuming_events_test.go index 11d5883cfb85..1ab7c80e38e9 100644 --- a/sdk/messaging/azeventhubs/example_consuming_events_test.go +++ b/sdk/messaging/azeventhubs/example_consuming_events_test.go @@ -27,7 +27,7 @@ func Example_consuming() { // // consumerClient, err = azeventhubs.NewConsumerClientFromConnectionString(connectionString, "", "partition id", consumerGroup, nil) // - consumerClient, err = azeventhubs.NewConsumerClient(eventHubNamespace, eventHubName, eventHubPartitionID, azeventhubs.DefaultConsumerGroup, defaultAzureCred, nil) + consumerClient, err = azeventhubs.NewConsumerClient(eventHubNamespace, eventHubName, azeventhubs.DefaultConsumerGroup, defaultAzureCred, nil) if err != nil { panic(err) @@ -35,12 +35,20 @@ func Example_consuming() { defer consumerClient.Close(context.TODO()) + subscription, err := consumerClient.NewPartitionClient(eventHubPartitionID, nil) + + if err != nil { + panic(err) + } + + defer subscription.Close(context.TODO()) + for { // ReceiveEvents will wait until it either receives the # of events requested (100, in this call) // or if the context is cancelled, in which case it'll return any messages it has received. - ctx, cancel := context.WithTimeout(context.TODO(), time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - events, err := consumerClient.ReceiveEvents(ctx, 100, nil) + events, err := subscription.ReceiveEvents(ctx, 100, nil) cancel() if err != nil { diff --git a/sdk/messaging/azeventhubs/internal/eh/stress/tests/finite_send_and_receive.go b/sdk/messaging/azeventhubs/internal/eh/stress/tests/finite_send_and_receive.go index 7674a88ac99b..ebb3e17c464e 100644 --- a/sdk/messaging/azeventhubs/internal/eh/stress/tests/finite_send_and_receive.go +++ b/sdk/messaging/azeventhubs/internal/eh/stress/tests/finite_send_and_receive.go @@ -173,9 +173,7 @@ func consumeEventsFromPartition(cs string, hubName string, partProps azeventhubs } } - consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(cs, hubName, partProps.PartitionID, azeventhubs.DefaultConsumerGroup, &azeventhubs.ConsumerClientOptions{ - StartPosition: startPosition, - }) + consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(cs, hubName, azeventhubs.DefaultConsumerGroup, nil) if err != nil { return err @@ -190,6 +188,14 @@ func consumeEventsFromPartition(cs string, hubName string, partProps azeventhubs // read in 10 second chunks. If we ever end a 10 second chunk with no messages // then we've probably just failed. + subscription, err := consumerClient.NewPartitionClient(partProps.PartitionID, &azeventhubs.NewPartitionClientOptions{ + StartPosition: startPosition, + }) + + if err != nil { + panic(err) + } + sequenceNumbers := map[int64]int64{} numEmptyBatches := 0 @@ -200,7 +206,7 @@ func consumeEventsFromPartition(cs string, hubName string, partProps azeventhubs ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - events, err := consumerClient.ReceiveEvents(ctx, 1000, nil) + events, err := subscription.ReceiveEvents(ctx, 1000, nil) if err != nil { if errors.Is(err, context.DeadlineExceeded) { diff --git a/sdk/messaging/azeventhubs/partition_client.go b/sdk/messaging/azeventhubs/partition_client.go new file mode 100644 index 000000000000..e0b07c0f8a69 --- /dev/null +++ b/sdk/messaging/azeventhubs/partition_client.go @@ -0,0 +1,267 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package azeventhubs + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/azure-sdk-for-go/sdk/internal/log" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp" +) + +// DefaultConsumerGroup is the name of the default consumer group in the Event Hubs service. +const DefaultConsumerGroup = "$Default" + +// StartPosition indicates the position to start receiving events within a partition. +// The default position is Latest. +type StartPosition struct { + // Offset will start the consumer after the specified offset. Can be exclusive + // or inclusive, based on the Inclusive property. + // NOTE: offsets are not stable values, and might refer to different events over time + // as the Event Hub events reach their age limit and are discarded. + Offset *int64 + + // SequenceNumber will start the consumer after the specified sequence number. Can be exclusive + // or inclusive, based on the Inclusive property. + SequenceNumber *int64 + + // EnqueuedTime will start the consumer before events that were enqueued on or after EnqueuedTime. + // Can be exclusive or inclusive, based on the Inclusive property. + EnqueuedTime *time.Time + + // Inclusive configures whether the events directly at Offset, SequenceNumber or EnqueuedTime will be included (true) + // or excluded (false). + Inclusive bool + + // Earliest will start the consumer at the earliest event. + Earliest *bool + + // Latest will start the consumer after the last event. + Latest *bool +} + +// PartitionClient is used to receive events from an Event Hub partition. +type PartitionClient struct { + retryOptions RetryOptions + eventHub string + consumerGroup string + partitionID string + ownerLevel *int64 + + offsetExpression string + + links *internal.Links[amqpwrap.AMQPReceiverCloser] +} + +// ReceiveEventsOptions contains optional parameters for the ReceiveEvents function +type ReceiveEventsOptions struct { + // For future expansion +} + +// ReceiveEvents receives events until the context has expired or been cancelled. +func (cc *PartitionClient) ReceiveEvents(ctx context.Context, count int, options *ReceiveEventsOptions) ([]*ReceivedEventData, error) { + var events []*ReceivedEventData + + err := cc.links.Retry(ctx, EventConsumer, "ReceiveEvents", cc.partitionID, cc.retryOptions, func(ctx context.Context, lwid internal.LinkWithID[amqpwrap.AMQPReceiverCloser]) error { + events = nil + + outstandingCredits := lwid.Link.Credits() + + if count > int(outstandingCredits) { + newCredits := uint32(count) - outstandingCredits + + log.Writef(EventConsumer, "Have %d outstanding credit, only issuing %d credits", outstandingCredits, newCredits) + + if err := lwid.Link.IssueCredit(newCredits); err != nil { + return err + } + } + + for { + amqpMessage, err := lwid.Link.Receive(ctx) + + if err != nil { + prefetched := getAllPrefetched(lwid.Link, count-len(events)) + + for _, amqpMsg := range prefetched { + events = append(events, newReceivedEventData(amqpMsg)) + } + + // this lets cancel errors just return + return err + } + + receivedEvent := newReceivedEventData(amqpMessage) + events = append(events, receivedEvent) + + if len(events) == count { + return nil + } + } + }) + + if err != nil && len(events) == 0 { + // TODO: if we get a "partition ownership lost" we need to think about whether that's retryable. + return nil, internal.TransformError(err) + } + + cc.offsetExpression = formatOffsetExpressionForSequence(">", events[len(events)-1].SequenceNumber) + return events, nil +} + +// Close closes the consumer's link and the underlying AMQP connection. +func (cc *PartitionClient) Close(ctx context.Context) error { + return cc.links.Close(ctx) +} + +func (s *PartitionClient) getEntityPath(partitionID string) string { + return fmt.Sprintf("%s/ConsumerGroups/%s/Partitions/%s", s.eventHub, s.consumerGroup, partitionID) +} + +const defaultLinkRxBuffer = 2048 + +func (s *PartitionClient) newEventHubConsumerLink(ctx context.Context, session amqpwrap.AMQPSession, entityPath string) (internal.AMQPReceiverCloser, error) { + var props map[string]interface{} + + if s.ownerLevel != nil { + props = map[string]interface{}{ + "com.microsoft:epoch": *s.ownerLevel, + } + } + + receiver, err := session.NewReceiver(ctx, entityPath, &amqp.ReceiverOptions{ + SettlementMode: to.Ptr(amqp.ModeFirst), + ManualCredits: true, + Credit: defaultLinkRxBuffer, + Filters: []amqp.LinkFilter{ + amqp.LinkFilterSelector(s.offsetExpression), + }, + Properties: props, + }) + + if err != nil { + return nil, err + } + + return receiver, nil +} + +type partitionClientArgs struct { + namespace *internal.Namespace + + eventHub string + partitionID string + + consumerGroup string + + retryOptions RetryOptions +} + +func newPartitionClient(args partitionClientArgs, options *NewPartitionClientOptions) (*PartitionClient, error) { + if options == nil { + options = &NewPartitionClientOptions{} + } + + offsetExpr, err := getOffsetExpression(options.StartPosition) + + if err != nil { + return nil, err + } + + client := &PartitionClient{ + eventHub: args.eventHub, + partitionID: args.partitionID, + ownerLevel: options.OwnerLevel, + consumerGroup: args.consumerGroup, + offsetExpression: offsetExpr, + retryOptions: args.retryOptions, + } + + client.links = internal.NewLinks(args.namespace, fmt.Sprintf("%s/$management", client.eventHub), client.getEntityPath, client.newEventHubConsumerLink) + + return client, nil +} + +func getAllPrefetched(receiver amqpwrap.AMQPReceiver, max int) []*amqp.Message { + var messages []*amqp.Message + + for i := 0; i < max; i++ { + msg := receiver.Prefetched() + + if msg == nil { + break + } + + messages = append(messages, msg) + } + + return messages +} + +func getOffsetExpression(startPosition StartPosition) (string, error) { + lt := ">" + + if startPosition.Inclusive { + lt = ">=" + } + + var errMultipleFieldsSet = errors.New("only a single start point can be set: Earliest, EnqueuedTime, Latest, Offset, or SequenceNumber") + + offsetExpr := "" + + if startPosition.EnqueuedTime != nil { + // time-based, non-inclusive + offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-enqueued-time %s '%d'", lt, startPosition.EnqueuedTime.UnixMilli()) + } + + if startPosition.Offset != nil { + // offset-based, non-inclusive + // ex: amqp.annotation.x-opt-enqueued-time %s '165805323000' + if offsetExpr != "" { + return "", errMultipleFieldsSet + } + + offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-offset %s '%d'", lt, *startPosition.Offset) + } + + if startPosition.Latest != nil && *startPosition.Latest { + if offsetExpr != "" { + return "", errMultipleFieldsSet + } + + offsetExpr = "amqp.annotation.x-opt-offset > '@latest'" + } + + if startPosition.SequenceNumber != nil { + if offsetExpr != "" { + return "", errMultipleFieldsSet + } + + offsetExpr = formatOffsetExpressionForSequence(lt, *startPosition.SequenceNumber) + } + + if startPosition.Earliest != nil && *startPosition.Earliest { + if offsetExpr != "" { + return "", errMultipleFieldsSet + } + + return "amqp.annotation.x-opt-offset > '-1'", nil + } + + if offsetExpr != "" { + return offsetExpr, nil + } + + // default to the start + return "amqp.annotation.x-opt-offset > '@latest'", nil +} + +func formatOffsetExpressionForSequence(op string, sequenceNumber int64) string { + return fmt.Sprintf("amqp.annotation.x-opt-sequence-number %s '%d'", op, sequenceNumber) +} diff --git a/sdk/messaging/azeventhubs/producer_client_test.go b/sdk/messaging/azeventhubs/producer_client_test.go index c50d7698bd1c..3812ac31bec5 100644 --- a/sdk/messaging/azeventhubs/producer_client_test.go +++ b/sdk/messaging/azeventhubs/producer_client_test.go @@ -107,9 +107,7 @@ func TestNewProducerClient_SendToAny(t *testing.T) { go func(partProps azeventhubs.PartitionProperties) { defer wg.Done() - consumer, err := azeventhubs.NewConsumerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, partProps.PartitionID, azeventhubs.DefaultConsumerGroup, &azeventhubs.ConsumerClientOptions{ - StartPosition: getStartPosition(partProps), - }) + consumer, err := azeventhubs.NewConsumerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, azeventhubs.DefaultConsumerGroup, nil) require.NoError(t, err) defer func() { @@ -117,7 +115,12 @@ func TestNewProducerClient_SendToAny(t *testing.T) { require.NoError(t, err) }() - events, err := consumer.ReceiveEvents(ctx, 1, nil) + subscription, err := consumer.NewPartitionClient(partProps.PartitionID, &azeventhubs.NewPartitionClientOptions{ + StartPosition: getStartPosition(partProps), + }) + require.NoError(t, err) + + events, err := subscription.ReceiveEvents(ctx, 1, nil) if err != nil { if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { @@ -180,9 +183,7 @@ func sendAndReceiveToPartitionTest(t *testing.T, cs string, eventHubName string, partProps, err := producer.GetPartitionProperties(context.Background(), partitionID, &azeventhubs.GetPartitionPropertiesOptions{}) require.NoError(t, err) - consumer, err := azeventhubs.NewConsumerClientFromConnectionString(cs, eventHubName, partitionID, azeventhubs.DefaultConsumerGroup, &azeventhubs.ConsumerClientOptions{ - StartPosition: getStartPosition(partProps), - }) + consumer, err := azeventhubs.NewConsumerClientFromConnectionString(cs, eventHubName, azeventhubs.DefaultConsumerGroup, nil) require.NoError(t, err) defer func() { @@ -222,8 +223,13 @@ func sendAndReceiveToPartitionTest(t *testing.T, cs string, eventHubName string, var actualBodies []string + subscription, err := consumer.NewPartitionClient(partitionID, &azeventhubs.NewPartitionClientOptions{ + StartPosition: getStartPosition(partProps), + }) + require.NoError(t, err) + for { - events, err := consumer.ReceiveEvents(ctx, 100, nil) + events, err := subscription.ReceiveEvents(ctx, 100, nil) require.NoError(t, err) for _, event := range events { @@ -242,11 +248,13 @@ func sendAndReceiveToPartitionTest(t *testing.T, cs string, eventHubName string, require.Equal(t, expectedBodies, actualBodies) } -func getConnectionParams(t *testing.T) struct { +type connectionParams struct { ConnectionString string EventHubName string EventHubNamespace string -} { +} + +func getConnectionParams(t *testing.T) connectionParams { _ = godotenv.Load() cs := os.Getenv("EVENTHUB_CONNECTION_STRING") @@ -254,11 +262,7 @@ func getConnectionParams(t *testing.T) struct { if cs == "" { t.Skipf("EVENTHUB_CONNECTION_STRING must be defined in the environment. Live test skipped.") - return struct { - ConnectionString string - EventHubName string - EventHubNamespace string - }{} + return connectionParams{} } eventHubName := os.Getenv("EVENTHUB_NAME") @@ -276,11 +280,7 @@ func getConnectionParams(t *testing.T) struct { parsedConn, err := conn.ParsedConnectionFromStr(cs) require.NoError(t, err) - return struct { - ConnectionString string - EventHubName string - EventHubNamespace string - }{ + return connectionParams{ ConnectionString: cs, EventHubName: eventHubName, EventHubNamespace: parsedConn.Namespace,