From 6746bc0213353af724a37bb4312284e358aa0d5a Mon Sep 17 00:00:00 2001 From: Jesse Squire Date: Mon, 1 Feb 2021 15:59:29 -0500 Subject: [PATCH] [Event Hubs Client] Formatting Pass The purpose of these changes is to format code to apply project conventions for consistency and in some cases, update member names and documentation to better convey context. **Note:** These changes are intended to be superficial; no change to existing behavior should have taken place. --- .../Diagnostics/BlobEventStoreEventSource.cs | 18 +-- .../BlobsCheckpointStore.Diagnostics.cs | 97 ++++++++++--- .../BlobsCheckpointStoreTests.cs | 2 +- .../Infrastructure/StorageTestEnvironment.cs | 2 +- .../EventProcessorClientLiveTests.cs | 32 ++--- .../Processor/EventProcessorClientTests.cs | 61 ++++---- .../BlobsCheckpointStore.cs | 133 ++++++++++++++---- .../src/Processor/PartitionLoadBalancer.cs | 2 +- .../src/Testing/EventGenerator.cs | 3 +- .../src/Testing/EventHubsTestEnvironment.cs | 6 +- .../Processor/PartitionLoadBalancerTests.cs | 2 +- .../Testing/InMemoryStorageManagerTests.cs | 1 - .../src/Amqp/AmqpConnectionScope.cs | 2 +- .../src/Amqp/AmqpProducer.cs | 2 +- .../src/Core/ChannelReaderExtensions.cs | 2 +- .../src/Core/TransportProducerPool.cs | 2 +- .../src/Diagnostics/EventHubsEventSource.cs | 1 - .../src/Primitives/EventProcessor.cs | 2 +- .../src/Producer/EventHubProducerClient.cs | 4 +- .../Producer/PartitionPublishingOptions.cs | 2 +- .../Producer/PartitionPublishingProperties.cs | 2 +- .../src/Producer/PartitionPublishingState.cs | 2 +- .../tests/Amqp/AmqpConnectionScopeTests.cs | 4 +- .../tests/Amqp/AmqpProducerTests.cs | 1 - ...EventHubsSharedAccessKeyCredentialTests.cs | 1 - .../Connection/EventHubConnectionLiveTests.cs | 1 - .../EventHubConsumerClientLiveTests.cs | 58 ++++---- .../tests/Core/EventDataTests.cs | 8 +- ...ventHubsConnectionStringPropertiesTests.cs | 1 - .../tests/Core/EventHubsModelFactoryTests.cs | 4 +- .../EventProcessorTests.MainProcessingLoop.cs | 23 ++- .../EventHubProducerClientOptionsTests.cs | 6 +- .../Producer/EventHubProducerClientTests.cs | 37 ++--- .../Producer/IdempotentPublishingLiveTests.cs | 14 +- 34 files changed, 335 insertions(+), 203 deletions(-) mode change 100755 => 100644 sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Testing/InMemoryStorageManagerTests.cs mode change 100755 => 100644 sdk/eventhub/Azure.Messaging.EventHubs/src/Core/ChannelReaderExtensions.cs diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Diagnostics/BlobEventStoreEventSource.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Diagnostics/BlobEventStoreEventSource.cs index 49c1c9966aafd..d83c3aad339d2 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Diagnostics/BlobEventStoreEventSource.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Diagnostics/BlobEventStoreEventSource.cs @@ -341,10 +341,10 @@ public virtual void UpdateCheckpointStart(string partitionId, /// The name of the consumer group the checkpoint is associated with. /// [Event(33, Level = EventLevel.Informational, Message = "Completed the attempt to create/update a checkpoint for partition: `{0}` of FullyQualifiedNamespace: '{1}'; EventHubName: '{2}'; ConsumerGroup: '{3}'.")] - public virtual void UpdateCheckpointComplete(string partitionId, - string fullyQualifiedNamespace, - string eventHubName, - string consumerGroup) + public virtual void UpdateCheckpointComplete(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup) { if (IsEnabled()) { @@ -363,11 +363,11 @@ public virtual void UpdateCheckpointComplete(string partitionId, /// The message for the exception that occurred. /// [Event(34, Level = EventLevel.Error, Message = "An exception occurred when creating/updating a checkpoint for partition: `{0}` of FullyQualifiedNamespace: '{1}'; EventHubName: '{2}'; ConsumerGroup: '{3}'. ErrorMessage: '{4}'.")] - public virtual void UpdateCheckpointError(string partitionId, - string fullyQualifiedNamespace, - string eventHubName, - string consumerGroup, - string errorMessage) + public virtual void UpdateCheckpointError(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string errorMessage) { if (IsEnabled()) { diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Storage/BlobsCheckpointStore.Diagnostics.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Storage/BlobsCheckpointStore.Diagnostics.cs index 9056902721c52..217bac1b9a3b6 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Storage/BlobsCheckpointStore.Diagnostics.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Storage/BlobsCheckpointStore.Diagnostics.cs @@ -37,7 +37,10 @@ static BlobsCheckpointStore() /// The name of the consumer group the ownership are associated with. /// The amount of ownership received from the storage service. /// - partial void ListOwnershipComplete(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, int ownershipCount) => + partial void ListOwnershipComplete(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + int ownershipCount) => Logger.ListOwnershipComplete(fullyQualifiedNamespace, eventHubName, consumerGroup, ownershipCount); /// @@ -49,7 +52,10 @@ partial void ListOwnershipComplete(string fullyQualifiedNamespace, string eventH /// The name of the consumer group the ownership are associated with. /// The exception that occurred. /// - partial void ListOwnershipError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception) => + partial void ListOwnershipError(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + Exception exception) => Logger.ListOwnershipError(fullyQualifiedNamespace, eventHubName, consumerGroup, exception.Message); /// @@ -60,7 +66,9 @@ partial void ListOwnershipError(string fullyQualifiedNamespace, string eventHubN /// The name of the specific Event Hub the ownership are associated with, relative to the Event Hubs namespace that contains it. /// The name of the consumer group the ownership are associated with. /// - partial void ListOwnershipStart(string fullyQualifiedNamespace, string eventHubName, string consumerGroup) => + partial void ListOwnershipStart(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup) => Logger.ListOwnershipStart(fullyQualifiedNamespace, eventHubName, consumerGroup); /// @@ -72,7 +80,10 @@ partial void ListOwnershipStart(string fullyQualifiedNamespace, string eventHubN /// The name of the consumer group the checkpoints are associated with. /// The amount of checkpoints received from the storage service. /// - partial void ListCheckpointsComplete(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, int checkpointCount) => + partial void ListCheckpointsComplete(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + int checkpointCount) => Logger.ListCheckpointsComplete(fullyQualifiedNamespace, eventHubName, consumerGroup, checkpointCount); /// @@ -84,7 +95,10 @@ partial void ListCheckpointsComplete(string fullyQualifiedNamespace, string even /// The name of the consumer group the ownership are associated with. /// The exception that occurred. /// - partial void ListCheckpointsError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception) => + partial void ListCheckpointsError(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + Exception exception) => Logger.ListCheckpointsError(fullyQualifiedNamespace, eventHubName, consumerGroup, exception.Message); /// @@ -96,7 +110,10 @@ partial void ListCheckpointsError(string fullyQualifiedNamespace, string eventHu /// The name of the specific Event Hub the data is associated with, relative to the Event Hubs namespace that contains it. /// The name of the consumer group the data is associated with. /// - partial void InvalidCheckpointFound(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup) => + partial void InvalidCheckpointFound(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup) => Logger.InvalidCheckpointFound(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup); /// @@ -107,7 +124,9 @@ partial void InvalidCheckpointFound(string partitionId, string fullyQualifiedNam /// The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it. /// The name of the consumer group the checkpoints are associated with. /// - partial void ListCheckpointsStart(string fullyQualifiedNamespace, string eventHubName, string consumerGroup) => + partial void ListCheckpointsStart(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup) => Logger.ListCheckpointsStart(fullyQualifiedNamespace, eventHubName, consumerGroup); /// @@ -120,7 +139,11 @@ partial void ListCheckpointsStart(string fullyQualifiedNamespace, string eventHu /// The name of the consumer group the checkpoint is associated with. /// The exception that occurred. /// - partial void UpdateCheckpointError(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception) => + partial void UpdateCheckpointError(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + Exception exception) => Logger.UpdateCheckpointError(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, exception.Message); /// @@ -132,7 +155,10 @@ partial void UpdateCheckpointError(string partitionId, string fullyQualifiedName /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. /// The name of the consumer group the checkpoint is associated with. /// - partial void UpdateCheckpointComplete(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup) => + partial void UpdateCheckpointComplete(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup) => Logger.UpdateCheckpointComplete(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup); /// @@ -144,7 +170,10 @@ partial void UpdateCheckpointComplete(string partitionId, string fullyQualifiedN /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. /// The name of the consumer group the checkpoint is associated with. /// - partial void UpdateCheckpointStart(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup) => + partial void UpdateCheckpointStart(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup) => Logger.UpdateCheckpointStart(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup); /// @@ -157,7 +186,11 @@ partial void UpdateCheckpointStart(string partitionId, string fullyQualifiedName /// The name of the consumer group the ownership is associated with. /// The identifier of the processor that attempted to claim the ownership for. /// - partial void ClaimOwnershipComplete(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier) => + partial void ClaimOwnershipComplete(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string ownerIdentifier) => Logger.ClaimOwnershipComplete(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, ownerIdentifier); /// @@ -171,7 +204,12 @@ partial void ClaimOwnershipComplete(string partitionId, string fullyQualifiedNam /// The identifier of the processor that attempted to claim the ownership for. /// The exception that occurred. /// - partial void ClaimOwnershipError(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier, Exception exception) => + partial void ClaimOwnershipError(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string ownerIdentifier, + Exception exception) => Logger.ClaimOwnershipError(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, ownerIdentifier, exception.Message); /// @@ -185,7 +223,12 @@ partial void ClaimOwnershipError(string partitionId, string fullyQualifiedNamesp /// The identifier of the processor that attempted to claim the ownership for. /// The message for the failure. /// - partial void OwnershipNotClaimable(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier, string message) => + partial void OwnershipNotClaimable(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string ownerIdentifier, + string message) => Logger.OwnershipNotClaimable(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, ownerIdentifier, message); /// @@ -211,7 +254,11 @@ partial void OwnershipClaimed(string partitionId, string fullyQualifiedNamespace /// The name of the consumer group the ownership is associated with. /// The identifier of the processor that attempted to claim the ownership for. /// - partial void ClaimOwnershipStart(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier) => + partial void ClaimOwnershipStart(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string ownerIdentifier) => Logger.ClaimOwnershipStart(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup, ownerIdentifier); /// @@ -222,7 +269,9 @@ partial void ClaimOwnershipStart(string partitionId, string fullyQualifiedNamesp /// The Storage account name corresponding to the associated container client. /// The name of the associated container client. /// - partial void BlobsCheckpointStoreCreated(string typeName, string accountName, string containerName) => + partial void BlobsCheckpointStoreCreated(string typeName, + string accountName, + string containerName) => Logger.BlobsCheckpointStoreCreated(typeName, accountName, containerName); /// @@ -234,7 +283,10 @@ partial void BlobsCheckpointStoreCreated(string typeName, string accountName, st /// The name of the consumer group the checkpoint is associated with. /// The partition id the specific checkpoint is associated with. /// - partial void GetCheckpointStart(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId) => + partial void GetCheckpointStart(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string partitionId) => Logger.GetCheckpointStart(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId); /// @@ -246,7 +298,10 @@ partial void GetCheckpointStart(string fullyQualifiedNamespace, string eventHubN /// The name of the consumer group the checkpoint is associated with. /// The partition id the specific checkpoint is associated with. /// - partial void GetCheckpointComplete(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId) => + partial void GetCheckpointComplete(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string partitionId) => Logger.GetCheckpointComplete(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId); /// @@ -259,7 +314,11 @@ partial void GetCheckpointComplete(string fullyQualifiedNamespace, string eventH /// The partition id the specific checkpoint is associated with. /// The message for the exception that occurred. /// - partial void GetCheckpointError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, Exception exception) => + partial void GetCheckpointError(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string partitionId, + Exception exception) => Logger.GetCheckpointError(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId, exception.Message); } -} \ No newline at end of file +} diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs index 4b4fca6b58d49..7f70beee1e508 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs @@ -1466,7 +1466,7 @@ public async Task GetCheckpointLogsStartAndComplete() await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", new CancellationToken()); mockLog.Verify(m => m.GetCheckpointStart(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0")); - mockLog.Verify(m => m.GetCheckpointComplete(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0")); + mockLog.Verify(m => m.GetCheckpointComplete(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0")); } /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Infrastructure/StorageTestEnvironment.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Infrastructure/StorageTestEnvironment.cs index 19f4741375b3b..8b3bbc6d42dc6 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Infrastructure/StorageTestEnvironment.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Infrastructure/StorageTestEnvironment.cs @@ -13,7 +13,7 @@ namespace Azure.Messaging.EventHubs.Processor.Tests /// being run, offering access to information such as environment variables. /// /// - public class StorageTestEnvironment: TestEnvironment + public class StorageTestEnvironment : TestEnvironment { /// The singleton instance of the , lazily created. private static readonly Lazy Singleton = new Lazy(() => new StorageTestEnvironment(), LazyThreadSafetyMode.ExecutionAndPublication); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientLiveTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientLiveTests.cs index f83add6dff325..c5d72a888549f 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientLiveTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientLiveTests.cs @@ -47,7 +47,7 @@ public async Task EventsCanBeReadByOneProcessorClient(LoadBalancingStrategy load var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName); using var cancellationSource = new CancellationTokenSource(); - cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); // Send a set of events. @@ -79,7 +79,7 @@ public async Task EventsCanBeReadByOneProcessorClient(LoadBalancingStrategy load foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(processedEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -97,7 +97,7 @@ public async Task EventsCanBeReadByOneProcessorClientUsingAnIdentityCredential() var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName); using var cancellationSource = new CancellationTokenSource(); - cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); // Send a set of events. @@ -129,7 +129,7 @@ public async Task EventsCanBeReadByOneProcessorClientUsingAnIdentityCredential() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(processedEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -148,7 +148,7 @@ public async Task EventsCanBeReadByOneProcessorClientUsingTheSharedKeyCredential var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName); using var cancellationSource = new CancellationTokenSource(); - cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); // Send a set of events. @@ -180,7 +180,7 @@ public async Task EventsCanBeReadByOneProcessorClientUsingTheSharedKeyCredential foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(processedEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -244,7 +244,7 @@ public async Task EventsCanBeReadByMultipleProcessorClients() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(processedEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -288,7 +288,7 @@ public async Task ProcessorClientCreatesOwnership() var processedEvents = new ConcurrentDictionary(); var completionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var storageManager = new InMemoryStorageManager(_ => {}); + var storageManager = new InMemoryStorageManager(_ => { }); var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(250) }; var processor = CreateProcessorWithIdentity(scope.ConsumerGroups.First(), scope.EventHubName, storageManager, options); @@ -346,7 +346,7 @@ public async Task ProcessorClientCanStartFromAnInitialPosition() await using (var consumer = new EventHubConsumerClient(scope.ConsumerGroups.First(), connectionString)) { - await foreach (var partitionEvent in consumer.ReadEventsAsync(new ReadEventOptions { MaximumWaitTime = null }, cancellationSource.Token)) + await foreach (var partitionEvent in consumer.ReadEventsAsync(new ReadEventOptions { MaximumWaitTime = null }, cancellationSource.Token)) { if (partitionEvent.Data.IsEquivalentTo(lastSourceEvent)) { @@ -393,7 +393,7 @@ public async Task ProcessorClientCanStartFromAnInitialPosition() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(processedEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -438,7 +438,7 @@ public async Task ProcessorClientBeginsWithTheNextEventAfterCheckpointing() var completionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var beforeCheckpointProcessHandler = CreateEventTrackingHandler(segmentEventCount, processedEvents, completionSource, cancellationSource.Token, processedEventCallback); var options = new EventProcessorOptions { LoadBalancingUpdateInterval = TimeSpan.FromMilliseconds(250) }; - var storageManager = new InMemoryStorageManager(_ => {}); + var storageManager = new InMemoryStorageManager(_ => { }); var processor = CreateProcessor(scope.ConsumerGroups.First(), connectionString, storageManager, options); processor.ProcessErrorAsync += CreateAssertingErrorHandler(); @@ -476,7 +476,7 @@ public async Task ProcessorClientBeginsWithTheNextEventAfterCheckpointing() foreach (var sourceEvent in afterCheckpointEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(processedEvents.TryGetValue(sourceId, out var processedEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(processedEvent), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -500,7 +500,7 @@ private EventProcessorClient CreateProcessor(string consumerGroup, { EventHubConnection createConnection() => new EventHubConnection(connectionString); - storageManager ??= new InMemoryStorageManager(_=> {}); + storageManager ??= new InMemoryStorageManager(_ => { }); return new TestEventProcessorClient(storageManager, consumerGroup, "fakeNamespace", "fakeEventHub", Mock.Of(), createConnection, options); } @@ -524,7 +524,7 @@ private EventProcessorClient CreateProcessorWithIdentity(string consumerGroup, var credential = EventHubsTestEnvironment.Instance.Credential; EventHubConnection createConnection() => new EventHubConnection(EventHubsTestEnvironment.Instance.FullyQualifiedNamespace, eventHubName, credential); - storageManager ??= new InMemoryStorageManager(_=> {}); + storageManager ??= new InMemoryStorageManager(_ => { }); return new TestEventProcessorClient(storageManager, consumerGroup, EventHubsTestEnvironment.Instance.FullyQualifiedNamespace, eventHubName, credential, createConnection, options); } @@ -548,7 +548,7 @@ private EventProcessorClient CreateProcessorWithSharedAccessKey(string consumerG var credential = new EventHubsSharedAccessKeyCredential(EventHubsTestEnvironment.Instance.SharedAccessKeyName, EventHubsTestEnvironment.Instance.SharedAccessKey); EventHubConnection createConnection() => null; //new EventHubConnection(EventHubsTestEnvironment.Instance.FullyQualifiedNamespace, eventHubName, credential); - storageManager ??= new InMemoryStorageManager(_=> {}); + storageManager ??= new InMemoryStorageManager(_ => { }); return new TestEventProcessorClient(storageManager, consumerGroup, EventHubsTestEnvironment.Instance.FullyQualifiedNamespace, eventHubName, credential, createConnection, options); } @@ -621,7 +621,7 @@ private Func CreateEventTrackingHandler(int targetCount, if (processedEvents.Count >= targetCount) { - completionSource.TrySetResult(true); + completionSource.TrySetResult(true); } } } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs index 4d268db208f9c..87a5ec601271b 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs @@ -131,27 +131,27 @@ void assertOptionsMatch(EventProcessorOptions expected, Assert.That(actual, Is.Not.Null, $"The processor options should have been created for the { constructorDescription } constructor."); Assert.That(actual.ConnectionOptions.TransportType, Is.EqualTo(expected.ConnectionOptions.TransportType), $"The connection options are incorrect for the { constructorDescription } constructor."); Assert.That(actual.RetryOptions.MaximumRetries, Is.EqualTo(expected.RetryOptions.MaximumRetries), $"The retry options are incorrect for the { constructorDescription } constructor."); - Assert.That(actual.Identifier, Is.EqualTo(expected.Identifier), $"The identifier is incorrect for the { constructorDescription } constructor."); - Assert.That(actual.MaximumWaitTime, Is.EqualTo(expected.MaximumWaitTime), $"The maximum wait time is incorrect for the { constructorDescription } constructor."); - Assert.That(actual.TrackLastEnqueuedEventProperties, Is.EqualTo(expected.TrackLastEnqueuedEventProperties), $"The last event tracking flag is incorrect for the { constructorDescription } constructor."); - Assert.That(actual.DefaultStartingPosition, Is.EqualTo(expected.DefaultStartingPosition), $"The default starting position is incorrect for the { constructorDescription } constructor."); - Assert.That(actual.LoadBalancingUpdateInterval, Is.EqualTo(expected.LoadBalancingUpdateInterval), $"The load balancing interval is incorrect for the { constructorDescription } constructor."); - Assert.That(actual.PartitionOwnershipExpirationInterval, Is.EqualTo(expected.PartitionOwnershipExpirationInterval), $"The ownership expiration interval incorrect for the { constructorDescription } constructor."); - Assert.That(actual.PrefetchCount, Is.EqualTo(expected.PrefetchCount), $"The prefetch count is incorrect for the { constructorDescription } constructor."); - Assert.That(actual.PrefetchSizeInBytes, Is.EqualTo(expected.PrefetchSizeInBytes), $"The prefetch byte size is incorrect for the { constructorDescription } constructor."); + Assert.That(actual.Identifier, Is.EqualTo(expected.Identifier), $"The identifier is incorrect for the { constructorDescription } constructor."); + Assert.That(actual.MaximumWaitTime, Is.EqualTo(expected.MaximumWaitTime), $"The maximum wait time is incorrect for the { constructorDescription } constructor."); + Assert.That(actual.TrackLastEnqueuedEventProperties, Is.EqualTo(expected.TrackLastEnqueuedEventProperties), $"The last event tracking flag is incorrect for the { constructorDescription } constructor."); + Assert.That(actual.DefaultStartingPosition, Is.EqualTo(expected.DefaultStartingPosition), $"The default starting position is incorrect for the { constructorDescription } constructor."); + Assert.That(actual.LoadBalancingUpdateInterval, Is.EqualTo(expected.LoadBalancingUpdateInterval), $"The load balancing interval is incorrect for the { constructorDescription } constructor."); + Assert.That(actual.PartitionOwnershipExpirationInterval, Is.EqualTo(expected.PartitionOwnershipExpirationInterval), $"The ownership expiration interval incorrect for the { constructorDescription } constructor."); + Assert.That(actual.PrefetchCount, Is.EqualTo(expected.PrefetchCount), $"The prefetch count is incorrect for the { constructorDescription } constructor."); + Assert.That(actual.PrefetchSizeInBytes, Is.EqualTo(expected.PrefetchSizeInBytes), $"The prefetch byte size is incorrect for the { constructorDescription } constructor."); } var clientOptions = new EventProcessorClientOptions { - ConnectionOptions = new EventHubConnectionOptions { TransportType = EventHubsTransportType.AmqpWebSockets }, - RetryOptions = new EventHubsRetryOptions { MaximumRetries = 99 }, - Identifier = "OMG, HAI!", - MaximumWaitTime = TimeSpan.FromDays(54), - TrackLastEnqueuedEventProperties = true, - PrefetchCount = 5, - PrefetchSizeInBytes = 500, - LoadBalancingUpdateInterval = TimeSpan.FromDays(65), - PartitionOwnershipExpirationInterval = TimeSpan.FromMilliseconds(65) + ConnectionOptions = new EventHubConnectionOptions { TransportType = EventHubsTransportType.AmqpWebSockets }, + RetryOptions = new EventHubsRetryOptions { MaximumRetries = 99 }, + Identifier = "OMG, HAI!", + MaximumWaitTime = TimeSpan.FromDays(54), + TrackLastEnqueuedEventProperties = true, + PrefetchCount = 5, + PrefetchSizeInBytes = 500, + LoadBalancingUpdateInterval = TimeSpan.FromDays(65), + PartitionOwnershipExpirationInterval = TimeSpan.FromMilliseconds(65) }; var expectedOptions = InvokeCreateOptions(clientOptions); @@ -608,7 +608,7 @@ public void ProcessorAllowsRemovingEventHandlers() processorClient.PartitionInitializingAsync += initHandler; processorClient.PartitionClosingAsync += closeHandler; processorClient.ProcessEventAsync += eventHandler; - processorClient.ProcessErrorAsync +=errorHandler; + processorClient.ProcessErrorAsync += errorHandler; Assert.That(() => processorClient.PartitionInitializingAsync -= initHandler, Throws.Nothing, "The initializing handler should allow removing registrations."); Assert.That(() => processorClient.PartitionClosingAsync -= closeHandler, Throws.Nothing, "The closing handler should allow removing registrations."); @@ -630,7 +630,8 @@ public async Task ProcessorDoesNotAllowEventHandlerChangesWhenRunning() Func errorHandler = eventArgs => Task.CompletedTask; var processorClient = new TestEventProcessorClient(Mock.Of(), "consumerGroup", "namespace", "eventHub", Mock.Of(), Mock.Of(), default); - processorClient.ProcessEventAsync += eventHandler;; + processorClient.ProcessEventAsync += eventHandler; + ; processorClient.ProcessErrorAsync += errorHandler; // Handlers should not be allowed when the processor is running. @@ -944,7 +945,7 @@ public async Task EventProcessingToleratesAndSurfacesMultipleExceptions() var eventBatch = Enumerable .Range(0, eventCount) - .Select(index => new MockEventData(Array.Empty(), offset: 1000 + index, sequenceNumber: 2000 + index)) + .Select(index => new MockEventData(Array.Empty(), offset: 1000 + index, sequenceNumber: 2000 + index)) .ToList(); processorClient.ProcessEventAsync += eventArgs => @@ -1422,16 +1423,16 @@ public void ClientOptionsCanBeTranslated() { var clientOptions = new EventProcessorClientOptions { - ConnectionOptions = new EventHubConnectionOptions { TransportType = EventHubsTransportType.AmqpWebSockets }, - RetryOptions = new EventHubsRetryOptions { MaximumRetries = 99 }, - Identifier = "OMG, HAI!", - MaximumWaitTime = TimeSpan.FromDays(54), - TrackLastEnqueuedEventProperties = true, - LoadBalancingStrategy = LoadBalancingStrategy.Greedy, - PrefetchCount = 9990, - PrefetchSizeInBytes = 400, - LoadBalancingUpdateInterval = TimeSpan.FromSeconds(45), - PartitionOwnershipExpirationInterval = TimeSpan.FromMilliseconds(44) + ConnectionOptions = new EventHubConnectionOptions { TransportType = EventHubsTransportType.AmqpWebSockets }, + RetryOptions = new EventHubsRetryOptions { MaximumRetries = 99 }, + Identifier = "OMG, HAI!", + MaximumWaitTime = TimeSpan.FromDays(54), + TrackLastEnqueuedEventProperties = true, + LoadBalancingStrategy = LoadBalancingStrategy.Greedy, + PrefetchCount = 9990, + PrefetchSizeInBytes = 400, + LoadBalancingUpdateInterval = TimeSpan.FromSeconds(45), + PartitionOwnershipExpirationInterval = TimeSpan.FromMilliseconds(44) }; var defaultOptions = new EventProcessorOptions(); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs index 81c33dfb65ee3..20e9e47dccd07 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs @@ -49,7 +49,16 @@ internal partial class BlobsCheckpointStore : StorageManager /// with the specified prefix. /// /// - private const string LegacyCheckpointPrefix = "{0}/{1}/{2}/"; + /// + /// This pattern is specific to the prefix used by the Azure Functions extension. The legacy + /// EventProcessorHost allowed this value to be specified as an option, defaulting to + /// an empty prefix. + /// + /// For this to be general-purpose, it will need to be refactored into an option with this + /// pattern passed by the Functions extension. + /// + /// + private const string FunctionsLegacyCheckpointPrefix = "{0}/{1}/{2}/"; /// /// Specifies a string that filters the results to return only ownership blobs whose name begins @@ -295,7 +304,7 @@ public override async Task> ListCheckpoint if (InitializeWithLegacyCheckpoints) { // Legacy checkpoints are not normalized to lowercase - var legacyPrefix = string.Format(CultureInfo.InvariantCulture, LegacyCheckpointPrefix, fullyQualifiedNamespace, eventHubName, consumerGroup); + var legacyPrefix = string.Format(CultureInfo.InvariantCulture, FunctionsLegacyCheckpointPrefix, fullyQualifiedNamespace, eventHubName, consumerGroup); await foreach (BlobItem blob in ContainerClient.GetBlobsAsync(prefix: legacyPrefix, cancellationToken: cancellationToken).ConfigureAwait(false)) { @@ -351,7 +360,11 @@ public override async Task> ListCheckpoint /// /// A initialized with checkpoint properties if the checkpoint exists, otherwise null. /// - public override async Task GetCheckpointAsync(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, CancellationToken cancellationToken) + public override async Task GetCheckpointAsync(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string partitionId, + CancellationToken cancellationToken) { try { @@ -376,7 +389,7 @@ public override async Task GetCheckpointAsync(string f { if (InitializeWithLegacyCheckpoints) { - var legacyPrefix = string.Format(CultureInfo.InvariantCulture, LegacyCheckpointPrefix, fullyQualifiedNamespace, eventHubName, consumerGroup) + partitionId; + var legacyPrefix = string.Format(CultureInfo.InvariantCulture, FunctionsLegacyCheckpointPrefix, fullyQualifiedNamespace, eventHubName, consumerGroup) + partitionId; return await CreateLegacyCheckpoint(fullyQualifiedNamespace, eventHubName, consumerGroup, legacyPrefix, partitionId, cancellationToken).ConfigureAwait(false); } } @@ -479,7 +492,7 @@ private async Task CreateLegacyCheckpoint(string fully await blobClient.DownloadToAsync(memoryStream, cancellationToken).ConfigureAwait(false); if (TryReadLegacyCheckpoint( - memoryStream.GetBuffer().AsSpan(0, (int) memoryStream.Length), + memoryStream.GetBuffer().AsSpan(0, (int)memoryStream.Length), out long? offset, out long? sequenceNumber)) { @@ -588,8 +601,11 @@ public override async Task UpdateCheckpointAsync(EventProcessorCheckpoint checkp /// "Offset":"8591964920", /// "SequenceNumber":960180 /// } - /// /// - private static bool TryReadLegacyCheckpoint(Span data, out long? offset, out long? sequenceNumber) + /// + /// + private static bool TryReadLegacyCheckpoint(Span data, + out long? offset, + out long? sequenceNumber) { offset = null; sequenceNumber = null; @@ -647,6 +663,7 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o catch (JsonException) { // Ignore this because if the data is malformed, it will be treated as if the checkpoint didn't exist. + return false; } @@ -662,7 +679,10 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o /// The name of the consumer group the ownership are associated with. /// The amount of ownership received from the storage service. /// - partial void ListOwnershipComplete(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, int ownershipCount); + partial void ListOwnershipComplete(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + int ownershipCount); /// /// Indicates that an unhandled exception was encountered while retrieving a list of ownership. @@ -673,7 +693,10 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o /// The name of the consumer group the ownership are associated with. /// The message for the exception that occurred. /// - partial void ListOwnershipError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception); + partial void ListOwnershipError(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + Exception exception); /// /// Indicates that an attempt to retrieve a list of ownership has started. @@ -683,7 +706,9 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o /// The name of the specific Event Hub the ownership are associated with, relative to the Event Hubs namespace that contains it. /// The name of the consumer group the ownership are associated with. /// - partial void ListOwnershipStart(string fullyQualifiedNamespace, string eventHubName, string consumerGroup); + partial void ListOwnershipStart(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup); /// /// Indicates that an attempt to retrieve a list of checkpoints has completed. @@ -694,7 +719,10 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o /// The name of the consumer group the checkpoints are associated with. /// The amount of checkpoints received from the storage service. /// - partial void ListCheckpointsComplete(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, int checkpointCount); + partial void ListCheckpointsComplete(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + int checkpointCount); /// /// Indicates that an unhandled exception was encountered while retrieving a list of checkpoints. @@ -705,7 +733,10 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o /// The name of the consumer group the ownership are associated with. /// The message for the exception that occurred. /// - partial void ListCheckpointsError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception); + partial void ListCheckpointsError(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + Exception exception); /// /// Indicates that an attempt to retrieve a checkpoint has started. @@ -716,7 +747,10 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o /// The name of the consumer group the checkpoint is associated with. /// The partition id the specific checkpoint is associated with. /// - partial void GetCheckpointStart(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId); + partial void GetCheckpointStart(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string partitionId); /// /// Indicates that an attempt to retrieve a checkpoint has completed. @@ -727,7 +761,10 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o /// The name of the consumer group the checkpoint is associated with. /// The partition id the specific checkpoint is associated with. /// - partial void GetCheckpointComplete(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId); + partial void GetCheckpointComplete(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string partitionId); /// /// Indicates that an unhandled exception was encountered while retrieving a checkpoint. @@ -739,7 +776,11 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o /// The partition id the specific checkpoint is associated with. /// The message for the exception that occurred. /// - partial void GetCheckpointError(string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string partitionId, Exception exception); + partial void GetCheckpointError(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string partitionId, + Exception exception); /// /// Indicates that invalid checkpoint data was found during an attempt to retrieve a list of checkpoints. @@ -750,7 +791,10 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o /// The name of the specific Event Hub the data is associated with, relative to the Event Hubs namespace that contains it. /// The name of the consumer group the data is associated with. /// - partial void InvalidCheckpointFound(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup); + partial void InvalidCheckpointFound(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup); /// /// Indicates that an attempt to retrieve a list of checkpoints has started. @@ -760,7 +804,9 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o /// The name of the specific Event Hub the checkpoints are associated with, relative to the Event Hubs namespace that contains it. /// The name of the consumer group the checkpoints are associated with. /// - partial void ListCheckpointsStart(string fullyQualifiedNamespace, string eventHubName, string consumerGroup); + partial void ListCheckpointsStart(string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup); /// /// Indicates that an unhandled exception was encountered while updating a checkpoint. @@ -772,7 +818,11 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o /// The name of the consumer group the checkpoint is associated with. /// The message for the exception that occurred. /// - partial void UpdateCheckpointError(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, Exception exception); + partial void UpdateCheckpointError(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + Exception exception); /// /// Indicates that an attempt to update a checkpoint has completed. @@ -783,7 +833,10 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. /// The name of the consumer group the checkpoint is associated with. /// - partial void UpdateCheckpointComplete(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup); + partial void UpdateCheckpointComplete(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup); /// /// Indicates that an attempt to create/update a checkpoint has started. @@ -794,7 +847,10 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o /// The name of the specific Event Hub the checkpoint is associated with, relative to the Event Hubs namespace that contains it. /// The name of the consumer group the checkpoint is associated with. /// - partial void UpdateCheckpointStart(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup); + partial void UpdateCheckpointStart(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup); /// /// Indicates that an attempt to retrieve claim partition ownership has completed. @@ -806,7 +862,11 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o /// The name of the consumer group the ownership is associated with. /// The identifier of the processor that attempted to claim the ownership for. /// - partial void ClaimOwnershipComplete(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier); + partial void ClaimOwnershipComplete(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string ownerIdentifier); /// /// Indicates that an exception was encountered while attempting to retrieve claim partition ownership. @@ -819,7 +879,12 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o /// The identifier of the processor that attempted to claim the ownership for. /// The message for the exception that occurred. /// - partial void ClaimOwnershipError(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier, Exception exception); + partial void ClaimOwnershipError(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string ownerIdentifier, + Exception exception); /// /// Indicates that ownership was unable to be claimed. @@ -832,7 +897,12 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o /// The identifier of the processor that attempted to claim the ownership for. /// The message for the failure. /// - partial void OwnershipNotClaimable(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier, string message); + partial void OwnershipNotClaimable(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string ownerIdentifier, + string message); /// /// Indicates that ownership was successfully claimed. @@ -844,7 +914,11 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o /// The name of the consumer group the ownership is associated with. /// The identifier of the processor that attempted to claim the ownership for. /// - partial void OwnershipClaimed(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier); + partial void OwnershipClaimed(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string ownerIdentifier); /// /// Indicates that an attempt to claim a partition ownership has started. @@ -856,7 +930,11 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o /// The name of the consumer group the ownership is associated with. /// The identifier of the processor that attempted to claim the ownership for. /// - partial void ClaimOwnershipStart(string partitionId, string fullyQualifiedNamespace, string eventHubName, string consumerGroup, string ownerIdentifier); + partial void ClaimOwnershipStart(string partitionId, + string fullyQualifiedNamespace, + string eventHubName, + string consumerGroup, + string ownerIdentifier); /// /// Indicates that a was created. @@ -866,12 +944,15 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o /// The Storage account name corresponding to the associated container client. /// The name of the associated container client. /// - partial void BlobsCheckpointStoreCreated(string typeName, string accountName, string containerName); + partial void BlobsCheckpointStoreCreated(string typeName, + string accountName, + string containerName); /// /// Contains the information to reflect the state of event processing for a given Event Hub partition. /// Provides access to the offset and the sequence number retrieved from the blob. /// + /// public class BlobStorageCheckpoint : EventProcessorCheckpoint { public long? Offset { get; set; } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/PartitionLoadBalancer.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/PartitionLoadBalancer.cs index 008b531ddd0db..82d78116243c8 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/PartitionLoadBalancer.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/PartitionLoadBalancer.cs @@ -55,7 +55,7 @@ internal class PartitionLoadBalancer /// read only in the context of this group. /// /// - public string ConsumerGroup { get; } + public string ConsumerGroup { get; } /// /// The identifier of the EventProcessorClient that owns this load balancer. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventGenerator.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventGenerator.cs index da3384a444848..141bbcb989c49 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventGenerator.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventGenerator.cs @@ -3,7 +3,6 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Threading; using System.Threading.Tasks; using Azure.Messaging.EventHubs.Producer; @@ -134,7 +133,7 @@ public static async Task> BuildBatchesAsync(IEnumera } else { - queuedEvents.Dequeue(); + queuedEvents.Dequeue(); } } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventHubsTestEnvironment.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventHubsTestEnvironment.cs index 6ca33dd946152..efbf0cb58cd5c 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventHubsTestEnvironment.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventHubsTestEnvironment.cs @@ -5,8 +5,6 @@ using System.Threading; using System.Threading.Tasks; using Azure.Core.TestFramework; -using Azure.Messaging.EventHubs.Authorization; -using Azure.Messaging.EventHubs.Core; namespace Azure.Messaging.EventHubs.Tests { @@ -16,7 +14,7 @@ namespace Azure.Messaging.EventHubs.Tests /// variables. /// /// - public sealed class EventHubsTestEnvironment: TestEnvironment + public sealed class EventHubsTestEnvironment : TestEnvironment { /// The name of the shared access key to be used for accessing an Event Hubs namespace. public const string EventHubsDefaultSharedAccessKey = "RootManageSharedAccessKey"; @@ -131,7 +129,7 @@ public sealed class EventHubsTestEnvironment: TestEnvironment /// The location of the resource manager for the active cloud environment. /// /// - public new string ResourceManagerUrl => base.ResourceManagerUrl ?? "https://management.azure.com/"; + public new string ResourceManagerUrl => base.ResourceManagerUrl ?? "https://management.azure.com/"; /// /// Initializes a new instance of . diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Processor/PartitionLoadBalancerTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Processor/PartitionLoadBalancerTests.cs index 72c41640e906f..140cba1b05d5f 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Processor/PartitionLoadBalancerTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Processor/PartitionLoadBalancerTests.cs @@ -636,7 +636,7 @@ public async Task RunLoadBalancingAsyncDoesNotStealOwnershipAsRecovery() // // Assign the processor ownership over half of the partitions in storage, but do not formally claim them. - await storageManager.ClaimOwnershipAsync(CreatePartitionOwnership( partitionIds.Skip(MinimumPartitionCount).Take(OrphanedPartitionCount), loadBalancer.OwnerIdentifier)); + await storageManager.ClaimOwnershipAsync(CreatePartitionOwnership(partitionIds.Skip(MinimumPartitionCount).Take(OrphanedPartitionCount), loadBalancer.OwnerIdentifier)); completeOwnership = await storageManager.ListOwnershipAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup); Assert.That(completeOwnership.Count(), Is.EqualTo(OrphanedPartitionCount + MinimumPartitionCount), "Storage should be tracking half the partitions as owned by another processor as well as some orphans."); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Testing/InMemoryStorageManagerTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Testing/InMemoryStorageManagerTests.cs old mode 100755 new mode 100644 index d9b33d62e252b..6d9ad4c9aff89 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Testing/InMemoryStorageManagerTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Testing/InMemoryStorageManagerTests.cs @@ -6,7 +6,6 @@ using System.Linq; using System.Threading.Tasks; using Azure.Messaging.EventHubs.Primitives; -using Moq; using NUnit.Framework; namespace Azure.Messaging.EventHubs.Tests diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs index 0224b4d86623b..8fd0d57aa14f2 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs @@ -909,7 +909,7 @@ protected virtual async Task RequestAuthorizationUsingCbsAsync(AmqpCon if (connection.IsClosing()) { - throw new EventHubsException(true, EventHubName, Resources.UnknownCommunicationException, EventHubsException.FailureReason.ServiceCommunicationProblem); + throw new EventHubsException(true, EventHubName, Resources.UnknownCommunicationException, EventHubsException.FailureReason.ServiceCommunicationProblem); } var authLink = connection.Extensions.Find(); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs index c76f60a90839c..26b646f506230 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs @@ -578,7 +578,7 @@ protected virtual async Task CreateLinkAndEnsureProducerStateAs } catch (Exception ex) { - ExceptionDispatchInfo.Capture(ex.TranslateConnectionCloseDuringLinkCreationException(EventHubName)).Throw(); + ExceptionDispatchInfo.Capture(ex.TranslateConnectionCloseDuringLinkCreationException(EventHubName)).Throw(); } return link; diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/ChannelReaderExtensions.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/ChannelReaderExtensions.cs old mode 100755 new mode 100644 index 7c4eb0dddd353..203860809eafe --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/ChannelReaderExtensions.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/ChannelReaderExtensions.cs @@ -32,7 +32,7 @@ internal static class ChannelReaderExtensions /// public static async IAsyncEnumerable EnumerateChannel(this ChannelReader reader, TimeSpan? maximumWaitTime, - [EnumeratorCancellation]CancellationToken cancellationToken) + [EnumeratorCancellation] CancellationToken cancellationToken) { Argument.AssertNotNull(reader, nameof(reader)); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportProducerPool.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportProducerPool.cs index 83497535dd7b4..bc7d532ef075f 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportProducerPool.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportProducerPool.cs @@ -290,7 +290,7 @@ public PoolItem(string partitionId, /// A class wrapping a , triggering a clean-up when the object is disposed. /// /// - internal class PooledProducer: IAsyncDisposable + internal class PooledProducer : IAsyncDisposable { /// /// A function responsible of cleaning up the resources in use. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Diagnostics/EventHubsEventSource.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Diagnostics/EventHubsEventSource.cs index ce9fb45124f33..02114d671ea66 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Diagnostics/EventHubsEventSource.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Diagnostics/EventHubsEventSource.cs @@ -1,7 +1,6 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -using System; using System.Diagnostics.Tracing; using Azure.Core.Diagnostics; using Azure.Messaging.EventHubs.Consumer; diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs index 7727f0e6e20ed..c4130344291cd 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs @@ -783,7 +783,7 @@ async Task performProcessing() protected abstract Task> ListCheckpointsAsync(CancellationToken cancellationToken); /// - /// Returns a checkpoint for the Event Hub, consumer group, and identifier of the partition associated with the + /// Returns a checkpoint for the Event Hub, consumer group, and identifier of the partition associated with the /// event processor instance, so that processing for a given partition can be properly initialized. /// The default implementation calls the and filters results by . /// It's recommended that this method is overriden in implementations to achieve an optimal performance. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs index a509ac3b62359..ab9d9447bd706 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs @@ -624,8 +624,8 @@ public virtual async Task SendAsync(IEnumerable eventBatch, var events = eventBatch switch { - IReadOnlyList eventList => eventList, - _ => eventBatch.ToList() + IReadOnlyList eventList => eventList, + _ => eventBatch.ToList() }; if (events.Count == 0) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingOptions.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingOptions.cs index b76564afd12cf..64fce10348956 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingOptions.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingOptions.cs @@ -31,7 +31,7 @@ internal class PartitionPublishingOptions /// /// /// - public long? ProducerGroupId { get; set; } + public long? ProducerGroupId { get; set; } /// /// The owner level indicates that a publishing is intended to be performed exclusively for events in the diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingProperties.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingProperties.cs index c19f4ea15be10..c4584957f738e 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingProperties.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingProperties.cs @@ -44,7 +44,7 @@ internal static PartitionPublishingProperties Empty /// The identifier of the producer group for which this producer is publishing to the associated partition. /// /// - public long? ProducerGroupId { get; } + public long? ProducerGroupId { get; } /// /// The owner level of the producer publishing to the associated partition. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingState.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingState.cs index 331471d8ee021..4455b436246be 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingState.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingState.cs @@ -41,7 +41,7 @@ internal class PartitionPublishingState /// The identifier of the producer group for which publishing is being performed. /// /// - public long? ProducerGroupId { get; set; } + public long? ProducerGroupId { get; set; } /// /// The owner level for which publishing is being performed. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs index 436873aced42f..57fcb88e564c9 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs @@ -2,8 +2,8 @@ // Licensed under the MIT License. using System; -using System.Collections.Generic; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Linq; using System.Net; using System.Reflection; @@ -1966,7 +1966,7 @@ public async Task RequestAuthorizationUsingCbsAsyncRespectsTheConnectionClosing( } catch { - // Ignore any other exception; the assertions will fail with better context. + // Ignore any other exception; the assertions will fail with better context. } Assert.That(observedException, Is.Not.Null, "An Event Hubs exception should have been thrown when requesting authorization."); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpProducerTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpProducerTests.cs index 69dbb63f8ff43..9742d4723d4c9 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpProducerTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpProducerTests.cs @@ -12,7 +12,6 @@ using Azure.Messaging.EventHubs.Producer; using Microsoft.Azure.Amqp; using Microsoft.Azure.Amqp.Framing; -using Microsoft.Identity.Client; using Moq; using Moq.Protected; using NUnit.Framework; diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Authorization/EventHubsSharedAccessKeyCredentialTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Authorization/EventHubsSharedAccessKeyCredentialTests.cs index 5da17e8517e92..bcaec3f71f90e 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Authorization/EventHubsSharedAccessKeyCredentialTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Authorization/EventHubsSharedAccessKeyCredentialTests.cs @@ -3,7 +3,6 @@ using System; using System.Reflection; -using Azure.Core; using Azure.Messaging.EventHubs.Authorization; using NUnit.Framework; diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Connection/EventHubConnectionLiveTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Connection/EventHubConnectionLiveTests.cs index cf764a957143a..0cd77df67a91a 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Connection/EventHubConnectionLiveTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Connection/EventHubConnectionLiveTests.cs @@ -9,7 +9,6 @@ using System.Threading; using System.Threading.Tasks; using Azure.Core; -using Azure.Identity; using Azure.Messaging.EventHubs.Authorization; using Azure.Messaging.EventHubs.Core; using NUnit.Framework; diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientLiveTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientLiveTests.cs index a7316d86ff8d2..ab5273c1fa2ad 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientLiveTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Consumer/EventHubConsumerClientLiveTests.cs @@ -52,7 +52,7 @@ public async Task ConsumerWithNoOptionsCanRead(EventHubsTransportType transportT await using (EventHubScope scope = await EventHubScope.CreateAsync(1)) { using var cancellationSource = new CancellationTokenSource(); - cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName); @@ -237,7 +237,7 @@ public async Task ConsumerCanReadBatchOfZeroLengthEvents() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -275,7 +275,7 @@ public async Task ConsumerCanReadBatchOfEvents() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -314,7 +314,7 @@ public async Task ConsumerCanReadBatchOfEventsWithCustomPrefetchAndBatchCounts() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -346,14 +346,14 @@ public async Task ConsumerCanReadBatchOfEventsWithCustomPrefetchAndBatchCountsAn // Read the events and validate the resulting state. - var readOptions = new ReadEventOptions { PrefetchCount = 150, CacheEventCount = 50, PrefetchSizeInBytes = 128 }; + var readOptions = new ReadEventOptions { PrefetchCount = 150, CacheEventCount = 50, PrefetchSizeInBytes = 128 }; var readState = await ReadEventsFromPartitionAsync(consumer, partition, sourceEvents.Count, cancellationSource.Token, readOptions: readOptions); Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled."); foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -392,7 +392,7 @@ public async Task ConsumerCanReadEventsWithPrefetchDisabled() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -439,7 +439,7 @@ public async Task ConsumerCanReadEventsWithCustomProperties() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -479,7 +479,7 @@ public async Task ConsumerCanReadEventsUsingAnIdentityCredential() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -519,7 +519,7 @@ public async Task ConsumerCanReadEventsUsingTheSharedKeyCredential() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -557,7 +557,7 @@ public async Task ConsumerCanReadFromEarliest() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -608,7 +608,7 @@ public async Task ConsumerCanReadFromLatest() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -667,7 +667,7 @@ public async Task ConsumerCanReadFromOffset(bool isInclusive) foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -726,7 +726,7 @@ public async Task ConsumerCanReadFromSequenceNumber(bool isInclusive) foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -780,7 +780,7 @@ public async Task ConsumerCanReadFromEnqueuedTime() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -826,10 +826,10 @@ public async Task ConsumerCanReadFromMultipleConsumerGroups() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState[0].Events.TryGetValue(sourceId, out var customReadEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed for the custom consumer group." ); + Assert.That(readState[0].Events.TryGetValue(sourceId, out var customReadEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed for the custom consumer group."); Assert.That(sourceEvent.IsEquivalentTo(customReadEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event for the custom consumer group."); - Assert.That(readState[1].Events.TryGetValue(sourceId, out var defaultReadEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed for the default consumer group." ); + Assert.That(readState[1].Events.TryGetValue(sourceId, out var defaultReadEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed for the default consumer group."); Assert.That(sourceEvent.IsEquivalentTo(defaultReadEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event for the default consumer group."); } } @@ -935,7 +935,7 @@ public async Task ConsumerCannotReadFromInvalidPartition() await using (EventHubScope scope = await EventHubScope.CreateAsync(1)) { using var cancellationSource = new CancellationTokenSource(); - cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); await using (var consumer = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupName, EventHubsTestEnvironment.Instance.EventHubsConnectionString, scope.EventHubName)) { @@ -961,7 +961,7 @@ public async Task ConsumerCannotReadFromInvalidConsumerGroup() await using (EventHubScope scope = await EventHubScope.CreateAsync(1)) { using var cancellationSource = new CancellationTokenSource(); - cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); var invalidConsumerGroup = "ThisIsFake"; @@ -1257,7 +1257,7 @@ public async Task ExclusiveConsumerSupercedesNonExclusiveActiveReader() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -1319,7 +1319,7 @@ public async Task ConsumerWithHigherOwnerLevelSupercedesActiveReader() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -1484,7 +1484,7 @@ public async Task ConsumerIsNotCompromisedByFailureToReadFromInvalidPartition() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -1559,10 +1559,10 @@ await Task.WhenAll foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(higherReadState.Events.TryGetValue(sourceId, out var higherEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed by the higher reader." ); + Assert.That(higherReadState.Events.TryGetValue(sourceId, out var higherEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed by the higher reader."); Assert.That(sourceEvent.IsEquivalentTo(higherEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event for the higher reader."); - Assert.That(lowerReadState.Events.TryGetValue(sourceId, out var lowerEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed by the lower reader." ); + Assert.That(lowerReadState.Events.TryGetValue(sourceId, out var lowerEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed by the lower reader."); Assert.That(sourceEvent.IsEquivalentTo(lowerEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event for the lower reader."); } } @@ -1833,7 +1833,7 @@ public async Task ConsumerCanReadFromAllPartitions() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -1874,7 +1874,7 @@ public async Task ConsumerCanReadFromAllPartitionsWithCustomPrefetchAndBatchCoun foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -1915,7 +1915,7 @@ public async Task ConsumerCanReadFromAllPartitionsUsingAnIdentityCredential() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -1956,7 +1956,7 @@ public async Task ConsumerCanReadFromAllPartitionsUsingTheSharedKeyCredential() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } @@ -2007,7 +2007,7 @@ public async Task ConsumerCanReadFromAllPartitionsStartingWithLatest() foreach (var sourceEvent in sourceEvents) { var sourceId = sourceEvent.Properties[EventGenerator.IdPropertyName].ToString(); - Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed." ); + Assert.That(readState.Events.TryGetValue(sourceId, out var readEvent), Is.True, $"The event with custom identifier [{ sourceId }] was not processed."); Assert.That(sourceEvent.IsEquivalentTo(readEvent.Data), $"The event with custom identifier [{ sourceId }] did not match the corresponding processed event."); } } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventDataTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventDataTests.cs index 7baf8dd98f94b..b97ff82190925 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventDataTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventDataTests.cs @@ -87,8 +87,8 @@ public void CloneProducesACopy() { var sourceEvent = new EventData( new byte[] { 0x21, 0x22 }, - new Dictionary { {"Test", 123 } }, - new Dictionary { { "System", "Hello" }}, + new Dictionary { { "Test", 123 } }, + new Dictionary { { "System", "Hello" } }, 33334444, 666777, DateTimeOffset.Parse("2015-10-27T00:00:00Z"), @@ -117,8 +117,8 @@ public void CloneIsolatesPropertyChanges() { var sourceEvent = new EventData( new byte[] { 0x21, 0x22 }, - new Dictionary { {"Test", 123 } }, - new Dictionary { { "System", "Hello" }}, + new Dictionary { { "Test", 123 } }, + new Dictionary { { "System", "Hello" } }, 33334444, 666777, DateTimeOffset.Parse("2015-10-27T00:00:00Z"), diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventHubsConnectionStringPropertiesTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventHubsConnectionStringPropertiesTests.cs index abf2f81074b65..b1db18c4b43f9 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventHubsConnectionStringPropertiesTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventHubsConnectionStringPropertiesTests.cs @@ -4,7 +4,6 @@ using System; using System.Collections.Generic; using System.Reflection; -using Azure.Messaging.EventHubs; using NUnit.Framework; using NUnit.Framework.Constraints; diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventHubsModelFactoryTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventHubsModelFactoryTests.cs index 7ef24491be147..18babe16d950a 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventHubsModelFactoryTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventHubsModelFactoryTests.cs @@ -148,8 +148,8 @@ public void PartitionContextDefaultsLastEnqueuedEventProperties() public void EventDataInitializesProperties() { var body = new BinaryData("Hello"); - var properties = new Dictionary {{ "id", 12 }}; - var systemProperties = new Dictionary {{ "custom", "sys-value" }}; + var properties = new Dictionary { { "id", 12 } }; + var systemProperties = new Dictionary { { "custom", "sys-value" } }; var sequenceNumber = long.MaxValue - 512; var offset = long.MaxValue - 1024; var enqueueTime = new DateTimeOffset(2015, 10, 27, 12, 0, 0, TimeSpan.Zero); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.MainProcessingLoop.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.MainProcessingLoop.cs index 64406bc975ffd..41a8c9e5ae900 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.MainProcessingLoop.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.MainProcessingLoop.cs @@ -3,7 +3,6 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Threading; using System.Threading.Tasks; using Azure.Core; @@ -736,10 +735,10 @@ public async Task BackgroundProcessingLogsHandlerErrorWhenPartitionProcessingSto .Callback(() => startCompletionSource.TrySetResult(true)) .CallBase(); - mockProcessor - .Protected() - .Setup("OnPartitionProcessingStoppedAsync", ItExpr.IsAny(), ItExpr.IsAny(), ItExpr.IsAny()) - .Throws(expectedException); + mockProcessor + .Protected() + .Setup("OnPartitionProcessingStoppedAsync", ItExpr.IsAny(), ItExpr.IsAny(), ItExpr.IsAny()) + .Throws(expectedException); await mockProcessor.Object.StartProcessingAsync(cancellationSource.Token); Assert.That(mockProcessor.Object.Status, Is.EqualTo(EventProcessorStatus.Running), "The processor should have started."); @@ -1308,9 +1307,9 @@ public async Task BackgroundProcessingDispatchesExceptionsWhenStartingToProcessC var mockConnection = new Mock(); var mockProcessor = new Mock>(65, "consumerGroup", "namespace", "eventHub", Mock.Of(), options, mockLoadBalancer.Object) { CallBase = true }; - mockLoadBalancer - .SetupGet(lb => lb.OwnedPartitionIds) - .Returns(ownedPartitions); + mockLoadBalancer + .SetupGet(lb => lb.OwnedPartitionIds) + .Returns(ownedPartitions); mockLoadBalancer .SetupSequence(lb => lb.RunLoadBalancingAsync(partitionIds, It.IsAny())) @@ -1670,8 +1669,8 @@ public async Task LoadBalancingWhenGreedyAppliesTheTimeoutAfterBalance() }) .Returns(() => { - completionSource.TrySetResult(true); - return new ValueTask(default(EventProcessorPartitionOwnership)); + completionSource.TrySetResult(true); + return new ValueTask(default(EventProcessorPartitionOwnership)); }); mockConnection @@ -1762,8 +1761,8 @@ public async Task LoadBalancingAppliesTheBalancedStrategy() }) .Returns(() => { - completionSource.TrySetResult(true); - return new ValueTask(default(EventProcessorPartitionOwnership)); + completionSource.TrySetResult(true); + return new ValueTask(default(EventProcessorPartitionOwnership)); }); mockConnection diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientOptionsTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientOptionsTests.cs index 3310c39bad7b9..fd8a585f80394 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientOptionsTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientOptionsTests.cs @@ -118,7 +118,7 @@ public void CreateFeatureFlagsDetectsIdempotentPublishing() public void GetPublishingOptionsOrDefaultForPartitionDefaultsWhenNoPartitionIsSpecified(string partitionId) { var options = new EventHubProducerClientOptions(); - options.PartitionOptions.Add("1", new PartitionPublishingOptions{ ProducerGroupId = 1 }); + options.PartitionOptions.Add("1", new PartitionPublishingOptions { ProducerGroupId = 1 }); Assert.That(options.GetPublishingOptionsOrDefaultForPartition(partitionId), Is.EqualTo(default(PartitionPublishingOptions))); } @@ -132,7 +132,7 @@ public void GetPublishingOptionsOrDefaultForPartitionDefaultsWhenNoPartitionIsSp public void GetPublishingOptionsOrDefaultForPartitionDefaultsWhenNoPartitionIsFound() { var options = new EventHubProducerClientOptions(); - options.PartitionOptions.Add("1", new PartitionPublishingOptions{ ProducerGroupId = 1 }); + options.PartitionOptions.Add("1", new PartitionPublishingOptions { ProducerGroupId = 1 }); Assert.That(options.GetPublishingOptionsOrDefaultForPartition("0"), Is.EqualTo(default(PartitionPublishingOptions))); } @@ -146,7 +146,7 @@ public void GetPublishingOptionsOrDefaultForPartitionDefaultsWhenNoPartitionIsFo public void GetPublishingOptionsOrDefaultForPartitionReturnsTheOptionsWhenThePartitionIsFound() { var partitionId = "12"; - var expectedPartitionOptions = new PartitionPublishingOptions{ ProducerGroupId = 1 }; + var expectedPartitionOptions = new PartitionPublishingOptions { ProducerGroupId = 1 }; var options = new EventHubProducerClientOptions(); options.PartitionOptions.Add(partitionId, expectedPartitionOptions); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs index 51ce9ac7125e3..ed54bea17ca08 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs @@ -52,7 +52,7 @@ public void ConstructorValidatesTheConnectionStringIsPopulated(string connection [TestCase("HostName=value.azure-devices.net;SharedAccessKeyName=[value];SharedAccessKey=[value];EntityPath=[value]")] public void ConstructorValidatesConnectionString(string connectionString) { - Assert.That(() =>new EventHubProducerClient(connectionString), Throws.ArgumentException.And.Message.StartsWith(Resources.MissingConnectionInformation)); + Assert.That(() => new EventHubProducerClient(connectionString), Throws.ArgumentException.And.Message.StartsWith(Resources.MissingConnectionInformation)); } /// @@ -406,9 +406,9 @@ public async Task ReadPartitionPublishingPropertiesAsyncInitializesPartitionStat clientOptions.PartitionOptions.Add(expectedPartition, new PartitionPublishingOptions { - ProducerGroupId = 999, - OwnerLevel = 999, - StartingSequenceNumber = 999 + ProducerGroupId = 999, + OwnerLevel = 999, + StartingSequenceNumber = 999 }); var producer = new EventHubProducerClient(connection, clientOptions); @@ -786,10 +786,10 @@ public void SendIdempotentDoesNotAllowResending() var events = EventGenerator.CreateEvents(5).Select(item => { - item.PendingPublishSequenceNumber = 5; - item.CommitPublishingState(); + item.PendingPublishSequenceNumber = 5; + item.CommitPublishingState(); - return item; + return item; }); var sendOptions = new SendEventOptions { PartitionId = "0" }; @@ -880,9 +880,9 @@ public async Task SendIdempotentInitializesPartitionState() clientOptions.PartitionOptions.Add(expectedPartition, new PartitionPublishingOptions { - ProducerGroupId = 999, - OwnerLevel = 999, - StartingSequenceNumber = 999 + ProducerGroupId = 999, + OwnerLevel = 999, + StartingSequenceNumber = 999 }); var producer = new EventHubProducerClient(connection, clientOptions); @@ -1295,7 +1295,8 @@ public void SendIdempotentRequiresThePartitionWithABatch() var batchOptions = new CreateBatchOptions { PartitionKey = "testKey" }; batch = new EventDataBatch(new MockTransportBatch(1), "ns", "eh", batchOptions); - Assert.That(async () => await producer.SendAsync(batch), Throws.InstanceOf(), "A partition key cannot be used with idempotent publishing.");; + Assert.That(async () => await producer.SendAsync(batch), Throws.InstanceOf(), "A partition key cannot be used with idempotent publishing."); + ; } /// @@ -1340,10 +1341,10 @@ public void SendIdempotentDoesNotAllowResendingWithABatchContainingPublishedEven var events = EventGenerator.CreateEvents(5).Skip(4).Select(item => { - item.PendingPublishSequenceNumber = 5; - item.CommitPublishingState(); + item.PendingPublishSequenceNumber = 5; + item.CommitPublishingState(); - return item; + return item; }); var batch = new EventDataBatch(new MockTransportBatch(), "ns", "eh", new CreateBatchOptions { PartitionId = "0" }); @@ -1434,9 +1435,9 @@ public async Task SendIdempotentInitializesPartitionStateWithABatch() clientOptions.PartitionOptions.Add(expectedPartition, new PartitionPublishingOptions { - ProducerGroupId = 999, - OwnerLevel = 999, - StartingSequenceNumber = 999 + ProducerGroupId = 999, + OwnerLevel = 999, + StartingSequenceNumber = 999 }); var producer = new EventHubProducerClient(connection, clientOptions); @@ -2749,7 +2750,7 @@ private class MockPooledProducer : TransportProducerPool.PooledProducer { public bool WasClosed { get; set; } = false; - public MockPooledProducer(TransportProducer transportProducer): base(transportProducer, (_) => Task.CompletedTask) + public MockPooledProducer(TransportProducer transportProducer) : base(transportProducer, (_) => Task.CompletedTask) { } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/IdempotentPublishingLiveTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/IdempotentPublishingLiveTests.cs index d32d8fa3e2cdf..14beba232074e 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/IdempotentPublishingLiveTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/IdempotentPublishingLiveTests.cs @@ -60,7 +60,7 @@ public async Task ProducerCanPublishEvents(EventHubsTransportType transportType) await using (EventHubScope scope = await EventHubScope.CreateAsync(1)) { var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName); - var options = new EventHubProducerClientOptions { EnableIdempotentPartitions = true, ConnectionOptions = new EventHubConnectionOptions { TransportType = transportType }}; + var options = new EventHubProducerClientOptions { EnableIdempotentPartitions = true, ConnectionOptions = new EventHubConnectionOptions { TransportType = transportType } }; await using var producer = new EventHubProducerClient(connectionString, options); @@ -88,7 +88,7 @@ public async Task ProducerCanPublishBatches(EventHubsTransportType transportType await using (EventHubScope scope = await EventHubScope.CreateAsync(1)) { var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName); - var options = new EventHubProducerClientOptions { EnableIdempotentPartitions = true, ConnectionOptions = new EventHubConnectionOptions { TransportType = transportType }}; + var options = new EventHubProducerClientOptions { EnableIdempotentPartitions = true, ConnectionOptions = new EventHubConnectionOptions { TransportType = transportType } }; await using var producer = new EventHubProducerClient(connectionString, options); @@ -193,7 +193,7 @@ public async Task ProducerManagesConcurrencyWhenPublishingEvents() var partition = (await producer.GetPartitionIdsAsync(cancellationSource.Token)).First(); var sendOptions = new SendEventOptions { PartitionId = partition }; - async Task sendEvents (int delayMilliseconds) + async Task sendEvents(int delayMilliseconds) { await Task.Delay(delayMilliseconds); await producer.SendAsync(EventGenerator.CreateEvents(2), sendOptions, cancellationSource.Token); @@ -230,7 +230,7 @@ public async Task ProducerManagesConcurrencyWhenPublishingBatches() var partition = (await producer.GetPartitionIdsAsync(cancellationSource.Token)).First(); var batchOptions = new CreateBatchOptions { PartitionId = partition }; - async Task sendBatch (int delayMilliseconds) + async Task sendBatch(int delayMilliseconds) { await Task.Delay(delayMilliseconds); @@ -462,7 +462,7 @@ public async Task ProducerCanInitializeWithPartitionOptions() cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); var partition = default(string); - var partitionProperties = default(PartitionPublishingProperties); + var partitionProperties = default(PartitionPublishingProperties); // Create a producer for a small scope that will Send some events and read the properties. @@ -505,7 +505,7 @@ public async Task ProducerCanInitializeWithPartialPartitionOptions() cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); var partition = default(string); - var partitionProperties = default(PartitionPublishingProperties); + var partitionProperties = default(PartitionPublishingProperties); // Create a producer for a small scope that will Send some events and read the properties. @@ -560,7 +560,7 @@ public async Task ProducerIsRejectedWithPartitionOptionsForInvalidState() cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); var partition = default(string); - var partitionProperties = default(PartitionPublishingProperties); + var partitionProperties = default(PartitionPublishingProperties); // Create a producer for a small scope that will Send some events and read the properties.