Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27621 Also clear the Dictionary when resetting when reading compressed WAL file #5016

Merged
merged 1 commit into from
Feb 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,15 @@ public void uncompressTags(InputStream src, byte[] dest, int offset, int length)
throws IOException {
int endOffset = offset + length;
while (offset < endOffset) {
byte status = (byte) src.read();
byte status = StreamUtils.readByte(src);
if (status == Dictionary.NOT_IN_DICTIONARY) {
int tagLen = StreamUtils.readRawVarint32(src);
offset = Bytes.putAsShort(dest, offset, tagLen);
IOUtils.readFully(src, dest, offset, tagLen);
tagDict.addEntry(dest, offset, tagLen);
offset += tagLen;
} else {
short dictIdx = StreamUtils.toShort(status, (byte) src.read());
short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(src));
byte[] entry = tagDict.getEntry(dictIdx);
if (entry == null) {
throw new IOException("Missing dictionary entry for index " + dictIdx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.util;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -206,6 +207,22 @@ public static Pair<Integer, Integer> readRawVarint32(ByteBuffer input, int offse
return new Pair<>(result, newOffset - offset);
}

/**
* Read a byte from the given stream using the read method, and throw EOFException if it returns
* -1, like the implementation in {@code DataInputStream}.
* <p/>
* This is useful because casting the return value of read method into byte directly will make us
* lose the ability to check whether there is a byte and its value is -1 or we reach EOF, as
* casting int -1 to byte also returns -1.
*/
public static byte readByte(InputStream in) throws IOException {
int r = in.read();
if (r < 0) {
throw new EOFException();
}
return (byte) r;
}

public static short toShort(byte hi, byte lo) {
short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
Preconditions.checkArgument(s >= 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ static byte[] readCompressed(DataInput in, Dictionary dict) throws IOException {
// if this isn't in the dictionary, we need to add to the dictionary.
byte[] arr = new byte[length];
in.readFully(arr);
if (dict != null) dict.addEntry(arr, 0, length);
if (dict != null) {
dict.addEntry(arr, 0, length);
}
return arr;
} else {
// Status here is the higher-order byte of index of the dictionary entry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ public class ProtobufLogReader extends ReaderBase {
// cell codec classname
private String codecClsName = null;

// a flag indicate that whether we need to reset compression context when seeking back
private boolean resetCompression;

@InterfaceAudience.Private
public long trailerSize() {
if (trailerPresent) {
Expand Down Expand Up @@ -160,6 +163,9 @@ public long getPosition() throws IOException {
@Override
public void reset() throws IOException {
String clsName = initInternal(null, false);
if (resetCompression) {
resetCompression();
}
initAfterCompression(clsName); // We need a new decoder (at least).
}

Expand Down Expand Up @@ -361,6 +367,8 @@ protected boolean readNext(Entry entry) throws IOException {
WALKey.Builder builder = WALKey.newBuilder();
long size = 0;
boolean resetPosition = false;
// by default, we should reset the compression when seeking back after reading something
resetCompression = true;
try {
long available = -1;
try {
Expand All @@ -372,6 +380,14 @@ protected boolean readNext(Entry entry) throws IOException {
// available may be < 0 on local fs for instance. If so, can't depend on it.
available = this.inputStream.available();
if (available > 0 && available < size) {
// if we quit here, we have just read the length, no actual data yet, which means we
// haven't put anything into the compression dictionary yet, so when seeking back to the
// last good position, we do not need to reset compression context.
// This is very useful for saving the extra effort for reconstructing the compression
// dictionary, where we need to read from the beginning instead of just seek to the
// position, as DFSInputStream implement the available method, so in most cases we will
// reach here if there are not enough data.
resetCompression = false;
throw new EOFException("Available stream not enough for edit, "
+ "inputStream.available()= " + this.inputStream.available() + ", " + "entry size= "
+ size + " at offset = " + this.inputStream.getPos());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
* Compression context to use reading. Can be null if no compression.
*/
protected CompressionContext compressionContext = null;
protected boolean emptyCompressionContext = true;
private boolean emptyCompressionContext = true;

/**
* Default constructor.
Expand Down Expand Up @@ -130,6 +130,17 @@ public void seek(long pos) throws IOException {
seekOnFs(pos);
}

/**
* Clear the {@link ReaderBase#compressionContext}, and also set {@link #emptyCompressionContext}
* to true, so when seeking, we will try to skip to the position and reconstruct the dictionary.
*/
protected final void resetCompression() {
if (compressionContext != null) {
compressionContext.clear();
emptyCompressionContext = true;
}
}

/**
* Initializes the log reader with a particular stream (may be null). Reader assumes ownership of
* the stream if not null and may use it. Called once.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,18 +197,20 @@ public byte[] uncompress(ByteString data, Enum dictIndex) {

private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException {
InputStream in = bs.newInput();
byte status = (byte) in.read();
byte status = StreamUtils.readByte(in);
if (status == Dictionary.NOT_IN_DICTIONARY) {
byte[] arr = new byte[StreamUtils.readRawVarint32(in)];
int bytesRead = in.read(arr);
if (bytesRead != arr.length) {
throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead);
}
if (dict != null) dict.addEntry(arr, 0, arr.length);
if (dict != null) {
dict.addEntry(arr, 0, arr.length);
}
return arr;
} else {
// Status here is the higher-order byte of index of the dictionary entry.
short dictIdx = StreamUtils.toShort(status, (byte) in.read());
short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(in));
byte[] entry = dict.getEntry(dictIdx);
if (entry == null) {
throw new IOException("Missing dictionary entry for index " + dictIdx);
Expand Down Expand Up @@ -350,7 +352,7 @@ protected Cell parseCell() throws IOException {
}

private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException {
byte status = (byte) in.read();
byte status = StreamUtils.readByte(in);
if (status == Dictionary.NOT_IN_DICTIONARY) {
// status byte indicating that data to be read is not in dictionary.
// if this isn't in the dictionary, we need to add to the dictionary.
Expand All @@ -360,7 +362,7 @@ private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOExcep
return length;
} else {
// the status byte also acts as the higher order byte of the dictionary entry.
short dictIdx = StreamUtils.toShort(status, (byte) in.read());
short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(in));
byte[] entry = dict.getEntry(dictIdx);
if (entry == null) {
throw new IOException("Missing dictionary entry for index " + dictIdx);
Expand Down
Loading