Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.

Commit

Permalink
ThreadPool.SparseArray improvement for DequeueSteal
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams committed Jun 27, 2016
1 parent 21c3d55 commit dd47ed0
Showing 1 changed file with 75 additions and 24 deletions.
99 changes: 75 additions & 24 deletions src/mscorlib/src/System/Threading/ThreadPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,42 +71,78 @@ internal sealed class ThreadPoolWorkQueue
// Simple sparsely populated array to allow lock-free reading.
internal class SparseArray<T> where T : class
{
private volatile T[] m_array;
private Snapshot m_current;

internal sealed class Snapshot
{
public readonly T[] Data;
public readonly int Mask;
private int m_length;

internal Snapshot(int initialSize, int initalLength = 0)
{
if ((initialSize >> 1) << 1 != initialSize)
{
throw new ArgumentOutOfRangeException(nameof(initialSize), "initialSize must be a power of 2");
}

Data = new T[initialSize];
Mask = initialSize - 1;
m_length = initalLength;
}

internal int ActiveLength => m_length;

internal void IncrementLength()
{
m_length++;
}
}

internal SparseArray(int initialSize)
{
m_array = new T[initialSize];
m_current = new Snapshot(initialSize);
}

internal T[] Current
internal Snapshot Current
{
get { return m_array; }
get { return m_current; }
}

internal int Add(T e)
{
while (true)
{
T[] array = m_array;
lock (array)
var current = m_current;
lock (current)
{
if (current != m_current)
{
// If there was a race condition, we start over again.
continue;
}

var array = current.Data;

for (int i = 0; i < array.Length; i++)
{
if (array[i] == null)
{
if (i > current.ActiveLength)
{
current.IncrementLength();
}
Volatile.Write(ref array[i], e);
return i;
}
else if (i == array.Length - 1)
{
// Must resize. If there was a race condition, we start over again.
if (array != m_array)
continue;
var newSnapshot = new Snapshot(array.Length * 2, array.Length + 1);
T[] newArray = newSnapshot.Data;

T[] newArray = new T[array.Length * 2];
Array.Copy(array, newArray, i + 1);
newArray[i + 1] = e;
m_array = newArray;
m_current = newSnapshot;
return i + 1;
}
}
Expand All @@ -116,16 +152,29 @@ internal int Add(T e)

internal void Remove(T e)
{
T[] array = m_array;
lock (array)
while (true)
{
for (int i = 0; i < m_array.Length; i++)
var current = m_current;
lock (current)
{
if (m_array[i] == e)
if (current != m_current)
{
Volatile.Write(ref m_array[i], null);
break;
// If there was a race condition, we start over again.
continue;
}

var array = current.Data;
var length = current.ActiveLength;

for (int i = 0; i < length; i++)
{
if (array[i] == e)
{
Volatile.Write(ref array[i], null);
break;
}
}
break;
}
}
}
Expand Down Expand Up @@ -741,12 +790,14 @@ private void DequeueSeek(ThreadPoolWorkQueueThreadLocals tl, ref IThreadPoolWork
private static void DequeueSteal(ThreadPoolWorkQueueThreadLocals tl, ref IThreadPoolWorkItem callback, ref bool missedSteal)
{
WorkStealingQueue wsq = tl.workStealingQueue;
WorkStealingQueue[] otherQueues = allThreadQueues.Current;
int i = tl.random.Next(otherQueues.Length);
int c = otherQueues.Length;
while (c > 0)
var otherQueues = allThreadQueues.Current;
var remaining = otherQueues.ActiveLength;
var i = tl.random.Next(remaining);
var data = otherQueues.Data;
var mask = otherQueues.Mask;
while (remaining > 0)
{
WorkStealingQueue otherQueue = Volatile.Read(ref otherQueues[i % otherQueues.Length]);
WorkStealingQueue otherQueue = Volatile.Read(ref data[i & mask]);
if (otherQueue != null &&
otherQueue != wsq &&
otherQueue.TrySteal(out callback, ref missedSteal))
Expand All @@ -755,7 +806,7 @@ private static void DequeueSteal(ThreadPoolWorkQueueThreadLocals tl, ref IThread
break;
}
i++;
c--;
remaining--;
}
}

Expand Down Expand Up @@ -1766,7 +1817,7 @@ internal static bool TryPopCustomWorkItem(IThreadPoolWorkItem workItem)
[SecurityCritical]
internal static IEnumerable<IThreadPoolWorkItem> GetQueuedWorkItems()
{
return EnumerateQueuedWorkItems(ThreadPoolWorkQueue.allThreadQueues.Current, ThreadPoolGlobals.workQueue.queueTail);
return EnumerateQueuedWorkItems(ThreadPoolWorkQueue.allThreadQueues.Current.Data, ThreadPoolGlobals.workQueue.queueTail);
}

internal static IEnumerable<IThreadPoolWorkItem> EnumerateQueuedWorkItems(ThreadPoolWorkQueue.WorkStealingQueue[] wsQueues, ThreadPoolWorkQueue.QueueSegment globalQueueTail)
Expand Down

0 comments on commit dd47ed0

Please sign in to comment.