Skip to content

Commit

Permalink
Instrument event processor (#7512)
Browse files Browse the repository at this point in the history
  • Loading branch information
pakrym authored Sep 16, 2019
1 parent 9d046b7 commit 065dc77
Show file tree
Hide file tree
Showing 13 changed files with 268 additions and 24 deletions.
37 changes: 34 additions & 3 deletions sdk/core/Azure.Core/src/Pipeline/AzureOperationScope.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Diagnostics;

namespace Azure.Core.Pipeline
{
public readonly struct DiagnosticScope : IDisposable
{
private readonly Activity? _activity;
private readonly DiagnosticActivity? _activity;

private readonly string _name;

Expand All @@ -18,7 +19,8 @@ internal DiagnosticScope(string name, DiagnosticListener source)
{
_name = name;
_source = source;
_activity = _source.IsEnabled() ? new Activity(_name) : null;
_activity = _source.IsEnabled() ? new DiagnosticActivity(_name) : null;
_activity?.SetW3CFormat();
}

public bool IsEnabled => _activity != null;
Expand All @@ -44,11 +46,23 @@ public void AddAttribute<T>(string name, T value, Func<T, string> format)
}
}

public void AddLink(string id)
{
if (_activity != null)
{
var linkedActivity = new Activity("LinkedActivity");
linkedActivity.SetW3CFormat();
linkedActivity.SetParentId(id);

_activity.AddLink(linkedActivity);
}
}

public void Start()
{
if (_activity != null && _source.IsEnabled(_name))
{
_source.StartActivity(_activity, null);
_source.StartActivity(_activity, _activity);
}
}

Expand Down Expand Up @@ -79,5 +93,22 @@ public void Failed(Exception e)
_source?.Write(_activity.OperationName + ".Exception", e);

}

private class DiagnosticActivity : Activity
{
private List<Activity>? _links;

public IEnumerable<Activity> Links => (IEnumerable<Activity>?)_links ?? Array.Empty<Activity>();

public DiagnosticActivity(string operationName) : base(operationName)
{
}

public void AddLink(Activity activity)
{
_links ??= new List<Activity>();
_links.Add(activity);
}
}
}
}
43 changes: 43 additions & 0 deletions sdk/core/Azure.Core/tests/ClientDiagnosticsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using Azure.Core.Pipeline;
using NUnit.Framework;

Expand Down Expand Up @@ -38,11 +39,53 @@ public void CreatesActivityWithNameAndTags()
Assert.AreEqual("ActivityName.Start", startEvent.Key);
Assert.AreEqual("ActivityName.Stop", stopEvent.Key);

Assert.AreEqual(ActivityIdFormat.W3C, activity.IdFormat);
CollectionAssert.Contains(activity.Tags, new KeyValuePair<string, string>("Attribute1", "Value1"));
CollectionAssert.Contains(activity.Tags, new KeyValuePair<string, string>("Attribute2", "2"));
CollectionAssert.Contains(activity.Tags, new KeyValuePair<string, string>("Attribute3", "3"));
}

[Test]
public void AddLinkCallsPassesLinksAsPartOfStartPayload()
{
using var testListener = new TestDiagnosticListener("Azure.Clients");
ClientDiagnostics clientDiagnostics = new ClientDiagnostics(true);

DiagnosticScope scope = clientDiagnostics.CreateScope("ActivityName");

scope.AddLink("id");
scope.AddLink("id2");
scope.Start();

KeyValuePair<string, object> startEvent = testListener.Events.Dequeue();

Activity activity = Activity.Current;

scope.Dispose();

KeyValuePair<string, object> stopEvent = testListener.Events.Dequeue();

Assert.Null(Activity.Current);
Assert.AreEqual("ActivityName.Start", startEvent.Key);
Assert.AreEqual("ActivityName.Stop", stopEvent.Key);

var activities = (IEnumerable<Activity>)startEvent.Value.GetType().GetProperty("Links").GetValue(startEvent.Value);
Activity[] activitiesArray = activities.ToArray();

Assert.AreEqual(activitiesArray.Length, 2);

Activity linkedActivity1 = activitiesArray[0];
Activity linkedActivity2 = activitiesArray[1];

Assert.AreEqual(ActivityIdFormat.W3C, linkedActivity1.IdFormat);
Assert.AreEqual("id", linkedActivity1.ParentId);

Assert.AreEqual(ActivityIdFormat.W3C, linkedActivity2.IdFormat);
Assert.AreEqual("id2", linkedActivity2.ParentId);

Assert.AreEqual(0, testListener.Events.Count);
}

