Skip to content

Commit

Permalink
Change WorkflowData from interface to class (#54)
Browse files Browse the repository at this point in the history
* Change WorkflowData from interface to class

Signed-off-by: Daniel Widdis <[email protected]>

* Disable flaky test

Signed-off-by: Daniel Widdis <[email protected]>

* Rebase with changes from #38

Signed-off-by: Daniel Widdis <[email protected]>

---------

Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis authored Sep 25, 2023
1 parent a97b7d0 commit a530739
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 60 deletions.
7 changes: 1 addition & 6 deletions src/main/java/demo/CreateIndexWorkflowStep.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,7 @@ public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {
} catch (InterruptedException e) {}
// Simulate response of created index
CreateIndexResponse response = new CreateIndexResponse(true, true, inputIndex);
future.complete(new WorkflowData() {
@Override
public Map<String, Object> getContent() {
return Map.of("index", response.index());
}
});
future.complete(new WorkflowData(Map.of("index", response.index())));
});

return future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,10 @@ public static List<ProcessNode> parseJsonGraphToSequence(String json, Map<String
WorkflowData inputData = WorkflowData.EMPTY;
if (List.of("create_index", "create_another_index").contains(nodeId)) {
CreateIndexRequest request = new CreateIndexRequest(nodeObject.get("index_name").getAsString());
inputData = new WorkflowData() {

@Override
public Map<String, Object> getContent() {
// See CreateIndexRequest ParseFields for source of content keys needed
return Map.of("mappings", request.mappings(), "settings", request.settings(), "aliases", request.aliases());
}

@Override
public Map<String, String> getParams() {
// See RestCreateIndexAction for source of param keys needed
return Map.of("index", request.index());
}

};
inputData = new WorkflowData(
Map.of("mappings", request.mappings(), "settings", request.settings(), "aliases", request.aliases()),
Map.of("index", request.index())
);
}
nodes.add(new ProcessNode(nodeId, workflowStep, inputData));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,7 @@ public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {
@Override
public void onResponse(CreateIndexResponse createIndexResponse) {
logger.info("created index: {}", createIndexResponse.index());
future.complete(new WorkflowData() {
@Override
public Map<String, Object> getContent() {
return Map.of("index-name", createIndexResponse.index());
}
});
future.complete(new WorkflowData(Map.of("index-name", createIndexResponse.index())));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,53 @@
import java.util.Map;

/**
* Interface representing data provided as input to, and produced as output from, {@link WorkflowStep}s.
* Class encapsulating data provided as input to, and produced as output from, {@link WorkflowStep}s.
*/
public interface WorkflowData {
public class WorkflowData {

/**
* An object representing no data, useful when a workflow step has no required input or output.
*/
WorkflowData EMPTY = new WorkflowData() {
};
public static WorkflowData EMPTY = new WorkflowData();

private final Map<String, Object> content;
private final Map<String, String> params;

private WorkflowData() {
this(Collections.emptyMap(), Collections.emptyMap());
}

/**
* Instantiate this object with content and empty params.
* @param content The content map
*/
public WorkflowData(Map<String, Object> content) {
this(content, Collections.emptyMap());
}

/**
* Instantiate this object with content and params.
* @param content The content map
* @param params The params map
*/
public WorkflowData(Map<String, Object> content, Map<String, String> params) {
this.content = Map.copyOf(content);
this.params = Map.copyOf(params);
}

/**
* Accesses a map containing the content of the workflow step. This represents the data associated with a Rest API request.
* @return the content of this step.
* Returns a map which represents the content associated with a Rest API request or response.
* @return the content of this data.
*/
default Map<String, Object> getContent() {
return Collections.emptyMap();
public Map<String, Object> getContent() {
return this.content;
};

/**
* Accesses a map containing the params of this workflow step. This represents the params associated with a Rest API request, parsed from the URI.
* @return the params of this step.
* Returns a map represents the params associated with a Rest API request, parsed from the URI.
* @return the params of this data.
*/
default Map<String, String> getParams() {
return Collections.emptyMap();
public Map<String, String> getParams() {
return this.params;
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

@ThreadLeakScope(Scope.NONE)
public class ProcessNodeTests extends OpenSearchTestCase {
Expand Down Expand Up @@ -49,12 +48,13 @@ public String getName() {
assertEquals(Collections.emptySet(), nodeA.getPredecessors());
assertEquals("A", nodeA.toString());

// TODO: Once we can get OpenSearch Thread Pool for this execute method, create an IT and don't test execute here
CompletableFuture<WorkflowData> f = nodeA.execute();
assertEquals(f, nodeA.getFuture());
f.orTimeout(5, TimeUnit.SECONDS);
assertTrue(f.isDone());
assertEquals(WorkflowData.EMPTY, f.get());
// TODO: This test is flaky on Windows. Disabling until thread pool is integrated
// https://github.com/opensearch-project/opensearch-ai-flow-framework/issues/42
// CompletableFuture<WorkflowData> f = nodeA.execute();
// assertEquals(f, nodeA.future());
// f.orTimeout(5, TimeUnit.SECONDS);
// assertTrue(f.isDone());
// assertEquals(WorkflowData.EMPTY, f.get());

ProcessNode nodeB = new ProcessNode("B", null);
assertNotEquals(nodeA, nodeB);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,7 @@ public class CreateIndexStepTests extends OpenSearchTestCase {
public void setUp() throws Exception {
super.setUp();

inputData = new WorkflowData() {

@Override
public Map<String, Object> getContent() {
return Map.ofEntries(Map.entry("index-name", "demo"), Map.entry("type", "knn"));
}

};

inputData = new WorkflowData(Map.ofEntries(Map.entry("index-name", "demo"), Map.entry("type", "knn")));
client = mock(Client.class);
adminClient = mock(AdminClient.class);
indicesAdminClient = mock(IndicesAdminClient.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import org.opensearch.test.OpenSearchTestCase;

import java.util.Collections;
import java.util.Map;

public class WorkflowDataTests extends OpenSearchTestCase {

Expand All @@ -20,9 +20,19 @@ public void setUp() throws Exception {
}

public void testWorkflowData() {
WorkflowData data = new WorkflowData() {
};
assertEquals(Collections.emptyMap(), data.getParams());
assertEquals(Collections.emptyMap(), data.getContent());

WorkflowData empty = WorkflowData.EMPTY;
assertTrue(empty.getParams().isEmpty());
assertTrue(empty.getContent().isEmpty());

Map<String, Object> expectedContent = Map.of("baz", new String[] { "qux", "quxx" });
WorkflowData contentOnly = new WorkflowData(expectedContent);
assertTrue(contentOnly.getParams().isEmpty());
assertEquals(expectedContent, contentOnly.getContent());

Map<String, String> expectedParams = Map.of("foo", "bar");
WorkflowData contentAndParams = new WorkflowData(expectedContent, expectedParams);
assertEquals(expectedParams, contentAndParams.getParams());
assertEquals(expectedContent, contentAndParams.getContent());
}
}

0 comments on commit a530739

Please sign in to comment.