Add CSV Decoder::capacity (#3674) #3677

Merged 4 commits into apache:master on Feb 10, 2023

Conversation

@tustvold (Contributor) commented Feb 8, 2023

Which issue does this PR close?

Closes #3674

Rationale for this change

What changes are included in this PR?

Are there any user-facing changes?
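
The title describes the user-facing change: a new public capacity method on the CSV Decoder. Below is a minimal sketch of how a caller might use it; the two-column schema is hypothetical, and the builder calls follow recent arrow-csv releases, so the exact signatures are assumptions rather than the code in this PR:

    use std::sync::Arc;

    use arrow_csv::ReaderBuilder;
    use arrow_schema::{ArrowError, DataType, Field, Schema};

    fn main() -> Result<(), ArrowError> {
        // Hypothetical two-column schema, for illustration only
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, false),
            Field::new("b", DataType::Utf8, false),
        ]));

        // Build a push-based Decoder with a batch size of 2
        let mut decoder = ReaderBuilder::new(schema)
            .with_batch_size(2)
            .build_decoder();

        // Push bytes into the decoder
        decoder.decode(b"foo,bar\nbaz,qux\n")?;

        // capacity() reaching 0 means batch_size rows are buffered,
        // so a batch can be flushed without waiting for more input
        if decoder.capacity() == 0 {
            let batch = decoder.flush()?.expect("two rows buffered");
            assert_eq!(batch.num_rows(), 2);
        }
        Ok(())
    }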

@tustvold force-pushed the add-csv-has-capacity branch from 9e229f6 to 73a7644 on February 8, 2023 18:54
@tustvold marked this pull request as ready for review on February 9, 2023 17:21
@alamb (Contributor) left a comment

I think the code looks good, but I struggle to understand the test. I apologize if I am missing something obvious.

@@ -438,10 +438,10 @@ impl<R: BufRead> BufReader<R> {
         loop {
             let buf = self.reader.fill_buf()?;
             let decoded = self.decoder.decode(buf)?;
-            if decoded == 0 {
             self.reader.consume(decoded);
+            if decoded == 0 || self.decoder.capacity() == 0 {
@alamb (Contributor) commented:
this has the effect of potentially creating smaller output batches, right? Basically the reader will now yield up any batches it already has buffered.

@tustvold (Contributor, author) replied Feb 10, 2023:
Nope, it will yield only if it has fully read batch_size rows. I.e., it will yield once it has read enough data, instead of looping around and calling fill_buf again to potentially read more data that it is just going to ignore (as it has already read batch_size rows).
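
A toy mock (not the real arrow-csv types; MockDecoder is invented for illustration) makes the control flow concrete: once batch_size rows are buffered, capacity() hits zero and the loop yields instead of calling fill_buf again:

    // MockDecoder is a stand-in invented for illustration; it buffers
    // one "row" per input byte, up to batch_size rows
    struct MockDecoder {
        batch_size: usize,
        buffered: usize,
    }

    impl MockDecoder {
        /// Consume up to the remaining capacity
        fn decode(&mut self, buf: &[u8]) -> usize {
            let take = buf.len().min(self.capacity());
            self.buffered += take;
            take
        }

        /// Rows that can still be buffered before a flush is needed
        fn capacity(&self) -> usize {
            self.batch_size - self.buffered
        }
    }

    fn main() {
        let mut decoder = MockDecoder { batch_size: 3, buffered: 0 };
        let chunks: &[&[u8]] = &[b"aaa", b"bbb"]; // simulated fill_buf results
        let mut fills = 0;

        for buf in chunks {
            fills += 1;
            let decoded = decoder.decode(buf);
            // The check this PR adds: stop once the decoder is full,
            // rather than fetching more input it would ignore
            if decoded == 0 || decoder.capacity() == 0 {
                break;
            }
        }
        // The first chunk already fills batch_size, so only one fill occurs
        assert_eq!(fills, 1);
    }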

@alamb (Contributor) suggested change:

-            if decoded == 0 || self.decoder.capacity() == 0 {
+            // yield only if it has fully read the batch size number of rows,
+            // instead of looping around and calling fill_buf again to potentially
+            // read more data that it is just going to ignore (as it has already read batch_size rows)
+            if decoded == 0 || self.decoder.capacity() == 0 {

@@ -2269,4 +2274,73 @@ mod tests {
             "Csv error: Encountered invalid UTF-8 data for line 1 and field 1",
         );
     }

@alamb (Contributor) commented:
I am sorry if I am misunderstanding the rationale for this change, but I don't understand how this test mimics what is described in #3674 -- namely that data that has been read can be decoded and read out as record batches prior to sending the end of stream.

I wonder, can we write a test like:

  1. Set the batch size to 5 (bigger than the output data)
  2. Send in "foo,bar\nbaz,foo\n"
  3. Read those two records <-- as I understand it, this is what cannot be done today
  4. Feed in "a,b\nc,d" + EOS
  5. Read the final two records

I struggle to map this test to that use case -- it would be hard for me to understand the purpose of this test if I saw it without context. E.g. why is it important that there were two calls to fill(0)?
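
For reference, a hedged sketch of roughly that test, written against the push-based Decoder API rather than BufReader (the reply below explains why fill_buf cannot express this); the schema, the builder calls, and the proposed_test name are illustrative assumptions:

    use std::sync::Arc;

    use arrow_csv::ReaderBuilder;
    use arrow_schema::{ArrowError, DataType, Field, Schema};

    fn proposed_test() -> Result<(), ArrowError> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, false),
            Field::new("b", DataType::Utf8, false),
        ]));

        // 1. Batch size bigger than the output data
        let mut decoder = ReaderBuilder::new(schema)
            .with_batch_size(5)
            .build_decoder();

        // 2. Send in the first two records
        decoder.decode(b"foo,bar\nbaz,foo\n")?;

        // 3. Read those two records before end-of-stream
        let batch = decoder.flush()?.expect("two rows buffered");
        assert_eq!(batch.num_rows(), 2);

        // 4. Feed in two more records, then signal EOS with an empty slice
        decoder.decode(b"a,b\nc,d\n")?;
        decoder.decode(&[])?;

        // 5. Read the final two records
        let batch = decoder.flush()?.expect("two rows buffered");
        assert_eq!(batch.num_rows(), 2);
        Ok(())
    }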

@tustvold (Contributor, author) replied Feb 10, 2023:

> namely that data that has been read can be decoded and read out as record batches prior to sending the end of stream.

Because that isn't the issue. The problem was that it would try to fill the buffer again, even if it had already read the batch_size number of rows.

Without the change in this PR you have:

    fill_sizes: [23, 3, 3, 0, 0]

In the case of a streaming decoder, this could cause it to wait for more input when it doesn't actually need any, as it already has the requisite number of rows.
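
A sketch of the streaming pattern this unblocks, assuming a Decoder built elsewhere; next_batch and next_chunk are hypothetical names introduced for illustration:

    use arrow_array::RecordBatch;
    use arrow_csv::reader::Decoder;
    use arrow_schema::ArrowError;

    /// `next_chunk` yields the next buffered bytes from some stream,
    /// or an empty Vec at end of input
    fn next_batch(
        decoder: &mut Decoder,
        mut next_chunk: impl FnMut() -> Vec<u8>,
    ) -> Result<Option<RecordBatch>, ArrowError> {
        loop {
            // The check capacity() enables: batch_size rows are already
            // buffered, so yield now instead of awaiting more input
            if decoder.capacity() == 0 {
                break;
            }
            let chunk = next_chunk();
            if chunk.is_empty() {
                break; // end of stream
            }
            // Simplification: assumes each chunk fits in the remaining
            // capacity; a real caller must retain and re-feed any bytes
            // decode() did not consume
            decoder.decode(&chunk)?;
        }
        decoder.flush()
    }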

> I wonder, can we write a test like

This has never been supported, and realistically can't be supported, as BufRead::fill_buf will only return an empty slice on EOS; otherwise it will block. There is no "fill_buf only if data is available" API that I am aware of.

Edit: it would appear there is an experimental API - rust-lang/rust#86423

Edit 2: this might actually be impossible in general - https://stackoverflow.com/questions/5431941/why-is-while-feoffile-always-wrong

@alamb (Contributor) replied Feb 10, 2023:
Thank you for the explanation.

arrow-csv/src/lib.rs: outdated review comment, resolved

@tustvold merged commit 3e08a75 into apache:master on Feb 10, 2023
@ursabot commented Feb 10, 2023:

Benchmark runs are scheduled for baseline = 5b1821e and contender = 3e08a75. 3e08a75 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.

Conbench compare runs links:
  [Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
  [Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on test-mac-arm] test-mac-arm
  [Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
  [Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q

Labels: arrow (Changes to the arrow crate)

Successfully merging this pull request may close: Arrow-csv reader cannot produce RecordBatch even if the bytes are necessary (#3674)

3 participants