From d8853dde4058e50ff0bd3e8700a92ced135f7b97 Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti Date: Fri, 4 Oct 2024 11:16:16 +0200 Subject: [PATCH] [Fix apache/incubator-kie-issues#1457] Converter deserialization right --- .../messaging/KogitoIndexEventConverter.java | 114 +++++++++++++- .../KogitoIndexEventConverterTest.java | 149 +++++++++++++++++- 2 files changed, 260 insertions(+), 3 deletions(-) diff --git a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverter.java b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverter.java index 4ce5f3554f..cef45271e0 100644 --- a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverter.java +++ b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverter.java @@ -20,17 +20,47 @@ import java.io.IOException; import java.lang.reflect.Type; +import java.util.Collection; +import java.util.function.Supplier; import org.eclipse.microprofile.reactive.messaging.Message; import org.kie.kogito.event.AbstractDataEvent; +import org.kie.kogito.event.DataEvent; +import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent; import org.kie.kogito.event.process.ProcessDefinitionDataEvent; import org.kie.kogito.event.process.ProcessDefinitionEventBody; +import org.kie.kogito.event.process.ProcessInstanceDataEvent; +import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent; +import org.kie.kogito.event.process.ProcessInstanceErrorEventBody; +import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent; +import org.kie.kogito.event.process.ProcessInstanceNodeEventBody; +import org.kie.kogito.event.process.ProcessInstanceSLADataEvent; +import org.kie.kogito.event.process.ProcessInstanceSLAEventBody; +import org.kie.kogito.event.process.ProcessInstanceStateDataEvent; +import org.kie.kogito.event.process.ProcessInstanceStateEventBody; +import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent; +import org.kie.kogito.event.process.ProcessInstanceVariableEventBody; +import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent; +import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentDataEvent; +import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentEventBody; +import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentDataEvent; +import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentEventBody; +import org.kie.kogito.event.usertask.UserTaskInstanceCommentDataEvent; +import org.kie.kogito.event.usertask.UserTaskInstanceCommentEventBody; +import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent; +import org.kie.kogito.event.usertask.UserTaskInstanceDeadlineDataEvent; +import org.kie.kogito.event.usertask.UserTaskInstanceDeadlineEventBody; +import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent; +import org.kie.kogito.event.usertask.UserTaskInstanceStateEventBody; +import org.kie.kogito.event.usertask.UserTaskInstanceVariableDataEvent; +import org.kie.kogito.event.usertask.UserTaskInstanceVariableEventBody; import org.kie.kogito.index.event.KogitoJobCloudEvent; import org.kie.kogito.index.model.Job; import org.kie.kogito.index.service.DataIndexServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import io.cloudevents.CloudEvent; @@ -55,6 +85,11 @@ public class KogitoIndexEventConverter implements MessageConverter { ObjectMapper objectMapper; + @Override + public int getPriority() { + return CONVERTER_DEFAULT_PRIORITY - 2; + } + @Override public boolean canConvert(Message message, Type type) { return isIndexable(type) && @@ -62,7 +97,9 @@ public boolean canConvert(Message message, Type type) { } private boolean isIndexable(Type type) { - return type == ProcessDefinitionDataEvent.class + return type == ProcessInstanceDataEvent.class + || type == ProcessDefinitionDataEvent.class + || type == UserTaskInstanceDataEvent.class || type == KogitoJobCloudEvent.class; } @@ -78,8 +115,12 @@ public Message convert(Message message, Type type) { MessageReader messageReader = VertxMessageFactory.createReader(httpHeaders, buffer); cloudEvent = messageReader.toEvent(); - if (type.getTypeName().equals(KogitoJobCloudEvent.class.getTypeName())) { + if (type.getTypeName().equals(ProcessInstanceDataEvent.class.getTypeName())) { + return message.withPayload(buildProcessInstanceDataEventVariant(cloudEvent)); + } else if (type.getTypeName().equals(KogitoJobCloudEvent.class.getTypeName())) { return message.withPayload(buildKogitoJobCloudEvent(cloudEvent)); + } else if (type.getTypeName().equals(UserTaskInstanceDataEvent.class.getTypeName())) { + return message.withPayload(buildUserTaskInstanceDataEvent(cloudEvent)); } else if (type.getTypeName().equals(ProcessDefinitionDataEvent.class.getTypeName())) { return message.withPayload(buildProcessDefinitionEvent(cloudEvent)); } @@ -105,6 +146,48 @@ public void setObjectMapper(ObjectMapper objectMapper) { this.objectMapper = objectMapper; } + private DataEvent buildProcessInstanceDataEventVariant(CloudEvent cloudEvent) throws IOException { + switch (cloudEvent.getType()) { + case MultipleProcessInstanceDataEvent.TYPE: + return buildDataEvent(cloudEvent, objectMapper, MultipleProcessInstanceDataEvent::new, new TypeReference>>() { + }); + case "ProcessInstanceErrorDataEvent": + return buildDataEvent(cloudEvent, objectMapper, ProcessInstanceErrorDataEvent::new, ProcessInstanceErrorEventBody.class); + case "ProcessInstanceNodeDataEvent": + return buildDataEvent(cloudEvent, objectMapper, ProcessInstanceNodeDataEvent::new, ProcessInstanceNodeEventBody.class); + case "ProcessInstanceSLADataEvent": + return buildDataEvent(cloudEvent, objectMapper, ProcessInstanceSLADataEvent::new, ProcessInstanceSLAEventBody.class); + case "ProcessInstanceStateDataEvent": + return buildDataEvent(cloudEvent, objectMapper, ProcessInstanceStateDataEvent::new, ProcessInstanceStateEventBody.class); + case "ProcessInstanceVariableDataEvent": + return buildDataEvent(cloudEvent, objectMapper, ProcessInstanceVariableDataEvent::new, ProcessInstanceVariableEventBody.class); + default: + throw new IllegalArgumentException("Unknown ProcessInstanceDataEvent variant: " + cloudEvent.getType()); + } + } + + private DataEvent buildUserTaskInstanceDataEvent(CloudEvent cloudEvent) throws IOException { + switch (cloudEvent.getType()) { + case MultipleUserTaskInstanceDataEvent.TYPE: + return buildDataEvent(cloudEvent, objectMapper, MultipleUserTaskInstanceDataEvent::new, new TypeReference>>() { + }); + case "UserTaskInstanceAssignmentDataEvent": + return buildDataEvent(cloudEvent, objectMapper, UserTaskInstanceAssignmentDataEvent::new, UserTaskInstanceAssignmentEventBody.class); + case "UserTaskInstanceAttachmentDataEvent": + return buildDataEvent(cloudEvent, objectMapper, UserTaskInstanceAttachmentDataEvent::new, UserTaskInstanceAttachmentEventBody.class); + case "UserTaskInstanceCommentDataEvent": + return buildDataEvent(cloudEvent, objectMapper, UserTaskInstanceCommentDataEvent::new, UserTaskInstanceCommentEventBody.class); + case "UserTaskInstanceDeadlineDataEvent": + return buildDataEvent(cloudEvent, objectMapper, UserTaskInstanceDeadlineDataEvent::new, UserTaskInstanceDeadlineEventBody.class); + case "UserTaskInstanceStateDataEvent": + return buildDataEvent(cloudEvent, objectMapper, UserTaskInstanceStateDataEvent::new, UserTaskInstanceStateEventBody.class); + case "UserTaskInstanceVariableDataEvent": + return buildDataEvent(cloudEvent, objectMapper, UserTaskInstanceVariableDataEvent::new, UserTaskInstanceVariableEventBody.class); + default: + throw new IllegalArgumentException("Unknown UserTaskInstanceDataEvent variant: " + cloudEvent.getType()); + } + } + private KogitoJobCloudEvent buildKogitoJobCloudEvent(CloudEvent cloudEvent) throws IOException { KogitoJobCloudEvent jobCloudEvent = new KogitoJobCloudEvent(); jobCloudEvent.setId(cloudEvent.getId()); @@ -120,6 +203,29 @@ private KogitoJobCloudEvent buildKogitoJobCloudEvent(CloudEvent cloudEvent) thro return jobCloudEvent; } + private static , T> E buildDataEvent(CloudEvent cloudEvent, ObjectMapper objectMapper, Supplier supplier, TypeReference typeReference) throws IOException { + E dataEvent = buildEvent(cloudEvent, objectMapper, supplier); + if (cloudEvent.getData() != null) { + dataEvent.setData(objectMapper.readValue(cloudEvent.getData().toBytes(), typeReference)); + } + return dataEvent; + } + + private static , T> E buildDataEvent(CloudEvent cloudEvent, ObjectMapper objectMapper, Supplier supplier, Class clazz) throws IOException { + E dataEvent = buildEvent(cloudEvent, objectMapper, supplier); + if (cloudEvent.getData() != null) { + dataEvent.setData(objectMapper.readValue(cloudEvent.getData().toBytes(), clazz)); + } + return dataEvent; + } + + private static > E buildEvent(CloudEvent cloudEvent, ObjectMapper objectMapper, Supplier supplier) throws IOException { + E dataEvent = supplier.get(); + applyCloudEventAttributes(cloudEvent, dataEvent); + applyExtensions(cloudEvent, dataEvent); + return dataEvent; + } + private static void applyCloudEventAttributes(CloudEvent cloudEvent, AbstractDataEvent dataEvent) { dataEvent.setSpecVersion(cloudEvent.getSpecVersion()); dataEvent.setId(cloudEvent.getId()); @@ -130,4 +236,8 @@ private static void applyCloudEventAttributes(CloudEvent cloudEvent, AbstractDat dataEvent.setSubject(cloudEvent.getSubject()); dataEvent.setTime(cloudEvent.getTime()); } + + private static void applyExtensions(CloudEvent cloudEvent, AbstractDataEvent dataEvent) { + cloudEvent.getExtensionNames().forEach(extensionName -> dataEvent.addExtensionAttribute(extensionName, cloudEvent.getExtension(extensionName))); + } } diff --git a/data-index/data-index-service/data-index-service-common/src/test/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverterTest.java b/data-index/data-index-service/data-index-service-common/src/test/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverterTest.java index b34bce21e4..c9ce83ace9 100644 --- a/data-index/data-index-service/data-index-service-common/src/test/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverterTest.java +++ b/data-index/data-index-service/data-index-service-common/src/test/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverterTest.java @@ -18,21 +18,32 @@ */ package org.kie.kogito.index.service.messaging; +import java.io.IOException; +import java.io.UncheckedIOException; import java.net.URI; import java.time.OffsetDateTime; +import java.util.AbstractMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.Metadata; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.kie.kogito.event.process.NodeDefinition; import org.kie.kogito.event.process.ProcessDefinitionDataEvent; import org.kie.kogito.event.process.ProcessInstanceDataEvent; +import org.kie.kogito.event.process.ProcessInstanceStateDataEvent; import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent; +import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent; import org.kie.kogito.index.event.KogitoJobCloudEvent; import org.kie.kogito.index.json.JsonUtils; import org.kie.kogito.index.json.ObjectMapperProducer; import org.kie.kogito.index.model.Job; +import org.kie.kogito.index.model.Node; +import org.kie.kogito.jackson.utils.ObjectMapperFactory; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -61,6 +72,8 @@ @ExtendWith(MockitoExtension.class) class KogitoIndexEventConverterTest { + private static final String PROCESS_INSTANCE_STATE_EVENT_TYPE = "ProcessInstanceStateDataEvent"; + private static final String USER_TASK_INSTANCE_STATE_EVENT_TYPE = "UserTaskInstanceStateDataEvent"; private static final String JOB_EVENT_TYPE = "JobEvent"; private static final String EVENT_ID = "ID"; private static final URI EVENT_SOURCE = URI.create("http://localhost:8080/travels"); @@ -68,7 +81,9 @@ class KogitoIndexEventConverterTest { private static final URI EVENT_DATA_SCHEMA = URI.create("http://my_event_data_schema/my_schema.json"); private static final String EVENT_DATA_CONTENT_TYPE = "application/json; charset=utf-8"; private static final String EVENT_SUBJECT = "SUBJECT"; - + private static final String STRUCTURED_PROCESS_INSTANCE_CLOUD_EVENT = "process_instance_event.json"; + private static final String BINARY_PROCESS_INSTANCE_CLOUD_EVENT_DATA = "binary_process_instance_event_data.json"; + private static final String BINARY_USER_TASK_INSTANCE_CLOUD_EVENT_DATA = "binary_user_task_instance_state_event_data.json"; private static final String BINARY_KOGITO_JOB_CLOUD_EVENT_DATA = "binary_job_event_data.json"; private static final String STRUCTURED_PROCESS_DEFINITION_CLOUD_EVENT = "process_definition_event.json"; private static final String BINARY_PROCESS_DEFINITION_CLOUD_EVENT = "binary_process_definition_event.json"; @@ -92,11 +107,17 @@ void setUp() { void canConvertBufferPayload() { Buffer buffer = Buffer.buffer("{}"); Message message = Message.of(buffer, Metadata.of(httpMetadata)); + assertThat(converter.canConvert(message, ProcessInstanceDataEvent.class)).isTrue(); + assertThat(converter.canConvert(message, UserTaskInstanceDataEvent.class)).isTrue(); assertThat(converter.canConvert(message, KogitoJobCloudEvent.class)).isTrue(); } @Test void canConvertNotBufferPayload() { + assertThat(converter.canConvert(Message.of(new ProcessInstanceDataEvent<>(), Metadata.of(httpMetadata)), + ProcessInstanceDataEvent.class)).isFalse(); + assertThat(converter.canConvert(Message.of(new UserTaskInstanceDataEvent<>(), Metadata.of(httpMetadata)), + UserTaskInstanceDataEvent.class)).isFalse(); assertThat(converter.canConvert(Message.of(KogitoJobCloudEvent.builder().build(), Metadata.of(httpMetadata)), KogitoJobCloudEvent.class)).isFalse(); } @@ -129,6 +150,55 @@ void convertBinaryProcessDefinitionDataEvent() throws Exception { assertThat(cloudEvent.getSubject()).isEqualTo(EVENT_SUBJECT); } + private static Map getMetadata() { + return toStringMap( + Map.of("Description", "JSON based greeting workflow", + "annotations", getAnnotations(), + "Tags", getAnnotations())); + } + + private List getNodes() { + try { + List nodes = ObjectMapperFactory.get() + .readerForListOf(NodeDefinition.class) + .readValue(readFileContent("nodes_definitions.json")); + return nodes.stream().map(definition -> { + Node node = new Node(); + node.setId(definition.getId()); + node.setName(definition.getName()); + node.setUniqueId(definition.getUniqueId()); + node.setType(definition.getType()); + node.setMetadata(toStringMap(definition.getMetadata())); + return node; + }).collect(Collectors.toList()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static List getAnnotations() { + return List.of("test1", "test2", "test3"); + } + + private static Map toStringMap(Map input) { + if (input == null) { + return null; + } + return input.entrySet().stream() + .map(entry -> { + if (String.class.isInstance(entry.getValue())) { + return entry; + } + String value = null; + try { + value = JsonUtils.getObjectMapper().writeValueAsString(entry.getValue()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return new AbstractMap.SimpleEntry<>(entry.getKey(), value); + }).collect(Collectors.toMap(Map.Entry::getKey, e -> (String) e.getValue())); + } + @Test void convertStructuredProcessDefinitionDataEvent() throws Exception { Buffer buffer = Buffer.buffer(readFileContent(STRUCTURED_PROCESS_DEFINITION_CLOUD_EVENT)); @@ -148,6 +218,54 @@ void convertStructuredProcessDefinitionDataEvent() throws Exception { assertThat(cloudEvent.getTime()).isEqualTo("2023-10-19T10:18:01.540311-03:00"); } + @Test + void convertBinaryProcessInstanceDataEvent() throws Exception { + Buffer buffer = Buffer.buffer(readFileContent(BINARY_PROCESS_INSTANCE_CLOUD_EVENT_DATA)); + Message message = Message.of(buffer, Metadata.of(httpMetadata)); + + // set ce-xxx headers for the binary format. + headers.add(ceHeader(SPECVERSION), SpecVersion.V1.toString()); + headers.add(ceHeader(ID), EVENT_ID); + headers.add(ceHeader(SOURCE), EVENT_SOURCE.toString()); + headers.add(ceHeader(TYPE), PROCESS_INSTANCE_STATE_EVENT_TYPE); + headers.add(ceHeader(TIME), EVENT_TIME.toString()); + headers.add(ceHeader(DATASCHEMA), EVENT_DATA_SCHEMA.toString()); + headers.add(ceHeader(DATACONTENTTYPE), EVENT_DATA_CONTENT_TYPE); + headers.add(ceHeader(SUBJECT), EVENT_SUBJECT); + + Message result = converter.convert(message, ProcessInstanceDataEvent.class); + assertThat(result.getPayload()).isInstanceOf(ProcessInstanceDataEvent.class); + ProcessInstanceStateDataEvent cloudEvent = (ProcessInstanceStateDataEvent) result.getPayload(); + + assertThat(cloudEvent.getId()).isEqualTo(EVENT_ID); + assertThat(cloudEvent.getSpecVersion().toString()).isEqualTo(SpecVersion.V1.toString()); + assertThat(cloudEvent.getSource().toString()).isEqualTo(EVENT_SOURCE.toString()); + assertThat(cloudEvent.getType()).isEqualTo(PROCESS_INSTANCE_STATE_EVENT_TYPE); + assertThat(cloudEvent.getTime()).isEqualTo(EVENT_TIME); + assertThat(cloudEvent.getDataSchema()).isEqualTo(EVENT_DATA_SCHEMA); + assertThat(cloudEvent.getDataContentType()).isEqualTo(EVENT_DATA_CONTENT_TYPE); + assertThat(cloudEvent.getSubject()).isEqualTo(EVENT_SUBJECT); + } + + @Test + void convertStructuredProcessInstanceDataEvent() throws Exception { + Buffer buffer = Buffer.buffer(readFileContent(STRUCTURED_PROCESS_INSTANCE_CLOUD_EVENT)); + Message message = Message.of(buffer, Metadata.of(httpMetadata)); + + // set ce header for the structured format. + headers.add(HttpHeaders.CONTENT_TYPE, "application/cloudevents+json"); + + Message result = converter.convert(message, ProcessInstanceDataEvent.class); + assertThat(result.getPayload()).isInstanceOf(ProcessInstanceDataEvent.class); + ProcessInstanceDataEvent cloudEvent = (ProcessInstanceDataEvent) result.getPayload(); + + assertThat(cloudEvent.getId()).isEqualTo("867ff7b4-2e49-49b3-882a-76f65a2c4124"); + assertThat(cloudEvent.getSpecVersion().toString()).isEqualTo(SpecVersion.V1.toString()); + assertThat(cloudEvent.getSource().toString()).isEqualTo(EVENT_SOURCE.toString()); + assertThat(cloudEvent.getType()).isEqualTo(PROCESS_INSTANCE_STATE_EVENT_TYPE); + assertThat(cloudEvent.getTime()).isEqualTo(EVENT_TIME); + } + @Test void convertBinaryKogitoJobCloudEvent() throws Exception { Buffer buffer = Buffer.buffer(readFileContent(BINARY_KOGITO_JOB_CLOUD_EVENT_DATA)); @@ -194,6 +312,35 @@ void convertBinaryKogitoJobCloudEvent() throws Exception { assertThat(job.getExecutionCounter()).isEqualTo(0); } + @Test + void convertBinaryUserTaskInstanceDataEvent() throws Exception { + Buffer buffer = Buffer.buffer(readFileContent(BINARY_USER_TASK_INSTANCE_CLOUD_EVENT_DATA)); + Message message = Message.of(buffer, Metadata.of(httpMetadata)); + + // set ce-xxx headers for the binary format. + headers.add(ceHeader(SPECVERSION), SpecVersion.V1.toString()); + headers.add(ceHeader(ID), EVENT_ID); + headers.add(ceHeader(SOURCE), EVENT_SOURCE.toString()); + headers.add(ceHeader(TYPE), USER_TASK_INSTANCE_STATE_EVENT_TYPE); + headers.add(ceHeader(TIME), EVENT_TIME.toString()); + headers.add(ceHeader(DATASCHEMA), EVENT_DATA_SCHEMA.toString()); + headers.add(ceHeader(DATACONTENTTYPE), EVENT_DATA_CONTENT_TYPE); + headers.add(ceHeader(SUBJECT), EVENT_SUBJECT); + + Message result = converter.convert(message, UserTaskInstanceDataEvent.class); + assertThat(result.getPayload()).isInstanceOf(UserTaskInstanceStateDataEvent.class); + UserTaskInstanceStateDataEvent cloudEvent = (UserTaskInstanceStateDataEvent) result.getPayload(); + + assertThat(cloudEvent.getId()).isEqualTo(EVENT_ID); + assertThat(cloudEvent.getSpecVersion()).isEqualTo(SpecVersion.V1); + assertThat(cloudEvent.getSource().toString()).isEqualTo(EVENT_SOURCE.toString()); + assertThat(cloudEvent.getType()).isEqualTo(USER_TASK_INSTANCE_STATE_EVENT_TYPE); + assertThat(cloudEvent.getTime()).isEqualTo(EVENT_TIME); + assertThat(cloudEvent.getDataSchema()).isEqualTo(EVENT_DATA_SCHEMA); + assertThat(cloudEvent.getDataContentType()).isEqualTo(EVENT_DATA_CONTENT_TYPE); + assertThat(cloudEvent.getSubject()).isEqualTo(EVENT_SUBJECT); + } + @Test void convertFailureBinaryUnexpectedBufferContent() { Buffer buffer = Buffer.buffer("unexpected Content");