-
Notifications
You must be signed in to change notification settings - Fork 41
/
Copy pathclient_impl.dart
360 lines (307 loc) · 12 KB
/
client_impl.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
part of "../../client.dart";
/// Socket-backed implementation of [Client].
///
/// Owns the socket to the server, drives (re)connection attempts, routes every
/// decoded server message to the channel it is addressed to and tears the
/// connection down on fatal errors or when [close] is invoked.
class _ClientImpl implements Client {
  /// Largest channel id handed out by [channel].
  ///
  /// Channel ids travel on the wire as 16-bit unsigned integers and id 0 is
  /// reserved for signaling, so user channels live in the range 1..65535.
  static const int _maxChannelId = 0xFFFF;

  // Configuration options
  @override
  ConnectionSettings settings;

  // Tuning settings (a view on [settings])
  @override
  TuningSettings get tuningSettings => settings.tuningSettings;

  // The connection to the server
  int _connectionAttempt = 0;
  Socket? _socket;

  // The list of open channels. Channel 0 is always reserved for signaling
  final Map<int, _ChannelImpl> _channels = <int, _ChannelImpl>{};

  // Connection status.
  //
  // [_connected] is non-null from the first connection attempt until the
  // client is closed or all attempts failed. It is only ever completed with an
  // error in this class; the successful completion happens elsewhere
  // (presumably in the channel 0 handshake logic -- confirm in _ChannelImpl).
  //
  // [_clientClosed] is non-null only while a shutdown is in progress.
  Completer? _connected;
  Completer? _clientClosed;

  // Error Stream
  final _error = StreamController<Exception>.broadcast();

  // The _heartbeatRecvTimer is reset every time we receive _any_ message from
  // the server. If the timer expires, a HeartbeatFailedException is raised.
  //
  // The timer is set to a multiple of the negotiated interval to reset the
  // connection if we have not received any message from the server for a
  // consecutive number of maxMissedHeartbeats (see tuningSettings).
  RestartableTimer? _heartbeatRecvTimer;

  /// Creates a client using [settings], or the default [ConnectionSettings]
  /// when none are specified.
  _ClientImpl({ConnectionSettings? settings})
      : settings = settings ?? ConnectionSettings();

  /// Attempts to (re)connect to the server.
  ///
  /// If the attempt fails it is retried after `settings.reconnectWaitTime`,
  /// up to `settings.maxConnectionAttempts` times. If all connection attempts
  /// fail, the [_connected] future returned by a call to [connect] fails with
  /// a [ConnectionFailedException].
  Future _reconnect() {
    _connected ??= Completer();

    Future<Socket> fs;
    if (settings.tlsContext != null) {
      connectionLogger.info(
          "Trying to connect to ${settings.host}:${settings.port} using TLS [attempt ${_connectionAttempt + 1}/${settings.maxConnectionAttempts}]");
      fs = SecureSocket.connect(
        settings.host,
        settings.port,
        timeout: settings.connectTimeout,
        context: settings.tlsContext,
        onBadCertificate: settings.onBadCertificate,
      );
    } else {
      connectionLogger.info(
          "Trying to connect to ${settings.host}:${settings.port} [attempt ${_connectionAttempt + 1}/${settings.maxConnectionAttempts}]");
      fs = Socket.connect(settings.host, settings.port,
          timeout: settings.connectTimeout);
    }

    fs.then((Socket s) {
      _socket = s;

      // Bind processors and initiate handshake. A socket that is closed by the
      // peer is reported through the same path as any other stream error.
      RawFrameParser(tuningSettings)
          .transformer
          .bind(_socket!)
          .transform(AmqpMessageDecoder().transformer)
          .listen(_handleMessage,
              onError: _handleException,
              onDone: () =>
                  _handleException(const SocketException("Socket closed")));

      // Allocate channel 0 for handshaking. Creating it is what bootstraps the
      // handshake (presumably by transmitting the AMQP header -- that happens
      // inside _ChannelImpl, not here).
      _channels.clear();
      _channels.putIfAbsent(0, () => _ChannelImpl(0, this));
    }).catchError((err, trace) {
      // Connection attempt completed with an error (probably protocol mismatch)
      if (_connected!.isCompleted) {
        return;
      }

      if (++_connectionAttempt >= settings.maxConnectionAttempts) {
        String errorMessage =
            "Could not connect to ${settings.host}:${settings.port} after ${settings.maxConnectionAttempts} attempts. Giving up";
        connectionLogger.severe(errorMessage);
        _connected!.completeError(ConnectionFailedException(errorMessage));

        // Clear _connected future so the client can invoke connect() in the future
        _connected = null;
      } else {
        // Retry after reconnectWaitTime
        Timer(settings.reconnectWaitTime, _reconnect);
      }
    });

    return _connected!.future;
  }

  /// Whether a socket has been opened but the [_connected] future has not
  /// completed yet, i.e. the connection is still in its handshake phase.
  @override
  bool get handshaking =>
      _socket != null && _connected != null && !_connected!.isCompleted;

