Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument event processor #7512

Merged
merged 9 commits into from
Sep 16, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion sdk/core/Azure.Core/src/Pipeline/AzureOperationScope.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ internal DiagnosticScope(string name, DiagnosticListener source)
{
_name = name;
_source = source;
_activity = _source.IsEnabled() ? new Activity(_name) : null;
_activity = _source.IsEnabled() ? new Activity(_name) : null;
_activity?.SetIdFormat(ActivityIdFormat.W3C);
}

public bool IsEnabled => _activity != null;
Expand All @@ -44,6 +45,18 @@ public void AddAttribute<T>(string name, T value, Func<T, string> format)
}
}

public void AddLink(string id)
{
if (_activity != null)
{
var linkedActivity = new Activity("LinkedActivity");
linkedActivity.SetIdFormat(ActivityIdFormat.W3C);
linkedActivity.SetParentId(id);

_source.Write(_activity.OperationName + ".AddLink", linkedActivity);
}
}

public void Start()
{
if (_activity != null && _source.IsEnabled(_name))
Expand Down
45 changes: 44 additions & 1 deletion sdk/core/Azure.Core/tests/ClientDiagnosticsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,54 @@ public void CreatesActivityWithNameAndTags()
Assert.AreEqual("ActivityName.Start", startEvent.Key);
Assert.AreEqual("ActivityName.Stop", stopEvent.Key);

Assert.AreEqual(ActivityIdFormat.W3C, activity.IdFormat);
CollectionAssert.Contains(activity.Tags, new KeyValuePair<string, string>("Attribute1", "Value1"));
CollectionAssert.Contains(activity.Tags, new KeyValuePair<string, string>("Attribute2", "2"));
CollectionAssert.Contains(activity.Tags, new KeyValuePair<string, string>("Attribute3", "3"));
}

[Test]
public void AddLinkCallsAddLinkWithActivity()
{

using var testListener = new TestDiagnosticListener("Azure.Clients");
ClientDiagnostics clientDiagnostics = new ClientDiagnostics(true);

DiagnosticScope scope = clientDiagnostics.CreateScope("ActivityName");

scope.AddAttribute("Attribute1", "Value1");
scope.AddAttribute("Attribute2", 2, i => i.ToString());

scope.Start();

KeyValuePair<string, object> startEvent = testListener.Events.Dequeue();

Activity activity = Activity.Current;

var exception = new Exception();
scope.AddLink("id");
scope.Dispose();

KeyValuePair<string, object> addLinkEvent = testListener.Events.Dequeue();
KeyValuePair<string, object> stopEvent = testListener.Events.Dequeue();


Assert.Null(Activity.Current);
Assert.AreEqual("ActivityName.Start", startEvent.Key);
Assert.AreEqual("ActivityName.AddLink", addLinkEvent.Key);
Assert.AreEqual("ActivityName.Stop", stopEvent.Key);
Assert.IsInstanceOf<Activity>(addLinkEvent.Value);

var linkedActivity = (Activity)addLinkEvent.Value;

Assert.AreEqual(ActivityIdFormat.W3C, linkedActivity.IdFormat);
Assert.AreEqual("id", linkedActivity.ParentId);
Assert.AreEqual(0, testListener.Events.Count);

CollectionAssert.Contains(activity.Tags, new KeyValuePair<string, string>("Attribute1", "Value1"));
CollectionAssert.Contains(activity.Tags, new KeyValuePair<string, string>("Attribute2", "2"));
}

[Test]
public void FailedStopsActivityAndWritesExceptionEvent()
{
Expand All @@ -63,7 +106,7 @@ public void FailedStopsActivityAndWritesExceptionEvent()
var exception = new Exception();
scope.Failed(exception);
scope.Dispose();

KeyValuePair<string, object> exceptionEvent = testListener.Events.Dequeue();
KeyValuePair<string, object> stopEvent = testListener.Events.Dequeue();

Expand Down
3 changes: 2 additions & 1 deletion sdk/core/Azure.Core/tests/TestFramework/RecordMatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ public RecordMatcher(RecordedTestSanitizer sanitizer)
"x-ms-date",
"x-ms-client-request-id",
"User-Agent",
"Request-Id"
"Request-Id",
"traceparent"
};

