From 6758c963dec619a94637bfce94683bac72a87566 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Fri, 3 Jul 2020 18:39:50 +0100 Subject: [PATCH] [ML] Wait for shards to initialize after creating ML internal indices There have been a few test failures that are likely caused by tests performing actions that use ML indices immediately after the actions that create those ML indices. Currently this can result in attempts to search the newly created index before its shards have initialized. This change makes the method that creates the internal ML indices that have been affected by this problem (state and stats) wait for the shards to be initialized before returning. Fixes #54887 Fixes #55221 Fixes #55807 Fixes #57102 Fixes #58841 Fixes #59011 --- .../xpack/core/ml/utils/MlIndexAndAlias.java | 42 +++++++++++++++---- .../ml/integration/DeleteExpiredDataIT.java | 1 - .../xpack/ml/integration/RegressionIT.java | 1 - 3 files changed, 35 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java index 3507653909b82..9848fc167836d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java @@ -9,6 +9,8 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; @@ -18,6 +20,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -69,13 +72,38 @@ private MlIndexAndAlias() {} * Adds an {@code alias} to that index if it was created, * or to the index with the highest suffix if the index did not have to be created. * The listener is notified with a {@code boolean} that informs whether the index or the alias were created. + * If the index is created, the listener is not called until the index is ready to use via the supplied alias, + * so that a method that receives a success response from this method can safely use the index immediately. */ public static void createIndexAndAliasIfNecessary(Client client, ClusterState clusterState, IndexNameExpressionResolver resolver, String indexPatternPrefix, String alias, - ActionListener listener) { + ActionListener finalListener) { + + // If the index and alias were successfully created then wait for the shards of the index that the alias points to be ready + ActionListener waitForShardsListener = ActionListener.wrap( + created -> { + if (created) { + ClusterHealthRequest healthRequest = Requests.clusterHealthRequest(alias) + .waitForNoRelocatingShards(true) + .waitForNoInitializingShards(true); + executeAsyncWithOrigin( + client.threadPool().getThreadContext(), + ML_ORIGIN, + healthRequest, + ActionListener.wrap( + response -> finalListener.onResponse(response.isTimedOut() == false), + finalListener::onFailure), + (request, listener) -> client.admin().cluster().health(request, listener) + ); + } else { + finalListener.onResponse(false); + } + }, + finalListener::onFailure + ); String legacyIndexWithoutSuffix = indexPatternPrefix; String indexPattern = indexPatternPrefix + "*"; @@ -89,7 +117,7 @@ public static void createIndexAndAliasIfNecessary(Client client, if (concreteIndexNames.length == 0) { if (indexPointedByCurrentWriteAlias.isEmpty()) { - createFirstConcreteIndex(client, firstConcreteIndex, alias, true, listener); + createFirstConcreteIndex(client, firstConcreteIndex, alias, true, waitForShardsListener); return; } logger.error( @@ -97,7 +125,7 @@ public static void createIndexAndAliasIfNecessary(Client client, indexPattern, alias, indexPointedByCurrentWriteAlias.get()); } else if (concreteIndexNames.length == 1 && concreteIndexNames[0].equals(legacyIndexWithoutSuffix)) { if (indexPointedByCurrentWriteAlias.isEmpty()) { - createFirstConcreteIndex(client, firstConcreteIndex, alias, true, listener); + createFirstConcreteIndex(client, firstConcreteIndex, alias, true, waitForShardsListener); return; } if (indexPointedByCurrentWriteAlias.get().getIndex().getName().equals(legacyIndexWithoutSuffix)) { @@ -107,8 +135,8 @@ public static void createIndexAndAliasIfNecessary(Client client, alias, false, ActionListener.wrap( - unused -> updateWriteAlias(client, alias, legacyIndexWithoutSuffix, firstConcreteIndex, listener), - listener::onFailure) + unused -> updateWriteAlias(client, alias, legacyIndexWithoutSuffix, firstConcreteIndex, waitForShardsListener), + finalListener::onFailure) ); return; } @@ -119,12 +147,12 @@ public static void createIndexAndAliasIfNecessary(Client client, if (indexPointedByCurrentWriteAlias.isEmpty()) { assert concreteIndexNames.length > 0; String latestConcreteIndexName = Arrays.stream(concreteIndexNames).max(INDEX_NAME_COMPARATOR).get(); - updateWriteAlias(client, alias, null, latestConcreteIndexName, listener); + updateWriteAlias(client, alias, null, latestConcreteIndexName, finalListener); return; } } // If the alias is set, there is nothing more to do. - listener.onResponse(false); + finalListener.onResponse(false); } private static void createFirstConcreteIndex(Client client, diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index e57f89f92235a..06ee0e48b6fd3 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -137,7 +137,6 @@ public void testDeleteExpiredDataActionDeletesEmptyStateIndices() throws Excepti is(arrayContaining(".ml-state-000001", ".ml-state-000005", ".ml-state-000007"))); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/57102") public void testDeleteExpiredDataWithStandardThrottle() throws Exception { testExpiredDeletion(-1.0f, 100); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java index 84d6f39df4369..f2c79e4e42445 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java @@ -297,7 +297,6 @@ public void testStopAndRestart() throws Exception { assertMlResultsFieldMappings(destIndex, predictedClassField, "double"); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/55807") public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exception { String sourceIndex = "regression_two_jobs_with_same_randomize_seed_source"; indexData(sourceIndex, 100, 0);