Skip to content

Commit

Permalink
Merge pull request #223 from ggoodman/fix-pipe-after-socket-connect
Browse files Browse the repository at this point in the history
 Defer piping of options.payload until socket connection
  • Loading branch information
geek authored Jul 24, 2018
2 parents 576ce9d + 0122d11 commit 1315545
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 18 deletions.
17 changes: 16 additions & 1 deletion lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ internals.Client.prototype._request = function (method, url, options, relay, _tr
stream = options.payload.pipe(collector);
}

stream.pipe(req);
internals.deferPipeUntilSocketConnects(req, stream);
return req;
}

Expand All @@ -328,6 +328,21 @@ internals.Client.prototype._request = function (method, url, options, relay, _tr
};


internals.deferPipeUntilSocketConnects = function (req, stream) {

const onSocket = (socket) => {

socket.on('connect', onSocketConnect);
};
const onSocketConnect = () => {

stream.pipe(req);
};

req.on('socket', onSocket);
};


internals.redirectMethod = function (code, method, options) {

switch (code) {
Expand Down
89 changes: 72 additions & 17 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -958,57 +958,57 @@ describe('options.baseUrl', () => {

it('uses baseUrl option without trailing slash and uri is prefixed with a slash', async () => {

const promise = Wreck.request('get', '/foo', { baseUrl: 'http://localhost' });
const promise = Wreck.request('get', '/foo', { baseUrl: 'http://localhost:0' });
await expect(promise).to.reject();
expect(promise.req._headers.host).to.equal('localhost');
expect(promise.req._headers.host).to.equal('localhost:0');
expect(promise.req.path).to.equal('/foo');
});

it('uses baseUrl option with trailing slash and uri is prefixed without a slash', async () => {

const promise = Wreck.request('get', 'foo', { baseUrl: 'http://localhost/' });
const promise = Wreck.request('get', 'foo', { baseUrl: 'http://localhost:0/' });
await expect(promise).to.reject();
expect(promise.req._headers.host).to.equal('localhost');
expect(promise.req._headers.host).to.equal('localhost:0');
expect(promise.req.path).to.equal('/foo');
});

it('uses baseUrl option without trailing slash and uri is prefixed without a slash', async () => {

const promise = Wreck.request('get', 'foo', { baseUrl: 'http://localhost' });
const promise = Wreck.request('get', 'foo', { baseUrl: 'http://localhost:0' });
await expect(promise).to.reject();
expect(promise.req._headers.host).to.equal('localhost');
expect(promise.req._headers.host).to.equal('localhost:0');
expect(promise.req.path).to.equal('/foo');
});

it('uses baseUrl option when uri is an empty string', async () => {

const promise = Wreck.request('get', '', { baseUrl: 'http://localhost' });
const promise = Wreck.request('get', '', { baseUrl: 'http://localhost:0' });
await expect(promise).to.reject();
expect(promise.req._headers.host).to.equal('localhost');
expect(promise.req._headers.host).to.equal('localhost:0');
expect(promise.req.path).to.equal('/');
});

it('uses baseUrl option with a path', async () => {

const promise = Wreck.request('get', '/bar', { baseUrl: 'http://localhost/foo' });
const promise = Wreck.request('get', '/bar', { baseUrl: 'http://localhost:0/foo' });
await expect(promise).to.reject();
expect(promise.req._headers.host).to.equal('localhost');
expect(promise.req._headers.host).to.equal('localhost:0');
expect(promise.req.path).to.equal('/foo/bar');
});

it('uses baseUrl option with a path and removes extra slashes', async () => {

const promise = Wreck.request('get', '/bar', { baseUrl: 'http://localhost/foo/' });
const promise = Wreck.request('get', '/bar', { baseUrl: 'http://localhost:0/foo/' });
await expect(promise).to.reject();
expect(promise.req._headers.host).to.equal('localhost');
expect(promise.req._headers.host).to.equal('localhost:0');
expect(promise.req.path).to.equal('/foo/bar');
});

it('uses baseUrl option with a url that has a querystring', async () => {

const promise = Wreck.request('get', '/bar?test=hello', { baseUrl: 'http://localhost/foo' });
const promise = Wreck.request('get', '/bar?test=hello', { baseUrl: 'http://localhost:0/foo' });
await expect(promise).to.reject();
expect(promise.req._headers.host).to.equal('localhost');
expect(promise.req._headers.host).to.equal('localhost:0');
expect(promise.req.path).to.equal('/foo/bar?test=hello');
});
});
Expand Down Expand Up @@ -1121,6 +1121,47 @@ describe('read()', () => {
expect(err.isBoom).to.equal(true);
});

it('will not pipe the stream if no socket can be established', async () => {

const agent = new internals.SlowAgent();
const stream = new Stream.Readable({
read() {

piped = true;
this.push(null);
}
});
const onPiped = () => {

piped = true;
};
let piped = false;

stream.on('pipe', onPiped);

const promiseA = Wreck.request('post', 'http://localhost:0', {
agent,
payload: stream
});

await expect(promiseA).to.reject(Error, /Unable to obtain socket/);
expect(piped).to.equal(false);

const handler = (req, res) => {

res.writeHead(200);
res.end(internals.payload);
};

const server = await internals.server(handler);
const res = await Wreck.request('post', 'http://localhost:' + server.address().port, {
payload: stream
});
expect(res.statusCode).to.equal(200);
expect(piped).to.equal(true);
server.close();
});

it('times out when stream read takes too long', async () => {

const TestStream = function () {
Expand Down Expand Up @@ -1759,7 +1800,7 @@ describe('Events', () => {
once = true;
});

await expect(wreck.get('http://127.0.0.1', { timeout: 10 })).to.reject();
await expect(wreck.get('http://localhost:0', { timeout: 10 })).to.reject();
expect(once).to.be.true();
});

Expand All @@ -1777,8 +1818,8 @@ describe('Events', () => {
const wreck = Wreck.defaults({ events: true });
wreck.events.on('response', handler);

await expect(wreck.get('http://127.0.0.1', { timeout: 10 })).to.reject();
await expect(wreck.get('http://127.0.0.1', { timeout: 10 })).to.reject();
await expect(wreck.get('http://localhost:0', { timeout: 10 })).to.reject();
await expect(wreck.get('http://localhost:0', { timeout: 10 })).to.reject();
expect(count).to.equal(2);
});

Expand Down Expand Up @@ -1973,6 +2014,12 @@ internals.server = function (handler, socket) {
req.pipe(res);
};
}
else if (handler === 'fail') {
handler = (req, res) => {

res.socket.destroy();
};
}
else if (handler === 'ok') {
handler = (req, res) => {

Expand Down Expand Up @@ -2020,6 +2067,14 @@ internals.https = function (handler) {
};


internals.SlowAgent = class SlowAgent extends Http.Agent {
createConnection(options, cb) {

setTimeout(cb, 200, new Error('Unable to obtain socket'));
}
};


internals.wait = function (timeout) {

return new Promise((resolve) => setTimeout(resolve, timeout));
Expand Down

0 comments on commit 1315545

Please sign in to comment.