diff --git a/src/main/java/com/coreos/jetcd/EtcdClient.java b/src/main/java/com/coreos/jetcd/EtcdClient.java index 7cc315479..850017aa6 100644 --- a/src/main/java/com/coreos/jetcd/EtcdClient.java +++ b/src/main/java/com/coreos/jetcd/EtcdClient.java @@ -34,6 +34,7 @@ public class EtcdClient { private final Supplier maintenanceClient; private final Supplier clusterClient; private final Supplier leaseClient; + private final Supplier watchClient; public EtcdClient(EtcdClientBuilder builder) throws ConnectException, AuthFailedException { this(Optional.empty(), builder); @@ -62,6 +63,7 @@ private EtcdClient(Optional> channelBuilder, EtcdClient this.maintenanceClient = Suppliers.memoize(() -> new EtcdMaintenanceImpl(channel, token)); this.clusterClient = Suppliers.memoize(() -> new EtcdClusterImpl(channel, token)); this.leaseClient = Suppliers.memoize(() -> new EtcdLeaseImpl(channel, token)); + this.watchClient = Suppliers.memoize(() -> new EtcdWatchImpl(channel, token)); } // ************************ @@ -88,6 +90,10 @@ public EtcdLease getLeaseClient() { return this.leaseClient.get(); } + public EtcdWatch getWatchClient() { + return this.watchClient.get(); + } + public void close() { channel.shutdownNow(); } diff --git a/src/main/java/com/coreos/jetcd/EtcdUtil.java b/src/main/java/com/coreos/jetcd/EtcdUtil.java new file mode 100644 index 000000000..101e7a865 --- /dev/null +++ b/src/main/java/com/coreos/jetcd/EtcdUtil.java @@ -0,0 +1,80 @@ +package com.coreos.jetcd; + +import com.coreos.jetcd.api.Event; +import com.coreos.jetcd.api.ResponseHeader; +import com.coreos.jetcd.data.ByteSequence; +import com.coreos.jetcd.data.EtcdHeader; +import com.coreos.jetcd.data.KeyValue; +import com.coreos.jetcd.watch.WatchEvent; +import com.google.protobuf.ByteString; + +import java.util.ArrayList; +import java.util.List; + +/** + * This util is to convert api class to client class. + */ +class EtcdUtil { + + private EtcdUtil() { + } + + /** + * convert ByteSequence to ByteString + */ + protected static ByteString byteStringFromByteSequence(ByteSequence byteSequence) { + return ByteString.copyFrom(byteSequence.getBytes()); + } + + /** + * convert ByteString to ByteSequence + * @return + */ + protected static ByteSequence byteSequceFromByteString(ByteString byteString) { + return ByteSequence.fromBytes(byteString.toByteArray()); + } + + /** + * convert API KeyValue to etcd client KeyValue + */ + protected static KeyValue apiToClientKV(com.coreos.jetcd.api.KeyValue keyValue) { + return new KeyValue( + byteSequceFromByteString(keyValue.getKey()), + byteSequceFromByteString(keyValue.getValue()), + keyValue.getCreateRevision(), + keyValue.getModRevision(), + keyValue.getVersion(), + keyValue.getLease()); + } + + /** + * convert API watch event to etcd client event + */ + protected static WatchEvent apiToClientEvent(Event event) { + WatchEvent.EventType eventType = WatchEvent.EventType.UNRECOGNIZED; + switch (event.getType()) { + case DELETE: + eventType = WatchEvent.EventType.DELETE; + break; + case PUT: + eventType = WatchEvent.EventType.PUT; + break; + } + return new WatchEvent(apiToClientKV(event.getKv()), apiToClientKV(event.getPrevKv()), eventType); + } + + protected static List apiToClientEvents(List events) { + List watchEvents = new ArrayList<>(); + for (Event event : events) { + watchEvents.add(apiToClientEvent(event)); + } + return watchEvents; + } + + /** + * convert API response header to self defined header + */ + protected static EtcdHeader apiToClientHeader(ResponseHeader header, long compactRevision) { + return new EtcdHeader(header.getClusterId(), header.getMemberId(), header.getRevision(), header.getRaftTerm(), compactRevision); + } +} diff --git a/src/main/java/com/coreos/jetcd/EtcdWatch.java b/src/main/java/com/coreos/jetcd/EtcdWatch.java new file mode 100644 index 000000000..3a71078d4 --- /dev/null +++ b/src/main/java/com/coreos/jetcd/EtcdWatch.java @@ -0,0 +1,74 @@ +package com.coreos.jetcd; + +import com.coreos.jetcd.data.ByteSequence; +import com.coreos.jetcd.data.EtcdHeader; +import com.coreos.jetcd.options.WatchOption; +import com.coreos.jetcd.watch.WatchEvent; + +import java.io.Closeable; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Interface of watch client + */ +public interface EtcdWatch { + + + /** + * Watch watches on a key or prefix. The watched events will be called by onWatch. + * If the watch is slow or the required rev is compacted, the watch request + * might be canceled from the server-side and the onCreateFailed will be called. + * + * @param key the key subscribe + * @param watchOption key option + * @param callback call back + * @return ListenableFuture watcher + */ + CompletableFuture watch(ByteSequence key, WatchOption watchOption, WatchCallback callback); + + interface Watcher extends Closeable { + + /** + * get watcher id + * + * @return id + */ + long getWatchID(); + + long getLastRevision(); + + ByteSequence getKey(); + + boolean isResuming(); + + /** + * get the watch option + * + * @return watch option + */ + WatchOption getWatchOption(); + + /** + * cancel the watcher + * + * @return cancel result + */ + CompletableFuture cancel(); + } + + interface WatchCallback { + + /** + * onWatch will be called when watcher receive any events + * + * @param events received events + */ + void onWatch(EtcdHeader header, List events); + + /** + * onResuming will be called when the watcher is on resuming. + */ + void onResuming(); + } +} diff --git a/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java b/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java new file mode 100644 index 000000000..7cb6886c7 --- /dev/null +++ b/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java @@ -0,0 +1,417 @@ +package com.coreos.jetcd; + +import com.coreos.jetcd.api.*; +import com.coreos.jetcd.data.ByteSequence; +import com.coreos.jetcd.options.WatchOption; +import com.coreos.jetcd.watch.WatchCreateException; +import com.google.protobuf.ByteString; +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; +import javafx.util.Pair; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.*; + +import static com.coreos.jetcd.EtcdUtil.apiToClientEvents; +import static com.coreos.jetcd.EtcdUtil.apiToClientHeader; + +/** + * etcd watcher Implementation + */ +public class EtcdWatchImpl implements EtcdWatch { + + private volatile StreamObserver requestStream; + + private ConcurrentHashMap watchers = new ConcurrentHashMap<>(); + + private final WatchGrpc.WatchStub watchStub; + + private ConcurrentLinkedQueue>> pendingCreateWatchers = new ConcurrentLinkedQueue<>(); + private Map> pendingCancelFutures = new ConcurrentHashMap<>(); + + public EtcdWatchImpl(ManagedChannel channel, Optional token) { + this.watchStub = EtcdClientUtil.configureStub(WatchGrpc.newStub(channel), token); + } + + /** + * Watch watches on a key or prefix. The watched events will be called by onWatch. + * If the watch is slow or the required rev is compacted, the watch request + * might be canceled from the server-side and the onCreateFailed will be called. + * + * @param key the key subscribe + * @param watchOption key option + * @param callback call back + * @return CompletableFuture watcher + */ + @Override + public CompletableFuture watch(ByteSequence key, WatchOption watchOption, WatchCallback callback) { + WatchRequest request = optionToWatchCreateRequest(EtcdUtil.byteStringFromByteSequence(key), watchOption); + WatcherImpl watcher = new WatcherImpl(key, watchOption, callback); + CompletableFuture waitFuture = new CompletableFuture(); + this.pendingCreateWatchers.add(new Pair<>(watcher, waitFuture)); + getRequestStream().onNext(request); + return waitFuture; + } + + /** + * Cancel the watch task with the watcher, the onCanceled will be called after successfully canceled. + * + * @param id the watcher to be canceled + */ + protected CompletableFuture cancelWatch(long id) { + WatcherImpl temp = watchers.get(id); + CompletableFuture completableFuture = null; + if (temp != null) { + synchronized (temp) { + if (this.watchers.containsKey(temp.getWatchID())) { + this.watchers.remove(temp.getWatchID()); + completableFuture = new CompletableFuture<>(); + this.pendingCancelFutures.put(id, completableFuture); + } + } + } + + WatchCancelRequest cancelRequest = WatchCancelRequest.newBuilder().setWatchId(id).build(); + WatchRequest request = WatchRequest.newBuilder().setCancelRequest(cancelRequest).build(); + getRequestStream().onNext(request); + return completableFuture; + } + + /** + * empty the old request stream, watchers and resume the old watchers + * empty the pendingCancelFutures as there is no need to cancel, the old request stream has been dead + */ + private synchronized void resume() { + this.requestStream = null; + WatcherImpl[] resumeWatchers = (WatcherImpl[]) watchers.values().toArray(); + this.watchers.clear(); + for (CompletableFuture watcherCompletableFuture : pendingCancelFutures.values()) { + watcherCompletableFuture.complete(Boolean.TRUE); + } + this.pendingCancelFutures.clear(); + resumeWatchers(resumeWatchers); + } + + /** + * single instance method to get request stream, empty the old requestStream, so we will get a new + * requestStream automatically + *

the responseStream will distribute the create, cancel, normal response to + * processCreate, processCanceled and processEvents + *

if error happened, the requestStream will be closed by server side, so we call resume to resume + * all ongoing watchers + * + * @return + */ + private StreamObserver getRequestStream() { + if (this.requestStream == null) { + synchronized (this) { + if (this.requestStream == null) { + StreamObserver watchResponseStreamObserver = new StreamObserver() { + @Override + public void onNext(WatchResponse watchResponse) { + if (watchResponse.getCreated()) { + processCreate(watchResponse); + } else if (watchResponse.getCanceled()) { + processCanceled(watchResponse); + } else { + processEvents(watchResponse); + } + } + + @Override + public void onError(Throwable throwable) { + resume(); + } + + @Override + public void onCompleted() { + + } + }; + this.requestStream = this.watchStub.watch(watchResponseStreamObserver); + } + } + } + return this.requestStream; + } + + /** + * Process create response from etcd server + *

If there is no pendingWatcher, ignore. + *

If cancel flag is true or CompactRevision not equal zero means the start revision + * has been compacted out of the store, call onCreateFailed. + *

If watchID = -1, complete future with WatchCreateException. + *

If everything is Ok, create watcher, complete CompletableFuture task and put the new watcher + * to the watchers map. + * + * @param response + */ + private void processCreate(WatchResponse response) { + Pair> requestPair = pendingCreateWatchers.poll(); + WatcherImpl watcher = requestPair.getKey(); + if (response.getCreated()) { + if (response.getCanceled() || response.getCompactRevision() != 0) { + watcher.setCanceled(true); + requestPair.getValue().completeExceptionally(new WatchCreateException("the start revision has been compacted", apiToClientHeader(response.getHeader(), response.getCompactRevision()))); + ; + } + + if (response.getWatchId() == -1 && watcher.callback != null) { + requestPair.getValue().completeExceptionally(new WatchCreateException("create watcher failed", apiToClientHeader(response.getHeader(), response.getCompactRevision()))); + } else { + this.watchers.put(watcher.getWatchID(), watcher); + watcher.setWatchID(response.getWatchId()); + requestPair.getValue().complete(watcher); + } + + //note the header revision so that put following a current watcher disconnect will arrive + //on watcher channel after reconnect + synchronized (watcher) { + watcher.setLastRevision(response.getHeader().getRevision()); + if (watcher.isResuming()) { + watcher.setResuming(false); + } + } + } + } + + /** + * Process subscribe watch events + *

If the watch id is not in the watchers map, scan it in the pendingCancelFutures map + * if exist, ignore, otherwise cancel it. + *

If the watcher exist, call the onWatch and set the last revision for resume + * + * @param watchResponse + */ + private void processEvents(WatchResponse watchResponse) { + WatcherImpl watcher = watchers.get(watchResponse.getWatchId()); + if (watcher != null) { + synchronized (watcher) { + if (watchResponse.getEventsCount() != 0) { + List events = watchResponse.getEventsList(); + // if on resume process, filter processed events + if (watcher.isResuming()) { + long lastRevision = watcher.getLastRevision(); + events.removeIf((e) -> e.getKv().getModRevision() <= lastRevision); + } + watcher.setLastRevision( + watchResponse + .getEvents(watchResponse.getEventsCount() - 1) + .getKv().getModRevision()); + + if (watcher.callback != null) { + watcher.callback.onWatch(apiToClientHeader(watchResponse.getHeader(), watchResponse.getCompactRevision()), apiToClientEvents(events)); + } + } else { + watcher.setLastRevision(watchResponse.getHeader().getRevision()); + } + } + } else { + // if the watcher is not canceling, cancel it. + CompletableFuture completableFuture = this.pendingCancelFutures.get(watchResponse.getWatchId()); + if (this.pendingCancelFutures.putIfAbsent(watcher.getWatchID(), completableFuture) == null) { + cancelWatch(watchResponse.getWatchId()); + } + } + } + + /** + * resume all the watchers + * + * @param watchers + */ + private void resumeWatchers(WatcherImpl[] watchers) { + for (WatcherImpl watcher : watchers) { + if (watcher.callback != null) { + watcher.callback.onResuming(); + } + watch(watcher.getKey(), getResumeWatchOptionWithWatcher(watcher), watcher.callback); + } + } + + /** + * Process cancel response from etcd server, + * + * @param response + */ + private void processCanceled(WatchResponse response) { + CompletableFuture cancelFuture = this.pendingCancelFutures.remove(response.getWatchId()); + cancelFuture.complete(Boolean.TRUE); + } + + /** + * convert WatcherOption to WatchRequest + * + * @param key + * @param option + * @return + */ + private WatchRequest optionToWatchCreateRequest(ByteString key, WatchOption option) { + WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder() + .setKey(key) + .setPrevKv(option.isPrevKV()) + .setProgressNotify(option.isProgressNotify()) + .setStartRevision(option.getRevision()); + + if (option.getEndKey().isPresent()) { + builder.setRangeEnd(EtcdUtil.byteStringFromByteSequence(option.getEndKey().get())); + } + + if (option.isNoDelete()) { + builder.addFilters(WatchCreateRequest.FilterType.NODELETE); + } + + if (option.isNoPut()) { + builder.addFilters(WatchCreateRequest.FilterType.NOPUT); + } + + return WatchRequest.newBuilder().setCreateRequest(builder).build(); + } + + /** + * build new WatchOption from dead to resume it in new requestStream + * + * @param watcher + * @return + */ + private WatchOption getResumeWatchOptionWithWatcher(Watcher watcher) { + WatchOption oldOption = watcher.getWatchOption(); + return WatchOption.newBuilder().withNoDelete(oldOption.isNoDelete()) + .withNoPut(oldOption.isNoPut()) + .withPrevKV(oldOption.isPrevKV()) + .withProgressNotify(oldOption.isProgressNotify()) + .withRange(oldOption.getEndKey().get()) + .withRevision(watcher.getLastRevision() + 1) + .withResuming(true) + .build(); + } + + + /** + * Watcher class hold watcher information. + */ + public class WatcherImpl implements Watcher { + + private final WatchOption watchOption; + private final ByteSequence key; + + public final WatchCallback callback; + private long watchID; + + private long lastRevision = -1; + private boolean canceled = false; + + private boolean resuming; + + private WatcherImpl(ByteSequence key, WatchOption watchOption, WatchCallback callback) { + this.key = key; + this.watchOption = watchOption; + this.callback = callback; + this.resuming = watchOption.isResuming(); + } + + @Override + public CompletableFuture cancel() { + return cancelWatch(watchID); + } + + /** + * set the last revision watcher received, used for resume + * + * @param lastRevision the last revision + */ + private void setLastRevision(long lastRevision) { + this.lastRevision = lastRevision; + } + + public boolean isCanceled() { + return canceled; + } + + private void setCanceled(boolean canceled) { + this.canceled = canceled; + } + + /** + * get the watch id of the watcher + * + * @return + */ + public long getWatchID() { + return watchID; + } + + private void setWatchID(long watchID) { + this.watchID = watchID; + } + + public WatchOption getWatchOption() { + return watchOption; + } + + /** + * get the last revision watcher received + * + * @return last revision + */ + public long getLastRevision() { + return lastRevision; + } + + /** + * get the watcher key + * + * @return watcher key + */ + public ByteSequence getKey() { + return key; + } + + /** + * whether the watcher is resuming. + */ + public boolean isResuming() { + return resuming; + } + + private void setResuming(boolean resuming) { + this.resuming = resuming; + } + + /** + * Closes this stream and releases any system resources associated + * with it. If the stream is already closed then invoking this + * method has no effect. + *

+ *

As noted in {@link AutoCloseable#close()}, cases where the + * close may fail require careful attention. It is strongly advised + * to relinquish the underlying resources and to internally + * mark the {@code Closeable} as closed, prior to throwing + * the {@code IOException}. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + if (!isCanceled()) { + try { + if (!cancel().get(5, TimeUnit.SECONDS)) { + + } + } catch (InterruptedException e) { + throw new IOException("Close was interrupted.", e); + } catch (ExecutionException e) { + throw new IOException("Exception during execute.", e); + } catch (TimeoutException e) { + throw new IOException("Close out of time.", e); + } finally { + setCanceled(true); + } + } + } + } + +} diff --git a/src/main/java/com/coreos/jetcd/data/ByteSequence.java b/src/main/java/com/coreos/jetcd/data/ByteSequence.java new file mode 100644 index 000000000..ff6c8ea19 --- /dev/null +++ b/src/main/java/com/coreos/jetcd/data/ByteSequence.java @@ -0,0 +1,110 @@ +package com.coreos.jetcd.data; + +import com.google.protobuf.ByteString; + +import java.io.UnsupportedEncodingException; +import java.nio.CharBuffer; +import java.nio.charset.Charset; + +/** + * Etcd binary bytes, easy to convert between byte[], String and ByteString. + */ +public class ByteSequence { + + private final int hashVal; + private final ByteString byteString; + + + public ByteSequence(byte[] source) { + hashVal = calcHashCore(source); + byteString = toByteString(source); + } + + protected ByteSequence(ByteString byteString) { + this(byteString.toByteArray()); + } + + public ByteSequence(String string) { + this(string.getBytes()); + } + + public ByteSequence(CharBuffer charBuffer) { + this(String.valueOf(charBuffer.array())); + } + + public ByteSequence(CharSequence charSequence) { + this(java.nio.CharBuffer.wrap(charSequence)); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj instanceof ByteSequence) { + ByteSequence other = (ByteSequence) obj; + if (other.hashCode() != hashCode()) return false; + return byteString.equals(other.byteString); + } else { + return false; + } + } + + protected ByteString getByteString() { + return this.byteString; + } + + private ByteString toByteString(byte[] bytes) { + return ByteString.copyFrom(bytes); + } + + private int calcHashCore(byte[] bytes) { + int result = 0; + for (int i = 0; i < bytes.length; ++i) { + result = 31 * result + bytes[i]; + } + return result; + } + + @Override + public int hashCode() { + return hashVal; + } + + public String toStringUtf8() { + return byteString.toStringUtf8(); + } + + public String toString(Charset charset) { + return byteString.toString(charset); + } + + public String toString(String charsetName) throws UnsupportedEncodingException { + return byteString.toString(charsetName); + } + + public byte[] getBytes() { + return byteString.toByteArray(); + } + + public static ByteSequence fromString(String string) { + return new ByteSequence(string); + } + + public static ByteSequence fromCharSequence(CharSequence charSequence) { + return new ByteSequence(charSequence); + } + + public static ByteSequence fromCharBuffer(CharBuffer charBuffer) { + return new ByteSequence(charBuffer); + } + + protected static ByteSequence fromByteString(ByteString byteString) { + return new ByteSequence(byteString); + } + + public static ByteSequence fromBytes(byte[] bytes) { + return new ByteSequence(bytes); + } + +} diff --git a/src/main/java/com/coreos/jetcd/data/EtcdHeader.java b/src/main/java/com/coreos/jetcd/data/EtcdHeader.java new file mode 100644 index 000000000..4eaeaa75c --- /dev/null +++ b/src/main/java/com/coreos/jetcd/data/EtcdHeader.java @@ -0,0 +1,40 @@ +package com.coreos.jetcd.data; + +/** + * Etcd message header information. + */ +public class EtcdHeader { + private final long clusterId; + private final long memberId; + private final long revision; + private final long raftTerm; + private final long compactRevision; + + public EtcdHeader(long clusterId, long memberId, long revision, long raftTerm, long compactRevision) { + this.clusterId = clusterId; + this.memberId = memberId; + this.revision = revision; + this.raftTerm = raftTerm; + this.compactRevision = compactRevision; + } + + public long getClusterId() { + return clusterId; + } + + public long getMemberId() { + return memberId; + } + + public long getRevision() { + return revision; + } + + public long getRaftTerm() { + return raftTerm; + } + + public long getCompactRevision() { + return compactRevision; + } +} diff --git a/src/main/java/com/coreos/jetcd/data/KeyValue.java b/src/main/java/com/coreos/jetcd/data/KeyValue.java new file mode 100644 index 000000000..de4ded15d --- /dev/null +++ b/src/main/java/com/coreos/jetcd/data/KeyValue.java @@ -0,0 +1,47 @@ +package com.coreos.jetcd.data; + +/** + * Etcd key value pair. + */ +public class KeyValue { + + private final ByteSequence key; + private final ByteSequence value; + private long createRevision = 0L; + private long modRevision = 0L; + private long version = 0L; + private long lease = 0L; + + public KeyValue(ByteSequence key, ByteSequence value, long createRevision, long modRevision, long version, long lease) { + this.key = key; + this.value = value; + this.createRevision = createRevision; + this.modRevision = modRevision; + this.version = version; + this.lease = lease; + } + + public ByteSequence getKey() { + return key; + } + + public ByteSequence getValue() { + return value; + } + + public long getCreateRevision() { + return createRevision; + } + + public long getModRevision() { + return modRevision; + } + + public long getVersion() { + return version; + } + + public long getLease() { + return lease; + } +} diff --git a/src/main/java/com/coreos/jetcd/options/WatchOption.java b/src/main/java/com/coreos/jetcd/options/WatchOption.java new file mode 100644 index 000000000..184eba4c7 --- /dev/null +++ b/src/main/java/com/coreos/jetcd/options/WatchOption.java @@ -0,0 +1,194 @@ +package com.coreos.jetcd.options; + +import com.coreos.jetcd.data.ByteSequence; + +import java.util.Optional; + +/** + * The option for watch operation. + */ +public final class WatchOption { + + public static final WatchOption DEFAULT = newBuilder().build(); + + /** + * Create a builder to construct option for watch operation. + * + * @return builder + */ + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + + private long revision = 0L; + private Optional endKey = Optional.empty(); + private boolean prevKV = false; + private boolean progressNotify = false; + private boolean noPut = false; + private boolean noDelete = false; + private boolean resuming = false; + + private Builder() { + } + + /** + * Provide the revision to use for the get request. + *

If the revision is less or equal to zero, the get is over the newest key-value store. + *

If the revision has been compacted, ErrCompacted is returned as a response. + * + * @param revision the revision to get. + * @return builder + */ + public Builder withRevision(long revision) { + this.revision = revision; + return this; + } + + /** + * Set the end key of the get request. If it is set, the + * get request will return the keys from key to endKey (exclusive). + *

If end key is '\0', the range is all keys >= key. + *

If the end key is one bit larger than the given key, then it gets all keys with the prefix (the given key). + *

If both key and end key are '\0', it returns all keys. + * + * @param endKey end key + * @return builder + */ + public Builder withRange(ByteSequence endKey) { + this.endKey = Optional.ofNullable(endKey); + return this; + } + + /** + * When prevKV is set, created watcher gets the previous KV before the event happens, + * if the previous KV is not compacted. + * + * @return builder + */ + public Builder withPrevKV(boolean prevKV) { + this.prevKV = prevKV; + return this; + } + + /** + * When progressNotify is set, the watch server send periodic progress updates. + * Progress updates have zero events in WatchResponse + * + * @return builder + */ + public Builder withProgressNotify(boolean progressNotify) { + this.progressNotify = progressNotify; + return this; + } + + /** + * filter out put event in server side + * + * @param noPut + * @return + */ + public Builder withNoPut(boolean noPut) { + this.noPut = noPut; + return this; + } + + public Builder withResuming(boolean resuming){ + this.resuming = resuming; + return this; + } + /** + * filter out delete event in server side + * + * @param noDelete + * @return + */ + public Builder withNoDelete(boolean noDelete) { + this.noDelete = noDelete; + return this; + } + + public WatchOption build() { + return new WatchOption( + endKey, + revision, + prevKV, + progressNotify, + noPut, + noDelete, + resuming); + } + + } + + private final Optional endKey; + private final long revision; + private final boolean prevKV; + private final boolean progressNotify; + private final boolean noPut; + private final boolean noDelete; + private final boolean resuming; + + private WatchOption(Optional endKey, + long revision, + boolean prevKV, + boolean progressNotify, + boolean noPut, + boolean noDelete, + boolean resuming) { + this.endKey = endKey; + this.revision = revision; + this.prevKV = prevKV; + this.progressNotify = progressNotify; + this.noPut = noPut; + this.noDelete = noDelete; + this.resuming = resuming; + } + + public Optional getEndKey() { + return this.endKey; + } + + public long getRevision() { + return revision; + } + + /** + * Whether created watcher gets the previous KV before the event happens. + */ + public boolean isPrevKV() { + return prevKV; + } + + /** + * Whether watcher server send periodic progress updates. + * + * @return if true, watcher server should send periodic progress updates. + */ + public boolean isProgressNotify() { + return progressNotify; + } + + /** + * Whether filter put event in server side + * + * @return if true, filter put event in server side + */ + public boolean isNoPut() { + return noPut; + } + + /** + * Whether filter delete event in server side + * + * @return if true, filter delete event in server side + */ + public boolean isNoDelete() { + return noDelete; + } + + public boolean isResuming(){ + return resuming; + } +} diff --git a/src/main/java/com/coreos/jetcd/watch/WatchCreateException.java b/src/main/java/com/coreos/jetcd/watch/WatchCreateException.java new file mode 100644 index 000000000..ccc3e342c --- /dev/null +++ b/src/main/java/com/coreos/jetcd/watch/WatchCreateException.java @@ -0,0 +1,16 @@ +package com.coreos.jetcd.watch; + +import com.coreos.jetcd.data.EtcdHeader; + +/** + * Exception thrown when create watcher failed. + */ +public class WatchCreateException extends Exception { + + public final EtcdHeader header; + + public WatchCreateException(String cause, EtcdHeader header) { + super(cause); + this.header = header; + } +} diff --git a/src/main/java/com/coreos/jetcd/watch/WatchEvent.java b/src/main/java/com/coreos/jetcd/watch/WatchEvent.java new file mode 100644 index 000000000..11d88cda6 --- /dev/null +++ b/src/main/java/com/coreos/jetcd/watch/WatchEvent.java @@ -0,0 +1,39 @@ +package com.coreos.jetcd.watch; + +import com.coreos.jetcd.data.KeyValue; + +/** + * Watch event, return by watch, contain put, delete event. + */ +public class WatchEvent { + + public enum EventType { + PUT, + DELETE, + UNRECOGNIZED, + } + + private final KeyValue keyValue; + + private final KeyValue prevKV; + + private final EventType eventType; + + public WatchEvent(KeyValue keyValue, KeyValue prevKV, EventType eventType) { + this.keyValue = keyValue; + this.prevKV = prevKV; + this.eventType = eventType; + } + + public KeyValue getKeyValue() { + return keyValue; + } + + public KeyValue getPrevKV() { + return prevKV; + } + + public EventType getEventType() { + return eventType; + } +} diff --git a/src/test/java/com/coreos/jetcd/EtcdWatchTest.java b/src/test/java/com/coreos/jetcd/EtcdWatchTest.java new file mode 100644 index 000000000..2eabbd1cf --- /dev/null +++ b/src/test/java/com/coreos/jetcd/EtcdWatchTest.java @@ -0,0 +1,100 @@ +package com.coreos.jetcd; + +import com.coreos.jetcd.data.ByteSequence; +import com.coreos.jetcd.data.EtcdHeader; +import com.coreos.jetcd.exception.AuthFailedException; +import com.coreos.jetcd.exception.ConnectException; +import com.coreos.jetcd.options.WatchOption; +import com.coreos.jetcd.watch.WatchEvent; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; +import org.testng.asserts.Assertion; + +import java.util.List; +import java.util.concurrent.*; + +/** + * watch test case. + */ +public class EtcdWatchTest { + + private EtcdClient client; + private EtcdWatch watchClient; + private EtcdKV kvClient; + private BlockingQueue eventsQueue = new LinkedBlockingDeque<>(); + + private ByteSequence key = ByteSequence.fromString("test_key"); + private ByteSequence value = ByteSequence.fromString("test_val"); + private EtcdWatchImpl.Watcher watcher; + + private Assertion test = new Assertion(); + + @BeforeTest + public void newEtcdClient() throws AuthFailedException, ConnectException { + client = EtcdClientBuilder.newBuilder().endpoints("localhost:2379").build(); + watchClient = client.getWatchClient(); + kvClient = client.getKVClient(); + } + + @Test + public void testWatch() throws ExecutionException, InterruptedException { + WatchOption option = WatchOption.DEFAULT; + watcher = watchClient.watch(key, option, new EtcdWatch.WatchCallback() { + + /** + * onWatch will be called when watcher receive any events + * + * @param header + * @param events received events + */ + @Override + public void onWatch(EtcdHeader header, List events) { + EtcdWatchTest.this.eventsQueue.addAll(events); + } + + /** + * onResuming will be called when the watcher is on resuming. + */ + @Override + public void onResuming() { + + } + + }).get(); + + } + + /** + * watch put operation on key + * assert whether receive put event + */ + @Test(dependsOnMethods = "testWatch") + public void testWatchPut() throws InterruptedException { + kvClient.put(EtcdUtil.byteStringFromByteSequence(key), EtcdUtil.byteStringFromByteSequence(value)); + WatchEvent event = eventsQueue.poll(5, TimeUnit.SECONDS); + test.assertEquals(event.getKeyValue().getKey(), key); + test.assertEquals(event.getEventType(), WatchEvent.EventType.PUT); + } + + /** + * watch delete operation on key + * assert whether receive delete event + */ + @Test(dependsOnMethods = "testWatchPut") + public void testWatchDelete() throws InterruptedException { + kvClient.delete(EtcdUtil.byteStringFromByteSequence(key)); + WatchEvent event = eventsQueue.poll(5, TimeUnit.SECONDS); + test.assertEquals(event.getKeyValue().getKey(), key); + test.assertEquals(event.getEventType(), WatchEvent.EventType.DELETE); + } + + /** + * cancel watch test case + * assert whether receive cancel response + */ + @Test(dependsOnMethods = "testWatchDelete") + public void testCancelWatch() throws ExecutionException, InterruptedException, TimeoutException { + CompletableFuture future = watcher.cancel(); + test.assertTrue(future.get(5, TimeUnit.SECONDS)); + } +}