Allows to use MongoDB as a message queue broker.
- Installation
- Create context
- Send message to topic
- Send message to queue
- Send priority message
- Send expiration message
- Send delayed message
- Consume message
- Subscription consumer
$ composer require enqueue/mongodb
use Enqueue\Mongodb\MongodbConnectionFactory;
// connects to localhost
$connectionFactory = new MongodbConnectionFactory();
// same as above
$factory = new MongodbConnectionFactory('mongodb:');
// same as above
$factory = new MongodbConnectionFactory([]);
$factory = new MongodbConnectionFactory([
'dsn' => 'mongodb://localhost:27017/db_name',
'dbname' => 'enqueue',
'collection_name' => 'enqueue',
'polling_interval' => '1000',
$context = $factory->createContext();
// if you have enqueue/enqueue library installed you can use a factory to build context from DSN
$context = (new \Enqueue\ConnectionFactoryFactory())->create('mongodb:')->createContext();
/** @var \Enqueue\Mongodb\MongodbContext $context */
/** @var \Enqueue\Mongodb\MongodbDestination $fooTopic */
$message = $context->createMessage('Hello world!');
$context->createProducer()->send($fooTopic, $message);
/** @var \Enqueue\Mongodb\MongodbContext $context */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */
$message = $context->createMessage('Hello world!');
$context->createProducer()->send($fooQueue, $message);
/** @var \Enqueue\Mongodb\MongodbContext $context */
$fooQueue = $context->createQueue('foo');
$message = $context->createMessage('Hello world!');
->setPriority(5) // the higher priority the sooner a message gets to a consumer
->send($fooQueue, $message)
/** @var \Enqueue\Mongodb\MongodbContext $context */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */
$message = $context->createMessage('Hello world!');
->setTimeToLive(60000) // 60 sec
->send($fooQueue, $message)
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
/** @var \Enqueue\Mongodb\MongodbContext $context */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */
// make sure you run "composer require enqueue/amqp-tools".
$message = $context->createMessage('Hello world!');
->setDeliveryDelay(5000) // 5 sec
->send($fooQueue, $message)
/** @var \Enqueue\Mongodb\MongodbContext $context */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */
$consumer = $context->createConsumer($fooQueue);
$message = $consumer->receive();
// process a message
// $consumer->reject($message);
use Interop\Queue\Message;
use Interop\Queue\Consumer;
/** @var \Enqueue\Mongodb\MongodbContext $context */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */
/** @var \Enqueue\Mongodb\MongodbDestination $barQueue */
$fooConsumer = $context->createConsumer($fooQueue);
$barConsumer = $context->createConsumer($barQueue);
$subscriptionConsumer = $context->createSubscriptionConsumer();
$subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) {
// process message
return true;
$subscriptionConsumer->subscribe($barConsumer, function(Message $message, Consumer $consumer) {
// process message
return true;
$subscriptionConsumer->consume(2000); // 2 sec