diff --git a/src/Orleans.Core/Core/ClientBuilderExtensions.cs b/src/Orleans.Core/Core/ClientBuilderExtensions.cs index 3a7b35033d..82d87a1237 100644 --- a/src/Orleans.Core/Core/ClientBuilderExtensions.cs +++ b/src/Orleans.Core/Core/ClientBuilderExtensions.cs @@ -104,6 +104,35 @@ public static IClientBuilder AddGatewayCountChangedHandler(this IClientBuilder b return builder; } + /// + /// Registers a event handler. + /// + public static IClientBuilder AddGatewayCountChangedHandler(this IClientBuilder builder, Func handlerFactory) + { + builder.ConfigureServices(services => services.AddSingleton(handlerFactory)); + return builder; + } + + /// + /// Registers a cluster connection status observer. + /// + public static IClientBuilder AddClusterConnectionStatusObserver(this IClientBuilder builder, TObserver observer) + where TObserver : IClusterConnectionStatusObserver + { + builder.Services.AddSingleton(observer); + return builder; + } + + /// + /// Registers a cluster connection status observer. + /// + public static IClientBuilder AddClusterConnectionStatusObserver(this IClientBuilder builder) + where TObserver : class, IClusterConnectionStatusObserver + { + builder.Services.AddSingleton(); + return builder; + } + /// /// Registers a event handler. /// @@ -116,6 +145,18 @@ public static IClientBuilder AddClusterConnectionLostHandler(this IClientBuilder return builder; } + /// + /// Registers a event handler. + /// + /// The builder. + /// The handler factory. + /// The builder. + public static IClientBuilder AddClusterConnectionLostHandler(this IClientBuilder builder, Func handlerFactory) + { + builder.ConfigureServices(services => services.AddSingleton(handlerFactory)); + return builder; + } + /// /// Add propagation through grain calls. /// Note: according to activity will be created only when any listener for activity exists and returns . diff --git a/src/Orleans.Core/Core/DefaultClientServices.cs b/src/Orleans.Core/Core/DefaultClientServices.cs index 51315459af..6dd179cf51 100644 --- a/src/Orleans.Core/Core/DefaultClientServices.cs +++ b/src/Orleans.Core/Core/DefaultClientServices.cs @@ -75,6 +75,7 @@ public static void AddDefaultServices(IClientBuilder builder) services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); + services.AddSingleton(); services.AddFromExisting(); services.TryAddFromExisting(); services.TryAddFromExisting(); diff --git a/src/Orleans.Core/Core/IClusterConnectionStatusObserver.cs b/src/Orleans.Core/Core/IClusterConnectionStatusObserver.cs new file mode 100644 index 0000000000..b4ce7cefee --- /dev/null +++ b/src/Orleans.Core/Core/IClusterConnectionStatusObserver.cs @@ -0,0 +1,24 @@ +namespace Orleans; + +/// +/// Interface that receives notifications about the status of the cluster connection. +/// +public interface IClusterConnectionStatusObserver +{ + /// + /// Notifies this observer that the number of connected gateways has changed. + /// + /// + /// The current number of gateways. + /// + /// + /// The previous number of gateways. + /// + /// Indicates whether a loss of connectivity has been resolved. + void NotifyGatewayCountChanged(int currentNumberOfGateways, int previousNumberOfGateways, bool connectionRecovered); + + /// + /// Notifies this observer that the connection to the cluster has been lost. + /// + void NotifyClusterConnectionLost(); +} \ No newline at end of file diff --git a/src/Orleans.Core/Runtime/ClusterConnectionStatusObserverAdaptor.cs b/src/Orleans.Core/Runtime/ClusterConnectionStatusObserverAdaptor.cs new file mode 100644 index 0000000000..fd8e0932d2 --- /dev/null +++ b/src/Orleans.Core/Runtime/ClusterConnectionStatusObserverAdaptor.cs @@ -0,0 +1,46 @@ +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using Microsoft.Extensions.Logging; + +namespace Orleans.Runtime; + +internal sealed class ClusterConnectionStatusObserverAdaptor( + IEnumerable gatewayCountChangedHandlers, + IEnumerable connectionLostHandlers, + ILogger logger) : IClusterConnectionStatusObserver +{ + private readonly ImmutableArray _gatewayCountChangedHandlers = gatewayCountChangedHandlers.ToImmutableArray(); + private readonly ImmutableArray _connectionLostHandler = connectionLostHandlers.ToImmutableArray(); + + public void NotifyClusterConnectionLost() + { + foreach (var handler in _connectionLostHandler) + { + try + { + handler(null, EventArgs.Empty); + } + catch (Exception ex) + { + logger.LogError((int)ErrorCode.ClientError, ex, "Error sending cluster connection lost notification."); + } + } + } + + public void NotifyGatewayCountChanged(int currentNumberOfGateways, int previousNumberOfGateways, bool connectionRecovered) + { + var args = new GatewayCountChangedEventArgs(currentNumberOfGateways, previousNumberOfGateways); + foreach (var handler in _gatewayCountChangedHandlers) + { + try + { + handler(null, args); + } + catch (Exception ex) + { + logger.LogError((int)ErrorCode.ClientError, ex, "Error sending gateway count changed notification."); + } + } + } +} \ No newline at end of file diff --git a/src/Orleans.Core/Runtime/OutsideRuntimeClient.cs b/src/Orleans.Core/Runtime/OutsideRuntimeClient.cs index c44ac4316d..d9dff1864b 100644 --- a/src/Orleans.Core/Runtime/OutsideRuntimeClient.cs +++ b/src/Orleans.Core/Runtime/OutsideRuntimeClient.cs @@ -1,4 +1,5 @@ using System; +using System.Collections; using System.Collections.Concurrent; using System.Linq; using System.Threading; @@ -13,11 +14,11 @@ using Orleans.Runtime; using Orleans.Serialization; using Orleans.Serialization.Invocation; -using Orleans.Serialization.Serializers; using static Orleans.Internal.StandardExtensions; namespace Orleans { + internal class OutsideRuntimeClient : IRuntimeClient, IDisposable, IClusterConnectionStatusListener { internal static bool TestOnlyThrowExceptionDuringInit { get; set; } @@ -32,6 +33,7 @@ internal class OutsideRuntimeClient : IRuntimeClient, IDisposable, IClusterConne private readonly MessagingTrace messagingTrace; private readonly InterfaceToImplementationMappingCache _interfaceToImplementationMapping; + private IClusterConnectionStatusObserver[] _statusObservers; public IInternalGrainFactory InternalGrainFactory { get; private set; } @@ -95,17 +97,7 @@ internal void ConsumeServices() { try { - var connectionLostHandlers = this.ServiceProvider.GetServices(); - foreach (var handler in connectionLostHandlers) - { - this.ClusterConnectionLost += handler; - } - - var gatewayCountChangedHandlers = this.ServiceProvider.GetServices(); - foreach (var handler in gatewayCountChangedHandlers) - { - this.GatewayCountChanged += handler; - } + _statusObservers = this.ServiceProvider.GetServices().ToArray(); this.InternalGrainFactory = this.ServiceProvider.GetRequiredService(); this.messageFactory = this.ServiceProvider.GetService(); @@ -272,7 +264,7 @@ public void SendRequest(GrainReference target, IInvokable request, IResponseComp { // don't set expiration for system target messages. var ttl = request.GetDefaultResponseTimeout() ?? this.clientMessagingOptions.ResponseTimeout; - message.TimeToLive = ttl; + message.TimeToLive = ttl; } if (!oneWay) @@ -401,9 +393,6 @@ public void Dispose() Utils.SafeExecute(() => MessageCenter?.Dispose()); - this.ClusterConnectionLost = null; - this.GatewayCountChanged = null; - GC.SuppressFinalize(this); disposed = true; } @@ -422,35 +411,38 @@ public void BreakOutstandingMessagesToDeadSilo(SiloAddress deadSilo) public int GetRunningRequestsCount(GrainInterfaceType grainInterfaceType) => this.callbacks.Count(c => c.Value.Message.InterfaceType == grainInterfaceType); - /// - public event ConnectionToClusterLostHandler ClusterConnectionLost; - - /// - public event GatewayCountChangedHandler GatewayCountChanged; - /// public void NotifyClusterConnectionLost() { - try + foreach (var observer in _statusObservers) { - this.ClusterConnectionLost?.Invoke(this, EventArgs.Empty); - } - catch (Exception ex) - { - this.logger.LogError((int)ErrorCode.ClientError, ex, "Error when sending cluster disconnection notification"); + try + { + observer.NotifyClusterConnectionLost(); + } + catch (Exception ex) + { + this.logger.LogError((int)ErrorCode.ClientError, ex, "Error sending cluster disconnection notification."); + } } } /// public void NotifyGatewayCountChanged(int currentNumberOfGateways, int previousNumberOfGateways) { - try - { - this.GatewayCountChanged?.Invoke(this, new GatewayCountChangedEventArgs(currentNumberOfGateways, previousNumberOfGateways)); - } - catch (Exception ex) + foreach (var observer in _statusObservers) { - this.logger.LogError((int)ErrorCode.ClientError, ex, "Error when sending gateway count changed notification"); + try + { + observer.NotifyGatewayCountChanged( + currentNumberOfGateways, + previousNumberOfGateways, + currentNumberOfGateways > 0 && previousNumberOfGateways <= 0); + } + catch (Exception ex) + { + this.logger.LogError((int)ErrorCode.ClientError, ex, "Error sending gateway count changed notification."); + } } } diff --git a/src/Orleans.TestingHost/InMemoryTransport/InMemoryTransportConnection.cs b/src/Orleans.TestingHost/InMemoryTransport/InMemoryTransportConnection.cs index 01ec045fbf..bf45faeb0d 100644 --- a/src/Orleans.TestingHost/InMemoryTransport/InMemoryTransportConnection.cs +++ b/src/Orleans.TestingHost/InMemoryTransport/InMemoryTransportConnection.cs @@ -87,4 +87,6 @@ public override async ValueTask DisposeAsync() _connectionClosedTokenSource.Dispose(); } + + public override string ToString() => $"InMem({LocalEndPoint}<->{RemoteEndPoint})"; } diff --git a/src/Orleans.TestingHost/InProcTestCluster.cs b/src/Orleans.TestingHost/InProcTestCluster.cs new file mode 100644 index 0000000000..5256724292 --- /dev/null +++ b/src/Orleans.TestingHost/InProcTestCluster.cs @@ -0,0 +1,754 @@ +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Orleans.Runtime; +using Orleans.TestingHost.Utils; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Configuration.Memory; +using Orleans.Configuration; +using Microsoft.Extensions.Options; +using Microsoft.Extensions.Hosting; +using Orleans.TestingHost.InMemoryTransport; +using System.Net; +using Orleans.Statistics; +using Orleans.TestingHost.InProcess; +using Orleans.Runtime.Hosting; +using Orleans.GrainDirectory; +using Orleans.Messaging; +using Orleans.Hosting; +using Orleans.Runtime.TestHooks; +using Orleans.Configuration.Internal; +using Orleans.TestingHost.Logging; + +namespace Orleans.TestingHost; + +/// +/// A host class for local testing with Orleans using in-process silos. +/// +public sealed class InProcessTestCluster : IDisposable, IAsyncDisposable +{ + private readonly List _silos = []; + private readonly StringBuilder _log = new(); + private readonly InMemoryTransportConnectionHub _transportHub = new(); + private readonly InProcessGrainDirectory _grainDirectory; + private readonly InProcessMembershipTable _membershipTable; + private bool _disposed; + private int _startedInstances; + + /// + /// Collection of all known silos. + /// + public ReadOnlyCollection Silos + { + get + { + lock (_silos) + { + return new List(_silos).AsReadOnly(); + } + } + } + + /// + /// Options used to configure the test cluster. + /// + /// This is the options you configured your test cluster with, or the default one. + /// If the cluster is being configured via ClusterConfiguration, then this object may not reflect the true settings. + /// + public InProcessTestClusterOptions Options { get; } + + /// + /// The internal client interface. + /// + internal IHost ClientHost { get; private set; } + + /// + /// The internal client interface. + /// + internal IInternalClusterClient InternalClient => ClientHost?.Services.GetRequiredService(); + + /// + /// The client. + /// + public IClusterClient Client => ClientHost?.Services.GetRequiredService(); + + /// + /// The port allocator. + /// + public ITestClusterPortAllocator PortAllocator { get; } + + /// + /// Configures the test cluster plus client in-process. + /// + public InProcessTestCluster( + InProcessTestClusterOptions options, + ITestClusterPortAllocator portAllocator) + { + Options = options; + PortAllocator = portAllocator; + _membershipTable = new(options.ClusterId); + _grainDirectory = new(_membershipTable.GetSiloStatus); + } + + /// + /// Returns the associated with the given . + /// + /// The silo process to the the service provider for. + /// If is one of the existing silos will be picked randomly. + public IServiceProvider GetSiloServiceProvider(SiloAddress silo = null) + { + if (silo != null) + { + var handle = Silos.FirstOrDefault(x => x.SiloAddress.Equals(silo)); + return handle != null ? handle.SiloHost.Services : + throw new ArgumentException($"The provided silo address '{silo}' is unknown."); + } + else + { + var index = Random.Shared.Next(Silos.Count); + return Silos[index].SiloHost.Services; + } + } + + /// + /// Deploys the cluster using the specified configuration and starts the client in-process. + /// + public async Task DeployAsync() + { + if (_silos.Count > 0) throw new InvalidOperationException("Cluster host already deployed."); + + AppDomain.CurrentDomain.UnhandledException += ReportUnobservedException; + + try + { + string startMsg = "----------------------------- STARTING NEW UNIT TEST SILO HOST: " + GetType().FullName + " -------------------------------------"; + WriteLog(startMsg); + await InitializeAsync(); + + if (Options.InitializeClientOnDeploy) + { + await WaitForInitialStabilization(); + } + } + catch (TimeoutException te) + { + FlushLogToConsole(); + throw new TimeoutException("Timeout during test initialization", te); + } + catch (Exception ex) + { + await StopAllSilosAsync(); + + Exception baseExc = ex.GetBaseException(); + FlushLogToConsole(); + + if (baseExc is TimeoutException) + { + throw new TimeoutException("Timeout during test initialization", ex); + } + + // IMPORTANT: + // Do NOT re-throw the original exception here, also not as an internal exception inside AggregateException + // Due to the way MS tests works, if the original exception is an Orleans exception, + // it's assembly might not be loaded yet in this phase of the test. + // As a result, we will get "MSTest: Unit Test Adapter threw exception: Type is not resolved for member XXX" + // and will loose the original exception. This makes debugging tests super hard! + // The root cause has to do with us initializing our tests from Test constructor and not from TestInitialize method. + // More details: http://dobrzanski.net/2010/09/20/mstest-unit-test-adapter-threw-exception-type-is-not-resolved-for-member/ + //throw new Exception( + // string.Format("Exception during test initialization: {0}", + // LogFormatter.PrintException(baseExc))); + throw; + } + } + + private async Task WaitForInitialStabilization() + { + // Poll each silo to check that it knows the expected number of active silos. + // If any silo does not have the expected number of active silos in its cluster membership oracle, try again. + // If the cluster membership has not stabilized after a certain period of time, give up and continue anyway. + var totalWait = Stopwatch.StartNew(); + while (true) + { + var silos = Silos; + var expectedCount = silos.Count; + var remainingSilos = expectedCount; + + foreach (var silo in silos) + { + var hooks = InternalClient.GetTestHooks(silo); + var statuses = await hooks.GetApproximateSiloStatuses(); + var activeCount = statuses.Count(s => s.Value == SiloStatus.Active); + if (activeCount != expectedCount) break; + remainingSilos--; + } + + if (remainingSilos == 0) + { + totalWait.Stop(); + break; + } + + WriteLog($"{remainingSilos} silos do not have a consistent cluster view, waiting until stabilization."); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + if (totalWait.Elapsed < TimeSpan.FromSeconds(60)) + { + WriteLog($"Warning! {remainingSilos} silos do not have a consistent cluster view after {totalWait.ElapsedMilliseconds}ms, continuing without stabilization."); + break; + } + } + } + + /// + /// Get the list of current active silos. + /// + /// List of current silos. + public IEnumerable GetActiveSilos() + { + var additional = new List(); + lock (_silos) + { + additional.AddRange(_silos); + } + + WriteLog("GetActiveSilos: {0} Silos={1}", + additional.Count, Runtime.Utils.EnumerableToString(additional)); + + if (additional.Count > 0) + foreach (var s in additional) + if (s?.IsActive == true) + yield return s; + } + + /// + /// Find the silo handle for the specified silo address. + /// + /// Silo address to be found. + /// SiloHandle of the appropriate silo, or null if not found. + public InProcessSiloHandle GetSiloForAddress(SiloAddress siloAddress) + { + var activeSilos = GetActiveSilos().ToList(); + var ret = activeSilos.Find(s => s.SiloAddress.Equals(siloAddress)); + return ret; + } + + /// + /// Wait for the silo liveness sub-system to detect and act on any recent cluster membership changes. + /// + /// Whether recent membership changes we done by graceful Stop. + public async Task WaitForLivenessToStabilizeAsync(bool didKill = false) + { + var clusterMembershipOptions = Client.ServiceProvider.GetRequiredService>().Value; + TimeSpan stabilizationTime = GetLivenessStabilizationTime(clusterMembershipOptions, didKill); + WriteLog(Environment.NewLine + Environment.NewLine + "WaitForLivenessToStabilize is about to sleep for {0}", stabilizationTime); + await Task.Delay(stabilizationTime); + WriteLog("WaitForLivenessToStabilize is done sleeping"); + } + + /// + /// Get the timeout value to use to wait for the silo liveness sub-system to detect and act on any recent cluster membership changes. + /// + /// + public static TimeSpan GetLivenessStabilizationTime(ClusterMembershipOptions clusterMembershipOptions, bool didKill = false) + { + TimeSpan stabilizationTime = TimeSpan.Zero; + if (didKill) + { + // in case of hard kill (kill and not Stop), we should give silos time to detect failures first. + stabilizationTime = TestingUtils.Multiply(clusterMembershipOptions.ProbeTimeout, clusterMembershipOptions.NumMissedProbesLimit); + } + if (clusterMembershipOptions.UseLivenessGossip) + { + stabilizationTime += TimeSpan.FromSeconds(5); + } + else + { + stabilizationTime += TestingUtils.Multiply(clusterMembershipOptions.TableRefreshTimeout, 2); + } + return stabilizationTime; + } + + /// + /// Start an additional silo, so that it joins the existing cluster. + /// + /// SiloHandle for the newly started silo. + public InProcessSiloHandle StartAdditionalSilo(bool startAdditionalSiloOnNewPort = false) + { + return StartAdditionalSiloAsync(startAdditionalSiloOnNewPort).GetAwaiter().GetResult(); + } + + /// + /// Start an additional silo, so that it joins the existing cluster. + /// + /// SiloHandle for the newly started silo. + public async Task StartAdditionalSiloAsync(bool startAdditionalSiloOnNewPort = false) + { + return (await StartSilosAsync(1, startAdditionalSiloOnNewPort)).Single(); + } + + /// + /// Start a number of additional silo, so that they join the existing cluster. + /// + /// Number of silos to start. + /// + /// List of SiloHandles for the newly started silos. + public async Task> StartSilosAsync(int silosToStart, bool startAdditionalSiloOnNewPort = false) + { + var instances = new List(); + if (silosToStart > 0) + { + var siloStartTasks = Enumerable.Range(_startedInstances, silosToStart) + .Select(instanceNumber => Task.Run(() => StartSiloAsync((short)instanceNumber, Options, startSiloOnNewPort: startAdditionalSiloOnNewPort))).ToArray(); + + try + { + await Task.WhenAll(siloStartTasks); + } + catch (Exception) + { + lock (_silos) + { + _silos.AddRange(siloStartTasks.Where(t => t.Exception == null).Select(t => t.Result)); + } + + throw; + } + + instances.AddRange(siloStartTasks.Select(t => t.Result)); + lock (_silos) + { + _silos.AddRange(instances); + } + } + + return instances; + } + + /// + /// Stop all silos. + /// + public async Task StopSilosAsync() + { + foreach (var instance in _silos.ToList()) + { + await StopSiloAsync(instance); + } + } + + /// + /// Stop cluster client as an asynchronous operation. + /// + /// A representing the asynchronous operation. + public async Task StopClusterClientAsync() + { + var client = ClientHost; + try + { + if (client is not null) + { + await client.StopAsync().ConfigureAwait(false); + } + } + catch (Exception exc) + { + WriteLog("Exception stopping client: {0}", exc); + } + finally + { + await DisposeAsync(client).ConfigureAwait(false); + ClientHost = null; + } + } + + /// + /// Stop all current silos. + /// + public void StopAllSilos() + { + StopAllSilosAsync().GetAwaiter().GetResult(); + } + + /// + /// Stop all current silos. + /// + public async Task StopAllSilosAsync() + { + await StopClusterClientAsync(); + await StopSilosAsync(); + AppDomain.CurrentDomain.UnhandledException -= ReportUnobservedException; + } + + /// + /// Do a semi-graceful Stop of the specified silo. + /// + /// Silo to be stopped. + public async Task StopSiloAsync(InProcessSiloHandle instance) + { + if (instance != null) + { + await StopSiloAsync(instance, true); + lock (_silos) + { + _silos.Remove(instance); + } + } + } + + /// + /// Do an immediate Kill of the specified silo. + /// + /// Silo to be killed. + public async Task KillSiloAsync(InProcessSiloHandle instance) + { + if (instance != null) + { + // do NOT stop, just kill directly, to simulate crash. + await StopSiloAsync(instance, false); + lock (_silos) + { + _silos.Remove(instance); + } + } + } + + /// + /// Performs a hard kill on client. Client will not cleanup resources. + /// + public async Task KillClientAsync() + { + var client = ClientHost; + if (client != null) + { + var cancelled = new CancellationTokenSource(); + cancelled.Cancel(); + try + { + await client.StopAsync(cancelled.Token).ConfigureAwait(false); + } + finally + { + await DisposeAsync(client); + ClientHost = null; + } + } + } + + /// + /// Do a Stop or Kill of the specified silo, followed by a restart. + /// + /// Silo to be restarted. + public async Task RestartSiloAsync(InProcessSiloHandle instance) + { + if (instance != null) + { + var instanceNumber = instance.InstanceNumber; + await StopSiloAsync(instance); + var newInstance = await StartSiloAsync(instanceNumber, Options); + lock (_silos) + { + _silos.Add(newInstance); + } + + return newInstance; + } + + return null; + } + + /// + /// Restart a previously stopped. + /// + /// Silo to be restarted. + public async Task RestartStoppedSecondarySiloAsync(string siloName) + { + if (siloName == null) throw new ArgumentNullException(nameof(siloName)); + var siloHandle = Silos.Single(s => s.Name.Equals(siloName, StringComparison.Ordinal)); + var newInstance = await StartSiloAsync(Silos.IndexOf(siloHandle), Options); + lock (_silos) + { + _silos.Add(newInstance); + } + return newInstance; + } + + /// + /// Initialize the grain client. This should be already done by + /// + public async Task InitializeClientAsync() + { + WriteLog("Initializing Cluster Client"); + + if (ClientHost is not null) + { + await StopClusterClientAsync(); + } + + var hostBuilder = Host.CreateApplicationBuilder(new HostApplicationBuilderSettings + { + EnvironmentName = Environments.Development, + ApplicationName = "TestClusterClient", + DisableDefaults = true, + }); + + hostBuilder.UseOrleansClient(clientBuilder => + { + if (Options.UseTestClusterMembership) + { + clientBuilder.Services.AddSingleton(_membershipTable); + } + + clientBuilder.UseInMemoryConnectionTransport(_transportHub); + }); + + TryConfigureFileLogging(Options, hostBuilder.Services, "TestClusterClient"); + + foreach (var hostDelegate in Options.ClientHostConfigurationDelegates) + { + hostDelegate(hostBuilder); + } + + ClientHost = hostBuilder.Build(); + await ClientHost.StartAsync(); + } + + private async Task InitializeAsync() + { + var silosToStart = Options.InitialSilosCount; + + if (silosToStart > 0) + { + await StartSilosAsync(silosToStart); + } + + WriteLog("Done initializing cluster"); + + if (Options.InitializeClientOnDeploy) + { + await InitializeClientAsync(); + } + } + + public async Task CreateSiloAsync(InProcessTestSiloSpecificOptions siloOptions) + { + var host = await Task.Run(async () => + { + var siloName = siloOptions.SiloName; + + var appBuilder = Host.CreateApplicationBuilder(new HostApplicationBuilderSettings + { + ApplicationName = siloName, + EnvironmentName = Environments.Development, + DisableDefaults = true + }); + + var services = appBuilder.Services; + TryConfigureFileLogging(Options, services, siloName); + + if (Debugger.IsAttached) + { + // Test is running inside debugger - Make timeout ~= infinite + services.Configure(op => op.ResponseTimeout = TimeSpan.FromMilliseconds(1000000)); + } + + appBuilder.UseOrleans(siloBuilder => + { + siloBuilder.Configure(o => + { + o.SiloName = siloOptions.SiloName; + }); + + siloBuilder.Configure(o => + { + o.AdvertisedIPAddress = IPAddress.Loopback; + o.SiloPort = siloOptions.SiloPort; + o.GatewayPort = siloOptions.GatewayPort; + }); + + siloBuilder.Services + .Configure(options => options.ShutdownTimeout = TimeSpan.FromSeconds(30)); + + if (Options.UseTestClusterMembership) + { + services.AddSingleton(_membershipTable); + siloBuilder.AddGrainDirectory(GrainDirectoryAttribute.DEFAULT_GRAIN_DIRECTORY, (_, _) => _grainDirectory); + } + + siloBuilder.UseInMemoryConnectionTransport(_transportHub); + + services.AddSingleton(); + services.AddSingleton(); + if (!Options.UseRealEnvironmentStatistics) + { + services.AddFromExisting(); + } + }); + + foreach (var hostDelegate in Options.SiloHostConfigurationDelegates) + { + hostDelegate(siloOptions, appBuilder); + } + + var host = appBuilder.Build(); + InitializeTestHooksSystemTarget(host); + await host.StartAsync(); + return host; + }); + + return new InProcessSiloHandle + { + Name = siloOptions.SiloName, + SiloHost = host, + SiloAddress = host.Services.GetRequiredService().SiloAddress, + GatewayAddress = host.Services.GetRequiredService().GatewayAddress, + }; + } + + /// + /// Start a new silo in the target cluster + /// + /// The InProcessTestCluster in which the silo should be deployed + /// The instance number to deploy + /// The options to use. + /// Configuration overrides. + /// Whether we start this silo on a new port, instead of the default one + /// A handle to the silo deployed + public static async Task StartSiloAsync(InProcessTestCluster cluster, int instanceNumber, InProcessTestClusterOptions clusterOptions, IReadOnlyList configurationOverrides = null, bool startSiloOnNewPort = false) + { + if (cluster == null) throw new ArgumentNullException(nameof(cluster)); + return await cluster.StartSiloAsync(instanceNumber, clusterOptions, configurationOverrides, startSiloOnNewPort); + } + + /// + /// Starts a new silo. + /// + /// The instance number to deploy + /// The options to use. + /// Configuration overrides. + /// Whether we start this silo on a new port, instead of the default one + /// A handle to the deployed silo. + public async Task StartSiloAsync(int instanceNumber, InProcessTestClusterOptions clusterOptions, IReadOnlyList configurationOverrides = null, bool startSiloOnNewPort = false) + { + var siloOptions = InProcessTestSiloSpecificOptions.Create(this, clusterOptions, instanceNumber, startSiloOnNewPort); + var handle = await CreateSiloAsync(siloOptions); + handle.InstanceNumber = (short)instanceNumber; + Interlocked.Increment(ref _startedInstances); + return handle; + } + + private async Task StopSiloAsync(InProcessSiloHandle instance, bool stopGracefully) + { + try + { + await instance.StopSiloAsync(stopGracefully).ConfigureAwait(false); + } + finally + { + await DisposeAsync(instance).ConfigureAwait(false); + + Interlocked.Decrement(ref _startedInstances); + } + } + + /// + /// Gets the log. + /// + /// The log contents. + public string GetLog() + { + return _log.ToString(); + } + + private void ReportUnobservedException(object sender, UnhandledExceptionEventArgs eventArgs) + { + Exception exception = (Exception)eventArgs.ExceptionObject; + WriteLog("Unobserved exception: {0}", exception); + } + + private void WriteLog(string format, params object[] args) + { + _log.AppendFormat(format + Environment.NewLine, args); + } + + private void FlushLogToConsole() + { + Console.WriteLine(GetLog()); + } + + /// + public async ValueTask DisposeAsync() + { + if (_disposed) + { + return; + } + + await Task.Run(async () => + { + foreach (var handle in Silos) + { + await DisposeAsync(handle).ConfigureAwait(false); + } + + await DisposeAsync(ClientHost).ConfigureAwait(false); + ClientHost = null; + + PortAllocator?.Dispose(); + }); + + _disposed = true; + } + + /// + public void Dispose() + { + if (_disposed) + { + return; + } + + foreach (var handle in Silos) + { + handle.Dispose(); + } + + ClientHost?.Dispose(); + PortAllocator?.Dispose(); + + _disposed = true; + } + + private static async Task DisposeAsync(IDisposable value) + { + if (value is IAsyncDisposable asyncDisposable) + { + await asyncDisposable.DisposeAsync().ConfigureAwait(false); + } + else if (value is IDisposable disposable) + { + disposable.Dispose(); + } + + } + private static void TryConfigureFileLogging(InProcessTestClusterOptions options, IServiceCollection services, string name) + { + if (options.ConfigureFileLogging) + { + var fileName = TestingUtils.CreateTraceFileName(name, options.ClusterId); + services.AddLogging(loggingBuilder => loggingBuilder.AddFile(fileName)); + } + } + + private static void InitializeTestHooksSystemTarget(IHost host) + { + var testHook = host.Services.GetRequiredService(); + var catalog = host.Services.GetRequiredService(); + catalog.RegisterSystemTarget(testHook); + } +} diff --git a/src/Orleans.TestingHost/InProcTestClusterBuilder.cs b/src/Orleans.TestingHost/InProcTestClusterBuilder.cs new file mode 100644 index 0000000000..7156fd2d87 --- /dev/null +++ b/src/Orleans.TestingHost/InProcTestClusterBuilder.cs @@ -0,0 +1,153 @@ +using System; +using System.Globalization; +using System.Linq; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Orleans.Hosting; +using Orleans.Runtime; + +namespace Orleans.TestingHost; + +/// Configuration builder for starting a . +public sealed class InProcessTestClusterBuilder +{ + /// + /// Initializes a new instance of using the default options. + /// + public InProcessTestClusterBuilder() + : this(2) + { + } + + /// + /// Initializes a new instance of overriding the initial silos count. + /// + /// The number of initial silos to deploy. + public InProcessTestClusterBuilder(short initialSilosCount) + { + Options = new InProcessTestClusterOptions + { + InitialSilosCount = initialSilosCount, + ClusterId = CreateClusterId(), + ServiceId = Guid.NewGuid().ToString("N"), + UseTestClusterMembership = true, + InitializeClientOnDeploy = true, + ConfigureFileLogging = true, + AssumeHomogenousSilosForTesting = true + }; + } + + /// + /// Gets the options. + /// + /// The options. + public InProcessTestClusterOptions Options { get; } + + /// + /// The port allocator. + /// + public ITestClusterPortAllocator PortAllocator { get; } = new TestClusterPortAllocator(); + + /// + /// Adds a delegate for configuring silo and client hosts. + /// + public InProcessTestClusterBuilder ConfigureHost(Action configureDelegate) + { + Options.SiloHostConfigurationDelegates.Add((_, hostBuilder) => configureDelegate(hostBuilder)); + Options.ClientHostConfigurationDelegates.Add(configureDelegate); + return this; + } + + /// + /// Adds a delegate to configure silos. + /// + /// The builder. + public InProcessTestClusterBuilder ConfigureSilo(Action configureSiloDelegate) + { + Options.SiloHostConfigurationDelegates.Add((options, hostBuilder) => hostBuilder.UseOrleans(siloBuilder => configureSiloDelegate(options, siloBuilder))); + return this; + } + + /// + /// Adds a delegate to configure silo hosts. + /// + /// The builder. + public InProcessTestClusterBuilder ConfigureSiloHost(Action configureSiloHostDelegate) + { + Options.SiloHostConfigurationDelegates.Add(configureSiloHostDelegate); + return this; + } + + /// + /// Adds a delegate to configure clients. + /// + /// The builder. + public InProcessTestClusterBuilder ConfigureClient(Action configureClientDelegate) + { + Options.ClientHostConfigurationDelegates.Add(hostBuilder => hostBuilder.UseOrleansClient(clientBuilder => configureClientDelegate(clientBuilder))); + return this; + } + + /// + /// Adds a delegate to configure clients hosts. + /// + /// The builder. + public InProcessTestClusterBuilder ConfigureClientHost(Action configureHostDelegate) + { + Options.ClientHostConfigurationDelegates.Add(hostBuilder => configureHostDelegate(hostBuilder)); + return this; + } + + /// + /// Builds this instance. + /// + /// InProcessTestCluster. + public InProcessTestCluster Build() + { + var portAllocator = PortAllocator; + + ConfigureDefaultPorts(); + + var testCluster = new InProcessTestCluster(Options, portAllocator); + return testCluster; + } + + /// + /// Creates a cluster identifier. + /// + /// A new cluster identifier. + public static string CreateClusterId() + { + string prefix = "testcluster-"; + int randomSuffix = Random.Shared.Next(1000); + DateTime now = DateTime.UtcNow; + string DateTimeFormat = @"yyyy-MM-dd\tHH-mm-ss"; + return $"{prefix}{now.ToString(DateTimeFormat, CultureInfo.InvariantCulture)}-{randomSuffix}"; + } + + private void ConfigureDefaultPorts() + { + // Set base ports if none are currently set. + (int baseSiloPort, int baseGatewayPort) = PortAllocator.AllocateConsecutivePortPairs(Options.InitialSilosCount + 3); + if (Options.BaseSiloPort == 0) Options.BaseSiloPort = baseSiloPort; + if (Options.BaseGatewayPort == 0) Options.BaseGatewayPort = baseGatewayPort; + } + + internal class ConfigureStaticClusterDeploymentOptions : IHostConfigurator + { + public void Configure(IHostBuilder hostBuilder) + { + hostBuilder.ConfigureServices((context, services) => + { + var initialSilos = int.Parse(context.Configuration[nameof(InProcessTestClusterOptions.InitialSilosCount)]); + var siloNames = Enumerable.Range(0, initialSilos).Select(GetSiloName).ToList(); + services.Configure(options => options.SiloNames = siloNames); + }); + } + + private static string GetSiloName(int instanceNumber) + { + return instanceNumber == 0 ? Silo.PrimarySiloName : $"Secondary_{instanceNumber}"; + } + } +} \ No newline at end of file diff --git a/src/Orleans.TestingHost/InProcTestClusterOptions.cs b/src/Orleans.TestingHost/InProcTestClusterOptions.cs new file mode 100644 index 0000000000..5f0ae5e509 --- /dev/null +++ b/src/Orleans.TestingHost/InProcTestClusterOptions.cs @@ -0,0 +1,92 @@ +using System; +using System.Collections.Generic; +using Microsoft.Extensions.Hosting; +using Orleans.Configuration; +using Orleans.Hosting; + +namespace Orleans.TestingHost; + +/// +/// Configuration options for test clusters. +/// +public sealed class InProcessTestClusterOptions +{ + /// + /// Gets or sets the cluster identifier. + /// + /// + /// The cluster identifier. + public string ClusterId { get; set; } + + /// + /// Gets or sets the service identifier. + /// + /// + /// The service identifier. + public string ServiceId { get; set; } + + /// + /// Gets or sets the base silo port, which is the port for the first silo. Other silos will use subsequent ports. + /// + /// The base silo port. + internal int BaseSiloPort { get; set; } + + /// + /// Gets or sets the base gateway port, which is the gateway port for the first silo. Other silos will use subsequent ports. + /// + /// The base gateway port. + internal int BaseGatewayPort { get; set; } + + /// + /// Gets or sets a value indicating whether to use test cluster membership. + /// + /// if test cluster membership should be used; otherwise, . + internal bool UseTestClusterMembership { get; set; } + + /// + /// Gets or sets a value indicating whether to use the real environment statistics. + /// + public bool UseRealEnvironmentStatistics { get; set; } + + /// + /// Gets or sets a value indicating whether to initialize the client immediately on deployment. + /// + /// if the client should be initialized immediately on deployment; otherwise, . + public bool InitializeClientOnDeploy { get; set; } + + /// + /// Gets or sets the initial silos count. + /// + /// The initial silos count. + public short InitialSilosCount { get; set; } + + /// + /// Gets or sets a value indicating whether to configure file logging. + /// + /// if file logging should be configured; otherwise, . + public bool ConfigureFileLogging { get; set; } = true; + + /// + /// Gets or sets a value indicating whether to assume homogeneous silos for testing purposes. + /// + /// if the cluster should assume homogeneous silos; otherwise, . + public bool AssumeHomogenousSilosForTesting { get; set; } + + /// + /// Gets or sets a value indicating whether each silo should host a gateway. + /// + /// if each silo should host a gateway; otherwise, . + public bool GatewayPerSilo { get; set; } = true; + + /// + /// Gets the silo host configuration delegates. + /// + /// The silo host configuration delegates. + public List> SiloHostConfigurationDelegates { get; } = []; + + /// + /// Gets the client host configuration delegates. + /// + /// The client host configuration delegates. + public List> ClientHostConfigurationDelegates { get; } = []; +} diff --git a/src/Orleans.TestingHost/InProcTestSiloSpecificOptions.cs b/src/Orleans.TestingHost/InProcTestSiloSpecificOptions.cs new file mode 100644 index 0000000000..c7904fc0bb --- /dev/null +++ b/src/Orleans.TestingHost/InProcTestSiloSpecificOptions.cs @@ -0,0 +1,55 @@ +namespace Orleans.TestingHost; + +/// +/// Configuration overrides for individual silos. +/// +public sealed class InProcessTestSiloSpecificOptions +{ + /// + /// Gets or sets the silo port. + /// + /// The silo port. + public int SiloPort { get; set; } + + /// + /// Gets or sets the gateway port. + /// + /// The gateway port. + public int GatewayPort { get; set; } + + /// + /// Gets or sets the name of the silo. + /// + /// The name of the silo. + public string SiloName { get; set; } + + /// + /// Creates an instance of the class. + /// + /// The test cluster. + /// The test cluster options. + /// The instance number. + /// if set to , assign a new port for the silo. + /// The options. + public static InProcessTestSiloSpecificOptions Create(InProcessTestCluster testCluster, InProcessTestClusterOptions testClusterOptions, int instanceNumber, bool assignNewPort = false) + { + var result = new InProcessTestSiloSpecificOptions + { + SiloName = $"Silo_{instanceNumber}", + }; + + if (assignNewPort) + { + var (siloPort, gatewayPort) = testCluster.PortAllocator.AllocateConsecutivePortPairs(1); + result.SiloPort = siloPort; + result.GatewayPort = (instanceNumber == 0 || testClusterOptions.GatewayPerSilo) ? gatewayPort : 0; + } + else + { + result.SiloPort = testClusterOptions.BaseSiloPort + instanceNumber; + result.GatewayPort = (instanceNumber == 0 || testClusterOptions.GatewayPerSilo) ? testClusterOptions.BaseGatewayPort + instanceNumber : 0; + } + + return result; + } +} diff --git a/src/Orleans.TestingHost/InProcess/InProcessGrainDirectory.cs b/src/Orleans.TestingHost/InProcess/InProcessGrainDirectory.cs new file mode 100644 index 0000000000..26b7d48797 --- /dev/null +++ b/src/Orleans.TestingHost/InProcess/InProcessGrainDirectory.cs @@ -0,0 +1,82 @@ +#nullable enable +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading.Tasks; +using Orleans.GrainDirectory; +using Orleans.Runtime; + +namespace Orleans.TestingHost.InProcess; +internal sealed class InProcessGrainDirectory(Func getSiloStatus) : IGrainDirectory +{ + private readonly ConcurrentDictionary _entries = []; + + public Task Lookup(GrainId grainId) + { + if (_entries.TryGetValue(grainId, out var result) && !IsSiloDead(result)) + { + return Task.FromResult(result); + } + + return Task.FromResult(null); + } + + public Task Register(GrainAddress address, GrainAddress? previousAddress) + { + ArgumentNullException.ThrowIfNull(address); + + var result = _entries.AddOrUpdate( + address.GrainId, + static (grainId, state) => state.Address, + static (grainId, existing, state) => + { + if (existing is null || state.PreviousAddress is { } prev && existing.Matches(prev) || state.Self.IsSiloDead(existing)) + { + return state.Address; + } + + return existing; + }, + (Self: this, Address: address, PreviousAddress: previousAddress)); + + if (result is null || IsSiloDead(result)) + { + return Task.FromResult(null); + } + + return Task.FromResult(result); + } + + public Task Register(GrainAddress address) => Register(address, null); + + public Task Unregister(GrainAddress address) + { + if (!((IDictionary)_entries).Remove(KeyValuePair.Create(address.GrainId, address))) + { + if (_entries.TryGetValue(address.GrainId, out var existing) && (existing.Matches(address) || IsSiloDead(existing))) + { + ((IDictionary)_entries).Remove(KeyValuePair.Create(existing.GrainId, existing)); + } + } + + return Task.CompletedTask; + } + + public Task UnregisterSilos(List siloAddresses) + { + foreach (var entry in _entries) + { + foreach (var silo in siloAddresses) + { + if (silo.Equals(entry.Value.SiloAddress)) + { + ((IDictionary)_entries).Remove(entry); + } + } + } + + return Task.CompletedTask; + } + + private bool IsSiloDead(GrainAddress existing) => existing.SiloAddress is not { } address || getSiloStatus(address) is SiloStatus.Dead or SiloStatus.None; +} diff --git a/src/Orleans.TestingHost/InProcess/InProcessMembershipTable.cs b/src/Orleans.TestingHost/InProcess/InProcessMembershipTable.cs new file mode 100644 index 0000000000..bfefa09531 --- /dev/null +++ b/src/Orleans.TestingHost/InProcess/InProcessMembershipTable.cs @@ -0,0 +1,193 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Orleans.Runtime; +using System.Globalization; +using System.Threading.Tasks; +using Orleans.Messaging; + +namespace Orleans.TestingHost.InProcess; + +/// +/// An in-memory implementation of for testing purposes. +/// +internal sealed class InProcessMembershipTable(string clusterId) : IMembershipTableSystemTarget, IGatewayListProvider +{ + private readonly Table _table = new(); + private readonly string _clusterId = clusterId; + + public TimeSpan MaxStaleness => TimeSpan.Zero; + public bool IsUpdatable => true; + + public Task InitializeMembershipTable(bool tryInitTableVersion) => Task.CompletedTask; + + public Task DeleteMembershipTableEntries(string clusterId) + { + if (string.Equals(_clusterId, clusterId, StringComparison.Ordinal)) + { + _table.Clear(); + } + + return Task.CompletedTask; + } + + public Task ReadRow(SiloAddress key) => Task.FromResult(_table.Read(key)); + + public Task ReadAll() => Task.FromResult(_table.ReadAll()); + + public Task InsertRow(MembershipEntry entry, TableVersion tableVersion) => Task.FromResult(_table.Insert(entry, tableVersion)); + + public Task UpdateRow(MembershipEntry entry, string etag, TableVersion tableVersion) => Task.FromResult(_table.Update(entry, etag, tableVersion)); + + public Task UpdateIAmAlive(MembershipEntry entry) + { + _table.UpdateIAmAlive(entry); + return Task.CompletedTask; + } + + public Task CleanupDefunctSiloEntries(DateTimeOffset beforeDate) + { + _table.CleanupDefunctSiloEntries(beforeDate); + return Task.CompletedTask; + } + + public Task InitializeGatewayListProvider() => Task.CompletedTask; + + public Task> GetGateways() + { + var table = _table.ReadAll(); + var result = table.Members + .Where(x => x.Item1.Status == SiloStatus.Active && x.Item1.ProxyPort != 0) + .Select(x => + { + var entry = x.Item1; + return SiloAddress.New(entry.SiloAddress.Endpoint.Address, entry.ProxyPort, entry.SiloAddress.Generation).ToGatewayUri(); + }).ToList(); + return Task.FromResult>(result); + } + + public SiloStatus GetSiloStatus(SiloAddress address) => _table.GetSiloStatus(address); + + private sealed class Table + { + private readonly object _lock = new(); + private readonly Dictionary _table = []; + private TableVersion _tableVersion; + private long _lastETagCounter; + + public Table() + { + _tableVersion = new TableVersion(0, NewETag()); + } + public SiloStatus GetSiloStatus(SiloAddress key) + { + lock (_lock) + { + return _table.TryGetValue(key, out var data) ? data.Entry.Status : SiloStatus.None; + } + } + + public MembershipTableData Read(SiloAddress key) + { + lock (_lock) + { + return _table.TryGetValue(key, out var data) ? + new MembershipTableData(Tuple.Create(data.Entry.Copy(), data.ETag), _tableVersion) + : new MembershipTableData(_tableVersion); + } + } + + public MembershipTableData ReadAll() + { + lock (_lock) + { + return new MembershipTableData(_table.Values.Select(data => Tuple.Create(data.Entry.Copy(), data.ETag)).ToList(), _tableVersion); + } + } + + public TableVersion ReadTableVersion() => _tableVersion; + + public bool Insert(MembershipEntry entry, TableVersion version) + { + lock (_lock) + { + if (_table.TryGetValue(entry.SiloAddress, out var data)) + { + return false; + } + + if (!_tableVersion.VersionEtag.Equals(version.VersionEtag)) + { + return false; + } + + _table[entry.SiloAddress] = (entry.Copy(), _lastETagCounter++.ToString(CultureInfo.InvariantCulture)); + _tableVersion = new TableVersion(version.Version, NewETag()); + return true; + } + } + + public bool Update(MembershipEntry entry, string etag, TableVersion version) + { + lock (_lock) + { + if (!_table.TryGetValue(entry.SiloAddress, out var data)) + { + return false; + } + + if (!data.ETag.Equals(etag) || !_tableVersion.VersionEtag.Equals(version.VersionEtag)) + { + return false; + } + + _table[entry.SiloAddress] = (entry.Copy(), _lastETagCounter++.ToString(CultureInfo.InvariantCulture)); + _tableVersion = new TableVersion(version.Version, NewETag()); + return true; + } + } + + public void UpdateIAmAlive(MembershipEntry entry) + { + lock (_lock) + { + if (!_table.TryGetValue(entry.SiloAddress, out var data)) + { + return; + } + + data.Entry.IAmAliveTime = entry.IAmAliveTime; + _table[entry.SiloAddress] = (data.Entry, NewETag()); + } + } + + public void CleanupDefunctSiloEntries(DateTimeOffset beforeDate) + { + lock (_lock) + { + var entries = _table.Values.ToList(); + foreach (var (entry, _) in entries) + { + if (entry.Status == SiloStatus.Dead + && new DateTime(Math.Max(entry.IAmAliveTime.Ticks, entry.StartTime.Ticks), DateTimeKind.Utc) < beforeDate) + { + _table.Remove(entry.SiloAddress, out _); + continue; + } + } + } + } + + internal void Clear() + { + lock (_lock) + { + _table.Clear(); + } + } + + public override string ToString() => $"Table = {ReadAll()}, ETagCounter={_lastETagCounter}"; + + private string NewETag() => _lastETagCounter++.ToString(CultureInfo.InvariantCulture); + } +} diff --git a/src/Orleans.TestingHost/InProcessSiloHandle.cs b/src/Orleans.TestingHost/InProcessSiloHandle.cs index 7712db2cbd..0191f24034 100644 --- a/src/Orleans.TestingHost/InProcessSiloHandle.cs +++ b/src/Orleans.TestingHost/InProcessSiloHandle.cs @@ -16,7 +16,12 @@ public class InProcessSiloHandle : SiloHandle private bool isActive = true; /// Gets a reference to the silo host. - public IHost SiloHost { get; private set; } + public IHost SiloHost { get; init; } + + /// + /// Gets the silo's service provider. + /// + public IServiceProvider ServiceProvider => SiloHost.Services; /// public override bool IsActive => isActive; @@ -28,7 +33,7 @@ public class InProcessSiloHandle : SiloHandle /// The configuration. /// An optional delegate which is invoked just prior to building the host builder. /// The silo handle. - public static async Task CreateAsync( + public static async Task CreateAsync( string siloName, IConfiguration configuration, Action postConfigureHostBuilder = null) diff --git a/test/TestInfrastructure/TestExtensions/BaseInProcessTestClusterFixture.cs b/test/TestInfrastructure/TestExtensions/BaseInProcessTestClusterFixture.cs new file mode 100644 index 0000000000..78c9b623b4 --- /dev/null +++ b/test/TestInfrastructure/TestExtensions/BaseInProcessTestClusterFixture.cs @@ -0,0 +1,81 @@ +using System.Runtime.ExceptionServices; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Orleans.Configuration; +using Orleans.TestingHost; + +namespace TestExtensions; + +public abstract class BaseInProcessTestClusterFixture : Xunit.IAsyncLifetime +{ + private readonly ExceptionDispatchInfo preconditionsException; + + static BaseInProcessTestClusterFixture() + { + TestDefaultConfiguration.InitializeDefaults(); + } + + protected BaseInProcessTestClusterFixture() + { + try + { + CheckPreconditionsOrThrow(); + } + catch (Exception ex) + { + preconditionsException = ExceptionDispatchInfo.Capture(ex); + return; + } + } + + public void EnsurePreconditionsMet() + { + preconditionsException?.Throw(); + } + + protected virtual void CheckPreconditionsOrThrow() { } + + protected virtual void ConfigureTestCluster(InProcessTestClusterBuilder builder) + { + } + + public InProcessTestCluster HostedCluster { get; private set; } + + public IGrainFactory GrainFactory => Client; + + public IClusterClient Client => HostedCluster?.Client; + + public ILogger Logger { get; private set; } + + public string GetClientServiceId() => Client.ServiceProvider.GetRequiredService>().Value.ServiceId; + + public virtual async Task InitializeAsync() + { + EnsurePreconditionsMet(); + var builder = new InProcessTestClusterBuilder(); + builder.ConfigureHost(hostBuilder => TestDefaultConfiguration.ConfigureHostConfiguration(hostBuilder.Configuration)); + ConfigureTestCluster(builder); + + var testCluster = builder.Build(); + await testCluster.DeployAsync().ConfigureAwait(false); + + HostedCluster = testCluster; + Logger = Client.ServiceProvider.GetRequiredService().CreateLogger("Application"); + } + + public virtual async Task DisposeAsync() + { + var cluster = HostedCluster; + if (cluster is null) return; + + try + { + await cluster.StopAllSilosAsync().ConfigureAwait(false); + } + finally + { + await cluster.DisposeAsync().ConfigureAwait(false); + } + } +} \ No newline at end of file diff --git a/test/Tester/ClientConnectionTests/ClientConnectionEventTests.cs b/test/Tester/ClientConnectionTests/ClientConnectionEventTests.cs index 92a7dc05bf..4a4c4440eb 100644 --- a/test/Tester/ClientConnectionTests/ClientConnectionEventTests.cs +++ b/test/Tester/ClientConnectionTests/ClientConnectionEventTests.cs @@ -1,63 +1,48 @@ -using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Orleans.Configuration; using Orleans.TestingHost; -using TestExtensions; using UnitTests.GrainInterfaces; using Xunit; -namespace Tester +namespace Tester; + +public class ClientConnectionEventTests { - public class ClientConnectionEventTests : TestClusterPerTest + [Fact, TestCategory("SlowBVT")] + public async Task EventSendWhenDisconnectedFromCluster() { - private OutsideRuntimeClient runtimeClient; - - protected override void ConfigureTestCluster(TestClusterBuilder builder) - { - builder.AddClientBuilderConfigurator(); - } - - public override async Task InitializeAsync() + var semaphore = new SemaphoreSlim(0, 1); + var builder = new InProcessTestClusterBuilder(); + builder.ConfigureClient(c => { - await base.InitializeAsync(); - this.runtimeClient = this.HostedCluster.Client.ServiceProvider.GetRequiredService(); - } - - public class Configurator : IClientBuilderConfigurator + c.Configure(o => o.GatewayListRefreshPeriod = TimeSpan.FromSeconds(0.5)); + c.AddClusterConnectionLostHandler((sender, args) => semaphore.Release()); + }); + await using var cluster = builder.Build(); + await cluster.DeployAsync(); + + // Burst lot of call, to be sure that we are connected to all silos + for (int i = 0; i < 100; i++) { - public void Configure(IConfiguration configuration, IClientBuilder clientBuilder) - { - clientBuilder.Configure(options => options.GatewayListRefreshPeriod = TimeSpan.FromSeconds(1)); - } + var grain = cluster.Client.GetGrain(i); + await grain.SetLabel(i.ToString()); } - [Fact, TestCategory("SlowBVT")] - public async Task EventSendWhenDisconnectedFromCluster() - { - var runtime = this.HostedCluster.ServiceProvider.GetRequiredService(); - - var semaphore = new SemaphoreSlim(0, 1); - this.runtimeClient.ClusterConnectionLost += (sender, args) => semaphore.Release(); + await cluster.StopAllSilosAsync(); - // Burst lot of call, to be sure that we are connected to all silos - for (int i = 0; i < 100; i++) - { - var grain = GrainFactory.GetGrain(i); - await grain.SetLabel(i.ToString()); - } - - await this.HostedCluster.StopAllSilosAsync(); - - Assert.True(await semaphore.WaitAsync(TimeSpan.FromSeconds(10))); - } + Assert.True(await semaphore.WaitAsync(TimeSpan.FromSeconds(10))); + } - [Fact, TestCategory("SlowBVT")] - public async Task GatewayChangedEventSentOnDisconnectAndReconnect() + [Fact, TestCategory("SlowBVT")] + public async Task GatewayChangedEventSentOnDisconnectAndReconnect() + { + var regainedGatewaySemaphore = new SemaphoreSlim(0, 1); + var lostGatewaySemaphore = new SemaphoreSlim(0, 1); + var builder = new InProcessTestClusterBuilder(); + builder.ConfigureClient(c => { - var regainedGatewaySemaphore = new SemaphoreSlim(0, 1); - var lostGatewaySemaphore = new SemaphoreSlim(0, 1); - - this.runtimeClient.GatewayCountChanged += (sender, args) => + c.Configure(o => o.GatewayListRefreshPeriod = TimeSpan.FromSeconds(0.5)); + c.AddGatewayCountChangedHandler((sender, args) => { if (args.NumberOfConnectedGateways == 1) { @@ -67,25 +52,27 @@ public async Task GatewayChangedEventSentOnDisconnectAndReconnect() { regainedGatewaySemaphore.Release(); } - }; + }); + }); + await using var cluster = builder.Build(); + await cluster.DeployAsync(); - var silo = this.HostedCluster.SecondarySilos[0]; - await silo.StopSiloAsync(true); + var silo = cluster.Silos[0]; + await silo.StopSiloAsync(true); - Assert.True(await lostGatewaySemaphore.WaitAsync(TimeSpan.FromSeconds(20))); + Assert.True(await lostGatewaySemaphore.WaitAsync(TimeSpan.FromSeconds(20))); - await this.HostedCluster.RestartStoppedSecondarySiloAsync(silo.Name); + await cluster.RestartStoppedSecondarySiloAsync(silo.Name); - // Clients need prodding to reconnect. - var remainingAttempts = 90; - bool reconnected; - do - { - this.Client.GetGrain(Guid.NewGuid().GetHashCode()).SetLabel("test").Ignore(); - reconnected = await regainedGatewaySemaphore.WaitAsync(TimeSpan.FromSeconds(1)); - } while (!reconnected && --remainingAttempts > 0); + // Clients need prodding to reconnect. + var remainingAttempts = 90; + bool reconnected; + do + { + cluster.Client.GetGrain(Guid.NewGuid().GetHashCode()).SetLabel("test").Ignore(); + reconnected = await regainedGatewaySemaphore.WaitAsync(TimeSpan.FromSeconds(1)); + } while (!reconnected && --remainingAttempts > 0); - Assert.True(reconnected, "Failed to reconnect to restarted gateway."); - } + Assert.True(reconnected, "Failed to reconnect to restarted gateway."); } }