Skip to content

Commit

Permalink
Merge branch 'apache:trunk' into YARN-11614
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 authored Nov 25, 2023
2 parents 21a77e9 + 476b90f commit b4dce47
Show file tree
Hide file tree
Showing 10 changed files with 464 additions and 164 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hadoop.fs.impl.prefetch;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;

/**
* This class is used to provide parameters to {@link BlockManager}.
*/
@InterfaceAudience.Private
public final class BlockManagerParameters {

/**
* Asynchronous tasks are performed in this pool.
*/
private ExecutorServiceFuturePool futurePool;

/**
* Information about each block of the underlying file.
*/
private BlockData blockData;

/**
* Size of the in-memory cache in terms of number of blocks.
*/
private int bufferPoolSize;

/**
* Statistics for the stream.
*/
private PrefetchingStatistics prefetchingStatistics;

/**
* The configuration object.
*/
private Configuration conf;

/**
* The local dir allocator instance.
*/
private LocalDirAllocator localDirAllocator;

/**
* Max blocks count to be kept in cache at any time.
*/
private int maxBlocksCount;

/**
* Tracker with statistics to update.
*/
private DurationTrackerFactory trackerFactory;

/**
* @return The Executor future pool to perform async prefetch tasks.
*/
public ExecutorServiceFuturePool getFuturePool() {
return futurePool;
}

/**
* @return The object holding blocks data info for the underlying file.
*/
public BlockData getBlockData() {
return blockData;
}

/**
* @return The size of the in-memory cache.
*/
public int getBufferPoolSize() {
return bufferPoolSize;
}

/**
* @return The prefetching statistics for the stream.
*/
public PrefetchingStatistics getPrefetchingStatistics() {
return prefetchingStatistics;
}

/**
* @return The configuration object.
*/
public Configuration getConf() {
return conf;
}

/**
* @return The local dir allocator instance.
*/
public LocalDirAllocator getLocalDirAllocator() {
return localDirAllocator;
}

/**
* @return The max blocks count to be kept in cache at any time.
*/
public int getMaxBlocksCount() {
return maxBlocksCount;
}

/**
* @return The duration tracker with statistics to update.
*/
public DurationTrackerFactory getTrackerFactory() {
return trackerFactory;
}

/**
* Sets the executor service future pool that is later used to perform
* async prefetch tasks.
*
* @param pool The future pool.
* @return The builder.
*/
public BlockManagerParameters withFuturePool(
final ExecutorServiceFuturePool pool) {
this.futurePool = pool;
return this;
}

/**
* Sets the object holding blocks data info for the underlying file.
*
* @param data The block data object.
* @return The builder.
*/
public BlockManagerParameters withBlockData(
final BlockData data) {
this.blockData = data;
return this;
}

/**
* Sets the in-memory cache size as number of blocks.
*
* @param poolSize The buffer pool size as number of blocks.
* @return The builder.
*/
public BlockManagerParameters withBufferPoolSize(
final int poolSize) {
this.bufferPoolSize = poolSize;
return this;
}

/**
* Sets the prefetching statistics for the stream.
*
* @param statistics The prefetching statistics.
* @return The builder.
*/
public BlockManagerParameters withPrefetchingStatistics(
final PrefetchingStatistics statistics) {
this.prefetchingStatistics = statistics;
return this;
}

/**
* Sets the configuration object.
*
* @param configuration The configuration object.
* @return The builder.
*/
public BlockManagerParameters withConf(
final Configuration configuration) {
this.conf = configuration;
return this;
}

/**
* Sets the local dir allocator for round-robin disk allocation
* while creating files.
*
* @param dirAllocator The local dir allocator object.
* @return The builder.
*/
public BlockManagerParameters withLocalDirAllocator(
final LocalDirAllocator dirAllocator) {
this.localDirAllocator = dirAllocator;
return this;
}

/**
* Sets the max blocks count to be kept in cache at any time.
*
* @param blocksCount The max blocks count.
* @return The builder.
*/
public BlockManagerParameters withMaxBlocksCount(
final int blocksCount) {
this.maxBlocksCount = blocksCount;
return this;
}

/**
* Sets the duration tracker with statistics to update.
*
* @param factory The tracker factory object.
* @return The builder.
*/
public BlockManagerParameters withTrackerFactory(
final DurationTrackerFactory factory) {
this.trackerFactory = factory;
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import javax.annotation.Nonnull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -105,47 +107,33 @@ public abstract class CachingBlockManager extends BlockManager {
/**
* Constructs an instance of a {@code CachingBlockManager}.
*
* @param futurePool asynchronous tasks are performed in this pool.
* @param blockData information about each block of the underlying file.
* @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
* @param prefetchingStatistics statistics for this stream.
* @param conf the configuration.
* @param localDirAllocator the local dir allocator instance.
* @param maxBlocksCount max blocks count to be kept in cache at any time.
* @param trackerFactory tracker with statistics to update.
* @param blockManagerParameters params for block manager.
* @throws IllegalArgumentException if bufferPoolSize is zero or negative.
*/
@SuppressWarnings("checkstyle:parameternumber")
public CachingBlockManager(
ExecutorServiceFuturePool futurePool,
BlockData blockData,
int bufferPoolSize,
PrefetchingStatistics prefetchingStatistics,
Configuration conf,
LocalDirAllocator localDirAllocator,
int maxBlocksCount,
DurationTrackerFactory trackerFactory) {
super(blockData);

Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize");

this.futurePool = requireNonNull(futurePool);
this.bufferPoolSize = bufferPoolSize;
public CachingBlockManager(@Nonnull final BlockManagerParameters blockManagerParameters) {
super(blockManagerParameters.getBlockData());

Validate.checkPositiveInteger(blockManagerParameters.getBufferPoolSize(), "bufferPoolSize");

this.futurePool = requireNonNull(blockManagerParameters.getFuturePool());
this.bufferPoolSize = blockManagerParameters.getBufferPoolSize();
this.numCachingErrors = new AtomicInteger();
this.numReadErrors = new AtomicInteger();
this.cachingDisabled = new AtomicBoolean();
this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
this.conf = requireNonNull(conf);
this.prefetchingStatistics = requireNonNull(
blockManagerParameters.getPrefetchingStatistics());
this.conf = requireNonNull(blockManagerParameters.getConf());

if (this.getBlockData().getFileSize() > 0) {
this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize(),
this.prefetchingStatistics);
this.cache = this.createCache(maxBlocksCount, trackerFactory);
this.cache = this.createCache(blockManagerParameters.getMaxBlocksCount(),
blockManagerParameters.getTrackerFactory());
}

this.ops = new BlockOperations();
this.ops.setDebug(false);
this.localDirAllocator = localDirAllocator;
this.localDirAllocator = blockManagerParameters.getLocalDirAllocator();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,17 @@
import java.io.IOException;
import java.nio.ByteBuffer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.impl.prefetch.BlockData;
import org.apache.hadoop.fs.impl.prefetch.BlockManagerParameters;
import org.apache.hadoop.fs.impl.prefetch.CachingBlockManager;
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.apache.hadoop.fs.impl.prefetch.Validate;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;

import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT;

/**
* Provides access to S3 file one block at a time.
*/
public class S3ACachingBlockManager extends CachingBlockManager {

private static final Logger LOG = LoggerFactory.getLogger(
S3ACachingBlockManager.class);

/**
* Reader that reads from S3 file.
*/
Expand All @@ -52,32 +41,15 @@ public class S3ACachingBlockManager extends CachingBlockManager {
/**
* Constructs an instance of a {@code S3ACachingBlockManager}.
*
* @param futurePool asynchronous tasks are performed in this pool.
* @param blockManagerParameters params for block manager.
* @param reader reader that reads from S3 file.
* @param blockData information about each block of the S3 file.
* @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
* @param streamStatistics statistics for this stream.
* @param conf the configuration.
* @param localDirAllocator the local dir allocator instance.
* @throws IllegalArgumentException if reader is null.
*/
public S3ACachingBlockManager(
ExecutorServiceFuturePool futurePool,
S3ARemoteObjectReader reader,
BlockData blockData,
int bufferPoolSize,
S3AInputStreamStatistics streamStatistics,
Configuration conf,
LocalDirAllocator localDirAllocator) {
@Nonnull final BlockManagerParameters blockManagerParameters,
final S3ARemoteObjectReader reader) {

super(futurePool,
blockData,
bufferPoolSize,
streamStatistics,
conf,
localDirAllocator,
conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT),
streamStatistics);
super(blockManagerParameters);

Validate.checkNotNull(reader, "reader");

Expand Down
Loading

0 comments on commit b4dce47

Please sign in to comment.