From e0d75e08a09bf8e2118171fb4392b8ed46c15346 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Tue, 27 Feb 2024 11:57:16 -0800 Subject: [PATCH] Fix #8873 StatelessWorker MaxLocal property not being correctly accounted for --- .../GrainDirectory/GrainDirectoryResolver.cs | 2 +- .../Hosting/DefaultSiloServices.cs | 3 +- .../Hosting/DirectorySiloBuilderExtensions.cs | 45 ++-- src/Orleans.Runtime/Hosting/NamedService.cs | 16 +- .../Hosting/PlacementStrategyExtensions.cs | 29 ++- .../Placement/PlacementStrategyResolver.cs | 24 +- .../IPlacementTestGrain.cs | 12 +- .../TestInternalGrains/PlacementTestGrain.cs | 70 ++++-- .../GrainPlacementClusterChangeTests.cs | 62 +++++ .../General/GrainPlacementTests.cs | 231 +++--------------- 10 files changed, 215 insertions(+), 279 deletions(-) create mode 100644 test/TesterInternal/General/GrainPlacementClusterChangeTests.cs diff --git a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryResolver.cs b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryResolver.cs index 3ecb1781ce2..c608686e73c 100644 --- a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryResolver.cs +++ b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryResolver.cs @@ -29,7 +29,7 @@ public GrainDirectoryResolver( var services = serviceProvider.GetGrainDirectories(); foreach (var svc in services) { - this.directoryPerName.Add(svc.Name, svc.Service); + this.directoryPerName.Add(svc.Name, serviceProvider.GetRequiredKeyedService(svc.Name)); } this.directoryPerName.TryGetValue(GrainDirectoryAttribute.DEFAULT_GRAIN_DIRECTORY, out var defaultDirectory); diff --git a/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs b/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs index 98165b244a5..5f7c1b95658 100644 --- a/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs +++ b/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs @@ -200,8 +200,7 @@ internal static void AddDefaultServices(ISiloBuilder builder) // Placement directors services.AddPlacementDirector(); services.AddPlacementDirector(); - services.AddPlacementDirector(); - services.Replace(new ServiceDescriptor(typeof(StatelessWorkerPlacement), sp => new StatelessWorkerPlacement(), ServiceLifetime.Singleton)); + services.AddPlacementDirector(ServiceLifetime.Transient); services.AddPlacementDirector(); services.AddPlacementDirector(); services.AddPlacementDirector(); diff --git a/src/Orleans.Runtime/Hosting/DirectorySiloBuilderExtensions.cs b/src/Orleans.Runtime/Hosting/DirectorySiloBuilderExtensions.cs index 6d559b98bae..e1f1c7ebf30 100644 --- a/src/Orleans.Runtime/Hosting/DirectorySiloBuilderExtensions.cs +++ b/src/Orleans.Runtime/Hosting/DirectorySiloBuilderExtensions.cs @@ -1,8 +1,6 @@ +#nullable enable using System; using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Orleans.GrainDirectory; using Orleans.Hosting; @@ -21,7 +19,7 @@ public static class DirectorySiloBuilderExtensions /// Factory to build the grain directory provider. /// The silo builder. public static ISiloBuilder AddGrainDirectory(this ISiloBuilder builder, string name, Func implementationFactory) - where T : IGrainDirectory + where T : class, IGrainDirectory { builder.Services.AddGrainDirectory(name, implementationFactory); return builder; @@ -37,38 +35,29 @@ public static ISiloBuilder AddGrainDirectory(this ISiloBuilder builder, strin /// Factory to build the grain directory provider. /// The service collection. public static IServiceCollection AddGrainDirectory(this IServiceCollection collection, string name, Func implementationFactory) - where T : IGrainDirectory + where T : class, IGrainDirectory { - collection.AddSingleton(sp => new NamedService(name, implementationFactory(sp, name))); - // Check if the grain directory implements ILifecycleParticipant - if (typeof(ILifecycleParticipant).IsAssignableFrom(typeof(T))) - { - collection.AddSingleton(s => (ILifecycleParticipant)s.GetGrainDirectory(name)); - } + // Register the grain directory name so that directories can be enumerated by name. + collection.AddSingleton(sp => new NamedService(name)); + + // Register the grain directory implementation. + collection.AddKeyedSingleton(name, (sp, key) => implementationFactory(sp, name)); + collection.AddSingleton>(s => + s.GetKeyedService(name) as ILifecycleParticipant ?? NoOpLifecycleParticipant.Instance); + return collection; } - /// - /// Get the directory registered with . - /// - /// The service provider. - /// The name of the grain directory to resolve. - /// The grain directory registered with , or null if it is not found - public static IGrainDirectory GetGrainDirectory(this IServiceProvider serviceProvider, string name) + internal static IEnumerable> GetGrainDirectories(this IServiceProvider serviceProvider) { - foreach (var directory in serviceProvider.GetGrainDirectories()) - { - if (directory.Name.Equals(name)) - { - return directory.Service; - } - } - return null; + return serviceProvider.GetServices>() ?? []; } - internal static IEnumerable> GetGrainDirectories(this IServiceProvider serviceProvider) + private sealed class NoOpLifecycleParticipant : ILifecycleParticipant { - return serviceProvider.GetServices>() ?? Enumerable.Empty>(); + public static readonly NoOpLifecycleParticipant Instance = new(); + + public void Participate(ISiloLifecycle observer) { } } } } diff --git a/src/Orleans.Runtime/Hosting/NamedService.cs b/src/Orleans.Runtime/Hosting/NamedService.cs index a987f238242..19c31719d26 100644 --- a/src/Orleans.Runtime/Hosting/NamedService.cs +++ b/src/Orleans.Runtime/Hosting/NamedService.cs @@ -1,15 +1,9 @@ -namespace Orleans.Runtime.Hosting +using System; + +namespace Orleans.Runtime.Hosting { - internal class NamedService + internal class NamedService(string name) { - public NamedService(string name, TService service) - { - Name= name; - Service = service; - } - - public string Name { get; } - - public TService Service { get; } + public string Name { get; } = name; } } diff --git a/src/Orleans.Runtime/Hosting/PlacementStrategyExtensions.cs b/src/Orleans.Runtime/Hosting/PlacementStrategyExtensions.cs index b30bb0091ee..abbcb9dd217 100644 --- a/src/Orleans.Runtime/Hosting/PlacementStrategyExtensions.cs +++ b/src/Orleans.Runtime/Hosting/PlacementStrategyExtensions.cs @@ -1,7 +1,6 @@ using System; using Microsoft.Extensions.DependencyInjection; using Orleans.Runtime; -using Orleans.Runtime.Hosting; using Orleans.Runtime.Placement; namespace Orleans.Hosting @@ -46,14 +45,25 @@ public static ISiloBuilder AddPlacementDirector(this ISiloBuilder bui /// The service collection. /// The service collection. public static void AddPlacementDirector(this IServiceCollection services) + where TStrategy : PlacementStrategy, new() + where TDirector : class, IPlacementDirector => services.AddPlacementDirector(ServiceLifetime.Singleton); + + /// + /// Configures a as the placement director for placement strategy . + /// + /// The placement strategy. + /// The placement director. + /// The service collection. + /// The lifetime of the placement strategy. + /// The service collection. + public static void AddPlacementDirector(this IServiceCollection services, ServiceLifetime strategyLifetime) where TStrategy : PlacementStrategy, new() where TDirector : class, IPlacementDirector { - services.AddSingleton(new NamedService(typeof(TStrategy).Name, new TStrategy())); + services.Add(ServiceDescriptor.DescribeKeyed(typeof(PlacementStrategy), typeof(TStrategy).Name, typeof(TStrategy), strategyLifetime)); services.AddKeyedSingleton(typeof(TStrategy)); } - /// /// Adds a placement director. /// @@ -62,9 +72,20 @@ public static void AddPlacementDirector(this IServiceColle /// The delegate used to create the placement director. /// The service collection. public static void AddPlacementDirector(this IServiceCollection services, Func createDirector) + where TStrategy : PlacementStrategy, new() => services.AddPlacementDirector(createDirector, ServiceLifetime.Singleton); + + /// + /// Adds a placement director. + /// + /// The placement strategy. + /// The service collection. + /// The delegate used to create the placement director. + /// The lifetime of the placement strategy. + /// The service collection. + public static void AddPlacementDirector(this IServiceCollection services, Func createDirector, ServiceLifetime strategyLifetime) where TStrategy : PlacementStrategy, new() { - services.AddSingleton(new NamedService(typeof(TStrategy).Name, new TStrategy())); + services.Add(ServiceDescriptor.DescribeKeyed(typeof(PlacementStrategy), typeof(TStrategy).Name, typeof(TStrategy), strategyLifetime)); services.AddKeyedSingleton(typeof(TStrategy), (sp, type) => createDirector(sp)); } } diff --git a/src/Orleans.Runtime/Placement/PlacementStrategyResolver.cs b/src/Orleans.Runtime/Placement/PlacementStrategyResolver.cs index ba12be3f2ce..6f95a838aba 100644 --- a/src/Orleans.Runtime/Placement/PlacementStrategyResolver.cs +++ b/src/Orleans.Runtime/Placement/PlacementStrategyResolver.cs @@ -6,6 +6,8 @@ using System.Collections.Immutable; using System.Collections.Concurrent; using Orleans.Runtime.Hosting; +using System.Collections.Frozen; +using Orleans.GrainDirectory; namespace Orleans.Runtime.Placement { @@ -18,8 +20,8 @@ public sealed class PlacementStrategyResolver private readonly Func _getStrategyInternal; private readonly IPlacementStrategyResolver[] _resolvers; private readonly GrainPropertiesResolver _grainPropertiesResolver; - private readonly ImmutableDictionary _strategies; private readonly PlacementStrategy _defaultPlacementStrategy; + private readonly IServiceProvider _services; /// /// Create a instance. @@ -29,22 +31,11 @@ public PlacementStrategyResolver( IEnumerable resolvers, GrainPropertiesResolver grainPropertiesResolver) { + _services = services; _getStrategyInternal = GetPlacementStrategyInternal; _resolvers = resolvers.ToArray(); _grainPropertiesResolver = grainPropertiesResolver; _defaultPlacementStrategy = services.GetService(); - _strategies = GetAllStrategies(services); - - static ImmutableDictionary GetAllStrategies(IServiceProvider services) - { - var builder = ImmutableDictionary.CreateBuilder(); - foreach (var service in services.GetServices>()) - { - builder[service.Name] = service.Service; - } - - return builder.ToImmutable(); - } } /// @@ -52,7 +43,7 @@ static ImmutableDictionary GetAllStrategies(IServiceP /// public PlacementStrategy GetPlacementStrategy(GrainType grainType) => _resolvedStrategies.GetOrAdd(grainType, _getStrategyInternal); - internal bool TryGetNonDefaultPlacementStrategy(GrainType grainType, out PlacementStrategy strategy) + private bool TryGetNonDefaultPlacementStrategy(GrainType grainType, out PlacementStrategy strategy) { _grainPropertiesResolver.TryGetGrainProperties(grainType, out var properties); @@ -68,14 +59,15 @@ internal bool TryGetNonDefaultPlacementStrategy(GrainType grainType, out Placeme && properties.Properties.TryGetValue(WellKnownGrainTypeProperties.PlacementStrategy, out var placementStrategyId) && !string.IsNullOrWhiteSpace(placementStrategyId)) { - if (_strategies.TryGetValue(placementStrategyId, out strategy)) + strategy = _services.GetKeyedService(placementStrategyId); + if (strategy is not null) { strategy.Initialize(properties); return true; } else { - throw new KeyNotFoundException($"Could not resolve placement strategy {placementStrategyId} for grain type {grainType}"); + throw new KeyNotFoundException($"Could not resolve placement strategy {placementStrategyId} for grain type {grainType}."); } } diff --git a/test/Grains/TestGrainInterfaces/IPlacementTestGrain.cs b/test/Grains/TestGrainInterfaces/IPlacementTestGrain.cs index c7feab1aee5..4d8bd21592f 100644 --- a/test/Grains/TestGrainInterfaces/IPlacementTestGrain.cs +++ b/test/Grains/TestGrainInterfaces/IPlacementTestGrain.cs @@ -1,4 +1,4 @@ -using System.Net; +using System.Net; using Orleans.Runtime; namespace UnitTests.GrainInterfaces @@ -29,8 +29,14 @@ public interface IRandomPlacementTestGrain : IPlacementTestGrain public interface IPreferLocalPlacementTestGrain : IPlacementTestGrain { } - public interface ILocalPlacementTestGrain : IPlacementTestGrain - { } + public interface IStatelessWorkerPlacementTestGrain : IPlacementTestGrain + { + ValueTask GetWorkerLimit(); + } + + public interface IOtherStatelessWorkerPlacementTestGrain : IStatelessWorkerPlacementTestGrain + { + } internal interface IDefaultPlacementTestGrain { diff --git a/test/Grains/TestInternalGrains/PlacementTestGrain.cs b/test/Grains/TestInternalGrains/PlacementTestGrain.cs index 61bba48b510..5a76481c824 100644 --- a/test/Grains/TestInternalGrains/PlacementTestGrain.cs +++ b/test/Grains/TestInternalGrains/PlacementTestGrain.cs @@ -10,6 +10,7 @@ using Orleans.Runtime.TestHooks; using Orleans.Statistics; using UnitTests.GrainInterfaces; +using Xunit; namespace UnitTests.Grains { @@ -58,7 +59,7 @@ public Task Nop() public Task StartLocalGrains(List keys) { // we call Nop() on the grain references to ensure that they're instantiated before the promise is delivered. - var grains = keys.Select(i => GrainFactory.GetGrain(i)); + var grains = keys.Select(i => GrainFactory.GetGrain(i)); var promises = grains.Select(g => g.Nop()); return Task.WhenAll(promises); } @@ -70,7 +71,7 @@ public async Task StartPreferLocalGrain(Guid key) return key; } - private static IEnumerable> SampleLocalGrainEndpoint(ILocalPlacementTestGrain grain, int sampleSize) + private static IEnumerable> SampleLocalGrainEndpoint(IStatelessWorkerPlacementTestGrain grain, int sampleSize) { for (var i = 0; i < sampleSize; ++i) yield return grain.GetEndpoint(); @@ -78,20 +79,18 @@ private static IEnumerable> SampleLocalGrainEndpoint(ILocalPlac public async Task> SampleLocalGrainEndpoint(Guid key, int sampleSize) { - var grain = GrainFactory.GetGrain(key); - var p = await Task.WhenAll(SampleLocalGrainEndpoint(grain, sampleSize)); + var grain = GrainFactory.GetGrain(key); + var p = await Task.WhenAll(SampleLocalGrainEndpoint(grain, sampleSize)); return p.ToList(); } - private static async Task PropigateStatisticsToCluster(IGrainFactory grainFactory) + private static async Task PropagateStatisticsToCluster(IGrainFactory grainFactory) { - // force the latched statistics to propigate throughout the cluster. - IManagementGrain mgmtGrain = - grainFactory.GetGrain(0); - - var hosts = await mgmtGrain.GetHosts(true); + // force the latched statistics to propagate throughout the cluster. + var managementGrain = grainFactory.GetGrain(0); + var hosts = await managementGrain.GetHosts(true); var keys = hosts.Select(kvp => kvp.Key).ToArray(); - await mgmtGrain.ForceRuntimeStatisticsCollection(keys); + await managementGrain.ForceRuntimeStatisticsCollection(keys); } public Task EnableOverloadDetection(bool enabled) @@ -104,28 +103,28 @@ public Task LatchOverloaded() { var stats = environmentStatistics.GetEnvironmentStatistics(); environmentStatistics.SetHardwareStatistics(new(loadSheddingOptions.CpuThreshold + 1, stats.MemoryUsageBytes, stats.AvailableMemoryBytes, stats.MaximumAvailableMemoryBytes)); - return PropigateStatisticsToCluster(GrainFactory); + return PropagateStatisticsToCluster(GrainFactory); } public Task UnlatchOverloaded() { var stats = environmentStatistics.GetEnvironmentStatistics(); environmentStatistics.SetHardwareStatistics(new(0, stats.MemoryUsageBytes, stats.AvailableMemoryBytes, stats.MaximumAvailableMemoryBytes)); - return PropigateStatisticsToCluster(GrainFactory); + return PropagateStatisticsToCluster(GrainFactory); } public Task LatchCpuUsage(float value) { var stats = environmentStatistics.GetEnvironmentStatistics(); environmentStatistics.SetHardwareStatistics(new(value, stats.MemoryUsageBytes, stats.AvailableMemoryBytes, stats.MaximumAvailableMemoryBytes)); - return PropigateStatisticsToCluster(GrainFactory); + return PropagateStatisticsToCluster(GrainFactory); } public Task UnlatchCpuUsage() { var stats = environmentStatistics.GetEnvironmentStatistics(); environmentStatistics.SetHardwareStatistics(new(0, stats.MemoryUsageBytes, stats.AvailableMemoryBytes, stats.MaximumAvailableMemoryBytes)); - return PropigateStatisticsToCluster(GrainFactory); + return PropagateStatisticsToCluster(GrainFactory); } public Task GetLocation() @@ -160,10 +159,10 @@ public PreferLocalPlacementTestGrain( } } - [StatelessWorker] - internal class LocalPlacementTestGrain : PlacementTestGrainBase, ILocalPlacementTestGrain + [StatelessWorker(1)] + internal class StatelessWorkerPlacementTestGrain : PlacementTestGrainBase, IStatelessWorkerPlacementTestGrain { - public LocalPlacementTestGrain( + public StatelessWorkerPlacementTestGrain( OverloadDetector overloadDetector, TestHooksEnvironmentStatisticsProvider hostEnvironmentStatistics, IOptions loadSheddingOptions, @@ -171,6 +170,41 @@ public LocalPlacementTestGrain( : base(overloadDetector, hostEnvironmentStatistics, loadSheddingOptions, grainContext) { } + + public ValueTask GetWorkerLimit() + { + var placementStrategy = GrainContext.GetComponent(); + if (placementStrategy is not StatelessWorkerPlacement statelessWorkerPlacement) + { + throw new InvalidOperationException($"Unexpected placement strategy: {placementStrategy}"); + } + + return new(statelessWorkerPlacement.MaxLocal); + } + } + + [StatelessWorker(2)] + internal class OtherStatelessWorkerPlacementTestGrain : PlacementTestGrainBase, IOtherStatelessWorkerPlacementTestGrain + { + public OtherStatelessWorkerPlacementTestGrain( + OverloadDetector overloadDetector, + TestHooksEnvironmentStatisticsProvider hostEnvironmentStatistics, + IOptions loadSheddingOptions, + IGrainContext grainContext) + : base(overloadDetector, hostEnvironmentStatistics, loadSheddingOptions, grainContext) + { + } + + public ValueTask GetWorkerLimit() + { + var placementStrategy = GrainContext.GetComponent(); + if (placementStrategy is not StatelessWorkerPlacement statelessWorkerPlacement) + { + throw new InvalidOperationException($"Unexpected placement strategy: {placementStrategy}"); + } + + return new(statelessWorkerPlacement.MaxLocal); + } } [ActivationCountBasedPlacement] diff --git a/test/TesterInternal/General/GrainPlacementClusterChangeTests.cs b/test/TesterInternal/General/GrainPlacementClusterChangeTests.cs new file mode 100644 index 00000000000..26c1606f173 --- /dev/null +++ b/test/TesterInternal/General/GrainPlacementClusterChangeTests.cs @@ -0,0 +1,62 @@ +using System.Net; +using Orleans.TestingHost; +using TestExtensions; +using UnitTests.GrainInterfaces; +using Xunit; +using Xunit.Abstractions; + +namespace UnitTests.General +{ + public sealed class GrainPlacementClusterChangeTests(ITestOutputHelper output) : TestClusterPerTest + { + [Theory] + [InlineData("Primary")] + [InlineData("Secondary")] + [TestCategory("BVT"), TestCategory("Placement")] + public async Task PreferLocalPlacementGrain_ShouldMigrateWhenHostSiloKilled(string value) + { + foreach (SiloHandle silo in HostedCluster.GetActiveSilos()) + { + output.WriteLine( + "Silo {0} : Address = {1} Proxy gateway: {2}", + silo.Name, silo.SiloAddress, silo.GatewayAddress); + } + + IPEndPoint targetSilo; + if (value == "Primary") + { + targetSilo = HostedCluster.Primary.SiloAddress.Endpoint; + } + else + { + targetSilo = HostedCluster.SecondarySilos.First().SiloAddress.Endpoint; + } + + Guid proxyKey; + IRandomPlacementTestGrain proxy; + IPEndPoint expected; + do + { + proxyKey = Guid.NewGuid(); + proxy = GrainFactory.GetGrain(proxyKey); + expected = await proxy.GetEndpoint(); + } while (!targetSilo.Equals(expected)); + output.WriteLine("Proxy grain was originally located on silo {0}", expected); + + Guid grainKey = proxyKey; + await proxy.StartPreferLocalGrain(grainKey); + IPreferLocalPlacementTestGrain grain = GrainFactory.GetGrain(grainKey); + IPEndPoint actual = await grain.GetEndpoint(); + output.WriteLine("PreferLocalPlacement grain was originally located on silo {0}", actual); + Assert.Equal(expected, actual); // "PreferLocalPlacement strategy should create activations on the local silo." + + SiloHandle siloToKill = HostedCluster.GetActiveSilos().First(s => s.SiloAddress.Endpoint.Equals(expected)); + output.WriteLine("Killing silo {0} hosting locally placed grain", siloToKill); + await HostedCluster.StopSiloAsync(siloToKill); + + IPEndPoint newActual = await grain.GetEndpoint(); + output.WriteLine("PreferLocalPlacement grain was recreated on silo {0}", newActual); + Assert.NotEqual(expected, newActual); // "PreferLocalPlacement strategy should recreate activations on other silo if local fails." + } + } +} diff --git a/test/TesterInternal/General/GrainPlacementTests.cs b/test/TesterInternal/General/GrainPlacementTests.cs index ede2214ec29..6d0c9cf86f8 100644 --- a/test/TesterInternal/General/GrainPlacementTests.cs +++ b/test/TesterInternal/General/GrainPlacementTests.cs @@ -1,63 +1,29 @@ -using System.Net; -using Microsoft.Extensions.Logging; using Orleans.Runtime; -using Orleans.TestingHost; using TestExtensions; using UnitTests.GrainInterfaces; using Xunit; -using Xunit.Abstractions; namespace UnitTests.General { - public class GrainPlacementTests : TestClusterPerTest + public class GrainPlacementTests(DefaultClusterFixture fixture) : IClassFixture { - private readonly ITestOutputHelper output; - - public GrainPlacementTests(ITestOutputHelper output) - { - this.output = output; - output.WriteLine("GrainPlacementTests - constructor"); - } - - protected override void ConfigureTestCluster(TestClusterBuilder builder) - { - builder.AddSiloBuilderConfigurator(); - } - - private class SiloConfigurator : ISiloConfigurator - { - public void Configure(ISiloBuilder hostBuilder) - { - hostBuilder.AddMemoryGrainStorage("MemoryStore") - .AddMemoryGrainStorageAsDefault(); - } - } - + private readonly DefaultClusterFixture _fixture = fixture; [Fact, TestCategory("Placement"), TestCategory("Functional")] public async Task DefaultPlacementShouldBeRandom() { - logger.LogInformation("********************** Starting the test DefaultPlacementShouldBeRandom ******************************"); - TestSilosStarted(2); - - var actual = await GrainFactory.GetGrain(GetRandomGrainId()).GetDefaultPlacement(); + var actual = await _fixture.GrainFactory.GetGrain(Random.Shared.Next()).GetDefaultPlacement(); Assert.IsType(actual); } [Fact, TestCategory("Placement"), TestCategory("Functional")] public async Task RandomlyPlacedGrainShouldPlaceActivationsRandomly() { - await this.HostedCluster.WaitForLivenessToStabilizeAsync(); - logger.LogInformation("********************** Starting the test RandomlyPlacedGrainShouldPlaceActivationsRandomly ******************************"); - TestSilosStarted(2); - - logger.LogInformation("********************** TestSilosStarted passed OK. ******************************"); - var grains = Enumerable.Range(0, 20). Select( n => - this.GrainFactory.GetGrain(Guid.NewGuid())); + _fixture.GrainFactory.GetGrain(Guid.NewGuid())); List places = new(); foreach (var grain in grains) { @@ -96,16 +62,12 @@ public async Task RandomlyPlacedGrainShouldPlaceActivationsRandomly() [Fact, TestCategory("Placement"), TestCategory("Functional")] public async Task PreferLocalPlacedGrainShouldPlaceActivationsLocally_TwoHops() { - await this.HostedCluster.WaitForLivenessToStabilizeAsync(); - logger.LogInformation("********************** Starting the test PreferLocalPlacedGrainShouldPlaceActivationsLocally ******************************"); - TestSilosStarted(2); - int numGrains = 20; var randomGrains = Enumerable.Range(0, numGrains). Select( n => - this.GrainFactory.GetGrain(Guid.NewGuid())).ToList(); + _fixture.GrainFactory.GetGrain(Guid.NewGuid())).ToList(); var randomGrainPlaces = new List(); foreach (var grain in randomGrains) { @@ -121,7 +83,7 @@ public async Task PreferLocalPlacedGrainShouldPlaceActivationsLocally_TwoHops() var preferLocalGrainPlaces = new List(); foreach (var key in preferLocalGrainKeys) { - preferLocalGrainPlaces.Add(await this.GrainFactory.GetGrain(key).GetRuntimeInstanceId()); + preferLocalGrainPlaces.Add(await _fixture.GrainFactory.GetGrain(key).GetRuntimeInstanceId()); } // check that every "prefer local grain" was placed on the same silo with its requesting random grain @@ -133,53 +95,51 @@ public async Task PreferLocalPlacedGrainShouldPlaceActivationsLocally_TwoHops() } } - private IEnumerable SampleEndpoint(IPlacementTestGrain grain, int sampleSize) + private async Task> CollectActivationIds(IPlacementTestGrain grain, int sampleSize) { + var tasks = new List>(sampleSize); for (var i = 0; i < sampleSize; ++i) - yield return grain.GetEndpoint().Result; - } + { + tasks.Add(grain.GetActivationId()); + } - private IEnumerable CollectActivationIds(IPlacementTestGrain grain, int sampleSize) - { - for (var i = 0; i < sampleSize; ++i) - yield return grain.GetActivationId().Result; + await Task.WhenAll(tasks); + return tasks.Select(t => t.Result).ToList(); } - private int ActivationCount(IEnumerable ids) + private async Task ActivationCount(IPlacementTestGrain grain, int sampleSize) { - return ids.GroupBy(id => id).Count(); + var activations = await CollectActivationIds(grain, sampleSize); + return activations.Distinct().Count(); } - private int ActivationCount(IPlacementTestGrain grain, int sampleSize) + [Fact, TestCategory("Placement"), TestCategory("BVT")] + public async Task StatelessWorkerShouldCreateSpecifiedActivationCount() { - return ActivationCount(CollectActivationIds(grain, sampleSize)); - } + { + // note: this amount should agree with both the specified minimum and maximum in the StatelessWorkerPlacement attribute + // associated with ILocalPlacementTestGrain. + const int expected = 1; + var grain = _fixture.GrainFactory.GetGrain(Guid.NewGuid()); + int actual = await ActivationCount(grain, expected * 50); + Assert.True(actual <= expected, $"Created more activations than the specified limit: {actual} > {expected}."); + } - //[Fact, TestCategory("Placement"), TestCategory("Functional")] - /*public async Task LocallyPlacedGrainShouldCreateSpecifiedNumberOfMultipleActivations() - { - await this.HostedCluster.WaitForLivenessToStabilizeAsync(); - logger.Info("********************** Starting the test LocallyPlacedGrainShouldCreateSpecifiedNumberOfMultipleActivations ******************************"); - TestSilosStarted(2); + { + const int expected = 2; + var grain = _fixture.GrainFactory.GetGrain(Guid.NewGuid()); + int actual = await ActivationCount(grain, expected * 50); + Assert.True(actual <= expected, $"Created more activations than the specified limit: {actual} > {expected}."); - // note: this amount should agree with both the specified minimum and maximum in the StatelessWorkerPlacement attribute - // associated with ILocalPlacementTestGrain. - const int expected = 10; - var grain = this.GrainFactory.GetGrain(Guid.Empty); - int actual = ActivationCount(grain, expected * 5); - Assert.Equal(expected, actual); //"A grain instantiated with the local placement strategy should create multiple activations acording to the parameterization of the strategy." - }*/ + } + } [Fact, TestCategory("Placement"), TestCategory("Functional")] - public async Task LocallyPlacedGrainShouldCreateActivationsOnLocalSilo() + public async Task StatelessWorkerGrainShouldCreateActivationsOnLocalSilo() { - await this.HostedCluster.WaitForLivenessToStabilizeAsync(); - logger.LogInformation("********************** Starting the test LocallyPlacedGrainShouldCreateActivationsOnLocalSilo ******************************"); - TestSilosStarted(2); - const int sampleSize = 5; var placement = new StatelessWorkerPlacement(sampleSize); - var proxy = this.GrainFactory.GetGrain(Guid.NewGuid()); + var proxy = _fixture.GrainFactory.GetGrain(Guid.NewGuid()); await proxy.StartLocalGrains(new List { Guid.Empty }); var expected = await proxy.GetEndpoint(); // locally placed grains are multi-activation and stateless. this means that we have to sample the value of @@ -188,126 +148,5 @@ public async Task LocallyPlacedGrainShouldCreateActivationsOnLocalSilo() Assert.True(actual.All(expected.Equals), "A grain instantiated with the local placement strategy should create activations on the local silo."); } - - [Theory(Skip = "Repo test case for gateway silo connection issue #1859")] - [InlineData("Primary")] - [InlineData("Secondary")] - [TestCategory("BVT"), TestCategory("Placement")] - public async Task PreferLocalPlacementGrain_ShouldMigrateWhenHostSiloKilled(string value) - { - await HostedCluster.WaitForLivenessToStabilizeAsync(); - output.WriteLine("******************** Starting test ({0}) ********************", value); - TestSilosStarted(2); - - foreach (SiloHandle silo in HostedCluster.GetActiveSilos()) - { - output.WriteLine( - "Silo {0} : Address = {1} Proxy gateway: {2}", - silo.Name, silo.SiloAddress, silo.GatewayAddress); - } - - IPEndPoint targetSilo; - if (value == "Primary") - { - targetSilo = HostedCluster.Primary.SiloAddress.Endpoint; - } - else - { - targetSilo = HostedCluster.SecondarySilos.First().SiloAddress.Endpoint; - } - Guid proxyKey; - IRandomPlacementTestGrain proxy; - IPEndPoint expected; - do - { - proxyKey = Guid.NewGuid(); - proxy = GrainFactory.GetGrain(proxyKey); - expected = await proxy.GetEndpoint(); - } while (!targetSilo.Equals(expected)); - output.WriteLine("Proxy grain was originally located on silo {0}", expected); - - Guid grainKey = proxyKey; - await proxy.StartPreferLocalGrain(grainKey); - IPreferLocalPlacementTestGrain grain = GrainFactory.GetGrain(grainKey); - IPEndPoint actual = await grain.GetEndpoint(); - output.WriteLine("PreferLocalPlacement grain was originally located on silo {0}", actual); - Assert.Equal(expected, actual); // "PreferLocalPlacement strategy should create activations on the local silo." - - SiloHandle siloToKill = HostedCluster.GetActiveSilos().First(s => s.SiloAddress.Endpoint.Equals(expected)); - output.WriteLine("Killing silo {0} hosting locally placed grain", siloToKill); - await HostedCluster.StopSiloAsync(siloToKill); - - IPEndPoint newActual = await grain.GetEndpoint(); - output.WriteLine("PreferLocalPlacement grain was recreated on silo {0}", newActual); - Assert.NotEqual(expected, newActual); // "PreferLocalPlacement strategy should recreate activations on other silo if local fails." - } - - [Theory(Skip = "Repo test case for gateway silo connection issue #1859")] - [InlineData("Primary")] - [InlineData("Secondary")] - [TestCategory("BVT"), TestCategory("Placement")] - public async Task PreferLocalPlacementGrain_ShouldNotMigrateWhenOtherSiloKilled(string value) - { - await HostedCluster.WaitForLivenessToStabilizeAsync(); - output.WriteLine("******************** Starting test ({0}) ********************", value); - TestSilosStarted(2); - - foreach (SiloHandle silo in HostedCluster.GetActiveSilos()) - { - output.WriteLine( - "Silo {0} : Address = {1} Proxy gateway: {2}", - silo.Name, silo.SiloAddress, silo.GatewayAddress); - } - - IPEndPoint targetSilo; - if (value == "Primary") - { - targetSilo = HostedCluster.Primary.SiloAddress.Endpoint; - } - else - { - targetSilo = HostedCluster.SecondarySilos.First().SiloAddress.Endpoint; - } - Guid proxyKey; - IRandomPlacementTestGrain proxy; - IPEndPoint expected; - do - { - proxyKey = Guid.NewGuid(); - proxy = GrainFactory.GetGrain(proxyKey); - expected = await proxy.GetEndpoint(); - } while (!targetSilo.Equals(expected)); - output.WriteLine("Proxy grain was originally located on silo {0}", expected); - - Guid grainKey = proxyKey; - await proxy.StartPreferLocalGrain(grainKey); - IPreferLocalPlacementTestGrain grain = GrainFactory.GetGrain(grainKey); - IPEndPoint actual = await grain.GetEndpoint(); - output.WriteLine("PreferLocalPlacement grain was originally located on silo {0}", actual); - Assert.Equal(expected, actual); // "PreferLocalPlacement strategy should create activations on the local silo." - - SiloHandle siloToKill = HostedCluster.GetActiveSilos().First(s => !s.SiloAddress.Endpoint.Equals(expected)); - output.WriteLine("Killing other silo {0} not hosting locally placed grain", siloToKill); - await HostedCluster.StopSiloAsync(siloToKill); - - IPEndPoint newActual = await grain.GetEndpoint(); - output.WriteLine("PreferLocalPlacement grain is now located on silo {0}", newActual); - Assert.Equal(expected, newActual); // "PreferLocalPlacement strategy should not move activations when other non-hosting silo fails." - } - - private void TestSilosStarted(int expected) - { - IManagementGrain mgmtGrain = GrainFactory.GetGrain(0); - - Dictionary statuses = mgmtGrain.GetHosts(onlyActive: true).Result; - foreach (var pair in statuses) - { - logger.LogInformation(" ######## Silo {SiloAddress}, status: {Status}", pair.Key, pair.Value); - Assert.Equal( - SiloStatus.Active, - pair.Value); - } - Assert.Equal(expected, statuses.Count); - } } }