Skip to content

Commit

Permalink
HBASE-27668 PB's parseDelimitedFrom can successfully return when ther…
Browse files Browse the repository at this point in the history
…e are not enough bytes (#5059)

Signed-off-by: Viraj Jasani <[email protected]>
(cherry picked from commit d1fede7)
  • Loading branch information
Apache9 committed Feb 28, 2023
1 parent 53d3e78 commit 5b6d9de
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.hadoop.hbase.protobuf.ProtobufMagic.PB_MAGIC;

import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Constructor;
Expand Down Expand Up @@ -133,6 +134,7 @@
import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.Parser;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
Expand Down Expand Up @@ -3596,4 +3598,52 @@ public static ClusterStatusProtos.ServerTask toServerTask(ServerTask task) {
.setStartTime(task.getStartTime()).setCompletionTime(task.getCompletionTime()).build();
}

/**
* Check whether this IPBE indicates EOF or not.
* <p/>
* We will check the exception message, if it is likely the one of
* InvalidProtocolBufferException.truncatedMessage, we will consider it as EOF, otherwise not.
*/
public static boolean isEOF(InvalidProtocolBufferException e) {
return e.getMessage().contains("input has been truncated");
}

/**
* This is a wrapper of the PB message's parseDelimitedFrom. The difference is, if we can not
* determine whether there are enough bytes in stream, i.e, the available method does not have a
* valid return value, we will try to read all the bytes to a byte array first, and then parse the
* pb message with {@link Parser#parseFrom(byte[])} instead of call
* {@link Parser#parseDelimitedFrom(InputStream)} directly. This is because even if the bytes are
* not enough bytes, {@link Parser#parseDelimitedFrom(InputStream)} could still return without any
* errors but just leave us a partial PB message.
* @return The PB message if we can parse it successfully, otherwise there will always be an
* exception thrown, will never return {@code null}.
*/
public static <T extends Message> T parseDelimitedFrom(InputStream in, Parser<T> parser)
throws IOException {
int firstByte = in.read();
if (firstByte < 0) {
throw new EOFException("EOF while reading message size");
}
int size = CodedInputStream.readRawVarint32(firstByte, in);
int available = in.available();
if (available > 0) {
if (available < size) {
throw new EOFException("Available bytes not enough for parsing PB message, expect at least "
+ size + " bytes, but only " + available + " bytes available");
}
// this piece of code is copied from GeneratedMessageV3.parseFrom
try {
return parser.parseFrom(ByteStreams.limit(in, size));
} catch (InvalidProtocolBufferException e) {
throw e.unwrapIOException();
}
} else {
// this usually means the stream does not have a proper available implementation, let's read
// the content to an byte array before parsing.
byte[] bytes = new byte[size];
ByteStreams.readFully(in, bytes);
return parser.parseFrom(bytes);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -53,6 +54,7 @@
import org.apache.hbase.thirdparty.com.google.protobuf.Any;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
Expand Down Expand Up @@ -574,4 +576,21 @@ public void testTagEncodeTrueDecodeFalse() {
List<Tag> decodedTags = PrivateCellUtil.getTags(decodedCell);
assertEquals(0, decodedTags.size());
}

/**
* Used to confirm that we only consider truncatedMessage as EOF
*/
@Test
public void testIsEOF() throws Exception {
for (Method method : InvalidProtocolBufferException.class.getDeclaredMethods()) {
if (
method.getParameterCount() == 0
&& method.getReturnType() == InvalidProtocolBufferException.class
) {
method.setAccessible(true);
InvalidProtocolBufferException e = (InvalidProtocolBufferException) method.invoke(null);
assertEquals(method.getName().equals("truncatedMessage"), ProtobufUtil.isEOF(e));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
Expand Down Expand Up @@ -358,60 +356,46 @@ protected Compression.Algorithm getValueCompressionAlgorithm() {

@Override
protected boolean readNext(Entry entry) throws IOException {
resetCompression = false;
// OriginalPosition might be < 0 on local fs; if so, it is useless to us.
long originalPosition = this.inputStream.getPos();
if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
LOG.trace("Reached end of expected edits area at offset {}", originalPosition);
return false;
}
WALKey.Builder builder = WALKey.newBuilder();
long size = 0;
boolean resetPosition = false;
// by default, we should reset the compression when seeking back after reading something
resetCompression = true;
try {
long available = -1;
WALKey walKey;
try {
int firstByte = this.inputStream.read();
if (firstByte == -1) {
throw new EOFException();
}
size = CodedInputStream.readRawVarint32(firstByte, this.inputStream);
// available may be < 0 on local fs for instance. If so, can't depend on it.
available = this.inputStream.available();
if (available > 0 && available < size) {
// if we quit here, we have just read the length, no actual data yet, which means we
// haven't put anything into the compression dictionary yet, so when seeking back to the
// last good position, we do not need to reset compression context.
// This is very useful for saving the extra effort for reconstructing the compression
// dictionary, where we need to read from the beginning instead of just seek to the
// position, as DFSInputStream implement the available method, so in most cases we will
// reach here if there are not enough data.
resetCompression = false;
throw new EOFException("Available stream not enough for edit, "
+ "inputStream.available()= " + this.inputStream.available() + ", " + "entry size= "
+ size + " at offset = " + this.inputStream.getPos());
walKey = ProtobufUtil.parseDelimitedFrom(inputStream, WALKey.parser());
} catch (InvalidProtocolBufferException e) {
if (ProtobufUtil.isEOF(e) || isWALTrailer(originalPosition)) {
// only rethrow EOF if it indicates an EOF, or we have reached the partial WALTrailer
resetPosition = true;
throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition="
+ originalPosition + ", currentPosition=" + this.inputStream.getPos()).initCause(e);
} else {
throw e;
}
ProtobufUtil.mergeFrom(builder, ByteStreams.limit(this.inputStream, size), (int) size);
} catch (InvalidProtocolBufferException ipbe) {
resetPosition = true;
throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition="
+ originalPosition + ", currentPosition=" + this.inputStream.getPos() + ", messageSize="
+ size + ", currentAvailable=" + available).initCause(ipbe);
} catch (EOFException e) {
// append more detailed information
throw (EOFException) new EOFException("EOF while reading WAL key; originalPosition="
+ originalPosition + ", currentPosition=" + this.inputStream.getPos()).initCause(e);
}
if (!builder.isInitialized()) {
// TODO: not clear if we should try to recover from corrupt PB that looks semi-legit.
// If we can get the KV count, we could, theoretically, try to get next record.
throw new EOFException("Partial PB while reading WAL, "
+ "probably an unexpected EOF, ignoring. current offset=" + this.inputStream.getPos());
}
WALKey walKey = builder.build();
entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
LOG.debug("WALKey has no KVs that follow it; trying the next one. current offset={}",
this.inputStream.getPos());
return true;
}
// Starting from here, we will start to read cells, which will change the content in
// compression dictionary, so if we fail in the below operations, when resetting, we also need
// to clear the compression context, and read from the beginning to reconstruct the
// compression dictionary, instead of seeking to the position directly.
// This is very useful for saving the extra effort for reconstructing the compression
// dictionary, as DFSInputStream implement the available method, so in most cases we will
// not reach here if there are not enough data.
resetCompression = true;
int expectedCells = walKey.getFollowingKvCount();
long posBefore = this.inputStream.getPos();
try {
Expand Down Expand Up @@ -490,6 +474,54 @@ private IOException extractHiddenEof(Exception ex) {
return null;
}

/**
* This is used to determine whether we have already reached the WALTrailer. As the size and magic
* are at the end of the WAL file, it is possible that these two options are missing while
* writing, so we will consider there is no trailer. And when we actually reach the WALTrailer, we
* will try to decode it as WALKey and we will fail but the error could vary as it is parsing
* WALTrailer actually.
* @return whether this is a WALTrailer and we should throw EOF to upper layer the file is done
*/
private boolean isWALTrailer(long startPosition) throws IOException {
// We have nothing in the WALTrailer PB message now so its size is just a int length size and a
// magic at the end
int trailerSize = PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT;
if (fileLength - startPosition >= trailerSize) {
// We still have more than trailerSize bytes before reaching the EOF so this is not a trailer.
// We also test for == here because if this is a valid trailer, we can read it while opening
// the reader so we should not reach here
return false;
}
inputStream.seek(startPosition);
for (int i = 0; i < 4; i++) {
int r = inputStream.read();
if (r == -1) {
// we have reached EOF while reading the length, and all bytes read are 0, so we assume this
// is a partial trailer
return true;
}
if (r != 0) {
// the length is not 0, should not be a trailer
return false;
}
}
for (int i = 0; i < PB_WAL_COMPLETE_MAGIC.length; i++) {
int r = inputStream.read();
if (r == -1) {
// we have reached EOF while reading the magic, and all bytes read are matched, so we assume
// this is a partial trailer
return true;
}
if (r != (PB_WAL_COMPLETE_MAGIC[i] & 0xFF)) {
// does not match magic, should not be a trailer
return false;
}
}
// in fact we should not reach here, as this means the trailer bytes are all matched and
// complete, then we should not call this method...
return true;
}

@Override
protected void seekOnFs(long pos) throws IOException {
this.inputStream.seek(pos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import org.junit.Test;
import org.junit.rules.TestName;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;

/**
* WAL tests that can be reused across providers.
*/
Expand Down Expand Up @@ -111,6 +113,9 @@ public static void tearDownAfterClass() throws Exception {
*/
@Test
public void testWALTrailer() throws IOException {
// make sure that the size for WALTrailer is 0, we need this assumption when reading partial
// WALTrailer
assertEquals(0, WALTrailer.newBuilder().build().getSerializedSize());
// read With trailer.
doRead(true);
// read without trailer
Expand Down

0 comments on commit 5b6d9de

Please sign in to comment.