From 27b31715d88dbc2ba5c2658fa6703e34f377f405 Mon Sep 17 00:00:00 2001 From: Daniel Adam Date: Mon, 28 Aug 2023 10:32:51 +0200 Subject: [PATCH] TODO --- messaging/coap/observe.c | 128 +++++++++++++++++++++++---------------- 1 file changed, 75 insertions(+), 53 deletions(-) diff --git a/messaging/coap/observe.c b/messaging/coap/observe.c index def33adb21..41c2d096f4 100644 --- a/messaging/coap/observe.c +++ b/messaging/coap/observe.c @@ -1262,6 +1262,43 @@ notify_batch_observer(coap_observer_t *obs, oc_response_t *response) return coap_send_notification_internal(ctx); } +static bool +observe_batch_set_response_buffer(batch_observer_t *batch_obs, + oc_response_buffer_t *response_buffer) +{ + coap_observer_t *obs = batch_obs->obs; + oc_rep_start_links_array(); + int size_before = oc_rep_get_encoded_payload_size(); + create_batch_for_batch_observer(&links_array, batch_obs, &obs->endpoint); + batch_observer_t *bnext = batch_obs->next; + while (bnext != NULL) { + batch_observer_t *next = bnext->next; + if (bnext->obs == obs) { + // TODO: does this override the payload or append to the payload? + create_batch_for_batch_observer(&links_array, bnext, &obs->endpoint); + oc_list_remove(g_batch_observers_list, bnext); + free_batch_observer(bnext); + } + bnext = next; + } + int size_after = oc_rep_get_encoded_payload_size(); + if (size_before == size_after) { + COAP_DBG("drop observations"); + return false; + } + oc_rep_end_links_array(); + size_after = oc_rep_get_encoded_payload_size(); + if (size_after < 0) { + COAP_ERR("invalid size after batch serialization"); + return false; + } + COAP_DBG("sending data with size %d", size_after); + response_buffer->content_format = APPLICATION_VND_OCF_CBOR; + response_buffer->response_length = size_after; + response_buffer->code = oc_status_code(OC_STATUS_OK); + return true; +} + static void process_batch_observers(void) { @@ -1270,14 +1307,14 @@ process_batch_observers(void) if (batch_obs == NULL) { return; } -#ifndef OC_DYNAMIC_ALLOCATION - uint8_t buffer[OC_MIN_OBSERVE_SIZE]; -#else /* !OC_DYNAMIC_ALLOCATION */ +#ifdef OC_DYNAMIC_ALLOCATION uint8_t *buffer = malloc(OC_MIN_OBSERVE_SIZE); if (buffer == NULL) { COAP_WRN("out of memory allocating buffer"); return; } +#else /* !OC_DYNAMIC_ALLOCATION */ + uint8_t buffer[OC_MIN_OBSERVE_SIZE]; #endif /* OC_DYNAMIC_ALLOCATION */ oc_response_buffer_t response_buffer; memset(&response_buffer, 0, sizeof(response_buffer)); @@ -1285,24 +1322,31 @@ process_batch_observers(void) batch_observer_get_resource_uri(batch_obs)); response_buffer.buffer = buffer; response_buffer.buffer_size = OC_MIN_OBSERVE_SIZE; + + // TODO: cache of etags for batch interface to avoid repeated calculation? + // struct { + // const oc_resource_t* resource; + // const oc_endpoint_t* endpoint; + // uint64_t etag; + //} + // oc_list_t etags; + while (batch_obs != NULL) { - if (!batch_obs->resource && - !oc_string_len(batch_obs->removed_resource_uri)) { - COAP_WRN("resource is NULL and removed_resource_uri is empty"); - oc_list_remove(g_batch_observers_list, batch_obs); - free_batch_observer(batch_obs); - batch_obs = (batch_observer_t *)oc_list_head(g_batch_observers_list); - continue; - } - coap_observer_t *obs = batch_obs->obs; #ifdef OC_BLOCK_WISE + coap_observer_t *obs = batch_obs->obs; + // obs->iface_mask is always OC_IF_B oc_string_view_t query = oc_query_encode_interface(obs->iface_mask); + // obs->resource is always the discovery resource const oc_blockwise_state_t *response_state = oc_blockwise_find_response_buffer(oc_string(obs->resource->uri) + 1, oc_string_len(obs->resource->uri) - 1, &obs->endpoint, OC_GET, query.data, query.length, OC_BLOCKWISE_SERVER); if (response_state != NULL) { + COAP_DBG( + "response_state is not NULL, sending of batch response currently " + "in progress for endpoint, skipping to next observer"); + // TODO: reschedule? batch_obs = batch_obs->next; continue; } @@ -1313,38 +1357,11 @@ process_batch_observers(void) #else /* OC_DYNAMIC_ALLOCATION */ oc_rep_new_v1(response_buffer.buffer, response_buffer.buffer_size); #endif /* !OC_DYNAMIC_ALLOCATION */ - oc_rep_start_links_array(); - int size_before = oc_rep_get_encoded_payload_size(); - batch_observer_t *o = batch_obs->next; - create_batch_for_batch_observer(&links_array, batch_obs, &obs->endpoint); - while (o != NULL) { - batch_observer_t *next = o->next; - if (o->obs == obs) { - create_batch_for_batch_observer(&links_array, o, &obs->endpoint); - oc_list_remove(g_batch_observers_list, o); - free_batch_observer(o); - } - o = next; - } - int size_after = oc_rep_get_encoded_payload_size(); - if (size_before == size_after) { - COAP_DBG("drop observations"); - } else { - oc_rep_end_links_array(); - size_after = oc_rep_get_encoded_payload_size(); - if (size_after < 0) { - COAP_ERR("invalid size after batch serialization"); - } else { - COAP_DBG("sending data with size %d", size_after); - response_buffer.content_format = APPLICATION_VND_OCF_CBOR; - response_buffer.response_length = size_after; - response_buffer.code = oc_status_code(OC_STATUS_OK); - oc_response_t response; - memset(&response, 0, sizeof(response)); - response.response_buffer = &response_buffer; - if (notify_batch_observer(obs, &response) < 0) { - goto leave_notify_observers; - } + if (observe_batch_set_response_buffer(batch_obs, &response_buffer)) { + oc_response_t response = { 0 }; + response.response_buffer = &response_buffer; + if (notify_batch_observer(obs, &response) < 0) { + goto leave_notify_observers; } } #ifdef OC_DYNAMIC_ALLOCATION @@ -1357,7 +1374,7 @@ process_batch_observers(void) leave_notify_observers: #ifdef OC_DYNAMIC_ALLOCATION buffer = response_buffer.buffer; - if (buffer) { + if (buffer != NULL) { free(buffer); } #endif /* OC_DYNAMIC_ALLOCATION */ @@ -1411,8 +1428,14 @@ add_notification_batch_observers_list(oc_resource_t *resource, bool removed) if (obs->resource != discover_resource || obs->iface_mask != OC_IF_B) { continue; } + if (removed && (oc_string_len(resource->uri) == 0)) { + COAP_WRN("resource has no URI"); + continue; + } #ifdef OC_SECURITY if (!oc_sec_check_acl(OC_GET, resource, &obs->endpoint)) { + COAP_DBG("resource %s not authorized for endpoint", + oc_string(resource->uri)); continue; } #endif /* OC_SECURITY */ @@ -1434,16 +1457,15 @@ add_notification_batch_observers_list(oc_resource_t *resource, bool removed) COAP_ERR("cannot allocate batch observer for resource (%s)", oc_string(resource->uri)); return; + } + o->obs = obs; + if (removed) { + oc_new_string(&o->removed_resource_uri, oc_string(resource->uri), + oc_string_len(resource->uri)); } else { - o->obs = obs; - if (removed) { - oc_new_string(&o->removed_resource_uri, oc_string(resource->uri), - oc_string_len(resource->uri)); - } else { - o->resource = resource; - } - oc_list_add(g_batch_observers_list, o); + o->resource = resource; } + oc_list_add(g_batch_observers_list, o); } dispatch_process_batch_observers(); }