diff --git a/api/kogito-events-api/pom.xml b/api/kogito-events-api/pom.xml index 63726ae8252..1bd9097e94e 100644 --- a/api/kogito-events-api/pom.xml +++ b/api/kogito-events-api/pom.xml @@ -47,10 +47,6 @@ - - io.cloudevents - cloudevents-core - org.junit.jupiter @@ -72,6 +68,10 @@ slf4j-simple test + + org.kie.kogito + kogito-jackson-utils + diff --git a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/CloudEventVisitor.java b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/CloudEventVisitor.java new file mode 100644 index 00000000000..1d6f2c1c599 --- /dev/null +++ b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/CloudEventVisitor.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.event.process; + +import org.kie.kogito.event.DataEvent; + +public interface CloudEventVisitor { + void visit(DataEvent cloudEvent); +} diff --git a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoEventBodySerializationHelper.java b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoEventBodySerializationHelper.java new file mode 100644 index 00000000000..9d17f77d628 --- /dev/null +++ b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoEventBodySerializationHelper.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.event.process; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.Collection; +import java.util.Date; + +import org.kie.kogito.jackson.utils.ObjectMapperFactory; + +import com.fasterxml.jackson.databind.JsonNode; + +public class KogitoEventBodySerializationHelper { + + private KogitoEventBodySerializationHelper() { + } + + public static String readUTF(DataInput in) throws IOException { + boolean isNotNull = in.readBoolean(); + return isNotNull ? in.readUTF() : null; + } + + public static void writeUTF(DataOutput out, String string) throws IOException { + if (string == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeUTF(string); + } + } + + public static void writeDate(DataOutput out, Date date) throws IOException { + if (date == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeLong(date.getTime()); + } + } + + public static Date readDate(DataInput in) throws IOException { + boolean isNotNull = in.readBoolean(); + return isNotNull ? new Date(in.readLong()) : null; + } + + public static void writeTime(DataOutput out, OffsetDateTime date) throws IOException { + if (date == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeLong(date.toInstant().toEpochMilli()); + } + } + + public static OffsetDateTime readTime(DataInput in) throws IOException { + boolean isNotNull = in.readBoolean(); + return isNotNull ? Instant.ofEpochMilli(in.readLong()).atOffset(ZoneOffset.UTC) : null; + } + + public static void writeUTFCollection(DataOutput out, Collection collection) throws IOException { + if (collection == null) { + writeInt(out, -1); + } else { + writeInt(out, collection.size()); + for (String item : collection) { + writeUTF(out, item); + } + } + } + + public static > T readUTFCollection(DataInput in, T holder) throws IOException { + int size = readInt(in); + if (size == -1) { + return null; + } + while (size-- > 0) { + holder.add(readUTF(in)); + } + return holder; + } + + private enum SerType { + + NULL(KogitoEventBodySerializationHelper::writeNull, KogitoEventBodySerializationHelper::readNull), + JSON(KogitoEventBodySerializationHelper::writeJson, KogitoEventBodySerializationHelper::readJson), + DEFAULT(KogitoEventBodySerializationHelper::writeJson, KogitoEventBodySerializationHelper::readDefault), + STRING(KogitoEventBodySerializationHelper::writeString, DataInput::readUTF), + INT(KogitoEventBodySerializationHelper::writeInt, DataInput::readInt), + SHORT(KogitoEventBodySerializationHelper::writeShort, DataInput::readShort), + LONG(KogitoEventBodySerializationHelper::writeLong, DataInput::readLong), + BYTE(KogitoEventBodySerializationHelper::writeByte, DataInput::readByte), + BOOLEAN(KogitoEventBodySerializationHelper::writeBoolean, DataInput::readBoolean), + FLOAT(KogitoEventBodySerializationHelper::writeFloat, DataInput::readFloat), + DOUBLE(KogitoEventBodySerializationHelper::writeDouble, DataInput::readDouble); + + final ObjectWriter writer; + final ObjectReader reader; + + SerType(ObjectWriter writer, ObjectReader reader) { + this.writer = writer; + this.reader = reader; + } + + ObjectWriter writer() { + return writer; + } + + ObjectReader reader() { + return reader; + } + + static SerType fromType(Class type) { + if (JsonNode.class.isAssignableFrom(type)) { + return JSON; + } else if (String.class.isAssignableFrom(type)) { + return STRING; + } else if (Boolean.class.isAssignableFrom(type)) { + return BOOLEAN; + } else if (Integer.class.isAssignableFrom(type)) { + return INT; + } else if (Short.class.isAssignableFrom(type)) { + return SHORT; + } else if (Byte.class.isAssignableFrom(type)) { + return BYTE; + } else if (Long.class.isAssignableFrom(type)) { + return LONG; + } else if (Float.class.isAssignableFrom(type)) { + return FLOAT; + } else if (Double.class.isAssignableFrom(type)) { + return DOUBLE; + } else { + return DEFAULT; + } + } + + static SerType fromObject(Object obj) { + return obj == null ? NULL : fromType(obj.getClass()); + } + } + + private static void writeType(DataOutput out, SerType type) throws IOException { + out.writeByte(type.ordinal()); + } + + private static SerType readType(DataInput in) throws IOException { + return SerType.values()[in.readByte()]; + } + + public static void writeObject(DataOutput out, Object obj) throws IOException { + SerType type = SerType.fromObject(obj); + writeType(out, type); + type.writer().accept(out, obj); + } + + public static Object readObject(DataInput in) throws IOException { + return readType(in).reader().apply(in); + } + + @FunctionalInterface + private static interface ObjectWriter { + void accept(DataOutput out, Object obj) throws IOException; + } + + private static interface ObjectReader { + Object apply(DataInput out) throws IOException; + } + + private static void writeString(DataOutput out, Object obj) throws IOException { + out.writeUTF((String) obj); + } + + private static void writeBoolean(DataOutput out, Object obj) throws IOException { + out.writeBoolean((Boolean) obj); + } + + private static void writeInt(DataOutput out, Object obj) throws IOException { + out.writeInt((Integer) obj); + } + + private static void writeLong(DataOutput out, Object obj) throws IOException { + out.writeInt((Integer) obj); + } + + private static void writeShort(DataOutput out, Object obj) throws IOException { + out.writeShort((Short) obj); + } + + private static void writeByte(DataOutput out, Object obj) throws IOException { + out.writeByte((Byte) obj); + } + + private static void writeFloat(DataOutput out, Object obj) throws IOException { + out.writeFloat((Float) obj); + } + + private static void writeDouble(DataOutput out, Object obj) throws IOException { + out.writeDouble((Double) obj); + } + + private static void writeNull(DataOutput out, Object obj) { + // do nothing + } + + private static Object readNull(DataInput in) { + return null; + } + + public static void writeInteger(DataOutput out, Integer integer) throws IOException { + if (integer == null) { + writeType(out, SerType.NULL); + } else { + writeInt(out, integer.intValue()); + } + } + + public static Integer readInteger(DataInput in) throws IOException { + SerType type = readType(in); + return type == SerType.NULL ? null : readInt(in, type); + + } + + private static void writeInt(DataOutput out, int size) throws IOException { + if (size < Byte.MAX_VALUE) { + writeType(out, SerType.BYTE); + out.writeByte((byte) size); + } else if (size < Short.MAX_VALUE) { + writeType(out, SerType.SHORT); + out.writeShort((short) size); + } else { + writeType(out, SerType.INT); + out.writeInt(size); + } + } + + private static int readInt(DataInput in) throws IOException { + SerType type = readType(in); + return readInt(in, type); + } + + private static int readInt(DataInput in, SerType type) throws IOException { + switch (type) { + case INT: + return in.readInt(); + case SHORT: + return in.readShort(); + case BYTE: + return in.readByte(); + default: + throw new IOException("Stream corrupted. Read unrecognized type " + type); + } + } + + private static void writeJson(DataOutput out, Object obj) throws IOException { + byte[] bytes = ObjectMapperFactory.get().writeValueAsBytes(obj); + out.writeInt(bytes.length); + out.write(bytes); + } + + private static Object readJson(DataInput in) throws IOException { + return readJson(in, JsonNode.class); + } + + private static Object readDefault(DataInput in) throws IOException { + return readJson(in, Object.class); + } + + private static Object readJson(DataInput in, Class type) throws IOException { + byte[] bytes = new byte[in.readInt()]; + in.readFully(bytes); + return ObjectMapperFactory.get().readValue(bytes, type); + } + + public static Date toDate(OffsetDateTime time) { + return time == null ? null : Date.from(time.toInstant()); + } +} diff --git a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoMarshallEventSupport.java b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoMarshallEventSupport.java new file mode 100644 index 00000000000..76693a64c6c --- /dev/null +++ b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoMarshallEventSupport.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.event.process; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public interface KogitoMarshallEventSupport { + + void writeEvent(DataOutput out) throws IOException; + + void readEvent(DataInput in) throws IOException; +} diff --git a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceErrorEventBody.java b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceErrorEventBody.java index a6ea8e77855..032c8252a60 100644 --- a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceErrorEventBody.java +++ b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceErrorEventBody.java @@ -19,9 +19,16 @@ package org.kie.kogito.event.process; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.util.Date; -public class ProcessInstanceErrorEventBody { +import org.kie.kogito.event.DataEvent; + +import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.*; + +public class ProcessInstanceErrorEventBody implements KogitoMarshallEventSupport, CloudEventVisitor { // common fields for events private Date eventDate; @@ -138,4 +145,26 @@ public ProcessInstanceErrorEventBody build() { return instance; } } + + @Override + public void readEvent(DataInput in) throws IOException { + nodeDefinitionId = in.readUTF(); + nodeInstanceId = in.readUTF(); + errorMessage = in.readUTF(); + } + + @Override + public void writeEvent(DataOutput out) throws IOException { + out.writeUTF(nodeDefinitionId); + out.writeUTF(nodeInstanceId); + out.writeUTF(errorMessage); + } + + @Override + public void visit(DataEvent dataEvent) { + this.processId = dataEvent.getKogitoProcessId(); + this.processInstanceId = dataEvent.getKogitoProcessInstanceId(); + this.processVersion = dataEvent.getKogitoProcessInstanceVersion(); + this.eventDate = toDate(dataEvent.getTime()); + } } diff --git a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceNodeEventBody.java b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceNodeEventBody.java index ca6621c0b14..349a8280b86 100644 --- a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceNodeEventBody.java +++ b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceNodeEventBody.java @@ -18,12 +18,19 @@ */ package org.kie.kogito.event.process; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Objects; -public class ProcessInstanceNodeEventBody { +import org.kie.kogito.event.DataEvent; + +import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.*; + +public class ProcessInstanceNodeEventBody implements KogitoMarshallEventSupport, CloudEventVisitor { public static final int EVENT_TYPE_ENTER = 1; @@ -71,7 +78,42 @@ public class ProcessInstanceNodeEventBody { private Map data; - private ProcessInstanceNodeEventBody() { + @Override + public void writeEvent(DataOutput out) throws IOException { + writeInteger(out, eventType); + writeUTF(out, connectionNodeDefinitionId); + out.writeUTF(nodeDefinitionId); + writeUTF(out, nodeName); + out.writeUTF(nodeType); + out.writeUTF(nodeInstanceId); + writeUTF(out, workItemId); + writeDate(out, slaDueDate); + writeObject(out, data); + } + + @Override + public void readEvent(DataInput in) throws IOException { + eventType = readInteger(in); + connectionNodeDefinitionId = readUTF(in); + nodeDefinitionId = in.readUTF(); + nodeName = readUTF(in); + nodeType = in.readUTF(); + nodeInstanceId = in.readUTF(); + workItemId = readUTF(in); + slaDueDate = readDate(in); + data = (Map) readObject(in); + } + + @Override + public void visit(DataEvent dataEvent) { + this.processId = dataEvent.getKogitoProcessId(); + this.processInstanceId = dataEvent.getKogitoProcessInstanceId(); + this.processVersion = dataEvent.getKogitoProcessInstanceVersion(); + this.eventDate = toDate(dataEvent.getTime()); + this.eventUser = dataEvent.getKogitoIdentity(); + } + + public ProcessInstanceNodeEventBody() { this.data = new HashMap<>(); } @@ -246,5 +288,4 @@ public ProcessInstanceNodeEventBody build() { } } - } diff --git a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceSLAEventBody.java b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceSLAEventBody.java index 133c0e57155..d054a490b1f 100644 --- a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceSLAEventBody.java +++ b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceSLAEventBody.java @@ -18,9 +18,20 @@ */ package org.kie.kogito.event.process; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.util.Date; -public class ProcessInstanceSLAEventBody { +import org.kie.kogito.event.DataEvent; + +import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.readDate; +import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.readUTF; +import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.toDate; +import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.writeDate; +import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.writeUTF; + +public class ProcessInstanceSLAEventBody implements KogitoMarshallEventSupport, CloudEventVisitor { // common fields for events private Date eventDate; @@ -47,6 +58,34 @@ public class ProcessInstanceSLAEventBody { private Date slaDueDate; + @Override + public void writeEvent(DataOutput out) throws IOException { + out.writeUTF(nodeDefinitionId); + writeUTF(out, nodeName); + out.writeUTF(nodeType); + out.writeUTF(nodeInstanceId); + writeDate(out, slaDueDate); + + } + + @Override + public void readEvent(DataInput in) throws IOException { + nodeDefinitionId = in.readUTF(); + nodeName = readUTF(in); + nodeType = in.readUTF(); + nodeInstanceId = in.readUTF(); + slaDueDate = readDate(in); + } + + @Override + public void visit(DataEvent dataEvent) { + this.processId = dataEvent.getKogitoProcessId(); + this.processInstanceId = dataEvent.getKogitoProcessInstanceId(); + this.processVersion = dataEvent.getKogitoProcessInstanceVersion(); + this.eventDate = toDate(dataEvent.getTime()); + this.eventUser = dataEvent.getKogitoIdentity(); + } + public Date getSlaDueDate() { return slaDueDate; } diff --git a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceStateEventBody.java b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceStateEventBody.java index b9bb4fd9ad3..8c7c291110e 100644 --- a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceStateEventBody.java +++ b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceStateEventBody.java @@ -18,13 +18,21 @@ */ package org.kie.kogito.event.process; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.util.Date; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.Map; import java.util.Objects; import java.util.Set; -public class ProcessInstanceStateEventBody { +import org.kie.kogito.event.DataEvent; + +import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.*; + +public class ProcessInstanceStateEventBody implements KogitoMarshallEventSupport, CloudEventVisitor { public static final int EVENT_TYPE_STARTED = 1; public static final int EVENT_TYPE_ENDED = 2; @@ -65,6 +73,38 @@ public class ProcessInstanceStateEventBody { public Date slaDueDate; + @Override + public void writeEvent(DataOutput out) throws IOException { + writeInteger(out, eventType); + writeUTF(out, processName); + writeInteger(out, state); + writeUTFCollection(out, roles); + writeDate(out, slaDueDate); + } + + @Override + public void readEvent(DataInput in) throws IOException { + eventType = readInteger(in); + processName = readUTF(in); + state = readInteger(in); + roles = readUTFCollection(in, new LinkedHashSet<>()); + slaDueDate = readDate(in); + } + + @Override + public void visit(DataEvent dataEvent) { + this.processId = dataEvent.getKogitoProcessId(); + this.processInstanceId = dataEvent.getKogitoProcessInstanceId(); + this.processVersion = dataEvent.getKogitoProcessInstanceVersion(); + this.eventDate = toDate(dataEvent.getTime()); + this.eventUser = dataEvent.getKogitoIdentity(); + this.parentInstanceId = dataEvent.getKogitoParentProcessInstanceId(); + this.rootProcessId = dataEvent.getKogitoRootProcessId(); + this.rootProcessInstanceId = dataEvent.getKogitoRootProcessInstanceId(); + this.processType = dataEvent.getKogitoProcessType(); + this.businessKey = dataEvent.getKogitoBusinessKey(); + } + public Date getEventDate() { return eventDate; } @@ -262,4 +302,5 @@ public ProcessInstanceStateEventBody build() { } } + } diff --git a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceVariableEventBody.java b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceVariableEventBody.java index 00a55b9cd86..2008f264f6d 100644 --- a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceVariableEventBody.java +++ b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/ProcessInstanceVariableEventBody.java @@ -18,12 +18,19 @@ */ package org.kie.kogito.event.process; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Objects; -public class ProcessInstanceVariableEventBody { +import org.kie.kogito.event.DataEvent; + +import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.*; + +public class ProcessInstanceVariableEventBody implements KogitoMarshallEventSupport, CloudEventVisitor { // common fields for events private Date eventDate; @@ -46,6 +53,33 @@ public class ProcessInstanceVariableEventBody { private String variableName; private Object variableValue; + @Override + public void writeEvent(DataOutput out) throws IOException { + writeUTF(out, nodeContainerDefinitionId); + writeUTF(out, nodeContainerInstanceId); + writeUTF(out, variableId); + out.writeUTF(variableName); + writeObject(out, variableValue); + } + + @Override + public void readEvent(DataInput in) throws IOException { + nodeContainerDefinitionId = readUTF(in); + nodeContainerInstanceId = readUTF(in); + variableId = readUTF(in); + variableName = in.readUTF(); + variableValue = readObject(in); + } + + @Override + public void visit(DataEvent dataEvent) { + this.processId = dataEvent.getKogitoProcessId(); + this.processInstanceId = dataEvent.getKogitoProcessInstanceId(); + this.processVersion = dataEvent.getKogitoProcessInstanceVersion(); + this.eventDate = toDate(dataEvent.getTime()); + this.eventUser = dataEvent.getKogitoIdentity(); + } + public Date getEventDate() { return eventDate; } @@ -184,4 +218,5 @@ public ProcessInstanceVariableEventBody build() { return instance; } } + } diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java index 7db8c0e7659..4280ce27520 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java @@ -21,14 +21,26 @@ import java.net.URI; import java.util.Collection; -public class MultipleProcessInstanceDataEvent extends ProcessInstanceDataEvent>> { +public class MultipleProcessInstanceDataEvent extends ProcessInstanceDataEvent>> { - public static final String TYPE = "MultipleProcessInstanceDataEvent"; + public static final String MULTIPLE_TYPE = "MultipleProcessInstanceDataEvent"; + public static final String BINARY_CONTENT_TYPE = "application/octet-stream"; + public static final String COMPRESS_DATA = "CompressData"; public MultipleProcessInstanceDataEvent() { } - public MultipleProcessInstanceDataEvent(URI source, Collection> body) { - super(TYPE, source, body); + public MultipleProcessInstanceDataEvent(URI source, Collection> body) { + super(MULTIPLE_TYPE, source, body); } + + public boolean isCompressed() { + Object extension = getExtension(MultipleProcessInstanceDataEvent.COMPRESS_DATA); + return extension instanceof Boolean ? ((Boolean) extension).booleanValue() : false; + } + + public void setCompressed(boolean compressed) { + addExtensionAttribute(COMPRESS_DATA, compressed); + } + } diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceErrorDataEvent.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceErrorDataEvent.java index 1427e917f23..2c50f8e2262 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceErrorDataEvent.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceErrorDataEvent.java @@ -22,11 +22,14 @@ public class ProcessInstanceErrorDataEvent extends ProcessInstanceDataEvent { + public static final String ERROR_TYPE = "ProcessInstanceErrorDataEvent"; + public ProcessInstanceErrorDataEvent() { + this.setType(ERROR_TYPE); } public ProcessInstanceErrorDataEvent(String source, String addons, String identity, Map metaData, ProcessInstanceErrorEventBody body) { - super("ProcessInstanceErrorDataEvent", + super(ERROR_TYPE, source, body, (String) metaData.get(ProcessInstanceEventMetadata.PROCESS_INSTANCE_ID_META_DATA), diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceNodeDataEvent.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceNodeDataEvent.java index db118aa009f..e1e6a74e76d 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceNodeDataEvent.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceNodeDataEvent.java @@ -22,11 +22,14 @@ public class ProcessInstanceNodeDataEvent extends ProcessInstanceDataEvent { + public static final String NODE_TYPE = "ProcessInstanceNodeDataEvent"; + public ProcessInstanceNodeDataEvent() { + this.setType(NODE_TYPE); } public ProcessInstanceNodeDataEvent(String source, String addons, String identity, Map metaData, ProcessInstanceNodeEventBody body) { - super("ProcessInstanceNodeDataEvent", + super(NODE_TYPE, source, body, (String) metaData.get(ProcessInstanceEventMetadata.PROCESS_INSTANCE_ID_META_DATA), diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceSLADataEvent.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceSLADataEvent.java index 90139ce0022..e5b743aeac9 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceSLADataEvent.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceSLADataEvent.java @@ -22,11 +22,14 @@ public class ProcessInstanceSLADataEvent extends ProcessInstanceDataEvent { + public static final String SLA_TYPE = "ProcessInstanceSLADataEvent"; + public ProcessInstanceSLADataEvent() { + this.setType(SLA_TYPE); } public ProcessInstanceSLADataEvent(String source, String addons, String identity, Map metaData, ProcessInstanceSLAEventBody body) { - super("ProcessInstanceSLADataEvent", + super(SLA_TYPE, source, body, (String) metaData.get(ProcessInstanceEventMetadata.PROCESS_INSTANCE_ID_META_DATA), diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceStateDataEvent.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceStateDataEvent.java index 38d30defdab..a0ce3e1003b 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceStateDataEvent.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceStateDataEvent.java @@ -22,11 +22,14 @@ public class ProcessInstanceStateDataEvent extends ProcessInstanceDataEvent { + public static final String STATE_TYPE = "ProcessInstanceStateDataEvent"; + public ProcessInstanceStateDataEvent() { + this.setType(STATE_TYPE); } public ProcessInstanceStateDataEvent(String source, String addons, String identity, Map metaData, ProcessInstanceStateEventBody body) { - super("ProcessInstanceStateDataEvent", + super(STATE_TYPE, source, body, (String) metaData.get(ProcessInstanceEventMetadata.PROCESS_INSTANCE_ID_META_DATA), diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceVariableDataEvent.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceVariableDataEvent.java index b7c83d367fc..1acf471d5ba 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceVariableDataEvent.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceVariableDataEvent.java @@ -32,15 +32,18 @@ public class ProcessInstanceVariableDataEvent extends ProcessInstanceDataEvent

INTERNAL_EXTENSION_ATTRIBUTES = Collections.singleton(CloudEventExtensionConstants.KOGITO_VARIABLE_NAME); + public static final String VAR_TYPE = "ProcessInstanceVariableDataEvent"; + @JsonInclude(JsonInclude.Include.NON_EMPTY) @JsonProperty(CloudEventExtensionConstants.KOGITO_VARIABLE_NAME) private String kogitoVariableName; public ProcessInstanceVariableDataEvent() { + this.setType(VAR_TYPE); } public ProcessInstanceVariableDataEvent(String source, String addons, String identity, Map metaData, ProcessInstanceVariableEventBody body) { - super("ProcessInstanceVariableDataEvent", + super(VAR_TYPE, source, body, (String) metaData.get(ProcessInstanceEventMetadata.PROCESS_INSTANCE_ID_META_DATA), diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/JsonProcessInstanceDataEventDeserializer.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/JsonProcessInstanceDataEventDeserializer.java new file mode 100644 index 00000000000..d97b2512506 --- /dev/null +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/JsonProcessInstanceDataEventDeserializer.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.event.serializer; + +import java.io.IOException; + +import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent; +import org.kie.kogito.event.process.ProcessInstanceDataEvent; +import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent; +import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent; +import org.kie.kogito.event.process.ProcessInstanceSLADataEvent; +import org.kie.kogito.event.process.ProcessInstanceStateDataEvent; +import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; + +public class JsonProcessInstanceDataEventDeserializer extends StdDeserializer> { + + private static final Logger LOGGER = LoggerFactory.getLogger(JsonProcessInstanceDataEventDeserializer.class); + + private static final long serialVersionUID = 6152014726577574241L; + + public JsonProcessInstanceDataEventDeserializer() { + this(JsonProcessInstanceDataEventDeserializer.class); + } + + public JsonProcessInstanceDataEventDeserializer(Class vc) { + super(vc); + } + + @Override + public ProcessInstanceDataEvent deserialize(JsonParser jp, DeserializationContext ctxt) + throws IOException, JsonProcessingException { + JsonNode node = jp.getCodec().readTree(jp); + LOGGER.debug("Deserialize process instance data event: {}", node); + String type = node.get("type").asText(); + + switch (type) { + case MultipleProcessInstanceDataEvent.MULTIPLE_TYPE: + return jp.getCodec().treeToValue(node, MultipleProcessInstanceDataEvent.class); + case ProcessInstanceErrorDataEvent.ERROR_TYPE: + return (ProcessInstanceDataEvent) jp.getCodec().treeToValue(node, ProcessInstanceErrorDataEvent.class); + case ProcessInstanceNodeDataEvent.NODE_TYPE: + return (ProcessInstanceDataEvent) jp.getCodec().treeToValue(node, ProcessInstanceNodeDataEvent.class); + case ProcessInstanceSLADataEvent.SLA_TYPE: + return (ProcessInstanceDataEvent) jp.getCodec().treeToValue(node, ProcessInstanceSLADataEvent.class); + case ProcessInstanceStateDataEvent.STATE_TYPE: + return (ProcessInstanceDataEvent) jp.getCodec().treeToValue(node, ProcessInstanceStateDataEvent.class); + case ProcessInstanceVariableDataEvent.VAR_TYPE: + return (ProcessInstanceDataEvent) jp.getCodec().treeToValue(node, ProcessInstanceVariableDataEvent.class); + default: + LOGGER.warn("Unknown type {} in json data {}", type, node); + return (ProcessInstanceDataEvent) jp.getCodec().treeToValue(node, ProcessInstanceDataEvent.class); + + } + } +} \ No newline at end of file diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/JsonUserTaskInstanceDataEventDeserializer.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/JsonUserTaskInstanceDataEventDeserializer.java new file mode 100644 index 00000000000..7e8bf5458ad --- /dev/null +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/JsonUserTaskInstanceDataEventDeserializer.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.event.serializer; + +import java.io.IOException; + +import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent; +import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentDataEvent; +import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentDataEvent; +import org.kie.kogito.event.usertask.UserTaskInstanceCommentDataEvent; +import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent; +import org.kie.kogito.event.usertask.UserTaskInstanceDeadlineDataEvent; +import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent; +import org.kie.kogito.event.usertask.UserTaskInstanceVariableDataEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; + +public class JsonUserTaskInstanceDataEventDeserializer extends StdDeserializer> { + + private static final Logger LOGGER = LoggerFactory.getLogger(JsonUserTaskInstanceDataEventDeserializer.class); + + private static final long serialVersionUID = -6626663191296012306L; + + public JsonUserTaskInstanceDataEventDeserializer() { + this(null); + } + + public JsonUserTaskInstanceDataEventDeserializer(Class vc) { + super(vc); + } + + @Override + public UserTaskInstanceDataEvent deserialize(JsonParser jp, DeserializationContext ctxt) + throws IOException, JsonProcessingException { + JsonNode node = jp.getCodec().readTree(jp); + LOGGER.debug("Deserialize user task instance data event: {}", node); + String type = node.get("type").asText(); + + switch (type) { + case MultipleUserTaskInstanceDataEvent.TYPE: + return jp.getCodec().treeToValue(node, MultipleUserTaskInstanceDataEvent.class); + case "UserTaskInstanceAssignmentDataEvent": + return (UserTaskInstanceDataEvent) jp.getCodec().treeToValue(node, UserTaskInstanceAssignmentDataEvent.class); + case "UserTaskInstanceAttachmentDataEvent": + return (UserTaskInstanceDataEvent) jp.getCodec().treeToValue(node, UserTaskInstanceAttachmentDataEvent.class); + case "UserTaskInstanceCommentDataEvent": + return (UserTaskInstanceDataEvent) jp.getCodec().treeToValue(node, UserTaskInstanceCommentDataEvent.class); + case "UserTaskInstanceDeadlineDataEvent": + return (UserTaskInstanceDataEvent) jp.getCodec().treeToValue(node, UserTaskInstanceDeadlineDataEvent.class); + case "UserTaskInstanceStateDataEvent": + return (UserTaskInstanceDataEvent) jp.getCodec().treeToValue(node, UserTaskInstanceStateDataEvent.class); + case "UserTaskInstanceVariableDataEvent": + return (UserTaskInstanceDataEvent) jp.getCodec().treeToValue(node, UserTaskInstanceVariableDataEvent.class); + default: + LOGGER.warn("Unknown type {} in json data {}", type, node); + return (UserTaskInstanceDataEvent) jp.getCodec().treeToValue(node, UserTaskInstanceDataEvent.class); + + } + } +} diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/KogitoDataEventSerializationHelper.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/KogitoDataEventSerializationHelper.java new file mode 100644 index 00000000000..04445be5bac --- /dev/null +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/KogitoDataEventSerializationHelper.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.event.serializer; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.net.URI; + +import org.kie.kogito.event.AbstractDataEvent; +import org.kie.kogito.event.DataEvent; +import org.kie.kogito.event.process.ProcessInstanceDataEvent; + +import io.cloudevents.SpecVersion; + +import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.*; + +public class KogitoDataEventSerializationHelper { + + private KogitoDataEventSerializationHelper() { + } + + static void writeCloudEventAttrs(DataOutput out, DataEvent data) throws IOException { + out.writeUTF(data.getSpecVersion().toString()); + out.writeUTF(data.getId()); + writeUTF(out, data.getSubject()); + writeUTF(out, data.getDataContentType()); + writeUTF(out, data.getDataSchema() != null ? data.getDataSchema().toString() : null); + } + + static > T readCloudEventAttrs(DataInput in, T data) throws IOException { + data.setSpecVersion(SpecVersion.parse(in.readUTF())); + data.setId(in.readUTF()); + data.setSubject(readUTF(in)); + data.setDataContentType(readUTF(in)); + data.setDataSchema(URI.create(readUTF(in))); + return data; + } + + static void populateCloudEvent(ProcessInstanceDataEvent event, ProcessInstanceDataEventExtensionRecord info) { + event.setKogitoBusinessKey(info.getBusinessKey()); + event.setKogitoProcessId(info.getId()); + event.setKogitoProcessInstanceId(info.getInstanceId()); + event.setKogitoParentProcessInstanceId(info.getParentInstanceId()); + event.setKogitoProcessInstanceState(info.getState()); + event.setKogitoProcessInstanceVersion(info.getVersion()); + event.setKogitoProcessType(info.getType()); + event.setKogitoRootProcessId(info.getRootId()); + event.setKogitoRootProcessInstanceId(info.getRootInstanceId()); + event.setKogitoIdentity(info.getIdentity()); + event.setSource(info.getSource()); + } + +} diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/KogitoSerializationModule.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/KogitoSerializationModule.java new file mode 100644 index 00000000000..381e0b93d2a --- /dev/null +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/KogitoSerializationModule.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.event.serializer; + +import org.kie.kogito.event.process.ProcessInstanceDataEvent; +import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent; + +import com.fasterxml.jackson.databind.module.SimpleModule; + +public class KogitoSerializationModule extends SimpleModule { + + private static final long serialVersionUID = 1L; + + public KogitoSerializationModule() { + super("KogitoSerialization"); + setSerializerModifier(new MultipleProcessDataInstanceBeanSerializerModifier()); + setDeserializerModifier(new MultipleProcessDataInstanceBeanDeserializerModifier()); + addDeserializer(ProcessInstanceDataEvent.class, new JsonProcessInstanceDataEventDeserializer()); + addDeserializer(UserTaskInstanceDataEvent.class, new JsonUserTaskInstanceDataEventDeserializer()); + } +} diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceBeanDeserializerModifier.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceBeanDeserializerModifier.java new file mode 100644 index 00000000000..6ef181b975f --- /dev/null +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceBeanDeserializerModifier.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.event.serializer; + +import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent; + +import com.fasterxml.jackson.databind.BeanDescription; +import com.fasterxml.jackson.databind.DeserializationConfig; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier; + +public class MultipleProcessDataInstanceBeanDeserializerModifier extends BeanDeserializerModifier { + + @Override + public JsonDeserializer modifyDeserializer( + DeserializationConfig config, BeanDescription beanDesc, JsonDeserializer deserializer) { + if (beanDesc.getBeanClass().equals(MultipleProcessInstanceDataEvent.class)) { + return new MultipleProcessInstanceDataEventDeserializer((JsonDeserializer) deserializer); + } + return deserializer; + } +} diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceBeanSerializerModifier.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceBeanSerializerModifier.java new file mode 100644 index 00000000000..3c62d932a33 --- /dev/null +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessDataInstanceBeanSerializerModifier.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.event.serializer; + +import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent; + +import com.fasterxml.jackson.databind.BeanDescription; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializationConfig; +import com.fasterxml.jackson.databind.ser.BeanSerializerModifier; + +public class MultipleProcessDataInstanceBeanSerializerModifier extends BeanSerializerModifier { + + private static final long serialVersionUID = 1L; + + @Override + public JsonSerializer modifySerializer( + SerializationConfig config, BeanDescription beanDesc, JsonSerializer serializer) { + if (beanDesc.getBeanClass().equals(MultipleProcessInstanceDataEvent.class)) { + return new MultipleProcessInstanceDataEventSerializer((JsonSerializer) serializer); + } + return serializer; + } +} diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventDeserializer.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventDeserializer.java new file mode 100644 index 00000000000..1e5619be233 --- /dev/null +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventDeserializer.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.event.serializer; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.zip.GZIPInputStream; + +import org.kie.kogito.event.process.CloudEventVisitor; +import org.kie.kogito.event.process.KogitoEventBodySerializationHelper; +import org.kie.kogito.event.process.KogitoMarshallEventSupport; +import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent; +import org.kie.kogito.event.process.ProcessInstanceDataEvent; +import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent; +import org.kie.kogito.event.process.ProcessInstanceErrorEventBody; +import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent; +import org.kie.kogito.event.process.ProcessInstanceNodeEventBody; +import org.kie.kogito.event.process.ProcessInstanceSLADataEvent; +import org.kie.kogito.event.process.ProcessInstanceSLAEventBody; +import org.kie.kogito.event.process.ProcessInstanceStateDataEvent; +import org.kie.kogito.event.process.ProcessInstanceStateEventBody; +import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent; +import org.kie.kogito.event.process.ProcessInstanceVariableEventBody; + +import com.fasterxml.jackson.core.JacksonException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.deser.ResolvableDeserializer; + +import io.cloudevents.SpecVersion; + +public class MultipleProcessInstanceDataEventDeserializer extends JsonDeserializer implements ResolvableDeserializer { + + private JsonDeserializer defaultDeserializer; + + public MultipleProcessInstanceDataEventDeserializer(JsonDeserializer deserializer) { + this.defaultDeserializer = deserializer; + } + + @Override + public void resolve(DeserializationContext ctxt) throws JsonMappingException { + ((ResolvableDeserializer) defaultDeserializer).resolve(ctxt); + } + + @Override + public MultipleProcessInstanceDataEvent deserialize(JsonParser p, DeserializationContext ctxt) + throws IOException, JacksonException { + JsonNode node = p.getCodec().readTree(p); + JsonNode dataContentType = node.get("datacontenttype"); + if (dataContentType != null && MultipleProcessInstanceDataEvent.BINARY_CONTENT_TYPE.equals(dataContentType.asText())) { + MultipleProcessInstanceDataEvent event = new MultipleProcessInstanceDataEvent(); + event.setDataContentType(dataContentType.asText()); + event.setSource(URI.create(node.get("source").asText())); + event.setType(node.get("type").asText()); + event.setSpecVersion(SpecVersion.parse(node.get("specversion").asText())); + event.setId(node.get("id").asText()); + JsonNode data = node.get("data"); + if (data != null) { + event.setData(readFromBytes(data.binaryValue(), isCompressed(node))); + } + return event; + } else { + JsonParser newParser = node.traverse(p.getCodec()); + newParser.nextToken(); + return (MultipleProcessInstanceDataEvent) defaultDeserializer.deserialize(newParser, ctxt); + } + } + + private static boolean isCompressed(JsonNode node) { + JsonNode compress = node.get(MultipleProcessInstanceDataEvent.COMPRESS_DATA); + return compress != null && compress.isBoolean() ? compress.asBoolean() : false; + } + + public static Collection> readFromBytes(byte[] binaryValue, boolean compressed) throws IOException { + InputStream wrappedIn = new ByteArrayInputStream(binaryValue); + if (compressed) { + wrappedIn = new GZIPInputStream(wrappedIn); + } + try (DataInputStream in = new DataInputStream(wrappedIn)) { + int size = in.readShort(); + Collection> result = new ArrayList<>(size); + List infos = new ArrayList<>(); + while (size-- > 0) { + byte readInfo = in.readByte(); + ProcessInstanceDataEventExtensionRecord info; + if (readInfo == -1) { + info = new ProcessInstanceDataEventExtensionRecord(); + info.readEvent(in); + infos.add(info); + } else { + info = infos.get(readInfo); + } + String type = in.readUTF(); + result.add(getCloudEvent(in, type, info)); + } + return result; + } + } + + private static ProcessInstanceDataEvent getCloudEvent(DataInputStream in, String type, ProcessInstanceDataEventExtensionRecord info) throws IOException { + switch (type) { + case ProcessInstanceVariableDataEvent.VAR_TYPE: + ProcessInstanceVariableDataEvent item = buildDataEvent(in, new ProcessInstanceVariableDataEvent(), new ProcessInstanceVariableEventBody(), info); + item.setKogitoVariableName(item.getData().getVariableName()); + return item; + case ProcessInstanceStateDataEvent.STATE_TYPE: + return buildDataEvent(in, new ProcessInstanceStateDataEvent(), new ProcessInstanceStateEventBody(), info); + case ProcessInstanceNodeDataEvent.NODE_TYPE: + return buildDataEvent(in, new ProcessInstanceNodeDataEvent(), new ProcessInstanceNodeEventBody(), info); + case ProcessInstanceErrorDataEvent.ERROR_TYPE: + return buildDataEvent(in, new ProcessInstanceErrorDataEvent(), new ProcessInstanceErrorEventBody(), info); + case ProcessInstanceSLADataEvent.SLA_TYPE: + return buildDataEvent(in, new ProcessInstanceSLADataEvent(), new ProcessInstanceSLAEventBody(), info); + default: + throw new UnsupportedOperationException("Unrecognized event type " + type); + } + } + + private static , V extends KogitoMarshallEventSupport & CloudEventVisitor> T buildDataEvent(DataInput in, T cloudEvent, V body, + ProcessInstanceDataEventExtensionRecord info) throws IOException { + int delta = KogitoEventBodySerializationHelper.readInteger(in); + cloudEvent.setTime(info.getTime().plus(delta, ChronoUnit.MILLIS)); + KogitoDataEventSerializationHelper.readCloudEventAttrs(in, cloudEvent); + KogitoDataEventSerializationHelper.populateCloudEvent(cloudEvent, info); + body.readEvent(in); + body.visit(cloudEvent); + cloudEvent.setData(body); + return cloudEvent; + } + +} diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventSerializer.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventSerializer.java new file mode 100644 index 00000000000..1c591079254 --- /dev/null +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventSerializer.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.event.serializer; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.zip.GZIPOutputStream; + +import org.kie.kogito.event.process.KogitoEventBodySerializationHelper; +import org.kie.kogito.event.process.KogitoMarshallEventSupport; +import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent; +import org.kie.kogito.event.process.ProcessInstanceDataEvent; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; + +public class MultipleProcessInstanceDataEventSerializer extends JsonSerializer { + + private JsonSerializer defaultSerializer; + + public MultipleProcessInstanceDataEventSerializer(JsonSerializer serializer) { + this.defaultSerializer = serializer; + } + + @Override + public void serialize(MultipleProcessInstanceDataEvent value, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + if (MultipleProcessInstanceDataEvent.BINARY_CONTENT_TYPE.equals(value.getDataContentType())) { + gen.writeStartObject(); + gen.writeStringField("datacontenttype", value.getDataContentType()); + gen.writeStringField("source", value.getSource().toString()); + gen.writeStringField("id", value.getId()); + gen.writeStringField("specversion", value.getSpecVersion().toString()); + gen.writeStringField("type", value.getType()); + boolean compress = value.isCompressed(); + if (compress) { + gen.writeBooleanField(MultipleProcessInstanceDataEvent.COMPRESS_DATA, true); + } + gen.writeBinaryField("data", dataAsBytes(gen, value.getData(), compress)); + gen.writeEndObject(); + } else { + defaultSerializer.serialize(value, gen, serializers); + } + } + + private byte[] dataAsBytes(JsonGenerator gen, Collection> data, boolean compress) throws IOException { + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + try (DataOutputStream out = new DataOutputStream(compress ? new GZIPOutputStream(bytesOut) : bytesOut)) { + out.writeShort(data.size()); + Map infos = new HashMap<>(); + for (ProcessInstanceDataEvent cloudEvent : data) { + String key = cloudEvent.getKogitoProcessInstanceId(); + ProcessInstanceDataEventExtensionRecord info = infos.get(key); + if (info == null) { + out.writeByte(-1); + info = new ProcessInstanceDataEventExtensionRecord(infos.size(), cloudEvent); + info.writeEvent(out); + infos.put(key, info); + } else { + out.writeByte((byte) info.getOrdinal()); + } + out.writeUTF(cloudEvent.getType()); + KogitoEventBodySerializationHelper.writeInteger(out, cloudEvent.getTime().compareTo(info.getTime())); + KogitoDataEventSerializationHelper.writeCloudEventAttrs(out, cloudEvent); + cloudEvent.getData().writeEvent(out); + } + } + return bytesOut.toByteArray(); + } + +} diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/ProcessInstanceDataEventExtensionRecord.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/ProcessInstanceDataEventExtensionRecord.java new file mode 100644 index 00000000000..45ba4d2ffb4 --- /dev/null +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/ProcessInstanceDataEventExtensionRecord.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.event.serializer; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.net.URI; +import java.time.OffsetDateTime; + +import org.kie.kogito.event.process.KogitoMarshallEventSupport; +import org.kie.kogito.event.process.ProcessInstanceDataEvent; + +import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.*; + +class ProcessInstanceDataEventExtensionRecord implements KogitoMarshallEventSupport { + + // addons, referenceId, and starFromNode are not used by process instance events + private String id; + private String instanceId; + private String version; + private String state; + private String type; + private String parentInstanceId; + private String rootId; + private String rootInstanceId; + private String businessKey; + private String identity; + private URI source; + private OffsetDateTime time; + private transient int ordinal; + + public ProcessInstanceDataEventExtensionRecord() { + } + + public ProcessInstanceDataEventExtensionRecord(int ordinal, ProcessInstanceDataEvent dataEvent) { + this.ordinal = ordinal; + id = dataEvent.getKogitoProcessId(); + instanceId = dataEvent.getKogitoProcessInstanceId(); + version = dataEvent.getKogitoProcessInstanceVersion(); + state = dataEvent.getKogitoProcessInstanceState(); + type = dataEvent.getKogitoProcessType(); + parentInstanceId = dataEvent.getKogitoParentProcessInstanceId(); + rootId = dataEvent.getKogitoRootProcessId(); + rootInstanceId = dataEvent.getKogitoRootProcessInstanceId(); + businessKey = dataEvent.getKogitoBusinessKey(); + identity = dataEvent.getKogitoIdentity(); + time = dataEvent.getTime(); + source = dataEvent.getSource(); + } + + public int getOrdinal() { + return ordinal; + } + + public String getId() { + return id; + } + + public String getBusinessKey() { + return businessKey; + } + + public String getInstanceId() { + return instanceId; + } + + public String getVersion() { + return version; + } + + public String getState() { + return state; + } + + public String getType() { + return type; + } + + public String getParentInstanceId() { + return parentInstanceId; + } + + public String getRootId() { + return rootId; + } + + public String getRootInstanceId() { + return rootInstanceId; + } + + public String getIdentity() { + return identity; + } + + public OffsetDateTime getTime() { + return time; + } + + public URI getSource() { + return source; + } + + @Override + public void writeEvent(DataOutput out) throws IOException { + out.writeUTF(id); + out.writeUTF(instanceId); + out.writeUTF(version); + out.writeUTF(state); + writeUTF(out, type); + writeUTF(out, parentInstanceId); + writeUTF(out, rootId); + writeUTF(out, rootInstanceId); + writeUTF(out, businessKey); + writeUTF(out, identity); + writeTime(out, time); + out.writeUTF(source.toString()); + } + + @Override + public void readEvent(DataInput in) throws IOException { + id = in.readUTF(); + instanceId = in.readUTF(); + version = in.readUTF(); + state = in.readUTF(); + type = readUTF(in); + parentInstanceId = readUTF(in); + rootId = readUTF(in); + rootInstanceId = readUTF(in); + businessKey = readUTF(in); + identity = readUTF(in); + time = readTime(in); + source = URI.create(in.readUTF()); + } +} diff --git a/api/kogito-events-core/src/main/resources/META-INF/services/com.fasterxml.jackson.databind.Module b/api/kogito-events-core/src/main/resources/META-INF/services/com.fasterxml.jackson.databind.Module new file mode 100644 index 00000000000..85fd599eb48 --- /dev/null +++ b/api/kogito-events-core/src/main/resources/META-INF/services/com.fasterxml.jackson.databind.Module @@ -0,0 +1 @@ +org.kie.kogito.event.serializer.KogitoSerializationModule \ No newline at end of file diff --git a/api/kogito-events-core/src/test/java/org/kie/kogito/event/process/ProcessEventsTest.java b/api/kogito-events-core/src/test/java/org/kie/kogito/event/process/ProcessEventsTest.java index 0d4fb4fafa9..8b393370647 100644 --- a/api/kogito-events-core/src/test/java/org/kie/kogito/event/process/ProcessEventsTest.java +++ b/api/kogito-events-core/src/test/java/org/kie/kogito/event/process/ProcessEventsTest.java @@ -18,9 +18,11 @@ */ package org.kie.kogito.event.process; +import java.io.IOException; import java.net.URI; import java.time.OffsetDateTime; import java.util.Arrays; +import java.util.Iterator; import java.util.Set; import java.util.stream.Collectors; @@ -28,7 +30,12 @@ import org.kie.kogito.event.AbstractDataEvent; import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants; import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent; +import org.kie.kogito.jackson.utils.JsonObjectUtils; +import org.kie.kogito.jackson.utils.ObjectMapperFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; @@ -36,27 +43,27 @@ import io.cloudevents.jackson.JsonFormat; import static org.assertj.core.api.Assertions.assertThat; +import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.toDate; class ProcessEventsTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() .registerModule(new JavaTimeModule()) .registerModule(JsonFormat.getCloudEventJacksonModule()) .disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + private static final Logger logger = LoggerFactory.getLogger(ProcessEventsTest.class); + private static final Set BASE_EXTENSION_NAMES = Arrays.stream(new String[] { CloudEventExtensionConstants.PROCESS_INSTANCE_ID, CloudEventExtensionConstants.PROCESS_ROOT_PROCESS_INSTANCE_ID, CloudEventExtensionConstants.PROCESS_ID, CloudEventExtensionConstants.PROCESS_ROOT_PROCESS_ID, - CloudEventExtensionConstants.ADDONS, CloudEventExtensionConstants.PROCESS_INSTANCE_VERSION, CloudEventExtensionConstants.PROCESS_PARENT_PROCESS_INSTANCE_ID, CloudEventExtensionConstants.PROCESS_INSTANCE_STATE, - CloudEventExtensionConstants.PROCESS_REFERENCE_ID, - CloudEventExtensionConstants.PROCESS_START_FROM_NODE, CloudEventExtensionConstants.BUSINESS_KEY, - CloudEventExtensionConstants.PROCESS_TYPE }).collect(Collectors.toSet()); + CloudEventExtensionConstants.PROCESS_TYPE, + CloudEventExtensionConstants.IDENTITY }).collect(Collectors.toSet()); private static final String PROCESS_INSTANCE_EVENT_TYPE = "ProcessInstanceEvent"; private static final String USER_TASK_INSTANCE_EVENT_TYPE = "UserTaskInstanceEvent"; @@ -77,17 +84,18 @@ class ProcessEventsTest { private static final String ROOT_PROCESS_ID = "ROOT_PROCESS_ID"; private static final String PROCESS_PARENT_PROCESS_INSTANCE_ID = "PROCESS_PARENT_PROCESS_INSTANCE_ID"; private static final String PROCESS_INSTANCE_STATE = "PROCESS_INSTANCE_STATE"; - private static final String PROCESS_REFERENCE_ID = "PROCESS_REFERENCE_ID"; - private static final String PROCESS_START_FROM_NODE = "PROCESS_START_FROM_NODE"; private static final String BUSINESS_KEY = "BUSINESS_KEY"; private static final String PROCESS_TYPE = "PROCESS_TYPE"; - private static final String ADDONS = "ADDONS"; - + private static final int PROCESS_STATE = 1; + private static final String NODE_CONTAINER_ID = "323"; + private static final String NODE_CONTAINER_INSTANCEID = "323-3232-3232"; private static final String EXTENSION_1 = "EXTENSION_1"; private static final String EXTENSION_1_VALUE = "EXTENSION_1_VALUE"; private static final String EXTENSION_2 = "EXTENSION_2"; private static final String EXTENSION_2_VALUE = "EXTENSION_2_VALUE"; + private static final int EVENT_TYPE = 1; + private static final String VARIABLE_NAME = "VARIABLE_NAME"; private static final String PROCESS_USER_TASK_INSTANCE_ID = "PROCESS_USER_TASK_INSTANCE_ID"; @@ -112,6 +120,90 @@ void processInstanceDataEvent() throws Exception { assertExtensionNames(deserializedEvent, BASE_EXTENSION_NAMES, EXTENSION_1, EXTENSION_2); } + @Test + void multipleInstanceDataEvent() throws IOException { + JsonNode expectedVarValue = ObjectMapperFactory.get().createObjectNode().put("name", "John Doe"); + int standard = processMultipleInstanceDataEvent(expectedVarValue, false, false); + int binary = processMultipleInstanceDataEvent(expectedVarValue, true, false); + int binaryCompressed = processMultipleInstanceDataEvent(expectedVarValue, true, true); + assertThat(standard).isGreaterThan(binary); + assertThat(binary).isGreaterThan(binaryCompressed); + } + + private int processMultipleInstanceDataEvent(JsonNode expectedVarValue, boolean binary, boolean compress) throws IOException { + ProcessInstanceStateDataEvent stateEvent = new ProcessInstanceStateDataEvent(); + setBaseEventValues(stateEvent, ProcessInstanceStateDataEvent.STATE_TYPE); + stateEvent.setData(ProcessInstanceStateEventBody.create().eventDate(toDate(TIME)).eventType(EVENT_TYPE).eventUser(SUBJECT) + .businessKey(BUSINESS_KEY).processId(PROCESS_ID).processInstanceId(PROCESS_INSTANCE_ID).state(PROCESS_STATE) + .processVersion(PROCESS_INSTANCE_VERSION).parentInstanceId(PROCESS_PARENT_PROCESS_INSTANCE_ID).processName(PROCESS_ID) + .processType(PROCESS_TYPE).rootProcessId(ROOT_PROCESS_ID).rootProcessInstanceId(ROOT_PROCESS_INSTANCE_ID).build()); + + ProcessInstanceVariableDataEvent varEvent1 = new ProcessInstanceVariableDataEvent(); + setBaseEventValues(varEvent1, ProcessInstanceVariableDataEvent.VAR_TYPE); + varEvent1.addExtensionAttribute(CloudEventExtensionConstants.KOGITO_VARIABLE_NAME, VARIABLE_NAME); + varEvent1.setData(ProcessInstanceVariableEventBody.create().eventDate(toDate(TIME)).eventUser(SUBJECT) + .processId(PROCESS_ID).processInstanceId(PROCESS_INSTANCE_ID).processVersion(PROCESS_INSTANCE_VERSION) + .nodeContainerDefinitionId(NODE_CONTAINER_ID).nodeContainerInstanceId(NODE_CONTAINER_INSTANCEID) + .variableName(VARIABLE_NAME) + .variableId(VARIABLE_NAME) + .variableValue(expectedVarValue) + .build()); + + MultipleProcessInstanceDataEvent event = new MultipleProcessInstanceDataEvent(SOURCE, Arrays.asList(stateEvent, varEvent1)); + if (binary) { + event.setDataContentType(MultipleProcessInstanceDataEvent.BINARY_CONTENT_TYPE); + } + if (compress) { + event.setCompressed(compress); + } + + byte[] json = ObjectMapperFactory.get().writeValueAsBytes(event); + logger.info("Serialized chunk size is {}", json.length); + MultipleProcessInstanceDataEvent deserializedEvent = ObjectMapperFactory.get().readValue(json, MultipleProcessInstanceDataEvent.class); + + assertThat(deserializedEvent.getData()).hasSize(2); + Iterator> iter = deserializedEvent.getData().iterator(); + ProcessInstanceStateDataEvent deserializedStateEvent = (ProcessInstanceStateDataEvent) iter.next(); + + assertBaseEventValues(deserializedStateEvent, ProcessInstanceStateDataEvent.STATE_TYPE); + assertExtensionNames(deserializedStateEvent, BASE_EXTENSION_NAMES); + assertStateBody(deserializedStateEvent.getData()); + + ProcessInstanceVariableDataEvent deserializedVariableEvent = (ProcessInstanceVariableDataEvent) iter.next(); + assertBaseEventValues(deserializedVariableEvent, ProcessInstanceVariableDataEvent.VAR_TYPE); + assertThat(deserializedVariableEvent.getExtension(CloudEventExtensionConstants.KOGITO_VARIABLE_NAME)).isEqualTo(VARIABLE_NAME); + assertVarBody(deserializedVariableEvent.getData(), expectedVarValue); + return json.length; + } + + private static void assertVarBody(ProcessInstanceVariableEventBody data, JsonNode expectedVarValue) { + assertThat(data.getVariableId()).isEqualTo(VARIABLE_NAME); + assertThat(data.getVariableName()).isEqualTo(VARIABLE_NAME); + assertThat(JsonObjectUtils.fromValue(data.getVariableValue())).isEqualTo(expectedVarValue); + assertThat(data.getNodeContainerDefinitionId()).isEqualTo(NODE_CONTAINER_ID); + assertThat(data.getNodeContainerInstanceId()).isEqualTo(NODE_CONTAINER_INSTANCEID); + assertThat(data.getProcessId()).isEqualTo(PROCESS_ID); + assertThat(data.getProcessInstanceId()).isEqualTo(PROCESS_INSTANCE_ID); + assertThat(data.getProcessVersion()).isEqualTo(PROCESS_INSTANCE_VERSION); + assertThat(data.getEventUser()).isEqualTo(SUBJECT); + assertThat(data.getEventDate()).isEqualTo(toDate(TIME)); + } + + private static void assertStateBody(ProcessInstanceStateEventBody data) { + assertThat(data.getBusinessKey()).isEqualTo(BUSINESS_KEY); + assertThat(data.getParentInstanceId()).isEqualTo(PROCESS_PARENT_PROCESS_INSTANCE_ID); + assertThat(data.getRootProcessId()).isEqualTo(ROOT_PROCESS_ID); + assertThat(data.getProcessType()).isEqualTo(PROCESS_TYPE); + assertThat(data.getState()).isEqualTo(PROCESS_STATE); + assertThat(data.getRootProcessInstanceId()).isEqualTo(ROOT_PROCESS_INSTANCE_ID); + assertThat(data.getEventType()).isEqualTo(EVENT_TYPE); + assertThat(data.getProcessId()).isEqualTo(PROCESS_ID); + assertThat(data.getProcessInstanceId()).isEqualTo(PROCESS_INSTANCE_ID); + assertThat(data.getProcessVersion()).isEqualTo(PROCESS_INSTANCE_VERSION); + assertThat(data.getEventUser()).isEqualTo(SUBJECT); + assertThat(data.getEventDate()).isEqualTo(toDate(TIME)); + } + @Test void userTaskInstanceDataEvent() throws Exception { UserTaskInstanceStateDataEvent event = new UserTaskInstanceStateDataEvent(); @@ -136,7 +228,6 @@ void userTaskInstanceDataEvent() throws Exception { assertExtensionNames(deserializedEvent, BASE_EXTENSION_NAMES, CloudEventExtensionConstants.PROCESS_USER_TASK_INSTANCE_ID, CloudEventExtensionConstants.PROCESS_USER_TASK_INSTANCE_STATE, EXTENSION_1, EXTENSION_2); - } @Test @@ -175,12 +266,10 @@ private static void setBaseEventValues(AbstractDataEvent event, String eventT event.setKogitoRootProcessInstanceId(ROOT_PROCESS_INSTANCE_ID); event.setKogitoRootProcessId(ROOT_PROCESS_ID); event.setKogitoParentProcessInstanceId(PROCESS_PARENT_PROCESS_INSTANCE_ID); - event.setKogitoReferenceId(PROCESS_REFERENCE_ID); event.setKogitoProcessInstanceState(PROCESS_INSTANCE_STATE); - event.setKogitoStartFromNode(PROCESS_START_FROM_NODE); event.setKogitoBusinessKey(BUSINESS_KEY); event.setKogitoProcessType(PROCESS_TYPE); - event.setKogitoAddons(ADDONS); + event.setKogitoIdentity(SUBJECT); } private static void setAdditionalExtensions(AbstractDataEvent event) { @@ -197,25 +286,22 @@ private static void assertBaseEventValues(AbstractDataEvent deserializedEvent assertThat(deserializedEvent.getSubject()).isEqualTo(SUBJECT); assertThat(deserializedEvent.getDataContentType()).isEqualTo(DATA_CONTENT_TYPE); assertThat(deserializedEvent.getDataSchema()).isEqualTo(DATA_SCHEMA); - assertThat(deserializedEvent.getKogitoProcessInstanceId()).isEqualTo(PROCESS_INSTANCE_ID); assertThat(deserializedEvent.getKogitoProcessId()).isEqualTo(PROCESS_ID); assertThat(deserializedEvent.getKogitoRootProcessInstanceId()).isEqualTo(ROOT_PROCESS_INSTANCE_ID); assertThat(deserializedEvent.getKogitoRootProcessId()).isEqualTo(ROOT_PROCESS_ID); assertThat(deserializedEvent.getKogitoParentProcessInstanceId()).isEqualTo(PROCESS_PARENT_PROCESS_INSTANCE_ID); - assertThat(deserializedEvent.getKogitoReferenceId()).isEqualTo(PROCESS_REFERENCE_ID); assertThat(deserializedEvent.getKogitoProcessInstanceState()).isEqualTo(PROCESS_INSTANCE_STATE); - assertThat(deserializedEvent.getKogitoStartFromNode()).isEqualTo(PROCESS_START_FROM_NODE); assertThat(deserializedEvent.getKogitoBusinessKey()).isEqualTo(BUSINESS_KEY); assertThat(deserializedEvent.getKogitoProcessType()).isEqualTo(PROCESS_TYPE); - assertThat(deserializedEvent.getKogitoAddons()).isEqualTo(ADDONS); + assertThat(deserializedEvent.getKogitoIdentity()).isEqualTo(SUBJECT); } private static void assertExtensionNames(AbstractDataEvent event, Set baseNames, String... names) { Set extensionNames = event.getExtensionNames(); assertThat(extensionNames).hasSize(baseNames.size() + names.length) - .containsAll(baseNames) - .contains(names); + .containsAll(baseNames); + } private static void assertExtensionsNotDuplicated(String json, Set extensionNames) { diff --git a/kogito-codegen-modules/kogito-codegen-core/src/main/resources/class-templates/config/GlobalObjectMapperQuarkusTemplate.java b/kogito-codegen-modules/kogito-codegen-core/src/main/resources/class-templates/config/GlobalObjectMapperQuarkusTemplate.java index 050097b1d13..c670e699f90 100644 --- a/kogito-codegen-modules/kogito-codegen-core/src/main/resources/class-templates/config/GlobalObjectMapperQuarkusTemplate.java +++ b/kogito-codegen-modules/kogito-codegen-core/src/main/resources/class-templates/config/GlobalObjectMapperQuarkusTemplate.java @@ -43,6 +43,6 @@ public void customize(ObjectMapper mapper) { mapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); } mapper.setDateFormat(new StdDateFormat().withColonInTimeZone(true).withTimeZone(TimeZone.getDefault())); - mapper.registerModule(new JavaTimeModule()).registerModule(JsonFormat.getCloudEventJacksonModule()); + mapper.registerModule(JsonFormat.getCloudEventJacksonModule()).findAndRegisterModules(); } } \ No newline at end of file diff --git a/kogito-codegen-modules/kogito-codegen-core/src/main/resources/class-templates/config/GlobalObjectMapperSpringTemplate.java b/kogito-codegen-modules/kogito-codegen-core/src/main/resources/class-templates/config/GlobalObjectMapperSpringTemplate.java index 86896e5799e..f073ff0e9d6 100644 --- a/kogito-codegen-modules/kogito-codegen-core/src/main/resources/class-templates/config/GlobalObjectMapperSpringTemplate.java +++ b/kogito-codegen-modules/kogito-codegen-core/src/main/resources/class-templates/config/GlobalObjectMapperSpringTemplate.java @@ -32,6 +32,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; +import org.kie.kogito.event.serialize.KogitoSerializationModule; @SpringBootConfiguration public class GlobalObjectMapper { @@ -48,7 +49,7 @@ public void customize(Jackson2ObjectMapperBuilder builder) { builder.featuresToDisable (SerializationFeature.FAIL_ON_EMPTY_BEANS); } builder.dateFormat(new StdDateFormat().withColonInTimeZone(true).withTimeZone(TimeZone.getDefault())); - builder.modulesToInstall(new JavaTimeModule()); + builder.modulesToInstall(new JavaTimeModule(), new KogitoSerializationModule()); } }; } diff --git a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java index 66e007d0394..ee30e0dc36d 100644 --- a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java +++ b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java @@ -26,7 +26,9 @@ import java.util.Map; import java.util.Map.Entry; +import org.eclipse.microprofile.config.inject.ConfigProperty; import org.kie.kogito.event.DataEvent; +import org.kie.kogito.event.process.KogitoMarshallEventSupport; import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent; import org.kie.kogito.event.process.ProcessInstanceDataEvent; import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent; @@ -45,6 +47,12 @@ public void publish(DataEvent event) { publish(Collections.singletonList(event)); } + @ConfigProperty(name = "kogito.events.grouping.binary", defaultValue = "false") + private boolean binary; + + @ConfigProperty(name = "kogito.events.grouping.compress", defaultValue = "false") + private boolean compress; + @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public void publish(Collection> events) { @@ -62,7 +70,12 @@ private void publishEvents(Map.Entry entry) if (firstEvent instanceof UserTaskInstanceDataEvent) { publishToTopic(entry.getKey(), new MultipleUserTaskInstanceDataEvent(source, (Collection>) entry.getValue())); } else if (firstEvent instanceof ProcessInstanceDataEvent) { - publishToTopic(entry.getKey(), new MultipleProcessInstanceDataEvent(source, (Collection>) entry.getValue())); + MultipleProcessInstanceDataEvent sent = new MultipleProcessInstanceDataEvent(source, (Collection>) entry.getValue()); + if (binary) { + sent.setDataContentType(MultipleProcessInstanceDataEvent.BINARY_CONTENT_TYPE); + sent.setCompressed(compress); + } + publishToTopic(entry.getKey(), sent); } else { for (DataEvent event : (Collection>) entry.getValue()) { publishToTopic(entry.getKey(), event);