diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/CreateSystemIndicesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/CreateSystemIndicesIT.java index b200e438adb7e..1331c2e50c1e7 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/CreateSystemIndicesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/create/CreateSystemIndicesIT.java @@ -8,6 +8,7 @@ package org.elasticsearch.action.admin.indices.create; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse; @@ -17,7 +18,11 @@ import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.admin.indices.template.delete.DeleteComposableIndexTemplateAction; import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.MappingMetadata; @@ -40,6 +45,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.elasticsearch.indices.TestSystemIndexDescriptor.INDEX_NAME; @@ -201,6 +208,31 @@ private void doCreateTest(Runnable runnable, String concreteIndex) { assertAliases(concreteIndex); } + public void testConcurrentAutoCreates() throws InterruptedException { + internalCluster().startNodes(3); + + final Client client = client(); + final int count = randomIntBetween(5, 30); + final CountDownLatch latch = new CountDownLatch(count); + final ActionListener listener = new ActionListener<>() { + @Override + public void onResponse(BulkResponse o) { + latch.countDown(); + assertFalse(o.hasFailures()); + } + + @Override + public void onFailure(Exception e) { + latch.countDown(); + throw new AssertionError(e); + } + }; + for (int i = 0; i < count; i++) { + client.bulk(new BulkRequest().add(new IndexRequest(INDEX_NAME).source(Map.of("foo", "bar"))), listener); + } + assertTrue(latch.await(30L, TimeUnit.SECONDS)); + } + /** * Make sure that aliases are created hidden */ diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java index 8b9afa55de375..c45e7cc2f9fdb 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java @@ -44,6 +44,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; @@ -72,7 +74,7 @@ public static final class TransportAction extends TransportMasterNodeAction executor; + private final ClusterStateTaskExecutor executor; @Inject public TransportAction( @@ -104,17 +106,28 @@ public TransportAction( this.metadataCreateDataStreamService = metadataCreateDataStreamService; this.autoCreateIndex = autoCreateIndex; executor = (currentState, tasks) -> { - ClusterTasksResult.Builder builder = ClusterTasksResult.builder(); + ClusterTasksResult.Builder builder = ClusterTasksResult.builder(); ClusterState state = currentState; - for (AckedClusterStateUpdateTask task : tasks) { + final Map successfulRequests = new HashMap<>(tasks.size()); + for (CreateIndexTask task : tasks) { try { - state = task.execute(state); + final CreateIndexTask successfulBefore = successfulRequests.putIfAbsent(task.request, task); + if (successfulBefore == null) { + state = task.execute(state); + } else { + // TODO: clean this up to just deduplicate the task listener instead of setting the generated name from + // duplicate tasks here and then waiting for shards to become available multiple times in parallel for + // each duplicate task + task.indexNameRef.set(successfulBefore.indexNameRef.get()); + } builder.success(task); } catch (Exception e) { builder.failure(task, e); } } - state = allocationService.reroute(state, "auto-create"); + if (state != currentState) { + state = allocationService.reroute(state, "auto-create"); + } return builder.build(state); }; } @@ -135,157 +148,168 @@ protected void masterOperation( new String[] { indexName }, ActiveShardCount.DEFAULT, request.timeout(), - shardsAcked -> { finalListener.onResponse(new CreateIndexResponse(true, shardsAcked, indexName)); }, + shardsAcked -> finalListener.onResponse(new CreateIndexResponse(true, shardsAcked, indexName)), finalListener::onFailure ); } else { finalListener.onResponse(new CreateIndexResponse(false, false, indexName)); } }, finalListener::onFailure); - // TODO: move this to custom class instead of AckedClusterStateUpdateTask - AckedClusterStateUpdateTask clusterTask = new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { + CreateIndexTask clusterTask = new CreateIndexTask(request, listener, indexNameRef); + clusterService.submitStateUpdateTask("auto create [" + request.index() + "]", clusterTask, clusterTask, executor, clusterTask); + } - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - final SystemDataStreamDescriptor dataStreamDescriptor = systemIndices.validateDataStreamAccess( - request.index(), - threadPool.getThreadContext() - ); - final boolean isSystemDataStream = dataStreamDescriptor != null; - final boolean isSystemIndex = isSystemDataStream == false && systemIndices.isSystemIndex(request.index()); - final ComposableIndexTemplate template = resolveTemplate(request, currentState.metadata()); - final boolean isDataStream = isSystemIndex == false - && (isSystemDataStream || (template != null && template.getDataStreamTemplate() != null)); - - if (isDataStream) { - // This expression only evaluates to true when the argument is non-null and false - if (isSystemDataStream == false && Boolean.FALSE.equals(template.getAllowAutoCreate())) { - throw new IndexNotFoundException( - "composable template " + template.indexPatterns() + " forbids index auto creation" - ); - } + @Override + protected ClusterBlockException checkBlock(CreateIndexRequest request, ClusterState state) { + return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.index()); + } - CreateDataStreamClusterStateUpdateRequest createRequest = new CreateDataStreamClusterStateUpdateRequest( - request.index(), - dataStreamDescriptor, - request.masterNodeTimeout(), - request.timeout(), - false + // TODO: split the listner out of this task and use AckedClusterStateTaskListener directly to avoid the complicated listener + // construction upstream when instantiating these + private final class CreateIndexTask extends AckedClusterStateUpdateTask { + + final CreateIndexRequest request; + final AtomicReference indexNameRef; + + CreateIndexTask( + CreateIndexRequest request, + ActionListener listener, + AtomicReference indexNameRef + ) { + super(Priority.URGENT, request, listener); + this.request = request; + this.indexNameRef = indexNameRef; + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + final SystemDataStreamDescriptor dataStreamDescriptor = systemIndices.validateDataStreamAccess( + request.index(), + threadPool.getThreadContext() + ); + final boolean isSystemDataStream = dataStreamDescriptor != null; + final boolean isSystemIndex = isSystemDataStream == false && systemIndices.isSystemIndex(request.index()); + final ComposableIndexTemplate template = resolveTemplate(request, currentState.metadata()); + final boolean isDataStream = isSystemIndex == false + && (isSystemDataStream || (template != null && template.getDataStreamTemplate() != null)); + + if (isDataStream) { + // This expression only evaluates to true when the argument is non-null and false + if (isSystemDataStream == false && Boolean.FALSE.equals(template.getAllowAutoCreate())) { + throw new IndexNotFoundException( + "composable template " + template.indexPatterns() + " forbids index auto creation" ); - ClusterState clusterState = metadataCreateDataStreamService.createDataStream(createRequest, currentState); - indexNameRef.set(clusterState.metadata().dataStreams().get(request.index()).getIndices().get(0).getName()); - return clusterState; - } else { - String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index()); - indexNameRef.set(indexName); - if (isSystemIndex) { - if (indexName.equals(request.index()) == false) { - throw new IllegalStateException("system indices do not support date math expressions"); - } - } else { - // This will throw an exception if the index does not exist and creating it is prohibited - final boolean shouldAutoCreate = autoCreateIndex.shouldAutoCreate(indexName, currentState); + } - if (shouldAutoCreate == false) { - // The index already exists. - return currentState; - } + CreateDataStreamClusterStateUpdateRequest createRequest = new CreateDataStreamClusterStateUpdateRequest( + request.index(), + dataStreamDescriptor, + request.masterNodeTimeout(), + request.timeout(), + false + ); + ClusterState clusterState = metadataCreateDataStreamService.createDataStream(createRequest, currentState); + indexNameRef.set(clusterState.metadata().dataStreams().get(request.index()).getIndices().get(0).getName()); + return clusterState; + } else { + String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index()); + indexNameRef.set(indexName); + if (isSystemIndex) { + if (indexName.equals(request.index()) == false) { + throw new IllegalStateException("system indices do not support date math expressions"); } + } else { + // This will throw an exception if the index does not exist and creating it is prohibited + final boolean shouldAutoCreate = autoCreateIndex.shouldAutoCreate(indexName, currentState); - final SystemIndexDescriptor mainDescriptor = isSystemIndex ? systemIndices.findMatchingDescriptor(indexName) : null; - final boolean isManagedSystemIndex = mainDescriptor != null && mainDescriptor.isAutomaticallyManaged(); - - final CreateIndexClusterStateUpdateRequest updateRequest; - - if (isManagedSystemIndex) { - final SystemIndexDescriptor descriptor = mainDescriptor.getDescriptorCompatibleWith( - state.nodes().getSmallestNonClientNodeVersion() - ); - if (descriptor == null) { - final String message = mainDescriptor.getMinimumNodeVersionMessage("auto-create index"); - logger.warn(message); - throw new IllegalStateException(message); - } - - updateRequest = buildSystemIndexUpdateRequest(indexName, descriptor); - } else if (isSystemIndex) { - updateRequest = buildUpdateRequest(indexName); - - if (Objects.isNull(request.settings())) { - updateRequest.settings(SystemIndexDescriptor.DEFAULT_SETTINGS); - } else if (false == request.settings().hasValue(SETTING_INDEX_HIDDEN)) { - updateRequest.settings(Settings.builder().put(request.settings()).put(SETTING_INDEX_HIDDEN, true).build()); - } else if ("false".equals(request.settings().get(SETTING_INDEX_HIDDEN))) { - final String message = "Cannot auto-create system index [" - + indexName - + "] with [index.hidden] set to 'false'"; - logger.warn(message); - throw new IllegalStateException(message); - } - } else { - updateRequest = buildUpdateRequest(indexName); + if (shouldAutoCreate == false) { + // The index already exists. + return currentState; } - - return createIndexService.applyCreateIndexRequest(currentState, updateRequest, false); } - } - private CreateIndexClusterStateUpdateRequest buildUpdateRequest(String indexName) { - CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest( - request.cause(), - indexName, - request.index() - ).ackTimeout(request.timeout()).performReroute(false).masterNodeTimeout(request.masterNodeTimeout()); - logger.debug("Auto-creating index {}", indexName); - return updateRequest; - } + final SystemIndexDescriptor mainDescriptor = isSystemIndex ? systemIndices.findMatchingDescriptor(indexName) : null; + final boolean isManagedSystemIndex = mainDescriptor != null && mainDescriptor.isAutomaticallyManaged(); - private CreateIndexClusterStateUpdateRequest buildSystemIndexUpdateRequest( - String indexName, - SystemIndexDescriptor descriptor - ) { - String mappings = descriptor.getMappings(); - Settings settings = descriptor.getSettings(); - String aliasName = descriptor.getAliasName(); + final CreateIndexClusterStateUpdateRequest updateRequest; - // if we are writing to the alias name, we should create the primary index here - String concreteIndexName = indexName.equals(aliasName) ? descriptor.getPrimaryIndex() : indexName; - - CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest( - request.cause(), - concreteIndexName, - request.index() - ).ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()).performReroute(false); - - updateRequest.waitForActiveShards(ActiveShardCount.ALL); - - if (mappings != null) { - updateRequest.mappings(mappings); - } - if (settings != null) { - updateRequest.settings(settings); - } - if (aliasName != null) { - updateRequest.aliases(Set.of(new Alias(aliasName).isHidden(true))); - } + if (isManagedSystemIndex) { + final SystemIndexDescriptor descriptor = mainDescriptor.getDescriptorCompatibleWith( + currentState.nodes().getSmallestNonClientNodeVersion() + ); + if (descriptor == null) { + final String message = mainDescriptor.getMinimumNodeVersionMessage("auto-create index"); + logger.warn(message); + throw new IllegalStateException(message); + } - if (logger.isDebugEnabled()) { - if (concreteIndexName.equals(indexName) == false) { - logger.debug("Auto-creating backing system index {} for alias {}", concreteIndexName, indexName); - } else { - logger.debug("Auto-creating system index {}", concreteIndexName); + updateRequest = buildSystemIndexUpdateRequest(indexName, descriptor); + } else if (isSystemIndex) { + updateRequest = buildUpdateRequest(indexName); + + if (Objects.isNull(request.settings())) { + updateRequest.settings(SystemIndexDescriptor.DEFAULT_SETTINGS); + } else if (false == request.settings().hasValue(SETTING_INDEX_HIDDEN)) { + updateRequest.settings(Settings.builder().put(request.settings()).put(SETTING_INDEX_HIDDEN, true).build()); + } else if ("false".equals(request.settings().get(SETTING_INDEX_HIDDEN))) { + final String message = "Cannot auto-create system index [" + indexName + "] with [index.hidden] set to 'false'"; + logger.warn(message); + throw new IllegalStateException(message); } + } else { + updateRequest = buildUpdateRequest(indexName); } - return updateRequest; + return createIndexService.applyCreateIndexRequest(currentState, updateRequest, false); + } + } + + private CreateIndexClusterStateUpdateRequest buildUpdateRequest(String indexName) { + CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest( + request.cause(), + indexName, + request.index() + ).ackTimeout(request.timeout()).performReroute(false).masterNodeTimeout(request.masterNodeTimeout()); + logger.debug("Auto-creating index {}", indexName); + return updateRequest; + } + + private CreateIndexClusterStateUpdateRequest buildSystemIndexUpdateRequest(String indexName, SystemIndexDescriptor descriptor) { + String mappings = descriptor.getMappings(); + Settings settings = descriptor.getSettings(); + String aliasName = descriptor.getAliasName(); + + // if we are writing to the alias name, we should create the primary index here + String concreteIndexName = indexName.equals(aliasName) ? descriptor.getPrimaryIndex() : indexName; + + CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest( + request.cause(), + concreteIndexName, + request.index() + ).ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()).performReroute(false); + + updateRequest.waitForActiveShards(ActiveShardCount.ALL); + + if (mappings != null) { + updateRequest.mappings(mappings); + } + if (settings != null) { + updateRequest.settings(settings); + } + if (aliasName != null) { + updateRequest.aliases(Set.of(new Alias(aliasName).isHidden(true))); } - }; - clusterService.submitStateUpdateTask("auto create [" + request.index() + "]", clusterTask, clusterTask, executor, clusterTask); - } - @Override - protected ClusterBlockException checkBlock(CreateIndexRequest request, ClusterState state) { - return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.index()); + if (logger.isDebugEnabled()) { + if (concreteIndexName.equals(indexName) == false) { + logger.debug("Auto-creating backing system index {} for alias {}", concreteIndexName, indexName); + } else { + logger.debug("Auto-creating system index {}", concreteIndexName); + } + } + + return updateRequest; + } } }