Skip to content

Commit

Permalink
[Fix apache#3721] Adding traces
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Oct 24, 2024
1 parent 5d36770 commit 7211e81
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,9 @@ public static void writeInteger(DataOutput out, Integer integer) throws IOExcept
public static Integer readInteger(DataInput in) throws IOException {
SerType type = readType(in);
return type == SerType.NULL ? null : readInt(in, type);

}

private static void writeInt(DataOutput out, int size) throws IOException {
public static void writeInt(DataOutput out, int size) throws IOException {
if (size < Byte.MAX_VALUE) {
writeType(out, SerType.BYTE);
out.writeByte((byte) size);
Expand All @@ -253,7 +252,7 @@ private static void writeInt(DataOutput out, int size) throws IOException {
}
}

private static int readInt(DataInput in) throws IOException {
public static int readInt(DataInput in) throws IOException {
SerType type = readType(in);
return readInt(in, type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;
import java.util.zip.GZIPInputStream;

import org.kie.kogito.event.process.CloudEventVisitor;
import org.kie.kogito.event.process.KogitoEventBodySerializationHelper;
import org.kie.kogito.event.process.KogitoMarshallEventSupport;
import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
Expand All @@ -45,6 +45,8 @@
import org.kie.kogito.event.process.ProcessInstanceStateEventBody;
import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent;
import org.kie.kogito.event.process.ProcessInstanceVariableEventBody;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.core.JsonParser;
Expand All @@ -56,8 +58,12 @@

import io.cloudevents.SpecVersion;

import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.readInt;

public class MultipleProcessInstanceDataEventDeserializer extends JsonDeserializer<MultipleProcessInstanceDataEvent> implements ResolvableDeserializer {

private static final Logger logger = LoggerFactory.getLogger(MultipleProcessInstanceDataEventDeserializer.class);

private JsonDeserializer<Object> defaultDeserializer;

public MultipleProcessInstanceDataEventDeserializer(JsonDeserializer<Object> deserializer) {
Expand Down Expand Up @@ -101,24 +107,31 @@ private static boolean isCompressed(JsonNode node) {
public static Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> readFromBytes(byte[] binaryValue, boolean compressed) throws IOException {
InputStream wrappedIn = new ByteArrayInputStream(binaryValue);
if (compressed) {
logger.trace("Gzip compressed byte array");
wrappedIn = new GZIPInputStream(wrappedIn);
}
try (DataInputStream in = new DataInputStream(wrappedIn)) {
int size = in.readShort();
int size = readInt(in);
logger.trace("Reading collection of size {}", size);
Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> result = new ArrayList<>(size);
List<ProcessInstanceDataEventExtensionRecord> infos = new ArrayList<>();
while (size-- > 0) {
byte readInfo = in.readByte();
logger.trace("Info ordinal is {}", readInfo);
ProcessInstanceDataEventExtensionRecord info;
if (readInfo == -1) {
info = new ProcessInstanceDataEventExtensionRecord();
info.readEvent(in);
logger.trace("Info readed is {}", info);
infos.add(info);
} else {
info = infos.get(readInfo);
logger.trace("Info cached is {}", info);
}
String type = in.readUTF();
logger.trace("Type is {}", info);
result.add(getCloudEvent(in, type, info));
logger.trace("{} events remaining", size);
}
return result;
}
Expand All @@ -127,31 +140,44 @@ public static Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventS
private static ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport> getCloudEvent(DataInputStream in, String type, ProcessInstanceDataEventExtensionRecord info) throws IOException {
switch (type) {
case ProcessInstanceVariableDataEvent.VAR_TYPE:
ProcessInstanceVariableDataEvent item = buildDataEvent(in, new ProcessInstanceVariableDataEvent(), new ProcessInstanceVariableEventBody(), info);
ProcessInstanceVariableDataEvent item = buildDataEvent(in, new ProcessInstanceVariableDataEvent(), ProcessInstanceVariableEventBody::new, info);
item.setKogitoVariableName(item.getData().getVariableName());
return item;
case ProcessInstanceStateDataEvent.STATE_TYPE:
return buildDataEvent(in, new ProcessInstanceStateDataEvent(), new ProcessInstanceStateEventBody(), info);
return buildDataEvent(in, new ProcessInstanceStateDataEvent(), ProcessInstanceStateEventBody::new, info);
case ProcessInstanceNodeDataEvent.NODE_TYPE:
return buildDataEvent(in, new ProcessInstanceNodeDataEvent(), new ProcessInstanceNodeEventBody(), info);
return buildDataEvent(in, new ProcessInstanceNodeDataEvent(), ProcessInstanceNodeEventBody::new, info);
case ProcessInstanceErrorDataEvent.ERROR_TYPE:
return buildDataEvent(in, new ProcessInstanceErrorDataEvent(), new ProcessInstanceErrorEventBody(), info);
return buildDataEvent(in, new ProcessInstanceErrorDataEvent(), ProcessInstanceErrorEventBody::new, info);
case ProcessInstanceSLADataEvent.SLA_TYPE:
return buildDataEvent(in, new ProcessInstanceSLADataEvent(), new ProcessInstanceSLAEventBody(), info);
return buildDataEvent(in, new ProcessInstanceSLADataEvent(), ProcessInstanceSLAEventBody::new, info);
default:
throw new UnsupportedOperationException("Unrecognized event type " + type);
}
}

private static <T extends ProcessInstanceDataEvent<V>, V extends KogitoMarshallEventSupport & CloudEventVisitor> T buildDataEvent(DataInput in, T cloudEvent, V body,
private static <T extends ProcessInstanceDataEvent<V>, V extends KogitoMarshallEventSupport & CloudEventVisitor> T buildDataEvent(DataInput in, T cloudEvent, Supplier<V> bodySupplier,
ProcessInstanceDataEventExtensionRecord info) throws IOException {
int delta = KogitoEventBodySerializationHelper.readInteger(in);
int delta = readInt(in);
logger.trace("Time delta is {}", delta);
cloudEvent.setTime(info.getTime().plus(delta, ChronoUnit.MILLIS));
KogitoDataEventSerializationHelper.readCloudEventAttrs(in, cloudEvent);
logger.trace("Cloud event before population {}", cloudEvent);
KogitoDataEventSerializationHelper.populateCloudEvent(cloudEvent, info);
body.readEvent(in);
body.visit(cloudEvent);
cloudEvent.setData(body);
logger.trace("Cloud event after population {}", cloudEvent);

boolean isNotNull = in.readBoolean();
if (isNotNull) {
logger.trace("Data is not null");
V body = bodySupplier.get();
body.readEvent(in);
logger.trace("Event body before population {}", body);
body.visit(cloudEvent);
logger.trace("Event body after population {}", body);
cloudEvent.setData(body);
} else {
logger.trace("Data is null");
}
return cloudEvent;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,22 @@
import java.util.Map;
import java.util.zip.GZIPOutputStream;

import org.kie.kogito.event.process.KogitoEventBodySerializationHelper;
import org.kie.kogito.event.process.KogitoMarshallEventSupport;
import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;

import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.writeInt;

public class MultipleProcessInstanceDataEventSerializer extends JsonSerializer<MultipleProcessInstanceDataEvent> {

private static final Logger logger = LoggerFactory.getLogger(MultipleProcessInstanceDataEventDeserializer.class);

private JsonSerializer<Object> defaultSerializer;

public MultipleProcessInstanceDataEventSerializer(JsonSerializer<Object> serializer) {
Expand Down Expand Up @@ -67,23 +72,42 @@ public void serialize(MultipleProcessInstanceDataEvent value, JsonGenerator gen,
private byte[] dataAsBytes(JsonGenerator gen, Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> data, boolean compress) throws IOException {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
try (DataOutputStream out = new DataOutputStream(compress ? new GZIPOutputStream(bytesOut) : bytesOut)) {
out.writeShort(data.size());
logger.trace("Writing size {}", data.size());
writeInt(out, data.size());
Map<String, ProcessInstanceDataEventExtensionRecord> infos = new HashMap<>();
for (ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport> cloudEvent : data) {

String key = cloudEvent.getKogitoProcessInstanceId();
ProcessInstanceDataEventExtensionRecord info = infos.get(key);
if (info == null) {
out.writeByte(-1);
logger.trace("Writing marker byte -1");
out.writeByte((byte) -1);
info = new ProcessInstanceDataEventExtensionRecord(infos.size(), cloudEvent);
logger.trace("Writing info", info);
info.writeEvent(out);
infos.put(key, info);
} else {
logger.trace("Writing marker byte {}", info.getOrdinal());
out.writeByte((byte) info.getOrdinal());
}
logger.trace("Writing type {}", cloudEvent.getType());
out.writeUTF(cloudEvent.getType());
KogitoEventBodySerializationHelper.writeInteger(out, cloudEvent.getTime().compareTo(info.getTime()));
int timeDelta = cloudEvent.getTime().compareTo(info.getTime());
logger.trace("Writing time delta {}", timeDelta);
writeInt(out, timeDelta);
logger.trace("Writing cloud event attrs {}", cloudEvent);
KogitoDataEventSerializationHelper.writeCloudEventAttrs(out, cloudEvent);
cloudEvent.getData().writeEvent(out);
KogitoMarshallEventSupport itemData = cloudEvent.getData();
if (itemData != null) {
logger.trace("Writing data not null boolean");
out.writeBoolean(true);
logger.trace("Writing cloud event body {}", itemData);
itemData.writeEvent(out);
} else {
logger.trace("Writing data null boolean");
out.writeBoolean(false);
}
logger.trace("individual event writing completed");
}
}
return bytesOut.toByteArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,13 @@ public void readEvent(DataInput in) throws IOException {
source = URI.create(in.readUTF());
addons = readUTF(in);
}

@Override
public String toString() {
return "ProcessInstanceDataEventExtensionRecord [id=" + id + ", instanceId=" + instanceId + ", version="
+ version + ", state=" + state + ", type=" + type + ", parentInstanceId=" + parentInstanceId
+ ", rootId=" + rootId + ", rootInstanceId=" + rootInstanceId + ", businessKey=" + businessKey
+ ", identity=" + identity + ", source=" + source + ", time=" + time + ", addons=" + addons + "]";
}

}

0 comments on commit 7211e81

Please sign in to comment.