From 7211e8179b171aebce1aaa74c49238eb3a042d99 Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti Date: Thu, 24 Oct 2024 14:28:38 +0200 Subject: [PATCH] [Fix #3721] Adding traces --- .../KogitoEventBodySerializationHelper.java | 5 +- ...eProcessInstanceDataEventDeserializer.java | 50 ++++++++++++++----- ...pleProcessInstanceDataEventSerializer.java | 34 +++++++++++-- ...ocessInstanceDataEventExtensionRecord.java | 9 ++++ 4 files changed, 78 insertions(+), 20 deletions(-) diff --git a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoEventBodySerializationHelper.java b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoEventBodySerializationHelper.java index 9d17f77d628..d00146eb9c2 100644 --- a/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoEventBodySerializationHelper.java +++ b/api/kogito-events-api/src/main/java/org/kie/kogito/event/process/KogitoEventBodySerializationHelper.java @@ -237,10 +237,9 @@ public static void writeInteger(DataOutput out, Integer integer) throws IOExcept public static Integer readInteger(DataInput in) throws IOException { SerType type = readType(in); return type == SerType.NULL ? null : readInt(in, type); - } - private static void writeInt(DataOutput out, int size) throws IOException { + public static void writeInt(DataOutput out, int size) throws IOException { if (size < Byte.MAX_VALUE) { writeType(out, SerType.BYTE); out.writeByte((byte) size); @@ -253,7 +252,7 @@ private static void writeInt(DataOutput out, int size) throws IOException { } } - private static int readInt(DataInput in) throws IOException { + public static int readInt(DataInput in) throws IOException { SerType type = readType(in); return readInt(in, type); } diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventDeserializer.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventDeserializer.java index 1e5619be233..cb556d3e08f 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventDeserializer.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventDeserializer.java @@ -28,10 +28,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.function.Supplier; import java.util.zip.GZIPInputStream; import org.kie.kogito.event.process.CloudEventVisitor; -import org.kie.kogito.event.process.KogitoEventBodySerializationHelper; import org.kie.kogito.event.process.KogitoMarshallEventSupport; import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent; import org.kie.kogito.event.process.ProcessInstanceDataEvent; @@ -45,6 +45,8 @@ import org.kie.kogito.event.process.ProcessInstanceStateEventBody; import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent; import org.kie.kogito.event.process.ProcessInstanceVariableEventBody; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.JacksonException; import com.fasterxml.jackson.core.JsonParser; @@ -56,8 +58,12 @@ import io.cloudevents.SpecVersion; +import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.readInt; + public class MultipleProcessInstanceDataEventDeserializer extends JsonDeserializer implements ResolvableDeserializer { + private static final Logger logger = LoggerFactory.getLogger(MultipleProcessInstanceDataEventDeserializer.class); + private JsonDeserializer defaultDeserializer; public MultipleProcessInstanceDataEventDeserializer(JsonDeserializer deserializer) { @@ -101,24 +107,31 @@ private static boolean isCompressed(JsonNode node) { public static Collection> readFromBytes(byte[] binaryValue, boolean compressed) throws IOException { InputStream wrappedIn = new ByteArrayInputStream(binaryValue); if (compressed) { + logger.trace("Gzip compressed byte array"); wrappedIn = new GZIPInputStream(wrappedIn); } try (DataInputStream in = new DataInputStream(wrappedIn)) { - int size = in.readShort(); + int size = readInt(in); + logger.trace("Reading collection of size {}", size); Collection> result = new ArrayList<>(size); List infos = new ArrayList<>(); while (size-- > 0) { byte readInfo = in.readByte(); + logger.trace("Info ordinal is {}", readInfo); ProcessInstanceDataEventExtensionRecord info; if (readInfo == -1) { info = new ProcessInstanceDataEventExtensionRecord(); info.readEvent(in); + logger.trace("Info readed is {}", info); infos.add(info); } else { info = infos.get(readInfo); + logger.trace("Info cached is {}", info); } String type = in.readUTF(); + logger.trace("Type is {}", info); result.add(getCloudEvent(in, type, info)); + logger.trace("{} events remaining", size); } return result; } @@ -127,31 +140,44 @@ public static Collection getCloudEvent(DataInputStream in, String type, ProcessInstanceDataEventExtensionRecord info) throws IOException { switch (type) { case ProcessInstanceVariableDataEvent.VAR_TYPE: - ProcessInstanceVariableDataEvent item = buildDataEvent(in, new ProcessInstanceVariableDataEvent(), new ProcessInstanceVariableEventBody(), info); + ProcessInstanceVariableDataEvent item = buildDataEvent(in, new ProcessInstanceVariableDataEvent(), ProcessInstanceVariableEventBody::new, info); item.setKogitoVariableName(item.getData().getVariableName()); return item; case ProcessInstanceStateDataEvent.STATE_TYPE: - return buildDataEvent(in, new ProcessInstanceStateDataEvent(), new ProcessInstanceStateEventBody(), info); + return buildDataEvent(in, new ProcessInstanceStateDataEvent(), ProcessInstanceStateEventBody::new, info); case ProcessInstanceNodeDataEvent.NODE_TYPE: - return buildDataEvent(in, new ProcessInstanceNodeDataEvent(), new ProcessInstanceNodeEventBody(), info); + return buildDataEvent(in, new ProcessInstanceNodeDataEvent(), ProcessInstanceNodeEventBody::new, info); case ProcessInstanceErrorDataEvent.ERROR_TYPE: - return buildDataEvent(in, new ProcessInstanceErrorDataEvent(), new ProcessInstanceErrorEventBody(), info); + return buildDataEvent(in, new ProcessInstanceErrorDataEvent(), ProcessInstanceErrorEventBody::new, info); case ProcessInstanceSLADataEvent.SLA_TYPE: - return buildDataEvent(in, new ProcessInstanceSLADataEvent(), new ProcessInstanceSLAEventBody(), info); + return buildDataEvent(in, new ProcessInstanceSLADataEvent(), ProcessInstanceSLAEventBody::new, info); default: throw new UnsupportedOperationException("Unrecognized event type " + type); } } - private static , V extends KogitoMarshallEventSupport & CloudEventVisitor> T buildDataEvent(DataInput in, T cloudEvent, V body, + private static , V extends KogitoMarshallEventSupport & CloudEventVisitor> T buildDataEvent(DataInput in, T cloudEvent, Supplier bodySupplier, ProcessInstanceDataEventExtensionRecord info) throws IOException { - int delta = KogitoEventBodySerializationHelper.readInteger(in); + int delta = readInt(in); + logger.trace("Time delta is {}", delta); cloudEvent.setTime(info.getTime().plus(delta, ChronoUnit.MILLIS)); KogitoDataEventSerializationHelper.readCloudEventAttrs(in, cloudEvent); + logger.trace("Cloud event before population {}", cloudEvent); KogitoDataEventSerializationHelper.populateCloudEvent(cloudEvent, info); - body.readEvent(in); - body.visit(cloudEvent); - cloudEvent.setData(body); + logger.trace("Cloud event after population {}", cloudEvent); + + boolean isNotNull = in.readBoolean(); + if (isNotNull) { + logger.trace("Data is not null"); + V body = bodySupplier.get(); + body.readEvent(in); + logger.trace("Event body before population {}", body); + body.visit(cloudEvent); + logger.trace("Event body after population {}", body); + cloudEvent.setData(body); + } else { + logger.trace("Data is null"); + } return cloudEvent; } diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventSerializer.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventSerializer.java index 1c591079254..29953f155db 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventSerializer.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/MultipleProcessInstanceDataEventSerializer.java @@ -26,17 +26,22 @@ import java.util.Map; import java.util.zip.GZIPOutputStream; -import org.kie.kogito.event.process.KogitoEventBodySerializationHelper; import org.kie.kogito.event.process.KogitoMarshallEventSupport; import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent; import org.kie.kogito.event.process.ProcessInstanceDataEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.JsonSerializer; import com.fasterxml.jackson.databind.SerializerProvider; +import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.writeInt; + public class MultipleProcessInstanceDataEventSerializer extends JsonSerializer { + private static final Logger logger = LoggerFactory.getLogger(MultipleProcessInstanceDataEventDeserializer.class); + private JsonSerializer defaultSerializer; public MultipleProcessInstanceDataEventSerializer(JsonSerializer serializer) { @@ -67,23 +72,42 @@ public void serialize(MultipleProcessInstanceDataEvent value, JsonGenerator gen, private byte[] dataAsBytes(JsonGenerator gen, Collection> data, boolean compress) throws IOException { ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); try (DataOutputStream out = new DataOutputStream(compress ? new GZIPOutputStream(bytesOut) : bytesOut)) { - out.writeShort(data.size()); + logger.trace("Writing size {}", data.size()); + writeInt(out, data.size()); Map infos = new HashMap<>(); for (ProcessInstanceDataEvent cloudEvent : data) { + String key = cloudEvent.getKogitoProcessInstanceId(); ProcessInstanceDataEventExtensionRecord info = infos.get(key); if (info == null) { - out.writeByte(-1); + logger.trace("Writing marker byte -1"); + out.writeByte((byte) -1); info = new ProcessInstanceDataEventExtensionRecord(infos.size(), cloudEvent); + logger.trace("Writing info", info); info.writeEvent(out); infos.put(key, info); } else { + logger.trace("Writing marker byte {}", info.getOrdinal()); out.writeByte((byte) info.getOrdinal()); } + logger.trace("Writing type {}", cloudEvent.getType()); out.writeUTF(cloudEvent.getType()); - KogitoEventBodySerializationHelper.writeInteger(out, cloudEvent.getTime().compareTo(info.getTime())); + int timeDelta = cloudEvent.getTime().compareTo(info.getTime()); + logger.trace("Writing time delta {}", timeDelta); + writeInt(out, timeDelta); + logger.trace("Writing cloud event attrs {}", cloudEvent); KogitoDataEventSerializationHelper.writeCloudEventAttrs(out, cloudEvent); - cloudEvent.getData().writeEvent(out); + KogitoMarshallEventSupport itemData = cloudEvent.getData(); + if (itemData != null) { + logger.trace("Writing data not null boolean"); + out.writeBoolean(true); + logger.trace("Writing cloud event body {}", itemData); + itemData.writeEvent(out); + } else { + logger.trace("Writing data null boolean"); + out.writeBoolean(false); + } + logger.trace("individual event writing completed"); } } return bytesOut.toByteArray(); diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/ProcessInstanceDataEventExtensionRecord.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/ProcessInstanceDataEventExtensionRecord.java index 42cbef5a21d..28abac1a281 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/ProcessInstanceDataEventExtensionRecord.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/serializer/ProcessInstanceDataEventExtensionRecord.java @@ -156,4 +156,13 @@ public void readEvent(DataInput in) throws IOException { source = URI.create(in.readUTF()); addons = readUTF(in); } + + @Override + public String toString() { + return "ProcessInstanceDataEventExtensionRecord [id=" + id + ", instanceId=" + instanceId + ", version=" + + version + ", state=" + state + ", type=" + type + ", parentInstanceId=" + parentInstanceId + + ", rootId=" + rootId + ", rootInstanceId=" + rootInstanceId + ", businessKey=" + businessKey + + ", identity=" + identity + ", source=" + source + ", time=" + time + ", addons=" + addons + "]"; + } + }