Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cli arg to write output to file #260

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ metrics-exporter-prometheus = {version = "0.16.0", optional = true }
num_cpus = "1.16.0"
object_store = { version = "0.11.0", features = ["aws"], optional = true }
parking_lot = "0.12.3"
parquet = "53.0.0"
pin-project-lite = {version = "0.2.14" }
prost = "0.13.1"
ratatui = "0.28.0"
Expand Down
7 changes: 7 additions & 0 deletions src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ pub struct DftArgs {
#[cfg(any(feature = "flightsql", feature = "experimental-flightsql-server"))]
#[clap(long, help = "Set the host and port to be used for FlightSQL")]
pub flightsql_host: Option<String>,

#[clap(
long,
short,
help = "Path to save output to. Type is inferred from file suffix"
)]
pub output: Option<PathBuf>,
}

impl DftArgs {
Expand Down
110 changes: 107 additions & 3 deletions src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@ use crate::args::DftArgs;
use crate::execution::{local_benchmarks::LocalBenchmarkStats, AppExecution};
use color_eyre::eyre::eyre;
use color_eyre::Result;
use datafusion::arrow::array::RecordBatch;
use datafusion::arrow::array::{RecordBatch, RecordBatchWriter};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::arrow::{csv, json};
use datafusion::sql::parser::DFParser;
use futures::{Stream, StreamExt};
use log::info;
use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
use std::error::Error;
use std::fs::File;
use std::io::Write;
use std::path::{Path, PathBuf};
#[cfg(feature = "flightsql")]
Expand Down Expand Up @@ -53,6 +57,19 @@ impl CliApp {
}
}

/// Ensure the CLI arguments are mutually compatible.
///
/// Saving results to a file (`--output`) is only supported when a single
/// command or a single file is executed; with multiple inputs the runs
/// would overwrite one another's output.
fn validate_args(&self) -> color_eyre::Result<()> {
    if self.args.output.is_some()
        && (self.args.commands.len() > 1 || self.args.files.len() > 1)
    {
        Err(eyre!(
            "Output can only be saved for a single file or command"
        ))
    } else {
        Ok(())
    }
}

/// Execute the provided sql, which was passed as an argument from CLI.
///
/// Optionally, use the FlightSQL client for execution.
Expand All @@ -61,6 +78,8 @@ impl CliApp {
self.app_execution.execution_ctx().execute_ddl().await;
}

self.validate_args()?;

