Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cql batch inserts #110

Merged
merged 17 commits into from
Aug 30, 2024
2 changes: 1 addition & 1 deletion src/SnD.Sdk.Storage.Minio/Models/EventWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public record S3
/// <summary>
/// Gets or sets the object involved in the event.
/// </summary>
[JsonPropertyName("@object")]
[JsonPropertyName("object")]
public Object Object { get; set; }
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using Serilog;
using System.Diagnostics.CodeAnalysis;
using Serilog;

namespace Snd.Sdk.Logs.Providers.Configurations
{
/// <summary>
/// Extension methods for configuration of all sinks
/// </summary>
[ExcludeFromCodeCoverage]
public static class DefaultLoggingConfiguration
{
/// <summary>
Expand Down
15 changes: 15 additions & 0 deletions src/SnD.Sdk/Storage/Base/ICqlEntityService.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Threading;
using Akka;
using Akka.Streams.Dsl;
using System.Threading.Tasks;
Expand Down Expand Up @@ -30,6 +31,20 @@ public interface ICqlEntityService
/// <returns></returns>
Task<bool> UpsertEntity<T>(T entity, int? ttlSeconds = null, bool insertNulls = false);

/// <summary>
/// Inserts or updates a list of entities in the table, splitting the list into batches,
/// with optional TTL and null field handling.
/// </summary>
/// <typeparam name="T">The type of the entities to be upserted.</typeparam>
/// <param name="entities">The entities to be upserted.</param>
/// <param name="batchSize">The maximum number of entities written per batch. Default is 1000.</param>
/// <param name="ttlSeconds">Optional time to live for the entities, in seconds. Default is null.</param>
/// <param name="insertNulls">Specifies whether to merge non-supplied fields. Default is false.</param>
/// <param name="rateLimit">Rate limit for the operation, written as text such as "1000 per second" (the default).</param>
/// <param name="cancellationToken">Token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous operation. The task result is a boolean indicating success or failure.</returns>
Task<bool> UpsertBatch<T>(List<T> entities, int batchSize = 1000, int? ttlSeconds = null,
adelinag08 marked this conversation as resolved.
Show resolved Hide resolved
bool insertNulls = false, string rateLimit = "1000 per second", CancellationToken cancellationToken = default);

/// <summary>
/// Creates or updates a pair of entities atomically with optional TTL.
/// </summary>
Expand Down
83 changes: 83 additions & 0 deletions src/SnD.Sdk/Storage/Cql/CqlApiExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Polly;
using Polly.RateLimit;
using Polly.Retry;
using Snd.Sdk.Tasks;

namespace Snd.Sdk.Storage.Cql;

/// <summary>
/// Provides extension methods that run CQL API calls under Polly retry and rate limit policies.
/// </summary>
public static class CqlApiExtensions
{
    /// <summary>
    /// Executes a CQL API call with retry and rate limit policies.
    /// </summary>
    /// <remarks>
    /// Both policies are built anew on every invocation.
    /// NOTE(review): Polly rate limiters keep their state per policy instance, so the limit
    /// presumably is not shared between separate invocations of this method - confirm whether
    /// the policy should be cached per <paramref name="rateLimit"/> value.
    /// </remarks>
    /// <typeparam name="TResult">The type of the result produced by the CQL API call.</typeparam>
    /// <typeparam name="TCaller">The type of the caller for logging purposes.</typeparam>
    /// <param name="cqlApiCall">The CQL API call to be executed.</param>
    /// <param name="logger">The logger to log retry and rate limit information.</param>
    /// <param name="rateLimit">
    /// The rate limit in the form "&lt;count&gt; per &lt;unit&gt;", where unit is one of
    /// "second", "minute" or "hour" (case-insensitive), e.g. "1000 per second".
    /// </param>
    /// <param name="cancellationToken">The cancellation token to cancel the operation.</param>
    /// <returns>A task that represents the asynchronous operation, which produces the result of the CQL API call.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="rateLimit"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="rateLimit"/> has fewer than three parts or an unknown unit.</exception>
    /// <exception cref="FormatException">The count part of <paramref name="rateLimit"/> is not an integer.</exception>
    public static Task<TResult> ExecuteWithRetryAndRateLimit<TResult, TCaller>(
        this Func<CancellationToken, Task<TResult>> cqlApiCall,
        ILogger<TCaller> logger, string rateLimit = "1000 per second",
        CancellationToken cancellationToken = default
    )
    {
        // The retry policy is the outer policy so that rejections raised by the inner
        // rate limiter are caught and retried.
        var wrapPolicy = CreateRetryPolicy(logger).WrapAsync(CreateRateLimitPolicy(rateLimit));

        return cqlApiCall.WithWrapPolicy(wrapPolicy, cancellationToken);
    }

