Skip to content

Commit

Permalink
Ignore shard started requests when primary term does not match (#37899)
Browse files Browse the repository at this point in the history
This commit changes the StartedShardEntry so that it also contains the
primary term of the shard to start. This way the master node can also
checks that the primary term from the start request is equal to the current
shard's primary term in the cluster state, and it can ignore any shard
started request that would concerns a previous instance of the shard that
would have been allocated to the same node.

Such situation are likely to happen with frozen (or restored) indices and
the replication of closed indices, because with replicated closed indices
the shards will be initialized again after the index is closed and can
potentially be re initialized again if the index is reopened as a frozen
index. In such cases the lifecycle of the shards would be something like:
* shard is STARTED
* index is closed
* shards is INITIALIZING (index state is CLOSED, primary term is X)
* index is reopened
* shards are INITIALIZING again (index state is OPENED, potentially frozen,
primary term is X+1)

Adding the primary term to the shard started request will allow to discard
potential StartedShardEntry requests received by the master node if the
request concerns the shard with primary term X because it has been
moved/reinitialized in the meanwhile under the primary term X+1.

Relates to #33888
  • Loading branch information
tlrx committed Jan 31, 2019
1 parent 18f5c7a commit 255015d
Show file tree
Hide file tree
Showing 7 changed files with 252 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -493,12 +493,20 @@ public int hashCode() {
}
}

public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener) {
shardStarted(shardRouting, message, listener, clusterService.state());
public void shardStarted(final ShardRouting shardRouting,
final long primaryTerm,
final String message,
final Listener listener) {
shardStarted(shardRouting, primaryTerm, message, listener, clusterService.state());
}
public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener, ClusterState currentState) {
StartedShardEntry shardEntry = new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), message);
sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, shardEntry, listener);

public void shardStarted(final ShardRouting shardRouting,
final long primaryTerm,
final String message,
final Listener listener,
final ClusterState currentState) {
StartedShardEntry entry = new StartedShardEntry(shardRouting.shardId(), shardRouting.allocationId().getId(), primaryTerm, message);
sendShardAction(SHARD_STARTED_ACTION_NAME, currentState, entry, listener);
}

