Skip to content

Commit

Permalink
Merge pull request #11488 from Uzlopak/chore-extract-and-refactor-agg…
Browse files Browse the repository at this point in the history
…regationcursor-and-querycursor

chore: extract and refactor aggregationcursor and querycursor
  • Loading branch information
vkarpov15 authored Mar 9, 2022
2 parents da0f4bd + 8d69a62 commit 3441ba0
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 85 deletions.
7 changes: 7 additions & 0 deletions test/types/aggregate.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ async function run() {
obj.name;
}

function eachAsync(): void {
Test.aggregate().cursor().eachAsync((doc) => {expectType<any>(doc);});
Test.aggregate().cursor().eachAsync((docs) => {expectType<any[]>(docs);}, { batchSize: 2 });
Test.aggregate().cursor<ITest>().eachAsync((doc) => {expectType<ITest>(doc);});
Test.aggregate().cursor<ITest>().eachAsync((docs) => {expectType<ITest[]>(docs);}, { batchSize: 2 });
}

// Aggregate.prototype.sort()
expectType<ITest[]>(await Test.aggregate<ITest>().sort('-name'));
expectType<ITest[]>(await Test.aggregate<ITest>().sort({ name: 1 }));
Expand Down
5 changes: 2 additions & 3 deletions test/types/queries.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,8 @@ function testGenericQuery(): void {
}

function eachAsync(): void {
Test.find().cursor().eachAsync((doc: ITest) => console.log(doc.name));

Test.find().cursor().eachAsync((docs: ITest[]) => console.log(docs[0].name), { batchSize: 2 });
Test.find().cursor().eachAsync((doc) => {expectType<(ITest & { _id: any; })>(doc);});
Test.find().cursor().eachAsync((docs) => {expectType<(ITest & { _id: any; })[]>(docs);}, { batchSize: 2 });
}

