Skip to content

Commit

Permalink
refactor: Distinct server for each test allowing parallel runs (#42)
Browse files Browse the repository at this point in the history
* refactor: start tserver for each test

* perf: run tests in parallel

* docs: comments
  • Loading branch information
enisdenjo authored Oct 24, 2020
1 parent 5483a5d commit e4fa7fa
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 161 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
"gendocs": "typedoc --options typedoc.js src/",
"lint": "eslint 'src'",
"type-check": "tsc --noEmit",
"test": "jest -i",
"test": "jest",
"build": "tsc -b tsconfig.build.json",
"release": "semantic-release"
},
Expand Down
59 changes: 40 additions & 19 deletions src/tests/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,22 @@

import WebSocket from 'ws';
import { EventEmitter } from 'events';
import { url, startTServer, TServer } from './fixtures/simple';
import { startTServer } from './fixtures/simple';
import { createClient, Client, EventListener } from '../client';
import { SubscribePayload } from '../message';

// simulate browser environment for easier client testing
beforeEach(() => {
Object.assign(global, {
WebSocket: WebSocket,
});
});

// Just does nothing
function noop(): void {
/**/
}

let server: TServer, forgottenDispose: TServer['dispose'] | undefined;
beforeEach(async () => {
Object.assign(global, { WebSocket: WebSocket });
const { dispose, ...rest } = await startTServer();
forgottenDispose = dispose;
server = {
...rest,
dispose: (beNice) =>
dispose(beNice).then(() => (forgottenDispose = undefined)),
};
});
afterEach(async () => {
if (forgottenDispose) {
await forgottenDispose();
forgottenDispose = undefined;
}
});

