Skip to content

Commit

Permalink
Merge pull request #81 from madelson/zookeeper
Browse files Browse the repository at this point in the history
ZooKeeper locks
  • Loading branch information
madelson authored Mar 26, 2021
2 parents 5bd69af + 6314241 commit f3d6faf
Show file tree
Hide file tree
Showing 59 changed files with 3,418 additions and 29 deletions.
1 change: 1 addition & 0 deletions DistributedLock.Core/AssemblyAttributes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
[assembly: InternalsVisibleTo("DistributedLock.Azure, PublicKey=0024000004800000940000000602000000240000525341310004000001000100fd3af56ccc8ed94fffe25bfd651e6a5674f8f20a76d37de800dd0f7380e04f0fde2da6fa200380b14fe398605b6f470c87e5e0a0bf39ae871f07536a4994aa7a0057c4d3bcedc8fef3eecb0c88c2024a1b3289305c2393acd9fb9f9a42d0bd7826738ce864d507575ea3a1fe1746ab19823303269f79379d767949807f494be8")]
[assembly: InternalsVisibleTo("DistributedLock.FileSystem, PublicKey=0024000004800000940000000602000000240000525341310004000001000100fd3af56ccc8ed94fffe25bfd651e6a5674f8f20a76d37de800dd0f7380e04f0fde2da6fa200380b14fe398605b6f470c87e5e0a0bf39ae871f07536a4994aa7a0057c4d3bcedc8fef3eecb0c88c2024a1b3289305c2393acd9fb9f9a42d0bd7826738ce864d507575ea3a1fe1746ab19823303269f79379d767949807f494be8")]
[assembly: InternalsVisibleTo("DistributedLock.Redis, PublicKey=0024000004800000940000000602000000240000525341310004000001000100fd3af56ccc8ed94fffe25bfd651e6a5674f8f20a76d37de800dd0f7380e04f0fde2da6fa200380b14fe398605b6f470c87e5e0a0bf39ae871f07536a4994aa7a0057c4d3bcedc8fef3eecb0c88c2024a1b3289305c2393acd9fb9f9a42d0bd7826738ce864d507575ea3a1fe1746ab19823303269f79379d767949807f494be8")]
[assembly: InternalsVisibleTo("DistributedLock.ZooKeeper, PublicKey=0024000004800000940000000602000000240000525341310004000001000100fd3af56ccc8ed94fffe25bfd651e6a5674f8f20a76d37de800dd0f7380e04f0fde2da6fa200380b14fe398605b6f470c87e5e0a0bf39ae871f07536a4994aa7a0057c4d3bcedc8fef3eecb0c88c2024a1b3289305c2393acd9fb9f9a42d0bd7826738ce864d507575ea3a1fe1746ab19823303269f79379d767949807f494be8")]
#endif
3 changes: 1 addition & 2 deletions DistributedLock.Core/Internal/Data/ConnectionMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,7 @@ private async ValueTask StopOrDisposeAsync(bool isDispose)

if (task != null)
{
if (SyncViaAsync.IsSynchronous) { task.GetAwaiter().GetResult(); }
else { await task.ConfigureAwait(false); }
await task.AwaitSyncOverAsync().ConfigureAwait(false);
}
}

