Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[service-bus] Adding in tracing for receive operations #11810

Merged
merged 10 commits into from
Oct 14, 2020
Merged
30 changes: 20 additions & 10 deletions sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import { ConnectionContext } from "../connectionContext";
import { throwErrorIfConnectionClosed } from "../util/errors";
import { AbortSignalLike } from "@azure/abort-controller";
import { checkAndRegisterWithAbortSignal } from "../util/utils";
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { createAndEndProcessingSpan } from "../diagnostics/instrumentServiceBusMessage";
import { ReceiveMode } from "../models";

/**
Expand Down Expand Up @@ -108,7 +110,7 @@ export class BatchingReceiver extends MessageReceiver {
maxMessageCount: number,
maxWaitTimeInMs: number,
maxTimeAfterFirstMessageInMs: number,
userAbortSignal?: AbortSignalLike
options: OperationOptionsBase
): Promise<ServiceBusMessageImpl[]> {
throwErrorIfConnectionClosed(this._context);

Expand All @@ -123,7 +125,7 @@ export class BatchingReceiver extends MessageReceiver {
maxMessageCount,
maxWaitTimeInMs,
maxTimeAfterFirstMessageInMs,
userAbortSignal
...options
});

if (this._lockRenewer) {
Expand Down Expand Up @@ -217,11 +219,10 @@ type MessageAndDelivery = Pick<EventContext, "message" | "delivery">;
* @internal
* @ignore
*/
interface ReceiveMessageArgs {
interface ReceiveMessageArgs extends OperationOptionsBase {
maxMessageCount: number;
maxWaitTimeInMs: number;
maxTimeAfterFirstMessageInMs: number;
userAbortSignal?: AbortSignalLike;
}

/**
Expand All @@ -234,17 +235,24 @@ interface ReceiveMessageArgs {
* @ignore
*/
export class BatchingReceiverLite {
/**
* NOTE: exists only to make unit testing possible.
*/
private _createAndEndProcessingSpan: typeof createAndEndProcessingSpan;
HarshaNalluru marked this conversation as resolved.
Show resolved Hide resolved

constructor(
connectionContext: ConnectionContext,
entityPath: string,
private _connectionContext: ConnectionContext,
public entityPath: string,
private _getCurrentReceiver: (
abortSignal?: AbortSignalLike
) => Promise<MinimalReceiver | undefined>,
private _receiveMode: ReceiveMode
) {
this._createAndEndProcessingSpan = createAndEndProcessingSpan;

this._createServiceBusMessage = (context: MessageAndDelivery) => {
return new ServiceBusMessageImpl(
connectionContext,
_connectionContext,
entityPath,
context.message!,
context.delivery!,
Expand Down Expand Up @@ -279,14 +287,16 @@ export class BatchingReceiverLite {
public async receiveMessages(args: ReceiveMessageArgs): Promise<ServiceBusMessageImpl[]> {
try {
this.isReceivingMessages = true;
const receiver = await this._getCurrentReceiver(args.userAbortSignal);
const receiver = await this._getCurrentReceiver(args.abortSignal);

if (receiver == null) {
// (was somehow closed in between the init() and the return)
return [];
}

return await this._receiveMessagesImpl(receiver, args);
const messages = await this._receiveMessagesImpl(receiver, args);
this._createAndEndProcessingSpan(messages, this, this._connectionContext.config, args);
return messages;
} finally {
this._closeHandler = undefined;
this.isReceivingMessages = false;
Expand Down Expand Up @@ -475,7 +485,7 @@ export class BatchingReceiverLite {
abortSignalCleanupFunction = checkAndRegisterWithAbortSignal((err) => {
cleanupBeforeResolveOrReject("removeDrainHandler");
reject(err);
}, args.userAbortSignal);
}, args.abortSignal);

// Action to be performed after the max wait time is over.
const actionAfterWaitTimeout = (): void => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { extractSpanContextFromTraceParentHeader, getTraceParentHeader } from "@azure/core-tracing";
import { Span, SpanContext } from "@opentelemetry/api";
import { ServiceBusMessage } from "../serviceBusMessage";
import {
extractSpanContextFromTraceParentHeader,
getTraceParentHeader,
getTracer
} from "@azure/core-tracing";
import { CanonicalCode, Link, Span, SpanContext, SpanKind } from "@opentelemetry/api";
import { ConnectionContext } from "../connectionContext";
import { getParentSpan, OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { ServiceBusReceiver } from "../receivers/receiver";
import { ServiceBusMessage, ServiceBusReceivedMessage } from "../serviceBusMessage";

/**
* @ignore
Expand Down Expand Up @@ -54,3 +61,95 @@ export function extractSpanContextFromServiceBusMessage(
const diagnosticId = message.properties[TRACEPARENT_PROPERTY] as string;
return extractSpanContextFromTraceParentHeader(diagnosticId);
}

/**
* Provides an iterable over messages, whether it is a single message or multiple
* messages.
*
* @param receivedMessages A single message or a set of messages
* @internal
* @ignore
*/
function* getReceivedMessages(
receivedMessages: ServiceBusReceivedMessage | ServiceBusReceivedMessage[]
): Iterable<ServiceBusReceivedMessage> {
if (!Array.isArray(receivedMessages)) {
yield receivedMessages;
} else {
for (const message of receivedMessages) {
yield message;
}
}
}

/**
* A span that encompasses the period when the message has been received and
* is being processed.
*
* NOTE: The amount of time the user would be considered processing the message is
* not always clear - in that case the span will have a very short lifetime
* since we'll start the span when we receive the message and end it when we
* give the message to the user.
*
* @internal
* @ignore
*/
export function createProcessingSpan(
receivedMessages: ServiceBusReceivedMessage | ServiceBusReceivedMessage[],
// NOTE: the connectionConfig also has an entityPath property but that only
// represents the optional entityPath in their connection string which is NOT
// what we want for tracing.
receiver: Pick<ServiceBusReceiver<any>, "entityPath">,
connectionConfig: Pick<ConnectionContext["config"], "host">,
options?: OperationOptionsBase
): Span {
const links: Link[] = [];

for (const receivedMessage of getReceivedMessages(receivedMessages)) {
const spanContext = extractSpanContextFromServiceBusMessage(receivedMessage);

if (spanContext == null) {
continue;
}

links.push({
context: spanContext,
attributes: {
enqueuedTime: receivedMessage.enqueuedTimeUtc?.getTime()
}
});
}

const span = getTracer().startSpan("Azure.ServiceBus.process", {
kind: SpanKind.CONSUMER,
links,
parent: getParentSpan(options?.tracingOptions)
});

span.setAttributes({
"az.namespace": "Microsoft.ServiceBus",
"message_bus.destination": receiver.entityPath,
"peer.address": connectionConfig.host
});

return span;
}

/**
* Creates and immediately ends a processing span. Used when
* the 'processing' occurs outside of our control so we don't
* know the scope.
*
* @internal
* @ignore
*/
export function createAndEndProcessingSpan(
receivedMessages: ServiceBusReceivedMessage | ServiceBusReceivedMessage[],
receiver: Pick<ServiceBusReceiver<any>, "entityPath">,
connectionConfig: Pick<ConnectionContext["config"], "host">,
options?: OperationOptionsBase
): void {
const span = createProcessingSpan(receivedMessages, receiver, connectionConfig, options);
span.setStatus({ code: CanonicalCode.OK });
span.end();
}
25 changes: 24 additions & 1 deletion sdk/servicebus/service-bus/src/modelsToBeSharedWithEventHubs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

// TODO: this code is a straight-copy from EventHubs. Need to merge.

import { Link, Span, SpanContext, SpanKind } from "@opentelemetry/api";
import { CanonicalCode, Link, Span, SpanContext, SpanKind } from "@opentelemetry/api";
import { OperationOptions } from "@azure/core-http";
import { getTracer, OperationTracingOptions } from "@azure/core-tracing";

Expand Down Expand Up @@ -57,3 +57,26 @@ export interface TryAddOptions {
*/
parentSpan?: Span | SpanContext | null;
}

/**
* Runs the `fn` passed in and marks the span as completed with an error (and the
* corresponding message) or as OK.
*
* @ignore
* @internal
*/
export async function trace<T>(fn: () => Promise<T>, span: Span): Promise<T> {
try {
const ret = await fn();
span.setStatus({ code: CanonicalCode.OK });
return ret;
} catch (err) {
span.setStatus({
code: CanonicalCode.UNKNOWN,
message: err.message
});
throw err;
} finally {
span.end();
}
}
20 changes: 15 additions & 5 deletions sdk/servicebus/service-bus/src/receivers/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
SubscribeOptions,
InternalMessageHandlers
} from "../models";
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { OperationOptionsBase, trace } from "../modelsToBeSharedWithEventHubs";
import { ServiceBusReceivedMessage } from "..";
import { ConnectionContext } from "../connectionContext";
import {
Expand All @@ -28,6 +28,7 @@ import { ServiceBusReceivedMessageWithLock, ServiceBusMessageImpl } from "../ser
import { Constants, RetryConfig, RetryOperationType, RetryOptions, retry } from "@azure/core-amqp";
import "@azure/core-asynciterator-polyfill";
import { LockRenewer } from "../core/autoLockRenewer";
import { createProcessingSpan } from "../diagnostics/instrumentServiceBusMessage";
import { receiverLogger as logger } from "../log";

/**
Expand Down Expand Up @@ -157,6 +158,8 @@ export class ServiceBusReceiverImpl<
private _streamingReceiver?: StreamingReceiver;
private _lockRenewer: LockRenewer | undefined;

private _createProcessingSpan: typeof createProcessingSpan;

private get logPrefix() {
return `[${this._context.connectionId}|receiver:${this.entityPath}]`;
}
Expand All @@ -178,6 +181,7 @@ export class ServiceBusReceiverImpl<
maxAutoRenewLockDurationInMs,
receiveMode
);
this._createProcessingSpan = createProcessingSpan;
}

private _throwIfAlreadyReceiving(): void {
Expand Down Expand Up @@ -228,7 +232,7 @@ export class ServiceBusReceiverImpl<
onInitialize: () => Promise<void>,
onMessage: OnMessage,
onError: OnError,
options?: SubscribeOptions
options: SubscribeOptions
): void {
this._throwIfReceiverOrConnectionClosed();
this._throwIfAlreadyReceiving();
Expand Down Expand Up @@ -262,7 +266,9 @@ export class ServiceBusReceiverImpl<
}

if (!this.isClosed) {
sReceiver.subscribe(onMessage, onError);
sReceiver.subscribe(async (message) => {
await onMessage(message);
}, onError);
} else {
await sReceiver.close();
}
Expand Down Expand Up @@ -305,12 +311,14 @@ export class ServiceBusReceiverImpl<
options
);
}

const receivedMessages = await this._batchingReceiver.receive(
maxMessageCount,
options?.maxWaitTimeInMs ?? Constants.defaultOperationTimeoutInMs,
defaultMaxTimeAfterFirstMessageForBatchingMs,
options?.abortSignal
options ?? {}
);

return (receivedMessages as unknown) as ReceivedMessageT[];
};
const config: RetryConfig<ReceivedMessageT[]> = {
Expand Down Expand Up @@ -420,6 +428,7 @@ export class ServiceBusReceiverImpl<
close(): Promise<void>;
} {
assertValidMessageHandlers(handlers);
options = options ?? {};

const processError = wrapProcessErrorHandler(handlers);

Expand All @@ -434,7 +443,8 @@ export class ServiceBusReceiverImpl<
}
},
async (message: ServiceBusMessageImpl) => {
return handlers.processMessage((message as any) as ReceivedMessageT);
const span = this._createProcessingSpan(message, this, this._context.config, options);
return trace(() => handlers.processMessage((message as any) as ReceivedMessageT), span);
},
processError,
options
Expand Down
15 changes: 11 additions & 4 deletions sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import {
ErrorNameConditionMapper,
translate
} from "@azure/core-amqp";
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { OperationOptionsBase, trace } from "../modelsToBeSharedWithEventHubs";
import "@azure/core-asynciterator-polyfill";
import { AmqpError } from "rhea-promise";
import { createProcessingSpan } from "../diagnostics/instrumentServiceBusMessage";
import { receiverLogger as logger } from "../log";

/**
Expand Down Expand Up @@ -113,6 +114,8 @@ export class ServiceBusSessionReceiverImpl<
*/
private _isClosed: boolean = false;

private _createProcessingSpan: typeof createProcessingSpan;

private get logPrefix() {
return `[${this._context.connectionId}|session:${this.entityPath}]`;
}
Expand All @@ -131,6 +134,7 @@ export class ServiceBusSessionReceiverImpl<
) {
throwErrorIfConnectionClosed(_context);
this.sessionId = _messageSession.sessionId;
this._createProcessingSpan = createProcessingSpan;
}

private _throwIfReceiverOrConnectionClosed(): void {
Expand Down Expand Up @@ -382,7 +386,7 @@ export class ServiceBusSessionReceiverImpl<
maxMessageCount,
options?.maxWaitTimeInMs ?? Constants.defaultOperationTimeoutInMs,
defaultMaxTimeAfterFirstMessageForBatchingMs,
options?.abortSignal
options ?? {}
);

return (receivedMessages as any) as ReceivedMessageT[];
Expand All @@ -406,11 +410,14 @@ export class ServiceBusSessionReceiverImpl<
// TODO - receiverOptions for subscribe??
assertValidMessageHandlers(handlers);

options = options ?? {};

const processError = wrapProcessErrorHandler(handlers);

this._registerMessageHandler(
async (message: ServiceBusMessageImpl) => {
return handlers.processMessage((message as any) as ReceivedMessageT);
const span = this._createProcessingSpan(message, this, this._context.config, options);
return trace(() => handlers.processMessage((message as any) as ReceivedMessageT), span);
},
processError,
options
Expand Down Expand Up @@ -448,7 +455,7 @@ export class ServiceBusSessionReceiverImpl<
private _registerMessageHandler(
onMessage: OnMessage,
onError: OnError,
options?: SubscribeOptions
options: SubscribeOptions
): void {
this._throwIfReceiverOrConnectionClosed();
this._throwIfAlreadyReceiving();
Expand Down
Loading