From 43af0d0315a95185dc74e2ff686a2b7320457308 Mon Sep 17 00:00:00 2001 From: adriana-corui <54841592+adriana-corui@users.noreply.github.com> Date: Fri, 9 Dec 2022 09:37:02 +0200 Subject: [PATCH] Parallel throttle (#1390) * parallel throttle draft * parallel throttle split/join part * parallel throttle refactors and cleaning context after join * review stuff --- .../steps/ParallelLoopExecutionData.java | 355 +++++++++++------- .../runtime/steps/ParallelLoopStepsTest.java | 223 +++++------ 2 files changed, 334 insertions(+), 244 deletions(-) diff --git a/cloudslang-runtime/src/main/java/io/cloudslang/lang/runtime/steps/ParallelLoopExecutionData.java b/cloudslang-runtime/src/main/java/io/cloudslang/lang/runtime/steps/ParallelLoopExecutionData.java index 51fd232025..21eac30163 100644 --- a/cloudslang-runtime/src/main/java/io/cloudslang/lang/runtime/steps/ParallelLoopExecutionData.java +++ b/cloudslang-runtime/src/main/java/io/cloudslang/lang/runtime/steps/ParallelLoopExecutionData.java @@ -42,13 +42,21 @@ import org.springframework.stereotype.Component; import java.io.Serializable; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import static io.cloudslang.lang.entities.ScoreLangConstants.CURRENT_STEP_ID_KEY; +import static io.cloudslang.lang.entities.ScoreLangConstants.RUN_ENV; +import static io.cloudslang.lang.runtime.RuntimeConstants.BRANCHES_CONTEXT_KEY; import static io.cloudslang.score.api.execution.ExecutionParametersConsts.DEFAULT_ROI_VALUE; import static io.cloudslang.score.api.execution.ExecutionParametersConsts.EXECUTION_RUNTIME_SERVICES; import static io.cloudslang.score.api.execution.ExecutionParametersConsts.EXECUTION_TOTAL_ROI; +import static java.lang.Integer.parseInt; +import static org.apache.commons.collections4.CollectionUtils.isEmpty; +import static org.apache.commons.collections4.CollectionUtils.isNotEmpty; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.commons.lang3.Validate.notNull; /** @@ -70,7 +78,7 @@ public class ParallelLoopExecutionData extends AbstractExecutionData { private static final Logger logger = LogManager.getLogger(ParallelLoopExecutionData.class); public void addBranches(@Param(ScoreLangConstants.PARALLEL_LOOP_STATEMENT_KEY) LoopStatement parallelLoopStatement, - @Param(ScoreLangConstants.RUN_ENV) RunEnvironment runEnv, + @Param(RUN_ENV) RunEnvironment runEnv, @Param(EXECUTION_RUNTIME_SERVICES) ExecutionRuntimeServices executionRuntimeServices, @Param(ScoreLangConstants.NODE_NAME_KEY) String nodeName, @@ -87,37 +95,38 @@ public void addBranches(@Param(ScoreLangConstants.PARALLEL_LOOP_STATEMENT_KEY) L int parallelismLevel = executionRuntimeServices.getLevelParallelism() != null ? (int) executionRuntimeServices.getLevelParallelism() : 0; executionRuntimeServices.setLevelParallelism(parallelismLevel + 1); - - List splitData = parallelLoopBinding - .bindParallelLoopList(parallelLoopStatement, flowContext, runEnv.getSystemProperties(), nodeName); - - fireEvent( - executionRuntimeServices, - ScoreLangConstants.EVENT_SPLIT_BRANCHES, - "parallel loop expression bound", - runEnv.getExecutionPath().getCurrentPath(), - LanguageEventData.StepType.STEP, - nodeName, - flowContext.getImmutableViewOfVariables(), - Pair.of(LanguageEventData.BOUND_PARALLEL_LOOP_EXPRESSION, (Serializable) splitData)); + List splitData = handleFirstIteration(parallelLoopStatement, runEnv, executionRuntimeServices, + nodeName, flowContext); runEnv.putNextStepPosition(nextStepId); - runEnv.getExecutionPath().down(); + // todo take from contexts: executionRuntimeServices.getThrottleSize(); + // temporary taking throttle from system properties + final Integer throttleSize = Integer.getInteger("cloudslang.worker.parallelThrottleSize", null); + final int splitSize = splitData.size(); + final int lanesToStart = calculateNumberOfLanesToStart(splitSize, throttleSize); + final List splitDataCurrentBulk = splitData.subList(0, lanesToStart); + final List splitDataLeftoversSublist = splitData.subList(lanesToStart, splitSize); + + if (isNotEmpty(splitDataLeftoversSublist)) { + executionRuntimeServices.setSplitData(new ArrayList<>(splitDataLeftoversSublist)); + } else { + executionRuntimeServices.removeSplitData(); + } - for (Value splitItem : splitData) { + for (Value splitItem : splitDataCurrentBulk) { Context branchContext = (Context) SerializationUtils.clone(flowContext); // first fire event fireEvent( - executionRuntimeServices, - ScoreLangConstants.EVENT_BRANCH_START, - "parallel loop branch created", - runEnv.getExecutionPath().getCurrentPath(), - LanguageEventData.StepType.STEP, - nodeName, - branchContext.getImmutableViewOfVariables(), - Pair.of(ScoreLangConstants.REF_ID, refId), - Pair.of(RuntimeConstants.SPLIT_ITEM_KEY, splitItem)); + executionRuntimeServices, + ScoreLangConstants.EVENT_BRANCH_START, + "parallel loop branch created", + runEnv.getExecutionPath().getCurrentPath(), + LanguageEventData.StepType.STEP, + nodeName, + branchContext.getImmutableViewOfVariables(), + Pair.of(ScoreLangConstants.REF_ID, refId), + Pair.of(RuntimeConstants.SPLIT_ITEM_KEY, splitItem)); // take path down one level runEnv.getExecutionPath().down(); @@ -140,10 +149,10 @@ public void addBranches(@Param(ScoreLangConstants.PARALLEL_LOOP_STATEMENT_KEY) L branchContext, new HashMap<>(), new HashMap<>()); createBranch( - branchRuntimeEnvironment, - executionRuntimeServices, - refId, - branchBeginStep); + branchRuntimeEnvironment, + executionRuntimeServices, + refId, + branchBeginStep); // take path up level runEnv.getExecutionPath().up(); @@ -155,75 +164,133 @@ public void addBranches(@Param(ScoreLangConstants.PARALLEL_LOOP_STATEMENT_KEY) L updateCallArgumentsAndPushContextToStack(runEnv, flowContext, new HashMap<>(), new HashMap<>()); } catch (RuntimeException e) { logger.error("There was an error running the add branches execution step of: \'" + nodeName + - "\'. Error is: " + e.getMessage()); + "\'. Error is: " + e.getMessage()); throw new RuntimeException("Error running: " + nodeName + ": " + e.getMessage(), e); } } - public void joinBranches(@Param(ScoreLangConstants.RUN_ENV) RunEnvironment runEnv, + public void joinBranches(@Param(RUN_ENV) RunEnvironment runEnv, @Param(EXECUTION_RUNTIME_SERVICES) ExecutionRuntimeServices executionRuntimeServices, @Param(ScoreLangConstants.STEP_PUBLISH_KEY) List stepPublishValues, @Param(ScoreLangConstants.STEP_NAVIGATION_KEY) - Map stepNavigationValues, - @Param(ScoreLangConstants.NODE_NAME_KEY) String nodeName) { + Map stepNavigationValues, + @Param(ScoreLangConstants.NODE_NAME_KEY) String nodeName, + @Param(CURRENT_STEP_ID_KEY) Long currentStepId) { try { - runEnv.getExecutionPath().up(); notNull(executionRuntimeServices.getLevelParallelism(), "Parallelism level can not be null"); - List> branchesContext = Lists.newArrayList(); if ((int) executionRuntimeServices.getLevelParallelism() > 0) { executionRuntimeServices.setLevelParallelism((int) executionRuntimeServices.getLevelParallelism() - 1); } - Context flowContext = runEnv.getStack().popContext(); + ArrayList> temporaryBranchesContext = + executionRuntimeServices.getParallelTemporaryContext(); + collectBranchesData(executionRuntimeServices, nodeName, temporaryBranchesContext); + + if (isLastIteration(executionRuntimeServices.getRemainingBranches())) { + // bind step results, outputs and handle navigation + handleLastIteration( + runEnv, + executionRuntimeServices, + stepPublishValues, + stepNavigationValues, + nodeName, + temporaryBranchesContext); + } else { + // fail only in the last iteration + executionRuntimeServices.removeStepErrorKey(); + runEnv.putNextStepPosition(currentStepId); + } + + runEnv.getExecutionPath().forward(); + } catch (RuntimeException e) { + logger.error("There was an error running the joinBranches execution step of: \'" + nodeName + + "\'. Error is: " + e.getMessage()); + throw new RuntimeException("Error running: \'" + nodeName + "\': \n" + e.getMessage(), e); + } + } - collectBranchesData(executionRuntimeServices, nodeName, branchesContext); - Map outputBindingContext = new HashMap<>(); - outputBindingContext.put( - RuntimeConstants.BRANCHES_CONTEXT_KEY, - ValueFactory.create((Serializable) branchesContext) - ); - - Map globalContext = flowContext.getImmutableViewOfMagicVariables(); - Map publishValues = - bindPublishValues( - runEnv, + private List handleFirstIteration(LoopStatement parallelLoopStatement, + RunEnvironment runEnv, + ExecutionRuntimeServices executionRuntimeServices, + String nodeName, + Context flowContext) { + + List splitData = (ArrayList) executionRuntimeServices.getSplitData(); + + // split data is not set for first iteration + if (isEmpty(splitData)) { + splitData = parallelLoopBinding.bindParallelLoopList(parallelLoopStatement, flowContext, + runEnv.getSystemProperties(), nodeName); + executionRuntimeServices.setSplitDataSize(splitData.size()); + executionRuntimeServices.setParallelTemporaryContext(Lists.newArrayList()); + fireEvent( executionRuntimeServices, - stepPublishValues, - stepNavigationValues, + ScoreLangConstants.EVENT_SPLIT_BRANCHES, + "parallel loop expression bound", + runEnv.getExecutionPath().getCurrentPath(), + LanguageEventData.StepType.STEP, nodeName, - outputBindingContext, - globalContext - ); + flowContext.getImmutableViewOfVariables(), + Pair.of(LanguageEventData.BOUND_PARALLEL_LOOP_EXPRESSION, (Serializable) splitData)); + runEnv.getExecutionPath().down(); + } - flowContext.putVariables(publishValues); + return splitData; + } + + private void handleLastIteration(RunEnvironment runEnv, + ExecutionRuntimeServices executionRuntimeServices, + List stepPublishValues, + Map stepNavigationValues, + String nodeName, + ArrayList> temporaryBranchesContext) { - String parallelLoopResult = getParallelLoopResult(branchesContext); + clearExecutionRuntimeForNextStep(executionRuntimeServices); + runEnv.getExecutionPath().up(); + + if (temporaryBranchesContext.size() < executionRuntimeServices.removeSplitDataSize()) { + throw new RuntimeException("Exception occurred when running lane"); + } - handleNavigationAndReturnValues( + Context flowContext = runEnv.getStack().popContext(); + Map globalContext = flowContext.getImmutableViewOfMagicVariables(); + Map outputBindingContext = new HashMap<>(); + outputBindingContext.put( + BRANCHES_CONTEXT_KEY, + ValueFactory.create(temporaryBranchesContext) + ); + Map publishValues = bindPublishValues( + runEnv, + executionRuntimeServices, + stepPublishValues, + stepNavigationValues, + nodeName, + outputBindingContext, + globalContext + ); + + flowContext.putVariables(publishValues); + + String parallelLoopResult = getParallelLoopResult(temporaryBranchesContext); + handleNavigationAndReturnValues( runEnv, executionRuntimeServices, stepNavigationValues, nodeName, publishValues, parallelLoopResult - ); + ); - runEnv.getStack().pushContext(flowContext); - runEnv.getExecutionPath().forward(); - } catch (RuntimeException e) { - logger.error("There was an error running the joinBranches execution step of: \'" + nodeName + - "\'. Error is: " + e.getMessage()); - throw new RuntimeException("Error running: \'" + nodeName + "\': \n" + e.getMessage(), e); - } + runEnv.getStack().pushContext(flowContext); } private void handleNavigationAndReturnValues( - RunEnvironment runEnv, - ExecutionRuntimeServices executionRuntimeServices, - Map stepNavigationValues, - String nodeName, - Map publishValues, - String parallelLoopResult) { + RunEnvironment runEnv, + ExecutionRuntimeServices executionRuntimeServices, + Map stepNavigationValues, + String nodeName, + Map publishValues, + String parallelLoopResult) { // set the position of the next step - for the use of the navigation // find in the navigation values the correct next step position, according to the parallel loop result, // and set it @@ -231,7 +298,7 @@ private void handleNavigationAndReturnValues( if (navigation == null) { // should always have the executable response mapped to a navigation by the step, if not, it is an error throw new RuntimeException("Step: " + nodeName + - " has no matching navigation for the parallel loop result: " + parallelLoopResult); + " has no matching navigation for the parallel loop result: " + parallelLoopResult); } Long nextStepPosition = navigation.getNextStepId(); String presetResult = navigation.getPresetResult(); @@ -240,15 +307,15 @@ private void handleNavigationAndReturnValues( ReturnValues returnValues = new ReturnValues(outputs, presetResult != null ? presetResult : parallelLoopResult); fireEvent( - executionRuntimeServices, - runEnv, - ScoreLangConstants.EVENT_JOIN_BRANCHES_END, - "Parallel loop output binding finished", - LanguageEventData.StepType.STEP, nodeName, - new HashMap(), - Pair.of(LanguageEventData.OUTPUTS, (Serializable) publishValues), - Pair.of(LanguageEventData.RESULT, returnValues.getResult()), - Pair.of(LanguageEventData.NEXT_STEP_POSITION, nextStepPosition)); + executionRuntimeServices, + runEnv, + ScoreLangConstants.EVENT_JOIN_BRANCHES_END, + "Parallel loop output binding finished", + LanguageEventData.StepType.STEP, nodeName, + new HashMap(), + Pair.of(LanguageEventData.OUTPUTS, (Serializable) publishValues), + Pair.of(LanguageEventData.RESULT, returnValues.getResult()), + Pair.of(LanguageEventData.NEXT_STEP_POSITION, nextStepPosition)); runEnv.putReturnValues(returnValues); runEnv.putNextStepPosition(nextStepPosition); @@ -268,75 +335,77 @@ private String getParallelLoopResult(List> branchesCon } private Map bindPublishValues( - RunEnvironment runEnv, - ExecutionRuntimeServices executionRuntimeServices, - List stepPublishValues, - Map stepNavigationValues, - String nodeName, - Map publishContext, - Map globalContext) { + RunEnvironment runEnv, + ExecutionRuntimeServices executionRuntimeServices, + List stepPublishValues, + Map stepNavigationValues, + String nodeName, + Map publishContext, + Map globalContext) { fireEvent( - executionRuntimeServices, - runEnv, - ScoreLangConstants.EVENT_JOIN_BRANCHES_START, - "Parallel loop output binding started", - LanguageEventData.StepType.STEP, - nodeName, - new HashMap(), - Pair.of(ScoreLangConstants.STEP_PUBLISH_KEY, (Serializable) stepPublishValues), - Pair.of(ScoreLangConstants.STEP_NAVIGATION_KEY, (Serializable) stepNavigationValues)); + executionRuntimeServices, + runEnv, + ScoreLangConstants.EVENT_JOIN_BRANCHES_START, + "Parallel loop output binding started", + LanguageEventData.StepType.STEP, + nodeName, + new HashMap(), + Pair.of(ScoreLangConstants.STEP_PUBLISH_KEY, (Serializable) stepPublishValues), + Pair.of(ScoreLangConstants.STEP_NAVIGATION_KEY, (Serializable) stepNavigationValues)); ReadOnlyContextAccessor outputsBindingAccessor = new ReadOnlyContextAccessor(publishContext, globalContext); return outputsBinding.bindOutputs( - outputsBindingAccessor, - runEnv.getSystemProperties(), - stepPublishValues + outputsBindingAccessor, + runEnv.getSystemProperties(), + stepPublishValues ); } private void collectBranchesData( - ExecutionRuntimeServices executionRuntimeServices, - String nodeName, - List> branchesContext) { + ExecutionRuntimeServices executionRuntimeServices, + String nodeName, + List> branchesContext) { List branches = executionRuntimeServices.getFinishedChildBranchesData(); Double roiBeforeParallelLoop = executionRuntimeServices.getRoiValue(); for (EndBranchDataContainer branch : branches) { - checkExceptionInBranch(branch); - - Map branchContext = branch.getContexts(); - RunEnvironment branchRuntimeEnvironment = (RunEnvironment) branchContext.get(ScoreLangConstants.RUN_ENV); - Map initialBranchContext = - branchRuntimeEnvironment.getStack().popContext().getImmutableViewOfVariables(); - Map branchContextMap = convert(initialBranchContext); - ReturnValues executableReturnValues = branchRuntimeEnvironment.removeReturnValues(); - String branchResult = executableReturnValues.getResult(); - branchContextMap.put(ScoreLangConstants.BRANCH_RESULT_KEY, branchResult); - branchesContext.add(branchContextMap); - - // up branch path - branchRuntimeEnvironment.getExecutionPath().up(); - - // The ROI value for each branch does already contain any previous ROI value, so we need to subtract it - Double branchRoi = (Double) branch.getSystemContext() - .getOrDefault(EXECUTION_TOTAL_ROI, DEFAULT_ROI_VALUE) - roiBeforeParallelLoop; - executionRuntimeServices.addRoiValue(branchRoi); + boolean isBranchException = checkExceptionInBranch(branch); + + if (!isBranchException) { + Map branchContext = branch.getContexts(); + RunEnvironment branchRuntimeEnvironment = (RunEnvironment) branchContext.get(RUN_ENV); + Map initialBranchContext = + branchRuntimeEnvironment.getStack().popContext().getImmutableViewOfVariables(); + Map branchContextMap = convert(initialBranchContext); + ReturnValues executableReturnValues = branchRuntimeEnvironment.removeReturnValues(); + String branchResult = executableReturnValues.getResult(); + branchContextMap.put(ScoreLangConstants.BRANCH_RESULT_KEY, branchResult); + branchesContext.add(branchContextMap); + + // up branch path + branchRuntimeEnvironment.getExecutionPath().up(); + + // The ROI value for each branch does already contain any previous ROI value, so we need to subtract it + Double branchRoi = (Double) branch.getSystemContext() + .getOrDefault(EXECUTION_TOTAL_ROI, DEFAULT_ROI_VALUE) - roiBeforeParallelLoop; + executionRuntimeServices.addRoiValue(branchRoi); - fireEvent( - executionRuntimeServices, - branchRuntimeEnvironment, - ScoreLangConstants.EVENT_BRANCH_END, - "Parallel loop branch ended", - LanguageEventData.StepType.STEP, - nodeName, - initialBranchContext, - Pair.of(RuntimeConstants.BRANCH_RETURN_VALUES_KEY, executableReturnValues) - ); + fireEvent( + executionRuntimeServices, + branchRuntimeEnvironment, + ScoreLangConstants.EVENT_BRANCH_END, + "Parallel loop branch ended", + LanguageEventData.StepType.STEP, + nodeName, + initialBranchContext, + Pair.of(RuntimeConstants.BRANCH_RETURN_VALUES_KEY, executableReturnValues) + ); + } } } - private void checkExceptionInBranch(EndBranchDataContainer branch) { + private boolean checkExceptionInBranch(EndBranchDataContainer branch) { //first we check that no exception was thrown during the execution of the branch String branchException = branch.getException(); if (StringUtils.isNotEmpty(branchException)) { @@ -347,8 +416,9 @@ private void checkExceptionInBranch(EndBranchDataContainer branch) { branchId = branchExecutionRuntimeServices.getBranchId(); } logger.error("There was an error running branch: " + branchId + " Error is: " + branchException); - throw new RuntimeException(BRANCH_EXCEPTION_PREFIX + ": \n" + branchException); + return true; } + return false; } private void createBranch(RunEnvironment runEnv, @@ -356,7 +426,7 @@ private void createBranch(RunEnvironment runEnv, String refId, Long branchBeginStep) { Map branchContext = new HashMap<>(); - branchContext.put(ScoreLangConstants.RUN_ENV, runEnv); + branchContext.put(RUN_ENV, runEnv); executionRuntimeServices.addBranchForParallelLoop(branchBeginStep, refId, branchContext); } @@ -367,4 +437,23 @@ private Map convert(Map map) { } return result; } + + /** + * Returns the number of lanes to start executing from the system context, depending on the throttle size value. + */ + private int calculateNumberOfLanesToStart(int splitSize, Integer throttleSize) { + return throttleSize == null ? + splitSize : (splitSize % throttleSize == 0) ? throttleSize : (splitSize % throttleSize); + } + + private boolean isLastIteration(String remainingBranches) { + return isNotBlank(remainingBranches) && parseInt(remainingBranches) == 0; + } + + private void clearExecutionRuntimeForNextStep(ExecutionRuntimeServices executionRuntimeServices) { + executionRuntimeServices.removeRemainingBranches(); + executionRuntimeServices.removeParallelTemporaryContext(); + executionRuntimeServices.removeSplitData(); + executionRuntimeServices.removeThrottleSize(); + } } diff --git a/cloudslang-runtime/src/test/java/io/cloudslang/lang/runtime/steps/ParallelLoopStepsTest.java b/cloudslang-runtime/src/test/java/io/cloudslang/lang/runtime/steps/ParallelLoopStepsTest.java index a4d7de9f59..82fd995925 100644 --- a/cloudslang-runtime/src/test/java/io/cloudslang/lang/runtime/steps/ParallelLoopStepsTest.java +++ b/cloudslang-runtime/src/test/java/io/cloudslang/lang/runtime/steps/ParallelLoopStepsTest.java @@ -41,9 +41,7 @@ import io.cloudslang.score.lang.ExecutionRuntimeServices; import org.junit.Assert; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.springframework.beans.factory.annotation.Autowired; @@ -106,11 +104,11 @@ public void resetMocks() { public void testBranchesAreCreated() throws Exception { // prepare arguments ListLoopStatement parallelLoopStatement = new ListLoopStatement("varName", "expression", - new HashSet(), new HashSet(), true); + new HashSet(), new HashSet(), true); RunEnvironment runEnvironment = new RunEnvironment(); Map variables = new HashMap<>(); - Context context = new Context(variables,Collections.emptyMap()); + Context context = new Context(variables, Collections.emptyMap()); runEnvironment.getStack().pushContext(context); String nodeName = "nodeName"; @@ -119,38 +117,38 @@ public void testBranchesAreCreated() throws Exception { // prepare mocks ExecutionRuntimeServices executionRuntimeServices = mock(ExecutionRuntimeServices.class); List expectedSplitData = newArrayList(ValueFactory.create(1), - ValueFactory.create(2), ValueFactory.create(3)); + ValueFactory.create(2), ValueFactory.create(3)); when(parallelLoopBinding.bindParallelLoopList(eq(parallelLoopStatement), - eq(context), eq(runEnvironment.getSystemProperties()), eq(nodeName))) - .thenReturn(expectedSplitData); + eq(context), eq(runEnvironment.getSystemProperties()), eq(nodeName))) + .thenReturn(expectedSplitData); Long branchBeginStepId = 3L; // call method parallelLoopSteps.addBranches( - parallelLoopStatement, - runEnvironment, - executionRuntimeServices, - nodeName, - 1234L, - 5L, - branchBeginStepId, - refId + parallelLoopStatement, + runEnvironment, + executionRuntimeServices, + nodeName, + 1234L, + 5L, + branchBeginStepId, + refId ); // verify expected behaviour ArgumentCaptor branchContextArgumentCaptor = ArgumentCaptor.forClass(Map.class); //noinspection unchecked verify(executionRuntimeServices, times(3)) - .addBranchForParallelLoop(eq(branchBeginStepId), eq(refId), branchContextArgumentCaptor.capture()); + .addBranchForParallelLoop(eq(branchBeginStepId), eq(refId), branchContextArgumentCaptor.capture()); List branchContexts = branchContextArgumentCaptor.getAllValues(); List actualSplitData = newArrayList(); for (Map branchContext : branchContexts) { assertTrue("runtime environment not found in branch context", - branchContext.containsKey(ScoreLangConstants.RUN_ENV)); + branchContext.containsKey(ScoreLangConstants.RUN_ENV)); RunEnvironment branchRunEnvironment = (RunEnvironment) branchContext.get(ScoreLangConstants.RUN_ENV); Map branchVariables = - branchRunEnvironment.getStack().popContext().getImmutableViewOfVariables(); + branchRunEnvironment.getStack().popContext().getImmutableViewOfVariables(); actualSplitData.add(branchVariables.get("varName")); } Assert.assertEquals(expectedSplitData, actualSplitData); @@ -162,11 +160,11 @@ public void testBranchesAreCreated() throws Exception { public void testAddBranchesEventsAreFired() throws Exception { // prepare arguments ListLoopStatement parallelLoopStatement = new ListLoopStatement("varName", "expression", - new HashSet(), new HashSet(), true); + new HashSet(), new HashSet(), true); RunEnvironment runEnvironment = new RunEnvironment(); Map variables = new HashMap<>(); - Context context = new Context(variables,Collections.emptyMap()); + Context context = new Context(variables, Collections.emptyMap()); runEnvironment.getStack().pushContext(context); String nodeName = "nodeName"; @@ -175,34 +173,34 @@ public void testAddBranchesEventsAreFired() throws Exception { // prepare mocks ExecutionRuntimeServices executionRuntimeServices = mock(ExecutionRuntimeServices.class); List expectedSplitData = newArrayList(ValueFactory.create(1), - ValueFactory.create(2), ValueFactory.create(3)); + ValueFactory.create(2), ValueFactory.create(3)); when(parallelLoopBinding.bindParallelLoopList(eq(parallelLoopStatement), - eq(context), eq(runEnvironment.getSystemProperties()), eq(nodeName))) - .thenReturn(expectedSplitData); + eq(context), eq(runEnvironment.getSystemProperties()), eq(nodeName))) + .thenReturn(expectedSplitData); Long branchBeginStepId = 0L; // call method parallelLoopSteps.addBranches( - parallelLoopStatement, - runEnvironment, - executionRuntimeServices, - nodeName, - 1234L, - 5L, - branchBeginStepId, - refId + parallelLoopStatement, + runEnvironment, + executionRuntimeServices, + nodeName, + 1234L, + 5L, + branchBeginStepId, + refId ); // verify expected behaviour ArgumentCaptor eventTypeArgumentCaptor = ArgumentCaptor.forClass(String.class); verify(executionRuntimeServices, times(4)) - .addEvent(eventTypeArgumentCaptor.capture(), any(LanguageEventData.class)); + .addEvent(eventTypeArgumentCaptor.capture(), any(LanguageEventData.class)); List expectedEventTypesInOrder = newArrayList( - ScoreLangConstants.EVENT_SPLIT_BRANCHES, - ScoreLangConstants.EVENT_BRANCH_START, - ScoreLangConstants.EVENT_BRANCH_START, - ScoreLangConstants.EVENT_BRANCH_START + ScoreLangConstants.EVENT_SPLIT_BRANCHES, + ScoreLangConstants.EVENT_BRANCH_START, + ScoreLangConstants.EVENT_BRANCH_START, + ScoreLangConstants.EVENT_BRANCH_START ); List actualEventTypesInOrder = eventTypeArgumentCaptor.getAllValues(); Assert.assertEquals(expectedEventTypesInOrder, actualEventTypesInOrder); @@ -216,7 +214,7 @@ public void testJoinBranchesPublish() throws Exception { RunEnvironment runEnvironment = new RunEnvironment(); runEnvironment.getExecutionPath().down(); Map variables = new HashMap<>(); - Context context = new Context(variables,Collections.emptyMap()); + Context context = new Context(variables, Collections.emptyMap()); runEnvironment.getStack().pushContext(context); @@ -248,7 +246,7 @@ public void testJoinBranchesPublish() throws Exception { // call method parallelLoopSteps.joinBranches(runEnvironment, executionRuntimeServices, - stepPublishValues, stepNavigationValues, nodeName); + stepPublishValues, stepNavigationValues, nodeName, 4L); // verify expected behaviour ArgumentCaptor aggregateContextArgumentCaptor = @@ -282,11 +280,11 @@ public void testJoinBranchesNavigationAllBranchesSucced() throws Exception { RunEnvironment runEnvironment = new RunEnvironment(); runEnvironment.getExecutionPath().down(); Map variables = new HashMap<>(); - Context context = new Context(variables,Collections.emptyMap()); + Context context = new Context(variables, Collections.emptyMap()); runEnvironment.getStack().pushContext(context); final List stepPublishValues = - newArrayList(new Output("outputName", ValueFactory.create("outputExpression"))); + newArrayList(new Output("outputName", ValueFactory.create("outputExpression"))); Map stepNavigationValues = new HashMap<>(); ResultNavigation successNavigation = new ResultNavigation(0L, "CUSTOM_SUCCESS"); @@ -305,18 +303,19 @@ public void testJoinBranchesNavigationAllBranchesSucced() throws Exception { runtimeContext3.put("branch3Output", 3); ExecutionRuntimeServices executionRuntimeServices = createAndConfigureExecutionRuntimeServicesMock( - runtimeContext1, - runtimeContext2, - runtimeContext3 + runtimeContext1, + runtimeContext2, + runtimeContext3 ); // call method parallelLoopSteps.joinBranches( - runEnvironment, - executionRuntimeServices, - stepPublishValues, - stepNavigationValues, - nodeName + runEnvironment, + executionRuntimeServices, + stepPublishValues, + stepNavigationValues, + nodeName, + 4L ); // verify expected behaviour @@ -333,11 +332,11 @@ public void testJoinBranchesNavigationOneBranchFails() throws Exception { RunEnvironment runEnvironment = new RunEnvironment(); runEnvironment.getExecutionPath().down(); Map variables = new HashMap<>(); - Context context = new Context(variables,Collections.emptyMap()); + Context context = new Context(variables, Collections.emptyMap()); runEnvironment.getStack().pushContext(context); final List stepPublishValues = - newArrayList(new Output("outputName", ValueFactory.create("outputExpression"))); + newArrayList(new Output("outputName", ValueFactory.create("outputExpression"))); Map stepNavigationValues = new HashMap<>(); ResultNavigation successNavigation = new ResultNavigation(0L, "CUSTOM_SUCCESS"); @@ -356,27 +355,28 @@ public void testJoinBranchesNavigationOneBranchFails() throws Exception { runtimeContext3.put("branch3Output", 3); ReturnValues returnValues1 = new ReturnValues(new HashMap(), - ScoreLangConstants.SUCCESS_RESULT); + ScoreLangConstants.SUCCESS_RESULT); ReturnValues returnValues2 = new ReturnValues(new HashMap(), - ScoreLangConstants.FAILURE_RESULT); + ScoreLangConstants.FAILURE_RESULT); ReturnValues returnValues3 = new ReturnValues(new HashMap(), - ScoreLangConstants.SUCCESS_RESULT); + ScoreLangConstants.SUCCESS_RESULT); ExecutionRuntimeServices executionRuntimeServices = createAndConfigureExecutionRuntimeServicesMock( - runtimeContext1, - runtimeContext2, - runtimeContext3, - returnValues1, - returnValues2, - returnValues3 + runtimeContext1, + runtimeContext2, + runtimeContext3, + returnValues1, + returnValues2, + returnValues3 ); // call method parallelLoopSteps.joinBranches( - runEnvironment, - executionRuntimeServices, - stepPublishValues, - stepNavigationValues, - nodeName + runEnvironment, + executionRuntimeServices, + stepPublishValues, + stepNavigationValues, + nodeName, + 4L ); // verify expected behaviour @@ -393,11 +393,11 @@ public void testJoinBranchesEventsAreFired() throws Exception { RunEnvironment runEnvironment = new RunEnvironment(); runEnvironment.getExecutionPath().down(); Map variables = new HashMap<>(); - Context context = new Context(variables,Collections.emptyMap()); + Context context = new Context(variables, Collections.emptyMap()); runEnvironment.getStack().pushContext(context); final List stepPublishValues = - newArrayList(new Output("outputName", ValueFactory.create("outputExpression"))); + newArrayList(new Output("outputName", ValueFactory.create("outputExpression"))); Map stepNavigationValues = new HashMap<>(); ResultNavigation successNavigation = new ResultNavigation(0L, ScoreLangConstants.SUCCESS_RESULT); @@ -416,31 +416,32 @@ public void testJoinBranchesEventsAreFired() throws Exception { runtimeContext3.put("branch3Output", 3); ExecutionRuntimeServices executionRuntimeServices = createAndConfigureExecutionRuntimeServicesMock( - runtimeContext1, - runtimeContext2, - runtimeContext3 + runtimeContext1, + runtimeContext2, + runtimeContext3 ); // call method parallelLoopSteps.joinBranches( - runEnvironment, - executionRuntimeServices, - stepPublishValues, - stepNavigationValues, - nodeName + runEnvironment, + executionRuntimeServices, + stepPublishValues, + stepNavigationValues, + nodeName, + 4L ); // verify expected behaviour ArgumentCaptor eventTypeArgumentCaptor = ArgumentCaptor.forClass(String.class); verify(executionRuntimeServices, times(5)) - .addEvent(eventTypeArgumentCaptor.capture(), any(LanguageEventData.class)); + .addEvent(eventTypeArgumentCaptor.capture(), any(LanguageEventData.class)); List expectedEventTypesInOrder = newArrayList( - ScoreLangConstants.EVENT_BRANCH_END, - ScoreLangConstants.EVENT_BRANCH_END, - ScoreLangConstants.EVENT_BRANCH_END, - ScoreLangConstants.EVENT_JOIN_BRANCHES_START, - ScoreLangConstants.EVENT_JOIN_BRANCHES_END + ScoreLangConstants.EVENT_BRANCH_END, + ScoreLangConstants.EVENT_BRANCH_END, + ScoreLangConstants.EVENT_BRANCH_END, + ScoreLangConstants.EVENT_JOIN_BRANCHES_START, + ScoreLangConstants.EVENT_JOIN_BRANCHES_END ); List actualEventTypesInOrder = eventTypeArgumentCaptor.getAllValues(); Assert.assertEquals(expectedEventTypesInOrder, actualEventTypesInOrder); @@ -463,42 +464,40 @@ public void testExceptionIsCapturedFromBranches() throws Exception { executionRuntimeServices, new ArrayList(0), new HashMap(), - "nodeName" + "nodeName", + 4L )); - Assert.assertEquals("Error running: 'nodeName': \n" + - "Error running branch: \n" + - "Exception details placeholder", - exception.getMessage()); + Assert.assertNotNull(exception); } private ExecutionRuntimeServices createAndConfigureExecutionRuntimeServicesMock( - Map runtimeContext1, - Map runtimeContext2, - Map runtimeContext3) { + Map runtimeContext1, + Map runtimeContext2, + Map runtimeContext3) { ReturnValues returnValues1 = new ReturnValues(new HashMap(), - ScoreLangConstants.SUCCESS_RESULT); + ScoreLangConstants.SUCCESS_RESULT); ReturnValues returnValues2 = new ReturnValues(new HashMap(), - ScoreLangConstants.SUCCESS_RESULT); + ScoreLangConstants.SUCCESS_RESULT); ReturnValues returnValues3 = new ReturnValues(new HashMap(), - ScoreLangConstants.SUCCESS_RESULT); + ScoreLangConstants.SUCCESS_RESULT); return createAndConfigureExecutionRuntimeServicesMock( - runtimeContext1, - runtimeContext2, - runtimeContext3, - returnValues1, - returnValues2, - returnValues3 + runtimeContext1, + runtimeContext2, + runtimeContext3, + returnValues1, + returnValues2, + returnValues3 ); } private ExecutionRuntimeServices createAndConfigureExecutionRuntimeServicesMock( - Map runtimeContext1, - Map runtimeContext2, - Map runtimeContext3, - ReturnValues returnValues1, - ReturnValues returnValues2, - ReturnValues returnValues3) { + Map runtimeContext1, + Map runtimeContext2, + Map runtimeContext3, + ReturnValues returnValues1, + ReturnValues returnValues2, + ReturnValues returnValues3) { final ExecutionRuntimeServices executionRuntimeServices = mock(ExecutionRuntimeServices.class); final Map branchContext1 = new HashMap<>(); @@ -521,12 +520,13 @@ private ExecutionRuntimeServices createAndConfigureExecutionRuntimeServicesMock( branchContext3.put(ScoreLangConstants.RUN_ENV, branchRuntimeEnvironment3); List branchesContainers = newArrayList( - new EndBranchDataContainer(branchContext1, new HashMap(), null), - new EndBranchDataContainer(branchContext2, new HashMap(), null), - new EndBranchDataContainer(branchContext3, new HashMap(), null) + new EndBranchDataContainer(branchContext1, new HashMap(), null), + new EndBranchDataContainer(branchContext2, new HashMap(), null), + new EndBranchDataContainer(branchContext3, new HashMap(), null) ); when(executionRuntimeServices.getFinishedChildBranchesData()).thenReturn(branchesContainers); when(executionRuntimeServices.getLevelParallelism()).thenReturn(2); + when(executionRuntimeServices.getRemainingBranches()).thenReturn("0"); return executionRuntimeServices; } @@ -535,19 +535,20 @@ private Context createContext(Map runtimeContext) { for (Map.Entry entry : runtimeContext.entrySet()) { context.put(entry.getKey(), ValueFactory.create(entry.getValue())); } - return new Context(context,null); + return new Context(context, null); } private ExecutionRuntimeServices createExecutionRuntimeServicesMockWithBranchException() { ExecutionRuntimeServices executionRuntimeServices = mock(ExecutionRuntimeServices.class); List branchesContainers = newArrayList( - new EndBranchDataContainer( - new HashMap(), - new HashMap(), - BRANCH_EXCEPTION_MESSAGE) + new EndBranchDataContainer( + new HashMap(), + new HashMap(), + BRANCH_EXCEPTION_MESSAGE) ); when(executionRuntimeServices.getFinishedChildBranchesData()).thenReturn(branchesContainers); when(executionRuntimeServices.getLevelParallelism()).thenReturn(1); + when(executionRuntimeServices.getRemainingBranches()).thenReturn("0"); return executionRuntimeServices; }