Skip to content

Commit

Permalink
Merge branch 'apache:trunk' into YARN-7707-V2
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 authored Jul 17, 2023
2 parents 24447a3 + c44823d commit 0d8944f
Show file tree
Hide file tree
Showing 40 changed files with 968 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,15 @@ public class CommonConfigurationKeysPublic {
public static final String FS_TRASH_INTERVAL_KEY = "fs.trash.interval";
/** Default value for FS_TRASH_INTERVAL_KEY */
public static final long FS_TRASH_INTERVAL_DEFAULT = 0;
/**
* @see
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
* core-default.xml</a>
*/
public static final String FS_TRASH_CLEAN_TRASHROOT_ENABLE_KEY =
"fs.trash.clean.trashroot.enable";
/** Default value for FS_TRASH_CLEAN_TRASHROOT_ENABLE_KEY. */
public static final boolean FS_TRASH_CLEAN_TRASHROOT_ENABLE_DEFAULT = false;
/**
* @see
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CLEAN_TRASHROOT_ENABLE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CLEAN_TRASHROOT_ENABLE_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;

Expand Down Expand Up @@ -70,6 +72,8 @@ public class TrashPolicyDefault extends TrashPolicy {

private long emptierInterval;

private boolean cleanNonCheckpointUnderTrashRoot;

public TrashPolicyDefault() { }

private TrashPolicyDefault(FileSystem fs, Configuration conf)
Expand All @@ -90,6 +94,8 @@ public void initialize(Configuration conf, FileSystem fs, Path home) {
this.emptierInterval = (long)(conf.getFloat(
FS_TRASH_CHECKPOINT_INTERVAL_KEY, FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT)
* MSECS_PER_MINUTE);
this.cleanNonCheckpointUnderTrashRoot = conf.getBoolean(
FS_TRASH_CLEAN_TRASHROOT_ENABLE_KEY, FS_TRASH_CLEAN_TRASHROOT_ENABLE_DEFAULT);
}

@Override
Expand All @@ -101,6 +107,8 @@ public void initialize(Configuration conf, FileSystem fs) {
this.emptierInterval = (long)(conf.getFloat(
FS_TRASH_CHECKPOINT_INTERVAL_KEY, FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT)
* MSECS_PER_MINUTE);
this.cleanNonCheckpointUnderTrashRoot = conf.getBoolean(
FS_TRASH_CLEAN_TRASHROOT_ENABLE_KEY, FS_TRASH_CLEAN_TRASHROOT_ENABLE_DEFAULT);
if (deletionInterval < 0) {
LOG.warn("Invalid value {} for deletion interval,"
+ " deletion interaval can not be negative."
Expand Down Expand Up @@ -374,8 +382,14 @@ private void deleteCheckpoint(Path trashRoot, boolean deleteImmediately)
try {
time = getTimeFromCheckpoint(name);
} catch (ParseException e) {
LOG.warn("Unexpected item in trash: "+dir+". Ignoring.");
continue;
if (cleanNonCheckpointUnderTrashRoot) {
fs.delete(path, true);
LOG.warn("Unexpected item in trash: " + dir + ". Deleting.");
continue;
} else {
LOG.warn("Unexpected item in trash: " + dir + ". Ignoring.");
continue;
}
}

if (((now - deletionInterval) > time) || deleteImmediately) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public abstract class CachingBlockManager extends BlockManager {
* @param prefetchingStatistics statistics for this stream.
* @param conf the configuration.
* @param localDirAllocator the local dir allocator instance.
* @param maxBlocksCount max blocks count to be kept in cache at any time.
* @throws IllegalArgumentException if bufferPoolSize is zero or negative.
*/
public CachingBlockManager(
Expand All @@ -118,7 +119,8 @@ public CachingBlockManager(
int bufferPoolSize,
PrefetchingStatistics prefetchingStatistics,
Configuration conf,
LocalDirAllocator localDirAllocator) {
LocalDirAllocator localDirAllocator,
int maxBlocksCount) {
super(blockData);

Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize");
Expand All @@ -129,16 +131,16 @@ public CachingBlockManager(
this.numReadErrors = new AtomicInteger();
this.cachingDisabled = new AtomicBoolean();
this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
this.conf = requireNonNull(conf);

if (this.getBlockData().getFileSize() > 0) {
this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize(),
this.prefetchingStatistics);
this.cache = this.createCache();
this.cache = this.createCache(maxBlocksCount);
}

this.ops = new BlockOperations();
this.ops.setDebug(false);
this.conf = requireNonNull(conf);
this.localDirAllocator = localDirAllocator;
}

Expand Down Expand Up @@ -557,8 +559,8 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
}
}

protected BlockCache createCache() {
return new SingleFilePerBlockCache(prefetchingStatistics);
protected BlockCache createCache(int maxBlocksCount) {
return new SingleFilePerBlockCache(prefetchingStatistics, maxBlocksCount);
}

protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hadoop.fs.impl.prefetch;

import java.util.concurrent.TimeUnit;

/**
* Constants used by prefetch implementations.
*/
public final class PrefetchConstants {

private PrefetchConstants() {
}

/**
* Timeout to be used by close, while acquiring prefetch block write lock.
* Value = {@value PREFETCH_WRITE_LOCK_TIMEOUT}
*/
static final int PREFETCH_WRITE_LOCK_TIMEOUT = 5;

/**
* Lock timeout unit to be used by the thread while acquiring prefetch block write lock.
* Value = {@value PREFETCH_WRITE_LOCK_TIMEOUT_UNIT}
*/
static final TimeUnit PREFETCH_WRITE_LOCK_TIMEOUT_UNIT = TimeUnit.SECONDS;

}
Loading

0 comments on commit 0d8944f

Please sign in to comment.