diff --git a/src/Orleans.Core/Core/DefaultClientServices.cs b/src/Orleans.Core/Core/DefaultClientServices.cs index 1c05084dfd0..d0c995dd07e 100644 --- a/src/Orleans.Core/Core/DefaultClientServices.cs +++ b/src/Orleans.Core/Core/DefaultClientServices.cs @@ -111,7 +111,7 @@ public static void AddDefaultServices(IClientBuilder builder) services.AddSingleton(); // Networking - services.TryAddSingleton(); + services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); diff --git a/src/Orleans.Core/Networking/ClientOutboundConnection.cs b/src/Orleans.Core/Networking/ClientOutboundConnection.cs index c04b60365e0..d27555787ad 100644 --- a/src/Orleans.Core/Networking/ClientOutboundConnection.cs +++ b/src/Orleans.Core/Networking/ClientOutboundConnection.cs @@ -27,9 +27,8 @@ public ClientOutboundConnection( ConnectionOptions connectionOptions, ConnectionCommon connectionShared, ConnectionPreambleHelper connectionPreambleHelper, - ClusterOptions clusterOptions, - IMessageSink rebalancerGateway) - : base(rebalancerGateway, connection, middleware, connectionShared) + ClusterOptions clusterOptions) + : base(connection, middleware, connectionShared) { this.messageCenter = messageCenter; this.connectionManager = connectionManager; diff --git a/src/Orleans.Core/Networking/ClientOutboundConnectionFactory.cs b/src/Orleans.Core/Networking/ClientOutboundConnectionFactory.cs index d6ecf1501cf..038e75526a3 100644 --- a/src/Orleans.Core/Networking/ClientOutboundConnectionFactory.cs +++ b/src/Orleans.Core/Networking/ClientOutboundConnectionFactory.cs @@ -14,7 +14,6 @@ internal sealed class ClientOutboundConnectionFactory : ConnectionFactory private readonly ClientConnectionOptions clientConnectionOptions; private readonly ClusterOptions clusterOptions; private readonly ConnectionPreambleHelper connectionPreambleHelper; - private readonly IMessageSink rebalancerGateway; private readonly object initializationLock = new object(); private volatile bool isInitialized; private ClientMessageCenter messageCenter; @@ -24,12 +23,10 @@ public ClientOutboundConnectionFactory( IOptions connectionOptions, IOptions clientConnectionOptions, IOptions clusterOptions, - IMessageSink rebalancerGateway, ConnectionCommon connectionShared, ConnectionPreambleHelper connectionPreambleHelper) : base(connectionShared.ServiceProvider.GetRequiredKeyedService(ServicesKey), connectionShared.ServiceProvider, connectionOptions) { - this.rebalancerGateway = rebalancerGateway; this.connectionShared = connectionShared; this.clientConnectionOptions = clientConnectionOptions.Value; this.clusterOptions = clusterOptions.Value; @@ -49,8 +46,7 @@ protected override Connection CreateConnection(SiloAddress address, ConnectionCo this.ConnectionOptions, this.connectionShared, this.connectionPreambleHelper, - this.clusterOptions, - this.rebalancerGateway); + this.clusterOptions); } protected override void ConfigureConnectionBuilder(IConnectionBuilder connectionBuilder) diff --git a/src/Orleans.Core/Networking/Connection.cs b/src/Orleans.Core/Networking/Connection.cs index 9ad4a0e0a52..d28da7748e8 100644 --- a/src/Orleans.Core/Networking/Connection.cs +++ b/src/Orleans.Core/Networking/Connection.cs @@ -30,7 +30,6 @@ internal abstract class Connection private static readonly ObjectPool MessageHandlerPool = ObjectPool.Create(new MessageHandlerPoolPolicy()); private readonly ConnectionCommon shared; - private readonly IMessageSink _rebalancerGateway; private readonly ConnectionDelegate middleware; private readonly Channel outgoingMessages; private readonly ChannelWriter outgoingMessageWriter; @@ -43,12 +42,10 @@ internal abstract class Connection private Task _closeTask; protected Connection( - IMessageSink rebalancerGateway, ConnectionContext connection, ConnectionDelegate middleware, ConnectionCommon shared) { - _rebalancerGateway = rebalancerGateway ?? throw new ArgumentNullException(nameof(rebalancerGateway)); this.Context = connection ?? throw new ArgumentNullException(nameof(connection)); this.middleware = middleware ?? throw new ArgumentNullException(nameof(middleware)); this.shared = shared; @@ -356,6 +353,7 @@ private async Task ProcessOutgoing() Exception error = default; var serializer = this.shared.ServiceProvider.GetRequiredService(); + var messageStatisticsSink = this.shared.MessageStatisticsSink; try { var output = this._transport.Output; @@ -377,7 +375,7 @@ private async Task ProcessOutgoing() inflight.Add(message); var (headerLength, bodyLength) = serializer.Write(output, message); RecordMessageSend(message, headerLength + bodyLength, headerLength); - _rebalancerGateway.RecordMessage(message); + messageStatisticsSink.RecordMessage(message); message = null; } } diff --git a/src/Orleans.Core/Networking/ConnectionShared.cs b/src/Orleans.Core/Networking/ConnectionShared.cs index a6b21f76cc9..fde60cf471e 100644 --- a/src/Orleans.Core/Networking/ConnectionShared.cs +++ b/src/Orleans.Core/Networking/ConnectionShared.cs @@ -1,24 +1,19 @@ -using System; +using System; +using Orleans.Placement.Rebalancing; namespace Orleans.Runtime.Messaging { - internal sealed class ConnectionCommon + internal sealed class ConnectionCommon( + IServiceProvider serviceProvider, + MessageFactory messageFactory, + MessagingTrace messagingTrace, + NetworkingTrace networkingTrace, + IMessageStatisticsSink messageStatisticsSink) { - public ConnectionCommon( - IServiceProvider serviceProvider, - MessageFactory messageFactory, - MessagingTrace messagingTrace, - NetworkingTrace networkingTrace) - { - this.ServiceProvider = serviceProvider; - this.MessageFactory = messageFactory; - this.MessagingTrace = messagingTrace; - this.NetworkingTrace = networkingTrace; - } - - public MessageFactory MessageFactory { get; } - public IServiceProvider ServiceProvider { get; } - public NetworkingTrace NetworkingTrace { get; } - public MessagingTrace MessagingTrace { get; } + public MessageFactory MessageFactory { get; } = messageFactory; + public IServiceProvider ServiceProvider { get; } = serviceProvider; + public NetworkingTrace NetworkingTrace { get; } = networkingTrace; + public IMessageStatisticsSink MessageStatisticsSink { get; } = messageStatisticsSink; + public MessagingTrace MessagingTrace { get; } = messagingTrace; } } diff --git a/src/Orleans.Core/Placement/Rebalancing/IImbalanceToleranceRule.cs b/src/Orleans.Core/Placement/Rebalancing/IImbalanceToleranceRule.cs index 645489e471f..195d8e28215 100644 --- a/src/Orleans.Core/Placement/Rebalancing/IImbalanceToleranceRule.cs +++ b/src/Orleans.Core/Placement/Rebalancing/IImbalanceToleranceRule.cs @@ -6,7 +6,7 @@ namespace Orleans.Placement.Rebalancing; public interface IImbalanceToleranceRule { /// - /// Checks if this rule is statisfied by . + /// Checks if this rule is satisfied by . /// /// The imbalance between the exchanging silo pair that will be, if this method were to return bool IsSatisfiedBy(uint imbalance); diff --git a/src/Orleans.Core/Placement/Rebalancing/IMessageSink.cs b/src/Orleans.Core/Placement/Rebalancing/IMessageStatisticsSink.cs similarity index 59% rename from src/Orleans.Core/Placement/Rebalancing/IMessageSink.cs rename to src/Orleans.Core/Placement/Rebalancing/IMessageStatisticsSink.cs index ba7f2bc92fe..277d1287dd2 100644 --- a/src/Orleans.Core/Placement/Rebalancing/IMessageSink.cs +++ b/src/Orleans.Core/Placement/Rebalancing/IMessageStatisticsSink.cs @@ -2,12 +2,12 @@ namespace Orleans.Placement.Rebalancing; -internal interface IMessageSink +internal interface IMessageStatisticsSink { void RecordMessage(Message message); } -internal sealed class NoOpActiveRebalancerGateway : IMessageSink +internal sealed class NoOpMessageStatisticsSink : IMessageStatisticsSink { public void RecordMessage(Message message) { } } \ No newline at end of file diff --git a/src/Orleans.Runtime/Hosting/ActiveRebalancingExtensions.cs b/src/Orleans.Runtime/Hosting/ActiveRebalancingExtensions.cs index 7faa0880dca..7cfd1569766 100644 --- a/src/Orleans.Runtime/Hosting/ActiveRebalancingExtensions.cs +++ b/src/Orleans.Runtime/Hosting/ActiveRebalancingExtensions.cs @@ -40,7 +40,7 @@ private static IServiceCollection AddActiveRebalancing(this IServiceColle services.AddSingleton(); services.AddSingleton(); - services.AddFromExisting(); + services.AddFromExisting(); services.AddFromExisting, ActivationRebalancer>(); return services; diff --git a/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs b/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs index 3a0edb2883f..a4ed94cf486 100644 --- a/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs +++ b/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs @@ -355,7 +355,7 @@ internal static void AddDefaultServices(ISiloBuilder builder) (sp, _) => sp.GetRequiredService()); // Networking - services.TryAddSingleton(); + services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); diff --git a/src/Orleans.Runtime/Messaging/MessageCenter.cs b/src/Orleans.Runtime/Messaging/MessageCenter.cs index d2be403926d..650cc0b04ea 100644 --- a/src/Orleans.Runtime/Messaging/MessageCenter.cs +++ b/src/Orleans.Runtime/Messaging/MessageCenter.cs @@ -26,7 +26,7 @@ internal class MessageCenter : IMessageCenter, IAsyncDisposable private readonly SiloMessagingOptions messagingOptions; private readonly PlacementService placementService; private readonly GrainLocator _grainLocator; - private readonly IMessageSink _rebalancerGateway; + private readonly IMessageStatisticsSink _messageStatisticsSink; private readonly ILogger log; private readonly Catalog catalog; private bool stopped; @@ -45,7 +45,7 @@ public MessageCenter( IOptions messagingOptions, PlacementService placementService, GrainLocator grainLocator, - IMessageSink rebalancerGateway) + IMessageStatisticsSink messageStatisticsSink) { this.catalog = catalog; this.messagingOptions = messagingOptions.Value; @@ -54,7 +54,7 @@ public MessageCenter( this.messagingTrace = messagingTrace; this.placementService = placementService; _grainLocator = grainLocator; - _rebalancerGateway = rebalancerGateway; + _messageStatisticsSink = messageStatisticsSink; this.log = logger; this.messageFactory = messageFactory; this._siloAddress = siloDetails.SiloAddress; @@ -77,7 +77,7 @@ public bool TryDeliverToProxy(Message msg) if (this.Gateway is Gateway gateway && gateway.TryDeliverToProxy(msg) || this.hostedClient is HostedClient client && client.TryDispatchToClient(msg)) { - _rebalancerGateway.RecordMessage(msg); + _messageStatisticsSink.RecordMessage(msg); return true; } @@ -538,7 +538,7 @@ public void ReceiveMessage(Message msg) } targetActivation.ReceiveMessage(msg); - _rebalancerGateway.RecordMessage(msg); + _messageStatisticsSink.RecordMessage(msg); } } catch (Exception ex) diff --git a/src/Orleans.Runtime/Networking/GatewayConnectionListener.cs b/src/Orleans.Runtime/Networking/GatewayConnectionListener.cs index 14409c36f83..b3482b1d56f 100644 --- a/src/Orleans.Runtime/Networking/GatewayConnectionListener.cs +++ b/src/Orleans.Runtime/Networking/GatewayConnectionListener.cs @@ -7,7 +7,6 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Orleans.Configuration; -using Orleans.Placement.Rebalancing; namespace Orleans.Runtime.Messaging { @@ -15,7 +14,6 @@ internal sealed class GatewayConnectionListener : ConnectionListener, ILifecycle { internal static readonly object ServicesKey = new object(); private readonly ILocalSiloDetails localSiloDetails; - private readonly IMessageSink rebalancerGateway; private readonly MessageCenter messageCenter; private readonly ConnectionCommon connectionShared; private readonly ConnectionPreambleHelper connectionPreambleHelper; @@ -31,7 +29,6 @@ public GatewayConnectionListener( IOptions siloConnectionOptions, OverloadDetector overloadDetector, ILocalSiloDetails localSiloDetails, - IMessageSink rebalancerGateway, IOptions endpointOptions, MessageCenter messageCenter, ConnectionManager connectionManager, @@ -44,7 +41,6 @@ public GatewayConnectionListener( this.overloadDetector = overloadDetector; this.gateway = messageCenter.Gateway; this.localSiloDetails = localSiloDetails; - this.rebalancerGateway = rebalancerGateway; this.messageCenter = messageCenter; this.connectionShared = connectionShared; this.connectionPreambleHelper = connectionPreambleHelper; @@ -65,8 +61,7 @@ protected override Connection CreateConnection(ConnectionContext context) this.ConnectionOptions, this.messageCenter, this.connectionShared, - this.connectionPreambleHelper, - this.rebalancerGateway); + this.connectionPreambleHelper); } protected override void ConfigureConnectionBuilder(IConnectionBuilder connectionBuilder) diff --git a/src/Orleans.Runtime/Networking/GatewayInboundConnection.cs b/src/Orleans.Runtime/Networking/GatewayInboundConnection.cs index 4245b1429c4..a30572a2a14 100644 --- a/src/Orleans.Runtime/Networking/GatewayInboundConnection.cs +++ b/src/Orleans.Runtime/Networking/GatewayInboundConnection.cs @@ -28,9 +28,8 @@ public GatewayInboundConnection( ConnectionOptions connectionOptions, MessageCenter messageCenter, ConnectionCommon connectionShared, - ConnectionPreambleHelper connectionPreambleHelper, - IMessageSink rebalancerGateway) - : base(rebalancerGateway, connection, middleware, connectionShared) + ConnectionPreambleHelper connectionPreambleHelper) + : base(connection, middleware, connectionShared) { this.connectionOptions = connectionOptions; this.gateway = gateway; diff --git a/src/Orleans.Runtime/Networking/SiloConnection.cs b/src/Orleans.Runtime/Networking/SiloConnection.cs index 90fffe485af..9d6cd46ab8e 100644 --- a/src/Orleans.Runtime/Networking/SiloConnection.cs +++ b/src/Orleans.Runtime/Networking/SiloConnection.cs @@ -33,9 +33,8 @@ public SiloConnection( ConnectionOptions connectionOptions, ConnectionCommon connectionShared, ProbeRequestMonitor probeMonitor, - ConnectionPreambleHelper connectionPreambleHelper, - IMessageSink rebalancerGateway) - : base(rebalancerGateway, connection, middleware, connectionShared) + ConnectionPreambleHelper connectionPreambleHelper) + : base(connection, middleware, connectionShared) { this.messageCenter = messageCenter; this.connectionManager = connectionManager; diff --git a/src/Orleans.Runtime/Networking/SiloConnectionFactory.cs b/src/Orleans.Runtime/Networking/SiloConnectionFactory.cs index c096f8ff11f..456642894c6 100644 --- a/src/Orleans.Runtime/Networking/SiloConnectionFactory.cs +++ b/src/Orleans.Runtime/Networking/SiloConnectionFactory.cs @@ -5,7 +5,6 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using Orleans.Configuration; -using Orleans.Placement.Rebalancing; namespace Orleans.Runtime.Messaging { @@ -13,7 +12,6 @@ internal sealed class SiloConnectionFactory : ConnectionFactory { internal static readonly object ServicesKey = new object(); private readonly ILocalSiloDetails localSiloDetails; - private readonly IMessageSink rebalancerGateway; private readonly ConnectionCommon connectionShared; private readonly ProbeRequestMonitor probeRequestMonitor; private readonly ConnectionPreambleHelper connectionPreambleHelper; @@ -30,7 +28,6 @@ public SiloConnectionFactory( IOptions connectionOptions, IOptions siloConnectionOptions, ILocalSiloDetails localSiloDetails, - IMessageSink rebalancerGateway, ConnectionCommon connectionShared, ProbeRequestMonitor probeRequestMonitor, ConnectionPreambleHelper connectionPreambleHelper) @@ -39,7 +36,6 @@ public SiloConnectionFactory( this.serviceProvider = serviceProvider; this.siloConnectionOptions = siloConnectionOptions.Value; this.localSiloDetails = localSiloDetails; - this.rebalancerGateway = rebalancerGateway; this.connectionShared = connectionShared; this.probeRequestMonitor = probeRequestMonitor; this.connectionPreambleHelper = connectionPreambleHelper; @@ -71,8 +67,7 @@ protected override Connection CreateConnection(SiloAddress address, ConnectionCo this.ConnectionOptions, this.connectionShared, this.probeRequestMonitor, - this.connectionPreambleHelper, - this.rebalancerGateway); + this.connectionPreambleHelper); } protected override void ConfigureConnectionBuilder(IConnectionBuilder connectionBuilder) diff --git a/src/Orleans.Runtime/Networking/SiloConnectionListener.cs b/src/Orleans.Runtime/Networking/SiloConnectionListener.cs index 6ebc207c6a1..98ca50c3dc3 100644 --- a/src/Orleans.Runtime/Networking/SiloConnectionListener.cs +++ b/src/Orleans.Runtime/Networking/SiloConnectionListener.cs @@ -6,7 +6,6 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using Orleans.Configuration; -using Orleans.Placement.Rebalancing; namespace Orleans.Runtime.Messaging { @@ -14,7 +13,6 @@ internal sealed class SiloConnectionListener : ConnectionListener, ILifecyclePar { internal static readonly object ServicesKey = new object(); private readonly ILocalSiloDetails localSiloDetails; - private readonly IMessageSink rebalancerGateway; private readonly SiloConnectionOptions siloConnectionOptions; private readonly MessageCenter messageCenter; private readonly EndpointOptions endpointOptions; @@ -30,7 +28,6 @@ public SiloConnectionListener( MessageCenter messageCenter, IOptions endpointOptions, ILocalSiloDetails localSiloDetails, - IMessageSink rebalancerGateway, ConnectionManager connectionManager, ConnectionCommon connectionShared, ProbeRequestMonitor probeRequestMonitor, @@ -40,7 +37,6 @@ public SiloConnectionListener( this.siloConnectionOptions = siloConnectionOptions.Value; this.messageCenter = messageCenter; this.localSiloDetails = localSiloDetails; - this.rebalancerGateway = rebalancerGateway; this.connectionManager = connectionManager; this.connectionShared = connectionShared; this.probeRequestMonitor = probeRequestMonitor; @@ -62,8 +58,7 @@ protected override Connection CreateConnection(ConnectionContext context) this.ConnectionOptions, this.connectionShared, this.probeRequestMonitor, - this.connectionPreambleHelper, - this.rebalancerGateway); + this.connectionPreambleHelper); } protected override void ConfigureConnectionBuilder(IConnectionBuilder connectionBuilder) diff --git a/src/Orleans.Runtime/Placement/Rebalancing/ActivationRebalancer.MessageSink.cs b/src/Orleans.Runtime/Placement/Rebalancing/ActivationRebalancer.MessageSink.cs index 04dc0b0831d..fa30c4f1367 100644 --- a/src/Orleans.Runtime/Placement/Rebalancing/ActivationRebalancer.MessageSink.cs +++ b/src/Orleans.Runtime/Placement/Rebalancing/ActivationRebalancer.MessageSink.cs @@ -7,7 +7,7 @@ namespace Orleans.Runtime.Placement.Rebalancing; -internal partial class ActivationRebalancer : IMessageSink +internal partial class ActivationRebalancer : IMessageStatisticsSink { private readonly CancellationTokenSource _shutdownCts = new(); private Task? _processPendingEdgesTask;