Skip to content

Commit

Permalink
Remove LegacyCTRAL from AutoCreateAction (#84170)
Browse files Browse the repository at this point in the history
Relates #83784
  • Loading branch information
DaveCTurner authored Mar 2, 2022
1 parent 8364efa commit 1e338c6
Showing 1 changed file with 103 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,20 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.ActiveShardsObserver;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateAckListener;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
Expand All @@ -30,12 +32,14 @@
import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.SystemDataStreamDescriptor;
import org.elasticsearch.indices.SystemIndexDescriptor;
Expand All @@ -47,7 +51,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_HIDDEN;

Expand Down Expand Up @@ -106,20 +109,12 @@ public TransportAction(
this.autoCreateIndex = autoCreateIndex;
executor = (currentState, taskContexts) -> {
ClusterState state = currentState;
final Map<CreateIndexRequest, CreateIndexTask> successfulRequests = Maps.newMapWithExpectedSize(taskContexts.size());
final Map<CreateIndexRequest, String> successfulRequests = Maps.newMapWithExpectedSize(taskContexts.size());
for (final var taskContext : taskContexts) {
final var task = taskContext.getTask();
try {
final CreateIndexTask successfulBefore = successfulRequests.putIfAbsent(task.request, task);
if (successfulBefore == null) {
state = task.execute(state);
} else {
// TODO: clean this up to just deduplicate the task listener instead of setting the generated name from
// duplicate tasks here and then waiting for shards to become available multiple times in parallel for
// each duplicate task
task.indexNameRef.set(successfulBefore.indexNameRef.get());
}
taskContext.success(new ClusterStateTaskExecutor.LegacyClusterTaskResultActionListener(task, currentState), task);
state = task.execute(state, successfulRequests, taskContext);
assert successfulRequests.containsKey(task.request);
} catch (Exception e) {
taskContext.onFailure(e);
}
Expand All @@ -136,52 +131,102 @@ protected void masterOperation(
Task task,
CreateIndexRequest request,
ClusterState state,
ActionListener<CreateIndexResponse> finalListener
ActionListener<CreateIndexResponse> listener
) {
AtomicReference<String> indexNameRef = new AtomicReference<>();
ActionListener<AcknowledgedResponse> listener = ActionListener.wrap(response -> {
String indexName = indexNameRef.get();
assert indexName != null;
if (response.isAcknowledged()) {
activeShardsObserver.waitForActiveShards(
new String[] { indexName },
ActiveShardCount.DEFAULT,
request.timeout(),
shardsAcked -> finalListener.onResponse(new CreateIndexResponse(true, shardsAcked, indexName)),
finalListener::onFailure
);
} else {
finalListener.onResponse(new CreateIndexResponse(false, false, indexName));
}
}, finalListener::onFailure);
CreateIndexTask clusterTask = new CreateIndexTask(request, listener, indexNameRef);
clusterService.submitStateUpdateTask("auto create [" + request.index() + "]", clusterTask, clusterTask, executor);
clusterService.submitStateUpdateTask(
"auto create [" + request.index() + "]",
new CreateIndexTask(request, listener),
ClusterStateTaskConfig.build(Priority.URGENT, request.masterNodeTimeout()),
executor
);
}

@Override
protected ClusterBlockException checkBlock(CreateIndexRequest request, ClusterState state) {
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.index());
}

// TODO: split the listner out of this task and use AckedClusterStateTaskListener directly to avoid the complicated listener
// construction upstream when instantiating these
private final class CreateIndexTask extends AckedClusterStateUpdateTask {

final CreateIndexRequest request;
final AtomicReference<String> indexNameRef;
private final class CreateIndexTask implements ClusterStateTaskListener {
private final CreateIndexRequest request;
private final ActionListener<CreateIndexResponse> listener;

CreateIndexTask(
CreateIndexRequest request,
ActionListener<AcknowledgedResponse> listener,
AtomicReference<String> indexNameRef
) {
super(Priority.URGENT, request, listener);
private CreateIndexTask(CreateIndexRequest request, ActionListener<CreateIndexResponse> listener) {
this.request = request;
this.indexNameRef = indexNameRef;
this.listener = listener;
}

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
assert false : "should not be called";
}

private ClusterStateAckListener getAckListener(String indexName) {
return new ClusterStateAckListener() {
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
}

@Override
public void onAllNodesAcked(Exception e) {
if (e == null) {
activeShardsObserver.waitForActiveShards(
new String[] { indexName },
ActiveShardCount.DEFAULT,
request.timeout(),
shardsAcked -> listener.onResponse(new CreateIndexResponse(true, shardsAcked, indexName)),
listener::onFailure
);
} else {
listener.onResponse(new CreateIndexResponse(false, false, indexName));
}
}

@Override
public void onAckTimeout() {
listener.onResponse(new CreateIndexResponse(false, false, indexName));
}

@Override
public TimeValue ackTimeout() {
return request.ackTimeout();
}
};
}

/**
* @param successfulRequests Cache of successful requests executed by this batch, to avoid failing duplicate requests with a
* {@link ResourceAlreadyExistsException}. If this method executes a request it should update this
* map.
*/
ClusterState execute(
ClusterState currentState,
Map<CreateIndexRequest, String> successfulRequests,
ClusterStateTaskExecutor.TaskContext<CreateIndexTask> taskContext
) throws Exception {
final ActionListener<ClusterState> publishListener = new ActionListener<>() {
@Override
public void onResponse(ClusterState clusterState) {
// nothing to do here, listener is completed at the end of acking
}

@Override
public void onFailure(Exception e) {
CreateIndexTask.this.onFailure(e);
}
};

final var previousIndexName = successfulRequests.get(request);
if (previousIndexName != null) {
taskContext.success(publishListener, getAckListener(previousIndexName));
return currentState;
}

final SystemDataStreamDescriptor dataStreamDescriptor = systemIndices.validateDataStreamAccess(
request.index(),
threadPool.getThreadContext()
Expand All @@ -208,11 +253,13 @@ public ClusterState execute(ClusterState currentState) throws Exception {
false
);
ClusterState clusterState = metadataCreateDataStreamService.createDataStream(createRequest, currentState);
indexNameRef.set(clusterState.metadata().dataStreams().get(request.index()).getIndices().get(0).getName());

final var indexName = clusterState.metadata().dataStreams().get(request.index()).getIndices().get(0).getName();
taskContext.success(publishListener, getAckListener(indexName));
successfulRequests.put(request, indexName);
return clusterState;
} else {
String indexName = IndexNameExpressionResolver.resolveDateMathExpression(request.index());
indexNameRef.set(indexName);
final var indexName = IndexNameExpressionResolver.resolveDateMathExpression(request.index());
if (isSystemIndex) {
if (indexName.equals(request.index()) == false) {
throw new IllegalStateException("system indices do not support date math expressions");
Expand All @@ -223,6 +270,8 @@ public ClusterState execute(ClusterState currentState) throws Exception {

if (shouldAutoCreate == false) {
// The index already exists.
taskContext.success(publishListener, getAckListener(indexName));
successfulRequests.put(request, indexName);
return currentState;
}
}
Expand Down Expand Up @@ -259,7 +308,10 @@ public ClusterState execute(ClusterState currentState) throws Exception {
updateRequest = buildUpdateRequest(indexName);
}

return createIndexService.applyCreateIndexRequest(currentState, updateRequest, false);
final var clusterState = createIndexService.applyCreateIndexRequest(currentState, updateRequest, false);
taskContext.success(publishListener, getAckListener(indexName));
successfulRequests.put(request, indexName);
return clusterState;
}
}

Expand Down

0 comments on commit 1e338c6

Please sign in to comment.