Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expiring set lock free #6577

Merged
merged 12 commits into from
Jul 29, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,7 @@ protected override async Task OnClosingAsync()
}
await this.ReceiveLinkManager.CloseAsync().ConfigureAwait(false);
await this.RequestResponseLinkManager.CloseAsync().ConfigureAwait(false);
this.requestResponseLockedMessages.Close();
}

protected virtual async Task<IList<Message>> OnReceiveAsync(int maxMessageCount, TimeSpan serverWaitTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,64 +4,97 @@
namespace Microsoft.Azure.ServiceBus.Primitives
{
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

sealed class ConcurrentExpiringSet<TKey>
{
readonly ConcurrentDictionary<TKey, DateTime> dictionary;
readonly object cleanupSynObject = new object();
bool cleanupScheduled;
static TimeSpan delayBetweenCleanups = TimeSpan.FromSeconds(30);
readonly ICollection<KeyValuePair<TKey, DateTime>> dictionaryAsCollection;
readonly CancellationTokenSource tokenSource = new CancellationTokenSource();
volatile TaskCompletionSource<bool> cleanupTaskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
int closeSignaled;
bool closed;
static readonly TimeSpan delayBetweenCleanups = TimeSpan.FromSeconds(30);

public ConcurrentExpiringSet()
{
this.dictionary = new ConcurrentDictionary<TKey, DateTime>();
this.dictionaryAsCollection = dictionary;
_ = CollectExpiredEntriesAsync(tokenSource.Token);
danielmarbach marked this conversation as resolved.
Show resolved Hide resolved
}

public void AddOrUpdate(TKey key, DateTime expiration)
{
this.ThrowIfClosed();

this.dictionary[key] = expiration;
this.ScheduleCleanup();
this.cleanupTaskCompletionSource.TrySetResult(true);
}

public bool Contains(TKey key)
{
this.ThrowIfClosed();

return this.dictionary.TryGetValue(key, out var expiration) && expiration > DateTime.UtcNow;
}

void ScheduleCleanup()
public void Close()
{
lock (this.cleanupSynObject)
if (Interlocked.Exchange(ref this.closeSignaled, 1) != 0)
danielmarbach marked this conversation as resolved.
Show resolved Hide resolved
{
if (this.cleanupScheduled || this.dictionary.Count <= 0)
{
return;
}

this.cleanupScheduled = true;
Task.Run(async () => await this.CollectExpiredEntriesAsync().ConfigureAwait(false));
return;
}

this.closed = true;

this.tokenSource.Cancel();
this.cleanupTaskCompletionSource.TrySetCanceled();
this.dictionary.Clear();
this.tokenSource.Dispose();
}

async Task CollectExpiredEntriesAsync()
async Task CollectExpiredEntriesAsync(CancellationToken token)
{
await Task.Delay(delayBetweenCleanups);

lock (this.cleanupSynObject)
while (!token.IsCancellationRequested)
{
this.cleanupScheduled = false;
}
try
{
await this.cleanupTaskCompletionSource.Task.ConfigureAwait(false);
await Task.Delay(delayBetweenCleanups, token).ConfigureAwait(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this also throw ObjectDisposedException?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it can given that dispose of the CTS only releases the timer and the linked registrations if any

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't completely know the inner details of CTS. I am going to trust your word that this would not throw ObjectDisposedException and also that this.dictionaryAsCollection.Remove(kvp); will not throw while iteration/dictionary.Clear() is in progress.

}
catch (OperationCanceledException)
{
return;
}

foreach (var key in this.dictionary.Keys)
{
if (DateTime.UtcNow > this.dictionary[key])
var isEmpty = true;
var utcNow = DateTime.UtcNow;
foreach (var kvp in this.dictionary)
{
this.dictionary.TryRemove(key, out _);
isEmpty = false;
var expiration = kvp.Value;
if (utcNow > expiration)
{
this.dictionaryAsCollection.Remove(kvp);
danielmarbach marked this conversation as resolved.
Show resolved Hide resolved
}
}

if (isEmpty)
{
this.cleanupTaskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
}
}
}

this.ScheduleCleanup();
void ThrowIfClosed()
{
if (closed)
{
throw new ObjectDisposedException($"ConcurrentExpiringSet has already been closed. Please create a new set instead.");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests.Primitives
{
using Microsoft.Azure.ServiceBus.Primitives;
using System;
using System.Threading.Tasks;
using Xunit;

public class ConcurrentExpiringSetTests
Expand All @@ -27,5 +28,40 @@ public void Contains_returns_false_for_expired_entry()
set.AddOrUpdate("testKey", DateTime.UtcNow - TimeSpan.FromSeconds(5));
Assert.False(set.Contains("testKey"), "The set should return false for an expired entry.");
}

[Fact]
public void Contains_throws_after_close()
{
var set = new ConcurrentExpiringSet<string>();
set.AddOrUpdate("testKey", DateTime.UtcNow + TimeSpan.FromSeconds(5));
set.Close();

Assert.Throws<ObjectDisposedException>(() => set.Contains("testKey"));
}

[Fact]
public void AddOrUpdate_throws_after_close()
{
var set = new ConcurrentExpiringSet<string>();
set.AddOrUpdate("testKey1", DateTime.UtcNow + TimeSpan.FromSeconds(5));
set.Close();

Assert.Throws<ObjectDisposedException>(() => set.AddOrUpdate("testKey2", DateTime.UtcNow - TimeSpan.FromSeconds(5)));
}

[Fact]
public void Close_is_idempotent_and_thread_safe()
{
var set = new ConcurrentExpiringSet<string>();

var ex = Record.Exception(() =>
{
set.Close();
set.Close();
Parallel.Invoke(() => set.Close(), () => set.Close());
});

Assert.Null(ex);
}
}
}