diff --git a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java index be100fbce..4657f438f 100644 --- a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java @@ -13,7 +13,6 @@ import org.apache.logging.log4j.Logger; import org.opensearch.ExceptionsHelper; import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; import org.opensearch.client.Client; @@ -111,21 +110,23 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener onSearchGlobalContext(response, listener, request.getMaxWorkflows()), - exception -> listener.onFailure(exception) - ) - ); + onSearchGlobalContext(request.getRequestTimeout(), request.getMaxWorkflows(), ActionListener.wrap(max -> { + if (!max) { + String errorMessage = "Maximum workflows limit reached " + request.getMaxWorkflows(); + logger.error(errorMessage); + FlowFrameworkException ffe = new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST); + listener.onFailure(ffe); + return; + } + }, e -> { + logger.error("Failed to updated use case template {} : {}", request.getWorkflowId(), e.getMessage()); + if (e instanceof FlowFrameworkException) { + listener.onFailure(e); + } else { + listener.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e))); + } + })); // Create new global context and state index entries flowFrameworkIndicesHandler.putTemplateToGlobalContext(templateWithUser, ActionListener.wrap(globalContextResponse -> { @@ -190,17 +191,27 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener= maxWorkflow) { - String errorMessage = "Maximum workflows limit reached " + maxWorkflow; - logger.error(errorMessage); - listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST)); - } + protected void onSearchGlobalContext(TimeValue requestTimeOut, Integer maxWorkflow, ActionListener internalListener) { + QueryBuilder query = QueryBuilders.matchAllQuery(); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(query).size(0).timeout(requestTimeOut); + + SearchRequest searchRequest = new SearchRequest(CommonValue.GLOBAL_CONTEXT_INDEX).source(searchSourceBuilder); + + client.search(searchRequest, ActionListener.wrap(searchResponse -> { + logger.info("SEARCH RESPONSE SIZE {}", searchResponse.getHits().getTotalHits().value); + if (searchResponse.getHits().getTotalHits().value >= maxWorkflow) { + internalListener.onResponse(false); + } else { + internalListener.onResponse(true); + } + }, exception -> { + logger.error("Unable to fetch the workflows {}", exception); + internalListener.onFailure(new FlowFrameworkException("Unable to fetch the workflows", RestStatus.BAD_REQUEST)); + })); } private void validateWorkflows(Template template) throws Exception {