Skip to content

Commit

Permalink
[ML Data Frame] Refactor stop logic (#42644) (#42762)
Browse files Browse the repository at this point in the history
* Revert "invalid test"

This reverts commit 9dd8b52.

* Testing

* mend

* Revert "[ML Data Frame] Mute Data Frame tests"

This reverts commit 5d837fa.

* Call onStop and onAbort outside atomic update

* Don’t update CS

* Tidying up

* Remove invalid test that asserted logic that has been removed

* Add stopped event

* Revert "Add stopped event"

This reverts commit 02ba992.

* Adding check for STOPPED in saveState
  • Loading branch information
benwtrent authored Jun 3, 2019
1 parent 13a70c2 commit 4f7f8b9
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,28 +89,21 @@ public synchronized IndexerState start() {
* Sets the internal state to {@link IndexerState#STOPPING} if an async job is
* running in the background, {@link #onStop()} will be called when the background job
* detects that the indexer is stopped.
* If there is no job running when this function is called
* the state is set to {@link IndexerState#STOPPED} and {@link #onStop()} called directly.
* If there is no job running when this function is called the returned
* state is {@link IndexerState#STOPPED} and {@link #onStop()} will not be called.
*
* @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
*/
public synchronized IndexerState stop() {
AtomicBoolean wasStartedAndSetStopped = new AtomicBoolean(false);
IndexerState currentState = state.updateAndGet(previousState -> {
return state.updateAndGet(previousState -> {
if (previousState == IndexerState.INDEXING) {
return IndexerState.STOPPING;
} else if (previousState == IndexerState.STARTED) {
wasStartedAndSetStopped.set(true);
return IndexerState.STOPPED;
} else {
return previousState;
}
});

if (wasStartedAndSetStopped.get()) {
onStop();
}
return currentState;
}

/**
Expand Down Expand Up @@ -287,20 +280,22 @@ private void finishWithIndexingFailure(Exception exc) {
}

private IndexerState finishAndSetState() {
return state.updateAndGet(prev -> {
AtomicBoolean callOnStop = new AtomicBoolean(false);
AtomicBoolean callOnAbort = new AtomicBoolean(false);
IndexerState updatedState = state.updateAndGet(prev -> {
switch (prev) {
case INDEXING:
// ready for another job
return IndexerState.STARTED;

case STOPPING:
callOnStop.set(true);
// must be started again
onStop();
return IndexerState.STOPPED;

case ABORTING:
callOnAbort.set(true);
// abort and exit
onAbort();
return IndexerState.ABORTING; // This shouldn't matter, since onAbort() will kill the task first

case STOPPED:
Expand All @@ -315,6 +310,14 @@ private IndexerState finishAndSetState() {
throw new IllegalStateException("Indexer job encountered an illegal state [" + prev + "]");
}
});

if (callOnStop.get()) {
onStop();
} else if (callOnAbort.get()) {
onAbort();
}

return updatedState;
}

private void onSearchResponse(SearchResponse searchResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,25 +268,6 @@ public void testStateMachineBrokenSearch() throws InterruptedException {
}
}

public void testStop_AfterIndexerIsFinished() throws InterruptedException {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final ExecutorService executor = Executors.newFixedThreadPool(1);
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false);
indexer.start();
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
countDownLatch.countDown();
assertTrue(awaitBusy(() -> isFinished.get()));

indexer.stop();
assertTrue(isStopped.get());
assertThat(indexer.getState(), equalTo(IndexerState.STOPPED));
} finally {
executor.shutdownNow();
}
}

public void testStop_WhileIndexing() throws InterruptedException {
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
final ExecutorService executor = Executors.newFixedThreadPool(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public void cleanTransforms() throws IOException {
cleanUp();
}

@AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public void testDataFrameTransformCrud() throws Exception {
createReviewsIndex();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.elasticsearch.xpack.dataframe.integration;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.junit.Before;
Expand All @@ -23,7 +22,6 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;

@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameAuditorIT extends DataFrameRestTestCase {

private static final String TEST_USER_NAME = "df_admin_plus_data";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
Expand All @@ -23,7 +22,6 @@

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameConfigurationIndexIT extends DataFrameRestTestCase {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.elasticsearch.xpack.dataframe.integration;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
Expand All @@ -22,7 +21,6 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {

private static final String TEST_USER_NAME = "df_user";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.elasticsearch.xpack.dataframe.integration;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
Expand All @@ -16,7 +15,6 @@
import java.io.IOException;
import java.util.Map;

@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameMetaDataIT extends DataFrameRestTestCase {

private boolean indicesCreated = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.elasticsearch.xpack.dataframe.integration;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.junit.Before;
Expand All @@ -22,7 +21,6 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;

@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFramePivotRestIT extends DataFrameRestTestCase {

private static final String TEST_USER_NAME = "df_admin_plus_data";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.elasticsearch.xpack.dataframe.integration;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -20,7 +19,6 @@
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.equalTo;

@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {

public void testDummy() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

package org.elasticsearch.xpack.dataframe.integration;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
Expand All @@ -23,7 +22,6 @@
import static org.elasticsearch.xpack.core.dataframe.DataFrameField.INDEX_DOC_TYPE;
import static org.elasticsearch.xpack.dataframe.DataFrameFeatureSet.PROVIDED_STATS;

@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/42344")
public class DataFrameUsageIT extends DataFrameRestTestCase {
private boolean indicesCreated = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
private final Map<String, Object> initialPosition;
private final IndexerState initialIndexerState;

private final SetOnce<DataFrameIndexer> indexer = new SetOnce<>();
private final SetOnce<ClientDataFrameIndexer> indexer = new SetOnce<>();

private final AtomicReference<DataFrameTransformTaskState> taskState;
private final AtomicReference<String> stateReason;
Expand Down Expand Up @@ -125,7 +125,7 @@ public Status getStatus() {
return getState();
}

private DataFrameIndexer getIndexer() {
private ClientDataFrameIndexer getIndexer() {
return indexer.get();
}

Expand Down Expand Up @@ -236,7 +236,10 @@ public synchronized void stop() {
return;
}

getIndexer().stop();
IndexerState state = getIndexer().stop();
if (state == IndexerState.STOPPED) {
getIndexer().doSaveState(state, getIndexer().getPosition(), () -> getIndexer().onStop());
}
}

@Override
Expand Down Expand Up @@ -530,40 +533,36 @@ protected void doSaveState(IndexerState indexerState, Map<String, Object> positi
next.run();
return;
}
// If we are `STOPPED` on a `doSaveState` call, that indicates we transitioned to `STOPPED` from `STOPPING`
// OR we called `doSaveState` manually as the indexer was not actively running.
// Since we save the state to an index, we should make sure that our task state is in parity with the indexer state
if (indexerState.equals(IndexerState.STOPPED)) {
transformTask.setTaskStateStopped();
}

final DataFrameTransformState state = new DataFrameTransformState(
transformTask.taskState.get(),
indexerState,
getPosition(),
position,
transformTask.currentCheckpoint.get(),
transformTask.stateReason.get(),
getProgress());
logger.debug("Updating persistent state of transform [{}] to [{}]", transformConfig.getId(), state.toString());

// Persisting stats when we call `doSaveState` should be ok as we only call it on a state transition and
// only every-so-often when doing the bulk indexing calls. See AsyncTwoPhaseIndexer#onBulkResponse for current periodicity
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> updateClusterStateListener = ActionListener.wrap(
task -> {
transformsConfigManager.putOrUpdateTransformStats(
new DataFrameTransformStateAndStats(transformId, state, getStats(),
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
ActionListener.wrap(
r -> {
next.run();
},
statsExc -> {
logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);
next.run();
}
));
},
exc -> {
logger.error("Updating persistent state of transform [" + transformConfig.getId() + "] failed", exc);
next.run();
}
);

transformTask.persistStateToClusterState(state, updateClusterStateListener);
transformsConfigManager.putOrUpdateTransformStats(
new DataFrameTransformStateAndStats(transformId, state, getStats(),
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
ActionListener.wrap(
r -> {
next.run();
},
statsExc -> {
logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);
next.run();
}
));
}

@Override
Expand Down Expand Up @@ -602,20 +601,7 @@ protected void onFinish(ActionListener<Void> listener) {
protected void onStop() {
auditor.info(transformConfig.getId(), "Indexer has stopped");
logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId());

transformTask.setTaskStateStopped();
transformsConfigManager.putOrUpdateTransformStats(
new DataFrameTransformStateAndStats(transformId, transformTask.getState(), getStats(),
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
ActionListener.wrap(
r -> {
transformTask.shutdown();
},
statsExc -> {
transformTask.shutdown();
logger.error("Updating saving stats of transform [" + transformConfig.getId() + "] failed", statsExc);
}
));
transformTask.shutdown();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,10 @@ teardown:
- do:
data_frame.stop_data_frame_transform:
transform_id: "airline-transform-start-stop"
wait_for_completion: true
- match: { acknowledged: true }


- do:
data_frame.get_data_frame_transform_stats:
transform_id: "airline-transform-start-later"
Expand All @@ -209,3 +211,46 @@ teardown:
- do:
data_frame.delete_data_frame_transform:
transform_id: "airline-transform-start-later"

---
"Test stop all":
- do:
data_frame.put_data_frame_transform:
transform_id: "airline-transform-stop-all"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-start-later" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
- do:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-stop-all"
- match: { acknowledged: true }

- do:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-start-stop"
- match: { acknowledged: true }

- do:
data_frame.stop_data_frame_transform:
transform_id: "_all"
wait_for_completion: true
- match: { acknowledged: true }

- do:
data_frame.get_data_frame_transform_stats:
transform_id: "*"
- match: { count: 2 }
- match: { transforms.0.state.indexer_state: "stopped" }
- match: { transforms.0.state.task_state: "stopped" }
- match: { transforms.1.state.indexer_state: "stopped" }
- match: { transforms.1.state.task_state: "stopped" }

- do:
data_frame.delete_data_frame_transform:
transform_id: "airline-transform-stop-all"

0 comments on commit 4f7f8b9

Please sign in to comment.