Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KV Watch From Revision #866

Merged
merged 1 commit into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/NATS.Client/KeyValue/IKeyValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ public interface IKeyValue
/// <returns></returns>
KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions);

/// <summary>
/// Watch updates for a specific key, starting from a specific revision
/// </summary>
/// <param name="key">the key</param>
/// <param name="watcher">the watcher</param>
/// <param name="fromRevision">the revision to start from</param>
/// <param name="watchOptions">the watch options to apply. If multiple conflicting options are supplied, the last options wins.</param>
/// <returns></returns>
KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions);

/// <summary>
/// Watch updates for all keys
/// </summary>
Expand All @@ -110,6 +120,15 @@ public interface IKeyValue
/// <returns>The KeyValueWatchSubscription</returns>
KeyValueWatchSubscription WatchAll(IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions);

/// <summary>
/// Watch updates for all keys, starting from a specific revision
/// </summary>
/// <param name="watcher">the watcher</param>
/// <param name="fromRevision">the revision to start from</param>
/// <param name="watchOptions">the watch options to apply. If multiple conflicting options are supplied, the last options wins.</param>
/// <returns>The KeyValueWatchSubscription</returns>
KeyValueWatchSubscription WatchAll(IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions);

/// <summary>
/// Get a list of the keys in a bucket.
/// </summary>
Expand Down
17 changes: 15 additions & 2 deletions src/NATS.Client/KeyValue/KeyValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,26 @@ public KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, par
{
Validator.ValidateKvKeyWildcardAllowedRequired(key);
Validator.ValidateNotNull(watcher, "Watcher is required");
return new KeyValueWatchSubscription(this, key, watcher, watchOptions);
return new KeyValueWatchSubscription(this, key, watcher, ConsumerConfiguration.UlongUnset, watchOptions);
}

public KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions)
{
Validator.ValidateKvKeyWildcardAllowedRequired(key);
Validator.ValidateNotNull(watcher, "Watcher is required");
return new KeyValueWatchSubscription(this, key, watcher, fromRevision, watchOptions);
}

public KeyValueWatchSubscription WatchAll(IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions)
{
Validator.ValidateNotNull(watcher, "Watcher is required");
return new KeyValueWatchSubscription(this, ">", watcher, watchOptions);
return new KeyValueWatchSubscription(this, ">", watcher, ConsumerConfiguration.UlongUnset, watchOptions);
}

public KeyValueWatchSubscription WatchAll(IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions)
{
Validator.ValidateNotNull(watcher, "Watcher is required");
return new KeyValueWatchSubscription(this, ">", watcher, fromRevision, watchOptions);
}

private PublishAck _write(string key, byte[] data, MsgHeader h) {
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client/KeyValue/KeyValueWatchOption.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public enum KeyValueWatchOption
IgnoreDelete,

/// <summary>
/// Only get meta data, skip value when retrieving data from the server.
/// Only get metadata, skip value when retrieving data from the server.
/// </summary>
MetaOnly,

Expand Down
26 changes: 18 additions & 8 deletions src/NATS.Client/KeyValue/KeyValueWatchSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class KeyValueWatchSubscription : IDisposable
private readonly object subLock;

public KeyValueWatchSubscription(KeyValue kv, string keyPattern,
IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions)
IKeyValueWatcher watcher, ulong fromRevision, params KeyValueWatchOption[] watchOptions)
{
subLock = new object();
string subscribeSubject = kv.ReadSubject(keyPattern);
Expand All @@ -44,14 +44,23 @@ public KeyValueWatchSubscription(KeyValue kv, string keyPattern,
}
}

if (deliverPolicy == DeliverPolicy.New || kv._getLast(subscribeSubject) == null)
if (fromRevision > 0)
{
endOfDataSent = new InterlockedBoolean(true);
watcher.EndOfData();
deliverPolicy = DeliverPolicy.ByStartSequence;
endOfDataSent = new InterlockedBoolean();
}
else
{
endOfDataSent = new InterlockedBoolean(false);
fromRevision = ConsumerConfiguration.UlongUnset; // easier on the builder since we aren't starting at a fromRevision
if (deliverPolicy == DeliverPolicy.New || kv._getLast(subscribeSubject) == null)
{
endOfDataSent = new InterlockedBoolean(true);
watcher.EndOfData();
}
else
{
endOfDataSent = new InterlockedBoolean();
}
}

PushSubscribeOptions pso = PushSubscribeOptions.Builder()
Expand All @@ -61,12 +70,13 @@ public KeyValueWatchSubscription(KeyValue kv, string keyPattern,
ConsumerConfiguration.Builder()
.WithAckPolicy(AckPolicy.None)
.WithDeliverPolicy(deliverPolicy)
.WithStartSequence(fromRevision)
.WithHeadersOnly(headersOnly)
.WithFilterSubject(subscribeSubject)
.Build())
.Build();