interface TSubscribe<T> {
waitForNext: (test?: (value: T) => void, expire?: number) => Promise<void>;
waitForError: (
Expand Down Expand Up @@ -134,6 +123,8 @@ function tsubscribe<T = unknown>(
*/

it('should use the provided WebSocket implementation', async () => {
const { url, ...server } = await startTServer();

Object.assign(global, {
WebSocket: null,
});
Expand All @@ -148,6 +139,8 @@ it('should use the provided WebSocket implementation', async () => {
});

it('should not accept invalid WebSocket implementations', async () => {
const { url } = await startTServer();

Object.assign(global, {
WebSocket: null,
});
Expand All @@ -163,6 +156,8 @@ it('should not accept invalid WebSocket implementations', async () => {

describe('query operation', () => {
it('should execute the query, "next" the result and then complete', async () => {
const { url } = await startTServer();

const client = createClient({ url });

const sub = tsubscribe(client, {
Expand All @@ -179,6 +174,8 @@ describe('query operation', () => {

describe('subscription operation', () => {
it('should execute and "next" the emitted results until disposed', async () => {
const { url, ...server } = await startTServer();

const client = createClient({ url });

const sub = tsubscribe(client, {
Expand Down Expand Up @@ -208,6 +205,8 @@ describe('subscription operation', () => {
});

it('should emit results to correct distinct sinks', async () => {
const { url, ...server } = await startTServer();

const client = createClient({ url });

const sub1 = tsubscribe(client, {
Expand Down Expand Up @@ -263,6 +262,8 @@ describe('subscription operation', () => {
});

it('should use the provided `generateID` for subscription IDs', async () => {
const { url, ...server } = await startTServer();

const generateIDFn = jest.fn(() => 'not unique');

const client = createClient({ url, generateID: generateIDFn });
Expand All @@ -276,6 +277,8 @@ describe('subscription operation', () => {
});

it('should dispose of the subscription on complete', async () => {
const { url, ...server } = await startTServer();

const client = createClient({ url });

const sub = tsubscribe(client, {
Expand All @@ -290,6 +293,8 @@ describe('subscription operation', () => {
});

it('should dispose of the subscription on error', async () => {
const { url, ...server } = await startTServer();

const client = createClient({ url });

const sub = tsubscribe(client, {
Expand All @@ -306,6 +311,8 @@ describe('subscription operation', () => {

describe('"concurrency"', () => {
it('should dispatch and receive messages even if one subscriber disposes while another one subscribes', async () => {
const { url, ...server } = await startTServer();

const client = createClient({ url, retryAttempts: 0 });

const sub1 = tsubscribe(client, {
Expand Down Expand Up @@ -336,6 +343,8 @@ describe('"concurrency"', () => {

describe('lazy', () => {
it('should connect immediately when mode is disabled', async () => {
const { url, ...server } = await startTServer();

createClient({
url,
lazy: false,
Expand All @@ -345,6 +354,8 @@ describe('lazy', () => {
});

it('should close socket when disposing while mode is disabled', async () => {
const { url, ...server } = await startTServer();

// wait for connected
const client = await new Promise<Client>((resolve) => {
const client = createClient({
Expand All @@ -363,6 +374,8 @@ describe('lazy', () => {
});

it('should connect on first subscribe when mode is enabled', async () => {
const { url, ...server } = await startTServer();

const client = createClient({
url,
lazy: true, // default
Expand All @@ -388,6 +401,8 @@ describe('lazy', () => {
});

it('should disconnect on last unsubscribe when mode is enabled', async () => {
const { url, ...server } = await startTServer();

const client = createClient({
url,
lazy: true, // default
Expand Down Expand Up @@ -427,6 +442,8 @@ describe('lazy', () => {

describe('reconnecting', () => {
it('should not reconnect if retry attempts is zero', async () => {
const { url, ...server } = await startTServer();

createClient({
url,
lazy: false,
Expand All @@ -444,6 +461,8 @@ describe('reconnecting', () => {
});

it('should reconnect silently after socket closes', async () => {
const { url, ...server } = await startTServer();

createClient({
url,
lazy: false,
Expand Down Expand Up @@ -473,6 +492,8 @@ describe('reconnecting', () => {

describe('events', () => {
it('should emit to relevant listeners with expected arguments', async () => {
const { url, ...server } = await startTServer();

const connectingFn = jest.fn(noop as EventListener<'connecting'>);
const connectedFn = jest.fn(noop as EventListener<'connected'>);
const closedFn = jest.fn(noop as EventListener<'closed'>);
Expand Down
107 changes: 76 additions & 31 deletions src/tests/fixtures/simple.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,33 @@ import net from 'net';
import http from 'http';
import { createServer, ServerOptions, Server } from '../../server';

// distinct server for each test; if you forget to dispose, the fixture wont
const leftovers: Dispose[] = [];
afterEach(async () => {
while (leftovers.length > 0) {
// if not disposed by test, cleanup
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const dispose = leftovers.pop()!;
await dispose();
}
});

export interface TServer {
url: string;
server: Server;
clients: Set<WebSocket>;
pong: (key?: string) => void;
waitForClient: (
test?: (client: WebSocket) => void,
expire?: number,
) => Promise<void>;
waitForOperation: (test?: () => void, expire?: number) => Promise<void>;
waitForClientClose: (test?: () => void, expire?: number) => Promise<void>;
dispose: Dispose;
}

type Dispose = (beNice?: boolean) => Promise<void>;

// use for dispatching a `pong` to the `ping` subscription
const pendingPongs: Record<string, number | undefined> = {};
const pongListeners: Record<string, ((done: boolean) => void) | undefined> = {};
Expand Down Expand Up @@ -87,28 +114,16 @@ export const schema = new GraphQLSchema({
}),
});

export interface TServer {
server: Server;
clients: Set<WebSocket>;
pong: (key?: string) => void;
waitForClient: (
test?: (client: WebSocket) => void,
expire?: number,
) => Promise<void>;
waitForOperation: (test?: () => void, expire?: number) => Promise<void>;
waitForClientClose: (test?: () => void, expire?: number) => Promise<void>;
dispose: (beNice?: boolean) => Promise<void>;
}

export const port = 8273,
path = '/graphql-simple',
url = `ws://localhost:${port}${path}`;
// test server finds an open port starting the search from this one
const startPort = 8765;

export async function startTServer(
options: Partial<ServerOptions> = {},
): Promise<TServer> {
const path = '/simple';
const emitter = new EventEmitter();

// prepare http server
const httpServer = http.createServer((_req, res) => {
res.writeHead(404);
res.end();
Expand All @@ -121,6 +136,7 @@ export async function startTServer(
httpServer.once('close', () => sockets.delete(socket));
});

// create server and hook up for tracking operations
let pendingOperations = 0;
const server = await createServer(
{
Expand All @@ -146,7 +162,28 @@ export async function startTServer(
},
);

await new Promise((resolve) => httpServer.listen(port, resolve));
// search for open port from the starting port
let port = startPort;
for (;;) {
try {
await new Promise((resolve, reject) => {
httpServer.once('error', reject);
httpServer.once('listening', resolve);
httpServer.listen(port);
});
break; // listening
} catch (err) {
if ('code' in err && err.code === 'EADDRINUSE') {
port++;
if (port - startPort > 256) {
throw new Error(`Cant find open port, stopping search on ${port}`);
}
continue; // try another one if this port is in use
} else {
throw err; // throw all other errors immediately
}
}
}

// pending websocket clients
let pendingCloses = 0;
Expand All @@ -159,7 +196,28 @@ export async function startTServer(
});
});

// disposes of all started servers
const dispose: Dispose = (beNice) => {
return new Promise((resolve, reject) => {
if (!beNice) {
for (const socket of sockets) {
socket.destroy();
sockets.delete(socket);
}
}
const disposing = server.dispose() as Promise<void>;
disposing.catch(reject).then(() => {
httpServer.close(() => {
leftovers.splice(leftovers.indexOf(dispose), 1);
resolve();
});
});
});
};
leftovers.push(dispose);

return {
url: `ws://localhost:${port}${path}`,
server,
get clients() {
return server.webSocketServer.clients;
Expand Down Expand Up @@ -224,19 +282,6 @@ export async function startTServer(
}
});
},
dispose(beNice) {
return new Promise((resolve, reject) => {
if (!beNice) {
for (const socket of sockets) {
socket.destroy();
sockets.delete(socket);
}
}
const disposing = server.dispose() as Promise<void>;
disposing.catch(reject).then(() => {
httpServer.close(() => resolve());
});
});
},
dispose,
};
}
Loading

0 comments on commit e4fa7fa

Please sign in to comment.