Skip to content

Commit

Permalink
[EventHubs] Port idempotent Publishing support from feature branch (#…
Browse files Browse the repository at this point in the history
…20156)

* Cherry-Pick 2e45f32 from feature/event-hubs-idempotent-producer

[event-hubs] idempotent producer support (#14100)

* [event-hubs] adds EventHubProducerClientOptions and enableIdempotentPartitions field to options

* [event-hubs] enforces specifying partitionId and omitting partitionKey when EventHubProducerClient has idempotent partitions enabled.

* [event-hubs] idempotent producer enforces 1 concurrent send per partition

* [event-hubs] set com.microsoft:idempotent-producer desired capability on sender link when idempotent partitions is enabled

* [event-hubs] add getPartitionPublishingProperties method to EventHubProducerClient

* [event-hubs] cache partition publisher properties on EventHubSender

* [event-hubs] support updating event data with idempotent properties while sending events

* [event-hubs] update package version to 5.5.0-beta.1

* update event-hubs dep in eventhubs-checkpointstore-blob to the beta version

* adds partitionOptions to EventHubProducerClientOptions so users can specify their own idempotent publishing state

* idempotent producer maintains state between reconnects

* add tests for EventDataBatch.startingPublishedSequenceNumber

* commit published sequence number to EventData user passes in

* support user specifying partial partitionOptions and SequenceOutOfOrder errors

* throws error if user attempts to send an already published EventDataBatch or EventData[]

* appease ci for eventhubs-checkpointstore-blob

* remove eslint comments, change "while" to "when" in error messages, and remove code snippet

* updates idempotent error message when at least some events are published to be more accurate

* centralize partition setting validation

* extract extractPartitionAssignment+ methods into their own functions

* improve comment around tracingProperties in sendBatch

* update documentation for PartitionPublishingOptions and PartitionPublishingProperties

* extract some functions out and write tests!

* update tests with new validation error messages

* Rename the idempotent publish test file for now.

* Move new options properties into buffered producer client

* Account for the newly added `getRawAmqpMessage()` which is not required for idempotent publishing

* Remove redundant encode() call that has been done in toRheaMessage()

* Adapt to EventHubSenderOptions change

* React to EventDataBatchImpl change

* opentelemetry => core-tracing

* Move the test file back

* Move idempotent publishing test to internal and make it build. Private options are set after casting the producer client to `any`.

* Port idempotent publishing code from feature/event-hubs-idempotent-producer and remove `generateEventTraceProperty()` that is no longer needed because `instrumentEventData()` already sets the properties.

* Update api.md

* Hook up buffered producer's idempotent publishing options with the internal standard producer

* Add a test for idempotent publishing options

* Expose `getPartitionPublishingProperties()` on buffered producer instead

* Update CHANGELOG

* Remove unused EventHubProducerClientOptions

* Remove CHANGELOG entry as this is an advanced scenario

* Make PartitionPublishingOptions and PartitionPublishingProperties private

* Hide internal properties that we don't want to surface

Co-authored-by: chradek <[email protected]>
  • Loading branch information
jeremymeng and chradek authored Feb 7, 2022
1 parent 521a3be commit 6b7e478
Show file tree
Hide file tree
Showing 18 changed files with 2,019 additions and 135 deletions.
5 changes: 1 addition & 4 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,8 @@ export interface EventDataAdapterParameters {
// @public
export interface EventDataBatch {
readonly count: number;
// @internal
_generateMessage(): Buffer;
readonly maxSizeInBytes: number;
// @internal
readonly _messageSpanContexts: SpanContext[];
// @internal
readonly partitionId?: string;
// @internal
readonly partitionKey?: string;
Expand All @@ -125,6 +121,7 @@ export class EventHubBufferedProducerClient {

// @public
export interface EventHubBufferedProducerClientOptions extends EventHubClientOptions {
enableIdempotentPartitions?: boolean;
maxEventBufferLengthPerPartition?: number;
maxWaitTimeInMs?: number;
onSendEventsErrorHandler: (ctx: OnSendEventsErrorContext) => Promise<void>;
Expand Down
104 changes: 103 additions & 1 deletion sdk/eventhub/event-hubs/src/eventData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,18 @@

import { AmqpAnnotatedMessage, Constants } from "@azure/core-amqp";
import { BodyTypes, defaultDataTransformer } from "./dataTransformer";
import { DeliveryAnnotations, MessageAnnotations, Message as RheaMessage } from "rhea-promise";
import {
DeliveryAnnotations,
MessageAnnotations,
Message as RheaMessage,
types,
} from "rhea-promise";
import { isDefined, isObjectWithProperties, objectHasProperty } from "./util/typeGuards";
import {
idempotentProducerAmqpPropertyNames,
PENDING_PUBLISH_SEQ_NUM_SYMBOL,
} from "./util/constants";
import { EventDataBatch, EventDataBatchImpl, isEventDataBatch } from "./eventDataBatch";

/**
* Describes the delivery annotations.
Expand Down Expand Up @@ -135,6 +145,16 @@ export interface EventDataInternal {
* Returns the underlying raw amqp message.
*/
getRawAmqpMessage(): AmqpAnnotatedMessage;
/**
* The pending publish sequence number, set while the event
* is being published with idempotent partitions enabled.
*/
[PENDING_PUBLISH_SEQ_NUM_SYMBOL]?: number;
/**
* The sequence number the event was published with
* when idempotent partitions are enabled.
*/
_publishedSequenceNumber?: number;
}

const messagePropertiesMap = {
Expand Down Expand Up @@ -473,3 +493,85 @@ function convertDatesToNumbers<T = unknown>(thing: T): T {

return thing;
}

/**
* Values consumed by `populateIdempotentMessageAnnotations` when it decorates
* a rhea message with idempotent producer annotations.
* @internal
*/
export interface PopulateIdempotentMessageAnnotationsParameters {
/**
* When `false`, `populateIdempotentMessageAnnotations` returns immediately
* without touching the message.
*/
isIdempotentPublishingEnabled: boolean;
/**
* If defined, wrapped with `types.wrap_short` and stored under the
* `idempotentProducerAmqpPropertyNames.epoch` annotation.
*/
ownerLevel?: number;
/**
* If defined, wrapped with `types.wrap_long` and stored under the
* `idempotentProducerAmqpPropertyNames.producerId` annotation.
*/
producerGroupId?: number;
/**
* If defined, wrapped with `types.wrap_int` and stored under the
* `idempotentProducerAmqpPropertyNames.producerSequenceNumber` annotation.
*/
publishSequenceNumber?: number;
}

/**
 * Writes the idempotent producer message annotations onto a rhea message.
 * The message is mutated in place.
 *
 * Nothing happens when `isIdempotentPublishingEnabled` is `false`. Otherwise the
 * message is given a `message_annotations` object if it does not already have one,
 * and each of `ownerLevel`, `producerGroupId` and `publishSequenceNumber` that is
 * defined is wrapped (`wrap_short`, `wrap_long` and `wrap_int` respectively) and
 * stored under its `idempotentProducerAmqpPropertyNames` key. Undefined values
 * leave any existing annotation untouched.
 *
 * @param rheaMessage - The message whose annotations are populated.
 * @param parameters - The idempotent publishing values to apply.
 * @internal
 */
export function populateIdempotentMessageAnnotations(
  rheaMessage: RheaMessage,
  {
    isIdempotentPublishingEnabled,
    ownerLevel,
    producerGroupId,
    publishSequenceNumber,
  }: PopulateIdempotentMessageAnnotationsParameters
): void {
  if (isIdempotentPublishingEnabled) {
    // Reuse the existing annotations object when there is one; otherwise
    // create a fresh one and attach it to the message.
    let annotations = rheaMessage.message_annotations;
    if (!annotations) {
      annotations = {};
      rheaMessage.message_annotations = annotations;
    }

    if (isDefined(ownerLevel)) {
      const wrappedEpoch = types.wrap_short(ownerLevel);
      annotations[idempotentProducerAmqpPropertyNames.epoch] = wrappedEpoch;
    }

    if (isDefined(producerGroupId)) {
      const wrappedProducerId = types.wrap_long(producerGroupId);
      annotations[idempotentProducerAmqpPropertyNames.producerId] = wrappedProducerId;
    }

    if (isDefined(publishSequenceNumber)) {
      const wrappedSequenceNumber = types.wrap_int(publishSequenceNumber);
      annotations[idempotentProducerAmqpPropertyNames.producerSequenceNumber] =
        wrappedSequenceNumber;
    }
  }
}

/**
 * Commits the pending publish sequence numbers held by the given events.
 *
 * - For an `EventDataBatch`, the work is delegated to the batch's `_commitPublish()`.
 * - For an array of events, each event's pending sequence number (stored under
 *   `PENDING_PUBLISH_SEQ_NUM_SYMBOL`) is copied to `_publishedSequenceNumber`,
 *   after which the pending entry is deleted from the event.
 *
 * @param events - The batch, or the individual events, to commit.
 * @internal
 */
export function commitIdempotentSequenceNumbers(
  events: Omit<EventDataInternal, "getRawAmqpMessage">[] | EventDataBatch
): void {
  if (isEventDataBatch(events)) {
    (events as EventDataBatchImpl)._commitPublish();
    return;
  }

  // Plain events: the sequence number assigned when the send was attempted
  // becomes the published sequence number.
  for (let index = 0; index < events.length; index++) {
    const sentEvent = events[index];
    sentEvent._publishedSequenceNumber = sentEvent[PENDING_PUBLISH_SEQ_NUM_SYMBOL];
    delete sentEvent[PENDING_PUBLISH_SEQ_NUM_SYMBOL];
  }
}

/**
 * Discards any pending publish sequence numbers held by the given events.
 *
 * An `EventDataBatch` is left untouched. For an array of events, the pending
 * entry stored under `PENDING_PUBLISH_SEQ_NUM_SYMBOL` is deleted from each event.
 *
 * @param events - The batch, or the individual events, to roll back.
 * @internal
 */
export function rollbackIdempotentSequenceNumbers(
  events: Omit<EventDataInternal, "getRawAmqpMessage">[] | EventDataBatch
): void {
  if (isEventDataBatch(events)) {
    /* No action required. */
    return;
  }

  for (let index = 0; index < events.length; index++) {
    const unsentEvent = events[index];
    delete unsentEvent[PENDING_PUBLISH_SEQ_NUM_SYMBOL];
  }
}
145 changes: 120 additions & 25 deletions sdk/eventhub/event-hubs/src/eventDataBatch.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { EventData, toRheaMessage } from "./eventData";
import { MessageAnnotations, Message as RheaMessage, message } from "rhea-promise";
import { Span, SpanContext } from "@azure/core-tracing";
import { isDefined, isObjectWithProperties } from "./util/typeGuards";
import { AmqpAnnotatedMessage } from "@azure/core-amqp";
import { EventData, populateIdempotentMessageAnnotations, toRheaMessage } from "./eventData";
import { ConnectionContext } from "./connectionContext";
import { MessageAnnotations, message, Message as RheaMessage } from "rhea-promise";
import { Span, SpanContext } from "@azure/core-tracing";
import { isDefined, isObjectWithProperties } from "./util/typeGuards";
import { OperationTracingOptions } from "@azure/core-tracing";
import { convertTryAddOptionsForCompatibility } from "./diagnostics/tracing";
import { instrumentEventData } from "./diagnostics/instrumentEventData";
import { throwTypeErrorIfParameterMissing } from "./util/error";
import { PartitionPublishingProperties } from "./models/private";

/**
* The amount of bytes to reserve as overhead for a small message.
Expand Down Expand Up @@ -110,22 +111,6 @@ export interface EventDataBatch {
* @returns A boolean value indicating if the event data has been added to the batch or not.
*/
tryAdd(eventData: EventData | AmqpAnnotatedMessage, options?: TryAddOptions): boolean;

/**
* The AMQP message containing encoded events that were added to the batch.
* Used internally by the `sendBatch()` method on the `EventHubProducerClient`.
* This is not meant for the user to use directly.
*
* @internal
*/
_generateMessage(): Buffer;

/**
* Gets the "message" span contexts that were created when adding events to the batch.
* Used internally by the `sendBatch()` method to set up the right spans in traces if tracing is enabled.
* @internal
*/
readonly _messageSpanContexts: SpanContext[];
}

/**
Expand Down Expand Up @@ -176,6 +161,22 @@ export class EventDataBatchImpl implements EventDataBatch {
* A common annotation is the partition key.
*/
private _batchAnnotations?: MessageAnnotations;
/**
* Indicates that the batch should be treated as idempotent.
*/
private _isIdempotent: boolean;
/**
* The sequence number assigned to the first event in the batch while
* the batch is being sent to the service.
*/
private _pendingStartingSequenceNumber?: number;
/**
* The publishing sequence number assigned to the first event in the batch at the time
* the batch was successfully published.
* If the producer was not configured to apply sequence numbering or if the batch
* has not yet been successfully published, the value will be `undefined`.
*/
private _startingPublishSequenceNumber?: number;

/**
* EventDataBatch should not be constructed using `new EventDataBatch()`
Expand All @@ -185,11 +186,13 @@ export class EventDataBatchImpl implements EventDataBatch {
constructor(
context: ConnectionContext,
maxSizeInBytes: number,
isIdempotent: boolean,
partitionKey?: string,
partitionId?: string
) {
this._context = context;
this._maxSizeInBytes = maxSizeInBytes;
this._isIdempotent = isIdempotent;
this._partitionKey = isDefined(partitionKey) ? String(partitionKey) : partitionKey;
this._partitionId = isDefined(partitionId) ? String(partitionId) : partitionId;
this._sizeInBytes = 0;
Expand Down Expand Up @@ -239,6 +242,16 @@ export class EventDataBatchImpl implements EventDataBatch {
return this._count;
}

/**
* The publishing sequence number assigned to the first event in the batch at the time
* the batch was successfully published.
* If the producer was not configured to apply sequence numbering or if the batch
* has not yet been successfully published, the value will be `undefined`.
*
* This is a read-only view of `_startingPublishSequenceNumber`, which is
* assigned by `_commitPublish()`.
*/
get startingPublishedSequenceNumber(): number | undefined {
return this._startingPublishSequenceNumber;
}

/**
* Gets the "message" span contexts that were created when adding events to the batch.
* @internal
Expand All @@ -251,8 +264,27 @@ export class EventDataBatchImpl implements EventDataBatch {
* Generates an AMQP message that contains the provided encoded events and annotations.
* @param encodedEvents - The already encoded events to include in the AMQP batch.
* @param annotations - The message annotations to set on the batch.
* @param publishingProps - Idempotent publishing properties used to decorate the events in the batch while sending.
*/
private _generateBatch(encodedEvents: Buffer[], annotations?: MessageAnnotations): Buffer {
private _generateBatch(
encodedEvents: Buffer[],
annotations: MessageAnnotations | undefined,
publishingProps?: PartitionPublishingProperties
): Buffer {
if (this._isIdempotent && publishingProps) {
// We need to decode the encoded events, add the idempotent annotations, and re-encode them.
// We can't lazily encode the events because we rely on `message.encode` to capture the
// byte length of anything not in `event.body`.
// Events can't be decorated ahead of time because the publishing properties aren't known
// until the events are being sent to the service.
const decodedEvents = encodedEvents.map(message.decode) as unknown as RheaMessage[];
const decoratedEvents = this._decorateRheaMessagesWithPublishingProps(
decodedEvents,
publishingProps
);
encodedEvents = decoratedEvents.map(message.encode);
}

const batchEnvelope: RheaMessage = {
body: message.data_sections(encodedEvents),
};
Expand All @@ -262,6 +294,58 @@ export class EventDataBatchImpl implements EventDataBatch {
return message.encode(batchEnvelope);
}

/**
 * Stamps each rhea message with idempotent producer annotations derived from
 * `publishingProps`, mutating the messages in place.
 *
 * Sequence numbers are consecutive: the first message receives
 * `lastPublishedSequenceNumber + 1` (with `lastPublishedSequenceNumber` defaulting to 0),
 * the next one receives that value plus one, and so on. The first sequence number is
 * remembered in `_pendingStartingSequenceNumber`.
 *
 * When the batch is not idempotent the messages are returned unchanged.
 *
 * @param events - The decoded rhea messages to decorate.
 * @param publishingProps - The partition publishing properties to apply.
 * @returns The same `events` array that was passed in.
 */
private _decorateRheaMessagesWithPublishingProps(
  events: RheaMessage[],
  publishingProps: PartitionPublishingProperties
): RheaMessage[] {
  if (this._isIdempotent) {
    const { lastPublishedSequenceNumber = 0, ownerLevel, producerGroupId } = publishingProps;
    const firstSequenceNumber = lastPublishedSequenceNumber + 1;

    for (const [offset, rheaMessage] of events.entries()) {
      populateIdempotentMessageAnnotations(rheaMessage, {
        isIdempotentPublishingEnabled: this._isIdempotent,
        ownerLevel,
        producerGroupId,
        publishSequenceNumber: firstSequenceNumber + offset,
      });
    }

    this._pendingStartingSequenceNumber = firstSequenceNumber;
  }

  return events;
}

/**
 * Adds placeholder idempotent annotations to a rhea message when the batch is idempotent,
 * mutating the message in place.
 *
 * The batch size has to be calculated as events are added, but the real owner level,
 * producer group id and sequence number are only known when the batch is sent.
 * Zeroes are therefore written as stand-ins.
 *
 * @param event - The rhea message to decorate.
 * @returns The same `event` that was passed in.
 */
private _decorateRheaMessageWithPlaceholderIdempotencyProps(event: RheaMessage): RheaMessage {
  if (this._isIdempotent) {
    if (!event.message_annotations) {
      event.message_annotations = {};
    }

    // Placeholder values only; the real ones are applied at send time.
    const placeholderProps = {
      isIdempotentPublishingEnabled: this._isIdempotent,
      ownerLevel: 0,
      publishSequenceNumber: 0,
      producerGroupId: 0,
    };
    populateIdempotentMessageAnnotations(event, placeholderProps);
  }

  return event;
}

/**
* Generates the single AMQP message which is the result of encoding all the events
* added into the `EventDataBatch` instance.
Expand All @@ -272,8 +356,15 @@ export class EventDataBatchImpl implements EventDataBatch {
* this single batched AMQP message is what gets sent over the wire to the service.
* @readonly
*/
_generateMessage(): Buffer {
return this._generateBatch(this._encodedMessages, this._batchAnnotations);
_generateMessage(publishingProps?: PartitionPublishingProperties): Buffer {
return this._generateBatch(this._encodedMessages, this._batchAnnotations, publishingProps);
}

/**
* Sets startingPublishSequenceNumber to the pending publish sequence number.
*
* Copies `_pendingStartingSequenceNumber` into `_startingPublishSequenceNumber`,
* which is the value returned by the `startingPublishedSequenceNumber` getter.
* The pending value itself is left as-is.
*/
_commitPublish(): void {
this._startingPublishSequenceNumber = this._pendingStartingSequenceNumber;
}

/**
Expand All @@ -298,15 +389,19 @@ export class EventDataBatchImpl implements EventDataBatch {

// Convert EventData to RheaMessage.
const amqpMessage = toRheaMessage(instrumentedEvent, this._partitionKey);
const originalAnnotations = amqpMessage.message_annotations && {
...amqpMessage.message_annotations,
};
this._decorateRheaMessageWithPlaceholderIdempotencyProps(amqpMessage);
const encodedMessage = message.encode(amqpMessage);

let currentSize = this._sizeInBytes;
// The first time an event is added, we need to calculate
// the overhead of creating an AMQP batch, including the
// message_annotations that are taken from the 1st message.
if (this.count === 0) {
if (amqpMessage.message_annotations) {
this._batchAnnotations = amqpMessage.message_annotations;
if (originalAnnotations) {
this._batchAnnotations = originalAnnotations;
}

// Figure out the overhead of creating a batch by generating an empty batch
Expand Down
12 changes: 12 additions & 0 deletions sdk/eventhub/event-hubs/src/eventHubBufferedProducerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ export interface EventHubBufferedProducerClientOptions extends EventHubClientOpt
* The handler to call when a batch fails to publish.
*/
onSendEventsErrorHandler: (ctx: OnSendEventsErrorContext) => Promise<void>;
/**
* Indicates whether or not the EventHubProducerClient should enable idempotent publishing to Event Hub partitions.
* If enabled, the producer will only be able to publish directly to partitions;
* it will not be able to publish to the Event Hubs gateway for automatic partition routing
* nor will it be able to use a partition key.
* Default: false
*/
enableIdempotentPartitions?: boolean;
}

/**
Expand Down Expand Up @@ -271,6 +279,10 @@ export class EventHubBufferedProducerClient {
);
this._clientOptions = { ...options4! };
}

// setting internal idempotent publishing option on the standard producer.
(this._producer as any)._enableIdempotentPartitions =
this._clientOptions.enableIdempotentPartitions;
}

/**
Expand Down
Loading

0 comments on commit 6b7e478

Please sign in to comment.