From 11db45ea686bcfefc8f14a4a863eee4a97f5f48f Mon Sep 17 00:00:00 2001 From: Liudmila Molkova Date: Tue, 23 Jan 2024 18:59:53 -0800 Subject: [PATCH] buffer content if content length is not available --- .../Program.cs | 71 +++++++++++++++---- 1 file changed, 58 insertions(+), 13 deletions(-) diff --git a/tools/http-fault-injector/Azure.Sdk.Tools.HttpFaultInjector/Program.cs b/tools/http-fault-injector/Azure.Sdk.Tools.HttpFaultInjector/Program.cs index 631a84f0698a..7c9d5bee413a 100644 --- a/tools/http-fault-injector/Azure.Sdk.Tools.HttpFaultInjector/Program.cs +++ b/tools/http-fault-injector/Azure.Sdk.Tools.HttpFaultInjector/Program.cs @@ -7,6 +7,7 @@ using Microsoft.Extensions.Primitives; using System; using System.Collections.Generic; +using System.Diagnostics; using System.IO; using System.Linq; using System.Net; @@ -224,6 +225,7 @@ private static async Task SendUpstreamRequest(HttpRequest requ Log($"StatusCode: {upstreamResponseMessage.StatusCode}"); Log("Headers:"); + foreach (var header in upstreamResponseMessage.Headers) { Log($" {header.Key}:{header.Value.First()}"); @@ -245,15 +247,15 @@ private static async Task SendUpstreamRequest(HttpRequest requ } Log("Reading upstream response body..."); - Log($"ContentLength: {upstreamResponseMessage.Content.Headers.ContentLength}"); - return new UpstreamResponse() - { - StatusCode = (int)upstreamResponseMessage.StatusCode, - Headers = headers.ToArray(), - Content = upstreamResponseMessage.Content, - ContentLength = upstreamResponseMessage.Content.Headers.ContentLength - }; + long? length = upstreamResponseMessage.Content.Headers.ContentLength; + Log($"ContentLength: {length}, is chunked: {upstreamResponseMessage.Headers.TransferEncodingChunked}"); + + var response = await UpstreamResponse.FromContent(upstreamResponseMessage.Content, cancellationToken); + response.StatusCode = (int)upstreamResponseMessage.StatusCode; + response.Headers = headers.ToArray(); + + return response; } private static async Task TryHandleResponseOption(string selection, HttpContext context, UpstreamResponse upstreamResponse) @@ -322,7 +324,7 @@ private static async Task SendDownstreamResponse(UpstreamResponse upstreamRespon Log($"Writing response body of {count} bytes..."); - using Stream source = await upstreamResponse.Content.ReadAsStreamAsync(cancellationToken); + using Stream source = await upstreamResponse.GetContentStreamAsync(cancellationToken); byte[] buffer = new byte[8192]; try @@ -351,7 +353,7 @@ private static async Task SendDownstreamResponse(UpstreamResponse upstreamRespon // disponse content as early as possible (before infinite wait that might happen later) // so that underlying connection returns to connection pool // and we won't run out of them - upstreamResponse.Content.Dispose(); + upstreamResponse.Dispose(); } } @@ -378,14 +380,57 @@ private static void Log(object value) private class UpstreamResponse : IDisposable { + private readonly HttpContent? _content = null; + private readonly Stream? _contentStream = null; + private UpstreamResponse(HttpContent content) + { + _content = content; + ContentLength = _content.Headers.ContentLength ?? throw new ArgumentNullException("ContentLength must not be null."); + } + + private UpstreamResponse(MemoryStream contentStream) + { + _contentStream = contentStream; + ContentLength = contentStream.Length; + } + public int StatusCode { get; set; } public KeyValuePair[] Headers { get; set; } - public HttpContent Content { get; set; } - public long? ContentLength { get; set; } + public async Task GetContentStreamAsync(CancellationToken cancellationToken) + { + return _contentStream != null ? _contentStream : await _content.ReadAsStreamAsync(cancellationToken); + } + + public long ContentLength { get; } public void Dispose() { - Content.Dispose(); + _content?.Dispose(); + _contentStream?.Dispose(); + } + + public static async Task FromContent(HttpContent content, CancellationToken cancellationToken) + { + if (content.Headers.ContentLength == null) + { + MemoryStream contentStream = await BufferContentAsync(content, cancellationToken); + // we no longer need that content and can let the connection go back to the pool. + content.Dispose(); + return new UpstreamResponse(contentStream); + } + + return new UpstreamResponse(content); + } + + private static async Task BufferContentAsync(HttpContent content, CancellationToken cancellationToken) + { + Debug.Assert(content.Headers.ContentLength == null, "We should not buffer content if length is available."); + + byte[] contentBytes = await content.ReadAsByteArrayAsync(cancellationToken); + Log($"Response does not have content length and is chunked or malformed. Content was buffered, total length - {contentBytes.Length}."); + + // comparing to buffering full content memory stream allocation is not such a big deal. + return new MemoryStream(contentBytes); } } }