Skip to content

Commit

Permalink
HDDS-5466. Refactor BlockOutputStream. (#2442)
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo authored Jul 28, 2021
1 parent 08e948b commit 59d6e95
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
Expand All @@ -34,6 +33,7 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
Expand Down Expand Up @@ -109,11 +109,6 @@ public class BlockOutputStream extends OutputStream {
// which got written between successive putBlock calls.
private List<ChunkBuffer> bufferList;

// This object will maintain the commitIndexes and byteBufferList in order
// Also, corresponding to the logIndex, the corresponding list of buffers will
// be released from the buffer pool.
private final CommitWatcher commitWatcher;

private final List<DatanodeDetails> failedServers;
private final Checksum checksum;

Expand Down Expand Up @@ -154,7 +149,7 @@ public BlockOutputStream(
this.token = token;

//number of buffers used before doing a flush
refreshCurrentBuffer(bufferPool);
refreshCurrentBuffer();
flushPeriod = (int) (config.getStreamBufferFlushSize() / config
.getStreamBufferSize());

Expand All @@ -165,7 +160,6 @@ public BlockOutputStream(

// A single thread executor handle the responses of async requests
responseExecutor = Executors.newSingleThreadExecutor();
commitWatcher = new CommitWatcher(bufferPool, xceiverClient);
bufferList = null;
totalDataFlushedLength = 0;
writtenDataLength = 0;
Expand All @@ -175,8 +169,8 @@ public BlockOutputStream(
config.getBytesPerChecksum());
}

private void refreshCurrentBuffer(BufferPool pool) {
currentBuffer = pool.getCurrentBuffer();
void refreshCurrentBuffer() {
currentBuffer = bufferPool.getCurrentBuffer();
currentBufferRemaining =
currentBuffer != null ? currentBuffer.remaining() : 0;
}
Expand All @@ -186,7 +180,7 @@ public BlockID getBlockID() {
}

public long getTotalAckDataLength() {
return commitWatcher.getTotalAckDataLength();
return 0;
}

public long getWrittenDataLength() {
Expand Down Expand Up @@ -216,11 +210,6 @@ public IOException getIoException() {
return ioException.get();
}

@VisibleForTesting
public Map<Long, List<ChunkBuffer>> getCommitIndex2flushedDataMap() {
return commitWatcher.getCommitIndex2flushedDataMap();
}

@Override
public void write(int b) throws IOException {
checkOpen();
Expand Down Expand Up @@ -345,9 +334,7 @@ public void writeOnRetry(long len) throws IOException {
private void handleFullBuffer() throws IOException {
try {
checkOpen();
if (!commitWatcher.getFutureMap().isEmpty()) {
waitOnFlushFutures();
}
waitOnFlushFutures();
} catch (ExecutionException e) {
handleExecutionException(e);
} catch (InterruptedException ex) {
Expand All @@ -357,28 +344,33 @@ private void handleFullBuffer() throws IOException {
watchForCommit(true);
}

void releaseBuffersOnException() {
}

// It may happen that once the exception is encountered , we still might
// have successfully flushed up to a certain index. Make sure the buffers
// only contain data which have not been sufficiently replicated
private void adjustBuffersOnException() {
commitWatcher.releaseBuffersOnException();
refreshCurrentBuffer(bufferPool);
releaseBuffersOnException();
refreshCurrentBuffer();
}

XceiverClientReply sendWatchForCommit(boolean bufferFull)
throws IOException {
return null;
}

/**
* calls watchForCommit API of the Ratis Client. For Standalone client,
* it is a no op.
* @param bufferFull flag indicating whether bufferFull condition is hit or
* its called as part flush/close
* @return minimum commit index replicated to all nodes
* @throws IOException IOException in case watch gets timed out
*/
private void watchForCommit(boolean bufferFull) throws IOException {
checkOpen();
try {
XceiverClientReply reply = bufferFull ?
commitWatcher.watchOnFirstIndex() : commitWatcher.watchOnLastIndex();
final XceiverClientReply reply = sendWatchForCommit(bufferFull);
if (reply != null) {
List<DatanodeDetails> dnList = reply.getDatanodes();
if (!dnList.isEmpty()) {
Expand All @@ -393,8 +385,10 @@ private void watchForCommit(boolean bufferFull) throws IOException {
setIoException(ioe);
throw getIoException();
}
refreshCurrentBuffer(bufferPool);
refreshCurrentBuffer();
}

void updateCommitInfo(XceiverClientReply reply, List<ChunkBuffer> buffers) {
}

/**
Expand Down Expand Up @@ -441,16 +435,14 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
blockID.set(responseBlockID);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Adding index " + asyncReply.getLogIndex() + " commitMap size "
+ commitWatcher.getCommitInfoMapSize() + " flushLength "
"Adding index " + asyncReply.getLogIndex() + " flushLength "
+ flushPos + " numBuffers " + byteBufferList.size()
+ " blockID " + blockID + " bufferPool size" + bufferPool
.getSize() + " currentBufferIndex " + bufferPool
.getCurrentBufferIndex());
}
// for standalone protocol, logIndex will always be 0.
commitWatcher
.updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList);
updateCommitInfo(asyncReply, byteBufferList);
}
return e;
}, responseExecutor).exceptionally(e -> {
Expand All @@ -468,10 +460,14 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
Thread.currentThread().interrupt();
handleInterruptedException(ex, false);
}
commitWatcher.getFutureMap().put(flushPos, flushFuture);
putFlushFuture(flushPos, flushFuture);
return flushFuture;
}

void putFlushFuture(long flushPos,
CompletableFuture<ContainerCommandResponseProto> flushFuture) {
}

@Override
public void flush() throws IOException {
if (xceiverClientFactory != null && xceiverClient != null
Expand Down Expand Up @@ -514,7 +510,7 @@ private void handleFlush(boolean close)
checkOpen();
// flush the last chunk data residing on the currentBuffer
if (totalDataFlushedLength < writtenDataLength) {
refreshCurrentBuffer(bufferPool);
refreshCurrentBuffer();
Preconditions.checkArgument(currentBuffer.position() > 0);
if (currentBuffer.hasRemaining()) {
writeChunk(currentBuffer);
Expand Down Expand Up @@ -561,13 +557,7 @@ public void close() throws IOException {
}
}

private void waitOnFlushFutures()
throws InterruptedException, ExecutionException {
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
commitWatcher.getFutureMap().values().toArray(
new CompletableFuture[commitWatcher.getFutureMap().size()]));
// wait for all the transactions to complete
combinedFuture.get();
void waitOnFlushFutures() throws InterruptedException, ExecutionException {
}

private void validateResponse(
Expand Down Expand Up @@ -601,13 +591,17 @@ private void setIoException(Exception e) {
}
}

void cleanup() {
}

public void cleanup(boolean invalidateClient) {
if (xceiverClientFactory != null) {
xceiverClientFactory.releaseClient(xceiverClient, invalidateClient);
}
xceiverClientFactory = null;
xceiverClient = null;
commitWatcher.cleanup();
cleanup();

if (bufferList != null) {
bufferList.clear();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.storage;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
* An {@link OutputStream} used by the REST service in combination with the
* SCMClient to write the value of a key to a sequence
* of container chunks. Writes are buffered locally and periodically written to
* the container as a new chunk. In order to preserve the semantics that
* replacement of a pre-existing key is atomic, each instance of the stream has
* an internal unique identifier. This unique identifier and a monotonically
* increasing chunk index form a composite key that is used as the chunk name.
* After all data is written, a putKey call creates or updates the corresponding
* container key, and this call includes the full list of chunks that make up
* the key data. The list of chunks is updated all at once. Therefore, a
* concurrent reader never can see an intermediate state in which different
* chunks of data from different versions of the key data are interleaved.
* This class encapsulates all state management for buffering and writing
* through to the container.
*/
public class RatisBlockOutputStream extends BlockOutputStream {
public static final Logger LOG = LoggerFactory.getLogger(
RatisBlockOutputStream.class);

// This object will maintain the commitIndexes and byteBufferList in order
// Also, corresponding to the logIndex, the corresponding list of buffers will
// be released from the buffer pool.
private final CommitWatcher commitWatcher;

/**
* Creates a new BlockOutputStream.
*
* @param blockID block ID
* @param bufferPool pool of buffers
*/
public RatisBlockOutputStream(
BlockID blockID,
XceiverClientFactory xceiverClientManager,
Pipeline pipeline,
BufferPool bufferPool,
OzoneClientConfig config,
Token<? extends TokenIdentifier> token
) throws IOException {
super(blockID, xceiverClientManager, pipeline, bufferPool, config, token);
this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient());
}

@Override
public long getTotalAckDataLength() {
return commitWatcher.getTotalAckDataLength();
}

@VisibleForTesting
public Map<Long, List<ChunkBuffer>> getCommitIndex2flushedDataMap() {
return commitWatcher.getCommitIndex2flushedDataMap();
}

@Override
void releaseBuffersOnException() {
commitWatcher.releaseBuffersOnException();
}

@Override
XceiverClientReply sendWatchForCommit(boolean bufferFull) throws IOException {
return bufferFull? commitWatcher.watchOnFirstIndex()
: commitWatcher.watchOnLastIndex();
}

@Override
void updateCommitInfo(XceiverClientReply reply, List<ChunkBuffer> buffers) {
commitWatcher.updateCommitInfoMap(reply.getLogIndex(), buffers);
}

@Override
void putFlushFuture(long flushPos,
CompletableFuture<ContainerCommandResponseProto> flushFuture) {
commitWatcher.getFutureMap().put(flushPos, flushFuture);
}

@Override
void waitOnFlushFutures() throws InterruptedException, ExecutionException {
// wait for all the transactions to complete
CompletableFuture.allOf(commitWatcher.getFutureMap().values().toArray(
new CompletableFuture[0])).get();
}

@Override
void cleanup() {
commitWatcher.cleanup();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool)
config.setChecksumType(ChecksumType.NONE);
config.setBytesPerChecksum(256 * 1024);

BlockOutputStream outputStream = new BlockOutputStream(
BlockOutputStream outputStream = new RatisBlockOutputStream(
new BlockID(1L, 1L),
xcm,
pipeline,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.hdds.scm.storage.RatisBlockOutputStream;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;

Expand Down Expand Up @@ -96,7 +97,7 @@ long getRemaining() {
private void checkStream() throws IOException {
if (this.outputStream == null) {
this.outputStream =
new BlockOutputStream(blockID, xceiverClientManager,
new RatisBlockOutputStream(blockID, xceiverClientManager,
pipeline, bufferPool, config, token);
}
}
Expand Down
Loading

0 comments on commit 59d6e95

Please sign in to comment.