Skip to content

Commit

Permalink
Batch ApplyAliasActions cluster state updates (#89924) (#90010)
Browse files Browse the repository at this point in the history
Move the alias updates to be processed in a batched way. This should give a performance boost when there are a lot of alias updates since they have high priority.
  • Loading branch information
gmarouli authored Dec 30, 2022
1 parent c40615c commit e0496fa
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,20 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterStateAckListener;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.SimpleBatchedAckListenerTaskExecutor;
import org.elasticsearch.cluster.metadata.AliasAction.NewAliasValidator;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
Expand Down Expand Up @@ -55,6 +60,8 @@ public class MetadataIndexAliasesService {

private final NamedXContentRegistry xContentRegistry;

private final ClusterStateTaskExecutor<ApplyAliasesTask> executor;

@Inject
public MetadataIndexAliasesService(
ClusterService clusterService,
Expand All @@ -66,20 +73,19 @@ public MetadataIndexAliasesService(
this.indicesService = indicesService;
this.deleteIndexService = deleteIndexService;
this.xContentRegistry = xContentRegistry;
}
this.executor = new SimpleBatchedAckListenerTaskExecutor<>() {

public void indicesAliases(final IndicesAliasesClusterStateUpdateRequest request, final ActionListener<AcknowledgedResponse> listener) {
submitUnbatchedTask("index-aliases", new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) {
@Override
public ClusterState execute(ClusterState currentState) {
return applyAliasActions(currentState, request.actions());
public Tuple<ClusterState, ClusterStateAckListener> executeTask(ApplyAliasesTask applyAliasesTask, ClusterState clusterState) {
return new Tuple<>(applyAliasActions(clusterState, applyAliasesTask.request().actions()), applyAliasesTask);
}
});
};
}

@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
clusterService.submitUnbatchedStateUpdateTask(source, task);
public void indicesAliases(final IndicesAliasesClusterStateUpdateRequest request, final ActionListener<AcknowledgedResponse> listener) {
var task = new ApplyAliasesTask(request, listener);
var config = ClusterStateTaskConfig.build(Priority.URGENT);
clusterService.submitStateUpdateTask("index-aliases", task, config, executor);
}

/**
Expand Down Expand Up @@ -198,6 +204,11 @@ public ClusterState applyAliasActions(ClusterState currentState, Iterable<AliasA
}
}

// Visible for testing purposes
ClusterStateTaskExecutor<ApplyAliasesTask> getExecutor() {
return executor;
}

private void validateFilter(
List<Index> indicesToClose,
Map<String, IndexService> indices,
Expand Down Expand Up @@ -244,4 +255,43 @@ private static void validateAliasTargetIsNotDSBackingIndex(ClusterState currentS
);
}
}

/**
* A cluster state update task that consists of the cluster state request and the listeners that need to be notified upon completion.
*/
record ApplyAliasesTask(IndicesAliasesClusterStateUpdateRequest request, ActionListener<AcknowledgedResponse> listener)
implements
ClusterStateTaskListener,
ClusterStateAckListener {

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
}

@Override
public void onAllNodesAcked() {
listener.onResponse(AcknowledgedResponse.TRUE);
}

@Override
public void onAckFailure(Exception e) {
listener.onResponse(AcknowledgedResponse.FALSE);
}

@Override
public void onAckTimeout() {
listener.onResponse(AcknowledgedResponse.FALSE);
}

@Override
public TimeValue ackTimeout() {
return request.ackTimeout();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@

import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.Tuple;
Expand Down Expand Up @@ -292,21 +294,21 @@ public void testAddWriteOnlyWithNoExistingAliases() {

ClusterState after = service.applyAliasActions(
before,
Arrays.asList(new AliasAction.Add("test", "alias", null, null, null, false, null))
List.of(new AliasAction.Add("test", "alias", null, null, null, false, null))
);
assertFalse(after.metadata().index("test").getAliases().get("alias").writeIndex());
assertNull(after.metadata().getIndicesLookup().get("alias").getWriteIndex());
assertAliasesVersionIncreased("test", before, after);

after = service.applyAliasActions(before, Arrays.asList(new AliasAction.Add("test", "alias", null, null, null, null, null)));
after = service.applyAliasActions(before, List.of(new AliasAction.Add("test", "alias", null, null, null, null, null)));
assertNull(after.metadata().index("test").getAliases().get("alias").writeIndex());
assertThat(
after.metadata().index(after.metadata().getIndicesLookup().get("alias").getWriteIndex()),
equalTo(after.metadata().index("test"))
);
assertAliasesVersionIncreased("test", before, after);

after = service.applyAliasActions(before, Arrays.asList(new AliasAction.Add("test", "alias", null, null, null, true, null)));
after = service.applyAliasActions(before, List.of(new AliasAction.Add("test", "alias", null, null, null, true, null)));
assertTrue(after.metadata().index("test").getAliases().get("alias").writeIndex());
assertThat(
after.metadata().index(after.metadata().getIndicesLookup().get("alias").getWriteIndex()),
Expand All @@ -329,10 +331,7 @@ public void testAddWriteOnlyWithExistingWriteIndex() {
.metadata(Metadata.builder().put(indexMetadata).put(indexMetadata2))
.build();

ClusterState after = service.applyAliasActions(
before,
Arrays.asList(new AliasAction.Add("test", "alias", null, null, null, null, null))
);
ClusterState after = service.applyAliasActions(before, List.of(new AliasAction.Add("test", "alias", null, null, null, null, null)));
assertNull(after.metadata().index("test").getAliases().get("alias").writeIndex());
assertThat(
after.metadata().index(after.metadata().getIndicesLookup().get("alias").getWriteIndex()),
Expand All @@ -343,7 +342,7 @@ public void testAddWriteOnlyWithExistingWriteIndex() {

Exception exception = expectThrows(
IllegalStateException.class,
() -> service.applyAliasActions(before, Arrays.asList(new AliasAction.Add("test", "alias", null, null, null, true, null)))
() -> service.applyAliasActions(before, List.of(new AliasAction.Add("test", "alias", null, null, null, true, null)))
);
assertThat(exception.getMessage(), startsWith("alias [alias] has more than one write index ["));
}
Expand Down Expand Up @@ -402,7 +401,7 @@ public void testAddWriteOnlyWithExistingNonWriteIndices() {

ClusterState after = service.applyAliasActions(
before,
Arrays.asList(new AliasAction.Add("test3", "alias", null, null, null, true, null))
List.of(new AliasAction.Add("test3", "alias", null, null, null, true, null))
);
assertTrue(after.metadata().index("test3").getAliases().get("alias").writeIndex());
assertThat(
Expand Down Expand Up @@ -675,6 +674,44 @@ public void testDataStreamAliasesWithWriteFlag() {
assertThat(result.metadata().dataStreamAliases().get("logs-http"), nullValue());
}

public void testAddAndRemoveAliasClusterStateUpdate() throws Exception {
// Create a state with a single index
String index = randomAlphaOfLength(5);
ClusterState before = createIndex(ClusterState.builder(ClusterName.DEFAULT).build(), index);
IndicesAliasesClusterStateUpdateRequest addAliasRequest = new IndicesAliasesClusterStateUpdateRequest(
List.of(new AliasAction.Add(index, "test", null, null, null, null, null))
);
IndicesAliasesClusterStateUpdateRequest removeAliasRequest = new IndicesAliasesClusterStateUpdateRequest(
List.of(new AliasAction.Remove(index, "test", true))
);

ClusterState after = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(
before,
service.getExecutor(),
List.of(
new MetadataIndexAliasesService.ApplyAliasesTask(addAliasRequest, null),
// Repeat the same change to ensure that the clte version won't increase
new MetadataIndexAliasesService.ApplyAliasesTask(addAliasRequest, null),
new MetadataIndexAliasesService.ApplyAliasesTask(removeAliasRequest, null),
new MetadataIndexAliasesService.ApplyAliasesTask(addAliasRequest, null)
)
);

IndexAbstraction alias = after.metadata().getIndicesLookup().get("test");
assertNotNull(alias);
assertThat(alias.getType(), equalTo(IndexAbstraction.Type.ALIAS));
assertThat(alias.getIndices(), contains(after.metadata().index(index).getIndex()));
assertAliasesVersionIncreased(new String[] { index }, before, after, 3);
assertThat(after.metadata().aliasedIndices("test"), contains(after.metadata().index(index).getIndex()));
}

public void testEmptyTaskListProducesSameClusterState() throws Exception {
String index = randomAlphaOfLength(5);
ClusterState before = createIndex(ClusterState.builder(ClusterName.DEFAULT).build(), index);
ClusterState after = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(before, service.getExecutor(), List.of());
assertSame(before, after);
}

private ClusterState applyHiddenAliasMix(ClusterState before, Boolean isHidden1, Boolean isHidden2) {
return service.applyAliasActions(
before,
Expand Down Expand Up @@ -711,11 +748,19 @@ private void assertAliasesVersionIncreased(final String index, final ClusterStat
}

private void assertAliasesVersionIncreased(final String[] indices, final ClusterState before, final ClusterState after) {
assertAliasesVersionIncreased(indices, before, after, 1);
}

private void assertAliasesVersionIncreased(
final String[] indices,
final ClusterState before,
final ClusterState after,
final int diff
) {
for (final var index : indices) {
final long expected = 1 + before.metadata().index(index).getAliasesVersion();
final long expected = diff + before.metadata().index(index).getAliasesVersion();
final long actual = after.metadata().index(index).getAliasesVersion();
assertThat("index metadata aliases version mismatch", actual, equalTo(expected));
}
}

}

0 comments on commit e0496fa

Please sign in to comment.