From 1e338c62ed9e172e9351323f48b0bb7b58ac0693 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 2 Mar 2022 08:09:45 +0000 Subject: [PATCH] Remove LegacyCTRAL from AutoCreateAction (#84170) Relates #83784 --- .../indices/create/AutoCreateAction.java | 154 ++++++++++++------ 1 file changed, 103 insertions(+), 51 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java index c44b831711704..0f7b77083eacb 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.admin.indices.alias.Alias; @@ -16,11 +17,12 @@ import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.ActiveShardsObserver; import org.elasticsearch.action.support.AutoCreateIndex; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; -import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateAckListener; +import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; @@ -30,12 +32,14 @@ import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest; import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.Maps; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.indices.SystemDataStreamDescriptor; import org.elasticsearch.indices.SystemIndexDescriptor; @@ -47,7 +51,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_HIDDEN; @@ -106,20 +109,12 @@ public TransportAction( this.autoCreateIndex = autoCreateIndex; executor = (currentState, taskContexts) -> { ClusterState state = currentState; - final Map successfulRequests = Maps.newMapWithExpectedSize(taskContexts.size()); + final Map successfulRequests = Maps.newMapWithExpectedSize(taskContexts.size()); for (final var taskContext : taskContexts) { final var task = taskContext.getTask(); try { - final CreateIndexTask successfulBefore = successfulRequests.putIfAbsent(task.request, task); - if (successfulBefore == null) { - state = task.execute(state); - } else { - // TODO: clean this up to just deduplicate the task listener instead of setting the generated name from - // duplicate tasks here and then waiting for shards to become available multiple times in parallel for - // each duplicate task - task.indexNameRef.set(successfulBefore.indexNameRef.get()); - } - taskContext.success(new ClusterStateTaskExecutor.LegacyClusterTaskResultActionListener(task, currentState), task); + state = task.execute(state, successfulRequests, taskContext); + assert successfulRequests.containsKey(task.request); } catch (Exception e) { taskContext.onFailure(e); } @@ -136,26 +131,14 @@ protected void masterOperation( Task task, CreateIndexRequest request, ClusterState state, - ActionListener finalListener + ActionListener listener ) { - AtomicReference indexNameRef = new AtomicReference<>(); - ActionListener listener = ActionListener.wrap(response -> { - String indexName = indexNameRef.get(); - assert indexName != null; - if (response.isAcknowledged()) { - activeShardsObserver.waitForActiveShards( - new String[] { indexName }, - ActiveShardCount.DEFAULT, - request.timeout(), - shardsAcked -> finalListener.onResponse(new CreateIndexResponse(true, shardsAcked, indexName)), - finalListener::onFailure - ); - } else { - finalListener.onResponse(new CreateIndexResponse(false, false, indexName)); - } - }, finalListener::onFailure); - CreateIndexTask clusterTask = new CreateIndexTask(request, listener, indexNameRef); - clusterService.submitStateUpdateTask("auto create [" + request.index() + "]", clusterTask, clusterTask, executor); + clusterService.submitStateUpdateTask( + "auto create [" + request.index() + "]", + new CreateIndexTask(request, listener), + ClusterStateTaskConfig.build(Priority.URGENT, request.masterNodeTimeout()), + executor + ); } @Override @@ -163,25 +146,87 @@ protected ClusterBlockException checkBlock(CreateIndexRequest request, ClusterSt return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.index()); } - // TODO: split the listner out of this task and use AckedClusterStateTaskListener directly to avoid the complicated listener - // construction upstream when instantiating these - private final class CreateIndexTask extends AckedClusterStateUpdateTask { - - final CreateIndexRequest request; - final AtomicReference indexNameRef; + private final class CreateIndexTask implements ClusterStateTaskListener { + private final CreateIndexRequest request; + private final ActionListener listener; - CreateIndexTask( - CreateIndexRequest request, - ActionListener listener, - AtomicReference indexNameRef - ) { - super(Priority.URGENT, request, listener); + private CreateIndexTask(CreateIndexRequest request, ActionListener listener) { this.request = request; - this.indexNameRef = indexNameRef; + this.listener = listener; } @Override - public ClusterState execute(ClusterState currentState) throws Exception { + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { + assert false : "should not be called"; + } + + private ClusterStateAckListener getAckListener(String indexName) { + return new ClusterStateAckListener() { + @Override + public boolean mustAck(DiscoveryNode discoveryNode) { + return true; + } + + @Override + public void onAllNodesAcked(Exception e) { + if (e == null) { + activeShardsObserver.waitForActiveShards( + new String[] { indexName }, + ActiveShardCount.DEFAULT, + request.timeout(), + shardsAcked -> listener.onResponse(new CreateIndexResponse(true, shardsAcked, indexName)), + listener::onFailure + ); + } else { + listener.onResponse(new CreateIndexResponse(false, false, indexName)); + } + } + + @Override + public void onAckTimeout() { + listener.onResponse(new CreateIndexResponse(false, false, indexName)); + } + + @Override + public TimeValue ackTimeout() { + return request.ackTimeout(); + } + }; + } + + /** + * @param successfulRequests Cache of successful requests executed by this batch, to avoid failing duplicate requests with a + * {@link ResourceAlreadyExistsException}. If this method executes a request it should update this + * map. + */ + ClusterState execute( + ClusterState currentState, + Map successfulRequests, + ClusterStateTaskExecutor.TaskContext taskContext + ) throws Exception { + final ActionListener publishListener = new ActionListener<>() { + @Override + public void onResponse(ClusterState clusterState) { + // nothing to do here, listener is completed at the end of acking + } + + @Override + public void onFailure(Exception e) { + CreateIndexTask.this.onFailure(e); + } + }; + + final var previousIndexName = successfulRequests.get(request); + if (previousIndexName != null) { + taskContext.success(publishListener, getAckListener(previousIndexName)); + return currentState; + } + final SystemDataStreamDescriptor dataStreamDescriptor = systemIndices.validateDataStreamAccess( request.index(), threadPool.getThreadContext() @@ -208,11 +253,13 @@ public ClusterState execute(ClusterState currentState) throws Exception { false ); ClusterState clusterState = metadataCreateDataStreamService.createDataStream(createRequest, currentState); - indexNameRef.set(clusterState.metadata().dataStreams().get(request.index()).getIndices().get(0).getName()); + + final var indexName = clusterState.metadata().dataStreams().get(request.index()).getIndices().get(0).getName(); + taskContext.success(publishListener, getAckListener(indexName)); + successfulRequests.put(request, indexName); return clusterState; } else { - String indexName = IndexNameExpressionResolver.resolveDateMathExpression(request.index()); - indexNameRef.set(indexName); + final var indexName = IndexNameExpressionResolver.resolveDateMathExpression(request.index()); if (isSystemIndex) { if (indexName.equals(request.index()) == false) { throw new IllegalStateException("system indices do not support date math expressions"); @@ -223,6 +270,8 @@ public ClusterState execute(ClusterState currentState) throws Exception { if (shouldAutoCreate == false) { // The index already exists. + taskContext.success(publishListener, getAckListener(indexName)); + successfulRequests.put(request, indexName); return currentState; } } @@ -259,7 +308,10 @@ public ClusterState execute(ClusterState currentState) throws Exception { updateRequest = buildUpdateRequest(indexName); } - return createIndexService.applyCreateIndexRequest(currentState, updateRequest, false); + final var clusterState = createIndexService.applyCreateIndexRequest(currentState, updateRequest, false); + taskContext.success(publishListener, getAckListener(indexName)); + successfulRequests.put(request, indexName); + return clusterState; } }