Skip to content

Commit

Permalink
refactor(NODE-5912): make server.command an async function (#3986)
Browse files Browse the repository at this point in the history
Co-authored-by: Neal Beeken <[email protected]>
Co-authored-by: Alena Khineika <[email protected]>
  • Loading branch information
3 people authored Feb 23, 2024
1 parent 17952d2 commit ff8b5f5
Show file tree
Hide file tree
Showing 25 changed files with 265 additions and 507 deletions.
140 changes: 36 additions & 104 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ import {
} from '../constants';
import {
type AnyError,
MONGODB_ERROR_CODES,
MongoError,
type MongoError,
MongoInvalidArgumentError,
MongoMissingCredentialsError,
MongoNetworkError,
Expand All @@ -27,7 +26,14 @@ import {
} from '../error';
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { Server } from '../sdam/server';
import { type Callback, eachAsync, List, makeCounter, TimeoutController } from '../utils';
import {
type Callback,
eachAsync,
List,
makeCounter,
promiseWithResolvers,
TimeoutController
} from '../utils';
import { connect } from './connect';
import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection';
import {
Expand Down Expand Up @@ -100,7 +106,8 @@ export interface ConnectionPoolOptions extends Omit<ConnectionOptions, 'id' | 'g

/** @internal */
export interface WaitQueueMember {
callback: Callback<Connection>;
resolve: (conn: Connection) => void;
reject: (err: AnyError) => void;
timeoutController: TimeoutController;
[kCancelled]?: boolean;
}
Expand Down Expand Up @@ -350,16 +357,18 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
* will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
* explicitly destroyed by the new owner.
*/
checkOut(callback: Callback<Connection>): void {
async checkOut(): Promise<Connection> {
this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_STARTED,
new ConnectionCheckOutStartedEvent(this)
);

const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;

const { promise, resolve, reject } = promiseWithResolvers<Connection>();
const waitQueueMember: WaitQueueMember = {
callback,
resolve,
reject,
timeoutController: new TimeoutController(waitQueueTimeoutMS)
};
waitQueueMember.timeoutController.signal.addEventListener('abort', () => {
Expand All @@ -370,7 +379,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, 'timeout')
);
waitQueueMember.callback(
waitQueueMember.reject(
new WaitQueueTimeoutError(
this.loadBalanced
? this.waitQueueErrorMetrics()
Expand All @@ -382,6 +391,8 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {

this[kWaitQueue].push(waitQueueMember);
process.nextTick(() => this.processWaitQueue());

return promise;
}

/**
Expand Down Expand Up @@ -534,115 +545,35 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}

/**
* Runs a lambda with an implicitly checked out connection, checking that connection back in when the lambda
* has completed by calling back.
*
* NOTE: please note the required signature of `fn`
*
* @remarks When in load balancer mode, connections can be pinned to cursors or transactions.
* In these cases we pass the connection in to this method to ensure it is used and a new
* connection is not checked out.
*
* @param conn - A pinned connection for use in load balancing mode.
* @param fn - A function which operates on a managed connection
* @param callback - The original callback
*/
withConnection(
conn: Connection | undefined,
fn: WithConnectionCallback,
callback: Callback<Connection>
): void {
if (conn) {
// use the provided connection, and do _not_ check it in after execution
fn(undefined, conn, (fnErr, result) => {
if (fnErr) {
return this.withReauthentication(fnErr, conn, fn, callback);
}
callback(undefined, result);
});
return;
}

this.checkOut((err, conn) => {
// don't callback with `err` here, we might want to act upon it inside `fn`
fn(err as MongoError, conn, (fnErr, result) => {
if (fnErr) {
if (conn) {
this.withReauthentication(fnErr, conn, fn, callback);
} else {
callback(fnErr);
}
} else {
callback(undefined, result);
}

if (conn) {
this.checkIn(conn);
}
});
});
}

private withReauthentication(
fnErr: AnyError,
conn: Connection,
fn: WithConnectionCallback,
callback: Callback<Connection>
) {
if (fnErr instanceof MongoError && fnErr.code === MONGODB_ERROR_CODES.Reauthenticate) {
this.reauthenticate(conn, fn, (error, res) => {
if (error) {
return callback(error);
}
callback(undefined, res);
});
} else {
callback(fnErr);
}
}

/**
* Reauthenticate on the same connection and then retry the operation.
* @internal
* Reauthenticate a connection
*/
private reauthenticate(
connection: Connection,
fn: WithConnectionCallback,
callback: Callback
): void {
async reauthenticate(connection: Connection): Promise<void> {
const authContext = connection.authContext;
if (!authContext) {
return callback(new MongoRuntimeError('No auth context found on connection.'));
throw new MongoRuntimeError('No auth context found on connection.');
}
const credentials = authContext.credentials;
if (!credentials) {
return callback(
new MongoMissingCredentialsError(
'Connection is missing credentials when asked to reauthenticate'
)
throw new MongoMissingCredentialsError(
'Connection is missing credentials when asked to reauthenticate'
);
}

const resolvedCredentials = credentials.resolveAuthMechanism(connection.hello);
const provider = this[kServer].topology.client.s.authProviders.getOrCreateProvider(
resolvedCredentials.mechanism
);

if (!provider) {
return callback(
new MongoMissingCredentialsError(
`Reauthenticate failed due to no auth provider for ${credentials.mechanism}`
)
throw new MongoMissingCredentialsError(
`Reauthenticate failed due to no auth provider for ${credentials.mechanism}`
);
}
provider.reauth(authContext).then(
() => {
fn(undefined, connection, (fnErr, fnResult) => {
if (fnErr) {
return callback(fnErr);
}
callback(undefined, fnResult);
});
},
error => callback(error)
);

await provider.reauth(authContext);

return;
}

/** Clear the min pool size timer */
Expand Down Expand Up @@ -841,7 +772,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
);
waitQueueMember.timeoutController.clear();
this[kWaitQueue].shift();
waitQueueMember.callback(error);
waitQueueMember.reject(error);
continue;
}

Expand All @@ -863,7 +794,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
waitQueueMember.timeoutController.clear();

this[kWaitQueue].shift();
waitQueueMember.callback(undefined, connection);
waitQueueMember.resolve(connection);
}
}

Expand All @@ -889,16 +820,17 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
// TODO(NODE-5192): Remove this cast
new ConnectionCheckOutFailedEvent(this, 'connectionError', err as MongoError)
);
waitQueueMember.reject(err);
} else if (connection) {
this[kCheckedOut].add(connection);
this.emitAndLog(
ConnectionPool.CONNECTION_CHECKED_OUT,
new ConnectionCheckedOutEvent(this, connection)
);
waitQueueMember.resolve(connection);
}

waitQueueMember.timeoutController.clear();
waitQueueMember.callback(err, connection);
}
process.nextTick(() => this.processWaitQueue());
});
Expand Down
2 changes: 1 addition & 1 deletion src/operations/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,6 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
cmd = decorateWithExplain(cmd, this.explain);
}

