Skip to content

Commit

Permalink
[EventHubs] Track Two: Second Preview (Diagnostics Tests) (Azure#6913)
Browse files Browse the repository at this point in the history
  • Loading branch information
kinelski authored Jul 18, 2019
1 parent ddc7ec5 commit 4280c4b
Show file tree
Hide file tree
Showing 14 changed files with 932 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public async Task RunAsync(string connectionString,
// time that we've allowed.
//
// We will ask for just our event, but allow a fairly long wait period to ensure that we're able to receive it.
// If you observe the time that the call takes, it is extremely likely that the request to recieve will complete
// If you observe the time that the call takes, it is extremely likely that the request to receive will complete
// long before the maximum wait time.

Stopwatch watch = Stopwatch.StartNew();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace Azure.Messaging.EventHubs.Compatibility
internal static class TrackOneExceptionExtensions
{
/// <summary>
/// Maps a <see cref="TrackOne.EventHubsException" /> or a child of it to the equivilent
/// Maps a <see cref="TrackOne.EventHubsException" /> or a child of it to the equivalent
/// exception for the new API surface.
/// </summary>
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ internal struct ConnectionStringProperties
/// Initializes a new instance of the <see cref="ConnectionStringProperties"/> struct.
/// </summary>
///
/// <param name="endpoint">The endpoint of the EVent Hubs namespace.</param>
/// <param name="endpoint">The endpoint of the Event Hubs namespace.</param>
/// <param name="eventHubPath">The path to the specific Event Hub under the namespace.</param>
/// <param name="sharedAccessKeyName">The name of the shared access key, to use authorization.</param>
/// <param name="sharedAccessKey">The shared access key to use for authorization.</param>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ protected EventDataSender(EventHubClient eventHubClient, string partitionId)
public async Task SendAsync(IEnumerable<EventData> eventDatas, string partitionKey)
{
int count = ValidateEvents(eventDatas);
var activePartitionRouting = String.IsNullOrEmpty(partitionKey) ?
this.PartitionId :
partitionKey;

EventHubsEventSource.Log.EventSendStart(this.ClientId, count, partitionKey);
Activity activity = EventHubsDiagnosticSource.StartSendActivity(this.ClientId, this.EventHubClient.ConnectionStringBuilder, partitionKey, eventDatas, count);
Activity activity = EventHubsDiagnosticSource.StartSendActivity(this.ClientId, this.EventHubClient.ConnectionStringBuilder, activePartitionRouting, eventDatas, count);

Task sendTask = null;
try
Expand All @@ -41,13 +44,13 @@ public async Task SendAsync(IEnumerable<EventData> eventDatas, string partitionK
catch (Exception exception)
{
EventHubsEventSource.Log.EventSendException(this.ClientId, exception.ToString());
EventHubsDiagnosticSource.FailSendActivity(activity, this.EventHubClient.ConnectionStringBuilder, partitionKey, eventDatas, exception);
EventHubsDiagnosticSource.FailSendActivity(activity, this.EventHubClient.ConnectionStringBuilder, activePartitionRouting, eventDatas, exception);
throw;
}
finally
{
EventHubsEventSource.Log.EventSendStop(this.ClientId);
EventHubsDiagnosticSource.StopSendActivity(activity, this.EventHubClient.ConnectionStringBuilder, partitionKey, eventDatas, sendTask);
EventHubsDiagnosticSource.StopSendActivity(activity, this.EventHubClient.ConnectionStringBuilder, activePartitionRouting, eventDatas, sendTask);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace TrackOne
/// </summary>
internal static class EventHubsDiagnosticSource
{
public const string DiagnosticSourceName = "TrackOne";
public const string DiagnosticSourceName = "Azure.Messaging.EventHubs";

public const string ActivityIdPropertyName = "Diagnostic-Id";
public const string CorrelationContextPropertyName = "Correlation-Context";
Expand All @@ -32,7 +32,7 @@ internal static class EventHubsDiagnosticSource

internal static readonly DiagnosticListener DiagnosticListener = new DiagnosticListener(DiagnosticSourceName);

internal static Activity StartSendActivity(string clientId, EventHubsConnectionStringBuilder csb, string partitionKey, IEnumerable<EventData> eventDatas, int count)
internal static Activity StartSendActivity(string clientId, EventHubsConnectionStringBuilder csb, string activePartitionRouting, IEnumerable<EventData> eventDatas, int count)
{
// skip if diagnostic source not enabled
if (!DiagnosticListener.IsEnabled())
Expand All @@ -50,7 +50,7 @@ internal static Activity StartSendActivity(string clientId, EventHubsConnectionS

activity.AddTag("peer.hostname", csb.Endpoint.Host);
activity.AddTag("eh.event_hub_name", csb.EntityPath);
activity.AddTag("eh.partition_key", partitionKey);
activity.AddTag("eh.active_partition_routing", activePartitionRouting);
activity.AddTag("eh.event_count", count.ToString());
activity.AddTag("eh.client_id", clientId);

Expand All @@ -63,8 +63,8 @@ internal static Activity StartSendActivity(string clientId, EventHubsConnectionS
{
csb.Endpoint,
Entity = csb.EntityPath,
PartitionKey = partitionKey,
EventDatas = eventDatas
ActivePartitionRouting = activePartitionRouting,
EventDatas = eventDatas.Select(TransformEvent)
});
}
else
Expand All @@ -77,7 +77,7 @@ internal static Activity StartSendActivity(string clientId, EventHubsConnectionS
return activity;
}

internal static void FailSendActivity(Activity activity, EventHubsConnectionStringBuilder csb, string partitionKey, IEnumerable<EventData> eventDatas, Exception ex)
internal static void FailSendActivity(Activity activity, EventHubsConnectionStringBuilder csb, string activePartitionRouting, IEnumerable<EventData> eventDatas, Exception ex)
{
if (!DiagnosticListener.IsEnabled() || !DiagnosticListener.IsEnabled(SendActivityExceptionName))
{
Expand All @@ -89,13 +89,13 @@ internal static void FailSendActivity(Activity activity, EventHubsConnectionStri
{
csb.Endpoint,
Entity = csb.EntityPath,
PartitionKey = partitionKey,
EventDatas = eventDatas,
ActivePartitionRouting = activePartitionRouting,
EventDatas = eventDatas.Select(TransformEvent),
Exception = ex
});
}

internal static void StopSendActivity(Activity activity, EventHubsConnectionStringBuilder csb, string partitionKey, IEnumerable<EventData> eventDatas, Task sendTask)
internal static void StopSendActivity(Activity activity, EventHubsConnectionStringBuilder csb, string activePartitionRouting, IEnumerable<EventData> eventDatas, Task sendTask)
{
if (activity == null)
{
Expand All @@ -107,16 +107,16 @@ internal static void StopSendActivity(Activity activity, EventHubsConnectionStri
{
csb.Endpoint,
Entity = csb.EntityPath,
PartitionKey = partitionKey,
EventDatas = eventDatas,
ActivePartitionRouting = activePartitionRouting,
EventDatas = eventDatas.Select(TransformEvent),
sendTask?.Status
});
}

internal static Activity StartReceiveActivity(
string clientId,
EventHubsConnectionStringBuilder csb,
string partitionKey,
string activePartitionRouting,
string consumerGroup,
EventPosition eventPosition)
{
Expand All @@ -137,7 +137,7 @@ internal static Activity StartReceiveActivity(
// extract activity tags from input
activity.AddTag("peer.hostname", csb.Endpoint.Host);
activity.AddTag("eh.event_hub_name", csb.EntityPath);
activity.AddTag("eh.partition_key", partitionKey);
activity.AddTag("eh.active_partition_routing", activePartitionRouting);
activity.AddTag("eh.consumer_group", consumerGroup);
activity.AddTag("eh.start_offset", eventPosition.Offset);
activity.AddTag("eh.start_sequence_number", eventPosition.SequenceNumber?.ToString());
Expand All @@ -153,7 +153,7 @@ internal static Activity StartReceiveActivity(
{
Endpoint = csb.Endpoint,
Entity = csb.EntityPath,
PartitionKey = partitionKey,
ActivePartitionRouting = activePartitionRouting,
ConsumerGroup = consumerGroup
});
}
Expand All @@ -165,7 +165,7 @@ internal static Activity StartReceiveActivity(
return activity;
}

internal static void FailReceiveActivity(Activity activity, EventHubsConnectionStringBuilder csb, string partitionKey, string consumerGroup, Exception ex)
internal static void FailReceiveActivity(Activity activity, EventHubsConnectionStringBuilder csb, string activePartitionRouting, string consumerGroup, Exception ex)
{
// TODO consider enriching activity with data from exception
if (!DiagnosticListener.IsEnabled() || !DiagnosticListener.IsEnabled(ReceiveActivityExceptionName))
Expand All @@ -178,13 +178,13 @@ internal static void FailReceiveActivity(Activity activity, EventHubsConnectionS
{
csb.Endpoint,
Entity = csb.EntityPath,
PartitionKey = partitionKey,
ActivePartitionRouting = activePartitionRouting,
ConsumerGroup = consumerGroup,
Exception = ex
});
}

internal static void StopReceiveActivity(Activity activity, EventHubsConnectionStringBuilder csb, string partitionKey, string consumerGroup, IList<EventData> events, Task receiveTask)
internal static void StopReceiveActivity(Activity activity, EventHubsConnectionStringBuilder csb, string activePartitionRouting, string consumerGroup, IList<EventData> events, Task receiveTask)
{
if (activity == null)
{
Expand All @@ -199,9 +199,9 @@ internal static void StopReceiveActivity(Activity activity, EventHubsConnectionS
{
csb.Endpoint,
Entity = csb.EntityPath,
PartitionKey = partitionKey,
ActivePartitionRouting = activePartitionRouting,
ConsumerGroup = consumerGroup,
EventDatas = events,
EventDatas = events.Select(TransformEvent),
receiveTask?.Status
});
}
Expand Down Expand Up @@ -262,5 +262,11 @@ private static void SetRelatedOperations(Activity activity, IEnumerable<EventDat
}

#endregion Diagnostic Context Injection

private static Azure.Messaging.EventHubs.EventData TransformEvent(EventData eventData) =>
new Azure.Messaging.EventHubs.EventData(eventData.Body.ToArray())
{
Properties = eventData.Properties
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void IsEventPositionEquivalentDetectsDifferentOffsets()
var trackOnePosition = TrackOne.EventPosition.FromOffset("12", false);
var trackTwoPosition = EventPosition.FromOffset(12);

Assert.That(TrackOneComparer.IsEventPositionEquivalent(trackOnePosition, trackTwoPosition), Is.False, "The offset for track two is inclusive; even the same base offset with non-inclusive is not equivilent.");
Assert.That(TrackOneComparer.IsEventPositionEquivalent(trackOnePosition, trackTwoPosition), Is.False, "The offset for track two is inclusive; even the same base offset with non-inclusive is not equivalent.");
}

/// <summary>
Expand All @@ -156,7 +156,7 @@ public void IsEventPositionEquivalentRecognizesSameOffsets()
var trackOnePosition = TrackOne.EventPosition.FromOffset("12", true);
var trackTwoPosition = EventPosition.FromOffset(12);

Assert.That(TrackOneComparer.IsEventPositionEquivalent(trackOnePosition, trackTwoPosition), Is.True, "The offset for track two is inclusive; the equivilent offset set as inclusive should match.");
Assert.That(TrackOneComparer.IsEventPositionEquivalent(trackOnePosition, trackTwoPosition), Is.True, "The offset for track two is inclusive; the equivalent offset set as inclusive should match.");
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ public async Task SendAsyncTransformsSimpleEvents()
var sentEvents = mock.SendCalledWithParameters.Events.ToArray();
Assert.That(sentEvents.Length, Is.EqualTo(sourceEvents.Length), "The number of events sent should match the number of source events.");

// The events should not only be structurally equivilent, the ordering of them should be preserved. Compare the
// sequence in order and validate that the events in each position are equivilent.
// The events should not only be structurally equivalent, the ordering of them should be preserved. Compare the
// sequence in order and validate that the events in each position are equivalent.

for (var index = 0; index < sentEvents.Length; ++index)
{
Expand Down Expand Up @@ -154,8 +154,8 @@ public async Task SendAsyncTransformsComplexEvents()
var sentEvents = mock.SendCalledWithParameters.Events.ToArray();
Assert.That(sentEvents.Length, Is.EqualTo(sourceEvents.Length), "The number of events sent should match the number of source events.");

// The events should not only be structurally equivilent, the ordering of them should be preserved. Compare the
// sequence in order and validate that the events in each position are equivilent.
// The events should not only be structurally equivalent, the ordering of them should be preserved. Compare the
// sequence in order and validate that the events in each position are equivalent.

for (var index = 0; index < sentEvents.Length; ++index)
{
Expand Down
Loading

0 comments on commit 4280c4b

Please sign in to comment.