Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add eventhub name into load balancer logs #47271

Merged
merged 1 commit into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ protected PartitionLoadBalancerEventSource() : base(EventSourceName)
/// </summary>
///
/// <param name="count"> Minimum partitions per event processor.</param>
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
///
[Event(1, Level = EventLevel.Verbose, Message = "Expected minimum partitions per event processor '{0}'.")]
public virtual void MinimumPartitionsPerEventProcessor(int count)
[Event(1, Level = EventLevel.Verbose, Message = "Expected minimum partitions per event processor '{0}' for Event Hub '{1}'.")]
public virtual void MinimumPartitionsPerEventProcessor(int count, string eventHubName)
{
if (IsEnabled())
{
WriteEvent(1, count);
WriteEvent(1, count, eventHubName);
}
}

Expand All @@ -59,14 +60,16 @@ public virtual void MinimumPartitionsPerEventProcessor(int count)
///
/// <param name="identifier">A unique name used to identify the associated event processor.</param>
/// <param name="count"> Current ownership count.</param>
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
///
[Event(2, Level = EventLevel.Informational, Message = "Current ownership count is {0}. (Identifier: '{1}')")]
[Event(2, Level = EventLevel.Informational, Message = "Current ownership count is {0} for Event Hub '{2}'. (Identifier: '{1}')")]
public virtual void CurrentOwnershipCount(int count,
string identifier)
string identifier,
string eventHubName)
{
if (IsEnabled())
{
WriteEvent(2, count, identifier ?? string.Empty);
WriteEvent(2, count, identifier ?? string.Empty, eventHubName);
}
}

Expand All @@ -75,13 +78,14 @@ public virtual void CurrentOwnershipCount(int count,
/// </summary>
///
/// <param name="unclaimedPartitions">List of unclaimed partitions.</param>
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
///
[Event(3, Level = EventLevel.Informational, Message = "Unclaimed partitions: '{0}'.")]
public virtual void UnclaimedPartitions(HashSet<string> unclaimedPartitions)
[Event(3, Level = EventLevel.Informational, Message = "Unclaimed partitions: '{0}' for Event Hub '{1}'.")]
public virtual void UnclaimedPartitions(HashSet<string> unclaimedPartitions, string eventHubName)
{
if (IsEnabled())
{
WriteEvent(3, string.Join(", ", unclaimedPartitions));
WriteEvent(3, string.Join(", ", unclaimedPartitions), eventHubName);
}
}

Expand All @@ -90,13 +94,14 @@ public virtual void UnclaimedPartitions(HashSet<string> unclaimedPartitions)
/// </summary>
///
/// <param name="partitionId">The identifier of the Event Hub partition whose ownership claim attempt is starting.</param>
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
///
[Event(4, Level = EventLevel.Informational, Message = "Attempting to claim ownership of partition '{0}'.")]
public virtual void ClaimOwnershipStart(string partitionId)
[Event(4, Level = EventLevel.Informational, Message = "Attempting to claim ownership of partition '{0}' for Event Hub '{1}'.")]
public virtual void ClaimOwnershipStart(string partitionId, string eventHubName)
{
if (IsEnabled())
{
WriteEvent(4, partitionId ?? string.Empty);
WriteEvent(4, partitionId ?? string.Empty, eventHubName);
}
}

Expand All @@ -106,14 +111,16 @@ public virtual void ClaimOwnershipStart(string partitionId)
///
/// <param name="partitionId">The identifier of the Event Hub partition.</param>
/// <param name="errorMessage">The message for the exception that occurred.</param>
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
///
[Event(5, Level = EventLevel.Error, Message = "Failed to claim ownership of partition '{0}'. (ErrorMessage: '{1}')")]
[Event(5, Level = EventLevel.Error, Message = "Failed to claim ownership of partition '{0}' for Event Hub '{2}'. (ErrorMessage: '{1}')")]
public virtual void ClaimOwnershipError(string partitionId,
string errorMessage)
string errorMessage,
string eventHubName)
{
if (IsEnabled())
{
WriteEvent(5, partitionId ?? string.Empty, errorMessage ?? string.Empty);
WriteEvent(5, partitionId ?? string.Empty, errorMessage ?? string.Empty, eventHubName);
}
}

