Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing message port handling to blocks and graphs #254

Merged
merged 17 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions blocks/basic/include/gnuradio-4.0/basic/common_blocks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@

template<typename T>
class builtin_multiply : public gr::Block<builtin_multiply<T>> {
T _factor = static_cast<T>(1.0f);

public:
T factor = static_cast<T>(1.0f);

gr::PortIn<T> in;
gr::PortOut<T> out;

Expand All @@ -23,17 +23,17 @@ class builtin_multiply : public gr::Block<builtin_multiply<T>> {
builtin_multiply(gr::property_map properties) {
auto it = properties.find("factor");
if (it != properties.cend()) {
_factor = std::get<T>(it->second);
factor = std::get<T>(it->second);
}
}

[[nodiscard]] constexpr auto
processOne(T a) const noexcept {
return a * _factor;
return a * factor;
}
};

ENABLE_REFLECTION_FOR_TEMPLATE(builtin_multiply, in, out);
ENABLE_REFLECTION_FOR_TEMPLATE(builtin_multiply, in, out, factor);

template<typename T>
class builtin_counter : public gr::Block<builtin_counter<T>> {
Expand Down Expand Up @@ -70,7 +70,7 @@ class multi_adder : public gr::BlockModel {
protected:
using TPortIn = gr::PortIn<T>;
std::vector<TPortIn> _input_ports;
gr::PortOut<T> _output_port;
gr::PortOut<T> _output_port;

protected:
using setting_map = std::map<std::string, int, std::less<>>;
Expand Down Expand Up @@ -196,6 +196,9 @@ class multi_adder : public gr::BlockModel {
return { requested_work, available_samples, gr::work::Status::OK };
}

void
processScheduledMessages() override {}

void *
raw() override {
return this;
Expand Down Expand Up @@ -234,8 +237,8 @@ void
registerBuiltinBlocks(Registry *registry) {
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-variable"
GP_REGISTER_NODE_RUNTIME(registry, builtin_multiply, double, float);
GP_REGISTER_NODE_RUNTIME(registry, builtin_counter, double, float);
GP_REGISTER_BLOCK_RUNTIME(registry, builtin_multiply, double, float);
GP_REGISTER_BLOCK_RUNTIME(registry, builtin_counter, double, float);
#pragma GCC diagnostic pop
}

Expand Down
2 changes: 1 addition & 1 deletion blocks/basic/test/qa_Selector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ struct repeated_source : public gr::Block<repeated_source<T>> {
}

if (remaining_events_count != 0) {
auto &port = gr::outputPort<0>(this);
auto &port = gr::outputPort<0, gr::PortType::STREAM>(this);
auto &writer = port.streamWriter();
auto data = writer.reserve_output_range(1UZ);

Expand Down
275 changes: 275 additions & 0 deletions blocks/testing/include/gnuradio-4.0/testing/FunctionBlocks.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
#ifndef GNURADIO_TESTING_FUNCTION_BLOCKS_HPP
#define GNURADIO_TESTING_FUNCTION_BLOCKS_HPP

#include <fmt/format.h>
#include <fmt/ranges.h>

#include <gnuradio-4.0/Block.hpp>
#include <gnuradio-4.0/reflection.hpp>
#include <gnuradio-4.0/Tag.hpp>

namespace gr::testing {

/**
* FunctionSource is a convenience struct template to allow easy creation
* of a source block by providing a generator function of values that
* the source block should emit.
*
* - the `generator` member variable should be initialized to a function
* that only takes the this pointer as an argument and that returns
* optional<T>, where an empty optional means that the source has finished,
* and that the block should request the processing to stop.
*
* - If you want this block to have custom processing of messages received
* on the builtin message port (msgIn), you can define the message handler
* by assigning it to the `messageProcessor` member variable.
*
*
* For example, a FunctionSource that sends values from 0 to 99 and
* then quits can be written as:
*
* ```
* FunctionSource<int> source;
* source.generator = [counter = 0] (auto*) mutable -> optional<int> {
* if (counter < 100)
* return { counter };
* else
* return {};
* };
* ```
*
* For example, a FunctionSource that prints out the number of messages
* that have been send to it can be written as:
*
* ```
* FunctionSource<int> source;
* source.generator = ...;
* source.messageProcessor = [count = 0UZ] (auto* _this, auto& port, std::span<const gr::Message> messages) mutable {
* count += messages.size();
* std::print("Received {} messages so far\n", count);
* };
* ```
*
* Note: It is only meant to be used for testing purposes, and should not be
* used for benchmarking as it uses std::function internally.
*/
template<typename T>
struct FunctionSource : gr::Block<FunctionSource<T>> {
PortOut<T> out;

/** A function that generates the values sent by this source.
* It takes a pointer to this FunctionSource and returns optional<T>.
* If the result is an empty optional, it means that the source
* has no more values to send
*/
std::function<std::optional<T>(FunctionSource<T> *)> generator;

/** A function that processes messages sent to this block on the default
* message port msgIn.
*
* The arguments for the function are the pointer to this FunctionSource,
* a reference to the port on which the message has arrived, and a span
* of received messages
*/
std::function<void(FunctionSource<T> *, gr::MsgPortInNamed<"__Builtin"> &, std::span<const gr::Message>)> messageProcessor;

T
processOne() {
if (!generator) this->requestStop();

auto value = generator(this);
if (!value) this->requestStop();

return *value;
}

void
processMessages(gr::MsgPortInNamed<"__Builtin"> &port, std::span<const gr::Message> message) {
if (messageProcessor) messageProcessor(this, port, message);
gr::Block<FunctionSource<T>>::processMessages(port, message);
}
};

/**
* FunctionProcess is a convenience struct template to allow easy creation
* of a processing block that has one input and one output by providing a
* processing function that takes an input value and calculates the output
* value.
*
* - the `processor` member variable should be initialized to a function
* that gets a pointer to this, and a value that it needs to process,
* and returns the value that this block should emit on its output port.
*
* - If you want this block to have custom processing of messages received
* on the builtin message port (msgIn), you can define the message handler
* by assigning it to the `messageProcessor` member variable.
*
*
* For example, a FunctionProcess that multiplies values it receives by 2
* can be written as:
*
* ```
* FunctionProcess<int> source;
* source.processor = [counter = 0] (auto*, int value) mutable {
* return 2 * value;
* };
* ```
*
* Note: It is only meant to be used for testing purposes, and should not be
* used for benchmarking as it uses std::function internally.
*/
template<typename T>
struct FunctionProcess : gr::Block<FunctionProcess<T>> {
PortIn<T> in;
PortOut<T> out;

/** A function that processes the values sent to this source.
* It takes a pointer to this FunctionSource, a value of type T
* and returns a new value of type T.
*/
std::function<T(FunctionProcess<T> *, T)> processor;

/** A function that processes messages sent to this block on the default
* message port msgIn.
*
* The arguments for the function are the pointer to this FunctionSource,
* a reference to the port on which the message has arrived, and a span
* of received messages
*/
std::function<void(FunctionProcess<T> *, gr::MsgPortInNamed<"__Builtin"> &, std::span<const gr::Message>)> messageProcessor;

T
processOne(T value) {
return processor ? processor(this, value) : value;
}

void
processMessages(gr::MsgPortInNamed<"__Builtin"> &port, std::span<const gr::Message> message) {
if (messageProcessor) messageProcessor(this, port, message);
gr::Block<FunctionProcess<T>>::processMessages(port, message);
}
};

/**
* FunctionSink is a convenience struct template to allow easy creation
* of a sink block by providing a function that processes values that
* were sent to this block.
*
* - the `sink` member variable should be initialized to a function
* that gets a pointer to this, and a value that it needs to process.
*
* - If you want this block to have custom processing of messages received
* on the builtin message port (msgIn), you can define the message handler
* by assigning it to the `messageProcessor` member variable.
*
*
* For example, a FunctionSink that prints out the values it receives
* can be written as:
*
* ```
* FunctionSink<int> source;
* source.sink = [] (auto*, T value) mutable {
* fmt::print("Got value {}\n", value);
* };
* ```
*
* Note: It is only meant to be used for testing purposes, and should not be
* used for benchmarking as it uses std::function internally.
*/
template<typename T>
struct FunctionSink : gr::Block<FunctionSink<T>> {
PortIn<T> in;

std::function<void(FunctionSink<T> *, T)> sink;
std::function<void(FunctionSink<T> *, gr::MsgPortInNamed<"__Builtin"> &, std::span<const gr::Message>)> messageProcessor;

/** A function that processes the values sent to this source.
* It takes a pointer to this FunctionSource, and a value of type T
* that it should process.
*/
void
processOne(T value) {
if (sink) sink(this, value);
}

/** A function that processes messages sent to this block on the default
* message port msgIn.
*
* The arguments for the function are the pointer to this FunctionSource,
* a reference to the port on which the message has arrived, and a span
* of received messages
*/
void
processMessages(gr::MsgPortInNamed<"__Builtin"> &port, std::span<const gr::Message> message) {
if (messageProcessor) messageProcessor(this, port, message);
gr::Block<FunctionSink<T>>::processMessages(port, message);
}
};


/**
* MessageSender is a convenience struct template to allow easy creation
* of a block that generates and sends messages on the builtin messaging
* port.
*
* - the `messageGenerator` member variable should be initialized to a function
* that gets a pointer to this, and returns optional<gr::Message>.
* If the result is an empty optional, it means that this block has
* finished and that there are no more messages to be send.
*
* For example, a MessageSender that sends 10 multicast messages
* can be written as:
*
* ```
* gr::testing::MessageSender<float> messageSender;
* messageSender.messageGenerator = [&, count = 10UZ](auto * _this) mutable -> std::optional<gr::Message> {
* if (count > 0) {
* count--;
* gr::Message message;
* message[gr::message::key::Kind] = "custom_kind";
* message[gr::message::key::Target] = "";
* return message;
* } else {
* return {};
* }
* };
*
* Note: It is only meant to be used for testing purposes, and should not be
* used for benchmarking as it uses std::function internally.
*/
template<typename T>
struct MessageSender : public gr::Block<MessageSender<T>> {
using super_t = gr::Block<MessageSender<T>>;

/** A function that generates the messages to be sent by this block.
* It takes a pointer to this FunctionSource and returns optional<gr::Message>.
* If the result is an empty optional, it means that the block
* has no more messages to send
*/
std::function<std::optional<Message>(MessageSender *)> messageGenerator;

gr::PortOut<T> unused;

constexpr T
processOne() {
assert(messageGenerator);

auto message = messageGenerator(this);
if (message) {
super_t::emitMessage(super_t::msgOut, *message);
} else {
this->requestStop();
}

return T{};
}
};

} // namespace gr::testing

ENABLE_REFLECTION_FOR_TEMPLATE(gr::testing::FunctionSource, out);
ENABLE_REFLECTION_FOR_TEMPLATE(gr::testing::FunctionProcess, in, out);
ENABLE_REFLECTION_FOR_TEMPLATE(gr::testing::FunctionSink, in);
ENABLE_REFLECTION_FOR_TEMPLATE(gr::testing::MessageSender, unused)

#endif // GNURADIO_TESTING_FUNCTION_BLOCKS_HPP
Loading
Loading