Expand Down
1 change: 0 additions & 1 deletion DistributedLock.Core/Internal/Data/DatabaseConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public DatabaseCommand CreateCommand()
// note: we could have this return an IAsyncDisposable which would allow you to close the transaction
// without closing the connection. However, we don't currently have any use-cases for that
public async ValueTask BeginTransactionAsync()
#pragma warning restore CS1998
{
Invariant.Require(this._transaction == null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public async ValueTask<Result> TryAcquireAsync<TLockCookie>(
bool opportunistic)
where TLockCookie : class
{
using var mutextHandle = await this._mutex.TryAcquireAsync(opportunistic ? TimeSpan.Zero : Timeout.InfiniteTimeSpan, cancellationToken).ConfigureAwait(false);
if (mutextHandle == null)
using var mutexHandle = await this._mutex.TryAcquireAsync(opportunistic ? TimeSpan.Zero : Timeout.InfiniteTimeSpan, cancellationToken).ConfigureAwait(false);
if (mutexHandle == null)
{
// mutex wasn't free, so just give up
Invariant.Require(opportunistic);
Expand Down
3 changes: 1 addition & 2 deletions DistributedLock.Core/Internal/LeaseMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ public async ValueTask DisposeAsync()
this._disposalSource.Cancel();
}

if (SyncViaAsync.IsSynchronous) { this._monitoringTask.GetAwaiter().GetResult(); }
else { await this._monitoringTask.ConfigureAwait(false); }
await this._monitoringTask.AwaitSyncOverAsync().ConfigureAwait(false);
}
finally
{
Expand Down
22 changes: 22 additions & 0 deletions DistributedLock.Core/Internal/SyncViaAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,27 @@ public static ValueTask Delay(TimeoutValue timeout, CancellationToken cancellati
public static void DisposeSyncViaAsync<TDisposable>(this TDisposable disposable)
where TDisposable : IAsyncDisposable, IDisposable =>
Run(@this => @this.DisposeAsync(), disposable);

/// <summary>
/// In synchronous mode, performs a blocking wait on the provided <paramref name="task"/>. In asynchronous mode,
/// returns the <paramref name="task"/> as a <see cref="ValueTask{TResult}"/>.
/// </summary>
public static ValueTask<TResult> AwaitSyncOverAsync<TResult>(this Task<TResult> task) =>
IsSynchronous ? task.GetAwaiter().GetResult().AsValueTask() : task.AsValueTask();

/// <summary>
/// In synchronous mode, performs a blocking wait on the provided <paramref name="task"/>. In asynchronous mode,
/// returns the <paramref name="task"/> as a <see cref="ValueTask"/>.
/// </summary>
public static ValueTask AwaitSyncOverAsync(this Task task)
{
if (IsSynchronous)
{
task.GetAwaiter().GetResult();
return default;
}

return task.AsValueTask();
}
}
}
4 changes: 1 addition & 3 deletions DistributedLock.Redis/RedLock/RedLockAcquire.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ public RedLockAcquire(
var succeeded = false;
try
{
succeeded = isSynchronous
? waitForAcquireTask.GetAwaiter().GetResult()
: await waitForAcquireTask.ConfigureAwait(false);
succeeded = await waitForAcquireTask.AwaitSyncOverAsync().ConfigureAwait(false);
}
finally
{
Expand Down
2 changes: 1 addition & 1 deletion DistributedLock.Redis/TimeoutTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Medallion.Threading.Redis
/// Acts as a <see cref="Task.Delay(TimeSpan, CancellationToken)"/> which is cleaned up when
/// the <see cref="TimeoutTask"/> gets disposed
/// </summary>
internal struct TimeoutTask : IDisposable
internal readonly struct TimeoutTask : IDisposable
{
private readonly CancellationTokenSource _cleanupTokenSource;
private readonly CancellationTokenSource? _linkedTokenSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,8 @@ private void CrossProcessAbandonmentHelper(bool asyncWait, bool kill)
command.StandardInput.WriteLine("abandon");
command.StandardInput.Flush();
}
// make sure it actually exits
Assert.IsTrue(command.Task.ContinueWith(_ => { }).Wait(TimeSpan.FromSeconds(5)), "lock taker should exit");

if (this._lockProvider.SupportsCrossProcessAbandonment)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void TestAcquireWithLockPrefix()
using var explicitPrefixHandle = explicitPrefixLock.TryAcquire();
Assert.IsNull(explicitPrefixHandle);

IDatabase CreateDatabase(string? keyPrefix = null)
static IDatabase CreateDatabase(string? keyPrefix = null)
{
var database = RedisServer.GetDefaultServer(0).Multiplexer.GetDatabase();
return keyPrefix is null ? database : database.WithKeyPrefix(keyPrefix);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
using Medallion.Threading.Tests.ZooKeeper;
using Medallion.Threading.ZooKeeper;
using NUnit.Framework;
using org.apache.zookeeper;
using org.apache.zookeeper.data;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Medallion.Threading.Tests.ZooKeeper
{
public abstract class ZooKeeperSynchronizationCoreTestCases<TLockProvider>
where TLockProvider : TestingLockProvider<TestingZooKeeperSynchronizationStrategy>, new()
{
private TLockProvider _provider = default!;

[SetUp]
public void SetUp() => this._provider = new TLockProvider();

[TearDown]
public void TearDown() => this._provider.Dispose();

[Test]
public async Task TestDoesNotAttemptToCreateOrDeleteExistingNode()
{
// This doesn't work because creating the lock attempts to acquire which will then fail initially. We could work around this by testing
// for a different set of conditions in the multi-ticket case, but the extra coverage doesn't seem valuable (we still have coverage of single-ticket)
if (IsMultiTicketSemaphoreProvider) { Assert.Pass("not supported"); }

var path = new ZooKeeperPath($"/{this.GetType()}.{nameof(this.TestDoesNotAttemptToCreateOrDeleteExistingNode)} ({TargetFramework.Current})");
using var connection = await ZooKeeperConnection.DefaultPool.ConnectAsync(
new ZooKeeperConnectionInfo(ZooKeeperPorts.DefaultConnectionString, TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30), new EquatableReadOnlyList<ZooKeeperAuthInfo>(Array.Empty<ZooKeeperAuthInfo>())),
CancellationToken.None
);

// pre-clean up just in case
try { await connection.ZooKeeper.deleteAsync(path.ToString()); }
catch (KeeperException.NoNodeException) { }

this._provider.Strategy.AssumeNodeExists = true;
var @lock = this._provider.CreateLockWithExactName(path.ToString());

Assert.That(
Assert.ThrowsAsync<InvalidOperationException>(() => @lock.TryAcquireAsync().AsTask()).Message,
Does.Contain("does not exist")
);

await connection.ZooKeeper.createAsync(path.ToString(), Array.Empty<byte>(), new List<ACL> { ZooKeeperNodeCreator.PublicAcl }, CreateMode.PERSISTENT);
try
{
await using (var handle = await @lock.TryAcquireAsync())
{
Assert.IsNotNull(handle);
}

Assert.IsNotNull(await connection.ZooKeeper.existsAsync(path.ToString()));
}
finally
{
await connection.ZooKeeper.deleteAsync(path.ToString());
}
}

[TestCase("/")]
[TestCase(".")]
[TestCase("..")]
[TestCase("zookeeper")]
[TestCase("abc\0")]
public void TestGetSafeName(string name) =>
Assert.DoesNotThrowAsync(async () => await (await this._provider.CreateLockWithExactName(this._provider.GetSafeName(name)).AcquireAsync()).DisposeAsync());

[Test]
public void TestGetSafeNameWithControlCharacters() => this.TestGetSafeName("\u001f\u009F\uf8ff\ufff1");

[Test]
public async Task TestCustomAclAndAuth()
{
// This doesn't work because creating the lock causes the node to be created (from taking the other tickets)
// and releasing the lock doesn't cause the node to be deleted (due to those other tickets).
if (IsMultiTicketSemaphoreProvider) { Assert.Pass("not supported"); }

const string Username = "username";
const string Password = "secretPassword";

var unauthenticatedLock = this._provider.CreateLock(string.Empty);

this._provider.Strategy.Options = o => o.AddAccessControl("digest", GenerateDigestAclId(Username, Password), 0x1f)
.AddAuthInfo("digest", Encoding.UTF8.GetBytes($"{Username}:{Password}"));
var @lock = this._provider.CreateLock(string.Empty);

await using (await @lock.AcquireAsync())
{
Assert.ThrowsAsync<KeeperException.NoAuthException>(() => unauthenticatedLock.TryAcquireAsync().AsTask());
}

Assert.DoesNotThrowAsync(async () => await (await unauthenticatedLock.AcquireAsync()).DisposeAsync());

// Based on
// https://github.com/apache/zookeeper/blob/d8561f620fa8611e9a6819d9879b0f18e5a404a9/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/DigestAuthenticationProvider.java
static string GenerateDigestAclId(string username, string password)
{
using var sha = SHA1.Create();
var digest = sha.ComputeHash(Encoding.UTF8.GetBytes($"{username}:{password}"));
return $"{username}:{Convert.ToBase64String(digest)}";
}
}

[Test]
public async Task TestInvalidAclDoesNotCorruptStore()
{
// This doesn't work because creating the lock causes the node to be created (from taking the other tickets)
// and releasing the lock doesn't cause the node to be deleted (due to those other tickets).
if (IsMultiTicketSemaphoreProvider) { Assert.Pass("not supported"); }

const string Username = "username";
const string Password = "xyz";

// ACL is the right format but the wrong password (this can easily happen if you get the encoding wrong)
this._provider.Strategy.Options = o => o.AddAccessControl("digest", $"{Username}:1eYGPn6j9+P9osACW8ob4HhZT+s=", 0x1f)
.AddAuthInfo("digest", Encoding.UTF8.GetBytes($"{Username}:{Password}"));
var invalidAclLock = this._provider.CreateLock(string.Empty);

// pre-cleanup to make sure we will actually create the path
using var connection = await ZooKeeperConnection.DefaultPool.ConnectAsync(
new ZooKeeperConnectionInfo(ZooKeeperPorts.DefaultConnectionString, TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30), new EquatableReadOnlyList<ZooKeeperAuthInfo>(Array.Empty<ZooKeeperAuthInfo>())),
CancellationToken.None
);
try { await connection.ZooKeeper.deleteAsync(invalidAclLock.Name); }
catch (KeeperException.NoNodeException) { }

Assert.ThrowsAsync<KeeperException.NoAuthException>(() => invalidAclLock.AcquireAsync().AsTask());

Assert.IsNull(await connection.ZooKeeper.existsAsync(invalidAclLock.Name));

this._provider.Strategy.Options = null;
var validLock = this._provider.CreateLock(string.Empty);
Assert.DoesNotThrowAsync(async () => await (await validLock.AcquireAsync()).DisposeAsync());
}

[Test]
public async Task TestDeepDirectoryCreation()
{
var directory = new ZooKeeperPath($"/{TestHelper.UniqueName}/foo/bar/baz");

// pre-cleanup to make sure we will actually create the directory
using var connection = await ZooKeeperConnection.DefaultPool.ConnectAsync(
new ZooKeeperConnectionInfo(ZooKeeperPorts.DefaultConnectionString, TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30), new EquatableReadOnlyList<ZooKeeperAuthInfo>(Array.Empty<ZooKeeperAuthInfo>())),
CancellationToken.None
);
for (var toDelete = directory; toDelete != ZooKeeperPath.Root; toDelete = toDelete.GetDirectory()!.Value)
{
try { await connection.ZooKeeper.deleteAsync(toDelete.ToString()); }
catch (KeeperException.NoNodeException) { }
}

var @lock = this._provider.CreateLockWithExactName(directory.GetChildNodePathWithSafeName("qux").ToString());

await using (await @lock.AcquireAsync())
{
Assert.IsNotNull(await connection.ZooKeeper.existsAsync(directory.ToString()));
}

Assert.IsNotNull(await connection.ZooKeeper.existsAsync(directory.ToString()), "directory still exists");
}

[Test]
public async Task TestThrowsIfPathDeletedWhileWaiting()
{
var @lock = this._provider.CreateLock(string.Empty);

// hold the lock
await using var handle = await @lock.AcquireAsync();

using var connection = await ZooKeeperConnection.DefaultPool.ConnectAsync(
new ZooKeeperConnectionInfo(ZooKeeperPorts.DefaultConnectionString, TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30), new EquatableReadOnlyList<ZooKeeperAuthInfo>(Array.Empty<ZooKeeperAuthInfo>())),
CancellationToken.None
);
var initialChildren = await connection.ZooKeeper.getChildrenAsync(@lock.Name);

// start waiting
var blockedAcquireTask = @lock.AcquireAsync(TimeSpan.FromSeconds(30)).AsTask();
// once the wait has started...
var newChild = await WaitForNewChildAsync();
// ... start another waiter...
var blockedAcquireTask2 = @lock.AcquireAsync(TimeSpan.FromSeconds(30)).AsTask();
// ... and delete the first waiter's node
await connection.ZooKeeper.deleteAsync(newChild);

// release the lock
await handle.DisposeAsync();

// the first waiter should throw
Assert.ThrowsAsync<InvalidOperationException>(() => blockedAcquireTask);

// the second waiter should complete
Assert.DoesNotThrowAsync(async () => await (await blockedAcquireTask2).DisposeAsync());

async Task<string> WaitForNewChildAsync()
{
var start = DateTime.UtcNow;
while (true)
{
var children = await connection.ZooKeeper.getChildrenAsync(@lock.Name);
var newChild = children.Children.Except(initialChildren.Children).SingleOrDefault();
if (newChild != null) { return $"{@lock.Name}/{newChild}"; }

if (DateTime.UtcNow - start >= TimeSpan.FromSeconds(10)) { Assert.Fail("Timed out"); }

await Task.Delay(5);
}
}
}

private static bool IsMultiTicketSemaphoreProvider =>
typeof(TLockProvider) == typeof(TestingSemaphore5AsMutexProvider<TestingZooKeeperDistributedSemaphoreProvider, TestingZooKeeperSynchronizationStrategy>);
}
}
2 changes: 1 addition & 1 deletion DistributedLock.Tests/DistributedLock.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<!-- for test iteration speed, just use one TF for debug -->
<TargetFrameworks Condition="'$(Configuration)' == 'Debug'">netcoreapp3.1</TargetFrameworks>
<TargetFrameworks Condition="'$(Configuration)' != 'Debug'">net471;netcoreapp3.1</TargetFrameworks>
<LangVersion>8.0</LangVersion>
<LangVersion>Latest</LangVersion>
<Nullable>enable</Nullable>
<RootNamespace>Medallion.Threading.Tests</RootNamespace>
<SignAssembly>true</SignAssembly>
Expand Down
13 changes: 13 additions & 0 deletions DistributedLock.Tests/Infrastructure/Shared/ZooKeeperPorts.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace Medallion.Threading.Tests
{
public static class ZooKeeperPorts
{
public const int DefaultPort = 2181;

public static readonly string DefaultConnectionString = "127.0.0.1:" + DefaultPort;
}
}
30 changes: 30 additions & 0 deletions DistributedLock.Tests/Infrastructure/TestHelper.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Medallion.Threading.Internal;
using Medallion.Threading.Redis;
using NUnit.Framework;
using System;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -39,5 +40,34 @@ public static async Task<bool> WaitAsync(this Task task, TimeoutValue timeout)
await task;
return true;
}

/// <summary>
/// Waits up to <paramref name="timeout"/> for <paramref name="predicate"/> to return true. Checks <paramref name="predicate"/> every
/// <paramref name="checkCadence"/>.
/// </summary>
public static async Task<bool> WaitForAsync(Func<ValueTask<bool>> predicate, TimeoutValue timeout, TimeoutValue? checkCadence = null)
{
using var cancellationSource = new CancellationTokenSource();
var waitForPredicateTask = WaitForPredicateAsync();

if (!await waitForPredicateTask.WaitAsync(timeout))
{
cancellationSource.Cancel();
await waitForPredicateTask;
return false;
}

return true;

async Task WaitForPredicateAsync()
{
var cancellationToken = cancellationSource.Token;
while (!cancellationToken.IsCancellationRequested)
{
if (await predicate()) { return; }
await Task.Delay(checkCadence?.TimeSpan ?? TimeSpan.FromMilliseconds(5));
}
}
}
}
}
Loading

0 comments on commit f3d6faf

Please sign in to comment.