Ensure only primary sender drives slot ownership updates #754

Merged · 9 commits · Jul 16, 2024
168 changes: 79 additions & 89 deletions src/cluster_legacy.c
@@ -2927,8 +2927,11 @@ int clusterProcessPacket(clusterLink *link) {
mstime_t now = mstime();

uint16_t flags = ntohs(hdr->flags);
uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
uint64_t sender_claimed_current_epoch = 0, sender_claimed_config_epoch = 0;
clusterNode *sender = getNodeFromLinkAndMsg(link, hdr);
int sender_claims_to_be_primary = !memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, CLUSTER_NAMELEN);
int sender_last_reported_as_replica = sender && nodeIsReplica(sender);
int sender_last_reported_as_primary = sender && nodeIsPrimary(sender);

if (sender && (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA)) {
sender->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED;
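The three new booleans draw a line between what the sender claims about itself in the message header and what this node last recorded locally. A minimal sketch of why both views matter (illustrative only; `failover_just_happened` is not a variable in the patch):

```c
/* Hypothetical timeline, using the names introduced above:
 *   t0: we last recorded node B as a replica
 *         -> sender_last_reported_as_replica == 1
 *   t1: B wins a failover and PINGs us with a null replicaof field
 *         -> sender_claims_to_be_primary == 1
 * Seeing both at once is exactly the "failover happened in the
 * sender's shard" signal the restructured slot checks below rely on. */
int failover_just_happened = sender_claims_to_be_primary && sender_last_reported_as_replica;
(void)failover_just_happened; /* sketch only */
```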
@@ -2942,13 +2945,13 @@ int clusterProcessPacket(clusterLink *link) {

if (sender && !nodeInHandshake(sender)) {
/* Update our currentEpoch if we see a newer epoch in the cluster. */
senderCurrentEpoch = ntohu64(hdr->currentEpoch);
senderConfigEpoch = ntohu64(hdr->configEpoch);
if (senderCurrentEpoch > server.cluster->currentEpoch) server.cluster->currentEpoch = senderCurrentEpoch;
sender_claimed_current_epoch = ntohu64(hdr->currentEpoch);
sender_claimed_config_epoch = ntohu64(hdr->configEpoch);
if (sender_claimed_current_epoch > server.cluster->currentEpoch)
server.cluster->currentEpoch = sender_claimed_current_epoch;
/* Update the sender configEpoch if it is a primary publishing a newer one. */
if (!memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, sizeof(hdr->replicaof)) &&
senderConfigEpoch > sender->configEpoch) {
sender->configEpoch = senderConfigEpoch;
if (sender_claims_to_be_primary && sender_claimed_config_epoch > sender->configEpoch) {
sender->configEpoch = sender_claimed_config_epoch;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_FSYNC_CONFIG);
}
/* Update the replication offset info for this node. */
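The epoch handling above applies two distinct rules: `currentEpoch` is a cluster-wide logical clock that only ratchets forward, while a node's `configEpoch` may only be raised through gossip when the sender itself claims to be a primary. Condensed as an illustrative helper (not a function in the tree; it assumes the file's existing `server` and `clusterNode` definitions):

```c
/* Sketch: the two epoch rules enforced by the hunk above. */
static void apply_epoch_rules_sketch(clusterNode *sender,
                                     uint64_t claimed_current_epoch,
                                     uint64_t claimed_config_epoch,
                                     int claims_to_be_primary) {
    /* Rule 1: currentEpoch is a cluster-wide logical clock; adopt any
     * newer value we observe and never move backwards. */
    if (claimed_current_epoch > server.cluster->currentEpoch)
        server.cluster->currentEpoch = claimed_current_epoch;
    /* Rule 2: configEpoch tracks slot ownership; only a self-claimed
     * primary may publish a newer one for itself. */
    if (claims_to_be_primary && claimed_config_epoch > sender->configEpoch)
        sender->configEpoch = claimed_config_epoch;
}
```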
@@ -3110,36 +3113,36 @@ int clusterProcessPacket(clusterLink *link) {
/* Check for role switch: replica -> primary or primary -> replica. */
if (sender) {
serverLog(LL_DEBUG, "node %.40s (%s) announces that it is a %s in shard %.40s", sender->name,
sender->human_nodename,
!memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, sizeof(hdr->replicaof)) ? "primary" : "replica",
sender->shard_id);
if (!memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, sizeof(hdr->replicaof))) {
sender->human_nodename, sender_claims_to_be_primary ? "primary" : "replica", sender->shard_id);
if (sender_claims_to_be_primary) {
/* Node is a primary. */
clusterSetNodeAsPrimary(sender);
} else {
/* Node is a replica. */
clusterNode *primary = clusterLookupNode(hdr->replicaof, CLUSTER_NAMELEN);
clusterNode *sender_claimed_primary = clusterLookupNode(hdr->replicaof, CLUSTER_NAMELEN);

if (clusterNodeIsPrimary(sender)) {
if (sender_last_reported_as_primary) {
/* Primary turned into a replica! Reconfigure the node. */
if (primary && areInSameShard(primary, sender)) {
if (sender_claimed_primary && areInSameShard(sender_claimed_primary, sender)) {
/* `sender` was a primary and was in the same shard as its new primary */
if (sender->configEpoch > senderConfigEpoch) {
if (sender->configEpoch > sender_claimed_config_epoch) {
serverLog(LL_NOTICE,
"Ignore stale message from %.40s (%s) in shard %.40s;"
" gossip config epoch: %llu, current config epoch: %llu",
sender->name, sender->human_nodename, sender->shard_id,
(unsigned long long)senderConfigEpoch, (unsigned long long)sender->configEpoch);
(unsigned long long)sender_claimed_config_epoch,
(unsigned long long)sender->configEpoch);
} else {
/* `primary` is still a `replica` in this observer node's view;
* update its role and configEpoch */
clusterSetNodeAsPrimary(primary);
primary->configEpoch = senderConfigEpoch;
clusterSetNodeAsPrimary(sender_claimed_primary);
sender_claimed_primary->configEpoch = sender_claimed_config_epoch;
serverLog(LL_NOTICE,
"A failover occurred in shard %.40s; node %.40s (%s)"
" failed over to node %.40s (%s) with a config epoch of %llu",
sender->shard_id, sender->name, sender->human_nodename, primary->name,
primary->human_nodename, (unsigned long long)primary->configEpoch);
sender->shard_id, sender->name, sender->human_nodename,
sender_claimed_primary->name, sender_claimed_primary->human_nodename,
(unsigned long long)sender_claimed_primary->configEpoch);
}
} else {
/* `sender` was moved to another shard and has become a replica, remove its slot assignment */
Expand All @@ -3148,9 +3151,9 @@ int clusterProcessPacket(clusterLink *link) {
"Node %.40s (%s) is no longer primary of shard %.40s;"
" removed all %d slot(s) it used to own",
sender->name, sender->human_nodename, sender->shard_id, slots);
if (primary != NULL) {
if (sender_claimed_primary != NULL) {
serverLog(LL_NOTICE, "Node %.40s (%s) is now part of shard %.40s", sender->name,
sender->human_nodename, primary->shard_id);
sender->human_nodename, sender_claimed_primary->shard_id);
}
}

Expand All @@ -3162,17 +3165,17 @@ int clusterProcessPacket(clusterLink *link) {
}

/* Primary node changed for this replica? */
if (primary && sender->replicaof != primary) {
if (sender_claimed_primary && sender->replicaof != sender_claimed_primary) {
if (sender->replicaof) clusterNodeRemoveReplica(sender->replicaof, sender);
serverLog(LL_NOTICE, "Node %.40s (%s) is now a replica of node %.40s (%s) in shard %.40s",
sender->name, sender->human_nodename, primary->name, primary->human_nodename,
sender->shard_id);
clusterNodeAddReplica(primary, sender);
sender->replicaof = primary;
sender->name, sender->human_nodename, sender_claimed_primary->name,
sender_claimed_primary->human_nodename, sender->shard_id);
clusterNodeAddReplica(sender_claimed_primary, sender);
sender->replicaof = sender_claimed_primary;

/* Update the shard_id when a replica is connected to its
* primary for the very first time. */
updateShardId(sender, primary->shard_id);
updateShardId(sender, sender_claimed_primary->shard_id);

/* Update config. */
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
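Taken together, the stale-gossip guard and the re-parenting above boil down to the following observer-side rule for a node that was last known as a primary and now claims to be a replica. This is a readability sketch, not a helper in the file; it assumes the existing `areInSameShard`, `clusterSetNodeAsPrimary`, and `clusterDelNodeSlots` helpers:

```c
/* Sketch: observer-side handling of a "primary turned replica" claim. */
static void observe_primary_turned_replica_sketch(clusterNode *sender,
                                                  clusterNode *claimed_primary,
                                                  uint64_t claimed_config_epoch) {
    if (claimed_primary && areInSameShard(claimed_primary, sender)) {
        /* Gossip carrying an older configEpoch than our record of the
         * sender describes the past; ignore it. */
        if (sender->configEpoch > claimed_config_epoch) return;
        /* Otherwise record the failover: promote the claimed primary in
         * our local view and adopt its advertised config epoch. */
        clusterSetNodeAsPrimary(claimed_primary);
        claimed_primary->configEpoch = claimed_config_epoch;
    } else {
        /* The sender left its shard entirely; its old slot claims are void. */
        clusterDelNodeSlots(sender);
    }
}
```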
Expand All @@ -3187,66 +3190,41 @@ int clusterProcessPacket(clusterLink *link) {

/* Many checks are only needed if the set of served slots this
* instance claims is different compared to the set of slots we have
* for it. Check this ASAP to avoid other computationally expensive
* checks later. */
clusterNode *sender_primary = NULL; /* Sender or its primary if replica. */
int dirty_slots = 0; /* Sender claimed slots don't match my view? */

if (sender) {
sender_primary = clusterNodeIsPrimary(sender) ? sender : sender->replicaof;
if (sender_primary) {
dirty_slots = memcmp(sender_primary->slots, hdr->myslots, sizeof(hdr->myslots)) != 0;

/* Force dirty when the sending shard owns no slots so that
* we have a chance to examine and repair slot migrating/importing
* states that involve empty shards. */
dirty_slots |= sender_primary->numslots == 0;
}
}

/* 1) If the sender of the message is a primary, and we detected that
* the set of slots it claims changed, scan the slots to see if we
* need to update our configuration. */
if (sender_primary && dirty_slots)
clusterUpdateSlotsConfigWith(sender_primary, senderConfigEpoch, hdr->myslots);

/* Explicitly check for a replication loop before attempting the replication
* chain folding logic. */
if (myself->replicaof && myself->replicaof->replicaof && myself->replicaof->replicaof != myself) {
/* Safeguard against sub-replicas. A replica's primary can turn itself
* into a replica if its last slot is removed. If no other node takes
* over the slot, there is nothing else to trigger replica migration. */
serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s",
myself->replicaof->replicaof->name, myself->replicaof->name);
clusterSetPrimary(myself->replicaof->replicaof, 1);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
}

/* 2) We also check for the reverse condition, that is, the sender
* claims to serve slots we know are served by a primary with a
* greater configEpoch. If this happens we inform the sender.
*
* This is useful because sometimes after a partition heals, a
* reappearing primary may be the last one to claim a given set of
* hash slots, but with a configuration that other instances know to
* be deprecated. Example:
*
* A and B are primary and replica for slots 1,2,3.
* A is partitioned away, B gets promoted.
* B is partitioned away, and A returns available.
*
* Usually B would PING A publishing its set of served slots and its
* configEpoch, but because of the partition B can't inform A of the
* new configuration, so other nodes that have an updated table must
* do it. In this way A will stop acting as a primary (or can try to
* fail over if the conditions to win the election are met). */
if (sender && dirty_slots) {
int j;

for (j = 0; j < CLUSTER_SLOTS; j++) {
* for it or if there was a failover in the sender's shard. Check
* this ASAP to avoid other computationally expensive checks later. */

if (sender && sender_claims_to_be_primary &&
(sender_last_reported_as_replica || memcmp(sender->slots, hdr->myslots, sizeof(hdr->myslots)))) {
/* Make sure CLUSTER_NODE_PRIMARY has already been set by now on sender */
serverAssert(nodeIsPrimary(sender));

/* 1) If the sender of the message is a primary, and we detected that
* the set of slots it claims changed, scan the slots to see if we
* need to update our configuration. */
clusterUpdateSlotsConfigWith(sender, sender_claimed_config_epoch, hdr->myslots);

/* 2) We also check for the reverse condition, that is, the sender
* claims to serve slots we know are served by a primary with a
* greater configEpoch. If this happens we inform the sender.
*
* This is useful because sometimes after a partition heals, a
* reappearing primary may be the last one to claim a given set of
* hash slots, but with a configuration that other instances know to
* be deprecated. Example:
*
* A and B are primary and replica for slots 1,2,3.
* A is partitioned away, B gets promoted.
* B is partitioned away, and A returns available.
*
* Usually B would PING A publishing its set of served slots and its
* configEpoch, but because of the partition B can't inform A of the
* new configuration, so other nodes that have an updated table must
* do it. In this way A will stop acting as a primary (or can try to
* fail over if the conditions to win the election are met). */
for (int j = 0; j < CLUSTER_SLOTS; j++) {
if (bitmapTestBit(hdr->myslots, j)) {
if (server.cluster->slots[j] == sender || isSlotUnclaimed(j)) continue;
if (server.cluster->slots[j]->configEpoch > senderConfigEpoch) {
if (server.cluster->slots[j]->configEpoch > sender_claimed_config_epoch) {
serverLog(LL_VERBOSE,
"Node %.40s has old slots configuration, sending "
"an UPDATE message about %.40s",
Expand All @@ -3262,10 +3240,22 @@ int clusterProcessPacket(clusterLink *link) {
}
}

/* Explicitly check for a replication loop before attempting the replication
* chain folding logic. */
if (myself->replicaof && myself->replicaof->replicaof && myself->replicaof->replicaof != myself) {
/* Safeguard against sub-replicas. A replica's primary can turn itself
* into a replica if its last slot is removed. If no other node takes
* over the slot, there is nothing else to trigger replica migration. */
serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s",
myself->replicaof->replicaof->name, myself->replicaof->name);
clusterSetPrimary(myself->replicaof->replicaof, 1);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
}

/* If our config epoch collides with the sender's try to fix
* the problem. */
if (sender && clusterNodeIsPrimary(myself) && clusterNodeIsPrimary(sender) &&
senderConfigEpoch == myself->configEpoch) {
if (sender && nodeIsPrimary(myself) && nodeIsPrimary(sender) &&
sender_claimed_config_epoch == myself->configEpoch) {
clusterHandleConfigEpochCollision(sender);
}

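`clusterHandleConfigEpochCollision` itself is outside this diff. For context, its resolution rule can be sketched as follows, under the assumption (based on the upstream implementation, worth double-checking) that the side with the lexicographically smaller node ID mints itself a fresh epoch:

```c
/* Sketch of the collision rule; not part of this patch. */
static void handle_config_epoch_collision_sketch(clusterNode *sender) {
    if (sender->configEpoch != myself->configEpoch ||
        !nodeIsPrimary(sender) || !nodeIsPrimary(myself)) return;
    /* Act only if our node ID is the smaller one; otherwise the sender
     * resolves the collision on its side. */
    if (memcmp(sender->name, myself->name, CLUSTER_NAMELEN) <= 0) return;
    /* Bump the cluster clock and take the new value as a unique configEpoch. */
    server.cluster->currentEpoch++;
    myself->configEpoch = server.cluster->currentEpoch;
}
```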
@@ -3317,7 +3307,7 @@ int clusterProcessPacket(clusterLink *link) {
/* We consider this vote only if the sender is a primary serving
* a non zero number of slots, and its currentEpoch is greater or
* equal to epoch where this node started the election. */
if (clusterNodeIsVotingPrimary(sender) && senderCurrentEpoch >= server.cluster->failover_auth_epoch) {
if (clusterNodeIsVotingPrimary(sender) && sender_claimed_current_epoch >= server.cluster->failover_auth_epoch) {
server.cluster->failover_auth_count++;
/* Maybe we reached a quorum here, set a flag to make sure
* we check ASAP. */
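The heart of the change, condensed from the removed and added fragments above (both appear verbatim in the diff; only the juxtaposition is new): previously a replica's gossip could drive slot-table updates on behalf of its primary through our possibly stale `replicaof` pointer; now only a sender that itself claims to be a primary can drive them.

```c
/* Before (removed): slot updates could run via the sender's primary. */
clusterNode *sender_primary = clusterNodeIsPrimary(sender) ? sender : sender->replicaof;
if (sender_primary && dirty_slots)
    clusterUpdateSlotsConfigWith(sender_primary, senderConfigEpoch, hdr->myslots);

/* After (added): only the self-claimed primary drives the update, either
 * because its slot bitmap differs from our view or because we just saw
 * it flip from replica to primary (a failover in its shard). */
if (sender && sender_claims_to_be_primary &&
    (sender_last_reported_as_replica ||
     memcmp(sender->slots, hdr->myslots, sizeof(hdr->myslots))))
    clusterUpdateSlotsConfigWith(sender, sender_claimed_config_epoch, hdr->myslots);
```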
6 changes: 3 additions & 3 deletions tests/unit/cluster/hostnames.tcl
@@ -156,18 +156,18 @@ test "Verify the nodes configured with prefer hostname only show hostname for ne
# to accept our isolated nodes connections. At this point they will
# start showing up in cluster slots.
wait_for_condition 50 100 {
[llength [R 6 CLUSTER SLOTS]] eq 3
[llength [R 6 CLUSTER SLOTS]] eq 2
} else {
fail "Node did not learn about the 2 shards it can talk to"
}
wait_for_condition 50 100 {
[lindex [get_slot_field [R 6 CLUSTER SLOTS] 1 2 3] 1] eq "shard-1.com"
[lindex [get_slot_field [R 6 CLUSTER SLOTS] 0 2 3] 1] eq "shard-1.com"
} else {
fail "hostname for shard-1 didn't reach node 6"
}

wait_for_condition 50 100 {
[lindex [get_slot_field [R 6 CLUSTER SLOTS] 2 2 3] 1] eq "shard-2.com"
[lindex [get_slot_field [R 6 CLUSTER SLOTS] 1 2 3] 1] eq "shard-2.com"
} else {
fail "hostname for shard-2 didn't reach node 6"
}