// Headers that don't indicate meaningful changes between updated recordings
Expand Down
19 changes: 17 additions & 2 deletions sdk/core/Azure.Core/tests/TestFramework/TestDiagnosticListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public void OnNext(KeyValuePair<string, object> value)
var startSuffix = ".Start";
var stopSuffix = ".Stop";
var exceptionSuffix = ".Exception";
var addLinkSuffix = ".AddLink";
if (value.Key.EndsWith(startSuffix))
{
var name = value.Key.Substring(0, value.Key.Length - startSuffix.Length);
Expand All @@ -47,6 +48,19 @@ public void OnNext(KeyValuePair<string, object> value)
};
Scopes.Add(scope);
}
else if (value.Key.EndsWith(addLinkSuffix))
{
var name = value.Key.Substring(0, value.Key.Length - addLinkSuffix.Length);
foreach (var producedDiagnosticScope in Scopes)
{
if (producedDiagnosticScope.Name == name)
{
producedDiagnosticScope.Links.Add(((Activity)value.Value).ParentId);
return;
}
}
throw new InvalidOperationException($"Event '{name}' was not started");
}
else if (value.Key.EndsWith(stopSuffix))
{
var name = value.Key.Substring(0, value.Key.Length - stopSuffix.Length);
Expand Down Expand Up @@ -90,7 +104,7 @@ public void OnNext(DiagnosticListener value)
}
}
}

public void Dispose()
{
List<IDisposable> subscriptions;
Expand All @@ -113,7 +127,7 @@ public void Dispose()
}
}
}

public ProducedDiagnosticScope AssertScopeStarted(string name, params KeyValuePair<string, string>[] expectedAttributes)
{
lock (Scopes)
Expand Down Expand Up @@ -168,6 +182,7 @@ public class ProducedDiagnosticScope
public Activity Activity { get; set; }
public bool IsCompleted { get; set; }
public Exception Exception { get; set; }
public List<string> Links { get; set; } = new List<string>();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Messaging.EventHubs",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Messaging.EventHubs.Tests", "tests\Azure.Messaging.EventHubs.Tests.csproj", "{51A23FD9-A32D-4390-9A5B-1EA7A045F0F9}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Azure.Messaging.EventHubs.Samples", "samples\Azure.Messaging.EventHubs.Samples.csproj", "{AD33C619-AC51-4F29-937B-184B4F785F57}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Messaging.EventHubs.Samples", "samples\Azure.Messaging.EventHubs.Samples.csproj", "{AD33C619-AC51-4F29-937B-184B4F785F57}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Core", "..\..\core\Azure.Core\src\Azure.Core.csproj", "{6864EFDA-9CE3-4F93-8060-554019389767}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -55,6 +57,18 @@ Global
{AD33C619-AC51-4F29-937B-184B4F785F57}.Release|x64.Build.0 = Release|Any CPU
{AD33C619-AC51-4F29-937B-184B4F785F57}.Release|x86.ActiveCfg = Release|Any CPU
{AD33C619-AC51-4F29-937B-184B4F785F57}.Release|x86.Build.0 = Release|Any CPU
{6864EFDA-9CE3-4F93-8060-554019389767}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6864EFDA-9CE3-4F93-8060-554019389767}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6864EFDA-9CE3-4F93-8060-554019389767}.Debug|x64.ActiveCfg = Debug|Any CPU
{6864EFDA-9CE3-4F93-8060-554019389767}.Debug|x64.Build.0 = Debug|Any CPU
{6864EFDA-9CE3-4F93-8060-554019389767}.Debug|x86.ActiveCfg = Debug|Any CPU
{6864EFDA-9CE3-4F93-8060-554019389767}.Debug|x86.Build.0 = Debug|Any CPU
{6864EFDA-9CE3-4F93-8060-554019389767}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6864EFDA-9CE3-4F93-8060-554019389767}.Release|Any CPU.Build.0 = Release|Any CPU
{6864EFDA-9CE3-4F93-8060-554019389767}.Release|x64.ActiveCfg = Release|Any CPU
{6864EFDA-9CE3-4F93-8060-554019389767}.Release|x64.Build.0 = Release|Any CPU
{6864EFDA-9CE3-4F93-8060-554019389767}.Release|x86.ActiveCfg = Release|Any CPU
{6864EFDA-9CE3-4F93-8060-554019389767}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using Azure.Messaging.EventHubs.Processor;

