From 9ccd34e26af928a434ecfec65faa9e86cc80a623 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Tue, 15 Jan 2019 09:06:30 -0600 Subject: [PATCH 1/7] ML: creating ML State write alias and pointing writes there --- .../persistence/AnomalyDetectorsIndex.java | 88 ++++++++++++++++++- .../authz/store/ReservedRolesStoreTests.java | 4 +- .../xpack/test/rest/XPackRestTestHelper.java | 5 +- .../ml/integration/DeleteExpiredDataIT.java | 2 +- .../xpack/ml/MachineLearning.java | 9 +- .../xpack/ml/MlConfigMigrator.java | 2 +- .../ml/action/TransportOpenJobAction.java | 10 ++- .../job/persistence/JobResultsPersister.java | 4 +- .../output/AutodetectStateProcessor.java | 2 +- .../action/TransportOpenJobActionTests.java | 6 +- .../xpack/test/rest/XPackRestIT.java | 3 +- .../test/upgraded_cluster/30_ml_jobs_crud.yml | 5 ++ 12 files changed, 119 insertions(+), 21 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java index b9f887d2d49fc..a86a6ec8282ef 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java @@ -5,6 +5,26 @@ */ package org.elasticsearch.xpack.core.ml.job.persistence; +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasOrIndex; +import org.elasticsearch.common.settings.Settings; + +import java.util.List; +import java.util.Optional; +import java.util.SortedMap; +import java.util.stream.Collectors; + +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; + /** * Methods for handling index naming related functions */ @@ -40,11 +60,11 @@ public static String resultsWriteAlias(String jobId) { } /** - * The name of the default index where a job's state is stored - * @return The index name + * The name of the alias pointing to the appropriate index for writing job state + * @return The write alias name */ - public static String jobStateIndexName() { - return AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX; + public static String jobStateIndexWriteAlias() { + return AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-write"; } /** @@ -64,4 +84,64 @@ public static String configIndexName() { return AnomalyDetectorsIndexFields.CONFIG_INDEX; } + /** + * Create the .ml-state index (if necessary) + * Create the .ml-state-write alias for the .ml-state index (if necessary) + */ + public static void createStateIndexAndAliasIfNecessary(Client client, ClusterState state, final ActionListener finalListener) { + + if (state.getMetaData().getAliasAndIndexLookup().containsKey(jobStateIndexWriteAlias())) { + finalListener.onResponse(false); + return; + } + + final ActionListener createAliasListener = ActionListener.wrap( + r -> { + final IndicesAliasesRequest request = client.admin() + .indices() + .prepareAliases() + .addAlias(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, jobStateIndexWriteAlias()) + .request(); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), + ML_ORIGIN, + request, + ActionListener.wrap( + resp -> finalListener.onResponse(resp.isAcknowledged()), + finalListener::onFailure), + client.admin().indices()::aliases); + }, + finalListener::onFailure + ); + + // Only create the index or aliases if some other ML index exists - saves clutter if ML is never used. + SortedMap mlLookup = state.getMetaData().getAliasAndIndexLookup().tailMap(".ml"); + if (mlLookup.isEmpty() == false && mlLookup.firstKey().startsWith(".ml")) { + if (mlLookup.containsKey(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX)) { + createAliasListener.onResponse(null); + } else { + CreateIndexRequest createIndexRequest = client.admin() + .indices() + .prepareCreate(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX) + .addAlias(new Alias(jobStateIndexWriteAlias())) + .request(); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), + ML_ORIGIN, + createIndexRequest, + ActionListener.wrap( + createIndexResponse -> finalListener.onResponse(true), + createIndexFailure -> { + // If it was created between our last check, and this request being handled, we should add the alias + // Adding an alias that already exists is idempotent, so, no need to double check if the alias exists + // as well. + if (createIndexFailure instanceof ResourceAlreadyExistsException) { + createAliasListener.onResponse(null); + } else { + finalListener.onFailure(createIndexFailure); + } + }), + client.admin().indices()::create); + } + } + } + } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java index fc9869a12803f..5c540df77d51d 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java @@ -762,7 +762,7 @@ public void testMachineLearningAdminRole() { assertNoAccessAllowed(role, "foo"); assertOnlyReadAllowed(role, MlMetaIndex.INDEX_NAME); - assertOnlyReadAllowed(role, AnomalyDetectorsIndex.jobStateIndexName()); + assertOnlyReadAllowed(role, AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX); assertOnlyReadAllowed(role, AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT); assertOnlyReadAllowed(role, AuditorField.NOTIFICATIONS_INDEX); } @@ -814,7 +814,7 @@ public void testMachineLearningUserRole() { assertNoAccessAllowed(role, "foo"); assertNoAccessAllowed(role, MlMetaIndex.INDEX_NAME); - assertNoAccessAllowed(role, AnomalyDetectorsIndex.jobStateIndexName()); + assertNoAccessAllowed(role, AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX); assertOnlyReadAllowed(role, AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT); assertOnlyReadAllowed(role, AuditorField.NOTIFICATIONS_INDEX); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java index 47580bf731a44..082992d95ff87 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java @@ -16,6 +16,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.MlMetaIndex; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; import org.elasticsearch.xpack.core.ml.notifications.AuditorField; import java.io.IOException; @@ -30,13 +31,13 @@ public final class XPackRestTestHelper { public static final List ML_PRE_V660_TEMPLATES = Collections.unmodifiableList( Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME, - AnomalyDetectorsIndex.jobStateIndexName(), + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, AnomalyDetectorsIndex.jobResultsIndexPrefix())); public static final List ML_POST_V660_TEMPLATES = Collections.unmodifiableList( Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME, - AnomalyDetectorsIndex.jobStateIndexName(), + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, AnomalyDetectorsIndex.jobResultsIndexPrefix(), AnomalyDetectorsIndex.configIndexName())); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index f2ca43bf53c26..2a63ccaf41245 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -180,7 +180,7 @@ public void testDeleteExpiredData() throws Exception { bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); for (int i = 0; i < 10010; i++) { String docId = "non_existing_job_" + randomFrom("model_state_1234567#" + i, "quantiles", "categorizer_state#" + i); - IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexName(), "doc", docId); + IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), "doc", docId); indexRequest.source(Collections.emptyMap()); bulkRequestBuilder.add(indexRequest); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 11d302470c708..43674d42a56e6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -109,6 +109,7 @@ import org.elasticsearch.xpack.core.ml.action.ValidateDetectorAction; import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.notifications.AuditMessage; import org.elasticsearch.xpack.core.ml.notifications.AuditorField; @@ -701,7 +702,7 @@ public UnaryOperator> getIndexTemplateMetaDat } try (XContentBuilder stateMapping = ElasticsearchMappings.stateMapping()) { - IndexTemplateMetaData stateTemplate = IndexTemplateMetaData.builder(AnomalyDetectorsIndex.jobStateIndexName()) + IndexTemplateMetaData stateTemplate = IndexTemplateMetaData.builder(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX) .patterns(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexPattern())) // TODO review these settings .settings(Settings.builder() @@ -710,9 +711,9 @@ public UnaryOperator> getIndexTemplateMetaDat .putMapping(ElasticsearchMappings.DOC_TYPE, Strings.toString(stateMapping)) .version(Version.CURRENT.id) .build(); - templates.put(AnomalyDetectorsIndex.jobStateIndexName(), stateTemplate); + templates.put(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, stateTemplate); } catch (IOException e) { - logger.error("Error loading the template for the " + AnomalyDetectorsIndex.jobStateIndexName() + " index", e); + logger.error("Error loading the template for the " + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + " index", e); } try (XContentBuilder docMapping = ElasticsearchMappings.resultsMapping()) { @@ -742,7 +743,7 @@ public UnaryOperator> getIndexTemplateMetaDat public static boolean allTemplatesInstalled(ClusterState clusterState) { boolean allPresent = true; List templateNames = Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME, - AnomalyDetectorsIndex.jobStateIndexName(), AnomalyDetectorsIndex.jobResultsIndexPrefix()); + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, AnomalyDetectorsIndex.jobResultsIndexPrefix()); for (String templateName : templateNames) { allPresent = allPresent && TemplateUtils.checkTemplateExistsAndVersionIsGTECurrentVersion(templateName, clusterState); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index fbf8c3c804eef..c55a945122666 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -347,7 +347,7 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener listen logger.debug("taking a snapshot of ml_metadata"); String documentId = "ml-config"; - IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.jobStateIndexName(), + IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), ElasticsearchMappings.DOC_TYPE, documentId) .setOpType(DocWriteRequest.OpType.CREATE); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index b7b4fb3aad4c9..cbf2ae335ca17 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -526,11 +526,17 @@ public void onFailure(Exception e) { listener::onFailure ); + // Manually create the state index and its alias if necessary + ActionListener createMLStateListener = ActionListener.wrap( + response -> AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, state, jobUpdateListener), + listener::onFailure + ); + // Try adding state doc mapping ActionListener resultsPutMappingHandler = ActionListener.wrap( response -> { - addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings::stateMapping, - state, jobUpdateListener); + addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), ElasticsearchMappings::stateMapping, + state, createMLStateListener); }, listener::onFailure ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index e57d85aefa72c..2a16b1c8ddd8a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -228,7 +228,7 @@ public void persistCategoryDefinition(CategoryDefinition category) { */ public void persistQuantiles(Quantiles quantiles) { Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.documentId(quantiles.getJobId())); - persistable.persist(AnomalyDetectorsIndex.jobStateIndexName()).actionGet(); + persistable.persist(AnomalyDetectorsIndex.jobStateIndexWriteAlias()).actionGet(); } /** @@ -237,7 +237,7 @@ public void persistQuantiles(Quantiles quantiles) { public void persistQuantiles(Quantiles quantiles, WriteRequest.RefreshPolicy refreshPolicy, ActionListener listener) { Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.documentId(quantiles.getJobId())); persistable.setRefreshPolicy(refreshPolicy); - persistable.persist(AnomalyDetectorsIndex.jobStateIndexName(), listener); + persistable.persist(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), listener); } /** diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectStateProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectStateProcessor.java index 63a496f0503bc..9d3afd0ad0dcb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectStateProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectStateProcessor.java @@ -98,7 +98,7 @@ private BytesReference splitAndPersist(BytesReference bytesRef, int searchFrom) void persist(BytesReference bytes) throws IOException { BulkRequest bulkRequest = new BulkRequest(); - bulkRequest.add(bytes, AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, XContentType.JSON); + bulkRequest.add(bytes, AnomalyDetectorsIndex.jobStateIndexWriteAlias(), ElasticsearchMappings.DOC_TYPE, XContentType.JSON); if (bulkRequest.numberOfActions() > 0) { LOGGER.trace("[{}] Persisting job state document", jobId); try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index 7d72ef7f633e1..10cbced840f34 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MappingMetaData; @@ -593,7 +594,7 @@ public static void addJobTask(String jobId, String nodeId, JobState jobState, Pe private void addIndices(MetaData.Builder metaData, RoutingTable.Builder routingTable) { List indices = new ArrayList<>(); - indices.add(AnomalyDetectorsIndex.jobStateIndexName()); + indices.add(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX); indices.add(MlMetaIndex.INDEX_NAME); indices.add(AuditorField.NOTIFICATIONS_INDEX); indices.add(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT); @@ -604,6 +605,9 @@ private void addIndices(MetaData.Builder metaData, RoutingTable.Builder routingT .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) ); + if (indexName.equals(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX)) { + indexMetaData.putAlias(new AliasMetaData.Builder(AnomalyDetectorsIndex.jobStateIndexWriteAlias())); + } metaData.put(indexMetaData); Index index = new Index(indexName, "_uuid"); ShardId shardId = new ShardId(index, 0); diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java index 57c944788482e..336ddadea4c32 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java @@ -25,6 +25,7 @@ import org.elasticsearch.xpack.core.ml.MlMetaIndex; import org.elasticsearch.xpack.core.ml.integration.MlRestTestStateCleaner; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; import org.elasticsearch.xpack.core.ml.notifications.AuditorField; import org.elasticsearch.xpack.core.rollup.job.RollupJob; import org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField; @@ -87,7 +88,7 @@ private void waitForTemplates() throws Exception { if (installTemplates()) { List templates = new ArrayList<>(); templates.addAll(Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME, - AnomalyDetectorsIndex.jobStateIndexName(), + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, AnomalyDetectorsIndex.jobResultsIndexPrefix(), AnomalyDetectorsIndex.configIndexName())); diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/30_ml_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/30_ml_jobs_crud.yml index 507362507cdeb..e962c20a7e9eb 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/30_ml_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/30_ml_jobs_crud.yml @@ -72,6 +72,11 @@ setup: ml.get_jobs: job_id: mixed-cluster-job + - do: + indices.exists_alias: + name: ".ml-state-write" + - is_true: '' + --- "Test job with no model memory limit has established model memory after reopening": - do: From a98b2d3de2142270200d5e66f96e353276b88461 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Tue, 15 Jan 2019 09:27:37 -0600 Subject: [PATCH 2/7] style fix --- .../xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java | 4 ---- .../core/security/authz/store/ReservedRolesStoreTests.java | 1 - 2 files changed, 5 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java index a86a6ec8282ef..7f7ebd4026916 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java @@ -15,12 +15,8 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasOrIndex; -import org.elasticsearch.common.settings.Settings; -import java.util.List; -import java.util.Optional; import java.util.SortedMap; -import java.util.stream.Collectors; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java index 5c540df77d51d..9c9e3ecd3c6cc 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/store/ReservedRolesStoreTests.java @@ -73,7 +73,6 @@ import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction; import org.elasticsearch.xpack.core.ml.action.ValidateDetectorAction; import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction; -import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; import org.elasticsearch.xpack.core.ml.notifications.AuditorField; import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkAction; From e194d8ef35ba37f6f7951588adfc618982bfd2fa Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Tue, 15 Jan 2019 15:16:35 -0600 Subject: [PATCH 3/7] Moving alias check to openJob method --- .../ml/action/TransportOpenJobAction.java | 12 +-- .../autodetect/AutodetectProcessManager.java | 98 ++++++++++--------- .../AutodetectProcessManagerTests.java | 51 ++++++---- 3 files changed, 87 insertions(+), 74 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index cbf2ae335ca17..f38083aca07f5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -526,17 +526,11 @@ public void onFailure(Exception e) { listener::onFailure ); - // Manually create the state index and its alias if necessary - ActionListener createMLStateListener = ActionListener.wrap( - response -> AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, state, jobUpdateListener), - listener::onFailure - ); - // Try adding state doc mapping ActionListener resultsPutMappingHandler = ActionListener.wrap( response -> { addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), ElasticsearchMappings::stateMapping, - state, createMLStateListener); + state, jobUpdateListener); }, listener::onFailure ); @@ -679,6 +673,7 @@ public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecut private volatile int maxConcurrentJobAllocations; private volatile int maxMachineMemoryPercent; private volatile int maxLazyMLNodes; + private volatile ClusterState clusterState; public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterService, AutodetectProcessManager autodetectProcessManager, MlMemoryTracker memoryTracker, @@ -695,6 +690,7 @@ public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterS clusterService.getClusterSettings() .addSettingsUpdateConsumer(MachineLearning.MAX_MACHINE_MEMORY_PERCENT, this::setMaxMachineMemoryPercent); clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes); + clusterService.addListener(event -> clusterState = event.state()); } @Override @@ -747,7 +743,7 @@ protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobPara } String jobId = jobTask.getJobId(); - autodetectProcessManager.openJob(jobTask, e2 -> { + autodetectProcessManager.openJob(jobTask, clusterState, e2 -> { if (e2 == null) { FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request(new String[]{jobId}); executeAsyncWithOrigin(client, ML_ORIGIN, FinalizeJobExecutionAction.INSTANCE, finalizeRequest, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index ef03b4f9e7160..b209be922ca9d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -11,6 +11,7 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Setting; @@ -36,6 +37,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; @@ -410,68 +412,70 @@ public void onFailure(Exception e) { } } - public void openJob(JobTask jobTask, Consumer closeHandler) { + public void openJob(JobTask jobTask, ClusterState clusterState, Consumer closeHandler) { String jobId = jobTask.getJobId(); logger.info("Opening job [{}]", jobId); - - jobManager.getJob(jobId, ActionListener.wrap( - job -> { - if (job.getJobVersion() == null) { - closeHandler.accept(ExceptionsHelper.badRequestException("Cannot open job [" + jobId + AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, ActionListener.wrap( + r -> { + jobManager.getJob(jobId, ActionListener.wrap( + job -> { + if (job.getJobVersion() == null) { + closeHandler.accept(ExceptionsHelper.badRequestException("Cannot open job [" + jobId + "] because jobs created prior to version 5.5 are not supported")); - return; - } - + return; + } - processByAllocation.putIfAbsent(jobTask.getAllocationId(), new ProcessContext(jobTask)); - jobResultsProvider.getAutodetectParams(job, params -> { - // We need to fork, otherwise we restore model state from a network thread (several GET api calls): - threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - closeHandler.accept(e); - } - @Override - protected void doRun() { - ProcessContext processContext = processByAllocation.get(jobTask.getAllocationId()); - if (processContext == null) { - logger.debug("Aborted opening job [{}] as it has been closed", jobId); - return; + processByAllocation.putIfAbsent(jobTask.getAllocationId(), new ProcessContext(jobTask)); + jobResultsProvider.getAutodetectParams(job, params -> { + // We need to fork, otherwise we restore model state from a network thread (several GET api calls): + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + closeHandler.accept(e); } - if (processContext.getState() != ProcessContext.ProcessStateName.NOT_RUNNING) { - logger.debug("Cannot open job [{}] when its state is [{}]", + + @Override + protected void doRun() { + ProcessContext processContext = processByAllocation.get(jobTask.getAllocationId()); + if (processContext == null) { + logger.debug("Aborted opening job [{}] as it has been closed", jobId); + return; + } + if (processContext.getState() != ProcessContext.ProcessStateName.NOT_RUNNING) { + logger.debug("Cannot open job [{}] when its state is [{}]", jobId, processContext.getState().getClass().getName()); - return; - } + return; + } - try { - createProcessAndSetRunning(processContext, job, params, closeHandler); - processContext.getAutodetectCommunicator().init(params.modelSnapshot()); - setJobState(jobTask, JobState.OPENED); - } catch (Exception e1) { - // No need to log here as the persistent task framework will log it try { - // Don't leave a partially initialised process hanging around - processContext.newKillBuilder() + createProcessAndSetRunning(processContext, job, params, closeHandler); + processContext.getAutodetectCommunicator().init(params.modelSnapshot()); + setJobState(jobTask, JobState.OPENED); + } catch (Exception e1) { + // No need to log here as the persistent task framework will log it + try { + // Don't leave a partially initialised process hanging around + processContext.newKillBuilder() .setAwaitCompletion(false) .setFinish(false) .kill(); - processByAllocation.remove(jobTask.getAllocationId()); - } finally { - setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1)); + processByAllocation.remove(jobTask.getAllocationId()); + } finally { + setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1)); + } } } - } + }); + }, e1 -> { + logger.warn("Failed to gather information required to open job [" + jobId + "]", e1); + setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1)); }); - }, e1 -> { - logger.warn("Failed to gather information required to open job [" + jobId + "]", e1); - setJobState(jobTask, JobState.FAILED, e2 -> closeHandler.accept(e1)); - }); - }, - closeHandler - )); - + }, + closeHandler + )); + }, + closeHandler)); } private void createProcessAndSetRunning(ProcessContext processContext, Job job, AutodetectParams params, Consumer handler) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 346e9aa5d5dbc..42760aeb13f9c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -8,6 +8,9 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasOrIndex; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -31,6 +34,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; @@ -61,6 +65,8 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -106,6 +112,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { private JobDataCountsPersister jobDataCountsPersister; private NormalizerFactory normalizerFactory; private Auditor auditor; + private ClusterState clusterState; private DataCounts dataCounts = new DataCounts("foo"); private ModelSizeStats modelSizeStats = new ModelSizeStats.Builder("foo").build(); @@ -125,6 +132,12 @@ public void setup() throws Exception { jobDataCountsPersister = mock(JobDataCountsPersister.class); normalizerFactory = mock(NormalizerFactory.class); auditor = mock(Auditor.class); + MetaData metaData = mock(MetaData.class); + SortedMap aliasOrIndexSortedMap = new TreeMap<>(); + aliasOrIndexSortedMap.put(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), mock(AliasOrIndex.Alias.class)); + when(metaData.getAliasAndIndexLookup()).thenReturn(aliasOrIndexSortedMap); + clusterState = mock(ClusterState.class); + when(clusterState.getMetaData()).thenReturn(metaData); doAnswer(invocationOnMock -> { @SuppressWarnings("unchecked") @@ -167,7 +180,7 @@ public void testOpenJob() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); when(jobTask.getAllocationId()).thenReturn(1L); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); assertEquals(1, manager.numberOfOpenJobs()); assertTrue(manager.jobHasActiveAutodetectProcess(jobTask)); verify(jobTask).updatePersistentTaskState(eq(new JobTaskState(JobState.OPENED, 1L)), any()); @@ -193,7 +206,7 @@ public void testOpenJob_withoutVersion() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn(job.getId()); AtomicReference errorHolder = new AtomicReference<>(); - manager.openJob(jobTask, errorHolder::set); + manager.openJob(jobTask, clusterState, errorHolder::set); Exception error = errorHolder.get(); assertThat(error, is(notNullValue())); assertThat(error.getMessage(), equalTo("Cannot open job [no_version] because jobs created prior to version 5.5 are not supported")); @@ -239,22 +252,22 @@ public void testOpenJob_exceedMaxNumJobs() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("bar"); when(jobTask.getAllocationId()).thenReturn(1L); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("baz"); when(jobTask.getAllocationId()).thenReturn(2L); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); assertEquals(3, manager.numberOfOpenJobs()); Exception[] holder = new Exception[1]; jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foobar"); when(jobTask.getAllocationId()).thenReturn(3L); - manager.openJob(jobTask, e -> holder[0] = e); + manager.openJob(jobTask, clusterState, e -> holder[0] = e); Exception e = holder[0]; assertEquals("max running job capacity [3] reached", e.getMessage()); @@ -263,7 +276,7 @@ public void testOpenJob_exceedMaxNumJobs() { when(jobTask.getJobId()).thenReturn("baz"); manager.closeJob(jobTask, false, null); assertEquals(2, manager.numberOfOpenJobs()); - manager.openJob(jobTask, e1 -> {}); + manager.openJob(jobTask, clusterState, e1 -> {}); assertEquals(3, manager.numberOfOpenJobs()); } @@ -275,7 +288,7 @@ public void testProcessData() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); DataLoadParams params = new DataLoadParams(TimeRange.builder().build(), Optional.empty()); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), params, (dataCounts1, e) -> {}); assertEquals(1, manager.numberOfOpenJobs()); @@ -298,7 +311,7 @@ public void testProcessDataThrowsElasticsearchStatusException_onIoException() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); Exception[] holder = new Exception[1]; manager.processData(jobTask, analysisRegistry, inputStream, xContentType, params, (dataCounts1, e) -> holder[0] = e); assertNotNull(holder[0]); @@ -311,7 +324,7 @@ public void testCloseJob() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {}); @@ -339,7 +352,7 @@ public void testCanCloseClosingJob() throws Exception { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {}); @@ -387,7 +400,7 @@ public void testCanKillClosingJob() throws Exception { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {}); @@ -416,7 +429,7 @@ public void testBucketResetMessageIsSent() { InputStream inputStream = createInputStream(""); JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, inputStream, xContentType, params, (dataCounts1, e) -> {}); verify(communicator).writeToJob(same(inputStream), same(analysisRegistry), same(xContentType), same(params), any()); } @@ -428,7 +441,7 @@ public void testFlush() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); InputStream inputStream = createInputStream(""); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, inputStream, randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {}); @@ -468,7 +481,7 @@ public void testCloseThrows() { // create a jobtask JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> { }); @@ -508,7 +521,7 @@ public void testJobHasActiveAutodetectProcess() { when(jobTask.getJobId()).thenReturn("foo"); assertFalse(manager.jobHasActiveAutodetectProcess(jobTask)); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {}); @@ -526,7 +539,7 @@ public void testKillKillsAutodetectProcess() throws IOException { when(jobTask.getJobId()).thenReturn("foo"); assertFalse(manager.jobHasActiveAutodetectProcess(jobTask)); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {}); @@ -560,7 +573,7 @@ public void testProcessData_GivenStateNotOpened() { JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn("foo"); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); InputStream inputStream = createInputStream(""); DataCounts[] dataCounts = new DataCounts[1]; manager.processData(jobTask, analysisRegistry, inputStream, @@ -705,7 +718,7 @@ private AutodetectProcessManager createManagerAndCallProcessData(AutodetectCommu AutodetectProcessManager manager = createManager(communicator); JobTask jobTask = mock(JobTask.class); when(jobTask.getJobId()).thenReturn(jobId); - manager.openJob(jobTask, e -> {}); + manager.openJob(jobTask, clusterState, e -> {}); manager.processData(jobTask, analysisRegistry, createInputStream(""), randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts, e) -> {}); return manager; From a8bbd2e53b5040e8231dfea2c1936aa4a01e11e9 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Wed, 16 Jan 2019 12:10:00 -0600 Subject: [PATCH 4/7] Addressing PR comments --- .../persistence/AnomalyDetectorsIndex.java | 70 ++++++++++--------- .../xpack/ml/MlConfigMigrator.java | 13 ++-- .../TransportRevertModelSnapshotAction.java | 47 ++++++++----- 3 files changed, 76 insertions(+), 54 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java index 7f7ebd4026916..7d25ee90f1b2d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java @@ -11,12 +11,14 @@ import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.AliasOrIndex; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import java.util.SortedMap; +import java.util.Arrays; +import java.util.List; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; @@ -91,12 +93,12 @@ public static void createStateIndexAndAliasIfNecessary(Client client, ClusterSta return; } - final ActionListener createAliasListener = ActionListener.wrap( - r -> { + final ActionListener createAliasListener = ActionListener.wrap( + concreteIndexName -> { final IndicesAliasesRequest request = client.admin() .indices() .prepareAliases() - .addAlias(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, jobStateIndexWriteAlias()) + .addAlias(concreteIndexName, jobStateIndexWriteAlias()) .request(); executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, @@ -109,34 +111,36 @@ public static void createStateIndexAndAliasIfNecessary(Client client, ClusterSta finalListener::onFailure ); - // Only create the index or aliases if some other ML index exists - saves clutter if ML is never used. - SortedMap mlLookup = state.getMetaData().getAliasAndIndexLookup().tailMap(".ml"); - if (mlLookup.isEmpty() == false && mlLookup.firstKey().startsWith(".ml")) { - if (mlLookup.containsKey(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX)) { - createAliasListener.onResponse(null); - } else { - CreateIndexRequest createIndexRequest = client.admin() - .indices() - .prepareCreate(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX) - .addAlias(new Alias(jobStateIndexWriteAlias())) - .request(); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), - ML_ORIGIN, - createIndexRequest, - ActionListener.wrap( - createIndexResponse -> finalListener.onResponse(true), - createIndexFailure -> { - // If it was created between our last check, and this request being handled, we should add the alias - // Adding an alias that already exists is idempotent, so, no need to double check if the alias exists - // as well. - if (createIndexFailure instanceof ResourceAlreadyExistsException) { - createAliasListener.onResponse(null); - } else { - finalListener.onFailure(createIndexFailure); - } - }), - client.admin().indices()::create); - } + IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(); + String[] state_indices = indexNameExpressionResolver.concreteIndexNames(state, + IndicesOptions.lenientExpandOpen(), + jobStateIndexPattern()); + if (state_indices.length > 0) { + List indices = Arrays.asList(state_indices); + indices.sort(String::compareTo); + createAliasListener.onResponse(indices.get(indices.size() - 1)); + } else { + CreateIndexRequest createIndexRequest = client.admin() + .indices() + .prepareCreate(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX) + .addAlias(new Alias(jobStateIndexWriteAlias())) + .request(); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), + ML_ORIGIN, + createIndexRequest, + ActionListener.wrap( + createIndexResponse -> finalListener.onResponse(true), + createIndexFailure -> { + // If it was created between our last check, and this request being handled, we should add the alias + // Adding an alias that already exists is idempotent. So, no need to double check if the alias exists + // as well. + if (createIndexFailure instanceof ResourceAlreadyExistsException) { + createAliasListener.onResponse(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX); + } else { + finalListener.onFailure(createIndexFailure); + } + }), + client.admin().indices()::create); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index c55a945122666..6334e1f1fd4f6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -364,14 +364,19 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener listen return; } - executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, indexRequest.request(), - ActionListener.wrap( + AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterService.state(), ActionListener.wrap( + r -> { + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, indexRequest.request(), + ActionListener.wrap( indexResponse -> { listener.onResponse(indexResponse.getResult() == DocWriteResponse.Result.CREATED); }, listener::onFailure), - client::index - ); + client::index + ); + }, + listener::onFailure + )); } private void createConfigIndex(ActionListener listener) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java index a940d6666c9fd..ab2fb1368345a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.messages.Messages; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; @@ -79,26 +80,38 @@ protected void masterOperation(RevertModelSnapshotAction.Request request, Cluste logger.debug("Received request to revert to snapshot id '{}' for job '{}', deleting intervening results: {}", request.getSnapshotId(), request.getJobId(), request.getDeleteInterveningResults()); - jobManager.jobExists(request.getJobId(), ActionListener.wrap( - exists -> { - PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - JobState jobState = MlTasks.getJobState(request.getJobId(), tasks); - if (jobState.equals(JobState.CLOSED) == false) { - throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_REVERT)); + // 3. Revert the state + ActionListener jobExistsListener = ActionListener.wrap( + exists -> { + PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + JobState jobState = MlTasks.getJobState(request.getJobId(), tasks); + + if (jobState.equals(JobState.CLOSED) == false) { + throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_REVERT)); + } + + getModelSnapshot(request, jobResultsProvider, modelSnapshot -> { + ActionListener wrappedListener = listener; + if (request.getDeleteInterveningResults()) { + wrappedListener = wrapDeleteOldDataListener(wrappedListener, modelSnapshot, request.getJobId()); + wrappedListener = wrapRevertDataCountsListener(wrappedListener, modelSnapshot, request.getJobId()); } + jobManager.revertSnapshot(request, wrappedListener, modelSnapshot); + }, listener::onFailure); + }, + listener::onFailure + ); + + + // 2. Verify the job exists + ActionListener createStateIndexListener = ActionListener.wrap( + r -> jobManager.jobExists(request.getJobId(), jobExistsListener), + listener::onFailure + ); - getModelSnapshot(request, jobResultsProvider, modelSnapshot -> { - ActionListener wrappedListener = listener; - if (request.getDeleteInterveningResults()) { - wrappedListener = wrapDeleteOldDataListener(wrappedListener, modelSnapshot, request.getJobId()); - wrappedListener = wrapRevertDataCountsListener(wrappedListener, modelSnapshot, request.getJobId()); - } - jobManager.revertSnapshot(request, wrappedListener, modelSnapshot); - }, listener::onFailure); - }, - listener::onFailure - )); + // 1. Verify/Create the state index and its alias exists + AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, state, createStateIndexListener); } private void getModelSnapshot(RevertModelSnapshotAction.Request request, JobResultsProvider provider, Consumer handler, From 694bc862cea3b534a7b8e87e95ade3e3323382a3 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Wed, 16 Jan 2019 14:22:32 -0600 Subject: [PATCH 5/7] updating for master merge --- .../xpack/ml/integration/MlConfigMigratorIT.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java index b8eae71b4bcab..10c758dcf8a2b 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -185,7 +186,11 @@ public void testMigrateConfigs() throws InterruptedException, IOException { public void testExistingSnapshotDoesNotBlockMigration() throws InterruptedException { // index a doc with the same Id as the config snapshot - IndexRequestBuilder indexRequest = client().prepareIndex(AnomalyDetectorsIndex.jobStateIndexName(), + PlainActionFuture future = PlainActionFuture.newFuture(); + AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client(), clusterService.state(), future); + future.actionGet(); + + IndexRequestBuilder indexRequest = client().prepareIndex(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), ElasticsearchMappings.DOC_TYPE, "ml-config") .setSource(Collections.singletonMap("a_field", "a_value")) .setOpType(DocWriteRequest.OpType.CREATE) From 77749e6097003ed109430a34079a9a93cb877baf Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Wed, 16 Jan 2019 15:31:10 -0600 Subject: [PATCH 6/7] Fixing tests after master merge --- .../ml/integration/MlConfigMigratorIT.java | 36 ++++++++++++------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java index 10c758dcf8a2b..4ee76a4b1ab21 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java @@ -15,6 +15,7 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.AliasOrIndex; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; @@ -53,6 +54,8 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder; @@ -75,7 +78,13 @@ public void setUpTests() { clusterService = mock(ClusterService.class); ClusterSettings clusterSettings = new ClusterSettings(nodeSettings(), new HashSet<>(Collections.singletonList( MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION))); + MetaData metaData = mock(MetaData.class); + SortedMap aliasOrIndexSortedMap = new TreeMap<>(); + when(metaData.getAliasAndIndexLookup()).thenReturn(aliasOrIndexSortedMap); + ClusterState clusterState = mock(ClusterState.class); + when(clusterState.getMetaData()).thenReturn(metaData); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + when(clusterService.state()).thenReturn(clusterState); } public void testWriteConfigToIndex() throws InterruptedException { @@ -140,6 +149,7 @@ public void testMigrateConfigs() throws InterruptedException, IOException { .metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build())) .routingTable(routingTable.build()) .build(); + when(clusterService.state()).thenReturn(clusterState); doAnswer(invocation -> { ClusterStateUpdateTask listener = (ClusterStateUpdateTask) invocation.getArguments()[1]; @@ -185,6 +195,19 @@ public void testMigrateConfigs() throws InterruptedException, IOException { } public void testExistingSnapshotDoesNotBlockMigration() throws InterruptedException { + // define the configs + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(buildJobBuilder("job-foo").build(), false); + + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build())) + .routingTable(routingTable.build()) + .build(); + when(clusterService.state()).thenReturn(clusterState); + // index a doc with the same Id as the config snapshot PlainActionFuture future = PlainActionFuture.newFuture(); AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client(), clusterService.state(), future); @@ -198,18 +221,6 @@ public void testExistingSnapshotDoesNotBlockMigration() throws InterruptedExcept indexRequest.execute().actionGet(); - // define the configs - MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); - mlMetadata.putJob(buildJobBuilder("job-foo").build(), false); - - MetaData.Builder metaData = MetaData.builder(); - RoutingTable.Builder routingTable = RoutingTable.builder(); - addMlConfigIndex(metaData, routingTable); - ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) - .metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build())) - .routingTable(routingTable.build()) - .build(); - doAnswer(invocation -> { ClusterStateUpdateTask listener = (ClusterStateUpdateTask) invocation.getArguments()[1]; listener.clusterStateProcessed("source", mock(ClusterState.class), mock(ClusterState.class)); @@ -263,6 +274,7 @@ public void testMigrateConfigs_GivenLargeNumberOfJobsAndDatafeeds() throws Inter .metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build())) .routingTable(routingTable.build()) .build(); + when(clusterService.state()).thenReturn(clusterState); doAnswer(invocation -> { ClusterStateUpdateTask listener = (ClusterStateUpdateTask) invocation.getArguments()[1]; From 3974ec9ed6637ee458c49ce04fa926078f42fafd Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Thu, 17 Jan 2019 11:19:47 -0600 Subject: [PATCH 7/7] adjusting concrete index lookup for ml-state --- .../ml/job/persistence/AnomalyDetectorsIndex.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java index 7d25ee90f1b2d..7e61d42705a90 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java @@ -18,7 +18,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import java.util.Arrays; -import java.util.List; +import java.util.Collections; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; @@ -112,13 +112,12 @@ public static void createStateIndexAndAliasIfNecessary(Client client, ClusterSta ); IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(); - String[] state_indices = indexNameExpressionResolver.concreteIndexNames(state, + String[] stateIndices = indexNameExpressionResolver.concreteIndexNames(state, IndicesOptions.lenientExpandOpen(), jobStateIndexPattern()); - if (state_indices.length > 0) { - List indices = Arrays.asList(state_indices); - indices.sort(String::compareTo); - createAliasListener.onResponse(indices.get(indices.size() - 1)); + if (stateIndices.length > 0) { + Arrays.sort(stateIndices, Collections.reverseOrder()); + createAliasListener.onResponse(stateIndices[0]); } else { CreateIndexRequest createIndexRequest = client.admin() .indices()