From b98066869b38e2b4e706b94a7d3ca0fb96063409 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 13 Jan 2022 14:33:46 +0100 Subject: [PATCH 1/2] Fix Concurrent Index Auto Create Failing Bulk Requests Batching these requests introduced a bug where auto-create requests for system indices would fail because system indices are always auto-created and thus throw resource-already-exists if multiple equal ones are batched together even though the index doesn't yet exist in the cluster state but only in the intermediary task executor state. This leads to bulk requests ignoring the exception (thinking that the index already exists) in their auto-create callback when the index doesn't yet exist. Fixed by deduplicating these requests for now, added a TODO to do it a little nicer down the road but this fix is somewhat urgent as it breaks ML integ tests. --- .../indices/create/CreateSystemIndicesIT.java | 32 ++ .../indices/create/AutoCreateAction.java | 288 ++++++++++-------- 2 files changed, 187 insertions(+), 133 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/CreateSystemIndicesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/CreateSystemIndicesIT.java index b200e438adb7e..1331c2e50c1e7 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/CreateSystemIndicesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/CreateSystemIndicesIT.java @@ -8,6 +8,7 @@ package org.elasticsearch.action.admin.indices.create; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse; @@ -17,7 +18,11 @@ import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import
org.elasticsearch.action.admin.indices.template.delete.DeleteComposableIndexTemplateAction; import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.MappingMetadata; @@ -40,6 +45,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.elasticsearch.indices.TestSystemIndexDescriptor.INDEX_NAME; @@ -201,6 +208,31 @@ private void doCreateTest(Runnable runnable, String concreteIndex) { assertAliases(concreteIndex); } + public void testConcurrentAutoCreates() throws InterruptedException { + internalCluster().startNodes(3); + + final Client client = client(); + final int count = randomIntBetween(5, 30); + final CountDownLatch latch = new CountDownLatch(count); + final ActionListener<BulkResponse> listener = new ActionListener<>() { + @Override + public void onResponse(BulkResponse o) { + latch.countDown(); + assertFalse(o.hasFailures()); + } + + @Override + public void onFailure(Exception e) { + latch.countDown(); + throw new AssertionError(e); + } + }; + for (int i = 0; i < count; i++) { + client.bulk(new BulkRequest().add(new IndexRequest(INDEX_NAME).source(Map.of("foo", "bar"))), listener); + } + assertTrue(latch.await(30L, TimeUnit.SECONDS)); + } + /** * Make sure that aliases are created hidden */ diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java
b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java index 8b9afa55de375..837163c5775d2 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java @@ -44,6 +44,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; @@ -72,7 +74,7 @@ public static final class TransportAction extends TransportMasterNodeAction executor; + private final ClusterStateTaskExecutor executor; @Inject public TransportAction( @@ -104,17 +106,28 @@ public TransportAction( this.metadataCreateDataStreamService = metadataCreateDataStreamService; this.autoCreateIndex = autoCreateIndex; executor = (currentState, tasks) -> { - ClusterTasksResult.Builder builder = ClusterTasksResult.builder(); + ClusterTasksResult.Builder builder = ClusterTasksResult.builder(); ClusterState state = currentState; - for (AckedClusterStateUpdateTask task : tasks) { + final Map successfulRequests = new HashMap<>(tasks.size()); + for (CreateIndexTask task : tasks) { try { - state = task.execute(state); + final CreateIndexTask successfulBefore = successfulRequests.putIfAbsent(task.request, task); + if (successfulBefore == null) { + state = task.execute(state); + } else { + // TODO: clean this up to just deduplicate the task listener instead of setting the generated name from + // duplicate tasks here and then waiting for shards to become available multiple times in parallel for + // each duplicate task + task.indexNameRef.set(successfulBefore.indexNameRef.get()); + } builder.success(task); } catch (Exception e) { builder.failure(task, e); } } - state = allocationService.reroute(state, "auto-create"); + if (state != currentState) { + state = 
allocationService.reroute(state, "auto-create"); + } return builder.build(state); }; } @@ -135,157 +148,166 @@ protected void masterOperation( new String[] { indexName }, ActiveShardCount.DEFAULT, request.timeout(), - shardsAcked -> { finalListener.onResponse(new CreateIndexResponse(true, shardsAcked, indexName)); }, + shardsAcked -> finalListener.onResponse(new CreateIndexResponse(true, shardsAcked, indexName)), finalListener::onFailure ); } else { finalListener.onResponse(new CreateIndexResponse(false, false, indexName)); } }, finalListener::onFailure); - // TODO: move this to custom class instead of AckedClusterStateUpdateTask - AckedClusterStateUpdateTask clusterTask = new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { + CreateIndexTask clusterTask = new CreateIndexTask(request, listener, indexNameRef); + clusterService.submitStateUpdateTask("auto create [" + request.index() + "]", clusterTask, clusterTask, executor, clusterTask); + } - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - final SystemDataStreamDescriptor dataStreamDescriptor = systemIndices.validateDataStreamAccess( - request.index(), - threadPool.getThreadContext() - ); - final boolean isSystemDataStream = dataStreamDescriptor != null; - final boolean isSystemIndex = isSystemDataStream == false && systemIndices.isSystemIndex(request.index()); - final ComposableIndexTemplate template = resolveTemplate(request, currentState.metadata()); - final boolean isDataStream = isSystemIndex == false - && (isSystemDataStream || (template != null && template.getDataStreamTemplate() != null)); - - if (isDataStream) { - // This expression only evaluates to true when the argument is non-null and false - if (isSystemDataStream == false && Boolean.FALSE.equals(template.getAllowAutoCreate())) { - throw new IndexNotFoundException( - "composable template " + template.indexPatterns() + " forbids index auto creation" - ); - } + @Override + protected 
ClusterBlockException checkBlock(CreateIndexRequest request, ClusterState state) { + return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.index()); + } - CreateDataStreamClusterStateUpdateRequest createRequest = new CreateDataStreamClusterStateUpdateRequest( - request.index(), - dataStreamDescriptor, - request.masterNodeTimeout(), - request.timeout(), - false + private final class CreateIndexTask extends AckedClusterStateUpdateTask { + + final CreateIndexRequest request; + final AtomicReference indexNameRef; + + CreateIndexTask( + CreateIndexRequest request, + ActionListener listener, + AtomicReference indexNameRef + ) { + super(Priority.URGENT, request, listener); + this.request = request; + this.indexNameRef = indexNameRef; + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + final SystemDataStreamDescriptor dataStreamDescriptor = systemIndices.validateDataStreamAccess( + request.index(), + threadPool.getThreadContext() + ); + final boolean isSystemDataStream = dataStreamDescriptor != null; + final boolean isSystemIndex = isSystemDataStream == false && systemIndices.isSystemIndex(request.index()); + final ComposableIndexTemplate template = resolveTemplate(request, currentState.metadata()); + final boolean isDataStream = isSystemIndex == false + && (isSystemDataStream || (template != null && template.getDataStreamTemplate() != null)); + + if (isDataStream) { + // This expression only evaluates to true when the argument is non-null and false + if (isSystemDataStream == false && Boolean.FALSE.equals(template.getAllowAutoCreate())) { + throw new IndexNotFoundException( + "composable template " + template.indexPatterns() + " forbids index auto creation" ); - ClusterState clusterState = metadataCreateDataStreamService.createDataStream(createRequest, currentState); - indexNameRef.set(clusterState.metadata().dataStreams().get(request.index()).getIndices().get(0).getName()); - return clusterState; 
- } else { - String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index()); - indexNameRef.set(indexName); - if (isSystemIndex) { - if (indexName.equals(request.index()) == false) { - throw new IllegalStateException("system indices do not support date math expressions"); - } - } else { - // This will throw an exception if the index does not exist and creating it is prohibited - final boolean shouldAutoCreate = autoCreateIndex.shouldAutoCreate(indexName, currentState); + } - if (shouldAutoCreate == false) { - // The index already exists. - return currentState; - } + CreateDataStreamClusterStateUpdateRequest createRequest = new CreateDataStreamClusterStateUpdateRequest( + request.index(), + dataStreamDescriptor, + request.masterNodeTimeout(), + request.timeout(), + false + ); + ClusterState clusterState = metadataCreateDataStreamService.createDataStream(createRequest, currentState); + indexNameRef.set(clusterState.metadata().dataStreams().get(request.index()).getIndices().get(0).getName()); + return clusterState; + } else { + String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index()); + indexNameRef.set(indexName); + if (isSystemIndex) { + if (indexName.equals(request.index()) == false) { + throw new IllegalStateException("system indices do not support date math expressions"); } + } else { + // This will throw an exception if the index does not exist and creating it is prohibited + final boolean shouldAutoCreate = autoCreateIndex.shouldAutoCreate(indexName, currentState); - final SystemIndexDescriptor mainDescriptor = isSystemIndex ? 
systemIndices.findMatchingDescriptor(indexName) : null; - final boolean isManagedSystemIndex = mainDescriptor != null && mainDescriptor.isAutomaticallyManaged(); - - final CreateIndexClusterStateUpdateRequest updateRequest; - - if (isManagedSystemIndex) { - final SystemIndexDescriptor descriptor = mainDescriptor.getDescriptorCompatibleWith( - state.nodes().getSmallestNonClientNodeVersion() - ); - if (descriptor == null) { - final String message = mainDescriptor.getMinimumNodeVersionMessage("auto-create index"); - logger.warn(message); - throw new IllegalStateException(message); - } - - updateRequest = buildSystemIndexUpdateRequest(indexName, descriptor); - } else if (isSystemIndex) { - updateRequest = buildUpdateRequest(indexName); - - if (Objects.isNull(request.settings())) { - updateRequest.settings(SystemIndexDescriptor.DEFAULT_SETTINGS); - } else if (false == request.settings().hasValue(SETTING_INDEX_HIDDEN)) { - updateRequest.settings(Settings.builder().put(request.settings()).put(SETTING_INDEX_HIDDEN, true).build()); - } else if ("false".equals(request.settings().get(SETTING_INDEX_HIDDEN))) { - final String message = "Cannot auto-create system index [" - + indexName - + "] with [index.hidden] set to 'false'"; - logger.warn(message); - throw new IllegalStateException(message); - } - } else { - updateRequest = buildUpdateRequest(indexName); + if (shouldAutoCreate == false) { + // The index already exists. 
+ return currentState; } - - return createIndexService.applyCreateIndexRequest(currentState, updateRequest, false); } - } - private CreateIndexClusterStateUpdateRequest buildUpdateRequest(String indexName) { - CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest( - request.cause(), - indexName, - request.index() - ).ackTimeout(request.timeout()).performReroute(false).masterNodeTimeout(request.masterNodeTimeout()); - logger.debug("Auto-creating index {}", indexName); - return updateRequest; - } + final SystemIndexDescriptor mainDescriptor = isSystemIndex ? systemIndices.findMatchingDescriptor(indexName) : null; + final boolean isManagedSystemIndex = mainDescriptor != null && mainDescriptor.isAutomaticallyManaged(); - private CreateIndexClusterStateUpdateRequest buildSystemIndexUpdateRequest( - String indexName, - SystemIndexDescriptor descriptor - ) { - String mappings = descriptor.getMappings(); - Settings settings = descriptor.getSettings(); - String aliasName = descriptor.getAliasName(); + final CreateIndexClusterStateUpdateRequest updateRequest; - // if we are writing to the alias name, we should create the primary index here - String concreteIndexName = indexName.equals(aliasName) ? 
descriptor.getPrimaryIndex() : indexName; - - CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest( - request.cause(), - concreteIndexName, - request.index() - ).ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()).performReroute(false); - - updateRequest.waitForActiveShards(ActiveShardCount.ALL); - - if (mappings != null) { - updateRequest.mappings(mappings); - } - if (settings != null) { - updateRequest.settings(settings); - } - if (aliasName != null) { - updateRequest.aliases(Set.of(new Alias(aliasName).isHidden(true))); - } + if (isManagedSystemIndex) { + final SystemIndexDescriptor descriptor = mainDescriptor.getDescriptorCompatibleWith( + currentState.nodes().getSmallestNonClientNodeVersion() + ); + if (descriptor == null) { + final String message = mainDescriptor.getMinimumNodeVersionMessage("auto-create index"); + logger.warn(message); + throw new IllegalStateException(message); + } - if (logger.isDebugEnabled()) { - if (concreteIndexName.equals(indexName) == false) { - logger.debug("Auto-creating backing system index {} for alias {}", concreteIndexName, indexName); - } else { - logger.debug("Auto-creating system index {}", concreteIndexName); + updateRequest = buildSystemIndexUpdateRequest(indexName, descriptor); + } else if (isSystemIndex) { + updateRequest = buildUpdateRequest(indexName); + + if (Objects.isNull(request.settings())) { + updateRequest.settings(SystemIndexDescriptor.DEFAULT_SETTINGS); + } else if (false == request.settings().hasValue(SETTING_INDEX_HIDDEN)) { + updateRequest.settings(Settings.builder().put(request.settings()).put(SETTING_INDEX_HIDDEN, true).build()); + } else if ("false".equals(request.settings().get(SETTING_INDEX_HIDDEN))) { + final String message = "Cannot auto-create system index [" + indexName + "] with [index.hidden] set to 'false'"; + logger.warn(message); + throw new IllegalStateException(message); } + } else { + updateRequest = 
buildUpdateRequest(indexName); } - return updateRequest; + return createIndexService.applyCreateIndexRequest(currentState, updateRequest, false); + } + } + + private CreateIndexClusterStateUpdateRequest buildUpdateRequest(String indexName) { + CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest( + request.cause(), + indexName, + request.index() + ).ackTimeout(request.timeout()).performReroute(false).masterNodeTimeout(request.masterNodeTimeout()); + logger.debug("Auto-creating index {}", indexName); + return updateRequest; + } + + private CreateIndexClusterStateUpdateRequest buildSystemIndexUpdateRequest(String indexName, SystemIndexDescriptor descriptor) { + String mappings = descriptor.getMappings(); + Settings settings = descriptor.getSettings(); + String aliasName = descriptor.getAliasName(); + + // if we are writing to the alias name, we should create the primary index here + String concreteIndexName = indexName.equals(aliasName) ? descriptor.getPrimaryIndex() : indexName; + + CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest( + request.cause(), + concreteIndexName, + request.index() + ).ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()).performReroute(false); + + updateRequest.waitForActiveShards(ActiveShardCount.ALL); + + if (mappings != null) { + updateRequest.mappings(mappings); + } + if (settings != null) { + updateRequest.settings(settings); + } + if (aliasName != null) { + updateRequest.aliases(Set.of(new Alias(aliasName).isHidden(true))); } - }; - clusterService.submitStateUpdateTask("auto create [" + request.index() + "]", clusterTask, clusterTask, executor, clusterTask); - } - @Override - protected ClusterBlockException checkBlock(CreateIndexRequest request, ClusterState state) { - return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.index()); + if (logger.isDebugEnabled()) { + if 
(concreteIndexName.equals(indexName) == false) { + logger.debug("Auto-creating backing system index {} for alias {}", concreteIndexName, indexName); + } else { + logger.debug("Auto-creating system index {}", concreteIndexName); + } + } + + return updateRequest; + } } } From f289236997ae61131d9b33f8b96b45ec1f78f45f Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 14 Jan 2022 13:46:39 +0100 Subject: [PATCH 2/2] add todo --- .../action/admin/indices/create/AutoCreateAction.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java index 837163c5775d2..c45e7cc2f9fdb 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java @@ -164,6 +164,8 @@ protected ClusterBlockException checkBlock(CreateIndexRequest request, ClusterSt return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.index()); } + // TODO: split the listener out of this task and use AckedClusterStateTaskListener directly to avoid the complicated listener + // construction upstream when instantiating these private final class CreateIndexTask extends AckedClusterStateUpdateTask { final CreateIndexRequest request;