From a1516b90bf33d13d2b30254279c41e6289b9325d Mon Sep 17 00:00:00 2001 From: Dan Tao Date: Mon, 30 Dec 2013 07:35:13 -0800 Subject: [PATCH] first step in implementing #toStream (see discussion in #53) --- lazy.node.js | 50 +++++++++++++++++++++++++++++++++++++++++++++-- spec/node_spec.js | 17 ++++++++++++++++ 2 files changed, 65 insertions(+), 2 deletions(-) diff --git a/lazy.node.js b/lazy.node.js index 97c6e11..6190116 100644 --- a/lazy.node.js +++ b/lazy.node.js @@ -3,6 +3,7 @@ var http = require("http"); var os = require("os"); var Stream = require("stream"); var URL = require("url"); +var util = require("util"); // The starting point is everything that works in any environment (browser OR // Node.js) @@ -32,10 +33,16 @@ StreamedSequence.prototype.openStream = function(callback) { StreamedSequence.prototype.each = function(fn) { var encoding = this.encoding || "utf-8"; + var handle = new Lazy.AsyncHandle(); + this.openStream(function(stream) { var listener = function(e) { - if (fn(e) === false) { - stream.removeListener("data", listener); + try { + if (fn(e) === false) { + stream.removeListener("data", listener); + } + } catch (e) { + handle.errorCallback(e); } }; @@ -44,7 +51,13 @@ StreamedSequence.prototype.each = function(fn) { } stream.on("data", listener); + + stream.on("end", function() { + handle.completeCallback(); + }); }); + + return handle; }; /** @@ -107,6 +120,39 @@ Lazy.makeHttpRequest = function(url) { return new HttpStreamSequence(url); }; +Lazy.Sequence.prototype.toStream = function toStream(options) { + return new LazyStream(this, options); +}; + +function LazyStream(sequence, options) { + options = Lazy(options || {}) + .extend({ objectMode: true }) + .toObject(); + + Stream.Readable.call(this, options); + + this.sequence = sequence; + this.started = false; +} + +util.inherits(LazyStream, Stream.Readable); + +LazyStream.prototype._read = function() { + var self = this; + + if (!this.started) { + var handle = this.sequence.each(function(e, i) { + self.push(e, i); + }); + if (handle instanceof Lazy.AsyncHandle) { + handle.onComplete(function() { + self.push(null); + }); + } + this.started = true; + } +}; + /* * Add support for `Lazy(Stream)`. */ diff --git a/spec/node_spec.js b/spec/node_spec.js index 8eacc4e..39375cc 100644 --- a/spec/node_spec.js +++ b/spec/node_spec.js @@ -153,4 +153,21 @@ describe("working with streams", function() { }); }); }); + + describe("toStream", function() { + it('creates a readable stream that you can use just like any other stream', function() { + var stream = Lazy(fs.createReadStream('./spec/data/lines.txt')) + .lines() + .map(function(chunk) { return chunk.toUpperCase(); }) + .toStream(); + + var finished = jasmine.createSpy(); + + stream.pipe(fs.createWriteStream('./spec/data/temp.txt')); + + stream.on('end', finished); + + waitsFor(toBeCalled(finished)); + }); + }); });