Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: implement support for event upcasters, fix #193 #195

Merged
merged 8 commits into from
Nov 3, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher;
import org.axonframework.extensions.kafka.eventhandling.producer.ProducerFactory;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.EventUpcasterChain;
import org.axonframework.spring.config.AxonConfiguration;
import org.axonframework.springboot.autoconfig.AxonAutoConfiguration;
import org.axonframework.springboot.autoconfig.InfraConfiguration;
Expand Down Expand Up @@ -84,9 +85,11 @@ public KafkaAutoConfiguration(KafkaProperties properties) {
@Bean
@ConditionalOnMissingBean
public KafkaMessageConverter<String, byte[]> kafkaMessageConverter(
@Qualifier("eventSerializer") Serializer eventSerializer
@Qualifier("eventSerializer") Serializer eventSerializer,
AxonConfiguration config
) {
return DefaultKafkaMessageConverter.builder().serializer(eventSerializer).build();
return DefaultKafkaMessageConverter.builder().serializer(eventSerializer).upcasterChain(
zambrovski marked this conversation as resolved.
Show resolved Hide resolved
config.upcasterChain() != null ? config.upcasterChain() : new EventUpcasterChain()).build();
}

@Bean("axonKafkaProducerFactory")
Expand Down Expand Up @@ -123,9 +126,10 @@ private boolean isNonEmptyString(String s) {
@ConditionalOnMissingBean
@Bean(destroyMethod = "shutDown")
@ConditionalOnBean({ProducerFactory.class, KafkaMessageConverter.class})
public KafkaPublisher<String, byte[]> kafkaPublisher(ProducerFactory<String, byte[]> axonKafkaProducerFactory,
KafkaMessageConverter<String, byte[]> kafkaMessageConverter,
AxonConfiguration configuration) {
public KafkaPublisher<String, byte[]> kafkaPublisher(
ProducerFactory<String, byte[]> axonKafkaProducerFactory,
KafkaMessageConverter<String, byte[]> kafkaMessageConverter,
AxonConfiguration configuration) {
return KafkaPublisher.<String, byte[]>builder()
.producerFactory(axonKafkaProducerFactory)
.messageConverter(kafkaMessageConverter)
Expand All @@ -138,9 +142,10 @@ public KafkaPublisher<String, byte[]> kafkaPublisher(ProducerFactory<String, byt
@Bean
@ConditionalOnMissingBean
@ConditionalOnBean({KafkaPublisher.class})
public KafkaEventPublisher<String, byte[]> kafkaEventPublisher(KafkaPublisher<String, byte[]> kafkaPublisher,
KafkaProperties kafkaProperties,
EventProcessingConfigurer eventProcessingConfigurer) {
public KafkaEventPublisher<String, byte[]> kafkaEventPublisher(
KafkaPublisher<String, byte[]> kafkaPublisher,
KafkaProperties kafkaProperties,
EventProcessingConfigurer eventProcessingConfigurer) {
KafkaEventPublisher<String, byte[]> kafkaEventPublisher =
KafkaEventPublisher.<String, byte[]>builder().kafkaPublisher(kafkaPublisher).build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.eventhandling.EventData;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericDomainEventEntry;
import org.axonframework.eventhandling.GenericDomainEventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.async.SequencingPolicy;
Expand All @@ -31,27 +33,34 @@
import org.axonframework.serialization.SerializedMessage;
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.SimpleSerializedObject;
import org.axonframework.serialization.upcasting.event.EventUpcasterChain;
import org.axonframework.serialization.upcasting.event.InitialEventRepresentation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.Arrays;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.stream.Stream;

import static org.axonframework.common.BuilderUtils.assertNonNull;
import static org.axonframework.extensions.kafka.eventhandling.HeaderUtils.*;
import static org.axonframework.messaging.Headers.*;

/**
 * Converts an {@link EventMessage} to a {@link ProducerRecord} Kafka message and from a {@link ConsumerRecord} Kafka
 * message back to an EventMessage (if possible).
* <p>
* During conversion meta data entries with the {@code 'axon-metadata-'} prefix are passed to the {@link Headers}. Other
* message-specific attributes are added as metadata. The {@link EventMessage#getPayload()} is serialized using the
 * configured {@link Serializer} and passed as the Kafka record's body.
* <p>
 * If an up-caster / up-caster chain is configured, this converter will pass the converted messages through it.
 * Please note that, since the message converter consumes records one-by-one, the up-casting functionality is
* limited to one-to-one and one-to-many up-casters only.
* </p>
* This implementation will suffice in most cases.
*
* @author Nakul Mishra
Expand All @@ -65,6 +74,7 @@ public class DefaultKafkaMessageConverter implements KafkaMessageConverter<Strin
private final Serializer serializer;
private final SequencingPolicy<? super EventMessage<?>> sequencingPolicy;
private final BiFunction<String, Object, RecordHeader> headerValueMapper;
private final EventUpcasterChain upcasterChain;

/**
* Instantiate a {@link DefaultKafkaMessageConverter} based on the fields contained in the {@link Builder}.
Expand All @@ -80,6 +90,7 @@ protected DefaultKafkaMessageConverter(Builder builder) {
this.serializer = builder.serializer;
this.sequencingPolicy = builder.sequencingPolicy;
this.headerValueMapper = builder.headerValueMapper;
this.upcasterChain = builder.upcasterChain;
}

/**
Expand Down Expand Up @@ -131,43 +142,93 @@ public Optional<EventMessage<?>> readKafkaMessage(ConsumerRecord<String, byte[]>
Headers headers = consumerRecord.headers();
if (isAxonMessage(headers)) {
byte[] messageBody = consumerRecord.value();
SerializedMessage<?> message = extractSerializedMessage(headers, messageBody);
return buildMessage(headers, message);
final EventData<?> eventData = createEventData(headers, messageBody);
return upcasterChain
.upcast(Stream.of(new InitialEventRepresentation(eventData, serializer)))
.findFirst()
.map(upcastedEventData -> new SerializedMessage<>(
upcastedEventData.getMessageIdentifier(),
new LazyDeserializingObject<>(upcastedEventData.getData(), serializer),
upcastedEventData.getMetaData()
)
).flatMap(serializedMessage -> buildMessage(headers, serializedMessage));
}
} catch (Exception e) {
logger.trace("Error converting ConsumerRecord [{}] to an EventMessage", consumerRecord, e);
}

return Optional.empty();
}

private boolean isAxonMessage(Headers headers) {
return keys(headers).containsAll(Arrays.asList(MESSAGE_ID, MESSAGE_TYPE));
}

private SerializedMessage<?> extractSerializedMessage(Headers headers, byte[] messageBody) {
SimpleSerializedObject<byte[]> serializedObject = new SimpleSerializedObject<>(
messageBody,
byte[].class,
/**
* Constructs event data representation from given Kafka headers and byte array body.
* <p>
* This method <i>reuses</i> the {@link GenericDomainEventEntry} class for both types of events which can be
* transmitted via Kafka. For domain events, the fields <code>aggregateType</code>, <code>aggregateId</code> and
* <code>aggregateSeq</code> will contain the corresponding values, but for the simple event they will be
* <code>null</code>. This is ok to pass <code>null</code> to those values and <code>0L</code> to
* <code>aggregateSeq</code>, since the {@link InitialEventRepresentation} does the same in its constructor and
* is implemented in a null-tolerant way. Check {@link DefaultKafkaMessageConverter#isDomainEvent(Headers)} for more
* details.
* </p>
*
* @param headers Kafka headers.
* @param messageBody Kafka payload as a byte array.
* @return event data.
*/
private EventData<?> createEventData(Headers headers, byte[] messageBody) {
return new GenericDomainEventEntry<>(
valueAsString(headers, AGGREGATE_TYPE),
valueAsString(headers, AGGREGATE_ID),
valueAsLong(headers, AGGREGATE_SEQ, 0L),
valueAsString(headers, MESSAGE_ID),
valueAsLong(headers, MESSAGE_TIMESTAMP),
valueAsString(headers, MESSAGE_TYPE),
valueAsString(headers, MESSAGE_REVISION, null)
valueAsString(headers, MESSAGE_REVISION, null),
messageBody,
extractMetadataAsBytes(headers)
);
}

return new SerializedMessage<>(
valueAsString(headers, MESSAGE_ID),
new LazyDeserializingObject<>(serializedObject, serializer),
new LazyDeserializingObject<>(MetaData.from(extractAxonMetadata(headers)))
);
private byte[] extractMetadataAsBytes(Headers headers) {
    // Collect the axon-metadata-* header entries and re-serialize them into a byte[] form,
    // as expected by the event entry's meta data column.
    final MetaData metaData = MetaData.from(extractAxonMetadata(headers));
    return serializer.serialize(metaData, byte[].class).getData();
}

private Optional<EventMessage<?>> buildMessage(Headers headers, SerializedMessage<?> message) {
private static boolean isAxonMessage(Headers headers) {
    // A record is treated as an Axon message only when both the message id and the message type headers are set.
    return keys(headers).contains(MESSAGE_ID) && keys(headers).contains(MESSAGE_TYPE);
}

/**
* Checks if the event is originated from an aggregate (domain event) or is a simple event sent over the bus.
* <p>
* The difference between a DomainEventMessage and an EventMessage, is the following three fields:
* <ul>
* <li>The type - represents the Aggregate the event originates from. It would be empty for an EventMessage and
* filled for a DomainEventMessage.</li>
* <li>The aggregateIdentifier - represents the Aggregate instance the event originates from. It would be equal
* to the eventIdentifier for an EventMessage and not equal to that identifier a DomainEventMessage.</li>
* <li>The sequenceNumber - represents the order of the events within an Aggregate instance's event stream.
* It would be 0 at all times for an EventMessage, whereas a DomainEventMessage would be 0 or greater.</li>
* </ul>
* </p>
*
* @param headers Kafka headers.
* @return <code>true</code> if the event is originated from an aggregate.
*/
private static boolean isDomainEvent(Headers headers) {
    // All three aggregate headers must be present for the record to qualify as a domain event.
    if (headers.lastHeader(AGGREGATE_TYPE) == null) {
        return false;
    }
    if (headers.lastHeader(AGGREGATE_ID) == null) {
        return false;
    }
    return headers.lastHeader(AGGREGATE_SEQ) != null;
}

/**
 * Rebuilds an {@link EventMessage} from the serialized message, using the Kafka headers to decide whether a
 * domain event (aggregate headers present) or a plain event message should be constructed.
 *
 * @param headers the Kafka headers of the consumed record
 * @param message the deserializable message extracted from the record
 * @return the reconstructed event message
 */
private static Optional<EventMessage<?>> buildMessage(Headers headers, SerializedMessage<?> message) {
    long timestamp = valueAsLong(headers, MESSAGE_TIMESTAMP);
    return isDomainEvent(headers)
            ? buildDomainEventMessage(headers, message, timestamp)
            : buildEventMessage(message, timestamp);
}

private Optional<EventMessage<?>> buildDomainEvent(Headers headers, SerializedMessage<?> message, long timestamp) {
private static Optional<EventMessage<?>> buildDomainEventMessage(Headers headers, SerializedMessage<?> message,
long timestamp) {
return Optional.of(new GenericDomainEventMessage<>(
valueAsString(headers, AGGREGATE_TYPE),
valueAsString(headers, AGGREGATE_ID),
Expand All @@ -177,7 +238,7 @@ private Optional<EventMessage<?>> buildDomainEvent(Headers headers, SerializedMe
));
}

private Optional<EventMessage<?>> buildEvent(SerializedMessage<?> message, long timestamp) {
private static Optional<EventMessage<?>> buildEventMessage(SerializedMessage<?> message, long timestamp) {
    // Plain (non-domain) event: restore the payload together with its original publication instant.
    Optional<EventMessage<?>> eventMessage =
            Optional.of(new GenericEventMessage<>(message, () -> Instant.ofEpochMilli(timestamp)));
    return eventMessage;
}

Expand All @@ -193,6 +254,7 @@ public static class Builder {
private Serializer serializer;
private SequencingPolicy<? super EventMessage<?>> sequencingPolicy = SequentialPerAggregatePolicy.instance();
private BiFunction<String, Object, RecordHeader> headerValueMapper = byteMapper();
private EventUpcasterChain upcasterChain = new EventUpcasterChain();

/**
* Sets the serializer to serialize the Event Message's payload with.
Expand Down Expand Up @@ -234,6 +296,18 @@ public Builder headerValueMapper(BiFunction<String, Object, RecordHeader> header
return this;
}

/**
 * Sets the {@code upcasterChain} to be used during the consumption of events. Defaults to an empty
 * {@link EventUpcasterChain}.
 *
 * @param upcasterChain upcaster chain to be used on event reading.
 * @return the current Builder instance, for fluent interfacing
 * @throws AxonConfigurationException if the given {@code upcasterChain} is {@code null}
 */
public Builder upcasterChain(EventUpcasterChain upcasterChain) {
assertNonNull(upcasterChain, "UpcasterChain must not be null");
this.upcasterChain = upcasterChain;
return this;
}

/**
* Initializes a {@link DefaultKafkaMessageConverter} as specified through this Builder.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,20 @@ public static Long valueAsLong(Headers headers, String key) {
return asLong(value(headers, key));
}

/**
* Return a {@link Long} representation of the {@code value} stored under a given {@code key} inside the {@link
* Headers}. In case of a missing entry {@code null} is returned.
zambrovski marked this conversation as resolved.
Show resolved Hide resolved
*
* @param headers the Kafka {@code headers} to pull the {@link Long} value from
* @param key the key corresponding to the expected {@link Long} value
* @param defaultValue the default value to return when {@code key} does not exist in the given {@code headers}
* @return the value as a {@link Long} corresponding to the given {@code key} in the {@code headers}
*/
public static Long valueAsLong(Headers headers, String key, Long defaultValue) {
zambrovski marked this conversation as resolved.
Show resolved Hide resolved
Long value = asLong(value(headers, key));
return value != null ? value : defaultValue;
}

/**
* Converts bytes to {@link String}.
*
Expand Down
Loading