From c5b17e816007e2b80dbca177f6874c639cd3fee4 Mon Sep 17 00:00:00 2001 From: StupidHod Date: Tue, 30 Aug 2016 15:51:09 +0800 Subject: [PATCH 1/8] add etcd watch implementation --- src/main/java/com/coreos/jetcd/EtcdWatch.java | 32 ++ .../java/com/coreos/jetcd/EtcdWatchImpl.java | 294 ++++++++++++++++++ .../com/coreos/jetcd/options/WatchOption.java | 197 ++++++++++++ .../jetcd/util/ListenableSetFuture.java | 183 +++++++++++ .../java/com/coreos/jetcd/watch/Watcher.java | 169 ++++++++++ .../java/com/coreos/jetcd/EtcdWatchTest.java | 105 +++++++ 6 files changed, 980 insertions(+) create mode 100644 src/main/java/com/coreos/jetcd/EtcdWatch.java create mode 100644 src/main/java/com/coreos/jetcd/EtcdWatchImpl.java create mode 100644 src/main/java/com/coreos/jetcd/options/WatchOption.java create mode 100644 src/main/java/com/coreos/jetcd/util/ListenableSetFuture.java create mode 100644 src/main/java/com/coreos/jetcd/watch/Watcher.java create mode 100644 src/test/java/com/coreos/jetcd/EtcdWatchTest.java diff --git a/src/main/java/com/coreos/jetcd/EtcdWatch.java b/src/main/java/com/coreos/jetcd/EtcdWatch.java new file mode 100644 index 000000000..8f923b5b9 --- /dev/null +++ b/src/main/java/com/coreos/jetcd/EtcdWatch.java @@ -0,0 +1,32 @@ +package com.coreos.jetcd; + +import com.coreos.jetcd.options.WatchOption; +import com.coreos.jetcd.util.ListenableSetFuture; +import com.coreos.jetcd.watch.Watcher; +import com.google.protobuf.ByteString; + +/** + * Interface of watch client + */ +public interface EtcdWatch { + + + /** + * Watch watches on a key or prefix. The watched events will be called by onWatch. + * If the watch is slow or the required rev is compacted, the watch request + * might be canceled from the server-side and the onCreateFailed will be called. + * + * @param key the key subscribe + * @param watchOption key option + * @param callback call back + * @return ListenableFuture watcher + */ + ListenableSetFuture watch(ByteString key, WatchOption watchOption, Watcher.WatchCallback callback); + + /** + * Cancel the watch task with the watcher, the onCanceled will be called after successfully canceled. + * + * @param watcher the watcher to be canceled + */ + void cancelWatch(Watcher watcher); +} diff --git a/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java b/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java new file mode 100644 index 000000000..dea2ceafa --- /dev/null +++ b/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java @@ -0,0 +1,294 @@ +package com.coreos.jetcd; + +import com.coreos.jetcd.api.*; +import com.coreos.jetcd.options.WatchOption; +import com.coreos.jetcd.util.ListenableSetFuture; +import com.coreos.jetcd.watch.Watcher; +import com.google.protobuf.ByteString; +import io.grpc.stub.StreamObserver; +import javafx.util.Pair; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * etcd watcher Implementation + */ +public class EtcdWatchImpl implements EtcdWatch { + + private StreamObserver requestStream; + + private ConcurrentHashMap watchers = new ConcurrentHashMap<>(); + + private WatchGrpc.WatchStub watchStub; + + private ConcurrentLinkedQueue>> pendingWatchers = new ConcurrentLinkedQueue<>(); + private Map cancelWatchers = new ConcurrentHashMap<>(); + + public EtcdWatchImpl(WatchGrpc.WatchStub watchStub) { + this.watchStub = watchStub; + } + + /** + * Watch watches on a key or prefix. The watched events will be called by onWatch. + * If the watch is slow or the required rev is compacted, the watch request + * might be canceled from the server-side and the onCreateFailed will be called. + * + * @param key the key subscribe + * @param watchOption key option + * @param callback call back + * @return ListenableFuture watcher + */ + @Override + public synchronized ListenableSetFuture watch(ByteString key, WatchOption watchOption, Watcher.WatchCallback callback) { + WatchRequest request = optionToWatchCreateRequest(key, watchOption); + Watcher.Builder builder = Watcher.newBuilder().withCallBack(callback) + .withKey(key) + .withWatchOption(watchOption); + ListenableSetFuture waitFuture = new ListenableSetFuture<>(null); + this.pendingWatchers.add(new Pair<>(builder, waitFuture)); + getRequestStream().onNext(request); + return waitFuture; + } + + /** + * Cancel the watch task with the watcher, the onCanceled will be called after successfully canceled. + * + * @param watcher the watcher to be canceled + */ + @Override + public void cancelWatch(Watcher watcher) { + Watcher temp = watchers.get(watcher.getWatchID()); + if (temp != null) { + synchronized (temp) { + if (this.watchers.containsKey(temp.getWatchID())) { + this.watchers.remove(temp.getWatchID()); + this.cancelWatchers.put(temp.getWatchID(), temp); + WatchCancelRequest cancelRequest = WatchCancelRequest.newBuilder().setWatchId(watcher.getWatchID()).build(); + WatchRequest request = WatchRequest.newBuilder().setCancelRequest(cancelRequest).build(); + this.requestStream.onNext(request); + } + } + } + } + + /** + * empty the old request stream, watchers and resume the old watchers + * empty the cancelWatchers as there is no need to cancel, the old request stream has been dead + */ + private void resume() { + synchronized (this) { + this.requestStream = null; + Watcher[] resumeWatchers = (Watcher[]) watchers.values().toArray(); + this.watchers.clear(); + this.cancelWatchers.clear(); + resumeWatchers(resumeWatchers); + } + + } + + /** + * single instance method to get request stream, empty the old requestStream, so we will get a new + * requestStream automatically + *

the responseStream will distribute the create, cancel, normal response to + * processCreate, processCanceled and processEvents + *

if error happened, the requestStream will be closed by server side, so we call resume to resume + * all ongoing watchers + * + * @return + */ + private StreamObserver getRequestStream() { + if (this.requestStream == null) { + synchronized (this) { + if (this.requestStream == null) { + StreamObserver watchResponseStreamObserver = new StreamObserver() { + @Override + public void onNext(WatchResponse watchResponse) { + if (watchResponse.getCreated()) { + processCreate(watchResponse); + } else if (watchResponse.getCanceled()) { + processCanceled(watchResponse); + } else { + processEvents(watchResponse); + } + } + + @Override + public void onError(Throwable throwable) { + resume(); + } + + @Override + public void onCompleted() { + + } + }; + this.requestStream = this.watchStub.watch(watchResponseStreamObserver); + } + } + } + return this.requestStream; + } + + + /** + * Process create response from etcd server + *

If there is no pendingWatcher, ignore. + *

If cancel flag is true or CompactRevision not equal zero means the start revision + * has been compacted out of the store, call onCreateFailed. + *

If watchID = -1, create failed, call onCreateFailed. + *

If everything is Ok, create watcher, complete ListenableFuture task and put the new watcher + * to the watchers map. + * + * @param response + */ + private void processCreate(WatchResponse response) { + Pair> requestPair = pendingWatchers.poll(); + Watcher.Builder builder = requestPair.getKey(); + if (response.getCreated()) { + if (response.getCanceled() || response.getCompactRevision() != 0) { + builder.withCanceled(true); + Watcher watcher = builder.build(); + requestPair.getValue().setResult(watcher); + } + + builder.withWatchID(response.getWatchId()); + Watcher watcher = builder.build(); + requestPair.getValue().setResult(watcher); + + if (response.getWatchId() == -1 && watcher.callback != null) { + watcher.callback.onCreateFailed(response); + } else { + this.watchers.put(watcher.getWatchID(), watcher); + } + + //note the header revision so that put following a current watcher disconnect will arrive + //on watcher channel after reconnect + synchronized (watcher) { + watcher.setLastRevision(response.getHeader().getRevision()); + if(watcher.isResuming()){ + watcher.setResuming(false); + } + } + } + } + + /** + * Process subscribe watch events + *

If the watch id is not in the watchers map, scan it in the cancelWatchers map + * if exist, ignore, otherwise cancel it. + *

If the watcher exist, call the onWatch and set the last revision for resume + * + * @param watchResponse + */ + private void processEvents(WatchResponse watchResponse) { + Watcher watcher = watchers.get(watchResponse.getWatchId()); + if (watcher != null) { + synchronized (watcher) { + if (watchResponse.getEventsCount() != 0) { + List events = watchResponse.getEventsList(); + // if on resume process, filter processed events + if (watcher.isResuming()) { + long lastRevision = watcher.getLastRevision(); + events.removeIf((e) -> e.getKv().getModRevision() <= lastRevision); + } + watcher.setLastRevision( + watchResponse + .getEvents(watchResponse.getEventsCount() - 1) + .getKv().getModRevision()); + + if (watcher.callback != null) { + watcher.callback.onWatch(events); + } + } else { + watcher.setLastRevision(watchResponse.getHeader().getRevision()); + } + } + } else { + watcher = this.cancelWatchers.get(watchResponse.getWatchId()); + if (this.cancelWatchers.putIfAbsent(watcher.getWatchID(), watcher) == null) { + cancelWatch(watcher); + } + } + } + + /** + * resume all the watchers + * + * @param watchers + */ + private void resumeWatchers(Watcher[] watchers) { + for (Watcher watcher : watchers) { + if(watcher.callback!=null){ + watcher.callback.onResuming(); + } + watch(watcher.getKey(), getResumeWatchOptionWithWatcher(watcher), watcher.callback); + } + } + + /** + * Process cancel response from etcd server, + * + * @param response + */ + private void processCanceled(WatchResponse response) { + Watcher watcher = this.cancelWatchers.remove(response.getWatchId()); + if (watcher != null && watcher.callback != null) { + if (watcher.callback != null) { + watcher.setCanceled(true); + watcher.callback.onCanceled(response); + } + } + } + + /** + * convert WatcherOption to WatchRequest + * + * @param key + * @param option + * @return + */ + private WatchRequest optionToWatchCreateRequest(ByteString key, WatchOption option) { + WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder() + .setKey(key) + .setPrevKv(option.isPrevKV()) + .setProgressNotify(option.isProgressNotify()) + .setStartRevision(option.getRevision()); + + if (option.getEndKey().isPresent()) { + builder.setRangeEnd(option.getEndKey().get()); + } + + if (option.isNoDelete()) { + builder.addFilters(WatchCreateRequest.FilterType.NODELETE); + } + + if (option.isNoPut()) { + builder.addFilters(WatchCreateRequest.FilterType.NOPUT); + } + + return WatchRequest.newBuilder().setCreateRequest(builder).build(); + } + + /** + * build new WatchOption from dead to resume it in new requestStream + * + * @param watcher + * @return + */ + private WatchOption getResumeWatchOptionWithWatcher(Watcher watcher) { + WatchOption oldOption = watcher.getWatchOption(); + return WatchOption.newBuilder().withNoDelete(oldOption.isNoDelete()) + .withNoPut(oldOption.isNoPut()) + .withPrevKV(oldOption.isPrevKV()) + .withProgressNotify(oldOption.isProgressNotify()) + .withRange(oldOption.getEndKey().get()) + .withRevision(watcher.getLastRevision()+1) + .withResuming(true) + .build(); + } + + +} diff --git a/src/main/java/com/coreos/jetcd/options/WatchOption.java b/src/main/java/com/coreos/jetcd/options/WatchOption.java new file mode 100644 index 000000000..46c7bd594 --- /dev/null +++ b/src/main/java/com/coreos/jetcd/options/WatchOption.java @@ -0,0 +1,197 @@ +package com.coreos.jetcd.options; + +import com.coreos.jetcd.api.WatchCreateRequest; +import com.google.protobuf.ByteString; + +import java.util.List; +import java.util.Optional; +import java.util.Set; + +/** + * The option for watch operation. + */ +public final class WatchOption { + + public static final WatchOption DEFAULT = newBuilder().build(); + + /** + * Create a builder to construct option for watch operation. + * + * @return builder + */ + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + + private long revision = 0L; + private Optional endKey = Optional.empty(); + private boolean prevKV = false; + private boolean progressNotify = false; + private boolean noPut = false; + private boolean noDelete = false; + private boolean resuming = false; + + private Builder() { + } + + /** + * Provide the revision to use for the get request. + *

If the revision is less or equal to zero, the get is over the newest key-value store. + *

If the revision has been compacted, ErrCompacted is returned as a response. + * + * @param revision the revision to get. + * @return builder + */ + public Builder withRevision(long revision) { + this.revision = revision; + return this; + } + + /** + * Set the end key of the get request. If it is set, the + * get request will return the keys from key to endKey (exclusive). + *

If end key is '\0', the range is all keys >= key. + *

If the end key is one bit larger than the given key, then it gets all keys with the prefix (the given key). + *

If both key and end key are '\0', it returns all keys. + * + * @param endKey end key + * @return builder + */ + public Builder withRange(ByteString endKey) { + this.endKey = Optional.ofNullable(endKey); + return this; + } + + /** + * When prevKV is set, created watcher gets the previous KV before the event happens, + * if the previous KV is not compacted. + * + * @return builder + */ + public Builder withPrevKV(boolean prevKV) { + this.prevKV = prevKV; + return this; + } + + /** + * When progressNotify is set, the watch server send periodic progress updates. + * Progress updates have zero events in WatchResponse + * + * @return builder + */ + public Builder withProgressNotify(boolean progressNotify) { + this.progressNotify = progressNotify; + return this; + } + + /** + * filter out put event in server side + * + * @param noPut + * @return + */ + public Builder withNoPut(boolean noPut) { + this.noPut = noPut; + return this; + } + + public Builder withResuming(boolean resuming){ + this.resuming = resuming; + return this; + } + /** + * filter out delete event in server side + * + * @param noDelete + * @return + */ + public Builder withNoDelete(boolean noDelete) { + this.noDelete = noDelete; + return this; + } + + public WatchOption build() { + return new WatchOption( + endKey, + revision, + prevKV, + progressNotify, + noPut, + noDelete, + resuming); + } + + } + + private final Optional endKey; + private final long revision; + private final boolean prevKV; + private final boolean progressNotify; + private final boolean noPut; + private final boolean noDelete; + private final boolean resuming; + + private WatchOption(Optional endKey, + long revision, + boolean prevKV, + boolean progressNotify, + boolean noPut, + boolean noDelete, + boolean resuming) { + this.endKey = endKey; + this.revision = revision; + this.prevKV = prevKV; + this.progressNotify = progressNotify; + this.noPut = noPut; + this.noDelete = noDelete; + this.resuming = resuming; + } + + public Optional getEndKey() { + return this.endKey; + } + + public long getRevision() { + return revision; + } + + /** + * Whether created watcher gets the previous KV before the event happens. + */ + public boolean isPrevKV() { + return prevKV; + } + + /** + * Whether watcher server send periodic progress updates. + * + * @return if true, watcher server should send periodic progress updates. + */ + public boolean isProgressNotify() { + return progressNotify; + } + + /** + * Whether filter put event in server side + * + * @return if true, filter put event in server side + */ + public boolean isNoPut() { + return noPut; + } + + /** + * Whether filter delete event in server side + * + * @return if true, filter delete event in server side + */ + public boolean isNoDelete() { + return noDelete; + } + + public boolean isResuming(){ + return resuming; + } +} diff --git a/src/main/java/com/coreos/jetcd/util/ListenableSetFuture.java b/src/main/java/com/coreos/jetcd/util/ListenableSetFuture.java new file mode 100644 index 000000000..dd83b9774 --- /dev/null +++ b/src/main/java/com/coreos/jetcd/util/ListenableSetFuture.java @@ -0,0 +1,183 @@ +package com.coreos.jetcd.util; + +import com.google.common.util.concurrent.ListenableFuture; +import javafx.util.Pair; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * ListenableSetFuture, the result can be set by any thread. Before the setResult method called, the get method will be blocked. + */ +public class ListenableSetFuture implements ListenableFuture { + + private volatile T result = null; + + private List> listeners = new LinkedList<>(); + + private AtomicBoolean canceled = new AtomicBoolean(false); + private AtomicBoolean done = new AtomicBoolean(false); + + private CountDownLatch countDownLatch = new CountDownLatch(1); + private final CancelCallable cancelCallable; + + public ListenableSetFuture(CancelCallable cancelCallable){ + this.cancelCallable = cancelCallable; + } + + @Override + public void addListener(Runnable runnable, Executor executor) { + listeners.add(new Pair<>(runnable, executor)); + } + + /** + * Attempts to cancel execution of this task. This attempt will + * fail if the task has already completed, has already been cancelled, + * or could not be cancelled for some other reason. If successful, + * and this task has not started when {@code cancel} is called, + * this task should never run. If the task has already started, + * then the {@code mayInterruptIfRunning} parameter determines + * whether the thread executing this task should be interrupted in + * an attempt to stop the task. + *

+ *

After this method returns, subsequent calls to {@link #isDone} will + * always return {@code true}. Subsequent calls to {@link #isCancelled} + * will always return {@code true} if this method returned {@code true}. + * + * @param mayInterruptIfRunning {@code true} if the thread executing this + * task should be interrupted; otherwise, in-progress tasks are allowed + * to complete + * @return {@code false} if the task could not be cancelled, + * typically because it has already completed normally; + * {@code true} otherwise + */ + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if(cancelCallable == null || done.get()){ + return false; + } + if(cancelCallable.cancel()){ + canceled.set(true); + done.set(true); + return true; + }else { + return false; + } + } + + /** + * Returns {@code true} if this task was cancelled before it completed + * normally. + * + * @return {@code true} if this task was cancelled before it completed + */ + @Override + public boolean isCancelled() { + return canceled.get(); + } + + /** + * Returns {@code true} if this task completed. + *

+ * Completion may be due to normal termination, an exception, or + * cancellation -- in all of these cases, this method will return + * {@code true}. + * + * @return {@code true} if this task completed + */ + @Override + public boolean isDone() { + return done.get(); + } + + public void setResult(T result) { + if (canceled.get() || done.get()) { + throw new IllegalStateException(); + } else { + synchronized (this){ + if (canceled.get() || done.get()) { + throw new IllegalStateException(); + } + this.result = result; + done.set(true); + countDownLatch.countDown(); + } + runCompleteListener(); + } + + } + + /** + * Waits if necessary for the computation to complete, and then + * retrieves its result. + * + * @return the computed result + * @throws CancellationException if the computation was cancelled + * @throws ExecutionException if the computation threw an + * exception + * @throws InterruptedException if the current thread was interrupted + * while waiting + */ + @Override + public T get() throws InterruptedException, ExecutionException { + + if (canceled.get()) { + throw new CancellationException(); + } + + if (done.get()) { + return result; + } + + countDownLatch.await(); + if(canceled.get()){ + new CancellationException(); + } + return result; + } + + /** + * Waits if necessary for at most the given time for the computation + * to complete, and then retrieves its result, if available. + * + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @return the computed result + * @throws CancellationException if the computation was cancelled + * @throws ExecutionException if the computation threw an + * exception + * @throws InterruptedException if the current thread was interrupted + * while waiting + * @throws TimeoutException if the wait timed out + */ + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + if (canceled.get()) { + throw new CancellationException(); + } + + if (done.get()) { + return result; + } + + if (!countDownLatch.await(timeout, unit)) { + throw new TimeoutException(); + } + + return result; + } + + private void runCompleteListener(){ + for(Pair runPair: listeners){ + runPair.getValue().execute(runPair.getKey()); + } + } + + + public interface CancelCallable{ + boolean cancel(); + } + +} diff --git a/src/main/java/com/coreos/jetcd/watch/Watcher.java b/src/main/java/com/coreos/jetcd/watch/Watcher.java new file mode 100644 index 000000000..99fe7a4c2 --- /dev/null +++ b/src/main/java/com/coreos/jetcd/watch/Watcher.java @@ -0,0 +1,169 @@ +package com.coreos.jetcd.watch; + +import com.coreos.jetcd.api.Event; +import com.coreos.jetcd.api.WatchResponse; +import com.coreos.jetcd.options.WatchOption; +import com.google.protobuf.ByteString; + +import javax.annotation.concurrent.GuardedBy; +import java.util.List; + +/** + * Watcher class hold watcher information. + */ +public class Watcher { + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + private WatchCallback callback; + private WatchOption watchOption; + private ByteString key; + private long watchID; + private boolean canceled = false; + + public Builder withCallBack(WatchCallback callBack) { + this.callback = callBack; + return this; + } + + public Builder withWatchOption(WatchOption watchOption) { + this.watchOption = watchOption; + return this; + } + + public Builder withKey(ByteString key) { + this.key = key; + return this; + } + + public Builder withWatchID(long watchID) { + this.watchID = watchID; + return this; + } + + public Builder withCanceled(boolean canceled) { + this.canceled = canceled; + return this; + } + + public Watcher build() { + return new Watcher(this.watchID, this.key, this.watchOption, canceled, this.callback); + } + + } + + + private final WatchOption watchOption; + private final ByteString key; + + @GuardedBy("this") + public final WatchCallback callback; + private final long watchID; + + @GuardedBy("this") + private long lastRevision = -1; + private boolean canceled = false; + + @GuardedBy("this") + private boolean resuming; + + private Watcher(long watchID, ByteString key, WatchOption watchOption, boolean canceled, WatchCallback callback) { + this.key = key; + this.watchOption = watchOption; + this.watchID = watchID; + this.callback = callback; + this.canceled = canceled; + this.resuming = watchOption.isResuming(); + } + + /** + * set the last revision watcher received, used for resume + * + * @param lastRevision the last revision + */ + public synchronized void setLastRevision(long lastRevision) { + this.lastRevision = lastRevision; + } + + public boolean isCanceled() { + return canceled; + } + + public synchronized void setCanceled(boolean canceled) { + this.canceled = canceled; + } + + /** + * get the watch id of the watcher + * + * @return + */ + public long getWatchID() { + return watchID; + } + + public WatchOption getWatchOption() { + return watchOption; + } + + /** + * get the last revision watcher received + * + * @return last revision + */ + public synchronized long getLastRevision() { + return lastRevision; + } + + /** + * get the watcher key + * + * @return watcher key + */ + public ByteString getKey() { + return key; + } + + /** + * whether the watcher is resuming. + */ + public synchronized boolean isResuming() { + return resuming; + } + + public synchronized void setResuming(boolean resuming) { + this.resuming = resuming; + } + + public interface WatchCallback { + + /** + * onWatch will be called when watcher receive any events + * + * @param events received events + */ + void onWatch(List events); + + /** + * onCreateFailed will be called when create watcher failed + * + * @param watchResponse watch response + */ + void onCreateFailed(WatchResponse watchResponse); + + /** + * onResuming will be called when the watcher is on resuming. + */ + void onResuming(); + + /** + * onCanceled will be called when the watcher is canceled successfully. + * + * @param response watch response for cancel + */ + void onCanceled(WatchResponse response); + } +} diff --git a/src/test/java/com/coreos/jetcd/EtcdWatchTest.java b/src/test/java/com/coreos/jetcd/EtcdWatchTest.java new file mode 100644 index 000000000..531c37bb7 --- /dev/null +++ b/src/test/java/com/coreos/jetcd/EtcdWatchTest.java @@ -0,0 +1,105 @@ +package com.coreos.jetcd; + +import com.coreos.jetcd.api.Event; +import com.coreos.jetcd.api.WatchResponse; +import com.coreos.jetcd.exception.AuthFailedException; +import com.coreos.jetcd.exception.ConnectException; +import com.coreos.jetcd.options.WatchOption; +import com.coreos.jetcd.util.ListenableSetFuture; +import com.coreos.jetcd.watch.Watcher; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.protobuf.ByteString; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; +import org.testng.asserts.Assertion; + +import java.util.List; +import java.util.concurrent.*; + +/** + * watch test case. + */ +public class EtcdWatchTest { + + private EtcdClient client; + private EtcdWatch watchClient; + private EtcdKV kvClient; + private BlockingQueue eventsQueue = new LinkedBlockingDeque<>(); + + private ByteString key = ByteString.copyFromUtf8("test_key"); + private ByteString value = ByteString.copyFromUtf8("test_val"); + private Watcher watcher; + private ListenableSetFuture cancelResponse; + + private Assertion test = new Assertion(); + + @BeforeTest + public void newEtcdClient() throws AuthFailedException, ConnectException { + client = EtcdClientBuilder.newBuilder().endpoints("localhost:2379").build(); + watchClient = client.getWatchClient(); + kvClient = client.getKVClient(); + } + + @Test + public void testWatch() throws ExecutionException, InterruptedException { + WatchOption option = WatchOption.DEFAULT; + cancelResponse = new ListenableSetFuture<>(null); + watcher = watchClient.watch(key, option, new Watcher.WatchCallback() { + @Override + public void onWatch(List events) { + EtcdWatchTest.this.eventsQueue.addAll(events); + } + + @Override + public void onCreateFailed(WatchResponse watchResponse) { + + } + + @Override + public void onResuming() { + + } + + @Override + public void onCanceled(WatchResponse response) { + cancelResponse.setResult(response); + } + }).get(); + + } + + /** + * watch put operation on key + * assert whether receive put event + */ + @Test(dependsOnMethods = "testWatch") + public void testWatchPut() throws InterruptedException { + kvClient.put(key, value); + Event event = eventsQueue.poll(5, TimeUnit.SECONDS); + test.assertEquals(event.getKv().getKey(), key); + test.assertEquals(event.getType(), Event.EventType.PUT); + } + + /** + * watch delete operation on key + * assert whether receive delete event + */ + @Test(dependsOnMethods = "testWatchPut") + public void testWatchDelete() throws InterruptedException { + kvClient.delete(key); + Event event = eventsQueue.poll(5, TimeUnit.SECONDS); + test.assertEquals(event.getKv().getKey(), key); + test.assertEquals(event.getType(), Event.EventType.DELETE); + } + + /** + * cancel watch test case + * assert whether receive cancel response + */ + @Test(dependsOnMethods = "testWatchDelete") + public void testCancelWatch() throws ExecutionException, InterruptedException, TimeoutException { + watchClient.cancelWatch(watcher); + WatchResponse watchResponse = cancelResponse.get(5, TimeUnit.SECONDS); + test.assertTrue(watchResponse.getCanceled()); + } +} From 177de5b407ee8a2640071166ad5de25175794937 Mon Sep 17 00:00:00 2001 From: StupidHod Date: Mon, 5 Sep 2016 17:17:18 +0800 Subject: [PATCH 2/8] change gRPC class to package class --- src/main/java/com/coreos/jetcd/EtcdUtil.java | 80 +++++++ src/main/java/com/coreos/jetcd/EtcdWatch.java | 58 ++++- .../java/com/coreos/jetcd/EtcdWatchImpl.java | 216 +++++++++++++----- .../com/coreos/jetcd/data/ByteSequence.java | 121 ++++++++++ .../com/coreos/jetcd/data/EtcdHeader.java | 40 ++++ .../java/com/coreos/jetcd/data/KeyValue.java | 47 ++++ .../com/coreos/jetcd/options/WatchOption.java | 11 +- .../jetcd/util/ListenableSetFuture.java | 183 --------------- .../jetcd/watch/WatchCreateException.java | 16 ++ .../com/coreos/jetcd/watch/WatchEvent.java | 39 ++++ .../java/com/coreos/jetcd/watch/Watcher.java | 169 -------------- .../java/com/coreos/jetcd/EtcdWatchTest.java | 63 +++-- 12 files changed, 579 insertions(+), 464 deletions(-) create mode 100644 src/main/java/com/coreos/jetcd/EtcdUtil.java create mode 100644 src/main/java/com/coreos/jetcd/data/ByteSequence.java create mode 100644 src/main/java/com/coreos/jetcd/data/EtcdHeader.java create mode 100644 src/main/java/com/coreos/jetcd/data/KeyValue.java delete mode 100644 src/main/java/com/coreos/jetcd/util/ListenableSetFuture.java create mode 100644 src/main/java/com/coreos/jetcd/watch/WatchCreateException.java create mode 100644 src/main/java/com/coreos/jetcd/watch/WatchEvent.java delete mode 100644 src/main/java/com/coreos/jetcd/watch/Watcher.java diff --git a/src/main/java/com/coreos/jetcd/EtcdUtil.java b/src/main/java/com/coreos/jetcd/EtcdUtil.java new file mode 100644 index 000000000..5f08e5db9 --- /dev/null +++ b/src/main/java/com/coreos/jetcd/EtcdUtil.java @@ -0,0 +1,80 @@ +package com.coreos.jetcd; + +import com.coreos.jetcd.api.Event; +import com.coreos.jetcd.api.ResponseHeader; +import com.coreos.jetcd.data.ByteSequence; +import com.coreos.jetcd.data.EtcdHeader; +import com.coreos.jetcd.data.KeyValue; +import com.coreos.jetcd.watch.WatchEvent; +import com.google.protobuf.ByteString; + +import java.util.ArrayList; +import java.util.List; + +/** + * This util is to convert api class to client class. + */ +public class EtcdUtil { + + private EtcdUtil() { + } + + /** + * convert ByteSequence to ByteString + */ + protected static ByteString byteStringFromByteSequence(ByteSequence byteSequence) { + return ByteString.copyFrom(byteSequence.getBytes()); + } + + /** + * convert ByteString to ByteSequence + * @return + */ + protected static ByteSequence byteSequceFromByteString(ByteString byteString) { + return ByteSequence.fromBytes(byteString.toByteArray()); + } + + /** + * convert API KeyValue to etcd client KeyValue + */ + protected static KeyValue apiToClientKV(com.coreos.jetcd.api.KeyValue keyValue) { + return new KeyValue( + byteSequceFromByteString(keyValue.getKey()), + byteSequceFromByteString(keyValue.getValue()), + keyValue.getCreateRevision(), + keyValue.getModRevision(), + keyValue.getVersion(), + keyValue.getLease()); + } + + /** + * convert API watch event to etcd client event + */ + protected static WatchEvent apiToClientEvent(Event event) { + WatchEvent.EventType eventType = WatchEvent.EventType.UNRECOGNIZED; + switch (event.getType()) { + case DELETE: + eventType = WatchEvent.EventType.DELETE; + break; + case PUT: + eventType = WatchEvent.EventType.PUT; + break; + } + return new WatchEvent(apiToClientKV(event.getKv()), apiToClientKV(event.getPrevKv()), eventType); + } + + protected static List apiToClientEvents(List events) { + List watchEvents = new ArrayList<>(); + for (Event event : events) { + watchEvents.add(apiToClientEvent(event)); + } + return watchEvents; + } + + /** + * convert API response header to self defined header + */ + protected static EtcdHeader apiToClientHeader(ResponseHeader header, long compactRevision) { + return new EtcdHeader(header.getClusterId(), header.getMemberId(), header.getRevision(), header.getRaftTerm(), compactRevision); + } +} diff --git a/src/main/java/com/coreos/jetcd/EtcdWatch.java b/src/main/java/com/coreos/jetcd/EtcdWatch.java index 8f923b5b9..497b7fe04 100644 --- a/src/main/java/com/coreos/jetcd/EtcdWatch.java +++ b/src/main/java/com/coreos/jetcd/EtcdWatch.java @@ -1,9 +1,12 @@ package com.coreos.jetcd; +import com.coreos.jetcd.data.ByteSequence; +import com.coreos.jetcd.data.EtcdHeader; import com.coreos.jetcd.options.WatchOption; -import com.coreos.jetcd.util.ListenableSetFuture; -import com.coreos.jetcd.watch.Watcher; -import com.google.protobuf.ByteString; +import com.coreos.jetcd.watch.WatchEvent; + +import java.util.List; +import java.util.concurrent.CompletableFuture; /** * Interface of watch client @@ -21,12 +24,47 @@ public interface EtcdWatch { * @param callback call back * @return ListenableFuture watcher */ - ListenableSetFuture watch(ByteString key, WatchOption watchOption, Watcher.WatchCallback callback); + CompletableFuture watch(ByteSequence key, WatchOption watchOption, WatchCallback callback); - /** - * Cancel the watch task with the watcher, the onCanceled will be called after successfully canceled. - * - * @param watcher the watcher to be canceled - */ - void cancelWatch(Watcher watcher); + interface Watcher{ + + /** + * get watcher id + * @return id + */ + long getWatchID(); + + long getLastRevision(); + + ByteSequence getKey(); + + boolean isResuming(); + + /** + * get the watch option + * @return watch option + */ + WatchOption getWatchOption(); + + /** + * cancel the watcher + * @return cancel result + */ + CompletableFuture cancel(); + } + + interface WatchCallback { + + /** + * onWatch will be called when watcher receive any events + * + * @param events received events + */ + void onWatch(EtcdHeader header, List events); + + /** + * onResuming will be called when the watcher is on resuming. + */ + void onResuming(); + } } diff --git a/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java b/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java index dea2ceafa..f605f6d1d 100644 --- a/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java +++ b/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java @@ -1,31 +1,35 @@ package com.coreos.jetcd; import com.coreos.jetcd.api.*; +import com.coreos.jetcd.data.ByteSequence; import com.coreos.jetcd.options.WatchOption; -import com.coreos.jetcd.util.ListenableSetFuture; -import com.coreos.jetcd.watch.Watcher; +import com.coreos.jetcd.watch.WatchCreateException; import com.google.protobuf.ByteString; import io.grpc.stub.StreamObserver; import javafx.util.Pair; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import static com.coreos.jetcd.EtcdUtil.apiToClientEvents; +import static com.coreos.jetcd.EtcdUtil.apiToClientHeader; + /** * etcd watcher Implementation */ public class EtcdWatchImpl implements EtcdWatch { - private StreamObserver requestStream; + private volatile StreamObserver requestStream; - private ConcurrentHashMap watchers = new ConcurrentHashMap<>(); + private ConcurrentHashMap watchers = new ConcurrentHashMap<>(); private WatchGrpc.WatchStub watchStub; - private ConcurrentLinkedQueue>> pendingWatchers = new ConcurrentLinkedQueue<>(); - private Map cancelWatchers = new ConcurrentHashMap<>(); + private ConcurrentLinkedQueue>> pendingCreateWatchers = new ConcurrentLinkedQueue<>(); + private Map> pendingCancelFutures = new ConcurrentHashMap<>(); public EtcdWatchImpl(WatchGrpc.WatchStub watchStub) { this.watchStub = watchStub; @@ -39,16 +43,14 @@ public EtcdWatchImpl(WatchGrpc.WatchStub watchStub) { * @param key the key subscribe * @param watchOption key option * @param callback call back - * @return ListenableFuture watcher + * @return CompletableFuture watcher */ @Override - public synchronized ListenableSetFuture watch(ByteString key, WatchOption watchOption, Watcher.WatchCallback callback) { - WatchRequest request = optionToWatchCreateRequest(key, watchOption); - Watcher.Builder builder = Watcher.newBuilder().withCallBack(callback) - .withKey(key) - .withWatchOption(watchOption); - ListenableSetFuture waitFuture = new ListenableSetFuture<>(null); - this.pendingWatchers.add(new Pair<>(builder, waitFuture)); + public CompletableFuture watch(ByteSequence key, WatchOption watchOption, WatchCallback callback) { + WatchRequest request = optionToWatchCreateRequest(EtcdUtil.byteStringFromByteSequence(key), watchOption); + WatcherImpl watcher = new WatcherImpl(key, watchOption, callback); + CompletableFuture waitFuture = new CompletableFuture(); + this.pendingCreateWatchers.add(new Pair<>(watcher, waitFuture)); getRequestStream().onNext(request); return waitFuture; } @@ -56,37 +58,40 @@ public synchronized ListenableSetFuture watch(ByteString key, WatchOpti /** * Cancel the watch task with the watcher, the onCanceled will be called after successfully canceled. * - * @param watcher the watcher to be canceled + * @param id the watcher to be canceled */ - @Override - public void cancelWatch(Watcher watcher) { - Watcher temp = watchers.get(watcher.getWatchID()); + protected CompletableFuture cancelWatch(long id) { + WatcherImpl temp = watchers.get(id); + CompletableFuture completableFuture = null; if (temp != null) { synchronized (temp) { if (this.watchers.containsKey(temp.getWatchID())) { this.watchers.remove(temp.getWatchID()); - this.cancelWatchers.put(temp.getWatchID(), temp); - WatchCancelRequest cancelRequest = WatchCancelRequest.newBuilder().setWatchId(watcher.getWatchID()).build(); - WatchRequest request = WatchRequest.newBuilder().setCancelRequest(cancelRequest).build(); - this.requestStream.onNext(request); + completableFuture = new CompletableFuture<>(); + this.pendingCancelFutures.put(id, completableFuture); } } } + + WatchCancelRequest cancelRequest = WatchCancelRequest.newBuilder().setWatchId(id).build(); + WatchRequest request = WatchRequest.newBuilder().setCancelRequest(cancelRequest).build(); + this.requestStream.onNext(request); + return completableFuture; } /** * empty the old request stream, watchers and resume the old watchers - * empty the cancelWatchers as there is no need to cancel, the old request stream has been dead + * empty the pendingCancelFutures as there is no need to cancel, the old request stream has been dead */ - private void resume() { - synchronized (this) { - this.requestStream = null; - Watcher[] resumeWatchers = (Watcher[]) watchers.values().toArray(); - this.watchers.clear(); - this.cancelWatchers.clear(); - resumeWatchers(resumeWatchers); + private synchronized void resume() { + this.requestStream = null; + WatcherImpl[] resumeWatchers = (WatcherImpl[]) watchers.values().toArray(); + this.watchers.clear(); + for (CompletableFuture watcherCompletableFuture : pendingCancelFutures.values()) { + watcherCompletableFuture.complete(Boolean.TRUE); } - + this.pendingCancelFutures.clear(); + resumeWatchers(resumeWatchers); } /** @@ -132,43 +137,39 @@ public void onCompleted() { return this.requestStream; } - /** * Process create response from etcd server *

If there is no pendingWatcher, ignore. *

If cancel flag is true or CompactRevision not equal zero means the start revision * has been compacted out of the store, call onCreateFailed. - *

If watchID = -1, create failed, call onCreateFailed. - *

If everything is Ok, create watcher, complete ListenableFuture task and put the new watcher + *

If watchID = -1, complete future with WatchCreateException. + *

If everything is Ok, create watcher, complete CompletableFuture task and put the new watcher * to the watchers map. * * @param response */ private void processCreate(WatchResponse response) { - Pair> requestPair = pendingWatchers.poll(); - Watcher.Builder builder = requestPair.getKey(); + Pair> requestPair = pendingCreateWatchers.poll(); + WatcherImpl watcher = requestPair.getKey(); if (response.getCreated()) { if (response.getCanceled() || response.getCompactRevision() != 0) { - builder.withCanceled(true); - Watcher watcher = builder.build(); - requestPair.getValue().setResult(watcher); + watcher.setCanceled(true); + requestPair.getValue().completeExceptionally(new WatchCreateException("the start revision has been compacted", apiToClientHeader(response.getHeader(), response.getCompactRevision())));; } - builder.withWatchID(response.getWatchId()); - Watcher watcher = builder.build(); - requestPair.getValue().setResult(watcher); - if (response.getWatchId() == -1 && watcher.callback != null) { - watcher.callback.onCreateFailed(response); + requestPair.getValue().completeExceptionally(new WatchCreateException("create watcher failed", apiToClientHeader(response.getHeader(), response.getCompactRevision()))); } else { this.watchers.put(watcher.getWatchID(), watcher); + watcher.setWatchID(response.getWatchId()); + requestPair.getValue().complete(watcher); } //note the header revision so that put following a current watcher disconnect will arrive //on watcher channel after reconnect synchronized (watcher) { watcher.setLastRevision(response.getHeader().getRevision()); - if(watcher.isResuming()){ + if (watcher.isResuming()) { watcher.setResuming(false); } } @@ -177,14 +178,14 @@ private void processCreate(WatchResponse response) { /** * Process subscribe watch events - *

If the watch id is not in the watchers map, scan it in the cancelWatchers map + *

If the watch id is not in the watchers map, scan it in the pendingCancelFutures map * if exist, ignore, otherwise cancel it. *

If the watcher exist, call the onWatch and set the last revision for resume * * @param watchResponse */ private void processEvents(WatchResponse watchResponse) { - Watcher watcher = watchers.get(watchResponse.getWatchId()); + WatcherImpl watcher = watchers.get(watchResponse.getWatchId()); if (watcher != null) { synchronized (watcher) { if (watchResponse.getEventsCount() != 0) { @@ -200,16 +201,17 @@ private void processEvents(WatchResponse watchResponse) { .getKv().getModRevision()); if (watcher.callback != null) { - watcher.callback.onWatch(events); + watcher.callback.onWatch(apiToClientHeader(watchResponse.getHeader(), watchResponse.getCompactRevision()), apiToClientEvents(events)); } } else { watcher.setLastRevision(watchResponse.getHeader().getRevision()); } } } else { - watcher = this.cancelWatchers.get(watchResponse.getWatchId()); - if (this.cancelWatchers.putIfAbsent(watcher.getWatchID(), watcher) == null) { - cancelWatch(watcher); + // if the watcher is not canceling, cancel it. + CompletableFuture completableFuture = this.pendingCancelFutures.get(watchResponse.getWatchId()); + if (this.pendingCancelFutures.putIfAbsent(watcher.getWatchID(), completableFuture) == null) { + cancelWatch(watchResponse.getWatchId()); } } } @@ -219,9 +221,9 @@ private void processEvents(WatchResponse watchResponse) { * * @param watchers */ - private void resumeWatchers(Watcher[] watchers) { - for (Watcher watcher : watchers) { - if(watcher.callback!=null){ + private void resumeWatchers(WatcherImpl[] watchers) { + for (WatcherImpl watcher : watchers) { + if (watcher.callback != null) { watcher.callback.onResuming(); } watch(watcher.getKey(), getResumeWatchOptionWithWatcher(watcher), watcher.callback); @@ -234,13 +236,8 @@ private void resumeWatchers(Watcher[] watchers) { * @param response */ private void processCanceled(WatchResponse response) { - Watcher watcher = this.cancelWatchers.remove(response.getWatchId()); - if (watcher != null && watcher.callback != null) { - if (watcher.callback != null) { - watcher.setCanceled(true); - watcher.callback.onCanceled(response); - } - } + CompletableFuture cancelFuture = this.pendingCancelFutures.remove(response.getWatchId()); + cancelFuture.complete(Boolean.TRUE); } /** @@ -258,7 +255,7 @@ private WatchRequest optionToWatchCreateRequest(ByteString key, WatchOption opti .setStartRevision(option.getRevision()); if (option.getEndKey().isPresent()) { - builder.setRangeEnd(option.getEndKey().get()); + builder.setRangeEnd(EtcdUtil.byteStringFromByteSequence(option.getEndKey().get())); } if (option.isNoDelete()) { @@ -285,10 +282,103 @@ private WatchOption getResumeWatchOptionWithWatcher(Watcher watcher) { .withPrevKV(oldOption.isPrevKV()) .withProgressNotify(oldOption.isProgressNotify()) .withRange(oldOption.getEndKey().get()) - .withRevision(watcher.getLastRevision()+1) + .withRevision(watcher.getLastRevision() + 1) .withResuming(true) .build(); } + /** + * Watcher class hold watcher information. + */ + public class WatcherImpl implements Watcher { + + private final WatchOption watchOption; + private final ByteSequence key; + + public final WatchCallback callback; + private long watchID; + + private long lastRevision = -1; + private boolean canceled = false; + + private boolean resuming; + + private WatcherImpl(ByteSequence key, WatchOption watchOption, WatchCallback callback) { + this.key = key; + this.watchOption = watchOption; + this.callback = callback; + this.resuming = watchOption.isResuming(); + } + + @Override + public CompletableFuture cancel() { + return cancelWatch(watchID); + } + + /** + * set the last revision watcher received, used for resume + * + * @param lastRevision the last revision + */ + private void setLastRevision(long lastRevision) { + this.lastRevision = lastRevision; + } + + public boolean isCanceled() { + return canceled; + } + + private void setCanceled(boolean canceled) { + this.canceled = canceled; + } + + /** + * get the watch id of the watcher + * + * @return + */ + public long getWatchID() { + return watchID; + } + + private void setWatchID(long watchID) { + this.watchID = watchID; + } + + public WatchOption getWatchOption() { + return watchOption; + } + + /** + * get the last revision watcher received + * + * @return last revision + */ + public long getLastRevision() { + return lastRevision; + } + + /** + * get the watcher key + * + * @return watcher key + */ + public ByteSequence getKey() { + return key; + } + + /** + * whether the watcher is resuming. + */ + public boolean isResuming() { + return resuming; + } + + private void setResuming(boolean resuming) { + this.resuming = resuming; + } + + } + } diff --git a/src/main/java/com/coreos/jetcd/data/ByteSequence.java b/src/main/java/com/coreos/jetcd/data/ByteSequence.java new file mode 100644 index 000000000..f4be329c6 --- /dev/null +++ b/src/main/java/com/coreos/jetcd/data/ByteSequence.java @@ -0,0 +1,121 @@ +package com.coreos.jetcd.data; + +import com.google.protobuf.ByteString; + +import java.io.UnsupportedEncodingException; +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.util.Arrays; + +/** + * Etcd binary bytes, easy to convert between byte[], String and ByteString. + */ +public class ByteSequence { + + private final byte[] bytes; + private final int hashVal; + private final ByteString byteString; + + + public ByteSequence(byte[] source) { + this.bytes = Arrays.copyOf(source, source.length); + hashVal = calcHashCore(); + byteString = toByteString(); + } + + protected ByteSequence(ByteString byteString) { + this(byteString.toByteArray()); + } + + public ByteSequence(String string) { + this(string.getBytes()); + } + + public ByteSequence(CharBuffer charBuffer) { + this(String.valueOf(charBuffer.array())); + } + + public ByteSequence(CharSequence charSequence) { + this(java.nio.CharBuffer.wrap(charSequence)); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj instanceof ByteSequence) { + ByteSequence target = (ByteSequence) obj; + if (target.bytes.length == this.bytes.length) { + for (int i = 0; i < this.bytes.length; ++i) { + if (bytes[i] != target.bytes[i]) { + return false; + } + } + return true; + } else { + return false; + } + } else { + return false; + } + } + + protected ByteString getByteString() { + return this.byteString; + } + + private ByteString toByteString() { + return ByteString.copyFrom(bytes); + } + + private int calcHashCore() { + int result = 0; + for (int i = 0; i < bytes.length; ++i) { + result = 31 * result + bytes[i]; + } + return result; + } + + @Override + public int hashCode() { + return hashVal; + } + + public String toStringUtf8() { + return byteString.toStringUtf8(); + } + + public String toString(Charset charset) { + return byteString.toString(charset); + } + + public String toString(String charsetName) throws UnsupportedEncodingException { + return byteString.toString(charsetName); + } + + public byte[] getBytes() { + return Arrays.copyOf(bytes, bytes.length); + } + + public static ByteSequence fromString(String string) { + return new ByteSequence(string); + } + + public static ByteSequence fromCharSequence(CharSequence charSequence) { + return new ByteSequence(charSequence); + } + + public static ByteSequence fromCharBuffer(CharBuffer charBuffer) { + return new ByteSequence(charBuffer); + } + + protected static ByteSequence fromByteString(ByteString byteString) { + return new ByteSequence(byteString); + } + + public static ByteSequence fromBytes(byte[] bytes) { + return new ByteSequence(bytes); + } + +} diff --git a/src/main/java/com/coreos/jetcd/data/EtcdHeader.java b/src/main/java/com/coreos/jetcd/data/EtcdHeader.java new file mode 100644 index 000000000..4eaeaa75c --- /dev/null +++ b/src/main/java/com/coreos/jetcd/data/EtcdHeader.java @@ -0,0 +1,40 @@ +package com.coreos.jetcd.data; + +/** + * Etcd message header information. + */ +public class EtcdHeader { + private final long clusterId; + private final long memberId; + private final long revision; + private final long raftTerm; + private final long compactRevision; + + public EtcdHeader(long clusterId, long memberId, long revision, long raftTerm, long compactRevision) { + this.clusterId = clusterId; + this.memberId = memberId; + this.revision = revision; + this.raftTerm = raftTerm; + this.compactRevision = compactRevision; + } + + public long getClusterId() { + return clusterId; + } + + public long getMemberId() { + return memberId; + } + + public long getRevision() { + return revision; + } + + public long getRaftTerm() { + return raftTerm; + } + + public long getCompactRevision() { + return compactRevision; + } +} diff --git a/src/main/java/com/coreos/jetcd/data/KeyValue.java b/src/main/java/com/coreos/jetcd/data/KeyValue.java new file mode 100644 index 000000000..de4ded15d --- /dev/null +++ b/src/main/java/com/coreos/jetcd/data/KeyValue.java @@ -0,0 +1,47 @@ +package com.coreos.jetcd.data; + +/** + * Etcd key value pair. + */ +public class KeyValue { + + private final ByteSequence key; + private final ByteSequence value; + private long createRevision = 0L; + private long modRevision = 0L; + private long version = 0L; + private long lease = 0L; + + public KeyValue(ByteSequence key, ByteSequence value, long createRevision, long modRevision, long version, long lease) { + this.key = key; + this.value = value; + this.createRevision = createRevision; + this.modRevision = modRevision; + this.version = version; + this.lease = lease; + } + + public ByteSequence getKey() { + return key; + } + + public ByteSequence getValue() { + return value; + } + + public long getCreateRevision() { + return createRevision; + } + + public long getModRevision() { + return modRevision; + } + + public long getVersion() { + return version; + } + + public long getLease() { + return lease; + } +} diff --git a/src/main/java/com/coreos/jetcd/options/WatchOption.java b/src/main/java/com/coreos/jetcd/options/WatchOption.java index 46c7bd594..ec842960c 100644 --- a/src/main/java/com/coreos/jetcd/options/WatchOption.java +++ b/src/main/java/com/coreos/jetcd/options/WatchOption.java @@ -1,6 +1,7 @@ package com.coreos.jetcd.options; import com.coreos.jetcd.api.WatchCreateRequest; +import com.coreos.jetcd.data.ByteSequence; import com.google.protobuf.ByteString; import java.util.List; @@ -26,7 +27,7 @@ public static Builder newBuilder() { public static class Builder { private long revision = 0L; - private Optional endKey = Optional.empty(); + private Optional endKey = Optional.empty(); private boolean prevKV = false; private boolean progressNotify = false; private boolean noPut = false; @@ -59,7 +60,7 @@ public Builder withRevision(long revision) { * @param endKey end key * @return builder */ - public Builder withRange(ByteString endKey) { + public Builder withRange(ByteSequence endKey) { this.endKey = Optional.ofNullable(endKey); return this; } @@ -125,7 +126,7 @@ public WatchOption build() { } - private final Optional endKey; + private final Optional endKey; private final long revision; private final boolean prevKV; private final boolean progressNotify; @@ -133,7 +134,7 @@ public WatchOption build() { private final boolean noDelete; private final boolean resuming; - private WatchOption(Optional endKey, + private WatchOption(Optional endKey, long revision, boolean prevKV, boolean progressNotify, @@ -149,7 +150,7 @@ private WatchOption(Optional endKey, this.resuming = resuming; } - public Optional getEndKey() { + public Optional getEndKey() { return this.endKey; } diff --git a/src/main/java/com/coreos/jetcd/util/ListenableSetFuture.java b/src/main/java/com/coreos/jetcd/util/ListenableSetFuture.java deleted file mode 100644 index dd83b9774..000000000 --- a/src/main/java/com/coreos/jetcd/util/ListenableSetFuture.java +++ /dev/null @@ -1,183 +0,0 @@ -package com.coreos.jetcd.util; - -import com.google.common.util.concurrent.ListenableFuture; -import javafx.util.Pair; - -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * ListenableSetFuture, the result can be set by any thread. Before the setResult method called, the get method will be blocked. - */ -public class ListenableSetFuture implements ListenableFuture { - - private volatile T result = null; - - private List> listeners = new LinkedList<>(); - - private AtomicBoolean canceled = new AtomicBoolean(false); - private AtomicBoolean done = new AtomicBoolean(false); - - private CountDownLatch countDownLatch = new CountDownLatch(1); - private final CancelCallable cancelCallable; - - public ListenableSetFuture(CancelCallable cancelCallable){ - this.cancelCallable = cancelCallable; - } - - @Override - public void addListener(Runnable runnable, Executor executor) { - listeners.add(new Pair<>(runnable, executor)); - } - - /** - * Attempts to cancel execution of this task. This attempt will - * fail if the task has already completed, has already been cancelled, - * or could not be cancelled for some other reason. If successful, - * and this task has not started when {@code cancel} is called, - * this task should never run. If the task has already started, - * then the {@code mayInterruptIfRunning} parameter determines - * whether the thread executing this task should be interrupted in - * an attempt to stop the task. - *

- *

After this method returns, subsequent calls to {@link #isDone} will - * always return {@code true}. Subsequent calls to {@link #isCancelled} - * will always return {@code true} if this method returned {@code true}. - * - * @param mayInterruptIfRunning {@code true} if the thread executing this - * task should be interrupted; otherwise, in-progress tasks are allowed - * to complete - * @return {@code false} if the task could not be cancelled, - * typically because it has already completed normally; - * {@code true} otherwise - */ - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - if(cancelCallable == null || done.get()){ - return false; - } - if(cancelCallable.cancel()){ - canceled.set(true); - done.set(true); - return true; - }else { - return false; - } - } - - /** - * Returns {@code true} if this task was cancelled before it completed - * normally. - * - * @return {@code true} if this task was cancelled before it completed - */ - @Override - public boolean isCancelled() { - return canceled.get(); - } - - /** - * Returns {@code true} if this task completed. - *

- * Completion may be due to normal termination, an exception, or - * cancellation -- in all of these cases, this method will return - * {@code true}. - * - * @return {@code true} if this task completed - */ - @Override - public boolean isDone() { - return done.get(); - } - - public void setResult(T result) { - if (canceled.get() || done.get()) { - throw new IllegalStateException(); - } else { - synchronized (this){ - if (canceled.get() || done.get()) { - throw new IllegalStateException(); - } - this.result = result; - done.set(true); - countDownLatch.countDown(); - } - runCompleteListener(); - } - - } - - /** - * Waits if necessary for the computation to complete, and then - * retrieves its result. - * - * @return the computed result - * @throws CancellationException if the computation was cancelled - * @throws ExecutionException if the computation threw an - * exception - * @throws InterruptedException if the current thread was interrupted - * while waiting - */ - @Override - public T get() throws InterruptedException, ExecutionException { - - if (canceled.get()) { - throw new CancellationException(); - } - - if (done.get()) { - return result; - } - - countDownLatch.await(); - if(canceled.get()){ - new CancellationException(); - } - return result; - } - - /** - * Waits if necessary for at most the given time for the computation - * to complete, and then retrieves its result, if available. - * - * @param timeout the maximum time to wait - * @param unit the time unit of the timeout argument - * @return the computed result - * @throws CancellationException if the computation was cancelled - * @throws ExecutionException if the computation threw an - * exception - * @throws InterruptedException if the current thread was interrupted - * while waiting - * @throws TimeoutException if the wait timed out - */ - @Override - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - if (canceled.get()) { - throw new CancellationException(); - } - - if (done.get()) { - return result; - } - - if (!countDownLatch.await(timeout, unit)) { - throw new TimeoutException(); - } - - return result; - } - - private void runCompleteListener(){ - for(Pair runPair: listeners){ - runPair.getValue().execute(runPair.getKey()); - } - } - - - public interface CancelCallable{ - boolean cancel(); - } - -} diff --git a/src/main/java/com/coreos/jetcd/watch/WatchCreateException.java b/src/main/java/com/coreos/jetcd/watch/WatchCreateException.java new file mode 100644 index 000000000..ccc3e342c --- /dev/null +++ b/src/main/java/com/coreos/jetcd/watch/WatchCreateException.java @@ -0,0 +1,16 @@ +package com.coreos.jetcd.watch; + +import com.coreos.jetcd.data.EtcdHeader; + +/** + * Exception thrown when create watcher failed. + */ +public class WatchCreateException extends Exception { + + public final EtcdHeader header; + + public WatchCreateException(String cause, EtcdHeader header) { + super(cause); + this.header = header; + } +} diff --git a/src/main/java/com/coreos/jetcd/watch/WatchEvent.java b/src/main/java/com/coreos/jetcd/watch/WatchEvent.java new file mode 100644 index 000000000..11d88cda6 --- /dev/null +++ b/src/main/java/com/coreos/jetcd/watch/WatchEvent.java @@ -0,0 +1,39 @@ +package com.coreos.jetcd.watch; + +import com.coreos.jetcd.data.KeyValue; + +/** + * Watch event, return by watch, contain put, delete event. + */ +public class WatchEvent { + + public enum EventType { + PUT, + DELETE, + UNRECOGNIZED, + } + + private final KeyValue keyValue; + + private final KeyValue prevKV; + + private final EventType eventType; + + public WatchEvent(KeyValue keyValue, KeyValue prevKV, EventType eventType) { + this.keyValue = keyValue; + this.prevKV = prevKV; + this.eventType = eventType; + } + + public KeyValue getKeyValue() { + return keyValue; + } + + public KeyValue getPrevKV() { + return prevKV; + } + + public EventType getEventType() { + return eventType; + } +} diff --git a/src/main/java/com/coreos/jetcd/watch/Watcher.java b/src/main/java/com/coreos/jetcd/watch/Watcher.java deleted file mode 100644 index 99fe7a4c2..000000000 --- a/src/main/java/com/coreos/jetcd/watch/Watcher.java +++ /dev/null @@ -1,169 +0,0 @@ -package com.coreos.jetcd.watch; - -import com.coreos.jetcd.api.Event; -import com.coreos.jetcd.api.WatchResponse; -import com.coreos.jetcd.options.WatchOption; -import com.google.protobuf.ByteString; - -import javax.annotation.concurrent.GuardedBy; -import java.util.List; - -/** - * Watcher class hold watcher information. - */ -public class Watcher { - - public static Builder newBuilder() { - return new Builder(); - } - - public static class Builder { - private WatchCallback callback; - private WatchOption watchOption; - private ByteString key; - private long watchID; - private boolean canceled = false; - - public Builder withCallBack(WatchCallback callBack) { - this.callback = callBack; - return this; - } - - public Builder withWatchOption(WatchOption watchOption) { - this.watchOption = watchOption; - return this; - } - - public Builder withKey(ByteString key) { - this.key = key; - return this; - } - - public Builder withWatchID(long watchID) { - this.watchID = watchID; - return this; - } - - public Builder withCanceled(boolean canceled) { - this.canceled = canceled; - return this; - } - - public Watcher build() { - return new Watcher(this.watchID, this.key, this.watchOption, canceled, this.callback); - } - - } - - - private final WatchOption watchOption; - private final ByteString key; - - @GuardedBy("this") - public final WatchCallback callback; - private final long watchID; - - @GuardedBy("this") - private long lastRevision = -1; - private boolean canceled = false; - - @GuardedBy("this") - private boolean resuming; - - private Watcher(long watchID, ByteString key, WatchOption watchOption, boolean canceled, WatchCallback callback) { - this.key = key; - this.watchOption = watchOption; - this.watchID = watchID; - this.callback = callback; - this.canceled = canceled; - this.resuming = watchOption.isResuming(); - } - - /** - * set the last revision watcher received, used for resume - * - * @param lastRevision the last revision - */ - public synchronized void setLastRevision(long lastRevision) { - this.lastRevision = lastRevision; - } - - public boolean isCanceled() { - return canceled; - } - - public synchronized void setCanceled(boolean canceled) { - this.canceled = canceled; - } - - /** - * get the watch id of the watcher - * - * @return - */ - public long getWatchID() { - return watchID; - } - - public WatchOption getWatchOption() { - return watchOption; - } - - /** - * get the last revision watcher received - * - * @return last revision - */ - public synchronized long getLastRevision() { - return lastRevision; - } - - /** - * get the watcher key - * - * @return watcher key - */ - public ByteString getKey() { - return key; - } - - /** - * whether the watcher is resuming. - */ - public synchronized boolean isResuming() { - return resuming; - } - - public synchronized void setResuming(boolean resuming) { - this.resuming = resuming; - } - - public interface WatchCallback { - - /** - * onWatch will be called when watcher receive any events - * - * @param events received events - */ - void onWatch(List events); - - /** - * onCreateFailed will be called when create watcher failed - * - * @param watchResponse watch response - */ - void onCreateFailed(WatchResponse watchResponse); - - /** - * onResuming will be called when the watcher is on resuming. - */ - void onResuming(); - - /** - * onCanceled will be called when the watcher is canceled successfully. - * - * @param response watch response for cancel - */ - void onCanceled(WatchResponse response); - } -} diff --git a/src/test/java/com/coreos/jetcd/EtcdWatchTest.java b/src/test/java/com/coreos/jetcd/EtcdWatchTest.java index 531c37bb7..2eabbd1cf 100644 --- a/src/test/java/com/coreos/jetcd/EtcdWatchTest.java +++ b/src/test/java/com/coreos/jetcd/EtcdWatchTest.java @@ -1,14 +1,11 @@ package com.coreos.jetcd; -import com.coreos.jetcd.api.Event; -import com.coreos.jetcd.api.WatchResponse; +import com.coreos.jetcd.data.ByteSequence; +import com.coreos.jetcd.data.EtcdHeader; import com.coreos.jetcd.exception.AuthFailedException; import com.coreos.jetcd.exception.ConnectException; import com.coreos.jetcd.options.WatchOption; -import com.coreos.jetcd.util.ListenableSetFuture; -import com.coreos.jetcd.watch.Watcher; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.protobuf.ByteString; +import com.coreos.jetcd.watch.WatchEvent; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; import org.testng.asserts.Assertion; @@ -24,12 +21,11 @@ public class EtcdWatchTest { private EtcdClient client; private EtcdWatch watchClient; private EtcdKV kvClient; - private BlockingQueue eventsQueue = new LinkedBlockingDeque<>(); + private BlockingQueue eventsQueue = new LinkedBlockingDeque<>(); - private ByteString key = ByteString.copyFromUtf8("test_key"); - private ByteString value = ByteString.copyFromUtf8("test_val"); - private Watcher watcher; - private ListenableSetFuture cancelResponse; + private ByteSequence key = ByteSequence.fromString("test_key"); + private ByteSequence value = ByteSequence.fromString("test_val"); + private EtcdWatchImpl.Watcher watcher; private Assertion test = new Assertion(); @@ -43,27 +39,27 @@ public void newEtcdClient() throws AuthFailedException, ConnectException { @Test public void testWatch() throws ExecutionException, InterruptedException { WatchOption option = WatchOption.DEFAULT; - cancelResponse = new ListenableSetFuture<>(null); - watcher = watchClient.watch(key, option, new Watcher.WatchCallback() { + watcher = watchClient.watch(key, option, new EtcdWatch.WatchCallback() { + + /** + * onWatch will be called when watcher receive any events + * + * @param header + * @param events received events + */ @Override - public void onWatch(List events) { + public void onWatch(EtcdHeader header, List events) { EtcdWatchTest.this.eventsQueue.addAll(events); } - @Override - public void onCreateFailed(WatchResponse watchResponse) { - - } - + /** + * onResuming will be called when the watcher is on resuming. + */ @Override public void onResuming() { } - @Override - public void onCanceled(WatchResponse response) { - cancelResponse.setResult(response); - } }).get(); } @@ -74,10 +70,10 @@ public void onCanceled(WatchResponse response) { */ @Test(dependsOnMethods = "testWatch") public void testWatchPut() throws InterruptedException { - kvClient.put(key, value); - Event event = eventsQueue.poll(5, TimeUnit.SECONDS); - test.assertEquals(event.getKv().getKey(), key); - test.assertEquals(event.getType(), Event.EventType.PUT); + kvClient.put(EtcdUtil.byteStringFromByteSequence(key), EtcdUtil.byteStringFromByteSequence(value)); + WatchEvent event = eventsQueue.poll(5, TimeUnit.SECONDS); + test.assertEquals(event.getKeyValue().getKey(), key); + test.assertEquals(event.getEventType(), WatchEvent.EventType.PUT); } /** @@ -86,10 +82,10 @@ public void testWatchPut() throws InterruptedException { */ @Test(dependsOnMethods = "testWatchPut") public void testWatchDelete() throws InterruptedException { - kvClient.delete(key); - Event event = eventsQueue.poll(5, TimeUnit.SECONDS); - test.assertEquals(event.getKv().getKey(), key); - test.assertEquals(event.getType(), Event.EventType.DELETE); + kvClient.delete(EtcdUtil.byteStringFromByteSequence(key)); + WatchEvent event = eventsQueue.poll(5, TimeUnit.SECONDS); + test.assertEquals(event.getKeyValue().getKey(), key); + test.assertEquals(event.getEventType(), WatchEvent.EventType.DELETE); } /** @@ -98,8 +94,7 @@ public void testWatchDelete() throws InterruptedException { */ @Test(dependsOnMethods = "testWatchDelete") public void testCancelWatch() throws ExecutionException, InterruptedException, TimeoutException { - watchClient.cancelWatch(watcher); - WatchResponse watchResponse = cancelResponse.get(5, TimeUnit.SECONDS); - test.assertTrue(watchResponse.getCanceled()); + CompletableFuture future = watcher.cancel(); + test.assertTrue(future.get(5, TimeUnit.SECONDS)); } } From 8d2baa5cc68efcab01f6dddebbaaa7cdf9468346 Mon Sep 17 00:00:00 2001 From: StupidHod Date: Mon, 5 Sep 2016 17:44:32 +0800 Subject: [PATCH 3/8] add final for watch stub --- src/main/java/com/coreos/jetcd/EtcdWatchImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java b/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java index f605f6d1d..40e5e27f7 100644 --- a/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java +++ b/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java @@ -26,7 +26,7 @@ public class EtcdWatchImpl implements EtcdWatch { private ConcurrentHashMap watchers = new ConcurrentHashMap<>(); - private WatchGrpc.WatchStub watchStub; + private final WatchGrpc.WatchStub watchStub; private ConcurrentLinkedQueue>> pendingCreateWatchers = new ConcurrentLinkedQueue<>(); private Map> pendingCancelFutures = new ConcurrentHashMap<>(); From 5e161766686d308d807955c1400fe69f22048714 Mon Sep 17 00:00:00 2001 From: StupidHod Date: Tue, 6 Sep 2016 10:46:14 +0800 Subject: [PATCH 4/8] add closeable for Watcher, change visible for EtcdUtil --- src/main/java/com/coreos/jetcd/EtcdUtil.java | 2 +- src/main/java/com/coreos/jetcd/EtcdWatch.java | 6 ++- .../java/com/coreos/jetcd/EtcdWatchImpl.java | 41 ++++++++++++++++--- .../com/coreos/jetcd/data/ByteSequence.java | 27 ++++-------- .../com/coreos/jetcd/options/WatchOption.java | 4 -- 5 files changed, 50 insertions(+), 30 deletions(-) diff --git a/src/main/java/com/coreos/jetcd/EtcdUtil.java b/src/main/java/com/coreos/jetcd/EtcdUtil.java index 5f08e5db9..101e7a865 100644 --- a/src/main/java/com/coreos/jetcd/EtcdUtil.java +++ b/src/main/java/com/coreos/jetcd/EtcdUtil.java @@ -14,7 +14,7 @@ /** * This util is to convert api class to client class. */ -public class EtcdUtil { +class EtcdUtil { private EtcdUtil() { } diff --git a/src/main/java/com/coreos/jetcd/EtcdWatch.java b/src/main/java/com/coreos/jetcd/EtcdWatch.java index 497b7fe04..3a71078d4 100644 --- a/src/main/java/com/coreos/jetcd/EtcdWatch.java +++ b/src/main/java/com/coreos/jetcd/EtcdWatch.java @@ -5,6 +5,7 @@ import com.coreos.jetcd.options.WatchOption; import com.coreos.jetcd.watch.WatchEvent; +import java.io.Closeable; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -26,10 +27,11 @@ public interface EtcdWatch { */ CompletableFuture watch(ByteSequence key, WatchOption watchOption, WatchCallback callback); - interface Watcher{ + interface Watcher extends Closeable { /** * get watcher id + * * @return id */ long getWatchID(); @@ -42,12 +44,14 @@ interface Watcher{ /** * get the watch option + * * @return watch option */ WatchOption getWatchOption(); /** * cancel the watcher + * * @return cancel result */ CompletableFuture cancel(); diff --git a/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java b/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java index 40e5e27f7..6efb021fa 100644 --- a/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java +++ b/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java @@ -8,11 +8,10 @@ import io.grpc.stub.StreamObserver; import javafx.util.Pair; +import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.*; import static com.coreos.jetcd.EtcdUtil.apiToClientEvents; import static com.coreos.jetcd.EtcdUtil.apiToClientHeader; @@ -75,7 +74,7 @@ protected CompletableFuture cancelWatch(long id) { WatchCancelRequest cancelRequest = WatchCancelRequest.newBuilder().setWatchId(id).build(); WatchRequest request = WatchRequest.newBuilder().setCancelRequest(cancelRequest).build(); - this.requestStream.onNext(request); + getRequestStream().onNext(request); return completableFuture; } @@ -154,7 +153,8 @@ private void processCreate(WatchResponse response) { if (response.getCreated()) { if (response.getCanceled() || response.getCompactRevision() != 0) { watcher.setCanceled(true); - requestPair.getValue().completeExceptionally(new WatchCreateException("the start revision has been compacted", apiToClientHeader(response.getHeader(), response.getCompactRevision())));; + requestPair.getValue().completeExceptionally(new WatchCreateException("the start revision has been compacted", apiToClientHeader(response.getHeader(), response.getCompactRevision()))); + ; } if (response.getWatchId() == -1 && watcher.callback != null) { @@ -379,6 +379,37 @@ private void setResuming(boolean resuming) { this.resuming = resuming; } + /** + * Closes this stream and releases any system resources associated + * with it. If the stream is already closed then invoking this + * method has no effect. + *

+ *

As noted in {@link AutoCloseable#close()}, cases where the + * close may fail require careful attention. It is strongly advised + * to relinquish the underlying resources and to internally + * mark the {@code Closeable} as closed, prior to throwing + * the {@code IOException}. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + if (!isCanceled()) { + try { + if (!cancel().get(5, TimeUnit.SECONDS)) { + + } + } catch (InterruptedException e) { + throw new IOException("Close was interrupted.", e); + } catch (ExecutionException e) { + throw new IOException("Exception during execute.", e); + } catch (TimeoutException e) { + throw new IOException("Close out of time.", e); + } finally { + setCanceled(true); + } + } + } } } diff --git a/src/main/java/com/coreos/jetcd/data/ByteSequence.java b/src/main/java/com/coreos/jetcd/data/ByteSequence.java index f4be329c6..ff6c8ea19 100644 --- a/src/main/java/com/coreos/jetcd/data/ByteSequence.java +++ b/src/main/java/com/coreos/jetcd/data/ByteSequence.java @@ -5,22 +5,19 @@ import java.io.UnsupportedEncodingException; import java.nio.CharBuffer; import java.nio.charset.Charset; -import java.util.Arrays; /** * Etcd binary bytes, easy to convert between byte[], String and ByteString. */ public class ByteSequence { - private final byte[] bytes; private final int hashVal; private final ByteString byteString; public ByteSequence(byte[] source) { - this.bytes = Arrays.copyOf(source, source.length); - hashVal = calcHashCore(); - byteString = toByteString(); + hashVal = calcHashCore(source); + byteString = toByteString(source); } protected ByteSequence(ByteString byteString) { @@ -45,17 +42,9 @@ public boolean equals(Object obj) { return true; } if (obj instanceof ByteSequence) { - ByteSequence target = (ByteSequence) obj; - if (target.bytes.length == this.bytes.length) { - for (int i = 0; i < this.bytes.length; ++i) { - if (bytes[i] != target.bytes[i]) { - return false; - } - } - return true; - } else { - return false; - } + ByteSequence other = (ByteSequence) obj; + if (other.hashCode() != hashCode()) return false; + return byteString.equals(other.byteString); } else { return false; } @@ -65,11 +54,11 @@ protected ByteString getByteString() { return this.byteString; } - private ByteString toByteString() { + private ByteString toByteString(byte[] bytes) { return ByteString.copyFrom(bytes); } - private int calcHashCore() { + private int calcHashCore(byte[] bytes) { int result = 0; for (int i = 0; i < bytes.length; ++i) { result = 31 * result + bytes[i]; @@ -95,7 +84,7 @@ public String toString(String charsetName) throws UnsupportedEncodingException { } public byte[] getBytes() { - return Arrays.copyOf(bytes, bytes.length); + return byteString.toByteArray(); } public static ByteSequence fromString(String string) { diff --git a/src/main/java/com/coreos/jetcd/options/WatchOption.java b/src/main/java/com/coreos/jetcd/options/WatchOption.java index ec842960c..184eba4c7 100644 --- a/src/main/java/com/coreos/jetcd/options/WatchOption.java +++ b/src/main/java/com/coreos/jetcd/options/WatchOption.java @@ -1,12 +1,8 @@ package com.coreos.jetcd.options; -import com.coreos.jetcd.api.WatchCreateRequest; import com.coreos.jetcd.data.ByteSequence; -import com.google.protobuf.ByteString; -import java.util.List; import java.util.Optional; -import java.util.Set; /** * The option for watch operation. From 20c19b53aa3058ab805f60c00863736080f7822a Mon Sep 17 00:00:00 2001 From: StupidHod Date: Fri, 2 Dec 2016 12:10:43 +0800 Subject: [PATCH 5/8] change EtcdWatchImpl constract interface --- src/main/java/com/coreos/jetcd/EtcdClient.java | 6 ++++++ src/main/java/com/coreos/jetcd/EtcdWatchImpl.java | 6 ++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/coreos/jetcd/EtcdClient.java b/src/main/java/com/coreos/jetcd/EtcdClient.java index 7cc315479..850017aa6 100644 --- a/src/main/java/com/coreos/jetcd/EtcdClient.java +++ b/src/main/java/com/coreos/jetcd/EtcdClient.java @@ -34,6 +34,7 @@ public class EtcdClient { private final Supplier maintenanceClient; private final Supplier clusterClient; private final Supplier leaseClient; + private final Supplier watchClient; public EtcdClient(EtcdClientBuilder builder) throws ConnectException, AuthFailedException { this(Optional.empty(), builder); @@ -62,6 +63,7 @@ private EtcdClient(Optional> channelBuilder, EtcdClient this.maintenanceClient = Suppliers.memoize(() -> new EtcdMaintenanceImpl(channel, token)); this.clusterClient = Suppliers.memoize(() -> new EtcdClusterImpl(channel, token)); this.leaseClient = Suppliers.memoize(() -> new EtcdLeaseImpl(channel, token)); + this.watchClient = Suppliers.memoize(() -> new EtcdWatchImpl(channel, token)); } // ************************ @@ -88,6 +90,10 @@ public EtcdLease getLeaseClient() { return this.leaseClient.get(); } + public EtcdWatch getWatchClient() { + return this.watchClient.get(); + } + public void close() { channel.shutdownNow(); } diff --git a/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java b/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java index 6efb021fa..7cb6886c7 100644 --- a/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java +++ b/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java @@ -5,12 +5,14 @@ import com.coreos.jetcd.options.WatchOption; import com.coreos.jetcd.watch.WatchCreateException; import com.google.protobuf.ByteString; +import io.grpc.ManagedChannel; import io.grpc.stub.StreamObserver; import javafx.util.Pair; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.*; import static com.coreos.jetcd.EtcdUtil.apiToClientEvents; @@ -30,8 +32,8 @@ public class EtcdWatchImpl implements EtcdWatch { private ConcurrentLinkedQueue>> pendingCreateWatchers = new ConcurrentLinkedQueue<>(); private Map> pendingCancelFutures = new ConcurrentHashMap<>(); - public EtcdWatchImpl(WatchGrpc.WatchStub watchStub) { - this.watchStub = watchStub; + public EtcdWatchImpl(ManagedChannel channel, Optional token) { + this.watchStub = EtcdClientUtil.configureStub(WatchGrpc.newStub(channel), token); } /** From b1538853b2cbf381704209d9c101517ff271a536 Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Wed, 26 Apr 2017 14:06:24 -0700 Subject: [PATCH 6/8] style: fix style warning on watch impl and allows VariableDeclarationUsageDistance to be 4. --- properties/checkstyle.xml | 4 +- src/main/java/com/coreos/jetcd/EtcdUtil.java | 110 +-- src/main/java/com/coreos/jetcd/EtcdWatch.java | 94 +-- .../java/com/coreos/jetcd/EtcdWatchImpl.java | 718 +++++++++--------- .../com/coreos/jetcd/data/ByteSequence.java | 149 ++-- .../com/coreos/jetcd/data/EtcdHeader.java | 68 +- .../java/com/coreos/jetcd/data/KeyValue.java | 79 +- .../com/coreos/jetcd/options/WatchOption.java | 310 ++++---- .../jetcd/watch/WatchCreateException.java | 10 +- .../com/coreos/jetcd/watch/WatchEvent.java | 44 +- .../java/com/coreos/jetcd/EtcdWatchTest.java | 169 +++-- 11 files changed, 890 insertions(+), 865 deletions(-) diff --git a/properties/checkstyle.xml b/properties/checkstyle.xml index 7b6358e79..35d83816d 100644 --- a/properties/checkstyle.xml +++ b/properties/checkstyle.xml @@ -171,7 +171,9 @@ - + + + diff --git a/src/main/java/com/coreos/jetcd/EtcdUtil.java b/src/main/java/com/coreos/jetcd/EtcdUtil.java index 101e7a865..e5318c68f 100644 --- a/src/main/java/com/coreos/jetcd/EtcdUtil.java +++ b/src/main/java/com/coreos/jetcd/EtcdUtil.java @@ -7,7 +7,6 @@ import com.coreos.jetcd.data.KeyValue; import com.coreos.jetcd.watch.WatchEvent; import com.google.protobuf.ByteString; - import java.util.ArrayList; import java.util.List; @@ -16,65 +15,68 @@ */ class EtcdUtil { - private EtcdUtil() { - } + private EtcdUtil() { + } - /** - * convert ByteSequence to ByteString - */ - protected static ByteString byteStringFromByteSequence(ByteSequence byteSequence) { - return ByteString.copyFrom(byteSequence.getBytes()); - } + /** + * convert ByteSequence to ByteString. + */ + protected static ByteString byteStringFromByteSequence(ByteSequence byteSequence) { + return ByteString.copyFrom(byteSequence.getBytes()); + } - /** - * convert ByteString to ByteSequence - * @return - */ - protected static ByteSequence byteSequceFromByteString(ByteString byteString) { - return ByteSequence.fromBytes(byteString.toByteArray()); - } + /** + * convert ByteString to ByteSequence. + */ + protected static ByteSequence byteSequceFromByteString(ByteString byteString) { + return ByteSequence.fromBytes(byteString.toByteArray()); + } - /** - * convert API KeyValue to etcd client KeyValue - */ - protected static KeyValue apiToClientKV(com.coreos.jetcd.api.KeyValue keyValue) { - return new KeyValue( - byteSequceFromByteString(keyValue.getKey()), - byteSequceFromByteString(keyValue.getValue()), - keyValue.getCreateRevision(), - keyValue.getModRevision(), - keyValue.getVersion(), - keyValue.getLease()); - } + /** + * convert API KeyValue to etcd client KeyValue. + */ + protected static KeyValue apiToClientKV(com.coreos.jetcd.api.KeyValue keyValue) { + return new KeyValue( + byteSequceFromByteString(keyValue.getKey()), + byteSequceFromByteString(keyValue.getValue()), + keyValue.getCreateRevision(), + keyValue.getModRevision(), + keyValue.getVersion(), + keyValue.getLease()); + } - /** - * convert API watch event to etcd client event - */ - protected static WatchEvent apiToClientEvent(Event event) { - WatchEvent.EventType eventType = WatchEvent.EventType.UNRECOGNIZED; - switch (event.getType()) { - case DELETE: - eventType = WatchEvent.EventType.DELETE; - break; - case PUT: - eventType = WatchEvent.EventType.PUT; - break; - } - return new WatchEvent(apiToClientKV(event.getKv()), apiToClientKV(event.getPrevKv()), eventType); + /** + * convert API watch event to etcd client event. + */ + protected static WatchEvent apiToClientEvent(Event event) { + WatchEvent.EventType eventType; + switch (event.getType()) { + case DELETE: + eventType = WatchEvent.EventType.DELETE; + break; + case PUT: + eventType = WatchEvent.EventType.PUT; + break; + default: + eventType = WatchEvent.EventType.UNRECOGNIZED; } + return new WatchEvent(apiToClientKV(event.getKv()), apiToClientKV(event.getPrevKv()), + eventType); + } - protected static List apiToClientEvents(List events) { - List watchEvents = new ArrayList<>(); - for (Event event : events) { - watchEvents.add(apiToClientEvent(event)); - } - return watchEvents; + protected static List apiToClientEvents(List events) { + List watchEvents = new ArrayList<>(); + for (Event event : events) { + watchEvents.add(apiToClientEvent(event)); } + return watchEvents; + } - /** - * convert API response header to self defined header - */ - protected static EtcdHeader apiToClientHeader(ResponseHeader header, long compactRevision) { - return new EtcdHeader(header.getClusterId(), header.getMemberId(), header.getRevision(), header.getRaftTerm(), compactRevision); - } + /** + * convert API response header to self defined header. + */ + protected static EtcdHeader apiToClientHeader(ResponseHeader header, long compactRevision) { + return new EtcdHeader(header.getClusterId(), header.getMemberId(), header.getRevision(), + header.getRaftTerm(), compactRevision); + } } diff --git a/src/main/java/com/coreos/jetcd/EtcdWatch.java b/src/main/java/com/coreos/jetcd/EtcdWatch.java index 3a71078d4..455a1df8e 100644 --- a/src/main/java/com/coreos/jetcd/EtcdWatch.java +++ b/src/main/java/com/coreos/jetcd/EtcdWatch.java @@ -4,71 +4,71 @@ import com.coreos.jetcd.data.EtcdHeader; import com.coreos.jetcd.options.WatchOption; import com.coreos.jetcd.watch.WatchEvent; - import java.io.Closeable; import java.util.List; import java.util.concurrent.CompletableFuture; /** - * Interface of watch client + * Interface of watch client. */ public interface EtcdWatch { + /** + * Watch watches on a key or prefix. The watched events will be called by onWatch. + * If the watch is slow or the required rev is compacted, the watch request + * might be canceled from the server-side and the onCreateFailed will be called. + * + * @param key the key subscribe + * @param watchOption key option + * @param callback call back + * @return ListenableFuture watcher + */ + CompletableFuture watch(ByteSequence key, WatchOption watchOption, + WatchCallback callback); + + interface Watcher extends Closeable { + /** - * Watch watches on a key or prefix. The watched events will be called by onWatch. - * If the watch is slow or the required rev is compacted, the watch request - * might be canceled from the server-side and the onCreateFailed will be called. + * get watcher id. * - * @param key the key subscribe - * @param watchOption key option - * @param callback call back - * @return ListenableFuture watcher + * @return id */ - CompletableFuture watch(ByteSequence key, WatchOption watchOption, WatchCallback callback); - - interface Watcher extends Closeable { + long getWatchID(); - /** - * get watcher id - * - * @return id - */ - long getWatchID(); + long getLastRevision(); - long getLastRevision(); + ByteSequence getKey(); - ByteSequence getKey(); + boolean isResuming(); - boolean isResuming(); - - /** - * get the watch option - * - * @return watch option - */ - WatchOption getWatchOption(); + /** + * get the watch option. + * + * @return watch option + */ + WatchOption getWatchOption(); - /** - * cancel the watcher - * - * @return cancel result - */ - CompletableFuture cancel(); - } + /** + * cancel the watcher. + * + * @return cancel result + */ + CompletableFuture cancel(); + } - interface WatchCallback { + interface WatchCallback { - /** - * onWatch will be called when watcher receive any events - * - * @param events received events - */ - void onWatch(EtcdHeader header, List events); + /** + * onWatch will be called when watcher receive any events. + * + * @param events received events + */ + void onWatch(EtcdHeader header, List events); - /** - * onResuming will be called when the watcher is on resuming. - */ - void onResuming(); - } + /** + * onResuming will be called when the watcher is on resuming. + */ + void onResuming(); + } } diff --git a/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java b/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java index 7cb6886c7..fa9d1eeba 100644 --- a/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java +++ b/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java @@ -1,417 +1,429 @@ package com.coreos.jetcd; -import com.coreos.jetcd.api.*; +import static com.coreos.jetcd.EtcdUtil.apiToClientEvents; +import static com.coreos.jetcd.EtcdUtil.apiToClientHeader; + +import com.coreos.jetcd.api.Event; +import com.coreos.jetcd.api.WatchCancelRequest; +import com.coreos.jetcd.api.WatchCreateRequest; +import com.coreos.jetcd.api.WatchGrpc; +import com.coreos.jetcd.api.WatchRequest; +import com.coreos.jetcd.api.WatchResponse; import com.coreos.jetcd.data.ByteSequence; import com.coreos.jetcd.options.WatchOption; import com.coreos.jetcd.watch.WatchCreateException; import com.google.protobuf.ByteString; import io.grpc.ManagedChannel; import io.grpc.stub.StreamObserver; -import javafx.util.Pair; - import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.*; - -import static com.coreos.jetcd.EtcdUtil.apiToClientEvents; -import static com.coreos.jetcd.EtcdUtil.apiToClientHeader; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javafx.util.Pair; /** - * etcd watcher Implementation + * etcd watcher Implementation. */ public class EtcdWatchImpl implements EtcdWatch { - private volatile StreamObserver requestStream; + private volatile StreamObserver requestStream; + + private ConcurrentHashMap watchers = new ConcurrentHashMap<>(); + + private final WatchGrpc.WatchStub watchStub; + + private ConcurrentLinkedQueue>> + pendingCreateWatchers = new ConcurrentLinkedQueue<>(); + private Map> pendingCancelFutures = new ConcurrentHashMap<>(); + + public EtcdWatchImpl(ManagedChannel channel, Optional token) { + this.watchStub = EtcdClientUtil.configureStub(WatchGrpc.newStub(channel), token); + } + + /** + * Watch watches on a key or prefix. The watched events will be called by onWatch. + * If the watch is slow or the required rev is compacted, the watch request + * might be canceled from the server-side and the onCreateFailed will be called. + * + * @param key the key subscribe + * @param watchOption key option + * @param callback call back + * @return CompletableFuture watcher + */ + @Override + public CompletableFuture watch(ByteSequence key, WatchOption watchOption, + WatchCallback callback) { + WatchRequest request = optionToWatchCreateRequest(EtcdUtil.byteStringFromByteSequence(key), + watchOption); + WatcherImpl watcher = new WatcherImpl(key, watchOption, callback); + CompletableFuture waitFuture = new CompletableFuture(); + this.pendingCreateWatchers.add(new Pair<>(watcher, waitFuture)); + getRequestStream().onNext(request); + return waitFuture; + } + + /** + * Cancel the watch task with the watcher, the onCanceled will be called after successfully + * canceled. + * + * @param id the watcher to be canceled + */ + protected CompletableFuture cancelWatch(long id) { + WatcherImpl temp = watchers.get(id); + CompletableFuture completableFuture = null; + if (temp != null) { + synchronized (temp) { + if (this.watchers.containsKey(temp.getWatchID())) { + this.watchers.remove(temp.getWatchID()); + completableFuture = new CompletableFuture<>(); + this.pendingCancelFutures.put(id, completableFuture); + } + } + } - private ConcurrentHashMap watchers = new ConcurrentHashMap<>(); + WatchCancelRequest cancelRequest = WatchCancelRequest.newBuilder().setWatchId(id).build(); + WatchRequest request = WatchRequest.newBuilder().setCancelRequest(cancelRequest).build(); + getRequestStream().onNext(request); + return completableFuture; + } + + /** + * empty the old request stream, watchers and resume the old watchers empty the + * pendingCancelFutures as there is no need to cancel, the old request stream has been dead. + */ + private synchronized void resume() { + this.requestStream = null; + WatcherImpl[] resumeWatchers = (WatcherImpl[]) watchers.values().toArray(); + this.watchers.clear(); + for (CompletableFuture watcherCompletableFuture : pendingCancelFutures.values()) { + watcherCompletableFuture.complete(Boolean.TRUE); + } + this.pendingCancelFutures.clear(); + resumeWatchers(resumeWatchers); + } + + /** + * single instance method to get request stream, empty the old requestStream, so we will get a new + * requestStream automatically + * + *

the responseStream will distribute the create, cancel, normal + * response to processCreate, processCanceled and processEvents + * + *

if error happened, the + * requestStream will be closed by server side, so we call resume to resume all ongoing watchers. + */ + private StreamObserver getRequestStream() { + if (this.requestStream == null) { + synchronized (this) { + if (this.requestStream == null) { + StreamObserver watchResponseStreamObserver = + new StreamObserver() { + @Override + public void onNext(WatchResponse watchResponse) { + if (watchResponse.getCreated()) { + processCreate(watchResponse); + } else if (watchResponse.getCanceled()) { + processCanceled(watchResponse); + } else { + processEvents(watchResponse); + } + } - private final WatchGrpc.WatchStub watchStub; + @Override + public void onError(Throwable throwable) { + resume(); + } - private ConcurrentLinkedQueue>> pendingCreateWatchers = new ConcurrentLinkedQueue<>(); - private Map> pendingCancelFutures = new ConcurrentHashMap<>(); + @Override + public void onCompleted() { - public EtcdWatchImpl(ManagedChannel channel, Optional token) { - this.watchStub = EtcdClientUtil.configureStub(WatchGrpc.newStub(channel), token); + } + }; + this.requestStream = this.watchStub.watch(watchResponseStreamObserver); + } + } + } + return this.requestStream; + } + + /** + * Process create response from etcd server + * + *

If there is no pendingWatcher, ignore. + * + *

If cancel flag is true or CompactRevision not equal zero means the start revision + * has been compacted out of the store, call onCreateFailed. + * + *

If watchID = -1, complete future with WatchCreateException. + * + *

If everything is Ok, create watcher, complete CompletableFuture task and put the new watcher + * to the watchers map. + */ + private void processCreate(WatchResponse response) { + Pair> requestPair = pendingCreateWatchers.poll(); + WatcherImpl watcher = requestPair.getKey(); + if (response.getCreated()) { + if (response.getCanceled() || response.getCompactRevision() != 0) { + watcher.setCanceled(true); + requestPair.getValue().completeExceptionally( + new WatchCreateException("the start revision has been compacted", + apiToClientHeader(response.getHeader(), response.getCompactRevision()))); + ; + } + + if (response.getWatchId() == -1 && watcher.callback != null) { + requestPair.getValue().completeExceptionally( + new WatchCreateException("create watcher failed", + apiToClientHeader(response.getHeader(), response.getCompactRevision()))); + } else { + this.watchers.put(watcher.getWatchID(), watcher); + watcher.setWatchID(response.getWatchId()); + requestPair.getValue().complete(watcher); + } + + //note the header revision so that put following a current watcher disconnect will arrive + //on watcher channel after reconnect + synchronized (watcher) { + watcher.setLastRevision(response.getHeader().getRevision()); + if (watcher.isResuming()) { + watcher.setResuming(false); + } + } + } + } + + /** + * Process subscribe watch events + * + *

If the watch id is not in the watchers map, scan it in the pendingCancelFutures map + * if exist, ignore, otherwise cancel it. + * + *

If the watcher exist, call the onWatch and set the last revision for resume. + */ + private void processEvents(WatchResponse watchResponse) { + WatcherImpl watcher = watchers.get(watchResponse.getWatchId()); + if (watcher != null) { + synchronized (watcher) { + if (watchResponse.getEventsCount() != 0) { + List events = watchResponse.getEventsList(); + // if on resume process, filter processed events + if (watcher.isResuming()) { + long lastRevision = watcher.getLastRevision(); + events.removeIf((e) -> e.getKv().getModRevision() <= lastRevision); + } + watcher.setLastRevision( + watchResponse + .getEvents(watchResponse.getEventsCount() - 1) + .getKv().getModRevision()); + + if (watcher.callback != null) { + watcher.callback.onWatch( + apiToClientHeader(watchResponse.getHeader(), watchResponse.getCompactRevision()), + apiToClientEvents(events)); + } + } else { + watcher.setLastRevision(watchResponse.getHeader().getRevision()); + } + } + } else { + // if the watcher is not canceling, cancel it. + CompletableFuture completableFuture = this.pendingCancelFutures + .get(watchResponse.getWatchId()); + if (this.pendingCancelFutures.putIfAbsent(watcher.getWatchID(), completableFuture) == null) { + cancelWatch(watchResponse.getWatchId()); + } + } + } + + /** + * resume all the watchers. + */ + private void resumeWatchers(WatcherImpl[] watchers) { + for (WatcherImpl watcher : watchers) { + if (watcher.callback != null) { + watcher.callback.onResuming(); + } + watch(watcher.getKey(), getResumeWatchOptionWithWatcher(watcher), watcher.callback); + } + } + + /** + * Process cancel response from etcd server. + */ + private void processCanceled(WatchResponse response) { + CompletableFuture cancelFuture = this.pendingCancelFutures + .remove(response.getWatchId()); + cancelFuture.complete(Boolean.TRUE); + } + + /** + * convert WatcherOption to WatchRequest. + */ + private WatchRequest optionToWatchCreateRequest(ByteString key, WatchOption option) { + WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder() + .setKey(key) + .setPrevKv(option.isPrevKV()) + .setProgressNotify(option.isProgressNotify()) + .setStartRevision(option.getRevision()); + + if (option.getEndKey().isPresent()) { + builder.setRangeEnd(EtcdUtil.byteStringFromByteSequence(option.getEndKey().get())); } - /** - * Watch watches on a key or prefix. The watched events will be called by onWatch. - * If the watch is slow or the required rev is compacted, the watch request - * might be canceled from the server-side and the onCreateFailed will be called. - * - * @param key the key subscribe - * @param watchOption key option - * @param callback call back - * @return CompletableFuture watcher - */ - @Override - public CompletableFuture watch(ByteSequence key, WatchOption watchOption, WatchCallback callback) { - WatchRequest request = optionToWatchCreateRequest(EtcdUtil.byteStringFromByteSequence(key), watchOption); - WatcherImpl watcher = new WatcherImpl(key, watchOption, callback); - CompletableFuture waitFuture = new CompletableFuture(); - this.pendingCreateWatchers.add(new Pair<>(watcher, waitFuture)); - getRequestStream().onNext(request); - return waitFuture; + if (option.isNoDelete()) { + builder.addFilters(WatchCreateRequest.FilterType.NODELETE); } - /** - * Cancel the watch task with the watcher, the onCanceled will be called after successfully canceled. - * - * @param id the watcher to be canceled - */ - protected CompletableFuture cancelWatch(long id) { - WatcherImpl temp = watchers.get(id); - CompletableFuture completableFuture = null; - if (temp != null) { - synchronized (temp) { - if (this.watchers.containsKey(temp.getWatchID())) { - this.watchers.remove(temp.getWatchID()); - completableFuture = new CompletableFuture<>(); - this.pendingCancelFutures.put(id, completableFuture); - } - } - } + if (option.isNoPut()) { + builder.addFilters(WatchCreateRequest.FilterType.NOPUT); + } - WatchCancelRequest cancelRequest = WatchCancelRequest.newBuilder().setWatchId(id).build(); - WatchRequest request = WatchRequest.newBuilder().setCancelRequest(cancelRequest).build(); - getRequestStream().onNext(request); - return completableFuture; + return WatchRequest.newBuilder().setCreateRequest(builder).build(); + } + + /** + * build new WatchOption from dead to resume it in new requestStream. + */ + private WatchOption getResumeWatchOptionWithWatcher(Watcher watcher) { + WatchOption oldOption = watcher.getWatchOption(); + return WatchOption.newBuilder().withNoDelete(oldOption.isNoDelete()) + .withNoPut(oldOption.isNoPut()) + .withPrevKV(oldOption.isPrevKV()) + .withProgressNotify(oldOption.isProgressNotify()) + .withRange(oldOption.getEndKey().get()) + .withRevision(watcher.getLastRevision() + 1) + .withResuming(true) + .build(); + } + + + /** + * Watcher class hold watcher information. + */ + public class WatcherImpl implements Watcher { + + private final WatchOption watchOption; + private final ByteSequence key; + + public final WatchCallback callback; + private long watchID; + + private long lastRevision = -1; + private boolean canceled = false; + + private boolean resuming; + + private WatcherImpl(ByteSequence key, WatchOption watchOption, WatchCallback callback) { + this.key = key; + this.watchOption = watchOption; + this.callback = callback; + this.resuming = watchOption.isResuming(); } - /** - * empty the old request stream, watchers and resume the old watchers - * empty the pendingCancelFutures as there is no need to cancel, the old request stream has been dead - */ - private synchronized void resume() { - this.requestStream = null; - WatcherImpl[] resumeWatchers = (WatcherImpl[]) watchers.values().toArray(); - this.watchers.clear(); - for (CompletableFuture watcherCompletableFuture : pendingCancelFutures.values()) { - watcherCompletableFuture.complete(Boolean.TRUE); - } - this.pendingCancelFutures.clear(); - resumeWatchers(resumeWatchers); + @Override + public CompletableFuture cancel() { + return cancelWatch(watchID); } /** - * single instance method to get request stream, empty the old requestStream, so we will get a new - * requestStream automatically - *

the responseStream will distribute the create, cancel, normal response to - * processCreate, processCanceled and processEvents - *

if error happened, the requestStream will be closed by server side, so we call resume to resume - * all ongoing watchers + * set the last revision watcher received, used for resume. * - * @return + * @param lastRevision the last revision */ - private StreamObserver getRequestStream() { - if (this.requestStream == null) { - synchronized (this) { - if (this.requestStream == null) { - StreamObserver watchResponseStreamObserver = new StreamObserver() { - @Override - public void onNext(WatchResponse watchResponse) { - if (watchResponse.getCreated()) { - processCreate(watchResponse); - } else if (watchResponse.getCanceled()) { - processCanceled(watchResponse); - } else { - processEvents(watchResponse); - } - } - - @Override - public void onError(Throwable throwable) { - resume(); - } - - @Override - public void onCompleted() { - - } - }; - this.requestStream = this.watchStub.watch(watchResponseStreamObserver); - } - } - } - return this.requestStream; + private void setLastRevision(long lastRevision) { + this.lastRevision = lastRevision; } - /** - * Process create response from etcd server - *

If there is no pendingWatcher, ignore. - *

If cancel flag is true or CompactRevision not equal zero means the start revision - * has been compacted out of the store, call onCreateFailed. - *

If watchID = -1, complete future with WatchCreateException. - *

If everything is Ok, create watcher, complete CompletableFuture task and put the new watcher - * to the watchers map. - * - * @param response - */ - private void processCreate(WatchResponse response) { - Pair> requestPair = pendingCreateWatchers.poll(); - WatcherImpl watcher = requestPair.getKey(); - if (response.getCreated()) { - if (response.getCanceled() || response.getCompactRevision() != 0) { - watcher.setCanceled(true); - requestPair.getValue().completeExceptionally(new WatchCreateException("the start revision has been compacted", apiToClientHeader(response.getHeader(), response.getCompactRevision()))); - ; - } - - if (response.getWatchId() == -1 && watcher.callback != null) { - requestPair.getValue().completeExceptionally(new WatchCreateException("create watcher failed", apiToClientHeader(response.getHeader(), response.getCompactRevision()))); - } else { - this.watchers.put(watcher.getWatchID(), watcher); - watcher.setWatchID(response.getWatchId()); - requestPair.getValue().complete(watcher); - } - - //note the header revision so that put following a current watcher disconnect will arrive - //on watcher channel after reconnect - synchronized (watcher) { - watcher.setLastRevision(response.getHeader().getRevision()); - if (watcher.isResuming()) { - watcher.setResuming(false); - } - } - } + public boolean isCanceled() { + return canceled; } - /** - * Process subscribe watch events - *

If the watch id is not in the watchers map, scan it in the pendingCancelFutures map - * if exist, ignore, otherwise cancel it. - *

If the watcher exist, call the onWatch and set the last revision for resume - * - * @param watchResponse - */ - private void processEvents(WatchResponse watchResponse) { - WatcherImpl watcher = watchers.get(watchResponse.getWatchId()); - if (watcher != null) { - synchronized (watcher) { - if (watchResponse.getEventsCount() != 0) { - List events = watchResponse.getEventsList(); - // if on resume process, filter processed events - if (watcher.isResuming()) { - long lastRevision = watcher.getLastRevision(); - events.removeIf((e) -> e.getKv().getModRevision() <= lastRevision); - } - watcher.setLastRevision( - watchResponse - .getEvents(watchResponse.getEventsCount() - 1) - .getKv().getModRevision()); - - if (watcher.callback != null) { - watcher.callback.onWatch(apiToClientHeader(watchResponse.getHeader(), watchResponse.getCompactRevision()), apiToClientEvents(events)); - } - } else { - watcher.setLastRevision(watchResponse.getHeader().getRevision()); - } - } - } else { - // if the watcher is not canceling, cancel it. - CompletableFuture completableFuture = this.pendingCancelFutures.get(watchResponse.getWatchId()); - if (this.pendingCancelFutures.putIfAbsent(watcher.getWatchID(), completableFuture) == null) { - cancelWatch(watchResponse.getWatchId()); - } - } + private void setCanceled(boolean canceled) { + this.canceled = canceled; } /** - * resume all the watchers - * - * @param watchers + * get the watch id of the watcher. */ - private void resumeWatchers(WatcherImpl[] watchers) { - for (WatcherImpl watcher : watchers) { - if (watcher.callback != null) { - watcher.callback.onResuming(); - } - watch(watcher.getKey(), getResumeWatchOptionWithWatcher(watcher), watcher.callback); - } + public long getWatchID() { + return watchID; + } + + private void setWatchID(long watchID) { + this.watchID = watchID; + } + + public WatchOption getWatchOption() { + return watchOption; } /** - * Process cancel response from etcd server, + * get the last revision watcher received. * - * @param response + * @return last revision */ - private void processCanceled(WatchResponse response) { - CompletableFuture cancelFuture = this.pendingCancelFutures.remove(response.getWatchId()); - cancelFuture.complete(Boolean.TRUE); + public long getLastRevision() { + return lastRevision; } /** - * convert WatcherOption to WatchRequest + * get the watcher key. * - * @param key - * @param option - * @return + * @return watcher key */ - private WatchRequest optionToWatchCreateRequest(ByteString key, WatchOption option) { - WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder() - .setKey(key) - .setPrevKv(option.isPrevKV()) - .setProgressNotify(option.isProgressNotify()) - .setStartRevision(option.getRevision()); - - if (option.getEndKey().isPresent()) { - builder.setRangeEnd(EtcdUtil.byteStringFromByteSequence(option.getEndKey().get())); - } - - if (option.isNoDelete()) { - builder.addFilters(WatchCreateRequest.FilterType.NODELETE); - } - - if (option.isNoPut()) { - builder.addFilters(WatchCreateRequest.FilterType.NOPUT); - } - - return WatchRequest.newBuilder().setCreateRequest(builder).build(); + public ByteSequence getKey() { + return key; } /** - * build new WatchOption from dead to resume it in new requestStream - * - * @param watcher - * @return + * whether the watcher is resuming. */ - private WatchOption getResumeWatchOptionWithWatcher(Watcher watcher) { - WatchOption oldOption = watcher.getWatchOption(); - return WatchOption.newBuilder().withNoDelete(oldOption.isNoDelete()) - .withNoPut(oldOption.isNoPut()) - .withPrevKV(oldOption.isPrevKV()) - .withProgressNotify(oldOption.isProgressNotify()) - .withRange(oldOption.getEndKey().get()) - .withRevision(watcher.getLastRevision() + 1) - .withResuming(true) - .build(); + public boolean isResuming() { + return resuming; } + private void setResuming(boolean resuming) { + this.resuming = resuming; + } /** - * Watcher class hold watcher information. + * Closes this stream and releases any system resources associated + * with it. If the stream is already closed then invoking this + * method has no effect. + * + *

As noted in {@link AutoCloseable#close()}, cases where the + * close may fail require careful attention. It is strongly advised + * to relinquish the underlying resources and to internally + * mark the {@code Closeable} as closed, prior to throwing + * the {@code IOException}. + * + * @throws IOException if an I/O error occurs */ - public class WatcherImpl implements Watcher { - - private final WatchOption watchOption; - private final ByteSequence key; - - public final WatchCallback callback; - private long watchID; - - private long lastRevision = -1; - private boolean canceled = false; - - private boolean resuming; - - private WatcherImpl(ByteSequence key, WatchOption watchOption, WatchCallback callback) { - this.key = key; - this.watchOption = watchOption; - this.callback = callback; - this.resuming = watchOption.isResuming(); - } - - @Override - public CompletableFuture cancel() { - return cancelWatch(watchID); - } - - /** - * set the last revision watcher received, used for resume - * - * @param lastRevision the last revision - */ - private void setLastRevision(long lastRevision) { - this.lastRevision = lastRevision; - } - - public boolean isCanceled() { - return canceled; - } - - private void setCanceled(boolean canceled) { - this.canceled = canceled; - } - - /** - * get the watch id of the watcher - * - * @return - */ - public long getWatchID() { - return watchID; - } - - private void setWatchID(long watchID) { - this.watchID = watchID; - } - - public WatchOption getWatchOption() { - return watchOption; - } - - /** - * get the last revision watcher received - * - * @return last revision - */ - public long getLastRevision() { - return lastRevision; - } - - /** - * get the watcher key - * - * @return watcher key - */ - public ByteSequence getKey() { - return key; - } - - /** - * whether the watcher is resuming. - */ - public boolean isResuming() { - return resuming; - } - - private void setResuming(boolean resuming) { - this.resuming = resuming; - } - - /** - * Closes this stream and releases any system resources associated - * with it. If the stream is already closed then invoking this - * method has no effect. - *

- *

As noted in {@link AutoCloseable#close()}, cases where the - * close may fail require careful attention. It is strongly advised - * to relinquish the underlying resources and to internally - * mark the {@code Closeable} as closed, prior to throwing - * the {@code IOException}. - * - * @throws IOException if an I/O error occurs - */ - @Override - public void close() throws IOException { - if (!isCanceled()) { - try { - if (!cancel().get(5, TimeUnit.SECONDS)) { - - } - } catch (InterruptedException e) { - throw new IOException("Close was interrupted.", e); - } catch (ExecutionException e) { - throw new IOException("Exception during execute.", e); - } catch (TimeoutException e) { - throw new IOException("Close out of time.", e); - } finally { - setCanceled(true); - } - } + @Override + public void close() throws IOException { + if (!isCanceled()) { + try { + if (!cancel().get(5, TimeUnit.SECONDS)) { + // TODO: handle this case? + return; + } + } catch (InterruptedException e) { + throw new IOException("Close was interrupted.", e); + } catch (ExecutionException e) { + throw new IOException("Exception during execute.", e); + } catch (TimeoutException e) { + throw new IOException("Close out of time.", e); + } finally { + setCanceled(true); } + } } + } } diff --git a/src/main/java/com/coreos/jetcd/data/ByteSequence.java b/src/main/java/com/coreos/jetcd/data/ByteSequence.java index ff6c8ea19..107bf9ef8 100644 --- a/src/main/java/com/coreos/jetcd/data/ByteSequence.java +++ b/src/main/java/com/coreos/jetcd/data/ByteSequence.java @@ -1,7 +1,6 @@ package com.coreos.jetcd.data; import com.google.protobuf.ByteString; - import java.io.UnsupportedEncodingException; import java.nio.CharBuffer; import java.nio.charset.Charset; @@ -11,100 +10,102 @@ */ public class ByteSequence { - private final int hashVal; - private final ByteString byteString; + private final int hashVal; + private final ByteString byteString; - public ByteSequence(byte[] source) { - hashVal = calcHashCore(source); - byteString = toByteString(source); - } + public ByteSequence(byte[] source) { + hashVal = calcHashCore(source); + byteString = toByteString(source); + } - protected ByteSequence(ByteString byteString) { - this(byteString.toByteArray()); - } + protected ByteSequence(ByteString byteString) { + this(byteString.toByteArray()); + } - public ByteSequence(String string) { - this(string.getBytes()); - } + public ByteSequence(String string) { + this(string.getBytes()); + } - public ByteSequence(CharBuffer charBuffer) { - this(String.valueOf(charBuffer.array())); - } + public ByteSequence(CharBuffer charBuffer) { + this(String.valueOf(charBuffer.array())); + } - public ByteSequence(CharSequence charSequence) { - this(java.nio.CharBuffer.wrap(charSequence)); - } + public ByteSequence(CharSequence charSequence) { + this(java.nio.CharBuffer.wrap(charSequence)); + } - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - if (obj instanceof ByteSequence) { - ByteSequence other = (ByteSequence) obj; - if (other.hashCode() != hashCode()) return false; - return byteString.equals(other.byteString); - } else { - return false; - } + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; } - - protected ByteString getByteString() { - return this.byteString; + if (obj instanceof ByteSequence) { + ByteSequence other = (ByteSequence) obj; + if (other.hashCode() != hashCode()) { + return false; + } + return byteString.equals(other.byteString); + } else { + return false; } + } - private ByteString toByteString(byte[] bytes) { - return ByteString.copyFrom(bytes); - } + protected ByteString getByteString() { + return this.byteString; + } - private int calcHashCore(byte[] bytes) { - int result = 0; - for (int i = 0; i < bytes.length; ++i) { - result = 31 * result + bytes[i]; - } - return result; - } + private ByteString toByteString(byte[] bytes) { + return ByteString.copyFrom(bytes); + } - @Override - public int hashCode() { - return hashVal; + private int calcHashCore(byte[] bytes) { + int result = 0; + for (int i = 0; i < bytes.length; ++i) { + result = 31 * result + bytes[i]; } + return result; + } - public String toStringUtf8() { - return byteString.toStringUtf8(); - } + @Override + public int hashCode() { + return hashVal; + } - public String toString(Charset charset) { - return byteString.toString(charset); - } + public String toStringUtf8() { + return byteString.toStringUtf8(); + } - public String toString(String charsetName) throws UnsupportedEncodingException { - return byteString.toString(charsetName); - } + public String toString(Charset charset) { + return byteString.toString(charset); + } - public byte[] getBytes() { - return byteString.toByteArray(); - } + public String toString(String charsetName) throws UnsupportedEncodingException { + return byteString.toString(charsetName); + } - public static ByteSequence fromString(String string) { - return new ByteSequence(string); - } + public byte[] getBytes() { + return byteString.toByteArray(); + } - public static ByteSequence fromCharSequence(CharSequence charSequence) { - return new ByteSequence(charSequence); - } + public static ByteSequence fromString(String string) { + return new ByteSequence(string); + } - public static ByteSequence fromCharBuffer(CharBuffer charBuffer) { - return new ByteSequence(charBuffer); - } + public static ByteSequence fromCharSequence(CharSequence charSequence) { + return new ByteSequence(charSequence); + } - protected static ByteSequence fromByteString(ByteString byteString) { - return new ByteSequence(byteString); - } + public static ByteSequence fromCharBuffer(CharBuffer charBuffer) { + return new ByteSequence(charBuffer); + } - public static ByteSequence fromBytes(byte[] bytes) { - return new ByteSequence(bytes); - } + protected static ByteSequence fromByteString(ByteString byteString) { + return new ByteSequence(byteString); + } + + public static ByteSequence fromBytes(byte[] bytes) { + return new ByteSequence(bytes); + } } diff --git a/src/main/java/com/coreos/jetcd/data/EtcdHeader.java b/src/main/java/com/coreos/jetcd/data/EtcdHeader.java index 4eaeaa75c..5487252f1 100644 --- a/src/main/java/com/coreos/jetcd/data/EtcdHeader.java +++ b/src/main/java/com/coreos/jetcd/data/EtcdHeader.java @@ -4,37 +4,39 @@ * Etcd message header information. */ public class EtcdHeader { - private final long clusterId; - private final long memberId; - private final long revision; - private final long raftTerm; - private final long compactRevision; - - public EtcdHeader(long clusterId, long memberId, long revision, long raftTerm, long compactRevision) { - this.clusterId = clusterId; - this.memberId = memberId; - this.revision = revision; - this.raftTerm = raftTerm; - this.compactRevision = compactRevision; - } - - public long getClusterId() { - return clusterId; - } - - public long getMemberId() { - return memberId; - } - - public long getRevision() { - return revision; - } - - public long getRaftTerm() { - return raftTerm; - } - - public long getCompactRevision() { - return compactRevision; - } + + private final long clusterId; + private final long memberId; + private final long revision; + private final long raftTerm; + private final long compactRevision; + + public EtcdHeader(long clusterId, long memberId, long revision, long raftTerm, + long compactRevision) { + this.clusterId = clusterId; + this.memberId = memberId; + this.revision = revision; + this.raftTerm = raftTerm; + this.compactRevision = compactRevision; + } + + public long getClusterId() { + return clusterId; + } + + public long getMemberId() { + return memberId; + } + + public long getRevision() { + return revision; + } + + public long getRaftTerm() { + return raftTerm; + } + + public long getCompactRevision() { + return compactRevision; + } } diff --git a/src/main/java/com/coreos/jetcd/data/KeyValue.java b/src/main/java/com/coreos/jetcd/data/KeyValue.java index de4ded15d..17714246d 100644 --- a/src/main/java/com/coreos/jetcd/data/KeyValue.java +++ b/src/main/java/com/coreos/jetcd/data/KeyValue.java @@ -5,43 +5,44 @@ */ public class KeyValue { - private final ByteSequence key; - private final ByteSequence value; - private long createRevision = 0L; - private long modRevision = 0L; - private long version = 0L; - private long lease = 0L; - - public KeyValue(ByteSequence key, ByteSequence value, long createRevision, long modRevision, long version, long lease) { - this.key = key; - this.value = value; - this.createRevision = createRevision; - this.modRevision = modRevision; - this.version = version; - this.lease = lease; - } - - public ByteSequence getKey() { - return key; - } - - public ByteSequence getValue() { - return value; - } - - public long getCreateRevision() { - return createRevision; - } - - public long getModRevision() { - return modRevision; - } - - public long getVersion() { - return version; - } - - public long getLease() { - return lease; - } + private final ByteSequence key; + private final ByteSequence value; + private long createRevision = 0L; + private long modRevision = 0L; + private long version = 0L; + private long lease = 0L; + + public KeyValue(ByteSequence key, ByteSequence value, long createRevision, long modRevision, + long version, long lease) { + this.key = key; + this.value = value; + this.createRevision = createRevision; + this.modRevision = modRevision; + this.version = version; + this.lease = lease; + } + + public ByteSequence getKey() { + return key; + } + + public ByteSequence getValue() { + return value; + } + + public long getCreateRevision() { + return createRevision; + } + + public long getModRevision() { + return modRevision; + } + + public long getVersion() { + return version; + } + + public long getLease() { + return lease; + } } diff --git a/src/main/java/com/coreos/jetcd/options/WatchOption.java b/src/main/java/com/coreos/jetcd/options/WatchOption.java index 184eba4c7..ca3cdbc64 100644 --- a/src/main/java/com/coreos/jetcd/options/WatchOption.java +++ b/src/main/java/com/coreos/jetcd/options/WatchOption.java @@ -1,7 +1,6 @@ package com.coreos.jetcd.options; import com.coreos.jetcd.data.ByteSequence; - import java.util.Optional; /** @@ -9,186 +8,187 @@ */ public final class WatchOption { - public static final WatchOption DEFAULT = newBuilder().build(); + public static final WatchOption DEFAULT = newBuilder().build(); - /** - * Create a builder to construct option for watch operation. - * - * @return builder - */ - public static Builder newBuilder() { - return new Builder(); - } + /** + * Create a builder to construct option for watch operation. + * + * @return builder + */ + public static Builder newBuilder() { + return new Builder(); + } - public static class Builder { - - private long revision = 0L; - private Optional endKey = Optional.empty(); - private boolean prevKV = false; - private boolean progressNotify = false; - private boolean noPut = false; - private boolean noDelete = false; - private boolean resuming = false; - - private Builder() { - } - - /** - * Provide the revision to use for the get request. - *

If the revision is less or equal to zero, the get is over the newest key-value store. - *

If the revision has been compacted, ErrCompacted is returned as a response. - * - * @param revision the revision to get. - * @return builder - */ - public Builder withRevision(long revision) { - this.revision = revision; - return this; - } - - /** - * Set the end key of the get request. If it is set, the - * get request will return the keys from key to endKey (exclusive). - *

If end key is '\0', the range is all keys >= key. - *

If the end key is one bit larger than the given key, then it gets all keys with the prefix (the given key). - *

If both key and end key are '\0', it returns all keys. - * - * @param endKey end key - * @return builder - */ - public Builder withRange(ByteSequence endKey) { - this.endKey = Optional.ofNullable(endKey); - return this; - } - - /** - * When prevKV is set, created watcher gets the previous KV before the event happens, - * if the previous KV is not compacted. - * - * @return builder - */ - public Builder withPrevKV(boolean prevKV) { - this.prevKV = prevKV; - return this; - } - - /** - * When progressNotify is set, the watch server send periodic progress updates. - * Progress updates have zero events in WatchResponse - * - * @return builder - */ - public Builder withProgressNotify(boolean progressNotify) { - this.progressNotify = progressNotify; - return this; - } - - /** - * filter out put event in server side - * - * @param noPut - * @return - */ - public Builder withNoPut(boolean noPut) { - this.noPut = noPut; - return this; - } - - public Builder withResuming(boolean resuming){ - this.resuming = resuming; - return this; - } - /** - * filter out delete event in server side - * - * @param noDelete - * @return - */ - public Builder withNoDelete(boolean noDelete) { - this.noDelete = noDelete; - return this; - } - - public WatchOption build() { - return new WatchOption( - endKey, - revision, - prevKV, - progressNotify, - noPut, - noDelete, - resuming); - } + public static class Builder { - } + private long revision = 0L; + private Optional endKey = Optional.empty(); + private boolean prevKV = false; + private boolean progressNotify = false; + private boolean noPut = false; + private boolean noDelete = false; + private boolean resuming = false; - private final Optional endKey; - private final long revision; - private final boolean prevKV; - private final boolean progressNotify; - private final boolean noPut; - private final boolean noDelete; - private final boolean resuming; - - private WatchOption(Optional endKey, - long revision, - boolean prevKV, - boolean progressNotify, - boolean noPut, - boolean noDelete, - boolean resuming) { - this.endKey = endKey; - this.revision = revision; - this.prevKV = prevKV; - this.progressNotify = progressNotify; - this.noPut = noPut; - this.noDelete = noDelete; - this.resuming = resuming; + private Builder() { } - public Optional getEndKey() { - return this.endKey; + /** + * Provide the revision to use for the get request. + * + *

If the revision is less or equal to zero, the get is over the newest key-value store. + * + *

If the revision has been compacted, ErrCompacted is returned as a response. + * + * @param revision the revision to get. + * @return builder + */ + public Builder withRevision(long revision) { + this.revision = revision; + return this; } - public long getRevision() { - return revision; + /** + * Set the end key of the get request. If it is set, the get request will return the keys from + * key to endKey (exclusive). + * + *

If end key is '\0', the range is all keys >= key. + * + *

If the end key is one bit larger than the given key, then it gets all keys with the prefix + * (the given key). + * + *

If both key and end key are '\0', it returns all keys. + * + * @param endKey end key + * @return builder + */ + public Builder withRange(ByteSequence endKey) { + this.endKey = Optional.ofNullable(endKey); + return this; } /** - * Whether created watcher gets the previous KV before the event happens. + * When prevKV is set, created watcher gets the previous KV before the event happens, + * if the previous KV is not compacted. + * + * @return builder */ - public boolean isPrevKV() { - return prevKV; + public Builder withPrevKV(boolean prevKV) { + this.prevKV = prevKV; + return this; } /** - * Whether watcher server send periodic progress updates. + * When progressNotify is set, the watch server send periodic progress updates. + * Progress updates have zero events in WatchResponse. * - * @return if true, watcher server should send periodic progress updates. + * @return builder */ - public boolean isProgressNotify() { - return progressNotify; + public Builder withProgressNotify(boolean progressNotify) { + this.progressNotify = progressNotify; + return this; } /** - * Whether filter put event in server side - * - * @return if true, filter put event in server side + * filter out put event in server side. */ - public boolean isNoPut() { - return noPut; + public Builder withNoPut(boolean noPut) { + this.noPut = noPut; + return this; + } + + public Builder withResuming(boolean resuming) { + this.resuming = resuming; + return this; } /** - * Whether filter delete event in server side - * - * @return if true, filter delete event in server side + * filter out delete event in server side. */ - public boolean isNoDelete() { - return noDelete; + public Builder withNoDelete(boolean noDelete) { + this.noDelete = noDelete; + return this; } - public boolean isResuming(){ - return resuming; + public WatchOption build() { + return new WatchOption( + endKey, + revision, + prevKV, + progressNotify, + noPut, + noDelete, + resuming); } + + } + + private final Optional endKey; + private final long revision; + private final boolean prevKV; + private final boolean progressNotify; + private final boolean noPut; + private final boolean noDelete; + private final boolean resuming; + + private WatchOption(Optional endKey, + long revision, + boolean prevKV, + boolean progressNotify, + boolean noPut, + boolean noDelete, + boolean resuming) { + this.endKey = endKey; + this.revision = revision; + this.prevKV = prevKV; + this.progressNotify = progressNotify; + this.noPut = noPut; + this.noDelete = noDelete; + this.resuming = resuming; + } + + public Optional getEndKey() { + return this.endKey; + } + + public long getRevision() { + return revision; + } + + /** + * Whether created watcher gets the previous KV before the event happens. + */ + public boolean isPrevKV() { + return prevKV; + } + + /** + * Whether watcher server send periodic progress updates. + * + * @return if true, watcher server should send periodic progress updates. + */ + public boolean isProgressNotify() { + return progressNotify; + } + + /** + * Whether filter put event in server side. + * + * @return if true, filter put event in server side + */ + public boolean isNoPut() { + return noPut; + } + + /** + * Whether filter delete event in server side. + * + * @return if true, filter delete event in server side + */ + public boolean isNoDelete() { + return noDelete; + } + + public boolean isResuming() { + return resuming; + } } diff --git a/src/main/java/com/coreos/jetcd/watch/WatchCreateException.java b/src/main/java/com/coreos/jetcd/watch/WatchCreateException.java index ccc3e342c..92ea7ea18 100644 --- a/src/main/java/com/coreos/jetcd/watch/WatchCreateException.java +++ b/src/main/java/com/coreos/jetcd/watch/WatchCreateException.java @@ -7,10 +7,10 @@ */ public class WatchCreateException extends Exception { - public final EtcdHeader header; + public final EtcdHeader header; - public WatchCreateException(String cause, EtcdHeader header) { - super(cause); - this.header = header; - } + public WatchCreateException(String cause, EtcdHeader header) { + super(cause); + this.header = header; + } } diff --git a/src/main/java/com/coreos/jetcd/watch/WatchEvent.java b/src/main/java/com/coreos/jetcd/watch/WatchEvent.java index 11d88cda6..21ee09888 100644 --- a/src/main/java/com/coreos/jetcd/watch/WatchEvent.java +++ b/src/main/java/com/coreos/jetcd/watch/WatchEvent.java @@ -7,33 +7,33 @@ */ public class WatchEvent { - public enum EventType { - PUT, - DELETE, - UNRECOGNIZED, - } + public enum EventType { + PUT, + DELETE, + UNRECOGNIZED, + } - private final KeyValue keyValue; + private final KeyValue keyValue; - private final KeyValue prevKV; + private final KeyValue prevKV; - private final EventType eventType; + private final EventType eventType; - public WatchEvent(KeyValue keyValue, KeyValue prevKV, EventType eventType) { - this.keyValue = keyValue; - this.prevKV = prevKV; - this.eventType = eventType; - } + public WatchEvent(KeyValue keyValue, KeyValue prevKV, EventType eventType) { + this.keyValue = keyValue; + this.prevKV = prevKV; + this.eventType = eventType; + } - public KeyValue getKeyValue() { - return keyValue; - } + public KeyValue getKeyValue() { + return keyValue; + } - public KeyValue getPrevKV() { - return prevKV; - } + public KeyValue getPrevKV() { + return prevKV; + } - public EventType getEventType() { - return eventType; - } + public EventType getEventType() { + return eventType; + } } diff --git a/src/test/java/com/coreos/jetcd/EtcdWatchTest.java b/src/test/java/com/coreos/jetcd/EtcdWatchTest.java index 2eabbd1cf..a823254c4 100644 --- a/src/test/java/com/coreos/jetcd/EtcdWatchTest.java +++ b/src/test/java/com/coreos/jetcd/EtcdWatchTest.java @@ -6,95 +6,100 @@ import com.coreos.jetcd.exception.ConnectException; import com.coreos.jetcd.options.WatchOption; import com.coreos.jetcd.watch.WatchEvent; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; import org.testng.asserts.Assertion; -import java.util.List; -import java.util.concurrent.*; - /** * watch test case. */ public class EtcdWatchTest { - private EtcdClient client; - private EtcdWatch watchClient; - private EtcdKV kvClient; - private BlockingQueue eventsQueue = new LinkedBlockingDeque<>(); - - private ByteSequence key = ByteSequence.fromString("test_key"); - private ByteSequence value = ByteSequence.fromString("test_val"); - private EtcdWatchImpl.Watcher watcher; - - private Assertion test = new Assertion(); - - @BeforeTest - public void newEtcdClient() throws AuthFailedException, ConnectException { - client = EtcdClientBuilder.newBuilder().endpoints("localhost:2379").build(); - watchClient = client.getWatchClient(); - kvClient = client.getKVClient(); - } - - @Test - public void testWatch() throws ExecutionException, InterruptedException { - WatchOption option = WatchOption.DEFAULT; - watcher = watchClient.watch(key, option, new EtcdWatch.WatchCallback() { - - /** - * onWatch will be called when watcher receive any events - * - * @param header - * @param events received events - */ - @Override - public void onWatch(EtcdHeader header, List events) { - EtcdWatchTest.this.eventsQueue.addAll(events); - } - - /** - * onResuming will be called when the watcher is on resuming. - */ - @Override - public void onResuming() { - - } - - }).get(); - - } - - /** - * watch put operation on key - * assert whether receive put event - */ - @Test(dependsOnMethods = "testWatch") - public void testWatchPut() throws InterruptedException { - kvClient.put(EtcdUtil.byteStringFromByteSequence(key), EtcdUtil.byteStringFromByteSequence(value)); - WatchEvent event = eventsQueue.poll(5, TimeUnit.SECONDS); - test.assertEquals(event.getKeyValue().getKey(), key); - test.assertEquals(event.getEventType(), WatchEvent.EventType.PUT); - } - - /** - * watch delete operation on key - * assert whether receive delete event - */ - @Test(dependsOnMethods = "testWatchPut") - public void testWatchDelete() throws InterruptedException { - kvClient.delete(EtcdUtil.byteStringFromByteSequence(key)); - WatchEvent event = eventsQueue.poll(5, TimeUnit.SECONDS); - test.assertEquals(event.getKeyValue().getKey(), key); - test.assertEquals(event.getEventType(), WatchEvent.EventType.DELETE); - } - - /** - * cancel watch test case - * assert whether receive cancel response - */ - @Test(dependsOnMethods = "testWatchDelete") - public void testCancelWatch() throws ExecutionException, InterruptedException, TimeoutException { - CompletableFuture future = watcher.cancel(); - test.assertTrue(future.get(5, TimeUnit.SECONDS)); - } + private EtcdClient client; + private EtcdWatch watchClient; + private EtcdKV kvClient; + private BlockingQueue eventsQueue = new LinkedBlockingDeque<>(); + + private ByteSequence key = ByteSequence.fromString("test_key"); + private ByteSequence value = ByteSequence.fromString("test_val"); + private EtcdWatchImpl.Watcher watcher; + + private Assertion test = new Assertion(); + + @BeforeTest + public void newEtcdClient() throws AuthFailedException, ConnectException { + client = EtcdClientBuilder.newBuilder().endpoints("localhost:2379").build(); + watchClient = client.getWatchClient(); + kvClient = client.getKVClient(); + } + + @Test + public void testWatch() throws ExecutionException, InterruptedException { + WatchOption option = WatchOption.DEFAULT; + watcher = watchClient.watch(key, option, new EtcdWatch.WatchCallback() { + + /** + * onWatch will be called when watcher receive any events + * + * @param header + * @param events received events + */ + @Override + public void onWatch(EtcdHeader header, List events) { + EtcdWatchTest.this.eventsQueue.addAll(events); + } + + /** + * onResuming will be called when the watcher is on resuming. + */ + @Override + public void onResuming() { + + } + + }).get(); + + } + + /** + * watch put operation on key + * assert whether receive put event + */ + @Test(dependsOnMethods = "testWatch") + public void testWatchPut() throws InterruptedException { + kvClient + .put(EtcdUtil.byteStringFromByteSequence(key), EtcdUtil.byteStringFromByteSequence(value)); + WatchEvent event = eventsQueue.poll(5, TimeUnit.SECONDS); + test.assertEquals(event.getKeyValue().getKey(), key); + test.assertEquals(event.getEventType(), WatchEvent.EventType.PUT); + } + + /** + * watch delete operation on key + * assert whether receive delete event + */ + @Test(dependsOnMethods = "testWatchPut") + public void testWatchDelete() throws InterruptedException { + kvClient.delete(EtcdUtil.byteStringFromByteSequence(key)); + WatchEvent event = eventsQueue.poll(5, TimeUnit.SECONDS); + test.assertEquals(event.getKeyValue().getKey(), key); + test.assertEquals(event.getEventType(), WatchEvent.EventType.DELETE); + } + + /** + * cancel watch test case + * assert whether receive cancel response + */ + @Test(dependsOnMethods = "testWatchDelete") + public void testCancelWatch() throws ExecutionException, InterruptedException, TimeoutException { + CompletableFuture future = watcher.cancel(); + test.assertTrue(future.get(5, TimeUnit.SECONDS)); + } } From ca3c0ab02eefb6c6dfc619d91dbf28bbb3debc30 Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Wed, 26 Apr 2017 14:14:46 -0700 Subject: [PATCH 7/8] watch: add package local class Pair to replace javafx.util.Pair. --- .../java/com/coreos/jetcd/EtcdWatchImpl.java | 34 ++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java b/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java index fa9d1eeba..584226cf6 100644 --- a/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java +++ b/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java @@ -25,7 +25,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import javafx.util.Pair; /** * etcd watcher Implementation. @@ -426,4 +425,37 @@ public void close() throws IOException { } } + class Pair { + + final K key; + final V value; + + Pair(K k, V v) { + this.key = k; + this.value = v; + } + + K getKey() { + return key; + } + + V getValue() { + return value; + } + + @Override + public int hashCode() { + return key.hashCode() + value.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null || !(obj instanceof Pair)) { + return false; + } + Pair other = (Pair) obj; + return key.equals(other.key) && value.equals(other.value); + } + } + } From afa36591755e3a3a749ba7f34a0383be10dca065 Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Wed, 26 Apr 2017 14:18:29 -0700 Subject: [PATCH 8/8] watch: correctly cast resumeWatchers in resume() --- src/main/java/com/coreos/jetcd/EtcdWatchImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java b/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java index 584226cf6..b3a957dda 100644 --- a/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java +++ b/src/main/java/com/coreos/jetcd/EtcdWatchImpl.java @@ -98,7 +98,7 @@ protected CompletableFuture cancelWatch(long id) { */ private synchronized void resume() { this.requestStream = null; - WatcherImpl[] resumeWatchers = (WatcherImpl[]) watchers.values().toArray(); + WatcherImpl[] resumeWatchers = watchers.values().toArray(new WatcherImpl[watchers.size()]); this.watchers.clear(); for (CompletableFuture watcherCompletableFuture : pendingCancelFutures.values()) { watcherCompletableFuture.complete(Boolean.TRUE);