Skip to content

Commit

Permalink
fix(mqtt): Add mqtt.MosquittoContainer (testcontainers#568) (testcont…
Browse files Browse the repository at this point in the history
…ainers#599)

This PR is adding a new MosquittoContainer class that helps creating
integration tests for MQTT clients.
The MosquittoContainer class contains a bunch of methods to help with
testing:
* checking number of messages received 
* watching topics
* check last payload published on a particular topic
* etc

This PR lacks tests. I can add them if there is interest in this PR...

---------

Co-authored-by: Dave Ankin <[email protected]>
  • Loading branch information
f18m and alexanderankin authored Jun 18, 2024
1 parent ec76df2 commit 59cb6fc
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 1 deletion.
1 change: 1 addition & 0 deletions index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ testcontainers-python facilitates the use of Docker containers for functional an
modules/milvus/README
modules/minio/README
modules/mongodb/README
modules/mqtt/README
modules/mssql/README
modules/mysql/README
modules/nats/README
Expand Down
2 changes: 2 additions & 0 deletions modules/mqtt/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.. autoclass:: testcontainers.mqtt.MosquittoContainer
.. title:: testcontainers.mqtt.MosquittoContainer
155 changes: 155 additions & 0 deletions modules/mqtt/testcontainers/mqtt/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from pathlib import Path
from typing import TYPE_CHECKING, Optional

from typing_extensions import Self

from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_container_is_ready, wait_for_logs

if TYPE_CHECKING:
from paho.mqtt.client import Client
from paho.mqtt.enums import MQTTErrorCode


class MosquittoContainer(DockerContainer):
"""
Specialization of DockerContainer for MQTT broker Mosquitto.
Example:
.. doctest::
>>> from testcontainers.mqtt import MosquittoContainer
>>> with MosquittoContainer() as mosquitto_broker:
... mqtt_client = mosquitto_broker.get_client()
"""

TESTCONTAINERS_CLIENT_ID = "TESTCONTAINERS-CLIENT"
MQTT_PORT = 1883
CONFIG_FILE = "testcontainers-mosquitto-default-configuration.conf"

def __init__(
self,
image: str = "eclipse-mosquitto:latest",
# password: Optional[str] = None,
**kwargs,
) -> None:
super().__init__(image, **kwargs)
# self.password = password
# reusable client context:
self.client: Optional["Client"] = None

@wait_container_is_ready()
def get_client(self) -> "Client":
"""
Creates and connects a client, caching the result in `self.client`
returning that if it exists.
Connection attempts are retried using `@wait_container_is_ready`.
Returns:
a client from the paho library
"""
if self.client:
return self.client
client, err = self.new_client()
# 0 is a conventional "success" value in C, which is falsy in python
if err:
# retry, maybe it is not available yet
raise ConnectionError(f"Failed to establish a connection: {err}")
if not client.is_connected():
raise TimeoutError("The Paho MQTT secondary thread has not connected yet!")
self.client = client
return client

def new_client(self, **kwargs) -> tuple["Client", "MQTTErrorCode"]:
"""
Get a paho.mqtt client connected to this container.
Check the returned object is_connected() method before use
Usage of this method is required for versions <2;
versions >=2 will wait for log messages to determine container readiness.
There is no way to pass arguments to new_client in versions <2,
please use an up-to-date version.
Args:
**kwargs: Keyword arguments passed to `paho.mqtt.client`.
Returns:
client: MQTT client to connect to the container.
error: an error code or MQTT_ERR_SUCCESS.
"""
try:
from paho.mqtt.client import CallbackAPIVersion, Client
from paho.mqtt.enums import MQTTErrorCode
except ImportError as i:
raise ImportError("'pip install paho-mqtt' required for MosquittoContainer.new_client") from i

err = MQTTErrorCode.MQTT_ERR_SUCCESS
if self.client is None:
self.client = Client(
client_id=MosquittoContainer.TESTCONTAINERS_CLIENT_ID,
callback_api_version=CallbackAPIVersion.VERSION2,
userdata=self,
**kwargs,
)
self.client._connect_timeout = 1.0

# connect() is a blocking call:
err = self.client.connect(self.get_container_host_ip(), int(self.get_exposed_port(self.MQTT_PORT)))
self.client.loop_start() # launch a thread to call loop() and dequeue the message

return self.client, err

def start(self, configfile: Optional[str] = None) -> Self:
# setup container:
self.with_exposed_ports(self.MQTT_PORT)
if configfile is None:
# default config file
configfile = Path(__file__).parent / MosquittoContainer.CONFIG_FILE
self.with_volume_mapping(configfile, "/mosquitto/config/mosquitto.conf")
# if self.password:
# # TODO: add authentication
# pass

# do container start
super().start()

self._wait()
return self

def _wait(self):
if self.image.split(":")[-1].startswith("1"):
import logging

logging.warning(
"You are using version 1 of eclipse-mosquitto which is not supported for use by this module without paho-mqtt also installed"
)
self.get_client()
else:
wait_for_logs(self, r"mosquitto version \d+.\d+.\d+ running", timeout=30)

def stop(self, force=True, delete_volume=True) -> None:
if self.client is not None:
self.client.disconnect()
self.client = None # force recreation of the client object at next start()
super().stop(force, delete_volume)

def publish_message(self, topic: str, payload: str, timeout: int = 2) -> None:
ret = self.get_client().publish(topic, payload)
ret.wait_for_publish(timeout=timeout)
if not ret.is_published():
raise RuntimeError(f"Could not publish a message on topic {topic} to Mosquitto broker: {ret}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# see https://mosquitto.org/man/mosquitto-conf-5.html

protocol mqtt
user root
log_dest stdout
allow_anonymous true

log_type error
log_type warning
log_type notice
log_type information

log_timestamp_format %Y-%m-%d %H:%M:%S
persistence true
persistence_location /data/

listener 1883
protocol mqtt

sys_interval 1
18 changes: 18 additions & 0 deletions modules/mqtt/tests/test_mosquitto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import pytest

from testcontainers.mqtt import MosquittoContainer

VERSIONS = ["1.6.15", "2.0.18"]


@pytest.mark.parametrize("version", VERSIONS)
def test_mosquitto(version):
with MosquittoContainer(image=f"eclipse-mosquitto:{version}") as container:
external_port = int(container.get_exposed_port(container.MQTT_PORT))
print(f"listening on port: {external_port}")


@pytest.mark.parametrize("version", VERSIONS)
def test_mosquitto_client(version):
with MosquittoContainer(image=f"eclipse-mosquitto:{version}") as container:
container.get_client()
17 changes: 16 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ packages = [
{ include = "testcontainers", from = "modules/minio" },
{ include = "testcontainers", from = "modules/milvus" },
{ include = "testcontainers", from = "modules/mongodb" },
{ include = "testcontainers", from = "modules/mqtt" },
{ include = "testcontainers", from = "modules/mssql" },
{ include = "testcontainers", from = "modules/mysql" },
{ include = "testcontainers", from = "modules/nats" },
Expand Down Expand Up @@ -117,6 +118,7 @@ memcached = []
minio = ["minio"]
milvus = []
mongodb = ["pymongo"]
mqtt = []
mssql = ["sqlalchemy", "pymssql"]
mysql = ["sqlalchemy", "pymysql"]
nats = ["nats-py"]
Expand Down Expand Up @@ -153,6 +155,7 @@ pytest-asyncio = "0.23.5"
kafka-python-ng = "^2.2.0"
hvac = "*"
pymilvus = "2.4.3"
paho-mqtt = "2.1.0"

[[tool.poetry.source]]
name = "PyPI"
Expand Down

0 comments on commit 59cb6fc

Please sign in to comment.