diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubApplicationInsightsTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubApplicationInsightsTests.cs index 6a4f3a8af30dc..6439da1a35960 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubApplicationInsightsTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubApplicationInsightsTests.cs @@ -49,6 +49,13 @@ public void SetUp() EventHubTestMultipleDispatchJobs.MessagesCount = 0; } + [TearDown] + public void TearDown() + { + _eventWait.Dispose(); + _channel.Dispose(); + } + private readonly JsonSerializerSettings jsonSettingThrowOnError = new JsonSerializerSettings { MissingMemberHandling = MissingMemberHandling.Error, diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs index 95b59c3bb7d98..883ebe4b1ac6d 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs @@ -27,15 +27,12 @@ namespace Microsoft.Azure.WebJobs.Host.EndToEndTests [LiveOnly(true)] public class EventHubEndToEndTests : WebJobsEventHubTestBase { + private static readonly TimeSpan NoEventReadTimeout = TimeSpan.FromSeconds(5); + private static EventWaitHandle _eventWait; private static List _results; private static DateTimeOffset _initialOffsetEnqueuedTimeUTC; - /// - /// Performs the tasks needed to initialize the test fixture. This - /// method runs once for the entire fixture, prior to running any tests. - /// - /// [SetUp] public void SetUp() { @@ -43,6 +40,12 @@ public void SetUp() _eventWait = new ManualResetEvent(initialState: false); } + [TearDown] + public void TearDown() + { + _eventWait.Dispose(); + } + [Test] public async Task EventHub_PocoBinding() { @@ -317,9 +320,7 @@ public async Task EventHub_PartitionKey() var (jobHost, host) = BuildHost(); using (jobHost) { - _eventWait = new ManualResetEvent(initialState: false); await jobHost.CallAsync(nameof(EventHubPartitionKeyTestJobs.SendEvents_TestHub), new { input = "data" }); - bool result = _eventWait.WaitOne(Timeout); Assert.True(result); @@ -329,7 +330,7 @@ public async Task EventHub_PartitionKey() [Test] public async Task EventHub_InitialOffsetFromStart() { - var producer = new EventHubProducerClient(EventHubsTestEnvironment.Instance.EventHubsConnectionString, _eventHubScope.EventHubName); + await using var producer = new EventHubProducerClient(EventHubsTestEnvironment.Instance.EventHubsConnectionString, _eventHubScope.EventHubName); await producer.SendAsync(new EventData[] { new EventData(new BinaryData("data")) }); var (jobHost, host) = BuildHost( @@ -355,7 +356,7 @@ public async Task EventHub_InitialOffsetFromStart() public async Task EventHub_InitialOffsetFromEnd() { // Send a message to ensure the stream is not empty as we are trying to validate that no messages are delivered in this case - var producer = new EventHubProducerClient(EventHubsTestEnvironment.Instance.EventHubsConnectionString, _eventHubScope.EventHubName); + await using var producer = new EventHubProducerClient(EventHubsTestEnvironment.Instance.EventHubsConnectionString, _eventHubScope.EventHubName); await producer.SendAsync(new EventData[] { new EventData(new BinaryData("data")) }); var (jobHost, host) = BuildHost( @@ -372,13 +373,29 @@ public async Task EventHub_InitialOffsetFromEnd() }); using (jobHost) { - // We don't expect to get signalled as there should be no messages received with a FromEnd initial offset - bool result = _eventWait.WaitOne(Timeout); + // We don't expect to get signaled as there should be no messages received with a FromEnd initial offset + bool result = _eventWait.WaitOne(NoEventReadTimeout); Assert.False(result, "An event was received while none were expected."); - // send a new event which should be received - await producer.SendAsync(new EventData[] { new EventData(new BinaryData("data")) }); + // send events which should be received. To ensure that the test is + // resilient to any errors where the link needs to be reestablished, + // continue sending events until cancellation takes place. + + using var cts = new CancellationTokenSource(); + + var sendTask = Task.Run(async () => + { + while (!cts.IsCancellationRequested) + { + await producer.SendAsync(new[] { new EventData("data") }, cts.Token).ConfigureAwait(false); + } + }); + result = _eventWait.WaitOne(Timeout); + + cts.Cancel(); + try { await sendTask; } catch { /* Ignore, we're not testing sends */ } + Assert.True(result); } } @@ -387,7 +404,8 @@ public async Task EventHub_InitialOffsetFromEnd() public async Task EventHub_InitialOffsetFromEnqueuedTime() { await using var producer = new EventHubProducerClient(EventHubsTestEnvironment.Instance.EventHubsConnectionString, _eventHubScope.EventHubName); - for (int i = 0; i < 3; i++) + + for (int i = 0; i < 5; i++) { // send one at a time so they will have slightly different enqueued times await producer.SendAsync(new EventData[] { new EventData(new BinaryData("data")) }); @@ -398,9 +416,7 @@ public async Task EventHub_InitialOffsetFromEnqueuedTime() EventHubsTestEnvironment.Instance.EventHubsConnectionString, _eventHubScope.EventHubName); - var events = consumer.ReadEventsAsync(); - _initialOffsetEnqueuedTimeUTC = DateTime.UtcNow; - await foreach (PartitionEvent evt in events) + await foreach (PartitionEvent evt in consumer.ReadEventsAsync()) { // use the timestamp from the first event for our FromEnqueuedTime _initialOffsetEnqueuedTimeUTC = evt.Data.EnqueuedTime; @@ -416,9 +432,9 @@ public async Task EventHub_InitialOffsetFromEnqueuedTime() services.Configure(options => { options.InitialOffsetOptions.Type = OffsetType.FromEnqueuedTime; + // Reads from enqueue time are non-inclusive. To ensure that we start with the desired event, set the time slightly in the past. - var dto = DateTimeOffset.Parse(_initialOffsetEnqueuedTimeUTC.AddMilliseconds(-150).ToString("yyyy-MM-ddTHH:mm:ssZ")); - options.InitialOffsetOptions.EnqueuedTimeUtc = dto; + options.InitialOffsetOptions.EnqueuedTimeUtc = _initialOffsetEnqueuedTimeUTC.Subtract(TimeSpan.FromMilliseconds(250)); }); }); ConfigureTestEventHub(builder); @@ -450,8 +466,6 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection = PartitionContext partitionContext, TriggerPartitionContext triggerPartitionContext) { - Assert.True((DateTime.Now - enqueuedTimeUtc).TotalSeconds < 30); - Assert.AreEqual("value1", properties["TestProp1"]); Assert.AreEqual("value2", properties["TestProp2"]); @@ -509,7 +523,6 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection = string partitionKey, DateTime enqueuedTimeUtc, IDictionary properties, IDictionary systemProperties) { - Assert.True((DateTime.Now - enqueuedTimeUtc).TotalSeconds < 30); Assert.AreEqual("data", evt.ToString()); _eventWait.Set(); } @@ -631,7 +644,7 @@ public class EventHubPartitionKeyTestJobs // so that we get coverage for receiving events from the same partition in multiple chunks private const int EventsPerPartition = 15; private const int PartitionCount = 5; - private const int TotalEventsCount = EventsPerPartition * PartitionCount; + private const string PartitionKeyPrefix = "test_pk"; public static async Task SendEvents_TestHub( string input, @@ -647,7 +660,7 @@ public static async Task SendEvents_TestHub( for (int i = 0; i < PartitionCount; i++) { evt = new EventData(Encoding.UTF8.GetBytes(input)); - tasks.Add(client.SendAsync(Enumerable.Repeat(evt, EventsPerPartition), new SendEventOptions() { PartitionKey = "test_pk" + i })); + tasks.Add(client.SendAsync(Enumerable.Repeat(evt, EventsPerPartition), new SendEventOptions() { PartitionKey = PartitionKeyPrefix + i })); } await Task.WhenAll(tasks); @@ -658,15 +671,17 @@ public static void ProcessMultiplePartitionEvents([EventHubTrigger(TestHubName, foreach (EventData eventData in events) { string message = Encoding.UTF8.GetString(eventData.Body.ToArray()); - _results.Add(eventData.PartitionKey); - _results.Sort(); - // count is 1 more because we sent an event without PK - if (_results.Count == TotalEventsCount + 1 && _results[TotalEventsCount] == "test_pk4") - { - _eventWait.Set(); - } + Assert.True(eventData.PartitionKey.StartsWith(PartitionKeyPrefix)); + } + + // The size of the batch read may not contain all events sent. If any + // were read, the format of the partition key was read and verified. + + if (_results.Count > 0) + { + _eventWait.Set(); } } } @@ -682,8 +697,6 @@ public static void SendEvent_TestHub(string input, [EventHub(TestHubName, Connec public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection = "TestConnection")] string evt, DateTime enqueuedTimeUtc, IDictionary properties) { - Assert.True((DateTime.Now - enqueuedTimeUtc).TotalSeconds < 30); - Assert.AreEqual("value1", properties["TestProp1"]); Assert.AreEqual("value2", properties["TestProp2"]); @@ -712,18 +725,20 @@ public class EventHubTestInitialOffsetFromEnqueuedTimeJobs public static void ProcessMultipleEvents([EventHubTrigger(TestHubName, Connection = TestHubName)] EventData[] events) { - Assert.LessOrEqual(events.Length, ExpectedEventsCount); + // there's potentially some level of rewind due to clock differences; allow a small delta when validating. + var earliestAllowedOffset = _initialOffsetEnqueuedTimeUTC.Subtract(TimeSpan.FromMilliseconds(500)); + foreach (EventData eventData in events) { string message = Encoding.UTF8.GetString(eventData.Body.ToArray()); _results.Add(eventData.EnqueuedTime.ToString("MM/dd/yyyy hh:mm:ss.fff tt")); - if (_results.Count == ExpectedEventsCount) + if (_results.Count >= ExpectedEventsCount) { foreach (var result in _results) { - Assert.GreaterOrEqual(DateTimeOffset.Parse(result), _initialOffsetEnqueuedTimeUTC); + Assert.GreaterOrEqual(DateTimeOffset.Parse(result), earliestAllowedOffset); } _eventWait.Set(); } diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/WebJobsEventHubTestBase.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/WebJobsEventHubTestBase.cs index bfad93b538c16..449b94883bc6c 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/WebJobsEventHubTestBase.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/WebJobsEventHubTestBase.cs @@ -22,23 +22,23 @@ public class WebJobsEventHubTestBase protected EventHubScope _eventHubScope; /// - /// Performs the tasks needed to initialize the test fixture. This - /// method runs once for the entire fixture, prior to running any tests. + /// Performs the tasks needed to initialize each test. This + /// method runs once for the each test prior to running it. /// /// [SetUp] - public async Task FixtureSetUp() + public async Task BaseSetUp() { _eventHubScope = await EventHubScope.CreateAsync(2); } /// - /// Performs the tasks needed to cleanup the test fixture after all - /// tests have run. This method runs once for the entire fixture. + /// Performs the tasks needed to cleanup tests after it has run. This + /// method runs once for each test. /// /// [TearDown] - public async Task FixtureTearDown() + public async Task BaseTearDown() { await _eventHubScope.DisposeAsync(); }