diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
index 98d0a55fbc43..b284e3f6837f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -19,8 +19,12 @@
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -31,7 +35,7 @@
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
@@ -40,28 +44,33 @@
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.io.WALLink;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
-import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.ReplicationQueueData;
+import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.zookeeper.ZKDump;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicLongMap;
/**
- * TODO: reimplement this tool
*
* Provides information about the existing states of replication, replication peers and queues.
* Usage: hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args]
* Arguments: --distributed Polls each RS to dump information about the queue --hdfs Reports HDFS
- * usage by the replication queues (note: can be overestimated).
+ * usage by the replication queues (note: can be overestimated). In the new version, we
+ * reimplemented the DumpReplicationQueues tool to support obtaining information from replication
+ * table.
*/
@InterfaceAudience.Private
public class DumpReplicationQueues extends Configured implements Tool {
@@ -185,7 +194,7 @@ protected static void printUsage(final String className, final String message) {
System.err.println("General Options:");
System.err.println(" -h|--h|--help Show this help and exit.");
System.err.println(" --distributed Poll each RS and print its own replication queue. "
- + "Default only polls ZooKeeper");
+ + "Default only polls replication table.");
System.err.println(" --hdfs Use HDFS to calculate usage of WALs by replication."
+ " It could be overestimated if replicating to multiple peers."
+ " --distributed flag is also needed.");
@@ -201,13 +210,7 @@ private int dumpReplicationQueues(DumpOptions opts) throws Exception {
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
- ZKWatcher zkw =
- new ZKWatcher(conf, "DumpReplicationQueues" + EnvironmentEdgeManager.currentTime(),
- new WarnOnlyAbortable(), true);
-
try {
- // Our zk watcher
- LOG.info("Our Quorum: " + zkw.getQuorum());
List replicatedTableCFs = admin.listReplicatedTableCFs();
if (replicatedTableCFs.isEmpty()) {
LOG.info("No tables with a configured replication peer were found.");
@@ -229,21 +232,72 @@ private int dumpReplicationQueues(DumpOptions opts) throws Exception {
LOG.info("Found [--distributed], will poll each RegionServer.");
Set peerIds =
peers.stream().map((peer) -> peer.getPeerId()).collect(Collectors.toSet());
- System.out.println(dumpQueues(zkw, peerIds, opts.isHdfs()));
+ System.out.println(dumpQueues(connection, peerIds, opts.isHdfs(), conf));
System.out.println(dumpReplicationSummary());
} else {
- // use ZK instead
- System.out.print("Dumping replication znodes via ZooKeeper:");
- System.out.println(ZKDump.getReplicationZnodesDump(zkw));
+ // use replication table instead
+ System.out.println("Dumping replication info via replication table.");
+ System.out.println(dumpReplicationViaTable(connection, conf));
}
return (0);
} catch (IOException e) {
return (-1);
} finally {
- zkw.close();
+ connection.close();
}
}
+ public String dumpReplicationViaTable(Connection connection, Configuration conf)
+ throws ReplicationException, IOException {
+ StringBuilder sb = new StringBuilder();
+ ReplicationQueueStorage queueStorage =
+ ReplicationStorageFactory.getReplicationQueueStorage(connection, conf);
+
+ // The dump info format is as follows:
+ // peers:
+ // peers/1: zk1:2181:/hbase
+ // peers/1/peer-state: ENABLED
+ // rs:
+ // rs/rs1,16020,1664092120094/1/rs1%2C16020%2C1664092120094.1664096778778: 123
+ // rs/rs2,16020,1664092120094/2/rs1%2C16020%2C1664092120094.1664096778778: 321
+ // hfile-refs:
+ // hfile-refs/1/hfile1,hfile2
+ // hfile-refs/2/hfile3,hfile4
+ String peersKey = "peers";
+ sb.append(peersKey).append(": ").append("\n");
+ List repPeerDescs = connection.getAdmin().listReplicationPeers();
+ for (ReplicationPeerDescription repPeerDesc : repPeerDescs) {
+ sb.append(peersKey).append("/").append(repPeerDesc.getPeerId()).append(": ")
+ .append(repPeerDesc.getPeerConfig().getClusterKey()).append("\n");
+ sb.append(peersKey).append("/").append(repPeerDesc.getPeerId()).append("/peer-state: ")
+ .append(repPeerDesc.isEnabled() ? "ENABLED" : "DISABLED").append("\n");
+ }
+
+ List repQueueDataList = queueStorage.listAllQueues();
+ String rsKey = "rs";
+ sb.append(rsKey).append(": ").append("\n");
+ for (ReplicationQueueData repQueueData : repQueueDataList) {
+ String peerId = repQueueData.getId().getPeerId();
+ for (ImmutableMap.Entry entry : repQueueData.getOffsets()
+ .entrySet()) {
+ sb.append(rsKey).append("/").append(entry.getKey()).append("/").append(peerId).append("/")
+ .append(entry.getValue().getWal()).append(": ").append(entry.getValue().getOffset())
+ .append("\n");
+ }
+ }
+
+ List peerIds = queueStorage.getAllPeersFromHFileRefsQueue();
+ String hfileKey = "hfile-refs";
+ sb.append(hfileKey).append(": ").append("\n");
+ for (String peerId : peerIds) {
+ List hfiles = queueStorage.getReplicableHFiles(peerId);
+ sb.append(hfileKey).append("/").append(peerId).append("/").append(String.join(",", hfiles))
+ .append("\n");
+ }
+
+ return sb.toString();
+ }
+
public String dumpReplicationSummary() {
StringBuilder sb = new StringBuilder();
if (!deletedQueues.isEmpty()) {
@@ -294,71 +348,103 @@ public String dumpPeersState(List peers) throws Exce
return sb.toString();
}
- public String dumpQueues(ZKWatcher zkw, Set peerIds, boolean hdfs) throws Exception {
- ReplicationQueueStorage queueStorage;
+ public String dumpQueues(Connection connection, Set peerIds, boolean hdfs,
+ Configuration conf) throws Exception {
StringBuilder sb = new StringBuilder();
+ ReplicationQueueStorage queueStorage =
+ ReplicationStorageFactory.getReplicationQueueStorage(connection, conf);
+
+ Set liveRegionServers =
+ connection.getAdmin().getClusterMetrics().getLiveServerMetrics().keySet();
- // queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
- // Set liveRegionServers = ZKUtil.listChildrenNoWatch(zkw,
- // zkw.getZNodePaths().rsZNode)
- // .stream().map(ServerName::parseServerName).collect(Collectors.toSet());
- //
- // Loops each peer on each RS and dumps the queues
- // List regionservers = queueStorage.getListOfReplicators();
- // if (regionservers == null || regionservers.isEmpty()) {
- // return sb.toString();
- // }
- // for (ServerName regionserver : regionservers) {
- // List queueIds = queueStorage.getAllQueues(regionserver);
- // if (!liveRegionServers.contains(regionserver)) {
- // deadRegionServers.add(regionserver.getServerName());
- // }
- // for (String queueId : queueIds) {
- // ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
- // List wals = queueStorage.getWALsInQueue(regionserver, queueId);
- // Collections.sort(wals);
- // if (!peerIds.contains(queueInfo.getPeerId())) {
- // deletedQueues.add(regionserver + "/" + queueId);
- // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs));
- // } else {
- // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs));
- // }
- // }
- // }
+ List regionServers = queueStorage.listAllReplicators();
+ if (regionServers == null || regionServers.isEmpty()) {
+ return sb.toString();
+ }
+ for (ServerName regionServer : regionServers) {
+ List queueIds = queueStorage.listAllQueueIds(regionServer);
+
+ if (!liveRegionServers.contains(regionServer)) {
+ deadRegionServers.add(regionServer.getServerName());
+ }
+ for (ReplicationQueueId queueId : queueIds) {
+ List tmpWals = new ArrayList<>();
+ // wals
+ AbstractFSWALProvider
+ .getWALFiles(connection.getConfiguration(), queueId.getServerWALsBelongTo()).stream()
+ .map(Path::toString).forEach(tmpWals::add);
+
+ // old wals
+ AbstractFSWALProvider.getArchivedWALFiles(connection.getConfiguration(),
+ queueId.getServerWALsBelongTo(), URLEncoder
+ .encode(queueId.getServerWALsBelongTo().toString(), StandardCharsets.UTF_8.name()))
+ .stream().map(Path::toString).forEach(tmpWals::add);
+
+ Map offsets = queueStorage.getOffsets(queueId);
+ // filter out the wal files that should replicate
+ List wals = new ArrayList<>();
+ for (Map.Entry entry : offsets.entrySet()) {
+ ReplicationGroupOffset offset = entry.getValue();
+ for (String wal : tmpWals) {
+ if (ReplicationOffsetUtil.shouldReplicate(offset, wal)) {
+ wals.add(wal);
+ }
+ }
+ }
+ Collections.sort(wals, Comparator.comparingLong(AbstractFSWALProvider::getTimestamp));
+ if (!peerIds.contains(queueId.getPeerId())) {
+ deletedQueues.add(regionServer + "/" + queueId);
+ sb.append(formatQueue(regionServer, offsets, wals, queueId, true, hdfs));
+ } else {
+ sb.append(formatQueue(regionServer, offsets, wals, queueId, false, hdfs));
+ }
+ }
+ }
return sb.toString();
}
- private String formatQueue(ServerName regionserver, ReplicationQueueStorage queueStorage,
- ReplicationQueueInfo queueInfo, String queueId, List wals, boolean isDeleted,
- boolean hdfs) throws Exception {
+ private String formatQueue(ServerName regionServer, Map offsets,
+ List wals, ReplicationQueueId queueId, boolean isDeleted, boolean hdfs)
+ throws Exception {
StringBuilder sb = new StringBuilder();
- List deadServers;
-
- sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n");
- sb.append(" Queue znode: " + queueId + "\n");
- sb.append(" PeerID: " + queueInfo.getPeerId() + "\n");
- sb.append(" Recovered: " + queueInfo.isQueueRecovered() + "\n");
- deadServers = queueInfo.getDeadRegionServers();
- if (deadServers.isEmpty()) {
- sb.append(" No dead RegionServers found in this queue." + "\n");
+ sb.append("Dumping replication queue info for RegionServer: [" + regionServer + "]" + "\n");
+ sb.append(" Queue id: " + queueId + "\n");
+ sb.append(" PeerID: " + queueId.getPeerId() + "\n");
+ sb.append(" Recovered: " + queueId.isRecovered() + "\n");
+ // In new version, we only record the first dead RegionServer in queueId.
+ if (queueId.getSourceServerName().isPresent()) {
+ sb.append(" Dead RegionServer: " + queueId.getSourceServerName().get() + "\n");
} else {
- sb.append(" Dead RegionServers: " + deadServers + "\n");
+ sb.append(" No dead RegionServer found in this queue." + "\n");
}
sb.append(" Was deleted: " + isDeleted + "\n");
sb.append(" Number of WALs in replication queue: " + wals.size() + "\n");
- peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size());
-
- for (String wal : wals) {
- // long position = queueStorage.getWALPosition(regionserver, queueInfo.getPeerId(), wal);
- // sb.append(" Replication position for " + wal + ": "
- // + (position > 0 ? position : "0" + " (not started or nothing to replicate)") + "\n");
+ peersQueueSize.addAndGet(queueId.getPeerId(), wals.size());
+
+ for (Map.Entry entry : offsets.entrySet()) {
+ String walGroup = entry.getKey();
+ ReplicationGroupOffset offset = entry.getValue();
+ for (String wal : wals) {
+ long position = 0;
+ if (offset.getWal().equals(wal)) {
+ position = offset.getOffset();
+ }
+ sb.append(
+ " Replication position for " + (walGroup != null ? walGroup + "/" + wal : wal) + ": ");
+ if (position == 0) {
+ sb.append("0 (not started or nothing to replicate)");
+ } else if (position > 0) {
+ sb.append(position);
+ }
+ sb.append("\n");
+ }
}
if (hdfs) {
FileSystem fs = FileSystem.get(getConf());
sb.append(" Total size of WALs on HDFS for this queue: "
- + StringUtils.humanSize(getTotalWALSize(fs, wals, regionserver)) + "\n");
+ + StringUtils.humanSize(getTotalWALSize(fs, wals, regionServer)) + "\n");
}
return sb.toString();
}
@@ -366,8 +452,7 @@ private String formatQueue(ServerName regionserver, ReplicationQueueStorage queu
/**
* return total size in bytes from a list of WALs
*/
- private long getTotalWALSize(FileSystem fs, List wals, ServerName server)
- throws IOException {
+ private long getTotalWALSize(FileSystem fs, List wals, ServerName server) {
long size = 0;
FileStatus fileStatus;
@@ -389,19 +474,4 @@ private long getTotalWALSize(FileSystem fs, List wals, ServerName server
totalSizeOfWALs += size;
return size;
}
-
- private static class WarnOnlyAbortable implements Abortable {
- @Override
- public void abort(String why, Throwable e) {
- LOG.warn("DumpReplicationQueue received abort, ignoring. Reason: " + why);
- if (LOG.isDebugEnabled()) {
- LOG.debug(e.toString(), e);
- }
- }
-
- @Override
- public boolean isAborted() {
- return false;
- }
- }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 480866949993..5bbc66791967 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -379,6 +379,26 @@ public static List getArchivedWALFiles(Configuration conf, ServerName serv
return archivedWalFiles;
}
+ /**
+ * List all the wal files for a logPrefix.
+ */
+ public static List getWALFiles(Configuration c, ServerName serverName) throws IOException {
+ Path walRoot = new Path(CommonFSUtils.getWALRootDir(c), HConstants.HREGION_LOGDIR_NAME);
+ FileSystem fs = walRoot.getFileSystem(c);
+ List walFiles = new ArrayList<>();
+ Path walDir = new Path(walRoot, serverName.toString());
+ try {
+ for (FileStatus status : fs.listStatus(walDir)) {
+ if (status.isFile()) {
+ walFiles.add(status.getPath());
+ }
+ }
+ } catch (FileNotFoundException e) {
+ LOG.info("WAL dir {} not exists", walDir);
+ }
+ return walFiles;
+ }
+
/**
* Pulls a ServerName out of a Path generated according to our layout rules. In the below layouts,
* this method ignores the format of the logfile component. Current format: [base directory for
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDumpReplicationQueues.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDumpReplicationQueues.java
index 3475ae5c1925..3e1dc624fe7d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDumpReplicationQueues.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestDumpReplicationQueues.java
@@ -17,34 +17,43 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
+import org.apache.hadoop.hbase.replication.ReplicationQueueId;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.After;
+import org.junit.Before;
import org.junit.ClassRule;
-import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
/**
* Tests for DumpReplicationQueues tool
*/
-// TODO: reimplement
-@Ignore
@Category({ ReplicationTests.class, SmallTests.class })
public class TestDumpReplicationQueues {
@@ -52,49 +61,99 @@ public class TestDumpReplicationQueues {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestDumpReplicationQueues.class);
- /**
- * Makes sure dumpQueues returns wals znodes ordered chronologically.
- * @throws Exception if dumpqueues finds any error while handling list of znodes.
- */
+ private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+ private static Configuration CONF;
+ private static FileSystem FS = null;
+ private Path root;
+ private Path logDir;
+ @Rule
+ public final TestName name = new TestName();
+
+ @Before
+ public void setup() throws Exception {
+ UTIL.startMiniCluster(3);
+ CONF = UTIL.getConfiguration();
+ TableName tableName = TableName.valueOf("replication_" + name.getMethodName());
+ UTIL.getAdmin()
+ .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName));
+ CONF.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString());
+ FS = FileSystem.get(CONF);
+ root = UTIL.getDataTestDirOnTestFS("hbase");
+ logDir = new Path(root, HConstants.HREGION_LOGDIR_NAME);
+ FS.mkdirs(logDir);
+ CommonFSUtils.setRootDir(CONF, root);
+ CommonFSUtils.setWALRootDir(CONF, root);
+ }
+
@Test
- public void testDumpReplicationReturnsWalSorted() throws Exception {
- Configuration config = HBaseConfiguration.create();
- ZKWatcher zkWatcherMock = mock(ZKWatcher.class);
- ZNodePaths zNodePath = new ZNodePaths(config);
- RecoverableZooKeeper recoverableZooKeeperMock = mock(RecoverableZooKeeper.class);
- when(zkWatcherMock.getRecoverableZooKeeper()).thenReturn(recoverableZooKeeperMock);
- when(zkWatcherMock.getZNodePaths()).thenReturn(zNodePath);
- List nodes = new ArrayList<>();
- String server = "rs1,60030," + EnvironmentEdgeManager.currentTime();
- nodes.add(server);
- when(recoverableZooKeeperMock.getChildren("/hbase/rs", null)).thenReturn(nodes);
- when(recoverableZooKeeperMock.getChildren("/hbase/replication/rs", null)).thenReturn(nodes);
- List queuesIds = new ArrayList<>();
- queuesIds.add("1");
- when(recoverableZooKeeperMock.getChildren("/hbase/replication/rs/" + server, null))
- .thenReturn(queuesIds);
- List wals = new ArrayList<>();
- wals.add("rs1%2C60964%2C1549394085556.1549394101427");
- wals.add("rs1%2C60964%2C1549394085556.1549394101426");
- wals.add("rs1%2C60964%2C1549394085556.1549394101428");
- when(recoverableZooKeeperMock.getChildren("/hbase/replication/rs/" + server + "/1", null))
- .thenReturn(wals);
+ public void testDumpReplication() throws Exception {
+ String peerId = "1";
+ String serverNameStr = "rs1,12345,123";
+ addPeer(peerId, "hbase");
+ ServerName serverName = ServerName.valueOf(serverNameStr);
+ String walName = "rs1%2C12345%2C123.10";
+ Path walPath = new Path(logDir, serverNameStr + "/" + walName);
+ FS.createNewFile(walPath);
+
+ ReplicationQueueId queueId = new ReplicationQueueId(serverName, peerId);
+ ReplicationQueueStorage queueStorage =
+ ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getConnection(), CONF);
+ queueStorage.setOffset(queueId, "wal-group",
+ new ReplicationGroupOffset(FS.listStatus(walPath)[0].getPath().toString(), 123),
+ Collections.emptyMap());
+
DumpReplicationQueues dumpQueues = new DumpReplicationQueues();
Set peerIds = new HashSet<>();
- peerIds.add("1");
- dumpQueues.setConf(config);
- String dump = dumpQueues.dumpQueues(zkWatcherMock, peerIds, false);
+ peerIds.add(peerId);
+ List wals = new ArrayList<>();
+ wals.add("rs1%2C12345%2C123.12");
+ wals.add("rs1%2C12345%2C123.15");
+ wals.add("rs1%2C12345%2C123.11");
+ for (String wal : wals) {
+ Path wPath = new Path(logDir, serverNameStr + "/" + wal);
+ FS.createNewFile(wPath);
+ }
+
+ String dump = dumpQueues.dumpQueues(UTIL.getConnection(), peerIds, false, CONF);
+ assertTrue(dump.indexOf("Queue id: 1-rs1,12345,123") > 0);
+ assertTrue(dump.indexOf("Number of WALs in replication queue: 4") > 0);
+ // test for 'Returns wal sorted'
String[] parsedDump = dump.split("Replication position for");
- assertEquals("Parsed dump should have 4 parts.", 4, parsedDump.length);
- assertTrue(
- "First wal should be rs1%2C60964%2C1549394085556.1549394101426, but got: " + parsedDump[1],
- parsedDump[1].indexOf("rs1%2C60964%2C1549394085556.1549394101426") >= 0);
- assertTrue(
- "Second wal should be rs1%2C60964%2C1549394085556.1549394101427, but got: " + parsedDump[2],
- parsedDump[2].indexOf("rs1%2C60964%2C1549394085556.1549394101427") >= 0);
- assertTrue(
- "Third wal should be rs1%2C60964%2C1549394085556.1549394101428, but got: " + parsedDump[3],
- parsedDump[3].indexOf("rs1%2C60964%2C1549394085556.1549394101428") >= 0);
+ assertTrue("First wal should be rs1%2C12345%2C123.10: 123, but got: " + parsedDump[1],
+ parsedDump[1].indexOf("rs1%2C12345%2C123.10: 123") >= 0);
+ assertTrue("Second wal should be rs1%2C12345%2C123.11: 0, but got: " + parsedDump[2],
+ parsedDump[2].indexOf("rs1%2C12345%2C123.11: 0 (not started or nothing to replicate)") >= 0);
+ assertTrue("Third wal should be rs1%2C12345%2C123.12: 0, but got: " + parsedDump[3],
+ parsedDump[3].indexOf("rs1%2C12345%2C123.12: 0 (not started or nothing to replicate)") >= 0);
+ assertTrue("Fourth wal should be rs1%2C12345%2C123.15: 0, but got: " + parsedDump[4],
+ parsedDump[4].indexOf("rs1%2C12345%2C123.15: 0 (not started or nothing to replicate)") >= 0);
+
+ Path file1 = new Path("testHFile1");
+ Path file2 = new Path("testHFile2");
+ List> files = new ArrayList<>(1);
+ files.add(new Pair<>(null, file1));
+ files.add(new Pair<>(null, file2));
+ queueStorage.addHFileRefs(peerId, files);
+ // test for 'Dump Replication via replication table'
+ String dump2 = dumpQueues.dumpReplicationViaTable(UTIL.getConnection(), CONF);
+ assertTrue(dump2.indexOf("peers/1/peer-state: ENABLED") > 0);
+ assertTrue(dump2.indexOf("rs1,12345,123/rs1%2C12345%2C123.10: 123") >= 0);
+ assertTrue(dump2.indexOf("hfile-refs/1/testHFile1,testHFile2") >= 0);
+ }
+
+ /**
+ * Add a peer
+ */
+ private void addPeer(String peerId, String clusterKey) throws IOException {
+ ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
+ .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + clusterKey)
+ .setReplicationEndpointImpl(
+ TestReplicationSourceManager.ReplicationEndpointForTest.class.getName());
+ UTIL.getAdmin().addReplicationPeer(peerId, builder.build(), true);
}
+ @After
+ public void tearDown() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
}