Skip to content

Commit

Permalink
[mqtt] Finish implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
jcelerier committed May 18, 2024
1 parent 7bef967 commit c842977
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 47 deletions.
1 change: 1 addition & 0 deletions src/ossia-config.hpp.in
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#cmakedefine OSSIA_PROTOCOL_JOYSTICK
#cmakedefine OSSIA_PROTOCOL_WIIMOTE
#cmakedefine OSSIA_PROTOCOL_ARTNET
#cmakedefine OSSIA_PROTOCOL_MQTT5

// Additional features
#cmakedefine OSSIA_DNSSD
Expand Down
213 changes: 175 additions & 38 deletions src/ossia/protocols/mqtt/mqtt_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,81 @@
#include <boost/asio/as_tuple.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/use_future.hpp>
#include <boost/beast.hpp>

#include <QDebug>

#include <async_mqtt5.hpp>

#include <coroutine>

namespace ossia::net
{
using topics_type
= boost::unordered::concurrent_flat_map<std::string, ossia::net::parameter_base*>;
using strand_type
= decltype(boost::asio::make_strand(std::declval<boost::asio::io_context&>()));

static constexpr auto use_nothrow_awaitable
= boost::asio::as_tuple(boost::asio::use_awaitable);
struct mqtt5_client
struct mqtt5_client_base
{
virtual ~mqtt5_client_base() = default;
virtual void connect(const ossia::net::mqtt5_configuration& m_conf) = 0;
virtual boost::asio::awaitable<bool> publish(ossia::net::parameter_base& param) = 0;
virtual boost::asio::awaitable<bool> subscribe(std::string topic) = 0;
virtual boost::asio::awaitable<bool> unsubscribe(std::string topic) = 0;
virtual boost::asio::awaitable<void> receive(mqtt5_protocol& self) = 0;
virtual boost::asio::awaitable<bool> learn(mqtt5_protocol& self, bool) = 0;
};

using mqtt_tcp_socket = boost::asio::ip::tcp::socket;
using mqtt_ws_socket = boost::beast::websocket::stream<boost::asio::ip::tcp::socket>;
template <typename Socket = boost::asio::ip::tcp::socket>
struct mqtt5_client : mqtt5_client_base
{
async_mqtt5::mqtt_client<boost::asio::ip::tcp::socket> client;
explicit mqtt5_client(boost::asio::io_context& ioc)
: client{ioc}
async_mqtt5::mqtt_client<Socket> client;
explicit mqtt5_client(strand_type& strand)
: client{strand}
{
}

boost::asio::awaitable<bool> publish(ossia::net::parameter_base& param)
void connect(const ossia::net::mqtt5_configuration& m_conf) override
{
if constexpr(std::is_same_v<Socket, mqtt_tcp_socket>)
{
auto& conf = ossia::get<tcp_configuration>(m_conf.transport);
client.brokers(conf.host, conf.port).async_run(boost::asio::detached);
}
else if constexpr(std::is_same_v<Socket, mqtt_ws_socket>)
{
auto& conf = ossia::get<ws_client_configuration>(m_conf.transport);
std::string host = conf.url;
uint16_t default_port = 1883;
client.brokers(host, default_port).async_run(boost::asio::detached);
}
else
{
static_assert(std::is_same_v<Socket*, void>);
}
}

boost::asio::awaitable<bool> publish(ossia::net::parameter_base& param) override
{
co_return false;
auto&& [ec, rc, props]
= co_await client.async_publish<async_mqtt5::qos_e::at_least_once>(
= co_await client.template async_publish<async_mqtt5::qos_e::at_least_once>(
param.get_node().osc_address(), ossia::convert<std::string>(param.value()),
async_mqtt5::retain_e::no, async_mqtt5::publish_props{},
use_nothrow_awaitable);

co_return ec || !rc;
}

boost::asio::awaitable<bool> subscribe(const std::string& topic)
boost::asio::awaitable<bool> subscribe(std::string topic) override
{
// Configure the request to subscribe to a Topic.
async_mqtt5::subscribe_topic sub_topic = async_mqtt5::subscribe_topic{
Expand All @@ -48,6 +93,13 @@ struct mqtt5_client

co_return !ec && !sub_codes[0];
}
boost::asio::awaitable<bool> unsubscribe(std::string topic) override
{
auto&& [ec, sub_codes, sub_props] = co_await client.async_unsubscribe(
topic, async_mqtt5::unsubscribe_props{}, use_nothrow_awaitable);

co_return !ec && !sub_codes[0];
}

static void dump_props(const auto& publish_props)
{
Expand Down Expand Up @@ -80,12 +132,8 @@ struct mqtt5_client
});
}

boost::asio::awaitable<void> subscribe_and_receive(ossia::net::parameter_base& param)
boost::asio::awaitable<void> receive(mqtt5_protocol& self) override
{
// subscribe
if(!(co_await subscribe(param.get_node().osc_address())))
co_return;

// receive
for(;;)
{
Expand All @@ -102,11 +150,16 @@ struct mqtt5_client
else if(ec)
break;

param.push_value(ossia::convert(payload, param.get_value_type()));
self.on_message(topic, payload);
}

co_return;
}

boost::asio::awaitable<bool> learn(mqtt5_protocol& self, bool v) override
{
return v ? subscribe("#") : unsubscribe("#");
}
};

struct mqtt5_protocol::subscribe_state
Expand All @@ -120,11 +173,30 @@ struct mqtt5_protocol::subscribe_state
} state{created};
};

