Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add busy lock support for RedisLockExtension #184

Merged
merged 4 commits into from
Aug 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions src/CacheTower.Extensions.Redis/RedisLockExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,43 @@ public async ValueTask<CacheEntry<T>> WithRefreshAsync<T>(string cacheKey, Func<
var completionSource = LockedOnKeyRefresh.GetOrAdd(cacheKey, key =>
{
var tcs = new TaskCompletionSource<bool>();
var cts = new CancellationTokenSource(Options.LockTimeout);
cts.Token.Register(tcs => ((TaskCompletionSource<bool>)tcs).TrySetCanceled(), tcs, useSynchronizationContext: false);

if (Options.UseBusyLockCheck)
{
_ = TestLock(tcs);
}
else
{
var cts = new CancellationTokenSource(Options.LockTimeout);
cts.Token.Register(tcs => ((TaskCompletionSource<bool>)tcs).TrySetCanceled(), tcs, useSynchronizationContext: false);
}

return tcs;

async Task TestLock(TaskCompletionSource<bool> taskCompletionSource)
{
var spinAttempt = 0;

while (spinAttempt <= Options.SpinAttempts &&
!taskCompletionSource.Task.IsCanceled &&
!taskCompletionSource.Task.IsCompleted)
{
spinAttempt++;

var lockExists = await Database.KeyExistsAsync(lockKey);

if (lockExists)
{
await Task.Delay(Options.SpinTime);
continue;
}

taskCompletionSource.TrySetResult(true);
return;
}

taskCompletionSource.TrySetCanceled();
}
});

//Last minute check to confirm whether waiting is required (in case the notification is missed)
Expand Down
30 changes: 29 additions & 1 deletion src/CacheTower.Extensions.Redis/RedisLockOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class RedisLockOptions
/// - <see cref="RedisChannel"/>: <c>"CacheTower.CacheLock"</c><br/>
/// - <see cref="KeyFormat"/>: <c>"Lock:{0}"</c><br/>
/// - <see cref="DatabaseIndex"/>: <i>The default database configured on the connection.</i>
/// - <see cref="SpinTime"/>: <i>Unused.</i>
/// </para>
/// </remarks>
public static readonly RedisLockOptions Default = new RedisLockOptions();
Expand All @@ -25,20 +26,39 @@ public class RedisLockOptions
/// If not specified, uses the default database as configured on the connection.
/// </summary>
public int DatabaseIndex { get; }

/// <summary>
/// The Redis channel to communicate unlocking events across.
/// </summary>
public string RedisChannel { get; }

/// <summary>
/// How long to wait on the lock before having it expire.
/// </summary>
public TimeSpan LockTimeout { get; }

/// <summary>
/// A <see cref="string.Format(string, object[])"/> compatible string used to create the lock key stored in Redis.
/// The cache key is provided as argument {0}.
/// </summary>
public string KeyFormat { get; }

/// <summary>
/// Is the busy lock check enabled
/// </summary>
public bool UseBusyLockCheck { get; }

/// <summary>
/// How open to recheck the lock when we were not the context that acquired the lock
/// </summary>
public TimeSpan SpinTime { get; }

/// <summary>
/// Number of attempts to check the lock before giving up
/// </summary>
public int SpinAttempts { get; }


/// <summary>
/// Creates a new instance of the <see cref="RedisLockOptions"/>.
/// </summary>
Expand All @@ -53,17 +73,25 @@ public class RedisLockOptions
/// The database index used for the Redis lock.
/// If not specified, uses the default database as configured on the connection.
/// </param>
/// <param name="spinTime">
/// The waiter on the lock also performs a tight loop to check for lock release
/// This can avoid the situation of a missed message on redis pub/sub
/// </param>
public RedisLockOptions(
TimeSpan? lockTimeout = default,
string redisChannel = "CacheTower.CacheLock",
string keyFormat = "Lock:{0}",
int databaseIndex = -1
int databaseIndex = -1,
TimeSpan? spinTime = default
)
{
LockTimeout = lockTimeout ?? TimeSpan.FromMinutes(1);
RedisChannel = redisChannel ?? throw new ArgumentNullException(nameof(redisChannel));
KeyFormat = keyFormat ?? throw new ArgumentNullException(nameof(keyFormat));
DatabaseIndex = databaseIndex;
UseBusyLockCheck = spinTime.HasValue;
SpinTime = spinTime ?? TimeSpan.FromMilliseconds(100);
SpinAttempts = (int)Math.Ceiling(LockTimeout.TotalMilliseconds / SpinTime.TotalMilliseconds);
}
}
}
2 changes: 1 addition & 1 deletion src/CacheTower/CacheStack.cs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ private async ValueTask BackPopulateCacheAsync<T>(int fromIndexExclusive, string
else if (noExistingValueAvailable)
{
TaskCompletionSource<CacheEntry> completionSource;

lock (WaitingKeyRefresh)
{
if (!WaitingKeyRefresh.TryGetValue(cacheKey, out completionSource!) || completionSource == null)
Expand Down
46 changes: 46 additions & 0 deletions tests/CacheTower.Tests/Extensions/Redis/RedisLockExtensionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -249,5 +249,51 @@ public async Task FailsafeOnSubscriberFailure()

cacheStackMock.Verify(c => c.GetAsync<int>("TestKey"), Times.Exactly(1), "One checks to the cache stack are expected as it will fail to resolve lock");
}



[TestMethod]
public async Task BusyLockCheckWorksWhenSubscriberFails()
{
RedisHelper.ResetState();

var connection = RedisHelper.GetConnection();

var cacheStackMock = new Mock<ICacheStack>();
var extension = new RedisLockExtension(connection, new RedisLockOptions(null, "CacheTower.CacheLock", "Lock:{0}", -1, TimeSpan.FromMilliseconds(50)));
extension.Register(cacheStackMock.Object);

var cacheEntry = new CacheEntry<int>(13, TimeSpan.FromDays(1));

//Establish lock
await connection.GetDatabase().StringSetAsync("Lock:TestKey", RedisValue.EmptyString);

var refreshTask = extension.WithRefreshAsync("TestKey",
() =>
{
return new ValueTask<CacheEntry<int>>(cacheEntry);
},
new CacheSettings(TimeSpan.FromDays(1))
).AsTask();

//Delay to allow for Redis check and self-entry into lock
await Task.Delay(TimeSpan.FromSeconds(1));

Assert.IsTrue(extension.LockedOnKeyRefresh.ContainsKey("TestKey"), "Lock was not established");

//Trigger the end of the lock
await connection.GetDatabase().KeyDeleteAsync("Lock:TestKey");

//Note we don't publish the value was refreshed

var succeedingTask = await Task.WhenAny(refreshTask, Task.Delay(TimeSpan.FromSeconds(10)));
if (!succeedingTask.Equals(refreshTask))
{
RedisHelper.DebugInfo(connection);
Assert.Fail("Refresh has timed out - something has gone very wrong");
}

cacheStackMock.Verify(c => c.GetAsync<int>("TestKey"), Times.Exactly(2), "Two checks to the cache stack are expected");
}
}
}