diff --git a/CMakeLists.txt b/CMakeLists.txt index 4c4bdf5..98dc9d9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.14) -project(iotic VERSION 0.3.0 LANGUAGES C CXX) +project(iotic VERSION 0.3.1 LANGUAGES C CXX) include(FetchContent) set(FETCHCONTENT_QUIET FALSE) diff --git a/README.md b/README.md index f43656b..b1f0edb 100644 --- a/README.md +++ b/README.md @@ -45,8 +45,8 @@ iotic implements several strategies to realize PC surplus charging of electric v You need at least Bookworm (not working with Bullseye). Installation ``` -wget https://github.com/mincequi/iotic/releases/download/v0.3.0/iotic_0.3.0_armhf.deb -sudo apt install ./iotic_0.3.0_armhf.deb +wget https://github.com/mincequi/iotic/releases/download/v0.3.1/iotic_0.3.1_armhf.deb +sudo apt install ./iotic_0.3.1_armhf.deb ``` Starting ``` diff --git a/src/http/HttpClient.cpp b/src/http/HttpClient.cpp index 19a692d..af188e5 100644 --- a/src/http/HttpClient.cpp +++ b/src/http/HttpClient.cpp @@ -39,8 +39,7 @@ void HttpClient::run(char const* host, char const* port, char const* target) { } void HttpClient::on_resolve(beast::error_code ec, tcp::resolver::results_type results) { - if (ec) - return fail(ec, "resolve"); + if (ec) return fail(ec, "resolve"); // Set a timeout on the operation _stream.expires_after(std::chrono::seconds(30)); @@ -52,8 +51,7 @@ void HttpClient::on_resolve(beast::error_code ec, tcp::resolver::results_type re } void HttpClient::on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type) { - if (ec) - return fail(ec, "connect"); + if (ec) return fail(ec, "connect"); // Set a timeout on the operation _stream.expires_after(std::chrono::seconds(30)); @@ -63,16 +61,14 @@ void HttpClient::on_connect(beast::error_code ec, tcp::resolver::results_type::e } void HttpClient::on_write(beast::error_code ec, std::size_t /*bytes_transferred*/) { - if (ec) - return fail(ec, "write"); + if (ec) return fail(ec, "write"); // Receive the HTTP response http::async_read(_stream, _buffer, _response, beast::bind_front_handler(&HttpClient::on_read, shared_from_this())); } void HttpClient::on_read(beast::error_code ec, std::size_t /*bytes_transferred*/) { - if(ec) - return fail(ec, "read"); + if (ec) return fail(ec, "read"); // Write the message to standard out std::cout << _response << std::endl; diff --git a/src/main.cpp b/src/main.cpp index 9d03d1f..b3eb6c8 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -41,7 +41,7 @@ int main(int argc, char *argv[]) { QTimer timer; QObject::connect(&timer, &QTimer::timeout, []() { uvw::loop::get_default()->run(uvw::loop::run_mode::NOWAIT); - modbus::ModbusDiscovery::ioc.poll(); + //modbus::ModbusDiscovery::ioc.poll(); }); timer.start(100ms); diff --git a/src/modbus/CMakeLists.txt b/src/modbus/CMakeLists.txt index a3bf817..1b40588 100644 --- a/src/modbus/CMakeLists.txt +++ b/src/modbus/CMakeLists.txt @@ -1,5 +1,7 @@ add_library(modbus STATIC ModbusDiscovery.cpp + ModbusThing.cpp + #ModbusThingDecorator.cpp ) target_link_libraries(modbus diff --git a/src/modbus/ModbusDiscovery.cpp b/src/modbus/ModbusDiscovery.cpp index a63dea4..c720aa8 100644 --- a/src/modbus/ModbusDiscovery.cpp +++ b/src/modbus/ModbusDiscovery.cpp @@ -2,23 +2,18 @@ #include #include -#include #include #include +#include #include -#include -#include namespace modbus { using namespace std::placeholders; using namespace sunspec; -asio::io_context ModbusDiscovery::ioc; - -ModbusDiscovery::ModbusDiscovery() : - _httpClient(std::make_shared(ioc)) { +ModbusDiscovery::ModbusDiscovery() { _discoveryTimer.callOnTimeout(this, &ModbusDiscovery::onStartDiscovering); } @@ -48,7 +43,6 @@ void ModbusDiscovery::onStartDiscovering() { } // Scan subnets - QTcpSocket socket; LOG_S(INFO) << "discovering things in subnet: " << subnet << "0/24"; for (uint8_t i = 1; i < 255; ++i) { const QString host = subnet + QString::number(i); @@ -58,21 +52,22 @@ void ModbusDiscovery::onStartDiscovering() { //_httpClient->connect(host.toStdString(), 502); auto candidate = std::make_unique(ThingInfo{ThingInfo::SunSpec, host.toStdString(), host.toStdString()}); - auto sub = candidate->state().subscribe(std::bind(&ModbusDiscovery::onCandidateStateChanged, this, candidate.get(), _1)); - candidate->connectDevice(); + //auto candidate = std::make_unique(ThingInfo{ThingInfo::SunSpec, host.toStdString(), host.toStdString()}); + auto sub = candidate->stateObservable().subscribe(std::bind(&ModbusDiscovery::onCandidateStateChanged, this, candidate.get(), _1)); + candidate->connect(); _candidates.push_back({std::move(candidate), sub}); } } -void ModbusDiscovery::onCandidateStateChanged(const SunSpecThing* candidate_, SunSpecThing::State state) { +void ModbusDiscovery::onCandidateStateChanged(const SunSpecThing* candidate_, Thing::State state) { switch (state) { - case SunSpecThing::State::Failed: + case Thing::State::Failed: // TODO: do we actually need to unsubscribe? _candidates.remove_if([&](const auto& c) { return c.first.get() == candidate_; }); break; - case SunSpecThing::State::Ready: + case Thing::State::Ready: { // Steal candidate from container auto it = std::find_if(_candidates.begin(), _candidates.end(), [&](const auto& c) { return c.first.get() == candidate_; @@ -103,6 +98,9 @@ void ModbusDiscovery::onCandidateStateChanged(const SunSpecThing* candidate_, Su thingDiscoveredSubscriber().on_next(thing); break; } + default: + break; + } } } // namespace modbus diff --git a/src/modbus/ModbusDiscovery.h b/src/modbus/ModbusDiscovery.h index b854de8..9f13409 100644 --- a/src/modbus/ModbusDiscovery.h +++ b/src/modbus/ModbusDiscovery.h @@ -4,7 +4,6 @@ #include -#include #include #include @@ -20,16 +19,12 @@ class ModbusDiscovery : public QObject, public ThingsDiscovery { void start(int msec) override; void stop() override; - static asio::io_context ioc; - private: void onStartDiscovering(); - void onCandidateStateChanged(const sunspec::SunSpecThing* thing, sunspec::SunSpecThing::State state); + void onCandidateStateChanged(const sunspec::SunSpecThing* thing, Thing::State state); QTimer _discoveryTimer; std::list, rpp::composite_subscription>> _candidates; - - std::shared_ptr _httpClient; }; } // namespace modbus diff --git a/src/modbus/ModbusThing.cpp b/src/modbus/ModbusThing.cpp new file mode 100644 index 0000000..e610ec4 --- /dev/null +++ b/src/modbus/ModbusThing.cpp @@ -0,0 +1,63 @@ +#include "ModbusThing.h" + +#include + +#include + +using namespace std::placeholders; + +ModbusThing::ModbusThing(const ThingInfo& info) : + Thing{info}, + _modbusClient(new QModbusTcpClient) { + + _modbusClient->connect(_modbusClient, &QModbusTcpClient::stateChanged, std::bind(&ModbusThing::onStateChanged, this, _1)); + _modbusClient->connect(_modbusClient, &QModbusTcpClient::errorOccurred, std::bind(&ModbusThing::onErrorOccurred, this, _1)); + + _modbusClient->setConnectionParameter(QModbusDevice::NetworkPortParameter, 502); + _modbusClient->setConnectionParameter(QModbusDevice::NetworkAddressParameter, QString::fromStdString(info.host())); + + _modbusClient->setTimeout(5000); + _modbusClient->setNumberOfRetries(2); +} + +ModbusThing::~ModbusThing() { + _modbusClient->disconnect(); + _modbusClient->deleteLater(); +} + +bool ModbusThing::connect() { + return _modbusClient->connectDevice(); +} + +void ModbusThing::disconnect() { + return _modbusClient->disconnectDevice(); +} + +void ModbusThing::doRead() { +}; + +void ModbusThing::onStateChanged(QModbusDevice::State state_) { + if (state_ == QModbusDevice::State::ConnectedState && state() == Thing::State::Uninitialized) { + LOG_S(INFO) << "host found: " << _modbusClient->connectionParameter(QModbusDevice::NetworkAddressParameter).toString(); + stateSubscriber().on_next(State::Ready); + //pollNextUnitId(); + } else if (state_ == QModbusDevice::State::UnconnectedState && state() != Thing::State::Uninitialized) { + // Elgris smart meters disconnects after 10s. So, we automatically reconnect. + LOG_S(WARNING) << id() << "> disconnected. Reconnecting..."; + connect(); + } +} + +void ModbusThing::onErrorOccurred(QModbusDevice::Error error) { + if (error == QModbusDevice::Error::NoError) { + return; + } + + if (state() == Thing::State::Uninitialized) { + stateSubscriber().on_next(State::Failed); + return; + } + + // We filter for connection error, because remotes might hang up. + LOG_IF_S(WARNING, error != QModbusDevice::Error::ConnectionError) << host() << "> error occured: " << _modbusClient->errorString().toStdString(); +} diff --git a/src/modbus/ModbusThing.h b/src/modbus/ModbusThing.h new file mode 100644 index 0000000..dfb82fe --- /dev/null +++ b/src/modbus/ModbusThing.h @@ -0,0 +1,24 @@ +#pragma once + +#include + +#include + +class QModbusTcpClient; + +class ModbusThing : public Thing { +public: + ModbusThing(const ThingInfo& info); + virtual ~ModbusThing(); + + bool connect(); + void disconnect(); + +private: + virtual void doRead() override; + + void onStateChanged(QModbusDevice::State state); + void onErrorOccurred(QModbusDevice::Error error); + + QModbusTcpClient* _modbusClient; +}; diff --git a/src/things/Thing.cpp b/src/things/Thing.cpp index faede38..fc1fe7b 100644 --- a/src/things/Thing.cpp +++ b/src/things/Thing.cpp @@ -70,9 +70,24 @@ dynamic_observable> Thing::properties() const { return _propertiesObservable; } -dynamic_observable Thing::state() const { +dynamic_observable Thing::stateObservable() const { return _stateSubject.get_observable(); } void Thing::onRead(const std::string&) { } + +void Thing::doSetProperty(MutableProperty, const Value&) { +} + +Thing::State Thing::state() const { + return _stateSubject.get_value(); +} + +dynamic_subscriber Thing::stateSubscriber() const { + return _stateSubject.get_subscriber().as_dynamic(); +} + +dynamic_subscriber> Thing::propertiesSubscriber() const { + return _propertiesSubject.get_subscriber().as_dynamic(); +} diff --git a/src/things/Thing.h b/src/things/Thing.h index 36c150e..035a0ce 100644 --- a/src/things/Thing.h +++ b/src/things/Thing.h @@ -19,6 +19,7 @@ class Thing : public ThingInfo { }; enum class State { + Uninitialized, Ready, Failed }; @@ -33,23 +34,24 @@ class Thing : public ThingInfo { // If thing does not fire updates itself, this can be called to trigger it. void read(); + dynamic_observable stateObservable() const; + void setProperty(MutableProperty property, const Value& value); const std::map& mutableProperties() const; - - /** - * @brief properties - * @return - */ dynamic_observable> properties() const; - dynamic_observable state() const; - protected: + State state() const; + dynamic_subscriber stateSubscriber() const; + dynamic_subscriber> propertiesSubscriber() const; + + Type _type = Type::Undefined; + +private: virtual void doRead() = 0; virtual void onRead(const std::string& response); - virtual void doSetProperty(MutableProperty property, const Value& value) = 0; + virtual void doSetProperty(MutableProperty property, const Value& value); - Type _type = Type::Undefined; uint16_t _materialIcon = 0; // TODO: do not store persistent properties here. Use Config as single source of truth. @@ -57,7 +59,7 @@ class Thing : public ThingInfo { publish_subject> _propertiesSubject; dynamic_observable> _propertiesObservable; - publish_subject _stateSubject; + behavior_subject _stateSubject = State::Uninitialized; friend class ThingsRepository; }; diff --git a/src/things/ThingsRepository.cpp b/src/things/ThingsRepository.cpp index 4be9ae9..9f503d1 100644 --- a/src/things/ThingsRepository.cpp +++ b/src/things/ThingsRepository.cpp @@ -32,7 +32,7 @@ void ThingsRepository::addThing(ThingPtr&& thing) { _things.push_back(std::move(thing)); _thingAdded.get_subscriber().on_next(_things.back()); - _things.back()->state().subscribe([this, id](auto state) { + _things.back()->stateObservable().subscribe([this, id](auto state) { if (state == Thing::State::Failed) { LOG_S(WARNING) << "thing completed: " << id; // We must not directly delete this thing because thing itself might still process something. diff --git a/src/things/goe/GoeCharger.cpp b/src/things/goe/GoeCharger.cpp index 8a0b60c..23601b1 100644 --- a/src/things/goe/GoeCharger.cpp +++ b/src/things/goe/GoeCharger.cpp @@ -66,7 +66,7 @@ void GoeCharger::onRead(const std::string& response) { _status = goe::toStatus(doc["car"].toInt()); - _propertiesSubject.get_subscriber().on_next({ + propertiesSubscriber().on_next({ { Property::status, static_cast(_status) }, { Property::power, nrg.at(11).toDouble() }, { Property::voltage, voltage } diff --git a/src/things/http/CMakeLists.txt b/src/things/http/CMakeLists.txt index 2d69d3a..4631e9e 100644 --- a/src/things/http/CMakeLists.txt +++ b/src/things/http/CMakeLists.txt @@ -46,5 +46,7 @@ PRIVATE target_link_libraries(httpthing PUBLIC common + goe qmdnsengine + shelly ) diff --git a/src/things/shelly/Shelly.cpp b/src/things/shelly/Shelly.cpp index 15a6eac..1dfb62f 100644 --- a/src/things/shelly/Shelly.cpp +++ b/src/things/shelly/Shelly.cpp @@ -61,6 +61,6 @@ void Shelly::onRead(const std::string& response) { } if (!properties.empty()) { - _propertiesSubject.get_subscriber().on_next(properties); + propertiesSubscriber().on_next(properties); } } diff --git a/src/things/sunspec/CMakeLists.txt b/src/things/sunspec/CMakeLists.txt index 0a82df5..18d891f 100644 --- a/src/things/sunspec/CMakeLists.txt +++ b/src/things/sunspec/CMakeLists.txt @@ -1,7 +1,7 @@ add_library(sunspec STATIC SunSpecBlock.cpp SunSpecDataPoint.cpp - SunSpecFactory.cpp + SunSpecDiscovery.cpp SunSpecLogger.cpp SunSpecManager.cpp SunSpecMeasuredValue.cpp diff --git a/src/things/sunspec/SunSpecDiscovery.cpp b/src/things/sunspec/SunSpecDiscovery.cpp new file mode 100644 index 0000000..057fd31 --- /dev/null +++ b/src/things/sunspec/SunSpecDiscovery.cpp @@ -0,0 +1,10 @@ +#include "SunSpecDiscovery.h" + +SunSpecDiscovery::SunSpecDiscovery() { +} + +void SunSpecDiscovery::start(int msec) { +} + +void SunSpecDiscovery::stop() { +} diff --git a/src/things/sunspec/SunSpecFactory.h b/src/things/sunspec/SunSpecDiscovery.h similarity index 59% rename from src/things/sunspec/SunSpecFactory.h rename to src/things/sunspec/SunSpecDiscovery.h index 716892f..ee16540 100644 --- a/src/things/sunspec/SunSpecFactory.h +++ b/src/things/sunspec/SunSpecDiscovery.h @@ -1,8 +1,8 @@ #include -class SunSpecFactory : public ThingsDiscovery { +class SunSpecDiscovery : public ThingsDiscovery { public: - SunSpecFactory(); + SunSpecDiscovery(); void start(int msec) override; void stop() override; diff --git a/src/things/sunspec/SunSpecFactory.cpp b/src/things/sunspec/SunSpecFactory.cpp deleted file mode 100644 index 0fe3556..0000000 --- a/src/things/sunspec/SunSpecFactory.cpp +++ /dev/null @@ -1,10 +0,0 @@ -#include "SunSpecFactory.h" - -SunSpecFactory::SunSpecFactory() { -} - -void SunSpecFactory::start(int msec) { -} - -void SunSpecFactory::stop() { -} diff --git a/src/things/sunspec/SunSpecThing.cpp b/src/things/sunspec/SunSpecThing.cpp index cd2ce2a..22ad4ee 100644 --- a/src/things/sunspec/SunSpecThing.cpp +++ b/src/things/sunspec/SunSpecThing.cpp @@ -65,11 +65,11 @@ const std::map>& SunSpecThing::models() return _modelAddresses; } -bool SunSpecThing::connectDevice() { +bool SunSpecThing::connect() { return _modbusClient->connectDevice(); } -void SunSpecThing::disconnectDevice() { +void SunSpecThing::disconnect() { return _modbusClient->disconnectDevice(); } @@ -89,10 +89,6 @@ void SunSpecThing::reset() { _models.clear(); } -dynamic_observable SunSpecThing::state() const { - return _stateSubject.get_observable(); -} - void SunSpecThing::doRead() { for (const auto& kv : _modelAddresses) { switch (kv.first) { @@ -101,7 +97,7 @@ void SunSpecThing::doRead() { case Model::Id::InverterSplitPhase: case Model::Id::MeterWyeConnectThreePhase: if (!readModel(kv.first, QDateTime::currentSecsSinceEpoch())) { - _stateSubject.get_subscriber().on_next(State::Failed); + stateSubscriber().on_next(State::Failed); return; } break; @@ -111,9 +107,6 @@ void SunSpecThing::doRead() { } } -void SunSpecThing::doSetProperty(MutableProperty, const Value&) { -} - uint8_t SunSpecThing::nextUnitId() { static const std::vector ids = { //// Primary Devices @@ -189,8 +182,7 @@ uint8_t SunSpecThing::nextUnitId() { void SunSpecThing::pollNextUnitId() { const uint8_t id = nextUnitId(); if (id == 0) { - //emit stateChanged(State::Failed); - _stateSubject.get_subscriber().on_next(State::Failed); + stateSubscriber().on_next(State::Failed); return; } @@ -202,11 +194,11 @@ void SunSpecThing::readHeader(uint8_t id) { auto* reply = _modbusClient->sendReadRequest(dataUnit, id); if (!reply) { //emit stateChanged(State::Failed); - _stateSubject.get_subscriber().on_next(State::Failed); + stateSubscriber().on_next(State::Failed); } else if (reply->isFinished()) { // broadcast replies return immediately delete reply; //emit stateChanged(State::Failed); - _stateSubject.get_subscriber().on_next(State::Failed); + stateSubscriber().on_next(State::Failed); } else { reply->connect(reply, &QModbusReply::finished, std::bind(&SunSpecThing::onReadHeader, this, reply)); } @@ -216,7 +208,7 @@ void SunSpecThing::onReadHeader(QModbusReply* reply) { //auto reply = qobject_cast(sender()); if (!reply) { //emit stateChanged(State::Failed); - _stateSubject.get_subscriber().on_next(State::Failed); + stateSubscriber().on_next(State::Failed); return; } @@ -234,13 +226,13 @@ void SunSpecThing::onReadHeader(QModbusReply* reply) { } else { LOG_S(INFO) << "no SunSpec header found at host: " << _modbusClient->connectionParameter(QModbusDevice::NetworkAddressParameter).toString();; //emit stateChanged(State::Failed); - _stateSubject.get_subscriber().on_next(State::Failed); + stateSubscriber().on_next(State::Failed); } } else { // TODO: we have a crash here. This call arrives, although _modbusClient is already deleted. LOG_S(WARNING) << host().toStdString() << "> reply error: " << reply->error(); //emit stateChanged(State::Failed); - _stateSubject.get_subscriber().on_next(State::Failed); + stateSubscriber().on_next(State::Failed); } reply->disconnect(); @@ -252,11 +244,11 @@ void SunSpecThing::readModelTable(uint16_t address) { auto* reply = _modbusClient->sendReadRequest(dataUnit, _unitId); if (!reply) { //emit stateChanged(State::Failed); - _stateSubject.get_subscriber().on_next(State::Failed); + stateSubscriber().on_next(State::Failed); } else if (reply->isFinished()) { // broadcast replies return immediately delete reply; //emit stateChanged(State::Failed); - _stateSubject.get_subscriber().on_next(State::Failed); + stateSubscriber().on_next(State::Failed); } else { reply->connect(reply, &QModbusReply::finished, std::bind(&SunSpecThing::onReadModelTable, this, reply)); } @@ -266,7 +258,7 @@ void SunSpecThing::onReadModelTable(QModbusReply* reply) { //auto reply = qobject_cast(sender()); if (reply->error() != QModbusDevice::NoError) { //emit stateChanged(State::Failed); - _stateSubject.get_subscriber().on_next(State::Failed); + stateSubscriber().on_next(State::Failed); } else { const QModbusDataUnit unit = reply->result(); addModelAddress(unit.value(0), unit.startAddress() + 2, unit.value(1)); @@ -282,7 +274,7 @@ void SunSpecThing::onReadModelTable(QModbusReply* reply) { } } //emit stateChanged(State::Connected); - _stateSubject.get_subscriber().on_next(State::Ready); + stateSubscriber().on_next(State::Ready); } else { readModelTable(unit.startAddress() + 2 + unit.value(1)); } @@ -346,14 +338,12 @@ void SunSpecThing::onReadBlockError(uint16_t modelId, QModbusReply* reply) { ++_timeoutCount; // Set device as failed if sunSpecId is empty or it timed out 5 times in a row. if (_timeoutCount > 4 || sunSpecId().empty()) { - //emit stateChanged(State::Failed); - _stateSubject.get_subscriber().on_next(State::Failed); + stateSubscriber().on_next(State::Failed); } break; default: LOG_S(WARNING) << sunSpecId() << "> error: " << reply->errorString().toStdString(); - //emit stateChanged(State::Failed); - _stateSubject.get_subscriber().on_next(State::Failed); + stateSubscriber().on_next(State::Failed); break; } } @@ -364,7 +354,8 @@ void SunSpecThing::onStateChanged(QModbusDevice::State state) { pollNextUnitId(); } else if (state == QModbusDevice::State::UnconnectedState && !_sunSpecId.empty()) { // Elgris smart meters disconnects after 10s. So, we automatically reconnect. - connectDevice(); + LOG_S(WARNING) << _sunSpecId << "> disconnected. Reconnecting..."; + connect(); } } @@ -374,8 +365,7 @@ void SunSpecThing::onErrorOccurred(QModbusDevice::Error error) { } if (_sunSpecId.empty()) { - //emit stateChanged(State::Failed); - _stateSubject.get_subscriber().on_next(State::Failed); + stateSubscriber().on_next(State::Failed); return; } @@ -407,8 +397,7 @@ void SunSpecThing::parseModel(uint16_t modelId, const std::vector& buf break; } } - _propertiesSubject.get_subscriber().on_next(props); - //emit modelRead(model, timestamp); + propertiesSubscriber().on_next(props); } } diff --git a/src/things/sunspec/SunSpecThing.h b/src/things/sunspec/SunSpecThing.h index f4a7097..ad33c22 100644 --- a/src/things/sunspec/SunSpecThing.h +++ b/src/things/sunspec/SunSpecThing.h @@ -26,8 +26,8 @@ class SunSpecThing : public Thing { const std::map>& models() const; - bool connectDevice(); - void disconnectDevice(); + bool connect(); + void disconnect(); bool isValid() const; bool hasCommonModel() const; @@ -36,15 +36,8 @@ class SunSpecThing : public Thing { void reset(); - dynamic_observable state() const; - -signals: - //void stateChanged(State state); - //void modelRead(const sunspec::Model& model, uint32_t timestamp); - private: void doRead() override; - void doSetProperty(MutableProperty, const Value&) override; uint8_t nextUnitId(); void pollNextUnitId(); diff --git a/tests/things/TestThing.h b/tests/things/TestThing.h index 590b38e..4cdcfa1 100644 --- a/tests/things/TestThing.h +++ b/tests/things/TestThing.h @@ -13,19 +13,19 @@ class TestThing : public Thing { virtual void doSetProperty(MutableProperty property, const Value& value) override {}; void setProperty(Property property, const Value& value) { - _propertiesSubject.get_subscriber().on_next( {{property, value }} ); + propertiesSubscriber().on_next( {{property, value }} ); } void tick() { static int i = 0; if (i%4 == 0) - _propertiesSubject.get_subscriber().on_next({}); + propertiesSubscriber().on_next({}); if (i%4 == 1) - _propertiesSubject.get_subscriber().on_next({}); + propertiesSubscriber().on_next({}); if (i%4 == 2) - _propertiesSubject.get_subscriber().on_next({}); + propertiesSubscriber().on_next({}); if (i%4 == 3) - _propertiesSubject.get_subscriber().on_completed(); + propertiesSubscriber().on_completed(); ++i; } };