HDDS-5466. Refactor BlockOutputStream. #2442

Merged 2 commits on Jul 28, 2021
@@ -21,7 +21,6 @@
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -34,6 +33,7 @@
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
@@ -109,11 +109,6 @@ public class BlockOutputStream extends OutputStream {
   // which got written between successive putBlock calls.
   private List<ChunkBuffer> bufferList;
 
-  // This object will maintain the commitIndexes and byteBufferList in order
-  // Also, corresponding to the logIndex, the corresponding list of buffers will
-  // be released from the buffer pool.
-  private final CommitWatcher commitWatcher;
-
   private final List<DatanodeDetails> failedServers;
   private final Checksum checksum;
 
@@ -154,7 +149,7 @@ public BlockOutputStream(
     this.token = token;
 
     //number of buffers used before doing a flush
-    refreshCurrentBuffer(bufferPool);
+    refreshCurrentBuffer();
     flushPeriod = (int) (config.getStreamBufferFlushSize() / config
         .getStreamBufferSize());
 
@@ -165,7 +160,6 @@
 
     // A single thread executor handle the responses of async requests
     responseExecutor = Executors.newSingleThreadExecutor();
-    commitWatcher = new CommitWatcher(bufferPool, xceiverClient);
     bufferList = null;
     totalDataFlushedLength = 0;
     writtenDataLength = 0;
@@ -175,8 +169,8 @@
         config.getBytesPerChecksum());
   }
 
-  private void refreshCurrentBuffer(BufferPool pool) {
-    currentBuffer = pool.getCurrentBuffer();
+  void refreshCurrentBuffer() {
+    currentBuffer = bufferPool.getCurrentBuffer();
     currentBufferRemaining =
         currentBuffer != null ? currentBuffer.remaining() : 0;
   }
@@ -186,7 +180,7 @@ public BlockID getBlockID() {
   }
 
   public long getTotalAckDataLength() {
-    return commitWatcher.getTotalAckDataLength();
+    return 0;
   }
 
   public long getWrittenDataLength() {
@@ -216,11 +210,6 @@ public IOException getIoException() {
     return ioException.get();
   }
 
-  @VisibleForTesting
-  public Map<Long, List<ChunkBuffer>> getCommitIndex2flushedDataMap() {
-    return commitWatcher.getCommitIndex2flushedDataMap();
-  }
-
   @Override
   public void write(int b) throws IOException {
     checkOpen();
@@ -345,9 +334,7 @@ public void writeOnRetry(long len) throws IOException {
   private void handleFullBuffer() throws IOException {
     try {
       checkOpen();
-      if (!commitWatcher.getFutureMap().isEmpty()) {
-        waitOnFlushFutures();
-      }
+      waitOnFlushFutures();
     } catch (ExecutionException e) {
       handleExecutionException(e);
     } catch (InterruptedException ex) {
@@ -357,28 +344,33 @@ private void handleFullBuffer() throws IOException {
     watchForCommit(true);
   }
 
+  void releaseBuffersOnException() {
+  }
+
   // It may happen that once the exception is encountered, we still might
   // have successfully flushed up to a certain index. Make sure the buffers
   // only contain data which have not been sufficiently replicated
   private void adjustBuffersOnException() {
-    commitWatcher.releaseBuffersOnException();
-    refreshCurrentBuffer(bufferPool);
+    releaseBuffersOnException();
+    refreshCurrentBuffer();
   }
 
+  XceiverClientReply sendWatchForCommit(boolean bufferFull)
+      throws IOException {
+    return null;
+  }
+
   /**
    * Calls the watchForCommit API of the Ratis client. For a standalone
    * client, it is a no-op.
    * @param bufferFull flag indicating whether the bufferFull condition is hit
    *                   or it is called as part of flush/close
    * @return minimum commit index replicated to all nodes
    * @throws IOException IOException in case the watch gets timed out
    */
   private void watchForCommit(boolean bufferFull) throws IOException {
     checkOpen();
     try {
-      XceiverClientReply reply = bufferFull ?
-          commitWatcher.watchOnFirstIndex() : commitWatcher.watchOnLastIndex();
+      final XceiverClientReply reply = sendWatchForCommit(bufferFull);
       if (reply != null) {
         List<DatanodeDetails> dnList = reply.getDatanodes();
         if (!dnList.isEmpty()) {
@@ -393,8 +385,10 @@ private void watchForCommit(boolean bufferFull) throws IOException {
       setIoException(ioe);
       throw getIoException();
     }
-    refreshCurrentBuffer(bufferPool);
+    refreshCurrentBuffer();
   }
+
+  void updateCommitInfo(XceiverClientReply reply, List<ChunkBuffer> buffers) {
+  }
 
   /**
@@ -441,16 +435,14 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
           blockID.set(responseBlockID);
           if (LOG.isDebugEnabled()) {
             LOG.debug(
-                "Adding index " + asyncReply.getLogIndex() + " commitMap size "
-                    + commitWatcher.getCommitInfoMapSize() + " flushLength "
+                "Adding index " + asyncReply.getLogIndex() + " flushLength "
                     + flushPos + " numBuffers " + byteBufferList.size()
                     + " blockID " + blockID + " bufferPool size" + bufferPool
                     .getSize() + " currentBufferIndex " + bufferPool
                     .getCurrentBufferIndex());
           }
           // for standalone protocol, logIndex will always be 0.
-          commitWatcher
-              .updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList);
+          updateCommitInfo(asyncReply, byteBufferList);
         }
         return e;
       }, responseExecutor).exceptionally(e -> {
@@ -468,10 +460,14 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
       Thread.currentThread().interrupt();
       handleInterruptedException(ex, false);
     }
-    commitWatcher.getFutureMap().put(flushPos, flushFuture);
+    putFlushFuture(flushPos, flushFuture);
     return flushFuture;
   }
 
+  void putFlushFuture(long flushPos,
+      CompletableFuture<ContainerCommandResponseProto> flushFuture) {
+  }
+
   @Override
   public void flush() throws IOException {
     if (xceiverClientFactory != null && xceiverClient != null
@@ -514,7 +510,7 @@ private void handleFlush(boolean close)
     checkOpen();
     // flush the last chunk data residing on the currentBuffer
     if (totalDataFlushedLength < writtenDataLength) {
-      refreshCurrentBuffer(bufferPool);
+      refreshCurrentBuffer();
       Preconditions.checkArgument(currentBuffer.position() > 0);
       if (currentBuffer.hasRemaining()) {
         writeChunk(currentBuffer);
@@ -561,13 +557,7 @@ public void close() throws IOException {
     }
   }
 
-  private void waitOnFlushFutures()
-      throws InterruptedException, ExecutionException {
-    CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
-        commitWatcher.getFutureMap().values().toArray(
-            new CompletableFuture[commitWatcher.getFutureMap().size()]));
-    // wait for all the transactions to complete
-    combinedFuture.get();
+  void waitOnFlushFutures() throws InterruptedException, ExecutionException {
   }
 
   private void validateResponse(
@@ -601,13 +591,17 @@ private void setIoException(Exception e) {
     }
   }
 
+  void cleanup() {
+  }
+
   public void cleanup(boolean invalidateClient) {
     if (xceiverClientFactory != null) {
       xceiverClientFactory.releaseClient(xceiverClient, invalidateClient);
     }
     xceiverClientFactory = null;
     xceiverClient = null;
-    commitWatcher.cleanup();
+    cleanup();
+
     if (bufferList != null) {
       bufferList.clear();
     }
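Taken together, the changes above turn BlockOutputStream into a template-method base class: the shared write path stays here, while the CommitWatcher-specific pieces become overridable hooks (releaseBuffersOnException, sendWatchForCommit, updateCommitInfo, putFlushFuture, waitOnFlushFutures, cleanup) that default to no-ops. The new RatisBlockOutputStream file below supplies the real implementations. A minimal self-contained sketch of that shape, with hypothetical names rather than the PR's code:

// Hypothetical sketch of the pattern applied in this PR, not its actual code:
// the base class owns the shared logic and exposes no-op hooks that a
// protocol-specific subclass overrides.
abstract class BaseStream {
  // Hook with a no-op default, like waitOnFlushFutures() above.
  void waitOnPendingWrites() throws InterruptedException {
  }

  // The shared write path stays in the base class and calls the hook.
  final void finishWrites() throws InterruptedException {
    // ... shared buffer bookkeeping would go here ...
    waitOnPendingWrites();
  }
}

class ReplicatedStream extends BaseStream {
  @Override
  void waitOnPendingWrites() throws InterruptedException {
    // A real subclass (like RatisBlockOutputStream) would block here until
    // the pipeline acknowledges the outstanding writes.
  }
}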
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.common.ChunkBuffer;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * An {@link OutputStream} used by the REST service in combination with the
+ * SCMClient to write the value of a key to a sequence
+ * of container chunks. Writes are buffered locally and periodically written to
+ * the container as a new chunk. In order to preserve the semantics that
+ * replacement of a pre-existing key is atomic, each instance of the stream has
+ * an internal unique identifier. This unique identifier and a monotonically
+ * increasing chunk index form a composite key that is used as the chunk name.
+ * After all data is written, a putKey call creates or updates the corresponding
+ * container key, and this call includes the full list of chunks that make up
+ * the key data. The list of chunks is updated all at once. Therefore, a
+ * concurrent reader never can see an intermediate state in which different
+ * chunks of data from different versions of the key data are interleaved.
+ * This class encapsulates all state management for buffering and writing
+ * through to the container.
+ */
+public class RatisBlockOutputStream extends BlockOutputStream {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      RatisBlockOutputStream.class);
+
+  // This object will maintain the commitIndexes and byteBufferList in order
+  // Also, corresponding to the logIndex, the corresponding list of buffers will
+  // be released from the buffer pool.
+  private final CommitWatcher commitWatcher;
+
+  /**
+   * Creates a new BlockOutputStream.
+   *
+   * @param blockID block ID
+   * @param bufferPool pool of buffers
+   */
+  public RatisBlockOutputStream(
+      BlockID blockID,
+      XceiverClientFactory xceiverClientManager,
+      Pipeline pipeline,
+      BufferPool bufferPool,
+      OzoneClientConfig config,
+      Token<? extends TokenIdentifier> token
+  ) throws IOException {
+    super(blockID, xceiverClientManager, pipeline, bufferPool, config, token);
+    this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient());
+  }
+
+  @Override
+  public long getTotalAckDataLength() {
+    return commitWatcher.getTotalAckDataLength();
+  }
+
+  @VisibleForTesting
+  public Map<Long, List<ChunkBuffer>> getCommitIndex2flushedDataMap() {
+    return commitWatcher.getCommitIndex2flushedDataMap();
+  }
+
+  @Override
+  void releaseBuffersOnException() {
+    commitWatcher.releaseBuffersOnException();
+  }
+
+  @Override
+  XceiverClientReply sendWatchForCommit(boolean bufferFull) throws IOException {
+    return bufferFull? commitWatcher.watchOnFirstIndex()
+        : commitWatcher.watchOnLastIndex();
+  }
+
+  @Override
+  void updateCommitInfo(XceiverClientReply reply, List<ChunkBuffer> buffers) {
+    commitWatcher.updateCommitInfoMap(reply.getLogIndex(), buffers);
+  }
+
+  @Override
+  void putFlushFuture(long flushPos,
+      CompletableFuture<ContainerCommandResponseProto> flushFuture) {
+    commitWatcher.getFutureMap().put(flushPos, flushFuture);
+  }
+
+  @Override
+  void waitOnFlushFutures() throws InterruptedException, ExecutionException {
+    // wait for all the transactions to complete
+    CompletableFuture.allOf(commitWatcher.getFutureMap().values().toArray(
+        new CompletableFuture[0])).get();
+  }
+
+  @Override
+  void cleanup() {
+    commitWatcher.cleanup();
+  }
+}
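The waitOnFlushFutures() override above blocks on every outstanding putBlock future at once via CompletableFuture.allOf over CommitWatcher's future map. A small self-contained illustration of that idiom (the demo class and map contents are hypothetical, not part of the PR):

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;

public class AllOfDemo {
  public static void main(String[] args)
      throws InterruptedException, ExecutionException {
    // Futures keyed by flush position, like CommitWatcher's futureMap.
    Map<Long, CompletableFuture<String>> futureMap =
        new ConcurrentSkipListMap<>();
    futureMap.put(4096L, CompletableFuture.completedFuture("putBlock@4096"));
    futureMap.put(8192L, CompletableFuture.supplyAsync(() -> "putBlock@8192"));

    // Wait for all outstanding flushes, as waitOnFlushFutures() does.
    CompletableFuture.allOf(
        futureMap.values().toArray(new CompletableFuture[0])).get();
    System.out.println("all " + futureMap.size() + " flushes acknowledged");
  }
}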
@@ -104,7 +104,7 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool)
     config.setChecksumType(ChecksumType.NONE);
     config.setBytesPerChecksum(256 * 1024);
 
-    BlockOutputStream outputStream = new BlockOutputStream(
+    BlockOutputStream outputStream = new RatisBlockOutputStream(
         new BlockID(1L, 1L),
         xcm,
         pipeline,
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
+import org.apache.hadoop.hdds.scm.storage.RatisBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
@@ -96,7 +97,7 @@ long getRemaining() {
   private void checkStream() throws IOException {
     if (this.outputStream == null) {
       this.outputStream =
-          new BlockOutputStream(blockID, xceiverClientManager,
+          new RatisBlockOutputStream(blockID, xceiverClientManager,
               pipeline, bufferPool, config, token);
     }
   }