Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] Move SLM history to data stream (#63038) #63122

Merged
merged 5 commits into from
Nov 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -706,14 +706,21 @@ protected static void wipeAllIndices() throws IOException {
protected static void wipeDataStreams() throws IOException {
try {
if (hasXPack()) {
adminClient().performRequest(new Request("DELETE", "_data_stream/*"));
adminClient().performRequest(new Request("DELETE", "_data_stream/*?expand_wildcards=all"));
}
} catch (ResponseException e) {
// We hit a version of ES that doesn't serialize DeleteDataStreamAction.Request#wildcardExpressionsOriginallySpecified field or
// that doesn't support data streams so it's safe to ignore
int statusCode = e.getResponse().getStatusLine().getStatusCode();
if (org.elasticsearch.common.collect.Set.of(404, 405, 500).contains(statusCode) == false) {
throw e;
// We hit a version of ES that doesn't understand expand_wildcards, try again without it
try {
if (hasXPack()) {
adminClient().performRequest(new Request("DELETE", "_data_stream/*"));
}
} catch (ResponseException ee) {
// We hit a version of ES that doesn't serialize DeleteDataStreamAction.Request#wildcardExpressionsOriginallySpecified
//field or that doesn't support data streams so it's safe to ignore
int statusCode = e.getResponse().getStatusLine().getStatusCode();
if (org.elasticsearch.common.collect.Set.of(404, 405, 500).contains(statusCode) == false) {
throw e;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,11 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
Expand All @@ -27,6 +24,7 @@

import static org.elasticsearch.xpack.core.ilm.LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING;
import static org.elasticsearch.xpack.core.slm.history.SnapshotLifecycleTemplateRegistry.INDEX_TEMPLATE_VERSION;
import static org.elasticsearch.xpack.core.slm.history.SnapshotLifecycleTemplateRegistry.SLM_TEMPLATE_NAME;

/**
* Records Snapshot Lifecycle Management actions as represented by {@link SnapshotHistoryItem} into an index
Expand All @@ -35,8 +33,7 @@
public class SnapshotHistoryStore {
private static final Logger logger = LogManager.getLogger(SnapshotHistoryStore.class);

public static final String SLM_HISTORY_INDEX_PREFIX = ".slm-history-" + INDEX_TEMPLATE_VERSION + "-";
public static final String SLM_HISTORY_ALIAS = ".slm-history-" + INDEX_TEMPLATE_VERSION;
public static final String SLM_HISTORY_DATA_STREAM = ".slm-history-" + INDEX_TEMPLATE_VERSION;

private final Client client;
private final ClusterService clusterService;
Expand All @@ -59,85 +56,29 @@ public void putAsync(SnapshotHistoryItem item) {
SLM_HISTORY_INDEX_ENABLED_SETTING.getKey(), item);
return;
}
logger.trace("about to index snapshot history item in index [{}]: [{}]", SLM_HISTORY_ALIAS, item);
ensureHistoryIndex(client, clusterService.state(), ActionListener.wrap(createdIndex -> {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
item.toXContent(builder, ToXContent.EMPTY_PARAMS);
IndexRequest request = new IndexRequest(SLM_HISTORY_ALIAS)
.source(builder);
client.index(request, ActionListener.wrap(indexResponse -> {
logger.debug("successfully indexed snapshot history item with id [{}] in index [{}]: [{}]",
indexResponse.getId(), SLM_HISTORY_ALIAS, item);
}, exception -> {
logger.error(new ParameterizedMessage("failed to index snapshot history item in index [{}]: [{}]",
SLM_HISTORY_ALIAS, item), exception);
}));
} catch (IOException exception) {
logger.error(new ParameterizedMessage("failed to index snapshot history item in index [{}]: [{}]",
SLM_HISTORY_ALIAS, item), exception);
}
}, ex -> logger.error(new ParameterizedMessage("failed to ensure SLM history index exists, not indexing history item [{}]",
item), ex)));
}

/**
* Checks if the SLM history index exists, and if not, creates it.
*
* @param client The client to use to create the index if needed
* @param state The current cluster state, to determine if the alias exists
* @param andThen Called after the index has been created. `onResponse` called with `true` if the index was created,
* `false` if it already existed.
*/
static void ensureHistoryIndex(Client client, ClusterState state, ActionListener<Boolean> andThen) {
final String initialHistoryIndexName = SLM_HISTORY_INDEX_PREFIX + "000001";
final IndexAbstraction slmHistory = state.metadata().getIndicesLookup().get(SLM_HISTORY_ALIAS);
final IndexAbstraction initialHistoryIndex = state.metadata().getIndicesLookup().get(initialHistoryIndexName);

if (slmHistory == null && initialHistoryIndex == null) {
// No alias or index exists with the expected names, so create the index with appropriate alias
client.admin().indices().prepareCreate(initialHistoryIndexName)
.setWaitForActiveShards(1)
.addAlias(new Alias(SLM_HISTORY_ALIAS)
.writeIndex(true)
.isHidden(true))
.execute(new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse response) {
andThen.onResponse(true);
}

@Override
public void onFailure(Exception e) {
if (e instanceof ResourceAlreadyExistsException) {
// The index didn't exist before we made the call, there was probably a race - just ignore this
logger.debug("index [{}] was created after checking for its existence, likely due to a concurrent call",
initialHistoryIndexName);
andThen.onResponse(false);
} else {
andThen.onFailure(e);
}
}
});
} else if (slmHistory == null) {
// alias does not exist but initial index does, something is broken
andThen.onFailure(new IllegalStateException("SLM history index [" + initialHistoryIndexName +
"] already exists but does not have alias [" + SLM_HISTORY_ALIAS + "]"));
} else if (slmHistory.getType() == IndexAbstraction.Type.ALIAS) {
if (slmHistory.getWriteIndex() != null) {
// The alias exists and has a write index, so we're good
andThen.onResponse(false);
} else {
// The alias does not have a write index, so we can't index into it
andThen.onFailure(new IllegalStateException("SLM history alias [" + SLM_HISTORY_ALIAS + "does not have a write index"));
}
} else if (slmHistory.getType() != IndexAbstraction.Type.ALIAS) {
// This is not an alias, error out
andThen.onFailure(new IllegalStateException("SLM history alias [" + SLM_HISTORY_ALIAS +
"] already exists as " + slmHistory.getType().getDisplayName()));
} else {
logger.error("unexpected IndexOrAlias for [{}]: [{}]", SLM_HISTORY_ALIAS, slmHistory);
// (slmHistory.isAlias() == true) but (slmHistory instanceof Alias == false)?
assert false : SLM_HISTORY_ALIAS + " cannot be both an alias and not an alias simultaneously";
logger.trace("about to index snapshot history item in data stream [{}]: [{}]", SLM_HISTORY_DATA_STREAM, item);
Metadata metadata = clusterService.state().getMetadata();
if (metadata.dataStreams().containsKey(SLM_HISTORY_DATA_STREAM) == false &&
metadata.templatesV2().containsKey(SLM_TEMPLATE_NAME) == false) {
logger.error(new ParameterizedMessage("failed to index snapshot history item, data stream [{}] and template [{}] don't exist",
SLM_HISTORY_DATA_STREAM, SLM_TEMPLATE_NAME));
return;
}
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
item.toXContent(builder, ToXContent.EMPTY_PARAMS);
IndexRequest request = new IndexRequest(SLM_HISTORY_DATA_STREAM)
.opType(DocWriteRequest.OpType.CREATE)
.source(builder);
client.index(request, ActionListener.wrap(indexResponse -> {
logger.debug("successfully indexed snapshot history item with id [{}] in data stream [{}]: [{}]",
indexResponse.getId(), SLM_HISTORY_DATA_STREAM, item);
}, exception -> {
logger.error(new ParameterizedMessage("failed to index snapshot history item in data stream [{}]: [{}]",
SLM_HISTORY_DATA_STREAM, item), exception);
}));
} catch (IOException exception) {
logger.error(new ParameterizedMessage("failed to index snapshot history item in data stream [{}]: [{}]",
SLM_HISTORY_DATA_STREAM, item), exception);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public class SnapshotLifecycleTemplateRegistry extends IndexTemplateRegistry {
// version 1: initial
// version 2: converted to hidden index
// version 3: templates moved to composable templates
public static final int INDEX_TEMPLATE_VERSION = 3;
// version 4:converted data stream
public static final int INDEX_TEMPLATE_VERSION = 4;

public static final String SLM_TEMPLATE_VERSION_VARIABLE = "xpack.slm.template.version";
public static final String SLM_TEMPLATE_NAME = ".slm-history";
Expand Down
8 changes: 4 additions & 4 deletions x-pack/plugin/core/src/main/resources/slm-history.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
".slm-history-${xpack.slm.template.version}*"
],
"priority": 2147483647,
"data_stream": {
"hidden": true
},
"template": {
"settings": {
"index.number_of_shards": 1,
"index.number_of_replicas": 0,
"index.auto_expand_replicas": "0-1",
"index.lifecycle.name": "slm-history-ilm-policy",
"index.lifecycle.rollover_alias": ".slm-history-${xpack.slm.template.version}",
"index.hidden": true,
"index.format": 1
"index.lifecycle.name": "slm-history-ilm-policy"
},
"mappings": {
"dynamic": false,
Expand Down
Loading