Support coalescing reading for avro #5306

Merged (9 commits) on May 4, 2022

firestarman
Collaborator

This PR enables coalescing reading for Avro.

Main changes:

  • Added the relevant configs.
  • Created the multi-file reader factory and reader classes, along with some utilities.
  • Did a small refactor to remove duplicated code.
  • Updated the Avro tests to cover the coalescing reading case.
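
To make the "relevant configs" item concrete, here is a minimal sketch of how a reader type might be selected. The key name `spark.rapids.sql.format.avro.reader.type` is an assumption modelled on the Parquet and ORC reader-type configs and is not quoted in this description, so check it against the plugin's configuration docs. `AvroReaderConf` and `readerTypeConf` are hypothetical helpers.

```scala
import java.util.Locale

object AvroReaderConf {
  // Assumed key name, modelled on the Parquet/ORC reader-type configs.
  val ReaderTypeKey = "spark.rapids.sql.format.avro.reader.type"

  // The two reader types compared in this PR's benchmark.
  val KnownTypes: Set[String] = Set("PERFILE", "COALESCING")

  /** Returns the conf entry for the requested reader type, or an error message. */
  def readerTypeConf(readerType: String): Either[String, Map[String, String]] = {
    val normalized = readerType.trim.toUpperCase(Locale.ROOT)
    if (KnownTypes.contains(normalized)) Right(Map(ReaderTypeKey -> normalized))
    else Left(s"Unknown Avro reader type: $readerType")
  }
}
```

The resulting entries could then be passed to `SparkSession.builder().config(key, value)` when building a session.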

Local performance

  • 12 CPU cores and one GPU (Titan V, 12 GB memory)

  • 2000 non-partitioned Avro files, 4.4 GB in total, on local storage

                  CPU      PERFILE   COALESCING
    Time (sec)    27.844   24.758    16.005
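
The timings above translate into speedups as follows. This is only arithmetic on the three reported numbers; `AvroReadSpeedup` is a hypothetical helper, not part of the PR.

```scala
object AvroReadSpeedup {
  // Wall-clock times in seconds, copied from the benchmark above.
  val timesSec: Map[String, Double] = Map(
    "CPU" -> 27.844,
    "PERFILE" -> 24.758,
    "COALESCING" -> 16.005
  )

  /** How many times faster `candidate` is than `baseline`. */
  def speedup(baseline: String, candidate: String): Double =
    timesSec(baseline) / timesSec(candidate)

  def main(args: Array[String]): Unit = {
    for (baseline <- Seq("CPU", "PERFILE")) {
      val s = speedup(baseline, "COALESCING")
      println(f"COALESCING vs $baseline: $s%.2fx")
    }
  }
}
```

On this single local run, coalescing reading is roughly 1.74x faster than the CPU run and roughly 1.55x faster than the PERFILE reader.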

closes #5149

Signed-off-by: Firestarman [email protected]

@firestarman
Collaborator Author

build

Signed-off-by: Firestarman <[email protected]>
@sameerz sameerz added the performance label (A performance related task/issue) Apr 25, 2022
@sameerz sameerz added this to the Apr 18 - Apr 29 milestone Apr 25, 2022
@firestarman
Collaborator Author

build

@firestarman
Collaborator Author

A follow-up issue #5312

integration_tests/src/main/python/avro_test.py Outdated

private lazy val hasAvroJar = ExternalSource.hasSparkAvroJar

test("Use coalescing reading for local files") {

Collaborator

This looks like a lot of duplicated test code between the Avro tests and MultiReaderTypeSuite.

Collaborator

I think you can move the shared code into MultiReaderTypeSuite and pass the "assume" condition to it.

Collaborator Author

firestarman, Apr 28, 2022

This implementation is clear and easy to understand, and these tests will be removed once multi-threaded reading is done.

val singleFileInfo = try {
  filterHandler.filterBlocks(file)
} catch {
  case e: FileNotFoundException if ignoreMissingFiles =>

Collaborator

Could you add the related tests for this function?

Collaborator Author

firestarman, Apr 28, 2022

Do you mean a unit test or an integration test (IT)?

Collaborator Author

I could not figure out how to create such a case. There are also no related tests for ORC or Parquet.
Do you have any suggestions?

Collaborator

Sorry, I commented in the wrong place. I meant that we should add a related test for corrupt files; please refer to test_parquet_read_with_corrupt_files.
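
For context on what such a test would exercise, here is a minimal sketch of the skip-on-failure pattern around the excerpted `try`/`catch`. It assumes the two flags mirror Spark's `spark.sql.files.ignoreMissingFiles` and `spark.sql.files.ignoreCorruptFiles` options. `SafeBlockFilter`, `filterOrSkip`, and the `ignoreCorruptFiles` parameter are hypothetical and are not the plugin's code.

```scala
import java.io.{FileNotFoundException, IOException}

object SafeBlockFilter {
  /**
   * Runs `filterBlocks` on one file and returns None when the failure is one
   * the caller asked to ignore. Both flags are hypothetical parameters.
   */
  def filterOrSkip[T](
      file: String,
      ignoreMissingFiles: Boolean,
      ignoreCorruptFiles: Boolean)(filterBlocks: String => T): Option[T] = {
    try {
      Some(filterBlocks(file))
    } catch {
      case _: FileNotFoundException if ignoreMissingFiles =>
        None // the file disappeared between listing and reading
      case e: FileNotFoundException =>
        throw e // a missing file is never treated as a corrupt one
      case _: RuntimeException | _: IOException if ignoreCorruptFiles =>
        None // unreadable header or block: skip the whole file
    }
  }
}
```

A corrupt-file test would then check that a bad file is skipped when the corrupt-file flag is set and that the read fails when it is not.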

val headerAndBlocks = BlockInfo(0, header.firstBlockStart, 0, 0) +: blocks
copyBlocksData(headerAndBlocks, in, out)
// check we didn't go over memory
if (out.getPos > estOutSize) {

Collaborator

Could we reallocate here instead of throwing an exception?

Collaborator Author

I just copied the code from ORC and Parquet. If this is a corner case, I will do it in a follow-up PR.
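
The two options discussed in this thread can be sketched side by side. This is an illustration on a plain `Array[Byte]`; the plugin's actual buffer type is not shown in the excerpt, and `OutputSizeCheck`, `checkWithinEstimate`, and `ensureCapacity` are hypothetical names.

```scala
object OutputSizeCheck {
  /** Current behaviour in the excerpt: fail if more bytes were written than estimated. */
  def checkWithinEstimate(written: Long, estimated: Long): Unit = {
    if (written > estimated) {
      throw new IllegalStateException(
        s"Wrote $written bytes, which exceeds the estimated $estimated bytes")
    }
  }

  /**
   * The reviewer's alternative: grow the buffer by doubling until `needed`
   * bytes fit, keeping the bytes already written.
   */
  def ensureCapacity(buf: Array[Byte], needed: Int): Array[Byte] = {
    require(needed >= 0, "needed must be non-negative")
    if (needed <= buf.length) {
      buf
    } else {
      // Use Long so that doubling cannot overflow Int.
      var newSize = math.max(buf.length, 1).toLong
      while (newSize < needed) newSize *= 2
      java.util.Arrays.copyOf(buf, math.min(newSize, Int.MaxValue.toLong).toInt)
    }
  }
}
```

Reallocation trades a hard failure for an extra copy, which is why it may be reasonable to defer it if an underestimate is truly a corner case.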

out: OutputStream): Seq[BlockInfo] = {
val copyRanges = computeCopyRanges(blocks)
// copy cache: 8MB
val copyCache = new Array[Byte](8 * 1024 * 1024)

Collaborator

Would you consider doing this the same way Parquet does?

val copyBufferSize = conf.getInt("parquet.read.allocation.size", 8 * 1024 * 1024)

Collaborator Author

firestarman, Apr 28, 2022

Yes, nice catch. I am working on it, and it will come with the multi-threaded reading code.

Collaborator

Please file a follow-up issue so that this is not forgotten.
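
The copy step discussed in this thread can be sketched as follows: adjacent block ranges are merged, then copied through one reusable buffer whose size comes from a config lookup with an 8 MB default. This is a sketch under stated assumptions, not the plugin's implementation. `BlockCopy`, `CopyRange`, `mergeAdjacentRanges`, `copyRanges`, and `bufferSizeFrom` are hypothetical, and a plain `Map` stands in for the Hadoop configuration.

```scala
import java.io.{EOFException, InputStream, OutputStream}

object BlockCopy {
  /** A contiguous byte range in the source file. */
  final case class CopyRange(offset: Long, length: Long)

  /** Reads the buffer size from a config map, falling back to 8 MB. */
  def bufferSizeFrom(
      conf: Map[String, String],
      key: String,
      default: Int = 8 * 1024 * 1024): Int =
    conf.get(key).map(_.trim.toInt).getOrElse(default)

  /** Merges ranges that touch each other. Input must be sorted by offset. */
  def mergeAdjacentRanges(blocks: Seq[CopyRange]): Seq[CopyRange] =
    blocks.foldLeft(Vector.empty[CopyRange]) { (acc, b) =>
      acc.lastOption match {
        case Some(last) if last.offset + last.length == b.offset =>
          acc.init :+ last.copy(length = last.length + b.length)
        case _ => acc :+ b
      }
    }

  /**
   * Copies sorted, non-overlapping ranges from `in` to `out`, reusing one
   * buffer of `bufferSize` bytes. Returns the number of bytes copied.
   */
  def copyRanges(
      in: InputStream,
      out: OutputStream,
      ranges: Seq[CopyRange],
      bufferSize: Int): Long = {
    require(bufferSize > 0, "bufferSize must be positive")
    val buffer = new Array[Byte](bufferSize)
    var pos = 0L // current position in `in`
    var copied = 0L
    for (r <- ranges) {
      require(r.offset >= pos, "ranges must be sorted and non-overlapping")
      // Skip the gap between the previous range and this one.
      var toSkip = r.offset - pos
      while (toSkip > 0) {
        val skipped = in.skip(toSkip)
        if (skipped <= 0) throw new EOFException("Could not skip to range start")
        toSkip -= skipped
      }
      // Copy the range in chunks of at most `bufferSize` bytes.
      var remaining = r.length
      while (remaining > 0) {
        val n = in.read(buffer, 0, math.min(remaining, bufferSize.toLong).toInt)
        if (n < 0) throw new EOFException("Unexpected end of stream")
        out.write(buffer, 0, n)
        remaining -= n
      }
      pos = r.offset + r.length
      copied += r.length
    }
    copied
  }
}
```

Reading the size from configuration keeps the 8 MB default while letting memory-constrained deployments lower it, which is the point of the Parquet comparison above.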

Signed-off-by: Firestarman <[email protected]>
@firestarman firestarman requested a review from jlowe April 29, 2022 04:22
@firestarman
Collaborator Author

build

Signed-off-by: Firestarman <[email protected]>
@firestarman
Collaborator Author

build

Signed-off-by: Firestarman <[email protected]>
@firestarman firestarman requested a review from jlowe May 2, 2022 09:04
@firestarman
Collaborator Author

build

Signed-off-by: Firestarman <[email protected]>
@firestarman firestarman requested a review from jlowe May 3, 2022 01:45
@firestarman
Collaborator Author

build

Signed-off-by: Firestarman <[email protected]>
@firestarman
Collaborator Author

build

Signed-off-by: Firestarman <[email protected]>
@firestarman
Collaborator Author

build

@firestarman firestarman merged commit 4b13b04 into NVIDIA:branch-22.06 May 4, 2022
@firestarman firestarman deleted the coal-avro branch May 4, 2022 10:07
Development

Successfully merging this pull request may close these issues.

Add the COALESCING reading support for avro
4 participants