Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-22819 Automatically migrate the rs group config for table after… #498

Merged
merged 1 commit into from
Aug 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
package org.apache.hadoop.hbase.rsgroup;

import java.util.Collection;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.net.Address;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -104,7 +102,7 @@ public boolean containsServer(Address hostPort) {
/**
* Get list of servers.
*/
public Set<Address> getServers() {
public SortedSet<Address> getServers() {
return servers;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class RSGroupAdminServer implements RSGroupAdmin {
"one server in 'default' RSGroup.";

private MasterServices master;
private final RSGroupInfoManager rsGroupInfoManager;
final RSGroupInfoManager rsGroupInfoManager;

/** Define the config key of retries threshold when movements failed */
//made package private for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void getRSGroupInfoOfTable(RpcController controller, GetRSGroupInfoOfTabl
}
checkPermission("getRSGroupInfoOfTable");
Optional<RSGroupInfo> optGroup =
RSGroupUtil.getRSGroupInfo(master, groupAdminServer, tableName);
RSGroupUtil.getRSGroupInfo(master, groupAdminServer.rsGroupInfoManager, tableName);
if (optGroup.isPresent()) {
builder.setRSGroupInfo(ProtobufUtil.toProtoGroupInfo(fillTables(optGroup.get())));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.net.Address;
import org.apache.yetus.audience.InterfaceAudience;

Expand Down Expand Up @@ -67,11 +68,6 @@ Set<Address> moveServers(Set<Address> servers, String srcGroup, String dstGroup)
*/
List<RSGroupInfo> listRSGroups() throws IOException;

/**
* Refresh/reload the group information from the persistent store
*/
void refresh() throws IOException;

/**
* Whether the manager is able to fully return group metadata
* @return whether the manager is in online mode
Expand All @@ -83,4 +79,12 @@ Set<Address> moveServers(Set<Address> servers, String srcGroup, String dstGroup)
* @param servers set of servers to remove
*/
void removeServers(Set<Address> servers) throws IOException;

/**
* Get {@code RSGroupInfo} for the given table.
* @param tableName the table whose rs group should be looked up
* @return the {@code RSGroupInfo} recorded for the table. NOTE(review): presumably {@code null}
* when no group lists the table; confirm against the implementation.
* @throws IOException declared by the signature; the conditions under which it is raised are
* implementation specific
* @deprecated Since 3.0.0, will be removed in 4.0.0. Only for compatibility, where we upgrade
* from a version that stores table names for a rs group in the {@code RSGroupInfo}.
*/
@Deprecated
RSGroupInfo getRSGroupForTable(TableName tableName) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -34,6 +35,7 @@
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
Expand Down Expand Up @@ -79,6 +81,7 @@
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
Expand Down Expand Up @@ -123,6 +126,9 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
@VisibleForTesting
static final byte[] META_QUALIFIER_BYTES = Bytes.toBytes("i");

@VisibleForTesting
static final String MIGRATE_THREAD_NAME = "Migrate-RSGroup-Tables";

private static final byte[] ROW_KEY = { 0 };

/** Table descriptor for <code>hbase:rsgroup</code> catalog table */
Expand All @@ -143,7 +149,30 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {

// These two Maps are immutable and wholesale replaced on each modification
// so are safe to access concurrently. See class comment.
private volatile Map<String, RSGroupInfo> rsGroupMap = Collections.emptyMap();
/**
 * Snapshot of the rs group configuration, indexed by group name and by table name. Both indexes
 * are immutable maps built once in the constructor.
 */
private static final class RSGroupInfoHolder {
  /** Every group, keyed by its name. */
  final ImmutableMap<String, RSGroupInfo> groupName2Group;
  /** Owning group for each table listed in a non-default group. */
  final ImmutableMap<TableName, RSGroupInfo> tableName2Group;

  /** Creates a holder that contains no groups. */
  RSGroupInfoHolder() {
    this(Collections.emptyMap());
  }

  /**
   * Builds both indexes from the given group map.
   * @param rsGroupMap the groups to index, keyed by group name
   */
  RSGroupInfoHolder(Map<String, RSGroupInfo> rsGroupMap) {
    ImmutableMap.Builder<String, RSGroupInfo> byGroupName = ImmutableMap.builder();
    ImmutableMap.Builder<TableName, RSGroupInfo> byTableName = ImmutableMap.builder();
    for (Map.Entry<String, RSGroupInfo> entry : rsGroupMap.entrySet()) {
      String name = entry.getKey();
      RSGroupInfo group = entry.getValue();
      byGroupName.put(name, group);
      if (name.equals(RSGroupInfo.DEFAULT_GROUP)) {
        // tables of the default group are not tracked in the table index
        continue;
      }
      for (TableName table : group.getTables()) {
        byTableName.put(table, group);
      }
    }
    // NOTE(review): ImmutableMap.Builder#build rejects duplicate keys, so a table listed in two
    // groups would presumably fail here - confirm this cannot happen.
    this.groupName2Group = byGroupName.build();
    this.tableName2Group = byTableName.build();
  }
}

private volatile RSGroupInfoHolder holder = new RSGroupInfoHolder();

private final MasterServices masterServices;
private final AsyncClusterConnection conn;
Expand All @@ -164,12 +193,13 @@ private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException


private synchronized void init() throws IOException {
refresh();
refresh(false);
serverEventsListenerThread.start();
masterServices.getServerManager().registerListener(serverEventsListenerThread);
failedOpenUpdaterThread = new FailedOpenUpdaterThread(masterServices.getConfiguration());
failedOpenUpdaterThread.start();
masterServices.getServerManager().registerListener(failedOpenUpdaterThread);
migrate();
}

static RSGroupInfoManager getInstance(MasterServices master) throws IOException {
Expand All @@ -186,6 +216,7 @@ public void start() {
@Override
public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
checkGroupName(rsGroupInfo.getName());
Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group;
if (rsGroupMap.get(rsGroupInfo.getName()) != null ||
rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
throw new DoNotRetryIOException("Group already exists: " + rsGroupInfo.getName());
Expand Down Expand Up @@ -242,7 +273,7 @@ public synchronized Set<Address> moveServers(Set<Address> servers, String srcGro
}
dst.addServer(el);
}
Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(holder.groupName2Group);
newGroupMap.put(src.getName(), src);
newGroupMap.put(dst.getName(), dst);
flushConfig(newGroupMap);
Expand All @@ -251,7 +282,7 @@ public synchronized Set<Address> moveServers(Set<Address> servers, String srcGro

@Override
public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException {
for (RSGroupInfo info : rsGroupMap.values()) {
for (RSGroupInfo info : holder.groupName2Group.values()) {
if (info.containsServer(serverHostPort)) {
return info;
}
Expand All @@ -261,11 +292,12 @@ public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException

@Override
public RSGroupInfo getRSGroup(String groupName) {
return rsGroupMap.get(groupName);
return holder.groupName2Group.get(groupName);
}

@Override
public synchronized void removeRSGroup(String groupName) throws IOException {
Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group;
if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
throw new DoNotRetryIOException(
"Group " + groupName + " does not exist or is a reserved " + "group");
Expand All @@ -277,7 +309,7 @@ public synchronized void removeRSGroup(String groupName) throws IOException {

@Override
public List<RSGroupInfo> listRSGroups() {
return Lists.newArrayList(rsGroupMap.values());
return Lists.newArrayList(holder.groupName2Group.values());
}

@Override
Expand Down Expand Up @@ -305,7 +337,7 @@ public synchronized void removeServers(Set<Address> servers) throws IOException
}

if (rsGroupInfos.size() > 0) {
Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(holder.groupName2Group);
newGroupMap.putAll(rsGroupInfos);
flushConfig(newGroupMap);
}
Expand Down Expand Up @@ -356,9 +388,90 @@ private List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
return RSGroupInfoList;
}

@Override
public void refresh() throws IOException {
refresh(false);
/**
 * Copies the table-to-group assignment stored in each non-default {@code RSGroupInfo} onto the
 * corresponding table descriptor, then persists the group with only the tables whose migration
 * failed, so that they can be retried later.
 * @param groupList the groups whose tables should be migrated
 */
private void migrate(Collection<RSGroupInfo> groupList) {
  TableDescriptors descriptors = masterServices.getTableDescriptors();
  for (RSGroupInfo group : groupList) {
    if (group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
      continue;
    }
    SortedSet<TableName> notMigrated = new TreeSet<>();
    for (TableName table : group.getTables()) {
      LOG.debug("Migrating {} in group {}", table, group.getName());
      try {
        TableDescriptor current = descriptors.get(table);
        if (current == null) {
          // no descriptor found for this table, so there is nothing to migrate
          continue;
        }
        if (current.getRegionServerGroup().isPresent()) {
          // Already migrated, or the group was set directly on the table descriptor by the new
          // code path; leave it untouched.
          LOG.debug("Skip migrating {} since it is already in group {}", table,
            current.getRegionServerGroup().get());
          continue;
        }
        TableDescriptor updated = TableDescriptorBuilder.newBuilder(current)
          .setRegionServerGroup(group.getName()).build();
        // The rs group setting on the TableDescriptor is only consulted on the master side, so
        // we update the descriptor on the file system and in the cache directly instead of going
        // through a regular modifyTable, which would reopen every region. During an upgrade the
        // master is updated before the region servers, so the region servers load the new
        // descriptor once they have been restarted.
        descriptors.add(updated);
      } catch (IOException e) {
        LOG.warn("Failed to migrate {} in group {}", table, group.getName(), e);
        notMigrated.add(table);
      }
    }
    LOG.debug("Done migrating {}, failed tables {}", group.getName(), notMigrated);
    synchronized (RSGroupInfoManagerImpl.this) {
      Map<String, RSGroupInfo> currentGroups = holder.groupName2Group;
      RSGroupInfo latest = currentGroups.get(group.getName());
      if (latest == null) {
        // the group is no longer present in the current snapshot, nothing to persist
        continue;
      }
      // keep the current servers, but retain only the tables that failed to migrate
      RSGroupInfo replacement =
        new RSGroupInfo(latest.getName(), latest.getServers(), notMigrated);
      Map<String, RSGroupInfo> updatedGroups = new HashMap<>(currentGroups);
      updatedGroups.put(group.getName(), replacement);
      try {
        flushConfig(updatedGroups);
      } catch (IOException e) {
        LOG.warn("Failed to persist rs group {}", replacement.getName(), e);
      }
    }
  }
}

// Migrate the table rs group info from RSGroupInfo into the table descriptor.
// Notice that we do not want to block initialization, so this will be done in the background, and
// during the migration the rs group info may be incomplete and cause regions to be misplaced.
private void migrate() {
Thread migrateThread = new Thread(MIGRATE_THREAD_NAME) {

@Override
public void run() {
LOG.info("Start migrating table rs group config");
while (!masterServices.isStopped()) {
Collection<RSGroupInfo> groups = holder.groupName2Group.values();
boolean hasTables = groups.stream().anyMatch(r -> !r.getTables().isEmpty());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the master is restarted again, how do we avoid migrating again?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see the above method: we have lots of checks, and once we finish migrating one group, we will update the RSGroupInfo to remove all the tables in it.

if (!hasTables) {
break;
}
migrate(groups);
}
LOG.info("Done migrating table rs group info");
}
};
migrateThread.setDaemon(true);
migrateThread.start();
}

/**
Expand Down Expand Up @@ -388,7 +501,7 @@ private synchronized void refresh(boolean forceOnline) throws IOException {
newGroupMap.put(group.getName(), group);
}
resetRSGroupMap(newGroupMap);
updateCacheOfRSGroups(rsGroupMap.keySet());
updateCacheOfRSGroups(newGroupMap.keySet());
}

private void flushConfigTable(Map<String, RSGroupInfo> groupMap) throws IOException {
Expand Down Expand Up @@ -418,20 +531,20 @@ private void flushConfigTable(Map<String, RSGroupInfo> groupMap) throws IOExcept
}

private synchronized void flushConfig() throws IOException {
flushConfig(this.rsGroupMap);
flushConfig(holder.groupName2Group);
}

private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) throws IOException {
// For offline mode persistence is still unavailable
// We're refreshing in-memory state but only for servers in default group
if (!isOnline()) {
if (newGroupMap == this.rsGroupMap) {
if (newGroupMap == holder.groupName2Group) {
// When newGroupMap is the current in-memory group map itself,
// there is no need to check the default group and the other groups as done below
return;
}

Map<String, RSGroupInfo> oldGroupMap = Maps.newHashMap(rsGroupMap);
Map<String, RSGroupInfo> oldGroupMap = Maps.newHashMap(holder.groupName2Group);
RSGroupInfo oldDefaultGroup = oldGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
if (!oldGroupMap.equals(newGroupMap) /* compare both tables and servers in other groups */ ||
Expand All @@ -445,7 +558,7 @@ private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) thro

// Refresh rsGroupMap
// according to the inputted newGroupMap (an updated copy of rsGroupMap)
rsGroupMap = newGroupMap;
this.holder = new RSGroupInfoHolder(newGroupMap);

// Do not need to update tableMap
// because only the update on servers in default group is allowed above,
Expand Down Expand Up @@ -502,8 +615,7 @@ private void saveRSGroupMapToZK(Map<String, RSGroupInfo> newGroupMap) throws IOE
* Make changes visible. Caller must be synchronized on 'this'.
*/
private void resetRSGroupMap(Map<String, RSGroupInfo> newRSGroupMap) {
// Make maps Immutable.
this.rsGroupMap = Collections.unmodifiableMap(newRSGroupMap);
this.holder = new RSGroupInfoHolder(newRSGroupMap);
}

/**
Expand Down Expand Up @@ -556,6 +668,7 @@ private SortedSet<Address> getDefaultServers() throws IOException {
// Called by ServerEventsListenerThread. Synchronize on this because redoing
// the rsGroupMap then writing it out.
private synchronized void updateDefaultServers(SortedSet<Address> servers) {
Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group;
RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP);
RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers);
HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
Expand Down Expand Up @@ -734,6 +847,8 @@ private boolean waitForGroupTableOnline() {
online = true;
// flush any inconsistencies between ZK and HTable
RSGroupInfoManagerImpl.this.flushConfig();
// migrate after we are online.
migrate();
return true;
} catch (Exception e) {
LOG.warn("Failed to perform check", e);
Expand Down Expand Up @@ -812,4 +927,10 @@ private void checkGroupName(String groupName) throws ConstraintException {
throw new ConstraintException("RSGroup name should only contain alphanumeric characters");
}
}


/**
 * Looks up the given table in the table index of the current group snapshot.
 * @param tableName the table to look up
 * @return the group mapped to the table, or {@code null} if the index has no entry for it
 */
@Override
public RSGroupInfo getRSGroupForTable(TableName tableName) throws IOException {
  RSGroupInfoHolder snapshot = holder;
  return snapshot.tableName2Group.get(tableName);
}
}
Loading