v2 design ideas #229

Open
michaelklishin opened this issue May 26, 2023 · 9 comments

@michaelklishin
Member

michaelklishin commented May 26, 2023

Updates

The design below evolved over time. The last major update is from Sep 26, 2023.

Problem Definition

The current design of this plugin has plenty of limitations that make it unsuitable for many environments. They are visibly called out in the README but to reiterate:

  • All delayed messages are stored in a non-replicated Mnesia table
  • The only metric provided is the total number of delayed messages
  • Mnesia has a lot of peculiarities in how it approaches recovery from failures. It will be removed from RabbitMQ as of 4.0, so we need a replacement

These limitations call for a new plugin with the same goal but a different design. For the lack of a more creative name, let's call it Delayed Message Exchange v2 or simply "v2".

This issue is an outline of some of the ideas our team has. I am intentionally filing it as
an issue, despite this being just a number of ideas.

You are welcome to share your ideas in the comments as long as

  1. The goal does not venture too far from what this plugin currently can do. In other words, the goal is to keep it small and focussed, and some ideas will not be accepted
  2. The discussion remains civil, unlike some other issues in this repo. If it ends up being too heated, it will be locked. Insults towards the maintainers will not be tolerated

Where to Store Messages

This plugin stores messages for future publishing, along with some metadata about each message:

  • When it is due for re-publishing
  • Where to publish it
  • How to route it

This information should be replicated. Unlike in the early 2010s, when this plugin was first envisioned, modern RabbitMQ versions have a mature and efficient Raft implementation
plus a few more sub-projects that can be considered for distributed plugins such
as this one in v2.

Streams do not allow for random reads of individual
messages, so using a stream or an Osiris log directly would only be possible
as a replication mechanism that local instances of the plugin will read from and
put the data into a suitable store providing random reads.

Perhaps ironically we need a classic efficient disk-based key value store,
with local storage provided by something like

and the distribution layer provided by Ra. This example Ra-based K/V store would have
to have a more mature version used for more than running Jepsen tests against Ra.

This store would allow for random reads of individual messages stored for later publishing
and needs to provide a very minimalistic API.

Rust-based stores such as Marble can be accessed via Rustler.

Where to Store Metadata

RabbitMQ is in the middle of a transition to Khepri, a Raft-based tree-structured data store we
have developed for storing schema.

Khepri will be first introduced in RabbitMQ 3.13 and is going to be merged into main.

Specifically Khepri is suitable for storing an index of delayed messages:

  • Timestamps can be used as keys on which we sort/filter
  • Values can be message IDs (or mc/message container data structures if we target RabbitMQ 3.13 from the start)

The index will help locate the IDs of messages that are up for delivery. The messages
themselves, possibly with their metadata, can be loaded from a durable key/value store
described above.

Because Khepri will be available via a feature flag in 3.13, this plugin will have to
require that flag.

Using Fewer Timers

Assuming that all metadata, including expiration, is stored in Khepri in a way that makes it
possible to easily query a subset of delayed messages, this plugin can use a very small
number of timers, for example, just one or one per virtual host.

Not having a lot of far-in-the-future timers has another benefit: the current (2^32 - 1)
limitation for timer intervals will no longer apply. The timer will be used as a periodic
tick process that finds out what messages are up for re-publishing, loads their metadata and
hands it all off to a different process.

This way, if someone wants to delay a message for more than ≈ 24 days, they will be able to
do it, even though I would still not recommend it.
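
As a quick check of the figures above, assuming the timer interval is expressed in milliseconds (the unit is not stated in this issue): a cap of 2^32 - 1 ms corresponds to roughly 49.7 days, whereas a cap of 2^31 - 1 ms would correspond to roughly 24.9 days. The helper name below is hypothetical.

```python
MS_PER_DAY = 24 * 60 * 60 * 1000  # 86,400,000 ms in a day


def max_delay_days(max_interval_ms):
    """Convert a maximum timer interval in milliseconds to days."""
    return max_interval_ms / MS_PER_DAY


print(round(max_delay_days(2**32 - 1), 1))  # 49.7
print(round(max_delay_days(2**31 - 1), 1))  # 24.9
```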

What Metrics to Provide

Right now this plugin provides a single metric: the number of delayed messages. There is
interest in turning it into a histogram of time periods.
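
One possible reading of "a histogram of time periods" is a count of delayed messages bucketed by the time remaining until they are due. The sketch below assumes that reading; the bucket boundaries, labels and the `delay_histogram` name are made up for illustration and are not part of the plugin.

```python
import bisect

# Upper bounds of each bucket, in seconds (illustrative values only).
BUCKET_BOUNDS = [60, 3600, 86400]
BUCKET_LABELS = ["<=1m", "<=1h", "<=1d", ">1d"]


def delay_histogram(due_times_s, now_s):
    """Count delayed messages by how long remains until they are due."""
    counts = {label: 0 for label in BUCKET_LABELS}
    for due in due_times_s:
        remaining = max(0, due - now_s)  # overdue messages count as 0 remaining
        bucket = bisect.bisect_left(BUCKET_BOUNDS, remaining)
        counts[BUCKET_LABELS[bucket]] += 1
    return counts
```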

More Powerful CLI Tools

Besides inspecting various metrics provided by this plugin, the operator will occasionally need
to manipulate the messages delayed: delete individual messages or a subset, inspect them, and so on.

Moving storage to a stream will make inspection possible, and deletion can be done on just the
metadata, with stream retention policies taking care of cleaning up the newly orphaned messages
on disk.

Re-publishing from a "Leader"

Like with many distributed systems that are not eventually consistent, we have to decide either to perform writes via a single elected leader, or to partition the data set such that N writers can
co-exist within a single cluster.

Some existing (commercial) plugins use a per-virtual host partitioning scheme. For this
plugin it makes more sense to do this per-exchange. We cannot do this per-queue because
the queues the message will route to are unknown in advance.

Mapping routing keys to a set of writers/publishers won't work either because we need to
guarantee message ordering within a single queue.
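
A sketch of per-exchange partitioning under stated assumptions: writers are identified by node names, and a stable hash of the (virtual host, exchange) pair picks exactly one of them. `writer_for` and the node names are hypothetical. Every message delayed through a given exchange is then re-published by the same writer.

```python
import hashlib


def writer_for(vhost, exchange, writers):
    """Pick one writer for an exchange using a stable hash of its name."""
    key = f"{vhost}\x00{exchange}".encode("utf-8")
    digest = hashlib.sha256(key).digest()
    # Sort so the choice does not depend on the order the nodes are listed in.
    candidates = sorted(writers)
    return candidates[int.from_bytes(digest[:8], "big") % len(candidates)]


nodes = ["rabbit@node1", "rabbit@node2", "rabbit@node3"]
print(writer_for("/", "delayed.orders", nodes))
```

Plain modulo hashing reassigns many exchanges whenever cluster membership changes, so this only illustrates the partitioning key, not failover.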

Khepri can be used for leader elections, much like etcd is used by
Kubernetes-oriented systems.

Known Problems and Limitations

The biggest issue before RabbitMQ 3.13.0 will be Khepri cluster formation. Getting it right
from a plugin can be painful. When RabbitMQ itself introduces Khepri in the core, it's not
clear whether a reasonable upgrade path can be provided.

Khepri is a tree-structured store, so certain types of queries will not be an option. This
means that the data model of this plugin has to be carefully designed to support
the subset of CLI commands we'd introduce.

Khepri, much like etcd, can be used for leader elections. But it's not RabbitMQ's use
case for Khepri, so there may be "unknown unknowns".

In the discussion about how many leaders/writers the cluster of plugin instances should have,
we completely avoid the issue of message ordering at their original publishing time.

For example, if messages M1, M2, …, Mn are published in order but their delay is such that
they all must be published at the same time, would the users expect the M1, M2, …, Mn order to be preserved? If so, what kind of leader/writer design trade-offs would that entail?
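
If publishing order were to be preserved for messages due at the same time, one option (an assumption for illustration, not a decision made in this issue) is to have a single writer assign a monotonically increasing sequence number and order messages by a (due time, sequence) pair. `OrderedDelayQueue` is a hypothetical name.

```python
import heapq
import itertools


class OrderedDelayQueue:
    """Orders messages by due time, then by publishing sequence."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()  # assigned by this single writer

    def publish(self, due_ms, message):
        # The unique sequence number breaks ties between equal due times.
        heapq.heappush(self._heap, (due_ms, next(self._seq), message))

    def pop_due(self, now_ms):
        """Return all messages with due_ms <= now_ms in (due, seq) order."""
        out = []
        while self._heap and self._heap[0][0] <= now_ms:
            out.append(heapq.heappop(self._heap)[2])
        return out
```

With more than one writer, sequence numbers assigned by different writers are not comparable, which is the trade-off the question above points at.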

@michaelklishin
Member Author

Updated after a closer review with @SimonUnge and a few members of the core team.

@gomoripeti
Contributor

Great initiative, great write-up. I don't have any comments on the details of the storage implementation itself (Khepri for metadata and a K/V store sounds good).
Just putting down some random ideas about the plugin.

Expectations

  • My 2c is that the delivery order of messages scheduled for the same
    timestamp is unspecified (not the publishing order)
    (it would be different if there were an explicit "schedule at"
    parameter and not a "delay by")

  • It would be good to store as little data in memory as conveniently
    possible, so that more messages can be delayed (i.e. there is some
    data that is only stored on disk, as opposed to Mnesia or Khepri)
    (for example the exchange, although metadata, could be stored in the K/V store and not Khepri)

Metrics

  • total size of messages (bodies) per exchange
  • max message body size per exchange
  • timestamp histogram might be hard to maintain, but a first/last timestamp would be still informative
  • memory/disk usage of the plugin

Actions

  • purge messages per exchange

Feature ideas

  • Allow limiting delivery rate of expired messages

    Could be useful when the broker is stopped for a period, and at startup
    tries to deliver all the messages which expired while it was stopped,
    creating a delivery (and hence cpu/memory) spike.

  • Configurable upper limit of memory used by the plugin

    Reject publishes if above.
    Alternatively could be similar to max-length or max-length-bytes (per-exchange or global)
    + Metric: number of rejected publishes per exchange
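
The delivery rate limit idea above could be sketched as a token bucket. `TokenBucket`, its parameters and the explicitly passed clock value are hypothetical and only illustrate the technique, not any existing plugin setting.

```python
class TokenBucket:
    """Limits how many expired messages are re-published per unit of time."""

    def __init__(self, rate_per_s, burst, now_s=0.0):
        self.rate = rate_per_s       # tokens added per second
        self.burst = burst           # maximum tokens that can accumulate
        self.tokens = float(burst)
        self.last = now_s

    def take(self, wanted, now_s):
        """Return how many of `wanted` messages may be delivered now."""
        elapsed = max(0.0, now_s - self.last)
        self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
        self.last = now_s
        allowed = min(wanted, int(self.tokens))
        self.tokens -= allowed
        return allowed
```

After a restart with a large backlog of expired messages, each tick would deliver only `take(backlog_size, now)` messages and leave the rest for later ticks.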

@Joseph-Zhichao

Joseph-Zhichao commented May 24, 2024

Hello @michaelklishin,

I have some questions about using the delayed message plugin.

You mentioned this limitation:

Not having a lot of far-in-the-future timers has another benefit: the current (2^32 - 1)
limitation for timer intervals will no longer apply. The timer will be used as a periodic
tick process that finds out what messages are up for re-publishing, loads their metadata and
hands it all off to a different process.
This way, if someone wants to delay a message for more than ≈ 24 days, they will be able to
do it, even though I would still not recommend it.

If I understand correctly, the latest design supports delays of more than 24 days, but I am unsure about the limit.
I am considering using the RabbitMQ delayed message plugin for my project, which needs a delay of 300 days. Would RabbitMQ still be the best solution for me?

I am also curious about the size limit on the exchange. In my use case, I would need to put 10k-20k or more messages into the exchange with various delays. Would the messages still be persisted (remain in the exchange) if the RabbitMQ server went down?

I'd appreciate it very much if you could help me clear up the confusion.

@illotum

illotum commented May 27, 2024

@Joseph-Zhichao with respect, it sounds like you need a database, not a message broker.

@Joseph-Zhichao

@illotum thanks for the reply, Alex.

We are trying to explore the options without using a database, but I agree with you!

@jarodriguez-itsoft

Thanks @michaelklishin for the detailed write up!

Have you considered using KeyDB (an open-source fork of Redis)?
It already supports clustering out of the box and has an interesting mechanism that would let you avoid using any timers: you can set keys to expire and be notified via pub/sub when they do (https://docs.keydb.dev/docs/notifications/)
The max. expiration time is 64 bit and can be set in ms or seconds, so I guess it's more than enough for the requirements ;)

@michaelklishin
Member Author

@jarodriguez-itsoft asking every RabbitMQ user who uses the plugin today, or will use its "successor" in Tanzu RabbitMQ, to "bring their own Redis" is not realistic. But you are welcome to build a plugin that would be Redis-specific.

@jarodriguez-itsoft

Hi @michaelklishin, I understand your point and mostly agree with it, but being able to configure alternative K-V stores would be a great enhancement.
I had a quick look over the plugin code and TBH, before adding support for Redis, I think it should be refactored and generalized to separate the current Mnesia logic into an interface we could implement to initialize and use different stores.
The timer logic should also be put behind an interface so it could be replaced by an event-driven mechanism.

I think adding support for KeyDB in Erlang is relatively straightforward as Erlang already has support for Redis, but making a fork can be a pain, not only because the current codebase is fully dependent on Mnesia but also because of the changes you are going to make regarding schema API module usage.
Having a separate plugin for each store type means the maintainers will need to upgrade it every time a new RabbitMQ or delayed-exchange plugin version is released.

I see it working with some optional configuration options which, when missing, default to whatever K-V engine is chosen as the default, e.g. "rabbitmq_delayed_exchange_engine" = "redis", "rabbitmq_delayed_exchange_connectionstring" = "127.0.0.1:6379", etc., with the plugin using a factory-based initialization of the engine.

What do you think?

@michaelklishin
Member Author

@jarodriguez-itsoft "alternative K-V stores" is not a design goal here. Like I said, you are welcome to build a separate exchange type plugin that uses any K-V store you need. This is extensible open source software after all.
