Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow and DataFusion. It is built on an architecture that allows other programming languages (such as Python, C++, and Java) to be supported as first-class citizens without paying a penalty for serialization costs.
The foundational technologies in Ballista are:
- Apache Arrow memory model and compute kernels for efficient processing of data.
- Apache Arrow Flight Protocol for efficient data transfer between processes.
- Google Protocol Buffers for serializing query plans.
- Docker for packaging up executors along with user-defined code.
Ballista can be deployed as a standalone cluster and also supports Kubernetes. In either case, the scheduler can be configured to use etcd as a backing store to (eventually) provide redundancy in the case of a scheduler failing.
This crate is tested with the latest stable version of Rust. We do not currently test against older versions of the Rust compiler.
There are numerous ways to start a Ballista cluster, including support for Docker and Kubernetes. For full documentation, refer to the DataFusion User Guide.
A simple way to start a local cluster for testing purposes is to use cargo to install the scheduler and executor crates.
cargo install --locked ballista-scheduler
cargo install --locked ballista-executor
With these crates installed, it is now possible to start a scheduler process.
RUST_LOG=info ballista-scheduler
The scheduler will bind to port 50050 by default.
Next, start an executor process in a new terminal session with the specified concurrency level.
RUST_LOG=info ballista-executor -c 4
The executor will bind to port 50051 by default. Additional executors can be started by manually specifying a bind port. For example:
RUST_LOG=info ballista-executor --bind-port 50052 -c 4
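Before submitting queries, it can help to confirm that the processes started above are actually accepting connections. The following is a minimal sketch using only the Rust standard library, not part of Ballista itself: the helper name is_port_open is hypothetical, and the ports are the defaults mentioned above (50050 for the scheduler, 50051 and 50052 for the executors). It only checks that a TCP connection can be opened; it does not verify that the process behind the port is a healthy Ballista service.

```rust
use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
use std::time::Duration;

/// Hypothetical helper: returns true if a TCP connection to `host:port`
/// succeeds within `timeout`.
fn is_port_open(host: &str, port: u16, timeout: Duration) -> bool {
    // Resolve the host name; a name that fails to resolve counts as unreachable.
    let addrs: Vec<SocketAddr> = match (host, port).to_socket_addrs() {
        Ok(iter) => iter.collect(),
        Err(_) => return false,
    };
    // Try every resolved address (for example IPv4 and IPv6 for "localhost").
    addrs
        .iter()
        .any(|addr| TcpStream::connect_timeout(addr, timeout).is_ok())
}

fn main() {
    // Ports assumed from the defaults described above; adjust them if you
    // started the processes with different bind ports.
    let endpoints = [
        ("scheduler", 50050u16),
        ("executor", 50051),
        ("additional executor", 50052),
    ];
    let timeout = Duration::from_millis(500);
    for (name, port) in endpoints {
        let state = if is_port_open("localhost", port, timeout) {
            "listening"
        } else {
            "not reachable"
        };
        println!("{} on port {}: {}", name, port, state);
    }
}
```

A port reported as "not reachable" usually means the corresponding process has not been started yet or was bound to a different port.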
Ballista provides a BallistaContext as a starting point for creating queries. DataFrames can be created by invoking the read_csv, read_parquet, and sql methods.
To build a simple Ballista example, add the following dependencies to your Cargo.toml file:
[dependencies]
ballista = "0.6"
datafusion = "7.0"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
The following example runs a simple aggregate SQL query against a CSV file from the New York Taxi and Limousine Commission data set.
use ballista::prelude::*;
use datafusion::arrow::util::pretty;
use datafusion::prelude::CsvReadOptions;

#[tokio::main]
async fn main() -> Result<()> {
    // create configuration
    let config = BallistaConfig::builder()
        .set("ballista.shuffle.partitions", "4")
        .build()?;

    // connect to Ballista scheduler
    let ctx = BallistaContext::remote("localhost", 50050, &config).await?;

    // register csv file with the execution context
    ctx.register_csv(
        "tripdata",
        "/path/to/yellow_tripdata_2020-01.csv",
        CsvReadOptions::new(),
    ).await?;

    // execute the query
    let df = ctx.sql(
        "SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), AVG(fare_amount), SUM(fare_amount)
        FROM tripdata
        GROUP BY passenger_count
        ORDER BY passenger_count",
    ).await?;

    // collect the results and print them to stdout
    let results = df.collect().await?;
    pretty::print_batches(&results)?;
    Ok(())
}
More examples can be found in the arrow-datafusion repository.