Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZooKeeper locks #81

Merged
merged 8 commits into from
Mar 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions DistributedLock.Core/AssemblyAttributes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
[assembly: InternalsVisibleTo("DistributedLock.Azure, PublicKey=0024000004800000940000000602000000240000525341310004000001000100fd3af56ccc8ed94fffe25bfd651e6a5674f8f20a76d37de800dd0f7380e04f0fde2da6fa200380b14fe398605b6f470c87e5e0a0bf39ae871f07536a4994aa7a0057c4d3bcedc8fef3eecb0c88c2024a1b3289305c2393acd9fb9f9a42d0bd7826738ce864d507575ea3a1fe1746ab19823303269f79379d767949807f494be8")]
[assembly: InternalsVisibleTo("DistributedLock.FileSystem, PublicKey=0024000004800000940000000602000000240000525341310004000001000100fd3af56ccc8ed94fffe25bfd651e6a5674f8f20a76d37de800dd0f7380e04f0fde2da6fa200380b14fe398605b6f470c87e5e0a0bf39ae871f07536a4994aa7a0057c4d3bcedc8fef3eecb0c88c2024a1b3289305c2393acd9fb9f9a42d0bd7826738ce864d507575ea3a1fe1746ab19823303269f79379d767949807f494be8")]
[assembly: InternalsVisibleTo("DistributedLock.Redis, PublicKey=0024000004800000940000000602000000240000525341310004000001000100fd3af56ccc8ed94fffe25bfd651e6a5674f8f20a76d37de800dd0f7380e04f0fde2da6fa200380b14fe398605b6f470c87e5e0a0bf39ae871f07536a4994aa7a0057c4d3bcedc8fef3eecb0c88c2024a1b3289305c2393acd9fb9f9a42d0bd7826738ce864d507575ea3a1fe1746ab19823303269f79379d767949807f494be8")]
[assembly: InternalsVisibleTo("DistributedLock.ZooKeeper, PublicKey=0024000004800000940000000602000000240000525341310004000001000100fd3af56ccc8ed94fffe25bfd651e6a5674f8f20a76d37de800dd0f7380e04f0fde2da6fa200380b14fe398605b6f470c87e5e0a0bf39ae871f07536a4994aa7a0057c4d3bcedc8fef3eecb0c88c2024a1b3289305c2393acd9fb9f9a42d0bd7826738ce864d507575ea3a1fe1746ab19823303269f79379d767949807f494be8")]
#endif
3 changes: 1 addition & 2 deletions DistributedLock.Core/Internal/Data/ConnectionMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,7 @@ private async ValueTask StopOrDisposeAsync(bool isDispose)

if (task != null)
{
if (SyncViaAsync.IsSynchronous) { task.GetAwaiter().GetResult(); }
else { await task.ConfigureAwait(false); }
await task.AwaitSyncOverAsync().ConfigureAwait(false);
}
}

