Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support message coalescing #306

Merged
merged 12 commits into from
Aug 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 52 additions & 42 deletions lib/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ export class Connection {
// - createSocket: create a new Socket connection
this.options = options;
// id if next command to send
this.commandId = 1;
this.commandId = 2; // socket may send 1 at the start to enable features
// info about active subscriptions and commands in flight
this.commands = new Map();
// map of event listeners
Expand Down Expand Up @@ -349,57 +349,67 @@ export class Connection {
}

private _handleMessage = (event: MessageEvent) => {
const message: WebSocketResponse = JSON.parse(event.data);
let messageGroup: WebSocketResponse | WebSocketResponse[] = JSON.parse(
event.data
);

if (DEBUG) {
console.log("Received", message);
if (!Array.isArray(messageGroup)) {
messageGroup = [messageGroup];
}

const info = this.commands.get(message.id);

switch (message.type) {
case "event":
if (info) {
(info as SubscribeEventCommmandInFlight<any>).callback(message.event);
} else {
console.warn(
`Received event for unknown subscription ${message.id}. Unsubscribing.`
);
this.sendMessagePromise(messages.unsubscribeEvents(message.id));
}
break;
messageGroup.forEach((message) => {
if (DEBUG) {
console.log("Received", message);
}

case "result":
// No info is fine. If just sendMessage is used, we did not store promise for result
if (info) {
if (message.success) {
info.resolve(message.result);
const info = this.commands.get(message.id);

// Don't remove subscriptions.
if (!("subscribe" in info)) {
switch (message.type) {
case "event":
if (info) {
(info as SubscribeEventCommmandInFlight<any>).callback(
message.event
);
} else {
console.warn(
`Received event for unknown subscription ${message.id}. Unsubscribing.`
);
this.sendMessagePromise(messages.unsubscribeEvents(message.id));
}
break;

case "result":
// No info is fine. If just sendMessage is used, we did not store promise for result
if (info) {
if (message.success) {
info.resolve(message.result);

// Don't remove subscriptions.
if (!("subscribe" in info)) {
this.commands.delete(message.id);
}
} else {
info.reject(message.error);
this.commands.delete(message.id);
}
} else {
info.reject(message.error);
}
break;

case "pong":
if (info) {
info.resolve();
this.commands.delete(message.id);
} else {
console.warn(`Received unknown pong response ${message.id}`);
}
}
break;

case "pong":
if (info) {
info.resolve();
this.commands.delete(message.id);
} else {
console.warn(`Received unknown pong response ${message.id}`);
}
break;
break;

default:
if (DEBUG) {
console.warn("Unhandled message", message);
}
}
default:
if (DEBUG) {
console.warn("Unhandled message", message);
}
}
});
};

private _handleClose = async () => {
Expand Down
8 changes: 8 additions & 0 deletions lib/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ export function auth(accessToken: string) {
};
}

export function supportedFeatures() {
return {
type: "supported_features",
id: 1, // Always the first message after auth
features: { coalesce_messages: 1 },
};
}

export function states() {
return {
type: "get_states",
Expand Down
5 changes: 5 additions & 0 deletions lib/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
import { Error } from "./types.js";
import type { ConnectionOptions } from "./connection.js";
import * as messages from "./messages.js";
import { atLeastHaVersion } from "./util.js";

const DEBUG = false;

Expand Down Expand Up @@ -112,6 +113,10 @@ export function createSocket(options: ConnectionOptions): Promise<HaWebSocket> {
socket.removeEventListener("close", closeMessage);
socket.removeEventListener("error", closeMessage);
socket.haVersion = message.ha_version;
if (atLeastHaVersion(socket.haVersion, 2022, 9)) {
socket.send(JSON.stringify(messages.supportedFeatures()));
}

promResolve(socket);
break;

Expand Down