Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ActivationMigrationManager to batch grain activation migrations #8474

Merged
merged 2 commits into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Orleans.Core/Runtime/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ internal static class Constants
public static readonly GrainType StreamPullingAgentManagerType = SystemTargetGrainId.CreateGrainType("stream.agent-mgr");
public static readonly GrainType StreamPullingAgentType = SystemTargetGrainId.CreateGrainType("stream.agent");
public static readonly GrainType ManifestProviderType = SystemTargetGrainId.CreateGrainType("manifest");
public static readonly GrainType ActivationMigratorType = SystemTargetGrainId.CreateGrainType("migrator");

public static readonly GrainId SiloDirectConnectionId = GrainId.Create(
GrainType.Create(GrainTypePrefix.SystemPrefix + "silo"),
Expand Down
15 changes: 0 additions & 15 deletions src/Orleans.Core/SystemTargetInterfaces/ICatalog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,5 @@ internal interface ICatalog : ISystemTarget
/// <param name="reasonText"></param>
/// <returns></returns>
Task DeleteActivations(List<GrainAddress> activationAddresses, DeactivationReasonCode reasonCode, string reasonText);

/// <summary>
/// Accepts migrating grains.
/// </summary>
ValueTask AcceptMigratingGrains(List<GrainMigrationPackage> migratingGrains);
}

[GenerateSerializer, Immutable]
internal struct GrainMigrationPackage
{
[Id(0)]
public GrainId GrainId { get; set; }

[Id(1)]
public MigrationContext MigrationContext { get; set; }
}
}
13 changes: 6 additions & 7 deletions src/Orleans.Runtime/Catalog/ActivationData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace Orleans.Runtime
/// </summary>
internal sealed class ActivationData : IGrainContext, ICollectibleGrainContext, IGrainExtensionBinder, IActivationWorkingSetMember, IGrainTimerRegistry, IGrainManagementExtension, ICallChainReentrantGrainContext, IAsyncDisposable
{
private const string GrainAddressMigrationContextKey = "sys.addr";
private readonly GrainTypeSharedContext _shared;
private readonly IServiceScope _serviceScope;
private readonly WorkItemGroup _workItemGroup;
Expand Down Expand Up @@ -1078,7 +1079,7 @@ private void RehydrateInternal(IRehydrationContext context)
throw new InvalidOperationException($"Attempted to rehydrate a grain in the {State} state");
}

if (context.TryGetValue("sys.addr", out GrainAddress previousRegistration) && previousRegistration is not null)
if (context.TryGetValue(GrainAddressMigrationContextKey, out GrainAddress previousRegistration) && previousRegistration is not null)
{
// Propagate the previous registration, so that the new activation can atomically replace it with its new address.
(_extras ??= new()).PreviousRegistration = previousRegistration;
Expand Down Expand Up @@ -1138,7 +1139,7 @@ private void OnDehydrate(IDehydrationContext context)

if (IsUsingGrainDirectory)
{
context.TryAddValue("sys.addr", Address);
context.TryAddValue(GrainAddressMigrationContextKey, Address);
}
}

Expand Down Expand Up @@ -1604,7 +1605,7 @@ private async Task FinishDeactivating(CancellationToken cancellationToken)
await WaitForAllTimersToFinish(cancellationToken);
await CallGrainDeactivate(cancellationToken);

if (DehydrationContext is { } context)
if (DehydrationContext is { } context && _shared.MigrationManager is { } migrationManager)
{
Debug.Assert(ForwardingAddress is not null);

Expand All @@ -1622,10 +1623,8 @@ private async Task FinishDeactivating(CancellationToken cancellationToken)

OnDehydrate(context.Value);

// Send the dehydration context to the target host
// TODO: Coalesce concurrent requests into fewer calls by delegating to a shared service.
var remoteCatalog = _shared.Runtime.ServiceProvider.GetRequiredService<IInternalGrainFactory>().GetSystemTarget<ICatalog>(Constants.CatalogType, ForwardingAddress);
await remoteCatalog.AcceptMigratingGrains(new List<GrainMigrationPackage>() { new GrainMigrationPackage() { GrainId = GrainId, MigrationContext = context.Value } });
// Send the dehydration context to the target host.
await migrationManager.MigrateAsync(ForwardingAddress, GrainId, context.Value);
migrated = true;
}
catch (Exception exception)
Expand Down
320 changes: 320 additions & 0 deletions src/Orleans.Runtime/Catalog/ActivationMigrationManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,320 @@
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ObjectPool;
using Orleans.Runtime.Internal;
using Orleans.Runtime.Scheduler;

