From 310411075f74a5c26ea197252765b96d9189e0dc Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Sun, 4 Sep 2016 09:05:27 +0100 Subject: [PATCH] Slimmer locks --- .../Internal/Http/Frame.cs | 2 +- .../Internal/Http/SocketInput.cs | 71 +++++++++++-------- 2 files changed, 42 insertions(+), 31 deletions(-) diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/Frame.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/Frame.cs index 767130fd0..7b0c22dbd 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/Frame.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/Frame.cs @@ -383,7 +383,7 @@ public Task Stop() /// public void Abort(Exception error = null) { - if (Interlocked.CompareExchange(ref _requestAborted, 1, 0) == 0) + if (Interlocked.Exchange(ref _requestAborted, 1) == 0) { _requestProcessingStopping = true; diff --git a/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/SocketInput.cs b/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/SocketInput.cs index e73b72b77..a89fff7bf 100644 --- a/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/SocketInput.cs +++ b/src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/SocketInput.cs @@ -72,6 +72,8 @@ public MemoryPoolBlock IncomingStart() public void IncomingComplete(int count, Exception error) { + Action awaitableState; + lock (_sync) { // Must call Add() before bytes are available to consumer, to ensure that Length is >= 0 @@ -107,8 +109,10 @@ public void IncomingComplete(int count, Exception error) FinReceived(); } - Complete(); + awaitableState = Interlocked.Exchange(ref _awaitableState, _awaitableIsCompleted); } + + Complete(awaitableState); } public void IncomingDeferred() @@ -129,12 +133,8 @@ public void IncomingDeferred() } } - private void Complete() + private void Complete(Action awaitableState) { - var awaitableState = Interlocked.Exchange( - ref _awaitableState, - _awaitableIsCompleted); - _manualResetEvent.Set(); if (!ReferenceEquals(awaitableState, _awaitableIsCompleted) && @@ -146,21 +146,29 @@ private void Complete() public MemoryPoolIterator ConsumingStart() { + MemoryPoolBlock head; + bool isAlreadyConsuming; + lock (_sync) { - if (_consuming) - { - throw new InvalidOperationException("Already consuming input."); - } + isAlreadyConsuming = _consuming; + head = _head; _consuming = true; - return new MemoryPoolIterator(_head); } + + if (isAlreadyConsuming) + { + throw new InvalidOperationException("Already consuming input."); + } + + return new MemoryPoolIterator(head); } public void ConsumingComplete( MemoryPoolIterator consumed, MemoryPoolIterator examined) { + bool isConsuming; MemoryPoolBlock returnStart = null; MemoryPoolBlock returnEnd = null; @@ -184,7 +192,6 @@ public void ConsumingComplete( { // Everything has been consumed and no data is being written to the // _tail block, so return all blocks between _head and _tail inclusive. - returnEnd = null; _head = null; _tail = null; } @@ -214,31 +221,34 @@ public void ConsumingComplete( } else { + // Dispose won't have returned the blocks if we were consuming, so return them now returnStart = _head; - returnEnd = null; _head = null; _tail = null; } - ReturnBlocks(returnStart, returnEnd); - - if (!_consuming) - { - throw new InvalidOperationException("No ongoing consuming operation to complete."); - } + isConsuming = _consuming; _consuming = false; } + + ReturnBlocks(returnStart, returnEnd); + + if (!isConsuming) + { + throw new InvalidOperationException("No ongoing consuming operation to complete."); + } } public void CompleteAwaiting() { - Complete(); + Complete(Interlocked.Exchange(ref _awaitableState, _awaitableIsCompleted)); } public void AbortAwaiting() { SetConnectionError(new TaskCanceledException("The request was aborted")); - Complete(); + + CompleteAwaiting(); } public SocketInput GetAwaiter() @@ -253,15 +263,11 @@ public void OnCompleted(Action continuation) continuation, _awaitableIsNotCompleted); - if (ReferenceEquals(awaitableState, _awaitableIsNotCompleted)) - { - return; - } - else if (ReferenceEquals(awaitableState, _awaitableIsCompleted)) + if (ReferenceEquals(awaitableState, _awaitableIsCompleted)) { _threadPool.Run(continuation); } - else + else if (!ReferenceEquals(awaitableState, _awaitableIsNotCompleted)) { SetConnectionError(new InvalidOperationException("Concurrent reads are not supported.")); @@ -293,18 +299,23 @@ public void GetResult() public void Dispose() { + AbortAwaiting(); + + MemoryPoolBlock block = null; + lock (_sync) { - AbortAwaiting(); - if (!_consuming) { - ReturnBlocks(_head, null); + block = _head; _head = null; _tail = null; } + _disposed = true; } + + ReturnBlocks(block, null); } private static void ReturnBlocks(MemoryPoolBlock block, MemoryPoolBlock end)