Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[service bus] Updating to latest OTEL for Messaging #25492

Merged
merged 8 commits into from
Apr 19, 2023
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ export class BatchingReceiverLite {
"BatchingReceiverLite.process",
args,
() => messages,
toProcessingSpanOptions(messages, this, this._connectionContext.config)
toProcessingSpanOptions(messages, this, this._connectionContext.config, "process")
);
} finally {
this._closeHandler = undefined;
Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ export class StreamingReceiver extends MessageReceiver {
"StreamReceiver.process",
operationOptions ?? {},
() => userHandlers.processMessage(message),
toProcessingSpanOptions(message, this, this._context.config)
toProcessingSpanOptions(message, this, this._context.config, "process")
);
} catch (err: any) {
this._messageHandlers().processError({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { ConnectionContext } from "../connectionContext";
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { ServiceBusReceiver } from "../receivers/receiver";
import { ServiceBusMessage, ServiceBusReceivedMessage } from "../serviceBusMessage";
import { toSpanOptions, tracingClient } from "./tracing";
import { MessagingOperationNames, toSpanOptions, tracingClient } from "./tracing";

/**
* @internal
Expand All @@ -33,7 +33,8 @@ export function instrumentMessage<T extends InstrumentableMessage>(
message: T,
options: OperationOptionsBase,
entityPath: string,
host: string
host: string,
operation: MessagingOperationNames
): {
/**
* If instrumentation was done, a copy of the message with
Expand All @@ -60,7 +61,7 @@ export function instrumentMessage<T extends InstrumentableMessage>(
const { span: messageSpan, updatedOptions } = tracingClient.startSpan(
"message",
options,
toSpanOptions({ entityPath, host }, "producer")
toSpanOptions({ entityPath, host }, operation, "producer")
);

try {
Expand Down Expand Up @@ -136,7 +137,8 @@ function* getReceivedMessages(
export function toProcessingSpanOptions(
receivedMessages: ServiceBusReceivedMessage | ServiceBusReceivedMessage[],
receiver: Pick<ServiceBusReceiver, "entityPath">,
connectionConfig: Pick<ConnectionContext["config"], "host">
connectionConfig: Pick<ConnectionContext["config"], "host">,
operation: MessagingOperationNames
): TracingSpanOptions {
const spanLinks: TracingSpanLink[] = [];
for (const receivedMessage of getReceivedMessages(receivedMessages)) {
Expand All @@ -153,6 +155,6 @@ export function toProcessingSpanOptions(
return {
spanLinks,
spanKind: "consumer",
...toSpanOptions({ host: connectionConfig.host, entityPath: receiver.entityPath }),
...toSpanOptions({ host: connectionConfig.host, entityPath: receiver.entityPath }, operation),
};
}
17 changes: 15 additions & 2 deletions sdk/servicebus/service-bus/src/diagnostics/tracing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import { createTracingClient, TracingSpanOptions, TracingSpanKind } from "@azure
import { ConnectionConfig } from "@azure/core-amqp";
import { packageJsonInfo } from "../util/constants";

/**
* The names of the operations that can be instrumented.
*/
export type MessagingOperationNames = "publish" | "receive" | "process";

/**
* The {@link TracingClient} that is used to add tracing spans.
*/
Expand All @@ -22,12 +27,20 @@ export const tracingClient = createTracingClient({
*/
export function toSpanOptions(
serviceBusConfig: Pick<ConnectionConfig, "host"> & { entityPath: string },
operation: MessagingOperationNames,
spanKind?: TracingSpanKind
): TracingSpanOptions {
const propertyName =
operation === "process" || operation === "receive"
? "messaging.source.name"
: "messaging.destination.name";

const spanOptions: TracingSpanOptions = {
spanAttributes: {
"message_bus.destination": serviceBusConfig.entityPath,
"peer.address": serviceBusConfig.host,
"messaging.system": "servicebus",
[propertyName]: serviceBusConfig.entityPath,
"messaging.operation": operation,
"net.peer.name": serviceBusConfig.host,
},
};
if (spanKind) {
Expand Down
1 change: 1 addition & 0 deletions sdk/servicebus/service-bus/src/receivers/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,7 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
spanLinks,
...toSpanOptions(
{ entityPath: this.entityPath, host: this._context.config.host },
"receive",
"client"
),
}
Expand Down
8 changes: 4 additions & 4 deletions sdk/servicebus/service-bus/src/receivers/receiverCommon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ export function completeMessage(
}),
{
spanLinks,
...toSpanOptions({ entityPath, host: context.config.host }, "client"),
...toSpanOptions({ entityPath, host: context.config.host }, "receive", "client"),
mpodwysocki marked this conversation as resolved.
Show resolved Hide resolved
}
);
}
Expand Down Expand Up @@ -141,7 +141,7 @@ export function abandonMessage(
}),
{
spanLinks,
...toSpanOptions({ entityPath, host: context.config.host }, "client"),
...toSpanOptions({ entityPath, host: context.config.host }, "receive", "client"),
mpodwysocki marked this conversation as resolved.
Show resolved Hide resolved
}
);
}
Expand Down Expand Up @@ -174,7 +174,7 @@ export function deferMessage(
}),
{
spanLinks,
...toSpanOptions({ entityPath, host: context.config.host }, "client"),
...toSpanOptions({ entityPath, host: context.config.host }, "receive", "client"),
mpodwysocki marked this conversation as resolved.
Show resolved Hide resolved
}
);
}
Expand Down Expand Up @@ -229,7 +229,7 @@ export function deadLetterMessage(
),
{
spanLinks,
...toSpanOptions({ entityPath, host: context.config.host }, "client"),
...toSpanOptions({ entityPath, host: context.config.host }, "receive", "client"),
mpodwysocki marked this conversation as resolved.
Show resolved Hide resolved
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver
"SessionReceiver.process",
options ?? {},
() => handlers.processMessage(message),
toProcessingSpanOptions(message, this, this._context.config)
toProcessingSpanOptions(message, this, this._context.config, "receive")
mpodwysocki marked this conversation as resolved.
Show resolved Hide resolved
);
},
processError,
Expand Down
5 changes: 4 additions & 1 deletion sdk/servicebus/service-bus/src/sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ export class ServiceBusSenderImpl implements ServiceBusSender {
originalMessage,
options ?? {},
this.entityPath,
this._context.config.host
this._context.config.host,
"publish"
);
const spanLinks: TracingSpanLink[] = spanContext ? [{ tracingContext: spanContext }] : [];
return tracingClient.withSpan(
Expand All @@ -230,6 +231,7 @@ export class ServiceBusSenderImpl implements ServiceBusSender {
spanLinks,
...toSpanOptions(
{ entityPath: this.entityPath, host: this._context.config.host },
"publish",
"client"
),
}
Expand Down Expand Up @@ -269,6 +271,7 @@ export class ServiceBusSenderImpl implements ServiceBusSender {
spanLinks,
...toSpanOptions(
{ entityPath: this.entityPath, host: this._context.config.host },
"publish",
"client"
),
}
Expand Down
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/src/serviceBusMessageBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@ export class ServiceBusMessageBatchImpl implements ServiceBusMessageBatch {
originalMessage,
options,
this._context.config.entityPath!,
this._context.config.host
this._context.config.host,
"process"
mpodwysocki marked this conversation as resolved.
Show resolved Hide resolved
);

// Convert ServiceBusMessage to AmqpMessage.
Expand Down
27 changes: 19 additions & 8 deletions sdk/servicebus/service-bus/test/internal/unit/tracing.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { ServiceBusReceivedMessage } from "../../../src/serviceBusMessage";
describe("tracing", () => {
describe("#getAdditionalSpanOptions", () => {
it("returns the initial set of attributes", () => {
assert.deepEqual(toSpanOptions({ entityPath: "testPath", host: "testHost" }), {
assert.deepEqual(toSpanOptions({ entityPath: "testPath", host: "testHost" }, "receive"), {
spanAttributes: {
"message_bus.destination": "testPath",
"peer.address": "testHost",
Expand All @@ -32,7 +32,7 @@ describe("tracing", () => {
it("sets the spanKind if provided", () => {
const expectedSpanKind = "client";
assert.equal(
toSpanOptions({ entityPath: "", host: "" }, expectedSpanKind).spanKind,
toSpanOptions({ entityPath: "", host: "" }, "receive", expectedSpanKind).spanKind,
expectedSpanKind
);
});
Expand All @@ -55,7 +55,8 @@ describe("tracing", () => {
instrumentedMessage,
{},
"testPath",
"testHost"
"testHost",
"receive"
);
assert.notExists(spanContext);
assert.equal(message.applicationProperties?.[TRACEPARENT_PROPERTY], "exists");
Expand All @@ -75,7 +76,8 @@ describe("tracing", () => {
{ body: "", applicationProperties: undefined },
{},
"testPath",
"testHost"
"testHost",
"receive"
);
assert.notExists(spanContext); // was not instrumented
assert.notExists(message.applicationProperties?.[TRACEPARENT_PROPERTY]);
Expand All @@ -100,7 +102,8 @@ describe("tracing", () => {
{ body: "test", applicationProperties: undefined },
{},
"testPath",
"testHost"
"testHost",
"receive"
);

assert.equal(
Expand All @@ -119,7 +122,8 @@ describe("tracing", () => {
},
{
host: "testHost",
}
},
"receive"
);
assert.equal(processingSpanOptions.spanKind, "consumer");
assert.deepEqual(processingSpanOptions.spanAttributes, {
Expand Down Expand Up @@ -153,7 +157,8 @@ describe("tracing", () => {
},
{
host: "testHost",
}
},
"receive"
);

assert.lengthOf(processingSpanOptions.spanLinks!, 1);
Expand All @@ -172,7 +177,13 @@ describe("tracing", () => {
},
};

const { message, spanContext } = instrumentMessage(alreadyInstrumentedMessage, {}, "", "");
const { message, spanContext } = instrumentMessage(
alreadyInstrumentedMessage,
{},
"",
"",
"receive"
);

assert.equal(
message,
Expand Down