Skip to content

Commit

Permalink
ML: creating ML State write alias and pointing writes there (#37483)
Browse files Browse the repository at this point in the history
* ML: creating ML State write alias and pointing writes there

* Moving alias check to openJob method

* adjusting concrete index lookup for ml-state
  • Loading branch information
benwtrent committed Jan 18, 2019
1 parent df3df4a commit 560bed5
Show file tree
Hide file tree
Showing 16 changed files with 266 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,25 @@
*/
package org.elasticsearch.xpack.core.ml.job.persistence;

import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.settings.Settings;

import java.util.Arrays;
import java.util.Collections;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

/**
* Methods for handling index naming related functions
*/
Expand Down Expand Up @@ -40,11 +59,11 @@ public static String resultsWriteAlias(String jobId) {
}

/**
* The name of the default index where a job's state is stored
* @return The index name
* The name of the alias pointing to the appropriate index for writing job state
* @return The write alias name
*/
public static String jobStateIndexName() {
return AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX;
public static String jobStateIndexWriteAlias() {
return AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-write";
}

/**
Expand All @@ -64,4 +83,65 @@ public static String configIndexName() {
return AnomalyDetectorsIndexFields.CONFIG_INDEX;
}

/**
* Create the .ml-state index (if necessary)
* Create the .ml-state-write alias for the .ml-state index (if necessary)
*/
public static void createStateIndexAndAliasIfNecessary(Client client, ClusterState state, final ActionListener<Boolean> finalListener) {

if (state.getMetaData().getAliasAndIndexLookup().containsKey(jobStateIndexWriteAlias())) {
finalListener.onResponse(false);
return;
}

final ActionListener<String> createAliasListener = ActionListener.wrap(
concreteIndexName -> {
final IndicesAliasesRequest request = client.admin()
.indices()
.prepareAliases()
.addAlias(concreteIndexName, jobStateIndexWriteAlias())
.request();
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ML_ORIGIN,
request,
ActionListener.<AcknowledgedResponse>wrap(
resp -> finalListener.onResponse(resp.isAcknowledged()),
finalListener::onFailure),
client.admin().indices()::aliases);
},
finalListener::onFailure
);

IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(Settings.EMPTY);
String[] stateIndices = indexNameExpressionResolver.concreteIndexNames(state,
IndicesOptions.lenientExpandOpen(),
jobStateIndexPattern());
if (stateIndices.length > 0) {
Arrays.sort(stateIndices, Collections.reverseOrder());
createAliasListener.onResponse(stateIndices[0]);
} else {
CreateIndexRequest createIndexRequest = client.admin()
.indices()
.prepareCreate(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX)
.addAlias(new Alias(jobStateIndexWriteAlias()))
.request();
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ML_ORIGIN,
createIndexRequest,
ActionListener.<CreateIndexResponse>wrap(
createIndexResponse -> finalListener.onResponse(true),
createIndexFailure -> {
// If it was created between our last check, and this request being handled, we should add the alias
// Adding an alias that already exists is idempotent. So, no need to double check if the alias exists
// as well.
if (createIndexFailure instanceof ResourceAlreadyExistsException) {
createAliasListener.onResponse(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX);
} else {
finalListener.onFailure(createIndexFailure);
}
}),
client.admin().indices()::create);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction;
import org.elasticsearch.xpack.core.ml.action.ValidateDetectorAction;
import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
import org.elasticsearch.xpack.core.monitoring.action.MonitoringBulkAction;
Expand Down Expand Up @@ -761,7 +760,7 @@ public void testMachineLearningAdminRole() {

assertNoAccessAllowed(role, "foo");
assertOnlyReadAllowed(role, MlMetaIndex.INDEX_NAME);
assertOnlyReadAllowed(role, AnomalyDetectorsIndex.jobStateIndexName());
assertOnlyReadAllowed(role, AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX);
assertOnlyReadAllowed(role, AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT);
assertOnlyReadAllowed(role, AuditorField.NOTIFICATIONS_INDEX);
}
Expand Down Expand Up @@ -813,7 +812,7 @@ public void testMachineLearningUserRole() {

assertNoAccessAllowed(role, "foo");
assertNoAccessAllowed(role, MlMetaIndex.INDEX_NAME);
assertNoAccessAllowed(role, AnomalyDetectorsIndex.jobStateIndexName());
assertNoAccessAllowed(role, AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX);
assertOnlyReadAllowed(role, AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT);
assertOnlyReadAllowed(role, AuditorField.NOTIFICATIONS_INDEX);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;

import java.io.IOException;
Expand All @@ -30,13 +31,13 @@ public final class XPackRestTestHelper {
public static final List<String> ML_PRE_V660_TEMPLATES = Collections.unmodifiableList(
Arrays.asList(AuditorField.NOTIFICATIONS_INDEX,
MlMetaIndex.INDEX_NAME,
AnomalyDetectorsIndex.jobStateIndexName(),
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX,
AnomalyDetectorsIndex.jobResultsIndexPrefix()));

public static final List<String> ML_POST_V660_TEMPLATES = Collections.unmodifiableList(
Arrays.asList(AuditorField.NOTIFICATIONS_INDEX,
MlMetaIndex.INDEX_NAME,
AnomalyDetectorsIndex.jobStateIndexName(),
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX,
AnomalyDetectorsIndex.jobResultsIndexPrefix(),
AnomalyDetectorsIndex.configIndexName()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public void testDeleteExpiredData() throws Exception {
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < 10010; i++) {
String docId = "non_existing_job_" + randomFrom("model_state_1234567#" + i, "quantiles", "categorizer_state#" + i);
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexName(), "doc", docId);
IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), "doc", docId);
indexRequest.source(Collections.emptyMap());
bulkRequestBuilder.add(indexRequest);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import org.elasticsearch.xpack.core.ml.action.ValidateDetectorAction;
import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.notifications.AuditMessage;
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
Expand Down Expand Up @@ -698,7 +699,7 @@ public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDat
}

try (XContentBuilder stateMapping = ElasticsearchMappings.stateMapping()) {
IndexTemplateMetaData stateTemplate = IndexTemplateMetaData.builder(AnomalyDetectorsIndex.jobStateIndexName())
IndexTemplateMetaData stateTemplate = IndexTemplateMetaData.builder(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX)
.patterns(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexPattern()))
// TODO review these settings
.settings(Settings.builder()
Expand All @@ -707,9 +708,9 @@ public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDat
.putMapping(ElasticsearchMappings.DOC_TYPE, Strings.toString(stateMapping))
.version(Version.CURRENT.id)
.build();
templates.put(AnomalyDetectorsIndex.jobStateIndexName(), stateTemplate);
templates.put(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, stateTemplate);
} catch (IOException e) {
logger.error("Error loading the template for the " + AnomalyDetectorsIndex.jobStateIndexName() + " index", e);
logger.error("Error loading the template for the " + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + " index", e);
}

try (XContentBuilder docMapping = ElasticsearchMappings.resultsMapping()) {
Expand Down Expand Up @@ -739,7 +740,7 @@ public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDat
public static boolean allTemplatesInstalled(ClusterState clusterState) {
boolean allPresent = true;
List<String> templateNames = Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME,
AnomalyDetectorsIndex.jobStateIndexName(), AnomalyDetectorsIndex.jobResultsIndexPrefix(),
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, AnomalyDetectorsIndex.jobResultsIndexPrefix(),
AnomalyDetectorsIndex.configIndexName());
for (String templateName : templateNames) {
allPresent = allPresent && TemplateUtils.checkTemplateExistsAndVersionIsGTECurrentVersion(templateName, clusterState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listen

logger.debug("taking a snapshot of ml_metadata");
String documentId = "ml-config";
IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.jobStateIndexName(),
IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.jobStateIndexWriteAlias(),
ElasticsearchMappings.DOC_TYPE, documentId)
.setOpType(DocWriteRequest.OpType.CREATE);

Expand All @@ -366,8 +366,10 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listen
return;
}

executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, indexRequest.request(),
ActionListener.<IndexResponse>wrap(
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterService.state(), ActionListener.wrap(
r -> {
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, indexRequest.request(),
ActionListener.<IndexResponse>wrap(
indexResponse -> {
listener.onResponse(indexResponse.getResult() == DocWriteResponse.Result.CREATED);
},
Expand All @@ -379,8 +381,11 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listen
listener.onFailure(e);
}
}),
client::index
);
client::index
);
},
listener::onFailure
));
}

