Parallel NDSON file reading #8502

Closed
alamb opened this issue Dec 11, 2023 · 18 comments · Fixed by #8659
Labels
enhancement New feature or request good first issue Good for newcomers

Comments

@alamb
Contributor

alamb commented Dec 11, 2023

Is your feature request related to a problem or challenge?

DataFusion can now automatically read CSV and parquet files in parallel (see #6325 for CSV)

It would be great to do the same for "NDJSON" files -- namely files that have multiple JSON objects placed one after the other.

Describe the solution you'd like

Basically implement what is described in #6325 for JSON -- and read a single large NDJSON (newline-delimited JSON) file in parallel

Describe alternatives you've considered

Some research may be required -- I am not sure if finding record boundaries is feasible

Additional context

I found this while writing tests for #8451

@alamb alamb added the enhancement New feature or request label Dec 11, 2023
@alamb alamb changed the title Parallel NDSON reading Parallel NDSON file reading Dec 11, 2023
@tustvold
Contributor

This should be simpler than CSV, as NDJSON does not typically permit unescaped newline characters, so it should just be a case of finding the next newline
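
To illustrate why a raw newline is a safe split point: JSON requires control characters inside strings to be escaped, so a newline inside a value appears as the two characters `\n`, never as the byte 0x0A. The following is a minimal sketch, assuming well-formed NDJSON held in memory; `count_records` is a hypothetical helper for illustration, not a DataFusion function.

```rust
/// Count NDJSON records by splitting on raw newline bytes (0x0A).
/// Assumption: the input is well-formed NDJSON, so a raw newline can only
/// appear between records, never inside a JSON string.
fn count_records(data: &[u8]) -> usize {
    data.split(|&b| b == b'\n')
        // Skip blank lines, including the empty slice after a trailing newline.
        .filter(|line| !line.iter().all(|b| b.is_ascii_whitespace()))
        .count()
}
```

For an input such as `{"msg":"line1\nline2"}` followed by `{"msg":"ok"}`, each on its own line, the escaped `\n` inside the first string does not split the record, so the helper reports two records.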

@alamb alamb added the good first issue Good for newcomers label Dec 12, 2023
@alamb
Contributor Author

alamb commented Dec 12, 2023

I think this is a medium difficulty task for a new contributor as the pattern exists and there are tests (e.g. see #8505)

@JacobOgle
Contributor

@alamb I wouldn't mind digging into this one if it's still open

@alamb
Contributor Author

alamb commented Dec 13, 2023

> @alamb I wouldn't mind digging into this one if it's still open

I just filed it and I don't know of anyone else working on it.

Thanks @JacobOgle

@kassoudkt

Is it still available? I would love to take it :)

@JacobOgle
Contributor

@kassoudkt feel free! I've been a bit tied up lately so if you're free go for it!

@marvinlanhenke
Contributor

> This should be simpler than CSV, as NDJSON does not typically permit unescaped newline characters, so it should just be a case of finding the next newline

@tustvold @alamb
...out of curiosity, I was digging into this as well. From my understanding (looking at the CSV impl), the FileGroupPartitioner and its method repartition_file_groups are used to create the partitions. However, in this case the files are divided evenly by size.

In order for NDJSON to be split "correctly" (and not in the middle of a JSON Object) the FileGroupPartitioner needs a new method to split on newline? Would this be a reasonable approach?
Then only fn repartitioned of trait ExecutionPlan and fn open of trait FileOpener need to be implemented.

Thanks for helping out.

@tustvold
Contributor

tustvold commented Dec 23, 2023

The way it typically works is that the split is based on file size, but the reader is set up such that one of the bounds includes the current partial row and the other does not. For example, the reader starts at the NEXT newline (with a special case for the first row) and stops when it reaches the end of a line AND the byte position exceeds the end limit. CSV (and parquet) behave similarly.

This all avoids the planner needing to perform IO, which is pretty important
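
The approach above can be sketched roughly as follows. This is a simplified illustration, not the DataFusion implementation: it assumes the whole file is already in memory as a byte slice (a real reader would fetch bytes from an object store), and the names `split_by_size`, `next_line_start`, and `align_to_lines` are hypothetical. The planner only needs the file length to produce the raw ranges; each reader then snaps its own range to line boundaries.

```rust
use std::ops::Range;

/// Planner side: split `file_len` bytes into at most `n` roughly equal
/// byte ranges. Only the file size is needed, so no IO happens here.
fn split_by_size(file_len: usize, n: usize) -> Vec<Range<usize>> {
    assert!(n > 0, "need at least one partition");
    let chunk = (file_len + n - 1) / n; // ceiling division
    (0..n)
        .map(|i| (i * chunk).min(file_len)..((i + 1) * chunk).min(file_len))
        .filter(|r| !r.is_empty())
        .collect()
}

/// Offset just past the first b'\n' at or after `from`,
/// or `data.len()` if there is no further newline.
fn next_line_start(data: &[u8], from: usize) -> usize {
    data[from..]
        .iter()
        .position(|&b| b == b'\n')
        .map(|p| from + p + 1)
        .unwrap_or(data.len())
}

/// Reader side: snap a raw byte range to whole lines. A line belongs to
/// the range in which its first byte falls, so a reader skips a partial
/// line at its start and finishes the line that straddles its end.
fn align_to_lines(data: &[u8], raw: Range<usize>) -> Range<usize> {
    let snap = |pos: usize| {
        if pos == 0 {
            0 // special case: the first row starts at offset 0
        } else if pos >= data.len() {
            data.len()
        } else {
            // Looking at `pos - 1` keeps a line that starts exactly at `pos`.
            next_line_start(data, pos - 1)
        }
    };
    snap(raw.start)..snap(raw.end)
}
```

Because both bounds are snapped with the same rule, adjacent readers agree on where one stops and the next begins, so every line is read exactly once. A raw range that falls entirely inside one long line aligns to an empty range and simply produces no rows.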

@marvinlanhenke
Contributor

marvinlanhenke commented Dec 23, 2023

I just realized that I forgot the IO part. Now, I understand the approach better - thanks for the explanation.

@alamb
Contributor Author

alamb commented Dec 24, 2023

> In order for NDJSON to be split "correctly" (and not in the middle of a JSON Object) the FileGroupPartitioner needs a new method to split on newline? Would this be a reasonable approach?

Indeed -- I think the relevant code that finds the next bounds is https://github.com/apache/arrow-datafusion/blob/a1e959d87a66da7060bd005b1993b824c0683a63/datafusion/core/src/datasource/physical_plan/csv.rs#L411-L450

@marvinlanhenke
Contributor

marvinlanhenke commented Dec 25, 2023

@alamb thanks for the pointers.

I already implemented a working solution, however I need to do some refactoring (if my kids let me :P ).
I'd also like to extract the common functionality since the NdJson and the CSV implementation are nearly the same; any suggestions where to put those utility functions, like find_first_newline? I think mod.rs would be fine.

@alamb
Contributor Author

alamb commented Dec 26, 2023

arrow-datafusion/datafusion/core/src/datasource/physical_plan/mod.rs sounds like a good idea to me

@marvinlanhenke
Contributor

marvinlanhenke commented Dec 26, 2023

@alamb
...found some time to clean up the changes.

However, I am not sure about properly benchmarking the solution (as stated in the PR) and perhaps some more tests are needed? I am looking forward to your feedback.

@alamb
Contributor Author

alamb commented Dec 27, 2023

> However, I am not sure about properly benchmarking the solution (as stated in the PR) and perhaps some more tests are needed? I am looking forward to your feedback.

I think a good test would be to find a largish JSON input file and show some benchmark reading numbers

I don't know of any existing benchmarks we have for reading large JSON files. Maybe we could add a benchmark for reading from large JSON (and CSV?) files in https://github.com/apache/arrow-datafusion/tree/main/benchmarks#datafusion-benchmarks

Something like

bench.sh run parse

That would measure the speed of parsing a large JSON file

@marvinlanhenke
Contributor

@alamb

I did some basic benchmarking.

Methodology:

  1. Generated a 60-million-row NDJSON file (~3.7 GB)
  2. Ran the tests with datafusion-cli (before / after the changes)
  3. Created the table: `create external table json_test stored as json location '/home/ml/data_60m.json';`
  4. Ran the queries `select * from json_test;` and `select * from json_test where a > 5;`
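
The generator used for this test file is not shown in the thread. A minimal sketch of how such a file could be produced is below; only the numeric column `a` is taken from the queries above, while the function name `write_ndjson`, the second field `b`, and the value pattern are assumptions made for illustration.

```rust
use std::io::{self, Write};

/// Write `rows` NDJSON records to `out`, one JSON object per line.
/// Hypothetical schema: `a` cycles through 0..=9 so that a filter like
/// `a > 5` selects a fixed fraction of the rows; `b` is filler text.
fn write_ndjson<W: Write>(out: &mut W, rows: u64) -> io::Result<()> {
    for i in 0..rows {
        writeln!(out, "{{\"a\": {}, \"b\": \"row-{}\"}}", i % 10, i)?;
    }
    Ok(())
}
```

For a large file, wrapping a `std::fs::File` in a `std::io::BufWriter` before passing it to the function avoids issuing one write call per row.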

Results:

| query | before | after |
| --- | --- | --- |
| `select * from json_test;` | ~24s | ~24s |
| `select * from json_test where a > 5;` | ~26s | ~11s |

When applying a filter, `explain select * from json_test where a > 5;` shows the repartitioning happening (file_groups: 12).

However, when simply running `select * from json_test;`, file_groups remains at 1 and we get no parallel reading.

I think this issue relates to: #6983
Haven't tested it with a dataframe; however the issue seems to remain, at least for the datafusion-cli
(tested with JSON and CSV)


@alamb
Contributor Author

alamb commented Dec 28, 2023

> select * from json_test; & select * from json_test where a > 5;

A good test of just the speed of the parsing might be something like

select count(*) from json_test; 
select count(*) from json_test where a > 5;

That will minimize most of the actual query work other than parsing and won't need to try and format / carry through the other columns

@marvinlanhenke
Contributor

...the updated result with different queries:

| query | before | after |
| --- | --- | --- |
| `select count(*) from json_test;` | ~19s | ~6s |
| `select count(*) from json_test where a > 5;` | ~18s | ~8s |

alamb pushed a commit that referenced this issue Dec 31, 2023
* added basic test

* added `fn repartitioned`

* added basic version of FileOpener

* refactor: extract calculate_range

* refactor: handle GetResultPayload::Stream

* refactor: extract common functions to mod.rs

* refactor: use common functions

* added docs

* added test

* clippy

* fix: test_chunked_json

* fix: sqllogictest

* delete imports

* update docs