Skip to content

Commit

Permalink
Return default when no bytes read from response streaming-no-response… (
Browse files Browse the repository at this point in the history
#3882)

* Return default when no bytes read from response streaming-no-response-body

This commit returns default(T) when no bytes are read from the response stream.
Move the stream null and seek check into JsonSerializer.

Update index exists integration test to not disable direct streaming in order to test.

* Changed formatting to indented

* do not disable direct streaming
  • Loading branch information
russcam authored and Mpdreamz committed Jun 27, 2019
1 parent 2c8ff4b commit 6ee869f
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,21 @@ public class LowLevelRequestResponseSerializer : IElasticsearchSerializer

public object Deserialize(Type type, Stream stream)
{
if (stream == null || stream.CanSeek && stream.Length == 0) return Task.FromResult(type.DefaultValue());

return JsonSerializer.NonGeneric.Deserialize(type, stream, ElasticsearchNetFormatterResolver.Instance);
}

public T Deserialize<T>(Stream stream)
{
if (stream == null || stream.CanSeek && stream.Length == 0) return default(T);

return JsonSerializer.Deserialize<T>(stream, ElasticsearchNetFormatterResolver.Instance);
}

public Task<object> DeserializeAsync(Type type, Stream stream, CancellationToken cancellationToken = default)
{
if (stream == null || stream.CanSeek && stream.Length == 0) return Task.FromResult(type.DefaultValue());

return JsonSerializer.NonGeneric.DeserializeAsync(type, stream, ElasticsearchNetFormatterResolver.Instance);
}

public Task<T> DeserializeAsync<T>(Stream stream, CancellationToken cancellationToken = default)
{
if (stream == null || stream.CanSeek && stream.Length == 0) return Task.FromResult(default(T));

return JsonSerializer.DeserializeAsync<T>(stream, ElasticsearchNetFormatterResolver.Instance);
}

Expand Down
87 changes: 60 additions & 27 deletions src/Elasticsearch.Net/Utf8Json/JsonSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Elasticsearch.Net.Utf8Json.Internal;
using Elasticsearch.Net.Utf8Json.Resolvers;

Expand Down Expand Up @@ -254,7 +255,11 @@ public static T Deserialize<T>(byte[] bytes, int offset)

public static T Deserialize<T>(byte[] bytes, int offset, IJsonFormatterResolver resolver)
{
if (resolver == null) resolver = DefaultResolver;
if (bytes == null || bytes.Length == 0)
return default;

if (resolver == null)
resolver = DefaultResolver;

var reader = new JsonReader(bytes, offset);
var formatter = resolver.GetFormatterWithVerify<T>();
Expand All @@ -281,11 +286,14 @@ public static T Deserialize<T>(Stream stream)

public static T Deserialize<T>(Stream stream, IJsonFormatterResolver resolver)
{
if (resolver == null) resolver = DefaultResolver;
if (stream == null || stream.CanSeek && stream.Length == 0)
return default;

if (resolver == null)
resolver = DefaultResolver;

#if NETSTANDARD && !NET45
var ms = stream as MemoryStream;
if (ms != null)
if (stream is MemoryStream ms)
{
if (ms.TryGetBuffer(out var buf2))
{
Expand All @@ -302,39 +310,63 @@ public static T Deserialize<T>(Stream stream, IJsonFormatterResolver resolver)
}
}
#endif
{
var buf = MemoryPool.Rent();
var poolBuf = buf;
try
{
var len = FillFromStream(stream, ref buf);
var buf = MemoryPool.Rent();
var poolBuf = buf;
try
{
var length = FillFromStream(stream, ref buf);

// when token is number, can not use from pool(can not find end line).
var token = new JsonReader(buf).GetCurrentJsonToken();
if (token == JsonToken.Number)
{
buf = BinaryUtil.FastCloneWithResize(buf, len);
}
if (length == 0)
return default;

return Deserialize<T>(buf, resolver);
}
finally
// when token is number, can not use from pool(can not find end line).
var token = new JsonReader(buf).GetCurrentJsonToken();
if (token == JsonToken.Number)
{
MemoryPool.Return(poolBuf);
buf = BinaryUtil.FastCloneWithResize(buf, length);
}
}

return Deserialize<T>(buf, resolver);
}
finally
{
MemoryPool.Return(poolBuf);
}
}

#if NETSTANDARD

public static System.Threading.Tasks.Task<T> DeserializeAsync<T>(Stream stream)
public static Task<T> DeserializeAsync<T>(Stream stream)
{
return DeserializeAsync<T>(stream, defaultResolver);
}

public static async System.Threading.Tasks.Task<T> DeserializeAsync<T>(Stream stream, IJsonFormatterResolver resolver)
public static async Task<T> DeserializeAsync<T>(Stream stream, IJsonFormatterResolver resolver)
{
if (resolver == null) resolver = DefaultResolver;
if (stream == null || stream.CanSeek && stream.Length == 0)
return default;

if (resolver == null)
resolver = DefaultResolver;

#if NETSTANDARD && !NET45
if (stream is MemoryStream ms)
{
if (ms.TryGetBuffer(out var buf2))
{
// when token is number, can not use from pool(can not find end line).
var token = new JsonReader(buf2.Array, buf2.Offset).GetCurrentJsonToken();
if (token == JsonToken.Number)
{
var buf3 = new byte[buf2.Count];
Buffer.BlockCopy(buf2.Array, buf2.Offset, buf3, 0, buf3.Length);
return Deserialize<T>(buf3, 0, resolver);
}

return Deserialize<T>(buf2.Array, buf2.Offset, resolver);
}
}
#endif

var buffer = MemoryPool.Rent();
var buf = buffer;
Expand All @@ -346,11 +378,12 @@ public static async System.Threading.Tasks.Task<T> DeserializeAsync<T>(Stream st
{
length += read;
if (length == buf.Length)
{
BinaryUtil.FastResize(ref buf, length * 2);
}
BinaryUtil.FastResize(ref buf, length * 2);
}

if (length == 0)
return default;

// when token is number, can not use from pool(can not find end line).
var token = new JsonReader(buf).GetCurrentJsonToken();
if (token == JsonToken.Number)
Expand Down
11 changes: 0 additions & 11 deletions src/Nest/CommonAbstractions/Extensions/TypeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,6 @@ internal static object CreateInstance(this Type t, params object[] args)
return activator(args);
}

internal static object DefaultValue(this Type type) =>
type.IsValueType
? CachedDefaultValues.GetOrAdd(type, t =>
Expression.Lambda<Func<object>>(
Expression.Convert(Expression.Default(type), typeof(object))
)
.Compile()
)
.Invoke()
: null;

//do not remove this is referenced through GetActivatorMethod
internal static ObjectActivator<T> GetActivator<T>(ConstructorInfo ctor)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,28 @@ internal class DefaultHighLevelSerializer : IElasticsearchSerializer, IInternalS

public T Deserialize<T>(Stream stream)
{
if (stream == null || stream.CanSeek && stream.Length == 0) return default;

return JsonSerializer.Deserialize<T>(stream, FormatterResolver);
}

public object Deserialize(Type type, Stream stream)
{
if (stream == null || stream.CanSeek && stream.Length == 0) return type.DefaultValue();

return JsonSerializer.NonGeneric.Deserialize(type, stream, FormatterResolver);
}

public Task<T> DeserializeAsync<T>(Stream stream, CancellationToken cancellationToken = default)
{
if (stream == null || stream.CanSeek && stream.Length == 0) return Task.FromResult(default(T));

return JsonSerializer.DeserializeAsync<T>(stream, FormatterResolver);
}

public Task<object> DeserializeAsync(Type type, Stream stream, CancellationToken cancellationToken = default)
{
if (stream == null || stream.CanSeek && stream.Length == 0) return Task.FromResult(type.DefaultValue());

return JsonSerializer.NonGeneric.DeserializeAsync(type, stream, FormatterResolver);
}

public virtual void Serialize<T>(T data, Stream writableStream, SerializationFormatting formatting = SerializationFormatting.Indented) =>
public virtual void Serialize<T>(T data, Stream writableStream, SerializationFormatting formatting = SerializationFormatting.None) =>
JsonSerializer.Serialize(writableStream, data, FormatterResolver);

public Task SerializeAsync<T>(T data, Stream stream, SerializationFormatting formatting = SerializationFormatting.Indented,
public Task SerializeAsync<T>(T data, Stream stream, SerializationFormatting formatting = SerializationFormatting.None,
CancellationToken cancellationToken = default
) => JsonSerializer.SerializeAsync(stream, data, FormatterResolver);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ namespace Tests.Core.ManagedElasticsearch.Clusters
{
public abstract class ClientTestClusterBase : XunitClusterBase<ClientTestClusterConfiguration>, INestTestCluster
{
public ClientTestClusterBase() : this(new ClientTestClusterConfiguration()) { }
protected ClientTestClusterBase() : this(new ClientTestClusterConfiguration()) { }

public ClientTestClusterBase(params ElasticsearchPlugin[] plugins) : this(new ClientTestClusterConfiguration(plugins)) { }
protected ClientTestClusterBase(params ElasticsearchPlugin[] plugins) : this(new ClientTestClusterConfiguration(plugins)) { }

public ClientTestClusterBase(ClientTestClusterConfiguration configuration) : base(configuration) { }
protected ClientTestClusterBase(ClientTestClusterConfiguration configuration) : base(configuration) { }

public IElasticClient Client => this.GetOrAddClient(s => ConnectionSettings(s.ApplyDomainSettings()));

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Elasticsearch.Net;
using FluentAssertions;
using Nest;
using Tests.Core.Client.Settings;
using Tests.Core.ManagedElasticsearch.Clusters;
using Tests.Domain;
using Tests.Framework.EndpointTests;
Expand Down Expand Up @@ -31,6 +32,7 @@ protected override LazyResponses ClientUsage() => Calls(
protected override void ExpectResponse(ExistsResponse response) => response.Exists.Should().BeTrue();
}

// DisableDirectStreaming = false so that response stream is not seekable
public class IndexNotExistsApiTests
: ApiIntegrationTestBase<ReadOnlyCluster, ExistsResponse, IIndexExistsRequest, IndexExistsDescriptor, IndexExistsRequest>
{
Expand All @@ -42,12 +44,18 @@ public IndexNotExistsApiTests(ReadOnlyCluster cluster, EndpointUsage usage) : ba
protected override int ExpectStatusCode => 404;
protected override HttpMethod HttpMethod => HttpMethod.HEAD;

protected override IndexExistsRequest Initializer => new IndexExistsRequest(NonExistentIndex);
protected override IndexExistsRequest Initializer => new IndexExistsRequest(NonExistentIndex)
{
RequestConfiguration = new RequestConfiguration
{
DisableDirectStreaming = false
}
};
protected override string UrlPath => $"/{NonExistentIndex}";

protected override LazyResponses ClientUsage() => Calls(
(client, f) => client.Indices.Exists(NonExistentIndex),
(client, f) => client.Indices.ExistsAsync(NonExistentIndex),
(client, f) => client.Indices.Exists(NonExistentIndex, r => r.RequestConfiguration(c => c.DisableDirectStreaming(false))),
(client, f) => client.Indices.ExistsAsync(NonExistentIndex,r => r.RequestConfiguration(c => c.DisableDirectStreaming(false))),
(client, r) => client.Indices.Exists(r),
(client, r) => client.Indices.ExistsAsync(r)
);
Expand Down

0 comments on commit 6ee869f

Please sign in to comment.