Skip to content

Commit

Permalink
Refactor CosmosDBConverter to remove use reflection (#1924)
Browse files Browse the repository at this point in the history
Co-authored-by: Jacob Viau <[email protected]>
  • Loading branch information
liliankasem and jviau authored Dec 6, 2023
1 parent 7bba67b commit 3013ff0
Show file tree
Hide file tree
Showing 13 changed files with 600 additions and 137 deletions.
7 changes: 7 additions & 0 deletions DotNetWorker.sln
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DependentAssemblyWithFuncti
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Worker.Extensions.Http.AspNetCore.Tests", "test\extensions\Worker.Extensions.Http.AspNetCore.Tests\Worker.Extensions.Http.AspNetCore.Tests.csproj", "{D8E79B53-9A44-46CC-9D7A-E9494FC8CAF4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Worker.Extensions.Shared.Tests", "test\Worker.Extensions.Shared.Tests\Worker.Extensions.Shared.Tests.csproj", "{B6342174-5436-4846-B43C-39D8E34AE5CF}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Worker.Extensions.Http.AspNetCore.Analyzers", "extensions\Worker.Extensions.Http.AspNetCore.Analyzers\Worker.Extensions.Http.AspNetCore.Analyzers.csproj", "{7B6C2920-7A02-43B2-8DA0-7B76B9FAFC6B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DependentAssemblyWithFunctions.NetStandard", "test\DependentAssemblyWithFunctions.NetStandard\DependentAssemblyWithFunctions.NetStandard.csproj", "{198DA072-F94F-4585-A744-97B3DAC21686}"
Expand Down Expand Up @@ -342,6 +344,10 @@ Global
{D8E79B53-9A44-46CC-9D7A-E9494FC8CAF4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D8E79B53-9A44-46CC-9D7A-E9494FC8CAF4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D8E79B53-9A44-46CC-9D7A-E9494FC8CAF4}.Release|Any CPU.Build.0 = Release|Any CPU
{B6342174-5436-4846-B43C-39D8E34AE5CF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B6342174-5436-4846-B43C-39D8E34AE5CF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B6342174-5436-4846-B43C-39D8E34AE5CF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B6342174-5436-4846-B43C-39D8E34AE5CF}.Release|Any CPU.Build.0 = Release|Any CPU
{7B6C2920-7A02-43B2-8DA0-7B76B9FAFC6B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7B6C2920-7A02-43B2-8DA0-7B76B9FAFC6B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7B6C2920-7A02-43B2-8DA0-7B76B9FAFC6B}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -409,6 +415,7 @@ Global
{D2F67410-9933-42E8-B04A-E17634D83A30} = {9D6603BD-7EA2-4D11-A69C-0D9E01317FD6}
{AB6E1E7A-0D2C-4086-9487-566887C1E753} = {B5821230-6E0A-4535-88A9-ED31B6F07596}
{D8E79B53-9A44-46CC-9D7A-E9494FC8CAF4} = {AA4D318D-101B-49E7-A4EC-B34165AEDB33}
{B6342174-5436-4846-B43C-39D8E34AE5CF} = {FD7243E4-BF18-43F8-8744-BA1D17ACF378}
{7B6C2920-7A02-43B2-8DA0-7B76B9FAFC6B} = {A7B4FF1E-3DF7-4F28-9333-D0961CDDF702}
{198DA072-F94F-4585-A744-97B3DAC21686} = {B5821230-6E0A-4535-88A9-ED31B6F07596}
EndGlobalSection
Expand Down
1 change: 1 addition & 0 deletions extensions/Worker.Extensions.CosmosDB/release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@

- Updated `Microsoft.Azure.WebJobs.Extensions.CosmosDB` to 4.4.0
- Release notes for v4.4.0 can be found [here](https://github.com/Azure/azure-webjobs-sdk-extensions/releases/tag/cosmos-v4.4.0)
- Refactor CosmosDB converter to remove reflection & improve collection support
101 changes: 52 additions & 49 deletions extensions/Worker.Extensions.CosmosDB/src/CosmosDBConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Functions.Worker.Core;
using Microsoft.Azure.Functions.Worker.Converters;
using Microsoft.Azure.Functions.Worker.Extensions.CosmosDB;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;
using Microsoft.Azure.Functions.Worker.Extensions;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;

namespace Microsoft.Azure.Functions.Worker
{
Expand All @@ -25,6 +27,7 @@ internal class CosmosDBConverter : IInputConverter
{
private readonly IOptionsMonitor<CosmosDBBindingOptions> _cosmosOptions;
private readonly ILogger<CosmosDBConverter> _logger;
private static readonly JsonSerializerOptions _serializerOptions = new() { PropertyNameCaseInsensitive = true };

public CosmosDBConverter(IOptionsMonitor<CosmosDBBindingOptions> cosmosOptions, ILogger<CosmosDBConverter> logger)
{
Expand Down Expand Up @@ -85,55 +88,53 @@ private CosmosDBInputAttribute GetBindingDataContent(ModelBindingData bindingDat

private async Task<object> CreateTargetObjectAsync(Type targetType, CosmosDBInputAttribute cosmosAttribute)
{
MethodInfo createPOCOMethod;

if (targetType.GenericTypeArguments.Any())
if (CreateCosmosClient<Container>(cosmosAttribute) is not Container container)
{
targetType = targetType.GenericTypeArguments.FirstOrDefault();

createPOCOMethod = GetType()
.GetMethod(nameof(CreatePOCOCollectionAsync), BindingFlags.Instance | BindingFlags.NonPublic)
.MakeGenericMethod(targetType);
throw new InvalidOperationException($"Unable to create Cosmos container client for '{cosmosAttribute.ContainerName}'.");
}
else

if (targetType.IsCollectionType())
{
createPOCOMethod = GetType()
.GetMethod(nameof(CreatePOCOAsync), BindingFlags.Instance | BindingFlags.NonPublic)
.MakeGenericMethod(targetType);
return await ParameterBinder.BindCollectionAsync(
elementType => GetDocumentsAsync(container, cosmosAttribute, elementType), targetType);
}


var container = CreateCosmosClient<Container>(cosmosAttribute) as Container;

if (container is null)
else
{
throw new InvalidOperationException($"Unable to create Cosmos container client for '{cosmosAttribute.ContainerName}'.");
return await CreatePocoAsync(container, cosmosAttribute, targetType);
}

return await (Task<object>)createPOCOMethod.Invoke(this, new object[] { container, cosmosAttribute });
}

private async Task<object> CreatePOCOAsync<T>(Container container, CosmosDBInputAttribute cosmosAttribute)
private async Task<object> CreatePocoAsync(Container container, CosmosDBInputAttribute cosmosAttribute, Type targetType)
{
if (String.IsNullOrEmpty(cosmosAttribute.Id) || String.IsNullOrEmpty(cosmosAttribute.PartitionKey))
if (string.IsNullOrEmpty(cosmosAttribute.Id) || string.IsNullOrEmpty(cosmosAttribute.PartitionKey))
{
throw new InvalidOperationException("The 'Id' and 'PartitionKey' properties of a CosmosDB single-item input binding cannot be null or empty.");
}

ItemResponse<T> item = await container.ReadItemAsync<T>(cosmosAttribute.Id, new PartitionKey(cosmosAttribute.PartitionKey));
ResponseMessage item = await container.ReadItemStreamAsync(cosmosAttribute.Id, new PartitionKey(cosmosAttribute.PartitionKey));
item.EnsureSuccessStatusCode();
return (await JsonSerializer.DeserializeAsync(item.Content, targetType, _serializerOptions))!;
}

if (item is null || item?.StatusCode is not System.Net.HttpStatusCode.OK || item.Resource is null)
private async IAsyncEnumerable<object> GetDocumentsAsync(Container container, CosmosDBInputAttribute cosmosAttribute, Type elementType)
{
await foreach (var stream in GetDocumentsStreamAsync(container, cosmosAttribute))
{
throw new InvalidOperationException($"Unable to retrieve document with ID '{cosmosAttribute.Id}' and PartitionKey '{cosmosAttribute.PartitionKey}'");
// Cosmos returns a stream of JSON which represents a paged response. The contents are in a property called "Documents".
// Deserializing into CosmosStream<T> will extract these documents.
Type target = typeof(CosmosStream<>).MakeGenericType(elementType);
CosmosStream page = (CosmosStream)(await JsonSerializer.DeserializeAsync(stream!, target, _serializerOptions))!;
foreach (var item in page.GetDocuments())
{
yield return item;
}
}

return item.Resource;
}

private async Task<object> CreatePOCOCollectionAsync<T>(Container container, CosmosDBInputAttribute cosmosAttribute)
private async IAsyncEnumerable<Stream> GetDocumentsStreamAsync(Container container, CosmosDBInputAttribute cosmosAttribute)
{
QueryDefinition queryDefinition = null!;
if (!String.IsNullOrEmpty(cosmosAttribute.SqlQuery))
if (!string.IsNullOrEmpty(cosmosAttribute.SqlQuery))
{
queryDefinition = new QueryDefinition(cosmosAttribute.SqlQuery);
if (cosmosAttribute.SqlQueryParameters?.Count() > 0)
Expand All @@ -146,32 +147,21 @@ private async Task<object> CreatePOCOCollectionAsync<T>(Container container, Cos
}

QueryRequestOptions queryRequestOptions = new();
if (!String.IsNullOrEmpty(cosmosAttribute.PartitionKey))
if (!string.IsNullOrEmpty(cosmosAttribute.PartitionKey))
{
var partitionKey = new PartitionKey(cosmosAttribute.PartitionKey);
var partitionKey = new PartitionKey(cosmosAttribute.PartitionKey);
queryRequestOptions = new() { PartitionKey = partitionKey };
}

using (var iterator = container.GetItemQueryIterator<T>(queryDefinition: queryDefinition, requestOptions: queryRequestOptions))
{
if (iterator is null)
{
throw new InvalidOperationException($"Unable to retrieve documents for container '{container.Id}'.");
}

return await ExtractCosmosDocumentsAsync(iterator);
}
}
using FeedIterator iterator = container.GetItemQueryStreamIterator(queryDefinition: queryDefinition, requestOptions: queryRequestOptions)
?? throw new InvalidOperationException($"Unable to retrieve documents for container '{container.Id}'.");

private async Task<IList<T>> ExtractCosmosDocumentsAsync<T>(FeedIterator<T> iterator)
{
var documentList = new List<T>();
while (iterator.HasMoreResults)
{
FeedResponse<T> response = await iterator.ReadNextAsync();
documentList.AddRange(response.Resource);
using ResponseMessage response = await iterator.ReadNextAsync();
response.EnsureSuccessStatusCode();
yield return response.Content;
}
return documentList;
}

private T CreateCosmosClient<T>(CosmosDBInputAttribute cosmosAttribute)
Expand All @@ -194,5 +184,18 @@ private T CreateCosmosClient<T>(CosmosDBInputAttribute cosmosAttribute)

return (T)cosmosReference;
}

// Need a non-generic type to cast to, and can't use IEnumerable directly (breaks json deserialization).
private abstract class CosmosStream
{
public abstract IEnumerable GetDocuments();
}

private class CosmosStream<T> : CosmosStream
{
public IEnumerable<T>? Documents { get; set; }

public override IEnumerable GetDocuments() => Documents!;
}
}
}
114 changes: 114 additions & 0 deletions extensions/Worker.Extensions.Shared/Reflection/ParameterBinder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections;
using System.Collections.Generic;
using System.Reflection;
using System.Threading.Tasks;

namespace Microsoft.Azure.Functions.Worker.Extensions
{
/// <summary>
/// Helpers for performing parameter binding.
/// </summary>
internal static class ParameterBinder
{
private const BindingFlags DeclaredOnlyLookup = BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance | BindingFlags.Static | BindingFlags.DeclaredOnly;

/// <summary>
/// Binds to a collection type.
/// </summary>
/// <param name="factory">Factory for producing an enumerable given the element type.</param>
/// <param name="collectionType">The collection type to bind to.</param>
/// <returns>The instantiated and populated collection.</returns>
public static async Task<object> BindCollectionAsync(Func<Type, IAsyncEnumerable<object>> factory, Type collectionType)
{
if (factory is null)
{
throw new ArgumentNullException(nameof(factory));
}

if (collectionType is null)
{
throw new ArgumentNullException(nameof(collectionType));
}

if (!collectionType.TryGetCollectionElementType(out Type? elementType))
{
throw new ArgumentException($"Type '{collectionType}' is not a collection type.", nameof(collectionType));
}

object? collection = null;
if (collectionType.IsConcreteType() && !collectionType.IsArray)
{
collection = Activator.CreateInstance(collectionType)!;
}
else if (IsListInterface(collectionType))
{
collection = Activator.CreateInstance(typeof(List<>).MakeGenericType(elementType!))!;
}
else if (collectionType.IsArray)
{
collection = Activator.CreateInstance(typeof(List<>).MakeGenericType(elementType!))!;
await BindCollectionAsync(factory(elementType!), collection);
IList list = (IList)collection;
Array arrayResult = Array.CreateInstance(elementType!, list.Count);
list.CopyTo(arrayResult, 0);
return arrayResult;
}
else
{
throw new ArgumentException($"Collection type '{collectionType}' is not supported for parameter binding.", nameof(collectionType));
}

await BindCollectionAsync(factory(elementType!), collection);
return collection;
}

/// <summary>
/// Binds to a collection.
/// </summary>
/// <param name="pageable">The pageable containing the items to populate the collection with.</param>
/// <param name="collection">The collection to populate.</param>
/// <returns>A task that completes when the collection has been populated.</returns>
public static async Task BindCollectionAsync(IAsyncEnumerable<object> pageable, object collection)
{
if (pageable is null)
{
throw new ArgumentNullException(nameof(pageable));
}

if (collection is null)
{
throw new ArgumentNullException(nameof(collection));
}

Action<object> add = GetAddMethod(collection);
await foreach (object item in pageable)
{
add(item);
}
}

private static bool IsListInterface(Type type)
{
return type.IsGenericType &&
(type.GetGenericTypeDefinition() == typeof(IList<>)
|| type.GetGenericTypeDefinition() == typeof(ICollection<>)
|| type.GetGenericTypeDefinition() == typeof(IEnumerable<>));
}

private static Action<object> GetAddMethod(object collection)
{
if (collection is IList list)
{
return e => list.Add(e);
}

MethodInfo method = collection.GetType().GetMethod("Add", DeclaredOnlyLookup)
?? throw new InvalidOperationException($"Could not find an 'Add' method on collection type '{collection.GetType()}'.");
return e => method.Invoke(collection, new[] { e });
}
}
}
Loading

0 comments on commit 3013ff0

Please sign in to comment.