Skip to content

Commit

Permalink
[Fix apache/incubator-kie-issues#1457] Converter deserialization right
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Oct 4, 2024
1 parent 3ac2bd3 commit d8853dd
Show file tree
Hide file tree
Showing 2 changed files with 260 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,47 @@

import java.io.IOException;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.function.Supplier;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.kie.kogito.event.AbstractDataEvent;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessDefinitionDataEvent;
import org.kie.kogito.event.process.ProcessDefinitionEventBody;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
import org.kie.kogito.event.process.ProcessInstanceErrorEventBody;
import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
import org.kie.kogito.event.process.ProcessInstanceNodeEventBody;
import org.kie.kogito.event.process.ProcessInstanceSLADataEvent;
import org.kie.kogito.event.process.ProcessInstanceSLAEventBody;
import org.kie.kogito.event.process.ProcessInstanceStateDataEvent;
import org.kie.kogito.event.process.ProcessInstanceStateEventBody;
import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent;
import org.kie.kogito.event.process.ProcessInstanceVariableEventBody;
import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentEventBody;
import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentEventBody;
import org.kie.kogito.event.usertask.UserTaskInstanceCommentDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceCommentEventBody;
import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceDeadlineDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceDeadlineEventBody;
import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceStateEventBody;
import org.kie.kogito.event.usertask.UserTaskInstanceVariableDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceVariableEventBody;
import org.kie.kogito.index.event.KogitoJobCloudEvent;
import org.kie.kogito.index.model.Job;
import org.kie.kogito.index.service.DataIndexServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.cloudevents.CloudEvent;
Expand All @@ -55,14 +85,21 @@ public class KogitoIndexEventConverter implements MessageConverter {

ObjectMapper objectMapper;

@Override
public int getPriority() {
return CONVERTER_DEFAULT_PRIORITY - 2;
}

@Override
public boolean canConvert(Message<?> message, Type type) {
return isIndexable(type) &&
(message.getPayload() instanceof Buffer);
}

private boolean isIndexable(Type type) {
return type == ProcessDefinitionDataEvent.class
return type == ProcessInstanceDataEvent.class
|| type == ProcessDefinitionDataEvent.class
|| type == UserTaskInstanceDataEvent.class
|| type == KogitoJobCloudEvent.class;
}

Expand All @@ -78,8 +115,12 @@ public Message<?> convert(Message<?> message, Type type) {
MessageReader messageReader = VertxMessageFactory.createReader(httpHeaders, buffer);
cloudEvent = messageReader.toEvent();

if (type.getTypeName().equals(KogitoJobCloudEvent.class.getTypeName())) {
if (type.getTypeName().equals(ProcessInstanceDataEvent.class.getTypeName())) {
return message.withPayload(buildProcessInstanceDataEventVariant(cloudEvent));
} else if (type.getTypeName().equals(KogitoJobCloudEvent.class.getTypeName())) {
return message.withPayload(buildKogitoJobCloudEvent(cloudEvent));
} else if (type.getTypeName().equals(UserTaskInstanceDataEvent.class.getTypeName())) {
return message.withPayload(buildUserTaskInstanceDataEvent(cloudEvent));
} else if (type.getTypeName().equals(ProcessDefinitionDataEvent.class.getTypeName())) {
return message.withPayload(buildProcessDefinitionEvent(cloudEvent));
}
Expand All @@ -105,6 +146,48 @@ public void setObjectMapper(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}

private DataEvent<?> buildProcessInstanceDataEventVariant(CloudEvent cloudEvent) throws IOException {
switch (cloudEvent.getType()) {
case MultipleProcessInstanceDataEvent.TYPE:
return buildDataEvent(cloudEvent, objectMapper, MultipleProcessInstanceDataEvent::new, new TypeReference<Collection<ProcessInstanceDataEvent<?>>>() {
});
case "ProcessInstanceErrorDataEvent":
return buildDataEvent(cloudEvent, objectMapper, ProcessInstanceErrorDataEvent::new, ProcessInstanceErrorEventBody.class);
case "ProcessInstanceNodeDataEvent":
return buildDataEvent(cloudEvent, objectMapper, ProcessInstanceNodeDataEvent::new, ProcessInstanceNodeEventBody.class);
case "ProcessInstanceSLADataEvent":
return buildDataEvent(cloudEvent, objectMapper, ProcessInstanceSLADataEvent::new, ProcessInstanceSLAEventBody.class);
case "ProcessInstanceStateDataEvent":
return buildDataEvent(cloudEvent, objectMapper, ProcessInstanceStateDataEvent::new, ProcessInstanceStateEventBody.class);
case "ProcessInstanceVariableDataEvent":
return buildDataEvent(cloudEvent, objectMapper, ProcessInstanceVariableDataEvent::new, ProcessInstanceVariableEventBody.class);
default:
throw new IllegalArgumentException("Unknown ProcessInstanceDataEvent variant: " + cloudEvent.getType());
}
}

private DataEvent<?> buildUserTaskInstanceDataEvent(CloudEvent cloudEvent) throws IOException {
switch (cloudEvent.getType()) {
case MultipleUserTaskInstanceDataEvent.TYPE:
return buildDataEvent(cloudEvent, objectMapper, MultipleUserTaskInstanceDataEvent::new, new TypeReference<Collection<UserTaskInstanceDataEvent<?>>>() {
});
case "UserTaskInstanceAssignmentDataEvent":
return buildDataEvent(cloudEvent, objectMapper, UserTaskInstanceAssignmentDataEvent::new, UserTaskInstanceAssignmentEventBody.class);
case "UserTaskInstanceAttachmentDataEvent":
return buildDataEvent(cloudEvent, objectMapper, UserTaskInstanceAttachmentDataEvent::new, UserTaskInstanceAttachmentEventBody.class);
case "UserTaskInstanceCommentDataEvent":
return buildDataEvent(cloudEvent, objectMapper, UserTaskInstanceCommentDataEvent::new, UserTaskInstanceCommentEventBody.class);
case "UserTaskInstanceDeadlineDataEvent":
return buildDataEvent(cloudEvent, objectMapper, UserTaskInstanceDeadlineDataEvent::new, UserTaskInstanceDeadlineEventBody.class);
case "UserTaskInstanceStateDataEvent":
return buildDataEvent(cloudEvent, objectMapper, UserTaskInstanceStateDataEvent::new, UserTaskInstanceStateEventBody.class);
case "UserTaskInstanceVariableDataEvent":
return buildDataEvent(cloudEvent, objectMapper, UserTaskInstanceVariableDataEvent::new, UserTaskInstanceVariableEventBody.class);
default:
throw new IllegalArgumentException("Unknown UserTaskInstanceDataEvent variant: " + cloudEvent.getType());
}
}

private KogitoJobCloudEvent buildKogitoJobCloudEvent(CloudEvent cloudEvent) throws IOException {
KogitoJobCloudEvent jobCloudEvent = new KogitoJobCloudEvent();
jobCloudEvent.setId(cloudEvent.getId());
Expand All @@ -120,6 +203,29 @@ private KogitoJobCloudEvent buildKogitoJobCloudEvent(CloudEvent cloudEvent) thro
return jobCloudEvent;
}

private static <E extends AbstractDataEvent<T>, T> E buildDataEvent(CloudEvent cloudEvent, ObjectMapper objectMapper, Supplier<E> supplier, TypeReference<T> typeReference) throws IOException {
E dataEvent = buildEvent(cloudEvent, objectMapper, supplier);
if (cloudEvent.getData() != null) {
dataEvent.setData(objectMapper.readValue(cloudEvent.getData().toBytes(), typeReference));
}
return dataEvent;
}

private static <E extends AbstractDataEvent<T>, T> E buildDataEvent(CloudEvent cloudEvent, ObjectMapper objectMapper, Supplier<E> supplier, Class<T> clazz) throws IOException {
E dataEvent = buildEvent(cloudEvent, objectMapper, supplier);
if (cloudEvent.getData() != null) {
dataEvent.setData(objectMapper.readValue(cloudEvent.getData().toBytes(), clazz));
}
return dataEvent;
}

private static <E extends AbstractDataEvent<?>> E buildEvent(CloudEvent cloudEvent, ObjectMapper objectMapper, Supplier<E> supplier) throws IOException {
E dataEvent = supplier.get();
applyCloudEventAttributes(cloudEvent, dataEvent);
applyExtensions(cloudEvent, dataEvent);
return dataEvent;
}

private static void applyCloudEventAttributes(CloudEvent cloudEvent, AbstractDataEvent<?> dataEvent) {
dataEvent.setSpecVersion(cloudEvent.getSpecVersion());
dataEvent.setId(cloudEvent.getId());
Expand All @@ -130,4 +236,8 @@ private static void applyCloudEventAttributes(CloudEvent cloudEvent, AbstractDat
dataEvent.setSubject(cloudEvent.getSubject());
dataEvent.setTime(cloudEvent.getTime());
}

private static void applyExtensions(CloudEvent cloudEvent, AbstractDataEvent<?> dataEvent) {
cloudEvent.getExtensionNames().forEach(extensionName -> dataEvent.addExtensionAttribute(extensionName, cloudEvent.getExtension(extensionName)));
}
}
Loading

0 comments on commit d8853dd

Please sign in to comment.