Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement querier #858

Merged
merged 9 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1213,6 +1213,46 @@ See details at :ref:`owned_types_concept`
.. c:function:: void z_reply_clone(z_owned_reply_t * dst, const z_loaned_reply_t * reply)
.. c:function:: const z_loaned_reply_t * z_reply_loan(const z_owned_reply_t * reply)

Querier
=======

Represents a Zenoh Querier entity.

Types
-----

See details at :ref:`owned_types_concept`

.. c:type:: z_owned_querier_t
.. c:type:: z_loaned_querier_t
.. c:type:: z_moved_querier_t

Option Types
------------

.. autoctype:: types.h::z_querier_options_t
.. autoctype:: types.h::z_querier_get_options_t

Constants
---------

Functions
---------
.. autocfunction:: primitives.h::z_declare_querier
.. autocfunction:: primitives.h::z_undeclare_querier
.. autocfunction:: primitives.h::z_querier_get
.. autocfunction:: primitives.h::z_querier_keyexpr

.. autocfunction:: primitives.h::z_querier_options_default
.. autocfunction:: primitives.h::z_querier_get_options_default

Ownership Functions
-------------------

See details at :ref:`owned_types_concept`

.. c:function:: const z_loaned_querier_t * z_querier_loan(const z_owned_querier_t * closure)
.. c:function:: void z_querier_drop(z_moved_querier_t * closure)

Scouting
========
Expand Down
1 change: 1 addition & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ if(UNIX)
add_example(z_get_channel unix/c11/z_get_channel.c)
add_example(z_get_attachment unix/c11/z_get_attachment.c)
add_example(z_get_liveliness unix/c11/z_get_liveliness.c)
add_example(z_querier unix/c11/z_querier.c)
add_example(z_queryable unix/c11/z_queryable.c)
add_example(z_queryable_channel unix/c11/z_queryable_channel.c)
add_example(z_queryable_attachment unix/c11/z_queryable_attachment.c)
Expand Down
174 changes: 174 additions & 0 deletions examples/unix/c11/z_querier.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
//
// Copyright (c) 2025 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>

#include <limits.h>
#include <stddef.h>
#include <stdio.h>
#include <unistd.h>
#include <zenoh-pico.h>

#if Z_FEATURE_QUERY == 1 && Z_FEATURE_MULTI_THREAD == 1 && defined Z_FEATURE_UNSTABLE_API

int main(int argc, char **argv) {
const char *selector = "demo/example/**";
const char *mode = "client";
const char *clocator = NULL;
const char *llocator = NULL;
const char *value = NULL;
int n = INT_MAX;
int timeout_ms = 0;

int opt;
while ((opt = getopt(argc, argv, "s:e:m:v:l:n:t:")) != -1) {
switch (opt) {
case 's':
selector = optarg;
break;
case 'e':
clocator = optarg;
break;
case 'm':
mode = optarg;
break;
case 'l':
llocator = optarg;
break;
case 'v':
value = optarg;
break;
case 'n':
n = atoi(optarg);
break;
case 't':
timeout_ms = atoi(optarg);
break;
case '?':
if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l' ||
optopt == 'n' || optopt == 't') {
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
} else {
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
}
return 1;
default:
return -1;
}
}

z_owned_config_t config;
z_config_default(&config);
zp_config_insert(z_loan_mut(config), Z_CONFIG_MODE_KEY, mode);
if (clocator != NULL) {
zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, clocator);
}
if (llocator != NULL) {
zp_config_insert(z_loan_mut(config), Z_CONFIG_LISTEN_KEY, llocator);
}

printf("Opening session...\n");
z_owned_session_t s;
if (z_open(&s, z_move(config), NULL) < 0) {
printf("Unable to open session!\n");
return -1;
}

// Start read and lease tasks for zenoh-pico
if (zp_start_read_task(z_loan_mut(s), NULL) < 0 || zp_start_lease_task(z_loan_mut(s), NULL) < 0) {
printf("Unable to start read and lease tasks\n");
z_session_drop(z_session_move(&s));
return -1;
}

const char *ke = selector;
size_t ke_len = strlen(ke);
const char *params = strchr(selector, '?');
if (params != NULL) {
ke_len = params - ke;
params += 1;
}

