Skip to content

Commit

Permalink
[azeventhubs] Latest start position can also be inclusive (ie, get th…
Browse files Browse the repository at this point in the history
…e latest message) (#20744)
  • Loading branch information
richardpark-msft authored May 4, 2023
1 parent e1a6152 commit 04b463d
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 24 deletions.
30 changes: 17 additions & 13 deletions sdk/messaging/azeventhubs/consumer_client_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,42 +49,46 @@ func TestUnitNewConsumerClient(t *testing.T) {

func TestUnit_getOffsetExpression(t *testing.T) {
t.Run("Valid", func(t *testing.T) {
expr, err := getOffsetExpression(StartPosition{})
expr, err := getStartExpression(StartPosition{})
require.NoError(t, err)
require.Equal(t, "amqp.annotation.x-opt-offset > '@latest'", expr)

expr, err = getOffsetExpression(StartPosition{Earliest: to.Ptr(true)})
expr, err = getStartExpression(StartPosition{Earliest: to.Ptr(true)})
require.NoError(t, err)
require.Equal(t, "amqp.annotation.x-opt-offset > '-1'", expr)

expr, err = getOffsetExpression(StartPosition{Latest: to.Ptr(true)})
expr, err = getStartExpression(StartPosition{Latest: to.Ptr(true)})
require.NoError(t, err)
require.Equal(t, "amqp.annotation.x-opt-offset > '@latest'", expr)

expr, err = getOffsetExpression(StartPosition{Offset: to.Ptr(int64(101))})
expr, err = getStartExpression(StartPosition{Latest: to.Ptr(true), Inclusive: true})
require.NoError(t, err)
require.Equal(t, "amqp.annotation.x-opt-offset >= '@latest'", expr)

expr, err = getStartExpression(StartPosition{Offset: to.Ptr(int64(101))})
require.NoError(t, err)
require.Equal(t, "amqp.annotation.x-opt-offset > '101'", expr)

expr, err = getOffsetExpression(StartPosition{Offset: to.Ptr(int64(101)), Inclusive: true})
expr, err = getStartExpression(StartPosition{Offset: to.Ptr(int64(101)), Inclusive: true})
require.NoError(t, err)
require.Equal(t, "amqp.annotation.x-opt-offset >= '101'", expr)

expr, err = getOffsetExpression(StartPosition{SequenceNumber: to.Ptr(int64(202))})
expr, err = getStartExpression(StartPosition{SequenceNumber: to.Ptr(int64(202))})
require.NoError(t, err)
require.Equal(t, "amqp.annotation.x-opt-sequence-number > '202'", expr)

expr, err = getOffsetExpression(StartPosition{SequenceNumber: to.Ptr(int64(202)), Inclusive: true})
expr, err = getStartExpression(StartPosition{SequenceNumber: to.Ptr(int64(202)), Inclusive: true})
require.NoError(t, err)
require.Equal(t, "amqp.annotation.x-opt-sequence-number >= '202'", expr)

enqueueTime, err := time.Parse(time.RFC3339, "2020-01-01T01:02:03Z")
require.NoError(t, err)

expr, err = getOffsetExpression(StartPosition{EnqueuedTime: &enqueueTime})
expr, err = getStartExpression(StartPosition{EnqueuedTime: &enqueueTime})
require.NoError(t, err)
require.Equal(t, "amqp.annotation.x-opt-enqueued-time > '1577840523000'", expr)

expr, err = getOffsetExpression(StartPosition{EnqueuedTime: &enqueueTime, Inclusive: true})
expr, err = getStartExpression(StartPosition{EnqueuedTime: &enqueueTime, Inclusive: true})
require.NoError(t, err)
require.Equal(t, "amqp.annotation.x-opt-enqueued-time >= '1577840523000'", expr)
})
Expand All @@ -93,28 +97,28 @@ func TestUnit_getOffsetExpression(t *testing.T) {
enqueueTime, err := time.Parse(time.RFC3339, "2020-01-01T01:02:03Z")
require.NoError(t, err)

expr, err := getOffsetExpression(StartPosition{
expr, err := getStartExpression(StartPosition{
EnqueuedTime: &enqueueTime,
Offset: to.Ptr[int64](101),
})
require.EqualError(t, err, "only a single start point can be set: Earliest, EnqueuedTime, Latest, Offset, or SequenceNumber")
require.Empty(t, expr)

expr, err = getOffsetExpression(StartPosition{
expr, err = getStartExpression(StartPosition{
Offset: to.Ptr[int64](202),
Latest: to.Ptr(true),
})
require.EqualError(t, err, "only a single start point can be set: Earliest, EnqueuedTime, Latest, Offset, or SequenceNumber")
require.Empty(t, expr)

expr, err = getOffsetExpression(StartPosition{
expr, err = getStartExpression(StartPosition{
Latest: to.Ptr(true),
SequenceNumber: to.Ptr[int64](202),
})
require.EqualError(t, err, "only a single start point can be set: Earliest, EnqueuedTime, Latest, Offset, or SequenceNumber")
require.Empty(t, expr)

expr, err = getOffsetExpression(StartPosition{
expr, err = getStartExpression(StartPosition{
SequenceNumber: to.Ptr[int64](202),
Earliest: to.Ptr(true),
})
Expand Down
20 changes: 10 additions & 10 deletions sdk/messaging/azeventhubs/partition_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (pc *PartitionClient) ReceiveEvents(ctx context.Context, count int, options
numEvents := len(events)
lastSequenceNumber := events[numEvents-1].SequenceNumber

pc.offsetExpression = formatOffsetExpressionForSequence(">", lastSequenceNumber)
pc.offsetExpression = formatStartExpressionForSequence(">", lastSequenceNumber)
log.Writef(EventConsumer, "%d Events received, moving sequence to %d", numEvents, lastSequenceNumber)
return events, nil
}
Expand Down Expand Up @@ -274,7 +274,7 @@ func newPartitionClient(args partitionClientArgs, options *PartitionClientOption
options = &PartitionClientOptions{}
}

offsetExpr, err := getOffsetExpression(options.StartPosition)
offsetExpr, err := getStartExpression(options.StartPosition)

if err != nil {
return nil, err
Expand Down Expand Up @@ -317,11 +317,11 @@ func getAllPrefetched(receiver amqpwrap.AMQPReceiver, max int) []*amqp.Message {
return messages
}

func getOffsetExpression(startPosition StartPosition) (string, error) {
lt := ">"
func getStartExpression(startPosition StartPosition) (string, error) {
gt := ">"

if startPosition.Inclusive {
lt = ">="
gt = ">="
}

var errMultipleFieldsSet = errors.New("only a single start point can be set: Earliest, EnqueuedTime, Latest, Offset, or SequenceNumber")
Expand All @@ -330,7 +330,7 @@ func getOffsetExpression(startPosition StartPosition) (string, error) {

if startPosition.EnqueuedTime != nil {
// time-based, non-inclusive
offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-enqueued-time %s '%d'", lt, startPosition.EnqueuedTime.UnixMilli())
offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-enqueued-time %s '%d'", gt, startPosition.EnqueuedTime.UnixMilli())
}

if startPosition.Offset != nil {
Expand All @@ -340,23 +340,23 @@ func getOffsetExpression(startPosition StartPosition) (string, error) {
return "", errMultipleFieldsSet
}

offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-offset %s '%d'", lt, *startPosition.Offset)
offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-offset %s '%d'", gt, *startPosition.Offset)
}

if startPosition.Latest != nil && *startPosition.Latest {
if offsetExpr != "" {
return "", errMultipleFieldsSet
}

offsetExpr = "amqp.annotation.x-opt-offset > '@latest'"
offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-offset %s '@latest'", gt)
}

if startPosition.SequenceNumber != nil {
if offsetExpr != "" {
return "", errMultipleFieldsSet
}

offsetExpr = formatOffsetExpressionForSequence(lt, *startPosition.SequenceNumber)
offsetExpr = formatStartExpressionForSequence(gt, *startPosition.SequenceNumber)
}

if startPosition.Earliest != nil && *startPosition.Earliest {
Expand All @@ -375,6 +375,6 @@ func getOffsetExpression(startPosition StartPosition) (string, error) {
return "amqp.annotation.x-opt-offset > '@latest'", nil
}

func formatOffsetExpressionForSequence(op string, sequenceNumber int64) string {
func formatStartExpressionForSequence(op string, sequenceNumber int64) string {
return fmt.Sprintf("amqp.annotation.x-opt-sequence-number %s '%d'", op, sequenceNumber)
}
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/processor_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func TestUnit_Processor_Run_startPosition(t *testing.T) {
fakeConsumerClient := simpleFakeConsumerClient()

fakeConsumerClient.newPartitionClientFn = func(partitionID string, options *PartitionClientOptions) (*PartitionClient, error) {
offsetExpr, err := getOffsetExpression(options.StartPosition)
offsetExpr, err := getStartExpression(options.StartPosition)
require.NoError(t, err)

return newFakePartitionClient(partitionID, offsetExpr), nil
Expand Down

0 comments on commit 04b463d

Please sign in to comment.