diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java index 1a1819dfa2c5a..5f8df8ac40e07 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java @@ -103,6 +103,7 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_OBSERVER_NAMENODES_KEY_SUFFIX; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES; @@ -227,6 +228,20 @@ public static Collection getNameNodeIds(Configuration conf, String nsId) return conf.getTrimmedStringCollection(key); } + /** + * Returns the collection of observer namenode Ids from the configuration. One logical id + * for each observer namenode in the Observer-Read setup.
+ * + * @param conf configuration + * @param nsId the nameservice ID to look at, or null for non-federated + * @return collection of observer namenode Ids + */ + public static Collection getObserverNameNodeIds(Configuration conf, String nsId) { + String key = addSuffix(DFS_HA_NAMENODES_KEY_PREFIX, nsId); + String observerKey = addSuffix(key, DFS_HA_OBSERVER_NAMENODES_KEY_SUFFIX); + return conf.getTrimmedStringCollection(observerKey); + } + /** Add non empty and non null suffix to a key */ static String addSuffix(String key, String suffix) { if (suffix == null || suffix.isEmpty()) { @@ -422,7 +437,21 @@ static String concatSuffixes(String... suffixes) { public static Map> getAddresses( Configuration conf, String defaultAddress, String... keys) { Collection nameserviceIds = getNameServiceIds(conf); - return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, keys); + return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, false, keys); + } + + /** + * Returns the configured address for all NameNodes in the cluster. + * @param conf configuration + * @param defaultAddress default address to return in case key is not found. + * @param forOnnFailover2Ann whether for ObserverReadProxyProvider failoverProxy + * @param keys Set of keys to look for in the order of preference + * @return a map(nameserviceId to map(namenodeId to InetSocketAddress)) + */ + public static Map> getAddresses( + Configuration conf, String defaultAddress, boolean forOnnFailover2Ann, String... keys) { + Collection nameserviceIds = getNameServiceIds(conf); + return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, forOnnFailover2Ann, keys); + } /** @@ -507,13 +536,13 @@ private static String getConcatNnId(String nsId, String nnId, String hostname, i */ static Map> getAddressesForNsIds( Configuration conf, Collection nsIds, String defaultAddress, - String... keys) { + boolean forOnnFailover2Ann, String... keys) { // Look for configurations of the form [.][.] 
+ // across all of the configured nameservices and namenodes. Map> ret = Maps.newLinkedHashMap(); for (String nsId : emptyAsSingletonNull(nsIds)) { Map isas = - getAddressesForNameserviceId(conf, nsId, defaultAddress, keys); + getAddressesForNameserviceId(conf, nsId, defaultAddress, forOnnFailover2Ann, keys); if (!isas.isEmpty()) { ret.put(nsId, isas); } @@ -523,7 +552,25 @@ static Map> getAddressesForNsIds( public static Map getAddressesForNameserviceId( Configuration conf, String nsId, String defaultValue, String... keys) { + return getAddressesForNameserviceId(conf, nsId, defaultValue, false, keys); + } + + public static Map getAddressesForNameserviceId( + Configuration conf, String nsId, String defaultValue, + boolean forOnnFailover2Ann, String... keys) { Collection nnIds = getNameNodeIds(conf, nsId); + if (forOnnFailover2Ann) { + Collection obnnIds = getObserverNameNodeIds(conf, nsId); + if (!obnnIds.isEmpty()) { + nnIds.removeIf(obnnIds::contains); + } + } + return getNamenodeIDInetSocketAddressMap(conf, nsId, defaultValue, nnIds, keys); + } + + private static Map getNamenodeIDInetSocketAddressMap( + Configuration conf, String nsId, String defaultValue, + Collection nnIds, String[] keys) { Map ret = Maps.newLinkedHashMap(); for (String nnId : emptyAsSingletonNull(nnIds)) { String suffix = concatSuffixes(nsId, nnId); @@ -532,8 +579,8 @@ public static Map getAddressesForNameserviceId( InetSocketAddress isa = NetUtils.createSocketAddr(address); if (isa.isUnresolved()) { LOG.warn("Namenode for {} remains unresolved for ID {}. Check your " - + "hdfs-site.xml file to ensure namenodes are configured " - + "properly.", nsId, nnId); + + "hdfs-site.xml file to ensure namenodes are configured " + + "properly.", nsId, nnId); } ret.put(nnId, isa); } @@ -541,6 +588,7 @@ public static Map getAddressesForNameserviceId( return ret; } + /** * Return address from configuration. Take a list of keys as preference.
* If the address to be returned is the value of DFS_NAMENODE_RPC_ADDRESS_KEY, diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index 7750c48ae6bfe..7232ac114e68e 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -79,6 +79,7 @@ public interface HdfsClientConfigKeys { String DFS_NAMENODE_HTTPS_ADDRESS_KEY = "dfs.namenode.https-address"; String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes"; int DFS_NAMENODE_RPC_PORT_DEFAULT = 8020; + String DFS_HA_OBSERVER_NAMENODES_KEY_SUFFIX = "observers"; String DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY = "dfs.namenode.kerberos.principal"; String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size"; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java index ec4c22ecb5c1a..c75f477afe202 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java @@ -167,10 +167,15 @@ protected NNProxyInfo createProxyIfNeeded(NNProxyInfo pi) { * Get list of configured NameNode proxy addresses. * Randomize the list if requested. 
*/ - protected List> getProxyAddresses(URI uri, String addressKey) { + protected List> getProxyAddresses(URI uri, String addressKey, + boolean forOnnFailover2Ann) { final List> proxies = new ArrayList>(); - Map> map = - DFSUtilClient.getAddresses(conf, null, addressKey); + Map> map; + if (forOnnFailover2Ann) { + map = DFSUtilClient.getAddresses(conf, null, true, addressKey); + } else { + map = DFSUtilClient.getAddresses(conf, null, addressKey); + } Map addressesInNN = map.get(uri.getHost()); if (addressesInNN == null || addressesInNN.size() == 0) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java index 92e75cee364ed..df322c659bb83 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java @@ -48,7 +48,18 @@ public ConfiguredFailoverProxyProvider(Configuration conf, URI uri, public ConfiguredFailoverProxyProvider(Configuration conf, URI uri, Class xface, HAProxyFactory factory, String addressKey) { super(conf, uri, xface, factory); - this.proxies = getProxyAddresses(uri, addressKey); + this.proxies = getProxyAddresses(uri, addressKey, false); + } + + public ConfiguredFailoverProxyProvider(Configuration conf, URI uri, + Class xface, HAProxyFactory factory, boolean forOnnFailover2Ann) { + this(conf, uri, xface, factory, DFS_NAMENODE_RPC_ADDRESS_KEY, forOnnFailover2Ann); + } + + public ConfiguredFailoverProxyProvider(Configuration conf, URI uri, + Class xface, HAProxyFactory factory, String addressKey, boolean forOnnFailover2Ann) { + super(conf, uri, xface, factory); + this.proxies = getProxyAddresses(uri, addressKey, 
forOnnFailover2Ann); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java index e1a7c2f8030cb..3476ff82f35f9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java @@ -168,7 +168,7 @@ public class ObserverReadProxyProvider public ObserverReadProxyProvider( Configuration conf, URI uri, Class xface, HAProxyFactory factory) { this(conf, uri, xface, factory, - new ConfiguredFailoverProxyProvider<>(conf, uri, xface, factory)); + new ConfiguredFailoverProxyProvider<>(conf, uri, xface, factory, true)); } @SuppressWarnings("unchecked") @@ -189,7 +189,7 @@ public ObserverReadProxyProvider( // Get all NameNode proxies nameNodeProxies = getProxyAddresses(uri, - HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY); + HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, false); // Create a wrapped proxy containing all the proxies. Since this combined // proxy is just redirecting to other proxies, all invocations can share it. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java index 54b1159899d76..0f5e660155c91 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java @@ -117,9 +117,9 @@ public ClientProtocol createProxy(Configuration config, }) { @Override protected List> getProxyAddresses( - URI uri, String addressKey) { + URI uri, String addressKey, boolean forOnnFailover2Ann) { List> nnProxies = - super.getProxyAddresses(uri, addressKey); + super.getProxyAddresses(uri, addressKey, forOnnFailover2Ann); return nnProxies; } };