Skip to content

Commit

Permalink
[protocols] Initial implementation of MQTT protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
jcelerier committed Mar 24, 2024
1 parent eb3bf4b commit f6eb9f8
Show file tree
Hide file tree
Showing 14 changed files with 382 additions and 5 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,6 @@
[submodule "3rdparty/unordered_dense"]
path = 3rdparty/unordered_dense
url = https://github.com/martinus/unordered_dense
[submodule "3rdparty/async-mqtt5"]
path = 3rdparty/async-mqtt5
url = https://github.com/mireo/async-mqtt5
1 change: 1 addition & 0 deletions 3rdparty/async-mqtt5
Submodule async-mqtt5 added at aea517
7 changes: 7 additions & 0 deletions cmake/OssiaDeps.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ include(deps/unordered_dense)
include(deps/verdigris)
include(deps/websocketpp)

if(OSSIA_PROTOCOL_MQTT5)
include(deps/mqtt)
if(NOT TARGET Async::MQTT5)
set(OSSIA_PROTOCOL_MQTT5 FALSE CACHE INTERNAL "" FORCE)
endif()
endif()

if(OSSIA_PROTOCOL_MIDI)
include(deps/libremidi)
endif()
Expand Down
2 changes: 2 additions & 0 deletions cmake/OssiaOptions.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ option(OSSIA_PROTOCOL_MIDI "Enable MIDI protocol" ON)
option(OSSIA_PROTOCOL_OSC "Enable OSC protocol" ON)
option(OSSIA_PROTOCOL_MINUIT "Enable Minuit protocol" ON)
option(OSSIA_PROTOCOL_OSCQUERY "Enable OSCQuery protocol" ON)
option(OSSIA_PROTOCOL_MQTT5 "Enable MQTT 5 protocol" ON)
option(OSSIA_PROTOCOL_HTTP "Enable HTTP protocol" ON) # Requires Qt
option(OSSIA_PROTOCOL_WEBSOCKETS "Enable WebSockets protocol" OFF) # Requires Qt
option(OSSIA_PROTOCOL_SERIAL "Enable Serial port protocol" OFF) # Requires Qt
Expand Down Expand Up @@ -89,6 +90,7 @@ set(OSSIA_AVAILABLE_PROTOCOLS
WIIMOTE
ARTNET
LIBMAPPER
MQTT5
)

set(CMAKE_MODULE_PATH "${CMAKE_MODULE_PATH};${PROJECT_SOURCE_DIR}/CMake;${PROJECT_SOURCE_DIR}/cmake/cmake-modules;")
Expand Down
8 changes: 8 additions & 0 deletions cmake/deps/mqtt.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
if(OSSIA_USE_SYSTEM_LIBRARIES)
find_package(async-mqtt5 CONFIG GLOBAL)
endif()

if(NOT TARGET Async::MQTT5)
add_subdirectory("${OSSIA_3RDPARTY_FOLDER}/async-mqtt5" async-mqtt5)
endif()

4 changes: 4 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ if(OSSIA_PROTOCOL_SERIAL)
ossia_add_example(network_arduino "${CMAKE_CURRENT_SOURCE_DIR}/Network/Arduino.cpp")
endif()

if(OSSIA_PROTOCOL_MQTT5)
ossia_add_example(mqtt_publish "${CMAKE_CURRENT_SOURCE_DIR}/Network/Mqtt_publication.cpp")
endif()

ossia_add_example(export_namespace_to_json "${CMAKE_CURRENT_SOURCE_DIR}/Preset/Export_namespace_to_json.cpp")
ossia_add_example(logger "${CMAKE_CURRENT_SOURCE_DIR}/Common/Logger.cpp")
ossia_add_example(fuzzysearch "${CMAKE_CURRENT_SOURCE_DIR}/Common/FuzzySearch.cpp")
41 changes: 41 additions & 0 deletions examples/Network/Mqtt_publication.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#include <ossia/network/base/parameter_data.hpp>
#include <ossia/network/common/debug.hpp>
#include <ossia/network/context.hpp>
#include <ossia/network/generic/generic_device.hpp>
#include <ossia/protocols/mqtt/mqtt_protocol.hpp>

#include <boost/asio/post.hpp>

#include <iostream>
#include <memory>

void do_post(boost::asio::io_context& ctx, ossia::net::parameter_base& param)
{
boost::asio::post(ctx, [&ctx, &param] {
param.push_value("oy! " + std::to_string(rand() % 10));

do_post(ctx, param);
});
};

