Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add z_publisher_keyexpr and z_subscriber_keyexpr methods #588

Merged
merged 3 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -1737,8 +1737,6 @@ int8_t z_declare_publisher(z_owned_publisher_t *pub, const z_loaned_session_t *z
*/
int8_t z_undeclare_publisher(z_moved_publisher_t pub);

z_owned_keyexpr_t z_publisher_keyexpr(z_loaned_publisher_t *publisher);

/**
* Builds a :c:type:`z_publisher_put_options_t` with default values.
*
Expand Down Expand Up @@ -1780,6 +1778,17 @@ int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_moved_bytes_t payload,
* ``0`` if delete operation successful, ``negative value`` otherwise.
*/
int8_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher_delete_options_t *options);

/**
* Gets the keyexpr from a publisher.
*
* Parameters:
* publisher: Pointer to a :c:type:`z_loaned_publisher_t` to get the keyexpr from.
*
* Return:
* The keyexpr wrapped as a :c:type:`z_loaned_keyexpr_t`.
*/
const z_loaned_keyexpr_t *z_publisher_keyexpr(const z_loaned_publisher_t *publisher);
#endif

#if Z_FEATURE_QUERY == 1
Expand Down Expand Up @@ -2093,16 +2102,15 @@ int8_t z_declare_subscriber(z_owned_subscriber_t *sub, const z_loaned_session_t
int8_t z_undeclare_subscriber(z_moved_subscriber_t sub);

/**
* Copies the keyexpr of a subscriber
* Gets the keyexpr from a subscriber.
*
* Parameters:
* keyexpr: Pointer to an uninitialized :c:type:`z_owned_keyexpr_t` to contain the keyexpr.
* sub: Pointer to a :c:type:`z_loaned_subscriber_t` to copy the keyexpr from.
* subscriber: Pointer to a :c:type:`z_loaned_subscriber_t` to get the keyexpr from.
*
* Return:
* ``0`` if copy successful, ``negative value`` otherwise.
* The keyexpr wrapped as a :c:type:`z_loaned_keyexpr_t`.
*/
int8_t z_subscriber_keyexpr(z_owned_keyexpr_t *keyexpr, z_loaned_subscriber_t *sub);
const z_loaned_keyexpr_t *z_subscriber_keyexpr(const z_loaned_subscriber_t *subscriber);
#endif

/************* Multi Thread Tasks helpers **************/
Expand Down
39 changes: 22 additions & 17 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1141,7 +1141,7 @@ int8_t z_declare_publisher(z_owned_publisher_t *pub, const z_loaned_session_t *z
_z_resource_t *r = _z_get_resource_by_key(_Z_RC_IN_VAL(zs), &keyexpr_aliased);
if (r == NULL) {
uint16_t id = _z_declare_resource(_Z_RC_IN_VAL(zs), keyexpr_aliased);
key = _z_rid_with_suffix(id, NULL);
key = _z_rid_with_suffix(id, keyexpr_aliased._suffix);
}
}
// Set options
Expand Down Expand Up @@ -1192,17 +1192,22 @@ int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_moved_bytes_t payload,
} else {
encoding = _z_encoding_steal(&opt.encoding._ptr->_val);
}
// Remove potentially redundant ke suffix
_z_keyexpr_t pub_keyexpr = _z_keyexpr_alias(pub->_key);
if (pub_keyexpr._id != Z_RESOURCE_ID_NONE) {
pub_keyexpr._suffix = NULL;
}

// Check if write filter is active before writing
if (!_z_write_filter_active(pub)) {
// Write value
ret = _z_write(_Z_RC_IN_VAL(&pub->_zn), pub->_key, _z_bytes_from_owned_bytes(payload._ptr), &encoding,
ret = _z_write(_Z_RC_IN_VAL(&pub->_zn), pub_keyexpr, _z_bytes_from_owned_bytes(payload._ptr), &encoding,
Z_SAMPLE_KIND_PUT, pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp,
_z_bytes_from_owned_bytes(opt.attachment._ptr));
}
// Trigger local subscriptions
_z_trigger_local_subscriptions(
_Z_RC_IN_VAL(&pub->_zn), pub->_key, _z_bytes_from_owned_bytes(payload._ptr), &encoding,
_Z_RC_IN_VAL(&pub->_zn), pub_keyexpr, _z_bytes_from_owned_bytes(payload._ptr), &encoding,
_z_n_qos_make(pub->_is_express, pub->_congestion_control == Z_CONGESTION_CONTROL_BLOCK, pub->_priority),
opt.timestamp, _z_bytes_from_owned_bytes(opt.attachment._ptr));
// Clean-up
Expand All @@ -1219,14 +1224,18 @@ int8_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher_del
if (options != NULL) {
opt.timestamp = options->timestamp;
}
return _z_write(_Z_RC_IN_VAL(&pub->_zn), pub->_key, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE,
// Remove potentially redundant ke suffix
_z_keyexpr_t pub_keyexpr = _z_keyexpr_alias(pub->_key);
if (pub_keyexpr._id != Z_RESOURCE_ID_NONE) {
pub_keyexpr._suffix = NULL;
}

return _z_write(_Z_RC_IN_VAL(&pub->_zn), pub_keyexpr, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE,
pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp, _z_bytes_null());
}

z_owned_keyexpr_t z_publisher_keyexpr(z_loaned_publisher_t *publisher) {
z_owned_keyexpr_t ret;
ret._val = _z_keyexpr_duplicate(publisher->_key);
return ret;
const z_loaned_keyexpr_t *z_publisher_keyexpr(const z_loaned_publisher_t *publisher) {
return (const z_loaned_keyexpr_t *)&publisher->_key;
}
#endif

Expand Down Expand Up @@ -1608,22 +1617,18 @@ int8_t z_declare_subscriber(z_owned_subscriber_t *sub, const z_loaned_session_t

int8_t z_undeclare_subscriber(z_moved_subscriber_t sub) { return _z_undeclare_and_clear_subscriber(&sub._ptr->_val); }

int8_t z_subscriber_keyexpr(z_owned_keyexpr_t *keyexpr, z_loaned_subscriber_t *sub) {
// Init keyexpr
z_keyexpr_null(keyexpr);
if ((keyexpr == NULL) || (sub == NULL)) {
return _Z_ERR_GENERIC;
}
const z_loaned_keyexpr_t *z_subscriber_keyexpr(const z_loaned_subscriber_t *sub) {
// Retrieve keyexpr from session
uint32_t lookup = sub->_entity_id;
_z_subscription_rc_list_t *tail = _Z_RC_IN_VAL(&sub->_zn)->_local_subscriptions;
while (tail != NULL && !z_keyexpr_check(keyexpr)) {
while (tail != NULL) {
_z_subscription_rc_t *head = _z_subscription_rc_list_head(tail);
if (_Z_RC_IN_VAL(head)->_id == lookup) {
_Z_RETURN_IF_ERR(_z_keyexpr_copy(&keyexpr->_val, &_Z_RC_IN_VAL(head)->_key));
return (const z_loaned_keyexpr_t *)&_Z_RC_IN_VAL(head)->_key;
}
tail = _z_subscription_rc_list_tail(tail);
}
return _Z_RES_OK;
return NULL;
}
#endif

Expand Down
Loading