Skip to content

Commit

Permalink
add closeable for Watcher, change visible for EtcdUtil
Browse files Browse the repository at this point in the history
  • Loading branch information
StupidHod committed Dec 2, 2016
1 parent 8d2baa5 commit 5e16176
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 30 deletions.
2 changes: 1 addition & 1 deletion src/main/java/com/coreos/jetcd/EtcdUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
/**
* This util is to convert api class to client class.
*/
public class EtcdUtil {
class EtcdUtil {

private EtcdUtil() {
}
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/com/coreos/jetcd/EtcdWatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.coreos.jetcd.options.WatchOption;
import com.coreos.jetcd.watch.WatchEvent;

import java.io.Closeable;
import java.util.List;
import java.util.concurrent.CompletableFuture;

Expand All @@ -26,10 +27,11 @@ public interface EtcdWatch {
*/
CompletableFuture<Watcher> watch(ByteSequence key, WatchOption watchOption, WatchCallback callback);

interface Watcher{
interface Watcher extends Closeable {

/**
* get watcher id
*
* @return id
*/
long getWatchID();
Expand All @@ -42,12 +44,14 @@ interface Watcher{

/**
* get the watch option
*
* @return watch option
*/
WatchOption getWatchOption();

/**
* cancel the watcher
*
* @return cancel result
*/
CompletableFuture<Boolean> cancel();
Expand Down
41 changes: 36 additions & 5 deletions src/main/java/com/coreos/jetcd/EtcdWatchImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
import io.grpc.stub.StreamObserver;
import javafx.util.Pair;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.*;

import static com.coreos.jetcd.EtcdUtil.apiToClientEvents;
import static com.coreos.jetcd.EtcdUtil.apiToClientHeader;
Expand Down Expand Up @@ -75,7 +74,7 @@ protected CompletableFuture<Boolean> cancelWatch(long id) {

WatchCancelRequest cancelRequest = WatchCancelRequest.newBuilder().setWatchId(id).build();
WatchRequest request = WatchRequest.newBuilder().setCancelRequest(cancelRequest).build();
this.requestStream.onNext(request);
getRequestStream().onNext(request);
return completableFuture;
}

Expand Down Expand Up @@ -154,7 +153,8 @@ private void processCreate(WatchResponse response) {
if (response.getCreated()) {
if (response.getCanceled() || response.getCompactRevision() != 0) {
watcher.setCanceled(true);
requestPair.getValue().completeExceptionally(new WatchCreateException("the start revision has been compacted", apiToClientHeader(response.getHeader(), response.getCompactRevision())));;
requestPair.getValue().completeExceptionally(new WatchCreateException("the start revision has been compacted", apiToClientHeader(response.getHeader(), response.getCompactRevision())));
;
}

if (response.getWatchId() == -1 && watcher.callback != null) {
Expand Down Expand Up @@ -379,6 +379,37 @@ private void setResuming(boolean resuming) {
this.resuming = resuming;
}

/**
* Closes this stream and releases any system resources associated
* with it. If the stream is already closed then invoking this
* method has no effect.
* <p>
* <p> As noted in {@link AutoCloseable#close()}, cases where the
* close may fail require careful attention. It is strongly advised
* to relinquish the underlying resources and to internally
* <em>mark</em> the {@code Closeable} as closed, prior to throwing
* the {@code IOException}.
*
* @throws IOException if an I/O error occurs
*/
@Override
public void close() throws IOException {
if (!isCanceled()) {
try {
if (!cancel().get(5, TimeUnit.SECONDS)) {

}
} catch (InterruptedException e) {
throw new IOException("Close was interrupted.", e);
} catch (ExecutionException e) {
throw new IOException("Exception during execute.", e);
} catch (TimeoutException e) {
throw new IOException("Close out of time.", e);
} finally {
setCanceled(true);
}
}
}
}

}
27 changes: 8 additions & 19 deletions src/main/java/com/coreos/jetcd/data/ByteSequence.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,19 @@
import java.io.UnsupportedEncodingException;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;

/**
* Etcd binary bytes, easy to convert between byte[], String and ByteString.
*/
public class ByteSequence {

private final byte[] bytes;
private final int hashVal;
private final ByteString byteString;


public ByteSequence(byte[] source) {
this.bytes = Arrays.copyOf(source, source.length);
hashVal = calcHashCore();
byteString = toByteString();
hashVal = calcHashCore(source);
byteString = toByteString(source);
}

protected ByteSequence(ByteString byteString) {
Expand All @@ -45,17 +42,9 @@ public boolean equals(Object obj) {
return true;
}
if (obj instanceof ByteSequence) {
ByteSequence target = (ByteSequence) obj;
if (target.bytes.length == this.bytes.length) {
for (int i = 0; i < this.bytes.length; ++i) {
if (bytes[i] != target.bytes[i]) {
return false;
}
}
return true;
} else {
return false;
}
ByteSequence other = (ByteSequence) obj;
if (other.hashCode() != hashCode()) return false;
return byteString.equals(other.byteString);
} else {
return false;
}
Expand All @@ -65,11 +54,11 @@ protected ByteString getByteString() {
return this.byteString;
}

private ByteString toByteString() {
private ByteString toByteString(byte[] bytes) {
return ByteString.copyFrom(bytes);
}

private int calcHashCore() {
private int calcHashCore(byte[] bytes) {
int result = 0;
for (int i = 0; i < bytes.length; ++i) {
result = 31 * result + bytes[i];
Expand All @@ -95,7 +84,7 @@ public String toString(String charsetName) throws UnsupportedEncodingException {
}

public byte[] getBytes() {
return Arrays.copyOf(bytes, bytes.length);
return byteString.toByteArray();
}

public static ByteSequence fromString(String string) {
Expand Down
4 changes: 0 additions & 4 deletions src/main/java/com/coreos/jetcd/options/WatchOption.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
package com.coreos.jetcd.options;

import com.coreos.jetcd.api.WatchCreateRequest;
import com.coreos.jetcd.data.ByteSequence;
import com.google.protobuf.ByteString;

import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
* The option for watch operation.
Expand Down

0 comments on commit 5e16176

Please sign in to comment.