Skip to content

Commit

Permalink
Broker supports add/remove link, where links are valid module handles.
Browse files Browse the repository at this point in the history
  • Loading branch information
darobs committed Aug 13, 2016
1 parent 90c4295 commit 4ada508
Show file tree
Hide file tree
Showing 14 changed files with 1,543 additions and 469 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

# Features

Added routing module to module in gateway JSON configuration.
28 changes: 15 additions & 13 deletions bindings/nodejs/src/nodejs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,21 @@

#define DESTROY_WAIT_TIME_IN_SECS (5)

#define NODE_LOAD_SCRIPT(ss, main_path, module_id) ss << \
"(function() {" \
" try {" \
" var path = require('path');" \
" return gatewayHost.registerModule(" \
" require(path.resolve('" << (main_path) << "')), " << \
(module_id) << \
" ); " \
" } " \
" catch(err) { " \
" console.error(`ERROR: ${err.toString()}`);" \
" return false;" \
" }" \
#define NODE_LOAD_SCRIPT(ss, main_path, module_id) ss << \
"(function() {" \
" try {" \
" var path = require('path');" \
" var main_path = path.resolve('" << (main_path) << "');" << \
" delete require.cache[main_path]; " << \
" return gatewayHost.registerModule(" \
" require(main_path), " << \
(module_id) << \
" ); " \
" } " \
" catch(err) { " \
" console.error(`ERROR: ${err.toString()}`);" \
" return false;" \
" }" \
"})();"

static void on_module_start(NODEJS_MODULE_HANDLE_DATA* handle_data);
Expand Down
147 changes: 84 additions & 63 deletions core/devdoc/broadcast_bus_requirements.md

Large diffs are not rendered by default.

76 changes: 53 additions & 23 deletions core/devdoc/broker_hld.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Message Broker
Message Broker (PubSub)
==============

High level design
Expand Down Expand Up @@ -65,31 +65,35 @@ typedef struct MODULE_INFO_TAG
### Attaching a Module to the Broker

When a new module is added to the broker a worker thread is created to receive messages for that module. The worker thread will wait on `receive_socket` and deliver messages to the module's receive callback function. If `quit_worker` is equal to `1` then the worker thread will quit and return.
When a new module is added to the broker a worker thread is created to receive messages for that module. The worker thread will wait on `receive_socket` and deliver messages to the module's receive callback function. If the buffer received is equal to the `quit_message_guid`, then the loop will terminate.

### Publishing A Message

Using a messaging system like nanomsg strongly encourages the broker to pass messages as serialized data, rather than passing messages as pointers or handles.

Nanomsg sockets are considered thread-safe, which means we can avoid locking during a publish unless we need access to critical module data.

In published messages, the topic is always the value of `source` as a `MODULE_HANDLE` type. This will be copied into the message in the platform-specific serialization of the type.

**Message publishing pseudo code**

```c
01: MESSAGE_HANDLE msg = Message_Clone(message)
02: unsigned char* serial_message = Message_ToByteArray( msg, &size)
03: void* nn_msg = nn_allocmsg(size, 0)
04: memcpy(nn_msg, serial_message, size)
05: int nbytes = nn_send(broker_data->publish_socket, nn_msg, NN_MSG, 0)
06: free(serial_message)
07: Message_Destroy(msg)
02: message_size = Message_ToByteArray(msg, NULL, 0)
03: buffer_size = message_size + sizeof(MODULE_HANDLE)
04: void* nn_msg = nn_allocmsg(buffer_size, 0)
05: memcpy (nn_msg, source, sizeof(MODULE_HANDLE))
06: Message_ToByteArray(msg, nn_msg+sizeof(MODULE_HANDLE), message_size)
07: int nbytes = nn_send(broker_data->publish_socket, nn_msg, NN_MSG, 0)
08: free(nn_msg)
09: Message_Destroy(msg)
```

The call to nn_alloc creates a buffer managed by nanomsg. THis allows for zero copy message passing as well as memory management inside nanomsg. This buffer will be destroyed after a successful call.
The call to `nn_allocmsg` creates a buffer managed by nanomsg. This allows for zero copy message passing as well as memory management inside nanomsg. This buffer will be destroyed after a successful call.

### Module Publish Worker
### Module Worker

The `module_publish_worker` function is passed in a pointer to the relevant `MODULE_INFO` object as it's thread context parameter. The function's job is to basically wait on the receive socket and process messages when received. Here's the pseudo-code implementation of what it does:
The `module_worker` function is passed in a pointer to the relevant `MODULE_INFO` object as it's thread context parameter. The function's job is to basically wait on the receive socket and process messages when received. Here's the pseudo-code implementation of what it does:

