Skip to content

Commit

Permalink
pkg/lib: add new public "Channel" API
Browse files Browse the repository at this point in the history
This is the API that we intend all future channel implementations
(including users outside of cockpit.js) to use.  Port fsinfo.
  • Loading branch information
allisonkarlitskaya committed Jul 9, 2024
1 parent f2589fc commit 5a1804e
Show file tree
Hide file tree
Showing 2 changed files with 245 additions and 6 deletions.
240 changes: 240 additions & 0 deletions pkg/lib/channel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
/*
* This file is part of Cockpit.
*
* Copyright (C) 2024 Red Hat, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

import { JsonObject } from './_internal/common';
import { Transport, ensure_transport, transport_globals } from './_internal/transport';
import { EventEmitter } from './event';

type ChannelPayload = string | Uint8Array;

type ChannelBinaryOption<P extends ChannelPayload> =
P extends Uint8Array ?
{ binary: true }
: P extends string ?
{ binary?: false }
:
{ binary: boolean }
;

type ChannelOptions<P extends ChannelPayload> = JsonObject & ChannelBinaryOption<P> & {
command?: never;
channel?: never;
host?: string;
payload: string;
superuser?: "try" | "require";
};

interface ChannelControlMessage extends JsonObject {
command: string;
}

interface ChannelEvents<P extends ChannelPayload> {
control(options: ChannelControlMessage): void;
done(options: ChannelControlMessage): void;
ready(options: ChannelControlMessage): void;
close(options: ChannelControlMessage): void;
data(data: P): void;
}

export class Channel<P extends ChannelPayload = ChannelPayload> extends EventEmitter<ChannelEvents<P>> {
_payload_type: P | null = null; // required to forbid mixing types

id: string | null = null; // can be unassigned during transport startup
readonly options: ChannelOptions<P>;
readonly binary: ChannelBinaryOption<P>['binary'];

#transport: Transport | null = null;
#received: Partial<Record<"close" | "ready" | "done", ChannelControlMessage>> = {};
#queue: ([true, ChannelControlMessage] | [false, P])[] = [];

on_control(control: ChannelControlMessage) {
const command = control.command;

if (command === 'ready' || command === 'close' || command === 'done') {
if (command in this.#received) {
return;
}

this.#received[command] = control;
this.emit(command, control);
} else {
this.emit('control', control);
}

if (command === 'close') {
if (this.#transport && this.id)
this.#transport.unregister(this.id);
if (control.message)
console.warn(control.message);
}
}

constructor(options: ChannelOptions<P>) {
super();

this.options = { ...options };
this.binary = (options.binary === true);

ensure_transport(transport => {
if ('close' in this.#received)
return;

this.#transport = transport;
this.id = transport.next_channel();
transport.register(this.id, control => this.on_control(control), data => this.emit('data', data));

// We need to delay sending the open message until after we have
// the transport because we need to set the host field
const command: ChannelControlMessage = {
...this.options, command: 'open', channel: this.id, 'flow-control': true
};

if (!('host' in command) && transport_globals.default_host) {
command.host = transport_globals.default_host;
}

if (this.binary) {
command.binary = "raw";
} else {
delete command.binary;
}

// Go direct: we need this to go before the rest of the queue
transport.send_control(command);

// Now send everything else from the queue
for (const [is_control, message] of this.#queue) {
if (is_control) {
transport.send_control({ ...message, channel: this.id });
} else {
transport.send_message(message, this.id);
}
}
this.#queue = [];
});
}

/**
* Sends a payload frame.
*
* @message the payload to send, either a string or a Uint8Array.
*/
send_data(message: P) {
if (this.#transport && this.id) {
this.#transport.send_message(message, this.id);
} else {
this.#queue.push([false, message]);
}
}

/**
* Sends a control message on the channel.
*
* @options: the message to send. A command must be specified.
*/
send_control(options: ChannelControlMessage) {
// A sent close message gets handled as if the exact same close message
// was received. This allows signalling your own code for cancellation, etc.
if (options.command === 'close') {
this.on_control(options);
}

if (this.#transport) {
this.#transport.send_control({ ...options, channel: this.id });
} else {
this.#queue.push([true, options]);
}
}

