Skip to content

Commit

Permalink
Refactor UI blocks and use multithreaded scheduler (#149)
Browse files Browse the repository at this point in the history
* Bump opencmw-cpp to latest main
* Fixes issues with emscripten GET/SET, i.e. initial dashboard retrieval and subscribe/unsubscribe (query parameters got lost).

* Make remote sources a regular GR block 
  Remove special treatment for remote sources, make it a regular GR
  block.

TODO:
 - re-add remote flowgraph handling
 - re-add adding new sources from the UI (wait for messages?)

* RemoteQueue: Prevent crash in callback after blcok destroyed

RestClient's tasks continue running when the RestClient is destroyed,
the callback then accessed the already deleted "this". Move the shared
data to a shared_ptr, to ensure it's still valid when appending data to
it.

* Turn SineSource into a regular GP block

* SineSource: Fix hang when destroyed while running

Make sure that destroyed while waiting in processOne(), there
isn't a deadlock where both mutex destruction and processOne
block each other.

* Make Arithmetic a normal GR block

* UI: Use multithreaded scheduler
  • Loading branch information
frankosterfeld authored Feb 6, 2024
1 parent bac8525 commit 87a40ca
Show file tree
Hide file tree
Showing 18 changed files with 191 additions and 366 deletions.
2 changes: 1 addition & 1 deletion cmake/Dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ include(FetchContent)
FetchContent_Declare(
opencmw-cpp
GIT_REPOSITORY https://github.com/fair-acc/opencmw-cpp.git
GIT_TAG 57f31a19d8998da944ec73223d7f3fba4feeb324
GIT_TAG a7a7c5c319b93ddfaf160893665a011d2d88bff8 # main as of 2024-02-06
)

FetchContent_Declare(
Expand Down
12 changes: 6 additions & 6 deletions src/service/dashboard/defaultDashboard.flowgraph
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@ blocks:
- name: FFT
id: FFT
- name: sum sigs
id: Arithmetic
id: opendigitizer::Arithmetic
- name: sine source 1
id: sine_source
id: opendigitizer::SineSource
parameters:
frequency: 0.100000
- name: source for sink 1
id: sink_source
- name: source for sink 2
id: sink_source
- name: remote source 1
id: http://localhost:8080/GnuRadio/Acquisition?channelNameFilter=test
id: opendigitizer::RemoteSource
parameters:
remote_uri: http://localhost:8080/GnuRadio/Acquisition?channelNameFilter=test
signal_name: test
- name: sink 1
id: sink
- name: sink 2
Expand All @@ -29,6 +32,3 @@ connections:
- [sum sigs, 0, sink 2, 0]
- [sine source 1, 0, sink 3, 0]
- [remote source 1, 0, sink 4, 0]
remote_sources:
- uri: http://localhost:8080/GnuRadio/Acquisition?channelNameFilter=test
signal_name: test
3 changes: 0 additions & 3 deletions src/ui/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ set(sources
flowgraph.cpp
flowgraphitem.cpp
dashboard.cpp
flowgraph/datasource.cpp
flowgraph/datasink.cpp
flowgraph/remotedatasource.cpp
flowgraph/arithmetic_block.cpp
dashboardpage.cpp
opendashboardpage.cpp
imguiutils.cpp
Expand Down
30 changes: 19 additions & 11 deletions src/ui/app.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
#include "common.h"
#include "dashboard.h"
#include "dashboardpage.h"
#include "flowgraph.h"
#include "flowgraphitem.h"
#include "opendashboardpage.h"

#include <gnuradio-4.0/Scheduler.hpp>

struct ImFont;

namespace DigitizerUi {
Expand Down Expand Up @@ -72,13 +73,14 @@ struct App {
ImFont *fontIconsSolidBig;
ImFont *fontIconsSolidLarge;
std::chrono::seconds editPaneCloseDelay{ 15 };
// The thread limit here is mainly for emscripten
std::shared_ptr<gr::thread_pool::BasicThreadPool> schedulerThreadPool = std::make_shared<gr::thread_pool::BasicThreadPool>("scheduler-pool", gr::thread_pool::CPU_BOUND, 2, 4);

template<typename Scheduler, typename Graph>
template<typename Graph>
void assignScheduler(Graph &&graph) {
if (m_scheduler) {
m_garbageSchedulers.push_back(std::move(m_scheduler));
}
m_scheduler.emplace<Scheduler>(std::forward<Graph>(graph));
using Scheduler = gr::scheduler::Simple<gr::scheduler::multiThreaded>;

m_scheduler.emplace<Scheduler>(std::forward<Graph>(graph), schedulerThreadPool);
}

private:
Expand All @@ -95,26 +97,32 @@ struct App {
};
template<typename T>
struct HandlerImpl : Handler {
T data;
std::thread thread;
T data;
std::thread thread;
std::atomic<bool> stopRequested = false;

template<typename... Args>
explicit HandlerImpl(Args &&...args)
: data(std::forward<Args>(args)...) {
thread = std::thread([this]() {
data.init();
data.runAndWait();
data.start();
while (!stopRequested && data.isProcessing()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
data.stop();
});
}
~HandlerImpl() {
stopRequested = true;
thread.join();
}
};

std::unique_ptr<Handler> handler;
};

SchedWrapper m_scheduler;
std::vector<SchedWrapper> m_garbageSchedulers; // TODO: Cleaning up schedulers needs support in opencmw to return unsubscription confirmation
SchedWrapper m_scheduler;

App();

Expand Down
6 changes: 3 additions & 3 deletions src/ui/assets/sampleDashboards/DemoDashboard.grc
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
blocks:
- name: sum sigs
id: Arithmetic
id: opendigitizer::Arithmetic
- name: FFT
id: FFT
- name: sine source 1
id: sine_source
id: opendigitizer::SineSource
parameters:
frequency: 1.000000
- name: sine source 3
id: sine_source
id: opendigitizer::SineSource
parameters:
frequency: 1.300000
- name: sink 1
Expand Down
14 changes: 7 additions & 7 deletions src/ui/assets/sampleDashboards/ExtendedDemoDashboard.grc
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
blocks:
- name: sum sigs1
id: Arithmetic
id: opendigitizer::Arithmetic
- name: sum sigs2
id: Arithmetic
id: opendigitizer::Arithmetic
- name: sum sigs3
id: Arithmetic
id: opendigitizer::Arithmetic
- name: FFT
id: FFT
- name: sine source 3
id: sine_source
id: opendigitizer::SineSource
parameters:
frequency: 0.100000
- name: sine source 4
id: sine_source
id: opendigitizer::SineSource
parameters:
frequency: 0.200000
- name: sine source 5
id: sine_source
id: opendigitizer::SineSource
parameters:
frequency: 0.300000
- name: sine source 6
id: sine_source
id: opendigitizer::SineSource
parameters:
frequency: 0.400000
- name: source for sink 1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
#include "arithmetic_block.h"
#ifndef OPENDIGITIZER_ARITHMETIC_HPP
#define OPENDIGITIZER_ARITHMETIC_HPP

#include <type_traits>
#include <gnuradio-4.0/Block.hpp>

#include "../flowgraph.h"
namespace opendigitizer {

template<typename T>
requires std::is_arithmetic_v<T>
struct MathNode : public gr::Block<MathNode<T>> {
gr::PortIn<T> in1{};
gr::PortIn<T> in2{};
struct Arithmetic : public gr::Block<Arithmetic<T>> {
gr::PortIn<T> in1;
gr::PortIn<T> in2;

gr::PortOut<T> out{};
gr::PortOut<T> out;

gr::Annotated<std::string, "operation"> operation = std::string("+");

Expand All @@ -33,12 +34,8 @@ struct MathNode : public gr::Block<MathNode<T>> {
}
};

ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (MathNode<T>), in1, in2, out, operation);
} // namespace opendigitizer

namespace DigitizerUi {
ENABLE_REFLECTION_FOR_TEMPLATE(opendigitizer::Arithmetic, in1, in2, out, operation)

void ArithmeticBlock::registerBlockType() {
BlockType::registry().addBlockType<MathNode>("Arithmetic");
}

} // namespace DigitizerUi
#endif
107 changes: 107 additions & 0 deletions src/ui/blocks/RemoteSource.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#ifndef OPENDIGITIZER_REMOTESOURCE_HPP
#define OPENDIGITIZER_REMOTESOURCE_HPP

#include <gnuradio-4.0/Block.hpp>

#include <daq_api.hpp>

#include <IoSerialiserYaS.hpp>
#include <MdpMessage.hpp>
#include <opencmw.hpp>
#include <RestClient.hpp>
#include <type_traits>

namespace opendigitizer {

template<typename T>
requires std::is_same_v<T, float>
struct RemoteSource : public gr::Block<RemoteSource<T>> {
gr::PortOut<float> out;
std::string remote_uri;
std::string signal_name;
opencmw::client::RestClient _client;

struct Data {
opendigitizer::acq::Acquisition data;
std::size_t read = 0;
};

struct Queue {
std::deque<Data> data;
std::mutex mutex;
};

std::shared_ptr<Queue> _queue = std::make_shared<Queue>();

auto processBulk(gr::PublishableSpan auto &output) noexcept {
std::size_t written = 0;
std::lock_guard lock(_queue->mutex);
while (written < output.size() && !_queue->data.empty()) {
auto &d = _queue->data.front();
auto in = std::span<const float>(d.data.channelValue.begin(), d.data.channelValue.end());
in = in.subspan(d.read, std::min(output.size() - written, in.size() - d.read));

std::copy(in.begin(), in.end(), output.begin() + written);
written += in.size();
d.read += in.size();
if (d.read == d.data.channelValue.size()) {
_queue->data.pop_front();
}
}
output.publish(written);
return gr::work::Status::OK;
}

void
settingsChanged(const gr::property_map &old_settings, const gr::property_map & /*new_settings*/) {
const auto oldValue = old_settings.find("remote_uri");
if (oldValue != old_settings.end()) {
const auto oldUri = std::get<std::string>(oldValue->second);
if (!oldUri.empty()) {
fmt::print("Unsubscribing from {}\n", oldUri);
opencmw::client::Command command;
command.command = opencmw::mdp::Command::Unsubscribe;
command.topic = opencmw::URI<>(remote_uri);
command.callback = [oldUri](const opencmw::mdp::Message &) {
// TODO: Add cleanup once openCMW starts calling the callback
// on successful unsubscribe
fmt::print("Unsubscribed from {} successfully\n", oldUri);
};
}
}

opencmw::client::Command command;
command.command = opencmw::mdp::Command::Subscribe;
command.topic = opencmw::URI<>(remote_uri);
fmt::print("Subscribing to {}\n", remote_uri);

std::weak_ptr maybeQueue = _queue;

command.callback = [maybeQueue](const opencmw::mdp::Message &rep) {
if (rep.data.empty()) {
return;
}
try {
auto queue = maybeQueue.lock();
if (!queue) {
return;
}
auto buf = rep.data;
opendigitizer::acq::Acquisition acq;
opencmw::deserialise<opencmw::YaS, opencmw::ProtocolCheck::IGNORE>(buf, acq);
std::lock_guard lock(queue->mutex);
queue->data.push_back({ std::move(acq), 0 });
} catch (opencmw::ProtocolException &e) {
fmt::print(std::cerr, "{}\n", e.what());
return;
}
};
_client.request(command);
}
};

} // namespace opendigitizer

ENABLE_REFLECTION_FOR_TEMPLATE(opendigitizer::RemoteSource, out, remote_uri, signal_name)

#endif
45 changes: 24 additions & 21 deletions src/ui/flowgraph/datasource.cpp → src/ui/blocks/SineSource.hpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
#include "datasource.h"

#include <fmt/format.h>
#include <math.h>
#include <mutex>
#ifndef OPENDIGITIZER_SINESOURCE_HPP
#define OPENDIGITIZER_SINESOURCE_HPP

#include <fmt/format.h>
#include <gnuradio-4.0/Block.hpp>

#include "../flowgraph.h"
#include <mutex>

namespace opendigitizer {

template<typename T>
requires std::is_arithmetic_v<T>
Expand Down Expand Up @@ -40,31 +39,35 @@ struct SineSource : public gr::Block<SineSource<T>, gr::BlockingIO<true>> {
}

~SineSource() {
std::unique_lock guard(mutex);
quit = true;
thread.join();
conditionvar.notify_all();
}

T processOne() {
auto processBulk(gr::PublishableSpan auto &output) {
// technically, this wouldn't have to block, but could just publish 0 samples,
// but keep it as test case for BlockingIO<true>.
std::unique_lock guard(mutex);
if (samples.size() == 0) {
while (samples.empty() && !quit) {
conditionvar.wait(guard);
}

T v = samples.front();
samples.pop_front();
out.max_samples = std::max<int>(1, samples.size());
return v;
const auto n = std::min(output.size(), samples.size());
if (n == 0) {
output.publish(0);
return gr::work::Status::OK;
}

std::copy(samples.begin(), samples.begin() + n, output.begin());
samples.erase(samples.begin(), samples.begin() + n);
output.publish(n);
return gr::work::Status::OK;
}
};

ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (SineSource<T>), out, frequency);
static_assert(gr::traits::block::can_processOne<SineSource<float>>);
static_assert(gr::traits::block::can_processOne<SineSource<double>>);

namespace DigitizerUi {
} // namespace opendigitizer

void DataSource::registerBlockType() {
BlockType::registry().addBlockType<SineSource>("sine_source");
}
ENABLE_REFLECTION_FOR_TEMPLATE(opendigitizer::SineSource, out, frequency)

} // namespace DigitizerUi
#endif
Loading

0 comments on commit 87a40ca

Please sign in to comment.