Make StreamQueueBalancer Pluggable #3152
- Make it possible for the injected StreamQueueBalancer to be built on top of an async service, such as external storage, or a lease manager.
/// Async Initialize logic
/// </summary>
/// <returns></returns>
Task Initialize();
Add a hook for async initialization logic, such as loading queue-balancing info from an async storage service, or connecting to an async lease manager.
The other methods are async for the same reason.
@@ -99,6 +99,9 @@ public class PersistentStreamProviderConfig
public const string SILO_MATURITY_PERIOD = "SiloMaturityPeriod";
public static readonly TimeSpan DEFAULT_SILO_MATURITY_PERIOD = TimeSpan.FromMinutes(2);

//optional param. user need to set this when they use Custom StreamQueueBalancer
public const string QUEUE_BALANCER_FACTORY_NAME = nameof(QueueBalancerFactoryName);
public string QueueBalancerFactoryName { get; set; }
The user first needs to set QueueBalancerFactoryName and set BalancerType to CustomBalancer, then inject their IStreamQueueBalancerFactory into DI under that name to be able to use this feature.
namespace Tester.StreamingTests
{
//one lease manager grain per stream provider, so its key is stream provider name
public interface ILeaseManagerGrain : IGrainWithStringKey
Just used in tests.
The test is a small probe to prototype a LeaseManager-based queue balancer, mainly to make sure it can access most of the dependencies it needs.
{
this.siloStatusOracle = base.ServiceProvider.GetRequiredService<ISiloStatusOracle>();
this.clusterConfiguration = base.ServiceProvider.GetRequiredService<ClusterConfiguration>();
this.queueMapper = base.ServiceProvider.GetServiceByName<IStreamQueueMapper>(this.GetPrimaryKeyString());
"LeaseManager as a grain" has been discussed as a possibility. I want to make sure LeaseManagerGrainBasedQueueBalancer can be an option later.
I just want to raise the concern that, if we go with the "lease manager as a grain" option, giving it complex dependencies will not be intuitive with this PR's interface design.
As an example: I found it difficult for LeaseManagerGrain to access StreamQueueMapper, which intuitively should be its dependency. This is a pretty hacky way to access it. We can think of a more elegant way in the future, but I think it is fine to do it in tests ;)
protected override TestCluster CreateTestCluster()
{
var options = new TestClusterOptions(siloCount);
ProviderSettings.EventHubPartitionCount = totalQueueCount;
I use EventDataGeneratorStreamProvider mainly because it is a very configurable provider; I can assign any queue count I like through config.
I would prefer these tests were in a test harness so we could run them against other persistent providers.
The initial test should probably use the memory stream provider. It allows the same queue flexibility, while also allowing us to do this in a sharable harness.
Yeah, makes sense!
@dotnet-bot test netfx-functional
{
/// <summary>
/// Async Initialize logic
Something like "Initializes this instance", perhaps
Hmm, I'm not sure I follow. What other instance would this be initializing? It is initializing itself, so "this instance" seems a bit redundant to me.
EDIT: oh, you meant the XML comments. I thought you meant the method name. Yeah, we can add that.
Yep, just the doc comments. There are many Initialize methods in Orleans, so I wanted them to be aligned. Some examples from our code:
/// <summary>
/// Initializes the external serializer. Called once when the serialization manager creates
/// an instance of this type
/// </summary>
void Initialize(Logger logger);
/// <summary>
/// Initializes the client runtime from the standard client configuration file.
/// </summary>
public static void Initialize()
/// <summary>
/// Initializes this instance.
/// </summary>
public void Initialize()
We don't use StyleCop, but standard MS StyleCop rules check that constructor doc comments all follow a pattern like this:
/// <summary>
/// Initializes a new instance of the <see cref="AssemblyProcessor"/> class.
/// </summary>
/// <summary>
/// Use user configured custom queue balancer
/// </summary>
CustomBalancer
It would result in a larger change, but I have a strong preference for fetching the balancer from DI and only using this type if no balancer has been added to the container. The NamedServices DI feature can hopefully be used so that different providers can have different balancers.
Check out ReminderTableFactory for an example of how this is done today. It will have to change a bit for named services, but it's registered in Silo as:
services.AddSingleton<ReminderTableFactory>();
services.AddSingleton<IReminderTable>(sp => sp.GetRequiredService<ReminderTableFactory>().Create());
If the user injects their own IReminderTable, then that will be used instead.
Ideally we would only add our default one if they have not added their own - maybe @jdom knows a better way?
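One possible way to express "register the default only if the user has not added their own" is the `TryAddSingleton` extension from `Microsoft.Extensions.DependencyInjection.Extensions`. The sketch below is only an illustration: `ITableSketch`, `DefaultTable`, and `UserTable` are hypothetical stand-ins (not Orleans types), and it assumes the `Microsoft.Extensions.DependencyInjection` package is referenced.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;

// Hypothetical stand-ins for IReminderTable and its implementations.
public interface ITableSketch { string Name { get; } }
public sealed class DefaultTable : ITableSketch { public string Name => "default"; }
public sealed class UserTable : ITableSketch { public string Name => "user"; }

public static class Program
{
    // Builds a container and reports which implementation gets resolved.
    public static string Resolve(bool userRegistersOwn)
    {
        var services = new ServiceCollection();

        // User code runs first and may or may not register its own table.
        if (userRegistersOwn)
        {
            services.AddSingleton<ITableSketch, UserTable>();
        }

        // Framework code: adds the default only when no ITableSketch
        // registration exists in the collection yet.
        services.TryAddSingleton<ITableSketch>(sp => new DefaultTable());

        using var provider = services.BuildServiceProvider();
        return provider.GetRequiredService<ITableSketch>().Name;
    }

    public static void Main()
    {
        Console.WriteLine(Resolve(userRegistersOwn: false));
        Console.WriteLine(Resolve(userRegistersOwn: true));
    }
}
```

Note that `TryAdd*` only inspects registrations made before it runs, so this works only if user registrations are applied ahead of the framework defaults.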
@jason-bragg has a similar concern that the configuration is a bit redundant. He suggested something which convinced me, and I think you will like it.
He suggested removing this type and making StreamQueueBalancerType.cs a list of static strings naming the built-in queue balancers, so the runtime can resolve the queue balancer as a named service. When users want to inject their own queue balancer, they can set PersistentStreamProviderConfig.BalancerType to their custom queue balancer by giving it a name, and we can remove the other QueueBalancerFactoryName config.
I think it is similar to what you suggested. Hope this makes sense, or you will see it in my PR update ;)
As for adding balancers to DI, I doubt its value. Objects are put into DI because they need to be available in multiple places. A balancer, though, is created once per stream provider, passed to the pulling agent manager, and that's it. I don't see where DI comes into play here.
Adding the balancer to DI also doesn't play well with its life cycle. Every stream provider creates its balancer at the step of initializing the pulling agent manager, with a different set of params per stream provider. These are hard to access and differentiate at the step of building the DI container.
It makes sense to me to add BalancerFactory to DI as a named service, because that's the most convenient and familiar pattern for Orleans users to configure a custom service now.
}
}

- public bool UnSubscribeToQueueDistributionChangeEvents(IStreamQueueBalanceListener observer)
+ public Task<bool> UnSubscribeToQueueDistributionChangeEvents(IStreamQueueBalanceListener observer)
We should rename this: UnsubscribeFromQueueBalancingEvents, and similar for the subscribe method.
Why do you favor QueueBalancing over QueueDistribution? "Queue balancing" is a verb, and "queue distribution" is a noun that describes the current queue distribution state, right? It seems to me it makes more sense to use QueueDistribution.
Using "queue balancing" is a bit confusing. It doesn't indicate what kind of events the listener will receive. Does it receive only queue distribution change events, or does it also receive balancer state changes, which the listener cares less about?
@@ -37,7 +42,7 @@ internal interface IStreamQueueBalancer
/// It should be implemented by components interested in stream queue load balancing.
/// When change notification is received, listener should request updated list of queues from the queue balancer.
/// </summary>
- internal interface IStreamQueueBalanceListener : IAddressable
+ public interface IStreamQueueBalanceListener : IAddressable
Does this need to be IAddressable?
It was IAddressable before. But you are right, this limits listeners to being IAddressable.
I wonder, is there a historical reason for it, @jason-bragg?
/// <summary>
/// Built-in stream queue balancer type which is supported natively in orleans
/// </summary>
public static class BuiltInStreamQueueBalancerType
Changing the name of this class will make it a breaking change. But I slightly prefer this name, which is a clearer indication, especially now that we support pluggable queue balancers. I will be fine if you want to change it back, though.
Let's leave the enum, and have this class expose strings for the enum values. That way legacy code/tests can still use these enum values.
In the config object we can add a 'string QueueBalancer' property and change BalancerType to be a passthrough that sets 'QueueBalancer'. The property in the property bag would remain "BalancerType".
?? If you are worried about a breaking change, only changing the name would make this a breaking change. Changing it from an enum to a static class with strings won't. By breaking change I mean: users will get compile errors from it. To check this, you can change it from an enum to a static class, compile your test project, and see if you get any compile errors. Maybe I missed something.
And I don't fully understand the logic behind an extra QueueBalancer setting. To avoid a breaking change? Then back to my original question.
I guess my question is, what is your concern? If you are worried about a breaking change, then I should just change the name back, right?
I was concerned about breaking changes. It seems like we could maintain the old enums and add support for strings, but in retrospect, it's probably best to have only one supported way (strings) and eat the cost of a breaking change.
I withdraw my suggestion.
But it is not a breaking change if we change this class's name back. If we want to avoid a breaking change, then we need to change the class name back, which is what I was trying to say.
People use this as BalancerType = StreamQueueBalancerType.WhateverType.
After this change, the objects on both sides of the "=" will be strings, so it won't cause compile problems.
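The source-compatibility argument above can be sketched as follows. This is a trimmed-down illustration, not the PR's code: `ProviderConfigSketch` and the custom balancer name are hypothetical, and only the `ConsistentRingBalancer` member is taken from the diff.

```csharp
using System;

// Sketch of the former enum re-expressed as string constants under the same type name.
public static class StreamQueueBalancerType
{
    public const string ConsistentRingBalancer = "ConsistentRingBalancer";
}

// Hypothetical, trimmed-down config object holding only the property under discussion.
public class ProviderConfigSketch
{
    public string BalancerType { get; set; } = StreamQueueBalancerType.ConsistentRingBalancer;
}

public static class Program
{
    public static void Main()
    {
        var config = new ProviderConfigSketch();

        // Same source text a caller would have written against the enum;
        // it still compiles because both sides of "=" are now strings.
        config.BalancerType = StreamQueueBalancerType.ConsistentRingBalancer;
        Console.WriteLine(config.BalancerType);

        // A custom balancer is selected by name (this name is made up).
        config.BalancerType = "MyLeaseBasedBalancer";
        Console.WriteLine(config.BalancerType);
    }
}
```

The assignment pattern keeps compiling, but code that declares a variable of type `StreamQueueBalancerType` or calls `Enum.Parse` on it would not, and existing binaries would need to be recompiled.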
namespace Orleans.Streams
{
public interface IStreamQueueBalancerFactory
{
I still think this should be an interface instead of a factory delegate...
I think a factory delegate would be good if it were a property of a bigger class; it is lightweight that way. But that is not the case in this PR.
Having the user inject a factory instance, instead of a factory delegate, fits better with OOP patterns.
updated with fixes for PR feedback
/// Retrieves the latest queue distribution for this balancer.
/// </summary>
/// <returns>Queue allocated to this balancer.</returns>
- IEnumerable<QueueId> GetMyQueues();
+ Task<IEnumerable<QueueId>> GetMyQueues();
Why did you make these tasks?
I explained it in my comments before: to make it possible for the queue balancer to be built on top of an async service, such as an async lease manager, or a storage service.
Ah, sorry, I missed that. Now I understand.
Ok, so we'd considered this in the original pattern, and chose for this to be synchronous. The reason is that this is a push+pull pattern. We notify the IStreamQueueBalanceListeners once we get a change, so we already have the change at that point, and there is no reason to be asynchronous. This is actually important, because the agent is reentrant and giving up its turn to get the queues could have unexpected side effects. Even the subscribe and unsubscribe calls were expected to be local. Because the queue balancer is expected to be per silo, we don't want it to be distributed and asynchronous.
will change it back
Thanks for the heads up
We may need to note in the comments that GetMyQueues, SubscribeToQueueDistributionChangeEvents, and UnSubscribeFromQueueDistributionChangeEvents will likely be called on the IStreamQueueBalanceListener's thread, so they need to be thread safe.
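The push+pull pattern and the thread-safety note in this thread can be sketched roughly as below. All types here are simplified, hypothetical stand-ins (queues are plain strings, and the listener interface is not the real IStreamQueueBalanceListener); the point is only that the balancer already holds the new distribution when it notifies, so the listener's pull can be a synchronous, lock-protected read.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified listener contract.
public interface IQueueChangeListener
{
    void QueueDistributionChanged();
}

public class LocalBalancerSketch
{
    private readonly object gate = new object();
    private readonly List<IQueueChangeListener> listeners = new List<IQueueChangeListener>();
    private List<string> myQueues = new List<string>();

    // Pull side: synchronous and safe to call from any thread.
    public IEnumerable<string> GetMyQueues()
    {
        lock (gate) { return myQueues.ToList(); }
    }

    public bool Subscribe(IQueueChangeListener listener)
    {
        lock (gate)
        {
            if (listeners.Contains(listener)) return false;
            listeners.Add(listener);
            return true;
        }
    }

    public bool Unsubscribe(IQueueChangeListener listener)
    {
        lock (gate) { return listeners.Remove(listener); }
    }

    // Push side: called once the new distribution is already known locally.
    public void ApplyNewDistribution(IEnumerable<string> queues)
    {
        List<IQueueChangeListener> snapshot;
        lock (gate)
        {
            myQueues = queues.ToList();
            snapshot = listeners.ToList();
        }
        // Notify outside the lock so listeners can call back into the balancer.
        foreach (var listener in snapshot) listener.QueueDistributionChanged();
    }
}

public class RecordingListener : IQueueChangeListener
{
    private readonly LocalBalancerSketch balancer;
    public List<string> LastSeen { get; private set; } = new List<string>();

    public RecordingListener(LocalBalancerSketch balancer) { this.balancer = balancer; }

    // On notification, pull the current queues synchronously.
    public void QueueDistributionChanged()
    {
        LastSeen = balancer.GetMyQueues().ToList();
    }
}

public static class Program
{
    public static void Main()
    {
        var balancer = new LocalBalancerSketch();
        var listener = new RecordingListener(balancer);
        balancer.Subscribe(listener);
        balancer.ApplyNewDistribution(new[] { "queue-0", "queue-1" });
        Console.WriteLine(string.Join(",", listener.LastSeen));
    }
}
```

Any asynchronous work (storage reads, lease negotiation) would happen before `ApplyNewDistribution` is called, which keeps the listener-facing calls local and synchronous.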
@@ -21,7 +22,7 @@ internal class StreamQueueBalancerFactory
/// <param name="siloMaturityPeriod">Maturity Period of a silo for queue rebalancing purposes</param>
/// <returns>Constructed stream queue balancer</returns>
public static IStreamQueueBalancer Create(
- StreamQueueBalancerType balancerType,
+ string balancerType,
string strProviderName,
ISiloStatusOracle siloStatusOracle,
ISiloStatusOracle, ClusterConfiguration, and IStreamProviderRuntime can be obtained by DI in the factory's constructor. We should probably remove them from the Create call.
Not a big fan of the service locator pattern, but it seems like that's the Orleans style now. We use it in the grain public API too...
So remove those and add an IServiceProvider param?
My thinking here is to avoid passing anything that can be obtained by DI, passing only the things that the queue balancer manager alone has, like balancer type, provider name, and maturity period. The container can be obtained from DI, right?
yes, but we don't need the container either.
See my latest commit. We decided to shift from "configure the balancer through a factory" to "configure the balancer through its init method", as a more straightforward alternative, so this factory interface is removed.
The concern with a keyed factory is that it splits configuration into two places: the factory class and the keyed service config in DI, which IMO is not a very comprehensible pattern for users. And the constraints we put on the interface are not enough to prevent bad user code. It's easy for users to misunderstand this feature as "inject a factory as a singleton and do a case-switch inside its create method to generate the correct balancer".
@@ -91,20 +91,25 @@ public class PersistentStreamProviderConfig
public static readonly TimeSpan DEFAULT_STREAM_INACTIVITY_PERIOD = TimeSpan.FromMinutes(30);

public const string QUEUE_BALANCER_TYPE = "QueueBalancerType";
- public const StreamQueueBalancerType DEFAULT_STREAM_QUEUE_BALANCER_TYPE = StreamQueueBalancerType.ConsistentRingBalancer;
+ //default balancer type if ConsistentRingBalancer
+ public static Type DEFAULT_STREAM_QUEUE_BALANCER_TYPE = null;
Don't think we need this.. default would be null anyway, right?
Mainly for less of a breaking change, so users can still do providerSettings.BalancerType = PersistentStreamProviderConfig.DEFAULT_STREAM_QUEUE_BALANCER_TYPE when they configure the stream provider.
This seems like an unusual usage pattern, though, since without this line it will still be the default value. So I don't have a strong opinion against removing this.
@@ -13,7 +13,7 @@ internal interface IPersistentStreamPullingAgent : ISystemTarget, IStreamProduce

internal interface IPersistentStreamPullingManager : ISystemTarget
{
- Task Initialize(Immutable<IQueueAdapter> queueAdapter);
+ Task Initialize(Immutable<IQueueAdapter> queueAdapter, Immutable<IStreamQueueMapper> queueMapper);
We can get the mapper from the adapter, why pass in the additional arg?
You are fast! Wait 30 mins and come back! I just submitted this and haven't cleaned up yet.
sorry for the confusion
As a more straight-forward alternative
@@ -6,20 +6,26 @@

namespace Orleans.Streams
{
- internal class ConsistentRingQueueBalancer : IAsyncRingRangeListener, IStreamQueueBalancer
+ public class ConsistentRingQueueBalancer : IAsyncRingRangeListener, IStreamQueueBalancer
Why make this, and all that it requires, public? This can/should remain internal, right?
@@ -21,7 +22,7 @@ internal class StreamQueueBalancerFactory
/// <param name="siloMaturityPeriod">Maturity Period of a silo for queue rebalancing purposes</param>
/// <returns>Constructed stream queue balancer</returns>
public static IStreamQueueBalancer Create(
Thought we were getting rid of this class? No longer needed, right? The queue manager should be able to get the balancer from DI by type, then initialize it directly.
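A rough sketch of "get the balancer from DI by type, then initialize it directly" might look like the following. Everything here is an assumption made for illustration: `IBalancerSketch` and its `Initialize(string)` parameter list, the two balancer classes, and `TinyProvider` (a minimal `IServiceProvider` used only so the sample has no package dependency). A null type falling back to a default balancer mirrors the null default shown in the config diff.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified balancer contract; the real interface differs.
public interface IBalancerSketch
{
    void Initialize(string providerName);
    string Describe();
}

public class DefaultRingBalancerSketch : IBalancerSketch
{
    private string provider = "";
    public void Initialize(string providerName) { provider = providerName; }
    public string Describe() => "ring:" + provider;
}

public class LeaseBalancerSketch : IBalancerSketch
{
    private string provider = "";
    public void Initialize(string providerName) { provider = providerName; }
    public string Describe() => "lease:" + provider;
}

// Minimal stand-in container: returns a fresh instance per call (transient-like).
public class TinyProvider : IServiceProvider
{
    private readonly Dictionary<Type, Func<object>> factories = new Dictionary<Type, Func<object>>();

    public void AddTransient<T>(Func<T> factory) where T : class
    {
        factories[typeof(T)] = () => factory();
    }

    public object GetService(Type serviceType)
    {
        return factories.TryGetValue(serviceType, out var factory) ? factory() : null;
    }
}

public static class QueueManagerSketch
{
    // Resolves the configured balancer type (or the default when null),
    // then hands it its per-provider settings through Initialize.
    public static IBalancerSketch CreateBalancer(IServiceProvider services, Type balancerType, string providerName)
    {
        var type = balancerType ?? typeof(DefaultRingBalancerSketch);
        var balancer = services.GetService(type) as IBalancerSketch;
        if (balancer == null)
        {
            throw new InvalidOperationException("No balancer of type " + type.Name + " is registered.");
        }
        balancer.Initialize(providerName);
        return balancer;
    }
}

public static class Program
{
    public static void Main()
    {
        var services = new TinyProvider();
        services.AddTransient(() => new DefaultRingBalancerSketch());
        services.AddTransient(() => new LeaseBalancerSketch());

        Console.WriteLine(QueueManagerSketch.CreateBalancer(services, null, "ProviderA").Describe());
        Console.WriteLine(QueueManagerSketch.CreateBalancer(services, typeof(LeaseBalancerSketch), "ProviderB").Describe());
    }
}
```

Because each provider gets its own instance and configures it through `Initialize`, no factory class or keyed registration is needed in this arrangement.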
/// <summary>
/// Built-in stream queue balancer type which is supported natively in orleans
/// </summary>
public static class StreamQueueBalancerType
This is where we can define the default queue balancer.
Not sure I follow. Let's talk in person tomorrow
public StreamQueueBalancerType BalancerType { get; set; } = DEFAULT_STREAM_QUEUE_BALANCER_TYPE;

/// <summary>
/// The queue balancer type for your stream provider. If you are using a custom queue balancer by injecting IStreamQueueBalancer as a transient service into DI,
probably should specify 'transient', we don't really care about the scoping.
I specified transient at the end. scroll 💃
@dotnet-bot test this please