Skip to content

Commit

Permalink
Send channels from one session to another
Browse files Browse the repository at this point in the history
Closes #22.
  • Loading branch information
mcollina committed Sep 1, 2014
1 parent 1587f70 commit 3992409
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 29 deletions.
4 changes: 4 additions & 0 deletions lib/channels.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ ReadChannel.prototype.forceClose = function forceClose(cb) {
this.end();
};

ReadChannel.prototype.readChannel = true;

function WriteChannel(session, id) {
Channel.call(this, session, id);

Expand Down Expand Up @@ -81,6 +83,8 @@ WriteChannel.prototype.forceClose = function forceClose(cb) {
this.end();
};

WriteChannel.prototype.writeChannel = true;

function ByteStream(session, id) {
if (!(this instanceof ByteStream)) {
return new ByteStream(session, id);
Expand Down
33 changes: 28 additions & 5 deletions lib/encoder.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ var ReadableN = require('stream').Readable;
var WritableN = require('stream').Writable;
var Readable = require('readable-stream').Readable;
var Writable = require('readable-stream').Writable;
var topChannels = require('./channels');

function Encoder(session, channels) {
if (!(this instanceof Encoder)) {
Expand All @@ -22,7 +23,30 @@ function Encoder(session, channels) {
var WriteChannel = channels.WriteChannel;
var ByteStream = channels.ByteStream;

function rewriteChannel(chan) {

if (chan._session === session) {
return chan;
}

var newChan;

if (chan.readChannel) {
newChan = that.encodingChannel.createWriteChannel();
chan.pipe(newChan);
} else if (chan.writeChannel) {
newChan = that.encodingChannel.createReadChannel();
newChan.pipe(chan);
} else {
throw new Error('specify readChannel or writeChannel property for streams not in the current channels');
}

return newChan;
}

function encodeChannel(chan) {
chan = rewriteChannel(chan);

// hack, let's just use 4 bytes
var buf = new Buffer(5);
if (chan instanceof ReadChannel) {
Expand Down Expand Up @@ -101,22 +125,21 @@ function Encoder(session, channels) {
this._msgpack.register(0x2, WritableN, encodeStream, noop);
this._msgpack.register(0x2, Writable, encodeStream, noop);
// no need for Duplex* because it will catched by Readable*

this._msgpack.register(0x1, topChannels.Channel, encodeChannel, noop);
}

inherits(Encoder, EventEmitter);

Encoder.prototype.encode = function encode(obj) {
Encoder.prototype.encode = function encode(obj, channel) {
this.encodingChannel = channel;
return this._msgpack.encode(obj);
};

Encoder.prototype.decode = function decode(obj) {
return this._msgpack.decode(obj);
};

Encoder.prototype.encoder = function (opts) {
return this._msgpack.encoder(opts);
};

Encoder.prototype.decoder = function (opts) {
return this._msgpack.decoder(opts);
};
Expand Down
2 changes: 1 addition & 1 deletion lib/spdy/channels.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ SPDYWriteChannel.prototype.handle = handle;

SPDYWriteChannel.prototype._transform = function transform(buf, enc, done) {
try {
var encoded = this._session.encoder.encode(buf);
var encoded = this._session.encoder.encode(buf, this);
this.push(encoded.slice(0, encoded.length));
} catch(err) {
this.emit('error', err);
Expand Down
13 changes: 8 additions & 5 deletions lib/stream/channels.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ function dispatchData(obj, enc, done) {
this._firstChunk = false;
}

this._session._dispatch(msg, done);
this._session._dispatch(msg, this, done);
}

StreamWriteChannel.prototype._transform = dispatchData;

StreamWriteChannel.prototype._flush = function(done) {
this._session._dispatch({
id: this.id
}, done);
}, this, done);
};

module.exports.WriteChannel = StreamWriteChannel;
Expand All @@ -58,9 +58,12 @@ function ByteStream(session, id) {
this._finished = false;

this.on('finish', function() {
this._finished = true;
session._dispatch({
var that = this;
this._session._dispatch({
id: id
}, this, function() {
that._finished = true;
that.emit('finishDispatched');
});
});
}
Expand Down Expand Up @@ -90,7 +93,7 @@ ByteStream.prototype.dispatch = function(chunk, done) {

ByteStream.prototype.forceClose = function forceClose(cb) {
if (cb && !this._finished) {
this.on('finish', cb);
this.on('finishDispatched', cb);
} else if (cb) {
cb();
}
Expand Down
80 changes: 67 additions & 13 deletions lib/stream/session.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ function StreamSession(inStream, outStream, opts, server) {

this._isServer = opts.server || false;

if (opts.header === false) {
this._haveHeaders = false;
} else {
this._haveHeaders = true;
}

this._inStream = inStream;
this._outStream = outStream;
this._delayedChannels = [];
Expand Down Expand Up @@ -77,9 +83,6 @@ function StreamSession(inStream, outStream, opts, server) {
}
}));

this._writePipe = this._encoder.encoder(opts);
this._writePipe.pipe(outStream);

this.on('newListener', function(event, listener) {
var chan;
if (event === 'channel') {
Expand All @@ -89,8 +92,6 @@ function StreamSession(inStream, outStream, opts, server) {
}
});

this._writePipe.on('error', this.emit.bind(this, 'error'));

this._encoder.on('channel', function(chan) {
if (that._streams[chan.id]) {
that._streams[chan.id].pipe(chan, { end: false });
Expand All @@ -102,17 +103,23 @@ function StreamSession(inStream, outStream, opts, server) {
this.on('channel', server);
}

var toClose = 0;
var count = 2;
function complete() {
if (++toClose === 2) {
/*jshint validthis:true */
this.removeListener('finish', complete);
this.removeListener('end', complete);
this.removeListener('close', complete);
if (--count === 0) {
that._closing = false;
that._closed = true;
that.emit('close');
}
}

that._inStream.on('end', complete);
that._writePipe.on('end', complete);
that._outStream.on('finish', complete);
that._inStream.on('close', complete);
that._outStream.on('close', complete);
}

inherits(StreamSession, EventEmitter);
Expand Down Expand Up @@ -148,8 +155,37 @@ StreamSession.prototype.createWriteChannel = function createWriteChannel() {
return this._createWriteChannel();
};

StreamSession.prototype._dispatch = function dispatch(obj, done) {
this._writePipe.write(obj, done);
StreamSession.prototype._dispatch = function dispatch(obj, chan, done) {
if (this._closing) {
if (done) {
// we are closing everything anyway
done();
}
return this;
}

try {
var encoded = this._encoder.encode(obj, chan).slice(0);

// header logic copied from msgpack5.encoder
if (this._haveHeaders) {
var header = new Buffer(4);
header.writeUInt32BE(encoded.length, 0);
this._outStream.write(header);
}

if (this._outStream.write.length === 2 ) {
this._outStream.write(encoded, done);
} else {
this._outStream.write(encoded);
done();
}
} catch(err) {
done();
// swallow any closing error
this.emit('error', err);
}

return this;
};

Expand All @@ -173,15 +209,33 @@ StreamSession.prototype.close = function close(done) {
cb();
});
}, function() {
that._inStream.end();
that._writePipe.end();
if (that._inStream.destroy) {
that._inStream.destroy();
} else if (that._inStream.end) {
that._inStream.end();
} else if (done) {
done(new Error('unable to close inStream'));
} else {
throw new Error('unable to close inStream');
}

if (that._outStream.destroy) {
that._outStream.destroy();
} else if (that._outStream.end) {
that._outStream.end();
} else if (done) {
done(new Error('unable to close outStream'));
} else {
throw new Error('unable to close outStream');
}

// consume all awaiting messages
try {
that._inStream.resume();
} catch(err) {}

try {
that._writePipe.resume();
that._outStream.resume();
} catch(err) {}
});

Expand Down
111 changes: 107 additions & 4 deletions test/abstract_session.js
Original file line number Diff line number Diff line change
Expand Up @@ -410,14 +410,12 @@ module.exports = function abstractSession(builder) {
outSession.on('error', function() {});
});

chan.write({
chan.end({
bin: bin
});

inSession.on('channel', function server(chan) {
chan.on('data', function() {
// skip it
});
chan.resume(); // skip all of it
});
});
});
Expand Down Expand Up @@ -529,4 +527,109 @@ module.exports = function abstractSession(builder) {
outSession.close();
});
});

describe('double pair', function() {

var inSession2;
var outSession2;

beforeEach(function buildSessions2(done) {
builder(function(err, inS, out) {
if (err) {
return done(err);
}

inSession2 = inS;

outSession2 = out;

done();
});
});

afterEach(function closeOutSession2(done) {
outSession2.close(function() {
// avoid errors
done();
});
});

afterEach(function closeInSession2(done) {
inSession2.close(function() {
// avoid errors
done();
});
});

it('should pass ReadChannel between sessions', function(done) {
(function client1() {
var chan = outSession.createWriteChannel();
var ret = chan.createReadChannel();

ret.on('data', function(data) {
data.chan.end({ hello: 'world' });
});

chan.write({ ret: ret });
})();

function client2() {
var chan = outSession2.createWriteChannel();
var ret = chan.createReadChannel();

ret.on('data', function(data) {
expect(data).to.eql({ hello: 'world' });
done();
});

chan.write({ chan: ret });
}

(function server() {
inSession.once('channel', function(channel1) {
client2();
channel1.on('data', function(msg) {
inSession2.once('channel', function(channel2) {
channel2.pipe(msg.ret);
});
});
});
})();
});

it('should pass WriteChannel between session', function(done) {
(function client1() {
var chan = outSession.createWriteChannel();
var more = chan.createWriteChannel();

chan.write({ more: more });
more.write({ hello: 'world' });
})();

function client2() {
var chan = outSession2.createWriteChannel();
var ret = chan.createReadChannel();

ret.on('data', function(msg) {
msg.more.on('data', function(data) {
expect(data).to.eql({ hello: 'world' });
done();
});
});

chan.write({ ret: ret });
}

(function server() {
inSession.once('channel', function(channel1) {
client2();
inSession2.once('channel', function(channel2) {
channel2.on('data', function(msg) {
channel1.pipe(msg.ret);
});
});
});
})();
});
});
};
Loading

0 comments on commit 3992409

Please sign in to comment.