diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index 2e6b8ff359512..5583bb7ad05ec 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -435,12 +435,6 @@ aws-java-sdk-bundle compile - - com.twitter - util-core_2.11 - 21.2.0 - compile - org.assertj assertj-core diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferData.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferData.java index 34dd6d7ba3b8d..a855a1c2c390c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferData.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferData.java @@ -22,10 +22,9 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Future; import java.util.zip.CRC32; -import com.twitter.util.Awaitable.CanAwait; -import com.twitter.util.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -263,8 +262,6 @@ public boolean stateEqualsOneOf(State... states) { return false; } - private static final CanAwait CAN_AWAIT = () -> false; - public String toString() { return String.format( @@ -281,7 +278,7 @@ private String getFutureStr(Future f) { if (f == null) { return "--"; } else { - return this.action.isReady(CAN_AWAIT) ? "done" : "not done"; + return this.action.isDone() ? "done" : "not done"; } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java index 91798e550064a..b151ed439afc6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java @@ -26,9 +26,8 @@ import java.util.IdentityHashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CancellationException; +import java.util.concurrent.Future; -import com.twitter.util.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -233,7 +232,7 @@ public synchronized void close() { for (BufferData data : this.getAll()) { Future actionFuture = data.getActionFuture(); if (actionFuture != null) { - actionFuture.raise(new CancellationException("BufferPool is closing.")); + actionFuture.cancel(true); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java index 93417f3fe61e9..44c2df291387f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java @@ -21,13 +21,13 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; -import com.twitter.util.Await; -import com.twitter.util.ExceptionalFunction0; -import com.twitter.util.Future; -import com.twitter.util.FuturePool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,9 +37,10 @@ */ public abstract class CachingBlockManager extends BlockManager { private static final Logger LOG = LoggerFactory.getLogger(CachingBlockManager.class); + private static final int TIMEOUT_MINUTES = 60; // Asynchronous tasks are performed in this pool. - private final FuturePool futurePool; + private final ExecutorServiceFuturePool futurePool; // Pool of shared ByteBuffer instances. private BufferPool bufferPool; @@ -78,7 +79,7 @@ public abstract class CachingBlockManager extends BlockManager { * @throws IllegalArgumentException if bufferPoolSize is zero or negative. */ public CachingBlockManager( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, BlockData blockData, int bufferPoolSize) { super(blockData); @@ -247,7 +248,7 @@ public void requestPrefetch(int blockNumber) { BlockOperations.Operation op = this.ops.requestPrefetch(blockNumber); PrefetchTask prefetchTask = new PrefetchTask(data, this); - Future prefetchFuture = this.futurePool.apply(prefetchTask); + Future prefetchFuture = this.futurePool.executeFunction(prefetchTask); data.setPrefetch(prefetchFuture); this.ops.end(op); } @@ -344,7 +345,7 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State... /** * Read task that is submitted to the future pool. */ - private static class PrefetchTask extends ExceptionalFunction0 { + private static class PrefetchTask implements Supplier { private final BufferData data; private final CachingBlockManager blockManager; @@ -354,7 +355,7 @@ private static class PrefetchTask extends ExceptionalFunction0 { } @Override - public Void applyE() { + public Void get() { try { this.blockManager.prefetch(data); } catch (Exception e) { @@ -412,11 +413,13 @@ public void requestCaching(BufferData data) { if (state == BufferData.State.PREFETCHING) { blockFuture = data.getActionFuture(); } else { - blockFuture = Future.value(null); + CompletableFuture cf = new CompletableFuture<>(); + cf.complete(null); + blockFuture = cf; } CachePutTask task = new CachePutTask(data, blockFuture, this); - Future actionFuture = this.futurePool.apply(task); + Future actionFuture = this.futurePool.executeFunction(task); data.setCaching(actionFuture); this.ops.end(op); } @@ -433,14 +436,13 @@ private void addToCacheAndRelease(BufferData data, Future blockFuture) { } try { - Await.result(blockFuture); + blockFuture.get(TIMEOUT_MINUTES, TimeUnit.MINUTES); if (data.stateEqualsOneOf(BufferData.State.DONE)) { // There was an error during prefetch. return; } } catch (Exception e) { - String message = String.format("error waitng on blockFuture: %s", data); - LOG.error(message, e); + LOG.error("error waiting on blockFuture: {}", data, e); data.setDone(); return; } @@ -500,7 +502,7 @@ protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException { this.cache.put(blockNumber, buffer); } - private static class CachePutTask extends ExceptionalFunction0 { + private static class CachePutTask implements Supplier { private final BufferData data; // Block being asynchronously fetched. @@ -519,7 +521,7 @@ private static class CachePutTask extends ExceptionalFunction0 { } @Override - public Void applyE() { + public Void get() { this.blockManager.addToCacheAndRelease(this.data, this.blockFuture); return null; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java new file mode 100644 index 0000000000000..932a047c94008 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.fs.common; + +import java.util.Locale; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.function.Supplier; + +/** + * A FuturePool implementation backed by a java.util.concurrent.ExecutorService. + * + * If a piece of work has started, it cannot (currently) be cancelled. + * + * This class is a simplified version of com.twitter:util-core_2.11 + * ExecutorServiceFuturePool designed to avoid depending on that Scala library. + * One problem with using a Scala library is that many downstream projects + * (eg Apache Spark) use Scala, and they might want to use a different version of Scala + * from the version that Hadoop chooses to use. + * + */ +public class ExecutorServiceFuturePool { + private ExecutorService executor; + + public ExecutorServiceFuturePool(ExecutorService executor) { + this.executor = executor; + } + + /** + * @param f function to run in future on executor pool + * @return future + * @throws java.util.concurrent.RejectedExecutionException can be thrown + * @throws NullPointerException if f param is null + */ + public Future executeFunction(final Supplier f) { + return executor.submit(f::get); + } + + /** + * @param r runnable to run in future on executor pool + * @return future + * @throws java.util.concurrent.RejectedExecutionException can be thrown + * @throws NullPointerException if r param is null + */ + @SuppressWarnings("unchecked") + public Future executeRunnable(final Runnable r) { + return (Future) executor.submit(r::run); + } + + public String toString() { + return String.format(Locale.ROOT, "ExecutorServiceFuturePool(executor=%s)", executor); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 35c26067fcca1..76fe678073df3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -76,8 +76,7 @@ import com.amazonaws.services.s3.transfer.model.UploadResult; import com.amazonaws.event.ProgressListener; -import com.twitter.util.ExecutorServiceFuturePool; -import com.twitter.util.FuturePool; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -283,7 +282,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, private ThreadPoolExecutor unboundedThreadPool; // S3 reads are prefetched asynchronously using this future pool. - private FuturePool futurePool; + private ExecutorServiceFuturePool futurePool; // If true, the prefetching input stream is used for reads. private boolean prefetchEnabled; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java index acfe6a415f1e3..cfbd9921d80f2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java @@ -18,11 +18,10 @@ package org.apache.hadoop.fs.s3a; -import com.twitter.util.FuturePool; - import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; import org.apache.hadoop.fs.store.audit.AuditSpan; @@ -61,7 +60,7 @@ public class S3AReadOpContext extends S3AOpContext { private final AuditSpan auditSpan; // S3 reads are prefetched asynchronously using this future pool. - private FuturePool futurePool; + private ExecutorServiceFuturePool futurePool; // Size in bytes of a single prefetch block. private final int prefetchBlockSize; @@ -80,7 +79,7 @@ public class S3AReadOpContext extends S3AOpContext { * @param changeDetectionPolicy change detection policy. * @param readahead readahead for GET operations/skip, etc. * @param auditSpan active audit - * @param futurePool the FuturePool instance used by async prefetches. + * @param futurePool the ExecutorServiceFuturePool instance used by async prefetches. * @param prefetchBlockSize the size (in number of bytes) of each prefetched block. * @param prefetchBlockCount maximum number of prefetched blocks. */ @@ -94,7 +93,7 @@ public S3AReadOpContext( ChangeDetectionPolicy changeDetectionPolicy, final long readahead, final AuditSpan auditSpan, - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, int prefetchBlockSize, int prefetchBlockCount) { @@ -161,11 +160,11 @@ public AuditSpan getAuditSpan() { } /** - * Gets the {@code FuturePool} used for asynchronous prefetches. + * Gets the {@code ExecutorServiceFuturePool} used for asynchronous prefetches. * - * @return the {@code FuturePool} used for asynchronous prefetches. + * @return the {@code ExecutorServiceFuturePool} used for asynchronous prefetches. */ - public FuturePool getFuturePool() { + public ExecutorServiceFuturePool getFuturePool() { return this.futurePool; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java index c4fafd56f1d9b..674a5ccbdd8bb 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java @@ -22,12 +22,12 @@ import java.io.IOException; import java.nio.ByteBuffer; -import com.twitter.util.FuturePool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.common.BlockData; import org.apache.hadoop.fs.common.CachingBlockManager; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.fs.common.Validate; /** @@ -52,7 +52,7 @@ public class S3CachingBlockManager extends CachingBlockManager { * @throws IllegalArgumentException if reader is null. */ public S3CachingBlockManager( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, S3Reader reader, BlockData blockData, int bufferPoolSize) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java index 1117002526838..a1a9a22448ae3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java @@ -21,13 +21,13 @@ import java.io.IOException; -import com.twitter.util.FuturePool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.common.BlockData; import org.apache.hadoop.fs.common.BlockManager; import org.apache.hadoop.fs.common.BufferData; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; @@ -186,7 +186,7 @@ public String toString() { } protected BlockManager createBlockManager( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, S3Reader reader, BlockData blockData, int bufferPoolSize) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferData.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferData.java index 119e90ffebad5..c4699d11540ee 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferData.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferData.java @@ -24,8 +24,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.CompletableFuture; -import com.twitter.util.Future; import org.junit.Test; import org.apache.hadoop.test.AbstractHadoopTestBase; @@ -83,13 +83,14 @@ public void testValidStateUpdates() { assertEquals(BufferData.State.BLANK, data.getState()); - Future actionFuture = Future.value(null); + CompletableFuture actionFuture = new CompletableFuture<>(); + actionFuture.complete(null); data.setPrefetch(actionFuture); assertEquals(BufferData.State.PREFETCHING, data.getState()); assertNotNull(data.getActionFuture()); assertSame(actionFuture, data.getActionFuture()); - Future actionFuture2 = Future.value(null); + CompletableFuture actionFuture2 = new CompletableFuture<>(); data.setCaching(actionFuture2); assertEquals(BufferData.State.CACHING, data.getState()); assertNotNull(data.getActionFuture()); @@ -117,7 +118,8 @@ public void testValidStateUpdates() { @Test public void testInvalidStateUpdates() throws Exception { - Future actionFuture = Future.value(null); + CompletableFuture actionFuture = new CompletableFuture<>(); + actionFuture.complete(null); testInvalidStateUpdatesHelper( (d) -> d.setPrefetch(actionFuture), BufferData.State.BLANK, diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java new file mode 100644 index 0000000000000..00055a9ea1ef4 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.fs.common; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.test.AbstractHadoopTestBase; +import org.apache.hadoop.test.LambdaTestUtils; + +import static org.junit.Assert.assertTrue; + +public class TestExecutorServiceFuturePool extends AbstractHadoopTestBase { + + private ExecutorService executorService; + + @Before + public void setUp() { + executorService = Executors.newFixedThreadPool(3); + } + + @After + public void tearDown() { + if (executorService != null) { + executorService.shutdownNow(); + } + } + + @Test + public void testRunnableSucceeds() throws Exception { + ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); + final AtomicBoolean atomicBoolean = new AtomicBoolean(false); + Future future = futurePool.executeRunnable(() -> atomicBoolean.set(true)); + future.get(30, TimeUnit.SECONDS); + assertTrue("atomicBoolean set to true?", atomicBoolean.get()); + } + + @Test + public void testSupplierSucceeds() throws Exception { + ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); + final AtomicBoolean atomicBoolean = new AtomicBoolean(false); + Future future = futurePool.executeFunction(() -> { + atomicBoolean.set(true); + return null; + }); + future.get(30, TimeUnit.SECONDS); + assertTrue("atomicBoolean set to true?", atomicBoolean.get()); + } + + @Test + public void testRunnableFails() throws Exception { + ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); + Future future = futurePool.executeRunnable(() -> { + throw new IllegalStateException("deliberate"); + }); + LambdaTestUtils.intercept(ExecutionException.class, () -> future.get(30, TimeUnit.SECONDS)); + } + + @Test + public void testSupplierFails() throws Exception { + ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService); + Future future = futurePool.executeFunction(() -> { + throw new IllegalStateException("deliberate"); + }); + LambdaTestUtils.intercept(ExecutionException.class, () -> future.get(30, TimeUnit.SECONDS)); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java index 5c2f7eb224151..1b02c495bc477 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java @@ -34,11 +34,11 @@ import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectInputStream; -import com.twitter.util.FuturePool; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.common.BlockCache; import org.apache.hadoop.fs.common.BlockData; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.fs.common.SingleFilePerBlockCache; import org.apache.hadoop.fs.common.Validate; import org.apache.hadoop.fs.s3a.Invoker; @@ -109,7 +109,7 @@ public static S3ObjectAttributes createObjectAttributes( } public static S3AReadOpContext createReadContext( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, String key, int fileSize, int prefetchBlockSize, @@ -195,7 +195,7 @@ public void close() { public static S3InputStream createInputStream( Class clazz, - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, String bucket, String key, int fileSize, @@ -225,7 +225,7 @@ public static S3InputStream createInputStream( } public static TestS3InMemoryInputStream createS3InMemoryInputStream( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, String bucket, String key, int fileSize) { @@ -235,7 +235,7 @@ public static TestS3InMemoryInputStream createS3InMemoryInputStream( } public static TestS3CachingInputStream createS3CachingInputStream( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, String bucket, String key, int fileSize, @@ -322,7 +322,7 @@ private static void randomDelay(int delay) { public static class TestS3CachingBlockManager extends S3CachingBlockManager { public TestS3CachingBlockManager( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, S3Reader reader, BlockData blockData, int bufferPoolSize) { @@ -359,7 +359,7 @@ protected S3File getS3File() { @Override protected S3CachingBlockManager createBlockManager( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, S3Reader reader, BlockData blockData, int bufferPoolSize) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java index 99836793decba..3f84e2e028339 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java @@ -24,13 +24,12 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import com.twitter.util.ExecutorServiceFuturePool; -import com.twitter.util.FuturePool; import org.junit.Test; import org.apache.hadoop.fs.common.BlockData; import org.apache.hadoop.fs.common.BufferData; import org.apache.hadoop.fs.common.ExceptionAsserts; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.test.AbstractHadoopTestBase; import static org.junit.Assert.assertEquals; @@ -41,7 +40,7 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase { static final int POOL_SIZE = 3; private final ExecutorService threadPool = Executors.newFixedThreadPool(4); - private final FuturePool futurePool = new ExecutorServiceFuturePool(threadPool); + private final ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(threadPool); private final BlockData blockData = new BlockData(FILE_SIZE, BLOCK_SIZE); @@ -106,7 +105,7 @@ public void testArgChecks() throws Exception { */ static class TestBlockManager extends S3CachingBlockManager { TestBlockManager( - FuturePool futurePool, + ExecutorServiceFuturePool futurePool, S3Reader reader, BlockData blockData, int bufferPoolSize) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3File.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3File.java index 2f555d2b62c47..9f63ea0a889fd 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3File.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3File.java @@ -22,11 +22,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import com.twitter.util.ExecutorServiceFuturePool; -import com.twitter.util.FuturePool; import org.junit.Test; import org.apache.hadoop.fs.common.ExceptionAsserts; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; @@ -36,7 +35,7 @@ public class TestS3File extends AbstractHadoopTestBase { private final ExecutorService threadPool = Executors.newFixedThreadPool(1); - private final FuturePool futurePool = new ExecutorServiceFuturePool(threadPool); + private final ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(threadPool); private final S3AInputStream.InputStreamCallbacks client = MockS3File.createClient("bucket"); @Test diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java index 503cd699002c7..010bc1c30b634 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java @@ -24,12 +24,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import com.twitter.util.ExecutorServiceFuturePool; -import com.twitter.util.FuturePool; import org.junit.Test; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.common.ExceptionAsserts; +import org.apache.hadoop.fs.common.ExecutorServiceFuturePool; import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AReadOpContext; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; @@ -45,7 +44,7 @@ public class TestS3InputStream extends AbstractHadoopTestBase { private static final int FILE_SIZE = 10; private final ExecutorService threadPool = Executors.newFixedThreadPool(4); - private final FuturePool futurePool = new ExecutorServiceFuturePool(threadPool); + private final ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(threadPool); private final S3AInputStream.InputStreamCallbacks client = MockS3File.createClient("bucket"); @Test