Expand All @@ -122,13 +129,14 @@ public virtual void ClaimOwnershipError(string partitionId,
/// </summary>
///
/// <param name="identifier">A unique name used to identify the associated event processor.</param>
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
///
[Event(6, Level = EventLevel.Informational, Message = "Load is unbalanced and this load balancer should steal a partition. (Identifier: '{0}')")]
public virtual void ShouldStealPartition(string identifier)
[Event(6, Level = EventLevel.Informational, Message = "Load is unbalanced and this load balancer should steal a partition for Event Hub '{1}'. (Identifier: '{0}')")]
public virtual void ShouldStealPartition(string identifier, string eventHubName)
{
if (IsEnabled())
{
WriteEvent(6, identifier ?? string.Empty);
WriteEvent(6, identifier ?? string.Empty, eventHubName);
}
}

Expand All @@ -139,15 +147,17 @@ public virtual void ShouldStealPartition(string identifier)
/// <param name="partitionId">The identifier of the partition that was selected to be stolen.</param>
/// <param name="stolenFrom">The identifier of the event processor that is being stolen from.</param>
/// <param name="identifier">A unique name used to identify the associated event processor.</param>
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
///
[Event(7, Level = EventLevel.Informational, Message = "No unclaimed partitions, attempting to steal partition '{0}' from event processor '{1}'. (Identifier: '{2}')")]
[Event(7, Level = EventLevel.Informational, Message = "No unclaimed partitions, attempting to steal partition '{0}' from event processor '{1}' for Event Hub '{3}'. (Identifier: '{2}').")]
public virtual void StealPartition(string partitionId,
string stolenFrom,
string identifier)
string identifier,
string eventHubName)
{
if (IsEnabled())
{
WriteEvent(7, partitionId ?? string.Empty, stolenFrom ?? string.Empty, identifier ?? string.Empty);
WriteEvent(7, partitionId ?? string.Empty, stolenFrom ?? string.Empty, identifier ?? string.Empty, eventHubName);
}
}

Expand All @@ -156,13 +166,14 @@ public virtual void StealPartition(string partitionId,
/// </summary>
///
/// <param name="identifier">A unique name used to identify the associated event processor.</param>
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
///
[Event(8, Level = EventLevel.Verbose, Message = "Attempting to renew ownership. (Identifier: '{0}')")]
public virtual void RenewOwnershipStart(string identifier)
[Event(8, Level = EventLevel.Verbose, Message = "Attempting to renew ownership for Event Hub '{1}'. (Identifier: '{0}')")]
public virtual void RenewOwnershipStart(string identifier, string eventHubName)
{
if (IsEnabled())
{
WriteEvent(8, identifier ?? string.Empty);
WriteEvent(8, identifier ?? string.Empty, eventHubName);
}
}

Expand All @@ -172,14 +183,16 @@ public virtual void RenewOwnershipStart(string identifier)
///
/// <param name="identifier">A unique name used to identify the associated event processor.</param>
/// <param name="errorMessage">The message for the exception that occurred.</param>
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
///
[Event(9, Level = EventLevel.Error, Message = "Failed to renew ownership. (Identifier: '{0}'; ErrorMessage: '{0}')")]
[Event(9, Level = EventLevel.Error, Message = "Failed to renew ownership for Event Hub '{2}'. (Identifier: '{0}'; ErrorMessage: '{1}')")]
public virtual void RenewOwnershipError(string identifier,
string errorMessage)
string errorMessage,
string eventHubName)
{
if (IsEnabled())
{
WriteEvent(9, identifier ?? string.Empty, errorMessage ?? string.Empty);
WriteEvent(9, identifier ?? string.Empty, errorMessage ?? string.Empty, eventHubName);
}
}

