Skip to content

Commit

Permalink
[Event Hubs] Function bindings test tweaks (Azure#32166)
Browse files Browse the repository at this point in the history
* [Event Hubs] Function bindings test tweaks

The focus of these changes is to address some flakiness in the Functions
tests and to perform some minor refactoring.

* Address comments, fix failures

* Removing the check for event count; this measures the batch size read, not total count

* Emit multiple events

* FromEnd sends until cancelled

* Fixing spacing typo

* Remove batch size validation
  • Loading branch information
jsquire authored Oct 31, 2022
1 parent 6b0bab0 commit 6d37b50
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ public void SetUp()
EventHubTestMultipleDispatchJobs.MessagesCount = 0;
}

[TearDown]
public void TearDown()
{
_eventWait.Dispose();
_channel.Dispose();
}

private readonly JsonSerializerSettings jsonSettingThrowOnError = new JsonSerializerSettings
{
MissingMemberHandling = MissingMemberHandling.Error,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,25 @@ namespace Microsoft.Azure.WebJobs.Host.EndToEndTests
[LiveOnly(true)]
public class EventHubEndToEndTests : WebJobsEventHubTestBase
{
private static readonly TimeSpan NoEventReadTimeout = TimeSpan.FromSeconds(5);

private static EventWaitHandle _eventWait;
private static List<string> _results;
private static DateTimeOffset _initialOffsetEnqueuedTimeUTC;

/// <summary>
/// Performs the tasks needed to initialize the test fixture. This
/// method runs once for the entire fixture, prior to running any tests.
/// </summary>
///
[SetUp]
public void SetUp()
{
_results = new List<string>();
_eventWait = new ManualResetEvent(initialState: false);
}

[TearDown]
public void TearDown()
{
_eventWait.Dispose();
}

[Test]
public async Task EventHub_PocoBinding()
{
Expand Down Expand Up @@ -317,9 +320,7 @@ public async Task EventHub_PartitionKey()
var (jobHost, host) = BuildHost<EventHubPartitionKeyTestJobs>();
using (jobHost)
{
_eventWait = new ManualResetEvent(initialState: false);
await jobHost.CallAsync(nameof(EventHubPartitionKeyTestJobs.SendEvents_TestHub), new { input = "data" });

bool result = _eventWait.WaitOne(Timeout);

Assert.True(result);
Expand All @@ -329,7 +330,7 @@ public async Task EventHub_PartitionKey()
[Test]
public async Task EventHub_InitialOffsetFromStart()
{
var producer = new EventHubProducerClient(EventHubsTestEnvironment.Instance.EventHubsConnectionString, _eventHubScope.EventHubName);
await using var producer = new EventHubProducerClient(EventHubsTestEnvironment.Instance.EventHubsConnectionString, _eventHubScope.EventHubName);
await producer.SendAsync(new EventData[] { new EventData(new BinaryData("data")) });

var (jobHost, host) = BuildHost<EventHubTestInitialOffsetFromStartEndJobs>(
Expand All @@ -355,7 +356,7 @@ public async Task EventHub_InitialOffsetFromStart()
public async Task EventHub_InitialOffsetFromEnd()
{
// Send a message to ensure the stream is not empty as we are trying to validate that no messages are delivered in this case
var producer = new EventHubProducerClient(EventHubsTestEnvironment.Instance.EventHubsConnectionString, _eventHubScope.EventHubName);
await using var producer = new EventHubProducerClient(EventHubsTestEnvironment.Instance.EventHubsConnectionString, _eventHubScope.EventHubName);
await producer.SendAsync(new EventData[] { new EventData(new BinaryData("data")) });

var (jobHost, host) = BuildHost<EventHubTestInitialOffsetFromStartEndJobs>(
Expand All @@ -372,13 +373,29 @@ public async Task EventHub_InitialOffsetFromEnd()
});
using (jobHost)
{
// We don't expect to get signalled as there should be no messages received with a FromEnd initial offset
bool result = _eventWait.WaitOne(Timeout);
// We don't expect to get signaled as there should be no messages received with a FromEnd initial offset
bool result = _eventWait.WaitOne(NoEventReadTimeout);
Assert.False(result, "An event was received while none were expected.");

// send a new event which should be received
await producer.SendAsync(new EventData[] { new EventData(new BinaryData("data")) });
// send events which should be received. To ensure that the test is
// resilient to any errors where the link needs to be reestablished,
// continue sending events until cancellation takes place.

using var cts = new CancellationTokenSource();

var sendTask = Task.Run(async () =>
{
while (!cts.IsCancellationRequested)
{
await producer.SendAsync(new[] { new EventData("data") }, cts.Token).ConfigureAwait(false);
}
});

result = _eventWait.WaitOne(Timeout);

cts.Cancel();
try { await sendTask; } catch { /* Ignore, we're not testing sends */ }

Assert.True(result);
}
}
Expand All @@ -387,7 +404,8 @@ public async Task EventHub_InitialOffsetFromEnd()
public async Task EventHub_InitialOffsetFromEnqueuedTime()
{
await using var producer = new EventHubProducerClient(EventHubsTestEnvironment.Instance.EventHubsConnectionString, _eventHubScope.EventHubName);
for (int i = 0; i < 3; i++)

for (int i = 0; i < 5; i++)
{
// send one at a time so they will have slightly different enqueued times
await producer.SendAsync(new EventData[] { new EventData(new BinaryData("data")) });
Expand All @@ -398,9 +416,7 @@ public async Task EventHub_InitialOffsetFromEnqueuedTime()
EventHubsTestEnvironment.Instance.EventHubsConnectionString,
_eventHubScope.EventHubName);

var events = consumer.ReadEventsAsync();
_initialOffsetEnqueuedTimeUTC = DateTime.UtcNow;
await foreach (PartitionEvent evt in events)
await foreach (PartitionEvent evt in consumer.ReadEventsAsync())
{
// use the timestamp from the first event for our FromEnqueuedTime
_initialOffsetEnqueuedTimeUTC = evt.Data.EnqueuedTime;
Expand All @@ -416,9 +432,9 @@ public async Task EventHub_InitialOffsetFromEnqueuedTime()
services.Configure<EventHubOptions>(options =>
{
options.InitialOffsetOptions.Type = OffsetType.FromEnqueuedTime;

// Reads from enqueue time are non-inclusive. To ensure that we start with the desired event, set the time slightly in the past.
var dto = DateTimeOffset.Parse(_initialOffsetEnqueuedTimeUTC.AddMilliseconds(-150).ToString("yyyy-MM-ddTHH:mm:ssZ"));
options.InitialOffsetOptions.EnqueuedTimeUtc = dto;
options.InitialOffsetOptions.EnqueuedTimeUtc = _initialOffsetEnqueuedTimeUTC.Subtract(TimeSpan.FromMilliseconds(250));
});
});
ConfigureTestEventHub(builder);
Expand Down Expand Up @@ -450,8 +466,6 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection =
PartitionContext partitionContext,
TriggerPartitionContext triggerPartitionContext)
{
Assert.True((DateTime.Now - enqueuedTimeUtc).TotalSeconds < 30);

Assert.AreEqual("value1", properties["TestProp1"]);
Assert.AreEqual("value2", properties["TestProp2"]);

Expand Down Expand Up @@ -509,7 +523,6 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection =
string partitionKey, DateTime enqueuedTimeUtc, IDictionary<string, object> properties,
IDictionary<string, object> systemProperties)
{
Assert.True((DateTime.Now - enqueuedTimeUtc).TotalSeconds < 30);
Assert.AreEqual("data", evt.ToString());
_eventWait.Set();
}
Expand Down Expand Up @@ -631,7 +644,7 @@ public class EventHubPartitionKeyTestJobs
// so that we get coverage for receiving events from the same partition in multiple chunks
private const int EventsPerPartition = 15;
private const int PartitionCount = 5;
private const int TotalEventsCount = EventsPerPartition * PartitionCount;
private const string PartitionKeyPrefix = "test_pk";

public static async Task SendEvents_TestHub(
string input,
Expand All @@ -647,7 +660,7 @@ public static async Task SendEvents_TestHub(
for (int i = 0; i < PartitionCount; i++)
{
evt = new EventData(Encoding.UTF8.GetBytes(input));
tasks.Add(client.SendAsync(Enumerable.Repeat(evt, EventsPerPartition), new SendEventOptions() { PartitionKey = "test_pk" + i }));
tasks.Add(client.SendAsync(Enumerable.Repeat(evt, EventsPerPartition), new SendEventOptions() { PartitionKey = PartitionKeyPrefix + i }));
}

await Task.WhenAll(tasks);
Expand All @@ -658,15 +671,17 @@ public static void ProcessMultiplePartitionEvents([EventHubTrigger(TestHubName,
foreach (EventData eventData in events)
{
string message = Encoding.UTF8.GetString(eventData.Body.ToArray());

_results.Add(eventData.PartitionKey);
_results.Sort();

// count is 1 more because we sent an event without PK
if (_results.Count == TotalEventsCount + 1 && _results[TotalEventsCount] == "test_pk4")
{
_eventWait.Set();
}
Assert.True(eventData.PartitionKey.StartsWith(PartitionKeyPrefix));
}

// The size of the batch read may not contain all events sent. If any
// were read, the format of the partition key was read and verified.

if (_results.Count > 0)
{
_eventWait.Set();
}
}
}
Expand All @@ -682,8 +697,6 @@ public static void SendEvent_TestHub(string input, [EventHub(TestHubName, Connec

public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection = "TestConnection")] string evt, DateTime enqueuedTimeUtc, IDictionary<string, object> properties)
{
Assert.True((DateTime.Now - enqueuedTimeUtc).TotalSeconds < 30);

Assert.AreEqual("value1", properties["TestProp1"]);
Assert.AreEqual("value2", properties["TestProp2"]);

Expand Down Expand Up @@ -712,18 +725,20 @@ public class EventHubTestInitialOffsetFromEnqueuedTimeJobs

public static void ProcessMultipleEvents([EventHubTrigger(TestHubName, Connection = TestHubName)] EventData[] events)
{
Assert.LessOrEqual(events.Length, ExpectedEventsCount);
// there's potentially some level of rewind due to clock differences; allow a small delta when validating.
var earliestAllowedOffset = _initialOffsetEnqueuedTimeUTC.Subtract(TimeSpan.FromMilliseconds(500));

foreach (EventData eventData in events)
{
string message = Encoding.UTF8.GetString(eventData.Body.ToArray());

_results.Add(eventData.EnqueuedTime.ToString("MM/dd/yyyy hh:mm:ss.fff tt"));

if (_results.Count == ExpectedEventsCount)
if (_results.Count >= ExpectedEventsCount)
{
foreach (var result in _results)
{
Assert.GreaterOrEqual(DateTimeOffset.Parse(result), _initialOffsetEnqueuedTimeUTC);
Assert.GreaterOrEqual(DateTimeOffset.Parse(result), earliestAllowedOffset);
}
_eventWait.Set();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,23 @@ public class WebJobsEventHubTestBase
protected EventHubScope _eventHubScope;

/// <summary>
/// Performs the tasks needed to initialize the test fixture. This
/// method runs once for the entire fixture, prior to running any tests.
/// Performs the tasks needed to initialize each test. This
/// method runs once for the each test prior to running it.
/// </summary>
///
[SetUp]
public async Task FixtureSetUp()
public async Task BaseSetUp()
{
_eventHubScope = await EventHubScope.CreateAsync(2);
}

/// <summary>
/// Performs the tasks needed to cleanup the test fixture after all
/// tests have run. This method runs once for the entire fixture.
/// Performs the tasks needed to cleanup tests after it has run. This
/// method runs once for each test.
/// </summary>
///
[TearDown]
public async Task FixtureTearDown()
public async Task BaseTearDown()
{
await _eventHubScope.DisposeAsync();
}
Expand Down

0 comments on commit 6d37b50

Please sign in to comment.