Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Service Bus] Tracing for send API #11651

Merged
merged 25 commits into from
Oct 8, 2020
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
a8c1d5d
tracing for trySend
HarshaNalluru Oct 5, 2020
a73fd31
tracing for send API
HarshaNalluru Oct 5, 2020
e46e805
API Report
HarshaNalluru Oct 5, 2020
4af1738
Pass tracing in tryAdd for array of messages, fx tryAdd, added new Pa…
HarshaNalluru Oct 6, 2020
c7d3a14
extractSpanContextFromServiceBusMessage tests
HarshaNalluru Oct 6, 2020
b5f77a8
test utils setTracerForTest
HarshaNalluru Oct 6, 2020
cd71e42
tracing tests
HarshaNalluru Oct 6, 2020
d631ee5
messageSpan tests
HarshaNalluru Oct 6, 2020
8375ed8
API Report
HarshaNalluru Oct 6, 2020
8038315
inline TryAddOptions
HarshaNalluru Oct 6, 2020
6fa23f7
API Report
HarshaNalluru Oct 6, 2020
a8b34fc
FIX ServiceBusMessage validations tests
HarshaNalluru Oct 6, 2020
d04a38e
changelog
HarshaNalluru Oct 6, 2020
9742d97
Update sdk/servicebus/service-bus/src/modelsToBeSharedWithEventHubs.ts
HarshaNalluru Oct 6, 2020
f262e45
sender link close - cleaning up new link - bug fix
HarshaNalluru Oct 6, 2020
ba4fef7
Merge branch 'harshan/sb/issue/tracing-send' of https://github.com/Ha…
HarshaNalluru Oct 6, 2020
e46deb4
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Oct 6, 2020
fea2177
let -> const
HarshaNalluru Oct 6, 2020
5c5795f
createSendSpan -> models shared with event-hubs
HarshaNalluru Oct 6, 2020
6b2409a
Changelog
HarshaNalluru Oct 7, 2020
7de9b6f
revert f262e4 and move onDetached to before calling refreshConnection
HarshaNalluru Oct 8, 2020
d1af28e
remove linentity - calling close during intialization
HarshaNalluru Oct 8, 2020
060842e
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Oct 8, 2020
3197fbc
skip the flaky test
HarshaNalluru Oct 8, 2020
84e48aa
Trigger pipeline(dummy commit)
HarshaNalluru Oct 8, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
[PR 11250](https://github.com/Azure/azure-sdk-for-js/pull/11250)
- "properties" in the correlation rule filter now supports `Date`.
[PR 11117](https://github.com/Azure/azure-sdk-for-js/pull/11117)
- `sendMessages` method on the sender and `tryAdd` method to add messages to a batch now support tracing.
[PR 11651](https://github.com/Azure/azure-sdk-for-js/pull/11651)
- Message locks can be auto-renewed in all receive methods (receiver.receiveMessages, receiver.subcribe
and receiver.getMessageIterator). This can be configured in options when calling `ServiceBusClient.createReceiver()`.
[PR 11658](https://github.com/Azure/azure-sdk-for-js/pull/11658)
Expand Down
11 changes: 10 additions & 1 deletion sdk/servicebus/service-bus/review/service-bus.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import { PageSettings } from '@azure/core-paging';
import { PipelineOptions } from '@azure/core-http';
import { RetryOptions } from '@azure/core-amqp';
import { ServiceClient } from '@azure/core-http';
import { Span } from '@opentelemetry/api';
import { SpanContext } from '@opentelemetry/api';
import { TokenCredential } from '@azure/core-amqp';
import { TokenType } from '@azure/core-amqp';
import { UserAgentOptions } from '@azure/core-http';
Expand Down Expand Up @@ -395,8 +397,10 @@ export interface ServiceBusMessageBatch {
// @internal
_generateMessage(): Buffer;
readonly maxSizeInBytes: number;
// @internal
readonly _messageSpanContexts: SpanContext[];
readonly sizeInBytes: number;
tryAdd(message: ServiceBusMessage): boolean;
tryAdd(message: ServiceBusMessage, options?: TryAddOptions): boolean;
}

// @public
Expand Down Expand Up @@ -577,6 +581,11 @@ export interface TopicRuntimeProperties {
export interface TopicRuntimePropertiesResponse extends TopicRuntimeProperties, Response {
}

// @public
export interface TryAddOptions {
parentSpan?: Span | SpanContext | null;
}

export { WebSocketImpl }

export { WebSocketOptions }
Expand Down
31 changes: 8 additions & 23 deletions sdk/servicebus/service-bus/src/connectionContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,29 +329,14 @@ export namespace ConnectionContext {
await delay(Constants.connectionReconnectDelay);

const detachCalls: Promise<void>[] = [];

// Call onDetached() on sender so that it can gracefully shutdown
for (const senderName of Object.keys(connectionContext.senders)) {
const sender = connectionContext.senders[senderName];
if (sender) {
logger.verbose(
"[%s] calling detached on sender '%s'.",
connectionContext.connection.id,
sender.name
);
detachCalls.push(
sender.onDetached().catch((err) => {
logError(
err,
"[%s] An error occurred while calling onDetached() the sender '%s': %O.",
connectionContext.connection.id,
sender.name,
err
);
})
);
}
}
// Neither we do recovery for the sender, nor we cleanup
// No recovery:
// Because we don't want to keep the sender active all the time
// and the "next" send call would bear the burden of creating the link
// No cleanup:
// "Closing the link" cleanup would step over new link initializations
// and can possibly clear the link once created, hence we do the cleanup
// at the time of new link creation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take the case of a disconnected event firing and user making no more send requests. In that case, the loop to keep renewing the token on the sender link does not get cleared and we will have a rogue loop on our hands

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realize that we can argue that in the normal case of user stopping to make any send requests and no disconnected event, our loop will still keep trying to renew the lock. In that case, at some point the service will close the link because it has been idle for too long. That results in an "amqp close" event in which case we should clean up the timers and assets from our end. In this PR, we are removing those as well

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upon discussion, reverted this in favour of the proposal for cleanup with detached where onDetached would be called before the refreshConnection() is invoked.

Triggered live tests https://dev.azure.com/azure-sdk/internal/_build/results?buildId=567200&view=results


// Call onDetached() on receivers so that batching receivers it can gracefully close any ongoing batch operation
// and streaming receivers can decide whether to reconnect or not.
Expand Down
6 changes: 6 additions & 0 deletions sdk/servicebus/service-bus/src/core/linkEntity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,12 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
this.checkIfConnectionReady();

logger.verbose(`${this._logPrefix} Creating with options %O`, options);
if (this.link) {
// For the cases such as connection is disconnected, onAmqpClose and onSessionClose events,
// we don't clear the link for sender. To compensate for the above,
// we are calling close on the link to cleanup before creating the new one.
await this.link.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why close() and not closeLink()?
The former sets the _wasClosedPermanently to true and removes the link from the context cache which may have other repercussions

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.close() is different from .link.close(), we are not calling .close() here.

closeLink would try acquiring a lock but the initialization already has it, leading into a deadlock. Hence closeLink shouldn't be called. Alternatively, closeLinkImpl can be called which is a combination of clear-timeout and link.close(). Timers are handled in ensureTokenRenewal anyway, so link.close() makes sense here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anyways, reverted this in favour of the proposal for cleanup with detached where onDetached would be called before the refreshConnection() is invoked.

}
this._link = await this.createRheaLink(options);
checkAborted();

Expand Down
27 changes: 0 additions & 27 deletions sdk/servicebus/service-bus/src/core/messageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,6 @@ export class MessageSender extends LinkEntity<AwaitableSender> {
`${this.logPrefix} 'sender_close' event occurred. The associated error is: %O`,
senderError
);

await this.onDetached().catch((err) => {
logError(
err,
`${this.logPrefix} error when closing sender after 'sender_close' event: %O`,
err
);
});
};

this._onSessionClose = async (context: EventContext) => {
Expand All @@ -128,14 +120,6 @@ export class MessageSender extends LinkEntity<AwaitableSender> {
`${this.logPrefix} 'session_close' event occurred. The associated error is: %O`,
sessionError
);

await this.onDetached().catch((err) => {
logError(
err,
`${this.logPrefix} error when closing sender after 'session_close' event: %O`,
err
);
});
};
}

Expand Down Expand Up @@ -333,17 +317,6 @@ export class MessageSender extends LinkEntity<AwaitableSender> {
}
}

/**
* Closes the rhea link.
* To be called when connection is disconnected, onAmqpClose and onSessionClose events.
* @returns {Promise<void>} Promise<void>.
*/
async onDetached(): Promise<void> {
// Clears the token renewal timer. Closes the link and its session if they are open.
// Removes the link and its session if they are present in rhea's cache.
await this.closeLink();
}

/**
* Determines whether the AMQP sender link is open. If open then returns true else returns false.
* @return {boolean} boolean
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { extractSpanContextFromTraceParentHeader, getTraceParentHeader } from "@azure/core-tracing";
import { Span, SpanContext } from "@opentelemetry/api";
import { ServiceBusMessage } from "../serviceBusMessage";

/**
* @ignore
*/
export const TRACEPARENT_PROPERTY = "Diagnostic-Id";

/**
* Populates the `ServiceBusMessage` with `SpanContext` info to support trace propagation.
* Creates and returns a copy of the passed in `ServiceBusMessage` unless the `ServiceBusMessage`
* has already been instrumented.
* @param message The `ServiceBusMessage` to instrument.
* @param span The `Span` containing the context to propagate tracing information.
* @ignore
* @internal
*/
export function instrumentServiceBusMessage(
message: ServiceBusMessage,
span: Span
): ServiceBusMessage {
if (message.properties && message.properties[TRACEPARENT_PROPERTY]) {
return message;
}

// create a copy so the original isn't modified
message = { ...message, properties: { ...message.properties } };

const traceParent = getTraceParentHeader(span.context());
if (traceParent) {
message.properties![TRACEPARENT_PROPERTY] = traceParent;
}

return message;
}

/**
* Extracts the `SpanContext` from an `ServiceBusMessage` if the context exists.
* @param message An individual `ServiceBusMessage` object.
* @internal
* @ignore
*/
export function extractSpanContextFromServiceBusMessage(
message: ServiceBusMessage
): SpanContext | undefined {
if (!message.properties || !message.properties[TRACEPARENT_PROPERTY]) {
return;
}

const diagnosticId = message.properties[TRACEPARENT_PROPERTY] as string;
return extractSpanContextFromTraceParentHeader(diagnosticId);
}
27 changes: 27 additions & 0 deletions sdk/servicebus/service-bus/src/diagnostics/messageSpan.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { ConnectionConfig } from "@azure/core-amqp";
import { getTracer } from "@azure/core-tracing";
import { Span, SpanContext, SpanKind } from "@opentelemetry/api";

/**
* @internal
* @ignore
*/
export function createMessageSpan(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feedback not for this PR, but a note for both Event Hubs & Service Bus for which we can log a generic issue on improvements and make changes for both packages at once in the future:

  • The name createMessageSpan() is slightly misleading as it may indicate that I can use it at any time when a message is involved. In reality, this method is tied to the "producer" kind, so is usable only when sending message.
  • Also, jsdoc for this method would be helpful
  • Why is config optional here? I would imagine that we would always have a ConnectionConfig

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes to the first two!

Why is config optional here? I would imagine that we would always have a ConnectionConfig

parentSpan is not a required parameter but feels primary, I believe that was the reason Chris made the config optional in event-hubs. I can make it the first param in both service-bus and event-hubs and make it required in a later PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logged #11687

parentSpan?: Span | SpanContext | null,
config?: Pick<ConnectionConfig, "entityPath" | "host">
): Span {
const tracer = getTracer();
const span = tracer.startSpan("Azure.ServiceBus.message", {
kind: SpanKind.PRODUCER,
parent: parentSpan
});
span.setAttribute("az.namespace", "Microsoft.ServiceBus");
if (config) {
span.setAttribute("message_bus.destination", config.entityPath);
span.setAttribute("peer.address", config.host);
}
return span;
}
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export {
SubQueue,
SubscribeOptions
} from "./models";
export { OperationOptionsBase } from "./modelsToBeSharedWithEventHubs";
export { OperationOptionsBase, TryAddOptions } from "./modelsToBeSharedWithEventHubs";
export { ServiceBusReceiver } from "./receivers/receiver";
export { ServiceBusSessionReceiver } from "./receivers/sessionReceiver";
export { ServiceBusSender } from "./sender";
Expand Down
41 changes: 38 additions & 3 deletions sdk/servicebus/service-bus/src/modelsToBeSharedWithEventHubs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

// TODO: this code is a straight-copy from EventHubs. Need to merge.

import { Span, SpanContext } from "@opentelemetry/api";
import { Link, Span, SpanContext, SpanKind } from "@opentelemetry/api";
import { OperationOptions } from "@azure/core-http";
import { getTracer, OperationTracingOptions } from "@azure/core-tracing";

/**
* NOTE: This type is intended to mirror the relevant fields and structure from @azure/core-http OperationOptions
Expand All @@ -18,7 +19,41 @@ export type OperationOptionsBase = Pick<OperationOptions, "abortSignal" | "traci
* @ignore
*/
export function getParentSpan(
options: Pick<OperationOptionsBase, "tracingOptions">
options?: OperationTracingOptions
): Span | SpanContext | null | undefined {
return options.tracingOptions?.spanOptions?.parent;
return options?.spanOptions?.parent;
}

export function createSendSpan(
parentSpan?: Span | SpanContext | null,
spanContextsToLink: SpanContext[] = [],
entityPath?: string,
host?: string
): Span {
const links: Link[] = spanContextsToLink.map((context) => {
return {
context
};
});
const tracer = getTracer();
const span = tracer.startSpan("Azure.ServiceBus.send", {
kind: SpanKind.CLIENT,
parent: parentSpan,
links
});

span.setAttribute("az.namespace", "Microsoft.ServiceBus");
span.setAttribute("message_bus.destination", entityPath);
span.setAttribute("peer.address", host);

return span;
}
/**
* The set of options to manually propagate `Span` context for distributed tracing.
*/
export interface TryAddOptions {
/**
* The `Span` or `SpanContext` to use as the `parent` of any spans created while calling operations that make a request to the service.
*/
parentSpan?: Span | SpanContext | null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed in Event Hubs that null is not an option for parentSpan. Why do we have this here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are other places where we allow the span to be "null" which is a valid value and it signifies the rootSpan.

Moreover, I needed to allow this "null" since I'm calling getParentSpan() for the tryAdd over array of messages which may return a null value.

In event-hubs, this tracing for "array of messages" case is duplicated.
"null" is a valid value and event-hubs should also allow this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll log an issue for fixing it in event-hubs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logged #11687

}
49 changes: 39 additions & 10 deletions sdk/servicebus/service-bus/src/sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ import {
RetryOptions,
retry
} from "@azure/core-amqp";
import { OperationOptionsBase } from "./modelsToBeSharedWithEventHubs";
import {
createSendSpan,
getParentSpan,
OperationOptionsBase
} from "./modelsToBeSharedWithEventHubs";
import { CanonicalCode, SpanContext } from "@opentelemetry/api";

/**
* A Sender can be used to send messages, schedule messages to be sent at a later time
Expand Down Expand Up @@ -179,14 +184,19 @@ export class ServiceBusSenderImpl implements ServiceBusSender {
const invalidTypeErrMsg =
"Provided value for 'messages' must be of type ServiceBusMessage, ServiceBusMessageBatch or an array of type ServiceBusMessage.";

// link message span contexts
let spanContextsToLink: SpanContext[] = [];
if (isServiceBusMessage(messages)) {
messages = [messages];
}
let batch: ServiceBusMessageBatch;
if (Array.isArray(messages)) {
const batch = await this.createBatch(options);

batch = await this.createBatch(options);
for (const message of messages) {
if (!isServiceBusMessage(message)) {
throw new TypeError(invalidTypeErrMsg);
}
if (!batch.tryAdd(message)) {
if (!batch.tryAdd(message, { parentSpan: getParentSpan(options?.tracingOptions) })) {
// this is too big - throw an error
const error = new MessagingError(
"Messages were too big to fit in a single batch. Remove some messages and try again or create your own batch using createBatch(), which gives more fine-grained control."
Expand All @@ -195,14 +205,33 @@ export class ServiceBusSenderImpl implements ServiceBusSender {
throw error;
}
}

return this._sender.sendBatch(batch, options);
} else if (isServiceBusMessageBatch(messages)) {
return this._sender.sendBatch(messages, options);
} else if (isServiceBusMessage(messages)) {
return this._sender.send(messages, options);
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved
spanContextsToLink = messages._messageSpanContexts;
batch = messages;
} else {
throw new TypeError(invalidTypeErrMsg);
}

const sendSpan = createSendSpan(
getParentSpan(options?.tracingOptions),
spanContextsToLink,
this.entityPath,
this._context.config.host
);

try {
const result = await this._sender.sendBatch(batch, options);
sendSpan.setStatus({ code: CanonicalCode.OK });
return result;
} catch (error) {
sendSpan.setStatus({
code: CanonicalCode.UNKNOWN,
message: error.message
});
throw error;
} finally {
sendSpan.end();
}
throw new TypeError(invalidTypeErrMsg);
}

async createBatch(options?: CreateBatchOptions): Promise<ServiceBusMessageBatch> {
Expand Down
Loading