Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDK] Circular buffer tweaks + cpu pressure test #3349

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 27 additions & 23 deletions src/OpenTelemetry/Internal/CircularBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
// limitations under the License.
// </copyright>

#nullable enable

using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;

Expand All @@ -26,7 +29,7 @@ namespace OpenTelemetry.Internal
internal class CircularBuffer<T>
where T : class
{
private readonly T[] trait;
private readonly T?[] trait;
private long head;
private long tail;

Expand Down Expand Up @@ -54,20 +57,20 @@ public int Count
{
get
{
var tailSnapshot = this.tail;
return (int)(this.head - tailSnapshot);
var tailSnapshot = Volatile.Read(ref this.tail);
return (int)(Volatile.Read(ref this.head) - tailSnapshot);
}
}

/// <summary>
/// Gets the number of items added to the <see cref="CircularBuffer{T}"/>.
/// </summary>
public long AddedCount => this.head;
public long AddedCount => Volatile.Read(ref this.head);

/// <summary>
/// Gets the number of items removed from the <see cref="CircularBuffer{T}"/>.
/// </summary>
public long RemovedCount => this.tail;
public long RemovedCount => Volatile.Read(ref this.tail);

/// <summary>
/// Adds the specified item to the buffer.
Expand All @@ -83,22 +86,23 @@ public bool Add(T value)

while (true)
{
var tailSnapshot = this.tail;
var headSnapshot = this.head;
var tailSnapshot = Volatile.Read(ref this.tail);
var headSnapshot = Volatile.Read(ref this.head);
CodeBlanch marked this conversation as resolved.
Show resolved Hide resolved

if (headSnapshot - tailSnapshot >= this.Capacity)
{
return false; // buffer is full
}

var head = Interlocked.CompareExchange(ref this.head, headSnapshot + 1, headSnapshot);
if (head != headSnapshot)
if (Interlocked.CompareExchange(ref this.head, headSnapshot + 1, headSnapshot) != headSnapshot)
{
continue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be useful to add Spinwait.SpinOnce() here before continue?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defer to @reyang on this one, but I don't think so. This thread isn't so much waiting on another thread to finish as it is learning that some other thread took the head. It should retry immediately and just take the next head/index available.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SpinOnce might be more smart yielding if singlecore etc?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a yield here would be even worse than a spin 😄 Because it doesn't need to wait on anything. If there is only 1 core this thread should just loop around, take the next index, and continue on doing its thing. Same as if there were many cores, really.

There should probably be a SpinOnce here though:

Because that logic is actually waiting on the writer thread to finish. On single core, it should yield immediately because a spin won't accomplish anything other than delay letting the writer get the CPU back to finish its job 🤣

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a yield here would be even worse than a spin

+1

}

var index = (int)(head % this.Capacity);
this.trait[index] = value;
var previous = Interlocked.Exchange(ref this.trait[headSnapshot % this.Capacity], value);

Debug.Assert(previous == null, "Race: Another thread wrote to index.");

return true;
}
}
Expand All @@ -125,16 +129,15 @@ public bool TryAdd(T value, int maxSpinCount)

while (true)
{
var tailSnapshot = this.tail;
var headSnapshot = this.head;
var tailSnapshot = Volatile.Read(ref this.tail);
var headSnapshot = Volatile.Read(ref this.head);

if (headSnapshot - tailSnapshot >= this.Capacity)
{
return false; // buffer is full
}

var head = Interlocked.CompareExchange(ref this.head, headSnapshot + 1, headSnapshot);
if (head != headSnapshot)
if (Interlocked.CompareExchange(ref this.head, headSnapshot + 1, headSnapshot) != headSnapshot)
{
if (spinCountDown-- == 0)
{
Expand All @@ -144,8 +147,10 @@ public bool TryAdd(T value, int maxSpinCount)
continue;
}

var index = (int)(head % this.Capacity);
this.trait[index] = value;
var previous = Interlocked.Exchange(ref this.trait[headSnapshot % this.Capacity], value);
CodeBlanch marked this conversation as resolved.
Show resolved Hide resolved

Debug.Assert(previous == null, "Race: Another thread wrote to index.");

return true;
}
}
Expand All @@ -161,19 +166,18 @@ public bool TryAdd(T value, int maxSpinCount)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public T Read()
{
var index = (int)(this.tail % this.Capacity);
var index = (int)(Volatile.Read(ref this.tail) % this.Capacity);
while (true)
{
var value = this.trait[index];
if (value == null)
var previous = Interlocked.Exchange(ref this.trait[index], null);
if (previous == null)
{
// If we got here it means a writer isn't done.
continue;
}

this.trait[index] = null;
this.tail++;
return value;
Interlocked.Increment(ref this.tail);
return previous;
}
}
}
Expand Down
64 changes: 64 additions & 0 deletions test/OpenTelemetry.Tests/Internal/CircularBufferTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
// </copyright>

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

namespace OpenTelemetry.Internal.Tests
Expand Down Expand Up @@ -109,5 +112,66 @@ public void CheckAddedCountAndCount()
Assert.Equal(1, circularBuffer.RemovedCount);
Assert.Equal(1, circularBuffer.Count);
}

[Fact]
public async Task CpuPressureTest()
reyang marked this conversation as resolved.
Show resolved Hide resolved
{
if (Environment.ProcessorCount < 2)
{
return;
}

var circularBuffer = new CircularBuffer<string>(2048);

List<Task> tasks = new();

int numberOfItemsPerWorker = 100_000;

for (int i = 0; i < Environment.ProcessorCount; i++)
{
int tid = i;

tasks.Add(Task.Run(async () =>
{
await Task.Delay(2000).ConfigureAwait(false);

if (tid == 0)
{
for (int i = 0; i < numberOfItemsPerWorker * (Environment.ProcessorCount - 1); i++)
{
SpinWait wait = default;
while (true)
{
if (circularBuffer.Count > 0)
{
circularBuffer.Read();
break;
}

wait.SpinOnce();
}
}
}
else
{
for (int i = 0; i < numberOfItemsPerWorker; i++)
{
SpinWait wait = default;
while (true)
{
if (circularBuffer.Add("item"))
{
break;
}

wait.SpinOnce();
}
}
}
}));
}

await Task.WhenAll(tasks).ConfigureAwait(false);
}
}
}