Skip to content

Commit

Permalink
HBASE-27514 Move some persistent states from zookeeper to master regi…
Browse files Browse the repository at this point in the history
…on (#4925)

Signed-off-by: Bryan Beaudreault <[email protected]>
Signed-off-by: Peter Somogyi <[email protected]>
  • Loading branch information
Apache9 authored Dec 22, 2022
1 parent 3b714a3 commit dcfde79
Show file tree
Hide file tree
Showing 25 changed files with 990 additions and 508 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,25 @@ public class ZNodePaths {
// znode used for log splitting work assignment
public final String splitLogZNode;
// znode containing the state of the load balancer
/**
* @deprecated Since 2.6.0, will be removed in 4.0.0. We use master local region to store this
* state.
*/
@Deprecated
public final String balancerZNode;
// znode containing the state of region normalizer
/**
* @deprecated Since 2.6.0, will be removed in 4.0.0. We use master local region to store this
* state.
*/
@Deprecated
public final String regionNormalizerZNode;
// znode containing the state of all switches, currently there are split and merge child node.
/**
* @deprecated Since 2.6.0, will be removed in 4.0.0. We use master local region to store this
* state.
*/
@Deprecated
public final String switchZNode;
// znode of indicating master maintenance mode
public final String masterMaintZNode;
Expand All @@ -86,7 +101,12 @@ public class ZNodePaths {
// znode containing queues of hfile references to be replicated
public final String hfileRefsZNode;
// znode containing the state of the snapshot auto-cleanup
final String snapshotCleanupZNode;
/**
* @deprecated Since 2.6.0, will be removed in 4.0.0. We use master local region to store this
* state.
*/
@Deprecated
public final String snapshotCleanupZNode;

public ZNodePaths(Configuration conf) {
baseZNode = conf.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;

import java.io.IOException;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;

/**
* Store a boolean state.
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UG_SYNC_SET_UNSYNC_GET",
justification = "the flag is volatile")
@InterfaceAudience.Private
public abstract class BooleanStateStore extends MasterStateStore {

private volatile boolean on;

protected BooleanStateStore(MasterRegion masterRegion, String stateName, ZKWatcher watcher,
String zkPath) throws IOException, KeeperException, DeserializationException {
super(masterRegion, stateName, watcher, zkPath);
byte[] state = getState();
this.on = state == null || parseFrom(state);
}

/**
* Returns true if the flag is on, otherwise false
*/
public boolean get() {
return on;
}

/**
* Set the flag on/off.
* @param on true if the flag should be on, false otherwise
* @throws IOException if the operation fails
*/
public synchronized void set(boolean on) throws IOException {
byte[] state = toByteArray(on);
setState(state);
this.on = on;
}

protected abstract byte[] toByteArray(boolean on);

protected abstract boolean parseFrom(byte[] bytes) throws DeserializationException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.exceptions.MasterStoppedException;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
Expand All @@ -129,6 +130,7 @@
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerStateStore;
import org.apache.hadoop.hbase.master.balancer.MaintenanceLoadBalancer;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
Expand All @@ -145,6 +147,7 @@
import org.apache.hadoop.hbase.master.migrate.RollingUpgradeChore;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerStateStore;
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
import org.apache.hadoop.hbase.master.procedure.DeleteNamespaceProcedure;
import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
Expand Down Expand Up @@ -174,6 +177,7 @@
import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure;
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
import org.apache.hadoop.hbase.master.slowlog.SlowLogMasterService;
import org.apache.hadoop.hbase.master.snapshot.SnapshotCleanupStateStore;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator;
import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer;
Expand Down Expand Up @@ -246,11 +250,8 @@
import org.apache.hadoop.hbase.util.TableDescriptorChecker;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
import org.apache.hadoop.hbase.zookeeper.SnapshotCleanupTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
Expand Down Expand Up @@ -306,17 +307,17 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
// Draining region server tracker
private DrainingServerTracker drainingServerTracker;
// Tracker for load balancer state
LoadBalancerTracker loadBalancerTracker;
LoadBalancerStateStore loadBalancerStateStore;
// Tracker for meta location, if any client ZK quorum specified
private MetaLocationSyncer metaLocationSyncer;
// Tracker for active master location, if any client ZK quorum specified
@InterfaceAudience.Private
MasterAddressSyncer masterAddressSyncer;
// Tracker for auto snapshot cleanup state
SnapshotCleanupTracker snapshotCleanupTracker;
SnapshotCleanupStateStore snapshotCleanupStateStore;

// Tracker for split and merge state
private SplitOrMergeTracker splitOrMergeTracker;
private SplitOrMergeStateStore splitOrMergeStateStore;

private ClusterSchemaService clusterSchemaService;

Expand Down Expand Up @@ -750,24 +751,22 @@ public MetricsMaster getMasterMetrics() {
* should have already been initialized along with {@link ServerManager}.
*/
private void initializeZKBasedSystemTrackers()
throws IOException, KeeperException, ReplicationException {
throws IOException, KeeperException, ReplicationException, DeserializationException {
if (maintenanceMode) {
// in maintenance mode, always use MaintenanceLoadBalancer.
conf.unset(LoadBalancer.HBASE_RSGROUP_LOADBALANCER_CLASS);
conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, MaintenanceLoadBalancer.class,
LoadBalancer.class);
}
this.balancer = new RSGroupBasedLoadBalancer();
this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
this.loadBalancerTracker.start();
this.loadBalancerStateStore = new LoadBalancerStateStore(masterRegion, zooKeeper);

this.regionNormalizerManager =
RegionNormalizerFactory.createNormalizerManager(conf, zooKeeper, this);
RegionNormalizerFactory.createNormalizerManager(conf, masterRegion, zooKeeper, this);
this.configurationManager.registerObserver(regionNormalizerManager);
this.regionNormalizerManager.start();

this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
this.splitOrMergeTracker.start();
this.splitOrMergeStateStore = new SplitOrMergeStateStore(masterRegion, zooKeeper, conf);

// This is for backwards compatible. We do not need the CP for rs group now but if user want to
// load it, we need to enable rs group.
Expand All @@ -787,8 +786,7 @@ private void initializeZKBasedSystemTrackers()
this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
this.drainingServerTracker.start();

this.snapshotCleanupTracker = new SnapshotCleanupTracker(zooKeeper, this);
this.snapshotCleanupTracker.start();
this.snapshotCleanupStateStore = new SnapshotCleanupStateStore(masterRegion, zooKeeper);

String clientQuorumServers = conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
boolean clientZkObserverMode = conf.getBoolean(HConstants.CLIENT_ZOOKEEPER_OBSERVER_MODE,
Expand Down Expand Up @@ -910,8 +908,8 @@ private void tryMigrateMetaLocationsFromZooKeeper() throws IOException, KeeperEx
* Notice that now we will not schedule a special procedure to make meta online(unless the first
* time where meta has not been created yet), we will rely on SCP to bring meta online.
*/
private void finishActiveMasterInitialization(MonitoredTask status)
throws IOException, InterruptedException, KeeperException, ReplicationException {
private void finishActiveMasterInitialization(MonitoredTask status) throws IOException,
InterruptedException, KeeperException, ReplicationException, DeserializationException {
/*
* We are active master now... go initialize components we need to run.
*/
Expand Down Expand Up @@ -1640,7 +1638,7 @@ conf, getMasterFileSystem().getFileSystem(), new Path(archiveDir, path),
new ReplicationBarrierCleaner(conf, this, getConnection(), replicationPeerManager);
getChoreService().scheduleChore(replicationBarrierCleaner);

final boolean isSnapshotChoreEnabled = this.snapshotCleanupTracker.isSnapshotCleanupEnabled();
final boolean isSnapshotChoreEnabled = this.snapshotCleanupStateStore.get();
this.snapshotCleanerChore = new SnapshotCleanerChore(this, conf, getSnapshotManager());
if (isSnapshotChoreEnabled) {
getChoreService().scheduleChore(this.snapshotCleanerChore);
Expand Down Expand Up @@ -1762,7 +1760,7 @@ protected void startProcedureExecutor() throws IOException {
* Turn on/off Snapshot Cleanup Chore
* @param on indicates whether Snapshot Cleanup Chore is to be run
*/
void switchSnapshotCleanup(final boolean on, final boolean synchronous) {
void switchSnapshotCleanup(final boolean on, final boolean synchronous) throws IOException {
if (synchronous) {
synchronized (this.snapshotCleanerChore) {
switchSnapshotCleanup(on);
Expand All @@ -1772,16 +1770,12 @@ void switchSnapshotCleanup(final boolean on, final boolean synchronous) {
}
}

private void switchSnapshotCleanup(final boolean on) {
try {
snapshotCleanupTracker.setSnapshotCleanupEnabled(on);
if (on) {
getChoreService().scheduleChore(this.snapshotCleanerChore);
} else {
this.snapshotCleanerChore.cancel();
}
} catch (KeeperException e) {
LOG.error("Error updating snapshot cleanup mode to {}", on, e);
private void switchSnapshotCleanup(final boolean on) throws IOException {
snapshotCleanupStateStore.set(on);
if (on) {
getChoreService().scheduleChore(this.snapshotCleanerChore);
} else {
this.snapshotCleanerChore.cancel();
}
}

Expand Down Expand Up @@ -1955,9 +1949,7 @@ public BalanceResponse balance(BalanceRequest request) throws IOException {

BalanceResponse.Builder responseBuilder = BalanceResponse.newBuilder();

if (
loadBalancerTracker == null || !(loadBalancerTracker.isBalancerOn() || request.isDryRun())
) {
if (loadBalancerStateStore == null || !(loadBalancerStateStore.get() || request.isDryRun())) {
return responseBuilder.build();
}

Expand Down Expand Up @@ -2889,8 +2881,8 @@ public ClusterMetrics getClusterMetricsWithoutCoprocessor(EnumSet<Option> option
break;
}
case BALANCER_ON: {
if (loadBalancerTracker != null) {
builder.setBalancerOn(loadBalancerTracker.isBalancerOn());
if (loadBalancerStateStore != null) {
builder.setBalancerOn(loadBalancerStateStore.get());
}
break;
}
Expand Down Expand Up @@ -3727,33 +3719,32 @@ public void reportMobCompactionEnd(TableName tableName) throws IOException {
}

/**
* Queries the state of the {@link LoadBalancerTracker}. If the balancer is not initialized, false
* is returned.
* Queries the state of the {@link LoadBalancerStateStore}. If the balancer is not initialized,
* false is returned.
* @return The state of the load balancer, or false if the load balancer isn't defined.
*/
public boolean isBalancerOn() {
return !isInMaintenanceMode() && loadBalancerTracker != null
&& loadBalancerTracker.isBalancerOn();
return !isInMaintenanceMode() && loadBalancerStateStore != null && loadBalancerStateStore.get();
}

/**
* Queries the state of the {@link RegionNormalizerTracker}. If it's not initialized, false is
* Queries the state of the {@link RegionNormalizerStateStore}. If it's not initialized, false is
* returned.
*/
public boolean isNormalizerOn() {
return !isInMaintenanceMode() && getRegionNormalizerManager().isNormalizerOn();
}

/**
* Queries the state of the {@link SplitOrMergeTracker}. If it is not initialized, false is
* Queries the state of the {@link SplitOrMergeStateStore}. If it is not initialized, false is
* returned. If switchType is illegal, false will return.
* @param switchType see {@link org.apache.hadoop.hbase.client.MasterSwitchType}
* @return The state of the switch
*/
@Override
public boolean isSplitOrMergeEnabled(MasterSwitchType switchType) {
return !isInMaintenanceMode() && splitOrMergeTracker != null
&& splitOrMergeTracker.isSplitOrMergeEnabled(switchType);
return !isInMaintenanceMode() && splitOrMergeStateStore != null
&& splitOrMergeStateStore.isSplitOrMergeEnabled(switchType);
}

/**
Expand All @@ -3768,8 +3759,8 @@ public String getLoadBalancerClassName() {
LoadBalancerFactory.getDefaultLoadBalancerClass().getName());
}

public SplitOrMergeTracker getSplitOrMergeTracker() {
return splitOrMergeTracker;
public SplitOrMergeStateStore getSplitOrMergeStateStore() {
return splitOrMergeStateStore;
}

@Override
Expand Down
Loading

0 comments on commit dcfde79

Please sign in to comment.