Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proof-of-concept serialization of advanced JSON types, records #1073

Merged
merged 5 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/Dapr.Actors/Client/ActorProxyFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace Dapr.Actors.Client
using System;
using System.Net.Http;
using Dapr.Actors.Builder;
using Dapr.Actors.Communication;
using Dapr.Actors.Communication.Client;

/// <summary>
Expand Down Expand Up @@ -79,7 +80,15 @@ public object CreateActorProxy(ActorId actorId, Type actorInterfaceType, string
options ??= this.DefaultOptions;

var daprInteractor = new DaprHttpInteractor(this.handler, options.HttpEndpoint, options.DaprApiToken, options.RequestTimeout);
var remotingClient = new ActorRemotingClient(daprInteractor);

// provide a serializer if 'useJsonSerialization' is true and no serialization provider is provided.
IActorMessageBodySerializationProvider serializationProvider = null;
if (options.UseJsonSerialization)
{
serializationProvider = new ActorMessageBodyJsonSerializationProvider(options.JsonSerializerOptions);
}

var remotingClient = new ActorRemotingClient(daprInteractor, serializationProvider);
var proxyGenerator = ActorCodeBuilder.GetOrCreateProxyGenerator(actorInterfaceType);
var actorProxy = proxyGenerator.CreateActorProxy();
actorProxy.Initialize(remotingClient, actorId, actorType, options);
Expand Down
5 changes: 5 additions & 0 deletions src/Dapr.Actors/Client/ActorProxyOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,10 @@ public JsonSerializerOptions JsonSerializerOptions
/// The timeout allowed for an actor request. Can be set to System.Threading.Timeout.InfiniteTimeSpan to disable any timeouts.
/// </summary>
public TimeSpan? RequestTimeout { get; set; } = null;

/// <summary>
/// Enable JSON serialization for actor proxy message serialization in both remoting and non-remoting invocations.
/// </summary>
public bool UseJsonSerialization { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace Dapr.Actors.Communication
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization;
using System.Threading.Tasks;
using System.Xml;

/// <summary>
Expand Down Expand Up @@ -185,21 +186,21 @@ byte[] IActorRequestMessageBodySerializer.Serialize(IActorRequestMessageBody act
return stream.ToArray();
}

IActorRequestMessageBody IActorRequestMessageBodySerializer.Deserialize(Stream stream)
ValueTask<IActorRequestMessageBody> IActorRequestMessageBodySerializer.DeserializeAsync(Stream stream)
{
if (stream == null)
{
return null;
return default;
}

if (stream.Length == 0)
{
return null;
return default;
}

stream.Position = 0;
using var reader = this.CreateXmlDictionaryReader(stream);
return (TRequest)this.serializer.ReadObject(reader);
return new ValueTask<IActorRequestMessageBody>((TRequest)this.serializer.ReadObject(reader));
}

byte[] IActorResponseMessageBodySerializer.Serialize(IActorResponseMessageBody actorResponseMessageBody)
Expand All @@ -217,11 +218,11 @@ byte[] IActorResponseMessageBodySerializer.Serialize(IActorResponseMessageBody a
return stream.ToArray();
}

IActorResponseMessageBody IActorResponseMessageBodySerializer.Deserialize(Stream messageBody)
ValueTask<IActorResponseMessageBody> IActorResponseMessageBodySerializer.DeserializeAsync(Stream messageBody)
{
if (messageBody == null)
{
return null;
return default;
}

// TODO check performance
Expand All @@ -231,11 +232,11 @@ IActorResponseMessageBody IActorResponseMessageBodySerializer.Deserialize(Stream

if (stream.Capacity == 0)
{
return null;
return default;
}

using var reader = this.CreateXmlDictionaryReader(stream);
return (TResponse)this.serializer.ReadObject(reader);
return new ValueTask<IActorResponseMessageBody>((TResponse)this.serializer.ReadObject(reader));
}

/// <summary>
Expand Down
95 changes: 95 additions & 0 deletions src/Dapr.Actors/Communication/ActorMessageBodyJsonConverter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;

namespace Dapr.Actors.Communication
{
internal class ActorMessageBodyJsonConverter<T> : JsonConverter<T>
{
private readonly List<Type> methodRequestParameterTypes;
private readonly List<Type> wrappedRequestMessageTypes;
private readonly Type wrapperMessageType;

public ActorMessageBodyJsonConverter(
List<Type> methodRequestParameterTypes,
List<Type> wrappedRequestMessageTypes = null
)
{
this.methodRequestParameterTypes = methodRequestParameterTypes;
this.wrappedRequestMessageTypes = wrappedRequestMessageTypes;

if (this.wrappedRequestMessageTypes != null && this.wrappedRequestMessageTypes.Count == 1)
{
this.wrapperMessageType = this.wrappedRequestMessageTypes[0];
}
}

public override T Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
// Ensure start-of-object, then advance
if (reader.TokenType != JsonTokenType.StartObject) throw new JsonException();
reader.Read();

// Ensure property name, then advance
if (reader.TokenType != JsonTokenType.PropertyName || reader.GetString() != "value") throw new JsonException();
reader.Read();

// If the value is null, return null.
if (reader.TokenType == JsonTokenType.Null)
{
// Read the end object token.
reader.Read();
return default;
}

// If the value is an object, deserialize it to wrapper message type
if (this.wrapperMessageType != null)
{
var value = JsonSerializer.Deserialize(ref reader, this.wrapperMessageType, options);

// Construct a new WrappedMessageBody with the deserialized value.
var wrapper = new WrappedMessageBody()
{
Value = value,
};

// Read the end object token.
reader.Read();

// Coerce the type to T; required because WrappedMessageBody inherits from two separate interfaces, which
// cannot both be used as generic constraints
return (T)(object)wrapper;
}

return JsonSerializer.Deserialize<T>(ref reader, options);
}

public override void Write(Utf8JsonWriter writer, T value, JsonSerializerOptions options)
{
writer.WriteStartObject();
writer.WritePropertyName("value");

if (value is WrappedMessageBody body)
{
JsonSerializer.Serialize(writer, body.Value, body.Value.GetType(), options);
}
else
writer.WriteNullValue();
writer.WriteEndObject();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

namespace Dapr.Actors.Communication
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;
using System.Xml;

/// <summary>
/// This is the implmentation for <see cref="IActorMessageBodySerializationProvider"/>used by remoting service and client during
/// request/response serialization . It uses request Wrapping and data contract for serialization.
/// </summary>
internal class ActorMessageBodyJsonSerializationProvider : IActorMessageBodySerializationProvider
{
public JsonSerializerOptions Options { get; }

/// <summary>
/// Initializes a new instance of the <see cref="ActorMessageBodyJsonSerializationProvider"/> class.
/// </summary>
public ActorMessageBodyJsonSerializationProvider(JsonSerializerOptions options)
{
Options = options;
}

/// <summary>
/// Creates a MessageFactory for Wrapped Message Json Remoting Types. This is used to create Remoting Request/Response objects.
/// </summary>
/// <returns>
/// <see cref="IActorMessageBodyFactory" /> that provides an instance of the factory for creating
/// remoting request and response message bodies.
/// </returns>
public IActorMessageBodyFactory CreateMessageBodyFactory()
{
return new WrappedRequestMessageFactory();
}

/// <summary>
/// Creates IActorRequestMessageBodySerializer for a serviceInterface using Wrapped Message Json implementation.
/// </summary>
/// <param name="serviceInterfaceType">The remoted service interface.</param>
/// <param name="methodRequestParameterTypes">The union of parameter types of all of the methods of the specified interface.</param>
/// <param name="wrappedRequestMessageTypes">Wrapped Request Types for all Methods.</param>
/// <returns>
/// An instance of the <see cref="IActorRequestMessageBodySerializer" /> that can serialize the service
/// actor request message body to a messaging body for transferring over the transport.
/// </returns>
public IActorRequestMessageBodySerializer CreateRequestMessageBodySerializer(
Type serviceInterfaceType,
IEnumerable<Type> methodRequestParameterTypes,
IEnumerable<Type> wrappedRequestMessageTypes = null)
{
return new MemoryStreamMessageBodySerializer<WrappedMessageBody, WrappedMessageBody>(Options, serviceInterfaceType, methodRequestParameterTypes, wrappedRequestMessageTypes);
}

/// <summary>
/// Creates IActorResponseMessageBodySerializer for a serviceInterface using Wrapped Message Json implementation.
/// </summary>
/// <param name="serviceInterfaceType">The remoted service interface.</param>
/// <param name="methodReturnTypes">The return types of all of the methods of the specified interface.</param>
/// <param name="wrappedResponseMessageTypes">Wrapped Response Types for all remoting methods.</param>
/// <returns>
/// An instance of the <see cref="IActorResponseMessageBodySerializer" /> that can serialize the service
/// actor response message body to a messaging body for transferring over the transport.
/// </returns>
public IActorResponseMessageBodySerializer CreateResponseMessageBodySerializer(
Type serviceInterfaceType,
IEnumerable<Type> methodReturnTypes,
IEnumerable<Type> wrappedResponseMessageTypes = null)
{
return new MemoryStreamMessageBodySerializer<WrappedMessageBody, WrappedMessageBody>(Options, serviceInterfaceType, methodReturnTypes, wrappedResponseMessageTypes);
}

/// <summary>
/// Default serializer for service remoting request and response message body that uses the
/// memory stream to create outgoing message buffers.
/// </summary>
private class MemoryStreamMessageBodySerializer<TRequest, TResponse> :
IActorRequestMessageBodySerializer,
IActorResponseMessageBodySerializer
where TRequest : IActorRequestMessageBody
where TResponse : IActorResponseMessageBody
{
private readonly JsonSerializerOptions serializerOptions;

public MemoryStreamMessageBodySerializer(
JsonSerializerOptions serializerOptions,
Type serviceInterfaceType,
IEnumerable<Type> methodRequestParameterTypes,
IEnumerable<Type> wrappedRequestMessageTypes = null)
{
var _methodRequestParameterTypes = new List<Type>(methodRequestParameterTypes);
var _wrappedRequestMessageTypes = new List<Type>(wrappedRequestMessageTypes);

this.serializerOptions = new(serializerOptions)
{
// Workaround since WrappedMessageBody creates an object
// with parameters as fields
IncludeFields = true,
};

this.serializerOptions.Converters.Add(new ActorMessageBodyJsonConverter<TRequest>(_methodRequestParameterTypes, _wrappedRequestMessageTypes));
this.serializerOptions.Converters.Add(new ActorMessageBodyJsonConverter<TResponse>(_methodRequestParameterTypes, _wrappedRequestMessageTypes));
}

byte[] IActorRequestMessageBodySerializer.Serialize(IActorRequestMessageBody actorRequestMessageBody)
{
if (actorRequestMessageBody == null)
{
return null;
}

return JsonSerializer.SerializeToUtf8Bytes<object>(actorRequestMessageBody, this.serializerOptions);
}

async ValueTask<IActorRequestMessageBody> IActorRequestMessageBodySerializer.DeserializeAsync(Stream stream)
{
if (stream == null)
{
return default;
}

if (stream.Length == 0)
{
return default;
}

stream.Position = 0;
return await JsonSerializer.DeserializeAsync<TRequest>(stream, this.serializerOptions);
}

byte[] IActorResponseMessageBodySerializer.Serialize(IActorResponseMessageBody actorResponseMessageBody)
{
if (actorResponseMessageBody == null)
{
return null;
}

return JsonSerializer.SerializeToUtf8Bytes<object>(actorResponseMessageBody, this.serializerOptions);
}

async ValueTask<IActorResponseMessageBody> IActorResponseMessageBodySerializer.DeserializeAsync(Stream messageBody)
{
if (messageBody == null)
{
return null;
}

using var stream = new MemoryStream();
messageBody.CopyTo(stream);
stream.Position = 0;

if (stream.Capacity == 0)
{
return null;
}

return await JsonSerializer.DeserializeAsync<TResponse>(stream, this.serializerOptions);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
namespace Dapr.Actors.Communication
{
using System.IO;
using System.Threading.Tasks;

/// <summary>
/// Defines the interface that must be implemented to provide a serializer/deserializer for remoting request message body.
Expand All @@ -32,6 +33,6 @@ internal interface IActorRequestMessageBodySerializer
/// </summary>
/// <param name="messageBody">Serialized message body.</param>
/// <returns>Deserialized remoting request message body object.</returns>
IActorRequestMessageBody Deserialize(Stream messageBody);
ValueTask<IActorRequestMessageBody> DeserializeAsync(Stream messageBody);
}
}
Loading