Queue level duplication: support for quorum and mirror queues #37

Open
maciejmackowiak opened this issue Sep 25, 2019 · 33 comments
Labels
feature Feature request

Comments

@maciejmackowiak

Hello!
Thank you for this plugin, it is exactly what we need and it works great!

But I've found an issue or maybe I'm doing something wrong.

There is a case where tasks with the same header end up in the queue, and it looks like it has something to do with requeued messages.

Steps to reproduce:

  1. Add a message to the queue
  2. Go to the RabbitMQ web UI and get messages

image

  3. Add the same message
  4. Go to the RabbitMQ web UI and get messages

image

The x-deduplication-header is just an MD5 hash of the message body.
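
For reference, a header value of this kind could be derived roughly as in the sketch below. It assumes the header carries the hex-encoded MD5 digest of the raw body; the helper name `dedupHeader` is hypothetical and is not part of the plugin or of the reporter's code.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
)

// dedupHeader returns the hex-encoded MD5 digest of the message body.
// Hypothetical helper: the hex encoding is an assumption, the thread only
// says the header is an MD5 hash of the body.
func dedupHeader(body []byte) string {
	sum := md5.Sum(body)
	return hex.EncodeToString(sum[:])
}

func main() {
	body := []byte(`{"task":"example"}`)
	// Identical bodies always map to the same header value, so a second
	// publish of the same body carries the same x-deduplication-header.
	fmt.Println("x-deduplication-header:", dedupHeader(body))
	fmt.Println(dedupHeader(body) == dedupHeader(body))
}
```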

So am I doing something wrong, or is this an issue?

Thanks in advance!
Maciej

@archon810

This seems like a pretty important issue to figure out, as I observed the same behavior.

@noxdafox Please let me know if you need further reproduction steps.

@noxdafox
Owner

Hello,

I tried reproducing your issue as instructed.

  1. I created a deduplication queue
  2. I published a message with deduplication header "asd"
  3. I got the message with default options: Nack, requeue = True
  4. I published another message

Yet only one message at a time was in the queue.

What version of RMQ, Erlang and the plugin are you using? Did you restart the broker after installing the plugin? Sometimes we observed cases in which adding the plugin to a running broker would look Ok but then it was not operational.
Can you provide a programmatic way to reproduce the issue? Even using bash and curl would be enough.
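
A programmatic reproduction could be sketched against the management plugin's HTTP API, as below. The host, port, `guest` credentials, queue name `test`, and header value `asd` are assumptions for illustration. The program only builds and prints the requests (publish, get with requeue, publish again, get again), so it runs without a broker; sending each request with `http.DefaultClient.Do` is left to the reader.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
)

const (
	baseURL = "http://localhost:15672/api" // assumption: local management listener
	vhost   = "%2F"                        // URL-encoded default vhost "/"
)

// newRequest builds an authenticated JSON POST to the management API.
func newRequest(path string, body interface{}) (*http.Request, error) {
	payload, err := json.Marshal(body)
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequest(http.MethodPost, baseURL+path, bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}
	req.SetBasicAuth("guest", "guest") // assumption: default local credentials
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

// publishRequest publishes body to the named queue through the default
// exchange, tagging it with the given x-deduplication-header value.
func publishRequest(queue, header, body string) (*http.Request, error) {
	return newRequest("/exchanges/"+vhost+"/amq.default/publish", map[string]interface{}{
		"properties": map[string]interface{}{
			"headers": map[string]string{"x-deduplication-header": header},
		},
		"routing_key":      queue,
		"payload":          body,
		"payload_encoding": "string",
	})
}

// getRequest fetches one message and requeues it, mirroring the
// "Nack, requeue = True" option of the web UI.
func getRequest(queue string) (*http.Request, error) {
	return newRequest("/queues/"+vhost+"/"+url.PathEscape(queue)+"/get", map[string]interface{}{
		"count":    1,
		"ackmode":  "ack_requeue_true",
		"encoding": "auto",
	})
}

func main() {
	for i := 0; i < 2; i++ {
		pub, err := publishRequest("test", "asd", "hello")
		if err != nil {
			panic(err)
		}
		get, err := getRequest("test")
		if err != nil {
			panic(err)
		}
		// Dry run: print instead of sending with http.DefaultClient.Do(req).
		fmt.Println(pub.Method, pub.URL)
		fmt.Println(get.Method, get.URL)
	}
}
```

If the queue depth reported by the broker grows to two after the second publish, the duplicate was accepted.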

@maciejmackowiak
Author

maciejmackowiak commented Sep 30, 2019

Hi @noxdafox
Thanks for the reply.

Rabbitmq version: 3.7.18
Erlang version: Erlang 22.1
Plugin:
rabbitmq_message_deduplication-0.4.2.ez
elixir-1.8.2.ez

I'm using a cluster of four RabbitMQ nodes, and it looks like this is the problem.

This is my rabbitmq.conf file:

loopback_users.guest = false
listeners.tcp.default = 5672  
management.listener.port = 15672
management.listener.ssl = false
#log.file.level = none
#log.console.level = warning
management.load_definitions = /etc/rabbitmq/definitions.json
default_pass = rabbitmq
default_user = rabbitmq
default_vhost = /
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@rabbitmq1
cluster_formation.classic_config.nodes.2 = rabbit@rabbitmq2
cluster_formation.classic_config.nodes.3 = rabbit@rabbitmq3
cluster_formation.classic_config.nodes.4 = rabbit@rabbitmq4

And definitions.json:

{
      "rabbit_version": "3.7.18",
      "users": [
          {
              "name": "rabbitmq",
              "password_hash": "yra51GStTeHvsm29uazFweLLpESEaxzO8ezcR5FQ43z40vYL",
              "hashing_algorithm": "rabbit_password_hashing_sha256",
              "tags": "administrator"
          }
      ],
      "vhosts": [
          {
              "name": "/"
          }
      ],
      "permissions": [
          {
              "user": "rabbitmq",
              "vhost": "/",
              "configure": ".*",
              "write": ".*",
              "read": ".*"
          }
      ],
      "topic_permissions": [],
      "parameters": [],
      "global_parameters": [
          {
              "name": "cluster_name",
              "value": "rabbit@rabbitmq1"
          }
      ],
      "policies": [
          {
              "vhost": "/",
              "name": "test",
              "pattern": "^test",
              "apply-to": "all",
              "definition": {
                  "ha-mode": "all",
                  "ha-sync-mode": "automatic"
              },
              "priority": 0
          }
      ],
      "queues": [
          {
              "name": "test",
              "vhost": "/",
              "durable": true,
              "auto_delete": false,
              "arguments": {
                  "x-message-deduplication": true
              }
          }
      ],
      "exchanges": [],
      "bindings": []
  }

The plugin works perfectly, but as mentioned before, when I use the web UI to get messages I can then publish them again. Also, I've been stopping and starting nodes, and once a node rejoins the cluster I can publish a message with the same deduplication header and it's added to the queue.

I think it may be related to #19.

So it looks like it is a problem with cluster syncing.
Maybe I'm missing some settings?

@noxdafox
Owner

Unfortunately, RMQ 3.7.18 introduced non-backward compatible changes which break the plugin (see compatibility notes).

I have already pushed changes to master that add support for the new RMQ APIs, but they still need to undergo testing before I make a new release. If you look at the README.md, the maximum supported version is 3.7.17.

I hope I can release a new forward-compatible version of the plugin by the end of this week but I can't make promises.

@maciejmackowiak
Author

Thanks for the quick reply.

I've just downgraded my local environment to 3.7.17 and the results are exactly the same as for 3.7.18.

With a single RabbitMQ node it works perfectly: I can get messages and the deduplication works. With more nodes, when I get messages (with Nack, requeue = True) and publish a new message with the same header, it is added to the queue. The same happens after stopping and starting nodes.

Rabbitmq version: 3.7.17
Erlang version: Erlang 22.0.7
Plugin:
rabbitmq_message_deduplication-0.4.2.ez
elixir-1.8.2.ez

@noxdafox
Owner

noxdafox commented Oct 3, 2019

I just spun up a 3-node cluster using docker-compose, yet I could not reproduce the issue. I publish a message, consume it with Nack, requeue = True, and publish a new one. Yet I end up with only one message in the queue.

I used the rabbitmq:3.7.17-management image with the plugin downloaded from the release list. Is there something I am doing differently from your setup? If you try the above-mentioned Docker image, do you get the same result?

@maciejmackowiak
Author

I've also used that image in docker, and plugin downloaded from the release list.
I've also installed RabbitMQ locally and had the same issue with cluster.

Have you set the policy so that the queue is mirrored to all the nodes?
I used this policy:

{
    "vhost": "/",
    "name": "test",
    "pattern": "^test",
    "apply-to": "all",
    "definition": {
        "ha-mode": "all",
        "ha-sync-mode": "automatic"
    },
    "priority": 0
}

And queue

{
    "name": "test",
    "vhost": "/",
    "durable": true,
    "auto_delete": false,
    "arguments": {
        "x-message-deduplication": true
    }
}

Just like in this comment: #37 (comment)
But of course with 3.7.17

Maybe I'm doing something wrong?
Can you share your queue and policy settings, or maybe also your docker-compose file?

@archon810

Any updates here, @noxdafox?

@noxdafox
Owner

noxdafox commented Oct 9, 2019

I did not notice the use of HA queues at first. I will try to combine dedup and HA over the weekend.

My gut feeling is that queue-level deduplication and HA are not compatible. The reason is that the HA implementation bypasses some of the queue behaviours the plugin relies upon to check for duplicates. If there is no workaround, I guess the only way to go is relying on exchange-level deduplication.

@noxdafox
Owner

I spent some time investigating the issue. Surprisingly enough, it seems duplicates show up only if HA is enabled via policy. If HA is set on queue creation, de-duplication seems to work from the management console.

This is not a trivial problem. The core issue is that RMQ HA sits on top of the backing queue behaviour, which the de-duplication plugin uses to implement queue-level deduplication. In other words, this is not a problem of the plugin itself but rather an issue with how RMQ enables queue extensibility.

This means that we would need to submit quite a few changes to RMQ to make this plugin and mirrored queues work together. Considering RMQ is introducing a new HA model in 3.8 with quorum queues, it becomes even harder to ensure queue-level de-duplication across all implementations.

For the time being, I'll update the README to underline the issue. You can always use exchange-level deduplication which is the recommended one anyways as it's much simpler to use and understand. During the week, I'll approach RMQ devs to see if there's a way we can tackle this problem.

@archon810

Thank you for your diligence @noxdafox. Curious to hear what the RMQ devs will say.

@maciejmackowiak
Author

Thanks @noxdafox for the feedback.

> You can always use exchange-level deduplication which is the recommended one anyways as it's much simpler to use and understand.

But correct me if I'm wrong: exchange-level deduplication doesn't really guarantee that messages will not get duplicated. It only ensures that, for example, if I set x-cache-ttl to 30 seconds, then for 30 seconds after a message is added, a message with the same deduplication header will not be added to the queue? So if, for example, the queue worker crashes, we can end up with a lot of duplicated messages?

@noxdafox
Owner

noxdafox commented Oct 25, 2019

If I understand correctly, by "queue worker will crash" you mean the queue consumer, right?

In this case a new consumer will receive the same message again, as the one which crashed did not acknowledge it. If you want to avoid message redelivery in case of errors, you can disable acknowledgements.

The de-duplication plugin does not prevent message duplicates due to network or broker errors. It is designed to prevent publishers from publishing duplicate messages.

The exchange-level de-duplication allows you to select for how long (virtually forever) a message should not be published again. As it works at the exchange level, it allows de-duplicating multiple queues at once.

The queue-level de-duplication prevents a message from being published to a queue if another one with the same de-duplication header is already present.

Usually people prefer queue-level de-duplication but fail to see how much more unpredictable de-duplication becomes. What if the queue is empty most of the time, for example? The queue-level de-duplication is effective if the user wants to prevent two identical messages from being consumed at the same time but still wants identical messages to be consumed in different time frames.
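
The queue-level semantics described above can be illustrated with a small in-memory model. This is only a sketch of the behaviour as stated in this comment, not the plugin's implementation; the `dedupQueue` and `message` types are hypothetical.

```go
package main

import "fmt"

// message is a simplified stand-in for an AMQP message.
type message struct {
	Header string // value of x-deduplication-header
	Body   string
}

// dedupQueue models queue-level de-duplication: a header is rejected only
// while a message carrying it is still waiting in the queue.
type dedupQueue struct {
	items   []message
	present map[string]bool
}

func newDedupQueue() *dedupQueue {
	return &dedupQueue{present: make(map[string]bool)}
}

// Publish enqueues m unless its header is already present in the queue.
// It reports whether the message was accepted.
func (q *dedupQueue) Publish(m message) bool {
	if q.present[m.Header] {
		return false
	}
	q.present[m.Header] = true
	q.items = append(q.items, m)
	return true
}

// Consume removes and returns the oldest message, freeing its header so
// that an identical message can be published again.
func (q *dedupQueue) Consume() (message, bool) {
	if len(q.items) == 0 {
		return message{}, false
	}
	m := q.items[0]
	q.items = q.items[1:]
	delete(q.present, m.Header)
	return m, true
}

func main() {
	q := newDedupQueue()
	m := message{Header: "job-1", Body: "work"}
	fmt.Println(q.Publish(m)) // true: queue was empty
	fmt.Println(q.Publish(m)) // false: same header still queued
	q.Consume()
	fmt.Println(q.Publish(m)) // true: header was freed by the consume
}
```

The last line shows the unpredictability mentioned above: once the queue drains, an identical message is accepted again.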

@archon810

@noxdafox Our use case is as follows, for example. We would like to use a queue for jobs that would update various caches when blog posts are updated.

The goal is to remove the delay associated with dumping such caches, which could take multiple seconds, and make it async, thus having the post screen in WordPress come back faster and annoy our writers less.

There will be other uses, but let's take this one as a hypothetical.

The simple goal here is this:

  1. If there's a job to dump cache of post with ID 123 in the queue, don't add a duplicate job if another cache dump request comes in for the same ID.
  2. If there's no such job, then add it.

We went with queue-level deduplication and ran into issues with deduping not working and still inserting duplicate jobs. Now that we're considering exchange-level deduplication with these TTLs, what is the downside of using it vs queue level?

If the job is already gone and processed but the TTL is set to 10 min, does exchange-level deduplication still add the job, or will it simply look at the previous TTL and drop the new duplicate job, even though it's not even a dupe anymore (because the original job was consumed and processed)?

The goal is so simple - if there's a job with the same params, don't re-add it. If there isn't, add it. I don't get why RMQ doesn't support it natively, and why they're making it so difficult to support for a plugin.

@archon810

Did RMQ devs tell you anything useful to get this to work btw?

@noxdafox
Owner

noxdafox commented Nov 7, 2019

@archon810, your Use Case is pretty legitimate. The queue level de-duplication seems the best approach.

The exchange level de-duplication will de-duplicate for the entire duration of the TTL. In other words, a TTL of 10 minutes would mean no other message with the same header would be routed for the next 10 minutes since the first one.

The reason RMQ does not support de-duplication is that it's not part of the AMQP standard.

RMQ provides a rabbit_backing_queue behaviour which can be used to provide queue implementations and add features to existing ones.

I used such behaviour to implement de-duplication over existing queues.

Unfortunately, not all features are using the above behaviour. The reason for that is unknown to me. My gut feeling is the above behaviour was not fully intended for extending RMQ queues capabilities. As some of the features do not use such behaviour, they will "step over" the behaviour implementations.

Among the problematic features/queues Mirrored and Quorum queues seem to be the offending ones.

I have not yet reached out to the RMQ community, as I did not have time to dig deeper into the issue. I'd like to be able to pinpoint where the problem is before bothering the core developers. At the moment I'm very busy, so you will need to be patient for a while as my free time is very limited.

If de-duplication is critical for you, I would suggest disabling mirroring in the meantime. Of course, you need to consider the risk of losing messages in case of network or broker errors.

@archon810

Thank you for clarifying things @noxdafox. I, of course, understand the free time issue, guilty of that myself. May I just ask that once you get some free time, you don't let this issue get forgotten?

@maciejmackowiak
Author

Hi @noxdafox
Do you have any updates on this issue?

@noxdafox
Owner

Hello,

sorry for the delay.

The current desire of RMQ devs is to slowly retire generic multi-feature queues to provide more specialised queue implementations which better suit specific Use Cases. Quorum Queues are the first example of the direction RMQ developers are taking in this matter.

So far, this plugin does not support Mirrored and Quorum queues, and it won't in the near future (I am updating the README).

My plan is to reach out to the RMQ devs and try to understand if we can find a way to provide generic hooks to implement features such as queue-level deduplication on the new queue architecture. This will of course take some time, considering there is no clear understanding of what the new architecture will look like.

My suggestion for the moment is to rely on exchange-level deduplication if queue HA is needed. I know it is a sub-optimal solution for certain cases, but there is no simple alternative at the moment.

@maciejmackowiak
Author

Hi @noxdafox
do you have any updates on this matter?

thanks
Maciej

@noxdafox
Owner

Unfortunately no updates yet.

As I said, this requires significant work on RabbitMQ rather than the plugin itself.

I will resume this track in September when my schedule frees up.

@maciejmackowiak
Author

Hi @noxdafox
Did you have time to look into it?

@noxdafox
Owner

noxdafox commented Nov 4, 2020

No sorry, I am currently focusing on other issues affecting the plugin.

@Burnsys2

Burnsys2 commented Dec 8, 2020

Hi guys, same issue here:
Deduplication at queue level.
Repeated messages.

image
image

@mlb5000

mlb5000 commented Feb 27, 2021

@noxdafox I'm having the same issue as @Burnsys2, where duplicates are getting queued even in basic cases. I don't have HA turned on in my cluster or anything like that.

@Burnsys2

Burnsys2 commented Mar 7, 2021

Just a clarification: something happens and the queue starts to accept duplicates until I have to recreate the queue from scratch. After recreation it works OK, until it starts to fail at some point, and then it keeps failing until the queue is recreated.

@DrParanoia

@noxdafox Hey there! Sorry to bother, but is there any news on supporting Quorum queues? I am trying to scale RMQ, and this is a real blocker 😢 A lot of our queues are using the plugin, and it seems I won't be able to move them to the cluster.

@dvcrn

dvcrn commented Jan 14, 2022

Hi, I'm having the same issue: multiple items with the same x-deduplication-header header end up in the same queue. No special settings either, just a fresh RabbitMQ (single node). I set up a new deduplication exchange to publish to a different exchange, which then publishes into a queue.

Erlang 24.2, RabbitMQ 3.9.12

Publish code:

	headers := make(amqp.Table)
	headers["x-deduplication-header"] = id

	b, err := json.Marshal(payload)
	if err != nil {
		return err
	}

	err = c.amqpChan.Publish(exchange, routingKey, false, false, amqp.Publishing{
		ContentType: "application/json",
		Body:        b,
		Headers:     headers,
	})

@haljac

haljac commented May 10, 2022

I'm experiencing this issue. Is this plugin functional?

noxdafox changed the title from "Queue level duplication: tasks with same header are in the queue" to "Queue level duplication: support for quorum and mirror queues" on May 10, 2022
@noxdafox
Owner

This ticket is for supporting deduplication on mirror and quorum queues, not for issues related to de-duplication itself.

If you are having issues with de-duplication, please open a ticket where you clearly state:

  • How you create a deduplication exchange
  • How you publish a message in said exchange
  • How you are observing the duplicates on the consumer side
  • Version of the plugin and the broker

@noxdafox noxdafox added the feature Feature request label Oct 29, 2022
@TDola

TDola commented Jan 18, 2023

I am also looking for quorum deduplication functionality. Was disappointed to see the plugin only supports the deprecated classic queues.
Hoping for an update

@dvcrn

dvcrn commented Jan 22, 2023

I've solved this issue for my project by creating a little Golang service that listens to the queues and does in-memory deduplication based on a unique id field / deduplication field, before publishing back into a deduped exchange

Ripped out from my project so there's some stuff missing, but FYI in case this is helpful. You get the idea

https://gist.github.com/dvcrn/fbfceeb1cd253cb4e4c81ec3b4d5d70f#file-main-go
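
The general idea of such a service can be sketched as follows. This is not the code from the gist: the `job` type and `dedupe` function are hypothetical, Go channels stand in for the consumed queue and the downstream exchange, and the seen set grows without bound, which a long-running service would need to address.

```go
package main

import "fmt"

// job is a hypothetical message shape with a unique id field.
type job struct {
	ID      string
	Payload string
}

// dedupe forwards each job from in to the returned channel the first time
// its ID is seen and drops later repeats. The seen set is kept in memory
// and is never evicted in this sketch.
func dedupe(in <-chan job) <-chan job {
	out := make(chan job)
	go func() {
		defer close(out)
		seen := make(map[string]struct{})
		for j := range in {
			if _, dup := seen[j.ID]; dup {
				continue // already forwarded once, drop the repeat
			}
			seen[j.ID] = struct{}{}
			out <- j
		}
	}()
	return out
}

func main() {
	in := make(chan job, 3)
	in <- job{ID: "a", Payload: "first"}
	in <- job{ID: "a", Payload: "repeat"}
	in <- job{ID: "b", Payload: "second"}
	close(in)

	for j := range dedupe(in) {
		fmt.Println(j.ID, j.Payload)
	}
}
```

Because the seen set lives in process memory, it is lost on restart, so this approach trades durability for simplicity.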

@noxdafox
Owner

noxdafox commented Mar 5, 2023

I reached out to the RMQ community today regarding this issue:
https://groups.google.com/g/rabbitmq-users/c/WawIw3z5oGA

I cannot promise that there will be a solution in the foreseeable future but, if interested, you can follow the discussion.

9 participants