z_view_keyexpr_t keyexpr;
if (z_view_keyexpr_from_substr(&keyexpr, ke, ke_len) < 0) {
printf("%.*s is not a valid key expression", (int)ke_len, ke);
exit(-1);
}

printf("Declaring Querier on '%s'...\n", ke);
z_owned_querier_t querier;

z_querier_options_t opts;
z_querier_options_default(&opts);
opts.timeout_ms = timeout_ms;

if (z_declare_querier(z_loan(s), &querier, z_loan(keyexpr), &opts) < 0) {
printf("Unable to declare Querier for key expression!\n");
exit(-1);
}

printf("Press CTRL-C to quit...\n");
char buf[256];
for (int idx = 0; idx != n; ++idx) {
z_sleep_s(1);
sprintf(buf, "[%4d] %s", idx, value ? value : "");
printf("Querying '%s' with payload '%s'...\n", selector, buf);
z_querier_get_options_t get_options;
z_querier_get_options_default(&get_options);

if (value != NULL) {
z_owned_bytes_t payload;
z_bytes_copy_from_str(&payload, buf);
get_options.payload = z_move(payload);
}

z_owned_fifo_handler_reply_t handler;
z_owned_closure_reply_t closure;
z_fifo_channel_reply_new(&closure, &handler, 16);

z_querier_get(z_loan(querier), params, z_move(closure), &get_options);

z_owned_reply_t reply;
for (z_result_t res = z_recv(z_loan(handler), &reply); res == Z_OK; res = z_recv(z_loan(handler), &reply)) {
if (z_reply_is_ok(z_loan(reply))) {
const z_loaned_sample_t *sample = z_reply_ok(z_loan(reply));

z_view_string_t keystr;
z_keyexpr_as_view_string(z_sample_keyexpr(sample), &keystr);

z_owned_string_t replystr;
z_bytes_to_string(z_sample_payload(sample), &replystr);

printf(">> Received ('%.*s': '%.*s')\n", (int)z_string_len(z_loan(keystr)),
z_string_data(z_loan(keystr)), (int)z_string_len(z_loan(replystr)),
z_string_data(z_loan(replystr)));
z_drop(z_move(replystr));
} else {
printf(">> Received an error\n");
}
z_drop(z_move(reply));
}
z_drop(z_move(handler));
}

z_drop(z_move(querier));
z_drop(z_move(s));

