From 9907c0b57ce4c6dfcdf4e81f4cfe003e17d19482 Mon Sep 17 00:00:00 2001
From: Reuben Bond <203839+ReubenBond@users.noreply.github.com>
Date: Mon, 30 Sep 2024 18:59:04 -0700
Subject: [PATCH] Revert "Add IClusterConnectionStatusObserver support" until
InProcessTestCluster is merged (#9157)
Revert "Add IClusterConnectionStatusObserver support (#9145)"
This reverts commit 546b73989d94b5ee9be9f238a402f86f9263c8d4.
---
.../Core/ClientBuilderExtensions.cs | 41 -
.../Core/DefaultClientServices.cs | 1 -
.../Core/IClusterConnectionStatusObserver.cs | 24 -
.../ClusterConnectionStatusObserverAdaptor.cs | 46 --
.../Runtime/OutsideRuntimeClient.cs | 60 +-
.../InMemoryTransportConnection.cs | 2 -
src/Orleans.TestingHost/InProcTestCluster.cs | 754 ------------------
.../InProcTestClusterBuilder.cs | 153 ----
.../InProcTestClusterOptions.cs | 92 ---
.../InProcTestSiloSpecificOptions.cs | 55 --
.../InProcess/InProcessGrainDirectory.cs | 82 --
.../InProcess/InProcessMembershipTable.cs | 193 -----
.../InProcessSiloHandle.cs | 9 +-
.../BaseInProcessTestClusterFixture.cs | 81 --
.../ClientConnectionEventTests.cs | 107 +--
15 files changed, 96 insertions(+), 1604 deletions(-)
delete mode 100644 src/Orleans.Core/Core/IClusterConnectionStatusObserver.cs
delete mode 100644 src/Orleans.Core/Runtime/ClusterConnectionStatusObserverAdaptor.cs
delete mode 100644 src/Orleans.TestingHost/InProcTestCluster.cs
delete mode 100644 src/Orleans.TestingHost/InProcTestClusterBuilder.cs
delete mode 100644 src/Orleans.TestingHost/InProcTestClusterOptions.cs
delete mode 100644 src/Orleans.TestingHost/InProcTestSiloSpecificOptions.cs
delete mode 100644 src/Orleans.TestingHost/InProcess/InProcessGrainDirectory.cs
delete mode 100644 src/Orleans.TestingHost/InProcess/InProcessMembershipTable.cs
delete mode 100644 test/TestInfrastructure/TestExtensions/BaseInProcessTestClusterFixture.cs
diff --git a/src/Orleans.Core/Core/ClientBuilderExtensions.cs b/src/Orleans.Core/Core/ClientBuilderExtensions.cs
index 82d87a1237..3a7b35033d 100644
--- a/src/Orleans.Core/Core/ClientBuilderExtensions.cs
+++ b/src/Orleans.Core/Core/ClientBuilderExtensions.cs
@@ -104,35 +104,6 @@ public static IClientBuilder AddGatewayCountChangedHandler(this IClientBuilder b
return builder;
}
- ///
- /// Registers a event handler.
- ///
- public static IClientBuilder AddGatewayCountChangedHandler(this IClientBuilder builder, Func handlerFactory)
- {
- builder.ConfigureServices(services => services.AddSingleton(handlerFactory));
- return builder;
- }
-
- ///
- /// Registers a cluster connection status observer.
- ///
- public static IClientBuilder AddClusterConnectionStatusObserver(this IClientBuilder builder, TObserver observer)
- where TObserver : IClusterConnectionStatusObserver
- {
- builder.Services.AddSingleton(observer);
- return builder;
- }
-
- ///
- /// Registers a cluster connection status observer.
- ///
- public static IClientBuilder AddClusterConnectionStatusObserver(this IClientBuilder builder)
- where TObserver : class, IClusterConnectionStatusObserver
- {
- builder.Services.AddSingleton();
- return builder;
- }
-
///
/// Registers a event handler.
///
@@ -145,18 +116,6 @@ public static IClientBuilder AddClusterConnectionLostHandler(this IClientBuilder
return builder;
}
- ///
- /// Registers a event handler.
- ///
- /// The builder.
- /// The handler factory.
- /// The builder.
- public static IClientBuilder AddClusterConnectionLostHandler(this IClientBuilder builder, Func handlerFactory)
- {
- builder.ConfigureServices(services => services.AddSingleton(handlerFactory));
- return builder;
- }
-
///
/// Add propagation through grain calls.
/// Note: according to activity will be created only when any listener for activity exists and returns .
diff --git a/src/Orleans.Core/Core/DefaultClientServices.cs b/src/Orleans.Core/Core/DefaultClientServices.cs
index 6dd179cf51..51315459af 100644
--- a/src/Orleans.Core/Core/DefaultClientServices.cs
+++ b/src/Orleans.Core/Core/DefaultClientServices.cs
@@ -75,7 +75,6 @@ public static void AddDefaultServices(IClientBuilder builder)
services.TryAddSingleton();
services.TryAddSingleton();
services.TryAddSingleton();
- services.AddSingleton();
services.AddFromExisting();
services.TryAddFromExisting();
services.TryAddFromExisting();
diff --git a/src/Orleans.Core/Core/IClusterConnectionStatusObserver.cs b/src/Orleans.Core/Core/IClusterConnectionStatusObserver.cs
deleted file mode 100644
index b4ce7cefee..0000000000
--- a/src/Orleans.Core/Core/IClusterConnectionStatusObserver.cs
+++ /dev/null
@@ -1,24 +0,0 @@
-namespace Orleans;
-
-///
-/// Interface that receives notifications about the status of the cluster connection.
-///
-public interface IClusterConnectionStatusObserver
-{
- ///
- /// Notifies this observer that the number of connected gateways has changed.
- ///
- ///
- /// The current number of gateways.
- ///
- ///
- /// The previous number of gateways.
- ///
- /// Indicates whether a loss of connectivity has been resolved.
- void NotifyGatewayCountChanged(int currentNumberOfGateways, int previousNumberOfGateways, bool connectionRecovered);
-
- ///
- /// Notifies this observer that the connection to the cluster has been lost.
- ///
- void NotifyClusterConnectionLost();
-}
\ No newline at end of file
diff --git a/src/Orleans.Core/Runtime/ClusterConnectionStatusObserverAdaptor.cs b/src/Orleans.Core/Runtime/ClusterConnectionStatusObserverAdaptor.cs
deleted file mode 100644
index fd8e0932d2..0000000000
--- a/src/Orleans.Core/Runtime/ClusterConnectionStatusObserverAdaptor.cs
+++ /dev/null
@@ -1,46 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Collections.Immutable;
-using Microsoft.Extensions.Logging;
-
-namespace Orleans.Runtime;
-
-internal sealed class ClusterConnectionStatusObserverAdaptor(
- IEnumerable gatewayCountChangedHandlers,
- IEnumerable connectionLostHandlers,
- ILogger logger) : IClusterConnectionStatusObserver
-{
- private readonly ImmutableArray _gatewayCountChangedHandlers = gatewayCountChangedHandlers.ToImmutableArray();
- private readonly ImmutableArray _connectionLostHandler = connectionLostHandlers.ToImmutableArray();
-
- public void NotifyClusterConnectionLost()
- {
- foreach (var handler in _connectionLostHandler)
- {
- try
- {
- handler(null, EventArgs.Empty);
- }
- catch (Exception ex)
- {
- logger.LogError((int)ErrorCode.ClientError, ex, "Error sending cluster connection lost notification.");
- }
- }
- }
-
- public void NotifyGatewayCountChanged(int currentNumberOfGateways, int previousNumberOfGateways, bool connectionRecovered)
- {
- var args = new GatewayCountChangedEventArgs(currentNumberOfGateways, previousNumberOfGateways);
- foreach (var handler in _gatewayCountChangedHandlers)
- {
- try
- {
- handler(null, args);
- }
- catch (Exception ex)
- {
- logger.LogError((int)ErrorCode.ClientError, ex, "Error sending gateway count changed notification.");
- }
- }
- }
-}
\ No newline at end of file
diff --git a/src/Orleans.Core/Runtime/OutsideRuntimeClient.cs b/src/Orleans.Core/Runtime/OutsideRuntimeClient.cs
index d9dff1864b..c44ac4316d 100644
--- a/src/Orleans.Core/Runtime/OutsideRuntimeClient.cs
+++ b/src/Orleans.Core/Runtime/OutsideRuntimeClient.cs
@@ -1,5 +1,4 @@
using System;
-using System.Collections;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
@@ -14,11 +13,11 @@
using Orleans.Runtime;
using Orleans.Serialization;
using Orleans.Serialization.Invocation;
+using Orleans.Serialization.Serializers;
using static Orleans.Internal.StandardExtensions;
namespace Orleans
{
-
internal class OutsideRuntimeClient : IRuntimeClient, IDisposable, IClusterConnectionStatusListener
{
internal static bool TestOnlyThrowExceptionDuringInit { get; set; }
@@ -33,7 +32,6 @@ internal class OutsideRuntimeClient : IRuntimeClient, IDisposable, IClusterConne
private readonly MessagingTrace messagingTrace;
private readonly InterfaceToImplementationMappingCache _interfaceToImplementationMapping;
- private IClusterConnectionStatusObserver[] _statusObservers;
public IInternalGrainFactory InternalGrainFactory { get; private set; }
@@ -97,7 +95,17 @@ internal void ConsumeServices()
{
try
{
- _statusObservers = this.ServiceProvider.GetServices().ToArray();
+ var connectionLostHandlers = this.ServiceProvider.GetServices();
+ foreach (var handler in connectionLostHandlers)
+ {
+ this.ClusterConnectionLost += handler;
+ }
+
+ var gatewayCountChangedHandlers = this.ServiceProvider.GetServices();
+ foreach (var handler in gatewayCountChangedHandlers)
+ {
+ this.GatewayCountChanged += handler;
+ }
this.InternalGrainFactory = this.ServiceProvider.GetRequiredService();
this.messageFactory = this.ServiceProvider.GetService();
@@ -264,7 +272,7 @@ public void SendRequest(GrainReference target, IInvokable request, IResponseComp
{
// don't set expiration for system target messages.
var ttl = request.GetDefaultResponseTimeout() ?? this.clientMessagingOptions.ResponseTimeout;
- message.TimeToLive = ttl;
+ message.TimeToLive = ttl;
}
if (!oneWay)
@@ -393,6 +401,9 @@ public void Dispose()
Utils.SafeExecute(() => MessageCenter?.Dispose());
+ this.ClusterConnectionLost = null;
+ this.GatewayCountChanged = null;
+
GC.SuppressFinalize(this);
disposed = true;
}
@@ -411,38 +422,35 @@ public void BreakOutstandingMessagesToDeadSilo(SiloAddress deadSilo)
public int GetRunningRequestsCount(GrainInterfaceType grainInterfaceType)
=> this.callbacks.Count(c => c.Value.Message.InterfaceType == grainInterfaceType);
+ ///
+ public event ConnectionToClusterLostHandler ClusterConnectionLost;
+
+ ///
+ public event GatewayCountChangedHandler GatewayCountChanged;
+
///
public void NotifyClusterConnectionLost()
{
- foreach (var observer in _statusObservers)
+ try
{
- try
- {
- observer.NotifyClusterConnectionLost();
- }
- catch (Exception ex)
- {
- this.logger.LogError((int)ErrorCode.ClientError, ex, "Error sending cluster disconnection notification.");
- }
+ this.ClusterConnectionLost?.Invoke(this, EventArgs.Empty);
+ }
+ catch (Exception ex)
+ {
+ this.logger.LogError((int)ErrorCode.ClientError, ex, "Error when sending cluster disconnection notification");
}
}
///
public void NotifyGatewayCountChanged(int currentNumberOfGateways, int previousNumberOfGateways)
{
- foreach (var observer in _statusObservers)
+ try
{
- try
- {
- observer.NotifyGatewayCountChanged(
- currentNumberOfGateways,
- previousNumberOfGateways,
- currentNumberOfGateways > 0 && previousNumberOfGateways <= 0);
- }
- catch (Exception ex)
- {
- this.logger.LogError((int)ErrorCode.ClientError, ex, "Error sending gateway count changed notification.");
- }
+ this.GatewayCountChanged?.Invoke(this, new GatewayCountChangedEventArgs(currentNumberOfGateways, previousNumberOfGateways));
+ }
+ catch (Exception ex)
+ {
+ this.logger.LogError((int)ErrorCode.ClientError, ex, "Error when sending gateway count changed notification");
}
}
diff --git a/src/Orleans.TestingHost/InMemoryTransport/InMemoryTransportConnection.cs b/src/Orleans.TestingHost/InMemoryTransport/InMemoryTransportConnection.cs
index bf45faeb0d..01ec045fbf 100644
--- a/src/Orleans.TestingHost/InMemoryTransport/InMemoryTransportConnection.cs
+++ b/src/Orleans.TestingHost/InMemoryTransport/InMemoryTransportConnection.cs
@@ -87,6 +87,4 @@ public override async ValueTask DisposeAsync()
_connectionClosedTokenSource.Dispose();
}
-
- public override string ToString() => $"InMem({LocalEndPoint}<->{RemoteEndPoint})";
}
diff --git a/src/Orleans.TestingHost/InProcTestCluster.cs b/src/Orleans.TestingHost/InProcTestCluster.cs
deleted file mode 100644
index 5256724292..0000000000
--- a/src/Orleans.TestingHost/InProcTestCluster.cs
+++ /dev/null
@@ -1,754 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Collections.ObjectModel;
-using System.Diagnostics;
-using System.Linq;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-using Microsoft.Extensions.DependencyInjection;
-using Orleans.Runtime;
-using Orleans.TestingHost.Utils;
-using Microsoft.Extensions.Configuration;
-using Microsoft.Extensions.Configuration.Memory;
-using Orleans.Configuration;
-using Microsoft.Extensions.Options;
-using Microsoft.Extensions.Hosting;
-using Orleans.TestingHost.InMemoryTransport;
-using System.Net;
-using Orleans.Statistics;
-using Orleans.TestingHost.InProcess;
-using Orleans.Runtime.Hosting;
-using Orleans.GrainDirectory;
-using Orleans.Messaging;
-using Orleans.Hosting;
-using Orleans.Runtime.TestHooks;
-using Orleans.Configuration.Internal;
-using Orleans.TestingHost.Logging;
-
-namespace Orleans.TestingHost;
-
-///
-/// A host class for local testing with Orleans using in-process silos.
-///
-public sealed class InProcessTestCluster : IDisposable, IAsyncDisposable
-{
- private readonly List _silos = [];
- private readonly StringBuilder _log = new();
- private readonly InMemoryTransportConnectionHub _transportHub = new();
- private readonly InProcessGrainDirectory _grainDirectory;
- private readonly InProcessMembershipTable _membershipTable;
- private bool _disposed;
- private int _startedInstances;
-
- ///
- /// Collection of all known silos.
- ///
- public ReadOnlyCollection Silos
- {
- get
- {
- lock (_silos)
- {
- return new List(_silos).AsReadOnly();
- }
- }
- }
-
- ///
- /// Options used to configure the test cluster.
- ///
- /// This is the options you configured your test cluster with, or the default one.
- /// If the cluster is being configured via ClusterConfiguration, then this object may not reflect the true settings.
- ///
- public InProcessTestClusterOptions Options { get; }
-
- ///
- /// The internal client interface.
- ///
- internal IHost ClientHost { get; private set; }
-
- ///
- /// The internal client interface.
- ///
- internal IInternalClusterClient InternalClient => ClientHost?.Services.GetRequiredService();
-
- ///
- /// The client.
- ///
- public IClusterClient Client => ClientHost?.Services.GetRequiredService();
-
- ///
- /// The port allocator.
- ///
- public ITestClusterPortAllocator PortAllocator { get; }
-
- ///
- /// Configures the test cluster plus client in-process.
- ///
- public InProcessTestCluster(
- InProcessTestClusterOptions options,
- ITestClusterPortAllocator portAllocator)
- {
- Options = options;
- PortAllocator = portAllocator;
- _membershipTable = new(options.ClusterId);
- _grainDirectory = new(_membershipTable.GetSiloStatus);
- }
-
- ///
- /// Returns the associated with the given .
- ///
- /// The silo process to the the service provider for.
- /// If is one of the existing silos will be picked randomly.
- public IServiceProvider GetSiloServiceProvider(SiloAddress silo = null)
- {
- if (silo != null)
- {
- var handle = Silos.FirstOrDefault(x => x.SiloAddress.Equals(silo));
- return handle != null ? handle.SiloHost.Services :
- throw new ArgumentException($"The provided silo address '{silo}' is unknown.");
- }
- else
- {
- var index = Random.Shared.Next(Silos.Count);
- return Silos[index].SiloHost.Services;
- }
- }
-
- ///
- /// Deploys the cluster using the specified configuration and starts the client in-process.
- ///
- public async Task DeployAsync()
- {
- if (_silos.Count > 0) throw new InvalidOperationException("Cluster host already deployed.");
-
- AppDomain.CurrentDomain.UnhandledException += ReportUnobservedException;
-
- try
- {
- string startMsg = "----------------------------- STARTING NEW UNIT TEST SILO HOST: " + GetType().FullName + " -------------------------------------";
- WriteLog(startMsg);
- await InitializeAsync();
-
- if (Options.InitializeClientOnDeploy)
- {
- await WaitForInitialStabilization();
- }
- }
- catch (TimeoutException te)
- {
- FlushLogToConsole();
- throw new TimeoutException("Timeout during test initialization", te);
- }
- catch (Exception ex)
- {
- await StopAllSilosAsync();
-
- Exception baseExc = ex.GetBaseException();
- FlushLogToConsole();
-
- if (baseExc is TimeoutException)
- {
- throw new TimeoutException("Timeout during test initialization", ex);
- }
-
- // IMPORTANT:
- // Do NOT re-throw the original exception here, also not as an internal exception inside AggregateException
- // Due to the way MS tests works, if the original exception is an Orleans exception,
- // it's assembly might not be loaded yet in this phase of the test.
- // As a result, we will get "MSTest: Unit Test Adapter threw exception: Type is not resolved for member XXX"
- // and will loose the original exception. This makes debugging tests super hard!
- // The root cause has to do with us initializing our tests from Test constructor and not from TestInitialize method.
- // More details: http://dobrzanski.net/2010/09/20/mstest-unit-test-adapter-threw-exception-type-is-not-resolved-for-member/
- //throw new Exception(
- // string.Format("Exception during test initialization: {0}",
- // LogFormatter.PrintException(baseExc)));
- throw;
- }
- }
-
- private async Task WaitForInitialStabilization()
- {
- // Poll each silo to check that it knows the expected number of active silos.
- // If any silo does not have the expected number of active silos in its cluster membership oracle, try again.
- // If the cluster membership has not stabilized after a certain period of time, give up and continue anyway.
- var totalWait = Stopwatch.StartNew();
- while (true)
- {
- var silos = Silos;
- var expectedCount = silos.Count;
- var remainingSilos = expectedCount;
-
- foreach (var silo in silos)
- {
- var hooks = InternalClient.GetTestHooks(silo);
- var statuses = await hooks.GetApproximateSiloStatuses();
- var activeCount = statuses.Count(s => s.Value == SiloStatus.Active);
- if (activeCount != expectedCount) break;
- remainingSilos--;
- }
-
- if (remainingSilos == 0)
- {
- totalWait.Stop();
- break;
- }
-
- WriteLog($"{remainingSilos} silos do not have a consistent cluster view, waiting until stabilization.");
- await Task.Delay(TimeSpan.FromMilliseconds(100));
-
- if (totalWait.Elapsed < TimeSpan.FromSeconds(60))
- {
- WriteLog($"Warning! {remainingSilos} silos do not have a consistent cluster view after {totalWait.ElapsedMilliseconds}ms, continuing without stabilization.");
- break;
- }
- }
- }
-
- ///
- /// Get the list of current active silos.
- ///
- /// List of current silos.
- public IEnumerable GetActiveSilos()
- {
- var additional = new List();
- lock (_silos)
- {
- additional.AddRange(_silos);
- }
-
- WriteLog("GetActiveSilos: {0} Silos={1}",
- additional.Count, Runtime.Utils.EnumerableToString(additional));
-
- if (additional.Count > 0)
- foreach (var s in additional)
- if (s?.IsActive == true)
- yield return s;
- }
-
- ///
- /// Find the silo handle for the specified silo address.
- ///
- /// Silo address to be found.
- /// SiloHandle of the appropriate silo, or null if not found.
- public InProcessSiloHandle GetSiloForAddress(SiloAddress siloAddress)
- {
- var activeSilos = GetActiveSilos().ToList();
- var ret = activeSilos.Find(s => s.SiloAddress.Equals(siloAddress));
- return ret;
- }
-
- ///
- /// Wait for the silo liveness sub-system to detect and act on any recent cluster membership changes.
- ///
- /// Whether recent membership changes we done by graceful Stop.
- public async Task WaitForLivenessToStabilizeAsync(bool didKill = false)
- {
- var clusterMembershipOptions = Client.ServiceProvider.GetRequiredService>().Value;
- TimeSpan stabilizationTime = GetLivenessStabilizationTime(clusterMembershipOptions, didKill);
- WriteLog(Environment.NewLine + Environment.NewLine + "WaitForLivenessToStabilize is about to sleep for {0}", stabilizationTime);
- await Task.Delay(stabilizationTime);
- WriteLog("WaitForLivenessToStabilize is done sleeping");
- }
-
- ///
- /// Get the timeout value to use to wait for the silo liveness sub-system to detect and act on any recent cluster membership changes.
- ///
- ///
- public static TimeSpan GetLivenessStabilizationTime(ClusterMembershipOptions clusterMembershipOptions, bool didKill = false)
- {
- TimeSpan stabilizationTime = TimeSpan.Zero;
- if (didKill)
- {
- // in case of hard kill (kill and not Stop), we should give silos time to detect failures first.
- stabilizationTime = TestingUtils.Multiply(clusterMembershipOptions.ProbeTimeout, clusterMembershipOptions.NumMissedProbesLimit);
- }
- if (clusterMembershipOptions.UseLivenessGossip)
- {
- stabilizationTime += TimeSpan.FromSeconds(5);
- }
- else
- {
- stabilizationTime += TestingUtils.Multiply(clusterMembershipOptions.TableRefreshTimeout, 2);
- }
- return stabilizationTime;
- }
-
- ///
- /// Start an additional silo, so that it joins the existing cluster.
- ///
- /// SiloHandle for the newly started silo.
- public InProcessSiloHandle StartAdditionalSilo(bool startAdditionalSiloOnNewPort = false)
- {
- return StartAdditionalSiloAsync(startAdditionalSiloOnNewPort).GetAwaiter().GetResult();
- }
-
- ///
- /// Start an additional silo, so that it joins the existing cluster.
- ///
- /// SiloHandle for the newly started silo.
- public async Task StartAdditionalSiloAsync(bool startAdditionalSiloOnNewPort = false)
- {
- return (await StartSilosAsync(1, startAdditionalSiloOnNewPort)).Single();
- }
-
- ///
- /// Start a number of additional silo, so that they join the existing cluster.
- ///
- /// Number of silos to start.
- ///
- /// List of SiloHandles for the newly started silos.
- public async Task> StartSilosAsync(int silosToStart, bool startAdditionalSiloOnNewPort = false)
- {
- var instances = new List();
- if (silosToStart > 0)
- {
- var siloStartTasks = Enumerable.Range(_startedInstances, silosToStart)
- .Select(instanceNumber => Task.Run(() => StartSiloAsync((short)instanceNumber, Options, startSiloOnNewPort: startAdditionalSiloOnNewPort))).ToArray();
-
- try
- {
- await Task.WhenAll(siloStartTasks);
- }
- catch (Exception)
- {
- lock (_silos)
- {
- _silos.AddRange(siloStartTasks.Where(t => t.Exception == null).Select(t => t.Result));
- }
-
- throw;
- }
-
- instances.AddRange(siloStartTasks.Select(t => t.Result));
- lock (_silos)
- {
- _silos.AddRange(instances);
- }
- }
-
- return instances;
- }
-
- ///
- /// Stop all silos.
- ///
- public async Task StopSilosAsync()
- {
- foreach (var instance in _silos.ToList())
- {
- await StopSiloAsync(instance);
- }
- }
-
- ///
- /// Stop cluster client as an asynchronous operation.
- ///
- /// A representing the asynchronous operation.
- public async Task StopClusterClientAsync()
- {
- var client = ClientHost;
- try
- {
- if (client is not null)
- {
- await client.StopAsync().ConfigureAwait(false);
- }
- }
- catch (Exception exc)
- {
- WriteLog("Exception stopping client: {0}", exc);
- }
- finally
- {
- await DisposeAsync(client).ConfigureAwait(false);
- ClientHost = null;
- }
- }
-
- ///
- /// Stop all current silos.
- ///
- public void StopAllSilos()
- {
- StopAllSilosAsync().GetAwaiter().GetResult();
- }
-
- ///
- /// Stop all current silos.
- ///
- public async Task StopAllSilosAsync()
- {
- await StopClusterClientAsync();
- await StopSilosAsync();
- AppDomain.CurrentDomain.UnhandledException -= ReportUnobservedException;
- }
-
- ///
- /// Do a semi-graceful Stop of the specified silo.
- ///
- /// Silo to be stopped.
- public async Task StopSiloAsync(InProcessSiloHandle instance)
- {
- if (instance != null)
- {
- await StopSiloAsync(instance, true);
- lock (_silos)
- {
- _silos.Remove(instance);
- }
- }
- }
-
- ///
- /// Do an immediate Kill of the specified silo.
- ///
- /// Silo to be killed.
- public async Task KillSiloAsync(InProcessSiloHandle instance)
- {
- if (instance != null)
- {
- // do NOT stop, just kill directly, to simulate crash.
- await StopSiloAsync(instance, false);
- lock (_silos)
- {
- _silos.Remove(instance);
- }
- }
- }
-
- ///
- /// Performs a hard kill on client. Client will not cleanup resources.
- ///
- public async Task KillClientAsync()
- {
- var client = ClientHost;
- if (client != null)
- {
- var cancelled = new CancellationTokenSource();
- cancelled.Cancel();
- try
- {
- await client.StopAsync(cancelled.Token).ConfigureAwait(false);
- }
- finally
- {
- await DisposeAsync(client);
- ClientHost = null;
- }
- }
- }
-
- ///
- /// Do a Stop or Kill of the specified silo, followed by a restart.
- ///
- /// Silo to be restarted.
- public async Task RestartSiloAsync(InProcessSiloHandle instance)
- {
- if (instance != null)
- {
- var instanceNumber = instance.InstanceNumber;
- await StopSiloAsync(instance);
- var newInstance = await StartSiloAsync(instanceNumber, Options);
- lock (_silos)
- {
- _silos.Add(newInstance);
- }
-
- return newInstance;
- }
-
- return null;
- }
-
- ///
- /// Restart a previously stopped.
- ///
- /// Silo to be restarted.
- public async Task RestartStoppedSecondarySiloAsync(string siloName)
- {
- if (siloName == null) throw new ArgumentNullException(nameof(siloName));
- var siloHandle = Silos.Single(s => s.Name.Equals(siloName, StringComparison.Ordinal));
- var newInstance = await StartSiloAsync(Silos.IndexOf(siloHandle), Options);
- lock (_silos)
- {
- _silos.Add(newInstance);
- }
- return newInstance;
- }
-
- ///
- /// Initialize the grain client. This should be already done by
- ///
- public async Task InitializeClientAsync()
- {
- WriteLog("Initializing Cluster Client");
-
- if (ClientHost is not null)
- {
- await StopClusterClientAsync();
- }
-
- var hostBuilder = Host.CreateApplicationBuilder(new HostApplicationBuilderSettings
- {
- EnvironmentName = Environments.Development,
- ApplicationName = "TestClusterClient",
- DisableDefaults = true,
- });
-
- hostBuilder.UseOrleansClient(clientBuilder =>
- {
- if (Options.UseTestClusterMembership)
- {
- clientBuilder.Services.AddSingleton(_membershipTable);
- }
-
- clientBuilder.UseInMemoryConnectionTransport(_transportHub);
- });
-
- TryConfigureFileLogging(Options, hostBuilder.Services, "TestClusterClient");
-
- foreach (var hostDelegate in Options.ClientHostConfigurationDelegates)
- {
- hostDelegate(hostBuilder);
- }
-
- ClientHost = hostBuilder.Build();
- await ClientHost.StartAsync();
- }
-
- private async Task InitializeAsync()
- {
- var silosToStart = Options.InitialSilosCount;
-
- if (silosToStart > 0)
- {
- await StartSilosAsync(silosToStart);
- }
-
- WriteLog("Done initializing cluster");
-
- if (Options.InitializeClientOnDeploy)
- {
- await InitializeClientAsync();
- }
- }
-
- public async Task CreateSiloAsync(InProcessTestSiloSpecificOptions siloOptions)
- {
- var host = await Task.Run(async () =>
- {
- var siloName = siloOptions.SiloName;
-
- var appBuilder = Host.CreateApplicationBuilder(new HostApplicationBuilderSettings
- {
- ApplicationName = siloName,
- EnvironmentName = Environments.Development,
- DisableDefaults = true
- });
-
- var services = appBuilder.Services;
- TryConfigureFileLogging(Options, services, siloName);
-
- if (Debugger.IsAttached)
- {
- // Test is running inside debugger - Make timeout ~= infinite
- services.Configure(op => op.ResponseTimeout = TimeSpan.FromMilliseconds(1000000));
- }
-
- appBuilder.UseOrleans(siloBuilder =>
- {
- siloBuilder.Configure(o =>
- {
- o.SiloName = siloOptions.SiloName;
- });
-
- siloBuilder.Configure(o =>
- {
- o.AdvertisedIPAddress = IPAddress.Loopback;
- o.SiloPort = siloOptions.SiloPort;
- o.GatewayPort = siloOptions.GatewayPort;
- });
-
- siloBuilder.Services
- .Configure(options => options.ShutdownTimeout = TimeSpan.FromSeconds(30));
-
- if (Options.UseTestClusterMembership)
- {
- services.AddSingleton(_membershipTable);
- siloBuilder.AddGrainDirectory(GrainDirectoryAttribute.DEFAULT_GRAIN_DIRECTORY, (_, _) => _grainDirectory);
- }
-
- siloBuilder.UseInMemoryConnectionTransport(_transportHub);
-
- services.AddSingleton();
- services.AddSingleton();
- if (!Options.UseRealEnvironmentStatistics)
- {
- services.AddFromExisting();
- }
- });
-
- foreach (var hostDelegate in Options.SiloHostConfigurationDelegates)
- {
- hostDelegate(siloOptions, appBuilder);
- }
-
- var host = appBuilder.Build();
- InitializeTestHooksSystemTarget(host);
- await host.StartAsync();
- return host;
- });
-
- return new InProcessSiloHandle
- {
- Name = siloOptions.SiloName,
- SiloHost = host,
- SiloAddress = host.Services.GetRequiredService().SiloAddress,
- GatewayAddress = host.Services.GetRequiredService().GatewayAddress,
- };
- }
-
- ///
- /// Start a new silo in the target cluster
- ///
- /// The InProcessTestCluster in which the silo should be deployed
- /// The instance number to deploy
- /// The options to use.
- /// Configuration overrides.
- /// Whether we start this silo on a new port, instead of the default one
- /// A handle to the silo deployed
- public static async Task StartSiloAsync(InProcessTestCluster cluster, int instanceNumber, InProcessTestClusterOptions clusterOptions, IReadOnlyList configurationOverrides = null, bool startSiloOnNewPort = false)
- {
- if (cluster == null) throw new ArgumentNullException(nameof(cluster));
- return await cluster.StartSiloAsync(instanceNumber, clusterOptions, configurationOverrides, startSiloOnNewPort);
- }
-
- ///
- /// Starts a new silo.
- ///
- /// The instance number to deploy
- /// The options to use.
- /// Configuration overrides.
- /// Whether we start this silo on a new port, instead of the default one
- /// A handle to the deployed silo.
- public async Task StartSiloAsync(int instanceNumber, InProcessTestClusterOptions clusterOptions, IReadOnlyList configurationOverrides = null, bool startSiloOnNewPort = false)
- {
- var siloOptions = InProcessTestSiloSpecificOptions.Create(this, clusterOptions, instanceNumber, startSiloOnNewPort);
- var handle = await CreateSiloAsync(siloOptions);
- handle.InstanceNumber = (short)instanceNumber;
- Interlocked.Increment(ref _startedInstances);
- return handle;
- }
-
- private async Task StopSiloAsync(InProcessSiloHandle instance, bool stopGracefully)
- {
- try
- {
- await instance.StopSiloAsync(stopGracefully).ConfigureAwait(false);
- }
- finally
- {
- await DisposeAsync(instance).ConfigureAwait(false);
-
- Interlocked.Decrement(ref _startedInstances);
- }
- }
-
- ///
- /// Gets the log.
- ///
- /// The log contents.
- public string GetLog()
- {
- return _log.ToString();
- }
-
- private void ReportUnobservedException(object sender, UnhandledExceptionEventArgs eventArgs)
- {
- Exception exception = (Exception)eventArgs.ExceptionObject;
- WriteLog("Unobserved exception: {0}", exception);
- }
-
- private void WriteLog(string format, params object[] args)
- {
- _log.AppendFormat(format + Environment.NewLine, args);
- }
-
- private void FlushLogToConsole()
- {
- Console.WriteLine(GetLog());
- }
-
- ///
- public async ValueTask DisposeAsync()
- {
- if (_disposed)
- {
- return;
- }
-
- await Task.Run(async () =>
- {
- foreach (var handle in Silos)
- {
- await DisposeAsync(handle).ConfigureAwait(false);
- }
-
- await DisposeAsync(ClientHost).ConfigureAwait(false);
- ClientHost = null;
-
- PortAllocator?.Dispose();
- });
-
- _disposed = true;
- }
-
- ///
- public void Dispose()
- {
- if (_disposed)
- {
- return;
- }
-
- foreach (var handle in Silos)
- {
- handle.Dispose();
- }
-
- ClientHost?.Dispose();
- PortAllocator?.Dispose();
-
- _disposed = true;
- }
-
- private static async Task DisposeAsync(IDisposable value)
- {
- if (value is IAsyncDisposable asyncDisposable)
- {
- await asyncDisposable.DisposeAsync().ConfigureAwait(false);
- }
- else if (value is IDisposable disposable)
- {
- disposable.Dispose();
- }
-
- }
- private static void TryConfigureFileLogging(InProcessTestClusterOptions options, IServiceCollection services, string name)
- {
- if (options.ConfigureFileLogging)
- {
- var fileName = TestingUtils.CreateTraceFileName(name, options.ClusterId);
- services.AddLogging(loggingBuilder => loggingBuilder.AddFile(fileName));
- }
- }
-
- private static void InitializeTestHooksSystemTarget(IHost host)
- {
- var testHook = host.Services.GetRequiredService();
- var catalog = host.Services.GetRequiredService();
- catalog.RegisterSystemTarget(testHook);
- }
-}
diff --git a/src/Orleans.TestingHost/InProcTestClusterBuilder.cs b/src/Orleans.TestingHost/InProcTestClusterBuilder.cs
deleted file mode 100644
index 7156fd2d87..0000000000
--- a/src/Orleans.TestingHost/InProcTestClusterBuilder.cs
+++ /dev/null
@@ -1,153 +0,0 @@
-using System;
-using System.Globalization;
-using System.Linq;
-using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.Hosting;
-using Orleans.Hosting;
-using Orleans.Runtime;
-
-namespace Orleans.TestingHost;
-
-/// Configuration builder for starting a .
-public sealed class InProcessTestClusterBuilder
-{
- ///
- /// Initializes a new instance of using the default options.
- ///
- public InProcessTestClusterBuilder()
- : this(2)
- {
- }
-
- ///
- /// Initializes a new instance of overriding the initial silos count.
- ///
- /// The number of initial silos to deploy.
- public InProcessTestClusterBuilder(short initialSilosCount)
- {
- Options = new InProcessTestClusterOptions
- {
- InitialSilosCount = initialSilosCount,
- ClusterId = CreateClusterId(),
- ServiceId = Guid.NewGuid().ToString("N"),
- UseTestClusterMembership = true,
- InitializeClientOnDeploy = true,
- ConfigureFileLogging = true,
- AssumeHomogenousSilosForTesting = true
- };
- }
-
- ///
- /// Gets the options.
- ///
- /// The options.
- public InProcessTestClusterOptions Options { get; }
-
- ///
- /// The port allocator.
- ///
- public ITestClusterPortAllocator PortAllocator { get; } = new TestClusterPortAllocator();
-
- ///
- /// Adds a delegate for configuring silo and client hosts.
- ///
- public InProcessTestClusterBuilder ConfigureHost(Action configureDelegate)
- {
- Options.SiloHostConfigurationDelegates.Add((_, hostBuilder) => configureDelegate(hostBuilder));
- Options.ClientHostConfigurationDelegates.Add(configureDelegate);
- return this;
- }
-
- ///
- /// Adds a delegate to configure silos.
- ///
- /// The builder.
- public InProcessTestClusterBuilder ConfigureSilo(Action configureSiloDelegate)
- {
- Options.SiloHostConfigurationDelegates.Add((options, hostBuilder) => hostBuilder.UseOrleans(siloBuilder => configureSiloDelegate(options, siloBuilder)));
- return this;
- }
-
- ///
- /// Adds a delegate to configure silo hosts.
- ///
- /// The builder.
- public InProcessTestClusterBuilder ConfigureSiloHost(Action configureSiloHostDelegate)
- {
- Options.SiloHostConfigurationDelegates.Add(configureSiloHostDelegate);
- return this;
- }
-
- ///
- /// Adds a delegate to configure clients.
- ///
- /// The builder.
- public InProcessTestClusterBuilder ConfigureClient(Action configureClientDelegate)
- {
- Options.ClientHostConfigurationDelegates.Add(hostBuilder => hostBuilder.UseOrleansClient(clientBuilder => configureClientDelegate(clientBuilder)));
- return this;
- }
-
- ///
- /// Adds a delegate to configure clients hosts.
- ///
- /// The builder.
- public InProcessTestClusterBuilder ConfigureClientHost(Action configureHostDelegate)
- {
- Options.ClientHostConfigurationDelegates.Add(hostBuilder => configureHostDelegate(hostBuilder));
- return this;
- }
-
- ///
- /// Builds this instance.
- ///
- /// InProcessTestCluster.
- public InProcessTestCluster Build()
- {
- var portAllocator = PortAllocator;
-
- ConfigureDefaultPorts();
-
- var testCluster = new InProcessTestCluster(Options, portAllocator);
- return testCluster;
- }
-
- ///
- /// Creates a cluster identifier.
- ///
- /// A new cluster identifier.
- public static string CreateClusterId()
- {
- string prefix = "testcluster-";
- int randomSuffix = Random.Shared.Next(1000);
- DateTime now = DateTime.UtcNow;
- string DateTimeFormat = @"yyyy-MM-dd\tHH-mm-ss";
- return $"{prefix}{now.ToString(DateTimeFormat, CultureInfo.InvariantCulture)}-{randomSuffix}";
- }
-
- private void ConfigureDefaultPorts()
- {
- // Set base ports if none are currently set.
- (int baseSiloPort, int baseGatewayPort) = PortAllocator.AllocateConsecutivePortPairs(Options.InitialSilosCount + 3);
- if (Options.BaseSiloPort == 0) Options.BaseSiloPort = baseSiloPort;
- if (Options.BaseGatewayPort == 0) Options.BaseGatewayPort = baseGatewayPort;
- }
-
- internal class ConfigureStaticClusterDeploymentOptions : IHostConfigurator
- {
- public void Configure(IHostBuilder hostBuilder)
- {
- hostBuilder.ConfigureServices((context, services) =>
- {
- var initialSilos = int.Parse(context.Configuration[nameof(InProcessTestClusterOptions.InitialSilosCount)]);
- var siloNames = Enumerable.Range(0, initialSilos).Select(GetSiloName).ToList();
- services.Configure(options => options.SiloNames = siloNames);
- });
- }
-
- private static string GetSiloName(int instanceNumber)
- {
- return instanceNumber == 0 ? Silo.PrimarySiloName : $"Secondary_{instanceNumber}";
- }
- }
-}
\ No newline at end of file
diff --git a/src/Orleans.TestingHost/InProcTestClusterOptions.cs b/src/Orleans.TestingHost/InProcTestClusterOptions.cs
deleted file mode 100644
index 5f0ae5e509..0000000000
--- a/src/Orleans.TestingHost/InProcTestClusterOptions.cs
+++ /dev/null
@@ -1,92 +0,0 @@
-using System;
-using System.Collections.Generic;
-using Microsoft.Extensions.Hosting;
-using Orleans.Configuration;
-using Orleans.Hosting;
-
-namespace Orleans.TestingHost;
-
-///
-/// Configuration options for test clusters.
-///
-public sealed class InProcessTestClusterOptions
-{
- ///
- /// Gets or sets the cluster identifier.
- ///
- ///
- /// The cluster identifier.
- public string ClusterId { get; set; }
-
- ///
- /// Gets or sets the service identifier.
- ///
- ///
- /// The service identifier.
- public string ServiceId { get; set; }
-
- ///
- /// Gets or sets the base silo port, which is the port for the first silo. Other silos will use subsequent ports.
- ///
- /// The base silo port.
- internal int BaseSiloPort { get; set; }
-
- ///
- /// Gets or sets the base gateway port, which is the gateway port for the first silo. Other silos will use subsequent ports.
- ///
- /// The base gateway port.
- internal int BaseGatewayPort { get; set; }
-
- ///
- /// Gets or sets a value indicating whether to use test cluster membership.
- ///
- /// if test cluster membership should be used; otherwise, .
- internal bool UseTestClusterMembership { get; set; }
-
- ///
- /// Gets or sets a value indicating whether to use the real environment statistics.
- ///
- public bool UseRealEnvironmentStatistics { get; set; }
-
- ///
- /// Gets or sets a value indicating whether to initialize the client immediately on deployment.
- ///
- /// if the client should be initialized immediately on deployment; otherwise, .
- public bool InitializeClientOnDeploy { get; set; }
-
- ///
- /// Gets or sets the initial silos count.
- ///
- /// The initial silos count.
- public short InitialSilosCount { get; set; }
-
- ///
- /// Gets or sets a value indicating whether to configure file logging.
- ///
- /// if file logging should be configured; otherwise, .
- public bool ConfigureFileLogging { get; set; } = true;
-
- ///
- /// Gets or sets a value indicating whether to assume homogeneous silos for testing purposes.
- ///
- /// if the cluster should assume homogeneous silos; otherwise, .
- public bool AssumeHomogenousSilosForTesting { get; set; }
-
- ///
- /// Gets or sets a value indicating whether each silo should host a gateway.
- ///
- /// if each silo should host a gateway; otherwise, .
- public bool GatewayPerSilo { get; set; } = true;
-
- ///
- /// Gets the silo host configuration delegates.
- ///
- /// The silo host configuration delegates.
- public List> SiloHostConfigurationDelegates { get; } = [];
-
- ///
- /// Gets the client host configuration delegates.
- ///
- /// The client host configuration delegates.
- public List> ClientHostConfigurationDelegates { get; } = [];
-}
diff --git a/src/Orleans.TestingHost/InProcTestSiloSpecificOptions.cs b/src/Orleans.TestingHost/InProcTestSiloSpecificOptions.cs
deleted file mode 100644
index c7904fc0bb..0000000000
--- a/src/Orleans.TestingHost/InProcTestSiloSpecificOptions.cs
+++ /dev/null
@@ -1,55 +0,0 @@
-namespace Orleans.TestingHost;
-
-///
-/// Configuration overrides for individual silos.
-///
-public sealed class InProcessTestSiloSpecificOptions
-{
- ///
- /// Gets or sets the silo port.
- ///
- /// The silo port.
- public int SiloPort { get; set; }
-
- ///
- /// Gets or sets the gateway port.
- ///
- /// The gateway port.
- public int GatewayPort { get; set; }
-
- ///
- /// Gets or sets the name of the silo.
- ///
- /// The name of the silo.
- public string SiloName { get; set; }
-
- ///
- /// Creates an instance of the class.
- ///
- /// The test cluster.
- /// The test cluster options.
- /// The instance number.
- /// if set to , assign a new port for the silo.
- /// The options.
- public static InProcessTestSiloSpecificOptions Create(InProcessTestCluster testCluster, InProcessTestClusterOptions testClusterOptions, int instanceNumber, bool assignNewPort = false)
- {
- var result = new InProcessTestSiloSpecificOptions
- {
- SiloName = $"Silo_{instanceNumber}",
- };
-
- if (assignNewPort)
- {
- var (siloPort, gatewayPort) = testCluster.PortAllocator.AllocateConsecutivePortPairs(1);
- result.SiloPort = siloPort;
- result.GatewayPort = (instanceNumber == 0 || testClusterOptions.GatewayPerSilo) ? gatewayPort : 0;
- }
- else
- {
- result.SiloPort = testClusterOptions.BaseSiloPort + instanceNumber;
- result.GatewayPort = (instanceNumber == 0 || testClusterOptions.GatewayPerSilo) ? testClusterOptions.BaseGatewayPort + instanceNumber : 0;
- }
-
- return result;
- }
-}
diff --git a/src/Orleans.TestingHost/InProcess/InProcessGrainDirectory.cs b/src/Orleans.TestingHost/InProcess/InProcessGrainDirectory.cs
deleted file mode 100644
index 26b7d48797..0000000000
--- a/src/Orleans.TestingHost/InProcess/InProcessGrainDirectory.cs
+++ /dev/null
@@ -1,82 +0,0 @@
-#nullable enable
-using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Threading.Tasks;
-using Orleans.GrainDirectory;
-using Orleans.Runtime;
-
-namespace Orleans.TestingHost.InProcess;
-internal sealed class InProcessGrainDirectory(Func getSiloStatus) : IGrainDirectory
-{
- private readonly ConcurrentDictionary _entries = [];
-
- public Task Lookup(GrainId grainId)
- {
- if (_entries.TryGetValue(grainId, out var result) && !IsSiloDead(result))
- {
- return Task.FromResult(result);
- }
-
- return Task.FromResult(null);
- }
-
- public Task Register(GrainAddress address, GrainAddress? previousAddress)
- {
- ArgumentNullException.ThrowIfNull(address);
-
- var result = _entries.AddOrUpdate(
- address.GrainId,
- static (grainId, state) => state.Address,
- static (grainId, existing, state) =>
- {
- if (existing is null || state.PreviousAddress is { } prev && existing.Matches(prev) || state.Self.IsSiloDead(existing))
- {
- return state.Address;
- }
-
- return existing;
- },
- (Self: this, Address: address, PreviousAddress: previousAddress));
-
- if (result is null || IsSiloDead(result))
- {
- return Task.FromResult(null);
- }
-
- return Task.FromResult(result);
- }
-
- public Task Register(GrainAddress address) => Register(address, null);
-
- public Task Unregister(GrainAddress address)
- {
- if (!((IDictionary)_entries).Remove(KeyValuePair.Create(address.GrainId, address)))
- {
- if (_entries.TryGetValue(address.GrainId, out var existing) && (existing.Matches(address) || IsSiloDead(existing)))
- {
- ((IDictionary)_entries).Remove(KeyValuePair.Create(existing.GrainId, existing));
- }
- }
-
- return Task.CompletedTask;
- }
-
- public Task UnregisterSilos(List siloAddresses)
- {
- foreach (var entry in _entries)
- {
- foreach (var silo in siloAddresses)
- {
- if (silo.Equals(entry.Value.SiloAddress))
- {
- ((IDictionary)_entries).Remove(entry);
- }
- }
- }
-
- return Task.CompletedTask;
- }
-
- private bool IsSiloDead(GrainAddress existing) => existing.SiloAddress is not { } address || getSiloStatus(address) is SiloStatus.Dead or SiloStatus.None;
-}
diff --git a/src/Orleans.TestingHost/InProcess/InProcessMembershipTable.cs b/src/Orleans.TestingHost/InProcess/InProcessMembershipTable.cs
deleted file mode 100644
index bfefa09531..0000000000
--- a/src/Orleans.TestingHost/InProcess/InProcessMembershipTable.cs
+++ /dev/null
@@ -1,193 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using Orleans.Runtime;
-using System.Globalization;
-using System.Threading.Tasks;
-using Orleans.Messaging;
-
-namespace Orleans.TestingHost.InProcess;
-
-///
-/// An in-memory implementation of for testing purposes.
-///
-internal sealed class InProcessMembershipTable(string clusterId) : IMembershipTableSystemTarget, IGatewayListProvider
-{
- private readonly Table _table = new();
- private readonly string _clusterId = clusterId;
-
- public TimeSpan MaxStaleness => TimeSpan.Zero;
- public bool IsUpdatable => true;
-
- public Task InitializeMembershipTable(bool tryInitTableVersion) => Task.CompletedTask;
-
- public Task DeleteMembershipTableEntries(string clusterId)
- {
- if (string.Equals(_clusterId, clusterId, StringComparison.Ordinal))
- {
- _table.Clear();
- }
-
- return Task.CompletedTask;
- }
-
- public Task ReadRow(SiloAddress key) => Task.FromResult(_table.Read(key));
-
- public Task ReadAll() => Task.FromResult(_table.ReadAll());
-
- public Task InsertRow(MembershipEntry entry, TableVersion tableVersion) => Task.FromResult(_table.Insert(entry, tableVersion));
-
- public Task UpdateRow(MembershipEntry entry, string etag, TableVersion tableVersion) => Task.FromResult(_table.Update(entry, etag, tableVersion));
-
- public Task UpdateIAmAlive(MembershipEntry entry)
- {
- _table.UpdateIAmAlive(entry);
- return Task.CompletedTask;
- }
-
- public Task CleanupDefunctSiloEntries(DateTimeOffset beforeDate)
- {
- _table.CleanupDefunctSiloEntries(beforeDate);
- return Task.CompletedTask;
- }
-
- public Task InitializeGatewayListProvider() => Task.CompletedTask;
-
- public Task> GetGateways()
- {
- var table = _table.ReadAll();
- var result = table.Members
- .Where(x => x.Item1.Status == SiloStatus.Active && x.Item1.ProxyPort != 0)
- .Select(x =>
- {
- var entry = x.Item1;
- return SiloAddress.New(entry.SiloAddress.Endpoint.Address, entry.ProxyPort, entry.SiloAddress.Generation).ToGatewayUri();
- }).ToList();
- return Task.FromResult>(result);
- }
-
- public SiloStatus GetSiloStatus(SiloAddress address) => _table.GetSiloStatus(address);
-
- private sealed class Table
- {
- private readonly object _lock = new();
- private readonly Dictionary _table = [];
- private TableVersion _tableVersion;
- private long _lastETagCounter;
-
- public Table()
- {
- _tableVersion = new TableVersion(0, NewETag());
- }
- public SiloStatus GetSiloStatus(SiloAddress key)
- {
- lock (_lock)
- {
- return _table.TryGetValue(key, out var data) ? data.Entry.Status : SiloStatus.None;
- }
- }
-
- public MembershipTableData Read(SiloAddress key)
- {
- lock (_lock)
- {
- return _table.TryGetValue(key, out var data) ?
- new MembershipTableData(Tuple.Create(data.Entry.Copy(), data.ETag), _tableVersion)
- : new MembershipTableData(_tableVersion);
- }
- }
-
- public MembershipTableData ReadAll()
- {
- lock (_lock)
- {
- return new MembershipTableData(_table.Values.Select(data => Tuple.Create(data.Entry.Copy(), data.ETag)).ToList(), _tableVersion);
- }
- }
-
- public TableVersion ReadTableVersion() => _tableVersion;
-
- public bool Insert(MembershipEntry entry, TableVersion version)
- {
- lock (_lock)
- {
- if (_table.TryGetValue(entry.SiloAddress, out var data))
- {
- return false;
- }
-
- if (!_tableVersion.VersionEtag.Equals(version.VersionEtag))
- {
- return false;
- }
-
- _table[entry.SiloAddress] = (entry.Copy(), _lastETagCounter++.ToString(CultureInfo.InvariantCulture));
- _tableVersion = new TableVersion(version.Version, NewETag());
- return true;
- }
- }
-
- public bool Update(MembershipEntry entry, string etag, TableVersion version)
- {
- lock (_lock)
- {
- if (!_table.TryGetValue(entry.SiloAddress, out var data))
- {
- return false;
- }
-
- if (!data.ETag.Equals(etag) || !_tableVersion.VersionEtag.Equals(version.VersionEtag))
- {
- return false;
- }
-
- _table[entry.SiloAddress] = (entry.Copy(), _lastETagCounter++.ToString(CultureInfo.InvariantCulture));
- _tableVersion = new TableVersion(version.Version, NewETag());
- return true;
- }
- }
-
- public void UpdateIAmAlive(MembershipEntry entry)
- {
- lock (_lock)
- {
- if (!_table.TryGetValue(entry.SiloAddress, out var data))
- {
- return;
- }
-
- data.Entry.IAmAliveTime = entry.IAmAliveTime;
- _table[entry.SiloAddress] = (data.Entry, NewETag());
- }
- }
-
- public void CleanupDefunctSiloEntries(DateTimeOffset beforeDate)
- {
- lock (_lock)
- {
- var entries = _table.Values.ToList();
- foreach (var (entry, _) in entries)
- {
- if (entry.Status == SiloStatus.Dead
- && new DateTime(Math.Max(entry.IAmAliveTime.Ticks, entry.StartTime.Ticks), DateTimeKind.Utc) < beforeDate)
- {
- _table.Remove(entry.SiloAddress, out _);
- continue;
- }
- }
- }
- }
-
- internal void Clear()
- {
- lock (_lock)
- {
- _table.Clear();
- }
- }
-
- public override string ToString() => $"Table = {ReadAll()}, ETagCounter={_lastETagCounter}";
-
- private string NewETag() => _lastETagCounter++.ToString(CultureInfo.InvariantCulture);
- }
-}
diff --git a/src/Orleans.TestingHost/InProcessSiloHandle.cs b/src/Orleans.TestingHost/InProcessSiloHandle.cs
index 0191f24034..7712db2cbd 100644
--- a/src/Orleans.TestingHost/InProcessSiloHandle.cs
+++ b/src/Orleans.TestingHost/InProcessSiloHandle.cs
@@ -16,12 +16,7 @@ public class InProcessSiloHandle : SiloHandle
private bool isActive = true;
/// Gets a reference to the silo host.
- public IHost SiloHost { get; init; }
-
- ///
- /// Gets the silo's service provider.
- ///
- public IServiceProvider ServiceProvider => SiloHost.Services;
+ public IHost SiloHost { get; private set; }
///
public override bool IsActive => isActive;
@@ -33,7 +28,7 @@ public class InProcessSiloHandle : SiloHandle
/// The configuration.
/// An optional delegate which is invoked just prior to building the host builder.
/// The silo handle.
- public static async Task CreateAsync(
+ public static async Task CreateAsync(
string siloName,
IConfiguration configuration,
Action postConfigureHostBuilder = null)
diff --git a/test/TestInfrastructure/TestExtensions/BaseInProcessTestClusterFixture.cs b/test/TestInfrastructure/TestExtensions/BaseInProcessTestClusterFixture.cs
deleted file mode 100644
index 78c9b623b4..0000000000
--- a/test/TestInfrastructure/TestExtensions/BaseInProcessTestClusterFixture.cs
+++ /dev/null
@@ -1,81 +0,0 @@
-using System.Runtime.ExceptionServices;
-using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.Logging;
-using Microsoft.Extensions.Options;
-using Orleans.Configuration;
-using Orleans.TestingHost;
-
-namespace TestExtensions;
-
-public abstract class BaseInProcessTestClusterFixture : Xunit.IAsyncLifetime
-{
- private readonly ExceptionDispatchInfo preconditionsException;
-
- static BaseInProcessTestClusterFixture()
- {
- TestDefaultConfiguration.InitializeDefaults();
- }
-
- protected BaseInProcessTestClusterFixture()
- {
- try
- {
- CheckPreconditionsOrThrow();
- }
- catch (Exception ex)
- {
- preconditionsException = ExceptionDispatchInfo.Capture(ex);
- return;
- }
- }
-
- public void EnsurePreconditionsMet()
- {
- preconditionsException?.Throw();
- }
-
- protected virtual void CheckPreconditionsOrThrow() { }
-
- protected virtual void ConfigureTestCluster(InProcessTestClusterBuilder builder)
- {
- }
-
- public InProcessTestCluster HostedCluster { get; private set; }
-
- public IGrainFactory GrainFactory => Client;
-
- public IClusterClient Client => HostedCluster?.Client;
-
- public ILogger Logger { get; private set; }
-
- public string GetClientServiceId() => Client.ServiceProvider.GetRequiredService>().Value.ServiceId;
-
- public virtual async Task InitializeAsync()
- {
- EnsurePreconditionsMet();
- var builder = new InProcessTestClusterBuilder();
- builder.ConfigureHost(hostBuilder => TestDefaultConfiguration.ConfigureHostConfiguration(hostBuilder.Configuration));
- ConfigureTestCluster(builder);
-
- var testCluster = builder.Build();
- await testCluster.DeployAsync().ConfigureAwait(false);
-
- HostedCluster = testCluster;
- Logger = Client.ServiceProvider.GetRequiredService().CreateLogger("Application");
- }
-
- public virtual async Task DisposeAsync()
- {
- var cluster = HostedCluster;
- if (cluster is null) return;
-
- try
- {
- await cluster.StopAllSilosAsync().ConfigureAwait(false);
- }
- finally
- {
- await cluster.DisposeAsync().ConfigureAwait(false);
- }
- }
-}
\ No newline at end of file
diff --git a/test/Tester/ClientConnectionTests/ClientConnectionEventTests.cs b/test/Tester/ClientConnectionTests/ClientConnectionEventTests.cs
index 4a4c4440eb..92a7dc05bf 100644
--- a/test/Tester/ClientConnectionTests/ClientConnectionEventTests.cs
+++ b/test/Tester/ClientConnectionTests/ClientConnectionEventTests.cs
@@ -1,48 +1,63 @@
+using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Orleans.Configuration;
using Orleans.TestingHost;
+using TestExtensions;
using UnitTests.GrainInterfaces;
using Xunit;
-namespace Tester;
-
-public class ClientConnectionEventTests
+namespace Tester
{
- [Fact, TestCategory("SlowBVT")]
- public async Task EventSendWhenDisconnectedFromCluster()
+ public class ClientConnectionEventTests : TestClusterPerTest
{
- var semaphore = new SemaphoreSlim(0, 1);
- var builder = new InProcessTestClusterBuilder();
- builder.ConfigureClient(c =>
+ private OutsideRuntimeClient runtimeClient;
+
+ protected override void ConfigureTestCluster(TestClusterBuilder builder)
{
- c.Configure(o => o.GatewayListRefreshPeriod = TimeSpan.FromSeconds(0.5));
- c.AddClusterConnectionLostHandler((sender, args) => semaphore.Release());
- });
- await using var cluster = builder.Build();
- await cluster.DeployAsync();
-
- // Burst lot of call, to be sure that we are connected to all silos
- for (int i = 0; i < 100; i++)
+ builder.AddClientBuilderConfigurator();
+ }
+
+ public override async Task InitializeAsync()
{
- var grain = cluster.Client.GetGrain(i);
- await grain.SetLabel(i.ToString());
+ await base.InitializeAsync();
+ this.runtimeClient = this.HostedCluster.Client.ServiceProvider.GetRequiredService();
}
- await cluster.StopAllSilosAsync();
+ public class Configurator : IClientBuilderConfigurator
+ {
+ public void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
+ {
+ clientBuilder.Configure(options => options.GatewayListRefreshPeriod = TimeSpan.FromSeconds(1));
+ }
+ }
- Assert.True(await semaphore.WaitAsync(TimeSpan.FromSeconds(10)));
- }
+ [Fact, TestCategory("SlowBVT")]
+ public async Task EventSendWhenDisconnectedFromCluster()
+ {
+ var runtime = this.HostedCluster.ServiceProvider.GetRequiredService();
- [Fact, TestCategory("SlowBVT")]
- public async Task GatewayChangedEventSentOnDisconnectAndReconnect()
- {
- var regainedGatewaySemaphore = new SemaphoreSlim(0, 1);
- var lostGatewaySemaphore = new SemaphoreSlim(0, 1);
- var builder = new InProcessTestClusterBuilder();
- builder.ConfigureClient(c =>
+ var semaphore = new SemaphoreSlim(0, 1);
+ this.runtimeClient.ClusterConnectionLost += (sender, args) => semaphore.Release();
+
+ // Burst lot of call, to be sure that we are connected to all silos
+ for (int i = 0; i < 100; i++)
+ {
+ var grain = GrainFactory.GetGrain(i);
+ await grain.SetLabel(i.ToString());
+ }
+
+ await this.HostedCluster.StopAllSilosAsync();
+
+ Assert.True(await semaphore.WaitAsync(TimeSpan.FromSeconds(10)));
+ }
+
+ [Fact, TestCategory("SlowBVT")]
+ public async Task GatewayChangedEventSentOnDisconnectAndReconnect()
{
- c.Configure(o => o.GatewayListRefreshPeriod = TimeSpan.FromSeconds(0.5));
- c.AddGatewayCountChangedHandler((sender, args) =>
+ var regainedGatewaySemaphore = new SemaphoreSlim(0, 1);
+ var lostGatewaySemaphore = new SemaphoreSlim(0, 1);
+
+ this.runtimeClient.GatewayCountChanged += (sender, args) =>
{
if (args.NumberOfConnectedGateways == 1)
{
@@ -52,27 +67,25 @@ public async Task GatewayChangedEventSentOnDisconnectAndReconnect()
{
regainedGatewaySemaphore.Release();
}
- });
- });
- await using var cluster = builder.Build();
- await cluster.DeployAsync();
+ };
- var silo = cluster.Silos[0];
- await silo.StopSiloAsync(true);
+ var silo = this.HostedCluster.SecondarySilos[0];
+ await silo.StopSiloAsync(true);
- Assert.True(await lostGatewaySemaphore.WaitAsync(TimeSpan.FromSeconds(20)));
+ Assert.True(await lostGatewaySemaphore.WaitAsync(TimeSpan.FromSeconds(20)));
- await cluster.RestartStoppedSecondarySiloAsync(silo.Name);
+ await this.HostedCluster.RestartStoppedSecondarySiloAsync(silo.Name);
- // Clients need prodding to reconnect.
- var remainingAttempts = 90;
- bool reconnected;
- do
- {
- cluster.Client.GetGrain(Guid.NewGuid().GetHashCode()).SetLabel("test").Ignore();
- reconnected = await regainedGatewaySemaphore.WaitAsync(TimeSpan.FromSeconds(1));
- } while (!reconnected && --remainingAttempts > 0);
+ // Clients need prodding to reconnect.
+ var remainingAttempts = 90;
+ bool reconnected;
+ do
+ {
+ this.Client.GetGrain(Guid.NewGuid().GetHashCode()).SetLabel("test").Ignore();
+ reconnected = await regainedGatewaySemaphore.WaitAsync(TimeSpan.FromSeconds(1));
+ } while (!reconnected && --remainingAttempts > 0);
- Assert.True(reconnected, "Failed to reconnect to restarted gateway.");
+ Assert.True(reconnected, "Failed to reconnect to restarted gateway.");
+ }
}
}