Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mqtt js upgrade #562

Merged
merged 8 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions lib/browser/mqtt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/

import * as mqtt from "mqtt";
import * as mqtt_packet from "mqtt-packet";
import * as WebsocketUtils from "./ws";
import * as auth from "./auth";
import { Trie, TrieOp, Node as TrieNode } from "./trie";
Expand All @@ -36,7 +37,7 @@ import {
OnConnectionFailedResult,
OnConnectionClosedResult
} from "../common/mqtt";
import { normalize_payload } from "../common/mqtt_shared";
import {normalize_payload, normalize_payload_to_buffer} from "../common/mqtt_shared";

export {
QoS, Payload, MqttRequest, MqttSubscribeRequest, MqttWill, OnMessageCallback, MqttConnectionConnected, MqttConnectionDisconnected,
Expand Down Expand Up @@ -310,7 +311,7 @@ export class MqttClientConnection extends BufferedEventEmitter {

const will = this.config.will ? {
topic: this.config.will.topic,
payload: normalize_payload(this.config.will.payload),
payload: normalize_payload_to_buffer(this.config.will.payload),
xiazhvera marked this conversation as resolved.
Show resolved Hide resolved
qos: this.config.will.qos,
retain: this.config.will.retain,
} : undefined;
Expand Down Expand Up @@ -576,7 +577,18 @@ export class MqttClientConnection extends BufferedEventEmitter {
return this.on_error(error);
}
const sub = (packet as mqtt.ISubscriptionGrant[])[0];
resolve({ topic: sub.topic, qos: sub.qos });

/*
* 128 is not modeled in QoS, either on our side nor mqtt-js's side.
* We have always passed this 128 to the user and it is not reasonable to extend
* our output type with 128 since it's also our input type and we don't want anyone
* to pass 128 to us.
*
* The 5 client solves this by making the output type a completely separate enum.
*
* By doing this cast, we make the type checker ignore this edge case.
*/
resolve({ topic: sub.topic, qos: sub.qos as QoS });
xiazhvera marked this conversation as resolved.
Show resolved Hide resolved
});
});
}
Expand All @@ -603,7 +615,7 @@ export class MqttClientConnection extends BufferedEventEmitter {
}
resolve({
packet_id: packet
? (packet as mqtt.IUnsubackPacket).messageId
? (packet as mqtt_packet.IUnsubackPacket).messageId
: undefined,
});
});
Expand Down
4 changes: 2 additions & 2 deletions lib/browser/mqtt5.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import url from "url";
import {HttpsProxyAgent} from "https-proxy-agent";
import * as auth from "./auth";

jest.setTimeout(10000);
jest.setTimeout(1000000);

function createBrowserSpecificTestConfig (testType: test_utils.SuccessfulConnectionTestType) : mqtt5.Mqtt5ClientConfig {

Expand Down Expand Up @@ -405,7 +405,7 @@ test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvir
await test_utils.testNegotiatedSettings(forcedRejoinClient, true);
});

test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Sub - Pub QoS 0 - Unsub', async () => {
test('Sub - Pub QoS 0 - Unsub', async () => {
let topic : string = `test/${uuid()}`;
let testPayload : Buffer = Buffer.from("Derp", "utf-8");

Expand Down
19 changes: 12 additions & 7 deletions lib/browser/mqtt5.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import {BufferedEventEmitter} from "../common/event";
import * as mqtt from "mqtt"; /* The mqtt-js external dependency */
import * as mqtt_packet from "mqtt-packet";
import * as mqtt5 from "../common/mqtt5";
import {OutboundTopicAliasBehaviorType} from "../common/mqtt5";
import * as mqtt5_packet from "../common/mqtt5_packet"
Expand Down Expand Up @@ -445,15 +446,19 @@ export class Mqtt5Client extends BufferedEventEmitter implements mqtt5.IMqtt5Cli
let subMap: mqtt.ISubscriptionMap = mqtt_utils.transform_crt_subscribe_to_mqtt_js_subscription_map(packet);
let subOptions: mqtt.IClientSubscribeOptions = mqtt_utils.transform_crt_subscribe_to_mqtt_js_subscribe_options(packet);

// @ts-ignore
this.browserClient.subscribe(subMap, subOptions, (error, grants) => {
this.browserClient.subscribe(subMap, subOptions, (error, grants, suback) => {
if (error) {
reject(error);
return;
}

const suback: mqtt5_packet.SubackPacket = mqtt_utils.transform_mqtt_js_subscription_grants_to_crt_suback(grants);
resolve(suback);
if (suback) {
const crtSubackFromMqttjsSuback = mqtt_utils.transform_mqtt_js_suback_to_crt_suback(suback);
resolve(crtSubackFromMqttjsSuback);
} else {
const crtSubackFromGrants: mqtt5_packet.SubackPacket = mqtt_utils.transform_mqtt_js_subscription_grants_to_crt_suback(grants ?? []);
resolve(crtSubackFromGrants);
}
});
} catch (err) {
reject(err);
Expand Down Expand Up @@ -505,7 +510,7 @@ export class Mqtt5Client extends BufferedEventEmitter implements mqtt5.IMqtt5Cli
};
resolve(unsuback);
} else {
const unsuback: mqtt5_packet.UnsubackPacket = mqtt_utils.transform_mqtt_js_unsuback_to_crt_unsuback(packet as mqtt.IUnsubackPacket);
const unsuback: mqtt5_packet.UnsubackPacket = mqtt_utils.transform_mqtt_js_unsuback_to_crt_unsuback(packet as mqtt_packet.IUnsubackPacket);
resolve(unsuback);
}
});
Expand Down Expand Up @@ -607,7 +612,7 @@ export class Mqtt5Client extends BufferedEventEmitter implements mqtt5.IMqtt5Cli
})
}