**Code Segment 2**
```c
Expand All @@ -107,18 +111,19 @@ The `module_publish_worker` function is passed in a pointer to the relevant `MOD
11: should_continue = false
12: }
13: else
14: {
15: MESSAGE_HANDLE msg = Message_CreateFromByteArray(buf, nbytes)
16: Deliver msg to module_info.module
17: Destroy msg
18: }
19: nn_freemsg(buf)
20: }
21: else
22: {
23: should_continue = false;
24: }
25: }
14: {
15: Strip off topic from received buffer.
16: MESSAGE_HANDLE msg = Message_CreateFromByteArray(buf, nbytes)
17: Deliver msg to module_info.module
18: Destroy msg
19: }
20: nn_freemsg(buf)
21: }
22: else
23: {
24: should_continue = false;
25: }
26: }
```
Why do we need the `socket_lock`? Helgrind and drd found a race condition between `nn_recv` and `nn_close` on the internal socket data. The socket lock prevents this race condition.
Expand All @@ -138,3 +143,28 @@ The following is pseudo-code for stopping the Module Publish Worker thread:
```

If for any reason the send fails, closing the socket will guarantee the next read will fail, and the thread will terminate.

### Routing

The broker will receive a series of links, each with a valid source module handle and a valid sink module handle. The link entry specifies that the source will publish a message expected to be consumed by the sink. Therefore, a sink will subscribe to a source.

As described above in *Publishing A Message*, the PubSub Broker will always publish the message using the `MODULE_HANDLE` as the topic. For each link pair sent to the Broker, the sink will subscribe to the source `MODULE_HANDLE`.

The following is pseudo-code for Broker_AddLink:
```c
01: Lock modules_lock
02: Locate module_info for sink module.
03: nn_setsockopt(sink->receive_socket, NN_SUB, NN_SUB_SUBSCRIBE, &source,sizeof(MODULE_HANDLE));
04: Unlock modules_lock
```

When removing the link, the Broker will unsubscribe to the source `MODULE_HANDLE`. The following is pseudo-code for Broker_RemoveLink:
```c
01: Lock modules_lock
02: Locate module_info for sink module.
03: nn_setsockopt(sink->receive_socket, NN_SUB, NN_SUB_UNSUBSCRIBE, &source,sizeof(MODULE_HANDLE));
04: Unlock modules_lock
```

The MODULE_HANDLE was chosen over the module name simply because the handle is a fixed size, making it quick and easy to strip off of the received buffer.

70 changes: 62 additions & 8 deletions core/devdoc/pubsub_bus_requirements.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ extern void Broker_DecRef(BROKER_HANDLE broker);
extern BROKER_RESULT Broker_Publish(BROKER_HANDLE broker, MODULE_HANDLE source, MESSAGE_HANDLE message);
extern BROKER_RESULT Broker_AddModule(BROKER_HANDLE broker, const MODULE* module);
extern BROKER_RESULT Broker_RemoveModule(BROKER_HANDLE broker, const MODULE* module);
extern BROKER_RESULT Broker_AddLink(BROKER_HANDLE broker, const LINK_DATA* link);
extern BROKER_RESULT Broker_RemoveLink(BROKER_HANDLE broker, const LINK_DATA* link);
extern void Broker_Destroy(BROKER_HANDLE broker);
```
Expand Down Expand Up @@ -109,7 +111,7 @@ typedef struct BROKER_HANDLE_DATA_TAG
}BROKER_HANDLE_DATA;
```

**SRS_BROKER_13_067: [** `Broker_Create` shall `malloc` a new instance of `BROKER_HANDLE_DATA` and return `NULL` if it fails. **]**
**SRS_BROKER_13_067: [** `Broker_Create` shall `malloc` a new instance of `BROKER_HANDLE_DATA`. **]**

**SRS_BROKER_13_007: [** `Broker_Create` shall initialize `BROKER_HANDLE_DATA::modules` with a valid `VECTOR_HANDLE`. **]**

Expand All @@ -134,28 +136,30 @@ Broker_Clone creates a clone of the message broker handle.
**SRS_BROKER_13_109: [** Otherwise, `Broker_IncRef` shall increment the internal ref count. **]**
## module_publish_worker
## module_worker
```C
static void module_publish_worker(void* user_data)
static void module_worker(void* user_data)
```

**SRS_BROKER_13_026: [** This function shall assign `user_data` to a local variable called `module_info` of type `BROKER_MODULEINFO*`. **]**

**SRS_BROKER_13_089: [** This function shall acquire the lock on `module_info->socket_lock`. **]**

**SRS_BROKER_02_004: [** If acquiring the lock fails, then module_publish_worker shall return. **]**
**SRS_BROKER_02_004: [** If acquiring the lock fails, then `module_worker` shall return. **]**

**SRS_BROKER_13_068: [** This function shall run a loop that keeps running until `module_info->quit_message_guid` is sent to the thread. **]**

**SRS_BROKER_13_091: [** The function shall unlock `module_info->socket_lock`. **]**

**SRS_BROKER_17_016: [** If releasing the lock fails, then module_publish_worker shall return. **]**
**SRS_BROKER_17_016: [** If releasing the lock fails, then `module_worker` shall return. **]**

**SRS_BROKER_17_005: [** For every iteration of the loop, the function shall wait on the `receive_socket` for messages. **]**

**SRS_BROKER_17_006: [** An error on receiving a message shall terminate the loop. **]**

**SRS_BROKER_17_024: [** The function shall strip off the topic from the message. **]**

**SRS_BROKER_17_017: [** The function shall deserialize the message received. **]**

**SRS_BROKER_17_018: [** If the deserialization is not successful, the message loop shall continue. **]**
Expand All @@ -178,15 +182,19 @@ BROKER_RESULT Broker_Publish(
);
```

**SRS_BROKER_13_030: [** If `broker` or `message` is `NULL` the function shall return `BROKER_INVALIDARG`. **]**
**SRS_BROKER_13_030: [** If `broker`, `source`, or `message` is `NULL` the function shall return `BROKER_INVALIDARG`. **]**

**SRS_BROKER_17_022: [** `Broker_Publish` shall Lock the modules lock. **]**

**SRS_BROKER_17_007: [** `Broker_Publish` shall clone the `message`. **]**

**SRS_BROKER_17_008: [** `Broker_Publish` shall serialize the `message`. **]**

**SRS_BROKER_17_009: [** `Broker_Publish` shall allocate a nanomsg buffer and copy the serialized message into it. **]**
**SRS_BROKER_17_025: [** `Broker_Publish` shall allocate a nanomsg buffer the size of the serialized message + `sizeof(MODULE_HANDLE)`. **]**

**SRS_BROKER_17_026: [** `Broker_Publish` shall copy `source` into the beginning of the nanomsg buffer. **]**

**SRS_BROKER_17_027: [** `Broker_Publish` shall serialize the `message` into the remainder of the nanomsg buffer. **]**

**SRS_BROKER_17_010: [** `Broker_Publish` shall send a message on the `publish_socket`. **]**

Expand Down Expand Up @@ -216,7 +224,9 @@ BROKER_RESULT Broker_AddModule(BROKER_HANDLE broker, const MODULE* module)
**SRS_BROKER_17_020: [** The function shall create a unique ID used as a quit signal. **]**
**SRS_BROKER_13_102: [** The function shall create a new thread for the module by calling `ThreadAPI_Create` using `module_publish_worker` as the thread callback and using the newly allocated `BROKER_MODULEINFO` object as the thread context. **]**
**SRS_BROKER_17_028: [** The function shall subscribe `BROKER_MODULEINFO::receive_socket` to the quit signal GUID. **]**
**SRS_BROKER_13_102: [** The function shall create a new thread for the module by calling `ThreadAPI_Create` using `module_worker` as the thread callback and using the newly allocated `BROKER_MODULEINFO` object as the thread context. **]**
**SRS_BROKER_13_039: [** This function shall acquire the lock on `BROKER_HANDLE_DATA::modules_lock`. **]**
Expand Down Expand Up @@ -263,6 +273,50 @@ BROKER_RESULT Broker_RemoveModule(BROKER_HANDLE broker, const MODULE* module)

**SRS_BROKER_13_053: [** This function shall return `BROKER_ERROR` if an underlying API call to the platform causes an error or `BROKER_OK` otherwise. **]**


## Broker_AddLink
```c
extern BROKER_RESULT Broker_AddLink(BROKER_HANDLE broker, const LINK_DATA* link);
```
Add a router link to the Broker.
**SRS_BROKER_17_029: [** If `broker`, `link`, `link->module_source_handle` or `link->module_sink_handle` are NULL, `Broker_AddLink` shall return `BROKER_INVALIDARG`. **]**
**SRS_BROKER_17_030: [** `Broker_AddLink` shall lock the `modules_lock`. **]**
**SRS_BROKER_17_031: [** `Broker_AddLink` shall find the `BROKER_HANDLE_DATA::module_info` for `link->module_sink_handle`. **]**
**SRS_BROKER_17_041: [** `Broker_AddLink` shall find the `BROKER_HANDLE_DATA::module_info` for `link->module_source_handle`. **]**
**SRS_BROKER_17_032: [** `Broker_AddLink` shall subscribe `module_info->receive_socket` to the `link->module_source_handle` module handle. **]**
**SRS_BROKER_17_033: [** `Broker_AddLink` shall unlock the `modules_lock`. **]**
**SRS_BROKER_17_034: [** Upon an error, `Broker_AddLink` shall return `BROKER_ADD_LINK_ERROR` **]**
## Broker_RemoveLink
```c
extern BROKER_RESULT Broker_RemoveLink(BROKER_HANDLE broker, const LINK_DATA* link);
```

Remove a router link from the Broker.

**SRS_BROKER_17_035: [** If `broker`, `link`, `link->module_source_handle` or `link->module_sink_handle` are NULL, `Broker_RemoveLink` shall return `BROKER_INVALIDARG`. **]**

**SRS_BROKER_17_036: [** `Broker_RemoveLink` shall lock the `modules_lock`. **]**

**SRS_BROKER_17_037: [** `Broker_RemoveLink` shall find the `module_info` for `link->module_sink_handle`. **]**

**SRS_BROKER_17_042: [** `Broker_RemoveLink` shall find the `module_info` for `link->module_source_handle`. **]**

**SRS_BROKER_17_038: [** `Broker_RemoveLink` shall unsubscribe `module_info->receive_socket` from the `link->module_source_handle` module handle. **]**

**SRS_BROKER_17_039: [** `Broker_RemoveLink` shall unlock the `modules_lock`. **]**

**SRS_BROKER_17_040: [** Upon an error, `Broker_RemoveLink` shall return `BROKER_REMOVE_LINK_ERROR`. **]**

## Broker_Destroy

```C
Expand Down
32 changes: 31 additions & 1 deletion core/inc/broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,16 @@ extern "C"
#include <stddef.h>
#endif

