Skip to content

Commit

Permalink
refactored port<T> interface (#171)
Browse files Browse the repository at this point in the history
* refactored port<T> interface

... as outlined by GR Architecture WG and #148

tackled items:
 * refactored port structure (mandatory enum NTTPs vs. optional type-wrapped arguments)
 * added optional domain argument
 * added default init value (needed for cyclic graphs)
 * add isOptional() annotation
 * fixed repeated_port name -> name0, name1, name2, ...
 * added 'Async' port annotation and 'isSynchronous()' function
 * renamed IN,OUT,... short-hand aliases to more explicit/hopefully descriptive PortIn, PortOut names
 * changed to Capitalised class naming following [C++ Core guidline item](https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#example-389) and Bjarne Stroustrup [style naming](https://www.stroustrup.com/Programming/PPP-style.pdf)

Signed-off-by: Ralph J. Steinhagen <[email protected]>
Signed-off-by: rstein <[email protected]>
  • Loading branch information
RalphSteinhagen authored Sep 21, 2023
1 parent 27aac92 commit ed71b4c
Show file tree
Hide file tree
Showing 29 changed files with 615 additions and 303 deletions.
32 changes: 16 additions & 16 deletions bench/bm_case1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ inline constexpr std::size_t N_ITER = 10;
inline constexpr std::size_t N_SAMPLES = gr::util::round_up(10'000, 1024);

template<typename T, char op>
struct math_op : public fg::node<math_op<T, op>, fg::IN<T, 0, N_MAX, "in">, fg::OUT<T, 0, N_MAX, "out">> {
struct math_op : public fg::node<math_op<T, op>, fg::PortInNamed<T, "in">, fg::PortOutNamed<T, "out">> {
T factor = static_cast<T>(1.0f);

// public:
Expand Down Expand Up @@ -58,7 +58,7 @@ static_assert(fg::traits::node::can_process_one_simd<multiply<float>>);
#endif

template<typename T, char op>
class math_bulk_op : public fg::node<math_bulk_op<T, op>, fg::IN<T, 0, N_MAX, "in">, fg::OUT<T, 0, N_MAX, "out">> {
class math_bulk_op : public fg::node<math_bulk_op<T, op>, fg::PortInNamed<T, "in", fg::RequiredSamples<1, N_MAX>>, fg::PortOutNamed<T, "out", fg::RequiredSamples<1, N_MAX>>> {
T _factor = static_cast<T>(1.0f);

public:
Expand Down Expand Up @@ -126,8 +126,8 @@ class converting_multiply : public fg::node<converting_multiply<T, R>> {
T _factor = static_cast<T>(1.0f);

public:
fg::IN<T> in;
fg::OUT<R> out;
fg::PortIn<T> in;
fg::PortOut<R> out;

converting_multiply() = delete;

Expand Down Expand Up @@ -160,8 +160,8 @@ static_assert(fg::traits::node::can_process_one_simd<converting_multiply<float,
template<typename T, int addend>
class add : public fg::node<add<T, addend>> {
public:
fg::IN<T> in;
fg::OUT<T> out;
fg::PortIn<T> in;
fg::PortOut<T> out;

template<fair::meta::t_or_simd<T> V>
[[nodiscard]] constexpr V
Expand All @@ -183,7 +183,7 @@ static_assert(fg::traits::node::can_process_one_simd<add<float, 1>>);
// It doesn't need to be enabled for reflection.
//
template<typename T, char op>
class gen_operation_SIMD : public fg::node<gen_operation_SIMD<T, op>, fg::IN<T, 0, N_MAX, "in">, fg::OUT<T, 0, N_MAX, "out">> {
class gen_operation_SIMD : public fg::node<gen_operation_SIMD<T, op>, fg::PortInNamed<T, "in", fg::RequiredSamples<1, N_MAX>>, fg::PortOutNamed<T, "out", fg::RequiredSamples<1, N_MAX>>> {
T _value = static_cast<T>(1.0f);

public:
Expand Down Expand Up @@ -270,11 +270,11 @@ using multiply_SIMD = gen_operation_SIMD<T, '*'>;
template<typename T>
using add_SIMD = gen_operation_SIMD<T, '+'>;

template<typename T, std::size_t N_MIN = 0, std::size_t N_MAX = N_MAX, bool use_bulk_operation = false, bool use_memcopy = true>
template<typename T, std::size_t N_MIN = 1, std::size_t N_MAX = N_MAX, bool use_bulk_operation = false, bool use_memcopy = true>
class copy : public fg::node<copy<T, N_MIN, N_MAX, use_bulk_operation, use_memcopy>> {
public:
fg::IN<T, N_MIN, N_MAX> in;
fg::OUT<T, N_MIN, N_MAX> out;
fg::PortIn<T, fg::RequiredSamples<N_MIN, N_MAX>> in;
fg::PortOut<T, fg::RequiredSamples<N_MIN, N_MAX>> out;

template<fair::meta::t_or_simd<T> V>
[[nodiscard]] constexpr V
Expand Down Expand Up @@ -336,8 +336,8 @@ simd_size() noexcept {

namespace stdx = vir::stdx;

template<typename From, typename To, std::size_t N_MIN = 0 /* SIMD size */, std::size_t N_MAX = N_MAX>
class convert : public fg::node<convert<From, To, N_MIN, N_MAX>, fg::IN<From, N_MIN, N_MAX, "in">, fg::OUT<To, N_MIN, N_MAX, "out">> {
template<typename From, typename To, std::size_t N_MIN = 1 /* SIMD size */, std::size_t N_MAX = N_MAX>
class convert : public fg::node<convert<From, To, N_MIN, N_MAX>, fg::PortInNamed<From, "in", fg::RequiredSamples<N_MIN, N_MAX>>, fg::PortOutNamed<To, "out", fg::RequiredSamples<N_MIN, N_MAX>>> {
static_assert(stdx::is_simd_v<From> != stdx::is_simd_v<To>, "either input xor output must be SIMD capable");
constexpr static std::size_t from_simd_size = detail::simd_size<From>();
constexpr static std::size_t to_simd_size = detail::simd_size<To>();
Expand Down Expand Up @@ -451,7 +451,7 @@ inline const boost::ut::suite _constexpr_bm = [] {
}

{
auto merged_node = merge<"out", "in">(merge<"out", "in">(merge<"out", "in">(merge<"out", "in">(test::source<float, 1024, 1024>(N_SAMPLES), copy<float, 0, 128>()), copy<float, 0, 1024>()),
auto merged_node = merge<"out", "in">(merge<"out", "in">(merge<"out", "in">(merge<"out", "in">(test::source<float, 1024, 1024>(N_SAMPLES), copy<float, 1, 128>()), copy<float, 1, 1024>()),
copy<float, 32, 128>()),
test::sink<float>());
"merged src(N=1024)->b1(N≤128)->b2(N=1024)->b3(N=32...128)->sink"_benchmark.repeat<N_ITER>(N_SAMPLES) = [&merged_node]() { loop_over_process_one(merged_node); };
Expand Down Expand Up @@ -521,7 +521,7 @@ inline const boost::ut::suite _runtime_tests = [] {
auto &src = flow_graph.make_node<test::source<float>>(N_SAMPLES);
auto &sink = flow_graph.make_node<test::sink<float>>();

using copy = ::copy<float, 0, N_MAX, true, true>;
using copy = ::copy<float, 1, N_MAX, true, true>;
std::vector<copy *> cpy(10);
for (std::size_t i = 0; i < cpy.size(); i++) {
cpy[i] = std::addressof(flow_graph.make_node<copy>({ { "name", fmt::format("copy {} at {}", i, fair::graph::this_source_location()) } }));
Expand All @@ -542,8 +542,8 @@ inline const boost::ut::suite _runtime_tests = [] {

{
fg::graph flow_graph;
auto &src = flow_graph.make_node<test::source<float, 0, 1024>>(N_SAMPLES);
auto &b1 = flow_graph.make_node<copy<float, 0, 128>>();
auto &src = flow_graph.make_node<test::source<float, 1, 1024>>(N_SAMPLES);
auto &b1 = flow_graph.make_node<copy<float, 1, 128>>();
auto &b2 = flow_graph.make_node<copy<float, 1024, 1024>>();
auto &b3 = flow_graph.make_node<copy<float, 32, 128>>();
auto &sink = flow_graph.make_node<test::sink<float>>();
Expand Down
2 changes: 1 addition & 1 deletion bench/bm_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ inline constexpr std::size_t N_SAMPLES = gr::util::round_up(10'000'000, 1024);
inline constexpr std::size_t N_NODES = 5;

template<typename T, char op>
class math_op : public fg::node<math_op<T, op>, fg::IN<T, 0, N_MAX, "in">, fg::OUT<T, 0, N_MAX, "out">> {
class math_op : public fg::node<math_op<T, op>, fg::PortInNamed<T, "in">, fg::PortOutNamed<T, "out">> {
T _factor = static_cast<T>(1.0f);

public:
Expand Down
14 changes: 7 additions & 7 deletions bench/bm_test_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ inline static std::size_t n_samples_produced = 0_UZ;
template<typename T, std::size_t min = 0_UZ, std::size_t count = N_MAX, bool use_bulk_operation = true>
class source : public fg::node<source<T, min, count>> {
public:
uint64_t _n_samples_max;
std::size_t _n_tag_offset;
fg::OUT<T> out;
uint64_t _n_samples_max;
std::size_t _n_tag_offset;
fg::PortOut<T> out;

source() = delete;

Expand Down Expand Up @@ -87,11 +87,11 @@ class source : public fg::node<source<T, min, count>> {

inline static std::size_t n_samples_consumed = 0_UZ;

template<typename T, std::size_t N_MIN = 0_UZ, std::size_t N_MAX = N_MAX>
template<typename T, std::size_t N_MIN = 1_UZ, std::size_t N_MAX = N_MAX>
struct sink : public fg::node<sink<T, N_MIN, N_MAX>> {
fg::IN<T, N_MIN, N_MAX> in;
std::size_t should_receive_n_samples = 0;
int64_t _last_tag_position = -1;
fg::PortIn<T, fg::RequiredSamples<N_MIN, N_MAX>> in;
std::size_t should_receive_n_samples = 0;
int64_t _last_tag_position = -1;

template<fair::meta::t_or_simd<T> V>
[[nodiscard]] constexpr auto
Expand Down
55 changes: 30 additions & 25 deletions include/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ continuously improve this document.
through a general `work()` function. Blocks are the building blocks of a flow-graph and can be thought of as vertices
in a graph, and *ports* are their input/output connections to neighboring blocks for data streaming, streaming tags,
and asynchronous messages. For the specific implementation, see [node.hpp](node.hpp).
* [port](#Ports) is an interface through which data flows into or out of a block. Each block may have zero, one or
more input ports, and zero, one or more output ports. Data is passed between blocks by connecting the output port of
one block to the input port of another. For the specific implementation, see [port.hpp](port.hpp).
* [buffer](#Buffer) is an area of memory where data is temporarily stored in the runtime-connected graph. Each port
* [Port](#Ports) is an interface through which data flows into or out of a block. Each block may have zero, one or
more input ports, and zero, one or more output ports. Data is passed between blocks by connecting the output Port of
one block to the input Port of another. For the specific implementation, see [port.hpp](port.hpp).
* [buffer](#Buffer) is an area of memory where data is temporarily stored in the runtime-connected graph. Each Port
has its own buffer to store the data, tags, or other messages it needs to perform its computations. Buffer
implementations are typically domain-specific (e.g. for blocks/ports implemented on the CPU, GPU, etc.) and are often,
but not necessarily, implemented as circular buffers. For the specific interface see [Buffer.hpp](Buffer.hpp) and one
Expand Down Expand Up @@ -54,31 +54,36 @@ switch and adopt the provided low-level graph algorithms.

### Ports

Ports in this framework are designed to interconnect blocks in a graph, similar to RF connectors. The port class
Ports in this framework are designed to interconnect blocks in a graph, similar to RF connectors. The Port class
template has several parameters that define its behaviour, including the type of data it handles (`T`), its
name (`PortName`), type (`PortType`), direction (`PortDirection` <-> input/output), and the minimum and maximum number
of samples (`MIN_SAMPLES` and `MAX_SAMPLES`) the user requires for a given block before the `work()` is invoked by the
scheduler. The buffer type used by the port can also be specified using the `BufferType` parameter,
with `gr::circular_buffer<T>` being the default:

name (`PortName`), type (`PortType`), direction (`PortDirection` <-> input/output), and optional list of `Arguments`
that may constrain the port behaviour on the `Block` or `Scheduler` level::
```cpp
template<typename T, fixed_string PortName, port_type_t PortType, port_direction_t PortDirection, // TODO: sort default arguments
std::size_t MIN_SAMPLES = std::dynamic_extent, std::size_t MAX_SAMPLES = std::dynamic_extent,
gr::Buffer BufferType = gr::circular_buffer<T>>
class port { /* ... */ };
class template<typename T, fixed_string PortName, port_type_t PortType, port_direction_t PortDirection, typename... Arguments>
struct Port { /* ... */ };
```
Some of the possible optional port annotation attributes are:
* `RequiredSamples` to describe the min/max number of samples required from this port before invoking the blocks work
function,
* `Optional` informing the graph/scheduler that a given port does not require to be connected,
* `PortDomain<fixed_string>` described whether the port can be handled within the same scheduling domain (e.g. `CPU`
or `GPU`),
* `StreamBufferType` and `TagBufferType` to inject specific user-provided buffer implementations to the port, or
* `Async` for making a port asynchronous in a signal flow-graph block.
When connecting ports, either a single-step or a two-step connection method can be used:
1. single-step connection: which allocates a buffer and passes the corresponding `BufferWriter` and `BufferReader`
instances to the source and destination port. The buffer size is determined only once based on
instances to the source and destination Port. The buffer size is determined only once based on
the `[MIN, MAX]_SAMPLES` constraints and is inherited/fixed for further connected input ports.
2. two-step connection (usually done by the graph):
* register all ports that shall be connected to each other
* determine the minimum buffer size required by the set of connected port and then perform the actual connections as
* determine the minimum buffer size required by the set of connected Port and then perform the actual connections as
outlined in the single-step connection method.
Each port belongs to a single computing domain, which is specified using the port_domain_t enumeration:
Each Port belongs to a single computing domain, which is specified using the port_domain_t enumeration:
```cpp
enum class port_domain_t { CPU, GPU, NET, FPGA, DSP, MLU /*, ...*/ };
Expand All @@ -97,8 +102,8 @@ favour low-latency execution (e.g. few bytes) to keep the data and L1/L2/L3 cach
including, for example, GPUs this choice would cause significant overhead when copying data from the CPU to GPU that
favour DMA-type block-transfer to exchange data for best efficiency.

Additionally, the usage of one buffer type and port per computation domain, along with explicit data conversion, enables
users to easily extend the framework. This approach provides the flexibility for users to define custom buffer and port
Additionally, the usage of one buffer type and Port per computation domain, along with explicit data conversion, enables
users to easily extend the framework. This approach provides the flexibility for users to define custom buffer and Port
implementations that cater to the specific requirements of their applications, thus offering optimal performance and
scalability. The ability to create specialized implementations for specific use cases, coupled with the framework's
openness to user-defined extensions, makes it a versatile and customizable solution.
Expand All @@ -113,8 +118,8 @@ For example:

```cpp
struct user_defined_block : node<user_defined_block> {
IN<float> in;
OUT<float> out;
PortIn<float> in;
PortOut<float> out;
// implement either:
[[nodiscard]] constexpr work_return_t work() noexcept {...}
// or one of the convenience functions outlined below
Expand All @@ -127,7 +132,7 @@ types through templating the input 'T' and return type 'R':
```cpp
template<typename T, typename R>
struct user_defined_block : node<user_defined_block, IN<T, 0, N_MAX, "in">, OUT<R, 0, N_MAX, "out">> {
struct user_defined_block : node<user_defined_block, PortIn<T, 0, N_MAX, "in">, PortOut<R, 0, N_MAX, "out">> {
// implement either:
[[nodiscard]] constexpr work_return_t work() noexcept {...}
// or one of the convenience functions outlined below
Expand All @@ -146,7 +151,7 @@ The following defaults are defined for one of the two 'user_defined_block' block
* **case 1a** - non-decimating N-in->N-out mechanic and automatic handling of streaming tags and settings changes:
```cpp
template<typename T, typename R>
struct user_defined_block : node<user_defined_block, IN<T, 0, N_MAX, "in">, OUT<R, 0, N_MAX, "out">> {
struct user_defined_block : node<user_defined_block, PortIn<T, 0, N_MAX, "in">, PortOut<R, 0, N_MAX, "out">> {
T _factor = T{1};
// constuctor setting _factor etc.

Expand All @@ -155,12 +160,12 @@ The following defaults are defined for one of the two 'user_defined_block' block
}
};
```
The number, type, and ordering of input and arguments of `process_one(..)` are defined by the port definitions.
The number, type, and ordering of input and arguments of `process_one(..)` are defined by the Port definitions.
* **case 1b** - non-decimating N-in->N-out mechanic providing bulk access to the input/output data and automatic
handling of streaming tags and settings changes:
```cpp
template<typename T, typename R>
struct user_defined_block : node<user_defined_block, IN<T, 0, N_MAX, "in">, OUT<R, 0, N_MAX, "out">> {
struct user_defined_block : node<user_defined_block, PortIn<T, 0, N_MAX, "in">, PortOut<R, 0, N_MAX, "out">> {
T _factor = T{1};
// constuctor setting _factor etc.

Expand Down
Loading

0 comments on commit ed71b4c

Please sign in to comment.