
Fix batch processing for parquet writer #13438

Merged: 9 commits from fix_parquet_writer into rapidsai:branch-23.06, May 25, 2023

Conversation

@ttnghia (Contributor) commented May 24, 2023

In the parquet writer, the input table is divided into multiple batches (capped at 1GB each), and each batch is processed and flushed to the sink one after another. The buffers storing the data for each batch are reused across batches in order to reduce peak GPU memory usage. (A rough sketch of this flow follows.)
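
For intuition, here is a minimal sketch of the batch-splitting step; the names and the greedy logic are illustrative assumptions, not the actual libcudf internals:

// Hypothetical sketch: split row groups into batches capped at ~1GB so that
// one set of reused device buffers bounds peak GPU memory. Illustrative only.
#include <cstddef>
#include <vector>

struct row_group {
  std::size_t encoded_size;  // estimated size in bytes once encoded
};

// Returns the number of row groups in each batch.
std::vector<std::size_t> build_batches(std::vector<row_group> const& rgs,
                                       std::size_t max_bytes_in_batch = 1024ul * 1024 * 1024)
{
  std::vector<std::size_t> batch_sizes;
  std::size_t bytes = 0;
  std::size_t count = 0;
  for (auto const& rg : rgs) {
    // Close the current batch when adding this row group would exceed the cap.
    if (count > 0 && bytes + rg.encoded_size > max_bytes_in_batch) {
      batch_sizes.push_back(count);
      bytes = 0;
      count = 0;
    }
    bytes += rg.encoded_size;
    ++count;
  }
  if (count > 0) { batch_sizes.push_back(count); }
  return batch_sizes;
}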

Unfortunately, in order to support the retry mechanism, we have to keep separate buffers for each batch, which is equivalent to always having a single batch: the benefit of batch processing is stripped away. In #13076 we intended to keep the data for all batches but failed to do so, causing the bug reported in #13414.

This PR fixes the issue introduced in #13076. Since we have to give up the benefit of batch processing, peak memory usage may go up.

This is flagged as breaking because peak GPU memory usage may go up and cause downstream applications to crash.

Note that this PR is a temporary fix for the outstanding issue. With this fix, the batch processing mechanism no longer provides any benefit for reducing peak memory usage. We are considering removing all of the batch processing code in follow-up work, which involves many more changes.

Closes #13414.

@ttnghia added the labels bug, 3 - Ready for Review, libcudf, cuIO, and breaking on May 24, 2023
@ttnghia requested a review from a team as a code owner on May 24, 2023 21:54
@ttnghia self-assigned this on May 24, 2023
@ttnghia requested reviews from hyperbolic2346 and vuule on May 24, 2023 21:54
@ttnghia (Contributor, Author) commented May 24, 2023

Verified that this Python test succeeds with the data file provided in #13414:

import cudf

input_path = "batch_0_part_0.parquet"
output_path = "repro.parquet"

df = cudf.read_parquet(input_path)
df.to_parquet(output_path, partition_cols=["output_partition_id"])
shuffled_df = cudf.read_parquet(output_path)

@etseidl (Contributor) commented May 24, 2023

There's an existing parquet test I wrote when I fixed the column indexes to work with multiple batches, but it only checks the file metadata, not the actual contents. Modifying that test as shown below detects the buffer reuse.

diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp
index a7199ad56e..8b6cd98afd 100644
--- a/cpp/tests/io/parquet_test.cpp
+++ b/cpp/tests/io/parquet_test.cpp
@@ -4095,6 +4095,12 @@ TEST_F(ParquetWriterTest, LargeColumnIndex)
       EXPECT_TRUE(compare_binary(ci.max_values[0], stats.max_value, ptype, ctype) >= 0);
     }
   }
+
+  cudf::io::parquet_reader_options read_args =
+    cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath});
+  auto result = cudf::io::read_parquet(read_args);
+
+  CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view());
 }
 
 TEST_F(ParquetWriterTest, CheckColumnOffsetIndex)

@ttnghia (Contributor, Author) commented May 24, 2023

@etseidl proposed an alternative solution:

diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu
index 05d42cd9e2..2f4918f8f2 100644
--- a/cpp/src/io/parquet/writer_impl.cu
+++ b/cpp/src/io/parquet/writer_impl.cu
@@ -1782,7 +1782,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
   // Initialize batches of rowgroups to encode (mainly to limit peak memory usage)
   std::vector<size_type> batch_list;
   size_type num_pages          = 0;
-  size_t max_bytes_in_batch    = 1024 * 1024 * 1024;  // 1GB - TODO: Tune this
+  size_t max_bytes_in_batch    = std::numeric_limits<int64_t>::max();
   size_t max_uncomp_bfr_size   = 0;
   size_t max_comp_bfr_size     = 0;
   size_t max_chunk_bfr_size    = 0;

This may be better in terms of performance, since it can reduce the number of kernel calls. I need more opinions on this before adopting it.
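
For intuition, reusing the hypothetical build_batches() sketch from the PR description above (again an assumption, not the writer's real code): once the cap is effectively unlimited, the greedy split never closes a batch early, so every row group lands in a single batch and each encode/compress step runs as one kernel launch over the whole table.

#include <limits>
// Assumes the hypothetical build_batches()/row_group sketch from above is in scope.

void demo()
{
  std::vector<row_group> rgs(15, row_group{100ul << 20});  // 15 row groups, 100 MiB each
  auto capped    = build_batches(rgs);                     // ~1GB cap -> batches {10, 5}
  auto unlimited = build_batches(rgs, std::numeric_limits<std::size_t>::max());  // -> {15}
}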

Review thread on the per-batch buffer allocation in the PR:

std::vector<rmm::device_buffer> comp_bfr;
std::vector<rmm::device_buffer> col_idx_bfr;
for (std::size_t i = 0; i < batch_list.size(); ++i) {
  uncomp_bfr.emplace_back(max_uncomp_bfr_size, stream);

Contributor:

This can lead to a large overuse of memory if the batches are unbalanced. For instance, in testing I had a file that produced one batch of 14 row groups and a second batch of a single row group, all sized the same. In that case this code allocates two buffers, each large enough for 14 row groups (28 row groups' worth in total), instead of the 15 needed. Could you add a note here to that effect?
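
A quick worked version of that arithmetic (the per-row-group size is a hypothetical assumption, not taken from the PR):

#include <cstddef>

// Two batches of {14, 1} equally sized row groups. Every per-batch buffer is
// sized for the largest batch, so the allocation is 2 * 14 = 28 row groups'
// worth instead of the 14 + 1 = 15 actually needed.
constexpr std::size_t rg_bytes      = 64ul << 20;  // assume 64 MiB per row group
constexpr std::size_t batch_count   = 2;
constexpr std::size_t max_batch_rgs = 14;
constexpr std::size_t allocated = batch_count * max_batch_rgs * rg_bytes;  // 1792 MiB
constexpr std::size_t needed    = (14 + 1) * rg_bytes;                     //  960 MiB
static_assert(allocated > needed, "per-batch buffers overshoot the true requirement");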

Contributor (Author):

It seems that the alternative solution (#13438 (comment)) should be better; with that, we won't over-allocate memory.

@etseidl (Contributor) commented May 24, 2023

@etseidl proposed an alternative solution:

bandaid 🤣

@vuule (Contributor) commented May 24, 2023

There should still be a peak memory use benefit from batching, since we can at least free the buffers holding uncompressed data before processing the next batch. This is somewhat speculative, as I haven't looked into this part of the implementation to verify that this is actually how the writer works.
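
If the writer does work that way, the pattern would look roughly like the following sketch; the encode/flush hooks are hypothetical stand-ins, not real libcudf entry points:

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>

#include <cstddef>
#include <vector>

// Hypothetical hooks; not real libcudf entry points.
void encode_batch(std::size_t batch, rmm::device_buffer& uncomp);
void compress_and_flush(std::size_t batch, rmm::device_buffer const& uncomp,
                        rmm::cuda_stream_view stream);

// Scoping each batch's uncompressed buffer to its own iteration frees it
// before the next batch allocates, so peak usage tracks the largest batch
// (plus anything that must persist) rather than the sum over all batches.
void write_batches(std::vector<std::size_t> const& batch_uncomp_bytes,
                   rmm::cuda_stream_view stream)
{
  for (std::size_t b = 0; b < batch_uncomp_bytes.size(); ++b) {
    rmm::device_buffer uncomp(batch_uncomp_bytes[b], stream);
    encode_batch(b, uncomp);
    compress_and_flush(b, uncomp, stream);
  }  // uncomp destroyed at the end of each iteration, before the next allocation
}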

Comment on lines +1784 to +1785
// TODO: All the relevant code will be removed in the follow-up work:
// https://github.com/rapidsai/cudf/issues/13440

Contributor:

Not sure if this needs to be said, but please don't remove this code without further discussion. It could play a role in future performance improvements.

@GregoryKimball (Contributor) commented:

Hello @wence-, thank you for your help studying #13414. Would you please take a look at this proposed solution?

@GregoryKimball GregoryKimball requested a review from wence- May 25, 2023 05:48
@wence- (Contributor) commented May 25, 2023

Hello @wence-, thank you for your help studying #13414. Would you please take a look at this proposed solution?

I can confirm this fixes the read/write issues from @etseidl's minimal reproducer.

That is, I can successfully run:

import pandas as pd
import random
import cudf
df = pd.DataFrame([(random.randbytes(16).hex(), random.randbytes(16).hex()) for _ in range(30_000_000)], columns=["a", "b"])
cdf = cudf.from_pandas(df)

cdf.to_parquet("cudf.pq")

cdf2 = cudf.read_parquet("cudf.pq")
df2 = pd.read_parquet("cudf.pq")

assert (cdf == cdf2).all().all()  # => True
assert (df == df2).all().all()    # => True

I can also round-trip the repro dataset from #13414 but, like @ttnghia, I don't know exactly how to check it for correctness.

cc: @quasiben and @VibhuJawa to check too.

@wence- (Contributor) left a review:

I think I am happy with this change, but, like @vuule, I think the full removal of batching in the long term should be discussed in more detail.

// This line disables batch processing (so batch size will no longer be limited at 1GB as before).
// TODO: All the relevant code will be removed in the follow-up work:
// https://github.com/rapidsai/cudf/issues/13440
auto const max_bytes_in_batch = std::numeric_limits<size_t>::max();

Contributor:

OK, so this is the minimal change to "switch off" batching without changing any of the logic, which is likely the safest thing to do.

@GregoryKimball (Contributor) commented:

I can confirm that we can now round-trip the repro dataset from #13414. The only difference is that output_partition_id round-trips to a category dtype instead of the original int32 dtype.

@quasiben (Member) commented:

Confirmed that the repro dataframes are equal (after ignoring indexes and sorting).

@ttnghia (Contributor, Author) commented May 25, 2023

/merge

@rapids-bot rapids-bot bot merged commit ae375d2 into rapidsai:branch-23.06 May 25, 2023
@ttnghia ttnghia deleted the fix_parquet_writer branch May 25, 2023 14:32
Merging this pull request closed the linked issue: [BUG] Partitioned Writing/Reading Failure (#13414)