From 880f6064183c216d56d8c3e46a655c2b143f6d5e Mon Sep 17 00:00:00 2001 From: Paul Chesnais Date: Fri, 16 Jun 2023 03:05:58 -0700 Subject: [PATCH] ZOOKEEPER-4655: Communicate the Zxid that triggered a WatchEvent to fire (#1950) * Fix a race condition in WatcherCleanerTest.testDeadWatcherMetrics Because the metrics were updated _after_ the listener is invoked, the listener does not always see the fresh metric value. This fixes it so that the test waits for the value to become what we expect. * Leverage an existing method and refactor the rest of the code to match Since there was an existing waitFor method in ZKTestCase, along with an existing implementation of a waitForMetric LearnerMetricsTest, this commit moves waitForMetric to ZKTestCase and refactors the metric-related usages of waitFor. * Communicate the Zxid that triggered a WatchEvent to fire With the recent addition of persistent watches, many doors have opened up to significantly more performant and intuitive local caches of remote state, but the actual implementation can be difficult because to cache data locally, one needs to execute the following steps: 1. Set the watch 2. Bootstrap the watched subtree 3. Catch up on the events that fired during the bootstrap The issue is it's now very difficult to deduplicate and sanely resolve the remote state during step 3 because it's unknown whether an event arrived during the bootstrap or after. For example, imagine that between steps 1 and 2, a node /a was deleted then re-created. By the time step 3 is executed, there will be a NodeDeleted event queued up followed by a NodeCreated, causing at best a double read (one from the bootstrap, one from the NodeCreated) or at worst some data inconsistencies in the local cache. This change sets the Zxid in the response header whenever the watch event type is NodeCreated, NodeDeleted, NodeDataChanged or NodeChildrenChanged. --- .../zookeeper/server/watch/WatchBench.java | 6 +- .../java/org/apache/zookeeper/ClientCnxn.java | 2 +- .../org/apache/zookeeper/WatchedEvent.java | 39 ++++- .../java/org/apache/zookeeper/Watcher.java | 5 + .../org/apache/zookeeper/server/DataTree.java | 12 +- .../apache/zookeeper/server/DumbWatcher.java | 19 +++ .../zookeeper/server/NIOServerCnxn.java | 2 +- .../zookeeper/server/NettyServerCnxn.java | 2 +- .../zookeeper/server/watch/IWatchManager.java | 6 +- .../zookeeper/server/watch/WatchManager.java | 8 +- .../server/watch/WatchManagerOptimized.java | 8 +- .../java/org/apache/zookeeper/ZKTestCase.java | 42 ++++- .../server/RequestThrottlerTest.java | 16 +- .../FileTxnSnapLogMetricsTest.java | 3 +- .../server/quorum/LearnerMetricsTest.java | 15 -- .../server/watch/WatchManagerTest.java | 37 ++++- .../server/watch/WatcherCleanerTest.java | 32 ++-- .../test/PersistentRecursiveWatcherTest.java | 134 +++++++++------ .../org/apache/zookeeper/test/TestUtils.java | 14 ++ .../test/UnsupportedAddWatcherTest.java | 4 +- .../zookeeper/test/WatchedEventTest.java | 4 +- .../zookeeper/test/WatcherFuncTest.java | 155 ++++++++++-------- 22 files changed, 360 insertions(+), 205 deletions(-) diff --git a/zookeeper-it/src/main/java/org/apache/zookeeper/server/watch/WatchBench.java b/zookeeper-it/src/main/java/org/apache/zookeeper/server/watch/WatchBench.java index aee5b2f18ab..7de1be0370b 100644 --- a/zookeeper-it/src/main/java/org/apache/zookeeper/server/watch/WatchBench.java +++ b/zookeeper-it/src/main/java/org/apache/zookeeper/server/watch/WatchBench.java @@ -191,7 +191,7 @@ void prepare() { @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS) public void testTriggerConcentrateWatch(InvocationState state) throws Exception { for (String path : state.paths) { - state.watchManager.triggerWatch(path, event); + state.watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID); } } @@ -225,7 +225,7 @@ public void tearDown() { // clear all the watches for (String path : paths) { - watchManager.triggerWatch(path, event); + watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID); } } } @@ -294,7 +294,7 @@ public void prepare() { @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS) public void testTriggerSparseWatch(TriggerSparseWatchState state) throws Exception { for (String path : state.paths) { - state.watchManager.triggerWatch(path, event); + state.watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID); } } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java index c29a2141d58..727d97daa6a 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java @@ -902,7 +902,7 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException { event.setPath(clientPath); } - WatchedEvent we = new WatchedEvent(event); + WatchedEvent we = new WatchedEvent(event, replyHdr.getZxid()); LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId)); eventThread.queueEvent(we); return; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java b/zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java index 1de3d3ddf68..1303629c914 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java @@ -31,27 +31,38 @@ */ @InterfaceAudience.Public public class WatchedEvent { + public static final long NO_ZXID = -1L; private final KeeperState keeperState; private final EventType eventType; - private String path; + private final String path; + private final long zxid; /** - * Create a WatchedEvent with specified type, state and path + * Create a WatchedEvent with specified type, state, path and zxid */ - public WatchedEvent(EventType eventType, KeeperState keeperState, String path) { + public WatchedEvent(EventType eventType, KeeperState keeperState, String path, long zxid) { this.keeperState = keeperState; this.eventType = eventType; this.path = path; + this.zxid = zxid; } /** - * Convert a WatcherEvent sent over the wire into a full-fledged WatcherEvent + * Create a WatchedEvent with specified type, state and path */ - public WatchedEvent(WatcherEvent eventMessage) { + public WatchedEvent(EventType eventType, KeeperState keeperState, String path) { + this(eventType, keeperState, path, NO_ZXID); + } + + /** + * Convert a WatcherEvent sent over the wire into a full-fledged WatchedEvent + */ + public WatchedEvent(WatcherEvent eventMessage, long zxid) { keeperState = KeeperState.fromInt(eventMessage.getState()); eventType = EventType.fromInt(eventMessage.getType()); path = eventMessage.getPath(); + this.zxid = zxid; } public KeeperState getState() { @@ -66,9 +77,24 @@ public String getPath() { return path; } + /** + * Returns the zxid of the transaction that triggered this watch if it is + * of one of the following types: + * Otherwise, returns {@value #NO_ZXID}. Note that {@value #NO_ZXID} is also + * returned by old servers that do not support this feature. + */ + public long getZxid() { + return zxid; + } + @Override public String toString() { - return "WatchedEvent state:" + keeperState + " type:" + eventType + " path:" + path; + return "WatchedEvent state:" + keeperState + " type:" + eventType + " path:" + path + " zxid: " + zxid; } /** @@ -77,5 +103,4 @@ public String toString() { public WatcherEvent getWrapper() { return new WatcherEvent(eventType.getIntValue(), keeperState.getIntValue(), path); } - } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java b/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java index 6347fa4569e..01c3ae75a9f 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java @@ -26,6 +26,11 @@ * server it connects to. An application using such a client handles these * events by registering a callback object with the client. The callback object * is expected to be an instance of a class that implements Watcher interface. + * When {@link #process} is triggered by a watch firing, such as + * {@link Event.EventType#NodeDataChanged}, {@link WatchedEvent#getZxid()} will + * return the zxid of the transaction that caused said watch to fire. If + * {@value WatchedEvent#NO_ZXID} is returned then the server must be updated to + * support this feature. * */ @InterfaceAudience.Public diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java index 1f0e7a74974..be1c7538593 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java @@ -518,8 +518,8 @@ public void createNode(final String path, byte[] data, List acl, long ephem updateQuotaStat(lastPrefix, bytes, 1); } updateWriteStat(path, bytes); - dataWatches.triggerWatch(path, Event.EventType.NodeCreated); - childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged); + dataWatches.triggerWatch(path, Event.EventType.NodeCreated, zxid); + childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged, zxid); } /** @@ -615,9 +615,9 @@ public void deleteNode(String path, long zxid) throws NoNodeException { "childWatches.triggerWatch " + parentName); } - WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted); - childWatches.triggerWatch(path, EventType.NodeDeleted, processed); - childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged); + WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted, zxid); + childWatches.triggerWatch(path, EventType.NodeDeleted, zxid, processed); + childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged, zxid); } public Stat setData(String path, byte[] data, int version, long zxid, long time) throws NoNodeException { @@ -649,7 +649,7 @@ public Stat setData(String path, byte[] data, int version, long zxid, long time) nodeDataSize.addAndGet(getNodeSize(path, data) - getNodeSize(path, lastData)); updateWriteStat(path, dataBytes); - dataWatches.triggerWatch(path, EventType.NodeDataChanged); + dataWatches.triggerWatch(path, EventType.NodeDataChanged, zxid); return s; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java index c7bf830b4ef..231a063fe12 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java @@ -33,6 +33,9 @@ public class DumbWatcher extends ServerCnxn { private long sessionId; + private String mostRecentPath; + private Event.EventType mostRecentEventType; + private long mostRecentZxid = WatchedEvent.NO_ZXID; public DumbWatcher() { this(0); @@ -49,8 +52,24 @@ void setSessionTimeout(int sessionTimeout) { @Override public void process(WatchedEvent event) { + mostRecentEventType = event.getType(); + mostRecentZxid = event.getZxid(); + mostRecentPath = event.getPath(); } + public String getMostRecentPath() { + return mostRecentPath; + } + + public Event.EventType getMostRecentEventType() { + return mostRecentEventType; + } + + public long getMostRecentZxid() { + return mostRecentZxid; + } + + @Override int getSessionTimeout() { return 0; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java index 1a8575cd45e..61cbe71bad2 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java @@ -705,7 +705,7 @@ public int sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, St */ @Override public void process(WatchedEvent event) { - ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0); + ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage( LOG, diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java index f95200d560b..3ed77183434 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java @@ -162,7 +162,7 @@ public int getSessionTimeout() { @Override public void process(WatchedEvent event) { - ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0); + ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage( LOG, diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java index c3900eb64e9..0d537b4ec67 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java @@ -113,10 +113,11 @@ default boolean removeWatcher(String path, Watcher watcher, WatcherMode watcherM * * @param path znode path * @param type the watch event type + * @param zxid the zxid for the corresponding change that triggered this event * * @return the watchers have been notified */ - WatcherOrBitSet triggerWatch(String path, EventType type); + WatcherOrBitSet triggerWatch(String path, EventType type, long zxid); /** * Distribute the watch event for the given path, but ignore those @@ -124,11 +125,12 @@ default boolean removeWatcher(String path, Watcher watcher, WatcherMode watcherM * * @param path znode path * @param type the watch event type + * @param zxid the zxid for the corresponding change that triggered this event * @param suppress the suppressed watcher set * * @return the watchers have been notified */ - WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress); + WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet suppress); /** * Get the size of watchers. diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java index 6e9a3a52bb0..568de2a82d0 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java @@ -129,13 +129,13 @@ public synchronized void removeWatcher(Watcher watcher) { } @Override - public WatcherOrBitSet triggerWatch(String path, EventType type) { - return triggerWatch(path, type, null); + public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid) { + return triggerWatch(path, type, zxid, null); } @Override - public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) { - WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); + public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet supress) { + WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid); Set watchers = new HashSet<>(); synchronized (this) { PathParentIterator pathParentIterator = getPathParentIterator(path); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java index 7f72175efdf..0a6b4279fdb 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java @@ -202,13 +202,13 @@ public void processDeadWatchers(Set deadWatchers) { } @Override - public WatcherOrBitSet triggerWatch(String path, EventType type) { - return triggerWatch(path, type, null); + public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid) { + return triggerWatch(path, type, zxid, null); } @Override - public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress) { - WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); + public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet suppress) { + WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid); BitHashSet watchers = remove(path); if (watchers == null) { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java index 8d9430e1cf6..b29deedb0bc 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java @@ -22,7 +22,12 @@ import static org.junit.jupiter.api.Assertions.fail; import java.io.File; import java.time.Instant; +import org.apache.zookeeper.metrics.MetricsUtils; import org.apache.zookeeper.util.ServiceUtils; +import org.hamcrest.CustomMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.StringDescription; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -40,6 +45,7 @@ public class ZKTestCase { protected static final File testBaseDir = new File(System.getProperty("build.test.dir", "build")); private static final Logger LOG = LoggerFactory.getLogger(ZKTestCase.class); + public static final int DEFAULT_METRIC_TIMEOUT = 30; static { // Disable System.exit in tests. @@ -103,7 +109,7 @@ public interface WaitForCondition { * @param timeout timeout in seconds * @throws InterruptedException */ - public void waitFor(String msg, WaitForCondition condition, int timeout) throws InterruptedException { + public static void waitFor(String msg, WaitForCondition condition, int timeout) throws InterruptedException { final Instant deadline = Instant.now().plusSeconds(timeout); while (Instant.now().isBefore(deadline)) { if (condition.evaluate()) { @@ -114,4 +120,36 @@ public void waitFor(String msg, WaitForCondition condition, int timeout) throws fail(msg); } -} + public static void waitForMetric(String metricKey, Matcher matcher) throws InterruptedException { + waitForMetric(metricKey, matcher, DEFAULT_METRIC_TIMEOUT); + } + + public static void waitForMetric(String metricKey, Matcher matcher, int timeoutInSeconds) throws InterruptedException { + String errorMessage = String.format("metric \"%s\" failed to match after %d seconds", + metricKey, timeoutInSeconds); + waitFor(errorMessage, () -> { + @SuppressWarnings("unchecked") + T actual = (T) MetricsUtils.currentServerMetrics().get(metricKey); + if (!matcher.matches(actual)) { + Description description = new StringDescription(); + matcher.describeMismatch(actual, description); + LOG.info("match failed for metric {}: {}", metricKey, description); + return false; + } + return true; + }, timeoutInSeconds); + } + + /** + * Functionally identical to {@link org.hamcrest.Matchers#closeTo} except that it accepts all numerical types + * instead of failing if the value is not a {@link Double}. + */ + public static Matcher closeTo(double operand, double error) { + return new CustomMatcher(String.format("A number within %s of %s", error, operand)) { + @Override + public boolean matches(Object actual) { + return Math.abs(operand - ((Number) actual).doubleValue()) <= error; + } + }; + } +} \ No newline at end of file diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java index 15259207599..088f80c4857 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java @@ -19,6 +19,8 @@ package org.apache.zookeeper.server; import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.File; @@ -75,10 +77,6 @@ public class RequestThrottlerTest extends ZKTestCase { ZooKeeper zk = null; int connectionLossCount = 0; - private long getCounterMetric(String name) { - return (long) MetricsUtils.currentServerMetrics().get(name); - } - @BeforeEach public void setup() throws Exception { // start a server and create a client @@ -222,11 +220,8 @@ public void testRequestThrottler() throws Exception { submitted.await(5, TimeUnit.SECONDS); // but only two requests can get into the pipeline because of the throttler - WaitForCondition requestQueued = () -> getCounterMetric("prep_processor_request_queued") == 2; - waitFor("request not queued", requestQueued, 5); - - WaitForCondition throttleWait = () -> getCounterMetric("request_throttle_wait_count") >= 1; - waitFor("no throttle wait", throttleWait, 5); + waitForMetric("prep_processor_request_queued", is(2L)); + waitForMetric("request_throttle_wait_count", greaterThanOrEqualTo(1L)); // let the requests go through the pipeline and the throttler will be waken up to allow more requests // to enter the pipeline @@ -387,8 +382,7 @@ public void testGlobalOutstandingRequestThrottlingWithRequestThrottlerDisabled() // be GLOBAL_OUTSTANDING_LIMIT + 2. // // But due to leak of consistent view of number of outstanding requests, the number could be larger. - WaitForCondition requestQueued = () -> getCounterMetric("prep_processor_request_queued") >= Integer.parseInt(GLOBAL_OUTSTANDING_LIMIT) + 2; - waitFor("no enough requests queued", requestQueued, 5); + waitForMetric("prep_processor_request_queued", greaterThanOrEqualTo(Long.parseLong(GLOBAL_OUTSTANDING_LIMIT) + 2)); resumeProcess.countDown(); } catch (Exception e) { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogMetricsTest.java index 65648fefce4..1a569e4d47e 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogMetricsTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogMetricsTest.java @@ -79,8 +79,7 @@ public void testFileTxnSnapLogMetrics() throws Exception { } // It is possible that above writes will trigger more than one snapshot due to randomization. - WaitForCondition newSnapshot = () -> (long) MetricsUtils.currentServerMetrics().get("cnt_snapshottime") >= 2L; - waitFor("no snapshot in 10s", newSnapshot, 10); + waitForMetric("cnt_snapshottime", greaterThanOrEqualTo(2L), 10); // Pauses snapshot and logs more txns. cnxnFactory.getZooKeeperServer().getTxnLogFactory().snapLog.close(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java index 5df14600a15..aa3ba3622ad 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java @@ -26,10 +26,8 @@ import org.apache.zookeeper.PortAssignment; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.metrics.MetricsUtils; import org.apache.zookeeper.server.ServerMetrics; import org.apache.zookeeper.test.ClientBase; -import org.hamcrest.Matcher; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -38,7 +36,6 @@ public class LearnerMetricsTest extends QuorumPeerTestBase { - private static final int TIMEOUT_SECONDS = 30; private static final int SERVER_COUNT = 4; // 1 observer, 3 participants private final QuorumPeerTestBase.MainThread[] mt = new QuorumPeerTestBase.MainThread[SERVER_COUNT]; private ZooKeeper zk_client; @@ -113,18 +110,6 @@ public void testLearnerMetricsTest(boolean asyncSending) throws Exception { waitForMetric("min_commit_propagation_latency", greaterThanOrEqualTo(0L)); } - private void waitForMetric(final String metricKey, final Matcher matcher) throws InterruptedException { - final String errorMessage = String.format("unable to match on metric: %s", metricKey); - waitFor(errorMessage, () -> { - long actual = (long) MetricsUtils.currentServerMetrics().get(metricKey); - if (!matcher.matches(actual)) { - LOG.info("match failed on {}, actual value: {}", metricKey, actual); - return false; - } - return true; - }, TIMEOUT_SECONDS); - } - @AfterEach public void tearDown() throws Exception { zk_client.close(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java index 5f40ef3559d..ba97b887e6d 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java @@ -133,7 +133,7 @@ public WatcherTriggerWorker( public void run() { while (!stopped) { String path = PATH_PREFIX + r.nextInt(paths); - WatcherOrBitSet s = manager.triggerWatch(path, EventType.NodeDeleted); + WatcherOrBitSet s = manager.triggerWatch(path, EventType.NodeDeleted, -1); if (s != null) { triggeredCount.addAndGet(s.size()); } @@ -729,6 +729,12 @@ private void checkMetrics(String metricName, long min, long max, double avg, lon assertEquals(sum, values.get("sum_" + metricName)); } + private void checkMostRecentWatchedEvent(DumbWatcher watcher, String path, EventType eventType, long zxid) { + assertEquals(path, watcher.getMostRecentPath()); + assertEquals(eventType, watcher.getMostRecentEventType()); + assertEquals(zxid, watcher.getMostRecentZxid()); + } + @ParameterizedTest @MethodSource("data") public void testWatcherMetrics(String className) throws IOException { @@ -743,41 +749,54 @@ public void testWatcherMetrics(String className) throws IOException { final String path3 = "/path3"; - //both wather1 and wather2 are watching path1 + //both watcher1 and watcher2 are watching path1 manager.addWatch(path1, watcher1); manager.addWatch(path1, watcher2); //path2 is watched by watcher1 manager.addWatch(path2, watcher1); - manager.triggerWatch(path3, EventType.NodeCreated); + manager.triggerWatch(path3, EventType.NodeCreated, 1); //path3 is not being watched so metric is 0 checkMetrics("node_created_watch_count", 0L, 0L, 0D, 0L, 0L); + // Watchers shouldn't have received any events yet so the zxid should be -1. + checkMostRecentWatchedEvent(watcher1, null, null, -1); + checkMostRecentWatchedEvent(watcher2, null, null, -1); //path1 is watched by two watchers so two fired - manager.triggerWatch(path1, EventType.NodeCreated); + manager.triggerWatch(path1, EventType.NodeCreated, 2); checkMetrics("node_created_watch_count", 2L, 2L, 2D, 1L, 2L); + checkMostRecentWatchedEvent(watcher1, path1, EventType.NodeCreated, 2); + checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeCreated, 2); //path2 is watched by one watcher so one fired now total is 3 - manager.triggerWatch(path2, EventType.NodeCreated); + manager.triggerWatch(path2, EventType.NodeCreated, 3); checkMetrics("node_created_watch_count", 1L, 2L, 1.5D, 2L, 3L); + checkMostRecentWatchedEvent(watcher1, path2, EventType.NodeCreated, 3); + checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeCreated, 2); //watches on path1 are no longer there so zero fired - manager.triggerWatch(path1, EventType.NodeDataChanged); + manager.triggerWatch(path1, EventType.NodeDataChanged, 4); checkMetrics("node_changed_watch_count", 0L, 0L, 0D, 0L, 0L); + checkMostRecentWatchedEvent(watcher1, path2, EventType.NodeCreated, 3); + checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeCreated, 2); - //both wather1 and wather2 are watching path1 + //both watcher and watcher are watching path1 manager.addWatch(path1, watcher1); manager.addWatch(path1, watcher2); //path2 is watched by watcher1 manager.addWatch(path2, watcher1); - manager.triggerWatch(path1, EventType.NodeDataChanged); + manager.triggerWatch(path1, EventType.NodeDataChanged, 5); checkMetrics("node_changed_watch_count", 2L, 2L, 2D, 1L, 2L); + checkMostRecentWatchedEvent(watcher1, path1, EventType.NodeDataChanged, 5); + checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeDataChanged, 5); - manager.triggerWatch(path2, EventType.NodeDeleted); + manager.triggerWatch(path2, EventType.NodeDeleted, 6); checkMetrics("node_deleted_watch_count", 1L, 1L, 1D, 1L, 1L); + checkMostRecentWatchedEvent(watcher1, path2, EventType.NodeDeleted, 6); + checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeDataChanged, 5); //make sure that node created watch count is not impacted by the fire of other event types checkMetrics("node_created_watch_count", 1L, 2L, 1.5D, 2L, 3L); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatcherCleanerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatcherCleanerTest.java index 17e44eb9b01..3320f1a247c 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatcherCleanerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatcherCleanerTest.java @@ -17,8 +17,8 @@ package org.apache.zookeeper.server.watch; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.number.OrderingComparison.greaterThan; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -140,7 +140,7 @@ public void testMaxInProcessingDeadWatchers() { } @Test - public void testDeadWatcherMetrics() { + public void testDeadWatcherMetrics() throws InterruptedException { ServerMetrics.getMetrics().resetAll(); MyDeadWatcherListener listener = new MyDeadWatcherListener(); WatcherCleaner cleaner = new WatcherCleaner(listener, 1, 1, 1, 1); @@ -156,19 +156,19 @@ public void testDeadWatcherMetrics() { assertTrue(listener.wait(5000)); Map values = MetricsUtils.currentServerMetrics(); - assertThat("Adding dead watcher should be stalled twice", (Long) values.get("add_dead_watcher_stall_time"), greaterThan(0L)); - assertEquals(3L, values.get("dead_watchers_queued"), "Total dead watchers added to the queue should be 3"); - assertEquals(3L, values.get("dead_watchers_cleared"), "Total dead watchers cleared should be 3"); - - assertEquals(3L, values.get("cnt_dead_watchers_cleaner_latency")); - - //Each latency should be a little over 20 ms, allow 20 ms deviation - assertEquals(20D, (Double) values.get("avg_dead_watchers_cleaner_latency"), 20); - assertEquals(20D, ((Long) values.get("min_dead_watchers_cleaner_latency")).doubleValue(), 20); - assertEquals(20D, ((Long) values.get("max_dead_watchers_cleaner_latency")).doubleValue(), 20); - assertEquals(20D, ((Long) values.get("p50_dead_watchers_cleaner_latency")).doubleValue(), 20); - assertEquals(20D, ((Long) values.get("p95_dead_watchers_cleaner_latency")).doubleValue(), 20); - assertEquals(20D, ((Long) values.get("p99_dead_watchers_cleaner_latency")).doubleValue(), 20); + // Adding dead watcher should be stalled twice + waitForMetric("add_dead_watcher_stall_time", greaterThan(0L)); + waitForMetric("dead_watchers_queued", is(3L)); + waitForMetric("dead_watchers_cleared", is(3L)); + waitForMetric("cnt_dead_watchers_cleaner_latency", is(3L)); + + //Each latency should be a little over 20 ms, allow 5 ms deviation + waitForMetric("avg_dead_watchers_cleaner_latency", closeTo(20, 5)); + waitForMetric("min_dead_watchers_cleaner_latency", closeTo(20, 5)); + waitForMetric("max_dead_watchers_cleaner_latency", closeTo(20, 5)); + waitForMetric("p50_dead_watchers_cleaner_latency", closeTo(20, 5)); + waitForMetric("p95_dead_watchers_cleaner_latency", closeTo(20, 5)); + waitForMetric("p99_dead_watchers_cleaner_latency", closeTo(20, 5)); } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java index 077af3c456c..440485e660f 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java @@ -20,7 +20,6 @@ import static org.apache.zookeeper.AddWatchMode.PERSISTENT; import static org.apache.zookeeper.AddWatchMode.PERSISTENT_RECURSIVE; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; @@ -33,8 +32,11 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -81,21 +83,28 @@ public void testBasicAsync() private void internalTestBasic(ZooKeeper zk) throws KeeperException, InterruptedException { zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.setData("/a/b/c/d/e", new byte[0], -1); + + Stat stat = new Stat(); + zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b", stat); + + zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b/c", stat); + + zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b/c/d", stat); + + zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b/c/d/e", stat); + + stat = zk.setData("/a/b/c/d/e", new byte[0], -1); + assertEvent(events, EventType.NodeDataChanged, "/a/b/c/d/e", stat); + zk.delete("/a/b/c/d/e", -1); - zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d/e"); - assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c/d/e"); - assertEvent(events, Watcher.Event.EventType.NodeDeleted, "/a/b/c/d/e"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d/e"); + assertEvent(events, EventType.NodeDeleted, "/a/b/c/d/e", zk.exists("/a/b/c/d", false).getPzxid()); + + zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b/c/d/e", stat); } @Test @@ -104,14 +113,15 @@ public void testRemoval() try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) { zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE); zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c"); + Stat stat = new Stat(); + zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b", stat); + zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b/c", stat); zk.removeWatches("/a/b", persistentWatcher, Watcher.WatcherType.Any, false); zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - assertEvent(events, Watcher.Event.EventType.PersistentWatchRemoved, "/a/b"); + assertEvent(events, EventType.PersistentWatchRemoved, "/a/b", WatchedEvent.NO_ZXID); } } @@ -125,13 +135,15 @@ public void testNoChildEvents() throws Exception { BlockingQueue childEvents = new LinkedBlockingQueue<>(); zk.getChildren("/a", childEvents::add); - zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + Stat createABStat = new Stat(); + zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createABStat); + Stat createABCStat = new Stat(); + zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createABCStat); - assertEvent(childEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a"); + assertEvent(childEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a", createABStat.getPzxid()); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c"); + assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b", createABStat); + assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c", createABCStat); assertTrue(events.isEmpty()); } } @@ -141,9 +153,9 @@ public void testDisconnect() throws Exception { try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) { zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE); stopServer(); - assertEvent(events, Watcher.Event.EventType.None, null); + assertEvent(events, EventType.None, KeeperState.Disconnected, null, WatchedEvent.NO_ZXID); startServer(); - assertEvent(events, Watcher.Event.EventType.None, null); + assertEvent(events, EventType.None, KeeperState.SyncConnected, null, WatchedEvent.NO_ZXID); internalTestBasic(zk); } } @@ -158,17 +170,15 @@ public void testMultiClient() zk1.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk1.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE); - zk1.setData("/a/b/c", "one".getBytes(), -1); - Thread.sleep(1000); // give some time for the event to arrive - - zk2.setData("/a/b/c", "two".getBytes(), -1); - zk2.setData("/a/b/c", "three".getBytes(), -1); - zk2.setData("/a/b/c", "four".getBytes(), -1); - - assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c"); - assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c"); - assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c"); - assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c"); + Stat stat = zk1.setData("/a/b/c", "one".getBytes(), -1); + assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid()); + + stat = zk2.setData("/a/b/c", "two".getBytes(), -1); + assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid()); + stat = zk2.setData("/a/b/c", "three".getBytes(), -1); + assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid()); + stat = zk2.setData("/a/b/c", "four".getBytes(), -1); + assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid()); } } @@ -224,22 +234,42 @@ public void testRootWatcher() throws IOException, InterruptedException, KeeperException { try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) { zk.addWatch("/", persistentWatcher, PERSISTENT_RECURSIVE); - zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/b"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/b/c"); + Stat stat = new Stat(); + + zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a", stat.getMzxid()); + + zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b", stat.getMzxid()); + + zk.create("/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/b", stat.getMzxid()); + + zk.create("/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/b/c", stat.getMzxid()); } } - private void assertEvent(BlockingQueue events, Watcher.Event.EventType eventType, String path) - throws InterruptedException { - WatchedEvent event = events.poll(5, TimeUnit.SECONDS); - assertNotNull(event); - assertEquals(eventType, event.getType()); - assertEquals(path, event.getPath()); + private void assertEvent(BlockingQueue events, EventType eventType, String path, Stat stat) + throws InterruptedException { + assertEvent(events, eventType, path, stat.getMzxid()); + } + + private void assertEvent(BlockingQueue events, EventType eventType, String path, long zxid) + throws InterruptedException { + assertEvent(events, eventType, KeeperState.SyncConnected, path, zxid); + } + + private void assertEvent(BlockingQueue events, EventType eventType, KeeperState keeperState, + String path, long zxid) throws InterruptedException { + WatchedEvent actualEvent = events.poll(5, TimeUnit.SECONDS); + assertNotNull(actualEvent); + WatchedEvent expectedEvent = new WatchedEvent( + eventType, + keeperState, + path, + zxid + ); + TestUtils.assertWatchedEventEquals(expectedEvent, actualEvent); } } \ No newline at end of file diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestUtils.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestUtils.java index 00c6c070a04..e3306c1fe76 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestUtils.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestUtils.java @@ -18,8 +18,10 @@ package org.apache.zookeeper.test; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; import java.io.File; +import org.apache.zookeeper.WatchedEvent; /** * This class contains test utility methods @@ -57,4 +59,16 @@ public static boolean deleteFileRecursively(File file) { return deleteFileRecursively(file, false); } + /** + * Asserts that the given {@link WatchedEvent} are semantically equal, i.e. they have the same EventType, path and + * zxid. + */ + public static void assertWatchedEventEquals(WatchedEvent expected, WatchedEvent actual) { + // TODO: .hashCode and .equals cannot be added to WatchedEvent without potentially breaking consumers. This + // can be changed to `assertEquals(expected, actual)` once WatchedEvent has those methods. Until then, + // compare the lists manually. + assertEquals(expected.getType(), actual.getType()); + assertEquals(expected.getPath(), actual.getPath()); + assertEquals(expected.getZxid(), actual.getZxid()); + } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java index a3d6eef7bdc..cce26276cdd 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java @@ -59,12 +59,12 @@ public void removeWatcher(Watcher watcher) { } @Override - public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type) { + public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type, long zxid) { return new WatcherOrBitSet(Collections.emptySet()); } @Override - public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type, WatcherOrBitSet suppress) { + public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type, long zxid, WatcherOrBitSet suppress) { return new WatcherOrBitSet(Collections.emptySet()); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java index f4a0298f233..a9bc11da2a6 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java @@ -61,7 +61,7 @@ public void testCreatingWatchedEventFromWrapper() { for (EventType et : allTypes) { for (KeeperState ks : allStates) { wep = new WatcherEvent(et.getIntValue(), ks.getIntValue(), "blah"); - we = new WatchedEvent(wep); + we = new WatchedEvent(wep, WatchedEvent.NO_ZXID); assertEquals(et, we.getType()); assertEquals(ks, we.getState()); assertEquals("blah", we.getPath()); @@ -75,7 +75,7 @@ public void testCreatingWatchedEventFromInvalidWrapper() { try { WatcherEvent wep = new WatcherEvent(-2342, -252352, "foo"); - new WatchedEvent(wep); + new WatchedEvent(wep, WatchedEvent.NO_ZXID); fail("Was able to create WatchedEvent from bad wrapper"); } catch (RuntimeException re) { // we're good diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java index 180cd08a611..44440a7c3bb 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java @@ -37,6 +37,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -68,14 +69,17 @@ public void process(WatchedEvent event) { assertTrue(false, "interruption unexpected"); } } - public void verify(List expected) throws InterruptedException { + + public void verify(List expected) throws InterruptedException { + List actual = new ArrayList<>(); WatchedEvent event; - int count = 0; - while (count < expected.size() && (event = events.poll(30, TimeUnit.SECONDS)) != null) { - assertEquals(expected.get(count), event.getType()); - count++; + while (actual.size() < expected.size() && (event = events.poll(30, TimeUnit.SECONDS)) != null) { + actual.add(event); + } + assertEquals(expected.size(), actual.size()); + for (int i = 0; i < expected.size(); i++) { + TestUtils.assertWatchedEventEquals(expected.get(i), actual.get(i)); } - assertEquals(expected.size(), count); events.clear(); } @@ -88,7 +92,7 @@ public void verify(List expected) throws InterruptedException { private volatile CountDownLatch lsnr_latch; private ZooKeeper lsnr; - private List expected; + private List expected; @BeforeEach @Override @@ -127,15 +131,34 @@ private void verify() throws InterruptedException { expected.clear(); } + private void addEvent(List events, EventType eventType, String path, Stat stat) { + addEvent(events, eventType, path, stat.getMzxid()); + } + + private void addEvent(List events, EventType eventType, String path, long zxid) { + events.add(new WatchedEvent(eventType, KeeperState.SyncConnected, path, zxid)); + } + + private long delete(String path) throws InterruptedException, KeeperException { + client.delete(path, -1); + int lastSlash = path.lastIndexOf('/'); + String parent = (lastSlash == 0) + ? "/" + : path.substring(0, lastSlash); + // the deletion's zxid will be reflected in the parent's Pzxid + return client.exists(parent, false).getPzxid(); + } + @Test public void testExistsSync() throws IOException, InterruptedException, KeeperException { assertNull(lsnr.exists("/foo", true)); assertNull(lsnr.exists("/foo/bar", true)); - client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - expected.add(EventType.NodeCreated); - client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - expected.add(EventType.NodeCreated); + Stat stat = new Stat(); + client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(expected, EventType.NodeCreated, "/foo", stat); + client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(expected, EventType.NodeCreated, "/foo/bar", stat); verify(); @@ -160,20 +183,20 @@ public void testExistsSync() throws IOException, InterruptedException, KeeperExc assertEquals("/foo/car", e.getPath()); } - client.setData("/foo", "parent".getBytes(), -1); - expected.add(EventType.NodeDataChanged); - client.setData("/foo/bar", "child".getBytes(), -1); - expected.add(EventType.NodeDataChanged); + stat = client.setData("/foo", "parent".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo", stat); + stat = client.setData("/foo/bar", "child".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo/bar", stat); verify(); assertNotNull(lsnr.exists("/foo", true)); assertNotNull(lsnr.exists("/foo/bar", true)); - client.delete("/foo/bar", -1); - expected.add(EventType.NodeDeleted); - client.delete("/foo", -1); - expected.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(expected, EventType.NodeDeleted, "/foo/bar", deleteZxid); + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); verify(); } @@ -200,20 +223,20 @@ public void testGetDataSync() throws IOException, InterruptedException, KeeperEx client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); assertNotNull(lsnr.getData("/foo/bar", true, null)); - client.setData("/foo", "parent".getBytes(), -1); - expected.add(EventType.NodeDataChanged); - client.setData("/foo/bar", "child".getBytes(), -1); - expected.add(EventType.NodeDataChanged); + Stat stat = client.setData("/foo", "parent".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo", stat); + stat = client.setData("/foo/bar", "child".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo/bar", stat); verify(); assertNotNull(lsnr.getData("/foo", true, null)); assertNotNull(lsnr.getData("/foo/bar", true, null)); - client.delete("/foo/bar", -1); - expected.add(EventType.NodeDeleted); - client.delete("/foo", -1); - expected.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(expected, EventType.NodeDeleted, "/foo/bar", deleteZxid); + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); verify(); } @@ -238,8 +261,9 @@ public void testGetChildrenSync() throws IOException, InterruptedException, Keep client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); assertNotNull(lsnr.getChildren("/foo", true)); - client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - expected.add(EventType.NodeChildrenChanged); // /foo + Stat stat = new Stat(); + client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(expected, EventType.NodeChildrenChanged, "/foo", stat); // /foo assertNotNull(lsnr.getChildren("/foo/bar", true)); client.setData("/foo", "parent".getBytes(), -1); @@ -250,11 +274,11 @@ public void testGetChildrenSync() throws IOException, InterruptedException, Keep assertNotNull(lsnr.getChildren("/foo", true)); assertNotNull(lsnr.getChildren("/foo/bar", true)); - client.delete("/foo/bar", -1); - expected.add(EventType.NodeDeleted); // /foo/bar childwatch - expected.add(EventType.NodeChildrenChanged); // /foo - client.delete("/foo", -1); - expected.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(expected, EventType.NodeDeleted, "/foo/bar", deleteZxid); // /foo/bar childwatch + addEvent(expected, EventType.NodeChildrenChanged, "/foo", deleteZxid); // /foo + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); verify(); } @@ -266,7 +290,7 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe SimpleWatcher w3 = new SimpleWatcher(null); SimpleWatcher w4 = new SimpleWatcher(null); - List e2 = new ArrayList<>(); + List e2 = new ArrayList<>(); assertNull(lsnr.exists("/foo", true)); assertNull(lsnr.exists("/foo", w1)); @@ -276,10 +300,11 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe assertNull(lsnr.exists("/foo/bar", w3)); assertNull(lsnr.exists("/foo/bar", w4)); - client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - expected.add(EventType.NodeCreated); - client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - e2.add(EventType.NodeCreated); + Stat stat = new Stat(); + client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(expected, EventType.NodeCreated, "/foo", stat); + client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(e2, EventType.NodeCreated, "/foo/bar", stat); lsnr_dwatch.verify(expected); w1.verify(expected); @@ -297,10 +322,10 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe assertNotNull(lsnr.exists("/foo/bar", w4)); assertNotNull(lsnr.exists("/foo/bar", w4)); - client.setData("/foo", "parent".getBytes(), -1); - expected.add(EventType.NodeDataChanged); - client.setData("/foo/bar", "child".getBytes(), -1); - e2.add(EventType.NodeDataChanged); + stat = client.setData("/foo", "parent".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo", stat); + stat = client.setData("/foo/bar", "child".getBytes(), -1); + addEvent(e2, EventType.NodeDataChanged, "/foo/bar", stat); lsnr_dwatch.verify(new ArrayList<>()); // not reg so should = 0 w1.verify(expected); @@ -319,10 +344,10 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe assertNotNull(lsnr.exists("/foo/bar", w3)); assertNotNull(lsnr.exists("/foo/bar", w4)); - client.delete("/foo/bar", -1); - expected.add(EventType.NodeDeleted); - client.delete("/foo", -1); - e2.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(e2, EventType.NodeDeleted, "/foo/bar", deleteZxid); + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); lsnr_dwatch.verify(expected); w1.verify(expected); @@ -331,7 +356,6 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe w4.verify(e2); expected.clear(); e2.clear(); - } @Test @@ -341,7 +365,7 @@ public void testGetDataSyncWObj() throws IOException, InterruptedException, Keep SimpleWatcher w3 = new SimpleWatcher(null); SimpleWatcher w4 = new SimpleWatcher(null); - List e2 = new ArrayList<>(); + List e2 = new ArrayList<>(); try { lsnr.getData("/foo", w1, null); @@ -367,10 +391,10 @@ public void testGetDataSyncWObj() throws IOException, InterruptedException, Keep assertNotNull(lsnr.getData("/foo/bar", w4, null)); assertNotNull(lsnr.getData("/foo/bar", w4, null)); - client.setData("/foo", "parent".getBytes(), -1); - expected.add(EventType.NodeDataChanged); - client.setData("/foo/bar", "child".getBytes(), -1); - e2.add(EventType.NodeDataChanged); + Stat stat = client.setData("/foo", "parent".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo", stat); + stat = client.setData("/foo/bar", "child".getBytes(), -1); + addEvent(e2, EventType.NodeDataChanged, "/foo/bar", stat); lsnr_dwatch.verify(expected); w1.verify(expected); @@ -387,10 +411,10 @@ public void testGetDataSyncWObj() throws IOException, InterruptedException, Keep assertNotNull(lsnr.getData("/foo/bar", w3, null)); assertNotNull(lsnr.getData("/foo/bar", w4, null)); - client.delete("/foo/bar", -1); - expected.add(EventType.NodeDeleted); - client.delete("/foo", -1); - e2.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(e2, EventType.NodeDeleted, "/foo/bar", deleteZxid); + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); lsnr_dwatch.verify(expected); w1.verify(expected); @@ -408,7 +432,7 @@ public void testGetChildrenSyncWObj() throws IOException, InterruptedException, SimpleWatcher w3 = new SimpleWatcher(null); SimpleWatcher w4 = new SimpleWatcher(null); - List e2 = new ArrayList<>(); + List e2 = new ArrayList<>(); try { lsnr.getChildren("/foo", true); @@ -429,8 +453,9 @@ public void testGetChildrenSyncWObj() throws IOException, InterruptedException, assertNotNull(lsnr.getChildren("/foo", true)); assertNotNull(lsnr.getChildren("/foo", w1)); - client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - expected.add(EventType.NodeChildrenChanged); // /foo + Stat stat = new Stat(); + client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(expected, EventType.NodeChildrenChanged, "/foo", stat); // /foo assertNotNull(lsnr.getChildren("/foo/bar", w2)); assertNotNull(lsnr.getChildren("/foo/bar", w2)); assertNotNull(lsnr.getChildren("/foo/bar", w3)); @@ -451,11 +476,11 @@ public void testGetChildrenSyncWObj() throws IOException, InterruptedException, assertNotNull(lsnr.getChildren("/foo/bar", w4)); assertNotNull(lsnr.getChildren("/foo/bar", w4)); - client.delete("/foo/bar", -1); - e2.add(EventType.NodeDeleted); // /foo/bar childwatch - expected.add(EventType.NodeChildrenChanged); // /foo - client.delete("/foo", -1); - expected.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(e2, EventType.NodeDeleted, "/foo/bar", deleteZxid); + addEvent(expected, EventType.NodeChildrenChanged, "/foo", deleteZxid); // /foo + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); lsnr_dwatch.verify(expected); w1.verify(expected);