Skip to content

Commit

Permalink
HBASE-24625 AsyncFSWAL.getLogFileSizeIfBeingWritten does not return t…
Browse files Browse the repository at this point in the history
…he expected synced file length(addendum) (#2055)

Signed-off-by: Duo Zhang <[email protected]>
  • Loading branch information
comnetwork authored Aug 10, 2020
1 parent 206d0a0 commit 7b099ea
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,11 @@ private void flush0(CompletableFuture<Long> future, ByteArrayOutputStream buffer
}
}
long pos = out.getPos();
if(pos > this.syncedLength) {
this.syncedLength = pos;
}
/**
* This flush0 method could only be called by single thread, so here we could
* safely overwrite without any synchronization.
*/
this.syncedLength = pos;
future.complete(pos);
} catch (IOException e) {
future.completeExceptionally(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter

private final Class<? extends Channel> channelClass;

private AsyncFSOutput output;
private volatile AsyncFSOutput output;
/**
* Save {@link AsyncFSOutput#getSyncedLength()} when {@link #output} is closed.
*/
private volatile long finalSyncedLength = -1;

private static final class OutputStreamWrapper extends OutputStream
implements ByteBufferWriter {
Expand Down Expand Up @@ -156,6 +160,13 @@ public synchronized void close() throws IOException {
LOG.warn("normal close failed, try recover", e);
output.recoverAndClose(null);
}
/**
* We have to call {@link AsyncFSOutput#getSyncedLength()}
* after {@link AsyncFSOutput#close()} to get the final length
* synced to underlying filesystem because {@link AsyncFSOutput#close()}
* may also flush some data to underlying filesystem.
*/
this.finalSyncedLength = this.output.getSyncedLength();
this.output = null;
}

Expand Down Expand Up @@ -234,6 +245,17 @@ protected OutputStream getOutputStreamForCellEncoder() {

@Override
public long getSyncedLength() {
return this.output.getSyncedLength();
/**
* The statement "this.output = null;" in {@link AsyncProtobufLogWriter#close}
* is a sync point, if output is null, then finalSyncedLength must set,
* so we can return finalSyncedLength, else we return output.getSyncedLength
*/
AsyncFSOutput outputToUse = this.output;
if(outputToUse == null) {
long finalSyncedLengthToUse = this.finalSyncedLength;
assert finalSyncedLengthToUse >= 0;
return finalSyncedLengthToUse;
}
return outputToUse.getSyncedLength();
}
}

0 comments on commit 7b099ea

Please sign in to comment.