HDFS-16452. msync RPC should send to active namenode directly #3976

Open. Wants to merge 1 commit into base: trunk.
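ObserverReadProxyProvider serves reads from observer NameNodes, but it delegates msync, and every other call that must run on the active NameNode, to an internal ConfiguredFailoverProxyProvider. Before this patch that inner provider was built over every configured NameNode, observers included, so an msync could first land on an observer and only reach the active after failover retries. The change lets a deployment list its observer NameNode IDs under a new "observers" key suffix (dfs.ha.namenodes.<nameservice>.observers) and filters those IDs out when the inner failover provider resolves addresses, so the msync RPC goes to the active NameNode directly. The sketches after each file below use an illustrative nameservice named mycluster with NameNode IDs nn1, nn2 and nn3, where nn3 is an observer; none of these names come from the patch.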
@@ -103,6 +103,7 @@
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_OBSERVER_NAMENODES_KEY_SUFFIX;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
@@ -227,6 +228,20 @@ public static Collection<String> getNameNodeIds(Configuration conf, String nsId)
return conf.getTrimmedStringCollection(key);
}

/**
* Returns the collection of observer namenode IDs from the configuration. One logical ID
* for each observer namenode in the Observer-Read setup.
*
* @param conf configuration
* @param nsId the nameservice ID to look at, or null for non-federated
* @return collection of observer namenode Ids
*/
public static Collection<String> getObserverNameNodeIds(Configuration conf, String nsId) {
String key = addSuffix(DFS_HA_NAMENODES_KEY_PREFIX, nsId);
String observerKey = addSuffix(key, DFS_HA_OBSERVER_NAMENODES_KEY_SUFFIX);
return conf.getTrimmedStringCollection(observerKey);
}

/** Add non empty and non null suffix to a key */
static String addSuffix(String key, String suffix) {
if (suffix == null || suffix.isEmpty()) {
@@ -422,7 +437,21 @@ static String concatSuffixes(String... suffixes) {
public static Map<String, Map<String, InetSocketAddress>> getAddresses(
Configuration conf, String defaultAddress, String... keys) {
Collection<String> nameserviceIds = getNameServiceIds(conf);
return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, keys);
return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, false, keys);
}

/**
* Returns the configured addresses for all NameNodes in the cluster.
* @param conf configuration
* @param defaultAddress default address to return in case a key is not found.
* @param forOnnFailover2Ann whether the addresses are for the failover proxy used by
*                           ObserverReadProxyProvider; if true, observer namenodes are excluded
* @param keys Set of keys to look for in the order of preference
* @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
*/
public static Map<String, Map<String, InetSocketAddress>> getAddresses(
Configuration conf, String defaultAddress, boolean forOnnFailover2Ann, String... keys) {
Collection<String> nameserviceIds = getNameServiceIds(conf);
return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, forOnnFailover2Ann, keys);
}

/**
@@ -507,13 +536,13 @@ private static String getConcatNnId(String nsId, String nnId, String hostname, i
*/
static Map<String, Map<String, InetSocketAddress>> getAddressesForNsIds(
Configuration conf, Collection<String> nsIds, String defaultAddress,
String... keys) {
boolean forOnnFailover2Ann, String... keys) {
// Look for configurations of the form <key>[.<nameserviceId>][.<namenodeId>]
// across all of the configured nameservices and namenodes.
Map<String, Map<String, InetSocketAddress>> ret = Maps.newLinkedHashMap();
for (String nsId : emptyAsSingletonNull(nsIds)) {
Map<String, InetSocketAddress> isas =
getAddressesForNameserviceId(conf, nsId, defaultAddress, keys);
getAddressesForNameserviceId(conf, nsId, defaultAddress, forOnnFailover2Ann, keys);
if (!isas.isEmpty()) {
ret.put(nsId, isas);
}
@@ -523,7 +552,25 @@ static Map<String, Map<String, InetSocketAddress>> getAddressesForNsIds(

public static Map<String, InetSocketAddress> getAddressesForNameserviceId(
Configuration conf, String nsId, String defaultValue, String... keys) {
return getAddressesForNameserviceId(conf, nsId, defaultValue, false, keys);
}

public static Map<String, InetSocketAddress> getAddressesForNameserviceId(
Configuration conf, String nsId, String defaultValue,
boolean forOnnFailover2Ann, String... keys) {
Collection<String> nnIds = getNameNodeIds(conf, nsId);
if (forOnnFailover2Ann) {
Collection<String> obnnIds = getObserverNameNodeIds(conf, nsId);
if (!obnnIds.isEmpty()) {
nnIds.removeIf(obnnIds::contains);
}
}
return getNamenodeIDInetSocketAddressMap(conf, nsId, defaultValue, nnIds, keys);
}

private static Map<String, InetSocketAddress> getNamenodeIDInetSocketAddressMap(
Configuration conf, String nsId, String defaultValue,
Collection<String> nnIds, String[] keys) {
Map<String, InetSocketAddress> ret = Maps.newLinkedHashMap();
for (String nnId : emptyAsSingletonNull(nnIds)) {
String suffix = concatSuffixes(nsId, nnId);
@@ -532,15 +579,16 @@ public static Map<String, InetSocketAddress> getAddressesForNameserviceId(
InetSocketAddress isa = NetUtils.createSocketAddr(address);
if (isa.isUnresolved()) {
LOG.warn("Namenode for {} remains unresolved for ID {}. Check your "
+ "hdfs-site.xml file to ensure namenodes are configured "
+ "properly.", nsId, nnId);
+ "hdfs-site.xml file to ensure namenodes are configured "
+ "properly.", nsId, nnId);
}
ret.put(nnId, isa);
}
}
return ret;
}


/**
* Return address from configuration. Take a list of keys as preference.
* If the address to be returned is the value of DFS_NAMENODE_RPC_ADDRESS_KEY,
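A quick way to see the new filtering in isolation is to resolve the RPC addresses for one nameservice with and without the new flag. This is a sketch, not part of the patch; the nameservice name, NameNode IDs and host names are made up.

import java.net.InetSocketAddress;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

public class ObserverFilteringSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set("dfs.nameservices", "mycluster");
    conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2,nn3");
    conf.set("dfs.ha.namenodes.mycluster.observers", "nn3");          // new key from this patch
    conf.set("dfs.namenode.rpc-address.mycluster.nn1", "host1:8020");
    conf.set("dfs.namenode.rpc-address.mycluster.nn2", "host2:8020");
    conf.set("dfs.namenode.rpc-address.mycluster.nn3", "host3:8020");

    // Existing behaviour: every configured NameNode is resolved.
    Map<String, InetSocketAddress> all = DFSUtilClient.getAddressesForNameserviceId(
        conf, "mycluster", null, HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
    System.out.println(all.keySet());           // [nn1, nn2, nn3]

    // New behaviour for the failover proxy used by ObserverReadProxyProvider:
    // NameNode IDs listed under the "observers" key are filtered out.
    Map<String, InetSocketAddress> nonObservers = DFSUtilClient.getAddressesForNameserviceId(
        conf, "mycluster", null, true, HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
    System.out.println(nonObservers.keySet());  // [nn1, nn2]
  }
}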
@@ -79,6 +79,7 @@ public interface HdfsClientConfigKeys {
String DFS_NAMENODE_HTTPS_ADDRESS_KEY = "dfs.namenode.https-address";
String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes";
int DFS_NAMENODE_RPC_PORT_DEFAULT = 8020;
String DFS_HA_OBSERVER_NAMENODES_KEY_SUFFIX = "observers";
String DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY =
"dfs.namenode.kerberos.principal";
String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
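The new constant only supplies the last component of the configuration name; DFSUtilClient builds the full key from DFS_HA_NAMENODES_KEY_PREFIX, the nameservice ID and this suffix (addSuffix appears to join the parts with a dot). For the illustrative nameservice mycluster the client therefore reads:

// Existing key: every NameNode ID in the nameservice.
String allIdsKey = "dfs.ha.namenodes.mycluster";
// New key: the subset of IDs to treat as observers and skip when building the failover proxy.
String observerIdsKey = "dfs.ha.namenodes.mycluster.observers";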
@@ -167,10 +167,15 @@ protected NNProxyInfo<T> createProxyIfNeeded(NNProxyInfo<T> pi) {
* Get list of configured NameNode proxy addresses.
* Randomize the list if requested.
*/
protected List<NNProxyInfo<T>> getProxyAddresses(URI uri, String addressKey) {
protected List<NNProxyInfo<T>> getProxyAddresses(URI uri, String addressKey,
boolean forOnnFailover2Ann) {
final List<NNProxyInfo<T>> proxies = new ArrayList<NNProxyInfo<T>>();
Map<String, Map<String, InetSocketAddress>> map =
DFSUtilClient.getAddresses(conf, null, addressKey);
Map<String, Map<String, InetSocketAddress>> map;
if (forOnnFailover2Ann) {
map = DFSUtilClient.getAddresses(conf, null, true, addressKey);
} else {
map = DFSUtilClient.getAddresses(conf, null, addressKey);
}
Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost());

if (addressesInNN == null || addressesInNN.size() == 0) {
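Since the three-argument DFSUtilClient.getAddresses overload now just forwards with forOnnFailover2Ann set to false, the branch above is equivalent to a single call. A possible simplification, not part of the patch:

// The boolean can simply flow through; false reproduces the old behaviour.
Map<String, Map<String, InetSocketAddress>> map =
    DFSUtilClient.getAddresses(conf, null, forOnnFailover2Ann, addressKey);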
@@ -48,7 +48,18 @@ public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
Class<T> xface, HAProxyFactory<T> factory, String addressKey) {
super(conf, uri, xface, factory);
this.proxies = getProxyAddresses(uri, addressKey);
this.proxies = getProxyAddresses(uri, addressKey, false);
}

public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
Class<T> xface, HAProxyFactory<T> factory, boolean forOnnFailover2Ann) {
this(conf, uri, xface, factory, DFS_NAMENODE_RPC_ADDRESS_KEY, forOnnFailover2Ann);
}

public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
Class<T> xface, HAProxyFactory<T> factory, String addressKey, boolean forOnnFailover2Ann) {
super(conf, uri, xface, factory);
this.proxies = getProxyAddresses(uri, addressKey, forOnnFailover2Ann);
}

/**
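Existing callers are unaffected: the original constructors still reach getProxyAddresses with forOnnFailover2Ann set to false and keep every configured NameNode in the proxy list. Only a caller that opts in gets the observer-free list. A sketch, assuming conf, uri and an HAProxyFactory named factory already exist in the caller's context:

// All NameNodes, as before (observers included).
ConfiguredFailoverProxyProvider<ClientProtocol> allNameNodes =
    new ConfiguredFailoverProxyProvider<>(conf, uri, ClientProtocol.class, factory);

// Active and standby only: IDs listed under the new "observers" key are skipped.
// This is what ObserverReadProxyProvider now requests for its internal failover proxy.
ConfiguredFailoverProxyProvider<ClientProtocol> activeAndStandbyOnly =
    new ConfiguredFailoverProxyProvider<>(conf, uri, ClientProtocol.class, factory, true);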
@@ -168,7 +168,7 @@ public class ObserverReadProxyProvider<T>
public ObserverReadProxyProvider(
Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory) {
this(conf, uri, xface, factory,
new ConfiguredFailoverProxyProvider<>(conf, uri, xface, factory));
new ConfiguredFailoverProxyProvider<>(conf, uri, xface, factory, true));
}

@SuppressWarnings("unchecked")
@@ -189,7 +189,7 @@ public ObserverReadProxyProvider(

// Get all NameNode proxies
nameNodeProxies = getProxyAddresses(uri,
HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, false);

// Create a wrapped proxy containing all the proxies. Since this combined
// proxy is just redirecting to other proxies, all invocations can share it.
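After this change the provider keeps two views of the cluster: nameNodeProxies, resolved with forOnnFailover2Ann set to false, still covers every NameNode and backs observer reads, while the inner ConfiguredFailoverProxyProvider, created with true, only contains non-observer NameNodes and is what msync and other must-run-on-active calls fail over across. From an application the only visible difference is the extra "observers" setting. A hedged end-to-end sketch, assuming the mycluster HA settings from the earlier example and that FileSystem#msync() is available (Hadoop 3.3 and later):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class MsyncSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();   // plus the mycluster HA settings shown earlier
    conf.set("dfs.client.failover.proxy.provider.mycluster",
        "org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider");
    conf.set("dfs.ha.namenodes.mycluster.observers", "nn3");   // illustrative observer ID

    try (FileSystem fs = FileSystem.get(URI.create("hdfs://mycluster"), conf)) {
      // With this patch the msync RPC goes through the filtered failover proxy,
      // i.e. straight to the active NameNode instead of trying an observer first.
      fs.msync();
    }
  }
}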
@@ -117,9 +117,9 @@ public ClientProtocol createProxy(Configuration config,
}) {
@Override
protected List<NNProxyInfo<ClientProtocol>> getProxyAddresses(
URI uri, String addressKey) {
URI uri, String addressKey, boolean forOnnFailover2Ann) {
List<NNProxyInfo<ClientProtocol>> nnProxies =
super.getProxyAddresses(uri, addressKey);
super.getProxyAddresses(uri, addressKey, forOnnFailover2Ann);
return nnProxies;
}
};