Skip to content

Commit

Permalink
GH-40860: [GLib][Parquet] Add `gparquet_arrow_file_writer_write_record_batch()` (#44001)
Browse files Browse the repository at this point in the history

### Rationale for this change

We don't need to create a `GArrowTable` only for writing a `GArrowRecordBatch`.

### What changes are included in this PR?

The following APIs are also added:
* `gparquet_arrow_file_writer_get_schema()`
* `Parquet::ArrowFileWriter#write` in Ruby

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.
* GitHub Issue: #40860

Authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
  • Loading branch information
kou authored Sep 9, 2024
1 parent a15956f commit d88dd19
Show file tree
Hide file tree
Showing 6 changed files with 262 additions and 7 deletions.
46 changes: 42 additions & 4 deletions c_glib/parquet-glib/arrow-file-writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,14 +316,13 @@ gparquet_writer_properties_get_data_page_size(GParquetWriterProperties *properti
return parquet_properties->data_pagesize();
}

typedef struct GParquetArrowFileWriterPrivate_
struct GParquetArrowFileWriterPrivate
{
parquet::arrow::FileWriter *arrow_file_writer;
} GParquetArrowFileWriterPrivate;
};

enum {
PROP_0,
PROP_ARROW_FILE_WRITER
PROP_ARROW_FILE_WRITER = 1,
};

G_DEFINE_TYPE_WITH_PRIVATE(GParquetArrowFileWriter,
Expand Down Expand Up @@ -496,6 +495,45 @@ gparquet_arrow_file_writer_new_path(GArrowSchema *schema,
}
}

/**
 * gparquet_arrow_file_writer_get_schema:
 * @writer: A #GParquetArrowFileWriter.
 *
 * Returns: (transfer full): The schema to be written.
 *
 * Since: 18.0.0
 */
GArrowSchema *
gparquet_arrow_file_writer_get_schema(GParquetArrowFileWriter *writer)
{
  auto file_writer = gparquet_arrow_file_writer_get_raw(writer);
  /* Wrap the writer's Arrow schema in a new GArrowSchema for the caller. */
  auto schema = file_writer->schema();
  return garrow_schema_new_raw(&schema);
}

/**
 * gparquet_arrow_file_writer_write_record_batch:
 * @writer: A #GParquetArrowFileWriter.
 * @record_batch: A record batch to be written.
 * @error: (nullable): Return location for a #GError or %NULL.
 *
 * Returns: %TRUE on success, %FALSE if there was an error.
 *
 * Since: 18.0.0
 */
gboolean
gparquet_arrow_file_writer_write_record_batch(GParquetArrowFileWriter *writer,
                                              GArrowRecordBatch *record_batch,
                                              GError **error)
{
  auto parquet_arrow_file_writer = gparquet_arrow_file_writer_get_raw(writer);
  /* Hold the shared_ptr for the duration of the call instead of
   * extracting a raw pointer from the temporary returned by
   * garrow_record_batch_get_raw(); this does not rely on @record_batch
   * keeping the underlying arrow::RecordBatch alive. */
  auto arrow_record_batch = garrow_record_batch_get_raw(record_batch);
  auto status = parquet_arrow_file_writer->WriteRecordBatch(*arrow_record_batch);
  return garrow_error_check(error,
                            status,
                            "[parquet][arrow][file-writer][write-record-batch]");
}

/**
* gparquet_arrow_file_writer_write_table:
* @writer: A #GParquetArrowFileWriter.
Expand Down
10 changes: 10 additions & 0 deletions c_glib/parquet-glib/arrow-file-writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,16 @@ gparquet_arrow_file_writer_new_path(GArrowSchema *schema,
GParquetWriterProperties *writer_properties,
GError **error);

/* Returns the schema this writer was created with; see the .cpp for details. */
GPARQUET_AVAILABLE_IN_18_0
GArrowSchema *
gparquet_arrow_file_writer_get_schema(GParquetArrowFileWriter *writer);

