diff --git a/src/attributes.ts b/src/attributes.ts index 57b852e..40160c8 100644 --- a/src/attributes.ts +++ b/src/attributes.ts @@ -5,6 +5,7 @@ const worker = `${ns}.worker`; export const BullMQAttributes = { MESSAGING_SYSTEM: "bullmq", + MESSAGING_OPERATION_NAME: `${ns}.operation.name`, JOB_ATTEMPTS: `${job}.attempts`, JOB_DELAY: `${job}.delay`, JOB_FAILED_REASON: `${job}.failedReason`, @@ -18,8 +19,6 @@ export const BullMQAttributes = { JOB_WAIT_CHILDREN_KEY: `${job}.parentOpts.waitChildrenKey`, JOB_BULK_NAMES: `${job}.bulk.names`, JOB_BULK_COUNT: `${job}.bulk.count`, - QUEUE_NAME: `${queue}.name`, - WORKER_NAME: `${worker}.name`, WORKER_CONCURRENCY: `${worker}.concurrency`, WORKER_LOCK_DURATION: `${worker}.lockDuration`, WORKER_LOCK_RENEW: `${worker}.lockRenewTime`, diff --git a/src/instrumentation.ts b/src/instrumentation.ts index 5421f2f..b591168 100644 --- a/src/instrumentation.ts +++ b/src/instrumentation.ts @@ -151,7 +151,8 @@ export class BullMQInstrumentation extends InstrumentationBase { private _patchAddJob(): (original: Function) => (...args: any) => any { const instrumentation = this; const tracer = instrumentation.tracer; - const action = "Job.addJob"; + const operationName = "Job.addJob"; + const operationType = "create"; return function addJob(original) { return async function patch( @@ -189,9 +190,13 @@ export class BullMQInstrumentation extends InstrumentationBase { let childSpan: Span | undefined; if (shouldCreateSpan) { - const spanName = `${this.queueName}.${this.name} ${action}`; + const spanName = `${this.queueName} ${operationType}`; childSpan = tracer.startSpan(spanName, { kind: SpanKind.PRODUCER, + attributes: { + [SemanticAttributes.MESSAGING_OPERATION]: operationType, + [BullMQAttributes.MESSAGING_OPERATION_NAME]: operationName, + }, }); } @@ -244,7 +249,8 @@ export class BullMQInstrumentation extends InstrumentationBase { private _patchQueueAdd(): (original: Function) => (...args: any) => any { const instrumentation = this; const tracer = instrumentation.tracer; - const action = "Queue.add"; + const operationName = "Queue.add"; + const operationType = "publish"; return function add(original) { return async function patch(this: Queue, ...args: any): Promise { @@ -257,9 +263,13 @@ export class BullMQInstrumentation extends InstrumentationBase { const [name] = [...args]; - const spanName = `${this.name}.${name} ${action}`; + const spanName = `${this.name} ${operationType}`; const span = tracer.startSpan(spanName, { kind: SpanKind.PRODUCER, + attributes: { + [SemanticAttributes.MESSAGING_OPERATION]: operationType, + [BullMQAttributes.MESSAGING_OPERATION_NAME]: operationName, + }, }); return BullMQInstrumentation.withContext(this, original, span, args); @@ -270,7 +280,8 @@ export class BullMQInstrumentation extends InstrumentationBase { private _patchQueueAddBulk(): (original: Function) => (...args: any) => any { const instrumentation = this; const tracer = instrumentation.tracer; - const action = "Queue.addBulk"; + const operationName = "Queue.addBulk"; + const operationType = "publish"; return function addBulk(original) { return async function patch( @@ -286,7 +297,7 @@ export class BullMQInstrumentation extends InstrumentationBase { const names = args[0].map((job) => job.name); - const spanName = `${this.name} ${action}`; + const spanName = `${this.name} ${operationType}`; const spanKind = instrumentation.shouldCreateSpan({ isBulk: true, isFlow: false, @@ -299,6 +310,8 @@ export class BullMQInstrumentation extends InstrumentationBase { [SemanticAttributes.MESSAGING_SYSTEM]: BullMQAttributes.MESSAGING_SYSTEM, [SemanticAttributes.MESSAGING_DESTINATION]: this.name, + [SemanticAttributes.MESSAGING_OPERATION]: operationType, + [BullMQAttributes.MESSAGING_OPERATION_NAME]: operationName, [BullMQAttributes.JOB_BULK_NAMES]: names, [BullMQAttributes.JOB_BULK_COUNT]: names.length, }, @@ -317,7 +330,8 @@ export class BullMQInstrumentation extends InstrumentationBase { ) => (...args: any) => any { const instrumentation = this; const tracer = instrumentation.tracer; - const action = "FlowProducer.add"; + const operationName = "FlowProducer.add"; + const operationType = "publish"; return function add(original) { return async function patch( @@ -332,7 +346,7 @@ export class BullMQInstrumentation extends InstrumentationBase { return await original.apply(this, [flow, opts]); } - const spanName = `${flow.queueName}.${flow.name} ${action}`; + const spanName = `${flow.queueName} ${operationType}`; const spanKind = instrumentation.shouldCreateSpan({ isBulk: false, isFlow: true, @@ -345,6 +359,8 @@ export class BullMQInstrumentation extends InstrumentationBase { [SemanticAttributes.MESSAGING_SYSTEM]: BullMQAttributes.MESSAGING_SYSTEM, [SemanticAttributes.MESSAGING_DESTINATION]: flow.queueName, + [SemanticAttributes.MESSAGING_OPERATION]: operationType, + [BullMQAttributes.MESSAGING_OPERATION_NAME]: operationName, [BullMQAttributes.JOB_NAME]: flow.name, }, kind: spanKind, @@ -368,7 +384,8 @@ export class BullMQInstrumentation extends InstrumentationBase { ) => (...args: any) => any { const instrumentation = this; const tracer = instrumentation.tracer; - const action = "FlowProducer.addBulk"; + const operationName = "FlowProducer.addBulk"; + const operationType = "publish"; return function addBulk(original) { return async function patch( @@ -382,7 +399,7 @@ export class BullMQInstrumentation extends InstrumentationBase { return await original.apply(this, args); } - const spanName = `${action}`; + const spanName = `(bulk) ${operationType}`; const spanKind = instrumentation.shouldCreateSpan({ isBulk: true, isFlow: true, @@ -395,6 +412,8 @@ export class BullMQInstrumentation extends InstrumentationBase { attributes: { [SemanticAttributes.MESSAGING_SYSTEM]: BullMQAttributes.MESSAGING_SYSTEM, + [SemanticAttributes.MESSAGING_OPERATION]: operationType, + [BullMQAttributes.MESSAGING_OPERATION_NAME]: operationName, [BullMQAttributes.JOB_BULK_NAMES]: names, [BullMQAttributes.JOB_BULK_COUNT]: names.length, }, @@ -414,6 +433,8 @@ export class BullMQInstrumentation extends InstrumentationBase { ) => (...args: any) => any { const instrumentation = this; const tracer = instrumentation.tracer; + const operationType = "process"; + const operationName = "Worker.run"; return function patch(original) { return async function callProcessJob( @@ -425,14 +446,15 @@ export class BullMQInstrumentation extends InstrumentationBase { const currentContext = context.active(); const producerContext = propagation.extract(currentContext, job.opts); - const spanName = `${job.queueName}.${job.name} Worker.${workerName} #${job.attemptsMade}`; + const spanName = `${job.queueName} ${operationType}`; const span = tracer.startSpan(spanName, { attributes: BullMQInstrumentation.dropInvalidAttributes({ [SemanticAttributes.MESSAGING_SYSTEM]: BullMQAttributes.MESSAGING_SYSTEM, [SemanticAttributes.MESSAGING_CONSUMER_ID]: workerName, [SemanticAttributes.MESSAGING_MESSAGE_ID]: job.id, - [SemanticAttributes.MESSAGING_OPERATION]: "receive", + [SemanticAttributes.MESSAGING_OPERATION]: operationType, + [BullMQAttributes.MESSAGING_OPERATION_NAME]: operationName, [BullMQAttributes.JOB_NAME]: job.name, [BullMQAttributes.JOB_ATTEMPTS]: job.attemptsMade, [BullMQAttributes.JOB_TIMESTAMP]: job.timestamp, @@ -442,8 +464,7 @@ export class BullMQInstrumentation extends InstrumentationBase { BullMQAttributes.JOB_OPTS, job.opts, ), - [BullMQAttributes.QUEUE_NAME]: job.queueName, - [BullMQAttributes.WORKER_NAME]: workerName, + [SemanticAttributes.MESSAGING_DESTINATION]: job.queueName, [BullMQAttributes.WORKER_CONCURRENCY]: this.opts?.concurrency, [BullMQAttributes.WORKER_LOCK_DURATION]: this.opts?.lockDuration, [BullMQAttributes.WORKER_LOCK_RENEW]: this.opts?.lockRenewTime, diff --git a/test/instrumentation.test.ts b/test/instrumentation.test.ts index 6453c67..ffa0a1f 100644 --- a/test/instrumentation.test.ts +++ b/test/instrumentation.test.ts @@ -102,18 +102,43 @@ function assertContains( object: Record, pairs: Record, ) { - contextualizeError(() => { - Object.entries(pairs).forEach(([key, value]) => { - assert.deepStrictEqual(object[key], value); - }); - }, { input: object, expected: pairs }) + contextualizeError( + () => { + Object.entries(pairs).forEach(([key, value]) => { + assert.deepStrictEqual(object[key], value); + }); + }, + { actual: object, expected: pairs }, + ); +} + +// Performs a set equality comparison +function assertEqualSet(actual: Iterable, expected: Iterable) { + const actualSet = new Set(actual); + const expectedSet = new Set(expected); + + contextualizeError( + () => { + assert.strictEqual(actualSet.size, expectedSet.size); + for (const value of actual) { + assert.ok(expectedSet.has(value)); + } + }, + { + actual: actualSet.values, + expected: expectedSet.values, + }, + ); } function assertDoesNotContain(object: Record, keys: string[]) { keys.forEach((key) => { - contextualizeError(() => { - assert.strictEqual(object[key], undefined); - }, { key }) + contextualizeError( + () => { + assert.strictEqual(object[key], undefined); + }, + { key }, + ); }); } @@ -122,7 +147,7 @@ function contextualizeError(fn: () => void, context: Record) { fn(); } catch (e: any) { Object.entries(context).forEach(([key, value]) => { - e.message += `\n${key}: ${util.format(value)}` + e.message += `\n${key}: ${util.format(value)}`; }); throw e; } @@ -246,12 +271,14 @@ describe("bullmq", () => { spans.forEach(assertMessagingSystem); const queueAddSpan = spans.find( - (span) => span.name === "queueName.jobName Queue.add", + (span) => span.name === "queueName publish", ); assert.notStrictEqual(queueAddSpan, undefined); assert.strictEqual(queueAddSpan?.kind, SpanKind.PRODUCER); assertContains(queueAddSpan?.attributes!, { "messaging.destination": "queueName", + "messaging.bullmq.operation.name": "Queue.add", + "messaging.operation": "publish", "messaging.bullmq.job.name": "jobName", }); @@ -271,7 +298,7 @@ describe("bullmq", () => { const spans = memoryExporter.getFinishedSpans(); const queueAddSpan = spans.find( - (span) => span.name === "queueName.jobName Queue.add", + (span) => span.name === "queueName publish", ); assert.notStrictEqual(queueAddSpan, undefined); assertContains(queueAddSpan?.attributes!, { @@ -286,7 +313,7 @@ describe("bullmq", () => { const spans = memoryExporter.getFinishedSpans(); const queueAddSpan = spans.find( - (span) => span.name === "queueName.jobName Queue.add", + (span) => span.name === "queueName publish", ); assert.notStrictEqual(queueAddSpan, undefined); assertContains(queueAddSpan?.attributes!, { @@ -306,11 +333,13 @@ describe("bullmq", () => { spans.forEach(assertMessagingSystem); const queueAddBulkSpan = spans.find( - (span) => span.name === "queueName Queue.addBulk", + (span) => span.name === "queueName publish", ); assert.notStrictEqual(queueAddBulkSpan, undefined); assertContains(queueAddBulkSpan?.attributes!, { "messaging.destination": "queueName", + "messaging.bullmq.operation.name": "Queue.addBulk", + "messaging.operation": "publish", "messaging.bullmq.job.bulk.names": ["jobName1", "jobName2"], "messaging.bullmq.job.bulk.count": 2, }); @@ -318,17 +347,30 @@ describe("bullmq", () => { "messaging.bullmq.job.name", ]); - const jobAddSpan1 = spans.find( - (span) => span.name === "queueName.jobName1 Job.addJob", + const jobAddSpans = spans.filter( + (span) => span.name === "queueName create", ); - const jobAddSpan2 = spans.find( - (span) => span.name === "queueName.jobName2 Job.addJob", + + assert.strictEqual(jobAddSpans.length, 2); + + jobAddSpans.forEach((jobAddSpan) => { + assert.notStrictEqual(jobAddSpan, undefined); + + assertContains(jobAddSpan?.attributes!, { + "messaging.bullmq.operation.name": "Job.addJob", + "messaging.operation": "create", + }); + + assertSpanParent(jobAddSpan!, queueAddBulkSpan!); + }); + + assertEqualSet( + jobAddSpans.map( + (jobAddSpan) => jobAddSpan.attributes!["messaging.bullmq.job.name"], + ), + ["jobName1", "jobName2"], ); - assert.notStrictEqual(jobAddSpan1, undefined); - assert.notStrictEqual(jobAddSpan2, undefined); - assertSpanParent(jobAddSpan1!, queueAddBulkSpan!); - assertSpanParent(jobAddSpan2!, queueAddBulkSpan!); assertRootSpan(queueAddBulkSpan!); }); }); @@ -347,7 +389,7 @@ describe("bullmq", () => { spans.forEach(assertMessagingSystem); const queueAddBulkSpan = spans.find( - (span) => span.name === "queueName Queue.addBulk", + (span) => span.name === "queueName publish", ); assert.notStrictEqual(queueAddBulkSpan, undefined); }); @@ -453,20 +495,22 @@ describe("bullmq", () => { spans.forEach(assertMessagingSystem); const flowProducerAddSpan = spans.find( - (span) => span.name === "queueName.jobName FlowProducer.add", + (span) => span.name === "queueName publish", ); assert.notStrictEqual(flowProducerAddSpan, undefined); assertContains(flowProducerAddSpan?.attributes!, { "messaging.destination": "queueName", + "messaging.bullmq.operation.name": "FlowProducer.add", + "messaging.operation": "publish", "messaging.bullmq.job.name": "jobName", }); - const jobAddSpan = spans.find( - (span) => span.name === "queueName.jobName Job.addJob", - ); + const jobAddSpan = spans.find((span) => span.name === "queueName create"); assert.notStrictEqual(jobAddSpan, undefined); assertContains(jobAddSpan?.attributes!, { "messaging.destination": "queueName", + "messaging.bullmq.operation.name": "Job.addJob", + "messaging.operation": "create", "messaging.bullmq.job.name": "jobName", }); @@ -505,22 +549,24 @@ describe("bullmq", () => { spans.forEach(assertMessagingSystem); const flowProducerAddSpan = spans.find( - (span) => span.name === "queueName.jobName FlowProducer.add", + (span) => span.name === "queueName publish", ); assert.notStrictEqual(flowProducerAddSpan, undefined); assert.strictEqual(flowProducerAddSpan?.kind, SpanKind.INTERNAL); assertContains(flowProducerAddSpan?.attributes!, { "messaging.destination": "queueName", + "messaging.bullmq.operation.name": "FlowProducer.add", + "messaging.operation": "publish", "messaging.bullmq.job.name": "jobName", }); - const jobAddSpan = spans.find( - (span) => span.name === "queueName.jobName Job.addJob", - ); + const jobAddSpan = spans.find((span) => span.name === "queueName create"); assert.notStrictEqual(jobAddSpan, undefined); assert.strictEqual(jobAddSpan?.kind, SpanKind.PRODUCER); assertContains(jobAddSpan?.attributes!, { "messaging.destination": "queueName", + "messaging.bullmq.operation.name": "Job.addJob", + "messaging.operation": "create", "messaging.bullmq.job.name": "jobName", "messaging.bullmq.job.parentOpts.waitChildrenKey": "bull:queueName:waiting-children", @@ -536,12 +582,14 @@ describe("bullmq", () => { const jobId = jobAddSpan?.attributes!["messaging.message_id"] as string; const childJobAddSpan = spans.find( - (span) => span.name === "childQueueName.childJobName Job.addJob", + (span) => span.name === "childQueueName create", ); assert.notStrictEqual(childJobAddSpan, undefined); assert.strictEqual(childJobAddSpan?.kind, SpanKind.PRODUCER); assertContains(childJobAddSpan?.attributes!, { "messaging.destination": "childQueueName", + "messaging.bullmq.operation.name": "Job.addJob", + "messaging.operation": "create", "messaging.bullmq.job.name": "childJobName", "messaging.bullmq.job.opts.parent.id": `${jobId}`, // TODO: should this just be `queueName`, without `bull:`? @@ -557,7 +605,10 @@ describe("bullmq", () => { childJobAddSpan?.attributes!["messaging.message_id"], "unknown", ); - assert.notStrictEqual(childJobAddSpan?.attributes!["messaging.message_id"], jobId); + assert.notStrictEqual( + childJobAddSpan?.attributes!["messaging.message_id"], + jobId, + ); assertDoesNotContain(childJobAddSpan?.attributes!, [ "messaging.bullmq.job.parentOpts.waitChildrenKey", ]); @@ -587,7 +638,7 @@ describe("bullmq", () => { spans.forEach(assertMessagingSystem); const flowProducerAddSpan = spans.find( - (span) => span.name === "queueName.jobName FlowProducer.add", + (span) => span.name === "queueName publish", ); assert.notStrictEqual(flowProducerAddSpan, undefined); assert.strictEqual(flowProducerAddSpan?.kind, SpanKind.PRODUCER); @@ -605,13 +656,15 @@ describe("bullmq", () => { spans.forEach(assertMessagingSystem); const flowProducerAddBulkSpan = spans.find( - (span) => span.name === "FlowProducer.addBulk", + (span) => span.name === "(bulk) publish", ); assert.notStrictEqual(flowProducerAddBulkSpan, undefined); assert.strictEqual(flowProducerAddBulkSpan?.kind, SpanKind.INTERNAL); assertContains(flowProducerAddBulkSpan?.attributes!, { "messaging.bullmq.job.bulk.names": ["jobName1", "jobName2"], + "messaging.bullmq.operation.name": "FlowProducer.addBulk", + "messaging.operation": "publish", "messaging.bullmq.job.bulk.count": 2, }); assertDoesNotContain(flowProducerAddBulkSpan?.attributes!, [ @@ -619,19 +672,27 @@ describe("bullmq", () => { "messaging.bullmq.job.name", ]); - const jobAddSpan1 = spans.find( - (span) => span.name === "queueName.jobName1 Job.addJob", - ); - const jobAddSpan2 = spans.find( - (span) => span.name === "queueName.jobName2 Job.addJob", + const jobAddSpans = spans.filter( + (span) => span.name === "queueName create", ); - for (const jobAddSpan of [jobAddSpan1, jobAddSpan2]) { + for (const jobAddSpan of jobAddSpans) { assert.notStrictEqual(jobAddSpan, undefined); assert.strictEqual(jobAddSpan?.kind, SpanKind.PRODUCER); + assertContains(jobAddSpan?.attributes!, { + "messaging.bullmq.operation.name": "Job.addJob", + "messaging.operation": "create", + }); assertSpanParent(jobAddSpan!, flowProducerAddBulkSpan!); } + assertEqualSet( + jobAddSpans.map( + (jobAddSpan) => jobAddSpan.attributes!["messaging.bullmq.job.name"], + ), + ["jobName1", "jobName2"], + ); + assertRootSpan(flowProducerAddBulkSpan!); }); @@ -672,7 +733,7 @@ describe("bullmq", () => { spans.forEach(assertMessagingSystem); const flowProducerAddBulkSpan = spans.find( - (span) => span.name === "FlowProducer.addBulk", + (span) => span.name === "(bulk) publish", ); assert.notStrictEqual(flowProducerAddBulkSpan, undefined); assert.strictEqual(flowProducerAddBulkSpan?.kind, SpanKind.PRODUCER); @@ -729,12 +790,12 @@ describe("bullmq", () => { spans.forEach(assertMessagingSystem); const queueAddSpan = spans.find( - (span) => span.name === "queueName.testJob Queue.add", + (span) => span.name === "queueName publish", ); assert.notStrictEqual(queueAddSpan, undefined); const workerJobSpan = spans.find((span) => - span.name.includes("queueName.testJob Worker.queueName"), + span.name.includes("queueName process"), ); assert.notStrictEqual(workerJobSpan, undefined); assert.strictEqual(workerJobSpan?.kind, SpanKind.CONSUMER); @@ -742,11 +803,11 @@ describe("bullmq", () => { assertSpanLink(workerJobSpan!, queueAddSpan!); assertContains(workerJobSpan?.attributes!, { "messaging.consumer_id": "queueName", + "messaging.destination": "queueName", "messaging.message_id": "1", - "messaging.operation": "receive", + "messaging.operation": "process", + "messaging.bullmq.operation.name": "Worker.run", "messaging.bullmq.job.name": "testJob", - "messaging.bullmq.queue.name": "queueName", - "messaging.bullmq.worker.name": "queueName", "messaging.bullmq.worker.concurrency": 1, "messaging.bullmq.worker.lockDuration": 30000, "messaging.bullmq.worker.lockRenewTime": 15000, @@ -799,7 +860,7 @@ describe("bullmq", () => { const spans = memoryExporter.getFinishedSpans(); const workerJobSpan = spans.find((span) => - span.name.includes("queueName.testJob Worker.queueName"), + span.name.includes("queueName process"), ); assert.notStrictEqual(workerJobSpan, undefined); @@ -831,7 +892,7 @@ describe("bullmq", () => { const span = memoryExporter .getFinishedSpans() - .find((span) => span.name.includes("Worker.worker")); + .find((span) => span.name.includes("worker process")); const evt = span?.events.find((event) => event.name.includes("extendLock"), ); @@ -860,7 +921,7 @@ describe("bullmq", () => { const span = memoryExporter .getFinishedSpans() - .find((span) => span.name.includes("Worker.worker")); + .find((span) => span.name.includes("worker process")); const evt = span?.events.find((event) => event.name.includes("exception"), ); @@ -903,7 +964,7 @@ describe("bullmq", () => { spans.forEach(assertMessagingSystem); const jobSpans = spans.filter((span) => - span.name.includes("Worker.worker"), + span.name.includes("worker process"), ); assert.strictEqual(jobSpans.length, 2); jobSpans.forEach((span) => {