Skip to content

Commit

Permalink
Align with Messaging Semantic Conventions
Browse files Browse the repository at this point in the history
- Use `process`, not `receive`, for the `messaging.operation` value
in the `Worker.run` span.
- Remove `messaging.bullmq.(worker|queue).name` attributes. The name
of the worker is the name of the queue, which is represented in the
`messaging.destination` attributes.
- Use a `messaging.bullmq.operation.name` to provide BullMQ-meaningful
names of operations. This mirrors the `messaging.operation.name`
attribute in the not-yet-published v1.26.0 of the OpenTelemetry
Semantic Conventions.
- Name spans by the destination (the queue) and the operation type.
Notably, this removes the job name, which per BullMQ's documentation,
is meant to be human-meaningful and uniquely identify the job, meaning
it's very likely to be high-cardinality.
  • Loading branch information
unflxw committed Jun 6, 2024
1 parent da7c63a commit 976dee5
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 66 deletions.
3 changes: 1 addition & 2 deletions src/attributes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const worker = `${ns}.worker`;

export const BullMQAttributes = {
MESSAGING_SYSTEM: "bullmq",
MESSAGING_OPERATION_NAME: `${ns}.operation.name`,
JOB_ATTEMPTS: `${job}.attempts`,
JOB_DELAY: `${job}.delay`,
JOB_FAILED_REASON: `${job}.failedReason`,
Expand All @@ -18,8 +19,6 @@ export const BullMQAttributes = {
JOB_WAIT_CHILDREN_KEY: `${job}.parentOpts.waitChildrenKey`,
JOB_BULK_NAMES: `${job}.bulk.names`,
JOB_BULK_COUNT: `${job}.bulk.count`,
QUEUE_NAME: `${queue}.name`,
WORKER_NAME: `${worker}.name`,
WORKER_CONCURRENCY: `${worker}.concurrency`,
WORKER_LOCK_DURATION: `${worker}.lockDuration`,
WORKER_LOCK_RENEW: `${worker}.lockRenewTime`,
Expand Down
49 changes: 35 additions & 14 deletions src/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ export class BullMQInstrumentation extends InstrumentationBase {
private _patchAddJob(): (original: Function) => (...args: any) => any {
const instrumentation = this;
const tracer = instrumentation.tracer;
const action = "Job.addJob";
const operationName = "Job.addJob";
const operationType = "create";

return function addJob(original) {
return async function patch(
Expand Down Expand Up @@ -189,9 +190,13 @@ export class BullMQInstrumentation extends InstrumentationBase {
let childSpan: Span | undefined;

if (shouldCreateSpan) {
const spanName = `${this.queueName}.${this.name} ${action}`;
const spanName = `${this.queueName} ${operationType}`;
childSpan = tracer.startSpan(spanName, {
kind: SpanKind.PRODUCER,
attributes: {
[SemanticAttributes.MESSAGING_OPERATION]: operationType,
[BullMQAttributes.MESSAGING_OPERATION_NAME]: operationName,
},
});
}

Expand Down Expand Up @@ -244,7 +249,8 @@ export class BullMQInstrumentation extends InstrumentationBase {
private _patchQueueAdd(): (original: Function) => (...args: any) => any {
const instrumentation = this;
const tracer = instrumentation.tracer;
const action = "Queue.add";
const operationName = "Queue.add";
const operationType = "publish";

return function add(original) {
return async function patch(this: Queue, ...args: any): Promise<Job> {
Expand All @@ -257,9 +263,13 @@ export class BullMQInstrumentation extends InstrumentationBase {

const [name] = [...args];

const spanName = `${this.name}.${name} ${action}`;
const spanName = `${this.name} ${operationType}`;
const span = tracer.startSpan(spanName, {
kind: SpanKind.PRODUCER,
attributes: {
[SemanticAttributes.MESSAGING_OPERATION]: operationType,
[BullMQAttributes.MESSAGING_OPERATION_NAME]: operationName,
},
});

return BullMQInstrumentation.withContext(this, original, span, args);
Expand All @@ -270,7 +280,8 @@ export class BullMQInstrumentation extends InstrumentationBase {
private _patchQueueAddBulk(): (original: Function) => (...args: any) => any {
const instrumentation = this;
const tracer = instrumentation.tracer;
const action = "Queue.addBulk";
const operationName = "Queue.addBulk";
const operationType = "publish";

return function addBulk(original) {
return async function patch(
Expand All @@ -286,7 +297,7 @@ export class BullMQInstrumentation extends InstrumentationBase {

const names = args[0].map((job) => job.name);

const spanName = `${this.name} ${action}`;
const spanName = `${this.name} ${operationType}`;
const spanKind = instrumentation.shouldCreateSpan({
isBulk: true,
isFlow: false,
Expand All @@ -299,6 +310,8 @@ export class BullMQInstrumentation extends InstrumentationBase {
[SemanticAttributes.MESSAGING_SYSTEM]:
BullMQAttributes.MESSAGING_SYSTEM,
[SemanticAttributes.MESSAGING_DESTINATION]: this.name,
[SemanticAttributes.MESSAGING_OPERATION]: operationType,
[BullMQAttributes.MESSAGING_OPERATION_NAME]: operationName,
[BullMQAttributes.JOB_BULK_NAMES]: names,
[BullMQAttributes.JOB_BULK_COUNT]: names.length,
},
Expand All @@ -317,7 +330,8 @@ export class BullMQInstrumentation extends InstrumentationBase {
) => (...args: any) => any {
const instrumentation = this;
const tracer = instrumentation.tracer;
const action = "FlowProducer.add";
const operationName = "FlowProducer.add";
const operationType = "publish";

return function add(original) {
return async function patch(
Expand All @@ -332,7 +346,7 @@ export class BullMQInstrumentation extends InstrumentationBase {
return await original.apply(this, [flow, opts]);
}

const spanName = `${flow.queueName}.${flow.name} ${action}`;
const spanName = `${flow.queueName} ${operationType}`;
const spanKind = instrumentation.shouldCreateSpan({
isBulk: false,
isFlow: true,
Expand All @@ -345,6 +359,8 @@ export class BullMQInstrumentation extends InstrumentationBase {
[SemanticAttributes.MESSAGING_SYSTEM]:
BullMQAttributes.MESSAGING_SYSTEM,
[SemanticAttributes.MESSAGING_DESTINATION]: flow.queueName,
[SemanticAttributes.MESSAGING_OPERATION]: operationType,
[BullMQAttributes.MESSAGING_OPERATION_NAME]: operationName,
[BullMQAttributes.JOB_NAME]: flow.name,
},
kind: spanKind,
Expand All @@ -368,7 +384,8 @@ export class BullMQInstrumentation extends InstrumentationBase {
) => (...args: any) => any {
const instrumentation = this;
const tracer = instrumentation.tracer;
const action = "FlowProducer.addBulk";
const operationName = "FlowProducer.addBulk";
const operationType = "publish";

return function addBulk(original) {
return async function patch(
Expand All @@ -382,7 +399,7 @@ export class BullMQInstrumentation extends InstrumentationBase {
return await original.apply(this, args);
}

const spanName = `${action}`;
const spanName = `(bulk) ${operationType}`;
const spanKind = instrumentation.shouldCreateSpan({
isBulk: true,
isFlow: true,
Expand All @@ -395,6 +412,8 @@ export class BullMQInstrumentation extends InstrumentationBase {
attributes: {
[SemanticAttributes.MESSAGING_SYSTEM]:
BullMQAttributes.MESSAGING_SYSTEM,
[SemanticAttributes.MESSAGING_OPERATION]: operationType,
[BullMQAttributes.MESSAGING_OPERATION_NAME]: operationName,
[BullMQAttributes.JOB_BULK_NAMES]: names,
[BullMQAttributes.JOB_BULK_COUNT]: names.length,
},
Expand All @@ -414,6 +433,8 @@ export class BullMQInstrumentation extends InstrumentationBase {
) => (...args: any) => any {
const instrumentation = this;
const tracer = instrumentation.tracer;
const operationType = "process";
const operationName = "Worker.run";

return function patch(original) {
return async function callProcessJob(
Expand All @@ -425,14 +446,15 @@ export class BullMQInstrumentation extends InstrumentationBase {
const currentContext = context.active();
const producerContext = propagation.extract(currentContext, job.opts);

const spanName = `${job.queueName}.${job.name} Worker.${workerName} #${job.attemptsMade}`;
const spanName = `${job.queueName} ${operationType}`;
const span = tracer.startSpan(spanName, {
attributes: BullMQInstrumentation.dropInvalidAttributes({
[SemanticAttributes.MESSAGING_SYSTEM]:
BullMQAttributes.MESSAGING_SYSTEM,
[SemanticAttributes.MESSAGING_CONSUMER_ID]: workerName,
[SemanticAttributes.MESSAGING_MESSAGE_ID]: job.id,
[SemanticAttributes.MESSAGING_OPERATION]: "receive",
[SemanticAttributes.MESSAGING_OPERATION]: operationType,
[BullMQAttributes.MESSAGING_OPERATION_NAME]: operationName,
[BullMQAttributes.JOB_NAME]: job.name,
[BullMQAttributes.JOB_ATTEMPTS]: job.attemptsMade,
[BullMQAttributes.JOB_TIMESTAMP]: job.timestamp,
Expand All @@ -442,8 +464,7 @@ export class BullMQInstrumentation extends InstrumentationBase {
BullMQAttributes.JOB_OPTS,
job.opts,
),
[BullMQAttributes.QUEUE_NAME]: job.queueName,
[BullMQAttributes.WORKER_NAME]: workerName,
[SemanticAttributes.MESSAGING_DESTINATION]: job.queueName,
[BullMQAttributes.WORKER_CONCURRENCY]: this.opts?.concurrency,
[BullMQAttributes.WORKER_LOCK_DURATION]: this.opts?.lockDuration,
[BullMQAttributes.WORKER_LOCK_RENEW]: this.opts?.lockRenewTime,
Expand Down
Loading

0 comments on commit 976dee5

Please sign in to comment.