/* Writes a single record batch without requiring an intermediate GArrowTable. */
GPARQUET_AVAILABLE_IN_18_0
gboolean
gparquet_arrow_file_writer_write_record_batch(GParquetArrowFileWriter *writer,
GArrowRecordBatch *record_batch,
GError **error);

GPARQUET_AVAILABLE_IN_0_11
gboolean
gparquet_arrow_file_writer_write_table(GParquetArrowFileWriter *writer,
Expand Down
38 changes: 35 additions & 3 deletions c_glib/test/parquet/test-arrow-file-writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,39 @@ def setup
end
end

def test_write
# The schema given to the constructor must be readable back via
# Parquet::ArrowFileWriter#schema.
def test_schema
  boolean_schema = build_schema("enabled" => :boolean)
  file_writer = Parquet::ArrowFileWriter.new(boolean_schema, @file.path)
  assert_equal(boolean_schema, file_writer.schema)
  file_writer.close
end

# A single written record batch must round-trip through
# Parquet::ArrowFileReader as one row group with identical contents.
def test_write_record_batch
  values = [true, nil, false, true]
  batch = build_record_batch("enabled" => build_boolean_array(values))

  file_writer = Parquet::ArrowFileWriter.new(batch.schema, @file.path)
  file_writer.write_record_batch(batch)
  file_writer.close

  file_reader = Parquet::ArrowFileReader.new(@file.path)
  begin
    file_reader.use_threads = true
    expected = [
      1,
      Arrow::Table.new(batch.schema, [batch]),
    ]
    actual = [
      file_reader.n_row_groups,
      file_reader.read_table,
    ]
    assert_equal(expected, actual)
  ensure
    file_reader.unref
  end
end

def test_write_table
enabled_values = [true, nil, false, true]
table = build_table("enabled" => build_boolean_array(enabled_values))
chunk_size = 2
Expand All @@ -40,11 +72,11 @@ def test_write
reader.use_threads = true
assert_equal([
enabled_values.length / chunk_size,
true,
table,
],
[
reader.n_row_groups,
table.equal_metadata(reader.read_table, false),
reader.read_table,
])
ensure
reader.unref
Expand Down
98 changes: 98 additions & 0 deletions ruby/red-parquet/lib/parquet/arrow-file-writer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

module Parquet
  class ArrowFileWriter
    # Write data to Apache Parquet.
    #
    # @return [void]
    #
    # @overload write(record_batch)
    #
    #   @param record_batch [Arrow::RecordBatch] The record batch to
    #     be written.
    #
    #   @example Write a record batch
    #     record_batch = Arrow::RecordBatch.new(enabled: [true, false])
    #     schema = record_batch.schema
    #     Parquet::ArrowFileWriter.open(schema, "data.parquet") do |writer|
    #       writer.write(record_batch)
    #     end
    #
    # @overload write(table, chunk_size: nil)
    #
    #   @param table [Arrow::Table] The table to be written.
    #
    #   @param chunk_size [nil, Integer] (nil) The maximum number of
    #     rows to write per row group.
    #
    #     If this is `nil`, the default value (`1024 * 1024`) is used.
    #
    #   @example Write a table with the default chunk size
    #     table = Arrow::Table.new(enabled: [true, false])
    #     schema = table.schema
    #     Parquet::ArrowFileWriter.open(schema, "data.parquet") do |writer|
    #       writer.write(table)
    #     end
    #
    #   @example Write a table with the specified chunk size
    #     table = Arrow::Table.new(enabled: [true, false])
    #     schema = table.schema
    #     Parquet::ArrowFileWriter.open(schema, "data.parquet") do |writer|
    #       writer.write(table, chunk_size: 1)
    #     end
    #
    # @overload write(raw_records)
    #
    #   @param raw_records [Array<Hash>, Array<Array>] The data to be
    #     written as primitive Ruby objects.
    #
    #   @example Write a record batch with Array<Array> based data
    #     schema = Arrow::Schema.new(enabled: :boolean)
    #     raw_records = [
    #       [true],
    #       [false],
    #     ]
    #     Parquet::ArrowFileWriter.open(schema, "data.parquet") do |writer|
    #       writer.write(raw_records)
    #     end
    #
    #   @example Write a record batch with Array<Hash> based data
    #     schema = Arrow::Schema.new(enabled: :boolean)
    #     raw_columns = [
    #       enabled: [true, false],
    #     ]
    #     Parquet::ArrowFileWriter.open(schema, "data.parquet") do |writer|
    #       writer.write(raw_columns)
    #     end
    #
    # @since 18.0.0
    def write(target, chunk_size: nil)
      case target
      when Arrow::RecordBatch
        write_record_batch(target)
      when Arrow::Table
        # Same as parquet::DEFAULT_MAX_ROW_GROUP_LENGTH in C++
        chunk_size ||= 1024 * 1024
        write_table(target, chunk_size)
      else
        # Primitive Ruby data (Array<Array> or Array<Hash>) is converted
        # to a record batch using this writer's schema before writing.
        record_batch = Arrow::RecordBatch.new(schema, target)
        write_record_batch(record_batch)
      end
    end
  end
end
1 change: 1 addition & 0 deletions ruby/red-parquet/lib/parquet/loader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def post_load(repository, namespace)

def require_libraries
require "parquet/arrow-file-reader"
require "parquet/arrow-file-writer"
require "parquet/arrow-table-loadable"
require "parquet/arrow-table-savable"
require "parquet/writer-properties"
Expand Down
76 changes: 76 additions & 0 deletions ruby/red-parquet/test/test-arrow-file-writer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

class TestArrowFileWriter < Test::Unit::TestCase
  # Yields an in-memory output stream to the block and returns the
  # resizable buffer that received everything written to it.
  def open_buffer_output_stream
    sink = Arrow::ResizableBuffer.new(4096)
    Arrow::BufferOutputStream.open(sink) do |stream|
      yield(stream)
    end
    sink
  end

  sub_test_case("#write") do
    test("RecordBatch") do
      schema = Arrow::Schema.new(visible: :boolean)
      record_batch = Arrow::RecordBatch.new(schema, [[true], [false]])
      written = open_buffer_output_stream do |stream|
        Parquet::ArrowFileWriter.open(record_batch.schema, stream) do |writer|
          writer.write(record_batch)
        end
      end
      assert_equal(record_batch.to_table,
                   Arrow::Table.load(written, format: :parquet))
    end

    test("Table") do
      schema = Arrow::Schema.new(visible: :boolean)
      table = Arrow::Table.new(schema, [[true], [false]])
      written = open_buffer_output_stream do |stream|
        Parquet::ArrowFileWriter.open(table.schema, stream) do |writer|
          writer.write(table)
        end
      end
      assert_equal(table,
                   Arrow::Table.load(written, format: :parquet))
    end

    test("[[]]") do
      schema = Arrow::Schema.new(visible: :boolean)
      raw_records = [[true], [false]]
      written = open_buffer_output_stream do |stream|
        Parquet::ArrowFileWriter.open(schema, stream) do |writer|
          writer.write(raw_records)
        end
      end
      assert_equal(Arrow::RecordBatch.new(schema, raw_records).to_table,
                   Arrow::Table.load(written, format: :parquet))
    end

    test("[{}]") do
      schema = Arrow::Schema.new(visible: :boolean)
      raw_columns = [visible: [true, false]]
      written = open_buffer_output_stream do |stream|
        Parquet::ArrowFileWriter.open(schema, stream) do |writer|
          writer.write(raw_columns)
        end
      end
      assert_equal(Arrow::RecordBatch.new(schema, raw_columns).to_table,
                   Arrow::Table.load(written, format: :parquet))
    end
  end
end

0 comments on commit d88dd19

Please sign in to comment.