#[cfg(not(feature = "flightsql"))]
match (
self.args.files.is_empty(),
Expand Down Expand Up @@ -122,6 +141,8 @@ impl CliApp {
// Execution cases
(true, false, false, false, false) => self.execute_commands(&self.args.commands).await,
(false, true, false, false, false) => self.execute_files(&self.args.files).await,

// FlightSQL execution cases
(false, true, true, false, false) => {
self.flightsql_execute_files(&self.args.files).await
}
Expand Down Expand Up @@ -240,7 +261,9 @@ impl CliApp {
for endpoint in flight_info.endpoint {
if let Some(ticket) = endpoint.ticket {
let stream = client.do_get(ticket.into_request()).await?;
if let Some(start) = start {
if let Some(output_path) = &self.args.output {
self.output_stream(stream, output_path).await?
} else if let Some(start) = start {
self.exec_stream(stream).await;
let elapsed = start.elapsed();
println!("Query {i} executed in {:?}", elapsed);
Expand Down Expand Up @@ -371,7 +394,9 @@ impl CliApp {
.execution_ctx()
.execute_statement(statement)
.await?;
if let Some(start) = start {
if let Some(output_path) = &self.args.output {
self.output_stream(stream, output_path).await?;
} else if let Some(start) = start {
self.exec_stream(stream).await;
let elapsed = start.elapsed();
println!("Query {i} executed in {:?}", elapsed);
Expand Down Expand Up @@ -460,4 +485,83 @@ impl CliApp {
}
}
}

async fn output_stream<S, E>(&self, mut stream: S, path: &Path) -> Result<()>
where
S: Stream<Item = Result<RecordBatch, E>> + Unpin,
E: Error,
{
// We get the schema from the first batch and use that for creating the writer
if let Some(Ok(first_batch)) = stream.next().await {
let schema = first_batch.schema();
let mut writer = path_to_writer(path, schema)?;
writer.write(&first_batch)?;

while let Some(maybe_batch) = stream.next().await {
match maybe_batch {
Ok(batch) => writer.write(&batch)?,
Err(e) => return Err(eyre!("Error executing SQL: {e}")),
}
}
writer.close()?;
}

Ok(())
}
}

/// We use an Enum for this because of limitations with using trait objects and the `close` method
/// on a writer taking `self` as an argument which requires a size for the trait object which is
/// not known at compile time.
#[allow(clippy::large_enum_variant)]
enum AnyWriter {
    // CSV output; the integration test expects a header row followed by values.
    Csv(csv::writer::Writer<File>),
    // Newline-delimited JSON, one object per row (e.g. `{"col":1}`).
    Json(json::writer::LineDelimitedWriter<File>),
    // Parquet output; `close` must be called to finalize the file footer.
    Parquet(ArrowWriter<File>),
}

impl AnyWriter {
    /// Forward a record batch to whichever concrete writer this is.
    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
        match self {
            Self::Csv(writer) => writer.write(batch)?,
            Self::Json(writer) => writer.write(batch)?,
            Self::Parquet(writer) => writer.write(batch)?,
        }
        Ok(())
    }

    /// Finalize the output. Consumes `self` because the underlying
    /// writers' `close` methods take ownership.
    fn close(self) -> Result<()> {
        match self {
            Self::Csv(writer) => writer.close()?,
            Self::Json(writer) => writer.close()?,
            Self::Parquet(writer) => {
                // Parquet's `close` returns file metadata, which we discard.
                let _ = writer.close()?;
            }
        }
        Ok(())
    }
}

fn path_to_writer(path: &Path, schema: SchemaRef) -> Result<AnyWriter> {
if let Some(extension) = path.extension() {
if let Some(e) = extension.to_ascii_lowercase().to_str() {
let file = std::fs::File::create(path)?;
return match e {
"csv" => Ok(AnyWriter::Csv(csv::writer::Writer::new(file))),
"json" => Ok(AnyWriter::Json(json::writer::LineDelimitedWriter::new(
file,
))),
"parquet" => {
let props = WriterProperties::default();
let writer = ArrowWriter::try_new(file, schema, Some(props))?;
Ok(AnyWriter::Parquet(writer))
}
_ => {
return Err(eyre!(
"Only 'csv', 'parquet', and 'json' file types can be output"
))
}
};
}
}
Err(eyre!("Unable to parse extension"))
}
99 changes: 98 additions & 1 deletion tests/cli_cases/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Tests for the CLI (e.g. run from files)

use assert_cmd::Command;
use std::path::PathBuf;
use std::{io::Read, path::PathBuf};

use super::{assert_output_contains, contains_str, sql_in_file};

Expand Down Expand Up @@ -393,3 +393,100 @@ fn test_query_non_existent_local_file() {
.assert()
.failure();
}

#[test]
fn test_more_than_one_command_with_output() {
    // Supplying `-o` together with two `-c` commands must be rejected
    // by argument validation before any query runs.
    let query = "SELECT 1";
    let result = Command::cargo_bin("dft")
        .unwrap()
        .args(["-c", query, "-c", query, "-o", "test.csv"])
        .assert()
        .failure();
    result.stderr(contains_str(
        "Error: Output can only be saved for a single file or command",
    ));
}

#[test]
fn test_output_csv() {
    // Run a single query with `-o file.csv` and verify the file contents:
    // a header row followed by the value.
    let tmp = tempfile::tempdir().unwrap();
    let out_path = tmp.path().join("test.csv");

    Command::cargo_bin("dft")
        .unwrap()
        .arg("-c")
        .arg("SELECT 1")
        .arg("-o")
        .arg(&out_path)
        .assert()
        .success();

    let contents = std::fs::read_to_string(&out_path).unwrap();
    assert_eq!(contents, "Int64(1)\n1\n");
}

#[test]
fn test_output_json() {
    // Run a single query with `-o file.json` and verify the file holds
    // one newline-delimited JSON object per row.
    let tmp = tempfile::tempdir().unwrap();
    let out_path = tmp.path().join("test.json");

    Command::cargo_bin("dft")
        .unwrap()
        .arg("-c")
        .arg("SELECT 1")
        .arg("-o")
        .arg(&out_path)
        .assert()
        .success();

    let contents = std::fs::read_to_string(&out_path).unwrap();
    assert_eq!(contents, "{\"Int64(1)\":1}\n");
}

#[test]
fn test_output_parquet() {
    // Parquet is binary, so instead of inspecting the bytes we round-trip:
    // write the file with one invocation, then query it with a second.
    let tmp = tempfile::tempdir().unwrap();
    let out_path = tmp.path().join("test.parquet");

    Command::cargo_bin("dft")
        .unwrap()
        .arg("-c")
        .arg("SELECT 1")
        .arg("-o")
        .arg(&out_path)
        .assert()
        .success();

    let roundtrip_sql = format!("SELECT * FROM '{}'", out_path.to_str().unwrap());
    let assert = Command::cargo_bin("dft")
        .unwrap()
        .arg("-c")
        .arg(roundtrip_sql)
        .assert()
        .success();

    let expected = r#"
+----------+
| Int64(1) |
+----------+
| 1 |
+----------+"#;

    assert.stdout(contains_str(expected));
}
Loading
Loading