Skip to content

Commit

Permalink
HBASE-26680 Close and do not write trailer for the broken WAL writer (#…
Browse files Browse the repository at this point in the history
…4174)

Signed-off-by: Duo Zhang <[email protected]>
  • Loading branch information
sunhelly committed Mar 16, 2022
1 parent d3629bb commit db5e954
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,38 +166,43 @@ private boolean initializeCompressionContext(Configuration conf, Path path) thro
public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
long blocksize, StreamSlowMonitor monitor) throws IOException,
StreamLacksCapabilityException {
this.conf = conf;
boolean doCompress = initializeCompressionContext(conf, path);
this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
int bufferSize = CommonFSUtils.getDefaultBufferSize(fs);
short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
CommonFSUtils.getDefaultReplication(fs, path));

initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor);

boolean doTagCompress = doCompress &&
conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
boolean doValueCompress = doCompress &&
conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false);
WALHeader.Builder headerBuilder = WALHeader.newBuilder()
.setHasCompression(doCompress)
.setHasTagCompression(doTagCompress)
.setHasValueCompression(doValueCompress);
if (doValueCompress) {
headerBuilder.setValueCompressionAlgorithm(
CompressionContext.getValueCompressionAlgorithm(conf).ordinal());
}
length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC,
buildWALHeader(conf, headerBuilder)));
try {
this.conf = conf;
boolean doCompress = initializeCompressionContext(conf, path);
this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
int bufferSize = CommonFSUtils.getDefaultBufferSize(fs);
short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
CommonFSUtils.getDefaultReplication(fs, path));

initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor);

boolean doTagCompress =
doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
boolean doValueCompress =
doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false);
WALHeader.Builder headerBuilder =
WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress)
.setHasValueCompression(doValueCompress);
if (doValueCompress) {
headerBuilder.setValueCompressionAlgorithm(
CompressionContext.getValueCompressionAlgorithm(conf).ordinal());
}
length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC,
buildWALHeader(conf, headerBuilder)));

initAfterHeader(doCompress);
initAfterHeader(doCompress);

// instantiate trailer to default value.
trailer = WALTrailer.newBuilder().build();
// instantiate trailer to default value.
trailer = WALTrailer.newBuilder().build();

if (LOG.isTraceEnabled()) {
LOG.trace("Initialized protobuf WAL={}, compression={}, tagCompression={}" +
", valueCompression={}", path, doCompress, doTagCompress, doValueCompress);
if (LOG.isTraceEnabled()) {
LOG.trace("Initialized protobuf WAL={}, compression={}, tagCompression={}"
+ ", valueCompression={}", path, doCompress, doTagCompress, doValueCompress);
}
} catch (Exception e) {
LOG.warn("Init output failed, path={}", path, e);
closeOutput();
throw e;
}
}

Expand Down Expand Up @@ -265,6 +270,11 @@ protected abstract void initOutput(FileSystem fs, Path path, boolean overwritabl
short replication, long blockSize, StreamSlowMonitor monitor)
throws IOException, StreamLacksCapabilityException;

/**
* simply close the output, do not need to write trailer like the Writer.close
*/
protected abstract void closeOutput();

/**
* return the file length after written.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;

Expand Down Expand Up @@ -197,6 +196,17 @@ protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bu
this.asyncOutputWrapper = new OutputStreamWrapper(output);
}

@Override
protected void closeOutput() {
if (this.output != null) {
try {
this.output.close();
} catch (IOException e) {
LOG.warn("Close output failed", e);
}
}
}

private long writeWALMetadata(Consumer<CompletableFuture<Long>> action) throws IOException {
CompletableFuture<Long> future = new CompletableFuture<>();
action.accept(future);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,17 @@ protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bu
}
}

@Override
protected void closeOutput() {
if (this.output != null) {
try {
this.output.close();
} catch (IOException e) {
LOG.warn("Close output failed", e);
}
}
}

@Override
protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
output.write(magic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,6 @@ public static Writer createWriter(final Configuration conf, final FileSystem fs,
} else {
LOG.debug("Error instantiating log writer.", e);
}
if (writer != null) {
try{
writer.close();
} catch(IOException ee){
LOG.error("cannot close log writer", ee);
}
}
throw new IOException("cannot get log writer", e);
}
}
Expand Down

0 comments on commit db5e954

Please sign in to comment.