    /// <summary>
    /// Parses a "&lt;count&gt; per &lt;unit&gt;" string into a Polly rate limit policy.
    /// </summary>
    private static AsyncRateLimitPolicy CreateRateLimitPolicy(string rateLimit)
    {
        if (rateLimit == null)
        {
            throw new ArgumentNullException(nameof(rateLimit));
        }

        // Tolerate repeated spaces between the parts instead of producing empty tokens.
        var rateParts = rateLimit.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
        if (rateParts.Length < 3)
        {
            // Previously surfaced as an IndexOutOfRangeException with no hint about the cause.
            throw new ArgumentException(
                "Rate limit must have the form '<count> per <unit>', e.g. '1000 per second'.",
                nameof(rateLimit));
        }

        // Machine-readable configuration value: parse independently of the current culture.
        var limit = int.Parse(
            rateParts[0],
            System.Globalization.NumberStyles.Integer,
            System.Globalization.CultureInfo.InvariantCulture);
        var perUnit = rateParts[2];

        // Invariant lower-casing: culture-sensitive ToLower() breaks e.g. "MINUTE" under Turkish rules.
        var timeSpan = perUnit.ToLowerInvariant() switch
        {
            "second" => TimeSpan.FromSeconds(1),
            "minute" => TimeSpan.FromMinutes(1),
            "hour" => TimeSpan.FromHours(1),
            _ => throw new ArgumentException("Invalid rate limit unit.")
        };

        return Policy.RateLimitAsync(limit, timeSpan);
    }

