Skip to content

Commit

Permalink
Add watch support from PR etcd-io#45
Browse files Browse the repository at this point in the history
Signed-off-by: Steven M. Pritko <[email protected]>
  • Loading branch information
Steven M. Pritko committed Mar 5, 2017
1 parent 89fc358 commit ad7d2e1
Show file tree
Hide file tree
Showing 12 changed files with 1,136 additions and 0 deletions.
6 changes: 6 additions & 0 deletions src/main/java/com/coreos/jetcd/EtcdClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class EtcdClient {
private final Supplier<EtcdMaintenance> maintenanceClient;
private final Supplier<EtcdCluster> clusterClient;
private final Supplier<EtcdLease> leaseClient;
private final Supplier<EtcdWatch> watchClient;

public EtcdClient(EtcdClientBuilder builder) throws ConnectException, AuthFailedException {
this(Optional.empty(), builder);
Expand Down Expand Up @@ -65,6 +66,7 @@ private EtcdClient(Optional<ManagedChannelBuilder<?>> channelBuilder,
this.maintenanceClient = Suppliers.memoize(() -> new EtcdMaintenanceImpl(channel, token));
this.clusterClient = Suppliers.memoize(() -> new EtcdClusterImpl(channel, token));
this.leaseClient = Suppliers.memoize(() -> new EtcdLeaseImpl(channel, token));
this.watchClient = Suppliers.memoize(() -> new EtcdWatchImpl(channel, token));
}

// ************************
Expand All @@ -91,6 +93,10 @@ public EtcdLease getLeaseClient() {
return this.leaseClient.get();
}

public EtcdWatch getWatchClient() {
return this.watchClient.get();
}

public void close() {
channel.shutdownNow();
}
Expand Down
82 changes: 82 additions & 0 deletions src/main/java/com/coreos/jetcd/EtcdUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.coreos.jetcd;

import com.coreos.jetcd.api.Event;
import com.coreos.jetcd.api.ResponseHeader;
import com.coreos.jetcd.data.ByteSequence;
import com.coreos.jetcd.data.EtcdHeader;
import com.coreos.jetcd.data.KeyValue;
import com.coreos.jetcd.watch.WatchEvent;
import com.google.protobuf.ByteString;

import java.util.ArrayList;
import java.util.List;

/**
* This util is to convert api class to client class.
*/
class EtcdUtil {

private EtcdUtil() {
}

/**
* convert ByteSequence to ByteString
*/
protected static ByteString byteStringFromByteSequence(ByteSequence byteSequence) {
return ByteString.copyFrom(byteSequence.getBytes());
}

/**
* convert ByteString to ByteSequence
*
* @return
*/
protected static ByteSequence byteSequceFromByteString(ByteString byteString) {
return ByteSequence.fromBytes(byteString.toByteArray());
}

/**
* convert API KeyValue to etcd client KeyValue
*/
protected static KeyValue apiToClientKV(com.coreos.jetcd.api.KeyValue keyValue) {
return new KeyValue(
byteSequceFromByteString(keyValue.getKey()),
byteSequceFromByteString(keyValue.getValue()),
keyValue.getCreateRevision(),
keyValue.getModRevision(),
keyValue.getVersion(),
keyValue.getLease());
}

/**
* convert API watch event to etcd client event
*/
protected static WatchEvent apiToClientEvent(Event event) {
WatchEvent.EventType eventType = WatchEvent.EventType.UNRECOGNIZED;
switch (event.getType()) {
case DELETE:
eventType = WatchEvent.EventType.DELETE;
break;
case PUT:
eventType = WatchEvent.EventType.PUT;
break;
}
return new WatchEvent(apiToClientKV(event.getKv()), apiToClientKV(event.getPrevKv()), eventType);
}

protected static List<WatchEvent> apiToClientEvents(List<Event> events) {
List<WatchEvent> watchEvents = new ArrayList<>();
for (Event event : events) {
watchEvents.add(apiToClientEvent(event));
}
return watchEvents;
}

/**
* convert API response header to self defined header
*/
protected static EtcdHeader apiToClientHeader(ResponseHeader header, long compactRevision) {
return new EtcdHeader(header.getClusterId(), header.getMemberId(),
header.getRevision(), header.getRaftTerm(), compactRevision);
}
}
74 changes: 74 additions & 0 deletions src/main/java/com/coreos/jetcd/EtcdWatch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.coreos.jetcd;

import com.coreos.jetcd.data.ByteSequence;
import com.coreos.jetcd.data.EtcdHeader;
import com.coreos.jetcd.options.WatchOption;
import com.coreos.jetcd.watch.WatchEvent;

import java.io.Closeable;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* Interface of watch client
*/
public interface EtcdWatch {


/**
* Watch watches on a key or prefix. The watched events will be called by onWatch.
* If the watch is slow or the required rev is compacted, the watch request
* might be canceled from the server-side and the onCreateFailed will be called.
*
* @param key the key subscribe
* @param watchOption key option
* @param callback call back
* @return ListenableFuture watcher
*/
CompletableFuture<Watcher> watch(ByteSequence key, WatchOption watchOption, WatchCallback callback);

interface Watcher extends Closeable {

/**
* get watcher id
*
* @return id
*/
long getWatchID();

long getLastRevision();

ByteSequence getKey();

boolean isResuming();

/**
* get the watch option
*
* @return watch option
*/
WatchOption getWatchOption();

/**
* cancel the watcher
*
* @return cancel result
*/
CompletableFuture<Boolean> cancel();
}

interface WatchCallback {

/**
* onWatch will be called when watcher receive any events
*
* @param events received events
*/
void onWatch(EtcdHeader header, List<WatchEvent> events);

/**
* onResuming will be called when the watcher is on resuming.
*/
void onResuming();
}
}
Loading

0 comments on commit ad7d2e1

Please sign in to comment.