
Avoid reading the entire file in ChunkedStore #4525
Merged — 1 commit, Dec 7, 2022
Conversation

metesynnada (Contributor)

Which issue does this PR close?

Closes #4524.

Rationale for this change

We would like to keep memory usage low while processing queries that involve only pipeline-able operators. Obviously, the very first step of this is to read files and byte streams in a suitable manner. However, the current ChunkedStore implementation materializes the whole file in memory before creating the Result<Bytes> stream. This PR fixes the implementation so that the file is actually read in chunks, not just output in chunks.
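To make the intended behavior concrete, here is a minimal, hypothetical sketch of streaming a file in fixed-size chunks with tokio and futures. It illustrates the technique, not the actual PR diff; the helper name and signature are invented:

use bytes::Bytes;
use futures::stream::{self, BoxStream, StreamExt};
use tokio::{fs::File, io::AsyncReadExt};

/// Hypothetical helper: yield a file as a stream of `chunk_size`-byte chunks
/// without ever buffering the whole file in memory.
async fn chunked_stream(
    path: &str,
    chunk_size: usize,
) -> std::io::Result<BoxStream<'static, std::io::Result<Bytes>>> {
    let file = File::open(path).await?;
    let stream = stream::try_unfold(file, move |mut file| async move {
        let mut buf = vec![0u8; chunk_size];
        let n = file.read(&mut buf).await?;
        if n == 0 {
            Ok(None) // EOF: end the stream
        } else {
            buf.truncate(n);
            Ok(Some((Bytes::from(buf), file))) // emit one chunk, keep reading
        }
    });
    Ok(stream.boxed())
}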

What changes are included in this PR?

We changed the get method of ChunkedStore so that it actually reads the file in chunks, instead of reading the whole file and then splitting it into chunks.

Are these changes tested?

Chunked byte stream conversion is tested for:

  • CSV
  • JSON
  • Avro
  • raw byte arrays

Are there any user-facing changes?

Now, a user can use ChunkedStore for incremental reads of various file types. The user can even compose ChunkedStore with the AmazonS3 store for increased reading performance; a usage sketch follows.
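As a usage illustration only — the import path and constructor signature for ChunkedStore below are assumptions (check them against your build), and the bucket name is made up:

use std::sync::Arc;
use object_store::{aws::AmazonS3Builder, ObjectStore};
// Assumed location; ChunkedStore may live elsewhere in your dependency tree.
use object_store::chunked::ChunkedStore;

fn build_store() -> object_store::Result<Arc<dyn ObjectStore>> {
    // Hypothetical bucket; credentials come from the environment.
    let s3 = AmazonS3Builder::from_env()
        .with_bucket_name("my-bucket")
        .build()?;
    // Assumed constructor: wrap the inner store so reads come back in
    // fixed-size (here 8 KiB) chunks instead of one large buffer.
    Ok(Arc::new(ChunkedStore::new(Arc::new(s3), 8 * 1024)))
}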

Discussion for future work

(1) ChunkedStore's current use case seems to be mostly subsumed by arrow_json and arrow_csv. They can also read and output files in chunks if we supply false to with_collect_statistics (and 1 to target_partitions in certain cases, like reading FIFO files); see the sketch below.

Therefore, ChunkedStore may not be required anymore. If we are not missing something and this is indeed the case, we can discuss deprecating it in the future.
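For reference, a hedged sketch of the configuration mentioned above, assuming current SessionConfig method names in DataFusion:

use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Sketch: disable statistics collection (which scans files up front) and
    // use a single target partition, as suggested for cases like FIFO files.
    let config = SessionConfig::new()
        .with_collect_statistics(false)
        .with_target_partitions(1);
    let _ctx = SessionContext::with_config(config);
}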

(2) For byte streams, one has to load the entire dataset into memory unless one defines a schema; i.e., infer_schema operates on the entire dataset.

We probably want to fix this too. One workaround available today is sketched below.
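A minimal sketch, assuming DataFusion's CsvReadOptions API; the file name and columns are hypothetical:

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Known schema up front: no full-file scan for inference.
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
    ]);
    let ctx = SessionContext::new();
    let df = ctx
        .read_csv("data.csv", CsvReadOptions::new().schema(&schema))
        .await?;
    df.show().await?;
    Ok(())
}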

@github-actions bot added the core (Core DataFusion crate) label — Dec 6, 2022
@alamb requested a review from @tustvold — December 6, 2022 14:14
@tustvold left a comment (Contributor):

ChunkedStore is purely intended for use within tests, to allow control over the chunks in the produced output streams, which in turn allows verifying the delimiting logic in newline_delimited_stream.

I think perhaps this could be made clearer. Taking a step back, the chunk size used by CSV and JSON is dictated by the object stores themselves, in particular the chunk size used by the HTTP transport.

Edit: looking at the issue, I'm not sure I follow the problem. I believe if you just use the normal ObjectStore implementations, it should stream correctly? Are you manually adding ChunkedStore into the path?
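For readers unfamiliar with that delimiting logic, here is a minimal, self-contained illustration (not the actual newline_delimited_stream implementation) of reassembling newline-delimited records from arbitrarily split chunks:

/// Return the buffered bytes up to and including the last newline, leaving
/// any trailing partial line in `buf` for the next chunk to complete.
fn take_complete_lines(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
    let pos = buf.iter().rposition(|&b| b == b'\n')?;
    Some(buf.drain(..=pos).collect())
}

fn main() {
    let mut buf = Vec::new();
    // Chunks split mid-record, as a ChunkedStore would deliberately produce.
    for chunk in [&b"hello\nwo"[..], &b"rld\nbingo\n"[..], &b"cupcakes"[..]] {
        buf.extend_from_slice(chunk);
        if let Some(lines) = take_complete_lines(&mut buf) {
            print!("{}", String::from_utf8_lossy(&lines));
        }
    }
    // "cupcakes" has no trailing newline; it stays buffered until end of stream.
    assert_eq!(&buf[..], &b"cupcakes"[..]);
}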

} else {
Url::from_file_path(path)
}
.map_err(|_| DataFusionError::Internal(format!("Can not open path: {}", s)))?;
Reviewer comment (Contributor):
Why is this change necessary? canonicalize above will fail if the path doesn't exist.

@ozankabak (Contributor) — Dec 6, 2022:

This is a generalization of the old checking logic. When you check with is_file(), you cannot read certain file types such as FIFO files. To support all kinds of readable files, one typically checks with !is_dir(). You can get more context in this clippy discussion.

So this change simply applies the modification above and turns the boolean match expression into an if expression, which is slightly more idiomatic.

Are you asking about map_err vs unwrap? We weren't sure whether an error is impossible here, so we added the map_err just in case (a sketch of the full pattern appears after this comment). I typically prefer doing this for the purposes of defensive programming:

  • If what is now impossible becomes possible in the future because the code calling canonicalize changes for any reason, we would still be producing a sensible error instead of panicking.
  • It also guards against cases where the path is valid at the time you call canonicalize, but then gets deleted in between two function calls. This is not a realistic case, but it is possible in theory.

However, if you believe it reduces readability, we can go back to unwrap.
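A hedged reconstruction of the pattern under discussion — the error type is simplified to String here, where the real code uses DataFusionError:

use std::path::Path;
use url::Url;

/// Sketch: canonicalize, then build a URL, accepting any readable
/// non-directory (so FIFOs pass) and mapping URL-construction failure into
/// an error instead of unwrapping.
fn path_to_url(s: &str) -> Result<Url, String> {
    let path = Path::new(s)
        .canonicalize()
        .map_err(|e| format!("Cannot canonicalize path {s}: {e}"))?;
    if path.is_dir() {
        Url::from_directory_path(&path)
    } else {
        Url::from_file_path(&path)
    }
    .map_err(|_| format!("Can not open path: {s}"))
}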

Reviewer reply (Contributor):

To support all kinds of readable files, one typically checks with !is_dir(). You can get more context here at this clippy discussion.

Makes sense, thank you

However, if you believe it reduces readability, we can go back to unwrap.

I think it does, but I don't feel especially strongly 😅. There are long-term plans to revisit the error handling to use something like anyhow, which will clean this all up.

@ozankabak commented Dec 6, 2022 (Contributor):

@tustvold, maybe I can provide a little more context: We tried using ChunkedStore while testing various approaches to streaming execution (i.e. using non-pipeline-breaking operators all the way).

One of the techniques we use while doing this is to monitor memory usage when processing big files (or "infinite" files like FIFOs). In this context, we discovered that ChunkedStore read all the bytes into memory and then split them into chunks. This behavior has two consequences:

  • One cannot use it to test things involving FIFO files, or any other unbounded file -- this is how we discovered the issue in the first place.
  • If one were to use it to ingest a large file, it would not behave reasonably -- this is more of a theoretical issue, since its intended area of use is probably elsewhere.

All in all, this small change does not alter its general behavior, but it makes ChunkedStore usable in the above scenarios too. Since it makes no API changes and does not change chunk contents, it doesn't result in any behavioral changes for existing use cases.

Thanks for the review!

@tustvold commented Dec 6, 2022 (Contributor):

Thank you for taking the time to respond; a couple of follow-up questions to help my understanding. I was actually about to file a PR to gate ChunkedStore behind cfg(test), so I would like to better understand your use case, as I'm clearly missing something.

In this context, we discovered that ChunkedStore read all the bytes into memory and then split it into chunks.

What was the motivation for using ChunkedStore over just using the standard LocalFileSystem? This would return GetResult::File, which would then synchronously read data from the file in batches automatically.
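For context, a minimal sketch of that streaming behavior with the object_store crate — the file path is hypothetical:

use futures::StreamExt;
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};

#[tokio::main]
async fn main() -> object_store::Result<()> {
    let store = LocalFileSystem::new_with_prefix(".")?;
    // `get` returns GetResult::File for local stores; `into_stream` then
    // yields the contents in batches rather than one large buffer.
    let mut stream = store.get(&Path::from("data.csv")).await?.into_stream();
    while let Some(chunk) = stream.next().await {
        let bytes = chunk?; // one Bytes batch at a time
        println!("read {} bytes", bytes.len());
    }
    Ok(())
}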

If one were to use it to ingest a large file, it would not behave reasonably

Agreed, ChunkedStore will only ever make the experience of the wrapped ObjectStore worse. I wrote it to make stores controllably worse in order to test edge cases in the file format readers; it is actively worse to use ChunkedStore than not to. Or at least that is what I had thought, and why I am now asking these questions 😅

@ozankabak (Contributor):

What was the motivation for using ChunkedStore over just using the standard LocalFileSystem? This would return GetResult::File, which would then synchronously read data from the file in batches automatically.

Yes, I agree that this makes more sense in a real application. We are currently writing POCs/unit tests for candidate streaming execution approaches that operate on things like FIFOs, and we are using ChunkedStore because we saw it being used in tests and, as you mention, it is easy to control when investigating things.

When we first started, the context and intended usage of ChunkedStore were not entirely clear to us, and @metesynnada's first remark was partly an attempt to get clarification on that. Thanks for making it clear!

@tustvold left a comment (Contributor):

Thank you for this, cool that this is getting use in downstream testing 👍

vec![
Bytes::from("hello\nworld\n"),
Bytes::from("bingo\n"),
Bytes::from("cupcakes")
👍

Some(Ok(bytes)) => delimiter.push(bytes),
Some(Err(e)) => return Some((Err(e), (s, delimiter, exhausted))),
None => {
exhausted = true;
I see this is to handle the case of a missing trailing newline, for which there is a test added below 👍

@alamb commented Dec 7, 2022 (Contributor):

Thanks everyone for the reviews and discussion

@alamb merged commit 61a6c89 into apache:master — Dec 7, 2022
@ursabot commented Dec 7, 2022:

Benchmark runs are scheduled for baseline = cedb05a and contender = 61a6c89. 61a6c89 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
(All Conbench benchmark runs were skipped or unsupported on the available runners for this commit.)

@alamb commented Dec 7, 2022 (Contributor):

Given the possible confusion about ChunkedStore, I proposed some additional clarification in the docstrings in #4541.
