Skip to content

Commit

Permalink
Merge pull request #111 from achilleasa/track-missed-heartbeats
Browse files Browse the repository at this point in the history
Raise HeartbeatFailedException after missing maxMissedHeartbeats (new tuning param) consecutive heartbeats
  • Loading branch information
achilleasa authored Jan 28, 2024
2 parents 6cdc538 + 9e52ca9 commit 019d59a
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 18 deletions.
11 changes: 6 additions & 5 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ you don't have to deal with tuning at the application level. The client will use

The class exposes the following parameters:

| Param name | Default value | Description
|-----------------|------------------|----------------------
| maxChannels | 0 (no limit) | The maximum number of channels that can be opened by the client. When set to zero, the maximum number of channels is 65535.
| maxFrameSize | 4096 bytes | The maximum frame size that can be parsed by the client. According to the spec, this is set to a high-enough initial value so that the client can parse the messages exchanged during the handshake. The client will negotiate with the server during the handshake phase and adjust this value upwards.
| heartbeatPeriod | 0 sec | The preferred heartbeat period (or `Duration.zero` to disable heartbeats) expressed as a [Duration](https://api.dartlang.org/apidocs/channels/stable/dartdoc-viewer/dart:core.Duration) object.
| Param name | Default value | Description
|---------------------|---------------|
| maxChannels | 0 (no limit) | The maximum number of channels that can be opened by the client. When set to zero, the maximum number of channels is 65535.
| maxFrameSize | 4096 bytes | The maximum frame size that can be parsed by the client. According to the spec, this is set to a high-enough initial value so that the client can parse the messages exchanged during the handshake. The client will negotiate with the server during the handshake phase and adjust this value upwards.
| heartbeatPeriod | 0 sec | The preferred heartbeat period (or `Duration.zero` to disable heartbeats) expressed as a [Duration](https://api.dartlang.org/apidocs/channels/stable/dartdoc-viewer/dart:core.Duration) object.
| maxMissedHeartbeats | 3 | If heartbeats are enabled, raise an exception if `maxMissedHeartbeats` consecutive heartbeats have been sent by the client without receiving any response from the broker.


### Creating a new client
Expand Down
16 changes: 12 additions & 4 deletions lib/src/client/impl/client_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ class _ClientImpl implements Client {
final _error = StreamController<Exception>.broadcast();

// The heartbeattRecvTimer is reset every time we receive _any_ message from
// the server. If the timer expires, a HeartbeatFailed exception will be raised.
// the server. If the timer expires, and a HeartbeatFailed exception will be
// raised.
//
// The timer is set to a multiple of the negotiated interval to reset the
// connection if we have not received any message from the server for a
// consecutive number of maxMissedHeartbeats (see tuningSettings).
RestartableTimer? _heartbeatRecvTimer;

_ClientImpl({ConnectionSettings? settings}) {
Expand Down Expand Up @@ -138,14 +143,17 @@ class _ClientImpl implements Client {
// period has been configured, start monitoring incoming heartbeats.
if (serverMessage.message is ConnectionOpenOk &&
tuningSettings.heartbeatPeriod.inSeconds > 0) {
// Raise an exception if we miss maxMissedHeartbeats consecutive
// heartbeats.
Duration missInterval =
tuningSettings.heartbeatPeriod * tuningSettings.maxMissedHeartbeats;
_heartbeatRecvTimer?.cancel();
_heartbeatRecvTimer =
RestartableTimer(tuningSettings.heartbeatPeriod, () {
_heartbeatRecvTimer = RestartableTimer(missInterval, () {
// Set the timer to null to avoid accidentally resetting it while
// shutting down.
_heartbeatRecvTimer = null;
_handleException(HeartbeatFailedException(
"Server did not respond to heartbeats for ${tuningSettings.heartbeatPeriod.inSeconds}s"));
"Server did not respond to heartbeats for ${tuningSettings.heartbeatPeriod.inSeconds}s (missed consecutive heartbeats: ${tuningSettings.maxMissedHeartbeats})"));
});
}

Expand Down
8 changes: 5 additions & 3 deletions lib/src/protocol/io/tuning_settings.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ class TuningSettings {
// min(client hb period, server hb period). In other words, clients may force
// a lower heartbeat period but they are never allowed to increase it beyond
// the value suggested by the remote server.
//
Duration heartbeatPeriod = Duration.zero;

// When a non-zero heartbeat period is negotiated with the remote server, a
// [HeartbeatFailedException] will be raised if the server does not respond
// to heartbeat requests within the configured heartbeat period.
Duration heartbeatPeriod = Duration.zero;
// to [maxMissedHeartbeats] consecutive heartbeat requests.
int maxMissedHeartbeats = 3;

TuningSettings({
this.maxChannels = 0,
this.maxFrameSize = 4096,
this.heartbeatPeriod = Duration.zero,
this.maxMissedHeartbeats = 3,
});
}
4 changes: 2 additions & 2 deletions test/lib/auth_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ main({bool enableLogger = true}) {
tuningSettings = TuningSettings();
frameWriter = FrameWriter(tuningSettings);
server = mock.MockServer();
client = Client(settings: ConnectionSettings(port: 9000));
return server.listen('127.0.0.1', 9000);
client = Client(settings: ConnectionSettings(port: 9001));
return server.listen('127.0.0.1', 9001);
});

tearDown(() async {
Expand Down
9 changes: 6 additions & 3 deletions test/lib/client_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,15 @@ main({bool enableLogger = true}) {
expect(client.tuningSettings.heartbeatPeriod,
equals(const Duration(seconds: 1)));

// Perform a blocking call until the heartbeat timer expires.
// Perform a blocking call until we miss
// tuningSettings.maxMissedHeartbeats consecutive heartbeats.
await client.channel();
} catch (e) {
expect(e, const TypeMatcher<HeartbeatFailedException>());
expect((e as HeartbeatFailedException).message,
equals("Server did not respond to heartbeats for 1s"));
expect(
(e as HeartbeatFailedException).message,
equals(
"Server did not respond to heartbeats for 1s (missed consecutive heartbeats: 3)"));

// Encode final connection close
frameWriter.writeMessage(0, mock.ConnectionCloseOkMock());
Expand Down
2 changes: 1 addition & 1 deletion test/lib/mocks/server_mocks.dart
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class MockServer {
Future listen(String host, int port) async {
mockLogger.info("Binding MockServer to $host:$port");

_server = await ServerSocket.bind(host, port);
_server = await ServerSocket.bind(host, port, shared: true);
mockLogger.info("[$host:$port] Listening for incoming connections");
_server!.listen(_handleConnection);
}
Expand Down

0 comments on commit 019d59a

Please sign in to comment.