-
Notifications
You must be signed in to change notification settings - Fork 315
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add etcd watch implementation #45
Conversation
Good job @StupidHod , will take a look asap. |
import com.coreos.jetcd.options.WatchOption; | ||
import com.coreos.jetcd.util.ListenableSetFuture; | ||
import com.coreos.jetcd.watch.Watcher; | ||
import com.google.protobuf.ByteString; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMHO, it's better to avoid exposing ByteString directly since Apache Felix's iPOJO abstraction can be broken. Let me explain the detail. protobuf compiler generates non-compatible source code(I mean it's source code-level compatibility, not wire-level compatibility). If a user of jetcd tries to use another protobuf compiler(e.g. v2.5.0 compiler) for her applications, it can happen for users to pass non-compatible(v2.5.0) ByteString bytecode. My suggestion is to create a wrapper class of ByteString for preventing users from touching jetcd's ByteString.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@oza thanks for your great suggestion. If we expose java String to users, and convert it to ByteString, wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One concern to use Java String directly is a semantic gap between Golang's string and Java's string. IIUC, etcd's key can contain illegal String for java String since Golang's string is just a wrapper of byte[]. As a result, it can happen to fail to decode as Java's string. It's not trivial to handle them correctly at jetcd layer. My suggestion is to create primitive APIs which accept/return byte[], and create utility API which convert byte[] in primitive APIs to Java String on the top of them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xiang90 could you confirm this? If so we have to wrap this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@oza is correct.
Hi @StupidHod, thanks for your great job! I'm interested in jetcd, so I'm commenting here. I checked some points about API-level comment and initialization sequence of RequestStream. |
@oza Great news! Would love to have you contribute more to jetcd! Thanks a lot for the help so far! |
@xiang90 Happy to hear that :-) Yes, I will contribute more to jetcd! |
@StupidHod @oza I would support we construct kind of readable/writable byte sequence, we can name it |
ByteSequence should also accept CharSequence |
@adohe @lburgazzoli Got your advices. |
No it is a lazy singleton |
|
||
private ConcurrentHashMap<Long, Watcher> watchers = new ConcurrentHashMap<>(); | ||
|
||
private WatchGrpc.WatchStub watchStub; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private final
could be better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, got it.
As the discussion, I have done some changes as below.
|
} | ||
|
||
this.kvClient = newKVClient(kvStub); | ||
this.authClient = newAuthClient(authStub); | ||
this.maintenanceClient = newMaintenanceClient(mainFStub, mainStub); | ||
this.clusterClient = newClusterClient(clusterStub); | ||
this.watchClient = newWatchClient(watchStub); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is really needed to create so many objects bu default ?
I would instead use Suppliers.memorize or any other way to have lazy object creation
@StupidHod @oza @lburgazzoli I think this is just what we want. All you guys are amazing 👍 For this PR I would hope we can focus on the thread safety, I would like to implement something quite clean but enough safe. |
Yes, for some points, we can implement them later. |
OK, checking thread safety. |
|
||
WatchCancelRequest cancelRequest = WatchCancelRequest.newBuilder().setWatchId(id).build(); | ||
WatchRequest request = WatchRequest.newBuilder().setCancelRequest(cancelRequest).build(); | ||
this.requestStream.onNext(request); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Direct usage of requestStream
can be null without lock. getRequestStream()
should be called instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks very much.
Curious if this will get merged soon? watch implementation and a release of jetcd being available would be great! |
@StupidHod @oza @lburgazzoli let's check what still remaining, we should get this merge asap. |
@adohe I think you are right, I have fixed the issues proposed, hope for your advices. |
@adohe Will this PR get merged soon? Last comment on this was almost a month ago. |
@abhin4v I will recheck this, and believe we can merge this before this weekend. |
@xiang90 Any show stopper for this PR? |
@blackgwe I will recheck this with @StupidHod, will get this merged asap. |
OK, got it. @adohe |
We now urge for the java watcher service, hope this PR get merged soon. |
@adohe Can you please take another look? |
import com.google.protobuf.ByteString; | ||
import io.grpc.ManagedChannel; | ||
import io.grpc.stub.StreamObserver; | ||
import javafx.util.Pair; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should not include javafx
as a dependency; nothing else in this project does. It also means this only works with Oracle JDKs (i.e. it won't work with OpenJDK). If need be, add a package-local class at the end of the file like this:
class Pair<K,V> {
final K k;
final V v;
Pair(K k, V v) { this.k = k; this.v = v; }
K getKey() { return k; }
V getValue() { return v; }
@Override
public int hashCode() {
return k.hashCode() + v.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj==null || !(obj instanceof Pair)) return false;
Pair<K,V> other = (Pair<K,V>)obj;
return k.equals(other.k) && v.equals(other.v);
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment applies for the https://github.com/enncloud/jetcd changes too. I use the Zulu JVM by the way, which is a maintained branch of OpenJDK by the fine folks at Azul.
/cc @enncloud.
*/ | ||
private synchronized void resume() { | ||
this.requestStream = null; | ||
WatcherImpl[] resumeWatchers = (WatcherImpl[]) watchers.values().toArray(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be
WatcherImpl[] resumeWatchers = (WatcherImpl[]) watchers.values().toArray(new WatcherImpl[watchers.size()]);
or you will get a class cast exception
@krestenkrab Thank you for your reviewing. I will fix them soon. |
Signed-off-by: Steven M. Pritko <[email protected]>
With the wath design, I have finished the watch implementation, hope for your review.
@adohe @xiang90