From d48eaa73881e6341be31de5c3da630b27a304e92 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 28 Mar 2022 15:14:28 +0200 Subject: [PATCH 01/13] HADOOP-18080 replace use of twitter util-core --- hadoop-tools/hadoop-aws/pom.xml | 6 --- .../apache/hadoop/fs/common/BufferData.java | 7 +-- .../apache/hadoop/fs/common/BufferPool.java | 4 +- .../hadoop/fs/common/CachingBlockManager.java | 27 ++++++----- .../fs/common/ExecutorServiceFuturePool.java | 47 +++++++++++++++++++ .../apache/hadoop/fs/s3a/S3AFileSystem.java | 5 +- .../hadoop/fs/s3a/S3AReadOpContext.java | 15 +++--- .../fs/s3a/read/S3CachingBlockManager.java | 4 +- .../fs/s3a/read/S3CachingInputStream.java | 4 +- .../hadoop/fs/common/TestBufferData.java | 10 ++-- .../org/apache/hadoop/fs/s3a/read/Fakes.java | 14 +++--- .../s3a/read/TestS3CachingBlockManager.java | 7 ++- .../apache/hadoop/fs/s3a/read/TestS3File.java | 5 +- .../hadoop/fs/s3a/read/TestS3InputStream.java | 5 +- 14 files changed, 98 insertions(+), 62 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index 2e6b8ff359512..5583bb7ad05ec 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -435,12 +435,6 @@ aws-java-sdk-bundle compile - - com.twitter - util-core_2.11 - 21.2.0 - compile - org.assertj assertj-core diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferData.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferData.java index 34dd6d7ba3b8d..a855a1c2c390c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferData.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferData.java @@ -22,10 +22,9 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Future; import java.util.zip.CRC32; -import com.twitter.util.Awaitable.CanAwait; -import com.twitter.util.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -263,8 +262,6 @@ public boolean stateEqualsOneOf(State... states) { return false; } - private static final CanAwait CAN_AWAIT = () -> false; - public String toString() { return String.format( @@ -281,7 +278,7 @@ private String getFutureStr(Future f) { if (f == null) { return "--"; } else { - return this.action.isReady(CAN_AWAIT) ? "done" : "not done"; + return this.action.isDone() ? "done" : "not done"; } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java index 91798e550064a..259f9834cea82 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java @@ -27,8 +27,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CancellationException; +import java.util.concurrent.Future; -import com.twitter.util.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -233,7 +233,7 @@ public synchronized void close() { for (BufferData data : this.getAll()) { Future actionFuture = data.getActionFuture(); if (actionFuture != null) { - actionFuture.raise(new CancellationException("BufferPool is closing.")); + actionFuture.cancel(true); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java index 93417f3fe61e9..078b9a894e070 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java @@ -21,13 +21,12 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; -import com.twitter.util.Await; -import com.twitter.util.ExceptionalFunction0; -import com.twitter.util.Future; -import com.twitter.util.FuturePool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +38,7 @@ public abstract class CachingBlockManager extends BlockManager { private static final Logger LOG = LoggerFactory.getLogger(CachingBlockManager.class); // Asynchronous tasks are performed in this pool. - private final FuturePool futurePool; + private final ExecutorServiceFuturePool futurePool; // Pool of shared ByteBuffer instances. private BufferPool bufferPool; @@ -78,7 +77,7 @@ public abstract class CachingBlockManager extends BlockManager { * @throws IllegalArgumentException if bufferPoolSize is zero or negative. */ public CachingBlockManager( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, BlockData blockData, int bufferPoolSize) { super(blockData); @@ -344,7 +343,7 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State... /** * Read task that is submitted to the future pool. */ - private static class PrefetchTask extends ExceptionalFunction0 { + private static class PrefetchTask implements Supplier { private final BufferData data; private final CachingBlockManager blockManager; @@ -354,7 +353,7 @@ private static class PrefetchTask extends ExceptionalFunction0 { } @Override - public Void applyE() { + public Void get() { try { this.blockManager.prefetch(data); } catch (Exception e) { @@ -412,7 +411,9 @@ public void requestCaching(BufferData data) { if (state == BufferData.State.PREFETCHING) { blockFuture = data.getActionFuture(); } else { - blockFuture = Future.value(null); + CompletableFuture cf = new CompletableFuture<>(); + cf.complete(null); + blockFuture = cf; } CachePutTask task = new CachePutTask(data, blockFuture, this); @@ -433,13 +434,13 @@ private void addToCacheAndRelease(BufferData data, Future blockFuture) { } try { - Await.result(blockFuture); + blockFuture.get(); //TODO consider calling get(long timeout, TimeUnit unit) instead if (data.stateEqualsOneOf(BufferData.State.DONE)) { // There was an error during prefetch. return; } } catch (Exception e) { - String message = String.format("error waitng on blockFuture: %s", data); + String message = String.format("error waiting on blockFuture: %s", data); LOG.error(message, e); data.setDone(); return; @@ -500,7 +501,7 @@ protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException { this.cache.put(blockNumber, buffer); } - private static class CachePutTask extends ExceptionalFunction0 { + private static class CachePutTask implements Supplier { private final BufferData data; // Block being asynchronously fetched. @@ -519,7 +520,7 @@ private static class CachePutTask extends ExceptionalFunction0 { } @Override - public Void applyE() { + public Void get() { this.blockManager.addToCacheAndRelease(this.data, this.blockFuture); return null; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java new file mode 100644 index 0000000000000..ccbeae8299339 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.fs.common; + +import java.util.Locale; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.function.Supplier; + +public class ExecutorServiceFuturePool { + private ExecutorService executor; + + public ExecutorServiceFuturePool(ExecutorService executor) { + this.executor = executor; + } + + /** + * @param f function to run in future on executor pool + * @return future + * @throws java.util.concurrent.RejectedExecutionException can be thrown + * @throws NullPointerException if f param is null + */ + public Future apply(final Supplier f) { + return executor.submit(f::get); + } + + public String toString() { + return String.format(Locale.ROOT,"ExecutorServiceFuturePool(executor=%s)", executor); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 35c26067fcca1..76fe678073df3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -76,8 +76,7 @@ import com.amazonaws.services.s3.transfer.model.UploadResult; import com.amazonaws.event.ProgressListener; -import com.twitter.util.ExecutorServiceFuturePool; -import com.twitter.util.FuturePool; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -283,7 +282,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, private ThreadPoolExecutor unboundedThreadPool; // S3 reads are prefetched asynchronously using this future pool. - private FuturePool futurePool; + private ExecutorServiceFuturePool futurePool; // If true, the prefetching input stream is used for reads. private boolean prefetchEnabled; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java index acfe6a415f1e3..cfbd9921d80f2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java @@ -18,11 +18,10 @@ package org.apache.hadoop.fs.s3a; -import com.twitter.util.FuturePool; - import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; import org.apache.hadoop.fs.store.audit.AuditSpan; @@ -61,7 +60,7 @@ public class S3AReadOpContext extends S3AOpContext { private final AuditSpan auditSpan; // S3 reads are prefetched asynchronously using this future pool. - private FuturePool futurePool; + private ExecutorServiceFuturePool futurePool; // Size in bytes of a single prefetch block. private final int prefetchBlockSize; @@ -80,7 +79,7 @@ public class S3AReadOpContext extends S3AOpContext { * @param changeDetectionPolicy change detection policy. * @param readahead readahead for GET operations/skip, etc. * @param auditSpan active audit - * @param futurePool the FuturePool instance used by async prefetches. + * @param futurePool the ExecutorServiceFuturePool instance used by async prefetches. * @param prefetchBlockSize the size (in number of bytes) of each prefetched block. * @param prefetchBlockCount maximum number of prefetched blocks. */ @@ -94,7 +93,7 @@ public S3AReadOpContext( ChangeDetectionPolicy changeDetectionPolicy, final long readahead, final AuditSpan auditSpan, - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, int prefetchBlockSize, int prefetchBlockCount) { @@ -161,11 +160,11 @@ public AuditSpan getAuditSpan() { } /** - * Gets the {@code FuturePool} used for asynchronous prefetches. + * Gets the {@code ExecutorServiceFuturePool} used for asynchronous prefetches. * - * @return the {@code FuturePool} used for asynchronous prefetches. + * @return the {@code ExecutorServiceFuturePool} used for asynchronous prefetches. */ - public FuturePool getFuturePool() { + public ExecutorServiceFuturePool getFuturePool() { return this.futurePool; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java index c4fafd56f1d9b..7f6a564c18dc5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java @@ -22,7 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; -import com.twitter.util.FuturePool; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +52,7 @@ public class S3CachingBlockManager extends CachingBlockManager { * @throws IllegalArgumentException if reader is null. */ public S3CachingBlockManager( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, S3Reader reader, BlockData blockData, int bufferPoolSize) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java index 1117002526838..a1a9a22448ae3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java @@ -21,13 +21,13 @@ import java.io.IOException; -import com.twitter.util.FuturePool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.common.BlockData; import org.apache.hadoop.fs.common.BlockManager; import org.apache.hadoop.fs.common.BufferData; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; @@ -186,7 +186,7 @@ public String toString() { } protected BlockManager createBlockManager( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, S3Reader reader, BlockData blockData, int bufferPoolSize) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferData.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferData.java index 119e90ffebad5..c4699d11540ee 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferData.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferData.java @@ -24,8 +24,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.CompletableFuture; -import com.twitter.util.Future; import org.junit.Test; import org.apache.hadoop.test.AbstractHadoopTestBase; @@ -83,13 +83,14 @@ public void testValidStateUpdates() { assertEquals(BufferData.State.BLANK, data.getState()); - Future actionFuture = Future.value(null); + CompletableFuture actionFuture = new CompletableFuture<>(); + actionFuture.complete(null); data.setPrefetch(actionFuture); assertEquals(BufferData.State.PREFETCHING, data.getState()); assertNotNull(data.getActionFuture()); assertSame(actionFuture, data.getActionFuture()); - Future actionFuture2 = Future.value(null); + CompletableFuture actionFuture2 = new CompletableFuture<>(); data.setCaching(actionFuture2); assertEquals(BufferData.State.CACHING, data.getState()); assertNotNull(data.getActionFuture()); @@ -117,7 +118,8 @@ public void testValidStateUpdates() { @Test public void testInvalidStateUpdates() throws Exception { - Future actionFuture = Future.value(null); + CompletableFuture actionFuture = new CompletableFuture<>(); + actionFuture.complete(null); testInvalidStateUpdatesHelper( (d) -> d.setPrefetch(actionFuture), BufferData.State.BLANK, diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java index 5c2f7eb224151..1b02c495bc477 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java @@ -34,11 +34,11 @@ import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectInputStream; -import com.twitter.util.FuturePool; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.common.BlockCache; import org.apache.hadoop.fs.common.BlockData; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.fs.common.SingleFilePerBlockCache; import org.apache.hadoop.fs.common.Validate; import org.apache.hadoop.fs.s3a.Invoker; @@ -109,7 +109,7 @@ public static S3ObjectAttributes createObjectAttributes( } public static S3AReadOpContext createReadContext( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, String key, int fileSize, int prefetchBlockSize, @@ -195,7 +195,7 @@ public void close() { public static S3InputStream createInputStream( Class clazz, - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, String bucket, String key, int fileSize, @@ -225,7 +225,7 @@ public static S3InputStream createInputStream( } public static TestS3InMemoryInputStream createS3InMemoryInputStream( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, String bucket, String key, int fileSize) { @@ -235,7 +235,7 @@ public static TestS3InMemoryInputStream createS3InMemoryInputStream( } public static TestS3CachingInputStream createS3CachingInputStream( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, String bucket, String key, int fileSize, @@ -322,7 +322,7 @@ private static void randomDelay(int delay) { public static class TestS3CachingBlockManager extends S3CachingBlockManager { public TestS3CachingBlockManager( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, S3Reader reader, BlockData blockData, int bufferPoolSize) { @@ -359,7 +359,7 @@ protected S3File getS3File() { @Override protected S3CachingBlockManager createBlockManager( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, S3Reader reader, BlockData blockData, int bufferPoolSize) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java index 99836793decba..3f84e2e028339 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java @@ -24,13 +24,12 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import com.twitter.util.ExecutorServiceFuturePool; -import com.twitter.util.FuturePool; import org.junit.Test; import org.apache.hadoop.fs.common.BlockData; import org.apache.hadoop.fs.common.BufferData; import org.apache.hadoop.fs.common.ExceptionAsserts; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.test.AbstractHadoopTestBase; import static org.junit.Assert.assertEquals; @@ -41,7 +40,7 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase { static final int POOL_SIZE = 3; private final ExecutorService threadPool = Executors.newFixedThreadPool(4); - private final FuturePool futurePool = new ExecutorServiceFuturePool(threadPool); + private final ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(threadPool); private final BlockData blockData = new BlockData(FILE_SIZE, BLOCK_SIZE); @@ -106,7 +105,7 @@ public void testArgChecks() throws Exception { */ static class TestBlockManager extends S3CachingBlockManager { TestBlockManager( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, S3Reader reader, BlockData blockData, int bufferPoolSize) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3File.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3File.java index 2f555d2b62c47..9f63ea0a889fd 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3File.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3File.java @@ -22,11 +22,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import com.twitter.util.ExecutorServiceFuturePool; -import com.twitter.util.FuturePool; import org.junit.Test; import org.apache.hadoop.fs.common.ExceptionAsserts; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; @@ -36,7 +35,7 @@ public class TestS3File extends AbstractHadoopTestBase { private final ExecutorService threadPool = Executors.newFixedThreadPool(1); - private final FuturePool futurePool = new ExecutorServiceFuturePool(threadPool); + private final ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(threadPool); private final S3AInputStream.InputStreamCallbacks client = MockS3File.createClient("bucket"); @Test diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java index 503cd699002c7..318a789cb6889 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java @@ -24,8 +24,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import com.twitter.util.ExecutorServiceFuturePool; -import com.twitter.util.FuturePool; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.junit.Test; import org.apache.hadoop.fs.FSExceptionMessages; @@ -45,7 +44,7 @@ public class TestS3InputStream extends AbstractHadoopTestBase { private static final int FILE_SIZE = 10; private final ExecutorService threadPool = Executors.newFixedThreadPool(4); - private final FuturePool futurePool = new ExecutorServiceFuturePool(threadPool); + private final ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(threadPool); private final S3AInputStream.InputStreamCallbacks client = MockS3File.createClient("bucket"); @Test From 02be95bf48b0a9f6273f9f62a526d80fc6abd6cd Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 28 Mar 2022 16:54:10 +0200 Subject: [PATCH 02/13] address some review comments --- .../apache/hadoop/fs/common/CachingBlockManager.java | 3 +-- .../hadoop/fs/common/ExecutorServiceFuturePool.java | 11 +++++++++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java index 078b9a894e070..e85cf711cdbe6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java @@ -440,8 +440,7 @@ private void addToCacheAndRelease(BufferData data, Future blockFuture) { return; } } catch (Exception e) { - String message = String.format("error waiting on blockFuture: %s", data); - LOG.error(message, e); + LOG.error("error waiting on blockFuture: {}", data, e); data.setDone(); return; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java index ccbeae8299339..829ec4174b0c0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java @@ -24,6 +24,17 @@ import java.util.concurrent.Future; import java.util.function.Supplier; +/** + * A FuturePool implementation backed by a java.util.concurrent.ExecutorService. + * + * If a piece of work has started, it cannot (currently) be cancelled. + * + * This class is a simplified version of com.twitter:util-core_2.11 ExecutorServiceFuturePool + * designed to avoid depending on that Scala library. One problem with using a Scala library is that many + * downstream projects (eg Apache Spark) use Scala and they might want to use a different version of Scala + * from the version that Hadoop chooses to use. + * + */ public class ExecutorServiceFuturePool { private ExecutorService executor; From a0dd7fe2c48105c5a3e09c27464c0af598f75d50 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 28 Mar 2022 21:03:43 +0200 Subject: [PATCH 03/13] add tests --- .../hadoop/fs/common/CachingBlockManager.java | 4 +- .../fs/common/ExecutorServiceFuturePool.java | 10 +++ .../common/TestExecutorServiceFuturePool.java | 78 +++++++++++++++++++ 3 files changed, 91 insertions(+), 1 deletion(-) create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java index e85cf711cdbe6..d7034b41dcd75 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -36,6 +37,7 @@ */ public abstract class CachingBlockManager extends BlockManager { private static final Logger LOG = LoggerFactory.getLogger(CachingBlockManager.class); + private static final int TIMEOUT_MINUTES = 60; // Asynchronous tasks are performed in this pool. private final ExecutorServiceFuturePool futurePool; @@ -434,7 +436,7 @@ private void addToCacheAndRelease(BufferData data, Future blockFuture) { } try { - blockFuture.get(); //TODO consider calling get(long timeout, TimeUnit unit) instead + blockFuture.get(TIMEOUT_MINUTES, TimeUnit.MINUTES); if (data.stateEqualsOneOf(BufferData.State.DONE)) { // There was an error during prefetch. return; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java index 829ec4174b0c0..70d14f70949db 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java @@ -52,6 +52,16 @@ public Future apply(final Supplier f) { return executor.submit(f::get); } + /** + * @param r runnable to run in future on executor pool + * @return future + * @throws java.util.concurrent.RejectedExecutionException can be thrown + * @throws NullPointerException if f param is null + */ + public Future apply(final Runnable r) { + return (Future) executor.submit(() -> r.run()); + } + public String toString() { return String.format(Locale.ROOT,"ExecutorServiceFuturePool(executor=%s)", executor); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java new file mode 100644 index 0000000000000..d51073558c7d1 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java @@ -0,0 +1,78 @@ +package org.apache.hadoop.fs.common; + +import org.junit.Test; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +public class TestExecutorServiceFuturePool { + @Test + public void testRunnableSucceeds() throws Exception { + ExecutorService executorService = Executors.newFixedThreadPool(3); + try { + ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); + final AtomicBoolean atomicBoolean = new AtomicBoolean(false); + Future future = futurePool.apply(() -> atomicBoolean.set(true)); + future.get(30, TimeUnit.SECONDS); + assertTrue(atomicBoolean.get()); + } finally { + executorService.shutdownNow(); + } + } + + @Test + public void testSupplierSucceeds() throws Exception { + ExecutorService executorService = Executors.newFixedThreadPool(3); + try { + ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); + final AtomicBoolean atomicBoolean = new AtomicBoolean(false); + Future future = futurePool.apply(() -> { + atomicBoolean.set(true); + return null; + }); + future.get(30, TimeUnit.SECONDS); + assertTrue(atomicBoolean.get()); + } finally { + executorService.shutdownNow(); + } + } + + @Test + public void testRunnableFails() throws Exception { + ExecutorService executorService = Executors.newFixedThreadPool(3); + try { + ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); + Future future = futurePool.apply((Runnable) () -> { + throw new IllegalStateException("deliberate"); + }); + assertThrows(ExecutionException.class, () -> future.get(30, TimeUnit.SECONDS)); + } finally { + executorService.shutdownNow(); + } + } + + @Test + public void testSupplierFails() throws Exception { + ExecutorService executorService = Executors.newFixedThreadPool(3); + try { + ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); + Future future = futurePool.apply(new Supplier() { + @Override + public Void get() { + throw new IllegalStateException("deliberate"); + } + }); + assertThrows(ExecutionException.class, () -> future.get(30, TimeUnit.SECONDS)); + } finally { + executorService.shutdownNow(); + } + } +} From bcab50d504f16877e91e2e579f33a28fda8d8b68 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 28 Mar 2022 21:38:27 +0200 Subject: [PATCH 04/13] missing license --- .../common/TestExecutorServiceFuturePool.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java index d51073558c7d1..c1e2b2d3e88bf 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.hadoop.fs.common; import org.junit.Test; From 339f47d9f4ae32afb6d8996cbe51e7063e24c124 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 28 Mar 2022 21:42:01 +0200 Subject: [PATCH 05/13] rename FuturePool methods --- .../hadoop/fs/common/CachingBlockManager.java | 4 ++-- .../fs/common/ExecutorServiceFuturePool.java | 4 ++-- .../fs/common/TestExecutorServiceFuturePool.java | 14 +++++--------- 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java index d7034b41dcd75..44c2df291387f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java @@ -248,7 +248,7 @@ public void requestPrefetch(int blockNumber) { BlockOperations.Operation op = this.ops.requestPrefetch(blockNumber); PrefetchTask prefetchTask = new PrefetchTask(data, this); - Future prefetchFuture = this.futurePool.apply(prefetchTask); + Future prefetchFuture = this.futurePool.executeFunction(prefetchTask); data.setPrefetch(prefetchFuture); this.ops.end(op); } @@ -419,7 +419,7 @@ public void requestCaching(BufferData data) { } CachePutTask task = new CachePutTask(data, blockFuture, this); - Future actionFuture = this.futurePool.apply(task); + Future actionFuture = this.futurePool.executeFunction(task); data.setCaching(actionFuture); this.ops.end(op); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java index 70d14f70949db..92d2274f179b0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java @@ -48,7 +48,7 @@ public ExecutorServiceFuturePool(ExecutorService executor) { * @throws java.util.concurrent.RejectedExecutionException can be thrown * @throws NullPointerException if f param is null */ - public Future apply(final Supplier f) { + public Future executeFunction(final Supplier f) { return executor.submit(f::get); } @@ -58,7 +58,7 @@ public Future apply(final Supplier f) { * @throws java.util.concurrent.RejectedExecutionException can be thrown * @throws NullPointerException if f param is null */ - public Future apply(final Runnable r) { + public Future executeRunnable(final Runnable r) { return (Future) executor.submit(() -> r.run()); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java index c1e2b2d3e88bf..b5f97eacf851c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java @@ -27,7 +27,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -39,7 +38,7 @@ public void testRunnableSucceeds() throws Exception { try { ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); final AtomicBoolean atomicBoolean = new AtomicBoolean(false); - Future future = futurePool.apply(() -> atomicBoolean.set(true)); + Future future = futurePool.executeRunnable(() -> atomicBoolean.set(true)); future.get(30, TimeUnit.SECONDS); assertTrue(atomicBoolean.get()); } finally { @@ -53,7 +52,7 @@ public void testSupplierSucceeds() throws Exception { try { ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); final AtomicBoolean atomicBoolean = new AtomicBoolean(false); - Future future = futurePool.apply(() -> { + Future future = futurePool.executeFunction(() -> { atomicBoolean.set(true); return null; }); @@ -69,7 +68,7 @@ public void testRunnableFails() throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(3); try { ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); - Future future = futurePool.apply((Runnable) () -> { + Future future = futurePool.executeRunnable(() -> { throw new IllegalStateException("deliberate"); }); assertThrows(ExecutionException.class, () -> future.get(30, TimeUnit.SECONDS)); @@ -83,11 +82,8 @@ public void testSupplierFails() throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(3); try { ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); - Future future = futurePool.apply(new Supplier() { - @Override - public Void get() { - throw new IllegalStateException("deliberate"); - } + Future future = futurePool.executeFunction(() -> { + throw new IllegalStateException("deliberate"); }); assertThrows(ExecutionException.class, () -> future.get(30, TimeUnit.SECONDS)); } finally { From c5b3c4b30d4c1d64bed6e1678f64f35110d0cd23 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 28 Mar 2022 22:02:15 +0200 Subject: [PATCH 06/13] Update ExecutorServiceFuturePool.java --- .../org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java index 92d2274f179b0..9ea01fddcd346 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java @@ -56,7 +56,7 @@ public Future executeFunction(final Supplier f) { * @param r runnable to run in future on executor pool * @return future * @throws java.util.concurrent.RejectedExecutionException can be thrown - * @throws NullPointerException if f param is null + * @throws NullPointerException if r param is null */ public Future executeRunnable(final Runnable r) { return (Future) executor.submit(() -> r.run()); From 5f9b643fce848452fc9c8dd3ade3704f67f5d5f6 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 28 Mar 2022 23:15:58 +0200 Subject: [PATCH 07/13] Update ExecutorServiceFuturePool.java --- .../org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java index 9ea01fddcd346..ba5cf2f2a739c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java @@ -58,6 +58,7 @@ public Future executeFunction(final Supplier f) { * @throws java.util.concurrent.RejectedExecutionException can be thrown * @throws NullPointerException if r param is null */ + @SuppressWarnings("unchecked") public Future executeRunnable(final Runnable r) { return (Future) executor.submit(() -> r.run()); } From 915b1d3313efa51b4863dcdba4030b26eafbe9fc Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 29 Mar 2022 00:54:27 +0200 Subject: [PATCH 08/13] fix indents --- .../apache/hadoop/fs/common/BufferPool.java | 1 - .../fs/common/ExecutorServiceFuturePool.java | 52 ++++----- .../common/TestExecutorServiceFuturePool.java | 102 +++++++++--------- 3 files changed, 77 insertions(+), 78 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java index 259f9834cea82..b151ed439afc6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java @@ -26,7 +26,6 @@ import java.util.IdentityHashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CancellationException; import java.util.concurrent.Future; import org.slf4j.Logger; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java index ba5cf2f2a739c..81fcf4c015b06 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java @@ -36,34 +36,34 @@ * */ public class ExecutorServiceFuturePool { - private ExecutorService executor; + private ExecutorService executor; - public ExecutorServiceFuturePool(ExecutorService executor) { - this.executor = executor; - } + public ExecutorServiceFuturePool(ExecutorService executor) { + this.executor = executor; + } - /** - * @param f function to run in future on executor pool - * @return future - * @throws java.util.concurrent.RejectedExecutionException can be thrown - * @throws NullPointerException if f param is null - */ - public Future executeFunction(final Supplier f) { - return executor.submit(f::get); - } + /** + * @param f function to run in future on executor pool + * @return future + * @throws java.util.concurrent.RejectedExecutionException can be thrown + * @throws NullPointerException if f param is null + */ + public Future executeFunction(final Supplier f) { + return executor.submit(f::get); + } - /** - * @param r runnable to run in future on executor pool - * @return future - * @throws java.util.concurrent.RejectedExecutionException can be thrown - * @throws NullPointerException if r param is null - */ - @SuppressWarnings("unchecked") - public Future executeRunnable(final Runnable r) { - return (Future) executor.submit(() -> r.run()); - } + /** + * @param r runnable to run in future on executor pool + * @return future + * @throws java.util.concurrent.RejectedExecutionException can be thrown + * @throws NullPointerException if r param is null + */ + @SuppressWarnings("unchecked") + public Future executeRunnable(final Runnable r) { + return (Future) executor.submit(() -> r.run()); + } - public String toString() { - return String.format(Locale.ROOT,"ExecutorServiceFuturePool(executor=%s)", executor); - } + public String toString() { + return String.format(Locale.ROOT,"ExecutorServiceFuturePool(executor=%s)", executor); + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java index b5f97eacf851c..8503be380b92a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java @@ -32,62 +32,62 @@ import static org.junit.Assert.assertTrue; public class TestExecutorServiceFuturePool { - @Test - public void testRunnableSucceeds() throws Exception { - ExecutorService executorService = Executors.newFixedThreadPool(3); - try { - ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); - final AtomicBoolean atomicBoolean = new AtomicBoolean(false); - Future future = futurePool.executeRunnable(() -> atomicBoolean.set(true)); - future.get(30, TimeUnit.SECONDS); - assertTrue(atomicBoolean.get()); - } finally { - executorService.shutdownNow(); - } + @Test + public void testRunnableSucceeds() throws Exception { + ExecutorService executorService = Executors.newFixedThreadPool(3); + try { + ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); + final AtomicBoolean atomicBoolean = new AtomicBoolean(false); + Future future = futurePool.executeRunnable(() -> atomicBoolean.set(true)); + future.get(30, TimeUnit.SECONDS); + assertTrue(atomicBoolean.get()); + } finally { + executorService.shutdownNow(); } + } - @Test - public void testSupplierSucceeds() throws Exception { - ExecutorService executorService = Executors.newFixedThreadPool(3); - try { - ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); - final AtomicBoolean atomicBoolean = new AtomicBoolean(false); - Future future = futurePool.executeFunction(() -> { - atomicBoolean.set(true); - return null; - }); - future.get(30, TimeUnit.SECONDS); - assertTrue(atomicBoolean.get()); - } finally { - executorService.shutdownNow(); - } + @Test + public void testSupplierSucceeds() throws Exception { + ExecutorService executorService = Executors.newFixedThreadPool(3); + try { + ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); + final AtomicBoolean atomicBoolean = new AtomicBoolean(false); + Future future = futurePool.executeFunction(() -> { + atomicBoolean.set(true); + return null; + }); + future.get(30, TimeUnit.SECONDS); + assertTrue(atomicBoolean.get()); + } finally { + executorService.shutdownNow(); } + } - @Test - public void testRunnableFails() throws Exception { - ExecutorService executorService = Executors.newFixedThreadPool(3); - try { - ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); - Future future = futurePool.executeRunnable(() -> { - throw new IllegalStateException("deliberate"); - }); - assertThrows(ExecutionException.class, () -> future.get(30, TimeUnit.SECONDS)); - } finally { - executorService.shutdownNow(); - } + @Test + public void testRunnableFails() throws Exception { + ExecutorService executorService = Executors.newFixedThreadPool(3); + try { + ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); + Future future = futurePool.executeRunnable(() -> { + throw new IllegalStateException("deliberate"); + }); + assertThrows(ExecutionException.class, () -> future.get(30, TimeUnit.SECONDS)); + } finally { + executorService.shutdownNow(); } + } - @Test - public void testSupplierFails() throws Exception { - ExecutorService executorService = Executors.newFixedThreadPool(3); - try { - ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); - Future future = futurePool.executeFunction(() -> { - throw new IllegalStateException("deliberate"); - }); - assertThrows(ExecutionException.class, () -> future.get(30, TimeUnit.SECONDS)); - } finally { - executorService.shutdownNow(); - } + @Test + public void testSupplierFails() throws Exception { + ExecutorService executorService = Executors.newFixedThreadPool(3); + try { + ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); + Future future = futurePool.executeFunction(() -> { + throw new IllegalStateException("deliberate"); + }); + assertThrows(ExecutionException.class, () -> future.get(30, TimeUnit.SECONDS)); + } finally { + executorService.shutdownNow(); } + } } From 27138bd9a295847e26bc6f61eeab20e1cc8ef7d4 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 29 Mar 2022 22:43:00 +0200 Subject: [PATCH 09/13] PR review items --- .../fs/s3a/read/S3CachingBlockManager.java | 2 +- .../common/TestExecutorServiceFuturePool.java | 92 +++++++++---------- .../hadoop/fs/s3a/read/TestS3InputStream.java | 2 +- 3 files changed, 48 insertions(+), 48 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java index 7f6a564c18dc5..674a5ccbdd8bb 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java @@ -22,12 +22,12 @@ import java.io.IOException; import java.nio.ByteBuffer; -import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.common.BlockData; import org.apache.hadoop.fs.common.CachingBlockManager; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.fs.common.Validate; /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java index 8503be380b92a..52dc8844eb2ea 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java @@ -19,6 +19,9 @@ package org.apache.hadoop.fs.common; +import org.apache.hadoop.test.AbstractHadoopTestBase; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import java.util.concurrent.ExecutionException; @@ -31,63 +34,60 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; -public class TestExecutorServiceFuturePool { - @Test - public void testRunnableSucceeds() throws Exception { - ExecutorService executorService = Executors.newFixedThreadPool(3); - try { - ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); - final AtomicBoolean atomicBoolean = new AtomicBoolean(false); - Future future = futurePool.executeRunnable(() -> atomicBoolean.set(true)); - future.get(30, TimeUnit.SECONDS); - assertTrue(atomicBoolean.get()); - } finally { +public class TestExecutorServiceFuturePool extends AbstractHadoopTestBase { + + private ExecutorService executorService; + + @Before + public void setUp() { + executorService = Executors.newFixedThreadPool(3); + } + + @After + public void tearDown() { + if (executorService != null) { executorService.shutdownNow(); } } + @Test + public void testRunnableSucceeds() throws Exception { + ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); + final AtomicBoolean atomicBoolean = new AtomicBoolean(false); + Future future = futurePool.executeRunnable(() -> atomicBoolean.set(true)); + future.get(30, TimeUnit.SECONDS); + assertTrue("atomicBoolean set to true?", atomicBoolean.get()); + } + @Test public void testSupplierSucceeds() throws Exception { - ExecutorService executorService = Executors.newFixedThreadPool(3); - try { - ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); - final AtomicBoolean atomicBoolean = new AtomicBoolean(false); - Future future = futurePool.executeFunction(() -> { - atomicBoolean.set(true); - return null; - }); - future.get(30, TimeUnit.SECONDS); - assertTrue(atomicBoolean.get()); - } finally { - executorService.shutdownNow(); - } + ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); + final AtomicBoolean atomicBoolean = new AtomicBoolean(false); + Future future = futurePool.executeFunction(() -> { + atomicBoolean.set(true); + return null; + }); + future.get(30, TimeUnit.SECONDS); + assertTrue("atomicBoolean set to true?", atomicBoolean.get()); } @Test - public void testRunnableFails() throws Exception { - ExecutorService executorService = Executors.newFixedThreadPool(3); - try { - ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); - Future future = futurePool.executeRunnable(() -> { - throw new IllegalStateException("deliberate"); - }); - assertThrows(ExecutionException.class, () -> future.get(30, TimeUnit.SECONDS)); - } finally { - executorService.shutdownNow(); - } + public void testRunnableFails() { + ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); + Future future = futurePool.executeRunnable(() -> { + throw new IllegalStateException("deliberate"); + }); + assertThrows("future failed with ExecutionException?", + ExecutionException.class, () -> future.get(30, TimeUnit.SECONDS)); } @Test - public void testSupplierFails() throws Exception { - ExecutorService executorService = Executors.newFixedThreadPool(3); - try { - ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); - Future future = futurePool.executeFunction(() -> { - throw new IllegalStateException("deliberate"); - }); - assertThrows(ExecutionException.class, () -> future.get(30, TimeUnit.SECONDS)); - } finally { - executorService.shutdownNow(); - } + public void testSupplierFails() { + ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); + Future future = futurePool.executeFunction(() -> { + throw new IllegalStateException("deliberate"); + }); + assertThrows("future failed with ExecutionException?", + ExecutionException.class, () -> future.get(30, TimeUnit.SECONDS)); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java index 318a789cb6889..010bc1c30b634 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java @@ -24,11 +24,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.junit.Test; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.common.ExceptionAsserts; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; From 57a5ccb8430336b84f08b1124e8435e383e74505 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 29 Mar 2022 23:11:57 +0200 Subject: [PATCH 10/13] use LambdaTestUtils --- .../fs/common/TestExecutorServiceFuturePool.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java index 52dc8844eb2ea..1ed07030d3f07 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java @@ -20,6 +20,7 @@ package org.apache.hadoop.fs.common; import org.apache.hadoop.test.AbstractHadoopTestBase; +import org.apache.hadoop.test.LambdaTestUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -31,7 +32,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; public class TestExecutorServiceFuturePool extends AbstractHadoopTestBase { @@ -72,22 +72,20 @@ public void testSupplierSucceeds() throws Exception { } @Test - public void testRunnableFails() { + public void testRunnableFails() throws Exception { ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); Future future = futurePool.executeRunnable(() -> { throw new IllegalStateException("deliberate"); }); - assertThrows("future failed with ExecutionException?", - ExecutionException.class, () -> future.get(30, TimeUnit.SECONDS)); + LambdaTestUtils.intercept(ExecutionException.class, () -> future.get(30, TimeUnit.SECONDS)); } @Test - public void testSupplierFails() { + public void testSupplierFails() throws Exception { ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); Future future = futurePool.executeFunction(() -> { throw new IllegalStateException("deliberate"); }); - assertThrows("future failed with ExecutionException?", - ExecutionException.class, () -> future.get(30, TimeUnit.SECONDS)); + LambdaTestUtils.intercept(ExecutionException.class, () -> future.get(30, TimeUnit.SECONDS)); } } From 66562cca050697a621389862028b6cd0051de533 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Wed, 30 Mar 2022 00:19:25 +0200 Subject: [PATCH 11/13] checkstyle warnings --- .../hadoop/fs/common/ExecutorServiceFuturePool.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java index 81fcf4c015b06..bed2d7ed90c1d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java @@ -29,9 +29,10 @@ * * If a piece of work has started, it cannot (currently) be cancelled. * - * This class is a simplified version of com.twitter:util-core_2.11 ExecutorServiceFuturePool - * designed to avoid depending on that Scala library. One problem with using a Scala library is that many - * downstream projects (eg Apache Spark) use Scala and they might want to use a different version of Scala + * This class is a simplified version of com.twitter:util-core_2.11 + * ExecutorServiceFuturePool designed to avoid depending on that Scala library. + * One problem with using a Scala library is that many downstream projects + * (eg Apache Spark) use Scala, and they might want to use a different version of Scala * from the version that Hadoop chooses to use. * */ @@ -64,6 +65,6 @@ public Future executeRunnable(final Runnable r) { } public String toString() { - return String.format(Locale.ROOT,"ExecutorServiceFuturePool(executor=%s)", executor); + return String.format(Locale.ROOT, "ExecutorServiceFuturePool(executor=%s)", executor); } } From ca715d4cd8e5ccef4e32f4493ced1dfd0964317e Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Wed, 30 Mar 2022 20:28:12 +0200 Subject: [PATCH 12/13] Update ExecutorServiceFuturePool.java --- .../org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java index bed2d7ed90c1d..932a047c94008 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java @@ -61,7 +61,7 @@ public Future executeFunction(final Supplier f) { */ @SuppressWarnings("unchecked") public Future executeRunnable(final Runnable r) { - return (Future) executor.submit(() -> r.run()); + return (Future) executor.submit(r::run); } public String toString() { From c01c7e0cd855f665a0a75521a9fdf0e4f7f5c205 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Wed, 30 Mar 2022 21:31:05 +0200 Subject: [PATCH 13/13] try to match import order --- .../fs/common/TestExecutorServiceFuturePool.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java index 1ed07030d3f07..00055a9ea1ef4 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java @@ -19,12 +19,6 @@ package org.apache.hadoop.fs.common; -import org.apache.hadoop.test.AbstractHadoopTestBase; -import org.apache.hadoop.test.LambdaTestUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -32,6 +26,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.test.AbstractHadoopTestBase; +import org.apache.hadoop.test.LambdaTestUtils; + import static org.junit.Assert.assertTrue; public class TestExecutorServiceFuturePool extends AbstractHadoopTestBase {