Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ML: creating ML State write alias and pointing writes there #37483

Merged
merged 10 commits into from
Jan 18, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,24 @@
*/
package org.elasticsearch.xpack.core.ml.job.persistence;

import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;

import java.util.Arrays;
import java.util.Collections;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

/**
* Methods for handling index naming related functions
*/
Expand Down Expand Up @@ -40,11 +58,11 @@ public static String resultsWriteAlias(String jobId) {
}

/**
* The name of the default index where a job's state is stored
* @return The index name
* The name of the alias pointing to the appropriate index for writing job state
* @return The write alias name
*/
public static String jobStateIndexName() {
return AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX;
public static String jobStateIndexWriteAlias() {
return AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-write";
}

/**
Expand All @@ -64,4 +82,65 @@ public static String configIndexName() {
return AnomalyDetectorsIndexFields.CONFIG_INDEX;
}

/**
* Create the .ml-state index (if necessary)
* Create the .ml-state-write alias for the .ml-state index (if necessary)
*/
public static void createStateIndexAndAliasIfNecessary(Client client, ClusterState state, final ActionListener<Boolean> finalListener) {

if (state.getMetaData().getAliasAndIndexLookup().containsKey(jobStateIndexWriteAlias())) {
finalListener.onResponse(false);
return;
}

final ActionListener<String> createAliasListener = ActionListener.wrap(
concreteIndexName -> {
final IndicesAliasesRequest request = client.admin()
.indices()
.prepareAliases()
.addAlias(concreteIndexName, jobStateIndexWriteAlias())
.request();
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ML_ORIGIN,
request,
ActionListener.<AcknowledgedResponse>wrap(
resp -> finalListener.onResponse(resp.isAcknowledged()),
finalListener::onFailure),
client.admin().indices()::aliases);
},
finalListener::onFailure
);

IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver();
String[] stateIndices = indexNameExpressionResolver.concreteIndexNames(state,
IndicesOptions.lenientExpandOpen(),
jobStateIndexPattern());
if (stateIndices.length > 0) {
Arrays.sort(stateIndices, Collections.reverseOrder());
createAliasListener.onResponse(stateIndices[0]);
} else {
CreateIndexRequest createIndexRequest = client.admin()
.indices()
.prepareCreate(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX)
.addAlias(new Alias(jobStateIndexWriteAlias()))
.request();
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ML_ORIGIN,
createIndexRequest,
ActionListener.<CreateIndexResponse>wrap(
createIndexResponse -> finalListener.onResponse(true),
createIndexFailure -> {
// If it was created between our last check, and this request being handled, we should add the alias
// Adding an alias that already exists is idempotent. So, no need to double check if the alias exists
// as well.
if (createIndexFailure instanceof ResourceAlreadyExistsException) {
createAliasListener.onResponse(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX);
} else {
finalListener.onFailure(createIndexFailure);
}
}),
client.admin().indices()::create);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction;
import org.elasticsearch.xpack.core.ml.action.ValidateDetectorAction;
import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkAction;
Expand Down Expand Up @@ -762,7 +761,7 @@ public void testMachineLearningAdminRole() {

assertNoAccessAllowed(role, "foo");
assertOnlyReadAllowed(role, MlMetaIndex.INDEX_NAME);
assertOnlyReadAllowed(role, AnomalyDetectorsIndex.jobStateIndexName());
assertOnlyReadAllowed(role, AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX);
assertOnlyReadAllowed(role, AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT);
assertOnlyReadAllowed(role, AuditorField.NOTIFICATIONS_INDEX);
}
Expand Down Expand Up @@ -814,7 +813,7 @@ public void testMachineLearningUserRole() {

assertNoAccessAllowed(role, "foo");
assertNoAccessAllowed(role, MlMetaIndex.INDEX_NAME);
assertNoAccessAllowed(role, AnomalyDetectorsIndex.jobStateIndexName());
assertNoAccessAllowed(role, AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX);
assertOnlyReadAllowed(role, AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT);
assertOnlyReadAllowed(role, AuditorField.NOTIFICATIONS_INDEX);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;

import java.io.IOException;
Expand All @@ -30,13 +31,13 @@ public final class XPackRestTestHelper {
public static final List<String> ML_PRE_V660_TEMPLATES = Collections.unmodifiableList(
Arrays.asList(AuditorField.NOTIFICATIONS_INDEX,
MlMetaIndex.INDEX_NAME,
AnomalyDetectorsIndex.jobStateIndexName(),
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX,
AnomalyDetectorsIndex.jobResultsIndexPrefix()));

public static final List<String> ML_POST_V660_TEMPLATES = Collections.unmodifiableList(
Arrays.asList(AuditorField.NOTIFICATIONS_INDEX,
MlMetaIndex.INDEX_NAME,
AnomalyDetectorsIndex.jobStateIndexName(),
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX,
AnomalyDetectorsIndex.jobResultsIndexPrefix(),
AnomalyDetectorsIndex.configIndexName()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public void testDeleteExpiredData() throws Exception {
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < 10010; i++) {
String docId = "non_existing_job_" + randomFrom("model_state_1234567#" + i, "quantiles", "categorizer_state#" + i);
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexName(), "doc", docId);
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), "doc", docId);
indexRequest.source(Collections.emptyMap());
bulkRequestBuilder.add(indexRequest);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import org.elasticsearch.xpack.core.ml.action.ValidateDetectorAction;
import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.notifications.AuditMessage;
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
Expand Down Expand Up @@ -701,7 +702,7 @@ public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDat
}

try (XContentBuilder stateMapping = ElasticsearchMappings.stateMapping()) {
IndexTemplateMetaData stateTemplate = IndexTemplateMetaData.builder(AnomalyDetectorsIndex.jobStateIndexName())
IndexTemplateMetaData stateTemplate = IndexTemplateMetaData.builder(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX)
.patterns(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexPattern()))
// TODO review these settings
.settings(Settings.builder()
Expand All @@ -710,9 +711,9 @@ public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDat
.putMapping(ElasticsearchMappings.DOC_TYPE, Strings.toString(stateMapping))
.version(Version.CURRENT.id)
.build();
templates.put(AnomalyDetectorsIndex.jobStateIndexName(), stateTemplate);
templates.put(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, stateTemplate);
} catch (IOException e) {
logger.error("Error loading the template for the " + AnomalyDetectorsIndex.jobStateIndexName() + " index", e);
logger.error("Error loading the template for the " + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + " index", e);
}

try (XContentBuilder docMapping = ElasticsearchMappings.resultsMapping()) {
Expand Down Expand Up @@ -742,7 +743,7 @@ public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDat
public static boolean allTemplatesInstalled(ClusterState clusterState) {
boolean allPresent = true;
List<String> templateNames = Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME,
AnomalyDetectorsIndex.jobStateIndexName(), AnomalyDetectorsIndex.jobResultsIndexPrefix());
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, AnomalyDetectorsIndex.jobResultsIndexPrefix());
for (String templateName : templateNames) {
allPresent = allPresent && TemplateUtils.checkTemplateExistsAndVersionIsGTECurrentVersion(templateName, clusterState);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listen

logger.debug("taking a snapshot of ml_metadata");
String documentId = "ml-config";
IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.jobStateIndexName(),
IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.jobStateIndexWriteAlias(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no guarantee that an autodetect process will have been started on the newer version of the product at the point when this call is made - if all ML jobs are closed prior to upgrading from 6.5 to 6.7 then that will definitely trigger this situation. So this method needs to call AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary() first.

ElasticsearchMappings.DOC_TYPE, documentId)
.setOpType(DocWriteRequest.OpType.CREATE);

Expand All @@ -456,8 +456,10 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listen
return;
}

executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, indexRequest.request(),
ActionListener.<IndexResponse>wrap(
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterService.state(), ActionListener.wrap(
r -> {
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, indexRequest.request(),
ActionListener.<IndexResponse>wrap(
indexResponse -> {
listener.onResponse(indexResponse.getResult() == DocWriteResponse.Result.CREATED);
},
Expand All @@ -469,8 +471,11 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listen
listener.onFailure(e);
}
}),
client::index
);
client::index
);
},
listener::onFailure
));
}

