Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support start/stop fragment marker #813

Merged
merged 32 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
1f4cf1d
feat: support start/stop fragment marker
wyfo Nov 25, 2024
6837602
fix: fix test
wyfo Dec 4, 2024
ad94190
fix: typo
wyfo Dec 4, 2024
c5f50ac
fix: format
wyfo Dec 4, 2024
3736054
fix: fragment ext encoding
wyfo Dec 4, 2024
218019a
fix: rename start/stop marker to first/drop
wyfo Dec 4, 2024
0c5f62b
Update src/protocol/codec/transport.c
wyfo Dec 4, 2024
0f2a139
Update src/protocol/definitions/transport.c
wyfo Dec 4, 2024
c93e694
Update src/protocol/definitions/transport.c
wyfo Dec 4, 2024
8c55511
Update src/protocol/codec/transport.c
wyfo Dec 4, 2024
e997fc4
Update src/transport/unicast/rx.c
wyfo Dec 4, 2024
b42dca2
Update src/transport/unicast/rx.c
wyfo Dec 4, 2024
223d085
Update src/transport/multicast/rx.c
wyfo Dec 4, 2024
a05524d
Update src/protocol/definitions/transport.c
wyfo Dec 4, 2024
107ac1a
Update src/protocol/definitions/transport.c
wyfo Dec 4, 2024
bd89a76
Update src/protocol/codec/transport.c
wyfo Dec 4, 2024
645acc4
Update src/protocol/codec/transport.c
wyfo Dec 4, 2024
002a0e1
Update src/protocol/codec/transport.c
wyfo Dec 4, 2024
47d15e1
Update src/transport/multicast/rx.c
wyfo Dec 4, 2024
524b3df
Update src/transport/multicast/rx.c
wyfo Dec 4, 2024
daa56a0
Update src/transport/multicast/rx.c
wyfo Dec 4, 2024
1ba505b
Update src/transport/unicast/rx.c
wyfo Dec 4, 2024
1b681bb
Update src/transport/unicast/rx.c
wyfo Dec 4, 2024
5490645
Update src/transport/unicast/rx.c
wyfo Dec 4, 2024
1b36f8e
fix: apply PR feedbacks
wyfo Dec 4, 2024
bca36bd
fix: apply PR feedbacks
wyfo Dec 4, 2024
733a23d
fix: initializing all the fields is always a good idea
wyfo Dec 4, 2024
e73ce42
fix: add _patch field in copy functions
wyfo Dec 4, 2024
e55c499
fix: typo
wyfo Dec 4, 2024
6da6c45
fix: format
wyfo Dec 4, 2024
3fa9b5d
fix: reset dbuf instead of clear
wyfo Dec 4, 2024
9320775
fix: format
wyfo Dec 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions include/zenoh-pico/protocol/definitions/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@ extern "C" {
// Z Extensions if Z==1 then Zenoh extensions are present
#define _Z_FLAG_T_CLOSE_S 0x20 // 1 << 5

/*=============================*/
/* Patch */
/*=============================*/
/// Used to negotiate the patch version of the protocol
/// if not present (or 0), then protocol as released with 1.0.0
/// if >= 1, then fragmentation start/stop marker
#define _Z_NO_PATCH 0x00
#define _Z_CURRENT_PATCH 0x01
#define _Z_PATCH_HAS_FRAGMENT_START_STOP(patch) (patch >= 1)

/*=============================*/
/* Transport Messages */
/*=============================*/
Expand Down Expand Up @@ -235,6 +245,9 @@ typedef struct {
uint8_t _req_id_res;
uint8_t _seq_num_res;
uint8_t _version;
#if Z_FEATURE_FRAGMENTATION == 1
uint8_t _patch;
#endif
} _z_t_msg_join_t;
void _z_t_msg_join_clear(_z_t_msg_join_t *msg);

Expand Down Expand Up @@ -315,6 +328,9 @@ typedef struct {
uint8_t _req_id_res;
uint8_t _seq_num_res;
uint8_t _version;
#if Z_FEATURE_FRAGMENTATION == 1
uint8_t _patch;
#endif
} _z_t_msg_init_t;
void _z_t_msg_init_clear(_z_t_msg_init_t *msg);

Expand Down Expand Up @@ -478,11 +494,11 @@ void _z_t_msg_frame_clear(_z_t_msg_frame_t *msg);
typedef struct {
_z_slice_t _payload;
_z_zint_t _sn;
bool start;
bool stop;
} _z_t_msg_fragment_t;
void _z_t_msg_fragment_clear(_z_t_msg_fragment_t *msg);

#define _Z_FRAGMENT_HEADER_SIZE 12

/*------------------ Transport Message ------------------*/
typedef union {
_z_t_msg_join_t _join;
Expand Down Expand Up @@ -514,9 +530,10 @@ _z_transport_message_t _z_t_msg_make_keep_alive(void);
_z_transport_message_t _z_t_msg_make_frame(_z_zint_t sn, _z_network_message_vec_t messages,
z_reliability_t reliability);
_z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t reliability);
_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last);
_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last,
bool start, bool stop);
_z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t messages, z_reliability_t reliability,
bool is_last);
bool is_last, bool start, bool stop);

