
Closes #8502: Parallel NDJSON file reading #8659

Merged: 15 commits into apache:main, Dec 31, 2023

Conversation

marvinlanhenke (Contributor)

Which issue does this PR close?

Closes #8502.

Rationale for this change

As stated in the issue:

DataFusion can now automatically read CSV and parquet files in parallel (see #6325 for CSV)
It would be great to do the same for "NDJSON" files -- namely files that have multiple JSON objects placed one after the other.

What changes are included in this PR?

Basically the same approach as in #6325.
However, I refactored and extracted some of the common functions, such as find_first_newline (used by calculate_range), so they can be shared by both the CSV and JSON implementations.
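
For context, here is a minimal synchronous sketch of what such shared helpers can look like. The names find_first_newline and calculate_range come from the PR, but the bodies below are illustrative only; the merged code performs these steps asynchronously against the object store.

/// Sketch: offset of the byte just past the first '\n' at or after `start`,
/// or `None` if the buffer contains no further newline.
fn find_first_newline(buf: &[u8], start: usize) -> Option<usize> {
    buf[start..].iter().position(|&b| b == b'\n').map(|i| start + i + 1)
}

/// Sketch: adjust an assigned byte range so it covers whole lines. A
/// partition that does not begin at byte 0 skips its leading partial line
/// (the previous partition reads it), and every partition reads through
/// the newline that terminates the line in progress at its assigned end.
fn calculate_range(buf: &[u8], assigned: std::ops::Range<usize>) -> std::ops::Range<usize> {
    let start = if assigned.start == 0 {
        0
    } else {
        find_first_newline(buf, assigned.start).unwrap_or(buf.len())
    };
    let end = find_first_newline(buf, assigned.end).unwrap_or(buf.len());
    start..end
}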

Are these changes tested?

Yes, basic tests are included.
However, I was not sure how to benchmark the changes, since benchmarks/bench.sh does not provide a JSON dataset.

Are there any user-facing changes?

No.

github-actions bot added the core (Core DataFusion crate) and sqllogictest (SQL Logic Tests (.slt)) labels on Dec 26, 2023
alamb (Contributor) commented Dec 28, 2023

Thank you for this @marvinlanhenke - I plan to review it tomorrow.

cc @devinjdangelo

marvinlanhenke (Contributor, PR author)

...just as a side note, and as originally stated in #6801 and #6922: this implementation is suboptimal, since it performs multiple get operations on the object store. This will be handled in a follow-up issue/PR, which I will most likely tackle next week.
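
To make the concern concrete, here is a loose sketch of the pattern being described. probe_boundary is a hypothetical helper; object_store's get_range is a real method, though its exact signature varies across crate versions.

use object_store::{path::Path, ObjectStore};

// Loose sketch of the cost: locating a line boundary with its own ranged
// GET means each partition can issue several requests per file. The
// follow-up idea is to collapse these into a single GET whose byte stream
// is split on newlines as it arrives.
async fn probe_boundary(
    store: &dyn ObjectStore,
    location: &Path,
    start: usize,
) -> object_store::Result<usize> {
    // One ranged request just to find the first '\n' at or after `start`
    // (a real implementation must clamp the range to the object's size).
    let chunk = store.get_range(location, start..start + 4096).await?;
    let newline = chunk.iter().position(|&b| b == b'\n');
    Ok(start + newline.map(|i| i + 1).unwrap_or(chunk.len()))
}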

alamb (Contributor) left a review:

Thank you @marvinlanhenke -- this PR was a pleasure to read and well tested. 👏

The only thing I think is needed is to resolve the merge conflicts and update the test comments.

Thank you again for the contribution

----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------JsonExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/json_table/1.json]]}, projection=[column1]

----JsonExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/json_table/1.json:0..18], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/json_table/1.json:18..36], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/json_table/1.json:36..54], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/json_table/1.json:54..70]]}, projection=[column1]
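
The interesting part is the second plan: instead of a single-partition scan followed by RoundRobinBatch repartitioning, the 70-byte file is split into four contiguous byte ranges up front. The split itself is plain ceiling division of the file size, roughly like this sketch (not the merged code):

use std::ops::Range;

/// Sketch: split `file_size` bytes into `n` contiguous ranges by ceiling
/// division, e.g. 70 bytes over 4 partitions gives 0..18, 18..36, 36..54,
/// 54..70; ranges are re-aligned to line boundaries at read time.
fn split_ranges(file_size: usize, n: usize) -> Vec<Range<usize>> {
    let chunk = (file_size + n - 1) / n;
    (0..n)
        .map(|i| (i * chunk).min(file_size)..((i + 1) * chunk).min(file_size))
        .collect()
}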
alamb (Contributor):
🎉 -- can you also update the comment in this file to reflect the fact that it now reads the file in parallel 🥳 🦜


let calculated_range = calculate_range(&file_meta, &store).await?;

let range = match calculated_range {
alamb (Contributor):
This is a really nice refactoring

/// Else `file_meta.range` is `Some(FileRange{start, end})`, which corresponds to the byte range [start, end) within the file.
///
/// Note: `start` or `end` might be in the middle of some lines. In such cases, the following rules
/// are applied to determine which lines to read:
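
(To make those rules concrete with the 70-byte file above, under the convention sketched earlier: the partition assigned bytes 18..36 skips forward from byte 18 to the first byte past the next newline, and reads beyond byte 36 through the newline that ends the line in progress there, so every line is read by exactly one partition.)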
alamb (Contributor):
👍

alamb (Contributor):
It could potentially help to link to the CsvOpener documentation too (which has an example)

@@ -441,4 +446,94 @@ mod tests {
.collect::<Vec<_>>();
assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean"], fields);
}

async fn count_num_partitions(ctx: &SessionContext, query: &str) -> Result<usize> {
alamb (Contributor):
Another potential way to figure out the file groups would be to make the physical plan and then walk it to find the JsonExec and its number of partitions
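
For reference, a sketch of that alternative, assuming the DataFusion APIs of this era (NdJsonExec is the operator that displays as JsonExec; the helper names here are made up):

use std::sync::Arc;
use datafusion::datasource::physical_plan::NdJsonExec;
use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;

/// Sketch: build the physical plan and walk it for the JSON scan's
/// partition count, instead of parsing EXPLAIN output.
async fn json_scan_partitions(ctx: &SessionContext, query: &str) -> Result<Option<usize>> {
    let plan = ctx.sql(query).await?.create_physical_plan().await?;
    Ok(find_json_exec(&plan))
}

fn find_json_exec(plan: &Arc<dyn ExecutionPlan>) -> Option<usize> {
    if let Some(exec) = plan.as_any().downcast_ref::<NdJsonExec>() {
        return Some(exec.output_partitioning().partition_count());
    }
    plan.children().into_iter().find_map(|child| find_json_exec(&child))
}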

marvinlanhenke (Contributor, PR author)

@alamb Thank you so much for the review.

I resolved the merge conflicts and updated the docs according to your comments.

alamb (Contributor) commented Dec 31, 2023

Epic work @marvinlanhenke -- thank you very much!

alamb merged commit 03bd9b4 into apache:main on Dec 31, 2023
22 checks passed
marvinlanhenke (Contributor, PR author)

... Some epic ctrl+c ctrl+v combos from the CSV implementation @alamb 😂 but thanks again for the kind review.

I am hoping to improve this implementation next week by reducing the number of GET requests to the object store down to one. However, the stream handling is more complicated than I had hoped...

alamb (Contributor) commented Jan 1, 2024

... Some epic ctrl+c ctrl+v combos from the CSV implementation @alamb 😂 but thanks again for the kind review.

Well, I think being able to extract and reuse code is far harder than just copy/paste/modify, which you could have done ;)

However, the stream handling is more complicated than I had hoped...

Yeah -- I agree -- it might need to get wrapped up into its own state machine / stream impl 🤔
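
For the curious, here is the rough shape such a stream impl might take (LineAligned is a hypothetical type, not anything in DataFusion): buffer raw chunks from the single GET and only hand line-aligned byte runs onward.

use bytes::{Bytes, BytesMut};

// Hypothetical sketch of the state the stream impl would carry: raw bytes
// from the single GET accumulate here, and only line-aligned runs are
// emitted; a trailing partial line waits for the next chunk (or EOF).
struct LineAligned {
    buf: BytesMut,
}

impl LineAligned {
    fn new() -> Self {
        Self { buf: BytesMut::new() }
    }

    /// Push one raw chunk; return everything up to and including the last
    /// '\n' buffered so far, or None if no complete line is available yet.
    fn push(&mut self, chunk: &[u8]) -> Option<Bytes> {
        self.buf.extend_from_slice(chunk);
        let cut = self.buf.iter().rposition(|&b| b == b'\n')? + 1;
        Some(self.buf.split_to(cut).freeze())
    }

    /// At end of stream, whatever remains is the final unterminated line.
    fn finish(self) -> Option<Bytes> {
        (!self.buf.is_empty()).then(|| self.buf.freeze())
    }
}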
