From 0be02c904e931a941dba51b0cb42d7d7fdb8e43a Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti Date: Thu, 24 Oct 2024 14:28:38 +0200 Subject: [PATCH] [Fix #3721] Refactoring KogitoIndexConverter --- .../KogitoEventBodySerializationHelper.java | 5 +- .../kie/kogito/event/DataEventFactory.java | 17 +++++ .../JacksonTypeCloudEventDataConverter.java | 44 +++++++++++++ .../MultipleProcessInstanceDataEvent.java | 1 - .../KogitoDataEventSerializationHelper.java | 7 +- ...sDataInstanceBeanDeserializerModifier.java | 2 + ...leProcessDataInstanceConverterFactory.java | 65 +++++++++++++++++++ ...eProcessInstanceDataEventDeserializer.java | 52 +++++++++++---- ...pleProcessInstanceDataEventSerializer.java | 33 ++++++++-- ...ocessInstanceDataEventExtensionRecord.java | 9 +++ .../event/process/ProcessEventsTest.java | 30 ++++++--- 11 files changed, 232 insertions(+), 33 deletions(-) create mode 100644 api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/JacksonTypeCloudEventDataConverter.java create mode 100644 api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceConverterFactory.java diff --git a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoEventBodySerializationHelper.java b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoEventBodySerializationHelper.java index 9d17f77d628..d00146eb9c2 100644 --- a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoEventBodySerializationHelper.java +++ b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoEventBodySerializationHelper.java @@ -237,10 +237,9 @@ public static void writeInteger(DataOutput out, Integer integer) throws IOExcept public static Integer readInteger(DataInput in) throws IOException { SerType type = readType(in); return type == SerType.NULL ? null : readInt(in, type); - } - private static void writeInt(DataOutput out, int size) throws IOException { + public static void writeInt(DataOutput out, int size) throws IOException { if (size < Byte.MAX_VALUE) { writeType(out, SerType.BYTE); out.writeByte((byte) size); @@ -253,7 +252,7 @@ private static void writeInt(DataOutput out, int size) throws IOException { } } - private static int readInt(DataInput in) throws IOException { + public static int readInt(DataInput in) throws IOException { SerType type = readType(in); return readInt(in, type); } diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/DataEventFactory.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/DataEventFactory.java index a52df39f9df..f2317a5d667 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/DataEventFactory.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/DataEventFactory.java @@ -18,6 +18,7 @@ */ package org.kie.kogito.event; +import java.io.IOException; import java.net.URI; import java.time.OffsetDateTime; import java.util.Optional; @@ -43,6 +44,22 @@ public static DataEvent from(CloudEvent event, Converter(event, dataUnmarshaller); } + public static , V> T from(T dataEvent, CloudEvent cloudEvent, Converter dataUnmarshaller) throws IOException { + dataEvent.setSpecVersion(cloudEvent.getSpecVersion()); + dataEvent.setId(cloudEvent.getId()); + dataEvent.setType(cloudEvent.getType()); + dataEvent.setSource(cloudEvent.getSource()); + dataEvent.setDataContentType(cloudEvent.getDataContentType()); + dataEvent.setDataSchema(cloudEvent.getDataSchema()); + dataEvent.setSubject(cloudEvent.getSubject()); + dataEvent.setTime(cloudEvent.getTime()); + cloudEvent.getExtensionNames().forEach(extensionName -> dataEvent.addExtensionAttribute(extensionName, cloudEvent.getExtension(extensionName))); + if (cloudEvent.getData() != null) { + dataEvent.setData(dataUnmarshaller.convert(cloudEvent.getData())); + } + return dataEvent; + } + public static DataEvent from(T eventData, String trigger, KogitoProcessInstance pi) { return from(eventData, trigger, URI.create("/process/" + pi.getProcessId()), Optional.empty(), ProcessMeta.fromKogitoProcessInstance(pi)); } diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/JacksonTypeCloudEventDataConverter.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/JacksonTypeCloudEventDataConverter.java new file mode 100644 index 00000000000..3e64b44a5db --- /dev/null +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/impl/JacksonTypeCloudEventDataConverter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.event.impl; + +import java.io.IOException; + +import org.kie.kogito.event.Converter; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.cloudevents.CloudEventData; + +public class JacksonTypeCloudEventDataConverter implements Converter { + + private ObjectMapper objectMapper; + private TypeReference outputType; + + public JacksonTypeCloudEventDataConverter(ObjectMapper objectMapper, TypeReference outputType) { + this.objectMapper = objectMapper; + this.outputType = outputType; + } + + @Override + public O convert(CloudEventData value) throws IOException { + return objectMapper.readValue(value.toBytes(), outputType); + } +} diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java index 377da5bf594..f29a920c132 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java @@ -42,5 +42,4 @@ public boolean isCompressed() { public void setCompressed(boolean compressed) { addExtensionAttribute(COMPRESS_DATA, compressed); } - } diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/KogitoDataEventSerializationHelper.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/KogitoDataEventSerializationHelper.java index baf4d2ad67a..f4e512eb223 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/KogitoDataEventSerializationHelper.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/KogitoDataEventSerializationHelper.java @@ -31,7 +31,7 @@ import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.*; -public class KogitoDataEventSerializationHelper { +class KogitoDataEventSerializationHelper { private KogitoDataEventSerializationHelper() { } @@ -49,7 +49,10 @@ static > T readCloudEventAttrs(DataInput in, T da data.setId(in.readUTF()); data.setSubject(readUTF(in)); data.setDataContentType(readUTF(in)); - data.setDataSchema(URI.create(readUTF(in))); + String dataSchema = readUTF(in); + if (dataSchema != null) { + data.setDataSchema(URI.create(dataSchema)); + } return data; } diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceBeanDeserializerModifier.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceBeanDeserializerModifier.java index 6ef181b975f..d72be4e5b57 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceBeanDeserializerModifier.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceBeanDeserializerModifier.java @@ -27,6 +27,8 @@ public class MultipleProcessDataInstanceBeanDeserializerModifier extends BeanDeserializerModifier { + private static final long serialVersionUID = 1L; + @Override public JsonDeserializer modifyDeserializer( DeserializationConfig config, BeanDescription beanDesc, JsonDeserializer deserializer) { diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceConverterFactory.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceConverterFactory.java new file mode 100644 index 00000000000..4f8a6205c20 --- /dev/null +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceConverterFactory.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.event.serializer; + +import java.io.IOException; +import java.util.Base64; +import java.util.Collection; + +import org.kie.kogito.event.Converter; +import org.kie.kogito.event.impl.JacksonTypeCloudEventDataConverter; +import org.kie.kogito.event.process.KogitoMarshallEventSupport; +import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent; +import org.kie.kogito.event.process.ProcessInstanceDataEvent; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; + +public class MultipleProcessDataInstanceConverterFactory { + + private MultipleProcessDataInstanceConverterFactory() { + } + + public static Converter>> fromCloudEvent(CloudEvent cloudEvent, ObjectMapper objectMapper) { + if (MultipleProcessInstanceDataEvent.BINARY_CONTENT_TYPE.equals(cloudEvent.getDataContentType())) { + return isCompressed(cloudEvent) ? compressedConverter : binaryConverter; + } else { + return new JacksonTypeCloudEventDataConverter<>(objectMapper, new TypeReference>>() { + }); + } + } + + private static boolean isCompressed(CloudEvent event) { + Object value = event.getExtension(MultipleProcessInstanceDataEvent.COMPRESS_DATA); + return value instanceof Boolean ? ((Boolean) value).booleanValue() : false; + } + + private static Converter>> binaryConverter = + data -> deserialize(data, false); + + private static Converter>> compressedConverter = + data -> deserialize(data, true); + + private static Collection> deserialize(CloudEventData data, boolean compress) throws IOException { + return MultipleProcessInstanceDataEventDeserializer.readFromBytes(Base64.getDecoder().decode(data.toBytes()), compress); + } +} diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventDeserializer.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventDeserializer.java index 1e5619be233..6e0b0f262c0 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventDeserializer.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventDeserializer.java @@ -28,10 +28,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.function.Supplier; import java.util.zip.GZIPInputStream; import org.kie.kogito.event.process.CloudEventVisitor; -import org.kie.kogito.event.process.KogitoEventBodySerializationHelper; import org.kie.kogito.event.process.KogitoMarshallEventSupport; import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent; import org.kie.kogito.event.process.ProcessInstanceDataEvent; @@ -45,6 +45,8 @@ import org.kie.kogito.event.process.ProcessInstanceStateEventBody; import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent; import org.kie.kogito.event.process.ProcessInstanceVariableEventBody; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.JacksonException; import com.fasterxml.jackson.core.JsonParser; @@ -56,8 +58,12 @@ import io.cloudevents.SpecVersion; +import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.readInt; + public class MultipleProcessInstanceDataEventDeserializer extends JsonDeserializer implements ResolvableDeserializer { + private static final Logger logger = LoggerFactory.getLogger(MultipleProcessInstanceDataEventDeserializer.class); + private JsonDeserializer defaultDeserializer; public MultipleProcessInstanceDataEventDeserializer(JsonDeserializer deserializer) { @@ -98,27 +104,34 @@ private static boolean isCompressed(JsonNode node) { return compress != null && compress.isBoolean() ? compress.asBoolean() : false; } - public static Collection> readFromBytes(byte[] binaryValue, boolean compressed) throws IOException { + static Collection> readFromBytes(byte[] binaryValue, boolean compressed) throws IOException { InputStream wrappedIn = new ByteArrayInputStream(binaryValue); if (compressed) { + logger.trace("Gzip compressed byte array"); wrappedIn = new GZIPInputStream(wrappedIn); } try (DataInputStream in = new DataInputStream(wrappedIn)) { - int size = in.readShort(); + int size = readInt(in); + logger.trace("Reading collection of size {}", size); Collection> result = new ArrayList<>(size); List infos = new ArrayList<>(); while (size-- > 0) { byte readInfo = in.readByte(); + logger.trace("Info ordinal is {}", readInfo); ProcessInstanceDataEventExtensionRecord info; if (readInfo == -1) { info = new ProcessInstanceDataEventExtensionRecord(); info.readEvent(in); + logger.trace("Info readed is {}", info); infos.add(info); } else { info = infos.get(readInfo); + logger.trace("Info cached is {}", info); } String type = in.readUTF(); + logger.trace("Type is {}", info); result.add(getCloudEvent(in, type, info)); + logger.trace("{} events remaining", size); } return result; } @@ -127,31 +140,44 @@ public static Collection getCloudEvent(DataInputStream in, String type, ProcessInstanceDataEventExtensionRecord info) throws IOException { switch (type) { case ProcessInstanceVariableDataEvent.VAR_TYPE: - ProcessInstanceVariableDataEvent item = buildDataEvent(in, new ProcessInstanceVariableDataEvent(), new ProcessInstanceVariableEventBody(), info); + ProcessInstanceVariableDataEvent item = buildDataEvent(in, new ProcessInstanceVariableDataEvent(), ProcessInstanceVariableEventBody::new, info); item.setKogitoVariableName(item.getData().getVariableName()); return item; case ProcessInstanceStateDataEvent.STATE_TYPE: - return buildDataEvent(in, new ProcessInstanceStateDataEvent(), new ProcessInstanceStateEventBody(), info); + return buildDataEvent(in, new ProcessInstanceStateDataEvent(), ProcessInstanceStateEventBody::new, info); case ProcessInstanceNodeDataEvent.NODE_TYPE: - return buildDataEvent(in, new ProcessInstanceNodeDataEvent(), new ProcessInstanceNodeEventBody(), info); + return buildDataEvent(in, new ProcessInstanceNodeDataEvent(), ProcessInstanceNodeEventBody::new, info); case ProcessInstanceErrorDataEvent.ERROR_TYPE: - return buildDataEvent(in, new ProcessInstanceErrorDataEvent(), new ProcessInstanceErrorEventBody(), info); + return buildDataEvent(in, new ProcessInstanceErrorDataEvent(), ProcessInstanceErrorEventBody::new, info); case ProcessInstanceSLADataEvent.SLA_TYPE: - return buildDataEvent(in, new ProcessInstanceSLADataEvent(), new ProcessInstanceSLAEventBody(), info); + return buildDataEvent(in, new ProcessInstanceSLADataEvent(), ProcessInstanceSLAEventBody::new, info); default: throw new UnsupportedOperationException("Unrecognized event type " + type); } } - private static , V extends KogitoMarshallEventSupport & CloudEventVisitor> T buildDataEvent(DataInput in, T cloudEvent, V body, + private static , V extends KogitoMarshallEventSupport & CloudEventVisitor> T buildDataEvent(DataInput in, T cloudEvent, Supplier bodySupplier, ProcessInstanceDataEventExtensionRecord info) throws IOException { - int delta = KogitoEventBodySerializationHelper.readInteger(in); + int delta = readInt(in); + logger.trace("Time delta is {}", delta); cloudEvent.setTime(info.getTime().plus(delta, ChronoUnit.MILLIS)); KogitoDataEventSerializationHelper.readCloudEventAttrs(in, cloudEvent); + logger.trace("Cloud event before population {}", cloudEvent); KogitoDataEventSerializationHelper.populateCloudEvent(cloudEvent, info); - body.readEvent(in); - body.visit(cloudEvent); - cloudEvent.setData(body); + logger.trace("Cloud event after population {}", cloudEvent); + + boolean isNotNull = in.readBoolean(); + if (isNotNull) { + logger.trace("Data is not null"); + V body = bodySupplier.get(); + body.readEvent(in); + logger.trace("Event body before population {}", body); + body.visit(cloudEvent); + logger.trace("Event body after population {}", body); + cloudEvent.setData(body); + } else { + logger.trace("Data is null"); + } return cloudEvent; } diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventSerializer.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventSerializer.java index 1c591079254..42825e9679c 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventSerializer.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventSerializer.java @@ -26,17 +26,22 @@ import java.util.Map; import java.util.zip.GZIPOutputStream; -import org.kie.kogito.event.process.KogitoEventBodySerializationHelper; import org.kie.kogito.event.process.KogitoMarshallEventSupport; import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent; import org.kie.kogito.event.process.ProcessInstanceDataEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.JsonSerializer; import com.fasterxml.jackson.databind.SerializerProvider; +import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.writeInt; + public class MultipleProcessInstanceDataEventSerializer extends JsonSerializer { + private static final Logger logger = LoggerFactory.getLogger(MultipleProcessInstanceDataEventDeserializer.class); + private JsonSerializer defaultSerializer; public MultipleProcessInstanceDataEventSerializer(JsonSerializer serializer) { @@ -67,23 +72,41 @@ public void serialize(MultipleProcessInstanceDataEvent value, JsonGenerator gen, private byte[] dataAsBytes(JsonGenerator gen, Collection> data, boolean compress) throws IOException { ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); try (DataOutputStream out = new DataOutputStream(compress ? new GZIPOutputStream(bytesOut) : bytesOut)) { - out.writeShort(data.size()); + logger.trace("Writing size {}", data.size()); + writeInt(out, data.size()); Map infos = new HashMap<>(); for (ProcessInstanceDataEvent cloudEvent : data) { String key = cloudEvent.getKogitoProcessInstanceId(); ProcessInstanceDataEventExtensionRecord info = infos.get(key); if (info == null) { - out.writeByte(-1); + logger.trace("Writing marker byte -1"); + out.writeByte((byte) -1); info = new ProcessInstanceDataEventExtensionRecord(infos.size(), cloudEvent); + logger.trace("Writing info", info); info.writeEvent(out); infos.put(key, info); } else { + logger.trace("Writing marker byte {}", info.getOrdinal()); out.writeByte((byte) info.getOrdinal()); } + logger.trace("Writing type {}", cloudEvent.getType()); out.writeUTF(cloudEvent.getType()); - KogitoEventBodySerializationHelper.writeInteger(out, cloudEvent.getTime().compareTo(info.getTime())); + int timeDelta = cloudEvent.getTime().compareTo(info.getTime()); + logger.trace("Writing time delta {}", timeDelta); + writeInt(out, timeDelta); + logger.trace("Writing cloud event attrs {}", cloudEvent); KogitoDataEventSerializationHelper.writeCloudEventAttrs(out, cloudEvent); - cloudEvent.getData().writeEvent(out); + KogitoMarshallEventSupport itemData = cloudEvent.getData(); + if (itemData != null) { + logger.trace("Writing data not null boolean"); + out.writeBoolean(true); + logger.trace("Writing cloud event body {}", itemData); + itemData.writeEvent(out); + } else { + logger.trace("Writing data null boolean"); + out.writeBoolean(false); + } + logger.trace("individual event writing completed"); } } return bytesOut.toByteArray(); diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/ProcessInstanceDataEventExtensionRecord.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/ProcessInstanceDataEventExtensionRecord.java index 42cbef5a21d..28abac1a281 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/ProcessInstanceDataEventExtensionRecord.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/ProcessInstanceDataEventExtensionRecord.java @@ -156,4 +156,13 @@ public void readEvent(DataInput in) throws IOException { source = URI.create(in.readUTF()); addons = readUTF(in); } + + @Override + public String toString() { + return "ProcessInstanceDataEventExtensionRecord [id=" + id + ", instanceId=" + instanceId + ", version=" + + version + ", state=" + state + ", type=" + type + ", parentInstanceId=" + parentInstanceId + + ", rootId=" + rootId + ", rootInstanceId=" + rootInstanceId + ", businessKey=" + businessKey + + ", identity=" + identity + ", source=" + source + ", time=" + time + ", addons=" + addons + "]"; + } + } diff --git a/api/kogito-events-core/src/test/java/org/kie/kogito/event/process/ProcessEventsTest.java b/api/kogito-events-core/src/test/java/org/kie/kogito/event/process/ProcessEventsTest.java index ac8ce694d14..d278e2c4279 100644 --- a/api/kogito-events-core/src/test/java/org/kie/kogito/event/process/ProcessEventsTest.java +++ b/api/kogito-events-core/src/test/java/org/kie/kogito/event/process/ProcessEventsTest.java @@ -28,17 +28,18 @@ import org.junit.jupiter.api.Test; import org.kie.kogito.event.AbstractDataEvent; +import org.kie.kogito.event.DataEventFactory; import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants; +import org.kie.kogito.event.serializer.MultipleProcessDataInstanceConverterFactory; import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent; import org.kie.kogito.jackson.utils.JsonObjectUtils; -import org.kie.kogito.jackson.utils.ObjectMapperFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import io.cloudevents.CloudEvent; import io.cloudevents.SpecVersion; import io.cloudevents.jackson.JsonFormat; @@ -47,9 +48,9 @@ class ProcessEventsTest { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() - .registerModule(new JavaTimeModule()) .registerModule(JsonFormat.getCloudEventJacksonModule()) - .disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + .disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS) + .findAndRegisterModules(); private static final Logger logger = LoggerFactory.getLogger(ProcessEventsTest.class); @@ -128,7 +129,7 @@ void processInstanceDataEvent() throws Exception { @Test void multipleInstanceDataEvent() throws IOException { - JsonNode expectedVarValue = ObjectMapperFactory.get().createObjectNode().put("name", "John Doe"); + JsonNode expectedVarValue = OBJECT_MAPPER.createObjectNode().put("name", "John Doe"); int standard = processMultipleInstanceDataEvent(expectedVarValue, false, false); int binary = processMultipleInstanceDataEvent(expectedVarValue, true, false); int binaryCompressed = processMultipleInstanceDataEvent(expectedVarValue, true, true); @@ -184,10 +185,23 @@ private int processMultipleInstanceDataEvent(JsonNode expectedVarValue, boolean event.setCompressed(compress); } - byte[] json = ObjectMapperFactory.get().writeValueAsBytes(event); + byte[] json = OBJECT_MAPPER.writeValueAsBytes(event); logger.info("Serialized chunk size is {}", json.length); - MultipleProcessInstanceDataEvent deserializedEvent = ObjectMapperFactory.get().readValue(json, MultipleProcessInstanceDataEvent.class); + + // cloud event structured mode check + MultipleProcessInstanceDataEvent deserializedEvent = OBJECT_MAPPER.readValue(json, MultipleProcessInstanceDataEvent.class); + assertThat(deserializedEvent.getData()).hasSize(event.getData().size()); + assertMultipleIntance(deserializedEvent, expectedVarValue); + + // cloud event binary mode check + CloudEvent cloudEvent = OBJECT_MAPPER.readValue(json, CloudEvent.class); + deserializedEvent = DataEventFactory.from(new MultipleProcessInstanceDataEvent(), cloudEvent, MultipleProcessDataInstanceConverterFactory.fromCloudEvent(cloudEvent, OBJECT_MAPPER)); assertThat(deserializedEvent.getData()).hasSize(event.getData().size()); + assertMultipleIntance(deserializedEvent, expectedVarValue); + return json.length; + } + + private void assertMultipleIntance(MultipleProcessInstanceDataEvent deserializedEvent, JsonNode expectedVarValue) { Iterator> iter = deserializedEvent.getData().iterator(); ProcessInstanceStateDataEvent deserializedStateEvent = (ProcessInstanceStateDataEvent) iter.next(); @@ -215,8 +229,6 @@ private int processMultipleInstanceDataEvent(JsonNode expectedVarValue, boolean assertBaseEventValues(deserializedSLAEvent, ProcessInstanceSLADataEvent.SLA_TYPE); assertExtensionNames(deserializedSLAEvent, BASE_EXTENSION_NAMES); assertSLABody(deserializedSLAEvent.getData()); - - return json.length; } private void assertSLABody(ProcessInstanceSLAEventBody data) {