Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDFS-17333. DFSClient supports lazy resolution from hostname to IP. #6430

Merged
merged 5 commits into from
Mar 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ public static InetSocketAddress createSocketAddr(String target) {
return createSocketAddr(target, -1);
}

/**
 * Builds a socket address for {@code target} without resolving the host.
 * Delegates to the five-argument {@code createSocketAddr} overload with
 * a default port of -1, no configuration property name, the cache lookup
 * switched off and {@code isResolved} set to {@code false}.
 *
 * @param target the address string to parse
 * @return the socket address produced by the delegate for {@code target}
 */
public static InetSocketAddress createSocketAddrUnresolved(String target) {
  final int defaultPort = -1;
  final String configName = null;
  final boolean useCacheIfPresent = false;
  final boolean isResolved = false;
  return createSocketAddr(
      target, defaultPort, configName, useCacheIfPresent, isResolved);
}

/**
* Util method to build socket addr from either.
* {@literal <host>}
Expand Down Expand Up @@ -219,6 +223,12 @@ public static InetSocketAddress createSocketAddr(String target,
int defaultPort,
String configName,
boolean useCacheIfPresent) {
return createSocketAddr(target, defaultPort, configName, useCacheIfPresent, true);
}

public static InetSocketAddress createSocketAddr(
String target, int defaultPort, String configName,
boolean useCacheIfPresent, boolean isResolved) {
String helpText = "";
if (configName != null) {
helpText = " (configuration property '" + configName + "')";
Expand All @@ -244,7 +254,10 @@ public static InetSocketAddress createSocketAddr(String target,
"Does not contain a valid host:port authority: " + target + helpText
);
}
return createSocketAddrForHost(host, port);
if (isResolved) {
return createSocketAddrForHost(host, port);
}
return InetSocketAddress.createUnresolved(host, port);
}

private static final long URI_CACHE_SIZE_DEFAULT = 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover.DFS_CLIENT_LAZY_RESOLVED;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover.DFS_CLIENT_LAZY_RESOLVED_DEFAULT;

@InterfaceAudience.Private
public class DFSUtilClient {
Expand Down Expand Up @@ -530,11 +532,18 @@ public static Map<String, InetSocketAddress> getAddressesForNameserviceId(
String suffix = concatSuffixes(nsId, nnId);
String address = checkKeysAndProcess(defaultValue, suffix, conf, keys);
if (address != null) {
InetSocketAddress isa = NetUtils.createSocketAddr(address);
if (isa.isUnresolved()) {
LOG.warn("Namenode for {} remains unresolved for ID {}. Check your "
+ "hdfs-site.xml file to ensure namenodes are configured "
+ "properly.", nsId, nnId);
InetSocketAddress isa = null;
// There is no need to resolve host->ip in advance.
// Delay the resolution until the host is used.
if (conf.getBoolean(DFS_CLIENT_LAZY_RESOLVED, DFS_CLIENT_LAZY_RESOLVED_DEFAULT)) {
isa = NetUtils.createSocketAddrUnresolved(address);
}else {
isa = NetUtils.createSocketAddr(address);
if (isa.isUnresolved()) {
LOG.warn("Namenode for {} remains unresolved for ID {}. Check your "
+ "hdfs-site.xml file to ensure namenodes are configured "
+ "properly.", nsId, nnId);
}
}
ret.put(nnId, isa);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,8 @@ interface Failover {
String RESOLVE_SERVICE_KEY = PREFIX + "resolver.impl";
String RESOLVE_ADDRESS_TO_FQDN = PREFIX + "resolver.useFQDN";
boolean RESOLVE_ADDRESS_TO_FQDN_DEFAULT = true;
String DFS_CLIENT_LAZY_RESOLVED = PREFIX + "lazy.resolved";
boolean DFS_CLIENT_LAZY_RESOLVED_DEFAULT = false;
}

/** dfs.client.write configuration properties */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,14 @@
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.net.DomainNameResolver;
import org.apache.hadoop.net.DomainNameResolverFactory;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover.DFS_CLIENT_LAZY_RESOLVED;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover.DFS_CLIENT_LAZY_RESOLVED_DEFAULT;

