enable stream.Writable#_flush([callback]) #5315
Comments
So, it sounds like your Writable is calling the _write() callback before it's actually done writing that chunk. Why are you doing that, exactly?
Why doesn't it work if others are listening to 'finish'? Events can have lots of listeners, that's fine. The way that your stream doesn't comply with the Writable paradigm is that it's calling the write() callback (and, perhaps, the end() callback) before it's actually written or ended. Maybe what you need is a Transform that takes arbitrary-length chunks, and writes out fixed-length chunks, but even then, a reader can read(n) in whatever sizes they like... Still somewhat missing the intent here. |
@isaacs For |
@isaacs, at present 'finish' really means 'finishing'. The underlying resource can't be re-used during 'finishing' so emitting 'finish' to listeners outside the stream at that stage would be wrong. We'd have to tell our listeners to listen for a different event. Re write() callbacks, if we don't callback in nextTick, the client won't deliver more input if it does subsequent write() or end() in the callback, which would stall the stream. |
You can call the _write callback now or whenever. The Writable machinery will detect a sync call, and defer whatever it has to for your stream to behave correctly. How are you getting the stream to stall? 'finish' means: end() was called, and all the _write callbacks were called, indicating that the stream should be fully flushed. It sounds like you're calling the _write callbacks early, before the chunk is actually "done". There's no guarantee that all the resources are cleaned up when "finish" is emitted, only that everything you gave it to do is done. Most streams in node-core emit a "close" event when they've cleaned up their underlying resources. I'm sorry, I'm still not understanding what the problem is with something like this:

    function MyWritable(options) {
      if (!(this instanceof MyWritable)) return new MyWritable(options);
      Writable.call(this, options);
      this._resource = getResourceThingieOrWhatever();
      this.once('finish', this.destroy);
    }
    MyWritable.prototype.destroy = function(cb) {
      if (cb) this.once('close', cb);
      var me = this;
      this._resource.close(function() {
        me.emit('close');
      });
    };
    // This is kind of silly, I mean, you're only saving the cost of a
    // single JavaScript instance object, which is SUPER cheap.
    MyWritable.prototype.reuse = function() {
      var me = this;
      this.destroy(function() {
        me._resource = getResourceThingieOrWhatever();
      });
    };

What's the problem with self-listening? Why does this assume that no one else can listen to the I still feel like I must be missing something. |
The stream only stalls if we defer the write() callback until all of that buffer is flushed, and the client is calling subsequent write/end via the write callback. We fire the callback on nextTick, so it's not a problem. Oh, I didn't realize 'close' was an option! It's not mentioned in the stream.Writable docs. That does the trick. I'm not opposed to self-listening. @TooTallNate does this solve your case? |
It's not mentioned in the stream.Writable docs because it's not really a requirement of the API. Not all streams emit it, but perhaps it would be worthwhile to mention it anyway as an option. The streams that DO emit 'close' generally all do so to indicate that the underlying resource is completely cleaned up: for sockets and fs streams, this means the handle or file descriptor is closed; for zlib, it's after zlib_free; for crypto, it's after disposing of the openssl object, etc.
Right, but isn't that just another way to say that you're completely giving up on backpressure? What happens if you pipe a giant file into it, or a socket that sends you many terabytes of information? The "stalling" until the write callback is called is so that you can accurately communicate the information back to the client. |
If 'close' is a common event that a Writable subclass implementer should consider, then please do document it. If it were, you'd have at least saved yourself this entire discussion :-) I'll edit this issue to request doc for 'close' unless @TooTallNate has further input. We can apply backpressure as necessary by deferring write() callbacks when we don't need the next batch of data immediately. |
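The backpressure point in the exchange above can be illustrated with a small sketch. It is not code from the thread: `makeSlowWritable` and `slowStore` are hypothetical names, and the tiny `highWaterMark` is chosen only to make the effect visible. The idea is that the `_write` callback is held back until the chunk has really been handled, so `write()` can report `false` and the producer can wait for `'drain'`.

```javascript
const { Writable } = require('stream');

// slowStore is a hypothetical async sink: slowStore(chunk, callback).
function makeSlowWritable(slowStore) {
  return new Writable({
    highWaterMark: 4, // bytes; deliberately small for illustration
    write(chunk, _encoding, done) {
      // Acknowledge the chunk only after the sink has taken it.
      // Until then, further writes are buffered by the Writable machinery.
      slowStore(chunk, done);
    },
  });
}

// A producer that respects the signal:
//   if (!writable.write(data)) writable.once('drain', produceMore);
```

Calling `done` early instead (for example on nextTick, before the chunk is handled) keeps the data flowing, but then `write()` no longer tells the producer anything about how fast the sink really is.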
@networkimprov Better to create a new issue, I'll close this one. Out of curiosity, what is this actual Writable? |
The Writable in question is an interface to the Xdelta (binary diff tool) patch mechanism. You stream the patch into the Writable and it reads from a source fd and writes to a destination fd. https://github.com/networkimprov/node-xdelta3/ New issue filed: #5336 |
I see. Yeah, that sounds a bit tricky. So, when you are done writing, perhaps the last bit of the patch is consumed, but it's still not "done", since it has to finish reading the input fd and writing to the output fd. Why is there a minimum amount that you can write, though? I mean... why not wait on the _write() callback until that bit of the patch is actually fully consumed? Do you just not get a notice of this, or does the underlying lib specify that you have to write in X-sized blocks? |
We need a complete patch to generate output. When we get a buffer from write, it may not be a whole patch, in which case we'd callback to ack and keep the stream going. We could defer callbacks once we have a whole patch to hand to the engine, but I'm not sure whether we know where the patch boundaries are. @mtibeica, can you shed more light on this? |
I see, so the chunk you wrote is "fully consumed", as far as you know, it's just that the internal lib is buffering it or something until they get a full patch, so it makes sense to just do the callback immediately. Am I understanding that correctly? If so, yeah, more and more, it sounds like you should listen on finish and emit 'close' when it's cleaned up, and you're probably doing the right thing. Thanks for the added details, that is very illuminating. |
I need to feed the xdelta lib XD3_ALLOCSIZE bytes at a time, so when a chunk is fed to _write I must keep a part of it for further processing (which I will concatenate with the next chunk). The problem is that I need to know when the last _write was called so that I can process the remaining chunk. |
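The fixed-size feeding described above can be sketched as a plain helper, independent of any stream class. This is an assumption-laden illustration, not the node-xdelta3 code: `takeBlocks` is a hypothetical name, and `blockSize` stands in for XD3_ALLOCSIZE, whose real value comes from the xdelta library.

```javascript
// Concatenate the leftover from the previous call with the new chunk,
// cut off as many full blocks as possible, and return the remainder.
function takeBlocks(leftover, chunk, blockSize) {
  const data = Buffer.concat([leftover, chunk]);
  const blocks = [];
  let offset = 0;
  while (data.length - offset >= blockSize) {
    blocks.push(data.subarray(offset, offset + blockSize));
    offset += blockSize;
  }
  // The remainder is shorter than blockSize (possibly empty).
  return { blocks, leftover: data.subarray(offset) };
}
```

Inside `_write` the returned blocks would go to the library and the remainder would be kept for the next call. The remainder left after the last `_write` is exactly the data that needs a "no more input" signal, such as a self-attached `'finish'` listener, before it can be processed.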
Fixes a race condition with active uploads and end being called. Also removes the complexity of having to subclass end(). Relevant: * nodejs/node-v0.x-archive#5315 * nodejs/node-v0.x-archive#5336 * https://groups.google.com/forum/#!searchin/nodejs/_write/nodejs/ydWIyPrDVB4/Rt4EZk0YNgcJ
I've reviewed this thread and I'm not sure it addresses the use-case where a Writable stream has an internal buffer that needs to be flushed. To flush it, the writable stream needs to be able to detect when there's /no more input/ coming. The solutions I see proposed above include:
There's a need to trigger that it's time to flush, as well as the moment when the final flush has happened, which is already covered. My specific use case is that I have an incoming stream of addresses which need geocoding. I have a Writable stream which will send them to a remote geocoding service, but ideally in large batches. I need to know when I've received the last bit of data, so I can flush the last few addresses to the geocoder. I can add a listener to the Readable stream to explicitly flush the Writable stream, but it seems cleaner if the Writable stream could be self-contained and not require this. |
Actually, I've decided to revise my design to use a Transform stream instead of Writable Stream. Transform already has _flush, so I'm all set. |
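A minimal sketch of that Transform-based design, under stated assumptions: `makeGeocodeStream` and `geocodeBatch` are hypothetical names, and `geocodeBatch(batch, callback)` stands in for whatever call reaches the remote geocoding service. The stream collects addresses into batches in `_transform` and sends the last partial batch from `_flush`, which runs once no more input is coming.

```javascript
const { Transform } = require('stream');

// geocodeBatch is a hypothetical function: geocodeBatch(addresses, cb),
// where cb(err, results) receives one result per address.
function makeGeocodeStream(batchSize, geocodeBatch) {
  let pending = [];

  // Send everything collected so far and push the results downstream.
  function sendPending(stream, done) {
    const batch = pending;
    pending = [];
    geocodeBatch(batch, (err, results) => {
      if (err) return done(err);
      results.forEach((result) => stream.push(result));
      done();
    });
  }

  return new Transform({
    objectMode: true,
    transform(address, _encoding, done) {
      pending.push(address);
      if (pending.length < batchSize) return done();
      sendPending(this, done);
    },
    // Called after end(), before 'end' is emitted on the readable side.
    flush(done) {
      if (pending.length === 0) return done();
      sendPending(this, done);
    },
  });
}
```

The readable side has to be consumed (for example with a `'data'` listener or `pipe()`), otherwise the stream fills up and stops accepting input.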
For a write stream to support flushing on finish, it must listen to itself, flush, then emit `close`. Users of such a stream must listen to `close` to know when the data has actually been flushed. Reference: nodejs/node-v0.x-archive#5315 (comment)
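The listen-to-itself pattern described above might look roughly like the following. This is a sketch, not code from the referenced commit: `makeBatchWriter` and `sendBatch` are hypothetical names. One assumption is worth stating loudly: recent Node.js versions emit `'close'` on their own by default, so the sketch switches that off with `autoDestroy: false` and `emitClose: false`; the Node version discussed in this thread had no such options.

```javascript
const { Writable } = require('stream');

// sendBatch is a hypothetical async sink: sendBatch(items, callback).
function makeBatchWriter(batchSize, sendBatch) {
  let pending = [];

  const writer = new Writable({
    objectMode: true,
    autoDestroy: false, // assumption: we emit 'close' ourselves
    emitClose: false,
    write(item, _encoding, done) {
      pending.push(item);
      if (pending.length < batchSize) return done();
      const batch = pending;
      pending = [];
      sendBatch(batch, done);
    },
  });

  // Self-listening: 'finish' means end() was called and every _write
  // callback has fired, so only the partial batch is left to flush.
  writer.once('finish', () => {
    if (pending.length === 0) return writer.emit('close');
    const batch = pending;
    pending = [];
    sendBatch(batch, () => writer.emit('close'));
  });

  return writer;
}
```

Consumers of such a stream would wait for `'close'` rather than `'finish'` before assuming that everything has reached the sink.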
Optional method for stream.Writable subclasses. Called when end() is called by the stream client, before 'finish' is emitted. Similar to Transform#_flush()
Discussed here: https://groups.google.com/d/topic/nodejs/ydWIyPrDVB4/discussion