Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add matching subscribers #853

Merged
merged 3 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ jobs:
BUILD_TESTING: OFF
BUILD_MULTICAST: OFF
BUILD_INTEGRATION: ON
Z_FEATURE_UNSTABLE_API: 1

- name: Test debug
run: make test
Expand All @@ -45,4 +46,5 @@ jobs:
BUILD_TESTING: OFF # Workaround for Windows as it seems the previous step is being ignored
BUILD_MULTICAST: OFF # Workaround for Windows as it seems the previous step is being ignored
BUILD_INTEGRATION: ON # Workaround for Windows as it seems the previous step is being ignored
Z_FEATURE_UNSTABLE_API: 1
ZENOH_BRANCH: main
10 changes: 10 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ set(Z_FEATURE_TCP_NODELAY 1 CACHE STRING "Toggle TCP_NODELAY")
set(Z_FEATURE_LOCAL_SUBSCRIBER 0 CACHE STRING "Toggle local subscriptions")
set(Z_FEATURE_PUBLISHER_SESSION_CHECK 1 CACHE STRING "Toggle publisher session check")
set(Z_FEATURE_BATCHING 1 CACHE STRING "Toggle batching")
set(Z_FEATURE_MATCHING 1 CACHE STRING "Toggle matching feature")
set(Z_FEATURE_RX_CACHE 0 CACHE STRING "Toggle RX_CACHE")
set(Z_FEATURE_AUTO_RECONNECT 1 CACHE STRING "Toggle automatic reconnection")

Expand All @@ -252,6 +253,11 @@ if(Z_FEATURE_LINK_SERIAL_USB AND NOT Z_FEATURE_UNSTABLE_API)
set(Z_FEATURE_LINK_SERIAL_USB 0 CACHE STRING "Toggle Serial USB links" FORCE)
endif()

if(Z_FEATURE_MATCHING AND NOT Z_FEATURE_UNSTABLE_API)
message(WARNING "Z_FEATURE_MATCHING can only be enabled when Z_FEATURE_UNSTABLE_API is also enabled. Disabling Z_FEATURE_MATCHING.")
set(Z_FEATURE_MATCHING 0 CACHE STRING "Toggle matching feature" FORCE)
endif()

