From b629b21634fee3dcdb4818115c79443bcf430b08 Mon Sep 17 00:00:00 2001 From: Katherine Walker Date: Tue, 19 Feb 2019 15:09:09 -0500 Subject: [PATCH] feat(db): add database-level aggregation Fixes NODE-1783 --- lib/collection.js | 95 +---------- lib/db.js | 46 ++++++ lib/operations/aggregate.js | 121 ++++++++++++++ test/functional/aggregation_tests.js | 36 +++++ test/functional/crud_spec_tests.js | 49 ++++++ test/functional/spec/crud/README.rst | 83 ++++++---- .../functional/spec/crud/db/db-aggregate.json | 149 ++++++++++++++++++ test/functional/spec/crud/db/db-aggregate.yml | 60 +++++++ 8 files changed, 514 insertions(+), 125 deletions(-) create mode 100644 lib/operations/aggregate.js create mode 100644 test/functional/spec/crud/db/db-aggregate.json create mode 100644 test/functional/spec/crud/db/db-aggregate.yml diff --git a/lib/collection.js b/lib/collection.js index b8e002bf07..4bc4463362 100644 --- a/lib/collection.js +++ b/lib/collection.js @@ -4,7 +4,6 @@ const deprecate = require('util').deprecate; const deprecateOptions = require('./utils').deprecateOptions; const checkCollectionName = require('./utils').checkCollectionName; const ObjectID = require('mongodb-core').BSON.ObjectID; -const AggregationCursor = require('./aggregation_cursor'); const MongoError = require('mongodb-core').MongoError; const toError = require('./utils').toError; const normalizeHintField = require('./utils').normalizeHintField; @@ -19,10 +18,10 @@ const unordered = require('./bulk/unordered'); const ordered = require('./bulk/ordered'); const ChangeStream = require('./change_stream'); const executeOperation = require('./utils').executeOperation; -const applyWriteConcern = require('./utils').applyWriteConcern; const resolveReadPreference = require('./utils').resolveReadPreference; // Operations +const aggregate = require('./operations/aggregate').aggregate; const bulkWrite = require('./operations/collection_ops').bulkWrite; const checkForAtomicOperators = 
require('./operations/collection_ops').checkForAtomicOperators; const count = require('./operations/collection_ops').count; @@ -1733,97 +1732,7 @@ Collection.prototype.aggregate = function(pipeline, options, callback) { pipeline = args; } - // Ignore readConcern option - let ignoreReadConcern = false; - - // Build the command - const command = { aggregate: this.s.name, pipeline: pipeline }; - - // If out was specified - if (typeof options.out === 'string') { - pipeline.push({ $out: options.out }); - // Ignore read concern - ignoreReadConcern = true; - } else if (pipeline.length > 0 && pipeline[pipeline.length - 1]['$out']) { - ignoreReadConcern = true; - } - - // Decorate command with writeConcern if out has been specified - if ( - pipeline.length > 0 && - pipeline[pipeline.length - 1]['$out'] && - this.s.topology.capabilities().commandsTakeWriteConcern - ) { - applyWriteConcern(command, { db: this.s.db, collection: this }, options); - } - - // Have we specified collation - try { - decorateWithCollation(command, this, options); - } catch (err) { - if (typeof callback === 'function') return callback(err, null); - throw err; - } - - // If we have bypassDocumentValidation set - if (options.bypassDocumentValidation === true) { - command.bypassDocumentValidation = options.bypassDocumentValidation; - } - - // Do we have a readConcern specified - if (!ignoreReadConcern) { - decorateWithReadConcern(command, this, options); - } - - // If we have allowDiskUse defined - if (options.allowDiskUse) command.allowDiskUse = options.allowDiskUse; - if (typeof options.maxTimeMS === 'number') command.maxTimeMS = options.maxTimeMS; - - // If we are giving a hint - if (options.hint) command.hint = options.hint; - - options = Object.assign({}, options); - // Ensure we have the right read preference inheritance - options.readPreference = resolveReadPreference(options, { db: this.s.db, collection: this }); - - // If explain has been specified add it - if (options.explain) { - if 
(command.readConcern || command.writeConcern) { - throw toError('"explain" cannot be used on an aggregate call with readConcern/writeConcern'); - } - command.explain = options.explain; - } - - if (typeof options.comment === 'string') command.comment = options.comment; - - // Validate that cursor options is valid - if (options.cursor != null && typeof options.cursor !== 'object') { - throw toError('cursor options must be an object'); - } - - options.cursor = options.cursor || {}; - if (options.batchSize) options.cursor.batchSize = options.batchSize; - command.cursor = options.cursor; - - // promiseLibrary - options.promiseLibrary = this.s.promiseLibrary; - - // Set the AggregationCursor constructor - options.cursorFactory = AggregationCursor; - if (typeof callback !== 'function') { - if (!this.s.topology.capabilities()) { - throw new MongoError('cannot connect to server'); - } - - // Allow disk usage command - if (typeof options.allowDiskUse === 'boolean') command.allowDiskUse = options.allowDiskUse; - if (typeof options.maxTimeMS === 'number') command.maxTimeMS = options.maxTimeMS; - - // Execute the cursor - return this.s.topology.cursor(this.s.namespace, command, options); - } - - return handleCallback(callback, null, this.s.topology.cursor(this.s.namespace, command, options)); + return aggregate(this.s.db, this, pipeline, options, callback); }; /** diff --git a/lib/db.js b/lib/db.js index b3257f3a3b..d4a287f70d 100644 --- a/lib/db.js +++ b/lib/db.js @@ -23,6 +23,7 @@ const CONSTANTS = require('./constants'); // Operations const addUser = require('./operations/db_ops').addUser; +const aggregate = require('./operations/aggregate').aggregate; const collections = require('./operations/db_ops').collections; const createCollection = require('./operations/db_ops').createCollection; const createIndex = require('./operations/db_ops').createIndex; @@ -263,6 +264,44 @@ Db.prototype.command = function(command, options, callback) { return executeOperation(this.s.topology, 
/**
 * Execute an aggregation framework pipeline against the database, needs MongoDB >= 3.6
 * @method
 * @param {object[]} [pipeline=[]] Array containing all the aggregation framework commands for the execution.
 * @param {object} [options] Optional settings.
 * @param {(ReadPreference|string)} [options.readPreference] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
 * @param {object} [options.cursor] Cursor options that are sent along with the aggregate command.
 * @param {number} [options.cursor.batchSize] The batchSize for the cursor
 * @param {boolean} [options.explain=false] Explain returns the aggregation execution plan.
 * @param {boolean} [options.allowDiskUse=false] allowDiskUse lets the server know if it can use disk to store temporary results for the aggregation.
 * @param {number} [options.maxTimeMS] maxTimeMS specifies a cumulative time limit in milliseconds for processing operations on the cursor. MongoDB interrupts the operation at the earliest following interrupt point.
 * @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
 * @param {boolean} [options.raw=false] Return document results as raw BSON buffers.
 * @param {boolean} [options.promoteLongs=true] Promotes Long values to number if they fit inside the 53 bits resolution.
 * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
 * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
 * @param {object} [options.collation] Specify collation (MongoDB 3.4 or higher) settings for the aggregation (see 3.4 documentation for available fields).
 * @param {string} [options.comment] Add a comment to an aggregation command
 * @param {string|object} [options.hint] Add an index selection hint to an aggregation command
 * @param {ClientSession} [options.session] optional session to use for this operation
 * @param {Database~aggregationCallback} [callback] The command result callback
 * @return {(null|AggregationCursor)}
 */
