Commit 2addcdc

modification

GeorryHuang committed Oct 11, 2021
1 parent 2320469 commit 2addcdc
Showing 5 changed files with 269 additions and 39 deletions.
org/apache/hadoop/hbase/master/HMaster.java
@@ -20,6 +20,7 @@
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import org.apache.hadoop.hbase.master.migrate.RollingUpgradeChore;
import static org.apache.hadoop.hbase.util.DNS.MASTER_HOSTNAME_KEY;

import com.google.errorprone.annotations.RestrictedApi;
@@ -193,7 +194,6 @@
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifier;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifierFactory;
import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -374,6 +374,7 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
private ReplicationBarrierCleaner replicationBarrierCleaner;
private MobFileCleanerChore mobFileCleanerChore;
private MobFileCompactionChore mobFileCompactionChore;
private RollingUpgradeChore rollingUpgradeChore;
// used to synchronize the mobCompactionStates
private final IdLock mobCompactionLock = new IdLock();
// save the information of mob compactions in tables.
@@ -1220,31 +1221,12 @@ private void finishActiveMasterInitialization(MonitoredTask status) throws IOExc
(EnvironmentEdgeManager.currentTime() - start) / 1000) + " seconds");
}

/*
A background thread to update all TableDescriptors that do not contain
StoreFileTracker implementation configuration. see HBASE-26263.
*/
Thread STFChecking = new Thread(() -> {
try {
tableDescriptors.getAll().forEach((tableName, htd) -> {
if (StringUtils.isEmpty(htd.getValue(StoreFileTrackerFactory.TRACKER_IMPL))) {
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(htd);
StoreFileTrackerFactory.persistTrackerConfig(this.conf, builder);
try {
tableDescriptors.update(builder.build());
LOG.info("Persist StoreFileTracker configurations to TableDescriptor for table {}",
tableName);
} catch (IOException ioe) {
LOG.warn("Failed to persist StoreFileTracker to table {}", tableName, ioe);
}
}
});
} catch (IOException e) {
LOG.error("Failed to run StoreFileTracker checking thread for existing tables!", e);
}
}, "StoreFileTrackerChecking");
STFChecking.setDaemon(true);
STFChecking.start();
boolean isRollingUpgradeChoreEnabled = conf.getBoolean(
RollingUpgradeChore.ROLLING_UPGRADE_CHORE_ENABLED_KEY,
RollingUpgradeChore.DEFAULT_ROLLING_UPGRADE_CHORE_ENABLED);
if (isRollingUpgradeChoreEnabled) {
this.rollingUpgradeChore = new RollingUpgradeChore(this);
getChoreService().scheduleChore(rollingUpgradeChore);
}
}
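The chore is enabled by default; an operator can turn it off before the master starts. Below is a minimal sketch, assuming only the configuration key and default defined in RollingUpgradeChore (the driver class itself is illustrative, not part of this commit):

```java
// A minimal sketch (assumed driver class): disabling the RollingUpgradeChore via configuration.
// The key and its default come from RollingUpgradeChore; everything else is illustrative.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class DisableRollingUpgradeChoreSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Defaults to true (DEFAULT_ROLLING_UPGRADE_CHORE_ENABLED); false skips scheduling the chore.
    conf.setBoolean("hbase.master.rolling.upgrade.chore.enabled", false);
    System.out.println("chore enabled: "
      + conf.getBoolean("hbase.master.rolling.upgrade.chore.enabled", true));
  }
}
```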

private void createMissingCFsInMetaDuringUpgrade(
@@ -1729,6 +1711,7 @@ protected void stopChores() {
shutdownChore(snapshotCleanerChore);
shutdownChore(hbckChore);
shutdownChore(regionsRecoveryChore);
shutdownChore(rollingUpgradeChore);
}

/**
org/apache/hadoop/hbase/master/migrate/RollingUpgradeChore.java
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.master.migrate;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.SilentModifyTableProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* To avoid submitting too many migration/upgrade threads at the same time during master
* initialization, RollingUpgradeChore handles all rolling-upgrade tasks.
*/
@InterfaceAudience.Private
public class RollingUpgradeChore extends ScheduledChore {

public static final String ROLLING_UPGRADE_CHORE_ENABLED_KEY =
"hbase.master.rolling.upgrade.chore.enabled";
public static final boolean DEFAULT_ROLLING_UPGRADE_CHORE_ENABLED = true;

static final String ROLLING_UPGRADE_CHORE_TIMEUNIT_KEY =
"hbase.master.rolling.upgrade.chore.timeunit";
static final String DEFAULT_ROLLING_UPGRADE_CHORE_TIMEUNIT = TimeUnit.MILLISECONDS.name();

static final String ROLLING_UPGRADE_CHORE_PERIOD_KEY = "hbase.master.rolling.upgrade.chore.period";
static final int DEFAULT_ROLLING_UPGRADE_CHORE_PERIOD = 1000 * 60 * 5; // 5 minutes in millis

static final String ROLLING_UPGRADE_CHORE_DELAY_KEY = "hbase.master.rolling.upgrade.chore.delay";
static final long DEFAULT_ROLLING_UPGRADE_CHORE_DELAY = 1000L * 60 * 10; // 10 minutes in millis

private static final Logger LOG = LoggerFactory.getLogger(RollingUpgradeChore.class);
private Map<String, Long> tableToProcId;
private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
private Configuration conf;
private TableDescriptors tableDescriptors;

public RollingUpgradeChore(MasterServices masterServices) {
this(masterServices.getConfiguration(), masterServices.getMasterProcedureExecutor(),
masterServices.getTableDescriptors(), masterServices);
}

private RollingUpgradeChore(Configuration conf, ProcedureExecutor<MasterProcedureEnv> procedureExecutor,
TableDescriptors tableDescriptors, Stoppable stopper) {
super(RollingUpgradeChore.class.getSimpleName(), stopper,
conf.getInt(ROLLING_UPGRADE_CHORE_PERIOD_KEY, DEFAULT_ROLLING_UPGRADE_CHORE_PERIOD),
conf.getLong(ROLLING_UPGRADE_CHORE_DELAY_KEY, DEFAULT_ROLLING_UPGRADE_CHORE_DELAY),
TimeUnit.valueOf(conf.get(ROLLING_UPGRADE_CHORE_TIMEUNIT_KEY, DEFAULT_ROLLING_UPGRADE_CHORE_TIMEUNIT)));
this.conf = conf;
this.procedureExecutor = procedureExecutor;
this.tableDescriptors = tableDescriptors;
this.tableToProcId = new HashMap<>();
}

@Override
protected void chore() {
Map<String, TableDescriptor> migrateTables;
try {
migrateTables = tableDescriptors.getAll().entrySet().stream().filter(entry -> {
TableDescriptor td = entry.getValue();
return StringUtils.isEmpty(td.getValue(StoreFileTrackerFactory.TRACKER_IMPL));
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
} catch (IOException e) {
LOG.warn("Failed to list table descriptors for StoreFileTracker migration", e);
return;
}

if (migrateTables.isEmpty()) {
shutdown();
return;
}

for (Map.Entry<String, TableDescriptor> entry : migrateTables.entrySet()) {
String tableName = entry.getKey();
TableDescriptor tableDescriptor = entry.getValue();
Long procId = tableToProcId.get(tableName);
if (procId != null) {
if (procedureExecutor.isFinished(procId)) {
tableToProcId.remove(tableName);
}
continue;
}
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableDescriptor);
StoreFileTrackerFactory.persistTrackerConfig(conf, builder);
SilentModifyTableProcedure procedure;
try {
procedure = new SilentModifyTableProcedure(
procedureExecutor.getEnvironment(), builder.build());
} catch (HBaseIOException e) {
LOG.warn("Failed to build a SilentModifyTableProcedure for {}", tableName, e);
continue;
}
Long newProcId = procedureExecutor.submitProcedure(procedure);
tableToProcId.put(tableName, newProcId);
}
}
}
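The chore() method above follows an at-most-one-in-flight bookkeeping pattern per table: remember the submitted procedure id, skip the table while the executor still reports the procedure as running, and clear the entry once it finishes so the table can be re-checked on a later run. A self-contained sketch of that pattern with the procedure executor stubbed out (all names here are illustrative, not HBase API); the same pattern is reused in RSGroupInfoManagerImpl.migrate() later in this commit:

```java
// Self-contained sketch of the bookkeeping pattern in chore(): submit at most one
// procedure per table, skip the table while its procedure is still running, and
// clear the entry once it finishes. Names are illustrative, not HBase API.
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class ProcIdBookkeepingSketch {
  private final Map<String, Long> tableToProcId = new HashMap<>();
  private final AtomicLong nextProcId = new AtomicLong();

  // Stand-in for ProcedureExecutor.isFinished(procId); arbitrary stub behavior for the sketch.
  private boolean isFinished(long procId) {
    return procId % 2 == 0;
  }

  void runOnce(List<String> tablesNeedingMigration) {
    for (String table : tablesNeedingMigration) {
      Long procId = tableToProcId.get(table);
      if (procId != null) {
        if (isFinished(procId)) {
          // Finished: forget it, so the table is re-checked (and possibly resubmitted) next run.
          tableToProcId.remove(table);
        }
        // Never submit a second procedure while one is in flight for this table.
        continue;
      }
      long newProcId = nextProcId.incrementAndGet(); // stand-in for submitProcedure(...)
      tableToProcId.put(table, newProcId);
      System.out.println("submitted procedure " + newProcId + " for " + table);
    }
  }

  public static void main(String[] args) {
    ProcIdBookkeepingSketch sketch = new ProcIdBookkeepingSketch();
    sketch.runOnce(Arrays.asList("t1", "t2")); // submits procId 1 for t1, procId 2 for t2
    sketch.runOnce(Arrays.asList("t1", "t2")); // t1 still running; t2 (procId 2, "finished") cleared
  }
}
```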
org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
@@ -56,10 +56,10 @@ public class ModifyTableProcedure
extends AbstractStateMachineTableProcedure<ModifyTableState> {
private static final Logger LOG = LoggerFactory.getLogger(ModifyTableProcedure.class);

private TableDescriptor unmodifiedTableDescriptor = null;
private TableDescriptor modifiedTableDescriptor;
private boolean deleteColumnFamilyInModify;
private boolean shouldCheckDescriptor;
protected TableDescriptor unmodifiedTableDescriptor = null;
protected TableDescriptor modifiedTableDescriptor;
protected boolean deleteColumnFamilyInModify;
protected boolean shouldCheckDescriptor;
/**
* List of column families that cannot be deleted from the hbase:meta table.
* They are critical to cluster operation. This is a bit of an odd place to
@@ -285,7 +285,7 @@ public TableOperationType getTableOperationType() {
/**
* Check conditions before any real action of modifying a table.
*/
private void prepareModify(final MasterProcedureEnv env) throws IOException {
protected void prepareModify(final MasterProcedureEnv env) throws IOException {
// Checks whether the table exists
if (!env.getMasterServices().getTableDescriptors().exists(getTableName())) {
throw new TableNotFoundException(getTableName());
@@ -365,7 +365,7 @@ private void preModify(final MasterProcedureEnv env, final ModifyTableState stat
* Update descriptor
* @param env MasterProcedureEnv
**/
private void updateTableDescriptor(final MasterProcedureEnv env) throws IOException {
protected void updateTableDescriptor(final MasterProcedureEnv env) throws IOException {
env.getMasterServices().getTableDescriptors().update(modifiedTableDescriptor);
}

org/apache/hadoop/hbase/master/procedure/SilentModifyTableProcedure.java
@@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ModifyTableState;

/**
* An extension of ModifyTableProcedure that only updates the TableDescriptor of a table and does
* not reopen any regions. After the introduction of StoreFileTracker, updating a TableDescriptor
* directly became a dangerous action, because updating the descriptor while a normal
* ModifyTableProcedure is still reopening regions could put different StoreFileTracker
* implementations online at the same time.
*/
@InterfaceAudience.Private
public class SilentModifyTableProcedure extends ModifyTableProcedure {
private static final Logger LOG = LoggerFactory.getLogger(SilentModifyTableProcedure.class);

public SilentModifyTableProcedure(final MasterProcedureEnv env, final TableDescriptor htd)
throws HBaseIOException {
super(env, htd);
preflightChecks(env, null);
}

@Override
protected Flow executeFromState(final MasterProcedureEnv env, final ModifyTableState state)
throws InterruptedException {
LOG.trace("{} execute state={}", this, state);
try {
switch (state) {
case MODIFY_TABLE_PREPARE:
prepareModify(env);
setNextState(ModifyTableState.MODIFY_TABLE_UPDATE_TABLE_DESCRIPTOR);
break;
case MODIFY_TABLE_UPDATE_TABLE_DESCRIPTOR:
updateTableDescriptor(env);
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("unhandled state=" + state);
}
} catch (IOException e) {
if (isRollbackSupported(state)) {
setFailure("master-silent-modify-table", e);
} else {
LOG.warn("Retriable error trying to silent-modify table={} (in state={})", getTableName(), state,
e);
}
}
return Flow.HAS_MORE_STATE;
}

@Override
protected final void prepareModify(MasterProcedureEnv env) throws IOException {
super.prepareModify(env);
if (this.deleteColumnFamilyInModify) {
throw new DoNotRetryIOException("Table " + getTableName()
+ " is using SilentModifyTableProcedure, which does not allow deleting a column family!");
}

final int oldReplicaCount = unmodifiedTableDescriptor.getRegionReplication();
final int newReplicaCount = modifiedTableDescriptor.getRegionReplication();
if (newReplicaCount != oldReplicaCount) {
throw new DoNotRetryIOException("Table " + getTableName()
+ " is using SilentModifyTableProcedure, which does not allow changing the replica count!");
}
}

@Override
protected void rollbackState(final MasterProcedureEnv env, final ModifyTableState state)
throws IOException {
// Nothing to do.
}

@Override
protected boolean isRollbackSupported(final ModifyTableState state) {
return false;
}

@Override
protected ModifyTableState getInitialState() {
return ModifyTableState.MODIFY_TABLE_PREPARE;
}
}
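Relative to its parent, the procedure's state machine collapses to two states, prepare and update-table-descriptor, and never enters the region-reopen states. A minimal sketch of that reduced flow, modeled as a plain state loop rather than the real procedure-v2 framework (everything here is illustrative):

```java
// Minimal sketch of the reduced two-state flow in SilentModifyTableProcedure, modeled as a
// plain state loop rather than the real procedure-v2 framework. The point: after the prepare
// checks, the only remaining step is persisting the descriptor; no region-reopen states exist.
public class SilentModifyFlowSketch {
  enum State { PREPARE, UPDATE_TABLE_DESCRIPTOR, DONE }

  private State state = State.PREPARE;

  void step() {
    switch (state) {
      case PREPARE:
        // Would validate: table exists, no column family deleted, replica count unchanged.
        state = State.UPDATE_TABLE_DESCRIPTOR;
        break;
      case UPDATE_TABLE_DESCRIPTOR:
        // Would persist the new TableDescriptor; the flow then ends (Flow.NO_MORE_STATE).
        state = State.DONE;
        break;
      default:
        throw new IllegalStateException("unhandled state=" + state);
    }
  }

  public static void main(String[] args) {
    SilentModifyFlowSketch flow = new SilentModifyFlowSketch();
    while (flow.state != State.DONE) {
      flow.step();
    }
    System.out.println("descriptor updated without reopening any region");
  }
}
```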
org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
@@ -72,10 +72,13 @@
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.SilentModifyTableProcedure;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.util.Bytes;
@@ -149,6 +152,9 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {

private static final byte[] ROW_KEY = { 0 };

// Monitors which tables have an in-flight migration procedure.
private Map<String, Long> tableToProcId = new HashMap<>();

/** Table descriptor for <code>hbase:rsgroup</code> catalog table */
private static final TableDescriptor RSGROUP_TABLE_DESC;
static {
@@ -499,6 +505,13 @@ private void migrate(Collection<RSGroupInfo> groupList) {
SortedSet<TableName> failedTables = new TreeSet<>();
for (TableName tableName : groupInfo.getTables()) {
LOG.debug("Migrating {} in group {}", tableName, groupInfo.getName());
Long procId = tableToProcId.get(tableName.toString());
if (procId != null) {
if (masterServices.getMasterProcedureExecutor().isFinished(procId)) {
tableToProcId.remove(tableName.toString());
}
continue;
}
TableDescriptor oldTd;
try {
oldTd = tds.get(tableName);
@@ -519,14 +532,15 @@
}
TableDescriptor newTd = TableDescriptorBuilder.newBuilder(oldTd)
.setRegionServerGroup(groupInfo.getName()).build();
// This is a bit tricky. Since we know that the region server group config in
// TableDescriptor will only be used at master side, it is fine to just update the table
// descriptor on file system and also the cache, without reopening all the regions. This
// will be much faster than the normal modifyTable. And when upgrading, we will update
// master first and then region server, so after all the region servers has been reopened,
// the new TableDescriptor will be loaded.
// SilentModifyTableProcedure and ModifyTableProcedure are mutually exclusive.
// While a ModifyTableProcedure has not yet finished reopening all regions, we do not want
// the RSGroup migration to update the table descriptor, because that could change the
// StoreFileTracker implementation and leave two different StoreFileTracker implementations
// online at the same time, which could lead to data loss.
try {
tds.update(newTd);
ProcedureExecutor<MasterProcedureEnv> procedureExecutor =
masterServices.getMasterProcedureExecutor();
Long newProcId = procedureExecutor.submitProcedure(
new SilentModifyTableProcedure(procedureExecutor.getEnvironment(), newTd));
tableToProcId.put(tableName.toString(), newProcId);
} catch (IOException e) {
LOG.warn("Failed to migrate {} in group {}", tableName, groupInfo.getName(), e);
failedTables.add(tableName);
