From bab0ba3b3f636cbe94502e6990f4eabe7678c26c Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Sun, 8 Sep 2024 07:37:21 +0900 Subject: [PATCH] GH-40860: [GLib][Parquet] Add `gparquet_arrow_file_writer_write_record_batch()` The following APIs are also added: * `gparquet_arrow_file_writer_get_schema()` * `Parquet::ArrowFileWriter#write` in Ruby --- c_glib/parquet-glib/arrow-file-writer.cpp | 46 ++++++++- c_glib/parquet-glib/arrow-file-writer.h | 10 ++ c_glib/test/parquet/test-arrow-file-writer.rb | 38 ++++++- .../lib/parquet/arrow-file-writer.rb | 98 +++++++++++++++++++ ruby/red-parquet/lib/parquet/loader.rb | 1 + .../test/test-arrow-file-writer.rb | 68 +++++++++++++ 6 files changed, 254 insertions(+), 7 deletions(-) create mode 100644 ruby/red-parquet/lib/parquet/arrow-file-writer.rb create mode 100644 ruby/red-parquet/test/test-arrow-file-writer.rb diff --git a/c_glib/parquet-glib/arrow-file-writer.cpp b/c_glib/parquet-glib/arrow-file-writer.cpp index b6f019ed27d46..0d0e87e7e3ede 100644 --- a/c_glib/parquet-glib/arrow-file-writer.cpp +++ b/c_glib/parquet-glib/arrow-file-writer.cpp @@ -316,14 +316,13 @@ gparquet_writer_properties_get_data_page_size(GParquetWriterProperties *properti return parquet_properties->data_pagesize(); } -typedef struct GParquetArrowFileWriterPrivate_ +struct GParquetArrowFileWriterPrivate { parquet::arrow::FileWriter *arrow_file_writer; -} GParquetArrowFileWriterPrivate; +}; enum { - PROP_0, - PROP_ARROW_FILE_WRITER + PROP_ARROW_FILE_WRITER = 1, }; G_DEFINE_TYPE_WITH_PRIVATE(GParquetArrowFileWriter, @@ -496,6 +495,45 @@ gparquet_arrow_file_writer_new_path(GArrowSchema *schema, } } +/** + * gparquet_arrow_file_writer_get_schema: + * @writer: A #GParquetArrowFileWriter. + * + * Returns: (transfer full): The schema to be written to. 
+ * + * Since: 18.0.0 + */ +GArrowSchema * +gparquet_arrow_file_writer_get_schema(GParquetArrowFileWriter *writer) +{ + auto parquet_arrow_file_writer = gparquet_arrow_file_writer_get_raw(writer); + auto arrow_schema = parquet_arrow_file_writer->schema(); + return garrow_schema_new_raw(&arrow_schema); +} + +/** + * gparquet_arrow_file_writer_write_record_batch: + * @writer: A #GParquetArrowFileWriter. + * @record_batch: A record batch to be written. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Returns: %TRUE on success, %FALSE if there was an error. + * + * Since: 18.0.0 + */ +gboolean +gparquet_arrow_file_writer_write_record_batch(GParquetArrowFileWriter *writer, + GArrowRecordBatch *record_batch, + GError **error) +{ + auto parquet_arrow_file_writer = gparquet_arrow_file_writer_get_raw(writer); + auto arrow_record_batch = garrow_record_batch_get_raw(record_batch).get(); + auto status = parquet_arrow_file_writer->WriteRecordBatch(*arrow_record_batch); + return garrow_error_check(error, + status, + "[parquet][arrow][file-writer][write-record-batch]"); +} + /** * gparquet_arrow_file_writer_write_table: * @writer: A #GParquetArrowFileWriter. 
diff --git a/c_glib/parquet-glib/arrow-file-writer.h b/c_glib/parquet-glib/arrow-file-writer.h index 71cbfa195e842..7eb14fe27a8bf 100644 --- a/c_glib/parquet-glib/arrow-file-writer.h +++ b/c_glib/parquet-glib/arrow-file-writer.h @@ -116,6 +116,16 @@ gparquet_arrow_file_writer_new_path(GArrowSchema *schema, GParquetWriterProperties *writer_properties, GError **error); +GPARQUET_AVAILABLE_IN_18_0 +GArrowSchema * +gparquet_arrow_file_writer_get_schema(GParquetArrowFileWriter *writer); + +GPARQUET_AVAILABLE_IN_18_0 +gboolean +gparquet_arrow_file_writer_write_record_batch(GParquetArrowFileWriter *writer, + GArrowRecordBatch *record_batch, + GError **error); + GPARQUET_AVAILABLE_IN_0_11 gboolean gparquet_arrow_file_writer_write_table(GParquetArrowFileWriter *writer, diff --git a/c_glib/test/parquet/test-arrow-file-writer.rb b/c_glib/test/parquet/test-arrow-file-writer.rb index f899e7273b2a2..e348c9b679524 100644 --- a/c_glib/test/parquet/test-arrow-file-writer.rb +++ b/c_glib/test/parquet/test-arrow-file-writer.rb @@ -26,7 +26,39 @@ def setup end end - def test_write + def test_schema + schema = build_schema("enabled" => :boolean) + writer = Parquet::ArrowFileWriter.new(schema, @file.path) + assert_equal(schema, writer.schema) + writer.close + end + + def test_write_record_batch + enabled_values = [true, nil, false, true] + record_batch = + build_record_batch("enabled" => build_boolean_array(enabled_values)) + + writer = Parquet::ArrowFileWriter.new(record_batch.schema, @file.path) + writer.write_record_batch(record_batch) + writer.close + + reader = Parquet::ArrowFileReader.new(@file.path) + begin + reader.use_threads = true + assert_equal([ + 1, + Arrow::Table.new(record_batch.schema, [record_batch]), + ], + [ + reader.n_row_groups, + reader.read_table, + ]) + ensure + reader.unref + end + end + + def test_write_table enabled_values = [true, nil, false, true] table = build_table("enabled" => build_boolean_array(enabled_values)) chunk_size = 2 @@ -40,11 +72,11 @@ def 
test_write reader.use_threads = true assert_equal([ enabled_values.length / chunk_size, - true, + table, ], [ reader.n_row_groups, - table.equal_metadata(reader.read_table, false), + reader.read_table, ]) ensure reader.unref diff --git a/ruby/red-parquet/lib/parquet/arrow-file-writer.rb b/ruby/red-parquet/lib/parquet/arrow-file-writer.rb new file mode 100644 index 0000000000000..137dc518e3f95 --- /dev/null +++ b/ruby/red-parquet/lib/parquet/arrow-file-writer.rb @@ -0,0 +1,98 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +module Parquet + class ArrowFileWriter + # Write data to Apache Parquet. + # + # @return [void] + # + # @overload write(record_batch) + # + # @param record_batch [Arrow::RecordBatch] The record batch to + # be written. + # + # @example Write a record batch + # record_batch = Arrow::RecordBatch.new(enabled: [true, false]) + # schema = record_batch.schema + # Parquet::ArrowFileWriter.open(schema, "data.parquet") do |writer| + # writer.write(record_batch) + # end + # + # @overload write(table, chunk_size: nil) + # + # @param table [Arrow::Table] The table to be written. + # + # @param chunk_size [nil, Integer] (nil) The maximum number of + # rows to write per row group. 
+ # + # If this is `nil`, the default value (`1024 * 1024`) is used. + # + # @example Write a table with the default chunk size + # table = Arrow::Table.new(enabled: [true, false]) + # schema = table.schema + # Parquet::ArrowFileWriter.open(schema, "data.parquet") do |writer| + # writer.write(table) + # end + # + # @example Write a table with the specified chunk size + # table = Arrow::Table.new(enabled: [true, false]) + # schema = table.schema + # Parquet::ArrowFileWriter.open(schema, "data.parquet") do |writer| + # writer.write(table, chunk_size: 1) + # end + # + # @overload write(raw_records) + # + # @param raw_records [Array<Hash>, Array<Array>] The data to be written + # as primitive Ruby objects. + # + # @example Write a record batch with Array based data + # schema = Arrow::Schema.new(enabled: :boolean) + # raw_records = [ + # [true], + # [false], + # ] + # Parquet::ArrowFileWriter.open(schema, "data.parquet") do |writer| + # writer.write(raw_records) + # end + # + # @example Write a record batch with Hash based data + # schema = Arrow::Schema.new(enabled: :boolean) + # raw_columns = [ + # enabled: [true, false], + # ] + # Parquet::ArrowFileWriter.open(schema, "data.parquet") do |writer| + # writer.write(raw_columns) + # end + # + # @since 18.0.0 + def write(target, chunk_size: nil) + case target + when Arrow::RecordBatch + write_record_batch(target) + when Arrow::Table + # Same as parquet::DEFAULT_MAX_ROW_GROUP_LENGTH in C++ + chunk_size ||= 1024 * 1024 + write_table(target, chunk_size) + else + record_batch = Arrow::RecordBatch.new(schema, target) + write_record_batch(record_batch) + end + end + end +end diff --git a/ruby/red-parquet/lib/parquet/loader.rb b/ruby/red-parquet/lib/parquet/loader.rb index 0c20ad2b52a21..018a35ce459eb 100644 --- a/ruby/red-parquet/lib/parquet/loader.rb +++ b/ruby/red-parquet/lib/parquet/loader.rb @@ -30,6 +30,7 @@ def post_load(repository, namespace) def require_libraries require "parquet/arrow-file-reader" + require 
"parquet/arrow-file-writer" require "parquet/arrow-table-loadable" require "parquet/arrow-table-savable" require "parquet/writer-properties" diff --git a/ruby/red-parquet/test/test-arrow-file-writer.rb b/ruby/red-parquet/test/test-arrow-file-writer.rb new file mode 100644 index 0000000000000..eb8704776c410 --- /dev/null +++ b/ruby/red-parquet/test/test-arrow-file-writer.rb @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +class TestArrowFileWriter < Test::Unit::TestCase + sub_test_case("#write") do + test("RecordBatch") do + schema = Arrow::Schema.new(visible: :boolean) + record_batch = Arrow::RecordBatch.new(schema, [[true], [false]]) + Tempfile.create(["red-parquet", ".parquet"]) do |file| + Parquet::ArrowFileWriter.open(record_batch.schema, file.path) do |writer| + writer.write(record_batch) + end + assert_equal(record_batch.to_table, + Arrow::Table.load(file.path)) + end + end + + test("Table") do + schema = Arrow::Schema.new(visible: :boolean) + table = Arrow::Table.new(schema, [[true], [false]]) + Tempfile.create(["red-parquet", ".parquet"]) do |file| + Parquet::ArrowFileWriter.open(table.schema, file.path) do |writer| + writer.write(table) + end + assert_equal(table, + Arrow::Table.load(file.path)) + end + end + + test("[[]]") do + schema = Arrow::Schema.new(visible: :boolean) + raw_records = [[true], [false]] + Tempfile.create(["red-parquet", ".parquet"]) do |file| + Parquet::ArrowFileWriter.open(schema, file.path) do |writer| + writer.write(raw_records) + end + assert_equal(Arrow::RecordBatch.new(schema, raw_records).to_table, + Arrow::Table.load(file.path)) + end + end + + test("[{}]") do + schema = Arrow::Schema.new(visible: :boolean) + raw_columns = [visible: [true, false]] + Tempfile.create(["red-parquet", ".parquet"]) do |file| + Parquet::ArrowFileWriter.open(schema, file.path) do |writer| + writer.write(raw_columns) + end + assert_equal(Arrow::RecordBatch.new(schema, raw_columns).to_table, + Arrow::Table.load(file.path)) + end + end + end +end