namespace Azure.Messaging.EventHubs.Diagnostics
{
/// <summary>
Expand All @@ -18,6 +20,12 @@ internal static class DiagnosticProperty
/// <summary>The activity name associated with Event Hub producers.</summary>
public static readonly string ProducerActivityName = $"{ BaseActivityName }.{ nameof(EventHubProducer) }.Send";

/// <summary>The activity name associated with EventProcessor processing a list of events.</summary>
public static readonly string EventProcessorProcessingActivityName = $"{ BaseActivityName }.{ nameof(EventProcessor) }.Process";

/// <summary>The activity name associated with EventProcessor creating a checkpoint.</summary>
public static readonly string EventProcessorCheckpointActivityName = $"{ BaseActivityName }.{ nameof(EventProcessor) }.Checkpoint";

/// <summary>The attribute which represents a unique identifier for the diagnostics context.</summary>
public static string DiagnosticIdAttribute = "Diagnostic-Id";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,21 @@ namespace Azure.Messaging.EventHubs.Diagnostics
///
internal class EventDataInstrumentation
{
public static ClientDiagnostics ClientDiagnostics { get; } = new ClientDiagnostics(true);
pakrym marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Applies diagnostics instrumentation to a given event.
/// </summary>
///
/// <param name="clientDiagnostics">The client diagnostics instance responsible for managing scope.</param>
/// <param name="eventData">The event to instrument.</param>
///
/// <returns><c>true</c> if the event was instrumented in response to this request; otherwise, <c>false</c>.</returns>
///
public static bool InstrumentEvent(ClientDiagnostics clientDiagnostics,
EventData eventData)
public static bool InstrumentEvent(EventData eventData)
{
if (!eventData.Properties.ContainsKey(DiagnosticProperty.DiagnosticIdAttribute))
{
using DiagnosticScope messageScope = clientDiagnostics.CreateScope(DiagnosticProperty.EventActivityName);
using DiagnosticScope messageScope = ClientDiagnostics.CreateScope(DiagnosticProperty.EventActivityName);
messageScope.Start();

var activity = Activity.Current;
Expand All @@ -41,6 +41,19 @@ public static bool InstrumentEvent(ClientDiagnostics clientDiagnostics,
return false;
}

public static bool TryExtractDiagnosticId(EventData eventData, out string id)
pakrym marked this conversation as resolved.
Show resolved Hide resolved
{
id = null;

if (eventData.Properties.TryGetValue(DiagnosticProperty.DiagnosticIdAttribute, out var objectId) && objectId is string stringId)
{
id = stringId;
return true;
}

return false;
}

/// <summary>
/// Resets the instrumentation associated with a given event.
/// </summary>
Expand Down
7 changes: 1 addition & 6 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/EventDataBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ namespace Azure.Messaging.EventHubs
///
public sealed class EventDataBatch : IDisposable
{
/// <summary>The client diagnostics instance responsible for managing scope.</summary>
private readonly ClientDiagnostics _clientDiagnostics;

/// <summary>
/// The maximum size allowed for the batch, in bytes. This includes the events in the batch as
/// well as any overhead for the batch itself when sent to the Event Hubs service.
Expand Down Expand Up @@ -76,8 +73,6 @@ internal EventDataBatch(TransportEventBatch transportBatch,

InnerBatch = transportBatch;
SendOptions = sendOptions;

_clientDiagnostics = new ClientDiagnostics(isActivityEnabled: true);
}

/// <summary>
Expand All @@ -91,7 +86,7 @@ internal EventDataBatch(TransportEventBatch transportBatch,
///
public bool TryAdd(EventData eventData)
{
bool instrumented = EventDataInstrumentation.InstrumentEvent(_clientDiagnostics, eventData);
bool instrumented = EventDataInstrumentation.InstrumentEvent(eventData);
bool added = InnerBatch.TryAdd(eventData);

if (!added && instrumented)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ public class EventHubProducer : IAsyncDisposable
/// <summary>The set of default publishing options to use when no specific options are requested.</summary>
private static readonly SendOptions DefaultSendOptions = new SendOptions();

/// <summary>The client diagnostics instance responsible for managing scope.</summary>
private readonly ClientDiagnostics _clientDiagnostics;

/// <summary>The fully-qualified location of the Event Hub instance to which events will be sent.</summary>
private readonly Uri _endpoint;

Expand Down Expand Up @@ -135,7 +132,6 @@ internal EventHubProducer(TransportEventHubProducer transportProducer,

_endpoint = endpoint;
_retryPolicy = retryPolicy;
_clientDiagnostics = new ClientDiagnostics(true);
}

/// <summary>
Expand Down Expand Up @@ -390,7 +386,7 @@ public virtual async Task<EventDataBatch> CreateBatchAsync(BatchOptions options,
///
private DiagnosticScope CreateDiagnosticScope()
{
DiagnosticScope scope = _clientDiagnostics.CreateScope(DiagnosticProperty.ProducerActivityName);
DiagnosticScope scope = EventDataInstrumentation.ClientDiagnostics.CreateScope(DiagnosticProperty.ProducerActivityName);
scope.AddAttribute(DiagnosticProperty.TypeAttribute, DiagnosticProperty.EventHubProducerType);
scope.AddAttribute(DiagnosticProperty.ServiceContextAttribute, DiagnosticProperty.EventHubsServiceContext);
scope.AddAttribute(DiagnosticProperty.EventHubAttribute, EventHubName);
Expand All @@ -410,7 +406,7 @@ private void InstrumentMessages(IEnumerable<EventData> events)
{
foreach (var eventData in events)
{
EventDataInstrumentation.InstrumentEvent(_clientDiagnostics, eventData);
EventDataInstrumentation.InstrumentEvent(eventData);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Threading.Tasks;
using Azure.Core.Pipeline;
using Azure.Messaging.EventHubs.Diagnostics;

namespace Azure.Messaging.EventHubs.Processor
{
Expand Down Expand Up @@ -81,7 +84,17 @@ public async Task UpdateCheckpointAsync(long offset,
sequenceNumber
);

await Manager.UpdateCheckpointAsync(checkpoint).ConfigureAwait(false);
using DiagnosticScope scope =
EventDataInstrumentation.ClientDiagnostics.CreateScope(DiagnosticProperty.EventProcessorCheckpointActivityName);
scope.Start();
try
{
await Manager.UpdateCheckpointAsync(checkpoint).ConfigureAwait(false);
}
catch (Exception e)
{
scope.Failed(e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core.Pipeline;
using Azure.Messaging.EventHubs.Core;
using Azure.Messaging.EventHubs.Diagnostics;

namespace Azure.Messaging.EventHubs.Processor
{
Expand Down Expand Up @@ -233,12 +235,28 @@ private async Task RunAsync(CancellationToken cancellationToken)
{
receivedEvents = await InnerConsumer.ReceiveAsync(Options.MaximumMessageCount, Options.MaximumReceiveWaitTime, cancellationToken).ConfigureAwait(false);

using DiagnosticScope diagnosticScope = EventDataInstrumentation.ClientDiagnostics.CreateScope(DiagnosticProperty.EventProcessorProcessingActivityName);
pakrym marked this conversation as resolved.
Show resolved Hide resolved

diagnosticScope.Start();
pakrym marked this conversation as resolved.
Show resolved Hide resolved

if (diagnosticScope.IsEnabled)
{
foreach (var eventData in receivedEvents)
{
if (EventDataInstrumentation.TryExtractDiagnosticId(eventData, out string diagnosticId))
{
diagnosticScope.AddLink(diagnosticId);
}
}
}

try
{
await PartitionProcessor.ProcessEventsAsync(receivedEvents, cancellationToken).ConfigureAwait(false);
}
catch (Exception partitionProcessorException)
{
diagnosticScope.Failed(partitionProcessorException);
unrecoverableException = partitionProcessorException;
CloseReason = PartitionProcessorCloseReason.PartitionProcessorException;

Expand Down
Loading