Skip to content

Commit

Permalink
fix: fix context loss when cursor are accesed concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
david-luna committed Oct 6, 2023
1 parent 4503d3e commit 12d980a
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { AsyncResource } from 'async_hooks';

import {
context,
Expand Down Expand Up @@ -42,6 +43,7 @@ import {
MongoInternalTopology,
WireProtocolInternal,
V4Connection,
V4ConnectionPool,
} from './internal-types';
import { V4Connect, V4Session } from './internal-types';
import { VERSION } from './version';
Expand Down Expand Up @@ -76,6 +78,8 @@ export class MongoDBInstrumentation extends InstrumentationBase {
const { v4PatchConnect, v4UnpatchConnect } = this._getV4ConnectPatches();
const { v4PatchConnection, v4UnpatchConnection } =
this._getV4ConnectionPatches();
const { v4PatchConnectionPool, v4UnpatchConnectionPool } =
this._getV4ConnectionPoolPatches();
const { v4PatchSessions, v4UnpatchSessions } = this._getV4SessionsPatches();

return [
Expand Down Expand Up @@ -105,6 +109,12 @@ export class MongoDBInstrumentation extends InstrumentationBase {
v4PatchConnection,
v4UnpatchConnection
),
new InstrumentationNodeModuleFile<V4ConnectionPool>(
'mongodb/lib/cmap/connection_pool.js',
['4.*', '5.*'],
v4PatchConnectionPool,
v4UnpatchConnectionPool
),
new InstrumentationNodeModuleFile<V4Connect>(
'mongodb/lib/cmap/connect.js',
['4.*', '5.*'],
Expand Down Expand Up @@ -268,6 +278,32 @@ export class MongoDBInstrumentation extends InstrumentationBase {
};
}

private _getV4ConnectionPoolPatches<T extends V4ConnectionPool>() {
return {
v4PatchConnectionPool: (moduleExports: any, moduleVersion?: string) => {
diag.debug(`Applying patch for mongodb@${moduleVersion}`);
const poolPrototype = moduleExports.ConnectionPool.prototype;

if (isWrapped(poolPrototype.checkOut)) {
this._unwrap(poolPrototype, 'checkOut');
}

this._wrap(
poolPrototype,
'checkOut',
this._getV4ConnectionPoolCheckOut()
);
return moduleExports;
},
v4UnpatchConnectionPool: (moduleExports?: any, moduleVersion?: string) => {
diag.debug(`Removing internal patch for mongodb@${moduleVersion}`);
if (moduleExports === undefined) return;

this._unwrap(moduleExports.ConnectionPool.prototype, 'checkOut');
},
};
}

private _getV4ConnectPatches<T extends V4Connect>() {
return {
v4PatchConnect: (moduleExports: any, moduleVersion?: string) => {
Expand All @@ -288,6 +324,15 @@ export class MongoDBInstrumentation extends InstrumentationBase {
};
}

private _getV4ConnectionPoolCheckOut() {
return (original: V4ConnectionPool['checkOut']) => {
return function patchedCheckout(this: unknown, callback: any) {
const patchedCallback = AsyncResource.bind(callback);
return original.call(this, patchedCallback);
};
};
}

private _getV4ConnectCommand() {
const instrumentation = this;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,13 @@ export type V4Connection = {
): void;
};

// https://github.com/mongodb/node-mongodb-native/blob/v4.2.2/src/cmap/connection_pool.ts
export type V4ConnectionPool = {
// Instrumentation jsut cares about carrying the async context so
// types of callback params are not needed
checkOut: (callback: (error: any, connection: any) => void) => void;
};

// https://github.com/mongodb/node-mongodb-native/blob/v4.2.2/src/cmap/connect.ts
export type V4Connect = {
connect: (options: any, callback: any) => void;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,42 @@ describe('MongoDBInstrumentation-Tracing-v4', () => {
});
});
});

it('should create child spans for concurrent cursor operations', done => {
const queries = [{ a: 1 }, { a: 2 }, { a: 3 }];
const tasks = queries.map((query, idx) => {
return new Promise((resolve, reject) => {
process.nextTick(() => {
const span = trace.getTracer('default').startSpan(`findRootSpan ${idx}`);
context.with(trace.setSpan(context.active(), span), () => {
collection
.find(query)
.toArray()
.then(() => {
resolve(span.end());
})
.catch(reject);
});
});
});
});

Promise.all(tasks)
.then(() => {
const spans = getTestSpans();
const roots = spans.filter(s => s.name.startsWith('findRootSpan'));

roots.forEach(root => {
const rootId = root.spanContext().spanId;
const children = spans.filter(s => s.parentSpanId === rootId);
assert.strictEqual(children.length, 1);
})
done();
})
.catch(err => {
done(err);
})
});
});

/** Should intercept command */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,42 @@ describe('MongoDBInstrumentation-Tracing-v5', () => {
});
});
});

it('should create child spans for concurrent cursor operations', done => {
const queries = [{ a: 1 }, { a: 2 }, { a: 3 }];
const tasks = queries.map((query, idx) => {
return new Promise((resolve, reject) => {
process.nextTick(() => {
const span = trace.getTracer('default').startSpan(`findRootSpan ${idx}`);
context.with(trace.setSpan(context.active(), span), () => {
collection
.find(query)
.toArray()
.then(() => {
resolve(span.end());
})
.catch(reject);
});
});
});
});

Promise.all(tasks)
.then(() => {
const spans = getTestSpans();
const roots = spans.filter(s => s.name.startsWith('findRootSpan'));

roots.forEach(root => {
const rootId = root.spanContext().spanId;
const children = spans.filter(s => s.parentSpanId === rootId);
assert.strictEqual(children.length, 1);
})
done();
})
.catch(err => {
done(err);
})
});
});

/** Should intercept command */
Expand Down

0 comments on commit 12d980a

Please sign in to comment.