Expand Down
1 change: 0 additions & 1 deletion DistributedLock.Core/Internal/Data/DatabaseConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public DatabaseCommand CreateCommand()
// note: we could have this return an IAsyncDisposable which would allow you to close the transaction
// without closing the connection. However, we don't currently have any use-cases for that
public async ValueTask BeginTransactionAsync()
#pragma warning restore CS1998
{
Invariant.Require(this._transaction == null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public async ValueTask<Result> TryAcquireAsync<TLockCookie>(
bool opportunistic)
where TLockCookie : class
{
using var mutextHandle = await this._mutex.TryAcquireAsync(opportunistic ? TimeSpan.Zero : Timeout.InfiniteTimeSpan, cancellationToken).ConfigureAwait(false);
if (mutextHandle == null)
using var mutexHandle = await this._mutex.TryAcquireAsync(opportunistic ? TimeSpan.Zero : Timeout.InfiniteTimeSpan, cancellationToken).ConfigureAwait(false);
if (mutexHandle == null)
{
// mutex wasn't free, so just give up
Invariant.Require(opportunistic);
Expand Down
3 changes: 1 addition & 2 deletions DistributedLock.Core/Internal/LeaseMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ public async ValueTask DisposeAsync()
this._disposalSource.Cancel();
}

if (SyncViaAsync.IsSynchronous) { this._monitoringTask.GetAwaiter().GetResult(); }
else { await this._monitoringTask.ConfigureAwait(false); }
await this._monitoringTask.AwaitSyncOverAsync().ConfigureAwait(false);
}
finally
{
Expand Down
22 changes: 22 additions & 0 deletions DistributedLock.Core/Internal/SyncViaAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,27 @@ public static ValueTask Delay(TimeoutValue timeout, CancellationToken cancellati
public static void DisposeSyncViaAsync<TDisposable>(this TDisposable disposable)
where TDisposable : IAsyncDisposable, IDisposable =>
Run(@this => @this.DisposeAsync(), disposable);

/// <summary>
/// In synchronous mode, performs a blocking wait on the provided <paramref name="task"/>. In asynchronous mode,
/// returns the <paramref name="task"/> as a <see cref="ValueTask{TResult}"/>.
/// </summary>
public static ValueTask<TResult> AwaitSyncOverAsync<TResult>(this Task<TResult> task) =>
IsSynchronous ? task.GetAwaiter().GetResult().AsValueTask() : task.AsValueTask();

/// <summary>
/// In synchronous mode, performs a blocking wait on the provided <paramref name="task"/>. In asynchronous mode,
/// returns the <paramref name="task"/> as a <see cref="ValueTask"/>.
/// </summary>
public static ValueTask AwaitSyncOverAsync(this Task task)
{
if (IsSynchronous)
{
task.GetAwaiter().GetResult();
return default;
}

return task.AsValueTask();
}
}
}
4 changes: 1 addition & 3 deletions DistributedLock.Redis/RedLock/RedLockAcquire.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ public RedLockAcquire(
var succeeded = false;
try
{
succeeded = isSynchronous
? waitForAcquireTask.GetAwaiter().GetResult()
: await waitForAcquireTask.ConfigureAwait(false);
succeeded = await waitForAcquireTask.AwaitSyncOverAsync().ConfigureAwait(false);
}
finally
{
Expand Down
2 changes: 1 addition & 1 deletion DistributedLock.Redis/TimeoutTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Medallion.Threading.Redis
/// Acts as a <see cref="Task.Delay(TimeSpan, CancellationToken)"/> which is cleaned up when
/// the <see cref="TimeoutTask"/> gets disposed
/// </summary>
internal struct TimeoutTask : IDisposable
internal readonly struct TimeoutTask : IDisposable
{
private readonly CancellationTokenSource _cleanupTokenSource;
private readonly CancellationTokenSource? _linkedTokenSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,8 @@ private void CrossProcessAbandonmentHelper(bool asyncWait, bool kill)
command.StandardInput.WriteLine("abandon");
command.StandardInput.Flush();
}
// make sure it actually exits
Assert.IsTrue(command.Task.ContinueWith(_ => { }).Wait(TimeSpan.FromSeconds(5)), "lock taker should exit");

if (this._lockProvider.SupportsCrossProcessAbandonment)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void TestAcquireWithLockPrefix()
using var explicitPrefixHandle = explicitPrefixLock.TryAcquire();
Assert.IsNull(explicitPrefixHandle);

IDatabase CreateDatabase(string? keyPrefix = null)
static IDatabase CreateDatabase(string? keyPrefix = null)
{
var database = RedisServer.GetDefaultServer(0).Multiplexer.GetDatabase();
return keyPrefix is null ? database : database.WithKeyPrefix(keyPrefix);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
using Medallion.Threading.Tests.ZooKeeper;
using Medallion.Threading.ZooKeeper;
using NUnit.Framework;
using org.apache.zookeeper;
using org.apache.zookeeper.data;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Medallion.Threading.Tests.ZooKeeper
{
public abstract class ZooKeeperSynchronizationCoreTestCases<TLockProvider>
where TLockProvider : TestingLockProvider<TestingZooKeeperSynchronizationStrategy>, new()
{
private TLockProvider _provider = default!;

[SetUp]
public void SetUp() => this._provider = new TLockProvider();

[TearDown]
public void TearDown() => this._provider.Dispose();

[Test]
public async Task TestDoesNotAttemptToCreateOrDeleteExistingNode()
{
// This doesn't work because creating the lock attempts to acquire which will then fail initially. We could work around this by testing
// for a different set of conditions in the multi-ticket case, but the extra coverage doesn't seem valuable (we still have coverage of single-ticket)
if (IsMultiTicketSemaphoreProvider) { Assert.Pass("not supported"); }

var path = new ZooKeeperPath($"/{this.GetType()}.{nameof(this.TestDoesNotAttemptToCreateOrDeleteExistingNode)} ({TargetFramework.Current})");
using var connection = await ZooKeeperConnection.DefaultPool.ConnectAsync(
new ZooKeeperConnectionInfo(ZooKeeperPorts.DefaultConnectionString, TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30), new EquatableReadOnlyList<ZooKeeperAuthInfo>(Array.Empty<ZooKeeperAuthInfo>())),
CancellationToken.None
);

// pre-clean up just in case
try { await connection.ZooKeeper.deleteAsync(path.ToString()); }
catch (KeeperException.NoNodeException) { }

this._provider.Strategy.AssumeNodeExists = true;
var @lock = this._provider.CreateLockWithExactName(path.ToString());

Assert.That(
Assert.ThrowsAsync<InvalidOperationException>(() => @lock.TryAcquireAsync().AsTask()).Message,
Does.Contain("does not exist")
);

await connection.ZooKeeper.createAsync(path.ToString(), Array.Empty<byte>(), new List<ACL> { ZooKeeperNodeCreator.PublicAcl }, CreateMode.PERSISTENT);
try
{
await using (var handle = await @lock.TryAcquireAsync())
{
Assert.IsNotNull(handle);
}

Assert.IsNotNull(await connection.ZooKeeper.existsAsync(path.ToString()));
}
finally
{
await connection.ZooKeeper.deleteAsync(path.ToString());
}
}

[TestCase("/")]
[TestCase(".")]
[TestCase("..")]
[TestCase("zookeeper")]
[TestCase("abc\0")]
public void TestGetSafeName(string name) =>
Assert.DoesNotThrowAsync(async () => await (await this._provider.CreateLockWithExactName(this._provider.GetSafeName(name)).AcquireAsync()).DisposeAsync());

[Test]
public void TestGetSafeNameWithControlCharacters() => this.TestGetSafeName("\u001f\u009F\uf8ff\ufff1");

[Test]
public async Task TestCustomAclAndAuth()
{
// This doesn't work because creating the lock causes the node to be created (from taking the other tickets)
// and releasing the lock doesn't cause the node to be deleted (due to those other tickets).
if (IsMultiTicketSemaphoreProvider) { Assert.Pass("not supported"); }

const string Username = "username";
const string Password = "secretPassword";

var unauthenticatedLock = this._provider.CreateLock(string.Empty);

this._provider.Strategy.Options = o => o.AddAccessControl("digest", GenerateDigestAclId(Username, Password), 0x1f)
.AddAuthInfo("digest", Encoding.UTF8.GetBytes($"{Username}:{Password}"));
var @lock = this._provider.CreateLock(string.Empty);

await using (await @lock.AcquireAsync())
{
Assert.ThrowsAsync<KeeperException.NoAuthException>(() => unauthenticatedLock.TryAcquireAsync().AsTask());
}

Assert.DoesNotThrowAsync(async () => await (await unauthenticatedLock.AcquireAsync()).DisposeAsync());

// Based on
// https://github.com/apache/zookeeper/blob/d8561f620fa8611e9a6819d9879b0f18e5a404a9/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/DigestAuthenticationProvider.java
static string GenerateDigestAclId(string username, string password)
{
using var sha = SHA1.Create();
var digest = sha.ComputeHash(Encoding.UTF8.GetBytes($"{username}:{password}"));
return $"{username}:{Convert.ToBase64String(digest)}";
}
}

[Test]
public async Task TestInvalidAclDoesNotCorruptStore()
{
// This doesn't work because creating the lock causes the node to be created (from taking the other tickets)
// and releasing the lock doesn't cause the node to be deleted (due to those other tickets).
if (IsMultiTicketSemaphoreProvider) { Assert.Pass("not supported"); }

const string Username = "username";
const string Password = "xyz";

// ACL is the right format but the wrong password (this can easily happen if you get the encoding wrong)
this._provider.Strategy.Options = o => o.AddAccessControl("digest", $"{Username}:1eYGPn6j9+P9osACW8ob4HhZT+s=", 0x1f)
.AddAuthInfo("digest", Encoding.UTF8.GetBytes($"{Username}:{Password}"));
var invalidAclLock = this._provider.CreateLock(string.Empty);

// pre-cleanup to make sure we will actually create the path
using var connection = await ZooKeeperConnection.DefaultPool.ConnectAsync(
new ZooKeeperConnectionInfo(ZooKeeperPorts.DefaultConnectionString, TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30), new EquatableReadOnlyList<ZooKeeperAuthInfo>(Array.Empty<ZooKeeperAuthInfo>())),
CancellationToken.None
);
try { await connection.ZooKeeper.deleteAsync(invalidAclLock.Name); }
catch (KeeperException.NoNodeException) { }

Assert.ThrowsAsync<KeeperException.NoAuthException>(() => invalidAclLock.AcquireAsync().AsTask());

Assert.IsNull(await connection.ZooKeeper.existsAsync(invalidAclLock.Name));

this._provider.Strategy.Options = null;
var validLock = this._provider.CreateLock(string.Empty);
Assert.DoesNotThrowAsync(async () => await (await validLock.AcquireAsync()).DisposeAsync());
}

[Test]
public async Task TestDeepDirectoryCreation()
{
var directory = new ZooKeeperPath($"/{TestHelper.UniqueName}/foo/bar/baz");

// pre-cleanup to make sure we will actually create the directory
using var connection = await ZooKeeperConnection.DefaultPool.ConnectAsync(
new ZooKeeperConnectionInfo(ZooKeeperPorts.DefaultConnectionString, TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30), new EquatableReadOnlyList<ZooKeeperAuthInfo>(Array.Empty<ZooKeeperAuthInfo>())),
CancellationToken.None
);
for (var toDelete = directory; toDelete != ZooKeeperPath.Root; toDelete = toDelete.GetDirectory()!.Value)
{
try { await connection.ZooKeeper.deleteAsync(toDelete.ToString()); }
catch (KeeperException.NoNodeException) { }
}

var @lock = this._provider.CreateLockWithExactName(directory.GetChildNodePathWithSafeName("qux").ToString());

await using (await @lock.AcquireAsync())
{
Assert.IsNotNull(await connection.ZooKeeper.existsAsync(directory.ToString()));
}

Assert.IsNotNull(await connection.ZooKeeper.existsAsync(directory.ToString()), "directory still exists");
}

[Test]
public async Task TestThrowsIfPathDeletedWhileWaiting()
{
var @lock = this._provider.CreateLock(string.Empty);

// hold the lock
await using var handle = await @lock.AcquireAsync();

using var connection = await ZooKeeperConnection.DefaultPool.ConnectAsync(
new ZooKeeperConnectionInfo(ZooKeeperPorts.DefaultConnectionString, TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30), new EquatableReadOnlyList<ZooKeeperAuthInfo>(Array.Empty<ZooKeeperAuthInfo>())),
CancellationToken.None
);
var initialChildren = await connection.ZooKeeper.getChildrenAsync(@lock.Name);

// start waiting
var blockedAcquireTask = @lock.AcquireAsync(TimeSpan.FromSeconds(30)).AsTask();
// once the wait has started...
var newChild = await WaitForNewChildAsync();
// ... start another waiter...
var blockedAcquireTask2 = @lock.AcquireAsync(TimeSpan.FromSeconds(30)).AsTask();
// ... and delete the first waiter's node
await connection.ZooKeeper.deleteAsync(newChild);

// release the lock
await handle.DisposeAsync();

// the first waiter should throw
Assert.ThrowsAsync<InvalidOperationException>(() => blockedAcquireTask);

// the second waiter should complete
Assert.DoesNotThrowAsync(async () => await (await blockedAcquireTask2).DisposeAsync());

async Task<string> WaitForNewChildAsync()
{
var start = DateTime.UtcNow;
while (true)
{
var children = await connection.ZooKeeper.getChildrenAsync(@lock.Name);
var newChild = children.Children.Except(initialChildren.Children).SingleOrDefault();
if (newChild != null) { return $"{@lock.Name}/{newChild}"; }

if (DateTime.UtcNow - start >= TimeSpan.FromSeconds(10)) { Assert.Fail("Timed out"); }

await Task.Delay(5);
}
}
}

private static bool IsMultiTicketSemaphoreProvider =>
typeof(TLockProvider) == typeof(TestingSemaphore5AsMutexProvider<TestingZooKeeperDistributedSemaphoreProvider, TestingZooKeeperSynchronizationStrategy>);
}
}
2 changes: 1 addition & 1 deletion DistributedLock.Tests/DistributedLock.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<!-- for test iteration speed, just use one TF for debug -->
<TargetFrameworks Condition="'$(Configuration)' == 'Debug'">netcoreapp3.1</TargetFrameworks>
<TargetFrameworks Condition="'$(Configuration)' != 'Debug'">net471;netcoreapp3.1</TargetFrameworks>
<LangVersion>8.0</LangVersion>
<LangVersion>Latest</LangVersion>
<Nullable>enable</Nullable>
<RootNamespace>Medallion.Threading.Tests</RootNamespace>
<SignAssembly>true</SignAssembly>
Expand Down
13 changes: 13 additions & 0 deletions DistributedLock.Tests/Infrastructure/Shared/ZooKeeperPorts.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace Medallion.Threading.Tests
{
public static class ZooKeeperPorts
{
public const int DefaultPort = 2181;

public static readonly string DefaultConnectionString = "127.0.0.1:" + DefaultPort;
}
}
30 changes: 30 additions & 0 deletions DistributedLock.Tests/Infrastructure/TestHelper.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Medallion.Threading.Internal;
using Medallion.Threading.Redis;
using NUnit.Framework;
using System;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -39,5 +40,34 @@ public static async Task<bool> WaitAsync(this Task task, TimeoutValue timeout)
await task;
return true;
}

/// <summary>
/// Waits up to <paramref name="timeout"/> for <paramref name="predicate"/> to return true. Checks <paramref name="predicate"/> every
/// <paramref name="checkCadence"/>.
/// </summary>
public static async Task<bool> WaitForAsync(Func<ValueTask<bool>> predicate, TimeoutValue timeout, TimeoutValue? checkCadence = null)
{
using var cancellationSource = new CancellationTokenSource();
var waitForPredicateTask = WaitForPredicateAsync();

if (!await waitForPredicateTask.WaitAsync(timeout))
{
cancellationSource.Cancel();
await waitForPredicateTask;
return false;
}

return true;

async Task WaitForPredicateAsync()
{
var cancellationToken = cancellationSource.Token;
while (!cancellationToken.IsCancellationRequested)
{
if (await predicate()) { return; }
await Task.Delay(checkCadence?.TimeSpan ?? TimeSpan.FromMilliseconds(5));
}
}
}
}
}
Loading