Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDFS-16671. RBF: RouterRpcFairnessPolicyController supports configurable permit acquire timeout #4597

Merged
merged 5 commits into from
Jul 28, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_KEY;

/**
* Base fairness policy that implements {@link RouterRpcFairnessPolicyController}.
* Internally a map of nameservice to Semaphore is used to control permits.
Expand All @@ -42,15 +45,22 @@ public class AbstractRouterRpcFairnessPolicyController
/** Hash table to hold semaphore for each configured name service. */
private Map<String, Semaphore> permits;

private long acquireTimeout = DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT;

public void init(Configuration conf) {
this.permits = new HashMap<>();
long timeout = conf.getLong(DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_KEY,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the unit? We should do getTimeDuration()

DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT);
if (timeout >= 0) {
acquireTimeout = timeout;
}
}

@Override
public boolean acquirePermit(String nsId) {
try {
LOG.debug("Taking lock for nameservice {}", nsId);
return this.permits.get(nsId).tryAcquire(1, TimeUnit.SECONDS);
return this.permits.get(nsId).tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.debug("Cannot get a permit for nameservice {}", nsId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void init(Configuration conf)
// Assign remaining handlers equally to remaining name services and
// general pool if applicable.
if (!unassignedNS.isEmpty()) {
LOG.info("Unassigned ns {}", unassignedNS.toString());
LOG.info("Unassigned ns {}", unassignedNS);
int handlersPerNS = handlerCount / unassignedNS.size();
LOG.info("Handlers available per ns {}", handlersPerNS);
for (String nsId : unassignedNS) {
Expand All @@ -109,16 +109,15 @@ public void init(Configuration conf)
}

private static void logAssignment(String nsId, int count) {
LOG.info("Assigned {} handlers to nsId {} ",
count, nsId);
LOG.info("Assigned {} handlers to nsId {} ", count, nsId);
}

private void validateHandlersCount(Configuration conf, int handlerCount,
Set<String> allConfiguredNS) {
private void validateHandlersCount(Configuration conf,
int handlerCount, Set<String> allConfiguredNS) {
int totalDedicatedHandlers = 0;
for (String nsId : allConfiguredNS) {
int dedicatedHandlers =
conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make it one line as we are touching this.

if (dedicatedHandlers > 0) {
// Total handlers should not be less than sum of dedicated handlers.
totalDedicatedHandlers += dedicatedHandlers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
NoRouterRpcFairnessPolicyController.class;
public static final String DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX =
FEDERATION_ROUTER_FAIRNESS_PREFIX + "handler.count.";
public static final String DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_KEY =
FEDERATION_ROUTER_FAIRNESS_PREFIX + "acquire.timeout";
public static final long DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT = 1000;

// HDFS Router Federation Rename.
public static final String DFS_ROUTER_FEDERATION_RENAME_PREFIX =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,14 @@
</description>
</property>

<property>
<name>dfs.federation.router.fairness.acquire.timeout</name>
<value>1000</value>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1s and remove the ms

<description>
The maximum time, in milliseconds, to wait when acquiring a permit for RouterRpcFairnessPolicyController. Negative values are ignored and the default is used.
</description>
</property>

<property>
<name>dfs.federation.router.federation.rename.bandwidth</name>
<value>10</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.Test;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
Expand Down Expand Up @@ -83,6 +85,29 @@ public void testHandlerAllocationPreconfigured() {
assertFalse(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
}

/**
 * Verifies that the configured permit-acquire timeout is honored: once every
 * permit of a nameservice has been taken, one more acquire must fail, and it
 * must fail after waiting roughly the configured timeout rather than the
 * default one.
 */
@Test
public void testAcquireTimeout() {
  final long timeoutMs = 100;
  Configuration conf = createConf(40);
  conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", 30);
  conf.setLong(DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_KEY, timeoutMs);
  RouterRpcFairnessPolicyController routerRpcFairnessPolicyController =
      FederationUtil.newFairnessPolicyController(conf);

  // ns1 should have 30 permits allocated
  for (int i = 0; i < 30; i++) {
    assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1"));
  }

  // All permits are taken, so this acquire has to wait out the timeout.
  long acquireBeginTime = Time.monotonicNow();
  assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1"));
  long acquireTime = Time.monotonicNow() - acquireBeginTime;

  // The clock is read in whole milliseconds, so a correct wait may be
  // measured as exactly the timeout; the lower bound must be inclusive.
  assertTrue("Acquire returned after " + acquireTime + "ms, expected >= "
      + timeoutMs + "ms", acquireTime >= timeoutMs);
  // A tight upper bound is flaky on a loaded host. Staying below the default
  // timeout is enough to show that the configured value was applied.
  assertTrue("Acquire took " + acquireTime + "ms, expected less than the "
      + "default timeout",
      acquireTime < RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT_DEFAULT);
}

@Test
public void testAllocationErrorWithZeroHandlers() {
Configuration conf = createConf(0);
Expand Down