Skip to content

Commit

Permalink
[ML] Wait for shards to initialize after creating ML internal indices
Browse files Browse the repository at this point in the history
There have been a few test failures that are likely caused by tests
performing actions that use ML indices immediately after the actions
that create those ML indices.  Currently this can result in attempts
to search the newly created index before its shards have initialized.

This change makes the method that creates the internal ML indices
that have been affected by this problem (state and stats) wait for
the shards to be initialized before returning.

Fixes elastic#54887
Fixes elastic#55221
Fixes elastic#55807
Fixes elastic#57102
Fixes elastic#58841
Fixes elastic#59011
  • Loading branch information
droberts195 committed Jul 3, 2020
1 parent 34175b9 commit 6758c96
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
Expand All @@ -18,6 +20,7 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -69,13 +72,38 @@ private MlIndexAndAlias() {}
* Adds an {@code alias} to that index if it was created,
* or to the index with the highest suffix if the index did not have to be created.
* The listener is notified with a {@code boolean} that informs whether the index or the alias were created.
* If the index is created, the listener is not called until the index is ready to use via the supplied alias,
* so that a method that receives a success response from this method can safely use the index immediately.
*/
public static void createIndexAndAliasIfNecessary(Client client,
ClusterState clusterState,
IndexNameExpressionResolver resolver,
String indexPatternPrefix,
String alias,
ActionListener<Boolean> listener) {
ActionListener<Boolean> finalListener) {

// If the index and alias were successfully created then wait for the shards of the index that the alias points to be ready
ActionListener<Boolean> waitForShardsListener = ActionListener.wrap(
created -> {
if (created) {
ClusterHealthRequest healthRequest = Requests.clusterHealthRequest(alias)
.waitForNoRelocatingShards(true)
.waitForNoInitializingShards(true);
executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
ML_ORIGIN,
healthRequest,
ActionListener.<ClusterHealthResponse>wrap(
response -> finalListener.onResponse(response.isTimedOut() == false),
finalListener::onFailure),
(request, listener) -> client.admin().cluster().health(request, listener)
);
} else {
finalListener.onResponse(false);
}
},
finalListener::onFailure
);

String legacyIndexWithoutSuffix = indexPatternPrefix;
String indexPattern = indexPatternPrefix + "*";
Expand All @@ -89,15 +117,15 @@ public static void createIndexAndAliasIfNecessary(Client client,

if (concreteIndexNames.length == 0) {
if (indexPointedByCurrentWriteAlias.isEmpty()) {
createFirstConcreteIndex(client, firstConcreteIndex, alias, true, listener);
createFirstConcreteIndex(client, firstConcreteIndex, alias, true, waitForShardsListener);
return;
}
logger.error(
"There are no indices matching '{}' pattern but '{}' alias points at [{}]. This should never happen.",
indexPattern, alias, indexPointedByCurrentWriteAlias.get());
} else if (concreteIndexNames.length == 1 && concreteIndexNames[0].equals(legacyIndexWithoutSuffix)) {
if (indexPointedByCurrentWriteAlias.isEmpty()) {
createFirstConcreteIndex(client, firstConcreteIndex, alias, true, listener);
createFirstConcreteIndex(client, firstConcreteIndex, alias, true, waitForShardsListener);
return;
}
if (indexPointedByCurrentWriteAlias.get().getIndex().getName().equals(legacyIndexWithoutSuffix)) {
Expand All @@ -107,8 +135,8 @@ public static void createIndexAndAliasIfNecessary(Client client,
alias,
false,
ActionListener.wrap(
unused -> updateWriteAlias(client, alias, legacyIndexWithoutSuffix, firstConcreteIndex, listener),
listener::onFailure)
unused -> updateWriteAlias(client, alias, legacyIndexWithoutSuffix, firstConcreteIndex, waitForShardsListener),
finalListener::onFailure)
);
return;
}
Expand All @@ -119,12 +147,12 @@ public static void createIndexAndAliasIfNecessary(Client client,
if (indexPointedByCurrentWriteAlias.isEmpty()) {
assert concreteIndexNames.length > 0;
String latestConcreteIndexName = Arrays.stream(concreteIndexNames).max(INDEX_NAME_COMPARATOR).get();
updateWriteAlias(client, alias, null, latestConcreteIndexName, listener);
updateWriteAlias(client, alias, null, latestConcreteIndexName, finalListener);
return;
}
}
// If the alias is set, there is nothing more to do.
listener.onResponse(false);
finalListener.onResponse(false);
}

private static void createFirstConcreteIndex(Client client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ public void testDeleteExpiredDataActionDeletesEmptyStateIndices() throws Excepti
is(arrayContaining(".ml-state-000001", ".ml-state-000005", ".ml-state-000007")));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/57102")
public void testDeleteExpiredDataWithStandardThrottle() throws Exception {
testExpiredDeletion(-1.0f, 100);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,6 @@ public void testStopAndRestart() throws Exception {
assertMlResultsFieldMappings(destIndex, predictedClassField, "double");
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/55807")
public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exception {
String sourceIndex = "regression_two_jobs_with_same_randomize_seed_source";
indexData(sourceIndex, 100, 0);
Expand Down

0 comments on commit 6758c96

Please sign in to comment.