
Prevent multiple QueueEvents handlers from processing same message #2221

Closed
gl-jkeys opened this issue Oct 9, 2023 · 14 comments

Comments


gl-jkeys commented Oct 9, 2023

Is your feature request related to a problem? Please describe.

AFAICT, there's currently no way of instructing the QueueEvents listener to lock incoming messages, such that only one QueueEvents object will process any given message.

That makes it impossible to scale QueueEvents horizontally, and it creates a bottleneck in any system that relies on BullMQ.

Describe the solution you'd like

The QueueEvents object should lock or lease the message in a way that allows re-delivery if the node fails to handle it. In the happy path, only one listener should handle any given message.
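The lock/lease semantics described above can be sketched minimally. In this sketch an in-memory `Map` stands in for the shared store; a real multi-node deployment would need the lease to live in Redis itself (e.g. via `SET key NX PX ttl`). This is an illustration of the requested behavior, not existing BullMQ API:

```javascript
// Sketch of per-message leasing: exactly one caller wins the lease until
// it expires, which allows re-delivery if the winning node dies before
// finishing. The Map is a stand-in for a shared store such as Redis.
class MessageLease {
  constructor(ttlMs) {
    this.ttlMs = ttlMs;
    this.leases = new Map(); // messageId -> expiry timestamp (ms)
  }

  // Returns true for exactly one caller per message until the lease expires.
  tryAcquire(messageId, now = Date.now()) {
    const expiry = this.leases.get(messageId);
    if (expiry !== undefined && expiry > now) return false; // held elsewhere
    this.leases.set(messageId, now + this.ttlMs);
    return true;
  }

  // Called after successful handling so the message is never re-leased.
  release(messageId) {
    this.leases.delete(messageId);
  }
}
```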

Describe alternatives you've considered

  • Attach handler to worker.
    • We're using BullMQ Pro, and this seems to be an antipattern. If we have a large batch size, but need to fail a single job out of that batch, then we have to pull all jobs in the batch.
  • Elect a leader node to run the QueueEvents listener.
    • Adding leader election to a multi-node system is non-trivial, and it does not solve the bottleneck issue.

Additional context

We're using BullMQ Pro with the batches functionality.

@gl-jkeys gl-jkeys changed the title Prevent multiple QueueEvents handlers from processing same job Prevent multiple QueueEvents handlers from processing same message Oct 9, 2023

manast commented Oct 9, 2023

QueueEvents is not designed to process jobs. You use the Worker instance for this. It actually scales nicely horizontally, as you can add as many workers as you see fit.


gl-jkeys commented Oct 9, 2023

Sure, I'm talking about the event messages. Not jobs.


manast commented Oct 9, 2023

A message is the same as a job. The worker receives a "job/message" and does something with it.


manast commented Oct 9, 2023

Events are useful for debugging, updating progress, etc.

@gl-jkeys

Okay, so again, we're not supposed to do any transactional work inside BullMQ event listeners because they are not guaranteed to be delivered -- is that correct? What do you recommend instead of event listeners, for performing rollback work?


manast commented Oct 10, 2023

As I mentioned before, you use the worker for that. The worker gets jobs/messages and you process them in the processor function. This is all architected so that you can enable concurrent processing per worker, delivery guarantees, and so on.

@gl-jkeys

Going back to the original topic, how can I horizontally scale a QueueEvents listener?


> As I mentioned before, you use the worker for that. The worker gets jobs/messages and you process them in the processor function. This is all architected so that you can enable concurrency processing per worker, delivery guarantees and so on.

Are event delivery guarantees not something that BullMQ can offer for event listeners? I think it's possible to write an event emitter that will redeliver forever until a consumer acknowledges it (example below) -- is there a technical limitation that prevents BullMQ from emulating this behavior with Redis?

e.g.

const EventEmitter = require('events');

class Emitter extends EventEmitter {
  constructor() {
    super();
    this.pendingEvents = [];
  }

  // Queue the event and immediately attempt delivery; it stays pending
  // until a consumer acknowledges it.
  emitWithAcknowledgment(event, data) {
    this.pendingEvents.push({ event, data });
    this.processEvents();
  }

  acknowledgeEvent(data) {
    // Assume data contains some unique ID or reference to identify the event
    const index = this.pendingEvents.findIndex((e) => e.data.id === data.id);
    if (index !== -1) {
      this.pendingEvents.splice(index, 1);
    }
  }

  processEvents() {
    // Iterate over a snapshot: listeners may acknowledge (and splice the
    // array) synchronously, which would otherwise skip entries mid-loop.
    for (const pendingEvent of [...this.pendingEvents]) {
      super.emit(pendingEvent.event, pendingEvent.data);
    }
  }
}

const emitter = new Emitter();

class Consumer {
  constructor(emitter) {
    emitter.on('someEvent', (data) => {
      console.log(`Received event with data: ${data.id}`);
      if (Math.random() > 0.5) { // Simulate sometimes acknowledging the event
        console.log(`Acknowledging event with data: ${data.id}`);
        emitter.acknowledgeEvent(data);
      }
    });
  }
}

const consumer = new Consumer(emitter);

// Emit a new event every second; emitWithAcknowledgment also re-delivers
// any earlier events that have not been acknowledged yet.
let eventId = 1;
setInterval(() => {
  emitter.emitWithAcknowledgment('someEvent', { id: eventId++ });
}, 1000);


manast commented Oct 10, 2023

Why would you like to do something like that? Maybe it is better that you explain your use case, so I can tell you if it is possible to achieve with BullMQ or not.

@gl-jkeys

@manast I will send you an email at the commercial support email on your docs that describes our system in more detail. (We are BullMQ Pro subscribers.)

Thank you.


manast commented Oct 10, 2023

Great. I will close this issue then.

@manast manast closed this as completed Oct 10, 2023

godinja commented Dec 8, 2023

@manast I have a use case where I'd like to prevent multiple QueueEvents handlers from processing the same message.

I'm working with a system that makes use of flows. We'd like to listen to the active, completed, and failed events on each of our workers to update a database about the state of the individual jobs. This works fine when using a worker listener, but from my understanding (as outlined here #2229 (comment)) a parent job that is moved to the failed status as a result of a child job failure does not emit the failed event. This is what led me to use a QueueEvents class.

If these QueueEvents handlers are scaled horizontally, we get events that are processed multiple times. How can we use this class while ensuring that each event is only consumed once? Or maybe there is a better way to go about architecting this solution.


manast commented Dec 8, 2023

@godinja I think the most robust solution is to update the database status from within the job processor itself. When the job starts processing you can update the database as you would from the active event, and so on. QueueEvents is mostly useful for updating UIs or for debugging purposes.
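A minimal sketch of the pattern described above, where the processor itself mirrors the active/completed/failed transitions into the database. Here `db` and `doWork` are hypothetical stand-ins for the application's persistence layer and job logic; the returned function has the shape a BullMQ processor would have:

```javascript
// Build a processor that records job state transitions in the database,
// replacing what QueueEvents listeners would otherwise do.
// `db` and `doWork` are hypothetical application-provided pieces.
function makeProcessor(db, doWork) {
  return async function processor(job) {
    await db.setStatus(job.id, 'active');    // mirrors the 'active' event
    try {
      const result = await doWork(job.data);
      await db.setStatus(job.id, 'completed'); // mirrors 'completed'
      return result;
    } catch (err) {
      await db.setStatus(job.id, 'failed');    // mirrors 'failed'
      throw err;
    }
  };
}
```

Because the status writes happen inside the processor, they scale with the workers and never run twice for one job attempt.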


godinja commented Dec 8, 2023

> I think the most robust solution is to update the database status from within the job processor itself

The problem with this, at least in our use case, is that the processor for the parent job will never execute if the child fails, since we have enabled the failParentOnFailure option. The only way to listen to the failed event for a parent job whose child fails is through a QueueEvents class.

Would you ever consider implementing some sort of event lock functionality that can be conditionally enabled by any of the event emitters?


manast commented Dec 8, 2023

What do you mean by "event lock"? Btw, even if you were getting the same event several times because you happen to have several instances of QueueEvents running, what does it matter other than being a bit more inefficient?
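Duplicate delivery matters when the handler has side effects that are not idempotent (for example incrementing counters or inserting audit rows). One way to make duplicates harmless is to dedupe on an event id before handling. A minimal sketch, with the caveat that the `Set` here is in-memory, so across several processes it would have to be a shared store such as a Redis set; this is an illustration, not existing BullMQ functionality:

```javascript
// Wrap an event handler so a given event id is handled at most once
// per dedupe store. Returns true when the handler ran, false when the
// delivery was a duplicate and was skipped.
function dedupe(handler, seen = new Set()) {
  return (eventId, ...args) => {
    if (seen.has(eventId)) return false; // duplicate delivery, skip
    seen.add(eventId);
    handler(eventId, ...args);
    return true;
  };
}
```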
