Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Service Bus] Tracing for send API #11651

Merged
merged 25 commits into from
Oct 8, 2020
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
a8c1d5d
tracing for trySend
HarshaNalluru Oct 5, 2020
a73fd31
tracing for send API
HarshaNalluru Oct 5, 2020
e46e805
API Report
HarshaNalluru Oct 5, 2020
4af1738
Pass tracing in tryAdd for array of messages, fx tryAdd, added new Pa…
HarshaNalluru Oct 6, 2020
c7d3a14
extractSpanContextFromServiceBusMessage tests
HarshaNalluru Oct 6, 2020
b5f77a8
test utils setTracerForTest
HarshaNalluru Oct 6, 2020
cd71e42
tracing tests
HarshaNalluru Oct 6, 2020
d631ee5
messageSpan tests
HarshaNalluru Oct 6, 2020
8375ed8
API Report
HarshaNalluru Oct 6, 2020
8038315
inline TryAddOptions
HarshaNalluru Oct 6, 2020
6fa23f7
API Report
HarshaNalluru Oct 6, 2020
a8b34fc
FIX ServiceBusMessage validations tests
HarshaNalluru Oct 6, 2020
d04a38e
changelog
HarshaNalluru Oct 6, 2020
9742d97
Update sdk/servicebus/service-bus/src/modelsToBeSharedWithEventHubs.ts
HarshaNalluru Oct 6, 2020
f262e45
sender link close - cleaning up new link - bug fix
HarshaNalluru Oct 6, 2020
ba4fef7
Merge branch 'harshan/sb/issue/tracing-send' of https://github.com/Ha…
HarshaNalluru Oct 6, 2020
e46deb4
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Oct 6, 2020
fea2177
let -> const
HarshaNalluru Oct 6, 2020
5c5795f
createSendSpan -> models shared with event-hubs
HarshaNalluru Oct 6, 2020
6b2409a
Changelog
HarshaNalluru Oct 7, 2020
7de9b6f
revert f262e4 and move onDetached to before calling refreshConnection
HarshaNalluru Oct 8, 2020
d1af28e
remove linentity - calling close during intialization
HarshaNalluru Oct 8, 2020
060842e
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Oct 8, 2020
3197fbc
skip the flaky test
HarshaNalluru Oct 8, 2020
84e48aa
Trigger pipeline(dummy commit)
HarshaNalluru Oct 8, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion sdk/servicebus/service-bus/review/service-bus.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import { PageSettings } from '@azure/core-paging';
import { PipelineOptions } from '@azure/core-http';
import { RetryOptions } from '@azure/core-amqp';
import { ServiceClient } from '@azure/core-http';
import { Span } from '@opentelemetry/api';
import { SpanContext } from '@opentelemetry/api';
import { TokenCredential } from '@azure/core-amqp';
import { TokenType } from '@azure/core-amqp';
import { UserAgentOptions } from '@azure/core-http';
Expand Down Expand Up @@ -226,6 +228,11 @@ export { OperationOptions }
// @public
export type OperationOptionsBase = Pick<OperationOptions, "abortSignal" | "tracingOptions">;

// @public
export interface ParentSpanOptions {
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved
parentSpan?: Span | SpanContext | null;
}