const puback: mqtt5_packet.PubackPacket = mqtt_utils.transform_mqtt_js_puback_to_crt_puback(completionPacket as mqtt.IPubackPacket);
const puback: mqtt5_packet.PubackPacket = mqtt_utils.transform_mqtt_js_puback_to_crt_puback(completionPacket as mqtt_packet.IPubackPacket);
resolve(puback);
break;

Expand Down Expand Up @@ -889,4 +894,4 @@ export class Mqtt5Client extends BufferedEventEmitter implements mqtt5.IMqtt5Cli
this.emit(Mqtt5Client.MESSAGE_RECEIVED, messageReceivedEvent);
}, 0);
}
}
}
73 changes: 55 additions & 18 deletions lib/browser/mqtt5_utils.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
*/

import * as mqtt from "mqtt";
import * as mqtt_packet from "mqtt-packet";
import * as mqtt5 from "./mqtt5";
import {InboundTopicAliasBehaviorType, OutboundTopicAliasBehaviorType} from "./mqtt5";
import * as mqtt5_utils from "./mqtt5_utils";
import * as mqtt_shared from "../common/mqtt_shared";
import {normalize_payload_to_buffer} from "../common/mqtt_shared";


test('MQTT.JS User Properties to CRT User Properties undefined', async () => {
Expand All @@ -17,7 +19,7 @@ test('MQTT.JS User Properties to CRT User Properties undefined', async () => {
});

test('MQTT.JS User Properties to CRT User Properties single', async () => {
let mqttJsUserProperties : mqtt.UserProperties = {
let mqttJsUserProperties : mqtt_packet.UserProperties = {
prop1 : "value1",
prop2 : "value2"
}
Expand All @@ -40,7 +42,7 @@ test('MQTT.JS User Properties to CRT User Properties single', async () => {
});

test('MQTT.JS User Properties to CRT User Properties multi', async () => {
let mqttJsUserProperties : mqtt.UserProperties = {
let mqttJsUserProperties : mqtt_packet.UserProperties = {
prop1 : "value1",
prop2 : ["value2_1", "value2_2", "value2_3"]
}
Expand Down Expand Up @@ -71,7 +73,7 @@ test('MQTT.JS User Properties to CRT User Properties multi', async () => {
});

test('CRT User Properties to MQTT.js User Properties undefined', async () => {
let mqttJsUserProperties : mqtt.UserProperties | undefined = mqtt5_utils.transform_crt_user_properties_to_mqtt_js_user_properties(undefined);
let mqttJsUserProperties : mqtt_packet.UserProperties | undefined = mqtt5_utils.transform_crt_user_properties_to_mqtt_js_user_properties(undefined);

expect(mqttJsUserProperties).toBeUndefined();
});
Expand All @@ -82,7 +84,7 @@ test('CRT User Properties to MQTT.js User Properties single', async () => {
{ name : "prop2", value: "value2"}
]

let mqttJsUserProperties : mqtt.UserProperties | undefined = mqtt5_utils.transform_crt_user_properties_to_mqtt_js_user_properties(crtUserProperties);
let mqttJsUserProperties : mqtt_packet.UserProperties | undefined = mqtt5_utils.transform_crt_user_properties_to_mqtt_js_user_properties(crtUserProperties);

expect(mqttJsUserProperties).toEqual(
{
Expand All @@ -99,9 +101,9 @@ test('CRT User Properties to MQTT.js User Properties single', async () => {
{ name : "prop2", value: "value2_3"}
]

let mqttJsUserProperties : mqtt.UserProperties | undefined = mqtt5_utils.transform_crt_user_properties_to_mqtt_js_user_properties(crtUserProperties);
let mqttJsUserProperties : mqtt_packet.UserProperties | undefined = mqtt5_utils.transform_crt_user_properties_to_mqtt_js_user_properties(crtUserProperties);
expect(mqttJsUserProperties).toBeDefined();
let definedProperties : mqtt.UserProperties = mqttJsUserProperties ?? {};
let definedProperties : mqtt_packet.UserProperties = mqttJsUserProperties ?? {};

const {prop1 : propOne, prop2: propTwo, ...rest} = definedProperties;

Expand Down Expand Up @@ -415,7 +417,7 @@ test('create_mqtt_js_client_config_from_crt_client_config maximal, minimal will'
expectedOptions["password"] = myPassword;
expectedOptions["will"] = {
topic : "Ohno",
payload : "",
payload : normalize_payload_to_buffer(""),
qos : mqtt5.QoS.AtLeastOnce,
retain : false
}
Expand Down Expand Up @@ -638,8 +640,6 @@ test('transform_crt_subscribe_to_mqtt_js_subscription_map', async() => {
});
});

//function transform_crt_subscribe_to_mqtt_js_subscribe_options(subscribe: mqtt5.SubscribePacket) : mqtt.IClientSubscribeOptions

test('transform_crt_subscribe_to_mqtt_js_subscribe_options minimal', async() => {
let subscribe : mqtt5.SubscribePacket = {
subscriptions: [
Expand Down Expand Up @@ -692,7 +692,7 @@ test('transform_mqtt_js_subscription_grants_to_crt_suback', async() => {
},
{
topic: "a/different/topic",
qos: mqtt5.SubackReasonCode.NotAuthorized,
qos: mqtt5.SubackReasonCode.UnspecifiedError,
nl: true,
rap: true,
rh: 2
Expand All @@ -703,7 +703,45 @@ test('transform_mqtt_js_subscription_grants_to_crt_suback', async() => {

expect(suback).toEqual({
type: mqtt5.PacketType.Suback,
reasonCodes: [2, mqtt5.SubackReasonCode.NotAuthorized]
reasonCodes: [2, mqtt5.SubackReasonCode.UnspecifiedError]
});
});

test('transform_mqtt_js_suback_to_crt_suback - minimal', async() => {
let mqttJsSuback : mqtt_packet.ISubackPacket = {
cmd: "suback",
granted: [1]
};

let suback : mqtt5.SubackPacket = mqtt5_utils.transform_mqtt_js_suback_to_crt_suback(mqttJsSuback);

expect(suback).toEqual({
type: mqtt5.PacketType.Suback,
reasonCodes: [mqtt5.SubackReasonCode.GrantedQoS1]
});
});

test('transform_mqtt_js_suback_to_crt_suback - maximal', async() => {
let mqttJsSuback : mqtt_packet.ISubackPacket = {
cmd: "suback",
granted: [2, 128],
properties : {
reasonString: "Misadventure",
userProperties: {
world: ["hello"]
}
}
};

let suback : mqtt5.SubackPacket = mqtt5_utils.transform_mqtt_js_suback_to_crt_suback(mqttJsSuback);

expect(suback).toEqual({
type: mqtt5.PacketType.Suback,
reasonCodes: [mqtt5.SubackReasonCode.GrantedQoS2, mqtt5.SubackReasonCode.UnspecifiedError],
reasonString: "Misadventure",
userProperties: [
{name: "world", value: "hello"}
]
});
});

Expand Down Expand Up @@ -827,7 +865,7 @@ test('transform_mqtt_js_publish_to_crt_publish maximal', async() => {
});

test('transform_mqtt_js_puback_to_crt_puback minimal', async() => {
let mqttJsPuback : mqtt.IPubackPacket = {
let mqttJsPuback : mqtt_packet.IPubackPacket = {
cmd: 'puback'
};

Expand All @@ -840,7 +878,7 @@ test('transform_mqtt_js_puback_to_crt_puback minimal', async() => {
});

test('transform_mqtt_js_puback_to_crt_puback maximal', async() => {
let mqttJsPuback : mqtt.IPubackPacket = {
let mqttJsPuback : mqtt_packet.IPubackPacket = {
cmd: 'puback',
reasonCode: mqtt5.PubackReasonCode.NotAuthorized,
properties: {
Expand Down Expand Up @@ -893,9 +931,9 @@ test('transform_crt_unsubscribe_to_mqtt_js_unsubscribe_options maximal', async()
});

test('transform_mqtt_js_unsuback_to_crt_unsuback minimal', async() => {
let mqttJsUnsuback : mqtt.IUnsubackPacket = {
let mqttJsUnsuback : mqtt_packet.IUnsubackPacket = {
cmd: 'unsuback',
reasonCode: mqtt5.UnsubackReasonCode.NoSubscriptionExisted
granted: [mqtt5.UnsubackReasonCode.NoSubscriptionExisted]
};

let crtUnsuback : mqtt5.UnsubackPacket = mqtt5_utils.transform_mqtt_js_unsuback_to_crt_unsuback(mqttJsUnsuback);
Expand All @@ -907,10 +945,9 @@ test('transform_mqtt_js_unsuback_to_crt_unsuback minimal', async() => {
});

test('transform_mqtt_js_unsuback_to_crt_unsuback maximal', async() => {
let mqttJsUnsuback : mqtt.IUnsubackPacket = {
let mqttJsUnsuback : mqtt_packet.IUnsubackPacket = {
cmd: 'unsuback',
// @ts-ignore
reasonCode: [mqtt5.UnsubackReasonCode.NoSubscriptionExisted, mqtt5.UnsubackReasonCode.ImplementationSpecificError],
granted: [mqtt5.UnsubackReasonCode.NoSubscriptionExisted, mqtt5.UnsubackReasonCode.ImplementationSpecificError],
properties: {
reasonString: "Dunno",
userProperties: {
Expand Down
Loading
Loading