diff --git a/examples/simple/universal/z_simple.cxx b/examples/simple/universal/z_simple.cxx index c60d2480..eaa5def1 100644 --- a/examples/simple/universal/z_simple.cxx +++ b/examples/simple/universal/z_simple.cxx @@ -136,7 +136,7 @@ class CustomSubscriber { } private: - Subscriber _sub; + Subscriber _sub; void on_receive(const Sample& sample) { CustomStruct s = sample.get_payload().deserialize(CustomCodec()); std::cout << "Received: " << "{" << s.u << ", " << s.d << ", " << s.s << "}\n"; diff --git a/examples/zenohc/z_get_channel.cxx b/examples/zenohc/z_get_channel.cxx index 385d0dc7..9d1f61ab 100644 --- a/examples/zenohc/z_get_channel.cxx +++ b/examples/zenohc/z_get_channel.cxx @@ -40,11 +40,11 @@ int _main(int argc, char **argv) { auto session = Session::open(std::move(config)); std::cout << "Sending Query '" << expr << "'...\n"; - auto replies = session.get_reply_fifo_channel( - keyexpr, "", 16, {.target = QueryTarget::Z_QUERY_TARGET_ALL} + auto replies = session.get( + keyexpr, "", channels::FifoChannel(16), {.target = QueryTarget::Z_QUERY_TARGET_ALL} ); - for (auto reply = replies.get_next_reply(); replies.is_active() && reply; reply = replies.get_next_reply()) { + for (auto reply = replies.recv().first; static_cast(reply); reply = replies.recv().first) { const auto& sample = reply.get_ok(); std::cout << "Received ('" << sample.get_keyexpr().as_string_view() << "' : '" << sample.get_payload().deserialize() << "')\n"; diff --git a/examples/zenohc/z_get_channel_non_blocking.cxx b/examples/zenohc/z_get_channel_non_blocking.cxx index ef251978..66c2c3ac 100644 --- a/examples/zenohc/z_get_channel_non_blocking.cxx +++ b/examples/zenohc/z_get_channel_non_blocking.cxx @@ -45,11 +45,11 @@ int _main(int argc, char **argv) { std::cout << "Sending Query '" << expr << "'...\n"; - auto replies = session.get_reply_fifo_channel( - keyexpr, "", 16, {.target = QueryTarget::Z_QUERY_TARGET_ALL} + auto replies = session.get( + keyexpr, "", channels::FifoChannel(16), {.target = QueryTarget::Z_QUERY_TARGET_ALL} ); - for (auto reply = replies.get_next_reply(); replies.is_active(); reply = replies.get_next_reply()) { + for (auto [reply, alive] = replies.try_recv(); alive; std::tie(reply, alive) = replies.try_recv()) { if (!reply) { std::cout << "."; std::this_thread::sleep_for(1s); diff --git a/include/zenoh/api.hxx b/include/zenoh/api.hxx index b276f658..ef3a21fb 100644 --- a/include/zenoh/api.hxx +++ b/include/zenoh/api.hxx @@ -11,103 +11,31 @@ // Contributors: // ZettaScale Zenoh Team, -// -// This file contains structures and classes API without implementations -// - #pragma once -#include "base.hxx" -#include "internal.hxx" -#include "serde.hxx" -#include "closures.hxx" -#include -#include -#include -#include -#include - -namespace zenoh { - - -/// ``zenoh::Sample`` kind values. -/// -/// Values: -/// -/// - **Z_SAMPLE_KIND_PUT**: The Sample was issued by a "put" operation. -/// - **Z_SAMPLE_KIND_DELETE**: The Sample was issued by a "delete" operation. -typedef ::z_sample_kind_t SampleKind; +#include "api/bytes.hxx" +#include "api/closures.hxx" +#include "api/config.hxx" +#include "api/encoding.hxx" +#include "api/enums.hxx" +#include "api/hello.hxx" +#include "api/id.hxx" +#include "api/keyexpr.hxx" +#include "api/publisher.hxx" +#include "api/query_consolidation.hxx" +#include "api/query.hxx" +#include "api/queryable.hxx" +#include "api/reply.hxx" +#include "api/sample.hxx" +#include "api/scout.hxx" +#include "api/session.hxx" +#include "api/subscriber.hxx" +#include "api/timestamp.hxx" +#include "api/value.hxx" +#include "api/channels.hxx" -/// Consolidation mode values. -/// -/// Values: -/// - **Z_CONSOLIDATION_MODE_AUTO**: Let Zenoh decide the best consolidation mode depending on the query -/// selector. -/// - **Z_CONSOLIDATION_MODE_NONE**: No consolidation is applied. Replies may come in any order and any -/// number. -/// - **Z_CONSOLIDATION_MODE_MONOTONIC**: It guarantees that any reply for a given key expression will be -/// monotonic in time -/// w.r.t. the previous received replies for the same key expression. I.e., for the same key expression -/// multiple replies may be received. It is guaranteed that two replies received at t1 and t2 will have -/// timestamp ts2 > ts1. It optimizes latency. -/// - **Z_CONSOLIDATION_MODE_LATEST**: It guarantees unicity of replies for the same key expression. -/// It optimizes bandwidth. -typedef ::z_consolidation_mode_t ConsolidationMode; -/// Reliability values. -/// -/// Values: -/// - **Z_RELIABILITY_BEST_EFFORT**: Defines reliability as "best effort" -/// - **Z_RELIABILITY_RELIABLE**: Defines reliability as "reliable" -typedef ::z_reliability_t Reliability; - -/// Congestion control values. -/// -/// Values: -/// - **Z_CONGESTION_CONTROL_BLOCK**: Defines congestion control as "block". Messages are not dropped in case of -/// congestion control -/// - **Z_CONGESTION_CONTROL_DROP**: Defines congestion control as "drop". Messages are dropped in case of -/// congestion control -/// -typedef ::z_congestion_control_t CongestionControl; - -/// Priority of Zenoh messages values. -/// -/// Values: -/// - **Z_PRIORITY_REAL_TIME**: Priority for "realtime" messages. -/// - **Z_PRIORITY_INTERACTIVE_HIGH**: Highest priority for "interactive" messages. -/// - **Z_PRIORITY_INTERACTIVE_LOW**: Lowest priority for "interactive" messages. -/// - **Z_PRIORITY_DATA_HIGH**: Highest priority for "data" messages. -/// - **Z_PRIORITY_DATA**: Default priority for "data" messages. -/// - **Z_PRIORITY_DATA_LOW**: Lowest priority for "data" messages. -/// - **Z_PRIORITY_BACKGROUND**: Priority for "background traffic" messages. -typedef ::z_priority_t Priority; - -/// Query target values. -/// -/// see also: ``zenoh::query_target_default`` -/// -/// Values: -/// - **Z_QUERY_TARGET_BEST_MATCHING**: The nearest complete queryable if any else all matching queryables. -/// - **Z_QUERY_TARGET_ALL**: All matching queryables. -/// - **Z_QUERY_TARGET_ALL_COMPLETE**: A set of complete queryables. -typedef ::z_query_target_t QueryTarget; - -/// Constructs a default ``zenoh::QueryTarget`` -/// @return a default ``zenoh::QueryTarget`` -inline QueryTarget query_target_default(); - -typedef ::z_whatami_t WhatAmI; - -/// @brief Returns a string representation of the given ``c::WhatAmI`` -/// (or the ``zenohpico::WhatAmI``) value. -/// @param whatami the ``c::WhatAmI`` / ``zenohpico::WhatAmI`` value -/// @return a string representation of the given value -inline std::string_view whatami_as_str(WhatAmI whatami) { - ::z_view_string_t str_out; - ::z_whatami_to_str(whatami, &str_out); - return std::string_view(::z_string_data(::z_loan(str_out)), ::z_string_len(::z_loan(str_out))); -} +namespace zenoh { #ifdef ZENOHCXX_ZENOHC /// @brief Initializes logger @@ -118,1195 +46,4 @@ inline std::string_view whatami_as_str(WhatAmI whatami) { inline void init_logger() { ::zc_init_logger(); } #endif - -/// @brief A representation a Zenoh ID. -/// -/// In general, valid Zenoh IDs are LSB-first 128bit unsigned and non-zero integers. -struct Id : public Copyable<::z_id_t> { - using Copyable::Copyable; - - /// @name Methods - - /// Checks if the ID is valid - /// @return true if the ID is valid - bool is_valid() const { return _0.id[0] != 0; } - - /// Returns the byte sequence of the ID - const std::array& bytes() const { return *reinterpret_cast*>(&_0.id); } -}; - -inline std::ostream& operator<<(std::ostream& os, const Id& id) { - auto id_ptr = reinterpret_cast(&id)->id; - for (size_t i = 0; id_ptr[i] != 0 && i < 16; i++) - os << std::hex << std::setfill('0') << std::setw(2) << static_cast(id_ptr[i]); - return os; -} - -/// ``Hello`` message returned by a zenoh entity as a reply to a "scout" -/// message. -class Hello : public Owned<::z_owned_hello_t> { -public: - using Owned::Owned; - - /// @name Methods - - /// @brief Get ``Id`` of the entity - /// @return ``Id`` of the entity - Id get_id() const { return ::z_hello_zid(this->loan()); }; - - /// @brief Get the ``zenoh::WhatAmI`` of the entity - /// @return ``zenoh::WhatAmI`` of the entity - WhatAmI get_whatami() const { return ::z_hello_whatami(this->loan()); } - - /// @brief Get the array of locators of the entity - /// @return the array of locators of the entity - std::vector get_locators() const { - ::z_owned_string_array_t out; - ::z_hello_locators(this->loan(), &out); - std::vector locators(::z_string_array_len(::z_loan(out))); - for (size_t i = 0; i < ::z_string_array_len(::z_loan(out)); i++) { - auto s = ::z_string_array_get(::z_loan(out), i); - locators[i] = std::string_view(reinterpret_cast(::z_string_data(s)), ::z_string_len(s)); - } - return locators; - } -}; - - -/// @brief Owned key expression. -/// -/// See details about key expression syntax in the Key Expressions RFC. -class KeyExpr : public Owned<::z_owned_keyexpr_t> { - public: - using Owned::Owned; - - /// @name Constructors - - /// @brief Create a new instance from a string - /// - /// @param key_expr String representing key expression - /// @param autocanonize If true the key_expr will be autocanonized, prior to constructing key expression - /// @param err If not null the error code will be written to this location, otherwise exception will be thrown in case of error. - explicit KeyExpr(std::string_view key_expr, bool autocanonize = true, ZError* err = nullptr) - : Owned(nullptr) { - if (autocanonize) { - size_t s = key_expr.size(); - __ZENOH_ERROR_CHECK( - ::z_keyexpr_from_substring_autocanonize(&this->_0, key_expr.data(), &s), - err, - std::string("Failed to construct KeyExpr from: ").append(key_expr) - ); - } else { - __ZENOH_ERROR_CHECK( - ::z_keyexpr_from_substring(&this->_0, key_expr.data(), key_expr.size()), - err, - std::string("Failed to construct KeyExpr from: ").append(key_expr) - ); - } - } - - /// @name Methods - /// @brief Get underlying key expression string - std::string_view as_string_view() const { - ::z_view_string_t s; - ::z_keyexpr_as_view_string(this->loan(), &s); - return std::string_view(reinterpret_cast(::z_string_data(::z_loan(s))), ::z_string_len(::z_loan(s))); - } - - /// @name Operators - - /// @brief Equality operator - /// @param other the ``std::string_view`` to compare with - /// @return true if the key expression is equal to the string - bool operator==(std::string_view other) { - if (!(*this)) return false; - return as_string_view() == other; - } - - /// @brief InEquality operator - /// @param other the ``std::string_view`` to compare with - /// @return false if the key expression is equal to the string - bool operator!=(std::string_view other) { - return !((*this) == other); - } - - /// @brief Equality operator - /// @param other the ``KeyExpr`` to compare with - /// @return true if both key expressions are equal - bool operator==(const KeyExpr& other) { return ::z_keyexpr_equals(this->loan(), other.loan()); } - - /// @brief Inequality operator - /// @param other the ``KeyExpr`` to compare with - /// @return false if both key expressions are equal - bool operator!=(const KeyExpr& other) { return !(*this == other); } - - /// @brief Checks if a given ``KeyExpr`` includes the other. - /// @param other the ``KeyExpr`` to compare with - /// @return true if other is included in this. - bool includes(const KeyExpr& other) { - return ::z_keyexpr_includes(this->loan(), other.loan()); - } - - /// @brief Constructs new key expression by concatenation this with a string. - KeyExpr concat(std::string_view s, ZError* err = nullptr) const { - KeyExpr k(nullptr); - __ZENOH_ERROR_CHECK( - ::z_keyexpr_concat(&k._0, this->loan(), s.data(), s.size()), - err, - std::string("Failed to concatenate KeyExpr: ").append(this->as_string_view()).append(" with ").append(s) - ); - return k; - } - - /// @brief Constructs new key expression by joining this with another one - KeyExpr join(const KeyExpr& other, ZError* err = nullptr) const { - KeyExpr k(nullptr); - __ZENOH_ERROR_CHECK( - ::z_keyexpr_join(&k._0, this->loan(), other.loan()), - err, - std::string("Failed to join KeyExpr: ").append(this->as_string_view()).append(" with ").append(other.as_string_view()) - ); - return k; - } - - /// @brief Checks if 2 key expressions intersect - /// @return true if there is at least one non-empty key that is contained in both key expressions - bool intersects(const KeyExpr& other) const { return ::z_keyexpr_intersects(this->loan(), other.loan()); } - - typedef ::z_keyexpr_intersection_level_t IntersectionLevel; - - IntersectionLevel relation_to(const KeyExpr& other) { return ::z_keyexpr_relation_to(this->loan(), other.loan()); } - - /// @brief Verifies if the string is a canonical key expression - static bool is_canon(std::string_view s) { - return ::z_keyexpr_is_canon(s.data(), s.size()) == Z_OK; - } -}; - -class Encoding : public Owned<::z_owned_encoding_t> { -public: - using Owned::Owned; - - /// @name Constructors - - /// @brief Default encoding - Encoding() : Owned(nullptr) {} - - /// @brief Constructs encoding from string - Encoding(std::string_view s, ZError* err = nullptr) : Owned(nullptr) { - __ZENOH_ERROR_CHECK( - ::z_encoding_from_substring(&this->_0, s.data(), s.size()), - err, - std::string("Failed to create encoding from ").append(s) - ); - } - - /// @brief Converts encoding to a string - std::string as_string() const { - ::z_owned_string_t s; - ::z_encoding_to_string(this->loan(), &s); - std::string out = std::string(::z_string_data(::z_loan(s)), ::z_string_len(::z_loan(s))); - ::z_drop(::z_move(s)); - return out; - } -}; - -/// Zenoh Timestamp. -struct Timestamp : Copyable<::z_timestamp_t> { - using Copyable::Copyable; - // TODO: add utility methods to interpret time as mils, seconds, minutes, etc - - /// @name Methods - - /// @brief Get the NPT64 time part of timestamp - /// @return time in NPT64 format. - uint64_t get_time() const { return ::z_timestamp_npt64_time(&this->inner()); } - - /// @brief Get the unique id of the timestamp - /// @return unique id - Id get_id() const { return ::z_timestamp_id(&this->inner()); } -}; - -/// @brief A data sample. -/// -/// A sample is the value associated to a given resource at a given point in time. -class Sample : public Owned<::z_owned_sample_t> { -public: - using Owned::Owned; - - /// @name Methods - - /// @brief The resource key of this data sample. - /// @return ``KeyExpr`` object representing the resource key - decltype(auto) get_keyexpr() const { return detail::as_owned_cpp_obj(::z_sample_keyexpr(this->loan())); } - - /// @brief The data of this data sample - /// @return ``Bytes`` object representing the sample payload - decltype(auto) get_payload() const { return detail::as_owned_cpp_obj(::z_sample_payload(this->loan())); } - - /// @brief The encoding of the data of this data sample - /// @return ``Encoding`` object - decltype(auto) get_encoding() const { return detail::as_owned_cpp_obj(::z_sample_encoding(this->loan())); } - - /// @brief The kind of this data sample (PUT or DELETE) - /// @return ``zenoh::SampleKind`` value - SampleKind get_kind() const { return ::z_sample_kind(this->loan()); } - - /// @brief Checks if sample contains an attachment - /// @return ``True`` if sample contains an attachment - bool has_attachment() const { return ::z_sample_attachment(this->loan()) != nullptr; } - - /// @brief The attachment of this data sample - /// @return ``Bytes`` object - decltype(auto) get_attachment() const { return detail::as_owned_cpp_obj(::z_sample_attachment(this->loan())); } - - /// @brief The timestamp of this data sample - /// @return ``Timestamp`` object - decltype(auto) get_timestamp() const { return detail::as_copyable_cpp_obj(::z_sample_timestamp(this->loan())); } - - /// @brief The priority this data sample was sent with - /// @return ``Priority`` value - Priority get_priority() const { return ::z_sample_priority(this->loan()); } - - /// @brief The congestion control setting this data sample was sent with - /// @return ``CongestionControl`` value - CongestionControl get_congestion_control() const { return ::z_sample_congestion_control(this->loan()); } - - /// @brief The express setting this data sample was sent with - /// @return ``CongestionControl`` value - bool get_express() const { return ::z_sample_express(this->loan()); } - - /// @brief Constructs a shallow copy of this Sample - Sample clone() const { - Sample s(nullptr); - ::z_sample_clone(this->loan(), &s._0); - return s; - }; -}; - -/// A zenoh value. Contans refrence to data and it's encoding -class Value : public Owned<::z_owned_value_t> { -public: - using Owned::Owned; - /// @name Methods - - /// @brief The payload of this value - /// @return ``Bytes`` object - decltype(auto) get_payload() const { return detail::as_owned_cpp_obj(::z_value_payload(this->loan())); } - - /// @brief The encoding of this value - /// @return ``Encoding`` object - decltype(auto) get_encoding() const { return detail::as_owned_cpp_obj(::z_value_encoding(this->loan())); } -}; - -/// Replies consolidation mode to apply on replies of get operation -struct QueryConsolidation : Copyable<::z_query_consolidation_t> { - using Copyable::Copyable; - - /// @name Constructors - - /// @brief Create a new default ``QueryConsolidation`` value - QueryConsolidation() : Copyable(::z_query_consolidation_default()) {} - - /// @brief Create a new ``QueryConsolidation`` value with the given consolidation mode - /// @param v ``zenoh::ConsolidationMode`` value - QueryConsolidation(ConsolidationMode v) : Copyable({v}) {} - - /// @name Methods - - /// @name Operators - - /// @brief Equality operator - /// @param v the other ``QueryConsolidation`` to compare with - /// @return true if the two values are equal (have the same consolidation mode) - bool operator==(const QueryConsolidation& v) const { return this->_0.mode == v._0.mode; } - - /// @brief Inequality operator - /// @param v the other ``QueryConsolidation`` to compare with - /// @return true if the two values are not equal (have different consolidation mode) - bool operator!=(const QueryConsolidation& v) const { return !operator==(v); } -}; - -/// The query to be answered by a ``Queryable`` -class Query : public Owned<::z_owned_query_t> { -public: - using Owned::Owned; - - /// @name Methods - - /// @brief Get the key expression of the query - /// @return ``KeyExpr`` value - decltype(auto) get_keyexpr() const { return detail::as_owned_cpp_obj(::z_query_keyexpr(this->loan())); } - - /// @brief Get a query's parameters - /// - /// @return Parameters string - std::string_view get_parameters() const { - ::z_view_string_t p; - ::z_query_parameters(this->loan(), &p); - return std::string_view(::z_string_data(::z_loan(p)), ::z_string_len(::z_loan(p))); - } - - /// @brief Get the value of the query (payload and encoding) - /// @return ``Value`` value - decltype(auto) get_value() const { return detail::as_owned_cpp_obj(::z_query_value(this->loan())); } - - /// @brief Checks if query contains an attachment - /// @return ``True`` if query contains an attachment - bool has_attachment() const { return ::z_query_attachment(this->loan()) != nullptr; } - - /// @brief Get the attachment of the query - /// @return Attachment - decltype(auto) get_attachment() const { return detail::as_owned_cpp_obj(::z_query_attachment(this->loan())); } - - /// @brief Options passed to the ``Query::reply`` operation - struct ReplyOptions { - /// @brief An optional encoding of this reply payload and/or attachment - std::optional encoding = {}; - /// @brief An optional attachment to this reply. - std::optional attachment = {}; - - /// @brief Returns default option settings - static ReplyOptions create_default() { return {}; } - }; - - /// @brief Send reply to a query - void reply(const KeyExpr& key_expr, Bytes&& payload, ReplyOptions&& options = ReplyOptions::create_default(), ZError* err = nullptr) const { - auto payload_ptr = detail::as_owned_c_ptr(payload); - ::z_query_reply_options_t opts; - opts.encoding = detail::as_owned_c_ptr(options.encoding); - opts.attachment = detail::as_owned_c_ptr(options.attachment); - - __ZENOH_ERROR_CHECK( - ::z_query_reply(this->loan(), detail::loan(key_expr), payload_ptr, &opts), - err, - "Failed to send reply" - ); - } - - /// @brief Options passed to the ``Query::reply_err()`` operation - struct ReplyErrOptions { - /// @brief An optional encoding of the reply error payload - std::optional encoding = {}; - - /// @brief Returns default option settings - static ReplyErrOptions create_default() { return {}; } - }; - - /// @brief Send error to a query - void reply_err(Bytes&& payload, ReplyErrOptions&& options = ReplyErrOptions::create_default(), ZError* err = nullptr) const { - auto payload_ptr = detail::as_owned_c_ptr(payload); - ::z_query_reply_err_options_t opts; - opts.encoding = detail::as_owned_c_ptr(options.encoding); - - __ZENOH_ERROR_CHECK( - ::z_query_reply_err(this->loan(), payload_ptr, &opts), - err, - "Failed to send error" - ); - } -}; - - -/// A Zenoh Session config -class Config : public Owned<::z_owned_config_t> { -public: - using Owned::Owned; - - /// @name Constructors - - /// @brief Create a default configuration - Config() : Owned(nullptr) { - ::z_config_default(&this->_0); - } - -#ifdef ZENOHCXX_ZENOHC - /// @brief Get config parameter by the string key - /// @param key the key - /// @return value of the config parameter - /// @note zenoh-c only - std::string get(std::string_view key, ZError* err = nullptr) const { - ::z_owned_string_t s; - __ZENOH_ERROR_CHECK( - ::zc_config_get_from_substring(this->loan(), key.data(), key.size(), &s), - err, - std::string("Failed to get config value for the key: ").append(key) - ); - std::string out = std::string(::z_string_data(::z_loan(s)), ::z_string_len(::z_loan(s))); - ::z_drop(::z_move(s)); - return out; - } - - /// @brief Get the whole config as a JSON string - /// @return the JSON string in ``Str`` - /// @note zenoh-c only - std::string to_string() const { - ::z_owned_string_t s; - ::zc_config_to_string(this->loan(), &s); - std::string out = std::string(::z_string_data(::z_loan(s)), ::z_string_len(::z_loan(s))); - ::z_drop(::z_move(s)); - return out; - } - - /// @brief Insert a config parameter by the string key - /// @param key the key - /// @param value the JSON string value - /// @return true if the parameter was inserted - /// @note zenoh-c only - bool insert_json(const std::string& key, const std::string& value) { - return ::zc_config_insert_json(loan(), key.c_str(), value.c_str()) == 0; - } -#endif -#ifdef ZENOHCXX_ZENOHPICO - /// @brief Get config parameter by it's numeric ID - /// @param key the key - /// @return pointer to the null-terminated string value of the config parameter - /// @note zenoh-pico only - const char* get(uint8_t key) const { return ::zp_config_get(loan(), key); } - - /// @brief Insert a config parameter by it's numeric ID - /// @param key the key - /// @param value the null-terminated string value - /// @return true if the parameter was inserted - /// @note zenoh-pico only - bool insert(uint8_t key, const char* value); - - /// @brief Insert a config parameter by it's numeric ID - /// @param key the key - /// @param value the null-terminated string value - /// @param error the error code - /// @return true if the parameter was inserted - /// @note zenoh-pico only - bool insert(uint8_t key, const char* value, ErrNo& error); -#endif - -#ifdef ZENOHCXX_ZENOHC - /// @brief Create the default configuration for "peer" mode - /// @return the ``Config`` object - /// @note zenoh-c only - static Config peer() { - Config c(nullptr) ; - ::z_config_peer(&c._0); - return c; - } - - /// @brief Create the configuration from the JSON file - /// @param path path to the file - /// @return the ``Config`` object - /// @note zenoh-c only - static Config from_file(const std::string& path, ZError* err = nullptr) { - Config c(nullptr) ; - __ZENOH_ERROR_CHECK( - ::zc_config_from_file(&c._0, path.data()), - err, - std::string("Failed to create config from: ").append(path) - ); - return c; - } - - /// @brief Create the configuration from the JSON string - /// @param s the JSON string - /// @return the ``Config`` object - /// @note zenoh-c only - static Config from_str(const std::string& s, ZError* err = nullptr) { - Config c(nullptr) ; - __ZENOH_ERROR_CHECK( - ::zc_config_from_str(&c._0, s.data()), - err, - std::string("Failed to create config from: ").append(s) - ); - return c; - } - /// @brief Create the configuration for "client" mode - /// @param peers the array of peer endpoints - /// @return the ``Config`` object - /// @note zenoh-c only - static Config client(const std::vector& peers, ZError* err = nullptr) { - Config c(nullptr) ; - std::vector p; - p.reserve(peers.size()); - for (const auto& peer: peers) { - p.push_back(peer.c_str()); - } - __ZENOH_ERROR_CHECK( - ::z_config_client(&c._0, p.data(), p.size()), - err, - "Failed to create client config" - ); - return c; - } - #endif -}; - - - -/// A reply from queryable to ``Session::get`` operation -class Reply : public Owned<::z_owned_reply_t> { -public: - using Owned::Owned; - - /// @name Methods - - /// @brief Check if the reply is OK - /// @return true if the reply is OK, false if contains a error - bool is_ok() const { return ::z_reply_is_ok(this->loan()); } - - /// @brief Get the reply value. Will throw an exception if ``Reply::is_ok`` returns false. - /// @return Reply sample. - decltype(auto) get_ok() const { - if (!::z_reply_is_ok(this->loan())) { - throw ZException("Reply data sample was requested, but reply contains error", Z_EINVAL); - } - return detail::as_owned_cpp_obj(::z_reply_ok(this->loan())); - } - - /// @brief Get the reply error. Will throw an exception if ``Reply::is_ok`` returns false. - /// @return Reply error. - decltype(auto) get_err() const { - if (::z_reply_is_ok(this->loan())) { - throw ZException("Reply error was requested, but reply contains data sample", Z_EINVAL); - } - return detail::as_owned_cpp_obj(::z_reply_err(this->loan())); - } -}; - -/// An Zenoh subscriber. Destroying subscriber cancels the subscription -/// Constructed by ``Session::declare_subscriber`` method -class Subscriber : public Owned<::z_owned_subscriber_t> { -public: - using Owned::Owned; - -#ifdef ZENOHCXX_ZENOHC - /// @brief Get the key expression of the subscriber - decltype(auto) get_keyexpr() const { - return detail::as_owned_cpp_obj(::z_subscriber_keyexpr(this->loan())); - } -#endif -}; - -/// An Zenoh queryable. Constructed by ``Session::declare_queryable`` method -class Queryable : public Owned<::z_owned_queryable_t> { -public: - using Owned::Owned; -}; - -/// An Zenoh publisher. Constructed by ``Session::declare_publisher`` method -class Publisher : public Owned<::z_owned_publisher_t> { -public: - using Owned::Owned; - - /// @brief Options to be passed to ``Publisher::put()`` operation - struct PutOptions { - /// @brief The encoding of the data to publish. - std::optional encoding = {}; - /// @brief The attachment to attach to the publication. - std::optional attachment = {}; - - /// @brief Returns default option settings - static PutOptions create_default() { return {}; } - }; - - /// @brief Options to be passed to ``Publisher::delete_resource()`` operation - struct DeleteOptions { - uint8_t __dummy; - - /// @brief Returns default option settings - static DeleteOptions create_default() { return {}; } - }; - - /// @name Methods - - /// @brief Publish the payload - /// @param payload ``Payload`` to publish - /// @param options Optional values passed to put operation - /// @return 0 in case of success, negative error code otherwise - ZError put(Bytes&& payload, PutOptions&& options = PutOptions::create_default()) const { - auto payload_ptr = detail::as_owned_c_ptr(payload); - ::z_publisher_put_options_t opts; - opts.encoding = detail::as_owned_c_ptr(options.encoding); - opts.attachment = detail::as_owned_c_ptr(options.attachment); - - return ::z_publisher_put(this->loan(), payload_ptr, &opts); - } - - /// @brief Undeclare the resource associated with the publisher key expression - /// @param options Optional values to pass to delete operation - /// @return 0 in case of success, negative error code otherwise - ZError delete_resource(DeleteOptions&& options = DeleteOptions::create_default()) const { - ::z_publisher_delete_options_t opts; - opts.__dummy = options.__dummy; - return ::z_publisher_delete(this->loan(), &opts); - } - -#ifdef ZENOHCXX_ZENOHC - /// @brief Get the key expression of the publisher - decltype(auto) get_keyexpr() const { - return detail::as_owned_cpp_obj(::z_publisher_keyexpr(this->loan())); - } -#endif -}; - -namespace detail::closures { -extern "C" { - inline void _zenoh_on_reply_call(const ::z_loaned_reply_t* reply, void* context) { - IClosure::call_from_context(context, detail::as_owned_cpp_obj(reply)); - } - - inline void _zenoh_on_sample_call(const ::z_loaned_sample_t* sample, void* context) { - IClosure::call_from_context(context, detail::as_owned_cpp_obj(sample)); - } - - inline void _zenoh_on_query_call(const ::z_loaned_query_t* query, void* context) { - IClosure::call_from_context(context, detail::as_owned_cpp_obj(query)); - } - - inline void _zenoh_on_id_call(const ::z_id_t* z_id, void* context) { - IClosure::call_from_context(context, detail::as_copyable_cpp_obj(z_id)); - } - - inline void _zenoh_on_hello_call(const ::z_loaned_hello_t* hello, void* context) { - IClosure::call_from_context(context, detail::as_owned_cpp_obj(hello)); - } -} -} - - -enum class FifoChannelType { - Blocking, - NonBlocking -}; - - -template -class ReplyFifoChannel: public Owned<::z_owned_reply_channel_closure_t> { -private: - bool active = true; -public: - using Owned::Owned; - /// @brief Fetches next available reply. - /// @return Reply. Might return invalid reply in case of Non-blocking channel if there are no replies available. - /// In this case is is possible to verify whether the channel is still active (and might receive more replies in the future) - /// by calling ``ReplyFifoChannel::is_active()``. - Reply get_next_reply() { - Reply r(nullptr); - bool res = ::z_call(this->loan(), detail::as_owned_c_ptr(r)); - if constexpr (C == FifoChannelType::NonBlocking) { - active = static_cast(r) || !res; - } else { - active = static_cast(r); - } - return r; - } - - /// @brief Verifies if channel is still active. - /// @return True if channel is still active (i. e. might receive more valid replies in the future). Inactive channel will - /// always return an invalid ``Reply`` when calling ``ReplyFifoChannel::next_reply()`. - bool is_active() const { return active; } -}; - -/// A Zenoh session. -class Session : public Owned<::z_owned_session_t> { -public: - using Owned::Owned; - - Session(Config&& config, ZError* err = nullptr) : Owned(nullptr) { - __ZENOH_ERROR_CHECK( - ::z_open(&this->_0, detail::as_owned_c_ptr(config)), - err, - "Failed to open session" - ); - } - - /// @name Methods - -#ifdef ZENOHCXX_ZENOHC - /// @brief Create a shallow copy of the session - /// @return a new ``Session`` instance - /// @note zenoh-c only - Session clone() const { - Session s(nullptr); - ::zc_session_clone(this->loan(), &s._0); - return s; - } -#endif - - /// @brief Get the unique identifier of the zenoh node associated to this ``Session`` - /// @return the unique identifier ``Id`` - Id get_zid() const { - return Id(::z_info_zid(this->loan())); - } - - /// @brief Create ``KeyExpr`` instance with numeric id registered in ``Session`` routing tables - /// @param key_expr ``KeyExpr`` to declare - /// @return Declared ``KeyExpr`` instance - KeyExpr declare_keyexpr(const KeyExpr& key_expr, ZError* err = nullptr) const { - KeyExpr k(nullptr); - __ZENOH_ERROR_CHECK( - ::z_declare_keyexpr(detail::as_owned_c_ptr(k), this->loan(), detail::loan(key_expr)), - err, - std::string("Failed to declare key expression: ").append(k.as_string_view()) - ); - return k; - } - - /// @brief Remove ``KeyExpr`` instance from ``Session`` and drop ``KeyExpr`` instance - /// @param keyexpr ``KeyExpr`` instance to drop - void undeclare_keyexpr(KeyExpr&& key_expr, ZError* err = nullptr) const { - __ZENOH_ERROR_CHECK( - ::z_undeclare_keyexpr(detail::as_owned_c_ptr(key_expr), this->loan()), - err, - "Failed to undeclare key expression" - ); - } - - /// @brief Options passed to the ``get()`` operation - struct GetOptions { - /// @brief The Queryables that should be target of the query. - QueryTarget target = QueryTarget::Z_QUERY_TARGET_ALL; - /// @brief The replies consolidation strategy to apply on replies to the query. - QueryConsolidation consolidation = QueryConsolidation(); - /// @brief An optional payload of the query. - std::optional payload = {}; - /// @brief An optional encoding of the query payload and/or attachment. - std::optional encoding = {}; - /// @brief An optional attachment to the query. - std::optional attachment = {}; - /// @brief The timeout for the query in milliseconds. 0 means default query timeout from zenoh configuration. - uint64_t timeout_ms = 0; - - /// @brief Returns default option settings - static GetOptions create_default() { return {}; } - }; - - /// @brief Query data from the matching queryables in the system. Replies are provided through a callback function. - /// @param key_expr ``KeyExpr`` the key expression matching resources to query - /// @param parameters the parameters string in URL format - /// @param on_reply callback that will be called once for each received reply - /// @param on_drop callback that will be called once all replies are received - /// @param options ``GetOptions`` query options - template - void get( - const KeyExpr& key_expr, const std::string& parameters, C&& on_reply, D&& on_drop, - GetOptions&& options = GetOptions::create_default(), ZError* err = nullptr - ) const { - static_assert( - std::is_invocable_r::value, - "on_reply should be callable with the following signature: void on_reply(const zenoh::Reply& reply)" - ); - static_assert( - std::is_invocable_r::value, - "on_drop should be callable with the following signature: void on_drop()" - ); - ::z_owned_closure_reply_t c_closure; - using ClosureType = typename detail::closures::Closure; - auto closure = ClosureType::into_context(std::forward(on_reply), std::forward(on_drop)); - ::z_closure(&c_closure, detail::closures::_zenoh_on_reply_call, detail::closures::_zenoh_on_drop, closure); - ::z_get_options_t opts; - opts.target = options.target; - opts.consolidation = static_cast(options.consolidation); - opts.payload = detail::as_owned_c_ptr(options.payload); - opts.encoding = detail::as_owned_c_ptr(options.encoding); - opts.attachment = detail::as_owned_c_ptr(options.attachment); - opts.timeout_ms = options.timeout_ms; - - __ZENOH_ERROR_CHECK( - ::z_get(this->loan(), detail::loan(key_expr), parameters.c_str(), ::z_move(c_closure), &opts), - err, - "Failed to perform get operation" - ); - } - - /// @brief Query data from the matching queryables in the system. Replies are provided through a channel. - /// @param key_expr ``KeyExpr`` the key expression matching resources to query - /// @param parameters the parameters string in URL format - /// @param bound Capacity of the channel, if different from 0, the channel will be bound and apply back-pressure when full - /// @param options ``GetOptions`` query options - /// @return Reply fifo channel - template - ReplyFifoChannel get_reply_fifo_channel( - const KeyExpr& key_expr, const std::string& parameters, size_t bound, GetOptions&& options = GetOptions::create_default(), ZError* err = nullptr - ) const { - ::z_owned_reply_channel_t reply_channel; - if constexpr (C == FifoChannelType::Blocking) { - ::zc_reply_fifo_new(&reply_channel, bound); - } else { - ::zc_reply_non_blocking_fifo_new(&reply_channel, bound); - } - ReplyFifoChannel recv(&reply_channel.recv); - ::z_get_options_t opts; - opts.target = options.target; - opts.consolidation = static_cast(options.consolidation); - opts.payload = detail::as_owned_c_ptr(options.payload); - opts.encoding = detail::as_owned_c_ptr(options.encoding); - opts.attachment = detail::as_owned_c_ptr(options.attachment); - opts.timeout_ms = options.timeout_ms; - - ZError res = ::z_get(this->loan(), detail::loan(key_expr), parameters.c_str(), ::z_move(reply_channel.send), &opts); - __ZENOH_ERROR_CHECK( - res, - err, - "Failed to perform get operation" - ); - if (res != Z_OK) std::move(recv).take(); - return recv; - } - /// @brief Options to be passed to ``delete_resource()`` operation - struct DeleteOptions { - /// @brief The priority of the delete message. - Priority priority = Priority::Z_PRIORITY_DATA; - /// @brief The congestion control to apply when routing delete message. - CongestionControl congestion_control = CongestionControl::Z_CONGESTION_CONTROL_DROP; - /// @brief Whether Zenoh will NOT wait to batch delete message with others to reduce the bandwith. - bool is_express = false; - - /// @brief Returns default option settings - static DeleteOptions create_default() { return {}; } - }; - - /// @brief Undeclare a resource. Equal to ``Publisher::delete_resource`` - /// @param key_expr ``KeyExprView`` the key expression to delete the resource - /// @param options ``DeleteOptions`` delete options - /// @return 0 in case of success, negative error code otherwise - ZError delete_resource(const KeyExpr& key_expr, DeleteOptions&& options = DeleteOptions::create_default()) const { - ::z_delete_options_t opts; - opts.congestion_control = options.congestion_control; - opts.priority = options.priority; - opts.is_express = options.is_express; - - return ::z_delete(this->loan(), detail::loan(key_expr), &opts); - } - - /// @brief Options passed to the ``put()`` operation - struct PutOptions { - /// @brief The priority of this message. - Priority priority = Priority::Z_PRIORITY_DATA; - /// @brief The congestion control to apply when routing this message. - CongestionControl congestion_control = CongestionControl::Z_CONGESTION_CONTROL_DROP; - /// @brief Whether Zenoh will NOT wait to batch this message with others to reduce the bandwith. - bool is_express = false; - /// @brief An optional encoding of the message payload and/or attachment. - std::optional encoding = {}; - /// @brief An optional attachment to the message. - std::optional attachment = {}; - - /// @brief Returns default option settings - static PutOptions create_default() { return {}; } - }; - - /// @brief Publish data to the matching subscribers in the system. Equal to ``Publisher::put_owned`` - /// @param key_expr The key expression to put the data - /// @param payload The data to publish - /// @param options Options to pass to put operation - void put(const KeyExpr& key_expr, Bytes&& payload, PutOptions&& options = PutOptions::create_default(), ZError* err = nullptr) const { - ::z_put_options_t opts; - opts.encoding = detail::as_owned_c_ptr(options.encoding); - opts.congestion_control = options.congestion_control; - opts.priority = options.priority; - opts.is_express = options.is_express; - opts.attachment = detail::as_owned_c_ptr(options.attachment); - - auto payload_ptr = detail::as_owned_c_ptr(payload); - __ZENOH_ERROR_CHECK( - ::z_put(this->loan(), detail::loan(key_expr), payload_ptr, &opts), - err, - "Failed to perform put operation" - ); - } - - /// @brief Options to be passed when declaring a ``Queryable`` - struct QueryableOptions { - /// @brief The completeness of the Queryable. - bool complete = false; - - /// @brief Returns default option settings - static QueryableOptions create_default() { return {}; } - }; - - /// @brief Create a ``Queryable`` object to answer to ``Session::get`` requests - /// @param key_expr The key expression to match the ``Session::get`` requests - /// @param on_query The callback to handle ``Query`` requests. Will be called once for each query - /// @param on_drop The drop callback. Will be called once, when ``Queryable`` is destroyed or undeclared - /// @param options Options passed to queryable declaration - /// @return a ``Queryable`` object - template - Queryable declare_queryable( - const KeyExpr& key_expr, C&& on_query, D&& on_drop, QueryableOptions&& options = QueryableOptions::create_default(), ZError* err = nullptr - ) const { - static_assert( - std::is_invocable_r::value, - "on_query should be callable with the following signature: void on_query(const zenoh::Query& query)" - ); - static_assert( - std::is_invocable_r::value, - "on_drop should be callable with the following signature: void on_drop()" - ); - ::z_owned_closure_query_t c_closure; - using ClosureType = typename detail::closures::Closure; - auto closure = ClosureType::into_context(std::forward(on_query), std::forward(on_drop)); - ::z_closure(&c_closure, detail::closures::_zenoh_on_query_call, detail::closures::_zenoh_on_drop, closure); - ::z_queryable_options_t opts; - opts.complete = options.complete; - - Queryable q(nullptr); - ZError res = ::z_declare_queryable( - detail::as_owned_c_ptr(q), this->loan(), detail::loan(key_expr), ::z_move(c_closure), &opts - ); - __ZENOH_ERROR_CHECK(res, err, "Failed to declare Queryable"); - return q; - } - - /// @brief Options to be passed when declaring a ``Subscriber`` - struct SubscriberOptions { - /// @brief The subscription reliability. - Reliability reliability = Reliability::Z_RELIABILITY_BEST_EFFORT; - - /// @brief Returns default option settings - static SubscriberOptions create_default() { return {}; } - }; - - /// @brief Create a ``Subscriber`` object to receive data from matching ``Publisher`` objects or from - /// ``Session::put`` and ``Session::delete_resource`` requests - /// @param key_expr The key expression to match the publishers - /// @param on_sample The callback that will be called for each received sample - /// @param on_drop The callback that will be called once subscriber is destroyed or undeclared - /// @param options Options to pass to subscriber declaration - /// @return a ``Subscriber`` object - template - Subscriber declare_subscriber( - const KeyExpr& key_expr, C&& on_sample, D&& on_drop, SubscriberOptions&& options = SubscriberOptions::create_default(), ZError *err = nullptr - ) const { - static_assert( - std::is_invocable_r::value, - "on_sample should be callable with the following signature: void on_sample(const zenoh::Sample& sample)" - ); - static_assert( - std::is_invocable_r::value, - "on_drop should be callable with the following signature: void on_drop()" - ); - ::z_owned_closure_sample_t c_closure; - using ClosureType = typename detail::closures::Closure; - auto closure = ClosureType::into_context(std::forward(on_sample), std::forward(on_drop)); - ::z_closure(&c_closure, detail::closures::_zenoh_on_sample_call, detail::closures::_zenoh_on_drop, closure); - ::z_subscriber_options_t opts; - opts.reliability = options.reliability; - Subscriber s(nullptr); - ZError res = ::z_declare_subscriber( - detail::as_owned_c_ptr(s), this->loan(), detail::loan(key_expr), ::z_move(c_closure), &opts - ); - __ZENOH_ERROR_CHECK(res, err, "Failed to declare Subscriber"); - return s; - } - - /// @brief Options to be passed when declaring a ``Publisher`` - struct PublisherOptions { - /// @brief The congestion control to apply when routing messages from this publisher. - CongestionControl congestion_control; - /// @brief The priority of messages from this publisher. - Priority priority; - /// @brief If true, Zenoh will not wait to batch this message with others to reduce the bandwith - bool is_express; - /// @brief Returns default option settings - - static PublisherOptions create_default() { return {}; } - }; - - /// @brief Create a ``Publisher`` object to publish data to matching ``Subscriber`` and ``PullSubscriber`` objects - /// @param key_expr The key expression to match the subscribers - /// @param options Options passed to publisher declaration - /// @return a ``Publisher`` object - Publisher declare_publisher( - const KeyExpr& key_expr, PublisherOptions&& options = PublisherOptions::create_default(), ZError* err = nullptr - ) const { - ::z_publisher_options_t opts; - opts.congestion_control = options.congestion_control; - opts.priority = options.priority; - opts.is_express = options.is_express; - - Publisher p(nullptr); - ZError res = ::z_declare_publisher( - detail::as_owned_c_ptr(p), this->loan(), detail::loan(key_expr), &opts - ); - __ZENOH_ERROR_CHECK(res, err, "Failed to declare Publisher"); - return p; - } - - /// @brief Fetches the Zenoh IDs of all connected routers. - std::vector get_routers_z_id(ZError* err = nullptr) const { - std::vector out; - auto f = [&out](const Id& z_id) { - out.push_back(z_id); - }; - typedef decltype(f) F; - ::z_owned_closure_zid_t c_closure; - using ClosureType = typename detail::closures::Closure; - auto closure = ClosureType::into_context(std::forward(f), closures::none); - ::z_closure(&c_closure, detail::closures::_zenoh_on_id_call, detail::closures::_zenoh_on_drop, closure); - __ZENOH_ERROR_CHECK( - ::z_info_routers_zid(this->loan(), &c_closure), - err, - "Failed to fetch router Ids" - ); - return out; - } - - /// @brief Fetches the Zenoh IDs of all connected peers. - std::vector get_peers_z_id(ZError* err = nullptr) const { - std::vector out; - auto f = [&out](const Id& z_id) { - out.push_back(z_id); - }; - typedef decltype(f) F; - ::z_owned_closure_zid_t c_closure; - auto closure = detail::closures::Closure::into_context( - std::forward(f), closures::none - ); - ::z_closure(&c_closure, detail::closures::_zenoh_on_id_call, detail::closures::_zenoh_on_drop, closure); - __ZENOH_ERROR_CHECK( - ::z_info_peers_zid(this->loan(), &c_closure), - err, - "Failed to fetch peer Ids" - ); - return out; - } - -#ifdef ZENOHCXX_ZENOHPICO - - /// @brief Start a separate task to read from the network and process the messages as soon as they are received. - /// @return true if the operation was successful, false otherwise - /// @note zenoh-pico only - bool start_read_task(); - - /// @brief Start a separate task to read from the network and process the messages as soon as they are received. - /// @param error ``zenoh::ErrNo`` the error code - /// @return true if the operation was successful, false otherwise - /// @note zenoh-pico only - bool start_read_task(ZError* error = nullptr); - - /// @brief Stop the read task - /// @return true if the operation was successful, false otherwise - /// @note zenoh-pico only - bool stop_read_task(); - - /// @brief Stop the read task - /// @param error ``zenoh::ErrNo`` the error code - /// @return true if the operation was successful, false otherwise - /// @note zenoh-pico only - bool stop_read_task(ZError* error = nullptr); - - /// @brief Start a separate task to handle the session lease. This task will send KeepAlive messages when needed - /// and will close the session when the lease is expired. When operating over a multicast transport, it also - /// periodically sends the Join messages. - /// @return true if the operation was successful, false otherwise - /// @note zenoh-pico only - bool start_lease_task(); - - /// @brief Start a separate task to handle the session lease. This task will send KeepAlive messages when needed - /// and will close the session when the lease is expired. When operating over a multicast transport, it also - /// periodically sends the Join messages. - /// @param error ``zenoh::ErrNo`` the error code - /// @return true if the operation was successful, false otherwise - /// @note zenoh-pico only - bool start_lease_task(ZError* error = nullptr); - - /// @brief Stop the lease task - /// @return true if the operation was successful, false otherwise - /// @note zenoh-pico only - bool stop_lease_task(); - - /// @brief Stop the lease task - /// @param error ``zenoh::ErrNo`` the error code - /// @return true if the operation was successful, false otherwise - /// @note zenoh-pico only - bool stop_lease_task(ZError* error = nullptr); - - /// @brief Triggers a single execution of reading procedure from the network and processes of any received the - /// message - /// @return true if the operation was successful, false otherwise - /// @note zenoh-pico only - bool read(); - - /// @brief Triggers a single execution of reading procedure from the network and processes of any received the - /// message - /// @param error ``zenoh::ErrNo`` the error code - /// @return true if the operation was successful, false otherwise - /// @note zenoh-pico only - bool read(ZError* error = nullptr); - - /// @brief Triggers a single execution of keep alive procedure. It will send KeepAlive messages when needed and - /// will close the session when the lease is expired. - /// @return true if the leasing procedure was executed successfully, false otherwise. - bool send_keep_alive(); - - /// @brief Triggers a single execution of keep alive procedure. It will send KeepAlive messages when needed and - /// will close the session when the lease is expired. - /// @param error ``zenoh::ErrNo`` the error code - /// @return true if the leasing procedure was executed successfully, false otherwise. - bool send_keep_alive(ZError* error = nullptr); - - /// @brief Triggers a single execution of join procedure: send the Join message - /// @return true if the join procedure was executed successfully, false otherwise. - bool send_join(); - - /// @brief Triggers a single execution of join procedure: send the Join message - /// @param error ``zenoh::ErrNo`` the error code - /// @return true if the join procedure was executed successfully, false otherwise. - bool send_join(ZError* error = nullptr); -#endif - - /// @brief Create a ``Session`` with the given ``Config`` - /// @param config ``Config`` to use - /// @param start_background_tasks for zenoh-pico only. If true, start background threads which handles the network - /// traffic. If false, the threads should be called manually with ``Session::start_read_task`` and - /// ``Session::start_lease_task`` or methods ``Session::read``, ``Session::send_keep_alive`` and - /// ``Session::send_join`` should be called in loop. - /// @return a ``Session`` if the session was successfully created, an ``zenoh::ErrorMessage`` otherwise - static Session open(Config&& config, ZError* err = nullptr) { - return Session(std::move(config), err); - } -}; - -/// @brief Options to be passed to ``scout()`` operation -struct ScoutOptions { - /// @brief The maximum duration in ms the scouting can take. - size_t timeout_ms = 1000; - /// @brief Type of entities to scout for. - WhatAmI what = WhatAmI::Z_WHATAMI_ROUTER_PEER; - - /// @brief Returns default option settings - static ScoutOptions create_default() { return {}; } -}; - -/// @brief Scout for zenoh entities in the network -/// @param config ``ScoutingConfig`` to use -/// @param on_hello The callback to process each received ``Hello``message -/// @param on_drop The callback that will be called once all hello ``Hello``messages are received -template -void scout(Config&& config, C&& on_hello, D&& on_drop, ScoutOptions&& options = ScoutOptions::create_default(), ZError* err = nullptr) { - static_assert( - std::is_invocable_r::value, - "on_hello should be callable with the following signature: void on_hello(const zenoh::Hello& hello)" - ); - static_assert( - std::is_invocable_r::value, - "on_drop should be callable with the following signature: void on_drop()" - ); - ::z_owned_closure_hello_t c_closure; - using ClosureType = typename detail::closures::Closure; - auto closure = ClosureType::into_context(std::forward(on_hello), std::forward(on_drop)); - ::z_closure(&c_closure, detail::closures::_zenoh_on_hello_call, detail::closures::_zenoh_on_drop, closure); - ::z_scout_options_t opts; - opts.zc_timeout_ms = options.timeout_ms; - opts.zc_what = options.what; - - __ZENOH_ERROR_CHECK( - ::z_scout(detail::as_owned_c_ptr(config), ::z_move(c_closure), &opts), - err, - "Failed to perform scout operation" - ); -} - - - - } diff --git a/include/zenoh/base.hxx b/include/zenoh/api/base.hxx similarity index 98% rename from include/zenoh/base.hxx rename to include/zenoh/api/base.hxx index 0a78c692..54a24761 100644 --- a/include/zenoh/base.hxx +++ b/include/zenoh/api/base.hxx @@ -13,9 +13,11 @@ #pragma once -#include "zenohc.hxx" +#include "../zenohc.hxx" #include #include +#include +#include namespace zenoh { diff --git a/include/zenoh/serde.hxx b/include/zenoh/api/bytes.hxx similarity index 96% rename from include/zenoh/serde.hxx rename to include/zenoh/api/bytes.hxx index 9a1c4394..f9207192 100644 --- a/include/zenoh/serde.hxx +++ b/include/zenoh/api/bytes.hxx @@ -1,5 +1,5 @@ // -// Copyright (c) 2023 ZettaScale Technology +// Copyright (c) 2024 ZettaScale Technology // // This program and the accompanying materials are made available under the // terms of the Eclipse Public License 2.0 which is available at @@ -11,28 +11,27 @@ // Contributors: // ZettaScale Zenoh Team, -// -// This file contains structures and classes API without implementations -// - #pragma once #include "base.hxx" -#include "internal.hxx" +#include "../detail/interop.hxx" +#include "../detail/closures.hxx" #include "closures.hxx" + #include #include #include #include #include #include +#include namespace zenoh { namespace detail::closures { extern "C" { - inline void _zenoh_encode_iter(z_owned_bytes_t* b, void* context) { - IClosure::call_from_context(context, b); + inline bool _zenoh_encode_iter(z_owned_bytes_t* b, void* context) { + return IClosure::call_from_context(context, b); } } @@ -98,14 +97,15 @@ public: auto f = [current = begin, end, &codec] (z_owned_bytes_t* b) mutable { if (current == end) { ::z_null(b); - return; + return false; } *b = Bytes::serialize(*current, codec).take(); current++; + return true; }; using F = decltype(f); - using ClosureType = typename detail::closures::Closure; + using ClosureType = typename detail::closures::Closure; auto closure = ClosureType::into_context(std::forward(f), closures::none); ::z_bytes_encode_from_iter(detail::as_owned_c_ptr(out), detail::closures::_zenoh_encode_iter, closure); diff --git a/include/zenoh/api/channels.hxx b/include/zenoh/api/channels.hxx new file mode 100644 index 00000000..3b186016 --- /dev/null +++ b/include/zenoh/api/channels.hxx @@ -0,0 +1,193 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +// +// This file contains structures and classes API without implementations +// + +#pragma once +#include "../detail/interop.hxx" +#include "base.hxx" +#include "sample.hxx" +#include "reply.hxx" +#include "query.hxx" + +namespace zenoh::channels { + +namespace detail { + template + struct FifoHandlerData {}; + + template<> + struct FifoHandlerData { + typedef ::z_owned_fifo_handler_sample_t handler_type; + typedef ::z_owned_closure_sample_t closure_type; + static void create_cb_handler_pair(closure_type* cb, handler_type* h, size_t capacity) { + ::z_fifo_channel_sample_new(cb, h, capacity); + } + }; + + template<> + struct FifoHandlerData { + typedef ::z_owned_fifo_handler_query_t handler_type; + typedef ::z_owned_closure_query_t closure_type; + static void create_cb_handler_pair(closure_type* cb, handler_type* h, size_t capacity) { + ::z_fifo_channel_query_new(cb, h, capacity); + } + }; + + template<> + struct FifoHandlerData { + typedef ::z_owned_fifo_handler_reply_t handler_type; + typedef ::z_owned_closure_reply_t closure_type; + static void create_cb_handler_pair(closure_type* cb, handler_type* h, size_t capacity) { + ::z_fifo_channel_reply_new(cb, h, capacity); + } + }; + + template + struct RingHandlerData {}; + + template<> + struct RingHandlerData { + typedef ::z_owned_ring_handler_sample_t handler_type; + typedef ::z_owned_closure_sample_t closure_type; + static void create_cb_handler_pair(closure_type* cb, handler_type* h, size_t capacity) { + ::z_ring_channel_sample_new(cb, h, capacity); + } + }; + + template<> + struct RingHandlerData { + typedef ::z_owned_ring_handler_query_t handler_type; + typedef ::z_owned_closure_query_t closure_type; + static void create_cb_handler_pair(closure_type* cb, handler_type* h, size_t capacity) { + ::z_ring_channel_query_new(cb, h, capacity); + } + }; + + template<> + struct RingHandlerData { + typedef ::z_owned_ring_handler_reply_t handler_type; + typedef ::z_owned_closure_reply_t closure_type; + static void create_cb_handler_pair(closure_type* cb, handler_type* h, size_t capacity) { + ::z_ring_channel_reply_new(cb, h, capacity); + } + }; +} + +/// @brief A FIFO channel handler +/// @tparam T Data entry type +template +class FifoHandler : public Owned::handler_type> { +public: + using Owned::handler_type>::Owned; + + /// @brief Fetches a data entry from the handler's buffer. If buffer is empty will block until new data entry arrives + /// @return A pair containing a received data entry (will be in null state if there is no more data in the buffer and the steam is inactive), + /// and a bool flag indicating whether handler's stream is still active, i.e. if there is still possibility to fetch more data in the future. + std::pair recv() const { + std::pair p(nullptr, false); + p.second = ::z_recv(this->loan(), zenoh::detail::as_owned_c_ptr(p.first)); + return p; + } + + /// @brief Fetches a data entry from the handler's buffer. If buffer is empty will immediately return (with data entry in null state). + /// @return A pair containing a received data entry (will be in null state if there is no more data in the buffer), + /// and a bool flag indicating whether handler's stream is still active, i.e. if there is still possibility to fetch more data in the future. + std::pair try_recv() const { + std::pair p(nullptr, false); + p.second = ::z_try_recv(this->loan(), zenoh::detail::as_owned_c_ptr(p.first)); + return p; + } +}; + +/// @brief A circular buffer channel handler +/// @tparam T Data entry type +template +class RingHandler : public Owned::handler_type> { +public: + using Owned::handler_type>::Owned; + + /// @brief Fetches a data entry from the handler's buffer. If buffer is empty will block until new data entry arrives + /// @return A pair containing a received data entry (will be in null state if there is no more data in the buffer and the steam is inactive), + /// and a bool flag indicating whether handler's stream is still active, i.e. if there is still possibility to fetch more data in the future. + std::pair recv() const { + std::pair p(nullptr, false); + p.second = ::z_recv(this->_0, zenoh::detail::as_owned_c_ptr(p.first)); + return p; + } + + /// @brief Fetches a data entry from the handler's buffer. If buffer is empty will immediately return (with data entry in null state). + /// @return A pair containing a received data entry (will be in null state if there is no more data in the buffer), + /// and a bool flag indicating whether handler's stream is still active, i.e. if there is still possibility to fetch more data in the future. + std::pair try_recv() const { + std::pair p(nullptr, false); + p.second = ::z_try_recv(this->_0, zenoh::detail::as_owned_c_ptr(p.first)); + return p; + } +}; + +/// @brief A FIFO channel +class FifoChannel { + size_t _capacity; +public: + /// @brief Constructor + /// @param capacity Maximum number of entries in FIFO buffer of the channel. When the buffer is full all + /// new incoming entries will be ignored. + FifoChannel(size_t capacity) + :_capacity(capacity) + {} + + template + using HandlerType = FifoHandler; + + /// @brief Convert channel into a pair of zenoh callback and handler for the specified type. + /// @tparam T Entry Type + /// @return A callback-handler pair + template + std::pair::closure_type, HandlerType> into_cb_handler_pair() const { + typename detail::FifoHandlerData::closure_type c_closure; + FifoHandler h(nullptr); + detail::FifoHandlerData::create_cb_handler_pair(&c_closure, zenoh::detail::as_owned_c_ptr(h), _capacity); + return {c_closure, std::move(h)}; + } +}; + +/// @brief A circular buffer channel +class RingChannel { + size_t _capacity; +public: + /// @brief Constructor + /// @param capacity Maximum number of entries in circular buffer of the channel. When the buffer is full the older entries + /// will be removed to provide space for the new ones + RingChannel(size_t capacity) + :_capacity(capacity) + {} + + template + using HandlerType = RingHandler; + + /// @brief Convert channel into a pair of zenoh callback and handler for the specified type. + /// @tparam T Entry Type + /// @return A callback-handler pair + template + std::pair::closure_type, HandlerType> into_cb_handler_pair() const { + typename detail::RingHandlerData::closure_type c_closure; + FifoHandler h(nullptr); + detail::RingHandlerData::create_cb_handler_pair(&c_closure, zenoh::detail::as_owned_c_ptr(h), _capacity); + return {c_closure, std::move(h)}; + } +}; + +} \ No newline at end of file diff --git a/include/zenoh/api/closures.hxx b/include/zenoh/api/closures.hxx new file mode 100644 index 00000000..2f9a6ac9 --- /dev/null +++ b/include/zenoh/api/closures.hxx @@ -0,0 +1,19 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#pragma once + +namespace zenoh::closures { + inline void none() {}; + using None = decltype(&none); +} \ No newline at end of file diff --git a/include/zenoh/api/config.hxx b/include/zenoh/api/config.hxx new file mode 100644 index 00000000..88df5645 --- /dev/null +++ b/include/zenoh/api/config.hxx @@ -0,0 +1,154 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + + +#pragma once + +#include "base.hxx" +#include +#include + + + + +namespace zenoh { +/// A Zenoh Session config +class Config : public Owned<::z_owned_config_t> { +public: + using Owned::Owned; + + /// @name Constructors + + /// @brief Create a default configuration + Config() : Owned(nullptr) { + ::z_config_default(&this->_0); + } + +#ifdef ZENOHCXX_ZENOHC + /// @brief Get config parameter by the string key + /// @param key the key + /// @return value of the config parameter + /// @note zenoh-c only + std::string get(std::string_view key, ZError* err = nullptr) const { + ::z_owned_string_t s; + __ZENOH_ERROR_CHECK( + ::zc_config_get_from_substring(this->loan(), key.data(), key.size(), &s), + err, + std::string("Failed to get config value for the key: ").append(key) + ); + std::string out = std::string(::z_string_data(::z_loan(s)), ::z_string_len(::z_loan(s))); + ::z_drop(::z_move(s)); + return out; + } + + /// @brief Get the whole config as a JSON string + /// @return the JSON string in ``Str`` + /// @note zenoh-c only + std::string to_string() const { + ::z_owned_string_t s; + ::zc_config_to_string(this->loan(), &s); + std::string out = std::string(::z_string_data(::z_loan(s)), ::z_string_len(::z_loan(s))); + ::z_drop(::z_move(s)); + return out; + } + + /// @brief Insert a config parameter by the string key + /// @param key the key + /// @param value the JSON string value + /// @return true if the parameter was inserted + /// @note zenoh-c only + bool insert_json(const std::string& key, const std::string& value) { + return ::zc_config_insert_json(loan(), key.c_str(), value.c_str()) == 0; + } +#endif +#ifdef ZENOHCXX_ZENOHPICO + /// @brief Get config parameter by it's numeric ID + /// @param key the key + /// @return pointer to the null-terminated string value of the config parameter + /// @note zenoh-pico only + const char* get(uint8_t key) const { return ::zp_config_get(loan(), key); } + + /// @brief Insert a config parameter by it's numeric ID + /// @param key the key + /// @param value the null-terminated string value + /// @return true if the parameter was inserted + /// @note zenoh-pico only + bool insert(uint8_t key, const char* value); + + /// @brief Insert a config parameter by it's numeric ID + /// @param key the key + /// @param value the null-terminated string value + /// @param error the error code + /// @return true if the parameter was inserted + /// @note zenoh-pico only + bool insert(uint8_t key, const char* value, ErrNo& error); +#endif + +#ifdef ZENOHCXX_ZENOHC + /// @brief Create the default configuration for "peer" mode + /// @return the ``Config`` object + /// @note zenoh-c only + static Config peer() { + Config c(nullptr) ; + ::z_config_peer(&c._0); + return c; + } + + /// @brief Create the configuration from the JSON file + /// @param path path to the file + /// @return the ``Config`` object + /// @note zenoh-c only + static Config from_file(const std::string& path, ZError* err = nullptr) { + Config c(nullptr) ; + __ZENOH_ERROR_CHECK( + ::zc_config_from_file(&c._0, path.data()), + err, + std::string("Failed to create config from: ").append(path) + ); + return c; + } + + /// @brief Create the configuration from the JSON string + /// @param s the JSON string + /// @return the ``Config`` object + /// @note zenoh-c only + static Config from_str(const std::string& s, ZError* err = nullptr) { + Config c(nullptr) ; + __ZENOH_ERROR_CHECK( + ::zc_config_from_str(&c._0, s.data()), + err, + std::string("Failed to create config from: ").append(s) + ); + return c; + } + /// @brief Create the configuration for "client" mode + /// @param peers the array of peer endpoints + /// @return the ``Config`` object + /// @note zenoh-c only + static Config client(const std::vector& peers, ZError* err = nullptr) { + Config c(nullptr) ; + std::vector p; + p.reserve(peers.size()); + for (const auto& peer: peers) { + p.push_back(peer.c_str()); + } + __ZENOH_ERROR_CHECK( + ::z_config_client(&c._0, p.data(), p.size()), + err, + "Failed to create client config" + ); + return c; + } + #endif +}; +} \ No newline at end of file diff --git a/include/zenoh/api/encoding.hxx b/include/zenoh/api/encoding.hxx new file mode 100644 index 00000000..13669336 --- /dev/null +++ b/include/zenoh/api/encoding.hxx @@ -0,0 +1,49 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#pragma once +#include "base.hxx" +#include "../zenohc.hxx" + +#include + +namespace zenoh { +class Encoding : public Owned<::z_owned_encoding_t> { +public: + using Owned::Owned; + + /// @name Constructors + + /// @brief Default encoding + Encoding() : Owned(nullptr) {} + + /// @brief Constructs encoding from string + Encoding(std::string_view s, ZError* err = nullptr) : Owned(nullptr) { + __ZENOH_ERROR_CHECK( + ::z_encoding_from_substring(&this->_0, s.data(), s.size()), + err, + std::string("Failed to create encoding from ").append(s) + ); + } + + /// @brief Converts encoding to a string + std::string as_string() const { + ::z_owned_string_t s; + ::z_encoding_to_string(this->loan(), &s); + std::string out = std::string(::z_string_data(::z_loan(s)), ::z_string_len(::z_loan(s))); + ::z_drop(::z_move(s)); + return out; + } +}; + +} \ No newline at end of file diff --git a/include/zenoh/api/enums.hxx b/include/zenoh/api/enums.hxx new file mode 100644 index 00000000..4a70a40a --- /dev/null +++ b/include/zenoh/api/enums.hxx @@ -0,0 +1,104 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#pragma once +#include "../zenohc.hxx" +#include + +namespace zenoh { + + +/// ``zenoh::Sample`` kind values. +/// +/// Values: +/// +/// - **Z_SAMPLE_KIND_PUT**: The Sample was issued by a "put" operation. +/// - **Z_SAMPLE_KIND_DELETE**: The Sample was issued by a "delete" operation. +typedef ::z_sample_kind_t SampleKind; + +/// Consolidation mode values. +/// +/// Values: +/// - **Z_CONSOLIDATION_MODE_AUTO**: Let Zenoh decide the best consolidation mode depending on the query +/// selector. +/// - **Z_CONSOLIDATION_MODE_NONE**: No consolidation is applied. Replies may come in any order and any +/// number. +/// - **Z_CONSOLIDATION_MODE_MONOTONIC**: It guarantees that any reply for a given key expression will be +/// monotonic in time +/// w.r.t. the previous received replies for the same key expression. I.e., for the same key expression +/// multiple replies may be received. It is guaranteed that two replies received at t1 and t2 will have +/// timestamp ts2 > ts1. It optimizes latency. +/// - **Z_CONSOLIDATION_MODE_LATEST**: It guarantees unicity of replies for the same key expression. +/// It optimizes bandwidth. +typedef ::z_consolidation_mode_t ConsolidationMode; + +/// Reliability values. +/// +/// Values: +/// - **Z_RELIABILITY_BEST_EFFORT**: Defines reliability as "best effort" +/// - **Z_RELIABILITY_RELIABLE**: Defines reliability as "reliable" +typedef ::z_reliability_t Reliability; + +/// Congestion control values. +/// +/// Values: +/// - **Z_CONGESTION_CONTROL_BLOCK**: Defines congestion control as "block". Messages are not dropped in case of +/// congestion control +/// - **Z_CONGESTION_CONTROL_DROP**: Defines congestion control as "drop". Messages are dropped in case of +/// congestion control +/// +typedef ::z_congestion_control_t CongestionControl; + +/// Priority of Zenoh messages values. +/// +/// Values: +/// - **Z_PRIORITY_REAL_TIME**: Priority for "realtime" messages. +/// - **Z_PRIORITY_INTERACTIVE_HIGH**: Highest priority for "interactive" messages. +/// - **Z_PRIORITY_INTERACTIVE_LOW**: Lowest priority for "interactive" messages. +/// - **Z_PRIORITY_DATA_HIGH**: Highest priority for "data" messages. +/// - **Z_PRIORITY_DATA**: Default priority for "data" messages. +/// - **Z_PRIORITY_DATA_LOW**: Lowest priority for "data" messages. +/// - **Z_PRIORITY_BACKGROUND**: Priority for "background traffic" messages. +typedef ::z_priority_t Priority; + +/// Query target values. +/// +/// Values: +/// - **Z_QUERY_TARGET_BEST_MATCHING**: The nearest complete queryable if any else all matching queryables. +/// - **Z_QUERY_TARGET_ALL**: All matching queryables. +/// - **Z_QUERY_TARGET_ALL_COMPLETE**: A set of complete queryables. +typedef ::z_query_target_t QueryTarget; + +typedef ::z_whatami_t WhatAmI; + +/// @brief Returns a string representation of the given ``c::WhatAmI`` +/// (or the ``zenohpico::WhatAmI``) value. +/// @param whatami the ``c::WhatAmI`` / ``zenohpico::WhatAmI`` value +/// @return a string representation of the given value +inline std::string_view whatami_as_str(WhatAmI whatami) { + ::z_view_string_t str_out; + ::z_whatami_to_str(whatami, &str_out); + return std::string_view(::z_string_data(::z_loan(str_out)), ::z_string_len(::z_loan(str_out))); +} + +#ifdef ZENOHCXX_ZENOHC +/// The locality of samples to be received by subscribers or targeted by publishers. +/// +/// Values: +/// - **ZCU_LOCALITY_ANY**: Any. +/// - **ZCU_LOCALITY_SESSION_LOCAL**: Only from local sessions. +/// - **ZCU_LOCALITY_SESSION_REMOTE**: Only from remote sessions. +typedef ::zcu_locality_t Locality; +#endif + +} \ No newline at end of file diff --git a/include/zenoh/api/hello.hxx b/include/zenoh/api/hello.hxx new file mode 100644 index 00000000..ecdaa343 --- /dev/null +++ b/include/zenoh/api/hello.hxx @@ -0,0 +1,53 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#pragma once +#include "base.hxx" +#include "../zenohc.hxx" +#include "enums.hxx" +#include "id.hxx" + +#include +#include + +namespace zenoh { +/// ``Hello`` message returned by a zenoh entity as a reply to a "scout" +/// message. +class Hello : public Owned<::z_owned_hello_t> { +public: + using Owned::Owned; + + /// @name Methods + + /// @brief Get ``Id`` of the entity + /// @return ``Id`` of the entity + Id get_id() const { return ::z_hello_zid(this->loan()); }; + + /// @brief Get the ``zenoh::WhatAmI`` of the entity + /// @return ``zenoh::WhatAmI`` of the entity + WhatAmI get_whatami() const { return ::z_hello_whatami(this->loan()); } + + /// @brief Get the array of locators of the entity + /// @return the array of locators of the entity + std::vector get_locators() const { + ::z_owned_string_array_t out; + ::z_hello_locators(this->loan(), &out); + std::vector locators(::z_string_array_len(::z_loan(out))); + for (size_t i = 0; i < ::z_string_array_len(::z_loan(out)); i++) { + auto s = ::z_string_array_get(::z_loan(out), i); + locators[i] = std::string_view(reinterpret_cast(::z_string_data(s)), ::z_string_len(s)); + } + return locators; + } +}; +} \ No newline at end of file diff --git a/include/zenoh/api/id.hxx b/include/zenoh/api/id.hxx new file mode 100644 index 00000000..55cc7b43 --- /dev/null +++ b/include/zenoh/api/id.hxx @@ -0,0 +1,47 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#pragma once + +#include "base.hxx" +#include "../zenohc.hxx" + +#include +#include +#include + +namespace zenoh { + /// @brief A representation a Zenoh ID. +/// +/// In general, valid Zenoh IDs are LSB-first 128bit unsigned and non-zero integers. +class Id : public Copyable<::z_id_t> { +public: + using Copyable::Copyable; + + /// @name Methods + + /// Checks if the ID is valid + /// @return true if the ID is valid + bool is_valid() const { return _0.id[0] != 0; } + + /// Returns the byte sequence of the ID + const std::array& bytes() const { return *reinterpret_cast*>(&_0.id); } +}; + +inline std::ostream& operator<<(std::ostream& os, const Id& id) { + auto id_ptr = reinterpret_cast(&id)->id; + for (size_t i = 0; id_ptr[i] != 0 && i < 16; i++) + os << std::hex << std::setfill('0') << std::setw(2) << static_cast(id_ptr[i]); + return os; +} +} \ No newline at end of file diff --git a/include/zenoh/api/keyexpr.hxx b/include/zenoh/api/keyexpr.hxx new file mode 100644 index 00000000..adab4381 --- /dev/null +++ b/include/zenoh/api/keyexpr.hxx @@ -0,0 +1,133 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#pragma once +#include "base.hxx" +#include "../zenohc.hxx" + +#include + +namespace zenoh { + +/// @brief Owned key expression. +/// +/// See details about key expression syntax in the Key Expressions RFC. +class KeyExpr : public Owned<::z_owned_keyexpr_t> { + public: + using Owned::Owned; + + /// @name Constructors + + /// @brief Create a new instance from a string + /// + /// @param key_expr String representing key expression + /// @param autocanonize If true the key_expr will be autocanonized, prior to constructing key expression + /// @param err If not null the error code will be written to this location, otherwise exception will be thrown in case of error. + explicit KeyExpr(std::string_view key_expr, bool autocanonize = true, ZError* err = nullptr) + : Owned(nullptr) { + if (autocanonize) { + size_t s = key_expr.size(); + __ZENOH_ERROR_CHECK( + ::z_keyexpr_from_substring_autocanonize(&this->_0, key_expr.data(), &s), + err, + std::string("Failed to construct KeyExpr from: ").append(key_expr) + ); + } else { + __ZENOH_ERROR_CHECK( + ::z_keyexpr_from_substring(&this->_0, key_expr.data(), key_expr.size()), + err, + std::string("Failed to construct KeyExpr from: ").append(key_expr) + ); + } + } + + /// @name Methods + /// @brief Get underlying key expression string + std::string_view as_string_view() const { + ::z_view_string_t s; + ::z_keyexpr_as_view_string(this->loan(), &s); + return std::string_view(reinterpret_cast(::z_string_data(::z_loan(s))), ::z_string_len(::z_loan(s))); + } + + /// @name Operators + + /// @brief Equality operator + /// @param other the ``std::string_view`` to compare with + /// @return true if the key expression is equal to the string + bool operator==(std::string_view other) const { + if (!(*this)) return false; + return as_string_view() == other; + } + + /// @brief InEquality operator + /// @param other the ``std::string_view`` to compare with + /// @return false if the key expression is equal to the string + bool operator!=(std::string_view other) const { + return !((*this) == other); + } + + /// @brief Equality operator + /// @param other the ``KeyExpr`` to compare with + /// @return true if both key expressions are equal + bool operator==(const KeyExpr& other) const { return ::z_keyexpr_equals(this->loan(), other.loan()); } + + /// @brief Inequality operator + /// @param other the ``KeyExpr`` to compare with + /// @return false if both key expressions are equal + bool operator!=(const KeyExpr& other) const { return !(*this == other); } + + /// @brief Checks if a given ``KeyExpr`` includes the other. + /// @param other the ``KeyExpr`` to compare with + /// @return true if other is included in this. + bool includes(const KeyExpr& other) { + return ::z_keyexpr_includes(this->loan(), other.loan()); + } + + /// @brief Constructs new key expression by concatenation this with a string. + KeyExpr concat(std::string_view s, ZError* err = nullptr) const { + KeyExpr k(nullptr); + __ZENOH_ERROR_CHECK( + ::z_keyexpr_concat(&k._0, this->loan(), s.data(), s.size()), + err, + std::string("Failed to concatenate KeyExpr: ").append(this->as_string_view()).append(" with ").append(s) + ); + return k; + } + + /// @brief Constructs new key expression by joining this with another one + KeyExpr join(const KeyExpr& other, ZError* err = nullptr) const { + KeyExpr k(nullptr); + __ZENOH_ERROR_CHECK( + ::z_keyexpr_join(&k._0, this->loan(), other.loan()), + err, + std::string("Failed to join KeyExpr: ").append(this->as_string_view()).append(" with ").append(other.as_string_view()) + ); + return k; + } + + /// @brief Checks if 2 key expressions intersect + /// @return true if there is at least one non-empty key that is contained in both key expressions + bool intersects(const KeyExpr& other) const { return ::z_keyexpr_intersects(this->loan(), other.loan()); } + + typedef ::z_keyexpr_intersection_level_t IntersectionLevel; + + IntersectionLevel relation_to(const KeyExpr& other) { return ::z_keyexpr_relation_to(this->loan(), other.loan()); } + + /// @brief Verifies if the string is a canonical key expression + static bool is_canon(std::string_view s) { + return ::z_keyexpr_is_canon(s.data(), s.size()) == Z_OK; + } +}; + +} \ No newline at end of file diff --git a/include/zenoh/api/publisher.hxx b/include/zenoh/api/publisher.hxx new file mode 100644 index 00000000..0171c07a --- /dev/null +++ b/include/zenoh/api/publisher.hxx @@ -0,0 +1,89 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#pragma once + +#include "base.hxx" +#include "../detail/interop.hxx" +#include "enums.hxx" +#include "bytes.hxx" +#include "encoding.hxx" +#include "keyexpr.hxx" +#include "timestamp.hxx" + +#include + + +namespace zenoh { +/// An Zenoh publisher. Constructed by ``Session::declare_publisher`` method +class Publisher : public Owned<::z_owned_publisher_t> { +public: + using Owned::Owned; + + /// @brief Options to be passed to ``Publisher::put()`` operation + struct PutOptions { + /// @brief the timestamp of this message + std::optional timestamp = {}; + /// @brief The encoding of the data to publish. + std::optional encoding = {}; + /// @brief The attachment to attach to the publication. + std::optional attachment = {}; + + /// @brief Returns default option settings + static PutOptions create_default() { return {}; } + }; + + /// @brief Options to be passed to ``Publisher::delete_resource()`` operation + struct DeleteOptions { + /// @brief the timestamp of this message + std::optional timestamp = {}; + + /// @brief Returns default option settings + static DeleteOptions create_default() { return {}; } + }; + + /// @name Methods + + /// @brief Publish the payload + /// @param payload ``Payload`` to publish + /// @param options Optional values passed to put operation + /// @return 0 in case of success, negative error code otherwise + ZError put(Bytes&& payload, PutOptions&& options = PutOptions::create_default()) const { + auto payload_ptr = detail::as_owned_c_ptr(payload); + ::z_publisher_put_options_t opts; + z_publisher_put_options_default(&opts); + opts.encoding = detail::as_owned_c_ptr(options.encoding); + opts.attachment = detail::as_owned_c_ptr(options.attachment); + opts.timestamp = detail::as_copyable_c_ptr(options.timestamp); + + return ::z_publisher_put(this->loan(), payload_ptr, &opts); + } + + /// @brief Undeclare the resource associated with the publisher key expression + /// @param options Optional values to pass to delete operation + /// @return 0 in case of success, negative error code otherwise + ZError delete_resource(DeleteOptions&& options = DeleteOptions::create_default()) const { + ::z_publisher_delete_options_t opts; + z_publisher_delete_options_default(&opts); + opts.timestamp = detail::as_copyable_c_ptr(options.timestamp); + return ::z_publisher_delete(this->loan(), &opts); + } + +#ifdef ZENOHCXX_ZENOHC + /// @brief Get the key expression of the publisher + decltype(auto) get_keyexpr() const { + return detail::as_owned_cpp_obj(::z_publisher_keyexpr(this->loan())); + } +#endif +}; +} diff --git a/include/zenoh/api/query.hxx b/include/zenoh/api/query.hxx new file mode 100644 index 00000000..27d401b6 --- /dev/null +++ b/include/zenoh/api/query.hxx @@ -0,0 +1,111 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + + +#pragma once + +#include "base.hxx" +#include "../detail/interop.hxx" +#include "enums.hxx" +#include "keyexpr.hxx" +#include "bytes.hxx" +#include "value.hxx" + +#include +#include + + +namespace zenoh { +/// The query to be answered by a ``Queryable`` +class Query : public Owned<::z_owned_query_t> { +public: + using Owned::Owned; + + /// @name Methods + + /// @brief Get the key expression of the query + /// @return ``KeyExpr`` value + decltype(auto) get_keyexpr() const { return detail::as_owned_cpp_obj(::z_query_keyexpr(this->loan())); } + + /// @brief Get a query's parameters + /// + /// @return Parameters string + std::string_view get_parameters() const { + ::z_view_string_t p; + ::z_query_parameters(this->loan(), &p); + return std::string_view(::z_string_data(::z_loan(p)), ::z_string_len(::z_loan(p))); + } + + /// @brief Get the value of the query (payload and encoding) + /// @return ``Value`` value + decltype(auto) get_value() const { return detail::as_owned_cpp_obj(::z_query_value(this->loan())); } + + /// @brief Checks if query contains an attachment + /// @return ``True`` if query contains an attachment + bool has_attachment() const { return ::z_query_attachment(this->loan()) != nullptr; } + + /// @brief Get the attachment of the query + /// @return Attachment + decltype(auto) get_attachment() const { return detail::as_owned_cpp_obj(::z_query_attachment(this->loan())); } + + /// @brief Options passed to the ``Query::reply`` operation + struct ReplyOptions { + /// @brief An optional encoding of this reply payload and/or attachment + std::optional encoding = {}; + /// @brief An optional attachment to this reply. + std::optional attachment = {}; + + /// @brief Returns default option settings + static ReplyOptions create_default() { return {}; } + }; + + /// @brief Send reply to a query + void reply(const KeyExpr& key_expr, Bytes&& payload, ReplyOptions&& options = ReplyOptions::create_default(), ZError* err = nullptr) const { + auto payload_ptr = detail::as_owned_c_ptr(payload); + ::z_query_reply_options_t opts; + z_query_reply_options_default(&opts); + opts.encoding = detail::as_owned_c_ptr(options.encoding); + opts.attachment = detail::as_owned_c_ptr(options.attachment); + + __ZENOH_ERROR_CHECK( + ::z_query_reply(this->loan(), detail::loan(key_expr), payload_ptr, &opts), + err, + "Failed to send reply" + ); + } + + /// @brief Options passed to the ``Query::reply_err()`` operation + struct ReplyErrOptions { + /// @brief An optional encoding of the reply error payload + std::optional encoding = {}; + + /// @brief Returns default option settings + static ReplyErrOptions create_default() { return {}; } + }; + + /// @brief Send error to a query + void reply_err(Bytes&& payload, ReplyErrOptions&& options = ReplyErrOptions::create_default(), ZError* err = nullptr) const { + auto payload_ptr = detail::as_owned_c_ptr(payload); + ::z_query_reply_err_options_t opts; + z_query_reply_err_options_default(&opts); + opts.encoding = detail::as_owned_c_ptr(options.encoding); + + __ZENOH_ERROR_CHECK( + ::z_query_reply_err(this->loan(), payload_ptr, &opts), + err, + "Failed to send error" + ); + } +}; + +} \ No newline at end of file diff --git a/include/zenoh/api/query_consolidation.hxx b/include/zenoh/api/query_consolidation.hxx new file mode 100644 index 00000000..2d5a5e87 --- /dev/null +++ b/include/zenoh/api/query_consolidation.hxx @@ -0,0 +1,48 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#pragma once + +#include "enums.hxx" +#include "base.hxx" + +namespace zenoh { + +/// Replies consolidation mode to apply on replies of get operation +struct QueryConsolidation : Copyable<::z_query_consolidation_t> { + using Copyable::Copyable; + + /// @name Constructors + + /// @brief Create a new default ``QueryConsolidation`` value + QueryConsolidation() : Copyable(::z_query_consolidation_default()) {} + + /// @brief Create a new ``QueryConsolidation`` value with the given consolidation mode + /// @param v ``zenoh::ConsolidationMode`` value + QueryConsolidation(ConsolidationMode v) : Copyable({v}) {} + + /// @name Methods + + /// @name Operators + + /// @brief Equality operator + /// @param v the other ``QueryConsolidation`` to compare with + /// @return true if the two values are equal (have the same consolidation mode) + bool operator==(const QueryConsolidation& v) const { return this->_0.mode == v._0.mode; } + + /// @brief Inequality operator + /// @param v the other ``QueryConsolidation`` to compare with + /// @return true if the two values are not equal (have different consolidation mode) + bool operator!=(const QueryConsolidation& v) const { return !operator==(v); } +}; +} \ No newline at end of file diff --git a/include/zenoh/api/queryable.hxx b/include/zenoh/api/queryable.hxx new file mode 100644 index 00000000..3f2cfdab --- /dev/null +++ b/include/zenoh/api/queryable.hxx @@ -0,0 +1,47 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#pragma once + +#include "base.hxx" + +namespace zenoh { +class QueryableBase: public Owned<::z_owned_queryable_t> { +public: + using Owned::Owned; +}; + +/// A Zenoh queryable. Constructed by ``Session::declare_queryable`` method +template +class Queryable: public QueryableBase { + Handler _handler; +public: + /// @name Constructors + + /// @brief Constructs from queryable and handler + Queryable(QueryableBase queryable, Handler handler) + :QueryableBase(std::move(queryable)), _handler(std::move(handler)) { + } + + /// @name Methods + /// @brief Returns handler to queryable data stream + const Handler& handler() const { return _handler; }; +}; + +template<> +class Queryable :public QueryableBase { +public: + using QueryableBase::QueryableBase; +}; + +} \ No newline at end of file diff --git a/include/zenoh/api/reply.hxx b/include/zenoh/api/reply.hxx new file mode 100644 index 00000000..db9d2f21 --- /dev/null +++ b/include/zenoh/api/reply.hxx @@ -0,0 +1,53 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#pragma once + +#include "base.hxx" +#include "../detail/interop.hxx" +#include "sample.hxx" +#include "value.hxx" + + +namespace zenoh { +/// A reply from queryable to ``Session::get`` operation +class Reply : public Owned<::z_owned_reply_t> { +public: + using Owned::Owned; + + /// @name Methods + + /// @brief Check if the reply is OK + /// @return true if the reply is OK, false if contains a error + bool is_ok() const { return ::z_reply_is_ok(this->loan()); } + + /// @brief Get the reply value. Will throw an exception if ``Reply::is_ok`` returns false. + /// @return Reply sample. + decltype(auto) get_ok() const { + if (!::z_reply_is_ok(this->loan())) { + throw ZException("Reply data sample was requested, but reply contains error", Z_EINVAL); + } + return detail::as_owned_cpp_obj(::z_reply_ok(this->loan())); + } + + /// @brief Get the reply error. Will throw an exception if ``Reply::is_ok`` returns false. + /// @return Reply error. + decltype(auto) get_err() const { + if (::z_reply_is_ok(this->loan())) { + throw ZException("Reply error was requested, but reply contains data sample", Z_EINVAL); + } + return detail::as_owned_cpp_obj(::z_reply_err(this->loan())); + } +}; + +} \ No newline at end of file diff --git a/include/zenoh/api/sample.hxx b/include/zenoh/api/sample.hxx new file mode 100644 index 00000000..57f22a14 --- /dev/null +++ b/include/zenoh/api/sample.hxx @@ -0,0 +1,81 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#pragma once + +#include "base.hxx" +#include "../detail/interop.hxx" +#include "enums.hxx" +#include "keyexpr.hxx" +#include "bytes.hxx" +#include "encoding.hxx" +#include "timestamp.hxx" + +namespace zenoh { +/// @brief A data sample. +/// +/// A sample is the value associated to a given resource at a given point in time. +class Sample : public Owned<::z_owned_sample_t> { +public: + using Owned::Owned; + + /// @name Methods + + /// @brief The resource key of this data sample. + /// @return ``KeyExpr`` object representing the resource key + decltype(auto) get_keyexpr() const { return detail::as_owned_cpp_obj(::z_sample_keyexpr(this->loan())); } + + /// @brief The data of this data sample + /// @return ``Bytes`` object representing the sample payload + decltype(auto) get_payload() const { return detail::as_owned_cpp_obj(::z_sample_payload(this->loan())); } + + /// @brief The encoding of the data of this data sample + /// @return ``Encoding`` object + decltype(auto) get_encoding() const { return detail::as_owned_cpp_obj(::z_sample_encoding(this->loan())); } + + /// @brief The kind of this data sample (PUT or DELETE) + /// @return ``zenoh::SampleKind`` value + SampleKind get_kind() const { return ::z_sample_kind(this->loan()); } + + /// @brief Checks if sample contains an attachment + /// @return ``True`` if sample contains an attachment + bool has_attachment() const { return ::z_sample_attachment(this->loan()) != nullptr; } + + /// @brief The attachment of this data sample + /// @return ``Bytes`` object + decltype(auto) get_attachment() const { return detail::as_owned_cpp_obj(::z_sample_attachment(this->loan())); } + + /// @brief The timestamp of this data sample + /// @return ``Timestamp`` object + decltype(auto) get_timestamp() const { return detail::as_copyable_cpp_obj(::z_sample_timestamp(this->loan())); } + + /// @brief The priority this data sample was sent with + /// @return ``Priority`` value + Priority get_priority() const { return ::z_sample_priority(this->loan()); } + + /// @brief The congestion control setting this data sample was sent with + /// @return ``CongestionControl`` value + CongestionControl get_congestion_control() const { return ::z_sample_congestion_control(this->loan()); } + + /// @brief The express setting this data sample was sent with + /// @return ``CongestionControl`` value + bool get_express() const { return ::z_sample_express(this->loan()); } + + /// @brief Constructs a shallow copy of this Sample + Sample clone() const { + Sample s(nullptr); + ::z_sample_clone(this->loan(), &s._0); + return s; + }; +}; +} \ No newline at end of file diff --git a/include/zenoh/api/scout.hxx b/include/zenoh/api/scout.hxx new file mode 100644 index 00000000..6f8e666e --- /dev/null +++ b/include/zenoh/api/scout.hxx @@ -0,0 +1,54 @@ + + +#pragma once + +#include "base.hxx" +#include "../detail/interop.hxx" +#include "../detail/closures_concrete.hxx" + +#include "enums.hxx" +#include "config.hxx" + +namespace zenoh { + +/// @brief Options to be passed to ``scout()`` operation +struct ScoutOptions { + /// @brief The maximum duration in ms the scouting can take. + size_t timeout_ms = 1000; + /// @brief Type of entities to scout for. + WhatAmI what = WhatAmI::Z_WHATAMI_ROUTER_PEER; + + /// @brief Returns default option settings + static ScoutOptions create_default() { return {}; } +}; + +/// @brief Scout for zenoh entities in the network +/// @param config ``ScoutingConfig`` to use +/// @param on_hello The callback to process each received ``Hello``message +/// @param on_drop The callback that will be called once all hello ``Hello``messages are received +template +void scout(Config&& config, C&& on_hello, D&& on_drop, ScoutOptions&& options = ScoutOptions::create_default(), ZError* err = nullptr) { + static_assert( + std::is_invocable_r::value, + "on_hello should be callable with the following signature: void on_hello(const zenoh::Hello& hello)" + ); + static_assert( + std::is_invocable_r::value, + "on_drop should be callable with the following signature: void on_drop()" + ); + ::z_owned_closure_hello_t c_closure; + using ClosureType = typename detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(on_hello), std::forward(on_drop)); + ::z_closure(&c_closure, detail::closures::_zenoh_on_hello_call, detail::closures::_zenoh_on_drop, closure); + ::z_scout_options_t opts; + opts.zc_timeout_ms = options.timeout_ms; + opts.zc_what = options.what; + + __ZENOH_ERROR_CHECK( + ::z_scout(detail::as_owned_c_ptr(config), ::z_move(c_closure), &opts), + err, + "Failed to perform scout operation" + ); +} + +} \ No newline at end of file diff --git a/include/zenoh/api/session.hxx b/include/zenoh/api/session.hxx new file mode 100644 index 00000000..d63d26e3 --- /dev/null +++ b/include/zenoh/api/session.hxx @@ -0,0 +1,563 @@ + +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#pragma once + +#include "base.hxx" +#include "../detail/interop.hxx" +#include "../detail/closures_concrete.hxx" + +#include "enums.hxx" +#include "config.hxx" +#include "id.hxx" +#include "publisher.hxx" +#include "subscriber.hxx" +#include "queryable.hxx" +#include "query_consolidation.hxx" +#include "closures.hxx" + +#include + + + +namespace zenoh { +/// A Zenoh session. +class Session : public Owned<::z_owned_session_t> { +public: + using Owned::Owned; + + Session(Config&& config, ZError* err = nullptr) : Owned(nullptr) { + __ZENOH_ERROR_CHECK( + ::z_open(&this->_0, detail::as_owned_c_ptr(config)), + err, + "Failed to open session" + ); + } + + /// @name Methods + +#ifdef ZENOHCXX_ZENOHC + /// @brief Create a shallow copy of the session + /// @return a new ``Session`` instance + /// @note zenoh-c only + Session clone() const { + Session s(nullptr); + ::zc_session_clone(this->loan(), &s._0); + return s; + } +#endif + + /// @brief Get the unique identifier of the zenoh node associated to this ``Session`` + /// @return the unique identifier ``Id`` + Id get_zid() const { + return Id(::z_info_zid(this->loan())); + } + + /// @brief Create ``KeyExpr`` instance with numeric id registered in ``Session`` routing tables + /// @param key_expr ``KeyExpr`` to declare + /// @return Declared ``KeyExpr`` instance + KeyExpr declare_keyexpr(const KeyExpr& key_expr, ZError* err = nullptr) const { + KeyExpr k(nullptr); + __ZENOH_ERROR_CHECK( + ::z_declare_keyexpr(detail::as_owned_c_ptr(k), this->loan(), detail::loan(key_expr)), + err, + std::string("Failed to declare key expression: ").append(k.as_string_view()) + ); + return k; + } + + /// @brief Remove ``KeyExpr`` instance from ``Session`` and drop ``KeyExpr`` instance + /// @param keyexpr ``KeyExpr`` instance to drop + void undeclare_keyexpr(KeyExpr&& key_expr, ZError* err = nullptr) const { + __ZENOH_ERROR_CHECK( + ::z_undeclare_keyexpr(detail::as_owned_c_ptr(key_expr), this->loan()), + err, + "Failed to undeclare key expression" + ); + } + + /// @brief Options passed to the ``get()`` operation + struct GetOptions { + /// @brief The Queryables that should be target of the query. + QueryTarget target = QueryTarget::Z_QUERY_TARGET_ALL; + /// @brief The replies consolidation strategy to apply on replies to the query. + QueryConsolidation consolidation = QueryConsolidation(); + /// @brief An optional payload of the query. + std::optional payload = {}; + /// @brief An optional encoding of the query payload and/or attachment. + std::optional encoding = {}; + /// @brief An optional attachment to the query. + std::optional attachment = {}; + /// @brief The timeout for the query in milliseconds. 0 means default query timeout from zenoh configuration. + uint64_t timeout_ms = 0; + + /// @brief Returns default option settings + static GetOptions create_default() { return {}; } + }; + + /// @brief Query data from the matching queryables in the system. Replies are provided through a callback function. + /// @param key_expr ``KeyExpr`` the key expression matching resources to query + /// @param parameters the parameters string in URL format + /// @param on_reply callback that will be called once for each received reply + /// @param on_drop callback that will be called once all replies are received + /// @param options ``GetOptions`` query options + template + void get( + const KeyExpr& key_expr, const std::string& parameters, C&& on_reply, D&& on_drop, + GetOptions&& options = GetOptions::create_default(), ZError* err = nullptr + ) const { + static_assert( + std::is_invocable_r::value, + "on_reply should be callable with the following signature: void on_reply(const zenoh::Reply& reply)" + ); + static_assert( + std::is_invocable_r::value, + "on_drop should be callable with the following signature: void on_drop()" + ); + ::z_owned_closure_reply_t c_closure; + using ClosureType = typename detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(on_reply), std::forward(on_drop)); + ::z_closure(&c_closure, detail::closures::_zenoh_on_reply_call, detail::closures::_zenoh_on_drop, closure); + ::z_get_options_t opts; + z_get_options_default(&opts); + opts.target = options.target; + opts.consolidation = static_cast(options.consolidation); + opts.payload = detail::as_owned_c_ptr(options.payload); + opts.encoding = detail::as_owned_c_ptr(options.encoding); + opts.attachment = detail::as_owned_c_ptr(options.attachment); + opts.timeout_ms = options.timeout_ms; + + __ZENOH_ERROR_CHECK( + ::z_get(this->loan(), detail::loan(key_expr), parameters.c_str(), ::z_move(c_closure), &opts), + err, + "Failed to perform get operation" + ); + } + + /// @brief Query data from the matching queryables in the system. Replies are provided through a channel. + /// @tparam Channel the type of channel used to create stream of data. + /// @param key_expr the key expression matching resources to query + /// @param parameters the parameters string in URL format + /// @param channel Channel instance + /// @param options query options + /// @return Reply handler + template + typename Channel::template HandlerType get( + const KeyExpr& key_expr, const std::string& parameters, Channel channel, GetOptions&& options = GetOptions::create_default(), ZError* err = nullptr + ) const { + auto cb_handler_pair = channel.template into_cb_handler_pair(); + ::z_get_options_t opts; + z_get_options_default(&opts); + opts.target = options.target; + opts.consolidation = static_cast(options.consolidation); + opts.payload = detail::as_owned_c_ptr(options.payload); + opts.encoding = detail::as_owned_c_ptr(options.encoding); + opts.attachment = detail::as_owned_c_ptr(options.attachment); + opts.timeout_ms = options.timeout_ms; + + ZError res = ::z_get(this->loan(), detail::loan(key_expr), parameters.c_str(), ::z_move(cb_handler_pair.first), &opts); + __ZENOH_ERROR_CHECK( + res, + err, + "Failed to perform get operation" + ); + if (res != Z_OK) ::z_drop(::z_move(*detail::as_owned_c_ptr(cb_handler_pair.second))); + return std::move(cb_handler_pair.second); + } + /// @brief Options to be passed to ``delete_resource()`` operation + struct DeleteOptions { + /// @brief The priority of the delete message. + Priority priority = Priority::Z_PRIORITY_DATA; + /// @brief The congestion control to apply when routing delete message. + CongestionControl congestion_control = CongestionControl::Z_CONGESTION_CONTROL_DROP; + /// @brief Whether Zenoh will NOT wait to batch delete message with others to reduce the bandwith. + bool is_express = false; + /// @brief the timestamp of this message + std::optional timestamp = {}; + + /// @brief Returns default option settings + static DeleteOptions create_default() { return {}; } + }; + + /// @brief Undeclare a resource. Equal to ``Publisher::delete_resource`` + /// @param key_expr ``KeyExprView`` the key expression to delete the resource + /// @param options ``DeleteOptions`` delete options + /// @return 0 in case of success, negative error code otherwise + ZError delete_resource(const KeyExpr& key_expr, DeleteOptions&& options = DeleteOptions::create_default()) const { + ::z_delete_options_t opts; + z_delete_options_default(&opts); + opts.congestion_control = options.congestion_control; + opts.priority = options.priority; + opts.is_express = options.is_express; + + return ::z_delete(this->loan(), detail::loan(key_expr), &opts); + } + + /// @brief Options passed to the ``put()`` operation + struct PutOptions { + /// @brief The priority of this message. + Priority priority = Priority::Z_PRIORITY_DATA; + /// @brief The congestion control to apply when routing this message. + CongestionControl congestion_control = CongestionControl::Z_CONGESTION_CONTROL_DROP; + /// @brief Whether Zenoh will NOT wait to batch this message with others to reduce the bandwith. + bool is_express = false; + #ifdef ZENOHCXX_ZENOHC + /// @brief Allowed destination + Locality allowed_destination = ::zcu_locality_default(); + #endif + /// @brief the timestamp of this message + std::optional timestamp = {}; + /// @brief An optional encoding of the message payload and/or attachment. + std::optional encoding = {}; + /// @brief An optional attachment to the message. + std::optional attachment = {}; + + /// @brief Returns default option settings + static PutOptions create_default() { return {}; } + }; + + /// @brief Publish data to the matching subscribers in the system. Equal to ``Publisher::put_owned`` + /// @param key_expr The key expression to put the data + /// @param payload The data to publish + /// @param options Options to pass to put operation + void put(const KeyExpr& key_expr, Bytes&& payload, PutOptions&& options = PutOptions::create_default(), ZError* err = nullptr) const { + ::z_put_options_t opts; + z_put_options_default(&opts); + opts.encoding = detail::as_owned_c_ptr(options.encoding); + opts.congestion_control = options.congestion_control; + opts.priority = options.priority; + opts.is_express = options.is_express; + opts.attachment = detail::as_owned_c_ptr(options.attachment); + opts.timestamp = detail::as_copyable_c_ptr(options.timestamp); + #ifdef ZENOHCXX_ZENOHC + opts.allowed_destination = options.allowed_destination; + #endif + auto payload_ptr = detail::as_owned_c_ptr(payload); + __ZENOH_ERROR_CHECK( + ::z_put(this->loan(), detail::loan(key_expr), payload_ptr, &opts), + err, + "Failed to perform put operation" + ); + } + + /// @brief Options to be passed when declaring a ``Queryable`` + struct QueryableOptions { + /// @brief The completeness of the Queryable. + bool complete = false; + + /// @brief Returns default option settings + static QueryableOptions create_default() { return {}; } + }; + + /// @brief Create a ``Queryable`` object to answer to ``Session::get`` requests + /// @param key_expr The key expression to match the ``Session::get`` requests + /// @param on_query The callback to handle ``Query`` requests. Will be called once for each query + /// @param on_drop The drop callback. Will be called once, when ``Queryable`` is destroyed or undeclared + /// @param options Options passed to queryable declaration + /// @return a ``Queryable`` object + template + Queryable declare_queryable( + const KeyExpr& key_expr, C&& on_query, D&& on_drop, QueryableOptions&& options = QueryableOptions::create_default(), ZError* err = nullptr + ) const { + static_assert( + std::is_invocable_r::value, + "on_query should be callable with the following signature: void on_query(const zenoh::Query& query)" + ); + static_assert( + std::is_invocable_r::value, + "on_drop should be callable with the following signature: void on_drop()" + ); + ::z_owned_closure_query_t c_closure; + using ClosureType = typename detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(on_query), std::forward(on_drop)); + ::z_closure(&c_closure, detail::closures::_zenoh_on_query_call, detail::closures::_zenoh_on_drop, closure); + ::z_queryable_options_t opts; + z_queryable_options_default(&opts); + opts.complete = options.complete; + + Queryable q(nullptr); + ZError res = ::z_declare_queryable( + detail::as_owned_c_ptr(q), this->loan(), detail::loan(key_expr), ::z_move(c_closure), &opts + ); + __ZENOH_ERROR_CHECK(res, err, "Failed to declare Queryable"); + return q; + } + + /// @brief Create a ``Queryable`` object to answer to ``Session::get`` requests + /// @tparam Channel the type of channel used to create stream of data. + /// @param key_expr The key expression to match the ``Session::get`` requests + /// @param channel An instance of channel + /// @param options Options passed to queryable declaration + /// @return a ``Queryable`` object + template + Queryable> declare_queryable( + const KeyExpr& key_expr, Channel channel, QueryableOptions&& options = QueryableOptions::create_default(), ZError* err = nullptr + ) const { + auto cb_handler_pair = channel.template into_cb_handler_pair(); + ::z_queryable_options_t opts; + z_queryable_options_default(&opts); + opts.complete = options.complete; + + QueryableBase q(nullptr); + ZError res = ::z_declare_queryable( + detail::as_owned_c_ptr(q), this->loan(), detail::loan(key_expr), ::z_move(cb_handler_pair.first), &opts + ); + __ZENOH_ERROR_CHECK(res, err, "Failed to declare Queryable"); + if (res != Z_OK) ::z_drop(::z_move(*detail::as_owned_c_ptr(cb_handler_pair.second))); + return Queryable>(std::move(q), std::move(cb_handler_pair.second)); + } + + /// @brief Options to be passed when declaring a ``Subscriber`` + struct SubscriberOptions { + /// @brief The subscription reliability. + Reliability reliability = Reliability::Z_RELIABILITY_BEST_EFFORT; + + /// @brief Returns default option settings + static SubscriberOptions create_default() { return {}; } + }; + + /// @brief Create a ``Subscriber`` object to receive data from matching ``Publisher`` objects or from + /// ``Session::put`` and ``Session::delete_resource`` requests + /// @tparam Channel the type of channel used to create stream of data. + /// @param key_expr The key expression to match the publishers + /// @param on_sample The callback that will be called for each received sample + /// @param on_drop The callback that will be called once subscriber is destroyed or undeclared + /// @param options Options to pass to subscriber declaration + /// @return a ``Subscriber`` object + template + Subscriber declare_subscriber( + const KeyExpr& key_expr, C&& on_sample, D&& on_drop, SubscriberOptions&& options = SubscriberOptions::create_default(), ZError *err = nullptr + ) const { + static_assert( + std::is_invocable_r::value, + "on_sample should be callable with the following signature: void on_sample(const zenoh::Sample& sample)" + ); + static_assert( + std::is_invocable_r::value, + "on_drop should be callable with the following signature: void on_drop()" + ); + ::z_owned_closure_sample_t c_closure; + using ClosureType = typename detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(on_sample), std::forward(on_drop)); + ::z_closure(&c_closure, detail::closures::_zenoh_on_sample_call, detail::closures::_zenoh_on_drop, closure); + ::z_subscriber_options_t opts; + z_subscriber_options_default(&opts); + opts.reliability = options.reliability; + Subscriber s(nullptr); + ZError res = ::z_declare_subscriber( + detail::as_owned_c_ptr(s), this->loan(), detail::loan(key_expr), ::z_move(c_closure), &opts + ); + __ZENOH_ERROR_CHECK(res, err, "Failed to declare Subscriber"); + return s; + } + + /// @brief Create a ``Subscriber`` object to receive data from matching ``Publisher`` objects or from + /// ``Session::put`` and ``Session::delete_resource`` requests + /// @tparam Channel the type of channel used to create stream of data. + /// @param key_expr The key expression to match the publishers + /// @param channel An instance of channel + /// @param options Options to pass to subscriber declaration + /// @return a ``Subscriber`` object + template + Subscriber> declare_subscriber( + const KeyExpr& key_expr, Channel channel, SubscriberOptions&& options = SubscriberOptions::create_default(), ZError *err = nullptr + ) const { + auto cb_handler_pair = channel.template into_cb_handler_pair(); + ::z_subscriber_options_t opts; + z_subscriber_options_default(&opts); + opts.reliability = options.reliability; + SubscriberBase s(nullptr); + ZError res = ::z_declare_subscriber( + detail::as_owned_c_ptr(s), this->loan(), detail::loan(key_expr), ::z_move(cb_handler_pair.first), &opts + ); + __ZENOH_ERROR_CHECK(res, err, "Failed to declare Subscriber"); + if (res != Z_OK) ::z_drop(::z_move(*detail::as_owned_c_ptr(cb_handler_pair.second))); + return Subscriber>(std::move(s), std::move(cb_handler_pair.second)); + } + + /// @brief Options to be passed when declaring a ``Publisher`` + struct PublisherOptions { + /// @brief The congestion control to apply when routing messages from this publisher. + CongestionControl congestion_control; + /// @brief The priority of messages from this publisher. + Priority priority; + /// @brief If true, Zenoh will not wait to batch this message with others to reduce the bandwith + bool is_express; + #ifdef ZENOHCXX_ZENOHC + /// @brief Allowed destination + Locality allowed_destination = ::zcu_locality_default(); + #endif + /// @brief Returns default option settings + static PublisherOptions create_default() { return {}; } + }; + + /// @brief Create a ``Publisher`` object to publish data to matching ``Subscriber`` and ``PullSubscriber`` objects + /// @param key_expr The key expression to match the subscribers + /// @param options Options passed to publisher declaration + /// @return a ``Publisher`` object + Publisher declare_publisher( + const KeyExpr& key_expr, PublisherOptions&& options = PublisherOptions::create_default(), ZError* err = nullptr + ) const { + ::z_publisher_options_t opts; + z_publisher_options_default(&opts); + opts.congestion_control = options.congestion_control; + opts.priority = options.priority; + opts.is_express = options.is_express; + #ifdef ZENOHCXX_ZENOHC + opts.allowed_destination = options.allowed_destination; + #endif + + Publisher p(nullptr); + ZError res = ::z_declare_publisher( + detail::as_owned_c_ptr(p), this->loan(), detail::loan(key_expr), &opts + ); + __ZENOH_ERROR_CHECK(res, err, "Failed to declare Publisher"); + return p; + } + + /// @brief Fetches the Zenoh IDs of all connected routers. + std::vector get_routers_z_id(ZError* err = nullptr) const { + std::vector out; + auto f = [&out](const Id& z_id) { + out.push_back(z_id); + }; + typedef decltype(f) F; + ::z_owned_closure_zid_t c_closure; + using ClosureType = typename detail::closures::Closure; + auto closure = ClosureType::into_context(std::forward(f), closures::none); + ::z_closure(&c_closure, detail::closures::_zenoh_on_id_call, detail::closures::_zenoh_on_drop, closure); + __ZENOH_ERROR_CHECK( + ::z_info_routers_zid(this->loan(), &c_closure), + err, + "Failed to fetch router Ids" + ); + return out; + } + + /// @brief Fetches the Zenoh IDs of all connected peers. + std::vector get_peers_z_id(ZError* err = nullptr) const { + std::vector out; + auto f = [&out](const Id& z_id) { + out.push_back(z_id); + }; + typedef decltype(f) F; + ::z_owned_closure_zid_t c_closure; + auto closure = detail::closures::Closure::into_context( + std::forward(f), closures::none + ); + ::z_closure(&c_closure, detail::closures::_zenoh_on_id_call, detail::closures::_zenoh_on_drop, closure); + __ZENOH_ERROR_CHECK( + ::z_info_peers_zid(this->loan(), &c_closure), + err, + "Failed to fetch peer Ids" + ); + return out; + } + +#ifdef ZENOHCXX_ZENOHPICO + + /// @brief Start a separate task to read from the network and process the messages as soon as they are received. + /// @return true if the operation was successful, false otherwise + /// @note zenoh-pico only + bool start_read_task(); + + /// @brief Start a separate task to read from the network and process the messages as soon as they are received. + /// @param error ``zenoh::ErrNo`` the error code + /// @return true if the operation was successful, false otherwise + /// @note zenoh-pico only + bool start_read_task(ZError* error = nullptr); + + /// @brief Stop the read task + /// @return true if the operation was successful, false otherwise + /// @note zenoh-pico only + bool stop_read_task(); + + /// @brief Stop the read task + /// @param error ``zenoh::ErrNo`` the error code + /// @return true if the operation was successful, false otherwise + /// @note zenoh-pico only + bool stop_read_task(ZError* error = nullptr); + + /// @brief Start a separate task to handle the session lease. This task will send KeepAlive messages when needed + /// and will close the session when the lease is expired. When operating over a multicast transport, it also + /// periodically sends the Join messages. + /// @return true if the operation was successful, false otherwise + /// @note zenoh-pico only + bool start_lease_task(); + + /// @brief Start a separate task to handle the session lease. This task will send KeepAlive messages when needed + /// and will close the session when the lease is expired. When operating over a multicast transport, it also + /// periodically sends the Join messages. + /// @param error ``zenoh::ErrNo`` the error code + /// @return true if the operation was successful, false otherwise + /// @note zenoh-pico only + bool start_lease_task(ZError* error = nullptr); + + /// @brief Stop the lease task + /// @return true if the operation was successful, false otherwise + /// @note zenoh-pico only + bool stop_lease_task(); + + /// @brief Stop the lease task + /// @param error ``zenoh::ErrNo`` the error code + /// @return true if the operation was successful, false otherwise + /// @note zenoh-pico only + bool stop_lease_task(ZError* error = nullptr); + + /// @brief Triggers a single execution of reading procedure from the network and processes of any received the + /// message + /// @return true if the operation was successful, false otherwise + /// @note zenoh-pico only + bool read(); + + /// @brief Triggers a single execution of reading procedure from the network and processes of any received the + /// message + /// @param error ``zenoh::ErrNo`` the error code + /// @return true if the operation was successful, false otherwise + /// @note zenoh-pico only + bool read(ZError* error = nullptr); + + /// @brief Triggers a single execution of keep alive procedure. It will send KeepAlive messages when needed and + /// will close the session when the lease is expired. + /// @return true if the leasing procedure was executed successfully, false otherwise. + bool send_keep_alive(); + + /// @brief Triggers a single execution of keep alive procedure. It will send KeepAlive messages when needed and + /// will close the session when the lease is expired. + /// @param error ``zenoh::ErrNo`` the error code + /// @return true if the leasing procedure was executed successfully, false otherwise. + bool send_keep_alive(ZError* error = nullptr); + + /// @brief Triggers a single execution of join procedure: send the Join message + /// @return true if the join procedure was executed successfully, false otherwise. + bool send_join(); + + /// @brief Triggers a single execution of join procedure: send the Join message + /// @param error ``zenoh::ErrNo`` the error code + /// @return true if the join procedure was executed successfully, false otherwise. + bool send_join(ZError* error = nullptr); +#endif + + /// @brief Create a ``Session`` with the given ``Config`` + /// @param config ``Config`` to use + /// @param start_background_tasks for zenoh-pico only. If true, start background threads which handles the network + /// traffic. If false, the threads should be called manually with ``Session::start_read_task`` and + /// ``Session::start_lease_task`` or methods ``Session::read``, ``Session::send_keep_alive`` and + /// ``Session::send_join`` should be called in loop. + /// @return a ``Session`` if the session was successfully created, an ``zenoh::ErrorMessage`` otherwise + static Session open(Config&& config, ZError* err = nullptr) { + return Session(std::move(config), err); + } +}; +} \ No newline at end of file diff --git a/include/zenoh/api/subscriber.hxx b/include/zenoh/api/subscriber.hxx new file mode 100644 index 00000000..396c809f --- /dev/null +++ b/include/zenoh/api/subscriber.hxx @@ -0,0 +1,59 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#pragma once + +#include "base.hxx" +#include "../detail/interop.hxx" +#include "keyexpr.hxx" + +namespace zenoh { + +class SubscriberBase : public Owned<::z_owned_subscriber_t> { +public: + using Owned::Owned; + +#ifdef ZENOHCXX_ZENOHC + /// @brief Get the key expression of the subscriber + decltype(auto) get_keyexpr() const { + return detail::as_owned_cpp_obj(::z_subscriber_keyexpr(this->loan())); + } +#endif +}; + +/// A Zenoh subscriber. Destroying subscriber cancels the subscription +/// Constructed by ``Session::declare_subscriber`` method +template +class Subscriber: public SubscriberBase { + Handler _handler; +public: + /// @name Constructors + + /// @brief Construct from subscriber and handler + Subscriber(SubscriberBase subscriber, Handler handler) + :SubscriberBase(std::move(subscriber)), _handler(std::move(handler)) { + } + + /// @name Methods + /// @brief Returns handler to subscriber data stream + const Handler& handler() const { return _handler; }; +}; + +template<> +class Subscriber :public SubscriberBase { +public: + using SubscriberBase::SubscriberBase; +}; + + +} \ No newline at end of file diff --git a/include/zenoh/api/timestamp.hxx b/include/zenoh/api/timestamp.hxx new file mode 100644 index 00000000..7b6ff39f --- /dev/null +++ b/include/zenoh/api/timestamp.hxx @@ -0,0 +1,46 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#pragma once +#include "base.hxx" +#include "../zenohc.hxx" +#include "../detail/interop.hxx" +#include "id.hxx" + +namespace zenoh { +/// Zenoh Timestamp. +class Timestamp :public Copyable<::z_timestamp_t> { +public: + using Copyable::Copyable; + // TODO: add utility methods to interpret time as mils, seconds, minutes, etc + + /// @name Constructors + /// @brief Create Timestamp from Id and npt64 time + Timestamp(const Id& id, uint64_t npt64_time) + :Copyable({}) { + z_timestamp_new(&this->inner(), detail::as_copyable_c_ptr(id), npt64_time); + } + + /// @name Methods + + /// @brief Get the NPT64 time part of timestamp + /// @return time in NPT64 format. + uint64_t get_time() const { return ::z_timestamp_npt64_time(&this->inner()); } + + /// @brief Get the unique id of the timestamp + /// @return unique id + Id get_id() const { return ::z_timestamp_id(&this->inner()); } +private: +}; + +} \ No newline at end of file diff --git a/include/zenoh/api/value.hxx b/include/zenoh/api/value.hxx new file mode 100644 index 00000000..aa9b0e55 --- /dev/null +++ b/include/zenoh/api/value.hxx @@ -0,0 +1,42 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +// +// This file contains structures and classes API without implementations +// + +#pragma once + +#include "base.hxx" +#include "../detail/interop.hxx" + +#include "bytes.hxx" +#include "encoding.hxx" + +namespace zenoh { + +/// A zenoh value. Contans refrence to data and it's encoding +class Value : public Owned<::z_owned_value_t> { +public: + using Owned::Owned; + /// @name Methods + + /// @brief The payload of this value + /// @return ``Bytes`` object + decltype(auto) get_payload() const { return detail::as_owned_cpp_obj(::z_value_payload(this->loan())); } + + /// @brief The encoding of this value + /// @return ``Encoding`` object + decltype(auto) get_encoding() const { return detail::as_owned_cpp_obj(::z_value_encoding(this->loan())); } +}; +} \ No newline at end of file diff --git a/include/zenoh/closures.hxx b/include/zenoh/detail/closures.hxx similarity index 86% rename from include/zenoh/closures.hxx rename to include/zenoh/detail/closures.hxx index 1cc317f6..6a58afee 100644 --- a/include/zenoh/closures.hxx +++ b/include/zenoh/detail/closures.hxx @@ -17,9 +17,6 @@ #pragma once -#include "base.hxx" -#include - namespace zenoh::detail::closures { struct IDroppable { @@ -69,18 +66,4 @@ public: } }; - - -/// Ensure that function pointers are defined with extern C linkage -extern "C" { - inline void _zenoh_on_drop(void* context) { - IDroppable::delete_from_context(context); - } -} - -} - -namespace zenoh::closures { - inline void none() {}; - using None = decltype(&none); } \ No newline at end of file diff --git a/include/zenoh/detail/closures_concrete.hxx b/include/zenoh/detail/closures_concrete.hxx new file mode 100644 index 00000000..c94fdbab --- /dev/null +++ b/include/zenoh/detail/closures_concrete.hxx @@ -0,0 +1,56 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +// +// This file contains structures and classes API without implementations +// + +#pragma once + +#include "../zenohc.hxx" +#include "../detail/interop.hxx" +#include "closures.hxx" +#include "../api/sample.hxx" +#include "../api/reply.hxx" +#include "../api/query.hxx" +#include "../api/id.hxx" +#include "../api/hello.hxx" + +/// Ensure that function pointers are defined with extern C linkage +namespace zenoh::detail::closures { +extern "C" { + inline void _zenoh_on_drop(void* context) { + IDroppable::delete_from_context(context); + } + + inline void _zenoh_on_reply_call(const ::z_loaned_reply_t* reply, void* context) { + IClosure::call_from_context(context, detail::as_owned_cpp_obj(reply)); + } + + inline void _zenoh_on_sample_call(const ::z_loaned_sample_t* sample, void* context) { + IClosure::call_from_context(context, detail::as_owned_cpp_obj(sample)); + } + + inline void _zenoh_on_query_call(const ::z_loaned_query_t* query, void* context) { + IClosure::call_from_context(context, detail::as_owned_cpp_obj(query)); + } + + inline void _zenoh_on_id_call(const ::z_id_t* z_id, void* context) { + IClosure::call_from_context(context, detail::as_copyable_cpp_obj(z_id)); + } + + inline void _zenoh_on_hello_call(const ::z_loaned_hello_t* hello, void* context) { + IClosure::call_from_context(context, detail::as_owned_cpp_obj(hello)); + } +} +} diff --git a/include/zenoh/internal.hxx b/include/zenoh/detail/interop.hxx similarity index 82% rename from include/zenoh/internal.hxx rename to include/zenoh/detail/interop.hxx index a3609869..2462848e 100644 --- a/include/zenoh/internal.hxx +++ b/include/zenoh/detail/interop.hxx @@ -12,11 +12,31 @@ // ZettaScale Zenoh Team, #pragma once -#include "base.hxx" -#include +#include "../api/base.hxx" #include + namespace zenoh::detail { +template +CopyableType* as_copyable_c_ptr(Copyable& c) { + return reinterpret_cast(&c); +} + +template +const CopyableType* as_copyable_c_ptr(const Copyable& c) { + return reinterpret_cast(&c); +} + +template +auto* as_copyable_c_ptr(std::optional& c) { + return c.has_value() ? as_copyable_c_ptr(c.value()) : nullptr; +} + +template +const auto* as_copyable_c_ptr(const std::optional& c) { + return c.has_value() ? as_copyable_c_ptr(c.value()) : nullptr; +} + template OwnedType* as_owned_c_ptr(Owned& o) { return reinterpret_cast(&o); diff --git a/include/zenoh/zenohc.hxx b/include/zenoh/zenohc.hxx index dba749b2..3eed390c 100644 --- a/include/zenoh/zenohc.hxx +++ b/include/zenoh/zenohc.hxx @@ -1,6 +1,6 @@ #pragma once -// #define ZENOHCXX_ZENOHC +//#define ZENOHCXX_ZENOHC #if defined(ZENOHCXX_ZENOHPICO) && defined(ZENOHCXX_ZENOHC) #error("Only one of ZENOHCXX_ZENOHPICO and ZENOHCXX_ZENOHC should be defined. \ diff --git a/tests/universal/pub_sub.cxx b/tests/universal/pub_sub.cxx index 0f461008..02ed0125 100644 --- a/tests/universal/pub_sub.cxx +++ b/tests/universal/pub_sub.cxx @@ -87,8 +87,39 @@ void put_sub() { assert(received_messages[1].second == "second"); } +void put_sub_channels() { + KeyExpr ke("zenoh/test"); + auto session1 = Session::open(Config()); + auto session2 = Session::open(Config()); + + std::this_thread::sleep_for(1s); + + auto subscriber = session2.declare_subscriber(ke, channels::FifoChannel(16)); + + std::this_thread::sleep_for(1s); + + session1.put(ke, Bytes::serialize("first")); + session1.put(ke, Bytes::serialize("second")); + + std::this_thread::sleep_for(1s); + + auto msg = subscriber.handler().recv().first; + assert(static_cast(msg)); + assert(msg.get_keyexpr() == "zenoh/test"); + assert(msg.get_payload().deserialize() == "first"); + msg = subscriber.handler().try_recv().first; + assert(static_cast(msg)); + assert(msg.get_keyexpr() == "zenoh/test"); + assert(msg.get_payload().deserialize() == "second"); + + msg = subscriber.handler().try_recv().first; + assert(!static_cast(msg)); +} + + int main(int argc, char** argv) { pub_sub(); put_sub(); + put_sub_channels(); } diff --git a/tests/universal/queryable_get.cxx b/tests/universal/queryable_get.cxx index 120e6513..51462404 100644 --- a/tests/universal/queryable_get.cxx +++ b/tests/universal/queryable_get.cxx @@ -106,37 +106,44 @@ void queryable_get_channel() { size_t queries_processed = 0; auto queryable = session1.declare_queryable( ke, - [&queries](const Query& q) { - auto val = q.get_value().get_payload().deserialize(); - queries.push_back(QueryData{.key = std::string(q.get_keyexpr().as_string_view()), .params = std::string(q.get_parameters()), .payload = val}); - if (q.get_parameters() == "ok") { - q.reply(q.get_keyexpr(), Bytes::serialize(std::to_string(val))); - } else { - q.reply_err(Bytes::serialize("err")); - } - }, - closures::none + channels::FifoChannel(3) ); std::this_thread::sleep_for(1s); - auto replies = session2.get_reply_fifo_channel(selector, "ok", 3, {.payload = Bytes::serialize(1) }); - assert(replies.is_active()); - auto reply = replies.get_next_reply(); + auto replies = session2.get(selector, "ok", channels::FifoChannel(3), {.payload = Bytes::serialize(1) }); + { + auto query = queryable.handler().recv().first; + assert(static_cast(query)); + assert(query.get_keyexpr() == selector); + assert(query.get_parameters() == "ok"); + assert(query.get_value().get_payload().deserialize() == 1); + query.reply(query.get_keyexpr(), Bytes::serialize(std::to_string(1))); + } + + auto reply = replies.recv().first; assert(static_cast(reply)); assert(reply.is_ok()); assert(reply.get_ok().get_payload().deserialize() == "1"); assert(reply.get_ok().get_keyexpr().as_string_view() == "zenoh/test/1"); - assert(!replies.get_next_reply()); - assert(!replies.is_active()); - replies = session2.get_reply_fifo_channel(selector, "err", 3, {.payload = Bytes::serialize(3) }); - assert(replies.is_active()); + reply = replies.recv().first; + assert(!replies.recv().first); + + replies = session2.get(selector, "err", channels::FifoChannel(3), {.payload = Bytes::serialize(3) }); + { + auto query = queryable.handler().recv().first; + assert(static_cast(query)); + assert(query.get_keyexpr() == selector); + assert(query.get_parameters() == "err"); + assert(query.get_value().get_payload().deserialize() == 3); + query.reply_err(Bytes::serialize("err")); + } + + reply = replies.recv().first; assert(static_cast(reply)); - reply = replies.get_next_reply(); assert(!reply.is_ok()); assert(reply.get_err().get_payload().deserialize() == "err"); - assert(!replies.get_next_reply()); - assert(!replies.is_active()); + assert(!replies.recv().first); }