int main(int argc, char** argv)
{
auto ctx = std::make_shared<ossia::net::network_context>();

ossia::net::mqtt5_configuration conf{.host = "127.0.0.1", .port = 1883};
auto proto = std::make_unique<ossia::net::mqtt5_protocol>(ctx, conf);
ossia::net::generic_device device{std::move(proto), "P"};

// libossia v3: we really need to prevent live modification of the tree
// and have an api to update it async
ossia::create_parameter(device.get_root_node(), "/foo", "string")
->set_access(ossia::access_mode::GET)
.add_callback([](const ossia::value& val) {
std::cerr << "Received: " << ossia::value_to_pretty_string(val) << "\n";
});
auto& bar = ossia::create_parameter(device.get_root_node(), "/bar", "string")
->set_access(ossia::access_mode::SET);

do_post(ctx->context, bar);
ctx->run();
}
10 changes: 5 additions & 5 deletions examples/Network/joystick/joystick.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ int main()
fmt::print("Using joystick {}\n", ossia::net::joystick_info::get_joystick_name(0));

auto ctx = ossia::net::create_network_context();
ossia::net::generic_device source_dev{
std::make_unique<ossia::net::multiplex_protocol>(
std::make_unique<ossia::net::joystick_protocol>(ctx, 0, 0),
std::make_unique<ossia::oscquery::oscquery_server_protocol>(5579, 5589)),
"joystick"};
auto proto = std::make_unique<ossia::net::multiplex_protocol>();
proto->expose_to(std::make_unique<ossia::net::joystick_protocol>(ctx, 0, 0));
proto->expose_to(
std::make_unique<ossia::oscquery::oscquery_server_protocol>(5579, 5589));
ossia::net::generic_device source_dev{std::move(proto), "joystick"};
source_dev.set_echo(true);

auto on_message = [](const ossia::net::parameter_base& param) {
Expand Down
10 changes: 10 additions & 0 deletions src/ossia/network/base/protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,14 @@ std::future<void> protocol_base::pull_async(parameter_base&)
{
return {};
}

bool protocol_base::publish(const parameter_base&)
{
return false;
}

bool protocol_base::unpublish(const parameter_base&)
{
return false;
}
}
16 changes: 16 additions & 0 deletions src/ossia/network/base/protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,22 @@ class OSSIA_EXPORT protocol_base
*/
virtual bool observe(parameter_base&, bool) = 0;

/**
* @brief Notify the network that a parameter is to be published
*
* In some protocols (MQTT), this may send a message to the broker to indicate
* that a new topic is being made available for publication.
*/
virtual bool publish(const parameter_base&);

/**
* @brief Notify the network that a parameter is to be removed
*
* In some protocols (MQTT), this may send a message to the broker to indicate
* that a previously published topic is disappearing.
*/
virtual bool unpublish(const parameter_base&);

/**
* @brief Begin observation without notifying the other computers.
*/
Expand Down
224 changes: 224 additions & 0 deletions src/ossia/protocols/mqtt/mqtt_protocol.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
#include "mqtt_protocol.hpp"

#include <ossia/network/base/device.hpp>

#include <boost/asio/as_tuple.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/use_awaitable.hpp>

#include <async_mqtt5.hpp>

#include <coroutine>

