Skip to content

Commit

Permalink
[service bus] Updating to latest OTEL for Messaging (#25492)
Browse files Browse the repository at this point in the history
### Packages impacted by this PR

- @Azure/service-bus

### Issues associated with this PR


### Describe the problem that is addressed by this PR

Updating to latest [OTEL messaging
standards](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#messaging-attributes).
This is also explained in a gist from @lmolkova
[here](https://gist.github.com/lmolkova/e4215c0f44a49ef824983382762e6b92)



### What are the possible designs available to address the problem? If
there are more than one possible design, why was the one in this PR
chosen?


### Are there test cases added in this PR? _(If not, why?)_


### Provide a list of related PRs _(if any)_


### Command used to generate this PR:**_(Applicable only to SDK release
request PRs)_

### Checklists
- [x] Added impacted package name to the issue description
- [ ] Does this PR needs any fixes in the SDK Generator?** _(If so,
create an Issue in the
[Autorest/typescript](https://github.com/Azure/autorest.typescript)
repository and link it here)_
- [ ] Added a changelog (if necessary)

---------

Co-authored-by: Liudmila Molkova <[email protected]>
  • Loading branch information
mpodwysocki and lmolkova authored Apr 19, 2023
1 parent 461994e commit 0b08a4f
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 28 deletions.
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ export class BatchingReceiverLite {
"BatchingReceiverLite.process",
args,
() => messages,
toProcessingSpanOptions(messages, this, this._connectionContext.config)
toProcessingSpanOptions(messages, this, this._connectionContext.config, "process")
);
} finally {
this._closeHandler = undefined;
Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ export class StreamingReceiver extends MessageReceiver {
"StreamReceiver.process",
operationOptions ?? {},
() => userHandlers.processMessage(message),
toProcessingSpanOptions(message, this, this._context.config)
toProcessingSpanOptions(message, this, this._context.config, "process")
);
} catch (err: any) {
this._messageHandlers().processError({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { ConnectionContext } from "../connectionContext";
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { ServiceBusReceiver } from "../receivers/receiver";
import { ServiceBusMessage, ServiceBusReceivedMessage } from "../serviceBusMessage";
import { toSpanOptions, tracingClient } from "./tracing";
import { MessagingOperationNames, toSpanOptions, tracingClient } from "./tracing";

/**
* @internal
Expand All @@ -33,7 +33,8 @@ export function instrumentMessage<T extends InstrumentableMessage>(
message: T,
options: OperationOptionsBase,
entityPath: string,
host: string
host: string,
operation: MessagingOperationNames
): {
/**
* If instrumentation was done, a copy of the message with
Expand All @@ -60,7 +61,7 @@ export function instrumentMessage<T extends InstrumentableMessage>(
const { span: messageSpan, updatedOptions } = tracingClient.startSpan(
"message",
options,
toSpanOptions({ entityPath, host }, "producer")
toSpanOptions({ entityPath, host }, operation, "producer")
);

try {
Expand Down Expand Up @@ -136,7 +137,8 @@ function* getReceivedMessages(
export function toProcessingSpanOptions(
receivedMessages: ServiceBusReceivedMessage | ServiceBusReceivedMessage[],
receiver: Pick<ServiceBusReceiver, "entityPath">,
connectionConfig: Pick<ConnectionContext["config"], "host">
connectionConfig: Pick<ConnectionContext["config"], "host">,
operation: MessagingOperationNames
): TracingSpanOptions {
const spanLinks: TracingSpanLink[] = [];
for (const receivedMessage of getReceivedMessages(receivedMessages)) {
Expand All @@ -153,6 +155,6 @@ export function toProcessingSpanOptions(
return {
spanLinks,
spanKind: "consumer",
...toSpanOptions({ host: connectionConfig.host, entityPath: receiver.entityPath }),
...toSpanOptions({ host: connectionConfig.host, entityPath: receiver.entityPath }, operation),
};
}
17 changes: 15 additions & 2 deletions sdk/servicebus/service-bus/src/diagnostics/tracing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import { createTracingClient, TracingSpanOptions, TracingSpanKind } from "@azure
import { ConnectionConfig } from "@azure/core-amqp";
import { packageJsonInfo } from "../util/constants";

/**
* The names of the operations that can be instrumented.
*/
export type MessagingOperationNames = "publish" | "receive" | "process" | "settle";

/**
* The {@link TracingClient} that is used to add tracing spans.
*/
Expand All @@ -22,12 +27,20 @@ export const tracingClient = createTracingClient({
*/
export function toSpanOptions(
serviceBusConfig: Pick<ConnectionConfig, "host"> & { entityPath: string },
operation: MessagingOperationNames,
spanKind?: TracingSpanKind
): TracingSpanOptions {
const propertyName =
operation === "process" || operation === "receive"
? "messaging.source.name"
: "messaging.destination.name";

const spanOptions: TracingSpanOptions = {
spanAttributes: {
"message_bus.destination": serviceBusConfig.entityPath,
"peer.address": serviceBusConfig.host,
"messaging.system": "servicebus",
[propertyName]: serviceBusConfig.entityPath,
"messaging.operation": operation,
"net.peer.name": serviceBusConfig.host,
},
};
if (spanKind) {
Expand Down
1 change: 1 addition & 0 deletions sdk/servicebus/service-bus/src/receivers/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,7 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
spanLinks,
...toSpanOptions(
{ entityPath: this.entityPath, host: this._context.config.host },
"receive",
"client"
),
}
Expand Down
8 changes: 4 additions & 4 deletions sdk/servicebus/service-bus/src/receivers/receiverCommon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ export function completeMessage(
}),
{
spanLinks,
...toSpanOptions({ entityPath, host: context.config.host }, "client"),
...toSpanOptions({ entityPath, host: context.config.host }, "settle", "client"),
}
);
}
Expand Down Expand Up @@ -141,7 +141,7 @@ export function abandonMessage(
}),
{
spanLinks,
...toSpanOptions({ entityPath, host: context.config.host }, "client"),
...toSpanOptions({ entityPath, host: context.config.host }, "settle", "client"),
}
);
}
Expand Down Expand Up @@ -174,7 +174,7 @@ export function deferMessage(
}),
{
spanLinks,
...toSpanOptions({ entityPath, host: context.config.host }, "client"),
...toSpanOptions({ entityPath, host: context.config.host }, "settle", "client"),
}
);
}
Expand Down Expand Up @@ -229,7 +229,7 @@ export function deadLetterMessage(
),
{
spanLinks,
...toSpanOptions({ entityPath, host: context.config.host }, "client"),
...toSpanOptions({ entityPath, host: context.config.host }, "settle", "client"),
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver
"SessionReceiver.process",
options ?? {},
() => handlers.processMessage(message),
toProcessingSpanOptions(message, this, this._context.config)
toProcessingSpanOptions(message, this, this._context.config, "process")
);
},
processError,
Expand Down
5 changes: 4 additions & 1 deletion sdk/servicebus/service-bus/src/sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ export class ServiceBusSenderImpl implements ServiceBusSender {
originalMessage,
options ?? {},
this.entityPath,
this._context.config.host
this._context.config.host,
"publish"
);
const spanLinks: TracingSpanLink[] = spanContext ? [{ tracingContext: spanContext }] : [];
return tracingClient.withSpan(
Expand All @@ -230,6 +231,7 @@ export class ServiceBusSenderImpl implements ServiceBusSender {
spanLinks,
...toSpanOptions(
{ entityPath: this.entityPath, host: this._context.config.host },
"publish",
"client"
),
}
Expand Down Expand Up @@ -269,6 +271,7 @@ export class ServiceBusSenderImpl implements ServiceBusSender {
spanLinks,
...toSpanOptions(
{ entityPath: this.entityPath, host: this._context.config.host },
"publish",
"client"
),
}
Expand Down
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/src/serviceBusMessageBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@ export class ServiceBusMessageBatchImpl implements ServiceBusMessageBatch {
originalMessage,
options,
this._context.config.entityPath!,
this._context.config.host
this._context.config.host,
"publish"
);

// Convert ServiceBusMessage to AmqpMessage.
Expand Down
39 changes: 27 additions & 12 deletions sdk/servicebus/service-bus/test/internal/unit/tracing.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@ import { ServiceBusReceivedMessage } from "../../../src/serviceBusMessage";
describe("tracing", () => {
describe("#getAdditionalSpanOptions", () => {
it("returns the initial set of attributes", () => {
assert.deepEqual(toSpanOptions({ entityPath: "testPath", host: "testHost" }), {
assert.deepEqual(toSpanOptions({ entityPath: "testPath", host: "testHost" }, "receive"), {
spanAttributes: {
"message_bus.destination": "testPath",
"peer.address": "testHost",
"messaging.operation": "receive",
"messaging.source.name": "testPath",
"messaging.system": "servicebus",
"net.peer.name": "testHost",
},
});
});

it("sets the spanKind if provided", () => {
const expectedSpanKind = "client";
assert.equal(
toSpanOptions({ entityPath: "", host: "" }, expectedSpanKind).spanKind,
toSpanOptions({ entityPath: "", host: "" }, "receive", expectedSpanKind).spanKind,
expectedSpanKind
);
});
Expand All @@ -55,7 +57,8 @@ describe("tracing", () => {
instrumentedMessage,
{},
"testPath",
"testHost"
"testHost",
"receive"
);
assert.notExists(spanContext);
assert.equal(message.applicationProperties?.[TRACEPARENT_PROPERTY], "exists");
Expand All @@ -75,7 +78,8 @@ describe("tracing", () => {
{ body: "", applicationProperties: undefined },
{},
"testPath",
"testHost"
"testHost",
"receive"
);
assert.notExists(spanContext); // was not instrumented
assert.notExists(message.applicationProperties?.[TRACEPARENT_PROPERTY]);
Expand All @@ -100,7 +104,8 @@ describe("tracing", () => {
{ body: "test", applicationProperties: undefined },
{},
"testPath",
"testHost"
"testHost",
"receive"
);

assert.equal(
Expand All @@ -119,12 +124,15 @@ describe("tracing", () => {
},
{
host: "testHost",
}
},
"receive"
);
assert.equal(processingSpanOptions.spanKind, "consumer");
assert.deepEqual(processingSpanOptions.spanAttributes, {
"message_bus.destination": "testPath",
"peer.address": "testHost",
"messaging.operation": "receive",
"messaging.source.name": "testPath",
"messaging.system": "servicebus",
"net.peer.name": "testHost",
});
});

Expand Down Expand Up @@ -153,7 +161,8 @@ describe("tracing", () => {
},
{
host: "testHost",
}
},
"receive"
);

assert.lengthOf(processingSpanOptions.spanLinks!, 1);
Expand All @@ -172,7 +181,13 @@ describe("tracing", () => {
},
};

const { message, spanContext } = instrumentMessage(alreadyInstrumentedMessage, {}, "", "");
const { message, spanContext } = instrumentMessage(
alreadyInstrumentedMessage,
{},
"",
"",
"receive"
);

assert.equal(
message,
Expand Down

0 comments on commit 0b08a4f

Please sign in to comment.