diff --git a/CHANGELOG.md b/CHANGELOG.md index d15a7d8b3..7a2c0552e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ### Enhancements - Add guardrails to default use case params ([#658](https://github.com/opensearch-project/flow-framework/pull/658)) - Allow strings for boolean workflow step parameters ([#671](https://github.com/opensearch-project/flow-framework/pull/671)) +- Add optional delay parameter to no-op step ([#674](https://github.com/opensearch-project/flow-framework/pull/674)) ### Bug Fixes - Reset workflow state to initial state after successful deprovision ([#635](https://github.com/opensearch-project/flow-framework/pull/635)) diff --git a/src/main/java/org/opensearch/flowframework/common/CommonValue.java b/src/main/java/org/opensearch/flowframework/common/CommonValue.java index ac40e5f8e..c3a7afb51 100644 --- a/src/main/java/org/opensearch/flowframework/common/CommonValue.java +++ b/src/main/java/org/opensearch/flowframework/common/CommonValue.java @@ -170,6 +170,8 @@ private CommonValue() {} public static final String CONFIGURATIONS = "configurations"; /** Guardrails field */ public static final String GUARDRAILS_FIELD = "guardrails"; + /** Delay field */ + public static final String DELAY_FIELD = "delay"; /* * Constants associated with resource provisioning / state diff --git a/src/main/java/org/opensearch/flowframework/workflow/NoOpStep.java b/src/main/java/org/opensearch/flowframework/workflow/NoOpStep.java index e93aba1cc..af2aea1e3 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/NoOpStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/NoOpStep.java @@ -9,8 +9,17 @@ package org.opensearch.flowframework.workflow; import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.FutureUtils; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.flowframework.exception.WorkflowStepException; +import org.opensearch.flowframework.util.ParseUtils; +import java.util.Collections; import java.util.Map; +import java.util.Set; + +import static org.opensearch.flowframework.common.CommonValue.DELAY_FIELD; /** * A workflow step that does nothing. May be used for synchronizing other actions. @@ -32,6 +41,29 @@ public PlainActionFuture execute( Map params ) { PlainActionFuture future = PlainActionFuture.newFuture(); + + Set requiredKeys = Collections.emptySet(); + Set optionalKeys = Set.of(DELAY_FIELD); + + try { + Map inputs = ParseUtils.getInputsFromPreviousSteps( + requiredKeys, + optionalKeys, + currentNodeInputs, + outputs, + previousNodeInputs, + params + ); + if (inputs.containsKey(DELAY_FIELD)) { + long delay = TimeValue.parseTimeValue(inputs.get(DELAY_FIELD).toString(), DELAY_FIELD).millis(); + Thread.sleep(delay); + } + } catch (IllegalArgumentException iae) { + throw new WorkflowStepException(iae.getMessage(), RestStatus.BAD_REQUEST); + } catch (InterruptedException e) { + FutureUtils.cancel(future); + } + future.onResponse(WorkflowData.EMPTY); return future; } diff --git a/src/test/java/org/opensearch/flowframework/workflow/NoOpStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/NoOpStepTests.java index 21141003f..4aabee215 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/NoOpStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/NoOpStepTests.java @@ -9,10 +9,14 @@ package org.opensearch.flowframework.workflow; import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.flowframework.exception.WorkflowStepException; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; import java.util.Collections; +import java.util.Map; + +import static org.opensearch.flowframework.common.CommonValue.DELAY_FIELD; public class NoOpStepTests extends OpenSearchTestCase { @@ -28,4 +32,32 @@ public void testNoOpStep() throws IOException { ); assertTrue(future.isDone()); } + + public void testNoOpStepDelay() throws IOException, InterruptedException { + NoOpStep noopStep = new NoOpStep(); + WorkflowData delayData = new WorkflowData(Map.of(DELAY_FIELD, "1s"), null, null); + + long start = System.nanoTime(); + PlainActionFuture future = noopStep.execute( + "nodeId", + delayData, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + assertTrue(future.isDone()); + // Sleep isn't exactly accurate so leave 100ms of roundoff + assertTrue(System.nanoTime() - start > 900_000_000L); + } + + public void testNoOpStepParse() throws IOException { + NoOpStep noopStep = new NoOpStep(); + WorkflowData delayData = new WorkflowData(Map.of(DELAY_FIELD, "foo"), null, null); + + Exception ex = assertThrows( + WorkflowStepException.class, + () -> noopStep.execute("nodeId", delayData, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()) + ); + assertEquals("failed to parse setting [delay] with value [foo] as a time value: unit is missing or unrecognized", ex.getMessage()); + } }