diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index c8054ea94a8a..3effd347d2a4 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -4,7 +4,8 @@ ### Features Added -- Add `state` property to `ServiceBusReceivedMessage`. Its value is one of `"active"`, `"deferred"`, or `"scheduled"`. [PR 18938](https://github.com/Azure/azure-sdk-for-js/pull/18938) +- Add `state` property to `ServiceBusReceivedMessage`. Its value is one of `"active"`, `"deferred"`, or `"scheduled"`. [PR #18938](https://github.com/Azure/azure-sdk-for-js/pull/18938) +- Add optional boolean `skipParsingBodyAsJson` property to `ServiceBusReceiverOptions` and `ServiceBusSessionReceiverOptions`. By default, the client attempts to parse message body as JSON object, and this new parameter controls whether the client should skip performing this parsing. [PR #18692](https://github.com/Azure/azure-sdk-for-js/pull/18692) ### Breaking Changes diff --git a/sdk/servicebus/service-bus/review/service-bus.api.md b/sdk/servicebus/service-bus/review/service-bus.api.md index 9307a4667b3b..17708dd3e280 100644 --- a/sdk/servicebus/service-bus/review/service-bus.api.md +++ b/sdk/servicebus/service-bus/review/service-bus.api.md @@ -459,6 +459,7 @@ export interface ServiceBusReceiver { export interface ServiceBusReceiverOptions { maxAutoLockRenewalDurationInMs?: number; receiveMode?: "peekLock" | "receiveAndDelete"; + skipParsingBodyAsJson?: boolean; subQueueType?: "deadLetter" | "transferDeadLetter"; } @@ -489,6 +490,7 @@ export interface ServiceBusSessionReceiver extends ServiceBusReceiver { export interface ServiceBusSessionReceiverOptions extends OperationOptionsBase { maxAutoLockRenewalDurationInMs?: number; receiveMode?: "peekLock" | "receiveAndDelete"; + skipParsingBodyAsJson?: boolean; } // @public diff --git a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts index e21d125163c1..d00c59ab4b7e 100644 --- a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts @@ -69,7 +69,8 @@ export class BatchingReceiver extends MessageReceiver { return this.link; }, - this.receiveMode + this.receiveMode, + options.skipParsingBodyAsJson ?? false ); } @@ -248,7 +249,8 @@ export class BatchingReceiverLite { private _getCurrentReceiver: ( abortSignal?: AbortSignalLike ) => Promise, - private _receiveMode: ReceiveMode + private _receiveMode: ReceiveMode, + _skipParsingBodyAsJson: boolean ) { this._createAndEndProcessingSpan = createAndEndProcessingSpan; @@ -257,7 +259,8 @@ export class BatchingReceiverLite { context.message!, context.delivery!, true, - this._receiveMode + this._receiveMode, + _skipParsingBodyAsJson ); }; diff --git a/sdk/servicebus/service-bus/src/core/managementClient.ts b/sdk/servicebus/service-bus/src/core/managementClient.ts index 73e258a23900..d408a77ec986 100644 --- a/sdk/servicebus/service-bus/src/core/managementClient.ts +++ b/sdk/servicebus/service-bus/src/core/managementClient.ts @@ -63,6 +63,12 @@ export interface SendManagementRequestOptions extends SendRequestOptions { * This is used for service side optimization. */ associatedLinkName?: string; + /** + * Option to disable the client from running JSON.parse() on the message body when receiving the message. + * Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you + * prefer to work directly with the bytes present in the message body than have the client attempt to parse it. + */ + skipParsingBodyAsJson?: boolean; } /** @@ -499,9 +505,10 @@ export class ManagementClient extends LinkEntity { const messages = result.body.messages as { message: Buffer }[]; for (const msg of messages) { const decodedMessage = RheaMessageUtil.decode(msg.message); - const message = fromRheaMessage(decodedMessage as any); - - message.body = defaultDataTransformer.decode(message.body); + const message = fromRheaMessage( + decodedMessage as any, + options?.skipParsingBodyAsJson ?? false + ); messageList.push(message); this._lastPeekedSequenceNumber = message.sequenceNumber!; } @@ -813,7 +820,8 @@ export class ManagementClient extends LinkEntity { decodedMessage as any, { tag: msg["lock-token"] } as any, false, - receiveMode + receiveMode, + options?.skipParsingBodyAsJson ?? false ); messageList.push(message); } diff --git a/sdk/servicebus/service-bus/src/core/messageReceiver.ts b/sdk/servicebus/service-bus/src/core/messageReceiver.ts index 3b204dd2aad4..7a02624886a5 100644 --- a/sdk/servicebus/service-bus/src/core/messageReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/messageReceiver.ts @@ -51,6 +51,12 @@ export interface ReceiveOptions extends SubscribeOptions { * maxAutoRenewLockDurationInMs value when they created their receiver. */ lockRenewer: LockRenewer | undefined; + /** + * Option to disable the client from running JSON.parse() on the message body when receiving the message. + * Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you + * prefer to work directly with the bytes present in the message body than have the client attempt to parse it. + */ + skipParsingBodyAsJson: boolean; } /** diff --git a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts index a3f81a6c15e6..ad1d58a04d35 100644 --- a/sdk/servicebus/service-bus/src/core/streamingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/streamingReceiver.ts @@ -233,7 +233,8 @@ export class StreamingReceiver extends MessageReceiver { context.message!, context.delivery!, true, - this.receiveMode + this.receiveMode, + options.skipParsingBodyAsJson ?? false ); this._lockRenewer?.start(this, bMessage, (err) => { diff --git a/sdk/servicebus/service-bus/src/dataTransformer.ts b/sdk/servicebus/service-bus/src/dataTransformer.ts index 9c55fb9720f1..2ef860a2422c 100644 --- a/sdk/servicebus/service-bus/src/dataTransformer.ts +++ b/sdk/servicebus/service-bus/src/dataTransformer.ts @@ -84,16 +84,17 @@ export const defaultDataTransformer = { * of the AMQP mesage. * * @param body - The AMQP message body + * @param skipParsingBodyAsJson - Boolean to skip running JSON.parse() on message body content. * @returns decoded body or the given body as-is. */ - decode(body: unknown): unknown { + decode(body: unknown, skipParsingBodyAsJson: boolean): unknown { let actualContent = body; if (isRheaAmqpSection(body)) { actualContent = body.content; } - return tryToJsonDecode(actualContent); + return skipParsingBodyAsJson ? actualContent : tryToJsonDecode(actualContent); }, /** * A function that takes the body property from an AMQP message, which can come from either @@ -103,16 +104,21 @@ export const defaultDataTransformer = { * indicating which part of the AMQP message the body was decoded from. * * @param body - The AMQP message body as received from rhea. + * @param skipParsingBodyAsJson - Boolean to skip running JSON.parse() on message body. * @returns The decoded/raw body and the body type. */ decodeWithType( - body: unknown | RheaAmqpSection + body: unknown | RheaAmqpSection, + skipParsingBodyAsJson: boolean ): { body: unknown; bodyType: "data" | "sequence" | "value" } { try { if (isRheaAmqpSection(body)) { switch (body.typecode) { case dataSectionTypeCode: - return { body: tryToJsonDecode(body.content), bodyType: "data" }; + return { + body: skipParsingBodyAsJson ? body.content : tryToJsonDecode(body.content), + bodyType: "data" + }; case sequenceSectionTypeCode: // typecode: // handle sequences @@ -125,7 +131,7 @@ export const defaultDataTransformer = { // not sure - we have to try to infer the proper bodyType and content if (isBuffer(body)) { // This indicates that we are getting the AMQP described type. Let us try decoding it. - return { body: tryToJsonDecode(body), bodyType: "data" }; + return { body: skipParsingBodyAsJson ? body : tryToJsonDecode(body), bodyType: "data" }; } else { return { body: body, bodyType: "value" }; } diff --git a/sdk/servicebus/service-bus/src/models.ts b/sdk/servicebus/service-bus/src/models.ts index 254ba299c686..0d890d9d8f34 100644 --- a/sdk/servicebus/service-bus/src/models.ts +++ b/sdk/servicebus/service-bus/src/models.ts @@ -136,6 +136,12 @@ export interface ServiceBusReceiverOptions { * - **To disable autolock renewal**, set this to `0`. */ maxAutoLockRenewalDurationInMs?: number; + /** + * Option to disable the client from running JSON.parse() on the message body when receiving the message. + * Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you + * prefer to work directly with the bytes present in the message body than have the client attempt to parse it. + */ + skipParsingBodyAsJson?: boolean; } /** @@ -233,6 +239,12 @@ export interface ServiceBusSessionReceiverOptions extends OperationOptionsBase { * - **To disable autolock renewal**, set this to `0`. */ maxAutoLockRenewalDurationInMs?: number; + /** + * Option to disable the client from running JSON.parse() on the message body when receiving the message. + * Not applicable if the message was sent with AMQP body type value or sequence. Use this option when you + * prefer to work directly with the bytes present in the message body than have the client attempt to parse it. + */ + skipParsingBodyAsJson?: boolean; } /** diff --git a/sdk/servicebus/service-bus/src/receivers/receiver.ts b/sdk/servicebus/service-bus/src/receivers/receiver.ts index e897fb6cc19a..0faf6af5a071 100644 --- a/sdk/servicebus/service-bus/src/receivers/receiver.ts +++ b/sdk/servicebus/service-bus/src/receivers/receiver.ts @@ -296,6 +296,7 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver { public entityPath: string, public receiveMode: "peekLock" | "receiveAndDelete", maxAutoRenewLockDurationInMs: number, + private skipParsingBodyAsJson: boolean, retryOptions: RetryOptions = {} ) { throwErrorIfConnectionClosed(_context); @@ -357,7 +358,8 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver { const receiveOptions: ReceiveOptions = { maxConcurrentCalls: 0, receiveMode: this.receiveMode, - lockRenewer: this._lockRenewer + lockRenewer: this._lockRenewer, + skipParsingBodyAsJson: this.skipParsingBodyAsJson }; this._batchingReceiver = this._createBatchingReceiver( this._context, @@ -507,7 +509,8 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver { ...options, receiveMode: this.receiveMode, retryOptions: this._retryOptions, - lockRenewer: this._lockRenewer + lockRenewer: this._lockRenewer, + skipParsingBodyAsJson: this.skipParsingBodyAsJson }); // this ensures that if the outer service bus client is closed that this receiver is cleaned up. diff --git a/sdk/servicebus/service-bus/src/serviceBusClient.ts b/sdk/servicebus/service-bus/src/serviceBusClient.ts index ac4019a021c3..3f987393bbc7 100644 --- a/sdk/servicebus/service-bus/src/serviceBusClient.ts +++ b/sdk/servicebus/service-bus/src/serviceBusClient.ts @@ -211,6 +211,7 @@ export class ServiceBusClient { entityPathWithSubQueue, receiveMode, maxLockAutoRenewDurationInMs, + options?.skipParsingBodyAsJson ?? false, this._clientOptions.retryOptions ); } @@ -321,7 +322,8 @@ export class ServiceBusClient { maxAutoLockRenewalDurationInMs: options?.maxAutoLockRenewalDurationInMs, receiveMode, abortSignal: options?.abortSignal, - retryOptions: this._clientOptions.retryOptions + retryOptions: this._clientOptions.retryOptions, + skipParsingBodyAsJson: options?.skipParsingBodyAsJson ?? false } ); @@ -406,7 +408,8 @@ export class ServiceBusClient { maxAutoLockRenewalDurationInMs: options?.maxAutoLockRenewalDurationInMs, receiveMode, abortSignal: options?.abortSignal, - retryOptions: this._clientOptions.retryOptions + retryOptions: this._clientOptions.retryOptions, + skipParsingBodyAsJson: options?.skipParsingBodyAsJson ?? false } ); diff --git a/sdk/servicebus/service-bus/src/serviceBusMessage.ts b/sdk/servicebus/service-bus/src/serviceBusMessage.ts index 0fc8d4b2e7a1..1ae95676465c 100644 --- a/sdk/servicebus/service-bus/src/serviceBusMessage.ts +++ b/sdk/servicebus/service-bus/src/serviceBusMessage.ts @@ -516,6 +516,7 @@ export interface ServiceBusReceivedMessage extends ServiceBusMessage { */ export function fromRheaMessage( rheaMessage: RheaMessage, + skipParsingBodyAsJson: boolean, delivery?: Delivery, shouldReorderLockToken?: boolean ): ServiceBusReceivedMessage { @@ -525,7 +526,10 @@ export function fromRheaMessage( }; } - const { body, bodyType } = defaultDataTransformer.decodeWithType(rheaMessage.body); + const { body, bodyType } = defaultDataTransformer.decodeWithType( + rheaMessage.body, + skipParsingBodyAsJson + ); const sbmsg: ServiceBusMessage = { body: body @@ -896,13 +900,16 @@ export class ServiceBusMessageImpl implements ServiceBusReceivedMessage { msg: RheaMessage, delivery: Delivery, shouldReorderLockToken: boolean, - receiveMode: ReceiveMode + receiveMode: ReceiveMode, + skipParsingBodyAsJson: boolean ) { const { _rawAmqpMessage, ...restOfMessageProps } = fromRheaMessage( msg, + skipParsingBodyAsJson, delivery, shouldReorderLockToken ); + this._rawAmqpMessage = _rawAmqpMessage; // need to initialize _rawAmqpMessage property to make compiler happy Object.assign(this, restOfMessageProps); this.state = restOfMessageProps.state; // to suppress error TS2564: Property 'state' has no initializer and is not definitely assigned in the constructor. @@ -911,23 +918,6 @@ export class ServiceBusMessageImpl implements ServiceBusReceivedMessage { if (receiveMode === "receiveAndDelete") { this.lockToken = undefined; } - - let actualBodyType: - | ReturnType["bodyType"] - | undefined = undefined; - - if (msg.body) { - try { - const result = defaultDataTransformer.decodeWithType(msg.body); - - this.body = result.body; - actualBodyType = result.bodyType; - } catch (err) { - this.body = undefined; - } - } - this._rawAmqpMessage = _rawAmqpMessage; - this._rawAmqpMessage.bodyType = actualBodyType; this.delivery = delivery; } diff --git a/sdk/servicebus/service-bus/src/session/messageSession.ts b/sdk/servicebus/service-bus/src/session/messageSession.ts index f38a9fb075f5..2bed1e33e051 100644 --- a/sdk/servicebus/service-bus/src/session/messageSession.ts +++ b/sdk/servicebus/service-bus/src/session/messageSession.ts @@ -62,6 +62,7 @@ export type MessageSessionOptions = Pick< > & { receiveMode?: ReceiveMode; retryOptions: RetryOptions | undefined; + skipParsingBodyAsJson: boolean; }; /** @@ -180,6 +181,11 @@ export class MessageSession extends LinkEntity { private _totalAutoLockRenewDuration: number; + /** + * Whether to prevent the client from running JSON.parse() on the message body when receiving the message. + */ + private skipParsingBodyAsJson: boolean; + public get receiverHelper(): ReceiverHelper { return this._receiverHelper; } @@ -375,6 +381,7 @@ export class MessageSession extends LinkEntity { this.autoComplete = false; if (isDefined(this._providedSessionId)) this.sessionId = this._providedSessionId; this.receiveMode = options.receiveMode || "peekLock"; + this.skipParsingBodyAsJson = options.skipParsingBodyAsJson; this.maxAutoRenewDurationInMs = options.maxAutoLockRenewalDurationInMs != null ? options.maxAutoLockRenewalDurationInMs @@ -389,7 +396,8 @@ export class MessageSession extends LinkEntity { async (_abortSignal?: AbortSignalLike): Promise => { return this.link!; }, - this.receiveMode + this.receiveMode, + this.skipParsingBodyAsJson ); // setting all the handlers @@ -628,7 +636,8 @@ export class MessageSession extends LinkEntity { context.message!, context.delivery!, true, - this.receiveMode + this.receiveMode, + this.skipParsingBodyAsJson ); try { diff --git a/sdk/servicebus/service-bus/test/internal/node/samples.spec.ts b/sdk/servicebus/service-bus/test/internal/node/samples.spec.ts index 553517cd30e6..00371c57fc09 100644 --- a/sdk/servicebus/service-bus/test/internal/node/samples.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/node/samples.spec.ts @@ -34,13 +34,13 @@ describe("Ensure typescript samples use published package", function(): void { } it("Ensure TypeScript samples use published package", async () => { - const pattern = "samples/typescript/src/**/*.ts"; + const pattern = "samples/v7/typescript/src/**/*.ts"; const files = await globAsync(pattern); testSamples(files, new RegExp('from\\s"@azure/service-bus"')); }); it("Ensure JavaScript samples use published package", async () => { - const pattern = "samples/javascript/**/*.js"; + const pattern = "samples/v7/javascript/**/*.js"; const files = await globAsync(pattern); testSamples(files, new RegExp('=\\srequire\\("@azure/service-bus"\\)')); }); diff --git a/sdk/servicebus/service-bus/test/internal/retries.spec.ts b/sdk/servicebus/service-bus/test/internal/retries.spec.ts index aa4aec9c2239..067a342b14e0 100644 --- a/sdk/servicebus/service-bus/test/internal/retries.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/retries.spec.ts @@ -353,7 +353,8 @@ describe("Retries - Receive methods", () => { "dummyEntityPath", { lockRenewer: undefined, - receiveMode: "peekLock" + receiveMode: "peekLock", + skipParsingBodyAsJson: false } ); batchingReceiver.isOpen = () => true; diff --git a/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts index 571ad9671211..c48c9b3ead9b 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/abortSignal.spec.ts @@ -29,7 +29,8 @@ import { ReceiveMode } from "../../../src/models"; describe("AbortSignal", () => { const defaultOptions = { lockRenewer: undefined, - receiveMode: "peekLock" + receiveMode: "peekLock", + skipParsingBodyAsJson: false }; const testMessageThatDoesntMatter = { @@ -357,7 +358,8 @@ describe("AbortSignal", () => { const connectionContext = createConnectionContextForTestsWithSessionId(); const messageSession = await MessageSession.create(connectionContext, "entityPath", "hello", { - retryOptions: undefined + retryOptions: undefined, + skipParsingBodyAsJson: false }); const session = new ServiceBusSessionReceiverImpl( @@ -397,7 +399,8 @@ describe("AbortSignal", () => { createConnectionContextForTests(), "entityPath", "peekLock", - 1 + 1, + false ); try { diff --git a/sdk/servicebus/service-bus/test/internal/unit/amqpUnitTests.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/amqpUnitTests.spec.ts index 4dccc02c4a36..41e3ee64971b 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/amqpUnitTests.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/amqpUnitTests.spec.ts @@ -46,7 +46,8 @@ describe("AMQP message encoding", () => { } as any) as Message, {} as Delivery, false, - "receiveAndDelete" + "receiveAndDelete", + false ); it("isAmqpAnnotatedMessage", () => { diff --git a/sdk/servicebus/service-bus/test/internal/unit/batchingReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/batchingReceiver.spec.ts index b34c3501cd3c..af37d0d58787 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/batchingReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/batchingReceiver.spec.ts @@ -53,7 +53,8 @@ describe("BatchingReceiver unit tests", () => { createConnectionContextForTests(), "fakeEntityPath", "peekLock", - 1 + 1, + false ); let wasCalled = false; @@ -86,7 +87,8 @@ describe("BatchingReceiver unit tests", () => { const receiver = new BatchingReceiver(createConnectionContextForTests(), "fakeEntityPath", { receiveMode: "peekLock", - lockRenewer: undefined + lockRenewer: undefined, + skipParsingBodyAsJson: false }); try { @@ -105,7 +107,8 @@ describe("BatchingReceiver unit tests", () => { const receiver = new BatchingReceiver(createConnectionContextForTests(), "fakeEntityPath", { receiveMode: "peekLock", - lockRenewer: undefined + lockRenewer: undefined, + skipParsingBodyAsJson: false }); closeables.push(receiver); @@ -193,7 +196,8 @@ describe("BatchingReceiver unit tests", () => { "dummyEntityPath", { receiveMode: lockMode, - lockRenewer: undefined + lockRenewer: undefined, + skipParsingBodyAsJson: false } ); closeables.push(batchingReceiver); @@ -225,7 +229,8 @@ describe("BatchingReceiver unit tests", () => { "dummyEntityPath", { receiveMode: lockMode, - lockRenewer: undefined + lockRenewer: undefined, + skipParsingBodyAsJson: false } ); closeables.push(receiver); @@ -257,7 +262,8 @@ describe("BatchingReceiver unit tests", () => { "dummyEntityPath", { receiveMode: lockMode, - lockRenewer: undefined + lockRenewer: undefined, + skipParsingBodyAsJson: false } ); closeables.push(batchingReceiver); @@ -305,7 +311,8 @@ describe("BatchingReceiver unit tests", () => { "dummyEntityPath", { receiveMode: lockMode, - lockRenewer: undefined + lockRenewer: undefined, + skipParsingBodyAsJson: false } ); closeables.push(batchingReceiver); @@ -359,7 +366,8 @@ describe("BatchingReceiver unit tests", () => { "dummyEntityPath", { receiveMode: lockMode, - lockRenewer: undefined + lockRenewer: undefined, + skipParsingBodyAsJson: false } ); closeables.push(batchingReceiver); @@ -526,7 +534,8 @@ describe("BatchingReceiver unit tests", () => { async () => { return fakeRheaReceiver; }, - "peekLock" + "peekLock", + false ); assert.isFalse(batchingReceiver.isReceivingMessages); @@ -541,7 +550,7 @@ describe("BatchingReceiver unit tests", () => { assert.isTrue(batchingReceiver.isReceivingMessages); await receiveIsReady; - await clock.tick(10 + 1); + clock.tick(10 + 1); await prm; assert.isFalse(batchingReceiver.isReceivingMessages); @@ -556,7 +565,8 @@ describe("BatchingReceiver unit tests", () => { async () => { return fakeRheaReceiver; }, - "peekLock" + "peekLock", + false ); assert.notExists(batchingReceiver["_closeHandler"]); @@ -572,7 +582,7 @@ describe("BatchingReceiver unit tests", () => { await receiveIsReady; assert.exists(batchingReceiver["_closeHandler"]); - await batchingReceiver.terminate(new Error("actual error")); + batchingReceiver.terminate(new Error("actual error")); try { await receiveMessagesPromise; @@ -591,7 +601,8 @@ describe("BatchingReceiver unit tests", () => { async () => { return fakeRheaReceiver; }, - "peekLock" + "peekLock", + false ); assert.notExists(batchingReceiver["_closeHandler"]); @@ -644,7 +655,8 @@ describe("BatchingReceiver unit tests", () => { async () => { return fakeRheaReceiver; }, - "peekLock" + "peekLock", + false ); batchingReceiverLite["_receiveMessagesImpl"]( @@ -705,7 +717,8 @@ describe("BatchingReceiver unit tests", () => { async () => { return fakeRheaReceiver; }, - "peekLock" + "peekLock", + false ); const receiveIsReady = getReceiveIsReadyPromise(batchingReceiverLite); diff --git a/sdk/servicebus/service-bus/test/internal/unit/dataTransformer.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/dataTransformer.spec.ts index aea607ecd8fa..5e13fdb5974e 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/dataTransformer.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/dataTransformer.spec.ts @@ -45,16 +45,25 @@ describe("DataTransformer", function() { const encoded: any = transformer.encode(stringBody, "data"); encoded.typecode.should.equal(117); isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); + const decoded: any = transformer.decode(encoded, false); decoded.should.equal(stringBody); done(); }); + it("should not decode a message body when skipParsingBodyAsJson is specified", function(done) { + const encoded: any = transformer.encode(stringBody, "data"); + encoded.typecode.should.equal(117); + isBuffer(encoded.content).should.equal(true); + const decoded: any = transformer.decode(encoded, true); + decoded.should.equal(encoded.content); + done(); + }); + it("should correctly encode/decode a number message body", function(done) { const encoded: any = transformer.encode(numberBody, "data"); encoded.typecode.should.equal(117); isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); + const decoded: any = transformer.decode(encoded, false); decoded.should.equal(numberBody); done(); }); @@ -63,7 +72,7 @@ describe("DataTransformer", function() { const encoded: any = transformer.encode(booleanBody, "data"); encoded.typecode.should.equal(117); isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); + const decoded: any = transformer.decode(encoded, false); decoded.should.equal(booleanBody); done(); }); @@ -72,7 +81,7 @@ describe("DataTransformer", function() { const encoded: any = transformer.encode(nullBody, "data"); encoded.typecode.should.equal(117); isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); + const decoded: any = transformer.decode(encoded, false); should.equal(decoded, nullBody); done(); }); @@ -81,7 +90,7 @@ describe("DataTransformer", function() { const encoded: any = transformer.encode(undefinedBody, "data"); encoded.typecode.should.equal(117); isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); + const decoded: any = transformer.decode(encoded, false); should.equal(decoded, nullBody); done(); }); @@ -90,7 +99,7 @@ describe("DataTransformer", function() { const encoded: any = transformer.encode(emptyStringBody, "data"); encoded.typecode.should.equal(117); isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); + const decoded: any = transformer.decode(encoded, false); decoded.should.equal(emptyStringBody); done(); }); @@ -99,7 +108,7 @@ describe("DataTransformer", function() { const encoded: any = transformer.encode(arrayBody, "data"); encoded.typecode.should.equal(117); isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); + const decoded: any = transformer.decode(encoded, false); assert.deepEqual(decoded, arrayBody); done(); }); @@ -108,7 +117,7 @@ describe("DataTransformer", function() { const encoded: any = transformer.encode(objectBody, "data"); encoded.typecode.should.equal(117); isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); + const decoded: any = transformer.decode(encoded, false); assert.deepEqual(decoded, objectBody); done(); }); @@ -117,7 +126,7 @@ describe("DataTransformer", function() { const encoded: any = transformer.encode(bufferBody, "data"); encoded.typecode.should.equal(117); isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); + const decoded: any = transformer.decode(encoded, false); assert.deepEqual(decoded, bufferBody); done(); }); @@ -126,7 +135,7 @@ describe("DataTransformer", function() { const encoded: any = transformer.encode(hexBufferBody, "data"); encoded.typecode.should.equal(117); isBuffer(encoded.content).should.equal(true); - const decoded: any = transformer.decode(encoded); + const decoded: any = transformer.decode(encoded, false); assert.deepEqual(decoded, hexBufferBody); done(); }); @@ -135,61 +144,61 @@ describe("DataTransformer", function() { // It is possible that we receive an AMQP value type from the messages that were sent with // previously shipped version of the sdk. If so then we should be able to handle those scenarios. it("should correctly decode a string message body", function(done) { - const decoded: any = transformer.decode(stringBody); + const decoded: any = transformer.decode(stringBody, false); decoded.should.equal(stringBody); done(); }); it("should correctly decode a number message body", function(done) { - const decoded: any = transformer.decode(numberBody); + const decoded: any = transformer.decode(numberBody, false); decoded.should.equal(numberBody); done(); }); it("should correctly decode a boolean message body", function(done) { - const decoded: any = transformer.decode(booleanBody); + const decoded: any = transformer.decode(booleanBody, false); decoded.should.equal(booleanBody); done(); }); it("should correctly decode a null message body", function(done) { - const decoded: any = transformer.decode(nullBody); + const decoded: any = transformer.decode(nullBody, false); should.equal(decoded, nullBody); done(); }); it("should correctly decode an undefined message body", function(done) { - const decoded: any = transformer.decode(undefinedBody); + const decoded: any = transformer.decode(undefinedBody, false); should.equal(decoded, undefined); done(); }); it("should correctly decode an empty string message body", function(done) { - const decoded: any = transformer.decode(emptyStringBody); + const decoded: any = transformer.decode(emptyStringBody, false); decoded.should.equal(emptyStringBody); done(); }); it("should correctly decode an array message body", function(done) { - const decoded: any = transformer.decode(arrayBody); + const decoded: any = transformer.decode(arrayBody, false); assert.deepEqual(decoded, arrayBody); done(); }); it("should correctly decode an object message body", function(done) { - const decoded: any = transformer.decode(objectBody); + const decoded: any = transformer.decode(objectBody, false); assert.deepEqual(decoded, objectBody); done(); }); it("should correctly decode a buffer message body", function(done) { - const decoded: any = transformer.decode(bufferBody); + const decoded: any = transformer.decode(bufferBody, false); assert.deepEqual(decoded, bufferBody); done(); }); it("should correctly decode a hex buffer message body", function(done) { - const decoded: any = transformer.decode(hexBufferBody); + const decoded: any = transformer.decode(hexBufferBody, false); assert.deepEqual(decoded, hexBufferBody); done(); }); diff --git a/sdk/servicebus/service-bus/test/internal/unit/linkentity.unittest.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/linkentity.unittest.spec.ts index 0468a38c8c49..d217707ea4f9 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/linkentity.unittest.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/linkentity.unittest.spec.ts @@ -348,6 +348,7 @@ describe("LinkEntity unit tests", () => { abortSignal: undefined, lockRenewer: undefined, receiveMode: "receiveAndDelete", + skipParsingBodyAsJson: false, tracingOptions: {} }); @@ -371,6 +372,7 @@ describe("LinkEntity unit tests", () => { abortSignal: undefined, lockRenewer: undefined, receiveMode: "receiveAndDelete", + skipParsingBodyAsJson: false, tracingOptions: {} }); @@ -410,7 +412,8 @@ describe("LinkEntity unit tests", () => { it("session", () => { const messageSession = new MessageSession(connectionContext, "entityPath", "session-id", { abortSignal: undefined, - retryOptions: {} + retryOptions: {}, + skipParsingBodyAsJson: false }); initCachedLinks(messageSession.name); diff --git a/sdk/servicebus/service-bus/test/internal/unit/messageSession.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/messageSession.spec.ts index 2aaefeeb185a..09a77896408c 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/messageSession.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/messageSession.spec.ts @@ -54,7 +54,8 @@ describe("Message session unit tests", () => { undefined, { receiveMode: lockMode, - retryOptions: undefined + retryOptions: undefined, + skipParsingBodyAsJson: false } ); @@ -84,7 +85,8 @@ describe("Message session unit tests", () => { undefined, { receiveMode: lockMode, - retryOptions: undefined + retryOptions: undefined, + skipParsingBodyAsJson: false } ); @@ -114,7 +116,8 @@ describe("Message session unit tests", () => { undefined, { receiveMode: lockMode, - retryOptions: undefined + retryOptions: undefined, + skipParsingBodyAsJson: false } ); @@ -160,7 +163,8 @@ describe("Message session unit tests", () => { undefined, { receiveMode: lockMode, - retryOptions: undefined + retryOptions: undefined, + skipParsingBodyAsJson: false } ); @@ -212,7 +216,8 @@ describe("Message session unit tests", () => { undefined, { receiveMode: lockMode, - retryOptions: undefined + retryOptions: undefined, + skipParsingBodyAsJson: false } ); @@ -360,7 +365,8 @@ describe("Message session unit tests", () => { "session id", { receiveMode: "receiveAndDelete", - retryOptions: undefined + retryOptions: undefined, + skipParsingBodyAsJson: false } ); diff --git a/sdk/servicebus/service-bus/test/internal/unit/receivedMessageProps.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/receivedMessageProps.spec.ts index 14286de7fe16..dd13910585cd 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/receivedMessageProps.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/receivedMessageProps.spec.ts @@ -24,7 +24,7 @@ describe("Message translations", () => { ...toRheaMessage(testMessage, { encode: (body) => body }), message_annotations: { [Constants.enqueuedTime]: Date.now().valueOf() } }; - const expiresAtUtc = fromRheaMessage(rheaMsg).expiresAtUtc; + const expiresAtUtc = fromRheaMessage(rheaMsg, false).expiresAtUtc; should.not.equal(expiresAtUtc?.toString(), "Invalid Date", "expiresAtUtc is Invalid Date"); } diff --git a/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts index f8b3080d9bdf..30b5d63c52b4 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/receiver.spec.ts @@ -31,7 +31,8 @@ describe("Receiver unit tests", () => { "fakeEntityPath", { lockRenewer: undefined, - receiveMode: "peekLock" + receiveMode: "peekLock", + skipParsingBodyAsJson: false } ); @@ -58,7 +59,8 @@ describe("Receiver unit tests", () => { "fakeEntityPath", { lockRenewer: undefined, - receiveMode: "peekLock" + receiveMode: "peekLock", + skipParsingBodyAsJson: false } ); @@ -99,7 +101,8 @@ describe("Receiver unit tests", () => { createConnectionContextForTests(), "fakeEntityPath", "peekLock", - 1 + 1, + false ); const subscription = await subscribeAndWaitForInitialize(receiverImpl); @@ -137,7 +140,8 @@ describe("Receiver unit tests", () => { }), "fakeEntityPath", "peekLock", - 1 + 1, + false ); const subscription = await subscribeAndWaitForInitialize(receiverImpl); @@ -168,7 +172,8 @@ describe("Receiver unit tests", () => { createConnectionContextForTests(), "fakeEntityPath", "peekLock", - 1 + 1, + false ); const abortSignal = { @@ -211,7 +216,8 @@ describe("Receiver unit tests", () => { createConnectionContextForTests(), "entity path", "peekLock", - 1 + 1, + false ); const abortSignal = createAbortSignalForTest(true); @@ -237,7 +243,8 @@ describe("Receiver unit tests", () => { "entity path", undefined, { - retryOptions: undefined + retryOptions: undefined, + skipParsingBodyAsJson: false } ); @@ -276,7 +283,7 @@ describe("Receiver unit tests", () => { it("create() with an existing _streamingReceiver", async () => { const context = createConnectionContextForTests(); - impl = new ServiceBusReceiverImpl(context, "entity path", "peekLock", 1); + impl = new ServiceBusReceiverImpl(context, "entity path", "peekLock", 1, false); const existingStreamingReceiver = createStreamingReceiver("entityPath"); const subscribeStub = sinon.spy(existingStreamingReceiver, "subscribe"); @@ -303,7 +310,7 @@ describe("Receiver unit tests", () => { it("create() with an existing receiver and that receiver is NOT open()", async () => { const context = createConnectionContextForTests(); - impl = new ServiceBusReceiverImpl(context, "entity path", "peekLock", 1); + impl = new ServiceBusReceiverImpl(context, "entity path", "peekLock", 1, false); await subscribeAndWaitForInitialize(impl); diff --git a/sdk/servicebus/service-bus/test/internal/unit/serviceBusMessage.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/serviceBusMessage.spec.ts index 1db7da374d06..2b3c3223636c 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/serviceBusMessage.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/serviceBusMessage.spec.ts @@ -42,7 +42,8 @@ describe("ServiceBusMessageImpl unit tests", () => { amqpMessage, { tag: fakeDeliveryTag } as Delivery, false, - "peekLock" + "peekLock", + false ); assert.equal(sbMessage.lockToken, expectedLockToken, "Unexpected lock token found"); @@ -53,7 +54,8 @@ describe("ServiceBusMessageImpl unit tests", () => { amqpMessage, { tag: fakeDeliveryTag } as Delivery, false, - "receiveAndDelete" + "receiveAndDelete", + false ); assert.equal(!!sbMessage.lockToken, false, "Unexpected lock token found"); @@ -105,7 +107,13 @@ describe("ServiceBusMessageImpl unit tests", () => { user_id: "random_user_id" }; - const sbMessage = new ServiceBusMessageImpl(amqpMessage, fakeDelivery, false, "peekLock"); + const sbMessage = new ServiceBusMessageImpl( + amqpMessage, + fakeDelivery, + false, + "peekLock", + false + ); it("headers match", () => { assert.equal(sbMessage._rawAmqpMessage.header?.firstAcquirer, amqpMessage.first_acquirer); diff --git a/sdk/servicebus/service-bus/test/internal/unit/serviceBusReceiverUnitTests.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/serviceBusReceiverUnitTests.spec.ts index 135678944748..8db7e2ffd962 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/serviceBusReceiverUnitTests.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/serviceBusReceiverUnitTests.spec.ts @@ -17,7 +17,8 @@ describe("ServiceBusReceiver unit tests", () => { createConnectionContextForTests(), "entityPath", "peekLock", - 0 + 0, + false ); }); diff --git a/sdk/servicebus/service-bus/test/internal/unit/streamingReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/streamingReceiver.spec.ts index 07fe52bd0b10..a1c87bee3fce 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/streamingReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/streamingReceiver.spec.ts @@ -73,7 +73,8 @@ describe("StreamingReceiver unit tests", () => { it("errors thrown from the user's callback are marked as 'processMessageCallback' errors", async () => { const streamingReceiver = createTestStreamingReceiver("entity path", { lockRenewer: undefined, - receiveMode: "receiveAndDelete" + receiveMode: "receiveAndDelete", + skipParsingBodyAsJson: false }); try { @@ -243,7 +244,8 @@ describe("StreamingReceiver unit tests", () => { it("_setMessageHandlers", async () => { const streamingReceiver = createTestStreamingReceiver("entitypath", { lockRenewer: undefined, - receiveMode: "peekLock" + receiveMode: "peekLock", + skipParsingBodyAsJson: false }); let processErrorMessages: string[] = []; diff --git a/sdk/servicebus/service-bus/test/internal/unit/tracing.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/tracing.spec.ts index d8bc6559c777..3be601c13a20 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/tracing.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/tracing.spec.ts @@ -61,7 +61,8 @@ describe("Tracing tests", () => { createConnectionContextForTests(), "my entity path", async () => (({} as any) as Receiver), - "peekLock" + "peekLock", + false ); br["_createAndEndProcessingSpan"] = createSpanStub; @@ -239,7 +240,13 @@ describe("Tracing tests", () => { * to validate tracing. */ [ - new ServiceBusReceiverImpl(createConnectionContextForTests(), "entity path", "peekLock", 1), + new ServiceBusReceiverImpl( + createConnectionContextForTests(), + "entity path", + "peekLock", + 1, + false + ), new ServiceBusSessionReceiverImpl( {} as MessageSession, createConnectionContextForTests(), diff --git a/sdk/servicebus/service-bus/test/internal/unit/unittestUtils.ts b/sdk/servicebus/service-bus/test/internal/unit/unittestUtils.ts index 65ae0400579a..9028aa432417 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/unittestUtils.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/unittestUtils.ts @@ -286,7 +286,8 @@ export function addTestStreamingReceiver(): ( options = { lockRenewer: undefined, receiveMode: "peekLock", - maxConcurrentCalls: 101 + maxConcurrentCalls: 101, + skipParsingBodyAsJson: false }; }