Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Update workflow state without using painless script #897

Merged
merged 1 commit into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

### Maintenance
### Refactoring
- Update workflow state without using painless script ([#894](https://github.com/opensearch-project/flow-framework/pull/894))
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.exception.FlowFrameworkException;
Expand All @@ -45,12 +46,13 @@
import org.opensearch.flowframework.util.EncryptorUtils;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;
import org.opensearch.index.engine.VersionConflictEngineException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -80,6 +82,8 @@
private static final Map<String, AtomicBoolean> indexMappingUpdated = new HashMap<>();
private static final Map<String, Object> indexSettings = Map.of("index.auto_expand_replicas", "0-1");
private final NamedXContentRegistry xContentRegistry;
// Retries in case of simultaneous updates
private static final int RETRIES = 5;

/**
* constructor
Expand Down Expand Up @@ -576,14 +580,14 @@
}

/**
* Updates a document in the workflow state index
* Updates a complete document in the workflow state index
* @param documentId the document ID
* @param updatedFields the fields to update the global state index with
* @param updatedDocument a complete document to update the global state index with
* @param listener action listener
*/
public void updateFlowFrameworkSystemIndexDoc(
String documentId,
Map<String, Object> updatedFields,
ToXContentObject updatedDocument,
ActionListener<UpdateResponse> listener
) {
if (!doesIndexExist(WORKFLOW_STATE_INDEX)) {
Expand All @@ -593,11 +597,11 @@
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
UpdateRequest updateRequest = new UpdateRequest(WORKFLOW_STATE_INDEX, documentId);
Map<String, Object> updatedContent = new HashMap<>(updatedFields);
updateRequest.doc(updatedContent);
XContentBuilder builder = XContentFactory.jsonBuilder();
updatedDocument.toXContent(builder, null);
updateRequest.doc(builder);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.retryOnConflict(5);
// TODO: decide what condition can be considered as an update conflict and add retry strategy
updateRequest.retryOnConflict(RETRIES);
client.update(updateRequest, ActionListener.runBefore(listener, context::restore));
} catch (Exception e) {
String errorMessage = "Failed to update " + WORKFLOW_STATE_INDEX + " entry : " + documentId;
Expand All @@ -608,99 +612,60 @@
}

/**
* Deletes a document in the workflow state index
* Updates a partial document in the workflow state index
* @param documentId the document ID
* @param updatedFields the fields to update the global state index with
* @param listener action listener
*/
public void deleteFlowFrameworkSystemIndexDoc(String documentId, ActionListener<DeleteResponse> listener) {
public void updateFlowFrameworkSystemIndexDoc(
String documentId,
Map<String, Object> updatedFields,
ActionListener<UpdateResponse> listener
) {
if (!doesIndexExist(WORKFLOW_STATE_INDEX)) {
String errorMessage = "Failed to delete document " + documentId + " due to missing " + WORKFLOW_STATE_INDEX + " index";
String errorMessage = "Failed to update document " + documentId + " due to missing " + WORKFLOW_STATE_INDEX + " index";
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
DeleteRequest deleteRequest = new DeleteRequest(WORKFLOW_STATE_INDEX, documentId);
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.delete(deleteRequest, ActionListener.runBefore(listener, context::restore));
UpdateRequest updateRequest = new UpdateRequest(WORKFLOW_STATE_INDEX, documentId);
Map<String, Object> updatedContent = new HashMap<>(updatedFields);
updateRequest.doc(updatedContent);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.retryOnConflict(RETRIES);
// TODO: decide what condition can be considered as an update conflict and add retry strategy
client.update(updateRequest, ActionListener.runBefore(listener, context::restore));
} catch (Exception e) {
String errorMessage = "Failed to delete " + WORKFLOW_STATE_INDEX + " entry : " + documentId;
String errorMessage = "Failed to update " + WORKFLOW_STATE_INDEX + " entry : " + documentId;

Check warning on line 639 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L639

Added line #L639 was not covered by tests
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
}
}

/**
* Updates a document in the workflow state index
* @param indexName the index that we will be updating a document of.
* Deletes a document in the workflow state index
* @param documentId the document ID
* @param script the given script to update doc
* @param listener action listener
*/
public void updateFlowFrameworkSystemIndexDocWithScript(
String indexName,
String documentId,
Script script,
ActionListener<UpdateResponse> listener
) {
if (!doesIndexExist(indexName)) {
String errorMessage = "Failed to update document for given workflow due to missing " + indexName + " index";
public void deleteFlowFrameworkSystemIndexDoc(String documentId, ActionListener<DeleteResponse> listener) {
if (!doesIndexExist(WORKFLOW_STATE_INDEX)) {
String errorMessage = "Failed to delete document " + documentId + " due to missing " + WORKFLOW_STATE_INDEX + " index";
logger.error(errorMessage);
listener.onFailure(new Exception(errorMessage));
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
UpdateRequest updateRequest = new UpdateRequest(indexName, documentId);
// TODO: Also add ability to change other fields at the same time when adding detailed provision progress
updateRequest.script(script);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.retryOnConflict(3);
// TODO: Implement our own concurrency control to improve on retry mechanism
client.update(updateRequest, ActionListener.runBefore(listener, context::restore));
DeleteRequest deleteRequest = new DeleteRequest(WORKFLOW_STATE_INDEX, documentId);
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.delete(deleteRequest, ActionListener.runBefore(listener, context::restore));
} catch (Exception e) {
String errorMessage = "Failed to update " + indexName + " entry : " + documentId;
String errorMessage = "Failed to delete " + WORKFLOW_STATE_INDEX + " entry : " + documentId;

Check warning on line 662 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L662

Added line #L662 was not covered by tests
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
}
}

/**
* Creates a new ResourceCreated object and a script to update the state index
* @param workflowId workflowId for the relevant step
* @param nodeId current process node (workflow step) id
* @param workflowStepName the workflowstep name that created the resource
* @param resourceId the id of the newly created resource
* @param listener the ActionListener for this step to handle completing the future after update
* @throws IOException if parsing fails on new resource
*/
private void updateResourceInStateIndex(
String workflowId,
String nodeId,
String workflowStepName,
String resourceId,
ActionListener<UpdateResponse> listener
) throws IOException {
ResourceCreated newResource = new ResourceCreated(
workflowStepName,
nodeId,
getResourceByWorkflowStep(workflowStepName),
resourceId
);

// The script to append a new object to the resources_created array
Script script = new Script(
ScriptType.INLINE,
"painless",
"ctx._source.resources_created.add(params.newResource);",
Collections.singletonMap("newResource", newResource.resourceMap())
);

updateFlowFrameworkSystemIndexDocWithScript(WORKFLOW_STATE_INDEX, workflowId, script, ActionListener.wrap(updateResponse -> {
logger.info("updated resources created of {}", workflowId);
listener.onResponse(updateResponse);
}, listener::onFailure));
}

/**
* Adds a resource to the state index, including common exception handling
* @param currentNodeInputs Inputs to the current node
Expand All @@ -716,26 +681,93 @@
String resourceId,
ActionListener<WorkflowData> listener
) {
String workflowId = currentNodeInputs.getWorkflowId();
String resourceName = getResourceByWorkflowStep(workflowStepName);
try {
updateResourceInStateIndex(
currentNodeInputs.getWorkflowId(),
nodeId,
workflowStepName,
resourceId,
ActionListener.wrap(updateResponse -> {
logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex());
listener.onResponse(new WorkflowData(Map.of(resourceName, resourceId), currentNodeInputs.getWorkflowId(), nodeId));
}, exception -> {
String errorMessage = "Failed to update new created " + nodeId + " resource " + workflowStepName + " id " + resourceId;
logger.error(errorMessage, exception);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
})
ResourceCreated newResource = new ResourceCreated(workflowStepName, nodeId, resourceName, resourceId);
if (!doesIndexExist(WORKFLOW_STATE_INDEX)) {
String errorMessage = "Failed to update state for " + workflowId + " due to missing " + WORKFLOW_STATE_INDEX + " index";
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.NOT_FOUND));
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
getAndUpdateResourceInStateDocumentWithRetries(
workflowId,
newResource,
RETRIES,
ActionListener.runBefore(listener, context::restore)
);
}
}
}

/**
* Performs a get and update of a State Index document adding a new resource with strong consistency and retries
* @param workflowId The document id to update
* @param newResource The resource to add to the resources created list
* @param retries The number of retries on update version conflicts
* @param listener The listener to complete on success or failure
*/
private void getAndUpdateResourceInStateDocumentWithRetries(
String workflowId,
ResourceCreated newResource,
int retries,
ActionListener<WorkflowData> listener
) {
GetRequest getRequest = new GetRequest(WORKFLOW_STATE_INDEX, workflowId);
client.get(getRequest, ActionListener.wrap(getResponse -> {
if (!getResponse.isExists()) {
listener.onFailure(new FlowFrameworkException("Workflow state not found for " + workflowId, RestStatus.NOT_FOUND));
return;
}
WorkflowState currentState = WorkflowState.parse(getResponse.getSourceAsString());
List<ResourceCreated> resourcesCreated = new ArrayList<>(currentState.resourcesCreated());
resourcesCreated.add(newResource);
XContentBuilder builder = XContentFactory.jsonBuilder();
WorkflowState newState = WorkflowState.builder(currentState).resourcesCreated(resourcesCreated).build();
newState.toXContent(builder, null);
UpdateRequest updateRequest = new UpdateRequest(WORKFLOW_STATE_INDEX, workflowId).doc(builder)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.setIfSeqNo(getResponse.getSeqNo())
.setIfPrimaryTerm(getResponse.getPrimaryTerm());
client.update(
updateRequest,
ActionListener.wrap(
r -> handleStateUpdateSuccess(workflowId, newResource, listener),
e -> handleStateUpdateException(workflowId, newResource, retries, listener, e)
)
);
} catch (Exception e) {
String errorMessage = "Failed to parse and update new created resource";
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}, ex -> handleStateUpdateException(workflowId, newResource, 0, listener, ex)));
}

private void handleStateUpdateSuccess(String workflowId, ResourceCreated newResource, ActionListener<WorkflowData> listener) {
String resourceName = newResource.resourceType();
String resourceId = newResource.resourceId();
String nodeId = newResource.workflowStepId();
logger.info("Updated resources created for {} on step {} with {} {}", workflowId, nodeId, resourceName, resourceId);
listener.onResponse(new WorkflowData(Map.of(resourceName, resourceId), workflowId, nodeId));
}

private void handleStateUpdateException(
String workflowId,
ResourceCreated newResource,
int retries,
ActionListener<WorkflowData> listener,
Exception e
) {
if (e instanceof VersionConflictEngineException && retries > 0) {
// Retry if we haven't exhausted retries
getAndUpdateResourceInStateDocumentWithRetries(workflowId, newResource, retries - 1, listener);
return;
}
String errorMessage = "Failed to update workflow state for "
+ workflowId
+ " on step "
+ newResource.workflowStepId()
+ " with "
+ newResource.resourceType()
+ " "
+ newResource.resourceId();
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,12 @@
import org.opensearch.flowframework.model.State;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.model.WorkflowState;
import org.opensearch.flowframework.util.EncryptorUtils;
import org.opensearch.flowframework.workflow.ProcessNode;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
import org.opensearch.plugins.PluginsService;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
Expand All @@ -58,7 +57,6 @@
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.RESOURCES_CREATED_FIELD;
import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES;
import static org.opensearch.flowframework.util.ParseUtils.getUserContext;
import static org.opensearch.flowframework.util.ParseUtils.resolveUserAndExecute;
Expand Down Expand Up @@ -210,24 +208,14 @@

// Remove error field if any prior to subsequent execution
if (response.getWorkflowState().getError() != null) {
Script script = new Script(
ScriptType.INLINE,
"painless",
"if(ctx._source.containsKey('error')){ctx._source.remove('error')}",
Collections.emptyMap()
);
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDocWithScript(
WORKFLOW_STATE_INDEX,
workflowId,
script,
ActionListener.wrap(updateResponse -> {

}, exception -> {
String errorMessage = "Failed to update workflow state: " + workflowId;
logger.error(errorMessage, exception);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
})
);
WorkflowState newState = WorkflowState.builder(response.getWorkflowState()).error(null).build();
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(workflowId, newState, ActionListener.wrap(updateResponse -> {

Check warning on line 212 in src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java#L211-L212

Added lines #L211 - L212 were not covered by tests

}, exception -> {
String errorMessage = "Failed to update workflow state: " + workflowId;
logger.error(errorMessage, exception);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
}));

Check warning on line 218 in src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ReprovisionWorkflowTransportAction.java#L214-L218

Added lines #L214 - L218 were not covered by tests
}

// Update State Index, maintain resources created for subsequent execution
Expand Down
Loading
Loading