Skip to content

Commit

Permalink
[Fix apache#3721] Optimize event grouping
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Oct 21, 2024
1 parent 4d5b927 commit a5a5abd
Show file tree
Hide file tree
Showing 29 changed files with 1,357 additions and 39 deletions.
8 changes: 4 additions & 4 deletions api/kogito-events-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@
</dependency>

<!-- CloudEvents -->
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
</dependency>
<!-- Tests -->
<dependency>
<groupId>org.junit.jupiter</groupId>
Expand All @@ -72,6 +68,10 @@
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-jackson-utils</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.event.process;

import org.kie.kogito.event.DataEvent;

public interface CloudEventVisitor {
void visit(DataEvent<?> cloudEvent);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.event.process;

import java.io.DataInput;
import java.lang.reflect.Array;
import java.io.DataOutput;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Types;
import java.time.OffsetDateTime;
import java.util.Collection;
import java.util.Date;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import org.kie.kogito.jackson.utils.ObjectMapperFactory;

import com.fasterxml.jackson.databind.JsonNode;

public class KogitoEventBodySerializationHelper {

private KogitoEventBodySerializationHelper() {
}

public static String readUTF(DataInput in) throws IOException {
boolean isNotNull = in.readBoolean();
return isNotNull ? in.readUTF() : null;
}

public static void writeUTF(DataOutput out, String string) throws IOException {
if (string == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(string);
}
}

public static Date readDate(DataInput in) throws IOException {
boolean isNotNull = in.readBoolean();
return isNotNull ? new Date(in.readLong()) : null;
}

public static void writeDate(DataOutput out, Date date) throws IOException {
if (date == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeLong(date.getTime());
}
}

public static void writeInt(DataOutput out, Integer integer) throws IOException {
out.writeInt(integer == null ? Integer.MIN_VALUE : integer.intValue());
}

public static Integer readInt(DataInput in) throws IOException {
int integer = in.readInt();
return integer == Integer.MIN_VALUE ? null : Integer.valueOf(integer);
}

public static void writeUTFCollection(DataOutput out, Collection<String> collection) throws IOException {
if (collection == null) {
writeSize(out, -1);
} else {
writeSize(out, collection.size());
for (String item : collection) {
writeUTF(out, item);
}
}
}

public static <T extends Collection<String>> T readUTFCollection(DataInput in, T holder) throws IOException {
int size = readSize(in);
if (size == -1) {
return null;
}
while (size-- > 0) {
holder.add(readUTF(in));
}
return holder;
}

private enum SerType {

NULL(KogitoEventBodySerializationHelper::writeNull, KogitoEventBodySerializationHelper::readNull),
JSON(KogitoEventBodySerializationHelper::writeJson, KogitoEventBodySerializationHelper::readJson),
DEFAULT(KogitoEventBodySerializationHelper::writeJson,KogitoEventBodySerializationHelper::readDefault),
STRING (KogitoEventBodySerializationHelper::writeString, DataInput::readUTF),
INT(KogitoEventBodySerializationHelper::writeInt, DataInput::readInt),
SHORT(KogitoEventBodySerializationHelper::writeShort, DataInput::readShort),
LONG(KogitoEventBodySerializationHelper::writeLong, DataInput::readLong),
BYTE(KogitoEventBodySerializationHelper::writeByte, DataInput::readByte),
BOOLEAN(KogitoEventBodySerializationHelper::writeBoolean, DataInput::readBoolean),
FLOAT(KogitoEventBodySerializationHelper::writeFloat, DataInput::readFloat) ,
DOUBLE(KogitoEventBodySerializationHelper::writeDouble, DataInput::readDouble);


final ObjectWriter writer;
final ObjectReader reader;

SerType (ObjectWriter writer, ObjectReader reader) {
this.writer = writer;
this.reader = reader;
}

ObjectWriter writer() {
return writer;
}

ObjectReader reader() {
return reader;
}


static SerType fromType (Class<?> type) {
if (JsonNode.class.isAssignableFrom(type)) {
return JSON;
} else if (String.class.isAssignableFrom(type)) {
return STRING;
} else if (Boolean.class.isAssignableFrom(type)) {
return BOOLEAN;
} else if (Integer.class.isAssignableFrom(type)) {
return INT;
} else if (Short.class.isAssignableFrom(type)) {
return SHORT;
} else if (Byte.class.isAssignableFrom(type)) {
return BYTE;
} else if (Long.class.isAssignableFrom(type)) {
return LONG;
} else if (Float.class.isAssignableFrom(type)) {
return FLOAT;
} else if (Double.class.isAssignableFrom(type)) {
return DOUBLE;
} else {
return DEFAULT;
}
}

static SerType fromObject (Object obj) {
return obj == null ? NULL : fromType (obj.getClass());
}
}



private static void writeType (DataOutput out, SerType type) throws IOException {
out.writeByte(type.ordinal());
}

private static SerType readType (DataInput in) throws IOException {
return SerType.values()[in.readByte()];
}

public static void writeObject(DataOutput out, Object obj) throws IOException {
SerType type = SerType.fromObject(obj);
writeType (out,type);
type.writer().accept(out, obj);
}

public static Object readObject (DataInput in) throws IOException {
return readType(in).reader().apply(in);
}

@FunctionalInterface
private static interface ObjectWriter {
void accept (DataOutput out, Object obj) throws IOException;
}

private static interface ObjectReader {
Object apply (DataInput out) throws IOException;
}

private static void writeString (DataOutput out, Object obj) throws IOException {
out.writeUTF((String)obj);
}

private static void writeBoolean (DataOutput out, Object obj) throws IOException {
out.writeBoolean((Boolean)obj);
}

private static void writeInt (DataOutput out, Object obj) throws IOException {
out.writeInt((Integer)obj);
}

private static void writeLong (DataOutput out, Object obj) throws IOException {
out.writeInt((Integer)obj);
}

private static void writeShort (DataOutput out, Object obj) throws IOException {
out.writeShort((Short)obj);
}

private static void writeByte (DataOutput out, Object obj) throws IOException {
out.writeByte((Byte)obj);
}

private static void writeFloat (DataOutput out, Object obj) throws IOException {
out.writeFloat((Float)obj);
}

private static void writeDouble (DataOutput out, Object obj) throws IOException {
out.writeDouble((Double)obj);
}



private static void writeNull (DataOutput out, Object obj) {
// do nothing
}

private static Object readNull (DataInput in) {
return null;
}

private static void writeSize (DataOutput out, int size) throws IOException {
if (size < Byte.MAX_VALUE) {
writeType(out,SerType.BYTE);
out.writeByte((byte)size);
} else if (size < Short.MAX_VALUE) {
writeType(out,SerType.SHORT);
out.writeShort((short)size);
} else {
writeType(out,SerType.INT);
out.writeInt(size);
}
}

private static int readSize (DataInput in) throws IOException {
SerType type = readType(in);
switch (type) {
case INT:
return in.readInt();
case SHORT:
return in.readShort();
case BYTE:
return in.readByte();
default:
throw new IOException ("Stream corrupted. Read unrecognized type "+type);
}
}


private static void writeJson (DataOutput out, Object obj) throws IOException {
byte[] bytes = ObjectMapperFactory.get().writeValueAsBytes(obj);
out.writeInt(bytes.length);
out.write(bytes);
}


private static Object readJson(DataInput in) throws IOException {
return readJson (in, JsonNode.class);
}

private static Object readDefault(DataInput in) throws IOException {
return readJson (in, Object.class);
}

private static Object readJson(DataInput in, Class<?> type) throws IOException {
byte[] bytes = new byte[in.readInt()];
in.readFully(bytes);
return ObjectMapperFactory.get().readValue(bytes, type);
}

public static Date toDate(OffsetDateTime time) {
return time == null ? null : Date.from(time.toInstant());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.event.process;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public interface KogitoMarshallEventSupport {

void writeEvent(DataOutput out) throws IOException;

void readEvent(DataInput in) throws IOException;
}
Loading

0 comments on commit a5a5abd

Please sign in to comment.