private void createConfigIndex(ActionListener<Boolean> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ public void onFailure(Exception e) {
// Try adding state doc mapping
ActionListener<Boolean> resultsPutMappingHandler = ActionListener.wrap(
response -> {
addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings::stateMapping,
addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), ElasticsearchMappings::stateMapping,
state, jobUpdateListener);
}, listener::onFailure
);
Expand Down Expand Up @@ -673,6 +673,7 @@ public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecut
private volatile int maxConcurrentJobAllocations;
private volatile int maxMachineMemoryPercent;
private volatile int maxLazyMLNodes;
private volatile ClusterState clusterState;

public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterService,
AutodetectProcessManager autodetectProcessManager, MlMemoryTracker memoryTracker,
Expand All @@ -689,6 +690,7 @@ public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterS
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.MAX_MACHINE_MEMORY_PERCENT, this::setMaxMachineMemoryPercent);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes);
clusterService.addListener(event -> clusterState = event.state());
}

@Override
Expand Down Expand Up @@ -748,7 +750,7 @@ protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobPara
}

String jobId = jobTask.getJobId();
autodetectProcessManager.openJob(jobTask, e2 -> {
autodetectProcessManager.openJob(jobTask, clusterState, e2 -> {
if (e2 == null) {
FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request(new String[]{jobId});
executeAsyncWithOrigin(client, ML_ORIGIN, FinalizeJobExecutionAction.INSTANCE, finalizeRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
Expand Down Expand Up @@ -79,26 +80,38 @@ protected void masterOperation(RevertModelSnapshotAction.Request request, Cluste
logger.debug("Received request to revert to snapshot id '{}' for job '{}', deleting intervening results: {}",
request.getSnapshotId(), request.getJobId(), request.getDeleteInterveningResults());

jobManager.jobExists(request.getJobId(), ActionListener.wrap(
exists -> {
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
JobState jobState = MlTasks.getJobState(request.getJobId(), tasks);

if (jobState.equals(JobState.CLOSED) == false) {
throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_REVERT));
// 3. Revert the state
ActionListener<Boolean> jobExistsListener = ActionListener.wrap(
exists -> {
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
JobState jobState = MlTasks.getJobState(request.getJobId(), tasks);

if (jobState.equals(JobState.CLOSED) == false) {
throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_REVERT));
}

getModelSnapshot(request, jobResultsProvider, modelSnapshot -> {
ActionListener<RevertModelSnapshotAction.Response> wrappedListener = listener;
if (request.getDeleteInterveningResults()) {
wrappedListener = wrapDeleteOldDataListener(wrappedListener, modelSnapshot, request.getJobId());
wrappedListener = wrapRevertDataCountsListener(wrappedListener, modelSnapshot, request.getJobId());
}
jobManager.revertSnapshot(request, wrappedListener, modelSnapshot);
}, listener::onFailure);
},
listener::onFailure
);


// 2. Verify the job exists
ActionListener<Boolean> createStateIndexListener = ActionListener.wrap(
r -> jobManager.jobExists(request.getJobId(), jobExistsListener),
listener::onFailure
);

getModelSnapshot(request, jobResultsProvider, modelSnapshot -> {
ActionListener<RevertModelSnapshotAction.Response> wrappedListener = listener;
if (request.getDeleteInterveningResults()) {
wrappedListener = wrapDeleteOldDataListener(wrappedListener, modelSnapshot, request.getJobId());
wrappedListener = wrapRevertDataCountsListener(wrappedListener, modelSnapshot, request.getJobId());
}
jobManager.revertSnapshot(request, wrappedListener, modelSnapshot);
}, listener::onFailure);
},
listener::onFailure
));
// 1. Verify/Create the state index and its alias exists
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, state, createStateIndexListener);
}

private void getModelSnapshot(RevertModelSnapshotAction.Request request, JobResultsProvider provider, Consumer<ModelSnapshot> handler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public void persistCategoryDefinition(CategoryDefinition category) {
*/
public void persistQuantiles(Quantiles quantiles) {
Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.documentId(quantiles.getJobId()));
persistable.persist(AnomalyDetectorsIndex.jobStateIndexName()).actionGet();
persistable.persist(AnomalyDetectorsIndex.jobStateIndexWriteAlias()).actionGet();
}

/**
Expand All @@ -237,7 +237,7 @@ public void persistQuantiles(Quantiles quantiles) {
public void persistQuantiles(Quantiles quantiles, WriteRequest.RefreshPolicy refreshPolicy, ActionListener<IndexResponse> listener) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method can be called when reverting a model snapshot, and there's no guarantee that an autodetect process will have been started on the newer version of the product at the point when a model snapshot is reverted. The call chain is TransportRevertModelSnapshotAction.masterOperation() -> JobManager.revertSnapshot() -> this method. So one of those two calls needs to call AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary() first.

Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.documentId(quantiles.getJobId()));
persistable.setRefreshPolicy(refreshPolicy);
persistable.persist(AnomalyDetectorsIndex.jobStateIndexName(), listener);
persistable.persist(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), listener);
}

/**
Expand Down
Loading