From 7e0ca71e7842e9e556b994cc7f4380e8549398b5 Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Wed, 14 Oct 2020 16:18:21 -0700 Subject: [PATCH] [service-bus] Adding in tracing for receive operations (#11810) Adding in tracing for the three receive methods (subscribe, receiveMessages and getMessageIterator) for sessions and non-sessions. Fixes #11465 --- sdk/servicebus/service-bus/CHANGELOG.md | 6 +- .../service-bus/src/core/batchingReceiver.ts | 30 +- .../instrumentServiceBusMessage.ts | 105 ++++- .../src/modelsToBeSharedWithEventHubs.ts | 25 +- .../service-bus/src/receivers/receiver.ts | 20 +- .../src/receivers/sessionReceiver.ts | 15 +- .../service-bus/src/session/messageSession.ts | 7 +- .../test/internal/batchingReceiver.spec.ts | 27 +- .../test/internal/messageSession.spec.ts | 10 +- .../test/internal/receiver.spec.ts | 2 +- .../test/internal/serviceBusClient.spec.ts | 14 +- .../service-bus/test/internal/tracing.spec.ts | 394 ++++++++++++++++++ .../test/internal/unittestUtils.ts | 29 +- .../service-bus/test/retries.spec.ts | 18 +- 14 files changed, 644 insertions(+), 58 deletions(-) create mode 100644 sdk/servicebus/service-bus/test/internal/tracing.spec.ts diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index 2e6b699dd6ff..68237d2a9fd6 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -2,8 +2,12 @@ ## 7.0.0-preview.8 (Unreleased) -- `sendMessages` method on the sender and `tryAdd` method to add messages to a batch now support tracing. +### New features: + +- Tracing, using [@azure/core-tracing](https://github.com/Azure/azure-sdk-for-js/blob/master/sdk/core/core-tracing/README.md), has been added for sending and receiving of messages. [PR 11651](https://github.com/Azure/azure-sdk-for-js/pull/11651) + and + [PR 11810](https://github.com/Azure/azure-sdk-for-js/pull/11810) ## 7.0.0-preview.7 (2020-10-07) diff --git a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts index 132c92c9f04c..0f8e5bab30a4 100644 --- a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts @@ -18,6 +18,8 @@ import { ConnectionContext } from "../connectionContext"; import { throwErrorIfConnectionClosed } from "../util/errors"; import { AbortSignalLike } from "@azure/abort-controller"; import { checkAndRegisterWithAbortSignal } from "../util/utils"; +import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs"; +import { createAndEndProcessingSpan } from "../diagnostics/instrumentServiceBusMessage"; import { ReceiveMode } from "../models"; /** @@ -108,7 +110,7 @@ export class BatchingReceiver extends MessageReceiver { maxMessageCount: number, maxWaitTimeInMs: number, maxTimeAfterFirstMessageInMs: number, - userAbortSignal?: AbortSignalLike + options: OperationOptionsBase ): Promise { throwErrorIfConnectionClosed(this._context); @@ -123,7 +125,7 @@ export class BatchingReceiver extends MessageReceiver { maxMessageCount, maxWaitTimeInMs, maxTimeAfterFirstMessageInMs, - userAbortSignal + ...options }); if (this._lockRenewer) { @@ -217,11 +219,10 @@ type MessageAndDelivery = Pick; * @internal * @ignore */ -interface ReceiveMessageArgs { +interface ReceiveMessageArgs extends OperationOptionsBase { maxMessageCount: number; maxWaitTimeInMs: number; maxTimeAfterFirstMessageInMs: number; - userAbortSignal?: AbortSignalLike; } /** @@ -234,17 +235,24 @@ interface ReceiveMessageArgs { * @ignore */ export class BatchingReceiverLite { + /** + * NOTE: exists only to make unit testing possible. + */ + private _createAndEndProcessingSpan: typeof createAndEndProcessingSpan; + constructor( - connectionContext: ConnectionContext, - entityPath: string, + private _connectionContext: ConnectionContext, + public entityPath: string, private _getCurrentReceiver: ( abortSignal?: AbortSignalLike ) => Promise, private _receiveMode: ReceiveMode ) { + this._createAndEndProcessingSpan = createAndEndProcessingSpan; + this._createServiceBusMessage = (context: MessageAndDelivery) => { return new ServiceBusMessageImpl( - connectionContext, + _connectionContext, entityPath, context.message!, context.delivery!, @@ -279,14 +287,16 @@ export class BatchingReceiverLite { public async receiveMessages(args: ReceiveMessageArgs): Promise { try { this.isReceivingMessages = true; - const receiver = await this._getCurrentReceiver(args.userAbortSignal); + const receiver = await this._getCurrentReceiver(args.abortSignal); if (receiver == null) { // (was somehow closed in between the init() and the return) return []; } - return await this._receiveMessagesImpl(receiver, args); + const messages = await this._receiveMessagesImpl(receiver, args); + this._createAndEndProcessingSpan(messages, this, this._connectionContext.config, args); + return messages; } finally { this._closeHandler = undefined; this.isReceivingMessages = false; @@ -475,7 +485,7 @@ export class BatchingReceiverLite { abortSignalCleanupFunction = checkAndRegisterWithAbortSignal((err) => { cleanupBeforeResolveOrReject("removeDrainHandler"); reject(err); - }, args.userAbortSignal); + }, args.abortSignal); // Action to be performed after the max wait time is over. const actionAfterWaitTimeout = (): void => { diff --git a/sdk/servicebus/service-bus/src/diagnostics/instrumentServiceBusMessage.ts b/sdk/servicebus/service-bus/src/diagnostics/instrumentServiceBusMessage.ts index f927eff1e7a2..be1fb67af3b7 100644 --- a/sdk/servicebus/service-bus/src/diagnostics/instrumentServiceBusMessage.ts +++ b/sdk/servicebus/service-bus/src/diagnostics/instrumentServiceBusMessage.ts @@ -1,9 +1,16 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -import { extractSpanContextFromTraceParentHeader, getTraceParentHeader } from "@azure/core-tracing"; -import { Span, SpanContext } from "@opentelemetry/api"; -import { ServiceBusMessage } from "../serviceBusMessage"; +import { + extractSpanContextFromTraceParentHeader, + getTraceParentHeader, + getTracer +} from "@azure/core-tracing"; +import { CanonicalCode, Link, Span, SpanContext, SpanKind } from "@opentelemetry/api"; +import { ConnectionContext } from "../connectionContext"; +import { getParentSpan, OperationOptionsBase } from "../modelsToBeSharedWithEventHubs"; +import { ServiceBusReceiver } from "../receivers/receiver"; +import { ServiceBusMessage, ServiceBusReceivedMessage } from "../serviceBusMessage"; /** * @ignore @@ -54,3 +61,95 @@ export function extractSpanContextFromServiceBusMessage( const diagnosticId = message.properties[TRACEPARENT_PROPERTY] as string; return extractSpanContextFromTraceParentHeader(diagnosticId); } + +/** + * Provides an iterable over messages, whether it is a single message or multiple + * messages. + * + * @param receivedMessages A single message or a set of messages + * @internal + * @ignore + */ +function* getReceivedMessages( + receivedMessages: ServiceBusReceivedMessage | ServiceBusReceivedMessage[] +): Iterable { + if (!Array.isArray(receivedMessages)) { + yield receivedMessages; + } else { + for (const message of receivedMessages) { + yield message; + } + } +} + +/** + * A span that encompasses the period when the message has been received and + * is being processed. + * + * NOTE: The amount of time the user would be considered processing the message is + * not always clear - in that case the span will have a very short lifetime + * since we'll start the span when we receive the message and end it when we + * give the message to the user. + * + * @internal + * @ignore + */ +export function createProcessingSpan( + receivedMessages: ServiceBusReceivedMessage | ServiceBusReceivedMessage[], + // NOTE: the connectionConfig also has an entityPath property but that only + // represents the optional entityPath in their connection string which is NOT + // what we want for tracing. + receiver: Pick, "entityPath">, + connectionConfig: Pick, + options?: OperationOptionsBase +): Span { + const links: Link[] = []; + + for (const receivedMessage of getReceivedMessages(receivedMessages)) { + const spanContext = extractSpanContextFromServiceBusMessage(receivedMessage); + + if (spanContext == null) { + continue; + } + + links.push({ + context: spanContext, + attributes: { + enqueuedTime: receivedMessage.enqueuedTimeUtc?.getTime() + } + }); + } + + const span = getTracer().startSpan("Azure.ServiceBus.process", { + kind: SpanKind.CONSUMER, + links, + parent: getParentSpan(options?.tracingOptions) + }); + + span.setAttributes({ + "az.namespace": "Microsoft.ServiceBus", + "message_bus.destination": receiver.entityPath, + "peer.address": connectionConfig.host + }); + + return span; +} + +/** + * Creates and immediately ends a processing span. Used when + * the 'processing' occurs outside of our control so we don't + * know the scope. + * + * @internal + * @ignore + */ +export function createAndEndProcessingSpan( + receivedMessages: ServiceBusReceivedMessage | ServiceBusReceivedMessage[], + receiver: Pick, "entityPath">, + connectionConfig: Pick, + options?: OperationOptionsBase +): void { + const span = createProcessingSpan(receivedMessages, receiver, connectionConfig, options); + span.setStatus({ code: CanonicalCode.OK }); + span.end(); +} diff --git a/sdk/servicebus/service-bus/src/modelsToBeSharedWithEventHubs.ts b/sdk/servicebus/service-bus/src/modelsToBeSharedWithEventHubs.ts index 2216f9910609..8e4cc931df0b 100644 --- a/sdk/servicebus/service-bus/src/modelsToBeSharedWithEventHubs.ts +++ b/sdk/servicebus/service-bus/src/modelsToBeSharedWithEventHubs.ts @@ -3,7 +3,7 @@ // TODO: this code is a straight-copy from EventHubs. Need to merge. -import { Link, Span, SpanContext, SpanKind } from "@opentelemetry/api"; +import { CanonicalCode, Link, Span, SpanContext, SpanKind } from "@opentelemetry/api"; import { OperationOptions } from "@azure/core-http"; import { getTracer, OperationTracingOptions } from "@azure/core-tracing"; @@ -57,3 +57,26 @@ export interface TryAddOptions { */ parentSpan?: Span | SpanContext | null; } + +/** + * Runs the `fn` passed in and marks the span as completed with an error (and the + * corresponding message) or as OK. + * + * @ignore + * @internal + */ +export async function trace(fn: () => Promise, span: Span): Promise { + try { + const ret = await fn(); + span.setStatus({ code: CanonicalCode.OK }); + return ret; + } catch (err) { + span.setStatus({ + code: CanonicalCode.UNKNOWN, + message: err.message + }); + throw err; + } finally { + span.end(); + } +} diff --git a/sdk/servicebus/service-bus/src/receivers/receiver.ts b/sdk/servicebus/service-bus/src/receivers/receiver.ts index 90db798104f5..c9b0a879e010 100644 --- a/sdk/servicebus/service-bus/src/receivers/receiver.ts +++ b/sdk/servicebus/service-bus/src/receivers/receiver.ts @@ -9,7 +9,7 @@ import { SubscribeOptions, InternalMessageHandlers } from "../models"; -import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs"; +import { OperationOptionsBase, trace } from "../modelsToBeSharedWithEventHubs"; import { ServiceBusReceivedMessage } from ".."; import { ConnectionContext } from "../connectionContext"; import { @@ -28,6 +28,7 @@ import { ServiceBusReceivedMessageWithLock, ServiceBusMessageImpl } from "../ser import { Constants, RetryConfig, RetryOperationType, RetryOptions, retry } from "@azure/core-amqp"; import "@azure/core-asynciterator-polyfill"; import { LockRenewer } from "../core/autoLockRenewer"; +import { createProcessingSpan } from "../diagnostics/instrumentServiceBusMessage"; import { receiverLogger as logger } from "../log"; /** @@ -157,6 +158,8 @@ export class ServiceBusReceiverImpl< private _streamingReceiver?: StreamingReceiver; private _lockRenewer: LockRenewer | undefined; + private _createProcessingSpan: typeof createProcessingSpan; + private get logPrefix() { return `[${this._context.connectionId}|receiver:${this.entityPath}]`; } @@ -178,6 +181,7 @@ export class ServiceBusReceiverImpl< maxAutoRenewLockDurationInMs, receiveMode ); + this._createProcessingSpan = createProcessingSpan; } private _throwIfAlreadyReceiving(): void { @@ -228,7 +232,7 @@ export class ServiceBusReceiverImpl< onInitialize: () => Promise, onMessage: OnMessage, onError: OnError, - options?: SubscribeOptions + options: SubscribeOptions ): void { this._throwIfReceiverOrConnectionClosed(); this._throwIfAlreadyReceiving(); @@ -262,7 +266,9 @@ export class ServiceBusReceiverImpl< } if (!this.isClosed) { - sReceiver.subscribe(onMessage, onError); + sReceiver.subscribe(async (message) => { + await onMessage(message); + }, onError); } else { await sReceiver.close(); } @@ -305,12 +311,14 @@ export class ServiceBusReceiverImpl< options ); } + const receivedMessages = await this._batchingReceiver.receive( maxMessageCount, options?.maxWaitTimeInMs ?? Constants.defaultOperationTimeoutInMs, defaultMaxTimeAfterFirstMessageForBatchingMs, - options?.abortSignal + options ?? {} ); + return (receivedMessages as unknown) as ReceivedMessageT[]; }; const config: RetryConfig = { @@ -420,6 +428,7 @@ export class ServiceBusReceiverImpl< close(): Promise; } { assertValidMessageHandlers(handlers); + options = options ?? {}; const processError = wrapProcessErrorHandler(handlers); @@ -434,7 +443,8 @@ export class ServiceBusReceiverImpl< } }, async (message: ServiceBusMessageImpl) => { - return handlers.processMessage((message as any) as ReceivedMessageT); + const span = this._createProcessingSpan(message, this, this._context.config, options); + return trace(() => handlers.processMessage((message as any) as ReceivedMessageT), span); }, processError, options diff --git a/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts b/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts index 6573fe51b997..98d9dd471401 100644 --- a/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts +++ b/sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts @@ -26,9 +26,10 @@ import { ErrorNameConditionMapper, translate } from "@azure/core-amqp"; -import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs"; +import { OperationOptionsBase, trace } from "../modelsToBeSharedWithEventHubs"; import "@azure/core-asynciterator-polyfill"; import { AmqpError } from "rhea-promise"; +import { createProcessingSpan } from "../diagnostics/instrumentServiceBusMessage"; import { receiverLogger as logger } from "../log"; /** @@ -113,6 +114,8 @@ export class ServiceBusSessionReceiverImpl< */ private _isClosed: boolean = false; + private _createProcessingSpan: typeof createProcessingSpan; + private get logPrefix() { return `[${this._context.connectionId}|session:${this.entityPath}]`; } @@ -131,6 +134,7 @@ export class ServiceBusSessionReceiverImpl< ) { throwErrorIfConnectionClosed(_context); this.sessionId = _messageSession.sessionId; + this._createProcessingSpan = createProcessingSpan; } private _throwIfReceiverOrConnectionClosed(): void { @@ -382,7 +386,7 @@ export class ServiceBusSessionReceiverImpl< maxMessageCount, options?.maxWaitTimeInMs ?? Constants.defaultOperationTimeoutInMs, defaultMaxTimeAfterFirstMessageForBatchingMs, - options?.abortSignal + options ?? {} ); return (receivedMessages as any) as ReceivedMessageT[]; @@ -406,11 +410,14 @@ export class ServiceBusSessionReceiverImpl< // TODO - receiverOptions for subscribe?? assertValidMessageHandlers(handlers); + options = options ?? {}; + const processError = wrapProcessErrorHandler(handlers); this._registerMessageHandler( async (message: ServiceBusMessageImpl) => { - return handlers.processMessage((message as any) as ReceivedMessageT); + const span = this._createProcessingSpan(message, this, this._context.config, options); + return trace(() => handlers.processMessage((message as any) as ReceivedMessageT), span); }, processError, options @@ -448,7 +455,7 @@ export class ServiceBusSessionReceiverImpl< private _registerMessageHandler( onMessage: OnMessage, onError: OnError, - options?: SubscribeOptions + options: SubscribeOptions ): void { this._throwIfReceiverOrConnectionClosed(); this._throwIfAlreadyReceiving(); diff --git a/sdk/servicebus/service-bus/src/session/messageSession.ts b/sdk/servicebus/service-bus/src/session/messageSession.ts index f7a4faae0be5..09ea245db7cd 100644 --- a/sdk/servicebus/service-bus/src/session/messageSession.ts +++ b/sdk/servicebus/service-bus/src/session/messageSession.ts @@ -28,6 +28,7 @@ import { onMessageSettled, DeferredPromiseAndTimer } from "../core/shared"; import { AbortError, AbortSignalLike } from "@azure/abort-controller"; import { ReceiverHelper } from "../core/receiverHelper"; import { AcceptSessionOptions, ReceiveMode, SubscribeOptions } from "../models"; +import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs"; /** * Describes the options that need to be provided while creating a message session receiver link. @@ -573,7 +574,7 @@ export class MessageSession extends LinkEntity { * * @returns void */ - subscribe(onMessage: OnMessage, onError: OnError, options?: SubscribeOptions): void { + subscribe(onMessage: OnMessage, onError: OnError, options: SubscribeOptions): void { if (!options) options = {}; if (options.abortSignal?.aborted) { @@ -713,14 +714,14 @@ export class MessageSession extends LinkEntity { maxMessageCount: number, maxWaitTimeInMs: number, maxTimeAfterFirstMessageInMs: number, - userAbortSignal?: AbortSignalLike + options: OperationOptionsBase ): Promise { try { return await this._batchingReceiverLite.receiveMessages({ maxMessageCount, maxWaitTimeInMs, maxTimeAfterFirstMessageInMs, - userAbortSignal + ...options }); } catch (error) { logger.logError(error, `${this.logPrefix} Rejecting receiveMessages() with error`); diff --git a/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts index daf610420a67..dbfeeb10cd98 100644 --- a/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/batchingReceiver.spec.ts @@ -16,7 +16,7 @@ import { } from "../../src/core/batchingReceiver"; import { defer, createConnectionContextForTests } from "./unittestUtils"; import { createAbortSignalForTest } from "../utils/abortSignalTestUtils"; -import { AbortController, AbortSignalLike } from "@azure/abort-controller"; +import { AbortController } from "@azure/abort-controller"; import { ServiceBusMessageImpl } from "../../src/serviceBusMessage"; import { Receiver as RheaReceiver, @@ -30,6 +30,7 @@ import { StandardAbortMessage } from "../../src/util/utils"; import { OnAmqpEventAsPromise } from "../../src/core/messageReceiver"; import { ConnectionContext } from "../../src/connectionContext"; import { ServiceBusReceiverImpl } from "../../src/receivers/receiver"; +import { OperationOptionsBase } from "../../src/modelsToBeSharedWithEventHubs"; import { ReceiveMode } from "../../src"; describe("BatchingReceiver unit tests", () => { @@ -64,9 +65,9 @@ describe("BatchingReceiver unit tests", () => { _maxMessageCount: number, _maxWaitTimeInMs: number, _maxTimeAfterFirstMessageMs: number, - abortSignal?: AbortSignalLike + options?: OperationOptionsBase ): Promise { - assert.equal(abortSignal, origAbortSignal); + assert.equal(options?.abortSignal, origAbortSignal); wasCalled = true; return []; } @@ -91,7 +92,9 @@ describe("BatchingReceiver unit tests", () => { }); try { - await receiver.receive(1, 60 * 1000, 60 * 1000, abortController.signal); + await receiver.receive(1, 60 * 1000, 60 * 1000, { + abortSignal: abortController.signal + }); assert.fail("Should have thrown"); } catch (err) { assert.equal(err.message, StandardAbortMessage); @@ -151,7 +154,7 @@ describe("BatchingReceiver unit tests", () => { }; try { - await receiver.receive(1, 60 * 1000, 60 * 1000, abortController.signal); + await receiver.receive(1, 60 * 1000, 60 * 1000, { abortSignal: abortController.signal }); assert.fail("Should have thrown"); } catch (err) { assert.equal(err.message, StandardAbortMessage); @@ -201,7 +204,7 @@ describe("BatchingReceiver unit tests", () => { receiver ); - const receivePromise = receiver.receive(1, bigTimeout, bigTimeout); + const receivePromise = receiver.receive(1, bigTimeout, bigTimeout, {}); await receiveIsReady; // batch fulfillment is checked when we receive a message... @@ -233,7 +236,7 @@ describe("BatchingReceiver unit tests", () => { const { receiveIsReady, remainingRegisteredListeners } = setupBatchingReceiver(receiver); - const receivePromise = receiver.receive(1, littleTimeout, bigTimeout); + const receivePromise = receiver.receive(1, littleTimeout, bigTimeout, {}); await receiveIsReady; @@ -267,7 +270,7 @@ describe("BatchingReceiver unit tests", () => { receiver ); - const receivePromise = receiver.receive(3, bigTimeout, littleTimeout); + const receivePromise = receiver.receive(3, bigTimeout, littleTimeout, {}); await receiveIsReady; // batch fulfillment is checked when we receive a message... @@ -317,7 +320,7 @@ describe("BatchingReceiver unit tests", () => { receiver ); - const receivePromise = receiver.receive(3, bigTimeout, littleTimeout); + const receivePromise = receiver.receive(3, bigTimeout, littleTimeout, {}); await receiveIsReady; // batch fulfillment is checked when we receive a message... @@ -393,7 +396,7 @@ describe("BatchingReceiver unit tests", () => { }; }; - const receivePromise = receiver.receive(3, bigTimeout + 1, bigTimeout + 2); + const receivePromise = receiver.receive(3, bigTimeout + 1, bigTimeout + 2, {}); await receiveIsReady; emitter.emit(ReceiverEvents.message, { @@ -566,7 +569,7 @@ describe("BatchingReceiver unit tests", () => { const { fakeRheaReceiver, receiveIsReady } = createFakeReceiver(); const receiver = new BatchingReceiverLite( - {} as ConnectionContext, + createConnectionContextForTests(), "fakeEntityPath", async () => { return fakeRheaReceiver; @@ -628,7 +631,7 @@ describe("BatchingReceiver unit tests", () => { const { fakeRheaReceiver, receiveIsReady } = createFakeReceiver(); const receiver = new BatchingReceiverLite( - {} as ConnectionContext, + createConnectionContextForTests(), "fakeEntityPath", async () => { return fakeRheaReceiver; diff --git a/sdk/servicebus/service-bus/test/internal/messageSession.spec.ts b/sdk/servicebus/service-bus/test/internal/messageSession.spec.ts index 6fa7074af131..1da832c9dc92 100644 --- a/sdk/servicebus/service-bus/test/internal/messageSession.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/messageSession.spec.ts @@ -60,7 +60,7 @@ describe("Message session unit tests", () => { const { receiveIsReady, emitter } = setupFakeReceiver(receiver as any); - const receivePromise = receiver.receiveMessages(1, bigTimeout, bigTimeout); + const receivePromise = receiver.receiveMessages(1, bigTimeout, bigTimeout, {}); await receiveIsReady; // batch fulfillment is checked when we receive a message... @@ -89,7 +89,7 @@ describe("Message session unit tests", () => { const { receiveIsReady } = setupFakeReceiver(receiver); - const receivePromise = receiver.receiveMessages(1, littleTimeout, bigTimeout); + const receivePromise = receiver.receiveMessages(1, littleTimeout, bigTimeout, {}); await receiveIsReady; @@ -118,7 +118,7 @@ describe("Message session unit tests", () => { const { receiveIsReady, emitter } = setupFakeReceiver(receiver); - const receivePromise = receiver.receiveMessages(3, bigTimeout, littleTimeout); + const receivePromise = receiver.receiveMessages(3, bigTimeout, littleTimeout, {}); await receiveIsReady; // batch fulfillment is checked when we receive a message... @@ -163,7 +163,7 @@ describe("Message session unit tests", () => { const { receiveIsReady, emitter } = setupFakeReceiver(receiver); - const receivePromise = receiver.receiveMessages(3, bigTimeout, littleTimeout); + const receivePromise = receiver.receiveMessages(3, bigTimeout, littleTimeout, {}); await receiveIsReady; // batch fulfillment is checked when we receive a message... @@ -234,7 +234,7 @@ describe("Message session unit tests", () => { }; }; - const receivePromise = receiver.receiveMessages(3, bigTimeout + 1, bigTimeout + 2); + const receivePromise = receiver.receiveMessages(3, bigTimeout + 1, bigTimeout + 2, {}); await receiveIsReady; emitter.emit(ReceiverEvents.message, { diff --git a/sdk/servicebus/service-bus/test/internal/receiver.spec.ts b/sdk/servicebus/service-bus/test/internal/receiver.spec.ts index 71a2ab8ce90b..271fd92cbabf 100644 --- a/sdk/servicebus/service-bus/test/internal/receiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/receiver.spec.ts @@ -42,7 +42,7 @@ describe("Receiver unit tests", () => { }; // make an init() happen internally. - const emptyArrayOfMessages = await batchingReceiver.receive(1, 1, 1); + const emptyArrayOfMessages = await batchingReceiver.receive(1, 1, 1, {}); assert.isEmpty(emptyArrayOfMessages); assert.isTrue(initWasCalled); diff --git a/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts b/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts index 85f648a811ec..3a0349b6c616 100644 --- a/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/serviceBusClient.spec.ts @@ -49,7 +49,12 @@ describe("serviceBusClient unit tests", () => { try { const abortSignalStuff = createAbortSignal(); - client["_connectionContext"] = createConnectionContextForTestsWithSessionId("a session id"); + const origConnectionContext = client["_connectionContext"]; + + client["_connectionContext"] = createConnectionContextForTestsWithSessionId( + "a session id", + origConnectionContext.config + ); let sessionReceiver: ServiceBusSessionReceiver; @@ -97,7 +102,12 @@ describe("serviceBusClient unit tests", () => { try { const abortSignalStuff = createAbortSignal(); - client["_connectionContext"] = createConnectionContextForTestsWithSessionId("session id"); + const origConnectionContext = client["_connectionContext"]; + + client["_connectionContext"] = createConnectionContextForTestsWithSessionId( + "session id", + origConnectionContext.config + ); let sessionReceiver: ServiceBusSessionReceiver; diff --git a/sdk/servicebus/service-bus/test/internal/tracing.spec.ts b/sdk/servicebus/service-bus/test/internal/tracing.spec.ts new file mode 100644 index 000000000000..4373383fac7e --- /dev/null +++ b/sdk/servicebus/service-bus/test/internal/tracing.spec.ts @@ -0,0 +1,394 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import chai from "chai"; +import { getTracer, NoOpSpan, TestSpan, TestTracer } from "@azure/core-tracing"; +import { CanonicalCode, SpanOptions } from "@opentelemetry/api"; +import { + ServiceBusMessage, + ServiceBusMessageImpl, + ServiceBusReceivedMessage +} from "../../src/serviceBusMessage"; +import { + createAndEndProcessingSpan, + createProcessingSpan, + instrumentServiceBusMessage, + TRACEPARENT_PROPERTY +} from "../../src/diagnostics/instrumentServiceBusMessage"; +import { OperationOptionsBase, trace } from "../../src/modelsToBeSharedWithEventHubs"; +import { setTracerForTest } from "../utils/misc"; +import { SpanKind } from "@opentelemetry/api"; +import { BatchingReceiverLite } from "../../src/core/batchingReceiver"; +import { Receiver } from "rhea-promise"; +import { createConnectionContextForTests } from "./unittestUtils"; +import sinon from "sinon"; +import { ServiceBusReceiverImpl } from "../../src/receivers/receiver"; +import { OnMessage } from "../../src/core/messageReceiver"; +import { ServiceBusSessionReceiverImpl } from "../../src/receivers/sessionReceiver"; +import { MessageSession } from "../../src/session/messageSession"; +const should = chai.should(); +const assert = chai.assert; + +describe("Tracing tests", () => { + let tracer: TestTracer2; + let resetTracer: () => void; + const tracingOptions: OperationOptionsBase["tracingOptions"] = { + spanOptions: { + parent: { + spanId: "my parent span id", + traceId: "my trace id", + traceFlags: 0 + } + } + }; + + beforeEach(() => { + ({ tracer, resetTracer } = setTracerForTest(new TestTracer2())); + }); + + afterEach(() => { + resetTracer(); + }); + + it("batching", async () => { + const createSpanStub = sinon.spy(createAndEndProcessingSpan); + + // both session and non-session receivers use the batching receiver lite + const br = new BatchingReceiverLite( + createConnectionContextForTests(), + "my entity path", + async () => (({} as any) as Receiver), + "peekLock" + ); + + br["_createAndEndProcessingSpan"] = createSpanStub; + + br["_receiveMessagesImpl"] = async () => { + return ([ + { + properties: { + "Diagnostic-Id": "diagnostic id 1" + } + }, + { + properties: { + "Diagnostic-Id": "diagnostic id 2" + } + } + ] as any) as ServiceBusMessageImpl[]; + }; + + await br.receiveMessages({ + maxMessageCount: 1, + maxTimeAfterFirstMessageInMs: 1, + maxWaitTimeInMs: 1, + tracingOptions + }); + + assert.isTrue(createSpanStub.calledOnce, "create span was called"); + + const [messages, , , options] = createSpanStub.args[0]; + + assert.isTrue( + Array.isArray(messages), + "only expect one call to the create a span (it can handle multiple messages)" + ); + + assert.deepEqual( + (messages as ServiceBusReceivedMessage[]).map((m) => m.properties!["Diagnostic-Id"]), + ["diagnostic id 1", "diagnostic id 2"] + ); + + assert.equal( + options?.tracingOptions?.spanOptions?.parent?.spanId, + "my parent span id", + "Parent span should be properly passed in." + ); + }); + + it("streaming (no sessions)", async () => { + const receiver = new ServiceBusReceiverImpl( + createConnectionContextForTests(), + "entity path", + "peekLock", + 1 + ); + + const testData = stubCreateProcessingSpan(receiver); + + let processMessage: OnMessage | undefined; + + receiver["_registerMessageHandler"] = (_pi, pm, _pe) => { + processMessage = pm; + }; + + receiver.subscribe( + { + processMessage: async (msg) => { + if (msg.properties!["Diagnostic-Id"] === "should throw") { + throw new Error("This message failed when we tried to process it"); + } + }, + processError: async (_err) => {} + }, + { + tracingOptions + } + ); + + assert.exists(processMessage, "subscribe call should have called _registerMessageHandler"); + + try { + await processMessage!(({ + properties: { + [TRACEPARENT_PROPERTY]: "should throw" + } + } as any) as ServiceBusMessageImpl); + assert.fail("Error should propagate after being traced"); + } catch (err) { + assert.equal(err.message, "This message failed when we tried to process it"); + + assert.deepEqual(testData.span?.status, { + code: CanonicalCode.UNKNOWN, + message: "This message failed when we tried to process it" + }); + } + + await processMessage!(({ + properties: { + [TRACEPARENT_PROPERTY]: "should NOT throw" + } + } as any) as ServiceBusMessageImpl); + + assert.equal(testData.span!.status.code, CanonicalCode.OK); + }); + + it("streaming (sessions)", async () => { + const receiver = new ServiceBusSessionReceiverImpl( + {} as any, + createConnectionContextForTests(), + "entity path", + "peekLock" + ); + + const testData = stubCreateProcessingSpan(receiver); + + let processMessage: OnMessage | undefined; + + receiver["_registerMessageHandler"] = (pm, _pe) => { + processMessage = pm; + }; + + receiver.subscribe( + { + processMessage: async (msg) => { + if (msg.properties!["Diagnostic-Id"] === "should throw") { + throw new Error("This message failed when we tried to process it"); + } + }, + processError: async (_err) => {} + }, + { + tracingOptions + } + ); + + assert.exists(processMessage, "subscribe call should have called _registerMessageHandler"); + + try { + await processMessage!(({ + properties: { + [TRACEPARENT_PROPERTY]: "should throw" + } + } as any) as ServiceBusMessageImpl); + assert.fail("Error should propagate after being traced"); + } catch (err) { + assert.equal(err.message, "This message failed when we tried to process it"); + assert.deepEqual(testData.span!.status, { + code: CanonicalCode.UNKNOWN, + message: "This message failed when we tried to process it" + }); + } + + await processMessage!(({ + properties: { + [TRACEPARENT_PROPERTY]: "should NOT throw" + } + } as any) as ServiceBusMessageImpl); + + assert.equal(testData.span!.status.code, CanonicalCode.OK); + }); + + /** + * Iterators are simple since they just pass their tracing data to receiveMessages. So + * we just make sure we've done that much and just rely on receiveMessages tests + * to validate tracing. + */ + [ + new ServiceBusReceiverImpl(createConnectionContextForTests(), "entity path", "peekLock", 1), + new ServiceBusSessionReceiverImpl( + {} as MessageSession, + createConnectionContextForTests(), + "entity path", + "peekLock" + ) + ].forEach((receiver) => { + it(`iterator (${receiver.constructor.name})`, async () => { + receiver["receiveMessages"] = async (_count, options) => { + assert.deepEqual(tracingOptions, options?.tracingOptions); + throw new Error( + "We are passing tracing options so it'll be up to receiveMessages to trace properly" + ); + }; + + const iterator = receiver.getMessageIterator({ + tracingOptions + }); + + try { + await iterator.next(); + assert.fail("Should throw my error"); + } catch (err) { + assert.equal( + err.message, + "We are passing tracing options so it'll be up to receiveMessages to trace properly" + ); + } + }); + }); + + function stubCreateProcessingSpan(receiver: any) { + let data: { + span?: TestSpan; + } = {}; + + const fakeCreateProcessingSpan: typeof createProcessingSpan = ( + messages, + receiver, + config, + options + ) => { + assert.equal(receiver.entityPath, "entity path"); + assert.equal(config.host, "fakeHost"); + assert.isFalse(Array.isArray(messages)); + assert.equal(options?.tracingOptions?.spanOptions?.parent?.spanId, "my parent span id"); + + data.span = getTracer().startSpan("some span") as TestSpan; + return data.span; + }; + + receiver["_createProcessingSpan"] = fakeCreateProcessingSpan; + return data; + } + + describe("telemetry", () => { + const otherProperties = { + entityPath: "entityPath" + }; + + const connectionConfig = { + host: "thehost" + }; + + it("basic span properties are set", async () => { + const fakeParentSpanContext = new NoOpSpan().context(); + + createProcessingSpan([], otherProperties, connectionConfig, { + tracingOptions: { + spanOptions: { + parent: fakeParentSpanContext + } + } + }); + + should.equal(tracer.spanName, "Azure.ServiceBus.process"); + + should.exist(tracer.spanOptions); + tracer.spanOptions!.kind!.should.equal(SpanKind.CONSUMER); + tracer.spanOptions!.parent!.should.equal(fakeParentSpanContext); + + const attributes = tracer.getRootSpans()[0].attributes; + + attributes!.should.deep.equal({ + "az.namespace": "Microsoft.ServiceBus", + "message_bus.destination": otherProperties.entityPath, + "peer.address": connectionConfig.host + }); + }); + + it("received events are linked to this span using Diagnostic-Id", async () => { + const requiredMessageProperties = { + body: "hello", + enqueuedTimeUtc: new Date() + }; + + const firstEvent = tracer.startSpan("a"); + const thirdEvent = tracer.startSpan("c"); + + const receivedMessages: ServiceBusMessage[] = [ + instrumentServiceBusMessage({ ...requiredMessageProperties }, firstEvent), + { properties: {}, ...requiredMessageProperties }, // no diagnostic ID means it gets skipped + instrumentServiceBusMessage({ ...requiredMessageProperties }, thirdEvent) + ]; + + createProcessingSpan( + (receivedMessages as any) as ServiceBusReceivedMessage[], + otherProperties, + connectionConfig, + {} + ); + + // middle event, since it has no trace information, doesn't get included + // in the telemetry + tracer.spanOptions!.links!.length.should.equal(3 - 1); + // the test tracer just hands out a string integer that just gets + // incremented + tracer.spanOptions!.links![0]!.context.traceId.should.equal(firstEvent.context().traceId); + (tracer.spanOptions!.links![0]!.attributes!.enqueuedTime as number).should.equal( + requiredMessageProperties.enqueuedTimeUtc.getTime() + ); + tracer.spanOptions!.links![1]!.context.traceId.should.equal(thirdEvent.context().traceId); + (tracer.spanOptions!.links![1]!.attributes!.enqueuedTime as number).should.equal( + requiredMessageProperties.enqueuedTimeUtc.getTime() + ); + }); + + it("trace - normal", async () => { + const tracer = new TestTracer(); + const span = tracer.startSpan("whatever"); + + await trace(async () => {}, span); + + span.status!.code.should.equal(CanonicalCode.OK); + span.endCalled.should.be.ok; + }); + + it("trace - throws", async () => { + const tracer = new TestTracer(); + const span = tracer.startSpan("whatever"); + + await trace(async () => { + throw new Error("error thrown from fn"); + }, span).should.be.rejectedWith(/error thrown from fn/); + + span.status!.code.should.equal(CanonicalCode.UNKNOWN); + span.status!.message!.should.equal("error thrown from fn"); + span.endCalled.should.be.ok; + }); + }); +}); + +class TestTracer2 extends TestTracer { + public spanOptions: SpanOptions | undefined; + public spanName: string | undefined; + + constructor() { + super(); + } + + startSpan(nameArg: string, optionsArg?: SpanOptions): TestSpan { + this.spanName = nameArg; + this.spanOptions = optionsArg; + return super.startSpan(nameArg, optionsArg); + } +} diff --git a/sdk/servicebus/service-bus/test/internal/unittestUtils.ts b/sdk/servicebus/service-bus/test/internal/unittestUtils.ts index 4887f23a4b79..78ffb89dca29 100644 --- a/sdk/servicebus/service-bus/test/internal/unittestUtils.ts +++ b/sdk/servicebus/service-bus/test/internal/unittestUtils.ts @@ -13,6 +13,13 @@ import { EventEmitter } from "events"; import { getUniqueName } from "../../src/util/utils"; import { Link } from "rhea-promise/typings/lib/link"; +export interface CreateConnectionContextForTestsOptions { + host?: string; + entityPath?: string; + onCreateAwaitableSenderCalled?: () => void; + onCreateReceiverCalled?: (receiver: RheaReceiver) => void; +} + /** * Creates a fake ConnectionContext for tests that can create semi-realistic * senders (less realistic, could use some work) and receivers (decent!). @@ -24,10 +31,9 @@ import { Link } from "rhea-promise/typings/lib/link"; * is created (via onCreateAwaitableSenderCalled). * */ -export function createConnectionContextForTests(options?: { - onCreateAwaitableSenderCalled?: () => void; - onCreateReceiverCalled?: (receiver: RheaReceiver) => void; -}): ConnectionContext & { +export function createConnectionContextForTests( + options?: CreateConnectionContextForTestsOptions +): ConnectionContext & { initWasCalled: boolean; } { let initWasCalled = false; @@ -41,7 +47,12 @@ export function createConnectionContextForTests(options?: { senders: {}, messageSessions: {}, managementClients: {}, - config: { endpoint: "my.service.bus" }, + config: { + endpoint: "my.service.bus", + // used by tracing + entityPath: options?.entityPath ?? "fakeEntityPath", + host: options?.host ?? "fakeHost" + }, connectionId: "connection-id", connection: { id: "connection-id", @@ -105,11 +116,13 @@ export function createConnectionContextForTests(options?: { * @param sessionId A session ID to use or the default ("hello") */ export function createConnectionContextForTestsWithSessionId( - sessionId: string = "hello" + sessionId: string = "hello", + options?: CreateConnectionContextForTestsOptions ): ConnectionContext & { initWasCalled: boolean; } { const connectionContext = createConnectionContextForTests({ + ...options, onCreateReceiverCalled: (receiver) => { (receiver as any).source = { filter: { @@ -120,6 +133,10 @@ export function createConnectionContextForTestsWithSessionId( (receiver as any).properties = { ["com.microsoft:locked-until-utc"]: Date.now() }; + + if (options?.onCreateReceiverCalled) { + options?.onCreateReceiverCalled(receiver); + } } }); diff --git a/sdk/servicebus/service-bus/test/retries.spec.ts b/sdk/servicebus/service-bus/test/retries.spec.ts index b809d919683b..5055cbdd3f02 100644 --- a/sdk/servicebus/service-bus/test/retries.spec.ts +++ b/sdk/servicebus/service-bus/test/retries.spec.ts @@ -12,12 +12,12 @@ import { ServiceBusSender, ServiceBusSenderImpl } from "../src/sender"; import { MessagingError } from "@azure/core-amqp"; import Long from "long"; import { BatchingReceiver } from "../src/core/batchingReceiver"; -import { delay } from "rhea-promise"; import { ServiceBusSessionReceiverImpl, ServiceBusSessionReceiver } from "../src/receivers/sessionReceiver"; import { ServiceBusReceiver, ServiceBusReceiverImpl } from "../src/receivers/receiver"; +import { InternalMessageHandlers } from "../src/models"; describe("Retries - ManagementClient", () => { let sender: ServiceBusSender; @@ -481,11 +481,19 @@ describe("Retries - onDetached", () => { it("Unpartitioned Queue: streaming", async function(): Promise { await beforeEachTest(TestClientType.UnpartitionedQueue); await mockOnDetachedAndVerifyRetries(async () => { - receiver.subscribe({ - async processMessage() {}, - async processError() {} + const subscribeInitializedPromise = new Promise((resolve, reject) => { + receiver.subscribe({ + async processInitialize() { + resolve(); + }, + async processMessage() {}, + async processError(err) { + reject(err); + } + } as InternalMessageHandlers); }); - await delay(2000); + + await subscribeInitializedPromise; const streamingReceiver = (receiver as ServiceBusReceiverImpl)["_streamingReceiver"]!; should.exist(streamingReceiver);