From a0822a2498bb7a7ba86043a4775836960adf9243 Mon Sep 17 00:00:00 2001 From: LiangJun He <2005hithlj@163.com> Date: Sun, 13 Nov 2022 22:03:36 +0800 Subject: [PATCH] HBASE-27217 Revisit the DumpReplicationQueues tool (#4810) Signed-off-by: Duo Zhang --- .../regionserver/DumpReplicationQueues.java | 240 +++++++++++------- .../hbase/wal/AbstractFSWALProvider.java | 20 ++ .../TestDumpReplicationQueues.java | 159 ++++++++---- 3 files changed, 284 insertions(+), 135 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java index 98d0a55fbc43..b284e3f6837f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java @@ -19,8 +19,12 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -31,7 +35,7 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.Admin; @@ -40,28 +44,33 @@ import org.apache.hadoop.hbase.client.replication.TableCFs; import org.apache.hadoop.hbase.io.WALLink; import org.apache.hadoop.hbase.procedure2.util.StringUtils; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; +import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; -import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; +import org.apache.hadoop.hbase.replication.ReplicationQueueData; +import org.apache.hadoop.hbase.replication.ReplicationQueueId; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.zookeeper.ZKDump; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicLongMap; /** - * TODO: reimplement this tool *

* Provides information about the existing states of replication, replication peers and queues. * Usage: hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args] * Arguments: --distributed Polls each RS to dump information about the queue --hdfs Reports HDFS - * usage by the replication queues (note: can be overestimated). + * usage by the replication queues (note: can be overestimated). In the new version, we + * reimplemented the DumpReplicationQueues tool to support obtaining information from replication + * table. */ @InterfaceAudience.Private public class DumpReplicationQueues extends Configured implements Tool { @@ -185,7 +194,7 @@ protected static void printUsage(final String className, final String message) { System.err.println("General Options:"); System.err.println(" -h|--h|--help Show this help and exit."); System.err.println(" --distributed Poll each RS and print its own replication queue. " - + "Default only polls ZooKeeper"); + + "Default only polls replication table."); System.err.println(" --hdfs Use HDFS to calculate usage of WALs by replication." + " It could be overestimated if replicating to multiple peers." + " --distributed flag is also needed."); @@ -201,13 +210,7 @@ private int dumpReplicationQueues(DumpOptions opts) throws Exception { Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); - ZKWatcher zkw = - new ZKWatcher(conf, "DumpReplicationQueues" + EnvironmentEdgeManager.currentTime(), - new WarnOnlyAbortable(), true); - try { - // Our zk watcher - LOG.info("Our Quorum: " + zkw.getQuorum()); List replicatedTableCFs = admin.listReplicatedTableCFs(); if (replicatedTableCFs.isEmpty()) { LOG.info("No tables with a configured replication peer were found."); @@ -229,21 +232,72 @@ private int dumpReplicationQueues(DumpOptions opts) throws Exception { LOG.info("Found [--distributed], will poll each RegionServer."); Set peerIds = peers.stream().map((peer) -> peer.getPeerId()).collect(Collectors.toSet()); - System.out.println(dumpQueues(zkw, peerIds, opts.isHdfs())); + System.out.println(dumpQueues(connection, peerIds, opts.isHdfs(), conf)); System.out.println(dumpReplicationSummary()); } else { - // use ZK instead - System.out.print("Dumping replication znodes via ZooKeeper:"); - System.out.println(ZKDump.getReplicationZnodesDump(zkw)); + // use replication table instead + System.out.println("Dumping replication info via replication table."); + System.out.println(dumpReplicationViaTable(connection, conf)); } return (0); } catch (IOException e) { return (-1); } finally { - zkw.close(); + connection.close(); } } + public String dumpReplicationViaTable(Connection connection, Configuration conf) + throws ReplicationException, IOException { + StringBuilder sb = new StringBuilder(); + ReplicationQueueStorage queueStorage = + ReplicationStorageFactory.getReplicationQueueStorage(connection, conf); + + // The dump info format is as follows: + // peers: + // peers/1: zk1:2181:/hbase + // peers/1/peer-state: ENABLED + // rs: + // rs/rs1,16020,1664092120094/1/rs1%2C16020%2C1664092120094.1664096778778: 123 + // rs/rs2,16020,1664092120094/2/rs1%2C16020%2C1664092120094.1664096778778: 321 + // hfile-refs: + // hfile-refs/1/hfile1,hfile2 + // hfile-refs/2/hfile3,hfile4 + String peersKey = "peers"; + sb.append(peersKey).append(": ").append("\n"); + List repPeerDescs = connection.getAdmin().listReplicationPeers(); + for (ReplicationPeerDescription repPeerDesc : repPeerDescs) { + sb.append(peersKey).append("/").append(repPeerDesc.getPeerId()).append(": ") + .append(repPeerDesc.getPeerConfig().getClusterKey()).append("\n"); + sb.append(peersKey).append("/").append(repPeerDesc.getPeerId()).append("/peer-state: ") + .append(repPeerDesc.isEnabled() ? "ENABLED" : "DISABLED").append("\n"); + } + + List repQueueDataList = queueStorage.listAllQueues(); + String rsKey = "rs"; + sb.append(rsKey).append(": ").append("\n"); + for (ReplicationQueueData repQueueData : repQueueDataList) { + String peerId = repQueueData.getId().getPeerId(); + for (ImmutableMap.Entry entry : repQueueData.getOffsets() + .entrySet()) { + sb.append(rsKey).append("/").append(entry.getKey()).append("/").append(peerId).append("/") + .append(entry.getValue().getWal()).append(": ").append(entry.getValue().getOffset()) + .append("\n"); + } + } + + List peerIds = queueStorage.getAllPeersFromHFileRefsQueue(); + String hfileKey = "hfile-refs"; + sb.append(hfileKey).append(": ").append("\n"); + for (String peerId : peerIds) { + List hfiles = queueStorage.getReplicableHFiles(peerId); + sb.append(hfileKey).append("/").append(peerId).append("/").append(String.join(",", hfiles)) + .append("\n"); + } + + return sb.toString(); + } + public String dumpReplicationSummary() { StringBuilder sb = new StringBuilder(); if (!deletedQueues.isEmpty()) { @@ -294,71 +348,103 @@ public String dumpPeersState(List peers) throws Exce return sb.toString(); } - public String dumpQueues(ZKWatcher zkw, Set peerIds, boolean hdfs) throws Exception { - ReplicationQueueStorage queueStorage; + public String dumpQueues(Connection connection, Set peerIds, boolean hdfs, + Configuration conf) throws Exception { StringBuilder sb = new StringBuilder(); + ReplicationQueueStorage queueStorage = + ReplicationStorageFactory.getReplicationQueueStorage(connection, conf); + + Set liveRegionServers = + connection.getAdmin().getClusterMetrics().getLiveServerMetrics().keySet(); - // queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); - // Set liveRegionServers = ZKUtil.listChildrenNoWatch(zkw, - // zkw.getZNodePaths().rsZNode) - // .stream().map(ServerName::parseServerName).collect(Collectors.toSet()); - // - // Loops each peer on each RS and dumps the queues - // List regionservers = queueStorage.getListOfReplicators(); - // if (regionservers == null || regionservers.isEmpty()) { - // return sb.toString(); - // } - // for (ServerName regionserver : regionservers) { - // List queueIds = queueStorage.getAllQueues(regionserver); - // if (!liveRegionServers.contains(regionserver)) { - // deadRegionServers.add(regionserver.getServerName()); - // } - // for (String queueId : queueIds) { - // ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); - // List wals = queueStorage.getWALsInQueue(regionserver, queueId); - // Collections.sort(wals); - // if (!peerIds.contains(queueInfo.getPeerId())) { - // deletedQueues.add(regionserver + "/" + queueId); - // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs)); - // } else { - // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs)); - // } - // } - // } + List regionServers = queueStorage.listAllReplicators(); + if (regionServers == null || regionServers.isEmpty()) { + return sb.toString(); + } + for (ServerName regionServer : regionServers) { + List queueIds = queueStorage.listAllQueueIds(regionServer); + + if (!liveRegionServers.contains(regionServer)) { + deadRegionServers.add(regionServer.getServerName()); + } + for (ReplicationQueueId queueId : queueIds) { + List tmpWals = new ArrayList<>(); + // wals + AbstractFSWALProvider + .getWALFiles(connection.getConfiguration(), queueId.getServerWALsBelongTo()).stream() + .map(Path::toString).forEach(tmpWals::add); + + // old wals + AbstractFSWALProvider.getArchivedWALFiles(connection.getConfiguration(), + queueId.getServerWALsBelongTo(), URLEncoder + .encode(queueId.getServerWALsBelongTo().toString(), StandardCharsets.UTF_8.name())) + .stream().map(Path::toString).forEach(tmpWals::add); + + Map offsets = queueStorage.getOffsets(queueId); + // filter out the wal files that should replicate + List wals = new ArrayList<>(); + for (Map.Entry entry : offsets.entrySet()) { + ReplicationGroupOffset offset = entry.getValue(); + for (String wal : tmpWals) { + if (ReplicationOffsetUtil.shouldReplicate(offset, wal)) { + wals.add(wal); + } + } + } + Collections.sort(wals, Comparator.comparingLong(AbstractFSWALProvider::getTimestamp)); + if (!peerIds.contains(queueId.getPeerId())) { + deletedQueues.add(regionServer + "/" + queueId); + sb.append(formatQueue(regionServer, offsets, wals, queueId, true, hdfs)); + } else { + sb.append(formatQueue(regionServer, offsets, wals, queueId, false, hdfs)); + } + } + } return sb.toString(); } - private String formatQueue(ServerName regionserver, ReplicationQueueStorage queueStorage, - ReplicationQueueInfo queueInfo, String queueId, List wals, boolean isDeleted, - boolean hdfs) throws Exception { + private String formatQueue(ServerName regionServer, Map offsets, + List wals, ReplicationQueueId queueId, boolean isDeleted, boolean hdfs) + throws Exception { StringBuilder sb = new StringBuilder(); - List deadServers; - - sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n"); - sb.append(" Queue znode: " + queueId + "\n"); - sb.append(" PeerID: " + queueInfo.getPeerId() + "\n"); - sb.append(" Recovered: " + queueInfo.isQueueRecovered() + "\n"); - deadServers = queueInfo.getDeadRegionServers(); - if (deadServers.isEmpty()) { - sb.append(" No dead RegionServers found in this queue." + "\n"); + sb.append("Dumping replication queue info for RegionServer: [" + regionServer + "]" + "\n"); + sb.append(" Queue id: " + queueId + "\n"); + sb.append(" PeerID: " + queueId.getPeerId() + "\n"); + sb.append(" Recovered: " + queueId.isRecovered() + "\n"); + // In new version, we only record the first dead RegionServer in queueId. + if (queueId.getSourceServerName().isPresent()) { + sb.append(" Dead RegionServer: " + queueId.getSourceServerName().get() + "\n"); } else { - sb.append(" Dead RegionServers: " + deadServers + "\n"); + sb.append(" No dead RegionServer found in this queue." + "\n"); } sb.append(" Was deleted: " + isDeleted + "\n"); sb.append(" Number of WALs in replication queue: " + wals.size() + "\n"); - peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size()); - - for (String wal : wals) { - // long position = queueStorage.getWALPosition(regionserver, queueInfo.getPeerId(), wal); - // sb.append(" Replication position for " + wal + ": " - // + (position > 0 ? position : "0" + " (not started or nothing to replicate)") + "\n"); + peersQueueSize.addAndGet(queueId.getPeerId(), wals.size()); + + for (Map.Entry entry : offsets.entrySet()) { + String walGroup = entry.getKey(); + ReplicationGroupOffset offset = entry.getValue(); + for (String wal : wals) { + long position = 0; + if (offset.getWal().equals(wal)) { + position = offset.getOffset(); + } + sb.append( + " Replication position for " + (walGroup != null ? walGroup + "/" + wal : wal) + ": "); + if (position == 0) { + sb.append("0 (not started or nothing to replicate)"); + } else if (position > 0) { + sb.append(position); + } + sb.append("\n"); + } } if (hdfs) { FileSystem fs = FileSystem.get(getConf()); sb.append(" Total size of WALs on HDFS for this queue: " - + StringUtils.humanSize(getTotalWALSize(fs, wals, regionserver)) + "\n"); + + StringUtils.humanSize(getTotalWALSize(fs, wals, regionServer)) + "\n"); } return sb.toString(); } @@ -366,8 +452,7 @@ private String formatQueue(ServerName regionserver, ReplicationQueueStorage queu /** * return total size in bytes from a list of WALs */ - private long getTotalWALSize(FileSystem fs, List wals, ServerName server) - throws IOException { + private long getTotalWALSize(FileSystem fs, List wals, ServerName server) { long size = 0; FileStatus fileStatus; @@ -389,19 +474,4 @@ private long getTotalWALSize(FileSystem fs, List wals, ServerName server totalSizeOfWALs += size; return size; } - - private static class WarnOnlyAbortable implements Abortable { - @Override - public void abort(String why, Throwable e) { - LOG.warn("DumpReplicationQueue received abort, ignoring. Reason: " + why); - if (LOG.isDebugEnabled()) { - LOG.debug(e.toString(), e); - } - } - - @Override - public boolean isAborted() { - return false; - } - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index 480866949993..5bbc66791967 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -379,6 +379,26 @@ public static List getArchivedWALFiles(Configuration conf, ServerName serv return archivedWalFiles; } + /** + * List all the wal files for a logPrefix. + */ + public static List getWALFiles(Configuration c, ServerName serverName) throws IOException { + Path walRoot = new Path(CommonFSUtils.getWALRootDir(c), HConstants.HREGION_LOGDIR_NAME); + FileSystem fs = walRoot.getFileSystem(c); + List walFiles = new ArrayList<>(); + Path walDir = new Path(walRoot, serverName.toString()); + try { + for (FileStatus status : fs.listStatus(walDir)) { + if (status.isFile()) { + walFiles.add(status.getPath()); + } + } + } catch (FileNotFoundException e) { + LOG.info("WAL dir {} not exists", walDir); + } + return walFiles; + } + /** * Pulls a ServerName out of a Path generated according to our layout rules. In the below layouts, * this method ignores the format of the logfile component. Current format: [base directory for diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDumpReplicationQueues.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDumpReplicationQueues.java index 3475ae5c1925..3e1dc624fe7d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDumpReplicationQueues.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDumpReplicationQueues.java @@ -17,34 +17,43 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; +import org.apache.hadoop.hbase.replication.ReplicationQueueId; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.After; +import org.junit.Before; import org.junit.ClassRule; -import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; /** * Tests for DumpReplicationQueues tool */ -// TODO: reimplement -@Ignore @Category({ ReplicationTests.class, SmallTests.class }) public class TestDumpReplicationQueues { @@ -52,49 +61,99 @@ public class TestDumpReplicationQueues { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestDumpReplicationQueues.class); - /** - * Makes sure dumpQueues returns wals znodes ordered chronologically. - * @throws Exception if dumpqueues finds any error while handling list of znodes. - */ + private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); + private static Configuration CONF; + private static FileSystem FS = null; + private Path root; + private Path logDir; + @Rule + public final TestName name = new TestName(); + + @Before + public void setup() throws Exception { + UTIL.startMiniCluster(3); + CONF = UTIL.getConfiguration(); + TableName tableName = TableName.valueOf("replication_" + name.getMethodName()); + UTIL.getAdmin() + .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName)); + CONF.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString()); + FS = FileSystem.get(CONF); + root = UTIL.getDataTestDirOnTestFS("hbase"); + logDir = new Path(root, HConstants.HREGION_LOGDIR_NAME); + FS.mkdirs(logDir); + CommonFSUtils.setRootDir(CONF, root); + CommonFSUtils.setWALRootDir(CONF, root); + } + @Test - public void testDumpReplicationReturnsWalSorted() throws Exception { - Configuration config = HBaseConfiguration.create(); - ZKWatcher zkWatcherMock = mock(ZKWatcher.class); - ZNodePaths zNodePath = new ZNodePaths(config); - RecoverableZooKeeper recoverableZooKeeperMock = mock(RecoverableZooKeeper.class); - when(zkWatcherMock.getRecoverableZooKeeper()).thenReturn(recoverableZooKeeperMock); - when(zkWatcherMock.getZNodePaths()).thenReturn(zNodePath); - List nodes = new ArrayList<>(); - String server = "rs1,60030," + EnvironmentEdgeManager.currentTime(); - nodes.add(server); - when(recoverableZooKeeperMock.getChildren("/hbase/rs", null)).thenReturn(nodes); - when(recoverableZooKeeperMock.getChildren("/hbase/replication/rs", null)).thenReturn(nodes); - List queuesIds = new ArrayList<>(); - queuesIds.add("1"); - when(recoverableZooKeeperMock.getChildren("/hbase/replication/rs/" + server, null)) - .thenReturn(queuesIds); - List wals = new ArrayList<>(); - wals.add("rs1%2C60964%2C1549394085556.1549394101427"); - wals.add("rs1%2C60964%2C1549394085556.1549394101426"); - wals.add("rs1%2C60964%2C1549394085556.1549394101428"); - when(recoverableZooKeeperMock.getChildren("/hbase/replication/rs/" + server + "/1", null)) - .thenReturn(wals); + public void testDumpReplication() throws Exception { + String peerId = "1"; + String serverNameStr = "rs1,12345,123"; + addPeer(peerId, "hbase"); + ServerName serverName = ServerName.valueOf(serverNameStr); + String walName = "rs1%2C12345%2C123.10"; + Path walPath = new Path(logDir, serverNameStr + "/" + walName); + FS.createNewFile(walPath); + + ReplicationQueueId queueId = new ReplicationQueueId(serverName, peerId); + ReplicationQueueStorage queueStorage = + ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getConnection(), CONF); + queueStorage.setOffset(queueId, "wal-group", + new ReplicationGroupOffset(FS.listStatus(walPath)[0].getPath().toString(), 123), + Collections.emptyMap()); + DumpReplicationQueues dumpQueues = new DumpReplicationQueues(); Set peerIds = new HashSet<>(); - peerIds.add("1"); - dumpQueues.setConf(config); - String dump = dumpQueues.dumpQueues(zkWatcherMock, peerIds, false); + peerIds.add(peerId); + List wals = new ArrayList<>(); + wals.add("rs1%2C12345%2C123.12"); + wals.add("rs1%2C12345%2C123.15"); + wals.add("rs1%2C12345%2C123.11"); + for (String wal : wals) { + Path wPath = new Path(logDir, serverNameStr + "/" + wal); + FS.createNewFile(wPath); + } + + String dump = dumpQueues.dumpQueues(UTIL.getConnection(), peerIds, false, CONF); + assertTrue(dump.indexOf("Queue id: 1-rs1,12345,123") > 0); + assertTrue(dump.indexOf("Number of WALs in replication queue: 4") > 0); + // test for 'Returns wal sorted' String[] parsedDump = dump.split("Replication position for"); - assertEquals("Parsed dump should have 4 parts.", 4, parsedDump.length); - assertTrue( - "First wal should be rs1%2C60964%2C1549394085556.1549394101426, but got: " + parsedDump[1], - parsedDump[1].indexOf("rs1%2C60964%2C1549394085556.1549394101426") >= 0); - assertTrue( - "Second wal should be rs1%2C60964%2C1549394085556.1549394101427, but got: " + parsedDump[2], - parsedDump[2].indexOf("rs1%2C60964%2C1549394085556.1549394101427") >= 0); - assertTrue( - "Third wal should be rs1%2C60964%2C1549394085556.1549394101428, but got: " + parsedDump[3], - parsedDump[3].indexOf("rs1%2C60964%2C1549394085556.1549394101428") >= 0); + assertTrue("First wal should be rs1%2C12345%2C123.10: 123, but got: " + parsedDump[1], + parsedDump[1].indexOf("rs1%2C12345%2C123.10: 123") >= 0); + assertTrue("Second wal should be rs1%2C12345%2C123.11: 0, but got: " + parsedDump[2], + parsedDump[2].indexOf("rs1%2C12345%2C123.11: 0 (not started or nothing to replicate)") >= 0); + assertTrue("Third wal should be rs1%2C12345%2C123.12: 0, but got: " + parsedDump[3], + parsedDump[3].indexOf("rs1%2C12345%2C123.12: 0 (not started or nothing to replicate)") >= 0); + assertTrue("Fourth wal should be rs1%2C12345%2C123.15: 0, but got: " + parsedDump[4], + parsedDump[4].indexOf("rs1%2C12345%2C123.15: 0 (not started or nothing to replicate)") >= 0); + + Path file1 = new Path("testHFile1"); + Path file2 = new Path("testHFile2"); + List> files = new ArrayList<>(1); + files.add(new Pair<>(null, file1)); + files.add(new Pair<>(null, file2)); + queueStorage.addHFileRefs(peerId, files); + // test for 'Dump Replication via replication table' + String dump2 = dumpQueues.dumpReplicationViaTable(UTIL.getConnection(), CONF); + assertTrue(dump2.indexOf("peers/1/peer-state: ENABLED") > 0); + assertTrue(dump2.indexOf("rs1,12345,123/rs1%2C12345%2C123.10: 123") >= 0); + assertTrue(dump2.indexOf("hfile-refs/1/testHFile1,testHFile2") >= 0); + } + + /** + * Add a peer + */ + private void addPeer(String peerId, String clusterKey) throws IOException { + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder() + .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + clusterKey) + .setReplicationEndpointImpl( + TestReplicationSourceManager.ReplicationEndpointForTest.class.getName()); + UTIL.getAdmin().addReplicationPeer(peerId, builder.build(), true); } + @After + public void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } }