From 0757db6203d290ed6b2f9a00ad138c4fe20ae092 Mon Sep 17 00:00:00 2001 From: harkamal Date: Thu, 14 Apr 2022 15:25:30 +0530 Subject: [PATCH 1/8] Update the compressor to chunk the bytes as per the standard --- lib/compress-stream.js | 39 +++++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/lib/compress-stream.js b/lib/compress-stream.js index cf4f72a..763325b 100644 --- a/lib/compress-stream.js +++ b/lib/compress-stream.js @@ -1,3 +1,11 @@ +/** + * As per the snappy framing format for streams, the size of any uncompressed chunk can be + * no longer than 65536 bytes. + * + * From: https://github.com/google/snappy/blob/main/framing_format.txt#L90:L92 + */ +const UNCOMPRESSED_CHUNK_SIZE = 65536; + var Transform = require('stream').Transform , util = require('util') @@ -52,18 +60,25 @@ CompressStream.prototype._uncompressed = function (chunk) { CompressStream.prototype._transform = function (chunk, enc, callback) { var self = this + async function compressChunks() { + try { + for (let startFrom = 0; startFrom < chunk.length; startFrom += UNCOMPRESSED_CHUNK_SIZE) { + const endAt = startFrom + Math.min(chunk.length - startFrom, UNCOMPRESSED_CHUNK_SIZE); + const bytesChunk = chunk.slice(startFrom, endAt); + const compressed = snappy.compressSync(bytesChunk) + if (compressed.length < bytesChunk.length) + self._compressed(bytesChunk, compressed) + else + self._uncompressed(bytesChunk) + + } + callback(); + } catch (err) { + return callback(err); + } + } - snappy.compress(chunk, function (err, compressed) { - if (err) - return callback(err) - - if (compressed.length < chunk.length) - self._compressed(chunk, compressed) - else - self._uncompressed(chunk) - - callback() - }) + compressChunks(); } -module.exports = CompressStream \ No newline at end of file +module.exports = CompressStream From 6e22a09f1e7e7548a1f1c919d112ae704405dcb5 Mon Sep 17 00:00:00 2001 From: harkamal Date: Thu, 14 Apr 2022 15:26:34 +0530 Subject: [PATCH 
2/8] bump package version --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index e9716fb..94b31ce 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@chainsafe/snappy-stream", - "version": "5.0.0", + "version": "5.1.0", "description": "Compress data over a Stream using the snappy framing format", "main": "index.js", "scripts": { From ab41c6db34668fe3e9a0ed03393f98df3afdf27b Mon Sep 17 00:00:00 2001 From: harkamal Date: Thu, 14 Apr 2022 23:22:42 +0530 Subject: [PATCH 3/8] test for multi chunk compress --- test/compress-test.js | 53 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/test/compress-test.js b/test/compress-test.js index 8e0b233..5988000 100644 --- a/test/compress-test.js +++ b/test/compress-test.js @@ -1,10 +1,16 @@ var spawn = require('child_process').spawn - , createCompressStream = require('../').createCompressStream , test = require('tap').test , largerInput = require('fs').readFileSync(__filename) , largerInputString = largerInput.toString() +const UNCOMPRESSED_CHUNK_SIZE = 65536 +let superLargeInput = largerInput; +for (let i = largerInput.length; i <= UNCOMPRESSED_CHUNK_SIZE; i += largerInput.length) { + superLargeInput = Buffer.concat([superLargeInput, largerInput]); +} +const superLargeInputString = superLargeInput.toString(); + test('compress small string', function (t) { var child = spawn('python', [ '-m', 'snappy', '-d' ]) , compressStream = createCompressStream() @@ -48,3 +54,48 @@ test('compress large string', function (t) { compressStream.write(largerInputString) compressStream.end() }) + + +test('compress very very large string', function (t) { + var child = spawn('python', [ '-m', 'snappy', '-d' ]) + , compressStream = createCompressStream() + , data = '' + + child.stdout.on('data', function (chunk) { + data = data + chunk.toString() + }) + + child.stdout.on('end', function () { + t.equal(data, 
superLargeInputString) + t.end() + }) + + child.stderr.pipe(process.stderr) + + compressStream.pipe(child.stdin) + + compressStream.write(superLargeInputString) + compressStream.end() +}) + +test('compress very very large string', function(t) { + var child = spawn('python', ['-m', 'snappy', '-d']), + compressStream = createCompressStream(), + data = '' + + child.stdout.on('data', function(chunk) { + data = data + chunk.toString() + }) + + child.stdout.on('end', function() { + t.equal(data, superLargeInputString) + t.end() + }) + + child.stderr.pipe(process.stderr) + + compressStream.pipe(child.stdin) + + compressStream.write(superLargeInputString) + compressStream.end() +}) From ce0a9080baed3a247d479fe7911c0aded39f2461 Mon Sep 17 00:00:00 2001 From: harkamal Date: Thu, 14 Apr 2022 23:28:01 +0530 Subject: [PATCH 4/8] remove the duplicated test --- test/compress-test.js | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/test/compress-test.js b/test/compress-test.js index 5988000..d520ee2 100644 --- a/test/compress-test.js +++ b/test/compress-test.js @@ -77,25 +77,3 @@ test('compress very very large string', function (t) { compressStream.write(superLargeInputString) compressStream.end() }) - -test('compress very very large string', function(t) { - var child = spawn('python', ['-m', 'snappy', '-d']), - compressStream = createCompressStream(), - data = '' - - child.stdout.on('data', function(chunk) { - data = data + chunk.toString() - }) - - child.stdout.on('end', function() { - t.equal(data, superLargeInputString) - t.end() - }) - - child.stderr.pipe(process.stderr) - - compressStream.pipe(child.stdin) - - compressStream.write(superLargeInputString) - compressStream.end() -}) From 5c0a678d55488bb208673342f03be92a42ac6d95 Mon Sep 17 00:00:00 2001 From: harkamal Date: Fri, 15 Apr 2022 21:56:03 +0530 Subject: [PATCH 5/8] reverting version bump as releases are auto created --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff
--git a/package.json b/package.json index 94b31ce..e9716fb 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@chainsafe/snappy-stream", - "version": "5.1.0", + "version": "5.0.0", "description": "Compress data over a Stream using the snappy framing format", "main": "index.js", "scripts": { From d6c2786f5d215ebef6837ebca6bd338fc6208d57 Mon Sep 17 00:00:00 2001 From: harkamal Date: Mon, 25 Apr 2022 15:04:12 +0530 Subject: [PATCH 6/8] benchmarks --- lib/compress-stream.js | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/lib/compress-stream.js b/lib/compress-stream.js index 763325b..1019bad 100644 --- a/lib/compress-stream.js +++ b/lib/compress-stream.js @@ -58,6 +58,21 @@ CompressStream.prototype._uncompressed = function (chunk) { ) } +/** + * Some compression benchmarks : + * + * i) Chunking in transform with snappy.compressSync (the new implementation) + * ii) Chunking from outside with compressStream.write (using original snappy.compress) + * iii) No chunking (Original) + * + * | Size | in transform | compressStream.write | orginal (no chunking) | + * |-------------------|--------------|----------------------|-----------------------| + * | 10kb (1 chunk) | 0.0229 ms | 0.0385 ms | 0.0388 ms | + * | 100kb (2 chunks) | 0.0562 ms | 0.1051 ms | 0.0844 ms | + * | 1000kb (16 chunks)| 0.382 ms | 0.7971 ms | 0.1998 ms | + * + */ + CompressStream.prototype._transform = function (chunk, enc, callback) { var self = this async function compressChunks() { From 0e7e5efc2dd815ce7f0ca27c0fd880db9fb451e8 Mon Sep 17 00:00:00 2001 From: harkamal Date: Mon, 25 Apr 2022 20:00:13 +0530 Subject: [PATCH 7/8] log unhandled exception --- lib/compress-stream.js | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/lib/compress-stream.js b/lib/compress-stream.js index 1019bad..4331375 100644 --- a/lib/compress-stream.js +++ b/lib/compress-stream.js @@ -73,9 +73,9 @@ CompressStream.prototype._uncompressed = function (chunk) { * */ 
-CompressStream.prototype._transform = function (chunk, enc, callback) { +CompressStream.prototype._transform = function(chunk, enc, callback) { var self = this - async function compressChunks() { + new Promise(() => { try { for (let startFrom = 0; startFrom < chunk.length; startFrom += UNCOMPRESSED_CHUNK_SIZE) { const endAt = startFrom + Math.min(chunk.length - startFrom, UNCOMPRESSED_CHUNK_SIZE); @@ -91,9 +91,7 @@ CompressStream.prototype._transform = function (chunk, enc, callback) { } catch (err) { return callback(err); } - } - - compressChunks(); + }).catch(e => console.log(e)) } module.exports = CompressStream From 21c7b4c5b5e76cda882dec72409970e65fd5cb7f Mon Sep 17 00:00:00 2001 From: harkamal Date: Tue, 26 Apr 2022 10:57:47 +0530 Subject: [PATCH 8/8] remove self --- lib/compress-stream.js | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/compress-stream.js b/lib/compress-stream.js index 4331375..f542e89 100644 --- a/lib/compress-stream.js +++ b/lib/compress-stream.js @@ -74,7 +74,6 @@ CompressStream.prototype._uncompressed = function (chunk) { */ CompressStream.prototype._transform = function(chunk, enc, callback) { - var self = this new Promise(() => { try { for (let startFrom = 0; startFrom < chunk.length; startFrom += UNCOMPRESSED_CHUNK_SIZE) { @@ -82,9 +81,9 @@ CompressStream.prototype._transform = function(chunk, enc, callback) { const bytesChunk = chunk.slice(startFrom, endAt); const compressed = snappy.compressSync(bytesChunk) if (compressed.length < bytesChunk.length) - self._compressed(bytesChunk, compressed) + this._compressed(bytesChunk, compressed) else - self._uncompressed(bytesChunk) + this._uncompressed(bytesChunk) } callback();