[Test]
public void FailedStopsActivityAndWritesExceptionEvent()
{
Expand Down
3 changes: 2 additions & 1 deletion sdk/core/Azure.Core/tests/TestFramework/RecordMatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ public RecordMatcher(RecordedTestSanitizer sanitizer)
"x-ms-date",
"x-ms-client-request-id",
"User-Agent",
"Request-Id"
"Request-Id",
"traceparent"
};

// Headers that don't indicate meaningful changes between updated recordings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reflection;

namespace Azure.Core.Tests
{
Expand Down Expand Up @@ -37,13 +38,18 @@ public void OnNext(KeyValuePair<string, object> value)
var startSuffix = ".Start";
var stopSuffix = ".Stop";
var exceptionSuffix = ".Exception";

if (value.Key.EndsWith(startSuffix))
{
var name = value.Key.Substring(0, value.Key.Length - startSuffix.Length);
PropertyInfo propertyInfo = value.Value.GetType().GetProperty("Links");
var links = propertyInfo?.GetValue(value.Value) as IEnumerable<Activity> ?? Array.Empty<Activity>();

var scope = new ProducedDiagnosticScope()
{
Name = name,
Activity = Activity.Current
Activity = Activity.Current,
Links = links.Select(a => a.ParentId).ToList()
};
Scopes.Add(scope);
}
Expand Down Expand Up @@ -168,6 +174,7 @@ public class ProducedDiagnosticScope
public Activity Activity { get; set; }
public bool IsCompleted { get; set; }
public Exception Exception { get; set; }
public List<string> Links { get; set; } = new List<string>();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Messaging.EventHubs",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Messaging.EventHubs.Tests", "tests\Azure.Messaging.EventHubs.Tests.csproj", "{51A23FD9-A32D-4390-9A5B-1EA7A045F0F9}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Azure.Messaging.EventHubs.Samples", "samples\Azure.Messaging.EventHubs.Samples.csproj", "{AD33C619-AC51-4F29-937B-184B4F785F57}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Messaging.EventHubs.Samples", "samples\Azure.Messaging.EventHubs.Samples.csproj", "{AD33C619-AC51-4F29-937B-184B4F785F57}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Core", "..\..\core\Azure.Core\src\Azure.Core.csproj", "{6864EFDA-9CE3-4F93-8060-554019389767}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -55,6 +57,18 @@ Global
{AD33C619-AC51-4F29-937B-184B4F785F57}.Release|x64.Build.0 = Release|Any CPU
{AD33C619-AC51-4F29-937B-184B4F785F57}.Release|x86.ActiveCfg = Release|Any CPU
{AD33C619-AC51-4F29-937B-184B4F785F57}.Release|x86.Build.0 = Release|Any CPU
{6864EFDA-9CE3-4F93-8060-554019389767}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6864EFDA-9CE3-4F93-8060-554019389767}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6864EFDA-9CE3-4F93-8060-554019389767}.Debug|x64.ActiveCfg = Debug|Any CPU
{6864EFDA-9CE3-4F93-8060-554019389767}.Debug|x64.Build.0 = Debug|Any CPU
{6864EFDA-9CE3-4F93-8060-554019389767}.Debug|x86.ActiveCfg = Debug|Any CPU
{6864EFDA-9CE3-4F93-8060-554019389767}.Debug|x86.Build.0 = Debug|Any CPU
{6864EFDA-9CE3-4F93-8060-554019389767}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6864EFDA-9CE3-4F93-8060-554019389767}.Release|Any CPU.Build.0 = Release|Any CPU
{6864EFDA-9CE3-4F93-8060-554019389767}.Release|x64.ActiveCfg = Release|Any CPU
{6864EFDA-9CE3-4F93-8060-554019389767}.Release|x64.Build.0 = Release|Any CPU
{6864EFDA-9CE3-4F93-8060-554019389767}.Release|x86.ActiveCfg = Release|Any CPU
{6864EFDA-9CE3-4F93-8060-554019389767}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
<PackageReleaseNotes>https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md</PackageReleaseNotes>
<TargetFrameworks>$(RequiredTargetFrameworks)</TargetFrameworks>
<EnableFxCopAnalyzers>false</EnableFxCopAnalyzers>
<UseProjectReferenceToAzureCore>true</UseProjectReferenceToAzureCore>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using Azure.Messaging.EventHubs.Processor;

