Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed detach #576

Merged
merged 7 commits into from
Dec 13, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions client/grpc-web-fake-transport/src/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ describe("FakeTransportBuilder", () => {
it("should allow the response messages to be stubbed", done => {
const pingResponseMsg = makePingResponse("hello");
const transport = new FakeTransportBuilder()
.withManualTrigger()
.withMessages([ pingResponseMsg ])
.build();

Expand All @@ -20,6 +21,7 @@ describe("FakeTransportBuilder", () => {
it("should allow the response trailers to be stubbed", done => {
const expectedTrailers = new grpc.Metadata({ foo: "bar" });
const transport = new FakeTransportBuilder()
.withManualTrigger()
Copy link
Contributor

@jonny-improbable jonny-improbable Nov 6, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you've made it be a manual trigger, but I don't see you triggering it anywhere in the test? I don't follow, you please explain this change?

.withTrailers(expectedTrailers)
.build();

Expand All @@ -31,6 +33,7 @@ describe("FakeTransportBuilder", () => {

it("should allow an error to be injected before any headers are sent", done => {
const transport = new FakeTransportBuilder()
.withManualTrigger()
.withPreHeadersError(500)
.withMessages([ makePingResponse("hello") ])
.build();
Expand All @@ -50,6 +53,7 @@ describe("FakeTransportBuilder", () => {
const pingResponseMsg = makePingResponse("hello");
const trailers = new grpc.Metadata({ foo: "bar" });
const transport = new FakeTransportBuilder()
.withManualTrigger()
.withPreMessagesError(grpc.Code.FailedPrecondition, "failed precondition :)")
.withMessages([ pingResponseMsg ])
.withTrailers(trailers)
Expand All @@ -75,6 +79,7 @@ describe("FakeTransportBuilder", () => {
const pingResponseMsg = makePingResponse("hello");
const trailers = new grpc.Metadata({ foo: "bar" });
const transport = new FakeTransportBuilder()
.withManualTrigger()
.withPreTrailersError(grpc.Code.FailedPrecondition, "failed precondition :)")
.withMessages([ pingResponseMsg ])
.withTrailers(trailers)
Expand All @@ -100,6 +105,7 @@ describe("FakeTransportBuilder", () => {
it("should not be called if no message is sent", done => {
const messageSpy = jest.fn();
const transport = new FakeTransportBuilder()
.withManualTrigger()
.withMessageListener(messageSpy)
.build();

Expand All @@ -115,6 +121,7 @@ describe("FakeTransportBuilder", () => {
const expectedBytes = frameRequest(req);

const transport = new FakeTransportBuilder()
.withManualTrigger()
.withMessageListener(messageSpy)
.build();

Expand All @@ -132,6 +139,7 @@ describe("FakeTransportBuilder", () => {
const expectedBytes = [ frameRequest(reqA), frameRequest(reqB) ];

const transport = new FakeTransportBuilder()
.withManualTrigger()
.withMessageListener(messageSpy)
.build();

Expand All @@ -151,6 +159,7 @@ describe("FakeTransportBuilder", () => {
const expectedHeaders = new grpc.Metadata({ expected: "header" });

const transport = new FakeTransportBuilder()
.withManualTrigger()
.withHeadersListener(headersSpy)
.build();

Expand All @@ -163,6 +172,7 @@ describe("FakeTransportBuilder", () => {

client.start(expectedHeaders);
client.finishSend();
transport.sendAll();
})
});

Expand All @@ -171,6 +181,7 @@ describe("FakeTransportBuilder", () => {
const requestSpy = jest.fn();

const transport = new FakeTransportBuilder()
.withManualTrigger()
.withRequestListener(requestSpy)
.build();

Expand All @@ -189,6 +200,7 @@ describe("FakeTransportBuilder", () => {
const cancelSpy = jest.fn();

const transport = new FakeTransportBuilder()
.withManualTrigger()
.withCancelListener(cancelSpy)
.build();

Expand All @@ -208,6 +220,7 @@ describe("FakeTransportBuilder", () => {
const finishSendSpy = jest.fn();

const transport = new FakeTransportBuilder()
.withManualTrigger()
.withFinishSendListener(finishSendSpy)
.build();

Expand All @@ -233,6 +246,7 @@ describe("FakeTransportBuilder", () => {
const onEndSpy = jest.fn();

const transport = new FakeTransportBuilder()
.withManualTrigger()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicated line.

.withManualTrigger()
.withHeaders(new grpc.Metadata({ header: "value" }))
.withMessages([ makePingResponse("msgA") ])
Expand All @@ -258,8 +272,6 @@ describe("FakeTransportBuilder", () => {

transport.sendHeaders();

// Note that we must defer the assertions as the grpc-web package detaches all callbacks, so they
// will not run immediately.
setTimeout(() => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You've removed the comment, but not the timeout :)

expect(onHeadersSpy).toHaveBeenCalled();
expect(onMessageSpy).not.toHaveBeenCalled();
Expand All @@ -280,7 +292,6 @@ describe("FakeTransportBuilder", () => {
done();
}, 0);
}, 0);

}, 0);
});
});
Expand Down Expand Up @@ -342,6 +353,7 @@ describe("FakeTransportBuilder", () => {
client.start();
client.send(req);
client.finishSend();
transport.sendAll();
}

function doPingStreamRequest(transport: TriggerableTransport, requests: PingRequest[], callback: (data: RequestResponse<PingResponse>) => void) {
Expand All @@ -351,5 +363,6 @@ describe("FakeTransportBuilder", () => {
client.send(req);
}
client.finishSend();
transport.sendAll();
}
});
});
39 changes: 27 additions & 12 deletions client/grpc-web/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import {Metadata} from "./metadata";
import {ChunkParser, Chunk, ChunkType} from "./ChunkParser";
import {Code, httpStatusToCode} from "./Code";
import {debug} from "./debug";
import detach from "./detach";
import {Transport, TransportFactory, makeDefaultTransport} from "./transports/Transport";
import {MethodDefinition} from "./service";
import {frameRequest} from "./util";
Expand Down Expand Up @@ -202,20 +201,28 @@ class GrpcClient<TRequest extends ProtobufMessage, TResponse extends ProtobufMes
this.completed = true;

this.onEndCallbacks.forEach(callback => {
detach(() => {
if (this.closed) return;
if (this.closed) return;
try {
callback(code, message, trailers);
});
} catch (e) {
setTimeout(() => {
throw e;
})
}
});
}

rawOnHeaders(headers: Metadata) {
this.props.debug && debug("rawOnHeaders", headers);
if (this.completed) return;
this.onHeadersCallbacks.forEach(callback => {
detach(() => {
try {
callback(headers);
});
} catch (e) {
setTimeout(() => {
throw e;
})
}
});
}

Expand All @@ -224,21 +231,29 @@ class GrpcClient<TRequest extends ProtobufMessage, TResponse extends ProtobufMes
if (this.completed) return;
this.completed = true;
this.onEndCallbacks.forEach(callback => {
detach(() => {
if (this.closed) return;
if (this.closed) return;
try {
callback(code, msg, trailers);
});
} catch (e) {
setTimeout(() => {
throw e;
})
}
});
}

rawOnMessage(res: TResponse) {
this.props.debug && debug("rawOnMessage", res.toObject());
if (this.completed || this.closed) return;
this.onMessageCallbacks.forEach(callback => {
detach(() => {
if (this.closed) return;
if (this.closed) return;
try {
callback(res);
});
} catch (e) {
setTimeout(() => {
throw e;
})
}
});
}

Expand Down
42 changes: 1 addition & 41 deletions client/grpc-web/src/detach.ts
Original file line number Diff line number Diff line change
@@ -1,43 +1,3 @@
// awaitingExecution is null if there is no current timer
let awaitingExecution: Array<() => void> | null = null;

function runCallbacks() {
if (awaitingExecution) {
// Use a new reference to the awaitingExecution array to allow callbacks to add to the "new" awaitingExecution array
const thisCallbackSet = awaitingExecution;
awaitingExecution = null;
for (let i = 0; i < thisCallbackSet.length; i++) {
try {
thisCallbackSet[i]();
} catch (e) {
if (awaitingExecution === null) {
awaitingExecution = [];
setTimeout(() => {
runCallbacks();
}, 0);
}
// Add the remaining callbacks to the array so that they can be invoked on the next pass
for (let k = thisCallbackSet.length - 1; k > i; k--) {
awaitingExecution.unshift(thisCallbackSet[k]);
}
// rethrow the error
throw e;
}
}
}
}

// detach executes the callbacks in the order they are added with no context - this is used to avoid errors thrown
// in user callbacks being caught by handlers such as fetch's catch. This function is necessary as setTimeout in
// Safari is prone to switching the order of execution of setTimeout(0).
export default function detach(cb: () => void) {
if (awaitingExecution !== null) {
// there is a timer running, add to the list and this function will be executed with that existing timer
awaitingExecution.push(cb);
return;
}
awaitingExecution = [cb];
setTimeout(() => {
runCallbacks();
}, 0);
cb();
}
21 changes: 5 additions & 16 deletions client/grpc-web/src/transports/http/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import {Metadata} from "../../metadata";
import {Transport, TransportFactory, TransportOptions} from "../Transport";
import {debug} from "../../debug";
import detach from "../../detach";

type Omit<T, K extends keyof T> = Pick<T, Exclude<keyof T, K>>
export type FetchTransportInit = Omit<RequestInit, "headers" | "method" | "body" | "signal">;
Expand Down Expand Up @@ -44,14 +43,10 @@ class Fetch implements Transport {
this.reader.read()
.then((result: { done: boolean, value: Uint8Array }) => {
if (result.done) {
detach(() => {
this.options.onEnd();
});
this.options.onEnd();
return res;
}
detach(() => {
this.options.onChunk(result.value);
});
this.options.onChunk(result.value);
this.pump(this.reader, res);
return;
})
Expand All @@ -62,9 +57,7 @@ class Fetch implements Transport {
}
this.cancelled = true;
this.options.debug && debug("Fetch.catch", err.message);
detach(() => {
this.options.onEnd(err);
});
this.options.onEnd(err);
});
}

Expand All @@ -77,9 +70,7 @@ class Fetch implements Transport {
signal: this.controller && this.controller.signal
}).then((res: Response) => {
this.options.debug && debug("Fetch.response", res);
detach(() => {
this.options.onHeaders(new Metadata(res.headers as any), res.status);
});
this.options.onHeaders(new Metadata(res.headers as any), res.status);
if (res.body) {
this.pump(res.body.getReader(), res)
return;
Expand All @@ -92,9 +83,7 @@ class Fetch implements Transport {
}
this.cancelled = true;
this.options.debug && debug("Fetch.catch", err.message);
detach(() => {
this.options.onEnd(err);
});
this.options.onEnd(err);
});
}

Expand Down
21 changes: 5 additions & 16 deletions client/grpc-web/src/transports/http/xhr.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import {Metadata} from "../../metadata";
import {Transport, TransportFactory, TransportOptions} from "../Transport";
import {debug} from "../../debug";
import detach from "../../detach";
import {detectMozXHRSupport, detectXHROverrideMimeTypeSupport} from "./xhrUtil";

export interface XhrTransportInit {
Expand Down Expand Up @@ -37,24 +36,18 @@ export class XHR implements Transport {
const rawText = this.xhr.response.substr(this.index);
this.index = this.xhr.response.length;
const asArrayBuffer = stringToArrayBuffer(rawText);
detach(() => {
this.options.onChunk(asArrayBuffer);
});
this.options.onChunk(asArrayBuffer);
}

onLoadEvent() {
this.options.debug && debug("XHR.onLoadEvent");
detach(() => {
this.options.onEnd();
});
this.options.onEnd();
}

onStateChange() {
this.options.debug && debug("XHR.onStateChange", this.xhr.readyState);
if (this.xhr.readyState === XMLHttpRequest.HEADERS_RECEIVED) {
detach(() => {
this.options.onHeaders(new Metadata(this.xhr.getAllResponseHeaders()), this.xhr.status);
});
this.options.onHeaders(new Metadata(this.xhr.getAllResponseHeaders()), this.xhr.status);
}
}

Expand Down Expand Up @@ -85,9 +78,7 @@ export class XHR implements Transport {
xhr.addEventListener("loadend", this.onLoadEvent.bind(this));
xhr.addEventListener("error", (err: ErrorEvent) => {
this.options.debug && debug("XHR.error", err);
detach(() => {
this.options.onEnd(err.error);
});
this.options.onEnd(err.error);
});
}

Expand All @@ -114,9 +105,7 @@ export class MozChunkedArrayBufferXHR extends XHR {
onProgressEvent() {
const resp = this.xhr.response;
this.options.debug && debug("MozXHR.onProgressEvent: ", new Uint8Array(resp));
detach(() => {
this.options.onChunk(new Uint8Array(resp));
});
this.options.onChunk(new Uint8Array(resp));
}
}

Expand Down
Loading