Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

amqp connection lost, and recreate multiples consumers in same queue #87

Open
fabios1010 opened this issue Nov 16, 2022 · 1 comment
Open

Comments

@fabios1010
Copy link

fabios1010 commented Nov 16, 2022

I have been used this lib, but after a while the connection auto close and when start it again it connect with another consumer in queue,
I can publish in another queues but it stop consuming. in order to finger out what is happen the solution is complete delete the queue and restart it again. I read in pub.dev that
the driver does not currently support recovering client topologies when re-establishing connections. This feature may be implemented in a future version.

is that what is happening ?
how can I try implement this an check is there is an stable connection or if the queue has consumers?
how can I persist the consumer to avoid this?
thanks

class Amqp {

  late bool isRunning;
  late Client client;
  late SoundsService soundsService;

  Amqp() {

    isRunning = false;
    ConnectionSettings settings = ConnectionSettings(
      virtualHost:"/",
      host: "rabbitmqhost",
      port: rabbitmqport,
      authProvider: const PlainAuthenticator(
          "rabbitmqusers",
          "rabbitmqpass"
      ),
    );
    client        = Client(settings: settings);
    soundsService = SoundsService();
  }

  getClient() {
    return client;
  }

  start() async {
    print("AMQP::starting");

    //consume();

    client.connect().then( (defaultChannel) => {
      consume()
    }).catchError((onError) => {
      print("ERROR: $onError"),
    });

    return true;
  }

  close() async {
    isRunning = false;
    print("isRunning: $isRunning");
    client.channel().then((channel) => {
      print("amqp::channel::closed"),
      channel.queue(CurrentInfo.queue, durable: true).then((Queue queue) => {
        queue.delete().then((_) => {
          print("amqp::queue::closed"),
          client.close().then((_) => {
            print("amqp::client::closed"),
          })
        })
      }),
      channel.close()
    });
    client.close();
    return true;
  }

  void deleteQueue() {
    client.channel().then((channel) => {
      channel.queue(CurrentInfo.queue, durable: true).then((Queue queue) => {
        queue.delete().then((_) => {
          print("FORCE::amqp::queue::deleted"),
        })
      }),
    });
  }

  /// *************************************
  /// consumer
  consume() async {

    isRunning = true;

    print("AMQP::connected::starting-consume");
    print("isRunning: $isRunning");

    Channel channel   = await client.channel();

    Exchange exchange = await channel.exchange("amq.direct", ExchangeType.DIRECT, durable: true);

    Queue queue       = await channel.queue(CurrentInfo.queue, durable: true);
    queue.bind(exchange, CurrentInfo.queue);

    Consumer consumer = await queue.consume();
    consumer.listen( (AmqpMessage message) {

      var payload = Payload.fromJson(json.decode(message.payloadAsString));
      if ( payload.action == "notification" ) {
        print("AMQP::consume::action::notification");
        print(payload.toJson());
      }

      if ( payload.action == 'answer-order' ) {
        print("AMQP::consume::action::answer-order");
        print(payload.toJson());

        Vibration.vibrate(duration: 10000, intensities: [100, 200, 100, 200, 100, 200, 100, 200, 100, 200]);
        showActivityBottomSheet(AnswerOrderDetails.fromJson(payload.data));

       });
      }

      /**
          print(" [x] Received:: payloadString :: ${message.payloadAsString}");
          print(" [x] Received:: payloadJson   :: ${message.payloadAsJson}");
          print(" [x] Received:: routingKey    :: ${message.routingKey}");
       */

    });
  }


  /// *************************************
  /// publisher functions
  ///
  /// users/track
  publishUpdateLocation(double lat,double lng, String usersId,  String status) async {
    Channel channel = await client.channel();
    Exchange exchange = await channel.exchange("amq.direct", ExchangeType.DIRECT, durable: true);
    var position = {
      'usersId': usersId,
      "lat": lat,
      "lng": lng,
      "status": status
    };
    print("POSITION::DATA:: $position");
    exchange.publish(json.encode(position), 'position-listener');
    
  }

  /// users/track/orders
  publishUpdateLocationTrackOrder(double lat,double lng, String usersId, String ordersId, String status) async {
    Channel channel = await client.channel();
    Exchange exchange = await channel.exchange("amq.direct", ExchangeType.DIRECT, durable: true);
    var position = {
      'usersId':  usersId,
      'ordersId': ordersId,
      "lat": lat,
      "lng": lng,
      "status": status
    };
    exchange.publish(json.encode(position), 'position-orders-listener');
   
  }

  /// answer-order
  publishAnswerOrder(String answer, String usersId, String ordersId) async {
    Channel channel = await client.channel();
    Exchange exchange = await channel.exchange("amq.direct", ExchangeType.DIRECT, durable: true);
    var data = {
      'usersId':  usersId,
      "ordersId": ordersId,
      "answer":   answer
    };
    exchange.publish(json.encode(data), 'answer-orders-listener');
    print("ANSWER::ORDER:: $data");
   
  }

  ///
  publishInteractOrder(String type, String usersId, String ordersId) async {
    Channel channel = await client.channel();
    Exchange exchange = await channel.exchange("amq.direct", ExchangeType.DIRECT, durable: true);
    var data = {
      'usersId'  : usersId,
      "ordersId" : ordersId,
      "type"     : type // collected, delivered,
    };
    exchange.publish(json.encode(data), 'interact-orders-listener');
    print("INTERACT::ORDER:: $data");
   
  }



}
@achilleasa
Copy link
Owner

This behavior is intentional as per the comment about not re-establishing topologies after re-connecting.

The client will surface connection lost errors via a broadcast stream that you can subscribe to.
The recommended approach is to specify an error listener and check for connection-lost errors:

client.errorListener((ex) => handleError(ex));

void handleError(Exception ex) {
  // Check if ex is a ConnectionException or it's message contains "Lost connection to server"
  // and recreate topology.
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants