Skip to content

Commit

Permalink
Refactoring 2
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenBond committed Apr 9, 2024
1 parent 46cebe5 commit e50a28a
Show file tree
Hide file tree
Showing 16 changed files with 37 additions and 66 deletions.
2 changes: 1 addition & 1 deletion src/Orleans.Core/Core/DefaultClientServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public static void AddDefaultServices(IClientBuilder builder)
services.AddSingleton<SharedMemoryPool>();

// Networking
services.TryAddSingleton<IMessageSink, NoOpActiveRebalancerGateway>();
services.TryAddSingleton<IMessageStatisticsSink, NoOpMessageStatisticsSink>();
services.TryAddSingleton<ConnectionCommon>();
services.TryAddSingleton<ConnectionManager>();
services.TryAddSingleton<ConnectionPreambleHelper>();
Expand Down
5 changes: 2 additions & 3 deletions src/Orleans.Core/Networking/ClientOutboundConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ public ClientOutboundConnection(
ConnectionOptions connectionOptions,
ConnectionCommon connectionShared,
ConnectionPreambleHelper connectionPreambleHelper,
ClusterOptions clusterOptions,
IMessageSink rebalancerGateway)
: base(rebalancerGateway, connection, middleware, connectionShared)
ClusterOptions clusterOptions)
: base(connection, middleware, connectionShared)
{
this.messageCenter = messageCenter;
this.connectionManager = connectionManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ internal sealed class ClientOutboundConnectionFactory : ConnectionFactory
private readonly ClientConnectionOptions clientConnectionOptions;
private readonly ClusterOptions clusterOptions;
private readonly ConnectionPreambleHelper connectionPreambleHelper;
private readonly IMessageSink rebalancerGateway;
private readonly object initializationLock = new object();
private volatile bool isInitialized;
private ClientMessageCenter messageCenter;
Expand All @@ -24,12 +23,10 @@ public ClientOutboundConnectionFactory(
IOptions<ConnectionOptions> connectionOptions,
IOptions<ClientConnectionOptions> clientConnectionOptions,
IOptions<ClusterOptions> clusterOptions,
IMessageSink rebalancerGateway,
ConnectionCommon connectionShared,
ConnectionPreambleHelper connectionPreambleHelper)
: base(connectionShared.ServiceProvider.GetRequiredKeyedService<IConnectionFactory>(ServicesKey), connectionShared.ServiceProvider, connectionOptions)
{
this.rebalancerGateway = rebalancerGateway;
this.connectionShared = connectionShared;
this.clientConnectionOptions = clientConnectionOptions.Value;
this.clusterOptions = clusterOptions.Value;
Expand All @@ -49,8 +46,7 @@ protected override Connection CreateConnection(SiloAddress address, ConnectionCo
this.ConnectionOptions,
this.connectionShared,
this.connectionPreambleHelper,
this.clusterOptions,
this.rebalancerGateway);
this.clusterOptions);
}

protected override void ConfigureConnectionBuilder(IConnectionBuilder connectionBuilder)
Expand Down
6 changes: 2 additions & 4 deletions src/Orleans.Core/Networking/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ internal abstract class Connection

private static readonly ObjectPool<MessageHandler> MessageHandlerPool = ObjectPool.Create(new MessageHandlerPoolPolicy());
private readonly ConnectionCommon shared;
private readonly IMessageSink _rebalancerGateway;
private readonly ConnectionDelegate middleware;
private readonly Channel<Message> outgoingMessages;
private readonly ChannelWriter<Message> outgoingMessageWriter;
Expand All @@ -43,12 +42,10 @@ internal abstract class Connection
private Task _closeTask;

protected Connection(
IMessageSink rebalancerGateway,
ConnectionContext connection,
ConnectionDelegate middleware,
ConnectionCommon shared)
{
_rebalancerGateway = rebalancerGateway ?? throw new ArgumentNullException(nameof(rebalancerGateway));
this.Context = connection ?? throw new ArgumentNullException(nameof(connection));
this.middleware = middleware ?? throw new ArgumentNullException(nameof(middleware));
this.shared = shared;
Expand Down Expand Up @@ -356,6 +353,7 @@ private async Task ProcessOutgoing()

Exception error = default;
var serializer = this.shared.ServiceProvider.GetRequiredService<MessageSerializer>();
var messageStatisticsSink = this.shared.MessageStatisticsSink;
try
{
var output = this._transport.Output;
Expand All @@ -377,7 +375,7 @@ private async Task ProcessOutgoing()
inflight.Add(message);
var (headerLength, bodyLength) = serializer.Write(output, message);
RecordMessageSend(message, headerLength + bodyLength, headerLength);
_rebalancerGateway.RecordMessage(message);
messageStatisticsSink.RecordMessage(message);
message = null;
}
}
Expand Down
31 changes: 13 additions & 18 deletions src/Orleans.Core/Networking/ConnectionShared.cs
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
using System;
using System;
using Orleans.Placement.Rebalancing;

namespace Orleans.Runtime.Messaging
{
internal sealed class ConnectionCommon
internal sealed class ConnectionCommon(
IServiceProvider serviceProvider,
MessageFactory messageFactory,
MessagingTrace messagingTrace,
NetworkingTrace networkingTrace,
IMessageStatisticsSink messageStatisticsSink)
{
public ConnectionCommon(
IServiceProvider serviceProvider,
MessageFactory messageFactory,
MessagingTrace messagingTrace,
NetworkingTrace networkingTrace)
{
this.ServiceProvider = serviceProvider;
this.MessageFactory = messageFactory;
this.MessagingTrace = messagingTrace;
this.NetworkingTrace = networkingTrace;
}

public MessageFactory MessageFactory { get; }
public IServiceProvider ServiceProvider { get; }
public NetworkingTrace NetworkingTrace { get; }
public MessagingTrace MessagingTrace { get; }
public MessageFactory MessageFactory { get; } = messageFactory;
public IServiceProvider ServiceProvider { get; } = serviceProvider;
public NetworkingTrace NetworkingTrace { get; } = networkingTrace;
public IMessageStatisticsSink MessageStatisticsSink { get; } = messageStatisticsSink;
public MessagingTrace MessagingTrace { get; } = messagingTrace;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace Orleans.Placement.Rebalancing;
public interface IImbalanceToleranceRule
{
/// <summary>
/// Checks if this rule is statisfied by <paramref name="imbalance"/>.
/// Checks if this rule is satisfied by <paramref name="imbalance"/>.
/// </summary>
/// <param name="imbalance">The imbalance between the exchanging silo pair that will be, if this method were to return <see langword="true"/></param>
bool IsSatisfiedBy(uint imbalance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

namespace Orleans.Placement.Rebalancing;

internal interface IMessageSink
internal interface IMessageStatisticsSink
{
void RecordMessage(Message message);
}

internal sealed class NoOpActiveRebalancerGateway : IMessageSink
internal sealed class NoOpMessageStatisticsSink : IMessageStatisticsSink
{
public void RecordMessage(Message message) { }
}
2 changes: 1 addition & 1 deletion src/Orleans.Runtime/Hosting/ActiveRebalancingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private static IServiceCollection AddActiveRebalancing<TRule>(this IServiceColle

services.AddSingleton<IRebalancingMessageFilter, RebalancingMessageFilter>();
services.AddSingleton<ActivationRebalancer>();
services.AddFromExisting<IMessageSink, ActivationRebalancer>();
services.AddFromExisting<IMessageStatisticsSink, ActivationRebalancer>();
services.AddFromExisting<ILifecycleParticipant<ISiloLifecycle>, ActivationRebalancer>();

return services;
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Runtime/Hosting/DefaultSiloServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ internal static void AddDefaultServices(ISiloBuilder builder)
(sp, _) => sp.GetRequiredService<IAsyncEnumerableGrainExtension>());

// Networking
services.TryAddSingleton<IMessageSink, NoOpActiveRebalancerGateway>();
services.TryAddSingleton<IMessageStatisticsSink, NoOpMessageStatisticsSink>();
services.TryAddSingleton<ConnectionCommon>();
services.TryAddSingleton<ConnectionManager>();
services.TryAddSingleton<ConnectionPreambleHelper>();
Expand Down
10 changes: 5 additions & 5 deletions src/Orleans.Runtime/Messaging/MessageCenter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal class MessageCenter : IMessageCenter, IAsyncDisposable
private readonly SiloMessagingOptions messagingOptions;
private readonly PlacementService placementService;
private readonly GrainLocator _grainLocator;
private readonly IMessageSink _rebalancerGateway;
private readonly IMessageStatisticsSink _messageStatisticsSink;
private readonly ILogger log;
private readonly Catalog catalog;
private bool stopped;
Expand All @@ -45,7 +45,7 @@ public MessageCenter(
IOptions<SiloMessagingOptions> messagingOptions,
PlacementService placementService,
GrainLocator grainLocator,
IMessageSink rebalancerGateway)
IMessageStatisticsSink messageStatisticsSink)
{
this.catalog = catalog;
this.messagingOptions = messagingOptions.Value;
Expand All @@ -54,7 +54,7 @@ public MessageCenter(
this.messagingTrace = messagingTrace;
this.placementService = placementService;
_grainLocator = grainLocator;
_rebalancerGateway = rebalancerGateway;
_messageStatisticsSink = messageStatisticsSink;
this.log = logger;
this.messageFactory = messageFactory;
this._siloAddress = siloDetails.SiloAddress;
Expand All @@ -77,7 +77,7 @@ public bool TryDeliverToProxy(Message msg)
if (this.Gateway is Gateway gateway && gateway.TryDeliverToProxy(msg)
|| this.hostedClient is HostedClient client && client.TryDispatchToClient(msg))
{
_rebalancerGateway.RecordMessage(msg);
_messageStatisticsSink.RecordMessage(msg);
return true;
}

Expand Down Expand Up @@ -538,7 +538,7 @@ public void ReceiveMessage(Message msg)
}

targetActivation.ReceiveMessage(msg);
_rebalancerGateway.RecordMessage(msg);
_messageStatisticsSink.RecordMessage(msg);
}
}
catch (Exception ex)
Expand Down
7 changes: 1 addition & 6 deletions src/Orleans.Runtime/Networking/GatewayConnectionListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Orleans.Configuration;
using Orleans.Placement.Rebalancing;

namespace Orleans.Runtime.Messaging
{
internal sealed class GatewayConnectionListener : ConnectionListener, ILifecycleParticipant<ISiloLifecycle>, ILifecycleObserver
{
internal static readonly object ServicesKey = new object();
private readonly ILocalSiloDetails localSiloDetails;
private readonly IMessageSink rebalancerGateway;
private readonly MessageCenter messageCenter;
private readonly ConnectionCommon connectionShared;
private readonly ConnectionPreambleHelper connectionPreambleHelper;
Expand All @@ -31,7 +29,6 @@ public GatewayConnectionListener(
IOptions<SiloConnectionOptions> siloConnectionOptions,
OverloadDetector overloadDetector,
ILocalSiloDetails localSiloDetails,
IMessageSink rebalancerGateway,
IOptions<EndpointOptions> endpointOptions,
MessageCenter messageCenter,
ConnectionManager connectionManager,
Expand All @@ -44,7 +41,6 @@ public GatewayConnectionListener(
this.overloadDetector = overloadDetector;
this.gateway = messageCenter.Gateway;
this.localSiloDetails = localSiloDetails;
this.rebalancerGateway = rebalancerGateway;
this.messageCenter = messageCenter;
this.connectionShared = connectionShared;
this.connectionPreambleHelper = connectionPreambleHelper;
Expand All @@ -65,8 +61,7 @@ protected override Connection CreateConnection(ConnectionContext context)
this.ConnectionOptions,
this.messageCenter,
this.connectionShared,
this.connectionPreambleHelper,
this.rebalancerGateway);
this.connectionPreambleHelper);
}

protected override void ConfigureConnectionBuilder(IConnectionBuilder connectionBuilder)
Expand Down
5 changes: 2 additions & 3 deletions src/Orleans.Runtime/Networking/GatewayInboundConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ public GatewayInboundConnection(
ConnectionOptions connectionOptions,
MessageCenter messageCenter,
ConnectionCommon connectionShared,
ConnectionPreambleHelper connectionPreambleHelper,
IMessageSink rebalancerGateway)
: base(rebalancerGateway, connection, middleware, connectionShared)
ConnectionPreambleHelper connectionPreambleHelper)
: base(connection, middleware, connectionShared)
{
this.connectionOptions = connectionOptions;
this.gateway = gateway;
Expand Down
5 changes: 2 additions & 3 deletions src/Orleans.Runtime/Networking/SiloConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ public SiloConnection(
ConnectionOptions connectionOptions,
ConnectionCommon connectionShared,
ProbeRequestMonitor probeMonitor,
ConnectionPreambleHelper connectionPreambleHelper,
IMessageSink rebalancerGateway)
: base(rebalancerGateway, connection, middleware, connectionShared)
ConnectionPreambleHelper connectionPreambleHelper)
: base(connection, middleware, connectionShared)
{
this.messageCenter = messageCenter;
this.connectionManager = connectionManager;
Expand Down
7 changes: 1 addition & 6 deletions src/Orleans.Runtime/Networking/SiloConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Orleans.Configuration;
using Orleans.Placement.Rebalancing;

namespace Orleans.Runtime.Messaging
{
internal sealed class SiloConnectionFactory : ConnectionFactory
{
internal static readonly object ServicesKey = new object();
private readonly ILocalSiloDetails localSiloDetails;
private readonly IMessageSink rebalancerGateway;
private readonly ConnectionCommon connectionShared;
private readonly ProbeRequestMonitor probeRequestMonitor;
private readonly ConnectionPreambleHelper connectionPreambleHelper;
Expand All @@ -30,7 +28,6 @@ public SiloConnectionFactory(
IOptions<ConnectionOptions> connectionOptions,
IOptions<SiloConnectionOptions> siloConnectionOptions,
ILocalSiloDetails localSiloDetails,
IMessageSink rebalancerGateway,
ConnectionCommon connectionShared,
ProbeRequestMonitor probeRequestMonitor,
ConnectionPreambleHelper connectionPreambleHelper)
Expand All @@ -39,7 +36,6 @@ public SiloConnectionFactory(
this.serviceProvider = serviceProvider;
this.siloConnectionOptions = siloConnectionOptions.Value;
this.localSiloDetails = localSiloDetails;
this.rebalancerGateway = rebalancerGateway;
this.connectionShared = connectionShared;
this.probeRequestMonitor = probeRequestMonitor;
this.connectionPreambleHelper = connectionPreambleHelper;
Expand Down Expand Up @@ -71,8 +67,7 @@ protected override Connection CreateConnection(SiloAddress address, ConnectionCo
this.ConnectionOptions,
this.connectionShared,
this.probeRequestMonitor,
this.connectionPreambleHelper,
this.rebalancerGateway);
this.connectionPreambleHelper);
}

protected override void ConfigureConnectionBuilder(IConnectionBuilder connectionBuilder)
Expand Down
7 changes: 1 addition & 6 deletions src/Orleans.Runtime/Networking/SiloConnectionListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Orleans.Configuration;
using Orleans.Placement.Rebalancing;

namespace Orleans.Runtime.Messaging
{
internal sealed class SiloConnectionListener : ConnectionListener, ILifecycleParticipant<ISiloLifecycle>, ILifecycleObserver
{
internal static readonly object ServicesKey = new object();
private readonly ILocalSiloDetails localSiloDetails;
private readonly IMessageSink rebalancerGateway;
private readonly SiloConnectionOptions siloConnectionOptions;
private readonly MessageCenter messageCenter;
private readonly EndpointOptions endpointOptions;
Expand All @@ -30,7 +28,6 @@ public SiloConnectionListener(
MessageCenter messageCenter,
IOptions<EndpointOptions> endpointOptions,
ILocalSiloDetails localSiloDetails,
IMessageSink rebalancerGateway,
ConnectionManager connectionManager,
ConnectionCommon connectionShared,
ProbeRequestMonitor probeRequestMonitor,
Expand All @@ -40,7 +37,6 @@ public SiloConnectionListener(
this.siloConnectionOptions = siloConnectionOptions.Value;
this.messageCenter = messageCenter;
this.localSiloDetails = localSiloDetails;
this.rebalancerGateway = rebalancerGateway;
this.connectionManager = connectionManager;
this.connectionShared = connectionShared;
this.probeRequestMonitor = probeRequestMonitor;
Expand All @@ -62,8 +58,7 @@ protected override Connection CreateConnection(ConnectionContext context)
this.ConnectionOptions,
this.connectionShared,
this.probeRequestMonitor,
this.connectionPreambleHelper,
this.rebalancerGateway);
this.connectionPreambleHelper);
}

protected override void ConfigureConnectionBuilder(IConnectionBuilder connectionBuilder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace Orleans.Runtime.Placement.Rebalancing;

internal partial class ActivationRebalancer : IMessageSink
internal partial class ActivationRebalancer : IMessageStatisticsSink
{
private readonly CancellationTokenSource _shutdownCts = new();
private Task? _processPendingEdgesTask;
Expand Down

0 comments on commit e50a28a

Please sign in to comment.