/**
* Sends a done control message on the channel. This is something like
* EOF: it means that you won't send any more data using `.send_data()`.
*
* @options: optional extra arguments for the message.
*/
done(options?: JsonObject) {
this.send_control({ ...options, command: 'done' });
}

/**
* Closes the channel.
*
* This means that you're completely finished with the channel. Any
* underlying resources will be freed as soon as possible. When you call
* this you'll receive a 'close' signal (synchronously) and then nothing
* else.
*
* @problem: a problem code. If this is unset it implies something like a
* "successful" close. Otherwise, it indicates an error.
* @options: the bridge will ignore this, but it will be thrown as the
* result of any pending wait() operations and passed to the 'close' signal
* handler, so you can use it to communicate with your own code.
*/
close(problem?: string, options?: JsonObject): void {
this.send_control({ ...options, ...problem && { problem }, command: 'close' });
}

/**
* Waits for the result of the channel open request.
*
* @return: the content of the ready message, on success
* @throws: the content of the close message, on fail
*/
wait(): Promise<JsonObject> {
return new Promise((resolve, reject) => {
// If we got ready and closed then it's not an error.
// Resolve with the ready message.
if ('ready' in this.#received) {
resolve(this.#received.ready);
} else if ('close' in this.#received) {
reject(this.#received.close);
} else {
this.on('ready', resolve);
this.on('close', reject);
}
});
}

/**
* Provides a text description of the channel.
*/
toString(): string {
const state =
(!this.id && 'waiting for transport') ||
(this.#received.close?.problem && `${this.id} error ${this.#received.close.problem}`) ||
(this.#received.close && `${this.id} closed`) ||
(this.#received.ready && `${this.id} open`) ||
`${this.id} waiting for open`;

const host = this.options.host || "localhost";

return `[Channel ${state} -> ${this.options.payload}@${host}]`;
}

/**
* Open a new channel to the bridge.
*
* @options: The options for the channel. A payload type must be specified.
*/
static open(options: ChannelOptions<string>): Channel<string>;
static open(options: ChannelOptions<Uint8Array>): Channel<Uint8Array>;
static open(options: ChannelOptions<ChannelPayload>): Channel<ChannelPayload> {
return new Channel(options);
}
}
11 changes: 5 additions & 6 deletions pkg/lib/fsinfo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
'use strict';

import type { JsonObject, JsonValue } from './_internal/common';
import { Channel } from './_internal/channel';
import { Channel } from './channel';
import { EventEmitter } from './event';
import type cockpit from 'cockpit';

function is_json_dict(value: JsonValue): value is JsonObject {
return value?.constructor === Object;
Expand Down Expand Up @@ -83,7 +82,7 @@ export class FsInfoClient extends EventEmitter<FsInfoEvents> {
state: FsInfoState = { loading: true };

private partial_state: JsonValue = null;
private channel: cockpit.Channel<string>;
private channel: Channel<string>;

constructor(path: string, attrs: (keyof FileInfo)[], options?: JsonObject) {
super();
Expand All @@ -94,9 +93,9 @@ export class FsInfoClient extends EventEmitter<FsInfoEvents> {
attrs,
watch: true,
...options
}) as unknown as cockpit.Channel<string>;
});

this.channel.addEventListener("message", (_event, payload) => {
this.channel.on('data', payload => {
this.partial_state = json_merge(this.partial_state, JSON.parse(payload));

if (is_json_dict(this.partial_state) && !this.partial_state.partial) {
Expand All @@ -105,7 +104,7 @@ export class FsInfoClient extends EventEmitter<FsInfoEvents> {
}
});

this.channel.addEventListener("close", (_event, message) => {
this.channel.on('close', message => {
this.emit('close', message);
});
}
Expand Down

0 comments on commit 5a1804e

Please sign in to comment.