diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfo.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfo.java index ad55d1f2a468..817e23744e36 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfo.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfo.java @@ -19,10 +19,8 @@ package org.apache.hadoop.hbase.rsgroup; import java.util.Collection; -import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; - import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.net.Address; import org.apache.yetus.audience.InterfaceAudience; @@ -104,7 +102,7 @@ public boolean containsServer(Address hostPort) { /** * Get list of servers. */ - public Set
getServers() { + public SortedSet
getServers() { return servers; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java index 947251a41002..f998d94e0f06 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java @@ -61,7 +61,7 @@ public class RSGroupAdminServer implements RSGroupAdmin { "one server in 'default' RSGroup."; private MasterServices master; - private final RSGroupInfoManager rsGroupInfoManager; + final RSGroupInfoManager rsGroupInfoManager; /** Define the config key of retries threshold when movements failed */ //made package private for testing diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServiceImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServiceImpl.java index 6bc45194ded6..749d3536b1ae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServiceImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServiceImpl.java @@ -164,7 +164,7 @@ public void getRSGroupInfoOfTable(RpcController controller, GetRSGroupInfoOfTabl } checkPermission("getRSGroupInfoOfTable"); Optional optGroup = - RSGroupUtil.getRSGroupInfo(master, groupAdminServer, tableName); + RSGroupUtil.getRSGroupInfo(master, groupAdminServer.rsGroupInfoManager, tableName); if (optGroup.isPresent()) { builder.setRSGroupInfo(ProtobufUtil.toProtoGroupInfo(fillTables(optGroup.get()))); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java index 28f7c1f3e901..1b9f3efef56d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.List; import java.util.Set; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.net.Address; import org.apache.yetus.audience.InterfaceAudience; @@ -67,11 +68,6 @@ Set
moveServers(Set
servers, String srcGroup, String dstGroup) */ List listRSGroups() throws IOException; - /** - * Refresh/reload the group information from the persistent store - */ - void refresh() throws IOException; - /** * Whether the manager is able to fully return group metadata * @return whether the manager is in online mode @@ -83,4 +79,12 @@ Set
moveServers(Set
servers, String srcGroup, String dstGroup) * @param servers set of servers to remove */ void removeServers(Set
servers) throws IOException; + + /** + * Get {@code RSGroupInfo} for the given table. + * @deprecated Since 3.0.0, will be removed in 4.0.0. Only for compatibility, where we upgrade + * from a version that stores table names for a rs group in the {@code RSGroupInfo}. + */ + @Deprecated + RSGroupInfo getRSGroupForTable(TableName tableName) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java index eaf23f32b45a..67250669ea9b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java @@ -20,6 +20,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.AsyncClusterConnection; import org.apache.hadoop.hbase.client.AsyncTable; @@ -76,6 +78,7 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.collect.Maps; import org.apache.hbase.thirdparty.com.google.common.collect.Sets; @@ -104,9 +107,6 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class); - private static final String REASSIGN_WAIT_INTERVAL_KEY = "hbase.rsgroup.reassign.wait"; - private static final long DEFAULT_REASSIGN_WAIT_INTERVAL = 30 * 1000L; - // Assigned before user tables @VisibleForTesting static final TableName RSGROUP_TABLE_NAME = @@ -120,6 +120,9 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { @VisibleForTesting static final byte[] META_QUALIFIER_BYTES = Bytes.toBytes("i"); + @VisibleForTesting + static final String MIGRATE_THREAD_NAME = "Migrate-RSGroup-Tables"; + private static final byte[] ROW_KEY = { 0 }; /** Table descriptor for hbase:rsgroup catalog table */ @@ -140,7 +143,30 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager { // There two Maps are immutable and wholesale replaced on each modification // so are safe to access concurrently. See class comment. - private volatile Map rsGroupMap = Collections.emptyMap(); + private static final class RSGroupInfoHolder { + final ImmutableMap groupName2Group; + final ImmutableMap tableName2Group; + + RSGroupInfoHolder() { + this(Collections.emptyMap()); + } + + RSGroupInfoHolder(Map rsGroupMap) { + ImmutableMap.Builder group2Name2GroupBuilder = ImmutableMap.builder(); + ImmutableMap.Builder tableName2GroupBuilder = ImmutableMap.builder(); + rsGroupMap.forEach((groupName, rsGroupInfo) -> { + group2Name2GroupBuilder.put(groupName, rsGroupInfo); + if (!groupName.equals(RSGroupInfo.DEFAULT_GROUP)) { + rsGroupInfo.getTables() + .forEach(tableName -> tableName2GroupBuilder.put(tableName, rsGroupInfo)); + } + }); + this.groupName2Group = group2Name2GroupBuilder.build(); + this.tableName2Group = tableName2GroupBuilder.build(); + } + } + + private volatile RSGroupInfoHolder holder = new RSGroupInfoHolder(); private final MasterServices masterServices; private final AsyncClusterConnection conn; @@ -160,9 +186,10 @@ private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException private synchronized void init() throws IOException { - refresh(); + refresh(false); serverEventsListenerThread.start(); masterServices.getServerManager().registerListener(serverEventsListenerThread); + migrate(); } static RSGroupInfoManager getInstance(MasterServices master) throws IOException { @@ -179,6 +206,7 @@ public void start() { @Override public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException { checkGroupName(rsGroupInfo.getName()); + Map rsGroupMap = holder.groupName2Group; if (rsGroupMap.get(rsGroupInfo.getName()) != null || rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { throw new DoNotRetryIOException("Group already exists: " + rsGroupInfo.getName()); @@ -235,7 +263,7 @@ public synchronized Set
moveServers(Set
servers, String srcGro } dst.addServer(el); } - Map newGroupMap = Maps.newHashMap(rsGroupMap); + Map newGroupMap = Maps.newHashMap(holder.groupName2Group); newGroupMap.put(src.getName(), src); newGroupMap.put(dst.getName(), dst); flushConfig(newGroupMap); @@ -244,7 +272,7 @@ public synchronized Set
moveServers(Set
servers, String srcGro @Override public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException { - for (RSGroupInfo info : rsGroupMap.values()) { + for (RSGroupInfo info : holder.groupName2Group.values()) { if (info.containsServer(serverHostPort)) { return info; } @@ -254,11 +282,12 @@ public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException @Override public RSGroupInfo getRSGroup(String groupName) { - return rsGroupMap.get(groupName); + return holder.groupName2Group.get(groupName); } @Override public synchronized void removeRSGroup(String groupName) throws IOException { + Map rsGroupMap = holder.groupName2Group; if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) { throw new DoNotRetryIOException( "Group " + groupName + " does not exist or is a reserved " + "group"); @@ -270,7 +299,7 @@ public synchronized void removeRSGroup(String groupName) throws IOException { @Override public List listRSGroups() { - return Lists.newArrayList(rsGroupMap.values()); + return Lists.newArrayList(holder.groupName2Group.values()); } @Override @@ -298,7 +327,7 @@ public synchronized void removeServers(Set
servers) throws IOException } if (rsGroupInfos.size() > 0) { - Map newGroupMap = Maps.newHashMap(rsGroupMap); + Map newGroupMap = Maps.newHashMap(holder.groupName2Group); newGroupMap.putAll(rsGroupInfos); flushConfig(newGroupMap); } @@ -349,9 +378,90 @@ private List retrieveGroupListFromZookeeper() throws IOException { return RSGroupInfoList; } - @Override - public void refresh() throws IOException { - refresh(false); + private void migrate(Collection groupList) { + TableDescriptors tds = masterServices.getTableDescriptors(); + for (RSGroupInfo groupInfo : groupList) { + if (groupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { + continue; + } + SortedSet failedTables = new TreeSet<>(); + for (TableName tableName : groupInfo.getTables()) { + LOG.debug("Migrating {} in group {}", tableName, groupInfo.getName()); + TableDescriptor oldTd; + try { + oldTd = tds.get(tableName); + } catch (IOException e) { + LOG.warn("Failed to migrate {} in group {}", tableName, groupInfo.getName(), e); + failedTables.add(tableName); + continue; + } + if (oldTd == null) { + continue; + } + if (oldTd.getRegionServerGroup().isPresent()) { + // either we have already migrated it or that user has set the rs group using the new + // code which will set the group directly on table descriptor, skip. + LOG.debug("Skip migrating {} since it is already in group {}", tableName, + oldTd.getRegionServerGroup().get()); + continue; + } + TableDescriptor newTd = TableDescriptorBuilder.newBuilder(oldTd) + .setRegionServerGroup(groupInfo.getName()).build(); + // This is a bit tricky. Since we know that the region server group config in + // TableDescriptor will only be used at master side, it is fine to just update the table + // descriptor on file system and also the cache, without reopening all the regions. This + // will be much faster than the normal modifyTable. And when upgrading, we will update + // master first and then region server, so after all the region servers has been reopened, + // the new TableDescriptor will be loaded. + try { + tds.add(newTd); + } catch (IOException e) { + LOG.warn("Failed to migrate {} in group {}", tableName, groupInfo.getName(), e); + failedTables.add(tableName); + continue; + } + } + LOG.debug("Done migrating {}, failed tables {}", groupInfo.getName(), failedTables); + synchronized (RSGroupInfoManagerImpl.this) { + Map rsGroupMap = holder.groupName2Group; + RSGroupInfo currentInfo = rsGroupMap.get(groupInfo.getName()); + if (currentInfo != null) { + RSGroupInfo newInfo = + new RSGroupInfo(currentInfo.getName(), currentInfo.getServers(), failedTables); + Map newGroupMap = new HashMap<>(rsGroupMap); + newGroupMap.put(groupInfo.getName(), newInfo); + try { + flushConfig(newGroupMap); + } catch (IOException e) { + LOG.warn("Failed to persist rs group {}", newInfo.getName(), e); + } + } + } + } + } + + // Migrate the table rs group info from RSGroupInfo into the table descriptor + // Notice that we do not want to block the initialize so this will be done in background, and + // during the migrating, the rs group info maybe incomplete and cause region to be misplaced. + private void migrate() { + Thread migrateThread = new Thread(MIGRATE_THREAD_NAME) { + + @Override + public void run() { + LOG.info("Start migrating table rs group config"); + while (!masterServices.isStopped()) { + Collection groups = holder.groupName2Group.values(); + boolean hasTables = groups.stream().anyMatch(r -> !r.getTables().isEmpty()); + if (!hasTables) { + break; + } + migrate(groups); + } + LOG.info("Done migrating table rs group info"); + } + }; + migrateThread.setDaemon(true); + migrateThread.start(); } /** @@ -381,7 +491,7 @@ private synchronized void refresh(boolean forceOnline) throws IOException { newGroupMap.put(group.getName(), group); } resetRSGroupMap(newGroupMap); - updateCacheOfRSGroups(rsGroupMap.keySet()); + updateCacheOfRSGroups(newGroupMap.keySet()); } private void flushConfigTable(Map groupMap) throws IOException { @@ -411,20 +521,20 @@ private void flushConfigTable(Map groupMap) throws IOExcept } private synchronized void flushConfig() throws IOException { - flushConfig(this.rsGroupMap); + flushConfig(holder.groupName2Group); } private synchronized void flushConfig(Map newGroupMap) throws IOException { // For offline mode persistence is still unavailable // We're refreshing in-memory state but only for servers in default group if (!isOnline()) { - if (newGroupMap == this.rsGroupMap) { + if (newGroupMap == holder.groupName2Group) { // When newGroupMap is this.rsGroupMap itself, // do not need to check default group and other groups as followed return; } - Map oldGroupMap = Maps.newHashMap(rsGroupMap); + Map oldGroupMap = Maps.newHashMap(holder.groupName2Group); RSGroupInfo oldDefaultGroup = oldGroupMap.remove(RSGroupInfo.DEFAULT_GROUP); RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP); if (!oldGroupMap.equals(newGroupMap) /* compare both tables and servers in other groups */ || @@ -438,7 +548,7 @@ private synchronized void flushConfig(Map newGroupMap) thro // Refresh rsGroupMap // according to the inputted newGroupMap (an updated copy of rsGroupMap) - rsGroupMap = newGroupMap; + this.holder = new RSGroupInfoHolder(newGroupMap); // Do not need to update tableMap // because only the update on servers in default group is allowed above, @@ -495,8 +605,7 @@ private void saveRSGroupMapToZK(Map newGroupMap) throws IOE * Make changes visible. Caller must be synchronized on 'this'. */ private void resetRSGroupMap(Map newRSGroupMap) { - // Make maps Immutable. - this.rsGroupMap = Collections.unmodifiableMap(newRSGroupMap); + this.holder = new RSGroupInfoHolder(newRSGroupMap); } /** @@ -549,6 +658,7 @@ private SortedSet
getDefaultServers() throws IOException { // Called by ServerEventsListenerThread. Synchronize on this because redoing // the rsGroupMap then writing it out. private synchronized void updateDefaultServers(SortedSet
servers) { + Map rsGroupMap = holder.groupName2Group; RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP); RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers); HashMap newGroupMap = Maps.newHashMap(rsGroupMap); @@ -647,6 +757,8 @@ private boolean waitForGroupTableOnline() { online = true; // flush any inconsistencies between ZK and HTable RSGroupInfoManagerImpl.this.flushConfig(); + // migrate after we are online. + migrate(); return true; } catch (Exception e) { LOG.warn("Failed to perform check", e); @@ -725,4 +837,10 @@ private void checkGroupName(String groupName) throws ConstraintException { throw new ConstraintException("RSGroup name should only contain alphanumeric characters"); } } + + + @Override + public RSGroupInfo getRSGroupForTable(TableName tableName) throws IOException { + return holder.tableName2Group.get(tableName); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupUtil.java index a08d236129ed..af30049fd33d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupUtil.java @@ -34,12 +34,11 @@ final class RSGroupUtil { private RSGroupUtil() { } - @FunctionalInterface - private interface GetRSGroup { - RSGroupInfo get(String groupName) throws IOException; - } - - private static Optional getRSGroupInfo(MasterServices master, GetRSGroup getter, + /** + * Will try to get the rsgroup from {@link TableDescriptor} first, and then try to get the rsgroup + * from the {@link NamespaceDescriptor}. If still not present, return empty. + */ + static Optional getRSGroupInfo(MasterServices master, RSGroupInfoManager manager, TableName tableName) throws IOException { TableDescriptor td = master.getTableDescriptors().get(tableName); if (td == null) { @@ -47,11 +46,17 @@ private static Optional getRSGroupInfo(MasterServices master, GetRS } Optional optGroupNameOfTable = td.getRegionServerGroup(); if (optGroupNameOfTable.isPresent()) { - RSGroupInfo group = getter.get(optGroupNameOfTable.get()); + RSGroupInfo group = manager.getRSGroup(optGroupNameOfTable.get()); if (group != null) { return Optional.of(group); } } + // for backward compatible, where we may still have table configs in the RSGroupInfo after + // upgrading when migrating is still on-going. + RSGroupInfo groupFromOldRSGroupInfo = manager.getRSGroupForTable(tableName); + if (groupFromOldRSGroupInfo != null) { + return Optional.of(groupFromOldRSGroupInfo); + } ClusterSchema clusterSchema = master.getClusterSchema(); if (clusterSchema == null) { if (TableName.isMetaTableName(tableName)) { @@ -67,25 +72,7 @@ private static Optional getRSGroupInfo(MasterServices master, GetRS if (groupNameOfNs == null) { return Optional.empty(); } - return Optional.ofNullable(getter.get(groupNameOfNs)); - } - - /** - * Will try to get the rsgroup from {@link TableDescriptor} first, and then try to get the rsgroup - * from the {@link NamespaceDescriptor}. If still not present, return empty. - */ - static Optional getRSGroupInfo(MasterServices master, RSGroupInfoManager manager, - TableName tableName) throws IOException { - return getRSGroupInfo(master, manager::getRSGroup, tableName); - } - - /** - * Will try to get the rsgroup from {@link TableDescriptor} first, and then try to get the rsgroup - * from the {@link NamespaceDescriptor}. If still not present, return empty. - */ - static Optional getRSGroupInfo(MasterServices master, RSGroupAdmin admin, - TableName tableName) throws IOException { - return getRSGroupInfo(master, admin::getRSGroupInfo, tableName); + return Optional.ofNullable(manager.getRSGroup(groupNameOfNs)); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestMigrateRSGroupInfo.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestMigrateRSGroupInfo.java new file mode 100644 index 000000000000..8d63cf846990 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestMigrateRSGroupInfo.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.rsgroup; + +import static org.apache.hadoop.hbase.rsgroup.RSGroupInfoManagerImpl.META_FAMILY_BYTES; +import static org.apache.hadoop.hbase.rsgroup.RSGroupInfoManagerImpl.META_QUALIFIER_BYTES; +import static org.apache.hadoop.hbase.rsgroup.RSGroupInfoManagerImpl.RSGROUP_TABLE_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableDescriptors; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.zookeeper.KeeperException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Testcase for HBASE-22819 + */ +@Category({ MediumTests.class }) +public class TestMigrateRSGroupInfo extends TestRSGroupsBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMigrateRSGroupInfo.class); + + private static String TABLE_NAME_PREFIX = "Table_"; + + private static int NUM_TABLES = 10; + + private static byte[] FAMILY = Bytes.toBytes("family"); + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.getConfiguration().setClass(HConstants.MASTER_IMPL, HMasterForTest.class, + HMaster.class); + setUpTestBeforeClass(); + for (int i = 0; i < NUM_TABLES; i++) { + TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME_PREFIX + i), FAMILY); + } + } + + @AfterClass + public static void tearDown() throws Exception { + tearDownAfterClass(); + } + + private static CountDownLatch RESUME = new CountDownLatch(1); + + public static final class HMasterForTest extends HMaster { + + public HMasterForTest(Configuration conf) throws IOException, KeeperException { + super(conf); + } + + @Override + public TableDescriptors getTableDescriptors() { + if (RESUME != null) { + for (StackTraceElement element : Thread.currentThread().getStackTrace()) { + if (element.getClassName().contains("RSGroupInfoManagerImpl")) { + try { + RESUME.await(); + } catch (InterruptedException e) { + } + RESUME = null; + break; + } + } + } + return super.getTableDescriptors(); + } + } + + @Test + public void testMigrate() throws IOException, InterruptedException { + String groupName = name.getMethodName(); + addGroup(groupName, TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().size() - 1); + RSGroupInfo rsGroupInfo = rsGroupAdmin.getRSGroupInfo(groupName); + assertTrue(rsGroupInfo.getTables().isEmpty()); + for (int i = 0; i < NUM_TABLES; i++) { + rsGroupInfo.addTable(TableName.valueOf(TABLE_NAME_PREFIX + i)); + } + try (Table table = TEST_UTIL.getConnection().getTable(RSGROUP_TABLE_NAME)) { + RSGroupProtos.RSGroupInfo proto = ProtobufUtil.toProtoGroupInfo(rsGroupInfo); + Put p = new Put(Bytes.toBytes(rsGroupInfo.getName())); + p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray()); + table.put(p); + } + TEST_UTIL.getMiniHBaseCluster().stopMaster(0).join(); + RESUME = new CountDownLatch(1); + TEST_UTIL.getMiniHBaseCluster().startMaster(); + TEST_UTIL.invalidateConnection(); + RS_GROUP_ADMIN_CLIENT = new RSGroupAdminClient(TEST_UTIL.getConnection()); + + // wait until we can get the rs group info for a table + TEST_UTIL.waitFor(30000, () -> { + try { + rsGroupAdmin.getRSGroupInfoOfTable(TableName.valueOf(TABLE_NAME_PREFIX + 0)); + return true; + } catch (IOException e) { + return false; + } + }); + // confirm that before migrating, we could still get the correct rs group for a table. + for (int i = 0; i < NUM_TABLES; i++) { + RSGroupInfo info = + rsGroupAdmin.getRSGroupInfoOfTable(TableName.valueOf(TABLE_NAME_PREFIX + i)); + assertEquals(rsGroupInfo.getName(), info.getName()); + assertEquals(NUM_TABLES, info.getTables().size()); + } + RESUME.countDown(); + TEST_UTIL.waitFor(60000, () -> { + for (int i = 0; i < NUM_TABLES; i++) { + TableDescriptor td; + try { + td = TEST_UTIL.getAdmin().getDescriptor(TableName.valueOf(TABLE_NAME_PREFIX + i)); + } catch (IOException e) { + return false; + } + if (!rsGroupInfo.getName().equals(td.getRegionServerGroup().orElse(null))) { + return false; + } + } + return true; + }); + // make sure that we persist the result to hbase, where we delete all the tables in the rs + // group. + TEST_UTIL.waitFor(30000, () -> { + try (Table table = TEST_UTIL.getConnection().getTable(RSGROUP_TABLE_NAME)) { + Result result = table.get(new Get(Bytes.toBytes(rsGroupInfo.getName()))); + RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo + .parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES)); + RSGroupInfo gi = ProtobufUtil.toGroupInfo(proto); + return gi.getTables().isEmpty(); + } + }); + // make sure that the migrate thread has quit. + TEST_UTIL.waitFor(30000, () -> Thread.getAllStackTraces().keySet().stream() + .noneMatch(t -> t.getName().equals(RSGroupInfoManagerImpl.MIGRATE_THREAD_NAME))); + // make sure we could still get the correct rs group info after migration + for (int i = 0; i < NUM_TABLES; i++) { + RSGroupInfo info = + rsGroupAdmin.getRSGroupInfoOfTable(TableName.valueOf(TABLE_NAME_PREFIX + i)); + assertEquals(rsGroupInfo.getName(), info.getName()); + assertEquals(NUM_TABLES, info.getTables().size()); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java index b91cd5e6b16b..b9885be3c078 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java @@ -71,7 +71,7 @@ public abstract class TestRSGroupsBase { protected final static Random rand = new Random(); //shared, cluster type specific - protected static HBaseTestingUtility TEST_UTIL; + protected static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); protected static Admin admin; protected static HBaseCluster cluster; protected static RSGroupAdminClient rsGroupAdmin; @@ -90,7 +90,6 @@ public abstract class TestRSGroupsBase { protected TableName tableName; public static void setUpTestBeforeClass() throws Exception { - TEST_UTIL = new HBaseTestingUtility(); TEST_UTIL.getConfiguration().setFloat( "hbase.master.balancer.stochastic.tableSkewCost", 6000); TEST_UTIL.getConfiguration().set(