Allow one to bound the size of output shards when writing to files. #22130
Conversation
Codecov Report
@@ Coverage Diff @@
## master #22130 +/- ##
==========================================
+ Coverage 73.99% 74.71% +0.71%
==========================================
Files 703 703
Lines 92936 96503 +3567
==========================================
+ Hits 68769 72102 +3333
- Misses 22901 23135 +234
Partials 1266 1266
R: @Abacn

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
Thanks for implementing this. Since the request is about the file sink, I am thinking of keeping the scope of the change limited to FileBasedSink, and thus not touching iobase. Please see the following comments and let me know if they make sense.
@@ -68,6 +68,9 @@ def __init__(
      shard_name_template=None,
      mime_type='application/octet-stream',
      compression_type=CompressionTypes.AUTO,
      *,
Do we have a code style guide about the use of the bare asterisk in function parameters?
I don't think we have any guidance here (other than that which is generic to Python, which would indicate most of these arguments should be passed by keyword).
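To illustrate the bare-asterisk syntax being discussed: parameters declared after a lone `*` can only be passed by keyword. A minimal sketch (the function and parameter names here are hypothetical, not Beam's actual API):

```python
# Parameters after the lone `*` are keyword-only; passing them
# positionally raises a TypeError. Names are illustrative only.
def write_to_files(
    file_path_prefix,
    file_name_suffix='',
    *,  # everything below must be passed by keyword
    max_records_per_shard=None,
    max_bytes_per_shard=None):
  return (file_path_prefix, max_records_per_shard, max_bytes_per_shard)


# Keyword call works; a positional call past the `*` would raise TypeError.
print(write_to_files('out', max_records_per_shard=100))  # ('out', 100, None)
```

This makes adding new options backwards compatible, since callers cannot depend on their position.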
@@ -108,6 +111,8 @@ def __init__(
        shard_name_template)
    self.compression_type = compression_type
    self.mime_type = mime_type
    self.max_records_per_shard = max_records_per_shard
    self.max_bytes_per_shard = max_bytes_per_shard
From the implementation of write below, only one of them will take effect. Should we raise a warning (or info-level log) about possible misuse when neither is None? This also needs to be documented.
Nice catch. Fixed so that both take effect.
def close(self):
  self.sink.close(self.temp_handle)
  return self.temp_shard_path


class _ByteCountingWriter:
Can bytes_written also be tracked in the write function, like num_records_written, so the wrapper class isn't needed? FileBasedSink.open used to always return a BufferedWriter instance, but with this wrapper class it may not.
I see — if the writer is compressed, a record sent to FileBasedWriter may have a different length than what is actually written, and that's why a wrapper class is needed here.
Found that io.BufferedWriter.write returns the number of bytes written (https://docs.python.org/3.7/library/io.html#io.BufferedWriter.write), so bytes_written could be tracked directly in FileBasedSinkWriter.write.
Unfortunately io.BufferedWriter.write returns the number of bytes written for that call, not a running total.
sdks/python/apache_beam/io/iobase.py
Outdated
@@ -848,8 +848,12 @@ class Writer(object):
  See ``iobase.Sink`` for more detailed documentation about the process of
  writing to a sink.
  """
  def write(self, value):
    """Writes a value to the sink using the current writer."""
  def write(self, value) -> Optional[bool]:
Better not to change the signature of the base class.
It's backwards compatible, which is why I made it Optional. But I've moved to using at_capacity as suggested instead.
def write(self, value):
  self.sink.write_record(self.temp_handle, value)
  if self.sink.max_records_per_shard:
If write still does not return anything, we could create another method like at_capacity that returns true if the writer has reached capacity. That way, max_bytes_per_shard and max_records_per_shard can both take effect at the same time.
Done.
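A sketch of the at_capacity approach agreed on above, showing how both limits can take effect at once. The class and attribute names follow the diff excerpts, but the implementation is illustrative, not Beam's actual code:

```python
class ShardWriter:
  """Sketch of the at_capacity idea (illustrative, not Beam's code)."""
  def __init__(self, max_records_per_shard=None, max_bytes_per_shard=None):
    self.max_records_per_shard = max_records_per_shard
    self.max_bytes_per_shard = max_bytes_per_shard
    self.num_records_written = 0
    self.bytes_written = 0

  def write(self, record: bytes):
    self.num_records_written += 1
    self.bytes_written += len(record)

  def at_capacity(self):
    # Both limits are checked, so either one (or both) can trigger
    # closing the current shard and opening a new one.
    return bool(
        (self.max_records_per_shard and
         self.num_records_written >= self.max_records_per_shard) or
        (self.max_bytes_per_shard and
         self.bytes_written >= self.max_bytes_per_shard))


w = ShardWriter(max_records_per_shard=2, max_bytes_per_shard=100)
w.write(b'a')
print(w.at_capacity())  # False
w.write(b'b')
print(w.at_capacity())  # True
```

Keeping the capacity test in a separate method also leaves the base class's write signature unchanged, addressing the earlier concern.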
sdks/python/apache_beam/io/iobase.py
Outdated
@@ -1184,7 +1188,9 @@ def process(self, element, init_result):
    if self.writer is None:
      # We ignore UUID collisions here since they are extremely rare.
      self.writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
    self.writer.write(element)
    if self.writer.write(element):
Always call self.writer.write and then test self.writer.at_capacity().
Done.
Thanks, I've addressed your comments. We do have to push the change down to iobase, as that's where the writers are created (and destroyed), but I think the change there should be minimal and natural.
Thanks! LGTM
Unrelated failure in apache_beam.runners.portability.fn_api_runner.fn_runner_test.FnApiRunnerTestWithGrpcAndMultiWorkers.test_pardo_large_input
Run Python PreCommit

Run Python PreCommit
Hi! Is this available for
This has not been plumbed through for
This fixes #22129 by possibly writing multiple shards per bundle.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- `R: @username` (add a reviewer).
- Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
- Update `CHANGES.md` with noteworthy changes.

See the Contributor Guide for more tips on how to make the review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.