Optimize Parquet file writing with WriteBatch function (#1028)
bmcdonald3 authored Jan 18, 2022
1 parent 5b7c222 commit f68e9e2
Showing 2 changed files with 34 additions and 15 deletions.
48 changes: 33 additions & 15 deletions src/ArrowFunctions.cpp
@@ -159,24 +159,42 @@
 int cpp_writeColumnToParquet(const char* filename, void* chpl_arr,
                              int64_t colnum, const char* dsetname, int64_t numelems,
                              int64_t rowGroupSize, char** errMsg) {
   auto chpl_ptr = (int64_t*)chpl_arr;
-  arrow::Int64Builder i64builder;
-  arrow::Status status;
-  for(int64_t i = 0; i < numelems; i++) {
-    ARROWSTATUS_OK(i64builder.AppendValues({chpl_ptr[i]}));
+  using FileClass = ::arrow::io::FileOutputStream;
+  std::shared_ptr<FileClass> out_file;
+  PARQUET_ASSIGN_OR_THROW(out_file, FileClass::Open(filename));
+
+  // Setup schema of a single int64 column
+  parquet::schema::NodeVector fields;
+  fields.push_back(parquet::schema::PrimitiveNode::Make(dsetname, parquet::Repetition::REQUIRED, parquet::Type::INT64, parquet::ConvertedType::NONE));
+  std::shared_ptr<parquet::schema::GroupNode> schema = std::static_pointer_cast<parquet::schema::GroupNode>
+    (parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields));
+
+  // TODO: add conditionals and arguments for writing with Snappy/RLE
+  parquet::WriterProperties::Builder builder;
+  std::shared_ptr<parquet::WriterProperties> props = builder.build();
+
+  std::shared_ptr<parquet::ParquetFileWriter> file_writer =
+    parquet::ParquetFileWriter::Open(out_file, schema, props);
+
+  int64_t i = 0;
+  int64_t numLeft = numelems;
+
+  while(numLeft > 0) {
+    parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
+    parquet::Int64Writer* int64_writer =
+      static_cast<parquet::Int64Writer*>(rg_writer->NextColumn());
+
+    int64_t batchSize = rowGroupSize;
+    if(numLeft < rowGroupSize)
+      batchSize = numLeft;
+    int64_writer->WriteBatch(batchSize, nullptr, nullptr, &chpl_ptr[i]);
+    numLeft -= batchSize;
+    i += batchSize;
   }
-  std::shared_ptr<arrow::Array> i64array;
-  ARROWSTATUS_OK(i64builder.Finish(&i64array));
 
-  std::shared_ptr<arrow::Schema> schema = arrow::schema(
-      {arrow::field(dsetname, arrow::int64())});
+  file_writer->Close();
+  ARROWSTATUS_OK(out_file->Close());
 
-  auto table = arrow::Table::Make(schema, {i64array});
-
-  std::shared_ptr<arrow::io::FileOutputStream> outfile;
-  ARROWRESULT_OK(arrow::io::FileOutputStream::Open(filename),
-                 outfile);
-
-  ARROWSTATUS_OK(parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, rowGroupSize));
   return 0;
 }

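The TODO in the added code notes that compression and encoding options are not wired up yet. As a rough sketch only (not part of this commit), the WriterProperties builder shown above could later be asked for Snappy compression before being handed to ParquetFileWriter::Open; the helper name below is hypothetical:

// Hypothetical follow-up to the TODO above: request Snappy compression
// when building the writer properties. Not part of this commit;
// makeSnappyProps is an illustrative name.
#include <parquet/api/writer.h>

std::shared_ptr<parquet::WriterProperties> makeSnappyProps() {
  parquet::WriterProperties::Builder builder;
  builder.compression(parquet::Compression::SNAPPY);  // default codec for all columns
  return builder.build();
}

A props object built this way would simply replace the default-built props that the new code passes to ParquetFileWriter::Open.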
1 change: 1 addition & 0 deletions src/ArrowFunctions.h
@@ -8,6 +8,7 @@
 #include <parquet/arrow/reader.h>
 #include <parquet/arrow/writer.h>
 #include <parquet/column_reader.h>
+#include <parquet/api/writer.h>
 extern "C" {
 #endif
 
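For reference, a hypothetical C++ caller of the updated cpp_writeColumnToParquet (file name, dataset name, and sizes are illustrative, not taken from this commit):

// Hypothetical caller: writes 1000 int64 values in row groups of
// 256 elements. All argument values here are made up for illustration.
#include <cstdint>
#include <vector>
#include "ArrowFunctions.h"

int main() {
  std::vector<int64_t> data(1000);
  for (int64_t v = 0; v < 1000; v++)
    data[v] = v;

  char* errMsg = nullptr;
  return cpp_writeColumnToParquet("example.parquet", data.data(),
                                  /*colnum=*/0, "example-dset",
                                  /*numelems=*/1000,
                                  /*rowGroupSize=*/256, &errMsg);
}

With these values the while loop in the new code would emit four row groups: three holding 256 elements each and a final one holding the remaining 232.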
