Skip to content

Commit

Permalink
[Fix apache#3719] Unmarshall return ObjectNode that is event sensitive (
Browse files Browse the repository at this point in the history
apache#3720)

* [Fix apache#3719] Unmarshall return ObjectNode that is event sensitive

* [Fix apache#3719] Fixing IT test
  • Loading branch information
fjtirado authored and rgdoliveira committed Oct 24, 2024
1 parent 12d5506 commit 343f086
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public static String buildSource(String service, String processId) {
if (processId == null) {
return null;
} else {
processId = processId.replace(" ", "-");
return service + "/" + (processId.contains(".") ? processId.substring(processId.lastIndexOf('.') + 1) : processId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.jbpm.process.core.datatype.DataType;
import org.jbpm.process.core.datatype.DataTypeUtils;
import org.jbpm.process.core.datatype.impl.coverter.CloneHelper;
import org.jbpm.process.core.datatype.impl.coverter.TypeConverterRegistry;

/**
Expand Down Expand Up @@ -86,6 +87,10 @@ public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(className);
}

public Object clone(Object value) {
return CloneHelper.get().clone(value);
}

@Override
public boolean verifyDataType(final Object value) {
if (value == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public Any marshall(Object unmarshalled) {
public Object unmarshall(Any data) {
try {
KogitoTypesProtobuf.JsonNode storedValue = data.unpack(KogitoTypesProtobuf.JsonNode.class);
return ObjectMapperFactory.get().readTree(storedValue.getContent());
return ObjectMapperFactory.listenerAware().readTree(storedValue.getContent());
} catch (InvalidProtocolBufferException | JsonProcessingException e1) {
throw new ProcessInstanceMarshallerException("Error trying to unmarshalling a Json Node value", e1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,10 @@
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.NullSource;
import org.kie.kogito.internal.process.runtime.KogitoProcessRuntime;
import org.kie.kogito.jackson.utils.ObjectMapperFactory;
import org.kie.kogito.process.impl.AbstractProcess;
import org.w3c.dom.Document;

import com.fasterxml.jackson.databind.ObjectMapper;

import jakarta.xml.bind.JAXBContext;
import jakarta.xml.bind.annotation.XmlRootElement;

Expand Down Expand Up @@ -197,8 +196,8 @@ private static Stream<Arguments> testRoundTrip() throws Exception {
Arguments.of(5l),
Arguments.of(BigDecimal.valueOf(10l)),
Arguments.of(new MarshableObject("henry")),
Arguments.of(new ObjectMapper().readTree("{ \"key\" : \"value\" }")),
Arguments.of(new ObjectMapper().valueToTree(marshableObject)),
Arguments.of(ObjectMapperFactory.listenerAware().readTree("{ \"key\" : \"value\" }")),
Arguments.of(ObjectMapperFactory.listenerAware().valueToTree(marshableObject)),
Arguments.of(new Date()),
Arguments.of(Instant.now()),
Arguments.of(OffsetDateTime.now()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.kie.kogito.StaticConfig;
import org.kie.kogito.codegen.api.context.impl.JavaKogitoBuildContext;
import org.kie.kogito.config.StaticConfigBean;
import org.kie.kogito.event.EventManager;
import org.kie.kogito.event.EventPublisher;
import org.kie.kogito.event.impl.EventFactoryUtils;
import org.kie.kogito.internal.process.event.DefaultKogitoProcessEventListener;
import org.kie.kogito.internal.process.event.KogitoProcessEventListener;
Expand All @@ -65,6 +67,8 @@
import org.kie.kogito.serverless.workflow.utils.MultiSourceConfigResolver;
import org.kie.kogito.services.uow.CollectingUnitOfWorkFactory;
import org.kie.kogito.services.uow.DefaultUnitOfWorkManager;
import org.kie.kogito.services.uow.UnitOfWorkExecutor;
import org.kie.kogito.uow.UnitOfWorkManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -98,6 +102,7 @@ public class StaticWorkflowApplication extends StaticApplication implements Auto
private Iterable<StaticProcessRegister> processRegisters;
private final Collection<AutoCloseable> closeables = new ArrayList<>();
private final Map<String, SynchronousQueue<JsonNodeModel>> queues;
private final UnitOfWorkManager manager;
private ProcessInstancesFactory processInstancesFactory;

private static class StaticCompletionEventListener extends DefaultKogitoProcessEventListener {
Expand Down Expand Up @@ -127,7 +132,10 @@ public void afterProcessCompleted(ProcessCompletedEvent event) {
public static class WorkflowApplicationBuilder {

private Map<String, Object> properties;
private String serviceName = "EmbeddedKogito";
private Collection<KogitoProcessEventListener> listeners = new ArrayList<>();
private Optional<UnitOfWorkManager> manager = Optional.empty();
private Collection<EventPublisher> publishers = new ArrayList<>();

private WorkflowApplicationBuilder() {
}
Expand All @@ -145,14 +153,36 @@ public WorkflowApplicationBuilder withEventListener(KogitoProcessEventListener l
return this;
}

public WorkflowApplicationBuilder withManager(UnitOfWorkManager manager) {
this.manager = Optional.ofNullable(manager);
return this;
}

public WorkflowApplicationBuilder withService(String serviceName) {
this.serviceName = serviceName;
return this;
}

public WorkflowApplicationBuilder withEventPublisher(EventPublisher publisher, EventPublisher... extraPublishers) {
publishers.add(publisher);
for (EventPublisher extraPublisher : extraPublishers) {
publishers.add(extraPublisher);
}
return this;
}

public StaticWorkflowApplication build() {
if (properties == null) {
this.properties = loadApplicationDotProperties();
}
Map<String, SynchronousQueue<JsonNodeModel>> queues = new ConcurrentHashMap<>();
listeners.add(new StaticCompletionEventListener(queues));
StaticWorkflowApplication application = new StaticWorkflowApplication(properties, queues, listeners);
StaticWorkflowApplication application =
new StaticWorkflowApplication(properties, queues, listeners, manager.orElseGet(() -> new DefaultUnitOfWorkManager(new CollectingUnitOfWorkFactory())));
application.applicationRegisters.forEach(register -> register.register(application));
EventManager eventManager = application.manager.eventManager();
eventManager.setService(serviceName);
publishers.forEach(p -> eventManager.addPublisher(p));
return application;
}
}
Expand Down Expand Up @@ -189,14 +219,15 @@ public static StaticWorkflowApplication create(Map<String, Object> properties) {
return builder().withProperties(properties).build();
}

private StaticWorkflowApplication(Map<String, Object> properties, Map<String, SynchronousQueue<JsonNodeModel>> queues, Collection<KogitoProcessEventListener> listeners) {
private StaticWorkflowApplication(Map<String, Object> properties, Map<String, SynchronousQueue<JsonNodeModel>> queues, Collection<KogitoProcessEventListener> listeners,
UnitOfWorkManager manager) {
super(new StaticConfig(new Addons(Collections.emptySet()), new StaticProcessConfig(new CachedWorkItemHandlerConfig(),
new DefaultProcessEventListenerConfig(listeners),
new DefaultUnitOfWorkManager(new CollectingUnitOfWorkFactory())), new StaticConfigBean()));
new DefaultProcessEventListenerConfig(listeners), manager), new StaticConfigBean()));
if (!properties.isEmpty()) {
ConfigResolverHolder.setConfigResolver(MultiSourceConfigResolver.withSystemProperties(properties));
}
this.queues = queues;
this.manager = manager;
applicationRegisters = ServiceLoader.load(StaticApplicationRegister.class);
workflowRegisters = ServiceLoader.load(StaticWorkflowRegister.class);
processRegisters = ServiceLoader.load(StaticProcessRegister.class);
Expand Down Expand Up @@ -268,8 +299,10 @@ public JsonNodeModel execute(Process<JsonNodeModel> process, JsonNode data) {
*/
public JsonNodeModel execute(Process<JsonNodeModel> process, JsonNodeModel model) {
ProcessInstance<JsonNodeModel> processInstance = process.createInstance(model);
processInstance.start();
return processInstance.variables();
return UnitOfWorkExecutor.executeInUnitOfWork(manager, () -> {
processInstance.start();
return processInstance.variables();
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
<artifactId>wiremock-jre8</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<dependency>
<groupId>org.kie</groupId>
<artifactId>kie-addons-persistence-rocksdb</artifactId>
<scope>test</scope>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kie.kogito.serverless.workflow.executor;

import java.util.ArrayList;
import java.util.Collection;

import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventPublisher;

public class EventPublisherCollector implements EventPublisher {

private Collection<DataEvent<?>> events = new ArrayList<>();

@Override
public void publish(DataEvent<?> event) {
events.add(event);
}

@Override
public void publish(Collection<DataEvent<?>> events) {
events.forEach(this::publish);
}

public Collection<DataEvent<?>> events() {
return events;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,27 @@
import java.nio.file.Path;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.kie.api.event.process.ProcessVariableChangedEvent;
import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent;
import org.kie.kogito.event.process.ProcessInstanceVariableEventBody;
import org.kie.kogito.internal.process.event.DefaultKogitoProcessEventListener;
import org.kie.kogito.persistence.rocksdb.RocksDBProcessInstancesFactory;
import org.kie.kogito.serverless.workflow.SWFConstants;
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.node.TextNode;

Expand All @@ -51,21 +61,37 @@
import static org.kie.kogito.serverless.workflow.fluent.WorkflowBuilder.workflow;

public class PersistentApplicationTest {

private final static Logger logger = LoggerFactory.getLogger(PersistentApplicationTest.class);

@Test
void testCallbackSubscriberWithPersistence(@TempDir Path tempDir) throws InterruptedException, TimeoutException, RocksDBException {
final String eventType = "testSubscribe";
final String additionalData = "This has been injected by the event";
Workflow workflow = workflow("testCallback").start(callback(call(expr("concat", "{slogan:.slogan+\"er Beti\"}")), eventDef(eventType))).end().build();

final EventPublisherCollector eventCollector = new EventPublisherCollector();
Workflow workflow = workflow("testCallback").start(callback(call(expr("concat", "{slogan:.slogan+\"Viva er Beti manque pierda\"}")), eventDef(eventType))).end().build();
try (StaticWorkflowApplication application =
StaticWorkflowApplication.create().processInstancesFactory(new RocksDBProcessInstancesFactory(new Options().setCreateIfMissing(true), tempDir.toString()))) {
String id = application.execute(workflow, jsonObject().put("slogan", "Viva ")).getId();
StaticWorkflowApplication.builder().withEventListener(new DefaultKogitoProcessEventListener() {
@Override
public void afterVariableChanged(ProcessVariableChangedEvent event) {
logger.info(event.toString());

}
}).withEventPublisher(eventCollector).build().processInstancesFactory(new RocksDBProcessInstancesFactory(new Options().setCreateIfMissing(true), tempDir.toString()))) {
String id = application.execute(workflow, Map.of()).getId();
assertThat(application.variables(id).orElseThrow().getWorkflowdata()).doesNotContain(new TextNode(additionalData));
publish(eventType, buildCloudEvent(eventType, id)
.withData(JsonCloudEventData.wrap(jsonObject().put("additionalData", additionalData)))
.build());
assertThat(application.waitForFinish(id, Duration.ofSeconds(2000)).orElseThrow().getWorkflowdata())
.isEqualTo(jsonObject().put("additionalData", additionalData).put("slogan", "Viva er Beti"));
.isEqualTo(jsonObject().put("additionalData", additionalData).put("slogan", "Viva er Beti manque pierda"));
await().atMost(Duration.ofSeconds(1)).pollInterval(Duration.ofMillis(50)).until(() -> application.variables(id).isEmpty());
List<ProcessInstanceVariableEventBody> dataChangeEvents = eventCollector.events().stream().filter(ProcessInstanceVariableDataEvent.class::isInstance)
.map(ProcessInstanceVariableDataEvent.class::cast).map(ProcessInstanceVariableDataEvent::getData).collect(Collectors.toList());
assertThat(dataChangeEvents).hasSize(2);
assertThat(dataChangeEvents.get(0).getVariableName()).isEqualTo(SWFConstants.DEFAULT_WORKFLOW_VAR);
assertThat(dataChangeEvents.get(1).getVariableName()).isEqualTo(SWFConstants.DEFAULT_WORKFLOW_VAR + ".additionalData");
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<root level="info">
<appender-ref ref="STDOUT" />
</root>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,8 @@ String executeCallbackStateWithErrorPath(String callbackProcessPostUrl, String c
assertThat(lastExecutedState).isEqualTo("FinalizeWithError");

JsonPath variableLastExecutedStateEventContent =
waitForKogitoProcessInstanceEvent(kafkaClient, ProcessInstanceVariableDataEvent.class, e -> "workflowdata".equals(e.get("data.variableName")), true);
Map<Object, Object> lastExecutedStateDataMap = variableLastExecutedStateEventContent.getMap("data.variableValue");

assertThat(lastExecutedStateDataMap).containsEntry("lastExecutedState", "FinalizeWithError");
assertThat(lastExecutedStateDataMap).containsEntry("query", GENERATE_ERROR_QUERY);
waitForKogitoProcessInstanceEvent(kafkaClient, ProcessInstanceVariableDataEvent.class, e -> "workflowdata.lastExecutedState".equals(e.get("data.variableName")), true);
assertThat(variableLastExecutedStateEventContent.getString("data.variableValue")).isEqualTo("FinalizeWithError");

// the process instance should not be there since an end state was reached.
assertProcessInstanceNotExists(callbackProcessGetByIdUrl, processInstanceId);
Expand Down

0 comments on commit 343f086

Please sign in to comment.