namespace ossia::net
{
static constexpr auto use_nothrow_awaitable
= boost::asio::as_tuple(boost::asio::use_awaitable);
struct mqtt5_client
{
async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket> client;
explicit mqtt5_client(boost::asio::io_context& ioc)
: client{ioc}
{
}

boost::asio::awaitable<bool> publish(ossia::net::parameter_base& param)
{
auto&& [ec, rc, props]
= co_await client.async_publish<async_mqtt5::qos_e::at_least_once>(
param.get_node().osc_address(), ossia::convert<std::string>(param.value()),
async_mqtt5::retain_e::no, async_mqtt5::publish_props{},
use_nothrow_awaitable);

co_return ec || !rc;
}

boost::asio::awaitable<bool> subscribe(const std::string& topic)
{
// Configure the request to subscribe to a Topic.
async_mqtt5::subscribe_topic sub_topic = async_mqtt5::subscribe_topic{
topic, async_mqtt5::subscribe_options{
async_mqtt5::qos_e::exactly_once, async_mqtt5::no_local_e::no,
async_mqtt5::retain_as_published_e::retain,
async_mqtt5::retain_handling_e::send}};

auto&& [ec, sub_codes, sub_props] = co_await client.async_subscribe(
sub_topic, async_mqtt5::subscribe_props{}, use_nothrow_awaitable);

co_return !ec && !sub_codes[0];
}

static void dump_props(const auto& publish_props)
{
publish_props.visit([]<typename Key>(const Key& x, const auto& v) -> bool {
if constexpr(requires { v.has_value(); })
{
if(v.has_value())
std::cerr << "prop " << (int)Key::value << " : " << *v << "\n";
else
std::cerr << "prop " << (int)Key::value << " (unset) \n";
}
else if constexpr(requires { v.size(); })
{
std::cerr << "arr prop (" << v.size() << "): [ ";
for(size_t i = 0; i < v.size(); i++)
{
if constexpr(requires {
v[i].first;
v[i].second;
})
std::cerr << "(" << v[i].first << ", " << v[i].second << ")";
else
std::cerr << v[i];
if(i + 1 < v.size())
std::cerr << ", ";
}
std::cerr << "]\n";
}
return true;
});
}

boost::asio::awaitable<void> subscribe_and_receive(ossia::net::parameter_base& param)
{
// subscribe
if(!(co_await subscribe(param.get_node().osc_address())))
co_return;

// receive
for(;;)
{
auto&& [ec, topic, payload, publish_props]
= co_await client.async_receive(use_nothrow_awaitable);

if(ec == async_mqtt5::client::error::session_expired)
{
if(co_await subscribe(topic))
continue;
else
break;
}
else if(ec)
break;

param.push_value(ossia::convert(payload, param.get_value_type()));
}

co_return;
}
};

struct mqtt5_protocol::subscribe_state
{
boost::asio::cancellation_signal cancellation;
enum
{
created,
operating,
cancelled
} state{created};
};

mqtt5_protocol::mqtt5_protocol(network_context_ptr ctx, const mqtt5_configuration& conf)
: m_context{ctx}
, m_conf{conf}
, m_client{std::make_unique<mqtt5_client>(m_context->context)}
{
}

mqtt5_protocol::~mqtt5_protocol() { }

bool mqtt5_protocol::pull(ossia::net::parameter_base&)
{
return false;
}

bool mqtt5_protocol::push(const ossia::net::parameter_base& p, const ossia::value& v)
{
co_spawn(
m_context->context, m_client->publish(const_cast<ossia::net::parameter_base&>(p)),
boost::asio::detached);
return false;
}

bool mqtt5_protocol::push_raw(const ossia::net::full_parameter_data&)
{
return false;
}

bool mqtt5_protocol::observe(ossia::net::parameter_base&, bool)
{
return false;
}

void mqtt5_protocol::on_new_param(const ossia::net::parameter_base& param)
{
// FIXME we should create a "safe" data object copied to the thread there
boost::asio::post([&param, this] {
switch(param.get_access())
{
case ossia::access_mode::GET:
on_subscribe(param);
break;
case ossia::access_mode::SET:
// Nothing to do. publishing is just "push".
break;
case ossia::access_mode::BI:
// does not make sense with MQTT...
// let's try to subscribe anyways as most likely we want to read some sensor
on_subscribe(param);
break;
}
});
}

void mqtt5_protocol::on_removed_param(const ossia::net::parameter_base& p)
{
on_unsubscribe(p);
}

void mqtt5_protocol::on_subscribe(const ossia::net::parameter_base& p)
{
m_subscriptions.visit(&p, [this, &p](auto& res) {
subscribe_state& sub = res.second;

if(std::exchange(sub.state, sub.operating) == sub.operating)
return;

co_spawn(
m_context->context,
m_client->subscribe_and_receive(const_cast<ossia::net::parameter_base&>(p)),
boost::asio::bind_cancellation_slot(
sub.cancellation.slot(), boost::asio::detached));
});
}

void mqtt5_protocol::on_unsubscribe(const ossia::net::parameter_base& p)
{
m_subscriptions.visit(&p, [this, &p](auto& res) {
subscribe_state& sub = res.second;
if(std::exchange(sub.state, sub.cancelled) == sub.operating)
sub.cancellation.emit(boost::asio::cancellation_type::all);
});
}

bool mqtt5_protocol::update(ossia::net::node_base& node_base)
{
return false;
}

void mqtt5_protocol::set_device(device_base& dev)
{
dev.on_parameter_created.connect<&mqtt5_protocol::on_new_param>(*this);
dev.on_parameter_removing.connect<&mqtt5_protocol::on_removed_param>(*this);

// Connect to the broker
m_client->client.brokers(m_conf.host, m_conf.port).async_run(boost::asio::detached);

// Publish all existing nodes
ossia::net::iterate_all_children(
&dev.get_root_node(),
[this](ossia::net::parameter_base& param) { on_new_param(param); });
}
}
Loading

0 comments on commit f6eb9f8

Please sign in to comment.