async function gh10617(): Promise<void> {
Expand Down
48 changes: 48 additions & 0 deletions types/cursor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import stream = require('stream');

declare module 'mongoose' {
type CursorFlag = 'tailable' | 'oplogReplay' | 'noCursorTimeout' | 'awaitData' | 'partial';

class Cursor<DocType = any, Options = never> extends stream.Readable {
[Symbol.asyncIterator](): AsyncIterableIterator<DocType>;

/**
* Adds a [cursor flag](http://mongodb.github.io/node-mongodb-native/2.2/api/Cursor.html#addCursorFlag).
* Useful for setting the `noCursorTimeout` and `tailable` flags.
*/
addCursorFlag(flag: CursorFlag, value: boolean): this;

/**
* Marks this cursor as closed. Will stop streaming and subsequent calls to
* `next()` will error.
*/
close(callback: CallbackWithoutResult): void;
close(): Promise<void>;

/**
* Execute `fn` for every document(s) in the cursor. If batchSize is provided
* `fn` will be executed for each batch of documents. If `fn` returns a promise,
* will wait for the promise to resolve before iterating on to the next one.
* Returns a promise that resolves when done.
*/
eachAsync(fn: (doc: DocType[]) => any, options: { parallel?: number, batchSize: number }, callback: CallbackWithoutResult): void;
eachAsync(fn: (doc: DocType) => any, options: { parallel?: number }, callback: CallbackWithoutResult): void;
eachAsync(fn: (doc: DocType[]) => any, options: { parallel?: number, batchSize: number }): Promise<void>;
eachAsync(fn: (doc: DocType) => any, options?: { parallel?: number }): Promise<void>;

/**
* Registers a transform function which subsequently maps documents retrieved
* via the streams interface or `.next()`
*/
map<ResultType>(fn: (res: DocType) => ResultType): Cursor<ResultType, Options>;

/**
* Get the next document from this cursor. Will return `null` when there are
* no documents left.
*/
next(callback: Callback<DocType | null>): void;
next(): Promise<DocType>;

options: Options;
}
}
85 changes: 3 additions & 82 deletions types/index.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/// <reference path="./connection.d.ts" />
/// <reference path="./cursor.ts" />
/// <reference path="./error.d.ts" />
/// <reference path="./pipelinestage.d.ts" />
/// <reference path="./schemaoptions.d.ts" />
Expand Down Expand Up @@ -1894,7 +1895,7 @@ declare module 'mongoose' {
* Returns a wrapper around a [mongodb driver cursor](http://mongodb.github.io/node-mongodb-native/2.1/api/Cursor.html).
* A QueryCursor exposes a Streams3 interface, as well as a `.next()` function.
*/
cursor(options?: any): QueryCursor<DocType>;
cursor(options?: QueryOptions): Cursor<DocType, QueryOptions>;

/**
* Declare and/or execute this query as a `deleteMany()` operation. Works like
Expand Down Expand Up @@ -2459,49 +2460,6 @@ declare module 'mongoose' {
T extends Document ? RawDocType :
T;

class QueryCursor<DocType> extends stream.Readable {
[Symbol.asyncIterator](): AsyncIterableIterator<DocType>;

/**
* Adds a [cursor flag](http://mongodb.github.io/node-mongodb-native/2.2/api/Cursor.html#addCursorFlag).
* Useful for setting the `noCursorTimeout` and `tailable` flags.
*/
addCursorFlag(flag: string, value: boolean): this;

/**
* Marks this cursor as closed. Will stop streaming and subsequent calls to
* `next()` will error.
*/
close(): Promise<void>;
close(callback: CallbackWithoutResult): void;

/**
* Execute `fn` for every document(s) in the cursor. If batchSize is provided
* `fn` will be executed for each batch of documents. If `fn` returns a promise,
* will wait for the promise to resolve before iterating on to the next one.
* Returns a promise that resolves when done.
*/
eachAsync(fn: (doc: DocType) => any, options?: { parallel?: number }): Promise<void>;
eachAsync(fn: (doc: DocType[]) => any, options: { parallel?: number, batchSize: number }): Promise<void>;
eachAsync(fn: (doc: DocType) => any, options?: { parallel?: number, batchSize?: number }, cb?: CallbackWithoutResult): void;
eachAsync(fn: (doc: DocType[]) => any, options: { parallel?: number, batchSize: number }, cb?: CallbackWithoutResult): void;

/**
* Registers a transform function which subsequently maps documents retrieved
* via the streams interface or `.next()`
*/
map<ResultType>(fn: (res: DocType) => ResultType): QueryCursor<ResultType>;

/**
* Get the next document from this cursor. Will return `null` when there are
* no documents left.
*/
next(): Promise<DocType>;
next(callback: Callback<DocType | null>): void;

options: any;
}

class Aggregate<R> {
/**
* Returns an asyncIterator for use with [`for/await/of` loops](https://thecodebarbarian.com/getting-started-with-async-iterators-in-node-js
Expand Down Expand Up @@ -2543,7 +2501,7 @@ declare module 'mongoose' {
/**
* Sets the cursor option for the aggregation query (ignored for < 2.6.0).
*/
cursor(options?: Record<string, unknown>): AggregationCursor;
cursor<DocType = any>(options?: Record<string, unknown>): Cursor<DocType>;

/** Executes the aggregate pipeline on the currently bound Model. */
exec(callback?: Callback<R>): Promise<R>;
Expand Down Expand Up @@ -2649,43 +2607,6 @@ declare module 'mongoose' {
unwind(...args: PipelineStage.Unwind['$unwind'][]): this;
}

class AggregationCursor extends stream.Readable {
/**
* Adds a [cursor flag](http://mongodb.github.io/node-mongodb-native/2.2/api/Cursor.html#addCursorFlag).
* Useful for setting the `noCursorTimeout` and `tailable` flags.
*/
addCursorFlag(flag: string, value: boolean): this;

/**
* Marks this cursor as closed. Will stop streaming and subsequent calls to
* `next()` will error.
*/
close(): Promise<void>;
close(callback: CallbackWithoutResult): void;

/**
* Execute `fn` for every document(s) in the cursor. If batchSize is provided
* `fn` will be executed for each batch of documents. If `fn` returns a promise,
* will wait for the promise to resolve before iterating on to the next one.
* Returns a promise that resolves when done.
*/
eachAsync(fn: (doc: any) => any, options?: { parallel?: number, batchSize?: number }): Promise<void>;
eachAsync(fn: (doc: any) => any, options?: { parallel?: number, batchSize?: number }, cb?: CallbackWithoutResult): void;

/**
* Registers a transform function which subsequently maps documents retrieved
* via the streams interface or `.next()`
*/
map(fn: (res: any) => any): this;

/**
* Get the next document from this cursor. Will return `null` when there are
* no documents left.
*/
next(): Promise<any>;
next(callback: Callback): void;
}

class SchemaType {
/** SchemaType constructor */
constructor(path: string, options?: AnyObject, instance?: string);
Expand Down

0 comments on commit 3441ba0

Please sign in to comment.