diff --git a/docs/changelog/97773.yaml b/docs/changelog/97773.yaml new file mode 100644 index 0000000000000..3106dc2621742 --- /dev/null +++ b/docs/changelog/97773.yaml @@ -0,0 +1,5 @@ +pr: 97773 +summary: "[Profiling] Support index migrations" +area: Application +type: enhancement +issues: [] diff --git a/x-pack/plugin/profiler/src/main/java/org/elasticsearch/xpack/profiler/AbstractProfilingPersistenceManager.java b/x-pack/plugin/profiler/src/main/java/org/elasticsearch/xpack/profiler/AbstractProfilingPersistenceManager.java index 7ba3ec95afd17..8ea0087565236 100644 --- a/x-pack/plugin/profiler/src/main/java/org/elasticsearch/xpack/profiler/AbstractProfilingPersistenceManager.java +++ b/x-pack/plugin/profiler/src/main/java/org/elasticsearch/xpack/profiler/AbstractProfilingPersistenceManager.java @@ -10,8 +10,14 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.support.RefCountingRunnable; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; @@ -21,12 +27,21 @@ import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.Index; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ClientHelper; import java.io.Closeable; +import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; + +import static org.elasticsearch.core.Strings.format; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; public abstract class AbstractProfilingPersistenceManager implements @@ -35,11 +50,14 @@ public abstract class AbstractProfilingPersistenceManager inProgress.set(false))) { ClusterState clusterState = event.state(); for (T index : getManagedIndices()) { - Status status = getStatus(clusterState, index); - if (status.actionable) { - onStatus(clusterState, status, index, ActionListener.releasing(refs.acquire())); + IndexState state = getIndexState(clusterState, index); + if (state.getStatus().actionable) { + onIndexState(clusterState, state, ActionListener.releasing(refs.acquire())); } } } @@ -120,29 +138,32 @@ protected boolean isAllResourcesCreated(ClusterChangedEvent event) { * Handler that takes appropriate action for a certain index status. * * @param clusterState The current cluster state. Never null. - * @param status Status of the current index. - * @param index The current index. + * @param indexState The state of the current index. * @param listener Listener to be called on completion / errors. */ - protected abstract void onStatus(ClusterState clusterState, Status status, T index, ActionListener listener); + protected abstract void onIndexState( + ClusterState clusterState, + IndexState indexState, + ActionListener listener + ); - private Status getStatus(ClusterState state, T index) { + private IndexState getIndexState(ClusterState state, T index) { IndexMetadata metadata = indexMetadata(state, index); if (metadata == null) { - return Status.NEEDS_CREATION; + return new IndexState<>(index, null, Status.NEEDS_CREATION); } if (metadata.getState() == IndexMetadata.State.CLOSE) { logger.warn( "Index [{}] is closed. This is likely to prevent Universal Profiling from functioning correctly", metadata.getIndex() ); - return Status.CLOSED; + return new IndexState<>(index, metadata.getIndex(), Status.CLOSED); } final IndexRoutingTable routingTable = state.getRoutingTable().index(metadata.getIndex()); ClusterHealthStatus indexHealth = new ClusterIndexHealth(metadata, routingTable).getStatus(); if (indexHealth == ClusterHealthStatus.RED) { - logger.debug("Index [{}] health status is RED, any pending mapping upgrades will wait until this changes", metadata.getIndex()); - return Status.UNHEALTHY; + logger.trace("Index [{}] health status is RED, any pending mapping upgrades will wait until this changes", metadata.getIndex()); + return new IndexState<>(index, metadata.getIndex(), Status.UNHEALTHY); } MappingMetadata mapping = metadata.mapping(); if (mapping != null) { @@ -159,23 +180,108 @@ private Status getStatus(ClusterState state, T index) { currentIndexVersion = getVersionField(metadata.getIndex(), meta, "index-version"); currentTemplateVersion = getVersionField(metadata.getIndex(), meta, "index-template-version"); if (currentIndexVersion == -1 || currentTemplateVersion == -1) { - return Status.UNHEALTHY; + return new IndexState<>(index, metadata.getIndex(), Status.UNHEALTHY); } } if (index.getVersion() > currentIndexVersion) { - return Status.NEEDS_VERSION_BUMP; - } else if (ProfilingIndexTemplateRegistry.INDEX_TEMPLATE_VERSION > currentTemplateVersion) { - // TODO 8.10+: Check if there are any pending migrations. If none are pending we can consider the index up to date. - return Status.NEEDS_MAPPINGS_UPDATE; + return new IndexState<>(index, metadata.getIndex(), Status.NEEDS_VERSION_BUMP); + } else if (getIndexTemplateVersion() > currentTemplateVersion) { + // if there are no migrations we can consider the index up-to-date even if the index template version does not match. + List pendingMigrations = index.getMigrations(currentTemplateVersion); + if (pendingMigrations.isEmpty()) { + logger.trace( + "Index [{}] with index template version [{}] (current is [{}]) is up-to-date (no pending migrations).", + metadata.getIndex(), + currentTemplateVersion, + getIndexTemplateVersion() + ); + return new IndexState<>(index, metadata.getIndex(), Status.UP_TO_DATE); + } + logger.trace( + "Index [{}] with index template version [{}] (current is [{}]) has [{}] pending migrations.", + metadata.getIndex(), + currentTemplateVersion, + getIndexTemplateVersion(), + pendingMigrations.size() + ); + return new IndexState<>(index, metadata.getIndex(), Status.NEEDS_MAPPINGS_UPDATE, pendingMigrations); } else { - return Status.UP_TO_DATE; + return new IndexState<>(index, metadata.getIndex(), Status.UP_TO_DATE); } } else { logger.warn("No mapping found for existing index [{}]. Index cannot be migrated.", metadata.getIndex()); - return Status.UNHEALTHY; + return new IndexState<>(index, metadata.getIndex(), Status.UNHEALTHY); + } + } + + // overridable for testing + protected int getIndexTemplateVersion() { + return ProfilingIndexTemplateRegistry.INDEX_TEMPLATE_VERSION; + } + + protected final void applyMigrations(IndexState indexState, ActionListener listener) { + String writeIndex = indexState.getWriteIndex().getName(); + try (var refs = new RefCountingRunnable(() -> listener.onResponse(null))) { + for (Migration migration : indexState.getPendingMigrations()) { + logger.debug("Applying migration [{}] for [{}].", migration, writeIndex); + migration.apply( + writeIndex, + (r -> updateMapping(r, ActionListener.releasing(refs.acquire()))), + (r -> updateSettings(r, ActionListener.releasing(refs.acquire()))) + ); + } } } + protected final void updateMapping(PutMappingRequest request, ActionListener listener) { + request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); + executeAsync("put mapping", request, listener, (req, l) -> client.admin().indices().putMapping(req, l)); + } + + protected final void updateSettings(UpdateSettingsRequest request, ActionListener listener) { + request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); + executeAsync("update settings", request, listener, (req, l) -> client.admin().indices().updateSettings(req, l)); + } + + protected final void executeAsync( + final String actionName, + final Request request, + final ActionListener listener, + BiConsumer> consumer + ) { + final Executor executor = threadPool.generic(); + executor.execute(() -> { + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ClientHelper.PROFILING_ORIGIN, request, new ActionListener<>() { + @Override + public void onResponse(Response response) { + if (response.isAcknowledged() == false) { + logger.error( + "Could not execute action [{}] for indices [{}] for [{}], request was not acknowledged", + actionName, + request.indices(), + ClientHelper.PROFILING_ORIGIN + ); + } + listener.onResponse(response); + } + + @Override + public void onFailure(Exception ex) { + logger.error( + () -> format( + "Could not execute action [%s] for indices [%s] for [%s]", + actionName, + request.indices(), + ClientHelper.PROFILING_ORIGIN + ), + ex + ); + listener.onFailure(ex); + } + }, consumer); + }); + } + private int getVersionField(Index index, Map meta, String fieldName) { Object value = meta.get(fieldName); if (value instanceof Integer) { @@ -189,6 +295,40 @@ private int getVersionField(Index index, Map meta, String fieldN return -1; } + protected static final class IndexState { + private final T index; + private final Index writeIndex; + private final Status status; + private final List pendingMigrations; + + IndexState(T index, Index writeIndex, Status status) { + this(index, writeIndex, status, null); + } + + IndexState(T index, Index writeIndex, Status status, List pendingMigrations) { + this.index = index; + this.writeIndex = writeIndex; + this.status = status; + this.pendingMigrations = pendingMigrations; + } + + public T getIndex() { + return index; + } + + public Index getWriteIndex() { + return writeIndex; + } + + public Status getStatus() { + return status; + } + + public List getPendingMigrations() { + return pendingMigrations; + } + } + enum Status { CLOSED(false), UNHEALTHY(false), @@ -214,5 +354,7 @@ interface ProfilingIndexAbstraction { String getName(); int getVersion(); + + List getMigrations(int currentIndexTemplateVersion); } } diff --git a/x-pack/plugin/profiler/src/main/java/org/elasticsearch/xpack/profiler/Migration.java b/x-pack/plugin/profiler/src/main/java/org/elasticsearch/xpack/profiler/Migration.java new file mode 100644 index 0000000000000..9483eb84babcb --- /dev/null +++ b/x-pack/plugin/profiler/src/main/java/org/elasticsearch/xpack/profiler/Migration.java @@ -0,0 +1,163 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.profiler; + +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.elasticsearch.common.settings.Settings; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.function.Consumer; + +/** + * Encapsulates a single migration action. + */ +public interface Migration { + /** + * @return The index template version that the targeted index will be on after the migration has been applied. + */ + int getTargetIndexTemplateVersion(); + + /** + * Applies this migration. Depending on the type of migration one or the other consumer will be called. + * + * @param index The name of the specific index to which the migration should be applied. + * @param mappingConsumer A consumer that applies mapping changes. + * @param settingsConsumer A consumer that applies settings changes. + */ + void apply(String index, Consumer mappingConsumer, Consumer settingsConsumer); + + /** + * Builds migrations for an index. + */ + class Builder { + private final List migrations = new ArrayList<>(); + private Integer targetVersion; + + private void checkVersionSet() { + if (targetVersion == null) { + throw new IllegalStateException("Set targetVersion before defining migrations"); + } + } + + /** + * @param version The index template version that the targeted index will reach after this migration has been applied. + * @return this to allow for method chaining. + */ + public Builder migrateToIndexTemplateVersion(int version) { + this.targetVersion = version; + return this; + } + + /** + * Adds new property to an index mapping. This method should be used for simple cases where a new property needs to be added + * without any further mapping parameters. For more complex cases use {@link #putMapping(Map)} instead and provide the body of + * the PUT mapping API call. + * + * @param property The name of the new property. + * @param type The mapping type. + * @return this to allow for method chaining. + */ + public Builder addProperty(String property, String type) { + Map body = Map.of("properties", Map.of(property, Map.of("type", type))); + return putMapping(body); + } + + /** + * Adds a change to an existing index mapping. Use {@link #addProperty(String, String)} instead for simple cases. + * + * @param body The complete body for this mapping change. + * @return this to allow for method chaining. + */ + public Builder putMapping(Map body) { + checkVersionSet(); + this.migrations.add(new PutMappingMigration(targetVersion, body)); + return this; + } + + /** + * Adds or modifies dynamic index settings. + * + * @param settings A settings object representing the required dynamic index settings. + * @return this to allow for method chaining. + */ + public Builder dynamicSettings(Settings settings) { + checkVersionSet(); + this.migrations.add(new DynamicIndexSettingsMigration(targetVersion, settings)); + return this; + } + + /** + * Builds the specified list of migrations. + * + * @param indexVersion The current index version. + * @return An unmodifiable list of migrations in program order (i.e. the order in which they have been called). + */ + public List build(int indexVersion) { + // ensure that the index template version is up-to-date after all migrations have been applied + Map updateIndexTemplateVersion = Map.of( + "_meta", + Map.of("index-template-version", targetVersion, "index-version", indexVersion) + ); + migrations.add(new PutMappingMigration(targetVersion, updateIndexTemplateVersion)); + return Collections.unmodifiableList(migrations); + } + } + + class PutMappingMigration implements Migration { + private final int targetVersion; + private final Map body; + + public PutMappingMigration(int targetVersion, Map body) { + this.targetVersion = targetVersion; + this.body = body; + } + + @Override + public int getTargetIndexTemplateVersion() { + return targetVersion; + } + + public void apply(String index, Consumer mappingConsumer, Consumer settingsConsumer) { + mappingConsumer.accept(new PutMappingRequest(index).source(body)); + } + + @Override + public String toString() { + return String.format(Locale.ROOT, "put mapping to target version [%d]", targetVersion); + } + } + + class DynamicIndexSettingsMigration implements Migration { + private final int targetVersion; + private final Settings settings; + + public DynamicIndexSettingsMigration(int targetVersion, Settings settings) { + this.targetVersion = targetVersion; + this.settings = settings; + } + + @Override + public int getTargetIndexTemplateVersion() { + return targetVersion; + } + + public void apply(String index, Consumer mappingConsumer, Consumer settingsConsumer) { + settingsConsumer.accept(new UpdateSettingsRequest(settings, index)); + } + + @Override + public String toString() { + return String.format(Locale.ROOT, "update settings to target version [%d]", targetVersion); + } + } +} diff --git a/x-pack/plugin/profiler/src/main/java/org/elasticsearch/xpack/profiler/ProfilingDataStreamManager.java b/x-pack/plugin/profiler/src/main/java/org/elasticsearch/xpack/profiler/ProfilingDataStreamManager.java index b6cc0cec2d4c1..fb431ce93e6d3 100644 --- a/x-pack/plugin/profiler/src/main/java/org/elasticsearch/xpack/profiler/ProfilingDataStreamManager.java +++ b/x-pack/plugin/profiler/src/main/java/org/elasticsearch/xpack/profiler/ProfilingDataStreamManager.java @@ -51,27 +51,23 @@ public class ProfilingDataStreamManager extends AbstractProfilingPersistenceMana PROFILING_DATASTREAMS = Collections.unmodifiableList(dataStreams); } - private final ThreadPool threadPool; - private final Client client; - public ProfilingDataStreamManager(ThreadPool threadPool, Client client, ClusterService clusterService) { - super(clusterService); - this.threadPool = threadPool; - this.client = client; + super(threadPool, client, clusterService); } @Override - protected void onStatus( + protected void onIndexState( ClusterState clusterState, - Status status, - ProfilingDataStream index, + IndexState indexState, ActionListener listener ) { + Status status = indexState.getStatus(); switch (status) { - case NEEDS_CREATION -> createDataStream(index, listener); - case NEEDS_VERSION_BUMP -> rolloverDataStream(index, listener); + case NEEDS_CREATION -> createDataStream(indexState.getIndex(), listener); + case NEEDS_VERSION_BUMP -> rolloverDataStream(indexState.getIndex(), listener); + case NEEDS_MAPPINGS_UPDATE -> applyMigrations(indexState, listener); default -> { - logger.debug("Skipping status change [{}] for data stream [{}].", status, index); + logger.trace("Skipping status change [{}] for data stream [{}].", status, indexState.getIndex()); // ensure that listener is notified we're done listener.onResponse(null); } @@ -192,18 +188,25 @@ public void onFailure(Exception e) { static class ProfilingDataStream implements AbstractProfilingPersistenceManager.ProfilingIndexAbstraction { private final String name; private final int version; + private final List migrations; public static ProfilingDataStream of(String name, int version) { - return new ProfilingDataStream(name, version); + return of(name, version, null); + } + + public static ProfilingDataStream of(String name, int version, Migration.Builder builder) { + List migrations = builder != null ? builder.build(version) : null; + return new ProfilingDataStream(name, version, migrations); } - private ProfilingDataStream(String name, int version) { + private ProfilingDataStream(String name, int version, List migrations) { this.name = name; this.version = version; + this.migrations = migrations; } public ProfilingDataStream withVersion(int version) { - return new ProfilingDataStream(name, version); + return new ProfilingDataStream(name, version, migrations); } @Override @@ -216,6 +219,13 @@ public int getVersion() { return version; } + @Override + public List getMigrations(int currentIndexTemplateVersion) { + return migrations != null + ? migrations.stream().filter(m -> m.getTargetIndexTemplateVersion() > currentIndexTemplateVersion).toList() + : Collections.emptyList(); + } + @Override public String toString() { return getName(); diff --git a/x-pack/plugin/profiler/src/main/java/org/elasticsearch/xpack/profiler/ProfilingIndexManager.java b/x-pack/plugin/profiler/src/main/java/org/elasticsearch/xpack/profiler/ProfilingIndexManager.java index 2fa6b0f3deb3b..6c64983687710 100644 --- a/x-pack/plugin/profiler/src/main/java/org/elasticsearch/xpack/profiler/ProfilingIndexManager.java +++ b/x-pack/plugin/profiler/src/main/java/org/elasticsearch/xpack/profiler/ProfilingIndexManager.java @@ -24,6 +24,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; +import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Locale; @@ -67,27 +68,23 @@ public class ProfilingIndexManager extends AbstractProfilingPersistenceManager

indexState, ActionListener listener ) { + Status status = indexState.getStatus(); switch (status) { - case NEEDS_CREATION -> createIndex(clusterState, index, listener); - case NEEDS_VERSION_BUMP -> bumpVersion(clusterState, index, listener); + case NEEDS_CREATION -> createIndex(clusterState, indexState.getIndex(), listener); + case NEEDS_VERSION_BUMP -> bumpVersion(clusterState, indexState.getIndex(), listener); + case NEEDS_MAPPINGS_UPDATE -> applyMigrations(indexState, listener); default -> { - logger.debug("Skipping status change [{}] for index [{}].", status, index); + logger.trace("Skipping status change [{}] for index [{}].", status, indexState.getIndex()); // ensure that listener is notified we're done listener.onResponse(null); } @@ -278,41 +275,10 @@ public void onFailure(Exception e) { }); } - private void onDeleteIndexFailure(String[] indices, Exception ex) { - logger.error(() -> format("error deleting indices [%s] for [%s]", indices, ClientHelper.PROFILING_ORIGIN), ex); - } - private void deleteIndices(final String[] indices, final ActionListener listener) { - final Executor executor = threadPool.generic(); - executor.execute(() -> { - DeleteIndexRequest request = new DeleteIndexRequest(indices); - request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); - executeAsyncWithOrigin( - client.threadPool().getThreadContext(), - ClientHelper.PROFILING_ORIGIN, - request, - new ActionListener() { - @Override - public void onResponse(AcknowledgedResponse response) { - if (response.isAcknowledged() == false) { - logger.error( - "error deleting indices [{}] for [{}], request was not acknowledged", - indices, - ClientHelper.PROFILING_ORIGIN - ); - } - listener.onResponse(response); - } - - @Override - public void onFailure(Exception e) { - onDeleteIndexFailure(indices, e); - listener.onFailure(e); - } - }, - (req, l) -> client.admin().indices().delete(req, l) - ); - }); + DeleteIndexRequest request = new DeleteIndexRequest(indices); + request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); + executeAsync("delete", request, listener, (req, l) -> client.admin().indices().delete(req, l)); } enum OnVersionBump { @@ -328,29 +294,41 @@ static class ProfilingIndex implements ProfilingIndexAbstraction { private final int version; private final String generation; private final OnVersionBump onVersionBump; + private final List migrations; public static ProfilingIndex regular(String name, int version, OnVersionBump onVersionBump) { - return new ProfilingIndex(name, version, null, onVersionBump); + return regular(name, version, onVersionBump, null); + } + + public static ProfilingIndex regular(String name, int version, OnVersionBump onVersionBump, Migration.Builder builder) { + List migrations = builder != null ? builder.build(version) : null; + return new ProfilingIndex(name, version, null, onVersionBump, migrations); } public static ProfilingIndex kv(String name, int version) { + return kv(name, version, null); + } + + public static ProfilingIndex kv(String name, int version, Migration.Builder builder) { + List migrations = builder != null ? builder.build(version) : null; // K/V indices will age automatically as per the ILM policy, and we won't force-upgrade them on version bumps - return new ProfilingIndex(name, version, "000001", OnVersionBump.KEEP_OLD); + return new ProfilingIndex(name, version, "000001", OnVersionBump.KEEP_OLD, migrations); } - private ProfilingIndex(String namePrefix, int version, String generation, OnVersionBump onVersionBump) { + private ProfilingIndex(String namePrefix, int version, String generation, OnVersionBump onVersionBump, List migrations) { this.namePrefix = namePrefix; this.version = version; this.generation = generation; this.onVersionBump = onVersionBump; + this.migrations = migrations; } public ProfilingIndex withVersion(int version) { - return new ProfilingIndex(namePrefix, version, generation, onVersionBump); + return new ProfilingIndex(namePrefix, version, generation, onVersionBump, migrations); } public ProfilingIndex withGeneration(String generation) { - return new ProfilingIndex(namePrefix, version, generation, onVersionBump); + return new ProfilingIndex(namePrefix, version, generation, onVersionBump, migrations); } public boolean isMatchWithoutVersion(String indexName) { @@ -392,6 +370,13 @@ public int getVersion() { return version; } + @Override + public List getMigrations(int currentIndexTemplateVersion) { + return migrations != null + ? migrations.stream().filter(m -> m.getTargetIndexTemplateVersion() > currentIndexTemplateVersion).toList() + : Collections.emptyList(); + } + public OnVersionBump getOnVersionBump() { return onVersionBump; } diff --git a/x-pack/plugin/profiler/src/test/java/org/elasticsearch/xpack/profiler/ProfilingDataStreamManagerTests.java b/x-pack/plugin/profiler/src/test/java/org/elasticsearch/xpack/profiler/ProfilingDataStreamManagerTests.java index f6ca02c74865a..e0f4fd5eaf6f9 100644 --- a/x-pack/plugin/profiler/src/test/java/org/elasticsearch/xpack/profiler/ProfilingDataStreamManagerTests.java +++ b/x-pack/plugin/profiler/src/test/java/org/elasticsearch/xpack/profiler/ProfilingDataStreamManagerTests.java @@ -11,8 +11,13 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.rollover.RolloverAction; import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; +import org.elasticsearch.action.admin.indices.rollover.RolloverResponse; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.datastreams.CreateDataStreamAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -66,6 +71,8 @@ public class ProfilingDataStreamManagerTests extends ESTestCase { private ClusterService clusterService; private ThreadPool threadPool; private VerifyingClient client; + private List managedDataStreams; + private int indexTemplateVersion; @Before public void createRegistryAndClient() { @@ -73,11 +80,23 @@ public void createRegistryAndClient() { threadPool = new TestThreadPool(this.getClass().getName()); client = new VerifyingClient(threadPool); clusterService = ClusterServiceUtils.createClusterService(threadPool); + managedDataStreams = ProfilingDataStreamManager.PROFILING_DATASTREAMS; + indexTemplateVersion = ProfilingIndexTemplateRegistry.INDEX_TEMPLATE_VERSION; datastreamManager = new ProfilingDataStreamManager(threadPool, client, clusterService) { @Override protected boolean isAllResourcesCreated(ClusterChangedEvent event) { return templatesCreated.get(); } + + @Override + protected int getIndexTemplateVersion() { + return indexTemplateVersion; + } + + @Override + protected Iterable getManagedIndices() { + return managedDataStreams; + } }; datastreamManager.setTemplatesEnabled(true); } @@ -220,6 +239,68 @@ public void testThatDataStreamIsRolledOver() throws Exception { calledTimes.set(0); } + public void testNoMigrationsIfIndexTemplateVersionMatches() throws Exception { + DiscoveryNode node = DiscoveryNodeUtils.create("node"); + DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build(); + templatesCreated.set(true); + + ProfilingDataStreamManager.ProfilingDataStream ds = ProfilingDataStreamManager.ProfilingDataStream.of( + "profiling-test", + 1, + new Migration.Builder().migrateToIndexTemplateVersion(2).addProperty("test", "keyword") + ); + + managedDataStreams = List.of(ds); + ClusterChangedEvent event = createClusterChangedEvent(managedDataStreams, nodes); + + client.setVerifier((a, r, l) -> { + fail("all data streams should be up-to-date; nothing should happen"); + return null; + }); + datastreamManager.clusterChanged(event); + } + + public void testMigratesIfIndexTemplateVersionIsBehind() throws Exception { + DiscoveryNode node = DiscoveryNodeUtils.create("node"); + DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build(); + templatesCreated.set(true); + + ProfilingDataStreamManager.ProfilingDataStream ds = ProfilingDataStreamManager.ProfilingDataStream.of( + "profiling-test", + 1, + new Migration.Builder().migrateToIndexTemplateVersion(2).addProperty("test", "keyword") + ); + ProfilingDataStreamManager.ProfilingDataStream ds2 = ProfilingDataStreamManager.ProfilingDataStream.of("profiling-no-change", 1 + // no migration specified, should not be changed + ); + + managedDataStreams = List.of(ds, ds2); + // index is out of date and should be migrated + indexTemplateVersion = 2; + ClusterChangedEvent event = createClusterChangedEvent(managedDataStreams, nodes); + + AtomicInteger mappingUpdates = new AtomicInteger(0); + AtomicInteger settingsUpdates = new AtomicInteger(0); + client.setVerifier( + (action, request, listener) -> verifyIndexMigrated( + ".ds-profiling-test", + mappingUpdates, + settingsUpdates, + action, + request, + listener + ) + ); + + datastreamManager.clusterChanged(event); + // one mapping update is the one we specified, the other one is because we need to update _meta + assertBusy(() -> assertThat(mappingUpdates.get(), equalTo(2))); + assertBusy(() -> assertThat(settingsUpdates.get(), equalTo(0))); + + mappingUpdates.set(0); + settingsUpdates.set(0); + } + private ActionResponse verifyDataStreamInstalled( AtomicInteger calledTimes, ActionType action, @@ -249,6 +330,42 @@ private ActionResponse verifyDataStreamRolledOver( assertThat(action, instanceOf(RolloverAction.class)); assertThat(request, instanceOf(RolloverRequest.class)); assertNotNull(listener); + RolloverRequest rolloverRequest = (RolloverRequest) request; + return new RolloverResponse( + rolloverRequest.getRolloverTarget(), + rolloverRequest.getNewIndexName(), + Map.of(), + false, + true, + true, + true + ); + } else { + fail("client called with unexpected request:" + request.toString()); + return null; + } + } + + private ActionResponse verifyIndexMigrated( + String indexName, + AtomicInteger mappingUpdates, + AtomicInteger settingsUpdates, + ActionType action, + ActionRequest request, + ActionListener listener + ) { + if (action instanceof PutMappingAction) { + mappingUpdates.incrementAndGet(); + assertThat(action, instanceOf(PutMappingAction.class)); + assertThat(request, instanceOf(PutMappingRequest.class)); + assertThat(((PutMappingRequest) request).indices(), equalTo(new String[] { indexName })); + assertNotNull(listener); + return AcknowledgedResponse.TRUE; + } else if (action instanceof UpdateSettingsAction) { + settingsUpdates.incrementAndGet(); + assertThat(action, instanceOf(UpdateSettingsAction.class)); + assertThat(request, instanceOf(UpdateSettingsRequest.class)); + assertNotNull(listener); return AcknowledgedResponse.TRUE; } else { fail("client called with unexpected request:" + request.toString()); diff --git a/x-pack/plugin/profiler/src/test/java/org/elasticsearch/xpack/profiler/ProfilingIndexManagerTests.java b/x-pack/plugin/profiler/src/test/java/org/elasticsearch/xpack/profiler/ProfilingIndexManagerTests.java index 12f117359c2ee..b41a33f2c8705 100644 --- a/x-pack/plugin/profiler/src/test/java/org/elasticsearch/xpack/profiler/ProfilingIndexManagerTests.java +++ b/x-pack/plugin/profiler/src/test/java/org/elasticsearch/xpack/profiler/ProfilingIndexManagerTests.java @@ -16,6 +16,10 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; @@ -35,7 +39,9 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.shard.ShardId; @@ -65,6 +71,8 @@ public class ProfilingIndexManagerTests extends ESTestCase { private ClusterService clusterService; private ThreadPool threadPool; private VerifyingClient client; + private List managedIndices; + private int indexTemplateVersion; @Before public void createRegistryAndClient() { @@ -72,11 +80,23 @@ public void createRegistryAndClient() { threadPool = new TestThreadPool(this.getClass().getName()); client = new VerifyingClient(threadPool); clusterService = ClusterServiceUtils.createClusterService(threadPool); + managedIndices = ProfilingIndexManager.PROFILING_INDICES; + indexTemplateVersion = ProfilingIndexTemplateRegistry.INDEX_TEMPLATE_VERSION; indexManager = new ProfilingIndexManager(threadPool, client, clusterService) { @Override protected boolean isAllResourcesCreated(ClusterChangedEvent event) { return templatesCreated.get(); } + + @Override + protected Iterable getManagedIndices() { + return managedIndices; + } + + @Override + protected int getIndexTemplateVersion() { + return indexTemplateVersion; + } }; indexManager.setTemplatesEnabled(true); } @@ -227,6 +247,77 @@ public void testUpgradesOldIndex() throws Exception { indicesDeleted.set(0); } + public void testNoMigrationsIfIndexTemplateVersionMatches() { + DiscoveryNode node = DiscoveryNodeUtils.create("node"); + DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build(); + templatesCreated.set(true); + + ProfilingIndexManager.ProfilingIndex idx = ProfilingIndexManager.ProfilingIndex.regular( + "profiling-test", + 1, + ProfilingIndexManager.OnVersionBump.KEEP_OLD, + new Migration.Builder().migrateToIndexTemplateVersion(2).addProperty("test", "keyword") + ); + + managedIndices = List.of(idx); + ClusterChangedEvent event = createClusterChangedEvent(managedIndices, nodes); + + client.setVerifier((a, r, l) -> { + fail("all indices should be up-to-date; nothing should happen"); + return null; + }); + indexManager.clusterChanged(event); + } + + public void testMigratesIfIndexTemplateVersionIsBehind() throws Exception { + DiscoveryNode node = DiscoveryNodeUtils.create("node"); + DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build(); + templatesCreated.set(true); + + ProfilingIndexManager.ProfilingIndex idx = ProfilingIndexManager.ProfilingIndex.regular( + "profiling-test", + 1, + ProfilingIndexManager.OnVersionBump.KEEP_OLD, + new Migration.Builder().migrateToIndexTemplateVersion(2) + .addProperty("test", "keyword") + .dynamicSettings( + Settings.builder().put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(30)).build() + ) + ); + ProfilingIndexManager.ProfilingIndex idx2 = ProfilingIndexManager.ProfilingIndex.regular( + "profiling-no-change", + 1, + ProfilingIndexManager.OnVersionBump.KEEP_OLD + // no migration specified, should not be changed + ); + + managedIndices = List.of(idx, idx2); + // index is out of date and should be migrated + indexTemplateVersion = 2; + ClusterChangedEvent event = createClusterChangedEvent(managedIndices, nodes); + + AtomicInteger mappingUpdates = new AtomicInteger(0); + AtomicInteger settingsUpdates = new AtomicInteger(0); + client.setVerifier( + (action, request, listener) -> verifyIndexMigrated( + ".profiling-test-v001", + mappingUpdates, + settingsUpdates, + action, + request, + listener + ) + ); + + indexManager.clusterChanged(event); + // one mapping update is the one we specified, the other one is because we need to update _meta + assertBusy(() -> assertThat(mappingUpdates.get(), equalTo(2))); + assertBusy(() -> assertThat(settingsUpdates.get(), equalTo(1))); + + mappingUpdates.set(0); + settingsUpdates.set(0); + } + public void testIndexMatchWithoutVersion() { ProfilingIndexManager.ProfilingIndex idx = ProfilingIndexManager.ProfilingIndex.kv("profiling-test", 1); assertTrue(idx.isMatchWithoutVersion(".profiling-test-v002")); @@ -278,6 +369,33 @@ private ActionResponse verifyIndexUpgraded( } } + private ActionResponse verifyIndexMigrated( + String indexName, + AtomicInteger mappingUpdates, + AtomicInteger settingsUpdates, + ActionType action, + ActionRequest request, + ActionListener listener + ) { + if (action instanceof PutMappingAction) { + mappingUpdates.incrementAndGet(); + assertThat(action, instanceOf(PutMappingAction.class)); + assertThat(request, instanceOf(PutMappingRequest.class)); + assertThat(((PutMappingRequest) request).indices(), equalTo(new String[] { indexName })); + assertNotNull(listener); + return AcknowledgedResponse.TRUE; + } else if (action instanceof UpdateSettingsAction) { + settingsUpdates.incrementAndGet(); + assertThat(action, instanceOf(UpdateSettingsAction.class)); + assertThat(request, instanceOf(UpdateSettingsRequest.class)); + assertNotNull(listener); + return AcknowledgedResponse.TRUE; + } else { + fail("client called with unexpected request:" + request.toString()); + return null; + } + } + private ClusterChangedEvent createClusterChangedEvent( Iterable existingIndices, DiscoveryNodes nodes