Skip to content

Commit

Permalink
Communicate the Zxid that triggered a WatchEvent to fire
Browse files Browse the repository at this point in the history
With the recent addition of persistent watches, many doors have opened up to significantly more
performant and intuitive local caches of remote state, but the actual implementation can be
difficult because to cache data locally, one needs to execute the following steps:

1. Set the watch
2. Bootstrap the watched subtree
3. Catch up on the events that fired during the bootstrap

The issue is it's now very difficult to deduplicate and sanely resolve the remote state during step
3 because it's unknown whether an event arrived during the bootstrap or after. For example,
imagine that between steps 1 and 2, a node /a was deleted then re-created. By the time step 3 is
executed, there will be a NodeDeleted event queued up followed by a NodeCreated, causing at best a
double read (one from the bootstrap, one from the NodeCreated) or at worst some data inconsistencies
in the local cache.

This change sets the Zxid in the response header whenever the watch event type is NodeCreated,
NodeDeleted, NodeDataChanged or NodeChildrenChanged.
  • Loading branch information
PapaCharlie committed Jan 9, 2023
1 parent b069ede commit 468e7a0
Show file tree
Hide file tree
Showing 14 changed files with 273 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException {
event.setPath(clientPath);
}

WatchedEvent we = new WatchedEvent(event);
WatchedEvent we = new WatchedEvent(event, replyHdr.getZxid());
LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
eventThread.queueEvent(we);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.zookeeper;

import java.util.Objects;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
Expand All @@ -31,27 +32,38 @@
*/
@InterfaceAudience.Public
public class WatchedEvent {
public static final long NO_ZXID = -1L;

private final KeeperState keeperState;
private final EventType eventType;
private String path;
private final String path;
private final long zxid;

/**
* Create a WatchedEvent with specified type, state and path
* Create a WatchedEvent with specified type, state, path and zxid
*/
public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
public WatchedEvent(EventType eventType, KeeperState keeperState, String path, long zxid) {
this.keeperState = keeperState;
this.eventType = eventType;
this.path = path;
this.zxid = zxid;
}

/**
* Convert a WatcherEvent sent over the wire into a full-fledged WatcherEvent
* Create a WatchedEvent with specified type, state and path
*/
public WatchedEvent(WatcherEvent eventMessage) {
public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
this(eventType, keeperState, path, NO_ZXID);
}

/**
* Convert a WatcherEvent sent over the wire into a full-fledged WatchedEvent
*/
public WatchedEvent(WatcherEvent eventMessage, long zxid) {
keeperState = KeeperState.fromInt(eventMessage.getState());
eventType = EventType.fromInt(eventMessage.getType());
path = eventMessage.getPath();
this.zxid = zxid;
}

public KeeperState getState() {
Expand All @@ -66,9 +78,33 @@ public String getPath() {
return path;
}

public long getZxid() {
return zxid;
}

@Override
public String toString() {
return "WatchedEvent state:" + keeperState + " type:" + eventType + " path:" + path;
return "WatchedEvent state:" + keeperState + " type:" + eventType + " path:" + path + " zxid: " + zxid;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
WatchedEvent that = (WatchedEvent) o;
return zxid == that.zxid
&& keeperState == that.keeperState
&& eventType == that.eventType
&& Objects.equals(path, that.path);
}

@Override
public int hashCode() {
return Objects.hash(keeperState, eventType, path, zxid);
}

/**
Expand All @@ -77,5 +113,4 @@ public String toString() {
public WatcherEvent getWrapper() {
return new WatcherEvent(eventType.getIntValue(), keeperState.getIntValue(), path);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -527,8 +527,8 @@ public void createNode(final String path, byte[] data, List<ACL> acl, long ephem
updateQuotaStat(lastPrefix, bytes, 1);
}
updateWriteStat(path, bytes);
dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged);
dataWatches.triggerWatch(path, Event.EventType.NodeCreated, zxid);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged, zxid);
}

/**
Expand Down Expand Up @@ -624,9 +624,9 @@ public void deleteNode(String path, long zxid) throws KeeperException.NoNodeExce
"childWatches.triggerWatch " + parentName);
}

WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted);
childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged);
WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted, zxid);
childWatches.triggerWatch(path, EventType.NodeDeleted, zxid, processed);
childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged, zxid);
}

public Stat setData(String path, byte[] data, int version, long zxid, long time) throws KeeperException.NoNodeException {
Expand Down Expand Up @@ -658,7 +658,7 @@ public Stat setData(String path, byte[] data, int version, long zxid, long time)
nodeDataSize.addAndGet(getNodeSize(path, data) - getNodeSize(path, lastdata));

updateWriteStat(path, dataBytes);
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
dataWatches.triggerWatch(path, EventType.NodeDataChanged, zxid);
return s;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
public class DumbWatcher extends ServerCnxn {

private long sessionId;
private long watchedZxid = WatchedEvent.NO_ZXID;

public DumbWatcher() {
this(0);
Expand All @@ -49,6 +50,11 @@ void setSessionTimeout(int sessionTimeout) {

@Override
public void process(WatchedEvent event) {
watchedZxid = event.getZxid();
}

public long getWatchedZxid() {
return watchedZxid;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ public int sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, St
*/
@Override
public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0);
ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(
LOG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public int getSessionTimeout() {

@Override
public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0);
ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(
LOG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,22 +82,24 @@ default boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode)
*
* @param path znode path
* @param type the watch event type
* @param zxid the zxid for the corresponding change that triggered this event
*
* @return the watchers have been notified
*/
WatcherOrBitSet triggerWatch(String path, EventType type);
WatcherOrBitSet triggerWatch(String path, EventType type, long zxid);

/**
* Distribute the watch event for the given path, but ignore those
* suppressed ones.
*
* @param path znode path
* @param type the watch event type
* @param zxid the zxid for the corresponding change that triggered this event
* @param suppress the suppressed watcher set
*
* @return the watchers have been notified
*/
WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress);
WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet suppress);

/**
* Get the size of watchers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,13 @@ public synchronized void removeWatcher(Watcher watcher) {
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type) {
return triggerWatch(path, type, null);
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid) {
return triggerWatch(path, type, zxid, null);
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid);
Set<Watcher> watchers = new HashSet<>();
PathParentIterator pathParentIterator = getPathParentIterator(path);
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,13 @@ public void processDeadWatchers(Set<Integer> deadWatchers) {
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type) {
return triggerWatch(path, type, null);
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid) {
return triggerWatch(path, type, zxid, null);
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet suppress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid);

BitHashSet watchers = remove(path);
if (watchers == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.apache.zookeeper.Watcher;
Expand Down Expand Up @@ -131,7 +132,7 @@ public WatcherTriggerWorker(
public void run() {
while (!stopped) {
String path = PATH_PREFIX + r.nextInt(paths);
WatcherOrBitSet s = manager.triggerWatch(path, EventType.NodeDeleted);
WatcherOrBitSet s = manager.triggerWatch(path, EventType.NodeDeleted, -1);
if (s != null) {
triggeredCount.addAndGet(s.size());
}
Expand Down Expand Up @@ -416,6 +417,10 @@ private void checkMetrics(String metricName, long min, long max, double avg, lon
assertEquals(sum, values.get("sum_" + metricName));
}

private void checkWatchedZxid(DumbWatcher watcher, long expectedZxid) {
assertEquals(expectedZxid, watcher.getWatchedZxid());
}

@ParameterizedTest
@MethodSource("data")
public void testWatcherMetrics(String className) throws IOException {
Expand All @@ -430,28 +435,36 @@ public void testWatcherMetrics(String className) throws IOException {

final String path3 = "/path3";

//both wather1 and wather2 are watching path1
//both watcher1 and watcher2 are watching path1
manager.addWatch(path1, watcher1);
manager.addWatch(path1, watcher2);

//path2 is watched by watcher1
manager.addWatch(path2, watcher1);

manager.triggerWatch(path3, EventType.NodeCreated);
manager.triggerWatch(path3, EventType.NodeCreated, 1);
//path3 is not being watched so metric is 0
checkMetrics("node_created_watch_count", 0L, 0L, 0D, 0L, 0L);
checkWatchedZxid(watcher1, 1);
checkWatchedZxid(watcher2, 1);

//path1 is watched by two watchers so two fired
manager.triggerWatch(path1, EventType.NodeCreated);
manager.triggerWatch(path1, EventType.NodeCreated, 2);
checkMetrics("node_created_watch_count", 2L, 2L, 2D, 1L, 2L);
checkWatchedZxid(watcher1, 2);
checkWatchedZxid(watcher2, 2);

//path2 is watched by one watcher so one fired now total is 3
manager.triggerWatch(path2, EventType.NodeCreated);
manager.triggerWatch(path2, EventType.NodeCreated, 3);
checkMetrics("node_created_watch_count", 1L, 2L, 1.5D, 2L, 3L);
checkWatchedZxid(watcher1, 3);
checkWatchedZxid(watcher2, 2);

//watches on path1 are no longer there so zero fired
manager.triggerWatch(path1, EventType.NodeDataChanged);
manager.triggerWatch(path1, EventType.NodeDataChanged, 4);
checkMetrics("node_changed_watch_count", 0L, 0L, 0D, 0L, 0L);
checkWatchedZxid(watcher1, 3);
checkWatchedZxid(watcher2, 2);

//both wather1 and wather2 are watching path1
manager.addWatch(path1, watcher1);
Expand All @@ -460,11 +473,15 @@ public void testWatcherMetrics(String className) throws IOException {
//path2 is watched by watcher1
manager.addWatch(path2, watcher1);

manager.triggerWatch(path1, EventType.NodeDataChanged);
manager.triggerWatch(path1, EventType.NodeDataChanged, 5);
checkMetrics("node_changed_watch_count", 2L, 2L, 2D, 1L, 2L);
checkWatchedZxid(watcher1, 5);
checkWatchedZxid(watcher2, 5);

manager.triggerWatch(path2, EventType.NodeDeleted);
manager.triggerWatch(path2, EventType.NodeDeleted, 6);
checkMetrics("node_deleted_watch_count", 1L, 1L, 1D, 1L, 1L);
checkWatchedZxid(watcher1, 6);
checkWatchedZxid(watcher2, 5);

//make sure that node created watch count is not impacted by the fire of other event types
checkMetrics("node_created_watch_count", 1L, 2L, 1.5D, 2L, 3L);
Expand Down
Loading

0 comments on commit 468e7a0

Please sign in to comment.