From f07b0630b9654b1c9b10ff5efc0e5989625404da Mon Sep 17 00:00:00 2001 From: sivabalan Date: Thu, 25 Aug 2022 12:56:33 -0700 Subject: [PATCH] =?UTF-8?q?Revert=20"[HUDI-3669]=20Add=20a=20remote=20requ?= =?UTF-8?q?est=20retry=20mechanism=20for=20'Remotehoodietablefiles?= =?UTF-8?q?=E2=80=A6=20(#5884)"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 660177bce1cd82975d7c25715497e0d2fbb2a95e. --- .../embedded/EmbeddedTimelineService.java | 5 -- .../table/view/FileSystemViewManager.java | 3 +- .../view/FileSystemViewStorageConfig.java | 76 ------------------- .../view/RemoteHoodieTableFileSystemView.java | 67 ++++------------ .../apache/hudi/common/util/RetryHelper.java | 46 +++++------ .../org/apache/hudi/util/StreamerUtil.java | 5 -- .../TestRemoteHoodieTableFileSystemView.java | 29 ------- 7 files changed, 36 insertions(+), 195 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java index 4d5375894d7e..72f8e29c9fa8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java @@ -117,11 +117,6 @@ public FileSystemViewStorageConfig getRemoteFileSystemViewConfig() { .withRemoteServerHost(hostAddr) .withRemoteServerPort(serverPort) .withRemoteTimelineClientTimeoutSecs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientTimeoutSecs()) - .withRemoteTimelineClientRetry(writeConfig.getClientSpecifiedViewStorageConfig().isRemoteTimelineClientRetryEnabled()) - .withRemoteTimelineClientMaxRetryNumbers(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryNumbers()) - .withRemoteTimelineInitialRetryIntervalMs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineInitialRetryIntervalMs()) - .withRemoteTimelineClientMaxRetryIntervalMs(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientMaxRetryIntervalMs()) - .withRemoteTimelineClientRetryExceptions(writeConfig.getClientSpecifiedViewStorageConfig().getRemoteTimelineClientRetryExceptions()) .build(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java index 48023d50463d..35fda6c416ac 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java @@ -214,7 +214,8 @@ private static RemoteHoodieTableFileSystemView createRemoteFileSystemView(Serial LOG.info("Creating remote view for basePath " + metaClient.getBasePath() + ". Server=" + viewConf.getRemoteViewServerHost() + ":" + viewConf.getRemoteViewServerPort() + ", Timeout=" + viewConf.getRemoteTimelineClientTimeoutSecs()); - return new RemoteHoodieTableFileSystemView(metaClient, viewConf); + return new RemoteHoodieTableFileSystemView(viewConf.getRemoteViewServerHost(), viewConf.getRemoteViewServerPort(), + metaClient, viewConf.getRemoteTimelineClientTimeoutSecs()); } public static FileSystemViewManager createViewManager(final HoodieEngineContext context, diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java index bc835612aa93..63f10855bad8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java @@ -110,37 +110,6 @@ public class FileSystemViewStorageConfig extends HoodieConfig { .defaultValue(5 * 60) // 5 min .withDocumentation("Timeout in seconds, to wait for API requests against a remote file system view. e.g timeline server."); - public static final ConfigProperty REMOTE_RETRY_ENABLE = ConfigProperty - .key("hoodie.filesystem.view.remote.retry.enable") - .defaultValue("false") - .sinceVersion("0.12.0") - .withDocumentation("Whether to enable API request retry for remote file system view."); - - public static final ConfigProperty REMOTE_MAX_RETRY_NUMBERS = ConfigProperty - .key("hoodie.filesystem.view.remote.retry.max_numbers") - .defaultValue(3) // 3 times - .sinceVersion("0.12.0") - .withDocumentation("Maximum number of retry for API requests against a remote file system view. e.g timeline server."); - - public static final ConfigProperty REMOTE_INITIAL_RETRY_INTERVAL_MS = ConfigProperty - .key("hoodie.filesystem.view.remote.retry.initial_interval_ms") - .defaultValue(100L) - .sinceVersion("0.12.0") - .withDocumentation("Amount of time (in ms) to wait, before retry to do operations on storage."); - - public static final ConfigProperty REMOTE_MAX_RETRY_INTERVAL_MS = ConfigProperty - .key("hoodie.filesystem.view.remote.retry.max_interval_ms") - .defaultValue(2000L) - .sinceVersion("0.12.0") - .withDocumentation("Maximum amount of time (in ms), to wait for next retry."); - - public static final ConfigProperty RETRY_EXCEPTIONS = ConfigProperty - .key("hoodie.filesystem.view.remote.retry.exceptions") - .defaultValue("") - .sinceVersion("0.12.0") - .withDocumentation("The class name of the Exception that needs to be re-tryed, separated by commas. " - + "Default is empty which means retry all the IOException and RuntimeException from Remote Request."); - public static final ConfigProperty REMOTE_BACKUP_VIEW_ENABLE = ConfigProperty .key("hoodie.filesystem.remote.backup.view.enable") .defaultValue("true") // Need to be disabled only for tests. @@ -175,26 +144,6 @@ public Integer getRemoteTimelineClientTimeoutSecs() { return getInt(REMOTE_TIMEOUT_SECS); } - public boolean isRemoteTimelineClientRetryEnabled() { - return getBoolean(REMOTE_RETRY_ENABLE); - } - - public Integer getRemoteTimelineClientMaxRetryNumbers() { - return getInt(REMOTE_MAX_RETRY_NUMBERS); - } - - public Long getRemoteTimelineInitialRetryIntervalMs() { - return getLong(REMOTE_INITIAL_RETRY_INTERVAL_MS); - } - - public Long getRemoteTimelineClientMaxRetryIntervalMs() { - return getLong(REMOTE_MAX_RETRY_INTERVAL_MS); - } - - public String getRemoteTimelineClientRetryExceptions() { - return getString(RETRY_EXCEPTIONS); - } - public long getMaxMemoryForFileGroupMap() { long totalMemory = getLong(SPILLABLE_MEMORY); return totalMemory - getMaxMemoryForPendingCompaction() - getMaxMemoryForBootstrapBaseFile(); @@ -296,31 +245,6 @@ public Builder withRemoteTimelineClientTimeoutSecs(Integer timelineClientTimeout return this; } - public Builder withRemoteTimelineClientRetry(boolean enableRetry) { - fileSystemViewStorageConfig.setValue(REMOTE_RETRY_ENABLE, Boolean.toString(enableRetry)); - return this; - } - - public Builder withRemoteTimelineClientMaxRetryNumbers(Integer maxRetryNumbers) { - fileSystemViewStorageConfig.setValue(REMOTE_MAX_RETRY_NUMBERS, maxRetryNumbers.toString()); - return this; - } - - public Builder withRemoteTimelineInitialRetryIntervalMs(Long initialRetryIntervalMs) { - fileSystemViewStorageConfig.setValue(REMOTE_INITIAL_RETRY_INTERVAL_MS, initialRetryIntervalMs.toString()); - return this; - } - - public Builder withRemoteTimelineClientMaxRetryIntervalMs(Long maxRetryIntervalMs) { - fileSystemViewStorageConfig.setValue(REMOTE_MAX_RETRY_INTERVAL_MS, maxRetryIntervalMs.toString()); - return this; - } - - public Builder withRemoteTimelineClientRetryExceptions(String retryExceptions) { - fileSystemViewStorageConfig.setValue(RETRY_EXCEPTIONS, retryExceptions); - return this; - } - public Builder withMemFractionForPendingCompaction(Double memFractionForPendingCompaction) { fileSystemViewStorageConfig.setValue(SPILLABLE_COMPACTION_MEM_FRACTION, memFractionForPendingCompaction.toString()); return this; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java index ea51732eb020..099b79cbba0a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java @@ -39,7 +39,6 @@ import org.apache.hudi.common.table.timeline.dto.InstantDTO; import org.apache.hudi.common.table.timeline.dto.TimelineDTO; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.RetryHelper; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; @@ -133,35 +132,22 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, private boolean closed = false; - private RetryHelper retryHelper; - - private final HttpRequestCheckedFunction urlCheckedFunc; - private enum RequestMethod { GET, POST } public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaClient metaClient) { - this(metaClient, FileSystemViewStorageConfig.newBuilder().withRemoteServerHost(server).withRemoteServerPort(port).build()); + this(server, port, metaClient, 300); } - public RemoteHoodieTableFileSystemView(HoodieTableMetaClient metaClient, FileSystemViewStorageConfig viewConf) { + public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaClient metaClient, int timeoutSecs) { this.basePath = metaClient.getBasePath(); + this.serverHost = server; + this.serverPort = port; this.mapper = new ObjectMapper(); this.metaClient = metaClient; this.timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(); - this.serverHost = viewConf.getRemoteViewServerHost(); - this.serverPort = viewConf.getRemoteViewServerPort(); - this.timeoutSecs = viewConf.getRemoteTimelineClientTimeoutSecs(); - this.urlCheckedFunc = new HttpRequestCheckedFunction(this.timeoutSecs * 1000); - if (viewConf.isRemoteTimelineClientRetryEnabled()) { - retryHelper = new RetryHelper( - viewConf.getRemoteTimelineClientMaxRetryIntervalMs(), - viewConf.getRemoteTimelineClientMaxRetryNumbers(), - viewConf.getRemoteTimelineInitialRetryIntervalMs(), - viewConf.getRemoteTimelineClientRetryExceptions(), - "Sending request"); - } + this.timeoutSecs = timeoutSecs; } private T executeRequest(String requestPath, Map queryParameters, TypeReference reference, @@ -179,9 +165,17 @@ private T executeRequest(String requestPath, Map queryParame String url = builder.toString(); LOG.info("Sending request : (" + url + ")"); - // Reset url and method, to avoid repeatedly instantiating objects. - urlCheckedFunc.setUrlAndMethod(url, method); - Response response = retryHelper != null ? retryHelper.tryWith(urlCheckedFunc).start() : urlCheckedFunc.get(); + Response response; + int timeout = this.timeoutSecs * 1000; // msec + switch (method) { + case GET: + response = Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute(); + break; + case POST: + default: + response = Request.Post(url).connectTimeout(timeout).socketTimeout(timeout).execute(); + break; + } String content = response.returnContent().asString(); return (T) mapper.readValue(content, reference); } @@ -501,33 +495,4 @@ public Option getLatestBaseFile(String partitionPath, String fil throw new HoodieRemoteException(e); } } - - /** - * For remote HTTP requests, to avoid repeatedly instantiating objects. - */ - private class HttpRequestCheckedFunction implements RetryHelper.CheckedFunction { - private String url; - private RequestMethod method; - private final int timeoutMs; - - public void setUrlAndMethod(String url, RequestMethod method) { - this.method = method; - this.url = url; - } - - public HttpRequestCheckedFunction(int timeoutMs) { - this.timeoutMs = timeoutMs; - } - - @Override - public Response get() throws IOException { - switch (method) { - case GET: - return Request.Get(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute(); - case POST: - default: - return Request.Post(url).connectTimeout(timeoutMs).socketTimeout(timeoutMs).execute(); - } - } - } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java b/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java index efa10f38302b..067c5ee40dad 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java @@ -18,27 +18,28 @@ package org.apache.hudi.common.util; -import org.apache.hudi.exception.HoodieException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; -import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.stream.Collectors; -public class RetryHelper implements Serializable { +public class RetryHelper { private static final Logger LOG = LogManager.getLogger(RetryHelper.class); - private transient CheckedFunction func; - private final int num; - private final long maxIntervalTime; - private final long initialIntervalTime; + private CheckedFunction func; + private int num; + private long maxIntervalTime; + private long initialIntervalTime = 100L; private String taskInfo = "N/A"; private List> retryExceptionsClasses; + public RetryHelper() { + } + public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions) { this.num = maxRetryNumbers; this.initialIntervalTime = initialRetryIntervalMs; @@ -46,24 +47,18 @@ public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRet if (StringUtils.isNullOrEmpty(retryExceptions)) { this.retryExceptionsClasses = new ArrayList<>(); } else { - try { - this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(",")) - .map(exception -> (Exception) ReflectionUtils.loadClass(exception, "")) - .map(Exception::getClass) - .collect(Collectors.toList()); - } catch (HoodieException e) { - LOG.error("Exception while loading retry exceptions classes '" + retryExceptions + "'.", e); - this.retryExceptionsClasses = new ArrayList<>(); - } + this.retryExceptionsClasses = Arrays.stream(retryExceptions.split(",")) + .map(exception -> (Exception) ReflectionUtils.loadClass(exception, "")) + .map(Exception::getClass) + .collect(Collectors.toList()); } } - public RetryHelper(long maxRetryIntervalMs, int maxRetryNumbers, long initialRetryIntervalMs, String retryExceptions, String taskInfo) { - this(maxRetryIntervalMs, maxRetryNumbers, initialRetryIntervalMs, retryExceptions); + public RetryHelper(String taskInfo) { this.taskInfo = taskInfo; } - public RetryHelper tryWith(CheckedFunction func) { + public RetryHelper tryWith(CheckedFunction func) { this.func = func; return this; } @@ -82,18 +77,14 @@ public T start() throws IOException { throw e; } if (retries++ >= num) { - String message = "Still failed to " + taskInfo + " after retried " + num + " times."; - LOG.error(message, e); - if (e instanceof IOException) { - throw new IOException(message, e); - } + LOG.error("Still failed to " + taskInfo + " after retried " + num + " times.", e); throw e; } - LOG.warn("Catch Exception for " + taskInfo + ", will retry after " + waitTime + " ms.", e); + LOG.warn("Catch Exception " + taskInfo + ", will retry after " + waitTime + " ms.", e); try { Thread.sleep(waitTime); } catch (InterruptedException ex) { - // ignore InterruptedException here + // ignore InterruptedException here } } } @@ -101,7 +92,6 @@ public T start() throws IOException { if (retries > 0) { LOG.info("Success to " + taskInfo + " after retried " + retries + " times."); } - return functionResult; } @@ -133,7 +123,7 @@ private long getWaitTimeExp(int retryCount) { } @FunctionalInterface - public interface CheckedFunction extends Serializable { + public interface CheckedFunction { T get() throws IOException; } } \ No newline at end of file diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 4b93faeaf72d..b09d7ad8bfc3 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -429,11 +429,6 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throw .withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost()) .withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort()) .withRemoteTimelineClientTimeoutSecs(viewStorageConfig.getRemoteTimelineClientTimeoutSecs()) - .withRemoteTimelineClientRetry(viewStorageConfig.isRemoteTimelineClientRetryEnabled()) - .withRemoteTimelineClientMaxRetryNumbers(viewStorageConfig.getRemoteTimelineClientMaxRetryNumbers()) - .withRemoteTimelineInitialRetryIntervalMs(viewStorageConfig.getRemoteTimelineInitialRetryIntervalMs()) - .withRemoteTimelineClientMaxRetryIntervalMs(viewStorageConfig.getRemoteTimelineClientMaxRetryIntervalMs()) - .withRemoteTimelineClientRetryExceptions(viewStorageConfig.getRemoteTimelineClientRetryExceptions()) .build(); ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt, conf); return writeClient; diff --git a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java index 55fd9d7f1608..f9a6172b5ec3 100644 --- a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java +++ b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java @@ -28,14 +28,12 @@ import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView; import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.common.table.view.TestHoodieTableFileSystemView; -import org.apache.hudi.exception.HoodieRemoteException; import org.apache.hudi.timeline.service.TimelineService; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.junit.jupiter.api.Test; /** * Bring up a remote Timeline Server and run all test-cases of TestHoodieTableFileSystemView against it. @@ -66,31 +64,4 @@ protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) { view = new RemoteHoodieTableFileSystemView("localhost", server.getServerPort(), metaClient); return view; } - - @Test - public void testRemoteHoodieTableFileSystemViewWithRetry() { - // Service is available. - view.getLatestBaseFiles(); - // Shut down the service. - server.close(); - try { - // Immediately fails and throws a connection refused exception. - view.getLatestBaseFiles(); - } catch (HoodieRemoteException e) { - assert e.getMessage().contains("Connection refused (Connection refused)"); - } - // Enable API request retry for remote file system view. - view = new RemoteHoodieTableFileSystemView(metaClient, FileSystemViewStorageConfig - .newBuilder() - .withRemoteServerHost("localhost") - .withRemoteServerPort(server.getServerPort()) - .withRemoteTimelineClientRetry(true) - .withRemoteTimelineClientMaxRetryNumbers(4) - .build()); - try { - view.getLatestBaseFiles(); - } catch (HoodieRemoteException e) { - assert e.getMessage().equalsIgnoreCase("Still failed to Sending request after retried 4 times."); - } - } }