Skip to content

Commit

Permalink
[Event Hubs Client] Track Two: Second Preview (Event Batching)
Browse files Browse the repository at this point in the history
  General
    - Ran formatting over the project, which now impacts the track one code,
      since it is embedded an no longer an external reference.

    - Adopted and revised the diagnostic event source used for logging in the
      track one code; the areas of instrumentation were maintained for
      consistency, but the messaging and data has been refactored for
      track two conventions.

    - Tweak to event consumer example in ReadMe, in response to feedback
      from the "Getting Started" review.

  Event Batching
    - Created infrastructure for an event batch, delegating to a transport-
      specific batch implementation.

    - Implemented batching for the AMQP transport.

  Publishing Events
    - Enhanced event publishing to accept an event batch as the set of
      events to be published.

  Live Tests
    - Tweaked some timing and sizing for large message tests to help
      stablize during nightly runs on non-Windows platforms.

    - Loosened criteria for invalid proxy tests, as the exception types are
      not consistent across different operating systems.

  AMQP Foundation
    - Translated message and infrastructure-related constants, grouping them
      into containing classes intended to be semantically relevant to their
      intended use and meaning.

    - Implemented translation of Event Data into the corresponding AMQP
      message format.
  • Loading branch information
jsquire committed Jul 24, 2019
1 parent fcae227 commit 2296a55
Show file tree
Hide file tree
Showing 56 changed files with 4,077 additions and 199 deletions.
4 changes: 2 additions & 2 deletions sdk/eventhub/Azure.Messaging.EventHubs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ await using (EventHubProducer producer = client.CreateProducer())

### Consume events from an Event Hub

In order to consume events, you'll need to create an `EventHubConsumer` for a specific partition and consumer group combination. When an Event Hub is created, it starts with a default consumer group that can be used to get started. A consumer also needs to specify where in the event stream to begin receiving events; in our example, we will focus on reading new events as they are published.
In order to consume events, you'll need to create an `EventHubConsumer` for a specific partition and consumer group combination. When an Event Hub is created, it starts with a default consumer group that can be used to get started. A consumer also needs to specify where in the event stream to begin receiving events; in our example, we will focus on reading all published events in a partition.

