diff --git a/src/NATS.Client/KeyValue/IKeyValue.cs b/src/NATS.Client/KeyValue/IKeyValue.cs index 47bb66534..9c2cfb632 100644 --- a/src/NATS.Client/KeyValue/IKeyValue.cs +++ b/src/NATS.Client/KeyValue/IKeyValue.cs @@ -102,6 +102,16 @@ public interface IKeyValue /// KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions); + /// + /// Watch updates for a specific key, starting from a specific revision + /// + /// the key + /// the watcher + /// the revision to start from + /// the watch options to apply. If multiple conflicting options are supplied, the last options wins. + /// + KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions); + /// /// Watch updates for all keys /// @@ -110,6 +120,15 @@ public interface IKeyValue /// The KeyValueWatchSubscription KeyValueWatchSubscription WatchAll(IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions); + /// + /// Watch updates for all keys, starting from a specific revision + /// + /// the watcher + /// the revision to start from + /// the watch options to apply. If multiple conflicting options are supplied, the last options wins. + /// The KeyValueWatchSubscription + KeyValueWatchSubscription WatchAll(IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions); + /// /// Get a list of the keys in a bucket. /// diff --git a/src/NATS.Client/KeyValue/KeyValue.cs b/src/NATS.Client/KeyValue/KeyValue.cs index 7554cfb08..2b3994b9c 100644 --- a/src/NATS.Client/KeyValue/KeyValue.cs +++ b/src/NATS.Client/KeyValue/KeyValue.cs @@ -153,13 +153,26 @@ public KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, par { Validator.ValidateKvKeyWildcardAllowedRequired(key); Validator.ValidateNotNull(watcher, "Watcher is required"); - return new KeyValueWatchSubscription(this, key, watcher, watchOptions); + return new KeyValueWatchSubscription(this, key, watcher, ConsumerConfiguration.UlongUnset, watchOptions); + } + + public KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions) + { + Validator.ValidateKvKeyWildcardAllowedRequired(key); + Validator.ValidateNotNull(watcher, "Watcher is required"); + return new KeyValueWatchSubscription(this, key, watcher, fromRevision, watchOptions); } public KeyValueWatchSubscription WatchAll(IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions) { Validator.ValidateNotNull(watcher, "Watcher is required"); - return new KeyValueWatchSubscription(this, ">", watcher, watchOptions); + return new KeyValueWatchSubscription(this, ">", watcher, ConsumerConfiguration.UlongUnset, watchOptions); + } + + public KeyValueWatchSubscription WatchAll(IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions) + { + Validator.ValidateNotNull(watcher, "Watcher is required"); + return new KeyValueWatchSubscription(this, ">", watcher, fromRevision, watchOptions); } private PublishAck _write(string key, byte[] data, MsgHeader h) { diff --git a/src/NATS.Client/KeyValue/KeyValueWatchOption.cs b/src/NATS.Client/KeyValue/KeyValueWatchOption.cs index 359757e05..788cde052 100644 --- a/src/NATS.Client/KeyValue/KeyValueWatchOption.cs +++ b/src/NATS.Client/KeyValue/KeyValueWatchOption.cs @@ -22,7 +22,7 @@ public enum KeyValueWatchOption IgnoreDelete, /// - /// Only get meta data, skip value when retrieving data from the server. + /// Only get metadata, skip value when retrieving data from the server. /// MetaOnly, diff --git a/src/NATS.Client/KeyValue/KeyValueWatchSubscription.cs b/src/NATS.Client/KeyValue/KeyValueWatchSubscription.cs index 82a8d25a9..2ca87edd2 100644 --- a/src/NATS.Client/KeyValue/KeyValueWatchSubscription.cs +++ b/src/NATS.Client/KeyValue/KeyValueWatchSubscription.cs @@ -24,7 +24,7 @@ public class KeyValueWatchSubscription : IDisposable private readonly object subLock; public KeyValueWatchSubscription(KeyValue kv, string keyPattern, - IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions) + IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions) { subLock = new object(); string subscribeSubject = kv.ReadSubject(keyPattern); @@ -44,14 +44,23 @@ public KeyValueWatchSubscription(KeyValue kv, string keyPattern, } } - if (deliverPolicy == DeliverPolicy.New || kv._getLast(subscribeSubject) == null) + if (fromRevision > 0) { - endOfDataSent = new InterlockedBoolean(true); - watcher.EndOfData(); + deliverPolicy = DeliverPolicy.ByStartSequence; + endOfDataSent = new InterlockedBoolean(); } else { - endOfDataSent = new InterlockedBoolean(false); + fromRevision = ConsumerConfiguration.UlongUnset; // easier on the builder since we aren't starting at a fromRevision + if (deliverPolicy == DeliverPolicy.New || kv._getLast(subscribeSubject) == null) + { + endOfDataSent = new InterlockedBoolean(true); + watcher.EndOfData(); + } + else + { + endOfDataSent = new InterlockedBoolean(); + } } PushSubscribeOptions pso = PushSubscribeOptions.Builder() @@ -61,12 +70,13 @@ public KeyValueWatchSubscription(KeyValue kv, string keyPattern, ConsumerConfiguration.Builder() .WithAckPolicy(AckPolicy.None) .WithDeliverPolicy(deliverPolicy) + .WithStartSequence(fromRevision) .WithHeadersOnly(headersOnly) .WithFilterSubject(subscribeSubject) .Build()) .Build(); - EventHandler handler = (sender, args) => + void Handler(object sender, MsgHandlerEventArgs args) { KeyValueEntry kve = new KeyValueEntry(args.Message); if (includeDeletes || kve.Operation.Equals(KeyValueOperation.Put)) @@ -79,9 +89,9 @@ public KeyValueWatchSubscription(KeyValue kv, string keyPattern, endOfDataSent.Set(true); watcher.EndOfData(); } - }; + } - sub = kv.js.PushSubscribeAsync(subscribeSubject, handler, false, pso); + sub = kv.js.PushSubscribeAsync(subscribeSubject, Handler, false, pso); if (endOfDataSent.IsFalse()) { ulong pending = sub.GetConsumerInformation().CalculatedPending; diff --git a/src/Tests/IntegrationTests/TestKeyValue.cs b/src/Tests/IntegrationTests/TestKeyValue.cs index 41dce3c6f..ca21499f3 100644 --- a/src/Tests/IntegrationTests/TestKeyValue.cs +++ b/src/Tests/IntegrationTests/TestKeyValue.cs @@ -738,20 +738,24 @@ public void TestWatch() string key1 = "key.1"; string key2 = "key.2"; - Object[] key1AllExpecteds = new Object[] { + object[] key1AllExpecteds = { "a", "aa", KeyValueOperation.Delete, "aaa", KeyValueOperation.Delete, KeyValueOperation.Purge }; + + object[] key1FromRevisionExpecteds = { + "aa", KeyValueOperation.Delete, "aaa" + }; - Object[] noExpecteds = new Object[0]; - Object[] purgeOnlyExpecteds = { KeyValueOperation.Purge }; + object[] noExpecteds = Array.Empty(); + object[] purgeOnlyExpecteds = { KeyValueOperation.Purge }; - Object[] key2AllExpecteds = { + object[] key2AllExpecteds = { "z", "zz", KeyValueOperation.Delete, "zzz" }; - Object[] key2AfterExpecteds = { "zzz" }; + object[] key2AfterExpecteds = { "zzz" }; - Object[] allExpecteds = new object[] { + object[] allExpecteds = { "a", "aa", "z", "zz", KeyValueOperation.Delete, KeyValueOperation.Delete, "aaa", "zzz", @@ -759,7 +763,13 @@ public void TestWatch() null }; - Object[] allPutsExpecteds = { + object[] allFromRevisionExpecteds = { + "aa", "z", "zz", + KeyValueOperation.Delete, KeyValueOperation.Delete, + "aaa", "zzz" + }; + + object[] allPutsExpecteds = { "a", "aa", "z", "zz", "aaa", "zzz", null }; @@ -768,14 +778,23 @@ public void TestWatch() // get the kv management context IKeyValueManagement kvm = c.CreateKeyValueManagementContext(); - // create the bucket + string bucket1 = Bucket(1); + string bucket2 = Bucket(2); + // create the buckets kvm.Create(KeyValueConfiguration.Builder() - .WithName(BUCKET) + .WithName(bucket1) .WithMaxHistoryPerKey(10) .WithStorageType(StorageType.Memory) .Build()); - IKeyValue kv = c.CreateKeyValueContext(BUCKET); + kvm.Create(KeyValueConfiguration.Builder() + .WithName(bucket2) + .WithMaxHistoryPerKey(10) + .WithStorageType(StorageType.Memory) + .Build()); + + IKeyValue kv = c.CreateKeyValueContext(bucket1); + IKeyValue kv2 = c.CreateKeyValueContext(bucket2); TestKeyValueWatcher key1FullWatcher = new TestKeyValueWatcher(true); TestKeyValueWatcher key1MetaWatcher = new TestKeyValueWatcher(true, KeyValueWatchOption.MetaOnly); @@ -819,7 +838,16 @@ public void TestWatch() kv.Delete(key1); kv.Purge(key1); kv.Put(keyNull, (byte[])null); - + + kv2.Put(key1, "a"); + kv2.Put(key1, "aa"); + kv2.Put(key2, "z"); + kv2.Put(key2, "zz"); + kv2.Delete(key1); + kv2.Delete(key2); + kv2.Put(key1, "aaa"); + kv2.Put(key2, "zzz"); + Thread.Sleep(100); // give time for all the data to be setup TestKeyValueWatcher key1AfterWatcher = new TestKeyValueWatcher(false, KeyValueWatchOption.MetaOnly); @@ -830,6 +858,9 @@ public void TestWatch() TestKeyValueWatcher key2AfterStartNewWatcher = new TestKeyValueWatcher(false, KeyValueWatchOption.MetaOnly, KeyValueWatchOption.UpdatesOnly); TestKeyValueWatcher key2AfterStartFirstWatcher = new TestKeyValueWatcher(false, KeyValueWatchOption.MetaOnly, KeyValueWatchOption.IncludeHistory); + TestKeyValueWatcher key1FromRevisionAfterWatcher = new TestKeyValueWatcher(false); + TestKeyValueWatcher allFromRevisionAfterWatcher = new TestKeyValueWatcher(false); + subs.Add(kv.Watch(key1, key1AfterWatcher, key1AfterWatcher.WatchOptions)); subs.Add(kv.Watch(key1, key1AfterIgDelWatcher, key1AfterIgDelWatcher.WatchOptions)); subs.Add(kv.Watch(key1, key1AfterStartNewWatcher, key1AfterStartNewWatcher.WatchOptions)); @@ -838,6 +869,9 @@ public void TestWatch() subs.Add(kv.Watch(key2, key2AfterStartNewWatcher, key2AfterStartNewWatcher.WatchOptions)); subs.Add(kv.Watch(key2, key2AfterStartFirstWatcher, key2AfterStartFirstWatcher.WatchOptions)); + subs.Add(kv2.Watch(key1, key1FromRevisionAfterWatcher, 2, key1FromRevisionAfterWatcher.WatchOptions)); + subs.Add(kv2.WatchAll(allFromRevisionAfterWatcher, 2, allFromRevisionAfterWatcher.WatchOptions)); + Thread.Sleep(2000); // give time for the watches to get messages // unsubscribe so the watchers don't get any more messages @@ -876,6 +910,9 @@ public void TestWatch() ValidateWatcher(key2AfterExpecteds, key2AfterWatcher); ValidateWatcher(noExpecteds, key2AfterStartNewWatcher); ValidateWatcher(key2AllExpecteds, key2AfterStartFirstWatcher); + + ValidateWatcher(key1FromRevisionExpecteds, key1FromRevisionAfterWatcher); + ValidateWatcher(allFromRevisionExpecteds, allFromRevisionAfterWatcher); }); } @@ -899,7 +936,7 @@ private void ValidateWatcher(object[] expectedKves, TestKeyValueWatcher watcher) Assert.True(lastRevision < kve.Revision); lastRevision = kve.Revision; - Object expected = expectedKves[aix++]; + object expected = expectedKves[aix++]; if (expected == null) { Assert.Equal(KeyValueOperation.Put, kve.Operation); Assert.True(kve.Value == null || kve.Value.Length == 0); @@ -1024,7 +1061,7 @@ public void TestWithAccount() AssertKvAccountKeys(kv_connA_bucketI.Keys(), Key(21), Key(22)); AssertKvAccountKeys(kv_connI_bucketI.Keys(), Key(21), Key(22)); - Object[] expecteds = { + object[] expecteds = { Data(0), Data(1), KeyValueOperation.Delete, KeyValueOperation.Purge, Data(2), Data(0), Data(1), KeyValueOperation.Delete, KeyValueOperation.Purge, Data(2) }; @@ -1074,7 +1111,7 @@ private void AssertKveAccount(IKeyValue kvWorker, string key, IKeyValue kvUserA, AssertKveAccountGet(kvUserA, kvUserI, key, Data(2)); } - private void AssertKveAccountHistory(IList history, params Object[] expecteds) { + private void AssertKveAccountHistory(IList history, params object[] expecteds) { Assert.Equal(expecteds.Length, history.Count); for (int x = 0; x < expecteds.Length; x++) { if (expecteds[x] is string expected) { @@ -1243,7 +1280,7 @@ private void _testMirror(IKeyValue okv, IKeyValue mkv, int num) { { Thread.Sleep(200); // give the messages time to propagate } - ValidateWatcher(new Object[]{"bb0", "aaa" + num, KeyValueOperation.Delete}, mWatcher); + ValidateWatcher(new object[]{"bb0", "aaa" + num, KeyValueOperation.Delete}, mWatcher); // Does the origin data match? if (okv != null) { @@ -1252,7 +1289,7 @@ private void _testMirror(IKeyValue okv, IKeyValue mkv, int num) { { Thread.Sleep(200); // give the messages time to propagate } - ValidateWatcher(new Object[]{"bb0", "aaa" + num, KeyValueOperation.Delete}, oWatcher); + ValidateWatcher(new object[]{"bb0", "aaa" + num, KeyValueOperation.Delete}, oWatcher); } } @@ -1299,7 +1336,7 @@ public void TestDontGetNoResponders() } } - class TestKeyValueWatcher : IKeyValueWatcher + class TestKeyValueWatcher : IKeyValueWatcher { public IList Entries = new List(); public KeyValueWatchOption[] WatchOptions;