Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EventHubs] Track Two: Second Preview (Diagnostics Tests) #6913

Merged
merged 23 commits into from
Jul 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
714ba9e
Some test files included (a lot of changes expected). Single incomple…
kinelski Jul 12, 2019
d05b4c1
Using only LiveTests for now. Single test failing, but complete.
kinelski Jul 12, 2019
1d56125
All send live tests added. Diagnostics not working as expected
kinelski Jul 15, 2019
d8f76f4
PartitionSender tests not necessary. Injection issue needs to be fixed.
kinelski Jul 16, 2019
d6cdb86
Receive test included as well. Injection still not working.
kinelski Jul 16, 2019
3676acf
Receive test working. The order matters.
kinelski Jul 16, 2019
4378502
Injection issue somehow solved.
kinelski Jul 16, 2019
786d491
PartitionKey property in diagnostics may receive partition id
kinelski Jul 16, 2019
7ddce05
Merge remote-tracking branch 'upstream/master' into diag
kinelski Jul 16, 2019
d704760
Non-live diagnostics tests added
kinelski Jul 16, 2019
e422bc3
Extension class moved to new namespace
kinelski Jul 16, 2019
4c5c446
final comments
kinelski Jul 17, 2019
6fea5fa
Diagnostics contain Track Two Event Data.
kinelski Jul 17, 2019
dd6eb76
Reorder; more comments and small fix in diagnostics asserts.
kinelski Jul 17, 2019
40e7963
Small fix: isFaulted
kinelski Jul 17, 2019
b1da098
Final changes to increase coverage.
kinelski Jul 17, 2019
d0c5c61
Addressed some comments
kinelski Jul 17, 2019
357936e
Deleted diagnostics event data extension method related stuff
kinelski Jul 17, 2019
c968061
Moved fake listener to infrastructure
kinelski Jul 17, 2019
6c087b9
Added comments and made small changes.
kinelski Jul 18, 2019
8fef3d5
Wording
kinelski Jul 18, 2019
da27270
Merge conflict
kinelski Jul 18, 2019
fd74163
Final fixes.
kinelski Jul 18, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public async Task RunAsync(string connectionString,
// time that we've allowed.
//
// We will ask for just our event, but allow a fairly long wait period to ensure that we're able to receive it.
// If you observe the time that the call takes, it is extremely likely that the request to recieve will complete
// If you observe the time that the call takes, it is extremely likely that the request to receive will complete
kinelski marked this conversation as resolved.
Show resolved Hide resolved
// long before the maximum wait time.

Stopwatch watch = Stopwatch.StartNew();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace Azure.Messaging.EventHubs.Compatibility
internal static class TrackOneExceptionExtensions
{
/// <summary>
/// Maps a <see cref="TrackOne.EventHubsException" /> or a child of it to the equivilent
/// Maps a <see cref="TrackOne.EventHubsException" /> or a child of it to the equivalent
/// exception for the new API surface.
/// </summary>
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ internal struct ConnectionStringProperties
/// Initializes a new instance of the <see cref="ConnectionStringProperties"/> struct.
/// </summary>
///
/// <param name="endpoint">The endpoint of the EVent Hubs namespace.</param>
/// <param name="endpoint">The endpoint of the Event Hubs namespace.</param>
kinelski marked this conversation as resolved.
Show resolved Hide resolved
/// <param name="eventHubPath">The path to the specific Event Hub under the namespace.</param>
/// <param name="sharedAccessKeyName">The name of the shared access key, to use authorization.</param>
/// <param name="sharedAccessKey">The shared access key to use for authorization.</param>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ protected EventDataSender(EventHubClient eventHubClient, string partitionId)
public async Task SendAsync(IEnumerable<EventData> eventDatas, string partitionKey)
{
int count = ValidateEvents(eventDatas);
var activePartitionRouting = String.IsNullOrEmpty(partitionKey) ?
this.PartitionId :
partitionKey;

EventHubsEventSource.Log.EventSendStart(this.ClientId, count, partitionKey);
Activity activity = EventHubsDiagnosticSource.StartSendActivity(this.ClientId, this.EventHubClient.ConnectionStringBuilder, partitionKey, eventDatas, count);
Activity activity = EventHubsDiagnosticSource.StartSendActivity(this.ClientId, this.EventHubClient.ConnectionStringBuilder, activePartitionRouting, eventDatas, count);

Task sendTask = null;
try
Expand All @@ -41,13 +44,13 @@ public async Task SendAsync(IEnumerable<EventData> eventDatas, string partitionK
catch (Exception exception)
{
EventHubsEventSource.Log.EventSendException(this.ClientId, exception.ToString());
EventHubsDiagnosticSource.FailSendActivity(activity, this.EventHubClient.ConnectionStringBuilder, partitionKey, eventDatas, exception);
EventHubsDiagnosticSource.FailSendActivity(activity, this.EventHubClient.ConnectionStringBuilder, activePartitionRouting, eventDatas, exception);
throw;
}
finally
{
EventHubsEventSource.Log.EventSendStop(this.ClientId);
EventHubsDiagnosticSource.StopSendActivity(activity, this.EventHubClient.ConnectionStringBuilder, partitionKey, eventDatas, sendTask);
EventHubsDiagnosticSource.StopSendActivity(activity, this.EventHubClient.ConnectionStringBuilder, activePartitionRouting, eventDatas, sendTask);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace TrackOne
/// </summary>
internal static class EventHubsDiagnosticSource
{
public const string DiagnosticSourceName = "TrackOne";
public const string DiagnosticSourceName = "Azure.Messaging.EventHubs";

public const string ActivityIdPropertyName = "Diagnostic-Id";
public const string CorrelationContextPropertyName = "Correlation-Context";
Expand All @@ -32,7 +32,7 @@ internal static class EventHubsDiagnosticSource

internal static readonly DiagnosticListener DiagnosticListener = new DiagnosticListener(DiagnosticSourceName);

internal static Activity StartSendActivity(string clientId, EventHubsConnectionStringBuilder csb, string partitionKey, IEnumerable<EventData> eventDatas, int count)
internal static Activity StartSendActivity(string clientId, EventHubsConnectionStringBuilder csb, string activePartitionRouting, IEnumerable<EventData> eventDatas, int count)
{
// skip if diagnostic source not enabled
if (!DiagnosticListener.IsEnabled())
Expand All @@ -50,7 +50,7 @@ internal static Activity StartSendActivity(string clientId, EventHubsConnectionS

activity.AddTag("peer.hostname", csb.Endpoint.Host);
activity.AddTag("eh.event_hub_name", csb.EntityPath);
activity.AddTag("eh.partition_key", partitionKey);
activity.AddTag("eh.active_partition_routing", activePartitionRouting);
activity.AddTag("eh.event_count", count.ToString());
activity.AddTag("eh.client_id", clientId);

Expand All @@ -63,8 +63,8 @@ internal static Activity StartSendActivity(string clientId, EventHubsConnectionS
{
csb.Endpoint,
Entity = csb.EntityPath,
PartitionKey = partitionKey,
EventDatas = eventDatas
ActivePartitionRouting = activePartitionRouting,
EventDatas = eventDatas.Select(TransformEvent)
});
}
else
Expand All @@ -77,7 +77,7 @@ internal static Activity StartSendActivity(string clientId, EventHubsConnectionS
return activity;
}

internal static void FailSendActivity(Activity activity, EventHubsConnectionStringBuilder csb, string partitionKey, IEnumerable<EventData> eventDatas, Exception ex)
internal static void FailSendActivity(Activity activity, EventHubsConnectionStringBuilder csb, string activePartitionRouting, IEnumerable<EventData> eventDatas, Exception ex)
{
if (!DiagnosticListener.IsEnabled() || !DiagnosticListener.IsEnabled(SendActivityExceptionName))
{
Expand All @@ -89,13 +89,13 @@ internal static void FailSendActivity(Activity activity, EventHubsConnectionStri
{
csb.Endpoint,
Entity = csb.EntityPath,
PartitionKey = partitionKey,
EventDatas = eventDatas,
ActivePartitionRouting = activePartitionRouting,
EventDatas = eventDatas.Select(TransformEvent),
Exception = ex
});
}

internal static void StopSendActivity(Activity activity, EventHubsConnectionStringBuilder csb, string partitionKey, IEnumerable<EventData> eventDatas, Task sendTask)
internal static void StopSendActivity(Activity activity, EventHubsConnectionStringBuilder csb, string activePartitionRouting, IEnumerable<EventData> eventDatas, Task sendTask)
{
if (activity == null)
{
Expand All @@ -107,16 +107,16 @@ internal static void StopSendActivity(Activity activity, EventHubsConnectionStri
{
csb.Endpoint,
Entity = csb.EntityPath,
PartitionKey = partitionKey,
EventDatas = eventDatas,
ActivePartitionRouting = activePartitionRouting,
EventDatas = eventDatas.Select(TransformEvent),
sendTask?.Status
});
}

internal static Activity StartReceiveActivity(
string clientId,
EventHubsConnectionStringBuilder csb,
string partitionKey,
string activePartitionRouting,
string consumerGroup,
EventPosition eventPosition)
{
Expand All @@ -137,7 +137,7 @@ internal static Activity StartReceiveActivity(
// extract activity tags from input
activity.AddTag("peer.hostname", csb.Endpoint.Host);
activity.AddTag("eh.event_hub_name", csb.EntityPath);
activity.AddTag("eh.partition_key", partitionKey);
activity.AddTag("eh.active_partition_routing", activePartitionRouting);
activity.AddTag("eh.consumer_group", consumerGroup);
activity.AddTag("eh.start_offset", eventPosition.Offset);
activity.AddTag("eh.start_sequence_number", eventPosition.SequenceNumber?.ToString());
Expand All @@ -153,7 +153,7 @@ internal static Activity StartReceiveActivity(
{
Endpoint = csb.Endpoint,
Entity = csb.EntityPath,
PartitionKey = partitionKey,
ActivePartitionRouting = activePartitionRouting,
ConsumerGroup = consumerGroup
});
}
Expand All @@ -165,7 +165,7 @@ internal static Activity StartReceiveActivity(
return activity;
}

internal static void FailReceiveActivity(Activity activity, EventHubsConnectionStringBuilder csb, string partitionKey, string consumerGroup, Exception ex)
internal static void FailReceiveActivity(Activity activity, EventHubsConnectionStringBuilder csb, string activePartitionRouting, string consumerGroup, Exception ex)
{
// TODO consider enriching activity with data from exception
if (!DiagnosticListener.IsEnabled() || !DiagnosticListener.IsEnabled(ReceiveActivityExceptionName))
Expand All @@ -178,13 +178,13 @@ internal static void FailReceiveActivity(Activity activity, EventHubsConnectionS
{
csb.Endpoint,
Entity = csb.EntityPath,
PartitionKey = partitionKey,
ActivePartitionRouting = activePartitionRouting,
ConsumerGroup = consumerGroup,
Exception = ex
});
}

internal static void StopReceiveActivity(Activity activity, EventHubsConnectionStringBuilder csb, string partitionKey, string consumerGroup, IList<EventData> events, Task receiveTask)
internal static void StopReceiveActivity(Activity activity, EventHubsConnectionStringBuilder csb, string activePartitionRouting, string consumerGroup, IList<EventData> events, Task receiveTask)
{
if (activity == null)
{
Expand All @@ -199,9 +199,9 @@ internal static void StopReceiveActivity(Activity activity, EventHubsConnectionS
{
csb.Endpoint,
Entity = csb.EntityPath,
PartitionKey = partitionKey,
ActivePartitionRouting = activePartitionRouting,
ConsumerGroup = consumerGroup,
EventDatas = events,
EventDatas = events.Select(TransformEvent),
receiveTask?.Status
});
}
Expand Down Expand Up @@ -262,5 +262,11 @@ private static void SetRelatedOperations(Activity activity, IEnumerable<EventDat
}

#endregion Diagnostic Context Injection

private static Azure.Messaging.EventHubs.EventData TransformEvent(EventData eventData) =>
new Azure.Messaging.EventHubs.EventData(eventData.Body.ToArray())
{
Properties = eventData.Properties
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void IsEventPositionEquivalentDetectsDifferentOffsets()
var trackOnePosition = TrackOne.EventPosition.FromOffset("12", false);
var trackTwoPosition = EventPosition.FromOffset(12);

Assert.That(TrackOneComparer.IsEventPositionEquivalent(trackOnePosition, trackTwoPosition), Is.False, "The offset for track two is inclusive; even the same base offset with non-inclusive is not equivilent.");
Assert.That(TrackOneComparer.IsEventPositionEquivalent(trackOnePosition, trackTwoPosition), Is.False, "The offset for track two is inclusive; even the same base offset with non-inclusive is not equivalent.");
}

/// <summary>
Expand All @@ -156,7 +156,7 @@ public void IsEventPositionEquivalentRecognizesSameOffsets()
var trackOnePosition = TrackOne.EventPosition.FromOffset("12", true);
var trackTwoPosition = EventPosition.FromOffset(12);

Assert.That(TrackOneComparer.IsEventPositionEquivalent(trackOnePosition, trackTwoPosition), Is.True, "The offset for track two is inclusive; the equivilent offset set as inclusive should match.");
Assert.That(TrackOneComparer.IsEventPositionEquivalent(trackOnePosition, trackTwoPosition), Is.True, "The offset for track two is inclusive; the equivalent offset set as inclusive should match.");
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ public async Task SendAsyncTransformsSimpleEvents()
var sentEvents = mock.SendCalledWithParameters.Events.ToArray();
Assert.That(sentEvents.Length, Is.EqualTo(sourceEvents.Length), "The number of events sent should match the number of source events.");

// The events should not only be structurally equivilent, the ordering of them should be preserved. Compare the
// sequence in order and validate that the events in each position are equivilent.
// The events should not only be structurally equivalent, the ordering of them should be preserved. Compare the
// sequence in order and validate that the events in each position are equivalent.

for (var index = 0; index < sentEvents.Length; ++index)
{
Expand Down Expand Up @@ -154,8 +154,8 @@ public async Task SendAsyncTransformsComplexEvents()
var sentEvents = mock.SendCalledWithParameters.Events.ToArray();
Assert.That(sentEvents.Length, Is.EqualTo(sourceEvents.Length), "The number of events sent should match the number of source events.");

// The events should not only be structurally equivilent, the ordering of them should be preserved. Compare the
// sequence in order and validate that the events in each position are equivilent.
// The events should not only be structurally equivalent, the ordering of them should be preserved. Compare the
// sequence in order and validate that the events in each position are equivalent.

for (var index = 0; index < sentEvents.Length; ++index)
{
Expand Down
Loading