Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Http 1.1's read-ahead task management #103257

Merged
merged 2 commits into from
Jun 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ private void ProcessHttp11RequestQueue(HttpConnection? connection)
{
if (connection is not null || _http11Connections.TryPop(out connection))
{
// If the connection is new, this check will always succeed as there is no scavenging task pending.
if (!connection.TryOwnScavengingTaskCompletion())
{
goto DisposeConnection;
}

// TryDequeueWaiter will prune completed requests from the head of the queue,
// so it's possible for it to return false even though we checked that Count != 0.
bool success = _http11RequestQueue.TryDequeueWaiter(this, out waiter);
Expand Down Expand Up @@ -144,10 +150,19 @@ private void ProcessHttp11RequestQueue(HttpConnection? connection)
// and set the _http11RequestQueueIsEmptyAndNotDisposed flag to false, followed by multiple
// returning connections observing the flag and calling into this method before we clear the flag.
// This should be a relatively rare case, so the added contention should be minimal.

// We took ownership of the scavenging task completion.
// If we can't return the completion (the task already completed), we must dispose the connection.
if (!connection.TryReturnScavengingTaskCompletionOwnership())
{
goto DisposeConnection;
}

_http11Connections.Push(connection);
}
else
{
// We may be out of available connections, check if we should inject a new one.
CheckForHttp11ConnectionInjection();
}

Expand All @@ -163,11 +178,31 @@ private void ProcessHttp11RequestQueue(HttpConnection? connection)
// before signaling the waiter. This is intentional, as the fact that
// this method was called indicates that the connection is either new,
// or was just returned to the pool and is still in a good state.
//
// We must, however, take ownership of the scavenging task completion as
// there is a small chance that such a task was started if the connection
// was briefly returned to the pool.
return;
}

// The request was already cancelled or handled by a different connection.

// We took ownership of the scavenging task completion.
// If we can't return the completion (the task already completed), we must dispose the connection.
if (!connection.TryReturnScavengingTaskCompletionOwnership())
{
goto DisposeConnection;
}

// Loop again to try to find another request to signal, or return the connection.
continue;

DisposeConnection:
// The scavenging task completed before we assigned a request to the connection.
// We've received EOF/erroneous data and the connection is not usable anymore.
// Throw it away and try again.
connection.Dispose();
connection = null;
}

if (_disposed)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ internal sealed partial class HttpConnection : HttpConnectionBase
private const int ReadAheadTask_NotStarted = 0;
private const int ReadAheadTask_Started = 1;
private const int ReadAheadTask_CompletionReserved = 2;
private const int ReadAheadTask_Completed = 3;
private int _readAheadTaskStatus;
private ValueTask<int> _readAheadTask;
private ArrayBuffer _readBuffer;
Expand Down Expand Up @@ -118,8 +119,11 @@ private void Dispose(bool disposing)
}
}

private bool ReadAheadTaskHasStarted =>
_readAheadTaskStatus != ReadAheadTask_NotStarted;

/// <summary>Prepare an idle connection to be used for a new request.
/// The caller MUST call SendAsync afterwards if this method returns true.</summary>
/// The caller MUST call SendAsync afterwards if this method returns true, or dispose the connection if it returns false.</summary>
/// <param name="async">Indicates whether the coming request will be sync or async.</param>
/// <returns>True if connection can be used, false if it is invalid due to a timeout or receiving EOF or unexpected data.</returns>
public bool PrepareForReuse(bool async)
Expand All @@ -133,7 +137,9 @@ public bool PrepareForReuse(bool async)
// If the read-ahead task is completed, then we've received either EOF or erroneous data the connection, so it's not usable.
if (ReadAheadTaskHasStarted)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could still this be race condition? e.g. It did not start it but it will before we get to the next statement.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the time you get to PrepareForReuse, the caller is the only thing with a reference to this connection.

It's possible that the read was already started and is transitioning to Completed right under you, but it can't transition from NotStarted => something else, or from something to => NotStarted.

{
return TryOwnReadAheadTaskCompletion();
Debug.Assert(_readAheadTaskStatus is ReadAheadTask_Started or ReadAheadTask_Completed);

return Interlocked.Exchange(ref _readAheadTaskStatus, ReadAheadTask_CompletionReserved) == ReadAheadTask_Started;
}

// Check to see if we've received anything on the connection; if we have, that's
Expand Down Expand Up @@ -177,6 +183,35 @@ public bool PrepareForReuse(bool async)
}
}