Expand All @@ -188,13 +201,14 @@ public virtual void RenewOwnershipError(string identifier,
/// </summary>
///
/// <param name="identifier">A unique name used to identify the associated event processor.</param>
/// <param name="eventHubName">The name of the Event Hub that the load balancer is associated with.</param>
///
[Event(10, Level = EventLevel.Verbose, Message = "Attempt to renew ownership has completed. (Identifier: '{0}')")]
public virtual void RenewOwnershipComplete(string identifier)
[Event(10, Level = EventLevel.Verbose, Message = "Attempt to renew ownership has completed for Event Hub '{1}'. (Identifier: '{0}')")]
public virtual void RenewOwnershipComplete(string identifier, string eventHubName)
{
if (IsEnabled())
{
WriteEvent(10, identifier ?? string.Empty);
WriteEvent(10, identifier ?? string.Empty, eventHubName);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,10 +350,10 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio

var unevenPartitionDistribution = (partitionCount % ActiveOwnershipWithDistribution.Keys.Count) > 0;
var minimumOwnedPartitionsCount = partitionCount / ActiveOwnershipWithDistribution.Keys.Count;
Logger.MinimumPartitionsPerEventProcessor(minimumOwnedPartitionsCount);
Logger.MinimumPartitionsPerEventProcessor(minimumOwnedPartitionsCount, EventHubName);

var ownedPartitionsCount = ActiveOwnershipWithDistribution[OwnerIdentifier].Count;
Logger.CurrentOwnershipCount(ownedPartitionsCount, OwnerIdentifier);
Logger.CurrentOwnershipCount(ownedPartitionsCount, OwnerIdentifier, EventHubName);

// There are two possible situations in which we may need to claim a partition ownership:
//
Expand All @@ -374,7 +374,7 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio
{
// Look for unclaimed partitions. If any, randomly pick one of them to claim.

Logger.UnclaimedPartitions(unclaimedPartitions);
Logger.UnclaimedPartitions(unclaimedPartitions, EventHubName);

if (unclaimedPartitions.Count > 0)
{
Expand Down Expand Up @@ -423,7 +423,7 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio
if ((ownedPartitionsCount < minimumOwnedPartitionsCount)
|| (ownedPartitionsCount < maximumOwnedPartitionsCount && partitionsOwnedByProcessorWithGreaterThanMaximumOwnedPartitionsCount.Count > 0))
{
Logger.ShouldStealPartition(OwnerIdentifier);
Logger.ShouldStealPartition(OwnerIdentifier, EventHubName);

// Prefer stealing from a processor that owns more than the maximum number of partitions.

Expand All @@ -444,7 +444,7 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio
}
}

Logger.StealPartition(partitionToSteal, stealingFrom, OwnerIdentifier);
Logger.StealPartition(partitionToSteal, stealingFrom, OwnerIdentifier, EventHubName);

var returnTask = ClaimOwnershipAsync(
partitionToSteal,
Expand Down Expand Up @@ -474,7 +474,7 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio
}
}

Logger.StealPartition(partitionToSteal, stealingFrom, OwnerIdentifier);
Logger.StealPartition(partitionToSteal, stealingFrom, OwnerIdentifier, EventHubName);

var returnTask = ClaimOwnershipAsync(
partitionToSteal,
Expand All @@ -501,7 +501,7 @@ private async Task RenewOwnershipAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

Logger.RenewOwnershipStart(OwnerIdentifier);
Logger.RenewOwnershipStart(OwnerIdentifier, EventHubName);

var utcNow = GetDateTimeOffsetNow();

Expand Down Expand Up @@ -543,7 +543,7 @@ private async Task RenewOwnershipAsync(CancellationToken cancellationToken)
// If ownership renewal fails just give up and try again in the next cycle. The processor may
// end up losing some of its ownership.

Logger.RenewOwnershipError(OwnerIdentifier, ex.Message);
Logger.RenewOwnershipError(OwnerIdentifier, ex.Message, EventHubName);

// Set the EventHubName to null so it doesn't modify the exception message. This exception message is
// used so the processor can retrieve the raw Operation string, and adding the EventHubName would append
Expand All @@ -553,7 +553,7 @@ private async Task RenewOwnershipAsync(CancellationToken cancellationToken)
}
finally
{
Logger.RenewOwnershipComplete(OwnerIdentifier);
Logger.RenewOwnershipComplete(OwnerIdentifier, EventHubName);
}
}

Expand All @@ -572,7 +572,7 @@ private async Task RenewOwnershipAsync(CancellationToken cancellationToken)
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
Logger.ClaimOwnershipStart(partitionId);
Logger.ClaimOwnershipStart(partitionId, EventHubName);

// We need the eTag from the most recent ownership of this partition, even if it's expired. We want to keep the offset and
// the sequence number as well.
Expand Down Expand Up @@ -602,7 +602,7 @@ private async Task RenewOwnershipAsync(CancellationToken cancellationToken)

// If ownership claim fails, just treat it as a usual ownership claim failure.

Logger.ClaimOwnershipError(partitionId, ex.Message);
Logger.ClaimOwnershipError(partitionId, ex.Message, EventHubName);

// Set the EventHubName to null so it doesn't modify the exception message. This exception message is
// used so the processor can retrieve the raw Operation string, and adding the EventHubName would append
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ public async Task RunLoadBalancingAsyncDoesNotStealFromItself()

// Verify that no attempts to steal were logged.

mockLog.Verify(log => log.ShouldStealPartition(It.IsAny<string>()), Times.Never);
mockLog.Verify(log => log.ShouldStealPartition(It.IsAny<string>(), It.IsAny<string>()), Times.Never);
}

/// <summary>
Expand Down Expand Up @@ -597,7 +597,7 @@ public async Task RunLoadBalancingAsyncDoesNotStealWhenLessPartitionsThanProcess

// Verify that no attempts to steal were logged.

mockLog.Verify(log => log.ShouldStealPartition(It.IsAny<string>()), Times.Never);
mockLog.Verify(log => log.ShouldStealPartition(It.IsAny<string>(), It.IsAny<string>()), Times.Never);
}

