Skip to content

Commit

Permalink
Add global and index level blocks to IndexSettings
Browse files Browse the repository at this point in the history
  • Loading branch information
tlrx committed Nov 19, 2018
1 parent 6d4a3f8 commit 8413466
Show file tree
Hide file tree
Showing 15 changed files with 249 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,13 @@ public ClusterState execute(ClusterState currentState) throws Exception {
"]: cannot be greater than number of shard copies [" +
(tmpImd.getNumberOfReplicas() + 1) + "]");
}

final ClusterBlocks clusterBlocks = currentState.blocks();
final Set<ClusterBlock> globalBlocks = clusterBlocks.global();
final Set<ClusterBlock> indexBlocks = clusterBlocks.indices().get(tmpImd.getIndex().getName());

// create the index here (on the master) to validate it can be created, as well as adding the mapping
final IndexService indexService = indicesService.createIndex(tmpImd, Collections.emptyList());
final IndexService indexService = indicesService.createIndex(tmpImd, globalBlocks, indexBlocks, Collections.emptyList());
createdIndex = indexService.index();
// now add the mappings
MapperService mapperService = indexService.mapperService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.AliasAction.NewAliasValidator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
Expand Down Expand Up @@ -124,6 +126,11 @@ ClusterState innerExecute(ClusterState currentState, Iterable<AliasAction> actio
if (index == null) {
throw new IndexNotFoundException(action.getIndex());
}

final ClusterBlocks clusterBlocks = currentState.blocks();
final Set<ClusterBlock> globalBlocks = clusterBlocks.global();
final Set<ClusterBlock> indexBlocks = clusterBlocks.indices().get(index.getIndex().getName());

NewAliasValidator newAliasValidator = (alias, indexRouting, filter, writeIndex) -> {
/* It is important that we look up the index using the metadata builder we are modifying so we can remove an
* index and replace it with an alias. */
Expand All @@ -136,7 +143,7 @@ ClusterState innerExecute(ClusterState currentState, Iterable<AliasAction> actio
if (indexService == null) {
// temporarily create the index and add mappings so we can parse the filter
try {
indexService = indicesService.createIndex(index, emptyList());
indexService = indicesService.createIndex(index, globalBlocks, indexBlocks, emptyList());
indicesToClose.add(index.getIndex());
} catch (IOException e) {
throw new ElasticsearchException("Failed to create temporary index for parsing the alias", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ private static void validateAndAddTemplate(final PutRequest request, IndexTempla
.build();

final IndexMetaData tmpIndexMetadata = IndexMetaData.builder(temporaryIndexName).settings(dummySettings).build();
IndexService dummyIndexService = indicesService.createIndex(tmpIndexMetadata, Collections.emptyList());
IndexService dummyIndexService = indicesService.createIndex(tmpIndexMetadata, null, null, Collections.emptyList());
createdIndex = dummyIndexService.index();

templateBuilder.order(request.order);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingClusterStateUpdateRequest;
Expand Down Expand Up @@ -52,6 +54,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.NO_LONGER_ASSIGNED;

Expand Down Expand Up @@ -144,8 +147,12 @@ ClusterState executeRefresh(final ClusterState currentState, final List<RefreshT
boolean removeIndex = false;
IndexService indexService = indicesService.indexService(indexMetaData.getIndex());
if (indexService == null) {
final ClusterBlocks clusterBlocks = currentState.blocks();
final Set<ClusterBlock> globalBlocks = clusterBlocks.global();
final Set<ClusterBlock> indexBlocks = clusterBlocks.indices().get(indexMetaData.getIndex().getName());

// we need to create the index here, and add the current mapping to it, so we can merge
indexService = indicesService.createIndex(indexMetaData, Collections.emptyList());
indexService = indicesService.createIndex(indexMetaData, globalBlocks, indexBlocks, Collections.emptyList());
removeIndex = true;
indexService.mapperService().merge(indexMetaData, MergeReason.MAPPING_RECOVERY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
Expand Down Expand Up @@ -679,6 +680,11 @@ public boolean isForceExecution() {
}
}

@Override
public void updateBlocks(@Nullable final Set<ClusterBlock> globalBlocks, @Nullable final Set<ClusterBlock> indexBlocks) {
indexSettings.updateIndexBlocks(globalBlocks, indexBlocks);
}

private void rescheduleFsyncTask(Translog.Durability durability) {
try {
if (fsyncTask != null) {
Expand Down
60 changes: 59 additions & 1 deletion server/src/main/java/org/elasticsearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.MergePolicy;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
Expand All @@ -37,6 +40,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -327,6 +331,11 @@ public final class IndexSettings {
private volatile String defaultPipeline;
private volatile boolean searchThrottled;

/**
* A {@link ClusterBlocks} containing global level and index level blocks
*/
private volatile ClusterBlocks indexBlocks;

/**
* The maximum number of refresh listeners allows on this shard.
*/
Expand Down Expand Up @@ -397,8 +406,27 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
*
* @param indexMetaData the index metadata this settings object is associated with
* @param nodeSettings the nodes settings this index is allocated on.
* @param indexScopedSettings the index level settings
*/
public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSettings, IndexScopedSettings indexScopedSettings) {
this(indexMetaData, nodeSettings, indexScopedSettings, null, null);
}

/**
* Creates a new {@link IndexSettings} instance. The given node settings will be merged with the settings in the metadata
* while index level settings will overwrite node settings.
*
* @param indexMetaData the index metadata this settings object is associated with
* @param nodeSettings the nodes settings this index is allocated on.
* @param indexScopedSettings the index level settings
* @param globalBlocks the global level blocks
* @param indexBlocks the index level blocks
*/
public IndexSettings(final IndexMetaData indexMetaData,
final Settings nodeSettings,
final IndexScopedSettings indexScopedSettings,
@Nullable final Set<ClusterBlock> globalBlocks,
@Nullable final Set<ClusterBlock> indexBlocks) {
scopedSettings = indexScopedSettings.copy(nodeSettings, indexMetaData);
this.nodeSettings = nodeSettings;
this.settings = Settings.builder().put(nodeSettings).put(indexMetaData.getSettings()).build();
Expand All @@ -408,7 +436,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
nodeName = Node.NODE_NAME_SETTING.get(settings);
this.indexMetaData = indexMetaData;
numberOfShards = settings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_SHARDS, null);

this.indexBlocks = buildBlocks(globalBlocks, indexBlocks);
this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings);
Expand Down Expand Up @@ -626,6 +654,36 @@ public static boolean same(final Settings left, final Settings right) {
.equals(right.filter(IndexScopedSettings.INDEX_SETTINGS_KEY_PREDICATE));
}

/**
* Updates the global level and index level blocks.
*
* @param globalBlocks the global level blocks
* @param indexBlocks the index level blocks
*/
public synchronized void updateIndexBlocks(@Nullable final Set<ClusterBlock> globalBlocks,
@Nullable final Set<ClusterBlock> indexBlocks) {
this.indexBlocks = buildBlocks(globalBlocks, indexBlocks);
}

private ClusterBlocks buildBlocks(@Nullable final Set<ClusterBlock> globalBlocks,
@Nullable final Set<ClusterBlock> indexBlocks) {
final ClusterBlocks.Builder builder = ClusterBlocks.builder();
if (globalBlocks != null) {
globalBlocks.forEach(builder::addGlobalBlock);
}
if (indexBlocks != null) {
indexBlocks.forEach(block -> builder.addIndexBlock(index.getName(), block));
}
return builder.build();
}

/**
* @return the current global level and index level blocks
*/
public ClusterBlocks getIndexBlocks() {
return indexBlocks;
}

/**
* Returns the translog durability for this index.
*/
Expand Down
15 changes: 11 additions & 4 deletions server/src/main/java/org/elasticsearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
Expand Down Expand Up @@ -448,8 +449,10 @@ public IndexService indexServiceSafe(Index index) {
* @throws ResourceAlreadyExistsException if the index already exists.
*/
@Override
public synchronized IndexService createIndex(
final IndexMetaData indexMetaData, final List<IndexEventListener> builtInListeners) throws IOException {
public synchronized IndexService createIndex(final IndexMetaData indexMetaData,
@Nullable final Set<ClusterBlock> globalBlocks,
@Nullable final Set<ClusterBlock> indexBlocks,
final List<IndexEventListener> builtInListeners) throws IOException {
ensureChangesAllowed();
if (indexMetaData.getIndexUUID().equals(IndexMetaData.INDEX_UUID_NA_VALUE)) {
throw new IllegalArgumentException("index must have a real UUID found value: [" + indexMetaData.getIndexUUID() + "]");
Expand All @@ -471,6 +474,8 @@ public void onStoreClosed(ShardId shardId) {
createIndexService(
"create index",
indexMetaData,
globalBlocks,
indexBlocks,
indicesQueryCache,
indicesFieldDataCache,
finalListeners,
Expand All @@ -493,11 +498,13 @@ public void onStoreClosed(ShardId shardId) {
*/
private synchronized IndexService createIndexService(final String reason,
IndexMetaData indexMetaData,
@Nullable final Set<ClusterBlock> globalBlocks,
@Nullable final Set<ClusterBlock> indexBlocks,
IndicesQueryCache indicesQueryCache,
IndicesFieldDataCache indicesFieldDataCache,
List<IndexEventListener> builtInListeners,
IndexingOperationListener... indexingOperationListeners) throws IOException {
final IndexSettings idxSettings = new IndexSettings(indexMetaData, settings, indexScopedSettings);
final IndexSettings idxSettings = new IndexSettings(indexMetaData, settings, indexScopedSettings, globalBlocks, indexBlocks);
// we ignore private settings since they are not registered settings
indexScopedSettings.validate(indexMetaData.getSettings(), true, true, true);
logger.debug("creating Index [{}], shards [{}]/[{}] - reason [{}]",
Expand Down Expand Up @@ -587,7 +594,7 @@ public synchronized void verifyIndexMetadata(IndexMetaData metaData, IndexMetaDa
closeables.add(indicesQueryCache);
// this will also fail if some plugin fails etc. which is nice since we can verify that early
final IndexService service =
createIndexService("metadata verification", metaData, indicesQueryCache, indicesFieldDataCache, emptyList());
createIndexService("metadata verification", metaData, null, null, indicesQueryCache, indicesFieldDataCache, emptyList());
closeables.add(() -> service.close("metadata verification", false));
service.mapperService().merge(metaData, MapperService.MergeReason.MAPPING_RECOVERY);
if (metaData.equals(metaDataUpdate) == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -457,7 +459,11 @@ private void createIndices(final ClusterState state) {

AllocatedIndex<? extends Shard> indexService = null;
try {
indexService = indicesService.createIndex(indexMetaData, buildInIndexListener);
final ClusterBlocks clusterBlocks = state.blocks();
final Set<ClusterBlock> globalBlocks = clusterBlocks.global();
final Set<ClusterBlock> indexBlocks = clusterBlocks.indices().get(index.getName());

indexService = indicesService.createIndex(indexMetaData, globalBlocks, indexBlocks, buildInIndexListener);
if (indexService.updateMapping(null, indexMetaData) && sendRefreshMapping) {
nodeMappingRefreshAction.nodeMappingRefresh(state.nodes().getMasterNode(),
new NodeMappingRefreshAction.NodeMappingRefreshRequest(indexMetaData.getIndex().getName(),
Expand All @@ -479,8 +485,8 @@ private void createIndices(final ClusterState state) {
}
}

private void updateIndices(ClusterChangedEvent event) {
if (!event.metaDataChanged()) {
private void updateIndices(final ClusterChangedEvent event) {
if (event.metaDataChanged() == false && event.blocksChanged() == false) {
return;
}
final ClusterState state = event.state();
Expand Down Expand Up @@ -512,6 +518,11 @@ private void updateIndices(ClusterChangedEvent event) {
}
}
}

if (event.blocksChanged()) {
final ClusterBlocks clusterBlocks = state.blocks();
indexService.updateBlocks(clusterBlocks.global(), clusterBlocks.indices().get(index.getName()));
}
}
}

Expand Down Expand Up @@ -780,6 +791,14 @@ public interface AllocatedIndex<T extends Shard> extends Iterable<T>, IndexCompo
*/
void updateMetaData(IndexMetaData currentIndexMetaData, IndexMetaData newIndexMetaData);

/**
* Updates the global level and index level blocks of this index. Changes become visible through {@link #getIndexSettings()}.
*
* @param globalBlocks the global level blocks
* @param indexBlocks the index level blocks
*/
void updateBlocks(@Nullable Set<ClusterBlock> globalBlocks, @Nullable Set<ClusterBlock> indexBlocks);

/**
* Checks if index requires refresh from master.
*/
Expand All @@ -801,12 +820,16 @@ public interface AllocatedIndices<T extends Shard, U extends AllocatedIndex<T>>
/**
* Creates a new {@link IndexService} for the given metadata.
*
* @param indexMetaData the index metadata to create the index for
* @param builtInIndexListener a list of built-in lifecycle {@link IndexEventListener} that should should be used along side with
* the per-index listeners
* @param indexMetaData the index metadata to create the index for
* @param globalBlocks the global level blocks
* @param indexBlocks the index level blocks
* @param builtInIndexListener a list of built-in lifecycle {@link IndexEventListener} that should should be used along side with
* the per-index listeners
* @throws ResourceAlreadyExistsException if the index already exists.
*/
U createIndex(IndexMetaData indexMetaData,
@Nullable Set<ClusterBlock> globalBlocks,
@Nullable Set<ClusterBlock> indexBlocks,
List<IndexEventListener> builtInIndexListener) throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,6 @@ private void setupIndicesService() throws Exception {
when(service.getIndexSortSupplier()).thenReturn(supplier);
when(service.getIndexEventListener()).thenReturn(mock(IndexEventListener.class));

when(indicesService.createIndex(anyObject(), anyObject())).thenReturn(service);
when(indicesService.createIndex(anyObject(), anyObject(), anyObject(), anyObject())).thenReturn(service);
}
}
Loading

0 comments on commit 8413466

Please sign in to comment.