Skip to content

Commit

Permalink
Address review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
bharathv committed Aug 24, 2020
1 parent 8875d93 commit 97375bc
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
Expand Down Expand Up @@ -51,7 +52,7 @@ public class MasterAddressRefresher implements Closeable {
private static final int PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT = 300;
public static final String MIN_SECS_BETWEEN_REFRESHES =
"hbase.client.master_registry.min_secs_between_refreshes";
private static final long MIN_SECS_BETWEEN_REFRESHES_DEFAULT = 60;
private static final int MIN_SECS_BETWEEN_REFRESHES_DEFAULT = 60;

private final ExecutorService pool;
private final MasterRegistry registry;
Expand Down Expand Up @@ -96,16 +97,17 @@ public void run() {
LOG.debug("Error populating latest list of masters.", e);
}
}
LOG.info("Master end point refresher loop exited.");
}
}

MasterAddressRefresher(Configuration conf, MasterRegistry registry) {
pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setNameFormat("master-registry-refresh-end-points").setDaemon(true).build());
periodicRefreshMs = 1000 * conf.getLong(PERIODIC_REFRESH_INTERVAL_SECS,
PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT);
timeBetweenRefreshesMs = 1000 * conf.getLong(MIN_SECS_BETWEEN_REFRESHES,
MIN_SECS_BETWEEN_REFRESHES_DEFAULT);
periodicRefreshMs = TimeUnit.SECONDS.toMillis(conf.getLong(PERIODIC_REFRESH_INTERVAL_SECS,
PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT));
timeBetweenRefreshesMs = TimeUnit.SECONDS.toMillis(conf.getLong(MIN_SECS_BETWEEN_REFRESHES,
MIN_SECS_BETWEEN_REFRESHES_DEFAULT));
Preconditions.checkArgument(periodicRefreshMs > 0);
Preconditions.checkArgument(timeBetweenRefreshesMs < periodicRefreshMs);
this.registry = registry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,21 +289,34 @@ private static boolean hasActiveMaster(GetMastersResponse resp) {
return activeMasters.size() == 1;
}

private static ServerName filterActiveMaster(GetMastersResponse resp) {
private static ServerName filterActiveMaster(GetMastersResponse resp) throws IOException {
List<GetMastersResponseEntry> activeMasters =
resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect(
Collectors.toList());
Preconditions.checkState(activeMasters.size() == 1);
if (activeMasters.size() != 1) {
throw new IOException(String.format("Incorrect number of active masters encountered." +
" Expected: 1 found: %d. Content: %s", activeMasters.size(), activeMasters));
}
return ProtobufUtil.toServerName(activeMasters.get(0).getServerName());
}

@Override
public CompletableFuture<ServerName> getActiveMaster() {
return this
.<GetMastersResponse> call(
(c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d),
MasterRegistry::hasActiveMaster, "getMasters()")
.thenApply(MasterRegistry::filterActiveMaster);
CompletableFuture<ServerName> future = new CompletableFuture<>();
addListener(call((c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d),
MasterRegistry::hasActiveMaster, "getMasters()"), (resp, ex) -> {
if (ex != null) {
future.completeExceptionally(ex);
}
ServerName result = null;
try {
result = filterActiveMaster((GetMastersResponse)resp);
} catch (IOException e) {
future.completeExceptionally(e);
}
future.complete(result);
});
return future;
}

private static List<ServerName> transformServerNames(GetMastersResponse resp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,13 +297,12 @@ public List<ServerName> getBackupMasters() throws InterruptedIOException {
public static List<ServerName> getBackupMastersAndRenewWatch(
ZKWatcher zkw) throws InterruptedIOException {
// Build Set of backup masters from ZK nodes
List<String> backupMasterStrings;
List<String> backupMasterStrings = Collections.emptyList();
try {
backupMasterStrings = ZKUtil.listChildrenAndWatchForNewChildren(zkw,
zkw.getZNodePaths().backupMasterAddressesZNode);
} catch (KeeperException e) {
LOG.warn(zkw.prefix("Unable to list backup servers"), e);
backupMasterStrings = null;
}

List<ServerName> backupMasters = Collections.emptyList();
Expand Down

0 comments on commit 97375bc

Please sign in to comment.