  /// Processes a single decoded message received from the server.
  ///
  /// Validates that the message is allowed on its channel, maintains the
  /// heartbeat timer and forwards the message to the channel it targets.
  /// Anything thrown while doing so is routed to [_handleException].
  void _handleMessage(DecodedMessage serverMessage) {
    try {
      // If we are still handshaking and we receive a message on another channel this is an error
      if (!_connected!.isCompleted && serverMessage.channel != 0) {
        throw FatalException(
            "Received message for channel ${serverMessage.channel} while still handshaking");
      }

      // Reset heartbeat timer if it has been initialized.
      _heartbeatRecvTimer?.reset();

      // Heartbeat frames should be received on channel 0
      if (serverMessage is HeartbeatFrameImpl) {
        if (serverMessage.channel != 0) {
          throw ConnectionException(
              "Received HEARTBEAT message on a channel > 0",
              ErrorType.COMMAND_INVALID,
              0,
              0);
        }

        // No further processing required.
        return;
      }

      // Connection-class messages should only be received on channel 0
      if (serverMessage.message!.msgClassId == 10 &&
          serverMessage.channel != 0) {
        throw ConnectionException(
            "Received CONNECTION class message on a channel > 0",
            ErrorType.COMMAND_INVALID,
            serverMessage.message!.msgClassId,
            serverMessage.message!.msgMethodId);
      }

      // If we got a ConnectionOpenOk message from the server and a heartbeat
      // period has been configured, start monitoring incoming heartbeats.
      if (serverMessage.message is ConnectionOpenOk &&
          tuningSettings.heartbeatPeriod.inSeconds > 0) {
        // Raise an exception if we miss maxMissedHeartbeats consecutive
        // heartbeats.
        Duration missInterval =
            tuningSettings.heartbeatPeriod * tuningSettings.maxMissedHeartbeats;
        _heartbeatRecvTimer?.cancel();
        _heartbeatRecvTimer = RestartableTimer(missInterval, () {
          // Set the timer to null to avoid accidentally resetting it while
          // shutting down.
          _heartbeatRecvTimer = null;
          _handleException(HeartbeatFailedException(
              "Server did not respond to heartbeats for ${tuningSettings.heartbeatPeriod.inSeconds}s (missed consecutive heartbeats: ${tuningSettings.maxMissedHeartbeats})"));
        });
      }

      // Fetch target channel and forward frame for processing
      _ChannelImpl? target = _channels[serverMessage.channel];
      if (target == null) {
        // message on unknown channel; ignore
        return;
      }

      // If we got a ConnectionClose message from the server, throw the appropriate exception
      if (serverMessage.message is ConnectionClose) {
        // Ack the closing of the connection
        _channels[0]!.writeMessage(ConnectionCloseOk());

        ConnectionClose serverResponse =
            (serverMessage.message as ConnectionClose);
        throw ConnectionException(
            serverResponse.replyText ?? "Server closed the connection",
            ErrorType.valueOf(serverResponse.replyCode),
            serverResponse.msgClassId,
            serverResponse.msgMethodId);
      }

      // Deliver to channel
      target.handleMessage(serverMessage);

      // If we got a ConnectionCloseOk message before a pending ChannelCloseOk message
      // force the other channels to close
      if (serverMessage.message is ConnectionCloseOk) {
        _channels.values
            .where((_ChannelImpl channel) =>
                channel._channelClosed != null &&
                !channel._channelClosed!.isCompleted)
            .forEach((_ChannelImpl channel) =>
                channel._completeOperation(serverMessage.message));
      }
    } catch (e) {
      _handleException(e);
    }
  }

  /// Central error handler for socket errors, protocol violations and
  /// heartbeat timeouts.
  ///
  /// While handshaking, the error fails the [_connected] future and the client
  /// shuts down. Afterwards the error is published on the error stream and
  /// forwarded to the affected channel(s); connection-level errors also shut
  /// the client down. Errors raised while a shutdown is in progress are
  /// ignored.
  void _handleException(ex) {
    // Ignore exceptions while shutting down
    if (_clientClosed != null) {
      return;
    }

    // If we are still handshaking, it could be that the server disconnected us
    // due to a failed SASL auth attempt. In this case we should trigger a connection
    // exception
    if (ex is SocketException) {
      // Wrap the exception
      if (handshaking &&
          _channels.containsKey(0) &&
          (_channels[0]!._lastHandshakeMessage is ConnectionStartOk ||
              _channels[0]!._lastHandshakeMessage is ConnectionSecureOk)) {
        ex = FatalException("Authentication failed");
      } else {
        ex = FatalException("Lost connection to the server");
      }
    }

    connectionLogger.severe(ex);

    // If we are still handshaking, abort the connection; flush the channels and shut down
    if (handshaking) {
      _channels.clear();
      _connected!.completeError(ex);
      _close();
      return;
    }

    if (_error.hasListener && !_error.isClosed) {
      if (ex is Exception) {
        _error.add(ex);
      } else {
        // _handleMessage catches everything, so [ex] may be an Error (or any
        // other object) rather than an Exception. Adding it as a data event
        // would fail the implicit cast to Exception and hide the original
        // problem, so it is reported as a stream error instead.
        _error.addError(ex);
      }
    }

    // NOTE: this matches on the exact runtime type, so instances of subclasses
    // of the types below are not dispatched.
    switch (ex.runtimeType) {
      case HeartbeatFailedException:
      case FatalException:
      case ConnectionException:
        // Forward to all channels and then shutdown
        _channels.values
            .toList()
            .reversed
            .forEach((_ChannelImpl channel) => channel.handleException(ex));

        _close();
        break;
      case ChannelException:
        // Forward to the appropriate channel and remove it from our list
        _ChannelImpl? target = _channels[ex.channel];
        if (target != null) {
          target.handleException(ex);
          _channels.remove(ex.channel);
        }

        break;
    }
  }