namespace Azure.Messaging.EventHubs.Diagnostics
{
/// <summary>
Expand All @@ -18,6 +20,12 @@ internal static class DiagnosticProperty
/// <summary>The activity name associated with Event Hub producers.</summary>
public static readonly string ProducerActivityName = $"{ BaseActivityName }.{ nameof(EventHubProducer) }.Send";

/// <summary>The activity name associated with EventProcessor processing a list of events.</summary>
public static readonly string EventProcessorProcessingActivityName = $"{ BaseActivityName }.{ typeof(EventProcessor<>).Name }.Process";

/// <summary>The activity name associated with EventProcessor creating a checkpoint.</summary>
public static readonly string EventProcessorCheckpointActivityName = $"{ BaseActivityName }.{ typeof(EventProcessor<>).Name }.Checkpoint";

/// <summary>The attribute which represents a unique identifier for the diagnostics context.</summary>
public static string DiagnosticIdAttribute = "Diagnostic-Id";

Expand All @@ -38,5 +46,14 @@ internal static class DiagnosticProperty

/// <summary>The value which identifies an Event Hub producer as the type associated with the diagnostics information.</summary>
public const string EventHubProducerType = "producer";

/// <summary>The attribute which represents the kind of diagnostic scope.</summary>
public const string KindAttribute = "kind";

/// <summary>The value which identifies the Event Processor scope kind.</summary>
public const string ServerKind = "server";

/// <summary>The value which identifies the message instrumentation scope kind.</summary>
public const string InternalKind = "internal";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,23 @@ namespace Azure.Messaging.EventHubs.Diagnostics
///
internal class EventDataInstrumentation
{
/// <summary>The client diagnostics instance responsible for managing scope.</summary>
public static ClientDiagnostics ClientDiagnostics { get; } = new ClientDiagnostics(true);

/// <summary>
/// Applies diagnostics instrumentation to a given event.
/// </summary>
///
/// <param name="clientDiagnostics">The client diagnostics instance responsible for managing scope.</param>
/// <param name="eventData">The event to instrument.</param>
///
/// <returns><c>true</c> if the event was instrumented in response to this request; otherwise, <c>false</c>.</returns>
///
public static bool InstrumentEvent(ClientDiagnostics clientDiagnostics,
EventData eventData)
public static bool InstrumentEvent(EventData eventData)
{
if (!eventData.Properties.ContainsKey(DiagnosticProperty.DiagnosticIdAttribute))
{
using DiagnosticScope messageScope = clientDiagnostics.CreateScope(DiagnosticProperty.EventActivityName);
using DiagnosticScope messageScope = ClientDiagnostics.CreateScope(DiagnosticProperty.EventActivityName);
messageScope.AddAttribute("kind", "internal");
messageScope.Start();

var activity = Activity.Current;
Expand All @@ -41,6 +43,27 @@ public static bool InstrumentEvent(ClientDiagnostics clientDiagnostics,
return false;
}

/// <summary>
/// Extracts a diagnostic id from the given event.
/// </summary>
///
/// <param name="eventData">The event to instrument.</param>
/// <param name="id">The value of </param>
/// <returns><c>true</c> if the event was contained the diagnostic id; otherwise, <c>false</c>.</returns>
///
public static bool TryExtractDiagnosticId(EventData eventData, out string id)
{
id = null;

if (eventData.Properties.TryGetValue(DiagnosticProperty.DiagnosticIdAttribute, out var objectId) && objectId is string stringId)
{
id = stringId;
return true;
}

return false;
}

/// <summary>
/// Resets the instrumentation associated with a given event.
/// </summary>
Expand Down
7 changes: 1 addition & 6 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/EventDataBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ namespace Azure.Messaging.EventHubs
///
public sealed class EventDataBatch : IDisposable
{
/// <summary>The client diagnostics instance responsible for managing scope.</summary>
private readonly ClientDiagnostics _clientDiagnostics;

/// <summary>
/// The maximum size allowed for the batch, in bytes. This includes the events in the batch as
/// well as any overhead for the batch itself when sent to the Event Hubs service.
Expand Down Expand Up @@ -76,8 +73,6 @@ internal EventDataBatch(TransportEventBatch transportBatch,

InnerBatch = transportBatch;
SendOptions = sendOptions;

_clientDiagnostics = new ClientDiagnostics(isActivityEnabled: true);
}

/// <summary>
Expand All @@ -91,7 +86,7 @@ internal EventDataBatch(TransportEventBatch transportBatch,
///
public bool TryAdd(EventData eventData)
{
bool instrumented = EventDataInstrumentation.InstrumentEvent(_clientDiagnostics, eventData);
bool instrumented = EventDataInstrumentation.InstrumentEvent(eventData);
bool added = InnerBatch.TryAdd(eventData);

if (!added && instrumented)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ public class EventHubProducer : IAsyncDisposable
/// <summary>The set of default publishing options to use when no specific options are requested.</summary>
private static readonly SendOptions DefaultSendOptions = new SendOptions();

/// <summary>The client diagnostics instance responsible for managing scope.</summary>
private readonly ClientDiagnostics _clientDiagnostics;

/// <summary>The fully-qualified location of the Event Hub instance to which events will be sent.</summary>
private readonly Uri _endpoint;

Expand Down Expand Up @@ -135,7 +132,6 @@ internal EventHubProducer(TransportEventHubProducer transportProducer,

_endpoint = endpoint;
_retryPolicy = retryPolicy;
_clientDiagnostics = new ClientDiagnostics(true);
}

/// <summary>
Expand Down Expand Up @@ -390,7 +386,7 @@ public virtual async Task<EventDataBatch> CreateBatchAsync(BatchOptions options,
///
private DiagnosticScope CreateDiagnosticScope()
{
DiagnosticScope scope = _clientDiagnostics.CreateScope(DiagnosticProperty.ProducerActivityName);
DiagnosticScope scope = EventDataInstrumentation.ClientDiagnostics.CreateScope(DiagnosticProperty.ProducerActivityName);
scope.AddAttribute(DiagnosticProperty.TypeAttribute, DiagnosticProperty.EventHubProducerType);
scope.AddAttribute(DiagnosticProperty.ServiceContextAttribute, DiagnosticProperty.EventHubsServiceContext);
scope.AddAttribute(DiagnosticProperty.EventHubAttribute, EventHubName);
Expand All @@ -410,7 +406,7 @@ private void InstrumentMessages(IEnumerable<EventData> events)
{
foreach (var eventData in events)
{
EventDataInstrumentation.InstrumentEvent(_clientDiagnostics, eventData);
EventDataInstrumentation.InstrumentEvent(eventData);
}
}

Expand Down
Loading

0 comments on commit 065dc77

Please sign in to comment.