Skip to content

Commit

Permalink
HDFS-17324. RBF: Router should not return nameservices that not enabl…
Browse files Browse the repository at this point in the history
…e observer r… (apache#6412)
LiuGuH authored Jan 23, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent b2fac14 commit 8193a84
Showing 2 changed files with 33 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs.server.federation.router;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;

@@ -58,6 +59,10 @@ class RouterStateIdContext implements AlignmentContext {
private final ConcurrentHashMap<String, LongAccumulator> namespaceIdMap;
// Size limit for the map of state Ids to send to clients.
private final int maxSizeOfFederatedStateToPropagate;
/** Observer read enabled. Default for all nameservices. */
private final boolean observerReadEnabledDefault;
/** Nameservice specific overrides of the default setting for enabling observer reads. */
private HashSet<String> observerReadEnabledOverrides = new HashSet<>();

RouterStateIdContext(Configuration conf) {
this.coordinatedMethods = new HashSet<>();
@@ -75,6 +80,15 @@ class RouterStateIdContext implements AlignmentContext {
maxSizeOfFederatedStateToPropagate =
conf.getInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE,
RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE_DEFAULT);

this.observerReadEnabledDefault = conf.getBoolean(
RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY,
RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_VALUE);
String[] observerReadOverrides =
conf.getStrings(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES);
if (observerReadOverrides != null) {
observerReadEnabledOverrides.addAll(Arrays.asList(observerReadOverrides));
}
}

/**
@@ -86,7 +100,7 @@ public void setResponseHeaderState(RpcResponseHeaderProto.Builder headerBuilder)
}
RouterFederatedStateProto.Builder builder = RouterFederatedStateProto.newBuilder();
namespaceIdMap.forEach((k, v) -> {
if (v.get() != Long.MIN_VALUE) {
if ((v.get() != Long.MIN_VALUE) && isNamespaceObserverReadEligible(k)) {
builder.putNamespaceStateIds(k, v.get());
}
});
@@ -177,4 +191,13 @@ public boolean isCoordinatedCall(String protocolName, String methodName) {
return protocolName.equals(ClientProtocol.class.getCanonicalName())
&& coordinatedMethods.contains(methodName);
}

/**
* Check if a namespace is eligible for observer reads.
* @param nsId namespaceID
* @return whether the 'namespace' has observer reads enabled.
*/
boolean isNamespaceObserverReadEligible(String nsId) {
return observerReadEnabledDefault != observerReadEnabledOverrides.contains(nsId);
}
}
Original file line number Diff line number Diff line change
@@ -590,34 +590,31 @@ public void testClientReceiveResponseState() {
@Test
@Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
public void testRouterResponseHeaderState() {
RouterStateIdContext routerStateIdContext = new RouterStateIdContext(new Configuration());
// This conf makes ns1 that is not eligible for observer reads.
Configuration conf = new Configuration();
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true);
conf.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns1");

RouterStateIdContext routerStateIdContext = new RouterStateIdContext(conf);

ConcurrentHashMap<String, LongAccumulator> namespaceIdMap =
routerStateIdContext.getNamespaceIdMap();
namespaceIdMap.put("ns0", new LongAccumulator(Math::max, 10));
namespaceIdMap.put("ns1", new LongAccumulator(Math::max, 100));
namespaceIdMap.put("ns2", new LongAccumulator(Math::max, Long.MIN_VALUE));

Map<String, Long> mockMapping = new HashMap<>();
mockMapping.put("ns0", 10L);
mockMapping.put("ns2", 100L);
mockMapping.put("ns3", Long.MIN_VALUE);
RouterFederatedStateProto.Builder builder = RouterFederatedStateProto.newBuilder();
mockMapping.forEach(builder::putNamespaceStateIds);

RpcHeaderProtos.RpcResponseHeaderProto.Builder responseHeaderBuilder =
RpcHeaderProtos.RpcResponseHeaderProto
.newBuilder()
.setCallId(1)
.setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS)
.setRouterFederatedState(builder.build().toByteString());
.setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS);
routerStateIdContext.updateResponseState(responseHeaderBuilder);

Map<String, Long> latestFederateState = RouterStateIdContext.getRouterFederatedStateMap(
responseHeaderBuilder.build().getRouterFederatedState());
Assertions.assertEquals(2, latestFederateState.size());
// Only ns0 will be in latestFederateState
Assertions.assertEquals(1, latestFederateState.size());
Assertions.assertEquals(10L, latestFederateState.get("ns0"));
Assertions.assertEquals(100L, latestFederateState.get("ns1"));
}

@EnumSource(ConfigSetting.class)

0 comments on commit 8193a84

Please sign in to comment.