Skip to content

Commit

Permalink
HADOOP-18792. s3a prefetching to use split start/end options to limit…
Browse files Browse the repository at this point in the history
… prefetch range

This passes the values down but doesn't interpret them; future work

Change-Id: I523b26e5a5a43fbf6ba5d2b6e44614c7e4fc70b7

HADOOP-18184. S3A Prefetching unbuffer.

compiles against v2 sdk now

Change-Id: Ic96af7f76931c6dcc453368ad02ae87d07fa4484

HADOOP-18184. temp file creation/test validation

* use block id in filename
* log statements include fs path
* tests more resilient
* logging auditor prints GET range and length

Tests are failing with signs of
* too many GETs
* incomplete buffers. race conditions?

Change-Id: Ibdca6292df8cf0149697cecfec24035e2be473d8

HADOOP-19043. S3A: Regression: ITestS3AOpenCost fails on prefetch test runs

This is actually trickier than it seems as we will need to go deep into the
implementation of caching.

Specifically: the prefetcher knows the file length and if you open a file
shorter than that, but less than one block, the read is considered a failure
and the whole block is skipped, so read() of the nominally in-range data
returns -1.

This fix has to be considered a PoC and should be combined with the other
big PR for prefetching, #5832 as that is where changes should go.

Here is just test tuning and some differentiation of channel problems from
other EOFs.

Change-Id: Icdf7e2fb10ca77b6ca427eb207472fad277130d7

HADOOP-19043. S3A: Regression: ITestS3AOpenCost fails on prefetch test runs

* Adds EOF logic deep into the prefetching code
* Tests still failing.
* this has all conflicts with hadoop trunk resolved

Change-Id: I9b23b01d010d8a1a680ce849d26a0aebab2389e2

HADOOP-18184. fix NPEs in BlockManager unit tests by adding withPath()

Change-Id: Ie3d1c266b1231fa85c01092dd79f2dcf961fe498

HADOOP-18184. prefetching

- Cache was not thread safe and it was possible for cleanup
  to happen while the caller had just verified it was there and
  before a read lock was acquired.
  fix: synchronize check and get into one block, use synchronized elsewhere.
- try to cut back on assertions in ITestS3APrefetchingLargeFiles which
  seem too brittle to prefetch behaviour/race conditions.
- minor doc, log, assertion changes
more work on that test failure needed

Change-Id: I288540ec1fb08e1a5684cde8e94e1c7933d1e41d

HADOOP-18184. prefetching: style

Change-Id: Ifdde5ab33f24515c306a8ccc27ec784c3b6c0a76

HADOOP-18184. unbuffer: reinstate commented out asserts

these asserts fail as I don't understand the prefetch logic
well enough to make valid assertions

Change-Id: I198d10ccead99754afd17040dc4f4c9ebc919906

HADOOP-18184. javadocs

Change-Id: I61d013ba439c8f4093ad0634a67c6a20e82062ad
  • Loading branch information
steveloughran committed Nov 18, 2024
1 parent 317db31 commit 5b85014
Show file tree
Hide file tree
Showing 56 changed files with 2,322 additions and 804 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,15 @@ public interface BlockCache extends Closeable {

/**
* Gets the block having the given {@code blockNumber}.
*
* If the block is not present then the method returns
* false and {@code buffer} is unchanged.
* @param blockNumber the id of the desired block.
* @param buffer contents of the desired block are copied to this buffer.
* @return true if the block was found.
* @throws IOException if there is an error reading the given block.
* @throws IllegalArgumentException if buffer is null.
*/
void get(int blockNumber, ByteBuffer buffer) throws IOException;
boolean get(int blockNumber, ByteBuffer buffer) throws IOException;

/**
* Puts the given block in this cache.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ public BlockData(long fileSize, int blockSize) {
? 1
: 0);
this.state = new State[this.numBlocks];
markBlocksAsNotReady();
}

/**
* Mark all the blocks as not ready.
*/
public void markBlocksAsNotReady() {
for (int b = 0; b < this.numBlocks; b++) {
setState(b, State.NOT_READY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,9 @@ public void requestPrefetch(int blockNumber) {

/**
* Requests cancellation of any previously issued prefetch requests.
* @param reason why?
*/
public void cancelPrefetches() {
public void cancelPrefetches(final CancelReason reason) {
// Do nothing because we do not support prefetches.
}

Expand All @@ -142,4 +143,16 @@ public void requestCaching(BufferData data) {
@Override
public void close() {
}

/**
* Reason for cancelling prefetches.
*/
public enum CancelReason {
/** Stream has switched to random IO. */
RandomIO,
/** Stream closed completely. */
Close,
/** Stream unbuffered. */
Unbuffer
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;

/**
Expand All @@ -30,6 +31,11 @@
@InterfaceAudience.Private
public final class BlockManagerParameters {

/**
* The path of the underlying file; for logging.
*/
private Path path;

/**
* Asynchronous tasks are performed in this pool.
*/
Expand Down Expand Up @@ -224,4 +230,20 @@ public BlockManagerParameters withTrackerFactory(
return this;
}

/**
* @return The path of the underlying file.
*/
public Path getPath() {
return path;
}

/**
* Set the path.
* @param value new value
* @return the builder
*/
public BlockManagerParameters withPath(final Path value) {
path = value;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public enum State {
/**
* Buffer has been acquired but has no data.
*/
BLANK,
EMPTY,

/**
* This block is being prefetched.
Expand Down Expand Up @@ -114,7 +114,7 @@ public BufferData(int blockNumber, ByteBuffer buffer) {

this.blockNumber = blockNumber;
this.buffer = buffer;
this.state = State.BLANK;
this.state = State.EMPTY;
}

/**
Expand Down Expand Up @@ -181,7 +181,7 @@ public synchronized Future<Void> getActionFuture() {
public synchronized void setPrefetch(Future<Void> actionFuture) {
Validate.checkNotNull(actionFuture, "actionFuture");

this.updateState(State.PREFETCHING, State.BLANK);
this.updateState(State.PREFETCHING, State.EMPTY);
this.action = actionFuture;
}

Expand Down Expand Up @@ -289,7 +289,7 @@ public boolean stateEqualsOneOf(State... states) {
public String toString() {

return String.format(
"[%03d] id: %03d, %s: buf: %s, checksum: %d, future: %s",
"[%03d] id: %03d, State: %s: buffer: %s, checksum: %d, future: %s",
this.blockNumber,
System.identityHashCode(this),
this.state,
Expand All @@ -300,7 +300,7 @@ public String toString() {

private String getFutureStr(Future<Void> f) {
if (f == null) {
return "--";
return "(none)";
} else {
return this.action.isDone() ? "done" : "not done";
}
Expand Down
Loading

0 comments on commit 5b85014

Please sign in to comment.