diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventPosition.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventPosition.java index 19c6979c93bbd..797fbb2d0446f 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventPosition.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventPosition.java @@ -89,4 +89,32 @@ static EventPosition fromStartOfStream() { static EventPosition fromEndOfStream() { return EventPositionImpl.fromEndOfStream(); } + + /** + * Gets the sequence number. + *

+ * @return the sequence number. + */ + Long getSequenceNumber(); + + /** + * Gets the enqueued time. + *

+ * @return the enqueued time. + */ + Instant getEnqueuedTime(); + + /** + * Gets the offset. + *

+ * @return the offset. + */ + String getOffset(); + + /** + * Gets the inclusive value. + *

+ * @return the inclusive value. + */ + boolean getInclusiveFlag(); } diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java index f0cf42a5dc5cf..979382f0d8a34 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java @@ -78,6 +78,14 @@ public interface PartitionReceiver { */ ReceiverRuntimeInformation getRuntimeInformation(); + /** + * Get the {@link EventPosition} that corresponds to an {@link EventData} which was returned last by the receiver. + *

This value will not be populated, unless the knob {@link ReceiverOptions#setReceiverRuntimeMetricEnabled(boolean)} is set. + * + * @return the EventPosition object. + */ + EventPosition getEventPosition(); + /** * Synchronous version of {@link #receive}. * diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataUtil.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataUtil.java index 4f339cb84a312..491ee6b6e60ee 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataUtil.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataUtil.java @@ -5,6 +5,7 @@ package com.microsoft.azure.eventhubs.impl; import com.microsoft.azure.eventhubs.EventData; +import com.microsoft.azure.eventhubs.EventPosition; import org.apache.qpid.proton.message.Message; import java.util.*; @@ -27,7 +28,7 @@ final class EventDataUtil { private EventDataUtil() { } - static LinkedList toEventDataCollection(final Collection messages, final PassByRef lastMessageRef) { + static LinkedList toEventDataCollection(final Collection messages, final PassByRef lastMessageRef) { if (messages == null) { return null; @@ -35,11 +36,13 @@ static LinkedList toEventDataCollection(final Collection mes LinkedList events = new LinkedList<>(); for (Message message : messages) { + EventData eventData = new EventDataImpl(message); + events.add(eventData); - events.add(new EventDataImpl(message)); - - if (lastMessageRef != null) - lastMessageRef.set(message); + if (lastMessageRef != null) { + lastMessageRef.set(new MessageWrapper(message, + EventPosition.fromSequenceNumber(eventData.getSystemProperties().getSequenceNumber(), true))); + } } return events; diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventPositionImpl.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventPositionImpl.java index 154ca360ce414..898975d5b0e22 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventPositionImpl.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventPositionImpl.java @@ -54,6 +54,22 @@ public static EventPositionImpl fromEndOfStream() { return new EventPositionImpl(ClientConstants.END_OF_STREAM, null, null, false); } + public Long getSequenceNumber() { + return this.sequenceNumber; + } + + public Instant getEnqueuedTime() { + return this.dateTime; + } + + public String getOffset() { + return this.offset; + } + + public boolean getInclusiveFlag() { + return this.inclusiveFlag; + } + String getExpression() { // order of preference if (this.offset != null) { diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageWrapper.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageWrapper.java new file mode 100644 index 0000000000000..f2388659bb5c5 --- /dev/null +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageWrapper.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) Microsoft. All rights reserved. + * Licensed under the MIT license. See LICENSE file in the project root for full license information. + */ +package com.microsoft.azure.eventhubs.impl; + +import com.microsoft.azure.eventhubs.EventPosition; +import org.apache.qpid.proton.message.Message; + +final class MessageWrapper { + private final Message message; + private final EventPosition eventPosition; + + MessageWrapper(Message message, EventPosition eventPosition) { + this.message = message; + this.eventPosition = eventPosition; + } + + Message getMessage() { + return this.message; + } + + EventPosition getEventPosition() { + return this.eventPosition; + } +} diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionReceiverImpl.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionReceiverImpl.java index eab32117d85d0..ed8fcde513282 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionReceiverImpl.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionReceiverImpl.java @@ -36,6 +36,7 @@ final class PartitionReceiverImpl extends ClientEntity implements ReceiverSettin private volatile MessageReceiver internalReceiver; private ReceivePump receivePump; + private EventPosition currentEventPosition; private PartitionReceiverImpl(MessagingFactory factory, final String eventHubName, @@ -60,6 +61,7 @@ private PartitionReceiverImpl(MessagingFactory factory, this.runtimeInformation = (this.receiverOptions != null && this.receiverOptions.getReceiverRuntimeMetricEnabled()) ? new ReceiverRuntimeInformation(partitionId) : null; + this.currentEventPosition = EventPosition.fromStartOfStream(); } static CompletableFuture create(MessagingFactory factory, @@ -134,31 +136,34 @@ public final long getEpoch() { } public final ReceiverRuntimeInformation getRuntimeInformation() { - return this.runtimeInformation; } + public final EventPosition getEventPosition() { + return this.currentEventPosition; + } + public CompletableFuture> receive(final int maxEventCount) { return this.internalReceiver.receive(maxEventCount).thenApplyAsync(new Function, Iterable>() { @Override public Iterable apply(Collection amqpMessages) { - PassByRef lastMessageRef = null; + PassByRef lastMessageRef = null; if (PartitionReceiverImpl.this.receiverOptions != null && PartitionReceiverImpl.this.receiverOptions.getReceiverRuntimeMetricEnabled()) lastMessageRef = new PassByRef<>(); final Iterable events = EventDataUtil.toEventDataCollection(amqpMessages, lastMessageRef); if (lastMessageRef != null && lastMessageRef.get() != null) { - - final DeliveryAnnotations deliveryAnnotations = lastMessageRef.get().getDeliveryAnnotations(); + final DeliveryAnnotations deliveryAnnotations = lastMessageRef.get().getMessage().getDeliveryAnnotations(); if (deliveryAnnotations != null && deliveryAnnotations.getValue() != null) { - final Map deliveryAnnotationsMap = deliveryAnnotations.getValue(); PartitionReceiverImpl.this.runtimeInformation.setRuntimeInformation( (long) deliveryAnnotationsMap.get(ClientConstants.LAST_ENQUEUED_SEQUENCE_NUMBER), ((Date) deliveryAnnotationsMap.get(ClientConstants.LAST_ENQUEUED_TIME_UTC)).toInstant(), (String) deliveryAnnotationsMap.get(ClientConstants.LAST_ENQUEUED_OFFSET)); } + + PartitionReceiverImpl.this.currentEventPosition = lastMessageRef.get().getEventPosition(); } return events; diff --git a/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiverRuntimeMetricsTest.java b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiverRuntimeMetricsTest.java index bacb54e55c4b3..e86f4ed54a0fb 100644 --- a/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiverRuntimeMetricsTest.java +++ b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiverRuntimeMetricsTest.java @@ -70,8 +70,11 @@ public void testRuntimeMetricsReturnedWhenEnabled() throws EventHubException { LinkedList receivedEventsWithOptions = new LinkedList<>(); while (receivedEventsWithOptions.size() < sentEvents) - for (EventData eData : receiverWithOptions.receiveSync(sentEvents)) + for (EventData eData : receiverWithOptions.receiveSync(1)) { receivedEventsWithOptions.add(eData); + Assert.assertEquals((Long) eData.getSystemProperties().getSequenceNumber(), + receiverWithOptions.getEventPosition().getSequenceNumber()); + } HashSet offsets = new HashSet<>(); for (EventData eData : receivedEventsWithOptions)