-
Notifications
You must be signed in to change notification settings - Fork 10.1k
/
HttpConnectionManager.cs
222 lines (194 loc) · 8.61 KB
/
HttpConnectionManager.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.IO.Pipelines;
using System.Net.WebSockets;
using System.Security.Cryptography;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Internal;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Microsoft.AspNetCore.Http.Connections.Internal
{
internal partial class HttpConnectionManager
{
// Interval used for the heartbeat timer; passed as both arguments when the timer is constructed.
// TODO: Consider making this configurable? At least for testing?
private static readonly TimeSpan _heartbeatTickRate = TimeSpan.FromSeconds(1);
// Tracked connections, keyed by connection token (ordinal comparison). Each entry pairs the
// connection with the timer value returned when its start event was recorded.
private readonly ConcurrentDictionary<string, (HttpConnectionContext Connection, ValueStopwatch Timer)> _connections =
new ConcurrentDictionary<string, (HttpConnectionContext Connection, ValueStopwatch Timer)>(StringComparer.Ordinal);
// Awaited by the timer loop to pace connection scans; stopped when connections are closed.
private readonly TimerAwaitable _nextHeartbeat;
// Logger for the manager's own events.
private readonly ILogger<HttpConnectionManager> _logger;
// Logger handed to every HttpConnectionContext this manager creates.
private readonly ILogger<HttpConnectionContext> _connectionLogger;
// How long a connection may remain inactive before a scan disposes it; taken from the
// configured options, falling back to the default when none is set.
private readonly TimeSpan _disconnectTimeout;
/// <summary>
/// Creates the manager's loggers and heartbeat timer, resolves the disconnect timeout, and
/// hooks the heartbeat loop and connection shutdown into the application lifetime.
/// </summary>
/// <param name="loggerFactory">Factory used to create the manager logger and the per-connection logger.</param>
/// <param name="appLifetime">Lifetime whose started/stopping notifications trigger <see cref="Start"/> and <see cref="CloseConnections"/>.</param>
/// <param name="connectionOptions">Options providing the disconnect timeout; the default timeout is used when none is configured.</param>
public HttpConnectionManager(ILoggerFactory loggerFactory, IHostApplicationLifetime appLifetime, IOptions<ConnectionOptions> connectionOptions)
{
    _logger = loggerFactory.CreateLogger<HttpConnectionManager>();
    _connectionLogger = loggerFactory.CreateLogger<HttpConnectionContext>();
    _nextHeartbeat = new TimerAwaitable(_heartbeatTickRate, _heartbeatTickRate);
    _disconnectTimeout = connectionOptions.Value.DisconnectTimeout ?? ConnectionOptionsSetup.DefaultDisconectTimeout;

    // The lifetime tokens may already be signalled, in which case the callbacks run
    // synchronously inside Register. Every field must therefore be assigned before this point.
    appLifetime.ApplicationStarted.Register(Start);
    appLifetime.ApplicationStopping.Register(CloseConnections);
}
/// <summary>
/// Starts the heartbeat timer and kicks off the loop that scans connections on every tick.
/// </summary>
public void Start()
{
_nextHeartbeat.Start();
// Start the timer loop. The returned task is deliberately discarded: the loop runs in the
// background and is not awaited here.
_ = ExecuteTimerLoop();
}
/// <summary>
/// Looks up a tracked connection by the key it was registered under.
/// </summary>
/// <param name="id">Key of the connection to find.</param>
/// <param name="connection">The matching connection when one is found; otherwise <c>null</c>.</param>
/// <returns><c>true</c> when a connection is registered under <paramref name="id"/>; otherwise <c>false</c>.</returns>
internal bool TryGetConnection(string id, [NotNullWhen(true)] out HttpConnectionContext? connection)
{
    if (!_connections.TryGetValue(id, out var entry))
    {
        connection = null;
        return false;
    }

    connection = entry.Connection;
    return true;
}
/// <summary>
/// Creates and registers a connection, using <see cref="PipeOptions.Default"/> for both pipes.
/// </summary>
/// <returns>The newly created connection.</returns>
internal HttpConnectionContext CreateConnection()
    => CreateConnection(PipeOptions.Default, PipeOptions.Default);
/// <summary>
/// Creates a connection, attaches a newly created duplex pipe pair to it, and registers it
/// with this manager under its connection token.
/// </summary>
/// <param name="transportPipeOptions">Pipe options forwarded as the first argument when creating the pipe pair.</param>
/// <param name="appPipeOptions">Pipe options forwarded as the second argument when creating the pipe pair.</param>
/// <param name="negotiateVersion">
/// When greater than zero, a separate random connection token is generated; otherwise the
/// connection id is also used as the token.
/// </param>
/// <returns>The newly created connection.</returns>
internal HttpConnectionContext CreateConnection(PipeOptions transportPipeOptions, PipeOptions appPipeOptions, int negotiateVersion = 0)
{
    var id = MakeNewConnectionId();
    var connectionToken = negotiateVersion > 0 ? MakeNewConnectionId() : id;

    Log.CreatedNewConnection(_logger, id);
    var startTimer = HttpConnectionsEventSource.Log.ConnectionStart(id);

    var connection = new HttpConnectionContext(id, connectionToken, _connectionLogger);

    // Each side of the connection gets the opposite end of the pair.
    var pipes = DuplexPipe.CreateConnectionPair(transportPipeOptions, appPipeOptions);
    connection.Transport = pipes.Application;
    connection.Application = pipes.Transport;

    // Connections are looked up and removed by token, not by id.
    _connections.TryAdd(connectionToken, (connection, startTimer));

    return connection;
}
/// <summary>
/// Stops tracking the connection registered under the given key and records its removal.
/// Does nothing when no connection is registered under that key.
/// </summary>
/// <param name="id">
/// Key the connection was registered under. Connections are stored by connection token, which
/// differs from the connection id when a separate token was generated at creation time.
/// </param>
public void RemoveConnection(string id)
{
    if (_connections.TryRemove(id, out var pair))
    {
        // Report the connection id rather than the lookup key: the start event and creation
        // log were emitted with the connection id, so this keeps start/stop correlated and
        // avoids writing the connection token to diagnostics.
        var connectionId = pair.Connection.ConnectionId;

        HttpConnectionsEventSource.Log.ConnectionStop(connectionId, pair.Timer);
        Log.RemovedConnection(_logger, connectionId);
    }
}
/// <summary>
/// Generates a new identifier from 128 bits of cryptographically random data, encoded as
/// base64url text.
/// </summary>
/// <returns>The encoded identifier.</returns>
private static string MakeNewConnectionId()
{
    // 128 bits expressed in bytes
    const int IdSizeInBytes = 128 / 8;

    // A GUID is not cryptographically random, so the bytes come from the crypto RNG instead
    Span<byte> randomBytes = stackalloc byte[IdSizeInBytes];
    RandomNumberGenerator.Fill(randomBytes);

    return WebEncoders.Base64UrlEncode(randomBytes);
}
/// <summary>
/// Runs the heartbeat loop: scans the tracked connections on every timer tick until the timer
/// is stopped, then disposes the timer.
/// </summary>
/// <returns>A task that completes once the loop has ended.</returns>
private async Task ExecuteTimerLoop()
{
    Log.HeartBeatStarted(_logger);

    // The timer is disposed only after the loop consuming its ticks has finished
    using (_nextHeartbeat)
    {
        // Awaiting the TimerAwaitable yields true until Stop is called
        while (await _nextHeartbeat)
        {
            ScanAndLogFailure();
        }
    }

    Log.HeartBeatEnded(_logger);

    // A failing scan is logged and must not end the heartbeat loop.
    void ScanAndLogFailure()
    {
        try
        {
            Scan();
        }
        catch (Exception ex)
        {
            Log.ScanningConnectionsFailed(_logger, ex);
        }
    }
}
/// <summary>
/// Walks every tracked connection once. Connections that have been inactive for longer than
/// the disconnect timeout are disposed gracefully and removed; all others get a send-timeout
/// check and a heartbeat tick. No connection is timed out while a debugger is attached.
/// </summary>
public void Scan()
{
    foreach (var entry in _connections)
    {
        var connection = entry.Value.Connection;

        // Snapshot the connection state before deciding anything
        var inactiveSince = connection.LastSeenUtcIfInactive;
        var now = DateTimeOffset.UtcNow;

        // The decision is made once from the snapshot and is not re-checked afterwards.
        // Connections are never cleaned up while the debugger is attached.
        var timedOut = !Debugger.IsAttached
            && inactiveSince.HasValue
            && (now - inactiveSince.Value).TotalSeconds > _disconnectTimeout.TotalSeconds;

        if (!timedOut)
        {
            if (!Debugger.IsAttached)
            {
                connection.TryCancelSend(now.Ticks);
            }

            // The connection is still considered alive, so tick its heartbeat
            connection.TickHeartbeat();
            continue;
        }

        Log.ConnectionTimedOut(_logger, connection.ConnectionId);
        HttpConnectionsEventSource.Log.ConnectionTimedOut(connection.ConnectionId);

        // Most likely a long polling connection whose poll completed and which then stayed
        // inactive past the disconnect timeout, so let the application finish gracefully.
        // Disposal runs in the background; the scan does not wait for it.
        _ = DisposeAndRemoveAsync(connection, closeGracefully: true);
    }
}
/// <summary>
/// Stops the heartbeat timer and disposes every tracked connection without waiting for the
/// application side to close, blocking for at most five seconds while the disposals complete.
/// </summary>
public void CloseConnections()
{
    // No more heartbeat ticks from here on
    _nextHeartbeat.Stop();

    var shutdownGracePeriod = TimeSpan.FromSeconds(5);
    var pendingDisposals = new List<Task>(_connections.Count);

    // REVIEW: A future hybrid could first wait for a graceful shutdown for some time frame and
    // only shut down more aggressively once that grace period has passed
    foreach (var entry in _connections)
    {
        // This is a shutdown, so closing the application is not waited for
        pendingDisposals.Add(DisposeAndRemoveAsync(entry.Value.Connection, closeGracefully: false));
    }

    Task.WaitAll(pendingDisposals.ToArray(), shutdownGracePeriod);
}
/// <summary>
/// Disposes the connection and then removes it from this manager. Disposal failures are logged
/// rather than propagated, and the connection is removed regardless of how disposal ended.
/// </summary>
/// <param name="connection">The connection to dispose and stop tracking.</param>
/// <param name="closeGracefully">Forwarded to the connection's disposal.</param>
/// <returns>A task that completes once the connection has been disposed and removed.</returns>
internal async Task DisposeAndRemoveAsync(HttpConnectionContext connection, bool closeGracefully)
{
    try
    {
        await connection.DisposeAsync(closeGracefully);
    }
    catch (Exception ex) when (ex is IOException || (ex is WebSocketException && ex.InnerException is IOException))
    {
        // An I/O failure, either direct or wrapped by the WebSocket layer, is reported as a reset
        Log.ConnectionReset(_logger, connection.ConnectionId, ex);
    }
    catch (Exception ex)
    {
        Log.FailedDispose(_logger, connection.ConnectionId, ex);
    }
    finally
    {
        // Removal happens only after disposal so that connections stuck in a hung state
        // remain visible in the connections list
        RemoveConnection(connection.ConnectionToken);
    }
}
}
}