Skip to content

Commit

Permalink
Realtime: complete port of fixes and tests for sendinputaudio behavior (
Browse files Browse the repository at this point in the history
#333)

* realtime: complete port of fixes and tests for sendinputaudio behavior

* run-checks.ps1
  • Loading branch information
trrwilson authored Dec 3, 2024
1 parent 597d56f commit 1cf176b
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ public partial class RealtimeConversationSession : IDisposable
private readonly RealtimeConversationClient _parentClient;
private readonly Uri _endpoint;
private readonly ApiKeyCredential _credential;
private readonly object _sendingAudioLock = new();
private bool _isSendingAudio = false;
private readonly SemaphoreSlim _audioSendSemaphore = new(1, 1);
private bool _isSendingAudioStream = false;

internal bool ShouldBufferTurnResponseData { get; set; }

Expand All @@ -47,13 +47,13 @@ protected internal RealtimeConversationSession(
public virtual async Task SendInputAudioAsync(Stream audio, CancellationToken cancellationToken = default)
{
Argument.AssertNotNull(audio, nameof(audio));
lock (_sendingAudioLock)
using (await _audioSendSemaphore.AutoReleaseWaitAsync(cancellationToken).ConfigureAwait(false))
{
if (_isSendingAudio)
if (_isSendingAudioStream)
{
throw new InvalidOperationException($"Only one stream of audio may be sent at once.");
}
_isSendingAudio = true;
_isSendingAudioStream = true;
}
try
{
Expand All @@ -75,23 +75,23 @@ public virtual async Task SendInputAudioAsync(Stream audio, CancellationToken ca
}
finally
{
lock (_sendingAudioLock)
using (await _audioSendSemaphore.AutoReleaseWaitAsync(cancellationToken).ConfigureAwait(false))
{
_isSendingAudio = false;
_isSendingAudioStream = false;
}
}
}

public virtual void SendInputAudio(Stream audio, CancellationToken cancellationToken = default)
{
Argument.AssertNotNull(audio, nameof(audio));
lock (_sendingAudioLock)
using (_audioSendSemaphore.AutoReleaseWait(cancellationToken))
{
if (_isSendingAudio)
if (_isSendingAudioStream)
{
throw new InvalidOperationException($"Only one stream of audio may be sent at once.");
}
_isSendingAudio = true;
_isSendingAudioStream = true;
}
try
{
Expand All @@ -113,9 +113,9 @@ public virtual void SendInputAudio(Stream audio, CancellationToken cancellationT
}
finally
{
lock (_sendingAudioLock)
using (_audioSendSemaphore.AutoReleaseWait(cancellationToken))
{
_isSendingAudio = false;
_isSendingAudioStream = false;
}
}
}
Expand All @@ -130,18 +130,17 @@ public virtual void SendInputAudio(Stream audio, CancellationToken cancellationT
public virtual async Task SendInputAudioAsync(BinaryData audio, CancellationToken cancellationToken = default)
{
Argument.AssertNotNull(audio, nameof(audio));
lock (_sendingAudioLock)
using (await _audioSendSemaphore.AutoReleaseWaitAsync(cancellationToken).ConfigureAwait(false))
{
if (_isSendingAudio)
if (_isSendingAudioStream)
{
throw new InvalidOperationException($"Cannot send a standalone audio chunk while a stream is already in progress.");
}
_isSendingAudio = true;
// TODO: consider automatically limiting/breaking size of chunk (as with streaming)
InternalRealtimeClientEventInputAudioBufferAppend internalCommand = new(audio);
BinaryData requestData = ModelReaderWriter.Write(internalCommand);
await SendCommandAsync(requestData, cancellationToken.ToRequestOptions()).ConfigureAwait(false);
}
// TODO: consider automatically limiting/breaking size of chunk (as with streaming)
InternalRealtimeClientEventInputAudioBufferAppend internalCommand = new(audio);
BinaryData requestData = ModelReaderWriter.Write(internalCommand);
await SendCommandAsync(requestData, cancellationToken.ToRequestOptions()).ConfigureAwait(false);
}

/// <summary>
Expand All @@ -154,18 +153,17 @@ public virtual async Task SendInputAudioAsync(BinaryData audio, CancellationToke
public virtual void SendInputAudio(BinaryData audio, CancellationToken cancellationToken = default)
{
Argument.AssertNotNull(audio, nameof(audio));
lock (_sendingAudioLock)
using (_audioSendSemaphore.AutoReleaseWait(cancellationToken))
{
if (_isSendingAudio)
if (_isSendingAudioStream)
{
throw new InvalidOperationException($"Cannot send a standalone audio chunk while a stream is already in progress.");
}
_isSendingAudio = true;
// TODO: consider automatically limiting/breaking size of chunk (as with streaming)
InternalRealtimeClientEventInputAudioBufferAppend internalCommand = new(audio);
BinaryData requestData = ModelReaderWriter.Write(internalCommand);
SendCommand(requestData, cancellationToken.ToRequestOptions());
}
// TODO: consider automatically limiting/breaking size of chunk (as with streaming)
InternalRealtimeClientEventInputAudioBufferAppend internalCommand = new(audio);
BinaryData requestData = ModelReaderWriter.Write(internalCommand);
SendCommand(requestData, cancellationToken.ToRequestOptions());
}

public virtual async Task ClearInputAudioAsync(CancellationToken cancellationToken = default)
Expand Down
58 changes: 58 additions & 0 deletions .dotnet/src/Utility/SemaphoreSlimExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
using System;
using System.Diagnostics.Contracts;
using System.Threading;
using System.Threading.Tasks;

namespace OpenAI;

internal static class SemaphoreSlimExtensions
{
public static async Task<IDisposable> AutoReleaseWaitAsync(
this SemaphoreSlim semaphore,
CancellationToken cancellationToken = default)
{
Contract.Requires(semaphore != null);
var wrapper = new ReleaseableSemaphoreSlimWrapper(semaphore);
await semaphore.WaitAsync(cancellationToken);
return wrapper;
}

public static IDisposable AutoReleaseWait(
this SemaphoreSlim semaphore,
CancellationToken cancellationToken = default)
{
Contract.Requires(semaphore != null);
var wrapper = new ReleaseableSemaphoreSlimWrapper(semaphore);
semaphore.Wait(cancellationToken);
return wrapper;
}

private class ReleaseableSemaphoreSlimWrapper
: IDisposable
{
private readonly SemaphoreSlim semaphore;
private bool alreadyDisposed = false;

public ReleaseableSemaphoreSlimWrapper(SemaphoreSlim semaphore)
=> this.semaphore = semaphore;

public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}

protected void Dispose(bool disposeActuallyCalled)
{
if (!this.alreadyDisposed)
{
if (disposeActuallyCalled)
{
this.semaphore?.Release();
}

this.alreadyDisposed = true;
}
}
}
}
109 changes: 106 additions & 3 deletions .dotnet/tests/RealtimeConversation/ConversationTests.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
using NUnit.Framework;
using OpenAI.RealtimeConversation;
using System;
using System.Buffers;
using System.ClientModel;
using System.ClientModel.Primitives;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Numerics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace OpenAI.Tests.Conversation;
Expand Down Expand Up @@ -239,7 +241,47 @@ await session.AddItemAsync(
}

[Test]
public async Task AudioWithToolsWorks()
public async Task AudioStreamConvenienceBlocksCorrectly()
{
RealtimeConversationClient client = GetTestClient();
using RealtimeConversationSession session = await client.StartConversationSessionAsync(CancellationToken);

string inputAudioFilePath = Path.Join("Assets", "realtime_whats_the_weather_pcm16_24khz_mono.wav");
using TestDelayedFileReadStream delayedStream = new(inputAudioFilePath, TimeSpan.FromMilliseconds(200), readsBeforeDelay: 2);
_ = session.SendInputAudioAsync(delayedStream, CancellationToken);

bool gotSpeechStarted = false;

await foreach (ConversationUpdate update in session.ReceiveUpdatesAsync(CancellationToken))
{
if (update is ConversationInputSpeechStartedUpdate)
{
gotSpeechStarted = true;
Assert.ThrowsAsync<InvalidOperationException>(
async () =>
{
using MemoryStream dummyStream = new();
await session.SendInputAudioAsync(dummyStream, CancellationToken);
},
"Sending a Stream while another Stream is being sent should throw!");
Assert.ThrowsAsync<InvalidOperationException>(
async () =>
{
BinaryData dummyData = BinaryData.FromString("hello, world! this isn't audio.");
await session.SendInputAudioAsync(dummyData, CancellationToken);
},
"Sending BinaryData while a Stream is being sent should throw!");
break;
}
}

Assert.That(gotSpeechStarted, Is.True);
}

[Test]
[TestCase(TestAudioSendType.WithAudioStreamHelper)]
[TestCase(TestAudioSendType.WithManualAudioChunks)]
public async Task AudioWithToolsWorks(TestAudioSendType audioSendType)
{
RealtimeConversationClient client = GetTestClient();
using RealtimeConversationSession session = await client.StartConversationSessionAsync(CancellationToken);
Expand Down Expand Up @@ -285,8 +327,27 @@ public async Task AudioWithToolsWorks()

await session.ConfigureSessionAsync(options, CancellationToken);

using Stream audioStream = File.OpenRead(Path.Join("Assets", "realtime_whats_the_weather_pcm16_24khz_mono.wav"));
_ = session.SendInputAudioAsync(audioStream, CancellationToken);
_ = Task.Run(async () =>
{
string inputAudioFilePath = Path.Join("Assets", "realtime_whats_the_weather_pcm16_24khz_mono.wav");
if (audioSendType == TestAudioSendType.WithAudioStreamHelper)
{
using Stream audioStream = File.OpenRead(inputAudioFilePath);
await session.SendInputAudioAsync(audioStream, CancellationToken);
}
else if (audioSendType == TestAudioSendType.WithManualAudioChunks)
{
byte[] allAudioBytes = await File.ReadAllBytesAsync(inputAudioFilePath, CancellationToken);
const int audioSendBufferLength = 8 * 1024;
byte[] audioSendBuffer = ArrayPool<byte>.Shared.Rent(audioSendBufferLength);
for (int readPos = 0; readPos < allAudioBytes.Length; readPos += audioSendBufferLength)
{
int nextSegmentLength = Math.Min(audioSendBufferLength, allAudioBytes.Length - readPos);
ArraySegment<byte> nextSegment = new(allAudioBytes, readPos, nextSegmentLength);
await session.SendInputAudioAsync(BinaryData.FromBytes(nextSegment), CancellationToken);
}
}
});

string userTranscript = null;

Expand Down Expand Up @@ -465,4 +526,46 @@ public async Task CanAddItems()

Assert.That(itemCreatedCount, Is.EqualTo(items.Count + 1));
}

public enum TestAudioSendType
{
WithAudioStreamHelper,
WithManualAudioChunks
}

private class TestDelayedFileReadStream : FileStream
{
private readonly TimeSpan _delayBetweenReads;
private readonly int _readsBeforeDelay;
private int _readsPerformed;

public TestDelayedFileReadStream(
string path,
TimeSpan delayBetweenReads,
int readsBeforeDelay = 0)
: base(path, FileMode.Open, FileAccess.Read)
{
_delayBetweenReads = delayBetweenReads;
_readsBeforeDelay = readsBeforeDelay;
_readsPerformed = 0;
}

public override int Read(byte[] buffer, int offset, int count)
{
if (++_readsPerformed > _readsBeforeDelay)
{
System.Threading.Thread.Sleep((int)_delayBetweenReads.TotalMilliseconds);
}
return base.Read(buffer, offset, count);
}

public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
if (++_readsPerformed > _readsBeforeDelay)
{
await Task.Delay(_delayBetweenReads);
}
return await base.ReadAsync(buffer, offset, count, cancellationToken);
}
}
}
3 changes: 2 additions & 1 deletion .scripts/Run-Checks.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ function Run-TopLevelNamespaceCheck {
"MultipartFormDataBinaryContent.cs",
"PageCollectionHelpers.cs",
"PageEnumerator.cs",
"PageResultEnumerator.cs"
"PageResultEnumerator.cs",
"SemaphoreSlimExtensions.cs"
)

$failures = @()
Expand Down

0 comments on commit 1cf176b

Please sign in to comment.