Skip to content

Commit

Permalink
Allow the delays in the Message Pumps to be configurable
Browse files Browse the repository at this point in the history
Update the Subscription to allow the configuration of
  - the Empty channel delay
  - Channel failure delay
  • Loading branch information
preardon committed Apr 4, 2022
1 parent 20de937 commit fd2ee58
Show file tree
Hide file tree
Showing 9 changed files with 124 additions and 49 deletions.
16 changes: 12 additions & 4 deletions src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ public class SqsSubscription : Subscription
/// <param name="tags">Resource tags to be added to the queue</param>
/// <param name="makeChannels">Should we make channels if they don't exist, defaults to creating</param>
/// <param name="rawMessageDelivery">The indication of Raw Message Delivery setting is enabled or disabled</param>
/// <param name="emptyChannelDelay">How long to pause when a channel is empty in milliseconds</param>
/// <param name="channelFailureDelay">How long to pause when there is a channel failure in milliseconds</param>
public SqsSubscription(Type dataType,
SubscriptionName name = null,
ChannelName channelName = null,
Expand All @@ -138,9 +140,11 @@ public SqsSubscription(Type dataType,
SnsAttributes snsAttributes = null,
Dictionary<string,string> tags = null,
OnMissingChannel makeChannels = OnMissingChannel.Create,
bool rawMessageDelivery = true
bool rawMessageDelivery = true,
int emptyChannelDelay = 500,
int channelFailureDelay = 1000
)
: base(dataType, name, channelName, routingKey, bufferSize, noOfPerformers, timeoutInMs, requeueCount, requeueDelayInMs, unacceptableMessageLimit, runAsync, channelFactory, makeChannels)
: base(dataType, name, channelName, routingKey, bufferSize, noOfPerformers, timeoutInMs, requeueCount, requeueDelayInMs, unacceptableMessageLimit, runAsync, channelFactory, makeChannels, emptyChannelDelay, channelFailureDelay)
{
LockTimeout = lockTimeout;
DelaySeconds = delaySeconds;
Expand Down Expand Up @@ -189,6 +193,8 @@ public class SqsSubscription<T> : SqsSubscription where T : IRequest
/// <param name="tags">Resource tags to be added to the queue</param>
/// <param name="makeChannels">Should we make channels if they don't exist, defaults to creating</param>
/// <param name="rawMessageDelivery">The indication of Raw Message Delivery setting is enabled or disabled</param>
/// <param name="emptyChannelDelay">How long to pause when a channel is empty in milliseconds</param>
/// <param name="channelFailureDelay">How long to pause when there is a channel failure in milliseconds</param>
public SqsSubscription(SubscriptionName name = null,
ChannelName channelName = null,
RoutingKey routingKey = null,
Expand All @@ -211,12 +217,14 @@ public SqsSubscription(SubscriptionName name = null,
SnsAttributes snsAttributes = null,
Dictionary<string,string> tags = null,
OnMissingChannel makeChannels = OnMissingChannel.Create,
bool rawMessageDelivery = true
bool rawMessageDelivery = true,
int emptyChannelDelay = 500,
int channelFailureDelay = 1000
)
: base(typeof(T), name, channelName, routingKey, bufferSize, noOfPerformers, timeoutInMs, pollDelayInMs,
noWorkPauseInMs, requeueCount, requeueDelayInMs, unacceptableMessageLimit, runAsync, channelFactory,
lockTimeout, delaySeconds, messageRetentionPeriod,findTopicBy, iAmPolicy,redrivePolicy,
snsAttributes, tags, makeChannels, rawMessageDelivery)
snsAttributes, tags, makeChannels, rawMessageDelivery, emptyChannelDelay, channelFailureDelay)
{
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public class AzureServiceBusSubscription : Subscription
/// <param name="channelFactory">The channel factory to create channels for Consumer.</param>
/// <param name="makeChannels">Should we make channels if they don't exist, defaults to creating</param>
/// <param name="subscriptionConfiguration">The configuration options for the subscriptions.</param>
/// <param name="emptyChannelDelay">How long to pause when a channel is empty in milliseconds</param>
/// <param name="channelFailureDelay">How long to pause when there is a channel failure in milliseconds</param>
public AzureServiceBusSubscription(
Type dataType,
SubscriptionName name = null,
Expand All @@ -40,9 +42,11 @@ public AzureServiceBusSubscription(
bool isAsync = false,
IAmAChannelFactory channelFactory = null,
OnMissingChannel makeChannels = OnMissingChannel.Create,
AzureServiceBusSubscriptionConfiguration subscriptionConfiguration = null)
AzureServiceBusSubscriptionConfiguration subscriptionConfiguration = null,
int emptyChannelDelay = 500,
int channelFailureDelay = 1000)
: base(dataType, name, channelName, routingKey, bufferSize, noOfPerformers, timeoutInMilliseconds, requeueCount, requeueDelayInMs, unacceptableMessageLimit, isAsync, channelFactory,
makeChannels)
makeChannels, emptyChannelDelay, channelFailureDelay)
{
Configuration = subscriptionConfiguration ?? new AzureServiceBusSubscriptionConfiguration();
}
Expand Down Expand Up @@ -70,6 +74,8 @@ public class AzureServiceBusSubscription<T> : AzureServiceBusSubscription where
/// <param name="channelFactory">The channel factory to create channels for Consumer.</param>
/// <param name="makeChannels">Should we make channels if they don't exist, defaults to creating</param>
/// <param name="subscriptionConfiguration">The configuration options for the subscriptions.</param>
/// <param name="emptyChannelDelay">How long to pause when a channel is empty in milliseconds</param>
/// <param name="channelFailureDelay">How long to pause when there is a channel failure in milliseconds</param>
public AzureServiceBusSubscription(
SubscriptionName name = null,
ChannelName channelName = null,
Expand All @@ -83,10 +89,12 @@ public AzureServiceBusSubscription(
bool isAsync = false,
IAmAChannelFactory channelFactory = null,
OnMissingChannel makeChannels = OnMissingChannel.Create,
AzureServiceBusSubscriptionConfiguration subscriptionConfiguration = null)
AzureServiceBusSubscriptionConfiguration subscriptionConfiguration = null,
int emptyChannelDelay = 500,
int channelFailureDelay = 1000)
: base(typeof(T), name, channelName, routingKey, bufferSize, noOfPerformers,
timeoutInMilliseconds, requeueCount, requeueDelayInMs, unacceptableMessageLimit,
isAsync, channelFactory, makeChannels, subscriptionConfiguration)
isAsync, channelFactory, makeChannels, subscriptionConfiguration, emptyChannelDelay, channelFailureDelay)
{
}
}
Expand Down
16 changes: 12 additions & 4 deletions src/Paramore.Brighter.MessagingGateway.Kafka/KafkaSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ public class KafkaSubscription : Subscription
/// <param name="numOfPartitions">How many partitions should this topic have - used if we create the topic</param>
/// <param name="replicationFactor">How many copies of each partition should we have across our broker's nodes - used if we create the topic</param> /// <param name="channelFactory">The channel factory to create channels for Consumer.</param>
/// <param name="makeChannels">Should we make channels if they don't exist, defaults to creating</param>
/// <param name="emptyChannelDelay">How long to pause when a channel is empty in milliseconds</param>
/// <param name="channelFailureDelay">How long to pause when there is a channel failure in milliseconds</param>
public KafkaSubscription (
Type dataType,
SubscriptionName name = null,
Expand All @@ -141,9 +143,11 @@ public KafkaSubscription (
int numOfPartitions = 1,
short replicationFactor = 1,
IAmAChannelFactory channelFactory = null,
OnMissingChannel makeChannels = OnMissingChannel.Create)
OnMissingChannel makeChannels = OnMissingChannel.Create,
int emptyChannelDelay = 500,
int channelFailureDelay = 1000)
: base(dataType, name, channelName, routingKey, bufferSize, noOfPerformers, timeoutInMilliseconds, requeueCount,
requeueDelayInMilliseconds, unacceptableMessageLimit, isAsync, channelFactory, makeChannels)
requeueDelayInMilliseconds, unacceptableMessageLimit, isAsync, channelFactory, makeChannels, emptyChannelDelay, channelFailureDelay)
{
CommitBatchSize = commitBatchSize;
GroupId = groupId;
Expand Down Expand Up @@ -183,6 +187,8 @@ public class KafkaSubscription<T> : KafkaSubscription where T : IRequest
/// <param name="replicationFactor">How many copies of each partition should we have across our broker's nodes - used if we create the topic</param>
/// <param name="makeChannels">Should we make channels if they don't exist, defaults to creating</param>
/// <param name="channelFactory">The channel factory to create channels for Consumer.</param>
/// <param name="emptyChannelDelay">How long to pause when a channel is empty in milliseconds</param>
/// <param name="channelFailureDelay">How long to pause when there is a channel failure in milliseconds</param>
public KafkaSubscription(
SubscriptionName name = null,
ChannelName channelName = null,
Expand All @@ -204,11 +210,13 @@ public KafkaSubscription(
int numOfPartitions = 1,
short replicationFactor = 1,
IAmAChannelFactory channelFactory = null,
OnMissingChannel makeChannels = OnMissingChannel.Create)
OnMissingChannel makeChannels = OnMissingChannel.Create,
int emptyChannelDelay = 500,
int channelFailureDelay = 1000)
: base(typeof(T), name, channelName, routingKey, groupId, bufferSize, noOfPerformers, timeoutInMilliseconds,
requeueCount, requeueDelayInMilliseconds, unacceptableMessageLimit, offsetDefault, commitBatchSize,
sessionTimeoutMs, maxPollIntervalMs, sweepUncommittedOffsetsIntervalMs, isolationLevel, isAsync,
numOfPartitions, replicationFactor, channelFactory, makeChannels)
numOfPartitions, replicationFactor, channelFactory, makeChannels, emptyChannelDelay, channelFailureDelay)
{
}
}
Expand Down
16 changes: 12 additions & 4 deletions src/Paramore.Brighter.MessagingGateway.MsSql/MsSqlSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class MsSqlSubscription : Subscription
/// <param name="runAsync">Is this channel read asynchronously</param>
/// <param name="channelFactory">The channel factory to create channels for Consumer.</param>
/// <param name="makeChannels">Should we make channels if they don't exist, defaults to creating</param>
/// <param name="emptyChannelDelay">How long to pause when a channel is empty in milliseconds</param>
/// <param name="channelFailureDelay">How long to pause when there is a channel failure in milliseconds</param>
public MsSqlSubscription(
Type dataType,
SubscriptionName name = null,
Expand All @@ -57,9 +59,11 @@ public MsSqlSubscription(
int unacceptableMessageLimit = 0,
bool runAsync = false,
IAmAChannelFactory channelFactory = null,
OnMissingChannel makeChannels = OnMissingChannel.Create)
OnMissingChannel makeChannels = OnMissingChannel.Create,
int emptyChannelDelay = 500,
int channelFailureDelay = 1000)
: base(dataType, name, channelName, routingKey, bufferSize, noOfPerformers, timeoutInMilliseconds, requeueCount,
requeueDelayInMilliseconds, unacceptableMessageLimit, runAsync, channelFactory, makeChannels)
requeueDelayInMilliseconds, unacceptableMessageLimit, runAsync, channelFactory, makeChannels, emptyChannelDelay, channelFailureDelay)
{
}
}
Expand All @@ -81,6 +85,8 @@ public class MsSqlSubscription<T> : MsSqlSubscription where T : IRequest
/// <param name="runAsync">Is this channel read asynchronously</param>
/// <param name="channelFactory">The channel factory to create channels for Consumer.</param>
/// <param name="makeChannels">Should we make channels if they don't exist, defaults to creating</param>
/// <param name="emptyChannelDelay">How long to pause when a channel is empty in milliseconds</param>
/// <param name="channelFailureDelay">How long to pause when there is a channel failure in milliseconds</param>
public MsSqlSubscription(
SubscriptionName name = null,
ChannelName channelName = null,
Expand All @@ -93,9 +99,11 @@ public MsSqlSubscription(
int unacceptableMessageLimit = 0,
bool runAsync = false,
IAmAChannelFactory channelFactory = null,
OnMissingChannel makeChannels = OnMissingChannel.Create)
OnMissingChannel makeChannels = OnMissingChannel.Create,
int emptyChannelDelay = 500,
int channelFailureDelay = 1000)
: base(typeof(T), name, channelName, routingKey, bufferSize, noOfPerformers, timeoutInMilliseconds, requeueCount,
requeueDelayInMilliseconds, unacceptableMessageLimit, runAsync, channelFactory, makeChannels)
requeueDelayInMilliseconds, unacceptableMessageLimit, runAsync, channelFactory, makeChannels, emptyChannelDelay, channelFailureDelay)
{
}

