Skip to content

Commit

Permalink
Fix IsSelfFilter not resolving properly when matched with `MockedCo…
Browse files Browse the repository at this point in the history
…nnectionPool` (#378)

* Clarify PoolBuilder and IsSelfFilter timing

* Use Synchronizer<T> instead of custom lock
  • Loading branch information
haga-rak authored Feb 4, 2025
1 parent 594f6a6 commit c930ba2
Show file tree
Hide file tree
Showing 11 changed files with 351 additions and 248 deletions.
2 changes: 1 addition & 1 deletion src/Fluxzy.Core/Clients/Dns/DnsOverHttpsResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public DnsOverHttpsResolver(string nameOrUrl, ProxyConfiguration? proxyConfigura
{
var key = new DnsCacheKey(type, hostName);

using var _ = await Synchronizer<DnsCacheKey>.Instance.LockAsync(key).ConfigureAwait(false);
using var _ = await Synchronizer<DnsCacheKey>.Shared.LockAsync(key).ConfigureAwait(false);

if (_cache.TryGetValue(key, out var cached))
{
Expand Down
20 changes: 4 additions & 16 deletions src/Fluxzy.Core/Clients/DnsUtility.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,17 @@ public DnsResolutionResult(IPEndPoint endPoint, DateTime dnsSolveStart, DateTime

public DateTime DnsSolveEnd { get; }
}

internal static class DnsUtility
{
public static async ValueTask<(DnsResolutionResult, MockedConnectionPool ?)>
public static async ValueTask<DnsResolutionResult>
ComputeDnsUpdateExchange(Exchange exchange,
ITimingProvider timingProvider, IDnsSolver dnsSolver,
ProxyRuntimeSetting? runtimeSetting)
{
var dnsSolveStart = timingProvider.Instant();
var connectHostName = exchange.Context.ProxyConfiguration?.Host ?? exchange.Authority.HostName;



var ipAddress = exchange.Context.RemoteHostIp ??
await dnsSolver.SolveDns(connectHostName).ConfigureAwait(false);

Expand All @@ -49,18 +48,7 @@ internal static class DnsUtility

var remoteEndPoint = new IPEndPoint(ipAddress, remotePort);

if (runtimeSetting != null) {

await runtimeSetting.EnforceRules(exchange.Context,
FilterScope.DnsSolveDone,
exchange.Connection, exchange).ConfigureAwait(false);

if (exchange.Context.PreMadeResponse != null)
return (new(remoteEndPoint, dnsSolveStart, dnsSolveEnd), new MockedConnectionPool(
exchange.Authority, exchange.Context.PreMadeResponse));
}

return (new(remoteEndPoint, dnsSolveStart, dnsSolveEnd), null);
return new(remoteEndPoint, dnsSolveStart, dnsSolveEnd);
}

public static async ValueTask<DnsResolutionResult>
Expand Down
54 changes: 27 additions & 27 deletions src/Fluxzy.Core/Clients/IHttpConnectionPool.cs
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
// Copyright 2021 - Haga Rakotoharivelo - https://github.com/haga-rak

using System;
using System.Threading;
using System.Threading.Tasks;
// Copyright 2021 - Haga Rakotoharivelo - https://github.com/haga-rak

using System;
using System.Threading;
using System.Threading.Tasks;
using Fluxzy.Core;
using Fluxzy.Misc.ResizableBuffers;

namespace Fluxzy.Clients
{
/// <summary>
/// Represents a connection pool to the same authority, using the same .
/// </summary>
public interface IHttpConnectionPool : IAsyncDisposable
{
Authority Authority { get; }

bool Complete { get; }

void Init();

ValueTask<bool> CheckAlive();

ValueTask Send(
Exchange exchange, ILocalLink localLink, RsBuffer buffer,
CancellationToken cancellationToken = default);
}
}
using Fluxzy.Misc.ResizableBuffers;

namespace Fluxzy.Clients
{
/// <summary>
/// Represents a connection pool to the same authority, using the same .
/// </summary>
public interface IHttpConnectionPool : IAsyncDisposable
{
Authority Authority { get; }

bool Complete { get; }

void Init();

ValueTask<bool> CheckAlive();

ValueTask Send(
Exchange exchange, ILocalLink localLink, RsBuffer buffer,
CancellationToken cancellationToken = default);
}
}
5 changes: 0 additions & 5 deletions src/Fluxzy.Core/Clients/IRemoteConnectionBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Security;
using System.Threading;
using System.Threading.Tasks;
using Fluxzy.Clients.Headers;
using Fluxzy.Clients.Ssl;
using Fluxzy.Core;
using Fluxzy.Misc.Streams;
using Fluxzy.Rules;

namespace Fluxzy.Clients
{
Expand Down Expand Up @@ -69,7 +66,6 @@ public async ValueTask<RemoteConnectionResult> OpenConnectionToRemote(
setting.ArchiveWriter != null!
? setting.ArchiveWriter.GetDumpfilePath(exchange.Connection.Id)!
: string.Empty);


var localEndpoint = await tcpConnection.ConnectAsync(
resolutionResult.EndPoint.Address,
Expand Down Expand Up @@ -145,7 +141,6 @@ await _sslConnectionBuilder.AuthenticateAsClient(
: RemoteConnectionResultType.Http11;

exchange.Connection.ReadStream = exchange.Connection.WriteStream = resultStream;


return new RemoteConnectionResult(protoType, exchange.Connection);
}
Expand Down
101 changes: 48 additions & 53 deletions src/Fluxzy.Core/Clients/PoolBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
using Fluxzy.Clients.Mock;
using Fluxzy.Core;
using Fluxzy.Misc;
using Fluxzy.Rules;
using Fluxzy.Utils;
using Fluxzy.Writers;

namespace Fluxzy.Clients
Expand All @@ -39,15 +41,15 @@ static PoolBuilder()

private readonly IDictionary<Authority, IHttpConnectionPool> _connectionPools =
new Dictionary<Authority, IHttpConnectionPool>();

private readonly ConcurrentDictionary<Authority, SemaphoreSlim> _lock = new();
private readonly CancellationTokenSource _poolCheckHaltSource = new();

private readonly RemoteConnectionBuilder _remoteConnectionBuilder;
private readonly ITimingProvider _timingProvider;

private readonly ConcurrentDictionary<string, DefaultDnsResolver> _dnsSolversCache = new();

private Synchronizer<Authority> _synchronizer = new(true);

public PoolBuilder(
RemoteConnectionBuilder remoteConnectionBuilder,
ITimingProvider timingProvider,
Expand Down Expand Up @@ -102,31 +104,29 @@ public async ValueTask<IHttpConnectionPool>
ProxyRuntimeSetting proxyRuntimeSetting,
CancellationToken cancellationToken = default)
{
// At this point, we'll trying the suitable pool for exchange

if (exchange.Context.PreMadeResponse != null) {
return new MockedConnectionPool(exchange.Authority,
exchange.Context.PreMadeResponse);
}

var dnsSolver = string.IsNullOrWhiteSpace(exchange.Context.DnsOverHttpsNameOrUrl) ?
_dnsSolver : _dnsSolversCache.GetOrAdd(exchange.Context.DnsOverHttpsNameOrUrl,
n => new DnsOverHttpsResolver(n, exchange.Context.DnsOverHttpsCapture ?
proxyRuntimeSetting.GetInternalProxyAuthentication() : null));
var dnsSolver = ResolveDnsProvider(exchange, proxyRuntimeSetting);

// We should solve DNS here
var computeDnsPromise =
DnsUtility.ComputeDnsUpdateExchange(exchange, _timingProvider, dnsSolver, proxyRuntimeSetting);

IHttpConnectionPool? result = null;
var dnsResolutionResult = await computeDnsPromise.ConfigureAwait(false);

var semaphorePerAuthority = _lock.GetOrAdd(exchange.Authority, auth => new SemaphoreSlim(1));
var released = false;
await proxyRuntimeSetting.EnforceRules(exchange.Context,
FilterScope.DnsSolveDone,
exchange.Connection, exchange).ConfigureAwait(false);

if (exchange.Context.PreMadeResponse != null) {
var mockedConnectionPool =
new MockedConnectionPool(exchange.Authority, exchange.Context.PreMadeResponse);
mockedConnectionPool.Init();
return mockedConnectionPool;
}

IHttpConnectionPool? result = null;

try
{
if (!semaphorePerAuthority.Wait(0))
await semaphorePerAuthority.WaitAsync(cancellationToken).ConfigureAwait(false);
using var _ = await _synchronizer.LockAsync(exchange.Authority);

var forceNewConnection = exchange.Context.ForceNewConnection;

Expand All @@ -136,48 +136,42 @@ public async ValueTask<IHttpConnectionPool>
// Looking for existing HttpPool

if (!forceNewConnection) {
lock (_connectionPools) {
while (_connectionPools.TryGetValue(exchange.Authority, out var pool)) {
lock (_connectionPools)
{
if (_connectionPools.TryGetValue(exchange.Authority, out var pool))
{
if (pool.Complete) {
_connectionPools.Remove(pool.Authority);

continue;
}
else {
if (exchange.Metrics.RetrievingPool == default)
exchange.Metrics.RetrievingPool = ITimingProvider.Default.Instant();

if (exchange.Metrics.RetrievingPool == default)
exchange.Metrics.RetrievingPool = ITimingProvider.Default.Instant();
exchange.Metrics.ReusingConnection = true;

exchange.Metrics.ReusingConnection = true;

return pool;
return pool;
}
}
}
}

if (exchange.Metrics.RetrievingPool == default)
exchange.Metrics.RetrievingPool = ITimingProvider.Default.Instant();

var dnsResolutionResult = await computeDnsPromise.ConfigureAwait(false);

if (dnsResolutionResult.Item2 != null)
{
dnsResolutionResult.Item2.Init();
return dnsResolutionResult.Item2;
}

// pool
if (exchange.Context.BlindMode) {
var tunneledConnectionPool = new TunnelOnlyConnectionPool(
exchange.Authority, _timingProvider,
_remoteConnectionBuilder, proxyRuntimeSetting, dnsResolutionResult.Item1);
_remoteConnectionBuilder, proxyRuntimeSetting, dnsResolutionResult);

return result = tunneledConnectionPool;
}

if (exchange.Request.Header.IsWebSocketRequest) {
var tunneledConnectionPool = new WebsocketConnectionPool(
exchange.Authority, _timingProvider,
_remoteConnectionBuilder, proxyRuntimeSetting, dnsResolutionResult.Item1);
_remoteConnectionBuilder, proxyRuntimeSetting, dnsResolutionResult);

return result = tunneledConnectionPool;
}
Expand All @@ -187,7 +181,7 @@ public async ValueTask<IHttpConnectionPool>

var http11ConnectionPool = new Http11ConnectionPool(exchange.Authority,
_remoteConnectionBuilder, _timingProvider, proxyRuntimeSetting,
_archiveWriter!, dnsResolutionResult.Item1);
_archiveWriter!, dnsResolutionResult);

exchange.HttpVersion = "HTTP/1.1";

Expand All @@ -209,7 +203,7 @@ public async ValueTask<IHttpConnectionPool>
{
openingResult =
(await _remoteConnectionBuilder.OpenConnectionToRemote(
exchange, dnsResolutionResult.Item1,
exchange, dnsResolutionResult,
exchange.Context.SslApplicationProtocols ?? AllProtocols, proxyRuntimeSetting,
exchange.Context.ProxyConfiguration,
cancellationToken).ConfigureAwait(false))!;
Expand All @@ -228,12 +222,11 @@ public async ValueTask<IHttpConnectionPool>
throw;
}

// exchange.Connection = openingResult.Connection;

if (openingResult.Type == RemoteConnectionResultType.Http11) {
var http11ConnectionPool = new Http11ConnectionPool(exchange.Authority,
_remoteConnectionBuilder, _timingProvider, proxyRuntimeSetting, _archiveWriter,
dnsResolutionResult.Item1);
dnsResolutionResult);

exchange.HttpVersion = exchange.Connection!.HttpVersion = "HTTP/1.1";

Expand Down Expand Up @@ -264,25 +257,27 @@ public async ValueTask<IHttpConnectionPool>
throw new NotSupportedException($"Unhandled protocol type {openingResult.Type}");
}
finally {
try {
if (result != null) {
released = true;
semaphorePerAuthority.Release();

if (result != null) {
try
{
result.Init();
}
}
catch {
if (result != null)
catch
{
OnConnectionFaulted(result);
}
finally {
if (!released)
semaphorePerAuthority.Release();
}
}
}
}

private IDnsSolver ResolveDnsProvider(Exchange exchange, ProxyRuntimeSetting proxyRuntimeSetting)
{
return string.IsNullOrWhiteSpace(exchange.Context.DnsOverHttpsNameOrUrl) ?
_dnsSolver : _dnsSolversCache.GetOrAdd(exchange.Context.DnsOverHttpsNameOrUrl,
n => new DnsOverHttpsResolver(n, exchange.Context.DnsOverHttpsCapture ?
proxyRuntimeSetting.GetInternalProxyAuthentication() : null));
}

private void OnConnectionFaulted(IHttpConnectionPool h2ConnectionPool)
{
lock (_connectionPools) {
Expand Down
Loading

0 comments on commit c930ba2

Please sign in to comment.