diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
index 0bc9ee5b2ed9..24d611ff7d0c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -19,8 +19,11 @@
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -31,6 +34,7 @@
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
@@ -40,12 +44,17 @@
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.io.WALLink;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
-import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.ReplicationQueueData;
+import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.zookeeper.ZKDump;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -53,10 +62,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicLongMap;
/**
- * TODO: reimplement this tool
*
* Provides information about the existing states of replication, replication peers and queues.
* Usage: hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args]
@@ -185,7 +194,7 @@ protected static void printUsage(final String className, final String message) {
System.err.println("General Options:");
System.err.println(" -h|--h|--help Show this help and exit.");
System.err.println(" --distributed Poll each RS and print its own replication queue. "
- + "Default only polls ZooKeeper");
+ + "Default only polls replication table.");
System.err.println(" --hdfs Use HDFS to calculate usage of WALs by replication."
+ " It could be overestimated if replicating to multiple peers."
+ " --distributed flag is also needed.");
@@ -229,21 +238,45 @@ private int dumpReplicationQueues(DumpOptions opts) throws Exception {
LOG.info("Found [--distributed], will poll each RegionServer.");
Set<String> peerIds =
peers.stream().map((peer) -> peer.getPeerId()).collect(Collectors.toSet());
- System.out.println(dumpQueues(zkw, peerIds, opts.isHdfs()));
+ System.out.println(dumpQueues(zkw, connection, peerIds, opts.isHdfs()));
System.out.println(dumpReplicationSummary());
} else {
- // use ZK instead
- System.out.print("Dumping replication znodes via ZooKeeper:");
- System.out.println(ZKDump.getReplicationZnodesDump(zkw));
+ // use replication table instead
+ System.out.println("Dumping replication info via replication table.");
+ System.out.println(dumpReplicationViaTable(connection));
}
return (0);
} catch (IOException e) {
return (-1);
} finally {
- zkw.close();
+ connection.close();
}
}
+
+ public String dumpReplicationViaTable(Connection connection) throws ReplicationException {
+ StringBuilder sb = new StringBuilder();
+ ReplicationQueueStorage queueStorage =
+ ReplicationStorageFactory.getReplicationQueueStorage(connection, getConf());
+ List<ReplicationQueueData> replicationQueueDataList = queueStorage.listAllQueues();
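+ // First list the id of every queue: the peer it replicates to and the region server that owns it.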
+ for (ReplicationQueueData replicationQueueData : replicationQueueDataList) {
+ sb.append(replicationQueueData.getId().getPeerId()).append("\n");
+ sb.append(replicationQueueData.getId().getServerName().getServerName()).append("\n");
+ }
+
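+ // Then dump the stored offset (WAL file and position) of every WAL group in each queue.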
+ for (ReplicationQueueData replicationQueueData : replicationQueueDataList) {
+ for (ImmutableMap.Entry<String, ReplicationGroupOffset> entry : replicationQueueData
+ .getOffsets().entrySet()) {
+ sb.append("\n").append(entry.getKey()).append("/").append(entry.getValue().getWal())
+ .append(": ").append(entry.getValue().getOffset());
+ }
+ }
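+ // Finally, list the HFiles still referenced by bulk load replication.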
+ Set<String> allHFileRefs = queueStorage.getAllHFileRefs();
+ for (String hfileRef : allHFileRefs) {
+ sb.append("\n").append(hfileRef);
+ }
+ return sb.toString();
+ }
+
public String dumpReplicationSummary() {
StringBuilder sb = new StringBuilder();
if (!deletedQueues.isEmpty()) {
@@ -255,7 +288,7 @@ public String dumpReplicationSummary() {
}
if (!deadRegionServers.isEmpty()) {
sb.append("Found " + deadRegionServers.size() + " dead regionservers"
- + ", restart one regionserver to transfer the queues of dead regionservers\n");
+ + ", restart one regionServer to transfer the queues of dead regionservers\n");
for (String deadRs : deadRegionServers) {
sb.append(" " + deadRs + "\n");
}
@@ -294,71 +327,91 @@ public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exce
return sb.toString();
}
- public String dumpQueues(ZKWatcher zkw, Set<String> peerIds, boolean hdfs) throws Exception {
- ReplicationQueueStorage queueStorage;
+ public String dumpQueues(ZKWatcher zkw, Connection connection, Set<String> peerIds, boolean hdfs)
+ throws Exception {
StringBuilder sb = new StringBuilder();
+ ReplicationQueueStorage queueStorage =
+ ReplicationStorageFactory.getReplicationQueueStorage(connection, getConf());
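+ // Queues now live in the replication table, but live region servers are still tracked in ZooKeeper.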
+ Set<ServerName> liveRegionServers = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode)
+ .stream().map(ServerName::parseServerName).collect(Collectors.toSet());
+
+ List<ServerName> regionServers = queueStorage.listAllReplicators();
+ if (regionServers == null || regionServers.isEmpty()) {
+ return sb.toString();
+ }
+ for (ServerName regionServer : regionServers) {
+ List<ReplicationQueueId> queueIds = queueStorage.listAllQueueIds(regionServer);
- // queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
- // Set<ServerName> liveRegionServers = ZKUtil.listChildrenNoWatch(zkw,
- // zkw.getZNodePaths().rsZNode)
- // .stream().map(ServerName::parseServerName).collect(Collectors.toSet());
- //
- // Loops each peer on each RS and dumps the queues
- // List<ServerName> regionservers = queueStorage.getListOfReplicators();
- // if (regionservers == null || regionservers.isEmpty()) {
- // return sb.toString();
- // }
- // for (ServerName regionserver : regionservers) {
- // List<String> queueIds = queueStorage.getAllQueues(regionserver);
- // if (!liveRegionServers.contains(regionserver)) {
- // deadRegionServers.add(regionserver.getServerName());
- // }
- // for (String queueId : queueIds) {
- // ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
- // List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId);
- // Collections.sort(wals);
- // if (!peerIds.contains(queueInfo.getPeerId())) {
- // deletedQueues.add(regionserver + "/" + queueId);
- // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs));
- // } else {
- // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs));
- // }
- // }
- // }
+ if (!liveRegionServers.contains(regionServer)) {
+ deadRegionServers.add(regionServer.getServerName());
+ }
+ for (ReplicationQueueId queueId : queueIds) {
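+ // A recovered queue holds the leftover WALs of a dead source server, a normal queue those of
+ // its owner; either way the files are looked up in the WAL archive directory.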
+ ServerName walOwner =
+ queueId.isRecovered() ? queueId.getSourceServerName().get() : queueId.getServerName();
+ List<String> wals = AbstractFSWALProvider
+ .getArchivedWALFiles(connection.getConfiguration(), walOwner,
+ URLEncoder.encode(walOwner.toString(), StandardCharsets.UTF_8.name()))
+ .stream().map(Path::toString).collect(Collectors.toList());
+ Collections.sort(wals);
+ if (!peerIds.contains(queueId.getPeerId())) {
+ deletedQueues.add(regionServer + "/" + queueId);
+ sb.append(formatQueue(regionServer, queueStorage, wals, queueId, true, hdfs));
+ } else {
+ sb.append(formatQueue(regionServer, queueStorage, wals, queueId, false, hdfs));
+ }
+ }
+ }
return sb.toString();
}
- private String formatQueue(ServerName regionserver, ReplicationQueueStorage queueStorage,
- ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted,
- boolean hdfs) throws Exception {
+ private String formatQueue(ServerName regionServer, ReplicationQueueStorage queueStorage,
+ List<String> wals, ReplicationQueueId queueId, boolean isDeleted, boolean hdfs)
+ throws Exception {
StringBuilder sb = new StringBuilder();
- List<String> deadServers;
-
- sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n");
+ sb.append("Dumping replication queue info for RegionServer: [" + regionServer + "]" + "\n");
sb.append(" Queue znode: " + queueId + "\n");
- sb.append(" PeerID: " + queueInfo.getPeerId() + "\n");
- sb.append(" Recovered: " + queueInfo.isQueueRecovered() + "\n");
- deadServers = queueInfo.getDeadRegionServers();
- if (deadServers.isEmpty()) {
- sb.append(" No dead RegionServers found in this queue." + "\n");
+ sb.append(" PeerID: " + queueId.getPeerId() + "\n");
+ sb.append(" Recovered: " + queueId.isRecovered() + "\n");
+ if (queueId.getSourceServerName().isPresent()) {
+ sb.append(" Dead RegionServers: " + queueId.getSourceServerName().get() + "\n");
} else {
- sb.append(" Dead RegionServers: " + deadServers + "\n");
+ sb.append(" No dead RegionServers found in this queue." + "\n");
}
sb.append(" Was deleted: " + isDeleted + "\n");
sb.append(" Number of WALs in replication queue: " + wals.size() + "\n");
- peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size());
+ peersQueueSize.addAndGet(queueId.getPeerId(), wals.size());
+
+ Set<Map.Entry<String, ReplicationGroupOffset>> offsets =
+ queueStorage.getOffsets(queueId).entrySet();
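+ // Find each WAL's replication position by matching it against the stored per-group offsets.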
for (String wal : wals) {
- // long position = queueStorage.getWALPosition(regionserver, queueInfo.getPeerId(), wal);
- // sb.append(" Replication position for " + wal + ": "
- // + (position > 0 ? position : "0" + " (not started or nothing to replicate)") + "\n");
+ long position = -1;
+ String walGroup = null;
+ for (Map.Entry<String, ReplicationGroupOffset> entry : offsets) {
+ ReplicationGroupOffset offset = entry.getValue();
+ if (offset.getWal().equals(wal)) {
+ walGroup = entry.getKey();
+ position = offset.getOffset();
+ }
+ }
+ sb.append(" Replication position for " + (walGroup != null ? walGroup + "/" + wal : wal)
+ + ": " + (position > 0 ? position : "0" + " (not started or nothing to replicate)") + "\n");
}
if (hdfs) {
FileSystem fs = FileSystem.get(getConf());
sb.append(" Total size of WALs on HDFS for this queue: "
- + StringUtils.humanSize(getTotalWALSize(fs, wals, regionserver)) + "\n");
+ + StringUtils.humanSize(getTotalWALSize(fs, wals, regionServer)) + "\n");
}
return sb.toString();
}
@@ -366,8 +419,7 @@ private String formatQueue(ServerName regionserver, ReplicationQueueStorage queu
/**
* return total size in bytes from a list of WALs
*/
- private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server)
- throws IOException {
+ private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server) {
long size = 0;
FileStatus fileStatus;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDumpReplicationQueues.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDumpReplicationQueues.java
index 3475ae5c1925..e264683ec57f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDumpReplicationQueues.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDumpReplicationQueues.java
@@ -83,7 +83,7 @@ public void testDumpReplicationReturnsWalSorted() throws Exception {
Set<String> peerIds = new HashSet<>();
peerIds.add("1");
dumpQueues.setConf(config);
- String dump = dumpQueues.dumpQueues(zkWatcherMock, peerIds, false);
+ String dump = dumpQueues.dumpQueues(zkWatcherMock, null, peerIds, false);
String[] parsedDump = dump.split("Replication position for");
assertEquals("Parsed dump should have 4 parts.", 4, parsedDump.length);
assertTrue(