-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ML: creating ML State write alias and pointing writes there #37483
Changes from 7 commits
9ccd34e
a98b2d3
e194d8e
a8bbd2e
5754435
694bc86
77749e6
3974ec9
eb50b03
bf23c35
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,24 @@ | |
*/ | ||
package org.elasticsearch.xpack.core.ml.job.persistence; | ||
|
||
import org.elasticsearch.ResourceAlreadyExistsException; | ||
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.action.admin.indices.alias.Alias; | ||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; | ||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; | ||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; | ||
import org.elasticsearch.action.support.IndicesOptions; | ||
import org.elasticsearch.action.support.master.AcknowledgedResponse; | ||
import org.elasticsearch.client.Client; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; | ||
|
||
import java.util.Arrays; | ||
import java.util.List; | ||
|
||
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; | ||
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; | ||
|
||
/** | ||
* Methods for handling index naming related functions | ||
*/ | ||
|
@@ -40,11 +58,11 @@ public static String resultsWriteAlias(String jobId) { | |
} | ||
|
||
/** | ||
* The name of the default index where a job's state is stored | ||
* @return The index name | ||
* The name of the alias pointing to the appropriate index for writing job state | ||
* @return The write alias name | ||
*/ | ||
public static String jobStateIndexName() { | ||
return AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX; | ||
public static String jobStateIndexWriteAlias() { | ||
return AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-write"; | ||
} | ||
|
||
/** | ||
|
@@ -64,4 +82,66 @@ public static String configIndexName() { | |
return AnomalyDetectorsIndexFields.CONFIG_INDEX; | ||
} | ||
|
||
/** | ||
* Create the .ml-state index (if necessary) | ||
* Create the .ml-state-write alias for the .ml-state index (if necessary) | ||
*/ | ||
public static void createStateIndexAndAliasIfNecessary(Client client, ClusterState state, final ActionListener<Boolean> finalListener) { | ||
|
||
if (state.getMetaData().getAliasAndIndexLookup().containsKey(jobStateIndexWriteAlias())) { | ||
finalListener.onResponse(false); | ||
return; | ||
} | ||
|
||
final ActionListener<String> createAliasListener = ActionListener.wrap( | ||
concreteIndexName -> { | ||
final IndicesAliasesRequest request = client.admin() | ||
.indices() | ||
.prepareAliases() | ||
.addAlias(concreteIndexName, jobStateIndexWriteAlias()) | ||
.request(); | ||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), | ||
ML_ORIGIN, | ||
request, | ||
ActionListener.<AcknowledgedResponse>wrap( | ||
resp -> finalListener.onResponse(resp.isAcknowledged()), | ||
finalListener::onFailure), | ||
client.admin().indices()::aliases); | ||
}, | ||
finalListener::onFailure | ||
); | ||
|
||
IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(); | ||
String[] state_indices = indexNameExpressionResolver.concreteIndexNames(state, | ||
IndicesOptions.lenientExpandOpen(), | ||
jobStateIndexPattern()); | ||
if (state_indices.length > 0) { | ||
List<String> indices = Arrays.asList(state_indices); | ||
indices.sort(String::compareTo); | ||
createAliasListener.onResponse(indices.get(indices.size() - 1)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of creating the temporary list just for sorting you could sort the array directly:
or:
or:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lulz, can't believe I missed that. |
||
} else { | ||
CreateIndexRequest createIndexRequest = client.admin() | ||
.indices() | ||
.prepareCreate(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX) | ||
.addAlias(new Alias(jobStateIndexWriteAlias())) | ||
.request(); | ||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), | ||
ML_ORIGIN, | ||
createIndexRequest, | ||
ActionListener.<CreateIndexResponse>wrap( | ||
createIndexResponse -> finalListener.onResponse(true), | ||
createIndexFailure -> { | ||
// If it was created between our last check, and this request being handled, we should add the alias | ||
// Adding an alias that already exists is idempotent. So, no need to double check if the alias exists | ||
// as well. | ||
if (createIndexFailure instanceof ResourceAlreadyExistsException) { | ||
createAliasListener.onResponse(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX); | ||
} else { | ||
finalListener.onFailure(createIndexFailure); | ||
} | ||
}), | ||
client.admin().indices()::create); | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -439,7 +439,7 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listen | |
|
||
logger.debug("taking a snapshot of ml_metadata"); | ||
String documentId = "ml-config"; | ||
IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.jobStateIndexName(), | ||
IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's no guarantee that an autodetect process will have been started on the newer version of the product at the point when this call is made - if all ML jobs are closed prior to upgrading from 6.5 to 6.7 then that will definitely trigger this situation. So this method needs to call |
||
ElasticsearchMappings.DOC_TYPE, documentId) | ||
.setOpType(DocWriteRequest.OpType.CREATE); | ||
|
||
|
@@ -456,8 +456,10 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listen | |
return; | ||
} | ||
|
||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, indexRequest.request(), | ||
ActionListener.<IndexResponse>wrap( | ||
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterService.state(), ActionListener.wrap( | ||
r -> { | ||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, indexRequest.request(), | ||
ActionListener.<IndexResponse>wrap( | ||
indexResponse -> { | ||
listener.onResponse(indexResponse.getResult() == DocWriteResponse.Result.CREATED); | ||
}, | ||
|
@@ -469,8 +471,11 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listen | |
listener.onFailure(e); | ||
} | ||
}), | ||
client::index | ||
); | ||
client::index | ||
); | ||
}, | ||
listener::onFailure | ||
)); | ||
} | ||
|
||
private void createConfigIndex(ActionListener<Boolean> listener) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -228,7 +228,7 @@ public void persistCategoryDefinition(CategoryDefinition category) { | |
*/ | ||
public void persistQuantiles(Quantiles quantiles) { | ||
Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.documentId(quantiles.getJobId())); | ||
persistable.persist(AnomalyDetectorsIndex.jobStateIndexName()).actionGet(); | ||
persistable.persist(AnomalyDetectorsIndex.jobStateIndexWriteAlias()).actionGet(); | ||
} | ||
|
||
/** | ||
|
@@ -237,7 +237,7 @@ public void persistQuantiles(Quantiles quantiles) { | |
public void persistQuantiles(Quantiles quantiles, WriteRequest.RefreshPolicy refreshPolicy, ActionListener<IndexResponse> listener) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This method can be called when reverting a model snapshot, and there's no guarantee that an autodetect process will have been started on the newer version of the product at the point when a model snapshot is reverted. The call chain is |
||
Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.documentId(quantiles.getJobId())); | ||
persistable.setRefreshPolicy(refreshPolicy); | ||
persistable.persist(AnomalyDetectorsIndex.jobStateIndexName(), listener); | ||
persistable.persist(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), listener); | ||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
state_indices
should bestateIndices