Skip to content

Commit

Permalink
add watch implementation and test
Browse files Browse the repository at this point in the history
  • Loading branch information
StupidHod committed Aug 22, 2016
1 parent db521c0 commit 91f6699
Show file tree
Hide file tree
Showing 7 changed files with 1,007 additions and 19 deletions.
46 changes: 27 additions & 19 deletions src/main/java/com/coreos/jetcd/EtcdClient.java
Original file line number Diff line number Diff line change
@@ -1,62 +1,61 @@
package com.coreos.jetcd;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import java.util.concurrent.ExecutionException;

import com.coreos.jetcd.api.AuthGrpc;
import com.coreos.jetcd.api.AuthenticateRequest;
import com.coreos.jetcd.api.AuthenticateResponse;
import com.coreos.jetcd.api.KVGrpc;
import com.coreos.jetcd.api.*;
import com.coreos.jetcd.exception.AuthFailedException;
import com.coreos.jetcd.exception.ConnectException;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;

import io.grpc.CallCredentials;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.AbstractStub;

import java.util.concurrent.ExecutionException;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

/**
* Etcd Client
*/
public class EtcdClient {

private static final String TOKEN = "token";
private static final String TOKEN = "token";

private final ManagedChannelBuilder<?> channelBuilder;
private final String[] endpoints;
private final ManagedChannel channel;
private final String[] endpoints;
private final ManagedChannel channel;

private final EtcdKV kvClient;
private final EtcdAuth authClient;
private final EtcdKV kvClient;
private final EtcdAuth authClient;
private final EtcdWatch watchClient;

private KVGrpc.KVFutureStub kvStub;
private AuthGrpc.AuthFutureStub authStub;
private KVGrpc.KVFutureStub kvStub;
private AuthGrpc.AuthFutureStub authStub;

public EtcdClient(ManagedChannelBuilder<?> channelBuilder, EtcdClientBuilder builder) throws ConnectException, AuthFailedException {
this.endpoints = new String[builder.endpoints().size()];
builder.endpoints().toArray(this.endpoints);
this.channelBuilder = channelBuilder != null ? channelBuilder : ManagedChannelBuilder.forAddress("localhost", 2379).usePlaintext(
true);
this.channelBuilder = channelBuilder != null ? channelBuilder : ManagedChannelBuilder.forAddress("localhost", 2379).usePlaintext(true);

this.channel = this.channelBuilder.build();

this.kvStub = KVGrpc.newFutureStub(this.channel);
this.authStub = AuthGrpc.newFutureStub(this.channel);
WatchGrpc.WatchStub watchStub = WatchGrpc.newStub(this.channel);

String token = getToken(builder);

if (token != null) {
this.authStub = setTokenForStub(authStub, token);
this.kvStub = setTokenForStub(kvStub, token);
watchStub = setTokenForStub(watchStub, token);
}

this.kvClient = newKVClient(kvStub);
this.authClient = newAuthClient(authStub);
this.watchClient = newWatchClient(watchStub);
}

/**
Expand All @@ -72,6 +71,10 @@ private EtcdAuth newAuthClient(AuthGrpc.AuthFutureStub stub) {
return new EtcdAuthImpl(stub);
}

private EtcdWatch newWatchClient(WatchGrpc.WatchStub stub){
return new EtcdWatchImpl(stub);
}

protected EtcdAuth getAuthClient() {
return authClient;
}
Expand All @@ -80,6 +83,10 @@ protected EtcdKV getKVClient() {
return kvClient;
}

protected EtcdWatch getWatchClient(){
return watchClient;
}

/**
* add token to channel's head
*
Expand Down Expand Up @@ -139,6 +146,7 @@ private String getToken(EtcdClientBuilder builder) throws ConnectException, Auth
return null;
}


public void close() {
channel.shutdownNow();
}
Expand Down
32 changes: 32 additions & 0 deletions src/main/java/com/coreos/jetcd/EtcdWatch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.coreos.jetcd;

import com.coreos.jetcd.options.WatchOption;
import com.coreos.jetcd.util.ListenableSetFuture;
import com.coreos.jetcd.watch.Watcher;
import com.google.protobuf.ByteString;

/**
* Interface of watch client
*/
public interface EtcdWatch {


/**
* Watch watches on a key or prefix. The watched events will be called by onWatch.
* If the watch is slow or the required rev is compacted, the watch request
* might be canceled from the server-side and the onCreateFailed will be called.
*
* @param key the key subscribe
* @param watchOption key option
* @param callback call back
* @return ListenableFuture watcher
*/
ListenableSetFuture<Watcher> watch(ByteString key, WatchOption watchOption, Watcher.WatchCallback callback);

/**
* Cancel the watch task with the watcher, the onCanceled will be called after successfully canceled.
*
* @param watcher the watcher to be canceled
*/
void cancelWatch(Watcher watcher);
}
Loading

0 comments on commit 91f6699

Please sign in to comment.