    /// <summary>
    /// Builds a policy that retries up to five times when the rate limiter rejects a call.
    /// </summary>
    private static AsyncRetryPolicy CreateRetryPolicy(ILogger logger)
    {
        return Policy
            .Handle<RateLimitRejectedException>()
            .WaitAndRetryAsync(
                retryCount: 5,
                sleepDurationProvider: (retryAttempt, exception, context) =>
                {
                    // Respect the retry after time provided by the rate limiter
                    if (exception is RateLimitRejectedException rateLimitException)
                    {
                        logger.LogWarning("Rate limit hit. Retrying after {RetryAfter} milliseconds",
                            rateLimitException.RetryAfter.TotalMilliseconds);
                        return rateLimitException.RetryAfter;
                    }

                    // Defensive fallback only: this policy handles nothing but
                    // RateLimitRejectedException, so other exceptions are not retried here.
                    return TimeSpan.FromSeconds(Math.Pow(2, retryAttempt));
                },
                onRetryAsync: (exception, timeSpan, retryCount, _) =>
                {
                    logger.LogWarning(exception,
                        "Retrying batch after {SleepDuration}. Retry attempt {RetryCount}",
                        timeSpan, retryCount);
                    return Task.CompletedTask;
                });
    }
}
48 changes: 47 additions & 1 deletion src/SnD.Sdk/Storage/Cql/CqlService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
using Snd.Sdk.Tasks;
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Util;
using Cassandra;
Expand All @@ -17,6 +19,7 @@ namespace Snd.Sdk.Storage.Cql
/// <summary>
/// CQL-API compatible entity collection service.
/// </summary>
[ExcludeFromCodeCoverage]
public class CqlService : ICqlEntityService
{
private readonly ILogger<CqlService> logger;
Expand Down Expand Up @@ -159,7 +162,50 @@ public Task<bool> UpsertEntity<T>(T entity, int? ttlSeconds = null, bool insertN
}

/// <inheritdoc />
public Task<bool> UpsertAtomicPair<TFirst, TSecond>(TFirst first, TSecond second, int? ttlSeconds = null, bool insertNulls = false)
public async Task<bool> UpsertBatch<T>(List<T> entities, int batchSize = 1000, int? ttlSeconds = null,
    bool insertNulls = false, string rateLimit = "1000 per second", CancellationToken cancellationToken = default)
{
    // Splits the entities into consecutive chunks of at most batchSize items and writes each
    // chunk as one batch. Every batch is awaited before the next one starts, so the returned
    // task completes only after all batches have run and its result reflects their outcome:
    // true when every batch reported success, false when at least one reported failure.
    // Argument errors and cancellation surface through the returned task.
    if (entities == null)
    {
        throw new ArgumentNullException(nameof(entities));
    }

    if (batchSize <= 0)
    {
        // A zero batch size previously failed with DivideByZeroException.
        throw new ArgumentOutOfRangeException(nameof(batchSize), batchSize, "Batch size must be greater than zero.");
    }

    // Ceiling division: the last batch may hold fewer than batchSize entities.
    var totalBatches = (entities.Count + batchSize - 1) / batchSize;
    var allSucceeded = true;

    for (var i = 0; i < totalBatches; i++)
    {
        cancellationToken.ThrowIfCancellationRequested();

        var batch = CreateBatch(entities, i, batchSize, ttlSeconds, insertNulls);

        // Keep going after a failed batch so the remaining entities are still attempted;
        // the failure is remembered in the aggregated result.
        var succeeded = await ExecuteBatch(batch, i, rateLimit, cancellationToken).ConfigureAwait(false);
        allSucceeded &= succeeded;
    }

    return allSucceeded;
}

private Batch CreateBatch<T>(List<T> entities, int batchIndex, int batchSize, int? ttlSeconds, bool insertNulls)
{
    // Builds one unlogged batch holding the insert commands for the slice of entities
    // that belongs to the batch at position batchIndex.
    var firstItem = batchIndex * batchSize;

    // The final slice may be shorter than batchSize.
    var itemCount = Math.Min(batchSize, entities.Count - firstItem);
    var unloggedBatch = this.session.CreateBatch(BatchType.Unlogged);

    for (var offset = 0; offset < itemCount; offset++)
    {
        var insertCommand = GetInsertCommand(entities[firstItem + offset], ttlSeconds, insertNulls);
        unloggedBatch.Append(insertCommand);
    }

    return unloggedBatch;
}

private Task<bool> ExecuteBatch(Batch batch, int batchIndex, string rateLimit = "1000 per second", CancellationToken cancellationToken = default)
{
    // Runs the batch under the retry and rate limit policies. The outcome is mapped to a
    // boolean: the first callback logs and yields true, the second logs the exception and
    // yields false (presumably invoked by TryMap when the execution task faults).
    // The token handed to the delegate by the policy is not used by the batch execution.
    Func<CancellationToken, Task<bool>> runBatch = _ => batch.ExecuteAsync().TryMap(
        () =>
        {
            this.logger.LogDebug("Successfully inserted batch at index {BatchIndex}. Trace: {queryTrace}", batchIndex, batch.QueryTrace);
            return true;
        },
        failure =>
        {
            this.logger.LogError(failure, "Failed to insert batch at index {BatchIndex}.", batchIndex);
            return false;
        });

    return runBatch.ExecuteWithRetryAndRateLimit(this.logger, rateLimit, cancellationToken);
}

/// <inheritdoc />
public Task<bool> UpsertAtomicPair<TFirst, TSecond>(TFirst first, TSecond second, int? ttlSeconds = null,
bool insertNulls = false)
{
var loggedBatch = this.session.CreateBatch(BatchType.Logged);

Expand Down
2 changes: 2 additions & 0 deletions src/SnD.Sdk/Storage/Providers/RedisServiceProvider.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Diagnostics.CodeAnalysis;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Snd.Sdk.Storage.Base;
Expand All @@ -10,6 +11,7 @@ namespace Snd.Sdk.Storage.Providers;
/// <summary>
/// Provider for Redis service.
/// </summary>
[ExcludeFromCodeCoverage]
public static class RedisServiceProvider
{
/// <summary>
Expand Down
14 changes: 14 additions & 0 deletions src/SnD.Sdk/Tasks/TaskExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Threading.Tasks;
using Akka;
using Polly.Retry;
using Polly.Wrap;

namespace Snd.Sdk.Tasks
{
Expand Down Expand Up @@ -119,5 +120,18 @@ public static Task<TResult> WithRetryPolicy<TResult>(this Func<CancellationToken
{
return policy.ExecuteAsync(wrapped, cancellationToken);
}

/// <summary>
/// Runs the given asynchronous function through the supplied policy wrap.
/// </summary>
/// <typeparam name="TResult">The type of the value the wrapped function produces.</typeparam>
/// <param name="wrapped">The asynchronous function to execute under the policy wrap.</param>
/// <param name="policy">The policy wrap that governs the execution.</param>
/// <param name="cancellationToken">Optional cancellation token passed to the policy execution.</param>
/// <returns>A task producing the result of the wrapped function as executed by the policy wrap.</returns>
public static Task<TResult> WithWrapPolicy<TResult>(this Func<CancellationToken, Task<TResult>> wrapped, AsyncPolicyWrap policy, CancellationToken cancellationToken = default)
    => policy.ExecuteAsync(wrapped, cancellationToken);
}
}
47 changes: 47 additions & 0 deletions test/Storage/CqlTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Moq;
using Snd.Sdk.Storage.Cql;
using Xunit;

namespace Snd.Sdk.Tests.Storage
{
/// <summary>
/// Tests for <see cref="CqlApiExtensions.ExecuteWithRetryAndRateLimit{TResult,TCaller}"/>.
/// </summary>
public class CqlTests : IClassFixture<AkkaFixture>, IClassFixture<LoggerFixture>
{
    // The fixtures are injected by xUnit; the tests below do not read them.
    private readonly AkkaFixture akkaFixture;
    private readonly LoggerFixture loggerFixture;

    public CqlTests(AkkaFixture akkaFixture, LoggerFixture loggerFixture)
    {
        this.akkaFixture = akkaFixture;
        this.loggerFixture = loggerFixture;
    }

    /// <summary>
    /// A valid rate limit string lets the wrapped call run once and its result is passed through unchanged.
    /// </summary>
    [Theory]
    [InlineData("1000 per second", true)]
    [InlineData("50000 per minute", true)]
    [InlineData("1000 per second", false)]
    public async Task ExecuteWithRetryAndRateLimit_ExecutesSuccessfully(string rateLimit, bool expectedResult)
    {
        var loggerMock = new Mock<ILogger<object>>();
        var cqlApiCallMock = new Mock<Func<CancellationToken, Task<bool>>>();
        cqlApiCallMock.Setup(c => c(It.IsAny<CancellationToken>())).ReturnsAsync(expectedResult);
        var cancellationToken = CancellationToken.None;

        var result = await cqlApiCallMock.Object.ExecuteWithRetryAndRateLimit(loggerMock.Object, rateLimit, cancellationToken);

        // Compare against the expectation instead of a hard-coded 'true',
        // otherwise the theory parameter is never exercised.
        Assert.Equal(expectedResult, result);
        cqlApiCallMock.Verify(c => c(It.IsAny<CancellationToken>()), Times.Once);
    }

    /// <summary>
    /// An unknown time unit is rejected before the wrapped call is invoked.
    /// </summary>
    [Theory]
    [InlineData("1000 per min")]
    [InlineData("1000 per day")]
    [InlineData("1000 per seconds")]
    public async Task ExecuteWithRetryAndRateLimit_InvalidRateLimitUnit(string rateLimit)
    {
        var loggerMock = new Mock<ILogger<object>>();
        var cqlApiCallMock = new Mock<Func<CancellationToken, Task<int>>>();
        var cancellationToken = CancellationToken.None;

        await Assert.ThrowsAsync<ArgumentException>(() => cqlApiCallMock.Object.ExecuteWithRetryAndRateLimit(loggerMock.Object, rateLimit, cancellationToken));

        cqlApiCallMock.Verify(c => c(It.IsAny<CancellationToken>()), Times.Never);
    }
}
}
Loading