Skip to content

Commit

Permalink
ensure migrate slots transmit fresh config snapshot before entering m…
Browse files Browse the repository at this point in the history
…igrating state
  • Loading branch information
vazois committed Oct 24, 2024
1 parent a542d89 commit a71ffad
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 26 deletions.
4 changes: 2 additions & 2 deletions libs/cluster/Server/ClusterManagerWorkerState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public bool TryRemoveWorker(string nodeid, int expirySeconds, out ReadOnlySpan<b
{
try
{
PauseConfigMerge();
SuspendConfigMerge();
errorMessage = default;
while (true)
{
Expand Down Expand Up @@ -98,7 +98,7 @@ public ReadOnlySpan<byte> TryReset(bool soft, int expirySeconds = 60)
{
try
{
PauseConfigMerge();
SuspendConfigMerge();
var resp = CmdStrings.RESP_OK;
while (true)
{
Expand Down
19 changes: 10 additions & 9 deletions libs/cluster/Server/Gossip.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ public MetricsItem[] GetPrimaryLinkStatus(ClusterConfig config)
/// Called when a FORGET op executes; waits until ongoing merge operations complete before executing FORGET.
/// Multiple FORGET ops can execute at the same time.
/// </summary>
void PauseConfigMerge() => activeMergeLock.WriteLock();
public void SuspendConfigMerge() => activeMergeLock.WriteLock();

/// <summary>
/// Resume config merge
/// </summary>
void ResumeConfigMerge() => activeMergeLock.WriteUnlock();
public void ResumeConfigMerge() => activeMergeLock.WriteUnlock();

/// <summary>
/// Initiate meet and main gossip tasks
Expand All @@ -99,11 +99,11 @@ void TryStartGossipTasks()
/// <summary>
/// Merge incoming config to evolve local version
/// </summary>
public bool TryMerge(ClusterConfig senderConfig)
public bool TryMerge(ClusterConfig senderConfig, bool acquireLock = true)
{
try
{
activeMergeLock.ReadLock();
if (acquireLock) activeMergeLock.ReadLock();
if (workerBanList.ContainsKey(senderConfig.LocalNodeId))
{
logger?.LogTrace("Cannot merge node <{nodeid}> because still in ban list", senderConfig.LocalNodeId);
Expand All @@ -124,7 +124,7 @@ public bool TryMerge(ClusterConfig senderConfig)
}
finally
{
activeMergeLock.ReadUnlock();
if (acquireLock) activeMergeLock.ReadUnlock();
}
}

Expand All @@ -140,9 +140,10 @@ public void RunMeetTask(string address, int port)
/// This task will immediately communicate with the new node and try to merge the retrieved configuration into its own.
/// If the node to meet was previously in the ban list, it will not be added to the cluster.
/// </summary>
/// <param name="address"></param>
/// <param name="port"></param>
void TryMeet(string address, int port)
/// <param name="address">Address of node to issue meet to</param>
/// <param name="port">Port of node to issue meet to</param>
/// <param name="acquireLock">Whether to acquire lock for merging. Default true</param>
public void TryMeet(string address, int port, bool acquireLock = true)
{
GarnetServerNode gsn = null;
var conf = CurrentConfig;
Expand Down Expand Up @@ -176,7 +177,7 @@ void TryMeet(string address, int port)

logger?.LogInformation("MEET {nodeId} {address} {port}", nodeId, address, port);
// Merge without a check because node is trusted as meet was issued by admin
_ = TryMerge(other);
_ = TryMerge(other, acquireLock);

gossipStats.UpdateMeetRequestsSucceed();

Expand Down
29 changes: 21 additions & 8 deletions libs/cluster/Server/Migration/MigrationDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ public bool TryStartMigrationTask(out ReadOnlySpan<byte> errorMessage)
/// </summary>
private void BeginAsyncMigrationTask()
{
var configResumed = true;
try
{

//1. Set target node to import state
// Set target node to import state
if (!TrySetSlotRanges(GetSourceNodeId, MigrateState.IMPORT))
{
logger?.LogError("Failed to set remote slots {slots} to import state", ClusterManager.GetRange([.. GetSlots]));
Expand All @@ -64,7 +64,7 @@ private void BeginAsyncMigrationTask()
}

#region transitionLocalSlotToMigratingState
//2. Set source node to migrating state and wait for local threads to see changed state.
// Set source node to migrating state and wait for local threads to see changed state.
if (!TryPrepareLocalForMigration())
{
logger?.LogError("Failed to set local slots {slots} to migrate state", string.Join(',', GetSlots));
Expand All @@ -77,7 +77,7 @@ private void BeginAsyncMigrationTask()
#endregion

#region migrateData
//3. Migrate actual data
// Migrate actual data
if (!MigrateSlotsDriver())
{
logger?.LogError("MigrateSlotsDriver failed");
Expand All @@ -88,7 +88,12 @@ private void BeginAsyncMigrationTask()
#endregion

#region transferSlotOwnnershipToTargetNode
//5. Change ownership of slots to target node.
// Lock config merge to avoid a background epoch bump
clusterProvider.clusterManager.SuspendConfigMerge();
configResumed = false;
clusterProvider.clusterManager.TryMeet(_targetAddress, _targetPort, acquireLock: false);

// Change ownership of slots to target node.
if (!TrySetSlotRanges(GetTargetNodeId, MigrateState.NODE))
{
logger?.LogError("Failed to assign ownership to target node:({tgtNodeId}) ({endpoint})", GetTargetNodeId, GetTargetEndpoint);
Expand All @@ -97,17 +102,24 @@ private void BeginAsyncMigrationTask()
return;
}

//6. Clear local migration set.
// Clear local migration set.
if (!RelinquishOwnership())
{
logger?.LogError("Failed to relinquish ownership from source node:({srcNode}) to target node: ({tgtNode})", GetSourceNodeId, GetTargetNodeId);
TryRecoverFromFailure();
Status = MigrateState.FAIL;
return;
}

// Gossip again to ensure that source and target agree on the slot exchange
clusterProvider.clusterManager.TryMeet(_targetAddress, _targetPort, acquireLock: false);

// Ensure that config merge resumes
clusterProvider.clusterManager.ResumeConfigMerge();
configResumed = true;
#endregion

//7. Enqueue success log
// Enqueue success log
Status = MigrateState.SUCCESS;
}
catch (Exception ex)
Expand All @@ -116,7 +128,8 @@ private void BeginAsyncMigrationTask()
}
finally
{
clusterProvider.migrationManager.TryRemoveMigrationTask(this);
if (!configResumed) clusterProvider.clusterManager.ResumeConfigMerge();
_ = clusterProvider.migrationManager.TryRemoveMigrationTask(this);
}
}
}
Expand Down
8 changes: 1 addition & 7 deletions playground/MigrateBench/MigrateSlotWalk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,7 @@ public class MigrateSlotWalk
public MigrateSlotWalk(Options opts, ILogger logger = null)
{
this.opts = opts;

var endpoints = opts.Endpoints.ToArray();

if (endpoints.Length != 3)
throw new Exception("Exactly 3 nodes are needed for this scenario");

nodes = new Node[endpoints.Length];
for (var i = 0; i < nodes.Length; i++)
{
Expand All @@ -39,7 +34,6 @@ public MigrateSlotWalk(Options opts, ILogger logger = null)
nodes[i].port = endpoint[1];
nodes[i].gc = new GarnetClientSession(nodes[i].address, int.Parse(nodes[i].port), new NetworkBufferSettings(1 << 22));
}

this.logger = logger;
}

Expand Down Expand Up @@ -102,8 +96,8 @@ public void Run()
// n0 MIGRATING slot
resp = nodes[_src].gc.ExecuteAsync(migrating).GetAwaiter().GetResult();

// Send data if any to the target node
getkeysinslot[3] = nodes[_src].gc.ExecuteAsync(countkeysinslot).GetAwaiter().GetResult();

if (int.Parse(getkeysinslot[3]) > 0)
{
var keys = nodes[_src].gc.ExecuteForArrayAsync(getkeysinslot).GetAwaiter().GetResult();
Expand Down

0 comments on commit a71ffad

Please sign in to comment.