add_compile_definitions("Z_BUILD_DEBUG=$<CONFIG:Debug>")
message(STATUS "Building with feature confing:\n\
* UNSTABLE_API: ${Z_FEATURE_UNSTABLE_API}\n\
Expand All @@ -263,6 +269,7 @@ message(STATUS "Building with feature confing:\n\
* LIVELINESS: ${Z_FEATURE_LIVELINESS}\n\
* INTEREST: ${Z_FEATURE_INTEREST}\n\
* AUTO_RECONNECT: ${Z_FEATURE_AUTO_RECONNECT}\n\
* MATCHING: ${Z_FEATURE_MATCHING}\n\
* RAWETH: ${Z_FEATURE_RAWETH_TRANSPORT}")

configure_file(
Expand Down Expand Up @@ -572,11 +579,13 @@ if(UNIX OR MSVC)
add_executable(z_api_alignment_test ${PROJECT_SOURCE_DIR}/tests/z_api_alignment_test.c)
add_executable(z_session_test ${PROJECT_SOURCE_DIR}/tests/z_session_test.c)
add_executable(z_api_liveliness_test ${PROJECT_SOURCE_DIR}/tests/z_api_liveliness_test.c)
add_executable(z_api_matching_test ${PROJECT_SOURCE_DIR}/tests/z_api_matching_test.c)

target_link_libraries(z_client_test zenohpico::lib)
target_link_libraries(z_api_alignment_test zenohpico::lib)
target_link_libraries(z_session_test zenohpico::lib)
target_link_libraries(z_api_liveliness_test zenohpico::lib)
target_link_libraries(z_api_matching_test zenohpico::lib)

configure_file(${PROJECT_SOURCE_DIR}/tests/routed.sh ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/routed.sh COPYONLY)
configure_file(${PROJECT_SOURCE_DIR}/tests/api.sh ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh COPYONLY)
Expand All @@ -586,6 +595,7 @@ if(UNIX OR MSVC)
add_test(z_api_alignment_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_api_alignment_test)
add_test(z_session_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_session_test)
add_test(z_api_liveliness_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_api_liveliness_test)
add_test(z_api_matching_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_api_matching_test)
endif()
endif()
endif()
Expand Down
58 changes: 58 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,41 @@ See details at :ref:`owned_types_concept`
.. c:function:: void z_closure_zid_drop(z_moved_closure_zid_t * closure)


Matching closure
----------
Types
^^^^^

See details at :ref:`owned_types_concept`

.. c:type:: z_owned_closure_matching_status_t
.. c:type:: z_loaned_closure_matching_status_t
.. c:type:: z_moved_closure_matching_status_t

.. c:type:: void (* z_closure_matching_status_callback_t)(z_matching_status_t * status, void * arg);

Function pointer type for handling matching status response.
Represents a callback function that is invoked when a matching status was changed.

Parameters:
- **status** - Pointer to a :c:type:`z_matching_status_t`.
- **arg** - A user-defined pointer to additional data that can be used during the processing of the matching status.


Functions
^^^^^^^^^
.. autocfunction:: primitives.h::z_closure_matching_status
.. autocfunction:: primitives.h::z_closure_matching_status_call

Ownership Functions
^^^^^^^^^^^^^^^^^^^

See details at :ref:`owned_types_concept`

.. c:function:: const z_loaned_closure_matching_status_t * z_closure_matching_status_loan(const z_owned_closure_matching_status_t * closure)
.. c:function:: void z_closure_matching_status_drop(z_moved_closure_matching_status_t * closure)


.. _channels_concept:

Channels
Expand Down Expand Up @@ -955,6 +990,26 @@ See details at :ref:`owned_types_concept`
.. c:function:: void z_session_drop(z_moved_session_t * closure)


Matching
========

Types
-----
See details at :ref:`owned_types_concept`

.. c:type:: z_owned_matching_listener_t
.. c:type:: z_loaned_matching_listener_t
.. c:type:: z_moved_matching_listener_t


.. autoctype:: types.h::z_matching_status_t

Functions
---------

.. autocfunction:: primitives.h::z_undeclare_matching_listener


Publication
===========

Expand Down Expand Up @@ -1002,6 +1057,9 @@ Functions
.. autocfunction:: primitives.h::z_publisher_put_options_default
.. autocfunction:: primitives.h::z_publisher_delete_options_default
.. autocfunction:: primitives.h::z_reliability_default
.. autocfunction:: primitives.h::z_publisher_get_matching_status
.. autocfunction:: primitives.h::z_publisher_declare_matching_listener
.. autocfunction:: primitives.h::z_publisher_declare_background_matching_listener

Ownership Functions
-------------------
Expand Down
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"-DZ_FEATURE_QUERYABLE=1",
"-DZ_FEATURE_ENCODING_VALUES=1",
"-DZ_FEATURE_LIVELINESS=1",
"-DZ_FEATURE_MATCHING=1",
]

# -- Options for HTML output -------------------------------------------------
Expand Down
31 changes: 28 additions & 3 deletions examples/unix/c11/z_pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,19 @@
#include <unistd.h>
#include <zenoh-pico.h>

#include "zenoh-pico/system/platform.h"

#if Z_FEATURE_PUBLICATION == 1

#if Z_FEATURE_MATCHING == 1
void matching_status_handler(const z_matching_status_t *matching_status, void *arg) {
(void)arg;
if (matching_status->matching) {
printf("Publisher has matching subscribers.\n");
} else {
printf("Publisher has NO MORE matching subscribers.\n");
}
}
#endif

int main(int argc, char **argv) {
const char *keyexpr = "demo/example/zenoh-pico-pub";
char *const default_value = "Pub from Pico!";
Expand All @@ -31,9 +41,10 @@ int main(int argc, char **argv) {
char *clocator = NULL;
char *llocator = NULL;
int n = 2147483647; // max int value by default
bool add_matching_listener = false;

int opt;
while ((opt = getopt(argc, argv, "k:v:e:m:l:n:")) != -1) {
while ((opt = getopt(argc, argv, "k:v:e:m:l:n:a")) != -1) {
switch (opt) {
case 'k':
keyexpr = optarg;
Expand All @@ -53,6 +64,9 @@ int main(int argc, char **argv) {
case 'n':
n = atoi(optarg);
break;
case 'a':
add_matching_listener = true;
break;
case '?':
if (optopt == 'k' || optopt == 'v' || optopt == 'e' || optopt == 'm' || optopt == 'l' ||
optopt == 'n') {
Expand Down Expand Up @@ -104,6 +118,17 @@ int main(int argc, char **argv) {
return -1;
}

if (add_matching_listener) {
#if Z_FEATURE_MATCHING == 1
z_owned_closure_matching_status_t callback;
z_closure(&callback, matching_status_handler, NULL, NULL);
z_publisher_declare_background_matching_listener(z_loan(pub), z_move(callback));
#else
printf("ERROR: Zenoh pico was compiled without Z_FEATURE_MATCHING but this example requires it.\n");
return -2;
#endif
}

// Publish data
printf("Press CTRL-C to quit...\n");
char buf[256];
Expand Down
Loading
Loading