From 3d961e3ab2b2003a40095066969dd82da50e9366 Mon Sep 17 00:00:00 2001 From: Mike Goodfellow Date: Thu, 19 Aug 2021 09:16:20 +0100 Subject: [PATCH 1/4] Add RedisBusyLockExtension --- .../RedisBusyLockExtension.cs | 139 ++++++++++++++++ .../RedisBusyLockOptions.cs | 69 ++++++++ src/CacheTower/CacheStack.cs | 2 +- .../Redis/RedisBusyLockExtensionTests.cs | 150 ++++++++++++++++++ 4 files changed, 359 insertions(+), 1 deletion(-) create mode 100644 src/CacheTower.Extensions.Redis/RedisBusyLockExtension.cs create mode 100644 src/CacheTower.Extensions.Redis/RedisBusyLockOptions.cs create mode 100644 tests/CacheTower.Tests/Extensions/Redis/RedisBusyLockExtensionTests.cs diff --git a/src/CacheTower.Extensions.Redis/RedisBusyLockExtension.cs b/src/CacheTower.Extensions.Redis/RedisBusyLockExtension.cs new file mode 100644 index 00000000..b9948aa7 --- /dev/null +++ b/src/CacheTower.Extensions.Redis/RedisBusyLockExtension.cs @@ -0,0 +1,139 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Text; +using StackExchange.Redis; +using System.Threading.Tasks; + +namespace CacheTower.Extensions.Redis +{ + /// + /// Provides distributed cache locking via Redis. + /// This approach uses busy wait instead of pub/sub to check locks. + /// + public class RedisBusyLockExtension + { + private IDatabaseAsync Database { get; } + private RedisBusyLockOptions Options { get; } + + private ICacheStack? RegisteredStack { get; set; } + + internal ConcurrentDictionary> LockedOnKeyRefresh { get; } + + /// + /// Creates a new instance of with the given and default lock options. + /// + /// The primary connection to Redis where the distributed lock will be co-ordinated through. + public RedisBusyLockExtension(IConnectionMultiplexer connection) : this(connection, RedisBusyLockOptions.Default) { } + + /// + /// Creates a new instance of with the given and . + /// + /// The primary connection to Redis where the distributed lock will be co-ordinated through. + /// The lock options to configure the behaviour of locking. + public RedisBusyLockExtension(IConnectionMultiplexer connection, RedisBusyLockOptions options) + { + if (connection == null) + { + throw new ArgumentNullException(nameof(connection)); + } + + Options = options; + Database = connection.GetDatabase(options.DatabaseIndex); + + LockedOnKeyRefresh = new ConcurrentDictionary>(StringComparer.Ordinal); + } + + /// + public void Register(ICacheStack cacheStack) + { + if (RegisteredStack != null) + { + throw new InvalidOperationException($"{nameof(RedisBusyLockExtension)} can only be registered to one {nameof(ICacheStack)}"); + } + + RegisteredStack = cacheStack; + } + + /// + /// The attempts to set a key in Redis representing whether it has achieved a lock. + /// If it succeeds to set the key, it continues to refresh the value, removing the lock when finished. + /// If it fails to set the key, the first context will set a check process to check for the lock being released, + /// once done it can retrieve latest update from the cache stack and returning the value. + /// + /// + public async ValueTask> WithRefreshAsync(string cacheKey, Func>> valueProvider, CacheSettings settings) + { + var lockToken = Guid.NewGuid().ToString(); + var lockKey = $"{cacheKey}_lock"; + var hasLock = await Database.LockTakeAsync(lockKey, lockToken, Options.LockTimeout); + + if (hasLock) + { + try + { + var cacheEntry = await valueProvider(); + return cacheEntry; + } + finally + { + await Database.LockReleaseAsync(lockKey, lockToken); + } + } + + var completionSource = LockedOnKeyRefresh.GetOrAdd(cacheKey, key => + { + var tcs = new TaskCompletionSource(); + + _ = TestLock(tcs); + + return tcs; + + async Task TestLock(TaskCompletionSource taskCompletionSource) + { + var spinAttempt = 0; + + while (spinAttempt <= Options.SpinAttempts) + { + spinAttempt++; + + var lockQuery = await Database.LockQueryAsync(lockKey); + + if (lockQuery.HasValue) + { + await Task.Delay(Options.SpinTime); + continue; + } + + taskCompletionSource.TrySetResult(true); + return; + } + + taskCompletionSource.TrySetCanceled(); + } + }); + + //Last minute check to confirm whether waiting is required (in case the notification is missed) + var currentEntry = await RegisteredStack!.GetAsync(cacheKey); + if (currentEntry != null && currentEntry.GetStaleDate(settings) > Internal.DateTimeProvider.Now) + { + UnlockWaitingTasks(cacheKey); + return currentEntry; + } + + //Lock until we are notified to be unlocked + await completionSource.Task; + + //Get the updated value from the cache stack + return (await RegisteredStack.GetAsync(cacheKey))!; + } + + private void UnlockWaitingTasks(string cacheKey) + { + if (LockedOnKeyRefresh.TryRemove(cacheKey, out var waitingTasks)) + { + waitingTasks.TrySetResult(true); + } + } + } +} diff --git a/src/CacheTower.Extensions.Redis/RedisBusyLockOptions.cs b/src/CacheTower.Extensions.Redis/RedisBusyLockOptions.cs new file mode 100644 index 00000000..fa300fc4 --- /dev/null +++ b/src/CacheTower.Extensions.Redis/RedisBusyLockOptions.cs @@ -0,0 +1,69 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Text; +using StackExchange.Redis; +using System.Threading.Tasks; + +namespace CacheTower.Extensions.Redis +{ + /// + /// Lock options for use by the . + /// + public class RedisBusyLockOptions + { + /// + /// The default options for . + /// + /// + /// + /// - : 1 minute
+ /// - : 100 milliseconds
+ /// - : The default database configured on the connection. + ///
+ ///
+ public static readonly RedisBusyLockOptions Default = new RedisBusyLockOptions(); + + /// + /// The database index used for the Redis lock. + /// If not specified, uses the default database as configured on the connection. + /// + public int DatabaseIndex { get; } + + /// + /// How long to wait on the lock before having it expire. + /// + public TimeSpan LockTimeout { get; } + + /// + /// How open to recheck the lock when we were not the context that acquired the lock + /// + public TimeSpan SpinTime { get; } + + /// + /// Number of attempts to check the lock before giving up + /// + public int SpinAttempts { get; } + + /// + /// Creates a new instance of the . + /// + /// How long to wait on the lock before having it expire. Defaults to 1 minute. + /// How long to wait between rechecking the lock. Defaults to 100 milliseconds. + /// + /// The database index used for the Redis lock. + /// If not specified, uses the default database as configured on the connection. + /// + public RedisBusyLockOptions( + TimeSpan? lockTimeout = default, + TimeSpan? spinTime = default, + int databaseIndex = -1 + ) + { + LockTimeout = lockTimeout ?? TimeSpan.FromMinutes(1); + SpinTime = spinTime ?? TimeSpan.FromMilliseconds(100); + SpinAttempts = (int)Math.Ceiling(LockTimeout.TotalMilliseconds / SpinTime.TotalMilliseconds); + DatabaseIndex = databaseIndex; + } + } +} diff --git a/src/CacheTower/CacheStack.cs b/src/CacheTower/CacheStack.cs index a26425ea..10b1c540 100644 --- a/src/CacheTower/CacheStack.cs +++ b/src/CacheTower/CacheStack.cs @@ -324,7 +324,7 @@ private async ValueTask BackPopulateCacheAsync(int fromIndexExclusive, string else if (noExistingValueAvailable) { TaskCompletionSource completionSource; - + lock (WaitingKeyRefresh) { if (!WaitingKeyRefresh.TryGetValue(cacheKey, out completionSource!) || completionSource == null) diff --git a/tests/CacheTower.Tests/Extensions/Redis/RedisBusyLockExtensionTests.cs b/tests/CacheTower.Tests/Extensions/Redis/RedisBusyLockExtensionTests.cs new file mode 100644 index 00000000..23ab6d2a --- /dev/null +++ b/tests/CacheTower.Tests/Extensions/Redis/RedisBusyLockExtensionTests.cs @@ -0,0 +1,150 @@ +using System; +using System.Threading.Tasks; +using CacheTower.Extensions.Redis; +using CacheTower.Tests.Utils; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Moq; + +namespace CacheTower.Tests.Extensions.Redis +{ + [TestClass] + public class RedisBusyLockExtensionTests + { + [TestMethod, ExpectedException(typeof(ArgumentNullException))] + public void ThrowForNullConnection() + { + new RedisBusyLockExtension(null); + } + + [TestMethod, ExpectedException(typeof(ArgumentOutOfRangeException))] + public void ThrowForInvalidDatabaseIndex() + { + new RedisBusyLockExtension(RedisHelper.GetConnection(), new RedisBusyLockOptions(databaseIndex: -10)); + } + + [TestMethod, ExpectedException(typeof(InvalidOperationException))] + public void ThrowForRegisteringTwoCacheStacks() + { + var extension = new RedisBusyLockExtension(RedisHelper.GetConnection()); + var cacheStack = new Mock().Object; + extension.Register(cacheStack); + extension.Register(cacheStack); + } + + [TestMethod] + public async Task RefreshValueClearsLock() + { + RedisHelper.ResetState(); + + var connection = RedisHelper.GetConnection(); + + var cacheStackMock = new Mock(); + var extension = new RedisBusyLockExtension(connection, RedisBusyLockOptions.Default); + extension.Register(cacheStackMock.Object); + + var cacheEntry = new CacheEntry(13, TimeSpan.FromDays(1)); + + await extension.WithRefreshAsync("TestKey", + () => new ValueTask>(cacheEntry), new CacheSettings(TimeSpan.FromDays(1))); + + var lockStatus = await connection.GetDatabase().LockQueryAsync("TestKey_lock"); + + Assert.IsFalse(lockStatus.HasValue, "Lock was not cleared after refreshValue"); + } + + + [TestMethod] + public async Task ObservedLockSingle() + { + RedisHelper.ResetState(); + + var connection = RedisHelper.GetConnection(); + + var cacheStackMock = new Mock(); + var extension = new RedisBusyLockExtension(connection, RedisBusyLockOptions.Default); + extension.Register(cacheStackMock.Object); + + var cacheEntry = new CacheEntry(13, TimeSpan.FromDays(1)); + + //Establish lock + await connection.GetDatabase().LockTakeAsync("TestKey_lock", "token", TimeSpan.FromMinutes(1)); + + var refreshTask = extension.WithRefreshAsync("TestKey", + () => + { + return new ValueTask>(cacheEntry); + }, + new CacheSettings(TimeSpan.FromDays(1)) + ).AsTask(); + + //Delay to allow for Redis check and self-entry into lock + await Task.Delay(TimeSpan.FromSeconds(1)); + + Assert.IsTrue(extension.LockedOnKeyRefresh.ContainsKey("TestKey"), "Lock was not established"); + + //Trigger the end of the lock + await connection.GetDatabase().LockReleaseAsync("TestKey_lock", "token"); + + var succeedingTask = await Task.WhenAny(refreshTask, Task.Delay(TimeSpan.FromSeconds(10))); + if (!succeedingTask.Equals(refreshTask)) + { + RedisHelper.DebugInfo(connection); + Assert.Fail("Refresh has timed out - something has gone very wrong"); + } + + cacheStackMock.Verify(c => c.GetAsync("TestKey"), Times.Exactly(2), "Two checks to the cache stack are expected"); + } + + + [TestMethod] + public async Task ObservedLockMultiple() + { + RedisHelper.ResetState(); + + var connection = RedisHelper.GetConnection(); + + var cacheStackMock = new Mock(); + var extension = new RedisBusyLockExtension(connection, RedisBusyLockOptions.Default); + extension.Register(cacheStackMock.Object); + + var cacheEntry = new CacheEntry(13, TimeSpan.FromDays(1)); + + //Establish lock + await connection.GetDatabase().LockTakeAsync("TestKey_lock", "token", TimeSpan.FromMinutes(1)); + + var refreshTask1 = extension.WithRefreshAsync("TestKey", + () => + { + return new ValueTask>(cacheEntry); + }, + new CacheSettings(TimeSpan.FromDays(1)) + ).AsTask(); + + var refreshTask2 = extension.WithRefreshAsync("TestKey", + () => + { + return new ValueTask>(cacheEntry); + }, + new CacheSettings(TimeSpan.FromDays(1)) + ).AsTask(); + + //Delay to allow for Redis check and self-entry into lock + await Task.Delay(TimeSpan.FromSeconds(2)); + + Assert.IsTrue(extension.LockedOnKeyRefresh.ContainsKey("TestKey"), "Lock was not established"); + + //Trigger the end of the lock + await connection.GetDatabase().LockReleaseAsync("TestKey_lock", "token"); + + var whenAllRefreshesTask = Task.WhenAll(refreshTask1, refreshTask2); + var succeedingTask = await Task.WhenAny(whenAllRefreshesTask, Task.Delay(TimeSpan.FromSeconds(10))); + if (!succeedingTask.Equals(whenAllRefreshesTask)) + { + RedisHelper.DebugInfo(connection); + Assert.Fail("Refresh has timed out - something has gone very wrong"); + } + + cacheStackMock.Verify(c => c.GetAsync("TestKey"), Times.Exactly(4), "Two checks to the cache stack are expected"); + } + } +} \ No newline at end of file From 3fd404b93e6d4349cd070137e5dd6626881f6035 Mon Sep 17 00:00:00 2001 From: Mike Goodfellow Date: Fri, 20 Aug 2021 13:01:06 +0100 Subject: [PATCH 2/4] Refactor into one lock --- .../RedisBusyLockExtension.cs | 139 ---------------- .../RedisBusyLockOptions.cs | 69 -------- .../RedisLockExtension.cs | 31 ++++ .../RedisLockOptions.cs | 30 +++- .../Redis/RedisBusyLockExtensionTests.cs | 150 ------------------ .../Redis/RedisLockExtensionTests.cs | 46 ++++++ 6 files changed, 106 insertions(+), 359 deletions(-) delete mode 100644 src/CacheTower.Extensions.Redis/RedisBusyLockExtension.cs delete mode 100644 src/CacheTower.Extensions.Redis/RedisBusyLockOptions.cs delete mode 100644 tests/CacheTower.Tests/Extensions/Redis/RedisBusyLockExtensionTests.cs diff --git a/src/CacheTower.Extensions.Redis/RedisBusyLockExtension.cs b/src/CacheTower.Extensions.Redis/RedisBusyLockExtension.cs deleted file mode 100644 index b9948aa7..00000000 --- a/src/CacheTower.Extensions.Redis/RedisBusyLockExtension.cs +++ /dev/null @@ -1,139 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Text; -using StackExchange.Redis; -using System.Threading.Tasks; - -namespace CacheTower.Extensions.Redis -{ - /// - /// Provides distributed cache locking via Redis. - /// This approach uses busy wait instead of pub/sub to check locks. - /// - public class RedisBusyLockExtension - { - private IDatabaseAsync Database { get; } - private RedisBusyLockOptions Options { get; } - - private ICacheStack? RegisteredStack { get; set; } - - internal ConcurrentDictionary> LockedOnKeyRefresh { get; } - - /// - /// Creates a new instance of with the given and default lock options. - /// - /// The primary connection to Redis where the distributed lock will be co-ordinated through. - public RedisBusyLockExtension(IConnectionMultiplexer connection) : this(connection, RedisBusyLockOptions.Default) { } - - /// - /// Creates a new instance of with the given and . - /// - /// The primary connection to Redis where the distributed lock will be co-ordinated through. - /// The lock options to configure the behaviour of locking. - public RedisBusyLockExtension(IConnectionMultiplexer connection, RedisBusyLockOptions options) - { - if (connection == null) - { - throw new ArgumentNullException(nameof(connection)); - } - - Options = options; - Database = connection.GetDatabase(options.DatabaseIndex); - - LockedOnKeyRefresh = new ConcurrentDictionary>(StringComparer.Ordinal); - } - - /// - public void Register(ICacheStack cacheStack) - { - if (RegisteredStack != null) - { - throw new InvalidOperationException($"{nameof(RedisBusyLockExtension)} can only be registered to one {nameof(ICacheStack)}"); - } - - RegisteredStack = cacheStack; - } - - /// - /// The attempts to set a key in Redis representing whether it has achieved a lock. - /// If it succeeds to set the key, it continues to refresh the value, removing the lock when finished. - /// If it fails to set the key, the first context will set a check process to check for the lock being released, - /// once done it can retrieve latest update from the cache stack and returning the value. - /// - /// - public async ValueTask> WithRefreshAsync(string cacheKey, Func>> valueProvider, CacheSettings settings) - { - var lockToken = Guid.NewGuid().ToString(); - var lockKey = $"{cacheKey}_lock"; - var hasLock = await Database.LockTakeAsync(lockKey, lockToken, Options.LockTimeout); - - if (hasLock) - { - try - { - var cacheEntry = await valueProvider(); - return cacheEntry; - } - finally - { - await Database.LockReleaseAsync(lockKey, lockToken); - } - } - - var completionSource = LockedOnKeyRefresh.GetOrAdd(cacheKey, key => - { - var tcs = new TaskCompletionSource(); - - _ = TestLock(tcs); - - return tcs; - - async Task TestLock(TaskCompletionSource taskCompletionSource) - { - var spinAttempt = 0; - - while (spinAttempt <= Options.SpinAttempts) - { - spinAttempt++; - - var lockQuery = await Database.LockQueryAsync(lockKey); - - if (lockQuery.HasValue) - { - await Task.Delay(Options.SpinTime); - continue; - } - - taskCompletionSource.TrySetResult(true); - return; - } - - taskCompletionSource.TrySetCanceled(); - } - }); - - //Last minute check to confirm whether waiting is required (in case the notification is missed) - var currentEntry = await RegisteredStack!.GetAsync(cacheKey); - if (currentEntry != null && currentEntry.GetStaleDate(settings) > Internal.DateTimeProvider.Now) - { - UnlockWaitingTasks(cacheKey); - return currentEntry; - } - - //Lock until we are notified to be unlocked - await completionSource.Task; - - //Get the updated value from the cache stack - return (await RegisteredStack.GetAsync(cacheKey))!; - } - - private void UnlockWaitingTasks(string cacheKey) - { - if (LockedOnKeyRefresh.TryRemove(cacheKey, out var waitingTasks)) - { - waitingTasks.TrySetResult(true); - } - } - } -} diff --git a/src/CacheTower.Extensions.Redis/RedisBusyLockOptions.cs b/src/CacheTower.Extensions.Redis/RedisBusyLockOptions.cs deleted file mode 100644 index fa300fc4..00000000 --- a/src/CacheTower.Extensions.Redis/RedisBusyLockOptions.cs +++ /dev/null @@ -1,69 +0,0 @@ -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Text; -using StackExchange.Redis; -using System.Threading.Tasks; - -namespace CacheTower.Extensions.Redis -{ - /// - /// Lock options for use by the . - /// - public class RedisBusyLockOptions - { - /// - /// The default options for . - /// - /// - /// - /// - : 1 minute
- /// - : 100 milliseconds
- /// - : The default database configured on the connection. - ///
- ///
- public static readonly RedisBusyLockOptions Default = new RedisBusyLockOptions(); - - /// - /// The database index used for the Redis lock. - /// If not specified, uses the default database as configured on the connection. - /// - public int DatabaseIndex { get; } - - /// - /// How long to wait on the lock before having it expire. - /// - public TimeSpan LockTimeout { get; } - - /// - /// How open to recheck the lock when we were not the context that acquired the lock - /// - public TimeSpan SpinTime { get; } - - /// - /// Number of attempts to check the lock before giving up - /// - public int SpinAttempts { get; } - - /// - /// Creates a new instance of the . - /// - /// How long to wait on the lock before having it expire. Defaults to 1 minute. - /// How long to wait between rechecking the lock. Defaults to 100 milliseconds. - /// - /// The database index used for the Redis lock. - /// If not specified, uses the default database as configured on the connection. - /// - public RedisBusyLockOptions( - TimeSpan? lockTimeout = default, - TimeSpan? spinTime = default, - int databaseIndex = -1 - ) - { - LockTimeout = lockTimeout ?? TimeSpan.FromMinutes(1); - SpinTime = spinTime ?? TimeSpan.FromMilliseconds(100); - SpinAttempts = (int)Math.Ceiling(LockTimeout.TotalMilliseconds / SpinTime.TotalMilliseconds); - DatabaseIndex = databaseIndex; - } - } -} diff --git a/src/CacheTower.Extensions.Redis/RedisLockExtension.cs b/src/CacheTower.Extensions.Redis/RedisLockExtension.cs index a8f2bcd3..0aff2eec 100644 --- a/src/CacheTower.Extensions.Redis/RedisLockExtension.cs +++ b/src/CacheTower.Extensions.Redis/RedisLockExtension.cs @@ -93,7 +93,38 @@ public async ValueTask> WithRefreshAsync(string cacheKey, Func< var tcs = new TaskCompletionSource(); var cts = new CancellationTokenSource(Options.LockTimeout); cts.Token.Register(tcs => ((TaskCompletionSource)tcs).TrySetCanceled(), tcs, useSynchronizationContext: false); + + if (Options.UseBusyLockCheck) + { + _ = TestLock(tcs); + } + return tcs; + + async Task TestLock(TaskCompletionSource taskCompletionSource) + { + var spinAttempt = 0; + + while (spinAttempt <= Options.SpinAttempts && + !taskCompletionSource.Task.IsCanceled && + !taskCompletionSource.Task.IsCompleted) + { + spinAttempt++; + + var lockQuery = await Database.LockQueryAsync(lockKey); + + if (lockQuery.HasValue) + { + await Task.Delay(Options.SpinTime); + continue; + } + + taskCompletionSource.TrySetResult(true); + return; + } + + taskCompletionSource.TrySetCanceled(); + } }); //Last minute check to confirm whether waiting is required (in case the notification is missed) diff --git a/src/CacheTower.Extensions.Redis/RedisLockOptions.cs b/src/CacheTower.Extensions.Redis/RedisLockOptions.cs index fd5430cf..a56c46a0 100644 --- a/src/CacheTower.Extensions.Redis/RedisLockOptions.cs +++ b/src/CacheTower.Extensions.Redis/RedisLockOptions.cs @@ -16,6 +16,7 @@ public class RedisLockOptions /// - : "CacheTower.CacheLock"
/// - : "Lock:{0}"
/// - : The default database configured on the connection. + /// - : Unused. /// /// public static readonly RedisLockOptions Default = new RedisLockOptions(); @@ -25,20 +26,39 @@ public class RedisLockOptions /// If not specified, uses the default database as configured on the connection. /// public int DatabaseIndex { get; } + /// /// The Redis channel to communicate unlocking events across. /// public string RedisChannel { get; } + /// /// How long to wait on the lock before having it expire. /// public TimeSpan LockTimeout { get; } + /// /// A compatible string used to create the lock key stored in Redis. /// The cache key is provided as argument {0}. /// public string KeyFormat { get; } + /// + /// Is the busy lock check enabled + /// + public bool UseBusyLockCheck { get; } + + /// + /// How open to recheck the lock when we were not the context that acquired the lock + /// + public TimeSpan SpinTime { get; } + + /// + /// Number of attempts to check the lock before giving up + /// + public int SpinAttempts { get; } + + /// /// Creates a new instance of the . /// @@ -53,17 +73,25 @@ public class RedisLockOptions /// The database index used for the Redis lock. /// If not specified, uses the default database as configured on the connection. /// + /// + /// The waiter on the lock also performs a tight loop to check for lock release + /// This can avoid the situation of a missed message on redis pub/sub + /// public RedisLockOptions( TimeSpan? lockTimeout = default, string redisChannel = "CacheTower.CacheLock", string keyFormat = "Lock:{0}", - int databaseIndex = -1 + int databaseIndex = -1, + TimeSpan? spinTime = default ) { LockTimeout = lockTimeout ?? TimeSpan.FromMinutes(1); RedisChannel = redisChannel ?? throw new ArgumentNullException(nameof(redisChannel)); KeyFormat = keyFormat ?? throw new ArgumentNullException(nameof(keyFormat)); DatabaseIndex = databaseIndex; + UseBusyLockCheck = spinTime.HasValue; + SpinTime = spinTime ?? TimeSpan.FromMilliseconds(100); + SpinAttempts = (int)Math.Ceiling(LockTimeout.TotalMilliseconds / SpinTime.TotalMilliseconds); } } } diff --git a/tests/CacheTower.Tests/Extensions/Redis/RedisBusyLockExtensionTests.cs b/tests/CacheTower.Tests/Extensions/Redis/RedisBusyLockExtensionTests.cs deleted file mode 100644 index 23ab6d2a..00000000 --- a/tests/CacheTower.Tests/Extensions/Redis/RedisBusyLockExtensionTests.cs +++ /dev/null @@ -1,150 +0,0 @@ -using System; -using System.Threading.Tasks; -using CacheTower.Extensions.Redis; -using CacheTower.Tests.Utils; -using Microsoft.VisualStudio.TestTools.UnitTesting; -using Moq; - -namespace CacheTower.Tests.Extensions.Redis -{ - [TestClass] - public class RedisBusyLockExtensionTests - { - [TestMethod, ExpectedException(typeof(ArgumentNullException))] - public void ThrowForNullConnection() - { - new RedisBusyLockExtension(null); - } - - [TestMethod, ExpectedException(typeof(ArgumentOutOfRangeException))] - public void ThrowForInvalidDatabaseIndex() - { - new RedisBusyLockExtension(RedisHelper.GetConnection(), new RedisBusyLockOptions(databaseIndex: -10)); - } - - [TestMethod, ExpectedException(typeof(InvalidOperationException))] - public void ThrowForRegisteringTwoCacheStacks() - { - var extension = new RedisBusyLockExtension(RedisHelper.GetConnection()); - var cacheStack = new Mock().Object; - extension.Register(cacheStack); - extension.Register(cacheStack); - } - - [TestMethod] - public async Task RefreshValueClearsLock() - { - RedisHelper.ResetState(); - - var connection = RedisHelper.GetConnection(); - - var cacheStackMock = new Mock(); - var extension = new RedisBusyLockExtension(connection, RedisBusyLockOptions.Default); - extension.Register(cacheStackMock.Object); - - var cacheEntry = new CacheEntry(13, TimeSpan.FromDays(1)); - - await extension.WithRefreshAsync("TestKey", - () => new ValueTask>(cacheEntry), new CacheSettings(TimeSpan.FromDays(1))); - - var lockStatus = await connection.GetDatabase().LockQueryAsync("TestKey_lock"); - - Assert.IsFalse(lockStatus.HasValue, "Lock was not cleared after refreshValue"); - } - - - [TestMethod] - public async Task ObservedLockSingle() - { - RedisHelper.ResetState(); - - var connection = RedisHelper.GetConnection(); - - var cacheStackMock = new Mock(); - var extension = new RedisBusyLockExtension(connection, RedisBusyLockOptions.Default); - extension.Register(cacheStackMock.Object); - - var cacheEntry = new CacheEntry(13, TimeSpan.FromDays(1)); - - //Establish lock - await connection.GetDatabase().LockTakeAsync("TestKey_lock", "token", TimeSpan.FromMinutes(1)); - - var refreshTask = extension.WithRefreshAsync("TestKey", - () => - { - return new ValueTask>(cacheEntry); - }, - new CacheSettings(TimeSpan.FromDays(1)) - ).AsTask(); - - //Delay to allow for Redis check and self-entry into lock - await Task.Delay(TimeSpan.FromSeconds(1)); - - Assert.IsTrue(extension.LockedOnKeyRefresh.ContainsKey("TestKey"), "Lock was not established"); - - //Trigger the end of the lock - await connection.GetDatabase().LockReleaseAsync("TestKey_lock", "token"); - - var succeedingTask = await Task.WhenAny(refreshTask, Task.Delay(TimeSpan.FromSeconds(10))); - if (!succeedingTask.Equals(refreshTask)) - { - RedisHelper.DebugInfo(connection); - Assert.Fail("Refresh has timed out - something has gone very wrong"); - } - - cacheStackMock.Verify(c => c.GetAsync("TestKey"), Times.Exactly(2), "Two checks to the cache stack are expected"); - } - - - [TestMethod] - public async Task ObservedLockMultiple() - { - RedisHelper.ResetState(); - - var connection = RedisHelper.GetConnection(); - - var cacheStackMock = new Mock(); - var extension = new RedisBusyLockExtension(connection, RedisBusyLockOptions.Default); - extension.Register(cacheStackMock.Object); - - var cacheEntry = new CacheEntry(13, TimeSpan.FromDays(1)); - - //Establish lock - await connection.GetDatabase().LockTakeAsync("TestKey_lock", "token", TimeSpan.FromMinutes(1)); - - var refreshTask1 = extension.WithRefreshAsync("TestKey", - () => - { - return new ValueTask>(cacheEntry); - }, - new CacheSettings(TimeSpan.FromDays(1)) - ).AsTask(); - - var refreshTask2 = extension.WithRefreshAsync("TestKey", - () => - { - return new ValueTask>(cacheEntry); - }, - new CacheSettings(TimeSpan.FromDays(1)) - ).AsTask(); - - //Delay to allow for Redis check and self-entry into lock - await Task.Delay(TimeSpan.FromSeconds(2)); - - Assert.IsTrue(extension.LockedOnKeyRefresh.ContainsKey("TestKey"), "Lock was not established"); - - //Trigger the end of the lock - await connection.GetDatabase().LockReleaseAsync("TestKey_lock", "token"); - - var whenAllRefreshesTask = Task.WhenAll(refreshTask1, refreshTask2); - var succeedingTask = await Task.WhenAny(whenAllRefreshesTask, Task.Delay(TimeSpan.FromSeconds(10))); - if (!succeedingTask.Equals(whenAllRefreshesTask)) - { - RedisHelper.DebugInfo(connection); - Assert.Fail("Refresh has timed out - something has gone very wrong"); - } - - cacheStackMock.Verify(c => c.GetAsync("TestKey"), Times.Exactly(4), "Two checks to the cache stack are expected"); - } - } -} \ No newline at end of file diff --git a/tests/CacheTower.Tests/Extensions/Redis/RedisLockExtensionTests.cs b/tests/CacheTower.Tests/Extensions/Redis/RedisLockExtensionTests.cs index 27e66b07..3e52845e 100644 --- a/tests/CacheTower.Tests/Extensions/Redis/RedisLockExtensionTests.cs +++ b/tests/CacheTower.Tests/Extensions/Redis/RedisLockExtensionTests.cs @@ -249,5 +249,51 @@ public async Task FailsafeOnSubscriberFailure() cacheStackMock.Verify(c => c.GetAsync("TestKey"), Times.Exactly(1), "One checks to the cache stack are expected as it will fail to resolve lock"); } + + + + [TestMethod] + public async Task BusyLockCheckWorksWhenSubscriberFails() + { + RedisHelper.ResetState(); + + var connection = RedisHelper.GetConnection(); + + var cacheStackMock = new Mock(); + var extension = new RedisLockExtension(connection, new RedisLockOptions(null, "CacheTower.CacheLock", "Lock:{0}", -1, TimeSpan.FromMilliseconds(50))); + extension.Register(cacheStackMock.Object); + + var cacheEntry = new CacheEntry(13, TimeSpan.FromDays(1)); + + //Establish lock + await connection.GetDatabase().StringSetAsync("Lock:TestKey", RedisValue.EmptyString); + + var refreshTask = extension.WithRefreshAsync("TestKey", + () => + { + return new ValueTask>(cacheEntry); + }, + new CacheSettings(TimeSpan.FromDays(1)) + ).AsTask(); + + //Delay to allow for Redis check and self-entry into lock + await Task.Delay(TimeSpan.FromSeconds(1)); + + Assert.IsTrue(extension.LockedOnKeyRefresh.ContainsKey("TestKey"), "Lock was not established"); + + //Trigger the end of the lock + await connection.GetDatabase().KeyDeleteAsync("Lock:TestKey"); + + //Note we don't publish the value was refreshed + + var succeedingTask = await Task.WhenAny(refreshTask, Task.Delay(TimeSpan.FromSeconds(10))); + if (!succeedingTask.Equals(refreshTask)) + { + RedisHelper.DebugInfo(connection); + Assert.Fail("Refresh has timed out - something has gone very wrong"); + } + + cacheStackMock.Verify(c => c.GetAsync("TestKey"), Times.Exactly(2), "Two checks to the cache stack are expected"); + } } } \ No newline at end of file From e6e7d05883e67620a985e6c414d50d37373913c8 Mon Sep 17 00:00:00 2001 From: Mike Goodfellow Date: Fri, 20 Aug 2021 13:09:48 +0100 Subject: [PATCH 3/4] Fix to check key exists directly --- src/CacheTower.Extensions.Redis/RedisLockExtension.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/CacheTower.Extensions.Redis/RedisLockExtension.cs b/src/CacheTower.Extensions.Redis/RedisLockExtension.cs index 0aff2eec..781185a1 100644 --- a/src/CacheTower.Extensions.Redis/RedisLockExtension.cs +++ b/src/CacheTower.Extensions.Redis/RedisLockExtension.cs @@ -111,9 +111,9 @@ async Task TestLock(TaskCompletionSource taskCompletionSource) { spinAttempt++; - var lockQuery = await Database.LockQueryAsync(lockKey); + var lockExists = await Database.KeyExistsAsync(lockKey); - if (lockQuery.HasValue) + if (lockExists) { await Task.Delay(Options.SpinTime); continue; From c1844a2a66c6727e60f15bf3207334c8f7076b4b Mon Sep 17 00:00:00 2001 From: Mike Goodfellow Date: Fri, 20 Aug 2021 13:40:05 +0100 Subject: [PATCH 4/4] Only setup cts if not using busy lock --- src/CacheTower.Extensions.Redis/RedisLockExtension.cs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/CacheTower.Extensions.Redis/RedisLockExtension.cs b/src/CacheTower.Extensions.Redis/RedisLockExtension.cs index 781185a1..498ca10a 100644 --- a/src/CacheTower.Extensions.Redis/RedisLockExtension.cs +++ b/src/CacheTower.Extensions.Redis/RedisLockExtension.cs @@ -91,13 +91,16 @@ public async ValueTask> WithRefreshAsync(string cacheKey, Func< var completionSource = LockedOnKeyRefresh.GetOrAdd(cacheKey, key => { var tcs = new TaskCompletionSource(); - var cts = new CancellationTokenSource(Options.LockTimeout); - cts.Token.Register(tcs => ((TaskCompletionSource)tcs).TrySetCanceled(), tcs, useSynchronizationContext: false); - + if (Options.UseBusyLockCheck) { _ = TestLock(tcs); } + else + { + var cts = new CancellationTokenSource(Options.LockTimeout); + cts.Token.Register(tcs => ((TaskCompletionSource)tcs).TrySetCanceled(), tcs, useSynchronizationContext: false); + } return tcs;