Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: promises support #1644

Merged
merged 15 commits into from
Jul 25, 2023
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -310,12 +310,17 @@ Also user can manually register topic-alias pair using PUBLISH topic:'some', ta:
## API

- [`mqtt.connect()`](#connect)
- [`mqtt.connectAsync()`](#connect-async)
- [`mqtt.Client()`](#client)
- [`mqtt.Client#connect()`](#client-connect)
- [`mqtt.Client#publish()`](#publish)
- [`mqtt.Client#publishAsync()`](#publish-async)
- [`mqtt.Client#subscribe()`](#subscribe)
- [`mqtt.Client#subscribeAsync()`](#subscribe-async)
- [`mqtt.Client#unsubscribe()`](#unsubscribe)
- [`mqtt.Client#unsubscribeAsync()`](#unsubscribe-async)
- [`mqtt.Client#end()`](#end)
- [`mqtt.Client#endAsync()`](#end-async)
- [`mqtt.Client#removeOutgoingMessage()`](#removeOutgoingMessage)
- [`mqtt.Client#reconnect()`](#reconnect)
- [`mqtt.Client#handleMessage()`](#handleMessage)
Expand Down Expand Up @@ -350,6 +355,12 @@ at every connect.
For all MQTT-related options, see the [Client](#client)
constructor.

<a name="connect-async"></a>

### connectAsync([url], options)

Async [`connect`](#connect). Returns a `Promise` that resolves to a `mqtt.Client` instance.

---

<a name="client"></a>
Expand Down Expand Up @@ -574,6 +585,12 @@ Publish a message to a topic
- `callback` - `function (err)`, fired when the QoS handling completes,
or at the next tick if QoS 0. An error occurs if client is disconnecting.

<a name="publish-async"></a>

### mqtt.Client#publishAsync(topic, message, [options])

Async [`publish`](#publish). Returns a `Promise<void>`.

---

<a name="subscribe"></a>
Expand Down Expand Up @@ -601,6 +618,12 @@ Subscribe to a topic or topics
- `topic` is a subscribed to topic
- `qos` is the granted QoS level on it

<a name="subscribe-async"></a>

### mqtt.Client#subscribeAsync(topic/topic array/topic object, [options])

Async [`subscribe`](#subscribe). Returns a `Promise<granted[]>`.

---

<a name="unsubscribe"></a>
Expand All @@ -615,6 +638,12 @@ Unsubscribe from a topic or topics
- `userProperties`: The User Property is allowed to appear multiple times to represent multiple name, value pairs `object`
- `callback` - `function (err)`, fired on unsuback. An error occurs if client is disconnecting.

<a name="unsubscribe-async"></a>

### mqtt.Client#unsubscribeAsync(topic/topic array, [options])

Async [`unsubscribe`](#unsubscribe). Returns a `Promise<void>`.

---

<a name="end"></a>
Expand All @@ -636,6 +665,12 @@ Close the client, accepts the following options:
- `callback`: will be called when the client is closed. This parameter is
optional.

<a name="end-async"></a>

### mqtt.Client#endAsync([force], [options])

Async [`end`](#end). Returns a `Promise<void>`.

---

<a name="removeOutgoingMessage"></a>
Expand Down
109 changes: 97 additions & 12 deletions src/lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -897,9 +897,11 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
// .publish(topic, payload, cb);
if (typeof opts === 'function') {
callback = opts as DoneCallback
opts = {} as IClientPublishOptions
opts = null
}

opts = opts || {}

// default opts
const defaultOpts: IClientPublishOptions = {
qos: 0,
Expand Down Expand Up @@ -968,9 +970,32 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
callback,
})
}

return this
}

public publishAsync(topic: string, message: string | Buffer): Promise<void>
public publishAsync(
topic: string,
message: string | Buffer,
opts?: IClientPublishOptions,
): Promise<void>
public publishAsync(
topic: string,
message: string | Buffer,
opts?: IClientPublishOptions,
): Promise<void> {
return new Promise((resolve, reject) => {
this.publish(topic, message, opts, (err) => {
if (err) {
reject(err)
} else {
resolve()
}
})
})
}

/**
* subscribe - subscribe to <topic>
*
Expand Down Expand Up @@ -1192,6 +1217,28 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
return this
}

public subscribeAsync(
topicObject: string | string[] | ISubscriptionMap,
): Promise<ISubscriptionGrant[]>
public subscribeAsync(
topicObject: string | string[] | ISubscriptionMap,
opts?: IClientSubscribeOptions | IClientSubscribeProperties,
): Promise<ISubscriptionGrant[]>
public subscribeAsync(
topicObject: string | string[] | ISubscriptionMap,
opts?: IClientSubscribeOptions | IClientSubscribeProperties,
): Promise<ISubscriptionGrant[]> {
return new Promise((resolve, reject) => {
this.subscribe(topicObject, opts, (err, granted) => {
if (err) {
reject(err)
} else {
resolve(granted)
}
})
})
}

/**
* unsubscribe - unsubscribe from topic(s)
*
Expand Down Expand Up @@ -1297,6 +1344,26 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
return this
}

public unsubscribeAsync(topic: string | string[]): Promise<void>
public unsubscribeAsync(
topic: string | string[],
opts?: IClientSubscribeOptions,
): Promise<void>
public unsubscribeAsync(
topic: string | string[],
opts?: IClientSubscribeOptions,
): Promise<void> {
return new Promise((resolve, reject) => {
this.unsubscribe(topic, opts, (err) => {
if (err) {
reject(err)
} else {
resolve()
}
})
})
}

/**
* end - close connection
*
Expand Down Expand Up @@ -1324,25 +1391,21 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
this.log('end :: (%s)', this.options.clientId)

if (force == null || typeof force !== 'boolean') {
cb = (opts || this.noop) as DoneCallback
cb = cb || (opts as DoneCallback)
opts = force as Partial<IDisconnectPacket>
force = false
if (typeof opts !== 'object') {
cb = opts
opts = null
if (typeof cb !== 'function') {
cb = this.noop
}
}
}

if (typeof opts !== 'object') {
cb = opts
cb = cb || opts
opts = null
}

this.log('end :: cb? %s', !!cb)
cb = cb || this.noop

if (!cb || typeof cb !== 'function') {
cb = this.noop
}

const closeStores = () => {
this.log('end :: closeStores: closing incoming and outgoing stores')
Expand Down Expand Up @@ -1414,6 +1477,28 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
return this
}

public endAsync(): Promise<void>
public endAsync(force?: boolean): Promise<void>
public endAsync(opts?: Partial<IDisconnectPacket>): Promise<void>
public endAsync(
force?: boolean,
opts?: Partial<IDisconnectPacket>,
): Promise<void>
public endAsync(
force?: boolean | Partial<IDisconnectPacket>,
opts?: Partial<IDisconnectPacket>,
): Promise<void> {
return new Promise((resolve, reject) => {
this.end(force as boolean, opts, (err) => {
if (err) {
reject(err)
} else {
resolve()
}
})
})
}

/**
* removeOutgoingMessage - remove a message in outgoing store
* the outgoing callback will be called withe Error('Message removed') if the message is removed
Expand Down Expand Up @@ -1547,7 +1632,7 @@ export default class MqttClient extends TypedEventEmitter<MqttClientEventCallbac
}
}

private _checkDisconnecting(callback: GenericCallback<any>) {
private _checkDisconnecting(callback?: GenericCallback<any>) {
if (this.disconnecting) {
if (callback && callback !== this.noop) {
callback(new Error('client disconnecting'))
Expand Down
66 changes: 65 additions & 1 deletion src/lib/connect/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
/* eslint-disable @typescript-eslint/no-var-requires */
import url from 'url'
import MqttClient, { IClientOptions, MqttProtocol } from '../client'
import MqttClient, {
IClientOptions,
MqttClientEventCallbacks,
MqttProtocol,
} from '../client'
import IS_BROWSER from '../is-browser'
import Store from '../store'
import DefaultMessageIdProvider from '../default-message-id-provider'
Expand Down Expand Up @@ -177,4 +181,64 @@ function connect(
return client
}

function connectAsync(brokerUrl: string): Promise<MqttClient>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm unsure about connectAsync. I feel like all the async functions should just be simple "promisification" of a callback-style function, but connectAsync is doing more than that. Maybe have an equivalent function in callback style?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe pick a different name for connectAsync to make it clear that it's doing something different from connect

Copy link
Member Author

@robertsLando robertsLando Jul 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

connect doesn't accept a callback so a 'promisified' version of it just makes no sense.

connectAsync does much more because it returns a client just when it actually connects and rejects if there is a connection error, that's how most users expect it to work and what async-mqtt does: https://github.com/mqttjs/async-mqtt/blob/master/index.js#L144

Copy link
Member Author

@robertsLando robertsLando Jul 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe pick a different name for connectAsync to make it clear that it's doing something different from connect

How would you call it? I tried to keep the compatibility for users using async-mqtt, now they don't need that library anymore

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I'm actually struggling to think of a name.

I think the problem is that we use connect() to instantiate a client, but I think a better API would have the user call the MqttClient constructor and then call connect() or connectAsync() which would call the callback or resolve with a CONNACK instead of the client.

Since we're trying to keep the API close to the v4 API, I think we should just keep what's here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok so I'm going to merge this for now and I think I will release a stable v5 after this. What do you think? So we will get more feedbacks

function connectAsync(opts: IClientOptions): Promise<MqttClient>
function connectAsync(
brokerUrl: string,
opts?: IClientOptions,
): Promise<MqttClient>
function connectAsync(
brokerUrl: string | IClientOptions,
opts?: IClientOptions,
allowRetries = true,
): Promise<MqttClient> {
return new Promise((resolve, reject) => {
const client = connect(brokerUrl as string, opts)

const promiseResolutionListeners: Partial<MqttClientEventCallbacks> = {
connect: (connack) => {
removePromiseResolutionListeners()
resolve(client) // Resolve on connect
},
end: () => {
removePromiseResolutionListeners()
resolve(client) // Resolve on end
},
error: (err) => {
removePromiseResolutionListeners()
client.end()
reject(err) // Reject on error
},
}

// If retries are not allowed, reject on close
if (allowRetries === false) {
promiseResolutionListeners.close = () => {
promiseResolutionListeners.error(
new Error("Couldn't connect to server"),
)
}
}

// Remove listeners added to client by this promise
function removePromiseResolutionListeners() {
Object.keys(promiseResolutionListeners).forEach((eventName) => {
client.off(
eventName as keyof MqttClientEventCallbacks,
promiseResolutionListeners[eventName],
)
})
}

// Add listeners to client
Object.keys(promiseResolutionListeners).forEach((eventName) => {
client.on(
eventName as keyof MqttClientEventCallbacks,
promiseResolutionListeners[eventName],
)
})
})
}

export default connect
export { connectAsync }
1 change: 0 additions & 1 deletion src/lib/connect/tls.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ const buildStream: StreamBuilder = (client, opts) => {
)

const connection = tls.connect(opts)
/* eslint no-use-before-define: [2, "nofunc"] */
connection.on('secureConnect', () => {
if (opts.rejectUnauthorized && !connection.authorized) {
connection.emit('error', new Error('TLS not authorized'))
Expand Down
4 changes: 3 additions & 1 deletion src/mqtt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@ import MqttClient from './lib/client'
import DefaultMessageIdProvider from './lib/default-message-id-provider'
import UniqueMessageIdProvider from './lib/unique-message-id-provider'
import Store, { IStore } from './lib/store'
import connect from './lib/connect'
import connect, { connectAsync } from './lib/connect'

export const Client = MqttClient
export {
connect,
connectAsync,
MqttClient,
Store,
DefaultMessageIdProvider,
UniqueMessageIdProvider,
IStore,
}
export * from './lib/client'
export { ReasonCodes } from './lib/handlers/ack'
Loading