Skip to content

Commit

Permalink
Rewrite failtinjector to use ILogger and built-in logging
Browse files Browse the repository at this point in the history
  • Loading branch information
lmolkova committed Dec 19, 2023
1 parent 0ea9091 commit 0f7805f
Show file tree
Hide file tree
Showing 6 changed files with 340 additions and 309 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@

<ItemGroup>
<PackageReference Include="CommandLineParser" Version="2.8.0" />
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.7.0" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
using System.Threading.Tasks;
using System.Threading;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using System;
using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Primitives;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Sockets;
using System.Reflection;
using System.Linq;

namespace Azure.Sdk.Tools.HttpFaultInjector
{
[Route("{**catchAll}")]
[ApiController]
public partial class FaultInjectingController : ControllerBase
{
private readonly ILogger<FaultInjectingController> _logger;
private readonly HttpClient _httpClient;
public FaultInjectingController(IHttpClientFactory httpClientFactory, ILogger<FaultInjectingController> logger)
{
this._logger = logger;
this._httpClient = httpClientFactory.CreateClient("upstream");
}

[HttpGet]
[HttpPut]
[HttpPost]
[HttpDelete]
[HttpOptions]
[HttpPatch]
[HttpHead]
public async Task Proxy(
[FromHeader(Name = Utils.ResponseSelectionHeader)] string faultHeaderValue,
[FromHeader(Name = Utils.UpstreamBaseUriHeader)] string upstreamBaseUri,
CancellationToken cancellationToken)
{
if (!ValidateOrReadFaultMode(faultHeaderValue, out var fault))
{
Response.StatusCode = 400;
return;
}

await ProxyResponse(upstreamBaseUri, fault, cancellationToken);
}


private async Task<UpstreamResponse> SendUpstreamRequest(string uri)
{
var incomingUriBuilder = new UriBuilder()
{
Scheme = Request.Scheme,
Host = Request.Host.Host,
Path = Request.Path.Value,
Query = Request.QueryString.Value,
};
if (Request.Host.Port.HasValue)
{
incomingUriBuilder.Port = Request.Host.Port.Value;
}
var incomingUri = incomingUriBuilder.Uri;

var upstreamUriBuilder = new UriBuilder(uri)
{
Path = Request.Path.Value,
Query = Request.QueryString.Value,
};

var upstreamUri = upstreamUriBuilder.Uri;

using (var upstreamRequest = new HttpRequestMessage(new HttpMethod(Request.Method), upstreamUri))
{
if (Request.ContentLength > 0)
{
upstreamRequest.Content = new StreamContent(Request.Body);
foreach (var header in Request.Headers.Where(h => Utils.ContentRequestHeaders.Contains(h.Key)))
{
upstreamRequest.Content.Headers.Add(header.Key, values: header.Value);
}
}

foreach (var header in Request.Headers.Where(h => !Utils.ExcludedRequestHeaders.Contains(h.Key) && !Utils.ContentRequestHeaders.Contains(h.Key)))
{
if (!upstreamRequest.Headers.TryAddWithoutValidation(header.Key, values: header.Value))
{
throw new InvalidOperationException($"Could not add header {header.Key} with value {header.Value}");
}
}

using (var upstreamResponseMessage = await _httpClient.SendAsync(upstreamRequest))
{
var headers = new List<KeyValuePair<string, IEnumerable<string>>>();
// Must skip "Transfer-Encoding" header, since if it's set manually Kestrel requires you to implement
// your own chunking.
headers.AddRange(upstreamResponseMessage.Headers.Where(header => !string.Equals(header.Key, "Transfer-Encoding", StringComparison.OrdinalIgnoreCase)));
headers.AddRange(upstreamResponseMessage.Content.Headers);

var upstreamResponse = new UpstreamResponse()
{
StatusCode = (int)upstreamResponseMessage.StatusCode,
Headers = headers.Select(h => new KeyValuePair<string, StringValues>(h.Key, h.Value.ToArray())),
Content = await upstreamResponseMessage.Content.ReadAsByteArrayAsync()
};

_logger.LogInformation("Finished reading response body ({length})", upstreamResponse.Content.Length);

return upstreamResponse;
}
}
}

private async Task ProxyResponse(string upstreamUri, string fault, CancellationToken cancellationToken)
{
UpstreamResponse upstreamResponse = await SendUpstreamRequest(upstreamUri);
switch (fault)
{
case "f":
// Full response
await SendDownstreamResponse(upstreamResponse, upstreamResponse.Content.Length);
return;
case "p":
// Partial Response (full headers, 50% of body), then wait indefinitely
await SendDownstreamResponse(upstreamResponse, upstreamResponse.Content.Length / 2);
await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken);
return;
case "pc":
// Partial Response (full headers, 50% of body), then close (TCP FIN)
await SendDownstreamResponse(upstreamResponse, upstreamResponse.Content.Length / 2);
Close();
return;
case "pa":
// Partial Response (full headers, 50% of body), then abort (TCP RST)
await SendDownstreamResponse(upstreamResponse, upstreamResponse.Content.Length / 2);
Abort();
return;
case "pn":
// Partial Response (full headers, 50% of body), then finish normally
await SendDownstreamResponse(upstreamResponse, upstreamResponse.Content.Length / 2);
return;
case "n":
// No response, then wait indefinitely
await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken);
return;
case "nc":
// No response, then close (TCP FIN)
Close();
return;
case "na":
// No response, then abort (TCP RST)
Abort();
return;
default:
// can't really happen since we validated options before calling into this method.
throw new ArgumentException($"Invalid fault mode: {fault}", nameof(fault));
}
}

private async Task SendDownstreamResponse(UpstreamResponse upstreamResponse, int contentBytes)
{
Response.StatusCode = upstreamResponse.StatusCode;
foreach (var header in upstreamResponse.Headers)
{
Response.Headers.Add(header.Key, header.Value);
}

_logger.LogInformation("Started writing response body, {actualLength}", contentBytes);
try
{
await Response.Body.WriteAsync(upstreamResponse.Content, 0, contentBytes);
}
catch (Exception ex)
{
_logger.LogError(ex, "Can't write response body");
}
_logger.LogInformation("Finished writing response body");
}

// Close the TCP connection by sending FIN
private void Close()
{
HttpContext.Abort();
}

// Abort the TCP connection by sending RST
private void Abort()
{
// SocketConnection registered "this" as the IConnectionIdFeature among other things.
var socketConnection = HttpContext.Features.Get<IConnectionIdFeature>();
var socket = (Socket)socketConnection.GetType().GetField("_socket", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(socketConnection);
socket.LingerState = new LingerOption(true, 0);
socket.Dispose();
}

private bool ValidateOrReadFaultMode(string headerValue, out string fault)
{
fault = headerValue ?? Utils.ReadSelectionFromConsole();
if (!Utils.FaultModes.TryGetValue(fault, out var description))
{
_logger.LogError("Unknown {ResponseSelectionHeader} value - {fault}.", Utils.ResponseSelectionHeader, fault);
return false;
}

_logger.LogInformation("Using response option '{description}' from header value.", description);
return true;
}
}
}
Loading

0 comments on commit 0f7805f

Please sign in to comment.