Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-6367): enable mixed use of iteration APIs #4231

Merged
merged 15 commits into from
Sep 12, 2024
2 changes: 1 addition & 1 deletion src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ export abstract class AbstractCursor<
}

async *[Symbol.asyncIterator](): AsyncGenerator<TSchema, void, void> {
if (this.isClosed) {
if (this.closed) {
return;
}

Expand Down
344 changes: 344 additions & 0 deletions test/integration/crud/find_cursor_methods.test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';
const { expect } = require('chai');
const { filterForCommands } = require('../shared');
const { promiseWithResolvers, MongoCursorExhaustedError } = require('../../mongodb');

describe('Find Cursor', function () {
let client;
Expand Down Expand Up @@ -361,4 +362,347 @@ describe('Find Cursor', function () {
}
});
});

describe('mixing iteration APIs', function () {
let client;
let collection;
let cursor;

beforeEach(async function () {
client = this.configuration.newClient();
await client.connect();
collection = client.db('next-symbolasynciterator').collection('bar');
await collection.deleteMany({}, { writeConcern: { w: 'majority' } });
await collection.insertMany([{ a: 1 }, { a: 2 }], { writeConcern: { w: 'majority' } });
});

afterEach(async function () {
await cursor.close();
await client.close();
});

context('when all documents are retrieved in the first batch', function () {
it('allows combining iteration modes', async function () {
let count = 0;
cursor = collection.find().map(doc => {
count++;
return doc;
});

await cursor.next();
// eslint-disable-next-line no-unused-vars
for await (const _ of cursor) {
/* empty */
}

expect(count).to.equal(2);
});

it('works with next + next() loop', async function () {
let count = 0;
cursor = collection.find().map(doc => {
count++;
return doc;
});

await cursor.next();

let doc;
while ((doc = (await cursor.next()) && doc != null)) {
/** empty */
}

expect(count).to.equal(2);
});

context('when next() is called in a loop after a single invocation', function () {
it('iterates over all documents', async function () {
let count = 0;
cursor = collection.find({}).map(doc => {
count++;
return doc;
});

await cursor.next();

let doc;
while ((doc = (await cursor.next()) && doc != null)) {
/** empty */
}

expect(count).to.equal(2);
});
});

context(
'when cursor.next() is called after cursor.stream() is partially iterated',
function () {
it('returns null', async function () {
cursor = collection.find({});

const stream = cursor.stream();
const { promise, resolve, reject } = promiseWithResolvers();

stream.once('data', v => {
resolve(v);
});

stream.once('error', v => {
reject(v);
});
await promise;

expect(await cursor.next()).to.be.null;
});
}
);

context('when cursor.tryNext() is called after cursor.stream()', function () {
it('returns null', async function () {
cursor = collection.find({});

const stream = cursor.stream();
const { promise, resolve, reject } = promiseWithResolvers();

stream.once('data', v => {
resolve(v);
});

stream.once('error', v => {
reject(v);
});
await promise;

expect(await cursor.tryNext()).to.be.null;
});
});

context(
'when cursor.[Symbol.asyncIterator] is called after cursor.stream() is partly iterated',
function () {
it('returns an empty iterator', async function () {
cursor = collection.find({});

const stream = cursor.stream();
const { promise, resolve, reject } = promiseWithResolvers();

stream.once('data', v => {
resolve(v);
});

stream.once('error', v => {
reject(v);
});
await promise;

let count = 0;
// eslint-disable-next-line no-unused-vars
for await (const _ of cursor) {
count++;
}

expect(count).to.equal(0);
});
}
);

context('when cursor.readBufferedDocuments() is called after cursor.next()', function () {
it('returns an array with remaining buffered documents', async function () {
cursor = collection.find({});

await cursor.next();
const docs = cursor.readBufferedDocuments();

expect(docs).to.have.lengthOf(1);
});
});

context('when cursor.next() is called after cursor.toArray()', function () {
it('returns null', async function () {
cursor = collection.find({});

await cursor.toArray();
expect(await cursor.next()).to.be.null;
});
});

context('when cursor.tryNext is called after cursor.toArray()', function () {
it('returns null', async function () {
cursor = collection.find({});

await cursor.toArray();
expect(await cursor.tryNext()).to.be.null;
});
});

context('when cursor.[Symbol.asyncIterator] is called after cursor.toArray()', function () {
it('should not iterate', async function () {
cursor = collection.find({});

await cursor.toArray();
// eslint-disable-next-line no-unused-vars
for await (const _ of cursor) {
expect.fail('should not iterate');
}
});
});

context('when cursor.readBufferedDocuments() is called after cursor.toArray()', function () {
it('return and empty array', async function () {
cursor = collection.find({});

await cursor.toArray();
expect(cursor.readBufferedDocuments()).to.have.lengthOf(0);
});
});

context('when cursor.stream() is called after cursor.toArray()', function () {
it('returns an empty stream', async function () {
cursor = collection.find({});
await cursor.toArray();

const s = cursor.stream();
const { promise, resolve, reject } = promiseWithResolvers();

s.once('data', d => {
reject(d);
});

s.once('end', d => {
resolve(d);
});

expect(await promise).to.be.undefined;
});
});
});

context('when there are documents that are not retrieved in the first batch', function () {
it('allows combining next() and for await syntax', async function () {
let count = 0;
cursor = collection.find({}, { batchSize: 1 }).map(doc => {
count++;
return doc;
});

await cursor.next();
// eslint-disable-next-line no-unused-vars
for await (const _ of cursor) {
/* empty */
}

expect(count).to.equal(2);
});

context(
'when a cursor is partially iterated with for await and then .next() is called',
function () {
it('throws a MongoCursorExhaustedError', async function () {
cursor = collection.find({}, { batchSize: 1 });

// eslint-disable-next-line no-unused-vars
for await (const _ of cursor) {
/* empty */
break;
}

const maybeError = await cursor.next().then(
() => null,
e => e
);
expect(maybeError).to.be.instanceof(MongoCursorExhaustedError);
});
}
);

context('when next() is called in a loop after a single invocation', function () {
it('iterates over all documents', async function () {
let count = 0;
cursor = collection.find({}, { batchSize: 1 }).map(doc => {
count++;
return doc;
});

await cursor.next();

let doc;
while ((doc = (await cursor.next()) && doc != null)) {
/** empty */
}

expect(count).to.equal(2);
});
});

context('when cursor.readBufferedDocuments() is called after cursor.next()', function () {
it('returns an empty array', async function () {
cursor = collection.find({}, { batchSize: 1 });

await cursor.next();
const docs = cursor.readBufferedDocuments();

expect(docs).to.have.lengthOf(0);
});
});

context('when cursor.next() is called after cursor.toArray()', function () {
it('returns null', async function () {
cursor = collection.find({}, { batchSize: 1 });

await cursor.toArray();
expect(await cursor.next()).to.be.null;
});
});

context('when cursor.tryNext is called after cursor.toArray()', function () {
it('returns null', async function () {
cursor = collection.find({}, { batchSize: 1 });

await cursor.toArray();
expect(await cursor.tryNext()).to.be.null;
});
});

context('when cursor.[Symbol.asyncIterator] is called after cursor.toArray()', function () {
it('should not iterate', async function () {
cursor = collection.find({}, { batchSize: 1 });

await cursor.toArray();
// eslint-disable-next-line no-unused-vars
for await (const _ of cursor) {
expect.fail('should not iterate');
}
});
});

context('when cursor.readBufferedDocuments() is called after cursor.toArray()', function () {
it('return and empty array', async function () {
cursor = collection.find({}, { batchSize: 1 });

await cursor.toArray();
expect(cursor.readBufferedDocuments()).to.have.lengthOf(0);
});
});

context('when cursor.stream() is called after cursor.toArray()', function () {
it('returns an empty stream', async function () {
cursor = collection.find({}, { batchSize: 1 });
await cursor.toArray();

const s = cursor.stream();
const { promise, resolve, reject } = promiseWithResolvers();

s.once('data', d => {
reject(d);
});

s.once('end', d => {
resolve(d);
});

expect(await promise).to.be.undefined;
});
});
});
});
});