diff --git a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts index 20934458400b..c9b28011a4f8 100644 --- a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts @@ -306,7 +306,7 @@ export class BatchingReceiverLite { "BatchingReceiverLite.process", args, () => messages, - toProcessingSpanOptions(messages, this, this._connectionContext.config) + toProcessingSpanOptions(messages, this, this._connectionContext.config, "process") ); } finally { this._closeHandler = undefined; diff --git a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts index 9affb6cc0112..4f22a6b68e64 100644 --- a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts @@ -503,7 +503,7 @@ export class StreamingReceiver extends MessageReceiver { "StreamReceiver.process", operationOptions ?? {}, () => userHandlers.processMessage(message), - toProcessingSpanOptions(message, this, this._context.config) + toProcessingSpanOptions(message, this, this._context.config, "process") ); } catch (err: any) { this._messageHandlers().processError({ diff --git a/sdk/servicebus/service-bus/src/diagnostics/instrumentServiceBusMessage.ts b/sdk/servicebus/service-bus/src/diagnostics/instrumentServiceBusMessage.ts index 17ff620215cd..4c82bbec26a5 100644 --- a/sdk/servicebus/service-bus/src/diagnostics/instrumentServiceBusMessage.ts +++ b/sdk/servicebus/service-bus/src/diagnostics/instrumentServiceBusMessage.ts @@ -6,7 +6,7 @@ import { ConnectionContext } from "../connectionContext"; import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs"; import { ServiceBusReceiver } from "../receivers/receiver"; import { ServiceBusMessage, ServiceBusReceivedMessage } from "../serviceBusMessage"; -import { toSpanOptions, tracingClient } from "./tracing"; +import { MessagingOperationNames, toSpanOptions, tracingClient } from "./tracing"; /** * @internal @@ -33,7 +33,8 @@ export function instrumentMessage( message: T, options: OperationOptionsBase, entityPath: string, - host: string + host: string, + operation: MessagingOperationNames ): { /** * If instrumentation was done, a copy of the message with @@ -60,7 +61,7 @@ export function instrumentMessage( const { span: messageSpan, updatedOptions } = tracingClient.startSpan( "message", options, - toSpanOptions({ entityPath, host }, "producer") + toSpanOptions({ entityPath, host }, operation, "producer") ); try { @@ -136,7 +137,8 @@ function* getReceivedMessages( export function toProcessingSpanOptions( receivedMessages: ServiceBusReceivedMessage | ServiceBusReceivedMessage[], receiver: Pick, - connectionConfig: Pick + connectionConfig: Pick, + operation: MessagingOperationNames ): TracingSpanOptions { const spanLinks: TracingSpanLink[] = []; for (const receivedMessage of getReceivedMessages(receivedMessages)) { @@ -153,6 +155,6 @@ export function toProcessingSpanOptions( return { spanLinks, spanKind: "consumer", - ...toSpanOptions({ host: connectionConfig.host, entityPath: receiver.entityPath }), + ...toSpanOptions({ host: connectionConfig.host, entityPath: receiver.entityPath }, operation), }; } diff --git a/sdk/servicebus/service-bus/src/diagnostics/tracing.ts b/sdk/servicebus/service-bus/src/diagnostics/tracing.ts index e13582e14b1b..c19efc5e1690 100644 --- a/sdk/servicebus/service-bus/src/diagnostics/tracing.ts +++ b/sdk/servicebus/service-bus/src/diagnostics/tracing.ts @@ -5,6 +5,11 @@ import { createTracingClient, TracingSpanOptions, TracingSpanKind } from "@azure import { ConnectionConfig } from "@azure/core-amqp"; import { packageJsonInfo } from "../util/constants"; +/** + * The names of the operations that can be instrumented. + */ +export type MessagingOperationNames = "publish" | "receive" | "process" | "settle"; + /** * The {@link TracingClient} that is used to add tracing spans. */ @@ -22,12 +27,20 @@ export const tracingClient = createTracingClient({ */ export function toSpanOptions( serviceBusConfig: Pick & { entityPath: string }, + operation: MessagingOperationNames, spanKind?: TracingSpanKind ): TracingSpanOptions { + const propertyName = + operation === "process" || operation === "receive" + ? "messaging.source.name" + : "messaging.destination.name"; + const spanOptions: TracingSpanOptions = { spanAttributes: { - "message_bus.destination": serviceBusConfig.entityPath, - "peer.address": serviceBusConfig.host, + "messaging.system": "servicebus", + [propertyName]: serviceBusConfig.entityPath, + "messaging.operation": operation, + "net.peer.name": serviceBusConfig.host, }, }; if (spanKind) { diff --git a/sdk/servicebus/service-bus/src/receivers/receiver.ts b/sdk/servicebus/service-bus/src/receivers/receiver.ts index c3445d47775f..92ea5dd62b64 100644 --- a/sdk/servicebus/service-bus/src/receivers/receiver.ts +++ b/sdk/servicebus/service-bus/src/receivers/receiver.ts @@ -633,6 +633,7 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver { spanLinks, ...toSpanOptions( { entityPath: this.entityPath, host: this._context.config.host }, + "receive", "client" ), } diff --git a/sdk/servicebus/service-bus/src/receivers/receiverCommon.ts b/sdk/servicebus/service-bus/src/receivers/receiverCommon.ts index ded6113e11e6..2e62ccd3e762 100644 --- a/sdk/servicebus/service-bus/src/receivers/receiverCommon.ts +++ b/sdk/servicebus/service-bus/src/receivers/receiverCommon.ts @@ -108,7 +108,7 @@ export function completeMessage( }), { spanLinks, - ...toSpanOptions({ entityPath, host: context.config.host }, "client"), + ...toSpanOptions({ entityPath, host: context.config.host }, "settle", "client"), } ); } @@ -141,7 +141,7 @@ export function abandonMessage( }), { spanLinks, - ...toSpanOptions({ entityPath, host: context.config.host }, "client"), + ...toSpanOptions({ entityPath, host: context.config.host }, "settle", "client"), } ); } @@ -174,7 +174,7 @@ export function deferMessage( }), { spanLinks, - ...toSpanOptions({ entityPath, host: context.config.host }, "client"), + ...toSpanOptions({ entityPath, host: context.config.host }, "settle", "client"), } ); } @@ -229,7 +229,7 @@ export function deadLetterMessage( ), { spanLinks, - ...toSpanOptions({ entityPath, host: context.config.host }, "client"), + ...toSpanOptions({ entityPath, host: context.config.host }, "settle", "client"), } ); } diff --git a/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts b/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts index b8ad45933ed5..52205127a846 100644 --- a/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts +++ b/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts @@ -467,7 +467,7 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver "SessionReceiver.process", options ?? {}, () => handlers.processMessage(message), - toProcessingSpanOptions(message, this, this._context.config) + toProcessingSpanOptions(message, this, this._context.config, "process") ); }, processError, diff --git a/sdk/servicebus/service-bus/src/sender.ts b/sdk/servicebus/service-bus/src/sender.ts index 6d8129e216ae..30eda083280b 100644 --- a/sdk/servicebus/service-bus/src/sender.ts +++ b/sdk/servicebus/service-bus/src/sender.ts @@ -219,7 +219,8 @@ export class ServiceBusSenderImpl implements ServiceBusSender { originalMessage, options ?? {}, this.entityPath, - this._context.config.host + this._context.config.host, + "publish" ); const spanLinks: TracingSpanLink[] = spanContext ? [{ tracingContext: spanContext }] : []; return tracingClient.withSpan( @@ -230,6 +231,7 @@ export class ServiceBusSenderImpl implements ServiceBusSender { spanLinks, ...toSpanOptions( { entityPath: this.entityPath, host: this._context.config.host }, + "publish", "client" ), } @@ -269,6 +271,7 @@ export class ServiceBusSenderImpl implements ServiceBusSender { spanLinks, ...toSpanOptions( { entityPath: this.entityPath, host: this._context.config.host }, + "publish", "client" ), } diff --git a/sdk/servicebus/service-bus/src/serviceBusMessageBatch.ts b/sdk/servicebus/service-bus/src/serviceBusMessageBatch.ts index 23f62d0d1249..df0b64140153 100644 --- a/sdk/servicebus/service-bus/src/serviceBusMessageBatch.ts +++ b/sdk/servicebus/service-bus/src/serviceBusMessageBatch.ts @@ -244,7 +244,8 @@ export class ServiceBusMessageBatchImpl implements ServiceBusMessageBatch { originalMessage, options, this._context.config.entityPath!, - this._context.config.host + this._context.config.host, + "publish" ); // Convert ServiceBusMessage to AmqpMessage. diff --git a/sdk/servicebus/service-bus/test/internal/unit/tracing.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/tracing.spec.ts index 7ab70fa952d8..0e0628c621a4 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/tracing.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/tracing.spec.ts @@ -21,10 +21,12 @@ import { ServiceBusReceivedMessage } from "../../../src/serviceBusMessage"; describe("tracing", () => { describe("#getAdditionalSpanOptions", () => { it("returns the initial set of attributes", () => { - assert.deepEqual(toSpanOptions({ entityPath: "testPath", host: "testHost" }), { + assert.deepEqual(toSpanOptions({ entityPath: "testPath", host: "testHost" }, "receive"), { spanAttributes: { - "message_bus.destination": "testPath", - "peer.address": "testHost", + "messaging.operation": "receive", + "messaging.source.name": "testPath", + "messaging.system": "servicebus", + "net.peer.name": "testHost", }, }); }); @@ -32,7 +34,7 @@ describe("tracing", () => { it("sets the spanKind if provided", () => { const expectedSpanKind = "client"; assert.equal( - toSpanOptions({ entityPath: "", host: "" }, expectedSpanKind).spanKind, + toSpanOptions({ entityPath: "", host: "" }, "receive", expectedSpanKind).spanKind, expectedSpanKind ); }); @@ -55,7 +57,8 @@ describe("tracing", () => { instrumentedMessage, {}, "testPath", - "testHost" + "testHost", + "receive" ); assert.notExists(spanContext); assert.equal(message.applicationProperties?.[TRACEPARENT_PROPERTY], "exists"); @@ -75,7 +78,8 @@ describe("tracing", () => { { body: "", applicationProperties: undefined }, {}, "testPath", - "testHost" + "testHost", + "receive" ); assert.notExists(spanContext); // was not instrumented assert.notExists(message.applicationProperties?.[TRACEPARENT_PROPERTY]); @@ -100,7 +104,8 @@ describe("tracing", () => { { body: "test", applicationProperties: undefined }, {}, "testPath", - "testHost" + "testHost", + "receive" ); assert.equal( @@ -119,12 +124,15 @@ describe("tracing", () => { }, { host: "testHost", - } + }, + "receive" ); assert.equal(processingSpanOptions.spanKind, "consumer"); assert.deepEqual(processingSpanOptions.spanAttributes, { - "message_bus.destination": "testPath", - "peer.address": "testHost", + "messaging.operation": "receive", + "messaging.source.name": "testPath", + "messaging.system": "servicebus", + "net.peer.name": "testHost", }); }); @@ -153,7 +161,8 @@ describe("tracing", () => { }, { host: "testHost", - } + }, + "receive" ); assert.lengthOf(processingSpanOptions.spanLinks!, 1); @@ -172,7 +181,13 @@ describe("tracing", () => { }, }; - const { message, spanContext } = instrumentMessage(alreadyInstrumentedMessage, {}, "", ""); + const { message, spanContext } = instrumentMessage( + alreadyInstrumentedMessage, + {}, + "", + "", + "receive" + ); assert.equal( message,