Add write_ipc to ExecutionContext #1893

Closed · wants to merge 2 commits
15 changes: 14 additions & 1 deletion datafusion/src/dataframe.rs
@@ -17,6 +17,7 @@

//! DataFrame API for building and executing query plans.

use crate::arrow::ipc::writer::IpcWriteOptions;
use crate::arrow::record_batch::RecordBatch;
use crate::error::Result;
use crate::logical_plan::{
@@ -35,7 +36,7 @@ use crate::arrow::util::pretty;
use crate::datasource::TableProvider;
use crate::datasource::TableType;
use crate::execution::context::{ExecutionContext, ExecutionContextState};
use crate::physical_plan::file_format::{plan_to_csv, plan_to_parquet};
use crate::physical_plan::file_format::{plan_to_csv, plan_to_ipc, plan_to_parquet};
use crate::physical_plan::{collect, collect_partitioned};
use crate::physical_plan::{execute_stream, execute_stream_partitioned, ExecutionPlan};
use crate::scalar::ScalarValue;
@@ -579,6 +580,18 @@ impl DataFrame {
let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
plan_to_parquet(&ctx, plan, path, writer_properties).await
}

/// Write a `DataFrame` to an Arrow IPC file.
pub async fn write_ipc(
&self,
path: &str,
writer_properties: IpcWriteOptions,
) -> Result<()> {
let plan = self.create_physical_plan().await?;
let state = self.ctx_state.lock().clone();
let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
plan_to_ipc(&ctx, plan, path, writer_properties).await
}
}

#[async_trait]
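For context, a minimal usage sketch of the new `DataFrame::write_ipc` method (the table name `example`, the input file, and the output path are illustrative, not part of this diff):

```rust
use datafusion::arrow::ipc::writer::IpcWriteOptions;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let mut ctx = ExecutionContext::new();
    // any registered table works; a CSV file is used here for brevity
    ctx.register_csv("example", "example.csv", CsvReadOptions::new())
        .await?;

    let df = ctx.sql("SELECT c1, c2 FROM example").await?;
    // writes one part-{i}.arrow file per partition into the out_ipc directory
    df.write_ipc("out_ipc", IpcWriteOptions::default()).await?;
    Ok(())
}
```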
14 changes: 13 additions & 1 deletion datafusion/src/execution/context.rs
@@ -47,6 +47,7 @@ use std::string::String;
use std::sync::Arc;

use arrow::datatypes::{DataType, SchemaRef};
use arrow::ipc::writer::IpcWriteOptions;

use crate::catalog::{
catalog::{CatalogProvider, MemoryCatalogProvider},
@@ -77,7 +78,7 @@ use crate::physical_optimizer::repartition::Repartition;

use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::logical_plan::plan::Explain;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_parquet};
use crate::physical_plan::file_format::{plan_to_csv, plan_to_ipc, plan_to_parquet};
use crate::physical_plan::planner::DefaultPhysicalPlanner;
use crate::physical_plan::udaf::AggregateUDF;
use crate::physical_plan::udf::ScalarUDF;
@@ -88,6 +89,7 @@ use crate::sql::{
planner::{ContextProvider, SqlToRel},
};
use crate::variable::{VarProvider, VarType};

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use parquet::file::properties::WriterProperties;
@@ -726,6 +728,16 @@ impl ExecutionContext {
plan_to_parquet(self, plan, path, writer_properties).await
}

/// Executes a query and writes the results to an Arrow IPC file.
pub async fn write_ipc(
&self,
plan: Arc<dyn ExecutionPlan>,
path: impl AsRef<str>,
writer_properties: IpcWriteOptions,
) -> Result<()> {
plan_to_ipc(self, plan, path, writer_properties).await
}

/// Optimizes the logical plan by applying optimizer rules, and
/// invoking observer function after each call
fn optimize_internal<F>(
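Similarly, a hedged sketch of driving the new `ExecutionContext::write_ipc` directly from a physical plan (it assumes a table named `test` is already registered; names and paths are illustrative):

```rust
use datafusion::arrow::ipc::writer::IpcWriteOptions;
use datafusion::error::Result;
use datafusion::prelude::*;

async fn write_results_as_ipc(ctx: &mut ExecutionContext) -> Result<()> {
    let df = ctx.sql("SELECT c1, c2 FROM test").await?;
    // lower the logical plan to a physical plan, then hand it to write_ipc
    let plan = df.create_physical_plan().await?;
    ctx.write_ipc(plan, "out_ipc", IpcWriteOptions::default()).await
}
```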
173 changes: 173 additions & 0 deletions datafusion/src/physical_plan/file_format/ipc.rs
@@ -0,0 +1,173 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Execution plan for writing query results to Arrow IPC files

use crate::error::{DataFusionError, Result};
use crate::execution::context::ExecutionContext;
use crate::physical_plan::ExecutionPlan;

use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
use futures::{StreamExt, TryStreamExt};
use std::fs;
use std::path::Path;
use std::sync::Arc;
use tokio::task::{self, JoinHandle};

pub async fn plan_to_ipc(
context: &ExecutionContext,
plan: Arc<dyn ExecutionPlan>,
path: impl AsRef<str>,
writer_options: IpcWriteOptions,
) -> Result<()> {
let path = path.as_ref();
// create directory to contain the IPC files (one per partition)
let fs_path = Path::new(path);
let runtime = context.runtime_env();
match fs::create_dir(fs_path) {
Ok(()) => {
let mut tasks = vec![];
for i in 0..plan.output_partitioning().partition_count() {
let plan = plan.clone();
let filename = format!("part-{}.arrow", i);
let path = fs_path.join(&filename);
let file = fs::File::create(path)?;
let stream = plan.execute(i, runtime.clone()).await?;
let schema = stream.schema();
let mut writer = FileWriter::try_new_with_options(
file,
schema.as_ref(),
writer_options.clone(),
)?;
let handle: JoinHandle<Result<()>> = task::spawn(async move {
stream
.map(|batch| writer.write(&batch?))
.try_collect::<()>()
.await
.map_err(DataFusionError::from)?;
// finish() writes the IPC footer; without it the output is not a valid Arrow file
writer.finish().map_err(DataFusionError::from)
});
tasks.push(handle);
}
for handle in futures::future::join_all(tasks).await {
// surface writer errors and task panics instead of silently dropping them
handle.map_err(|e| DataFusionError::Execution(e.to_string()))??;
}
Ok(())
}
Err(e) => Err(DataFusionError::Execution(format!(
"Could not create directory {}: {:?}",
path, e
))),
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::prelude::*;
use arrow::datatypes::*;
use arrow::ipc::writer::IpcWriteOptions;
use std::fs::File;
use std::io::Write;
use tempfile::TempDir;

/// Generate CSV partitions within the supplied directory
fn populate_csv_partitions(
tmp_dir: &TempDir,
partition_count: usize,
file_extension: &str,
) -> Result<SchemaRef> {
// define schema for data source (csv file)
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::UInt32, false),
Field::new("c2", DataType::UInt64, false),
Field::new("c3", DataType::Boolean, false),
]));

// generate a partitioned file
for partition in 0..partition_count {
let filename = format!("partition-{}.{}", partition, file_extension);
let file_path = tmp_dir.path().join(&filename);
let mut file = File::create(file_path)?;

// generate some data
for i in 0..=10 {
let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
file.write_all(data.as_bytes())?;
}
}

Ok(schema)
}

#[tokio::test]
async fn write_ipc_results() -> Result<()> {
// create partitioned input file and context
let tmp_dir = TempDir::new()?;
let mut ctx = ExecutionContext::with_config(
ExecutionConfig::new().with_target_partitions(8),
);

let schema = populate_csv_partitions(&tmp_dir, 8, ".csv")?;

// register csv file with the execution context
ctx.register_csv(
"test",
tmp_dir.path().to_str().unwrap(),
CsvReadOptions::new().schema(&schema),
)
.await?;

// execute a simple query and write the results to IPC
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
let df = ctx.sql("SELECT c1, c2 FROM test").await?;
let opts = IpcWriteOptions::default();

df.write_ipc(&out_dir, opts).await?;

// read the partitioned IPC files back with arrow's FileReader and verify
// schema and row count (DataFusion has no IPC reader yet, so verification
// bypasses the context)
use arrow::ipc::reader::FileReader;

let expected_schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::UInt32, false),
Field::new("c2", DataType::UInt64, false),
]));

let mut allparts_count = 0;
for i in 0..8 {
let file = File::open(format!("{}/part-{}.arrow", out_dir, i))?;
// NOTE: assumes the two-argument FileReader::try_new(reader, projection);
// drop the None if this arrow version's try_new takes only the reader
let reader = FileReader::try_new(file, None)?;
assert_eq!(reader.schema(), expected_schema);
for batch in reader {
allparts_count += batch?.num_rows();
}
}

// 8 partitions x 10 data rows each (the first line of every input file is
// consumed as a header by the default CsvReadOptions)
assert_eq!(allparts_count, 80);

Ok(())
}
}
2 changes: 2 additions & 0 deletions datafusion/src/physical_plan/file_format/mod.rs
@@ -20,6 +20,7 @@
mod avro;
mod csv;
mod file_stream;
mod ipc;
mod json;
mod parquet;

@@ -35,6 +36,7 @@ use arrow::{
pub use avro::AvroExec;
pub(crate) use csv::plan_to_csv;
pub use csv::CsvExec;
pub(crate) use ipc::plan_to_ipc;
pub use json::NdJsonExec;

use crate::error::DataFusionError;
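Since DataFusion cannot yet scan Arrow IPC files back in (only Avro, CSV, JSON, and Parquet have readers here), files produced by `plan_to_ipc` can be verified with arrow's IPC reader directly. A minimal sketch, assuming the two-argument `FileReader::try_new(reader, projection)` signature:

```rust
use std::fs::File;

use arrow::error::Result;
use arrow::ipc::reader::FileReader;

/// Count the rows in a single IPC partition file written by plan_to_ipc.
fn count_rows(path: &str) -> Result<usize> {
    let file = File::open(path)?;
    // drop the None argument if this arrow version's try_new takes only the reader
    let reader = FileReader::try_new(file, None)?;
    let mut rows = 0;
    for batch in reader {
        rows += batch?.num_rows();
    }
    Ok(rows)
}
```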