Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch Index Settings Update Requests #82896

Merged
merged 13 commits into from
Jan 25, 2022
6 changes: 6 additions & 0 deletions docs/changelog/82896.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 82896
summary: Batch Index Settings Update Requests
area: Cluster Coordination
type: enhancement
issues:
- 79866
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.RoutingTable;
Expand Down Expand Up @@ -50,13 +51,12 @@ public class MetadataUpdateSettingsService {
private static final Logger logger = LogManager.getLogger(MetadataUpdateSettingsService.class);

private final ClusterService clusterService;

private final AllocationService allocationService;

private final IndexScopedSettings indexScopedSettings;
private final IndicesService indicesService;
private final ShardLimitValidator shardLimitValidator;
private final ThreadPool threadPool;
private final ClusterStateTaskExecutor<AckedClusterStateUpdateTask> executor;

public MetadataUpdateSettingsService(
ClusterService clusterService,
Expand All @@ -67,11 +67,28 @@ public MetadataUpdateSettingsService(
ThreadPool threadPool
) {
this.clusterService = clusterService;
this.threadPool = threadPool;
this.allocationService = allocationService;
this.indexScopedSettings = indexScopedSettings;
this.indicesService = indicesService;
this.shardLimitValidator = shardLimitValidator;
this.threadPool = threadPool;
this.executor = (currentState, tasks) -> {
ClusterTasksResult.Builder<AckedClusterStateUpdateTask> builder = ClusterTasksResult.builder();
ClusterState state = currentState;
for (AckedClusterStateUpdateTask task : tasks) {
try {
state = task.execute(state);
builder.success(task);
} catch (Exception e) {
builder.failure(task, e);
}
}
if (state != currentState) {
// reroute in case things change that require it (like number of replicas)
state = allocationService.reroute(state, "settings update");
}
return builder.build(state);
};
}

public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request, final ActionListener<AcknowledgedResponse> listener) {
Expand Down Expand Up @@ -105,149 +122,149 @@ public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request
final Settings openSettings = settingsForOpenIndices.build();
final boolean preserveExisting = request.isPreserveExisting();

clusterService.submitStateUpdateTask(
"update-settings " + Arrays.toString(request.indices()),
new AckedClusterStateUpdateTask(Priority.URGENT, request, wrapPreservingContext(listener, threadPool.getThreadContext())) {
// TODO: move this to custom class instead of AckedClusterStateUpdateTask
AckedClusterStateUpdateTask clusterTask = new AckedClusterStateUpdateTask(
Priority.URGENT,
request,
wrapPreservingContext(listener, threadPool.getThreadContext())
) {
@Override
public ClusterState execute(ClusterState currentState) {
RoutingTable.Builder routingTableBuilder = null;
Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata());

@Override
public ClusterState execute(ClusterState currentState) {

RoutingTable.Builder routingTableBuilder = null;
Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata());

// allow to change any settings to a close index, and only allow dynamic settings to be changed
// on an open index
Set<Index> openIndices = new HashSet<>();
Set<Index> closeIndices = new HashSet<>();
final String[] actualIndices = new String[request.indices().length];
for (int i = 0; i < request.indices().length; i++) {
Index index = request.indices()[i];
actualIndices[i] = index.getName();
final IndexMetadata metadata = currentState.metadata().getIndexSafe(index);
if (metadata.getState() == IndexMetadata.State.OPEN) {
openIndices.add(index);
} else {
closeIndices.add(index);
}
// allow to change any settings to a closed index, and only allow dynamic settings to be changed
// on an open index
Set<Index> openIndices = new HashSet<>();
Set<Index> closedIndices = new HashSet<>();
final String[] actualIndices = new String[request.indices().length];
for (int i = 0; i < request.indices().length; i++) {
Index index = request.indices()[i];
actualIndices[i] = index.getName();
final IndexMetadata metadata = currentState.metadata().getIndexSafe(index);
if (metadata.getState() == IndexMetadata.State.OPEN) {
openIndices.add(index);
} else {
closedIndices.add(index);
}
}

if (skippedSettings.isEmpty() == false && openIndices.isEmpty() == false) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"Can't update non dynamic settings [%s] for open indices %s",
skippedSettings,
openIndices
)
);
}
if (skippedSettings.isEmpty() == false && openIndices.isEmpty() == false) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"Can't update non dynamic settings [%s] for open indices %s",
skippedSettings,
openIndices
)
);
}