return server.commandAsync(this.ns, cmd, options);
return server.command(this.ns, cmd, options);
}
}
2 changes: 1 addition & 1 deletion src/operations/find.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ export class FindOperation extends CommandOperation<Document> {
findCommand = decorateWithExplain(findCommand, this.explain);
}

return server.commandAsync(this.ns, findCommand, {
return server.command(this.ns, findCommand, {
...this.options,
...this.bsonOptions,
documentsReturnedIn: 'firstBatch',
Expand Down
2 changes: 1 addition & 1 deletion src/operations/get_more.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ export class GetMoreOperation extends AbstractOperation {
...this.options
};

return server.commandAsync(this.ns, getMoreCmd, commandOptions);
return server.command(this.ns, getMoreCmd, commandOptions);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operations/kill_cursors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export class KillCursorsOperation extends AbstractOperation {
cursors: [this.cursorId]
};
try {
await server.commandAsync(this.ns, killCursorsCommand, { session });
await server.command(this.ns, killCursorsCommand, { session });
} catch {
// The driver should never emit errors from killCursors, this is spec-ed behavior
}
Expand Down
4 changes: 2 additions & 2 deletions src/operations/run_command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export class RunCommandOperation<T = Document> extends AbstractOperation<T> {

override async execute(server: Server, session: ClientSession | undefined): Promise<T> {
this.server = server;
return server.commandAsync(this.ns, this.command, {
return server.command(this.ns, this.command, {
...this.options,
readPreference: this.readPreference,
session
Expand All @@ -54,7 +54,7 @@ export class RunAdminCommandOperation<T = Document> extends AbstractOperation<T>

override async execute(server: Server, session: ClientSession | undefined): Promise<T> {
this.server = server;
return server.commandAsync(this.ns, this.command, {
return server.command(this.ns, this.command, {
...this.options,
readPreference: this.readPreference,
session
Expand Down
2 changes: 1 addition & 1 deletion src/operations/search_indexes/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export class CreateSearchIndexesOperation extends AbstractOperation<string[]> {
indexes: this.descriptions
};

const res = await server.commandAsync(namespace, command, { session });
const res = await server.command(namespace, command, { session });

const indexesCreated: Array<{ name: string }> = res?.indexesCreated ?? [];
return indexesCreated.map(({ name }) => name);
Expand Down
2 changes: 1 addition & 1 deletion src/operations/search_indexes/drop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export class DropSearchIndexOperation extends AbstractOperation<void> {
}

try {
await server.commandAsync(namespace, command, { session });
await server.command(namespace, command, { session });
} catch (error) {
const isNamespaceNotFoundError =
error instanceof MongoServerError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound;
Expand Down
2 changes: 1 addition & 1 deletion src/operations/search_indexes/update.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class UpdateSearchIndexOperation extends AbstractOperation<void> {
definition: this.definition
};

await server.commandAsync(namespace, command, { session });
await server.command(namespace, command, { session });
return;
}
}
Loading

0 comments on commit ff8b5f5

Please sign in to comment.