From 3a465c069aa0505c3f8e04022646a8c078cbb2be Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Wed, 16 Oct 2024 09:47:15 +0200 Subject: [PATCH 01/12] feat: add refcount to z_buf --- include/zenoh-pico/collections/refcount.h | 3 +++ include/zenoh-pico/protocol/iobuf.h | 3 +++ src/protocol/iobuf.c | 8 +++++++- src/transport/multicast/read.c | 11 ++++++++--- tests/z_refcount_test.c | 2 ++ 5 files changed, 23 insertions(+), 4 deletions(-) diff --git a/include/zenoh-pico/collections/refcount.h b/include/zenoh-pico/collections/refcount.h index 644d167f9..a6e877368 100644 --- a/include/zenoh-pico/collections/refcount.h +++ b/include/zenoh-pico/collections/refcount.h @@ -244,6 +244,9 @@ size_t _z_simple_rc_strong_count(void *cnt); *p = name##_simple_rc_null(); \ return res; \ } \ + static inline bool name##_simple_rc_is_last_ref(const name##_simple_rc_t *p) { \ + return (_z_simple_rc_strong_count(p->_cnt) == 1); \ + } \ static inline size_t name##_simple_rc_size(name##_simple_rc_t *p) { \ _ZP_UNUSED(p); \ return sizeof(name##_simple_rc_t); \ diff --git a/include/zenoh-pico/protocol/iobuf.h b/include/zenoh-pico/protocol/iobuf.h index 888b69d46..105e67373 100644 --- a/include/zenoh-pico/protocol/iobuf.h +++ b/include/zenoh-pico/protocol/iobuf.h @@ -19,6 +19,7 @@ #include #include +#include "zenoh-pico/collections/arc_slice.h" #include "zenoh-pico/collections/element.h" #include "zenoh-pico/collections/slice.h" #include "zenoh-pico/collections/vec.h" @@ -62,8 +63,10 @@ _Z_VEC_DEFINE(_z_iosli, _z_iosli_t) /*------------------ ZBuf ------------------*/ typedef struct { _z_iosli_t _ios; + _z_slice_simple_rc_t _slice; } _z_zbuf_t; +static inline bool _z_zbuf_is_last_ref(const _z_zbuf_t *zbf) { return _z_slice_simple_rc_is_last_ref(&zbf->_slice); } _z_zbuf_t _z_zbuf_make(size_t capacity); _z_zbuf_t _z_zbuf_view(_z_zbuf_t *zbf, size_t length); /// Constructs a _borrowing_ reader on `slice` diff --git a/src/protocol/iobuf.c b/src/protocol/iobuf.c index c398bc766..7cb30b2a5 100644 --- a/src/protocol/iobuf.c +++ b/src/protocol/iobuf.c @@ -162,6 +162,8 @@ _z_iosli_t *_z_iosli_clone(const _z_iosli_t *src) { _z_zbuf_t _z_zbuf_make(size_t capacity) { _z_zbuf_t zbf; zbf._ios = _z_iosli_make(capacity); + _z_slice_t s = _z_slice_alias_buf(zbf._ios._buf, zbf._ios._capacity); + zbf._slice = _z_slice_simple_rc_new_from_val(&s); return zbf; } @@ -218,7 +220,11 @@ uint8_t *_z_zbuf_get_wptr(const _z_zbuf_t *zbf) { return zbf->_ios._buf + zbf->_ void _z_zbuf_reset(_z_zbuf_t *zbf) { _z_iosli_reset(&zbf->_ios); } -void _z_zbuf_clear(_z_zbuf_t *zbf) { _z_iosli_clear(&zbf->_ios); } +void _z_zbuf_clear(_z_zbuf_t *zbf) { + if (_z_slice_simple_rc_drop(&zbf->_slice)) { + _z_iosli_clear(&zbf->_ios); + } +} void _z_zbuf_compact(_z_zbuf_t *zbf) { if ((zbf->_ios._r_pos != 0) || (zbf->_ios._w_pos != 0)) { diff --git a/src/transport/multicast/read.c b/src/transport/multicast/read.c index 66e177597..d8d20da02 100644 --- a/src/transport/multicast/read.c +++ b/src/transport/multicast/read.c @@ -57,11 +57,11 @@ void *_zp_multicast_read_task(void *ztm_arg) { // Prepare the buffer _z_zbuf_reset(&ztm->_zbuf); - _z_slice_t addr = _z_slice_alias_buf(NULL, 0); + _z_slice_t addr = _z_slice_empty(); while (ztm->_read_task_running == true) { - // Read bytes from socket to the main buffer size_t to_read = 0; + // Read bytes from socket to the main buffer switch (ztm->_link._cap._flow) { case Z_LINK_CAP_FLOW_STREAM: if (_z_zbuf_len(&ztm->_zbuf) < _Z_MSG_LEN_ENC_SIZE) { @@ -119,9 +119,14 @@ void *_zp_multicast_read_task(void *ztm_arg) { continue; } } - // Move the read position of the read buffer _z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) + to_read); + // Check if user or defragment buffer took ownership of buffer + if (!_z_zbuf_is_last_ref(&ztm->_zbuf)) { + // Drop this buffer and allocate a new one + _z_zbuf_clear(&ztm->_zbuf); + ztm->_zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE); + } } _z_mutex_unlock(&ztm->_mutex_rx); return NULL; diff --git a/tests/z_refcount_test.c b/tests/z_refcount_test.c index 31a8faf71..74dcdecef 100644 --- a/tests/z_refcount_test.c +++ b/tests/z_refcount_test.c @@ -326,9 +326,11 @@ void test_simple_rc_clone_as_ptr(void) { _dummy_simple_rc_t *drc2 = _dummy_simple_rc_clone_as_ptr(&drc1); assert(drc2->_val != NULL); assert(!_Z_RC_IS_NULL(drc2)); + assert(!_dummy_simple_rc_is_last_ref(drc2)); assert(_z_simple_rc_strong_count(drc2->_cnt) == 2); assert(_dummy_simple_rc_eq(&drc1, drc2)); assert(!_dummy_simple_rc_drop(&drc1)); + assert(_dummy_simple_rc_is_last_ref(drc2)); assert(_dummy_simple_rc_drop(drc2)); z_free(drc2); } From 9ccd758586997b9a00d49f58f4b3884d6bd8999d Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Wed, 16 Oct 2024 15:36:29 +0200 Subject: [PATCH 02/12] feat: wrap payload instead of copy --- include/zenoh-pico/collections/arc_slice.h | 1 + src/collections/arc_slice.c | 12 ++++++++++++ src/protocol/codec.c | 13 ++++++++----- src/protocol/iobuf.c | 6 ++++++ 4 files changed, 27 insertions(+), 5 deletions(-) diff --git a/include/zenoh-pico/collections/arc_slice.h b/include/zenoh-pico/collections/arc_slice.h index f74d9f969..ebc21bb56 100644 --- a/include/zenoh-pico/collections/arc_slice.h +++ b/include/zenoh-pico/collections/arc_slice.h @@ -46,6 +46,7 @@ static inline _z_arc_slice_t _z_arc_slice_empty(void) { return (_z_arc_slice_t){ static inline size_t _z_arc_slice_len(const _z_arc_slice_t* s) { return s->len; } static inline bool _z_arc_slice_is_empty(const _z_arc_slice_t* s) { return _z_arc_slice_len(s) == 0; } _z_arc_slice_t _z_arc_slice_wrap(_z_slice_t s, size_t offset, size_t len); +_z_arc_slice_t _z_arc_slice_wrap_slice_rc(_z_slice_simple_rc_t* slice_rc, size_t offset, size_t len); _z_arc_slice_t _z_arc_slice_get_subslice(const _z_arc_slice_t* s, size_t offset, size_t len); const uint8_t* _z_arc_slice_data(const _z_arc_slice_t* s); z_result_t _z_arc_slice_copy(_z_arc_slice_t* dst, const _z_arc_slice_t* src); diff --git a/src/collections/arc_slice.c b/src/collections/arc_slice.c index 9c5f6c824..17d17a8eb 100644 --- a/src/collections/arc_slice.c +++ b/src/collections/arc_slice.c @@ -32,6 +32,18 @@ _z_arc_slice_t _z_arc_slice_wrap(_z_slice_t s, size_t offset, size_t len) { return arc_s; } +_z_arc_slice_t _z_arc_slice_wrap_slice_rc(_z_slice_simple_rc_t* slice_rc, size_t offset, size_t len) { + assert(offset + len <= _Z_RC_IN_VAL(slice_rc)->len); + _z_arc_slice_t arc_s; + arc_s.slice = _z_slice_simple_rc_clone(slice_rc); + if (_Z_RC_IS_NULL(&arc_s.slice)) { + return _z_arc_slice_empty(); + } + arc_s.len = len; + arc_s.start = offset; + return arc_s; +} + _z_arc_slice_t _z_arc_slice_get_subslice(const _z_arc_slice_t* s, size_t offset, size_t len) { assert(offset + len <= s->len); assert(!_Z_RC_IS_NULL(&s->slice) || (len == 0 && offset == 0)); diff --git a/src/protocol/codec.c b/src/protocol/codec.c index 0d6656327..8574d2d89 100644 --- a/src/protocol/codec.c +++ b/src/protocol/codec.c @@ -19,6 +19,7 @@ #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/utils/endianness.h" #include "zenoh-pico/utils/logging.h" +#include "zenoh-pico/utils/pointers.h" #include "zenoh-pico/utils/result.h" /*------------------ uint8 -------------------*/ @@ -281,13 +282,15 @@ z_result_t _z_slice_val_decode(_z_slice_t *bs, _z_zbuf_t *zbf) { return _z_slice z_result_t _z_slice_decode(_z_slice_t *bs, _z_zbuf_t *zbf) { return _z_slice_decode_na(bs, zbf); } z_result_t _z_bytes_decode(_z_bytes_t *bs, _z_zbuf_t *zbf) { + *bs = _z_bytes_null(); + // Decode slice _z_slice_t s; _Z_RETURN_IF_ERR(_z_slice_decode(&s, zbf)); - if (_z_slice_is_alloced(&s)) { - return _z_bytes_from_slice(bs, s); - } else { - return _z_bytes_from_buf(bs, s.start, s.len); - } + // Calc offset + size_t offset = _z_ptr_u8_diff(s.start, _Z_RC_IN_VAL(&zbf->_slice)->start); + // Get ownership of subslice + _z_arc_slice_t arcs = _z_arc_slice_wrap_slice_rc(&zbf->_slice, offset, s.len); + return _z_bytes_append_slice(bs, &arcs); } z_result_t _z_bytes_encode_val(_z_wbuf_t *wbf, const _z_bytes_t *bs) { diff --git a/src/protocol/iobuf.c b/src/protocol/iobuf.c index 7cb30b2a5..b58bb8819 100644 --- a/src/protocol/iobuf.c +++ b/src/protocol/iobuf.c @@ -20,6 +20,7 @@ #include #include "zenoh-pico/config.h" +#include "zenoh-pico/utils/logging.h" #include "zenoh-pico/utils/pointers.h" #include "zenoh-pico/utils/result.h" @@ -164,6 +165,10 @@ _z_zbuf_t _z_zbuf_make(size_t capacity) { zbf._ios = _z_iosli_make(capacity); _z_slice_t s = _z_slice_alias_buf(zbf._ios._buf, zbf._ios._capacity); zbf._slice = _z_slice_simple_rc_new_from_val(&s); + if (_Z_RC_IS_NULL(&zbf._slice)) { + _Z_ERROR("slice rc creation failed"); + _z_zbuf_clear(&zbf); + } return zbf; } @@ -171,6 +176,7 @@ _z_zbuf_t _z_zbuf_view(_z_zbuf_t *zbf, size_t length) { assert(_z_iosli_readable(&zbf->_ios) >= length); _z_zbuf_t v; v._ios = _z_iosli_wrap(_z_zbuf_get_rptr(zbf), length, 0, length); + v._slice = zbf->_slice; return v; } _z_zbuf_t _z_slice_as_zbuf(_z_slice_t slice) { From 26bad39121798c5e4156f70b6a5c3de2aa3958da Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 17 Oct 2024 10:14:16 +0200 Subject: [PATCH 03/12] fix: segfaults on tests and examples --- examples/unix/c11/z_get_attachment.c | 2 +- src/transport/multicast/read.c | 2 +- tests/z_client_test.c | 4 ++-- tests/z_peer_multicast_test.c | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/examples/unix/c11/z_get_attachment.c b/examples/unix/c11/z_get_attachment.c index 842273fc2..65404138b 100644 --- a/examples/unix/c11/z_get_attachment.c +++ b/examples/unix/c11/z_get_attachment.c @@ -178,7 +178,7 @@ int main(int argc, char **argv) { ze_owned_serializer_t serializer; ze_serializer_empty(&serializer); - ze_serializer_serialize_sequence_length(z_loan_mut(serializer), 2); + ze_serializer_serialize_sequence_length(z_loan_mut(serializer), 1); for (size_t i = 0; i < 1; ++i) { ze_serializer_serialize_string(z_loan_mut(serializer), z_loan(kvs[i].key)); ze_serializer_serialize_string(z_loan_mut(serializer), z_loan(kvs[i].value)); diff --git a/src/transport/multicast/read.c b/src/transport/multicast/read.c index d8d20da02..76907a5e1 100644 --- a/src/transport/multicast/read.c +++ b/src/transport/multicast/read.c @@ -94,7 +94,7 @@ void *_zp_multicast_read_task(void *ztm_arg) { default: break; } - // Wrap the main buffer for to_read bytes + // Wrap the main buffer to_read bytes _z_zbuf_t zbuf = _z_zbuf_view(&ztm->_zbuf, to_read); while (_z_zbuf_len(&zbuf) > 0) { diff --git a/tests/z_client_test.c b/tests/z_client_test.c index d073bc554..229722a48 100644 --- a/tests/z_client_test.c +++ b/tests/z_client_test.c @@ -138,7 +138,7 @@ int main(int argc, char **argv) { z_owned_session_t s1; assert(z_open(&s1, z_move(config), NULL) == Z_OK); _z_string_t zid1 = format_id(&(_Z_RC_IN_VAL(z_loan(s1))->_local_zid)); - printf("Session 1 with PID: %s\n", _z_string_data(&zid1)); + printf("Session 1 with PID: %.*s\n", (int)z_string_len(&zid1), _z_string_data(&zid1)); _z_string_clear(&zid1); // Start the read session session lease loops @@ -154,7 +154,7 @@ int main(int argc, char **argv) { assert(z_open(&s2, z_move(config), NULL) == Z_OK); assert(z_internal_check(s2)); _z_string_t zid2 = format_id(&(_Z_RC_IN_VAL(z_loan(s2))->_local_zid)); - printf("Session 2 with PID: %s\n", _z_string_data(&zid2)); + printf("Session 2 with PID: %.*s\n", (int)z_string_len(&zid2), _z_string_data(&zid2)); _z_string_clear(&zid2); // Start the read session session lease loops diff --git a/tests/z_peer_multicast_test.c b/tests/z_peer_multicast_test.c index 383ba799d..9cbd017b5 100644 --- a/tests/z_peer_multicast_test.c +++ b/tests/z_peer_multicast_test.c @@ -79,7 +79,7 @@ int main(int argc, char **argv) { _z_slice_t id_as_bytes = _z_slice_alias_buf(_Z_RC_IN_VAL(z_loan(s1))->_local_zid.id, _z_id_len(_Z_RC_IN_VAL(z_loan(s1))->_local_zid)); _z_string_t zid1 = _z_string_convert_bytes(&id_as_bytes); - printf("Session 1 with PID: %s\n", z_string_data(&zid1)); + printf("Session 1 with PID: %.*s\n", (int)z_string_len(&zid1), z_string_data(&zid1)); _z_string_clear(&zid1); // Start the read session session lease loops @@ -98,7 +98,7 @@ int main(int argc, char **argv) { id_as_bytes = _z_slice_alias_buf(_Z_RC_IN_VAL(z_loan(s2))->_local_zid.id, _z_id_len(_Z_RC_IN_VAL(z_loan(s2))->_local_zid)); _z_string_t zid2 = _z_string_convert_bytes(&id_as_bytes); - printf("Session 2 with PID: %s\n", z_string_data(&zid2)); + printf("Session 2 with PID: %.*s\n", (int)z_string_len(&zid2), z_string_data(&zid2)); _z_string_clear(&zid2); // Start the read session session lease loops From 07c69bc5cc611d64f0d4577a5bea10249b84db8b Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 17 Oct 2024 16:59:01 +0200 Subject: [PATCH 04/12] feat: pass arg by reference in reply_create --- include/zenoh-pico/net/reply.h | 5 +++-- src/net/reply.c | 16 +++++++++------- src/session/query.c | 4 ++-- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/include/zenoh-pico/net/reply.h b/include/zenoh-pico/net/reply.h index c68fcf678..3e1a1994b 100644 --- a/include/zenoh-pico/net/reply.h +++ b/include/zenoh-pico/net/reply.h @@ -89,8 +89,9 @@ _z_reply_t _z_reply_move(_z_reply_t *src_reply); void _z_reply_clear(_z_reply_t *src); void _z_reply_free(_z_reply_t **hello); z_result_t _z_reply_copy(_z_reply_t *dst, const _z_reply_t *src); -_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_id_t id, const _z_bytes_t payload, const _z_timestamp_t *timestamp, - _z_encoding_t *encoding, z_sample_kind_t kind, const _z_bytes_t attachment); +_z_reply_t _z_reply_create(_z_keyexpr_t *keyexpr, _z_id_t id, const _z_bytes_t *payload, + const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind, + const _z_bytes_t *attachment); _z_reply_t _z_reply_err_create(const _z_bytes_t payload, _z_encoding_t *encoding); typedef struct _z_pending_reply_t { diff --git a/src/net/reply.c b/src/net/reply.c index a331ae6cf..67987cb35 100644 --- a/src/net/reply.c +++ b/src/net/reply.c @@ -87,18 +87,19 @@ void _z_pending_reply_clear(_z_pending_reply_t *pr) { _z_timestamp_clear(&pr->_tstamp); } -_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_id_t id, const _z_bytes_t payload, const _z_timestamp_t *timestamp, - _z_encoding_t *encoding, z_sample_kind_t kind, const _z_bytes_t attachment) { +_z_reply_t _z_reply_create(_z_keyexpr_t *keyexpr, _z_id_t id, const _z_bytes_t *payload, + const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind, + const _z_bytes_t *attachment) { _z_reply_t reply = _z_reply_null(); reply.data._tag = _Z_REPLY_TAG_DATA; reply.data.replier_id = id; // Create reply sample - reply.data._result.sample.keyexpr = _z_keyexpr_steal(&keyexpr); + reply.data._result.sample.keyexpr = _z_keyexpr_steal(keyexpr); reply.data._result.sample.kind = kind; reply.data._result.sample.timestamp = _z_timestamp_duplicate(timestamp); - _z_bytes_copy(&reply.data._result.sample.payload, &payload); - _z_bytes_copy(&reply.data._result.sample.attachment, &attachment); + _z_bytes_copy(&reply.data._result.sample.payload, payload); + _z_bytes_copy(&reply.data._result.sample.attachment, attachment); _z_encoding_move(&reply.data._result.sample.encoding, encoding); return reply; @@ -112,8 +113,9 @@ _z_reply_t _z_reply_err_create(const _z_bytes_t payload, _z_encoding_t *encoding return reply; } #else -_z_reply_t _z_reply_create(_z_keyexpr_t keyexpr, _z_id_t id, const _z_bytes_t payload, const _z_timestamp_t *timestamp, - _z_encoding_t *encoding, z_sample_kind_t kind, const _z_bytes_t attachment) { +_z_reply_t _z_reply_create(_z_keyexpr_t *keyexpr, _z_id_t id, const _z_bytes_t *payload, + const _z_timestamp_t *timestamp, _z_encoding_t *encoding, z_sample_kind_t kind, + const _z_bytes_t *attachment) { _ZP_UNUSED(keyexpr); _ZP_UNUSED(id); _ZP_UNUSED(payload); diff --git a/src/session/query.c b/src/session/query.c index f9546990f..f3401dace 100644 --- a/src/session/query.c +++ b/src/session/query.c @@ -112,8 +112,8 @@ z_result_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, } // Build the reply - _z_reply_t reply = _z_reply_create(expanded_ke, zn->_local_zid, msg->_payload, &msg->_commons._timestamp, - &msg->_encoding, kind, msg->_attachment); + _z_reply_t reply = _z_reply_create(&expanded_ke, zn->_local_zid, &msg->_payload, &msg->_commons._timestamp, + &msg->_encoding, kind, &msg->_attachment); bool drop = false; // Verify if this is a newer reply, free the old one in case it is From 8641b404a2ecd83890a0f87a363d2c2ce0fe3d2d Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 17 Oct 2024 17:00:13 +0200 Subject: [PATCH 05/12] feat: add zbuf copy bytes --- include/zenoh-pico/protocol/iobuf.h | 2 ++ src/protocol/iobuf.c | 9 +++++++++ 2 files changed, 11 insertions(+) diff --git a/include/zenoh-pico/protocol/iobuf.h b/include/zenoh-pico/protocol/iobuf.h index 105e67373..02c826999 100644 --- a/include/zenoh-pico/protocol/iobuf.h +++ b/include/zenoh-pico/protocol/iobuf.h @@ -41,6 +41,7 @@ _z_iosli_t _z_iosli_wrap(const uint8_t *buf, size_t length, size_t r_pos, size_t size_t _z_iosli_readable(const _z_iosli_t *ios); uint8_t _z_iosli_read(_z_iosli_t *ios); void _z_iosli_read_bytes(_z_iosli_t *ios, uint8_t *dest, size_t offset, size_t length); +void _z_iosli_copy_bytes(_z_iosli_t *dst, const _z_iosli_t *src); uint8_t _z_iosli_get(const _z_iosli_t *ios, size_t pos); size_t _z_iosli_writable(const _z_iosli_t *ios); @@ -75,6 +76,7 @@ _z_zbuf_t _z_slice_as_zbuf(_z_slice_t slice); size_t _z_zbuf_capacity(const _z_zbuf_t *zbf); uint8_t const *_z_zbuf_start(const _z_zbuf_t *zbf); size_t _z_zbuf_len(const _z_zbuf_t *zbf); +void _z_zbuf_copy_bytes(_z_zbuf_t *dst, const _z_zbuf_t *src); bool _z_zbuf_can_read(const _z_zbuf_t *zbf); size_t _z_zbuf_space_left(const _z_zbuf_t *zbf); diff --git a/src/protocol/iobuf.c b/src/protocol/iobuf.c index b58bb8819..bbb155894 100644 --- a/src/protocol/iobuf.c +++ b/src/protocol/iobuf.c @@ -76,6 +76,13 @@ void _z_iosli_read_bytes(_z_iosli_t *ios, uint8_t *dst, size_t offset, size_t le ios->_r_pos = ios->_r_pos + length; } +void _z_iosli_copy_bytes(_z_iosli_t *dst, const _z_iosli_t *src) { + size_t length = _z_iosli_readable(src); + assert(_z_iosli_readable(dst) >= length); + (void)memcpy(dst->_buf + dst->_w_pos, src->_buf + src->_r_pos, length); + dst->_w_pos += length; +} + uint8_t _z_iosli_get(const _z_iosli_t *ios, size_t pos) { assert(pos < ios->_capacity); return ios->_buf[pos]; @@ -196,6 +203,8 @@ uint8_t const *_z_zbuf_start(const _z_zbuf_t *zbf) { } size_t _z_zbuf_len(const _z_zbuf_t *zbf) { return _z_iosli_readable(&zbf->_ios); } +void _z_zbuf_copy_bytes(_z_zbuf_t *dst, const _z_zbuf_t *src) { _z_iosli_copy_bytes(&dst->_ios, &src->_ios); } + bool _z_zbuf_can_read(const _z_zbuf_t *zbf) { return _z_zbuf_len(zbf) > (size_t)0; } uint8_t _z_zbuf_read(_z_zbuf_t *zbf) { return _z_iosli_read(&zbf->_ios); } From 72d65b2c41fb433d6bb2246ba676b02995b597a8 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 17 Oct 2024 17:00:26 +0200 Subject: [PATCH 06/12] fix: zbuf clear memory leak --- src/protocol/iobuf.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/protocol/iobuf.c b/src/protocol/iobuf.c index bbb155894..cccb5dfcb 100644 --- a/src/protocol/iobuf.c +++ b/src/protocol/iobuf.c @@ -170,12 +170,13 @@ _z_iosli_t *_z_iosli_clone(const _z_iosli_t *src) { _z_zbuf_t _z_zbuf_make(size_t capacity) { _z_zbuf_t zbf; zbf._ios = _z_iosli_make(capacity); - _z_slice_t s = _z_slice_alias_buf(zbf._ios._buf, zbf._ios._capacity); + _z_slice_t s = _z_slice_from_buf_custom_deleter(zbf._ios._buf, zbf._ios._capacity, _z_delete_context_default()); zbf._slice = _z_slice_simple_rc_new_from_val(&s); if (_Z_RC_IS_NULL(&zbf._slice)) { _Z_ERROR("slice rc creation failed"); _z_zbuf_clear(&zbf); } + zbf._ios._is_alloc = false; return zbf; } @@ -236,9 +237,8 @@ uint8_t *_z_zbuf_get_wptr(const _z_zbuf_t *zbf) { return zbf->_ios._buf + zbf->_ void _z_zbuf_reset(_z_zbuf_t *zbf) { _z_iosli_reset(&zbf->_ios); } void _z_zbuf_clear(_z_zbuf_t *zbf) { - if (_z_slice_simple_rc_drop(&zbf->_slice)) { - _z_iosli_clear(&zbf->_ios); - } + _z_iosli_clear(&zbf->_ios); + _z_slice_simple_rc_drop(&zbf->_slice); } void _z_zbuf_compact(_z_zbuf_t *zbf) { From c67a918f13b7e9f1814bc1fbe09d9bca0cc3b2f4 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 17 Oct 2024 17:01:08 +0200 Subject: [PATCH 07/12] feat: improve read task buffer realloc logic --- src/transport/multicast/read.c | 21 ++++++++++++----- src/transport/unicast/read.c | 43 ++++++++++++++++++++++++---------- 2 files changed, 46 insertions(+), 18 deletions(-) diff --git a/src/transport/multicast/read.c b/src/transport/multicast/read.c index 76907a5e1..d0b8e1477 100644 --- a/src/transport/multicast/read.c +++ b/src/transport/multicast/read.c @@ -98,11 +98,9 @@ void *_zp_multicast_read_task(void *ztm_arg) { _z_zbuf_t zbuf = _z_zbuf_view(&ztm->_zbuf, to_read); while (_z_zbuf_len(&zbuf) > 0) { - z_result_t ret = _Z_RES_OK; - // Decode one session message _z_transport_message_t t_msg; - ret = _z_transport_message_decode(&t_msg, &zbuf); + z_result_t ret = _z_transport_message_decode(&t_msg, &zbuf); if (ret == _Z_RES_OK) { ret = _z_multicast_handle_transport_message(ztm, &t_msg, &addr); @@ -123,9 +121,20 @@ void *_zp_multicast_read_task(void *ztm_arg) { _z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) + to_read); // Check if user or defragment buffer took ownership of buffer if (!_z_zbuf_is_last_ref(&ztm->_zbuf)) { - // Drop this buffer and allocate a new one - _z_zbuf_clear(&ztm->_zbuf); - ztm->_zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE); + // Allocate a new buffer + _z_zbuf_t new_zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE); + if (_z_zbuf_capacity(&new_zbuf) != Z_BATCH_MULTICAST_SIZE) { + _Z_ERROR("Connection closed due to lack of memory to allocate rx buffer"); + ztm->_read_task_running = false; + } + // Recopy leftover bytes + size_t leftovers = _z_zbuf_len(&ztm->_zbuf); + if (leftovers > 0) { + _z_zbuf_copy_bytes(&new_zbuf, &ztm->_zbuf); + } + // Drop buffer & update + _z_zbuf_clear(&ztm->_zbuf); // FIXME MEMORY LEAK BECAUSE OF HOW ITS FREED + ztm->_zbuf = new_zbuf; } } _z_mutex_unlock(&ztm->_mutex_rx); diff --git a/src/transport/unicast/read.c b/src/transport/unicast/read.c index 922b59667..0c523f6c0 100644 --- a/src/transport/unicast/read.c +++ b/src/transport/unicast/read.c @@ -95,27 +95,46 @@ void *_zp_unicast_read_task(void *ztu_arg) { // Mark the session that we have received data ztu->_received = true; - // Decode one session message - _z_transport_message_t t_msg; - z_result_t ret = _z_transport_message_decode(&t_msg, &zbuf); + while (_z_zbuf_len(&zbuf) > 0) { + // Decode one session message + _z_transport_message_t t_msg; + z_result_t ret = _z_transport_message_decode(&t_msg, &zbuf); - if (ret == _Z_RES_OK) { - ret = _z_unicast_handle_transport_message(ztu, &t_msg); if (ret == _Z_RES_OK) { - _z_t_msg_clear(&t_msg); + ret = _z_unicast_handle_transport_message(ztu, &t_msg); + if (ret == _Z_RES_OK) { + _z_t_msg_clear(&t_msg); + } else { + _Z_ERROR("Connection closed due to message processing error: %d", ret); + ztu->_read_task_running = false; + continue; + } } else { - _Z_ERROR("Connection closed due to message processing error: %d", ret); + _Z_ERROR("Connection closed due to malformed message: %d", ret); ztu->_read_task_running = false; continue; } - } else { - _Z_ERROR("Connection closed due to malformed message: %d", ret); - ztu->_read_task_running = false; - continue; } - // Move the read position of the read buffer _z_zbuf_set_rpos(&ztu->_zbuf, _z_zbuf_get_rpos(&ztu->_zbuf) + to_read); + // Check if user or defragment buffer took ownership of buffer + if (!_z_zbuf_is_last_ref(&ztu->_zbuf)) { + // Allocate a new one + size_t buff_capacity = _z_zbuf_capacity(&ztu->_zbuf); + _z_zbuf_t new_zbuf = _z_zbuf_make(buff_capacity); + if (_z_zbuf_capacity(&new_zbuf) != buff_capacity) { + _Z_ERROR("Connection closed due to lack of memory to allocate rx buffer"); + ztu->_read_task_running = false; + } + // Recopy leftover bytes + size_t leftovers = _z_zbuf_len(&ztu->_zbuf); + if (leftovers > 0) { + _z_zbuf_copy_bytes(&new_zbuf, &ztu->_zbuf); + } + // Drop buffer & update + _z_zbuf_clear(&ztu->_zbuf); // FIXME MEMORY LEAK BECAUSE OF HOW ITS FREED + ztu->_zbuf = new_zbuf; + } } _z_mutex_unlock(&ztu->_mutex_rx); return NULL; From 1c89937fb54ed569bcbf7ebf619fa2d2146d3dcd Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 17 Oct 2024 17:12:40 +0200 Subject: [PATCH 08/12] doc: update read task comment --- src/transport/multicast/read.c | 2 +- src/transport/unicast/read.c | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/transport/multicast/read.c b/src/transport/multicast/read.c index d0b8e1477..cc67a59f7 100644 --- a/src/transport/multicast/read.c +++ b/src/transport/multicast/read.c @@ -133,7 +133,7 @@ void *_zp_multicast_read_task(void *ztm_arg) { _z_zbuf_copy_bytes(&new_zbuf, &ztm->_zbuf); } // Drop buffer & update - _z_zbuf_clear(&ztm->_zbuf); // FIXME MEMORY LEAK BECAUSE OF HOW ITS FREED + _z_zbuf_clear(&ztm->_zbuf); ztm->_zbuf = new_zbuf; } } diff --git a/src/transport/unicast/read.c b/src/transport/unicast/read.c index 0c523f6c0..c93c73654 100644 --- a/src/transport/unicast/read.c +++ b/src/transport/unicast/read.c @@ -119,7 +119,7 @@ void *_zp_unicast_read_task(void *ztu_arg) { _z_zbuf_set_rpos(&ztu->_zbuf, _z_zbuf_get_rpos(&ztu->_zbuf) + to_read); // Check if user or defragment buffer took ownership of buffer if (!_z_zbuf_is_last_ref(&ztu->_zbuf)) { - // Allocate a new one + // Allocate a new buffer size_t buff_capacity = _z_zbuf_capacity(&ztu->_zbuf); _z_zbuf_t new_zbuf = _z_zbuf_make(buff_capacity); if (_z_zbuf_capacity(&new_zbuf) != buff_capacity) { @@ -132,7 +132,7 @@ void *_zp_unicast_read_task(void *ztu_arg) { _z_zbuf_copy_bytes(&new_zbuf, &ztu->_zbuf); } // Drop buffer & update - _z_zbuf_clear(&ztu->_zbuf); // FIXME MEMORY LEAK BECAUSE OF HOW ITS FREED + _z_zbuf_clear(&ztu->_zbuf); ztu->_zbuf = new_zbuf; } } From 5a0809c7196c04f6ee0fb7393266513cbb4f5a13 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 17 Oct 2024 17:22:10 +0200 Subject: [PATCH 09/12] feat: add raweth read task buffer allocation --- src/transport/raweth/read.c | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/transport/raweth/read.c b/src/transport/raweth/read.c index b5062ebab..b3f0bfdd8 100644 --- a/src/transport/raweth/read.c +++ b/src/transport/raweth/read.c @@ -85,6 +85,24 @@ void *_zp_raweth_read_task(void *ztm_arg) { } _z_t_msg_clear(&t_msg); _z_slice_clear(&addr); + + // Check if user or defragment buffer took ownership of buffer + if (!_z_zbuf_is_last_ref(&ztm->_zbuf)) { + // Allocate a new buffer + _z_zbuf_t new_zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE); + if (_z_zbuf_capacity(&new_zbuf) != Z_BATCH_MULTICAST_SIZE) { + _Z_ERROR("Connection closed due to lack of memory to allocate rx buffer"); + ztm->_read_task_running = false; + } + // Recopy leftover bytes + size_t leftovers = _z_zbuf_len(&ztm->_zbuf); + if (leftovers > 0) { + _z_zbuf_copy_bytes(&new_zbuf, &ztm->_zbuf); + } + // Drop buffer & update + _z_zbuf_clear(&ztm->_zbuf); + ztm->_zbuf = new_zbuf; + } } return NULL; } From b0589d443757988b289bf1ac5f6e47fa892a7d65 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 17 Oct 2024 17:37:23 +0200 Subject: [PATCH 10/12] feat: move rx buffer logic to function --- include/zenoh-pico/transport/multicast/rx.h | 1 + include/zenoh-pico/transport/raweth/rx.h | 1 + include/zenoh-pico/transport/unicast/rx.h | 1 + src/transport/multicast/read.c | 24 ++++++------------- src/transport/multicast/rx.c | 21 +++++++++++++++++ src/transport/raweth/read.c | 24 ++++++------------- src/transport/raweth/rx.c | 20 ++++++++++++++++ src/transport/unicast/read.c | 26 +++++++-------------- src/transport/unicast/rx.c | 22 +++++++++++++++++ 9 files changed, 88 insertions(+), 52 deletions(-) diff --git a/include/zenoh-pico/transport/multicast/rx.h b/include/zenoh-pico/transport/multicast/rx.h index 31a10e64a..97b1e8960 100644 --- a/include/zenoh-pico/transport/multicast/rx.h +++ b/include/zenoh-pico/transport/multicast/rx.h @@ -20,5 +20,6 @@ z_result_t _z_multicast_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_slice_t *addr); z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_slice_t *addr); +z_result_t _z_multicast_update_rx_buffer(_z_transport_multicast_t *ztm); #endif /* ZENOH_PICO_TRANSPORT_LINK_RX_H */ diff --git a/include/zenoh-pico/transport/raweth/rx.h b/include/zenoh-pico/transport/raweth/rx.h index 81874d633..4607dd6b2 100644 --- a/include/zenoh-pico/transport/raweth/rx.h +++ b/include/zenoh-pico/transport/raweth/rx.h @@ -19,5 +19,6 @@ z_result_t _z_raweth_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_slice_t *addr); z_result_t _z_raweth_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_slice_t *addr); +z_result_t _z_raweth_update_rx_buff(_z_transport_multicast_t *ztm); #endif /* ZENOH_PICO_RAWETH_RX_H */ diff --git a/include/zenoh-pico/transport/unicast/rx.h b/include/zenoh-pico/transport/unicast/rx.h index f36cb592c..1fa5b60d7 100644 --- a/include/zenoh-pico/transport/unicast/rx.h +++ b/include/zenoh-pico/transport/unicast/rx.h @@ -20,5 +20,6 @@ z_result_t _z_unicast_recv_t_msg(_z_transport_unicast_t *ztu, _z_transport_message_t *t_msg); z_result_t _z_unicast_recv_t_msg_na(_z_transport_unicast_t *ztu, _z_transport_message_t *t_msg); z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_transport_message_t *t_msg); +z_result_t _z_unicast_update_rx_buffer(_z_transport_unicast_t *ztu); #endif /* ZENOH_PICO_UNICAST_RX_H */ diff --git a/src/transport/multicast/read.c b/src/transport/multicast/read.c index cc67a59f7..0a39fe4e1 100644 --- a/src/transport/multicast/read.c +++ b/src/transport/multicast/read.c @@ -36,7 +36,10 @@ z_result_t _zp_multicast_read(_z_transport_multicast_t *ztm) { ret = _z_multicast_handle_transport_message(ztm, &t_msg, &addr); _z_t_msg_clear(&t_msg); } - + ret = _z_multicast_update_rx_buffer(ztm); + if (ret != _Z_RES_OK) { + _Z_ERROR("Failed to allocate rx buffer"); + } return ret; } #else @@ -119,22 +122,9 @@ void *_zp_multicast_read_task(void *ztm_arg) { } // Move the read position of the read buffer _z_zbuf_set_rpos(&ztm->_zbuf, _z_zbuf_get_rpos(&ztm->_zbuf) + to_read); - // Check if user or defragment buffer took ownership of buffer - if (!_z_zbuf_is_last_ref(&ztm->_zbuf)) { - // Allocate a new buffer - _z_zbuf_t new_zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE); - if (_z_zbuf_capacity(&new_zbuf) != Z_BATCH_MULTICAST_SIZE) { - _Z_ERROR("Connection closed due to lack of memory to allocate rx buffer"); - ztm->_read_task_running = false; - } - // Recopy leftover bytes - size_t leftovers = _z_zbuf_len(&ztm->_zbuf); - if (leftovers > 0) { - _z_zbuf_copy_bytes(&new_zbuf, &ztm->_zbuf); - } - // Drop buffer & update - _z_zbuf_clear(&ztm->_zbuf); - ztm->_zbuf = new_zbuf; + if (_z_multicast_update_rx_buffer(ztm) != _Z_RES_OK) { + _Z_ERROR("Connection closed due to lack of memory to allocate rx buffer"); + ztm->_read_task_running = false; } } _z_mutex_unlock(&ztm->_mutex_rx); diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index 7ab3ad542..fdbfb2ec1 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -350,6 +350,27 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, return ret; } + +z_result_t _z_multicast_update_rx_buffer(_z_transport_multicast_t *ztm) { + // Check if user or defragment buffer took ownership of buffer + if (!_z_zbuf_is_last_ref(&ztm->_zbuf)) { + // Allocate a new buffer + _z_zbuf_t new_zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE); + if (_z_zbuf_capacity(&new_zbuf) != Z_BATCH_MULTICAST_SIZE) { + return _Z_ERR_SYSTEM_OUT_OF_MEMORY; + } + // Recopy leftover bytes + size_t leftovers = _z_zbuf_len(&ztm->_zbuf); + if (leftovers > 0) { + _z_zbuf_copy_bytes(&new_zbuf, &ztm->_zbuf); + } + // Drop buffer & update + _z_zbuf_clear(&ztm->_zbuf); + ztm->_zbuf = new_zbuf; + } + return _Z_RES_OK; +} + #else z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_slice_t *addr) { diff --git a/src/transport/raweth/read.c b/src/transport/raweth/read.c index b3f0bfdd8..4c49d7b64 100644 --- a/src/transport/raweth/read.c +++ b/src/transport/raweth/read.c @@ -37,6 +37,10 @@ z_result_t _zp_raweth_read(_z_transport_multicast_t *ztm) { _z_t_msg_clear(&t_msg); } _z_slice_clear(&addr); + ret = _z_raweth_update_rx_buff(ztm); + if (ret != _Z_RES_OK) { + _Z_ERROR("Failed to allocate rx buffer"); + } return ret; } #else @@ -85,23 +89,9 @@ void *_zp_raweth_read_task(void *ztm_arg) { } _z_t_msg_clear(&t_msg); _z_slice_clear(&addr); - - // Check if user or defragment buffer took ownership of buffer - if (!_z_zbuf_is_last_ref(&ztm->_zbuf)) { - // Allocate a new buffer - _z_zbuf_t new_zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE); - if (_z_zbuf_capacity(&new_zbuf) != Z_BATCH_MULTICAST_SIZE) { - _Z_ERROR("Connection closed due to lack of memory to allocate rx buffer"); - ztm->_read_task_running = false; - } - // Recopy leftover bytes - size_t leftovers = _z_zbuf_len(&ztm->_zbuf); - if (leftovers > 0) { - _z_zbuf_copy_bytes(&new_zbuf, &ztm->_zbuf); - } - // Drop buffer & update - _z_zbuf_clear(&ztm->_zbuf); - ztm->_zbuf = new_zbuf; + if (_z_raweth_update_rx_buff(ztm) != _Z_RES_OK) { + _Z_ERROR("Connection closed due to lack of memory to allocate rx buffer"); + ztm->_read_task_running = false; } } return NULL; diff --git a/src/transport/raweth/rx.c b/src/transport/raweth/rx.c index 59b57e512..3414dcf3c 100644 --- a/src/transport/raweth/rx.c +++ b/src/transport/raweth/rx.c @@ -117,6 +117,26 @@ z_result_t _z_raweth_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_mess return _z_raweth_recv_t_msg_na(ztm, t_msg, addr); } +z_result_t _z_raweth_update_rx_buff(_z_transport_multicast_t *ztm) { + // Check if user or defragment buffer took ownership of buffer + if (!_z_zbuf_is_last_ref(&ztm->_zbuf)) { + // Allocate a new buffer + _z_zbuf_t new_zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE); + if (_z_zbuf_capacity(&new_zbuf) != Z_BATCH_MULTICAST_SIZE) { + return _Z_ERR_SYSTEM_OUT_OF_MEMORY; + } + // Recopy leftover bytes + size_t leftovers = _z_zbuf_len(&ztm->_zbuf); + if (leftovers > 0) { + _z_zbuf_copy_bytes(&new_zbuf, &ztm->_zbuf); + } + // Drop buffer & update + _z_zbuf_clear(&ztm->_zbuf); + ztm->_zbuf = new_zbuf; + } + return _Z_RES_OK; +} + #else z_result_t _z_raweth_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_slice_t *addr) { _ZP_UNUSED(ztm); diff --git a/src/transport/unicast/read.c b/src/transport/unicast/read.c index c93c73654..d6520b6c3 100644 --- a/src/transport/unicast/read.c +++ b/src/transport/unicast/read.c @@ -33,7 +33,10 @@ z_result_t _zp_unicast_read(_z_transport_unicast_t *ztu) { ret = _z_unicast_handle_transport_message(ztu, &t_msg); _z_t_msg_clear(&t_msg); } - + ret = _z_unicast_update_rx_buffer(ztu); + if (ret != _Z_RES_OK) { + _Z_ERROR("Failed to allocate rx buffer"); + } return ret; } #else @@ -117,23 +120,10 @@ void *_zp_unicast_read_task(void *ztu_arg) { } // Move the read position of the read buffer _z_zbuf_set_rpos(&ztu->_zbuf, _z_zbuf_get_rpos(&ztu->_zbuf) + to_read); - // Check if user or defragment buffer took ownership of buffer - if (!_z_zbuf_is_last_ref(&ztu->_zbuf)) { - // Allocate a new buffer - size_t buff_capacity = _z_zbuf_capacity(&ztu->_zbuf); - _z_zbuf_t new_zbuf = _z_zbuf_make(buff_capacity); - if (_z_zbuf_capacity(&new_zbuf) != buff_capacity) { - _Z_ERROR("Connection closed due to lack of memory to allocate rx buffer"); - ztu->_read_task_running = false; - } - // Recopy leftover bytes - size_t leftovers = _z_zbuf_len(&ztu->_zbuf); - if (leftovers > 0) { - _z_zbuf_copy_bytes(&new_zbuf, &ztu->_zbuf); - } - // Drop buffer & update - _z_zbuf_clear(&ztu->_zbuf); - ztu->_zbuf = new_zbuf; + + if (_z_unicast_update_rx_buffer(ztu) != _Z_RES_OK) { + _Z_ERROR("Connection closed due to lack of memory to allocate rx buffer"); + ztu->_read_task_running = false; } } _z_mutex_unlock(&ztu->_mutex_rx); diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index b500f55c9..6fce4fb78 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -215,6 +215,28 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t return ret; } + +z_result_t _z_unicast_update_rx_buffer(_z_transport_unicast_t *ztu) { + // Check if user or defragment buffer took ownership of buffer + if (!_z_zbuf_is_last_ref(&ztu->_zbuf)) { + // Allocate a new buffer + size_t buff_capacity = _z_zbuf_capacity(&ztu->_zbuf); + _z_zbuf_t new_zbuf = _z_zbuf_make(buff_capacity); + if (_z_zbuf_capacity(&new_zbuf) != buff_capacity) { + return _Z_ERR_SYSTEM_OUT_OF_MEMORY; + } + // Recopy leftover bytes + size_t leftovers = _z_zbuf_len(&ztu->_zbuf); + if (leftovers > 0) { + _z_zbuf_copy_bytes(&new_zbuf, &ztu->_zbuf); + } + // Drop buffer & update + _z_zbuf_clear(&ztu->_zbuf); + ztu->_zbuf = new_zbuf; + } + return _Z_RES_OK; +} + #else z_result_t _z_unicast_recv_t_msg(_z_transport_unicast_t *ztu, _z_transport_message_t *t_msg) { _ZP_UNUSED(ztu); From 1776867489750566cf73800c4e9b8ed684ae1e26 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 17 Oct 2024 19:46:11 +0200 Subject: [PATCH 11/12] fix: zbuf init code --- src/protocol/iobuf.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/protocol/iobuf.c b/src/protocol/iobuf.c index cccb5dfcb..7b4b361dc 100644 --- a/src/protocol/iobuf.c +++ b/src/protocol/iobuf.c @@ -168,13 +168,16 @@ _z_iosli_t *_z_iosli_clone(const _z_iosli_t *src) { /*------------------ ZBuf ------------------*/ _z_zbuf_t _z_zbuf_make(size_t capacity) { - _z_zbuf_t zbf; + _z_zbuf_t zbf = {0}; zbf._ios = _z_iosli_make(capacity); + if (_z_zbuf_capacity(&zbf) == 0) { + return zbf; + } _z_slice_t s = _z_slice_from_buf_custom_deleter(zbf._ios._buf, zbf._ios._capacity, _z_delete_context_default()); zbf._slice = _z_slice_simple_rc_new_from_val(&s); if (_Z_RC_IS_NULL(&zbf._slice)) { _Z_ERROR("slice rc creation failed"); - _z_zbuf_clear(&zbf); + _z_iosli_clear(&zbf._ios); } zbf._ios._is_alloc = false; return zbf; From 12d8de33782b61be9a8d50a7f0e25be8f4f20540 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Fri, 18 Oct 2024 16:16:51 +0200 Subject: [PATCH 12/12] feat: add rc count function --- include/zenoh-pico/collections/refcount.h | 4 ++-- include/zenoh-pico/protocol/iobuf.h | 2 +- src/transport/multicast/rx.c | 2 +- src/transport/raweth/rx.c | 2 +- src/transport/unicast/rx.c | 2 +- tests/z_refcount_test.c | 7 +++---- 6 files changed, 9 insertions(+), 10 deletions(-) diff --git a/include/zenoh-pico/collections/refcount.h b/include/zenoh-pico/collections/refcount.h index a6e877368..bcbd8f411 100644 --- a/include/zenoh-pico/collections/refcount.h +++ b/include/zenoh-pico/collections/refcount.h @@ -244,8 +244,8 @@ size_t _z_simple_rc_strong_count(void *cnt); *p = name##_simple_rc_null(); \ return res; \ } \ - static inline bool name##_simple_rc_is_last_ref(const name##_simple_rc_t *p) { \ - return (_z_simple_rc_strong_count(p->_cnt) == 1); \ + static inline size_t name##_simple_rc_count(const name##_simple_rc_t *p) { \ + return _z_simple_rc_strong_count(p->_cnt); \ } \ static inline size_t name##_simple_rc_size(name##_simple_rc_t *p) { \ _ZP_UNUSED(p); \ diff --git a/include/zenoh-pico/protocol/iobuf.h b/include/zenoh-pico/protocol/iobuf.h index 02c826999..7f1ebf722 100644 --- a/include/zenoh-pico/protocol/iobuf.h +++ b/include/zenoh-pico/protocol/iobuf.h @@ -67,7 +67,7 @@ typedef struct { _z_slice_simple_rc_t _slice; } _z_zbuf_t; -static inline bool _z_zbuf_is_last_ref(const _z_zbuf_t *zbf) { return _z_slice_simple_rc_is_last_ref(&zbf->_slice); } +static inline size_t _z_zbuf_get_ref_count(const _z_zbuf_t *zbf) { return _z_slice_simple_rc_count(&zbf->_slice); } _z_zbuf_t _z_zbuf_make(size_t capacity); _z_zbuf_t _z_zbuf_view(_z_zbuf_t *zbf, size_t length); /// Constructs a _borrowing_ reader on `slice` diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index fdbfb2ec1..2832d2053 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -353,7 +353,7 @@ z_result_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, z_result_t _z_multicast_update_rx_buffer(_z_transport_multicast_t *ztm) { // Check if user or defragment buffer took ownership of buffer - if (!_z_zbuf_is_last_ref(&ztm->_zbuf)) { + if (_z_zbuf_get_ref_count(&ztm->_zbuf) != 1) { // Allocate a new buffer _z_zbuf_t new_zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE); if (_z_zbuf_capacity(&new_zbuf) != Z_BATCH_MULTICAST_SIZE) { diff --git a/src/transport/raweth/rx.c b/src/transport/raweth/rx.c index 3414dcf3c..660f49293 100644 --- a/src/transport/raweth/rx.c +++ b/src/transport/raweth/rx.c @@ -119,7 +119,7 @@ z_result_t _z_raweth_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_mess z_result_t _z_raweth_update_rx_buff(_z_transport_multicast_t *ztm) { // Check if user or defragment buffer took ownership of buffer - if (!_z_zbuf_is_last_ref(&ztm->_zbuf)) { + if (_z_zbuf_get_ref_count(&ztm->_zbuf) != 1) { // Allocate a new buffer _z_zbuf_t new_zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE); if (_z_zbuf_capacity(&new_zbuf) != Z_BATCH_MULTICAST_SIZE) { diff --git a/src/transport/unicast/rx.c b/src/transport/unicast/rx.c index 6fce4fb78..be7d55446 100644 --- a/src/transport/unicast/rx.c +++ b/src/transport/unicast/rx.c @@ -218,7 +218,7 @@ z_result_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_t z_result_t _z_unicast_update_rx_buffer(_z_transport_unicast_t *ztu) { // Check if user or defragment buffer took ownership of buffer - if (!_z_zbuf_is_last_ref(&ztu->_zbuf)) { + if (_z_zbuf_get_ref_count(&ztu->_zbuf) != 1) { // Allocate a new buffer size_t buff_capacity = _z_zbuf_capacity(&ztu->_zbuf); _z_zbuf_t new_zbuf = _z_zbuf_make(buff_capacity); diff --git a/tests/z_refcount_test.c b/tests/z_refcount_test.c index 74dcdecef..7935b6c63 100644 --- a/tests/z_refcount_test.c +++ b/tests/z_refcount_test.c @@ -326,11 +326,10 @@ void test_simple_rc_clone_as_ptr(void) { _dummy_simple_rc_t *drc2 = _dummy_simple_rc_clone_as_ptr(&drc1); assert(drc2->_val != NULL); assert(!_Z_RC_IS_NULL(drc2)); - assert(!_dummy_simple_rc_is_last_ref(drc2)); - assert(_z_simple_rc_strong_count(drc2->_cnt) == 2); + assert(_dummy_simple_rc_count(drc2) == 2); assert(_dummy_simple_rc_eq(&drc1, drc2)); assert(!_dummy_simple_rc_drop(&drc1)); - assert(_dummy_simple_rc_is_last_ref(drc2)); + assert(_dummy_simple_rc_count(drc2) == 1); assert(_dummy_simple_rc_drop(drc2)); z_free(drc2); } @@ -341,7 +340,7 @@ void test_simple_rc_copy(void) { _dummy_simple_rc_t drc2 = _dummy_simple_rc_null(); assert(!_dummy_simple_rc_eq(&drc1, &drc2)); _dummy_simple_rc_copy(&drc2, &drc1); - assert(_z_simple_rc_strong_count(drc2._cnt) == 2); + assert(_dummy_simple_rc_count(&drc2) == 2); assert(_dummy_simple_rc_eq(&drc1, &drc2)); assert(!_dummy_simple_rc_drop(&drc2)); assert(_dummy_simple_rc_drop(&drc1));