Skip to content

Commit

Permalink
TODO
Browse files Browse the repository at this point in the history
  • Loading branch information
Danielius1922 committed Aug 28, 2023
1 parent 4d44ef7 commit 27b3171
Showing 1 changed file with 75 additions and 53 deletions.
128 changes: 75 additions & 53 deletions messaging/coap/observe.c
Original file line number Diff line number Diff line change
Expand Up @@ -1262,6 +1262,43 @@ notify_batch_observer(coap_observer_t *obs, oc_response_t *response)
return coap_send_notification_internal(ctx);
}

static bool
observe_batch_set_response_buffer(batch_observer_t *batch_obs,
oc_response_buffer_t *response_buffer)
{
coap_observer_t *obs = batch_obs->obs;
oc_rep_start_links_array();
int size_before = oc_rep_get_encoded_payload_size();
create_batch_for_batch_observer(&links_array, batch_obs, &obs->endpoint);
batch_observer_t *bnext = batch_obs->next;
while (bnext != NULL) {
batch_observer_t *next = bnext->next;
if (bnext->obs == obs) {
// TODO: does this override the payload or append to the payload?
create_batch_for_batch_observer(&links_array, bnext, &obs->endpoint);
oc_list_remove(g_batch_observers_list, bnext);
free_batch_observer(bnext);
}
bnext = next;
}
int size_after = oc_rep_get_encoded_payload_size();
if (size_before == size_after) {
COAP_DBG("drop observations");
return false;
}
oc_rep_end_links_array();
size_after = oc_rep_get_encoded_payload_size();
if (size_after < 0) {
COAP_ERR("invalid size after batch serialization");
return false;
}
COAP_DBG("sending data with size %d", size_after);
response_buffer->content_format = APPLICATION_VND_OCF_CBOR;
response_buffer->response_length = size_after;
response_buffer->code = oc_status_code(OC_STATUS_OK);
return true;
}

