From eca6a65771533ac499a58d386e836405eadf19d2 Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Tue, 19 Feb 2019 17:39:02 -0800 Subject: [PATCH] iot: add mqtt gateway samples to GA (#1169) * iot: add mqtt gateway samples to GA * iot: update kokoro iot presubmits, codereview check * iot: update mqttDeviceDemo arguments * iot: add iot presubmit common.cfg * iot: fix name for mqtt-http test * iot: fix flaky test * iot: lint fix --- .kokoro/iot-scripts.cfg | 13 - .kokoro/iot/common.cfg | 16 + .../iot-http-example.cfg} | 0 .kokoro/{ => iot}/iot-manager.cfg | 0 .../iot-mqtt-example.cfg} | 0 iot/beta-features/gateway/gateway.js | 1 - .../system-test/cloudiot_http_example.test.js | 2 +- iot/manager/manager.js | 24 +- iot/manager/system-test/manager.test.js | 12 +- .../cloudiot_mqtt_example_nodejs.js | 736 +++++++++++++++--- iot/mqtt_example/package.json | 3 +- .../system-test/cloudiot_mqtt_example.test.js | 200 ++++- 12 files changed, 865 insertions(+), 142 deletions(-) delete mode 100644 .kokoro/iot-scripts.cfg create mode 100644 .kokoro/iot/common.cfg rename .kokoro/{iot-http_example.cfg => iot/iot-http-example.cfg} (100%) rename .kokoro/{ => iot}/iot-manager.cfg (100%) rename .kokoro/{iot-mqtt_example.cfg => iot/iot-mqtt-example.cfg} (100%) diff --git a/.kokoro/iot-scripts.cfg b/.kokoro/iot-scripts.cfg deleted file mode 100644 index 21347f8103..0000000000 --- a/.kokoro/iot-scripts.cfg +++ /dev/null @@ -1,13 +0,0 @@ -# Format: //devtools/kokoro/config/proto/build.proto - -# Set the folder in which the tests are run -env_vars: { - key: "PROJECT" - value: "iot/scripts" -} - -# Tell the trampoline which build file to use. -env_vars: { - key: "TRAMPOLINE_BUILD_FILE" - value: "github/nodejs-docs-samples/.kokoro/build.sh" -} diff --git a/.kokoro/iot/common.cfg b/.kokoro/iot/common.cfg new file mode 100644 index 0000000000..3e09e3a430 --- /dev/null +++ b/.kokoro/iot/common.cfg @@ -0,0 +1,16 @@ +# Format: //devtools/kokoro/config/proto/build.proto + +# Download trampoline resources. These will be in ${KOKORO_GFILE_DIR} +gfile_resources: "/bigstore/cloud-devrel-kokoro-resources/trampoline" + +# Download secrets from Cloud Storage. +gfile_resources: "/bigstore/cloud-devrel-kokoro-resources/nodejs-docs-samples" + +# All builds use the trampoline script to run in docker. +build_file: "nodejs-docs-samples/.kokoro/trampoline.sh" + +# Configure the docker image for kokoro-trampoline. +env_vars: { + key: "TRAMPOLINE_IMAGE" + value: "gcr.io/cloud-devrel-kokoro-resources/node:8-user" +} \ No newline at end of file diff --git a/.kokoro/iot-http_example.cfg b/.kokoro/iot/iot-http-example.cfg similarity index 100% rename from .kokoro/iot-http_example.cfg rename to .kokoro/iot/iot-http-example.cfg diff --git a/.kokoro/iot-manager.cfg b/.kokoro/iot/iot-manager.cfg similarity index 100% rename from .kokoro/iot-manager.cfg rename to .kokoro/iot/iot-manager.cfg diff --git a/.kokoro/iot-mqtt_example.cfg b/.kokoro/iot/iot-mqtt-example.cfg similarity index 100% rename from .kokoro/iot-mqtt_example.cfg rename to .kokoro/iot/iot-mqtt-example.cfg diff --git a/iot/beta-features/gateway/gateway.js b/iot/beta-features/gateway/gateway.js index 183764e66b..4275dc8211 100644 --- a/iot/beta-features/gateway/gateway.js +++ b/iot/beta-features/gateway/gateway.js @@ -758,7 +758,6 @@ function sendDataFromBoundDevice( if (!success) { console.log('Client not connected...'); } else if (!publishChainInProgress) { - // TODO: wait for commands console.log('Client connected: Attaching device'); attachDevice(deviceId, client); setTimeout(() => { diff --git a/iot/http_example/system-test/cloudiot_http_example.test.js b/iot/http_example/system-test/cloudiot_http_example.test.js index f5ab5001db..33f87dfd64 100644 --- a/iot/http_example/system-test/cloudiot_http_example.test.js +++ b/iot/http_example/system-test/cloudiot_http_example.test.js @@ -114,7 +114,7 @@ test(`should send event message`, async t => { await tools.runAsync(`${helper} deleteRegistry ${localRegName}`, cwd); }); -test(`should send event message`, async t => { +test(`should send state message`, async t => { const localDevice = `test-rsa-device`; const localRegName = `${registryName}-rsa256`; await tools.runAsync(`${helper} setupIotTopic ${topicName}`, cwd); diff --git a/iot/manager/manager.js b/iot/manager/manager.js index d9d52557a2..cb67eded5d 100644 --- a/iot/manager/manager.js +++ b/iot/manager/manager.js @@ -1381,10 +1381,10 @@ require(`yargs`) // eslint-disable-line `Creates a device with the given public key. Public key can be ommitted and added later on.`, { publicKeyFormat: { - default: 'RS256_X509_PEM', + default: 'RSA_X509_PEM', description: 'Public key format for devices.', requiresArg: true, - choices: ['RS256_PEM', 'RS256_X509_PEM', 'ES256_PEM', 'ES256_X509_PEM'], + choices: ['RSA_PEM', 'RSA_X509_PEM', 'ES256_PEM', 'ES256_X509_PEM'], type: 'string', }, publicKeyFile: { @@ -1671,7 +1671,21 @@ require(`yargs`) // eslint-disable-line .command( `createGateway `, `Creates a gateway`, - {}, + { + publicKeyFormat: { + default: 'RSA_X509_PEM', + description: 'Public key format for devices.', + requiresArg: true, + choices: ['RSA_PEM', 'RSA_X509_PEM', 'ES256_PEM', 'ES256_X509_PEM'], + type: 'string', + }, + publicKeyFile: { + description: + 'Path to the public key file used for device authentication.', + requiresArg: true, + type: 'string', + }, + }, opts => { const cb = function(client) { createGateway( @@ -1680,8 +1694,8 @@ require(`yargs`) // eslint-disable-line opts.cloudRegion, opts.registryId, opts.gatewayId, - opts.publicKeyFile, - opts.publicKeyFormat + opts.publicKeyFormat, + opts.publicKeyFile ); }; getClient(opts.serviceAccount, cb); diff --git a/iot/manager/system-test/manager.test.js b/iot/manager/system-test/manager.test.js index 9e504d35be..72c07d6b37 100644 --- a/iot/manager/system-test/manager.test.js +++ b/iot/manager/system-test/manager.test.js @@ -267,7 +267,7 @@ test(`should send command message to device`, async t => { ); tools.runAsync( - `node cloudiot_mqtt_example_nodejs.js --deviceId=${deviceId} --registryId=${registryName} --privateKeyFile=${rsaPrivateKey} --algorithm=RS256 --numMessages=20 --mqttBridgePort=443`, + `node cloudiot_mqtt_example_nodejs.js mqttDeviceDemo --deviceId=${deviceId} --registryId=${registryName} --privateKeyFile=${rsaPrivateKey} --algorithm=RS256 --numMessages=20 --mqttBridgePort=443`, path.join(__dirname, '../../mqtt_example') ); @@ -283,7 +283,7 @@ test(`should send command message to device`, async t => { test(`should create a new gateway`, async t => { const gatewayId = `nodejs-test-gateway-iot-${uuid.v4()}`; let gatewayOut = await tools.runAsync( - `${cmd} createGateway ${registryName} ${gatewayId} RS256_X509_PEM ${rsaPublicCert}` + `${cmd} createGateway ${registryName} ${gatewayId} RSA_X509_PEM ${rsaPublicCert}` ); // test no error on create gateway. @@ -297,7 +297,7 @@ test(`should create a new gateway`, async t => { test(`should list gateways`, async t => { const gatewayId = `nodejs-test-gateway-iot-${uuid.v4()}`; await tools.runAsync( - `${cmd} createGateway ${registryName} ${gatewayId} RS256_X509_PEM ${rsaPublicCert}` + `${cmd} createGateway ${registryName} ${gatewayId} RSA_X509_PEM ${rsaPublicCert}` ); // look for output in list gateway @@ -312,7 +312,7 @@ test(`should list gateways`, async t => { test(`should bind existing device to gateway`, async t => { const gatewayId = `nodejs-test-gateway-iot-${uuid.v4()}`; await tools.runAsync( - `${cmd} createGateway ${registryName} ${gatewayId} RS256_X509_PEM ${rsaPublicCert}` + `${cmd} createGateway ${registryName} ${gatewayId} RSA_X509_PEM ${rsaPublicCert}` ); // create device @@ -350,7 +350,7 @@ test(`should bind existing device to gateway`, async t => { test(`should list devices bound to gateway`, async t => { const gatewayId = `nodejs-test-gateway-iot-${uuid.v4()}`; await tools.runAsync( - `${cmd} createGateway ${registryName} ${gatewayId} RS256_X509_PEM ${rsaPublicCert}` + `${cmd} createGateway ${registryName} ${gatewayId} RSA_X509_PEM ${rsaPublicCert}` ); const deviceId = `nodejs-test-device-iot-${uuid.v4()}`; @@ -389,7 +389,7 @@ test(`should list devices bound to gateway`, async t => { test(`should list gateways for bound device`, async t => { const gatewayId = `nodejs-test-gateway-iot-${uuid.v4()}`; await tools.runAsync( - `${cmd} createGateway ${registryName} ${gatewayId} RS256_X509_PEM ${rsaPublicCert}` + `${cmd} createGateway ${registryName} ${gatewayId} RSA_X509_PEM ${rsaPublicCert}` ); // create device diff --git a/iot/mqtt_example/cloudiot_mqtt_example_nodejs.js b/iot/mqtt_example/cloudiot_mqtt_example_nodejs.js index 8fff643dfd..88bbfc9243 100644 --- a/iot/mqtt_example/cloudiot_mqtt_example_nodejs.js +++ b/iot/mqtt_example/cloudiot_mqtt_example_nodejs.js @@ -77,12 +77,6 @@ var argv = require(`yargs`) choices: ['RS256', 'ES256'], type: 'string', }, - numMessages: { - default: 100, - description: 'Number of messages to publish.', - requiresArg: true, - type: 'number', - }, tokenExpMins: { default: 20, description: 'Minutes to JWT token expiration.', @@ -101,16 +95,148 @@ var argv = require(`yargs`) requiresArg: true, type: 'number', }, - messageType: { - default: 'events', - description: 'Message type to publish.', - requiresArg: true, - choices: ['events', 'state'], - type: 'string', - }, }) + .command( + `mqttDeviceDemo`, + `Connects a device, sends data, and receives data`, + { + messageType: { + default: 'events', + description: 'Message type to publish.', + requiresArg: true, + choices: ['events', 'state'], + type: 'string', + }, + numMessages: { + default: 10, + description: 'Number of messages to publish.', + demandOption: true, + type: 'number', + }, + }, + opts => { + mqttDeviceDemo( + opts.deviceId, + opts.registryId, + opts.projectId, + opts.cloudRegion, + opts.algorithm, + opts.privateKeyFile, + opts.mqttBridgeHostname, + opts.mqttBridgePort, + opts.messageType, + opts.numMessages + ); + } + ) + .command( + `sendDataFromBoundDevice`, + `Sends data from a gateway on behalf of a bound device.`, + { + gatewayId: { + description: 'Cloud IoT gateway ID.', + requiresArg: true, + demandOption: true, + type: 'string', + }, + numMessages: { + default: 10, + description: 'Number of messages to publish.', + demandOption: true, + type: 'number', + }, + }, + opts => { + sendDataFromBoundDevice( + opts.deviceId, + opts.gatewayId, + opts.registryId, + opts.projectId, + opts.cloudRegion, + opts.algorithm, + opts.privateKeyFile, + opts.mqttBridgeHostname, + opts.mqttBridgePort, + opts.numMessages, + opts.tokenExpMins + ); + } + ) + .command( + `listenForConfigMessages`, + `Listens for configuration changes on a gateway and bound device.`, + { + gatewayId: { + description: 'Cloud IoT gateway ID.', + requiresArg: true, + demandOption: true, + type: 'string', + }, + clientDuration: { + default: 60000, + description: 'Duration in milliseconds for MQTT client to run.', + requiresArg: true, + type: 'number', + }, + }, + opts => { + listenForConfigMessages( + opts.deviceId, + opts.gatewayId, + opts.registryId, + opts.projectId, + opts.cloudRegion, + opts.algorithm, + opts.privateKeyFile, + opts.mqttBridgeHostname, + opts.mqttBridgePort, + opts.clientDuration + ); + } + ) + .command( + `listenForErrorMessages`, + `Listens for error messages on a gateway.`, + { + gatewayId: { + description: 'Cloud IoT gateway ID.', + requiresArg: true, + demandOption: true, + type: 'string', + }, + clientDuration: { + default: 60000, + description: 'Duration in milliseconds for MQTT client to run.', + requiresArg: true, + type: 'number', + }, + }, + opts => { + listenForErrorMessages( + opts.deviceId, + opts.gatewayId, + opts.registryId, + opts.projectId, + opts.cloudRegion, + opts.algorithm, + opts.privateKeyFile, + opts.mqttBridgeHostname, + opts.mqttBridgePort, + opts.clientDuration + ); + } + ) + .example( + `node $0 mqttDeviceDemo --projectId=blue-jet-123 \\\n\t--registryId=my-registry --deviceId=my-node-device \\\n\t--privateKeyFile=../rsa_private.pem --algorithm=RS256 \\\n\t--cloudRegion=us-central1 --numMessages=10 \\\n` + ) + .example( + `node $0 sendDataFromBoundDevice --projectId=blue-jet-123 \\\n\t--registryId=my-registry --deviceId=my-node-device \\\n\t--privateKeyFile=../rsa_private.pem --algorithm=RS256 \\\n\t--cloudRegion=us-central1 --gatewayId=my-node-gateway \\\n` + ) .example( - `node $0 cloudiot_mqtt_example_nodejs.js --projectId=blue-jet-123 \\\n\t--registryId=my-registry --deviceId=my-node-device \\\n\t--privateKeyFile=../rsa_private.pem --algorithm=RS256 \\\n\t --cloudRegion=us-central1` + `node $0 listenForConfigMessages --projectId=blue-jet-123 \\\n\t--registryId=my-registry --deviceId=my-node-device \\\n\t--privateKeyFile=../rsa_private.pem --algorithm=RS256 \\\n\t--cloudRegion=us-central1 --gatewayId=my-node-gateway \\\n\t--clientDuration=300000 \\\n` + ) + .example( + `node $0 listenForErrorMessages --projectId=blue-jet-123 \\\n\t--registryId=my-registry --deviceId=my-node-device \\\n\t--privateKeyFile=../rsa_private.pem --algorithm=RS256 \\\n\t--cloudRegion=us-central1 --gatewayId=my-node-gateway \\\n\t--clientDuration=300000 \\\n` ) .wrap(120) .recommendCommands() @@ -138,7 +264,14 @@ function createJwt(projectId, privateKeyFile, algorithm) { // Publish numMessages messages asynchronously, starting from message // messagesSent. // [START iot_mqtt_publish] -function publishAsync(messagesSent, numMessages) { +function publishAsync( + mqttTopic, + client, + iatTime, + messagesSent, + numMessages, + connectionArgs +) { // If we have published enough messages or backed off too many times, stop. if (messagesSent > numMessages || backoffTime >= MAXIMUM_BACKOFF_TIME) { if (backoffTime >= MAXIMUM_BACKOFF_TIME) { @@ -192,13 +325,21 @@ function publishAsync(messagesSent, numMessages) { connectionArgs.protocolVersion = 4; connectionArgs.clean = true; client = mqtt.connect(connectionArgs); + // [END iot_mqtt_jwt_refresh] client.on('connect', success => { console.log('connect'); if (!success) { console.log('Client not connected...'); } else if (!publishChainInProgress) { - publishAsync(1, argv.numMessages); + publishAsync( + mqttTopic, + client, + iatTime, + messagesSent, + numMessages, + connectionArgs + ); } }); @@ -222,88 +363,501 @@ function publishAsync(messagesSent, numMessages) { // Note: logging packet send is very verbose }); } - // [END iot_mqtt_jwt_refresh] - publishAsync(messagesSent + 1, numMessages); + publishAsync( + mqttTopic, + client, + iatTime, + messagesSent + 1, + numMessages, + connectionArgs + ); }, schedulePublishDelayMs); }, publishDelayMs); } // [END iot_mqtt_publish] -// [START iot_mqtt_run] -// The mqttClientId is a unique string that identifies this device. For Google -// Cloud IoT Core, it must be in the format below. -const mqttClientId = `projects/${argv.projectId}/locations/${ - argv.cloudRegion -}/registries/${argv.registryId}/devices/${argv.deviceId}`; - -// With Google Cloud IoT Core, the username field is ignored, however it must be -// non-empty. The password field is used to transmit a JWT to authorize the -// device. The "mqtts" protocol causes the library to connect using SSL, which -// is required for Cloud IoT Core. -let connectionArgs = { - host: argv.mqttBridgeHostname, - port: argv.mqttBridgePort, - clientId: mqttClientId, - username: 'unused', - password: createJwt(argv.projectId, argv.privateKeyFile, argv.algorithm), - protocol: 'mqtts', - secureProtocol: 'TLSv1_2_method', -}; - -// Create a client, and connect to the Google MQTT bridge. -let iatTime = parseInt(Date.now() / 1000); -let client = mqtt.connect(connectionArgs); - -// Subscribe to the /devices/{device-id}/config topic to receive config updates. -// Config updates are recommended to use QoS 1 (at least once delivery) -client.subscribe(`/devices/${argv.deviceId}/config`, {qos: 1}); - -// Subscribe to the /devices/{device-id}/commands/# topic to receive all -// commands or to the /devices/{device-id}/commands/ to just receive -// messages published to a specific commands folder; we recommend you use -// QoS 0 (at most once delivery) -client.subscribe(`/devices/${argv.deviceId}/commands/#`, {qos: 0}); - -// The MQTT topic that this device will publish data to. The MQTT topic name is -// required to be in the format below. The topic name must end in 'state' to -// publish state and 'events' to publish telemetry. Note that this is not the -// same as the device registry's Cloud Pub/Sub topic. -const mqttTopic = `/devices/${argv.deviceId}/${argv.messageType}`; - -client.on('connect', success => { - console.log('connect'); - if (!success) { - console.log('Client not connected...'); - } else if (!publishChainInProgress) { - publishAsync(1, argv.numMessages); +function mqttDeviceDemo( + deviceId, + registryId, + projectId, + region, + algorithm, + privateKeyFile, + mqttBridgeHostname, + mqttBridgePort, + messageType, + numMessages +) { + // [START iot_mqtt_run] + + // const deviceId = `myDevice`; + // const registryId = `myRegistry`; + // const region = `us-central1`; + // const algorithm = `RS256`; + // const privateKeyFile = `./rsa_private.pem`; + // const mqttBridgeHostname = `mqtt.googleapis.com`; + // const mqttBridgePort = 8883; + // const messageType = `events`; + // const numMessages = 5; + + // The mqttClientId is a unique string that identifies this device. For Google + // Cloud IoT Core, it must be in the format below. + const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${deviceId}`; + + // With Google Cloud IoT Core, the username field is ignored, however it must be + // non-empty. The password field is used to transmit a JWT to authorize the + // device. The "mqtts" protocol causes the library to connect using SSL, which + // is required for Cloud IoT Core. + let connectionArgs = { + host: mqttBridgeHostname, + port: mqttBridgePort, + clientId: mqttClientId, + username: 'unused', + password: createJwt(projectId, privateKeyFile, algorithm), + protocol: 'mqtts', + secureProtocol: 'TLSv1_2_method', + }; + + // Create a client, and connect to the Google MQTT bridge. + let iatTime = parseInt(Date.now() / 1000); + let client = mqtt.connect(connectionArgs); + + // Subscribe to the /devices/{device-id}/config topic to receive config updates. + // Config updates are recommended to use QoS 1 (at least once delivery) + client.subscribe(`/devices/${deviceId}/config`, {qos: 1}); + + // Subscribe to the /devices/{device-id}/commands/# topic to receive all + // commands or to the /devices/{device-id}/commands/ to just receive + // messages published to a specific commands folder; we recommend you use + // QoS 0 (at most once delivery) + client.subscribe(`/devices/${deviceId}/commands/#`, {qos: 0}); + + // The MQTT topic that this device will publish data to. The MQTT topic name is + // required to be in the format below. The topic name must end in 'state' to + // publish state and 'events' to publish telemetry. Note that this is not the + // same as the device registry's Cloud Pub/Sub topic. + const mqttTopic = `/devices/${deviceId}/${messageType}`; + + client.on('connect', success => { + console.log('connect'); + if (!success) { + console.log('Client not connected...'); + } else if (!publishChainInProgress) { + publishAsync(mqttTopic, client, iatTime, 1, numMessages, connectionArgs); + } + }); + + client.on('close', () => { + console.log('close'); + shouldBackoff = true; + }); + + client.on('error', err => { + console.log('error', err); + }); + + client.on('message', (topic, message) => { + let messageStr = 'Message received: '; + if (topic === `/devices/${deviceId}/config`) { + messageStr = 'Config message received: '; + } else if (topic === `/devices/${deviceId}/commands`) { + messageStr = 'Command message received: '; + } + + messageStr += Buffer.from(message, 'base64').toString('ascii'); + console.log(messageStr); + }); + + client.on('packetsend', () => { + // Note: logging packet send is very verbose + }); + + // Once all of the messages have been published, the connection to Google Cloud + // IoT will be closed and the process will exit. See the publishAsync method. + // [END iot_mqtt_run] +} + +// Attaches a device to a gateway. +function attachDevice(deviceId, client, jwt) { + // [START attach_device] + // const deviceId = 'my-unauth-device'; + const attachTopic = `/devices/${deviceId}/attach`; + console.log(`Attaching: ${attachTopic}`); + let attachPayload = '{}'; + if (jwt && jwt !== '') { + attachPayload = `{ 'authorization' : ${jwt} }`; } -}); - -client.on('close', () => { - console.log('close'); - shouldBackoff = true; -}); - -client.on('error', err => { - console.log('error', err); -}); - -client.on('message', (topic, message) => { - let messageStr = 'Message received: '; - if (topic === `/devices/${argv.deviceId}/config`) { - messageStr = 'Config message received: '; - } else if (topic === `/devices/${argv.deviceId}/commands`) { - messageStr = 'Command message received: '; + + client.publish(attachTopic, attachPayload, {qos: 1}, err => { + if (!err) { + shouldBackoff = false; + backoffTime = MINIMUM_BACKOFF_TIME; + } else { + console.log(err); + } + }); + // [END attach_device] +} + +// Detaches a device from a gateway. +function detachDevice(deviceId, client, jwt) { + // [START detach_device] + const detachTopic = `/devices/${deviceId}/detach`; + console.log(`Detaching: ${detachTopic}`); + let detachPayload = '{}'; + if (jwt && jwt !== '') { + detachPayload = `{ 'authorization' : ${jwt} }`; } - messageStr += Buffer.from(message, 'base64').toString('ascii'); - console.log(messageStr); -}); + client.publish(detachTopic, detachPayload, {qos: 1}, err => { + if (!err) { + shouldBackoff = false; + backoffTime = MINIMUM_BACKOFF_TIME; + } else { + console.log(err); + } + }); + // [END detach_device] +} -client.on('packetsend', () => { - // Note: logging packet send is very verbose -}); +// Publish numMessages messages asynchronously through a gateway client connection +function publishAsyncGateway( + client, + iatTime, + tokenExpMins, + messagesSent, + numMessages, + registryId, + deviceId, + gatewayId, + connectionArgs, + projectId, + privateKeyFile, + algorithm +) { + // If we have published enough messages or backed off too many times, stop. + if (messagesSent > numMessages || backoffTime >= MAXIMUM_BACKOFF_TIME) { + if (backoffTime >= MAXIMUM_BACKOFF_TIME) { + console.log('Backoff time is too high. Giving up.'); + } + if (messagesSent >= numMessages) { + detachDevice(deviceId, client); + } + console.log('Closing connection to MQTT. Goodbye!'); + client.end(); + publishChainInProgress = false; + return; + } -// Once all of the messages have been published, the connection to Google Cloud -// IoT will be closed and the process will exit. See the publishAsync method. -// [END iot_mqtt_run] + // Publish and schedule the next publish. + publishChainInProgress = true; + var publishDelayMs = 0; + if (shouldBackoff) { + publishDelayMs = 1000 * (backoffTime + Math.random()); + backoffTime *= 2; + console.log(`Backing off for ${publishDelayMs}ms before publishing.`); + } + let mqttTopic = `/devices/${gatewayId}/state`; + let payload = `${registryId}/${gatewayId}-connected-${new Date().getTime()}`; + console.log(`Publishing message ${messagesSent}/${numMessages}`); + if (messagesSent > 0) { + mqttTopic = `/devices/${deviceId}/state`; + payload = `${registryId}/${deviceId}-payload-${messagesSent}`; + } + + setTimeout(function() { + // Publish "payload" to the MQTT topic. qos=1 means at least once delivery. + // Cloud IoT Core also supports qos=0 for at most once delivery. + console.log(`Publishing message: ${payload} to ${mqttTopic}`); + client.publish(mqttTopic, payload, {qos: 1}, function(err) { + if (!err) { + shouldBackoff = false; + backoffTime = MINIMUM_BACKOFF_TIME; + } + }); + + var schedulePublishDelayMs = 5000; // messageType === 'events' ? 1000 : 2000; + setTimeout(function() { + let secsFromIssue = parseInt(Date.now() / 1000) - iatTime; + if (secsFromIssue > tokenExpMins * 60) { + iatTime = parseInt(Date.now() / 1000); + console.log(`\tRefreshing token after ${secsFromIssue} seconds.`); + + client.end(); + connectionArgs.password = createJwt( + projectId, + privateKeyFile, + algorithm + ); + client = mqtt.connect(connectionArgs); + } + publishAsyncGateway( + client, + iatTime, + tokenExpMins, + messagesSent + 1, + numMessages, + registryId, + deviceId, + gatewayId + ); + }, schedulePublishDelayMs); + }, publishDelayMs); +} + +// Sends data from a gateway on behalf of a device that is bound to that gateway. +function sendDataFromBoundDevice( + deviceId, + gatewayId, + registryId, + projectId, + region, + algorithm, + privateKeyFile, + mqttBridgeHostname, + mqttBridgePort, + numMessages, + tokenExpMins +) { + // [START iot_send_data_from_bound_device] + // const deviceId = `myDevice`; + // const gatewayId = `mygateway`; + // const registryId = `myRegistry`; + // const region = `us-central1`; + // const algorithm = `RS256`; + // const privateKeyFile = `./rsa_private.pem`; + // const mqttBridgeHostname = `mqtt.googleapis.com`; + // const mqttBridgePort = 8883; + // const numMessages = 5; + // const tokenExpMins = 60; + + const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${gatewayId}`; + console.log(`MQTT client id: ${mqttClientId}`); + let connectionArgs = { + host: mqttBridgeHostname, + port: mqttBridgePort, + clientId: mqttClientId, + username: 'unused', + password: createJwt(projectId, privateKeyFile, algorithm), + protocol: 'mqtts', + qos: 1, + secureProtocol: 'TLSv1_2_method', + }; + + // Create a client, and connect to the Google MQTT bridge. + let iatTime = parseInt(Date.now() / 1000); + let client = mqtt.connect(connectionArgs); + + client.on('connect', success => { + if (!success) { + console.log('Client not connected...'); + } else if (!publishChainInProgress) { + console.log('Client connected: Attaching device'); + attachDevice(deviceId, client); + setTimeout(() => { + console.log('Client connected: Gateway is ready to relay'); + publishAsyncGateway( + client, + iatTime, + tokenExpMins, + 0, + numMessages, + registryId, + deviceId, + gatewayId, + connectionArgs, + projectId, + privateKeyFile, + algorithm + ); + }, 5000); + } + }); + + client.on('close', () => { + console.log('Connection closed'); + shouldBackoff = true; + }); + + client.on('error', err => { + console.log('error', err); + }); + + client.on('message', (topic, message) => { + console.log( + 'message received: ', + Buffer.from(message, 'base64').toString('ascii') + ); + }); + + client.on('packetsend', () => { + // Note: logging packet send is very verbose + }); + // [END iot_send_data_from_bound_device] +} + +// Listen for configuration messages on a gateway and bound device. +function listenForConfigMessages( + deviceId, + gatewayId, + registryId, + projectId, + region, + algorithm, + privateKeyFile, + mqttBridgeHostname, + mqttBridgePort, + clientDuration +) { + // [START iot_listen_for_config_messages] + // const deviceId = `myDevice`; + // const gatewayId = `mygateway`; + // const registryId = `myRegistry`; + // const region = `us-central1`; + // const algorithm = `RS256`; + // const privateKeyFile = `./rsa_private.pem`; + // const mqttBridgeHostname = `mqtt.googleapis.com`; + // const mqttBridgePort = 8883; + // const clientDuration = 60000; + + const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${gatewayId}`; + console.log(mqttClientId); + let connectionArgs = { + host: mqttBridgeHostname, + port: mqttBridgePort, + clientId: mqttClientId, + username: 'unused', + password: createJwt(projectId, privateKeyFile, algorithm), + protocol: 'mqtts', + qos: 1, + secureProtocol: 'TLSv1_2_method', + }; + + // Create a client, and connect to the Google MQTT bridge. + let client = mqtt.connect(connectionArgs); + + client.on('connect', success => { + if (!success) { + console.log('Client not connected...'); + } else { + console.log('Client connected: Gateway is listening, attaching device'); + attachDevice(deviceId, client); + + setTimeout(() => { + // Subscribe to any configuration topics. + client.subscribe(`/devices/${gatewayId}/config`, {qos: 1}); + client.subscribe(`/devices/${deviceId}/config`, {qos: 1}); + + setTimeout(() => { + detachDevice(deviceId, client); + console.log('Closing connection to MQTT. Goodbye!'); + client.end(true); + }, clientDuration); // Safely detach device and close connection. + }, 5000); + } + }); + + client.on('close', () => { + console.log('Connection closed'); + shouldBackoff = true; + }); + + client.on('error', err => { + console.log('error', err); + }); + + client.on('message', (topic, message) => { + let decodedMessage = Buffer.from(message, 'base64').toString('ascii'); + + if (topic === `/devices/${gatewayId}/errors`) { + console.log(`message received on error topic: ${decodedMessage}`); + } else { + console.log(`message received on topic ${topic}: ${decodedMessage}`); + } + }); + + client.on('packetsend', () => { + // Note: logging packet send is very verbose + }); + // [END iot_listen_for_config_messages] +} + +// Listen for error messages on a gateway. +function listenForErrorMessages( + deviceId, + gatewayId, + registryId, + projectId, + region, + algorithm, + privateKeyFile, + mqttBridgeHostname, + mqttBridgePort, + clientDuration +) { + // [START iot_listen_for_error_messages] + // const deviceId = `myDevice`; + // const gatewayId = `mygateway`; + // const registryId = `myRegistry`; + // const projectId = `my-project-123`; + // const region = `us-central1`; + // const algorithm = `RS256`; + // const privateKeyFile = `./rsa_private.pem`; + // const mqttBridgeHostname = `mqtt.googleapis.com`; + // const mqttBridgePort = 8883; + // const clientDuration = 60000; + + const mqttClientId = `projects/${projectId}/locations/${region}/registries/${registryId}/devices/${gatewayId}`; + console.log(mqttClientId); + let connectionArgs = { + host: mqttBridgeHostname, + port: mqttBridgePort, + clientId: mqttClientId, + username: 'unused', + password: createJwt(projectId, privateKeyFile, algorithm), + protocol: 'mqtts', + qos: 1, + secureProtocol: 'TLSv1_2_method', + }; + + // Create a client, and connect to the Google MQTT bridge. + let client = mqtt.connect(connectionArgs); + + client.on('connect', success => { + if (!success) { + console.log('Client not connected...'); + } else { + setTimeout(() => { + // Subscribe to gateway error topic. + client.subscribe(`/devices/${gatewayId}/errors`, {qos: 0}); + + attachDevice(deviceId, client); + + setTimeout(() => { + console.log('Closing connection to MQTT. Goodbye!'); + client.end(true); + }, clientDuration); // Safely detach device and close connection. + }, 5000); + } + }); + + client.on('close', () => { + console.log('Connection closed'); + shouldBackoff = true; + }); + + client.on('error', err => { + console.log('error', err); + }); + + client.on('message', (topic, message) => { + let decodedMessage = Buffer.from(message, 'base64').toString('ascii'); + + console.log(`message received on error topic ${topic}: ${decodedMessage}`); + }); + + client.on('packetsend', () => { + // Note: logging packet send is very verbose + }); + // [END iot_listen_for_error_messages] +} diff --git a/iot/mqtt_example/package.json b/iot/mqtt_example/package.json index d074cf187c..1864632421 100644 --- a/iot/mqtt_example/package.json +++ b/iot/mqtt_example/package.json @@ -16,13 +16,14 @@ "test": "repo-tools test run --cmd ava -- -T 3m --verbose system-test/*.test.js" }, "dependencies": { + "@google-cloud/iot": "^0.2.0", "jsonwebtoken": "8.4.0", "mqtt": "2.18.8", "yargs": "12.0.5" }, "devDependencies": { + "@google-cloud/nodejs-repo-tools": "^3.2.0", "@google-cloud/pubsub": "0.22.2", - "@google-cloud/nodejs-repo-tools": "^3.0.0", "ava": "0.25.0", "uuid": "3.3.2" } diff --git a/iot/mqtt_example/system-test/cloudiot_mqtt_example.test.js b/iot/mqtt_example/system-test/cloudiot_mqtt_example.test.js index 1f2b362542..4b0bb19c7d 100644 --- a/iot/mqtt_example/system-test/cloudiot_mqtt_example.test.js +++ b/iot/mqtt_example/system-test/cloudiot_mqtt_example.test.js @@ -15,37 +15,72 @@ 'use strict'; +const iot = require('@google-cloud/iot'); const path = require(`path`); const {PubSub} = require(`@google-cloud/pubsub`); const test = require(`ava`); const tools = require(`@google-cloud/nodejs-repo-tools`); const uuid = require(`uuid`); -const topicName = `nodejs-docs-samples-test-iot-${uuid.v4()}`; -const registryName = `nodejs-test-registry-iot-${uuid.v4()}`; +const projectId = + process.env.GOOGLE_CLOUD_PROJECT || process.env.GCLOUD_PROJECT; +const topicName = `nodejs-iot-test-mqtt-topic`; +const registryName = `nodejs-iot-test-mqtt-registry`; +const region = `us-central1`; +const rsaPublicCert = process.env.NODEJS_IOT_RSA_PUBLIC_CERT; +const rsaPrivateKey = process.env.NODEJS_IOT_RSA_PRIVATE_KEY; + const helper = `node ../manager/manager.js`; const cmd = `node cloudiot_mqtt_example_nodejs.js `; -const cmdSuffix = ` --numMessages=1 --privateKeyFile=resources/rsa_private.pem --algorithm=RS256`; +const cmdSuffix = ` --numMessages=1 --privateKeyFile=${rsaPrivateKey} --algorithm=RS256`; const cwd = path.join(__dirname, `..`); const installDeps = `npm install`; +const iotClient = new iot.v1.DeviceManagerClient(); +const pubSubClient = new PubSub({projectId}); + test.todo(tools.run(installDeps, `${cwd}/../manager`)); test.before(tools.checkCredentials); test.before(async () => { - const pubsub = new PubSub(); - return pubsub.createTopic(topicName).then(results => { - const topic = results[0]; - console.log(`Topic ${topic.name} created.`); - return topic; - }); + // Create a single topic to be used for testing. + let createTopicRes = await pubSubClient.createTopic(topicName); + let topic = createTopicRes[0]; + console.log(`Topic ${topic.name} created.`); + + // Cleans up and creates a single registry to be used for tests. + tools.run(`${helper} unbindAllDevices ${registryName}`, cwd); + tools.run(`${helper} clearRegistry ${registryName}`, cwd); + + console.log('Cleaned up existing registry.'); + let createRegistryRequest = { + parent: iotClient.locationPath(projectId, region), + deviceRegistry: { + id: registryName, + eventNotificationConfigs: [ + { + pubsubTopicName: topic.name, + }, + ], + }, + }; + await tools.runAsync(`${helper} setupIotTopic ${topicName}`, cwd); + + await iotClient.createDeviceRegistry(createRegistryRequest); + console.log(`Created registry: ${registryName}`); }); test.after.always(async () => { - const pubsub = new PubSub(); - const topic = pubsub.topic(topicName); - return topic.delete().then(() => { - console.log(`Topic ${topic.name} deleted.`); + await pubSubClient.topic(topicName).delete(); + console.log(`Topic ${topicName} deleted.`); + + const deleteRegistryRequest = { + name: iotClient.registryPath(projectId, region, registryName), + }; + + await iotClient.deleteDeviceRegistry(deleteRegistryRequest).catch(err => { + console.log(err); }); + console.log('Deleted test registry.'); }); test(`should receive configuration message`, async t => { @@ -61,12 +96,12 @@ test(`should receive configuration message`, async t => { cwd ); await tools.runAsync( - `${helper} createRsa256Device ${localDevice} ${localRegName} resources/rsa_cert.pem`, + `${helper} createRsa256Device ${localDevice} ${localRegName} ${rsaPublicCert}`, cwd ); output = await tools.runAsync( - `${cmd} --messageType=events --registryId="${localRegName}" --deviceId="${localDevice}" ${cmdSuffix}`, + `${cmd} mqttDeviceDemo --messageType=events --registryId="${localRegName}" --deviceId="${localDevice}" ${cmdSuffix}`, cwd ); @@ -95,12 +130,12 @@ test(`should send event message`, async t => { cwd ); await tools.runAsync( - `${helper} createRsa256Device ${localDevice} ${localRegName} resources/rsa_cert.pem`, + `${helper} createRsa256Device ${localDevice} ${localRegName} ${rsaPublicCert}`, cwd ); const output = await tools.runAsync( - `${cmd} --messageType=events --registryId="${localRegName}" --deviceId="${localDevice}" ${cmdSuffix}`, + `${cmd} mqttDeviceDemo --messageType=events --registryId="${localRegName}" --deviceId="${localDevice}" ${cmdSuffix}`, cwd ); t.regex(output, new RegExp(`Publishing message:`)); @@ -126,12 +161,12 @@ test(`should send state message`, async t => { cwd ); await tools.runAsync( - `${helper} createRsa256Device ${localDevice} ${localRegName} resources/rsa_cert.pem`, + `${helper} createRsa256Device ${localDevice} ${localRegName} ${rsaPublicCert}`, cwd ); const output = await tools.runAsync( - `${cmd} --messageType=state --registryId="${localRegName}" --deviceId="${localDevice}" ${cmdSuffix}`, + `${cmd} mqttDeviceDemo --messageType=state --registryId="${localRegName}" --deviceId="${localDevice}" ${cmdSuffix}`, cwd ); t.regex(output, new RegExp(`Publishing message:`)); @@ -149,7 +184,7 @@ test(`should send state message`, async t => { }); test(`should receive command message`, async t => { - const localDevice = `test-rsa-device`; + const deviceId = `nodejs-test-device-iot-${uuid.v4()}`; const localRegName = `${registryName}-rsa256`; const message = 'rotate 180 degrees'; @@ -159,17 +194,17 @@ test(`should receive command message`, async t => { cwd ); await tools.runAsync( - `${helper} createRsa256Device ${localDevice} ${localRegName} resources/rsa_cert.pem`, + `${helper} createRsa256Device ${deviceId} ${localRegName} ${rsaPublicCert}`, cwd ); let output = tools.runAsync( - `${cmd} --registryId=${localRegName} --deviceId=${localDevice} --numMessages=30 --privateKeyFile=resources/rsa_private.pem --algorithm=RS256 --mqttBridgePort=443`, + `${cmd} mqttDeviceDemo --registryId=${localRegName} --deviceId=${deviceId} --numMessages=30 --privateKeyFile=${rsaPrivateKey} --algorithm=RS256 --mqttBridgePort=443`, cwd ); await tools.runAsync( - `${helper} sendCommand ${localDevice} ${localRegName} "${message}"`, + `${helper} sendCommand ${deviceId} ${localRegName} "${message}"`, cwd ); @@ -177,8 +212,125 @@ test(`should receive command message`, async t => { // Cleanup await tools.runAsync( - `${helper} deleteDevice ${localDevice} ${localRegName}`, + `${helper} deleteDevice ${deviceId} ${localRegName}`, cwd ); await tools.runAsync(`${helper} deleteRegistry ${localRegName}`, cwd); }); + +test(`should listen for bound device config message`, async t => { + const gatewayId = `nodejs-test-gateway-iot-${uuid.v4()}`; + await tools.runAsync( + `${helper} createGateway ${registryName} ${gatewayId} --publicKeyFormat=RSA_X509_PEM --publicKeyFile=${rsaPublicCert}`, + cwd + ); + + const deviceId = `nodejs-test-device-iot-${uuid.v4()}`; + + await tools.runAsync( + `${helper} bindDeviceToGateway ${registryName} ${gatewayId} ${deviceId}`, + cwd + ); + + // listen for configuration changes + let out = await tools.runAsync( + `${cmd} listenForConfigMessages --deviceId=${deviceId} --gatewayId=${gatewayId} --registryId=${registryName} --privateKeyFile=${rsaPrivateKey} --clientDuration=10000 --algorithm=RS256` + ); + + t.regex(out, new RegExp('message received')); + + // cleanup + await tools.runAsync( + `${helper} unbindDeviceFromGateway ${registryName} ${gatewayId} ${deviceId}`, + cwd + ); + await tools.runAsync( + `${helper} deleteDevice ${gatewayId} ${registryName}`, + cwd + ); + await tools.runAsync( + `${helper} deleteDevice ${deviceId} ${registryName}`, + cwd + ); +}); + +test(`should listen for error topic messages`, async t => { + const gatewayId = `nodejs-test-gateway-iot-${uuid.v4()}`; + await tools.runAsync( + `${helper} createGateway ${registryName} ${gatewayId} --publicKeyFormat=RSA_X509_PEM --publicKeyFile=${rsaPublicCert}`, + cwd + ); + + // create a device but don't associate it with the gateway + const deviceId = `nodejs-test-device-iot-${uuid.v4()}`; + await tools.runAsync( + `${helper} createRsa256Device ${deviceId} ${registryName} ${rsaPublicCert}`, + cwd + ); + + // check error topic contains error of attaching a device that is not bound + let out = await tools.runAsync( + `${cmd} listenForErrorMessages --gatewayId=${gatewayId} --registryId=${registryName} --deviceId=${deviceId} --privateKeyFile=${rsaPrivateKey} --clientDuration=30000 --algorithm=RS256` + ); + + t.regex( + out, + new RegExp(`DeviceId ${deviceId} is not associated with Gateway`) + ); + + // cleanup + await tools.runAsync( + `${helper} unbindDeviceFromGateway ${registryName} ${gatewayId} ${deviceId}`, + cwd + ); + await tools.runAsync( + `${helper} deleteDevice ${gatewayId} ${registryName}`, + cwd + ); + await tools.runAsync( + `${helper} deleteDevice ${deviceId} ${registryName}`, + cwd + ); +}); + +test(`should send data from bound device`, async t => { + const gatewayId = `nodejs-test-gateway-iot-${uuid.v4()}`; + await tools.runAsync( + `${helper} createGateway ${registryName} ${gatewayId} --publicKeyFormat=RSA_X509_PEM --publicKeyFile=${rsaPublicCert}`, + cwd + ); + + const deviceId = `nodejs-test-device-iot-${uuid.v4()}`; + await iotClient.createDevice({ + parent: iotClient.registryPath(projectId, region, registryName), + device: { + id: deviceId, + }, + }); + + await tools.runAsync( + `${helper} bindDeviceToGateway ${registryName} ${gatewayId} ${deviceId}`, + cwd + ); + + // relay telemetry on behalf of device + let out = await tools.runAsync( + `${cmd} sendDataFromBoundDevice --deviceId=${deviceId} --gatewayId=${gatewayId} --registryId=${registryName} --privateKeyFile=${rsaPrivateKey} --numMessages=5 --algorithm=RS256` + ); + + t.regex(out, new RegExp('Publishing message 5/5')); + t.notRegex(out, new RegExp('Error: Connection refused')); + + await tools.runAsync( + `${helper} unbindDeviceFromGateway ${registryName} ${gatewayId} ${deviceId}`, + cwd + ); + await tools.runAsync( + `${helper} deleteDevice ${gatewayId} ${registryName}`, + cwd + ); + await tools.runAsync( + `${helper} deleteDevice ${deviceId} ${registryName}`, + cwd + ); +});