From 16b013732b87329d785d6052fc8cd5fd897b580a Mon Sep 17 00:00:00 2001 From: Richard Park Date: Thu, 4 May 2023 14:40:29 -0700 Subject: [PATCH] Latest can also be inclusive (ie, get the latest message). --- .../azeventhubs/consumer_client_unit_test.go | 30 +++++++++++-------- sdk/messaging/azeventhubs/partition_client.go | 20 ++++++------- .../azeventhubs/processor_unit_test.go | 2 +- 3 files changed, 28 insertions(+), 24 deletions(-) diff --git a/sdk/messaging/azeventhubs/consumer_client_unit_test.go b/sdk/messaging/azeventhubs/consumer_client_unit_test.go index 900e46d4d37f..1d91ff164280 100644 --- a/sdk/messaging/azeventhubs/consumer_client_unit_test.go +++ b/sdk/messaging/azeventhubs/consumer_client_unit_test.go @@ -49,42 +49,46 @@ func TestUnitNewConsumerClient(t *testing.T) { func TestUnit_getOffsetExpression(t *testing.T) { t.Run("Valid", func(t *testing.T) { - expr, err := getOffsetExpression(StartPosition{}) + expr, err := getStartExpression(StartPosition{}) require.NoError(t, err) require.Equal(t, "amqp.annotation.x-opt-offset > '@latest'", expr) - expr, err = getOffsetExpression(StartPosition{Earliest: to.Ptr(true)}) + expr, err = getStartExpression(StartPosition{Earliest: to.Ptr(true)}) require.NoError(t, err) require.Equal(t, "amqp.annotation.x-opt-offset > '-1'", expr) - expr, err = getOffsetExpression(StartPosition{Latest: to.Ptr(true)}) + expr, err = getStartExpression(StartPosition{Latest: to.Ptr(true)}) require.NoError(t, err) require.Equal(t, "amqp.annotation.x-opt-offset > '@latest'", expr) - expr, err = getOffsetExpression(StartPosition{Offset: to.Ptr(int64(101))}) + expr, err = getStartExpression(StartPosition{Latest: to.Ptr(true), Inclusive: true}) + require.NoError(t, err) + require.Equal(t, "amqp.annotation.x-opt-offset >= '@latest'", expr) + + expr, err = getStartExpression(StartPosition{Offset: to.Ptr(int64(101))}) require.NoError(t, err) require.Equal(t, "amqp.annotation.x-opt-offset > '101'", expr) - expr, err = getOffsetExpression(StartPosition{Offset: to.Ptr(int64(101)), Inclusive: true}) + expr, err = getStartExpression(StartPosition{Offset: to.Ptr(int64(101)), Inclusive: true}) require.NoError(t, err) require.Equal(t, "amqp.annotation.x-opt-offset >= '101'", expr) - expr, err = getOffsetExpression(StartPosition{SequenceNumber: to.Ptr(int64(202))}) + expr, err = getStartExpression(StartPosition{SequenceNumber: to.Ptr(int64(202))}) require.NoError(t, err) require.Equal(t, "amqp.annotation.x-opt-sequence-number > '202'", expr) - expr, err = getOffsetExpression(StartPosition{SequenceNumber: to.Ptr(int64(202)), Inclusive: true}) + expr, err = getStartExpression(StartPosition{SequenceNumber: to.Ptr(int64(202)), Inclusive: true}) require.NoError(t, err) require.Equal(t, "amqp.annotation.x-opt-sequence-number >= '202'", expr) enqueueTime, err := time.Parse(time.RFC3339, "2020-01-01T01:02:03Z") require.NoError(t, err) - expr, err = getOffsetExpression(StartPosition{EnqueuedTime: &enqueueTime}) + expr, err = getStartExpression(StartPosition{EnqueuedTime: &enqueueTime}) require.NoError(t, err) require.Equal(t, "amqp.annotation.x-opt-enqueued-time > '1577840523000'", expr) - expr, err = getOffsetExpression(StartPosition{EnqueuedTime: &enqueueTime, Inclusive: true}) + expr, err = getStartExpression(StartPosition{EnqueuedTime: &enqueueTime, Inclusive: true}) require.NoError(t, err) require.Equal(t, "amqp.annotation.x-opt-enqueued-time >= '1577840523000'", expr) }) @@ -93,28 +97,28 @@ func TestUnit_getOffsetExpression(t *testing.T) { enqueueTime, err := time.Parse(time.RFC3339, "2020-01-01T01:02:03Z") require.NoError(t, err) - expr, err := getOffsetExpression(StartPosition{ + expr, err := getStartExpression(StartPosition{ EnqueuedTime: &enqueueTime, Offset: to.Ptr[int64](101), }) require.EqualError(t, err, "only a single start point can be set: Earliest, EnqueuedTime, Latest, Offset, or SequenceNumber") require.Empty(t, expr) - expr, err = getOffsetExpression(StartPosition{ + expr, err = getStartExpression(StartPosition{ Offset: to.Ptr[int64](202), Latest: to.Ptr(true), }) require.EqualError(t, err, "only a single start point can be set: Earliest, EnqueuedTime, Latest, Offset, or SequenceNumber") require.Empty(t, expr) - expr, err = getOffsetExpression(StartPosition{ + expr, err = getStartExpression(StartPosition{ Latest: to.Ptr(true), SequenceNumber: to.Ptr[int64](202), }) require.EqualError(t, err, "only a single start point can be set: Earliest, EnqueuedTime, Latest, Offset, or SequenceNumber") require.Empty(t, expr) - expr, err = getOffsetExpression(StartPosition{ + expr, err = getStartExpression(StartPosition{ SequenceNumber: to.Ptr[int64](202), Earliest: to.Ptr(true), }) diff --git a/sdk/messaging/azeventhubs/partition_client.go b/sdk/messaging/azeventhubs/partition_client.go index 8c5ab53fe8a3..654099f53016 100644 --- a/sdk/messaging/azeventhubs/partition_client.go +++ b/sdk/messaging/azeventhubs/partition_client.go @@ -184,7 +184,7 @@ func (pc *PartitionClient) ReceiveEvents(ctx context.Context, count int, options numEvents := len(events) lastSequenceNumber := events[numEvents-1].SequenceNumber - pc.offsetExpression = formatOffsetExpressionForSequence(">", lastSequenceNumber) + pc.offsetExpression = formatStartExpressionForSequence(">", lastSequenceNumber) log.Writef(EventConsumer, "%d Events received, moving sequence to %d", numEvents, lastSequenceNumber) return events, nil } @@ -274,7 +274,7 @@ func newPartitionClient(args partitionClientArgs, options *PartitionClientOption options = &PartitionClientOptions{} } - offsetExpr, err := getOffsetExpression(options.StartPosition) + offsetExpr, err := getStartExpression(options.StartPosition) if err != nil { return nil, err @@ -317,11 +317,11 @@ func getAllPrefetched(receiver amqpwrap.AMQPReceiver, max int) []*amqp.Message { return messages } -func getOffsetExpression(startPosition StartPosition) (string, error) { - lt := ">" +func getStartExpression(startPosition StartPosition) (string, error) { + gt := ">" if startPosition.Inclusive { - lt = ">=" + gt = ">=" } var errMultipleFieldsSet = errors.New("only a single start point can be set: Earliest, EnqueuedTime, Latest, Offset, or SequenceNumber") @@ -330,7 +330,7 @@ func getOffsetExpression(startPosition StartPosition) (string, error) { if startPosition.EnqueuedTime != nil { // time-based, non-inclusive - offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-enqueued-time %s '%d'", lt, startPosition.EnqueuedTime.UnixMilli()) + offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-enqueued-time %s '%d'", gt, startPosition.EnqueuedTime.UnixMilli()) } if startPosition.Offset != nil { @@ -340,7 +340,7 @@ func getOffsetExpression(startPosition StartPosition) (string, error) { return "", errMultipleFieldsSet } - offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-offset %s '%d'", lt, *startPosition.Offset) + offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-offset %s '%d'", gt, *startPosition.Offset) } if startPosition.Latest != nil && *startPosition.Latest { @@ -348,7 +348,7 @@ func getOffsetExpression(startPosition StartPosition) (string, error) { return "", errMultipleFieldsSet } - offsetExpr = "amqp.annotation.x-opt-offset > '@latest'" + offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-offset %s '@latest'", gt) } if startPosition.SequenceNumber != nil { @@ -356,7 +356,7 @@ func getOffsetExpression(startPosition StartPosition) (string, error) { return "", errMultipleFieldsSet } - offsetExpr = formatOffsetExpressionForSequence(lt, *startPosition.SequenceNumber) + offsetExpr = formatStartExpressionForSequence(gt, *startPosition.SequenceNumber) } if startPosition.Earliest != nil && *startPosition.Earliest { @@ -375,6 +375,6 @@ func getOffsetExpression(startPosition StartPosition) (string, error) { return "amqp.annotation.x-opt-offset > '@latest'", nil } -func formatOffsetExpressionForSequence(op string, sequenceNumber int64) string { +func formatStartExpressionForSequence(op string, sequenceNumber int64) string { return fmt.Sprintf("amqp.annotation.x-opt-sequence-number %s '%d'", op, sequenceNumber) } diff --git a/sdk/messaging/azeventhubs/processor_unit_test.go b/sdk/messaging/azeventhubs/processor_unit_test.go index c21b31e8c77e..ff257b58d408 100644 --- a/sdk/messaging/azeventhubs/processor_unit_test.go +++ b/sdk/messaging/azeventhubs/processor_unit_test.go @@ -236,7 +236,7 @@ func TestUnit_Processor_Run_startPosition(t *testing.T) { fakeConsumerClient := simpleFakeConsumerClient() fakeConsumerClient.newPartitionClientFn = func(partitionID string, options *PartitionClientOptions) (*PartitionClient, error) { - offsetExpr, err := getOffsetExpression(options.StartPosition) + offsetExpr, err := getStartExpression(options.StartPosition) require.NoError(t, err) return newFakePartitionClient(partitionID, offsetExpr), nil