Skip to content

Commit

Permalink
Simplify thread pool to use BlockingCollection
Browse files Browse the repository at this point in the history
Instead of implementing a custom queue with custom locks, CustomThreadPool
now uses a single BlockingCollection to add elements, and a fixed set
of threads consuming elements concurrently using GetConsumingEnumerable.

This fixes bug #60 and #61, at the expense of losing the ability of shutting
down idle threads in the pool. We decreased the pool size from 10 to 5.
  • Loading branch information
rpaquay committed Jun 3, 2020
1 parent 3eaa64e commit d078f59
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 207 deletions.
2 changes: 0 additions & 2 deletions src/Server/Server.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,6 @@
<Compile Include="Program.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Server.cs" />
<Compile Include="Threads\ThreadObject.cs" />
<Compile Include="Threads\ThreadPool.cs" />
</ItemGroup>
<ItemGroup>
<None Include="Configuration\ChromiumEnlistmentDetection.patterns">
Expand Down
105 changes: 42 additions & 63 deletions src/Server/Threads/CustomThreadPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
// found in the LICENSE file.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel.Composition;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using VsChromium.Core.Logging;
using VsChromium.Core.Threads;
Expand All @@ -14,18 +17,22 @@ namespace VsChromium.Server.Threads {
public class CustomThreadPool : ICustomThreadPool {
private readonly IDateTimeProvider _dateTimeProvider;
private readonly object _lock = new object();
private readonly ThreadPool _threadPool;
private readonly object _queueLock = new object();
private readonly Queue<Action> _taskQueue = new Queue<Action>();
private readonly BlockingCollection<Action> _taskQueue = new BlockingCollection<Action>();
private readonly List<ThreadPoolEntry> _threadPool;
private readonly CancellationTokenSource _tokenSource = new CancellationTokenSource();

[ImportingConstructor]
public CustomThreadPool(IDateTimeProvider dateTimeProvider)
: this(dateTimeProvider, 10) {
: this(dateTimeProvider, 5) {
}

public CustomThreadPool(IDateTimeProvider dateTimeProvider, int capacity) {
_dateTimeProvider = dateTimeProvider;
_threadPool = new ThreadPool(dateTimeProvider, capacity);
_threadPool = new List<ThreadPoolEntry>();
_threadPool.AddRange(Enumerable.Range(0, capacity).Select(i => new ThreadPoolEntry(i, _taskQueue, _tokenSource.Token)));

// Start threads right away
_threadPool.ForEach(x => x.Start());
}

public void RunAsync(Action task) {
Expand All @@ -44,74 +51,46 @@ public void RunAsync(Action task, TimeSpan delay) {
Task.Delay(delay).ContinueWith(_ => RunAsync(task, TimeSpan.Zero));
} else {
// Enqueue
lock (_queueLock) {
_taskQueue.Enqueue(task);
}

// Process queue if any available thread
ProcessQueueAsync();
_taskQueue.Add(task);
}
}

private void ProcessQueueAsync() {
var thread = TryAcquireThread();
if (thread != null) {
thread.RunAsync(() => ProcessQueueAndReleaseThread(thread));
private class ThreadPoolEntry {
private readonly int _id;
private readonly Thread _thread;
private readonly BlockingCollection<Action> _queue;
private readonly CancellationToken _token;

public ThreadPoolEntry(int id, BlockingCollection<Action> queue, CancellationToken token) {
_id = id;
_thread = new Thread(ThreadLoop);
_queue = queue;
_token = token;
}
}

private void ProcessQueueAndReleaseThread(ThreadObject thread) {
try {
ProcessQueue();
} finally {
ReleaseThread(thread);
public void Start() {
_thread.Priority = ThreadPriority.AboveNormal;
_thread.Name = String.Format("CustomThread #{0}", _id);
_thread.IsBackground = true;
_thread.Start();
}

// There may have been more items enqueued concurrently: Mote tasks may have been
// enueue we may have been
// releasing the thread while
// schedule another queue processing if needed
bool queueIsEmpty;
lock (_queueLock) {
queueIsEmpty = _taskQueue.Count == 0;
}
if (!queueIsEmpty) {
ProcessQueueAsync();
}
}

private void ProcessQueue() {
// Process all items in the queue. This happens concurrently on
// all active threads of the thread pool.
while (true) {
Action task = TryGetTaskFromQueue();

// Queue is empty, bail
if (task == null) {
break;
}

private void ThreadLoop() {
try {
task();
} catch (Exception e) {
// TODO(rpaquay): Do we want to propage the exception here?
Logger.LogError(e, "Error executing task on custom thread pool.");
foreach (var task in _queue.GetConsumingEnumerable(_token)) {
try {
task();
} catch (Exception e) {
// TODO(rpaquay): Do we want to propage the exception here?
Logger.LogError(e, "Error executing task on custom thread pool, moving on to next task");
}
}
} catch(OperationCanceledException e) {
Logger.LogInfo(e, "Queue has been cancelled, terminating thread");
} catch(Exception e) {
Logger.LogError(e, "Error consuming tasks from queue, terminating thread");
}
}
}

private Action TryGetTaskFromQueue() {
lock (_queueLock) {
return (_taskQueue.Count == 0) ? null : _taskQueue.Dequeue();
}
}

private ThreadObject TryAcquireThread() {
return _threadPool.TryAcquireThread();
}

private void ReleaseThread(ThreadObject thread) {
thread.Release();
}
}
}
85 changes: 0 additions & 85 deletions src/Server/Threads/ThreadObject.cs

This file was deleted.

57 changes: 0 additions & 57 deletions src/Server/Threads/ThreadPool.cs

This file was deleted.

0 comments on commit d078f59

Please sign in to comment.