Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make ConcurrentExpiringSet not leak the cleanup task for the period of delayBetweenCleanups #6576

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,7 @@ protected override async Task OnClosingAsync()
}
await this.ReceiveLinkManager.CloseAsync().ConfigureAwait(false);
await this.RequestResponseLinkManager.CloseAsync().ConfigureAwait(false);
this.requestResponseLockedMessages.Clear();
}

protected virtual async Task<IList<Message>> OnReceiveAsync(int maxMessageCount, TimeSpan serverWaitTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,45 @@
namespace Microsoft.Azure.ServiceBus.Primitives
{
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

sealed class ConcurrentExpiringSet<TKey>
{
readonly ConcurrentDictionary<TKey, DateTime> dictionary;
readonly ICollection<KeyValuePair<TKey, DateTime>> dictionaryAsCollection;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this help?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will answer later

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConcurrentDictionary implements the ICollection<KeyValuePair<TKey, TValue>> interface implicitly. By having a reference to that we can use the Remove(KeyValuePair<TKey, TValue> kvp) method which calls into TryRemoveInternal with matchValue set to true which then makes sure that the value matches the reference we get from looping over. This is basically the more accurate implementation that avoids removing a key when the value was updated in the meantime.

readonly object cleanupSynObject = new object();
CancellationTokenSource tokenSource = new CancellationTokenSource(); // doesn't need to be disposed because it doesn't own a timer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe CancellationTokenSource implements IDisposable and I think it might internally use a timer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does. As far as I remember it only allocates disposable resources like timers when used with timeouts. Happy though to add

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See https://referencesource.microsoft.com/#mscorlib/system/threading/CancellationTokenSource.cs,554 that's why I did not implement the dispose call because we are not using the registrations or the timer path of the disposable

bool cleanupScheduled;
static TimeSpan delayBetweenCleanups = TimeSpan.FromSeconds(30);
static readonly TimeSpan delayBetweenCleanups = TimeSpan.FromSeconds(30);

public ConcurrentExpiringSet()
{
this.dictionary = new ConcurrentDictionary<TKey, DateTime>();
this.dictionaryAsCollection = dictionary;
}

public void AddOrUpdate(TKey key, DateTime expiration)
{
this.dictionary[key] = expiration;
this.ScheduleCleanup();
this.ScheduleCleanup(tokenSource.Token);
}

public bool Contains(TKey key)
{
return this.dictionary.TryGetValue(key, out var expiration) && expiration > DateTime.UtcNow;
}

void ScheduleCleanup()
public void Clear()
{
this.tokenSource.Cancel();
this.dictionary.Clear();
this.tokenSource = new CancellationTokenSource();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be disposed of as it implements IDisposable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment above

}

void ScheduleCleanup(CancellationToken token)
{
lock (this.cleanupSynObject)
{
Expand All @@ -40,28 +52,38 @@ void ScheduleCleanup()
}

this.cleanupScheduled = true;
Task.Run(async () => await this.CollectExpiredEntriesAsync().ConfigureAwait(false));
_ = this.CollectExpiredEntriesAsync(token);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will start a task that will do the collection without waiting for the cleanup to happen -- is this the design?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes see previous implementation did a fire and forget Task.Run

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to assign the result of the method call if you don't plan to access it later (ie. no need for _ = )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is technically accurate. Though I consider it best practice to explicitly ignore not consumed tasks. Because if at any time this method would return a task the compiler would start exposing warning. I have to double check the VS diagbostics if they even detect dropped task in void methods

}
}

async Task CollectExpiredEntriesAsync()
async Task CollectExpiredEntriesAsync(CancellationToken token)
{
await Task.Delay(delayBetweenCleanups);

lock (this.cleanupSynObject)
try
{
this.cleanupScheduled = false;
await Task.Delay(delayBetweenCleanups, token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
return;
}
finally
{
lock (this.cleanupSynObject)
{
this.cleanupScheduled = false;
}
}

foreach (var key in this.dictionary.Keys)
foreach (var kvp in this.dictionary)
{
if (DateTime.UtcNow > this.dictionary[key])
var expiration = kvp.Value;
if (DateTime.UtcNow > expiration)
{
this.dictionary.TryRemove(key, out _);
this.dictionaryAsCollection.Remove(kvp);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this will modify the collection while you are iterating over it, won't it?

Since both dictionary and dictionaryAsCollection point to the same object.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
}

this.ScheduleCleanup();
this.ScheduleCleanup(token);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the token is canceled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will cancel everything and not throw see try catch further down. Are you hinting that you'd like an realy exit before acquiring the lock?

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,27 @@ public void Contains_returns_false_for_expired_entry()
set.AddOrUpdate("testKey", DateTime.UtcNow - TimeSpan.FromSeconds(5));
Assert.False(set.Contains("testKey"), "The set should return false for an expired entry.");
}

[Fact]
AlexGhiondea marked this conversation as resolved.
Show resolved Hide resolved
public void Contains_returns_false_after_clear()
{
var set = new ConcurrentExpiringSet<string>();
set.AddOrUpdate("testKey", DateTime.UtcNow + TimeSpan.FromSeconds(5));
set.Clear();

Assert.False(set.Contains("testKey"), "The set should return false after clear.");
}

[Fact]
public void Contains_returns_false_after_clear_for_added_expired_keys()
{
var set = new ConcurrentExpiringSet<string>();
set.AddOrUpdate("testKey1", DateTime.UtcNow + TimeSpan.FromSeconds(5));
set.Clear();
set.AddOrUpdate("testKey2", DateTime.UtcNow - TimeSpan.FromSeconds(5));

Assert.False(set.Contains("testKey1"), "The set should return false after clear.");
Assert.False(set.Contains("testKey2"), "The set should return false for an expired entry.");
}
}
}