Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(telemetry): add option to omit context propagation #2946

Merged
merged 10 commits into from
Dec 10, 2024
5 changes: 4 additions & 1 deletion src/classes/flow-producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,10 @@ export class FlowProducer extends EventEmitter {
...jobsOpts,
...node.opts,
parent: parent?.parentOpts,
telemetryMetadata: dstPropagationMetadata,
telemetryMetadata:
span &&
!node.opts?.telemetry?.omitContext &&
dstPropagationMetadata,
},
jobId,
);
Expand Down
3 changes: 2 additions & 1 deletion src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ export class JobScheduler extends QueueBase {
{
...opts,
repeat: filteredRepeatOpts,
telemetryMetadata: srcPropagationMedatada,
telemetryMetadata:
span && !opts?.telemetry?.omitContext && srcPropagationMedatada,
},
jobData,
iterationCount,
Expand Down
3 changes: 2 additions & 1 deletion src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const optsDecodeMap = {
kl: 'keepLogs',
rdof: 'removeDependencyOnFailure',
tm: 'telemetryMetadata',
omc: 'omitContext',
};

const optsEncodeMap = invertObject(optsDecodeMap);
Expand Down Expand Up @@ -747,7 +748,7 @@ export class Job<
this.getSpanOperation(command),
this.queue.name,
async (span, dstPropagationMedatadata) => {
if (dstPropagationMedatadata) {
if (!this.opts?.telemetry?.omitContext && dstPropagationMedatadata) {
this.scripts.execCommand(multi, 'updateJobOption', [
this.toKey(this.id),
'tm',
Expand Down
2 changes: 1 addition & 1 deletion src/classes/queue-base.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { EventEmitter } from 'events';
import { QueueBaseOptions, RedisClient, Span, Tracer } from '../interfaces';
import { MinimalQueue } from '../types';
import { JobsOptions, MinimalQueue } from '../types';
import {
delay,
DELAY_TIME_5,
Expand Down
7 changes: 5 additions & 2 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ export class Queue<
'add',
`${this.name}.${name}`,
async (span, srcPropagationMedatada) => {
if (srcPropagationMedatada) {
if (!opts?.telemetry?.omitContext && srcPropagationMedatada) {
opts = { ...opts, telemetryMetadata: srcPropagationMedatada };
}

Expand Down Expand Up @@ -404,7 +404,10 @@ export class Queue<
...this.jobsOpts,
...job.opts,
jobId: job.opts?.jobId,
tm: span && srcPropagationMedatada,
tm:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused, why is the option used here "tm" instead of "telemetryMetadata" as on the other functions above?

span &&
!job.opts?.telemetry?.omitContext &&
srcPropagationMedatada,
},
})),
);
Expand Down
6 changes: 6 additions & 0 deletions src/interfaces/base-job-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,10 @@ export interface BaseJobOptions extends DefaultJobOptions {
* TelemetryMetadata, provide for context propagation.
*/
telemetryMetadata?: string;

/**
* If `true` telemetry will omit the context propagation
* @default false
*/
omitContext?: boolean;
}
21 changes: 21 additions & 0 deletions src/types/job-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,22 @@ export type JobsOptions = BaseJobOptions & {
* If true, removes the job from its parent dependencies when it fails after all attempts.
*/
removeDependencyOnFailure?: boolean;

/**
* Telemetry options
*/
telemetry?: {
/**
* TelemetryMetadata, provide for context propagation.
*/
metadata?: string;

/**
* If `true` telemetry will omit the context propagation
* @default false
*/
omitContext?: boolean;
};
};

/**
Expand Down Expand Up @@ -61,4 +77,9 @@ export type RedisJobOptions = BaseJobOptions & {
* TelemetryMetadata, provide for context propagation.
*/
tm?: string;

/**
* Omit Context Propagation
*/
omc?: boolean;
};
133 changes: 131 additions & 2 deletions tests/test_telemetry_interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { expect, assert } from 'chai';
import { default as IORedis } from 'ioredis';
import { after, beforeEach, describe, it, before } from 'mocha';
import { v4 } from 'uuid';
import { FlowProducer, JobScheduler, Queue, Worker } from '../src/classes';
import { FlowProducer, Job, JobScheduler, Queue, Worker } from '../src/classes';
import { removeAllQueueData } from '../src/utils';
import {
Telemetry,
Expand Down Expand Up @@ -93,7 +93,7 @@ describe('Telemetry', () => {
this.options = options;
}

setSpanOnContext(ctx: any): any {
setSpanOnContext(ctx: any, omitContext?: boolean): any {
context['getSpan'] = () => this;
return { ...context, getMetadata_span: this['name'] };
}
Expand Down Expand Up @@ -260,6 +260,7 @@ describe('Telemetry', () => {
});

it('should correctly handle errors and record them in telemetry for upsertJobScheduler', async () => {
const originalCreateNextJob = JobScheduler.prototype.createNextJob;
const recordExceptionSpy = sinon.spy(
MockSpan.prototype,
'recordException',
Expand All @@ -283,6 +284,7 @@ describe('Telemetry', () => {
const recordedError = recordExceptionSpy.firstCall.args[0];
assert.equal(recordedError.message, errMessage);
} finally {
JobScheduler.prototype.createNextJob = originalCreateNextJob;
recordExceptionSpy.restore();
}
});
Expand Down Expand Up @@ -511,4 +513,131 @@ describe('Telemetry', () => {
}
});
});

describe('Omit Propagation', () => {
let fromMetadataSpy;

beforeEach(() => {
fromMetadataSpy = sinon.spy(
telemetryClient.contextManager,
'fromMetadata',
);
});

afterEach(() => fromMetadataSpy.restore());

it('should omit propagation on queue add', async () => {
const worker = new Worker(queueName, async () => 'some result', {
connection,
telemetry: telemetryClient,
});
await worker.waitUntilReady();

const job = await queue.add(
'testJob',
{ foo: 'bar' },
{ telemetry: { omitContext: true } },
);

await worker.processJob(job, 'some-token', () => false, new Set());

expect(fromMetadataSpy.callCount).to.equal(0);
await worker.close();
});

it('should omit propagation on queue addBulk', async () => {
const worker = new Worker(queueName, async () => 'some result', {
connection,
telemetry: telemetryClient,
});
await worker.waitUntilReady();

const jobs = [
{
name: 'job1',
data: { foo: 'bar' },
opts: { telemetry: { omitContext: true } },
},
];
const jobArray = await queue.addBulk(jobs);

await Promise.all(
jobArray.map(job =>
worker.processJob(job, 'some-token', () => false, new Set()),
),
);

expect(fromMetadataSpy.callCount).to.equal(0);
await worker.close();
});

it('should omit propagation on job scheduler', async () => {
const worker = new Worker(queueName, async () => 'some result', {
connection,
telemetry: telemetryClient,
});
await worker.waitUntilReady();

const jobSchedulerId = 'testJobScheduler';
const data = { foo: 'bar' };

const job = await queue.upsertJobScheduler(
jobSchedulerId,
{ every: 1000, endDate: Date.now() + 1000, limit: 1 },
{
name: 'repeatable-job',
data,
opts: { telemetry: { omitContext: true } },
},
);

await worker.processJob(job, 'some-token', () => false, new Set());

expect(fromMetadataSpy.callCount).to.equal(0);
await worker.close();
});

it('should omit propagation on flow producer', async () => {
const worker = new Worker(queueName, async () => 'some result', {
connection,
telemetry: telemetryClient,
});
await worker.waitUntilReady();

const flowProducer = new FlowProducer({
connection,
telemetry: telemetryClient,
});

const testFlow = {
name: 'parentJob',
queueName,
data: { foo: 'bar' },
children: [
{
name: 'childJob',
queueName,
data: { baz: 'qux' },
opts: { telemetry: { omitContext: true } },
},
],
opts: { telemetry: { omitContext: true } },
};

const jobNode = await flowProducer.add(testFlow);
const jobs = jobNode.children
? [jobNode.job, ...jobNode.children.map(c => c.job)]
: [jobNode.job];

await Promise.all(
jobs.map(job =>
worker.processJob(job, 'some-token', () => false, new Set()),
),
);

expect(fromMetadataSpy.callCount).to.equal(0);
await flowProducer.close();
await worker.close();
});
});
});