namespace Orleans.Runtime;

/// <summary>
/// Remote interface for migrating grain activations to a silo.
/// </summary>
internal interface IActivationMigrationManagerSystemTarget : ISystemTarget
{
/// <summary>
/// Accepts migrating grains on a best-effort basis.
/// </summary>
ValueTask AcceptMigratingGrains([Immutable] List<GrainMigrationPackage> migratingGrains);
}

[GenerateSerializer, Immutable]
internal struct GrainMigrationPackage
{
[Id(0)]
public GrainId GrainId { get; set; }

[Id(1)]
public MigrationContext MigrationContext { get; set; }
}

/// <summary>
/// Functionality for migrating an activation to a new location.
/// </summary>
internal interface IActivationMigrationManager
{
/// <summary>
/// Attempts to migrate a grain to the specified target.
/// </summary>
/// <param name="target">The migration target.</param>
/// <param name="grainId">The grain being migrated.</param>
/// <param name="migrationContext">Information about the grain being migrated, which will be consumed by the new activation.</param>
ValueTask MigrateAsync(SiloAddress target, GrainId grainId, MigrationContext migrationContext);
}

/// <summary>
/// Migrates grain activations to target hosts and handles migration requests from other hosts.
/// </summary>
internal class ActivationMigrationManager : SystemTarget, IActivationMigrationManagerSystemTarget, IActivationMigrationManager
{
private const int MaxBatchSize = 1_000;
private readonly ConcurrentDictionary<SiloAddress, (Task PumpTask, Channel<MigrationWorkItem> WorkItemChannel)> _workers = new();
private readonly ObjectPool<MigrationWorkItem> _workItemPool = ObjectPool.Create(new MigrationWorkItem.ObjectPoolPolicy());
private readonly ILogger<ActivationMigrationManager> _logger;
private readonly IInternalGrainFactory _grainFactory;
private readonly Catalog _catalog;
private readonly IClusterMembershipService _clusterMembershipService;
private readonly object _lock = new();

#pragma warning disable IDE0052 // Remove unread private members. Justification: this field is only for diagnostic purposes.
private readonly Task? _membershipUpdatesTask;
#pragma warning restore IDE0052 // Remove unread private members

public ActivationMigrationManager(
ILocalSiloDetails localSiloDetails,
ILoggerFactory loggerFactory,
IInternalGrainFactory grainFactory,
Catalog catalog,
IClusterMembershipService clusterMembershipService) : base(Constants.ActivationMigratorType, localSiloDetails.SiloAddress, loggerFactory)
{
_grainFactory = grainFactory;
_logger = loggerFactory.CreateLogger<ActivationMigrationManager>();
_catalog = catalog;
_clusterMembershipService = clusterMembershipService;
_catalog.RegisterSystemTarget(this);

{
using var _ = new ExecutionContextSuppressor();
_membershipUpdatesTask = Task.Factory.StartNew(
state => ((ActivationMigrationManager)state!).ProcessMembershipUpdates(),
this,
CancellationToken.None,
TaskCreationOptions.None,
WorkItemGroup.TaskScheduler).Unwrap();
_membershipUpdatesTask.Ignore();
}
}

public ValueTask AcceptMigratingGrains(List<GrainMigrationPackage> migratingGrains)
{
foreach (var package in migratingGrains)
{
// If the activation does not exist, create it and provide it with the migration context while doing so.
// If the activation already exists or cannot be created, it is too late to perform migration, so ignore the request.
_catalog.GetOrCreateActivation(package.GrainId, requestContextData: null, package.MigrationContext);
}

return default;
}

public ValueTask MigrateAsync(SiloAddress targetSilo, GrainId grainId, MigrationContext migrationContext)
{
var workItem = _workItemPool.Get();
var migrationPackage = new GrainMigrationPackage { GrainId = grainId, MigrationContext = migrationContext };
workItem.Initialize(migrationPackage);
var workItemWriter = GetOrCreateWorker(targetSilo);
if (!workItemWriter.TryWrite(workItem))
{
workItem.SetException(new SiloUnavailableException($"Silo {targetSilo} is no longer active"));
}

return workItem.AsValueTask();
}

private async Task ProcessMembershipUpdates()
{
await Task.Yield();

try
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Monitoring cluster membership updates");
}

var previousSnapshot = _clusterMembershipService.CurrentSnapshot;
await foreach (var snapshot in _clusterMembershipService.MembershipUpdates)
{
try
{
var diff = snapshot.CreateUpdate(previousSnapshot);
previousSnapshot = snapshot;
foreach (var change in diff.Changes)
{
if (change.Status.IsTerminating())
{
RemoveWorker(change.SiloAddress);
}
}
}
catch (Exception exception)
{
_logger.LogError(exception, "Error processing cluster membership updates");
}
}
}
finally
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("No longer monitoring cluster membership updates");
}
}
}

