Skip to content

Commit

Permalink
[Fix apache#2113] Data index group processing
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Oct 14, 2024
1 parent ee946ad commit da4cee6
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,8 @@ public class IndexingService {
public void indexProcessInstanceEvent(ProcessInstanceDataEvent<?> event) {
ProcessInstanceStorage storage = manager.getProcessInstanceStorage();
if (event instanceof MultipleProcessInstanceDataEvent) {
for (ProcessInstanceDataEvent<?> item : ((MultipleProcessInstanceDataEvent) event).getData())
indexProccessInstanceEvent(storage, item);
} else {
indexProccessInstanceEvent(storage, event);
storage.indexGroup(((MultipleProcessInstanceDataEvent) event));
}
}

private void indexProccessInstanceEvent(ProcessInstanceStorage storage, ProcessInstanceDataEvent<?> event) {
if (event instanceof ProcessInstanceErrorDataEvent) {
storage.indexError((ProcessInstanceErrorDataEvent) event);
} else if (event instanceof ProcessInstanceNodeDataEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.kie.kogito.index.storage;

import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
import org.kie.kogito.event.process.ProcessInstanceSLADataEvent;
Expand All @@ -28,6 +29,8 @@

public interface ProcessInstanceStorage extends StorageFetcher<String, ProcessInstance> {

void indexGroup(MultipleProcessInstanceDataEvent event);

void indexError(ProcessInstanceErrorDataEvent event);

void indexNode(ProcessInstanceNodeDataEvent event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.kie.kogito.index.storage;

import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
Expand Down Expand Up @@ -69,6 +70,23 @@ public void indexVariable(ProcessInstanceVariableDataEvent event) {
index(event, variableMerger);
}

@Override
public void indexGroup(MultipleProcessInstanceDataEvent events) {
for (ProcessInstanceDataEvent<?> event : events.getData()) {
if (event instanceof ProcessInstanceErrorDataEvent) {
index(event, errorMerger);
} else if (event instanceof ProcessInstanceNodeDataEvent) {
index(event, nodeMerger);
} else if (event instanceof ProcessInstanceSLADataEvent) {
index(event, slaMerger);
} else if (event instanceof ProcessInstanceStateDataEvent) {
index(event, stateMerger);
} else if (event instanceof ProcessInstanceVariableDataEvent) {
index(event, variableMerger);
}
}
}

private <T extends ProcessInstanceDataEvent<?>> void index(T event, ProcessInstanceEventMerger merger) {
ProcessInstance processInstance = storage.get(event.getKogitoProcessInstanceId());
if (processInstance == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;

import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
import org.kie.kogito.event.process.ProcessInstanceErrorEventBody;
import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
Expand Down Expand Up @@ -64,52 +66,82 @@ public ProcessInstanceEntityStorage(ProcessInstanceEntityRepository repository,
super(repository, ProcessInstanceEntity.class, mapper::mapToModel);
}

@Override
@Transactional
public void indexGroup(MultipleProcessInstanceDataEvent event) {
Iterator<ProcessInstanceDataEvent<?>> iter = event.getData().iterator();
ProcessInstanceDataEvent<?> firstEvent = iter.next();
ProcessInstanceEntity pi = findOrInit(firstEvent);
indexEvent(pi, firstEvent);
while (iter.hasNext()) {
indexEvent(pi, iter.next());
}
repository.flush();
}

@Override
@Transactional
public void indexError(ProcessInstanceErrorDataEvent event) {
indexError(event.getData());
indexError(findOrInit(event), event.getData());
repository.flush();
}

@Override
@Transactional
public void indexNode(ProcessInstanceNodeDataEvent event) {
indexNode(event.getData());
indexNode(findOrInit(event), event.getData());
repository.flush();
}

@Override
@Transactional
public void indexSLA(ProcessInstanceSLADataEvent event) {
indexSLA(event.getData());

indexSla(findOrInit(event), event.getData());
repository.flush();
}

@Override
@Transactional
public void indexState(ProcessInstanceStateDataEvent event) {
indexState(event.getData(), event.getKogitoAddons() == null ? Set.of() : Set.of(event.getKogitoAddons().split(",")), event.getSource() == null ? null : event.getSource().toString());
indexState(findOrInit(event), event);
repository.flush();
}

@Override
@Transactional
public void indexVariable(ProcessInstanceVariableDataEvent event) {
indexVariable(event.getData());
indexVariable(findOrInit(event), event.getData());
repository.flush();
}

private ProcessInstanceEntity findOrInit(String processId, String processInstanceId, Date date) {
return repository.findByIdOptional(processInstanceId).orElseGet(() -> {
private ProcessInstanceEntity findOrInit(ProcessInstanceDataEvent<?> event) {
return repository.findByIdOptional(event.getKogitoProcessInstanceId()).orElseGet(() -> {
ProcessInstanceEntity pi = new ProcessInstanceEntity();
pi.setProcessId(processId);
pi.setId(processInstanceId);
pi.setLastUpdate(toZonedDateTime(date));
pi.setProcessId(event.getKogitoProcessId());
pi.setId(event.getKogitoProcessInstanceId());
pi.setLastUpdate(toZonedDateTime(event.getTime()));
pi.setNodes(new ArrayList<>());
pi.setMilestones(new ArrayList<>());
repository.persist(pi);
return pi;
});
}

private void indexError(ProcessInstanceErrorEventBody error) {
ProcessInstanceEntity pi = findOrInit(error.getProcessId(), error.getProcessInstanceId(), error.getEventDate());
private void indexEvent(ProcessInstanceEntity pi, ProcessInstanceDataEvent<?> event) {
if (event instanceof ProcessInstanceErrorDataEvent) {
indexError(pi, ((ProcessInstanceErrorDataEvent) event).getData());
} else if (event instanceof ProcessInstanceNodeDataEvent) {
indexNode(pi, ((ProcessInstanceNodeDataEvent) event).getData());
} else if (event instanceof ProcessInstanceSLADataEvent) {
indexSla(pi, ((ProcessInstanceSLADataEvent) event).getData());
} else if (event instanceof ProcessInstanceStateDataEvent) {
indexState(pi, (ProcessInstanceStateDataEvent) event);
} else if (event instanceof ProcessInstanceVariableDataEvent) {
indexVariable(pi, ((ProcessInstanceVariableDataEvent) event).getData());
}
}

private void indexError(ProcessInstanceEntity pi, ProcessInstanceErrorEventBody error) {
ProcessInstanceErrorEntity errorEntity = pi.getError();
if (errorEntity == null) {
errorEntity = new ProcessInstanceErrorEntity();
Expand All @@ -118,16 +150,14 @@ private void indexError(ProcessInstanceErrorEventBody error) {
errorEntity.setMessage(error.getErrorMessage());
errorEntity.setNodeDefinitionId(error.getNodeDefinitionId());
pi.setState(CommonUtils.ERROR_STATE);
repository.flush();
}

private void indexNode(ProcessInstanceNodeEventBody data) {
ProcessInstanceEntity pi = findOrInit(data.getProcessId(), data.getProcessInstanceId(), data.getEventDate());
private void indexNode(ProcessInstanceEntity pi, ProcessInstanceNodeEventBody data) {
pi.getNodes().stream().filter(n -> n.getId().equals(data.getNodeInstanceId())).findAny().ifPresentOrElse(n -> updateNode(n, data), () -> createNode(pi, data));
if ("MilestoneNode".equals(data.getNodeType())) {
pi.getMilestones().stream().filter(n -> n.getId().equals(data.getNodeInstanceId())).findAny().ifPresentOrElse(n -> updateMilestone(n, data), () -> createMilestone(pi, data));
}
repository.flush();

}

private MilestoneEntity createMilestone(ProcessInstanceEntity pi, ProcessInstanceNodeEventBody data) {
Expand Down Expand Up @@ -174,13 +204,11 @@ private NodeInstanceEntity updateNode(NodeInstanceEntity nodeInstance, ProcessIn
return nodeInstance;
}

private void indexSLA(ProcessInstanceSLAEventBody data) {
findOrInit(data.getProcessId(), data.getProcessInstanceId(), data.getEventDate());
repository.flush();
private void indexState(ProcessInstanceEntity pi, ProcessInstanceStateDataEvent event) {
indexState(pi, event.getData(), event.getKogitoAddons() == null ? Set.of() : Set.of(event.getKogitoAddons().split(",")), event.getSource() == null ? null : event.getSource().toString());
}

private void indexState(ProcessInstanceStateEventBody data, Set<String> addons, String endpoint) {
ProcessInstanceEntity pi = findOrInit(data.getProcessId(), data.getProcessInstanceId(), data.getEventDate());
private void indexState(ProcessInstanceEntity pi, ProcessInstanceStateEventBody data, Set<String> addons, String endpoint) {
pi.setVersion(data.getProcessVersion());
pi.setProcessName(data.getProcessName());
pi.setRootProcessInstanceId(data.getRootProcessInstanceId());
Expand All @@ -199,13 +227,13 @@ private void indexState(ProcessInstanceStateEventBody data, Set<String> addons,
pi.setLastUpdate(toZonedDateTime(data.getEventDate()));
pi.setAddons(addons);
pi.setEndpoint(endpoint);
repository.flush();
}

private void indexVariable(ProcessInstanceVariableEventBody data) {
ProcessInstanceEntity pi = findOrInit(data.getProcessId(), data.getProcessInstanceId(), data.getEventDate());
private void indexVariable(ProcessInstanceEntity pi, ProcessInstanceVariableEventBody data) {
pi.setVariables(JsonUtils.mergeVariable(data.getVariableName(), data.getVariableValue(), pi.getVariables()));
repository.flush();
}

private void indexSla(ProcessInstanceEntity orInit, ProcessInstanceSLAEventBody data) {
// SLA does nothing for now
}
}

0 comments on commit da4cee6

Please sign in to comment.