typedef struct BROKER_LINK_DATA_TAG {
MODULE_HANDLE module_source_handle;
MODULE_HANDLE module_sink_handle;
} BROKER_LINK_DATA;

#define BROKER_RESULT_VALUES \
BROKER_OK, \
BROKER_ERROR, \
BROKER_ADD_LINK_ERROR, \
BROKER_REMOVE_LINK_ERROR, \
BROKER_INVALIDARG

/** @brief Enumeration describing the result of ::Broker_Publish,
Expand Down Expand Up @@ -83,7 +89,7 @@ extern void Broker_DecRef(BROKER_HANDLE broker);
*/
extern BROKER_RESULT Broker_Publish(BROKER_HANDLE broker, MODULE_HANDLE source, MESSAGE_HANDLE message);

/** @brief Sends a message to the message broker.
/** @brief Adds a module to the message broker.
*
* @details For details about threading with regard to the message broker
* and modules connected to it, see
Expand All @@ -107,6 +113,30 @@ extern BROKER_RESULT Broker_AddModule(BROKER_HANDLE broker, const MODULE* module
*/
extern BROKER_RESULT Broker_RemoveModule(BROKER_HANDLE broker, const MODULE* module);

/** @brief Adds a route to the message broker.
*
* @details For details about threading with regard to the message broker
* and modules connected to it, see
* <a href="https://github.com/Azure/azure-iot-gateway-sdk/blob/develop/core/devdoc/broker_hld.md">Broker High Level Design Documentation</a>.
*
* @param broker The #BROKER_HANDLE onto which the module will be
* added.
* @param route The #BROKER_LINK_DATA for the route that will be added
* to this message broker.
*
* @return A #BROKER_RESULT describing the result of the function.
*/
extern BROKER_RESULT Broker_AddLink(BROKER_HANDLE broker, const BROKER_LINK_DATA* link);

/** @brief Removes a route from the message broker.
*
* @param broker The #BROKER_HANDLE from which the module will be removed.
* @param route The #BROKER_LINK_DATA of the module to be removed.
*
* @return A #BROKER_RESULT describing the result of the function.
*/
extern BROKER_RESULT Broker_RemoveLink(BROKER_HANDLE broker, const BROKER_LINK_DATA* link);

/** @brief Disposes of resources allocated by a message broker.
*
* @param broker The #BROKER_HANDLE to be destroyed.
Expand Down
Loading

0 comments on commit 4ada508

Please sign in to comment.