private async Task PumpMigrationQueue(SiloAddress targetSilo, Channel<MigrationWorkItem> workItems)
{
try
{
var remote = _grainFactory.GetSystemTarget<IActivationMigrationManagerSystemTarget>(Constants.ActivationMigratorType, targetSilo);
await Task.Yield();

if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Starting migration worker for target silo {SiloAddress}", targetSilo);
}

var items = new List<MigrationWorkItem>();
var batch = new List<GrainMigrationPackage>();
var reader = workItems.Reader;
while (await reader.WaitToReadAsync())
{
try
{
// Collect a batch of work items.
while (batch.Count < MaxBatchSize && reader.TryRead(out var workItem))
{
items.Add(workItem);
batch.Add(workItem.Value);
}

// Attempt to migrate the batch.
await remote.AcceptMigratingGrains(batch);

foreach (var item in items)
{
item.SetCompleted();
}

if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Migrated {Count} activations to target silo {SiloAddress}", items.Count, targetSilo);
}
}
catch (Exception exception)
{
_logger.LogError(exception, "Error while migrating {Count} grain activations to {SiloAddress}", items.Count, targetSilo);

foreach (var item in items)
{
item.SetException(exception);
}

// If the silo is terminating, we should stop trying to migrate activations to it.
if (_clusterMembershipService.CurrentSnapshot.GetSiloStatus(targetSilo).IsTerminating())
{
break;
}
}
finally
{
items.Clear();
batch.Clear();
}
}

// Remove ourselves and clean up.
RemoveWorker(targetSilo);
}
finally
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Exiting migration worker for target silo {SiloAddress}", targetSilo);
}
}
}

private ChannelWriter<MigrationWorkItem> GetOrCreateWorker(SiloAddress targetSilo)
{
if (!_workers.TryGetValue(targetSilo, out var entry))
{
lock (_lock)
{
if (!_workers.TryGetValue(targetSilo, out entry))
{
using var _ = new ExecutionContextSuppressor();
var channel = Channel.CreateUnbounded<MigrationWorkItem>();
var pumpTask = Task.Factory.StartNew(
() => PumpMigrationQueue(targetSilo, channel),
CancellationToken.None,
TaskCreationOptions.None,
WorkItemGroup.TaskScheduler).Unwrap();
pumpTask.Ignore();

entry = (pumpTask, channel);
var didAdd = _workers.TryAdd(targetSilo, entry);
Debug.Assert(didAdd);
}
}
}

return entry.WorkItemChannel.Writer;
}

private void RemoveWorker(SiloAddress targetSilo)
{
if (_workers.TryRemove(targetSilo, out var entry))
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Target silo {SiloAddress} is no longer active, so this migration activation worker is terminating", targetSilo);
}

entry.WorkItemChannel.Writer.TryComplete();

var exception = new SiloUnavailableException($"Silo {targetSilo} is no longer active");
while (entry.WorkItemChannel.Reader.TryRead(out var item))
{
item.SetException(exception);
}
}
}

private class MigrationWorkItem : IValueTaskSource
{
private ManualResetValueTaskSourceCore<int> _core = new() { RunContinuationsAsynchronously = true };
private GrainMigrationPackage _migrationPackage;

public void Initialize(GrainMigrationPackage package) => _migrationPackage = package;
public void Reset() => _core.Reset();

public GrainMigrationPackage Value => _migrationPackage;

public void SetCompleted() => _core.SetResult(0);
public void SetException(Exception exception) => _core.SetException(exception);
public ValueTask AsValueTask() => new (this, _core.Version);

public void GetResult(short token)
{
try
{
_core.GetResult(token);
}
finally
{
Reset();
}
}

public ValueTaskSourceStatus GetStatus(short token) => _core.GetStatus(token);
public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _core.OnCompleted(continuation, state, token, flags);

public sealed class ObjectPoolPolicy : IPooledObjectPolicy<MigrationWorkItem>
{
public MigrationWorkItem Create() => new();
public bool Return(MigrationWorkItem obj)
{
obj.Reset();
return true;
}
}
}
}
Loading