Expand Down
16 changes: 12 additions & 4 deletions src/Paramore.Brighter.MessagingGateway.RMQ/RmqSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public class RmqSubscription : Subscription
/// <param name="deadLetterRoutingKey">The routing key for dead letters</param>
/// <param name="ttl">Time to live in ms of a message on a queue; null (the default) is inifinite</param>
/// <param name="makeChannels">Should we make channels if they don't exist, defaults to creating</param>
/// <param name="emptyChannelDelay">How long to pause when a channel is empty in milliseconds</param>
/// <param name="channelFailureDelay">How long to pause when there is a channel failure in milliseconds</param>
public RmqSubscription(
Type dataType,
SubscriptionName name = null,
Expand All @@ -105,8 +107,10 @@ public RmqSubscription(
ChannelName deadLetterChannelName = null,
string deadLetterRoutingKey = null,
int? ttl = null,
OnMissingChannel makeChannels = OnMissingChannel.Create)
: base(dataType, name, channelName, routingKey, bufferSize, noOfPerformers, timeoutInMilliseconds, requeueCount, requeueDelayInMilliseconds, unacceptableMessageLimit, runAsync, channelFactory, makeChannels)
OnMissingChannel makeChannels = OnMissingChannel.Create,
int emptyChannelDelay = 500,
int channelFailureDelay = 1000)
: base(dataType, name, channelName, routingKey, bufferSize, noOfPerformers, timeoutInMilliseconds, requeueCount, requeueDelayInMilliseconds, unacceptableMessageLimit, runAsync, channelFactory, makeChannels, emptyChannelDelay, channelFailureDelay)
{
DeadLetterRoutingKey = deadLetterRoutingKey;
DeadLetterChannelName = deadLetterChannelName;
Expand Down Expand Up @@ -139,6 +143,8 @@ public class RmqSubscription<T> : RmqSubscription where T : IRequest
/// <param name="deadLetterRoutingKey">The routing key for dead letters</param>
/// <param name="ttl">Time to live in ms of a message on a queue; null (the default) is inifinite</param>
/// <param name="makeChannels">Should we make channels if they don't exist, defaults to creating</param>
/// <param name="emptyChannelDelay">How long to pause when a channel is empty in milliseconds</param>
/// <param name="channelFailureDelay">How long to pause when there is a channel failure in milliseconds</param>
public RmqSubscription(SubscriptionName name = null,
ChannelName channelName = null,
RoutingKey routingKey = null,
Expand All @@ -155,9 +161,11 @@ public RmqSubscription(SubscriptionName name = null,
ChannelName deadLetterChannelName = null,
string deadLetterRoutingKey = null,
int? ttl = null,
OnMissingChannel makeChannels = OnMissingChannel.Create)
OnMissingChannel makeChannels = OnMissingChannel.Create,
int emptyChannelDelay = 500,
int channelFailureDelay = 1000)
: base(typeof(T), name, channelName, routingKey, bufferSize, noOfPerformers, timeoutInMilliseconds, requeueCount, requeueDelayInMilliseconds,
unacceptableMessageLimit, isDurable, runAsync, channelFactory, highAvailability, deadLetterChannelName, deadLetterRoutingKey, ttl, makeChannels)
unacceptableMessageLimit, isDurable, runAsync, channelFactory, highAvailability, deadLetterChannelName, deadLetterRoutingKey, ttl, makeChannels, emptyChannelDelay, channelFailureDelay)
{ }

}
Expand Down
16 changes: 12 additions & 4 deletions src/Paramore.Brighter.MessagingGateway.Redis/RedisSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class RedisSubscription : Subscription
/// <param name="runAsync">Is this channel read asynchronously</param>
/// <param name="channelFactory">The channel factory to create channels for Consumer.</param>
/// <param name="makeChannels">Should we make channels if they don't exist, defaults to creating</param>
/// <param name="emptyChannelDelay">How long to pause when a channel is empty in milliseconds</param>
/// <param name="channelFailureDelay">How long to pause when there is a channel failure in milliseconds</param>
public RedisSubscription(
Type dataType,
SubscriptionName name = null,
Expand All @@ -57,9 +59,11 @@ public RedisSubscription(
int unacceptableMessageLimit = 0,
bool runAsync = false,
IAmAChannelFactory channelFactory = null,
OnMissingChannel makeChannels = OnMissingChannel.Create)
OnMissingChannel makeChannels = OnMissingChannel.Create,
int emptyChannelDelay = 500,
int channelFailureDelay = 1000)
: base(dataType, name, channelName, routingKey, bufferSize, noOfPerformers, timeoutInMilliseconds, requeueCount,
requeueDelayInMilliseconds, unacceptableMessageLimit, runAsync, channelFactory, makeChannels)
requeueDelayInMilliseconds, unacceptableMessageLimit, runAsync, channelFactory, makeChannels, emptyChannelDelay, channelFailureDelay)
{
}
}
Expand All @@ -81,6 +85,8 @@ public class RedisSubscription<T> : RedisSubscription where T : IRequest
/// <param name="runAsync">Is this channel read asynchronously</param>
/// <param name="channelFactory">The channel factory to create channels for Consumer.</param>
/// <param name="makeChannels">Should we make channels if they don't exist, defaults to creating</param>
/// <param name="emptyChannelDelay">How long to pause when a channel is empty in milliseconds</param>
/// <param name="channelFailureDelay">How long to pause when there is a channel failure in milliseconds</param>
public RedisSubscription(
SubscriptionName name = null,
ChannelName channelName = null,
Expand All @@ -93,9 +99,11 @@ public RedisSubscription(
int unacceptableMessageLimit = 0,
bool runAsync = false,
IAmAChannelFactory channelFactory = null,
OnMissingChannel makeChannels = OnMissingChannel.Create)
OnMissingChannel makeChannels = OnMissingChannel.Create,
int emptyChannelDelay = 500,
int channelFailureDelay = 1000)
: base(typeof(T), name, channelName, routingKey, bufferSize, noOfPerformers, timeoutInMilliseconds, requeueCount,
requeueDelayInMilliseconds, unacceptableMessageLimit, runAsync, channelFactory, makeChannels)
requeueDelayInMilliseconds, unacceptableMessageLimit, runAsync, channelFactory, makeChannels, emptyChannelDelay, channelFailureDelay)
{
}
}
Expand Down
Loading

0 comments on commit fd2ee58

Please sign in to comment.