From b81ff21b9e359d23b7c588b9d3b042cc0043db9e Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti <65240126+fjtirado@users.noreply.github.com> Date: Fri, 4 Oct 2024 17:44:55 +0200 Subject: [PATCH] [incubator-kie-issues#1457] Allow grouping of events (#3654) * [Fix apache/incubator-kie-issues#1457] Allow grouping of events * Add test for coverage * [Fix apache/incubator-kie-issues#1457] Grouping of event serialization * [Fix apache/incubator-kie-issues#1457] Adding default constructor * [Fix apache/incubator-kie-issues#1457] Adding TYPE constant --------- Co-authored-by: gmunozfe --- .../kie/kogito/event/AbstractDataEvent.java | 16 +- .../MultipleProcessInstanceDataEvent.java | 34 ++ .../process/ProcessInstanceDataEvent.java | 6 + .../MultipleUserTaskInstanceDataEvent.java | 34 ++ .../usertask/UserTaskInstanceDataEvent.java | 5 + kogito-build/kogito-dependencies-bom/pom.xml | 2 +- quarkus/addons/events/process/runtime/pom.xml | 11 + .../AbstractMessagingEventPublisher.java | 188 +++++++++ .../GroupingMessagingEventPublisher.java | 72 ++++ .../ReactiveMessagingEventPublisher.java | 135 +----- .../GroupingMessagingEventPublisherTest.java | 392 ++++++++++++++++++ 11 files changed, 757 insertions(+), 138 deletions(-) create mode 100644 api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java create mode 100644 api/kogito-events-core/src/main/java/org/kie/kogito/event/usertask/MultipleUserTaskInstanceDataEvent.java create mode 100644 quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/AbstractMessagingEventPublisher.java create mode 100644 quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java create mode 100644 quarkus/addons/events/process/runtime/src/test/java/org/kie/kogito/events/process/GroupingMessagingEventPublisherTest.java diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/AbstractDataEvent.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/AbstractDataEvent.java index 2fcdd704196..065111c8519 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/AbstractDataEvent.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/AbstractDataEvent.java @@ -177,6 +177,15 @@ public abstract class AbstractDataEvent implements DataEvent { protected AbstractDataEvent() { } + protected AbstractDataEvent(String type, URI source, T body) { + this.specVersion = SpecVersion.parse(SPEC_VERSION); + this.id = UUID.randomUUID().toString(); + this.source = source; + this.type = type; + this.time = ZonedDateTime.now().toOffsetDateTime(); + this.data = body; + } + protected AbstractDataEvent(String type, String source, T body, @@ -201,12 +210,7 @@ protected AbstractDataEvent(String type, String subject, String dataContentType, String dataSchema) { - this.specVersion = SpecVersion.parse(SPEC_VERSION); - this.id = UUID.randomUUID().toString(); - this.source = Optional.ofNullable(source).map(URI::create).orElse(null); - this.type = type; - this.time = ZonedDateTime.now().toOffsetDateTime(); - this.data = body; + this(type, Optional.ofNullable(source).map(URI::create).orElse(null), body); setKogitoProcessInstanceId(kogitoProcessInstanceId); setKogitoRootProcessInstanceId(kogitoRootProcessInstanceId); setKogitoProcessId(kogitoProcessId); diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java new file mode 100644 index 00000000000..7db8c0e7659 --- /dev/null +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/MultipleProcessInstanceDataEvent.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.event.process; + +import java.net.URI; +import java.util.Collection; + +public class MultipleProcessInstanceDataEvent extends ProcessInstanceDataEvent>> { + + public static final String TYPE = "MultipleProcessInstanceDataEvent"; + + public MultipleProcessInstanceDataEvent() { + } + + public MultipleProcessInstanceDataEvent(URI source, Collection> body) { + super(TYPE, source, body); + } +} diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceDataEvent.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceDataEvent.java index 8069df7ee39..31131563dcc 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceDataEvent.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/process/ProcessInstanceDataEvent.java @@ -18,6 +18,8 @@ */ package org.kie.kogito.event.process; +import java.net.URI; + import org.kie.kogito.event.AbstractDataEvent; public class ProcessInstanceDataEvent extends AbstractDataEvent { @@ -29,6 +31,10 @@ public ProcessInstanceDataEvent(T body) { setData(body); } + protected ProcessInstanceDataEvent(String type, URI source, T body) { + super(type, source, body); + } + public ProcessInstanceDataEvent(String type, String source, T body, diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/usertask/MultipleUserTaskInstanceDataEvent.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/usertask/MultipleUserTaskInstanceDataEvent.java new file mode 100644 index 00000000000..b2b62c61d83 --- /dev/null +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/usertask/MultipleUserTaskInstanceDataEvent.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.event.usertask; + +import java.net.URI; +import java.util.Collection; + +public class MultipleUserTaskInstanceDataEvent extends UserTaskInstanceDataEvent>> { + + public static final String TYPE = "MultipleUserTaskInstanceDataEvent"; + + public MultipleUserTaskInstanceDataEvent() { + } + + public MultipleUserTaskInstanceDataEvent(URI source, Collection> body) { + super(TYPE, source, body); + } +} diff --git a/api/kogito-events-core/src/main/java/org/kie/kogito/event/usertask/UserTaskInstanceDataEvent.java b/api/kogito-events-core/src/main/java/org/kie/kogito/event/usertask/UserTaskInstanceDataEvent.java index 98fc6528094..c4b3e0af5c9 100644 --- a/api/kogito-events-core/src/main/java/org/kie/kogito/event/usertask/UserTaskInstanceDataEvent.java +++ b/api/kogito-events-core/src/main/java/org/kie/kogito/event/usertask/UserTaskInstanceDataEvent.java @@ -18,6 +18,7 @@ */ package org.kie.kogito.event.usertask; +import java.net.URI; import java.util.Set; import org.kie.kogito.event.AbstractDataEvent; @@ -48,6 +49,10 @@ public UserTaskInstanceDataEvent(T body) { setData(body); } + protected UserTaskInstanceDataEvent(String type, URI source, T body) { + super(type, source, body); + } + public UserTaskInstanceDataEvent(String type, String source, T body, diff --git a/kogito-build/kogito-dependencies-bom/pom.xml b/kogito-build/kogito-dependencies-bom/pom.xml index 0c815fc2353..26fae00aa0d 100644 --- a/kogito-build/kogito-dependencies-bom/pom.xml +++ b/kogito-build/kogito-dependencies-bom/pom.xml @@ -52,7 +52,7 @@ 2.0.2 2.4.1 0.3.0 - 2.2.0 + 2.4.1 0.2.3 1.5.2 3.25.8 diff --git a/quarkus/addons/events/process/runtime/pom.xml b/quarkus/addons/events/process/runtime/pom.xml index dc5d417218d..50ef92684c1 100644 --- a/quarkus/addons/events/process/runtime/pom.xml +++ b/quarkus/addons/events/process/runtime/pom.xml @@ -78,6 +78,17 @@ org.slf4j slf4j-api + + + org.junit.jupiter + junit-jupiter + test + + + io.quarkus + quarkus-junit5-mockito + test + diff --git a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/AbstractMessagingEventPublisher.java b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/AbstractMessagingEventPublisher.java new file mode 100644 index 00000000000..f8092c83575 --- /dev/null +++ b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/AbstractMessagingEventPublisher.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.events.process; + +import java.util.Collection; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.function.Consumer; + +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.OnOverflow; +import org.eclipse.microprofile.reactive.messaging.OnOverflow.Strategy; +import org.kie.kogito.addon.quarkus.common.reactive.messaging.MessageDecoratorProvider; +import org.kie.kogito.event.DataEvent; +import org.kie.kogito.event.EventPublisher; +import org.kie.kogito.events.config.EventsRuntimeConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.smallrye.reactive.messaging.MutinyEmitter; +import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage; + +import jakarta.annotation.PostConstruct; +import jakarta.enterprise.inject.Instance; +import jakarta.inject.Inject; + +public abstract class AbstractMessagingEventPublisher implements EventPublisher { + + private static final Logger logger = LoggerFactory.getLogger(AbstractMessagingEventPublisher.class); + + @Inject + ObjectMapper json; + + @Inject + @Channel(PROCESS_INSTANCES_TOPIC_NAME) + @OnOverflow(Strategy.UNBOUNDED_BUFFER) + MutinyEmitter processInstancesEventsEmitter; + private AbstractMessageEmitter processInstanceConsumer; + + @Inject + @Channel(PROCESS_DEFINITIONS_TOPIC_NAME) + MutinyEmitter processDefinitionEventsEmitter; + private AbstractMessageEmitter processDefinitionConsumer; + + @Inject + @Channel(USER_TASK_INSTANCES_TOPIC_NAME) + MutinyEmitter userTasksEventsEmitter; + private AbstractMessageEmitter userTaskConsumer; + @Inject + EventsRuntimeConfig eventsRuntimeConfig; + + @Inject + Instance decoratorProviderInstance; + + private MessageDecoratorProvider decoratorProvider; + + @PostConstruct + public void init() { + decoratorProvider = decoratorProviderInstance.isResolvable() ? decoratorProviderInstance.get() : null; + processDefinitionConsumer = eventsRuntimeConfig.isProcessInstancesPropagateError() ? new BlockingMessageEmitter(processDefinitionEventsEmitter, PROCESS_DEFINITIONS_TOPIC_NAME) + : new ReactiveMessageEmitter(processDefinitionEventsEmitter, PROCESS_DEFINITIONS_TOPIC_NAME); + processInstanceConsumer = eventsRuntimeConfig.isProcessDefinitionPropagateError() ? new BlockingMessageEmitter(processInstancesEventsEmitter, PROCESS_INSTANCES_TOPIC_NAME) + : new ReactiveMessageEmitter(processInstancesEventsEmitter, PROCESS_INSTANCES_TOPIC_NAME); + userTaskConsumer = eventsRuntimeConfig.isUserTasksPropagateError() ? new BlockingMessageEmitter(userTasksEventsEmitter, USER_TASK_INSTANCES_TOPIC_NAME) + : new ReactiveMessageEmitter(userTasksEventsEmitter, USER_TASK_INSTANCES_TOPIC_NAME); + } + + protected Optional getConsumer(DataEvent event) { + if (event == null) { + return Optional.empty(); + } + switch (event.getType()) { + case "ProcessDefinitionEvent": + return eventsRuntimeConfig.isProcessDefinitionEventsEnabled() ? Optional.of(processDefinitionConsumer) : Optional.empty(); + + case "ProcessInstanceErrorDataEvent": + case "ProcessInstanceNodeDataEvent": + case "ProcessInstanceSLADataEvent": + case "ProcessInstanceStateDataEvent": + case "ProcessInstanceVariableDataEvent": + return eventsRuntimeConfig.isProcessInstancesEventsEnabled() ? Optional.of(processInstanceConsumer) : Optional.empty(); + + case "UserTaskInstanceAssignmentDataEvent": + case "UserTaskInstanceAttachmentDataEvent": + case "UserTaskInstanceCommentDataEvent": + case "UserTaskInstanceDeadlineDataEvent": + case "UserTaskInstanceStateDataEvent": + case "UserTaskInstanceVariableDataEvent": + return eventsRuntimeConfig.isUserTasksEventsEnabled() ? Optional.of(userTaskConsumer) : Optional.empty(); + + default: + return Optional.empty(); + } + } + + @Override + public void publish(Collection> events) { + for (DataEvent event : events) { + publish(event); + } + } + + protected void publishToTopic(AbstractMessageEmitter emitter, Object event) { + logger.debug("About to publish event {} to topic {}", event, emitter.topic); + Message message = null; + try { + String eventString = json.writeValueAsString(event); + logger.debug("Event payload '{}'", eventString); + message = decorateMessage(ContextAwareMessage.of(eventString)); + } catch (Exception e) { + logger.error("Error while creating event to topic {} for event {}", emitter.topic, event); + } + if (message != null) { + emitter.accept(message); + } + } + + protected Message decorateMessage(Message message) { + return decoratorProvider != null ? decoratorProvider.decorate(message) : message; + } + + protected static abstract class AbstractMessageEmitter implements Consumer> { + + protected final String topic; + protected final MutinyEmitter emitter; + + protected AbstractMessageEmitter(MutinyEmitter emitter, String topic) { + this.emitter = emitter; + this.topic = topic; + } + } + + private static class BlockingMessageEmitter extends AbstractMessageEmitter { + protected BlockingMessageEmitter(MutinyEmitter emitter, String topic) { + super(emitter, topic); + } + + @Override + public void accept(Message message) { + emitter.sendMessageAndAwait(message); + logger.debug("Successfully published message {}", message.getPayload()); + } + } + + private static class ReactiveMessageEmitter extends AbstractMessageEmitter { + protected ReactiveMessageEmitter(MutinyEmitter emitter, String topic) { + super(emitter, topic); + } + + @Override + public void accept(Message message) { + emitter.sendMessageAndForget(message + .withAck(() -> onAck(message)) + .withNack(reason -> onNack(reason, message))); + } + + private CompletionStage onAck(Message message) { + logger.debug("Successfully published message {}", message.getPayload()); + return CompletableFuture.completedFuture(null); + } + + private CompletionStage onNack(Throwable reason, Message message) { + logger.error("Error while publishing message {}", message, reason); + return CompletableFuture.completedFuture(null); + } + + } +} diff --git a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java new file mode 100644 index 00000000000..ee1f284b3a8 --- /dev/null +++ b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/GroupingMessagingEventPublisher.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.events.process; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.kie.kogito.event.DataEvent; +import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent; +import org.kie.kogito.event.process.ProcessInstanceDataEvent; +import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent; +import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent; + +import io.quarkus.arc.properties.IfBuildProperty; + +import jakarta.inject.Singleton; + +@Singleton +@IfBuildProperty(name = "kogito.events.grouping", stringValue = "true") +public class GroupingMessagingEventPublisher extends AbstractMessagingEventPublisher { + + @Override + public void publish(DataEvent event) { + publish(Collections.singletonList(event)); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public void publish(Collection> events) { + Map eventsByChannel = new HashMap<>(); + for (DataEvent event : events) { + getConsumer(event).ifPresent(c -> eventsByChannel.computeIfAbsent(c, k -> new ArrayList<>()).add(event)); + } + eventsByChannel.entrySet().forEach(this::publishEvents); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private void publishEvents(Map.Entry entry) { + DataEvent firstEvent = (DataEvent) entry.getValue().iterator().next(); + URI source = firstEvent.getSource(); + if (firstEvent instanceof UserTaskInstanceDataEvent) { + publishToTopic(entry.getKey(), new MultipleUserTaskInstanceDataEvent(source, (Collection>) entry.getValue())); + } else if (firstEvent instanceof ProcessInstanceDataEvent) { + publishToTopic(entry.getKey(), new MultipleProcessInstanceDataEvent(source, (Collection>) entry.getValue())); + } else { + for (DataEvent event : (Collection>) entry.getValue()) { + publishToTopic(entry.getKey(), event); + } + } + } +} diff --git a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/ReactiveMessagingEventPublisher.java b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/ReactiveMessagingEventPublisher.java index c232a1eb471..c6aa4424f0c 100644 --- a/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/ReactiveMessagingEventPublisher.java +++ b/quarkus/addons/events/process/runtime/src/main/java/org/kie/kogito/events/process/ReactiveMessagingEventPublisher.java @@ -19,102 +19,20 @@ package org.kie.kogito.events.process; import java.util.Collection; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.function.BiConsumer; -import org.eclipse.microprofile.reactive.messaging.Channel; -import org.eclipse.microprofile.reactive.messaging.Message; -import org.eclipse.microprofile.reactive.messaging.OnOverflow; -import org.eclipse.microprofile.reactive.messaging.OnOverflow.Strategy; -import org.kie.kogito.addon.quarkus.common.reactive.messaging.MessageDecoratorProvider; import org.kie.kogito.event.DataEvent; -import org.kie.kogito.event.EventPublisher; -import org.kie.kogito.events.config.EventsRuntimeConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.ObjectMapper; +import io.quarkus.arc.properties.UnlessBuildProperty; -import io.smallrye.reactive.messaging.MutinyEmitter; -import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage; - -import jakarta.annotation.PostConstruct; -import jakarta.enterprise.inject.Instance; -import jakarta.inject.Inject; import jakarta.inject.Singleton; @Singleton -public class ReactiveMessagingEventPublisher implements EventPublisher { - - private static final Logger logger = LoggerFactory.getLogger(ReactiveMessagingEventPublisher.class); - - @Inject - ObjectMapper json; - - @Inject - @Channel(PROCESS_INSTANCES_TOPIC_NAME) - @OnOverflow(Strategy.UNBOUNDED_BUFFER) - MutinyEmitter processInstancesEventsEmitter; - private BiConsumer, Message> processInstanceConsumer; - - @Inject - @Channel(PROCESS_DEFINITIONS_TOPIC_NAME) - MutinyEmitter processDefinitionEventsEmitter; - private BiConsumer, Message> processDefinitionConsumer; - - @Inject - @Channel(USER_TASK_INSTANCES_TOPIC_NAME) - MutinyEmitter userTasksEventsEmitter; - private BiConsumer, Message> userTaskConsumer; - @Inject - EventsRuntimeConfig eventsRuntimeConfig; - - @Inject - Instance decoratorProviderInstance; - - private MessageDecoratorProvider decoratorProvider; - - @PostConstruct - public void init() { - decoratorProvider = decoratorProviderInstance.isResolvable() ? decoratorProviderInstance.get() : null; - processInstanceConsumer = eventsRuntimeConfig.isProcessInstancesPropagateError() ? new BlockingMessageEmitter() : new ReactiveMessageEmitter(); - processDefinitionConsumer = eventsRuntimeConfig.isProcessDefinitionPropagateError() ? new BlockingMessageEmitter() : new ReactiveMessageEmitter(); - userTaskConsumer = eventsRuntimeConfig.isUserTasksPropagateError() ? new BlockingMessageEmitter() : new ReactiveMessageEmitter(); - } +@UnlessBuildProperty(name = "kogito.events.grouping", stringValue = "true", enableIfMissing = true) +public class ReactiveMessagingEventPublisher extends AbstractMessagingEventPublisher { @Override public void publish(DataEvent event) { - - switch (event.getType()) { - case "ProcessDefinitionEvent": - if (eventsRuntimeConfig.isProcessDefinitionEventsEnabled()) { - publishToTopic(processDefinitionConsumer, event, processDefinitionEventsEmitter, PROCESS_DEFINITIONS_TOPIC_NAME); - } - break; - case "ProcessInstanceErrorDataEvent": - case "ProcessInstanceNodeDataEvent": - case "ProcessInstanceSLADataEvent": - case "ProcessInstanceStateDataEvent": - case "ProcessInstanceVariableDataEvent": - if (eventsRuntimeConfig.isProcessInstancesEventsEnabled()) { - publishToTopic(processInstanceConsumer, event, processInstancesEventsEmitter, PROCESS_INSTANCES_TOPIC_NAME); - } - break; - - case "UserTaskInstanceAssignmentDataEvent": - case "UserTaskInstanceAttachmentDataEvent": - case "UserTaskInstanceCommentDataEvent": - case "UserTaskInstanceDeadlineDataEvent": - case "UserTaskInstanceStateDataEvent": - case "UserTaskInstanceVariableDataEvent": - if (eventsRuntimeConfig.isUserTasksEventsEnabled()) { - publishToTopic(userTaskConsumer, event, userTasksEventsEmitter, USER_TASK_INSTANCES_TOPIC_NAME); - } - break; - default: - logger.debug("Unknown type of event '{}', ignoring for this publisher", event.getType()); - } + getConsumer(event).ifPresent(emitter -> publishToTopic(emitter, event)); } @Override @@ -124,49 +42,4 @@ public void publish(Collection> events) { } } - protected void publishToTopic(BiConsumer, Message> consumer, DataEvent event, MutinyEmitter emitter, String topic) { - logger.debug("About to publish event {} to topic {}", event, topic); - Message message = null; - try { - String eventString = json.writeValueAsString(event); - logger.debug("Event payload '{}'", eventString); - message = decorateMessage(ContextAwareMessage.of(eventString)); - } catch (Exception e) { - logger.error("Error while creating event to topic {} for event {}", topic, event); - } - if (message != null) { - consumer.accept(emitter, message); - } - } - - protected CompletionStage onAck(Message message) { - logger.debug("Successfully published message {}", message.getPayload()); - return CompletableFuture.completedFuture(null); - } - - protected CompletionStage onNack(Throwable reason, Message message) { - logger.error("Error while publishing message {}", message, reason); - return CompletableFuture.completedFuture(null); - } - - protected Message decorateMessage(Message message) { - return decoratorProvider != null ? decoratorProvider.decorate(message) : message; - } - - private class BlockingMessageEmitter implements BiConsumer, Message> { - @Override - public void accept(MutinyEmitter emitter, Message message) { - emitter.sendMessageAndAwait(message); - logger.debug("Successfully published message {}", message.getPayload()); - } - } - - private class ReactiveMessageEmitter implements BiConsumer, Message> { - @Override - public void accept(MutinyEmitter emitter, Message message) { - emitter.sendMessageAndForget(message - .withAck(() -> onAck(message)) - .withNack(reason -> onNack(reason, message))); - } - } } diff --git a/quarkus/addons/events/process/runtime/src/test/java/org/kie/kogito/events/process/GroupingMessagingEventPublisherTest.java b/quarkus/addons/events/process/runtime/src/test/java/org/kie/kogito/events/process/GroupingMessagingEventPublisherTest.java new file mode 100644 index 00000000000..0d784d3a53b --- /dev/null +++ b/quarkus/addons/events/process/runtime/src/test/java/org/kie/kogito/events/process/GroupingMessagingEventPublisherTest.java @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.kie.kogito.events.process; + +import java.util.*; + +import org.eclipse.microprofile.reactive.messaging.Message; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.kie.kogito.addon.quarkus.common.reactive.messaging.MessageDecoratorProvider; +import org.kie.kogito.event.DataEvent; +import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent; +import org.kie.kogito.event.process.ProcessInstanceDataEvent; +import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent; +import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent; +import org.kie.kogito.events.config.EventsRuntimeConfig; +import org.kie.kogito.events.process.AbstractMessagingEventPublisher.AbstractMessageEmitter; +import org.mockito.ArgumentCaptor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.Spy; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.smallrye.reactive.messaging.MutinyEmitter; + +import jakarta.enterprise.inject.Instance; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +@SuppressWarnings("unchecked") +public class GroupingMessagingEventPublisherTest { + + @Mock + private ObjectMapper json; + + @Mock + private MutinyEmitter processInstancesEventsEmitter; + + @Mock + private MutinyEmitter processDefinitionEventsEmitter; + + @Mock + private MutinyEmitter userTasksEventsEmitter; + + @Mock + private EventsRuntimeConfig eventsRuntimeConfig; + + @Mock + private MessageDecoratorProvider decoratorProvider; + + @Mock + private Message decoratedMessage; + + @Mock + private Instance decoratorProviderInstance; + + @Mock + private AbstractMessagingEventPublisher.AbstractMessageEmitter processInstanceConsumer; + + @Mock + private AbstractMessagingEventPublisher.AbstractMessageEmitter userTaskConsumer; + + @Mock + private AbstractMessagingEventPublisher.AbstractMessageEmitter processDefinitionConsumer; + + @Spy + @InjectMocks + private GroupingMessagingEventPublisher groupingMessagingEventPublisher; + + @Spy + @InjectMocks + private ReactiveMessagingEventPublisher reactiveMessagingEventPublisher; + + @BeforeEach + void setUp() { + MockitoAnnotations.openMocks(this); + + when(decoratorProviderInstance.isResolvable()).thenReturn(true); + when(decoratorProviderInstance.get()).thenReturn(decoratorProvider); + + when(eventsRuntimeConfig.isProcessInstancesPropagateError()).thenReturn(false); + when(eventsRuntimeConfig.isProcessDefinitionPropagateError()).thenReturn(false); + when(eventsRuntimeConfig.isUserTasksPropagateError()).thenReturn(false); + + when(eventsRuntimeConfig.isProcessInstancesEventsEnabled()).thenReturn(true); + when(eventsRuntimeConfig.isUserTasksEventsEnabled()).thenReturn(true); + } + + @Test + public void testGroupingMessagingEventPublisher_publish() throws Exception { + DataEvent event = mock(DataEvent.class); + when(event.getType()).thenReturn("ProcessInstanceErrorDataEvent"); + + // Test initialization + groupingMessagingEventPublisher.init(); + when(decoratorProvider.decorate(any(Message.class))).thenReturn(decoratedMessage); + + // Mock the message behavior + mockMessageForBothAckNack(decoratedMessage); + + // Call method + groupingMessagingEventPublisher.publish(event); + + // Verify that the consumer has been invoked + verify(processInstancesEventsEmitter).sendMessageAndForget(any()); + } + + @Test + public void testReactiveMessagingEventPublisher_publish() throws Exception { + DataEvent event = mock(DataEvent.class); + when(event.getType()).thenReturn("ProcessInstanceErrorDataEvent"); + + // Test initialization + reactiveMessagingEventPublisher.init(); + when(decoratorProvider.decorate(any(Message.class))).thenReturn(decoratedMessage); + + // Mock the message behavior + mockMessageForBothAckNack(decoratedMessage); + + // Call method + reactiveMessagingEventPublisher.publish(event); + + // Verify that the consumer has been invoked + verify(processInstancesEventsEmitter).sendMessageAndForget(any()); + } + + @Test + public void testPublishGroupingByChannel() { + // Create mock events + DataEvent processInstanceEvent = mock(ProcessInstanceDataEvent.class); + when(processInstanceEvent.getType()).thenReturn("ProcessInstanceStateDataEvent"); + + DataEvent userTaskEvent = mock(UserTaskInstanceDataEvent.class); + when(userTaskEvent.getType()).thenReturn("UserTaskInstanceStateDataEvent"); + + // Mock getConsumer() to return different emitters based on event type + doReturn(Optional.of(processInstanceConsumer)).when(groupingMessagingEventPublisher).getConsumer(processInstanceEvent); + doReturn(Optional.of(userTaskConsumer)).when(groupingMessagingEventPublisher).getConsumer(userTaskEvent); + + // Create a collection of events with different types (ProcessInstance and UserTask) + Collection> events = Arrays.asList(processInstanceEvent, userTaskEvent); + + // Spy on the publisher's internal method to verify the calls + doNothing().when(groupingMessagingEventPublisher).publishToTopic(any(), any()); + + // Invoke the method to test + groupingMessagingEventPublisher.publish(events); + + // Capture and verify that the correct emitter was used for each event + verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), any(MultipleProcessInstanceDataEvent.class)); + verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(userTaskConsumer), any(MultipleUserTaskInstanceDataEvent.class)); + } + + @Test + public void testPublishMultipleEventsGroupedByChannel() { + // Create multiple events of different types + DataEvent processInstanceEvent1 = mock(ProcessInstanceDataEvent.class); + DataEvent processInstanceEvent2 = mock(ProcessInstanceDataEvent.class); + DataEvent userTaskEvent1 = mock(UserTaskInstanceDataEvent.class); + DataEvent userTaskEvent2 = mock(UserTaskInstanceDataEvent.class); + + when(processInstanceEvent1.getType()).thenReturn("ProcessInstanceStateDataEvent"); + when(processInstanceEvent2.getType()).thenReturn("ProcessInstanceStateDataEvent"); + when(userTaskEvent1.getType()).thenReturn("UserTaskInstanceStateDataEvent"); + when(userTaskEvent2.getType()).thenReturn("UserTaskInstanceStateDataEvent"); + + // Mock getConsumer() to return corresponding emitters for event types + doReturn(Optional.of(processInstanceConsumer)).when(groupingMessagingEventPublisher).getConsumer(processInstanceEvent1); + doReturn(Optional.of(processInstanceConsumer)).when(groupingMessagingEventPublisher).getConsumer(processInstanceEvent2); + doReturn(Optional.of(userTaskConsumer)).when(groupingMessagingEventPublisher).getConsumer(userTaskEvent1); + doReturn(Optional.of(userTaskConsumer)).when(groupingMessagingEventPublisher).getConsumer(userTaskEvent2); + + // Create a collection of events that would be grouped by channel + Collection> events = Arrays.asList(processInstanceEvent1, processInstanceEvent2, userTaskEvent1, userTaskEvent2); + + // Spy on the internal publishToTopic to verify grouping + doNothing().when(groupingMessagingEventPublisher).publishToTopic(any(), any()); + + // Invoke the method to test + groupingMessagingEventPublisher.publish(events); + + // Verify that two grouped publishToTopic calls are made: one for processInstanceConsumer, one for userTaskConsumer + verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), any(MultipleProcessInstanceDataEvent.class)); + verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(userTaskConsumer), any(MultipleUserTaskInstanceDataEvent.class)); + + // Verify that the right number of events was grouped and passed to each emitter + ArgumentCaptor captorPI = ArgumentCaptor.forClass(MultipleProcessInstanceDataEvent.class); + + verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), captorPI.capture()); + MultipleProcessInstanceDataEvent groupedProcessInstanceEvents = captorPI.getValue(); + assertEquals(2, groupedProcessInstanceEvents.getData().size()); // both processInstanceEvents are grouped + + ArgumentCaptor captorUT = ArgumentCaptor.forClass(MultipleUserTaskInstanceDataEvent.class); + + verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(userTaskConsumer), captorUT.capture()); + MultipleUserTaskInstanceDataEvent groupedUserTaskEvents = captorUT.getValue(); + assertEquals(2, groupedUserTaskEvents.getData().size()); // both userTaskEvents are grouped + } + + @Test + public void testPublishEmptyEventsCollection() { + Collection> events = Collections.emptyList(); + + // Spy on the internal publishToTopic to verify no calls are made + doNothing().when(groupingMessagingEventPublisher).publishToTopic(any(), any()); + + groupingMessagingEventPublisher.publish(events); + + // Verify that publishToTopic is never called + verify(groupingMessagingEventPublisher, never()).publishToTopic(any(), anyCollection()); + } + + @Test + public void testNoConsumersFound() { + DataEvent processInstanceEvent = mock(DataEvent.class); + when(processInstanceEvent.getType()).thenReturn("ProcessInstanceStateDataEvent"); + + DataEvent userTaskEvent = mock(DataEvent.class); + when(userTaskEvent.getType()).thenReturn("UserTaskInstanceStateDataEvent"); + + // Mock getConsumer() to return empty optionals (no consumers found) + doReturn(Optional.empty()).when(groupingMessagingEventPublisher).getConsumer(processInstanceEvent); + doReturn(Optional.empty()).when(groupingMessagingEventPublisher).getConsumer(userTaskEvent); + + // Create a collection of events + Collection> events = Arrays.asList(processInstanceEvent, userTaskEvent); + + // Spy on the publisher's internal method to verify no calls are made + doNothing().when(groupingMessagingEventPublisher).publishToTopic(any(), any()); + + // Invoke the method to test + groupingMessagingEventPublisher.publish(events); + + // Verify that publishToTopic is never called since no consumers were found + verify(groupingMessagingEventPublisher, never()).publishToTopic(any(), anyCollection()); + } + + @Test + void testPublishToTopic_ExceptionHandling() throws Exception { + DataEvent event = mock(DataEvent.class); + when(event.getType()).thenReturn("ProcessInstanceErrorDataEvent"); + + groupingMessagingEventPublisher.init(); + when(decoratorProvider.decorate(any(Message.class))).thenThrow(new RuntimeException("Serialization error")); + + // Mock the message behavior + mockMessageForBothAckNack(decoratedMessage); + + // Call method + groupingMessagingEventPublisher.publish(event); + + // Check that emitter.sendMessageAndForget was never called + verify(processInstancesEventsEmitter, never()).sendMessageAndForget(any()); + } + + @Test + public void testPublishUnsupportedEventType() { + DataEvent unsupportedEvent = mock(DataEvent.class); + when(unsupportedEvent.getType()).thenReturn("UnsupportedEvent"); + + doReturn(Optional.empty()).when(groupingMessagingEventPublisher).getConsumer(unsupportedEvent); + + Collection> events = Collections.singletonList(unsupportedEvent); + + groupingMessagingEventPublisher.publish(events); + + // Verify no publishing occurred since no consumer exists for unsupported event + verify(groupingMessagingEventPublisher, never()).publishToTopic(any(), anyCollection()); + } + + @Test + public void testEventsDisabledInConfig() { + DataEvent processInstanceEvent = mock(DataEvent.class); + when(processInstanceEvent.getType()).thenReturn("ProcessInstanceStateDataEvent"); + + DataEvent userTaskEvent = mock(DataEvent.class); + when(userTaskEvent.getType()).thenReturn("UserTaskInstanceStateDataEvent"); + + // Disable process and user task events in the config + when(eventsRuntimeConfig.isProcessInstancesEventsEnabled()).thenReturn(false); + when(eventsRuntimeConfig.isUserTasksEventsEnabled()).thenReturn(false); + + Collection> events = Arrays.asList(processInstanceEvent, userTaskEvent); + + groupingMessagingEventPublisher.publish(events); + + // Verify no publishing occurred since events are disabled + verify(groupingMessagingEventPublisher, never()).publishToTopic(any(), anyCollection()); + } + + @Test + public void testNullEventInCollection() { + DataEvent validEvent = mock(ProcessInstanceDataEvent.class); + when(validEvent.getType()).thenReturn("ProcessInstanceStateDataEvent"); + + Collection> events = Arrays.asList(validEvent, null); // One valid event and one null event + + // Return a mock consumer for the valid event + doReturn(Optional.of(processInstanceConsumer)).when(groupingMessagingEventPublisher).getConsumer(validEvent); + + // Call the method + groupingMessagingEventPublisher.publish(events); + + // Verify the valid event is processed + verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), any(MultipleProcessInstanceDataEvent.class)); + } + + @Test + public void testDecorateMessage() { + Message rawMessage = mock(Message.class); + when(decoratorProvider.decorate(rawMessage)).thenReturn(decoratedMessage); + + reactiveMessagingEventPublisher.init(); + + Message result = reactiveMessagingEventPublisher.decorateMessage(rawMessage); + assertEquals(decoratedMessage, result); + + verify(decoratorProvider).decorate(rawMessage); + } + + @Test + public void testPublishToTopicWithDecorator() throws Exception { + Object event = new Object(); + when(json.writeValueAsString(event)).thenReturn("eventString"); + + reactiveMessagingEventPublisher.init(); + + // Mock the message emitter + AbstractMessagingEventPublisher.AbstractMessageEmitter mockEmitter = mock(AbstractMessagingEventPublisher.AbstractMessageEmitter.class); + + // Ensure decorated message is used + when(decoratorProvider.decorate(any(Message.class))).thenReturn(decoratedMessage); + + // Spy on the reactiveMessagingEventPublisher to allow publishToTopic + reactiveMessagingEventPublisher.publishToTopic(mockEmitter, event); + + // Verify that the message was decorated and sent + verify(decoratorProvider).decorate(any(Message.class)); + verify(mockEmitter).accept(decoratedMessage); + } + + @Test + public void testPublishWithMultipleEventTypesSomeWithoutConsumers() { + DataEvent processInstanceEvent = mock(ProcessInstanceDataEvent.class); + when(processInstanceEvent.getType()).thenReturn("ProcessInstanceStateDataEvent"); + + DataEvent unsupportedEvent = mock(DataEvent.class); + when(unsupportedEvent.getType()).thenReturn("UnsupportedEvent"); + + doReturn(Optional.of(processInstanceConsumer)).when(groupingMessagingEventPublisher).getConsumer(processInstanceEvent); + doReturn(Optional.empty()).when(groupingMessagingEventPublisher).getConsumer(unsupportedEvent); + + Collection> events = Arrays.asList(processInstanceEvent, unsupportedEvent); + + groupingMessagingEventPublisher.publish(events); + + // Ensure that only the supported event was published + verify(groupingMessagingEventPublisher, times(1)).publishToTopic(eq(processInstanceConsumer), any(MultipleProcessInstanceDataEvent.class)); + verify(groupingMessagingEventPublisher, never()).publishToTopic(any(), eq(Collections.singletonList(unsupportedEvent))); + } + + private void mockMessageForBothAckNack(Message message) { + when(message.withAck(any())).thenReturn(message); + when(message.withNack(any())).thenReturn(message); + } +}