if (IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.exists(openSettings)) {
final int updatedNumberOfReplicas = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(openSettings);
if (preserveExisting == false) {
// Verify that this won't take us over the cluster shard limit.
shardLimitValidator.validateShardLimitOnReplicaUpdate(currentState, request.indices(), updatedNumberOfReplicas);
if (IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.exists(openSettings)) {
final int updatedNumberOfReplicas = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(openSettings);
if (preserveExisting == false) {
// Verify that this won't take us over the cluster shard limit.
shardLimitValidator.validateShardLimitOnReplicaUpdate(currentState, request.indices(), updatedNumberOfReplicas);

/*
* We do not update the in-sync allocation IDs as they will be removed upon the first index operation
* which makes these copies stale.
*
* TODO: should we update the in-sync allocation IDs once the data is deleted by the node?
*/
routingTableBuilder = RoutingTable.builder(currentState.routingTable());
routingTableBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices);
metadataBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices);
logger.info("updating number_of_replicas to [{}] for indices {}", updatedNumberOfReplicas, actualIndices);
}
/*
* We do not update the in-sync allocation IDs as they will be removed upon the first index operation
* which makes these copies stale.
*
* TODO: should we update the in-sync allocation IDs once the data is deleted by the node?
*/
routingTableBuilder = RoutingTable.builder(currentState.routingTable());
routingTableBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices);
metadataBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices);
logger.info("updating number_of_replicas to [{}] for indices {}", updatedNumberOfReplicas, actualIndices);
}
}

updateIndexSettings(
openIndices,
metadataBuilder,
(index, indexSettings) -> indexScopedSettings.updateDynamicSettings(
openSettings,
indexSettings,
Settings.builder(),
index.getName()
),
preserveExisting,
indexScopedSettings
);
updateIndexSettings(
openIndices,
metadataBuilder,
(index, indexSettings) -> indexScopedSettings.updateDynamicSettings(
openSettings,
indexSettings,
Settings.builder(),
index.getName()
),
preserveExisting,
indexScopedSettings
);

updateIndexSettings(
closeIndices,
metadataBuilder,
(index, indexSettings) -> indexScopedSettings.updateSettings(
closedSettings,
indexSettings,
Settings.builder(),
index.getName()
),
preserveExisting,
indexScopedSettings
);
updateIndexSettings(
closedIndices,
metadataBuilder,
(index, indexSettings) -> indexScopedSettings.updateSettings(
closedSettings,
indexSettings,
Settings.builder(),
index.getName()
),
preserveExisting,
indexScopedSettings
);

