Skip to content
This repository has been archived by the owner on Dec 18, 2018. It is now read-only.

Commit

Permalink
Slimmer locks
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams authored and halter73 committed Sep 28, 2016
1 parent 09fda74 commit 3104110
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ public Task Stop()
/// </summary>
public void Abort(Exception error = null)
{
if (Interlocked.CompareExchange(ref _requestAborted, 1, 0) == 0)
if (Interlocked.Exchange(ref _requestAborted, 1) == 0)
{
_requestProcessingStopping = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public MemoryPoolBlock IncomingStart()

public void IncomingComplete(int count, Exception error)
{
Action awaitableState;

lock (_sync)
{
// Must call Add() before bytes are available to consumer, to ensure that Length is >= 0
Expand Down Expand Up @@ -107,8 +109,10 @@ public void IncomingComplete(int count, Exception error)
FinReceived();
}

Complete();
awaitableState = Interlocked.Exchange(ref _awaitableState, _awaitableIsCompleted);
}

Complete(awaitableState);
}

public void IncomingDeferred()
Expand All @@ -129,12 +133,8 @@ public void IncomingDeferred()
}
}

private void Complete()
private void Complete(Action awaitableState)
{
var awaitableState = Interlocked.Exchange(
ref _awaitableState,
_awaitableIsCompleted);

_manualResetEvent.Set();

if (!ReferenceEquals(awaitableState, _awaitableIsCompleted) &&
Expand All @@ -146,21 +146,29 @@ private void Complete()

public MemoryPoolIterator ConsumingStart()
{
MemoryPoolBlock head;
bool isAlreadyConsuming;

lock (_sync)
{
if (_consuming)
{
throw new InvalidOperationException("Already consuming input.");
}
isAlreadyConsuming = _consuming;
head = _head;
_consuming = true;
return new MemoryPoolIterator(_head);
}

if (isAlreadyConsuming)
{
throw new InvalidOperationException("Already consuming input.");
}

return new MemoryPoolIterator(head);
}

public void ConsumingComplete(
MemoryPoolIterator consumed,
MemoryPoolIterator examined)
{
bool isConsuming;
MemoryPoolBlock returnStart = null;
MemoryPoolBlock returnEnd = null;

Expand All @@ -184,7 +192,6 @@ public void ConsumingComplete(
{
// Everything has been consumed and no data is being written to the
// _tail block, so return all blocks between _head and _tail inclusive.
returnEnd = null;
_head = null;
_tail = null;
}
Expand Down Expand Up @@ -214,31 +221,34 @@ public void ConsumingComplete(
}
else
{
// Dispose won't have returned the blocks if we were consuming, so return them now
returnStart = _head;
returnEnd = null;
_head = null;
_tail = null;
}

ReturnBlocks(returnStart, returnEnd);

if (!_consuming)
{
throw new InvalidOperationException("No ongoing consuming operation to complete.");
}
isConsuming = _consuming;
_consuming = false;
}

ReturnBlocks(returnStart, returnEnd);

if (!isConsuming)
{
throw new InvalidOperationException("No ongoing consuming operation to complete.");
}
}

public void CompleteAwaiting()
{
Complete();
Complete(Interlocked.Exchange(ref _awaitableState, _awaitableIsCompleted));
}

public void AbortAwaiting()
{
SetConnectionError(new TaskCanceledException("The request was aborted"));
Complete();

CompleteAwaiting();
}

public SocketInput GetAwaiter()
Expand All @@ -253,15 +263,11 @@ public void OnCompleted(Action continuation)
continuation,
_awaitableIsNotCompleted);

if (ReferenceEquals(awaitableState, _awaitableIsNotCompleted))
{
return;
}
else if (ReferenceEquals(awaitableState, _awaitableIsCompleted))
if (ReferenceEquals(awaitableState, _awaitableIsCompleted))
{
_threadPool.Run(continuation);
}
else
else if (!ReferenceEquals(awaitableState, _awaitableIsNotCompleted))
{
SetConnectionError(new InvalidOperationException("Concurrent reads are not supported."));

Expand Down Expand Up @@ -293,18 +299,23 @@ public void GetResult()

public void Dispose()
{
AbortAwaiting();

MemoryPoolBlock block = null;

lock (_sync)
{
AbortAwaiting();

if (!_consuming)
{
ReturnBlocks(_head, null);
block = _head;
_head = null;
_tail = null;
}

_disposed = true;
}

ReturnBlocks(block, null);
}

private static void ReturnBlocks(MemoryPoolBlock block, MemoryPoolBlock end)
Expand Down

0 comments on commit 3104110

Please sign in to comment.