Skip to content

Commit

Permalink
Add z_publisher_keyexpr and z_subscriber_keyexpr methods (#588)
Browse files Browse the repository at this point in the history
* feat: rework pub sub keyexpr functions

* feat: store ke suffix in publisher

* fix: avoid suffix duplication
  • Loading branch information
jean-roland authored Aug 13, 2024
1 parent 0944a5d commit e322116
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 24 deletions.
22 changes: 15 additions & 7 deletions include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -1737,8 +1737,6 @@ int8_t z_declare_publisher(z_owned_publisher_t *pub, const z_loaned_session_t *z
*/
int8_t z_undeclare_publisher(z_moved_publisher_t pub);

z_owned_keyexpr_t z_publisher_keyexpr(z_loaned_publisher_t *publisher);

/**
* Builds a :c:type:`z_publisher_put_options_t` with default values.
*
Expand Down Expand Up @@ -1780,6 +1778,17 @@ int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_moved_bytes_t payload,
* ``0`` if delete operation successful, ``negative value`` otherwise.
*/
int8_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher_delete_options_t *options);

/**
* Gets the keyexpr from a publisher.
*
* Parameters:
* publisher: Pointer to a :c:type:`z_loaned_publisher_t` to get the keyexpr from.
*
* Return:
* The keyexpr wrapped as a :c:type:`z_loaned_keyexpr_t`.
*/
const z_loaned_keyexpr_t *z_publisher_keyexpr(const z_loaned_publisher_t *publisher);
#endif

#if Z_FEATURE_QUERY == 1
Expand Down Expand Up @@ -2093,16 +2102,15 @@ int8_t z_declare_subscriber(z_owned_subscriber_t *sub, const z_loaned_session_t
int8_t z_undeclare_subscriber(z_moved_subscriber_t sub);

/**
* Copies the keyexpr of a subscriber
* Gets the keyexpr from a subscriber.
*
* Parameters:
* keyexpr: Pointer to an uninitialized :c:type:`z_owned_keyexpr_t` to contain the keyexpr.
* sub: Pointer to a :c:type:`z_loaned_subscriber_t` to copy the keyexpr from.
* subscriber: Pointer to a :c:type:`z_loaned_subscriber_t` to get the keyexpr from.
*
* Return:
* ``0`` if copy successful, ``negative value`` otherwise.
* The keyexpr wrapped as a :c:type:`z_loaned_keyexpr_t`.
*/
int8_t z_subscriber_keyexpr(z_owned_keyexpr_t *keyexpr, z_loaned_subscriber_t *sub);
const z_loaned_keyexpr_t *z_subscriber_keyexpr(const z_loaned_subscriber_t *subscriber);
#endif

/************* Multi Thread Tasks helpers **************/
Expand Down
39 changes: 22 additions & 17 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1141,7 +1141,7 @@ int8_t z_declare_publisher(z_owned_publisher_t *pub, const z_loaned_session_t *z
_z_resource_t *r = _z_get_resource_by_key(_Z_RC_IN_VAL(zs), &keyexpr_aliased);
if (r == NULL) {
uint16_t id = _z_declare_resource(_Z_RC_IN_VAL(zs), keyexpr_aliased);
key = _z_rid_with_suffix(id, NULL);
key = _z_rid_with_suffix(id, keyexpr_aliased._suffix);
}
}
// Set options
Expand Down Expand Up @@ -1192,17 +1192,22 @@ int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_moved_bytes_t payload,
} else {
encoding = _z_encoding_steal(&opt.encoding._ptr->_val);
}
// Remove potentially redundant ke suffix
_z_keyexpr_t pub_keyexpr = _z_keyexpr_alias(pub->_key);
if (pub_keyexpr._id != Z_RESOURCE_ID_NONE) {
pub_keyexpr._suffix = NULL;
}

// Check if write filter is active before writing
if (!_z_write_filter_active(pub)) {
// Write value
ret = _z_write(_Z_RC_IN_VAL(&pub->_zn), pub->_key, _z_bytes_from_owned_bytes(payload._ptr), &encoding,
ret = _z_write(_Z_RC_IN_VAL(&pub->_zn), pub_keyexpr, _z_bytes_from_owned_bytes(payload._ptr), &encoding,
Z_SAMPLE_KIND_PUT, pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp,
_z_bytes_from_owned_bytes(opt.attachment._ptr));
}
// Trigger local subscriptions
_z_trigger_local_subscriptions(
_Z_RC_IN_VAL(&pub->_zn), pub->_key, _z_bytes_from_owned_bytes(payload._ptr), &encoding,
_Z_RC_IN_VAL(&pub->_zn), pub_keyexpr, _z_bytes_from_owned_bytes(payload._ptr), &encoding,
_z_n_qos_make(pub->_is_express, pub->_congestion_control == Z_CONGESTION_CONTROL_BLOCK, pub->_priority),
opt.timestamp, _z_bytes_from_owned_bytes(opt.attachment._ptr));
// Clean-up
Expand All @@ -1219,14 +1224,18 @@ int8_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher_del
if (options != NULL) {
opt.timestamp = options->timestamp;
}
return _z_write(_Z_RC_IN_VAL(&pub->_zn), pub->_key, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE,
// Remove potentially redundant ke suffix
_z_keyexpr_t pub_keyexpr = _z_keyexpr_alias(pub->_key);
if (pub_keyexpr._id != Z_RESOURCE_ID_NONE) {
pub_keyexpr._suffix = NULL;
}

return _z_write(_Z_RC_IN_VAL(&pub->_zn), pub_keyexpr, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE,
pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp, _z_bytes_null());
}

z_owned_keyexpr_t z_publisher_keyexpr(z_loaned_publisher_t *publisher) {
z_owned_keyexpr_t ret;
ret._val = _z_keyexpr_duplicate(publisher->_key);
return ret;
const z_loaned_keyexpr_t *z_publisher_keyexpr(const z_loaned_publisher_t *publisher) {
return (const z_loaned_keyexpr_t *)&publisher->_key;
}
#endif

Expand Down Expand Up @@ -1608,22 +1617,18 @@ int8_t z_declare_subscriber(z_owned_subscriber_t *sub, const z_loaned_session_t

int8_t z_undeclare_subscriber(z_moved_subscriber_t sub) { return _z_undeclare_and_clear_subscriber(&sub._ptr->_val); }

int8_t z_subscriber_keyexpr(z_owned_keyexpr_t *keyexpr, z_loaned_subscriber_t *sub) {
// Init keyexpr
z_keyexpr_null(keyexpr);
if ((keyexpr == NULL) || (sub == NULL)) {
return _Z_ERR_GENERIC;
}
const z_loaned_keyexpr_t *z_subscriber_keyexpr(const z_loaned_subscriber_t *sub) {
// Retrieve keyexpr from session
uint32_t lookup = sub->_entity_id;
_z_subscription_rc_list_t *tail = _Z_RC_IN_VAL(&sub->_zn)->_local_subscriptions;
while (tail != NULL && !z_keyexpr_check(keyexpr)) {
while (tail != NULL) {
_z_subscription_rc_t *head = _z_subscription_rc_list_head(tail);
if (_Z_RC_IN_VAL(head)->_id == lookup) {
_Z_RETURN_IF_ERR(_z_keyexpr_copy(&keyexpr->_val, &_Z_RC_IN_VAL(head)->_key));
return (const z_loaned_keyexpr_t *)&_Z_RC_IN_VAL(head)->_key;
}
tail = _z_subscription_rc_list_tail(tail);
}
return _Z_RES_OK;
return NULL;
}
#endif

Expand Down

0 comments on commit e322116

Please sign in to comment.