diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index ad6969ba9abf..12843ddd8494 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -131,6 +131,7 @@ import org.apache.hadoop.hbase.master.http.MasterStatusServlet;
 import org.apache.hadoop.hbase.master.janitor.CatalogJanitor;
 import org.apache.hadoop.hbase.master.locking.LockManager;
+import org.apache.hadoop.hbase.master.migrate.RollingUpgradeChore;
 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager;
 import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
@@ -376,6 +377,7 @@ public class HMaster extends HBaseServerBase implements Maste
   private ReplicationBarrierCleaner replicationBarrierCleaner;
   private MobFileCleanerChore mobFileCleanerChore;
   private MobFileCompactionChore mobFileCompactionChore;
+  private RollingUpgradeChore rollingUpgradeChore;
   // used to synchronize the mobCompactionStates
   private final IdLock mobCompactionLock = new IdLock();
   // save the information of mob compactions in tables.
@@ -1222,6 +1224,9 @@ private void finishActiveMasterInitialization(MonitoredTask status) throws IOExc
       LOG.debug("Balancer post startup initialization complete, took " + (
         (EnvironmentEdgeManager.currentTime() - start) / 1000) + " seconds");
     }
+
+    this.rollingUpgradeChore = new RollingUpgradeChore(this);
+    getChoreService().scheduleChore(rollingUpgradeChore);
   }
 
   private void createMissingCFsInMetaDuringUpgrade(
@@ -1713,6 +1718,7 @@ protected void stopChores() {
     shutdownChore(snapshotCleanerChore);
     shutdownChore(hbckChore);
     shutdownChore(regionsRecoveryChore);
+    shutdownChore(rollingUpgradeChore);
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/migrate/RollingUpgradeChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/migrate/RollingUpgradeChore.java
new file mode 100644
index 000000000000..3896b41f6625
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/migrate/RollingUpgradeChore.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.migrate;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.MigrateStoreFileTrackerProcedure;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * To avoid submitting too many migration/upgrade threads at the same time during master
+ * initialization, RollingUpgradeChore handles all rolling-upgrade tasks.
+ */
+@InterfaceAudience.Private
+public class RollingUpgradeChore extends ScheduledChore {
+
+  static final String ROLLING_UPGRADE_CHORE_PERIOD_SECONDS_KEY =
+    "hbase.master.rolling.upgrade.chore.period.secs";
+  static final int DEFAULT_ROLLING_UPGRADE_CHORE_PERIOD_SECONDS = 10; // 10 seconds by default
+
+  static final String ROLLING_UPGRADE_CHORE_DELAY_SECONDS_KEY =
+    "hbase.master.rolling.upgrade.chore.delay.secs";
+  static final long DEFAULT_ROLLING_UPGRADE_CHORE_DELAY_SECONDS = 30; // 30 seconds
+
+  static final int CONCURRENT_PROCEDURES_COUNT = 5;
+
+  private final static Logger LOG = LoggerFactory.getLogger(RollingUpgradeChore.class);
+  ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
+  private TableDescriptors tableDescriptors;
+  private List<MigrateStoreFileTrackerProcedure> processingProcs = new ArrayList<>();
+
+  public RollingUpgradeChore(MasterServices masterServices) {
+    this(masterServices.getConfiguration(), masterServices.getMasterProcedureExecutor(),
+      masterServices.getTableDescriptors(), masterServices);
+  }
+
+  private RollingUpgradeChore(Configuration conf,
+    ProcedureExecutor<MasterProcedureEnv> procedureExecutor, TableDescriptors tableDescriptors,
+    Stoppable stopper) {
+    super(RollingUpgradeChore.class.getSimpleName(), stopper,
+      conf.getInt(ROLLING_UPGRADE_CHORE_PERIOD_SECONDS_KEY,
+        DEFAULT_ROLLING_UPGRADE_CHORE_PERIOD_SECONDS),
+      conf.getLong(ROLLING_UPGRADE_CHORE_DELAY_SECONDS_KEY,
+        DEFAULT_ROLLING_UPGRADE_CHORE_DELAY_SECONDS),
+      TimeUnit.SECONDS);
+    this.procedureExecutor = procedureExecutor;
+    this.tableDescriptors = tableDescriptors;
+  }
+
+  @Override
+  protected void chore() {
+    if (isCompletelyMigrateSFT(CONCURRENT_PROCEDURES_COUNT)) {
+      LOG.info("All rolling-upgrade tasks are complete, shutting down RollingUpgradeChore!");
+      shutdown();
+    }
+  }
+
+  private boolean isCompletelyMigrateSFT(int concurrentCount) {
+    Iterator<MigrateStoreFileTrackerProcedure> iter = processingProcs.iterator();
+    while (iter.hasNext()) {
+      MigrateStoreFileTrackerProcedure proc = iter.next();
+      if (procedureExecutor.isFinished(proc.getProcId())) {
+        iter.remove();
+      }
+    }
+    // No new migration procedures will be submitted until
+    // all procedures executed last time are completed.
+    if (!processingProcs.isEmpty()) {
+      return false;
+    }
+
+    Map<String, TableDescriptor> migrateSFTTables;
+    try {
+      migrateSFTTables = tableDescriptors.getAll().entrySet().stream().filter(entry -> {
+        TableDescriptor td = entry.getValue();
+        return StringUtils.isEmpty(td.getValue(StoreFileTrackerFactory.TRACKER_IMPL));
+      }).limit(concurrentCount).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
+    } catch (IOException e) {
+      LOG.warn("Failed to migrate StoreFileTracker", e);
+      return false;
+    }
+
+    if (migrateSFTTables.isEmpty()) {
+      LOG.info("There is no table to migrate StoreFileTracker!");
+      return true;
+    }
+
+    for (Map.Entry<String, TableDescriptor> entry : migrateSFTTables.entrySet()) {
+      TableDescriptor tableDescriptor = entry.getValue();
+      MigrateStoreFileTrackerProcedure proc =
+        new MigrateStoreFileTrackerProcedure(procedureExecutor.getEnvironment(), tableDescriptor);
+      procedureExecutor.submitProcedure(proc);
+      processingProcs.add(proc);
+    }
+    return false;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrateStoreFileTrackerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrateStoreFileTrackerProcedure.java
new file mode 100644
index 000000000000..7cf3d1e8b5ac
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrateStoreFileTrackerProcedure.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.storefiletracker;
+
+import java.util.Optional;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.ModifyTableDescriptorProcedure;
+import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Procedure for migrating StoreFileTracker information to the table descriptor.
+ */
+@InterfaceAudience.Private
+public class MigrateStoreFileTrackerProcedure extends ModifyTableDescriptorProcedure {
+
+  public MigrateStoreFileTrackerProcedure() {
+  }
+
+  public MigrateStoreFileTrackerProcedure(MasterProcedureEnv env, TableDescriptor unmodified) {
+    super(env, unmodified);
+  }
+
+  @Override
+  protected Optional<TableDescriptor> modify(MasterProcedureEnv env, TableDescriptor current) {
+    if (StringUtils.isEmpty(current.getValue(StoreFileTrackerFactory.TRACKER_IMPL))) {
+      TableDescriptor td =
+        StoreFileTrackerFactory.updateWithTrackerConfigs(env.getMasterConfiguration(), current);
+      return Optional.of(td);
+    }
+    return Optional.empty();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/migrate/TestMigrateStoreFileTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/migrate/TestMigrateStoreFileTracker.java
new file mode 100644
index 000000000000..33325de9ca7d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/migrate/TestMigrateStoreFileTracker.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.migrate;
+
+import java.io.IOException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestMigrateStoreFileTracker {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestMigrateStoreFileTracker.class);
+  private final static String[] tables = new String[] { "t1", "t2", "t3", "t4", "t5", "t6" };
+  private final static String famStr = "f1";
+  private final static byte[] fam = Bytes.toBytes(famStr);
+
+  private HBaseTestingUtil HTU;
+  private Configuration conf;
+  private TableDescriptor tableDescriptor;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = HBaseConfiguration.create();
+    // Speed up the launch of RollingUpgradeChore
+    conf.setInt(RollingUpgradeChore.ROLLING_UPGRADE_CHORE_PERIOD_SECONDS_KEY, 1);
+    conf.setLong(RollingUpgradeChore.ROLLING_UPGRADE_CHORE_DELAY_SECONDS_KEY, 1);
+    HTU = new HBaseTestingUtil(conf);
+    HTU.startMiniCluster();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    HTU.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testMigrateStoreFileTracker() throws IOException, InterruptedException {
+    // Create tables to test
+    for (int i = 0; i < tables.length; i++) {
+      tableDescriptor = HTU.createModifyableTableDescriptor(tables[i])
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(fam).build()).build();
+      HTU.createTable(tableDescriptor, null);
+    }
+    TableDescriptors tableDescriptors = HTU.getMiniHBaseCluster().getMaster().getTableDescriptors();
+    for (int i = 0; i < tables.length; i++) {
+      TableDescriptor tdAfterCreated = tableDescriptors.get(TableName.valueOf(tables[i]));
+      // Make sure that TRACKER_IMPL was set by default after tables have been created.
+      Assert.assertNotNull(tdAfterCreated.getValue(StoreFileTrackerFactory.TRACKER_IMPL));
+      // Remove the StoreFileTracker impl from the table descriptor
+      TableDescriptor tdRemovedSFT = TableDescriptorBuilder.newBuilder(tdAfterCreated)
+        .removeValue(StoreFileTrackerFactory.TRACKER_IMPL).build();
+      tableDescriptors.update(tdRemovedSFT);
+    }
+    HTU.getMiniHBaseCluster().stopMaster(0).join();
+    HTU.getMiniHBaseCluster().startMaster();
+    HTU.getMiniHBaseCluster().waitForActiveAndReadyMaster(30000);
+    // Wait until all tables have been migrated
+    TableDescriptors tds = HTU.getMiniHBaseCluster().getMaster().getTableDescriptors();
+    HTU.waitFor(30000, () -> {
+      try {
+        for (int i = 0; i < tables.length; i++) {
+          TableDescriptor td = tds.get(TableName.valueOf(tables[i]));
+          if (StringUtils.isEmpty(td.getValue(StoreFileTrackerFactory.TRACKER_IMPL))) {
+            return false;
+          }
+        }
+        return true;
+      } catch (IOException e) {
+        return false;
+      }
+    });
+  }
+}
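Reviewer note, not part of the patch: the chore runs only on the active master, so the two keys introduced above (hbase.master.rolling.upgrade.chore.period.secs and hbase.master.rolling.upgrade.chore.delay.secs) take effect in the master's configuration, as TestMigrateStoreFileTracker lowers them for the mini cluster. A minimal verification sketch from the client side, assuming hbase-server is on the classpath and using an illustrative table name "t1"; once MigrateStoreFileTrackerProcedure has run, every table descriptor should carry an explicit TRACKER_IMPL value:

package org.apache.hadoop.hbase.example; // hypothetical package, illustration only

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;

public class VerifySftMigration {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Admin admin = conn.getAdmin()) {
      // "t1" is an illustrative table name, not part of the patch.
      TableDescriptor td = admin.getDescriptor(TableName.valueOf("t1"));
      // After migration the tracker implementation is recorded directly in the descriptor.
      System.out.println("Store file tracker for t1: "
        + td.getValue(StoreFileTrackerFactory.TRACKER_IMPL));
    }
  }
}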
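MigrateStoreFileTrackerProcedure leans on the ModifyTableDescriptorProcedure hook shown above: return Optional.of(descriptor) to persist a change, Optional.empty() to make the run a no-op. Any future rolling-upgrade task picked up by RollingUpgradeChore could follow the same shape; a hypothetical sketch only, where the class name and the "hbase.example.migrated.flag" property are invented for illustration and are not part of this change:

package org.apache.hadoop.hbase.master.migrate; // hypothetical placement, illustration only

import java.util.Optional;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ModifyTableDescriptorProcedure;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class ExampleFlagMigrationProcedure extends ModifyTableDescriptorProcedure {

  // Invented property name, purely illustrative.
  private static final String FLAG = "hbase.example.migrated.flag";

  public ExampleFlagMigrationProcedure() {
  }

  public ExampleFlagMigrationProcedure(MasterProcedureEnv env, TableDescriptor unmodified) {
    super(env, unmodified);
  }

  @Override
  protected Optional<TableDescriptor> modify(MasterProcedureEnv env, TableDescriptor current) {
    // Only rewrite the descriptor when the flag is missing; returning empty() makes the
    // procedure a no-op, mirroring MigrateStoreFileTrackerProcedure above.
    if (current.getValue(FLAG) == null) {
      TableDescriptor td =
        TableDescriptorBuilder.newBuilder(current).setValue(FLAG, "true").build();
      return Optional.of(td);
    }
    return Optional.empty();
  }
}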