return 0;
}
#else
int main(void) {
printf(
"ERROR: Zenoh pico was compiled without Z_FEATURE_QUERY or Z_FEATURE_MULTI_THREAD but this example requires "
"them.\n");
return -2;
}
#endif
22 changes: 22 additions & 0 deletions include/zenoh-pico/api/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
z_owned_session_t : z_session_loan, \
z_owned_subscriber_t : z_subscriber_loan, \
z_owned_publisher_t : z_publisher_loan, \
z_owned_querier_t : z_querier_loan, \
z_owned_matching_listener_t : z_matching_listener_loan, \
z_owned_queryable_t : z_queryable_loan, \
z_owned_liveliness_token_t : z_liveliness_token_loan, \
Expand Down Expand Up @@ -82,6 +83,7 @@
z_owned_config_t : z_config_loan_mut, \
z_owned_session_t : z_session_loan_mut, \
z_owned_publisher_t : z_publisher_loan_mut, \
z_owned_querier_t : z_querier_loan_mut, \
z_owned_matching_listener_t : z_matching_listener_loan_mut, \
z_owned_queryable_t : z_queryable_loan_mut, \
z_owned_liveliness_token_t : z_liveliness_token_loan_mut, \
Expand Down Expand Up @@ -116,6 +118,7 @@
z_moved_session_t* : z_session_drop, \
z_moved_subscriber_t* : z_subscriber_drop, \
z_moved_publisher_t* : z_publisher_drop, \
z_moved_querier_t* : z_querier_drop, \
z_moved_matching_listener_t* : z_matching_listener_drop, \
z_moved_queryable_t* : z_queryable_drop, \
z_moved_liveliness_token_t* : z_liveliness_token_drop, \
Expand Down Expand Up @@ -165,6 +168,7 @@
z_owned_session_t : z_internal_session_check, \
z_owned_subscriber_t : z_internal_subscriber_check, \
z_owned_publisher_t : z_internal_publisher_check, \
z_owned_querier_t : z_internal_querier_check, \
z_owned_matching_listener_t : z_internal_matching_listener_check, \
z_owned_queryable_t : z_internal_queryable_check, \
z_owned_liveliness_token_t : z_internal_liveliness_token_check, \
Expand Down Expand Up @@ -237,6 +241,7 @@
z_owned_session_t : z_session_move, \
z_owned_subscriber_t : z_subscriber_move, \
z_owned_publisher_t : z_publisher_move, \
z_owned_querier_t : z_querier_move, \
z_owned_matching_listener_t: z_matching_listener_move, \
z_owned_queryable_t : z_queryable_move, \
z_owned_liveliness_token_t : z_liveliness_token_move, \
Expand Down Expand Up @@ -298,6 +303,7 @@
z_owned_keyexpr_t *: z_keyexpr_take, \
z_owned_mutex_t *: z_mutex_take, \
z_owned_publisher_t *: z_publisher_take, \
z_owned_querier_t *: z_querier_take, \
z_owned_matching_listener_t *: z_matching_listener_take, \
z_owned_query_t *: z_query_take, \
z_owned_queryable_t *: z_queryable_take, \
Expand Down Expand Up @@ -351,6 +357,7 @@
#define z_internal_null(x) _Generic((x), \
z_owned_session_t * : z_internal_session_null, \
z_owned_publisher_t * : z_internal_publisher_null, \
z_owned_querier_t * : z_internal_querier_null, \
z_owned_matching_listener_t * : z_internal_matching_listener_null, \
z_owned_keyexpr_t * : z_internal_keyexpr_null, \
z_owned_config_t * : z_internal_config_null, \
Expand Down Expand Up @@ -410,6 +417,7 @@ inline const z_loaned_config_t* z_loan(const z_owned_config_t& x) { return z_con
inline const z_loaned_session_t* z_loan(const z_owned_session_t& x) { return z_session_loan(&x); }
inline const z_loaned_subscriber_t* z_loan(const z_owned_subscriber_t& x) { return z_subscriber_loan(&x); }
inline const z_loaned_publisher_t* z_loan(const z_owned_publisher_t& x) { return z_publisher_loan(&x); }
inline const z_loaned_querier_t* z_loan(const z_owned_querier_t& x) { return z_querier_loan(&x); }
inline const z_loaned_matching_listener_t* z_loan(const z_owned_matching_listener_t& x) { return z_matching_listener_loan(&x); }
inline const z_loaned_queryable_t* z_loan(const z_owned_queryable_t& x) { return z_queryable_loan(&x); }
inline const z_loaned_liveliness_token_t* z_loan(const z_owned_liveliness_token_t& x) { return z_liveliness_token_loan(&x); }
Expand Down Expand Up @@ -449,6 +457,7 @@ inline z_loaned_keyexpr_t* z_loan_mut(z_view_keyexpr_t& x) { return z_view_keyex
inline z_loaned_config_t* z_loan_mut(z_owned_config_t& x) { return z_config_loan_mut(&x); }
inline z_loaned_session_t* z_loan_mut(z_owned_session_t& x) { return z_session_loan_mut(&x); }
inline z_loaned_publisher_t* z_loan_mut(z_owned_publisher_t& x) { return z_publisher_loan_mut(&x); }
inline z_loaned_querier_t* z_loan_mut(z_owned_querier_t& x) { return z_querier_loan_mut(&x); }
inline z_loaned_matching_listener_t* z_loan_mut(z_owned_matching_listener_t& x) { return z_matching_listener_loan_mut(&x); }
inline z_loaned_queryable_t* z_loan_mut(z_owned_queryable_t& x) { return z_queryable_loan_mut(&x); }
inline z_loaned_liveliness_token_t* z_loan_mut(z_owned_liveliness_token_t& x) { return z_liveliness_token_loan_mut(&x); }
Expand All @@ -474,6 +483,7 @@ inline ze_loaned_serializer_t* z_loan_mut(ze_owned_serializer_t& x) { return ze_
// z_drop definition
inline void z_drop(z_moved_session_t* v) { z_session_drop(v); }
inline void z_drop(z_moved_publisher_t* v) { z_publisher_drop(v); }
inline void z_drop(z_moved_querier_t* v) { z_querier_drop(v); }
inline void z_drop(z_moved_matching_listener_t* v) { z_matching_listener_drop(v); }
inline void z_drop(z_moved_keyexpr_t* v) { z_keyexpr_drop(v); }
inline void z_drop(z_moved_config_t* v) { z_config_drop(v); }
Expand Down Expand Up @@ -510,6 +520,7 @@ inline void z_drop(ze_moved_serializer_t* v) { ze_serializer_drop(v); }
// z_internal_null definition
inline void z_internal_null(z_owned_session_t* v) { z_internal_session_null(v); }
inline void z_internal_null(z_owned_publisher_t* v) { z_internal_publisher_null(v); }
inline void z_internal_null(z_owned_querier_t* v) { z_internal_querier_null(v); }
inline void z_internal_null(z_owned_matching_listener_t* v) { z_internal_matching_listener_null(v); }
inline void z_internal_null(z_owned_keyexpr_t* v) { z_internal_keyexpr_null(v); }
inline void z_internal_null(z_owned_config_t* v) { z_internal_config_null(v); }
Expand Down Expand Up @@ -542,6 +553,7 @@ inline void z_internal_null(ze_owned_serializer_t* v) { return ze_internal_seria
// z_internal_check definition
inline bool z_internal_check(const z_owned_session_t& v) { return z_internal_session_check(&v); }
inline bool z_internal_check(const z_owned_publisher_t& v) { return z_internal_publisher_check(&v); }
inline bool z_internal_check(const z_owned_querier_t& v) { return z_internal_querier_check(&v); }
inline bool z_internal_check(const z_owned_matching_listener_t& v) { return z_internal_matching_listener_check(&v); }
inline bool z_internal_check(const z_owned_keyexpr_t& v) { return z_internal_keyexpr_check(&v); }
inline bool z_internal_check(const z_owned_config_t& v) { return z_internal_config_check(&v); }
Expand Down Expand Up @@ -690,6 +702,7 @@ inline z_moved_reply_err_t* z_move(z_owned_reply_err_t& x) { return z_reply_err_
inline z_moved_hello_t* z_move(z_owned_hello_t& x) { return z_hello_move(&x); }
inline z_moved_keyexpr_t* z_move(z_owned_keyexpr_t& x) { return z_keyexpr_move(&x); }
inline z_moved_publisher_t* z_move(z_owned_publisher_t& x) { return z_publisher_move(&x); }
inline z_moved_querier_t* z_move(z_owned_querier_t& x) { return z_querier_move(&x); }
inline z_moved_matching_listener_t* z_move(z_owned_matching_listener_t& x) { return z_matching_listener_move(&x); }
inline z_moved_query_t* z_move(z_owned_query_t& x) { return z_query_move(&x); }
inline z_moved_queryable_t* z_move(z_owned_queryable_t& x) { return z_queryable_move(&x); }
Expand Down Expand Up @@ -717,6 +730,7 @@ inline ze_moved_serializer_t* z_move(ze_owned_serializer_t& x) { return ze_seria
// z_take definition
inline void z_take(z_owned_session_t* this_, z_moved_session_t* v) { return z_session_take(this_, v); }
inline void z_take(z_owned_publisher_t* this_, z_moved_publisher_t* v) { return z_publisher_take(this_, v); }
inline void z_take(z_owned_querier_t* this_, z_moved_querier_t* v) { return z_querier_take(this_, v); }
inline void z_take(z_owned_matching_listener_t* this_, z_moved_matching_listener_t* v) {
return z_matching_listener_take(this_, v);
}
Expand Down Expand Up @@ -847,6 +861,14 @@ struct z_owned_to_loaned_type_t<z_owned_publisher_t> {
typedef z_loaned_publisher_t type;
};
template <>
struct z_loaned_to_owned_type_t<z_loaned_querier_t> {
typedef z_owned_querier_t type;
};
template <>
struct z_owned_to_loaned_type_t<z_owned_querier_t> {
typedef z_loaned_querier_t type;
};
template <>
struct z_loaned_to_owned_type_t<z_loaned_matching_listener_t> {
typedef z_owned_matching_listener_t type;
};
Expand Down
Loading
Loading