Skip to content

Commit

Permalink
HBASE-22890 Verify the file integrity in persistent IOEngine
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhaoBQ committed Sep 12, 2019
1 parent 3f84591 commit 5f266e0
Show file tree
Hide file tree
Showing 5 changed files with 560 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
Expand Down Expand Up @@ -69,6 +70,7 @@
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
Expand Down Expand Up @@ -242,6 +244,17 @@ public int compare(BlockCacheKey a, BlockCacheKey b) {
/** In-memory bucket size */
private float memoryFactor;

private String ioEngineName;
private static final String FILE_VERIFY_ALGORITHM =
"hbase.bucketcache.persistent.file.integrity.check.algorithm";
private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";

/**
* Use {@link java.security.MessageDigest} class's encryption algorithms to check
* persistence file integrity, default algorithm is MD5
* */
private String algorithm;

public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException,
IOException {
Expand All @@ -252,8 +265,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
Configuration conf)
throws FileNotFoundException, IOException {
this.ioEngine = getIOEngineFromName(ioEngineName, capacity);
throws IOException {
this.writerThreads = new WriterThread[writerThreadNum];
long blockNumCapacity = capacity / blockSize;
if (blockNumCapacity >= Integer.MAX_VALUE) {
Expand All @@ -275,6 +287,7 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
", memoryFactor: " + memoryFactor);

this.cacheCapacity = capacity;
this.ioEngineName = ioEngineName;
this.persistencePath = persistencePath;
this.blockSize = blockSize;
this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;
Expand All @@ -288,14 +301,15 @@ public BucketCache(String ioEngineName, long capacity, int blockSize, int[] buck
this.ramCache = new ConcurrentHashMap<BlockCacheKey, RAMQueueEntry>();

this.backingMap = new ConcurrentHashMap<BlockCacheKey, BucketEntry>((int) blockNumCapacity);

this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
ioEngine = getIOEngineFromName();
if (ioEngine.isPersistent() && persistencePath != null) {
try {
retrieveFromFile(bucketSizes);
} catch (IOException ioex) {
LOG.error("Can't restore from file because of", ioex);
} catch (ClassNotFoundException cnfe) {
LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe);
LOG.error("Can't restore from file in rebuild because can't deserialise", cnfe);
throw new RuntimeException(cnfe);
}
}
Expand Down Expand Up @@ -359,24 +373,22 @@ public String getIoEngine() {

/**
* Get the IOEngine from the IO engine name
* @param ioEngineName
* @param capacity
* @return the IOEngine
* @throws IOException
*/
private IOEngine getIOEngineFromName(String ioEngineName, long capacity)
private IOEngine getIOEngineFromName()
throws IOException {
if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
// In order to make the usage simple, we only need the prefix 'files:' in
// document whether one or multiple file(s), but also support 'file:' for
// the compatibility
String[] filePaths =
ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER);
return new FileIOEngine(capacity, filePaths);
return new FileIOEngine(algorithm, persistencePath, cacheCapacity, filePaths);
} else if (ioEngineName.startsWith("offheap"))
return new ByteBufferIOEngine(capacity, true);
return new ByteBufferIOEngine(cacheCapacity, true);
else if (ioEngineName.startsWith("heap"))
return new ByteBufferIOEngine(capacity, false);
return new ByteBufferIOEngine(cacheCapacity, false);
else
throw new IllegalArgumentException(
"Don't understand io engine name for cache - prefix with file:, heap or offheap");
Expand Down Expand Up @@ -1021,41 +1033,48 @@ static List<RAMQueueEntry> getRAMQueueEntries(final BlockingQueue<RAMQueueEntry>

private void persistToFile() throws IOException {
assert !cacheEnabled;
FileOutputStream fos = null;
ObjectOutputStream oos = null;
try {
try (ObjectOutputStream oos = new ObjectOutputStream(
new FileOutputStream(persistencePath, false))){
if (!ioEngine.isPersistent()) {
throw new IOException("Attempt to persist non-persistent cache mappings!");
}
fos = new FileOutputStream(persistencePath, false);
oos = new ObjectOutputStream(fos);
if (ioEngine instanceof PersistentIOEngine) {
oos.write(ProtobufUtil.PB_MAGIC);
byte[] checksum = ((PersistentIOEngine) ioEngine).calculateChecksum();
oos.writeInt(checksum.length);
oos.write(checksum);
}
oos.writeLong(cacheCapacity);
oos.writeUTF(ioEngine.getClass().getName());
oos.writeUTF(backingMap.getClass().getName());
oos.writeObject(deserialiserMap);
oos.writeObject(backingMap);
} finally {
if (oos != null) oos.close();
if (fos != null) fos.close();
} catch (NoSuchAlgorithmException e) {
LOG.error("No such algorithm : " + algorithm + "! Failed to persist data on exit",e);
}
}

@SuppressWarnings("unchecked")
private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException,
private void retrieveFromFile(int[] bucketSizes) throws IOException,
ClassNotFoundException {
File persistenceFile = new File(persistencePath);
if (!persistenceFile.exists()) {
return;
}
assert !cacheEnabled;
FileInputStream fis = null;
ObjectInputStream ois = null;
try {
try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(persistencePath))){
if (!ioEngine.isPersistent())
throw new IOException(
"Attempt to restore non-persistent cache mappings!");
fis = new FileInputStream(persistencePath);
ois = new ObjectInputStream(fis);
// for backward compatibility
if (ioEngine instanceof PersistentIOEngine &&
!((PersistentIOEngine) ioEngine).isOldVersion()) {
byte[] PBMagic = new byte[ProtobufUtil.PB_MAGIC.length];
ois.read(PBMagic);
int length = ois.readInt();
byte[] persistenceChecksum = new byte[length];
ois.read(persistenceChecksum);
}
long capacitySize = ois.readLong();
if (capacitySize != cacheCapacity)
throw new IOException("Mismatched cache capacity:"
Expand All @@ -1078,9 +1097,8 @@ private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAlloc
bucketAllocator = allocator;
deserialiserMap = deserMap;
backingMap = backingMapFromFile;
blockNumber.set(backingMap.size());
} finally {
if (ois != null) ois.close();
if (fis != null) fis.close();
if (!persistenceFile.delete()) {
throw new IOException("Failed deleting persistence file "
+ persistenceFile.getAbsolutePath());
Expand Down Expand Up @@ -1597,4 +1615,9 @@ float getMultiFactor() {
float getMemoryFactor() {
return memoryFactor;
}

@VisibleForTesting
public UniqueIndexMap<Integer> getDeserialiserMap() {
return deserialiserMap;
}
}
Loading

0 comments on commit 5f266e0

Please sign in to comment.