private static class ShardStartedTransportHandler implements TransportRequestHandler<StartedShardEntry> {
Expand Down Expand Up @@ -543,7 +551,7 @@ public ClusterTasksResult<StartedShardEntry> execute(ClusterState currentState,
List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<>(tasks.size());
Set<ShardRouting> seenShardRoutings = new HashSet<>(); // to prevent duplicates
for (StartedShardEntry task : tasks) {
ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId);
final ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId);
if (matched == null) {
// tasks that correspond to non-existent shards are marked as successful. The reason is that we resend shard started
// events on every cluster state publishing that does not contain the shard as started yet. This means that old stale
Expand All @@ -552,6 +560,19 @@ public ClusterTasksResult<StartedShardEntry> execute(ClusterState currentState,
logger.debug("{} ignoring shard started task [{}] (shard does not exist anymore)", task.shardId, task);
builder.success(task);
} else {
if (matched.primary() && task.primaryTerm > 0) {
final IndexMetaData indexMetaData = currentState.metaData().index(task.shardId.getIndex());
assert indexMetaData != null;
final long currentPrimaryTerm = indexMetaData.primaryTerm(task.shardId.id());
if (currentPrimaryTerm != task.primaryTerm) {
assert currentPrimaryTerm > task.primaryTerm : "received a primary term with a higher term than in the " +
"current cluster state (received [" + task.primaryTerm + "] but current is [" + currentPrimaryTerm + "])";
logger.debug("{} ignoring shard started task [{}] (primary term {} does not match current term {})",
task.shardId, task, task.primaryTerm, currentPrimaryTerm);
builder.success(task);
continue;
}
}
if (matched.initializing() == false) {
assert matched.active() : "expected active shard routing for task " + task + " but found " + matched;
// same as above, this might have been a stale in-flight request, so we just ignore.
Expand Down Expand Up @@ -596,15 +617,20 @@ public void onFailure(String source, Exception e) {
public static class StartedShardEntry extends TransportRequest {
final ShardId shardId;
final String allocationId;
final long primaryTerm;
final String message;

StartedShardEntry(StreamInput in) throws IOException {
super(in);
shardId = ShardId.readShardId(in);
allocationId = in.readString();
if (in.getVersion().before(Version.V_6_3_0)) {
final long primaryTerm = in.readVLong();
primaryTerm = in.readVLong();
assert primaryTerm == UNASSIGNED_PRIMARY_TERM : "shard is only started by itself: primary term [" + primaryTerm + "]";
} else if (in.getVersion().onOrAfter(Version.V_6_7_0)) {
primaryTerm = in.readVLong();
} else {
primaryTerm = UNASSIGNED_PRIMARY_TERM;
}
this.message = in.readString();
if (in.getVersion().before(Version.V_6_3_0)) {
Expand All @@ -613,9 +639,10 @@ public static class StartedShardEntry extends TransportRequest {
}
}

public StartedShardEntry(ShardId shardId, String allocationId, String message) {
public StartedShardEntry(final ShardId shardId, final String allocationId, final long primaryTerm, final String message) {
this.shardId = shardId;
this.allocationId = allocationId;
this.primaryTerm = primaryTerm;
this.message = message;
}

Expand All @@ -626,6 +653,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(allocationId);
if (out.getVersion().before(Version.V_6_3_0)) {
out.writeVLong(0L);
} else if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
out.writeVLong(primaryTerm);
}
out.writeString(message);
if (out.getVersion().before(Version.V_6_3_0)) {
Expand All @@ -635,8 +664,8 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public String toString() {
return String.format(Locale.ROOT, "StartedShardEntry{shardId [%s], allocationId [%s], message [%s]}",
shardId, allocationId, message);
return String.format(Locale.ROOT, "StartedShardEntry{shardId [%s], allocationId [%s], primary term [%d], message [%s]}",
shardId, allocationId, primaryTerm, message);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,13 +575,14 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR
}

try {
logger.debug("{} creating shard", shardRouting.shardId());
final long primaryTerm = state.metaData().index(shardRouting.index()).primaryTerm(shardRouting.id());
logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm);
RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode);
indicesService.createShard(
shardRouting,
recoveryState,
recoveryTargetService,
new RecoveryListener(shardRouting),
new RecoveryListener(shardRouting, primaryTerm),
repositoriesService,
failedShardHandler,
globalCheckpointSyncer,
Expand All @@ -598,9 +599,10 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard
"local shard has a different allocation id but wasn't cleaning by removeShards. "
+ "cluster state: " + shardRouting + " local: " + currentRoutingEntry;

final long primaryTerm;
try {
final IndexMetaData indexMetaData = clusterState.metaData().index(shard.shardId().getIndex());
final long primaryTerm = indexMetaData.primaryTerm(shard.shardId().id());
primaryTerm = indexMetaData.primaryTerm(shard.shardId().id());
final Set<String> inSyncIds = indexMetaData.inSyncAllocationIds(shard.shardId().id());
final IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId());
final Set<String> pre60AllocationIds = indexShardRoutingTable.assignedShards()
Expand Down Expand Up @@ -633,7 +635,7 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard
shardRouting.shardId(), state, nodes.getMasterNode());
}
if (nodes.getMasterNode() != null) {
shardStateAction.shardStarted(shardRouting, "master " + nodes.getMasterNode() +
shardStateAction.shardStarted(shardRouting, primaryTerm, "master " + nodes.getMasterNode() +
" marked shard as initializing, but shard state is [" + state + "], mark shard as started",
SHARD_STATE_ACTION_LISTENER, clusterState);
}
Expand Down Expand Up @@ -673,15 +675,24 @@ private static DiscoveryNode findSourceNodeForPeerRecovery(Logger logger, Routin

private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener {

/**
* ShardRouting with which the shard was created
*/
private final ShardRouting shardRouting;

private RecoveryListener(ShardRouting shardRouting) {
/**
* Primary term with which the shard was created
*/
private final long primaryTerm;

private RecoveryListener(final ShardRouting shardRouting, final long primaryTerm) {
this.shardRouting = shardRouting;
this.primaryTerm = primaryTerm;
}

@Override
public void onRecoveryDone(RecoveryState state) {
shardStateAction.shardStarted(shardRouting, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
public void onRecoveryDone(final RecoveryState state) {
shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
}

@Override
Expand Down
Loading

0 comments on commit 255015d

Please sign in to comment.