Skip to content

Commit

Permalink
Switch clear to close
Browse files Browse the repository at this point in the history
  • Loading branch information
danielmarbach committed Jul 19, 2019
1 parent 7193e01 commit febefba
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1008,7 +1008,7 @@ protected override async Task OnClosingAsync()
}
await this.ReceiveLinkManager.CloseAsync().ConfigureAwait(false);
await this.RequestResponseLinkManager.CloseAsync().ConfigureAwait(false);
this.requestResponseLockedMessages.Clear();
this.requestResponseLockedMessages.Close();
}

protected virtual async Task<IList<Message>> OnReceiveAsync(int maxMessageCount, TimeSpan serverWaitTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ sealed class ConcurrentExpiringSet<TKey>
{
readonly ConcurrentDictionary<TKey, DateTime> dictionary;
readonly ICollection<KeyValuePair<TKey, DateTime>> dictionaryAsCollection;
CancellationTokenSource tokenSource = new CancellationTokenSource(); // doesn't need to be disposed because it doesn't own a timer
readonly CancellationTokenSource tokenSource = new CancellationTokenSource();
volatile TaskCompletionSource<bool> cleanupTaskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
volatile int closeSignaled;
bool closed;
static readonly TimeSpan delayBetweenCleanups = TimeSpan.FromSeconds(30);

public ConcurrentExpiringSet()
Expand All @@ -26,23 +28,32 @@ public ConcurrentExpiringSet()

public void AddOrUpdate(TKey key, DateTime expiration)
{
this.ThrowIfClosed();

this.dictionary[key] = expiration;
this.cleanupTaskCompletionSource.TrySetResult(true);
}

public bool Contains(TKey key)
{
this.ThrowIfClosed();

return this.dictionary.TryGetValue(key, out var expiration) && expiration > DateTime.UtcNow;
}

public void Clear()
public void Close()
{
if (Interlocked.Exchange(ref this.closeSignaled, 1) != 0)
{
return;
}

this.closed = true;

this.tokenSource.Cancel();
this.dictionary.Clear();
this.cleanupTaskCompletionSource.TrySetCanceled();
this.cleanupTaskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
this.tokenSource = new CancellationTokenSource();
_ = CollectExpiredEntriesAsync(tokenSource.Token);
this.dictionary.Clear();
this.tokenSource.Dispose();
}

async Task CollectExpiredEntriesAsync(CancellationToken token)
Expand Down Expand Up @@ -70,5 +81,13 @@ async Task CollectExpiredEntriesAsync(CancellationToken token)
}
}
}

void ThrowIfClosed()
{
if (closed)
{
throw new ObjectDisposedException($"ConcurrentExpiringSet has already been closed. Please create a new set instead.");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace Microsoft.Azure.ServiceBus.UnitTests.Primitives
{
using Microsoft.Azure.ServiceBus.Primitives;
using System;
using System.Threading.Tasks;
using Xunit;

public class ConcurrentExpiringSetTests
Expand All @@ -29,25 +30,38 @@ public void Contains_returns_false_for_expired_entry()
}

[Fact]
public void Contains_returns_false_after_clear()
public void Contains_throws_after_close()
{
var set = new ConcurrentExpiringSet<string>();
set.AddOrUpdate("testKey", DateTime.UtcNow + TimeSpan.FromSeconds(5));
set.Clear();
set.Close();

Assert.False(set.Contains("testKey"), "The set should return false after clear.");
Assert.Throws<ObjectDisposedException>(() => set.Contains("testKey"));
}

[Fact]
public void Contains_returns_false_after_clear_for_added_expired_keys()
public void AddOrUpdate_throws_after_close()
{
var set = new ConcurrentExpiringSet<string>();
set.AddOrUpdate("testKey1", DateTime.UtcNow + TimeSpan.FromSeconds(5));
set.Clear();
set.AddOrUpdate("testKey2", DateTime.UtcNow - TimeSpan.FromSeconds(5));
set.Close();

Assert.False(set.Contains("testKey1"), "The set should return false after clear.");
Assert.False(set.Contains("testKey2"), "The set should return false for an expired entry.");
Assert.Throws<ObjectDisposedException>(() => set.AddOrUpdate("testKey2", DateTime.UtcNow - TimeSpan.FromSeconds(5)));
}

[Fact]
public void Close_is_idempotent_and_thread_safe()
{
var set = new ConcurrentExpiringSet<string>();

var ex = Record.Exception(() =>
{
set.Close();
set.Close();
Parallel.Invoke(() => set.Close(), () => set.Close());
});

Assert.Null(ex);
}
}
}

0 comments on commit febefba

Please sign in to comment.