Skip to content

Commit

Permalink
[Fix apache/incubator-kie-issues#1457] Allow grouping of events
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Aug 31, 2024
1 parent 8b50479 commit 7a75190
Show file tree
Hide file tree
Showing 3 changed files with 243 additions and 131 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.events.process;

import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import org.eclipse.microprofile.reactive.messaging.OnOverflow.Strategy;
import org.kie.kogito.addon.quarkus.common.reactive.messaging.MessageDecoratorProvider;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventPublisher;
import org.kie.kogito.events.config.EventsRuntimeConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

import io.smallrye.reactive.messaging.MutinyEmitter;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;

import jakarta.annotation.PostConstruct;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;

public abstract class AbstractMessagingEventPublisher implements EventPublisher {

private static final Logger logger = LoggerFactory.getLogger(AbstractMessagingEventPublisher.class);

@Inject
ObjectMapper json;

@Inject
@Channel(PROCESS_INSTANCES_TOPIC_NAME)
@OnOverflow(Strategy.UNBOUNDED_BUFFER)
MutinyEmitter<String> processInstancesEventsEmitter;
private AbstractMessageEmitter processInstanceConsumer;

@Inject
@Channel(PROCESS_DEFINITIONS_TOPIC_NAME)
MutinyEmitter<String> processDefinitionEventsEmitter;
private AbstractMessageEmitter processDefinitionConsumer;

@Inject
@Channel(USER_TASK_INSTANCES_TOPIC_NAME)
MutinyEmitter<String> userTasksEventsEmitter;
private AbstractMessageEmitter userTaskConsumer;
@Inject
EventsRuntimeConfig eventsRuntimeConfig;

@Inject
Instance<MessageDecoratorProvider> decoratorProviderInstance;

private MessageDecoratorProvider decoratorProvider;

@PostConstruct
public void init() {
decoratorProvider = decoratorProviderInstance.isResolvable() ? decoratorProviderInstance.get() : null;
processDefinitionConsumer = eventsRuntimeConfig.isProcessInstancesPropagateError() ? new BlockingMessageEmitter(processDefinitionEventsEmitter, PROCESS_DEFINITIONS_TOPIC_NAME)
: new ReactiveMessageEmitter(processDefinitionEventsEmitter, PROCESS_DEFINITIONS_TOPIC_NAME);
processInstanceConsumer = eventsRuntimeConfig.isProcessDefinitionPropagateError() ? new BlockingMessageEmitter(processInstancesEventsEmitter, PROCESS_INSTANCES_TOPIC_NAME)
: new ReactiveMessageEmitter(processInstancesEventsEmitter, PROCESS_INSTANCES_TOPIC_NAME);
userTaskConsumer = eventsRuntimeConfig.isUserTasksPropagateError() ? new BlockingMessageEmitter(userTasksEventsEmitter, USER_TASK_INSTANCES_TOPIC_NAME)
: new ReactiveMessageEmitter(userTasksEventsEmitter, USER_TASK_INSTANCES_TOPIC_NAME);
}

protected Optional<AbstractMessageEmitter> getConsumer(DataEvent<?> event) {
switch (event.getType()) {
case "ProcessDefinitionEvent":
return eventsRuntimeConfig.isProcessDefinitionEventsEnabled() ? Optional.of(processDefinitionConsumer) : Optional.empty();

case "ProcessInstanceErrorDataEvent":
case "ProcessInstanceNodeDataEvent":
case "ProcessInstanceSLADataEvent":
case "ProcessInstanceStateDataEvent":
case "ProcessInstanceVariableDataEvent":
return eventsRuntimeConfig.isProcessInstancesEventsEnabled() ? Optional.of(processInstanceConsumer) : Optional.empty();

case "UserTaskInstanceAssignmentDataEvent":
case "UserTaskInstanceAttachmentDataEvent":
case "UserTaskInstanceCommentDataEvent":
case "UserTaskInstanceDeadlineDataEvent":
case "UserTaskInstanceStateDataEvent":
case "UserTaskInstanceVariableDataEvent":
return eventsRuntimeConfig.isUserTasksEventsEnabled() ? Optional.of(userTaskConsumer) : Optional.empty();

default:
return Optional.empty();
}
}

@Override
public void publish(Collection<DataEvent<?>> events) {
for (DataEvent<?> event : events) {
publish(event);
}
}

protected void publishToTopic(AbstractMessageEmitter emitter, Object event) {
logger.debug("About to publish event {} to topic {}", event, emitter.topic);
Message<String> message = null;
try {
String eventString = json.writeValueAsString(event);
logger.debug("Event payload '{}'", eventString);
message = decorateMessage(ContextAwareMessage.of(eventString));
} catch (Exception e) {
logger.error("Error while creating event to topic {} for event {}", emitter.topic, event);
}
if (message != null) {
emitter.accept(message);
}
}

protected Message<String> decorateMessage(Message<String> message) {
return decoratorProvider != null ? decoratorProvider.decorate(message) : message;
}

protected static abstract class AbstractMessageEmitter implements Consumer<Message<String>> {

protected final String topic;
protected final MutinyEmitter<String> emitter;

protected AbstractMessageEmitter(MutinyEmitter<String> emitter, String topic) {
this.emitter = emitter;
this.topic = topic;
}
}

private static class BlockingMessageEmitter extends AbstractMessageEmitter {
protected BlockingMessageEmitter(MutinyEmitter<String> emitter, String topic) {
super(emitter, topic);
}

@Override
public void accept(Message<String> message) {
emitter.sendMessageAndAwait(message);
logger.debug("Successfully published message {}", message.getPayload());
}
}

private static class ReactiveMessageEmitter extends AbstractMessageEmitter {
protected ReactiveMessageEmitter(MutinyEmitter<String> emitter, String topic) {
super(emitter, topic);
}

@Override
public void accept(Message<String> message) {
emitter.sendMessageAndForget(message
.withAck(() -> onAck(message))
.withNack(reason -> onNack(reason, message)));
}

private CompletionStage<Void> onAck(Message<String> message) {
logger.debug("Successfully published message {}", message.getPayload());
return CompletableFuture.completedFuture(null);
}

private CompletionStage<Void> onNack(Throwable reason, Message<String> message) {
logger.error("Error while publishing message {}", message, reason);
return CompletableFuture.completedFuture(null);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.events.process;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.kie.kogito.event.DataEvent;

import io.quarkus.arc.properties.IfBuildProperty;

import jakarta.inject.Singleton;

@Singleton
@IfBuildProperty(name = "kogito.events.grouping", stringValue = "true")
public class GroupingMessagingEventPublisher extends AbstractMessagingEventPublisher {

@Override
public void publish(DataEvent<?> event) {
publish(Collections.singletonList(event));
}

@Override
public void publish(Collection<DataEvent<?>> events) {
Map<AbstractMessageEmitter, Collection<DataEvent<?>>> eventsByChannel = new HashMap<>();
for (DataEvent<?> event : events) {
getConsumer(event).ifPresent(c -> eventsByChannel.computeIfAbsent(c, k -> new ArrayList<>()).add(event));
}
for (Entry<AbstractMessageEmitter, Collection<DataEvent<?>>> item : eventsByChannel.entrySet()) {
publishToTopic(item.getKey(), item.getValue());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,102 +19,20 @@
package org.kie.kogito.events.process;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import org.eclipse.microprofile.reactive.messaging.OnOverflow.Strategy;
import org.kie.kogito.addon.quarkus.common.reactive.messaging.MessageDecoratorProvider;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventPublisher;
import org.kie.kogito.events.config.EventsRuntimeConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.quarkus.arc.properties.UnlessBuildProperty;

import io.smallrye.reactive.messaging.MutinyEmitter;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;

import jakarta.annotation.PostConstruct;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

@Singleton
public class ReactiveMessagingEventPublisher implements EventPublisher {

private static final Logger logger = LoggerFactory.getLogger(ReactiveMessagingEventPublisher.class);

@Inject
ObjectMapper json;

@Inject
@Channel(PROCESS_INSTANCES_TOPIC_NAME)
@OnOverflow(Strategy.UNBOUNDED_BUFFER)
MutinyEmitter<String> processInstancesEventsEmitter;
private BiConsumer<MutinyEmitter<String>, Message<String>> processInstanceConsumer;

@Inject
@Channel(PROCESS_DEFINITIONS_TOPIC_NAME)
MutinyEmitter<String> processDefinitionEventsEmitter;
private BiConsumer<MutinyEmitter<String>, Message<String>> processDefinitionConsumer;

@Inject
@Channel(USER_TASK_INSTANCES_TOPIC_NAME)
MutinyEmitter<String> userTasksEventsEmitter;
private BiConsumer<MutinyEmitter<String>, Message<String>> userTaskConsumer;
@Inject
EventsRuntimeConfig eventsRuntimeConfig;

@Inject
Instance<MessageDecoratorProvider> decoratorProviderInstance;

private MessageDecoratorProvider decoratorProvider;

@PostConstruct
public void init() {
decoratorProvider = decoratorProviderInstance.isResolvable() ? decoratorProviderInstance.get() : null;
processInstanceConsumer = eventsRuntimeConfig.isProcessInstancesPropagateError() ? new BlockingMessageEmitter() : new ReactiveMessageEmitter();
processDefinitionConsumer = eventsRuntimeConfig.isProcessDefinitionPropagateError() ? new BlockingMessageEmitter() : new ReactiveMessageEmitter();
userTaskConsumer = eventsRuntimeConfig.isUserTasksPropagateError() ? new BlockingMessageEmitter() : new ReactiveMessageEmitter();
}
@UnlessBuildProperty(name = "kogito.events.grouping", stringValue = "true", enableIfMissing = true)
public class ReactiveMessagingEventPublisher extends AbstractMessagingEventPublisher {

@Override
public void publish(DataEvent<?> event) {

switch (event.getType()) {
case "ProcessDefinitionEvent":
if (eventsRuntimeConfig.isProcessDefinitionEventsEnabled()) {
publishToTopic(processDefinitionConsumer, event, processDefinitionEventsEmitter, PROCESS_DEFINITIONS_TOPIC_NAME);
}
break;
case "ProcessInstanceErrorDataEvent":
case "ProcessInstanceNodeDataEvent":
case "ProcessInstanceSLADataEvent":
case "ProcessInstanceStateDataEvent":
case "ProcessInstanceVariableDataEvent":
if (eventsRuntimeConfig.isProcessInstancesEventsEnabled()) {
publishToTopic(processInstanceConsumer, event, processInstancesEventsEmitter, PROCESS_INSTANCES_TOPIC_NAME);
}
break;

case "UserTaskInstanceAssignmentDataEvent":
case "UserTaskInstanceAttachmentDataEvent":
case "UserTaskInstanceCommentDataEvent":
case "UserTaskInstanceDeadlineDataEvent":
case "UserTaskInstanceStateDataEvent":
case "UserTaskInstanceVariableDataEvent":
if (eventsRuntimeConfig.isUserTasksEventsEnabled()) {
publishToTopic(userTaskConsumer, event, userTasksEventsEmitter, USER_TASK_INSTANCES_TOPIC_NAME);
}
break;
default:
logger.debug("Unknown type of event '{}', ignoring for this publisher", event.getType());
}
getConsumer(event).ifPresent(emitter -> publishToTopic(emitter, event));
}

@Override
Expand All @@ -124,49 +42,4 @@ public void publish(Collection<DataEvent<?>> events) {
}
}

protected void publishToTopic(BiConsumer<MutinyEmitter<String>, Message<String>> consumer, DataEvent<?> event, MutinyEmitter<String> emitter, String topic) {
logger.debug("About to publish event {} to topic {}", event, topic);
Message<String> message = null;
try {
String eventString = json.writeValueAsString(event);
logger.debug("Event payload '{}'", eventString);
message = decorateMessage(ContextAwareMessage.of(eventString));
} catch (Exception e) {
logger.error("Error while creating event to topic {} for event {}", topic, event);
}
if (message != null) {
consumer.accept(emitter, message);
}
}

protected CompletionStage<Void> onAck(Message<String> message) {
logger.debug("Successfully published message {}", message.getPayload());
return CompletableFuture.completedFuture(null);
}

protected CompletionStage<Void> onNack(Throwable reason, Message<String> message) {
logger.error("Error while publishing message {}", message, reason);
return CompletableFuture.completedFuture(null);
}

protected Message<String> decorateMessage(Message<String> message) {
return decoratorProvider != null ? decoratorProvider.decorate(message) : message;
}

private class BlockingMessageEmitter implements BiConsumer<MutinyEmitter<String>, Message<String>> {
@Override
public void accept(MutinyEmitter<String> emitter, Message<String> message) {
emitter.sendMessageAndAwait(message);
logger.debug("Successfully published message {}", message.getPayload());
}
}

private class ReactiveMessageEmitter implements BiConsumer<MutinyEmitter<String>, Message<String>> {
@Override
public void accept(MutinyEmitter<String> emitter, Message<String> message) {
emitter.sendMessageAndForget(message
.withAck(() -> onAck(message))
.withNack(reason -> onNack(reason, message)));
}
}
}

0 comments on commit 7a75190

Please sign in to comment.