EventHandler<MsgHandlerEventArgs> handler = (sender, args) =>
void Handler(object sender, MsgHandlerEventArgs args)
{
KeyValueEntry kve = new KeyValueEntry(args.Message);
if (includeDeletes || kve.Operation.Equals(KeyValueOperation.Put))
Expand All @@ -79,9 +89,9 @@ public KeyValueWatchSubscription(KeyValue kv, string keyPattern,
endOfDataSent.Set(true);
watcher.EndOfData();
}
};
}

sub = kv.js.PushSubscribeAsync(subscribeSubject, handler, false, pso);
sub = kv.js.PushSubscribeAsync(subscribeSubject, Handler, false, pso);
if (endOfDataSent.IsFalse())
{
ulong pending = sub.GetConsumerInformation().CalculatedPending;
Expand Down
71 changes: 54 additions & 17 deletions src/Tests/IntegrationTests/TestKeyValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -738,28 +738,38 @@ public void TestWatch()
string key1 = "key.1";
string key2 = "key.2";

Object[] key1AllExpecteds = new Object[] {
object[] key1AllExpecteds = {
"a", "aa", KeyValueOperation.Delete, "aaa", KeyValueOperation.Delete, KeyValueOperation.Purge
};

object[] key1FromRevisionExpecteds = {
"aa", KeyValueOperation.Delete, "aaa"
};

Object[] noExpecteds = new Object[0];
Object[] purgeOnlyExpecteds = { KeyValueOperation.Purge };
object[] noExpecteds = Array.Empty<object>();
object[] purgeOnlyExpecteds = { KeyValueOperation.Purge };

Object[] key2AllExpecteds = {
object[] key2AllExpecteds = {
"z", "zz", KeyValueOperation.Delete, "zzz"
};

Object[] key2AfterExpecteds = { "zzz" };
object[] key2AfterExpecteds = { "zzz" };

Object[] allExpecteds = new object[] {
object[] allExpecteds = {
"a", "aa", "z", "zz",
KeyValueOperation.Delete, KeyValueOperation.Delete,
"aaa", "zzz",
KeyValueOperation.Delete, KeyValueOperation.Purge,
null
};

Object[] allPutsExpecteds = {
object[] allFromRevisionExpecteds = {
"aa", "z", "zz",
KeyValueOperation.Delete, KeyValueOperation.Delete,
"aaa", "zzz"
};

object[] allPutsExpecteds = {
"a", "aa", "z", "zz", "aaa", "zzz", null
};

Expand All @@ -768,14 +778,23 @@ public void TestWatch()
// get the kv management context
IKeyValueManagement kvm = c.CreateKeyValueManagementContext();

// create the bucket
string bucket1 = Bucket(1);
string bucket2 = Bucket(2);
// create the buckets
kvm.Create(KeyValueConfiguration.Builder()
.WithName(BUCKET)
.WithName(bucket1)
.WithMaxHistoryPerKey(10)
.WithStorageType(StorageType.Memory)
.Build());

IKeyValue kv = c.CreateKeyValueContext(BUCKET);
kvm.Create(KeyValueConfiguration.Builder()
.WithName(bucket2)
.WithMaxHistoryPerKey(10)
.WithStorageType(StorageType.Memory)
.Build());

IKeyValue kv = c.CreateKeyValueContext(bucket1);
IKeyValue kv2 = c.CreateKeyValueContext(bucket2);

TestKeyValueWatcher key1FullWatcher = new TestKeyValueWatcher(true);
TestKeyValueWatcher key1MetaWatcher = new TestKeyValueWatcher(true, KeyValueWatchOption.MetaOnly);
Expand Down Expand Up @@ -819,7 +838,16 @@ public void TestWatch()
kv.Delete(key1);
kv.Purge(key1);
kv.Put(keyNull, (byte[])null);


kv2.Put(key1, "a");
kv2.Put(key1, "aa");
kv2.Put(key2, "z");
kv2.Put(key2, "zz");
kv2.Delete(key1);
kv2.Delete(key2);
kv2.Put(key1, "aaa");
kv2.Put(key2, "zzz");

Thread.Sleep(100); // give time for all the data to be setup

TestKeyValueWatcher key1AfterWatcher = new TestKeyValueWatcher(false, KeyValueWatchOption.MetaOnly);
Expand All @@ -830,6 +858,9 @@ public void TestWatch()
TestKeyValueWatcher key2AfterStartNewWatcher = new TestKeyValueWatcher(false, KeyValueWatchOption.MetaOnly, KeyValueWatchOption.UpdatesOnly);
TestKeyValueWatcher key2AfterStartFirstWatcher = new TestKeyValueWatcher(false, KeyValueWatchOption.MetaOnly, KeyValueWatchOption.IncludeHistory);

TestKeyValueWatcher key1FromRevisionAfterWatcher = new TestKeyValueWatcher(false);
TestKeyValueWatcher allFromRevisionAfterWatcher = new TestKeyValueWatcher(false);

subs.Add(kv.Watch(key1, key1AfterWatcher, key1AfterWatcher.WatchOptions));
subs.Add(kv.Watch(key1, key1AfterIgDelWatcher, key1AfterIgDelWatcher.WatchOptions));
subs.Add(kv.Watch(key1, key1AfterStartNewWatcher, key1AfterStartNewWatcher.WatchOptions));
Expand All @@ -838,6 +869,9 @@ public void TestWatch()
subs.Add(kv.Watch(key2, key2AfterStartNewWatcher, key2AfterStartNewWatcher.WatchOptions));
subs.Add(kv.Watch(key2, key2AfterStartFirstWatcher, key2AfterStartFirstWatcher.WatchOptions));

subs.Add(kv2.Watch(key1, key1FromRevisionAfterWatcher, 2, key1FromRevisionAfterWatcher.WatchOptions));
subs.Add(kv2.WatchAll(allFromRevisionAfterWatcher, 2, allFromRevisionAfterWatcher.WatchOptions));

Thread.Sleep(2000); // give time for the watches to get messages

// unsubscribe so the watchers don't get any more messages
Expand Down Expand Up @@ -876,6 +910,9 @@ public void TestWatch()
ValidateWatcher(key2AfterExpecteds, key2AfterWatcher);
ValidateWatcher(noExpecteds, key2AfterStartNewWatcher);
ValidateWatcher(key2AllExpecteds, key2AfterStartFirstWatcher);

ValidateWatcher(key1FromRevisionExpecteds, key1FromRevisionAfterWatcher);
ValidateWatcher(allFromRevisionExpecteds, allFromRevisionAfterWatcher);
});
}

Expand All @@ -899,7 +936,7 @@ private void ValidateWatcher(object[] expectedKves, TestKeyValueWatcher watcher)
Assert.True(lastRevision < kve.Revision);
lastRevision = kve.Revision;

Object expected = expectedKves[aix++];
object expected = expectedKves[aix++];
if (expected == null) {
Assert.Equal(KeyValueOperation.Put, kve.Operation);
Assert.True(kve.Value == null || kve.Value.Length == 0);
Expand Down Expand Up @@ -1024,7 +1061,7 @@ public void TestWithAccount()
AssertKvAccountKeys(kv_connA_bucketI.Keys(), Key(21), Key(22));
AssertKvAccountKeys(kv_connI_bucketI.Keys(), Key(21), Key(22));

Object[] expecteds = {
object[] expecteds = {
Data(0), Data(1), KeyValueOperation.Delete, KeyValueOperation.Purge, Data(2),
Data(0), Data(1), KeyValueOperation.Delete, KeyValueOperation.Purge, Data(2)
};
Expand Down Expand Up @@ -1074,7 +1111,7 @@ private void AssertKveAccount(IKeyValue kvWorker, string key, IKeyValue kvUserA,
AssertKveAccountGet(kvUserA, kvUserI, key, Data(2));
}

private void AssertKveAccountHistory(IList<KeyValueEntry> history, params Object[] expecteds) {
private void AssertKveAccountHistory(IList<KeyValueEntry> history, params object[] expecteds) {
Assert.Equal(expecteds.Length, history.Count);
for (int x = 0; x < expecteds.Length; x++) {
if (expecteds[x] is string expected) {
Expand Down Expand Up @@ -1243,7 +1280,7 @@ private void _testMirror(IKeyValue okv, IKeyValue mkv, int num) {
{
Thread.Sleep(200); // give the messages time to propagate
}
ValidateWatcher(new Object[]{"bb0", "aaa" + num, KeyValueOperation.Delete}, mWatcher);
ValidateWatcher(new object[]{"bb0", "aaa" + num, KeyValueOperation.Delete}, mWatcher);

// Does the origin data match?
if (okv != null) {
Expand All @@ -1252,7 +1289,7 @@ private void _testMirror(IKeyValue okv, IKeyValue mkv, int num) {
{
Thread.Sleep(200); // give the messages time to propagate
}
ValidateWatcher(new Object[]{"bb0", "aaa" + num, KeyValueOperation.Delete}, oWatcher);
ValidateWatcher(new object[]{"bb0", "aaa" + num, KeyValueOperation.Delete}, oWatcher);
}
}

Expand Down Expand Up @@ -1299,7 +1336,7 @@ public void TestDontGetNoResponders()
}
}

class TestKeyValueWatcher : IKeyValueWatcher
class TestKeyValueWatcher : IKeyValueWatcher
{
public IList<KeyValueEntry> Entries = new List<KeyValueEntry>();
public KeyValueWatchOption[] WatchOptions;
Expand Down
Loading