Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] Move SLM history to data stream (#63038) #63122

Merged
merged 5 commits into from
Nov 2, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Move SLM history to data stream (#63038)
This change converts SLM history index to data stream.
Consequence of this is name change as name of data stream can't start with .. New name for SLM history data stream will be slm-history-4 (retaining version in the name).
# Conflicts:
#	x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/history/SnapshotHistoryStore.java
#	x-pack/plugin/ilm/src/internalClusterTest/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java
probakowski committed Oct 1, 2020
commit 207c9a1ef37bac63255763ea81f967965b8570c1
Original file line number Diff line number Diff line change
@@ -9,14 +9,11 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
@@ -27,6 +24,7 @@

import static org.elasticsearch.xpack.core.ilm.LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING;
import static org.elasticsearch.xpack.core.slm.history.SnapshotLifecycleTemplateRegistry.INDEX_TEMPLATE_VERSION;
import static org.elasticsearch.xpack.core.slm.history.SnapshotLifecycleTemplateRegistry.SLM_TEMPLATE_NAME;

/**
* Records Snapshot Lifecycle Management actions as represented by {@link SnapshotHistoryItem} into an index
@@ -35,8 +33,7 @@
public class SnapshotHistoryStore {
private static final Logger logger = LogManager.getLogger(SnapshotHistoryStore.class);

public static final String SLM_HISTORY_INDEX_PREFIX = ".slm-history-" + INDEX_TEMPLATE_VERSION + "-";
public static final String SLM_HISTORY_ALIAS = ".slm-history-" + INDEX_TEMPLATE_VERSION;
public static final String SLM_HISTORY_DATA_STREAM = "slm-history-" + INDEX_TEMPLATE_VERSION;

private final Client client;
private final ClusterService clusterService;
@@ -59,85 +56,29 @@ public void putAsync(SnapshotHistoryItem item) {
SLM_HISTORY_INDEX_ENABLED_SETTING.getKey(), item);
return;
}
logger.trace("about to index snapshot history item in index [{}]: [{}]", SLM_HISTORY_ALIAS, item);
ensureHistoryIndex(client, clusterService.state(), ActionListener.wrap(createdIndex -> {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
item.toXContent(builder, ToXContent.EMPTY_PARAMS);
IndexRequest request = new IndexRequest(SLM_HISTORY_ALIAS)
.source(builder);
client.index(request, ActionListener.wrap(indexResponse -> {
logger.debug("successfully indexed snapshot history item with id [{}] in index [{}]: [{}]",
indexResponse.getId(), SLM_HISTORY_ALIAS, item);
}, exception -> {
logger.error(new ParameterizedMessage("failed to index snapshot history item in index [{}]: [{}]",
SLM_HISTORY_ALIAS, item), exception);
}));
} catch (IOException exception) {
logger.error(new ParameterizedMessage("failed to index snapshot history item in index [{}]: [{}]",
SLM_HISTORY_ALIAS, item), exception);
}
}, ex -> logger.error(new ParameterizedMessage("failed to ensure SLM history index exists, not indexing history item [{}]",
item), ex)));
}

/**
* Checks if the SLM history index exists, and if not, creates it.
*
* @param client The client to use to create the index if needed
* @param state The current cluster state, to determine if the alias exists
* @param andThen Called after the index has been created. `onResponse` called with `true` if the index was created,
* `false` if it already existed.
*/
static void ensureHistoryIndex(Client client, ClusterState state, ActionListener<Boolean> andThen) {
final String initialHistoryIndexName = SLM_HISTORY_INDEX_PREFIX + "000001";
final IndexAbstraction slmHistory = state.metadata().getIndicesLookup().get(SLM_HISTORY_ALIAS);
final IndexAbstraction initialHistoryIndex = state.metadata().getIndicesLookup().get(initialHistoryIndexName);

if (slmHistory == null && initialHistoryIndex == null) {
// No alias or index exists with the expected names, so create the index with appropriate alias
client.admin().indices().prepareCreate(initialHistoryIndexName)
.setWaitForActiveShards(1)
.addAlias(new Alias(SLM_HISTORY_ALIAS)
.writeIndex(true)
.isHidden(true))
.execute(new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse response) {
andThen.onResponse(true);
}

@Override
public void onFailure(Exception e) {
if (e instanceof ResourceAlreadyExistsException) {
// The index didn't exist before we made the call, there was probably a race - just ignore this
logger.debug("index [{}] was created after checking for its existence, likely due to a concurrent call",
initialHistoryIndexName);
andThen.onResponse(false);
} else {
andThen.onFailure(e);
}
}
});
} else if (slmHistory == null) {
// alias does not exist but initial index does, something is broken
andThen.onFailure(new IllegalStateException("SLM history index [" + initialHistoryIndexName +
"] already exists but does not have alias [" + SLM_HISTORY_ALIAS + "]"));
} else if (slmHistory.getType() == IndexAbstraction.Type.ALIAS) {
if (slmHistory.getWriteIndex() != null) {
// The alias exists and has a write index, so we're good
andThen.onResponse(false);
} else {
// The alias does not have a write index, so we can't index into it
andThen.onFailure(new IllegalStateException("SLM history alias [" + SLM_HISTORY_ALIAS + "does not have a write index"));
}
} else if (slmHistory.getType() != IndexAbstraction.Type.ALIAS) {
// This is not an alias, error out
andThen.onFailure(new IllegalStateException("SLM history alias [" + SLM_HISTORY_ALIAS +
"] already exists as " + slmHistory.getType().getDisplayName()));
} else {
logger.error("unexpected IndexOrAlias for [{}]: [{}]", SLM_HISTORY_ALIAS, slmHistory);
// (slmHistory.isAlias() == true) but (slmHistory instanceof Alias == false)?
assert false : SLM_HISTORY_ALIAS + " cannot be both an alias and not an alias simultaneously";
logger.trace("about to index snapshot history item in data stream [{}]: [{}]", SLM_HISTORY_DATA_STREAM, item);
Metadata metadata = clusterService.state().getMetadata();
if (metadata.dataStreams().containsKey(SLM_HISTORY_DATA_STREAM) == false &&
metadata.templatesV2().containsKey(SLM_TEMPLATE_NAME) == false) {
logger.error(new ParameterizedMessage("failed to index snapshot history item, data stream [{}] and template [{}] don't exist",
SLM_HISTORY_DATA_STREAM, SLM_TEMPLATE_NAME));
return;
}
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
item.toXContent(builder, ToXContent.EMPTY_PARAMS);
IndexRequest request = new IndexRequest(SLM_HISTORY_DATA_STREAM)
.opType(DocWriteRequest.OpType.CREATE)
.source(builder);
client.index(request, ActionListener.wrap(indexResponse -> {
logger.debug("successfully indexed snapshot history item with id [{}] in data stream [{}]: [{}]",
indexResponse.getId(), SLM_HISTORY_DATA_STREAM, item);
}, exception -> {
logger.error(new ParameterizedMessage("failed to index snapshot history item in data stream [{}]: [{}]",
SLM_HISTORY_DATA_STREAM, item), exception);
}));
} catch (IOException exception) {
logger.error(new ParameterizedMessage("failed to index snapshot history item in data stream [{}]: [{}]",
SLM_HISTORY_DATA_STREAM, item), exception);
}
}
}
Original file line number Diff line number Diff line change
@@ -37,7 +37,8 @@ public class SnapshotLifecycleTemplateRegistry extends IndexTemplateRegistry {
// version 1: initial
// version 2: converted to hidden index
// version 3: templates moved to composable templates
public static final int INDEX_TEMPLATE_VERSION = 3;
// version 4:converted data stream
public static final int INDEX_TEMPLATE_VERSION = 4;

public static final String SLM_TEMPLATE_VERSION_VARIABLE = "xpack.slm.template.version";
public static final String SLM_TEMPLATE_NAME = ".slm-history";
8 changes: 3 additions & 5 deletions x-pack/plugin/core/src/main/resources/slm-history.json
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
{
"index_patterns": [
".slm-history-${xpack.slm.template.version}*"
"slm-history-${xpack.slm.template.version}*"
],
"priority": 2147483647,
"data_stream": {},
"template": {
"settings": {
"index.number_of_shards": 1,
"index.number_of_replicas": 0,
"index.auto_expand_replicas": "0-1",
"index.lifecycle.name": "slm-history-ilm-policy",
"index.lifecycle.rollover_alias": ".slm-history-${xpack.slm.template.version}",
"index.hidden": true,
"index.format": 1
"index.lifecycle.name": "slm-history-ilm-policy"
},
"mappings": {
"dynamic": false,
Loading