static auto make_mqtt_client(const mqtt5_configuration& conf, strand_type& strand)
{
struct
{
strand_type& strand;
std::unique_ptr<mqtt5_client_base> operator()(const tcp_configuration& tcp)
{
return std::make_unique<mqtt5_client<mqtt_tcp_socket>>(strand);
}
std::unique_ptr<mqtt5_client_base> operator()(const ws_client_configuration& tcp)
{
return std::make_unique<mqtt5_client<mqtt_ws_socket>>(strand);
}
} vis{strand};

return visit(vis, conf.transport);
}

mqtt5_protocol::mqtt5_protocol(network_context_ptr ctx, const mqtt5_configuration& conf)
: m_context{ctx}
, m_conf{conf}
, m_client{std::make_unique<mqtt5_client>(m_context->context)}
, m_strand{boost::asio::make_strand(m_context->context)}
{
m_client = make_mqtt_client(conf, m_strand);
}

mqtt5_protocol::~mqtt5_protocol() { }
Expand All @@ -137,7 +209,7 @@ bool mqtt5_protocol::pull(ossia::net::parameter_base&)
bool mqtt5_protocol::push(const ossia::net::parameter_base& p, const ossia::value& v)
{
co_spawn(
m_context->context, m_client->publish(const_cast<ossia::net::parameter_base&>(p)),
m_strand, m_client->publish(const_cast<ossia::net::parameter_base&>(p)),
boost::asio::detached);
return false;
}
Expand All @@ -155,7 +227,10 @@ bool mqtt5_protocol::observe(ossia::net::parameter_base&, bool)
void mqtt5_protocol::on_new_param(const ossia::net::parameter_base& param)
{
// FIXME we should create a "safe" data object copied to the thread there
boost::asio::post([&param, this] {
boost::asio::post(boost::asio::bind_executor(
m_strand, [&param = const_cast<ossia::net::parameter_base&>(param), this]() {
if(!m_client)
return;
switch(param.get_access())
{
case ossia::access_mode::GET:
Expand All @@ -170,39 +245,26 @@ void mqtt5_protocol::on_new_param(const ossia::net::parameter_base& param)
on_subscribe(param);
break;
}
});
}));
}

void mqtt5_protocol::on_removed_param(const ossia::net::parameter_base& p)
{
if(!m_client)
return;
on_unsubscribe(p);
}

