Skip to content

Commit

Permalink
PartitionReceiver - add a method that provides an EventPosition which…
Browse files Browse the repository at this point in the history
… corresponds to an EventData returned last by the receiver (Azure#408)
  • Loading branch information
sjkwak authored Dec 13, 2018
1 parent 6c781cd commit 71f8562
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,32 @@ static EventPosition fromStartOfStream() {
static EventPosition fromEndOfStream() {
return EventPositionImpl.fromEndOfStream();
}

/**
* Gets the sequence number.
* <p>
* @return the sequence number.
*/
Long getSequenceNumber();

/**
* Gets the enqueued time.
* <p>
* @return the enqueued time.
*/
Instant getEnqueuedTime();

/**
* Gets the offset.
* <p>
* @return the offset.
*/
String getOffset();

/**
* Gets the inclusive value.
* <p>
* @return the inclusive value.
*/
boolean getInclusiveFlag();
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ public interface PartitionReceiver {
*/
ReceiverRuntimeInformation getRuntimeInformation();

/**
* Get the {@link EventPosition} that corresponds to an {@link EventData} which was returned last by the receiver.
* <p> This value will not be populated, unless the knob {@link ReceiverOptions#setReceiverRuntimeMetricEnabled(boolean)} is set.
*
* @return the EventPosition object.
*/
EventPosition getEventPosition();

/**
* Synchronous version of {@link #receive}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package com.microsoft.azure.eventhubs.impl;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventPosition;
import org.apache.qpid.proton.message.Message;

import java.util.*;
Expand All @@ -27,19 +28,21 @@ final class EventDataUtil {
private EventDataUtil() {
}

static LinkedList<EventData> toEventDataCollection(final Collection<Message> messages, final PassByRef<Message> lastMessageRef) {
static LinkedList<EventData> toEventDataCollection(final Collection<Message> messages, final PassByRef<MessageWrapper> lastMessageRef) {

if (messages == null) {
return null;
}

LinkedList<EventData> events = new LinkedList<>();
for (Message message : messages) {
EventData eventData = new EventDataImpl(message);
events.add(eventData);

events.add(new EventDataImpl(message));

if (lastMessageRef != null)
lastMessageRef.set(message);
if (lastMessageRef != null) {
lastMessageRef.set(new MessageWrapper(message,
EventPosition.fromSequenceNumber(eventData.getSystemProperties().getSequenceNumber(), true)));
}
}

return events;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,22 @@ public static EventPositionImpl fromEndOfStream() {
return new EventPositionImpl(ClientConstants.END_OF_STREAM, null, null, false);
}

public Long getSequenceNumber() {
return this.sequenceNumber;
}

public Instant getEnqueuedTime() {
return this.dateTime;
}

public String getOffset() {
return this.offset;
}

public boolean getInclusiveFlag() {
return this.inclusiveFlag;
}

String getExpression() {
// order of preference
if (this.offset != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/
package com.microsoft.azure.eventhubs.impl;

import com.microsoft.azure.eventhubs.EventPosition;
import org.apache.qpid.proton.message.Message;

final class MessageWrapper {
private final Message message;
private final EventPosition eventPosition;

MessageWrapper(Message message, EventPosition eventPosition) {
this.message = message;
this.eventPosition = eventPosition;
}

Message getMessage() {
return this.message;
}

EventPosition getEventPosition() {
return this.eventPosition;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ final class PartitionReceiverImpl extends ClientEntity implements ReceiverSettin
private volatile MessageReceiver internalReceiver;

private ReceivePump receivePump;
private EventPosition currentEventPosition;

private PartitionReceiverImpl(MessagingFactory factory,
final String eventHubName,
Expand All @@ -60,6 +61,7 @@ private PartitionReceiverImpl(MessagingFactory factory,
this.runtimeInformation = (this.receiverOptions != null && this.receiverOptions.getReceiverRuntimeMetricEnabled())
? new ReceiverRuntimeInformation(partitionId)
: null;
this.currentEventPosition = EventPosition.fromStartOfStream();
}

static CompletableFuture<PartitionReceiver> create(MessagingFactory factory,
Expand Down Expand Up @@ -134,31 +136,34 @@ public final long getEpoch() {
}

public final ReceiverRuntimeInformation getRuntimeInformation() {

return this.runtimeInformation;
}

public final EventPosition getEventPosition() {
return this.currentEventPosition;
}

public CompletableFuture<Iterable<EventData>> receive(final int maxEventCount) {
return this.internalReceiver.receive(maxEventCount).thenApplyAsync(new Function<Collection<Message>, Iterable<EventData>>() {
@Override
public Iterable<EventData> apply(Collection<Message> amqpMessages) {
PassByRef<Message> lastMessageRef = null;
PassByRef<MessageWrapper> lastMessageRef = null;
if (PartitionReceiverImpl.this.receiverOptions != null && PartitionReceiverImpl.this.receiverOptions.getReceiverRuntimeMetricEnabled())
lastMessageRef = new PassByRef<>();

final Iterable<EventData> events = EventDataUtil.toEventDataCollection(amqpMessages, lastMessageRef);

if (lastMessageRef != null && lastMessageRef.get() != null) {

final DeliveryAnnotations deliveryAnnotations = lastMessageRef.get().getDeliveryAnnotations();
final DeliveryAnnotations deliveryAnnotations = lastMessageRef.get().getMessage().getDeliveryAnnotations();
if (deliveryAnnotations != null && deliveryAnnotations.getValue() != null) {

final Map<Symbol, Object> deliveryAnnotationsMap = deliveryAnnotations.getValue();
PartitionReceiverImpl.this.runtimeInformation.setRuntimeInformation(
(long) deliveryAnnotationsMap.get(ClientConstants.LAST_ENQUEUED_SEQUENCE_NUMBER),
((Date) deliveryAnnotationsMap.get(ClientConstants.LAST_ENQUEUED_TIME_UTC)).toInstant(),
(String) deliveryAnnotationsMap.get(ClientConstants.LAST_ENQUEUED_OFFSET));
}

PartitionReceiverImpl.this.currentEventPosition = lastMessageRef.get().getEventPosition();
}

return events;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,11 @@ public void testRuntimeMetricsReturnedWhenEnabled() throws EventHubException {

LinkedList<EventData> receivedEventsWithOptions = new LinkedList<>();
while (receivedEventsWithOptions.size() < sentEvents)
for (EventData eData : receiverWithOptions.receiveSync(sentEvents))
for (EventData eData : receiverWithOptions.receiveSync(1)) {
receivedEventsWithOptions.add(eData);
Assert.assertEquals((Long) eData.getSystemProperties().getSequenceNumber(),
receiverWithOptions.getEventPosition().getSequenceNumber());
}

HashSet<String> offsets = new HashSet<>();
for (EventData eData : receivedEventsWithOptions)
Expand Down

0 comments on commit 71f8562

Please sign in to comment.