Skip to content
This repository has been archived by the owner on Apr 14, 2023. It is now read-only.

Allow connectionParams as a Promise #443

Merged
merged 3 commits into from
Jul 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ ReactDOM.render(
- `options?: Object` : optional, object to modify default client behavior
* `timeout?: number` : how long the client should wait in ms for a keep-alive message from the server (default 30000 ms), this parameter is ignored if the server does not send keep-alive messages. This will also be used to calculate the max connection time per connect/reconnect
* `lazy?: boolean` : use to set lazy mode - connects only when first subscription created, and delay the socket initialization
* `connectionParams?: Object | Function` : object that will be available as first argument of `onConnect` (in server side), if passed a function - it will call it and send the return value
* `connectionParams?: Object | Function | Promise<Object>` : object that will be available as first argument of `onConnect` (in server side), if passed a function - it will call it and send the return value, if function returns as promise - it will wait until it resolves and send the resolved value.
* `reconnect?: boolean` : automatic reconnect in case of connection error
* `reconnectionAttempts?: number` : how much reconnect attempts
* `connectionCallback?: (error) => {}` : optional, callback that called after the first init message, with the error (if there is one)
Expand Down Expand Up @@ -312,7 +312,7 @@ ReactDOM.render(
* `onDisconnect?: (webSocket: WebSocket, context: ConnectionContext)` : optional method that called when a client disconnects
* `keepAlive?: number` : optional interval in ms to send `KEEPALIVE` messages to all clients

- `socketOptions: {WebSocket.IServerOptions}` : options to pass to the WebSocket object (full docs [here](https://github.com/websockets/ws/blob/master/doc/ws.md))
- `socketOptions: {WebSocket.IServerOptions}` : options to pass to the WebSocket object (full docs [here](https://github.com/websockets/ws/blob/master/doc/ws.md))
* `server?: HttpServer` - existing HTTP server to use (use without `host`/`port`)
* `host?: string` - server host
* `port?: number` - server port
Expand All @@ -322,7 +322,7 @@ ReactDOM.render(

## How it works?

* For GraphQL WebSocket protocol docs, [click here](https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md)
* For GraphQL WebSocket protocol docs, [click here](https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md)
* This package also uses `AsyncIterator` internally using [iterall](https://github.com/leebyron/iterall), for more information [click here](https://github.com/ReactiveX/IxJS), or [the proposal](https://github.com/tc39/proposal-async-iteration)

The current version of this transport, also support a previous version of the protocol.
Expand Down
39 changes: 29 additions & 10 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export type ConnectionParams = {
[paramName: string]: any,
};

export type ConnectionParamsOptions = ConnectionParams | Function;
export type ConnectionParamsOptions = ConnectionParams | Function | Promise<ConnectionParams>;

export interface ClientOptions {
connectionParams?: ConnectionParamsOptions;
Expand All @@ -73,7 +73,7 @@ export class SubscriptionClient {
public operations: Operations;
private url: string;
private nextOperationId: number;
private connectionParams: ConnectionParamsOptions;
private connectionParams: Function;
private wsTimeout: number;
private unsentMessagesQueue: Array<any>; // queued messages while websocket is opening.
private reconnect: boolean;
Expand Down Expand Up @@ -111,7 +111,6 @@ export class SubscriptionClient {
throw new Error('Unable to find native implementation, or alternative implementation for WebSocket!');
}

this.connectionParams = connectionParams;
this.connectionCallback = connectionCallback;
this.url = url;
this.operations = {};
Expand All @@ -129,6 +128,7 @@ export class SubscriptionClient {
this.middlewares = [];
this.client = null;
this.maxConnectTimeGenerator = this.createMaxConnectTimeGenerator();
this.connectionParams = this.getConnectionParams(connectionParams);

if (!this.lazy) {
this.connect();
Expand Down Expand Up @@ -288,6 +288,20 @@ export class SubscriptionClient {
return this;
}

private getConnectionParams(connectionParams: ConnectionParamsOptions): Function {
return (): Promise<ConnectionParams> => new Promise((resolve, reject) => {
if (typeof connectionParams === 'function') {
try {
return resolve(connectionParams.call(null));
} catch (error) {
return reject(error);
}
}

resolve(connectionParams);
});
}

private executeOperation(options: OperationOptions, handler: (error: Error[], result?: any) => void): string {
if (this.client === null) {
this.connect();
Expand Down Expand Up @@ -526,22 +540,27 @@ export class SubscriptionClient {

this.checkMaxConnectTimeout();

this.client.onopen = () => {
this.client.onopen = async () => {
if (this.status === this.wsImpl.OPEN) {
this.clearMaxConnectTimeout();
this.closedByUser = false;
this.eventEmitter.emit(this.reconnecting ? 'reconnecting' : 'connecting');

const payload: ConnectionParams = typeof this.connectionParams === 'function' ? this.connectionParams() : this.connectionParams;

// Send CONNECTION_INIT message, no need to wait for connection to success (reduce roundtrips)
this.sendMessage(undefined, MessageTypes.GQL_CONNECTION_INIT, payload);
this.flushUnsentMessagesQueue();
try {
const connectionParams: ConnectionParams = await this.connectionParams();

// Send CONNECTION_INIT message, no need to wait for connection to success (reduce roundtrips)
this.sendMessage(undefined, MessageTypes.GQL_CONNECTION_INIT, connectionParams);
this.flushUnsentMessagesQueue();
} catch (error) {
this.sendMessage(undefined, MessageTypes.GQL_CONNECTION_ERROR, error);
this.flushUnsentMessagesQueue();
}
}
};

this.client.onclose = () => {
if ( !this.closedByUser ) {
if (!this.closedByUser) {
this.close(false, false);
}
};
Expand Down
62 changes: 62 additions & 0 deletions src/test/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,68 @@ describe('Client', function () {
});
});

it('should send connectionParams which resolves from a promise along with init message', (done) => {
const connectionParams: any = {
test: true,
};
wsServer.on('connection', (connection: any) => {
connection.on('message', (message: any) => {
const parsedMessage = JSON.parse(message);
expect(JSON.stringify(parsedMessage.payload)).to.equal(JSON.stringify(connectionParams));
done();
});
});

new SubscriptionClient(`ws://localhost:${RAW_TEST_PORT}/`, {
connectionParams: new Promise((resolve) => {
setTimeout(() => {
resolve(connectionParams);
}, 100);
}),
});
});

it('should send connectionParams as a function which returns a promise along with init message', (done) => {
const connectionParams: any = {
test: true,
};
wsServer.on('connection', (connection: any) => {
connection.on('message', (message: any) => {
const parsedMessage = JSON.parse(message);
expect(JSON.stringify(parsedMessage.payload)).to.equal(JSON.stringify(connectionParams));
done();
});
});

new SubscriptionClient(`ws://localhost:${RAW_TEST_PORT}/`, {
connectionParams: new Promise((resolve) => {
setTimeout(() => {
resolve(connectionParams);
}, 100);
}),
});
});

it('should catch errors in connectionParams which came from a promise', (done) => {
const error = 'foo';

wsServer.on('connection', (connection: any) => {
connection.on('message', (message: any) => {
const parsedMessage = JSON.parse(message);
expect(parsedMessage.payload).to.equal(error);
done();
});
});

new SubscriptionClient(`ws://localhost:${RAW_TEST_PORT}/`, {
connectionParams: new Promise((_, reject) => {
setTimeout(() => {
reject(error);
}, 100);
}),
});
});

it('should override OperationOptions with middleware', function (done) {
const client3 = new SubscriptionClient(`ws://localhost:${TEST_PORT}/`);
let asyncFunc = (next: any) => {
Expand Down