(WIP) Making compression ratio dynamically calculated based on bytes written #347
Conversation
@vinothchandar I would like to have a quick discussion on this before you take a pass.
  public HoodieParquetConfig(HoodieAvroWriteSupport writeSupport,
-     CompressionCodecName compressionCodecName, int blockSize, int pageSize, long maxFileSize,
+     CompressionCodecName compressionCodecName, int blockSize, int pageSize, int maxFileSize,
      Configuration hadoopConf) {
type change here, probably do a rebase?
thanks, done.
  SizeAwareFSDataOutputStream os =
      new SizeAwareFSDataOutputStream(fsDataOutputStream, new Runnable() {
        @Override
        public void run() {
-         openStreams.remove(path.getName());
+         // openStreams.remove(path.getName());
Doing this makes it possible to call fs.getBytesWritten(file) even after the stream is closed, and it gives back the exact number of uncompressed bytes written. @vinothchandar
The stream being closed implies that the file (block, I think) is fully written?
Yes, and so the wrappedStream has the correct number of bytes to return..
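For readers following along: the wrapper tallies every byte handed to the underlying stream, and the close callback is where the openStreams entry would normally be evicted, so keeping the entry around is what makes the count queryable after close. Below is a minimal sketch of that counting pattern in plain java.io; it is illustrative only and not the actual SizeAwareFSDataOutputStream (which wraps Hadoop's FSDataOutputStream and is tracked per path in openStreams).

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch only (not the real SizeAwareFSDataOutputStream): count every
// byte handed to the wrapped stream and run a callback when the stream is closed,
// so the caller can decide whether to keep or evict its bookkeeping entry.
public class CountingOutputStream extends FilterOutputStream {

  private final AtomicLong bytesWritten = new AtomicLong(0L);
  private final Runnable closeCallback;

  public CountingOutputStream(OutputStream out, Runnable closeCallback) {
    super(out);
    this.closeCallback = closeCallback;
  }

  @Override
  public void write(int b) throws IOException {
    out.write(b);
    bytesWritten.incrementAndGet();
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    out.write(b, off, len);
    bytesWritten.addAndGet(len);
  }

  @Override
  public void close() throws IOException {
    super.close();
    // In the PR's version, the callback no longer removes the entry from
    // openStreams, so the byte count stays queryable after close.
    closeCallback.run();
  }

  public long getBytesWritten() {
    return bytesWritten.get();
  }
}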
  } catch (Throwable t) {
    // make this fail safe.
  }
  return HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO;
We can make the DEFAULT be calculated based on maxFileSize.
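To make the fallback concrete, here is a hedged sketch of what the dynamic calculation boils down to: use the observed uncompressed-to-compressed byte totals from earlier write stats when they exist, otherwise return the default. The class and method names are hypothetical; only DEFAULT_STREAM_COMPRESSION_RATIO and the uncompressed write-bytes stat come from this PR.

// Hypothetical helper, not existing Hudi API: derive the compression ratio from
// a prior commit's write stats, or fall back to the configured default (e.g.
// HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO) when there is no history.
public final class CompressionRatioEstimator {

  private CompressionRatioEstimator() {
  }

  public static double ratioOrDefault(long uncompressedBytes, long compressedBytes,
      double defaultRatio) {
    if (uncompressedBytes <= 0 || compressedBytes <= 0) {
      // No usable history yet (e.g. first commit on the dataset): use the default.
      return defaultRatio;
    }
    return (double) uncompressedBytes / (double) compressedBytes;
  }
}

A caller would feed in the uncompressed total tracked via the new setTotalUncompressedWriteBytes(..) stat alongside the compressed bytes actually written to the file.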
@n3nash Can we check in the underlying OutputStream (by adding a wrapper) to see how much is getting written? This will help us correctly throttle file size.
@ovj this code actually already does that; the change is just to make sure we can get the bytes before the stream is closed.
High level: we can go with two approaches
A) Just care about the compressed_size_per_record, based on the commit metadata from previous commits
B) Get the uncompressed and compressed sizes and determine the compression ratio.. (this PR)
Neither really tackles the case when there is no history/commits to get a sense of the record size.. (correct me if I am missing something)
I am actually leaning more towards doubling down on A (which is what the partitioner uses to pack data today). Is that grossly inaccurate in sizing partitions?
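For comparison, a hedged sketch of what doubling down on (A) would look like: estimate compressed bytes per record from a previous commit's totals, the same signal the partitioner uses to pack inserts today. The names below are illustrative, not an existing API, and the no-history case still needs a configured fallback either way.

// Hypothetical helper for approach (A): compressed bytes per record from the
// previous commit's metadata, plus how many more records a file can take before
// it reaches maxFileSize. Not an existing Hudi API; names are illustrative.
public final class RecordSizeEstimator {

  private RecordSizeEstimator() {
  }

  public static long avgCompressedBytesPerRecord(long totalBytesWritten, long recordsWritten,
      long fallbackBytesPerRecord) {
    if (totalBytesWritten <= 0 || recordsWritten <= 0) {
      // No commit history: neither (A) nor (B) helps here, so fall back to config.
      return fallbackBytesPerRecord;
    }
    return totalBytesWritten / recordsWritten;
  }

  public static long recordsUntilFull(long maxFileSize, long currentFileSize,
      long avgBytesPerRecord) {
    long remainingBytes = Math.max(0L, maxFileSize - currentFileSize);
    return remainingBytes / Math.max(1L, avgBytesPerRecord);
  }
}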
Also can you confirm this has been set..
// Config to control whether we control insert split sizes automatically based on average record sizes
public static final String COPY_ON_WRITE_TABLE_AUTO_SPLIT_INSERTS = "hoodie.copyonwrite.insert.auto.split";
// its off by default
public static final String DEFAULT_COPY_ON_WRITE_TABLE_AUTO_SPLIT_INSERTS = String.valueOf(false);
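If it is not set, a hedged example of turning it on, passing the key quoted above as a plain property (the exact write-config builder call is not shown in this thread, so this sticks to java.util.Properties):

import java.util.Properties;

// Hedged example: enable insert auto-splitting via the config key quoted above.
// It is off by default (DEFAULT_COPY_ON_WRITE_TABLE_AUTO_SPLIT_INSERTS = "false").
public class EnableAutoSplitInserts {

  public static Properties writeProps() {
    Properties props = new Properties();
    props.setProperty("hoodie.copyonwrite.insert.auto.split", "true");
    return props;
  }
}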
@@ -219,6 +219,9 @@ public void close() {
    }
    writeStatus.getStat().setNumWrites(recordsWritten);
    writeStatus.getStat().setNumDeletes(recordsDeleted);
+   // an estimate of the number of bytes written
+   writeStatus.getStat().setTotalUncompressedWriteBytes(recordsWritten*averageRecordSize);
I'd really like to understand how your IDE is set up :).. how come it missed formatting the space around the * in this diff, while it changed it everywhere in the others..
we really need to do #287
@@ -50,8 +54,29 @@
    HoodieParquetConfig parquetConfig =
        new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP,
            config.getParquetBlockSize(), config.getParquetPageSize(),
-           config.getParquetMaxFileSize(), hoodieTable.getHadoopConf());
+           config.getParquetMaxFileSize(), hoodieTable.getHadoopConf(), compressionRatioPerRecord(hoodieTable));
this will be opened in each executor?
@vinothchandar I see a few ways to pass the information about the number of records needed in a partition to the create handle:
I personally like 1 over 2 and am not in favor of 3. But I also want to explore the addition of the metric (this PR), which makes things simpler.
Option #4 (lmk what you think)
I'm fine with this approach too. Ideally, I was looking for something that requires less refactoring and can be a quick way to fix this compression issue, so I can spend time starting to run a dataset end to end, tune the compaction process, and move towards running this in prod; hence this PR and my suggested approaches..
Understand where you are coming from. Unfortunately, this is not straightforward. If that's your stated short-term goal, I suggest just introducing a config for the compression ratio, setting it as desired, and moving on..
Yeah, let me do that for now. Once I start to run some datasets and am able to do validations, I can spend time on this again.
Closing for now, keeping the issue open.