From bb2537a6bd60f7dc34b811799ee9178a43f7c2b5 Mon Sep 17 00:00:00 2001 From: darkwatchuk Date: Tue, 1 Oct 2024 12:44:06 +0100 Subject: [PATCH] Added Domain support for stream mirroring and sourcing and KV full support for the same. (#631) * Added Sources and Mirror to KV Store Creation * Removed the need for StreamSource.Domain * Fixed trailing whitespace * Whitespace fix for KeyValueStoreTest * Added Domain support for stream sourcing. * Test fix * Keep caller's config intact. * Test fix * Remove unused import in StreamSource.cs Deleted the 'System.Diagnostics.Metrics' import as it was not being used anywhere in the file. This change helps in maintaining a clean codebase with only necessary dependencies. * Remove unused using directive Deleted an unnecessary directive for System.Xml.Schema in NatsJSContext.cs. This cleanup aids in maintaining clean and efficient code. * Refactor initialization logic for stream configuration. Clean up redundant initializations and streamline the handling of `subjects`, `mirror`, and `sources` to improve code clarity. Ensure default assignments are explicitly defined within conditional branches for better code maintainability. * Refactor mirror assignment with ShallowCopy method Simplify the mirror object instantiation by using the ShallowCopy method, reducing code redundancy. This change improves readability and maintenance by encapsulating the cloning logic within the method. * Refactor null and count check for config.Sources Updated the conditional check for `config.Sources` to use pattern matching, improving readability and adhering to modern C# conventions. This change ensures cleaner and more maintainable code. * Skip specific test for NATS servers earlier than v2.10 This commit updates the KeyValueStoreTest to skip the Test_CombinedSources test for NATS server versions earlier than 2.10 since some of the mirroring features are introduced with 2.10. It also removes unnecessary retry delay and timeout parameters from the test configuration. * Use with syntax to clone * Fix build warnings * Fix build warnings and add test * Fix test --------- Co-authored-by: Ziya Suzen --- .github/workflows/test.yml | 7 ++ src/NATS.Client.Core/Nuid.cs | 4 +- .../Internal/netstandard.cs | 4 -- .../Models/StreamSource.cs | 9 +++ .../NatsJSContext.Streams.cs | 32 +++++++++ src/NATS.Client.JetStream/NatsJSContext.cs | 15 ++++ src/NATS.Client.KeyValueStore/NatsKVConfig.cs | 17 +++-- .../NatsKVContext.cs | 70 +++++++++++++++++-- .../KeyValueStoreTest.cs | 47 +++++++++++++ .../NatsKVContextFactoryTest.cs | 2 + .../NatsObjContextFactoryTest.cs | 2 + .../NATS.Client.Platform.Windows.Tests.csproj | 2 + .../test.runsettings | 7 ++ 13 files changed, 199 insertions(+), 19 deletions(-) create mode 100644 tests/NATS.Client.Simplified.Tests/test.runsettings diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d0baf8a07..d4ed5a0f1 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -94,6 +94,13 @@ jobs: cd tests/NATS.Client.Services.Tests dotnet test -c Release --no-build + - name: Test Simplified + run: | + killall nats-server 2> /dev/null | echo -n + nats-server -v + cd tests/NATS.Client.Simplified.Tests + dotnet test -c Release --no-build + - name: Test OpenTelemetry run: | killall nats-server 2> /dev/null | echo -n diff --git a/src/NATS.Client.Core/Nuid.cs b/src/NATS.Client.Core/Nuid.cs index 8994f3f28..21d822e7d 100644 --- a/src/NATS.Client.Core/Nuid.cs +++ b/src/NATS.Client.Core/Nuid.cs @@ -18,7 +18,7 @@ namespace NATS.Client.Core; [SkipLocalsInit] public sealed class Nuid { - // NuidLength, PrefixLength, SequentialLength were nuint (System.UIntPtr) in the original code + // NuidLength, PrefixLength, SequentialLength were nuint (System.UIntPtr) in the original code, // however, they were changed to uint to fix the compilation error for IL2CPP Unity projects. // With nuint, the following error occurs in Unity Linux IL2CPP builds: // Error: IL2CPP error for method 'System.Char[] NATS.Client.Core.Internal.NuidWriter::Refresh(System.UInt64&)' @@ -88,7 +88,7 @@ private static bool TryWriteNuidCore(Span buffer, Span prefix, ulong ref var digitsPtr = ref MemoryMarshal.GetReference(Digits); // write backwards so the last two characters change the fastest - for (var i = NuidLength; i > PrefixLength;) + for (nuint i = NuidLength; i > PrefixLength;) { i--; var digitIndex = (nuint)(sequential % Base); diff --git a/src/NATS.Client.JetStream/Internal/netstandard.cs b/src/NATS.Client.JetStream/Internal/netstandard.cs index f14282452..66b69b9b4 100644 --- a/src/NATS.Client.JetStream/Internal/netstandard.cs +++ b/src/NATS.Client.JetStream/Internal/netstandard.cs @@ -5,10 +5,6 @@ namespace System.Runtime.CompilerServices { - internal class ExtensionAttribute : Attribute - { - } - internal sealed class CompilerFeatureRequiredAttribute : Attribute { public CompilerFeatureRequiredAttribute(string featureName) diff --git a/src/NATS.Client.JetStream/Models/StreamSource.cs b/src/NATS.Client.JetStream/Models/StreamSource.cs index 684b7d667..0529bcb47 100644 --- a/src/NATS.Client.JetStream/Models/StreamSource.cs +++ b/src/NATS.Client.JetStream/Models/StreamSource.cs @@ -54,4 +54,13 @@ public record StreamSource [System.Text.Json.Serialization.JsonPropertyName("external")] [System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)] public ExternalStreamSource? External { get; set; } + + /// + /// This field is a convenience for setting up an ExternalStream. + /// If set, the value here is used to calculate the JetStreamAPI prefix. + /// This field is never serialized to the server. This value cannot be set + /// if external is set. + /// + [System.Text.Json.Serialization.JsonIgnore] + public string? Domain { get; set; } } diff --git a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs index 53d61cbdd..b9b428b89 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.Streams.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.Streams.cs @@ -20,6 +20,38 @@ public async ValueTask CreateStreamAsync( CancellationToken cancellationToken = default) { ThrowIfInvalidStreamName(config.Name, nameof(config.Name)); + + // keep caller's config intact. + config = config with { }; + + // If we have a mirror and an external domain, convert to ext.APIPrefix. + if (config.Mirror != null && !string.IsNullOrEmpty(config.Mirror.Domain)) + { + config.Mirror = config.Mirror with { }; + ConvertDomain(config.Mirror); + } + + // Check sources for the same. + if (config.Sources != null && config.Sources.Count > 0) + { + ICollection? sources = []; + foreach (var ss in config.Sources) + { + if (!string.IsNullOrEmpty(ss.Domain)) + { + var remappedDomainSource = ss with { }; + ConvertDomain(remappedDomainSource); + sources.Add(remappedDomainSource); + } + else + { + sources.Add(ss); + } + } + + config.Sources = sources; + } + var response = await JSRequestResponseAsync( subject: $"{Opts.Prefix}.STREAM.CREATE.{config.Name}", config, diff --git a/src/NATS.Client.JetStream/NatsJSContext.cs b/src/NATS.Client.JetStream/NatsJSContext.cs index 66ed5cacc..c0c6f9094 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.cs @@ -327,6 +327,21 @@ internal async ValueTask> JSRequestAsync throw new ArgumentException("Stream name cannot contain ' ', '.'", paramName); diff --git a/src/NATS.Client.KeyValueStore/NatsKVConfig.cs b/src/NATS.Client.KeyValueStore/NatsKVConfig.cs index dad8b7086..3ff86e67d 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVConfig.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVConfig.cs @@ -1,3 +1,5 @@ +using NATS.Client.JetStream.Models; + namespace NATS.Client.KeyValueStore; /// @@ -61,12 +63,15 @@ public record NatsKVConfig /// public bool Compression { get; init; } - // TODO: Bucket mirror configuration. - // pub mirror: Option, - // Bucket sources configuration. - // pub sources: Option>, - // Allow mirrors using direct API. - // pub mirror_direct: bool, + /// + /// Mirror defines the configuration for mirroring another KeyValue store + /// + public StreamSource? Mirror { get; init; } + + /// + /// Sources defines the configuration for sources of a KeyValue store. + /// + public ICollection? Sources { get; set; } } /// diff --git a/src/NATS.Client.KeyValueStore/NatsKVContext.cs b/src/NATS.Client.KeyValueStore/NatsKVContext.cs index 6c4279739..cd0b1871c 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVContext.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVContext.cs @@ -173,9 +173,6 @@ private static string ExtractBucketName(string streamName) private static StreamConfig CreateStreamConfig(NatsKVConfig config) { - // TODO: KV Mirrors - var subjects = new[] { $"$KV.{config.Bucket}.>" }; - long history; if (config.History > 0) { @@ -203,6 +200,66 @@ private static StreamConfig CreateStreamConfig(NatsKVConfig config) var replicas = config.NumberOfReplicas > 0 ? config.NumberOfReplicas : 1; + string[]? subjects; + StreamSource? mirror; + ICollection? sources; + bool mirrorDirect; + + if (config.Mirror != null) + { + mirror = config.Mirror with + { + Name = config.Mirror.Name.StartsWith(KvStreamNamePrefix) + ? config.Mirror.Name + : BucketToStream(config.Mirror.Name), + }; + mirrorDirect = true; + subjects = default; + sources = default; + } + else if (config.Sources is { Count: > 0 }) + { + sources = []; + foreach (var ss in config.Sources) + { + string? sourceBucketName; + if (ss.Name.StartsWith(KvStreamNamePrefix)) + { + sourceBucketName = ss.Name.Substring(KvStreamNamePrefixLen); + } + else + { + sourceBucketName = ss.Name; + ss.Name = BucketToStream(ss.Name); + } + + if (ss.External == null || sourceBucketName != config.Bucket) + { + ss.SubjectTransforms = + [ + new SubjectTransform + { + Src = $"$KV.{sourceBucketName}.>", + Dest = $"$KV.{config.Bucket}.>", + } + ]; + } + + sources.Add(ss); + } + + subjects = [$"$KV.{config.Bucket}.>"]; + mirror = default; + mirrorDirect = false; + } + else + { + subjects = [$"$KV.{config.Bucket}.>"]; + mirror = default; + sources = default; + mirrorDirect = false; + } + var streamConfig = new StreamConfig { Name = BucketToStream(config.Bucket), @@ -221,10 +278,9 @@ private static StreamConfig CreateStreamConfig(NatsKVConfig config) AllowDirect = true, NumReplicas = replicas, Discard = StreamConfigDiscard.New, - - // TODO: KV mirrors - // MirrorDirect = - // Mirror = + Mirror = mirror, + MirrorDirect = mirrorDirect, + Sources = sources, Retention = StreamConfigRetention.Limits, // from ADR-8 }; diff --git a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs index 01c973cf9..8504f04ac 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/KeyValueStoreTest.cs @@ -648,4 +648,51 @@ public async Task TestDirectMessageRepublishedSubject() Assert.Equal(publishSubject3, kve3.Key); Assert.Equal("tres", kve3.Value); } + + [SkipIfNatsServer(versionEarlierThan: "2.10")] + public async Task Test_CombinedSources() + { + await using var server = NatsServer.StartJS(); + await using var nats = server.CreateClientConnection(); + + var js = new NatsJSContext(nats); + var kv = new NatsKVContext(js); + + var storeSource1 = await kv.CreateStoreAsync("source1"); + var storeSource2 = await kv.CreateStoreAsync("source2"); + + var storeCombined = await kv.CreateStoreAsync(new NatsKVConfig("combined") + { + Sources = [ + new StreamSource { Name = "source1" }, + new StreamSource { Name = "source2" } + ], + }); + + await storeSource1.PutAsync("ss1_a", "a_fromStore1"); + await storeSource2.PutAsync("ss2_b", "b_fromStore2"); + + await Retry.Until( + "async replication is completed", + async () => + { + try + { + await storeCombined.GetEntryAsync("ss1_a"); + await storeCombined.GetEntryAsync("ss2_b"); + } + catch (NatsKVKeyNotFoundException) + { + return false; + } + + return true; + }); + + var entryA = await storeCombined.GetEntryAsync("ss1_a"); + var entryB = await storeCombined.GetEntryAsync("ss2_b"); + + Assert.Equal("a_fromStore1", entryA.Value); + Assert.Equal("b_fromStore2", entryB.Value); + } } diff --git a/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs b/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs index 0270ae9f9..9f63aeada 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/NatsKVContextFactoryTest.cs @@ -50,6 +50,8 @@ public class MockJsContext : INatsJSContext { public INatsConnection Connection { get; } = new NatsConnection(); + public NatsJSOpts Opts { get; } = new(new NatsOpts()); + public ValueTask CreateOrderedConsumerAsync(string stream, NatsJSOrderedConsumerOpts? opts = default, CancellationToken cancellationToken = default) => throw new NotImplementedException(); public ValueTask CreateOrUpdateConsumerAsync(string stream, ConsumerConfig config, CancellationToken cancellationToken = default) => throw new NotImplementedException(); diff --git a/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs b/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs index 2ed5bb6f8..1ac2c470b 100644 --- a/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs +++ b/tests/NATS.Client.ObjectStore.Tests/NatsObjContextFactoryTest.cs @@ -50,6 +50,8 @@ public class MockJsContext : INatsJSContext { public INatsConnection Connection { get; } = new NatsConnection(); + public NatsJSOpts Opts { get; } = new(new NatsOpts()); + public ValueTask CreateOrderedConsumerAsync(string stream, NatsJSOrderedConsumerOpts? opts = default, CancellationToken cancellationToken = default) => throw new NotImplementedException(); public ValueTask CreateOrUpdateConsumerAsync(string stream, ConsumerConfig config, CancellationToken cancellationToken = default) => throw new NotImplementedException(); diff --git a/tests/NATS.Client.Platform.Windows.Tests/NATS.Client.Platform.Windows.Tests.csproj b/tests/NATS.Client.Platform.Windows.Tests/NATS.Client.Platform.Windows.Tests.csproj index faaf762a1..8f3208efb 100644 --- a/tests/NATS.Client.Platform.Windows.Tests/NATS.Client.Platform.Windows.Tests.csproj +++ b/tests/NATS.Client.Platform.Windows.Tests/NATS.Client.Platform.Windows.Tests.csproj @@ -16,6 +16,8 @@ + + diff --git a/tests/NATS.Client.Simplified.Tests/test.runsettings b/tests/NATS.Client.Simplified.Tests/test.runsettings new file mode 100644 index 000000000..27c41ad33 --- /dev/null +++ b/tests/NATS.Client.Simplified.Tests/test.runsettings @@ -0,0 +1,7 @@ + + + + 1 + 300000 + +