Skip to content

Commit

Permalink
Address code review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Sep 19, 2023
1 parent 136ceaa commit d1cbcbd
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 5 deletions.
2 changes: 1 addition & 1 deletion formatter/formatting.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ allprojects {

trimTrailingWhitespace()
endWithNewline()
indentWithSpaces(4)
indentWithSpaces()
}
format("license", {
licenseHeaderFile("${rootProject.file("formatter/license-header.txt")}", "package ");
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/demo/CreateIndexWorkflowStep.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public CreateIndexWorkflowStep() {
@Override
public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {
CompletableFuture<WorkflowData> future = new CompletableFuture<>();
// TODO we will be passing a thread pool to this object when it's instantiated
// we should either add the generic executor from that pool to this call
// or use executorservice.submit or any of various threading options
// https://github.com/opensearch-project/opensearch-ai-flow-framework/issues/42
CompletableFuture.runAsync(() -> {
String inputIndex = null;
boolean first = true;
Expand All @@ -61,8 +65,6 @@ public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {
} catch (InterruptedException e) {}
// Simulate response of created index
CreateIndexResponse response = new CreateIndexResponse(true, true, inputIndex);
// OLD UNSCALABLE WAY: future.complete(new CreateIndexResponseData(response));
// Better way with an anonymous class:
future.complete(new WorkflowData() {
@Override
public Map<String, Object> getContent() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,23 @@ void setPredecessors(Set<ProcessNode> predecessors) {
*/
public CompletableFuture<WorkflowData> execute() {
this.future = new CompletableFuture<>();
// TODO this class will be instantiated with the OpenSearch thread pool (or one for tests!)
// the generic executor from that pool should be passed to this runAsync call
// https://github.com/opensearch-project/opensearch-ai-flow-framework/issues/42
CompletableFuture.runAsync(() -> {
List<CompletableFuture<WorkflowData>> predFutures = predecessors.stream().map(p -> p.getFuture()).collect(Collectors.toList());
if (!predecessors.isEmpty()) {
CompletableFuture<Void> waitForPredecessors = CompletableFuture.allOf(predFutures.toArray(new CompletableFuture<?>[0]));

Check warning on line 131 in src/main/java/org/opensearch/flowframework/template/ProcessNode.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/template/ProcessNode.java#L131

Added line #L131 was not covered by tests
try {
// We need timeouts to be part of the user template or in settings
// https://github.com/opensearch-project/opensearch-ai-flow-framework/issues/45
waitForPredecessors.orTimeout(30, TimeUnit.SECONDS).get();
} catch (InterruptedException | ExecutionException e) {
handleException(e);
return;
}

Check warning on line 139 in src/main/java/org/opensearch/flowframework/template/ProcessNode.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/template/ProcessNode.java#L135-L139

Added lines #L135 - L139 were not covered by tests
}
logger.debug(">>> Starting {}.", this.id);
logger.info(">>> Starting {}.", this.id);
// get the input data from predecessor(s)
List<WorkflowData> input = new ArrayList<WorkflowData>();
input.add(this.input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ public static List<ProcessNode> parseJsonGraphToSequence(String json, Map<String
String nodeId = nodeObject.get(NODE_ID).getAsString();
// The below steps will be replaced by a generator class that instantiates a WorkflowStep
// based on user_input data from the template.
// https://github.com/opensearch-project/opensearch-ai-flow-framework/issues/41
WorkflowStep workflowStep = workflowSteps.get(nodeId);
// temporary demo POC of getting from a request to input data
// this will be refactored into something pulling from user template
// this will be refactored into something pulling from user template as part of the above issue
WorkflowData inputData = WorkflowData.EMPTY;
if (List.of("create_index", "create_another_index").contains(nodeId)) {
CreateIndexRequest request = new CreateIndexRequest(nodeObject.get("index_name").getAsString());
Expand Down

0 comments on commit d1cbcbd

Please sign in to comment.