/*------------------ Copy ------------------*/
void _z_t_msg_copy(_z_transport_message_t *clone, _z_transport_message_t *msg);
Expand Down
7 changes: 6 additions & 1 deletion include/zenoh-pico/protocol/ext.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ extern "C" {
/*=============================*/
/* Extension IDs */
/*=============================*/
// #define _Z_MSG_EXT_ID_FOO 0x00 // Hex(ENC|M|ID)
#define _Z_MSG_EXT_ID_JOIN_QOS (0x01 | _Z_MSG_EXT_FLAG_M | _Z_MSG_EXT_ENC_ZBUF)
#define _Z_MSG_EXT_ID_JOIN_PATCH (0x07 | _Z_MSG_EXT_ENC_ZINT)
#define _Z_MSG_EXT_ID_INIT_PATCH (0x07 | _Z_MSG_EXT_ENC_ZINT)
#define _Z_MSG_EXT_ID_FRAGMENT_START (0x02 | _Z_MSG_EXT_ENC_UNIT)
#define _Z_MSG_EXT_ID_FRAGMENT_STOP (0x03 | _Z_MSG_EXT_ENC_UNIT)

/*=============================*/
/* Extension Encodings */
Expand All @@ -58,6 +62,7 @@ extern "C" {
#define _Z_MSG_EXT_FLAG_M 0x10
#define _Z_MSG_EXT_IS_MANDATORY(h) ((h & _Z_MSG_EXT_FLAG_M) != 0)
#define _Z_MSG_EXT_FLAG_Z 0x80
#define _Z_MSG_EXT_MORE(more) (more ? _Z_MSG_EXT_FLAG_Z : 0)

typedef struct {
uint8_t __dummy; // Just to avoid empty structures that might cause undefined behavior
Expand Down
3 changes: 2 additions & 1 deletion include/zenoh-pico/transport/common/tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability);
void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability);
/*This function is unsafe because it operates in potentially concurrent
data.*Make sure that the following mutexes are locked before calling this function : *-ztu->mutex_tx */
z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn);
z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn,
bool start);

/*------------------ Transmission and Reception helpers ------------------*/
z_result_t _z_send_t_msg(_z_transport_t *zt, const _z_transport_message_t *t_msg);
Expand Down
13 changes: 13 additions & 0 deletions include/zenoh-pico/transport/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ typedef struct {

uint16_t _peer_id;
volatile bool _received;

#if Z_FEATURE_FRAGMENTATION == 1
// Patch
uint8_t _patch;
#endif
} _z_transport_peer_entry_t;

size_t _z_transport_peer_entry_size(const _z_transport_peer_entry_t *src);
Expand Down Expand Up @@ -108,6 +113,11 @@ typedef struct {

volatile bool _received;
volatile bool _transmitted;

#if Z_FEATURE_FRAGMENTATION == 1
// Patch
uint8_t _patch;
#endif
} _z_transport_unicast_t;

