Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add header removal for TextIO #29202

Merged
merged 21 commits into from
Nov 8, 2023
Merged

Conversation

stankiewicz
Copy link
Contributor

@stankiewicz stankiewicz commented Oct 30, 2023

Fixes #17990

  • Adds header removal for TextIO.Read / TextIO.ReadAll (with tests)

Related to ##28502 which may not work if number of lines is large.
Related to ##28728 which is not scalable if files are large.

@github-actions github-actions bot added the java label Oct 30, 2023
@github-actions
Copy link
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@stankiewicz
Copy link
Contributor Author

Run Java_GCP_IO_Direct PreCommit

@ghost
Copy link

ghost commented Oct 31, 2023

Hi @stankiewicz, thanks so much for contributing to Beam!

Per our discussion, I would recommend the following additional changes.

  1. Overloading the public functions to avoid breaking changes to the existed apis. (I see you have already submitted a new commit for that. Thanks for doing it!)
  2. Your current code works great when the number of header lines to skip is 1. Would you please also add the logic for the case when we need to skip multiple header lines? Our Python SDK has a similar code path that you may find it useful.
    skip_header_lines=0,

    Notice that in this scenario, each reader will have to skip the header lines and make sure their start position is larger than the last position of the header. This may hurt the performance. Therefore, we will keep all 3 code paths (number of header lines to skip = 0/None, 1, >1) and choose the most efficient one based on user input.
  3. Last but not least, I suggest using the same parameter name as our Python API, i.e.skipHeaderLines.

Again, great work!

Regards,

Shunping

@stankiewicz
Copy link
Contributor Author

Run Java_GCP_IO_Direct PreCommit

CHANGES.md Outdated
@@ -76,6 +76,7 @@ should handle this. ([#25252](https://github.com/apache/beam/issues/25252)).
* `upload_graph` as one of the Experiments options for DataflowRunner is no longer required when the graph is larger than 10MB for Java SDK ([PR#28621](https://github.com/apache/beam/pull/28621).
* state amd side input cache has been enabled to a default of 100 MB. Use `--max_cache_memory_usage_mb=X` to provide cache size for the user state API and side inputs. (Python) ([#28770](https://github.com/apache/beam/issues/28770)).
* Beam YAML stable release. Beam pipelines can now be written using YAML and leverage the Beam YAML framework which includes a preliminary set of IO's and turnkey transforms. More information can be found in the YAML root folder and in the [README](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/yaml/README.md).
* `TextIO` supports skipping header.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this would go to beam 2.53.0. Also, link to the GitHub issue

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please append the the original issue link to this line, like

  • TextIO supports skipping header. (Java)(#17990).

private void skipHeader(int headerLines, boolean skipFirstLine) throws IOException {
if (headerLines == 1) {
readNextRecord();
} else if (headerLines > 1) {
Copy link

@ghost ghost Nov 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also add a test to cover the skipping of multiple header lines?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

((SeekableByteChannel) channel).position(requiredPosition);
startOfNextRecord = requiredPosition;
skipHeader(skipHeaderLines, false);
if (requiredPosition > startOfNextRecord) {
Copy link

@ghost ghost Nov 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering what will happen if multiple readers having requiredPosition <= startOfNextRecord here.
In this case, more than one readers are assigned to a start position that falls in the header lines (assuming there are multiple long header lines). Not sure if the current code is handling this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm. In this case, among those readers, only one will have an end position outside of the header lines, and will continue to read. So this should not be an issue then.

@stankiewicz
Copy link
Contributor Author

Run Java_GCP_IO_Direct PreCommit

@stankiewicz
Copy link
Contributor Author

Run Java PreCommit

@stankiewicz
Copy link
Contributor Author

@shunping-google this is ready for review. after many retries of flaky tests, all green.

Copy link

@ghost ghost left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Thanks again for contributing to Beam.

Copy link
Contributor

@Abacn Abacn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for resolving this long standing feature request!

Please resolve the merge conflict then we can get this in

(not for this PR) Also this remind me that we should better check the correctness of our wordcount precommit test, not only run them

@Abacn
Copy link
Contributor

Abacn commented Nov 8, 2023

testOrderedWithinKey (org.apache.beam.sdk.io.gcp.spanner.changestreams.it.SpannerChangeStreamOrderedWithinKeyIT) failed unrelated flaky test. merging for now

@Abacn Abacn merged commit bf47289 into apache:master Nov 8, 2023
26 of 28 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Skip header row from csv
2 participants