From f0a9a8de58333c8034f52a57e0e474c0db72d1d8 Mon Sep 17 00:00:00 2001
From: tovyhnal <65427136+tovyhnal@users.noreply.github.com>
Date: Fri, 22 Nov 2024 01:58:02 +0100
Subject: [PATCH] add eventhub name into load balancer logs (#47271)
---
.../PartitionLoadBalancerEventSource.cs | 74 +++++++++++--------
.../src/Processor/PartitionLoadBalancer.cs | 22 +++---
.../Processor/PartitionLoadBalancerTests.cs | 22 +++---
3 files changed, 66 insertions(+), 52 deletions(-)
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Diagnostics/PartitionLoadBalancerEventSource.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Diagnostics/PartitionLoadBalancerEventSource.cs
index f98edd246bc4b..a9c4176ea943d 100755
--- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Diagnostics/PartitionLoadBalancerEventSource.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Diagnostics/PartitionLoadBalancerEventSource.cs
@@ -43,13 +43,14 @@ protected PartitionLoadBalancerEventSource() : base(EventSourceName)
///
///
/// Minimum partitions per event processor.
+ /// The name of the Event Hub that the load balancer is associated with.
///
- [Event(1, Level = EventLevel.Verbose, Message = "Expected minimum partitions per event processor '{0}'.")]
- public virtual void MinimumPartitionsPerEventProcessor(int count)
+ [Event(1, Level = EventLevel.Verbose, Message = "Expected minimum partitions per event processor '{0}' for Event Hub '{1}'.")]
+ public virtual void MinimumPartitionsPerEventProcessor(int count, string eventHubName)
{
if (IsEnabled())
{
- WriteEvent(1, count);
+ WriteEvent(1, count, eventHubName);
}
}
@@ -59,14 +60,16 @@ public virtual void MinimumPartitionsPerEventProcessor(int count)
///
/// A unique name used to identify the associated event processor.
/// Current ownership count.
+ /// The name of the Event Hub that the load balancer is associated with.
///
- [Event(2, Level = EventLevel.Informational, Message = "Current ownership count is {0}. (Identifier: '{1}')")]
+ [Event(2, Level = EventLevel.Informational, Message = "Current ownership count is {0} for Event Hub '{2}'. (Identifier: '{1}')")]
public virtual void CurrentOwnershipCount(int count,
- string identifier)
+ string identifier,
+ string eventHubName)
{
if (IsEnabled())
{
- WriteEvent(2, count, identifier ?? string.Empty);
+ WriteEvent(2, count, identifier ?? string.Empty, eventHubName);
}
}
@@ -75,13 +78,14 @@ public virtual void CurrentOwnershipCount(int count,
///
///
/// List of unclaimed partitions.
+ /// The name of the Event Hub that the load balancer is associated with.
///
- [Event(3, Level = EventLevel.Informational, Message = "Unclaimed partitions: '{0}'.")]
- public virtual void UnclaimedPartitions(HashSet unclaimedPartitions)
+ [Event(3, Level = EventLevel.Informational, Message = "Unclaimed partitions: '{0}' for Event Hub '{1}'.")]
+ public virtual void UnclaimedPartitions(HashSet unclaimedPartitions, string eventHubName)
{
if (IsEnabled())
{
- WriteEvent(3, string.Join(", ", unclaimedPartitions));
+ WriteEvent(3, string.Join(", ", unclaimedPartitions), eventHubName);
}
}
@@ -90,13 +94,14 @@ public virtual void UnclaimedPartitions(HashSet unclaimedPartitions)
///
///
/// The identifier of the Event Hub partition whose ownership claim attempt is starting.
+ /// The name of the Event Hub that the load balancer is associated with.
///
- [Event(4, Level = EventLevel.Informational, Message = "Attempting to claim ownership of partition '{0}'.")]
- public virtual void ClaimOwnershipStart(string partitionId)
+ [Event(4, Level = EventLevel.Informational, Message = "Attempting to claim ownership of partition '{0}' for Event Hub '{1}'.")]
+ public virtual void ClaimOwnershipStart(string partitionId, string eventHubName)
{
if (IsEnabled())
{
- WriteEvent(4, partitionId ?? string.Empty);
+ WriteEvent(4, partitionId ?? string.Empty, eventHubName);
}
}
@@ -106,14 +111,16 @@ public virtual void ClaimOwnershipStart(string partitionId)
///
/// The identifier of the Event Hub partition.
/// The message for the exception that occurred.
+ /// The name of the Event Hub that the load balancer is associated with.
///
- [Event(5, Level = EventLevel.Error, Message = "Failed to claim ownership of partition '{0}'. (ErrorMessage: '{1}')")]
+ [Event(5, Level = EventLevel.Error, Message = "Failed to claim ownership of partition '{0}' for Event Hub '{2}'. (ErrorMessage: '{1}')")]
public virtual void ClaimOwnershipError(string partitionId,
- string errorMessage)
+ string errorMessage,
+ string eventHubName)
{
if (IsEnabled())
{
- WriteEvent(5, partitionId ?? string.Empty, errorMessage ?? string.Empty);
+ WriteEvent(5, partitionId ?? string.Empty, errorMessage ?? string.Empty, eventHubName);
}
}
@@ -122,13 +129,14 @@ public virtual void ClaimOwnershipError(string partitionId,
///
///
/// A unique name used to identify the associated event processor.
+ /// The name of the Event Hub that the load balancer is associated with.
///
- [Event(6, Level = EventLevel.Informational, Message = "Load is unbalanced and this load balancer should steal a partition. (Identifier: '{0}')")]
- public virtual void ShouldStealPartition(string identifier)
+ [Event(6, Level = EventLevel.Informational, Message = "Load is unbalanced and this load balancer should steal a partition for Event Hub '{1}'. (Identifier: '{0}')")]
+ public virtual void ShouldStealPartition(string identifier, string eventHubName)
{
if (IsEnabled())
{
- WriteEvent(6, identifier ?? string.Empty);
+ WriteEvent(6, identifier ?? string.Empty, eventHubName);
}
}
@@ -139,15 +147,17 @@ public virtual void ShouldStealPartition(string identifier)
/// The identifier of the partition that was selected to be stolen.
/// The identifier of the event processor that is being stolen from.
/// A unique name used to identify the associated event processor.
+ /// The name of the Event Hub that the load balancer is associated with.
///
- [Event(7, Level = EventLevel.Informational, Message = "No unclaimed partitions, attempting to steal partition '{0}' from event processor '{1}'. (Identifier: '{2}')")]
+ [Event(7, Level = EventLevel.Informational, Message = "No unclaimed partitions, attempting to steal partition '{0}' from event processor '{1}' for Event Hub '{3}'. (Identifier: '{2}').")]
public virtual void StealPartition(string partitionId,
string stolenFrom,
- string identifier)
+ string identifier,
+ string eventHubName)
{
if (IsEnabled())
{
- WriteEvent(7, partitionId ?? string.Empty, stolenFrom ?? string.Empty, identifier ?? string.Empty);
+ WriteEvent(7, partitionId ?? string.Empty, stolenFrom ?? string.Empty, identifier ?? string.Empty, eventHubName);
}
}
@@ -156,13 +166,14 @@ public virtual void StealPartition(string partitionId,
///
///
/// A unique name used to identify the associated event processor.
+ /// The name of the Event Hub that the load balancer is associated with.
///
- [Event(8, Level = EventLevel.Verbose, Message = "Attempting to renew ownership. (Identifier: '{0}')")]
- public virtual void RenewOwnershipStart(string identifier)
+ [Event(8, Level = EventLevel.Verbose, Message = "Attempting to renew ownership for Event Hub '{1}'. (Identifier: '{0}')")]
+ public virtual void RenewOwnershipStart(string identifier, string eventHubName)
{
if (IsEnabled())
{
- WriteEvent(8, identifier ?? string.Empty);
+ WriteEvent(8, identifier ?? string.Empty, eventHubName);
}
}
@@ -172,14 +183,16 @@ public virtual void RenewOwnershipStart(string identifier)
///
/// A unique name used to identify the associated event processor.
/// The message for the exception that occurred.
+ /// The name of the Event Hub that the load balancer is associated with.
///
- [Event(9, Level = EventLevel.Error, Message = "Failed to renew ownership. (Identifier: '{0}'; ErrorMessage: '{0}')")]
+ [Event(9, Level = EventLevel.Error, Message = "Failed to renew ownership for Event Hub '{2}'. (Identifier: '{0}'; ErrorMessage: '{1}')")]
public virtual void RenewOwnershipError(string identifier,
- string errorMessage)
+ string errorMessage,
+ string eventHubName)
{
if (IsEnabled())
{
- WriteEvent(9, identifier ?? string.Empty, errorMessage ?? string.Empty);
+ WriteEvent(9, identifier ?? string.Empty, errorMessage ?? string.Empty, eventHubName);
}
}
@@ -188,13 +201,14 @@ public virtual void RenewOwnershipError(string identifier,
///
///
/// A unique name used to identify the associated event processor.
+ /// The name of the Event Hub that the load balancer is associated with.
///
- [Event(10, Level = EventLevel.Verbose, Message = "Attempt to renew ownership has completed. (Identifier: '{0}')")]
- public virtual void RenewOwnershipComplete(string identifier)
+ [Event(10, Level = EventLevel.Verbose, Message = "Attempt to renew ownership has completed for Event Hub '{1}'. (Identifier: '{0}')")]
+ public virtual void RenewOwnershipComplete(string identifier, string eventHubName)
{
if (IsEnabled())
{
- WriteEvent(10, identifier ?? string.Empty);
+ WriteEvent(10, identifier ?? string.Empty, eventHubName);
}
}
}
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/PartitionLoadBalancer.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/PartitionLoadBalancer.cs
index 6346be48d7c80..93d2def809859 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/PartitionLoadBalancer.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/PartitionLoadBalancer.cs
@@ -350,10 +350,10 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio
var unevenPartitionDistribution = (partitionCount % ActiveOwnershipWithDistribution.Keys.Count) > 0;
var minimumOwnedPartitionsCount = partitionCount / ActiveOwnershipWithDistribution.Keys.Count;
- Logger.MinimumPartitionsPerEventProcessor(minimumOwnedPartitionsCount);
+ Logger.MinimumPartitionsPerEventProcessor(minimumOwnedPartitionsCount, EventHubName);
var ownedPartitionsCount = ActiveOwnershipWithDistribution[OwnerIdentifier].Count;
- Logger.CurrentOwnershipCount(ownedPartitionsCount, OwnerIdentifier);
+ Logger.CurrentOwnershipCount(ownedPartitionsCount, OwnerIdentifier, EventHubName);
// There are two possible situations in which we may need to claim a partition ownership:
//
@@ -374,7 +374,7 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio
{
// Look for unclaimed partitions. If any, randomly pick one of them to claim.
- Logger.UnclaimedPartitions(unclaimedPartitions);
+ Logger.UnclaimedPartitions(unclaimedPartitions, EventHubName);
if (unclaimedPartitions.Count > 0)
{
@@ -423,7 +423,7 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio
if ((ownedPartitionsCount < minimumOwnedPartitionsCount)
|| (ownedPartitionsCount < maximumOwnedPartitionsCount && partitionsOwnedByProcessorWithGreaterThanMaximumOwnedPartitionsCount.Count > 0))
{
- Logger.ShouldStealPartition(OwnerIdentifier);
+ Logger.ShouldStealPartition(OwnerIdentifier, EventHubName);
// Prefer stealing from a processor that owns more than the maximum number of partitions.
@@ -444,7 +444,7 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio
}
}
- Logger.StealPartition(partitionToSteal, stealingFrom, OwnerIdentifier);
+ Logger.StealPartition(partitionToSteal, stealingFrom, OwnerIdentifier, EventHubName);
var returnTask = ClaimOwnershipAsync(
partitionToSteal,
@@ -474,7 +474,7 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio
}
}
- Logger.StealPartition(partitionToSteal, stealingFrom, OwnerIdentifier);
+ Logger.StealPartition(partitionToSteal, stealingFrom, OwnerIdentifier, EventHubName);
var returnTask = ClaimOwnershipAsync(
partitionToSteal,
@@ -501,7 +501,7 @@ private async Task RenewOwnershipAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
- Logger.RenewOwnershipStart(OwnerIdentifier);
+ Logger.RenewOwnershipStart(OwnerIdentifier, EventHubName);
var utcNow = GetDateTimeOffsetNow();
@@ -543,7 +543,7 @@ private async Task RenewOwnershipAsync(CancellationToken cancellationToken)
// If ownership renewal fails just give up and try again in the next cycle. The processor may
// end up losing some of its ownership.
- Logger.RenewOwnershipError(OwnerIdentifier, ex.Message);
+ Logger.RenewOwnershipError(OwnerIdentifier, ex.Message, EventHubName);
// Set the EventHubName to null so it doesn't modify the exception message. This exception message is
// used so the processor can retrieve the raw Operation string, and adding the EventHubName would append
@@ -553,7 +553,7 @@ private async Task RenewOwnershipAsync(CancellationToken cancellationToken)
}
finally
{
- Logger.RenewOwnershipComplete(OwnerIdentifier);
+ Logger.RenewOwnershipComplete(OwnerIdentifier, EventHubName);
}
}
@@ -572,7 +572,7 @@ private async Task RenewOwnershipAsync(CancellationToken cancellationToken)
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
- Logger.ClaimOwnershipStart(partitionId);
+ Logger.ClaimOwnershipStart(partitionId, EventHubName);
// We need the eTag from the most recent ownership of this partition, even if it's expired. We want to keep the offset and
// the sequence number as well.
@@ -602,7 +602,7 @@ private async Task RenewOwnershipAsync(CancellationToken cancellationToken)
// If ownership claim fails, just treat it as a usual ownership claim failure.
- Logger.ClaimOwnershipError(partitionId, ex.Message);
+ Logger.ClaimOwnershipError(partitionId, ex.Message, EventHubName);
// Set the EventHubName to null so it doesn't modify the exception message. This exception message is
// used so the processor can retrieve the raw Operation string, and adding the EventHubName would append
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Processor/PartitionLoadBalancerTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Processor/PartitionLoadBalancerTests.cs
index b778ba68c1100..b03f48c5ec18a 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Processor/PartitionLoadBalancerTests.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/tests/Processor/PartitionLoadBalancerTests.cs
@@ -540,7 +540,7 @@ public async Task RunLoadBalancingAsyncDoesNotStealFromItself()
// Verify that no attempts to steal were logged.
- mockLog.Verify(log => log.ShouldStealPartition(It.IsAny()), Times.Never);
+ mockLog.Verify(log => log.ShouldStealPartition(It.IsAny(), It.IsAny()), Times.Never);
}
///
@@ -597,7 +597,7 @@ public async Task RunLoadBalancingAsyncDoesNotStealWhenLessPartitionsThanProcess
// Verify that no attempts to steal were logged.
- mockLog.Verify(log => log.ShouldStealPartition(It.IsAny()), Times.Never);
+ mockLog.Verify(log => log.ShouldStealPartition(It.IsAny(), It.IsAny()), Times.Never);
}
///
@@ -678,7 +678,7 @@ public async Task RunLoadBalancingAsyncDoesNotStealWhenTheLoadIsBalanced(int[] a
// Verify that no attempts to steal were logged.
- mockLog.Verify(log => log.ShouldStealPartition(It.IsAny()), Times.Never);
+ mockLog.Verify(log => log.ShouldStealPartition(It.IsAny(), It.IsAny()), Times.Never);
}
///
@@ -902,14 +902,14 @@ public async Task VerifiesEventProcessorLogs()
await loadbalancer.RelinquishOwnershipAsync(CancellationToken.None);
- mockLog.Verify(m => m.RenewOwnershipStart(loadbalancer.OwnerIdentifier));
- mockLog.Verify(m => m.RenewOwnershipComplete(loadbalancer.OwnerIdentifier));
- mockLog.Verify(m => m.ClaimOwnershipStart(It.Is(p => partitionIds.Contains(p))));
- mockLog.Verify(m => m.MinimumPartitionsPerEventProcessor(MinimumpartitionCount));
- mockLog.Verify(m => m.CurrentOwnershipCount(MinimumpartitionCount, loadbalancer.OwnerIdentifier));
- mockLog.Verify(m => m.StealPartition(It.IsAny(), It.IsAny(), loadbalancer.OwnerIdentifier));
- mockLog.Verify(m => m.ShouldStealPartition(loadbalancer.OwnerIdentifier));
- mockLog.Verify(m => m.UnclaimedPartitions(It.Is>(set => set.Count == 0 || set.All(item => partitionIds.Contains(item)))));
+ mockLog.Verify(m => m.RenewOwnershipStart(loadbalancer.OwnerIdentifier, loadbalancer.EventHubName));
+ mockLog.Verify(m => m.RenewOwnershipComplete(loadbalancer.OwnerIdentifier, loadbalancer.EventHubName));
+ mockLog.Verify(m => m.ClaimOwnershipStart(It.Is(p => partitionIds.Contains(p)), loadbalancer.EventHubName));
+ mockLog.Verify(m => m.MinimumPartitionsPerEventProcessor(MinimumpartitionCount, loadbalancer.EventHubName));
+ mockLog.Verify(m => m.CurrentOwnershipCount(MinimumpartitionCount, loadbalancer.OwnerIdentifier, loadbalancer.EventHubName));
+ mockLog.Verify(m => m.StealPartition(It.IsAny(), It.IsAny(), loadbalancer.OwnerIdentifier, loadbalancer.EventHubName));
+ mockLog.Verify(m => m.ShouldStealPartition(loadbalancer.OwnerIdentifier, loadbalancer.EventHubName));
+ mockLog.Verify(m => m.UnclaimedPartitions(It.Is>(set => set.Count == 0 || set.All(item => partitionIds.Contains(item))), loadbalancer.EventHubName));
}
///