/// <summary>Takes ownership of the scavenging task completion if it was started.
/// The caller MUST call either SendAsync or return the completion ownership afterwards if this method returns true, or dispose the connection if it returns false.</summary>
public bool TryOwnScavengingTaskCompletion()
{
Debug.Assert(_readAheadTaskStatus != ReadAheadTask_CompletionReserved);

return !ReadAheadTaskHasStarted
|| Interlocked.Exchange(ref _readAheadTaskStatus, ReadAheadTask_CompletionReserved) == ReadAheadTask_Started;
}

/// <summary>Returns ownership of the scavenging task completion if it was started.
/// The caller MUST Dispose the connection afterwards if this method returns false.</summary>
public bool TryReturnScavengingTaskCompletionOwnership()
{
Debug.Assert(_readAheadTaskStatus != ReadAheadTask_Started);

if (!ReadAheadTaskHasStarted ||
Interlocked.Exchange(ref _readAheadTaskStatus, ReadAheadTask_Started) == ReadAheadTask_CompletionReserved)
{
return true;
}

// The read-ahead task has started, and we failed to transition back to Started.
// This means that the read-ahead task has completed, and we can't reuse the connection. The caller must dispose it.
// We're still responsible for observing potential exceptions thrown by the read-ahead task to avoid leaking unobserved exceptions.
LogExceptions(_readAheadTask.AsTask());
return false;
}

/// <summary>Check whether a currently idle connection is still usable, or should be scavenged.</summary>
/// <returns>True if connection can be used, false if it is invalid due to a timeout or receiving EOF or unexpected data.</returns>
public override bool CheckUsabilityOnScavenge()
Expand All @@ -187,21 +222,7 @@ public override bool CheckUsabilityOnScavenge()
}

// We may already have a read-ahead task if we did a previous scavenge and haven't used the connection since.
EnsureReadAheadTaskHasStarted();

// If the read-ahead task is completed, then we've received either EOF or erroneous data the connection, so it's not usable.
return !_readAheadTask.IsCompleted;
}

private bool ReadAheadTaskHasStarted =>
_readAheadTaskStatus != ReadAheadTask_NotStarted;

private bool TryOwnReadAheadTaskCompletion() =>
Interlocked.CompareExchange(ref _readAheadTaskStatus, ReadAheadTask_CompletionReserved, ReadAheadTask_Started) == ReadAheadTask_Started;

private void EnsureReadAheadTaskHasStarted()
{
if (_readAheadTaskStatus == ReadAheadTask_NotStarted)
if (!ReadAheadTaskHasStarted)
{
Debug.Assert(_readAheadTask == default);

Expand All @@ -212,6 +233,9 @@ private void EnsureReadAheadTaskHasStarted()
#pragma warning restore CA2012
}

// If the read-ahead task is completed, then we've received either EOF or erroneous data the connection, so it's not usable.
return !_readAheadTask.IsCompleted;

async ValueTask<int> ReadAheadWithZeroByteReadAsync()
{
Debug.Assert(_readAheadTask == default);
Expand All @@ -231,19 +255,26 @@ async ValueTask<int> ReadAheadWithZeroByteReadAsync()
// PrepareForReuse will check TryOwnReadAheadTaskCompletion before calling into SendAsync.
// If we can own the completion from within the read-ahead task, it means that PrepareForReuse hasn't been called yet.
// In that case we've received EOF/erroneous data before we sent the request headers, and the connection can't be reused.
if (TryOwnReadAheadTaskCompletion())
if (TransitionToCompletedAndTryOwnCompletion())
{
if (NetEventSource.Log.IsEnabled()) Trace("Read-ahead task observed data before the request was sent.");
}

return read;
}
catch (Exception error) when (TryOwnReadAheadTaskCompletion())
catch (Exception error) when (TransitionToCompletedAndTryOwnCompletion())
{
if (NetEventSource.Log.IsEnabled()) Trace($"Error performing read ahead: {error}");

return 0;
}

bool TransitionToCompletedAndTryOwnCompletion()
{
Debug.Assert(_readAheadTaskStatus is ReadAheadTask_Started or ReadAheadTask_CompletionReserved);

return Interlocked.Exchange(ref _readAheadTaskStatus, ReadAheadTask_Completed) == ReadAheadTask_Started;
}
}
}

Expand Down Expand Up @@ -497,7 +528,8 @@ public async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, boo
{
Debug.Assert(_currentRequest == null, $"Expected null {nameof(_currentRequest)}.");
Debug.Assert(_readBuffer.ActiveLength == 0, "Unexpected data in read buffer");
Debug.Assert(_readAheadTaskStatus != ReadAheadTask_Started);
Debug.Assert(_readAheadTaskStatus != ReadAheadTask_Started,
"The caller should have called PrepareForReuse or TryOwnScavengingTaskCompletion if the connection was idle on the pool.");

MarkConnectionAsNotIdle();

Expand Down
Loading