diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java
index f01db621bc2e0..1bd49154ee548 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java
@@ -141,7 +141,8 @@ private void indexData(String indexName) throws IOException {
     @After
     public void cleanUpTransforms() throws IOException {
         for (String transformId : transformsToClean) {
-            highLevelClient().dataFrame().stopDataFrameTransform(new StopDataFrameTransformRequest(transformId), RequestOptions.DEFAULT);
+            highLevelClient().dataFrame().stopDataFrameTransform(
+                    new StopDataFrameTransformRequest(transformId, Boolean.TRUE, null), RequestOptions.DEFAULT);
         }
 
         for (String transformId : transformsToClean) {
@@ -265,7 +266,7 @@ public void testStartStop() throws IOException {
         assertThat(statsResponse.getTransformsStateAndStats(), hasSize(1));
         assertEquals(IndexerState.STARTED, statsResponse.getTransformsStateAndStats().get(0).getTransformState().getIndexerState());
 
-        StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id);
+        StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, Boolean.TRUE, null);
         StopDataFrameTransformResponse stopResponse =
                 execute(stopRequest, client::stopDataFrameTransform, client::stopDataFrameTransformAsync);
         assertTrue(stopResponse.isStopped());
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java
index 3c5059279b44d..07713d5371460 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java
@@ -76,7 +76,8 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
     @After
     public void cleanUpTransforms() throws IOException {
         for (String transformId : transformsToClean) {
-            highLevelClient().dataFrame().stopDataFrameTransform(new StopDataFrameTransformRequest(transformId), RequestOptions.DEFAULT);
+            highLevelClient().dataFrame().stopDataFrameTransform(
+                    new StopDataFrameTransformRequest(transformId, Boolean.TRUE, TimeValue.timeValueSeconds(20)), RequestOptions.DEFAULT);
         }
 
         for (String transformId : transformsToClean) {
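Editor's note: the two client test classes above now pass an explicit wait-for-completion flag (and optionally a timeout) when stopping transforms during cleanup. A minimal sketch of the same call pattern from application code follows; the org.elasticsearch.client.dataframe package names and the pre-built RestHighLevelClient are assumptions of this sketch, not something defined by the patch.

import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse;
import org.elasticsearch.common.unit.TimeValue;

import java.io.IOException;

class StopTransformExample {

    // Stops the given transform and waits (up to 30s) until its task has actually gone away.
    static boolean stopAndWait(RestHighLevelClient client, String transformId) throws IOException {
        // waitForCompletion = TRUE blocks until the persistent task is removed; both the flag
        // and the timeout may be null to fall back to the server-side defaults.
        StopDataFrameTransformRequest request =
                new StopDataFrameTransformRequest(transformId, Boolean.TRUE, TimeValue.timeValueSeconds(30));
        StopDataFrameTransformResponse response =
                client.dataFrame().stopDataFrameTransform(request, RequestOptions.DEFAULT);
        return response.isStopped();
    }
}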
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/DeleteDataFrameTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/DeleteDataFrameTransformAction.java
index 6b7de0ab80f3a..715fa0f5dc78b 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/DeleteDataFrameTransformAction.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/DeleteDataFrameTransformAction.java
@@ -7,25 +7,18 @@
 import org.elasticsearch.action.Action;
 import org.elasticsearch.action.ActionRequestValidationException;
-import org.elasticsearch.action.FailedNodeException;
-import org.elasticsearch.action.TaskOperationFailure;
-import org.elasticsearch.action.support.tasks.BaseTasksRequest;
-import org.elasticsearch.action.support.tasks.BaseTasksResponse;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.action.support.master.MasterNodeRequest;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
-import org.elasticsearch.common.xcontent.ToXContentObject;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.tasks.Task;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
 import org.elasticsearch.xpack.core.dataframe.utils.ExceptionsHelper;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
 import java.util.Objects;
 
-public class DeleteDataFrameTransformAction extends Action<DeleteDataFrameTransformAction.Response> {
+public class DeleteDataFrameTransformAction extends Action<AcknowledgedResponse> {
 
     public static final DeleteDataFrameTransformAction INSTANCE = new DeleteDataFrameTransformAction();
     public static final String NAME = "cluster:admin/data_frame/delete";
@@ -35,17 +28,21 @@ private DeleteDataFrameTransformAction() {
     }
 
     @Override
-    public Response newResponse() {
+    public AcknowledgedResponse newResponse() {
         throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
     }
 
     @Override
-    public Writeable.Reader<Response> getResponseReader() {
-        return Response::new;
+    public Writeable.Reader<AcknowledgedResponse> getResponseReader() {
+        return in -> {
+            AcknowledgedResponse response = new AcknowledgedResponse();
+            response.readFrom(in);
+            return response;
+        };
     }
 
-    public static class Request extends BaseTasksRequest<Request> {
-        private final String id;
+    public static class Request extends MasterNodeRequest<Request> {
+        private String id;
 
         public Request(String id) {
             this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
         }
@@ -60,11 +57,6 @@ public String getId() {
             return id;
         }
 
-        @Override
-        public boolean match(Task task) {
-            return task.getDescription().equals(DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + id);
-        }
-
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
@@ -94,59 +86,4 @@ public boolean equals(Object obj) {
             return Objects.equals(id, other.id);
         }
     }
-
-    public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
-
-        private final boolean acknowledged;
-
-        public Response(StreamInput in) throws IOException {
-            super(in);
-            acknowledged = in.readBoolean();
-        }
-
-        public Response(boolean acknowledged, List<TaskOperationFailure> taskFailures, List<FailedNodeException> nodeFailures) {
-            super(taskFailures, nodeFailures);
-            this.acknowledged = acknowledged;
-        }
-
-        public Response(boolean acknowledged) {
-            this(acknowledged, Collections.emptyList(), Collections.emptyList());
-        }
-
-        public boolean isDeleted() {
-            return acknowledged;
-        }
-
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            super.writeTo(out);
-            out.writeBoolean(acknowledged);
-        }
-
-        @Override
-        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
-            builder.startObject();
-            {
-                toXContentCommon(builder, params);
-                builder.field("acknowledged", acknowledged);
-            }
-            builder.endObject();
-            return builder;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o)
-                return true;
-            if (o == null || getClass() != o.getClass())
-                return false;
-            DeleteDataFrameTransformAction.Response response = (DeleteDataFrameTransformAction.Response) o;
-            return super.equals(o) && acknowledged == response.acknowledged;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(super.hashCode(), acknowledged);
-        }
-    }
 }
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java
index ec7e0de9e34fc..ccf075b13ae5a 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java
@@ -22,9 +22,11 @@
  * An abstract class that builds an index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)},
  * it will create the index from the source index up to the last complete bucket that is allowed to be built (based on job position).
  * Only one background job can run simultaneously and {@link #onFinish} is called when the job
- * finishes. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} is called if the indexer is
- * aborted while a job is running. The indexer must be started ({@link #start()} to allow a background job to run when
- * {@link #maybeTriggerAsyncJob(long)} is called. {@link #stop()} can be used to stop the background job without aborting the indexer.
+ * finishes. {@link #onStop()} is called after the current search returns when the job is stopped early via a call
+ * to {@link #stop()}. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()}
+ * is called if the indexer is aborted while a job is running. The indexer must be started ({@link #start()})
+ * to allow a background job to run when {@link #maybeTriggerAsyncJob(long)} is called.
+ * {@link #stop()} can be used to stop the background job without aborting the indexer.
  *
  * In a nutshell this is a 2 cycle engine: 1st it sends a query, 2nd it indexes documents based on the response, sends the next query,
  * indexes, queries, indexes, ... until a condition lets the engine pause until the source provides new input.
@@ -84,8 +86,10 @@ public synchronized IndexerState start() {
 
     /**
      * Sets the internal state to {@link IndexerState#STOPPING} if an async job is
-     * running in the background. If there is no job running when this function is
-     * called, the state is directly set to {@link IndexerState#STOPPED}.
+     * running in the background; {@link #onStop()} will be called when the background job
+     * detects that the indexer is stopped.
+     * If there is no job running when this function is called,
+     * the state is set to {@link IndexerState#STOPPED} and {@link #onStop()} is called directly.
      *
      * @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
      */
@@ -94,6 +98,7 @@ public synchronized IndexerState stop() {
             if (previousState == IndexerState.INDEXING) {
                 return IndexerState.STOPPING;
             } else if (previousState == IndexerState.STARTED) {
+                onStop();
                 return IndexerState.STOPPED;
             } else {
                 return previousState;
@@ -251,6 +256,14 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
      */
     protected abstract void onFinish(ActionListener listener);
 
+    /**
+     * Called when the indexer is stopped. This is only called when the indexer is stopped
+     * via {@link #stop()} as opposed to {@link #onFinish(ActionListener)} which is called
+     * when the indexer's work is done.
+     */
+    protected void onStop() {
+    }
+
     /**
      * Called when a background job detects that the indexer is aborted causing the
      * async execution to stop.
@@ -276,6 +289,7 @@ private IndexerState finishAndSetState() {
 
             case STOPPING:
                 // must be started again
+                onStop();
                 return IndexerState.STOPPED;
 
             case ABORTING:
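Editor's note: the indexer changes above add an onStop() hook and define when it fires — immediately inside stop() if no background job is running, or later from finishAndSetState() once the running job observes the STOPPING flag. The self-contained model below restates just that transition logic for clarity; it is an illustrative re-implementation, not the real AsyncTwoPhaseIndexer API.

// Illustrative model of the stop()/onStop() contract introduced by this change.
class IndexerStopModel {

    enum State { STOPPED, STARTED, INDEXING, STOPPING }

    private State state = State.STOPPED;

    // Hook corresponding to AsyncTwoPhaseIndexer#onStop(); subclasses would override it, here we just log.
    protected void onStop() {
        System.out.println("onStop() fired");
    }

    // Mirrors stop(): with a job running we only flag STOPPING and let the background job
    // invoke onStop() later; with no job running we notify immediately and move to STOPPED.
    synchronized State stop() {
        if (state == State.INDEXING) {
            state = State.STOPPING;
        } else if (state == State.STARTED) {
            onStop();
            state = State.STOPPED;
        }
        return state;
    }

    // Mirrors the STOPPING branch of finishAndSetState(): the background job finishes its
    // current search/bulk cycle, sees the STOPPING flag, calls onStop() and ends up STOPPED.
    synchronized State finishCycle() {
        if (state == State.STOPPING) {
            onStop();
            state = State.STOPPED;
        }
        return state;
    }
}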
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/DeleteDataFrameTransformActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/DeleteDataFrameTransformActionResponseTests.java
deleted file mode 100644
index 54501fde5cfe8..0000000000000
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/DeleteDataFrameTransformActionResponseTests.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License;
- * you may not use this file except in compliance with the Elastic License.
- */
-
-package org.elasticsearch.xpack.core.dataframe.action;
-
-import org.elasticsearch.common.io.stream.Writeable.Reader;
-import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction.Response;
-
-public class DeleteDataFrameTransformActionResponseTests extends AbstractWireSerializingDataFrameTestCase {
-
-    @Override
-    protected Response createTestInstance() {
-        return new Response(randomBoolean());
-    }
-
-    @Override
-    protected Reader instanceReader() {
-        return Response::new;
-    }
-}
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java
index b39c4f1a25a76..e56491bdb5764 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java
@@ -18,6 +18,7 @@
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.test.ESTestCase;
+import org.junit.Before;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -34,17 +35,26 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
 
     AtomicBoolean isFinished = new AtomicBoolean(false);
+    AtomicBoolean isStopped = new AtomicBoolean(false);
+
+    @Before
+    public void reset() {
+        isFinished.set(false);
+        isStopped.set(false);
+    }
 
     private class MockIndexer extends AsyncTwoPhaseIndexer {
 
         private final CountDownLatch latch;
         // test the execution order
         private volatile int step;
+        private final boolean stoppedBeforeFinished;
 
         protected MockIndexer(Executor executor, AtomicReference initialState, Integer initialPosition,
-                              CountDownLatch latch) {
+                              CountDownLatch latch, boolean stoppedBeforeFinished) {
             super(executor, initialState, initialPosition, new MockJobStats());
             this.latch = latch;
+            this.stoppedBeforeFinished = stoppedBeforeFinished;
         }
 
         @Override
@@ -57,7 +67,7 @@ protected IterationResult doProcess(SearchResponse searchResponse) {
             awaitForLatch();
             assertThat(step, equalTo(3));
             ++step;
-            return new IterationResult(Collections.emptyList(), 3, true);
+            return new IterationResult<>(Collections.emptyList(), 3, true);
         }
 
         private void awaitForLatch() {
@@ -99,7 +109,8 @@ protected void doNextBulk(BulkRequest request, ActionListener next
 
         @Override
         protected void doSaveState(IndexerState state, Integer position, Runnable next) {
-            assertThat(step, equalTo(5));
+            int expectedStep = stoppedBeforeFinished ? 3 : 5;
+            assertThat(step, equalTo(expectedStep));
             ++step;
             next.run();
         }
@@ -114,7 +125,12 @@ protected void onFinish(ActionListener listener) {
             assertThat(step, equalTo(4));
             ++step;
             listener.onResponse(null);
-            isFinished.set(true);
+            assertTrue(isFinished.compareAndSet(false, true));
+        }
+
+        @Override
+        protected void onStop() {
+            assertTrue(isStopped.compareAndSet(false, true));
         }
 
         @Override
@@ -180,7 +196,7 @@ protected void doSaveState(IndexerState state, Integer position, Runnable next)
         protected void onFailure(Exception exc) {
             assertThat(step, equalTo(2));
             ++step;
-            isFinished.set(true);
+            assertTrue(isFinished.compareAndSet(false, true));
         }
 
         @Override
@@ -209,10 +225,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
     public void testStateMachine() throws Exception {
         AtomicReference state = new AtomicReference<>(IndexerState.STOPPED);
         final ExecutorService executor = Executors.newFixedThreadPool(1);
-        isFinished.set(false);
        try {
             CountDownLatch countDownLatch = new CountDownLatch(1);
-            MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch);
+            MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false);
             indexer.start();
             assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
             assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
@@ -220,7 +235,8 @@ public void testStateMachine() throws Exception {
             countDownLatch.countDown();
 
             assertThat(indexer.getPosition(), equalTo(2));
-            ESTestCase.awaitBusy(() -> isFinished.get());
+            assertTrue(awaitBusy(() -> isFinished.get()));
+            assertFalse(isStopped.get());
             assertThat(indexer.getStep(), equalTo(6));
             assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
             assertThat(indexer.getStats().getNumPages(), equalTo(1L));
@@ -234,18 +250,57 @@ public void testStateMachine() throws Exception {
 
     public void testStateMachineBrokenSearch() throws InterruptedException {
         AtomicReference state = new AtomicReference<>(IndexerState.STOPPED);
         final ExecutorService executor = Executors.newFixedThreadPool(1);
-        isFinished.set(false);
         try {
             MockIndexerThrowsFirstSearch indexer = new MockIndexerThrowsFirstSearch(executor, state, 2);
             indexer.start();
             assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
             assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
-            assertTrue(ESTestCase.awaitBusy(() -> isFinished.get(), 10000, TimeUnit.SECONDS));
+            assertTrue(awaitBusy(() -> isFinished.get(), 10000, TimeUnit.SECONDS));
             assertThat(indexer.getStep(), equalTo(3));
         } finally {
             executor.shutdownNow();
         }
     }
+
+    public void testStop_AfterIndexerIsFinished() throws InterruptedException {
+        AtomicReference state = new AtomicReference<>(IndexerState.STOPPED);
+        final ExecutorService executor = Executors.newFixedThreadPool(1);
+        try {
+            CountDownLatch countDownLatch = new CountDownLatch(1);
+            MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false);
+            indexer.start();
+            assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
+            countDownLatch.countDown();
+            assertTrue(awaitBusy(() -> isFinished.get()));
+
+            indexer.stop();
+            assertTrue(isStopped.get());
+            assertThat(indexer.getState(), equalTo(IndexerState.STOPPED));
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
+    public void testStop_WhileIndexing() throws InterruptedException {
+        AtomicReference state = new AtomicReference<>(IndexerState.STOPPED);
+        final ExecutorService executor = Executors.newFixedThreadPool(1);
+        try {
+            CountDownLatch countDownLatch = new CountDownLatch(1);
+            MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, true);
+            indexer.start();
+            assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
+            assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
+            assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
+            indexer.stop();
+            countDownLatch.countDown();
+
+            assertThat(indexer.getPosition(), equalTo(2));
+            assertTrue(awaitBusy(() -> isStopped.get()));
+            assertFalse(isFinished.get());
+        } finally {
+            executor.shutdownNow();
+        }
+    }
 }
diff --git a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java
index 84f3e05de5cd1..ba6a6137789a3 100644
--- a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java
+++ b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java
@@ -93,11 +93,11 @@ protected StartDataFrameTransformAction.Response startDataFrameTransform(String
                 new StartDataFrameTransformAction.Request(id, false)).actionGet();
     }
 
-    protected DeleteDataFrameTransformAction.Response deleteDataFrameTransform(String id) {
-        DeleteDataFrameTransformAction.Response response = client().execute(DeleteDataFrameTransformAction.INSTANCE,
+    protected AcknowledgedResponse deleteDataFrameTransform(String id) {
+        AcknowledgedResponse response = client().execute(DeleteDataFrameTransformAction.INSTANCE,
             new DeleteDataFrameTransformAction.Request(id))
             .actionGet();
-        if (response.isDeleted()) {
+        if (response.isAcknowledged()) {
             transformConfigs.remove(id);
         }
         return response;
diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java
index 85c0ac44a69af..4344aa823b4cc 100644
--- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java
+++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java
@@ -21,6 +21,7 @@
 import org.elasticsearch.test.rest.ESRestTestCase;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
+import org.junit.After;
 import org.junit.AfterClass;
 
 import java.io.IOException;
@@ -272,16 +273,20 @@ protected static void deleteDataFrameTransform(String transformId) throws IOExce
         adminClient().performRequest(request);
     }
 
-    @AfterClass
-    public static void removeIndices() throws Exception {
+    @After
+    public void waitForDataFrame() throws Exception {
         wipeDataFrameTransforms();
         waitForPendingDataFrameTasks();
+    }
+
+    @AfterClass
+    public static void removeIndices() throws Exception {
         // we might have disabled wiping indices, but now its time to get rid of them
         // note: can not use super.cleanUpCluster() as this method must be static
         wipeIndices();
     }
 
-    protected static void wipeDataFrameTransforms() throws IOException, InterruptedException {
+    public void wipeDataFrameTransforms() throws IOException, InterruptedException {
         List<Map<String, Object>> transformConfigs = getDataFrameTransforms();
         for (Map transformConfig : transformConfigs) {
             String transformId = (String) transformConfig.get("id");
diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java
index 2cdc4009e785b..ac40334dfb443 100644
--- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java
+++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java
@@ -5,93 +5,73 @@
  */
 package org.elasticsearch.xpack.dataframe.action;
 
+import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ActionListenerResponseHandler;
-import org.elasticsearch.action.FailedNodeException;
-import org.elasticsearch.action.TaskOperationFailure;
 import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.tasks.TransportTasksAction;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.discovery.MasterNotDiscoveredException;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
-import org.elasticsearch.tasks.Task;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction;
 import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction.Request;
-import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction.Response;
-import org.elasticsearch.xpack.core.indexing.IndexerState;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
-import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
 
-import java.util.List;
+import java.io.IOException;
 
-public class TransportDeleteDataFrameTransformAction extends TransportTasksAction {
+public class TransportDeleteDataFrameTransformAction extends TransportMasterNodeAction {
 
     private final DataFrameTransformsConfigManager transformsConfigManager;
 
     @Inject
-    public TransportDeleteDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters,
-                                                   ClusterService clusterService, DataFrameTransformsConfigManager transformsConfigManager) {
-        super(DeleteDataFrameTransformAction.NAME, clusterService, transportService, actionFilters, Request::new, Response::new,
-            Response::new, ThreadPool.Names.SAME);
+    public TransportDeleteDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters, ThreadPool threadPool,
+                                                   ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
+                                                   DataFrameTransformsConfigManager transformsConfigManager) {
+        super(DeleteDataFrameTransformAction.NAME, transportService, clusterService, threadPool, actionFilters,
+              Request::new, indexNameExpressionResolver);
         this.transformsConfigManager = transformsConfigManager;
     }
 
     @Override
-    protected Response newResponse(Request request, List tasks, List taskOperationFailures,
-                                   List failedNodeExceptions) {
-        assert tasks.size() + taskOperationFailures.size() == 1;
-        boolean cancelled = tasks.size() > 0 && tasks.stream().allMatch(Response::isDeleted);
+    protected String executor() {
+        return ThreadPool.Names.SAME;
+    }
+
+    @Override
+    protected AcknowledgedResponse newResponse() {
+        return new AcknowledgedResponse();
+    }
 
-        return new Response(cancelled, taskOperationFailures, failedNodeExceptions);
+    protected AcknowledgedResponse read(StreamInput in) throws IOException {
+        AcknowledgedResponse response = new AcknowledgedResponse();
+        response.readFrom(in);
+        return response;
     }
 
     @Override
-    protected void taskOperation(Request request, DataFrameTransformTask task, ActionListener listener) {
-        assert task.getTransformId().equals(request.getId());
-        IndexerState state = task.getState().getIndexerState();
-        if (state.equals(IndexerState.STOPPED)) {
-            task.onCancelled();
-            transformsConfigManager.deleteTransform(request.getId(), ActionListener.wrap(r -> {
-                listener.onResponse(new Response(true));
-            }, listener::onFailure));
+    protected void masterOperation(Request request, ClusterState state, ActionListener listener) throws Exception {
+        PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
+        if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null) {
+            listener.onFailure(new ElasticsearchStatusException("Cannot delete data frame [" + request.getId() +
+                    "] as the task is running. Stop the task first", RestStatus.CONFLICT));
         } else {
-            listener.onFailure(new IllegalStateException("Could not delete transform [" + request.getId() + "] because "
-                + "indexer state is [" + state + "]. Transform must be [" + IndexerState.STOPPED + "] before deletion."));
+            // Task is not running, delete the configuration document
+            transformsConfigManager.deleteTransform(request.getId(), ActionListener.wrap(
+                    r -> listener.onResponse(new AcknowledgedResponse(r)),
+                    listener::onFailure));
         }
     }
 
     @Override
-    protected void doExecute(Task task, Request request, ActionListener listener) {
-        final ClusterState state = clusterService.state();
-        final DiscoveryNodes nodes = state.nodes();
-        if (nodes.isLocalNodeElectedMaster()) {
-            PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
-            if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null) {
-                super.doExecute(task, request, listener);
-            } else {
-                // we couldn't find the transform in the persistent task CS, but maybe the transform exists in the configuration index,
-                // if so delete the orphaned document and do not throw (for the normal case we want to stop the task first,
-                // than delete the configuration document if and only if the data frame transform is in stopped state)
-                transformsConfigManager.deleteTransform(request.getId(), ActionListener.wrap(r -> {
-                    listener.onResponse(new Response(true));
-                    return;
-                }, listener::onFailure));
-            }
-        } else {
-            // Delegates DeleteTransform to elected master node, so it becomes the coordinating node.
-            // Non-master nodes may have a stale cluster state that shows transforms which are cancelled
-            // on the master, which makes testing difficult.
-            if (nodes.getMasterNode() == null) {
-                listener.onFailure(new MasterNotDiscoveredException("no known master nodes"));
-            } else {
-                transportService.sendRequest(nodes.getMasterNode(), actionName, request,
-                    new ActionListenerResponseHandler<>(listener, Response::new));
-            }
-        }
+    protected ClusterBlockException checkBlock(Request request, ClusterState state) {
+        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
     }
 }
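Editor's note: TransportDeleteDataFrameTransformAction is now a master-node action that refuses to delete a transform whose persistent task is still present, returning 409 CONFLICT instead of cancelling the task itself, so callers are expected to stop the transform first and then delete it. A rough sketch of that sequence with the high-level REST client follows; DeleteDataFrameTransformRequest and the dataFrame().deleteDataFrameTransform(...) method are assumed from the 7.x client and are not touched by this patch.

import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest;

import java.io.IOException;

class DeleteTransformExample {

    static void stopThenDelete(RestHighLevelClient client, String transformId) throws IOException {
        // Deleting while the task is running now fails with 409 CONFLICT
        // ("Cannot delete data frame [...] as the task is running. Stop the task first"),
        // so stop synchronously before issuing the delete.
        client.dataFrame().stopDataFrameTransform(
                new StopDataFrameTransformRequest(transformId, Boolean.TRUE, null), RequestOptions.DEFAULT);
        client.dataFrame().deleteDataFrameTransform(
                new DeleteDataFrameTransformRequest(transformId), RequestOptions.DEFAULT);
    }
}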
diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java
index 7ab5f28001407..bb01da4c7e50a 100644
--- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java
+++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java
@@ -132,7 +132,6 @@ protected void doExecute(Task task, Request request, ActionListener fi
             }, e -> {
                 // If the index to search, or the individual config is not there, just return empty
-                logger.error("failed to expand ids", e);
                 if (e instanceof ResourceNotFoundException) {
                     finalListener.onResponse(new Response(Collections.emptyList()));
                 } else {
diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java
index 0b8ef692cdd8c..997739b2407a7 100644
--- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java
+++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java
@@ -6,8 +6,6 @@
 package org.elasticsearch.xpack.dataframe.action;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.action.ActionListener;
@@ -63,8 +61,6 @@ public class TransportPutDataFrameTransformAction extends TransportMasterNodeAction {
 
-    private static final Logger logger = LogManager.getLogger(TransportPutDataFrameTransformAction.class);
-
     private final XPackLicenseState licenseState;
     private final Client client;
     private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java
index 120f1ef77596b..26f5259c69dc8 100644
--- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java
+++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java
@@ -5,64 +5,85 @@
  */
 package org.elasticsearch.xpack.dataframe.action;
 
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
-import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.FailedNodeException;
 import org.elasticsearch.action.TaskOperationFailure;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.tasks.TransportTasksAction;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.discovery.MasterNotDiscoveredException;
+import org.elasticsearch.persistent.PersistentTasksService;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.action.util.PageParams;
-import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
 import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
 import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
 
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
-
 public class TransportStopDataFrameTransformAction extends TransportTasksAction {
 
-    private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100);
     private final ThreadPool threadPool;
     private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
+    private final PersistentTasksService persistentTasksService;
 
     @Inject
     public TransportStopDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters,
                                                  ClusterService clusterService, ThreadPool threadPool,
+                                                 PersistentTasksService persistentTasksService,
                                                  DataFrameTransformsConfigManager dataFrameTransformsConfigManager) {
        super(StopDataFrameTransformAction.NAME, clusterService, transportService, actionFilters, StopDataFrameTransformAction.Request::new,
                StopDataFrameTransformAction.Response::new, StopDataFrameTransformAction.Response::new, ThreadPool.Names.SAME);
        this.threadPool = threadPool;
        this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
+       this.persistentTasksService = persistentTasksService;
     }
 
     @Override
     protected void doExecute(Task task, StopDataFrameTransformAction.Request request, ActionListener listener) {
+        final ClusterState state = clusterService.state();
+        final DiscoveryNodes nodes = state.nodes();
+
+        if (nodes.isLocalNodeElectedMaster() == false) {
+            // Delegates stop data frame to elected master node so it becomes the coordinating node.
+            if (nodes.getMasterNode() == null) {
+                listener.onFailure(new MasterNotDiscoveredException("no known master node"));
+            } else {
+                transportService.sendRequest(nodes.getMasterNode(), actionName, request,
+                        new ActionListenerResponseHandler<>(listener, StopDataFrameTransformAction.Response::new));
+            }
+        } else {
+            final ActionListener<StopDataFrameTransformAction.Response> finalListener;
+            if (request.waitForCompletion()) {
+                finalListener = waitForStopListener(request, listener);
+            } else {
+                finalListener = listener;
+            }
 
-        dataFrameTransformsConfigManager.expandTransformIds(request.getId(), new PageParams(0, 10_000), ActionListener.wrap(
-            expandedIds -> {
-                request.setExpandedIds(new HashSet<>(expandedIds));
-                request.setNodes(DataFrameNodes.dataFrameTaskNodes(expandedIds, clusterService.state()));
-                super.doExecute(task, request, listener);
-            },
-            listener::onFailure
-        ));
+            dataFrameTransformsConfigManager.expandTransformIds(request.getId(), new PageParams(0, 10_000), ActionListener.wrap(
+                expandedIds -> {
+                    request.setExpandedIds(new HashSet<>(expandedIds));
+                    request.setNodes(DataFrameNodes.dataFrameTaskNodes(expandedIds, clusterService.state()));
+                    super.doExecute(task, request, finalListener);
+                },
+                listener::onFailure
+            ));
+        }
     }
 
     @Override
@@ -84,42 +105,9 @@ protected void taskOperation(StopDataFrameTransformAction.Request request, DataF
                     RestStatus.CONFLICT));
                 return;
             }
-            if (request.waitForCompletion() == false) {
-                transformTask.stop(listener);
-            } else {
-                ActionListener blockingListener = ActionListener.wrap(response -> {
-                    if (response.isStopped()) {
-                        // The Task acknowledged that it is stopped/stopping... wait until the status actually
-                        // changes over before returning. Switch over to Generic threadpool so
-                        // we don't block the network thread
-                        threadPool.generic().execute(() -> {
-                            try {
-                                long untilInNanos = System.nanoTime() + request.getTimeout().getNanos();
-
-                                while (System.nanoTime() - untilInNanos < 0) {
-                                    if (transformTask.isStopped()) {
-                                        listener.onResponse(response);
-                                        return;
-                                    }
-                                    Thread.sleep(WAIT_FOR_COMPLETION_POLL.millis());
-                                }
-                                // ran out of time
-                                listener.onFailure(new ElasticsearchTimeoutException(
-                                    DataFrameMessages.getMessage(DataFrameMessages.REST_STOP_TRANSFORM_WAIT_FOR_COMPLETION_TIMEOUT,
-                                        request.getTimeout().getStringRep(), request.getId())));
-                            } catch (InterruptedException e) {
-                                listener.onFailure(new ElasticsearchException(DataFrameMessages.getMessage(
-                                    DataFrameMessages.REST_STOP_TRANSFORM_WAIT_FOR_COMPLETION_INTERRUPT, request.getId()), e));
-                            }
-                        });
-                    } else {
-                        // Did not acknowledge stop, just return the response
-                        listener.onResponse(response);
-                    }
-                }, listener::onFailure);
-
-                transformTask.stop(blockingListener);
-            }
+
+            transformTask.stop();
+            listener.onResponse(new StopDataFrameTransformAction.Response(Boolean.TRUE));
         } else {
             listener.onFailure(new RuntimeException("ID of data frame indexer task [" + transformTask.getTransformId()
                 + "] does not match request's ID [" + request.getId() + "]"));
         }
@@ -139,4 +127,47 @@ protected StopDataFrameTransformAction.Response newResponse(StopDataFrameTransfo
         boolean allStopped = tasks.stream().allMatch(StopDataFrameTransformAction.Response::isStopped);
         return new StopDataFrameTransformAction.Response(allStopped);
     }
+
+    private ActionListener<StopDataFrameTransformAction.Response>
+            waitForStopListener(StopDataFrameTransformAction.Request request,
+                                ActionListener<StopDataFrameTransformAction.Response> listener) {
+
+        return ActionListener.wrap(
+            response -> {
+                // Wait until the persistent task is stopped
+                // Switch over to Generic threadpool so we don't block the network thread
+                threadPool.generic().execute(() ->
+                    waitForDataFrameStopped(request.getExpandedIds(), request.getTimeout(), listener));
+            },
+            listener::onFailure
+        );
+    }
+
+    private void waitForDataFrameStopped(Collection<String> persistentTaskIds, TimeValue timeout,
+                                         ActionListener<StopDataFrameTransformAction.Response> listener) {
+        persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetaData -> {
+
+            if (persistentTasksCustomMetaData == null) {
+                return true;
+            }
+
+            for (String persistentTaskId : persistentTaskIds) {
+                if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) {
+                    return false;
+                }
+            }
+            return true;
+
+        }, timeout, new ActionListener<>() {
+            @Override
+            public void onResponse(Boolean result) {
+                listener.onResponse(new StopDataFrameTransformAction.Response(Boolean.TRUE));
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                listener.onFailure(e);
+            }
+        });
+    }
 }
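Editor's note: the stop action now delegates to the elected master and, when wait_for_completion is set, resolves the listener only after the persistent tasks have disappeared from the cluster state. The helper below is just the waitForPersistentTasksCondition call from the hunk above pulled out into a free-standing sketch with comments; it is illustrative and not part of the patch.

import java.util.Collection;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.persistent.PersistentTasksService;

// Free-standing restatement of the wait logic: complete the listener only once none of the
// given persistent task ids are present in the cluster state any more.
final class WaitForTransformTasksGone {

    static void await(PersistentTasksService persistentTasksService, Collection<String> persistentTaskIds,
                      TimeValue timeout, ActionListener<Boolean> listener) {
        persistentTasksService.waitForPersistentTasksCondition(tasksCustomMetaData -> {
            if (tasksCustomMetaData == null) {
                return true;                    // no persistent tasks at all, nothing left to wait for
            }
            for (String persistentTaskId : persistentTaskIds) {
                if (tasksCustomMetaData.getTask(persistentTaskId) != null) {
                    return false;               // at least one transform task is still registered
                }
            }
            return true;
        }, timeout, listener);
    }
}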
diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestDeleteDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestDeleteDataFrameTransformAction.java
index 183952e060338..125e61b5021e4 100644
--- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestDeleteDataFrameTransformAction.java
+++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestDeleteDataFrameTransformAction.java
@@ -11,6 +11,7 @@
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestToXContentListener;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
 import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction;
 
@@ -33,7 +34,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
         DeleteDataFrameTransformAction.Request request = new DeleteDataFrameTransformAction.Request(id);
 
         return channel -> client.execute(DeleteDataFrameTransformAction.INSTANCE, request,
-                new BaseTasksResponseToXContentListener<>(channel));
+                new RestToXContentListener<>(channel));
     }
 
     @Override
diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java
index 2020300a0cf77..c332d29945aaf 100644
--- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java
+++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java
@@ -27,7 +27,6 @@
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
 import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
 import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response;
-import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
@@ -85,7 +84,7 @@ public DataFrameTransformTask(long id, String type, String action, TaskId parent
         String initialReason = null;
         long initialGeneration = 0;
         Map initialPosition = null;
-        logger.info("[{}] init, got state: [{}]", transform.getId(), state != null);
+        logger.trace("[{}] init, got state: [{}]", transform.getId(), state != null);
         if (state != null) {
             initialTaskState = state.getTaskState();
             initialReason = state.getReason();
@@ -218,51 +217,17 @@ public synchronized void start(ActionListener listener) {
         ));
     }
 
-    public synchronized void stop(ActionListener listener) {
+    public synchronized void stop() {
         if (getIndexer() == null) {
-            listener.onFailure(new ElasticsearchException("Task for transform [{}] not fully initialized. Try again later",
-                    getTransformId()));
             return;
         }
         // taskState is initialized as STOPPED and is updated in tandem with the indexerState
         // Consequently, if it is STOPPED, we consider the whole task STOPPED.
         if (taskState.get() == DataFrameTransformTaskState.STOPPED) {
-            listener.onResponse(new StopDataFrameTransformAction.Response(true));
             return;
         }
-        final IndexerState newState = getIndexer().stop();
-        switch (newState) {
-            case STOPPED:
-                // Fall through to `STOPPING` as the behavior is the same for both, we should persist for both
-            case STOPPING:
-                // update the persistent state to STOPPED. There are two scenarios and both are safe:
-                // 1. we persist STOPPED now, indexer continues a bit then sees the flag and checkpoints another STOPPED with the more recent
-                // position.
-                // 2. we persist STOPPED now, indexer continues a bit but then dies. When/if we resume we'll pick up at last checkpoint,
-                // overwrite some docs and eventually checkpoint.
-                taskState.set(DataFrameTransformTaskState.STOPPED);
-                DataFrameTransformState state = new DataFrameTransformState(
-                    DataFrameTransformTaskState.STOPPED,
-                    IndexerState.STOPPED,
-                    getIndexer().getPosition(),
-                    currentCheckpoint.get(),
-                    stateReason.get(),
-                    getIndexer().getProgress());
-                persistStateToClusterState(state, ActionListener.wrap(
-                    task -> {
-                        auditor.info(transform.getId(), "Updated state to [" + state.getTaskState() + "]");
-                        listener.onResponse(new StopDataFrameTransformAction.Response(true));
-                    },
-                    exc -> listener.onFailure(new ElasticsearchException(
-                        "Error while updating state for data frame transform [{}] to [{}]", exc,
-                        transform.getId(),
-                        state.getIndexerState()))));
-                break;
-            default:
-                listener.onFailure(new ElasticsearchException("Cannot stop task for data frame transform [{}], because state was [{}]",
-                    transform.getId(), newState));
-                break;
-        }
+
+        getIndexer().stop();
     }
 
     @Override
@@ -280,12 +245,10 @@ public synchronized void triggered(Event event) {
 
     /**
      * Attempt to gracefully cleanup the data frame transform so it can be terminated.
-     * This tries to remove the job from the scheduler, and potentially any other
-     * cleanup operations in the future
+     * This tries to remove the job from the scheduler and completes the persistent task
      */
     synchronized void shutdown() {
         try {
-            logger.info("Data frame indexer [" + transform.getId() + "] received abort request, stopping indexer.");
             schedulerEngine.remove(SCHEDULE_NAME + "_" + transform.getId());
             schedulerEngine.unregister(this);
         } catch (Exception e) {
@@ -612,6 +575,13 @@ protected void onFinish(ActionListener listener) {
             }
         }
 
+        @Override
+        protected void onStop() {
+            auditor.info(transformConfig.getId(), "Indexer has stopped");
+            logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId());
+            transformTask.shutdown();
+        }
+
         @Override
         protected void onAbort() {
             auditor.info(transformConfig.getId(), "Received abort request, stopping indexer");
diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml
index f1ac07b72340c..1e9223b79f201 100644
--- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml
+++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml
@@ -106,6 +106,7 @@ teardown:
   - do:
       data_frame.stop_data_frame_transform:
         transform_id: "airline-transform-start-stop"
+        wait_for_completion: true
   - match: { stopped: true }
 
   - do:
@@ -199,6 +200,7 @@ teardown:
   - do:
       data_frame.stop_data_frame_transform:
         transform_id: "airline-transform-start-later"
+        wait_for_completion: true
   - match: { stopped: true }
 
  - do:
@@ -232,6 +234,8 @@ teardown:
   - do:
      data_frame.stop_data_frame_transform:
        transform_id: "_all"
+       wait_for_completion: true
+
   - match: { stopped: true }
 
   - do:
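Editor's note: the YAML tests above exercise wait_for_completion over REST. The equivalent request through the low-level Java REST client might look like the sketch below; the /_data_frame/transforms/{id}/_stop path and the timeout parameter name are assumptions based on the 7.x API, not something this patch defines.

import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

import java.io.IOException;

class StopTransformOverRest {

    public static void main(String[] args) throws IOException {
        try (RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // Assumed 7.x endpoint for this API; the transform id mirrors the yml test above.
            Request request = new Request("POST", "/_data_frame/transforms/airline-transform-start-stop/_stop");
            request.addParameter("wait_for_completion", "true");   // block until the persistent task is gone
            request.addParameter("timeout", "30s");                // assumed parameter name for the stop timeout
            Response response = restClient.performRequest(request);
            System.out.println(response.getStatusLine());
        }
    }
}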