diff --git a/data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java b/data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java index f3cd153d5b..947e1972bf 100644 --- a/data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java +++ b/data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java @@ -79,14 +79,8 @@ public class IndexingService { public void indexProcessInstanceEvent(ProcessInstanceDataEvent event) { ProcessInstanceStorage storage = manager.getProcessInstanceStorage(); if (event instanceof MultipleProcessInstanceDataEvent) { - for (ProcessInstanceDataEvent item : ((MultipleProcessInstanceDataEvent) event).getData()) - indexProccessInstanceEvent(storage, item); - } else { - indexProccessInstanceEvent(storage, event); + storage.indexGroup(((MultipleProcessInstanceDataEvent) event)); } - } - - private void indexProccessInstanceEvent(ProcessInstanceStorage storage, ProcessInstanceDataEvent event) { if (event instanceof ProcessInstanceErrorDataEvent) { storage.indexError((ProcessInstanceErrorDataEvent) event); } else if (event instanceof ProcessInstanceNodeDataEvent) { diff --git a/data-index/data-index-storage/data-index-storage-api/src/main/java/org/kie/kogito/index/storage/ProcessInstanceStorage.java b/data-index/data-index-storage/data-index-storage-api/src/main/java/org/kie/kogito/index/storage/ProcessInstanceStorage.java index 753caf66bc..674cc17bd4 100644 --- a/data-index/data-index-storage/data-index-storage-api/src/main/java/org/kie/kogito/index/storage/ProcessInstanceStorage.java +++ b/data-index/data-index-storage/data-index-storage-api/src/main/java/org/kie/kogito/index/storage/ProcessInstanceStorage.java @@ -18,6 +18,7 @@ */ package org.kie.kogito.index.storage; +import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent; import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent; import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent; import org.kie.kogito.event.process.ProcessInstanceSLADataEvent; @@ -28,6 +29,8 @@ public interface ProcessInstanceStorage extends StorageFetcher { + void indexGroup(MultipleProcessInstanceDataEvent event); + void indexError(ProcessInstanceErrorDataEvent event); void indexNode(ProcessInstanceNodeDataEvent event); diff --git a/data-index/data-index-storage/data-index-storage-common/src/main/java/org/kie/kogito/index/storage/ModelProcessInstanceStorage.java b/data-index/data-index-storage/data-index-storage-common/src/main/java/org/kie/kogito/index/storage/ModelProcessInstanceStorage.java index 9f21497fc2..07b645bb70 100644 --- a/data-index/data-index-storage/data-index-storage-common/src/main/java/org/kie/kogito/index/storage/ModelProcessInstanceStorage.java +++ b/data-index/data-index-storage/data-index-storage-common/src/main/java/org/kie/kogito/index/storage/ModelProcessInstanceStorage.java @@ -18,6 +18,7 @@ */ package org.kie.kogito.index.storage; +import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent; import org.kie.kogito.event.process.ProcessInstanceDataEvent; import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent; import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent; @@ -69,6 +70,23 @@ public void indexVariable(ProcessInstanceVariableDataEvent event) { index(event, variableMerger); } + @Override + public void indexGroup(MultipleProcessInstanceDataEvent events) { + for (ProcessInstanceDataEvent event : events.getData()) { + if (event instanceof ProcessInstanceErrorDataEvent) { + index(event, errorMerger); + } else if (event instanceof ProcessInstanceNodeDataEvent) { + index(event, nodeMerger); + } else if (event instanceof ProcessInstanceSLADataEvent) { + index(event, slaMerger); + } else if (event instanceof ProcessInstanceStateDataEvent) { + index(event, stateMerger); + } else if (event instanceof ProcessInstanceVariableDataEvent) { + index(event, variableMerger); + } + } + } + private > void index(T event, ProcessInstanceEventMerger merger) { ProcessInstance processInstance = storage.get(event.getKogitoProcessInstanceId()); if (processInstance == null) { diff --git a/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java b/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java index 386aabcf78..de23853a83 100644 --- a/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java +++ b/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java @@ -20,9 +20,11 @@ import java.time.ZonedDateTime; import java.util.ArrayList; -import java.util.Date; +import java.util.Iterator; import java.util.Set; +import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent; +import org.kie.kogito.event.process.ProcessInstanceDataEvent; import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent; import org.kie.kogito.event.process.ProcessInstanceErrorEventBody; import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent; @@ -64,43 +66,60 @@ public ProcessInstanceEntityStorage(ProcessInstanceEntityRepository repository, super(repository, ProcessInstanceEntity.class, mapper::mapToModel); } + @Override + @Transactional + public void indexGroup(MultipleProcessInstanceDataEvent event) { + Iterator> iter = event.getData().iterator(); + ProcessInstanceDataEvent firstEvent = iter.next(); + ProcessInstanceEntity pi = findOrInit(firstEvent); + indexEvent(pi, firstEvent); + while (iter.hasNext()) { + indexEvent(pi, iter.next()); + } + repository.flush(); + } + @Override @Transactional public void indexError(ProcessInstanceErrorDataEvent event) { - indexError(event.getData()); + indexError(findOrInit(event), event.getData()); + repository.flush(); } @Override @Transactional public void indexNode(ProcessInstanceNodeDataEvent event) { - indexNode(event.getData()); + indexNode(findOrInit(event), event.getData()); + repository.flush(); } @Override @Transactional public void indexSLA(ProcessInstanceSLADataEvent event) { - indexSLA(event.getData()); - + indexSla(findOrInit(event), event.getData()); + repository.flush(); } @Override @Transactional public void indexState(ProcessInstanceStateDataEvent event) { - indexState(event.getData(), event.getKogitoAddons() == null ? Set.of() : Set.of(event.getKogitoAddons().split(",")), event.getSource() == null ? null : event.getSource().toString()); + indexState(findOrInit(event), event); + repository.flush(); } @Override @Transactional public void indexVariable(ProcessInstanceVariableDataEvent event) { - indexVariable(event.getData()); + indexVariable(findOrInit(event), event.getData()); + repository.flush(); } - private ProcessInstanceEntity findOrInit(String processId, String processInstanceId, Date date) { - return repository.findByIdOptional(processInstanceId).orElseGet(() -> { + private ProcessInstanceEntity findOrInit(ProcessInstanceDataEvent event) { + return repository.findByIdOptional(event.getKogitoProcessInstanceId()).orElseGet(() -> { ProcessInstanceEntity pi = new ProcessInstanceEntity(); - pi.setProcessId(processId); - pi.setId(processInstanceId); - pi.setLastUpdate(toZonedDateTime(date)); + pi.setProcessId(event.getKogitoProcessId()); + pi.setId(event.getKogitoProcessInstanceId()); + pi.setLastUpdate(toZonedDateTime(event.getTime())); pi.setNodes(new ArrayList<>()); pi.setMilestones(new ArrayList<>()); repository.persist(pi); @@ -108,8 +127,21 @@ private ProcessInstanceEntity findOrInit(String processId, String processInstanc }); } - private void indexError(ProcessInstanceErrorEventBody error) { - ProcessInstanceEntity pi = findOrInit(error.getProcessId(), error.getProcessInstanceId(), error.getEventDate()); + private void indexEvent(ProcessInstanceEntity pi, ProcessInstanceDataEvent event) { + if (event instanceof ProcessInstanceErrorDataEvent) { + indexError(pi, ((ProcessInstanceErrorDataEvent) event).getData()); + } else if (event instanceof ProcessInstanceNodeDataEvent) { + indexNode(pi, ((ProcessInstanceNodeDataEvent) event).getData()); + } else if (event instanceof ProcessInstanceSLADataEvent) { + indexSla(pi, ((ProcessInstanceSLADataEvent) event).getData()); + } else if (event instanceof ProcessInstanceStateDataEvent) { + indexState(pi, (ProcessInstanceStateDataEvent) event); + } else if (event instanceof ProcessInstanceVariableDataEvent) { + indexVariable(pi, ((ProcessInstanceVariableDataEvent) event).getData()); + } + } + + private void indexError(ProcessInstanceEntity pi, ProcessInstanceErrorEventBody error) { ProcessInstanceErrorEntity errorEntity = pi.getError(); if (errorEntity == null) { errorEntity = new ProcessInstanceErrorEntity(); @@ -118,16 +150,14 @@ private void indexError(ProcessInstanceErrorEventBody error) { errorEntity.setMessage(error.getErrorMessage()); errorEntity.setNodeDefinitionId(error.getNodeDefinitionId()); pi.setState(CommonUtils.ERROR_STATE); - repository.flush(); } - private void indexNode(ProcessInstanceNodeEventBody data) { - ProcessInstanceEntity pi = findOrInit(data.getProcessId(), data.getProcessInstanceId(), data.getEventDate()); + private void indexNode(ProcessInstanceEntity pi, ProcessInstanceNodeEventBody data) { pi.getNodes().stream().filter(n -> n.getId().equals(data.getNodeInstanceId())).findAny().ifPresentOrElse(n -> updateNode(n, data), () -> createNode(pi, data)); if ("MilestoneNode".equals(data.getNodeType())) { pi.getMilestones().stream().filter(n -> n.getId().equals(data.getNodeInstanceId())).findAny().ifPresentOrElse(n -> updateMilestone(n, data), () -> createMilestone(pi, data)); } - repository.flush(); + } private MilestoneEntity createMilestone(ProcessInstanceEntity pi, ProcessInstanceNodeEventBody data) { @@ -174,13 +204,11 @@ private NodeInstanceEntity updateNode(NodeInstanceEntity nodeInstance, ProcessIn return nodeInstance; } - private void indexSLA(ProcessInstanceSLAEventBody data) { - findOrInit(data.getProcessId(), data.getProcessInstanceId(), data.getEventDate()); - repository.flush(); + private void indexState(ProcessInstanceEntity pi, ProcessInstanceStateDataEvent event) { + indexState(pi, event.getData(), event.getKogitoAddons() == null ? Set.of() : Set.of(event.getKogitoAddons().split(",")), event.getSource() == null ? null : event.getSource().toString()); } - private void indexState(ProcessInstanceStateEventBody data, Set addons, String endpoint) { - ProcessInstanceEntity pi = findOrInit(data.getProcessId(), data.getProcessInstanceId(), data.getEventDate()); + private void indexState(ProcessInstanceEntity pi, ProcessInstanceStateEventBody data, Set addons, String endpoint) { pi.setVersion(data.getProcessVersion()); pi.setProcessName(data.getProcessName()); pi.setRootProcessInstanceId(data.getRootProcessInstanceId()); @@ -199,13 +227,13 @@ private void indexState(ProcessInstanceStateEventBody data, Set addons, pi.setLastUpdate(toZonedDateTime(data.getEventDate())); pi.setAddons(addons); pi.setEndpoint(endpoint); - repository.flush(); } - private void indexVariable(ProcessInstanceVariableEventBody data) { - ProcessInstanceEntity pi = findOrInit(data.getProcessId(), data.getProcessInstanceId(), data.getEventDate()); + private void indexVariable(ProcessInstanceEntity pi, ProcessInstanceVariableEventBody data) { pi.setVariables(JsonUtils.mergeVariable(data.getVariableName(), data.getVariableValue(), pi.getVariables())); - repository.flush(); } + private void indexSla(ProcessInstanceEntity orInit, ProcessInstanceSLAEventBody data) { + // SLA does nothing for now + } }