Skip to content

Commit

Permalink
first step in implementing #toStream (see discussion in #53)
Browse files Browse the repository at this point in the history
  • Loading branch information
dtao committed Dec 30, 2013
1 parent 03a89ab commit a1516b9
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 2 deletions.
50 changes: 48 additions & 2 deletions lazy.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ var http = require("http");
var os = require("os");
var Stream = require("stream");
var URL = require("url");
var util = require("util");

// The starting point is everything that works in any environment (browser OR
// Node.js)
Expand Down Expand Up @@ -32,10 +33,16 @@ StreamedSequence.prototype.openStream = function(callback) {
StreamedSequence.prototype.each = function(fn) {
var encoding = this.encoding || "utf-8";

var handle = new Lazy.AsyncHandle();

this.openStream(function(stream) {
var listener = function(e) {
if (fn(e) === false) {
stream.removeListener("data", listener);
try {
if (fn(e) === false) {
stream.removeListener("data", listener);
}
} catch (e) {
handle.errorCallback(e);
}
};

Expand All @@ -44,7 +51,13 @@ StreamedSequence.prototype.each = function(fn) {
}

stream.on("data", listener);

stream.on("end", function() {
handle.completeCallback();
});
});

return handle;
};

/**
Expand Down Expand Up @@ -107,6 +120,39 @@ Lazy.makeHttpRequest = function(url) {
return new HttpStreamSequence(url);
};

Lazy.Sequence.prototype.toStream = function toStream(options) {
return new LazyStream(this, options);
};

function LazyStream(sequence, options) {
options = Lazy(options || {})
.extend({ objectMode: true })
.toObject();

Stream.Readable.call(this, options);

this.sequence = sequence;
this.started = false;
}

util.inherits(LazyStream, Stream.Readable);

LazyStream.prototype._read = function() {
var self = this;

if (!this.started) {
var handle = this.sequence.each(function(e, i) {
self.push(e, i);
});
if (handle instanceof Lazy.AsyncHandle) {
handle.onComplete(function() {
self.push(null);
});
}
this.started = true;
}
};

/*
* Add support for `Lazy(Stream)`.
*/
Expand Down
17 changes: 17 additions & 0 deletions spec/node_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,21 @@ describe("working with streams", function() {
});
});
});

describe("toStream", function() {
it('creates a readable stream that you can use just like any other stream', function() {
var stream = Lazy(fs.createReadStream('./spec/data/lines.txt'))
.lines()
.map(function(chunk) { return chunk.toUpperCase(); })
.toStream();

var finished = jasmine.createSpy();

stream.pipe(fs.createWriteStream('./spec/data/temp.txt'));

stream.on('end', finished);

waitsFor(toBeCalled(finished));
});
});
});

0 comments on commit a1516b9

Please sign in to comment.