Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throw the correct error message in status API for WorkflowSteps #676

Merged
merged 3 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
- Reset workflow state to initial state after successful deprovision ([#635](https://github.com/opensearch-project/flow-framework/pull/635))
- Silently ignore content on APIs that don't require it ([#639](https://github.com/opensearch-project/flow-framework/pull/639))
- Hide user and credential field from search response ([#680](https://github.com/opensearch-project/flow-framework/pull/680))
- Throw the correct error message in status API for WorkflowSteps ([#676](https://github.com/opensearch-project/flow-framework/pull/676))

### Infrastructure
### Documentation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
*/
package org.opensearch.flowframework.exception;

import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchParseException;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
Expand Down Expand Up @@ -64,4 +67,19 @@ public RestStatus getRestStatus() {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().field("error", this.getMessage()).endObject();
}

/**
* Getter for safe exceptions
* @param ex exception
* @return exception if safe
*/
public static Exception getSafeException(Exception ex) {
if (ex instanceof IllegalArgumentException
|| ex instanceof OpenSearchStatusException
|| ex instanceof OpenSearchParseException
|| (ex instanceof OpenSearchException && ex.getCause() instanceof OpenSearchParseException)) {
return ex;
}
return null;
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ private void executeWorkflow(List<ProcessNode> workflowSequence, String workflow
status = ExceptionsHelper.status(ex);
}
logger.error("Provisioning failed for workflow {} during step {}.", workflowId, currentStepId, ex);
String errorMessage = (ex.getCause() == null ? ex.getClass().getName() : ex.getCause().getClass().getName())
String errorMessage = (ex.getCause() == null ? ex.getMessage() : ex.getCause().getClass().getName())
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
+ " during step "
+ currentStepId
+ ", restStatus: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.common.WorkflowResources.PIPELINE_ID;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to create either a search or ingest pipeline
Expand Down Expand Up @@ -137,8 +138,9 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) {
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed step " + pipelineToBeCreated;
public void onFailure(Exception ex) {
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed step " + pipelineToBeCreated : e.getMessage());
logger.error(errorMessage, e);
createPipelineFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import static org.opensearch.flowframework.common.CommonValue.VERSION_FIELD;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_GROUP_ID;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Abstract local model registration step
Expand Down Expand Up @@ -215,9 +216,10 @@ public PlainActionFuture<WorkflowData> execute(
}, exception -> { registerLocalModelFuture.onFailure(exception); })
);
}, exception -> {
String errorMessage = "Failed to register local model in step " + currentNodeId;
logger.error(errorMessage, exception);
registerLocalModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(exception)));
Exception e = getSafeException(exception);
String errorMessage = (e == null ? "Failed to register local model in step " + currentNodeId : e.getMessage());
logger.error(errorMessage, e);
registerLocalModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}));
} catch (IllegalArgumentException iae) {
registerLocalModelFuture.onFailure(new WorkflowStepException(iae.getMessage(), RestStatus.BAD_REQUEST));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static org.opensearch.flowframework.common.CommonValue.PROTOCOL_FIELD;
import static org.opensearch.flowframework.common.CommonValue.VERSION_FIELD;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;
import static org.opensearch.flowframework.util.ParseUtils.getStringToStringMap;

/**
Expand Down Expand Up @@ -121,8 +122,9 @@ public void onResponse(MLCreateConnectorResponse mlCreateConnectorResponse) {
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to create connector";
public void onFailure(Exception ex) {
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to create connector" : ex.getMessage());
logger.error(errorMessage, e);
createConnectorFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS;
import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to create an index
Expand Down Expand Up @@ -136,10 +137,11 @@ public PlainActionFuture<WorkflowData> execute(
logger.error(errorMessage, ex);
createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(ex)));
}
}, e -> {
String errorMessage = "Failed to create the index " + indexName;
}, ex -> {
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to create the index " + indexName : e.getMessage());
logger.error(errorMessage, e);
createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
createIndexFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}));
} catch (Exception e) {
createIndexFuture.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;

import static org.opensearch.flowframework.common.WorkflowResources.AGENT_ID;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to delete a agent for a remote model
Expand Down Expand Up @@ -82,8 +83,9 @@ public void onResponse(DeleteResponse deleteResponse) {
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to delete agent " + agentId;
public void onFailure(Exception ex) {
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to delete agent " + agentId : e.getMessage());
logger.error(errorMessage, e);
deleteAgentFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;

import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to delete a connector for a remote model
Expand Down Expand Up @@ -82,8 +83,9 @@ public void onResponse(DeleteResponse deleteResponse) {
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to delete connector " + connectorId;
public void onFailure(Exception ex) {
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to delete connector " + connectorId : e.getMessage());
logger.error(errorMessage, e);
deleteConnectorFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;

import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to delete a model for a remote model
Expand Down Expand Up @@ -83,8 +84,9 @@ public void onResponse(DeleteResponse deleteResponse) {
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to delete model " + modelId;
public void onFailure(Exception ex) {
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to delete model " + modelId : e.getMessage());
logger.error(errorMessage, e);
deleteModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to deploy a model
Expand Down Expand Up @@ -115,8 +116,9 @@ public void onResponse(MLDeployModelResponse mlDeployModelResponse) {
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to deploy model " + modelId;
public void onFailure(Exception ex) {
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to deploy model " + modelId : e.getMessage());
logger.error(errorMessage, e);
deployModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static org.opensearch.flowframework.common.CommonValue.TYPE;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;
import static org.opensearch.flowframework.util.ParseUtils.getStringToStringMap;

/**
Expand Down Expand Up @@ -133,8 +134,9 @@ public void onResponse(MLRegisterAgentResponse mlRegisterAgentResponse) {
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to register the agent";
public void onFailure(Exception ex) {
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to register the agent" : e.getMessage());
logger.error(errorMessage, e);
registerAgentModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static org.opensearch.flowframework.common.CommonValue.MODEL_GROUP_STATUS;
import static org.opensearch.flowframework.common.CommonValue.NAME_FIELD;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to register a model group
Expand Down Expand Up @@ -118,8 +119,9 @@ public void onResponse(MLRegisterModelGroupResponse mlRegisterModelGroupResponse
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to register model group";
public void onFailure(Exception ex) {
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to register model group" : e.getMessage());
logger.error(errorMessage, e);
registerModelGroupFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_GROUP_ID;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to register a remote model
Expand Down Expand Up @@ -184,8 +185,9 @@ void completeRegisterFuture(UpdateResponse response, String resourceName, MLRegi
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to register remote model";
public void onFailure(Exception ex) {
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to register remote model" : e.getMessage());
logger.error(errorMessage, e);
registerRemoteModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import static org.opensearch.flowframework.common.CommonValue.SUCCESS;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException;

/**
* Step to undeploy model
Expand Down Expand Up @@ -96,8 +97,9 @@ public void onResponse(MLUndeployModelsResponse mlUndeployModelsResponse) {
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to undeploy model " + modelId;
public void onFailure(Exception ex) {
Exception e = getSafeException(ex);
String errorMessage = (e == null ? "Failed to undeploy model " + modelId : e.getMessage());
logger.error(errorMessage, e);
undeployModelFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,8 @@ public void testDefaultSemanticSearchUseCaseWithFailureExpected() throws Excepti
assertEquals(RestStatus.OK, TestHelpers.restStatus(response));
getAndAssertWorkflowStatus(client(), workflowId, State.FAILED, ProvisioningProgress.FAILED);
String error = getAndWorkflowStatusError(client(), workflowId);
assertTrue(error.contains("org.opensearch.flowframework.exception.WorkflowStepException during step create_ingest_pipeline"));
// Since knn plugin is not installed
assertTrue(error.contains("No processor type exists with name [text_embedding]"));
}

public void testAllDefaultUseCasesCreation() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/
package org.opensearch.flowframework.workflow;

import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.support.PlainActionFuture;
Expand All @@ -24,6 +25,7 @@
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.RemoteTransportException;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -139,4 +141,25 @@ public void testCreateIndexStepFailure() throws ExecutionException, InterruptedE
assertTrue(ex.getCause() instanceof Exception);
assertEquals("Failed to create the index demo", ex.getCause().getMessage());
}

public void testCreateIndexStepUnsafeFailure() throws ExecutionException, InterruptedException, IOException {
@SuppressWarnings({ "unchecked" })
ArgumentCaptor<ActionListener<CreateIndexResponse>> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class);
PlainActionFuture<WorkflowData> future = createIndexStep.execute(
inputData.getNodeId(),
inputData,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap()
);
assertFalse(future.isDone());
verify(indicesAdminClient, times(1)).create(any(CreateIndexRequest.class), actionListenerCaptor.capture());

actionListenerCaptor.getValue().onFailure(new RemoteTransportException("test", new ResourceNotFoundException("test")));

assertTrue(future.isDone());
ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent());
assertTrue(ex.getCause() instanceof Exception);
assertEquals("Failed to create the index demo", ex.getCause().getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public void testRegisterLocalCustomModelFailure() {

doAnswer(invocation -> {
ActionListener<MLRegisterModelResponse> actionListener = invocation.getArgument(1);
actionListener.onFailure(new IllegalArgumentException("test"));
actionListener.onFailure(new IllegalArgumentException("Failed to register local model in step test-node-id"));
return null;
}).when(machineLearningNodeClient).register(any(MLRegisterModelInput.class), any());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public void testRegisterLocalPretrainedModelFailure() {

doAnswer(invocation -> {
ActionListener<MLRegisterModelResponse> actionListener = invocation.getArgument(1);
actionListener.onFailure(new IllegalArgumentException("test"));
actionListener.onFailure(new IllegalArgumentException("Failed to register local model in step test-node-id"));
return null;
}).when(machineLearningNodeClient).register(any(MLRegisterModelInput.class), any());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public void testRegisterLocalSparseEncodingModelFailure() {

doAnswer(invocation -> {
ActionListener<MLRegisterModelResponse> actionListener = invocation.getArgument(1);
actionListener.onFailure(new IllegalArgumentException("test"));
actionListener.onFailure(new IllegalArgumentException("Failed to register local model in step test-node-id"));
return null;
}).when(machineLearningNodeClient).register(any(MLRegisterModelInput.class), any());

Expand Down
Loading
Loading