HBASE-27313: Persist list of HFile names for which prefetch is done
Kota-SH committed Aug 30, 2022
1 parent b44bfc5 commit 3500f03
Showing 8 changed files with 518 additions and 2 deletions.
hbase-protocol-shaded/src/main/protobuf/PrefetchPersistence.proto (31 additions, 0 deletions)
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto2";

package hbase.pb;

option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
option java_outer_classname = "PersistentPrefetchProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;


message PrefetchedHfileName {
map<string, bool> prefetched_files = 1;
}
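
For context, here is a minimal sketch of how the generated map-backed message round-trips with length-delimited framing, the same framing PrefetchExecutor uses below (the class name and file path are illustrative, not part of this commit):

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos;

public class PrefetchProtoRoundTrip {
  public static void main(String[] args) throws IOException {
    // Record two prefetched HFiles (file names are made up).
    PersistentPrefetchProtos.PrefetchedHfileName out =
      PersistentPrefetchProtos.PrefetchedHfileName.newBuilder()
        .putPrefetchedFiles("hfile-aaaa", true)
        .putPrefetchedFiles("hfile-bbbb", true)
        .build();
    // Write one length-delimited message, then read it back.
    try (FileOutputStream fos = new FileOutputStream("/tmp/prefetch.list")) {
      out.writeDelimitedTo(fos);
    }
    try (FileInputStream fis = new FileInputStream("/tmp/prefetch.list")) {
      PersistentPrefetchProtos.PrefetchedHfileName in =
        PersistentPrefetchProtos.PrefetchedHfileName.parseDelimitedFrom(fis);
      System.out.println(in.getPrefetchedFilesMap()); // {hfile-aaaa=true, hfile-bbbb=true}
    }
  }
}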
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java
@@ -29,7 +29,7 @@ public class BlockCacheKey implements HeapSize, java.io.Serializable {
private static final long serialVersionUID = -5199992013113130534L;
private final String hfileName;
private final long offset;
private BlockType blockType; // was: private final BlockType blockType;
private final boolean isPrimaryReplicaBlock;

/**
@@ -98,4 +98,8 @@ public long getOffset() {
public BlockType getBlockType() {
return blockType;
}

public void setBlockType(BlockType blockType) {
this.blockType = blockType;
}
}
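
Relaxing final on blockType lets callers backfill the type on keys created without one. A sketch of the intended pattern, assuming the existing four-argument constructor (names are illustrative); the same check appears in BucketCache later in this commit:

import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.Cacheable;

final class BlockTypeBackfill {
  // Backfill the type on a key that was created before its block was read.
  static void backfill(BlockCacheKey key, Cacheable block) {
    if (key.getBlockType() == null && block.getBlockType() != null) {
      key.setBlockType(block.getBlockType());
    }
  }
}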
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -93,6 +93,8 @@ public class CacheConfig {
public static final String DROP_BEHIND_CACHE_COMPACTION_KEY =
"hbase.hfile.drop.behind.compaction";

public static final String PREFETCH_PERSISTENCE_PATH_KEY = "hbase.prefetch.file-list.path";

// Defaults
public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
public static final boolean DEFAULT_CACHE_DATA_ON_WRITE = false;
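
The new key only matters alongside the existing persistent bucket cache settings; a hedged configuration sketch (all paths are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;

final class PrefetchPersistenceConfig {
  static Configuration configure() {
    Configuration conf = HBaseConfiguration.create();
    // Existing keys: a file-backed bucket cache that persists its index.
    conf.set("hbase.bucketcache.ioengine", "file:/mnt/cache/bucket.data");
    conf.set("hbase.bucketcache.persistent.path", "/mnt/cache/bucket.meta");
    // New in this commit: where to persist the prefetched-file list.
    conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, "/mnt/cache/prefetch.list");
    return conf;
  }
}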
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
@@ -17,6 +17,11 @@
*/
package org.apache.hadoop.hbase.io.hfile;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Future;
@@ -37,19 +42,24 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos;

@InterfaceAudience.Private
public final class PrefetchExecutor {

private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutor.class);

/** Futures for tracking block prefetch activity */
private static final Map<Path, Future<?>> prefetchFutures = new ConcurrentSkipListMap<>();
/** Set of files for which prefetch is completed; mutated from multiple threads */
public static Map<String, Boolean> prefetchCompleted = new ConcurrentHashMap<>();
/** Executor pool shared among all HFiles for block prefetch */
private static final ScheduledExecutorService prefetchExecutorPool;
/** Delay before beginning prefetch */
private static final int prefetchDelayMillis;
/** Variation in prefetch delay times, to mitigate stampedes */
private static final float prefetchDelayVariation;
/** Path of the local file that persists the prefetched-file set across restarts */
static String prefetchedFileListPath;
static {
// Consider doing this on demand with a configuration passed in rather
// than in a static initializer.
@@ -79,6 +89,13 @@ public Thread newThread(Runnable r) {
+ HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + Path.SEPARATOR_CHAR + ")");

public static void request(Path path, Runnable runnable) {
if (prefetchCompleted.containsKey(path.getName())) {
LOG.info("File {} has already been prefetched before the restart, so skipping prefetch", path);
return;
}
if (!prefetchPathExclude.matcher(path.toString()).find()) {
long delay;
if (prefetchDelayMillis > 0) {
@@ -104,6 +121,7 @@ public static void request(Path path, Runnable runnable) {

public static void complete(Path path) {
prefetchFutures.remove(path);
prefetchCompleted.put(path.getName(), true);
LOG.debug("Prefetch completed for {}", path);
}

@@ -115,6 +133,7 @@ public static void cancel(Path path) {
prefetchFutures.remove(path);
LOG.debug("Prefetch cancelled for {}", path);
}
prefetchCompleted.remove(path.getName());
}

public static boolean isCompleted(Path path) {
@@ -125,6 +144,60 @@ public static boolean isCompleted(Path path) {
return true;
}

public static void persistToFile(String path) throws IOException {
prefetchedFileListPath = path;
if (prefetchedFileListPath == null) {
LOG.error("Prefetch persistence file path is null, cannot persist the prefetched-file set");
throw new IOException("Error persisting prefetched HFiles set: persistence file path is null");
}
if (!prefetchCompleted.isEmpty()) {
try (FileOutputStream fos = new FileOutputStream(prefetchedFileListPath, true)) {
PrefetchProtoUtils.toPB(prefetchCompleted).writeDelimitedTo(fos);
}
}
}

public static void retrieveFromFile(String path) throws IOException {
prefetchedFileListPath = path;
File prefetchPersistenceFile = new File(prefetchedFileListPath);
if (!prefetchPersistenceFile.exists()) {
LOG.warn("Prefetch persistence file does not exist!");
return;
}
LOG.info("Retrieving from prefetch persistence file " + path);
assert (prefetchedFileListPath != null);
try (FileInputStream fis = deleteFileOnClose(prefetchPersistenceFile)) {
PersistentPrefetchProtos.PrefetchedHfileName proto =
PersistentPrefetchProtos.PrefetchedHfileName.parseDelimitedFrom(fis);
Map<String, Boolean> protoPrefetchedFilesMap = proto.getPrefetchedFilesMap();
prefetchCompleted.putAll(protoPrefetchedFilesMap);
}
}

/** Returns an input stream that deletes its underlying file once the stream is closed. */
private static FileInputStream deleteFileOnClose(final File file) throws IOException {
return new FileInputStream(file) {
private File myFile;

private FileInputStream init(File file) {
myFile = file;
return this;
}

@Override
public void close() throws IOException {
if (myFile == null) {
return;
}

super.close();
if (!myFile.delete()) {
throw new IOException("Failed deleting persistence file " + myFile.getAbsolutePath());
}
myFile = null;
}
}.init(file);
}

private PrefetchExecutor() {
}
}
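
Putting the new methods together, a sketch of the restart flow they enable, as driven by BucketCache in a later file (paths and file names are placeholders):

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;

final class PrefetchRestartFlow {
  static void demo() throws IOException {
    // On clean shutdown the completed set is written out...
    PrefetchExecutor.persistToFile("/mnt/cache/prefetch.list");
    // ...and on startup it is read back (the file is deleted as it is read).
    PrefetchExecutor.retrieveFromFile("/mnt/cache/prefetch.list");
    // A request for an already-prefetched file is now a no-op.
    PrefetchExecutor.request(new Path("/hbase/data/ns/tbl/region/cf/hfile-aaaa"),
      () -> { /* prefetch work elided */ });
  }
}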
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchProtoUtils.java
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;

import java.util.Map;

import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos;

final class PrefetchProtoUtils {
private PrefetchProtoUtils() {
}

static PersistentPrefetchProtos.PrefetchedHfileName
toPB(Map<String, Boolean> prefetchedHfileNames) {
return PersistentPrefetchProtos.PrefetchedHfileName.newBuilder()
.putAllPrefetchedFiles(prefetchedHfileNames).build();
}
}
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -64,6 +64,7 @@
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
@@ -82,6 +83,7 @@
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY;

/**
* BucketCache uses {@link BucketAllocator} to allocate/free blocks, and uses BucketCache#ramCache
@@ -239,6 +241,8 @@ public class BucketCache implements BlockCache, HeapSize {
/** In-memory bucket size */
private float memoryFactor;

private String prefetchedFileListPath;

private static final String FILE_VERIFY_ALGORITHM =
"hbase.bucketcache.persistent.file.integrity.check.algorithm";
private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
@@ -283,6 +287,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY);

sanityCheckConfigs();

@@ -463,6 +468,9 @@ protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cach
if (!cacheEnabled) {
return;
}
// If the key was created without a block type, backfill it from the block itself.
if (cacheKey.getBlockType() == null && cachedItem.getBlockType() != null) {
cacheKey.setBlockType(cachedItem.getBlockType());
}
LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
// Stuff the entry into the RAM cache so it can get drained to the persistent store
RAMQueueEntry re =
@@ -1198,6 +1206,9 @@ private void persistToFile() throws IOException {
fos.write(ProtobufMagic.PB_MAGIC);
BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
}
if (prefetchedFileListPath != null) {
PrefetchExecutor.persistToFile(prefetchedFileListPath);
}
}

/**
@@ -1209,6 +1220,9 @@ private void retrieveFromFile(int[] bucketSizes) throws IOException {
return;
}
assert !cacheEnabled;
if (prefetchedFileListPath != null) {
PrefetchExecutor.retrieveFromFile(prefetchedFileListPath);
}

try (FileInputStream in = deleteFileOnClose(persistenceFile)) {
int pblen = ProtobufMagic.lengthOfPBMagic();
@@ -1417,6 +1431,7 @@ protected String getAlgorithm() {
*/
@Override
public int evictBlocksByHfileName(String hfileName) {
// Drop the prefetch record so the file is prefetched again if it is reopened.
PrefetchExecutor.prefetchCompleted.remove(hfileName);
Set<BlockCacheKey> keySet = blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE),
true, new BlockCacheKey(hfileName, Long.MAX_VALUE), true);