Db.prototype.aggregate = function(pipeline, options, callback) {
  // Support the (pipeline, callback) call shape.
  if (typeof options === 'function') {
    callback = options;
    options = {};
  }

  // Default the optional arguments independently of whether a callback was
  // supplied; the shared helper reads `options.out` and `pipeline.length`
  // immediately and would otherwise fail with a TypeError.
  if (options == null) {
    options = {};
  }

  if (pipeline == null) {
    pipeline = [];
  }

  // A string in the collection slot tells the shared helper that this is a
  // database-level aggregation rather than one bound to a collection.
  return aggregate(this, '1', pipeline, options, callback);
};

/**
 * The callback format for an aggregation call
 * @callback Database~aggregationCallback
 * @param {MongoError} error An error instance representing the error during the execution.
 * @param {AggregationCursor} cursor The cursor if the aggregation command was executed successfully.
 */
// Value sent as the `aggregate` field (and used as the collection part of the
// cursor namespace) when the aggregation targets the database itself.
const DB_AGGREGATE_COLLECTION = 1;

/**
 * Perform an aggregate operation. See Collection.prototype.aggregate or Db.prototype.aggregate for more information.
 *
 * Builds the aggregate command from the pipeline and options and hands it to
 * `topology.cursor(...)`. The caller's `pipeline`, `options` and
 * `options.cursor` objects are not modified.
 *
 * @method
 * @param {Db} db A Db instance.
 * @param {Collection|string} coll A collection instance or the string '1', used for db.aggregate.
 * @param {object[]} [pipeline=[]] Array containing all the aggregation framework commands for the execution.
 * @param {object} [options] Optional settings. See Collection.prototype.aggregate or Db.prototype.aggregate for a list of options.
 * @param {Database~aggregationCallback|Collection~aggregationCallback} [callback] The command result callback
 * @return {*} The cursor created by the topology when no callback is given; otherwise the result of handing that cursor to the callback.
 * @throws {MongoError} When no callback is given and the topology reports no capabilities ('cannot connect to server').
 * @throws {Error} When `explain` is combined with a read/write concern, or `options.cursor` is not an object.
 */
