Skip to content

Commit

Permalink
Remove shadow replicas
Browse files Browse the repository at this point in the history
Resolves #22024
  • Loading branch information
dakrone committed Apr 11, 2017
1 parent 76603d4 commit 5cace8e
Show file tree
Hide file tree
Showing 53 changed files with 85 additions and 3,236 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,4 @@ protected ReplicaResult shardOperationOnReplica(ShardFlushRequest request, Index
logger.trace("{} flush request executed on replica", replica.shardId());
return new ReplicaResult();
}

@Override
protected boolean shouldExecuteReplication(IndexMetaData indexMetaData) {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,4 @@ protected ReplicaResult shardOperationOnReplica(BasicReplicationRequest request,
logger.trace("{} refresh request executed on replica", replica.shardId());
return new ReplicaResult();
}

@Override
protected boolean shouldExecuteReplication(IndexMetaData indexMetaData) {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,6 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) {
@Override
protected void resolveRequest(ClusterState state, InternalRequest request) {
IndexMetaData indexMeta = state.getMetaData().index(request.concreteIndex());
if (request.request().realtime && // if the realtime flag is set
request.request().preference() == null && // the preference flag is not already set
indexMeta != null && // and we have the index
indexMeta.isIndexUsingShadowReplicas()) { // and the index uses shadow replicas
// set the preference for the request to use "_primary" automatically
request.request().preference(Preference.PRIMARY.type());
}
// update the routing (request#index here is possibly an alias)
request.request().routing(state.metaData().resolveIndexRouting(request.request().parent(), request.request().routing(), request.request().index()));
// Fail fast on the node that received the request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public class ReplicationOperation<
*/
private final AtomicInteger pendingActions = new AtomicInteger();
private final AtomicInteger successfulShards = new AtomicInteger();
private final boolean executeOnReplicas;
private final Primary<Request, ReplicaRequest, PrimaryResultT> primary;
private final Replicas<ReplicaRequest> replicasProxy;
private final AtomicBoolean finished = new AtomicBoolean();
Expand All @@ -86,9 +85,8 @@ public class ReplicationOperation<

public ReplicationOperation(Request request, Primary<Request, ReplicaRequest, PrimaryResultT> primary,
ActionListener<PrimaryResultT> listener,
boolean executeOnReplicas, Replicas<ReplicaRequest> replicas,
Replicas<ReplicaRequest> replicas,
Supplier<ClusterState> clusterStateSupplier, Logger logger, String opType) {
this.executeOnReplicas = executeOnReplicas;
this.replicasProxy = replicas;
this.primary = primary;
this.resultListener = listener;
Expand Down Expand Up @@ -160,7 +158,7 @@ private void performOnReplicas(ReplicaRequest replicaRequest, List<ShardRouting>
final String localNodeId = primary.routingEntry().currentNodeId();
// If the index gets deleted after primary operation, we skip replication
for (final ShardRouting shard : shards) {
if (executeOnReplicas == false || shard.unassigned()) {
if (shard.unassigned()) {
if (shard.primary() == false) {
totalShards.incrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,10 @@ public void handleException(TransportException exp) {
} else {
setPhase(replicationTask, "primary");
final IndexMetaData indexMetaData = clusterService.state().getMetaData().index(request.shardId().getIndex());
final boolean executeOnReplicas = (indexMetaData == null) || shouldExecuteReplication(indexMetaData);
final ActionListener<Response> listener = createResponseListener(primaryShardReference);
createReplicatedOperation(request,
ActionListener.wrap(result -> result.respond(listener), listener::onFailure),
primaryShardReference, executeOnReplicas)
primaryShardReference)
.execute();
}
} catch (Exception e) {
Expand Down Expand Up @@ -371,9 +370,9 @@ public void onFailure(Exception e) {

protected ReplicationOperation<Request, ReplicaRequest, PrimaryResult<ReplicaRequest, Response>> createReplicatedOperation(
Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener,
PrimaryShardReference primaryShardReference, boolean executeOnReplicas) {
PrimaryShardReference primaryShardReference) {
return new ReplicationOperation<>(request, primaryShardReference, listener,
executeOnReplicas, replicasProxy, clusterService::state, logger, actionName);
replicasProxy, clusterService::state, logger, actionName);
}
}

Expand Down Expand Up @@ -909,14 +908,6 @@ public void onFailure(Exception e) {
indexShard.acquirePrimaryOperationLock(onAcquired, executor);
}

/**
* Indicated whether this operation should be replicated to shadow replicas or not. If this method returns true the replication phase
* will be skipped. For example writes such as index and delete don't need to be replicated on shadow replicas but refresh and flush do.
*/
protected boolean shouldExecuteReplication(IndexMetaData indexMetaData) {
return indexMetaData.isIndexUsingShadowReplicas() == false;
}

class ShardReference implements Releasable {

protected final IndexShard indexShard;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,13 +383,6 @@ static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpen
if (logger.isTraceEnabled()) {
logger.trace("shard: {} size: {}", sid, size);
}
if (indexMeta != null && indexMeta.isIndexUsingShadowReplicas()) {
// Shards on a shared filesystem should be considered of size 0
if (logger.isTraceEnabled()) {
logger.trace("shard: {} is using shadow replicas and will be treated as size 0", sid);
}
size = 0;
}
newShardSizes.put(sid, size);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,18 +192,11 @@ static Setting<Integer> buildNumberOfShardsSetting() {
public static final String SETTING_NUMBER_OF_REPLICAS = "index.number_of_replicas";
public static final Setting<Integer> INDEX_NUMBER_OF_REPLICAS_SETTING =
Setting.intSetting(SETTING_NUMBER_OF_REPLICAS, 1, 0, Property.Dynamic, Property.IndexScope);
public static final String SETTING_SHADOW_REPLICAS = "index.shadow_replicas";
public static final Setting<Boolean> INDEX_SHADOW_REPLICAS_SETTING =
Setting.boolSetting(SETTING_SHADOW_REPLICAS, false, Property.IndexScope, Property.Deprecated);

public static final String SETTING_ROUTING_PARTITION_SIZE = "index.routing_partition_size";
public static final Setting<Integer> INDEX_ROUTING_PARTITION_SIZE_SETTING =
Setting.intSetting(SETTING_ROUTING_PARTITION_SIZE, 1, 1, Property.IndexScope);

public static final String SETTING_SHARED_FILESYSTEM = "index.shared_filesystem";
public static final Setting<Boolean> INDEX_SHARED_FILESYSTEM_SETTING =
Setting.boolSetting(SETTING_SHARED_FILESYSTEM, INDEX_SHADOW_REPLICAS_SETTING, Property.IndexScope, Property.Deprecated);

public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
public static final Setting<AutoExpandReplicas> INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING;
public static final String SETTING_READ_ONLY = "index.blocks.read_only";
Expand Down Expand Up @@ -240,10 +233,6 @@ static Setting<Integer> buildNumberOfShardsSetting() {
public static final String SETTING_DATA_PATH = "index.data_path";
public static final Setting<String> INDEX_DATA_PATH_SETTING =
new Setting<>(SETTING_DATA_PATH, "", Function.identity(), Property.IndexScope);
public static final String SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE = "index.shared_filesystem.recover_on_any_node";
public static final Setting<Boolean> INDEX_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE_SETTING =
Setting.boolSetting(SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, false,
Property.Dynamic, Property.IndexScope, Property.Deprecated);
public static final String INDEX_UUID_NA_VALUE = "_na_";

public static final String INDEX_ROUTING_REQUIRE_GROUP_PREFIX = "index.routing.allocation.require";
Expand Down Expand Up @@ -1237,35 +1226,6 @@ public static IndexMetaData fromXContent(XContentParser parser) throws IOExcepti
}
}

private static final DeprecationLogger deprecationLogger = new DeprecationLogger(ESLoggerFactory.getLogger(IndexMetaData.class));

/**
* Returns <code>true</code> iff the given settings indicate that the index
* associated with these settings allocates it's shards on a shared
* filesystem. Otherwise <code>false</code>. The default setting for this
* is the returned value from
* {@link #isIndexUsingShadowReplicas(org.elasticsearch.common.settings.Settings)}.
*/
public boolean isOnSharedFilesystem(Settings settings) {
// don't use the setting directly, not to trigger verbose deprecation logging
return settings.getAsBooleanLenientForPreEs6Indices(
this.indexCreatedVersion, SETTING_SHARED_FILESYSTEM, isIndexUsingShadowReplicas(settings), deprecationLogger);
}

/**
* Returns <code>true</code> iff the given settings indicate that the index associated
* with these settings uses shadow replicas. Otherwise <code>false</code>. The default
* setting for this is <code>false</code>.
*/
public boolean isIndexUsingShadowReplicas() {
return isIndexUsingShadowReplicas(this.settings);
}

public boolean isIndexUsingShadowReplicas(Settings settings) {
// don't use the setting directly, not to trigger verbose deprecation logging
return settings.getAsBooleanLenientForPreEs6Indices(this.indexCreatedVersion, SETTING_SHADOW_REPLICAS, false, deprecationLogger);
}

/**
* Adds human readable version and creation date settings.
* This method is used to display the settings in a human readable format in REST API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,10 +433,9 @@ public ClusterState execute(ClusterState currentState) throws Exception {
.put(indexMetaData, false)
.build();

String maybeShadowIndicator = indexMetaData.isIndexUsingShadowReplicas() ? "s" : "";
logger.info("[{}] creating index, cause [{}], templates {}, shards [{}]/[{}{}], mappings {}",
logger.info("[{}] creating index, cause [{}], templates {}, shards [{}]/[{}], mappings {}",
request.index(), request.cause(), templateNames, indexMetaData.getNumberOfShards(),
indexMetaData.getNumberOfReplicas(), maybeShadowIndicator, mappings.keySet());
indexMetaData.getNumberOfReplicas(), mappings.keySet());

ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
if (!request.blocks().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,7 @@ boolean validate(MetaData metaData) {
"allocation set " + inSyncAllocationIds);
}

if (indexMetaData.isIndexUsingShadowReplicas() == false && // see #20650
shardRouting.primary() && shardRouting.initializing() && shardRouting.relocating() == false &&
if (shardRouting.primary() && shardRouting.initializing() && shardRouting.relocating() == false &&
RecoverySource.isInitialRecovery(shardRouting.recoverySource().getType()) == false &&
inSyncAllocationIds.contains(shardRouting.allocationId().getId()) == false)
throw new IllegalStateException("a primary shard routing " + shardRouting + " is a primary that is recovering from " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,9 +567,6 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId
if (failedShard.primary()) {
// promote active replica to primary if active replica exists (only the case for shadow replicas)
ShardRouting activeReplica = activeReplica(failedShard.shardId());
assert activeReplica == null || indexMetaData.isIndexUsingShadowReplicas() :
"initializing primary [" + failedShard + "] with active replicas [" + activeReplica + "] only expected when " +
"using shadow replicas";
if (activeReplica == null) {
moveToUnassigned(failedShard, unassignedInfo);
} else {
Expand Down Expand Up @@ -626,10 +623,6 @@ private void promoteReplicaToPrimary(ShardRouting activeReplica, IndexMetaData i
assert activeReplica.started() : "replica relocation should have been cancelled: " + activeReplica;
ShardRouting primarySwappedCandidate = promoteActiveReplicaShardToPrimary(activeReplica);
routingChangesObserver.replicaPromoted(activeReplica);
if (indexMetaData.isIndexUsingShadowReplicas()) {
ShardRouting initializedShard = reinitShadowPrimary(primarySwappedCandidate);
routingChangesObserver.startedPrimaryReinitialized(primarySwappedCandidate, initializedShard);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,6 @@ public void apply(Settings value, Settings current, Settings previous) {
TribeService.TRIBE_NAME_SETTING,
NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING,
NodeEnvironment.ENABLE_LUCENE_SEGMENT_INFOS_TRACE_SETTING,
NodeEnvironment.ADD_NODE_LOCK_ID_TO_CUSTOM_PATH,
OsService.REFRESH_INTERVAL_SETTING,
ProcessService.REFRESH_INTERVAL_SETTING,
JvmService.REFRESH_INTERVAL_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,10 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING,
IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING,
IndexMetaData.INDEX_ROUTING_PARTITION_SIZE_SETTING,
IndexMetaData.INDEX_SHADOW_REPLICAS_SETTING,
IndexMetaData.INDEX_SHARED_FILESYSTEM_SETTING,
IndexMetaData.INDEX_READ_ONLY_SETTING,
IndexMetaData.INDEX_BLOCKS_READ_SETTING,
IndexMetaData.INDEX_BLOCKS_WRITE_SETTING,
IndexMetaData.INDEX_BLOCKS_METADATA_SETTING,
IndexMetaData.INDEX_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE_SETTING,
IndexMetaData.INDEX_PRIORITY_SETTING,
IndexMetaData.INDEX_DATA_PATH_SETTING,
SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG_SETTING,
Expand Down
13 changes: 1 addition & 12 deletions core/src/main/java/org/elasticsearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,6 @@ public String toString() {
public static final Setting<Integer> MAX_LOCAL_STORAGE_NODES_SETTING = Setting.intSetting("node.max_local_storage_nodes", 1, 1,
Property.NodeScope);

/**
* If true automatically append node lock id to custom data paths.
*/
public static final Setting<Boolean> ADD_NODE_LOCK_ID_TO_CUSTOM_PATH =
Setting.boolSetting("node.add_lock_id_to_custom_path", true, Property.NodeScope);


/**
* Seed for determining a persisted unique uuid of this node. If the node has already a persisted uuid on disk,
* this seed will be ignored and the uuid from disk will be reused.
Expand Down Expand Up @@ -922,11 +915,7 @@ public Path resolveBaseCustomLocation(IndexSettings indexSettings) {
if (customDataDir != null) {
// This assert is because this should be caught by MetaDataCreateIndexService
assert sharedDataPath != null;
if (ADD_NODE_LOCK_ID_TO_CUSTOM_PATH.get(indexSettings.getNodeSettings())) {
return sharedDataPath.resolve(customDataDir).resolve(Integer.toString(this.nodeLockId));
} else {
return sharedDataPath.resolve(customDataDir);
}
return sharedDataPath.resolve(customDataDir).resolve(Integer.toString(this.nodeLockId));
} else {
throw new IllegalArgumentException("no custom " + IndexMetaData.SETTING_DATA_PATH + " setting available");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,10 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unas
final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(unassignedShard.index());
final Set<String> inSyncAllocationIds = indexMetaData.inSyncAllocationIds(unassignedShard.id());
final boolean snapshotRestore = unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT;
final boolean recoverOnAnyNode = recoverOnAnyNode(indexMetaData);

assert inSyncAllocationIds.isEmpty() == false;
// use in-sync allocation ids to select nodes
final NodeShardsResult nodeShardsResult = buildNodeShardsResult(unassignedShard, snapshotRestore || recoverOnAnyNode,
final NodeShardsResult nodeShardsResult = buildNodeShardsResult(unassignedShard, snapshotRestore,
allocation.getIgnoreNodes(unassignedShard.shardId()), inSyncAllocationIds, shardState, logger);
final boolean enoughAllocationsFound = nodeShardsResult.orderedAllocationCandidates.size() > 0;
logger.debug("[{}][{}]: found {} allocation candidates of {} based on allocation ids: [{}]", unassignedShard.index(),
Expand All @@ -122,10 +121,6 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unas
logger.debug("[{}][{}]: missing local data, will restore from [{}]",
unassignedShard.index(), unassignedShard.id(), unassignedShard.recoverySource());
return AllocateUnassignedDecision.NOT_TAKEN;
} else if (recoverOnAnyNode) {
// let BalancedShardsAllocator take care of allocating this shard
logger.debug("[{}][{}]: missing local data, recover from any node", unassignedShard.index(), unassignedShard.id());
return AllocateUnassignedDecision.NOT_TAKEN;
} else {
// We have a shard that was previously allocated, but we could not find a valid shard copy to allocate the primary.
// We could just be waiting for the node that holds the primary to start back up, in which case the allocation for
Expand Down Expand Up @@ -331,19 +326,6 @@ private NodesToAllocate buildNodesToAllocate(RoutingAllocation allocation,
Collections.unmodifiableList(noNodeShards));
}

/**
* Return {@code true} if the index is configured to allow shards to be
* recovered on any node
*/
private boolean recoverOnAnyNode(IndexMetaData metaData) {
// don't use the setting directly, not to trigger verbose deprecation logging
return (metaData.isOnSharedFilesystem(metaData.getSettings()) || metaData.isOnSharedFilesystem(this.settings))
&& (metaData.getSettings().getAsBooleanLenientForPreEs6Indices(
metaData.getCreationVersion(), IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, false, deprecationLogger) ||
this.settings.getAsBooleanLenientForPreEs6Indices
(metaData.getCreationVersion(), IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, false, deprecationLogger));
}

protected abstract FetchResult<NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation);

private static class NodeShardsResult {
Expand Down
Loading

0 comments on commit 5cace8e

Please sign in to comment.