Merge branch 'apache:trunk' into HADOOP-19404
anmolanmol1234 authored Feb 4, 2025
2 parents dfb2d99 + 7459a1f commit 7072c65
Showing 70 changed files with 5,196 additions and 367 deletions.
@@ -75,7 +75,7 @@ public class MountTableRefresherService extends AbstractService {

/**
* All router admin clients cached. So no need to create the client again and
* again. Router admin address(host:port) is used as key to cache RouterClient
* again. Router admin address(host:port or ip:port) is used as key to cache RouterClient
* objects.
*/
private LoadingCache<String, RouterClient> routerClientsCache;
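A note on the cache above: it is a Guava LoadingCache keyed by the admin address string, so at most one RouterClient is built per router and reused across refreshes. A minimal sketch of how such a cache can be wired up — the RouterClient(InetSocketAddress, Configuration) constructor, the shaded-Guava package, and the 60-second eviction window are illustrative assumptions, not taken from this diff:

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;

public class RouterClientCacheSketch {
  /** One RouterClient per admin address; idle entries eventually expire. */
  static LoadingCache<String, RouterClient> buildCache(final Configuration conf) {
    return CacheBuilder.newBuilder()
        .expireAfterAccess(60, TimeUnit.SECONDS) // hypothetical eviction window
        .build(new CacheLoader<String, RouterClient>() {
          @Override
          public RouterClient load(String adminAddress) throws Exception {
            // adminAddress is "host:port" or "ip:port", per the javadoc above.
            InetSocketAddress isa = NetUtils.createSocketAddr(adminAddress);
            return new RouterClient(isa, conf); // assumed constructor
          }
        });
  }
}

Keyed this way, the host-vs-IP choice below matters: the string must match the form routers use when they register, or lookups against the cache will not find the intended client.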
@@ -102,8 +102,13 @@ protected void serviceInit(Configuration conf) throws Exception {
this.mountTableStore = getMountTableStore();
// Attach this service to mount table store.
this.mountTableStore.setRefreshService(this);
this.localAdminAddress =
StateStoreUtils.getHostPortString(router.getAdminServerAddress());
if (conf.getBoolean(RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE,
RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE_DEFAULT)) {
this.localAdminAddress = StateStoreUtils.getIpPortString(router.getAdminServerAddress());
} else {
this.localAdminAddress = StateStoreUtils.getHostPortString(router.getAdminServerAddress());
}
LOG.info("Initialized MountTableRefresherService with addr: {}", this.localAdminAddress);
this.cacheUpdateTimeout = conf.getTimeDuration(
RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT,
RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT_DEFAULT,
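The branch above is the consumer of the new flag: with it on, the service identifies itself to the state store by IP, which can matter where hostnames do not resolve consistently across routers. A hedged sketch of flipping the flag, using the key names added to RBFConfigKeys below; the addresses in the comments are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;

public class HeartbeatKeySketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Off by default; flip it to identify routers by IP instead of hostname.
    conf.setBoolean(RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE, true);
    // With the flag on, serviceInit() records e.g. "10.0.0.5:8111" as
    // localAdminAddress; with it off (the default), "router1.example.com:8111".
    System.out.println(conf.getBoolean(
        RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE,
        RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE_DEFAULT)); // true
  }
}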
@@ -220,7 +225,7 @@ public void refresh() throws StateStoreUnavailableException {
List<MountTableRefresherThread> refreshThreads = new ArrayList<>();
for (RouterState routerState : cachedRecords) {
String adminAddress = routerState.getAdminAddress();
if (adminAddress == null || adminAddress.length() == 0) {
if (adminAddress == null || adminAddress.isEmpty()) {
// this router has not enabled router admin.
continue;
}
@@ -237,11 +242,13 @@ public void refresh() throws StateStoreUnavailableException {
* RouterClient
*/
refreshThreads.add(getLocalRefresher(adminAddress));
LOG.debug("Added local refresher for {}", adminAddress);
} else {
try {
RouterClient client = routerClientsCache.get(adminAddress);
refreshThreads.add(new MountTableRefresherThread(
client.getMountTableManager(), adminAddress));
LOG.debug("Added remote refresher for {}", adminAddress);
} catch (ExecutionException execExcep) {
// Cannot connect; the router seems to be stopped now.
LOG.warn(ROUTER_CONNECT_ERROR_MSG, adminAddress, execExcep);
@@ -296,6 +303,7 @@ private void logResult(List<MountTableRefresherThread> refreshThreads) {
if (mountTableRefreshThread.isSuccess()) {
successCount++;
} else {
LOG.debug("Failed to refresh {}", mountTableRefreshThread.getAdminAddress());
failureCount++;
// remove RouterClient from cache so that new client is created
removeFromCache(mountTableRefreshThread.getAdminAddress());
@@ -288,6 +288,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
FEDERATION_ROUTER_PREFIX + "safemode.checkperiod";
public static final long DFS_ROUTER_SAFEMODE_CHECKPERIOD_MS_DEFAULT =
TimeUnit.SECONDS.toMillis(5);
public static final String DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE =
FEDERATION_ROUTER_PREFIX + "heartbeat.with.ip.enable";
public static final boolean DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE_DEFAULT = false;

// HDFS Router-based federation mount table entries
/** Maximum number of cache entries to have. */
@@ -88,9 +88,12 @@ synchronized void updateStateStore() {
getStateStoreVersion(MountTableStore.class));
record.setStateStoreVersion(stateStoreVersion);
// If the admin server has not started, the registered address will be empty.
String hostPort =
StateStoreUtils.getHostPortString(router.getAdminServerAddress());
record.setAdminAddress(hostPort);
if (router.getConfig().getBoolean(RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE,
RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE_DEFAULT)) {
record.setAdminAddress(StateStoreUtils.getIpPortString(router.getAdminServerAddress()));
} else {
record.setAdminAddress(StateStoreUtils.getHostPortString(router.getAdminServerAddress()));
}
RouterHeartbeatRequest request =
RouterHeartbeatRequest.newInstance(record);
RouterHeartbeatResponse response = routerStore.routerHeartbeat(request);
@@ -25,6 +25,7 @@

import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
import org.apache.hadoop.net.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -136,4 +137,19 @@ public static String getHostPortString(InetSocketAddress address) {
return hostName + ":" + address.getPort();
}

/**
* Returns the address in the form of ip:port, or an empty string if the address is null.
*
* @param address the socket address to convert
* @return address in the form of ip:port
*/
public static String getIpPortString(InetSocketAddress address) {
if (null == address) {
return "";
}
address = NetUtils.getConnectAddress(address);
InetAddress inet = address.getAddress();
return inet.getHostAddress() + ":" + address.getPort();
}

}
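Relative to getHostPortString above, the new helper does two things: it first passes the address through NetUtils.getConnectAddress, which substitutes a connectable local address when the socket is bound to the wildcard, and it then emits the numeric IP rather than the hostname. A small sketch, assuming the usual org.apache.hadoop.hdfs.server.federation.store package for StateStoreUtils; the resolved IP is illustrative:

import java.net.InetSocketAddress;

import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;

public class IpPortSketch {
  public static void main(String[] args) {
    // Servers often bind to the wildcard address.
    InetSocketAddress bound = new InetSocketAddress("0.0.0.0", 8111);

    // Reports the bind host essentially verbatim ("0.0.0.0:8111"),
    // which other routers cannot use as a connect target.
    System.out.println(StateStoreUtils.getHostPortString(bound));

    // Resolves a connectable address first, then prints the numeric IP,
    // e.g. "192.168.1.10:8111" (illustrative).
    System.out.println(StateStoreUtils.getIpPortString(bound));
  }
}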
@@ -974,4 +974,12 @@
</description>
</property>

<property>
<name>dfs.federation.router.heartbeat.with.ip.enable</name>
<description>
Make the router use its IP instead of its hostname when registering with the router state store.
</description>
<value>false</value>
</property>

</configuration>
@@ -25,6 +25,8 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -53,23 +55,32 @@
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
* This test class verifies that mount table cache is updated on all the routers
* when MountTableRefreshService is enabled and there is a change in mount table
* entries.
*/
@RunWith(Parameterized.class)
public class TestRouterMountTableCacheRefresh {
private static TestingServer curatorTestingServer;
private static MiniRouterDFSCluster cluster;
private static RouterContext routerContext;
private static MountTableManager mountTableManager;

@BeforeClass
public static void setUp() throws Exception {
@Parameterized.Parameters
public static Collection<Object> data() {
return Arrays.asList(new Object[] {true, false});
}

public TestRouterMountTableCacheRefresh(boolean useIpForHeartbeats) throws Exception {
// Initialize only once per parameter
if (curatorTestingServer != null) {
return;
}
curatorTestingServer = new TestingServer();
curatorTestingServer.start();
final String connectString = curatorTestingServer.getConnectString();
@@ -82,6 +93,7 @@ public static void setUp() throws Exception {
FileSubclusterResolver.class);
conf.set(RBFConfigKeys.FEDERATION_STORE_ZK_ADDRESS, connectString);
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_STORE_ENABLE, true);
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE, useIpForHeartbeats);
cluster.addRouterOverrides(conf);
cluster.startCluster();
cluster.startRouters();
@@ -95,11 +107,15 @@ public static void setUp() throws Exception {
numNameservices, 60000);
}

@AfterClass
public static void destory() {
@Parameterized.AfterParam
public static void destroy() {
try {
curatorTestingServer.close();
cluster.shutdown();
if (curatorTestingServer != null) {
curatorTestingServer.close();
}
if (cluster != null) {
cluster.shutdown();
}
} catch (IOException e) {
// do nothing
}
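The test rework above converts a one-time @BeforeClass setup into a parameterized lifecycle: the constructor runs before every test method, so it guards the expensive cluster start to happen once per parameter value, and JUnit 4.13's @Parameterized.AfterParam tears it down once per value. A stripped-down sketch of that lifecycle, with hypothetical names:

import java.util.Arrays;
import java.util.Collection;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class ParamLifecycleSketch {
  private static boolean initialized;

  @Parameterized.Parameters
  public static Collection<Object> data() {
    return Arrays.asList(new Object[] {true, false});
  }

  public ParamLifecycleSketch(boolean flag) {
    // Constructors run before every test method; guard so the expensive
    // setup happens only once per parameter value.
    if (!initialized) {
      initialized = true;
      // ... start the cluster configured with `flag` ...
    }
  }

  @Parameterized.AfterParam
  public static void afterParam() {
    // Runs once after all tests for a parameter value; reset the guard so
    // the next value re-initializes.
    initialized = false;
    // ... shut the cluster down ...
  }

  @Test
  public void testSomething() {
    // assertions against the running cluster
  }
}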
4 changes: 2 additions & 2 deletions hadoop-project/pom.xml
@@ -185,8 +185,8 @@
--add-opens=java.base/sun.security.x509=ALL-UNNAMED
</extraJavaTestArgs>
<!-- Plugin versions and config -->
<maven-surefire-plugin.argLine>-Xmx2048m -XX:+HeapDumpOnOutOfMemoryError ${extraJavaTestArgs}</maven-surefire-plugin.argLine>
<maven-surefire-plugin.version>3.0.0-M1</maven-surefire-plugin.version>
<maven-surefire-plugin.argLine>-Xmx2048m -Xss2m -XX:+HeapDumpOnOutOfMemoryError ${extraJavaTestArgs}</maven-surefire-plugin.argLine>
<maven-surefire-plugin.version>3.0.0-M4</maven-surefire-plugin.version>
<maven-surefire-report-plugin.version>${maven-surefire-plugin.version}</maven-surefire-report-plugin.version>
<maven-failsafe-plugin.version>${maven-surefire-plugin.version}</maven-failsafe-plugin.version>

5 changes: 5 additions & 0 deletions hadoop-tools/hadoop-archive-logs/pom.xml
@@ -162,6 +162,11 @@
<artifactId>junit-platform-launcher</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
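The junit-vintage-engine additions here and in the sibling modules below keep existing JUnit 4 suites runnable on the JUnit Platform that Surefire 3.x uses for test discovery; the vintage engine is the adapter that executes JUnit 3/4 tests on that platform. A trivial test of the kind it picks up:

import static org.junit.Assert.assertEquals;

import org.junit.Test;

public class VintageStyleTest {
  // A plain JUnit 4 test: with junit-vintage-engine on the test classpath,
  // the JUnit Platform can still discover and run it.
  @Test
  public void addsNumbers() {
    assertEquals(4, 2 + 2);
  }
}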
5 changes: 5 additions & 0 deletions hadoop-tools/hadoop-archives/pom.xml
@@ -123,6 +123,11 @@
<artifactId>junit-platform-launcher</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
5 changes: 5 additions & 0 deletions hadoop-tools/hadoop-aws/pom.xml
@@ -601,5 +601,10 @@
<artifactId>junit-platform-launcher</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
6 changes: 5 additions & 1 deletion hadoop-tools/hadoop-azure-datalake/pom.xml
@@ -196,6 +196,10 @@
<artifactId>junit-platform-launcher</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
5 changes: 5 additions & 0 deletions hadoop-tools/hadoop-azure/pom.xml
@@ -372,6 +372,11 @@
<artifactId>junit-platform-launcher</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
@@ -21,25 +21,24 @@
import java.io.IOException;
import java.lang.reflect.Field;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.services.FixedSASTokenProvider;
import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.constants.AuthConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.BooleanConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerWithOutlierConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.LongConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.StringConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.BooleanConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
@@ -65,16 +64,16 @@
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
import org.apache.hadoop.fs.azurebfs.services.FixedSASTokenProvider;
import org.apache.hadoop.fs.azurebfs.services.KeyProvider;
import org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider;
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.ReflectionUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;
@@ -399,6 +398,34 @@ public class AbfsConfiguration{
FS_AZURE_ENABLE_PAGINATED_DELETE, DefaultValue = DEFAULT_ENABLE_PAGINATED_DELETE)
private boolean isPaginatedDeleteEnabled;

@LongConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_BLOB_COPY_PROGRESS_WAIT_MILLIS, DefaultValue = DEFAULT_AZURE_BLOB_COPY_PROGRESS_WAIT_MILLIS)
private long blobCopyProgressPollWaitMillis;

@LongConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_BLOB_COPY_MAX_WAIT_MILLIS, DefaultValue = DEFAULT_AZURE_BLOB_COPY_MAX_WAIT_MILLIS)
private long blobCopyProgressMaxWaitMillis;

@LongConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_BLOB_ATOMIC_RENAME_LEASE_REFRESH_DURATION, DefaultValue = DEFAULT_AZURE_BLOB_ATOMIC_RENAME_LEASE_REFRESH_DURATION)
private long blobAtomicRenameLeaseRefreshDuration;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, DefaultValue = DEFAULT_FS_AZURE_PRODUCER_QUEUE_MAX_SIZE)
private int producerQueueMaxSize;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_CONSUMER_MAX_LAG, DefaultValue = DEFAULT_FS_AZURE_CONSUMER_MAX_LAG)
private int listingMaxConsumptionLag;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, DefaultValue = DEFAULT_FS_AZURE_BLOB_RENAME_THREAD)
private int blobRenameDirConsumptionParallelism;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD, DefaultValue = DEFAULT_FS_AZURE_BLOB_DELETE_THREAD)
private int blobDeleteDirConsumptionParallelism;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES, DefaultValue = DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES)
private int maxApacheHttpClientIoExceptionsRetries;
@@ -1517,4 +1544,32 @@ public boolean getIsChecksumValidationEnabled() {
public void setIsChecksumValidationEnabled(boolean isChecksumValidationEnabled) {
this.isChecksumValidationEnabled = isChecksumValidationEnabled;
}

public long getBlobCopyProgressPollWaitMillis() {
return blobCopyProgressPollWaitMillis;
}

public long getBlobCopyProgressMaxWaitMillis() {
return blobCopyProgressMaxWaitMillis;
}

public long getAtomicRenameLeaseRefreshDuration() {
return blobAtomicRenameLeaseRefreshDuration;
}

public int getProducerQueueMaxSize() {
return producerQueueMaxSize;
}

public int getListingMaxConsumptionLag() {
return listingMaxConsumptionLag;
}

public int getBlobRenameDirConsumptionParallelism() {
return blobRenameDirConsumptionParallelism;
}

public int getBlobDeleteDirConsumptionParallelism() {
return blobDeleteDirConsumptionParallelism;
}
}
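The new ABFS settings all follow this class's annotation-driven pattern: a private field carries a validator annotation naming the configuration key and its default, the field is populated reflectively when AbfsConfiguration is constructed, and a plain getter exposes the value. A sketch of one such field and getter as it would sit inside this class; the key name and default are hypothetical:

// Hypothetical field illustrating the pattern; not part of this patch.
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
    "fs.azure.example.max.widgets", DefaultValue = 8)
private int exampleMaxWidgets;

public int getExampleMaxWidgets() {
  return exampleMaxWidgets;
}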