Skip to content

Commit

Permalink
EH SDK + serializer integration (#13089)
Browse files Browse the repository at this point in the history
* create draft PR

* fix object batch

* temp object serializer

* dep on core experimental

* temp

* getDeserializedObject()

* null object in object batch test

* remove old SR dep

* remove temporary interfaces

* rename objectSerializer builder method to serializer

* single class imports

* fix serializer builder javadoc

* rename abstract batch impl

* add EventDataBatchBase javadoc

* mono TryAdd

* fix modifiers

* remove send mode

* fix object batch javadoc

* partition event deserialize to async

* uncomment azure core

* javadoc syntax

* remove temp interface

* only core-experimental, not core

* checkstyle

* update partition event deserialize

* monoError for object batch

* rename objectSerializer to serializer

* add serializer null check in object batch creation

* cleanup
  • Loading branch information
arerlend authored Jul 13, 2020
1 parent 65a33d3 commit 0a1dca4
Show file tree
Hide file tree
Showing 22 changed files with 675 additions and 315 deletions.
4 changes: 2 additions & 2 deletions sdk/eventhubs/azure-messaging-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.6.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
<artifactId>azure-core-experimental</artifactId>
<version>1.0.0-beta.1</version> <!-- {x-version-update;com.azure:azure-core-experimental;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,9 @@

package com.azure.messaging.eventhubs;

import com.azure.core.amqp.AmqpMessageConstant;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.AmqpConstants;
import com.azure.core.amqp.implementation.ErrorContextProvider;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.ProcessKind;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.message.Message;
import reactor.core.publisher.Signal;

import java.nio.BufferOverflowException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;

import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;
import static com.azure.core.util.tracing.Tracer.DIAGNOSTIC_ID_KEY;
import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY;
import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY;
import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY;
import static com.azure.messaging.eventhubs.implementation.ClientConstants.AZ_NAMESPACE_VALUE;

/**
* A class for aggregating {@link EventData} into a single, size-limited, batch. It is treated as a single message when
Expand All @@ -45,59 +16,10 @@
* @see EventHubClientBuilder See EventHubClientBuilder for examples of building an asynchronous or synchronous
* producer.
*/
public final class EventDataBatch {
private final ClientLogger logger = new ClientLogger(EventDataBatch.class);
private final Object lock = new Object();
private final int maxMessageSize;
private final String partitionKey;
private final ErrorContextProvider contextProvider;
private final List<EventData> events;
private final byte[] eventBytes;
private final String partitionId;
private int sizeInBytes;
private final TracerProvider tracerProvider;
private final String entityPath;
private final String hostname;

public final class EventDataBatch extends EventDataBatchBase {
EventDataBatch(int maxMessageSize, String partitionId, String partitionKey, ErrorContextProvider contextProvider,
TracerProvider tracerProvider, String entityPath, String hostname) {
this.maxMessageSize = maxMessageSize;
this.partitionKey = partitionKey;
this.partitionId = partitionId;
this.contextProvider = contextProvider;
this.events = new LinkedList<>();
this.sizeInBytes = (maxMessageSize / 65536) * 1024; // reserve 1KB for every 64KB
this.eventBytes = new byte[maxMessageSize];
this.tracerProvider = tracerProvider;
this.entityPath = entityPath;
this.hostname = hostname;
}

/**
* Gets the number of {@link EventData events} in the batch.
*
* @return The number of {@link EventData events} in the batch.
*/
public int getCount() {
return events.size();
}

/**
* Gets the maximum size, in bytes, of the {@link EventDataBatch}.
*
* @return The maximum size, in bytes, of the {@link EventDataBatch}.
*/
public int getMaxSizeInBytes() {
return maxMessageSize;
}

/**
* Gets the size of the {@link EventDataBatch} in bytes.
*
* @return the size of the {@link EventDataBatch} in bytes.
*/
public int getSizeInBytes() {
return this.sizeInBytes;
super(maxMessageSize, partitionId, partitionKey, contextProvider, tracerProvider, entityPath, hostname);
}

/**
Expand All @@ -110,177 +32,6 @@ public int getSizeInBytes() {
* @throws AmqpException if {@code eventData} is larger than the maximum size of the {@link EventDataBatch}.
*/
public boolean tryAdd(final EventData eventData) {
if (eventData == null) {
throw logger.logExceptionAsWarning(new IllegalArgumentException("eventData cannot be null"));
}
EventData event = tracerProvider.isEnabled() ? traceMessageSpan(eventData) : eventData;

final int size;
try {
size = getSize(event, events.isEmpty());
} catch (BufferOverflowException exception) {
throw logger.logExceptionAsWarning(new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED,
String.format(Locale.US, "Size of the payload exceeded maximum message size: %s kb",
maxMessageSize / 1024),
contextProvider.getErrorContext()));
}

synchronized (lock) {
if (this.sizeInBytes + size > this.maxMessageSize) {
return false;
}

this.sizeInBytes += size;
}

this.events.add(event);
return true;
}

/**
* Method to start and end a "Azure.EventHubs.message" span and add the "DiagnosticId" as a property of the message.
*
* @param eventData The Event to add tracing span for.
* @return the updated event data object.
*/
private EventData traceMessageSpan(EventData eventData) {
Optional<Object> eventContextData = eventData.getContext().getData(SPAN_CONTEXT_KEY);
if (eventContextData.isPresent()) {
// if message has context (in case of retries), don't start a message span or add a new context
return eventData;
} else {
// Starting the span makes the sampling decision (nothing is logged at this time)
Context eventContext = eventData.getContext()
.addData(AZ_TRACING_NAMESPACE_KEY, AZ_NAMESPACE_VALUE)
.addData(ENTITY_PATH_KEY, this.entityPath)
.addData(HOST_NAME_KEY, this.hostname);
Context eventSpanContext = tracerProvider.startSpan(eventContext, ProcessKind.MESSAGE);
Optional<Object> eventDiagnosticIdOptional = eventSpanContext.getData(DIAGNOSTIC_ID_KEY);
if (eventDiagnosticIdOptional.isPresent()) {
eventData.getProperties().put(DIAGNOSTIC_ID_KEY, eventDiagnosticIdOptional.get().toString());
tracerProvider.endSpan(eventSpanContext, Signal.complete());
eventData.addContext(SPAN_CONTEXT_KEY, eventSpanContext);
}
}

return eventData;
}

List<EventData> getEvents() {
return events;
}

String getPartitionKey() {
return partitionKey;
}

String getPartitionId() {
return partitionId;
}

private int getSize(final EventData eventData, final boolean isFirst) {
Objects.requireNonNull(eventData, "'eventData' cannot be null.");

final Message amqpMessage = createAmqpMessage(eventData, partitionKey);
int eventSize = amqpMessage.encode(this.eventBytes, 0, maxMessageSize); // actual encoded bytes size
eventSize += 16; // data section overhead

if (isFirst) {
amqpMessage.setBody(null);
amqpMessage.setApplicationProperties(null);
amqpMessage.setProperties(null);
amqpMessage.setDeliveryAnnotations(null);

eventSize += amqpMessage.encode(this.eventBytes, 0, maxMessageSize);
}

return eventSize;
}

/*
* Creates the AMQP message represented by the event data
*/
private Message createAmqpMessage(EventData event, String partitionKey) {
final Message message = Proton.message();

if (event.getProperties() != null && !event.getProperties().isEmpty()) {
final ApplicationProperties applicationProperties = new ApplicationProperties(event.getProperties());
message.setApplicationProperties(applicationProperties);
}

if (event.getSystemProperties() != null) {
event.getSystemProperties().forEach((key, value) -> {
if (EventData.RESERVED_SYSTEM_PROPERTIES.contains(key)) {
return;
}

final AmqpMessageConstant constant = AmqpMessageConstant.fromString(key);

if (constant != null) {
switch (constant) {
case MESSAGE_ID:
message.setMessageId(value);
break;
case USER_ID:
message.setUserId((byte[]) value);
break;
case TO:
message.setAddress((String) value);
break;
case SUBJECT:
message.setSubject((String) value);
break;
case REPLY_TO:
message.setReplyTo((String) value);
break;
case CORRELATION_ID:
message.setCorrelationId(value);
break;
case CONTENT_TYPE:
message.setContentType((String) value);
break;
case CONTENT_ENCODING:
message.setContentEncoding((String) value);
break;
case ABSOLUTE_EXPIRY_TIME:
message.setExpiryTime((long) value);
break;
case CREATION_TIME:
message.setCreationTime((long) value);
break;
case GROUP_ID:
message.setGroupId((String) value);
break;
case GROUP_SEQUENCE:
message.setGroupSequence((long) value);
break;
case REPLY_TO_GROUP_ID:
message.setReplyToGroupId((String) value);
break;
default:
throw logger.logExceptionAsWarning(new IllegalArgumentException(String.format(Locale.US,
"Property is not a recognized reserved property name: %s", key)));
}
} else {
final MessageAnnotations messageAnnotations = (message.getMessageAnnotations() == null)
? new MessageAnnotations(new HashMap<>())
: message.getMessageAnnotations();
messageAnnotations.getValue().put(Symbol.getSymbol(key), value);
message.setMessageAnnotations(messageAnnotations);
}
});
}

if (partitionKey != null) {
final MessageAnnotations messageAnnotations = (message.getMessageAnnotations() == null)
? new MessageAnnotations(new HashMap<>())
: message.getMessageAnnotations();
messageAnnotations.getValue().put(AmqpConstants.PARTITION_KEY, partitionKey);
message.setMessageAnnotations(messageAnnotations);
}

message.setBody(new Data(new Binary(event.getBody())));

return message;
return super.tryAdd(eventData);
}
}
Loading

0 comments on commit 0a1dca4

Please sign in to comment.