Skip to content

Commit

Permalink
Add cql batch inserts (#110)
Browse files Browse the repository at this point in the history
  • Loading branch information
adelinag08 authored Aug 30, 2024
1 parent 9c6f963 commit 537fc1c
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 4 deletions.
2 changes: 1 addition & 1 deletion src/SnD.Sdk.Storage.Minio/Models/EventWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public record S3
/// <summary>
/// Gets or sets the object involved in the event.
/// </summary>
[JsonPropertyName("@object")]
[JsonPropertyName("object")]
public Object Object { get; set; }
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using Serilog;
using System.Diagnostics.CodeAnalysis;
using Serilog;

namespace Snd.Sdk.Logs.Providers.Configurations
{
/// <summary>
/// Extension methods for configuration of all sinks
/// </summary>
[ExcludeFromCodeCoverage]
public static class DefaultLoggingConfiguration
{
/// <summary>
Expand Down
20 changes: 19 additions & 1 deletion src/SnD.Sdk/Storage/Base/ICqlEntityService.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Threading;
using Akka;
using Akka.Streams.Dsl;
using System.Threading.Tasks;
Expand Down Expand Up @@ -30,6 +31,22 @@ public interface ICqlEntityService
/// <returns></returns>
Task<bool> UpsertEntity<T>(T entity, int? ttlSeconds = null, bool insertNulls = false);

/// <summary>
/// Inserts or updates a list of entities in the table, processing them in batches, with optional TTL and null field handling.
/// </summary>
/// <typeparam name="T">The type of the entities to be upserted.</typeparam>
/// <param name="entities">A list of entities to be upserted.</param>
/// <param name="batchSize">The number of entities to be processed in each batch. Default is 1000.</param>
/// <param name="ttlSeconds">Optional time to live for the entities. Despite the parameter name, the value is a <see cref="TimeSpan"/>, not a number of seconds. Default is null.</param>
/// <param name="insertNulls">Specifies whether to merge non-supplied fields. Default is false.</param>
/// <param name="rateLimit">Rate limit (number of requests per <paramref name="rateLimitPeriod"/>) for the operation. Default is 1000.</param>
/// <param name="rateLimitPeriod">The time period for the rate limit. The signature default is <c>default(TimeSpan)</c>, which stands for a period of 1 second.</param>
/// <param name="cancellationToken">Token to monitor for cancellation requests.</param>
/// <returns>Returns a task that represents the asynchronous operation. The task result contains a boolean indicating success or failure.</returns>
public Task<bool> UpsertBatch<T>(List<T> entities, int batchSize = 1000, TimeSpan? ttlSeconds = null,
bool insertNulls = false, int rateLimit = 1000, TimeSpan rateLimitPeriod = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Creates or updates a pair of entities atomically with optional TTL.
/// </summary>
Expand Down Expand Up @@ -107,7 +124,8 @@ Source<T, NotUsed> GetEntities<T>(Func<Table<T>, CqlQuery<T>> selectEntitiesDele
/// <param name="pagingState">Page identifier to return.</param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
Task<IPage<T>> GetEntityPage<T>(Func<Table<T>, CqlQuery<T>> selectEntityDelegate, int? pageSize = null, byte[] pagingState = null);
Task<IPage<T>> GetEntityPage<T>(Func<Table<T>, CqlQuery<T>> selectEntityDelegate, int? pageSize = null,
byte[] pagingState = null);

/// <summary>
/// Reads a subset of a paged query using paging state blob. If not provided, will always return the first page.
Expand Down
68 changes: 68 additions & 0 deletions src/SnD.Sdk/Storage/Cql/CqlApiExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Polly;
using Polly.RateLimit;
using Polly.Retry;
using Snd.Sdk.Tasks;

namespace Snd.Sdk.Storage.Cql;

/// <summary>
/// Provides extension methods that run CQL API calls under Polly retry and rate limit policies.
/// </summary>
public static class CqlApiExtensions
{
    /// <summary>
    /// Executes a CQL API call with retry and rate limit policies.
    /// </summary>
    /// <remarks>
    /// A retry policy is wrapped around a rate limit policy, so a rejection raised by the rate limiter
    /// is what triggers a retry. Both policies are created on every invocation of this method.
    /// NOTE(review): because the rate limiter is built per call, its state does not appear to be shared
    /// between separate invocations - confirm that this is the intended throttling scope.
    /// </remarks>
    /// <typeparam name="TResult">The type of the result produced by the CQL API call.</typeparam>
    /// <typeparam name="TCaller">The type of the caller for logging purposes.</typeparam>
    /// <param name="cqlApiCall">The CQL API call to be executed.</param>
    /// <param name="logger">The logger to log retry and rate limit information.</param>
    /// <param name="rateLimit">The rate limit (number of requests) per specified period.</param>
    /// <param name="rateLimitPeriod">The time period for the rate limit.</param>
    /// <param name="cancellationToken">The cancellation token to cancel the operation.</param>
    /// <returns>A task that represents the asynchronous operation, which produces the result of the CQL API call.</returns>
    /// <exception cref="ArgumentNullException">
    /// Thrown when <paramref name="cqlApiCall"/> or <paramref name="logger"/> is null.
    /// </exception>
    public static Task<TResult> ExecuteWithRetryAndRateLimit<TResult, TCaller>(
        this Func<CancellationToken, Task<TResult>> cqlApiCall,
        ILogger<TCaller> logger,
        int rateLimit, TimeSpan rateLimitPeriod,
        CancellationToken cancellationToken = default
    )
    {
        // Fail fast: a null logger would otherwise only surface later, inside the retry callbacks.
        if (cqlApiCall is null)
        {
            throw new ArgumentNullException(nameof(cqlApiCall));
        }

        if (logger is null)
        {
            throw new ArgumentNullException(nameof(logger));
        }

        var wrapPolicy = CreateRetryPolicy(logger).WrapAsync(Policy.RateLimitAsync(rateLimit, rateLimitPeriod));

        return cqlApiCall.WithWrapPolicy(wrapPolicy, cancellationToken);
    }

    /// <summary>
    /// Builds the retry policy: up to 5 retries, triggered only by <see cref="RateLimitRejectedException"/>.
    /// </summary>
    /// <param name="logger">Logger that receives a warning for each rejection and each retry.</param>
    /// <returns>The configured asynchronous retry policy.</returns>
    private static AsyncRetryPolicy CreateRetryPolicy(ILogger logger)
    {
        return Policy
            .Handle<RateLimitRejectedException>()
            .WaitAndRetryAsync(
                retryCount: 5,
                sleepDurationProvider: (retryAttempt, exception, context) =>
                {
                    // Respect the retry after time provided by the rate limiter
                    if (exception is RateLimitRejectedException rateLimitException)
                    {
                        logger.LogWarning("Rate limit hit. Retrying after {RetryAfter} milliseconds",
                            rateLimitException.RetryAfter.TotalMilliseconds);
                        return rateLimitException.RetryAfter;
                    }

                    // Defensive fallback only: this policy handles nothing but RateLimitRejectedException,
                    // so this exponential backoff is not expected to run unless more handlers are added.
                    return TimeSpan.FromSeconds(Math.Pow(2, retryAttempt));
                },
                onRetryAsync: (exception, timeSpan, retryCount, _) =>
                {
                    logger.LogWarning(exception,
                        "Retrying batch after {SleepDuration}. Retry attempt {RetryCount}",
                        timeSpan, retryCount);
                    return Task.CompletedTask;
                });
    }
}
55 changes: 54 additions & 1 deletion src/SnD.Sdk/Storage/Cql/CqlService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
using Snd.Sdk.Tasks;
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Util;
using Cassandra;
Expand All @@ -17,6 +19,7 @@ namespace Snd.Sdk.Storage.Cql
/// <summary>
/// CQL-API compatible entity collection service.
/// </summary>
[ExcludeFromCodeCoverage]
public class CqlService : ICqlEntityService
{
private readonly ILogger<CqlService> logger;
Expand Down Expand Up @@ -159,7 +162,57 @@ public Task<bool> UpsertEntity<T>(T entity, int? ttlSeconds = null, bool insertN
}

/// <inheritdoc />
public Task<bool> UpsertAtomicPair<TFirst, TSecond>(TFirst first, TSecond second, int? ttlSeconds = null, bool insertNulls = false)
public Task<bool> UpsertBatch<T>(List<T> entities, int batchSize = 1000, TimeSpan? ttlSeconds = null,
    bool insertNulls = false, int rateLimit = 1000, TimeSpan rateLimitPeriod = default, CancellationToken cancellationToken = default)
{
    // Splits the entities into batches of at most batchSize items, starts the execution of every batch
    // and reports true only when each batch reported true. The error handler passed to TryMap logs the
    // exception and yields false.

    // Reject invalid arguments explicitly instead of failing later with a NullReferenceException,
    // a DivideByZeroException or an out-of-range error from Enumerable.Range.
    if (entities is null)
    {
        throw new ArgumentNullException(nameof(entities));
    }

    if (batchSize <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "Batch size must be greater than zero.");
    }

    if (rateLimitPeriod == default)
    {
        rateLimitPeriod = TimeSpan.FromSeconds(1); // An unspecified period means rateLimit requests per second
    }

    // Ceiling division written so that it never forms entities.Count + batchSize,
    // which overflows int when batchSize is close to int.MaxValue.
    var totalBatches = entities.Count / batchSize + (entities.Count % batchSize == 0 ? 0 : 1);

    return Task.WhenAll(Enumerable.Range(0, totalBatches)
            .Select(i => CreateBatch(entities, i, batchSize, ttlSeconds, insertNulls))
            .Select((batch, i) => ExecuteBatch(batch, i, rateLimit, rateLimitPeriod, cancellationToken)))
        .TryMap(results => results.All(r => r), exception =>
        {
            this.logger.LogError(exception, "Failed to insert batch");
            return false;
        });
}

/// <summary>
/// Builds an unlogged batch holding the insert commands for one slice of <paramref name="entities"/>.
/// </summary>
/// <typeparam name="T">The type of the entities placed in the batch.</typeparam>
/// <param name="entities">Full list of entities the slice is taken from.</param>
/// <param name="batchIndex">Zero-based index of the slice; the slice starts at batchIndex * batchSize.</param>
/// <param name="batchSize">Maximum number of entities in the slice.</param>
/// <param name="ttlSeconds">Optional time to live; converted to whole seconds by an int cast, which drops any fractional part.</param>
/// <param name="insertNulls">Forwarded unchanged to GetInsertCommand for every entity.</param>
/// <returns>The batch with one appended insert command per entity in the slice.</returns>
private Batch CreateBatch<T>(List<T> entities, int batchIndex, int batchSize, TimeSpan? ttlSeconds, bool insertNulls)
{
    var unloggedBatch = this.session.CreateBatch(BatchType.Unlogged);

    // Slice boundaries: [firstIndex, endExclusive), clamped to the end of the list.
    var firstIndex = batchIndex * batchSize;
    var endExclusive = Math.Min(firstIndex + batchSize, entities.Count);

    int? ttl = null;
    if (ttlSeconds.HasValue)
    {
        ttl = (int)ttlSeconds.Value.TotalSeconds;
    }

    var position = firstIndex;
    while (position < endExclusive)
    {
        unloggedBatch.Append(GetInsertCommand(entities[position], ttl, insertNulls));
        position++;
    }

    return unloggedBatch;
}

/// <summary>
/// Executes a single batch through <c>ExecuteWithRetryAndRateLimit</c> and maps the outcome to a boolean.
/// </summary>
/// <param name="batch">The batch to execute.</param>
/// <param name="batchIndex">Index of the batch, used only in log messages.</param>
/// <param name="rateLimit">Number of requests allowed per <paramref name="rateLimitPeriod"/>.</param>
/// <param name="rateLimitPeriod">Time period the rate limit applies to.</param>
/// <param name="cancellationToken">Token forwarded to the retry and rate limit wrapper.</param>
/// <returns>A task whose result is true from the success handler and false from the error handler.</returns>
private Task<bool> ExecuteBatch(Batch batch, int batchIndex, int rateLimit, TimeSpan rateLimitPeriod, CancellationToken cancellationToken = default)
{
    // The token handed to the delegate is not used: batch.ExecuteAsync() is called without arguments.
    Func<CancellationToken, Task<bool>> upsertOnce = _ => batch.ExecuteAsync().TryMap(
        () =>
        {
            this.logger.LogDebug("Successfully inserted batch at index {BatchIndex}. Trace: {queryTrace}", batchIndex, batch.QueryTrace);
            return true;
        },
        failure =>
        {
            this.logger.LogError(failure, "Failed to insert batch at index {BatchIndex}.", batchIndex);
            return false;
        });

    return upsertOnce.ExecuteWithRetryAndRateLimit(this.logger, rateLimit, rateLimitPeriod, cancellationToken);
}

/// <inheritdoc />
public Task<bool> UpsertAtomicPair<TFirst, TSecond>(TFirst first, TSecond second, int? ttlSeconds = null,
bool insertNulls = false)
{
var loggedBatch = this.session.CreateBatch(BatchType.Logged);

Expand Down
2 changes: 2 additions & 0 deletions src/SnD.Sdk/Storage/Providers/RedisServiceProvider.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Diagnostics.CodeAnalysis;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Snd.Sdk.Storage.Base;
Expand All @@ -10,6 +11,7 @@ namespace Snd.Sdk.Storage.Providers;
/// <summary>
/// Provider for Redis service.
/// </summary>
[ExcludeFromCodeCoverage]
public static class RedisServiceProvider
{
/// <summary>
Expand Down
14 changes: 14 additions & 0 deletions src/SnD.Sdk/Tasks/TaskExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Threading.Tasks;
using Akka;
using Polly.Retry;
using Polly.Wrap;

namespace Snd.Sdk.Tasks
{
Expand Down Expand Up @@ -119,5 +120,18 @@ public static Task<TResult> WithRetryPolicy<TResult>(this Func<CancellationToken
{
return policy.ExecuteAsync(wrapped, cancellationToken);
}

/// <summary>
/// Runs the supplied asynchronous delegate through the given policy wrap.
/// </summary>
/// <typeparam name="TResult">Result type produced by the delegate.</typeparam>
/// <param name="wrapped">Delegate to execute under the policy wrap.</param>
/// <param name="policy">Policy wrap that governs the execution.</param>
/// <param name="cancellationToken">Optional cancellation token passed to the policy wrap's <c>ExecuteAsync</c>.</param>
/// <returns>The task returned by the policy wrap's <c>ExecuteAsync</c> for the delegate.</returns>
public static Task<TResult> WithWrapPolicy<TResult>(this Func<CancellationToken, Task<TResult>> wrapped, AsyncPolicyWrap policy, CancellationToken cancellationToken = default) =>
    policy.ExecuteAsync(wrapped, cancellationToken);
}
}
37 changes: 37 additions & 0 deletions test/Storage/CqlTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Moq;
using Snd.Sdk.Storage.Cql;
using Xunit;

namespace Snd.Sdk.Tests.Storage
{
/// <summary>
/// Tests for the retry and rate limit wrapper in <see cref="CqlApiExtensions"/>.
/// </summary>
public class CqlTests : IClassFixture<AkkaFixture>, IClassFixture<LoggerFixture>
{
    private readonly AkkaFixture akkaFixture;
    private readonly LoggerFixture loggerFixture;

    public CqlTests(AkkaFixture akkaFixture, LoggerFixture loggerFixture)
    {
        this.akkaFixture = akkaFixture;
        this.loggerFixture = loggerFixture;
    }

    /// <summary>
    /// A call that is not rejected by the rate limiter runs once and its result is returned unchanged.
    /// </summary>
    [Theory]
    [InlineData(1000, 1, true)]
    [InlineData(50000, 1, true)]
    [InlineData(1000, 1, false)]
    public async Task ExecuteWithRetryAndRateLimit_ExecutesSuccessfully(int rateLimit, int rateLimitPeriodSeconds, bool expectedResult)
    {
        var loggerMock = new Mock<ILogger<object>>();
        var cqlApiCallMock = new Mock<Func<CancellationToken, Task<bool>>>();
        cqlApiCallMock.Setup(c => c(It.IsAny<CancellationToken>())).ReturnsAsync(expectedResult);
        var cancellationToken = CancellationToken.None;

        var result = await cqlApiCallMock.Object.ExecuteWithRetryAndRateLimit(loggerMock.Object, rateLimit, TimeSpan.FromSeconds(rateLimitPeriodSeconds), cancellationToken);

        // Compare against the configured value; Assert.True would ignore the expectedResult parameter.
        Assert.Equal(expectedResult, result);
        // No rejection is expected for a single call, so the delegate must have run exactly once.
        cqlApiCallMock.Verify(c => c(It.IsAny<CancellationToken>()), Times.Once());
    }
}
}

0 comments on commit 537fc1c

Please sign in to comment.