[FEA][Java] ArrowIPCTableWriter writes an empty batch in the case of an empty table. #11882

Closed · firestarman opened this issue Oct 8, 2022 · 0 comments · Fixed by #11883
Labels: feature request

firestarman (Contributor) commented Oct 8, 2022

Is your feature request related to a problem? Please describe.
Currently the ArrowIPCTableWriter writes no batches into the stream when given an empty table, which PySpark cannot handle correctly, failing with the error below.

E                     File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 297, in load_stream
E                       [self.arrow_to_pandas(c) for c in pa.Table.from_batches(batch2).itercolumns()]
E                     File "pyarrow/table.pxi", line 1609, in pyarrow.lib.Table.from_batches
E                   ValueError: Must pass schema, or at least one RecordBatch

PySpark calls pyarrow.Table.from_batches without specifying a schema, so it expects to receive at least one batch (even an empty one) from which it can infer the schema.
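For illustration, here is a minimal pyarrow sketch of that behavior; the single int32 column named `day` is only an assumption for the example, not the schema PySpark actually receives.

```python
import pyarrow as pa

# With no schema and no batches, pyarrow has nothing to infer the schema from.
try:
    pa.Table.from_batches([])
except ValueError as err:
    print(err)  # Must pass schema, or at least one RecordBatch

# A single empty RecordBatch is enough, because it still carries the schema.
empty_batch = pa.RecordBatch.from_arrays(
    [pa.array([], type=pa.int32())], names=["day"])
table = pa.Table.from_batches([empty_batch])
print(table.schema)    # day: int32
print(table.num_rows)  # 0
```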

I have written a unit test to reproduce this case.

  @Test
  void testArrowIPCWriteEmptyToBufferChunked() {
    try (Table emptyTable = new Table.TestBuilder().timestampDayColumn().build();
         MyBufferConsumer consumer = new MyBufferConsumer()) {
      ArrowIPCWriterOptions options = ArrowIPCWriterOptions.builder()
              .withColumnNames("day")
              .build();
      try (TableWriter writer = Table.writeArrowIPCChunked(options, consumer)) {
        writer.write(emptyTable);
      }
      try (StreamedTableReader reader = Table.readArrowIPCChunked(new MyBufferProvider(consumer))) {
        boolean done = false;
        int count = 0;
        while (!done) {
          try (Table t = reader.getNextIfAvailable()) {
            if (t == null) {
              done = true;
            } else {
              assertTablesAreEqual(emptyTable, t);
              count++;
            }
          }
        }
        // Expect one empty batch for the empty table.
        assertEquals(1, count);
      }
    }
  }

Describe the solution you'd like
Have the ArrowIPCTableWriter write an empty batch explicitly in the case of an empty table. We can do this easily in the JNI layer, e.g.

diff --git a/java/src/main/native/src/TableJni.cpp b/java/src/main/native/src/TableJni.cpp
index ad280cad5f..c23c5a3ccb 100644
--- a/java/src/main/native/src/TableJni.cpp
+++ b/java/src/main/native/src/TableJni.cpp
@@ -258,7 +258,15 @@ public:
       writer = *tmp_writer;
       initialized = true;
     }
-    writer->WriteTable(*arrow_tab, max_chunk);
+    if (arrow_tab->num_rows() == 0) {
+      // Arrow C++ IPC writer will not write an empty batch in the case of an
+      // empty table, so need to write an empty batch explicitly.
+      // For more please see https://issues.apache.org/jira/browse/ARROW-17912.
+      auto empty_batch = arrow::RecordBatch::MakeEmpty(arrow_tab->schema());
+      writer->WriteRecordBatch(*(*empty_batch));
+    } else {
+      writer->WriteTable(*arrow_tab, max_chunk);
+    }
   }

Describe alternatives you've considered
Let the Arrow C++ IPC writer used by the cuDF JNI support writing an empty batch for an empty table. I tried this but failed; for more details, please refer to https://issues.apache.org/jira/browse/ARROW-17912. Note that the Arrow Java IPC writer can already do this, i.e. it sends out an empty batch implicitly.

We could also update PySpark to specify the schema, but it is not clear how long it would take to get that change done.
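For reference, here is a hedged sketch of what that PySpark-side change could look like, assuming the stream's schema is available wherever the batches are collected; the helper name `batches_to_table` and the parameter `stream_schema` are hypothetical, not actual PySpark code.

```python
import pyarrow as pa

def batches_to_table(batches, stream_schema):
    # Passing the schema explicitly lets from_batches succeed even when the
    # stream delivered no batches at all, so the empty-table case no longer
    # depends on the writer emitting an empty batch.
    return pa.Table.from_batches(batches, schema=stream_schema)
```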

So, in the short term, we can do it in the cuDF JNI.

firestarman added the feature request and Needs Triage labels on Oct 8, 2022
firestarman changed the title from "[FEA][JVM] ArrowIPCTableWriter writes an empty batch in the case of an empty table." to "[FEA][Java] ArrowIPCTableWriter writes an empty batch in the case of an empty table." on Oct 8, 2022
rapids-bot pushed a commit that referenced this issue on Oct 11, 2022: …le. (#11883)

closes #11882

Updated the `ArrowIPCTableWriter` to write an empty batch explicitly in the case of an empty table, because the Arrow IPC writer writes no batches for this case, leading to the error below when `pyarrow.Table.from_batches` is called without specifying a schema.
```
E                     File "pyarrow/table.pxi", line 1609, in pyarrow.lib.Table.from_batches
E                   ValueError: Must pass schema, or at least one RecordBatch
```

Signed-off-by: Liangcai Li <[email protected]>

Authors:
  - Liangcai Li (https://github.com/firestarman)

Approvers:
  - Nghia Truong (https://github.com/ttnghia)

URL: #11883
bdice removed the Needs Triage label on Mar 4, 2024