Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

errorListener and heartbeat do not work if the connection was closed by server and reestablished again #107

Closed
Boris-Plato opened this issue Dec 1, 2023 · 2 comments · Fixed by #112

Comments

@Boris-Plato
Copy link

Hi! I'm trying to improve our reconnect on rabbit errors (right now, we just shut everything down and restart). I saw in another issue that for network issues, you recommended using the .connect() and recreating the topology. This approach sounds good to me, and it works for channelExeptions, but making my tests I noticed that errorListener does not really work when the connection was closed by the server, and I established a new connection using the same Client object. Here is a code example:

import 'package:dart_amqp/dart_amqp.dart';

var client = Client(settings: ConnectionSettings()
  ..host = "localhost"
  ..authProvider = PlainAuthenticator("guest", "guest")
  ..port = 35672
  ..tuningSettings = TuningSettings(
    heartbeatPeriod: const Duration(seconds: 30),
  ));

void main(List<String> arguments) async {
  client.connect();
  client.errorListener((e) {print("client error: $e");reconnect();}, onError: (e) => print("client onError: $e"), onDone: () => print("client onDone"));
  print("client connected");
  await Future.delayed(Duration(seconds: 360));
}

Future<void> reconnect() async {
  await Future.delayed(Duration(microseconds: 500));
  client.connect();
  client.errorListener((e) {print("client error: $e");reconnect();}, onError: (e) => print("client onError: $e"), onDone: () => print("client onDone"));
  print("client reconnected");
}

I used this in pubspec to get the latest commit of the client:

  dart_amqp:
    git: "https://github.com/achilleasa/dart_amqp"

Running this script, I get the connection in rabbit. Then I force close it in the management console, and a new connection appears soon. But if I try to force close it again - I do not get ConnectionException in errorListener, so the reconnect does not work the second time. It looks like that errorListener does not work after the connection error even while the new connection was created, and I can open a channel, send/receive messages (I did not add this part to the code example). The heartbeat also seems not working in this scenario because the new established connection disappears after 30 seconds and stays long if I remove the heartbeat timeout (and it is set to 0 by default).

Here is the console output (including second time forced closed connection wich does not add new logs):

client connected
client error: ConnectionException(CONNECTION_FORCED): CONNECTION_FORCED - Closed via management plugin
client onDone
client reconnected
client onDone

I was thinking a lot about is it correct to use the same client after the hardly closed connection and connectionException was caught. But still decided to create this ticket since I need to introduce a graceful reconnect without the full shutdown/restart, and I think "Create a new Client object on every reconnect" does not sound like a good approach to me.

achilleasa added a commit that referenced this issue Jan 2, 2024
This change addresses one of the issues described in
#107.

Prior to this change, the error broadcast stream was always closed when
the connection was closed either by the client or the server. This
prevented clients that implemented a reconnection policy from being able
to register error listeners.

This commit modifies the client behavior so that the error broadcast
channel is only closed if the client invokes the close() method on a
client instance.
@achilleasa
Copy link
Owner

Thanks for reporting this! After digging into this it turns out that it was caused by two bugs in the reconnection-handling logic. I proposed #112 to fix them.

Note that after the fix lands, the error stream will not be closed when the connection is lost but only if the client's close() method is invoked. The provided example needs to be adjusted so that reconnect() does not register a new error handler every time. Otherwise the example code would yield the following output (note that the reconnection attempts are de-dupped internally but each registered stream handler will be invoked regardless):

client connected
client error: ConnectionException(CONNECTION_FORCED): CONNECTION_FORCED - Closed via management plugin 1
client reconnected
client error: ConnectionException(CONNECTION_FORCED): CONNECTION_FORCED - Closed via management plugin 2
client error: ConnectionException(CONNECTION_FORCED): CONNECTION_FORCED - Closed via management plugin 2
client reconnected
client reconnected

@Boris-Plato
Copy link
Author

Hi @achilleasa, thanks a lot for this fix! I had no time for testing it during my vacation, but now I'm back and can confirm that it works as expected. I only need to call client.connect() on the reconnect attempt (not adding errorListener again). I also do not see client onDone errors anymore, which also makes logs a bit more clear. I already prepared the "reconnecting client" wrapper which creates a new client object on every reconnect, but I can easily tune it to use a single client object with the latest drivers changes. Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants