Skip to content

Commit

Permalink
ZOOKEEPER-4466: Support different watch modes on same path (apache#1859)
Browse files Browse the repository at this point in the history
Signed-off-by: Kezhu Wang <[email protected]>
Co-authored-by: tison <[email protected]>
  • Loading branch information
kezhuw and tisonkun authored May 5, 2023
1 parent 89c1831 commit a64dbf5
Show file tree
Hide file tree
Showing 8 changed files with 219 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,9 @@ public String getMaxPrefixWithQuota(String path) {
public void addWatch(String basePath, Watcher watcher, int mode) {
WatcherMode watcherMode = WatcherMode.fromZooDef(mode);
dataWatches.addWatch(basePath, watcher, watcherMode);
childWatches.addWatch(basePath, watcher, watcherMode);
if (watcherMode != WatcherMode.PERSISTENT_RECURSIVE) {
childWatches.addWatch(basePath, watcher, watcherMode);
}
}

public byte[] getData(String path, Stat stat, Watcher watcher) throws NoNodeException {
Expand Down Expand Up @@ -1511,7 +1513,6 @@ public void setWatches(long relativeZxid, List<String> dataWatches, List<String>
this.dataWatches.addWatch(path, watcher, WatcherMode.PERSISTENT);
}
for (String path : persistentRecursiveWatches) {
this.childWatches.addWatch(path, watcher, WatcherMode.PERSISTENT_RECURSIVE);
this.dataWatches.addWatch(path, watcher, WatcherMode.PERSISTENT_RECURSIVE);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,4 @@ default boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode)
*
*/
void dumpWatches(PrintWriter pwriter, boolean byPath);

/**
* Return the current number of recursive watchers
*
* @return qty
*/
default int getRecursiveWatchQty() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.zookeeper.server.watch;

import java.io.PrintWriter;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -45,9 +46,9 @@ public class WatchManager implements IWatchManager {

private final Map<String, Set<Watcher>> watchTable = new HashMap<>();

private final Map<Watcher, Set<String>> watch2Paths = new HashMap<>();
private final Map<Watcher, Map<String, WatchStats>> watch2Paths = new HashMap<>();

private final WatcherModeManager watcherModeManager = new WatcherModeManager();
private int recursiveWatchQty = 0;

@Override
public synchronized int size() {
Expand Down Expand Up @@ -84,33 +85,46 @@ public synchronized boolean addWatch(String path, Watcher watcher, WatcherMode w
}
list.add(watcher);

Set<String> paths = watch2Paths.get(watcher);
Map<String, WatchStats> paths = watch2Paths.get(watcher);
if (paths == null) {
// cnxns typically have many watches, so use default cap here
paths = new HashSet<>();
paths = new HashMap<>();
watch2Paths.put(watcher, paths);
}

watcherModeManager.setWatcherMode(watcher, path, watcherMode);
WatchStats stats = paths.getOrDefault(path, WatchStats.NONE);
WatchStats newStats = stats.addMode(watcherMode);

return paths.add(path);
if (newStats != stats) {
paths.put(path, newStats);
if (watcherMode.isRecursive()) {
++recursiveWatchQty;
}
return true;
}

return false;
}

@Override
public synchronized void removeWatcher(Watcher watcher) {
Set<String> paths = watch2Paths.remove(watcher);
Map<String, WatchStats> paths = watch2Paths.remove(watcher);
if (paths == null) {
return;
}
for (String p : paths) {
for (String p : paths.keySet()) {
Set<Watcher> list = watchTable.get(p);
if (list != null) {
list.remove(watcher);
if (list.isEmpty()) {
watchTable.remove(p);
}
}
watcherModeManager.removeWatcher(watcher, p);
}
for (WatchStats stats : paths.values()) {
if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {
--recursiveWatchQty;
}
}
}

Expand All @@ -123,8 +137,8 @@ public WatcherOrBitSet triggerWatch(String path, EventType type) {
public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
Set<Watcher> watchers = new HashSet<>();
PathParentIterator pathParentIterator = getPathParentIterator(path);
synchronized (this) {
PathParentIterator pathParentIterator = getPathParentIterator(path);
for (String localPath : pathParentIterator.asIterable()) {
Set<Watcher> thisWatchers = watchTable.get(localPath);
if (thisWatchers == null || thisWatchers.isEmpty()) {
Expand All @@ -133,20 +147,23 @@ public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet
Iterator<Watcher> iterator = thisWatchers.iterator();
while (iterator.hasNext()) {
Watcher watcher = iterator.next();
WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, localPath);
if (watcherMode.isRecursive()) {
if (type != EventType.NodeChildrenChanged) {
watchers.add(watcher);
}
} else if (!pathParentIterator.atParentPath()) {
Map<String, WatchStats> paths = watch2Paths.getOrDefault(watcher, Collections.emptyMap());
WatchStats stats = paths.get(localPath);
if (stats == null) {
LOG.warn("inconsistent watch table for watcher {}, {} not in path list", watcher, localPath);
continue;
}
if (!pathParentIterator.atParentPath()) {
watchers.add(watcher);
if (!watcherMode.isPersistent()) {
WatchStats newStats = stats.removeMode(WatcherMode.STANDARD);
if (newStats == WatchStats.NONE) {
iterator.remove();
Set<String> paths = watch2Paths.get(watcher);
if (paths != null) {
paths.remove(localPath);
}
paths.remove(localPath);
} else if (newStats != stats) {
paths.put(localPath, newStats);
}
} else if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {
watchers.add(watcher);
}
}
if (thisWatchers.isEmpty()) {
Expand Down Expand Up @@ -199,7 +216,7 @@ public synchronized String toString() {
sb.append(watch2Paths.size()).append(" connections watching ").append(watchTable.size()).append(" paths\n");

int total = 0;
for (Set<String> paths : watch2Paths.values()) {
for (Map<String, WatchStats> paths : watch2Paths.values()) {
total += paths.size();
}
sb.append("Total watches:").append(total);
Expand All @@ -219,10 +236,10 @@ public synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) {
}
}
} else {
for (Entry<Watcher, Set<String>> e : watch2Paths.entrySet()) {
for (Entry<Watcher, Map<String, WatchStats>> e : watch2Paths.entrySet()) {
pwriter.print("0x");
pwriter.println(Long.toHexString(((ServerCnxn) e.getKey()).getSessionId()));
for (String path : e.getValue()) {
for (String path : e.getValue().keySet()) {
pwriter.print("\t");
pwriter.println(path);
}
Expand All @@ -232,49 +249,49 @@ public synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) {

@Override
public synchronized boolean containsWatcher(String path, Watcher watcher) {
WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, path);
PathParentIterator pathParentIterator = getPathParentIterator(path);
for (String localPath : pathParentIterator.asIterable()) {
Set<Watcher> watchers = watchTable.get(localPath);
if (!pathParentIterator.atParentPath()) {
if (watchers != null) {
return true; // at the leaf node, all watcher types match
}
}
if (watcherMode.isRecursive()) {
return true;
}
}
return false;
Set<Watcher> list = watchTable.get(path);
return list != null && list.contains(watcher);
}

@Override
public synchronized boolean removeWatcher(String path, Watcher watcher) {
Set<String> paths = watch2Paths.get(watcher);
if (paths == null || !paths.remove(path)) {
Map<String, WatchStats> paths = watch2Paths.get(watcher);
if (paths == null) {
return false;
}

WatchStats stats = paths.remove(path);
if (stats == null) {
return false;
}
if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {
--recursiveWatchQty;
}

Set<Watcher> list = watchTable.get(path);
if (list == null || !list.remove(watcher)) {
LOG.warn("inconsistent watch table for path {}, {} not in watcher list", path, watcher);
return false;
}

if (list.isEmpty()) {
watchTable.remove(path);
}

watcherModeManager.removeWatcher(watcher, path);

return true;
}

// VisibleForTesting
Map<Watcher, Map<String, WatchStats>> getWatch2Paths() {
return watch2Paths;
}

@Override
public synchronized WatchesReport getWatches() {
Map<Long, Set<String>> id2paths = new HashMap<>();
for (Entry<Watcher, Set<String>> e : watch2Paths.entrySet()) {
for (Entry<Watcher, Map<String, WatchStats>> e : watch2Paths.entrySet()) {
Long id = ((ServerCnxn) e.getKey()).getSessionId();
Set<String> paths = new HashSet<>(e.getValue());
Set<String> paths = new HashSet<>(e.getValue().keySet());
id2paths.put(id, paths);
}
return new WatchesReport(id2paths);
Expand All @@ -296,7 +313,7 @@ public synchronized WatchesPathReport getWatchesByPath() {
@Override
public synchronized WatchesSummary getWatchesSummary() {
int totalWatches = 0;
for (Set<String> paths : watch2Paths.values()) {
for (Map<String, WatchStats> paths : watch2Paths.values()) {
totalWatches += paths.size();
}
return new WatchesSummary(watch2Paths.size(), watchTable.size(), totalWatches);
Expand All @@ -305,13 +322,13 @@ public synchronized WatchesSummary getWatchesSummary() {
@Override
public void shutdown() { /* do nothing */ }

@Override
public int getRecursiveWatchQty() {
return watcherModeManager.getRecursiveQty();
// VisibleForTesting
synchronized int getRecursiveWatchQty() {
return recursiveWatchQty;
}

private PathParentIterator getPathParentIterator(String path) {
if (watcherModeManager.getRecursiveQty() == 0) {
if (getRecursiveWatchQty() == 0) {
return PathParentIterator.forPathOnly(path);
}
return PathParentIterator.forAll(path);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zookeeper.server.watch;

/**
* Statistics for multiple different watches on one node.
*/
public final class WatchStats {
private static final WatchStats[] WATCH_STATS = new WatchStats[] {
new WatchStats(0), // NONE
new WatchStats(1), // STANDARD
new WatchStats(2), // PERSISTENT
new WatchStats(3), // STANDARD + PERSISTENT
new WatchStats(4), // PERSISTENT_RECURSIVE
new WatchStats(5), // STANDARD + PERSISTENT_RECURSIVE
new WatchStats(6), // PERSISTENT + PERSISTENT_RECURSIVE
new WatchStats(7), // STANDARD + PERSISTENT + PERSISTENT_RECURSIVE
};

/**
* Stats that have no watchers attached.
*
* <p>This could be used as start point to compute new stats using {@link #addMode(WatcherMode)}.
*/
public static final WatchStats NONE = WATCH_STATS[0];

private final int flags;

private WatchStats(int flags) {
this.flags = flags;
}

private static int modeToFlag(WatcherMode mode) {
return 1 << mode.ordinal();
}

/**
* Compute stats after given mode attached to node.
*
* @param mode watcher mode
* @return a new stats if given mode is not attached to this node before, otherwise old stats
*/
public WatchStats addMode(WatcherMode mode) {
int flags = this.flags | modeToFlag(mode);
return WATCH_STATS[flags];
}

/**
* Compute stats after given mode removed from node.
*
* @param mode watcher mode
* @return null if given mode is the last attached mode, otherwise a new stats
*/
public WatchStats removeMode(WatcherMode mode) {
int mask = ~modeToFlag(mode);
int flags = this.flags & mask;
if (flags == 0) {
return NONE;
}
return WATCH_STATS[flags];
}

/**
* Check whether given mode is attached to this node.
*
* @param mode watcher mode
* @return true if given mode is attached to this node.
*/
public boolean hasMode(WatcherMode mode) {
int flags = modeToFlag(mode);
return (this.flags & flags) != 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
public enum WatcherMode {
STANDARD(false, false),
PERSISTENT(true, false),
PERSISTENT_RECURSIVE(true, true)
PERSISTENT_RECURSIVE(true, true),
;

public static final WatcherMode DEFAULT_WATCHER_MODE = WatcherMode.STANDARD;
Expand Down
Loading

0 comments on commit a64dbf5

Please sign in to comment.