private void createConfigIndex(ActionListener<Boolean> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ public void onFailure(Exception e) {
// Try adding state doc mapping
ActionListener<Boolean> resultsPutMappingHandler = ActionListener.wrap(
response -> {
addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings::stateMapping,
addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), ElasticsearchMappings::stateMapping,
state, missingMappingsListener);
}, listener::onFailure
);
Expand Down Expand Up @@ -800,6 +800,7 @@ public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecut
private volatile int maxConcurrentJobAllocations;
private volatile int maxMachineMemoryPercent;
private volatile int maxLazyMLNodes;
private volatile ClusterState clusterState;

public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterService,
AutodetectProcessManager autodetectProcessManager, MlMemoryTracker memoryTracker,
Expand All @@ -817,6 +818,7 @@ public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterS
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.MAX_MACHINE_MEMORY_PERCENT, this::setMaxMachineMemoryPercent);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes);
clusterService.addListener(event -> clusterState = event.state());
}

@Override
Expand Down Expand Up @@ -878,7 +880,7 @@ protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobPara
}

String jobId = jobTask.getJobId();
autodetectProcessManager.openJob(jobTask, e2 -> {
autodetectProcessManager.openJob(jobTask, clusterState, e2 -> {
if (e2 == null) {
FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request(new String[]{jobId});
executeAsyncWithOrigin(client, ML_ORIGIN, FinalizeJobExecutionAction.INSTANCE, finalizeRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
Expand Down Expand Up @@ -79,26 +80,38 @@ protected void masterOperation(RevertModelSnapshotAction.Request request, Cluste
logger.debug("Received request to revert to snapshot id '{}' for job '{}', deleting intervening results: {}",
request.getSnapshotId(), request.getJobId(), request.getDeleteInterveningResults());

jobManager.jobExists(request.getJobId(), ActionListener.wrap(
exists -> {
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
JobState jobState = MlTasks.getJobState(request.getJobId(), tasks);

if (jobState.equals(JobState.CLOSED) == false) {
throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_REVERT));
// 3. Revert the state
ActionListener<Boolean> jobExistsListener = ActionListener.wrap(
exists -> {
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
JobState jobState = MlTasks.getJobState(request.getJobId(), tasks);

if (jobState.equals(JobState.CLOSED) == false) {
throw ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_REVERT));
}

getModelSnapshot(request, jobResultsProvider, modelSnapshot -> {
ActionListener<RevertModelSnapshotAction.Response> wrappedListener = listener;
if (request.getDeleteInterveningResults()) {
wrappedListener = wrapDeleteOldDataListener(wrappedListener, modelSnapshot, request.getJobId());
wrappedListener = wrapRevertDataCountsListener(wrappedListener, modelSnapshot, request.getJobId());
}
jobManager.revertSnapshot(request, wrappedListener, modelSnapshot);
}, listener::onFailure);
},
listener::onFailure
);


// 2. Verify the job exists
ActionListener<Boolean> createStateIndexListener = ActionListener.wrap(
r -> jobManager.jobExists(request.getJobId(), jobExistsListener),
listener::onFailure
);

getModelSnapshot(request, jobResultsProvider, modelSnapshot -> {
ActionListener<RevertModelSnapshotAction.Response> wrappedListener = listener;
if (request.getDeleteInterveningResults()) {
wrappedListener = wrapDeleteOldDataListener(wrappedListener, modelSnapshot, request.getJobId());
wrappedListener = wrapRevertDataCountsListener(wrappedListener, modelSnapshot, request.getJobId());
}
jobManager.revertSnapshot(request, wrappedListener, modelSnapshot);
}, listener::onFailure);
},
listener::onFailure
));
// 1. Verify/Create the state index and its alias exists
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, state, createStateIndexListener);
}

private void getModelSnapshot(RevertModelSnapshotAction.Request request, JobResultsProvider provider, Consumer<ModelSnapshot> handler,
Expand Down
Loading

0 comments on commit 560bed5

Please sign in to comment.