typedef struct _z_transport_multicast_t {
Expand Down Expand Up @@ -175,6 +185,9 @@ typedef struct {
uint8_t _req_id_res;
uint8_t _seq_num_res;
bool _is_qos;
#if Z_FEATURE_FRAGMENTATION == 1
uint8_t _patch;
#endif
} _z_transport_unicast_establish_param_t;

typedef struct {
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/transport/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ _z_zint_t _z_sn_max(uint8_t bits);
_z_zint_t _z_sn_half(_z_zint_t sn);
_z_zint_t _z_sn_modulo_mask(uint8_t bits);
bool _z_sn_precedes(const _z_zint_t sn_resolution, const _z_zint_t sn_left, const _z_zint_t sn_right);
bool _z_sn_consecutive(const _z_zint_t sn_resolution, const _z_zint_t sn_left, const _z_zint_t sn_right);
_z_zint_t _z_sn_increment(const _z_zint_t sn_resolution, const _z_zint_t sn);
_z_zint_t _z_sn_decrement(const _z_zint_t sn_resolution, const _z_zint_t sn);

Expand Down
60 changes: 53 additions & 7 deletions src/protocol/codec/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,14 @@ z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t
}
_Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_next_sn._val._plain._reliable));
_Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_next_sn._val._plain._best_effort));
#if Z_FEATURE_FRAGMENTATION == 1
bool has_patch = msg->_patch != _Z_NO_PATCH;
#else
bool has_patch = false;
#endif
if (msg->_next_sn._is_qos) {
if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) {
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 1));
if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) {
wyfo marked this conversation as resolved.
Show resolved Hide resolved
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_QOS | _Z_MSG_EXT_MORE(has_patch)));
size_t len = 0;
for (uint8_t i = 0; (i < Z_PRIORITIES_NUM) && (ret == _Z_RES_OK); i++) {
len += _z_zint_len(msg->_next_sn._val._qos[i]._reliable) +
Expand All @@ -82,21 +87,35 @@ z_result_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t
ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED;
}
}
#if Z_FEATURE_FRAGMENTATION == 1
if (has_patch) {
if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) {
wyfo marked this conversation as resolved.
Show resolved Hide resolved
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_PATCH));
_Z_RETURN_IF_ERR(_z_zint64_encode(wbf, msg->_patch));
} else {
_Z_DEBUG("Attempted to serialize Patch extension, but the header extension flag was unset");
ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED;
}
}
#endif

return ret;
}

z_result_t _z_join_decode_ext(_z_msg_ext_t *extension, void *ctx) {
z_result_t ret = _Z_RES_OK;
_z_t_msg_join_t *msg = (_z_t_msg_join_t *)ctx;
if (_Z_EXT_FULL_ID(extension->_header) ==
(_Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 1)) { // QOS: (enc=zbuf)(mandatory=true)(id=1)
if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_JOIN_QOS) {
msg->_next_sn._is_qos = true;
_z_zbuf_t zbf = _z_slice_as_zbuf(extension->_body._zbuf._val);
for (int i = 0; (ret == _Z_RES_OK) && (i < Z_PRIORITIES_NUM); ++i) {
ret |= _z_zsize_decode(&msg->_next_sn._val._qos[i]._reliable, &zbf);
ret |= _z_zsize_decode(&msg->_next_sn._val._qos[i]._best_effort, &zbf);
}
#if Z_FEATURE_FRAGMENTATION == 1
} else if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_JOIN_PATCH) {
msg->_patch = (uint8_t)extension->_body._zint._val;
#endif
} else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) {
ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN;
}
Expand Down Expand Up @@ -147,7 +166,8 @@ z_result_t _z_join_decode(_z_t_msg_join_t *msg, _z_zbuf_t *zbf, uint8_t header)
ret |= _z_zsize_decode(&msg->_next_sn._val._plain._reliable, zbf);
ret |= _z_zsize_decode(&msg->_next_sn._val._plain._best_effort, zbf);
}
if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z)) {
msg->_patch = _Z_NO_PATCH;
if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) {
wyfo marked this conversation as resolved.
Show resolved Hide resolved
ret |= _z_msg_ext_decode_iter(zbf, _z_join_decode_ext, msg);
}

Expand Down Expand Up @@ -180,6 +200,32 @@ z_result_t _z_init_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_init_t
_Z_RETURN_IF_ERR(_z_slice_encode(wbf, &msg->_cookie))
}