if (IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.exists(normalizedSettings)
|| IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.exists(normalizedSettings)) {
for (String index : actualIndices) {
final Settings settings = metadataBuilder.get(index).getSettings();
MetadataCreateIndexService.validateTranslogRetentionSettings(settings);
MetadataCreateIndexService.validateStoreTypeSetting(settings);
}
if (IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.exists(normalizedSettings)
|| IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.exists(normalizedSettings)) {
for (String index : actualIndices) {
final Settings settings = metadataBuilder.get(index).getSettings();
MetadataCreateIndexService.validateTranslogRetentionSettings(settings);
MetadataCreateIndexService.validateStoreTypeSetting(settings);
}
boolean changed = false;
// increment settings versions
for (final String index : actualIndices) {
if (same(currentState.metadata().index(index).getSettings(), metadataBuilder.get(index).getSettings()) == false) {
changed = true;
final IndexMetadata.Builder builder = IndexMetadata.builder(metadataBuilder.get(index));
builder.settingsVersion(1 + builder.settingsVersion());
metadataBuilder.put(builder);
}
}
boolean changed = false;
// increment settings versions
for (final String index : actualIndices) {
if (same(currentState.metadata().index(index).getSettings(), metadataBuilder.get(index).getSettings()) == false) {
changed = true;
final IndexMetadata.Builder builder = IndexMetadata.builder(metadataBuilder.get(index));
builder.settingsVersion(1 + builder.settingsVersion());
metadataBuilder.put(builder);
}
}

final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
boolean changedBlocks = false;
for (IndexMetadata.APIBlock block : IndexMetadata.APIBlock.values()) {
changedBlocks |= maybeUpdateClusterBlock(actualIndices, blocks, block.block, block.setting, openSettings);
}
changed |= changedBlocks;
final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
boolean changedBlocks = false;
for (IndexMetadata.APIBlock block : IndexMetadata.APIBlock.values()) {
changedBlocks |= maybeUpdateClusterBlock(actualIndices, blocks, block.block, block.setting, openSettings);
}
changed |= changedBlocks;

if (changed == false) {
return currentState;
}
if (changed == false) {
return currentState;
}

ClusterState updatedState = ClusterState.builder(currentState)
.metadata(metadataBuilder)
.routingTable(routingTableBuilder == null ? currentState.routingTable() : routingTableBuilder.build())
.blocks(changedBlocks ? blocks.build() : currentState.blocks())
.build();
ClusterState updatedState = ClusterState.builder(currentState)
.metadata(metadataBuilder)
.routingTable(routingTableBuilder == null ? currentState.routingTable() : routingTableBuilder.build())
.blocks(changedBlocks ? blocks.build() : currentState.blocks())
.build();

// now, reroute in case things change that require it (like number of replicas)
updatedState = allocationService.reroute(updatedState, "settings update");
try {
for (Index index : openIndices) {
final IndexMetadata currentMetadata = currentState.getMetadata().getIndexSafe(index);
final IndexMetadata updatedMetadata = updatedState.metadata().getIndexSafe(index);
indicesService.verifyIndexMetadata(currentMetadata, updatedMetadata);
}
for (Index index : closeIndices) {
final IndexMetadata currentMetadata = currentState.getMetadata().getIndexSafe(index);
final IndexMetadata updatedMetadata = updatedState.metadata().getIndexSafe(index);
// Verifies that the current index settings can be updated with the updated dynamic settings.
indicesService.verifyIndexMetadata(currentMetadata, updatedMetadata);
// Now check that we can create the index with the updated settings (dynamic and non-dynamic).
// This step is mandatory since we allow to update non-dynamic settings on closed indices.
indicesService.verifyIndexMetadata(updatedMetadata, updatedMetadata);
}
} catch (IOException ex) {
throw ExceptionsHelper.convertToElastic(ex);
try {
for (Index index : openIndices) {
final IndexMetadata currentMetadata = currentState.metadata().getIndexSafe(index);
final IndexMetadata updatedMetadata = updatedState.metadata().getIndexSafe(index);
indicesService.verifyIndexMetadata(currentMetadata, updatedMetadata);
}
return updatedState;
for (Index index : closedIndices) {
final IndexMetadata currentMetadata = currentState.metadata().getIndexSafe(index);
final IndexMetadata updatedMetadata = updatedState.metadata().getIndexSafe(index);
// Verifies that the current index settings can be updated with the updated dynamic settings.
indicesService.verifyIndexMetadata(currentMetadata, updatedMetadata);
// Now check that we can create the index with the updated settings (dynamic and non-dynamic).
// This step is mandatory since we allow to update non-dynamic settings on closed indices.
indicesService.verifyIndexMetadata(updatedMetadata, updatedMetadata);
}
} catch (IOException ex) {
throw ExceptionsHelper.convertToElastic(ex);
}
},
ClusterStateTaskExecutor.unbatched()
);

return updatedState;
}
};

clusterService.submitStateUpdateTask("update-settings " + Arrays.toString(request.indices()), clusterTask, this.executor);
}

public static void updateIndexSettings(
Expand All @@ -256,7 +273,6 @@ public static void updateIndexSettings(
BiFunction<Index, Settings.Builder, Boolean> settingUpdater,
Boolean preserveExisting,
IndexScopedSettings indexScopedSettings

) {
for (Index index : indices) {
IndexMetadata indexMetadata = metadataBuilder.getSafe(index);
Expand Down
Loading