static void
process_batch_observers(void)
{
Expand All @@ -1270,39 +1307,46 @@ process_batch_observers(void)
if (batch_obs == NULL) {
return;
}
#ifndef OC_DYNAMIC_ALLOCATION
uint8_t buffer[OC_MIN_OBSERVE_SIZE];
#else /* !OC_DYNAMIC_ALLOCATION */
#ifdef OC_DYNAMIC_ALLOCATION
uint8_t *buffer = malloc(OC_MIN_OBSERVE_SIZE);
if (buffer == NULL) {
COAP_WRN("out of memory allocating buffer");
return;
}
#else /* !OC_DYNAMIC_ALLOCATION */
uint8_t buffer[OC_MIN_OBSERVE_SIZE];
#endif /* OC_DYNAMIC_ALLOCATION */
oc_response_buffer_t response_buffer;
memset(&response_buffer, 0, sizeof(response_buffer));
COAP_DBG("Issue GET request to discovery resource for %s resource",
batch_observer_get_resource_uri(batch_obs));
response_buffer.buffer = buffer;
response_buffer.buffer_size = OC_MIN_OBSERVE_SIZE;

// TODO: cache of etags for batch interface to avoid repeated calculation?
// struct {
// const oc_resource_t* resource;
// const oc_endpoint_t* endpoint;
// uint64_t etag;
//}
// oc_list_t etags;

while (batch_obs != NULL) {
if (!batch_obs->resource &&
!oc_string_len(batch_obs->removed_resource_uri)) {
COAP_WRN("resource is NULL and removed_resource_uri is empty");
oc_list_remove(g_batch_observers_list, batch_obs);
free_batch_observer(batch_obs);
batch_obs = (batch_observer_t *)oc_list_head(g_batch_observers_list);
continue;
}
coap_observer_t *obs = batch_obs->obs;
#ifdef OC_BLOCK_WISE
coap_observer_t *obs = batch_obs->obs;
// obs->iface_mask is always OC_IF_B
oc_string_view_t query = oc_query_encode_interface(obs->iface_mask);
// obs->resource is always the discovery resource
const oc_blockwise_state_t *response_state =
oc_blockwise_find_response_buffer(oc_string(obs->resource->uri) + 1,
oc_string_len(obs->resource->uri) - 1,
&obs->endpoint, OC_GET, query.data,
query.length, OC_BLOCKWISE_SERVER);
if (response_state != NULL) {
COAP_DBG(
"response_state is not NULL, sending of batch response currently "
"in progress for endpoint, skipping to next observer");
// TODO: reschedule?
batch_obs = batch_obs->next;
continue;
}
Expand All @@ -1313,38 +1357,11 @@ process_batch_observers(void)
#else /* OC_DYNAMIC_ALLOCATION */
oc_rep_new_v1(response_buffer.buffer, response_buffer.buffer_size);
#endif /* !OC_DYNAMIC_ALLOCATION */
oc_rep_start_links_array();
int size_before = oc_rep_get_encoded_payload_size();
batch_observer_t *o = batch_obs->next;
create_batch_for_batch_observer(&links_array, batch_obs, &obs->endpoint);
while (o != NULL) {
batch_observer_t *next = o->next;
if (o->obs == obs) {
create_batch_for_batch_observer(&links_array, o, &obs->endpoint);
oc_list_remove(g_batch_observers_list, o);
free_batch_observer(o);
}
o = next;
}
int size_after = oc_rep_get_encoded_payload_size();
if (size_before == size_after) {
COAP_DBG("drop observations");
} else {
oc_rep_end_links_array();
size_after = oc_rep_get_encoded_payload_size();
if (size_after < 0) {
COAP_ERR("invalid size after batch serialization");
} else {
COAP_DBG("sending data with size %d", size_after);
response_buffer.content_format = APPLICATION_VND_OCF_CBOR;
response_buffer.response_length = size_after;
response_buffer.code = oc_status_code(OC_STATUS_OK);
oc_response_t response;
memset(&response, 0, sizeof(response));
response.response_buffer = &response_buffer;
if (notify_batch_observer(obs, &response) < 0) {
goto leave_notify_observers;
}
if (observe_batch_set_response_buffer(batch_obs, &response_buffer)) {
oc_response_t response = { 0 };
response.response_buffer = &response_buffer;
if (notify_batch_observer(obs, &response) < 0) {
goto leave_notify_observers;
}
}
#ifdef OC_DYNAMIC_ALLOCATION
Expand All @@ -1357,7 +1374,7 @@ process_batch_observers(void)
leave_notify_observers:
#ifdef OC_DYNAMIC_ALLOCATION
buffer = response_buffer.buffer;
if (buffer) {
if (buffer != NULL) {
free(buffer);
}
#endif /* OC_DYNAMIC_ALLOCATION */
Expand Down Expand Up @@ -1411,8 +1428,14 @@ add_notification_batch_observers_list(oc_resource_t *resource, bool removed)
if (obs->resource != discover_resource || obs->iface_mask != OC_IF_B) {
continue;
}
if (removed && (oc_string_len(resource->uri) == 0)) {
COAP_WRN("resource has no URI");
continue;
}
#ifdef OC_SECURITY
if (!oc_sec_check_acl(OC_GET, resource, &obs->endpoint)) {
COAP_DBG("resource %s not authorized for endpoint",
oc_string(resource->uri));
continue;
}
#endif /* OC_SECURITY */
Expand All @@ -1434,16 +1457,15 @@ add_notification_batch_observers_list(oc_resource_t *resource, bool removed)
COAP_ERR("cannot allocate batch observer for resource (%s)",
oc_string(resource->uri));
return;
}
o->obs = obs;
if (removed) {
oc_new_string(&o->removed_resource_uri, oc_string(resource->uri),
oc_string_len(resource->uri));
} else {
o->obs = obs;
if (removed) {
oc_new_string(&o->removed_resource_uri, oc_string(resource->uri),
oc_string_len(resource->uri));
} else {
o->resource = resource;
}
oc_list_add(g_batch_observers_list, o);
o->resource = resource;
}
oc_list_add(g_batch_observers_list, o);
}
dispatch_process_batch_observers();
}
Expand Down

0 comments on commit 27b3171

Please sign in to comment.