  /// Opens a connection to the server using the current [settings].
  ///
  /// If a connection is already established or being established, the future
  /// of that connection is returned instead of starting a new attempt. The
  /// returned [Future] fails with a [ConnectionFailedException] when all
  /// connection attempts fail (see [_reconnect]).
  @override
  Future connect() {
    // Prevent multiple connection attempts
    if (_connected != null) {
      return _connected!.future;
    }

    _connectionAttempt = 0;
    return _reconnect();
  }

  /// Shuts down any open channels, disconnects the socket and closes the error
  /// stream. Returns a [Future] to be completed when the client has shut down.
  @override
  Future close() {
    return _close(closeErrorStream: true);
  }

  /// Shuts down any open channels and disconnects the socket.
  ///
  /// The error stream is closed as well when [closeErrorStream] is `true`.
  /// Returns immediately if there is no socket, and returns the pending
  /// shutdown future if a shutdown is already in progress.
  Future _close({bool closeErrorStream = false}) {
    _heartbeatRecvTimer?.cancel();
    _heartbeatRecvTimer = null;

    final socket = _socket;
    if (socket == null) {
      return Future.value();
    }

    // Already shutting down
    if (_clientClosed != null) {
      return _clientClosed!.future;
    }

    // Close all channels in reverse order so we send a connection close message when we close channel 0
    final closed = _clientClosed = Completer();
    Future.wait(_channels.values
            .toList()
            .reversed
            .map((_ChannelImpl channel) => channel.close()))
        .then((_) => socket.flush())
        .then((_) => socket.close())
        .catchError((e) {
      // Mute exceptions as the socket may be already closed. This handler is
      // attached after socket.close() on purpose: an onError callback passed
      // to the same then() call would not see a failure of close() itself.
    }).whenComplete(() {
      socket.destroy();
      _socket = null;
      _connected = null;
      if (closeErrorStream) {
        _error.close();
      }
      closed.complete();
      _clientClosed = null;
    });

    return closed.future;
  }

  /// Allocates a new channel, connecting to the server first if necessary.
  ///
  /// The lowest unused channel id is assigned. The returned [Future] fails
  /// with a [StateError] if the configured channel limit has been reached or
  /// if every channel id is in use.
  @override
  Future<Channel> channel() {
    return connect().then((_) {
      // Check if we have exceeded our channel limit (open channels excluding channel 0)
      if (tuningSettings.maxChannels > 0 &&
          _channels.length - 1 >= tuningSettings.maxChannels) {
        return Future.error(StateError(
            "Cannot allocate channel; channel limit exceeded (max ${tuningSettings.maxChannels})"));
      }

      // Find next available channel. Id 0 is reserved and ids above
      // _maxChannelId cannot be represented on the wire.
      _ChannelImpl? userChannel;
      for (var channelId = 1; channelId <= _maxChannelId; channelId++) {
        if (!_channels.containsKey(channelId)) {
          // Found empty slot
          userChannel = _ChannelImpl(channelId, this);
          _channels[channelId] = userChannel;
          break;
        }
      }

      // Run out of slots?
      if (userChannel == null) {
        return Future.error(StateError(
            "Cannot allocate channel; all channels are currently in use"));
      }

      return userChannel._channelOpened.future;
    });
  }

  /// Subscribes to the exceptions published by this client.
  ///
  /// The arguments are passed through unchanged to `Stream.listen` on the
  /// broadcast error stream.
  @override
  StreamSubscription<Exception> errorListener(
          void Function(Exception error) onData,
          {Function? onError,
          void Function()? onDone,
          bool cancelOnError = false}) =>
      _error.stream.listen(onData,
          onError: onError, onDone: onDone, cancelOnError: cancelOnError);

  /// Removes the channel registered under [channelId] and returns it.
  ///
  /// Throws if no channel is registered under [channelId] (the result of the
  /// removal is null-asserted).
  _ChannelImpl _removeChannel(int channelId) => _channels.remove(channelId)!;
}