Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.

Commit

Permalink
Revert two changes to thread requests (#14015)
Browse files Browse the repository at this point in the history
Reverting 99db31c and fd91ee1 to unblock others while trying to figure out what the issues are and how to fix them.

fd91ee1 is causing @benaadams thread pool perf test (https://github.com/benaadams/ThreadPoolTaskTesting) to hang due to a missed thread request. Somehow wsqActive is ending up at zero while there is a work item in the queue and with no pending thread requests. I don't understand how yet.

99db31c appears to have a potential issue because the order of MarkThreadRequestSatisfied and Dequeue are reversed. For instance, assuming a proc count of 1:
- Initial state: 1 work item enqueued, 1 thread request
- T1 Dispatch: dequeues a work item and requests a thread (0 work items, 1 thread request)
- T1 Dispatch: sees no more work items, returns
- T1 calls Dispatch again due to its own thread request
- T1 Dispatch: After Dequeue (which saw 0 work items) and before MarkThreadRequestSatisfied:
  - Current state: 0 work items, 1 thread request
  - T2 enqueues a work item, sees 1 thread request and does not request a thread (1 work item, 1 thread request)
- T1 Dispatch: MarkThreadRequestSatisfied decrements thread requests (1 work item, 0 thread requests)
- Now after T1 returns, it won't wake up again but there is still one work item in the queue
  • Loading branch information
kouvel authored Sep 15, 2017
1 parent a38aa24 commit b8dda0c
Showing 1 changed file with 30 additions and 98 deletions.
128 changes: 30 additions & 98 deletions src/mscorlib/src/System/Threading/ThreadPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,6 @@ internal static class WorkStealingQueueList

public static WorkStealingQueue[] Queues => _queues;

// Track whether the WorkStealingQueueList is empty
// Three states simplifies race conditions. They may be considered.
// Now Active --> Maybe Inactive -> Confirmed Inactive
public const int WsqNowActive = 2;
public static int wsqActive;

public static void Add(WorkStealingQueue queue)
{
Debug.Assert(queue != null);
Expand Down Expand Up @@ -400,9 +394,6 @@ public ThreadPoolWorkQueueThreadLocals EnsureCurrentThreadHasQueue() =>
ThreadPoolWorkQueueThreadLocals.threadLocals ??
(ThreadPoolWorkQueueThreadLocals.threadLocals = new ThreadPoolWorkQueueThreadLocals(this));

internal bool ThreadRequestNeeded(int count) => (count < ThreadPoolGlobals.processorCount) &&
(!workItems.IsEmpty || (WorkStealingQueueList.wsqActive > 0));

internal void EnsureThreadRequested()
{
//
Expand All @@ -413,7 +404,7 @@ internal void EnsureThreadRequested()
// which is handled by RequestWorkerThread.
//
int count = numOutstandingThreadRequests;
while (ThreadRequestNeeded(count))
while (count < ThreadPoolGlobals.processorCount)
{
int prev = Interlocked.CompareExchange(ref numOutstandingThreadRequests, count + 1, count);
if (prev == count)
Expand All @@ -425,7 +416,7 @@ internal void EnsureThreadRequested()
}
}

internal void MarkThreadRequestSatisfied(bool dequeueSuccessful)
internal void MarkThreadRequestSatisfied()
{
//
// The VM has called us, so one of our outstanding thread requests has been satisfied.
Expand All @@ -434,17 +425,8 @@ internal void MarkThreadRequestSatisfied(bool dequeueSuccessful)
// by the time we reach this point.
//
int count = numOutstandingThreadRequests;

while (count > 0)
{
if (dequeueSuccessful && (count == ThreadPoolGlobals.processorCount) && ThreadRequestNeeded(count - 1))
{
// If we gated threads due to too many outstanding requests and queue was not empty
// Request another thread.
ThreadPool.RequestWorkerThread();
return;
}

int prev = Interlocked.CompareExchange(ref numOutstandingThreadRequests, count - 1, count);
if (prev == count)
{
Expand All @@ -466,18 +448,6 @@ public void Enqueue(IThreadPoolWorkItem callback, bool forceGlobal)
if (null != tl)
{
tl.workStealingQueue.LocalPush(callback);

// We must guarantee wsqActive is set to WsqNowActive after we push
// The ordering must be global because we rely on other threads
// observing in this order
Interlocked.MemoryBarrier();

// We do not want to simply write. We want to prevent unnecessary writes
// which would invalidate reader's caches
if (WorkStealingQueueList.wsqActive != WorkStealingQueueList.WsqNowActive)
{
Volatile.Write(ref WorkStealingQueueList.wsqActive, WorkStealingQueueList.WsqNowActive);
}
}
else
{
Expand All @@ -495,56 +465,33 @@ internal bool LocalFindAndPop(IThreadPoolWorkItem callback)

public IThreadPoolWorkItem Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool missedSteal)
{
WorkStealingQueue localWsq = tl.workStealingQueue;
IThreadPoolWorkItem callback;
int wsqActiveObserved = WorkStealingQueueList.wsqActive;
if (wsqActiveObserved > 0)
{
WorkStealingQueue localWsq = tl.workStealingQueue;

if ((callback = localWsq.LocalPop()) == null && // first try the local queue
!workItems.TryDequeue(out callback)) // then try the global queue
if ((callback = localWsq.LocalPop()) == null && // first try the local queue
!workItems.TryDequeue(out callback)) // then try the global queue
{
// finally try to steal from another thread's local queue
WorkStealingQueue[] queues = WorkStealingQueueList.Queues;
int c = queues.Length;
Debug.Assert(c > 0, "There must at least be a queue for this thread.");
int maxIndex = c - 1;
int i = tl.random.Next(c);
while (c > 0)
{
// finally try to steal from another thread's local queue
WorkStealingQueue[] queues = WorkStealingQueueList.Queues;
int c = queues.Length;
Debug.Assert(c > 0, "There must at least be a queue for this thread.");
int maxIndex = c - 1;
int i = tl.random.Next(c);
while (c > 0)
i = (i < maxIndex) ? i + 1 : 0;
WorkStealingQueue otherQueue = queues[i];
if (otherQueue != localWsq && otherQueue.CanSteal)
{
i = (i < maxIndex) ? i + 1 : 0;
WorkStealingQueue otherQueue = queues[i];
if (otherQueue != localWsq && otherQueue.CanSteal)
callback = otherQueue.TrySteal(ref missedSteal);
if (callback != null)
{
callback = otherQueue.TrySteal(ref missedSteal);
if (callback != null)
{
break;
}
break;
}
c--;
}
if ((callback == null) && !missedSteal)
{
// Only decrement if the value is unchanged since we started looking for work
// This prevents multiple threads decrementing based on overlapping scans.
//
// When we decrement from active, the producer may have inserted a queue item during our scan
// therefore we cannot transition to empty
//
// When we decrement from Maybe Inactive, if the producer inserted a queue item during our scan,
// the producer must write Active. We may transition to empty briefly if we beat the
// producer's write, but the producer will then overwrite us before waking threads.
// So effectively we cannot mark the queue empty when an item is in the queue.
Interlocked.CompareExchange(ref WorkStealingQueueList.wsqActive, wsqActiveObserved - 1, wsqActiveObserved);
}
c--;
}
}
else
{
// We only need to look at the global queue since WorkStealingQueueList is inactive
workItems.TryDequeue(out callback);
}

return callback;
}
Expand All @@ -558,7 +505,15 @@ internal static bool Dispatch()
//
int quantumStartTime = Environment.TickCount;

bool markThreadRequestSatisfied = true;
//
// Update our records to indicate that an outstanding request for a thread has now been fulfilled.
// From this point on, we are responsible for requesting another thread if we stop working for any
// reason, and we believe there might still be work in the queue.
//
// Note that if this thread is aborted before we get a chance to request another one, the VM will
// record a thread request on our behalf. So we don't need to worry about getting aborted right here.
//
workQueue.MarkThreadRequestSatisfied();

// Has the desire for logging changed since the last time we entered?
workQueue.loggingEnabled = FrameworkEventSource.Log.IsEnabled(EventLevel.Verbose, FrameworkEventSource.Keywords.ThreadPool | FrameworkEventSource.Keywords.ThreadTransfer);
Expand Down Expand Up @@ -607,21 +562,7 @@ internal static bool Dispatch()
// If we found work, there may be more work. Ask for another thread so that the other work can be processed
// in parallel. Note that this will only ask for a max of #procs threads, so it's safe to call it for every dequeue.
//
if (markThreadRequestSatisfied)
{
//
// Update our records to indicate that an outstanding request for a thread has now been fulfilled
// and that an item was successfully dispatched and another thread may be needed
//
// From this point on, we are responsible for requesting another thread if we stop working for any
// reason, and we believe there might still be work in the queue.
//
// Note that if this thread is aborted before we get a chance to request another one, the VM will
// record a thread request on our behalf. So we don't need to worry about getting aborted right here.
//
workQueue.MarkThreadRequestSatisfied(true);
markThreadRequestSatisfied = false;
}
workQueue.EnsureThreadRequested();

//
// Execute the workitem outside of any finally blocks, so that it can be aborted if needed.
Expand Down Expand Up @@ -676,15 +617,6 @@ internal static bool Dispatch()
}
finally
{
if (markThreadRequestSatisfied)
{
//
// Update our records to indicate that an outstanding request for a thread has now been fulfilled
// and that an item was not successfully dispatched. We will request thread below if needed
//
workQueue.MarkThreadRequestSatisfied(false);
}

//
// If we are exiting for any reason other than that the queue is definitely empty, ask for another
// thread to pick up where we left off.
Expand Down

0 comments on commit b8dda0c

Please sign in to comment.