Skip to content

Commit

Permalink
fix(nice-grpc): resolve compatibility issues with grpc-js 1.10.x
Browse files Browse the repository at this point in the history
Closes #555
Closes #607
  • Loading branch information
davidfiala authored Jun 6, 2024
1 parent b3d35c9 commit bdfc754
Show file tree
Hide file tree
Showing 18 changed files with 2,513 additions and 3,130 deletions.
1 change: 1 addition & 0 deletions .nvmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
18
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
},
"devDependencies": {
"@types/jest": "^29.2.4",
"abort-controller": "^3.0.0",
"conventional-changelog-conventionalcommits": "^6.1.0",
"husky": "^9.0.11",
"jest": "^29.3.1",
Expand Down
2 changes: 1 addition & 1 deletion packages/nice-grpc-prometheus/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
"devDependencies": {
"@tsconfig/node14": "^14.1.0",
"@types/defer-promise": "^1.0.0",
"@types/node": "^14.18.23",
"@types/node": "^18.0.0",
"defer-promise": "^2.0.1",
"grpc-tools": "^1.10.0",
"nice-grpc": "^2.1.8",
Expand Down
2 changes: 1 addition & 1 deletion packages/nice-grpc-server-health/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"license": "MIT",
"devDependencies": {
"@tsconfig/node14": "^14.1.0",
"@types/node": "^14.18.23",
"@types/node": "^18.0.0",
"grpc-tools": "^1.10.0",
"jest-os-detection": "^1.3.1",
"nice-grpc-server-middleware-terminator": "^2.0.10",
Expand Down
3 changes: 1 addition & 2 deletions packages/nice-grpc-server-reflection/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@
"license": "MIT",
"devDependencies": {
"@tsconfig/node14": "^14.1.0",
"@types/node": "^14.18.23",
"@types/node": "^18.0.0",
"cpr": "^3.0.1",
"grpc-tools": "^1.11.0",
"grpc_tools_node_protoc_ts": "^5.1.3"
},
"dependencies": {
"@grpc/grpc-js": "~1.9.14",
"@types/google-protobuf": "^3.7.4",
"google-protobuf": "^3.15.6",
"nice-grpc": "^2.1.8"
Expand Down
4 changes: 2 additions & 2 deletions packages/nice-grpc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@
"@tsconfig/node14": "^14.1.0",
"@types/defer-promise": "^1.0.0",
"@types/google-protobuf": "^3.7.4",
"@types/node": "^14.18.23",
"@types/node": "^18.0.0",
"defer-promise": "^2.0.1",
"google-protobuf": "^3.14.0",
"grpc-tools": "^1.10.0",
"grpc_tools_node_protoc_ts": "^5.0.1",
"ts-proto": "^1.112.0"
},
"dependencies": {
"@grpc/grpc-js": "~1.9.14",
"@grpc/grpc-js": "^1.10.8",
"abort-controller-x": "^0.4.0",
"nice-grpc-common": "^2.0.2"
}
Expand Down
243 changes: 240 additions & 3 deletions packages/nice-grpc/src/__tests__/serverStreaming.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,254 @@
import defer = require('defer-promise');
import {forever, isAbortError} from 'abort-controller-x';
import {
createChannel,
createClient,
createServer,
Metadata,
ServerError,
Status,
createChannel,
createClient,
createServer,
} from '..';
import {TestService} from '../../fixtures/grpc-js/test_grpc_pb';
import {TestRequest, TestResponse} from '../../fixtures/grpc-js/test_pb';
import {throwUnimplemented} from './utils/throwUnimplemented';

function waitForAbort(signal: AbortSignal, timeout: number | undefined = 1000) {
return new Promise<void>((resolve, reject) => {
const savedError = new Error('waitForAbort timeout'); // capture stack trace of where the timeout was created
let timeoutId: ReturnType<typeof setTimeout> | undefined = undefined;
const resolverReference = () => {
clearTimeout(timeoutId);
resolve();
};
signal.addEventListener('abort', resolverReference);
if (timeout) {
timeoutId = setTimeout(() => {
signal.removeEventListener('abort', resolverReference);
reject(savedError);
}, timeout);
}
});
}

/**
* Tests that two streaming RPCs one after another work correctly. Specifically tests
* that the server does not reuse the same AbortSignal for both RPCs.
*/
test('back-to-back', async () => {
const server = createServer();

const serverSignal1 = defer<AbortSignal>();
const serverSignal2 = defer<AbortSignal>();

let firstTimeOnly = true;

server.add(TestService, {
async *testServerStream(request: TestRequest, context) {
const first = firstTimeOnly;
firstTimeOnly = false;

if (first) {
serverSignal1.resolve(context.signal);
} else {
serverSignal2.resolve(context.signal);
}

let count = first ? 100 : 200;
while (true) {
yield new TestResponse().setId(`${request.getId()}-${count++}`);
}
},
testUnary: throwUnimplemented,
testClientStream: throwUnimplemented,
testBidiStream: throwUnimplemented,
});

const port = await server.listen('127.0.0.1:0');
const channel = createChannel(`127.0.0.1:${port}`);
const client = createClient(TestService, channel);

const it1 = client
.testServerStream(new TestRequest().setId('first'))
[Symbol.asyncIterator]();

await expect(it1.next()).resolves.toMatchInlineSnapshot(`
{
"done": false,
"value": nice_grpc.test.TestResponse {
"id": "first-100",
},
}
`);
const serverSig1 = await serverSignal1.promise;
expect(serverSig1.aborted).toBe(false);

await expect(it1.next()).resolves.toMatchInlineSnapshot(`
{
"done": false,
"value": nice_grpc.test.TestResponse {
"id": "first-101",
},
}
`);

expect(serverSig1.aborted).toBe(false);
await it1.return?.();
await waitForAbort(serverSig1);

const it2 = client
.testServerStream(new TestRequest().setId('second'))
[Symbol.asyncIterator]();
await expect(it2.next()).resolves.toMatchInlineSnapshot(`
{
"done": false,
"value": nice_grpc.test.TestResponse {
"id": "second-200",
},
}
`);
const serverSig2 = await serverSignal2.promise;
expect(serverSig2.aborted).toBe(false);

await expect(it2.next()).resolves.toMatchInlineSnapshot(`
{
"done": false,
"value": nice_grpc.test.TestResponse {
"id": "second-201",
},
}
`);

expect(serverSig2.aborted).toBe(false);
await it2.return?.();
await waitForAbort(serverSig2);

channel.close();
await server.shutdown();
});

/**
* Tests that two interleaved streaming RPCs work correctly. Specifically tests
* that the server does not reuse the same AbortSignal for both RPCs and that
* messages from both RPCs are routed correctly.
*/
test('interleaved', async () => {
const server = createServer();

const serverSignal1 = defer<AbortSignal>();
const serverSignal2 = defer<AbortSignal>();

let firstTimeOnly = true;

server.add(TestService, {
async *testServerStream(request: TestRequest, context) {
const first = firstTimeOnly;
firstTimeOnly = false;

if (first) {
serverSignal1.resolve(context.signal);
} else {
serverSignal2.resolve(context.signal);
}

let count = first ? 100 : 200;
while (true) {
yield new TestResponse().setId(`${request.getId()}-${count++}`);
}
},
testUnary: throwUnimplemented,
testClientStream: throwUnimplemented,
testBidiStream: throwUnimplemented,
});

const port = await server.listen('127.0.0.1:0');
const channel = createChannel(`127.0.0.1:${port}`);
const client = createClient(TestService, channel);

const it1 = client
.testServerStream(new TestRequest().setId('first'))
[Symbol.asyncIterator]();

await expect(it1.next()).resolves.toMatchInlineSnapshot(`
{
"done": false,
"value": nice_grpc.test.TestResponse {
"id": "first-100",
},
}
`);
const serverSig1 = await serverSignal1.promise;
expect(serverSig1.aborted).toBe(false);

await expect(it1.next()).resolves.toMatchInlineSnapshot(`
{
"done": false,
"value": nice_grpc.test.TestResponse {
"id": "first-101",
},
}
`);

const it2 = client
.testServerStream(new TestRequest().setId('second'))
[Symbol.asyncIterator]();
await expect(it2.next()).resolves.toMatchInlineSnapshot(`
{
"done": false,
"value": nice_grpc.test.TestResponse {
"id": "second-200",
},
}
`);
await expect(it2.next()).resolves.toMatchInlineSnapshot(`
{
"done": false,
"value": nice_grpc.test.TestResponse {
"id": "second-201",
},
}
`);

const serverSig2 = await serverSignal2.promise;
expect(serverSig1.aborted).toBe(false);
expect(serverSig2.aborted).toBe(false);
await expect(it1.next()).resolves.toMatchInlineSnapshot(`
{
"done": false,
"value": nice_grpc.test.TestResponse {
"id": "first-102",
},
}
`);
await expect(it2.next()).resolves.toMatchInlineSnapshot(`
{
"done": false,
"value": nice_grpc.test.TestResponse {
"id": "second-202",
},
}
`);

expect(serverSig1.aborted).toBe(false);
await it1.return?.();
await waitForAbort(serverSig1);

await expect(it2.next()).resolves.toMatchInlineSnapshot(`
{
"done": false,
"value": nice_grpc.test.TestResponse {
"id": "second-203",
},
}
`);

expect(serverSig2.aborted).toBe(false);
await it2.return?.();
await waitForAbort(serverSig2);

channel.close();
await server.shutdown();
});

test('basic', async () => {
const server = createServer();

Expand Down
4 changes: 1 addition & 3 deletions packages/nice-grpc/src/client/createBidiStreamingMethod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import {
MethodDefinition,
toGrpcJsMethodDefinition,
} from '../service-definitions';
import {CompatAbortSignal} from '../utils/compatAbortSignal';
import {
convertMetadataFromGrpcJs,
convertMetadataToGrpcJs,
Expand Down Expand Up @@ -48,8 +47,7 @@ export function createBidiStreamingMethod<Request, Response>(

const {metadata = Metadata(), onHeader, onTrailer} = options;

const signal = (options.signal ??
new AbortController().signal) as CompatAbortSignal;
const signal = options.signal ?? new AbortController().signal;

const pipeAbortController = new AbortController();

Expand Down
4 changes: 1 addition & 3 deletions packages/nice-grpc/src/client/createServerStreamingMethod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import {
MethodDefinition,
toGrpcJsMethodDefinition,
} from '../service-definitions';
import {CompatAbortSignal} from '../utils/compatAbortSignal';
import {
convertMetadataFromGrpcJs,
convertMetadataToGrpcJs,
Expand Down Expand Up @@ -48,8 +47,7 @@ export function createServerStreamingMethod<Request, Response>(

const {metadata = Metadata(), onHeader, onTrailer} = options;

const signal = (options.signal ??
new AbortController().signal) as CompatAbortSignal;
const signal = options.signal ?? new AbortController().signal;

const call = client.makeServerStreamRequest(
grpcMethodDefinition.path,
Expand Down
Loading

0 comments on commit bdfc754

Please sign in to comment.