Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch auto create index cluster state updates #82159

Merged
merged 4 commits into from
Jan 11, 2022
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
Expand All @@ -29,6 +31,7 @@
import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -69,6 +72,8 @@ public static final class TransportAction extends TransportMasterNodeAction<Crea
private final AutoCreateIndex autoCreateIndex;
private final SystemIndices systemIndices;

private final ClusterStateTaskExecutor<AckedClusterStateUpdateTask> executor;

@Inject
public TransportAction(
TransportService transportService,
Expand All @@ -79,7 +84,8 @@ public TransportAction(
MetadataCreateIndexService createIndexService,
MetadataCreateDataStreamService metadataCreateDataStreamService,
AutoCreateIndex autoCreateIndex,
SystemIndices systemIndices
SystemIndices systemIndices,
AllocationService allocationService
) {
super(
NAME,
Expand All @@ -97,6 +103,20 @@ public TransportAction(
this.createIndexService = createIndexService;
this.metadataCreateDataStreamService = metadataCreateDataStreamService;
this.autoCreateIndex = autoCreateIndex;
executor = (currentState, tasks) -> {
ClusterTasksResult.Builder<AckedClusterStateUpdateTask> builder = ClusterTasksResult.builder();
ClusterState state = currentState;
for (AckedClusterStateUpdateTask task : tasks) {
try {
state = task.execute(state);
builder.success(task);
} catch (Exception e) {
builder.failure(task, e);
}
}
state = allocationService.reroute(state, "auto-create");
return builder.build(state);
};
}

@Override
Expand All @@ -122,149 +142,143 @@ protected void masterOperation(
finalListener.onResponse(new CreateIndexResponse(false, false, indexName));
}
}, finalListener::onFailure);
clusterService.submitStateUpdateTask(
"auto create [" + request.index() + "]",
new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) {
AckedClusterStateUpdateTask clusterTask = new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) {
probakowski marked this conversation as resolved.
Show resolved Hide resolved

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
final SystemDataStreamDescriptor dataStreamDescriptor = systemIndices.validateDataStreamAccess(
request.index(),
threadPool.getThreadContext()
);
final boolean isSystemDataStream = dataStreamDescriptor != null;
final boolean isSystemIndex = isSystemDataStream == false && systemIndices.isSystemIndex(request.index());
final ComposableIndexTemplate template = resolveTemplate(request, currentState.metadata());
final boolean isDataStream = isSystemIndex == false
&& (isSystemDataStream || (template != null && template.getDataStreamTemplate() != null));

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
final SystemDataStreamDescriptor dataStreamDescriptor = systemIndices.validateDataStreamAccess(
if (isDataStream) {
// This expression only evaluates to true when the argument is non-null and false
if (isSystemDataStream == false && Boolean.FALSE.equals(template.getAllowAutoCreate())) {
throw new IndexNotFoundException(
"composable template " + template.indexPatterns() + " forbids index auto creation"
);
}

CreateDataStreamClusterStateUpdateRequest createRequest = new CreateDataStreamClusterStateUpdateRequest(
request.index(),
threadPool.getThreadContext()
dataStreamDescriptor,
request.masterNodeTimeout(),
request.timeout()
);
final boolean isSystemDataStream = dataStreamDescriptor != null;
final boolean isSystemIndex = isSystemDataStream == false && systemIndices.isSystemIndex(request.index());
final ComposableIndexTemplate template = resolveTemplate(request, currentState.metadata());
final boolean isDataStream = isSystemIndex == false
&& (isSystemDataStream || (template != null && template.getDataStreamTemplate() != null));

if (isDataStream) {
// This expression only evaluates to true when the argument is non-null and false
if (isSystemDataStream == false && Boolean.FALSE.equals(template.getAllowAutoCreate())) {
throw new IndexNotFoundException(
"composable template " + template.indexPatterns() + " forbids index auto creation"
);
ClusterState clusterState = metadataCreateDataStreamService.createDataStream(createRequest, currentState);
indexNameRef.set(clusterState.metadata().dataStreams().get(request.index()).getIndices().get(0).getName());
return clusterState;
} else {
String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index());
indexNameRef.set(indexName);
if (isSystemIndex) {
if (indexName.equals(request.index()) == false) {
throw new IllegalStateException("system indices do not support date math expressions");
}

CreateDataStreamClusterStateUpdateRequest createRequest = new CreateDataStreamClusterStateUpdateRequest(
request.index(),
dataStreamDescriptor,
request.masterNodeTimeout(),
request.timeout()
);
ClusterState clusterState = metadataCreateDataStreamService.createDataStream(createRequest, currentState);
indexNameRef.set(clusterState.metadata().dataStreams().get(request.index()).getIndices().get(0).getName());
return clusterState;
} else {
String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index());
indexNameRef.set(indexName);
if (isSystemIndex) {
if (indexName.equals(request.index()) == false) {
throw new IllegalStateException("system indices do not support date math expressions");
}
} else {
// This will throw an exception if the index does not exist and creating it is prohibited
final boolean shouldAutoCreate = autoCreateIndex.shouldAutoCreate(indexName, currentState);

if (shouldAutoCreate == false) {
// The index already exists.
return currentState;
}
// This will throw an exception if the index does not exist and creating it is prohibited
final boolean shouldAutoCreate = autoCreateIndex.shouldAutoCreate(indexName, currentState);

if (shouldAutoCreate == false) {
// The index already exists.
return currentState;
}
}

final SystemIndexDescriptor mainDescriptor = isSystemIndex ? systemIndices.findMatchingDescriptor(indexName) : null;
final boolean isManagedSystemIndex = mainDescriptor != null && mainDescriptor.isAutomaticallyManaged();

final SystemIndexDescriptor mainDescriptor = isSystemIndex
? systemIndices.findMatchingDescriptor(indexName)
: null;
final boolean isManagedSystemIndex = mainDescriptor != null && mainDescriptor.isAutomaticallyManaged();

final CreateIndexClusterStateUpdateRequest updateRequest;

if (isManagedSystemIndex) {
final SystemIndexDescriptor descriptor = mainDescriptor.getDescriptorCompatibleWith(
state.nodes().getSmallestNonClientNodeVersion()
);
if (descriptor == null) {
final String message = mainDescriptor.getMinimumNodeVersionMessage("auto-create index");
logger.warn(message);
throw new IllegalStateException(message);
}

updateRequest = buildSystemIndexUpdateRequest(indexName, descriptor);
} else if (isSystemIndex) {
updateRequest = buildUpdateRequest(indexName);

if (Objects.isNull(request.settings())) {
updateRequest.settings(SystemIndexDescriptor.DEFAULT_SETTINGS);
} else if (false == request.settings().hasValue(SETTING_INDEX_HIDDEN)) {
updateRequest.settings(
Settings.builder().put(request.settings()).put(SETTING_INDEX_HIDDEN, true).build()
);
} else if ("false".equals(request.settings().get(SETTING_INDEX_HIDDEN))) {
final String message = "Cannot auto-create system index ["
+ indexName
+ "] with [index.hidden] set to 'false'";
logger.warn(message);
throw new IllegalStateException(message);
}
} else {
updateRequest = buildUpdateRequest(indexName);
final CreateIndexClusterStateUpdateRequest updateRequest;

if (isManagedSystemIndex) {
final SystemIndexDescriptor descriptor = mainDescriptor.getDescriptorCompatibleWith(
state.nodes().getSmallestNonClientNodeVersion()
);
if (descriptor == null) {
final String message = mainDescriptor.getMinimumNodeVersionMessage("auto-create index");
logger.warn(message);
throw new IllegalStateException(message);
}

return createIndexService.applyCreateIndexRequest(currentState, updateRequest, false);
updateRequest = buildSystemIndexUpdateRequest(indexName, descriptor);
} else if (isSystemIndex) {
updateRequest = buildUpdateRequest(indexName);

if (Objects.isNull(request.settings())) {
updateRequest.settings(SystemIndexDescriptor.DEFAULT_SETTINGS);
} else if (false == request.settings().hasValue(SETTING_INDEX_HIDDEN)) {
updateRequest.settings(Settings.builder().put(request.settings()).put(SETTING_INDEX_HIDDEN, true).build());
} else if ("false".equals(request.settings().get(SETTING_INDEX_HIDDEN))) {
final String message = "Cannot auto-create system index ["
+ indexName
+ "] with [index.hidden] set to 'false'";
logger.warn(message);
throw new IllegalStateException(message);
}
} else {
updateRequest = buildUpdateRequest(indexName);
}
}

private CreateIndexClusterStateUpdateRequest buildUpdateRequest(String indexName) {
CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(
request.cause(),
indexName,
request.index()
).ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout());
logger.debug("Auto-creating index {}", indexName);
return updateRequest;
return createIndexService.applyCreateIndexRequest(currentState, updateRequest, false);
}
}

private CreateIndexClusterStateUpdateRequest buildSystemIndexUpdateRequest(
String indexName,
SystemIndexDescriptor descriptor
) {
String mappings = descriptor.getMappings();
Settings settings = descriptor.getSettings();
String aliasName = descriptor.getAliasName();
private CreateIndexClusterStateUpdateRequest buildUpdateRequest(String indexName) {
CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(
request.cause(),
indexName,
request.index()
).ackTimeout(request.timeout()).performReroute(false).masterNodeTimeout(request.masterNodeTimeout());
logger.debug("Auto-creating index {}", indexName);
return updateRequest;
}

// if we are writing to the alias name, we should create the primary index here
String concreteIndexName = indexName.equals(aliasName) ? descriptor.getPrimaryIndex() : indexName;
private CreateIndexClusterStateUpdateRequest buildSystemIndexUpdateRequest(
String indexName,
SystemIndexDescriptor descriptor
) {
String mappings = descriptor.getMappings();
Settings settings = descriptor.getSettings();
String aliasName = descriptor.getAliasName();

CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(
request.cause(),
concreteIndexName,
request.index()
).ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout());
// if we are writing to the alias name, we should create the primary index here
String concreteIndexName = indexName.equals(aliasName) ? descriptor.getPrimaryIndex() : indexName;

updateRequest.waitForActiveShards(ActiveShardCount.ALL);
CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(
request.cause(),
concreteIndexName,
request.index()
).ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()).performReroute(false);

if (mappings != null) {
updateRequest.mappings(mappings);
}
if (settings != null) {
updateRequest.settings(settings);
}
if (aliasName != null) {
updateRequest.aliases(Set.of(new Alias(aliasName).isHidden(true)));
}
updateRequest.waitForActiveShards(ActiveShardCount.ALL);

if (logger.isDebugEnabled()) {
if (concreteIndexName.equals(indexName) == false) {
logger.debug("Auto-creating backing system index {} for alias {}", concreteIndexName, indexName);
} else {
logger.debug("Auto-creating system index {}", concreteIndexName);
}
}
if (mappings != null) {
updateRequest.mappings(mappings);
}
if (settings != null) {
updateRequest.settings(settings);
}
if (aliasName != null) {
updateRequest.aliases(Set.of(new Alias(aliasName).isHidden(true)));
}

return updateRequest;
if (logger.isDebugEnabled()) {
if (concreteIndexName.equals(indexName) == false) {
logger.debug("Auto-creating backing system index {} for alias {}", concreteIndexName, indexName);
} else {
logger.debug("Auto-creating system index {}", concreteIndexName);
}
}

return updateRequest;
}
);
};
clusterService.submitStateUpdateTask("auto create [" + request.index() + "]", clusterTask, clusterTask, executor, clusterTask);
}

@Override
Expand Down