Skip to content

Commit

Permalink
Removed detach (#576)
Browse files Browse the repository at this point in the history
* Removed detach

* Included setTimeouts in tests

* Fixed fake transport race

* Rethrowing callback exceptions

* Removed detach suite

* Simplify FakeTransport changes and change autoTrigger behaviour

* Added README explanation of withManualTrigger()
  • Loading branch information
MarcusLongmuir authored and jonny-improbable committed Dec 13, 2019
1 parent 18fb6cf commit cd3cae9
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 261 deletions.
34 changes: 33 additions & 1 deletion client/grpc-web-fake-transport/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@
## Usage

`FakeTransportBuilder` builds a `Transport` that can be configured to send preset headers, messages, and trailers for testing.

By default the `Transport` that `FakeTranportBuilder.build()` generates will trigger all of the specified response behaviours (`.withHeaders(metadata)`, `.withMessages([msgOne, msgTwo])`, `.withTrailers(metadata)`) when the client has finished sending.

This is usually the desired flow as all of the response behaviour is triggered only after the client has finished sending as would most commonly occur in production usage.

However, in the case of bi-directional or other complex usage it can be helpful to use `.withManualTrigger()` to disable automatic sending of messages or headers/trailers and trigger the sending manually using `sendHeaders()`, `sendMessages()` and `sendTrailers()`.

### With Service Stubs generated by ts-protoc-gen
```typescript
import { FakeTransportBuilder } from '@improbable-eng/grpc-web-fake-transport';
Expand Down Expand Up @@ -34,11 +42,35 @@ grpc.invoke(PingService.DoPing, {
});
```

### With `withManualTrigger()`
```typescript
import { grpc } from '@improbable-eng/grpc-web';
import { FakeTransportBuilder } from '@improbable-eng/grpc-web-fake-transport';

const fakeTransport = new FakeTransportBuilder()
.withManualTrigger()
.withHeaders(new grpc.Metadata({ headerKey: "value" }))
.withMessages([ new PingResponse() ])
.withTrailers(new grpc.Metadata({ trailerKey: "value" }))
.build();

grpc.invoke(PingService.DoPing, {
host: "https://example.com",
transport: fakeTransport,
/* ... */
});

// Manually trigger the response behaviours
fakeTransport.sendHeaders();
fakeTransport.sendMessages();
fakeTransport.sendTrailers();
```

Alternatively replace the Default Transport when initialising your tests:
```typescript
import { grpc } from "@improbable-eng/grpc-web";
import { NodeHttpTransport } from "@improbable-eng/grpc-web-fake-transport";

// Do this first, before you make any grpc requests!
grpc.setDefaultTransport(fakeTransport);
```
```
45 changes: 18 additions & 27 deletions client/grpc-web-fake-transport/src/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ describe("FakeTransportBuilder", () => {
const expectedBytes = frameRequest(req);

const transport = new FakeTransportBuilder()
.withMessageListener(messageSpy)
.withMessageListener(messageBytes => {
messageSpy(messageBytes);
})
.build();

doPingStreamRequest(transport, [ req ], () => {
Expand Down Expand Up @@ -227,7 +229,7 @@ describe("FakeTransportBuilder", () => {
});

describe("manual trigger", () => {
it("should allow the consumer to control the lifecycle of the server response", done => {
it("should allow the consumer to control the lifecycle of the server response", () => {
const onHeadersSpy = jest.fn();
const onMessageSpy = jest.fn();
const onEndSpy = jest.fn();
Expand Down Expand Up @@ -258,30 +260,19 @@ describe("FakeTransportBuilder", () => {

transport.sendHeaders();

// Note that we must defer the assertions as the grpc-web package detaches all callbacks, so they
// will not run immediately.
setTimeout(() => {
expect(onHeadersSpy).toHaveBeenCalled();
expect(onMessageSpy).not.toHaveBeenCalled();
expect(onEndSpy).not.toHaveBeenCalled();

transport.sendMessages();
setTimeout(() => {
expect(onHeadersSpy).toHaveBeenCalled();
expect(onMessageSpy).toHaveBeenCalled();
expect(onEndSpy).not.toHaveBeenCalled();

transport.sendTrailers();
setTimeout(() => {
expect(onHeadersSpy).toHaveBeenCalled();
expect(onMessageSpy).toHaveBeenCalled();
expect(onEndSpy).toHaveBeenCalled();

done();
}, 0);
}, 0);

}, 0);
expect(onHeadersSpy).toHaveBeenCalled();
expect(onMessageSpy).not.toHaveBeenCalled();
expect(onEndSpy).not.toHaveBeenCalled();

transport.sendMessages();
expect(onHeadersSpy).toHaveBeenCalled();
expect(onMessageSpy).toHaveBeenCalled();
expect(onEndSpy).not.toHaveBeenCalled();

transport.sendTrailers();
expect(onHeadersSpy).toHaveBeenCalled();
expect(onMessageSpy).toHaveBeenCalled();
expect(onEndSpy).toHaveBeenCalled();
});
});

Expand Down Expand Up @@ -352,4 +343,4 @@ describe("FakeTransportBuilder", () => {
}
client.finishSend();
}
});
});
8 changes: 4 additions & 4 deletions client/grpc-web-fake-transport/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,6 @@ export class FakeTransportBuilder {
if (mock.headersListener) {
mock.headersListener(metadata);
}
if (mock.autoTrigger) {
triggers.sendAll();
}
},
sendMessage: (msgBytes: ArrayBufferView) => {
if (mock.messageListener) {
Expand All @@ -290,6 +287,9 @@ export class FakeTransportBuilder {
if (mock.finishSendListener) {
mock.finishSendListener();
}
if (mock.autoTrigger) {
triggers.sendAll();
}
},
cancel: () => {
if (mock.cancelListener) {
Expand All @@ -308,4 +308,4 @@ export interface TriggerableTransport extends grpc.TransportFactory {
sendMessages(): boolean;
sendTrailers(): boolean;
sendAll(): void;
}
}
39 changes: 27 additions & 12 deletions client/grpc-web/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import {Metadata} from "./metadata";
import {ChunkParser, Chunk, ChunkType} from "./ChunkParser";
import {Code, httpStatusToCode} from "./Code";
import {debug} from "./debug";
import detach from "./detach";
import {Transport, TransportFactory, makeDefaultTransport} from "./transports/Transport";
import {MethodDefinition} from "./service";
import {frameRequest} from "./util";
Expand Down Expand Up @@ -202,20 +201,28 @@ class GrpcClient<TRequest extends ProtobufMessage, TResponse extends ProtobufMes
this.completed = true;

this.onEndCallbacks.forEach(callback => {
detach(() => {
if (this.closed) return;
if (this.closed) return;
try {
callback(code, message, trailers);
});
} catch (e) {
setTimeout(() => {
throw e;
})
}
});
}

rawOnHeaders(headers: Metadata) {
this.props.debug && debug("rawOnHeaders", headers);
if (this.completed) return;
this.onHeadersCallbacks.forEach(callback => {
detach(() => {
try {
callback(headers);
});
} catch (e) {
setTimeout(() => {
throw e;
})
}
});
}

Expand All @@ -224,21 +231,29 @@ class GrpcClient<TRequest extends ProtobufMessage, TResponse extends ProtobufMes
if (this.completed) return;
this.completed = true;
this.onEndCallbacks.forEach(callback => {
detach(() => {
if (this.closed) return;
if (this.closed) return;
try {
callback(code, msg, trailers);
});
} catch (e) {
setTimeout(() => {
throw e;
})
}
});
}

rawOnMessage(res: TResponse) {
this.props.debug && debug("rawOnMessage", res.toObject());
if (this.completed || this.closed) return;
this.onMessageCallbacks.forEach(callback => {
detach(() => {
if (this.closed) return;
if (this.closed) return;
try {
callback(res);
});
} catch (e) {
setTimeout(() => {
throw e;
})
}
});
}

Expand Down
42 changes: 1 addition & 41 deletions client/grpc-web/src/detach.ts
Original file line number Diff line number Diff line change
@@ -1,43 +1,3 @@
// awaitingExecution is null if there is no current timer
let awaitingExecution: Array<() => void> | null = null;

function runCallbacks() {
if (awaitingExecution) {
// Use a new reference to the awaitingExecution array to allow callbacks to add to the "new" awaitingExecution array
const thisCallbackSet = awaitingExecution;
awaitingExecution = null;
for (let i = 0; i < thisCallbackSet.length; i++) {
try {
thisCallbackSet[i]();
} catch (e) {
if (awaitingExecution === null) {
awaitingExecution = [];
setTimeout(() => {
runCallbacks();
}, 0);
}
// Add the remaining callbacks to the array so that they can be invoked on the next pass
for (let k = thisCallbackSet.length - 1; k > i; k--) {
awaitingExecution.unshift(thisCallbackSet[k]);
}
// rethrow the error
throw e;
}
}
}
}

// detach executes the callbacks in the order they are added with no context - this is used to avoid errors thrown
// in user callbacks being caught by handlers such as fetch's catch. This function is necessary as setTimeout in
// Safari is prone to switching the order of execution of setTimeout(0).
export default function detach(cb: () => void) {
if (awaitingExecution !== null) {
// there is a timer running, add to the list and this function will be executed with that existing timer
awaitingExecution.push(cb);
return;
}
awaitingExecution = [cb];
setTimeout(() => {
runCallbacks();
}, 0);
cb();
}
21 changes: 5 additions & 16 deletions client/grpc-web/src/transports/http/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import {Metadata} from "../../metadata";
import {Transport, TransportFactory, TransportOptions} from "../Transport";
import {debug} from "../../debug";
import detach from "../../detach";

type Omit<T, K extends keyof T> = Pick<T, Exclude<keyof T, K>>
export type FetchTransportInit = Omit<RequestInit, "headers" | "method" | "body" | "signal">;
Expand Down Expand Up @@ -44,14 +43,10 @@ class Fetch implements Transport {
this.reader.read()
.then((result: { done: boolean, value: Uint8Array }) => {
if (result.done) {
detach(() => {
this.options.onEnd();
});
this.options.onEnd();
return res;
}
detach(() => {
this.options.onChunk(result.value);
});
this.options.onChunk(result.value);
this.pump(this.reader, res);
return;
})
Expand All @@ -62,9 +57,7 @@ class Fetch implements Transport {
}
this.cancelled = true;
this.options.debug && debug("Fetch.catch", err.message);
detach(() => {
this.options.onEnd(err);
});
this.options.onEnd(err);
});
}

Expand All @@ -77,9 +70,7 @@ class Fetch implements Transport {
signal: this.controller && this.controller.signal
}).then((res: Response) => {
this.options.debug && debug("Fetch.response", res);
detach(() => {
this.options.onHeaders(new Metadata(res.headers as any), res.status);
});
this.options.onHeaders(new Metadata(res.headers as any), res.status);
if (res.body) {
this.pump(res.body.getReader(), res)
return;
Expand All @@ -92,9 +83,7 @@ class Fetch implements Transport {
}
this.cancelled = true;
this.options.debug && debug("Fetch.catch", err.message);
detach(() => {
this.options.onEnd(err);
});
this.options.onEnd(err);
});
}

Expand Down
21 changes: 5 additions & 16 deletions client/grpc-web/src/transports/http/xhr.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import {Metadata} from "../../metadata";
import {Transport, TransportFactory, TransportOptions} from "../Transport";
import {debug} from "../../debug";
import detach from "../../detach";
import {detectMozXHRSupport, detectXHROverrideMimeTypeSupport} from "./xhrUtil";

export interface XhrTransportInit {
Expand Down Expand Up @@ -37,24 +36,18 @@ export class XHR implements Transport {
const rawText = this.xhr.response.substr(this.index);
this.index = this.xhr.response.length;
const asArrayBuffer = stringToArrayBuffer(rawText);
detach(() => {
this.options.onChunk(asArrayBuffer);
});
this.options.onChunk(asArrayBuffer);
}

onLoadEvent() {
this.options.debug && debug("XHR.onLoadEvent");
detach(() => {
this.options.onEnd();
});
this.options.onEnd();
}

onStateChange() {
this.options.debug && debug("XHR.onStateChange", this.xhr.readyState);
if (this.xhr.readyState === XMLHttpRequest.HEADERS_RECEIVED) {
detach(() => {
this.options.onHeaders(new Metadata(this.xhr.getAllResponseHeaders()), this.xhr.status);
});
this.options.onHeaders(new Metadata(this.xhr.getAllResponseHeaders()), this.xhr.status);
}
}

Expand Down Expand Up @@ -85,9 +78,7 @@ export class XHR implements Transport {
xhr.addEventListener("loadend", this.onLoadEvent.bind(this));
xhr.addEventListener("error", (err: ErrorEvent) => {
this.options.debug && debug("XHR.error", err);
detach(() => {
this.options.onEnd(err.error);
});
this.options.onEnd(err.error);
});
}

Expand All @@ -114,9 +105,7 @@ export class MozChunkedArrayBufferXHR extends XHR {
onProgressEvent() {
const resp = this.xhr.response;
this.options.debug && debug("MozXHR.onProgressEvent: ", new Uint8Array(resp));
detach(() => {
this.options.onChunk(new Uint8Array(resp));
});
this.options.onChunk(new Uint8Array(resp));
}
}

Expand Down
Loading

0 comments on commit cd3cae9

Please sign in to comment.