Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Cosmos pagination #34103

Merged
merged 6 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/EFCore.Analyzers/EFDiagnostics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ public static class EFDiagnostics
public const string ProviderExperimentalApi = "EF9002";
public const string PrecompiledQueryExperimental = "EF9100";
public const string MetricsExperimental = "EF9101";
public const string PagingExperimental = "EF9102";
}
4 changes: 2 additions & 2 deletions src/EFCore.Cosmos/Diagnostics/CosmosQueryEventData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public CosmosQueryEventData(
EventDefinitionBase eventDefinition,
Func<EventDefinitionBase, EventData, string> messageGenerator,
string containerId,
PartitionKey partitionKeyValue,
PartitionKey? partitionKeyValue,
IReadOnlyList<(string Name, object? Value)> parameters,
string querySql,
bool logSensitiveData)
Expand All @@ -46,7 +46,7 @@ public CosmosQueryEventData(
/// <summary>
/// The key of the Cosmos partition that the query is using.
/// </summary>
public virtual PartitionKey PartitionKeyValue { get; }
public virtual PartitionKey? PartitionKeyValue { get; }

/// <summary>
/// Name/values for each parameter in the Cosmos Query.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public CosmosQueryExecutedEventData(
double requestCharge,
string activityId,
string containerId,
PartitionKey partitionKeyValue,
PartitionKey? partitionKeyValue,
IReadOnlyList<(string Name, object? Value)> parameters,
string querySql,
bool logSensitiveData)
Expand Down Expand Up @@ -70,7 +70,7 @@ public CosmosQueryExecutedEventData(
/// <summary>
/// The key of the Cosmos partition that the query is using.
/// </summary>
public virtual PartitionKey PartitionKeyValue { get; }
public virtual PartitionKey? PartitionKeyValue { get; }

/// <summary>
/// Name/values for each parameter in the Cosmos Query.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static void SyncNotSupported(
public static void ExecutingSqlQuery(
this IDiagnosticsLogger<DbLoggerCategory.Database.Command> diagnostics,
string containerId,
PartitionKey partitionKeyValue,
PartitionKey? partitionKeyValue,
CosmosSqlQuery cosmosSqlQuery)
{
var definition = CosmosResources.LogExecutingSqlQuery(diagnostics);
Expand All @@ -66,7 +66,7 @@ public static void ExecutingSqlQuery(
definition.Log(
diagnostics,
containerId,
logSensitiveData ? partitionKeyValue.ToString() : "?",
logSensitiveData ? partitionKeyValue?.ToString() : "?",
FormatParameters(cosmosSqlQuery.Parameters, logSensitiveData && cosmosSqlQuery.Parameters.Count > 0),
Environment.NewLine,
cosmosSqlQuery.Query);
Expand Down Expand Up @@ -158,7 +158,7 @@ public static void ExecutedReadNext(
double requestCharge,
string activityId,
string containerId,
PartitionKey partitionKeyValue,
PartitionKey? partitionKeyValue,
CosmosSqlQuery cosmosSqlQuery)
{
var definition = CosmosResources.LogExecutedReadNext(diagnostics);
Expand All @@ -177,7 +177,7 @@ public static void ExecutedReadNext(
requestCharge,
activityId,
containerId,
logSensitiveData ? partitionKeyValue.ToString() : "?",
logSensitiveData ? partitionKeyValue?.ToString() : "?",
FormatParameters(cosmosSqlQuery.Parameters, logSensitiveData && cosmosSqlQuery.Parameters.Count > 0),
Environment.NewLine,
cosmosSqlQuery.Query));
Expand Down
1 change: 1 addition & 0 deletions src/EFCore.Cosmos/EFCore.Cosmos.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
<PackageTags>$(PackageTags);CosmosDb;SQL API</PackageTags>
<ImplicitUsings>true</ImplicitUsings>
<NoWarn>$(NoWarn);EF9101</NoWarn> <!-- Metrics is experimental -->
<NoWarn>$(NoWarn);EF9102</NoWarn> <!-- Paging is experimental -->
</PropertyGroup>

<ItemGroup>
Expand Down
49 changes: 49 additions & 0 deletions src/EFCore.Cosmos/Extensions/CosmosQueryableExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics.CodeAnalysis;
using JetBrains.Annotations;
using Microsoft.EntityFrameworkCore.Cosmos.Query.Internal;
using Microsoft.EntityFrameworkCore.Query.Internal;
Expand Down Expand Up @@ -177,4 +178,52 @@ private static FromSqlQueryRootExpression GenerateFromSqlQueryRoot(
sql,
Expression.Constant(arguments));
}

internal static readonly MethodInfo ToPageAsyncMethodInfo
= typeof(CosmosQueryableExtensions).GetMethod(nameof(ToPageAsync))!;


/// <summary>
/// Allows paginating through query results by repeatedly executing the same query, passing continuation tokens to retrieve
/// successive pages of the result set, and specifying the maximum number of results per page.
/// </summary>
/// <param name="source">The source query.</param>
/// <param name="continuationToken">
/// An optional continuation token returned from a previous execution of this query via
/// <see cref="CosmosPage{T}.ContinuationToken" />. If <see langword="null" />, retrieves query results from the start.
/// </param>
/// <param name="pageSize">
/// The maximum number of results in the returned <see cref="CosmosPage{T}" />. The page may contain fewer results if the database
/// did not contain enough matching results.
/// </param>
/// <param name="responseContinuationTokenLimitInKb">Limits the length of continuation token in the query response.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> to observe while waiting for the task to complete.</param>
/// <returns>A <see cref="CosmosPage{T}" /> containing at most <paramref name="pageSize" /> results.</returns>
[Experimental(EFDiagnostics.PagingExperimental)]
public static Task<CosmosPage<TSource>> ToPageAsync<TSource>(
this IQueryable<TSource> source,
int pageSize,
string? continuationToken,
int? responseContinuationTokenLimitInKb = null,
CancellationToken cancellationToken = default)
{
if (source.Provider is not IAsyncQueryProvider provider)
{
throw new InvalidOperationException(CoreStrings.IQueryableProviderNotAsync);
}

return provider.ExecuteAsync<Task<CosmosPage<TSource>>>(
Expression.Call(
instance: null,
method: ToPageAsyncMethodInfo.MakeGenericMethod(typeof(TSource)),
arguments:
[
source.Expression,
Expression.Constant(pageSize, typeof(int)),
Expression.Constant(continuationToken, typeof(string)),
Expression.Constant(responseContinuationTokenLimitInKb, typeof(int?)),
Expression.Constant(default(CancellationToken), typeof(CancellationToken))
]),
cancellationToken);
}
}
32 changes: 32 additions & 0 deletions src/EFCore.Cosmos/Query/CosmosPage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics.CodeAnalysis;

// ReSharper disable once CheckNamespace
namespace Microsoft.EntityFrameworkCore;

/// <summary>
/// A single page of results returned from a user query; can be used to paginate through long result-sets.
/// Returned by <see cref="CosmosQueryableExtensions.ToPageAsync{T}" />.
/// </summary>
/// <param name="values">The values contained in this page.</param>
/// <param name="continuationToken">
/// The continuation token for fetching further results from the query. Is <see langword="null" /> or empty when there are no more
/// results.
/// </param>
/// <typeparam name="T">The type of values contained in the page.</typeparam>
[Experimental(EFDiagnostics.PagingExperimental)]
public readonly struct CosmosPage<T>(IReadOnlyList<T> values, string? continuationToken)
{
/// <summary>
/// The values contained in this page.
/// </summary>
public IReadOnlyList<T> Values { get; } = values;

/// <summary>
/// The continuation token for fetching further results from the query. Is <see langword="null" /> or empty when there are no more
/// results.
/// </summary>
public string? ContinuationToken { get; } = continuationToken;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Diagnostics.CodeAnalysis;
using Microsoft.EntityFrameworkCore.Cosmos.Internal;
using Microsoft.EntityFrameworkCore.Cosmos.Metadata.Internal;
using Microsoft.EntityFrameworkCore.Cosmos.Query.Internal.Expressions;
using Microsoft.EntityFrameworkCore.Cosmos.Storage.Internal;
using Microsoft.EntityFrameworkCore.Internal;

Expand Down Expand Up @@ -86,6 +87,64 @@ protected CosmosQueryableMethodTranslatingExpressionVisitor(
_subquery = true;
}

/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public override Expression Translate(Expression expression)
{
// Handle ToPageAsync(), which can only ever be the top-level node in the query tree.
if (expression is MethodCallExpression { Method: var method, Arguments: var arguments }
&& method.DeclaringType == typeof(CosmosQueryableExtensions)
&& method.Name is nameof(CosmosQueryableExtensions.ToPageAsync))
{
var source = base.Translate(arguments[0]);

if (source == QueryCompilationContext.NotTranslatedExpression)
{
return source;
}

if (source is not ShapedQueryExpression shapedQuery)
{
throw new UnreachableException($"Expected a ShapedQueryExpression but found {source.GetType().Name}");
}

// The arguments to ToPage/ToPageAsync must have been parameterized by the funcletizer, since they're non-lambda arguments to
// a top-level function (like Skip/Take). Translate to get these as SqlParameterExpressions.
if (arguments is not
[
_, // source
ParameterExpression maxItemCount,
ParameterExpression continuationToken,
ParameterExpression responseContinuationTokenLimitInKb,
_ // cancellation token
]
|| _sqlTranslator.Translate(maxItemCount) is not SqlParameterExpression translatedMaxItemCount
|| _sqlTranslator.Translate(continuationToken) is not SqlParameterExpression translatedContinuationToken
|| _sqlTranslator.Translate(responseContinuationTokenLimitInKb) is not SqlParameterExpression
translatedResponseContinuationTokenLimitInKb)
{
throw new UnreachableException("ToPageAsync without the appropriate parameterized arguments");
}

// Wrap the shaper for the entire query in a PagingExpression which also contains the paging arguments, and update
// the final cardinality to Single (since we'll be returning a single Page).
return shapedQuery
.UpdateShaperExpression(new PagingExpression(
shapedQuery.ShaperExpression,
translatedMaxItemCount,
translatedContinuationToken,
translatedResponseContinuationTokenLimitInKb,
typeof(CosmosPage<>).MakeGenericType(shapedQuery.ShaperExpression.Type)))
.UpdateResultCardinality(ResultCardinality.Single);
}

return base.Translate(expression);
}

/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
/// the same compatibility standards as public APIs. It may be changed or removed without notice in
Expand Down
Loading