From 12d980a2b0358db77cf5a983e6956291bd74948d Mon Sep 17 00:00:00 2001 From: David Luna Date: Fri, 6 Oct 2023 16:27:12 +0200 Subject: [PATCH 1/5] fix: fix context loss when cursor are accesed concurrently --- .../src/instrumentation.ts | 45 +++++++++++++++++++ .../src/internal-types.ts | 7 +++ .../test/mongodb-v4.test.ts | 36 +++++++++++++++ .../test/mongodb-v5.test.ts | 36 +++++++++++++++ 4 files changed, 124 insertions(+) diff --git a/plugins/node/opentelemetry-instrumentation-mongodb/src/instrumentation.ts b/plugins/node/opentelemetry-instrumentation-mongodb/src/instrumentation.ts index 587c0fe185..09c36908f1 100644 --- a/plugins/node/opentelemetry-instrumentation-mongodb/src/instrumentation.ts +++ b/plugins/node/opentelemetry-instrumentation-mongodb/src/instrumentation.ts @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +import { AsyncResource } from 'async_hooks'; import { context, @@ -42,6 +43,7 @@ import { MongoInternalTopology, WireProtocolInternal, V4Connection, + V4ConnectionPool, } from './internal-types'; import { V4Connect, V4Session } from './internal-types'; import { VERSION } from './version'; @@ -76,6 +78,8 @@ export class MongoDBInstrumentation extends InstrumentationBase { const { v4PatchConnect, v4UnpatchConnect } = this._getV4ConnectPatches(); const { v4PatchConnection, v4UnpatchConnection } = this._getV4ConnectionPatches(); + const { v4PatchConnectionPool, v4UnpatchConnectionPool } = + this._getV4ConnectionPoolPatches(); const { v4PatchSessions, v4UnpatchSessions } = this._getV4SessionsPatches(); return [ @@ -105,6 +109,12 @@ export class MongoDBInstrumentation extends InstrumentationBase { v4PatchConnection, v4UnpatchConnection ), + new InstrumentationNodeModuleFile( + 'mongodb/lib/cmap/connection_pool.js', + ['4.*', '5.*'], + v4PatchConnectionPool, + v4UnpatchConnectionPool + ), new InstrumentationNodeModuleFile( 'mongodb/lib/cmap/connect.js', ['4.*', '5.*'], @@ -268,6 +278,32 @@ export class MongoDBInstrumentation extends InstrumentationBase { }; } + private _getV4ConnectionPoolPatches() { + return { + v4PatchConnectionPool: (moduleExports: any, moduleVersion?: string) => { + diag.debug(`Applying patch for mongodb@${moduleVersion}`); + const poolPrototype = moduleExports.ConnectionPool.prototype; + + if (isWrapped(poolPrototype.checkOut)) { + this._unwrap(poolPrototype, 'checkOut'); + } + + this._wrap( + poolPrototype, + 'checkOut', + this._getV4ConnectionPoolCheckOut() + ); + return moduleExports; + }, + v4UnpatchConnectionPool: (moduleExports?: any, moduleVersion?: string) => { + diag.debug(`Removing internal patch for mongodb@${moduleVersion}`); + if (moduleExports === undefined) return; + + this._unwrap(moduleExports.ConnectionPool.prototype, 'checkOut'); + }, + }; + } + private _getV4ConnectPatches() { return { v4PatchConnect: (moduleExports: any, moduleVersion?: string) => { @@ -288,6 +324,15 @@ export class MongoDBInstrumentation extends InstrumentationBase { }; } + private _getV4ConnectionPoolCheckOut() { + return (original: V4ConnectionPool['checkOut']) => { + return function patchedCheckout(this: unknown, callback: any) { + const patchedCallback = AsyncResource.bind(callback); + return original.call(this, patchedCallback); + }; + }; + } + private _getV4ConnectCommand() { const instrumentation = this; diff --git a/plugins/node/opentelemetry-instrumentation-mongodb/src/internal-types.ts b/plugins/node/opentelemetry-instrumentation-mongodb/src/internal-types.ts index 03131aa12a..38e989eafb 100644 --- a/plugins/node/opentelemetry-instrumentation-mongodb/src/internal-types.ts +++ b/plugins/node/opentelemetry-instrumentation-mongodb/src/internal-types.ts @@ -184,6 +184,13 @@ export type V4Connection = { ): void; }; +// https://github.com/mongodb/node-mongodb-native/blob/v4.2.2/src/cmap/connection_pool.ts +export type V4ConnectionPool = { + // Instrumentation jsut cares about carrying the async context so + // types of callback params are not needed + checkOut: (callback: (error: any, connection: any) => void) => void; +}; + // https://github.com/mongodb/node-mongodb-native/blob/v4.2.2/src/cmap/connect.ts export type V4Connect = { connect: (options: any, callback: any) => void; diff --git a/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v4.test.ts b/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v4.test.ts index 18ba6bc9ef..d126c494b7 100644 --- a/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v4.test.ts +++ b/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v4.test.ts @@ -237,6 +237,42 @@ describe('MongoDBInstrumentation-Tracing-v4', () => { }); }); }); + + it('should create child spans for concurrent cursor operations', done => { + const queries = [{ a: 1 }, { a: 2 }, { a: 3 }]; + const tasks = queries.map((query, idx) => { + return new Promise((resolve, reject) => { + process.nextTick(() => { + const span = trace.getTracer('default').startSpan(`findRootSpan ${idx}`); + context.with(trace.setSpan(context.active(), span), () => { + collection + .find(query) + .toArray() + .then(() => { + resolve(span.end()); + }) + .catch(reject); + }); + }); + }); + }); + + Promise.all(tasks) + .then(() => { + const spans = getTestSpans(); + const roots = spans.filter(s => s.name.startsWith('findRootSpan')); + + roots.forEach(root => { + const rootId = root.spanContext().spanId; + const children = spans.filter(s => s.parentSpanId === rootId); + assert.strictEqual(children.length, 1); + }) + done(); + }) + .catch(err => { + done(err); + }) + }); }); /** Should intercept command */ diff --git a/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v5.test.ts b/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v5.test.ts index bd8271cb8d..17dac52714 100644 --- a/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v5.test.ts +++ b/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v5.test.ts @@ -243,6 +243,42 @@ describe('MongoDBInstrumentation-Tracing-v5', () => { }); }); }); + + it('should create child spans for concurrent cursor operations', done => { + const queries = [{ a: 1 }, { a: 2 }, { a: 3 }]; + const tasks = queries.map((query, idx) => { + return new Promise((resolve, reject) => { + process.nextTick(() => { + const span = trace.getTracer('default').startSpan(`findRootSpan ${idx}`); + context.with(trace.setSpan(context.active(), span), () => { + collection + .find(query) + .toArray() + .then(() => { + resolve(span.end()); + }) + .catch(reject); + }); + }); + }); + }); + + Promise.all(tasks) + .then(() => { + const spans = getTestSpans(); + const roots = spans.filter(s => s.name.startsWith('findRootSpan')); + + roots.forEach(root => { + const rootId = root.spanContext().spanId; + const children = spans.filter(s => s.parentSpanId === rootId); + assert.strictEqual(children.length, 1); + }) + done(); + }) + .catch(err => { + done(err); + }) + }); }); /** Should intercept command */ From 0b9b8d07805b80303a3e6ddb5be7968957e01f69 Mon Sep 17 00:00:00 2001 From: David Luna Date: Mon, 9 Oct 2023 13:55:48 +0200 Subject: [PATCH 2/5] chore: add comment about mongodb issue --- .../src/instrumentation.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/plugins/node/opentelemetry-instrumentation-mongodb/src/instrumentation.ts b/plugins/node/opentelemetry-instrumentation-mongodb/src/instrumentation.ts index 09c36908f1..d9fb3333c1 100644 --- a/plugins/node/opentelemetry-instrumentation-mongodb/src/instrumentation.ts +++ b/plugins/node/opentelemetry-instrumentation-mongodb/src/instrumentation.ts @@ -324,6 +324,10 @@ export class MongoDBInstrumentation extends InstrumentationBase { }; } + // This is a pack to avoid loosing the async context when the + // ConnecitonPool class queues an operation while witing for + // an available connection. This shouuld be revoved when + // https://jira.mongodb.org/browse/NODE-5639 is done. private _getV4ConnectionPoolCheckOut() { return (original: V4ConnectionPool['checkOut']) => { return function patchedCheckout(this: unknown, callback: any) { From cf7222bcb7eb3b4e1b9048125e2d681f37437a89 Mon Sep 17 00:00:00 2001 From: David Luna Date: Mon, 9 Oct 2023 13:57:00 +0200 Subject: [PATCH 3/5] chore: improve comments --- .../src/instrumentation.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/plugins/node/opentelemetry-instrumentation-mongodb/src/instrumentation.ts b/plugins/node/opentelemetry-instrumentation-mongodb/src/instrumentation.ts index d9fb3333c1..aaaf3336c4 100644 --- a/plugins/node/opentelemetry-instrumentation-mongodb/src/instrumentation.ts +++ b/plugins/node/opentelemetry-instrumentation-mongodb/src/instrumentation.ts @@ -324,9 +324,7 @@ export class MongoDBInstrumentation extends InstrumentationBase { }; } - // This is a pack to avoid loosing the async context when the - // ConnecitonPool class queues an operation while witing for - // an available connection. This shouuld be revoved when + // This patch will become unnecessary once // https://jira.mongodb.org/browse/NODE-5639 is done. private _getV4ConnectionPoolCheckOut() { return (original: V4ConnectionPool['checkOut']) => { From 917c848320c69e8bf14bb58576df6f58aadc0b35 Mon Sep 17 00:00:00 2001 From: David Luna Date: Fri, 13 Oct 2023 08:53:21 +0200 Subject: [PATCH 4/5] chore: lint fix --- .../src/internal-types.ts | 2 +- .../test/mongodb-v4.test.ts | 8 +++++--- .../test/mongodb-v5.test.ts | 8 +++++--- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/plugins/node/opentelemetry-instrumentation-mongodb/src/internal-types.ts b/plugins/node/opentelemetry-instrumentation-mongodb/src/internal-types.ts index 38e989eafb..5cb4119de5 100644 --- a/plugins/node/opentelemetry-instrumentation-mongodb/src/internal-types.ts +++ b/plugins/node/opentelemetry-instrumentation-mongodb/src/internal-types.ts @@ -186,7 +186,7 @@ export type V4Connection = { // https://github.com/mongodb/node-mongodb-native/blob/v4.2.2/src/cmap/connection_pool.ts export type V4ConnectionPool = { - // Instrumentation jsut cares about carrying the async context so + // Instrumentation just cares about carrying the async context so // types of callback params are not needed checkOut: (callback: (error: any, connection: any) => void) => void; }; diff --git a/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v4.test.ts b/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v4.test.ts index d126c494b7..724e33cffd 100644 --- a/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v4.test.ts +++ b/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v4.test.ts @@ -243,7 +243,9 @@ describe('MongoDBInstrumentation-Tracing-v4', () => { const tasks = queries.map((query, idx) => { return new Promise((resolve, reject) => { process.nextTick(() => { - const span = trace.getTracer('default').startSpan(`findRootSpan ${idx}`); + const span = trace + .getTracer('default') + .startSpan(`findRootSpan ${idx}`); context.with(trace.setSpan(context.active(), span), () => { collection .find(query) @@ -266,12 +268,12 @@ describe('MongoDBInstrumentation-Tracing-v4', () => { const rootId = root.spanContext().spanId; const children = spans.filter(s => s.parentSpanId === rootId); assert.strictEqual(children.length, 1); - }) + }); done(); }) .catch(err => { done(err); - }) + }); }); }); diff --git a/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v5.test.ts b/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v5.test.ts index 17dac52714..e9f80ad6c5 100644 --- a/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v5.test.ts +++ b/plugins/node/opentelemetry-instrumentation-mongodb/test/mongodb-v5.test.ts @@ -249,7 +249,9 @@ describe('MongoDBInstrumentation-Tracing-v5', () => { const tasks = queries.map((query, idx) => { return new Promise((resolve, reject) => { process.nextTick(() => { - const span = trace.getTracer('default').startSpan(`findRootSpan ${idx}`); + const span = trace + .getTracer('default') + .startSpan(`findRootSpan ${idx}`); context.with(trace.setSpan(context.active(), span), () => { collection .find(query) @@ -272,12 +274,12 @@ describe('MongoDBInstrumentation-Tracing-v5', () => { const rootId = root.spanContext().spanId; const children = spans.filter(s => s.parentSpanId === rootId); assert.strictEqual(children.length, 1); - }) + }); done(); }) .catch(err => { done(err); - }) + }); }); }); From 80feddf8a8b076e65eb4589e1d6bab96ff089d92 Mon Sep 17 00:00:00 2001 From: David Luna Date: Fri, 13 Oct 2023 18:16:29 +0200 Subject: [PATCH 5/5] chore: lint autofix --- .../src/instrumentation.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/plugins/node/opentelemetry-instrumentation-mongodb/src/instrumentation.ts b/plugins/node/opentelemetry-instrumentation-mongodb/src/instrumentation.ts index aaaf3336c4..3fb4d2c75a 100644 --- a/plugins/node/opentelemetry-instrumentation-mongodb/src/instrumentation.ts +++ b/plugins/node/opentelemetry-instrumentation-mongodb/src/instrumentation.ts @@ -283,7 +283,7 @@ export class MongoDBInstrumentation extends InstrumentationBase { v4PatchConnectionPool: (moduleExports: any, moduleVersion?: string) => { diag.debug(`Applying patch for mongodb@${moduleVersion}`); const poolPrototype = moduleExports.ConnectionPool.prototype; - + if (isWrapped(poolPrototype.checkOut)) { this._unwrap(poolPrototype, 'checkOut'); } @@ -295,7 +295,10 @@ export class MongoDBInstrumentation extends InstrumentationBase { ); return moduleExports; }, - v4UnpatchConnectionPool: (moduleExports?: any, moduleVersion?: string) => { + v4UnpatchConnectionPool: ( + moduleExports?: any, + moduleVersion?: string + ) => { diag.debug(`Removing internal patch for mongodb@${moduleVersion}`); if (moduleExports === undefined) return; @@ -324,7 +327,7 @@ export class MongoDBInstrumentation extends InstrumentationBase { }; } - // This patch will become unnecessary once + // This patch will become unnecessary once // https://jira.mongodb.org/browse/NODE-5639 is done. private _getV4ConnectionPoolCheckOut() { return (original: V4ConnectionPool['checkOut']) => {