Skip to content

Commit

Permalink
watcher: Added WithCreatedNotify Option for receiving created event
Browse files Browse the repository at this point in the history
This adds WatchOption to receive watch created event

Fixes #1186

Signed-off-by: Giridharan Ramasamy <[email protected]>
  • Loading branch information
0x01F4 authored and lburgazzoli committed Jul 8, 2023
1 parent ec48303 commit 151c1d7
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 3 deletions.
3 changes: 3 additions & 0 deletions jetcd-core/src/main/java/io/etcd/jetcd/impl/WatchImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ private void onNext(WatchResponse response) {

revision = Math.max(revision, response.getHeader().getRevision());
id = response.getWatchId();
if (option.isCreatedNotify()) {
listener.onNext(new io.etcd.jetcd.watch.WatchResponse(response));
}
} else if (response.getCanceled()) {

//
Expand Down
30 changes: 27 additions & 3 deletions jetcd-core/src/main/java/io/etcd/jetcd/options/WatchOption.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,19 @@ public final class WatchOption {
private final long revision;
private final boolean prevKV;
private final boolean progressNotify;
private final boolean createdNotify;
private final boolean noPut;
private final boolean noDelete;
private final boolean requireLeader;
private final boolean prefix;

private WatchOption(ByteSequence endKey, long revision, boolean prevKV, boolean progressNotify, boolean noPut,
boolean noDelete, boolean requireLeader, boolean prefix) {
private WatchOption(ByteSequence endKey, long revision, boolean prevKV, boolean progressNotify, boolean createdNotify,
boolean noPut, boolean noDelete, boolean requireLeader, boolean prefix) {
this.endKey = endKey;
this.revision = revision;
this.prevKV = prevKV;
this.progressNotify = progressNotify;
this.createdNotify = createdNotify;
this.noPut = noPut;
this.noDelete = noDelete;
this.requireLeader = requireLeader;
Expand Down Expand Up @@ -80,6 +82,15 @@ public boolean isProgressNotify() {
return progressNotify;
}

/**
* Whether watcher server send watch create event.
*
* @return if true, watcher server should send watch create event.
*/
public boolean isCreatedNotify() {
return createdNotify;
}

/**
* Whether filter put event in server side.
*
Expand Down Expand Up @@ -137,6 +148,7 @@ public static final class Builder {
private ByteSequence endKey;
private boolean prevKV = false;
private boolean progressNotify = false;
private boolean createNotify = false;
private boolean noPut = false;
private boolean noDelete = false;
private boolean requireLeader = false;
Expand Down Expand Up @@ -208,6 +220,17 @@ public Builder withProgressNotify(boolean progressNotify) {
return this;
}

/**
* When createNotify is set, the watch server sends event when watch is created.
*
* @param createNotify configure the watcher to receive watch create event.
* @return builder
*/
public Builder withCreateNotify(boolean createNotify) {
this.createNotify = createNotify;
return this;
}

/**
* filter out put event in server side.
*
Expand Down Expand Up @@ -272,7 +295,8 @@ public Builder withRequireLeader(boolean requireLeader) {
}

public WatchOption build() {
return new WatchOption(endKey, revision, prevKV, progressNotify, noPut, noDelete, requireLeader, prefix);
return new WatchOption(endKey, revision, prevKV, progressNotify, createNotify, noPut, noDelete, requireLeader,
prefix);
}

}
Expand Down
19 changes: 19 additions & 0 deletions jetcd-core/src/main/java/io/etcd/jetcd/watch/WatchResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,23 @@ public static boolean isProgressNotify(io.etcd.jetcd.api.WatchResponse response)
return response.getEventsCount() == 0 && !response.getCreated() && !response.getCanceled()
&& response.getCompactRevision() == 0 && response.getHeader().getRevision() != 0;
}

/**
* Returns true if the WatchResponse is created notification.
*
* @return true if the WatchResponse is created notification.
*/
public boolean isCreatedNotify() {
return isCreatedNotify(getResponse());
}

/**
* Returns true if the WatchResponse is created notification.
*
* @param response the response.
* @return true if the WatchResponse is created notification.
*/
public static boolean isCreatedNotify(io.etcd.jetcd.api.WatchResponse response) {
return response.getCreated() && !response.getCanceled();
}
}
15 changes: 15 additions & 0 deletions jetcd-core/src/test/java/io/etcd/jetcd/impl/WatchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -336,4 +336,19 @@ public void testCancelledWatchGetsClosed(final Client client) throws Exception {
assertThat(completed.get()).isEqualTo(Boolean.TRUE);
}
}

@ParameterizedTest
@MethodSource("parameters")
public void testWatchWithCreatedNotify(final Client client) throws Exception {

final ByteSequence key = randomByteSequence();
final WatchOption options = WatchOption.builder().withCreateNotify(true).build();
final AtomicReference<WatchResponse> ref = new AtomicReference<>();

try (Watcher watcher = client.getWatchClient().watch(key, options, ref::set)) {
await().atMost(TIME_OUT_SECONDS, TimeUnit.SECONDS).untilAsserted(() -> assertThat(ref.get()).isNotNull());
assertThat(ref.get().getEvents().size()).isEqualTo(0);
assertThat(ref.get().isCreatedNotify()).isEqualTo(Boolean.TRUE);
}
}
}

0 comments on commit 151c1d7

Please sign in to comment.