Skip to content

Commit

Permalink
Added Windows port of message queue limit.
Browse files Browse the repository at this point in the history
  • Loading branch information
sp-milos committed Jun 10, 2024
1 parent 0b456a5 commit 3d6a364
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 65 deletions.
184 changes: 119 additions & 65 deletions port/windows/ipadapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
#include <malloc.h>
#endif /* OC_DYNAMIC_ALLOCATION */

#define MAX_EVENT_COUNT 8

#define OCF_PORT_UNSECURED (5683)
static const uint8_t ALL_OCF_NODES_LL[] = {
0xff, 0x02, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x01, 0x58
Expand Down Expand Up @@ -723,80 +725,114 @@ network_event_thread(void *data)
#endif /* OC_IPV4 */
#undef OC_WSAEVENTSELECT

DWORD events_list_size = 0;
WSAEVENT events_list[7];
DWORD IFCHANGE = 0;
WSAEVENT event_list[MAX_EVENT_COUNT];
DWORD event_list_size = 0;

// Add control flow events.
if (dev->device == 0) {
events_list[0] = ifchange_event.hEvent;
events_list_size++;
event_list[event_list_size++] = ifchange_event.hEvent;
process_interface_change_event();
}
DWORD MCAST6 = events_list_size;
events_list[events_list_size] = mcast6_event;
events_list_size++;
DWORD SERVER6 = events_list_size;
events_list[events_list_size] = server6_event;
events_list_size++;
#if defined(OC_SECURITY)
DWORD SECURE6 = events_list_size;
events_list[events_list_size] = secure6_event;
events_list_size++;
#if defined(OC_IPV4)
DWORD MCAST4 = events_list_size;
events_list[events_list_size] = mcast4_event;
events_list_size++;
DWORD SERVER4 = events_list_size;
events_list[events_list_size] = server4_event;
events_list_size++;
DWORD SECURE4 = events_list_size;
events_list[events_list_size] = secure4_event;
events_list_size++;
#endif /* OC_IPV4 */
#elif defined(OC_IPV4) /* OC_SECURITY */
DWORD MCAST4 = events_list_size;
events_list[events_list_size] = mcast4_event;
events_list_size++;
DWORD SERVER4 = events_list_size;
events_list[events_list_size] = server4_event;
events_list_size++;
#endif /* !OC_SECURITY */

DWORD i, index;
#ifdef OC_DYNAMIC_ALLOCATION
event_list[event_list_size++] = dev->wake_up_event;
#endif // OC_DYNAMIC_ALLOCATION
DWORD control_flow_event_count = event_list_size;

// Add remaining message events.
event_list[event_list_size++] = mcast6_event;
event_list[event_list_size++] = server6_event;
#ifdef OC_SECURITY
event_list[event_list_size++] = secure6_event;
#ifdef OC_IPV4
event_list[event_list_size++] = mcast4_event;
event_list[event_list_size++] = server4_event;
event_list[event_list_size++] = secure4_event;
#endif /* OC_IPV4 */
#else /* OC_SECURITY */
event_list[event_list_size++] = mcast4_event;
event_list[event_list_size++] = server4_event;
#endif /* !OC_SECURITY */

while (!dev->terminate) {
index = WSAWaitForMultipleEvents(events_list_size, events_list, FALSE,
INFINITE, FALSE);
index -= WSA_WAIT_EVENT_0;
for (i = index; !dev->terminate && i < events_list_size; i++) {
index = WSAWaitForMultipleEvents(1, &events_list[i], TRUE, 0, FALSE);
if (index != WSA_WAIT_TIMEOUT && index != WSA_WAIT_FAILED) {
if (WSAResetEvent(events_list[i]) == FALSE) {
DWORD event_count = event_list_size;

#ifdef OC_DYNAMIC_ALLOCATION
DWORD queue_length = (DWORD) oc_network_get_event_queue_length(dev->device);
if (queue_length >= OC_DEVICE_MAX_NUM_CONCURRENT_REQUESTS) {
// Network event queue is full, wait only for control flow events.
event_count = control_flow_event_count;
}
#endif /* OC_DYNAMIC_ALLOCATION */

DWORD i = WSAWaitForMultipleEvents(event_count, event_list, FALSE, INFINITE, FALSE);
i -= WSA_WAIT_EVENT_0;

// Process control flow events.
for (; i < control_flow_event_count; i++) {

WSAEVENT cf_event = event_list[i];
DWORD ret = WSAWaitForMultipleEvents(1, &cf_event, TRUE, 0, FALSE);

if (ret != WSA_WAIT_TIMEOUT && ret != WSA_WAIT_FAILED) {
if (WSAResetEvent(cf_event) == FALSE) {
OC_WRN("WSAResetEvent returned error: %d", WSAGetLastError());
}
if (dev->device == 0 && i == IFCHANGE) {
if (dev->device == 0 && cf_event == ifchange_event.hEvent) {
process_interface_change_event();
DWORD bytes_returned = 0;
if (WSAIoctl(ifchange_sock, SIO_ADDRESS_LIST_CHANGE, NULL, 0, NULL, 0,
&bytes_returned, &ifchange_event,
NULL) == SOCKET_ERROR) {
DWORD err = GetLastError();
if (err != ERROR_IO_PENDING) {
OC_ERR("could not reset SIO_ADDRESS_LIST_CHANGE on network "
"interface change socket");
if (GetLastError() != ERROR_IO_PENDING) {
OC_ERR("could not reset SIO_ADDRESS_LIST_CHANGE on network interface change socket");
}
}
continue;
}
}
}

oc_message_t *message = oc_allocate_message();
// Collect signalled message events.
WSAEVENT msg_events[MAX_EVENT_COUNT];
DWORD msg_event_count = 0;

for (; i < event_count; i++) {
DWORD ret = WSAWaitForMultipleEvents(1, event_list + i, TRUE, 0, FALSE);
if (ret != WSA_WAIT_TIMEOUT && ret != WSA_WAIT_FAILED) {
msg_events[msg_event_count++] = event_list[i];
}
}

#ifdef OC_DYNAMIC_ALLOCATION
// Randomly remove events from list if the queue is about to fill.
DWORD available_items = OC_DEVICE_MAX_NUM_CONCURRENT_REQUESTS - queue_length;
DWORD items_to_remove = msg_event_count - min(msg_event_count, available_items);

while (items_to_remove--) {
DWORD rnd = (DWORD) rand() % msg_event_count;
msg_event_count--;
msg_events[rnd] = msg_events[msg_event_count];
}
#endif /* OC_DYNAMIC_ALLOCATION */

if (!message) {
// Process message events.
for (i = 0; !dev->terminate && i < msg_event_count; i++) {

WSAEVENT msg_event = msg_events[i];
DWORD ret = WSAWaitForMultipleEvents(1, &msg_event, TRUE, 0, FALSE);

if (ret != WSA_WAIT_TIMEOUT && ret != WSA_WAIT_FAILED) {
if (WSAResetEvent(msg_event) == FALSE) {
OC_WRN("WSAResetEvent returned error: %d", WSAGetLastError());
}

oc_message_t *message = oc_allocate_message();
if (message == NULL) {
break;
}

message->endpoint.device = dev->device;

if (i == SERVER6) {
if (msg_event == server6_event) {
int count = recv_msg(dev->server_sock, message->data, OC_PDU_SIZE,
&message->endpoint, false);
if (count < 0) {
Expand All @@ -805,10 +841,9 @@ network_event_thread(void *data)
}
message->length = count;
message->endpoint.flags = IPV6;
goto common;
}

if (i == MCAST6) {
if (msg_event == mcast6_event) {
int count = recv_msg(dev->mcast_sock, message->data, OC_PDU_SIZE,
&message->endpoint, true);
if (count < 0) {
Expand All @@ -817,11 +852,10 @@ network_event_thread(void *data)
}
message->length = count;
message->endpoint.flags = IPV6 | MULTICAST;
goto common;
}

#ifdef OC_IPV4
if (i == SERVER4) {
if (msg_event == server4_event) {
int count = recv_msg(dev->server4_sock, message->data, OC_PDU_SIZE,
&message->endpoint, false);
if (count < 0) {
Expand All @@ -830,10 +864,9 @@ network_event_thread(void *data)
}
message->length = count;
message->endpoint.flags = IPV4;
goto common;
}

if (i == MCAST4) {
if (msg_event == mcast4_event) {
int count = recv_msg(dev->mcast4_sock, message->data, OC_PDU_SIZE,
&message->endpoint, true);
if (count < 0) {
Expand All @@ -842,12 +875,11 @@ network_event_thread(void *data)
}
message->length = count;
message->endpoint.flags = IPV4 | MULTICAST;
goto common;
}
#endif /* OC_IPV4 */

#ifdef OC_SECURITY
if (i == SECURE6) {
if (msg_event == secure6_event) {
int count = recv_msg(dev->secure_sock, message->data, OC_PDU_SIZE,
&message->endpoint, false);
if (count < 0) {
Expand All @@ -857,10 +889,9 @@ network_event_thread(void *data)
message->length = count;
message->endpoint.flags = IPV6 | SECURED;
message->encrypted = 1;
goto common;
}
#ifdef OC_IPV4
if (i == SECURE4) {
if (msg_event == secure4_event) {
int count = recv_msg(dev->secure4_sock, message->data, OC_PDU_SIZE,
&message->endpoint, false);
if (count < 0) {
Expand All @@ -873,7 +904,7 @@ network_event_thread(void *data)
}
#endif /* OC_IPV4 */
#endif /* OC_SECURITY */
common:

OC_DBG("Incoming message of size %zd bytes from", message->length);
OC_LOGipaddr(message->endpoint);
OC_DBG("%s", "");
Expand All @@ -882,8 +913,8 @@ network_event_thread(void *data)
}
}

for (i = 0; i < events_list_size; ++i) {
WSACloseEvent(events_list[i]);
for (DWORD i = 0; i < event_list_size; ++i) {
WSACloseEvent(event_list[i]);
}

return 0;
Expand Down Expand Up @@ -1434,6 +1465,15 @@ oc_connectivity_init(size_t device, oc_connectivity_ports_t ports)
oc_abort("Insufficient memory");
}

#ifdef OC_DYNAMIC_ALLOCATION
dev->wake_up_event = WSACreateEvent();
if (dev->wake_up_event == WSA_INVALID_EVENT) {
OC_ERR("Creating wake up event for network event thread");
oc_memb_free(&g_ip_context_s, dev);
return -1;
}
#endif /* OC_DYNAMIC_ALLOCATION */

dev->device = device;
OC_LIST_STRUCT_INIT(dev, eps);
memset(&dev->mcast, 0, sizeof(dev->mcast));
Expand Down Expand Up @@ -1636,6 +1676,20 @@ oc_connectivity_init(size_t device, oc_connectivity_ports_t ports)
return 0;
}

void
oc_connectivity_wakeup(size_t device)
{
ip_context_t *dev = get_ip_context_for_device(device);
if (dev == NULL) {
OC_WRN("no ip-context found for device(%zu)", device);
return;
}

if (WSASetEvent(dev->wake_up_event) == FALSE) {
OC_WRN("cannot wake up network event thread (error: %d)", WSAGetLastError());
}
}

void
oc_connectivity_shutdown(size_t device)
{
Expand Down
3 changes: 3 additions & 0 deletions port/windows/ipcontext.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ typedef struct ip_context_t
BOOL terminate;
size_t device;
OC_ATOMIC_INT8_T flags;
#ifdef OC_DYNAMIC_ALLOCATION
WSAEVENT wake_up_event;
#endif /* OC_DYNAMIC_ALLOCATION */
} ip_context_t;

#ifdef __cplusplus
Expand Down
5 changes: 5 additions & 0 deletions port/windows/oc_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ typedef uint64_t oc_clock_time_t;
/* Enable reallocation during encoding the representation to cbor */
// #define OC_REP_ENCODING_REALLOC

/* Maximum number of messages in the network event queue for a device */
#ifndef OC_DEVICE_MAX_NUM_CONCURRENT_REQUESTS
#define OC_DEVICE_MAX_NUM_CONCURRENT_REQUESTS (32)
#endif /* OC_DEVICE_MAX_NUM_CONCURRENT_REQUESTS */

/* Maximum size of uri for a collection resource */
// #define OC_MAX_COLLECTIONS_INSTANCE_URI_SIZE (64)

Expand Down

0 comments on commit 3d6a364

Please sign in to comment.