
HADOOP-18180. Replace use of twitter util-core with java futures #4115

6 changes: 0 additions & 6 deletions hadoop-tools/hadoop-aws/pom.xml
@@ -435,12 +435,6 @@
<artifactId>aws-java-sdk-bundle</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>util-core_2.11</artifactId>
<version>21.2.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
@@ -22,10 +22,9 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.zip.CRC32;

import com.twitter.util.Awaitable.CanAwait;
import com.twitter.util.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -263,8 +262,6 @@ public boolean stateEqualsOneOf(State... states) {
return false;
}

private static final CanAwait CAN_AWAIT = () -> false;

public String toString() {

return String.format(
@@ -281,7 +278,7 @@ private String getFutureStr(Future<Void> f) {
if (f == null) {
return "--";
} else {
return this.action.isReady(CAN_AWAIT) ? "done" : "not done";
return this.action.isDone() ? "done" : "not done";
}
}
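
The migration replaces Twitter's non-blocking readiness probe with `Future.isDone()`. A minimal, self-contained sketch of the new check; everything outside `getFutureStr` is illustrative scaffolding, and note that `isDone()` also reports true for cancelled or failed futures:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class FutureStatusSketch {
    // Non-blocking status probe, mirroring getFutureStr() in the hunk above.
    static String getFutureStr(Future<Void> f) {
        if (f == null) {
            return "--";
        }
        return f.isDone() ? "done" : "not done";
    }

    public static void main(String[] args) {
        Future<Void> pending = new CompletableFuture<>();   // never completed
        Future<Void> done = CompletableFuture.completedFuture(null);
        System.out.println(getFutureStr(pending));  // not done
        System.out.println(getFutureStr(done));     // done
    }
}
```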

@@ -26,9 +26,8 @@
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;

import com.twitter.util.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -233,7 +232,7 @@ public synchronized void close() {
for (BufferData data : this.getAll()) {
Future<Void> actionFuture = data.getActionFuture();
if (actionFuture != null) {
actionFuture.raise(new CancellationException("BufferPool is closing."));
actionFuture.cancel(true);
}
}
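
`cancel(true)` has different semantics from Twitter's `Future.raise`: it transitions the future to the cancelled state and, for tasks submitted to an `ExecutorService`, interrupts the worker thread. A small sketch of that behavior, with illustrative names and timings:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CancelSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<?> f = pool.submit(() -> {
            try {
                Thread.sleep(60_000); // stand-in for a long-running prefetch
            } catch (InterruptedException e) {
                // cancel(true) interrupts the thread running the task
                System.out.println("task interrupted");
            }
        });
        TimeUnit.MILLISECONDS.sleep(100); // let the task start
        f.cancel(true);                   // as in BufferPool.close() above
        System.out.println("cancelled: " + f.isCancelled()); // cancelled: true
        pool.shutdown();
    }
}
```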

@@ -21,13 +21,13 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import com.twitter.util.Await;
import com.twitter.util.ExceptionalFunction0;
import com.twitter.util.Future;
import com.twitter.util.FuturePool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -37,9 +37,10 @@
*/
public abstract class CachingBlockManager extends BlockManager {
private static final Logger LOG = LoggerFactory.getLogger(CachingBlockManager.class);
private static final int TIMEOUT_MINUTES = 60;

// Asynchronous tasks are performed in this pool.
private final FuturePool futurePool;
private final ExecutorServiceFuturePool futurePool;

// Pool of shared ByteBuffer instances.
private BufferPool bufferPool;
@@ -78,7 +79,7 @@ public abstract class CachingBlockManager extends BlockManager {
* @throws IllegalArgumentException if bufferPoolSize is zero or negative.
*/
public CachingBlockManager(
FuturePool futurePool,
ExecutorServiceFuturePool futurePool,
BlockData blockData,
int bufferPoolSize) {
super(blockData);
@@ -247,7 +248,7 @@ public void requestPrefetch(int blockNumber) {

BlockOperations.Operation op = this.ops.requestPrefetch(blockNumber);
PrefetchTask prefetchTask = new PrefetchTask(data, this);
Future<Void> prefetchFuture = this.futurePool.apply(prefetchTask);
Future<Void> prefetchFuture = this.futurePool.executeFunction(prefetchTask);
data.setPrefetch(prefetchFuture);
this.ops.end(op);
}
@@ -344,7 +345,7 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
/**
* Read task that is submitted to the future pool.
*/
private static class PrefetchTask extends ExceptionalFunction0<Void> {
private static class PrefetchTask implements Supplier<Void> {
private final BufferData data;
private final CachingBlockManager blockManager;

@@ -354,7 +355,7 @@ private static class PrefetchTask extends ExceptionalFunction0<Void> {
}

@Override
public Void applyE() {
public Void get() {
Contributor review comment:

we need to think what we want to do here.

  1. we would want errors to be counted and included in stream stats
  2. but we've seen abfs apps flooded with stack traces when a transient network error breaks every prefetch
  • it's good to be aware of transient errors, but not to worry too much until the final read.

created https://issues.apache.org/jira/browse/HADOOP-18185

try {
this.blockManager.prefetch(data);
} catch (Exception e) {
@@ -412,11 +413,13 @@ public void requestCaching(BufferData data) {
if (state == BufferData.State.PREFETCHING) {
blockFuture = data.getActionFuture();
} else {
blockFuture = Future.value(null);
CompletableFuture<Void> cf = new CompletableFuture<>();
cf.complete(null);
blockFuture = cf;
}

CachePutTask task = new CachePutTask(data, blockFuture, this);
Future<Void> actionFuture = this.futurePool.apply(task);
Future<Void> actionFuture = this.futurePool.executeFunction(task);
data.setCaching(actionFuture);
this.ops.end(op);
}
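
The already-completed future above can equivalently be produced with the `completedFuture` factory method; a minimal sketch of both forms:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

class CompletedFutures {
    // As written in the hunk above: create, then complete.
    static Future<Void> explicit() {
        CompletableFuture<Void> cf = new CompletableFuture<>();
        cf.complete(null);
        return cf;
    }

    // Equivalent one-liner replacing Twitter's Future.value(null).
    static Future<Void> factory() {
        return CompletableFuture.completedFuture(null);
    }
}
```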
@@ -433,14 +436,13 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture) {
}

try {
Await.result(blockFuture);
blockFuture.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
if (data.stateEqualsOneOf(BufferData.State.DONE)) {
// There was an error during prefetch.
return;
}
} catch (Exception e) {
String message = String.format("error waitng on blockFuture: %s", data);
LOG.error(message, e);
LOG.error("error waiting on blockFuture: {}", data, e);
data.setDone();
return;
}
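
Unlike Twitter's `Await.result`, `Future.get` wraps task failures in `ExecutionException` and, with the timeout overload used above, can also throw `TimeoutException`; all of these land in the catch block. A sketch of the waiting pattern under those assumptions, with an illustrative helper name:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureWaitSketch {
    private static final int TIMEOUT_MINUTES = 60;

    // Returns true if the future completed normally within the timeout.
    static boolean await(Future<Void> blockFuture) {
        try {
            blockFuture.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false; // task failed or timed out
        }
    }
}
```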
@@ -500,7 +502,7 @@ protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException {
this.cache.put(blockNumber, buffer);
}

private static class CachePutTask extends ExceptionalFunction0<Void> {
private static class CachePutTask implements Supplier<Void> {
private final BufferData data;

// Block being asynchronously fetched.
@@ -519,7 +521,7 @@ private static class CachePutTask extends ExceptionalFunction0<Void> {
}

@Override
public Void applyE() {
public Void get() {
this.blockManager.addToCacheAndRelease(this.data, this.blockFuture);
return null;
}
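
A side effect of moving from `ExceptionalFunction0` to `Supplier` is that `Supplier.get()` cannot declare checked exceptions, so checked failures must either be caught inside the task (as `PrefetchTask` does) or wrapped in an unchecked exception. A sketch of the wrapping idiom, with hypothetical names:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

class CheckedSuppliers {
    // Hypothetical adapter: work that throws IOException, exposed as Supplier<Void>.
    @FunctionalInterface
    interface IORunnable {
        void run() throws IOException;
    }

    static Supplier<Void> wrapping(IORunnable work) {
        return () -> {
            try {
                work.run();
                return null;
            } catch (IOException e) {
                // Supplier.get() cannot throw checked exceptions
                throw new UncheckedIOException(e);
            }
        };
    }
}
```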
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hadoop.fs.common;

import java.util.Locale;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Supplier;

/**
* A FuturePool implementation backed by a java.util.concurrent.ExecutorService.
*
* If a piece of work has started, it cannot (currently) be cancelled.
*
* This class is a simplified version of the <code>com.twitter:util-core_2.11</code>
* ExecutorServiceFuturePool, written to avoid depending on that Scala library. One problem with
* depending on a Scala library is that many downstream projects (e.g. Apache Spark) also use
* Scala, and they may want a different Scala version from the one Hadoop chooses.
*
*/
public class ExecutorServiceFuturePool {
private final ExecutorService executor;

public ExecutorServiceFuturePool(ExecutorService executor) {
Contributor review comment:

interesting question about what service to support/integrate with here, especially for fairness in processes with many readers.

created https://issues.apache.org/jira/browse/HADOOP-18186 as a follow-up to this.

this.executor = executor;
}

/**
* @param f function to run in future on executor pool
* @return future
* @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for execution
* @throws NullPointerException if f param is null
*/
public Future<Void> executeFunction(final Supplier<Void> f) {
return executor.submit(f::get);
}

/**
* @param r runnable to run in future on executor pool
* @return future
* @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for execution
* @throws NullPointerException if r param is null
*/
@SuppressWarnings("unchecked")
public Future<Void> executeRunnable(final Runnable r) {
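// Unchecked cast: submit(Runnable) returns Future<?>, but this Future's result is always null.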
return (Future<Void>) executor.submit(() -> r.run());
}

public String toString() {
return String.format(Locale.ROOT, "ExecutorServiceFuturePool(executor=%s)", executor);
}
}
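
A short usage sketch for the new pool; the executor size and the task body are illustrative:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;

public class FuturePoolExample {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4); // size illustrative
        ExecutorServiceFuturePool pool = new ExecutorServiceFuturePool(executor);

        // Supplier<Void> tasks mirror PrefetchTask/CachePutTask in CachingBlockManager.
        Future<Void> f = pool.executeFunction(() -> {
            System.out.println("prefetching a block...");
            return null;
        });

        f.get(); // block until the task completes
        executor.shutdown();
    }
}
```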
@@ -76,8 +76,7 @@
import com.amazonaws.services.s3.transfer.model.UploadResult;
import com.amazonaws.event.ProgressListener;

import com.twitter.util.ExecutorServiceFuturePool;
import com.twitter.util.FuturePool;
import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -283,7 +282,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private ThreadPoolExecutor unboundedThreadPool;

// S3 reads are prefetched asynchronously using this future pool.
private FuturePool futurePool;
private ExecutorServiceFuturePool futurePool;

// If true, the prefetching input stream is used for reads.
private boolean prefetchEnabled;
@@ -18,11 +18,10 @@

package org.apache.hadoop.fs.s3a;

import com.twitter.util.FuturePool;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.store.audit.AuditSpan;
@@ -61,7 +60,7 @@ public class S3AReadOpContext extends S3AOpContext {
private final AuditSpan auditSpan;

// S3 reads are prefetched asynchronously using this future pool.
private FuturePool futurePool;
private ExecutorServiceFuturePool futurePool;

// Size in bytes of a single prefetch block.
private final int prefetchBlockSize;
@@ -80,7 +79,7 @@ public class S3AReadOpContext extends S3AOpContext {
* @param changeDetectionPolicy change detection policy.
* @param readahead readahead for GET operations/skip, etc.
* @param auditSpan active audit
* @param futurePool the FuturePool instance used by async prefetches.
* @param futurePool the ExecutorServiceFuturePool instance used by async prefetches.
* @param prefetchBlockSize the size (in number of bytes) of each prefetched block.
* @param prefetchBlockCount maximum number of prefetched blocks.
*/
@@ -94,7 +93,7 @@ public S3AReadOpContext(
ChangeDetectionPolicy changeDetectionPolicy,
final long readahead,
final AuditSpan auditSpan,
FuturePool futurePool,
ExecutorServiceFuturePool futurePool,
int prefetchBlockSize,
int prefetchBlockCount) {

@@ -161,11 +160,11 @@ public AuditSpan getAuditSpan() {
}

/**
* Gets the {@code FuturePool} used for asynchronous prefetches.
* Gets the {@code ExecutorServiceFuturePool} used for asynchronous prefetches.
*
* @return the {@code FuturePool} used for asynchronous prefetches.
* @return the {@code ExecutorServiceFuturePool} used for asynchronous prefetches.
*/
public FuturePool getFuturePool() {
public ExecutorServiceFuturePool getFuturePool() {
return this.futurePool;
}

@@ -22,7 +22,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;

import com.twitter.util.FuturePool;
import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -52,7 +52,7 @@ public class S3CachingBlockManager extends CachingBlockManager {
* @throws IllegalArgumentException if reader is null.
*/
public S3CachingBlockManager(
FuturePool futurePool,
ExecutorServiceFuturePool futurePool,
S3Reader reader,
BlockData blockData,
int bufferPoolSize) {
@@ -21,13 +21,13 @@

import java.io.IOException;

import com.twitter.util.FuturePool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.common.BlockData;
import org.apache.hadoop.fs.common.BlockManager;
import org.apache.hadoop.fs.common.BufferData;
import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
@@ -186,7 +186,7 @@ public String toString() {
}

protected BlockManager createBlockManager(
FuturePool futurePool,
ExecutorServiceFuturePool futurePool,
S3Reader reader,
BlockData blockData,
int bufferPoolSize) {