diff --git a/docs/changelog/82896.yaml b/docs/changelog/82896.yaml new file mode 100644 index 0000000000000..8267ce5a55594 --- /dev/null +++ b/docs/changelog/82896.yaml @@ -0,0 +1,6 @@ +pr: 82896 +summary: Batch Index Settings Update Requests +area: Cluster Coordination +type: enhancement +issues: + - 79866 diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java index ce6843c71bc89..ae06e6f6f9636 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java @@ -17,6 +17,7 @@ import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.routing.RoutingTable; @@ -50,13 +51,12 @@ public class MetadataUpdateSettingsService { private static final Logger logger = LogManager.getLogger(MetadataUpdateSettingsService.class); private final ClusterService clusterService; - private final AllocationService allocationService; - private final IndexScopedSettings indexScopedSettings; private final IndicesService indicesService; private final ShardLimitValidator shardLimitValidator; private final ThreadPool threadPool; + private final ClusterStateTaskExecutor executor; public MetadataUpdateSettingsService( ClusterService clusterService, @@ -67,11 +67,28 @@ public MetadataUpdateSettingsService( ThreadPool threadPool ) { this.clusterService = clusterService; - this.threadPool = threadPool; this.allocationService = allocationService; this.indexScopedSettings = indexScopedSettings; this.indicesService = indicesService; this.shardLimitValidator = shardLimitValidator; + this.threadPool = threadPool; + this.executor = (currentState, tasks) -> { + ClusterTasksResult.Builder builder = ClusterTasksResult.builder(); + ClusterState state = currentState; + for (AckedClusterStateUpdateTask task : tasks) { + try { + state = task.execute(state); + builder.success(task); + } catch (Exception e) { + builder.failure(task, e); + } + } + if (state != currentState) { + // reroute in case things change that require it (like number of replicas) + state = allocationService.reroute(state, "settings update"); + } + return builder.build(state); + }; } public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request, final ActionListener listener) { @@ -105,149 +122,149 @@ public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request final Settings openSettings = settingsForOpenIndices.build(); final boolean preserveExisting = request.isPreserveExisting(); - clusterService.submitStateUpdateTask( - "update-settings " + Arrays.toString(request.indices()), - new AckedClusterStateUpdateTask(Priority.URGENT, request, wrapPreservingContext(listener, threadPool.getThreadContext())) { + // TODO: move this to custom class instead of AckedClusterStateUpdateTask + AckedClusterStateUpdateTask clusterTask = new AckedClusterStateUpdateTask( + Priority.URGENT, + request, + wrapPreservingContext(listener, threadPool.getThreadContext()) + ) { + @Override + public ClusterState execute(ClusterState currentState) { + RoutingTable.Builder routingTableBuilder = null; + Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); - @Override - public ClusterState execute(ClusterState currentState) { - - RoutingTable.Builder routingTableBuilder = null; - Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); - - // allow to change any settings to a close index, and only allow dynamic settings to be changed - // on an open index - Set openIndices = new HashSet<>(); - Set closeIndices = new HashSet<>(); - final String[] actualIndices = new String[request.indices().length]; - for (int i = 0; i < request.indices().length; i++) { - Index index = request.indices()[i]; - actualIndices[i] = index.getName(); - final IndexMetadata metadata = currentState.metadata().getIndexSafe(index); - if (metadata.getState() == IndexMetadata.State.OPEN) { - openIndices.add(index); - } else { - closeIndices.add(index); - } + // allow to change any settings to a closed index, and only allow dynamic settings to be changed + // on an open index + Set openIndices = new HashSet<>(); + Set closedIndices = new HashSet<>(); + final String[] actualIndices = new String[request.indices().length]; + for (int i = 0; i < request.indices().length; i++) { + Index index = request.indices()[i]; + actualIndices[i] = index.getName(); + final IndexMetadata metadata = currentState.metadata().getIndexSafe(index); + if (metadata.getState() == IndexMetadata.State.OPEN) { + openIndices.add(index); + } else { + closedIndices.add(index); } + } - if (skippedSettings.isEmpty() == false && openIndices.isEmpty() == false) { - throw new IllegalArgumentException( - String.format( - Locale.ROOT, - "Can't update non dynamic settings [%s] for open indices %s", - skippedSettings, - openIndices - ) - ); - } + if (skippedSettings.isEmpty() == false && openIndices.isEmpty() == false) { + throw new IllegalArgumentException( + String.format( + Locale.ROOT, + "Can't update non dynamic settings [%s] for open indices %s", + skippedSettings, + openIndices + ) + ); + } - if (IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.exists(openSettings)) { - final int updatedNumberOfReplicas = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(openSettings); - if (preserveExisting == false) { - // Verify that this won't take us over the cluster shard limit. - shardLimitValidator.validateShardLimitOnReplicaUpdate(currentState, request.indices(), updatedNumberOfReplicas); + if (IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.exists(openSettings)) { + final int updatedNumberOfReplicas = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(openSettings); + if (preserveExisting == false) { + // Verify that this won't take us over the cluster shard limit. + shardLimitValidator.validateShardLimitOnReplicaUpdate(currentState, request.indices(), updatedNumberOfReplicas); - /* - * We do not update the in-sync allocation IDs as they will be removed upon the first index operation - * which makes these copies stale. - * - * TODO: should we update the in-sync allocation IDs once the data is deleted by the node? - */ - routingTableBuilder = RoutingTable.builder(currentState.routingTable()); - routingTableBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices); - metadataBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices); - logger.info("updating number_of_replicas to [{}] for indices {}", updatedNumberOfReplicas, actualIndices); - } + /* + * We do not update the in-sync allocation IDs as they will be removed upon the first index operation + * which makes these copies stale. + * + * TODO: should we update the in-sync allocation IDs once the data is deleted by the node? + */ + routingTableBuilder = RoutingTable.builder(currentState.routingTable()); + routingTableBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices); + metadataBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices); + logger.info("updating number_of_replicas to [{}] for indices {}", updatedNumberOfReplicas, actualIndices); } + } - updateIndexSettings( - openIndices, - metadataBuilder, - (index, indexSettings) -> indexScopedSettings.updateDynamicSettings( - openSettings, - indexSettings, - Settings.builder(), - index.getName() - ), - preserveExisting, - indexScopedSettings - ); + updateIndexSettings( + openIndices, + metadataBuilder, + (index, indexSettings) -> indexScopedSettings.updateDynamicSettings( + openSettings, + indexSettings, + Settings.builder(), + index.getName() + ), + preserveExisting, + indexScopedSettings + ); - updateIndexSettings( - closeIndices, - metadataBuilder, - (index, indexSettings) -> indexScopedSettings.updateSettings( - closedSettings, - indexSettings, - Settings.builder(), - index.getName() - ), - preserveExisting, - indexScopedSettings - ); + updateIndexSettings( + closedIndices, + metadataBuilder, + (index, indexSettings) -> indexScopedSettings.updateSettings( + closedSettings, + indexSettings, + Settings.builder(), + index.getName() + ), + preserveExisting, + indexScopedSettings + ); - if (IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.exists(normalizedSettings) - || IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.exists(normalizedSettings)) { - for (String index : actualIndices) { - final Settings settings = metadataBuilder.get(index).getSettings(); - MetadataCreateIndexService.validateTranslogRetentionSettings(settings); - MetadataCreateIndexService.validateStoreTypeSetting(settings); - } + if (IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.exists(normalizedSettings) + || IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.exists(normalizedSettings)) { + for (String index : actualIndices) { + final Settings settings = metadataBuilder.get(index).getSettings(); + MetadataCreateIndexService.validateTranslogRetentionSettings(settings); + MetadataCreateIndexService.validateStoreTypeSetting(settings); } - boolean changed = false; - // increment settings versions - for (final String index : actualIndices) { - if (same(currentState.metadata().index(index).getSettings(), metadataBuilder.get(index).getSettings()) == false) { - changed = true; - final IndexMetadata.Builder builder = IndexMetadata.builder(metadataBuilder.get(index)); - builder.settingsVersion(1 + builder.settingsVersion()); - metadataBuilder.put(builder); - } + } + boolean changed = false; + // increment settings versions + for (final String index : actualIndices) { + if (same(currentState.metadata().index(index).getSettings(), metadataBuilder.get(index).getSettings()) == false) { + changed = true; + final IndexMetadata.Builder builder = IndexMetadata.builder(metadataBuilder.get(index)); + builder.settingsVersion(1 + builder.settingsVersion()); + metadataBuilder.put(builder); } + } - final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); - boolean changedBlocks = false; - for (IndexMetadata.APIBlock block : IndexMetadata.APIBlock.values()) { - changedBlocks |= maybeUpdateClusterBlock(actualIndices, blocks, block.block, block.setting, openSettings); - } - changed |= changedBlocks; + final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + boolean changedBlocks = false; + for (IndexMetadata.APIBlock block : IndexMetadata.APIBlock.values()) { + changedBlocks |= maybeUpdateClusterBlock(actualIndices, blocks, block.block, block.setting, openSettings); + } + changed |= changedBlocks; - if (changed == false) { - return currentState; - } + if (changed == false) { + return currentState; + } - ClusterState updatedState = ClusterState.builder(currentState) - .metadata(metadataBuilder) - .routingTable(routingTableBuilder == null ? currentState.routingTable() : routingTableBuilder.build()) - .blocks(changedBlocks ? blocks.build() : currentState.blocks()) - .build(); + ClusterState updatedState = ClusterState.builder(currentState) + .metadata(metadataBuilder) + .routingTable(routingTableBuilder == null ? currentState.routingTable() : routingTableBuilder.build()) + .blocks(changedBlocks ? blocks.build() : currentState.blocks()) + .build(); - // now, reroute in case things change that require it (like number of replicas) - updatedState = allocationService.reroute(updatedState, "settings update"); - try { - for (Index index : openIndices) { - final IndexMetadata currentMetadata = currentState.getMetadata().getIndexSafe(index); - final IndexMetadata updatedMetadata = updatedState.metadata().getIndexSafe(index); - indicesService.verifyIndexMetadata(currentMetadata, updatedMetadata); - } - for (Index index : closeIndices) { - final IndexMetadata currentMetadata = currentState.getMetadata().getIndexSafe(index); - final IndexMetadata updatedMetadata = updatedState.metadata().getIndexSafe(index); - // Verifies that the current index settings can be updated with the updated dynamic settings. - indicesService.verifyIndexMetadata(currentMetadata, updatedMetadata); - // Now check that we can create the index with the updated settings (dynamic and non-dynamic). - // This step is mandatory since we allow to update non-dynamic settings on closed indices. - indicesService.verifyIndexMetadata(updatedMetadata, updatedMetadata); - } - } catch (IOException ex) { - throw ExceptionsHelper.convertToElastic(ex); + try { + for (Index index : openIndices) { + final IndexMetadata currentMetadata = currentState.metadata().getIndexSafe(index); + final IndexMetadata updatedMetadata = updatedState.metadata().getIndexSafe(index); + indicesService.verifyIndexMetadata(currentMetadata, updatedMetadata); } - return updatedState; + for (Index index : closedIndices) { + final IndexMetadata currentMetadata = currentState.metadata().getIndexSafe(index); + final IndexMetadata updatedMetadata = updatedState.metadata().getIndexSafe(index); + // Verifies that the current index settings can be updated with the updated dynamic settings. + indicesService.verifyIndexMetadata(currentMetadata, updatedMetadata); + // Now check that we can create the index with the updated settings (dynamic and non-dynamic). + // This step is mandatory since we allow to update non-dynamic settings on closed indices. + indicesService.verifyIndexMetadata(updatedMetadata, updatedMetadata); + } + } catch (IOException ex) { + throw ExceptionsHelper.convertToElastic(ex); } - }, - ClusterStateTaskExecutor.unbatched() - ); + + return updatedState; + } + }; + + clusterService.submitStateUpdateTask("update-settings " + Arrays.toString(request.indices()), clusterTask, this.executor); } public static void updateIndexSettings( @@ -256,7 +273,6 @@ public static void updateIndexSettings( BiFunction settingUpdater, Boolean preserveExisting, IndexScopedSettings indexScopedSettings - ) { for (Index index : indices) { IndexMetadata indexMetadata = metadataBuilder.getSafe(index); diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index 8a98a3dfe6762..66efd2a708e53 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -38,6 +38,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult; +import org.elasticsearch.cluster.ClusterStateTaskExecutor.TaskResult; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.EmptyClusterInfoService; import org.elasticsearch.cluster.action.shard.ShardStateAction; @@ -440,7 +441,7 @@ public ClusterState applyStartedShards(ClusterState clusterState, Map ClusterState runTasks(ClusterStateTaskExecutor executor, ClusterState clusterState, List entries) { try { ClusterTasksResult result = executor.execute(clusterState, entries); - for (ClusterStateTaskExecutor.TaskResult taskResult : result.executionResults().values()) { + for (TaskResult taskResult : result.executionResults().values()) { if (taskResult.isSuccess() == false) { throw taskResult.getFailure(); } @@ -465,16 +466,25 @@ private , Response extends ActionResp }); } + @SuppressWarnings("unchecked") private ClusterState executeClusterStateUpdateTask(ClusterState state, Runnable runnable) { - ClusterState[] result = new ClusterState[1]; + ClusterState[] resultingState = new ClusterState[1]; doAnswer(invocationOnMock -> { ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocationOnMock.getArguments()[1]; - result[0] = task.execute(state); + ClusterStateTaskExecutor executor = (ClusterStateTaskExecutor) invocationOnMock + .getArguments()[2]; + ClusterTasksResult result = executor.execute(state, List.of(task)); + for (TaskResult taskResult : result.executionResults().values()) { + if (taskResult.isSuccess() == false) { + throw taskResult.getFailure(); + } + } + resultingState[0] = result.resultingState(); return null; }).when(clusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class), any()); runnable.run(); - assertThat(result[0], notNullValue()); - return result[0]; + assertThat(resultingState[0], notNullValue()); + return resultingState[0]; } private ActionListener createTestListener() {