Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] GET API to fetch workflow-step.json #397

Merged
merged 1 commit into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.opensearch.flowframework.rest.RestDeprovisionWorkflowAction;
import org.opensearch.flowframework.rest.RestGetWorkflowAction;
import org.opensearch.flowframework.rest.RestGetWorkflowStateAction;
import org.opensearch.flowframework.rest.RestGetWorkflowStepAction;
import org.opensearch.flowframework.rest.RestProvisionWorkflowAction;
import org.opensearch.flowframework.rest.RestSearchWorkflowAction;
import org.opensearch.flowframework.rest.RestSearchWorkflowStateAction;
Expand All @@ -43,6 +44,8 @@
import org.opensearch.flowframework.transport.GetWorkflowAction;
import org.opensearch.flowframework.transport.GetWorkflowStateAction;
import org.opensearch.flowframework.transport.GetWorkflowStateTransportAction;
import org.opensearch.flowframework.transport.GetWorkflowStepAction;
import org.opensearch.flowframework.transport.GetWorkflowStepTransportAction;
import org.opensearch.flowframework.transport.GetWorkflowTransportAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowTransportAction;
Expand Down Expand Up @@ -145,6 +148,7 @@ public List<RestHandler> getRestHandlers(
new RestSearchWorkflowAction(flowFrameworkSettings),
new RestGetWorkflowStateAction(flowFrameworkSettings),
new RestGetWorkflowAction(flowFrameworkSettings),
new RestGetWorkflowStepAction(flowFrameworkSettings),
new RestSearchWorkflowStateAction(flowFrameworkSettings)
);
}
Expand All @@ -159,6 +163,7 @@ public List<RestHandler> getRestHandlers(
new ActionHandler<>(SearchWorkflowAction.INSTANCE, SearchWorkflowTransportAction.class),
new ActionHandler<>(GetWorkflowStateAction.INSTANCE, GetWorkflowStateTransportAction.class),
new ActionHandler<>(GetWorkflowAction.INSTANCE, GetWorkflowTransportAction.class),
new ActionHandler<>(GetWorkflowStepAction.INSTANCE, GetWorkflowStepTransportAction.class),
new ActionHandler<>(SearchWorkflowStateAction.INSTANCE, SearchWorkflowStateTransportAction.class)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.exception.FlowFrameworkException;

Expand All @@ -24,7 +26,7 @@
/**
* This represents an object of workflow steps json which maps each step to expected inputs and outputs
*/
public class WorkflowStepValidator {
public class WorkflowStepValidator implements ToXContentObject {

private static final Logger logger = LogManager.getLogger(WorkflowStepValidator.class);

Expand Down Expand Up @@ -140,4 +142,32 @@
public TimeValue getTimeout() {
return timeout;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();
xContentBuilder.startArray(INPUTS_FIELD);

Check warning on line 149 in src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java#L148-L149

Added lines #L148 - L149 were not covered by tests
for (String input : this.inputs) {
xContentBuilder.value(input);
}
xContentBuilder.endArray();

Check warning on line 153 in src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java#L151-L153

Added lines #L151 - L153 were not covered by tests

xContentBuilder.startArray(OUTPUTS_FIELD);

Check warning on line 155 in src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java#L155

Added line #L155 was not covered by tests
for (String output : this.outputs) {
xContentBuilder.value(output);
}
xContentBuilder.endArray();

Check warning on line 159 in src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java#L157-L159

Added lines #L157 - L159 were not covered by tests

xContentBuilder.startArray(REQUIRED_PLUGINS);

Check warning on line 161 in src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java#L161

Added line #L161 was not covered by tests
for (String rp : this.requiredPlugins) {
xContentBuilder.value(rp);
}
xContentBuilder.endArray();

Check warning on line 165 in src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java#L163-L165

Added lines #L163 - L165 were not covered by tests

if (timeout != null) {
xContentBuilder.field(TIMEOUT, timeout);

Check warning on line 168 in src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java#L168

Added line #L168 was not covered by tests
}

return xContentBuilder.endObject();

Check warning on line 171 in src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/WorkflowStepValidator.java#L171

Added line #L171 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
*/
package org.opensearch.flowframework.model;

import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.util.ParseUtils;

Expand All @@ -20,7 +23,7 @@
/**
* This represents the workflow steps json which maps each step to expected inputs and outputs
*/
public class WorkflowValidator {
public class WorkflowValidator implements ToXContentObject {

private Map<String, WorkflowStepValidator> workflowStepValidators;

Expand Down Expand Up @@ -63,6 +66,20 @@
return parse(ParseUtils.jsonToParser(json));
}

/**
* Output this object in a compact JSON string.
*
* @return a JSON representation of the template.
*/
public String toJson() {
try {
XContentBuilder builder = JsonXContent.contentBuilder();
return this.toXContent(builder, EMPTY_PARAMS).toString();
} catch (IOException e) {
return "{\"error\": \"couldn't create JSON: " + e.getMessage() + "\"}";

Check warning on line 79 in src/main/java/org/opensearch/flowframework/model/WorkflowValidator.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/WorkflowValidator.java#L76-L79

Added lines #L76 - L79 were not covered by tests
}
}

/**
* Get the map of WorkflowStepValidators
* @return the map of WorkflowStepValidators
Expand All @@ -71,4 +88,8 @@
return Map.copyOf(this.workflowStepValidators);
}

@Override
public XContentBuilder toXContent(XContentBuilder xContentBuilder, Params params) throws IOException {
return xContentBuilder.map(workflowStepValidators);

Check warning on line 93 in src/main/java/org/opensearch/flowframework/model/WorkflowValidator.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/WorkflowValidator.java#L93

Added line #L93 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.rest;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.client.node.NodeClient;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.transport.GetWorkflowStepAction;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;

import java.io.IOException;
import java.util.List;
import java.util.Locale;

import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;

/**
* Rest Action to facilitate requests to get the workflow steps
*/
public class RestGetWorkflowStepAction extends BaseRestHandler {

private static final String GET_WORKFLOW_STEP_ACTION = "get_workflow_step";
private static final Logger logger = LogManager.getLogger(RestGetWorkflowStepAction.class);
private FlowFrameworkSettings flowFrameworkSettings;

/**
* Instantiates a new RestGetWorkflowStepAction
* @param flowFrameworkSettings Whether this API is enabled
*/
public RestGetWorkflowStepAction(FlowFrameworkSettings flowFrameworkSettings) {
this.flowFrameworkSettings = flowFrameworkSettings;
}

@Override
public String getName() {
return GET_WORKFLOW_STEP_ACTION;
}

@Override
public List<Route> routes() {
return List.of(new Route(RestRequest.Method.GET, String.format(Locale.ROOT, "%s/%s", WORKFLOW_URI, "_steps")));
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
try {
if (!flowFrameworkSettings.isFlowFrameworkEnabled()) {
throw new FlowFrameworkException(
"This API is disabled. To enable it, update the setting [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.",
RestStatus.FORBIDDEN
);
}

ActionRequest request = new ActionRequest() {
@Override
public ActionRequestValidationException validate() {
return null;

Check warning on line 75 in src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java#L75

Added line #L75 was not covered by tests
}
};
return channel -> client.execute(GetWorkflowStepAction.INSTANCE, request, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {

Check warning on line 81 in src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java#L79-L81

Added lines #L79 - L81 were not covered by tests
try {
FlowFrameworkException ex = exception instanceof FlowFrameworkException
? (FlowFrameworkException) exception
: new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception));
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));

Check warning on line 87 in src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java#L84-L87

Added lines #L84 - L87 were not covered by tests

} catch (IOException e) {
logger.error("Failed to send back get workflow step exception", e);
channel.sendResponse(new BytesRestResponse(ExceptionsHelper.status(e), e.getMessage()));
}
}));

Check warning on line 93 in src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestGetWorkflowStepAction.java#L89-L93

Added lines #L89 - L93 were not covered by tests

} catch (FlowFrameworkException ex) {
return channel -> channel.sendResponse(
new BytesRestResponse(ex.getRestStatus(), ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.transport;

import org.opensearch.action.ActionType;

import static org.opensearch.flowframework.common.CommonValue.TRANSPORT_ACTION_NAME_PREFIX;

/**
* External Action for public facing RestGetWorkflowStepAction
*/
public class GetWorkflowStepAction extends ActionType<GetWorkflowStepResponse> {

/** The name of this action */
public static final String NAME = TRANSPORT_ACTION_NAME_PREFIX + "workflow_step/get";
/** An instance of this action */
public static final GetWorkflowStepAction INSTANCE = new GetWorkflowStepAction();

/**
* Instantiates this class
*/
public GetWorkflowStepAction() {
super(NAME, GetWorkflowStepResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.transport;

import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.model.WorkflowValidator;

import java.io.IOException;

/**
* Transport Response from getting workflow step
*/
public class GetWorkflowStepResponse extends ActionResponse implements ToXContentObject {

private WorkflowValidator workflowValidator;

/**
* Instantiates a new GetWorkflowStepResponse from an input stream
* @param in the input stream to read from
* @throws IOException if the workflow json cannot be read from the input stream
*/
public GetWorkflowStepResponse(StreamInput in) throws IOException {
super(in);
this.workflowValidator = WorkflowValidator.parse(in.readString());
}

Check warning on line 35 in src/main/java/org/opensearch/flowframework/transport/GetWorkflowStepResponse.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/GetWorkflowStepResponse.java#L33-L35

Added lines #L33 - L35 were not covered by tests

/**
* Instantiates a new GetWorkflowStepResponse
* @param workflowValidator the workflow validator
*/
public GetWorkflowStepResponse(WorkflowValidator workflowValidator) {
this.workflowValidator = workflowValidator;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(workflowValidator.toJson());
}

Check warning on line 48 in src/main/java/org/opensearch/flowframework/transport/GetWorkflowStepResponse.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/GetWorkflowStepResponse.java#L47-L48

Added lines #L47 - L48 were not covered by tests

@Override
public XContentBuilder toXContent(XContentBuilder xContentBuilder, Params params) throws IOException {
return this.workflowValidator.toXContent(xContentBuilder, params);

Check warning on line 52 in src/main/java/org/opensearch/flowframework/transport/GetWorkflowStepResponse.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/GetWorkflowStepResponse.java#L52

Added line #L52 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.transport;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.model.WorkflowValidator;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

/**
* Transport action to retrieve a workflow step json
*/
public class GetWorkflowStepTransportAction extends HandledTransportAction<ActionRequest, GetWorkflowStepResponse> {

private final Logger logger = LogManager.getLogger(GetWorkflowStepTransportAction.class);

/**
* Instantiates a new GetWorkflowStepTransportAction instance
* @param transportService the transport service
* @param actionFilters action filters
*/
@Inject
public GetWorkflowStepTransportAction(TransportService transportService, ActionFilters actionFilters) {
super(GetWorkflowStepAction.NAME, transportService, actionFilters, WorkflowRequest::new);
}

@Override
protected void doExecute(Task task, ActionRequest request, ActionListener<GetWorkflowStepResponse> listener) {
try {
listener.onResponse(new GetWorkflowStepResponse(WorkflowValidator.parse("mappings/workflow-steps.json")));
} catch (Exception e) {
logger.error("Failed to retrieve workflow step json.", e);
listener.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));

Check warning on line 47 in src/main/java/org/opensearch/flowframework/transport/GetWorkflowStepTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/GetWorkflowStepTransportAction.java#L45-L47

Added lines #L45 - L47 were not covered by tests
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ public WorkflowStepFactory(
FlowFrameworkSettings flowFrameworkSettings
) {
stepMap.put(NoOpStep.NAME, NoOpStep::new);
stepMap.put(CreateIndexStep.NAME, () -> new CreateIndexStep(clusterService, client, flowFrameworkIndicesHandler));
stepMap.put(CreateIngestPipelineStep.NAME, () -> new CreateIngestPipelineStep(client, flowFrameworkIndicesHandler));
stepMap.put(
RegisterLocalCustomModelStep.NAME,
() -> new RegisterLocalCustomModelStep(threadPool, mlClient, flowFrameworkIndicesHandler, flowFrameworkSettings)
Expand Down
Loading
Loading