diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java index d0279d9aebd38..912f7032fe3b0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java @@ -17,6 +17,8 @@ public class TransformMessages { "Interrupted while waiting for transform [{0}] to stop"; public static final String REST_PUT_TRANSFORM_EXISTS = "Transform with id [{0}] already exists"; public static final String REST_UNKNOWN_TRANSFORM = "Transform with id [{0}] could not be found"; + public static final String REST_STOP_TRANSFORM_WITHOUT_CONFIG = + "Detected transforms with no config [{0}]. Use force to stop/delete them."; public static final String REST_PUT_TRANSFORM_FAILED_TO_VALIDATE_CONFIGURATION = "Failed to validate configuration"; public static final String REST_PUT_FAILED_PERSIST_TRANSFORM_CONFIGURATION = "Failed to persist transform configuration"; diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java index 16a559d5b5c68..0b7aaf42b7c2d 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java @@ -918,12 +918,4 @@ public void testContinuousStopWaitForCheckpoint() throws Exception { assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_26", 3.918918918); deleteIndex(indexName); } - - private void assertOnePivotValue(String query, double expected) throws IOException { - Map searchResult = getAsMap(query); - - assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult)); - double actual = (Double) ((List) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)).get(0); - assertEquals(expected, actual, 0.000001); - } } diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java index bf10e3976e983..d9c61dddeab47 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java @@ -73,7 +73,7 @@ protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOE } protected void createReviewsIndex(String indexName, int numDocs) throws IOException { - int[] distributionTable = {5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 4, 4, 4, 3, 3, 2, 1, 1, 1}; + int[] distributionTable = { 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 4, 4, 4, 3, 3, 2, 1, 1, 1 }; // create mapping try (XContentBuilder builder = jsonBuilder()) { @@ -158,6 +158,7 @@ protected void createReviewsIndex(String indexName, int numDocs) throws IOExcept bulkRequest.setJsonEntity(bulk.toString()); client().performRequest(bulkRequest); } + /** * Create a simple dataset for testing with reviewers, ratings and businesses */ @@ -182,9 +183,8 @@ protected void createContinuousPivotReviewsTransform(String transformId, String final Request createDataframeTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, authHeader); - String config = "{ \"dest\": {\"index\":\"" + dataFrameIndex + "\"}," - + " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"}," - //Set frequency high for testing + String config = "{ \"dest\": {\"index\":\"" + dataFrameIndex + "\"}," + " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"}," + // Set frequency high for testing + " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"15m\"}}," + " \"frequency\": \"1s\"," + " \"pivot\": {" @@ -206,7 +206,6 @@ protected void createContinuousPivotReviewsTransform(String transformId, String assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); } - protected void createPivotReviewsTransform(String transformId, String dataFrameIndex, String query, String pipeline, String authHeader) throws IOException { final Request createDataframeTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, authHeader); @@ -226,18 +225,18 @@ protected void createPivotReviewsTransform(String transformId, String dataFrameI } config += " \"pivot\": {" - + " \"group_by\": {" - + " \"reviewer\": {" - + " \"terms\": {" - + " \"field\": \"user_id\"" - + " } } }," - + " \"aggregations\": {" - + " \"avg_rating\": {" - + " \"avg\": {" - + " \"field\": \"stars\"" - + " } } } }," - + "\"frequency\":\"1s\"" - + "}"; + + " \"group_by\": {" + + " \"reviewer\": {" + + " \"terms\": {" + + " \"field\": \"user_id\"" + + " } } }," + + " \"aggregations\": {" + + " \"avg_rating\": {" + + " \"avg\": {" + + " \"field\": \"stars\"" + + " } } } }," + + "\"frequency\":\"1s\"" + + "}"; createDataframeTransformRequest.setJsonEntity(config); @@ -245,11 +244,11 @@ protected void createPivotReviewsTransform(String transformId, String dataFrameI assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); } - protected void startDataframeTransform(String transformId) throws IOException { - startDataframeTransform(transformId, null); + protected void startTransform(String transformId) throws IOException { + startTransform(transformId, null); } - protected void startDataframeTransform(String transformId, String authHeader, String... warnings) throws IOException { + protected void startTransform(String transformId, String authHeader, String... warnings) throws IOException { // start the transform final Request startTransformRequest = createRequestWithAuth("POST", getTransformEndpoint() + transformId + "/_start", authHeader); if (warnings.length > 0) { @@ -280,10 +279,10 @@ protected void startAndWaitForTransform(String transformId, String dataFrameInde startAndWaitForTransform(transformId, dataFrameIndex, authHeader, new String[0]); } - protected void startAndWaitForTransform(String transformId, String dataFrameIndex, - String authHeader, String... warnings) throws Exception { + protected void startAndWaitForTransform(String transformId, String dataFrameIndex, String authHeader, String... warnings) + throws Exception { // start the transform - startDataframeTransform(transformId, authHeader, warnings); + startTransform(transformId, authHeader, warnings); assertTrue(indexExists(dataFrameIndex)); // wait until the dataframe has been created and all data is available waitForDataFrameCheckpoint(transformId); @@ -292,18 +291,14 @@ protected void startAndWaitForTransform(String transformId, String dataFrameInde refreshIndex(dataFrameIndex); } - protected void startAndWaitForContinuousTransform(String transformId, - String dataFrameIndex, - String authHeader) throws Exception { + protected void startAndWaitForContinuousTransform(String transformId, String dataFrameIndex, String authHeader) throws Exception { startAndWaitForContinuousTransform(transformId, dataFrameIndex, authHeader, 1L); } - protected void startAndWaitForContinuousTransform(String transformId, - String dataFrameIndex, - String authHeader, - long checkpoint) throws Exception { + protected void startAndWaitForContinuousTransform(String transformId, String dataFrameIndex, String authHeader, long checkpoint) + throws Exception { // start the transform - startDataframeTransform(transformId, authHeader, new String[0]); + startTransform(transformId, authHeader, new String[0]); assertTrue(indexExists(dataFrameIndex)); // wait until the dataframe has been created and all data is available waitForTransformCheckpoint(transformId, checkpoint); @@ -323,9 +318,7 @@ protected Request createRequestWithAuth(final String method, final String endpoi } void waitForDataFrameStopped(String transformId) throws Exception { - assertBusy(() -> { - assertEquals("stopped", getDataFrameTransformState(transformId)); - }, 15, TimeUnit.SECONDS); + assertBusy(() -> { assertEquals("stopped", getTransformState(transformId)); }, 15, TimeUnit.SECONDS); } void waitForDataFrameCheckpoint(String transformId) throws Exception { @@ -341,7 +334,7 @@ void refreshIndex(String index) throws IOException { } @SuppressWarnings("unchecked") - private static List> getDataFrameTransforms() throws IOException { + protected static List> getTransforms() throws IOException { Response response = adminClient().performRequest(new Request("GET", getTransformEndpoint() + "_all")); Map transforms = entityAsMap(response); List> transformConfigs = (List>) XContentMapValues.extractValue("transforms", transforms); @@ -349,12 +342,12 @@ private static List> getDataFrameTransforms() throws IOExcep return transformConfigs == null ? Collections.emptyList() : transformConfigs; } - protected static String getDataFrameTransformState(String transformId) throws IOException { - Map transformStatsAsMap = getDataFrameState(transformId); + protected static String getTransformState(String transformId) throws IOException { + Map transformStatsAsMap = getTransformStateAndStats(transformId); return transformStatsAsMap == null ? null : (String) XContentMapValues.extractValue("state", transformStatsAsMap); } - protected static Map getDataFrameState(String transformId) throws IOException { + protected static Map getTransformStateAndStats(String transformId) throws IOException { Response statsResponse = client().performRequest(new Request("GET", getTransformEndpoint() + transformId + "/_stats")); List transforms = ((List) entityAsMap(statsResponse).get("transforms")); if (transforms.isEmpty()) { @@ -383,7 +376,7 @@ public static void removeIndices() throws Exception { } public void wipeTransforms() throws IOException { - List> transformConfigs = getDataFrameTransforms(); + List> transformConfigs = getTransforms(); for (Map transformConfig : transformConfigs) { String transformId = (String) transformConfig.get("id"); Request request = new Request("POST", getTransformEndpoint() + transformId + "/_stop"); @@ -395,7 +388,7 @@ public void wipeTransforms() throws IOException { for (Map transformConfig : transformConfigs) { String transformId = (String) transformConfig.get("id"); - String state = getDataFrameTransformState(transformId); + String state = getTransformState(transformId); assertEquals("Transform [" + transformId + "] is not in the stopped state", "stopped", state); } @@ -405,7 +398,7 @@ public void wipeTransforms() throws IOException { } // transforms should be all gone - transformConfigs = getDataFrameTransforms(); + transformConfigs = getTransforms(); assertTrue(transformConfigs.isEmpty()); // the configuration index should be empty @@ -437,11 +430,15 @@ static int getDataFrameCheckpoint(String transformId) throws IOException { protected void setupDataAccessRole(String role, String... indices) throws IOException { String indicesStr = Arrays.stream(indices).collect(Collectors.joining("\",\"", "\"", "\"")); Request request = new Request("PUT", "/_security/role/" + role); - request.setJsonEntity("{" - + " \"indices\" : [" - + " { \"names\": [" + indicesStr + "], \"privileges\": [\"create_index\", \"read\", \"write\", \"view_index_metadata\"] }" - + " ]" - + "}"); + request.setJsonEntity( + "{" + + " \"indices\" : [" + + " { \"names\": [" + + indicesStr + + "], \"privileges\": [\"create_index\", \"read\", \"write\", \"view_index_metadata\"] }" + + " ]" + + "}" + ); client().performRequest(request); } @@ -450,13 +447,18 @@ protected void setupUser(String user, List roles) throws IOException { String rolesStr = roles.stream().collect(Collectors.joining("\",\"", "\"", "\"")); Request request = new Request("PUT", "/_security/user/" + user); - request.setJsonEntity("{" - + " \"password\" : \"" + password + "\"," - + " \"roles\" : [ " + rolesStr + " ]" - + "}"); + request.setJsonEntity("{" + " \"password\" : \"" + password + "\"," + " \"roles\" : [ " + rolesStr + " ]" + "}"); client().performRequest(request); } + protected void assertOnePivotValue(String query, double expected) throws IOException { + Map searchResult = getAsMap(query); + + assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult)); + double actual = (Double) ((List) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)).get(0); + assertEquals(expected, actual, 0.000001); + } + protected static String getTransformEndpoint() { return useDeprecatedEndpoints ? TransformField.REST_BASE_PATH_TRANSFORMS_DEPRECATED : TransformField.REST_BASE_PATH_TRANSFORMS; } diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRobustnessIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRobustnessIT.java new file mode 100644 index 0000000000000..0a9b5ca68ac16 --- /dev/null +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformRobustnessIT.java @@ -0,0 +1,115 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.transform.integration; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.xpack.core.transform.TransformField; +import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; + +import java.io.IOException; +import java.util.Map; +import java.util.Map.Entry; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class TransformRobustnessIT extends TransformRestTestCase { + + public void testTaskRemovalAfterInternalIndexGotDeleted() throws Exception { + String indexName = "continuous_reviews"; + createReviewsIndex(indexName); + String transformId = "simple_continuous_pivot"; + String transformIndex = "pivot_reviews_continuous"; + final Request createTransformRequest = new Request("PUT", TransformField.REST_BASE_PATH_TRANSFORMS + transformId); + String config = "{" + + " \"source\": {\"index\":\"" + + indexName + + "\"}," + + " \"dest\": {\"index\":\"" + + transformIndex + + "\"}," + + " \"frequency\": \"1s\"," + + " \"sync\": {\"time\": {\"field\": \"timestamp\", \"delay\": \"1s\"}}," + + " \"pivot\": {" + + " \"group_by\": {" + + " \"reviewer\": {" + + " \"terms\": {" + + " \"field\": \"user_id\"" + + " } } }," + + " \"aggregations\": {" + + " \"avg_rating\": {" + + " \"avg\": {" + + " \"field\": \"stars\"" + + " } } } }" + + "}"; + createTransformRequest.setJsonEntity(config); + Map createTransformResponse = entityAsMap(client().performRequest(createTransformRequest)); + assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + assertEquals(1, getTransforms().size()); + // there shouldn't be a task yet + assertEquals(0, getNumberOfTransformTasks()); + startAndWaitForContinuousTransform(transformId, transformIndex, null); + assertTrue(indexExists(transformIndex)); + + // a task exists + assertEquals(1, getNumberOfTransformTasks()); + // get and check some users + assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_0", 3.776978417); + assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_5", 3.72); + assertNotNull(getTransformState(transformId)); + + assertEquals(1, getTransforms().size()); + + // delete the transform index + beEvilAndDeleteTheTransformIndex(); + // transform is gone + assertEquals(0, getTransforms().size()); + // but the task is still there + assertEquals(1, getNumberOfTransformTasks()); + + Request stopTransformRequest = new Request("POST", TransformField.REST_BASE_PATH_TRANSFORMS + transformId + "/_stop"); + ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(stopTransformRequest)); + + assertEquals(409, e.getResponse().getStatusLine().getStatusCode()); + assertThat( + e.getMessage(), + containsString("Detected transforms with no config [" + transformId + "]. Use force to stop/delete them.") + ); + stopTransformRequest.addParameter(TransformField.FORCE.getPreferredName(), Boolean.toString(true)); + Map stopTransformResponse = entityAsMap(client().performRequest(stopTransformRequest)); + assertThat(stopTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + + // the task is gone + assertEquals(1, getNumberOfTransformTasks()); + } + + @SuppressWarnings("unchecked") + private int getNumberOfTransformTasks() throws IOException { + final Request tasksRequest = new Request("GET", "/_tasks"); + tasksRequest.addParameter("actions", TransformField.TASK_NAME + "*"); + Map tasksResponse = entityAsMap(client().performRequest(tasksRequest)); + + Map nodes = (Map) tasksResponse.get("nodes"); + if (nodes == null) { + return 0; + } + + int foundTasks = 0; + for (Entry node : nodes.entrySet()) { + Map nodeInfo = (Map) node.getValue(); + Map tasks = (Map) nodeInfo.get("tasks"); + foundTasks += tasks != null ? tasks.size() : 0; + } + + return foundTasks; + } + + private void beEvilAndDeleteTheTransformIndex() throws IOException { + adminClient().performRequest(new Request("DELETE", TransformInternalIndexConstants.LATEST_INDEX_NAME)); + } +} diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java index 033cf57ba84fa..809064df4c039 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java @@ -40,16 +40,15 @@ public void setClusterSettings() throws IOException { // Set logging level to trace // see: https://github.com/elastic/elasticsearch/issues/45562 Request addFailureRetrySetting = new Request("PUT", "/_cluster/settings"); - addFailureRetrySetting - .setJsonEntity( - "{\"transient\": {\"xpack.transform.num_transform_failure_retries\": \"" - + 0 - + "\"," - + "\"logger.org.elasticsearch.action.bulk\": \"info\"," - + // reduces bulk failure spam - "\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," - + "\"logger.org.elasticsearch.xpack.transform\": \"trace\"}}" - ); + addFailureRetrySetting.setJsonEntity( + "{\"transient\": {\"xpack.transform.num_transform_failure_retries\": \"" + + 0 + + "\"," + + "\"logger.org.elasticsearch.action.bulk\": \"info\"," + + // reduces bulk failure spam + "\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," + + "\"logger.org.elasticsearch.xpack.transform\": \"trace\"}}" + ); client().performRequest(addFailureRetrySetting); } @@ -70,9 +69,9 @@ public void testForceStopFailedTransform() throws Exception { createDestinationIndexWithBadMapping(transformIndex); createContinuousPivotReviewsTransform(transformId, transformIndex, null); failureTransforms.add(transformId); - startDataframeTransform(transformId); + startTransform(transformId); awaitState(transformId, TransformStats.State.FAILED); - Map fullState = getDataFrameState(transformId); + Map fullState = getTransformStateAndStats(transformId); final String failureReason = "task encountered more than 0 failures; latest failure: " + ".*BulkIndexingException: Bulk index experienced failures. See the logs of the node running the transform for details."; // Verify we have failed for the expected reason @@ -94,7 +93,7 @@ public void testForceStopFailedTransform() throws Exception { stopTransform(transformId, true); awaitState(transformId, TransformStats.State.STOPPED); - fullState = getDataFrameState(transformId); + fullState = getTransformStateAndStats(transformId); assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue())); } @@ -105,9 +104,9 @@ public void testStartFailedTransform() throws Exception { createDestinationIndexWithBadMapping(dataFrameIndex); createContinuousPivotReviewsTransform(transformId, dataFrameIndex, null); failureTransforms.add(transformId); - startDataframeTransform(transformId); + startTransform(transformId); awaitState(transformId, TransformStats.State.FAILED); - Map fullState = getDataFrameState(transformId); + Map fullState = getTransformStateAndStats(transformId); final String failureReason = "task encountered more than 0 failures; latest failure: " + ".*BulkIndexingException: Bulk index experienced failures. See the logs of the node running the transform for details."; // Verify we have failed for the expected reason @@ -119,7 +118,7 @@ public void testStartFailedTransform() throws Exception { + "\\]. Use force stop and then restart the transform once error is resolved."; // Verify that we cannot start the transform when the task is in a failed state assertBusy(() -> { - ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId)); + ResponseException ex = expectThrows(ResponseException.class, () -> startTransform(transformId)); assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus())); assertThat( (String) XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())), @@ -132,7 +131,7 @@ public void testStartFailedTransform() throws Exception { private void awaitState(String transformId, TransformStats.State state) throws Exception { assertBusy(() -> { - String currentState = getDataFrameTransformState(transformId); + String currentState = getTransformState(transformId); assertThat(currentState, equalTo(state.value())); }, 180, TimeUnit.SECONDS); // It should not take this long, but if the scheduler gets deferred, it could } @@ -142,8 +141,7 @@ private void createDestinationIndexWithBadMapping(String indexName) throws IOExc try (XContentBuilder builder = jsonBuilder()) { builder.startObject(); { - builder - .startObject("mappings") + builder.startObject("mappings") .startObject("properties") .startObject("reviewer") .field("type", "long") diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java index bd3ddc3e8ceab..4cea47c399238 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.FailedNodeException; @@ -19,22 +20,28 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.LoggerMessageFormat; +import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.action.util.PageParams; +import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.TransformMessages; import org.elasticsearch.xpack.core.transform.action.StopTransformAction; import org.elasticsearch.xpack.core.transform.action.StopTransformAction.Request; import org.elasticsearch.xpack.core.transform.action.StopTransformAction.Response; import org.elasticsearch.xpack.core.transform.transforms.TransformState; +import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; import org.elasticsearch.xpack.transform.TransformServices; @@ -47,6 +54,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -127,6 +135,25 @@ static void validateTaskState(ClusterState state, List transformIds, boo } } + static Tuple, Set> findTasksWithoutConfig(ClusterState state, String transformId) { + PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); + + Set taskIds = new HashSet<>(); + Set executorNodes = new HashSet<>(); + + Predicate> taskMatcher = Strings.isAllOrWildcard(new String[] { transformId }) ? t -> true : t -> { + TransformTaskParams transformParams = (TransformTaskParams) t.getParams(); + return Regex.simpleMatch(transformId, transformParams.getId()); + }; + + for (PersistentTasksCustomMetaData.PersistentTask pTask : tasks.findTasks(TransformField.TASK_NAME, taskMatcher)) { + executorNodes.add(pTask.getExecutorNode()); + taskIds.add(pTask.getId()); + } + + return new Tuple<>(taskIds, executorNodes); + } + @Override protected void doExecute(Task task, Request request, ActionListener listener) { final ClusterState state = clusterService.state(); @@ -160,7 +187,31 @@ protected void doExecute(Task task, Request request, ActionListener li request.setExpandedIds(new HashSet<>(hitsAndIds.v2())); request.setNodes(TransformNodes.transformTaskNodes(hitsAndIds.v2(), state)); super.doExecute(task, request, finalListener); - }, listener::onFailure) + }, e -> { + if (e instanceof ResourceNotFoundException) { + Tuple, Set> runningTasksAndNodes = findTasksWithoutConfig(state, request.getId()); + if (runningTasksAndNodes.v1().isEmpty()) { + listener.onFailure(e); + // found transforms without a config + } else if (request.isForce()) { + request.setExpandedIds(runningTasksAndNodes.v1()); + request.setNodes(runningTasksAndNodes.v2().toArray(new String[0])); + super.doExecute(task, request, finalListener); + } else { + listener.onFailure( + new ElasticsearchStatusException( + TransformMessages.getMessage( + TransformMessages.REST_STOP_TRANSFORM_WITHOUT_CONFIG, + Strings.arrayToCommaDelimitedString(runningTasksAndNodes.v1().toArray(new String[0])) + ), + RestStatus.CONFLICT + ) + ); + } + } else { + listener.onFailure(e); + } + }) ); } }