Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EH SDK + serializer integration #13089

Merged
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
6e895f2
create draft PR
arerlend Jun 15, 2020
544a6e7
fix object batch
arerlend Jun 16, 2020
7e9ae15
temp object serializer
arerlend Jul 1, 2020
b5a1dec
Merge branch 'master' into arerlend.schemaregistry.eh-integration
arerlend Jul 1, 2020
82eb0ff
dep on core experimental
arerlend Jul 1, 2020
eda212d
temp
arerlend Jul 1, 2020
6a1d7e7
getDeserializedObject()
arerlend Jul 2, 2020
3560da2
null object in object batch test
arerlend Jul 2, 2020
7a7bce3
remove old SR dep
arerlend Jul 2, 2020
89e6cca
remove temporary interfaces
arerlend Jul 6, 2020
f3acc8c
rename objectSerializer builder method to serializer
arerlend Jul 6, 2020
c95145c
single class imports
arerlend Jul 6, 2020
4eed6e0
fix serializer builder javadoc
arerlend Jul 6, 2020
f78469a
rename abstract batch impl
arerlend Jul 6, 2020
c5d39fd
add EventDataBatchBase javadoc
arerlend Jul 6, 2020
7a0fc35
mono TryAdd
arerlend Jul 6, 2020
c9c1f29
fix modifiers
arerlend Jul 6, 2020
6ea6bb6
remove send mode
arerlend Jul 6, 2020
4fdfa93
fix object batch javadoc
arerlend Jul 6, 2020
b976d2d
partition event deserialize to async
arerlend Jul 6, 2020
114f68d
uncomment azure core
arerlend Jul 6, 2020
45c819a
javadoc syntax
arerlend Jul 6, 2020
533df1e
remove temp interface
arerlend Jul 6, 2020
570fbea
Merge branch 'master' into arerlend.schemaregistry.eh-integration
arerlend Jul 7, 2020
d295b83
only core-experimental, not core
arerlend Jul 7, 2020
28c5f63
checkstyle
arerlend Jul 7, 2020
2e66eb1
update partition event deserialize
arerlend Jul 7, 2020
8947c09
monoError for object batch
arerlend Jul 8, 2020
7496de8
rename objectSerializer to serializer
arerlend Jul 8, 2020
03693c9
add serializer null check in object batch creation
arerlend Jul 8, 2020
447522a
cleanup
arerlend Jul 8, 2020
aa67cbf
Merge remote-tracking branch 'upstream/master' into arerlend.schemare…
arerlend Jul 13, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
update partition event deserialize
arerlend committed Jul 7, 2020

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 2e66eb11ab6b56896e089e6128800fd4b7328f8b
Original file line number Diff line number Diff line change
@@ -40,14 +40,14 @@ class EventHubPartitionAsyncConsumer implements AutoCloseable {
private final Scheduler scheduler;
private final EmitterProcessor<PartitionEvent> emitterProcessor;
private final EventPosition initialPosition;
private final ObjectSerializer objectSerializer;
private final ObjectSerializer serializer;

private volatile Long currentOffset;

EventHubPartitionAsyncConsumer(AmqpReceiveLinkProcessor amqpReceiveLinkProcessor,
MessageSerializer messageSerializer, String fullyQualifiedNamespace,
String eventHubName, String consumerGroup, String partitionId,
ObjectSerializer objectSerializer,
ObjectSerializer serializer,
AtomicReference<Supplier<EventPosition>> currentEventPosition,
boolean trackLastEnqueuedEventProperties, Scheduler scheduler) {
this.initialPosition = Objects.requireNonNull(currentEventPosition.get().get(),
@@ -58,7 +58,7 @@ class EventHubPartitionAsyncConsumer implements AutoCloseable {
this.eventHubName = eventHubName;
this.consumerGroup = consumerGroup;
this.partitionId = partitionId;
this.objectSerializer = objectSerializer;
this.serializer = serializer;
this.trackLastEnqueuedEventProperties = trackLastEnqueuedEventProperties;
this.scheduler = Objects.requireNonNull(scheduler, "'scheduler' cannot be null.");

@@ -143,6 +143,6 @@ private PartitionEvent onMessageReceived(Message message) {

final PartitionContext partitionContext = new PartitionContext(fullyQualifiedNamespace, eventHubName,
consumerGroup, partitionId);
return new PartitionEvent(partitionContext, event, lastEnqueuedEventProperties.get(), this.objectSerializer);
return new PartitionEvent(partitionContext, event, lastEnqueuedEventProperties.get(), this.serializer);
}
}
Original file line number Diff line number Diff line change
@@ -5,12 +5,15 @@

import com.azure.core.annotation.Immutable;
import com.azure.core.experimental.serializer.ObjectSerializer;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.EventData;
import reactor.core.publisher.Mono;

import java.io.ByteArrayInputStream;
import java.util.Objects;

import static com.azure.core.util.FluxUtil.monoError;

/**
* A container for {@link EventData} along with the partition information for this event data.
*/
@@ -20,25 +23,37 @@ public class PartitionEvent {
private final PartitionContext partitionContext;
private final EventData eventData;
private final LastEnqueuedEventProperties lastEnqueuedEventProperties;
private final ObjectSerializer objectSerializer;
private final ObjectSerializer serializer;
private final ClientLogger logger = new ClientLogger(PartitionEvent.class);

private Object deserialized;
/**
* Creates an instance of PartitionEvent.
*
* @param partitionContext The partition information associated with the event data.
* @param eventData The event data received from the Event Hub.
* @param lastEnqueuedEventProperties The properties of the last enqueued event in the partition.
* @throws NullPointerException if {@code partitionContext} or {@code eventData} is {@code null}.
*/
public PartitionEvent(final PartitionContext partitionContext, final EventData eventData,
LastEnqueuedEventProperties lastEnqueuedEventProperties) {
this(partitionContext, eventData, lastEnqueuedEventProperties, null);
}

/**
* Creates an instance of PartitionEvent.
*
* @param partitionContext The partition information associated with the event data.
* @param eventData The event data received from the Event Hub.
* @param lastEnqueuedEventProperties The properties of the last enqueued event in the partition.
* @param objectSerializer ObjectSerializer implementation for deserializing event data payload. May be null.
* @param serializer ObjectSerializer implementation for deserializing event data payload. May be null.
* @throws NullPointerException if {@code partitionContext} or {@code eventData} is {@code null}.
*/
public PartitionEvent(final PartitionContext partitionContext, final EventData eventData,
LastEnqueuedEventProperties lastEnqueuedEventProperties, ObjectSerializer objectSerializer) {
LastEnqueuedEventProperties lastEnqueuedEventProperties, ObjectSerializer serializer) {
this.partitionContext = Objects.requireNonNull(partitionContext, "'partitionContext' cannot be null");
this.eventData = Objects.requireNonNull(eventData, "'eventData' cannot be null");
this.lastEnqueuedEventProperties = lastEnqueuedEventProperties;
this.objectSerializer = objectSerializer;
this.serializer = serializer;
}

/**
@@ -76,17 +91,14 @@ public LastEnqueuedEventProperties getLastEnqueuedEventProperties() {
* @return deserialized object as type T
*/
public <T> Mono<T> getDeserializedObject(Class<T> objectType) {
Objects.requireNonNull(objectSerializer, "No serializer set for deserializing event data payload.");
Objects.requireNonNull(objectType, "objectType cannot be null.");

if (deserialized != null) {
if (objectType.isInstance(deserialized)) {
return Mono.just(objectType.cast(deserialized));
}
if (this.serializer == null) {
return monoError(logger,
new NullPointerException("No serializer set for deserializing EventData payload."));
}
if (objectType == null) {
return monoError(logger, new IllegalArgumentException("objectType cannot be null."));
}

Mono<T> objectMono = objectSerializer.deserialize(new ByteArrayInputStream(eventData.getBody()), objectType);
objectMono.subscribe(o -> deserialized = o);
return objectMono;
return serializer.deserialize(new ByteArrayInputStream(eventData.getBody()), objectType);
}
}
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.experimental.serializer.ObjectSerializer;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.AmqpReceiveLinkProcessor;
@@ -30,9 +31,12 @@
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import java.io.InputStream;
import java.io.OutputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
@@ -224,6 +228,57 @@ void receiveMultipleTimes() {
Assertions.assertFalse(actual.isInclusive());
}

@Test
void receiveAndDeserialize() {
// just a test value
Object o = 0;
ObjectSerializer testSerializer = new ObjectSerializer() {
@Override
public <T> Mono<T> deserialize(InputStream stream, Class<T> clazz) {
if (clazz.isInstance(o)) {
return Mono.just(clazz.cast(o));
}
return null;
}

@Override
public <S extends OutputStream> Mono<S> serialize(S stream, Object value) {
return null;
}
};

// Arrange
linkProcessor = createSink(link1, link2).subscribeWith(new AmqpReceiveLinkProcessor(PREFETCH, retryPolicy, parentConnection));
consumer = new EventHubPartitionAsyncConsumer(linkProcessor, messageSerializer, HOSTNAME, EVENT_HUB_NAME,
CONSUMER_GROUP, PARTITION_ID, testSerializer, currentPosition, false, Schedulers.parallel());

final EventData event = new EventData("Foo");
final LastEnqueuedEventProperties last = new LastEnqueuedEventProperties(10L, 15L,
Instant.ofEpochMilli(1243454), Instant.ofEpochMilli(1240004));

when(messageSerializer.deserialize(same(message1), eq(EventData.class))).thenReturn(event);
when(messageSerializer.deserialize(same(message1), eq(LastEnqueuedEventProperties.class))).thenReturn(last);

// Act & Assert
StepVerifier.create(consumer.receive())
.then(() -> {
messageProcessorSink.next(message1);
})
.assertNext(partitionEvent -> {
verifyPartitionContext(partitionEvent.getPartitionContext());
verifyLastEnqueuedInformation(false, last,
partitionEvent.getLastEnqueuedEventProperties());
Assertions.assertSame(event, partitionEvent.getData());
Assertions.assertSame(Integer.class.cast(o), partitionEvent.getDeserializedObject(Integer.class).block());
})
.thenCancel()
.verify();

// The emitter processor is not closed until the partition consumer is.
Assertions.assertFalse(linkProcessor.isTerminated());
Assertions.assertSame(originalPosition, currentPosition.get().get());
}


/**
* Verifies that the consumer closes and completes any listeners on a shutdown signal.