HADOOP-19348. Integrate analytics accelerator into S3A. #7334
base: trunk
Conversation
🎊 +1 overall
S3 InputStreams are created by a factory class, with the choice of factory dynamically chosen by the option fs.s3a.input.stream.type. Supported values: classic, prefetching, analytics.

S3AStore:
* Manages the creation and service lifecycle of the chosen factory, as well as forwarding stream construction requests to it.
* Provides the callbacks needed by both the factories and the input streams.
* Relays StreamCapabilities.hasCapability() to the active factory. This avoids the FS having to know what capabilities are available in the stream.
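For illustration, a minimal sketch of selecting the stream type through the Java Configuration API. The option name and values come from the description above; the bucket and object names are placeholders, not part of this PR:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StreamTypeSelectionSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // choose the analytics stream factory; "classic" and "prefetching"
    // are the other supported values
    conf.set("fs.s3a.input.stream.type", "analytics");

    // bucket and key are hypothetical, for this sketch only
    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
    try (FSDataInputStream in =
        fs.open(new Path("s3a://example-bucket/data/sample.parquet"))) {
      byte[] buffer = new byte[4096];
      int read = in.read(buffer);
      System.out.println("read " + read + " bytes");
    }
  }
}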
Ability to create custom streams (type = custom), which reads the class from "fs.s3a.input.stream.custom.factory". This is mainly for testing, especially CNFE and similar; see the unit test TestStreamFactories. ObjectInputStreams save and export their stream type to assist these tests too, as this enables assertions on the generated stream type.

Simplified the logic related to the old prefetch enabled flag: if fs.s3a.prefetch.enabled is true, the prefetch stream is returned and the stream.type option is not used at all. Simpler logic, simpler docs, fewer support calls.

Parameters supplied to ObjectInputStreamFactory.bind() are converted to a parameter object, allowing more parameters to be added later if ever required.

ObjectInputStreamFactory returns more requirements to the store/FS. For this reason, StreamThreadOptions threadRequirements() is renamed StreamFactoryRequirements factoryRequirements().

VectorIO context changes:
* Returned in factoryRequirements().
* Existing configuration-reading code moved into StreamIntegration.populateVectoredIOContext().
* Streams which don't have custom vector IO, e.g. prefetching, can return a minimum seek range of 0. This disables range merging in the default PositionedReadable implementation, so ensures that they will only get asked for data which will be read, leaving the prefetch/cache code to know exactly what is needed.

Other:
* Draft docs.
* Stream capability declares the stream type and is exported through the FS too. (TODO: test, document, add to bucket-info.)
* ConfigurationHelper.resolveEnum() supersedes Configuration.getEnum(), with case independence and a fallback which is a Supplier&lt;Enum&gt; rather than a simple value (a sketch of this behaviour follows below).

Change-Id: I2e59300af48042df8173de61d0b3d6139a0ae7fe
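A hedged sketch of the described resolveEnum() behaviour. The signature and body here are assumptions reconstructed from the two properties listed above (case-independent matching, Supplier-based fallback), not the actual Hadoop code:

import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;

public final class ConfigurationHelperSketch {
  // resolve an enum from configuration, case-independently,
  // invoking the supplier when the key is unset or unmatched
  public static <E extends Enum<E>> E resolveEnum(
      Configuration conf, String key, Class<E> enumClass, Supplier<E> defaultValue) {
    String value = conf.getTrimmed(key, "");
    if (!value.isEmpty()) {
      for (E e : enumClass.getEnumConstants()) {
        if (e.name().equalsIgnoreCase(value)) {   // case independence
          return e;
        }
      }
    }
    return defaultValue.get();   // fallback is a Supplier, not a fixed value
  }
}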
🎊 +1 overall
Force-pushed from e18d0a4 to d45beae.
🎊 +1 overall
💔 -1 overall
A few things to discuss here:
private static final String LOGICAL_IO_PREFIX = "logicalio";
@Test
public void testConnectorFrameWorkIntegration() throws IOException {
Suggested test cases (a sketch follows below):
* a small parquet file under src/test/parquet
* can we read a file of ~10s of KB?
* does it complete, or fail to complete?
* a malformed footer
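A minimal sketch of the "read ~10s of KB and check it completes" case, in the S3A contract-test style; the helper methods (methodPath, dataset, createFile) and the size are assumptions for illustration, not code from this PR:

// sketch only: writes then reads back a ~64KB file through the stream
@Test
public void testReadSmallFileCompletes() throws Throwable {
  Path path = methodPath();                   // per-test path helper
  byte[] data = dataset(64 * 1024, 'a', 26);  // ~10s of KB, as suggested above
  createFile(getFileSystem(), path, true, data);
  try (FSDataInputStream in = getFileSystem().open(path)) {
    byte[] buffer = new byte[data.length];
    in.readFully(0, buffer);                  // must complete, not hang
  }
}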
Some earlier review comments about javadoc still apply.
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
import software.amazon.s3.analyticsaccelerator.util.S3URI;
javadoc needed.
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration.populateVectoredIOContext;
public class AnalyticsStreamFactory extends AbstractObjectInputStreamFactory { |
javadoc needed.
public void testOverwriteExistingFile() throws Throwable {
  // Will remove this when Analytics Accelerator supports overwrites
  skipIfAnalyticsAcceleratorEnabled(this.createConfiguration(),
      "Analytics Accelerator does not support overwrites yet");
Analytics Accelerator is about read optimizations, right? How does this relate to overwrites?
Is it because the file will be changed? Do you mean it doesn't support RemoteFileChangedException?
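For context, a hedged sketch of what a helper like skipIfAnalyticsAcceleratorEnabled() might do; the option name comes from this PR's description, while the body and the JUnit Assume-based skip are assumptions, not the actual implementation:

import org.apache.hadoop.conf.Configuration;
import org.junit.Assume;

// sketch: skip the test when the analytics stream factory is selected
public static void skipIfAnalyticsAcceleratorEnabled(Configuration conf, String message) {
  String type = conf.getTrimmed("fs.s3a.input.stream.type", "classic");
  Assume.assumeTrue(message, !"analytics".equalsIgnoreCase(type));
}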
@@ -65,6 +66,8 @@ protected Configuration createConfiguration() {
 */
@Test
public void testNotFoundFirstRead() throws Exception {
  skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
      "Temporarily disabling to fix Exception handling on Analytics Accelerator");
This test needs to be re-enabled once the exception handling is fixed.
Description of PR
Initial integration of analytics accelerator.
How was this patch tested?
In progress
For code changes:
If applicable, have you updated the LICENSE, LICENSE-binary, NOTICE-binary files?