diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index ca674949cb0..0cfbb5ac95c 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -21,7 +21,6 @@
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -34,6 +33,7 @@
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
@@ -109,11 +109,6 @@ public class BlockOutputStream extends OutputStream {
   // which got written between successive putBlock calls.
   private List<ChunkBuffer> bufferList;
 
-  // This object will maintain the commitIndexes and byteBufferList in order
-  // Also, corresponding to the logIndex, the corresponding list of buffers will
-  // be released from the buffer pool.
-  private final CommitWatcher commitWatcher;
-
   private final List<DatanodeDetails> failedServers;
 
   private final Checksum checksum;
@@ -154,7 +149,7 @@ public BlockOutputStream(
     this.token = token;
 
     //number of buffers used before doing a flush
-    refreshCurrentBuffer(bufferPool);
+    refreshCurrentBuffer();
     flushPeriod = (int) (config.getStreamBufferFlushSize() / config
         .getStreamBufferSize());
 
@@ -165,7 +160,6 @@ public BlockOutputStream(
 
     // A single thread executor handle the responses of async requests
     responseExecutor = Executors.newSingleThreadExecutor();
-    commitWatcher = new CommitWatcher(bufferPool, xceiverClient);
     bufferList = null;
     totalDataFlushedLength = 0;
     writtenDataLength = 0;
@@ -175,8 +169,8 @@ public BlockOutputStream(
         config.getBytesPerChecksum());
   }
 
-  private void refreshCurrentBuffer(BufferPool pool) {
-    currentBuffer = pool.getCurrentBuffer();
+  void refreshCurrentBuffer() {
+    currentBuffer = bufferPool.getCurrentBuffer();
     currentBufferRemaining =
         currentBuffer != null ? currentBuffer.remaining() : 0;
   }
@@ -186,7 +180,7 @@ public BlockID getBlockID() {
   }
 
   public long getTotalAckDataLength() {
-    return commitWatcher.getTotalAckDataLength();
+    return 0;
   }
 
   public long getWrittenDataLength() {
@@ -216,11 +210,6 @@ public IOException getIoException() {
     return ioException.get();
   }
 
-  @VisibleForTesting
-  public Map<Long, List<ChunkBuffer>> getCommitIndex2flushedDataMap() {
-    return commitWatcher.getCommitIndex2flushedDataMap();
-  }
-
   @Override
   public void write(int b) throws IOException {
     checkOpen();
@@ -345,9 +334,7 @@ public void writeOnRetry(long len) throws IOException {
   private void handleFullBuffer() throws IOException {
     try {
       checkOpen();
-      if (!commitWatcher.getFutureMap().isEmpty()) {
-        waitOnFlushFutures();
-      }
+      waitOnFlushFutures();
     } catch (ExecutionException e) {
      handleExecutionException(e);
     } catch (InterruptedException ex) {
@@ -357,13 +344,20 @@ private void handleFullBuffer() throws IOException {
     watchForCommit(true);
   }
 
+  void releaseBuffersOnException() {
+  }
+
   // It may happen that once the exception is encountered , we still might
   // have successfully flushed up to a certain index. Make sure the buffers
   // only contain data which have not been sufficiently replicated
   private void adjustBuffersOnException() {
-    commitWatcher.releaseBuffersOnException();
-    refreshCurrentBuffer(bufferPool);
+    releaseBuffersOnException();
+    refreshCurrentBuffer();
+  }
+
+  XceiverClientReply sendWatchForCommit(boolean bufferFull)
+      throws IOException {
+    return null;
   }
 
   /**
@@ -371,14 +365,12 @@ private void adjustBuffersOnException() {
    * it is a no op.
    * @param bufferFull flag indicating whether bufferFull condition is hit or
    *                   its called as part flush/close
-   * @return minimum commit index replicated to all nodes
    * @throws IOException IOException in case watch gets timed out
    */
   private void watchForCommit(boolean bufferFull) throws IOException {
     checkOpen();
     try {
-      XceiverClientReply reply = bufferFull ?
-          commitWatcher.watchOnFirstIndex() : commitWatcher.watchOnLastIndex();
+      final XceiverClientReply reply = sendWatchForCommit(bufferFull);
       if (reply != null) {
         List<DatanodeDetails> dnList = reply.getDatanodes();
         if (!dnList.isEmpty()) {
@@ -393,8 +385,10 @@ private void watchForCommit(boolean bufferFull) throws IOException {
       setIoException(ioe);
       throw getIoException();
     }
-    refreshCurrentBuffer(bufferPool);
+    refreshCurrentBuffer();
+  }
+
+  void updateCommitInfo(XceiverClientReply reply, List<ChunkBuffer> buffers) {
   }
 
   /**
@@ -441,16 +435,14 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
           blockID.set(responseBlockID);
           if (LOG.isDebugEnabled()) {
             LOG.debug(
-                "Adding index " + asyncReply.getLogIndex() + " commitMap size "
-                    + commitWatcher.getCommitInfoMapSize() + " flushLength "
+                "Adding index " + asyncReply.getLogIndex() + " flushLength "
                     + flushPos + " numBuffers " + byteBufferList.size()
                     + " blockID " + blockID + " bufferPool size" + bufferPool
                     .getSize() + " currentBufferIndex " + bufferPool
                     .getCurrentBufferIndex());
           }
           // for standalone protocol, logIndex will always be 0.
-          commitWatcher
-              .updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList);
+          updateCommitInfo(asyncReply, byteBufferList);
         }
         return e;
       }, responseExecutor).exceptionally(e -> {
@@ -468,10 +460,14 @@ ContainerCommandResponseProto> executePutBlock(boolean close,
       Thread.currentThread().interrupt();
       handleInterruptedException(ex, false);
     }
-    commitWatcher.getFutureMap().put(flushPos, flushFuture);
+    putFlushFuture(flushPos, flushFuture);
     return flushFuture;
   }
 
+  void putFlushFuture(long flushPos,
+      CompletableFuture<ContainerCommandResponseProto> flushFuture) {
+  }
+
   @Override
   public void flush() throws IOException {
     if (xceiverClientFactory != null && xceiverClient != null
@@ -514,7 +510,7 @@ private void handleFlush(boolean close)
     checkOpen();
     // flush the last chunk data residing on the currentBuffer
     if (totalDataFlushedLength < writtenDataLength) {
-      refreshCurrentBuffer(bufferPool);
+      refreshCurrentBuffer();
       Preconditions.checkArgument(currentBuffer.position() > 0);
       if (currentBuffer.hasRemaining()) {
         writeChunk(currentBuffer);
@@ -561,13 +557,7 @@ public void close() throws IOException {
     }
   }
 
-  private void waitOnFlushFutures()
-      throws InterruptedException, ExecutionException {
-    CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
-        commitWatcher.getFutureMap().values().toArray(
-            new CompletableFuture[commitWatcher.getFutureMap().size()]));
-    // wait for all the transactions to complete
-    combinedFuture.get();
+  void waitOnFlushFutures() throws InterruptedException, ExecutionException {
   }
 
   private void validateResponse(
@@ -601,13 +591,17 @@ private void setIoException(Exception e) {
     }
   }
 
+  void cleanup() {
+  }
+
   public void cleanup(boolean invalidateClient) {
     if (xceiverClientFactory != null) {
       xceiverClientFactory.releaseClient(xceiverClient, invalidateClient);
     }
     xceiverClientFactory = null;
     xceiverClient = null;
-    commitWatcher.cleanup();
+    cleanup();
+
     if (bufferList != null) {
       bufferList.clear();
     }
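
Taken together, the BlockOutputStream changes above replace every direct CommitWatcher call with a package-private hook (releaseBuffersOnException, sendWatchForCommit, updateCommitInfo, putFlushFuture, waitOnFlushFutures, cleanup) that defaults to a no-op, so the base class no longer carries any Ratis commit tracking. A minimal, self-contained sketch of this template-method shape, using illustrative names rather than the real Ozone classes:

    import java.util.ArrayList;
    import java.util.List;

    // Base class: shared write path plus no-op hooks, mirroring the patch.
    class BaseStream {
      // Hook: record buffers flushed at a given log index (no-op by default).
      void updateCommitInfo(long logIndex, List<String> buffers) {
      }

      // Hook: which index to watch; null means "nothing to watch" (standalone).
      Long sendWatchForCommit(boolean bufferFull) {
        return null;
      }

      // Shared logic calls the hooks; subclasses never override this part.
      final void onFlushCompleted(long logIndex, List<String> buffers) {
        updateCommitInfo(logIndex, buffers);
      }
    }

    // Ratis-aware subclass: plugs commit tracking into the hooks.
    class RatisStream extends BaseStream {
      private final List<Long> commitIndexes = new ArrayList<>();

      @Override
      void updateCommitInfo(long logIndex, List<String> buffers) {
        commitIndexes.add(logIndex);
      }

      @Override
      Long sendWatchForCommit(boolean bufferFull) {
        // Buffer full: watch the lowest index; flush/close: watch the highest.
        return bufferFull ? commitIndexes.get(0)
            : commitIndexes.get(commitIndexes.size() - 1);
      }
    }

    public class TemplateMethodSketch {
      public static void main(String[] args) {
        BaseStream stream = new RatisStream();
        stream.onFlushCompleted(7, List.of("chunk-0"));
        System.out.println(stream.sendWatchForCommit(false)); // prints 7
      }
    }

RatisStream here corresponds to the RatisBlockOutputStream introduced below.
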
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
new file mode 100644
index 00000000000..7238f2a2a06
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.common.ChunkBuffer;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * An {@link OutputStream} used by the REST service in combination with the
+ * SCMClient to write the value of a key to a sequence
+ * of container chunks.  Writes are buffered locally and periodically written
+ * to the container as a new chunk.  In order to preserve the semantics that
+ * replacement of a pre-existing key is atomic, each instance of the stream
+ * has an internal unique identifier.  This unique identifier and a
+ * monotonically increasing chunk index form a composite key that is used as
+ * the chunk name.  After all data is written, a putKey call creates or
+ * updates the corresponding container key, and this call includes the full
+ * list of chunks that make up the key data.  The list of chunks is updated
+ * all at once.  Therefore, a concurrent reader never can see an intermediate
+ * state in which different chunks of data from different versions of the key
+ * data are interleaved.
+ * This class encapsulates all state management for buffering and writing
+ * through to the container.
+ */
+public class RatisBlockOutputStream extends BlockOutputStream {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      RatisBlockOutputStream.class);
+
+  // This object will maintain the commitIndexes and byteBufferList in order
+  // Also, corresponding to the logIndex, the corresponding list of buffers will
+  // be released from the buffer pool.
+  private final CommitWatcher commitWatcher;
+
+  /**
+   * Creates a new BlockOutputStream.
+   *
+   * @param blockID block ID
+   * @param bufferPool pool of buffers
+   */
+  public RatisBlockOutputStream(
+      BlockID blockID,
+      XceiverClientFactory xceiverClientManager,
+      Pipeline pipeline,
+      BufferPool bufferPool,
+      OzoneClientConfig config,
+      Token<? extends TokenIdentifier> token
+  ) throws IOException {
+    super(blockID, xceiverClientManager, pipeline, bufferPool, config, token);
+    this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient());
+  }
+
+  @Override
+  public long getTotalAckDataLength() {
+    return commitWatcher.getTotalAckDataLength();
+  }
+
+  @VisibleForTesting
+  public Map<Long, List<ChunkBuffer>> getCommitIndex2flushedDataMap() {
+    return commitWatcher.getCommitIndex2flushedDataMap();
+  }
+
+  @Override
+  void releaseBuffersOnException() {
+    commitWatcher.releaseBuffersOnException();
+  }
+
+  @Override
+  XceiverClientReply sendWatchForCommit(boolean bufferFull) throws IOException {
+    return bufferFull? commitWatcher.watchOnFirstIndex()
+        : commitWatcher.watchOnLastIndex();
+  }
+
+  @Override
+  void updateCommitInfo(XceiverClientReply reply, List<ChunkBuffer> buffers) {
+    commitWatcher.updateCommitInfoMap(reply.getLogIndex(), buffers);
+  }
+
+  @Override
+  void putFlushFuture(long flushPos,
+      CompletableFuture<ContainerCommandResponseProto> flushFuture) {
+    commitWatcher.getFutureMap().put(flushPos, flushFuture);
+  }
+
+  @Override
+  void waitOnFlushFutures() throws InterruptedException, ExecutionException {
+    // wait for all the transactions to complete
+    CompletableFuture.allOf(commitWatcher.getFutureMap().values().toArray(
+        new CompletableFuture[0])).get();
+  }
+
+  @Override
+  void cleanup() {
+    commitWatcher.cleanup();
+  }
+}
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
index 59145b6c5b7..635f1f7e5e6 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
@@ -104,7 +104,7 @@ private BlockOutputStream createBlockOutputStream(BufferPool bufferPool)
     config.setChecksumType(ChecksumType.NONE);
     config.setBytesPerChecksum(256 * 1024);
 
-    BlockOutputStream outputStream = new BlockOutputStream(
+    BlockOutputStream outputStream = new RatisBlockOutputStream(
         new BlockID(1L, 1L),
         xcm,
         pipeline,
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 594bbf0bd75..548587c1c76 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
+import org.apache.hadoop.hdds.scm.storage.RatisBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
@@ -96,7 +97,7 @@ long getRemaining() {
   private void checkStream() throws IOException {
     if (this.outputStream == null) {
       this.outputStream =
-          new BlockOutputStream(blockID, xceiverClientManager,
+          new RatisBlockOutputStream(blockID, xceiverClientManager,
              pipeline, bufferPool, config, token);
     }
   }
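
Beyond moving the CommitWatcher calls, waitOnFlushFutures() in the new subclass also simplifies the array allocation from new CompletableFuture[futureMap.size()] to the conventional toArray(new CompletableFuture[0]) form. A small, self-contained sketch of the same combine-then-wait idiom (illustrative values, not Ozone code):

    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutionException;

    public class AllOfDemo {
      public static void main(String[] args)
          throws InterruptedException, ExecutionException {
        Map<Long, CompletableFuture<String>> futureMap = new ConcurrentHashMap<>();
        futureMap.put(1L, CompletableFuture.completedFuture("chunk-1"));
        futureMap.put(2L, CompletableFuture.supplyAsync(() -> "chunk-2"));

        // Same shape as the patch: snapshot the values, combine, then block
        // until every pending flush future completes (failing if any fails).
        CompletableFuture.allOf(
            futureMap.values().toArray(new CompletableFuture[0])).get();
        System.out.println("all flushes acknowledged");
      }
    }

The remaining diffs below are the mechanical test updates: getCommitIndex2flushedDataMap() now lives on the subclass, so tests downcast to RatisBlockOutputStream.
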
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
index 4459445afdb..cd3e8ebd6bb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.hdds.scm.storage.RatisBlockOutputStream;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.ObjectStore;
@@ -159,7 +160,7 @@ public void testBufferCaching() throws Exception {
     OutputStream stream = keyOutputStream.getStreamEntries().get(0)
         .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     // we have just written data less than a chunk size, the data will just sit
     // in the buffer, with only one buffer being allocated in the buffer pool
@@ -263,7 +264,7 @@ public void testFlushChunk() throws Exception {
     OutputStream stream = keyOutputStream.getStreamEntries().get(0)
         .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     // we have just written data equal flush Size = 2 chunks, at this time
     // buffer pool will have 2 buffers allocated worth of chunk size
@@ -358,7 +359,7 @@ public void testMultiChunkWrite() throws Exception {
     OutputStream stream = keyOutputStream.getStreamEntries().get(0)
         .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     // we have just written data equal flush Size > 1 chunk, at this time
     // buffer pool will have 2 buffers allocated worth of chunk size
@@ -453,7 +454,7 @@ public void testMultiChunkWrite2() throws Exception {
     OutputStream stream = keyOutputStream.getStreamEntries().get(0)
         .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     // we have just written data more than flush Size(2 chunks), at this time
     // buffer pool will have 3 buffers allocated worth of chunk size
@@ -534,7 +535,7 @@ public void testFullBufferCondition() throws Exception {
     OutputStream stream = keyOutputStream.getStreamEntries().get(0)
         .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
 
@@ -639,7 +640,7 @@ public void testWriteWithExceedingMaxBufferLimit() throws Exception {
     OutputStream stream = keyOutputStream.getStreamEntries().get(0)
         .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
     // writtenDataLength as well flushedDataLength will be updated here
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java
index 59f04296589..1f6f58e8250 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.hdds.scm.storage.RatisBlockOutputStream;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.ObjectStore;
@@ -152,7 +153,7 @@ public void testBufferCaching() throws Exception {
     OutputStream stream = keyOutputStream.getStreamEntries().get(0)
         .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     // we have just written data less than a chunk size, the data will just sit
     // in the buffer, with only one buffer being allocated in the buffer pool
@@ -261,7 +262,7 @@ public void testFlushChunk() throws Exception {
     OutputStream stream = keyOutputStream.getStreamEntries().get(0)
         .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     // we have just written data equal flush Size = 2 chunks, at this time
     // buffer pool will have 2 buffers allocated worth of chunk size
@@ -356,7 +357,7 @@ public void testMultiChunkWrite() throws Exception {
     OutputStream stream = keyOutputStream.getStreamEntries().get(0)
         .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     // we have just written data equal flush Size > 1 chunk, at this time
     // buffer pool will have 2 buffers allocated worth of chunk size
@@ -451,7 +452,7 @@ public void testMultiChunkWrite2() throws Exception {
     OutputStream stream = keyOutputStream.getStreamEntries().get(0)
         .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     // we have just written data more than flush Size(2 chunks), at this time
     // buffer pool will have 3 buffers allocated worth of chunk size
@@ -532,7 +533,7 @@ public void testFullBufferCondition() throws Exception {
     OutputStream stream = keyOutputStream.getStreamEntries().get(0)
         .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
 
@@ -637,7 +638,7 @@ public void testWriteWithExceedingMaxBufferLimit() throws Exception {
     OutputStream stream = keyOutputStream.getStreamEntries().get(0)
         .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
     // writtenDataLength as well flushedDataLength will be updated here
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index db3c94b8036..d6b4419af65 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.hdds.scm.storage.RatisBlockOutputStream;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.ObjectStore;
@@ -178,7 +179,7 @@ public void testWatchForCommitWithCloseContainerException()
     OutputStream stream =
         keyOutputStream.getStreamEntries().get(0).getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     // we have just written data more than flush Size(2 chunks), at this time
     // buffer pool will have 4 buffers allocated worth of chunk size
@@ -268,7 +269,7 @@ public void testWatchForCommitDatanodeFailure() throws Exception {
     OutputStream stream =
         keyOutputStream.getStreamEntries().get(0).getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     // we have just written data more than flush Size(2 chunks), at this time
     // buffer pool will have 3 buffers allocated worth of chunk size
@@ -353,7 +354,7 @@ public void test2DatanodesFailure() throws Exception {
     OutputStream stream =
         keyOutputStream.getStreamEntries().get(0).getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     // we have just written data more than flush Size(2 chunks), at this time
     // buffer pool will have 3 buffers allocated worth of chunk size
@@ -458,7 +459,7 @@ public void testFailureWithPrimeSizedData() throws Exception {
     OutputStream stream =
         keyOutputStream.getStreamEntries().get(0).getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
     Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
@@ -522,7 +523,7 @@ public void testExceptionDuringClose() throws Exception {
     OutputStream stream =
         keyOutputStream.getStreamEntries().get(0).getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
     Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
@@ -594,7 +595,7 @@ public void testWatchForCommitWithSingleNodeRatis() throws Exception {
     OutputStream stream =
         keyOutputStream.getStreamEntries().get(0).getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     // we have just written data more than flush Size(2 chunks), at this time
     // buffer pool will have 4 buffers allocated worth of chunk size
@@ -686,7 +687,7 @@ public void testDatanodeFailureWithSingleNodeRatis() throws Exception {
     OutputStream stream =
         keyOutputStream.getStreamEntries().get(0).getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     // we have just written data more than flush Size(2 chunks), at this time
     // buffer pool will have 3 buffers allocated worth of chunk size
@@ -780,7 +781,7 @@ public void testDatanodeFailureWithPreAllocation() throws Exception {
     OutputStream stream =
         keyOutputStream.getStreamEntries().get(0).getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     // we have just written data more than flush Size(2 chunks), at this time
     // buffer pool will have 3 buffers allocated worth of chunk size
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java
index d88299b245c..382947df9c0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.hdds.scm.storage.RatisBlockOutputStream;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.ObjectStore;
@@ -179,7 +180,7 @@ public void testWatchForCommitWithCloseContainerException()
     OutputStream stream =
         keyOutputStream.getStreamEntries().get(0).getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     // we have just written data more than flush Size(2 chunks), at this time
     // buffer pool will have 4 buffers allocated worth of chunk size
@@ -269,7 +270,7 @@ public void testWatchForCommitDatanodeFailure() throws Exception {
     OutputStream stream =
         keyOutputStream.getStreamEntries().get(0).getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     // we have just written data more than flush Size(2 chunks), at this time
     // buffer pool will have 3 buffers allocated worth of chunk size
@@ -355,7 +356,7 @@ public void test2DatanodesFailure() throws Exception {
     OutputStream stream =
         keyOutputStream.getStreamEntries().get(0).getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     // we have just written data more than flush Size(2 chunks), at this time
     // buffer pool will have 3 buffers allocated worth of chunk size
@@ -460,7 +461,7 @@ public void testFailureWithPrimeSizedData() throws Exception {
     OutputStream stream =
         keyOutputStream.getStreamEntries().get(0).getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
     Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
@@ -524,7 +525,7 @@ public void testExceptionDuringClose() throws Exception {
     OutputStream stream =
         keyOutputStream.getStreamEntries().get(0).getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
     Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
@@ -596,7 +597,7 @@ public void testWatchForCommitWithSingleNodeRatis() throws Exception {
     OutputStream stream =
         keyOutputStream.getStreamEntries().get(0).getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     // we have just written data more than flush Size(2 chunks), at this time
     // buffer pool will have 4 buffers allocated worth of chunk size
@@ -688,7 +689,7 @@ public void testDatanodeFailureWithSingleNodeRatis() throws Exception {
     OutputStream stream =
         keyOutputStream.getStreamEntries().get(0).getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     // we have just written data more than flush Size(2 chunks), at this time
     // buffer pool will have 3 buffers allocated worth of chunk size
@@ -782,7 +783,7 @@ public void testDatanodeFailureWithPreAllocation() throws Exception {
     OutputStream stream =
         keyOutputStream.getStreamEntries().get(0).getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
 
     // we have just written data more than flush Size(2 chunks), at this time
     // buffer pool will have 3 buffers allocated worth of chunk size
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
index a79cca0c464..d6821c4aa65 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
@@ -46,6 +46,7 @@
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.hdds.scm.storage.RatisBlockOutputStream;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -183,7 +184,7 @@ public void testWatchForCommitWithKeyWrite() throws Exception {
     OutputStream stream = keyOutputStream.getStreamEntries().get(0)
         .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    RatisBlockOutputStream blockOutputStream = (RatisBlockOutputStream) stream;
     // we have just written data more than flush Size(2 chunks), at this time
     // buffer pool will have 3 buffers allocated worth of chunk size
     Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
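
For context, all of the overrides above delegate to CommitWatcher, which this diff does not touch. A simplified, self-contained sketch of the contract they rely on, with java.nio.ByteBuffer standing in for Ozone's ChunkBuffer (the real watchOnFirstIndex/watchOnLastIndex additionally call watchForCommit on the Ratis client and return an XceiverClientReply):

    import java.nio.ByteBuffer;
    import java.util.List;
    import java.util.concurrent.ConcurrentSkipListMap;

    // Simplified stand-in for CommitWatcher: an ordered map from Raft log
    // index to the buffers that may be released once that index is replicated.
    class CommitWatcherSketch {
      private final ConcurrentSkipListMap<Long, List<ByteBuffer>> commitIndexMap =
          new ConcurrentSkipListMap<>();

      void updateCommitInfoMap(long index, List<ByteBuffer> buffers) {
        commitIndexMap.put(index, buffers);
      }

      // Buffer-full path: watch the lowest pending index, so the oldest
      // buffers can be released as soon as possible.
      long watchOnFirstIndex() {
        return commitIndexMap.firstKey();
      }

      // Flush/close path: watch the highest pending index, so all written
      // data is known to be replicated before returning.
      long watchOnLastIndex() {
        return commitIndexMap.lastKey();
      }
    }

This ordering is why sendWatchForCommit(boolean bufferFull) picks watchOnFirstIndex() when the buffer pool is exhausted but watchOnLastIndex() on flush or close.
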