Skip to content

Commit

Permalink
Merge pull request #84 from fanminshi/StupidHod-watch
Browse files Browse the repository at this point in the history
watch: fix style warnings and minor nits for StupidHod's watch implementation.
  • Loading branch information
fanminshi authored Apr 26, 2017
2 parents fa328f5 + afa3659 commit 83c9b2c
Show file tree
Hide file tree
Showing 12 changed files with 1,181 additions and 1 deletion.
4 changes: 3 additions & 1 deletion properties/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,9 @@
<property name="allowedAbbreviationLength" value="10"/>
</module>
<module name="OverloadMethodsDeclarationOrder"/>
<module name="VariableDeclarationUsageDistance"/>
<module name="VariableDeclarationUsageDistance">
<property name="allowedDistance" value="4"/>
</module>
<module name="CustomImportOrder">
<property name="sortImportsInGroupAlphabetically" value="true"/>
<property name="separateLineBetweenGroups" value="true"/>
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/coreos/jetcd/EtcdClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class EtcdClient {
private final Supplier<EtcdMaintenance> maintenanceClient;
private final Supplier<EtcdCluster> clusterClient;
private final Supplier<EtcdLease> leaseClient;
private final Supplier<EtcdWatch> watchClient;

public EtcdClient(EtcdClientBuilder builder) throws ConnectException, AuthFailedException {
this(Optional.empty(), builder);
Expand Down Expand Up @@ -65,6 +66,7 @@ private EtcdClient(Optional<ManagedChannelBuilder<?>> channelBuilder,
this.maintenanceClient = Suppliers.memoize(() -> new EtcdMaintenanceImpl(channel, token));
this.clusterClient = Suppliers.memoize(() -> new EtcdClusterImpl(channel, token));
this.leaseClient = Suppliers.memoize(() -> new EtcdLeaseImpl(channel, token));
this.watchClient = Suppliers.memoize(() -> new EtcdWatchImpl(channel, token));
}

// ************************
Expand All @@ -91,6 +93,10 @@ public EtcdLease getLeaseClient() {
return this.leaseClient.get();
}

public EtcdWatch getWatchClient() {
return this.watchClient.get();
}

public void close() {
channel.shutdownNow();
}
Expand Down
82 changes: 82 additions & 0 deletions src/main/java/com/coreos/jetcd/EtcdUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.coreos.jetcd;

import com.coreos.jetcd.api.Event;
import com.coreos.jetcd.api.ResponseHeader;
import com.coreos.jetcd.data.ByteSequence;
import com.coreos.jetcd.data.EtcdHeader;
import com.coreos.jetcd.data.KeyValue;
import com.coreos.jetcd.watch.WatchEvent;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.List;

/**
* This util is to convert api class to client class.
*/
class EtcdUtil {

private EtcdUtil() {
}

/**
* convert ByteSequence to ByteString.
*/
protected static ByteString byteStringFromByteSequence(ByteSequence byteSequence) {
return ByteString.copyFrom(byteSequence.getBytes());
}

/**
* convert ByteString to ByteSequence.
*/
protected static ByteSequence byteSequceFromByteString(ByteString byteString) {
return ByteSequence.fromBytes(byteString.toByteArray());
}

/**
* convert API KeyValue to etcd client KeyValue.
*/
protected static KeyValue apiToClientKV(com.coreos.jetcd.api.KeyValue keyValue) {
return new KeyValue(
byteSequceFromByteString(keyValue.getKey()),
byteSequceFromByteString(keyValue.getValue()),
keyValue.getCreateRevision(),
keyValue.getModRevision(),
keyValue.getVersion(),
keyValue.getLease());
}

/**
* convert API watch event to etcd client event.
*/
protected static WatchEvent apiToClientEvent(Event event) {
WatchEvent.EventType eventType;
switch (event.getType()) {
case DELETE:
eventType = WatchEvent.EventType.DELETE;
break;
case PUT:
eventType = WatchEvent.EventType.PUT;
break;
default:
eventType = WatchEvent.EventType.UNRECOGNIZED;
}
return new WatchEvent(apiToClientKV(event.getKv()), apiToClientKV(event.getPrevKv()),
eventType);
}

protected static List<WatchEvent> apiToClientEvents(List<Event> events) {
List<WatchEvent> watchEvents = new ArrayList<>();
for (Event event : events) {
watchEvents.add(apiToClientEvent(event));
}
return watchEvents;
}

/**
* convert API response header to self defined header.
*/
protected static EtcdHeader apiToClientHeader(ResponseHeader header, long compactRevision) {
return new EtcdHeader(header.getClusterId(), header.getMemberId(), header.getRevision(),
header.getRaftTerm(), compactRevision);
}
}
74 changes: 74 additions & 0 deletions src/main/java/com/coreos/jetcd/EtcdWatch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.coreos.jetcd;

import com.coreos.jetcd.data.ByteSequence;
import com.coreos.jetcd.data.EtcdHeader;
import com.coreos.jetcd.options.WatchOption;
import com.coreos.jetcd.watch.WatchEvent;
import java.io.Closeable;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* Interface of watch client.
*/
public interface EtcdWatch {


/**
* Watch watches on a key or prefix. The watched events will be called by onWatch.
* If the watch is slow or the required rev is compacted, the watch request
* might be canceled from the server-side and the onCreateFailed will be called.
*
* @param key the key subscribe
* @param watchOption key option
* @param callback call back
* @return ListenableFuture watcher
*/
CompletableFuture<Watcher> watch(ByteSequence key, WatchOption watchOption,
WatchCallback callback);

interface Watcher extends Closeable {

/**
* get watcher id.
*
* @return id
*/
long getWatchID();

long getLastRevision();

ByteSequence getKey();

boolean isResuming();

/**
* get the watch option.
*
* @return watch option
*/
WatchOption getWatchOption();

/**
* cancel the watcher.
*
* @return cancel result
*/
CompletableFuture<Boolean> cancel();
}

interface WatchCallback {

/**
* onWatch will be called when watcher receive any events.
*
* @param events received events
*/
void onWatch(EtcdHeader header, List<WatchEvent> events);

/**
* onResuming will be called when the watcher is on resuming.
*/
void onResuming();
}
}
Loading

0 comments on commit 83c9b2c

Please sign in to comment.