diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java index d19d21004667..3f66c7cdc0c2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java @@ -220,7 +220,11 @@ public String getRsPath(ServerName sn) { * @param suffix ending of znode name * @return result of properly joining prefix with suffix */ - public static String joinZNode(String prefix, String suffix) { - return prefix + ZNodePaths.ZNODE_PATH_SEPARATOR + suffix; + public static String joinZNode(String prefix, String... suffix) { + StringBuilder sb = new StringBuilder(prefix); + for (String s : suffix) { + sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(s); + } + return sb.toString(); } } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index 34c74d92c161..43adba2bc21a 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.metrics.Counter; import org.apache.hadoop.hbase.metrics.Histogram; @@ -33,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; /** @@ -1011,6 +1013,19 @@ final void doReleaseLock(TEnvironment env, ProcedureStore store) { releaseLock(env); } + protected final ProcedureSuspendedException suspend(int timeoutMillis, boolean jitter) + throws ProcedureSuspendedException { + if (jitter) { + // 10% possible jitter + double add = (double) timeoutMillis * ThreadLocalRandom.current().nextDouble(0.1); + timeoutMillis += add; + } + setTimeout(timeoutMillis); + setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); + skipPersistence(); + throw new ProcedureSuspendedException(); + } + @Override public int compareTo(final Procedure other) { return Long.compare(getProcId(), other.getProcId()); diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto index 76a1d676487a..b6f5d7e50bb0 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto @@ -722,3 +722,15 @@ enum AssignReplicationQueuesState { message AssignReplicationQueuesStateData { required ServerName crashed_server = 1; } + +enum MigrateReplicationQueueFromZkToTableState { + MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE = 1; + MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER = 2; + MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE = 3; + MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING = 4; + MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER = 5; +} + +message MigrateReplicationQueueFromZkToTableStateData { + repeated string disabled_peer_id = 1; +} diff --git a/hbase-replication/pom.xml b/hbase-replication/pom.xml index 9acab39599fa..b4f1cfa224da 100644 --- a/hbase-replication/pom.xml +++ b/hbase-replication/pom.xml @@ -104,6 +104,16 @@ junit test + + org.hamcrest + hamcrest-core + test + + + org.hamcrest + hamcrest-library + test + org.mockito mockito-core diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java index 6f6aee38cc8f..1e36bbeb78f0 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java @@ -22,6 +22,7 @@ import java.util.Set; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; @@ -184,4 +185,22 @@ void removeLastSequenceIds(String peerId, List encodedRegionNames) * @return Whether the replication queue table exists */ boolean hasData() throws ReplicationException; + + // the below 3 methods are used for migrating + /** + * Update the replication queue datas for a given region server. + */ + void batchUpdateQueues(ServerName serverName, List datas) + throws ReplicationException; + + /** + * Update last pushed sequence id for the given regions and peers. + */ + void batchUpdateLastSequenceIds(List lastPushedSeqIds) + throws ReplicationException; + + /** + * Add the given hfile refs to the given peer. + */ + void batchUpdateHFileRefs(String peerId, List hfileRefs) throws ReplicationException; } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java index 392a3692d66f..f3870f4d09d8 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java @@ -21,12 +21,14 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Set; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; @@ -46,6 +48,7 @@ import org.apache.hadoop.hbase.client.Scan.ReadType; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.KeyOnlyFilter; +import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.Pair; @@ -74,12 +77,6 @@ public class TableReplicationQueueStorage implements ReplicationQueueStorage { private final TableName tableName; - @FunctionalInterface - private interface TableCreator { - - void create() throws IOException; - } - public TableReplicationQueueStorage(Connection conn, TableName tableName) { this.conn = conn; this.tableName = tableName; @@ -541,4 +538,60 @@ public boolean hasData() throws ReplicationException { throw new ReplicationException("failed to get replication queue table", e); } } + + @Override + public void batchUpdateQueues(ServerName serverName, List datas) + throws ReplicationException { + List puts = new ArrayList<>(); + for (ReplicationQueueData data : datas) { + if (data.getOffsets().isEmpty()) { + continue; + } + Put put = new Put(Bytes.toBytes(data.getId().toString())); + data.getOffsets().forEach((walGroup, offset) -> { + put.addColumn(QUEUE_FAMILY, Bytes.toBytes(walGroup), Bytes.toBytes(offset.toString())); + }); + puts.add(put); + } + try (Table table = conn.getTable(tableName)) { + table.put(puts); + } catch (IOException e) { + throw new ReplicationException("failed to batch update queues", e); + } + } + + @Override + public void batchUpdateLastSequenceIds(List lastPushedSeqIds) + throws ReplicationException { + Map peerId2Put = new HashMap<>(); + for (ZkLastPushedSeqId lastPushedSeqId : lastPushedSeqIds) { + peerId2Put + .computeIfAbsent(lastPushedSeqId.getPeerId(), peerId -> new Put(Bytes.toBytes(peerId))) + .addColumn(LAST_SEQUENCE_ID_FAMILY, Bytes.toBytes(lastPushedSeqId.getEncodedRegionName()), + Bytes.toBytes(lastPushedSeqId.getLastPushedSeqId())); + } + try (Table table = conn.getTable(tableName)) { + table + .put(peerId2Put.values().stream().filter(p -> !p.isEmpty()).collect(Collectors.toList())); + } catch (IOException e) { + throw new ReplicationException("failed to batch update last pushed sequence ids", e); + } + } + + @Override + public void batchUpdateHFileRefs(String peerId, List hfileRefs) + throws ReplicationException { + if (hfileRefs.isEmpty()) { + return; + } + Put put = new Put(Bytes.toBytes(peerId)); + for (String ref : hfileRefs) { + put.addColumn(HFILE_REF_FAMILY, Bytes.toBytes(ref), HConstants.EMPTY_BYTE_ARRAY); + } + try (Table table = conn.getTable(tableName)) { + table.put(put); + } catch (IOException e) { + throw new ReplicationException("failed to batch update hfile references", e); + } + } } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorageForMigration.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorageForMigration.java new file mode 100644 index 000000000000..22cc13145225 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorageForMigration.java @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import com.google.errorprone.annotations.RestrictedApi; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; + +import org.apache.hbase.thirdparty.com.google.common.base.Splitter; + +/** + * Just retain a small set of the methods for the old zookeeper based replication queue storage, for + * migrating. + */ +@InterfaceAudience.Private +public class ZKReplicationQueueStorageForMigration extends ZKReplicationStorageBase { + + public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY = + "zookeeper.znode.replication.hfile.refs"; + public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs"; + + public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY = + "zookeeper.znode.replication.regions"; + public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT = "regions"; + + /** + * The name of the znode that contains all replication queues + */ + private final String queuesZNode; + + /** + * The name of the znode that contains queues of hfile references to be replicated + */ + private final String hfileRefsZNode; + + private final String regionsZNode; + + public ZKReplicationQueueStorageForMigration(ZKWatcher zookeeper, Configuration conf) { + super(zookeeper, conf); + String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs"); + String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY, + ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT); + this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName); + this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName); + this.regionsZNode = ZNodePaths.joinZNode(replicationZNode, conf + .get(ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY, ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT)); + } + + public interface MigrationIterator { + + T next() throws Exception; + } + + @SuppressWarnings("rawtypes") + private static final MigrationIterator EMPTY_ITER = new MigrationIterator() { + + @Override + public Object next() { + return null; + } + }; + + public static final class ZkReplicationQueueData { + + private final ReplicationQueueId queueId; + + private final Map walOffsets; + + public ZkReplicationQueueData(ReplicationQueueId queueId, Map walOffsets) { + this.queueId = queueId; + this.walOffsets = walOffsets; + } + + public ReplicationQueueId getQueueId() { + return queueId; + } + + public Map getWalOffsets() { + return walOffsets; + } + } + + private String getRsNode(ServerName serverName) { + return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName()); + } + + private String getQueueNode(ServerName serverName, String queueId) { + return ZNodePaths.joinZNode(getRsNode(serverName), queueId); + } + + private String getFileNode(String queueNode, String fileName) { + return ZNodePaths.joinZNode(queueNode, fileName); + } + + private String getFileNode(ServerName serverName, String queueId, String fileName) { + return getFileNode(getQueueNode(serverName, queueId), fileName); + } + + @SuppressWarnings("unchecked") + public MigrationIterator>> listAllQueues() + throws KeeperException { + List replicators = ZKUtil.listChildrenNoWatch(zookeeper, queuesZNode); + if (replicators == null || replicators.isEmpty()) { + ZKUtil.deleteNodeRecursively(zookeeper, queuesZNode); + return EMPTY_ITER; + } + Iterator iter = replicators.iterator(); + return new MigrationIterator>>() { + + private ServerName previousServerName; + + @Override + public Pair> next() throws Exception { + if (previousServerName != null) { + ZKUtil.deleteNodeRecursively(zookeeper, getRsNode(previousServerName)); + } + if (!iter.hasNext()) { + ZKUtil.deleteNodeRecursively(zookeeper, queuesZNode); + return null; + } + String replicator = iter.next(); + ServerName serverName = ServerName.parseServerName(replicator); + previousServerName = serverName; + List queueIdList = ZKUtil.listChildrenNoWatch(zookeeper, getRsNode(serverName)); + if (queueIdList == null || queueIdList.isEmpty()) { + return Pair.newPair(serverName, Collections.emptyList()); + } + List queueDataList = new ArrayList<>(queueIdList.size()); + for (String queueIdStr : queueIdList) { + ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueIdStr); + ReplicationQueueId queueId; + if (queueInfo.getDeadRegionServers().isEmpty()) { + queueId = new ReplicationQueueId(serverName, queueInfo.getPeerId()); + } else { + queueId = new ReplicationQueueId(serverName, queueInfo.getPeerId(), + queueInfo.getDeadRegionServers().get(0)); + } + List wals = + ZKUtil.listChildrenNoWatch(zookeeper, getQueueNode(serverName, queueIdStr)); + ZkReplicationQueueData queueData; + if (wals == null || wals.isEmpty()) { + queueData = new ZkReplicationQueueData(queueId, Collections.emptyMap()); + } else { + Map walOffsets = new HashMap<>(); + for (String wal : wals) { + byte[] data = ZKUtil.getData(zookeeper, getFileNode(serverName, queueIdStr, wal)); + if (data == null || data.length == 0) { + walOffsets.put(wal, 0L); + } else { + walOffsets.put(wal, ZKUtil.parseWALPositionFrom(data)); + } + } + queueData = new ZkReplicationQueueData(queueId, walOffsets); + } + queueDataList.add(queueData); + } + return Pair.newPair(serverName, queueDataList); + } + }; + } + + public static final class ZkLastPushedSeqId { + + private final String encodedRegionName; + + private final String peerId; + + private final long lastPushedSeqId; + + ZkLastPushedSeqId(String encodedRegionName, String peerId, long lastPushedSeqId) { + this.encodedRegionName = encodedRegionName; + this.peerId = peerId; + this.lastPushedSeqId = lastPushedSeqId; + } + + public String getEncodedRegionName() { + return encodedRegionName; + } + + public String getPeerId() { + return peerId; + } + + public long getLastPushedSeqId() { + return lastPushedSeqId; + } + + } + + @SuppressWarnings("unchecked") + public MigrationIterator> listAllLastPushedSeqIds() + throws KeeperException { + List level1Prefixs = ZKUtil.listChildrenNoWatch(zookeeper, regionsZNode); + if (level1Prefixs == null || level1Prefixs.isEmpty()) { + ZKUtil.deleteNodeRecursively(zookeeper, regionsZNode); + return EMPTY_ITER; + } + Iterator level1Iter = level1Prefixs.iterator(); + return new MigrationIterator>() { + + private String level1Prefix; + + private Iterator level2Iter; + + private String level2Prefix; + + @Override + public List next() throws Exception { + for (;;) { + if (level2Iter == null || !level2Iter.hasNext()) { + if (!level1Iter.hasNext()) { + ZKUtil.deleteNodeRecursively(zookeeper, regionsZNode); + return null; + } + if (level1Prefix != null) { + // this will also delete the previous level2Prefix which is under this level1Prefix + ZKUtil.deleteNodeRecursively(zookeeper, + ZNodePaths.joinZNode(regionsZNode, level1Prefix)); + } + level1Prefix = level1Iter.next(); + List level2Prefixes = ZKUtil.listChildrenNoWatch(zookeeper, + ZNodePaths.joinZNode(regionsZNode, level1Prefix)); + if (level2Prefixes != null) { + level2Iter = level2Prefixes.iterator(); + // reset level2Prefix as we have switched level1Prefix, otherwise the below delete + // level2Prefix section will delete the znode with this level2Prefix under the new + // level1Prefix + level2Prefix = null; + } + } else { + if (level2Prefix != null) { + ZKUtil.deleteNodeRecursively(zookeeper, + ZNodePaths.joinZNode(regionsZNode, level1Prefix, level2Prefix)); + } + level2Prefix = level2Iter.next(); + List encodedRegionNameAndPeerIds = ZKUtil.listChildrenNoWatch(zookeeper, + ZNodePaths.joinZNode(regionsZNode, level1Prefix, level2Prefix)); + if (encodedRegionNameAndPeerIds == null || encodedRegionNameAndPeerIds.isEmpty()) { + return Collections.emptyList(); + } + List lastPushedSeqIds = new ArrayList<>(); + for (String encodedRegionNameAndPeerId : encodedRegionNameAndPeerIds) { + byte[] data = ZKUtil.getData(zookeeper, ZNodePaths.joinZNode(regionsZNode, + level1Prefix, level2Prefix, encodedRegionNameAndPeerId)); + long lastPushedSeqId = ZKUtil.parseWALPositionFrom(data); + Iterator iter = Splitter.on('-').split(encodedRegionNameAndPeerId).iterator(); + String encodedRegionName = level1Prefix + level2Prefix + iter.next(); + String peerId = iter.next(); + lastPushedSeqIds + .add(new ZkLastPushedSeqId(encodedRegionName, peerId, lastPushedSeqId)); + } + return Collections.unmodifiableList(lastPushedSeqIds); + } + } + } + }; + } + + private String getHFileRefsPeerNode(String peerId) { + return ZNodePaths.joinZNode(hfileRefsZNode, peerId); + } + + /** + * Pair<PeerId, List<HFileRefs>> + */ + @SuppressWarnings("unchecked") + public MigrationIterator>> listAllHFileRefs() throws KeeperException { + List peerIds = ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode); + if (peerIds == null || peerIds.isEmpty()) { + ZKUtil.deleteNodeRecursively(zookeeper, hfileRefsZNode); + return EMPTY_ITER; + } + Iterator iter = peerIds.iterator(); + return new MigrationIterator>>() { + + private String previousPeerId; + + @Override + public Pair> next() throws KeeperException { + if (previousPeerId != null) { + ZKUtil.deleteNodeRecursively(zookeeper, getHFileRefsPeerNode(previousPeerId)); + } + if (!iter.hasNext()) { + ZKUtil.deleteNodeRecursively(zookeeper, hfileRefsZNode); + return null; + } + String peerId = iter.next(); + List refs = ZKUtil.listChildrenNoWatch(zookeeper, getHFileRefsPeerNode(peerId)); + previousPeerId = peerId; + return Pair.newPair(peerId, refs != null ? refs : Collections.emptyList()); + } + }; + } + + public boolean hasData() throws KeeperException { + return ZKUtil.checkExists(zookeeper, queuesZNode) != -1 + || ZKUtil.checkExists(zookeeper, regionsZNode) != -1 + || ZKUtil.checkExists(zookeeper, hfileRefsZNode) != -1; + } + + public void deleteAllData() throws KeeperException { + ZKUtil.deleteNodeRecursively(zookeeper, queuesZNode); + ZKUtil.deleteNodeRecursively(zookeeper, regionsZNode); + ZKUtil.deleteNodeRecursively(zookeeper, hfileRefsZNode); + } + + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + String getQueuesZNode() { + return queuesZNode; + } + + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + String getHfileRefsZNode() { + return hfileRefsZNode; + } + + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + String getRegionsZNode() { + return regionsZNode; + } +} diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java new file mode 100644 index 000000000000..e38b7b134e99 --- /dev/null +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseZKTestingUtil; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.MigrationIterator; +import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId; +import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.MD5Hash; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.zookeeper.KeeperException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import org.apache.hbase.thirdparty.com.google.common.base.Splitter; +import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; +import org.apache.hbase.thirdparty.com.google.common.collect.Sets; +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestZKReplicationQueueStorage { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestZKReplicationQueueStorage.class); + + private static final HBaseZKTestingUtil UTIL = new HBaseZKTestingUtil(); + + private ZKWatcher zk; + + private ZKReplicationQueueStorageForMigration storage; + + @Rule + public final TestName name = new TestName(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + UTIL.startMiniZKCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws IOException { + UTIL.shutdownMiniZKCluster(); + } + + @Before + public void setUp() throws IOException { + Configuration conf = UTIL.getConfiguration(); + conf.set(ZKReplicationStorageBase.REPLICATION_ZNODE, name.getMethodName()); + zk = new ZKWatcher(conf, name.getMethodName(), null); + storage = new ZKReplicationQueueStorageForMigration(zk, conf); + } + + @After + public void tearDown() throws Exception { + ZKUtil.deleteNodeRecursively(zk, storage.replicationZNode); + Closeables.close(zk, true); + } + + public static void mockQueuesData(ZKReplicationQueueStorageForMigration storage, int nServers, + String peerId, ServerName deadServer) throws KeeperException { + ZKWatcher zk = storage.zookeeper; + for (int i = 0; i < nServers; i++) { + ServerName sn = + ServerName.valueOf("test-hbase-" + i, 12345, EnvironmentEdgeManager.currentTime()); + String rsZNode = ZNodePaths.joinZNode(storage.getQueuesZNode(), sn.toString()); + String peerZNode = ZNodePaths.joinZNode(rsZNode, peerId); + ZKUtil.createWithParents(zk, peerZNode); + for (int j = 0; j < i; j++) { + String wal = ZNodePaths.joinZNode(peerZNode, sn.toString() + "." + j); + ZKUtil.createSetData(zk, wal, ZKUtil.positionToByteArray(j)); + } + String deadServerPeerZNode = ZNodePaths.joinZNode(rsZNode, peerId + "-" + deadServer); + ZKUtil.createWithParents(zk, deadServerPeerZNode); + for (int j = 0; j < i; j++) { + String wal = ZNodePaths.joinZNode(deadServerPeerZNode, deadServer.toString() + "." + j); + if (j > 0) { + ZKUtil.createSetData(zk, wal, ZKUtil.positionToByteArray(j)); + } else { + ZKUtil.createWithParents(zk, wal); + } + } + } + ZKUtil.createWithParents(zk, + ZNodePaths.joinZNode(storage.getQueuesZNode(), deadServer.toString())); + } + + private static String getLastPushedSeqIdZNode(String regionsZNode, String encodedName, + String peerId) { + return ZNodePaths.joinZNode(regionsZNode, encodedName.substring(0, 2), + encodedName.substring(2, 4), encodedName.substring(4) + "-" + peerId); + } + + public static Map> mockLastPushedSeqIds( + ZKReplicationQueueStorageForMigration storage, String peerId1, String peerId2, int nRegions, + int emptyLevel1Count, int emptyLevel2Count) throws KeeperException { + ZKWatcher zk = storage.zookeeper; + Map> name2PeerIds = new HashMap<>(); + byte[] bytes = new byte[32]; + for (int i = 0; i < nRegions; i++) { + ThreadLocalRandom.current().nextBytes(bytes); + String encodeName = MD5Hash.getMD5AsHex(bytes); + String znode1 = getLastPushedSeqIdZNode(storage.getRegionsZNode(), encodeName, peerId1); + ZKUtil.createSetData(zk, znode1, ZKUtil.positionToByteArray(1)); + String znode2 = getLastPushedSeqIdZNode(storage.getRegionsZNode(), encodeName, peerId2); + ZKUtil.createSetData(zk, znode2, ZKUtil.positionToByteArray(2)); + name2PeerIds.put(encodeName, Sets.newHashSet(peerId1, peerId2)); + } + int addedEmptyZNodes = 0; + for (int i = 0; i < 256; i++) { + String level1ZNode = + ZNodePaths.joinZNode(storage.getRegionsZNode(), String.format("%02x", i)); + if (ZKUtil.checkExists(zk, level1ZNode) == -1) { + ZKUtil.createWithParents(zk, level1ZNode); + addedEmptyZNodes++; + if (addedEmptyZNodes <= emptyLevel2Count) { + ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(level1ZNode, "ab")); + } + if (addedEmptyZNodes >= emptyLevel1Count + emptyLevel2Count) { + break; + } + } + } + return name2PeerIds; + } + + public static void mockHFileRefs(ZKReplicationQueueStorageForMigration storage, int nPeers) + throws KeeperException { + ZKWatcher zk = storage.zookeeper; + for (int i = 0; i < nPeers; i++) { + String peerId = "peer_" + i; + ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(storage.getHfileRefsZNode(), peerId)); + for (int j = 0; j < i; j++) { + ZKUtil.createWithParents(zk, + ZNodePaths.joinZNode(storage.getHfileRefsZNode(), peerId, "hfile-" + j)); + } + } + } + + @Test + public void testDeleteAllData() throws Exception { + assertFalse(storage.hasData()); + ZKUtil.createWithParents(zk, storage.getQueuesZNode()); + assertTrue(storage.hasData()); + storage.deleteAllData(); + assertFalse(storage.hasData()); + } + + @Test + public void testEmptyIter() throws Exception { + ZKUtil.createWithParents(zk, storage.getQueuesZNode()); + ZKUtil.createWithParents(zk, storage.getRegionsZNode()); + ZKUtil.createWithParents(zk, storage.getHfileRefsZNode()); + assertNull(storage.listAllQueues().next()); + assertEquals(-1, ZKUtil.checkExists(zk, storage.getQueuesZNode())); + assertNull(storage.listAllLastPushedSeqIds().next()); + assertEquals(-1, ZKUtil.checkExists(zk, storage.getRegionsZNode())); + assertNull(storage.listAllHFileRefs().next()); + assertEquals(-1, ZKUtil.checkExists(zk, storage.getHfileRefsZNode())); + } + + @Test + public void testListAllQueues() throws Exception { + String peerId = "1"; + ServerName deadServer = + ServerName.valueOf("test-hbase-dead", 12345, EnvironmentEdgeManager.currentTime()); + int nServers = 10; + mockQueuesData(storage, nServers, peerId, deadServer); + MigrationIterator>> iter = + storage.listAllQueues(); + ServerName previousServerName = null; + for (int i = 0; i < nServers + 1; i++) { + Pair> pair = iter.next(); + assertNotNull(pair); + if (previousServerName != null) { + assertEquals(-1, ZKUtil.checkExists(zk, + ZNodePaths.joinZNode(storage.getQueuesZNode(), previousServerName.toString()))); + } + ServerName sn = pair.getFirst(); + previousServerName = sn; + if (sn.equals(deadServer)) { + assertThat(pair.getSecond(), empty()); + } else { + assertEquals(2, pair.getSecond().size()); + int n = Integer.parseInt(Iterables.getLast(Splitter.on('-').split(sn.getHostname()))); + ZkReplicationQueueData data0 = pair.getSecond().get(0); + assertEquals(peerId, data0.getQueueId().getPeerId()); + assertEquals(sn, data0.getQueueId().getServerName()); + assertEquals(n, data0.getWalOffsets().size()); + for (int j = 0; j < n; j++) { + assertEquals(j, + data0.getWalOffsets().get( + (data0.getQueueId().isRecovered() ? deadServer.toString() : sn.toString()) + "." + j) + .intValue()); + } + ZkReplicationQueueData data1 = pair.getSecond().get(1); + assertEquals(peerId, data1.getQueueId().getPeerId()); + assertEquals(sn, data1.getQueueId().getServerName()); + assertEquals(n, data1.getWalOffsets().size()); + for (int j = 0; j < n; j++) { + assertEquals(j, + data1.getWalOffsets().get( + (data1.getQueueId().isRecovered() ? deadServer.toString() : sn.toString()) + "." + j) + .intValue()); + } + // the order of the returned result is undetermined + if (data0.getQueueId().getSourceServerName().isPresent()) { + assertEquals(deadServer, data0.getQueueId().getSourceServerName().get()); + assertFalse(data1.getQueueId().getSourceServerName().isPresent()); + } else { + assertEquals(deadServer, data1.getQueueId().getSourceServerName().get()); + } + } + } + assertNull(iter.next()); + assertEquals(-1, ZKUtil.checkExists(zk, storage.getQueuesZNode())); + } + + @Test + public void testListAllLastPushedSeqIds() throws Exception { + String peerId1 = "1"; + String peerId2 = "2"; + Map> name2PeerIds = + mockLastPushedSeqIds(storage, peerId1, peerId2, 100, 10, 10); + MigrationIterator> iter = storage.listAllLastPushedSeqIds(); + int emptyListCount = 0; + for (;;) { + List list = iter.next(); + if (list == null) { + break; + } + if (list.isEmpty()) { + emptyListCount++; + continue; + } + for (ZkLastPushedSeqId seqId : list) { + name2PeerIds.get(seqId.getEncodedRegionName()).remove(seqId.getPeerId()); + if (seqId.getPeerId().equals(peerId1)) { + assertEquals(1, seqId.getLastPushedSeqId()); + } else { + assertEquals(2, seqId.getLastPushedSeqId()); + } + } + } + assertEquals(10, emptyListCount); + name2PeerIds.forEach((encodedRegionName, peerIds) -> { + assertThat(encodedRegionName + " still has unmigrated peers", peerIds, empty()); + }); + assertEquals(-1, ZKUtil.checkExists(zk, storage.getRegionsZNode())); + } + + @Test + public void testListAllHFileRefs() throws Exception { + int nPeers = 10; + mockHFileRefs(storage, nPeers); + MigrationIterator>> iter = storage.listAllHFileRefs(); + String previousPeerId = null; + for (int i = 0; i < nPeers; i++) { + Pair> pair = iter.next(); + if (previousPeerId != null) { + assertEquals(-1, ZKUtil.checkExists(zk, + ZNodePaths.joinZNode(storage.getHfileRefsZNode(), previousPeerId))); + } + String peerId = pair.getFirst(); + previousPeerId = peerId; + int index = Integer.parseInt(Iterables.getLast(Splitter.on('_').split(peerId))); + assertEquals(index, pair.getSecond().size()); + } + assertNull(iter.next()); + assertEquals(-1, ZKUtil.checkExists(zk, storage.getHfileRefsZNode())); + } +} diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml index 0dba4aa98339..b61b0252a052 100644 --- a/hbase-server/pom.xml +++ b/hbase-server/pom.xml @@ -102,6 +102,12 @@ org.apache.hbase hbase-replication + + org.apache.hbase + hbase-replication + test-jar + test + org.apache.hbase hbase-balancer diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 896f9a5d0860..adb53468ce72 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -170,6 +170,7 @@ import org.apache.hadoop.hbase.master.replication.AddPeerProcedure; import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure; import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure; +import org.apache.hadoop.hbase.master.replication.MigrateReplicationQueueFromZkToTableProcedure; import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure; import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.master.replication.ReplicationPeerModificationStateStore; @@ -221,6 +222,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator; @@ -1058,6 +1060,17 @@ private void finishActiveMasterInitialization() throws IOException, InterruptedE this.balancer.initialize(); this.balancer.updateClusterMetrics(getClusterMetricsWithoutCoprocessor()); + // try migrate replication data + ZKReplicationQueueStorageForMigration oldReplicationQueueStorage = + new ZKReplicationQueueStorageForMigration(zooKeeper, conf); + // check whether there are something to migrate and we haven't scheduled a migration procedure + // yet + if ( + oldReplicationQueueStorage.hasData() && procedureExecutor.getProcedures().stream() + .allMatch(p -> !(p instanceof MigrateReplicationQueueFromZkToTableProcedure)) + ) { + procedureExecutor.submitProcedure(new MigrateReplicationQueueFromZkToTableProcedure()); + } // start up all service threads. startupTaskGroup.addTask("Initializing master service threads"); startServiceThreads(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java index 487c45e5c5cb..97976756d828 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.master.assignment.RegionStateNode; import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure; import org.apache.hadoop.hbase.master.replication.AssignReplicationQueuesProcedure; +import org.apache.hadoop.hbase.master.replication.MigrateReplicationQueueFromZkToTableProcedure; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.procedure2.Procedure; @@ -52,6 +53,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ServerCrashState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; /** * Handle crashed server. This is a port to ProcedureV2 of what used to be euphemistically called @@ -266,6 +268,16 @@ protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state) } break; case SERVER_CRASH_CLAIM_REPLICATION_QUEUES: + if ( + env.getMasterServices().getProcedures().stream() + .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure) + .anyMatch(p -> !p.isFinished()) + ) { + LOG.info("There is a pending {}, will retry claim replication queue later", + MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName()); + suspend(10_000, true); + return Flow.NO_MORE_STATE; + } addChildProcedure(new AssignReplicationQueuesProcedure(serverName)); setNextState(ServerCrashState.SERVER_CRASH_FINISH); break; @@ -431,6 +443,13 @@ protected void releaseLock(final MasterProcedureEnv env) { env.getProcedureScheduler().wakeServerExclusiveLock(this, getServerName()); } + @Override + protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { + setState(ProcedureProtos.ProcedureState.RUNNABLE); + env.getProcedureScheduler().addFront(this); + return false; + } + @Override public void toStringClassDetails(StringBuilder sb) { sb.append(getProcName()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerNoLockProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerNoLockProcedure.java index 660f9968573d..1f0a89f20762 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerNoLockProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerNoLockProcedure.java @@ -98,10 +98,7 @@ protected final ProcedureSuspendedException suspend(Configuration conf, } long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); backoffConsumer.accept(backoff); - setTimeout(Math.toIntExact(backoff)); - setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); - skipPersistence(); - throw new ProcedureSuspendedException(); + throw suspend(Math.toIntExact(backoff), false); } protected final void resetRetry() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java new file mode 100644 index 000000000000..536f232338e9 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER; +import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER; +import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE; +import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE; +import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.stream.Collectors; +import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; +import org.apache.hadoop.hbase.util.VersionInfo; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableStateData; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; + +/** + * A procedure for migrating replication queue data from zookeeper to hbase:replication table. + */ +@InterfaceAudience.Private +public class MigrateReplicationQueueFromZkToTableProcedure + extends StateMachineProcedure + implements GlobalProcedureInterface { + + private static final Logger LOG = + LoggerFactory.getLogger(MigrateReplicationQueueFromZkToTableProcedure.class); + + private static final int MIN_MAJOR_VERSION = 3; + + private List disabledPeerIds; + + private List> futures; + + private ExecutorService executor; + + @Override + public String getGlobalId() { + return getClass().getSimpleName(); + } + + private ExecutorService getExecutorService() { + if (executor == null) { + executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder() + .setNameFormat(getClass().getSimpleName() + "-%d").setDaemon(true).build()); + } + return executor; + } + + private void shutdownExecutorService() { + if (executor != null) { + executor.shutdown(); + executor = null; + } + } + + private void waitUntilNoPeerProcedure(MasterProcedureEnv env) throws ProcedureSuspendedException { + long peerProcCount; + try { + peerProcCount = env.getMasterServices().getProcedures().stream() + .filter(p -> p instanceof PeerProcedureInterface).filter(p -> !p.isFinished()).count(); + } catch (IOException e) { + LOG.warn("failed to check peer procedure status", e); + throw suspend(5000, true); + } + if (peerProcCount > 0) { + LOG.info("There are still {} pending peer procedures, will sleep and check later", + peerProcCount); + throw suspend(10_000, true); + } + LOG.info("No pending peer procedures found, continue..."); + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, + MigrateReplicationQueueFromZkToTableState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + switch (state) { + case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE: + waitUntilNoPeerProcedure(env); + List peers = env.getReplicationPeerManager().listPeers(null); + if (peers.isEmpty()) { + LOG.info("No active replication peer found, delete old replication queue data and quit"); + ZKReplicationQueueStorageForMigration oldStorage = + new ZKReplicationQueueStorageForMigration(env.getMasterServices().getZooKeeper(), + env.getMasterConfiguration()); + try { + oldStorage.deleteAllData(); + } catch (KeeperException e) { + LOG.warn("failed to delete old replication queue data, sleep and retry later", e); + suspend(10_000, true); + } + return Flow.NO_MORE_STATE; + } + // here we do not care the peers which have already been disabled, as later we do not need + // to enable them + disabledPeerIds = peers.stream().filter(ReplicationPeerDescription::isEnabled) + .map(ReplicationPeerDescription::getPeerId).collect(Collectors.toList()); + setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER); + return Flow.HAS_MORE_STATE; + case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER: + for (String peerId : disabledPeerIds) { + addChildProcedure(new DisablePeerProcedure(peerId)); + } + setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE); + return Flow.HAS_MORE_STATE; + case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE: + if (futures != null) { + // wait until all futures done + long notDone = futures.stream().filter(f -> !f.isDone()).count(); + if (notDone == 0) { + boolean succ = true; + for (Future future : futures) { + try { + future.get(); + } catch (Exception e) { + succ = false; + LOG.warn("Failed to migrate", e); + } + } + if (succ) { + shutdownExecutorService(); + setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING); + return Flow.HAS_MORE_STATE; + } + // reschedule to retry migration again + futures = null; + } else { + LOG.info("There still {} pending migration tasks, will sleep and check later", notDone); + throw suspend(10_000, true); + } + } + try { + futures = env.getReplicationPeerManager() + .migrateQueuesFromZk(env.getMasterServices().getZooKeeper(), getExecutorService()); + } catch (IOException e) { + LOG.warn("failed to submit migration tasks", e); + throw suspend(10_000, true); + } + throw suspend(10_000, true); + case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING: + long rsWithLowerVersion = + env.getMasterServices().getServerManager().getOnlineServers().values().stream() + .filter(sm -> VersionInfo.getMajorVersion(sm.getVersion()) < MIN_MAJOR_VERSION).count(); + if (rsWithLowerVersion == 0) { + setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER); + return Flow.HAS_MORE_STATE; + } else { + LOG.info("There are still {} region servers which have a major version less than {}, " + + "will sleep and check later", rsWithLowerVersion, MIN_MAJOR_VERSION); + throw suspend(10_000, true); + } + case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER: + for (String peerId : disabledPeerIds) { + addChildProcedure(new EnablePeerProcedure(peerId)); + } + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } + + @Override + protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { + setState(ProcedureProtos.ProcedureState.RUNNABLE); + env.getProcedureScheduler().addFront(this); + return false; + } + + @Override + protected void rollbackState(MasterProcedureEnv env, + MigrateReplicationQueueFromZkToTableState state) throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected MigrateReplicationQueueFromZkToTableState getState(int stateId) { + return MigrateReplicationQueueFromZkToTableState.forNumber(stateId); + } + + @Override + protected int getStateId(MigrateReplicationQueueFromZkToTableState state) { + return state.getNumber(); + } + + @Override + protected MigrateReplicationQueueFromZkToTableState getInitialState() { + return MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + MigrateReplicationQueueFromZkToTableStateData.Builder builder = + MigrateReplicationQueueFromZkToTableStateData.newBuilder(); + if (disabledPeerIds != null) { + builder.addAllDisabledPeerId(disabledPeerIds); + } + serializer.serialize(builder.build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + MigrateReplicationQueueFromZkToTableStateData data = + serializer.deserialize(MigrateReplicationQueueFromZkToTableStateData.class); + disabledPeerIds = data.getDisabledPeerIdList().stream().collect(Collectors.toList()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java index 50214e205192..79bed1503bec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.io.InterruptedIOException; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.TableDescriptor; @@ -27,6 +28,7 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure; +import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; @@ -152,12 +154,36 @@ protected void reopenRegions(MasterProcedureEnv env) throws IOException { } } + private boolean shouldFailForMigrating(MasterProcedureEnv env) throws IOException { + long parentProcId = getParentProcId(); + if ( + parentProcId != Procedure.NO_PROC_ID && env.getMasterServices().getMasterProcedureExecutor() + .getProcedure(parentProcId) instanceof MigrateReplicationQueueFromZkToTableProcedure + ) { + // this is scheduled by MigrateReplicationQueueFromZkToTableProcedure, should not fail it + return false; + } + return env.getMasterServices().getProcedures().stream() + .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure) + .anyMatch(p -> !p.isFinished()); + } + @Override protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state) throws ProcedureSuspendedException, InterruptedException { switch (state) { case PRE_PEER_MODIFICATION: try { + if (shouldFailForMigrating(env)) { + LOG.info("There is a pending {}, give up execution of {}", + MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName(), + getClass().getName()); + setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", + new DoNotRetryIOException("There is a pending " + + MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName())); + releaseLatch(env); + return Flow.NO_MORE_STATE; + } checkPeerModificationEnabled(env); prePeerModification(env); } catch (IOException e) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 57380920d0fc..f3cdaddb31ca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -21,14 +21,18 @@ import java.io.IOException; import java.net.URI; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -40,6 +44,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.conf.ConfigurationObserver; @@ -51,17 +56,24 @@ import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; +import org.apache.hadoop.hbase.replication.ReplicationQueueData; import org.apache.hadoop.hbase.replication.ReplicationQueueId; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; +import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.MigrationIterator; +import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId; +import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; @@ -116,7 +128,7 @@ public class ReplicationPeerManager implements ConfigurationObserver { private final ZKWatcher zk; @FunctionalInterface - private interface ReplicationQueueStorageInitializer { + interface ReplicationQueueStorageInitializer { void initialize() throws IOException; } @@ -151,6 +163,10 @@ private void checkQueuesDeleted(String peerId) } } + private void initializeQueueStorage() throws IOException { + queueStorageInitializer.initialize(); + } + void preAddPeer(String peerId, ReplicationPeerConfig peerConfig) throws ReplicationException, IOException { if (peerId.contains("-")) { @@ -165,7 +181,7 @@ void preAddPeer(String peerId, ReplicationPeerConfig peerConfig) } // lazy create table - queueStorageInitializer.initialize(); + initializeQueueStorage(); // make sure that there is no queues with the same peer id. This may happen when we create a // peer with the same id with a old deleted peer. If the replication queues for the old peer // have not been cleaned up yet then we should not create the new peer, otherwise the old wal @@ -718,4 +734,88 @@ public void onConfigurationChange(Configuration conf) { this.conf = conf; this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf); } + + private ReplicationQueueData convert(ZkReplicationQueueData zkData) { + Map groupOffsets = new HashMap<>(); + zkData.getWalOffsets().forEach((wal, offset) -> { + String walGroup = AbstractFSWALProvider.getWALPrefixFromWALName(wal); + groupOffsets.compute(walGroup, (k, oldOffset) -> { + if (oldOffset == null) { + return new ReplicationGroupOffset(wal, offset); + } + // we should record the first wal's offset + long oldWalTs = AbstractFSWALProvider.getTimestamp(oldOffset.getWal()); + long walTs = AbstractFSWALProvider.getTimestamp(wal); + if (walTs < oldWalTs) { + return new ReplicationGroupOffset(wal, offset); + } + return oldOffset; + }); + }); + return new ReplicationQueueData(zkData.getQueueId(), ImmutableMap.copyOf(groupOffsets)); + } + + private void migrateQueues(ZKReplicationQueueStorageForMigration oldQueueStorage) + throws Exception { + MigrationIterator>> iter = + oldQueueStorage.listAllQueues(); + for (;;) { + Pair> pair = iter.next(); + if (pair == null) { + return; + } + queueStorage.batchUpdateQueues(pair.getFirst(), + pair.getSecond().stream().filter(data -> peers.containsKey(data.getQueueId().getPeerId())) + .map(this::convert).collect(Collectors.toList())); + } + } + + private void migrateLastPushedSeqIds(ZKReplicationQueueStorageForMigration oldQueueStorage) + throws Exception { + MigrationIterator> iter = oldQueueStorage.listAllLastPushedSeqIds(); + for (;;) { + List list = iter.next(); + if (list == null) { + return; + } + queueStorage.batchUpdateLastSequenceIds(list.stream() + .filter(data -> peers.containsKey(data.getPeerId())).collect(Collectors.toList())); + } + } + + private void migrateHFileRefs(ZKReplicationQueueStorageForMigration oldQueueStorage) + throws Exception { + MigrationIterator>> iter = oldQueueStorage.listAllHFileRefs(); + for (;;) { + Pair> pair = iter.next(); + if (pair == null) { + return; + } + if (peers.containsKey(pair.getFirst())) { + queueStorage.batchUpdateHFileRefs(pair.getFirst(), pair.getSecond()); + } + } + } + + /** + * Submit the migration tasks to the given {@code executor} and return the futures. + */ + List> migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor) + throws IOException { + // the replication queue table creation is asynchronous and will be triggered by addPeer, so + // here we need to manually initialize it since we will not call addPeer. + initializeQueueStorage(); + ZKReplicationQueueStorageForMigration oldStorage = + new ZKReplicationQueueStorageForMigration(zookeeper, conf); + return Arrays.asList(executor.submit(() -> { + migrateQueues(oldStorage); + return null; + }), executor.submit(() -> { + migrateLastPushedSeqIds(oldStorage); + return null; + }), executor.submit(() -> { + migrateHFileRefs(oldStorage); + return null; + })); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java index ed0760c69924..df6078d64bed 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.StateMachineProcedure.Flow; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationUtils; @@ -236,6 +237,19 @@ protected Flow executeFromState(MasterProcedureEnv env, switch (state) { case PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION: try { + if ( + env.getMasterServices().getProcedures().stream() + .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure) + .anyMatch(p -> !p.isFinished()) + ) { + LOG.info("There is a pending {}, give up execution of {}", + MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName(), + getClass().getSimpleName()); + setFailure("master-transit-peer-sync-replication-state", + new DoNotRetryIOException("There is a pending " + + MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName())); + return Flow.NO_MORE_STATE; + } checkPeerModificationEnabled(env); preTransit(env); } catch (IOException e) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueue.java new file mode 100644 index 000000000000..1b0f727a0722 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueue.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; +import org.apache.hadoop.hbase.replication.ReplicationQueueData; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.replication.TestReplicationBase; +import org.apache.hadoop.hbase.replication.ZKReplicationStorageBase; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; + +@Category({ MasterTests.class, LargeTests.class }) +public class TestMigrateReplicationQueue extends TestReplicationBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMigrateReplicationQueue.class); + + private int disableAndInsert() throws Exception { + UTIL1.getAdmin().disableReplicationPeer(PEER_ID2); + return UTIL1.loadTable(htable1, famName); + } + + private String getQueuesZNode() throws IOException { + Configuration conf = UTIL1.getConfiguration(); + ZKWatcher zk = UTIL1.getZooKeeperWatcher(); + String replicationZNode = ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode, + conf.get(ZKReplicationStorageBase.REPLICATION_ZNODE, + ZKReplicationStorageBase.REPLICATION_ZNODE_DEFAULT)); + return ZNodePaths.joinZNode(replicationZNode, conf.get("zookeeper.znode.replication.rs", "rs")); + } + + private void mockData() throws Exception { + // delete the replication queue table to simulate upgrading from an older version of hbase + TableName replicationQueueTableName = TableName + .valueOf(UTIL1.getConfiguration().get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, + ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString())); + List queueDatas = UTIL1.getMiniHBaseCluster().getMaster() + .getReplicationPeerManager().getQueueStorage().listAllQueues(); + assertEquals(UTIL1.getMiniHBaseCluster().getRegionServerThreads().size(), queueDatas.size()); + UTIL1.getAdmin().disableTable(replicationQueueTableName); + UTIL1.getAdmin().deleteTable(replicationQueueTableName); + // shutdown the hbase cluster + UTIL1.shutdownMiniHBaseCluster(); + ZKWatcher zk = UTIL1.getZooKeeperWatcher(); + String queuesZNode = getQueuesZNode(); + for (ReplicationQueueData queueData : queueDatas) { + String replicatorZNode = + ZNodePaths.joinZNode(queuesZNode, queueData.getId().getServerName().toString()); + String queueZNode = ZNodePaths.joinZNode(replicatorZNode, queueData.getId().getPeerId()); + assertEquals(1, queueData.getOffsets().size()); + ReplicationGroupOffset offset = Iterables.getOnlyElement(queueData.getOffsets().values()); + String walZNode = ZNodePaths.joinZNode(queueZNode, offset.getWal()); + ZKUtil.createSetData(zk, walZNode, ZKUtil.positionToByteArray(offset.getOffset())); + } + } + + @Test + public void testMigrate() throws Exception { + int count = disableAndInsert(); + mockData(); + restartSourceCluster(1); + UTIL1.waitFor(60000, + () -> UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream() + .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure).findAny() + .map(Procedure::isSuccess).orElse(false)); + TableName replicationQueueTableName = TableName + .valueOf(UTIL1.getConfiguration().get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, + ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString())); + assertTrue(UTIL1.getAdmin().tableExists(replicationQueueTableName)); + ZKWatcher zk = UTIL1.getZooKeeperWatcher(); + assertEquals(-1, ZKUtil.checkExists(zk, getQueuesZNode())); + // wait until SCP finishes, which means we can finish the claim queue operation + UTIL1.waitFor(60000, () -> UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream() + .filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess)); + List queueDatas = UTIL1.getMiniHBaseCluster().getMaster() + .getReplicationPeerManager().getQueueStorage().listAllQueues(); + assertEquals(1, queueDatas.size()); + // should have 1 recovered queue, as we haven't replicated anything out so there is no queue + // data for the new alive region server + assertTrue(queueDatas.get(0).getId().isRecovered()); + assertEquals(1, queueDatas.get(0).getOffsets().size()); + // the peer is still disabled, so no data has been replicated + assertFalse(UTIL1.getAdmin().isReplicationPeerEnabled(PEER_ID2)); + assertEquals(0, HBaseTestingUtil.countRows(htable2)); + // enable peer, and make sure the replication can continue correctly + UTIL1.getAdmin().enableReplicationPeer(PEER_ID2); + waitForReplication(count, 100); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java new file mode 100644 index 000000000000..752abc380b84 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING; +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.ServerMetrics; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.StartTestingClusterOption; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.RegionServerList; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestMigrateReplicationQueueFromZkToTableProcedure { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMigrateReplicationQueueFromZkToTableProcedure.class); + + private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); + + public static final class HMasterForTest extends HMaster { + + public HMasterForTest(Configuration conf) throws IOException { + super(conf); + } + + @Override + protected ServerManager createServerManager(MasterServices master, RegionServerList storage) + throws IOException { + setupClusterConnection(); + return new ServerManagerForTest(master, storage); + } + } + + private static final ConcurrentMap EXTRA_REGION_SERVERS = + new ConcurrentHashMap<>(); + + public static final class ServerManagerForTest extends ServerManager { + + public ServerManagerForTest(MasterServices master, RegionServerList storage) { + super(master, storage); + } + + @Override + public Map getOnlineServers() { + Map map = new HashMap<>(super.getOnlineServers()); + map.putAll(EXTRA_REGION_SERVERS); + return map; + } + } + + @BeforeClass + public static void setupCluster() throws Exception { + UTIL.startMiniCluster( + StartTestingClusterOption.builder().masterClass(HMasterForTest.class).build()); + } + + @AfterClass + public static void cleanupTest() throws Exception { + UTIL.shutdownMiniCluster(); + } + + private ProcedureExecutor getMasterProcedureExecutor() { + return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor(); + } + + @After + public void tearDown() throws Exception { + Admin admin = UTIL.getAdmin(); + for (ReplicationPeerDescription pd : admin.listReplicationPeers()) { + admin.removeReplicationPeer(pd.getPeerId()); + } + } + + private static CountDownLatch PEER_PROC_ARRIVE; + + private static CountDownLatch PEER_PROC_RESUME; + + public static final class FakePeerProcedure extends Procedure + implements PeerProcedureInterface { + + private String peerId; + + public FakePeerProcedure() { + } + + public FakePeerProcedure(String peerId) { + this.peerId = peerId; + } + + @Override + public String getPeerId() { + return peerId; + } + + @Override + public PeerOperationType getPeerOperationType() { + return PeerOperationType.UPDATE_CONFIG; + } + + @Override + protected Procedure[] execute(MasterProcedureEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + PEER_PROC_ARRIVE.countDown(); + PEER_PROC_RESUME.await(); + return null; + } + + @Override + protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + } + } + + @Test + public void testWaitUntilNoPeerProcedure() throws Exception { + PEER_PROC_ARRIVE = new CountDownLatch(1); + PEER_PROC_RESUME = new CountDownLatch(1); + ProcedureExecutor procExec = getMasterProcedureExecutor(); + procExec.submitProcedure(new FakePeerProcedure("1")); + PEER_PROC_ARRIVE.await(); + MigrateReplicationQueueFromZkToTableProcedure proc = + new MigrateReplicationQueueFromZkToTableProcedure(); + procExec.submitProcedure(proc); + // make sure we will wait until there is no peer related procedures before proceeding + UTIL.waitFor(30000, () -> proc.getState() == ProcedureState.WAITING_TIMEOUT); + // continue and make sure we can finish successfully + PEER_PROC_RESUME.countDown(); + UTIL.waitFor(30000, () -> proc.isSuccess()); + } + + @Test + public void testDisablePeerAndWaitUpgrading() throws Exception { + String peerId = "2"; + ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder() + .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/testhbase") + .setReplicateAllUserTables(true).build(); + UTIL.getAdmin().addReplicationPeer(peerId, rpc); + // put a fake region server to simulate that there are still region servers with older version + ServerMetrics metrics = mock(ServerMetrics.class); + when(metrics.getVersion()).thenReturn("2.5.0"); + EXTRA_REGION_SERVERS + .put(ServerName.valueOf("localhost", 54321, EnvironmentEdgeManager.currentTime()), metrics); + + ProcedureExecutor procExec = getMasterProcedureExecutor(); + + MigrateReplicationQueueFromZkToTableProcedure proc = + new MigrateReplicationQueueFromZkToTableProcedure(); + procExec.submitProcedure(proc); + // wait until we reach the wait upgrading state + UTIL.waitFor(30000, + () -> proc.getCurrentStateId() + == MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING.getNumber() + && proc.getState() == ProcedureState.WAITING_TIMEOUT); + // make sure the peer is disabled for migrating + assertFalse(UTIL.getAdmin().isReplicationPeerEnabled(peerId)); + + // the procedure should finish successfully + EXTRA_REGION_SERVERS.clear(); + UTIL.waitFor(30000, () -> proc.isSuccess()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedureRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedureRecovery.java new file mode 100644 index 000000000000..8d1a975400fa --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedureRecovery.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; + +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; +import org.apache.hadoop.hbase.replication.ZKReplicationStorageBase; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestMigrateReplicationQueueFromZkToTableProcedureRecovery { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMigrateReplicationQueueFromZkToTableProcedureRecovery.class); + + private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); + + @BeforeClass + public static void setupCluster() throws Exception { + UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1); + UTIL.startMiniCluster(1); + } + + @AfterClass + public static void cleanupTest() throws Exception { + UTIL.shutdownMiniCluster(); + } + + private ProcedureExecutor getMasterProcedureExecutor() { + return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor(); + } + + @Before + public void setup() throws Exception { + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(getMasterProcedureExecutor(), false); + } + + @After + public void tearDown() throws Exception { + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(getMasterProcedureExecutor(), false); + } + + private String getHFileRefsZNode() throws IOException { + Configuration conf = UTIL.getConfiguration(); + ZKWatcher zk = UTIL.getZooKeeperWatcher(); + String replicationZNode = ZNodePaths.joinZNode(zk.getZNodePaths().baseZNode, + conf.get(ZKReplicationStorageBase.REPLICATION_ZNODE, + ZKReplicationStorageBase.REPLICATION_ZNODE_DEFAULT)); + return ZNodePaths.joinZNode(replicationZNode, + conf.get(ZKReplicationQueueStorageForMigration.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY, + ZKReplicationQueueStorageForMigration.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT)); + } + + @Test + public void testRecoveryAndDoubleExecution() throws Exception { + String peerId = "2"; + ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder() + .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/testhbase") + .setReplicateAllUserTables(true).build(); + UTIL.getAdmin().addReplicationPeer(peerId, rpc); + + // here we only test a simple migration, more complicated migration will be tested in other UTs, + // such as TestMigrateReplicationQueue and TestReplicationPeerManagerMigrateFromZk + String hfileRefsZNode = getHFileRefsZNode(); + String hfile = "hfile"; + String hfileZNode = ZNodePaths.joinZNode(hfileRefsZNode, peerId, hfile); + ZKUtil.createWithParents(UTIL.getZooKeeperWatcher(), hfileZNode); + + ProcedureExecutor procExec = getMasterProcedureExecutor(); + + ProcedureTestingUtility.waitNoProcedureRunning(procExec); + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true); + + // Start the migration procedure && kill the executor + long procId = procExec.submitProcedure(new MigrateReplicationQueueFromZkToTableProcedure()); + // Restart the executor and execute the step twice + MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId); + // Validate the migration result + ProcedureTestingUtility.assertProcNotFailed(procExec, procId); + ReplicationQueueStorage queueStorage = + UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage(); + List hfiles = queueStorage.getReplicableHFiles(peerId); + assertThat(hfiles, Matchers.> both(hasItem(hfile)).and(hasSize(1))); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestReplicationPeerManagerMigrateQueuesFromZk.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestReplicationPeerManagerMigrateQueuesFromZk.java new file mode 100644 index 000000000000..76301ae67531 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestReplicationPeerManagerMigrateQueuesFromZk.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNameTestRule; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager.ReplicationQueueStorageInitializer; +import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; +import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; +import org.apache.hadoop.hbase.replication.ReplicationQueueData; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.replication.TableReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.TestZKReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestReplicationPeerManagerMigrateQueuesFromZk { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReplicationPeerManagerMigrateQueuesFromZk.class); + + private static HBaseTestingUtil UTIL = new HBaseTestingUtil(); + + private static ExecutorService EXECUTOR; + + ConcurrentMap peers; + + private ReplicationPeerStorage peerStorage; + + private ReplicationQueueStorage queueStorage; + + private ReplicationQueueStorageInitializer queueStorageInitializer; + + private ReplicationPeerManager manager; + + private int nServers = 10; + + private int nPeers = 10; + + private int nRegions = 100; + + private ServerName deadServerName; + + @Rule + public final TableNameTestRule tableNameRule = new TableNameTestRule(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + UTIL.startMiniCluster(1); + EXECUTOR = Executors.newFixedThreadPool(3, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat(TestReplicationPeerManagerMigrateQueuesFromZk.class.getSimpleName() + "-%d") + .build()); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + EXECUTOR.shutdownNow(); + UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws IOException { + Configuration conf = UTIL.getConfiguration(); + peerStorage = mock(ReplicationPeerStorage.class); + TableName tableName = tableNameRule.getTableName(); + UTIL.getAdmin() + .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName)); + queueStorage = new TableReplicationQueueStorage(UTIL.getConnection(), tableName); + queueStorageInitializer = mock(ReplicationQueueStorageInitializer.class); + peers = new ConcurrentHashMap<>(); + deadServerName = + ServerName.valueOf("test-hbase-dead", 12345, EnvironmentEdgeManager.currentTime()); + manager = new ReplicationPeerManager(UTIL.getTestFileSystem(), UTIL.getZooKeeperWatcher(), + peerStorage, queueStorage, peers, conf, "cluster", queueStorageInitializer); + } + + private Map> prepareData() throws Exception { + ZKReplicationQueueStorageForMigration storage = new ZKReplicationQueueStorageForMigration( + UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); + TestZKReplicationQueueStorage.mockQueuesData(storage, 10, "peer_0", deadServerName); + Map> encodedName2PeerIds = TestZKReplicationQueueStorage + .mockLastPushedSeqIds(storage, "peer_1", "peer_2", nRegions, 10, 10); + TestZKReplicationQueueStorage.mockHFileRefs(storage, 10); + return encodedName2PeerIds; + } + + @Test + public void testNoPeers() throws Exception { + prepareData(); + for (Future future : manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR)) { + future.get(1, TimeUnit.MINUTES); + } + // should have called initializer + verify(queueStorageInitializer).initialize(); + // should have not migrated any data since there is no peer + try (Table table = UTIL.getConnection().getTable(tableNameRule.getTableName())) { + assertEquals(0, HBaseTestingUtil.countRows(table)); + } + } + + @Test + public void testMigrate() throws Exception { + Map> encodedName2PeerIds = prepareData(); + // add all peers so we will migrate them all + for (int i = 0; i < nPeers; i++) { + // value is not used in this test, so just add a mock + peers.put("peer_" + i, mock(ReplicationPeerDescription.class)); + } + for (Future future : manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR)) { + future.get(1, TimeUnit.MINUTES); + } + // should have called initializer + verify(queueStorageInitializer).initialize(); + List queueDatas = queueStorage.listAllQueues(); + // there should be two empty queues so minus 2 + assertEquals(2 * nServers - 2, queueDatas.size()); + for (ReplicationQueueData queueData : queueDatas) { + assertEquals("peer_0", queueData.getId().getPeerId()); + assertEquals(1, queueData.getOffsets().size()); + String walGroup = queueData.getId().getServerWALsBelongTo().toString(); + ReplicationGroupOffset offset = queueData.getOffsets().get(walGroup); + assertEquals(0, offset.getOffset()); + assertEquals(queueData.getId().getServerWALsBelongTo().toString() + ".0", offset.getWal()); + } + // there is no method in ReplicationQueueStorage can list all the last pushed sequence ids + try (Table table = UTIL.getConnection().getTable(tableNameRule.getTableName()); + ResultScanner scanner = + table.getScanner(TableReplicationQueueStorage.LAST_SEQUENCE_ID_FAMILY)) { + for (int i = 0; i < 2; i++) { + Result result = scanner.next(); + String peerId = Bytes.toString(result.getRow()); + assertEquals(nRegions, result.size()); + for (Cell cell : result.rawCells()) { + String encodedRegionName = Bytes.toString(cell.getQualifierArray(), + cell.getQualifierOffset(), cell.getQualifierLength()); + encodedName2PeerIds.get(encodedRegionName).remove(peerId); + long seqId = + Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + assertEquals(i + 1, seqId); + } + } + encodedName2PeerIds.forEach((encodedRegionName, peerIds) -> { + assertThat(encodedRegionName + " still has unmigrated peers", peerIds, empty()); + }); + assertNull(scanner.next()); + } + for (int i = 0; i < nPeers; i++) { + List refs = queueStorage.getReplicableHFiles("peer_" + i); + assertEquals(i, refs.size()); + Set refsSet = new HashSet<>(refs); + for (int j = 0; j < i; j++) { + assertTrue(refsSet.remove("hfile-" + j)); + } + assertThat(refsSet, empty()); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index b6157ac0f184..27477527277f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -216,7 +216,7 @@ static void configureClusters(HBaseTestingUtil util1, HBaseTestingUtil util2) { conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false); } - static void restartSourceCluster(int numSlaves) throws Exception { + protected static void restartSourceCluster(int numSlaves) throws Exception { Closeables.close(hbaseAdmin, true); Closeables.close(htable1, true); UTIL1.shutdownMiniHBaseCluster(); diff --git a/pom.xml b/pom.xml index 1dd236d5955b..fd5ddd586ee3 100644 --- a/pom.xml +++ b/pom.xml @@ -1037,13 +1037,18 @@ hbase-hadoop-compat ${project.version} test-jar - test org.apache.hbase hbase-replication ${project.version} + + org.apache.hbase + hbase-replication + ${project.version} + test-jar + org.apache.hbase hbase-balancer