#if Z_FEATURE_FRAGMENTATION == 1
if (msg->_patch != _Z_CURRENT_PATCH) {
if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) {
wyfo marked this conversation as resolved.
Show resolved Hide resolved
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ID_JOIN_PATCH));
_Z_RETURN_IF_ERR(_z_zint64_encode(wbf, msg->_patch));
} else {
_Z_DEBUG("Attempted to serialize Patch extension, but the header extension flag was unset");
ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED;
}
}
#endif

return ret;
}

z_result_t _z_init_decode_ext(_z_msg_ext_t *extension, void *ctx) {
z_result_t ret = _Z_RES_OK;
_z_t_msg_init_t *msg = (_z_t_msg_init_t *)ctx;
if (false) {
#if Z_FEATURE_FRAGMENTATION == 1
} else if (_Z_EXT_FULL_ID(extension->_header) == _Z_MSG_EXT_ID_INIT_PATCH) {
msg->_patch = (uint8_t)extension->_body._zint._val;
#endif
} else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) {
ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN;
}
return ret;
}

Expand Down Expand Up @@ -222,8 +268,8 @@ z_result_t _z_init_decode(_z_t_msg_init_t *msg, _z_zbuf_t *zbf, uint8_t header)
msg->_cookie = _z_slice_empty();
}

if ((ret == _Z_RES_OK) && (_Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true)) {
ret |= _z_msg_ext_skip_non_mandatories(zbf, 0x01);
if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_Z) == true) {
wyfo marked this conversation as resolved.
Show resolved Hide resolved
ret |= _z_msg_ext_decode_iter(zbf, _z_init_decode_ext, msg);
}

return ret;
Expand Down
50 changes: 46 additions & 4 deletions src/protocol/definitions/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ _z_transport_message_t _z_t_msg_make_join(z_whatami_t whatami, _z_zint_t lease,
msg._body._join._batch_size = Z_BATCH_MULTICAST_SIZE;
msg._body._join._next_sn = next_sn;
msg._body._join._zid = zid;
#if Z_FEATURE_FRAGMENTATION == 1
msg._body._join._patch = _Z_CURRENT_PATCH;
#endif

if ((lease % 1000) == 0) {
_Z_SET_FLAG(msg._header, _Z_FLAG_T_JOIN_T);
Expand All @@ -112,7 +115,12 @@ _z_transport_message_t _z_t_msg_make_join(z_whatami_t whatami, _z_zint_t lease,
_Z_SET_FLAG(msg._header, _Z_FLAG_T_JOIN_S);
}

if (next_sn._is_qos) {
#if Z_FEATURE_FRAGMENTATION == 1
bool has_patch = msg._body._join._patch != _Z_NO_PATCH;
#else
bool has_patch = false;
#endif
if (next_sn._is_qos == true || has_patch == true) {
wyfo marked this conversation as resolved.
Show resolved Hide resolved
_Z_SET_FLAG(msg._header, _Z_FLAG_T_Z);
}

Expand All @@ -131,13 +139,25 @@ _z_transport_message_t _z_t_msg_make_init_syn(z_whatami_t whatami, _z_id_t zid)
msg._body._init._req_id_res = Z_REQ_RESOLUTION;
msg._body._init._batch_size = Z_BATCH_UNICAST_SIZE;
_z_slice_reset(&msg._body._init._cookie);
#if Z_FEATURE_FRAGMENTATION == 1
msg._body._init._patch = _Z_CURRENT_PATCH;
#endif

if ((msg._body._init._batch_size != _Z_DEFAULT_UNICAST_BATCH_SIZE) ||
(msg._body._init._seq_num_res != _Z_DEFAULT_RESOLUTION_SIZE) ||
(msg._body._init._req_id_res != _Z_DEFAULT_RESOLUTION_SIZE)) {
_Z_SET_FLAG(msg._header, _Z_FLAG_T_INIT_S);
}

#if Z_FEATURE_FRAGMENTATION == 1
bool has_patch = msg._body._join._patch != _Z_NO_PATCH;
#else
bool has_patch = false;
#endif
if (has_patch == true) {
wyfo marked this conversation as resolved.
Show resolved Hide resolved
_Z_SET_FLAG(msg._header, _Z_FLAG_T_Z);
}

return msg;
}

Expand All @@ -153,13 +173,25 @@ _z_transport_message_t _z_t_msg_make_init_ack(z_whatami_t whatami, _z_id_t zid,
msg._body._init._req_id_res = Z_REQ_RESOLUTION;
msg._body._init._batch_size = Z_BATCH_UNICAST_SIZE;
msg._body._init._cookie = cookie;
#if Z_FEATURE_FRAGMENTATION == 1
msg._body._init._patch = _Z_CURRENT_PATCH;
#endif

if ((msg._body._init._batch_size != _Z_DEFAULT_UNICAST_BATCH_SIZE) ||
(msg._body._init._seq_num_res != _Z_DEFAULT_RESOLUTION_SIZE) ||
(msg._body._init._req_id_res != _Z_DEFAULT_RESOLUTION_SIZE)) {
_Z_SET_FLAG(msg._header, _Z_FLAG_T_INIT_S);
}

#if Z_FEATURE_FRAGMENTATION == 1
bool has_patch = msg._body._init._patch != _Z_NO_PATCH;
#else
bool has_patch = false;
#endif
if (has_patch == true) {
_Z_SET_FLAG(msg._header, _Z_FLAG_T_Z);
}

wyfo marked this conversation as resolved.
Show resolved Hide resolved
return msg;
}

Expand Down Expand Up @@ -247,11 +279,12 @@ _z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, z_reliability_t
}

/*------------------ Fragment Message ------------------*/
_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last) {
return _z_t_msg_make_fragment(sn, _z_slice_empty(), reliability, is_last);
_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, z_reliability_t reliability, bool is_last,
bool start, bool stop) {
return _z_t_msg_make_fragment(sn, _z_slice_empty(), reliability, is_last, start, stop);
}
_z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t payload, z_reliability_t reliability,
bool is_last) {
bool is_last, bool start, bool stop) {
_z_transport_message_t msg;
msg._header = _Z_MID_T_FRAGMENT;
if (is_last == false) {
Expand All @@ -263,12 +296,20 @@ _z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_slice_t payload,

msg._body._fragment._sn = sn;
msg._body._fragment._payload = payload;
if (start == true || stop == true) {
wyfo marked this conversation as resolved.
Show resolved Hide resolved
_Z_SET_FLAG(msg._header, _Z_FLAG_T_Z);
}
msg._body._fragment.start = start;
msg._body._fragment.stop = stop;

return msg;
}

void _z_t_msg_copy_fragment(_z_t_msg_fragment_t *clone, _z_t_msg_fragment_t *msg) {
clone->_payload = msg->_payload;
_z_slice_copy(&clone->_payload, &msg->_payload);
clone->start = msg->start;
clone->stop = msg->stop;
}

void _z_t_msg_copy_join(_z_t_msg_join_t *clone, _z_t_msg_join_t *msg) {
Expand All @@ -279,6 +320,7 @@ void _z_t_msg_copy_join(_z_t_msg_join_t *clone, _z_t_msg_join_t *msg) {
clone->_req_id_res = msg->_req_id_res;
clone->_batch_size = msg->_batch_size;
clone->_next_sn = msg->_next_sn;
clone->_patch = msg->_patch;
memcpy(clone->_zid.id, msg->_zid.id, 16);
}

Expand Down
5 changes: 3 additions & 2 deletions src/transport/common/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ z_result_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t
return ret;
}

z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn) {
z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn,
bool start) {
z_result_t ret = _Z_RES_OK;

// Assume first that this is not the final fragment
Expand All @@ -144,7 +145,7 @@ z_result_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z
size_t w_pos = _z_wbuf_get_wpos(dst); // Mark the buffer for the writing operation

_z_transport_message_t f_hdr =
_z_t_msg_make_fragment_header(sn, reliability == Z_RELIABILITY_RELIABLE, is_final);
_z_t_msg_make_fragment_header(sn, reliability == Z_RELIABILITY_RELIABLE, is_final, start, false);
ret = _z_transport_message_encode(dst, &f_hdr); // Encode the frame header
if (ret == _Z_RES_OK) {
size_t space_left = _z_wbuf_space_left(dst);
Expand Down
Loading
Loading