diff --git a/README.md b/README.md
index 630c7cb0..599f81f9 100644
--- a/README.md
+++ b/README.md
@@ -52,25 +52,28 @@ const roots = {
```ts
import https from 'https';
+import ws from 'ws'; // yarn add ws
+import { useServer } from 'graphql-ws/lib/use/ws';
import { execute, subscribe } from 'graphql';
-import { createServer } from 'graphql-ws';
const server = https.createServer(function weServeSocketsOnly(_, res) {
res.writeHead(404);
res.end();
});
-createServer(
+const wsServer = new ws.Server({
+ server,
+ path: '/graphql',
+});
+
+useServer(
{
schema, // from the previous step
roots, // from the previous step
execute,
subscribe,
},
- {
- server,
- path: '/graphql',
- },
+ wsServer,
);
server.listen(443);
@@ -79,7 +82,7 @@ server.listen(443);
#### Use the client
```ts
-import { createClient } from 'graphql-ws';
+import { createClient } from 'graphql-ws/lib/use/ws';
const client = createClient({
url: 'wss://welcomer.com/graphql',
@@ -133,7 +136,7 @@ const client = createClient({
🔗 Client usage with Promise
```ts
-import { createClient, SubscribePayload } from 'graphql-ws';
+import { createClient, SubscribePayload } from 'graphql-ws/lib/use/ws';
const client = createClient({
url: 'wss://hey.there/graphql',
@@ -170,7 +173,7 @@ async function execute(payload: SubscribePayload) {
🔗 Client usage with AsyncIterator
```ts
-import { createClient, SubscribePayload } from 'graphql-ws';
+import { createClient, SubscribePayload } from 'graphql-ws/lib/use/ws';
const client = createClient({
url: 'wss://iterators.ftw/graphql',
@@ -281,7 +284,7 @@ import {
RequestParameters,
Variables,
} from 'relay-runtime';
-import { createClient } from 'graphql-ws';
+import { createClient } from 'graphql-ws/lib/use/ws';
const subscriptionsClient = createClient({
url: 'wss://i.love/graphql',
@@ -346,7 +349,7 @@ export const network = Network.create(fetchOrSubscribe, fetchOrSubscribe);
```ts
import { createClient, defaultExchanges, subscriptionExchange } from 'urql';
-import { createClient as createWSClient } from 'graphql-ws';
+import { createClient as createWSClient } from 'graphql-ws/lib/use/ws';
const wsClient = createWSClient({
url: 'wss://its.urql/graphql',
@@ -380,7 +383,7 @@ const client = createClient({
```typescript
import { ApolloLink, Operation, FetchResult, Observable } from '@apollo/client';
import { print, GraphQLError } from 'graphql';
-import { createClient, ClientOptions, Client } from 'graphql-ws';
+import { createClient, ClientOptions, Client } from 'graphql-ws/lib/use/ws';
class WebSocketLink extends ApolloLink {
private client: Client;
@@ -472,13 +475,13 @@ const link = new WebSocketLink({
🔗 Client usage in Node
```ts
-const WebSocket = require('ws'); // yarn add ws
+const ws = require('ws'); // yarn add ws
const Crypto = require('crypto');
const { createClient } = require('graphql-ws');
const client = createClient({
url: 'wss://no.browser/graphql',
- webSocketImpl: WebSocket,
+ webSocketImpl: ws,
/**
* Generates a v4 UUID to be used as the ID.
* Reference: https://stackoverflow.com/a/2117523/709884
@@ -494,14 +497,72 @@ const client = createClient({
+
+🔗 Server usage with ws
+
+```ts
+// minimal version of `import { useServer } from 'graphql-ws/lib/use/ws';`
+
+import http from 'http';
+import ws from 'ws'; // yarn add ws
+import { makeServer, ServerOptions } from 'graphql-ws';
+import { execute, subscribe } from 'graphql';
+import { schema } from 'my-graphql-schema';
+
+// make
+const server = makeServer({
+ schema,
+ execute,
+ subscribe,
+});
+
+// create websocket server
+const wsServer = new ws.Server({
+ server,
+ path: '/graphql',
+});
+
+// implement
+wsServer.on('connection', (socket, request) => {
+ const closed = server.opened(
+ {
+ protocol: socket.protocol,
+ send: (data) =>
+ new Promise((resolve, reject) => {
+ socket.send(data, (err) => (err ? reject(err) : resolve()));
+ }),
+ close: (code, reason) => socket.close(code, reason),
+ onMessage: (cb) =>
+ socket.on('message', async (event) => {
+ try {
+ await cb(event.toString());
+ } catch (err) {
+ socket.close(1011, err.message);
+ }
+ }),
+ },
+ { socket, request },
+ );
+
+ socket.once('close', () => {
+ if (pongWait) clearTimeout(pongWait);
+ if (pingInterval) clearInterval(pingInterval);
+ closed();
+ });
+});
+```
+
+
+
-🔗 Server usage with Express GraphQL
+🔗 ws server usage with Express GraphQL
```typescript
import https from 'https';
+import ws from 'ws'; // yarn add ws
import express from 'express';
import { graphqlHTTP } from 'express-graphql';
-import { createServer } from 'graphql-ws';
+import { useServer } from 'graphql-ws/lib/use/ws';
import { execute, subscribe } from 'graphql';
import { schema } from 'my-graphql-schema';
@@ -512,17 +573,20 @@ app.use('/graphql', graphqlHTTP({ schema }));
// create a http server using express
const server = https.createServer(app);
+// create websocket server
+const wsServer = new ws.Server({
+ server,
+ path: '/graphql',
+});
+
server.listen(443, () => {
- createServer(
+ useServer(
{
schema,
execute,
subscribe,
},
- {
- server,
- path: '/graphql', // you can use the same path too, just use the `ws` schema
- },
+ wsServer,
);
});
```
@@ -530,13 +594,14 @@ server.listen(443, () => {
-🔗 Server usage with Apollo Server Express
+🔗 ws server usage with Apollo Server Express
```typescript
import https from 'https';
import express from 'express';
import { ApolloServer } from 'apollo-server-express';
-import { createServer } from 'graphql-ws';
+import ws from 'ws'; // yarn add ws
+import { useServer } from 'graphql-ws/lib/use/ws';
import { execute, subscribe } from 'graphql';
import { schema } from 'my-graphql-schema';
@@ -552,17 +617,20 @@ apolloServer.applyMiddleware({ app });
// create a http server using express
const server = https.createServer(app);
+// create websocket server
+const wsServer = new ws.Server({
+ server,
+ path: '/graphql',
+});
+
server.listen(443, () => {
- createServer(
+ useServer(
{
schema,
execute,
subscribe,
},
- {
- server,
- path: '/graphql', // you can use the same path too, just use the `ws` schema
- },
+ wsServer,
);
});
```
@@ -570,12 +638,13 @@ server.listen(443, () => {
-🔗 Server usage with console logging
+🔗 ws server usage with console logging
```typescript
import https from 'https';
import { execute, subscribe } from 'graphql';
-import { createServer } from 'graphql-ws';
+import ws from 'ws'; // yarn add ws
+import { useServer } from 'graphql-ws/lib/use/ws';
import { schema } from 'my-graphql-schema';
const server = https.createServer(function weServeSocketsOnly(_, res) {
@@ -583,7 +652,12 @@ const server = https.createServer(function weServeSocketsOnly(_, res) {
res.end();
});
-createServer(
+const wsServer = new ws.Server({
+ server,
+ path: '/graphql',
+});
+
+useServer(
{
schema,
onConnect: (ctx) => {
@@ -602,10 +676,7 @@ createServer(
console.log('Complete', { ctx, msg });
},
},
- {
- server,
- path: '/graphql',
- },
+ wsServer,
);
server.listen(443);
@@ -614,14 +685,14 @@ server.listen(443);
-🔗 Server usage on a multi WebSocket server
+🔗 ws server usage on a multi WebSocket server
```typescript
import https from 'https';
-import WebSocket from 'ws';
+import ws from 'ws'; // yarn add ws
import url from 'url';
import { execute, subscribe } from 'graphql';
-import { createServer, createClient } from 'graphql-ws';
+import { useServer, createClient } from 'graphql-ws/lib/use/ws';
import { schema } from 'my-graphql-schema';
const server = https.createServer(function weServeSocketsOnly(_, res) {
@@ -634,8 +705,8 @@ const server = https.createServer(function weServeSocketsOnly(_, res) {
* - `/wave` sends out waves
* - `/graphql` serves graphql
*/
-const waveWS = new WebSocket.Server({ noServer: true });
-const graphqlWS = new WebSocket.Server({ noServer: true });
+const waveWS = new ws.Server({ noServer: true });
+const graphqlWS = new ws.Server({ noServer: true });
// delegate upgrade requests to relevant destinations
server.on('upgrade', (request, socket, head) => {
@@ -660,7 +731,7 @@ waveWS.on('connection', (socket) => {
});
// serve graphql
-createServer(
+useServer(
{
schema,
execute,
@@ -675,14 +746,15 @@ server.listen(443);
-🔗 Server usage with custom context value
+🔗 ws server usage with custom context value
```typescript
import { validate, execute, subscribe } from 'graphql';
-import { createServer } from 'graphql-ws';
+import ws from 'ws'; // yarn add ws
+import { useServer } from 'graphql-ws/lib/use/ws';
import { schema, roots, getDynamicContext } from 'my-graphql';
-createServer(
+useServer(
{
context: (ctx, msg, args) => {
return getDynamicContext(ctx, msg, args);
@@ -692,24 +764,22 @@ createServer(
execute,
subscribe,
},
- {
- server,
- path: '/graphql',
- },
+ wsServer,
);
```
-🔗 Server usage with custom execution arguments and validation
+🔗 ws server usage with custom execution arguments and validation
```typescript
import { parse, validate, execute, subscribe } from 'graphql';
-import { createServer } from 'graphql-ws';
+import ws from 'ws'; // yarn add ws
+import { useServer } from 'graphql-ws/lib/use/ws';
import { schema, myValidationRules } from 'my-graphql';
-createServer(
+useServer(
{
execute,
subscribe,
@@ -730,23 +800,21 @@ createServer(
return args;
},
},
- {
- server,
- path: '/graphql',
- },
+ wsServer,
);
```
-🔗 Server and client usage with persisted queries
+🔗 ws server and client usage with persisted queries
```typescript
// 🛸 server
import { parse, execute, subscribe } from 'graphql';
-import { createServer } from 'graphql-ws';
+import ws from 'ws'; // yarn add ws
+import { useServer } from 'graphql-ws/lib/use/ws';
import { schema } from 'my-graphql-schema';
// a unique GraphQL execution ID used for representing
@@ -761,7 +829,12 @@ const queriesStore: Record = {
},
};
-createServer(
+const wsServer = new ws.Server({
+ server,
+ path: '/graphql',
+});
+
+useServer(
{
execute,
subscribe,
@@ -777,17 +850,14 @@ createServer(
};
},
},
- {
- server,
- path: '/graphql',
- },
+ wsServer,
);
```
```typescript
// 📺 client
-import { createClient } from 'graphql-ws';
+import { createClient } from 'graphql-ws/lib/use/ws';
const client = createClient({
url: 'wss://persisted.graphql/queries',
diff --git a/docs/README.md b/docs/README.md
index beecc159..975314ab 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -13,3 +13,4 @@
* ["protocol"](modules/_protocol_.md)
* ["server"](modules/_server_.md)
* ["types"](modules/_types_.md)
+* ["use/ws"](modules/_use_ws_.md)
diff --git a/docs/interfaces/_server_.context.md b/docs/interfaces/_server_.context.md
index 7c42b5c3..b3ac4db2 100644
--- a/docs/interfaces/_server_.context.md
+++ b/docs/interfaces/_server_.context.md
@@ -2,7 +2,13 @@
> [Globals](../README.md) / ["server"](../modules/_server_.md) / Context
-# Interface: Context
+# Interface: Context\
+
+## Type parameters
+
+Name | Default |
+------ | ------ |
+`E` | unknown |
## Hierarchy
@@ -15,15 +21,14 @@
* [acknowledged](_server_.context.md#acknowledged)
* [connectionInitReceived](_server_.context.md#connectioninitreceived)
* [connectionParams](_server_.context.md#connectionparams)
-* [request](_server_.context.md#request)
-* [socket](_server_.context.md#socket)
+* [extra](_server_.context.md#extra)
* [subscriptions](_server_.context.md#subscriptions)
## Properties
### acknowledged
-• **acknowledged**: boolean
+• `Readonly` **acknowledged**: boolean
Indicates that the connection was acknowledged
by having dispatched the `ConnectionAck` message
@@ -33,7 +38,7 @@ ___
### connectionInitReceived
-• **connectionInitReceived**: boolean
+• `Readonly` **connectionInitReceived**: boolean
Indicates that the `ConnectionInit` message
has been received by the server. If this is
@@ -44,32 +49,24 @@ ___
### connectionParams
-• `Optional` **connectionParams**: Readonly\>
+• `Optional` `Readonly` **connectionParams**: Readonly\>
The parameters passed during the connection initialisation.
___
-### request
-
-• `Readonly` **request**: IncomingMessage
-
-The initial HTTP request before the actual
-socket and connection is established.
-
-___
-
-### socket
+### extra
-• `Readonly` **socket**: WebSocket
+• **extra**: E
-The actual WebSocket connection between the server and the client.
+An extra field where you can store your own context values
+to pass between callbacks.
___
### subscriptions
-• **subscriptions**: Record\<[ID](../modules/_types_.md#id), AsyncIterator\>
+• `Readonly` **subscriptions**: Record\<[ID](../modules/_types_.md#id), AsyncIterator\>
Holds the active subscriptions for this context.
Subscriptions are for **streaming operations only**,
diff --git a/docs/interfaces/_server_.server.md b/docs/interfaces/_server_.server.md
index 9ba1039c..5adefb02 100644
--- a/docs/interfaces/_server_.server.md
+++ b/docs/interfaces/_server_.server.md
@@ -2,33 +2,47 @@
> [Globals](../README.md) / ["server"](../modules/_server_.md) / Server
-# Interface: Server
+# Interface: Server\
-## Hierarchy
+## Type parameters
+
+Name | Default |
+------ | ------ |
+`E` | undefined |
-* [Disposable](_types_.disposable.md)
+## Hierarchy
- ↳ **Server**
+* **Server**
## Index
-### Properties
+### Methods
+
+* [opened](_server_.server.md#opened)
-* [dispose](_server_.server.md#dispose)
-* [webSocketServer](_server_.server.md#websocketserver)
+## Methods
-## Properties
+### opened
-### dispose
+â–¸ **opened**(`socket`: [WebSocket](_server_.websocket.md), `ctxExtra`: E): function
-• **dispose**: () => void \| Promise\
+New socket has beeen established. The lib will validate
+the protocol and use the socket accordingly. Returned promise
+will resolve after the socket closes.
-*Inherited from [Disposable](_types_.disposable.md).[dispose](_types_.disposable.md#dispose)*
+The second argument will be passed in the `extra` field
+of the `Context`. You may pass the initial request or the
+original WebSocket, if you need it down the road.
-Dispose of the instance and clear up resources.
+Returns a function that should be called when the same socket
+has been closed, for whatever reason. The returned promise will
+resolve once the internal cleanup is complete.
-___
+#### Parameters:
-### webSocketServer
+Name | Type |
+------ | ------ |
+`socket` | [WebSocket](_server_.websocket.md) |
+`ctxExtra` | E |
-• **webSocketServer**: Server
+**Returns:** function
diff --git a/docs/interfaces/_server_.serveroptions.md b/docs/interfaces/_server_.serveroptions.md
index ef31447b..df6756eb 100644
--- a/docs/interfaces/_server_.serveroptions.md
+++ b/docs/interfaces/_server_.serveroptions.md
@@ -2,7 +2,13 @@
> [Globals](../README.md) / ["server"](../modules/_server_.md) / ServerOptions
-# Interface: ServerOptions
+# Interface: ServerOptions\
+
+## Type parameters
+
+Name | Default |
+------ | ------ |
+`E` | unknown |
## Hierarchy
@@ -15,7 +21,6 @@
* [connectionInitWaitTimeout](_server_.serveroptions.md#connectioninitwaittimeout)
* [context](_server_.serveroptions.md#context)
* [execute](_server_.serveroptions.md#execute)
-* [keepAlive](_server_.serveroptions.md#keepalive)
* [onComplete](_server_.serveroptions.md#oncomplete)
* [onConnect](_server_.serveroptions.md#onconnect)
* [onError](_server_.serveroptions.md#onerror)
@@ -48,7 +53,7 @@ ___
### context
-• `Optional` **context**: [GraphQLExecutionContextValue](../modules/_server_.md#graphqlexecutioncontextvalue) \| (ctx: [Context](_server_.context.md), message: [SubscribeMessage](_message_.subscribemessage.md), args: ExecutionArgs) => [GraphQLExecutionContextValue](../modules/_server_.md#graphqlexecutioncontextvalue)
+• `Optional` **context**: [GraphQLExecutionContextValue](../modules/_server_.md#graphqlexecutioncontextvalue) \| (ctx: [Context](_server_.context.md)\, message: [SubscribeMessage](_message_.subscribemessage.md), args: ExecutionArgs) => Promise\<[GraphQLExecutionContextValue](../modules/_server_.md#graphqlexecutioncontextvalue)> \| [GraphQLExecutionContextValue](../modules/_server_.md#graphqlexecutioncontextvalue)
A value which is provided to every resolver and holds
important contextual information like the currently
@@ -78,23 +83,9 @@ in the close event reason.
___
-### keepAlive
-
-• `Optional` **keepAlive**: undefined \| number
-
-The timout between dispatched keep-alive messages. Internally the lib
-uses the [WebSocket Ping and Pongs]((https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers#Pings_and_Pongs_The_Heartbeat_of_WebSockets)) to check that the link between
-the clients and the server is operating and to prevent the link from being broken due to idling.
-
-Set to nullish value to disable.
-
-**`default`** 12 * 1000 (12 seconds)
-
-___
-
### onComplete
-• `Optional` **onComplete**: undefined \| (ctx: [Context](_server_.context.md), message: [CompleteMessage](_message_.completemessage.md)) => Promise\ \| void
+• `Optional` **onComplete**: undefined \| (ctx: [Context](_server_.context.md)\, message: [CompleteMessage](_message_.completemessage.md)) => Promise\ \| void
The complete callback is executed after the
operation has completed right before sending
@@ -112,7 +103,7 @@ ___
### onConnect
-• `Optional` **onConnect**: undefined \| (ctx: [Context](_server_.context.md)) => Promise\ \| boolean \| void> \| Record\ \| boolean \| void
+• `Optional` **onConnect**: undefined \| (ctx: [Context](_server_.context.md)\) => Promise\ \| boolean \| void> \| Record\ \| boolean \| void
Is the connection callback called when the
client requests the connection initialisation
@@ -141,7 +132,7 @@ ___
### onError
-• `Optional` **onError**: undefined \| (ctx: [Context](_server_.context.md), message: [ErrorMessage](_message_.errormessage.md), errors: readonly GraphQLError[]) => Promise\ \| readonly GraphQLError[] \| void
+• `Optional` **onError**: undefined \| (ctx: [Context](_server_.context.md)\, message: [ErrorMessage](_message_.errormessage.md), errors: readonly GraphQLError[]) => Promise\ \| readonly GraphQLError[] \| void
Executed after an error occured right before it
has been dispatched to the client.
@@ -159,7 +150,7 @@ ___
### onNext
-• `Optional` **onNext**: undefined \| (ctx: [Context](_server_.context.md), message: [NextMessage](_message_.nextmessage.md), args: ExecutionArgs, result: ExecutionResult) => Promise\ \| ExecutionResult \| void
+• `Optional` **onNext**: undefined \| (ctx: [Context](_server_.context.md)\, message: [NextMessage](_message_.nextmessage.md), args: ExecutionArgs, result: ExecutionResult) => Promise\ \| ExecutionResult \| void
Executed after an operation has emitted a result right before
that result has been sent to the client. Results from both
@@ -178,7 +169,7 @@ ___
### onOperation
-• `Optional` **onOperation**: undefined \| (ctx: [Context](_server_.context.md), message: [SubscribeMessage](_message_.subscribemessage.md), args: ExecutionArgs, result: [OperationResult](../modules/_server_.md#operationresult)) => Promise\<[OperationResult](../modules/_server_.md#operationresult) \| void> \| [OperationResult](../modules/_server_.md#operationresult) \| void
+• `Optional` **onOperation**: undefined \| (ctx: [Context](_server_.context.md)\, message: [SubscribeMessage](_message_.subscribemessage.md), args: ExecutionArgs, result: [OperationResult](../modules/_server_.md#operationresult)) => Promise\<[OperationResult](../modules/_server_.md#operationresult) \| void> \| [OperationResult](../modules/_server_.md#operationresult) \| void
Executed after the operation call resolves. For streaming
operations, triggering this callback does not necessarely
@@ -203,7 +194,7 @@ ___
### onSubscribe
-• `Optional` **onSubscribe**: undefined \| (ctx: [Context](_server_.context.md), message: [SubscribeMessage](_message_.subscribemessage.md)) => Promise\ \| ExecutionArgs \| readonly GraphQLError[] \| void
+• `Optional` **onSubscribe**: undefined \| (ctx: [Context](_server_.context.md)\, message: [SubscribeMessage](_message_.subscribemessage.md)) => Promise\ \| ExecutionArgs \| readonly GraphQLError[] \| void
The subscribe callback executed right after
acknowledging the request before any payload
diff --git a/docs/interfaces/_server_.websocket.md b/docs/interfaces/_server_.websocket.md
new file mode 100644
index 00000000..b333d583
--- /dev/null
+++ b/docs/interfaces/_server_.websocket.md
@@ -0,0 +1,101 @@
+**[graphql-ws](../README.md)**
+
+> [Globals](../README.md) / ["server"](../modules/_server_.md) / WebSocket
+
+# Interface: WebSocket
+
+## Hierarchy
+
+* **WebSocket**
+
+## Index
+
+### Properties
+
+* [protocol](_server_.websocket.md#protocol)
+
+### Methods
+
+* [close](_server_.websocket.md#close)
+* [onMessage](_server_.websocket.md#onmessage)
+* [send](_server_.websocket.md#send)
+
+## Properties
+
+### protocol
+
+• `Readonly` **protocol**: string
+
+The subprotocol of the WebSocket. Will be used
+to validate agains the supported ones.
+
+## Methods
+
+### close
+
+â–¸ **close**(`code`: number, `reason`: string): Promise\ \| void
+
+Closes the socket gracefully. Will always provide
+the appropriate code and close reason.
+
+The returned promise is used to control the graceful
+closure.
+
+#### Parameters:
+
+Name | Type |
+------ | ------ |
+`code` | number |
+`reason` | string |
+
+**Returns:** Promise\ \| void
+
+___
+
+### onMessage
+
+â–¸ **onMessage**(`cb`: (data: string) => Promise\): void
+
+Called when message is received. The library requires the data
+to be a `string`.
+
+All operations requested from the client will block the promise until
+completed, this means that the callback will not resolve until all
+subscription events have been emittet (or until the client has completed
+the stream), or until the query/mutation resolves.
+
+Exceptions raised during any phase of operation processing will
+reject the callback's promise, catch them and communicate them
+to your clients however you wish.
+
+#### Parameters:
+
+Name | Type |
+------ | ------ |
+`cb` | (data: string) => Promise\ |
+
+**Returns:** void
+
+___
+
+### send
+
+â–¸ **send**(`data`: string): Promise\ \| void
+
+Sends a message through the socket. Will always
+provide a `string` message.
+
+Please take care that the send is ready. Meaning,
+only provide a truly OPEN socket through the `opened`
+method of the `Server`.
+
+The returned promise is used to control the flow of data
+(like handling backpressure).
+
+#### Parameters:
+
+Name | Type |
+------ | ------ |
+`data` | string |
+
+**Returns:** Promise\ \| void
diff --git a/docs/interfaces/_types_.disposable.md b/docs/interfaces/_types_.disposable.md
index 4ccba627..f9b20e4a 100644
--- a/docs/interfaces/_types_.disposable.md
+++ b/docs/interfaces/_types_.disposable.md
@@ -10,8 +10,6 @@
↳ [Client](_client_.client.md)
- ↳ [Server](_server_.server.md)
-
## Index
### Properties
diff --git a/docs/interfaces/_use_ws_.extra.md b/docs/interfaces/_use_ws_.extra.md
new file mode 100644
index 00000000..ea5e206e
--- /dev/null
+++ b/docs/interfaces/_use_ws_.extra.md
@@ -0,0 +1,35 @@
+**[graphql-ws](../README.md)**
+
+> [Globals](../README.md) / ["use/ws"](../modules/_use_ws_.md) / Extra
+
+# Interface: Extra
+
+The extra that will be put in the `Context`.
+
+## Hierarchy
+
+* **Extra**
+
+## Index
+
+### Properties
+
+* [request](_use_ws_.extra.md#request)
+* [socket](_use_ws_.extra.md#socket)
+
+## Properties
+
+### request
+
+• `Readonly` **request**: IncomingMessage
+
+The initial HTTP request before the actual
+socket and connection is established.
+
+___
+
+### socket
+
+• `Readonly` **socket**: WebSocket
+
+The actual socket connection between the server and the client.
diff --git a/docs/modules/_server_.md b/docs/modules/_server_.md
index 07c73d72..467a5770 100644
--- a/docs/modules/_server_.md
+++ b/docs/modules/_server_.md
@@ -11,6 +11,7 @@
* [Context](../interfaces/_server_.context.md)
* [Server](../interfaces/_server_.server.md)
* [ServerOptions](../interfaces/_server_.serveroptions.md)
+* [WebSocket](../interfaces/_server_.websocket.md)
### Type aliases
@@ -19,7 +20,7 @@
### Functions
-* [createServer](_server_.md#createserver)
+* [makeServer](_server_.md#makeserver)
## Type aliases
@@ -42,19 +43,26 @@ ___
## Functions
-### createServer
+### makeServer
-â–¸ **createServer**(`options`: [ServerOptions](../interfaces/_server_.serveroptions.md), `websocketOptionsOrServer`: WebSocketServerOptions \| WebSocketServer): [Server](../interfaces/_server_.server.md)
+â–¸ **makeServer**\(`options`: [ServerOptions](../interfaces/_server_.serveroptions.md)\): [Server](../interfaces/_server_.server.md)\
-Creates a protocol complient WebSocket GraphQL
-subscription server. Read more about the protocol
-in the PROTOCOL.md documentation file.
+Makes a Protocol complient WebSocket GraphQL server. The server
+is actually an API which is to be used with your favourite WebSocket
+server library!
+
+Read more about the Protocol in the PROTOCOL.md documentation file.
+
+#### Type parameters:
+
+Name | Default |
+------ | ------ |
+`E` | unknown |
#### Parameters:
Name | Type |
------ | ------ |
-`options` | [ServerOptions](../interfaces/_server_.serveroptions.md) |
-`websocketOptionsOrServer` | WebSocketServerOptions \| WebSocketServer |
+`options` | [ServerOptions](../interfaces/_server_.serveroptions.md)\ |
-**Returns:** [Server](../interfaces/_server_.server.md)
+**Returns:** [Server](../interfaces/_server_.server.md)\
diff --git a/docs/modules/_use_ws_.md b/docs/modules/_use_ws_.md
new file mode 100644
index 00000000..88819f70
--- /dev/null
+++ b/docs/modules/_use_ws_.md
@@ -0,0 +1,34 @@
+**[graphql-ws](../README.md)**
+
+> [Globals](../README.md) / "use/ws"
+
+# Module: "use/ws"
+
+## Index
+
+### Interfaces
+
+* [Extra](../interfaces/_use_ws_.extra.md)
+
+### Functions
+
+* [useServer](_use_ws_.md#useserver)
+
+## Functions
+
+### useServer
+
+â–¸ **useServer**(`options`: [ServerOptions](../interfaces/_server_.serveroptions.md)\<[Extra](../interfaces/_use_ws_.extra.md)>, `ws`: WebSocketServer, `keepAlive?`: number): [Disposable](../interfaces/_types_.disposable.md)
+
+Use the server on a [ws](https://github.com/websockets/ws) ws server.
+This is a basic starter, feel free to copy the code over and adjust it to your needs
+
+#### Parameters:
+
+Name | Type | Default value |
+------ | ------ | ------ |
+`options` | [ServerOptions](../interfaces/_server_.serveroptions.md)\<[Extra](../interfaces/_use_ws_.extra.md)> | - |
+`ws` | WebSocketServer | - |
+`keepAlive` | number | 12 * 1000 |
+
+**Returns:** [Disposable](../interfaces/_types_.disposable.md)
diff --git a/jest.config.js b/jest.config.js
index 95a76c22..864bf74f 100644
--- a/jest.config.js
+++ b/jest.config.js
@@ -2,5 +2,5 @@ module.exports = {
testEnvironment: 'node',
moduleFileExtensions: ['ts', 'js'],
testRegex: '/tests/.+.ts$',
- testPathIgnorePatterns: ['/node_modules/', '/fixtures/'],
+ testPathIgnorePatterns: ['/node_modules/', '/fixtures/', '/utils/'],
};
diff --git a/package.json b/package.json
index 393e2b62..610f23a0 100644
--- a/package.json
+++ b/package.json
@@ -50,9 +50,6 @@
"peerDependencies": {
"graphql": ">=0.11 <=15"
},
- "dependencies": {
- "ws": "^7.4.0"
- },
"devDependencies": {
"@babel/core": "^7.12.3",
"@babel/plugin-proposal-class-properties": "^7.12.1",
@@ -80,6 +77,7 @@
"semantic-release": "^17.2.2",
"typedoc": "^0.19.2",
"typedoc-plugin-markdown": "^3.0.11",
- "typescript": "^4.0.5"
+ "typescript": "^4.0.5",
+ "ws": "^7.4.0"
}
}
diff --git a/src/server.ts b/src/server.ts
index 2dca782b..6e1cf96d 100644
--- a/src/server.ts
+++ b/src/server.ts
@@ -4,8 +4,6 @@
*
*/
-import * as http from 'http';
-import * as WebSocket from 'ws';
import {
OperationTypeNode,
GraphQLSchema,
@@ -17,7 +15,6 @@ import {
SubscriptionArgs,
ExecutionResult,
} from 'graphql';
-import { Disposable } from './types';
import { GRAPHQL_TRANSPORT_WS_PROTOCOL } from './protocol';
import {
Message,
@@ -29,13 +26,7 @@ import {
ErrorMessage,
CompleteMessage,
} from './message';
-import {
- isObject,
- isAsyncIterable,
- hasOwnObjectProperty,
- hasOwnStringProperty,
- areGraphQLErrors,
-} from './utils';
+import { isObject, isAsyncIterable, areGraphQLErrors } from './utils';
import { ID } from './types';
export type OperationResult =
@@ -61,7 +52,7 @@ export type GraphQLExecutionContextValue =
| undefined
| null;
-export interface ServerOptions {
+export interface ServerOptions {
/**
* The GraphQL schema on which the operations
* will be executed and validated against.
@@ -88,7 +79,7 @@ export interface ServerOptions {
context?:
| GraphQLExecutionContextValue
| ((
- ctx: Context,
+ ctx: Context,
message: SubscribeMessage,
args: ExecutionArgs,
) =>
@@ -141,16 +132,6 @@ export interface ServerOptions {
* @default 3 * 1000 (3 seconds)
*/
connectionInitWaitTimeout?: number;
- /**
- * The timout between dispatched keep-alive messages. Internally the lib
- * uses the [WebSocket Ping and Pongs]((https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers#Pings_and_Pongs_The_Heartbeat_of_WebSockets)) to check that the link between
- * the clients and the server is operating and to prevent the link from being broken due to idling.
- *
- * Set to nullish value to disable.
- *
- * @default 12 * 1000 (12 seconds)
- */
- keepAlive?: number;
/**
* Is the connection callback called when the
* client requests the connection initialisation
@@ -176,7 +157,7 @@ export interface ServerOptions {
* in the close event reason.
*/
onConnect?: (
- ctx: Context,
+ ctx: Context,
) =>
| Promise | boolean | void>
| Record
@@ -212,7 +193,7 @@ export interface ServerOptions {
* in the close event reason.
*/
onSubscribe?: (
- ctx: Context,
+ ctx: Context,
message: SubscribeMessage,
) =>
| Promise
@@ -240,7 +221,7 @@ export interface ServerOptions {
* in the close event reason.
*/
onOperation?: (
- ctx: Context,
+ ctx: Context,
message: SubscribeMessage,
args: ExecutionArgs,
result: OperationResult,
@@ -259,7 +240,7 @@ export interface ServerOptions {
* in the close event reason.
*/
onError?: (
- ctx: Context,
+ ctx: Context,
message: ErrorMessage,
errors: readonly GraphQLError[],
) => Promise | readonly GraphQLError[] | void;
@@ -278,7 +259,7 @@ export interface ServerOptions {
* in the close event reason.
*/
onNext?: (
- ctx: Context,
+ ctx: Context,
message: NextMessage,
args: ExecutionArgs,
result: ExecutionResult,
@@ -296,61 +277,108 @@ export interface ServerOptions {
* operations even after an abrupt closure, this callback
* will still be called.
*/
- onComplete?: (ctx: Context, message: CompleteMessage) => Promise | void;
+ onComplete?: (
+ ctx: Context,
+ message: CompleteMessage,
+ ) => Promise | void;
}
-export interface Context {
+export interface Server {
/**
- * The actual WebSocket connection between the server and the client.
+ * New socket has beeen established. The lib will validate
+ * the protocol and use the socket accordingly. Returned promise
+ * will resolve after the socket closes.
+ *
+ * The second argument will be passed in the `extra` field
+ * of the `Context`. You may pass the initial request or the
+ * original WebSocket, if you need it down the road.
+ *
+ * Returns a function that should be called when the same socket
+ * has been closed, for whatever reason. The returned promise will
+ * resolve once the internal cleanup is complete.
*/
- readonly socket: WebSocket;
+ opened(socket: WebSocket, ctxExtra: E): () => Promise; // closed
+}
+
+export interface WebSocket {
/**
- * The initial HTTP request before the actual
- * socket and connection is established.
+ * The subprotocol of the WebSocket. Will be used
+ * to validate agains the supported ones.
*/
- readonly request: http.IncomingMessage;
+ readonly protocol: string;
+ /**
+ * Sends a message through the socket. Will always
+ * provide a `string` message.
+ *
+ * Please take care that the send is ready. Meaning,
+ * only provide a truly OPEN socket through the `opened`
+ * method of the `Server`.
+ *
+ * The returned promise is used to control the flow of data
+ * (like handling backpressure).
+ */
+ send(data: string): Promise | void;
+ /**
+ * Closes the socket gracefully. Will always provide
+ * the appropriate code and close reason.
+ *
+ * The returned promise is used to control the graceful
+ * closure.
+ */
+ close(code: number, reason: string): Promise | void;
+ /**
+ * Called when message is received. The library requires the data
+ * to be a `string`.
+ *
+ * All operations requested from the client will block the promise until
+ * completed, this means that the callback will not resolve until all
+ * subscription events have been emittet (or until the client has completed
+ * the stream), or until the query/mutation resolves.
+ *
+ * Exceptions raised during any phase of operation processing will
+ * reject the callback's promise, catch them and communicate them
+ * to your clients however you wish.
+ */
+ onMessage(cb: (data: string) => Promise): void;
+}
+
+export interface Context {
/**
* Indicates that the `ConnectionInit` message
* has been received by the server. If this is
* `true`, the client wont be kicked off after
* the wait timeout has passed.
*/
- connectionInitReceived: boolean;
+ readonly connectionInitReceived: boolean;
/**
* Indicates that the connection was acknowledged
* by having dispatched the `ConnectionAck` message
* to the related client.
*/
- acknowledged: boolean;
+ readonly acknowledged: boolean;
/** The parameters passed during the connection initialisation. */
- connectionParams?: Readonly>;
+ readonly connectionParams?: Readonly>;
/**
* Holds the active subscriptions for this context.
* Subscriptions are for **streaming operations only**,
* those that resolve once wont be added here.
*/
- subscriptions: Record>;
-}
-
-export interface Server extends Disposable {
- webSocketServer: WebSocket.Server;
+ readonly subscriptions: Record>;
+ /**
+ * An extra field where you can store your own context values
+ * to pass between callbacks.
+ */
+ extra: E;
}
-// for documentation gen only
-type WebSocketServerOptions = WebSocket.ServerOptions;
-type WebSocketServer = WebSocket.Server;
-
/**
- * Creates a protocol complient WebSocket GraphQL
- * subscription server. Read more about the protocol
- * in the PROTOCOL.md documentation file.
+ * Makes a Protocol complient WebSocket GraphQL server. The server
+ * is actually an API which is to be used with your favourite WebSocket
+ * server library!
+ *
+ * Read more about the Protocol in the PROTOCOL.md documentation file.
*/
-export function createServer(
- options: ServerOptions,
- websocketOptionsOrServer: WebSocketServerOptions | WebSocketServer,
-): Server {
- const isProd = process.env.NODE_ENV === 'production';
-
+export function makeServer(options: ServerOptions): Server {
const {
schema,
context,
@@ -358,7 +386,6 @@ export function createServer(
execute,
subscribe,
connectionInitWaitTimeout = 3 * 1000, // 3 seconds
- keepAlive = 12 * 1000, // 12 seconds
onConnect,
onSubscribe,
onOperation,
@@ -366,177 +393,87 @@ export function createServer(
onError,
onComplete,
} = options;
- const webSocketServer = isWebSocketServer(websocketOptionsOrServer)
- ? websocketOptionsOrServer
- : new WebSocket.Server(websocketOptionsOrServer);
- webSocketServer.on('connection', handleConnection);
- function handleConnection(socket: WebSocket, request: http.IncomingMessage) {
- if (
- Array.isArray(socket.protocol)
- ? socket.protocol.indexOf(GRAPHQL_TRANSPORT_WS_PROTOCOL) === -1
- : socket.protocol !== GRAPHQL_TRANSPORT_WS_PROTOCOL
- ) {
- return socket.close(1002, 'Protocol Error');
- }
+ return {
+ opened(socket, extra) {
+ if (socket.protocol !== GRAPHQL_TRANSPORT_WS_PROTOCOL) {
+ socket.close(1002, 'Protocol Error');
+ return async () => {
+ /* nothing was set up */
+ };
+ }
- const ctxRef: { current: Context } = {
- current: {
- socket,
- request,
+ const ctx: Context = {
connectionInitReceived: false,
acknowledged: false,
subscriptions: {},
- },
- };
-
- // kick the client off (close socket) if the connection has
- // not been initialised after the specified wait timeout
- const connectionInitWait =
- connectionInitWaitTimeout > 0 && isFinite(connectionInitWaitTimeout)
- ? setTimeout(() => {
- if (!ctxRef.current.connectionInitReceived) {
- ctxRef.current.socket.close(
- 4408,
- 'Connection initialisation timeout',
- );
- }
- }, connectionInitWaitTimeout)
- : null;
-
- // keep alive through ping-pong messages
- // read more about the websocket heartbeat here: https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers#Pings_and_Pongs_The_Heartbeat_of_WebSockets
- let pongWait: NodeJS.Timeout | null = null;
- const pingInterval =
- keepAlive > 0 && isFinite(keepAlive)
- ? setInterval(() => {
- // ping pong on open sockets only
- if (socket.readyState === WebSocket.OPEN) {
- // terminate the connection after pong wait has passed because the client is idle
- pongWait = setTimeout(() => {
- socket.terminate();
- }, keepAlive);
-
- // listen for client's pong and stop socket termination
- socket.once('pong', () => {
- if (pongWait) {
- clearTimeout(pongWait);
- pongWait = null;
- }
- });
-
- socket.ping();
- }
- }, keepAlive)
- : null;
-
- function errorOrCloseHandler(
- errorOrClose: WebSocket.ErrorEvent | WebSocket.CloseEvent,
- ) {
- if (connectionInitWait) {
- clearTimeout(connectionInitWait);
- }
- if (pongWait) {
- clearTimeout(pongWait);
- }
- if (pingInterval) {
- clearInterval(pingInterval);
- }
-
- if (isErrorEvent(errorOrClose)) {
- ctxRef.current.socket.close(
- 1011,
- isProd ? 'Internal Error' : errorOrClose.message,
- );
- }
-
- Object.values(ctxRef.current.subscriptions).forEach((subscription) => {
- subscription.return?.();
- });
- }
-
- socket.onerror = errorOrCloseHandler;
- socket.onclose = errorOrCloseHandler;
- socket.onmessage = makeOnMessage(ctxRef.current);
- }
-
- webSocketServer.on('error', (err) => {
- // catch the first thrown error and re-throw it once all clients have been notified
- let firstErr: Error | null = null;
-
- // report server errors by erroring out all clients with the same error
- for (const client of webSocketServer.clients) {
- try {
- client.emit('error', err);
- } catch (err) {
- firstErr = firstErr ?? err;
- }
- }
-
- if (firstErr) {
- throw firstErr;
- }
- });
-
- // Sends through a message only if the socket is open.
- async function sendMessage(
- ctx: Context,
- message: Message,
- ) {
- if (ctx.socket.readyState === WebSocket.OPEN) {
- return new Promise((resolve, reject) => {
- ctx.socket.send(stringifyMessage(message), (err) =>
- err ? reject(err) : resolve(),
- );
- });
- }
- }
-
- function makeOnMessage(ctx: Context) {
- return async function onMessage(event: WebSocket.MessageEvent) {
- try {
- const message = parseMessage(event.data);
+ extra,
+ };
+
+ // kick the client off (close socket) if the connection has
+ // not been initialised after the specified wait timeout
+ const connectionInitWait =
+ connectionInitWaitTimeout > 0 && isFinite(connectionInitWaitTimeout)
+ ? setTimeout(() => {
+ if (!ctx.connectionInitReceived) {
+ socket.close(4408, 'Connection initialisation timeout');
+ }
+ }, connectionInitWaitTimeout)
+ : null;
+
+ socket.onMessage(async function onMessage(data) {
+ let message: Message;
+ try {
+ message = parseMessage(data);
+ } catch (err) {
+ return socket.close(4400, 'Invalid message received');
+ }
switch (message.type) {
case MessageType.ConnectionInit: {
if (ctx.connectionInitReceived) {
- return ctx.socket.close(4429, 'Too many initialisation requests');
+ return socket.close(4429, 'Too many initialisation requests');
}
+ // @ts-expect-error: I can write
ctx.connectionInitReceived = true;
if (isObject(message.payload)) {
+ // @ts-expect-error: I can write
ctx.connectionParams = message.payload;
}
const permittedOrPayload = await onConnect?.(ctx);
if (permittedOrPayload === false) {
- return ctx.socket.close(4403, 'Forbidden');
+ return socket.close(4403, 'Forbidden');
}
- await sendMessage(
- ctx,
- isObject(permittedOrPayload)
- ? {
- type: MessageType.ConnectionAck,
- payload: permittedOrPayload,
- }
- : {
- type: MessageType.ConnectionAck,
- // payload is completely absent if not provided
- },
+ await socket.send(
+ stringifyMessage(
+ isObject(permittedOrPayload)
+ ? {
+ type: MessageType.ConnectionAck,
+ payload: permittedOrPayload,
+ }
+ : {
+ type: MessageType.ConnectionAck,
+ // payload is completely absent if not provided
+ },
+ ),
);
+ // @ts-expect-error: I can write
ctx.acknowledged = true;
break;
}
case MessageType.Subscribe: {
if (!ctx.acknowledged) {
- return ctx.socket.close(4401, 'Unauthorized');
+ return socket.close(4401, 'Unauthorized');
}
+ const id = message.id;
const emit = {
next: async (result: ExecutionResult, args: ExecutionArgs) => {
let nextMessage: NextMessage = {
- id: message.id,
+ id,
type: MessageType.Next,
payload: result,
};
@@ -554,11 +491,13 @@ export function createServer(
};
}
}
- await sendMessage(ctx, nextMessage);
+ await socket.send(
+ stringifyMessage(nextMessage),
+ );
},
error: async (errors: readonly GraphQLError[]) => {
let errorMessage: ErrorMessage = {
- id: message.id,
+ id,
type: MessageType.Error,
payload: errors,
};
@@ -571,16 +510,20 @@ export function createServer(
};
}
}
- await sendMessage(ctx, errorMessage);
+ await socket.send(
+ stringifyMessage(errorMessage),
+ );
},
complete: async (notifyClient: boolean) => {
const completeMessage: CompleteMessage = {
- id: message.id,
+ id,
type: MessageType.Complete,
};
await onComplete?.(ctx, completeMessage);
if (notifyClient) {
- await sendMessage(ctx, completeMessage);
+ await socket.send(
+ stringifyMessage(completeMessage),
+ );
}
},
};
@@ -601,10 +544,7 @@ export function createServer(
if (!schema) {
// you either provide a schema dynamically through
// `onSubscribe` or you set one up during the server setup
- return webSocketServer.emit(
- 'error',
- new Error('The GraphQL schema is not provided'),
- );
+ throw new Error('The GraphQL schema is not provided');
}
const { operationName, query, variables } = message.payload;
@@ -614,7 +554,6 @@ export function createServer(
document: parse(query),
variableValues: variables,
};
-
const validationErrors = validate(
execArgs.schema,
execArgs.document,
@@ -673,13 +612,13 @@ export function createServer(
/** multiple emitted results */
// iterable subscriptions are distinct on ID
- if (ctx.subscriptions[message.id]) {
- return ctx.socket.close(
+ if (ctx.subscriptions[id]) {
+ return socket.close(
4409,
- `Subscriber for ${message.id} already exists`,
+ `Subscriber for ${id} already exists`,
);
}
- ctx.subscriptions[message.id] = operationResult;
+ ctx.subscriptions[id] = operationResult;
for await (const result of operationResult) {
await emit.next(result, execArgs);
@@ -687,8 +626,8 @@ export function createServer(
// lack of subscription at this point indicates that the client
// completed the stream, he doesnt need to be reminded
- await emit.complete(Boolean(ctx.subscriptions[message.id]));
- delete ctx.subscriptions[message.id];
+ await emit.complete(Boolean(ctx.subscriptions[id]));
+ delete ctx.subscriptions[id];
} else {
/** single emitted result */
@@ -707,38 +646,15 @@ export function createServer(
`Unexpected message of type ${message.type} received`,
);
}
- } catch (err) {
- // TODO-db-201031 we perceive this as a client bad request error, but is it always?
- ctx.socket.close(4400, isProd ? 'Bad Request' : err.message);
- }
- };
- }
-
- return {
- webSocketServer,
- dispose: async () => {
- for (const client of webSocketServer.clients) {
- client.close(1001, 'Going away');
- }
-
- webSocketServer.removeAllListeners();
+ });
- await new Promise((resolve, reject) =>
- webSocketServer.close((err) => (err ? reject(err) : resolve())),
- );
+ // wait for close and cleanup
+ return async () => {
+ if (connectionInitWait) clearTimeout(connectionInitWait);
+ for (const sub of Object.values(ctx.subscriptions)) {
+ await sub.return?.();
+ }
+ };
},
};
}
-
-function isErrorEvent(obj: unknown): obj is WebSocket.ErrorEvent {
- return (
- isObject(obj) &&
- hasOwnObjectProperty(obj, 'error') &&
- hasOwnStringProperty(obj, 'message') &&
- hasOwnStringProperty(obj, 'type')
- );
-}
-
-function isWebSocketServer(obj: unknown): obj is WebSocketServer {
- return isObject(obj) && typeof obj.on === 'function';
-}
diff --git a/src/tests/client.ts b/src/tests/client.ts
index 4658bdeb..fc4ef8c9 100644
--- a/src/tests/client.ts
+++ b/src/tests/client.ts
@@ -451,7 +451,7 @@ describe('subscription operation', () => {
it('should stop dispatching messages after completing a subscription', async () => {
const {
url,
- server,
+ clients,
waitForOperation,
waitForComplete,
} = await startTServer();
@@ -461,7 +461,7 @@ describe('subscription operation', () => {
});
await waitForOperation();
- for (const client of server.webSocketServer.clients) {
+ for (const client of clients) {
client.once('message', () => {
// no more messages from the client
fail("Shouldn't have dispatched a message");
diff --git a/src/tests/fixtures/simple.ts b/src/tests/fixtures/simple.ts
index 17e2649f..3bc8b8f1 100644
--- a/src/tests/fixtures/simple.ts
+++ b/src/tests/fixtures/simple.ts
@@ -10,7 +10,8 @@ import { EventEmitter } from 'events';
import WebSocket from 'ws';
import net from 'net';
import http from 'http';
-import { createServer, ServerOptions, Server, Context } from '../../server';
+import { ServerOptions, Context } from '../../server';
+import { useServer, Extra } from '../../use/ws';
// distinct server for each test; if you forget to dispose, the fixture wont
const leftovers: Dispose[] = [];
@@ -25,7 +26,7 @@ afterEach(async () => {
export interface TServer {
url: string;
- server: Server;
+ ws: WebSocket.Server;
clients: Set;
pong: (key?: string) => void;
waitForClient: (
@@ -33,7 +34,7 @@ export interface TServer {
expire?: number,
) => Promise;
waitForConnect: (
- test?: (ctx: Context) => void,
+ test?: (ctx: Context) => void,
expire?: number,
) => Promise;
waitForOperation: (test?: () => void, expire?: number) => Promise;
@@ -119,7 +120,8 @@ export const schema = new GraphQLSchema({
});
export async function startTServer(
- options: Partial = {},
+ options: Partial> = {},
+ keepAlive?: number, // for ws tests sake
): Promise {
const path = '/simple';
const emitter = new EventEmitter();
@@ -138,10 +140,14 @@ export async function startTServer(
});
// create server and hook up for tracking operations
- const pendingConnections: Context[] = [];
+ const pendingConnections: Context[] = [];
let pendingOperations = 0,
pendingCompletes = 0;
- const server = await createServer(
+ const ws = new WebSocket.Server({
+ server: httpServer,
+ path,
+ });
+ const server = await useServer(
{
schema,
execute,
@@ -170,10 +176,8 @@ export async function startTServer(
emitter.emit('compl');
},
},
- {
- server: httpServer,
- path,
- },
+ ws,
+ keepAlive,
);
// search for open port from the starting port
@@ -208,7 +212,7 @@ export async function startTServer(
// pending websocket clients
let pendingCloses = 0;
const pendingClients: WebSocket[] = [];
- server.webSocketServer.on('connection', (client) => {
+ ws.on('connection', (client) => {
pendingClients.push(client);
client.once('close', () => {
pendingCloses++;
@@ -243,9 +247,9 @@ export async function startTServer(
return {
url: `ws://localhost:${addr.port}${path}`,
- server,
+ ws,
get clients() {
- return server.webSocketServer.clients;
+ return ws.clients;
},
pong,
waitForClient(test, expire) {
@@ -260,10 +264,10 @@ export async function startTServer(
if (pendingClients.length > 0) {
return done();
}
- server.webSocketServer.once('connection', done);
+ ws.once('connection', done);
if (expire) {
setTimeout(() => {
- server.webSocketServer.off('connection', done); // expired
+ ws.off('connection', done); // expired
resolve();
}, expire);
}
diff --git a/src/tests/server.ts b/src/tests/server.ts
index 4706e0df..a0120b48 100644
--- a/src/tests/server.ts
+++ b/src/tests/server.ts
@@ -1,4 +1,3 @@
-import WebSocket from 'ws';
import {
parse,
buildSchema,
@@ -8,85 +7,9 @@ import {
ExecutionArgs,
} from 'graphql';
import { GRAPHQL_TRANSPORT_WS_PROTOCOL } from '../protocol';
-import {
- MessageType,
- parseMessage,
- stringifyMessage,
- SubscribePayload,
-} from '../message';
+import { MessageType, parseMessage, stringifyMessage } from '../message';
import { schema, startTServer } from './fixtures/simple';
-
-function createTClient(
- url: string,
- protocols: string | string[] = GRAPHQL_TRANSPORT_WS_PROTOCOL,
-) {
- let closeEvent: WebSocket.CloseEvent;
- const queue: WebSocket.MessageEvent[] = [];
- return new Promise<{
- ws: WebSocket;
- waitForMessage: (
- test?: (data: WebSocket.MessageEvent) => void,
- expire?: number,
- ) => Promise;
- waitForClose: (
- test?: (event: WebSocket.CloseEvent) => void,
- expire?: number,
- ) => Promise;
- }>((resolve) => {
- const ws = new WebSocket(url, protocols);
- ws.onclose = (event) => (closeEvent = event); // just so that none are missed
- ws.onmessage = (message) => queue.push(message); // guarantee message delivery with a queue
- ws.once('open', () =>
- resolve({
- ws,
- async waitForMessage(test, expire) {
- return new Promise((resolve) => {
- const done = () => {
- // the onmessage listener above will be called before our listener, populating the queue
- // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
- const next = queue.shift()!;
- test?.(next);
- resolve();
- };
- if (queue.length > 0) {
- return done();
- }
- ws.once('message', done);
- if (expire) {
- setTimeout(() => {
- ws.removeListener('message', done); // expired
- resolve();
- }, expire);
- }
- });
- },
- async waitForClose(
- test?: (event: WebSocket.CloseEvent) => void,
- expire?: number,
- ) {
- return new Promise((resolve) => {
- if (closeEvent) {
- test?.(closeEvent);
- return resolve();
- }
- ws.onclose = (event) => {
- closeEvent = event;
- test?.(event);
- resolve();
- };
- if (expire) {
- setTimeout(() => {
- // @ts-expect-error: its ok
- ws.onclose = null; // expired
- resolve();
- }, expire);
- }
- });
- },
- }),
- );
- });
-}
+import { createTClient } from './utils';
/**
* Tests
@@ -126,44 +49,6 @@ it('should allow connections with valid protocols only', async () => {
);
});
-it('should gracefully go away when disposing', async () => {
- const server = await startTServer();
-
- const client1 = await createTClient(server.url);
- const client2 = await createTClient(server.url);
-
- await server.dispose(true);
-
- await client1.waitForClose((event) => {
- expect(event.code).toBe(1001);
- expect(event.reason).toBe('Going away');
- expect(event.wasClean).toBeTruthy();
- });
- await client2.waitForClose((event) => {
- expect(event.code).toBe(1001);
- expect(event.reason).toBe('Going away');
- expect(event.wasClean).toBeTruthy();
- });
-});
-
-it('should report server errors to clients by closing the connection', async () => {
- const {
- url,
- server: { webSocketServer },
- } = await startTServer();
-
- const client = await createTClient(url);
-
- const emittedError = new Error("I'm a teapot");
- webSocketServer.emit('error', emittedError);
-
- await client.waitForClose((event) => {
- expect(event.code).toBe(1011); // 1011: Internal Error
- expect(event.reason).toBe(emittedError.message);
- expect(event.wasClean).toBeTruthy(); // because the server reported the error
- });
-});
-
it('should use the provided roots as resolvers', async () => {
const schema = buildSchema(`
type Query {
@@ -452,41 +337,6 @@ it('should prefer the `onSubscribe` context value even if `context` option is se
);
});
-it('should handle errors thrown from client error listeners', async () => {
- const { server, url } = await startTServer();
-
- const client = await createTClient(url);
- client.ws.send(
- stringifyMessage({
- type: MessageType.ConnectionInit,
- }),
- );
- await client.waitForMessage(({ data }) => {
- expect(parseMessage(data).type).toBe(MessageType.ConnectionAck);
- });
-
- const surpriseErr1 = new Error('Well hello there!');
- const surpriseErr2 = new Error('I wont be thrown!'); // first to throw stops emission
- for (const client of server.webSocketServer.clients) {
- client.on('error', () => {
- throw surpriseErr1;
- });
- client.on('error', () => {
- throw surpriseErr2;
- });
- }
-
- expect(() => {
- server.webSocketServer.emit('error', new Error('I am a nice error'));
- }).toThrowError(surpriseErr1);
-
- await client.waitForClose((event) => {
- expect(event.code).toBe(1011);
- expect(event.reason).toBe('I am a nice error');
- expect(event.wasClean).toBeTruthy();
- });
-});
-
describe('Connect', () => {
it('should refuse connection and close socket if returning `false`', async () => {
const { url } = await startTServer({
@@ -510,29 +360,6 @@ describe('Connect', () => {
});
});
- it('should close socket with error thrown from the callback', async () => {
- const error = new Error("I'm a teapot");
-
- const { url } = await startTServer({
- onConnect: () => {
- throw error;
- },
- });
-
- const client = await createTClient(url);
- client.ws.send(
- stringifyMessage({
- type: MessageType.ConnectionInit,
- }),
- );
-
- await client.waitForClose((event) => {
- expect(event.code).toBe(4400);
- expect(event.reason).toBe(error.message);
- expect(event.wasClean).toBeTruthy();
- });
- });
-
it('should acknowledge connection if not implemented, returning `true` or nothing', async () => {
async function test(url: string) {
const client = await createTClient(url);
@@ -736,159 +563,6 @@ describe('Subscribe', () => {
});
});
- it('should close the socket on request if schema is left undefined', async () => {
- const { url } = await startTServer({
- schema: undefined,
- });
-
- const client = await createTClient(url);
-
- client.ws.send(
- stringifyMessage({
- type: MessageType.ConnectionInit,
- }),
- );
-
- await client.waitForMessage(({ data }) => {
- expect(parseMessage(data).type).toBe(MessageType.ConnectionAck);
- client.ws.send(
- stringifyMessage({
- id: '1',
- type: MessageType.Subscribe,
- payload: {
- operationName: 'TestString',
- query: `query TestString {
- getValue
- }`,
- variables: {},
- },
- }),
- );
- });
-
- await client.waitForClose((event) => {
- expect(event.code).toBe(1011);
- expect(event.reason).toBe('The GraphQL schema is not provided');
- expect(event.wasClean).toBeTruthy();
- });
- });
-
- it('should close the socket with errors thrown from any callback', async () => {
- const error = new Error('Stop');
-
- // onConnect
- let server = await startTServer({
- onConnect: () => {
- throw error;
- },
- });
- const client = await createTClient(server.url);
- client.ws.send(
- stringifyMessage({
- type: MessageType.ConnectionInit,
- }),
- );
- await client.waitForClose((event) => {
- expect(event.code).toBe(4400);
- expect(event.reason).toBe(error.message);
- expect(event.wasClean).toBeTruthy();
- });
- await server.dispose();
-
- async function test(
- url: string,
- payload: SubscribePayload = {
- query: `query { getValue }`,
- },
- ) {
- const client = await createTClient(url);
- client.ws.send(
- stringifyMessage({
- type: MessageType.ConnectionInit,
- }),
- );
-
- await client.waitForMessage(({ data }) => {
- expect(parseMessage(data).type).toBe(MessageType.ConnectionAck);
- client.ws.send(
- stringifyMessage({
- id: '1',
- type: MessageType.Subscribe,
- payload,
- }),
- );
- });
-
- await client.waitForClose((event) => {
- expect(event.code).toBe(4400);
- expect(event.reason).toBe(error.message);
- expect(event.wasClean).toBeTruthy();
- });
- }
-
- // onSubscribe
- server = await startTServer({
- onSubscribe: () => {
- throw error;
- },
- });
- await test(server.url);
- await server.dispose();
-
- server = await startTServer({
- onOperation: () => {
- throw error;
- },
- });
- await test(server.url);
- await server.dispose();
-
- // execute
- server = await startTServer({
- execute: () => {
- throw error;
- },
- });
- await test(server.url);
- await server.dispose();
-
- // subscribe
- server = await startTServer({
- subscribe: () => {
- throw error;
- },
- });
- await test(server.url, { query: 'subscription { greetings }' });
- await server.dispose();
-
- // onNext
- server = await startTServer({
- onNext: () => {
- throw error;
- },
- });
- await test(server.url);
- await server.dispose();
-
- // onError
- server = await startTServer({
- onError: () => {
- throw error;
- },
- });
- await test(server.url, { query: 'query { noExisto }' });
- await server.dispose();
-
- // onComplete
- server = await startTServer({
- onComplete: () => {
- throw error;
- },
- });
- await test(server.url);
- await server.dispose();
- });
-
it('should directly use the execution arguments returned from `onSubscribe`', async () => {
const nopeArgs = {
schema,
@@ -1000,44 +674,6 @@ describe('Subscribe', () => {
}, 30);
});
- it('should close the socket on empty arrays returned from `onSubscribe`', async () => {
- const { url } = await startTServer({
- onSubscribe: () => {
- return [];
- },
- });
-
- const client = await createTClient(url);
-
- client.ws.send(
- stringifyMessage({
- type: MessageType.ConnectionInit,
- }),
- );
-
- await client.waitForMessage(({ data }) => {
- expect(parseMessage(data).type).toBe(MessageType.ConnectionAck);
- });
-
- client.ws.send(
- stringifyMessage({
- id: '1',
- type: MessageType.Subscribe,
- payload: {
- query: 'subscription { ping }',
- },
- }),
- );
-
- await client.waitForClose((event) => {
- expect(event.code).toBe(4400);
- expect(event.reason).toBe(
- 'Invalid return value from onSubscribe hook, expected an array of GraphQLError objects',
- );
- expect(event.wasClean).toBeTruthy();
- });
- });
-
it('should use the execution result returned from `onNext`', async () => {
const { url } = await startTServer({
onNext: (_ctx, _message) => {
@@ -1638,64 +1274,3 @@ describe('Subscribe', () => {
client.ws.terminate();
});
});
-
-describe('Keep-Alive', () => {
- it('should dispatch pings after the timeout has passed', async (done) => {
- const { url } = await startTServer({
- keepAlive: 50,
- });
-
- const client = await createTClient(url);
- client.ws.send(
- stringifyMessage({
- type: MessageType.ConnectionInit,
- }),
- );
-
- client.ws.once('ping', () => done());
- });
-
- it('should not dispatch pings if disabled with nullish timeout', async (done) => {
- const { url } = await startTServer({
- keepAlive: 0,
- });
-
- const client = await createTClient(url);
- client.ws.send(
- stringifyMessage({
- type: MessageType.ConnectionInit,
- }),
- );
-
- client.ws.once('ping', () => fail('Shouldnt have pinged'));
-
- setTimeout(done, 50);
- });
-
- it('should terminate the socket if no pong is sent in response to a ping', async () => {
- const { url } = await startTServer({
- keepAlive: 50,
- });
-
- const client = await createTClient(url);
- client.ws.send(
- stringifyMessage({
- type: MessageType.ConnectionInit,
- }),
- );
-
- // disable pong
- client.ws.pong = () => {
- /**/
- };
-
- // ping is received
- await new Promise((resolve) => client.ws.once('ping', resolve));
-
- // termination is not graceful or clean
- await client.waitForClose((event) => {
- expect(event.code).toBe(1006);
- expect(event.wasClean).toBeFalsy();
- });
- });
-});
diff --git a/src/tests/use/ws.ts b/src/tests/use/ws.ts
new file mode 100644
index 00000000..e7b45a9c
--- /dev/null
+++ b/src/tests/use/ws.ts
@@ -0,0 +1,332 @@
+import WebSocket from 'ws';
+import http from 'http';
+import {
+ MessageType,
+ stringifyMessage,
+ parseMessage,
+ SubscribePayload,
+} from '../../message';
+import { startTServer } from '../fixtures/simple';
+import { createTClient } from '../utils';
+
+it('should gracefully go away when disposing', async () => {
+ const server = await startTServer();
+
+ const client1 = await createTClient(server.url);
+ const client2 = await createTClient(server.url);
+
+ await server.dispose(true);
+
+ await client1.waitForClose((event) => {
+ expect(event.code).toBe(1001);
+ expect(event.reason).toBe('Going away');
+ expect(event.wasClean).toBeTruthy();
+ });
+ await client2.waitForClose((event) => {
+ expect(event.code).toBe(1001);
+ expect(event.reason).toBe('Going away');
+ expect(event.wasClean).toBeTruthy();
+ });
+});
+
+it('should add the initial request and websocket in the context extra', async (done) => {
+ const server = await startTServer({
+ onConnect: (ctx) => {
+ expect(ctx.extra.socket).toBeInstanceOf(WebSocket);
+ expect(ctx.extra.request).toBeInstanceOf(http.IncomingMessage);
+ done();
+ return false; // reject client for sake of test
+ },
+ });
+
+ const client = await createTClient(server.url);
+ client.ws.send(
+ stringifyMessage({
+ type: MessageType.ConnectionInit,
+ }),
+ );
+});
+
+it('should close the socket with errors thrown from any callback', async () => {
+ const error = new Error('Stop');
+
+ // onConnect
+ let server = await startTServer({
+ onConnect: () => {
+ throw error;
+ },
+ });
+ const client = await createTClient(server.url);
+ client.ws.send(
+ stringifyMessage({
+ type: MessageType.ConnectionInit,
+ }),
+ );
+ await client.waitForClose((event) => {
+ expect(event.code).toBe(1011);
+ expect(event.reason).toBe(error.message);
+ expect(event.wasClean).toBeTruthy();
+ });
+ await server.dispose();
+
+ async function test(
+ url: string,
+ payload: SubscribePayload = {
+ query: `query { getValue }`,
+ },
+ ) {
+ const client = await createTClient(url);
+ client.ws.send(
+ stringifyMessage({
+ type: MessageType.ConnectionInit,
+ }),
+ );
+
+ await client.waitForMessage(({ data }) => {
+ expect(parseMessage(data).type).toBe(MessageType.ConnectionAck);
+ client.ws.send(
+ stringifyMessage({
+ id: '1',
+ type: MessageType.Subscribe,
+ payload,
+ }),
+ );
+ });
+
+ await client.waitForClose((event) => {
+ expect(event.code).toBe(1011);
+ expect(event.reason).toBe(error.message);
+ expect(event.wasClean).toBeTruthy();
+ });
+ }
+
+ // onSubscribe
+ server = await startTServer({
+ onSubscribe: () => {
+ throw error;
+ },
+ });
+ await test(server.url);
+ await server.dispose();
+
+ server = await startTServer({
+ onOperation: () => {
+ throw error;
+ },
+ });
+ await test(server.url);
+ await server.dispose();
+
+ // execute
+ server = await startTServer({
+ execute: () => {
+ throw error;
+ },
+ });
+ await test(server.url);
+ await server.dispose();
+
+ // subscribe
+ server = await startTServer({
+ subscribe: () => {
+ throw error;
+ },
+ });
+ await test(server.url, { query: 'subscription { greetings }' });
+ await server.dispose();
+
+ // onNext
+ server = await startTServer({
+ onNext: () => {
+ throw error;
+ },
+ });
+ await test(server.url);
+ await server.dispose();
+
+ // onError
+ server = await startTServer({
+ onError: () => {
+ throw error;
+ },
+ });
+ await test(server.url, { query: 'query { noExisto }' });
+ await server.dispose();
+
+ // onComplete
+ server = await startTServer({
+ onComplete: () => {
+ throw error;
+ },
+ });
+ await test(server.url);
+ await server.dispose();
+});
+
+it('should close the socket on request if schema is left undefined', async () => {
+ const { url } = await startTServer({
+ schema: undefined,
+ });
+
+ const client = await createTClient(url);
+
+ client.ws.send(
+ stringifyMessage({
+ type: MessageType.ConnectionInit,
+ }),
+ );
+
+ await client.waitForMessage(({ data }) => {
+ expect(parseMessage(data).type).toBe(MessageType.ConnectionAck);
+ client.ws.send(
+ stringifyMessage({
+ id: '1',
+ type: MessageType.Subscribe,
+ payload: {
+ operationName: 'TestString',
+ query: `query TestString {
+ getValue
+ }`,
+ variables: {},
+ },
+ }),
+ );
+ });
+
+ await client.waitForClose((event) => {
+ expect(event.code).toBe(1011);
+ expect(event.reason).toBe('The GraphQL schema is not provided');
+ expect(event.wasClean).toBeTruthy();
+ });
+});
+
+it('should close the socket on empty arrays returned from `onSubscribe`', async () => {
+ const { url } = await startTServer({
+ onSubscribe: () => {
+ return [];
+ },
+ });
+
+ const client = await createTClient(url);
+
+ client.ws.send(
+ stringifyMessage({
+ type: MessageType.ConnectionInit,
+ }),
+ );
+
+ await client.waitForMessage(({ data }) => {
+ expect(parseMessage(data).type).toBe(MessageType.ConnectionAck);
+ });
+
+ client.ws.send(
+ stringifyMessage({
+ id: '1',
+ type: MessageType.Subscribe,
+ payload: {
+ query: 'subscription { ping }',
+ },
+ }),
+ );
+
+ await client.waitForClose((event) => {
+ expect(event.code).toBe(1011);
+ expect(event.reason).toBe(
+ 'Invalid return value from onSubscribe hook, expected an array of GraphQLError objects',
+ );
+ expect(event.wasClean).toBeTruthy();
+ });
+});
+
+it('should close socket with error thrown from the callback', async () => {
+ const error = new Error("I'm a teapot");
+
+ const { url } = await startTServer({
+ onConnect: () => {
+ throw error;
+ },
+ });
+
+ const client = await createTClient(url);
+ client.ws.send(
+ stringifyMessage({
+ type: MessageType.ConnectionInit,
+ }),
+ );
+
+ await client.waitForClose((event) => {
+ expect(event.code).toBe(1011);
+ expect(event.reason).toBe(error.message);
+ expect(event.wasClean).toBeTruthy();
+ });
+});
+
+it('should report server errors to clients by closing the connection', async () => {
+ const { url, ws } = await startTServer();
+
+ const client = await createTClient(url);
+
+ const emittedError = new Error("I'm a teapot");
+ ws.emit('error', emittedError);
+
+ await client.waitForClose((event) => {
+ expect(event.code).toBe(1011); // 1011: Internal Error
+ expect(event.reason).toBe(emittedError.message);
+ expect(event.wasClean).toBeTruthy(); // because the server reported the error
+ });
+});
+
+describe('Keep-Alive', () => {
+ it('should dispatch pings after the timeout has passed', async (done) => {
+ const { url } = await startTServer(undefined, 50);
+
+ const client = await createTClient(url);
+ client.ws.send(
+ stringifyMessage({
+ type: MessageType.ConnectionInit,
+ }),
+ );
+
+ client.ws.once('ping', () => done());
+ });
+
+ it('should not dispatch pings if disabled with nullish timeout', async (done) => {
+ const { url } = await startTServer(undefined, 0);
+
+ const client = await createTClient(url);
+ client.ws.send(
+ stringifyMessage({
+ type: MessageType.ConnectionInit,
+ }),
+ );
+
+ client.ws.once('ping', () => fail('Shouldnt have pinged'));
+
+ setTimeout(done, 50);
+ });
+
+ it('should terminate the socket if no pong is sent in response to a ping', async () => {
+ const { url } = await startTServer(undefined, 50);
+
+ const client = await createTClient(url);
+ client.ws.send(
+ stringifyMessage({
+ type: MessageType.ConnectionInit,
+ }),
+ );
+
+ // disable pong
+ client.ws.pong = () => {
+ /**/
+ };
+
+ // ping is received
+ await new Promise((resolve) => client.ws.once('ping', resolve));
+
+ // termination is not graceful or clean
+ await client.waitForClose((event) => {
+ expect(event.code).toBe(1006);
+ expect(event.wasClean).toBeFalsy();
+ });
+ });
+});
diff --git a/src/tests/utils/index.ts b/src/tests/utils/index.ts
new file mode 100644
index 00000000..75f07a56
--- /dev/null
+++ b/src/tests/utils/index.ts
@@ -0,0 +1 @@
+export * from './tclient';
diff --git a/src/tests/utils/tclient.ts b/src/tests/utils/tclient.ts
new file mode 100644
index 00000000..0949e376
--- /dev/null
+++ b/src/tests/utils/tclient.ts
@@ -0,0 +1,75 @@
+import WebSocket from 'ws';
+import { GRAPHQL_TRANSPORT_WS_PROTOCOL } from '../../protocol';
+
+// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
+export function createTClient(
+ url: string,
+ protocols: string | string[] = GRAPHQL_TRANSPORT_WS_PROTOCOL,
+) {
+ let closeEvent: WebSocket.CloseEvent;
+ const queue: WebSocket.MessageEvent[] = [];
+ return new Promise<{
+ ws: WebSocket;
+ waitForMessage: (
+ test?: (data: WebSocket.MessageEvent) => void,
+ expire?: number,
+ ) => Promise;
+ waitForClose: (
+ test?: (event: WebSocket.CloseEvent) => void,
+ expire?: number,
+ ) => Promise;
+ }>((resolve) => {
+ const ws = new WebSocket(url, protocols);
+ ws.onclose = (event) => (closeEvent = event); // just so that none are missed
+ ws.onmessage = (message) => queue.push(message); // guarantee message delivery with a queue
+ ws.once('open', () =>
+ resolve({
+ ws,
+ async waitForMessage(test, expire) {
+ return new Promise((resolve) => {
+ const done = () => {
+ // the onmessage listener above will be called before our listener, populating the queue
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
+ const next = queue.shift()!;
+ test?.(next);
+ resolve();
+ };
+ if (queue.length > 0) {
+ return done();
+ }
+ ws.once('message', done);
+ if (expire) {
+ setTimeout(() => {
+ ws.removeListener('message', done); // expired
+ resolve();
+ }, expire);
+ }
+ });
+ },
+ async waitForClose(
+ test?: (event: WebSocket.CloseEvent) => void,
+ expire?: number,
+ ) {
+ return new Promise((resolve) => {
+ if (closeEvent) {
+ test?.(closeEvent);
+ return resolve();
+ }
+ ws.onclose = (event) => {
+ closeEvent = event;
+ test?.(event);
+ resolve();
+ };
+ if (expire) {
+ setTimeout(() => {
+ // @ts-expect-error: its ok
+ ws.onclose = null; // expired
+ resolve();
+ }, expire);
+ }
+ });
+ },
+ }),
+ );
+ });
+}
diff --git a/src/use/ws.ts b/src/use/ws.ts
new file mode 100644
index 00000000..00312487
--- /dev/null
+++ b/src/use/ws.ts
@@ -0,0 +1,126 @@
+import type * as http from 'http';
+import type * as ws from 'ws';
+import { makeServer, ServerOptions } from '../server';
+import { Disposable } from '../types';
+
+// for nicer documentation
+type WebSocket = typeof ws.prototype;
+type WebSocketServer = ws.Server;
+
+/**
+ * The extra that will be put in the `Context`.
+ */
+export interface Extra {
+ /**
+ * The actual socket connection between the server and the client.
+ */
+ readonly socket: WebSocket;
+ /**
+ * The initial HTTP request before the actual
+ * socket and connection is established.
+ */
+ readonly request: http.IncomingMessage;
+}
+
+/**
+ * Use the server on a [ws](https://github.com/websockets/ws) ws server.
+ * This is a basic starter, feel free to copy the code over and adjust it to your needs
+ */
+export function useServer(
+ options: ServerOptions,
+ ws: WebSocketServer,
+ /**
+ * The timout between dispatched keep-alive messages. Internally uses the [ws Ping and Pongs]((https://developer.mozilla.org/en-US/docs/Web/API/wss_API/Writing_ws_servers#Pings_and_Pongs_The_Heartbeat_of_wss))
+ * to check that the link between the clients and the server is operating and to prevent the link
+ * from being broken due to idling.
+ *
+ * @default 12 * 1000 // 12 seconds
+ */
+ keepAlive = 12 * 1000,
+): Disposable {
+ const isProd = process.env.NODE_ENV === 'production';
+ const server = makeServer(options);
+
+ ws.on('error', (err) => {
+ // catch the first thrown error and re-throw it once all clients have been notified
+ let firstErr: Error | null = null;
+
+ // report server errors by erroring out all clients with the same error
+ for (const client of ws.clients) {
+ try {
+ client.close(1011, isProd ? 'Internal Error' : err.message);
+ } catch (err) {
+ firstErr = firstErr ?? err;
+ }
+ }
+
+ if (firstErr) {
+ throw firstErr;
+ }
+ });
+
+ ws.on('connection', (socket, request) => {
+ // keep alive through ping-pong messages
+ let pongWait: NodeJS.Timeout | null = null;
+ const pingInterval =
+ keepAlive > 0 && isFinite(keepAlive)
+ ? setInterval(() => {
+ // ping pong on open sockets only
+ if (socket.readyState === socket.OPEN) {
+ // terminate the connection after pong wait has passed because the client is idle
+ pongWait = setTimeout(() => {
+ socket.terminate();
+ }, keepAlive);
+
+ // listen for client's pong and stop socket termination
+ socket.once('pong', () => {
+ if (pongWait) {
+ clearTimeout(pongWait);
+ pongWait = null;
+ }
+ });
+
+ socket.ping();
+ }
+ }, keepAlive)
+ : null;
+
+ const closed = server.opened(
+ {
+ protocol: socket.protocol,
+ send: (data) =>
+ new Promise((resolve, reject) => {
+ socket.send(data, (err) => (err ? reject(err) : resolve()));
+ }),
+ close: (code, reason) => socket.close(code, reason),
+ onMessage: (cb) =>
+ socket.on('message', async (event) => {
+ try {
+ await cb(event.toString());
+ } catch (err) {
+ socket.close(1011, isProd ? 'Internal Error' : err.message);
+ }
+ }),
+ },
+ { socket, request },
+ );
+
+ socket.once('close', () => {
+ if (pongWait) clearTimeout(pongWait);
+ if (pingInterval) clearInterval(pingInterval);
+ closed();
+ });
+ });
+
+ return {
+ dispose: async () => {
+ for (const client of ws.clients) {
+ client.close(1001, 'Going away');
+ }
+ ws.removeAllListeners();
+ await new Promise((resolve, reject) => {
+ ws.close((err) => (err ? reject(err) : resolve()));
+ });
+ },
+ };
+}