Skip to content

Commit

Permalink
[event-hubs] add AmqpAnnotatedMessage support (#15939)
Browse files Browse the repository at this point in the history
* [event-hubs] add AmqpAnnotatedMessage support

* undo experimental deletion of decode method from service bus

* address feedback
  • Loading branch information
chradek authored Jun 30, 2021
1 parent bbd295a commit 19d0e76
Show file tree
Hide file tree
Showing 17 changed files with 1,228 additions and 178 deletions.
8 changes: 7 additions & 1 deletion sdk/eventhub/event-hubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,19 @@
## 5.6.0 (Unreleased)

### Features Added

- With the dropping of support for Node.js versions that are no longer in LTS, the dependency on `@types/node` has been updated to version 12. Read our [support policy](https://github.com/Azure/azure-sdk-for-js/blob/main/SUPPORT.md) for more details.

- Adds the `contentType`, `correlationId`, and `messageId` AMQP properties as top-level fields on `EventData` and `ReceivedEventData`.

- Enable encoding the body of a message to the 'value' or 'sequence' sections (via AmqpAnnotatedMessage.bodyType). Using this encoding is not required but does allow you to take advantage of native AMQP serialization for supported primitives or sequences.

More information about the AMQP message body type can be found in the AMQP specification: [link](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format)

### Breaking Changes

### Key Bugs Fixed


## 5.5.2 (2021-06-10)

### Bug fixes
Expand Down
1 change: 1 addition & 0 deletions sdk/eventhub/event-hubs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@
"assert": "^1.4.1",
"chai": "^4.2.0",
"chai-as-promised": "^7.1.1",
"chai-exclude": "^2.0.2",
"chai-string": "^1.5.0",
"cross-env": "^7.0.2",
"debug": "^4.1.1",
Expand Down
12 changes: 10 additions & 2 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
```ts

import { AbortSignalLike } from '@azure/abort-controller';
import { AmqpAnnotatedMessage } from '@azure/core-amqp';
import { MessagingError } from '@azure/core-amqp';
import { NamedKeyCredential } from '@azure/core-auth';
import { OperationTracingOptions } from '@azure/core-tracing';
Expand Down Expand Up @@ -54,6 +55,9 @@ export const earliestEventPosition: EventPosition;
// @public
export interface EventData {
body: any;
contentType?: string;
correlationId?: string | number | Buffer;
messageId?: string | number | Buffer;
properties?: {
[key: string]: any;
};
Expand All @@ -72,7 +76,7 @@ export interface EventDataBatch {
// @internal
readonly partitionKey?: string;
readonly sizeInBytes: number;
tryAdd(eventData: EventData, options?: TryAddOptions): boolean;
tryAdd(eventData: EventData | AmqpAnnotatedMessage, options?: TryAddOptions): boolean;
}

// @public
Expand Down Expand Up @@ -129,7 +133,7 @@ export class EventHubProducerClient {
getEventHubProperties(options?: GetEventHubPropertiesOptions): Promise<EventHubProperties>;
getPartitionIds(options?: GetPartitionIdsOptions): Promise<Array<string>>;
getPartitionProperties(partitionId: string, options?: GetPartitionPropertiesOptions): Promise<PartitionProperties>;
sendBatch(batch: EventData[], options?: SendBatchOptions): Promise<void>;
sendBatch(batch: EventData[] | AmqpAnnotatedMessage[], options?: SendBatchOptions): Promise<void>;
sendBatch(batch: EventDataBatch, options?: OperationOptions): Promise<void>;
}

Expand Down Expand Up @@ -239,7 +243,11 @@ export type ProcessInitializeHandler = (context: PartitionContext) => Promise<vo
// @public
export interface ReceivedEventData {
body: any;
contentType?: string;
correlationId?: string | number | Buffer;
enqueuedTimeUtc: Date;
getRawAmqpMessage(): AmqpAnnotatedMessage;
messageId?: string | number | Buffer;
offset: number;
partitionKey: string | null;
properties?: {
Expand Down
143 changes: 107 additions & 36 deletions sdk/eventhub/event-hubs/src/dataTransformer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,19 @@ import { message } from "rhea-promise";
import isBuffer from "is-buffer";
import { Buffer } from "buffer";
import { logErrorStackTrace, logger } from "./log";
import { isObjectWithProperties } from "./util/typeGuards";

/**
* The allowed AMQP message body types.
* @internal
*/
export type BodyTypes = "data" | "value" | "sequence";

/** @internal */
export const dataSectionTypeCode = 0x75 as const;
/** @internal */
export const sequenceSectionTypeCode = 0x76 as const;
/** @internal */
export const valueSectionTypeCode = 0x77 as const;

/**
* The default data transformer that will be used by the Azure SDK.
Expand All @@ -17,21 +29,28 @@ export const defaultDataTransformer = {
* and returns an encoded body (some form of AMQP type).
*
* @param body - The AMQP message body
* @returns The encoded AMQP message body as an AMQP Data type
* (data section in rhea terms). Section object with following properties:
* - typecode: 117 (0x75)
* - content: The given AMQP message body as a Buffer.
* - multiple: true | undefined.
* @param bodyType - The AMQP section to story the body in.
* @returns The encoded AMQP message body as an AMQP Data/Sequence/Value section.
*/
encode(body: unknown): any {
encode(body: unknown, bodyType: BodyTypes): any {
let result: any;
if (isBuffer(body)) {
// string, undefined, null, boolean, array, object, number should end up here
// coercing undefined to null as that will ensure that null value will be given to the
// customer on receive.
if (body === undefined) body = null;

if (bodyType === "value") {
// TODO: Expose value_section from `rhea` similar to the data_section and sequence_section.
// Right now there isn't a way to create a value section officially.
result = message.data_section(body);
result.typecode = valueSectionTypeCode;
} else if (bodyType === "sequence") {
result = message.sequence_section(body);
} else if (isBuffer(body)) {
result = message.data_section(body);
} else if (body === null && bodyType === "data") {
result = message.data_section(null);
} else {
// string, undefined, null, boolean, array, object, number should end up here
// coercing undefined to null as that will ensure that null value will be given to the
// customer on receive.
if (body === undefined) body = null;
try {
const bodyStr = JSON.stringify(body);
result = message.data_section(Buffer.from(bodyStr, "utf8"));
Expand All @@ -49,38 +68,90 @@ export const defaultDataTransformer = {
},

/**
* A function that takes the body property from an AMQP message
* (an AMQP Data type (data section in rhea terms)) and returns the decoded message body.
* If it cannot decode the body then it returns the body
* as-is.
* @param body - The AMQP message body
* @returns decoded body or the given body as-is.
* A function that takes the body property from an AMQP message, which can come from either
* the 'data', 'value' or 'sequence' sections of an AMQP message.
*
* If the body is not a JSON string the the raw contents will be returned, along with the bodyType
* indicating which part of the AMQP message the body was decoded from.
*
* @param body - The AMQP message body as received from rhea.
* @returns The decoded/raw body and the body type.
*/
decode(body: unknown): any {
let processedBody: any = body;
decode(body: unknown | RheaAmqpSection): { body: unknown; bodyType: BodyTypes } {
try {
if (isObjectWithProperties(body, ["content"]) && isBuffer(body.content)) {
// This indicates that we are getting the AMQP described type. Let us try decoding it.
processedBody = body.content;
}
try {
// Trying to stringify and JSON.parse() anything else will fail flat and we shall return
// the original type back
const bodyStr: string = processedBody.toString("utf8");
processedBody = JSON.parse(bodyStr);
} catch (err) {
logger.verbose(
"[decode] An error occurred while trying JSON.parse() on the received body. " +
"The error is %O",
err
);
if (isRheaAmqpSection(body)) {
switch (body.typecode) {
case dataSectionTypeCode:
return { body: tryToJsonDecode(body.content), bodyType: "data" };
case sequenceSectionTypeCode:
return { body: body.content, bodyType: "sequence" };
case valueSectionTypeCode:
return { body: body.content, bodyType: "value" };
}
} else {
if (isBuffer(body)) {
return { body: tryToJsonDecode(body), bodyType: "data" };
}

return { body, bodyType: "value" };
}
} catch (err) {
logger.verbose(
"[decode] An error occurred while decoding the received message body. The error is: %O",
err
);
throw err;
}
return processedBody;
}
};

/**
* Attempts to decode 'body' as a JSON string. If it fails it returns body
* verbatim.
*
* @param body - An AMQP message body.
* @returns A JSON decoded object, or body if body was not a JSON string.
*
* @internal
*/
function tryToJsonDecode(body: unknown): unknown {
let processedBody: any = body;
try {
// Trying to stringify and JSON.parse() anything else will fail flat and we shall return
// the original type back
const bodyStr: string = processedBody.toString("utf8");
processedBody = JSON.parse(bodyStr);
} catch (err) {
logger.verbose(
"[decode] An error occurred while trying JSON.parse() on the received body. The error is %O",
err
);
}
return processedBody;
}

/**
* Mirror of the internal Section interface in rhea.
*
* @internal
*/
export interface RheaAmqpSection {
typecode:
| typeof dataSectionTypeCode
| typeof sequenceSectionTypeCode
| typeof valueSectionTypeCode;
content: any;
}

/** @internal */
export function isRheaAmqpSection(
possibleSection: any | RheaAmqpSection
): possibleSection is RheaAmqpSection {
return (
possibleSection != null &&
typeof possibleSection.typecode === "number" &&
(possibleSection.typecode === dataSectionTypeCode ||
possibleSection.typecode === valueSectionTypeCode ||
possibleSection.typecode === sequenceSectionTypeCode)
);
}
26 changes: 20 additions & 6 deletions sdk/eventhub/event-hubs/src/diagnostics/instrumentEventData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

import { extractSpanContextFromTraceParentHeader, getTraceParentHeader } from "@azure/core-tracing";
import { Span, SpanContext } from "@azure/core-tracing";
import { EventData } from "../eventData";
import { AmqpAnnotatedMessage } from "@azure/core-amqp";
import { EventData, isAmqpAnnotatedMessage } from "../eventData";

/**
* @hidden
Expand All @@ -14,20 +15,33 @@ export const TRACEPARENT_PROPERTY = "Diagnostic-Id";
* Populates the `EventData` with `SpanContext` info to support trace propagation.
* Creates and returns a copy of the passed in `EventData` unless the `EventData`
* has already been instrumented.
* @param eventData - The `EventData` to instrument.
* @param eventData - The `EventData` or `AmqpAnnotatedMessage` to instrument.
* @param span - The `Span` containing the context to propagate tracing information.
*/
export function instrumentEventData(eventData: EventData, span: Span): EventData {
if (eventData.properties && eventData.properties[TRACEPARENT_PROPERTY]) {
export function instrumentEventData(
eventData: EventData | AmqpAnnotatedMessage,
span: Span
): EventData {
const props = isAmqpAnnotatedMessage(eventData)
? eventData.applicationProperties
: eventData.properties;

if (props && props[TRACEPARENT_PROPERTY]) {
return eventData;
}

const copiedProps = { ...props };

// create a copy so the original isn't modified
eventData = { ...eventData, properties: { ...eventData.properties } };
if (isAmqpAnnotatedMessage(eventData)) {
eventData = { ...eventData, applicationProperties: copiedProps };
} else {
eventData = { ...eventData, properties: copiedProps };
}

const traceParent = getTraceParentHeader(span.spanContext());
if (traceParent) {
eventData.properties![TRACEPARENT_PROPERTY] = traceParent;
copiedProps[TRACEPARENT_PROPERTY] = traceParent;
}

return eventData;
Expand Down
Loading

0 comments on commit 19d0e76

Please sign in to comment.