Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure Sync stream serialization is handling IAsyncEnumerable correctly #67035

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ protected override void CreateCollection(ref Utf8JsonReader reader, ref ReadStac

internal override bool OnTryWrite(Utf8JsonWriter writer, TAsyncEnumerable value, JsonSerializerOptions options, ref WriteStack state)
{
if (!state.SupportContinuation)
if (!state.SupportAsync)
{
ThrowHelper.ThrowNotSupportedException_TypeRequiresAsyncSerialization(TypeToConvert);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public sealed override void Write(Utf8JsonWriter writer, T value, JsonSerializer
// Bridge from resumable to value converters.

WriteStack state = default;
state.Initialize(typeof(T), options, supportContinuation: false);
state.Initialize(typeof(T), options, supportContinuation: false, supportAsync: false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: supportsAsync

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, but in this case it just mimics convention for the existing supportContinuation parameter. We can rename evertyhing at a future PR.

try
{
TryWrite(writer, value, options, ref state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jsonTypeInfo is not JsonTypeInfo<TValue> ||
"Incorrect method called. WriteUsingGeneratedSerializer() should have been called instead.");

WriteStack state = default;
state.Initialize(jsonTypeInfo, supportContinuation: false);
state.Initialize(jsonTypeInfo, supportContinuation: false, supportAsync: false);

JsonConverter converter = jsonTypeInfo.PropertyInfoForTypeInfo.ConverterBase;
Debug.Assert(converter != null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ private static async Task WriteStreamAsync<TValue>(
using (var writer = new Utf8JsonWriter(bufferWriter, writerOptions))
{
WriteStack state = new WriteStack { CancellationToken = cancellationToken };
JsonConverter converter = state.Initialize(jsonTypeInfo, supportContinuation: true);
JsonConverter converter = state.Initialize(jsonTypeInfo, supportContinuation: true, supportAsync: true);

bool isFinalBlock;

Expand Down Expand Up @@ -329,7 +329,7 @@ private static void WriteStream<TValue>(
using (var writer = new Utf8JsonWriter(bufferWriter, writerOptions))
{
WriteStack state = default;
JsonConverter converter = state.Initialize(jsonTypeInfo, supportContinuation: true);
JsonConverter converter = state.Initialize(jsonTypeInfo, supportContinuation: true, supportAsync: false);

bool isFinalBlock;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ internal struct WriteStack
/// </summary>
public bool SupportContinuation;

/// <summary>
/// Internal flag indicating that async serialization is supported. Implies `SupportContinuation`.
/// </summary>
public bool SupportAsync;

/// <summary>
/// Stores a reference id that has been calculated for a newly serialized object.
/// </summary>
Expand All @@ -98,14 +103,16 @@ private void EnsurePushCapacity()
/// <summary>
/// Initialize the state without delayed initialization of the JsonTypeInfo.
/// </summary>
public JsonConverter Initialize(Type type, JsonSerializerOptions options, bool supportContinuation)
public JsonConverter Initialize(Type type, JsonSerializerOptions options, bool supportContinuation, bool supportAsync)
{
JsonTypeInfo jsonTypeInfo = options.GetOrAddJsonTypeInfoForRootType(type);
return Initialize(jsonTypeInfo, supportContinuation);
return Initialize(jsonTypeInfo, supportContinuation, supportAsync);
}

internal JsonConverter Initialize(JsonTypeInfo jsonTypeInfo, bool supportContinuation)
internal JsonConverter Initialize(JsonTypeInfo jsonTypeInfo, bool supportContinuation, bool supportAsync)
{
Debug.Assert(!supportAsync || supportContinuation, "supportAsync implies supportContinuation.");

Current.JsonTypeInfo = jsonTypeInfo;
Current.JsonPropertyInfo = jsonTypeInfo.PropertyInfoForTypeInfo;
Current.NumberHandling = Current.JsonPropertyInfo.NumberHandling;
Expand All @@ -118,6 +125,7 @@ internal JsonConverter Initialize(JsonTypeInfo jsonTypeInfo, bool supportContinu
}

SupportContinuation = supportContinuation;
SupportAsync = supportAsync;

return jsonTypeInfo.PropertyInfoForTypeInfo.ConverterBase;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public abstract partial class CollectionTests
[MemberData(nameof(GetAsyncEnumerableSources))]
public async Task WriteRootLevelAsyncEnumerable<TElement>(IEnumerable<TElement> source, int delayInterval, int bufferSize)
{
if (StreamingSerializer is null)
if (StreamingSerializer?.IsAsyncSerializer != true)
{
return;
}
Expand All @@ -43,7 +43,7 @@ public async Task WriteRootLevelAsyncEnumerable<TElement>(IEnumerable<TElement>
[MemberData(nameof(GetAsyncEnumerableSources))]
public async Task WriteNestedAsyncEnumerable<TElement>(IEnumerable<TElement> source, int delayInterval, int bufferSize)
{
if (StreamingSerializer is null)
if (StreamingSerializer?.IsAsyncSerializer != true)
{
return;
}
Expand All @@ -68,7 +68,7 @@ public async Task WriteNestedAsyncEnumerable<TElement>(IEnumerable<TElement> sou
[MemberData(nameof(GetAsyncEnumerableSources))]
public async Task WriteNestedAsyncEnumerable_DTO<TElement>(IEnumerable<TElement> source, int delayInterval, int bufferSize)
{
if (StreamingSerializer is null)
if (StreamingSerializer?.IsAsyncSerializer != true)
{
return;
}
Expand All @@ -93,7 +93,7 @@ public async Task WriteNestedAsyncEnumerable_DTO<TElement>(IEnumerable<TElement>
[MemberData(nameof(GetAsyncEnumerableSources))]
public async Task WriteNestedAsyncEnumerable_Nullable<TElement>(IEnumerable<TElement> source, int delayInterval, int bufferSize)
{
if (StreamingSerializer is null)
if (StreamingSerializer?.IsAsyncSerializer != true)
{
return;
}
Expand Down Expand Up @@ -151,7 +151,7 @@ public class AsyncEnumerableDto<TElement>
[MemberData(nameof(GetAsyncEnumerableSources))]
public async Task WriteSequentialNestedAsyncEnumerables<TElement>(IEnumerable<TElement> source, int delayInterval, int bufferSize)
{
if (StreamingSerializer is null)
if (StreamingSerializer?.IsAsyncSerializer != true)
{
return;
}
Expand All @@ -176,7 +176,7 @@ public async Task WriteSequentialNestedAsyncEnumerables<TElement>(IEnumerable<TE
[MemberData(nameof(GetAsyncEnumerableSources))]
public async Task WriteAsyncEnumerableOfAsyncEnumerables<TElement>(IEnumerable<TElement> source, int delayInterval, int bufferSize)
{
if (StreamingSerializer is null)
if (StreamingSerializer?.IsAsyncSerializer != true)
{
return;
}
Expand Down Expand Up @@ -209,19 +209,21 @@ public void WriteRootLevelAsyncEnumerableSync_ThrowsNotSupportedException()
{
IAsyncEnumerable<int> asyncEnumerable = new MockedAsyncEnumerable<int>(Enumerable.Range(1, 10));
Assert.Throws<NotSupportedException>(() => JsonSerializer.Serialize(asyncEnumerable));
Assert.Throws<NotSupportedException>(() => JsonSerializer.Serialize(new MemoryStream(), asyncEnumerable));
}

[Fact]
public void WriteNestedAsyncEnumerableSync_ThrowsNotSupportedException()
{
IAsyncEnumerable<int> asyncEnumerable = new MockedAsyncEnumerable<int>(Enumerable.Range(1, 10));
Assert.Throws<NotSupportedException>(() => JsonSerializer.Serialize(new { Data = asyncEnumerable }));
Assert.Throws<NotSupportedException>(() => JsonSerializer.Serialize(new MemoryStream(), new { Data = asyncEnumerable }));
}

[Fact]
public async Task WriteAsyncEnumerable_ElementSerializationThrows_ShouldDisposeEnumerator()
{
if (StreamingSerializer is null)
if (StreamingSerializer?.IsAsyncSerializer != true)
{
return;
}
Expand All @@ -243,7 +245,7 @@ static IEnumerable<int> ThrowingEnumerable()
[Fact]
public async Task ReadRootLevelAsyncEnumerable()
{
if (StreamingSerializer is null)
if (StreamingSerializer?.IsAsyncSerializer != true)
{
return;
}
Expand All @@ -257,7 +259,7 @@ public async Task ReadRootLevelAsyncEnumerable()
[Fact]
public async Task ReadNestedAsyncEnumerable()
{
if (StreamingSerializer is null)
if (StreamingSerializer?.IsAsyncSerializer != true)
{
return;
}
Expand All @@ -271,7 +273,7 @@ public async Task ReadNestedAsyncEnumerable()
[Fact]
public async Task ReadAsyncEnumerableOfAsyncEnumerables()
{
if (StreamingSerializer is null)
if (StreamingSerializer?.IsAsyncSerializer != true)
{
return;
}
Expand All @@ -289,7 +291,7 @@ public async Task ReadAsyncEnumerableOfAsyncEnumerables()
[Fact]
public async Task ReadRootLevelAsyncEnumerableDerivative_ThrowsNotSupportedException()
{
if (StreamingSerializer is null)
if (StreamingSerializer?.IsAsyncSerializer != true)
{
return;
}
Expand All @@ -314,7 +316,7 @@ public static IEnumerable<object[]> GetAsyncEnumerableSources()
[Fact]
public async Task RegressionTest_DisposingEnumeratorOnPendingMoveNextAsyncOperation()
{
if (StreamingSerializer is null)
if (StreamingSerializer?.IsAsyncSerializer != true)
{
return;
}
Expand All @@ -338,7 +340,7 @@ static async IAsyncEnumerable<int> GetNumbersAsync()
[Fact]
public async Task RegressionTest_ExceptionOnFirstMoveNextShouldNotFlushBuffer()
{
if (StreamingSerializer is null)
if (StreamingSerializer?.IsAsyncSerializer != true)
{
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public abstract partial class StreamingJsonSerializerWrapper : JsonSerializerWra
/// <summary>
/// True if the serializer is streaming data synchronously.
/// </summary>
public virtual bool IsBlockingSerializer => false;
public abstract bool IsAsyncSerializer { get; }

public abstract Task SerializeWrapper(Stream stream, object value, Type inputType, JsonSerializerOptions? options = null);
public abstract Task SerializeWrapper<T>(Stream stream, T value, JsonSerializerOptions? options = null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ internal sealed class AsyncStreamSerializerWrapper : StreamingJsonSerializerWrap
private readonly JsonSerializerContext _defaultContext;
private readonly Func<JsonSerializerOptions, JsonSerializerContext> _customContextCreator;

public override bool IsAsyncSerializer => true;

public AsyncStreamSerializerWrapper(JsonSerializerContext defaultContext!!, Func<JsonSerializerOptions, JsonSerializerContext> customContextCreator!!)
{
_defaultContext = defaultContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ public sealed partial class CollectionTestsDynamic_AsyncStream : CollectionTests
public CollectionTestsDynamic_AsyncStream() : base(JsonSerializerWrapper.AsyncStreamSerializer) { }
}

[ActiveIssue("https://github.com/dotnet/runtime/issues/66687")]
public sealed partial class CollectionTestsDynamic_AsyncStreamWithSmallBuffer : CollectionTests
{
public CollectionTestsDynamic_AsyncStreamWithSmallBuffer() : base(JsonSerializerWrapper.AsyncStreamSerializerWithSmallBuffer) { }
}

public sealed partial class CollectionTestsDynamic_SyncStream : CollectionTests
{
public CollectionTestsDynamic_SyncStream() : base(JsonSerializerWrapper.SyncStreamSerializer) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ private class AsyncStreamSerializerWrapper : StreamingJsonSerializerWrapper
{
private readonly bool _forceSmallBufferInOptions;

public override bool IsAsyncSerializer => true;

public AsyncStreamSerializerWrapper(bool forceSmallBufferInOptions = false)
{
_forceSmallBufferInOptions = forceSmallBufferInOptions;
Expand Down Expand Up @@ -183,7 +185,7 @@ public SyncStreamSerializerWrapper(bool forceSmallBufferInOptions = false)
private JsonSerializerOptions? ResolveOptionsInstance(JsonSerializerOptions? options)
=> _forceSmallBufferInOptions ? JsonSerializerOptionsSmallBufferMapper.ResolveOptionsInstanceWithSmallBuffer(options) : options;

public override bool IsBlockingSerializer => true;
public override bool IsAsyncSerializer => false;

public override Task SerializeWrapper<T>(Stream utf8Json, T value, JsonSerializerOptions options = null)
{
Expand Down