/// <summary>
Expand Down Expand Up @@ -678,7 +678,7 @@ public async Task RunLoadBalancingAsyncDoesNotStealWhenTheLoadIsBalanced(int[] a

// Verify that no attempts to steal were logged.

mockLog.Verify(log => log.ShouldStealPartition(It.IsAny<string>()), Times.Never);
mockLog.Verify(log => log.ShouldStealPartition(It.IsAny<string>(), It.IsAny<string>()), Times.Never);
}

/// <summary>
Expand Down Expand Up @@ -902,14 +902,14 @@ public async Task VerifiesEventProcessorLogs()

await loadbalancer.RelinquishOwnershipAsync(CancellationToken.None);

mockLog.Verify(m => m.RenewOwnershipStart(loadbalancer.OwnerIdentifier));
mockLog.Verify(m => m.RenewOwnershipComplete(loadbalancer.OwnerIdentifier));
mockLog.Verify(m => m.ClaimOwnershipStart(It.Is<string>(p => partitionIds.Contains(p))));
mockLog.Verify(m => m.MinimumPartitionsPerEventProcessor(MinimumpartitionCount));
mockLog.Verify(m => m.CurrentOwnershipCount(MinimumpartitionCount, loadbalancer.OwnerIdentifier));
mockLog.Verify(m => m.StealPartition(It.IsAny<string>(), It.IsAny<string>(), loadbalancer.OwnerIdentifier));
mockLog.Verify(m => m.ShouldStealPartition(loadbalancer.OwnerIdentifier));
mockLog.Verify(m => m.UnclaimedPartitions(It.Is<HashSet<string>>(set => set.Count == 0 || set.All(item => partitionIds.Contains(item)))));
mockLog.Verify(m => m.RenewOwnershipStart(loadbalancer.OwnerIdentifier, loadbalancer.EventHubName));
mockLog.Verify(m => m.RenewOwnershipComplete(loadbalancer.OwnerIdentifier, loadbalancer.EventHubName));
mockLog.Verify(m => m.ClaimOwnershipStart(It.Is<string>(p => partitionIds.Contains(p)), loadbalancer.EventHubName));
mockLog.Verify(m => m.MinimumPartitionsPerEventProcessor(MinimumpartitionCount, loadbalancer.EventHubName));
mockLog.Verify(m => m.CurrentOwnershipCount(MinimumpartitionCount, loadbalancer.OwnerIdentifier, loadbalancer.EventHubName));
mockLog.Verify(m => m.StealPartition(It.IsAny<string>(), It.IsAny<string>(), loadbalancer.OwnerIdentifier, loadbalancer.EventHubName));
mockLog.Verify(m => m.ShouldStealPartition(loadbalancer.OwnerIdentifier, loadbalancer.EventHubName));
mockLog.Verify(m => m.UnclaimedPartitions(It.Is<HashSet<string>>(set => set.Count == 0 || set.All(item => partitionIds.Contains(item))), loadbalancer.EventHubName));
}

/// <summary>
Expand Down
Loading