diff --git a/lib/commands/query.js b/lib/commands/query.js index 67386bbfba..e8cd215bb0 100644 --- a/lib/commands/query.js +++ b/lib/commands/query.js @@ -251,7 +251,7 @@ class Query extends Command { if (this.onResult) { this._rows[this._resultIndex].push(row); } else { - this.emit('result', row); + this.emit('result', row, this._resultIndex); } return Query.prototype.row; } @@ -268,11 +268,11 @@ class Query extends Command { stream._read = () => { this._connection && this._connection.resume(); }; - this.on('result', row => { + this.on('result', (row, resultSetIndex) => { if (!stream.push(row)) { this._connection.pause(); } - stream.emit('result', row); // replicate old emitter + stream.emit('result', row, resultSetIndex); // replicate old emitter }); this.on('error', err => { stream.emit('error', err); // Pass on any errors diff --git a/test/integration/test-multi-result-streaming.test.cjs b/test/integration/test-multi-result-streaming.test.cjs new file mode 100644 index 0000000000..81fc2dcc1f --- /dev/null +++ b/test/integration/test-multi-result-streaming.test.cjs @@ -0,0 +1,55 @@ +'use strict'; + +const { assert } = require('poku'); +const { createConnection } = require('../common.test.cjs'); + +(async () => { + const conn = createConnection({ multipleStatements: true }); + const captured1 = []; + const captured2 = []; + const sql1 = + 'select * from information_schema.columns order by table_schema, table_name, column_name limit 1;'; + const sql2 = + 'select * from information_schema.columns order by table_schema, table_name, ordinal_position limit 1;'; + + await conn.promise().query('set global max_allowed_packet=524288000'); + + const compare1 = await conn.promise().query(sql1); + const compare2 = await conn.promise().query(sql2); + + if (!compare1 || compare1.length < 1) { + assert.fail('no results for comparison 1'); + } + if (!compare2 || compare2.length < 1) { + assert.fail('no results for comparison 2'); + } + + const stream = conn.query(`${sql1}\n${sql2}`).stream(); + stream.on('result', (row, datasetIndex) => { + if (datasetIndex === 0) { + captured1.push(row); + } else { + captured2.push(row); + } + }); + // note: this is very important: + // after each result set is complete, + // the stream will emit "readable" and if we don't + // read then 'end' won't be emitted and the + // test will hang. + stream.on('readable', () => { + stream.read(); + }); + + await new Promise((resolve, reject) => { + stream.on('error', (e) => reject(e)); + stream.on('end', () => resolve()); + }); + + assert.equal(captured1.length, 1); + assert.equal(captured2.length, 1); + assert.deepEqual(captured1[0], compare1[0][0]); + assert.deepEqual(captured2[0], compare2[0][0]); + + conn.end(); +})();