Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(flows): add telemetry support #2879

Merged
merged 5 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
247 changes: 158 additions & 89 deletions src/classes/flow-producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ import {
IoredisListener,
QueueBaseOptions,
RedisClient,
Tracer,
ContextManager,
} from '../interfaces';
import { getParentKey, isRedisInstance } from '../utils';
import { getParentKey, isRedisInstance, trace } from '../utils';
import { Job } from './job';
import { KeysMap, QueueKeys } from './queue-keys';
import { RedisConnection } from './redis-connection';
import { SpanKind, TelemetryAttributes } from '../enums';

export interface AddNodeOpts {
multi: ChainableCommander;
Expand Down Expand Up @@ -95,6 +98,10 @@ export class FlowProducer extends EventEmitter {
queueKeys: QueueKeys;

protected connection: RedisConnection;
protected telemetry: {
tracer: Tracer | undefined;
contextManager: ContextManager | undefined;
};

constructor(
public opts: QueueBaseOptions = { connection: {} },
Expand Down Expand Up @@ -122,6 +129,10 @@ export class FlowProducer extends EventEmitter {
});

this.queueKeys = new QueueKeys(opts.prefix);

if (opts?.telemetry) {
this.telemetry = opts.telemetry;
}
}

emit<U extends keyof FlowProducerListener>(
Expand Down Expand Up @@ -196,19 +207,32 @@ export class FlowProducer extends EventEmitter {
? `${parentKey}:dependencies`
: undefined;

const jobsTree = this.addNode({
multi,
node: flow,
queuesOpts: opts?.queuesOptions,
parent: {
parentOpts,
parentDependenciesKey,
},
});
return trace<Promise<JobNode>>(
this.telemetry,
SpanKind.PRODUCER,
flow.queueName,
'addFlow',
flow.queueName,
async span => {
span?.setAttributes({
[TelemetryAttributes.FlowName]: flow.name,
});

const jobsTree = await this.addNode({
multi,
node: flow,
queuesOpts: opts?.queuesOptions,
parent: {
parentOpts,
parentDependenciesKey,
},
});

await multi.exec();
await multi.exec();

return jobsTree;
return jobsTree;
},
);
}

/**
Expand Down Expand Up @@ -255,11 +279,27 @@ export class FlowProducer extends EventEmitter {
const client = await this.connection.client;
const multi = client.multi();

const jobsTrees = this.addNodes(multi, flows);

await multi.exec();

return jobsTrees;
return trace<Promise<JobNode[]>>(
this.telemetry,
SpanKind.PRODUCER,
'',
'addBulkFlows',
'',
async span => {
span?.setAttributes({
[TelemetryAttributes.BulkCount]: flows.length,
[TelemetryAttributes.BulkNames]: flows
.map(flow => flow.name)
.join(','),
});

const jobsTrees = await this.addNodes(multi, flows);

await multi.exec();

return jobsTrees;
},
);
}

/**
Expand All @@ -273,70 +313,92 @@ export class FlowProducer extends EventEmitter {
* @param parent - parent data sent to children to create the "links" to their parent
* @returns
*/
protected addNode({ multi, node, parent, queuesOpts }: AddNodeOpts): JobNode {
protected async addNode({
multi,
node,
parent,
queuesOpts,
}: AddNodeOpts): Promise<JobNode> {
const prefix = node.prefix || this.opts.prefix;
const queue = this.queueFromNode(node, new QueueKeys(prefix), prefix);
const queueOpts = queuesOpts && queuesOpts[node.queueName];

const jobsOpts = queueOpts?.defaultJobOptions ?? {};
const jobId = node.opts?.jobId || v4();

const job = new this.Job(
queue,
return trace<Promise<JobNode>>(
this.telemetry,
SpanKind.PRODUCER,
node.name,
node.data,
{
...jobsOpts,
...node.opts,
parent: parent?.parentOpts,
},
jobId,
);

const parentKey = getParentKey(parent?.parentOpts);

if (node.children && node.children.length > 0) {
// Create parent job, will be a job in status "waiting-children".
const parentId = jobId;
const queueKeysParent = new QueueKeys(node.prefix || this.opts.prefix);
const waitChildrenKey = queueKeysParent.toKey(
node.queueName,
'waiting-children',
);

job.addJob(<Redis>(multi as unknown), {
parentDependenciesKey: parent?.parentDependenciesKey,
waitChildrenKey,
parentKey,
});

const parentDependenciesKey = `${queueKeysParent.toKey(
node.queueName,
parentId,
)}:dependencies`;

const children = this.addChildren({
multi,
nodes: node.children,
parent: {
parentOpts: {
id: parentId,
queue: queueKeysParent.getQueueQualifiedName(node.queueName),
'addNode',
node.queueName,
async (span, dstPropagationMetadata) => {
span?.setAttributes({
[TelemetryAttributes.JobName]: node.name,
[TelemetryAttributes.JobId]: jobId,
});

const job = new this.Job(
queue,
node.name,
node.data,
{
...jobsOpts,
...node.opts,
parent: parent?.parentOpts,
telemetryMetadata: dstPropagationMetadata,
},
parentDependenciesKey,
},
queuesOpts,
});

return { job, children };
} else {
job.addJob(<Redis>(multi as unknown), {
parentDependenciesKey: parent?.parentDependenciesKey,
parentKey,
});
jobId,
);

return { job };
}
const parentKey = getParentKey(parent?.parentOpts);

if (node.children && node.children.length > 0) {
// Create the parent job, it will be a job in status "waiting-children".
const parentId = jobId;
const queueKeysParent = new QueueKeys(
node.prefix || this.opts.prefix,
);
const waitChildrenKey = queueKeysParent.toKey(
node.queueName,
'waiting-children',
);

await job.addJob(<Redis>(multi as unknown), {
parentDependenciesKey: parent?.parentDependenciesKey,
waitChildrenKey,
parentKey,
});

const parentDependenciesKey = `${queueKeysParent.toKey(
node.queueName,
parentId,
)}:dependencies`;

const children = await this.addChildren({
multi,
nodes: node.children,
parent: {
parentOpts: {
id: parentId,
queue: queueKeysParent.getQueueQualifiedName(node.queueName),
},
parentDependenciesKey,
},
queuesOpts,
});

return { job, children };
} else {
await job.addJob(<Redis>(multi as unknown), {
parentDependenciesKey: parent?.parentDependenciesKey,
parentKey,
});

return { job };
}
},
);
}

/**
Expand All @@ -349,23 +411,28 @@ export class FlowProducer extends EventEmitter {
* @param nodes - the nodes representing jobs to be added to some queue
* @returns
*/
protected addNodes(multi: ChainableCommander, nodes: FlowJob[]): JobNode[] {
return nodes.map(node => {
const parentOpts = node?.opts?.parent;
const parentKey = getParentKey(parentOpts);
const parentDependenciesKey = parentKey
? `${parentKey}:dependencies`
: undefined;

return this.addNode({
multi,
node,
parent: {
parentOpts,
parentDependenciesKey,
},
});
});
protected addNodes(
multi: ChainableCommander,
nodes: FlowJob[],
): Promise<JobNode[]> {
return Promise.all(
nodes.map(node => {
const parentOpts = node?.opts?.parent;
const parentKey = getParentKey(parentOpts);
const parentDependenciesKey = parentKey
? `${parentKey}:dependencies`
: undefined;

return this.addNode({
multi,
node,
parent: {
parentOpts,
parentDependenciesKey,
},
});
}),
);
}

private async getNode(client: RedisClient, node: NodeOpts): Promise<JobNode> {
Expand Down Expand Up @@ -406,7 +473,9 @@ export class FlowProducer extends EventEmitter {
}

private addChildren({ multi, nodes, parent, queuesOpts }: AddChildrenOpts) {
return nodes.map(node => this.addNode({ multi, node, parent, queuesOpts }));
return Promise.all(
nodes.map(node => this.addNode({ multi, node, parent, queuesOpts })),
);
}

private getChildren(
Expand Down
Loading
Loading