Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement querier #858

Merged
merged 9 commits into from
Jan 24, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Querier implementation
sashacmc committed Jan 23, 2025
commit 5788439c4341040829b2c5c937a5ac90aa4f6762
31 changes: 31 additions & 0 deletions include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
@@ -243,6 +243,37 @@ z_result_t _z_send_reply_err(const _z_query_t *query, const _z_session_rc_t *zsr
#endif

#if Z_FEATURE_QUERY == 1
/**
* Declare a :c:type:`_z_querier_t` for the given resource key.
*
* Parameters:
* zn: The zenoh-net session. The caller keeps its ownership.
* keyexpr: The resource key to query. The callee gets the ownership of any
* allocated value.
* consolidation_mode: The kind of consolidation that should be applied on replies.
* congestion_control: The congestion control to apply when routing the querier queries.
* target: The kind of queryables that should be target of this query.
* priority: The priority of the query.
* is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwidth.
* timeout_ms: The timeout value of this query.
* Returns:
* The created :c:type:`_z_querier_t` (in null state if the declaration failed)..
*/
_z_querier_t _z_declare_querier(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr,
z_consolidation_mode_t consolidation_mode, z_congestion_control_t congestion_control,
z_query_target_t target, z_priority_t priority, bool is_express, uint64_t timeout_ms);

/**
* Undeclare a :c:type:`_z_querier_t`.
*
* Parameters:
* querier: The :c:type:`_z_querier_t` to undeclare. The callee releases the
* querier upon successful return.
* Returns:
* 0 if success, or a negative value identifying the error.
*/
z_result_t _z_undeclare_querier(_z_querier_t *querier);

/**
* Query data from the matching queryables in the system.
*
7 changes: 5 additions & 2 deletions include/zenoh-pico/net/query.h
Original file line number Diff line number Diff line change
@@ -89,18 +89,21 @@ typedef struct _z_querier_t {
_z_zint_t _id;
_z_session_weak_t _zn;
_z_encoding_t _encoding;
z_consolidation_mode_t _consolidation_mode;
z_query_target_t _target;
z_congestion_control_t _congestion_control;
z_priority_t _priority;
z_reliability_t reliability;
bool _is_express;
uint64_t _timeout_ms;
} _z_querier_t;

#if Z_FEATURE_QUERY == 1
// Warning: None of the sub-types require a non-0 initialization. Add a init function if it changes.
static inline _z_querier_t _z_querier_null(void) { return (_z_querier_t){0}; }
static inline bool _z_querier_check(const _z_querier_t *querier) { return !_Z_RC_IS_NULL(&querier->_zn); }
void _z_querier_clear(_z_querier_t *pub);
void _z_querier_free(_z_querier_t **pub);
void _z_querier_clear(_z_querier_t *querier);
void _z_querier_free(_z_querier_t **querier);
#endif

#ifdef __cplusplus
121 changes: 96 additions & 25 deletions src/api/api.c
Original file line number Diff line number Diff line change
@@ -1206,60 +1206,131 @@ z_result_t z_get(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr
}

void _z_querier_drop(_z_querier_t *querier) {
(void)querier;
// TODO(sashacmc): Implement
// _z_undeclare_querier(pub);
// _z_querier_clear(pub);
_z_undeclare_querier(querier);
_z_querier_clear(querier);
}

_Z_OWNED_FUNCTIONS_VALUE_NO_COPY_IMPL(_z_querier_t, querier, _z_querier_check, _z_querier_null, _z_querier_drop)

#ifdef Z_FEATURE_UNSTABLE_API
void z_querier_get_options_default(z_querier_get_options_t *options) {
options->encoding = NULL;
// TODO(sashacmc): Implement
options->attachment = NULL;
options->payload = NULL;
}

void z_querier_options_default(z_querier_options_t *options) {
options->target = z_query_target_default();
options->consolidation = z_query_consolidation_default();
options->congestion_control = Z_CONGESTION_CONTROL_DEFAULT;
options->priority = Z_PRIORITY_DEFAULT;
options->is_express = false;
// TODO(sashacmc): Implement
options->timeout_ms = Z_GET_TIMEOUT_DEFAULT;
}

z_result_t z_declare_querier(const z_loaned_session_t *zs, z_owned_querier_t *querier,
const z_loaned_keyexpr_t *keyexpr, z_querier_options_t *options) {
(void)zs;
(void)querier;
(void)keyexpr;
(void)options;
// TODO(sashacmc): Implement
_z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true);
_z_keyexpr_t key = keyexpr_aliased;

querier->_val = _z_querier_null();
// TODO: Currently, if resource declarations are done over multicast transports, the current protocol definition
// lacks a way to convey them to later-joining nodes. Thus, in the current version automatic
// resource declarations are only performed on unicast transports.
if (_Z_RC_IN_VAL(zs)->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) {
_z_resource_t *r = _z_get_resource_by_key(_Z_RC_IN_VAL(zs), &keyexpr_aliased);
if (r == NULL) {
uint16_t id = _z_declare_resource(_Z_RC_IN_VAL(zs), &keyexpr_aliased);
key = _z_keyexpr_from_string(id, &keyexpr_aliased._suffix);
}
}
// Set options
z_querier_options_t opt;
z_querier_options_default(&opt);
if (options != NULL) {
opt = *options;
}

// Set querier
_z_querier_t int_querier = _z_declare_querier(zs, key, opt.consolidation.mode, opt.congestion_control, opt.target,
opt.priority, opt.is_express, opt.timeout_ms);

querier->_val = int_querier;
return _Z_RES_OK;
}

z_result_t z_undeclare_querier(z_moved_querier_t *querier) {
(void)querier;
// TODO(sashacmc): Implement

return _Z_RES_OK;
z_result_t ret = _z_undeclare_querier(&querier->_this._val);
_z_querier_clear(&querier->_this._val);
return ret;
}

z_result_t z_querier_get(const z_loaned_querier_t *querier, const char *parameters, z_moved_closure_reply_t *callback,
z_querier_get_options_t *options) {
(void)querier;
(void)parameters;
(void)callback;
(void)options;
z_result_t ret = _Z_RES_OK;

// TODO(sashacmc): Implement
void *ctx = callback->_this._val.context;
callback->_this._val.context = NULL;

return _Z_RES_OK;
z_querier_get_options_t opt;
z_querier_get_options_default(&opt);
if (options != NULL) {
opt = *options;
}

_z_encoding_t encoding;
if (opt.encoding == NULL) {
_Z_RETURN_IF_ERR(_z_encoding_copy(&encoding, &querier->_encoding));
} else {
encoding = _z_encoding_steal(&opt.encoding->_this._val);
}
// Remove potentially redundant ke suffix
_z_keyexpr_t querier_keyexpr = _z_keyexpr_alias_from_user_defined(querier->_key, true);

_z_session_t *session = NULL;
// Try to upgrade session rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&querier->_zn);
if (!_Z_RC_IS_NULL(&sess_rc)) {
session = _Z_RC_IN_VAL(&sess_rc);
} else {
ret = _Z_ERR_SESSION_CLOSED;
}

z_consolidation_mode_t consolidation_mode = querier->_consolidation_mode;
if (consolidation_mode == Z_CONSOLIDATION_MODE_AUTO) {
const char *lp = (parameters == NULL) ? "" : parameters;
if (strstr(lp, Z_SELECTOR_TIME) != NULL) {
consolidation_mode = Z_CONSOLIDATION_MODE_NONE;
} else {
consolidation_mode = Z_CONSOLIDATION_MODE_LATEST;
}
}

if (session != NULL) {
_z_value_t value = {.payload = _z_bytes_from_owned_bytes(&opt.payload->_this),
.encoding = _z_encoding_from_owned(&opt.encoding->_this)};

ret = _z_query(session, querier_keyexpr, parameters, querier->_target, consolidation_mode, value,
callback->_this._val.call, callback->_this._val.drop, ctx, querier->_timeout_ms,
_z_bytes_from_owned_bytes(&opt.attachment->_this), querier->_congestion_control,
querier->_priority, querier->_is_express);
} else {
ret = _Z_ERR_SESSION_CLOSED;
}

_z_session_rc_drop(&sess_rc);

// Clean-up
z_bytes_drop(opt.payload);
z_encoding_drop(opt.encoding);
z_bytes_drop(opt.attachment);
z_internal_closure_reply_null(
&callback->_this); // call and drop passed to _z_query, so we nullify the closure here
return ret;
}

const z_loaned_keyexpr_t *z_querier_keyexpr(const z_loaned_querier_t *querier) {
(void)querier;
// TODO(sashacmc): Implement

return NULL;
return (const z_loaned_keyexpr_t *)&querier->_key;
}
#endif // Z_FEATURE_UNSTABLE_API

32 changes: 30 additions & 2 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
@@ -518,6 +518,33 @@ z_result_t _z_send_reply_err(const _z_query_t *query, const _z_session_rc_t *zsr
#endif

#if Z_FEATURE_QUERY == 1
/*------------------ Querier Declaration ------------------*/
_z_querier_t _z_declare_querier(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr,
z_consolidation_mode_t consolidation_mode, z_congestion_control_t congestion_control,
z_query_target_t target, z_priority_t priority, bool is_express, uint64_t timeout_ms) {
// Allocate querier
_z_querier_t ret;
// Fill querier
ret._key = _z_keyexpr_duplicate(&keyexpr);
ret._id = _z_get_entity_id(_Z_RC_IN_VAL(zn));
ret._consolidation_mode = consolidation_mode;
ret._congestion_control = congestion_control;
ret._target = target;
ret._priority = priority;
ret._is_express = is_express;
ret._timeout_ms = timeout_ms;
ret._zn = _z_session_rc_clone_as_weak(zn);
return ret;
}

z_result_t _z_undeclare_querier(_z_querier_t *querier) {
if (querier == NULL || _Z_RC_IS_NULL(&querier->_zn)) {
return _Z_ERR_ENTITY_UNKNOWN;
}
_z_undeclare_resource(_Z_RC_IN_VAL(&querier->_zn), querier->_key._id);
return _Z_RES_OK;
}

/*------------------ Query ------------------*/
z_result_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *parameters, const z_query_target_t target,
const z_consolidation_mode_t consolidation, _z_value_t value, _z_closure_reply_callback_t callback,
@@ -532,7 +559,7 @@ z_result_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *paramete
pq->_key = _z_get_expanded_key_from_key(zn, &keyexpr);
pq->_target = target;
pq->_consolidation = consolidation;
pq->_anykey = (strstr(parameters, Z_SELECTOR_QUERY_MATCH) == NULL) ? false : true;
pq->_anykey = (parameters == NULL || strstr(parameters, Z_SELECTOR_QUERY_MATCH) == NULL) ? false : true;
pq->_callback = callback;
pq->_dropper = dropper;
pq->_pending_replies = NULL;
@@ -542,7 +569,8 @@ z_result_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *paramete

ret = _z_register_pending_query(zn, pq); // Add the pending query to the current session
if (ret == _Z_RES_OK) {
_z_slice_t params = _z_slice_alias_buf((uint8_t *)parameters, strlen(parameters));
_z_slice_t params =
(parameters == NULL) ? _z_slice_null() : _z_slice_alias_buf((uint8_t *)parameters, strlen(parameters));
_z_zenoh_message_t z_msg = _z_msg_make_query(&keyexpr, &params, pq->_id, pq->_consolidation, &value,
timeout_ms, attachment, cong_ctrl, priority, is_express);

21 changes: 20 additions & 1 deletion src/net/query.c
Original file line number Diff line number Diff line change
@@ -14,10 +14,28 @@
#include "zenoh-pico/net/query.h"

#include "zenoh-pico/net/session.h"
#include "zenoh-pico/session/utils.h"
#include "zenoh-pico/transport/common/tx.h"
#include "zenoh-pico/utils/logging.h"

#if Z_FEATURE_QUERY == 1
void _z_querier_clear(_z_querier_t *querier) {
_z_keyexpr_clear(&querier->_key);
_z_session_weak_drop(&querier->_zn);
_z_encoding_clear(&querier->_encoding);
*querier = _z_querier_null();
}

void _z_querier_free(_z_querier_t **querier) {
_z_querier_t *ptr = *querier;

if (ptr != NULL) {
_z_querier_clear(ptr);

z_free(ptr);
*querier = NULL;
}
}

static void _z_query_clear_inner(_z_query_t *q) {
_z_keyexpr_clear(&q->_key);
_z_value_clear(&q->_value);
@@ -56,6 +74,7 @@ void _z_query_free(_z_query_t **query) {
*query = NULL;
}
}
#endif

#if Z_FEATURE_QUERYABLE == 1
void _z_queryable_clear(_z_queryable_t *qbl) {