void mqtt5_protocol::on_subscribe(const ossia::net::parameter_base& p)
void mqtt5_protocol::on_subscribe(ossia::net::parameter_base& p)
{
m_subscriptions.visit(&p, [this, &p](auto& res) {
subscribe_state& sub = res.second;

if(std::exchange(sub.state, subscribe_state::operating)
== subscribe_state::operating)
return;

co_spawn(
m_context->context,
m_client->subscribe_and_receive(const_cast<ossia::net::parameter_base&>(p)),
boost::asio::bind_cancellation_slot(
sub.cancellation.slot(), boost::asio::detached));
});
m_topics.try_emplace(p.get_node().osc_address(), &p);
co_spawn(
m_strand, m_client->subscribe(p.get_node().osc_address()), boost::asio::detached);
}

void mqtt5_protocol::on_unsubscribe(const ossia::net::parameter_base& p)
{
m_subscriptions.visit(&p, [this, &p](auto& res) {
subscribe_state& sub = res.second;
if(std::exchange(sub.state, subscribe_state::cancelled)
== subscribe_state::operating)
sub.cancellation.emit(boost::asio::cancellation_type::all);
});
m_topics.erase(p.get_node().osc_address());
}

bool mqtt5_protocol::update(ossia::net::node_base& node_base)
Expand All @@ -212,15 +274,90 @@ bool mqtt5_protocol::update(ossia::net::node_base& node_base)

void mqtt5_protocol::set_device(device_base& dev)
{
m_device = &dev;
dev.on_parameter_created.connect<&mqtt5_protocol::on_new_param>(*this);
dev.on_parameter_removing.connect<&mqtt5_protocol::on_removed_param>(*this);

// Connect to the broker
m_client->client.brokers(m_conf.host, m_conf.port).async_run(boost::asio::detached);
m_client->connect(m_conf);

// Publish all existing nodes
ossia::net::iterate_all_children(
&dev.get_root_node(),
[this](ossia::net::parameter_base& param) { on_new_param(param); });

m_root = std::make_shared<subscribe_state>();
co_spawn(
m_strand, m_client->receive(*this),
boost::asio::bind_cancellation_slot(
m_root->cancellation.slot(), boost::asio::detached));
}

void mqtt5_protocol::set_learning(bool v)
{
can_learn::set_learning(v);
co_spawn(m_strand, m_client->learn(*this, v), boost::asio::detached);
}

void mqtt5_protocol::on_message(const std::string& topic, const std::string& payload)
{
if(learning())
{
[[unlikely]];
on_learn(topic, payload);
return;
}

m_topics.visit(topic, [&payload = payload](auto& v) {
auto& param = *v.second;
param.set_value(ossia::convert(payload, param.get_value_type()));
});
}

void mqtt5_protocol::on_learn(const std::string& topic, const std::string& payload)
{
assert(this->m_device);
auto* n = &this->m_device->get_root_node();
std::vector<std::string> v = address_parts(topic);
bool is_new = false;
for(const auto& part : v)
{
auto cld = n->find_child(part);
if(cld)
{
n = cld;
}
else
{
// Start adding
n = n->create_child(part);
is_new = true;
}
}

if(!is_new)
{
return;
}
auto addr = n->create_parameter(ossia::val_type::STRING);
addr->set_value(payload);
}

void mqtt5_protocol::stop()
{
assert(m_device);
m_device->on_parameter_created.disconnect<&mqtt5_protocol::on_new_param>(*this);
m_device->on_parameter_removing.disconnect<&mqtt5_protocol::on_removed_param>(*this);

std::future<void> wait = boost::asio::dispatch(
boost::asio::bind_executor(m_strand, std::packaged_task<void()>([this] {
m_client.reset();

m_root->cancellation.emit(boost::asio::cancellation_type::all);
})));
wait.get();

std::future<void> wait2 = boost::asio::dispatch(m_strand, boost::asio::use_future);
wait2.get();
}
}
Loading

0 comments on commit c842977

Please sign in to comment.