Skip to content

Commit

Permalink
HADOOP-18184. yetus and code reviews.
Browse files Browse the repository at this point in the history
 ITestS3APrefetchingCacheFiles.testCacheFileExistence:
 135->assertCacheFileExists:151
  [No cache files found under
   /var/folders/4n/w4cjr_d95kg9bxkl6sz3n3ym0000gr/
    T/ITestS3APrefetchingCacheFiles2189128656118567478]

Change-Id: I009082bb5bee66f0b224164a1806f4d6c3c54020
  • Loading branch information
steveloughran committed Aug 3, 2023
1 parent c0e4f1c commit ec5ae7a
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public void close() {
public enum CancelReason {
/** Stream has switched to random IO. */
RandomIO,
/** Stream closed completely */
/** Stream closed completely. */
Close,
/** Stream unbuffered. */
Unbuffer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
Expand All @@ -42,10 +43,11 @@

import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNegative;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

/**
* Provides read access to the underlying file one block at a time.
* Improve read performance by prefetching and locall caching blocks.
* Improve read performance by prefetching and locally caching blocks.
*/
public abstract class CachingBlockManager extends BlockManager {
private static final Logger LOG = LoggerFactory.getLogger(CachingBlockManager.class);
Expand Down Expand Up @@ -432,7 +434,7 @@ private boolean isClosed() {

/**
* Disable caching; updates stream statistics and logs exactly once
* at info
* at info.
* @param endOp operation which measured the duration of the write.
*/
private void disableCaching(final BlockOperations.End endOp) {
Expand Down Expand Up @@ -566,21 +568,23 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
LOG.debug("Block {}: awaiting any read to complete", blockNumber);

try {
blockFuture.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
// wait for data; state of caching may change during this period.
awaitFuture(blockFuture, TIMEOUT_MINUTES, TimeUnit.MINUTES);
if (data.stateEqualsOneOf(BufferData.State.DONE)) {
// There was an error during prefetch.
LOG.debug("Block {}: prefetch failure", blockNumber);
return;
}
} catch (Exception e) {
LOG.info("error waiting on blockFuture: {}. {}", data, e.getMessage());
LOG.debug("error waiting on blockFuture: {}", data, e);
} catch (IOException | TimeoutException e) {
LOG.info("Error fetching block: {}. {}", data, e.toString());
LOG.debug("Error fetching block: {}", data, e);
data.setDone();
return;
}

if (isCachingDisabled()) {
LOG.debug("Block {}: Preparing caching disabled while reading data", blockNumber);
// caching was disabled while waiting fro the read to complete.
LOG.debug("Block {}: caching disabled while reading data", blockNumber);
data.setDone();
return;
}
Expand All @@ -590,8 +594,7 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
synchronized (data) {
try {
if (data.stateEqualsOneOf(BufferData.State.DONE)) {
LOG.debug("Block {}: Block already in cache; not adding", blockNumber);

LOG.debug("Block {}: block no longer in use; not adding", blockNumber);
return;
}

Expand All @@ -606,10 +609,10 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
buffer.rewind();
cachePut(blockNumber, buffer);
data.setDone();
} catch (Exception e) {
} catch (IOException e) {
numCachingErrors.incrementAndGet();
LOG.info("error adding block to cache after wait: {}. {}", data, e.getMessage());
LOG.debug("error adding block to cache after wait: {}", data, e);
LOG.info("error adding block to cache: {}. {}", data, e.getMessage());
LOG.debug("error adding block to cache: {}", data, e);
data.setDone();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package org.apache.hadoop.test;

/**
* Sizes of data in KiB/MiB
* Sizes of data in KiB/MiB.
*/
public class Sizes {
public final class Sizes {

public static final int S_256 = 256;
public static final int S_512 = 512;
Expand All @@ -48,4 +48,6 @@ public class Sizes {
public static final int S_64M = 64 * S_1M;
public static final double NANOSEC = 1.0e9;

private Sizes() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,18 @@ public String toString() {
return sb.toString();
}

/**
* Construct an instance of a {@code S3ACachingBlockManager}.
*
* @param futurePool asynchronous tasks are performed in this pool.
* @param reader reader that reads from S3 file.
* @param blockData information about each block of the S3 file.
* @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
* @param streamStatistics statistics for this stream.
* @param conf the configuration.
* @param localDirAllocator the local dir allocator instance.
* @throws IllegalArgumentException if reader is null.
*/
protected BlockManager createBlockManager(
ExecutorServiceFuturePool futurePool,
S3ARemoteObjectReader reader,
Expand Down

0 comments on commit ec5ae7a

Please sign in to comment.