diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java index 96ff8eb088e..7608415e669 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java @@ -902,7 +902,7 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException { event.setPath(clientPath); } - WatchedEvent we = new WatchedEvent(event); + WatchedEvent we = new WatchedEvent(event, replyHdr.getZxid()); LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId)); eventThread.queueEvent(we); return; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java b/zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java index 1de3d3ddf68..8acc3e83f15 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java @@ -18,6 +18,7 @@ package org.apache.zookeeper; +import java.util.Objects; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; @@ -31,27 +32,38 @@ */ @InterfaceAudience.Public public class WatchedEvent { + public static final long NO_ZXID = -1L; private final KeeperState keeperState; private final EventType eventType; - private String path; + private final String path; + private final long zxid; /** - * Create a WatchedEvent with specified type, state and path + * Create a WatchedEvent with specified type, state, path and zxid */ - public WatchedEvent(EventType eventType, KeeperState keeperState, String path) { + public WatchedEvent(EventType eventType, KeeperState keeperState, String path, long zxid) { this.keeperState = keeperState; this.eventType = eventType; this.path = path; + this.zxid = zxid; } /** - * Convert a WatcherEvent sent over the wire into a full-fledged WatcherEvent + * Create a WatchedEvent with specified type, state and path */ - public WatchedEvent(WatcherEvent eventMessage) { + public WatchedEvent(EventType eventType, KeeperState keeperState, String path) { + this(eventType, keeperState, path, NO_ZXID); + } + + /** + * Convert a WatcherEvent sent over the wire into a full-fledged WatchedEvent + */ + public WatchedEvent(WatcherEvent eventMessage, long zxid) { keeperState = KeeperState.fromInt(eventMessage.getState()); eventType = EventType.fromInt(eventMessage.getType()); path = eventMessage.getPath(); + this.zxid = zxid; } public KeeperState getState() { @@ -66,9 +78,33 @@ public String getPath() { return path; } + public long getZxid() { + return zxid; + } + @Override public String toString() { - return "WatchedEvent state:" + keeperState + " type:" + eventType + " path:" + path; + return "WatchedEvent state:" + keeperState + " type:" + eventType + " path:" + path + " zxid: " + zxid; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + WatchedEvent that = (WatchedEvent) o; + return zxid == that.zxid + && keeperState == that.keeperState + && eventType == that.eventType + && Objects.equals(path, that.path); + } + + @Override + public int hashCode() { + return Objects.hash(keeperState, eventType, path, zxid); } /** @@ -77,5 +113,4 @@ public String toString() { public WatcherEvent getWrapper() { return new WatcherEvent(eventType.getIntValue(), keeperState.getIntValue(), path); } - } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java index cd45a7c3437..bbc9b97dbaa 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java @@ -527,8 +527,8 @@ public void createNode(final String path, byte[] data, List acl, long ephem updateQuotaStat(lastPrefix, bytes, 1); } updateWriteStat(path, bytes); - dataWatches.triggerWatch(path, Event.EventType.NodeCreated); - childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged); + dataWatches.triggerWatch(path, Event.EventType.NodeCreated, zxid); + childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged, zxid); } /** @@ -624,9 +624,9 @@ public void deleteNode(String path, long zxid) throws KeeperException.NoNodeExce "childWatches.triggerWatch " + parentName); } - WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted); - childWatches.triggerWatch(path, EventType.NodeDeleted, processed); - childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged); + WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted, zxid); + childWatches.triggerWatch(path, EventType.NodeDeleted, zxid, processed); + childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged, zxid); } public Stat setData(String path, byte[] data, int version, long zxid, long time) throws KeeperException.NoNodeException { @@ -658,7 +658,7 @@ public Stat setData(String path, byte[] data, int version, long zxid, long time) nodeDataSize.addAndGet(getNodeSize(path, data) - getNodeSize(path, lastdata)); updateWriteStat(path, dataBytes); - dataWatches.triggerWatch(path, EventType.NodeDataChanged); + dataWatches.triggerWatch(path, EventType.NodeDataChanged, zxid); return s; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java index c7bf830b4ef..93bfc70e7cc 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java @@ -33,6 +33,7 @@ public class DumbWatcher extends ServerCnxn { private long sessionId; + private long watchedZxid = WatchedEvent.NO_ZXID; public DumbWatcher() { this(0); @@ -49,6 +50,11 @@ void setSessionTimeout(int sessionTimeout) { @Override public void process(WatchedEvent event) { + watchedZxid = event.getZxid(); + } + + public long getWatchedZxid() { + return watchedZxid; } @Override diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java index 5ffc81da1c3..ca18d3411c9 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java @@ -705,7 +705,7 @@ public int sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, St */ @Override public void process(WatchedEvent event) { - ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0); + ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage( LOG, diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java index f95200d560b..3ed77183434 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java @@ -162,7 +162,7 @@ public int getSessionTimeout() { @Override public void process(WatchedEvent event) { - ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0); + ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage( LOG, diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java index 1bc44c805a0..482bc7e87d4 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java @@ -82,10 +82,11 @@ default boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) * * @param path znode path * @param type the watch event type + * @param zxid the zxid for the corresponding change that triggered this event * * @return the watchers have been notified */ - WatcherOrBitSet triggerWatch(String path, EventType type); + WatcherOrBitSet triggerWatch(String path, EventType type, long zxid); /** * Distribute the watch event for the given path, but ignore those @@ -93,11 +94,12 @@ default boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) * * @param path znode path * @param type the watch event type + * @param zxid the zxid for the corresponding change that triggered this event * @param suppress the suppressed watcher set * * @return the watchers have been notified */ - WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress); + WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet suppress); /** * Get the size of watchers. diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java index c5b133059b2..2697808f6ca 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java @@ -115,13 +115,13 @@ public synchronized void removeWatcher(Watcher watcher) { } @Override - public WatcherOrBitSet triggerWatch(String path, EventType type) { - return triggerWatch(path, type, null); + public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid) { + return triggerWatch(path, type, zxid, null); } @Override - public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) { - WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); + public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet supress) { + WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid); Set watchers = new HashSet<>(); PathParentIterator pathParentIterator = getPathParentIterator(path); synchronized (this) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java index 1cc7deb9dc0..291fae1adab 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java @@ -202,13 +202,13 @@ public void processDeadWatchers(Set deadWatchers) { } @Override - public WatcherOrBitSet triggerWatch(String path, EventType type) { - return triggerWatch(path, type, null); + public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid) { + return triggerWatch(path, type, zxid, null); } @Override - public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress) { - WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); + public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet suppress) { + WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid); BitHashSet watchers = remove(path); if (watchers == null) { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java index bd613a44488..cef143911f4 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java @@ -28,6 +28,7 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; import org.apache.zookeeper.Watcher; @@ -131,7 +132,7 @@ public WatcherTriggerWorker( public void run() { while (!stopped) { String path = PATH_PREFIX + r.nextInt(paths); - WatcherOrBitSet s = manager.triggerWatch(path, EventType.NodeDeleted); + WatcherOrBitSet s = manager.triggerWatch(path, EventType.NodeDeleted, -1); if (s != null) { triggeredCount.addAndGet(s.size()); } @@ -416,6 +417,10 @@ private void checkMetrics(String metricName, long min, long max, double avg, lon assertEquals(sum, values.get("sum_" + metricName)); } + private void checkWatchedZxid(DumbWatcher watcher, long expectedZxid) { + assertEquals(expectedZxid, watcher.getWatchedZxid()); + } + @ParameterizedTest @MethodSource("data") public void testWatcherMetrics(String className) throws IOException { @@ -430,28 +435,36 @@ public void testWatcherMetrics(String className) throws IOException { final String path3 = "/path3"; - //both wather1 and wather2 are watching path1 + //both watcher1 and watcher2 are watching path1 manager.addWatch(path1, watcher1); manager.addWatch(path1, watcher2); //path2 is watched by watcher1 manager.addWatch(path2, watcher1); - manager.triggerWatch(path3, EventType.NodeCreated); + manager.triggerWatch(path3, EventType.NodeCreated, 1); //path3 is not being watched so metric is 0 checkMetrics("node_created_watch_count", 0L, 0L, 0D, 0L, 0L); + checkWatchedZxid(watcher1, 1); + checkWatchedZxid(watcher2, 1); //path1 is watched by two watchers so two fired - manager.triggerWatch(path1, EventType.NodeCreated); + manager.triggerWatch(path1, EventType.NodeCreated, 2); checkMetrics("node_created_watch_count", 2L, 2L, 2D, 1L, 2L); + checkWatchedZxid(watcher1, 2); + checkWatchedZxid(watcher2, 2); //path2 is watched by one watcher so one fired now total is 3 - manager.triggerWatch(path2, EventType.NodeCreated); + manager.triggerWatch(path2, EventType.NodeCreated, 3); checkMetrics("node_created_watch_count", 1L, 2L, 1.5D, 2L, 3L); + checkWatchedZxid(watcher1, 3); + checkWatchedZxid(watcher2, 2); //watches on path1 are no longer there so zero fired - manager.triggerWatch(path1, EventType.NodeDataChanged); + manager.triggerWatch(path1, EventType.NodeDataChanged, 4); checkMetrics("node_changed_watch_count", 0L, 0L, 0D, 0L, 0L); + checkWatchedZxid(watcher1, 3); + checkWatchedZxid(watcher2, 2); //both wather1 and wather2 are watching path1 manager.addWatch(path1, watcher1); @@ -460,11 +473,15 @@ public void testWatcherMetrics(String className) throws IOException { //path2 is watched by watcher1 manager.addWatch(path2, watcher1); - manager.triggerWatch(path1, EventType.NodeDataChanged); + manager.triggerWatch(path1, EventType.NodeDataChanged, 5); checkMetrics("node_changed_watch_count", 2L, 2L, 2D, 1L, 2L); + checkWatchedZxid(watcher1, 5); + checkWatchedZxid(watcher2, 5); - manager.triggerWatch(path2, EventType.NodeDeleted); + manager.triggerWatch(path2, EventType.NodeDeleted, 6); checkMetrics("node_deleted_watch_count", 1L, 1L, 1D, 1L, 1L); + checkWatchedZxid(watcher1, 6); + checkWatchedZxid(watcher2, 5); //make sure that node created watch count is not impacted by the fire of other event types checkMetrics("node_created_watch_count", 1L, 2L, 1.5D, 2L, 3L); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java index 80d8c400cd2..8e6f0a3c737 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java @@ -18,10 +18,6 @@ package org.apache.zookeeper.test; -import static org.apache.zookeeper.AddWatchMode.PERSISTENT_RECURSIVE; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; @@ -32,13 +28,21 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.zookeeper.AddWatchMode.PERSISTENT_RECURSIVE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + public class PersistentRecursiveWatcherTest extends ClientBase { private static final Logger LOG = LoggerFactory.getLogger(PersistentRecursiveWatcherTest.class); private BlockingQueue events; @@ -80,21 +84,28 @@ public void testBasicAsync() private void internalTestBasic(ZooKeeper zk) throws KeeperException, InterruptedException { zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.setData("/a/b/c/d/e", new byte[0], -1); + + Stat stat = new Stat(); + zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b", stat); + + zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b/c", stat); + + zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b/c/d", stat); + + zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b/c/d/e", stat); + + stat = zk.setData("/a/b/c/d/e", new byte[0], -1); + assertEvent(events, EventType.NodeDataChanged, "/a/b/c/d/e", stat); + zk.delete("/a/b/c/d/e", -1); - zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d/e"); - assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c/d/e"); - assertEvent(events, Watcher.Event.EventType.NodeDeleted, "/a/b/c/d/e"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d/e"); + assertEvent(events, EventType.NodeDeleted, "/a/b/c/d/e", zk.exists("/a/b/c/d", false).getPzxid()); + + zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b/c/d/e", stat); } @Test @@ -103,14 +114,15 @@ public void testRemoval() try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) { zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE); zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c"); + Stat stat = new Stat(); + zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b", stat); + zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b/c", stat); zk.removeWatches("/a/b", persistentWatcher, Watcher.WatcherType.Any, false); zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - assertEvent(events, Watcher.Event.EventType.PersistentWatchRemoved, "/a/b"); + assertEvent(events, EventType.PersistentWatchRemoved, "/a/b", WatchedEvent.NO_ZXID); } } @@ -119,9 +131,9 @@ public void testDisconnect() throws Exception { try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) { zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE); stopServer(); - assertEvent(events, Watcher.Event.EventType.None, null); + assertEvent(events, EventType.None, KeeperState.Disconnected, null, WatchedEvent.NO_ZXID); startServer(); - assertEvent(events, Watcher.Event.EventType.None, null); + assertEvent(events, EventType.None, KeeperState.SyncConnected, null, WatchedEvent.NO_ZXID); internalTestBasic(zk); } } @@ -136,17 +148,15 @@ public void testMultiClient() zk1.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk1.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE); - zk1.setData("/a/b/c", "one".getBytes(), -1); - Thread.sleep(1000); // give some time for the event to arrive - - zk2.setData("/a/b/c", "two".getBytes(), -1); - zk2.setData("/a/b/c", "three".getBytes(), -1); - zk2.setData("/a/b/c", "four".getBytes(), -1); - - assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c"); - assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c"); - assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c"); - assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c"); + Stat stat = zk1.setData("/a/b/c", "one".getBytes(), -1); + assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid()); + + stat = zk2.setData("/a/b/c", "two".getBytes(), -1); + assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid()); + stat = zk2.setData("/a/b/c", "three".getBytes(), -1); + assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid()); + stat = zk2.setData("/a/b/c", "four".getBytes(), -1); + assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid()); } } @@ -155,22 +165,42 @@ public void testRootWatcher() throws IOException, InterruptedException, KeeperException { try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) { zk.addWatch("/", persistentWatcher, PERSISTENT_RECURSIVE); - zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/b"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/b/c"); + Stat stat = new Stat(); + + zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a", stat.getMzxid()); + + zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b", stat.getMzxid()); + + zk.create("/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/b", stat.getMzxid()); + + zk.create("/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/b/c", stat.getMzxid()); } } - private void assertEvent(BlockingQueue events, Watcher.Event.EventType eventType, String path) - throws InterruptedException { - WatchedEvent event = events.poll(5, TimeUnit.SECONDS); - assertNotNull(event); - assertEquals(eventType, event.getType()); - assertEquals(path, event.getPath()); + private void assertEvent(BlockingQueue events, EventType eventType, String path, Stat stat) + throws InterruptedException { + assertEvent(events, eventType, path, stat.getMzxid()); + } + + private void assertEvent(BlockingQueue events, EventType eventType, String path, long zxid) + throws InterruptedException { + assertEvent(events, eventType, KeeperState.SyncConnected, path, zxid); + } + + private void assertEvent(BlockingQueue events, EventType eventType, KeeperState keeperState, + String path, long zxid) throws InterruptedException { + WatchedEvent actualEvent = events.poll(5, TimeUnit.SECONDS); + assertNotNull(actualEvent); + WatchedEvent expectedEvent = new WatchedEvent( + eventType, + keeperState, + path, + zxid + ); + assertEquals(expectedEvent, actualEvent); } } \ No newline at end of file diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java index a3d6eef7bdc..cce26276cdd 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java @@ -59,12 +59,12 @@ public void removeWatcher(Watcher watcher) { } @Override - public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type) { + public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type, long zxid) { return new WatcherOrBitSet(Collections.emptySet()); } @Override - public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type, WatcherOrBitSet suppress) { + public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type, long zxid, WatcherOrBitSet suppress) { return new WatcherOrBitSet(Collections.emptySet()); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java index f4a0298f233..a9bc11da2a6 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java @@ -61,7 +61,7 @@ public void testCreatingWatchedEventFromWrapper() { for (EventType et : allTypes) { for (KeeperState ks : allStates) { wep = new WatcherEvent(et.getIntValue(), ks.getIntValue(), "blah"); - we = new WatchedEvent(wep); + we = new WatchedEvent(wep, WatchedEvent.NO_ZXID); assertEquals(et, we.getType()); assertEquals(ks, we.getState()); assertEquals("blah", we.getPath()); @@ -75,7 +75,7 @@ public void testCreatingWatchedEventFromInvalidWrapper() { try { WatcherEvent wep = new WatcherEvent(-2342, -252352, "foo"); - new WatchedEvent(wep); + new WatchedEvent(wep, WatchedEvent.NO_ZXID); fail("Was able to create WatchedEvent from bad wrapper"); } catch (RuntimeException re) { // we're good diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java index 0987c4cf7e9..4d893ed6379 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java @@ -18,11 +18,6 @@ package org.apache.zookeeper.test; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -37,10 +32,17 @@ import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + public class WatcherFuncTest extends ClientBase { private static class SimpleWatcher implements Watcher { @@ -68,14 +70,14 @@ public void process(WatchedEvent event) { assertTrue(false, "interruption unexpected"); } } - public void verify(List expected) throws InterruptedException { + + public void verify(List expected) throws InterruptedException { + List actual = new ArrayList<>(); WatchedEvent event; - int count = 0; - while (count < expected.size() && (event = events.poll(30, TimeUnit.SECONDS)) != null) { - assertEquals(expected.get(count), event.getType()); - count++; + while (actual.size() < expected.size() && (event = events.poll(30, TimeUnit.SECONDS)) != null) { + actual.add(event); } - assertEquals(expected.size(), count); + assertEquals(expected, actual); events.clear(); } @@ -88,7 +90,7 @@ public void verify(List expected) throws InterruptedException { private volatile CountDownLatch lsnr_latch; private ZooKeeper lsnr; - private List expected; + private List expected; @BeforeEach @Override @@ -103,7 +105,7 @@ public void setUp() throws Exception { lsnr_dwatch = new SimpleWatcher(lsnr_latch); lsnr = createClient(lsnr_dwatch, lsnr_latch); - expected = new ArrayList(); + expected = new ArrayList<>(); } @AfterEach @@ -127,15 +129,34 @@ private void verify() throws InterruptedException { expected.clear(); } + private void addEvent(List events, EventType eventType, String path, Stat stat) { + addEvent(events, eventType, path, stat.getMzxid()); + } + + private void addEvent(List events, EventType eventType, String path, long zxid) { + events.add(new WatchedEvent(eventType, KeeperState.SyncConnected, path, zxid)); + } + + private long delete(String path) throws InterruptedException, KeeperException { + client.delete(path, -1); + int lastSlash = path.lastIndexOf('/'); + String parent = (lastSlash == 0) + ? "/" + : path.substring(0, lastSlash); + // the deletion's zxid will be reflected in the parent's Pzxid + return client.exists(parent, false).getPzxid(); + } + @Test public void testExistsSync() throws IOException, InterruptedException, KeeperException { assertNull(lsnr.exists("/foo", true)); assertNull(lsnr.exists("/foo/bar", true)); - client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - expected.add(EventType.NodeCreated); - client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - expected.add(EventType.NodeCreated); + Stat stat = new Stat(); + client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(expected, EventType.NodeCreated, "/foo", stat); + client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(expected, EventType.NodeCreated, "/foo/bar", stat); verify(); @@ -160,20 +181,20 @@ public void testExistsSync() throws IOException, InterruptedException, KeeperExc assertEquals("/foo/car", e.getPath()); } - client.setData("/foo", "parent".getBytes(), -1); - expected.add(EventType.NodeDataChanged); - client.setData("/foo/bar", "child".getBytes(), -1); - expected.add(EventType.NodeDataChanged); + stat = client.setData("/foo", "parent".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo", stat); + stat = client.setData("/foo/bar", "child".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo/bar", stat); verify(); assertNotNull(lsnr.exists("/foo", true)); assertNotNull(lsnr.exists("/foo/bar", true)); - client.delete("/foo/bar", -1); - expected.add(EventType.NodeDeleted); - client.delete("/foo", -1); - expected.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(expected, EventType.NodeDeleted, "/foo/bar", deleteZxid); + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); verify(); } @@ -200,20 +221,20 @@ public void testGetDataSync() throws IOException, InterruptedException, KeeperEx client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); assertNotNull(lsnr.getData("/foo/bar", true, null)); - client.setData("/foo", "parent".getBytes(), -1); - expected.add(EventType.NodeDataChanged); - client.setData("/foo/bar", "child".getBytes(), -1); - expected.add(EventType.NodeDataChanged); + Stat stat = client.setData("/foo", "parent".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo", stat); + stat = client.setData("/foo/bar", "child".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo/bar", stat); verify(); assertNotNull(lsnr.getData("/foo", true, null)); assertNotNull(lsnr.getData("/foo/bar", true, null)); - client.delete("/foo/bar", -1); - expected.add(EventType.NodeDeleted); - client.delete("/foo", -1); - expected.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(expected, EventType.NodeDeleted, "/foo/bar", deleteZxid); + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); verify(); } @@ -238,8 +259,9 @@ public void testGetChildrenSync() throws IOException, InterruptedException, Keep client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); assertNotNull(lsnr.getChildren("/foo", true)); - client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - expected.add(EventType.NodeChildrenChanged); // /foo + Stat stat = new Stat(); + client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(expected, EventType.NodeChildrenChanged, "/foo", stat); // /foo assertNotNull(lsnr.getChildren("/foo/bar", true)); client.setData("/foo", "parent".getBytes(), -1); @@ -250,11 +272,11 @@ public void testGetChildrenSync() throws IOException, InterruptedException, Keep assertNotNull(lsnr.getChildren("/foo", true)); assertNotNull(lsnr.getChildren("/foo/bar", true)); - client.delete("/foo/bar", -1); - expected.add(EventType.NodeDeleted); // /foo/bar childwatch - expected.add(EventType.NodeChildrenChanged); // /foo - client.delete("/foo", -1); - expected.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(expected, EventType.NodeDeleted, "/foo/bar", deleteZxid); // /foo/bar childwatch + addEvent(expected, EventType.NodeChildrenChanged, "/foo", deleteZxid); // /foo + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); verify(); } @@ -266,7 +288,7 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe SimpleWatcher w3 = new SimpleWatcher(null); SimpleWatcher w4 = new SimpleWatcher(null); - List e2 = new ArrayList(); + List e2 = new ArrayList(); assertNull(lsnr.exists("/foo", true)); assertNull(lsnr.exists("/foo", w1)); @@ -276,10 +298,11 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe assertNull(lsnr.exists("/foo/bar", w3)); assertNull(lsnr.exists("/foo/bar", w4)); - client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - expected.add(EventType.NodeCreated); - client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - e2.add(EventType.NodeCreated); + Stat stat = new Stat(); + client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(expected, EventType.NodeCreated, "/foo", stat); + client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(e2, EventType.NodeCreated, "/foo/bar", stat); lsnr_dwatch.verify(expected); w1.verify(expected); @@ -297,12 +320,12 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe assertNotNull(lsnr.exists("/foo/bar", w4)); assertNotNull(lsnr.exists("/foo/bar", w4)); - client.setData("/foo", "parent".getBytes(), -1); - expected.add(EventType.NodeDataChanged); - client.setData("/foo/bar", "child".getBytes(), -1); - e2.add(EventType.NodeDataChanged); + stat = client.setData("/foo", "parent".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo", stat); + stat = client.setData("/foo/bar", "child".getBytes(), -1); + addEvent(e2, EventType.NodeDataChanged, "/foo/bar", stat); - lsnr_dwatch.verify(new ArrayList()); // not reg so should = 0 + lsnr_dwatch.verify(new ArrayList<>()); // not reg so should = 0 w1.verify(expected); w2.verify(e2); w3.verify(e2); @@ -319,10 +342,10 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe assertNotNull(lsnr.exists("/foo/bar", w3)); assertNotNull(lsnr.exists("/foo/bar", w4)); - client.delete("/foo/bar", -1); - expected.add(EventType.NodeDeleted); - client.delete("/foo", -1); - e2.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(e2, EventType.NodeDeleted, "/foo/bar", deleteZxid); + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); lsnr_dwatch.verify(expected); w1.verify(expected); @@ -331,7 +354,6 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe w4.verify(e2); expected.clear(); e2.clear(); - } @Test @@ -341,7 +363,7 @@ public void testGetDataSyncWObj() throws IOException, InterruptedException, Keep SimpleWatcher w3 = new SimpleWatcher(null); SimpleWatcher w4 = new SimpleWatcher(null); - List e2 = new ArrayList(); + List e2 = new ArrayList(); try { lsnr.getData("/foo", w1, null); @@ -367,10 +389,10 @@ public void testGetDataSyncWObj() throws IOException, InterruptedException, Keep assertNotNull(lsnr.getData("/foo/bar", w4, null)); assertNotNull(lsnr.getData("/foo/bar", w4, null)); - client.setData("/foo", "parent".getBytes(), -1); - expected.add(EventType.NodeDataChanged); - client.setData("/foo/bar", "child".getBytes(), -1); - e2.add(EventType.NodeDataChanged); + Stat stat = client.setData("/foo", "parent".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo", stat); + stat = client.setData("/foo/bar", "child".getBytes(), -1); + addEvent(e2, EventType.NodeDataChanged, "/foo/bar", stat); lsnr_dwatch.verify(expected); w1.verify(expected); @@ -387,10 +409,10 @@ public void testGetDataSyncWObj() throws IOException, InterruptedException, Keep assertNotNull(lsnr.getData("/foo/bar", w3, null)); assertNotNull(lsnr.getData("/foo/bar", w4, null)); - client.delete("/foo/bar", -1); - expected.add(EventType.NodeDeleted); - client.delete("/foo", -1); - e2.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(e2, EventType.NodeDeleted, "/foo/bar", deleteZxid); + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); lsnr_dwatch.verify(expected); w1.verify(expected); @@ -408,7 +430,7 @@ public void testGetChildrenSyncWObj() throws IOException, InterruptedException, SimpleWatcher w3 = new SimpleWatcher(null); SimpleWatcher w4 = new SimpleWatcher(null); - List e2 = new ArrayList(); + List e2 = new ArrayList(); try { lsnr.getChildren("/foo", true); @@ -429,8 +451,9 @@ public void testGetChildrenSyncWObj() throws IOException, InterruptedException, assertNotNull(lsnr.getChildren("/foo", true)); assertNotNull(lsnr.getChildren("/foo", w1)); - client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - expected.add(EventType.NodeChildrenChanged); // /foo + Stat stat = new Stat(); + client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(expected, EventType.NodeChildrenChanged, "/foo", stat); // /foo assertNotNull(lsnr.getChildren("/foo/bar", w2)); assertNotNull(lsnr.getChildren("/foo/bar", w2)); assertNotNull(lsnr.getChildren("/foo/bar", w3)); @@ -451,11 +474,11 @@ public void testGetChildrenSyncWObj() throws IOException, InterruptedException, assertNotNull(lsnr.getChildren("/foo/bar", w4)); assertNotNull(lsnr.getChildren("/foo/bar", w4)); - client.delete("/foo/bar", -1); - e2.add(EventType.NodeDeleted); // /foo/bar childwatch - expected.add(EventType.NodeChildrenChanged); // /foo - client.delete("/foo", -1); - expected.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(e2, EventType.NodeDeleted, "/foo/bar", deleteZxid); + addEvent(expected, EventType.NodeChildrenChanged, "/foo", deleteZxid); // /foo + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); lsnr_dwatch.verify(expected); w1.verify(expected);