diff --git a/blocks/basic/include/gnuradio-4.0/basic/common_blocks.hpp b/blocks/basic/include/gnuradio-4.0/basic/common_blocks.hpp index 8968df8b..6a5155cd 100644 --- a/blocks/basic/include/gnuradio-4.0/basic/common_blocks.hpp +++ b/blocks/basic/include/gnuradio-4.0/basic/common_blocks.hpp @@ -12,9 +12,9 @@ template class builtin_multiply : public gr::Block> { - T _factor = static_cast(1.0f); - public: + T factor = static_cast(1.0f); + gr::PortIn in; gr::PortOut out; @@ -23,17 +23,17 @@ class builtin_multiply : public gr::Block> { builtin_multiply(gr::property_map properties) { auto it = properties.find("factor"); if (it != properties.cend()) { - _factor = std::get(it->second); + factor = std::get(it->second); } } [[nodiscard]] constexpr auto processOne(T a) const noexcept { - return a * _factor; + return a * factor; } }; -ENABLE_REFLECTION_FOR_TEMPLATE(builtin_multiply, in, out); +ENABLE_REFLECTION_FOR_TEMPLATE(builtin_multiply, in, out, factor); template class builtin_counter : public gr::Block> { @@ -70,7 +70,7 @@ class multi_adder : public gr::BlockModel { protected: using TPortIn = gr::PortIn; std::vector _input_ports; - gr::PortOut _output_port; + gr::PortOut _output_port; protected: using setting_map = std::map>; @@ -196,6 +196,9 @@ class multi_adder : public gr::BlockModel { return { requested_work, available_samples, gr::work::Status::OK }; } + void + processScheduledMessages() override {} + void * raw() override { return this; @@ -234,8 +237,8 @@ void registerBuiltinBlocks(Registry *registry) { #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-variable" - GP_REGISTER_NODE_RUNTIME(registry, builtin_multiply, double, float); - GP_REGISTER_NODE_RUNTIME(registry, builtin_counter, double, float); + GP_REGISTER_BLOCK_RUNTIME(registry, builtin_multiply, double, float); + GP_REGISTER_BLOCK_RUNTIME(registry, builtin_counter, double, float); #pragma GCC diagnostic pop } diff --git a/blocks/basic/test/qa_Selector.cpp b/blocks/basic/test/qa_Selector.cpp index eccaaa82..47daaf16 100644 --- a/blocks/basic/test/qa_Selector.cpp +++ b/blocks/basic/test/qa_Selector.cpp @@ -33,7 +33,7 @@ struct repeated_source : public gr::Block> { } if (remaining_events_count != 0) { - auto &port = gr::outputPort<0>(this); + auto &port = gr::outputPort<0, gr::PortType::STREAM>(this); auto &writer = port.streamWriter(); auto data = writer.reserve_output_range(1UZ); diff --git a/blocks/testing/include/gnuradio-4.0/testing/FunctionBlocks.hpp b/blocks/testing/include/gnuradio-4.0/testing/FunctionBlocks.hpp new file mode 100644 index 00000000..358575b2 --- /dev/null +++ b/blocks/testing/include/gnuradio-4.0/testing/FunctionBlocks.hpp @@ -0,0 +1,275 @@ +#ifndef GNURADIO_TESTING_FUNCTION_BLOCKS_HPP +#define GNURADIO_TESTING_FUNCTION_BLOCKS_HPP + +#include +#include + +#include +#include +#include + +namespace gr::testing { + +/** + * FunctionSource is a convenience struct template to allow easy creation + * of a source block by providing a generator function of values that + * the source block should emit. + * + * - the `generator` member variable should be initialized to a function + * that only takes the this pointer as an argument and that returns + * optional, where an empty optional means that the source has finished, + * and that the block should request the processing to stop. + * + * - If you want this block to have custom processing of messages received + * on the builtin message port (msgIn), you can define the message handler + * by assigning it to the `messageProcessor` member variable. + * + * + * For example, a FunctionSource that sends values from 0 to 99 and + * then quits can be written as: + * + * ``` + * FunctionSource source; + * source.generator = [counter = 0] (auto*) mutable -> optional { + * if (counter < 100) + * return { counter }; + * else + * return {}; + * }; + * ``` + * + * For example, a FunctionSource that prints out the number of messages + * that have been send to it can be written as: + * + * ``` + * FunctionSource source; + * source.generator = ...; + * source.messageProcessor = [count = 0UZ] (auto* _this, auto& port, std::span messages) mutable { + * count += messages.size(); + * std::print("Received {} messages so far\n", count); + * }; + * ``` + * + * Note: It is only meant to be used for testing purposes, and should not be + * used for benchmarking as it uses std::function internally. + */ +template +struct FunctionSource : gr::Block> { + PortOut out; + + /** A function that generates the values sent by this source. + * It takes a pointer to this FunctionSource and returns optional. + * If the result is an empty optional, it means that the source + * has no more values to send + */ + std::function(FunctionSource *)> generator; + + /** A function that processes messages sent to this block on the default + * message port msgIn. + * + * The arguments for the function are the pointer to this FunctionSource, + * a reference to the port on which the message has arrived, and a span + * of received messages + */ + std::function *, gr::MsgPortInNamed<"__Builtin"> &, std::span)> messageProcessor; + + T + processOne() { + if (!generator) this->requestStop(); + + auto value = generator(this); + if (!value) this->requestStop(); + + return *value; + } + + void + processMessages(gr::MsgPortInNamed<"__Builtin"> &port, std::span message) { + if (messageProcessor) messageProcessor(this, port, message); + gr::Block>::processMessages(port, message); + } +}; + +/** + * FunctionProcess is a convenience struct template to allow easy creation + * of a processing block that has one input and one output by providing a + * processing function that takes an input value and calculates the output + * value. + * + * - the `processor` member variable should be initialized to a function + * that gets a pointer to this, and a value that it needs to process, + * and returns the value that this block should emit on its output port. + * + * - If you want this block to have custom processing of messages received + * on the builtin message port (msgIn), you can define the message handler + * by assigning it to the `messageProcessor` member variable. + * + * + * For example, a FunctionProcess that multiplies values it receives by 2 + * can be written as: + * + * ``` + * FunctionProcess source; + * source.processor = [counter = 0] (auto*, int value) mutable { + * return 2 * value; + * }; + * ``` + * + * Note: It is only meant to be used for testing purposes, and should not be + * used for benchmarking as it uses std::function internally. + */ +template +struct FunctionProcess : gr::Block> { + PortIn in; + PortOut out; + + /** A function that processes the values sent to this source. + * It takes a pointer to this FunctionSource, a value of type T + * and returns a new value of type T. + */ + std::function *, T)> processor; + + /** A function that processes messages sent to this block on the default + * message port msgIn. + * + * The arguments for the function are the pointer to this FunctionSource, + * a reference to the port on which the message has arrived, and a span + * of received messages + */ + std::function *, gr::MsgPortInNamed<"__Builtin"> &, std::span)> messageProcessor; + + T + processOne(T value) { + return processor ? processor(this, value) : value; + } + + void + processMessages(gr::MsgPortInNamed<"__Builtin"> &port, std::span message) { + if (messageProcessor) messageProcessor(this, port, message); + gr::Block>::processMessages(port, message); + } +}; + +/** + * FunctionSink is a convenience struct template to allow easy creation + * of a sink block by providing a function that processes values that + * were sent to this block. + * + * - the `sink` member variable should be initialized to a function + * that gets a pointer to this, and a value that it needs to process. + * + * - If you want this block to have custom processing of messages received + * on the builtin message port (msgIn), you can define the message handler + * by assigning it to the `messageProcessor` member variable. + * + * + * For example, a FunctionSink that prints out the values it receives + * can be written as: + * + * ``` + * FunctionSink source; + * source.sink = [] (auto*, T value) mutable { + * fmt::print("Got value {}\n", value); + * }; + * ``` + * + * Note: It is only meant to be used for testing purposes, and should not be + * used for benchmarking as it uses std::function internally. + */ +template +struct FunctionSink : gr::Block> { + PortIn in; + + std::function *, T)> sink; + std::function *, gr::MsgPortInNamed<"__Builtin"> &, std::span)> messageProcessor; + + /** A function that processes the values sent to this source. + * It takes a pointer to this FunctionSource, and a value of type T + * that it should process. + */ + void + processOne(T value) { + if (sink) sink(this, value); + } + + /** A function that processes messages sent to this block on the default + * message port msgIn. + * + * The arguments for the function are the pointer to this FunctionSource, + * a reference to the port on which the message has arrived, and a span + * of received messages + */ + void + processMessages(gr::MsgPortInNamed<"__Builtin"> &port, std::span message) { + if (messageProcessor) messageProcessor(this, port, message); + gr::Block>::processMessages(port, message); + } +}; + + +/** + * MessageSender is a convenience struct template to allow easy creation + * of a block that generates and sends messages on the builtin messaging + * port. + * + * - the `messageGenerator` member variable should be initialized to a function + * that gets a pointer to this, and returns optional. + * If the result is an empty optional, it means that this block has + * finished and that there are no more messages to be send. + * + * For example, a MessageSender that sends 10 multicast messages + * can be written as: + * + * ``` + * gr::testing::MessageSender messageSender; + * messageSender.messageGenerator = [&, count = 10UZ](auto * _this) mutable -> std::optional { + * if (count > 0) { + * count--; + * gr::Message message; + * message[gr::message::key::Kind] = "custom_kind"; + * message[gr::message::key::Target] = ""; + * return message; + * } else { + * return {}; + * } + * }; + * + * Note: It is only meant to be used for testing purposes, and should not be + * used for benchmarking as it uses std::function internally. + */ +template +struct MessageSender : public gr::Block> { + using super_t = gr::Block>; + + /** A function that generates the messages to be sent by this block. + * It takes a pointer to this FunctionSource and returns optional. + * If the result is an empty optional, it means that the block + * has no more messages to send + */ + std::function(MessageSender *)> messageGenerator; + + gr::PortOut unused; + + constexpr T + processOne() { + assert(messageGenerator); + + auto message = messageGenerator(this); + if (message) { + super_t::emitMessage(super_t::msgOut, *message); + } else { + this->requestStop(); + } + + return T{}; + } +}; + +} // namespace gr::testing + +ENABLE_REFLECTION_FOR_TEMPLATE(gr::testing::FunctionSource, out); +ENABLE_REFLECTION_FOR_TEMPLATE(gr::testing::FunctionProcess, in, out); +ENABLE_REFLECTION_FOR_TEMPLATE(gr::testing::FunctionSink, in); +ENABLE_REFLECTION_FOR_TEMPLATE(gr::testing::MessageSender, unused) + +#endif // GNURADIO_TESTING_FUNCTION_BLOCKS_HPP diff --git a/core/benchmarks/bm-nosonar_node_api.cpp b/core/benchmarks/bm-nosonar_node_api.cpp index 53c95e3b..919882ee 100644 --- a/core/benchmarks/bm-nosonar_node_api.cpp +++ b/core/benchmarks/bm-nosonar_node_api.cpp @@ -190,8 +190,8 @@ class gen_operation_SIMD : public gr::Block, gr::PortI gr::work::Result work(std::size_t requested_work) noexcept { - auto &out_port = outputPort<0>(this); - auto &in_port = inputPort<0>(this); + auto &out_port = outputPort<0, gr::PortType::STREAM>(this); + auto &in_port = inputPort<0, gr::PortType::STREAM>(this); auto &reader = in_port.streamReader(); auto &writer = out_port.streamWriter(); @@ -281,8 +281,8 @@ class copy : public gr::Block, gr::PortInName gr::work::Status work() noexcept { using namespace stdx; - auto &out_port = outputPort<"out">(this); - auto &in_port = inputPort<"in">(this); + auto &out_port = outputPort<"out">(this); + auto &in_port = inputPort<"in">(this); auto &reader = in_port.streamReader(); auto &writer = out_port.streamWriter(); @@ -363,7 +363,7 @@ class convert : public gr::Block, gr::PortInName const auto objects_to_write = stdx::is_simd_v ? n_simd_to_convert : scalars_to_convert; const auto objects_to_read = stdx::is_simd_v ? n_simd_to_convert : scalars_to_convert; - auto return_value = gr::work::Status::OK; + auto return_value = gr::work::Status::OK; writer.publish( // [&](std::span output) { const auto input = reader.get(); @@ -518,7 +518,7 @@ inline const boost::ut::suite _runtime_tests = [] { auto &src = testGraph.emplaceBlock>(N_SAMPLES); auto &sink = testGraph.emplaceBlock>(); - using copy = ::copy; + using copy = ::copy; std::vector cpy(10); for (std::size_t i = 0; i < cpy.size(); i++) { cpy[i] = std::addressof(testGraph.emplaceBlock({ { "name", fmt::format("copy {} at {}", i, gr::this_source_location()) } })); @@ -576,9 +576,9 @@ inline const boost::ut::suite _runtime_tests = [] { templated_cascaded_test(static_cast(2.0), "runtime src->mult(2.0)->div(2.0)->add(-1)->sink - int"); constexpr auto templated_cascaded_test_10 = [](T factor, const char *test_name) { - gr::Graph testGraph; - auto &src = testGraph.emplaceBlock>(N_SAMPLES); - auto &sink = testGraph.emplaceBlock>(); + gr::Graph testGraph; + auto &src = testGraph.emplaceBlock>(N_SAMPLES); + auto &sink = testGraph.emplaceBlock>(); std::vector *> mult1; std::vector *> div1; @@ -643,9 +643,9 @@ inline const boost::ut::suite _simd_tests = [] { } { - gr::Graph testGraph; - auto &src = testGraph.emplaceBlock>(N_SAMPLES); - auto &sink = testGraph.emplaceBlock>(); + gr::Graph testGraph; + auto &src = testGraph.emplaceBlock>(N_SAMPLES); + auto &sink = testGraph.emplaceBlock>(); std::vector *> mult1; std::vector *> mult2; diff --git a/core/include/gnuradio-4.0/Block.hpp b/core/include/gnuradio-4.0/Block.hpp index 2945fe48..30007dcd 100644 --- a/core/include/gnuradio-4.0/Block.hpp +++ b/core/include/gnuradio-4.0/Block.hpp @@ -3,6 +3,9 @@ #include #include +#include + +#include #include #ifdef __GNUC__ @@ -130,54 +133,64 @@ invokeProcessOneWithOrWithoutOffset(T &node, std::size_t offset, const Us &...in return node.processOne(inputs...); } -template +template [[nodiscard]] constexpr auto & inputPort(Self *self) noexcept { - using TRequestedPortType = typename traits::block::input_ports::template at; + using TRequestedPortType = typename traits::block::ports_data::template for_type::input_ports::template at; if constexpr (traits::block::block_defines_ports_as_member_variables) { using member_descriptor = traits::block::get_port_member_descriptor; return member_descriptor()(*self); } else { - return std::get(*self); + return self->template getArgument(); } } -template +template [[nodiscard]] constexpr auto & outputPort(Self *self) noexcept { - using requested_port_type = typename traits::block::output_ports::template at; + using TRequestedPortType = typename traits::block::ports_data::template for_type::output_ports::template at; if constexpr (traits::block::block_defines_ports_as_member_variables) { - using member_descriptor = traits::block::get_port_member_descriptor; + using member_descriptor = traits::block::get_port_member_descriptor; return member_descriptor()(*self); } else { - return std::get(*self); + return self->template getArgument(); } } template [[nodiscard]] constexpr auto & inputPort(Self *self) noexcept { - constexpr int Index = meta::indexForName>(); - return inputPort(self); + constexpr int Index = meta::indexForName>(); + if constexpr (Index == meta::default_message_port_index) { + return self->msgIn; + } + return inputPort(self); } template [[nodiscard]] constexpr auto & outputPort(Self *self) noexcept { - constexpr int Index = meta::indexForName>(); - return outputPort(self); + constexpr int Index = meta::indexForName>(); + if constexpr (Index == meta::default_message_port_index) { + return self->msgOut; + } + return outputPort(self); } -template +template [[nodiscard]] constexpr auto inputPorts(Self *self) noexcept { - return [self](std::index_sequence) { return std::tie(inputPort(self)...); }(std::make_index_sequence::size>()); + return [self](std::index_sequence) { + return std::tie(inputPort(self)...); + }(std::make_index_sequence::template for_type::input_ports::size()>()); } -template +template [[nodiscard]] constexpr auto outputPorts(Self *self) noexcept { - return [self](std::index_sequence) { return std::tie(outputPort(self)...); }(std::make_index_sequence::size>()); + return [self](std::index_sequence) { + return std::tie(outputPort(self)...); + }(std::make_index_sequence::template for_type::output_ports::size>()); } namespace work { @@ -385,11 +398,12 @@ static_assert(PublishableSpan>); * @tparam Arguments NTTP list containing the compile-time defined port instances, setting structs, or other constraints. */ template -struct Block : protected std::tuple { +class Block : protected std::tuple { static std::atomic_size_t _unique_id_counter; template using A = Annotated; +public: using base_t = Block; using derived_t = Derived; using ArgumentsTypeList = typename gr::meta::typelist; @@ -399,6 +413,19 @@ struct Block : protected std::tuple { using StrideControl = ArgumentsTypeList::template find_or_default>; constexpr static bool blockingIO = std::disjunction_v, Arguments>...> || std::disjunction_v, Arguments>...>; + template + auto & + getArgument() { + return std::get(*this); + } + + template + const auto & + getArgument() const { + return std::get(*this); + } + + // TODO: These are not involved in move operations, might be a problem later alignas(hardware_destructive_interference_size) std::atomic ioRequestedWork{ std::numeric_limits::max() }; alignas(hardware_destructive_interference_size) work::Counter ioWorkDone{}; alignas(hardware_destructive_interference_size) std::atomic ioLastWorkStatus{ work::Status::OK }; @@ -407,21 +434,36 @@ struct Block : protected std::tuple { "block_thread_pool", gr::thread_pool::TaskType::IO_BOUND, 2UZ, std::numeric_limits::max()); constexpr static TagPropagationPolicy tag_policy = TagPropagationPolicy::TPP_ALL_TO_ALL; + // using RatioValue = std::conditional_t; A1: Interpolate, =1: No change)">, Limits<1UZ, std::size_t(-1)>> numerator = Resampling::kNumerator; A1: Interpolate, =1: No change)">, Limits<1UZ, std::size_t(-1)>> denominator = Resampling::kDenominator; using StrideValue = std::conditional_t; - AN for skip, =0 for back-to-back.">> stride = StrideControl::kStride; - std::size_t stride_counter = 0UZ; - const std::size_t unique_id = _unique_id_counter++; - const std::string unique_name = fmt::format("{}#{}", gr::meta::type_name(), unique_id); - A ::unique_name">> name = gr::meta::type_name(); - A> meta_information; - constexpr static std::string_view description = static_cast(Description::value); - std::atomic state = lifecycle::State::IDLE; + AN for skip, =0 for back-to-back.">> stride = StrideControl::kStride; + + // + std::size_t stride_counter = 0UZ; + std::atomic state = lifecycle::State::IDLE; + + // TODO: These are not involved in move operations, might be a problem later + const std::size_t unique_id = _unique_id_counter++; + const std::string unique_name = fmt::format("{}#{}", gr::meta::type_name(), unique_id); + + // + A ::unique_name">> name = gr::meta::type_name(); + A> meta_information; + + // + constexpr static std::string_view description = static_cast(Description::value); static_assert(std::atomic::is_always_lock_free, "std::atomic is not lock-free"); + // TODO: C++26 make sure these are not reflected + // We support ports that are template parameters or reflected member variables, + // so these are handled in a special way + MsgPortInNamed<"__Builtin"> msgIn; + MsgPortOutNamed<"__Builtin"> msgOut; + struct PortsStatus { std::size_t in_min_samples{ 1UZ }; // max of `port.min_samples()` of all input ports std::size_t in_max_samples{ std::numeric_limits::max() }; // min of `port.max_samples()` of all input ports @@ -498,7 +540,7 @@ struct Block : protected std::tuple { ps.nSamplesToEosTag = std::min(ps.nSamplesToEosTag, samples_to_eos_tag(port).value_or(std::numeric_limits::max())); } }; - for_each_port([&adjust_for_input_port](PortLike auto &port) { adjust_for_input_port(port); }, inputPorts(&self())); + for_each_port([&adjust_for_input_port](PortLike auto &port) { adjust_for_input_port(port); }, inputPorts(&self())); auto adjust_for_output_port = [&ps = ports_status](Port &port) { if constexpr (std::remove_cvref_t::kIsSynch) { @@ -508,7 +550,7 @@ struct Block : protected std::tuple { ps.out_available = std::min(ps.out_available, port.streamWriter().available()); } }; - for_each_port([&adjust_for_output_port](PortLike auto &port) { adjust_for_output_port(port); }, outputPorts(&self())); + for_each_port([&adjust_for_output_port](PortLike auto &port) { adjust_for_output_port(port); }, outputPorts(&self())); ports_status.in_samples = ports_status.in_available; if (ports_status.in_samples < ports_status.in_min_samples) ports_status.in_samples = 0; @@ -546,7 +588,25 @@ struct Block : protected std::tuple { } } - Block(Block &&other) noexcept : std::tuple(std::move(other)), _settings(std::move(other._settings)) {} + Block(Block &&other) noexcept + : std::tuple(std::move(other)) + , numerator(std::move(other.numerator)) + , denominator(std::move(other.denominator)) + , stride(std::move(other.stride)) + , stride_counter(std::move(other.stride_counter)) + , state(other.state.load()) // atomic, not moving + , msgIn(std::move(other.msgIn)) + , msgOut(std::move(other.msgOut)) + , ports_status(std::move(other.ports_status)) + , _output_tags_changed(std::move(other._output_tags_changed)) + , _mergedInputTag(std::move(other._mergedInputTag)) + , _settings(std::move(other._settings)) {} + + // There are a few const or conditionally const member variables, + // we can not have a move-assignment that is equivalent to + // the move constructor + Block & + operator=(Block &&other) = delete; ~Block() { // NOSONAR -- need to request the (potentially) running ioThread to stop if (lifecycle::isActive(std::atomic_load_explicit(&state, std::memory_order_acquire))) { @@ -593,8 +653,8 @@ struct Block : protected std::tuple { } } }; - traits::block::input_ports::template apply_func(setPortName); - traits::block::output_ports::template apply_func(setPortName); + traits::block::all_input_ports::for_each(setPortName); + traits::block::all_output_ports::for_each(setPortName); // Handle settings // important: these tags need to be queued because at this stage the block is not yet connected to other downstream blocks @@ -613,9 +673,9 @@ struct Block : protected std::tuple { [[nodiscard]] constexpr std::size_t availableInputSamples(Container &data) const noexcept { if constexpr (gr::meta::vector_type) { - data.resize(traits::block::input_port_types::size); + data.resize(traits::block::stream_input_port_types::size); } else if constexpr (gr::meta::array_type) { - static_assert(std::tuple_size::value >= traits::block::input_port_types::size); + static_assert(std::tuple_size::value >= traits::block::stream_input_port_types::size); } else { static_assert(gr::meta::always_false, "type not supported"); } @@ -630,17 +690,17 @@ struct Block : protected std::tuple { } } }, - inputPorts(&self())); - return traits::block::input_port_types::size; + inputPorts(&self())); + return traits::block::stream_input_port_types::size; } template [[nodiscard]] constexpr std::size_t availableOutputSamples(Container &data) const noexcept { if constexpr (gr::meta::vector_type) { - data.resize(traits::block::output_port_types::size); + data.resize(traits::block::stream_output_port_types::size); } else if constexpr (gr::meta::array_type) { - static_assert(std::tuple_size::value >= traits::block::output_port_types::size); + static_assert(std::tuple_size::value >= traits::block::stream_output_port_types::size); } else { static_assert(gr::meta::always_false, "type not supported"); } @@ -655,8 +715,8 @@ struct Block : protected std::tuple { } } }, - outputPorts(&self())); - return traits::block::output_port_types::size; + outputPorts(&self())); + return traits::block::stream_output_port_types::size; } [[nodiscard]] constexpr bool @@ -708,8 +768,8 @@ struct Block : protected std::tuple { constexpr void checkParametersAndThrowIfNeeded() { - constexpr bool kIsSourceBlock = traits::block::input_port_types::size == 0; - constexpr bool kIsSinkBlock = traits::block::output_port_types::size == 0; + constexpr bool kIsSourceBlock = traits::block::stream_input_port_types::size == 0; + constexpr bool kIsSinkBlock = traits::block::stream_output_port_types::size == 0; if constexpr (Resampling::kEnabled) { static_assert(!kIsSinkBlock, "Decimation/interpolation is not available for sink blocks. Remove 'ResamplingRatio<>' from the block definition."); @@ -732,7 +792,7 @@ struct Block : protected std::tuple { void write_to_outputs(std::size_t available_values_count, auto &writers_tuple) noexcept { - if constexpr (traits::block::output_ports::size > 0) { + if constexpr (traits::block::stream_output_ports::size > 0) { meta::tuple_for_each_enumerate( [available_values_count](auto i, OutputRange &output_range) { if constexpr (traits::block::can_processOne or traits::block::processBulk_requires_ith_output_as_span) { @@ -771,7 +831,7 @@ struct Block : protected std::tuple { bool consumeReaders(Self &self, std::size_t available_values_count) { bool success = true; - if constexpr (traits::block::input_ports::size > 0) { + if constexpr (traits::block::stream_input_ports::size > 0) { std::apply( [available_values_count, &success](auto &...input_port) { auto consume_port = [&](Port &port_or_collection) { @@ -785,7 +845,7 @@ struct Block : protected std::tuple { }; (consume_port(input_port), ...); }, - inputPorts(&self)); + inputPorts(&self)); } return success; } @@ -793,10 +853,10 @@ struct Block : protected std::tuple { template constexpr auto invoke_processOne(std::size_t offset, Ts &&...inputs) { - if constexpr (traits::block::output_ports::size == 0) { + if constexpr (traits::block::stream_output_ports::size == 0) { invokeProcessOneWithOrWithoutOffset(self(), offset, std::forward(inputs)...); return std::tuple{}; - } else if constexpr (traits::block::output_ports::size == 1) { + } else if constexpr (traits::block::stream_output_ports::size == 1) { return std::tuple{ invokeProcessOneWithOrWithoutOffset(self(), offset, std::forward(inputs)...) }; } else { return invokeProcessOneWithOrWithoutOffset(self(), offset, std::forward(inputs)...); @@ -807,10 +867,10 @@ struct Block : protected std::tuple { constexpr auto invoke_processOne_simd(std::size_t offset, auto width, Ts &&...input_simds) { if constexpr (sizeof...(Ts) == 0) { - if constexpr (traits::block::output_ports::size == 0) { + if constexpr (traits::block::stream_output_ports::size == 0) { self().processOne_simd(offset, width); return std::tuple{}; - } else if constexpr (traits::block::output_ports::size == 1) { + } else if constexpr (traits::block::stream_output_ports::size == 1) { return std::tuple{ self().processOne_simd(offset, width) }; } else { return self().processOne_simd(offset, width); @@ -827,7 +887,7 @@ struct Block : protected std::tuple { _mergedInputTag.map.clear(); } - for_each_port([](PortLike auto &outPort) noexcept { outPort.publishPendingTags(); }, outputPorts(&self())); + for_each_port([](PortLike auto &outPort) noexcept { outPort.publishPendingTags(); }, outputPorts(&self())); _output_tags_changed = false; } @@ -849,12 +909,12 @@ struct Block : protected std::tuple { const Tag mergedPortTags = input_port.getTag(untilOffset); mergeSrcMapInto(mergedPortTags.map, _mergedInputTag.map); }, - inputPorts(&self())); + inputPorts(&self())); if (!mergedInputTag().map.empty()) { settings().autoUpdate(mergedInputTag().map); // apply tags as new settings if matching if constexpr (Derived::tag_policy == TagPropagationPolicy::TPP_ALL_TO_ALL) { - for_each_port([this](PortLike auto &outPort) noexcept { outPort.publishTag(mergedInputTag().map, 0); }, outputPorts(&self())); + for_each_port([this](PortLike auto &outPort) noexcept { outPort.publishTag(mergedInputTag().map, 0); }, outputPorts(&self())); } if (mergedInputTag().map.contains(gr::tag::END_OF_STREAM)) { requestStop(); @@ -866,7 +926,7 @@ struct Block : protected std::tuple { updateOutputTagsWithSettingParametersIfNeeded() { if (settings().changed()) { if (const auto forward_parameters = settings().applyStagedParameters(); !forward_parameters.empty()) { - for_each_port([&forward_parameters](PortLike auto &outPort) { outPort.publishTag(forward_parameters, 0); }, outputPorts(&self())); + for_each_port([&forward_parameters](PortLike auto &outPort) { outPort.publishTag(forward_parameters, 0); }, outputPorts(&self())); } settings()._changed.store(false); } @@ -952,7 +1012,7 @@ struct Block : protected std::tuple { return result; } }, - inputPorts(&self())); + inputPorts(&self())); } constexpr auto @@ -977,17 +1037,17 @@ struct Block : protected std::tuple { return result; } }, - outputPorts(&self())); + outputPorts(&self())); } inline constexpr void publishTag(property_map &&tag_data, Tag::signed_index_type tagOffset = -1) noexcept { - for_each_port([tag_data = std::move(tag_data), tagOffset](PortLike auto &outPort) { outPort.publishTag(tag_data, tagOffset); }, outputPorts(&self())); + for_each_port([tag_data = std::move(tag_data), tagOffset](PortLike auto &outPort) { outPort.publishTag(tag_data, tagOffset); }, outputPorts(&self())); } inline constexpr void publishTag(const property_map &tag_data, Tag::signed_index_type tagOffset = -1) noexcept { - for_each_port([&tag_data, tagOffset](PortLike auto &outPort) { outPort.publishTag(tag_data, tagOffset); }, outputPorts(&self())); + for_each_port([&tag_data, tagOffset](PortLike auto &outPort) { outPort.publishTag(tag_data, tagOffset); }, outputPorts(&self())); } constexpr void @@ -996,6 +1056,31 @@ struct Block : protected std::tuple { this->state.notify_all(); } + constexpr void + processScheduledMessages() { + auto processPort = [this](TPort &inPort) { + const auto available = inPort.streamReader().available(); + if constexpr (traits::block::can_processMessagesForPort) { + if (available > 0) { + self().processMessages(inPort, inPort.streamReader().get(available)); + } + } + if (available > 0) { + if (auto consumed = inPort.streamReader().consume(available); !consumed) { + throw fmt::format("Could not consume the messages from the message port"); + } + } + }; + processPort(msgIn); + for_each_port(processPort, inputPorts(&self())); + } + + void + emitMessage(auto &port, Message message) { + message[gr::message::key::Sender] = unique_name; + port.streamWriter().publish([&](auto &out) { out[0] = std::move(message); }, 1); + } + protected: /** * @brief @@ -1004,8 +1089,8 @@ struct Block : protected std::tuple { work::Result workInternal(std::size_t requested_work) { using gr::work::Status; - using TInputTypes = traits::block::input_port_types; - using TOutputTypes = traits::block::output_port_types; + using TInputTypes = traits::block::stream_input_port_types; + using TOutputTypes = traits::block::stream_output_port_types; constexpr bool kIsSourceBlock = TInputTypes::size == 0; constexpr bool kIsSinkBlock = TOutputTypes::size == 0; @@ -1197,7 +1282,7 @@ struct Block : protected std::tuple { // cannot use std::apply because it requires tuple_cat(inputSpans, writersTuple). The latter doesn't work because writersTuple isn't copyable. const work::Status ret = [&](std::index_sequence, std::index_sequence) { return self().processBulk(std::get(inputSpans)..., std::get(writersTuple)...); - }(std::make_index_sequence::size>(), std::make_index_sequence::size>()); + }(std::make_index_sequence::size>(), std::make_index_sequence::size>()); forwardTags(); if constexpr (kIsSourceBlock) { @@ -1313,6 +1398,8 @@ struct Block : protected std::tuple { */ work::Result work(std::size_t requested_work = std::numeric_limits::max()) noexcept { + processScheduledMessages(); + if constexpr (blockingIO) { constexpr bool useIoThread = std::disjunction_v, Arguments>...>; std::atomic_store_explicit(&ioRequestedWork, requested_work, std::memory_order_release); @@ -1358,6 +1445,45 @@ struct Block : protected std::tuple { return workInternal(requested_work); } } + + void + processMessages(MsgPortInNamed<"__Builtin"> &port, std::span messages) { + if (std::addressof(port) != std::addressof(msgIn)) { + fmt::print("{} got a message on a wrong port\n", self().unique_name); + return; + } + + for (const auto &message : messages) { + const auto kind = messageField(message, gr::message::key::Kind).value_or(std::string{}); + const auto target = messageField(message, gr::message::key::Target); + + if (target && !target->empty() && *target != self().unique_name) { + continue; + } + + if (kind == gr::message::kind::UpdateSettings) { + const auto data = messageField(message, gr::message::key::Data).value(); + auto notSet = settings().set(data); + + std::string keysNotSet; + for (const auto &[k, v] : notSet) { + keysNotSet += " " + k; + } + + Message settingsUpdated; + settingsUpdated[gr::message::key::Kind] = gr::message::kind::SettingsChanged; + settingsUpdated[gr::message::key::Data] = settings().get(); + + if (!notSet.empty()) { + Message errorMessage; + errorMessage[gr::message::key::Kind] = gr::message::kind::Error; + errorMessage[gr::message::key::Data] = notSet; + settingsUpdated[gr::message::key::ErrorInfo] = std::move(errorMessage); + } + emitMessage(msgOut, std::move(settingsUpdated)); + } + } + } }; template @@ -1384,7 +1510,7 @@ blockDescription() noexcept { /*constexpr*/ std::string ret = fmt::format("# {}\n{}\n{}\n**supported data types:**", // gr::meta::type_name(), Description::value._data, kIsBlocking ? "**BlockingIO**\n_i.e. potentially non-deterministic/non-real-time behaviour_\n" : ""); - gr::meta::typelist::template apply_func([&](std::size_t index, auto &&t) { + gr::meta::typelist::for_each([&](std::size_t index, auto &&t) { std::string type_name = gr::meta::type_name(); ret += fmt::format("{}:{} ", index, type_name); }); @@ -1423,7 +1549,7 @@ struct BlockParameters { template typename TBlock, typename RegisterInstance> void registerOn(RegisterInstance *plugin_instance, std::string block_type) const { - plugin_instance->template add_block_type(block_type); + plugin_instance->template addBlockType(block_type); } }; @@ -1431,14 +1557,15 @@ template typename TBlock, typename... TBlockParameters> struct RegisterBlock { template RegisterBlock(RegisterInstance *plugin_instance, std::string block_type) { - auto add_block_type = [&] { + std::cout << "registerBlock " << block_type << std::endl; + auto addBlockType = [&] { if constexpr (meta::is_instantiation_of) { Type().template registerOn(plugin_instance, block_type); } else { - plugin_instance->template add_block_type(block_type); + plugin_instance->template addBlockType(block_type); } }; - ((add_block_type.template operator()()), ...); + ((addBlockType.template operator()()), ...); } }; } // namespace detail diff --git a/core/include/gnuradio-4.0/BlockRegistry.hpp b/core/include/gnuradio-4.0/BlockRegistry.hpp index 01ecf8f7..78e52656 100644 --- a/core/include/gnuradio-4.0/BlockRegistry.hpp +++ b/core/include/gnuradio-4.0/BlockRegistry.hpp @@ -22,7 +22,7 @@ class BlockRegistry { template typename TBlock, typename... TBlockParameters> static auto - create_handler() { + createHandler() { return [](std::unique_ptr &result, const property_map ¶ms) { using BlockType = TBlock; @@ -68,9 +68,9 @@ class BlockRegistry { public: template typename TBlock, typename... TBlockParameters> void - add_block_type(std::string block_type) { + addBlockType(std::string block_type) { auto &block_handlers = findBlock_type_handlers_map(block_type); - block_handlers[encoded_list_of_types()] = create_handler(); + block_handlers[encoded_list_of_types()] = createHandler(); fmt::print("Registered {} {}\n", block_type, encoded_list_of_types()); } diff --git a/core/include/gnuradio-4.0/BlockTraits.hpp b/core/include/gnuradio-4.0/BlockTraits.hpp index 674b98f1..b85f2219 100644 --- a/core/include/gnuradio-4.0/BlockTraits.hpp +++ b/core/include/gnuradio-4.0/BlockTraits.hpp @@ -21,30 +21,6 @@ namespace detail { template using member_type = typename FieldDescriptor::value_type; -template -auto -unwrap_port_helper() { - if constexpr (port::is_port_v) { - return static_cast(nullptr); - } else if constexpr (port::is_port_collection_v) { - return static_cast(nullptr); - } else { - static_assert(meta::always_false, "Not a port or a collection of ports"); - } -} - -template -using unwrap_port = std::remove_pointer_t())>; - -template -using is_port_or_collection = std::integral_constant || port::is_port_collection_v>; - -template -using is_input_port_or_collection = std::integral_constant() && port::is_input_v>>; - -template -using is_output_port_or_collection = std::integral_constant() && port::is_output_v>>; - template constexpr bool is_port_descriptor_v = port::is_port_v>; @@ -96,7 +72,7 @@ concept Reflectable = refl::is_reflectable(); template struct member_ports_detector { - using member_ports = typename meta::to_typelist>::template filter::template transform; + using member_ports = typename meta::to_typelist>::template filter::template transform; static constexpr bool value = member_ports::size != 0; }; @@ -115,31 +91,38 @@ struct member_descriptor_has_type { template struct fixedBlock_ports_data_helper; -// This specialization defines node attributes when the node is created +// This specialization defines block attributes when the block is created // with two type lists - one list for input and one for output ports -template - requires InputPorts::template -all_of &&OutputPorts::template all_of struct fixedBlock_ports_data_helper { +template + requires(InputPorts::template all_of && OutputPorts::template all_of) +struct fixedBlock_ports_data_helper { using member_ports_detector = std::false_type; - // using member_ports_detector = detail::member_ports_detector; + using defined_input_ports = InputPorts; + using defined_output_ports = OutputPorts; - using input_ports = InputPorts; - using output_ports = OutputPorts; + template + struct for_type { + using input_ports = typename defined_input_ports ::template filter::template is_input_port_or_collection>; + using output_ports = typename defined_output_ports ::template filter::template is_output_port_or_collection>; + using all_ports = meta::concat; - using input_port_types = typename input_ports ::template transform; - using output_port_types = typename output_ports ::template transform; + using input_port_types = typename input_ports ::template transform; + using output_port_types = typename output_ports ::template transform; + }; - using all_ports = meta::concat; + using all = for_type; + using stream = for_type; + using message = for_type; }; -// This specialization defines node attributes when the node is created +// This specialization defines block attributes when the block is created // with a list of ports as template arguments template struct fixedBlock_ports_data_helper { using member_ports_detector = detail::member_ports_detector; - using all_ports = std::remove_pointer_t(nullptr); } else { @@ -147,11 +130,18 @@ struct fixedBlock_ports_data_helper { } }())>; - using input_ports = typename all_ports ::template filter; - using output_ports = typename all_ports ::template filter; + template + struct for_type { + using input_ports = typename all_ports ::template filter::template is_input_port_or_collection>; + using output_ports = typename all_ports ::template filter::template is_output_port_or_collection>; + + using input_port_types = typename input_ports ::template transform; + using output_port_types = typename output_ports ::template transform; + }; - using input_port_types = typename input_ports ::template transform; - using output_port_types = typename output_ports ::template transform; + using all = for_type; + using stream = for_type; + using message = for_type; }; // clang-format off @@ -165,31 +155,46 @@ using fixedBlock_ports_data = // clang-format on template -using all_ports = typename fixedBlock_ports_data::all_ports; +using ports_data = fixedBlock_ports_data; + +template +using all_input_ports = typename fixedBlock_ports_data::all::input_ports; template -using input_ports = typename fixedBlock_ports_data::input_ports; +using all_output_ports = typename fixedBlock_ports_data::all::output_ports; template -using output_ports = typename fixedBlock_ports_data::output_ports; +using all_input_port_types = typename fixedBlock_ports_data::all::input_port_types; template -using input_port_types = typename fixedBlock_ports_data::input_port_types; +using all_output_port_types = typename fixedBlock_ports_data::all::output_port_types; template -using output_port_types = typename fixedBlock_ports_data::output_port_types; +using all_input_port_types_tuple = typename all_input_port_types::tuple_type; template -using input_port_types_tuple = typename input_port_types::tuple_type; +using stream_input_ports = typename fixedBlock_ports_data::stream::input_ports; template -using return_type = typename output_port_types::tuple_or_type; +using stream_output_ports = typename fixedBlock_ports_data::stream::output_ports; template -using input_port_names = typename input_ports::template transform; +using stream_input_port_types = typename fixedBlock_ports_data::stream::input_port_types; template -using output_port_names = typename output_ports::template transform; +using stream_output_port_types = typename fixedBlock_ports_data::stream::output_port_types; + +template +using stream_input_port_types_tuple = typename stream_input_port_types::tuple_type; + +template +using stream_return_type = typename fixedBlock_ports_data::stream::output_port_types::tuple_or_type; + +template +using all_input_port_names = typename all_input_ports::template transform; + +template +using all_output_port_names = typename all_output_ports::template transform; template constexpr bool block_defines_ports_as_member_variables = fixedBlock_ports_data::member_ports_detector::value; @@ -202,26 +207,27 @@ using get_port_member_descriptor = typename meta::to_typelist auto -can_processOne_invoke_test(auto &node, const auto &input, std::index_sequence) -> decltype(node.processOne(std::get(input)...)); +can_processOne_invoke_test(auto &block, const auto &input, std::index_sequence) -> decltype(block.processOne(std::get(input)...)); template struct exact_argument_type { template U> - constexpr operator U() const noexcept; + constexpr + operator U() const noexcept; }; template auto -can_processOne_with_offset_invoke_test(auto &node, const auto &input, std::index_sequence) -> decltype(node.processOne(exact_argument_type(), std::get(input)...)); +can_processOne_with_offset_invoke_test(auto &block, const auto &input, std::index_sequence) -> decltype(block.processOne(exact_argument_type(), std::get(input)...)); template -using simd_return_type_of_can_processOne = meta::simdize, meta::simdize_size_v>>>; +using simd_return_type_of_can_processOne = meta::simdize, meta::simdize_size_v>>>; } // namespace detail -/* A node "can process simd" if its `processOne` function takes at least one argument and all +/* A block "can process simd" if its `processOne` function takes at least one argument and all * arguments can be simdized types of the actual port data types. * - * The node can be a sink (no output ports). + * The block can be a sink (no output ports). * The requirement of at least one function argument disallows sources. * * There is another (unnamed) concept for source nodes: Source nodes can implement @@ -232,10 +238,10 @@ concept can_processOne_simd = #if DISABLE_SIMD false; #else - traits::block::input_ports::template all_of and // checks we don't have port collections inside - traits::block::input_port_types::size() > 0 and requires(TBlock &node, const meta::simdize> &input_simds) { + traits::block::stream_input_ports::template all_of and // checks we don't have port collections inside + traits::block::stream_input_port_types::size() > 0 and requires(TBlock &block, const meta::simdize> &input_simds) { { - detail::can_processOne_invoke_test(node, input_simds, std::make_index_sequence::size()>()) + detail::can_processOne_invoke_test(block, input_simds, std::make_index_sequence::size()>()) } -> std::same_as>; }; #endif @@ -245,22 +251,22 @@ concept can_processOne_simd_with_offset = #if DISABLE_SIMD false; #else - traits::block::input_ports::template all_of and // checks we don't have port collections inside - traits::block::input_port_types::size() > 0 && requires(TBlock &node, const meta::simdize> &input_simds) { + traits::block::stream_input_ports::template all_of and // checks we don't have port collections inside + traits::block::stream_input_port_types::size() > 0 && requires(TBlock &block, const meta::simdize> &input_simds) { { - detail::can_processOne_with_offset_invoke_test(node, input_simds, std::make_index_sequence::size()>()) + detail::can_processOne_with_offset_invoke_test(block, input_simds, std::make_index_sequence::size()>()) } -> std::same_as>; }; #endif template -concept can_processOne_scalar = requires(TBlock &node, const input_port_types_tuple &inputs) { - { detail::can_processOne_invoke_test(node, inputs, std::make_index_sequence::size()>()) } -> std::same_as>; +concept can_processOne_scalar = requires(TBlock &block, const stream_input_port_types_tuple &inputs) { + { detail::can_processOne_invoke_test(block, inputs, std::make_index_sequence::size()>()) } -> std::same_as>; }; template -concept can_processOne_scalar_with_offset = requires(TBlock &node, const input_port_types_tuple &inputs) { - { detail::can_processOne_with_offset_invoke_test(node, inputs, std::make_index_sequence::size()>()) } -> std::same_as>; +concept can_processOne_scalar_with_offset = requires(TBlock &block, const stream_input_port_types_tuple &inputs) { + { detail::can_processOne_with_offset_invoke_test(block, inputs, std::make_index_sequence::size()>()) } -> std::same_as>; }; template @@ -269,6 +275,9 @@ concept can_processOne = can_processOne_scalar or can_processOne_simd concept can_processOne_with_offset = can_processOne_scalar_with_offset or can_processOne_simd_with_offset; +template +concept can_processMessagesForPort = requires(TBlock &block, TPort &inPort) { block.processMessages(inPort, inPort.streamReader().get(1UZ)); }; + namespace detail { template struct dummy_input_span : std::span { // NOSONAR @@ -351,15 +360,15 @@ using dynamic_span = std::span; template auto -can_processBulk_invoke_test(auto &node, const auto &inputs, auto &outputs, std::index_sequence, std::index_sequence) - -> decltype(node.processBulk(std::get(inputs)..., std::get(outputs)...)); +can_processBulk_invoke_test(auto &block, const auto &inputs, auto &outputs, std::index_sequence, std::index_sequence) + -> decltype(block.processBulk(std::get(inputs)..., std::get(outputs)...)); } // namespace detail template -concept can_processBulk = requires(TBlock &n, typename meta::transform_types_nested>::tuple_type inputs, - typename meta::transform_types_nested>::tuple_type outputs) { +concept can_processBulk = requires(TBlock &n, typename meta::transform_types_nested>::tuple_type inputs, + typename meta::transform_types_nested>::tuple_type outputs) { { - detail::can_processBulk_invoke_test(n, inputs, outputs, std::make_index_sequence::size>(), std::make_index_sequence::size>()) + detail::can_processBulk_invoke_test(n, inputs, outputs, std::make_index_sequence::size>(), std::make_index_sequence::size>()) } -> std::same_as; }; @@ -371,24 +380,24 @@ concept can_processBulk = requires(TBlock &n, typename meta::transform_types_nes template concept processBulk_requires_ith_output_as_span = can_processBulk - && requires(TDerived &d, typename meta::transform_types>::template apply inputs, + && requires(TDerived &d, typename meta::transform_types>::template apply inputs, typename meta::transform_conditional>::template apply + traits::block::stream_output_port_types>::template apply outputs, typename meta::transform_conditional>::template apply + traits::block::stream_output_port_types>::template apply bad_outputs) { { [](std::index_sequence, std::index_sequence) -> decltype(d.processBulk(std::get(inputs)..., std::get(outputs)...)) { return {}; - }(std::make_index_sequence::size>(), std::make_index_sequence::size>()) + }(std::make_index_sequence::size>(), std::make_index_sequence::size>()) } -> std::same_as; not requires { [](std::index_sequence, std::index_sequence) -> decltype(d.processBulk(std::get(inputs)..., std::get(bad_outputs)...)) { return {}; - }(std::make_index_sequence::size>(), std::make_index_sequence::size>()); + }(std::make_index_sequence::size>(), std::make_index_sequence::size>()); }; }; diff --git a/core/include/gnuradio-4.0/Graph.hpp b/core/include/gnuradio-4.0/Graph.hpp index 7a4dd799..aa3dcf3e 100644 --- a/core/include/gnuradio-4.0/Graph.hpp +++ b/core/include/gnuradio-4.0/Graph.hpp @@ -64,6 +64,9 @@ class BlockModel { if (!_dynamicPortsLoaded) _dynamicPortsLoader(); } + MsgPortInNamed<"__Builtin"> *msgIn; + MsgPortOutNamed<"__Builtin"> *msgOut; + [[nodiscard]] gr::DynamicPort & dynamicInputPort(std::size_t index, std::size_t subIndex = meta::invalid_index) { initDynamicPorts(); @@ -220,7 +223,9 @@ class BlockModel { * @brief Block state (N.B. IDLE, INITIALISED, RUNNING, REQUESTED_STOP, REQUESTED_PAUSE, STOPPED, PAUSED, ERROR) * See enum description for details. */ - [[nodiscard]] virtual lifecycle::State state() const noexcept = 0; + [[nodiscard]] virtual lifecycle::State + state() const noexcept + = 0; /** * @brief number of available readable samples at the block's input ports @@ -285,6 +290,10 @@ class BlockModel { work(std::size_t requested_work) = 0; + virtual void + processScheduledMessages() + = 0; + [[nodiscard]] virtual void * raw() = 0; }; @@ -319,6 +328,12 @@ class BlockWrapper : public BlockModel { } } + void + initMessagePorts() { + msgIn = std::addressof(_block.msgIn); + msgOut = std::addressof(_block.msgOut); + } + void createDynamicPortsLoader() { _dynamicPortsLoader = [this] { @@ -341,9 +356,9 @@ class BlockWrapper : public BlockModel { } else { // We can also have ports defined as template parameters if constexpr (decltype(direction)::value == PortDirection::INPUT) { - processPort(where, gr::inputPort(&blockRef())); + processPort(where, gr::inputPort(&blockRef())); } else { - processPort(where, gr::outputPort(&blockRef())); + processPort(where, gr::outputPort(&blockRef())); } } } else { @@ -361,11 +376,11 @@ class BlockWrapper : public BlockModel { } } }; - traits::block::input_ports::template apply_func(registerPort, _dynamicInputPorts, std::integral_constant{}); - traits::block::output_ports::template apply_func(registerPort, _dynamicOutputPorts, std::integral_constant{}); + traits::block::all_input_ports::for_each(registerPort, _dynamicInputPorts, std::integral_constant{}); + traits::block::all_output_ports::for_each(registerPort, _dynamicOutputPorts, std::integral_constant{}); - constexpr std::size_t input_port_count = gr::traits::block::template input_port_types::size; - constexpr std::size_t output_port_count = gr::traits::block::template output_port_types::size; + constexpr std::size_t input_port_count = gr::traits::block::all_input_port_types::size; + constexpr std::size_t output_port_count = gr::traits::block::all_output_port_types::size; static_assert(input_port_count + output_port_count > 0); _dynamicPortsLoaded = true; }; @@ -383,21 +398,29 @@ class BlockWrapper : public BlockModel { ~BlockWrapper() override = default; - BlockWrapper() { createDynamicPortsLoader(); } + BlockWrapper() { + initMessagePorts(); + createDynamicPortsLoader(); + } template requires(!std::is_same_v, T>) explicit BlockWrapper(Arg &&arg) : _block(std::forward(arg)) { + initMessagePorts(); createDynamicPortsLoader(); } template requires(!detail::contains_type...> && sizeof...(Args) > 1) explicit BlockWrapper(Args &&...args) : _block{ std::forward(args)... } { + initMessagePorts(); createDynamicPortsLoader(); } - explicit BlockWrapper(std::initializer_list> init_parameter) : _block{ std::move(init_parameter) } { createDynamicPortsLoader(); } + explicit BlockWrapper(std::initializer_list> init_parameter) : _block{ std::move(init_parameter) } { + initMessagePorts(); + createDynamicPortsLoader(); + } void init(std::shared_ptr progress, std::shared_ptr ioThreadPool) override { @@ -444,12 +467,18 @@ class BlockWrapper : public BlockModel { return blockRef().work(requested_work); } + void + processScheduledMessages() override { + return blockRef().processScheduledMessages(); + } + [[nodiscard]] constexpr bool isBlocking() const noexcept override { return blockRef().isBlocking(); } - [[nodiscard]] lifecycle::State state() const noexcept override { + [[nodiscard]] lifecycle::State + state() const noexcept override { return blockRef().state.load(); } @@ -591,16 +620,17 @@ class Edge { } }; -struct Graph { +class Graph : public gr::Block { alignas(hardware_destructive_interference_size) std::shared_ptr progress = std::make_shared(); alignas(hardware_destructive_interference_size) std::shared_ptr ioThreadPool = std::make_shared( "graph_thread_pool", gr::thread_pool::TaskType::IO_BOUND, 2UZ, std::numeric_limits::max()); private: std::vector> _connectionDefinitions; - std::vector> _blocks; std::vector _edges; + std::vector> _blocks; + template std::unique_ptr & findBlock(TBlock &what) { @@ -621,7 +651,7 @@ struct Graph { typename Destination, typename DestinationPort> [[nodiscard]] ConnectionResult connectImpl(Source &sourceNodeRaw, SourcePort &source_port_or_collection, Destination &destinationNodeRaw, DestinationPort &destinationPort_or_collection, std::size_t minBufferSize = 65536, - std::int32_t weight = 0, std::string_view name = "unnamed edge") { + std::int32_t weight = 0, std::string_view edgeName = "unnamed edge") { if (!std::any_of(_blocks.begin(), _blocks.end(), [&](const auto ®isteredNode) { return registeredNode->raw() == std::addressof(sourceNodeRaw); }) || !std::any_of(_blocks.begin(), _blocks.end(), [&](const auto ®isteredNode) { return registeredNode->raw() == std::addressof(destinationNodeRaw); })) { throw std::runtime_error( @@ -644,15 +674,18 @@ struct Graph { } }(); - static_assert(std::is_same_v::value_type, typename std::remove_pointer_t::value_type>, - "The source port type needs to match the sink port type"); + if constexpr (!std::is_same_v::value_type, typename std::remove_pointer_t::value_type>) { + meta::print_types, typename std::remove_pointer_t::value_type, + typename std::remove_pointer_t::value_type>{}; + } auto result = sourcePort->connect(*destinationPort); if (result == ConnectionResult::SUCCESS) { auto *sourceNode = findBlock(sourceNodeRaw).get(); auto *destinationNode = findBlock(destinationNodeRaw).get(); + // TODO: Rethink edge definition, indices, message port -1 etc. _edges.emplace_back(sourceNode, PortIndexDefinition{ sourcePortIndex, sourcePortSubIndex }, destinationNode, - PortIndexDefinition{ destinationPortIndex, destinationPortSubIndex }, minBufferSize, weight, name); + PortIndexDefinition{ destinationPortIndex, destinationPortSubIndex }, minBufferSize, weight, edgeName); } return result; @@ -669,7 +702,7 @@ struct Graph { SourceConnector(Graph &_self, Source &_source, Port &_port) : self(_self), source(_source), port(_port) {} - static_assert(traits::port::is_port_v || (sourcePortSubIndex != meta::invalid_index), + static_assert(std::is_same_v || traits::port::is_port_v || (sourcePortSubIndex != meta::invalid_index), "When we have a collection of ports, we need to have an index to access the desired port in the collection"); private: @@ -695,16 +728,27 @@ struct Graph { // connect using the port index template - [[nodiscard, deprecated("For internal use only, the the with the port name should be used")]] auto - to(Destination &destination) { - auto &destinationPort = inputPort(&destination); + [[nodiscard]] auto + to_internal(Destination &destination) { + auto &destinationPort = inputPort(&destination); return to, destinationPortIndex, destinationPortSubIndex>(destination, destinationPort); } + template + [[nodiscard, deprecated("For internal use only, the one with the port name should be used")]] auto + to(Destination &destination) { + return to_internal(destination); + } + template - [[nodiscard, deprecated("For internal use only, the the with the port name should be used")]] auto + [[nodiscard]] auto to(Destination &destination) { - return to(destination); + if constexpr (destinationPortIndex == gr::meta::default_message_port_index) { + return to(destination, destination.msgIn); + + } else { + return to(destination); + } } // connect using the port name @@ -712,14 +756,14 @@ struct Graph { template [[nodiscard]] constexpr auto to(Destination &destination) { - using destination_input_ports = typename traits::block::input_ports; + using destination_input_ports = typename traits::block::all_input_ports; constexpr std::size_t destinationPortIndex = meta::indexForName(); if constexpr (destinationPortIndex == meta::invalid_index) { meta::print_types, Destination, meta::message_type, - meta::message_type<"These are the known names:">, traits::block::input_port_names, meta::message_type<"Full ports info:">, destination_input_ports> + meta::message_type<"These are the known names:">, traits::block::all_input_port_names, meta::message_type<"Full ports info:">, destination_input_ports> port_not_found_error{}; } - return to(destination); + return to_internal(destination); } template @@ -770,6 +814,7 @@ struct Graph { addBlock(std::unique_ptr block) { auto &new_block_ref = _blocks.emplace_back(std::move(block)); new_block_ref->init(progress, ioThreadPool); + // TODO: Should we connectChildMessagePorts for these blocks as well? return *new_block_ref.get(); } @@ -787,12 +832,12 @@ struct Graph { auto & emplaceBlock(const property_map &initialSettings) { static_assert(std::is_same_v>); - auto &new_block_ref = _blocks.emplace_back(std::make_unique>()); - auto raw_ref = static_cast(new_block_ref->raw()); - const auto failed = raw_ref->settings().set(initialSettings); + auto &new_block_ref = _blocks.emplace_back(std::make_unique>()); + auto raw_ref = static_cast(new_block_ref->raw()); + const auto failed = raw_ref->settings().set(initialSettings); if (!failed.empty()) { std::vector keys; - for (const auto& pair : failed) { + for (const auto &pair : failed) { keys.push_back(pair.first); } throw std::invalid_argument(fmt::format("initial Block settings could not be applied successfully - mismatched keys or value-type: {}\n", fmt::join(keys, ", "))); @@ -804,16 +849,26 @@ struct Graph { // connect using the port index template - [[nodiscard, deprecated("For internal use only, the connect with the port name should be used")]] auto - connect(Source &source) { - auto &port_or_collection = outputPort(&source); + [[nodiscard]] auto + connect_internal(Source &source) { + auto &port_or_collection = outputPort(&source); return SourceConnector, sourcePortIndex, sourcePortSubIndex>(*this, source, port_or_collection); } + template + [[nodiscard, deprecated("The connect with the port name should be used")]] auto + connect(Source &source) { + return connect_internal(source); + } + template - [[nodiscard, deprecated("For internal use only, the connect with the port name should be used")]] auto + [[nodiscard]] auto connect(Source &source) { - return connect(source); + if constexpr (sourcePortIndex == meta::default_message_port_index) { + return SourceConnector(*this, source, source.msgOut); + } else { + return connect(source); + } } // connect using the port name @@ -821,14 +876,14 @@ struct Graph { template [[nodiscard]] auto connect(Source &source) { - using source_output_ports = typename traits::block::output_ports; + using source_output_ports = typename traits::block::all_output_ports; constexpr std::size_t sourcePortIndex = meta::indexForName(); if constexpr (sourcePortIndex == meta::invalid_index) { meta::print_types, Source, meta::message_type, - meta::message_type<"These are the known names:">, traits::block::output_port_names, meta::message_type<"Full ports info:">, source_output_ports> + meta::message_type<"These are the known names:">, traits::block::all_output_port_names, meta::message_type<"Full ports info:">, source_output_ports> port_not_found_error{}; } - return connect(source); + return connect_internal(source); } template @@ -837,18 +892,20 @@ struct Graph { return connect(source); } + // dynamic/runtime connections + template requires(!std::is_pointer_v> && !std::is_pointer_v>) ConnectionResult connect(Source &sourceBlockRaw, PortIndexDefinition sourcePortDefinition, Destination &destinationBlockRaw, PortIndexDefinition destinationPortDefinition, - std::size_t minBufferSize = 65536, std::int32_t weight = 0, std::string_view name = "unnamed edge") { + std::size_t minBufferSize = 65536, std::int32_t weight = 0, std::string_view edgeName = "unnamed edge") { auto result = findBlock(sourceBlockRaw) ->dynamicOutputPort(sourcePortDefinition.topLevel, sourcePortDefinition.subIndex) .connect(findBlock(destinationBlockRaw)->dynamicInputPort(destinationPortDefinition.topLevel, destinationPortDefinition.subIndex)); if (result == ConnectionResult::SUCCESS) { auto *sourceBlock = findBlock(sourceBlockRaw).get(); auto *destinationBlock = findBlock(destinationBlockRaw).get(); - _edges.emplace_back(sourceBlock, sourcePortDefinition, destinationBlock, destinationPortDefinition, minBufferSize, weight, name); + _edges.emplace_back(sourceBlock, sourcePortDefinition, destinationBlock, destinationPortDefinition, minBufferSize, weight, edgeName); } return result; } @@ -857,11 +914,17 @@ struct Graph { requires(!std::is_pointer_v> && !std::is_pointer_v>) ConnectionResult connect(Source &sourceBlockRaw, PortIndexDefinition sourcePortDefinition, Destination &destinationBlockRaw, PortIndexDefinition destinationPortDefinition, - std::size_t minBufferSize = 65536, std::int32_t weight = 0, std::string_view name = "unnamed edge") { + std::size_t minBufferSize = 65536, std::int32_t weight = 0, std::string_view edgeName = "unnamed edge") { auto sourcePortIndex = this->findBlock(sourceBlockRaw)->dynamicOutputPortIndex(sourcePortDefinition.topLevel); auto destinationPortIndex = this->findBlock(destinationBlockRaw)->dynamicInputPortIndex(destinationPortDefinition.topLevel); return connect(sourceBlockRaw, { sourcePortIndex, sourcePortDefinition.subIndex }, destinationBlockRaw, { destinationPortIndex, destinationPortDefinition.subIndex }, minBufferSize, weight, - name); + edgeName); + } + + template + void + processMessages(MsgPortInNamed<"__FromChildren"> &port, std::span input) { + static_assert(meta::always_false, "This is not called, children are processed in processScheduledMessages"); } bool @@ -877,16 +940,18 @@ struct Graph { template // TODO: F must be constraint by a descriptive concept void forEachBlock(F &&f) const { - std::for_each(_blocks.cbegin(), _blocks.cend(), [f](const auto &block_ptr) { f(*block_ptr.get()); }); + std::ranges::for_each(_blocks, [f](const auto &block_ptr) { std::invoke(f, *block_ptr.get()); }); } template // TODO: F must be constraint by a descriptive concept void forEachEdge(F &&f) const { - std::for_each(_edges.cbegin(), _edges.cend(), [f](const auto &edge) { f(edge); }); + std::ranges::for_each(_edges, [f](const auto &edge) { std::invoke(f, edge); }); } }; +static_assert(BlockLike); + /*******************************************************************************************************/ /**************************** begin of SIMD-Merged Graph Implementation ********************************/ /*******************************************************************************************************/ @@ -930,18 +995,19 @@ struct Graph { */ template -concept SourceBlockLike = traits::block::can_processOne and traits::block::template output_port_types::size > 0; +concept SourceBlockLike = traits::block::can_processOne and traits::block::template stream_output_port_types::size > 0; static_assert(not SourceBlockLike); template -concept SinkBlockLike = traits::block::can_processOne and traits::block::template input_port_types::size > 0; +concept SinkBlockLike = traits::block::can_processOne and traits::block::template stream_input_port_types::size > 0; static_assert(not SinkBlockLike); template -class MergedGraph : public Block, meta::concat, meta::remove_at>>, - meta::concat>, typename traits::block::output_ports>> { +class MergedGraph + : public Block, meta::concat, meta::remove_at>>, + meta::concat>, typename traits::block::stream_output_ports>> { static std::atomic_size_t _unique_id_counter; public: @@ -950,8 +1016,8 @@ class MergedGraph : public Block, meta::co private: // copy-paste from above, keep in sync - using base = Block, meta::concat, meta::remove_at>>, - meta::concat>, typename traits::block::output_ports>>; + using base = Block, meta::concat, meta::remove_at>>, + meta::concat>, typename traits::block::stream_output_ports>>; Left left; Right right; @@ -983,8 +1049,8 @@ class MergedGraph : public Block, meta::co return std::dynamic_extent; } }(); - return std::min({ traits::block::input_ports::template apply::value, traits::block::output_ports::template apply::value, - left_size, right_size }); + return std::min({ traits::block::stream_input_ports::template apply::value, + traits::block::stream_output_ports::template apply::value, left_size, right_size }); } template @@ -999,8 +1065,8 @@ class MergedGraph : public Block, meta::co constexpr auto apply_right(std::size_t offset, auto &&input_tuple, auto &&tmp) noexcept { return [&](std::index_sequence, std::index_sequence) { - constexpr std::size_t first_offset = traits::block::input_port_types::size; - constexpr std::size_t second_offset = traits::block::input_port_types::size + sizeof...(Is); + constexpr std::size_t first_offset = traits::block::stream_input_port_types::size; + constexpr std::size_t second_offset = traits::block::stream_input_port_types::size + sizeof...(Is); static_assert(second_offset + sizeof...(Js) == std::tuple_size_v>); return invokeProcessOneWithOrWithoutOffset(right, offset, std::get(std::forward(input_tuple))..., std::forward(tmp), std::get(input_tuple)...); @@ -1008,9 +1074,9 @@ class MergedGraph : public Block, meta::co } public: - using TInputPortTypes = typename traits::block::input_port_types; - using TOutputPortTypes = typename traits::block::output_port_types; - using TReturnType = typename traits::block::return_type; + using TInputPortTypes = typename traits::block::stream_input_port_types; + using TOutputPortTypes = typename traits::block::stream_output_port_types; + using TReturnType = typename traits::block::stream_return_type; constexpr MergedGraph(Left l, Right r) : left(std::move(l)), right(std::move(r)) {} @@ -1028,9 +1094,9 @@ class MergedGraph : public Block, meta::co requires traits::block::can_processOne_simd and traits::block::can_processOne_simd constexpr meta::simdize>> processOne(std::size_t offset, const Ts &...inputs) { - static_assert(traits::block::output_port_types::size == 1, "TODO: SIMD for multiple output ports not implemented yet"); - return apply_right::size() - InId - 1>(offset, std::tie(inputs...), - apply_left::size()>(offset, std::tie(inputs...))); + static_assert(traits::block::stream_output_port_types::size == 1, "TODO: SIMD for multiple output ports not implemented yet"); + return apply_right::size() - InId - 1>(offset, std::tie(inputs...), + apply_left::size()>(offset, std::tie(inputs...))); } constexpr auto @@ -1046,7 +1112,7 @@ class MergedGraph : public Block, meta::co }) { return invokeProcessOneWithOrWithoutOffset(right, offset, left.processOne_simd(N)); } else { - using LeftResult = typename traits::block::return_type; + using LeftResult = typename traits::block::stream_return_type; using V = meta::simdize; alignas(stdx::memory_alignment_v) LeftResult tmp[V::size()]; for (std::size_t i = 0; i < V::size(); ++i) { @@ -1064,33 +1130,34 @@ class MergedGraph : public Block, meta::co // if (sizeof...(Ts) == 0) we could call `return processOne_simd(integral_constant)`. But if // the caller expects to process *one* sample (no inputs for the caller to explicitly // request simd), and we process more, we risk inconsistencies. - if constexpr (traits::block::output_port_types::size == 1) { + if constexpr (traits::block::stream_output_port_types::size == 1) { // only the result from the right block needs to be returned - return apply_right::size() - InId - 1>(offset, std::forward_as_tuple(std::forward(inputs)...), - apply_left::size()>(offset, std::forward_as_tuple( - std::forward(inputs)...))); + return apply_right::size() - InId + - 1>(offset, std::forward_as_tuple(std::forward(inputs)...), + apply_left::size()>(offset, std::forward_as_tuple(std::forward(inputs)...))); } else { // left produces a tuple - auto left_out = apply_left::size()>(offset, std::forward_as_tuple(std::forward(inputs)...)); - auto right_out = apply_right::size() - InId - 1>(offset, std::forward_as_tuple(std::forward(inputs)...), - std::move(std::get(left_out))); + auto left_out = apply_left::size()>(offset, std::forward_as_tuple(std::forward(inputs)...)); + auto right_out = apply_right::size() - InId - 1>(offset, std::forward_as_tuple(std::forward(inputs)...), + std::move(std::get(left_out))); - if constexpr (traits::block::output_port_types::size == 2 && traits::block::output_port_types::size == 1) { + if constexpr (traits::block::stream_output_port_types::size == 2 && traits::block::stream_output_port_types::size == 1) { return std::make_tuple(std::move(std::get(left_out)), std::move(right_out)); - } else if constexpr (traits::block::output_port_types::size == 2) { + } else if constexpr (traits::block::stream_output_port_types::size == 2) { return std::tuple_cat(std::make_tuple(std::move(std::get(left_out))), std::move(right_out)); - } else if constexpr (traits::block::output_port_types::size == 1) { + } else if constexpr (traits::block::stream_output_port_types::size == 1) { return [&](std::index_sequence, std::index_sequence) { return std::make_tuple(std::move(std::get(left_out))..., std::move(std::get(left_out))..., std::move(right_out)); - }(std::make_index_sequence(), std::make_index_sequence::size - OutId - 1>()); + }(std::make_index_sequence(), std::make_index_sequence::size - OutId - 1>()); } else { return [&](std::index_sequence, std::index_sequence, std::index_sequence) { return std::make_tuple(std::move(std::get(left_out))..., std::move(std::get(left_out))..., std::move(std::get(right_out)...)); - }(std::make_index_sequence(), std::make_index_sequence::size - OutId - 1>(), std::make_index_sequence()); + }(std::make_index_sequence(), std::make_index_sequence::size - OutId - 1>(), + std::make_index_sequence()); } } } // end:: processOne @@ -1129,13 +1196,13 @@ inline std::atomic_size_t MergedGraph::_unique_id_coun template constexpr auto mergeByIndex(A &&a, B &&b) -> MergedGraph, std::remove_cvref_t, OutId, InId> { - if constexpr (!std::is_same_v>::template at, - typename traits::block::input_port_types>::template at>) { - gr::meta::print_types, typename traits::block::output_port_types>, std::integral_constant, - typename traits::block::output_port_types>::template at, + if constexpr (!std::is_same_v>::template at, + typename traits::block::stream_input_port_types>::template at>) { + gr::meta::print_types, typename traits::block::stream_output_port_types>, std::integral_constant, + typename traits::block::stream_output_port_types>::template at, - gr::meta::message_type<"INPUT_PORTS_ARE:">, typename traits::block::input_port_types>, std::integral_constant, - typename traits::block::input_port_types>::template at>{}; + gr::meta::message_type<"INPUT_PORTS_ARE:">, typename traits::block::stream_input_port_types>, std::integral_constant, + typename traits::block::stream_input_port_types>::template at>{}; } return { std::forward(a), std::forward(b) }; } @@ -1165,14 +1232,14 @@ mergeByIndex(A &&a, B &&b) -> MergedGraph, std::remove_cv template constexpr auto merge(A &&a, B &&b) { - constexpr int OutIdUnchecked = meta::indexForName>(); - constexpr int InIdUnchecked = meta::indexForName>(); + constexpr int OutIdUnchecked = meta::indexForName>(); + constexpr int InIdUnchecked = meta::indexForName>(); static_assert(OutIdUnchecked != -1); static_assert(InIdUnchecked != -1); constexpr auto OutId = static_cast(OutIdUnchecked); constexpr auto InId = static_cast(InIdUnchecked); - static_assert(std::same_as>::template at, - typename traits::block::input_port_types>::template at>, + static_assert(std::same_as>::template at, + typename traits::block::stream_input_port_types>::template at>, "Port types do not match"); return MergedGraph, std::remove_cvref_t, OutId, InId>{ std::forward(a), std::forward(b) }; } @@ -1203,8 +1270,8 @@ namespace gr { #if !DISABLE_SIMD namespace test { -static_assert(traits::block::input_port_types::size() == 1); -static_assert(std::same_as, float>); +static_assert(traits::block::stream_input_port_types::size() == 1); +static_assert(std::same_as, float>); static_assert(traits::block::can_processOne_scalar); static_assert(traits::block::can_processOne_simd); static_assert(traits::block::can_processOne_scalar_with_offset(copy(), copy()))>); diff --git a/core/include/gnuradio-4.0/Graph_yaml_importer.hpp b/core/include/gnuradio-4.0/Graph_yaml_importer.hpp index 8d4c167f..c298f22e 100644 --- a/core/include/gnuradio-4.0/Graph_yaml_importer.hpp +++ b/core/include/gnuradio-4.0/Graph_yaml_importer.hpp @@ -10,7 +10,7 @@ #pragma GCC diagnostic pop #include "Graph.hpp" -#include "plugin_loader.hpp" +#include "PluginLoader.hpp" namespace gr { @@ -76,7 +76,8 @@ struct YamlMap { } }; -inline std::size_t parseIndex(std::string_view str) { +inline std::size_t +parseIndex(std::string_view str) { std::size_t index{}; auto [_, src_ec] = std::from_chars(str.begin(), str.end(), index); if (src_ec != std::errc()) { @@ -88,29 +89,29 @@ inline std::size_t parseIndex(std::string_view str) { } // namespace detail inline gr::Graph -load_grc(plugin_loader &loader, const std::string &yaml_source) { - Graph testGraph; +load_grc(PluginLoader &loader, const std::string &yaml_source) { + Graph testGraph; std::map createdBlocks; - YAML::Node tree = YAML::Load(yaml_source); - auto blocks = tree["blocks"]; + YAML::Node tree = YAML::Load(yaml_source); + auto blocks = tree["blocks"]; for (const auto &grc_block : blocks) { auto name = grc_block["name"].as(); auto id = grc_block["id"].as(); // TODO: Discuss how GRC should store the node types, how we should // in general handle nodes that are parametrised by more than one type - auto ¤tBlock = loader.instantiate_in_graph(testGraph, id, "double"); + auto ¤tBlock = loader.instantiateInGraph(testGraph, id, "double"); currentBlock.setName(name); - createdBlocks[name] = ¤tBlock; + createdBlocks[name] = ¤tBlock; - auto currentBlock_settings = currentBlock.settings().get(); + auto currentBlock_settings = currentBlock.settings().get(); property_map new_properties; - auto parameters = grc_block["parameters"]; + auto parameters = grc_block["parameters"]; if (parameters && parameters.IsMap()) { for (const auto &kv : parameters) { const auto &key = kv.first.as(); @@ -183,7 +184,7 @@ load_grc(plugin_loader &loader, const std::string &yaml_source) { } struct result { - decltype(node) block_it; + decltype(node) block_it; PortIndexDefinition port_definition; }; @@ -205,7 +206,6 @@ load_grc(plugin_loader &loader, const std::string &yaml_source) { auto dst = parseBlock_port(connection[2], connection[3]); testGraph.connect(*src.block_it->second, src.port_definition, *dst.block_it->second, dst.port_definition); } else { - } } @@ -221,7 +221,7 @@ save_grc(const gr::Graph &testGraph) { root.write_fn("blocks", [&]() { detail::YamlSeq nodes(out); - auto writeBlock = [&](const auto &node) { + auto writeBlock = [&](const auto &node) { detail::YamlMap map(out); map.write("name", std::string(node.name())); diff --git a/core/include/gnuradio-4.0/Message.hpp b/core/include/gnuradio-4.0/Message.hpp new file mode 100644 index 00000000..73530c46 --- /dev/null +++ b/core/include/gnuradio-4.0/Message.hpp @@ -0,0 +1,43 @@ +#ifndef GNURADIO_MESSAGE_HPP +#define GNURADIO_MESSAGE_HPP + +#include "gnuradio-4.0/Tag.hpp" +#include + +#include + +namespace gr { + +namespace message::key { +const std::string Sender = "SENDER_KEY"; +const std::string Target = "TARGET_KEY"; +const std::string Kind = "KIND_KEY"; +const std::string What = "WHAT_KEY"; +const std::string Data = "DATA_KEY"; +const std::string Location = "LOCATION_KEY"; +const std::string ErrorInfo = "ERROR_INFO_KEY"; // optional: if a message has an additional error information +} // namespace message::key + +namespace message::kind { +const std::string Error = "ERROR_KIND"; +const std::string Graph_update = "GRAPH_UPDATE_KIND"; +const std::string UpdateSettings = "UPDATE_SETTINGS_KIND"; +const std::string SettingsChanged = "SETTINGS_CHANGED_KIND"; +} // namespace message::kind + +using Message = property_map; + +template +std::optional +messageField(const Message &message, const std::string &key) { + auto it = message.find(key); + if (it == message.end()) { + return {}; + } + + return std::get(it->second); +} + +} // namespace gr + +#endif // include guard diff --git a/core/include/gnuradio-4.0/plugin_loader.hpp b/core/include/gnuradio-4.0/PluginLoader.hpp similarity index 70% rename from core/include/gnuradio-4.0/plugin_loader.hpp rename to core/include/gnuradio-4.0/PluginLoader.hpp index 1439288d..5da6c782 100644 --- a/core/include/gnuradio-4.0/plugin_loader.hpp +++ b/core/include/gnuradio-4.0/PluginLoader.hpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -15,7 +16,7 @@ #include "Graph.hpp" #include "plugin.hpp" -using plugin_create_function_t = void (*)(gp_plugin_base **); +using plugin_create_function_t = void (*)(gp_plugin_base **); using plugin_destroy_function_t = void (*)(gp_plugin_base *); namespace gr { @@ -26,14 +27,14 @@ using namespace std::string_view_literals; #ifndef __EMSCRIPTEN__ // Plugins are not supported on WASM -class plugin_handler { +class PluginHandler { private: void *_dl_handle = nullptr; plugin_create_function_t _create_fn = nullptr; plugin_destroy_function_t _destroy_fn = nullptr; gp_plugin_base *_instance = nullptr; - std::string _status; + std::string _status; void release() { @@ -49,9 +50,9 @@ class plugin_handler { } public: - plugin_handler() = default; + PluginHandler() = default; - explicit plugin_handler(const std::string &plugin_file) { + explicit PluginHandler(const std::string &plugin_file) { _dl_handle = dlopen(plugin_file.c_str(), RTLD_LAZY); if (!_dl_handle) { _status = "Failed to load the plugin file"; @@ -86,19 +87,19 @@ class plugin_handler { } } - plugin_handler(const plugin_handler &other) = delete; - plugin_handler & - operator=(const plugin_handler &other) + PluginHandler(const PluginHandler &other) = delete; + PluginHandler & + operator=(const PluginHandler &other) = delete; - plugin_handler(plugin_handler &&other) noexcept + PluginHandler(PluginHandler &&other) noexcept : _dl_handle(std::exchange(other._dl_handle, nullptr)) , _create_fn(std::exchange(other._create_fn, nullptr)) , _destroy_fn(std::exchange(other._destroy_fn, nullptr)) , _instance(std::exchange(other._instance, nullptr)) {} - plugin_handler & - operator=(plugin_handler &&other) noexcept { + PluginHandler & + operator=(PluginHandler &&other) noexcept { auto tmp = std::move(other); std::swap(_dl_handle, tmp._dl_handle); std::swap(_create_fn, tmp._create_fn); @@ -107,7 +108,7 @@ class plugin_handler { return *this; } - ~plugin_handler() { release(); } + ~PluginHandler() { release(); } explicit operator bool() const { @@ -125,17 +126,18 @@ class plugin_handler { } }; -class plugin_loader { +class PluginLoader { private: - std::vector _handlers; - std::unordered_map _handler_for_name; - std::unordered_map _failed_plugins; + std::vector _handlers; + std::unordered_map _handlerForName; + std::unordered_map _failedPlugins; + std::unordered_set _loadedPluginFiles; - BlockRegistry *_global_registry; - std::vector _knownBlocks; + BlockRegistry *_globalRegistry; + std::vector _knownBlocks; public: - plugin_loader(BlockRegistry *global_registry, std::span plugin_directories) : _global_registry(global_registry) { + PluginLoader(BlockRegistry *global_registry, std::span plugin_directories) : _globalRegistry(global_registry) { for (const auto &directory : plugin_directories) { std::cerr << std::filesystem::current_path() << std::endl; @@ -143,16 +145,20 @@ class plugin_loader { for (const auto &file : std::filesystem::directory_iterator{ directory }) { if (file.is_regular_file() && file.path().extension() == ".so") { - if (plugin_handler handler(file.path().string()); handler) { + auto fileString = file.path().string(); + if (_loadedPluginFiles.contains(fileString)) continue; + _loadedPluginFiles.insert(fileString); + + if (PluginHandler handler(file.path().string()); handler) { for (const auto &block_name : handler->providedBlocks()) { - _handler_for_name.emplace(std::string(block_name), handler.operator->()); + _handlerForName.emplace(std::string(block_name), handler.operator->()); _knownBlocks.emplace_back(block_name); } _handlers.push_back(std::move(handler)); } else { - _failed_plugins[file.path()] = handler.status(); + _failedPlugins[file.path()] = handler.status(); } } } @@ -166,13 +172,13 @@ class plugin_loader { const auto & failed_plugins() const { - return _failed_plugins; + return _failedPlugins; } auto knownBlocks() const { auto result = _knownBlocks; - const auto &builtin = _global_registry->knownBlocks(); + const auto &builtin = _globalRegistry->knownBlocks(); result.insert(result.end(), builtin.begin(), builtin.end()); return result; } @@ -180,11 +186,11 @@ class plugin_loader { std::unique_ptr instantiate(std::string_view name, std::string_view type, const property_map ¶ms = {}) { // Try to create a node from the global registry - if (auto result = _global_registry->createBlock(name, type, params)) { + if (auto result = _globalRegistry->createBlock(name, type, params)) { return result; } - auto it = _handler_for_name.find(std::string(name)); // TODO avoid std::string here - if (it == _handler_for_name.end()) return {}; + auto it = _handlerForName.find(std::string(name)); // TODO avoid std::string here + if (it == _handlerForName.end()) return {}; auto &handler = it->second; @@ -193,7 +199,7 @@ class plugin_loader { template gr::BlockModel & - instantiate_in_graph(Graph &graph, InstantiateArgs &&...args) { + instantiateInGraph(Graph &graph, InstantiateArgs &&...args) { auto block_load = instantiate(std::forward(args)...); if (!block_load) { throw fmt::format("Unable to create node"); diff --git a/core/include/gnuradio-4.0/Port.hpp b/core/include/gnuradio-4.0/Port.hpp index 53377d05..14b72658 100644 --- a/core/include/gnuradio-4.0/Port.hpp +++ b/core/include/gnuradio-4.0/Port.hpp @@ -10,6 +10,7 @@ #include "annotated.hpp" #include "CircularBuffer.hpp" #include "DataSet.hpp" +#include "Message.hpp" #include "Tag.hpp" namespace gr { @@ -28,7 +29,8 @@ enum class ConnectionResult { SUCCESS, FAILED }; enum class PortType { STREAM, /*!< used for single-producer-only ond usually synchronous one-to-one or one-to-many communications */ - MESSAGE /*!< used for multiple-producer one-to-one, one-to-many, many-to-one, or many-to-many communications */ + MESSAGE, /*!< used for multiple-producer one-to-one, one-to-many, many-to-one, or many-to-many communications */ + ANY // 'ANY' only for querying and not to be used for port declarations }; /** @@ -151,9 +153,12 @@ using is_tag_buffer_attribute = std::bool_constant>; template struct DefaultStreamBuffer : StreamBufferType> {}; +struct DefaultMessageBuffer : StreamBufferType> {}; + struct DefaultTagBuffer : TagBufferType> {}; static_assert(is_stream_buffer_attribute>::value); +static_assert(is_stream_buffer_attribute::value); static_assert(!is_stream_buffer_attribute::value); static_assert(!is_tag_buffer_attribute>::value); static_assert(is_tag_buffer_attribute::value); @@ -204,6 +209,7 @@ struct Port { using with_name_and_descriptor = Port; static_assert(portDirection != PortDirection::ANY, "ANY reserved for queries and not port direction declarations"); + static_assert(portType != PortType::ANY, "ANY reserved for queries and not port type declarations"); using value_type = T; using AttributeTypeList = typename gr::meta::typelist; @@ -320,18 +326,7 @@ struct Port { , _tagIoHandler(std::move(other._tagIoHandler)) {} constexpr Port & - operator=(Port &&other) noexcept { - Port tmp(std::move(other)); - std::swap(name, tmp._name); - std::swap(min_samples, tmp._min_samples); - std::swap(max_samples, tmp._max_samples); - std::swap(priority, tmp._priority); - - std::swap(_connected, tmp._connected); - std::swap(_ioHandler, tmp._ioHandler); - std::swap(_tagIoHandler, tmp._tagIoHandler); - return *this; - } + operator=(Port &&other) = delete; ~Port() = default; @@ -434,7 +429,7 @@ struct Port { buffer() { struct port_buffers { BufferType streamBuffer; - TagBufferType tagBufferType; + TagBufferType tagBuffer; }; return port_buffers{ _ioHandler.buffer(), _tagIoHandler.buffer() }; @@ -448,7 +443,7 @@ struct Port { _connected = true; } else { _ioHandler = streamBuffer.new_writer(); - _tagIoHandler = tagBuffer.new_reader(); + _tagIoHandler = tagBuffer.new_writer(); } } @@ -651,25 +646,27 @@ template using PortIn = Port; template using PortOut = Port; -template -using MsgPortIn = Port; -template -using MsgPortOut = Port; - template using PortInNamed = Port; template using PortOutNamed = Port; + +template +using MsgPortIn = Port; +template +using MsgPortOut = Port; template -using MsgPortInNamed = Port; +using MsgPortInNamed = Port; template -using MsgPortOutNamed = Port; +using MsgPortOutNamed = Port; static_assert(PortLike>); static_assert(PortLike())>); static_assert(PortLike>); -static_assert(PortLike>); -static_assert(PortLike>); +static_assert(PortLike>); +static_assert(PortLike>); + +static_assert(std::is_same_v::BufferType, gr::CircularBuffer>); static_assert(PortIn>::Required::kMinSamples == 1); static_assert(PortIn>::Required::kMaxSamples == 2); diff --git a/core/include/gnuradio-4.0/PortTraits.hpp b/core/include/gnuradio-4.0/PortTraits.hpp index 74ea0f60..ba8057a1 100644 --- a/core/include/gnuradio-4.0/PortTraits.hpp +++ b/core/include/gnuradio-4.0/PortTraits.hpp @@ -50,6 +50,58 @@ using is_port = std::integral_constant>; template concept is_port_collection_v = is_port_v; +template +auto +unwrap_port_helper() { + if constexpr (port::is_port_v) { + return static_cast(nullptr); + } else if constexpr (port::is_port_collection_v) { + return static_cast(nullptr); + } else { + meta::print_types, T>{}; + } +} + +template +using unwrap_port = std::remove_pointer_t())>; + +struct kind { + template + static constexpr auto + value_helper() { + if constexpr (std::is_same_v) { + return gr::PortType::MESSAGE; + } else { + return gr::PortType::STREAM; + } + } + + template + struct tester_for { + template + static constexpr bool matches_kind = portType == PortType::ANY || kind::value_helper() == portType; + + template + constexpr static bool + is_port_or_collection_helper() { + if constexpr (port::is_port_v || port::is_port_collection_v) { + return matches_kind>; + } else { + return false; + } + } + + template + using is_port_or_collection = std::integral_constant()>; + + template + using is_input_port_or_collection = std::integral_constant() && port::is_input_v>>; + + template + using is_output_port_or_collection = std::integral_constant() && port::is_output_v>>; + }; +}; + template auto type_helper() { @@ -69,6 +121,9 @@ struct min_samples : std::integral_constant struct max_samples : std::integral_constant {}; +template +constexpr bool is_not_any_port_or_collection = !gr::traits::port::kind::tester_for::is_port_or_collection(); + } // namespace gr::traits::port #endif // include guard diff --git a/core/include/gnuradio-4.0/Scheduler.hpp b/core/include/gnuradio-4.0/Scheduler.hpp index 4dea5404..88827c58 100644 --- a/core/include/gnuradio-4.0/Scheduler.hpp +++ b/core/include/gnuradio-4.0/Scheduler.hpp @@ -29,7 +29,13 @@ class SchedulerBase { std::atomic_size_t _running_threads; std::atomic_bool _stop_requested; + MsgPortOutNamed<"__ForChildren"> _toChildMessagePort; + MsgPortInNamed<"__FromChildren"> _fromChildMessagePort; + public: + MsgPortInNamed<"__Builtin"> msgIn; + MsgPortOutNamed<"__Builtin"> msgOut; + explicit SchedulerBase(gr::Graph &&graph, std::shared_ptr thread_pool = std::make_shared("simple-scheduler-pool", thread_pool::CPU_BOUND), const profiling::Options &profiling_options = {}) : _graph(std::move(graph)), _profiler{ profiling_options }, _profiler_handler{ _profiler.forThisThread() }, _pool(std::move(thread_pool)) {} @@ -38,27 +44,27 @@ class SchedulerBase { void startBlocks() { - std::ranges::for_each(this->_graph.blocks(), [](auto &b) { b->start(); }); + _graph.forEachBlock(&BlockModel::start); } void stopBlocks() { - std::ranges::for_each(this->_graph.blocks(), [](auto &b) { b->stop(); }); + _graph.forEachBlock(&BlockModel::stop); } void pauseBlocks() { - std::ranges::for_each(this->_graph.blocks(), [](auto &b) { b->pause(); }); + _graph.forEachBlock(&BlockModel::pause); } void resumeBlocks() { - std::ranges::for_each(this->_graph.blocks(), [](auto &b) { b->resume(); }); + _graph.forEachBlock(&BlockModel::resume); } void resetBlocks() { - std::ranges::for_each(this->_graph.blocks(), [](auto &b) { b->reset(); }); + _graph.forEachBlock(&BlockModel::reset); } bool @@ -134,6 +140,37 @@ class SchedulerBase { _state = lifecycle::State::REQUESTED_PAUSE; } + void + connectBlockMessagePorts() { + _graph.forEachBlock([this](auto &block) { + if (ConnectionResult::SUCCESS != _toChildMessagePort.connect(*block.msgIn)) { + throw fmt::format("Failed to connect scheduler output message port to child {}", block.uniqueName()); + } + + auto buffer = _fromChildMessagePort.buffer(); + block.msgOut->setBuffer(buffer.streamBuffer, buffer.tagBuffer); + }); + } + + void + processScheduledMessages() { + // Process messages in scheduler + auto passMessages = [](auto &inPort, auto &outPort) { + auto &reader = inPort.streamReader(); + if (const auto available = reader.available(); available > 0) { + const auto &input = reader.get(available); + outPort.streamWriter().publish([&](auto &output) { std::ranges::copy(input, output.begin()); }, available); + } + }; + + passMessages(msgIn, _toChildMessagePort); + passMessages(_fromChildMessagePort, msgOut); + + // Process messages in the graph + _graph.processScheduledMessages(); + _graph.forEachBlock(&BlockModel::processScheduledMessages); + } + void init() { [[maybe_unused]] const auto pe = _profiler_handler.startCompleteEvent("scheduler_base.init"); @@ -141,6 +178,9 @@ class SchedulerBase { return; } const auto result = _graph.performConnections(); + + connectBlockMessagePorts(); + if (result) { _state = lifecycle::State::INITIALISED; } else { @@ -148,6 +188,11 @@ class SchedulerBase { } } + auto & + graph() { + return _graph; + } + void reset() { if (!canReset()) { @@ -256,6 +301,8 @@ class Simple : public SchedulerBase { constexpr std::size_t requestedWorkAllBlocks = std::numeric_limits::max(); std::size_t performedWorkAllBlocks = 0UZ; + this->processScheduledMessages(); + bool something_happened = false; for (auto ¤tBlock : blocks) { const auto [requested_work, performed_work, status] = currentBlock->work(requestedWorkAllBlocks); diff --git a/core/include/gnuradio-4.0/Settings.hpp b/core/include/gnuradio-4.0/Settings.hpp index 27a35928..ee07dddc 100644 --- a/core/include/gnuradio-4.0/Settings.hpp +++ b/core/include/gnuradio-4.0/Settings.hpp @@ -270,7 +270,7 @@ class BasicSettings : public SettingsBase { auto iterate_over_member = [&](auto member) { using RawType = std::remove_cvref_t; using Type = unwrap_if_wrapped_t; - if constexpr (!traits::block::detail::is_port_or_collection() && !std::is_const_v && is_writable(member) && isSupportedType()) { + if constexpr (traits::port::is_not_any_port_or_collection && !std::is_const_v && is_writable(member) && isSupportedType()) { if (default_tag == get_display_name(member)) { _auto_forward.emplace(get_display_name(member)); } @@ -360,7 +360,7 @@ class BasicSettings : public SettingsBase { bool is_set = false; auto iterate_over_member = [&, this](auto member) { using Type = unwrap_if_wrapped_t>; - if constexpr (!traits::block::detail::is_port_or_collection() && !std::is_const_v && is_writable(member) && isSupportedType()) { + if constexpr (traits::port::is_not_any_port_or_collection && !std::is_const_v && is_writable(member) && isSupportedType()) { if (std::string(get_display_name(member)) == key && std::holds_alternative(value)) { if (_auto_update.contains(key)) { _auto_update.erase(key); @@ -418,7 +418,7 @@ class BasicSettings : public SettingsBase { const auto &value = localValue; auto iterate_over_member = [&](auto member) { using Type = unwrap_if_wrapped_t>; - if constexpr (!traits::block::detail::is_port_or_collection() && !std::is_const_v && is_writable(member) && isSupportedType()) { + if constexpr (traits::port::is_not_any_port_or_collection && !std::is_const_v && is_writable(member) && isSupportedType()) { if (std::string(get_display_name(member)) == key && std::holds_alternative(value)) { _staged.insert_or_assign(key, value); SettingsBase::_changed.store(true); @@ -509,7 +509,7 @@ class BasicSettings : public SettingsBase { auto apply_member_changes = [&key, &staged, &forward_parameters, &staged_value, this](auto member) { using RawType = std::remove_cvref_t; using Type = unwrap_if_wrapped_t; - if constexpr (!traits::block::detail::is_port_or_collection() && !std::is_const_v && is_writable(member) && isSupportedType()) { + if constexpr (traits::port::is_not_any_port_or_collection && !std::is_const_v && is_writable(member) && isSupportedType()) { if (std::string(get_display_name(member)) == key && std::holds_alternative(staged_value)) { if constexpr (is_annotated()) { if (member(*_block).validate_and_set(std::get(staged_value))) { @@ -554,7 +554,7 @@ class BasicSettings : public SettingsBase { // update active parameters auto update_active = [this](auto member) { using Type = unwrap_if_wrapped_t>; - if constexpr (!traits::block::detail::is_port_or_collection() && is_readable(member) && isSupportedType()) { + if constexpr (traits::port::is_not_any_port_or_collection && is_readable(member) && isSupportedType()) { _active.insert_or_assign(get_display_name(member), pmtv::pmt(member(*_block))); } }; @@ -593,7 +593,7 @@ class BasicSettings : public SettingsBase { auto iterate_over_member = [&, this](auto member) { using Type = unwrap_if_wrapped_t>; - if constexpr ((!traits::block::detail::is_port_or_collection()) && is_readable(member) && isSupportedType()) { + if constexpr (traits::port::is_not_any_port_or_collection && is_readable(member) && isSupportedType()) { _active.insert_or_assign(get_display_name_const(member).str(), member(*_block)); } }; @@ -612,7 +612,7 @@ class BasicSettings : public SettingsBase { auto iterate_over_member = [&, this](auto member) { using Type = unwrap_if_wrapped_t>; - if constexpr (!traits::block::detail::is_port_or_collection() && is_readable(member) && isSupportedType()) { + if constexpr (traits::port::is_not_any_port_or_collection && is_readable(member) && isSupportedType()) { oldSettings.insert_or_assign(get_display_name(member), pmtv::pmt(member(*_block))); } }; diff --git a/core/include/gnuradio-4.0/Transactions.hpp b/core/include/gnuradio-4.0/Transactions.hpp index 1693459e..4986a009 100644 --- a/core/include/gnuradio-4.0/Transactions.hpp +++ b/core/include/gnuradio-4.0/Transactions.hpp @@ -17,6 +17,7 @@ #include #include +#include "PortTraits.hpp" #include "Settings.hpp" #include "Tag.hpp" @@ -317,7 +318,7 @@ class CtxSettings : public SettingsBase { auto apply_member_changes = [&key, &staged, &forward_parameters, &staged_value, this](auto member) { using RawType = std::remove_cvref_t; using Type = unwrap_if_wrapped_t; - if constexpr (!traits::block::detail::is_port_or_collection() && !std::is_const_v && is_writable(member) && isSupportedType()) { + if constexpr (traits::port::is_not_any_port_or_collection && !std::is_const_v && is_writable(member) && isSupportedType()) { if (std::string(get_display_name(member)) == key && std::holds_alternative(staged_value)) { if constexpr (is_annotated()) { if (member(*_block).validate_and_set(std::get(staged_value))) { diff --git a/core/include/gnuradio-4.0/plugin.hpp b/core/include/gnuradio-4.0/plugin.hpp index 31665d1f..9161f733 100644 --- a/core/include/gnuradio-4.0/plugin.hpp +++ b/core/include/gnuradio-4.0/plugin.hpp @@ -66,8 +66,9 @@ class plugin : public gp_plugin_base { template typename TBlock, typename... Args> void - add_block_type(std::string block_type) { - registry.add_block_type(std::move(block_type)); + addBlockType(std::string block_type) { + std::cout << "New block type: " << block_type << std::endl; + registry.addBlockType(std::move(block_type)); } }; @@ -101,6 +102,6 @@ class plugin : public gp_plugin_base { } \ } -#define GP_PLUGIN_REGISTER_NODE(...) GP_REGISTER_NODE(gp_plugin_instance(), __VA_ARGS__); +#define GP_PLUGIN_REGISTER_BLOCK(...) GP_REGISTER_BLOCK(gp_plugin_instance(), __VA_ARGS__); #endif // include guard diff --git a/core/include/gnuradio-4.0/reflection.hpp b/core/include/gnuradio-4.0/reflection.hpp index c3a619f8..7faa344d 100644 --- a/core/include/gnuradio-4.0/reflection.hpp +++ b/core/include/gnuradio-4.0/reflection.hpp @@ -89,16 +89,16 @@ #define GP_CONCAT_IMPL(x, y) x##y #define GP_MACRO_CONCAT(x, y) GP_CONCAT_IMPL(x, y) -#define GP_REGISTER_NODE_IMPL(Register, Name, ...) gr::detail::RegisterBlock GP_MACRO_CONCAT(GP_REGISTER_NODE_, __COUNTER__)(Register, #Name); -#define GP_REGISTER_NODE(Register, Name, ...) \ +#define GP_REGISTER_BLOCK_IMPL(Register, Name, ...) gr::detail::RegisterBlock GP_MACRO_CONCAT(GP_REGISTER_BLOCK_, __COUNTER__)(Register, #Name); +#define GP_REGISTER_BLOCK(Register, Name, ...) \ namespace { \ using gr::detail::BlockParameters; \ - GP_REGISTER_NODE_IMPL(Register, Name, __VA_ARGS__); \ + GP_REGISTER_BLOCK_IMPL(Register, Name, __VA_ARGS__); \ } -#define GP_REGISTER_NODE_RUNTIME(Register, Name, ...) \ +#define GP_REGISTER_BLOCK_RUNTIME(Register, Name, ...) \ { \ using gr::detail::BlockParameters; \ - GP_REGISTER_NODE_IMPL(Register, Name, __VA_ARGS__); \ + GP_REGISTER_BLOCK_IMPL(Register, Name, __VA_ARGS__); \ } #pragma GCC diagnostic pop diff --git a/core/src/main.cpp b/core/src/main.cpp index 53b7db13..d12698e8 100644 --- a/core/src/main.cpp +++ b/core/src/main.cpp @@ -68,7 +68,7 @@ class duplicate : public grg::Block, gr::meta::typelist, gr::meta::typelist>, grg::repeated_ports>; public: - using return_type = typename grg::traits::block::return_type; + using return_type = typename grg::traits::block::stream_return_type; [[nodiscard]] constexpr return_type processOne(T a) const noexcept { diff --git a/core/test/CMakeLists.txt b/core/test/CMakeLists.txt index 2fc97ebc..41f72cff 100644 --- a/core/test/CMakeLists.txt +++ b/core/test/CMakeLists.txt @@ -38,6 +38,7 @@ add_ut_test(qa_Scheduler) add_ut_test(qa_reader_writer_lock) add_ut_test(qa_Settings) add_ut_test(qa_Tags) +add_ut_test(qa_Messages) add_ut_test(qa_port_array) add_ut_test(qa_thread_affinity) add_ut_test(qa_thread_pool) diff --git a/core/test/app_plugins_test.cpp b/core/test/app_plugins_test.cpp index ba460603..48244318 100644 --- a/core/test/app_plugins_test.cpp +++ b/core/test/app_plugins_test.cpp @@ -5,7 +5,7 @@ #include #include -#include +#include #include @@ -19,7 +19,7 @@ struct TestContext { explicit TestContext(std::vector paths) : registry(), loader(®istry, std::move(paths)) {} grg::BlockRegistry registry; - grg::plugin_loader loader; + grg::PluginLoader loader; }; namespace names { @@ -37,6 +37,8 @@ main(int argc, char *argv[]) { if (argc < 2) { paths.emplace_back("test/plugins"); paths.emplace_back("plugins"); + paths.emplace_back("core/test/plugins"); + paths.emplace_back("plugins"); } else { for (int i = 1; i < argc; ++i) { paths.emplace_back(argv[i]); @@ -60,6 +62,10 @@ main(int argc, char *argv[]) { auto known = context.loader.knownBlocks(); std::vector requireds{ names::cout_sink, names::fixed_source, names::divide, names::multiply }; + for (const auto &plugin : known) { + fmt::print("Known: {}\n", plugin); + } + for (const auto &required [[maybe_unused]] : requireds) { assert(std::ranges::find(known, required) != known.end()); } @@ -67,7 +73,7 @@ main(int argc, char *argv[]) { grg::Graph testGraph; // Instantiate the node that is defined in a plugin - auto &block_source = context.loader.instantiate_in_graph(testGraph, names::fixed_source, "double"); + auto &block_source = context.loader.instantiateInGraph(testGraph, names::fixed_source, "double"); // Instantiate a built-in node in a static way gr::property_map block_multiply_1_params; @@ -75,8 +81,8 @@ main(int argc, char *argv[]) { auto &block_multiply_1 = testGraph.emplaceBlock>(block_multiply_1_params); // Instantiate a built-in node via the plugin loader - auto &block_multiply_2 = context.loader.instantiate_in_graph(testGraph, names::builtin_multiply, "double"); - auto &block_counter = context.loader.instantiate_in_graph(testGraph, names::builtin_counter, "double"); + auto &block_multiply_2 = context.loader.instantiateInGraph(testGraph, names::builtin_multiply, "double"); + auto &block_counter = context.loader.instantiateInGraph(testGraph, names::builtin_counter, "double"); // const std::size_t repeats = 100; diff --git a/core/test/plugins/good_base_plugin.cpp b/core/test/plugins/good_base_plugin.cpp index 8c643a07..864f63b1 100644 --- a/core/test/plugins/good_base_plugin.cpp +++ b/core/test/plugins/good_base_plugin.cpp @@ -62,7 +62,7 @@ class fixed_source : public grg::Block, grg::PortOutNamed(this); + auto &port = gr::outputPort<0, gr::PortType::STREAM>(this); auto &writer = port.streamWriter(); auto data = writer.reserve_output_range(1UZ); data[0] = value; @@ -79,8 +79,32 @@ class fixed_source : public grg::Block, grg::PortOutNamed>::size == 1); +static_assert(std::is_same_v>, gr::meta::typelist>); +static_assert(bts::stream_input_ports>::size == 1); +static_assert(std::is_same_v>, gr::meta::typelist>); + +static_assert(bts::all_output_ports>::size == 0); +static_assert(std::is_same_v>, gr::meta::typelist<>>); +static_assert(bts::stream_output_ports>::size == 0); +static_assert(std::is_same_v>, gr::meta::typelist<>>); + +static_assert(bts::all_output_ports>::size == 0); +static_assert(std::is_same_v>, gr::meta::typelist<>>); +static_assert(bts::stream_output_ports>::size == 0); +static_assert(std::is_same_v>, gr::meta::typelist<>>); ENABLE_REFLECTION_FOR_TEMPLATE(good::fixed_source, event_count); -GP_PLUGIN_REGISTER_NODE(good::fixed_source, float, double); +GP_PLUGIN_REGISTER_BLOCK(good::fixed_source, float, double); +static_assert(bts::all_input_ports>::size == 0); +static_assert(std::is_same_v>, gr::meta::typelist<>>); +static_assert(bts::stream_input_ports>::size == 0); +static_assert(std::is_same_v>, gr::meta::typelist<>>); +static_assert(bts::all_output_ports>::size == 1); +static_assert(std::is_same_v>, gr::meta::typelist>); +static_assert(bts::stream_output_ports>::size == 1); +static_assert(std::is_same_v>, gr::meta::typelist>); diff --git a/core/test/plugins/good_conversion_plugin.cpp b/core/test/plugins/good_conversion_plugin.cpp index ab04b57c..f8469f28 100644 --- a/core/test/plugins/good_conversion_plugin.cpp +++ b/core/test/plugins/good_conversion_plugin.cpp @@ -3,7 +3,7 @@ #include #include -GP_PLUGIN("Good Base Plugin", "Unknown", "LGPL3", "v1") +GP_PLUGIN("Good Conversion Plugin", "Unknown", "LGPL3", "v1") namespace good { namespace grg = gr; @@ -27,4 +27,4 @@ ENABLE_REFLECTION_FOR_TEMPLATE(good::convert, in, out); // Another is to use the same macro for both single-parametrised // and mulciple-parametrised nodes, just to have the parameter // packs wrapped in some special type like this: -GP_PLUGIN_REGISTER_NODE(good::convert, BlockParameters, BlockParameters); +GP_PLUGIN_REGISTER_BLOCK(good::convert, BlockParameters, BlockParameters); diff --git a/core/test/plugins/good_math_plugin.cpp b/core/test/plugins/good_math_plugin.cpp index 6e582883..6d5d0d82 100644 --- a/core/test/plugins/good_math_plugin.cpp +++ b/core/test/plugins/good_math_plugin.cpp @@ -3,10 +3,9 @@ #include -GP_PLUGIN("Good Base Plugin", "Unknown", "LGPL3", "v1") +GP_PLUGIN("Good Math Plugin", "Unknown", "LGPL3", "v1") namespace good { -namespace grg = gr; template auto @@ -28,8 +27,8 @@ class math_base { T _factor = static_cast(1.0f); public: - grg::PortIn> in; - grg::PortOut> out; + gr::PortIn> in; + gr::PortOut> out; math_base() = delete; @@ -37,7 +36,7 @@ class math_base { }; template -class multiply : public grg::Block>, public math_base { +class multiply : public gr::Block>, public math_base { public: using math_base::math_base; @@ -49,7 +48,7 @@ class multiply : public grg::Block>, public math_base { }; template -class divide : public grg::Block>, public math_base { +class divide : public gr::Block>, public math_base { public: using math_base::math_base; @@ -62,8 +61,18 @@ class divide : public grg::Block>, public math_base { } // namespace good +namespace bts = gr::traits::block; + ENABLE_REFLECTION_FOR_TEMPLATE(good::multiply, in, out); -GP_PLUGIN_REGISTER_NODE(good::multiply, float, double); +GP_PLUGIN_REGISTER_BLOCK(good::multiply, float, double); +static_assert(bts::all_input_ports>::size == 1); +static_assert(std::is_same_v>, gr::meta::typelist>); +static_assert(bts::stream_input_ports>::size == 1); +static_assert(std::is_same_v>, gr::meta::typelist>); +static_assert(bts::all_output_ports>::size == 1); +static_assert(std::is_same_v>, gr::meta::typelist>); +static_assert(bts::stream_output_ports>::size == 1); +static_assert(std::is_same_v>, gr::meta::typelist>); ENABLE_REFLECTION_FOR_TEMPLATE(good::divide, in, out); -GP_PLUGIN_REGISTER_NODE(good::divide, float, double); +GP_PLUGIN_REGISTER_BLOCK(good::divide, float, double); diff --git a/core/test/qa_DynamicBlock.cpp b/core/test/qa_DynamicBlock.cpp index cb495a8c..5a674813 100644 --- a/core/test/qa_DynamicBlock.cpp +++ b/core/test/qa_DynamicBlock.cpp @@ -14,7 +14,7 @@ struct fixed_source : public gr::Block, gr::PortOutNamed(this); + auto &port = gr::outputPort<0, gr::PortType::STREAM>(this); auto &writer = port.streamWriter(); auto data = writer.reserve_output_range(1UZ); data[0] = value; @@ -26,12 +26,12 @@ struct fixed_source : public gr::Block, gr::PortOutNamed>); -static_assert(gr::traits::block::input_ports>::size() == 0); -static_assert(gr::traits::block::output_ports>::size() == 1); +static_assert(gr::traits::block::stream_input_ports>::size() == 0); +static_assert(gr::traits::block::stream_output_ports>::size() == 1); template struct DebugSink : public gr::Block> { - T lastValue = {}; + T lastValue = {}; gr::PortIn in; void @@ -47,10 +47,10 @@ ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T), (DebugSink), lastValue, in) const boost::ut::suite DynamicBlocktests = [] { using namespace boost::ut; "Change number of ports dynamically"_test = [] { - constexpr const int sources_count = 10; + constexpr const int sources_count = 10; constexpr const std::size_t events_count = 5; - gr::Graph testGraph; + gr::Graph testGraph; // Adder has sources_count inputs in total, but let's create // sources_count / 2 inputs on construction, and change the number @@ -99,7 +99,7 @@ const boost::ut::suite DynamicBlocktests = [] { const auto source_work = source->work(1UZ); expect(eq(source_work.performed_work, 1UZ)); } - const auto adder_work = adder.work(1UZ); + const auto adder_work = adder.work(1UZ); expect(eq(adder_work.performed_work, 1UZ)); const auto sink_work = sink.work(1UZ); expect(eq(sink_work.performed_work, 1UZ)); diff --git a/core/test/qa_DynamicPort.cpp b/core/test/qa_DynamicPort.cpp index 2bed6b98..20ef2bf0 100644 --- a/core/test/qa_DynamicPort.cpp +++ b/core/test/qa_DynamicPort.cpp @@ -150,7 +150,7 @@ const boost::ut::suite PortApiTests = [] { const BufferReader auto &reader = input_port.streamReader(); expect(eq(reader.available(), 0UZ)); auto buffers = output_port.buffer(); - input_port.setBuffer(buffers.streamBuffer, buffers.tagBufferType); + input_port.setBuffer(buffers.streamBuffer, buffers.tagBuffer); expect(eq(buffers.streamBuffer.n_readers(), 1UZ)); diff --git a/core/test/qa_HierBlock.cpp b/core/test/qa_HierBlock.cpp index b2e2f932..29b87d61 100644 --- a/core/test/qa_HierBlock.cpp +++ b/core/test/qa_HierBlock.cpp @@ -56,9 +56,9 @@ class HierBlock : public gr::BlockModel { assert(gr::ConnectionResult::SUCCESS == graph.connect<"scaled">(left_scale_block).to<"addend0">(adder_block)); assert(gr::ConnectionResult::SUCCESS == graph.connect<"scaled">(right_scale_block).to<"addend1">(adder_block)); - _dynamicInputPorts.emplace_back(gr::DynamicPort(gr::inputPort<0>(&left_scale_block), gr::DynamicPort::non_owned_reference_tag{})); - _dynamicInputPorts.emplace_back(gr::DynamicPort(gr::inputPort<0>(&right_scale_block), gr::DynamicPort::non_owned_reference_tag{})); - _dynamicOutputPorts.emplace_back(gr::DynamicPort(gr::outputPort<0>(&adder_block), gr::DynamicPort::non_owned_reference_tag{})); + _dynamicInputPorts.emplace_back(gr::DynamicPort(gr::inputPort<0, gr::PortType::STREAM>(&left_scale_block), gr::DynamicPort::non_owned_reference_tag{})); + _dynamicInputPorts.emplace_back(gr::DynamicPort(gr::inputPort<0, gr::PortType::STREAM>(&right_scale_block), gr::DynamicPort::non_owned_reference_tag{})); + _dynamicOutputPorts.emplace_back(gr::DynamicPort(gr::outputPort<0, gr::PortType::STREAM>(&adder_block), gr::DynamicPort::non_owned_reference_tag{})); _dynamicPortsLoaded = true; return graph; @@ -137,6 +137,11 @@ class HierBlock : public gr::BlockModel { return { requested_work, requested_work, gr::work::Status::DONE }; } + void + processScheduledMessages() override { + // TODO + } + void * raw() override { return this; diff --git a/core/test/qa_Messages.cpp b/core/test/qa_Messages.cpp new file mode 100644 index 00000000..6602a7ae --- /dev/null +++ b/core/test/qa_Messages.cpp @@ -0,0 +1,343 @@ +#include "gnuradio-4.0/basic/common_blocks.hpp" +#include + +#include +#include +#include + +#include + +#if defined(__clang__) && __clang_major__ >= 16 +// clang 16 does not like ut's default reporter_junit due to some issues with stream buffers and output redirection +template<> +auto boost::ut::cfg = boost::ut::runner>{}; +#endif + +using namespace std::chrono_literals; +using namespace std::string_literals; + +template +auto +createOnesGenerator(IsDone &isDone) { + return [&isDone](auto * /*_this */) -> std::optional { + if (!isDone()) { + return 1.0f; + } else { + return {}; + } + }; +} + +auto +messageProcessorCounter(std::atomic_size_t &countdown, std::atomic_size_t &totalCounter, std::string inMessageKind, gr::Message replyMessage) { + return [&, inMessageKind, replyMessage](auto *_this, gr::MsgPortInNamed<"__Builtin"> &, std::span messages) { + if (countdown > 0) { + for (const auto &message : messages) { + const auto target = gr::messageField(message, gr::message::key::Target); + + // Filtering on target is block's job now + if (target && !target->empty() && *target != _this->unique_name) continue; + + const auto kind = gr::messageField(message, gr::message::key::Kind); + assert(kind); + if (kind != "custom_kind") { + continue; + } + + fmt::print("Got a message, countdown was {}\n", countdown.load()); + if (countdown > 0) countdown--; + totalCounter++; + + if (!replyMessage.empty()) _this->emitMessage(_this->msgOut, std::move(replyMessage)); + } + } + }; +} + +const boost::ut::suite MessagesTests = [] { + using namespace boost::ut; + using namespace gr; + + // Testing if multicast messages sent from outside of the graph reach + // the nodes inside, and if messages sent from the node reach the outside + "MulticastMessaggingWithTheWorld"_test = [] { + using Scheduler = gr::scheduler::Simple<>; + + std::atomic_size_t collectedEventCount = 0; + std::atomic_size_t sourceMessagesCountdown = 1; + std::atomic_size_t sinkMessagesCountdown = 1; + constexpr std::size_t requiredEventCount = 2; + std::atomic_bool receiverGotAMessage = false; + auto isDone = [&] { return requiredEventCount <= collectedEventCount && receiverGotAMessage; }; + + auto scheduler = [&] { + gr::Graph flow; + auto &source = flow.emplaceBlock>(); + + source.generator = createOnesGenerator(isDone); + source.messageProcessor = messageProcessorCounter(sourceMessagesCountdown, collectedEventCount, "custom_kind"s, {}); + + auto &process = flow.emplaceBlock>(property_map{}); + + auto &sink = flow.emplaceBlock>(); + sink.messageProcessor = messageProcessorCounter(sinkMessagesCountdown, collectedEventCount, "custom_kind"s, [] { + gr::Message outMessage; + outMessage[gr::message::key::Kind] = "custom_reply_kind"; + return outMessage; + }()); + + expect(eq(ConnectionResult::SUCCESS, flow.connect<"out">(source).to<"in">(process))); + expect(eq(ConnectionResult::SUCCESS, flow.connect<"out">(process).to<"in">(sink))); + + return Scheduler(std::move(flow)); + }(); + + gr::testing::MessageSender messageSender; + messageSender.messageGenerator = [&](auto * /*this*/) -> std::optional { + if (!isDone()) { + gr::Message message; + message[gr::message::key::Kind] = "custom_kind"; + message[gr::message::key::Target] = ""; + return message; + } else { + return {}; + } + }; + expect(eq(ConnectionResult::SUCCESS, messageSender.msgOut.connect(scheduler.msgIn))); + + gr::testing::FunctionSink messageReceiver; + messageReceiver.messageProcessor = [&](auto * /*_this*/, gr::MsgPortInNamed<"__Builtin"> & /*port*/, std::span messages) { + for (const auto &message : messages) { + const auto kind = gr::messageField(message, gr::message::key::Kind); + if (kind == "custom_reply_kind") { + receiverGotAMessage = true; + } + } + }; + expect(eq(ConnectionResult::SUCCESS, scheduler.msgOut.connect(messageReceiver.msgIn))); + + std::thread messenger([&] { + while (!isDone()) { + std::this_thread::sleep_for(100ms); + messageSender.processOne(); + messageReceiver.processScheduledMessages(); + } + }); + + scheduler.runAndWait(); + messenger.join(); + }; + + // Testing if targetted messages sent from outside of the graph reach + // the node + "TargettedMessageForABlock"_test = [] { + using Scheduler = gr::scheduler::Simple<>; + + std::atomic_size_t collectedEventCount = 0; + std::atomic_size_t sourceMessagesCountdown = 1; // not a target, should not go down + std::atomic_size_t sinkMessagesCountdown = 2; + constexpr std::size_t requiredEventCount = 2; + auto isDone = [&] { return requiredEventCount <= collectedEventCount; }; + + auto scheduler = [&] { + gr::Graph flow; + auto &source = flow.emplaceBlock>(); + + source.generator = createOnesGenerator(isDone); + source.messageProcessor = messageProcessorCounter(sourceMessagesCountdown, collectedEventCount, "custom_kind"s, {}); + + auto &process = flow.emplaceBlock>(property_map{}); + + auto &sink = flow.emplaceBlock>(); + sink.messageProcessor = messageProcessorCounter(sinkMessagesCountdown, collectedEventCount, "custom_kind"s, {}); + + expect(eq(ConnectionResult::SUCCESS, flow.connect<"out">(source).to<"in">(process))); + expect(eq(ConnectionResult::SUCCESS, flow.connect<"out">(process).to<"in">(sink))); + + return Scheduler(std::move(flow)); + }(); + + auto &flow = scheduler.graph(); + + std::string target; + for (const auto &block : flow.blocks()) { + if (block->uniqueName().contains("FunctionSink")) { + target = block->uniqueName(); + } + } + + gr::testing::MessageSender messageSender; + messageSender.messageGenerator = [&](auto * /*this*/) -> std::optional { + if (!isDone()) { + gr::Message message; + message[gr::message::key::Kind] = "custom_kind"; + message[gr::message::key::Target] = target; + + return message; + } else { + return {}; + } + }; + expect(eq(ConnectionResult::SUCCESS, messageSender.msgOut.connect(scheduler.msgIn))); + + std::thread messenger([&] { + while (!isDone()) { + std::this_thread::sleep_for(100ms); + messageSender.processOne(); + } + }); + + scheduler.runAndWait(); + messenger.join(); + + expect(eq(sourceMessagesCountdown.load(), 1UZ)); + fmt::print("This is the sinkMessagesCountdown {}\n", sinkMessagesCountdown.load()); + expect(eq(sinkMessagesCountdown.load(), 0UZ)); + }; + + // Testing if settings messages work + "SettingsManagement"_test = [] { + using Scheduler = gr::scheduler::Simple<>; + + constexpr std::atomic_size_t requiredMessageCount = 4; + std::atomic_size_t settingsSuccessMessageCount = 0; + std::atomic_size_t settingsFailureMessageCount = 0; + // + auto isDone = [&] { return settingsSuccessMessageCount >= requiredMessageCount && settingsFailureMessageCount >= requiredMessageCount; }; + + auto scheduler = [&] { + gr::Graph flow; + auto &source = flow.emplaceBlock>(); + + source.generator = createOnesGenerator(isDone); + + auto &process = flow.emplaceBlock>(property_map{}); + auto &sink = flow.emplaceBlock>(); + + expect(eq(ConnectionResult::SUCCESS, flow.connect<"out">(source).to<"in">(process))); + expect(eq(ConnectionResult::SUCCESS, flow.connect<"out">(process).to<"in">(sink))); + + return Scheduler(std::move(flow)); + }(); + + gr::testing::MessageSender messageSender; + messageSender.messageGenerator = [&](auto * /*this*/) -> std::optional { + if (!isDone()) { + gr::Message message; + message[gr::message::key::Kind] = gr::message::kind::UpdateSettings; + message[gr::message::key::Target] = std::string(); + message[gr::message::key::Data] = gr::property_map{ { "factor", 4.4f } }; + return message; + } else { + return {}; + } + }; + + expect(eq(ConnectionResult::SUCCESS, messageSender.msgOut.connect(scheduler.msgIn))); + + gr::testing::FunctionSink messageReceiver; + messageReceiver.messageProcessor = [&](auto * /*_this*/, gr::MsgPortInNamed<"__Builtin"> & /*port*/, std::span messages) { + for (const auto &message : messages) { + const auto kind = *gr::messageField(message, gr::message::key::Kind); + if (kind == gr::message::kind::SettingsChanged) { + const auto sender = gr::messageField(message, gr::message::key::Sender); + const auto errorInfo = gr::messageField(message, gr::message::key::ErrorInfo); + + if (errorInfo.has_value()) { + settingsSuccessMessageCount++; + } else { + settingsFailureMessageCount++; + } + } + } + }; + expect(eq(ConnectionResult::SUCCESS, scheduler.msgOut.connect(messageReceiver.msgIn))); + + std::thread messenger([&] { + while (!isDone()) { + std::this_thread::sleep_for(100ms); + messageSender.processOne(); + messageReceiver.processScheduledMessages(); + } + }); + + scheduler.runAndWait(); + messenger.join(); + }; + + // Testing message passing without a running scheduler + "MessagesWithoutARunningScheduler"_test = [] { + using Scheduler = gr::scheduler::Simple<>; + + std::atomic_size_t collectedEventCount = 0; + std::atomic_size_t sourceMessagesCountdown = 1; + std::atomic_size_t sinkMessagesCountdown = 1; + constexpr std::size_t requiredEventCount = 2; + std::atomic_bool receiverGotAMessage = false; + auto isDone = [&] { return requiredEventCount <= collectedEventCount && receiverGotAMessage; }; + + auto scheduler = [&] { + gr::Graph flow; + auto &source = flow.emplaceBlock>(); + + source.generator = createOnesGenerator(isDone); + source.messageProcessor = messageProcessorCounter(sourceMessagesCountdown, collectedEventCount, "custom_kind"s, {}); + + auto &process = flow.emplaceBlock>(property_map{}); + + auto &sink = flow.emplaceBlock>(); + sink.messageProcessor = messageProcessorCounter(sinkMessagesCountdown, collectedEventCount, "custom_kind"s, [] { + gr::Message outMessage; + outMessage[gr::message::key::Kind] = "custom_reply_kind"; + return outMessage; + }()); + + expect(eq(ConnectionResult::SUCCESS, flow.connect<"out">(source).to<"in">(process))); + expect(eq(ConnectionResult::SUCCESS, flow.connect<"out">(process).to<"in">(sink))); + + return Scheduler(std::move(flow)); + }(); + + gr::testing::MessageSender messageSender; + messageSender.messageGenerator = [&](auto * /*this*/) -> std::optional { + if (!isDone()) { + gr::Message message; + message[gr::message::key::Kind] = "custom_kind"; + message[gr::message::key::Target] = ""; + return message; + } else { + return {}; + } + }; + expect(eq(ConnectionResult::SUCCESS, messageSender.msgOut.connect(scheduler.msgIn))); + + gr::testing::FunctionSink messageReceiver; + messageReceiver.messageProcessor = [&](auto * /*_this*/, gr::MsgPortInNamed<"__Builtin"> & /*port*/, std::span messages) { + for (const auto &message : messages) { + const auto kind = gr::messageField(message, gr::message::key::Kind); + if (kind == "custom_reply_kind") { + receiverGotAMessage = true; + } + } + }; + expect(eq(ConnectionResult::SUCCESS, scheduler.msgOut.connect(messageReceiver.msgIn))); + + std::thread messenger([&] { + while (!isDone()) { + std::this_thread::sleep_for(100ms); + messageSender.processOne(); + messageReceiver.processScheduledMessages(); + } + }); + + scheduler.init(); + while (!isDone()) { + scheduler.processScheduledMessages(); + } + messenger.join(); + }; +}; + +int +main() { /* tests are statically executed */ +} diff --git a/core/test/qa_grc.cpp b/core/test/qa_grc.cpp index 0bea7d8e..09e2d121 100644 --- a/core/test/qa_grc.cpp +++ b/core/test/qa_grc.cpp @@ -35,7 +35,7 @@ struct TestContext { explicit TestContext(std::vector paths = {}) : registry(), loader(®istry, std::move(paths)) {} gr::BlockRegistry registry; - gr::plugin_loader loader; + gr::PluginLoader loader; }; namespace { @@ -74,8 +74,8 @@ const boost::ut::suite GrcTests = [] { static TestContext context = [] { TestContext ctx({ TESTS_BINARY_PATH "/plugins" }); registerBuiltinBlocks(&ctx.registry); - GP_REGISTER_NODE_RUNTIME(&ctx.registry, ArraySource, double); - GP_REGISTER_NODE_RUNTIME(&ctx.registry, ArraySink, double); + GP_REGISTER_BLOCK_RUNTIME(&ctx.registry, ArraySource, double); + GP_REGISTER_BLOCK_RUNTIME(&ctx.registry, ArraySink, double); return ctx; }(); @@ -172,4 +172,4 @@ const boost::ut::suite GrcTests = [] { int main() { /* tests are statically executed */ -} \ No newline at end of file +} diff --git a/core/test/qa_plugins_test.cpp b/core/test/qa_plugins_test.cpp index d4142ed3..941f9de5 100644 --- a/core/test/qa_plugins_test.cpp +++ b/core/test/qa_plugins_test.cpp @@ -8,7 +8,7 @@ #include #include -#include +#include #if defined(__clang__) && __clang_major__ >= 16 // clang 16 does not like ut's default reporter_junit due to some issues with stream buffers and output redirection @@ -44,9 +44,9 @@ ENABLE_REFLECTION_FOR_TEMPLATE(builtin_multiply, in, out); struct TestContext { gr::BlockRegistry registry; - gr::plugin_loader loader; + gr::PluginLoader loader; - TestContext() : loader(®istry, std::vector{ "core/test/plugins", "test/plugins", "plugins" }) { GP_REGISTER_NODE_RUNTIME(®istry, builtin_multiply, double, float); } + TestContext() : loader(®istry, std::vector{ "core/test/plugins", "test/plugins", "plugins" }) { GP_REGISTER_BLOCK_RUNTIME(®istry, builtin_multiply, double, float); } }; TestContext & @@ -153,7 +153,7 @@ const boost::ut::suite BasicPluginBlocksConnectionTests = [] { gr::Graph testGraph; // Instantiate the node that is defined in a plugin - auto &block_source = context().loader.instantiate_in_graph(testGraph, names::fixed_source, "double"); + auto &block_source = context().loader.instantiateInGraph(testGraph, names::fixed_source, "double"); // Instantiate a built-in node in a static way gr::property_map block_multiply_1_params; @@ -161,10 +161,10 @@ const boost::ut::suite BasicPluginBlocksConnectionTests = [] { auto &block_multiply_double = testGraph.emplaceBlock>(block_multiply_1_params); // Instantiate a built-in node via the plugin loader - auto &block_multiply_float = context().loader.instantiate_in_graph(testGraph, names::builtin_multiply, "float"); + auto &block_multiply_float = context().loader.instantiateInGraph(testGraph, names::builtin_multiply, "float"); - auto &block_convert_to_float = context().loader.instantiate_in_graph(testGraph, names::convert, "double;float"); - auto &block_convert_to_double = context().loader.instantiate_in_graph(testGraph, names::convert, "float;double"); + auto &block_convert_to_float = context().loader.instantiateInGraph(testGraph, names::convert, "double;float"); + auto &block_convert_to_double = context().loader.instantiateInGraph(testGraph, names::convert, "float;double"); // std::size_t repeats = 10; diff --git a/core/test/qa_port_array.cpp b/core/test/qa_port_array.cpp index 99206f27..e424a53d 100644 --- a/core/test/qa_port_array.cpp +++ b/core/test/qa_port_array.cpp @@ -33,7 +33,7 @@ struct RepeatedSource : public gr::Block> { } if (remaining_events_count != 0) { - auto &port = gr::outputPort<0>(this); + auto &port = gr::outputPort<0, gr::PortType::STREAM>(this); auto &writer = port.streamWriter(); auto data = writer.reserve_output_range(1UZ); diff --git a/meta/include/gnuradio-4.0/meta/typelist.hpp b/meta/include/gnuradio-4.0/meta/typelist.hpp index b1472156..d791733b 100644 --- a/meta/include/gnuradio-4.0/meta/typelist.hpp +++ b/meta/include/gnuradio-4.0/meta/typelist.hpp @@ -330,8 +330,8 @@ concept is_typelist_v = requires { typename T::typelist_tag; }; template struct typelist { - using this_t = typelist; - using typelist_tag = std::true_type; + using this_t = typelist; + using typelist_tag = std::true_type; static inline constexpr std::integral_constant size = {}; @@ -340,21 +340,21 @@ struct typelist { template static constexpr void - apply_impl(F &&f, std::index_sequence, LeadingArguments &&...args) { + for_each_impl(F &&f, std::index_sequence, LeadingArguments &&...args) { (f(std::forward(args)..., std::integral_constant{}, Ts{}), ...); } template static constexpr void - apply_func(F &&f, LeadingArguments &&...args) { - apply_impl(std::forward(f), std::make_index_sequence{}, std::forward(args)...); + for_each(F &&f, LeadingArguments &&...args) { + for_each_impl(std::forward(f), std::make_index_sequence{}, std::forward(args)...); } template using at = detail::at_impl::type; - template - using prepend = typelist; + template + using prepend = typelist; template static constexpr inline bool are_equal = std::same_as>; @@ -393,7 +393,7 @@ struct typelist { } }())>; - using safe_head = std::remove_pointer_t 0) { return static_cast *>(nullptr); } else { @@ -417,7 +417,7 @@ struct typelist { static constexpr std::size_t index_of() { std::size_t result = static_cast(-1); - gr::meta::typelist::template apply_func([&](auto index, auto &&t) { + gr::meta::typelist::for_each([&](auto index, auto &&t) { if constexpr (std::is_same_v>) { result = index; } @@ -428,8 +428,8 @@ struct typelist { template inline static constexpr bool contains = std::disjunction_v...>; - using tuple_type = std::tuple; - using tuple_or_type = std::remove_pointer_t; + using tuple_or_type = std::remove_pointer_t(nullptr); } else if constexpr (sizeof...(Ts) == 1) { diff --git a/meta/include/gnuradio-4.0/meta/utils.hpp b/meta/include/gnuradio-4.0/meta/utils.hpp index 000dd616..f525a13e 100644 --- a/meta/include/gnuradio-4.0/meta/utils.hpp +++ b/meta/include/gnuradio-4.0/meta/utils.hpp @@ -184,7 +184,8 @@ struct message_type {}; template constexpr bool always_false = false; -constexpr std::size_t invalid_index = -1UZ; +constexpr std::size_t invalid_index = -1UZ; +constexpr std::size_t default_message_port_index = -2UZ; #if HAVE_SOURCE_LOCATION [[gnu::always_inline]] inline void @@ -343,7 +344,7 @@ static_assert(std::same_as, short, st std::tuple::size()>>>>); template -consteval int +consteval std::size_t indexForName() { auto helper = [](std::index_sequence) { auto static_name_for_index = [] { @@ -361,7 +362,7 @@ indexForName() { static_assert(n_matches <= 1, "Multiple ports with that name were found. The name must be unique. You can " "still use a port index instead."); static_assert(n_matches == 1, "No port with the given name exists."); - int result = -1; + std::size_t result = meta::invalid_index; ((static_name_for_index.template operator()() == Name ? (result = Ids) : 0), ...); return result; };