sendToQueue callback seems to be not called with ConfirmChannel #103

Closed
ScOut3R opened this issue Oct 15, 2014 · 8 comments

Comments

@ScOut3R

ScOut3R commented Oct 15, 2014

@squaremo here's my full example as you've requested:

amqp.connect(config.rabbit.uri).then( function (conn) {
    return when(conn.createConfirmChannel().then( function (ch) {
      var q = 'fetch_friends';
      var msg = JSON.stringify({ uid: uid, oauth_token: oauth_token });

      var ok = ch.assertQueue(q, { durable: true });

      return ok.then( function (_qok) {
        ch.sendToQueue(q, new Buffer(msg), { persistent: true }, function (error, ok) {
          if (error) {
            console.log('Message dropped!');
          } else {
            console.log('Message OK');
          }
        });
        return ch.close();
      });
    })).ensure( function () { conn.close(); });
  }).then(null, console.warn);

The channel is indeed working in confirmation mode according to the RabbitMQ admin interface and the message is sent to the queue, but again nothing is logged to the console. Am I using it wrong?

@squaremo
Collaborator

Yes, one would reasonably expect that to work. What's happening is that the channel is closed before the confirmation comes back -- in fact, the whole program could exit before the confirmation comes back. This is because publish and sendToQueue return immediately, meaning the next thing the program does is close the channel, without waiting for any I/O.

One thing you can do (now, in master but not in a release yet) is use ConfirmChannel#waitForConfirms(), which returns a promise, to wait until all unresolved confirmations have been received.

return ch.waitForConfirms().then(function() { return ch.close(); });

Another possibility is to close the channel in the confirm callback, and link the connection close to the 'close' event on the channel; another is to resolve a promise in the confirm callback, and close the channel in that promise's continuation. But waitForConfirms() seems the most convenient way.
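The "resolve a promise in the confirm callback" option could look roughly like the sketch below. It is only an illustration: `sendConfirmed` and `publishThenClose` are hypothetical helper names, not part of amqplib, and `ch` is assumed to be a confirm channel from the promise API (for example the result of `conn.createConfirmChannel()`).

```javascript
// Hypothetical helper: wrap sendToQueue's confirm callback in a promise.
// Resolves when the broker acks the message, rejects when it nacks it.
function sendConfirmed(ch, queue, content, options) {
  return new Promise(function (resolve, reject) {
    ch.sendToQueue(queue, content, options, function (err, ok) {
      if (err) {
        reject(err);
      } else {
        resolve(ok);
      }
    });
  });
}

// Hypothetical helper: publish one message and close the channel only
// after the confirmation has arrived.
async function publishThenClose(ch, queue, msg) {
  await ch.assertQueue(queue, { durable: true });
  await sendConfirmed(ch, queue, Buffer.from(msg), { persistent: true });
  await ch.close();
}

// Assumed usage (requires a running broker, so it is left commented out):
//   const conn = await require('amqplib').connect('amqp://localhost');
//   const ch = await conn.createConfirmChannel();
//   await publishThenClose(ch, 'fetch_friends', JSON.stringify({ uid: 1 }));
//   await conn.close();
```

Because the close is chained after the confirmation promise, the channel cannot be torn down while a confirmation is still outstanding, which is the same guarantee waitForConfirms() gives for all pending messages at once.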

@ScOut3R
Author

ScOut3R commented Oct 15, 2014

Thanks @squaremo for the quick reply! So I was using it in the wrong way indeed. :) Anyway, I've tried the solution you've suggested and it works like a charm!

@ScOut3R ScOut3R closed this as completed Oct 15, 2014
@squaremo
Collaborator

So I was using it in the wrong way indeed.

Weeeelll, it's a pretty subtle problem, which I don't explain very well in documentation.
Glad you got it all working.

@gauravchl

@squaremo @ScOut3R

Hey guys, I'm having a similar issue:
I'm trying to get the job's status (success or failure) on the client in sendToQueue's callback, which I expected to trigger when ch.ack or ch.nack is invoked inside the consumer.

But in my case the sendToQueue callback runs immediately on the client. It is supposed to run only after the consumer acknowledges the message (ch.ack/ch.nack).
Any idea about this?
Here is the code that I'm running:

#!/usr/bin/env node

function runOnWorker(){
  var amqp = require('amqplib/callback_api');

  amqp.connect("amqp://localhost", function(err, con){
    if(err!==null) return;
    console.log("[RMQ] Connected!");
    con.createConfirmChannel(function(err, ch) {
      ch.assertQueue("testQ", {durable: true});

      var onReceive = function(msg){
        setTimeout(function(){
          if(msg.content.toString() == "Hello1"){
            ch.ack(msg);
            console.log("Sending [ack] for Hello1");
          }else{
            console.log("Sending [nack] for Hello2");
            ch.nack(msg,false,false);
          }

        }, 5000);
      }

      ch.consume("testQ", onReceive, {noAck: false});
    });
  });
}

runOnWorker();

#!/usr/bin/env node

function runOnClient(){
  var amqp = require('amqplib/callback_api');

  amqp.connect("amqp://localhost", function(err, con){
    if(err!==null) return;
    console.log("[RMQ] Connected!");
    con.createConfirmChannel(function(err, ch) {
      ch.assertQueue("testQ", {durable: true});

      ch.sendToQueue("testQ", new Buffer("Hello1"),{persistent: true},function(e,r){console.log("callback[Hello1]: ",e,r)});
      console.log("Hello1 sent to queue;");
      ch.sendToQueue("testQ", new Buffer("Hello2"),{persistent: true},function(e,r){console.log("callback[Hello2]: ",e,r)});
      console.log("Hello2 sent to queue;");

      ch.waitForConfirms(function(err){
        if(err){
          console.error(err);
        }else{
          console.log('All message done');
          con.close();
        }

      });
    });
  });
}

runOnClient();

Output: [screenshot "2016-01-07 14-28-54", image not preserved]

@ScOut3R
Author

ScOut3R commented Jan 7, 2016

@gauravchl If I understand this correctly, this is normal. The Client/Producer puts the messages on the queue and exits. After this the Worker/Consumer pulls the messages from the queue and works on them. Acknowledgements are always exchanged either between the Producer and the Broker or between the Consumer and the Broker, never between the two ends directly. The Producer and Consumer cannot and should not talk to each other.
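To make the two separate acknowledgement legs concrete, here is a toy in-memory model. It is not amqplib and not how RabbitMQ is implemented: `ToyBroker` and its methods are made up for illustration, and everything runs synchronously, whereas real confirms arrive asynchronously. The point is only the ordering: the producer's callback fires once the broker has taken the message, regardless of when (or whether) a consumer later acks it.

```javascript
// Toy model only: a "broker" with a single queue, all in memory.
class ToyBroker {
  constructor() {
    this.queue = [];
    this.consumer = null;
    this.log = [];
  }

  // Producer <-> broker leg: the broker confirms as soon as it has
  // accepted the message, without waiting for any consumer.
  publish(body, onConfirm) {
    this.queue.push(body);
    this.log.push('broker confirmed ' + body);
    onConfirm(null);
    this.deliver();
  }

  // Consumer <-> broker leg: the consumer acks back to the broker only.
  consume(handler) {
    this.consumer = handler;
    this.deliver();
  }

  deliver() {
    while (this.consumer && this.queue.length > 0) {
      const body = this.queue.shift();
      this.consumer(body, () => this.log.push('consumer acked ' + body));
    }
  }
}

const broker = new ToyBroker();
// The producer publishes before any consumer exists.
broker.publish('Hello1', () => broker.log.push('producer callback Hello1'));
// A consumer attaches later and acks the message.
broker.consume((body, ack) => ack());
console.log(broker.log);
```

In this model the producer callback is logged before the consumer ack, and nothing about the consumer's ack or nack ever reaches the producer. If the producer needs the job's outcome, the worker has to publish it as a separate message.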

@gauravchl

@ScOut3R Thanks for the quick reply :)

The Producer and Consumer cannot and should not talk to each other.

Then we will have to refactor some unscheduled jobs :)

@t1bb4r

t1bb4r commented Mar 26, 2017

Ran into the same issue. The documentation does specify this, but it's easy to miss.
"On a channel in confirmation mode, each published message is 'acked' or (in exceptional circumstances) 'nacked' by the server, thereby indicating that it's been dealt with."

We read this paragraph as meaning that the worker had 'acked' the message, whereas it actually refers to the RabbitMQ server.

@webdobe

webdobe commented Feb 4, 2022

I was running into the same error message. I have a promise set up that I resolve in the consumer:

        // Listen for the response
        const reply = channel.consume(rpcQueue, (msg) => {
          // Only handle the reply that matches our request
          if (msg.properties.correlationId === corrId) {
            resolve(msg.content.toString());
            // Close the channel only after the promise has been resolved
            channel.close();
          }
        }, { noAck: true });

        // Send the request
        const request = channel.sendToQueue(
          rpcQueue,
          Buffer.from(sendMsg),
          {
            correlationId: corrId,
            replyTo: rpcQueue
          }
        );

My issue was I was closing my channel before I resolved. Closing the channel after my resolve fixed my issue.
