HADOOP-19354. S3A: S3AInputStream to be created by factory under S3AStore #7214
base: trunk
Conversation
test failure from me pushing the disk allocator down into the store and the test case not setting the store up
Overall I like the design and refactoring.
One thought: can we keep the prefetching changes in this PR minimal, focus only on the interface and ClassicInputStream, and create a separate PR for all the prefetching work?
@@ -993,7 +983,7 @@ private void initThreadPools(Configuration conf) {
     unboundedThreadPool.allowCoreThreadTimeOut(true);
     executorCapacity = intOption(conf,
         EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1);
-    if (prefetchEnabled) {
+    if (requirements.createFuturePool()) {
Change the name to `prefetchRequirements`.
There are more requirements than just prefetching, e.g. if vector IO support is needed then some extra threads are added to the pool passed down.
...doop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamCallbacks.java
I'm just setting this up so it is ready for the analytics stream work. Making sure that prefetch is also covered is my way to validate the factory model, and to confirm that the options need to include things like asking for a shared thread pool and a stream thread pool, with the intent that analytics will use those too. And once I do that, they all need a single base stream class. For my vector IO resilience PR: once I have this PR in, I'm going to go back to #7105 and make it something which works with all object input streams.
> read failure

The read failure stuff is essentially in my PR, so maybe we can rebase onto this, merge in and then pull up. Goal: the analytics stream gets vector IO.
    this.ioStatistics = streamStatistics.getIOStatistics();
    this.inputPolicy = context.getInputPolicy();
    streamStatistics.inputPolicySet(inputPolicy.ordinal());
    this.boundedThreadPool = parameters.getBoundedThreadPool();
I see boundedThreadPool is used in S3AInputStream but not in S3APrefetchingInputStream; can we keep boundedThreadPool local to S3AInputStream?
Each stream can declare what it wants thread-pool-wise, and we will allocate those to them. If they don't want it, they don't get it.

That bounded thread pool passed down is the semaphore pool we also use in uploads. It takes a subset of the shared pool, has its own pending queue, and blocks the caller thread when that pending queue is full.

If the analytics stream doesn't currently need it, don't ask for any.

But I do want the vector IO code to be moved out of S3AInputStream so it can work with the superclass, so all streams get it. These also want a bounded number of threads.
/**
 * A stream of data from an S3 object.
 * The blase class includes common methods, stores
Nit: spelling base
   * This must be re-invoked after replacing the S3Client during test
   * runs.
   * <p>
   * It requires the S3Store to have been instantiated.
   * @param conf configuration.
@param conf is no longer required
   * @param sharedThreads Number of shared threads to included in the bounded pool.
   * @param streamThreads How many threads per stream, ignoring vector IO requirements.
   * @param createFuturePool Flag to enable creation of a future pool around the bounded thread pool.
   */
@param vectorSupported missing
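Read together, the javadoc above implies the shape of the thread-requirements object a factory declares. A minimal sketch, reusing the StreamThreadOptions name this PR later renames; the constructor shape and accessors here are assumptions, not the actual code:

```java
/**
 * Sketch of the thread requirements a stream factory declares,
 * based on the javadoc fields quoted above. Shape is assumed.
 */
public final class StreamThreadOptions {
  private final int sharedThreads;        // threads added to the shared bounded pool
  private final int streamThreads;        // threads each stream instance may claim
  private final boolean createFuturePool; // wrap the bounded pool in a future pool?
  private final boolean vectorSupported;  // does the stream do its own vector IO?

  public StreamThreadOptions(int sharedThreads,
      int streamThreads,
      boolean createFuturePool,
      boolean vectorSupported) {
    this.sharedThreads = sharedThreads;
    this.streamThreads = streamThreads;
    this.createFuturePool = createFuturePool;
    this.vectorSupported = vectorSupported;
  }

  public int sharedThreads() { return sharedThreads; }
  public int streamThreads() { return streamThreads; }
  public boolean createFuturePool() { return createFuturePool; }
  public boolean vectorSupported() { return vectorSupported; }
}
```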
@@ -845,7 +826,7 @@ private S3AFileSystemOperations createFileSystemHandler() {
   @VisibleForTesting
   protected S3AStore createS3AStore(final ClientManager clientManager,
       final int rateLimitCapacity) {
-    return new S3AStoreBuilder()
+    final S3AStore st = new S3AStoreBuilder()
Nit: rename the variable to a meaningful name.
@rajdchak thanks for the comments, will address. I do want to pull up the vector IO support, with integration with prefetch and caching. For the prefetch/caching stream we'd ask for the requested ranges to be split up into

It'd be good to collect stats on cache hit/miss here, to assess integration of vector reads with ranges. When a list of ranges comes down, there is less need to infer the next range and prefetch, and I'm not actually sure how important caching becomes. This is why setting Parquet up to use vector IO already appears to give speedups comparable to the published analytics stream benchmarks. What I want is the best of both worlds: prefetch of row groups from stream inference, and when vector reads come in, satisfy those by returning current/active prefetches, or retrieve new ranges through ranged GET requests. #7105 is where that will go; I've halted that until this is in. And I'll only worry about that integration with prefetched/cached blocks for the analytics stream.
Thanks @steveloughran, looks good to me overall. Just need to allow for the ClientManager to be passed into the factory.
        : 0);
    // create an executor which is a subset of the
    // bounded thread pool.
    final SemaphoredDelegatingExecutor pool = new SemaphoredDelegatingExecutor(
Just a clarifying question: what is the benefit of creating a new SemaphoredDelegatingExecutor per stream vs just creating this once?
OK, I think I get it: this is basically a way to ensure a single stream instance does not use up too many threads.
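For anyone skimming: a concept-only sketch, in plain JDK terms, of what wrapping the shared pool per stream buys. SemaphoredDelegatingExecutor itself lives in hadoop-common; this is not its code, just the pattern:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;

/** Concept sketch: cap a single client's share of a shared pool. */
final class BoundedSubmitter {
  private final ExecutorService shared;  // the process-wide bounded pool
  private final Semaphore permits;       // this stream's share of it

  BoundedSubmitter(ExecutorService shared, int maxConcurrent) {
    this.shared = shared;
    this.permits = new Semaphore(maxConcurrent);
  }

  /** Blocks the caller when this stream's permits are exhausted. */
  void submit(Runnable task) throws InterruptedException {
    permits.acquire();
    try {
      shared.execute(() -> {
        try {
          task.run();
        } finally {
          permits.release();  // free the permit when the task finishes
        }
      });
    } catch (RuntimeException e) {
      permits.release();  // execution rejected: return the permit
      throw e;
    }
  }
}
```

One wrapper per stream means a misbehaving stream blocks on its own permits rather than starving every other user of the shared pool.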
  public static ObjectInputStreamFactory createStreamFactory(final Configuration conf) {
    // choose the default input stream type
    InputStreamType defaultStream = InputStreamType.DEFAULT_STREAM_TYPE;
    if (conf.getBoolean(PREFETCH_ENABLED_KEY, false)) {
We're saying PREFETCH_ENABLED_KEY is deprecated, but still setting the stream type to prefetch. Is this something we want? If yes, we should make the message clearer to say "we're going to deprecate this in the future, but it works for now".
I'm trying to say "if you set it, we will tell you not to, but still take the setting as the default", so it can be overridden by the new option.
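A sketch of those semantics, completing the method quoted in the diff above. Everything beyond the quoted lines is an assumption: the INPUT_STREAM_TYPE constant, the Prefetch enum value, the factory() accessor, and the LOG field of the surrounding class.

```java
  public static ObjectInputStreamFactory createStreamFactory(final Configuration conf) {
    // choose the default input stream type
    InputStreamType defaultStream = InputStreamType.DEFAULT_STREAM_TYPE;
    if (conf.getBoolean(PREFETCH_ENABLED_KEY, false)) {
      // deprecated option: warn, but take it as the default,
      // which the new option may still override.
      LOG.warn("{} is deprecated; set {} instead",
          PREFETCH_ENABLED_KEY, INPUT_STREAM_TYPE);
      defaultStream = InputStreamType.Prefetch;
    }
    // an explicit setting of the new option wins
    final InputStreamType streamType =
        conf.getEnum(INPUT_STREAM_TYPE, defaultStream);
    // each enum value carries the function which builds its factory
    return streamType.factory().apply(conf);
  }
```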
 * Each enum value contains the factory function actually used to create
 * the factory.
 */
public enum InputStreamType {
As discussed in #7295, the S3SeekableInputStreamFactory requires a client to be passed in. For this, we need a way to pass in the ClientManager here.
Yeah, will do that. After Service.init() we will pass down a reference to the client manager, though that won't be ready to use until Service.start().

Also, the client manager should declare whether CRT is used or not, even before the client is instantiated (avoids a launch-time performance hit). Then the analytics stream can just fail fast in start() based on that flag alone.
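A sketch of that fail-fast check under the Service lifecycle; the isCrtClient() flag on the client manager is hypothetical, as is the assumption that the analytics stream rejects the CRT client:

```java
  @Override
  protected void serviceStart() throws Exception {
    super.serviceStart();
    // the flag is declared before any client is instantiated,
    // so we can fail fast with no launch-time cost.
    if (clientManager.isCrtClient()) {  // hypothetical flag
      throw new UnsupportedOperationException(
          "this stream factory does not support the CRT client");
    }
  }
```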
/* sorry, had commented back on others but hadn't pressed the submit button. doing it now */
(Just had to rebase as it wouldn't merge with the directory marker changes; that is going to make backporting to branch-3.4 harder. FWIW I'm wondering if we should make the leap to a 3.5.0 release with Java 17 as the baseline and keep 3.4.x as the maintenance branch with CVE and jar updates only. Not discussed that on the mailing lists yet though...)
Thanks @steveloughran, this looks good now. We've just done an initial rebase on this here and we're able to integrate successfully. I will merge this into the feature branch, and then follow up with our changes.
@ahmarsuhail will look at it. Just a rebase and review of this; the last failure seems VM rather than code.
@steveloughran do you want to merge this PR into trunk? Or do you want this to go in via our feature branch?

So either this PR goes into trunk directly, or it can go in as part of the feature branch.
Original contributor: @steveloughran in PR #7214
Do you think we should fall back if a stream factory fails to load? As if they depend on third-party libraries, those libs may not be deployed across the cluster. Good: something works. We can/should add an iostats gauge to indicate which stream is in use, and serve it up in FS and stream.
@steveloughran personally I think we should throw the failure and not have the fallback. Users of both the prefetching input stream and AAL will expect performance benefits from using them, and if the failures are not visible, it'll lead to people thinking those streams aren't any faster.
@ahmarsuhail +1. Now, an unrelated issue: it looks to me like the jersey update's associated JUnit stuff has stopped tests being discovered in hadoop-aws. I'm rebasing this PR onto the commit before that one just so I can make progress. Can you check out and build trunk and tell me if your run of the hadoop-aws unit tests runs any tests, or is it my setup (across both git clones I have of the repo)?
@steveloughran I just hit the same issue on my CRT PR, unable to run tests :(
S3 InputStreams are created by a factory class, with the choice of factory dynamically chosen by the option fs.s3a.input.stream.type. Supported values: classic, prefetching, analytics.

S3AStore:
* Manages the creation and service lifecycle of the chosen factory, as well as forwarding stream construction requests to the chosen factory.
* Provides the callbacks needed by both the factories and input streams.
* StreamCapabilities.hasCapability(), which is relayed to the active factory. This avoids the FS having to know what capabilities are available in the stream.
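For example, selecting the prefetching stream with the new option (option name and values as listed in the commit message above):

```java
import org.apache.hadoop.conf.Configuration;

public class SelectStreamType {
  public static Configuration prefetchingConf() {
    Configuration conf = new Configuration();
    // choose the stream factory; supported values per this PR:
    // classic, prefetching, analytics
    conf.set("fs.s3a.input.stream.type", "prefetching");
    return conf;
  }
}
```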
Ability to create custom streams (type = custom), which reads the class from "fs.s3a.input.stream.custom.factory". This is mainly for testing, especially CNFE and similar; unit test TestStreamFactories covers it. ObjectInputStreams save and export their stream type to assist these tests too, as it enables assertions on the generated stream type.

Simplified the logic related to the old prefetch enabled flag: if fs.s3a.prefetch.enabled is true, the prefetch stream is returned and the stream.type option is not used at all. Simpler logic, simpler docs, fewer support calls.

Parameters supplied to ObjectInputStreamFactory.bind converted to a parameter object. Allows for more parameters to be added later if ever required.

ObjectInputStreamFactory returns more requirements to the store/FS. For this reason StreamThreadOptions threadRequirements() is renamed StreamFactoryRequirements factoryRequirements().

Vector IO context changes:
* Returned in factoryRequirements().
* Existing configuration-reading code moved into StreamIntegration.populateVectoredIOContext().
* Streams which don't have custom vector IO, e.g. prefetching, can return a minimum seek range of 0. This disables range merging on the default PositionedReadable implementation, which ensures that they will only get asked for data which will be read...leaving prefetch/cache code to know exactly what is needed.

Other:
* Draft docs.
* Stream capability declares the stream type and is exported through the FS too. (todo: test, document, add to bucket-info)
* ConfigurationHelper.resolveEnum() supersedes Configuration.getEnum() with:
  - case independence
  - fallback is a Supplier&lt;Enum&gt; rather than a simple value (see the sketch below).

Change-Id: I2e59300af48042df8173de61d0b3d6139a0ae7fe
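A sketch of the resolveEnum() behaviour just described; the exact signature is an assumption, but the case-independent match and the Supplier fallback are as stated:

```java
import java.util.function.Supplier;

import org.apache.hadoop.conf.Configuration;

public final class ConfigurationHelperSketch {
  /** Sketch: case-independent enum lookup with a lazy default. */
  public static <E extends Enum<E>> E resolveEnum(
      Configuration conf,
      String key,
      Class<E> enumClass,
      Supplier<E> defaultValue) {
    final String value = conf.getTrimmed(key, "");
    if (value.isEmpty()) {
      // unset: evaluate the fallback lazily, e.g. a default
      // derived from a deprecated option.
      return defaultValue.get();
    }
    for (E option : enumClass.getEnumConstants()) {
      if (option.name().equalsIgnoreCase(value)) {  // case independence
        return option;
      }
    }
    throw new IllegalArgumentException(
        "Invalid value of " + key + ": " + value);
  }
}
```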
Not fully tested yet. I want to have the stream type passed down as a -D option.
HADOOP-19354

How was this patch tested?

S3 London.

For code changes:

Have you updated the LICENSE, LICENSE-binary, NOTICE-binary files?