Skip to content

Commit

Permalink
HADOOP-19221. Test wiring up
Browse files Browse the repository at this point in the history
* failing byte buffer test good
* added byte array test
* but to test file output need a way to get content.
  will have to do something there, maybe a new constructor for
  the test case

Change-Id: I15b08471939de75a8bae5794f0020fd669b435d2
  • Loading branch information
steveloughran committed Jul 12, 2024
1 parent 327ca8b commit cd82520
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ public synchronized void close() {
byteBuffer = null;
}

/**
* Is the stream open?
* @return true if the stream has not been closed.
*/
public synchronized boolean isOpen() {
return byteBuffer != null;
}

/**
* Verify that the stream is open.
* @throws IOException if the stream is closed
Expand All @@ -75,6 +83,15 @@ private void verifyOpen() throws IOException {
}
}

/**
* Check the open state.
* @throws IllegalStateException if the stream is closed.
*/
private void checkOpenState() {
Preconditions.checkState(isOpen(),
FSExceptionMessages.STREAM_IS_CLOSED);
}

public synchronized int read() throws IOException {
if (available() > 0) {
return byteBuffer.get() & 0xFF;
Expand All @@ -99,8 +116,7 @@ public synchronized long skip(long offset) throws IOException {

@Override
public synchronized int available() {
Preconditions.checkState(byteBuffer != null,
FSExceptionMessages.STREAM_IS_CLOSED);
checkOpenState();
return byteBuffer.remaining();
}

Expand All @@ -109,6 +125,7 @@ public synchronized int available() {
* @return the buffer position
*/
public synchronized int position() {
checkOpenState();
return byteBuffer.position();
}

Expand All @@ -117,18 +134,21 @@ public synchronized int position() {
* @return true if there is data remaining in the buffer.
*/
public synchronized boolean hasRemaining() {
checkOpenState();
return byteBuffer.hasRemaining();
}

@Override
public synchronized void mark(int readlimit) {
LOG.debug("mark at {}", position());
checkOpenState();
byteBuffer.mark();
}

@Override
public synchronized void reset() throws IOException {
LOG.debug("reset");
checkOpenState();
byteBuffer.reset();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -947,9 +947,11 @@ private List<CompletedPart> waitForAllPartUploads() throws IOException {
try {
return Futures.allAsList(partETagsFutures).get();
} catch (InterruptedException ie) {
// interruptions can happen if a task is aborted.
// interruptions are raided if a task is aborted by spark.
LOG.warn("Interrupted while waiting for uploads to {} to complete", key, ie);
// abort the upload
abort();
// then regenerate a new InterruptedIOException
throw (IOException) new InterruptedIOException(ie.toString()).initCause(ie);
} catch (ExecutionException ee) {
//there is no way of recovering so abort
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ public void close() {

/**
* Note that a stream was created.
* Logs if this is a subsequent event as it implies a failure of the first attempt.
* <p>
* Logs if this is a subsequent event as it implies a failure of the first attempt.
* @return the new stream
*/
@Override
Expand All @@ -171,7 +172,7 @@ public final InputStream newStream() {
if (streamCreationCount > 1) {
LOG.info("Stream created more than once: {}", this);
}
return createNewStream();
return setCurrentStream(createNewStream());
}

/**
Expand All @@ -182,9 +183,9 @@ public final InputStream newStream() {

/**
* How many times has a stream been created?
* @return
* @return stream creation count
*/
int getStreamCreationCount() {
public int getStreamCreationCount() {
return streamCreationCount;
}

Expand Down Expand Up @@ -301,6 +302,11 @@ private static final class ByteBufferContentProvider
*/
private final ByteBuffer blockBuffer;

/**
* The position in the buffer at the time the provider was created.
*/
private final int initialPosition;

/**
* Constructor.
* @param blockBuffer buffer to read.
Expand All @@ -311,17 +317,22 @@ private static final class ByteBufferContentProvider
private ByteBufferContentProvider(final ByteBuffer blockBuffer, int size) {
super(size);
this.blockBuffer = blockBuffer;
this.initialPosition = blockBuffer.position();
}

@Override
protected ByteBufferInputStream createNewStream() {
// set the buffer up from reading from the beginning
blockBuffer.limit(initialPosition);
blockBuffer.position(0);
return new ByteBufferInputStream(getSize(), blockBuffer);
}

@Override
public String toString() {
return "ByteBufferContentProvider{" +
"blockBuffer=" + blockBuffer +
", initialPosition=" + initialPosition +
"} " + super.toString();
}
}
Expand Down Expand Up @@ -374,7 +385,7 @@ protected ByteArrayInputStream createNewStream() {
@Override
public String toString() {
return "ByteArrayContentProvider{" +
"buf=" + Arrays.toString(bytes) +
"buffer with length=" + bytes.length +
", offset=" + offset +
"} " + super.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,44 +18,80 @@

package org.apache.hadoop.fs.s3a;

import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.impl.UploadContentProviders;
import org.apache.hadoop.fs.store.ByteBufferInputStream;
import org.apache.hadoop.test.HadoopTestBase;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import static java.util.Optional.empty;
import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_DISK;
import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BYTEBUFFER;

/**
* Unit tests for {@link S3ADataBlocks}.
* Parameterized on the buffer type.
*/
public class TestDataBlocks extends Assert {
@RunWith(Parameterized.class)
public class TestDataBlocks extends HadoopTestBase {

@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
{FAST_UPLOAD_BUFFER_DISK},
{FAST_UPLOAD_BUFFER_ARRAY},
{FAST_UPLOAD_BYTEBUFFER}
});
}

@Rule
public Timeout testTimeout = new Timeout(30 * 1000);
/**
* Buffer type.
*/
private final String bufferType;

@Before
public void nameThread() {
Thread.currentThread().setName("JUnit");
public TestDataBlocks(final String bufferType) {
this.bufferType = bufferType;
}

/**
* Create a block factory.
* @return the factory
*/
private S3ADataBlocks.BlockFactory createFactory() {
switch (bufferType) {
case FAST_UPLOAD_BUFFER_DISK:
return new S3ADataBlocks.DiskBlockFactory(null);
case FAST_UPLOAD_BUFFER_ARRAY:
return new S3ADataBlocks.ArrayBlockFactory(null);
case FAST_UPLOAD_BYTEBUFFER:
return new S3ADataBlocks.ByteBufferBlockFactory(null);
default:
throw new IllegalArgumentException("Unknown buffer type: " + bufferType);
}
}

/**
* Test the {@link S3ADataBlocks.ByteBufferBlockFactory}.
* That code implements an input stream over a ByteBuffer, and has to
* return the buffer to the pool after the read complete.
*
* This test verifies the basic contract of the process.
* Test the content providers from the block factory and the streams
* they produce.
* There are extra assertions on the {@link ByteBufferInputStream}.
*/
@Test
public void testByteBufferIO() throws Throwable {
try (S3ADataBlocks.ByteBufferBlockFactory factory =
new S3ADataBlocks.ByteBufferBlockFactory(null)) {
public void testBlockFactoryIO() throws Throwable {
try (S3ADataBlocks.BlockFactory factory = createFactory()) {
int limit = 128;
S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock block
S3ADataBlocks.DataBlock block
= factory.create(1, limit, null);
assertOutstandingBuffers(factory, 1);
maybeAssertOutstandingBuffers(factory, 1);

byte[] buffer = ContractTestUtils.toAsciiByteArray("test data");
int bufferLen = buffer.length;
Expand All @@ -69,12 +105,24 @@ public void testByteBufferIO() throws Throwable {

// now start the write
S3ADataBlocks.BlockUploadData blockUploadData = block.startUpload();
final UploadContentProviders.BaseContentProvider cp =
final UploadContentProviders.BaseContentProvider<?> cp =
blockUploadData.getContentProvider();
ByteBufferInputStream stream =
(ByteBufferInputStream) cp.newStream();

assertStreamCreationCount(cp, 0);
InputStream stream = cp.newStream();
assertStreamCreationCount(cp, 1);

Optional<ByteBufferInputStream> bbStream =
stream instanceof ByteBufferInputStream
? Optional.of((ByteBufferInputStream) stream)
: empty();
assertTrue("Mark not supported in " + stream, stream.markSupported());
assertTrue("!hasRemaining() in " + stream, stream.hasRemaining());

bbStream.ifPresent(bb -> {
Assertions.assertThat(bb.hasRemaining())
.describedAs("hasRemaining() in %s", bb)
.isTrue();
});
int expected = bufferLen;
assertEquals("wrong available() in " + stream,
expected, stream.available());
Expand Down Expand Up @@ -106,35 +154,80 @@ public void testByteBufferIO() throws Throwable {
assertEquals(expected, index);
assertEquals('a', remainder[--index]);

// no more data left
assertEquals("wrong available() in " + stream,
0, stream.available());
assertTrue("hasRemaining() in " + stream, !stream.hasRemaining());
bbStream.ifPresent(bb -> {
Assertions.assertThat(bb.hasRemaining())
.describedAs("hasRemaining() in %s", bb)
.isFalse();
});

// at the end of the stream, a read fails
Assertions.assertThat(stream.read())
.describedAs("EOF in " + stream)
.isEqualTo(-1);

// go the mark point
stream.reset();
assertEquals('e', stream.read());

// when the stream is closed, the data should be returned
stream.close();
assertOutstandingBuffers(factory, 1);
// now ask the content provider for another content stream.
final InputStream stream2 = cp.newStream();

// do a read(byte[]) of everything
byte[] readBuffer = new byte[bufferLen];
Assertions.assertThat(stream2.read(readBuffer))
.describedAs("number of bytes read from stream %s", stream2)
.isEqualTo(bufferLen);
Assertions.assertThat(readBuffer)
.describedAs("data read into buffer")
.isEqualTo(buffer);

// this must close the old stream
bbStream.ifPresent(bb -> {
Assertions.assertThat(bb.isOpen())
.describedAs("stream %s is open", bb)
.isFalse();
});

assertStreamCreationCount(cp, 2);

// when the block is closed, the buffer must be returned
// to the pool.
block.close();
assertOutstandingBuffers(factory, 0);
maybeAssertOutstandingBuffers(factory, 0);
stream.close();
assertOutstandingBuffers(factory, 0);
maybeAssertOutstandingBuffers(factory, 0);
}

}

private static void assertStreamCreationCount(final UploadContentProviders.BaseContentProvider<?> cp,
final int count) {
Assertions.assertThat(cp.getStreamCreationCount())
.describedAs("stream creation count of %s", cp)
.isEqualTo(count);
}

/**
* Assert the number of buffers active for a block factory.
* Assert the number of buffers active for a block factory,
* if the factory is a ByteBufferBlockFactory.
* <p>
* If it is of any other type, no checks are made.
* @param factory factory
* @param expectedCount expected count.
*/
private static void assertOutstandingBuffers(
S3ADataBlocks.ByteBufferBlockFactory factory,
private static void maybeAssertOutstandingBuffers(
S3ADataBlocks.BlockFactory factory,
int expectedCount) {
assertEquals("outstanding buffers in " + factory,
expectedCount, factory.getOutstandingBufferCount());
if (factory instanceof S3ADataBlocks.ByteBufferBlockFactory) {
S3ADataBlocks.ByteBufferBlockFactory bufferFactory =
(S3ADataBlocks.ByteBufferBlockFactory) factory;
Assertions.assertThat(bufferFactory.getOutstandingBufferCount())
.describedAs("outstanding buffers in %s", factory)
.isEqualTo(expectedCount);
}
}

}

0 comments on commit cd82520

Please sign in to comment.