Skip to content

Commit

Permalink
add closeable for Watcher, change visible for EtcdUtil
Browse files Browse the repository at this point in the history
  • Loading branch information
StupidHod committed Sep 6, 2016
1 parent 4e55cac commit ad661f4
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 23 deletions.
2 changes: 1 addition & 1 deletion src/main/java/com/coreos/jetcd/EtcdUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
/**
* This util is to convert api class to client class.
*/
public class EtcdUtil {
class EtcdUtil {

private EtcdUtil() {
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/coreos/jetcd/EtcdWatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.coreos.jetcd.options.WatchOption;
import com.coreos.jetcd.watch.WatchEvent;

import java.io.Closeable;
import java.util.List;
import java.util.concurrent.CompletableFuture;

Expand All @@ -26,7 +27,7 @@ public interface EtcdWatch {
*/
CompletableFuture<Watcher> watch(ByteSequence key, WatchOption watchOption, WatchCallback callback);

interface Watcher{
interface Watcher extends Closeable{

/**
* get watcher id
Expand Down
37 changes: 34 additions & 3 deletions src/main/java/com/coreos/jetcd/EtcdWatchImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
import io.grpc.stub.StreamObserver;
import javafx.util.Pair;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.*;

import static com.coreos.jetcd.EtcdUtil.apiToClientEvents;
import static com.coreos.jetcd.EtcdUtil.apiToClientHeader;
Expand Down Expand Up @@ -379,6 +379,37 @@ private void setResuming(boolean resuming) {
this.resuming = resuming;
}

/**
* Closes this stream and releases any system resources associated
* with it. If the stream is already closed then invoking this
* method has no effect.
* <p>
* <p> As noted in {@link AutoCloseable#close()}, cases where the
* close may fail require careful attention. It is strongly advised
* to relinquish the underlying resources and to internally
* <em>mark</em> the {@code Closeable} as closed, prior to throwing
* the {@code IOException}.
*
* @throws IOException if an I/O error occurs
*/
@Override
public void close() throws IOException {
if(!isCanceled()){
try {
if(!cancel().get(5, TimeUnit.SECONDS)){

}
}catch (InterruptedException e){
throw new IOException("Close was interrupted.", e);
} catch (ExecutionException e) {
throw new IOException("Exception during execute.", e);
} catch (TimeoutException e) {
throw new IOException("Close out of time.", e);
}finally {
setCanceled(true);
}
}
}
}

}
26 changes: 8 additions & 18 deletions src/main/java/com/coreos/jetcd/data/ByteSequence.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@
*/
public class ByteSequence {

private final byte[] bytes;
private final int hashVal;
private final ByteString byteString;


public ByteSequence(byte[] source) {
this.bytes = Arrays.copyOf(source, source.length);
hashVal = calcHashCore();
byteString = toByteString();
hashVal = calcHashCore(source);
byteString = toByteString(source);
}

protected ByteSequence(ByteString byteString) {
Expand All @@ -45,17 +43,9 @@ public boolean equals(Object obj) {
return true;
}
if (obj instanceof ByteSequence) {
ByteSequence target = (ByteSequence) obj;
if (target.bytes.length == this.bytes.length) {
for (int i = 0; i < this.bytes.length; ++i) {
if (bytes[i] != target.bytes[i]) {
return false;
}
}
return true;
} else {
return false;
}
ByteSequence other = (ByteSequence) obj;
if(other.hashCode() != hashCode())return false;
return byteString.equals(other.byteString);
} else {
return false;
}
Expand All @@ -65,11 +55,11 @@ protected ByteString getByteString() {
return this.byteString;
}

private ByteString toByteString() {
private ByteString toByteString(byte[] bytes) {
return ByteString.copyFrom(bytes);
}

private int calcHashCore() {
private int calcHashCore(byte[] bytes) {
int result = 0;
for (int i = 0; i < bytes.length; ++i) {
result = 31 * result + bytes[i];
Expand All @@ -95,7 +85,7 @@ public String toString(String charsetName) throws UnsupportedEncodingException {
}

public byte[] getBytes() {
return Arrays.copyOf(bytes, bytes.length);
return byteString.toByteArray();
}

public static ByteSequence fromString(String string) {
Expand Down

0 comments on commit ad661f4

Please sign in to comment.