Skip to content

Commit

Permalink
buffer content if content length is not available
Browse files Browse the repository at this point in the history
  • Loading branch information
lmolkova committed Jan 24, 2024
1 parent 86d6f6c commit 11db45e
Showing 1 changed file with 58 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Microsoft.Extensions.Primitives;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
Expand Down Expand Up @@ -224,6 +225,7 @@ private static async Task<UpstreamResponse> SendUpstreamRequest(HttpRequest requ
Log($"StatusCode: {upstreamResponseMessage.StatusCode}");

Log("Headers:");

foreach (var header in upstreamResponseMessage.Headers)
{
Log($" {header.Key}:{header.Value.First()}");
Expand All @@ -245,15 +247,15 @@ private static async Task<UpstreamResponse> SendUpstreamRequest(HttpRequest requ
}

Log("Reading upstream response body...");
Log($"ContentLength: {upstreamResponseMessage.Content.Headers.ContentLength}");

return new UpstreamResponse()
{
StatusCode = (int)upstreamResponseMessage.StatusCode,
Headers = headers.ToArray(),
Content = upstreamResponseMessage.Content,
ContentLength = upstreamResponseMessage.Content.Headers.ContentLength
};
long? length = upstreamResponseMessage.Content.Headers.ContentLength;
Log($"ContentLength: {length}, is chunked: {upstreamResponseMessage.Headers.TransferEncodingChunked}");

var response = await UpstreamResponse.FromContent(upstreamResponseMessage.Content, cancellationToken);
response.StatusCode = (int)upstreamResponseMessage.StatusCode;
response.Headers = headers.ToArray();

return response;
}

private static async Task<bool> TryHandleResponseOption(string selection, HttpContext context, UpstreamResponse upstreamResponse)
Expand Down Expand Up @@ -322,7 +324,7 @@ private static async Task SendDownstreamResponse(UpstreamResponse upstreamRespon

Log($"Writing response body of {count} bytes...");

using Stream source = await upstreamResponse.Content.ReadAsStreamAsync(cancellationToken);
using Stream source = await upstreamResponse.GetContentStreamAsync(cancellationToken);
byte[] buffer = new byte[8192];

try
Expand Down Expand Up @@ -351,7 +353,7 @@ private static async Task SendDownstreamResponse(UpstreamResponse upstreamRespon
// disponse content as early as possible (before infinite wait that might happen later)
// so that underlying connection returns to connection pool
// and we won't run out of them
upstreamResponse.Content.Dispose();
upstreamResponse.Dispose();
}
}

Expand All @@ -378,14 +380,57 @@ private static void Log(object value)

private class UpstreamResponse : IDisposable
{
private readonly HttpContent? _content = null;
private readonly Stream? _contentStream = null;
private UpstreamResponse(HttpContent content)
{
_content = content;
ContentLength = _content.Headers.ContentLength ?? throw new ArgumentNullException("ContentLength must not be null.");
}

private UpstreamResponse(MemoryStream contentStream)
{
_contentStream = contentStream;
ContentLength = contentStream.Length;
}

public int StatusCode { get; set; }
public KeyValuePair<string, StringValues>[] Headers { get; set; }
public HttpContent Content { get; set; }
public long? ContentLength { get; set; }
public async Task<Stream> GetContentStreamAsync(CancellationToken cancellationToken)
{
return _contentStream != null ? _contentStream : await _content.ReadAsStreamAsync(cancellationToken);
}

public long ContentLength { get; }

public void Dispose()
{
Content.Dispose();
_content?.Dispose();
_contentStream?.Dispose();
}

public static async Task<UpstreamResponse> FromContent(HttpContent content, CancellationToken cancellationToken)
{
if (content.Headers.ContentLength == null)
{
MemoryStream contentStream = await BufferContentAsync(content, cancellationToken);
// we no longer need that content and can let the connection go back to the pool.
content.Dispose();
return new UpstreamResponse(contentStream);
}

return new UpstreamResponse(content);
}

private static async Task<MemoryStream> BufferContentAsync(HttpContent content, CancellationToken cancellationToken)
{
Debug.Assert(content.Headers.ContentLength == null, "We should not buffer content if length is available.");

byte[] contentBytes = await content.ReadAsByteArrayAsync(cancellationToken);
Log($"Response does not have content length and is chunked or malformed. Content was buffered, total length - {contentBytes.Length}.");

// comparing to buffering full content memory stream allocation is not such a big deal.
return new MemoryStream(contentBytes);
}
}
}
Expand Down

0 comments on commit 11db45e

Please sign in to comment.