Skip to content

Commit

Permalink
HBASE-26680 Close and do not write trailer for the broken WAL writer (a…
Browse files Browse the repository at this point in the history
…pache#4174)

Signed-off-by: Duo Zhang <[email protected]>
(cherry picked from commit 8cce0d3)
Change-Id: I2514bda4a62089328ed5f561ef2b98798ec7d727
  • Loading branch information
sunhelly authored and petersomogyi committed Mar 29, 2022
1 parent dfd1d76 commit 082e7a8
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,26 +151,32 @@ private boolean initializeCompressionContext(Configuration conf, Path path) thro

public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
long blocksize) throws IOException, StreamLacksCapabilityException {
this.conf = conf;
boolean doCompress = initializeCompressionContext(conf, path);
this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
int bufferSize = CommonFSUtils.getDefaultBufferSize(fs);
short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
CommonFSUtils.getDefaultReplication(fs, path));

initOutput(fs, path, overwritable, bufferSize, replication, blocksize);

boolean doTagCompress = doCompress
&& conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, buildWALHeader(conf,
WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress))));

initAfterHeader(doCompress);

// instantiate trailer to default value.
trailer = WALTrailer.newBuilder().build();
if (LOG.isTraceEnabled()) {
LOG.trace("Initialized protobuf WAL=" + path + ", compression=" + doCompress);
try {
this.conf = conf;
boolean doCompress = initializeCompressionContext(conf, path);
this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
int bufferSize = CommonFSUtils.getDefaultBufferSize(fs);
short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
CommonFSUtils.getDefaultReplication(fs, path));

initOutput(fs, path, overwritable, bufferSize, replication, blocksize);

boolean doTagCompress =
doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, buildWALHeader(conf,
WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress))));

initAfterHeader(doCompress);

// instantiate trailer to default value.
trailer = WALTrailer.newBuilder().build();
if (LOG.isTraceEnabled()) {
LOG.trace("Initialized protobuf WAL=" + path + ", compression=" + doCompress);
}
} catch (Exception e) {
LOG.warn("Init output failed, path={}", path, e);
closeOutput();
throw e;
}
}

Expand Down Expand Up @@ -237,6 +243,11 @@ protected void writeWALTrailer() {
protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
short replication, long blockSize) throws IOException, StreamLacksCapabilityException;

/**
* simply close the output, do not need to write trailer like the Writer.close
*/
protected abstract void closeOutput();

/**
* return the file length after written.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,17 @@ protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bu
this.asyncOutputWrapper = new OutputStreamWrapper(output);
}

@Override
protected void closeOutput() {
if (this.output != null) {
try {
this.output.close();
} catch (IOException e) {
LOG.warn("Close output failed", e);
}
}
}

private long writeWALMetadata(Consumer<CompletableFuture<Long>> action) throws IOException {
CompletableFuture<Long> future = new CompletableFuture<>();
action.accept(future);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,17 @@ protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bu
}
}

@Override
protected void closeOutput() {
if (this.output != null) {
try {
this.output.close();
} catch (IOException e) {
LOG.warn("Close output failed", e);
}
}
}

@Override
protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
output.write(magic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,6 @@ public static Writer createWriter(final Configuration conf, final FileSystem fs,
} else {
LOG.debug("Error instantiating log writer.", e);
}
if (writer != null) {
try{
writer.close();
} catch(IOException ee){
LOG.error("cannot close log writer", ee);
}
}
throw new IOException("cannot get log writer", e);
}
}
Expand Down

0 comments on commit 082e7a8

Please sign in to comment.