-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[EventHubs] Port idempotent Publishing support from feature branch #20156
Conversation
[event-hubs] idempotent producer support (Azure#14100) * [event-hubs] adds EventHubProducerClientOptions and enableIdempotentPartitions field to options * [event-hubs] enforces specifying partitionId and ommitting partitionKey when EventHubProducerClient has idempotent partitions enabled. * [event-hubs] idempotent producer enforces 1 concurrent send per partition * [event-hubs] set com.microsoft:idempotent-producer desired capability on sender link when idempotent partitions is enabled * [event-hubs] add getPartitionPublishingProperties method to EventHubProducerClient * [event-hubs] cache partition publisher properties on EventHubSender * [event-hubs] support updating event data with idempotent properties while sending events * [event-hubs] update package version to 5.5.0-beta.1 * update event-hubs dep in eventhubs-checkpointstore-blob to the beta version * adds partitionOptions to EventHubProducerClientOptions so users can specify their own idempotent publishing state * idempotent producer maintains state between reconnects * add tests for EventDataBatch.startingPublishedSequenceNumber * commit published sequence number to EventData user passes in * support user specifying partial partitionOptions and SequenceOutOfOrder errors * throws error if user attempts to send an already published EventDataBatch of EventData[] * appease ci for eventhubs-checkpointstore-blob * remove eslint comments, change "while" to "when" in error messages, and remove code snippet * updates idempotent error message when at least some events are published to be more accurate * centralize partition setting validation * extract extractPartitionAssignment+ methods into their own functions * improve comment around tracingProperties in sendBatch * update documentation for PartitionPublishingOptions and PartitionPublishingProperties * extract some functions out and write tests! * update tests with new validation error messages
which is not required for idempotent publishing
This reverts commit 019f6c5.
private options are set after casting the producer client to any
…oducer And remove `generateEventTraceProperty()` that is no longer needed as `instrumentEventData()` already sets the properties.
…ternal standard producer
Seems that I will need to disable |
as the mock hub doesn't support this feature. Also fix incorrect test assert
API changes have been detected in API changes + readonly publishedSequenceNumber?: number;
+ readonly publishedSequenceNumber?: number;
+ _commitPublish(): void;
+ _generateMessage(publishingProps?: PartitionPublishingProperties): Buffer;
+ readonly _messageSpanContexts: SpanContext[];
+ readonly startingPublishedSequenceNumber?: number;
+ getPartitionPublishingProperties(partitionId: string, options?: OperationOptions): Promise<PartitionPublishingProperties>;
+ enableIdempotentPartitions?: boolean; |
API changes have been detected in API changes + _commitPublish(): void;
+ _generateMessage(publishingProps?: PartitionPublishingProperties): Buffer;
+ readonly _messageSpanContexts: SpanContext[];
+ enableIdempotentPartitions?: boolean; |
API changes have been detected in API changes + readonly _messageSpanContexts: SpanContext[];
+ enableIdempotentPartitions?: boolean; |
readonly maxSizeInBytes: number; | ||
// @internal | ||
readonly _messageSpanContexts: SpanContext[]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't SpanContext
still in beta?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's imported from core-tracing. This code has been here for a few months already. Anyway, Event Hubs package is currently in beta too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I see. I still wonder if it would be possible to remove this from public surface, but I'm not sure I understand the scenario
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks that it is for internal use, I will try hiding it.
// Events can't be decorated ahead of time because the publishing properties aren't known | ||
// until the events are being sent to the service. | ||
const decodedEvents = encodedEvents.map(message.decode) as unknown as RheaMessage[]; | ||
const decoratedEvents = this._decorateRheaMessagesWithPublishingProps( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is it called decorated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would have to ask Chris :) I guess it means adding something on top of rhea message (decorator pattern?)
|
||
const properties: PartitionPublishingProperties = { | ||
isIdempotentPublishingEnabled: this._isIdempotentProducer, | ||
partitionId: this.partitionId ?? "", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure what defaulting to an empty string for a partition ID means
if (typeof partitionId === "number") { | ||
partitionId = String(partitionId); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is weird given that the type of partitionId
is string
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've seen many runtime checks in Service Bus code. this might be one of them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah these checks seem to be leftovers from JavaScript code. I would move/remove them as appropriate because if we do not trust our types, then it is meaningless to write them down.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PR is quite big but I the logic looks good and the tests provide good coverage of the feature. I left two comments.
API changes have been detected in API changes + enableIdempotentPartitions?: boolean; |
Merging this. I will address any additional feedback in later PRs. |
Packages impacted by this PR
@azure/event-hubs
Issues associated with this PR
#18671
This ports PR #14100 from feature branch. Major work on top of #14100 include
getPartitionPublishingProperties()
on buffered producer, which is just a pass-through wrapper of the private method on standard producer.