Skip to content

Commit

Permalink
[service-bus] Adding in tracing for receive operations (Azure#11810)
Browse files Browse the repository at this point in the history
Adding in tracing for the three receive methods (subscribe, receiveMessages and getMessageIterator) for sessions and non-sessions.

Fixes Azure#11465
  • Loading branch information
richardpark-msft authored Oct 14, 2020
1 parent 51cda73 commit 7e0ca71
Show file tree
Hide file tree
Showing 14 changed files with 644 additions and 58 deletions.
6 changes: 5 additions & 1 deletion sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@

## 7.0.0-preview.8 (Unreleased)

- `sendMessages` method on the sender and `tryAdd` method to add messages to a batch now support tracing.
### New features:

- Tracing, using [@azure/core-tracing](https://github.com/Azure/azure-sdk-for-js/blob/master/sdk/core/core-tracing/README.md), has been added for sending and receiving of messages.
[PR 11651](https://github.com/Azure/azure-sdk-for-js/pull/11651)
and
[PR 11810](https://github.com/Azure/azure-sdk-for-js/pull/11810)

## 7.0.0-preview.7 (2020-10-07)

Expand Down
30 changes: 20 additions & 10 deletions sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import { ConnectionContext } from "../connectionContext";
import { throwErrorIfConnectionClosed } from "../util/errors";
import { AbortSignalLike } from "@azure/abort-controller";
import { checkAndRegisterWithAbortSignal } from "../util/utils";
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { createAndEndProcessingSpan } from "../diagnostics/instrumentServiceBusMessage";
import { ReceiveMode } from "../models";

/**
Expand Down Expand Up @@ -108,7 +110,7 @@ export class BatchingReceiver extends MessageReceiver {
maxMessageCount: number,
maxWaitTimeInMs: number,
maxTimeAfterFirstMessageInMs: number,
userAbortSignal?: AbortSignalLike
options: OperationOptionsBase
): Promise<ServiceBusMessageImpl[]> {
throwErrorIfConnectionClosed(this._context);

Expand All @@ -123,7 +125,7 @@ export class BatchingReceiver extends MessageReceiver {
maxMessageCount,
maxWaitTimeInMs,
maxTimeAfterFirstMessageInMs,
userAbortSignal
...options
});

if (this._lockRenewer) {
Expand Down Expand Up @@ -217,11 +219,10 @@ type MessageAndDelivery = Pick<EventContext, "message" | "delivery">;
* @internal
* @ignore
*/
interface ReceiveMessageArgs {
interface ReceiveMessageArgs extends OperationOptionsBase {
maxMessageCount: number;
maxWaitTimeInMs: number;
maxTimeAfterFirstMessageInMs: number;
userAbortSignal?: AbortSignalLike;
}

/**
Expand All @@ -234,17 +235,24 @@ interface ReceiveMessageArgs {
* @ignore
*/
export class BatchingReceiverLite {
/**
* NOTE: exists only to make unit testing possible.
*/
private _createAndEndProcessingSpan: typeof createAndEndProcessingSpan;

constructor(
connectionContext: ConnectionContext,
entityPath: string,
private _connectionContext: ConnectionContext,
public entityPath: string,
private _getCurrentReceiver: (
abortSignal?: AbortSignalLike
) => Promise<MinimalReceiver | undefined>,
private _receiveMode: ReceiveMode
) {
this._createAndEndProcessingSpan = createAndEndProcessingSpan;

this._createServiceBusMessage = (context: MessageAndDelivery) => {
return new ServiceBusMessageImpl(
connectionContext,
_connectionContext,
entityPath,
context.message!,
context.delivery!,
Expand Down Expand Up @@ -279,14 +287,16 @@ export class BatchingReceiverLite {
public async receiveMessages(args: ReceiveMessageArgs): Promise<ServiceBusMessageImpl[]> {
try {
this.isReceivingMessages = true;
const receiver = await this._getCurrentReceiver(args.userAbortSignal);
const receiver = await this._getCurrentReceiver(args.abortSignal);

if (receiver == null) {
// (was somehow closed in between the init() and the return)
return [];
}

return await this._receiveMessagesImpl(receiver, args);
const messages = await this._receiveMessagesImpl(receiver, args);
this._createAndEndProcessingSpan(messages, this, this._connectionContext.config, args);
return messages;
} finally {
this._closeHandler = undefined;
this.isReceivingMessages = false;
Expand Down Expand Up @@ -475,7 +485,7 @@ export class BatchingReceiverLite {
abortSignalCleanupFunction = checkAndRegisterWithAbortSignal((err) => {
cleanupBeforeResolveOrReject("removeDrainHandler");
reject(err);
}, args.userAbortSignal);
}, args.abortSignal);

// Action to be performed after the max wait time is over.
const actionAfterWaitTimeout = (): void => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { extractSpanContextFromTraceParentHeader, getTraceParentHeader } from "@azure/core-tracing";
import { Span, SpanContext } from "@opentelemetry/api";
import { ServiceBusMessage } from "../serviceBusMessage";
import {
extractSpanContextFromTraceParentHeader,
getTraceParentHeader,
getTracer
} from "@azure/core-tracing";
import { CanonicalCode, Link, Span, SpanContext, SpanKind } from "@opentelemetry/api";
import { ConnectionContext } from "../connectionContext";
import { getParentSpan, OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { ServiceBusReceiver } from "../receivers/receiver";
import { ServiceBusMessage, ServiceBusReceivedMessage } from "../serviceBusMessage";

/**
* @ignore
Expand Down Expand Up @@ -54,3 +61,95 @@ export function extractSpanContextFromServiceBusMessage(
const diagnosticId = message.properties[TRACEPARENT_PROPERTY] as string;
return extractSpanContextFromTraceParentHeader(diagnosticId);
}

/**
* Provides an iterable over messages, whether it is a single message or multiple
* messages.
*
* @param receivedMessages A single message or a set of messages
* @internal
* @ignore
*/
function* getReceivedMessages(
receivedMessages: ServiceBusReceivedMessage | ServiceBusReceivedMessage[]
): Iterable<ServiceBusReceivedMessage> {
if (!Array.isArray(receivedMessages)) {
yield receivedMessages;
} else {
for (const message of receivedMessages) {
yield message;
}
}
}

/**
* A span that encompasses the period when the message has been received and
* is being processed.
*
* NOTE: The amount of time the user would be considered processing the message is
* not always clear - in that case the span will have a very short lifetime
* since we'll start the span when we receive the message and end it when we
* give the message to the user.
*
* @internal
* @ignore
*/
export function createProcessingSpan(
receivedMessages: ServiceBusReceivedMessage | ServiceBusReceivedMessage[],
// NOTE: the connectionConfig also has an entityPath property but that only
// represents the optional entityPath in their connection string which is NOT
// what we want for tracing.
receiver: Pick<ServiceBusReceiver<any>, "entityPath">,
connectionConfig: Pick<ConnectionContext["config"], "host">,
options?: OperationOptionsBase
): Span {
const links: Link[] = [];

for (const receivedMessage of getReceivedMessages(receivedMessages)) {
const spanContext = extractSpanContextFromServiceBusMessage(receivedMessage);

if (spanContext == null) {
continue;
}

links.push({
context: spanContext,
attributes: {
enqueuedTime: receivedMessage.enqueuedTimeUtc?.getTime()
}
});
}

const span = getTracer().startSpan("Azure.ServiceBus.process", {
kind: SpanKind.CONSUMER,
links,
parent: getParentSpan(options?.tracingOptions)
});

span.setAttributes({
"az.namespace": "Microsoft.ServiceBus",
"message_bus.destination": receiver.entityPath,
"peer.address": connectionConfig.host
});

return span;
}

/**
* Creates and immediately ends a processing span. Used when
* the 'processing' occurs outside of our control so we don't
* know the scope.
*
* @internal
* @ignore
*/
export function createAndEndProcessingSpan(
receivedMessages: ServiceBusReceivedMessage | ServiceBusReceivedMessage[],
receiver: Pick<ServiceBusReceiver<any>, "entityPath">,
connectionConfig: Pick<ConnectionContext["config"], "host">,
options?: OperationOptionsBase
): void {
const span = createProcessingSpan(receivedMessages, receiver, connectionConfig, options);
span.setStatus({ code: CanonicalCode.OK });
span.end();
}
25 changes: 24 additions & 1 deletion sdk/servicebus/service-bus/src/modelsToBeSharedWithEventHubs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

// TODO: this code is a straight-copy from EventHubs. Need to merge.

import { Link, Span, SpanContext, SpanKind } from "@opentelemetry/api";
import { CanonicalCode, Link, Span, SpanContext, SpanKind } from "@opentelemetry/api";
import { OperationOptions } from "@azure/core-http";
import { getTracer, OperationTracingOptions } from "@azure/core-tracing";

Expand Down Expand Up @@ -57,3 +57,26 @@ export interface TryAddOptions {
*/
parentSpan?: Span | SpanContext | null;
}

/**
* Runs the `fn` passed in and marks the span as completed with an error (and the
* corresponding message) or as OK.
*
* @ignore
* @internal
*/
export async function trace<T>(fn: () => Promise<T>, span: Span): Promise<T> {
try {
const ret = await fn();
span.setStatus({ code: CanonicalCode.OK });
return ret;
} catch (err) {
span.setStatus({
code: CanonicalCode.UNKNOWN,
message: err.message
});
throw err;
} finally {
span.end();
}
}
20 changes: 15 additions & 5 deletions sdk/servicebus/service-bus/src/receivers/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
SubscribeOptions,
InternalMessageHandlers
} from "../models";
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { OperationOptionsBase, trace } from "../modelsToBeSharedWithEventHubs";
import { ServiceBusReceivedMessage } from "..";
import { ConnectionContext } from "../connectionContext";
import {
Expand All @@ -28,6 +28,7 @@ import { ServiceBusReceivedMessageWithLock, ServiceBusMessageImpl } from "../ser
import { Constants, RetryConfig, RetryOperationType, RetryOptions, retry } from "@azure/core-amqp";
import "@azure/core-asynciterator-polyfill";
import { LockRenewer } from "../core/autoLockRenewer";
import { createProcessingSpan } from "../diagnostics/instrumentServiceBusMessage";
import { receiverLogger as logger } from "../log";

/**
Expand Down Expand Up @@ -157,6 +158,8 @@ export class ServiceBusReceiverImpl<
private _streamingReceiver?: StreamingReceiver;
private _lockRenewer: LockRenewer | undefined;

private _createProcessingSpan: typeof createProcessingSpan;

private get logPrefix() {
return `[${this._context.connectionId}|receiver:${this.entityPath}]`;
}
Expand All @@ -178,6 +181,7 @@ export class ServiceBusReceiverImpl<
maxAutoRenewLockDurationInMs,
receiveMode
);
this._createProcessingSpan = createProcessingSpan;
}

private _throwIfAlreadyReceiving(): void {
Expand Down Expand Up @@ -228,7 +232,7 @@ export class ServiceBusReceiverImpl<
onInitialize: () => Promise<void>,
onMessage: OnMessage,
onError: OnError,
options?: SubscribeOptions
options: SubscribeOptions
): void {
this._throwIfReceiverOrConnectionClosed();
this._throwIfAlreadyReceiving();
Expand Down Expand Up @@ -262,7 +266,9 @@ export class ServiceBusReceiverImpl<
}

if (!this.isClosed) {
sReceiver.subscribe(onMessage, onError);
sReceiver.subscribe(async (message) => {
await onMessage(message);
}, onError);
} else {
await sReceiver.close();
}
Expand Down Expand Up @@ -305,12 +311,14 @@ export class ServiceBusReceiverImpl<
options
);
}

const receivedMessages = await this._batchingReceiver.receive(
maxMessageCount,
options?.maxWaitTimeInMs ?? Constants.defaultOperationTimeoutInMs,
defaultMaxTimeAfterFirstMessageForBatchingMs,
options?.abortSignal
options ?? {}
);

return (receivedMessages as unknown) as ReceivedMessageT[];
};
const config: RetryConfig<ReceivedMessageT[]> = {
Expand Down Expand Up @@ -420,6 +428,7 @@ export class ServiceBusReceiverImpl<
close(): Promise<void>;
} {
assertValidMessageHandlers(handlers);
options = options ?? {};

const processError = wrapProcessErrorHandler(handlers);

Expand All @@ -434,7 +443,8 @@ export class ServiceBusReceiverImpl<
}
},
async (message: ServiceBusMessageImpl) => {
return handlers.processMessage((message as any) as ReceivedMessageT);
const span = this._createProcessingSpan(message, this, this._context.config, options);
return trace(() => handlers.processMessage((message as any) as ReceivedMessageT), span);
},
processError,
options
Expand Down
Loading

0 comments on commit 7e0ca71

Please sign in to comment.