From e0496fab29940a75b7a561594a25f39e6063164a Mon Sep 17 00:00:00 2001 From: Mary Gouseti Date: Fri, 30 Dec 2022 18:40:03 +0100 Subject: [PATCH] Batch ApplyAliasActions cluster state updates (#89924) (#90010) Move the alias updates to be processed in a batched way. This should give a performance boost when there are a lot of alias updates since they have high priority. --- .../metadata/MetadataIndexAliasesService.java | 74 ++++++++++++++++--- .../MetadataIndexAliasesServiceTests.java | 67 ++++++++++++++--- 2 files changed, 118 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesService.java index ed53313a95ede..0ec58787dfb61 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesService.java @@ -12,15 +12,20 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterStateAckListener; +import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.SimpleBatchedAckListenerTaskExecutor; import org.elasticsearch.cluster.metadata.AliasAction.NewAliasValidator; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexService; @@ -55,6 +60,8 @@ public class MetadataIndexAliasesService { private final NamedXContentRegistry xContentRegistry; + private final ClusterStateTaskExecutor executor; + @Inject public MetadataIndexAliasesService( ClusterService clusterService, @@ -66,20 +73,19 @@ public MetadataIndexAliasesService( this.indicesService = indicesService; this.deleteIndexService = deleteIndexService; this.xContentRegistry = xContentRegistry; - } + this.executor = new SimpleBatchedAckListenerTaskExecutor<>() { - public void indicesAliases(final IndicesAliasesClusterStateUpdateRequest request, final ActionListener listener) { - submitUnbatchedTask("index-aliases", new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { @Override - public ClusterState execute(ClusterState currentState) { - return applyAliasActions(currentState, request.actions()); + public Tuple executeTask(ApplyAliasesTask applyAliasesTask, ClusterState clusterState) { + return new Tuple<>(applyAliasActions(clusterState, applyAliasesTask.request().actions()), applyAliasesTask); } - }); + }; } - @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here - private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) { - clusterService.submitUnbatchedStateUpdateTask(source, task); + public void indicesAliases(final IndicesAliasesClusterStateUpdateRequest request, final ActionListener listener) { + var task = new ApplyAliasesTask(request, listener); + var config = ClusterStateTaskConfig.build(Priority.URGENT); + clusterService.submitStateUpdateTask("index-aliases", task, config, executor); } /** @@ -198,6 +204,11 @@ public ClusterState applyAliasActions(ClusterState currentState, Iterable getExecutor() { + return executor; + } + private void validateFilter( List indicesToClose, Map indices, @@ -244,4 +255,43 @@ private static void validateAliasTargetIsNotDSBackingIndex(ClusterState currentS ); } } + + /** + * A cluster state update task that consists of the cluster state request and the listeners that need to be notified upon completion. + */ + record ApplyAliasesTask(IndicesAliasesClusterStateUpdateRequest request, ActionListener listener) + implements + ClusterStateTaskListener, + ClusterStateAckListener { + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + public boolean mustAck(DiscoveryNode discoveryNode) { + return true; + } + + @Override + public void onAllNodesAcked() { + listener.onResponse(AcknowledgedResponse.TRUE); + } + + @Override + public void onAckFailure(Exception e) { + listener.onResponse(AcknowledgedResponse.FALSE); + } + + @Override + public void onAckTimeout() { + listener.onResponse(AcknowledgedResponse.FALSE); + } + + @Override + public TimeValue ackTimeout() { + return request.ackTimeout(); + } + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesServiceTests.java index 4534727b75923..4f99f5ea943cc 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataIndexAliasesServiceTests.java @@ -10,8 +10,10 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.Tuple; @@ -292,13 +294,13 @@ public void testAddWriteOnlyWithNoExistingAliases() { ClusterState after = service.applyAliasActions( before, - Arrays.asList(new AliasAction.Add("test", "alias", null, null, null, false, null)) + List.of(new AliasAction.Add("test", "alias", null, null, null, false, null)) ); assertFalse(after.metadata().index("test").getAliases().get("alias").writeIndex()); assertNull(after.metadata().getIndicesLookup().get("alias").getWriteIndex()); assertAliasesVersionIncreased("test", before, after); - after = service.applyAliasActions(before, Arrays.asList(new AliasAction.Add("test", "alias", null, null, null, null, null))); + after = service.applyAliasActions(before, List.of(new AliasAction.Add("test", "alias", null, null, null, null, null))); assertNull(after.metadata().index("test").getAliases().get("alias").writeIndex()); assertThat( after.metadata().index(after.metadata().getIndicesLookup().get("alias").getWriteIndex()), @@ -306,7 +308,7 @@ public void testAddWriteOnlyWithNoExistingAliases() { ); assertAliasesVersionIncreased("test", before, after); - after = service.applyAliasActions(before, Arrays.asList(new AliasAction.Add("test", "alias", null, null, null, true, null))); + after = service.applyAliasActions(before, List.of(new AliasAction.Add("test", "alias", null, null, null, true, null))); assertTrue(after.metadata().index("test").getAliases().get("alias").writeIndex()); assertThat( after.metadata().index(after.metadata().getIndicesLookup().get("alias").getWriteIndex()), @@ -329,10 +331,7 @@ public void testAddWriteOnlyWithExistingWriteIndex() { .metadata(Metadata.builder().put(indexMetadata).put(indexMetadata2)) .build(); - ClusterState after = service.applyAliasActions( - before, - Arrays.asList(new AliasAction.Add("test", "alias", null, null, null, null, null)) - ); + ClusterState after = service.applyAliasActions(before, List.of(new AliasAction.Add("test", "alias", null, null, null, null, null))); assertNull(after.metadata().index("test").getAliases().get("alias").writeIndex()); assertThat( after.metadata().index(after.metadata().getIndicesLookup().get("alias").getWriteIndex()), @@ -343,7 +342,7 @@ public void testAddWriteOnlyWithExistingWriteIndex() { Exception exception = expectThrows( IllegalStateException.class, - () -> service.applyAliasActions(before, Arrays.asList(new AliasAction.Add("test", "alias", null, null, null, true, null))) + () -> service.applyAliasActions(before, List.of(new AliasAction.Add("test", "alias", null, null, null, true, null))) ); assertThat(exception.getMessage(), startsWith("alias [alias] has more than one write index [")); } @@ -402,7 +401,7 @@ public void testAddWriteOnlyWithExistingNonWriteIndices() { ClusterState after = service.applyAliasActions( before, - Arrays.asList(new AliasAction.Add("test3", "alias", null, null, null, true, null)) + List.of(new AliasAction.Add("test3", "alias", null, null, null, true, null)) ); assertTrue(after.metadata().index("test3").getAliases().get("alias").writeIndex()); assertThat( @@ -675,6 +674,44 @@ public void testDataStreamAliasesWithWriteFlag() { assertThat(result.metadata().dataStreamAliases().get("logs-http"), nullValue()); } + public void testAddAndRemoveAliasClusterStateUpdate() throws Exception { + // Create a state with a single index + String index = randomAlphaOfLength(5); + ClusterState before = createIndex(ClusterState.builder(ClusterName.DEFAULT).build(), index); + IndicesAliasesClusterStateUpdateRequest addAliasRequest = new IndicesAliasesClusterStateUpdateRequest( + List.of(new AliasAction.Add(index, "test", null, null, null, null, null)) + ); + IndicesAliasesClusterStateUpdateRequest removeAliasRequest = new IndicesAliasesClusterStateUpdateRequest( + List.of(new AliasAction.Remove(index, "test", true)) + ); + + ClusterState after = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful( + before, + service.getExecutor(), + List.of( + new MetadataIndexAliasesService.ApplyAliasesTask(addAliasRequest, null), + // Repeat the same change to ensure that the clte version won't increase + new MetadataIndexAliasesService.ApplyAliasesTask(addAliasRequest, null), + new MetadataIndexAliasesService.ApplyAliasesTask(removeAliasRequest, null), + new MetadataIndexAliasesService.ApplyAliasesTask(addAliasRequest, null) + ) + ); + + IndexAbstraction alias = after.metadata().getIndicesLookup().get("test"); + assertNotNull(alias); + assertThat(alias.getType(), equalTo(IndexAbstraction.Type.ALIAS)); + assertThat(alias.getIndices(), contains(after.metadata().index(index).getIndex())); + assertAliasesVersionIncreased(new String[] { index }, before, after, 3); + assertThat(after.metadata().aliasedIndices("test"), contains(after.metadata().index(index).getIndex())); + } + + public void testEmptyTaskListProducesSameClusterState() throws Exception { + String index = randomAlphaOfLength(5); + ClusterState before = createIndex(ClusterState.builder(ClusterName.DEFAULT).build(), index); + ClusterState after = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(before, service.getExecutor(), List.of()); + assertSame(before, after); + } + private ClusterState applyHiddenAliasMix(ClusterState before, Boolean isHidden1, Boolean isHidden2) { return service.applyAliasActions( before, @@ -711,11 +748,19 @@ private void assertAliasesVersionIncreased(final String index, final ClusterStat } private void assertAliasesVersionIncreased(final String[] indices, final ClusterState before, final ClusterState after) { + assertAliasesVersionIncreased(indices, before, after, 1); + } + + private void assertAliasesVersionIncreased( + final String[] indices, + final ClusterState before, + final ClusterState after, + final int diff + ) { for (final var index : indices) { - final long expected = 1 + before.metadata().index(index).getAliasesVersion(); + final long expected = diff + before.metadata().index(index).getAliasesVersion(); final long actual = after.metadata().index(index).getAliasesVersion(); assertThat("index metadata aliases version mismatch", actual, equalTo(expected)); } } - }