Skip to content

Commit

Permalink
HBASE-27217 Revisit the DumpReplicationQueues tool (#4810)
Browse files Browse the repository at this point in the history
Signed-off-by: Duo Zhang <[email protected]>
  • Loading branch information
2005hithlj authored and Apache9 committed Nov 27, 2022
1 parent 3926abf commit 209865d
Show file tree
Hide file tree
Showing 3 changed files with 284 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand All @@ -31,7 +35,7 @@
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
Expand All @@ -40,28 +44,33 @@
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.io.WALLink;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZKDump;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicLongMap;

/**
* TODO: reimplement this tool
* <p/>
* Provides information about the existing states of replication, replication peers and queues.
* Usage: hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args]
* Arguments: --distributed Polls each RS to dump information about the queue --hdfs Reports HDFS
* usage by the replication queues (note: can be overestimated).
* usage by the replication queues (note: can be overestimated). In the new version, we
* reimplemented the DumpReplicationQueues tool to support obtaining information from replication
* table.
*/
@InterfaceAudience.Private
public class DumpReplicationQueues extends Configured implements Tool {
Expand Down Expand Up @@ -185,7 +194,7 @@ protected static void printUsage(final String className, final String message) {
System.err.println("General Options:");
System.err.println(" -h|--h|--help Show this help and exit.");
System.err.println(" --distributed Poll each RS and print its own replication queue. "
+ "Default only polls ZooKeeper");
+ "Default only polls replication table.");
System.err.println(" --hdfs Use HDFS to calculate usage of WALs by replication."
+ " It could be overestimated if replicating to multiple peers."
+ " --distributed flag is also needed.");
Expand All @@ -201,13 +210,7 @@ private int dumpReplicationQueues(DumpOptions opts) throws Exception {
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();

ZKWatcher zkw =
new ZKWatcher(conf, "DumpReplicationQueues" + EnvironmentEdgeManager.currentTime(),
new WarnOnlyAbortable(), true);

try {
// Our zk watcher
LOG.info("Our Quorum: " + zkw.getQuorum());
List<TableCFs> replicatedTableCFs = admin.listReplicatedTableCFs();
if (replicatedTableCFs.isEmpty()) {
LOG.info("No tables with a configured replication peer were found.");
Expand All @@ -229,21 +232,72 @@ private int dumpReplicationQueues(DumpOptions opts) throws Exception {
LOG.info("Found [--distributed], will poll each RegionServer.");
Set<String> peerIds =
peers.stream().map((peer) -> peer.getPeerId()).collect(Collectors.toSet());
System.out.println(dumpQueues(zkw, peerIds, opts.isHdfs()));
System.out.println(dumpQueues(connection, peerIds, opts.isHdfs(), conf));
System.out.println(dumpReplicationSummary());
} else {
// use ZK instead
System.out.print("Dumping replication znodes via ZooKeeper:");
System.out.println(ZKDump.getReplicationZnodesDump(zkw));
// use replication table instead
System.out.println("Dumping replication info via replication table.");
System.out.println(dumpReplicationViaTable(connection, conf));
}
return (0);
} catch (IOException e) {
return (-1);
} finally {
zkw.close();
connection.close();
}
}

public String dumpReplicationViaTable(Connection connection, Configuration conf)
throws ReplicationException, IOException {
StringBuilder sb = new StringBuilder();
ReplicationQueueStorage queueStorage =
ReplicationStorageFactory.getReplicationQueueStorage(connection, conf);

// The dump info format is as follows:
// peers:
// peers/1: zk1:2181:/hbase
// peers/1/peer-state: ENABLED
// rs:
// rs/rs1,16020,1664092120094/1/rs1%2C16020%2C1664092120094.1664096778778: 123
// rs/rs2,16020,1664092120094/2/rs1%2C16020%2C1664092120094.1664096778778: 321
// hfile-refs:
// hfile-refs/1/hfile1,hfile2
// hfile-refs/2/hfile3,hfile4
String peersKey = "peers";
sb.append(peersKey).append(": ").append("\n");
List<ReplicationPeerDescription> repPeerDescs = connection.getAdmin().listReplicationPeers();
for (ReplicationPeerDescription repPeerDesc : repPeerDescs) {
sb.append(peersKey).append("/").append(repPeerDesc.getPeerId()).append(": ")
.append(repPeerDesc.getPeerConfig().getClusterKey()).append("\n");
sb.append(peersKey).append("/").append(repPeerDesc.getPeerId()).append("/peer-state: ")
.append(repPeerDesc.isEnabled() ? "ENABLED" : "DISABLED").append("\n");
}

List<ReplicationQueueData> repQueueDataList = queueStorage.listAllQueues();
String rsKey = "rs";
sb.append(rsKey).append(": ").append("\n");
for (ReplicationQueueData repQueueData : repQueueDataList) {
String peerId = repQueueData.getId().getPeerId();
for (ImmutableMap.Entry<String, ReplicationGroupOffset> entry : repQueueData.getOffsets()
.entrySet()) {
sb.append(rsKey).append("/").append(entry.getKey()).append("/").append(peerId).append("/")
.append(entry.getValue().getWal()).append(": ").append(entry.getValue().getOffset())
.append("\n");
}
}

List<String> peerIds = queueStorage.getAllPeersFromHFileRefsQueue();
String hfileKey = "hfile-refs";
sb.append(hfileKey).append(": ").append("\n");
for (String peerId : peerIds) {
List<String> hfiles = queueStorage.getReplicableHFiles(peerId);
sb.append(hfileKey).append("/").append(peerId).append("/").append(String.join(",", hfiles))
.append("\n");
}

return sb.toString();
}

public String dumpReplicationSummary() {
StringBuilder sb = new StringBuilder();
if (!deletedQueues.isEmpty()) {
Expand Down Expand Up @@ -294,80 +348,111 @@ public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exce
return sb.toString();
}

public String dumpQueues(ZKWatcher zkw, Set<String> peerIds, boolean hdfs) throws Exception {
ReplicationQueueStorage queueStorage;
public String dumpQueues(Connection connection, Set<String> peerIds, boolean hdfs,
Configuration conf) throws Exception {
StringBuilder sb = new StringBuilder();
ReplicationQueueStorage queueStorage =
ReplicationStorageFactory.getReplicationQueueStorage(connection, conf);

Set<ServerName> liveRegionServers =
connection.getAdmin().getClusterMetrics().getLiveServerMetrics().keySet();

// queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
// Set<ServerName> liveRegionServers = ZKUtil.listChildrenNoWatch(zkw,
// zkw.getZNodePaths().rsZNode)
// .stream().map(ServerName::parseServerName).collect(Collectors.toSet());
//
// Loops each peer on each RS and dumps the queues
// List<ServerName> regionservers = queueStorage.getListOfReplicators();
// if (regionservers == null || regionservers.isEmpty()) {
// return sb.toString();
// }
// for (ServerName regionserver : regionservers) {
// List<String> queueIds = queueStorage.getAllQueues(regionserver);
// if (!liveRegionServers.contains(regionserver)) {
// deadRegionServers.add(regionserver.getServerName());
// }
// for (String queueId : queueIds) {
// ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
// List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId);
// Collections.sort(wals);
// if (!peerIds.contains(queueInfo.getPeerId())) {
// deletedQueues.add(regionserver + "/" + queueId);
// sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs));
// } else {
// sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs));
// }
// }
// }
List<ServerName> regionServers = queueStorage.listAllReplicators();
if (regionServers == null || regionServers.isEmpty()) {
return sb.toString();
}
for (ServerName regionServer : regionServers) {
List<ReplicationQueueId> queueIds = queueStorage.listAllQueueIds(regionServer);

if (!liveRegionServers.contains(regionServer)) {
deadRegionServers.add(regionServer.getServerName());
}
for (ReplicationQueueId queueId : queueIds) {
List<String> tmpWals = new ArrayList<>();
// wals
AbstractFSWALProvider
.getWALFiles(connection.getConfiguration(), queueId.getServerWALsBelongTo()).stream()
.map(Path::toString).forEach(tmpWals::add);

// old wals
AbstractFSWALProvider.getArchivedWALFiles(connection.getConfiguration(),
queueId.getServerWALsBelongTo(), URLEncoder
.encode(queueId.getServerWALsBelongTo().toString(), StandardCharsets.UTF_8.name()))
.stream().map(Path::toString).forEach(tmpWals::add);

Map<String, ReplicationGroupOffset> offsets = queueStorage.getOffsets(queueId);
// filter out the wal files that should replicate
List<String> wals = new ArrayList<>();
for (Map.Entry<String, ReplicationGroupOffset> entry : offsets.entrySet()) {
ReplicationGroupOffset offset = entry.getValue();
for (String wal : tmpWals) {
if (ReplicationOffsetUtil.shouldReplicate(offset, wal)) {
wals.add(wal);
}
}
}
Collections.sort(wals, Comparator.comparingLong(AbstractFSWALProvider::getTimestamp));
if (!peerIds.contains(queueId.getPeerId())) {
deletedQueues.add(regionServer + "/" + queueId);
sb.append(formatQueue(regionServer, offsets, wals, queueId, true, hdfs));
} else {
sb.append(formatQueue(regionServer, offsets, wals, queueId, false, hdfs));
}
}
}
return sb.toString();
}

private String formatQueue(ServerName regionserver, ReplicationQueueStorage queueStorage,
ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted,
boolean hdfs) throws Exception {
private String formatQueue(ServerName regionServer, Map<String, ReplicationGroupOffset> offsets,
List<String> wals, ReplicationQueueId queueId, boolean isDeleted, boolean hdfs)
throws Exception {
StringBuilder sb = new StringBuilder();

List<ServerName> deadServers;

sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n");
sb.append(" Queue znode: " + queueId + "\n");
sb.append(" PeerID: " + queueInfo.getPeerId() + "\n");
sb.append(" Recovered: " + queueInfo.isQueueRecovered() + "\n");
deadServers = queueInfo.getDeadRegionServers();
if (deadServers.isEmpty()) {
sb.append(" No dead RegionServers found in this queue." + "\n");
sb.append("Dumping replication queue info for RegionServer: [" + regionServer + "]" + "\n");
sb.append(" Queue id: " + queueId + "\n");
sb.append(" PeerID: " + queueId.getPeerId() + "\n");
sb.append(" Recovered: " + queueId.isRecovered() + "\n");
// In new version, we only record the first dead RegionServer in queueId.
if (queueId.getSourceServerName().isPresent()) {
sb.append(" Dead RegionServer: " + queueId.getSourceServerName().get() + "\n");
} else {
sb.append(" Dead RegionServers: " + deadServers + "\n");
sb.append(" No dead RegionServer found in this queue." + "\n");
}
sb.append(" Was deleted: " + isDeleted + "\n");
sb.append(" Number of WALs in replication queue: " + wals.size() + "\n");
peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size());

for (String wal : wals) {
// long position = queueStorage.getWALPosition(regionserver, queueInfo.getPeerId(), wal);
// sb.append(" Replication position for " + wal + ": "
// + (position > 0 ? position : "0" + " (not started or nothing to replicate)") + "\n");
peersQueueSize.addAndGet(queueId.getPeerId(), wals.size());

for (Map.Entry<String, ReplicationGroupOffset> entry : offsets.entrySet()) {
String walGroup = entry.getKey();
ReplicationGroupOffset offset = entry.getValue();
for (String wal : wals) {
long position = 0;
if (offset.getWal().equals(wal)) {
position = offset.getOffset();
}
sb.append(
" Replication position for " + (walGroup != null ? walGroup + "/" + wal : wal) + ": ");
if (position == 0) {
sb.append("0 (not started or nothing to replicate)");
} else if (position > 0) {
sb.append(position);
}
sb.append("\n");
}
}

if (hdfs) {
FileSystem fs = FileSystem.get(getConf());
sb.append(" Total size of WALs on HDFS for this queue: "
+ StringUtils.humanSize(getTotalWALSize(fs, wals, regionserver)) + "\n");
+ StringUtils.humanSize(getTotalWALSize(fs, wals, regionServer)) + "\n");
}
return sb.toString();
}

/**
* return total size in bytes from a list of WALs
*/
private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server)
throws IOException {
private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server) {
long size = 0;
FileStatus fileStatus;

Expand All @@ -389,19 +474,4 @@ private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server
totalSizeOfWALs += size;
return size;
}

private static class WarnOnlyAbortable implements Abortable {
@Override
public void abort(String why, Throwable e) {
LOG.warn("DumpReplicationQueue received abort, ignoring. Reason: " + why);
if (LOG.isDebugEnabled()) {
LOG.debug(e.toString(), e);
}
}

@Override
public boolean isAborted() {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,26 @@ public static List<Path> getArchivedWALFiles(Configuration conf, ServerName serv
return archivedWalFiles;
}

/**
* List all the wal files for a logPrefix.
*/
public static List<Path> getWALFiles(Configuration c, ServerName serverName) throws IOException {
Path walRoot = new Path(CommonFSUtils.getWALRootDir(c), HConstants.HREGION_LOGDIR_NAME);
FileSystem fs = walRoot.getFileSystem(c);
List<Path> walFiles = new ArrayList<>();
Path walDir = new Path(walRoot, serverName.toString());
try {
for (FileStatus status : fs.listStatus(walDir)) {
if (status.isFile()) {
walFiles.add(status.getPath());
}
}
} catch (FileNotFoundException e) {
LOG.info("WAL dir {} not exists", walDir);
}
return walFiles;
}

/**
* Pulls a ServerName out of a Path generated according to our layout rules. In the below layouts,
* this method ignores the format of the logfile component. Current format: [base directory for
Expand Down
Loading

0 comments on commit 209865d

Please sign in to comment.