function aggregate(db, coll, pipeline, options, callback) {
  // Both arguments are documented as optional; default them before first use.
  if (pipeline == null) pipeline = [];
  if (options == null) options = {};

  const isDbAggregate = typeof coll === 'string';
  const target = isDbAggregate ? db : coll;
  const topology = target.s.topology;

  // Looked up once. The result is falsy when the topology has nothing to
  // report, so it must not be dereferenced before it has been checked.
  const capabilities = topology.capabilities();

  let ignoreReadConcern = false;

  if (typeof options.out === 'string') {
    // concat() keeps the caller's pipeline array untouched
    pipeline = pipeline.concat({ $out: options.out });
    ignoreReadConcern = true;
  } else if (pipeline.length > 0 && pipeline[pipeline.length - 1]['$out']) {
    ignoreReadConcern = true;
  }

  let command;
  let namespace;
  let optionSources;

  if (isDbAggregate) {
    command = { aggregate: DB_AGGREGATE_COLLECTION, pipeline: pipeline };
    namespace = `${db.s.databaseName}.${DB_AGGREGATE_COLLECTION}`;

    optionSources = { db };
  } else {
    command = { aggregate: coll.s.name, pipeline: pipeline };
    namespace = coll.s.namespace;

    optionSources = { db: coll.s.db, collection: coll };
  }

  const takesWriteConcern = Boolean(capabilities && capabilities.commandsTakeWriteConcern);

  if (!ignoreReadConcern) {
    decorateWithReadConcern(command, target, options);
  }

  if (pipeline.length > 0 && pipeline[pipeline.length - 1]['$out'] && takesWriteConcern) {
    applyWriteConcern(command, optionSources, options);
  }

  try {
    decorateWithCollation(command, target, options);
  } catch (err) {
    if (typeof callback === 'function') return callback(err, null);
    throw err;
  }

  if (options.bypassDocumentValidation === true) {
    command.bypassDocumentValidation = options.bypassDocumentValidation;
  }

  if (typeof options.allowDiskUse === 'boolean') command.allowDiskUse = options.allowDiskUse;
  if (typeof options.maxTimeMS === 'number') command.maxTimeMS = options.maxTimeMS;

  if (options.hint) command.hint = options.hint;

  // Work on a copy from here on so the caller's options object is left alone
  options = Object.assign({}, options);

  // Ensure we have the right read preference inheritance
  options.readPreference = resolveReadPreference(options, optionSources);

  if (options.explain) {
    if (command.readConcern || command.writeConcern) {
      throw toError('"explain" cannot be used on an aggregate call with readConcern/writeConcern');
    }
    command.explain = options.explain;
  }

  if (typeof options.comment === 'string') command.comment = options.comment;

  // Validate that cursor options is valid
  if (options.cursor != null && typeof options.cursor !== 'object') {
    throw toError('cursor options must be an object');
  }

  // The options copy above is shallow, so copy the nested cursor document as
  // well before writing batchSize into it.
  options.cursor = Object.assign({}, options.cursor);
  if (options.batchSize) options.cursor.batchSize = options.batchSize;
  command.cursor = options.cursor;

  // promiseLibrary
  options.promiseLibrary = target.s.promiseLibrary;

  // Set the AggregationCursor constructor
  options.cursorFactory = AggregationCursor;

  if (typeof callback !== 'function') {
    if (!capabilities) {
      throw new MongoError('cannot connect to server');
    }

    return topology.cursor(namespace, command, options);
  }

  return handleCallback(callback, null, topology.cursor(namespace, command, options));
}
+ /** * Correctly call the aggregation framework using a pipeline expressed as an argument list. * diff --git a/test/functional/crud_spec_tests.js b/test/functional/crud_spec_tests.js index 9889eeb7b5..bcc3e46a44 100644 --- a/test/functional/crud_spec_tests.js +++ b/test/functional/crud_spec_tests.js @@ -3,6 +3,7 @@ const fs = require('fs'); const path = require('path'); const test = require('./shared').assert; +const expect = require('chai').expect; function findScenarios(type) { return fs @@ -14,6 +15,7 @@ function findScenarios(type) { const readScenarios = findScenarios('read'); const writeScenarios = findScenarios('write'); +const dbScenarios = findScenarios('db'); const testContext = {}; describe('CRUD spec', function() { @@ -93,6 +95,37 @@ describe('CRUD spec', function() { }); }); + describe('db', function() { + dbScenarios.forEach(scenarioData => { + const scenarioName = scenarioData[0]; + const scenario = scenarioData[1]; + scenario.name = scenarioName; + const databaseName = scenarioData[1].database_name; + + const metadata = { + requires: { + topology: ['single', 'replicaset', 'sharded'] + } + }; + + if (scenario.minServerVersion) { + metadata.requires.mongodb = `>=${scenario.minServerVersion}`; + } + + describe(scenarioName, function() { + scenario.tests.forEach(scenarioTest => { + it(scenarioTest.description, { + metadata, + test: function() { + const db = testContext.client.db(databaseName); + return executeDbAggregateTest(scenarioTest, db); + } + }); + }); + }); + }); + }); + function executeAggregateTest(scenarioTest, db, collection) { const options = {}; if (scenarioTest.operation.arguments.collation) { @@ -326,6 +359,22 @@ describe('CRUD spec', function() { }); } + function executeDbAggregateTest(scenarioTest, db) { + const options = {}; + if (scenarioTest.operation.arguments.allowDiskUse) { + options.allowDiskUse = scenarioTest.operation.arguments.allowDiskUse; + } + + const pipeline = scenarioTest.operation.arguments.pipeline; + return db + 
.aggregate(pipeline, options) + .toArray() + .then(results => { + expect(results).to.deep.equal(scenarioTest.outcome.result); + return Promise.resolve(); + }); + } + function executeScenario(scenario, scenarioTest, configuration, context) { const collection = context.db.collection( 'crud_spec_tests_' + scenario.name + '_' + scenarioTest.operation.name diff --git a/test/functional/spec/crud/README.rst b/test/functional/spec/crud/README.rst index d52f5f6a53..1d92f6f00e 100644 --- a/test/functional/spec/crud/README.rst +++ b/test/functional/spec/crud/README.rst @@ -2,33 +2,41 @@ CRUD Tests ========== +.. contents:: + +---- + +Introduction +============ + The YAML and JSON files in this directory tree are platform-independent tests -meant to exercise the translation from the API to underlying commands that -MongoDB understands. Given the variety of languages and implementations and -limited nature of a description of a test, there are a number of things that -aren't testable. For instance, none of these tests assert that maxTimeMS was -properly sent to the server. This would involve a lot of infrastructure to -define and setup. Therefore, these YAML tests are in no way a replacement for -more thorough testing. However, they can provide an initial verification of your +that drivers can use to prove their conformance to the CRUD spec. + +Given the variety of languages and implementations and limited nature of a +description of a test, there are a number of things that aren't testable. For +instance, none of these tests assert that maxTimeMS was properly sent to the +server. This would involve a lot of infrastructure to define and setup. +Therefore, these YAML tests are in no way a replacement for more thorough +testing. However, they can provide an initial verification of your implementation. +Running these integration tests will require a running MongoDB server or +cluster with server versions 2.6.0 or later. 
Some tests have specific server +version requirements as noted by ``minServerVersion`` and ``maxServerVersion``. + Version ======= Files in the "specifications" repository have no version scheme. They are not -tied to a MongoDB server version, and it is our intention that each -specification moves from "draft" to "final" with no further revisions; it is -superseded by a future spec, not revised. - -However, implementers must have stable sets of tests to target. As test files -evolve they will occasionally be tagged like "crud-tests-YYYY-MM-DD", until the -spec is final. +tied to a MongoDB server version. Format ====== Each YAML file has the following keys: +- ``database_name`` (optional): The database to use for testing. + - ``data``: The data that should exist in the collection under test before each test run. @@ -49,32 +57,43 @@ Each YAML file has the following keys: - ``operation``: Document describing the operation to be executed. This will have the following fields: - - ``name``: The name of the operation as defined in the specification. + - ``name``: The name of the operation as defined in the specification. The + name `db-aggregate` refers to database-level aggregation. - - ``arguments``: The names and values of arguments from the specification. + - ``object``: The name of the object to perform the operation on. Can be + "database" or "collection". Defaults to "collection" if undefined. + + - ``arguments``: The names and values of arguments from the specification. - ``outcome``: Document describing the return value and/or expected state of the collection after the operation is executed. This will have some or all of the following fields: - - ``result``: The return value from the operation. Note that some tests - specify an ``upsertedCount`` field when the server does not provide - one in the result document. 
In these cases, an ``upsertedCount`` field - with a value of 0 should be manually added to the document received - from the server to facilitate comparison. + - ``error``: If ``true``, the test should expect an error or exception. Note + that some drivers may report server-side errors as a write error within a + write result object. + + - ``result``: The return value from the operation. This will correspond to + an operation's result object as defined in the CRUD specification. This + field may be omitted if ``error`` is ``true``. If this field is present + and ``error`` is ``true`` (generally for multi-statement tests), the + result reports information about operations that succeeded before an + unrecoverable failure. In that case, drivers may choose to check the + result object if their BulkWriteException (or equivalent) provides access + to a write result object. - - ``collection``: + - ``collection``: - - ``name`` (optional): The name of the collection to verify. If this - isn't present then use the collection under test. + - ``name`` (optional): The name of the collection to verify. If this isn't + present then use the collection under test. - - ``data``: The data that should exist in the collection after the - operation has been run. + - ``data``: The data that should exist in the collection after the + operation has been run. -Use as integration tests -======================== +Expectations +============ -Running these as integration tests will require a running mongod server. Each of -these tests is valid against a standalone mongod, a replica set, and a sharded -system for server version 3.0 and later. Many of them will run against 2.6, but -some will require conditional code. +Expected results for some tests may include optional fields, such as +``insertedId`` (for InsertOneResult), ``insertedIds`` (for InsertManyResult), +and ``upsertedCount`` (for UpdateResult). Drivers that do not implement these +fields can ignore them. 
diff --git a/test/functional/spec/crud/db/db-aggregate.json b/test/functional/spec/crud/db/db-aggregate.json new file mode 100644 index 0000000000..bd5d60c032 --- /dev/null +++ b/test/functional/spec/crud/db/db-aggregate.json @@ -0,0 +1,149 @@ +{ + "database_name": "admin", + "data": [], + "minServerVersion": "3.6", + "tests": [ + { + "description": "Aggregate with $currentOp", + "operation": { + "name": "aggregate", + "object": "database", + "arguments": { + "pipeline": [ + { + "$currentOp": { + "allUsers": false, + "idleConnections": false + } + }, + { + "$match": { + "command.aggregate": { + "$eq": 1 + } + } + }, + { + "$project": { + "command": 1 + } + }, + { + "$project": { + "command.lsid": 0 + } + } + ] + } + }, + "outcome": { + "result": [ + { + "command": { + "aggregate": 1, + "pipeline": [ + { + "$currentOp": { + "allUsers": false, + "idleConnections": false + } + }, + { + "$match": { + "command.aggregate": { + "$eq": 1 + } + } + }, + { + "$project": { + "command": 1 + } + }, + { + "$project": { + "command.lsid": 0 + } + } + ], + "cursor": {}, + "$db": "admin" + } + } + ] + } + }, + { + "description": "Aggregate with $currentOp and allowDiskUse", + "operation": { + "name": "aggregate", + "object": "database", + "arguments": { + "pipeline": [ + { + "$currentOp": { + "allUsers": true, + "idleConnections": true + } + }, + { + "$match": { + "command.aggregate": { + "$eq": 1 + } + } + }, + { + "$project": { + "command": 1 + } + }, + { + "$project": { + "command.lsid": 0 + } + } + ], + "allowDiskUse": true + } + }, + "outcome": { + "result": [ + { + "command": { + "aggregate": 1, + "pipeline": [ + { + "$currentOp": { + "allUsers": true, + "idleConnections": true + } + }, + { + "$match": { + "command.aggregate": { + "$eq": 1 + } + } + }, + { + "$project": { + "command": 1 + } + }, + { + "$project": { + "command.lsid": 0 + } + } + ], + "allowDiskUse": true, + "cursor": {}, + "$db": "admin" + } + } + ] + } + } + ] +} diff --git 
a/test/functional/spec/crud/db/db-aggregate.yml b/test/functional/spec/crud/db/db-aggregate.yml new file mode 100644 index 0000000000..43237a8ba3 --- /dev/null +++ b/test/functional/spec/crud/db/db-aggregate.yml @@ -0,0 +1,60 @@ +database_name: &database_name "admin" + +data: [] +minServerVersion: '3.6' +tests: + - + description: "Aggregate with $currentOp" + operation: + name: aggregate + object: database + arguments: + pipeline: + - $currentOp: {allUsers: false, idleConnections: false} + - $match: + command.aggregate: {$eq: 1} + - $project: {command: 1} + - $project: {command.lsid: 0} + + outcome: + result: + - + command: + aggregate: 1 + pipeline: + - $currentOp: {allUsers: false, idleConnections: false} + - $match: + command.aggregate: {$eq: 1} + - $project: {command: 1} + - $project: {command.lsid: 0} + cursor: {} + $db: "admin" + + - + description: "Aggregate with $currentOp and allowDiskUse" + operation: + name: aggregate + object: database + arguments: + pipeline: + - $currentOp: {allUsers: true, idleConnections: true} + - $match: + command.aggregate: {$eq: 1} + - $project: {command: 1} + - $project: {command.lsid: 0} + allowDiskUse: true + + outcome: + result: + - + command: + aggregate: 1 + pipeline: + - $currentOp: {allUsers: true, idleConnections: true} + - $match: + command.aggregate: {$eq: 1} + - $project: {command: 1} + - $project: {command.lsid: 0} + allowDiskUse: true + cursor: {} + $db: "admin"