[BUG] Trino cannot read map column written by GPU #15473

Closed
jlowe opened this issue Apr 5, 2024 · 2 comments · Fixed by #15474
Labels
bug (Something isn't working), libcudf (Affects libcudf (C++/CUDA) code), Spark (Functionality that helps Spark RAPIDS)

Comments

jlowe (Member) commented Apr 5, 2024

Describe the bug
Parquet files written by libcudf that contain map columns (which libcudf sees as LIST<STRUCT<keytype,valtype>> columns) can end up unreadable by Trino version 376. The error reported by the Trino CLI when it fails is:

Error reading parquet page in column [m, key_value, key] required binary key (STRING)
The stack trace on the Trino worker:
org.apache.parquet.io.ParquetDecodingException: Error reading parquet page in column [entry_event_dtl, key_value, key] required binary key (STRING)
        at io.trino.parquet.reader.PrimitiveColumnReader.initDataReader(PrimitiveColumnReader.java:434)
        at io.trino.parquet.reader.PrimitiveColumnReader.readPageV1(PrimitiveColumnReader.java:391)
        at io.trino.parquet.reader.PrimitiveColumnReader.readNextPage(PrimitiveColumnReader.java:362)
        at io.trino.parquet.reader.PrimitiveColumnReader.processValues(PrimitiveColumnReader.java:312)
        at io.trino.parquet.reader.PrimitiveColumnReader.readValues(PrimitiveColumnReader.java:242)
        at io.trino.parquet.reader.PrimitiveColumnReader.readPrimitive(PrimitiveColumnReader.java:231)
        at io.trino.parquet.reader.ParquetReader.readPrimitive(ParquetReader.java:367)
        at io.trino.parquet.reader.ParquetReader.readColumnChunk(ParquetReader.java:441)
        at io.trino.parquet.reader.ParquetReader.readMap(ParquetReader.java:302)
        at io.trino.parquet.reader.ParquetReader.readColumnChunk(ParquetReader.java:435)
        at io.trino.parquet.reader.ParquetReader.readBlock(ParquetReader.java:424)
        at io.trino.plugin.hive.parquet.ParquetPageSource$ParquetBlockLoader.load(ParquetPageSource.java:211)
        at io.trino.spi.block.LazyBlock$LazyData.load(LazyBlock.java:400)
        at io.trino.spi.block.LazyBlock$LazyData.getFullyLoadedBlock(LazyBlock.java:379)
        at io.trino.spi.block.LazyBlock.getLoadedBlock(LazyBlock.java:286)
        at io.trino.spi.Page.getLoadedPage(Page.java:323)
        at io.trino.operator.project.InputChannels.getInputChannels(InputChannels.java:60)
        at io.trino.operator.project.PageProcessor.createWorkProcessor(PageProcessor.java:120)
        at io.trino.operator.ScanFilterAndProjectOperator$SplitToPages.lambda$processPageSource$1(ScanFilterAndProjectOperator.java:295)
        at io.trino.operator.WorkProcessorUtils.lambda$flatMap$5(WorkProcessorUtils.java:264)
        at io.trino.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:338)
        at io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:391)
        at io.trino.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:325)
        at io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:391)
        at io.trino.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:325)
        at io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:391)
        at io.trino.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:325)
        at io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:391)
        at io.trino.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:325)
        at io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:391)
        at io.trino.operator.WorkProcessorUtils.lambda$flatten$7(WorkProcessorUtils.java:296)
        at io.trino.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:338)
        at io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:391)
        at io.trino.operator.WorkProcessorUtils$3.process(WorkProcessorUtils.java:325)
        at io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:391)
        at io.trino.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:240)
        at io.trino.operator.WorkProcessorUtils.lambda$processStateMonitor$3(WorkProcessorUtils.java:219)
        at io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:391)
        at io.trino.operator.WorkProcessorUtils.getNextState(WorkProcessorUtils.java:240)
        at io.trino.operator.WorkProcessorUtils.lambda$finishWhen$4(WorkProcessorUtils.java:234)
        at io.trino.operator.WorkProcessorUtils$ProcessWorkProcessor.process(WorkProcessorUtils.java:391)
        at io.trino.operator.WorkProcessorSourceOperatorAdapter.getOutput(WorkProcessorSourceOperatorAdapter.java:150)
        at io.trino.operator.Driver.processInternal(Driver.java:388)
        at io.trino.operator.Driver.lambda$processFor$9(Driver.java:292)
        at io.trino.operator.Driver.tryWithLock(Driver.java:693)
        at io.trino.operator.Driver.processFor(Driver.java:285)
        at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1092)
        at io.trino.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
        at io.trino.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:488)
        at io.trino.$gen.Trino_376____20240403_172137_2.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.EOFException
        at org.apache.parquet.bytes.SingleBufferInputStream.read(SingleBufferInputStream.java:52)
        at org.apache.parquet.bytes.BytesUtils.readIntLittleEndianOnOneByte(BytesUtils.java:94)
        at io.trino.parquet.dictionary.DictionaryReader.initFromPage(DictionaryReader.java:40)
        at io.trino.parquet.reader.PrimitiveColumnReader.initDataReader(PrimitiveColumnReader.java:427)
        ... 50 more
2024-04-04T03:20:04.376Z        ERROR   stage-scheduler io.trino.execution.scheduler.SqlQueryScheduler  Failure in distributed stage for query 20240404_032003_00012_ai2y5
[same ParquetDecodingException / EOFException stack trace as above]

Digging into the Trino error, it is complaining about a PLAIN_DICTIONARY V1 data page that is exhausted after the repetition and definition levels, leaving no byte left to read for the dictionary bit width.
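
For context, here is a minimal sketch (with hypothetical helper names; this is not Trino or libcudf code) of what a reader expects in the body of a V1 data page for this column: RLE repetition levels and definition levels, each prefixed by a 4-byte little-endian length, followed by a single byte giving the bit width of the dictionary indices. That trailing byte is the one missing from the bad pages, and it is where Trino hits the EOFException above.

#include <cstdint>
#include <stdexcept>

struct page_cursor {
  uint8_t const* cur;  // current position in the decompressed page body
  uint8_t const* end;  // one past the last byte of the page body
};

// Skip one RLE/bit-packed level stream: a 4-byte little-endian length, then that many bytes.
inline void skip_levels(page_cursor& p)
{
  if (p.end - p.cur < 4) { throw std::runtime_error("EOF reading level length"); }
  uint32_t len = static_cast<uint32_t>(p.cur[0]) | (static_cast<uint32_t>(p.cur[1]) << 8) |
                 (static_cast<uint32_t>(p.cur[2]) << 16) | (static_cast<uint32_t>(p.cur[3]) << 24);
  p.cur += 4;
  if (len > static_cast<uint64_t>(p.end - p.cur)) { throw std::runtime_error("EOF skipping levels"); }
  p.cur += len;
}

// After both level streams, a dictionary-encoded page must still carry the
// bit-width byte, even when every value on the page is null.
inline uint8_t read_dict_bit_width(page_cursor& p)
{
  skip_levels(p);  // repetition levels (present because the column is nested)
  skip_levels(p);  // definition levels (present because the column is nullable)
  if (p.cur >= p.end) { throw std::runtime_error("EOF reading dictionary bit width"); }
  return *p.cur++;  // the bad pages end right before this byte
}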

Steps/Code to reproduce bug
It appears that a page containing all nulls is written in such a way that the column chunk uses a dictionary but the dictionary bit-width byte is missing. @nvdbaranec helped create the code below, which writes a file that triggers the missing byte: it builds a 20006-row LIST<STRUCT<STRING,STRING>> column in which rows 20000 through 20005 are null, so the trailing rows end up in a data page containing only nulls, and it forces dictionary encoding via dictionary_policy::ALWAYS.

Parquet write code
void write_trunc_dict()
{
  constexpr int num_rows = 20006;
  auto str_iter = cudf::detail::make_counting_transform_iterator(0, [](int i){ return "abc"; });
  auto valids = cudf::detail::make_counting_transform_iterator(0, [](int i){ return i >= 20000 ? 0 : 1; });

  std::vector<std::unique_ptr<cudf::column>> struct_children;  
  cudf::test::strings_column_wrapper keys{str_iter, str_iter + num_rows, valids};
  struct_children.push_back(keys.release());
  cudf::test::strings_column_wrapper values{str_iter, str_iter + num_rows, valids};
  struct_children.push_back(values.release());

  std::vector<bool> struct_valids(valids, valids + num_rows);
  cudf::test::structs_column_wrapper struct_col(std::move(struct_children), struct_valids);

  auto offset_iter = cudf::detail::make_counting_transform_iterator(0, [](int i){ return i < 20000 ? i : 20000; });
  cudf::test::fixed_width_column_wrapper<int> offsets(offset_iter, offset_iter + num_rows + 1);

  auto [null_mask, null_count] = cudf::test::detail::make_null_mask(valids, valids + num_rows);
  auto list_col = cudf::make_lists_column(num_rows, offsets.release(), struct_col.release(), null_count, std::move(null_mask));

  { 
    cudf::io::parquet_writer_options opts =
        cudf::io::parquet_writer_options::builder(cudf::io::sink_info{"parquet/trunc_page.parquet"}, table_view{{*list_col}})
          .dictionary_policy(cudf::io::dictionary_policy::ALWAYS);
    cudf::io::write_parquet(opts);
  }

  {
    cudf::io::parquet_reader_options in_opts = cudf::io::parquet_reader_options::builder(cudf::io::source_info{"parquet/trunc_page.parquet"});
    auto result = cudf::io::read_parquet(in_opts);    
  }  
}

The following patch to the libcudf reader prints, for each page decoded, whether the dictionary bit-width byte is present or missing. It can be used to detect the missing byte and simulates the behavior of the Trino reader.

diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh
index a081ee4e03..06735f5719 100644
--- a/cpp/src/io/parquet/page_decode.cuh
+++ b/cpp/src/io/parquet/page_decode.cuh
@@ -1035,7 +1035,8 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
                                           size_t min_row,
                                           size_t num_rows,
                                           Filter filter,
-                                          page_processing_stage stage)
+                                          page_processing_stage stage,
+                                          int page_idx = 0)
 {
   int t = threadIdx.x;

@@ -1308,6 +1309,11 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
           }
           s->dict_run  = 0;
           s->dict_val  = 0;
+          if(cur < end){
+            printf("WILL read %d\n", page_idx);
+          } else {
+            printf("WONT read %d\n", page_idx);
+          }
           s->dict_bits = (cur < end) ? *cur++ : 0;
           if (s->dict_bits > 32 || (!s->dict_base && s->col.dict_page->num_input_values > 0)) {
             s->set_error_code(decode_error::INVALID_DICT_WIDTH);
diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu
index d8b1c1cc04..62f3c8c372 100644
--- a/cpp/src/io/parquet/page_string_decode.cu
+++ b/cpp/src/io/parquet/page_string_decode.cu
@@ -973,7 +973,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
                           min_row,
                           num_rows,
                           mask_filter{decode_kernel_mask::STRING},
-                          page_processing_stage::DECODE)) {
+                          page_processing_stage::DECODE, page_idx)) {
     return;
   }

Expected behavior
libcudf does not write Parquet files containing PLAIN_DICTIONARY-encoded pages that are missing the dictionary bit-width byte.

Environment overview

  • Files written with the RAPIDS Accelerator for Apache Spark in Dataproc 2.1
  • Data read with Trino provided by Dataproc 2.1
jlowe added the bug, libcudf, and Spark labels on Apr 5, 2024
nvdbaranec (Contributor) commented
Potential fix posted.

#15474

nvdbaranec (Contributor) commented
Note, to generate a file that reproduces this, use the write_trunc_dict() reproduction code shown in the issue description above.

rapids-bot pushed a commit that referenced this issue on Apr 9, 2024 (#15474)

Fixes #15473

The issue is that in some cases, for example when we have all nulls, we can fail to update the size of the page output buffer, resulting in a missing byte that some readers expect. Specifically, we poke the value of dict_bits into the output buffer here:

https://github.com/rapidsai/cudf/blob/6319ab708f2dff9fd7a62a5c77fd3b387bde1bb8/cpp/src/io/parquet/page_enc.cu#L1892

But if we have no leaf values (for example, because everything in the page is null), `s->cur` never gets updated here, because we never enter the containing loop.

https://github.com/rapidsai/cudf/blob/6319ab708f2dff9fd7a62a5c77fd3b387bde1bb8/cpp/src/io/parquet/page_enc.cu#L1948

The fix is to always update `s->cur` after this if-else block:

https://github.com/rapidsai/cudf/blob/6319ab708f2dff9fd7a62a5c77fd3b387bde1bb8/cpp/src/io/parquet/page_enc.cu#L1891
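
As an illustration only (hypothetical names and stubbed index encoding, not the actual page_enc.cu code), the shape of the bug and of the fix looks roughly like this, where cur tracks how many bytes of the page body have been produced and ultimately determines the page size:

#include <cstdint>

// Buggy shape: the bit-width byte is written at out[cur], but cur is only
// refreshed inside the loop that encodes leaf values, so an all-null page
// (zero leaf values) reports a size that does not cover the byte.
void encode_indices_buggy(uint8_t* out, int& cur, int dict_bits, int num_leaf_values)
{
  out[cur] = static_cast<uint8_t>(dict_bits);
  int pos = cur + 1;                 // where the index data will go
  for (int i = 0; i < num_leaf_values; ++i) {
    out[pos++] = 0;                  // ... encode dictionary index i (stubbed) ...
    cur = pos;                       // cursor only advances inside the loop
  }
}

// Fixed shape: advance cur unconditionally right after writing the byte, so the
// page size accounts for it even when the page contains only nulls.
void encode_indices_fixed(uint8_t* out, int& cur, int dict_bits, int num_leaf_values)
{
  out[cur++] = static_cast<uint8_t>(dict_bits);  // always cover the bit-width byte
  for (int i = 0; i < num_leaf_values; ++i) {
    out[cur++] = 0;                  // ... encode dictionary index i (stubbed) ...
  }
}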

Note that this was already handled by our reader, but some third-party readers (Trino) expect that byte to be present and crash if it is not.

Authors:
  - https://github.com/nvdbaranec

Approvers:
  - Nghia Truong (https://github.com/ttnghia)
  - Muhammad Haseeb (https://github.com/mhaseeb123)

URL: #15474