// @public
export interface PeekMessagesOptions extends OperationOptionsBase {
fromSequenceNumber?: Long;
Expand Down Expand Up @@ -395,8 +402,10 @@ export interface ServiceBusMessageBatch {
// @internal
_generateMessage(): Buffer;
readonly maxSizeInBytes: number;
// @internal
readonly _messageSpanContexts: SpanContext[];
readonly sizeInBytes: number;
tryAdd(message: ServiceBusMessage): boolean;
tryAdd(message: ServiceBusMessage, options?: TryAddOptions): boolean;
}

// @public
Expand Down Expand Up @@ -577,6 +586,10 @@ export interface TopicRuntimeProperties {
export interface TopicRuntimePropertiesResponse extends TopicRuntimeProperties, Response {
}

// @public
export interface TryAddOptions extends ParentSpanOptions {
}

export { WebSocketImpl }

export { WebSocketOptions }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { extractSpanContextFromTraceParentHeader, getTraceParentHeader } from "@azure/core-tracing";
import { Span, SpanContext } from "@opentelemetry/api";
import { ServiceBusMessage } from "../serviceBusMessage";

/**
* @ignore
*/
export const TRACEPARENT_PROPERTY = "Diagnostic-Id";

/**
* Populates the `ServiceBusMessage` with `SpanContext` info to support trace propagation.
* Creates and returns a copy of the passed in `ServiceBusMessage` unless the `ServiceBusMessage`
* has already been instrumented.
* @param message The `ServiceBusMessage` to instrument.
* @param span The `Span` containing the context to propagate tracing information.
* @ignore
* @internal
*/
export function instrumentServiceBusMessage(
message: ServiceBusMessage,
span: Span
): ServiceBusMessage {
if (message.properties && message.properties[TRACEPARENT_PROPERTY]) {
return message;
}

// create a copy so the original isn't modified
message = { ...message, properties: { ...message.properties } };

const traceParent = getTraceParentHeader(span.context());
if (traceParent) {
message.properties![TRACEPARENT_PROPERTY] = traceParent;
}

return message;
}

/**
* Extracts the `SpanContext` from an `ServiceBusMessage` if the context exists.
* @param message An individual `ServiceBusMessage` object.
* @internal
* @ignore
*/
export function extractSpanContextFromServiceBusMessage(
message: ServiceBusMessage
): SpanContext | undefined {
if (!message.properties || !message.properties[TRACEPARENT_PROPERTY]) {
return;
}

const diagnosticId = message.properties[TRACEPARENT_PROPERTY] as string;
return extractSpanContextFromTraceParentHeader(diagnosticId);
}
27 changes: 27 additions & 0 deletions sdk/servicebus/service-bus/src/diagnostics/messageSpan.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { ConnectionConfig } from "@azure/core-amqp";
import { getTracer } from "@azure/core-tracing";
import { Span, SpanContext, SpanKind } from "@opentelemetry/api";

/**
* @internal
* @ignore
*/
export function createMessageSpan(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feedback not for this PR, but a note for both Event Hubs & Service Bus for which we can log a generic issue on improvements and make changes for both packages at once in the future:

  • The name createMessageSpan() is slightly misleading as it may indicate that I can use it at any time when a message is involved. In reality, this method is tied to the "producer" kind, so is usable only when sending message.
  • Also, jsdoc for this method would be helpful
  • Why is config optional here? I would imagine that we would always have a ConnectionConfig

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes to the first two!

Why is config optional here? I would imagine that we would always have a ConnectionConfig

parentSpan is not a required parameter but feels primary, I believe that was the reason Chris made the config optional in event-hubs. I can make it the first param in both service-bus and event-hubs and make it required in a later PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logged #11687

parentSpan?: Span | SpanContext | null,
config?: Pick<ConnectionConfig, "entityPath" | "host">
): Span {
const tracer = getTracer();
const span = tracer.startSpan("Azure.ServiceBus.message", {
kind: SpanKind.PRODUCER,
parent: parentSpan
});
span.setAttribute("az.namespace", "Microsoft.ServiceBus");
if (config) {
span.setAttribute("message_bus.destination", config.entityPath);
span.setAttribute("peer.address", config.host);
}
return span;
}
6 changes: 5 additions & 1 deletion sdk/servicebus/service-bus/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ export {
SubQueue,
SubscribeOptions
} from "./models";
export { OperationOptionsBase } from "./modelsToBeSharedWithEventHubs";
export {
OperationOptionsBase,
TryAddOptions,
ParentSpanOptions
} from "./modelsToBeSharedWithEventHubs";
export { ServiceBusReceiver } from "./receivers/receiver";
export { ServiceBusSessionReceiver } from "./receivers/sessionReceiver";
export { ServiceBusSender } from "./sender";
Expand Down
20 changes: 18 additions & 2 deletions sdk/servicebus/service-bus/src/modelsToBeSharedWithEventHubs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import { Span, SpanContext } from "@opentelemetry/api";
import { OperationOptions } from "@azure/core-http";
import { OperationTracingOptions } from "@azure/core-tracing";

/**
* NOTE: This type is intended to mirror the relevant fields and structure from @azure/core-http OperationOptions
Expand All @@ -18,7 +19,22 @@ export type OperationOptionsBase = Pick<OperationOptions, "abortSignal" | "traci
* @ignore
*/
export function getParentSpan(
options: Pick<OperationOptionsBase, "tracingOptions">
options?: OperationTracingOptions
): Span | SpanContext | null | undefined {
return options.tracingOptions?.spanOptions?.parent;
return options?.spanOptions?.parent;
}

/**
* The set of options to manually propagate `Span` context for distributed tracing.
* - `parentSpan` : The `Span` or `SpanContext` for the operation to use as a `parent` when creating its own span.
HarshaNalluru marked this conversation as resolved.
Show resolved Hide resolved
*/
export interface ParentSpanOptions {
/**
* The `Span` or `SpanContext` to use as the `parent` of any spans created while calling operations that make a request to the service.
*/
parentSpan?: Span | SpanContext | null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed in Event Hubs that null is not an option for parentSpan. Why do we have this here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are other places where we allow the span to be "null" which is a valid value and it signifies the rootSpan.

Moreover, I needed to allow this "null" since I'm calling getParentSpan() for the tryAdd over array of messages which may return a null value.

In event-hubs, this tracing for "array of messages" case is duplicated.
"null" is a valid value and event-hubs should also allow this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll log an issue for fixing it in event-hubs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logged #11687

}
/**
* Options to configure the behavior of the `tryAdd` method on the `ServiceBusMessageBatch` class.
*/
export interface TryAddOptions extends ParentSpanOptions {}
69 changes: 58 additions & 11 deletions sdk/servicebus/service-bus/src/sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import {
RetryOptions,
retry
} from "@azure/core-amqp";
import { OperationOptionsBase } from "./modelsToBeSharedWithEventHubs";
import { getParentSpan, OperationOptionsBase } from "./modelsToBeSharedWithEventHubs";
import { CanonicalCode, Link, Span, SpanContext, SpanKind } from "@opentelemetry/api";
import { getTracer } from "@azure/core-tracing";

/**
* A Sender can be used to send messages, schedule messages to be sent at a later time
Expand Down Expand Up @@ -179,14 +181,19 @@ export class ServiceBusSenderImpl implements ServiceBusSender {
const invalidTypeErrMsg =
"Provided value for 'messages' must be of type ServiceBusMessage, ServiceBusMessageBatch or an array of type ServiceBusMessage.";

// link message span contexts
let spanContextsToLink: SpanContext[] = [];
if (isServiceBusMessage(messages)) {
messages = [messages];
}
let batch: ServiceBusMessageBatch;
if (Array.isArray(messages)) {
const batch = await this.createBatch(options);

for (const message of messages) {
batch = await this.createBatch(options);
for (let message of messages) {
HarshaNalluru marked this conversation as resolved.
Show resolved Hide resolved
if (!isServiceBusMessage(message)) {
throw new TypeError(invalidTypeErrMsg);
}
if (!batch.tryAdd(message)) {
if (!batch.tryAdd(message, { parentSpan: getParentSpan(options?.tracingOptions) })) {
// this is too big - throw an error
const error = new MessagingError(
"Messages were too big to fit in a single batch. Remove some messages and try again or create your own batch using createBatch(), which gives more fine-grained control."
Expand All @@ -195,14 +202,31 @@ export class ServiceBusSenderImpl implements ServiceBusSender {
throw error;
}
}

return this._sender.sendBatch(batch, options);
} else if (isServiceBusMessageBatch(messages)) {
return this._sender.sendBatch(messages, options);
} else if (isServiceBusMessage(messages)) {
return this._sender.send(messages, options);
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved
spanContextsToLink = messages._messageSpanContexts;
batch = messages;
} else {
throw new TypeError(invalidTypeErrMsg);
}

const sendSpan = this._createSendSpan(
getParentSpan(options?.tracingOptions),
spanContextsToLink
);

try {
const result = await this._sender.sendBatch(batch, options);
sendSpan.setStatus({ code: CanonicalCode.OK });
return result;
} catch (error) {
sendSpan.setStatus({
code: CanonicalCode.UNKNOWN,
message: error.message
});
throw error;
} finally {
sendSpan.end();
}
throw new TypeError(invalidTypeErrMsg);
}

async createBatch(options?: CreateBatchOptions): Promise<ServiceBusMessageBatch> {
Expand Down Expand Up @@ -323,6 +347,29 @@ export class ServiceBusSenderImpl implements ServiceBusSender {
throw err;
}
}

private _createSendSpan(
HarshaNalluru marked this conversation as resolved.
Show resolved Hide resolved
parentSpan?: Span | SpanContext | null,
spanContextsToLink: SpanContext[] = []
): Span {
const links: Link[] = spanContextsToLink.map((context) => {
return {
context
};
});
const tracer = getTracer();
const span = tracer.startSpan("Azure.ServiceBus.send", {
kind: SpanKind.CLIENT,
parent: parentSpan,
links
});

span.setAttribute("az.namespace", "Microsoft.ServiceBus");
span.setAttribute("message_bus.destination", this.entityPath);
span.setAttribute("peer.address", this._context.config.host);

return span;
}
}

/**
Expand Down
47 changes: 45 additions & 2 deletions sdk/servicebus/service-bus/src/serviceBusMessageBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ import {
message as RheaMessageUtil
} from "rhea-promise";
import { AmqpMessage } from "@azure/core-amqp";
import { SpanContext } from "@opentelemetry/api";
import {
instrumentServiceBusMessage,
TRACEPARENT_PROPERTY
} from "./diagnostics/instrumentServiceBusMessage";
import { createMessageSpan } from "./diagnostics/messageSpan";
import { TryAddOptions } from "./modelsToBeSharedWithEventHubs";

/**
* @internal
Expand Down Expand Up @@ -65,7 +72,7 @@ export interface ServiceBusMessageBatch {
* @param message An individual service bus message.
* @returns A boolean value indicating if the message has been added to the batch or not.
*/
tryAdd(message: ServiceBusMessage): boolean;
tryAdd(message: ServiceBusMessage, options?: TryAddOptions): boolean;

/**
* The AMQP message containing encoded events that were added to the batch.
Expand All @@ -77,6 +84,14 @@ export interface ServiceBusMessageBatch {
* @ignore
*/
_generateMessage(): Buffer;

/**
* Gets the "message" span contexts that were created when adding events to the batch.
* Used internally by the `sendBatch()` method to set up the right spans in traces if tracing is enabled.
* @internal
* @ignore
*/
readonly _messageSpanContexts: SpanContext[];
}

/**
Expand All @@ -95,6 +110,10 @@ export class ServiceBusMessageBatchImpl implements ServiceBusMessageBatch {
* @property Encoded amqp messages.
*/
private _encodedMessages: Buffer[] = [];
/**
* List of 'message' span contexts.
*/
private _spanContexts: SpanContext[] = [];
/**
* ServiceBusMessageBatch should not be constructed using `new ServiceBusMessageBatch()`
* Use the `createBatch()` method on your `Sender` instead.
Expand Down Expand Up @@ -132,6 +151,15 @@ export class ServiceBusMessageBatchImpl implements ServiceBusMessageBatch {
return this._encodedMessages.length;
}

/**
* Gets the "message" span contexts that were created when adding messages to the batch.
* @internal
* @ignore
*/
get _messageSpanContexts(): SpanContext[] {
return this._spanContexts;
}

/**
* Generates an AMQP message that contains the provided encoded messages and annotations.
*
Expand Down Expand Up @@ -210,12 +238,24 @@ export class ServiceBusMessageBatchImpl implements ServiceBusMessageBatch {
* @param message An individual service bus message.
* @returns A boolean value indicating if the message has been added to the batch or not.
*/
public tryAdd(message: ServiceBusMessage): boolean {
public tryAdd(message: ServiceBusMessage, options: TryAddOptions = {}): boolean {
throwTypeErrorIfParameterMissing(this._context.connectionId, "message", message);
if (!isServiceBusMessage(message)) {
throw new TypeError("Provided value for 'message' must be of type ServiceBusMessage.");
}

// check if the event has already been instrumented
const previouslyInstrumented = Boolean(
message.properties && message.properties[TRACEPARENT_PROPERTY]
);
let spanContext: SpanContext | undefined;
if (!previouslyInstrumented) {
const messageSpan = createMessageSpan(options?.parentSpan, this._context.config);
message = instrumentServiceBusMessage(message, messageSpan);
spanContext = messageSpan.context();
messageSpan.end();
}

// Convert ServiceBusMessage to AmqpMessage.
const amqpMessage = toAmqpMessage(message);
amqpMessage.body = this._context.dataTransformer.encode(message.body);
Expand Down Expand Up @@ -261,6 +301,9 @@ export class ServiceBusMessageBatchImpl implements ServiceBusMessageBatch {

// The message will fit in the batch, so it is now safe to store it.
this._encodedMessages.push(encodedMessage);
if (spanContext) {
this._spanContexts.push(spanContext);
}

this._sizeInBytes = currentSize;
return true;
Expand Down
Loading