diff --git a/src/service/dashboard/defaultDashboard.flowgraph b/src/service/dashboard/defaultDashboard.flowgraph index 4fdfc32f..6e1f018d 100644 --- a/src/service/dashboard/defaultDashboard.flowgraph +++ b/src/service/dashboard/defaultDashboard.flowgraph @@ -12,7 +12,10 @@ blocks: - name: source for sink 2 id: sink_source - name: remote source 1 - id: http://localhost:8080/GnuRadio/Acquisition?channelNameFilter=test + id: opendigitizer::RemoteSource + parameters: + remote_uri: http://localhost:8080/GnuRadio/Acquisition?channelNameFilter=test + signal_name: test - name: sink 1 id: sink - name: sink 2 @@ -29,6 +32,3 @@ connections: - [sum sigs, 0, sink 2, 0] - [sine source 1, 0, sink 3, 0] - [remote source 1, 0, sink 4, 0] -remote_sources: - - uri: http://localhost:8080/GnuRadio/Acquisition?channelNameFilter=test - signal_name: test diff --git a/src/ui/CMakeLists.txt b/src/ui/CMakeLists.txt index ec211e79..288cf554 100644 --- a/src/ui/CMakeLists.txt +++ b/src/ui/CMakeLists.txt @@ -18,7 +18,6 @@ set(sources dashboard.cpp flowgraph/datasource.cpp flowgraph/datasink.cpp - flowgraph/remotedatasource.cpp flowgraph/arithmetic_block.cpp dashboardpage.cpp opendashboardpage.cpp diff --git a/src/ui/RemoteSource.hpp b/src/ui/RemoteSource.hpp new file mode 100644 index 00000000..faf687c7 --- /dev/null +++ b/src/ui/RemoteSource.hpp @@ -0,0 +1,101 @@ +#ifndef OPENDIGITIZER_REMOTESOURCE_HPP +#define OPENDIGITIZER_REMOTESOURCE_HPP + +#include + +#include + +#include +#include +#include +#include +#include + +namespace opendigitizer { + +template + requires std::is_same_v +struct RemoteSource : public gr::Block> { + gr::PortOut out; + std::string remote_uri; + std::string signal_name; + opencmw::client::RestClient m_client; + + struct Data { + opendigitizer::acq::Acquisition data; + std::size_t read = 0; + }; + + std::deque m_data; + std::mutex m_mutex; + + void append(opendigitizer::acq::Acquisition &&data) { + std::lock_guard lock(m_mutex); + m_data.push_back({ std::move(data), 0 }); + } + + auto processBulk(gr::PublishableSpan auto &output) noexcept { + std::size_t written = 0; + std::lock_guard lock(m_mutex); + while (written < output.size() && !m_data.empty()) { + auto &d = m_data.front(); + auto in = std::span(d.data.channelValue.begin(), d.data.channelValue.end()); + in = in.subspan(d.read, std::min(output.size() - written, in.size() - d.read)); + + std::copy(in.begin(), in.end(), output.begin() + written); + written += in.size(); + d.read += in.size(); + if (d.read == d.data.channelValue.size()) { + m_data.pop_front(); + } + } + output.publish(written); + return gr::work::Status::OK; + } + + void + settingsChanged(const gr::property_map &old_settings, const gr::property_map & /*new_settings*/) { + const auto oldValue = old_settings.find("remote_uri"); + if (oldValue != old_settings.end()) { + const auto oldUri = std::get(oldValue->second); + if (!oldUri.empty()) { + fmt::print("Unsubscribing from {}\n", oldUri); + opencmw::client::Command command; + command.command = opencmw::mdp::Command::Unsubscribe; + command.topic = opencmw::URI<>(remote_uri); + command.callback = [oldUri](const opencmw::mdp::Message &) { + // TODO: Add cleanup once openCMW starts calling the callback + // on successful unsubscribe + fmt::print("Unsubscribed from {} successfully\n", oldUri); + }; + } + } + + opencmw::client::Command command; + command.command = opencmw::mdp::Command::Subscribe; + command.topic = opencmw::URI<>(remote_uri); + fmt::print("Subscribing to {}\n", remote_uri); + + command.callback = [this](const opencmw::mdp::Message &rep) { + if (rep.data.empty()) { + return; + } + try { + auto buf = rep.data; + opendigitizer::acq::Acquisition acq; + opencmw::deserialise(buf, acq); + append(std::move(acq)); + } catch (opencmw::ProtocolException &e) { + fmt::print(std::cerr, "{}\n", e.what()); + return; + } + }; + m_client.request(command); + } +}; + +} // namespace opendigitizer + +ENABLE_REFLECTION_FOR_TEMPLATE(opendigitizer::RemoteSource, out, remote_uri, signal_name) + +#endif diff --git a/src/ui/flowgraph.cpp b/src/ui/flowgraph.cpp index 89f6b2f0..dcec3660 100644 --- a/src/ui/flowgraph.cpp +++ b/src/ui/flowgraph.cpp @@ -9,7 +9,6 @@ #include -#include "flowgraph/remotedatasource.h" #include "yamlutils.h" #include @@ -285,19 +284,9 @@ void FlowGraph::parse(const std::filesystem::path &file) { void FlowGraph::parse(const std::string &str) { clear(); - YAML::Node tree = YAML::Load(str); + YAML::Node tree = YAML::Load(str); - auto rsources = tree["remote_sources"]; - if (rsources && rsources.IsSequence()) { - for (const auto &s : rsources) { - auto uri = s["uri"].as(); - auto signalName = s["signal_name"].as(); - - RemoteDataSource::registerBlockType(this, uri, signalName); - } - } - - auto blocks = tree["blocks"]; + auto blocks = tree["blocks"]; for (const auto &b : blocks) { auto n = b["name"].as(); auto id = b["id"].as(); @@ -307,7 +296,6 @@ void FlowGraph::parse(const std::string &str) { auto type = BlockType::registry().get(id); if (!type) { std::cerr << "Block type '" << id << "' is unkown.\n"; - auto block = std::make_unique(n, id, type); m_blocks.push_back(std::move(block)); continue; @@ -379,7 +367,9 @@ void FlowGraph::clear() { m_sourceBlocks.clear(); m_sinkBlocks.clear(); m_connections.clear(); +#if 0 m_remoteSources.clear(); +#endif } int FlowGraph::save(std::ostream &stream) { @@ -427,21 +417,6 @@ int FlowGraph::save(std::ostream &stream) { } }); } - - if (!m_remoteSources.empty()) { - root.write("remote_sources", [&]() { - YamlSeq sources(out); - for (const auto &s : m_remoteSources) { - YamlMap map(out); - map.write("uri", s.uri); - // we need to save down the name of the signal (and in the future probably other stuff) because when we load - // having to wait for information from the servers about all the remote signals used by the flowgraph would - // not be ideal. Moreover, this way a flowgraph can be loaded even when some signal isn't available at the - // moment. There will be no data but the flowgraph will load correctly anyway. - map.write("signal_name", s.type->outputs[0].name); - } - }); - } } stream << out.c_str(); @@ -568,12 +543,7 @@ void FlowGraph::disconnect(Connection *c) { } void FlowGraph::addRemoteSource(std::string_view uri) { - RemoteDataSource::registerBlockType(this, uri); -} - -void FlowGraph::registerRemoteSource(std::unique_ptr &&type, std::string_view uri) { - m_remoteSources.push_back({ type.get(), std::string(uri) }); - BlockType::registry().addBlockType(std::move(type)); + // TODO create block, etc. } gr::Graph FlowGraph::createGraph() { diff --git a/src/ui/flowgraph.h b/src/ui/flowgraph.h index da114596..9be2d244 100644 --- a/src/ui/flowgraph.h +++ b/src/ui/flowgraph.h @@ -424,7 +424,6 @@ class FlowGraph { int save(std::ostream &stream); void addRemoteSource(std::string_view uri); - void registerRemoteSource(std::unique_ptr &&type, std::string_view uri); std::function sourceBlockAddedCallback; std::function sinkBlockAddedCallback; diff --git a/src/ui/flowgraph/remotedatasource.cpp b/src/ui/flowgraph/remotedatasource.cpp deleted file mode 100644 index d70c030b..00000000 --- a/src/ui/flowgraph/remotedatasource.cpp +++ /dev/null @@ -1,206 +0,0 @@ -#include "remotedatasource.h" - -#include -#include -#include -#include - -#include - -#include "../app.h" - -using namespace opendigitizer::acq; - -template -struct RemoteSource : public gr::Block> { - gr::PortOut out{}; - DigitizerUi::RemoteBlockType *block; - - RemoteSource(DigitizerUi::RemoteBlockType *b) - : block(b) { - } - - void append(const Acquisition &data) { - std::lock_guard lock(m_mutex); - m_data.push_back({ data, 0 }); - } - - struct Data { - Acquisition data; - std::size_t read = 0; - }; - std::deque m_data; - std::mutex m_mutex; - - auto processBulk(gr::PublishableSpan auto &output) noexcept { - std::size_t written = 0; - std::lock_guard lock(m_mutex); - while (written < output.size() && !m_data.empty()) { - auto &d = m_data.front(); - auto in = std::span(d.data.channelValue.begin() + d.read, d.data.channelValue.end()); - in = in.first(std::min(output.size() - written, in.size())); - - std::copy(in.begin(), in.end(), output.begin() + written); - written += in.size(); - d.read += in.size(); - if (d.read == d.data.channelValue.size()) { - m_data.pop_front(); - } - } - output.publish(written); - return gr::work::Status::OK; - } -}; - -ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (RemoteSource), out); - -namespace DigitizerUi { - -class RemoteBlockType : public BlockType { -public: - explicit RemoteBlockType(std::string_view uri) - : BlockType(uri, uri, "Remote signals", true) - , m_uri(opencmw::URI<>::UriFactory().path(uri).build()) { - outputs.resize(1); - outputs[0].type = "float"; - createBlock = [this](std::string_view n) { - static int created = 0; - ++created; - if (n.empty()) { - std::string name = fmt::format("remote source {}", created); - return std::make_unique(name, this); - } - return std::make_unique(n, this); - }; - } - - void subscribe(RemoteDataSource *block) { - m_blocks.push_back(block); - if (m_subscribed++ > 0) { - return; - } - - opencmw::client::Command command; - command.command = opencmw::mdp::Command::Subscribe; - command.topic = m_uri; - - fmt::print("Subscribing to {}\n", m_uri); - - command.callback = [this](const opencmw::mdp::Message &rep) { - if (rep.data.size() == 0) { - return; - } - - auto buf = rep.data; - - try { - opencmw::deserialise(buf, m_data); - for (auto *b : m_blocks) { - if (b->graphNode()) { - if (auto *n = static_cast *>(b->graphNode()->raw())) { - n->append(m_data); - } - } - } - } catch (opencmw::ProtocolException &e) { - fmt::print("{}\n", e.what()); - return; - } - }; - m_client.request(command); - } - - void unsubscribe(RemoteDataSource *block) { - assert(m_subscribed > 0); - - auto it = std::find(m_blocks.begin(), m_blocks.end(), block); - if (it == m_blocks.end()) { - return; - } - m_blocks.erase(it); - - if (--m_subscribed > 0) { - return; - } - - fmt::print("Unsubscribing from {}\n", m_uri); - opencmw::client::Command command; - command.command = opencmw::mdp::Command::Unsubscribe; - command.topic = m_uri; - command.callback = [uri = m_uri](const opencmw::mdp::Message &rep) { - // TODO: Add cleanup once openCMW starts calling the callback - // on successful unsubscribe - fmt::format("Unsubscribed from {} successfully\n", uri); - }; - m_client.request(command); - } - - opencmw::URI<> m_uri; - opencmw::client::RestClient m_client; - int m_subscribed = 0; - - Acquisition m_data; - std::vector m_blocks; -}; - -RemoteDataSource::RemoteDataSource(std::string_view name, RemoteBlockType *t) - : Block(name, t->name, t) - , m_type(t) { - m_type->subscribe(this); -} - -RemoteDataSource::~RemoteDataSource() { - m_type->unsubscribe(this); -} - -std::unique_ptr RemoteDataSource::createGraphNode() { - // return nullptr; - return std::make_unique>>(m_type); -} - -void RemoteDataSource::registerBlockType(FlowGraph *fg, std::string_view uri) { - opencmw::client::Command command; - command.command = opencmw::mdp::Command::Get; - command.topic = opencmw::URI::UriFactory().path(uri).build(); - - auto *dashboard = App::instance().dashboard.get(); - - command.callback = [fg, dashboard, uri = std::string(uri)](const opencmw::mdp::Message &rep) { - if (rep.data.size() == 0) { - return; - } - - auto buf = rep.data; - Acquisition reply; - - try { - opencmw::deserialise(buf, reply); - } catch (opencmw::ProtocolException &e) { - fmt::print("{}\n", e.what()); - return; - } - - App::instance().executeLater([uri, fg, channelName = reply.channelName, dashboard]() mutable { - auto t = std::make_unique(uri); - t->outputs[0].name = channelName; - if (App::instance().dashboard.get() != dashboard) { - // If the current dashboard in the app is not the same as it was before issuing the request - // that means that in the mean time that we were waiting for the this callback to be called - // the dashboard was closed or changed. - return; - } - fg->registerRemoteSource(std::move(t), uri); - dashboard->addRemoteService(uri); - }); - }; - static opencmw::client::RestClient client; - client.request(command); -} - -void RemoteDataSource::registerBlockType(FlowGraph *fg, std::string_view uri, std::string_view signalName) { - auto t = std::make_unique(uri); - t->outputs[0].name = std::move(signalName); - fg->registerRemoteSource(std::move(t), uri); - App::instance().dashboard->addRemoteService(uri); -} -} // namespace DigitizerUi diff --git a/src/ui/flowgraph/remotedatasource.h b/src/ui/flowgraph/remotedatasource.h deleted file mode 100644 index 5641d25c..00000000 --- a/src/ui/flowgraph/remotedatasource.h +++ /dev/null @@ -1,25 +0,0 @@ -#ifndef REMOTEDATASOURCE_H -#define REMOTEDATASOURCE_H - -#include "../flowgraph.h" - -namespace DigitizerUi { - -class RemoteBlockType; - -class RemoteDataSource final : public Block { -public: - RemoteDataSource(std::string_view name, RemoteBlockType *t); - ~RemoteDataSource(); - - std::unique_ptr createGraphNode() final; - static void registerBlockType(FlowGraph *fg, std::string_view path); - static void registerBlockType(FlowGraph *fg, std::string_view path, std::string_view signalName); - -private: - RemoteBlockType *m_type; -}; - -} // namespace DigitizerUi - -#endif diff --git a/src/ui/main.cpp b/src/ui/main.cpp index 27a11baa..3d896b46 100644 --- a/src/ui/main.cpp +++ b/src/ui/main.cpp @@ -32,6 +32,7 @@ #include "flowgraph/datasink.h" #include "flowgraph/datasource.h" #include "flowgraphitem.h" +#include "RemoteSource.hpp" #include "utils/TouchHandler.hpp" CMRC_DECLARE(ui_assets); @@ -240,7 +241,7 @@ int main(int argc, char **argv) { DigitizerUi::DataSink::registerBlockType(); DigitizerUi::DataSinkSource::registerBlockType(); DigitizerUi::ArithmeticBlock::registerBlockType(); - + DigitizerUi::BlockType::registry().addBlockType("opendigitizer::RemoteSource"); DigitizerUi::BlockType::registry().addBlockType("FFT"); loadFonts(app);