Skip to content

Commit

Permalink
Introducing message port handling to blocks and graphs (#254)
Browse files Browse the repository at this point in the history
* Introducing message port handling to blocks and graphs

- Added a default message port pair to gr::Block with
  example messages for setting properties, and lifetime control
  of blocks;
- Additional message port pair for gr::Graph to communicate
  with the blocks it owns -- used for message propagation into
  the graph and for exporting messages outward;
- Blocks handle the messages with (optionally) type-tagged
  ports -- when using named ports (or another way of port type
  differentiation, the processMessages handlers are chosen during
  compile-time and there is no need to check which port a message
  came on during runtime);
- Messages can be targeted to a specified unique_name. The '/'
  is used for separating unique_name of parent graph(s) and the block.
  '*' (or an empty target) is used to denote a message that is meant
  for all blocks;
- Since messages are arbitrary property maps, some convenience
  functions and definitions are provided;

TODO:
- Write more detailed tests
- Check graph<->children connection issues
  • Loading branch information
ivan-cukic authored Jan 31, 2024
1 parent 017eb01 commit fb8b080
Show file tree
Hide file tree
Showing 33 changed files with 1,410 additions and 390 deletions.
19 changes: 11 additions & 8 deletions blocks/basic/include/gnuradio-4.0/basic/common_blocks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@

template<typename T>
class builtin_multiply : public gr::Block<builtin_multiply<T>> {
T _factor = static_cast<T>(1.0f);

public:
T factor = static_cast<T>(1.0f);

gr::PortIn<T> in;
gr::PortOut<T> out;

Expand All @@ -23,17 +23,17 @@ class builtin_multiply : public gr::Block<builtin_multiply<T>> {
builtin_multiply(gr::property_map properties) {
auto it = properties.find("factor");
if (it != properties.cend()) {
_factor = std::get<T>(it->second);
factor = std::get<T>(it->second);
}
}

[[nodiscard]] constexpr auto
processOne(T a) const noexcept {
return a * _factor;
return a * factor;
}
};

ENABLE_REFLECTION_FOR_TEMPLATE(builtin_multiply, in, out);
ENABLE_REFLECTION_FOR_TEMPLATE(builtin_multiply, in, out, factor);

template<typename T>
class builtin_counter : public gr::Block<builtin_counter<T>> {
Expand Down Expand Up @@ -70,7 +70,7 @@ class multi_adder : public gr::BlockModel {
protected:
using TPortIn = gr::PortIn<T>;
std::vector<TPortIn> _input_ports;
gr::PortOut<T> _output_port;
gr::PortOut<T> _output_port;

protected:
using setting_map = std::map<std::string, int, std::less<>>;
Expand Down Expand Up @@ -196,6 +196,9 @@ class multi_adder : public gr::BlockModel {
return { requested_work, available_samples, gr::work::Status::OK };
}

void
processScheduledMessages() override {}

void *
raw() override {
return this;
Expand Down Expand Up @@ -234,8 +237,8 @@ void
registerBuiltinBlocks(Registry *registry) {
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-variable"
GP_REGISTER_NODE_RUNTIME(registry, builtin_multiply, double, float);
GP_REGISTER_NODE_RUNTIME(registry, builtin_counter, double, float);
GP_REGISTER_BLOCK_RUNTIME(registry, builtin_multiply, double, float);
GP_REGISTER_BLOCK_RUNTIME(registry, builtin_counter, double, float);
#pragma GCC diagnostic pop
}

Expand Down
2 changes: 1 addition & 1 deletion blocks/basic/test/qa_Selector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ struct repeated_source : public gr::Block<repeated_source<T>> {
}

if (remaining_events_count != 0) {
auto &port = gr::outputPort<0>(this);
auto &port = gr::outputPort<0, gr::PortType::STREAM>(this);
auto &writer = port.streamWriter();
auto data = writer.reserve_output_range(1UZ);

Expand Down
275 changes: 275 additions & 0 deletions blocks/testing/include/gnuradio-4.0/testing/FunctionBlocks.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
#ifndef GNURADIO_TESTING_FUNCTION_BLOCKS_HPP
#define GNURADIO_TESTING_FUNCTION_BLOCKS_HPP

#include <fmt/format.h>
#include <fmt/ranges.h>

#include <gnuradio-4.0/Block.hpp>
#include <gnuradio-4.0/reflection.hpp>
#include <gnuradio-4.0/Tag.hpp>

namespace gr::testing {

/**
* FunctionSource is a convenience struct template to allow easy creation
* of a source block by providing a generator function of values that
* the source block should emit.
*
* - the `generator` member variable should be initialized to a function
* that only takes the this pointer as an argument and that returns
* optional<T>, where an empty optional means that the source has finished,
* and that the block should request the processing to stop.
*
* - If you want this block to have custom processing of messages received
* on the builtin message port (msgIn), you can define the message handler
* by assigning it to the `messageProcessor` member variable.
*
*
* For example, a FunctionSource that sends values from 0 to 99 and
* then quits can be written as:
*
* ```
* FunctionSource<int> source;
* source.generator = [counter = 0] (auto*) mutable -> optional<int> {
* if (counter < 100)
* return { counter };
* else
* return {};
* };
* ```
*
* For example, a FunctionSource that prints out the number of messages
* that have been send to it can be written as:
*
* ```
* FunctionSource<int> source;
* source.generator = ...;
* source.messageProcessor = [count = 0UZ] (auto* _this, auto& port, std::span<const gr::Message> messages) mutable {
* count += messages.size();
* std::print("Received {} messages so far\n", count);
* };
* ```
*
* Note: It is only meant to be used for testing purposes, and should not be
* used for benchmarking as it uses std::function internally.
*/
template<typename T>
struct FunctionSource : gr::Block<FunctionSource<T>> {
PortOut<T> out;

/** A function that generates the values sent by this source.
* It takes a pointer to this FunctionSource and returns optional<T>.
* If the result is an empty optional, it means that the source
* has no more values to send
*/
std::function<std::optional<T>(FunctionSource<T> *)> generator;

/** A function that processes messages sent to this block on the default
* message port msgIn.
*
* The arguments for the function are the pointer to this FunctionSource,
* a reference to the port on which the message has arrived, and a span
* of received messages
*/
std::function<void(FunctionSource<T> *, gr::MsgPortInNamed<"__Builtin"> &, std::span<const gr::Message>)> messageProcessor;

T
processOne() {
if (!generator) this->requestStop();

auto value = generator(this);
if (!value) this->requestStop();

return *value;
}

void
processMessages(gr::MsgPortInNamed<"__Builtin"> &port, std::span<const gr::Message> message) {
if (messageProcessor) messageProcessor(this, port, message);
gr::Block<FunctionSource<T>>::processMessages(port, message);
}
};

/**
* FunctionProcess is a convenience struct template to allow easy creation
* of a processing block that has one input and one output by providing a
* processing function that takes an input value and calculates the output
* value.
*
* - the `processor` member variable should be initialized to a function
* that gets a pointer to this, and a value that it needs to process,
* and returns the value that this block should emit on its output port.
*
* - If you want this block to have custom processing of messages received
* on the builtin message port (msgIn), you can define the message handler
* by assigning it to the `messageProcessor` member variable.
*
*
* For example, a FunctionProcess that multiplies values it receives by 2
* can be written as:
*
* ```
* FunctionProcess<int> source;
* source.processor = [counter = 0] (auto*, int value) mutable {
* return 2 * value;
* };
* ```
*
* Note: It is only meant to be used for testing purposes, and should not be
* used for benchmarking as it uses std::function internally.
*/
template<typename T>
struct FunctionProcess : gr::Block<FunctionProcess<T>> {
PortIn<T> in;
PortOut<T> out;

/** A function that processes the values sent to this source.
* It takes a pointer to this FunctionSource, a value of type T
* and returns a new value of type T.
*/
std::function<T(FunctionProcess<T> *, T)> processor;

/** A function that processes messages sent to this block on the default
* message port msgIn.
*
* The arguments for the function are the pointer to this FunctionSource,
* a reference to the port on which the message has arrived, and a span
* of received messages
*/
std::function<void(FunctionProcess<T> *, gr::MsgPortInNamed<"__Builtin"> &, std::span<const gr::Message>)> messageProcessor;

T
processOne(T value) {
return processor ? processor(this, value) : value;
}

void
processMessages(gr::MsgPortInNamed<"__Builtin"> &port, std::span<const gr::Message> message) {
if (messageProcessor) messageProcessor(this, port, message);
gr::Block<FunctionProcess<T>>::processMessages(port, message);
}
};

/**
* FunctionSink is a convenience struct template to allow easy creation
* of a sink block by providing a function that processes values that
* were sent to this block.
*
* - the `sink` member variable should be initialized to a function
* that gets a pointer to this, and a value that it needs to process.
*
* - If you want this block to have custom processing of messages received
* on the builtin message port (msgIn), you can define the message handler
* by assigning it to the `messageProcessor` member variable.
*
*
* For example, a FunctionSink that prints out the values it receives
* can be written as:
*
* ```
* FunctionSink<int> source;
* source.sink = [] (auto*, T value) mutable {
* fmt::print("Got value {}\n", value);
* };
* ```
*
* Note: It is only meant to be used for testing purposes, and should not be
* used for benchmarking as it uses std::function internally.
*/
template<typename T>
struct FunctionSink : gr::Block<FunctionSink<T>> {
PortIn<T> in;

std::function<void(FunctionSink<T> *, T)> sink;
std::function<void(FunctionSink<T> *, gr::MsgPortInNamed<"__Builtin"> &, std::span<const gr::Message>)> messageProcessor;

/** A function that processes the values sent to this source.
* It takes a pointer to this FunctionSource, and a value of type T
* that it should process.
*/
void
processOne(T value) {
if (sink) sink(this, value);
}

/** A function that processes messages sent to this block on the default
* message port msgIn.
*
* The arguments for the function are the pointer to this FunctionSource,
* a reference to the port on which the message has arrived, and a span
* of received messages
*/
void
processMessages(gr::MsgPortInNamed<"__Builtin"> &port, std::span<const gr::Message> message) {
if (messageProcessor) messageProcessor(this, port, message);
gr::Block<FunctionSink<T>>::processMessages(port, message);
}
};


/**
* MessageSender is a convenience struct template to allow easy creation
* of a block that generates and sends messages on the builtin messaging
* port.
*
* - the `messageGenerator` member variable should be initialized to a function
* that gets a pointer to this, and returns optional<gr::Message>.
* If the result is an empty optional, it means that this block has
* finished and that there are no more messages to be send.
*
* For example, a MessageSender that sends 10 multicast messages
* can be written as:
*
* ```
* gr::testing::MessageSender<float> messageSender;
* messageSender.messageGenerator = [&, count = 10UZ](auto * _this) mutable -> std::optional<gr::Message> {
* if (count > 0) {
* count--;
* gr::Message message;
* message[gr::message::key::Kind] = "custom_kind";
* message[gr::message::key::Target] = "";
* return message;
* } else {
* return {};
* }
* };
*
* Note: It is only meant to be used for testing purposes, and should not be
* used for benchmarking as it uses std::function internally.
*/
template<typename T>
struct MessageSender : public gr::Block<MessageSender<T>> {
using super_t = gr::Block<MessageSender<T>>;

/** A function that generates the messages to be sent by this block.
* It takes a pointer to this FunctionSource and returns optional<gr::Message>.
* If the result is an empty optional, it means that the block
* has no more messages to send
*/
std::function<std::optional<Message>(MessageSender *)> messageGenerator;

gr::PortOut<T> unused;

constexpr T
processOne() {
assert(messageGenerator);

auto message = messageGenerator(this);
if (message) {
super_t::emitMessage(super_t::msgOut, *message);
} else {
this->requestStop();
}

return T{};
}
};

} // namespace gr::testing

ENABLE_REFLECTION_FOR_TEMPLATE(gr::testing::FunctionSource, out);
ENABLE_REFLECTION_FOR_TEMPLATE(gr::testing::FunctionProcess, in, out);
ENABLE_REFLECTION_FOR_TEMPLATE(gr::testing::FunctionSink, in);
ENABLE_REFLECTION_FOR_TEMPLATE(gr::testing::MessageSender, unused)

#endif // GNURADIO_TESTING_FUNCTION_BLOCKS_HPP
Loading

0 comments on commit fb8b080

Please sign in to comment.