From d7ffd74204a49cec068a58c3746fd03800ae67dc Mon Sep 17 00:00:00 2001 From: Paul Reardon Date: Tue, 1 Oct 2024 13:56:59 +0100 Subject: [PATCH] Allow the Outbox Sweeper to wait for clearing so that the lock can work (#3334) Task #3321 --- .../TimedOutboxSweeper.cs | 13 +++++--- src/Paramore.Brighter/CommandProcessor.cs | 32 ++++++++++++++++++ src/Paramore.Brighter/ExternalBusServices.cs | 33 +++++++++++++++++++ src/Paramore.Brighter/IAmACommandProcessor.cs | 19 +++++++++++ src/Paramore.Brighter/OutboxSweeper.cs | 19 +++++++++++ .../TestDoubles/SpyCommandProcessor.cs | 14 ++++++++ .../TestDoubles/FakeCommandProcessor.cs | 12 +++++++ 7 files changed, 138 insertions(+), 4 deletions(-) diff --git a/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeper.cs b/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeper.cs index 2826c8179a..3fb1a269ee 100644 --- a/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeper.cs +++ b/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeper.cs @@ -30,12 +30,17 @@ public Task StartAsync(CancellationToken cancellationToken) { s_logger.LogInformation("Outbox Sweeper Service is starting."); - _timer = new Timer(DoWork, null, TimeSpan.Zero, TimeSpan.FromSeconds(_options.TimerInterval)); + _timer = new Timer(Callback, null, TimeSpan.Zero, TimeSpan.FromSeconds(_options.TimerInterval)); return Task.CompletedTask; } - private void DoWork(object state) + private async void Callback(object _) + { + await DoWorkAsync(); + } + + private async Task DoWorkAsync() { var lockId = _distributedLock.ObtainLockAsync(LockingResourceName, CancellationToken.None).Result; if (lockId != null) @@ -55,9 +60,9 @@ private void DoWork(object state) _options.Args); if (_options.UseBulk) - outBoxSweeper.SweepAsyncOutbox(); + await outBoxSweeper.SweepAsyncOutboxAsync(); else - outBoxSweeper.Sweep(); + await outBoxSweeper.SweepAsync(); } catch (Exception e) { diff --git a/src/Paramore.Brighter/CommandProcessor.cs b/src/Paramore.Brighter/CommandProcessor.cs index f74e052749..5736be64a1 100644 --- a/src/Paramore.Brighter/CommandProcessor.cs +++ b/src/Paramore.Brighter/CommandProcessor.cs @@ -670,6 +670,19 @@ public void ClearOutbox(int amountToClear = 100, int minimumAge = 5000, Dictiona { _bus.ClearOutbox(amountToClear, minimumAge, false, false, args); } + + /// + /// Flushes any outstanding message box message to the broker. + /// This will be run on a background task. + /// Intended for use with the Outbox pattern: http://gistlabs.com/2014/05/the-outbox/ + /// + /// The maximum number to clear. + /// The minimum age to clear in milliseconds. + /// Optional bag of arguments required by an outbox implementation to sweep + public Task ClearOutboxAsync(int amountToClear = 100, int minimumAge = 5000, Dictionary args = null) + { + return _bus.ClearOutboxAsync(amountToClear, minimumAge, false, false, args); + } /// /// Flushes the message box message given by to the broker. @@ -702,6 +715,25 @@ public void ClearAsyncOutbox( { _bus.ClearOutbox(amountToClear, minimumAge, true, useBulk, args); } + + /// + /// Flushes any outstanding message box message to the broker. + /// This will be run on a background task. + /// Intended for use with the Outbox pattern: http://gistlabs.com/2014/05/the-outbox/ + /// + /// The maximum number to clear. + /// The minimum age to clear in milliseconds. + /// Use the bulk send on the producer. + /// Optional bag of arguments required by an outbox implementation to sweep + public Task ClearAsyncOutboxAsync( + int amountToClear = 100, + int minimumAge = 5000, + bool useBulk = false, + Dictionary args = null + ) + { + return _bus.ClearOutboxAsync(amountToClear, minimumAge, true, useBulk, args); + } /// /// Uses the Request-Reply messaging approach to send a message to another server and block awaiting a reply. diff --git a/src/Paramore.Brighter/ExternalBusServices.cs b/src/Paramore.Brighter/ExternalBusServices.cs index d6e7325edd..20ea6dfbfc 100644 --- a/src/Paramore.Brighter/ExternalBusServices.cs +++ b/src/Paramore.Brighter/ExternalBusServices.cs @@ -273,6 +273,39 @@ internal void ClearOutbox(int amountToClear, int minimumAge, bool useAsync, bool Task.Run(() => BackgroundDispatchUsingSync(amountToClear, minimumAge, args)); } } + + /// + /// This is the clear outbox for implicit clearing of messages. + /// + /// Maximum number to clear. + /// The minimum age of messages to be cleared in milliseconds. + /// Use the Async outbox and Producer + /// Use bulk sending capability of the message producer, this must be paired with useAsync. + /// Optional bag of arguments required by an outbox implementation to sweep + internal Task ClearOutboxAsync(int amountToClear, int minimumAge, bool useAsync, bool useBulk, Dictionary args = null) + { + var span = Activity.Current; + span?.AddTag("amountToClear", amountToClear); + span?.AddTag("minimumAge", minimumAge); + span?.AddTag("async", useAsync); + span?.AddTag("bulk", useBulk); + + if (useAsync) + { + if (!HasAsyncOutbox()) + throw new InvalidOperationException("No async outbox defined."); + + return BackgroundDispatchUsingAsync(amountToClear, minimumAge, useBulk, args); + } + + else + { + if (!HasOutbox()) + throw new InvalidOperationException("No outbox defined."); + + return BackgroundDispatchUsingSync(amountToClear, minimumAge, args); + } + } private async Task BackgroundDispatchUsingSync(int amountToClear, int minimumAge, Dictionary args) { diff --git a/src/Paramore.Brighter/IAmACommandProcessor.cs b/src/Paramore.Brighter/IAmACommandProcessor.cs index 7f5cd5bb39..18a6ddbc6d 100644 --- a/src/Paramore.Brighter/IAmACommandProcessor.cs +++ b/src/Paramore.Brighter/IAmACommandProcessor.cs @@ -156,6 +156,15 @@ Task DepositPostAsync(IEnumerable requests, bool continueOnCapture /// The minimum age to clear in milliseconds. /// Optional bag of arguments required by an outbox implementation to sweep public void ClearOutbox(int amountToClear = 100, int minimumAge = 5000, Dictionary args = null); + + /// + /// Flushes any outstanding message box message to the broker. + /// Intended for use with the Outbox pattern: http://gistlabs.com/2014/05/the-outbox/ + /// + /// The maximum number to clear. + /// The minimum age to clear in milliseconds. + /// Optional bag of arguments required by an outbox implementation to sweep + public Task ClearOutboxAsync(int amountToClear = 100, int minimumAge = 5000, Dictionary args = null); /// /// Flushes the message box message given by to the broker. @@ -173,6 +182,16 @@ Task DepositPostAsync(IEnumerable requests, bool continueOnCapture /// Use the bulk send on the producer. /// Optional bag of arguments required by an outbox implementation to sweep public void ClearAsyncOutbox(int amountToClear = 100, int minimumAge = 5000, bool useBulk = false, Dictionary args = null); + + /// + /// Flushes any outstanding message box message to the broker. + /// Intended for use with the Outbox pattern: http://gistlabs.com/2014/05/the-outbox/ + /// + /// The maximum number to clear. + /// The minimum age to clear in milliseconds. + /// Use the bulk send on the producer. + /// Optional bag of arguments required by an outbox implementation to sweep + public Task ClearAsyncOutboxAsync(int amountToClear = 100, int minimumAge = 5000, bool useBulk = false, Dictionary args = null); /// /// Uses the Request-Reply messaging approach to send a message to another server and block awaiting a reply. diff --git a/src/Paramore.Brighter/OutboxSweeper.cs b/src/Paramore.Brighter/OutboxSweeper.cs index bb530b43cb..673f1d9890 100644 --- a/src/Paramore.Brighter/OutboxSweeper.cs +++ b/src/Paramore.Brighter/OutboxSweeper.cs @@ -1,5 +1,6 @@ using System.Collections.Generic; using System.Diagnostics; +using System.Threading.Tasks; namespace Paramore.Brighter { @@ -52,5 +53,23 @@ public void SweepAsyncOutbox() ApplicationTelemetry.ActivitySource.StartActivity(IMPLICITCLEAROUTBOX, ActivityKind.Server); _commandProcessor.ClearAsyncOutbox(_batchSize, _millisecondsSinceSent, _useBulk, _args); } + + /// + /// Dispatches the oldest un-dispatched messages from the outbox in a background thread. + /// + public Task SweepAsync() + { + ApplicationTelemetry.ActivitySource.StartActivity(IMPLICITCLEAROUTBOX, ActivityKind.Server); + return _commandProcessor.ClearOutboxAsync(_batchSize, _millisecondsSinceSent, _args); + } + + /// + /// Dispatches the oldest un-dispatched messages from the asynchronous outbox in a background thread. + /// + public Task SweepAsyncOutboxAsync() + { + ApplicationTelemetry.ActivitySource.StartActivity(IMPLICITCLEAROUTBOX, ActivityKind.Server); + return _commandProcessor.ClearAsyncOutboxAsync(_batchSize, _millisecondsSinceSent, _useBulk, _args); + } } } diff --git a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/TestDoubles/SpyCommandProcessor.cs b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/TestDoubles/SpyCommandProcessor.cs index edeae3a976..9d21338577 100644 --- a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/TestDoubles/SpyCommandProcessor.cs +++ b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/TestDoubles/SpyCommandProcessor.cs @@ -171,6 +171,13 @@ public void ClearOutbox(int amountToClear = 100, int minimumAge = 5000, Dictiona ClearParamsList.Add(new ClearParams { AmountToClear = amountToClear, MinimumAge = minimumAge, Args = args }); } + public Task ClearOutboxAsync(int amountToClear = 100, int minimumAge = 5000, + Dictionary args = null) + { + ClearOutbox(amountToClear, minimumAge, args); + return Task.CompletedTask; + } + public async Task ClearOutboxAsync(IEnumerable posts, bool continueOnCapturedContext = false, CancellationToken cancellationToken = default) { @@ -187,6 +194,13 @@ public void ClearAsyncOutbox(int amountToClear = 100, int minimumAge = 5000, boo ClearParamsList.Add(new ClearParams { AmountToClear = amountToClear, MinimumAge = minimumAge, Args = args }); } + public Task ClearAsyncOutboxAsync(int amountToClear = 100, int minimumAge = 5000, bool useBulk = false, + Dictionary args = null) + { + ClearAsyncOutbox(amountToClear, minimumAge, useBulk, args); + return Task.CompletedTask; + } + public Task BulkClearOutboxAsync(IEnumerable posts, bool continueOnCapturedContext = false, CancellationToken cancellationToken = default) { diff --git a/tests/Paramore.Brighter.InMemory.Tests/TestDoubles/FakeCommandProcessor.cs b/tests/Paramore.Brighter.InMemory.Tests/TestDoubles/FakeCommandProcessor.cs index b7d5a82f85..d23f2a9bbf 100644 --- a/tests/Paramore.Brighter.InMemory.Tests/TestDoubles/FakeCommandProcessor.cs +++ b/tests/Paramore.Brighter.InMemory.Tests/TestDoubles/FakeCommandProcessor.cs @@ -136,6 +136,12 @@ public void ClearOutbox(int amountToClear = 100, int minimumAge = 5000, Dictiona ClearOutbox(depositedMessages); } + + public Task ClearOutboxAsync(int amountToClear = 100, int minimumAge = 5000, Dictionary args = null) + { + ClearOutbox(amountToClear, minimumAge, args); + return Task.CompletedTask; + } public Task ClearOutboxAsync(IEnumerable posts, bool continueOnCapturedContext = false, CancellationToken cancellationToken = default) { @@ -155,6 +161,12 @@ public void ClearAsyncOutbox(int amountToClear = 100, int minimumAge = 5000, boo { ClearOutbox(amountToClear, minimumAge); } + + public Task ClearAsyncOutboxAsync(int amountToClear = 100, int minimumAge = 5000, bool useBulk = false, Dictionary args = null) + { + ClearOutbox(amountToClear, minimumAge); + return Task.CompletedTask; + } public Task BulkClearOutboxAsync(IEnumerable posts, bool continueOnCapturedContext = false, CancellationToken cancellationToken = default)