From 5460fd55191fb35e8b8975693e97d58eb00a4964 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 18 Apr 2023 19:30:43 +0200 Subject: [PATCH] deps!: update stream types libp2p streams are now explicit about the types of sync/sources they provide, showing that they are `AsyncGenerator`s and not just `AsyncIterable`s. Refs: https://github.com/achingbrain/it-stream-types/pull/45 BREAKING CHANGE: the type of the source/sink properties have changed --- .gitignore | 10 ++++--- README.md | 15 ++++++++--- package.json | 19 +++++++------- src/duplex.ts | 6 ++--- src/ready.ts | 10 +++---- src/server.ts | 20 +++++++------- src/sink.ts | 10 +++---- src/source.ts | 52 +++++++++++++++++++------------------ src/ws-url.ts | 2 +- test/echo-inline.spec.ts | 2 +- test/echo.spec.ts | 16 +++++++----- test/helpers/server.ts | 4 +-- test/pass-in-server.spec.ts | 2 +- test/server-echo.spec.ts | 2 +- 14 files changed, 94 insertions(+), 76 deletions(-) diff --git a/.gitignore b/.gitignore index 1e9c6b9..7ad9e67 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,9 @@ node_modules -.nyc_output +build dist -coverage -package-lock.json \ No newline at end of file +.docs +.coverage +node_modules +package-lock.json +yarn.lock +.vscode diff --git a/README.md b/README.md index 21c3fc4..c60c565 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,14 @@ # it-ws [![codecov](https://img.shields.io/codecov/c/github/alanshaw/it-ws.svg?style=flat-square)](https://codecov.io/gh/alanshaw/it-ws) -[![CI](https://img.shields.io/github/workflow/status/alanshaw/it-ws/test%20&%20maybe%20release/master?style=flat-square)](https://github.com/alanshaw/it-ws/actions/workflows/js-test-and-release.yml) +[![CI](https://img.shields.io/github/actions/workflow/status/alanshaw/it-ws/js-test-and-release.yml?branch=master\&style=flat-square)](https://github.com/alanshaw/it-ws/actions/workflows/js-test-and-release.yml?query=branch%3Amaster) > Simple async iterables for websocket client connections ## Table of contents - [Install](#install) + - [Browser ` +``` + ## Usage ### Example - client @@ -259,6 +268,6 @@ Licensed under either of - Apache 2.0, ([LICENSE-APACHE](LICENSE-APACHE) / ) - MIT ([LICENSE-MIT](LICENSE-MIT) / ) -## Contribute +## Contribution Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions. diff --git a/package.json b/package.json index 67165dc..6b62ddc 100644 --- a/package.json +++ b/package.json @@ -43,7 +43,7 @@ }, "files": [ "src", - "dist/src", + "dist", "!dist/test", "!**/*.tsbuildinfo" ], @@ -176,26 +176,27 @@ "test:firefox-webworker": "aegir test -t webworker -- --browser firefox", "test:node": "aegir test -t node --cov", "test:electron-main": "aegir test -t electron-main", - "release": "aegir release" + "release": "aegir release", + "docs": "aegir docs" }, "dependencies": { "event-iterator": "^2.0.0", "iso-url": "^1.1.2", - "it-stream-types": "^1.0.2", + "it-stream-types": "^2.0.1", "uint8arrays": "^4.0.2", "ws": "^8.4.0" }, "devDependencies": { "@types/ws": "^8.2.2", - "aegir": "^37.0.15", + "aegir": "^38.1.8", "delay": "^5.0.0", - "it-all": "^2.0.0", - "it-drain": "^2.0.0", - "it-foreach": "^1.0.0", + "it-all": "^3.0.1", + "it-drain": "^3.0.1", + "it-foreach": "^2.0.2", "it-goodbye": "^4.0.0", - "it-map": "^2.0.0", + "it-map": "^3.0.2", "it-ndjson": "^1.0.0", - "it-pipe": "^2.0.3", + "it-pipe": "^3.0.1", "p-defer": "^4.0.0", "wherearewe": "^2.0.1", "wsurl": "^1.0.0" diff --git a/src/duplex.ts b/src/duplex.ts index b6c0ac2..b27b426 100644 --- a/src/duplex.ts +++ b/src/duplex.ts @@ -2,9 +2,9 @@ import source from './source.js' import sink from './sink.js' import type WebSocket from './web-socket.js' import type { SinkOptions } from './sink.js' -import type { Duplex } from 'it-stream-types' +import type { Duplex, Source } from 'it-stream-types' -export interface DuplexWebSocket extends Duplex> { +export interface DuplexWebSocket extends Duplex, Source, Promise> { connected: () => Promise localAddress?: string localPort?: number @@ -43,7 +43,7 @@ export default (socket: WebSocket, options?: DuplexWebSocketOptions): DuplexWebS const duplex: DuplexWebSocket = { sink: sink(socket, options), source: connectedSource, - connected: async () => await connectedSource.connected(), + connected: async () => { await connectedSource.connected() }, close: async () => { if (socket.readyState === socket.CONNECTING || socket.readyState === socket.OPEN) { await new Promise((resolve) => { diff --git a/src/ready.ts b/src/ready.ts index a1db440..e1533fc 100644 --- a/src/ready.ts +++ b/src/ready.ts @@ -1,6 +1,6 @@ import type { ErrorEvent, WebSocket } from 'ws' -export default (socket: WebSocket) => { +export default async (socket: WebSocket): Promise => { // if the socket is closing or closed, return end if (socket.readyState >= 2) { throw new Error('socket closed') @@ -11,18 +11,18 @@ export default (socket: WebSocket) => { return } - return new Promise((resolve, reject) => { - function cleanup () { + await new Promise((resolve, reject) => { + function cleanup (): void { socket.removeEventListener('open', handleOpen) socket.removeEventListener('error', handleErr) } - function handleOpen () { + function handleOpen (): void { cleanup() resolve() } - function handleErr (event: ErrorEvent) { + function handleErr (event: ErrorEvent): void { cleanup() reject(event.error ?? new Error(`connect ECONNREFUSED ${socket.url}`)) } diff --git a/src/server.ts b/src/server.ts index 2a226d5..3976d20 100644 --- a/src/server.ts +++ b/src/server.ts @@ -29,26 +29,26 @@ class Server extends EventEmitter { opts = opts ?? {} this.server = server this.wsServer = new WSServer({ - server: server, + server, perMessageDeflate: false, verifyClient: opts.verifyClient }) this.wsServer.on('connection', this.onWsServerConnection.bind(this)) } - async listen (addrInfo: { port: number } | number) { + async listen (addrInfo: { port: number } | number): Promise { return await new Promise((resolve, reject) => { - this.wsServer.once('error', (e) => reject(e)) - this.wsServer.once('listening', () => resolve(this)) + this.wsServer.once('error', (e) => { reject(e) }) + this.wsServer.once('listening', () => { resolve(this) }) this.server.listen(typeof addrInfo === 'number' ? addrInfo : addrInfo.port) }) } - async close () { - return await new Promise((resolve, reject) => { + async close (): Promise { + await new Promise((resolve, reject) => { this.server.close((err) => { if (err != null) { - return reject(err) + reject(err); return } resolve() @@ -56,11 +56,11 @@ class Server extends EventEmitter { }) } - address () { + address (): string | AddressInfo | null { return this.server.address() } - onWsServerConnection (socket: WebSocket, req: http.IncomingMessage) { + onWsServerConnection (socket: WebSocket, req: http.IncomingMessage): void { const addr = this.wsServer.address() if (typeof addr === 'string') { @@ -96,7 +96,7 @@ export function createServer (opts?: ServerOptions): WebSocketServer { wss.on('connection', opts.onConnection) } - function proxy (server: http.Server, event: string) { + function proxy (server: http.Server, event: string): http.Server { return server.on(event, (...args: any[]) => { wss.emit(event, ...args) }) diff --git a/src/sink.ts b/src/sink.ts index be73465..ea7826f 100644 --- a/src/sink.ts +++ b/src/sink.ts @@ -1,16 +1,16 @@ import ready from './ready.js' import type { WebSocket } from 'ws' -import type { Sink } from 'it-stream-types' +import type { Sink, Source } from 'it-stream-types' export interface SinkOptions { closeOnEnd?: boolean } -export default (socket: WebSocket, options: SinkOptions) => { +export default (socket: WebSocket, options: SinkOptions): Sink, Promise> => { options = options ?? {} options.closeOnEnd = options.closeOnEnd !== false - const sink: Sink> = async source => { + const sink: Sink, Promise> = async source => { for await (const data of source) { try { await ready(socket) @@ -23,7 +23,7 @@ export default (socket: WebSocket, options: SinkOptions) => { } if (options.closeOnEnd != null && socket.readyState <= 1) { - return await new Promise((resolve, reject) => { + await new Promise((resolve, reject) => { socket.addEventListener('close', event => { if (event.wasClean || event.code === 1006) { resolve() @@ -33,7 +33,7 @@ export default (socket: WebSocket, options: SinkOptions) => { } }) - setTimeout(() => socket.close()) + setTimeout(() => { socket.close() }) }) } } diff --git a/src/source.ts b/src/source.ts index 9cd008f..b8e3751 100644 --- a/src/source.ts +++ b/src/source.ts @@ -10,40 +10,42 @@ function isArrayBuffer (obj: any): obj is ArrayBuffer { (obj?.constructor?.name === 'ArrayBuffer' && typeof obj?.byteLength === 'number') } -export interface ConnectedSource extends AsyncIterable { +export interface ConnectedSource extends AsyncGenerator { connected: () => Promise } export default (socket: WebSocket): ConnectedSource => { socket.binaryType = 'arraybuffer' - const connected = async () => await new Promise((resolve, reject) => { - if (isConnected) { - return resolve() - } - if (connError != null) { - return reject(connError) - } - - const cleanUp = (cont: () => void) => { - socket.removeEventListener('open', onOpen) - socket.removeEventListener('error', onError) - cont() - } - - const onOpen = () => cleanUp(resolve) - const onError = (event: ErrorEvent) => { - cleanUp(() => reject(event.error ?? new Error(`connect ECONNREFUSED ${socket.url}`))) - } - - socket.addEventListener('open', onOpen) - socket.addEventListener('error', onError) - }) + const connected = async (): Promise => { + await new Promise((resolve, reject) => { + if (isConnected) { + resolve(); return + } + if (connError != null) { + reject(connError); return + } + + const cleanUp = (cont: () => void): void => { + socket.removeEventListener('open', onOpen) + socket.removeEventListener('error', onError) + cont() + } + + const onOpen = (): void => { cleanUp(resolve) } + const onError = (event: ErrorEvent): void => { + cleanUp(() => { reject(event.error ?? new Error(`connect ECONNREFUSED ${socket.url}`)) }) + } + + socket.addEventListener('open', onOpen) + socket.addEventListener('error', onError) + }) + } const source = (async function * () { const messages = new EventIterator( ({ push, stop, fail }) => { - const onMessage = (event: MessageEvent) => { + const onMessage = (event: MessageEvent): void => { let data: Uint8Array | null = null if (typeof event.data === 'string') { @@ -64,7 +66,7 @@ export default (socket: WebSocket): ConnectedSource => { push(data) } - const onError = (event: ErrorEvent) => fail(event.error ?? new Error('Socket error')) + const onError = (event: ErrorEvent): void => { fail(event.error ?? new Error('Socket error')) } socket.addEventListener('message', onMessage) socket.addEventListener('error', onError) diff --git a/src/ws-url.ts b/src/ws-url.ts index 49cf947..94672fc 100644 --- a/src/ws-url.ts +++ b/src/ws-url.ts @@ -3,4 +3,4 @@ import { relative } from 'iso-url' const map = { http: 'ws', https: 'wss' } const def = 'ws' -export default (url: string, location: string | Partial) => relative(url, location, map, def) +export default (url: string, location: string | Partial): string => relative(url, location, map, def) diff --git a/test/echo-inline.spec.ts b/test/echo-inline.spec.ts index 5ee3549..7347c2a 100644 --- a/test/echo-inline.spec.ts +++ b/test/echo-inline.spec.ts @@ -23,7 +23,7 @@ describe('simple echo server', () => { [1, 2, 3], // need a delay, because otherwise ws hangs up wrong. // otherwise use pull-goodbye. - (source) => map(source, async val => await new Promise(resolve => setTimeout(() => resolve(val), 10))), + (source) => map(source, async val => await new Promise(resolve => setTimeout(() => { resolve(val) }, 10))), (source) => map(ndjson.stringify(source), str => uint8ArrayFromString(str)), WS.connect('ws://localhost:5678'), ndjson.parse, diff --git a/test/echo.spec.ts b/test/echo.spec.ts index 971e9ed..6e2332f 100644 --- a/test/echo.spec.ts +++ b/test/echo.spec.ts @@ -78,13 +78,15 @@ describe('echo', () => { pws, goodbye({ source: expected, - sink: async source => await pipe( - source, - (source) => each(source, (value) => { - expect(value).to.equalBytes(expected.shift()) - }), - drain - ) + sink: async source => { + await pipe( + source, + (source) => each(source, (value) => { + expect(value).to.equalBytes(expected.shift()) + }), + drain + ) + } }), pws ) diff --git a/test/helpers/server.ts b/test/helpers/server.ts index 769adc7..0ee517d 100644 --- a/test/helpers/server.ts +++ b/test/helpers/server.ts @@ -3,7 +3,7 @@ import type { WebSocket } from 'ws' const port = parseInt(process.env.PORT ?? '3000', 10) -export function createTestServer () { +export function createTestServer (): WebSocketServer { const routes: Record void> = { '/read': function (ws: WebSocket) { const values = ['a', 'b', 'c', 'd'] @@ -24,7 +24,7 @@ export function createTestServer () { }) } } - const wss = new WebSocketServer({ port: port }) + const wss = new WebSocketServer({ port }) wss.on('connection', function (ws, req) { if (req.url == null) { diff --git a/test/pass-in-server.spec.ts b/test/pass-in-server.spec.ts index e586856..a317399 100644 --- a/test/pass-in-server.spec.ts +++ b/test/pass-in-server.spec.ts @@ -40,7 +40,7 @@ describe('simple echo server', () => { [1, 2, 3], // need a delay, because otherwise ws hangs up wrong. // otherwise use pull-goodbye. - (source) => map(source, async val => await new Promise(resolve => setTimeout(() => resolve(val), 10))), + (source) => map(source, async val => await new Promise(resolve => setTimeout(() => { resolve(val) }, 10))), (source) => map(ndjson.stringify(source), str => uint8ArrayFromString(str)), stream, ndjson.parse, diff --git a/test/server-echo.spec.ts b/test/server-echo.spec.ts index 9637e53..3eb73a9 100644 --- a/test/server-echo.spec.ts +++ b/test/server-echo.spec.ts @@ -29,7 +29,7 @@ describe('simple echo server', () => { [1, 2, 3], // need a delay, because otherwise ws hangs up wrong. // otherwise use pull-goodbye. - (source) => each(source, async () => await delay(10)), + (source) => each(source, async () => { await delay(10) }), (source) => map(ndjson.stringify(source), str => uint8ArrayFromString(str)), stream, ndjson.parse,