```csharp
var connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
Expand All @@ -111,7 +111,7 @@ await using (var client = new EventHubClient(connectionString, eventHubName))
{
string firstPartition = (await client.GetPartitionIdsAsync()).First();
string consumerGroup = EventHubConsumer.DefaultConsumerGroup;
EventPosition startingPosition = EventPosition.Latest;
EventPosition startingPosition = EventPosition.Earliest;

await using (EventHubConsumer consumer = client.CreateConsumer(consumerGroup, firstPartition, startingPosition))
{
Expand Down
28 changes: 28 additions & 0 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpAnnotation.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

namespace Azure.Messaging.EventHubs.Amqp
{
/// <summary>
/// The set of well-known annotations associated with an AMQP messages and
/// entities.
/// </summary>
///
internal static class AmqpAnnotation
{
/// <summary>The date and time, in UTC, that a message was enqueued.</summary>
public const string EnqueuedTime = "x-opt-enqueued-time";

/// <summary>The sequence number assigned to a message.</summary>
public const string SequenceNumber = "x-opt-sequence-number";

/// <summary>The offset of a message within a given partition.</summary>
public const string Offset = "x-opt-offset";

/// <summary>The name of the entity that published a message.</summary>
public const string Publisher = "x-opt-publisher";

/// <summary>The partition hashing key used for grouping a batch of events together with the intent of routing to a single partition.</summary>
public const string PartitionKey = "x-opt-partition-key";
}
}
58 changes: 58 additions & 0 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpError.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Encoding;

namespace Azure.Messaging.EventHubs
{
/// <summary>
/// The set of well-known error codes associated with an AMQP messages and
/// entities.
/// </summary>
///
internal static class AmqpError
{
/// <summary>Indicates that a timeout occurred on the link.</summary>
public static readonly AmqpSymbol TimeoutError = AmqpConstants.Vendor + ":timeout";

/// <summary>Indicates that a message is no longer available.</summary>
public static readonly AmqpSymbol MessageLockLostError = AmqpConstants.Vendor + ":message-lock-lost";

/// <summary>Indicates that a session is no longer available.</summary>
public static readonly AmqpSymbol SessionLockLostError = AmqpConstants.Vendor + ":session-lock-lost";

/// <summary>Indicates that a store is no longer available.</summary>
public static readonly AmqpSymbol StoreLockLostError = AmqpConstants.Vendor + ":store-lock-lost";

/// <summary>Indicates that a session is no longer available.</summary>
public static readonly AmqpSymbol SessionCannotBeLockedError = AmqpConstants.Vendor + ":session-cannot-be-locked";

/// <summary>Indicates that a referenced subscription is no longer available.</summary>
public static readonly AmqpSymbol NoMatchingSubscriptionError = AmqpConstants.Vendor + ":no-matching-subscription";

/// <summary>Indicates that the server was busy and could not allow the requested operation.</summary>
public static readonly AmqpSymbol ServerBusyError = AmqpConstants.Vendor + ":server-busy";

/// <summary>Indicates that an argument provided to the Event Hubs service was incorrect.</summary>
public static readonly AmqpSymbol ArgumentError = AmqpConstants.Vendor + ":argument-error";

/// <summary>Indicates that an argument provided to the Event Hubs service was incorrect.</summary>
public static readonly AmqpSymbol ArgumentOutOfRangeError = AmqpConstants.Vendor + ":argument-out-of-range";

/// <summary>Indicates that the consumer requesting an operation does not own the associated partition.</summary>
public static readonly AmqpSymbol PartitionNotOwnedError = AmqpConstants.Vendor + ":partition-not-owned";

/// <summary>Indicates that the requested Event Hubs resource is disabled.</summary>
public static readonly AmqpSymbol ResourceDisabledError = AmqpConstants.Vendor + ":entity-disabled";

/// <summary>Indicates that the producer requesting an operation is not allowed to publish events to the requested resource.</summary>
public static readonly AmqpSymbol PublisherRevokedError = AmqpConstants.Vendor + ":publisher-revoked";

/// <summary>Indicates that an operation was cancelled by the Event Hubs service.</summary>
public static readonly AmqpSymbol OperationCancelledError = AmqpConstants.Vendor + ":operation-cancelled";

/// <summary>Indicates that the requested resource cannot be created because it already exists.</summary>
public static readonly AmqpSymbol ResourceAlreadyExistsError = AmqpConstants.Vendor + ":entity-already-exists";
}
}
190 changes: 190 additions & 0 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpEventBatch.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using Azure.Messaging.EventHubs.Core;
using Microsoft.Azure.Amqp;

namespace Azure.Messaging.EventHubs.Amqp
{
/// <summary>
/// A set of events with known size constraints, based on messages to be sent
/// using an AMQP-based transport.
/// </summary>
///
internal class AmqpEventBatch : TransportEventBatch
{
/// <summary>The amount of bytes to reserve as overhead for a small message.</summary>
private const byte OverheadBytesSmallMessage = 5;

/// <summary>The amount of bytes to reserve as overhead for a large message.</summary>
private const byte OverheadBytesLargeMessage = 8;

/// <summary>The maximum number of bytes that a message may be to be considered small.</summary>
private const byte MaximumBytesSmallMessage = 255;

/// <summary>A flag that indicates whether or not the instance has been disposed.</summary>
private bool _disposed = false;

/// <summary>The size of the batch, in bytes, as it will be sent via the AMQP transport.</summary>
private long _sizeBytes = 0;

/// <summary>
/// The maximum size allowed for the batch, in bytes. This includes the events in the batch as
/// well as any overhead for the batch itself when sent to the Event Hubs service.
/// </summary>
///
public override long MaximumSizeInBytes { get; }

/// <summary>
/// The size of the batch, in bytes, as it will be sent to the Event Hubs
/// service.
/// </summary>
///
public override long SizeInBytes => _sizeBytes;

/// <summary>
/// The count of events contained in the batch.
/// </summary>
///
public override int Count => BatchMessages.Count;

/// <summary>
/// The converter to use for translating <see cref="EventData" /> into the corresponding AMQP message.
/// </summary>
///
private AmqpMessageConverter MessageConverter { get; }

/// <summary>
/// The set of options to apply to the batch.
/// </summary>
///
private BatchOptions Options { get; }

/// <summary>
/// The set of messages that have been added to the batch.
/// </summary>
///
private List<AmqpMessage> BatchMessages { get; } = new List<AmqpMessage>();

/// <summary>
/// Initializes a new instance of the <see cref="AmqpEventBatch"/> class.
/// </summary>
///
/// <param name="messageConverter">The converter to use for translating <see cref="EventData" /> into the corresponding AMQP message.</param>
/// <param name="options">The set of options to apply to the batch.</param>
///
public AmqpEventBatch(AmqpMessageConverter messageConverter,
BatchOptions options)
{
Guard.ArgumentNotNull(nameof(messageConverter), messageConverter);
Guard.ArgumentNotNull(nameof(options), options);
Guard.ArgumentNotNull(nameof(options.MaximumizeInBytes), options.MaximumizeInBytes);

MessageConverter = messageConverter;
Options = options;
MaximumSizeInBytes = options.MaximumizeInBytes.Value;

// Initialize the size by reserving space for the batch envelope.

using var envelope = messageConverter.CreateBatchFromEvents(Enumerable.Empty<EventData>(), options.PartitionKey);
_sizeBytes = envelope.SerializedMessageSize;
}

/// <summary>
/// Attempts to add an event to the batch, ensuring that the size
/// of the batch does not exceed its maximum.
/// </summary>
///
/// <param name="eventData">The event to attempt to add to the batch.</param>
///
/// <returns><c>true</c> if the event was added; otherwise, <c>false</c>.</returns>
///
public override bool TryAdd(EventData eventData)
{
Guard.ArgumentNotNull(nameof(eventData), eventData);
GuardDisposed();

var eventMessage = MessageConverter.CreateMessageFromEvent(eventData, Options.PartitionKey);

try
{
// Calculate the size for the event, based on the AMQP message size and accounting for a
// bit of reserved overhead size.

var size = _sizeBytes
+ eventMessage.SerializedMessageSize
+ (eventMessage.SerializedMessageSize <= MaximumBytesSmallMessage
? OverheadBytesSmallMessage
: OverheadBytesLargeMessage);

if (size > MaximumSizeInBytes)
{
eventMessage.Dispose();
return false;
}

_sizeBytes = size;
BatchMessages.Add(eventMessage);
return true;
}
catch
{
eventMessage?.Dispose();
throw;
}
}

/// <summary>
/// Represents the batch as an enumerable set of transport-specific
/// representations of an event.
/// </summary>
///
/// <typeparam name="T">The transport-specific event representation being requested.</typeparam>
///
/// <returns>The set of events as an enumerable of the requested type.</returns>
///
public override IEnumerable<T> AsEnumerable<T>()
{
if (typeof(T) != typeof(AmqpMessage))
{
throw new FormatException(String.Format(CultureInfo.CurrentCulture, Resources.UnsupportedTransportEventType, typeof(T).Name));
}

return (IEnumerable<T>)BatchMessages;
}

/// <summary>
/// Performs the task needed to clean up resources used by the <see cref="AmqpEventBatch" />.
/// </summary>
///
public override void Dispose()
{
_disposed = true;

foreach (var message in BatchMessages)
{
message.Dispose();
}

BatchMessages.Clear();
_sizeBytes = 0;
}

/// <summary>
/// Ensures that the batch has not already been disposed, throwing an exception
/// if it has.
/// </summary>
///
private void GuardDisposed()
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(AmqpEventBatch));
}
}
}
}
22 changes: 22 additions & 0 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpFilter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

namespace Azure.Messaging.EventHubs
{
/// <summary>
/// The set of filters associated with an AMQP messages and
/// entities.
/// </summary>
///
internal static class AmqpFilter
{
/// <summary>Indicates filtering based on the sequence number of a message.</summary>
public const string SeqNumberName = "amqp.annotation.x-opt-sequence-number";

/// <summary>Indicates filtering based on the offset of a message.</summary>
public const string OffsetPartName = "amqp.annotation.x-opt-offset";

/// <summary>Indicates filtering based on time that a message was enqueued.</summary>
public const string ReceivedAtName = "amqp.annotation.x-opt-enqueued-time";
}
}
60 changes: 60 additions & 0 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpManagement.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using Microsoft.Azure.Amqp;

namespace Azure.Messaging.EventHubs
{
/// <summary>
/// The set of annotations for management-related operations associated with an AMQP messages and
/// entities.
/// </summary>
///
internal static class AmqpManagement
{
/// <summary>The location to specify for management operations.</summary>
public const string Address = "$management";

/// <summary>The key to use for specifying an Event Hubs resource name.</summary>
public const string ResourceNameKey = "name";

/// <summary>The key to use for specifying a partition. </summary>
public const string PartitionNameKey = "partition";

/// <summary>The key to use for specifying an operation.</summary>
public const string OperationKey = "operation";

/// <summary>The key to use for specifying the type of Event Hubs resource.</summary>
public const string ResourceTypeKey = "type";

/// <summary>The key to use for specifying a security token.</summary>
public const string SecurityTokenKey = "security_token";

/// <summary>The value to specify when requesting a read-based operation.</summary>
public const string ReadOperationValue = "READ";

/// <summary>The value to specify when identifying an Event Hub resource.</summary>
public const string EventHubResourceTypeValue = AmqpConstants.Vendor + ":eventhub";

/// <summary>The value to specify when identifying a partition resource.</summary>
public const string PartitionResourceTypeValue = AmqpConstants.Vendor + ":partition";

/// <summary>The message property that identifies the beginning sequence number in a partition.</summary>
public const string PartitionBeginSequenceNumber = "begin_sequence_number";

/// <summary>The message property that identifies the last sequence number enqueued for a partition.</summary>
public const string PartitionLastEnqueuedSequenceNumber = "last_enqueued_sequence_number";

/// <summary>The message property that identifies the last offset enqueued for a partition.</summary>
public const string PartitionLastEnqueuedOffset = "last_enqueued_offset";

/// <summary>The message property that identifies the last time enqueued for a partition.</summary>
public const string PartitionLastEnqueuedTimeUtc = "last_enqueued_time_utc";

/// <summary>The message property that identifies the date and time, in UTC, that partition information was sent from the Event Hubs service.</summary>
public const string PartitionRuntimeInfoRetrievalTimeUtc = "runtime_info_retrieval_time_utc";

/// <summary>The message property that identifies whether or not a partition is considered empty.</summary>
public const string PartitionRuntimeInfoPartitionIsEmpty = "is_partition_empty";
}
}
Loading

0 comments on commit 2296a55

Please sign in to comment.