public abstract class AbstractNNFailoverProxyProvider<T> implements
FailoverProxyProvider <T> {
protected static final Logger LOG =
Expand Down Expand Up @@ -138,6 +142,10 @@ public void setCachedState(HAServiceState state) {
public HAServiceState getCachedState() {
return cachedState;
}

/**
 * Replaces the socket address stored on this object.
 *
 * @param address the new address to store
 */
public void setAddress(InetSocketAddress address) {
this.address = address;
}
}

@Override
Expand All @@ -152,6 +160,24 @@ protected NNProxyInfo<T> createProxyIfNeeded(NNProxyInfo<T> pi) {
if (pi.proxy == null) {
assert pi.getAddress() != null : "Proxy address is null";
try {
InetSocketAddress address = pi.getAddress();
// If the host is not resolved to IP and lazy.resolved=true,
// the host needs to be resolved.
if (address.isUnresolved()) {
if (conf.getBoolean(DFS_CLIENT_LAZY_RESOLVED, DFS_CLIENT_LAZY_RESOLVED_DEFAULT)) {
InetSocketAddress isa =
NetUtils.createSocketAddrForHost(address.getHostName(), address.getPort());
if (isa.isUnresolved()) {
LOG.warn("Can not resolve host {}, check your hdfs-site.xml file " +
"to ensure host are configured correctly.", address.getHostName());
}
pi.setAddress(isa);
if (LOG.isDebugEnabled()) {
LOG.debug("Lazy resolve host {} -> {}, when create proxy if needed.",
address.toString(), pi.getAddress().toString());
}
}
}
pi.proxy = factory.createProxy(conf,
pi.getAddress(), xface, ugi, false, getFallbackToSimpleAuth());
} catch (IOException ioe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
Expand All @@ -60,6 +61,7 @@ public class TestConfiguredFailoverProxyProvider {
private URI ns1Uri;
private URI ns2Uri;
private URI ns3Uri;
private URI ns4Uri;
private String ns1;
private String ns1nn1Hostname = "machine1.foo.bar";
private InetSocketAddress ns1nn1 =
Expand All @@ -79,6 +81,9 @@ public class TestConfiguredFailoverProxyProvider {
new InetSocketAddress(ns2nn3Hostname, rpcPort);
private String ns3;
private static final int NUM_ITERATIONS = 50;
private String ns4;
private String ns4nn1Hostname = "localhost";
private String ns4nn2Hostname = "127.0.0.1";

@Rule
public final ExpectedException exception = ExpectedException.none();
Expand Down Expand Up @@ -133,8 +138,11 @@ public void setup() throws URISyntaxException {
ns3 = "mycluster-3-" + Time.monotonicNow();
ns3Uri = new URI("hdfs://" + ns3);

ns4 = "mycluster-4-" + Time.monotonicNow();
ns4Uri = new URI("hdfs://" + ns4);

conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES,
String.join(",", ns1, ns2, ns3));
String.join(",", ns1, ns2, ns3, ns4));
conf.set("fs.defaultFS", "hdfs://" + ns1);
}

Expand Down Expand Up @@ -170,6 +178,33 @@ private void addDNSSettings(Configuration config,
);
}

/**
 * Adds the settings for the {@code ns4} nameservice used by the
 * lazy-resolution tests to the passed in configuration: two namenodes
 * (nn1 and nn2) with their RPC addresses, the configured failover proxy
 * provider, and a fixed (non-random) proxy order.
 *
 * @param config the configuration to modify
 * @param isLazy when true, lazy host resolution is switched on explicitly;
 *               when false the key is left untouched so its default applies
 */
private void addLazyResolvedSettings(Configuration config, boolean isLazy) {
  // Only namenodes that actually get an RPC address below are declared.
  config.set(
      HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns4,
      "nn1,nn2");
  config.set(
      HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns4 + ".nn1",
      ns4nn1Hostname + ":" + rpcPort);
  config.set(
      HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns4 + ".nn2",
      ns4nn2Hostname + ":" + rpcPort);
  config.set(
      HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + ns4,
      ConfiguredFailoverProxyProvider.class.getName());
  if (isLazy) {
    // Set dfs.client.failover.lazy.resolved=true (default false).
    config.setBoolean(
        HdfsClientConfigKeys.Failover.DFS_CLIENT_LAZY_RESOLVED,
        true);
  }
  // Keep the proxy order deterministic for the assertions in the test.
  config.setBoolean(
      HdfsClientConfigKeys.Failover.RANDOM_ORDER + "." + ns4,
      false);
}

/**
* Tests getProxy with random.order configuration set to false.
* This expects the proxy order to be consistent every time a new
Expand Down Expand Up @@ -330,6 +365,51 @@ public void testResolveDomainNameUsingDNS() throws Exception {
testResolveDomainNameUsingDNS(true);
}

/**
 * Runs the lazy-resolution scenario twice: first with lazy resolution
 * off, then with it on.
 */
@Test
public void testLazyResolved() throws IOException {
  final boolean[] lazyModes = {false, true};
  for (boolean lazy : lazyModes) {
    testLazyResolved(lazy);
  }
}

/**
 * Builds a {@link ConfiguredFailoverProxyProvider} for the ns4 nameservice
 * and checks the resolution state of its proxy addresses before and after
 * a proxy is actually used.
 *
 * @param isLazy whether lazy host resolution is enabled for this run
 * @throws IOException declared by the mocked {@code getStats()} call
 */
private void testLazyResolved(boolean isLazy) throws IOException {
  Configuration lazyResolvedConf = new Configuration(conf);
  addLazyResolvedSettings(lazyResolvedConf, isLazy);
  Map<InetSocketAddress, ClientProtocol> proxyMap = new HashMap<>();

  // NOTE(review): "localhost" and "127.0.0.1" may resolve to the same IP,
  // in which case these two keys compare equal in proxyMap - confirm that
  // this is intended.
  InetSocketAddress ns4nn1 = new InetSocketAddress(ns4nn1Hostname, rpcPort);
  InetSocketAddress ns4nn2 = new InetSocketAddress(ns4nn2Hostname, rpcPort);

  // Mock ClientProtocol for each namenode.
  final ClientProtocol nn1Mock = mock(ClientProtocol.class);
  when(nn1Mock.getStats()).thenReturn(new long[]{0});
  proxyMap.put(ns4nn1, nn1Mock);

  final ClientProtocol nn2Mock = mock(ClientProtocol.class);
  // Stub the second mock (the original stubbed nn1Mock a second time).
  when(nn2Mock.getStats()).thenReturn(new long[]{0});
  proxyMap.put(ns4nn2, nn2Mock);

  ConfiguredFailoverProxyProvider<ClientProtocol> provider =
      new ConfiguredFailoverProxyProvider<>(lazyResolvedConf, ns4Uri,
          ClientProtocol.class, createFactory(proxyMap));
  assertEquals(2, provider.proxies.size());
  for (AbstractNNFailoverProxyProvider.NNProxyInfo<?> proxyInfo : provider.proxies) {
    if (isLazy) {
      // With lazy resolution the proxy has not been used yet,
      // so the host must still be unresolved.
      assertTrue(proxyInfo.getAddress().isUnresolved());
    } else {
      assertFalse(proxyInfo.getAddress().isUnresolved());
    }
  }

  // When the host is used to process the request, the host is resolved.
  ClientProtocol proxy = provider.getProxy().proxy;
  proxy.getStats();
  assertFalse(provider.proxies.get(0).getAddress().isUnresolved());
}

@Test
public void testResolveDomainNameUsingDNSUnknownHost() throws Exception {
Configuration dnsConf = new Configuration(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4469,6 +4469,15 @@
</description>
</property>

<property>
<name>dfs.client.failover.lazy.resolved</name>
<value>false</value>
<description>
Used to enable lazy resolution of host->ip. If the value is true,
the host will be resolved only when the DFSClient needs to contact it.
</description>
</property>

<property>
<name>dfs.client.key.provider.cache.expiry</name>
<value>864000000</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1159,4 +1159,51 @@ public void testGetTransferRateInBytesPerSecond() {
assertEquals(102_400_000,
DFSUtil.getTransferRateInBytesPerSecond(512_000_000L, 5_000_000_000L));
}

/**
 * Runs the address-resolution check twice: first with lazy resolution
 * off, then with it on.
 */
@Test
public void testLazyResolved() {
  final boolean[] lazyModes = {false, true};
  for (boolean lazy : lazyModes) {
    testLazyResolved(lazy);
  }
}

/**
 * Configures two nameservices (ns1, ns2) with two namenodes each and
 * checks the addresses returned by {@code DFSUtilClient.getAddresses}:
 * unresolved when lazy resolution is on, resolved otherwise, and always
 * on port 8020.
 *
 * @param isLazy value to set for the lazy-resolution configuration key
 */
private void testLazyResolved(boolean isLazy) {
  final String ns1Nn1 = "localhost:8020";
  final String ns1Nn2 = "127.0.0.1:8020";
  final String ns2Nn1 = "127.0.0.2:8020";
  final String ns2Nn2 = "127.0.0.3:8020";

  HdfsConfiguration conf = new HdfsConfiguration();

  conf.set(DFS_NAMESERVICES, "ns1,ns2");
  conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, "ns1"), "nn1,nn2");
  conf.set(DFSUtil.addKeySuffixes(
      DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn1"), ns1Nn1);
  conf.set(DFSUtil.addKeySuffixes(
      DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn2"), ns1Nn2);
  conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, "ns2"), "nn1,nn2");
  conf.set(DFSUtil.addKeySuffixes(
      DFS_NAMENODE_RPC_ADDRESS_KEY, "ns2", "nn1"), ns2Nn1);
  conf.set(DFSUtil.addKeySuffixes(
      DFS_NAMENODE_RPC_ADDRESS_KEY, "ns2", "nn2"), ns2Nn2);

  conf.setBoolean(HdfsClientConfigKeys.Failover.DFS_CLIENT_LAZY_RESOLVED, isLazy);

  Map<String, Map<String, InetSocketAddress>> addresses =
      DFSUtilClient.getAddresses(conf, null, DFS_NAMENODE_RPC_ADDRESS_KEY);

  // Guard against the per-address checks passing vacuously on an empty map.
  assertEquals(2, addresses.size());
  for (Map<String, InetSocketAddress> nnAddresses : addresses.values()) {
    assertEquals(2, nnAddresses.size());
    for (InetSocketAddress inetSocketAddress : nnAddresses.values()) {
      if (isLazy) {
        // Lazy resolved. There is no need to change host->ip in advance.
        assertTrue(inetSocketAddress.isUnresolved());
      } else {
        // Need resolve all host->ip.
        assertFalse(inetSocketAddress.isUnresolved());
      }
      // Expected value first, actual second, so failure messages read right.
      assertEquals(8020, inetSocketAddress.getPort());
    }
  }
}
}
Loading