
Commit

v2: Implement transforms against DataFusion DataFrame, drop custom UDFs (#525)

* wip refactor to use DataFusion's DataFrame

* Add aggregate support

* Port additional transforms

* Port additional transforms

* Port window transform

* Port fold transform

* Port impute transform

* Port pivot transform

* port timeunit

* get schema from first batch

* Don't require metadata match

* start porting stack

* finish stack transform port

* Use object-store with DataFusion to load from http

* wip time functions

* parse %Y-%m-%d in UTC like the browser

* Update timeunit transform to use datafusion operations

* remove unused UDFs

* json fallback to reqwest

* Fix timezone parsing

* Fix selection_test

* all custom spec tests passing

* get image_comparison tests passing

* Get all vegafusion-runtime tests passing

* fix

* fix

* remove more udfs

* remove vegafusion-datafusion-udfs, vegafusion-dataframe, and vegafusion-sql crates

* fix tests

* clippy fix

* format

* warnings / format

* python test updates

* Update to datafusion main

* fmt

* re-enable format millis test, fix substr args

* Support Utf8View in json writer

* fix remaining python tests

* fmt

* clippy fix

* fmt

* work around wasm-pack error

* add call to update-pkg.js

* remove some stale comments
jonmmease authored Oct 30, 2024
1 parent 3ec0509 commit 8363810
Showing 178 changed files with 2,964 additions and 21,658 deletions.
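
Note: the sketch below is editorial, not code from this commit. It illustrates the general direction the commit message describes: expressing a Vega-style filter-plus-aggregate transform directly against DataFusion's DataFrame API instead of generating SQL over custom UDF-backed relations. The dataset, column names, and aggregate are made up, and a default-features datafusion dependency plus tokio (with the macros feature) are assumed.

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Float64Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::functions_aggregate::expr_fn::sum;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // A tiny in-memory batch standing in for a Vega dataset.
    let schema = Arc::new(Schema::new(vec![
        Field::new("category", DataType::Utf8, true),
        Field::new("amount", DataType::Float64, true),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(StringArray::from(vec!["a", "a", "b"])) as ArrayRef,
            Arc::new(Float64Array::from(vec![1.0, 2.0, 5.0])) as ArrayRef,
        ],
    )?;

    // A filter + aggregate "transform" written directly against the DataFrame
    // API, rather than as SQL text over UDF-backed relations.
    let ctx = SessionContext::new();
    ctx.read_batch(batch)?
        .filter(col("amount").gt(lit(1.0)))?
        .aggregate(
            vec![col("category")],
            vec![sum(col("amount")).alias("sum_amount")],
        )?
        .show()
        .await?;

    Ok(())
}
```
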
1,136 changes: 278 additions & 858 deletions Cargo.lock

Large diffs are not rendered by default.

43 changes: 27 additions & 16 deletions Cargo.toml
@@ -4,9 +4,6 @@ members = [
"vegafusion-common",
"vegafusion-core",
"vegafusion-runtime",
"vegafusion-dataframe",
"vegafusion-datafusion-udfs",
"vegafusion-sql",
"vegafusion-python",
"vegafusion-wasm",
"vegafusion-server",
@@ -15,14 +12,16 @@ members = [
[workspace.dependencies]
arrow = { version = "53.1.0", default-features = false }

sqlparser = { version = "0.50.0" }
sqlparser = { version = "0.51.0" }
chrono = { version = "0.4.35", default-features = false }
chrono-tz = { version = "0.9.0", features = [
"case-insensitive",
"filter-by-regex",
] }
deterministic-hash = "1.0.1"
reqwest = { version = "0.11.22", default-features = false }
reqwest = { version = "0.12.8", default-features = false }
reqwest-middleware = { version = "0.3" }
reqwest-retry = "0.6"
tokio = { version = "1.36.0" }
pyo3 = { version = "0.22.4" }
pythonize = { version = "0.22" }
@@ -33,55 +32,67 @@ object_store = { version = "0.11.0" }
lazy_static = { version = "1.5" }
async-trait = "0.1.73"
futures = "0.3.21"
url = "2.3.1"

[workspace.dependencies.serde_json]
version = "1.0.91"
default-features = false

[workspace.dependencies.datafusion]
version = "42.0.0"
git = "https://github.com/apache/datafusion.git"
rev = "b30d12a73fb9867180c2fdf8ddc818b45f957bac"
default-features = false
features = ["parquet", "nested_expressions"]

[workspace.dependencies.datafusion-common]
version = "42.0.0"
git = "https://github.com/apache/datafusion.git"
rev = "b30d12a73fb9867180c2fdf8ddc818b45f957bac"
# no default features

[workspace.dependencies.datafusion-expr]
version = "42.0.0"
git = "https://github.com/apache/datafusion.git"
rev = "b30d12a73fb9867180c2fdf8ddc818b45f957bac"
# no default features

[workspace.dependencies.datafusion-proto]
version = "42.0.0"
git = "https://github.com/apache/datafusion.git"
rev = "b30d12a73fb9867180c2fdf8ddc818b45f957bac"
default-features = false
features = ["parquet"]

[workspace.dependencies.datafusion-proto-common]
version = "42.0.0"
git = "https://github.com/apache/datafusion.git"
rev = "b30d12a73fb9867180c2fdf8ddc818b45f957bac"
default-features = false

[workspace.dependencies.datafusion-physical-expr]
version = "42.0.0"
git = "https://github.com/apache/datafusion.git"
rev = "b30d12a73fb9867180c2fdf8ddc818b45f957bac"
default-features = false

[workspace.dependencies.datafusion-optimizer]
version = "42.0.0"
git = "https://github.com/apache/datafusion.git"
rev = "b30d12a73fb9867180c2fdf8ddc818b45f957bac"
default-features = false

[workspace.dependencies.datafusion-functions]
version = "42.0.0"
git = "https://github.com/apache/datafusion.git"
rev = "b30d12a73fb9867180c2fdf8ddc818b45f957bac"
default-features = false

[workspace.dependencies.datafusion-functions-nested]
version = "42.0.0"
git = "https://github.com/apache/datafusion.git"
rev = "b30d12a73fb9867180c2fdf8ddc818b45f957bac"
# no default features

[workspace.dependencies.datafusion-functions-aggregate]
version = "42.0.0"
git = "https://github.com/apache/datafusion.git"
rev = "b30d12a73fb9867180c2fdf8ddc818b45f957bac"
# no default features

[workspace.dependencies.datafusion-functions-window]
version = "42.0.0"
git = "https://github.com/apache/datafusion.git"
rev = "b30d12a73fb9867180c2fdf8ddc818b45f957bac"
# no default features

# Profile with good speed for local development and testing
3 changes: 0 additions & 3 deletions automation/bump_version.py
@@ -18,9 +18,6 @@ def bump_version(version):
cargo_packages = [
"vegafusion-common",
"vegafusion-core",
"vegafusion-datafusion-udfs",
"vegafusion-dataframe",
"vegafusion-sql",
"vegafusion-runtime",
"vegafusion-python",
"vegafusion-server",
2 changes: 1 addition & 1 deletion pixi.toml
@@ -68,7 +68,7 @@ build-wasm = { cmd = "cd vegafusion-wasm && npm install && wasm-pack build --rel
"install-wasm-toolchain",
"install-wasm-pack",
] }
pack-wasm = { cmd = "cd vegafusion-wasm && wasm-pack pack", depends_on = [
pack-wasm = { cmd = "cd vegafusion-wasm && wasm-pack pack && node scripts/update-pkg.js", depends_on = [
"build-wasm",
] }

4 changes: 4 additions & 0 deletions vegafusion-common/Cargo.toml
@@ -63,6 +63,10 @@ optional = true
workspace = true
optional = true

[dependencies.url]
workspace = true
optional = true

[dependencies.jni]
version = "0.21.1"
optional = true
4 changes: 4 additions & 0 deletions vegafusion-common/src/column.rs
@@ -6,6 +6,10 @@ pub fn flat_col(col_name: &str) -> Expr {
Expr::Column(Column::from_name(col_name))
}

pub fn relation_col(col_name: &str, relation_name: &str) -> Expr {
Expr::Column(Column::new(Some(relation_name), col_name))
}

pub fn unescaped_col(col_name: &str) -> Expr {
flat_col(&unescape_field(col_name))
}
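
As a hedged usage sketch (module path assumed from the file location above, column and relation names made up), the new relation_col produces a relation-qualified column reference where flat_col produces an unqualified one:

```rust
use vegafusion_common::column::{flat_col, relation_col};

fn main() {
    // Unqualified reference to a column named "amount".
    let unqualified = flat_col("amount");
    // The same column qualified by a relation name, e.g. one side of a join.
    let qualified = relation_col("amount", "lhs");
    // Expr implements Display: prints `amount` and `lhs.amount`.
    println!("{unqualified} vs {qualified}");
}
```
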
24 changes: 18 additions & 6 deletions vegafusion-common/src/data/json_writer.rs
@@ -35,17 +35,16 @@
//! [`record_batches_to_json_rows`]:
//!
use std::iter;
use std::{fmt::Debug, io::Write};

use serde_json::map::Map as JsonMap;
use serde_json::Value;

use arrow::array::*;
use arrow::datatypes::*;
use arrow::error::{ArrowError, Result};
use arrow::json::JsonSerializable;
use arrow::record_batch::RecordBatch;
use datafusion_common::cast::as_string_view_array;
use serde_json::map::Map as JsonMap;
use serde_json::Value;
use std::iter;
use std::{fmt::Debug, io::Write};

fn primitive_array_to_json<T>(array: &ArrayRef) -> Result<Vec<Value>>
where
Expand Down Expand Up @@ -273,6 +272,19 @@ fn set_column_for_json_rows(
DataType::LargeUtf8 => {
set_column_by_array_type!(as_largestring_array, col_name, rows, array, row_count);
}
DataType::Utf8View => {
let arr = as_string_view_array(array)?;
rows.iter_mut()
.zip(arr.iter())
.take(row_count)
.for_each(|(row, maybe_value)| {
if let Some(v) = maybe_value {
row.insert(col_name.to_string(), v.into());
} else {
row.insert(col_name.to_string(), Value::Null);
}
});
}
DataType::Date32 => {
// Write as integer UTC milliseconds
let arr = array.as_any().downcast_ref::<Date32Array>().unwrap();
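
The Utf8View arm above is what lets the JSON writer accept DataFusion's string-view output. Below is a self-contained sketch of the same null-to-JSON-null mapping, assuming only arrow, datafusion-common, and serde_json as dependencies; the array contents are made up:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, StringViewBuilder};
use datafusion_common::cast::as_string_view_array;
use serde_json::Value;

fn main() -> datafusion_common::Result<()> {
    // Build a Utf8View (string-view) array with a null in the middle.
    let mut builder = StringViewBuilder::new();
    builder.append_value("first");
    builder.append_null();
    builder.append_value("third");
    let array: ArrayRef = Arc::new(builder.finish());

    // Mirror the new match arm: values become JSON strings, nulls become JSON null.
    let json: Vec<Value> = as_string_view_array(&array)?
        .iter()
        .map(|v| v.map(Value::from).unwrap_or(Value::Null))
        .collect();
    println!("{json:?}");
    Ok(())
}
```
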
21 changes: 21 additions & 0 deletions vegafusion-common/src/data/scalar.rs
@@ -24,6 +24,7 @@ pub trait ScalarValueHelpers {
#[cfg(feature = "json")]
fn to_json(&self) -> Result<Value>;

fn to_i32(&self) -> Result<i32>;
fn to_f64(&self) -> Result<f64>;
fn to_f64x2(&self) -> Result<[f64; 2]>;
fn to_scalar_string(&self) -> Result<String>;
Expand Down Expand Up @@ -163,6 +164,26 @@ impl ScalarValueHelpers for ScalarValue {
Ok(res)
}

fn to_i32(&self) -> Result<i32> {
Ok(match self {
ScalarValue::Float32(Some(e)) => *e as i32,
ScalarValue::Float64(Some(e)) => *e as i32,
ScalarValue::Int8(Some(e)) => *e as i32,
ScalarValue::Int16(Some(e)) => *e as i32,
ScalarValue::Int32(Some(e)) => *e,
ScalarValue::Int64(Some(e)) => *e as i32,
ScalarValue::UInt8(Some(e)) => *e as i32,
ScalarValue::UInt16(Some(e)) => *e as i32,
ScalarValue::UInt32(Some(e)) => *e as i32,
ScalarValue::UInt64(Some(e)) => *e as i32,
_ => {
return Err(VegaFusionError::internal(format!(
"Cannot convert {self} to i32"
)))
}
})
}

fn to_f64(&self) -> Result<f64> {
Ok(match self {
ScalarValue::Float32(Some(e)) => *e as f64,
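
A hedged usage sketch of the new to_i32 helper; the module path vegafusion_common::data::scalar is assumed from the file location, and the values are illustrative:

```rust
use datafusion_common::ScalarValue;
use vegafusion_common::data::scalar::ScalarValueHelpers;

fn main() -> vegafusion_common::error::Result<()> {
    // Numeric scalars of any width are narrowed with `as` casts.
    assert_eq!(ScalarValue::Float64(Some(3.0)).to_i32()?, 3);
    assert_eq!(ScalarValue::UInt16(Some(7)).to_i32()?, 7);
    // Non-numeric scalars fall through to an internal error.
    assert!(ScalarValue::Utf8(Some("x".to_string())).to_i32().is_err());
    Ok(())
}
```
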
8 changes: 4 additions & 4 deletions vegafusion-common/src/data/table.rs
@@ -70,15 +70,15 @@ impl VegaFusionTable {
.map(|f| f.as_ref().clone().with_nullable(true))
.collect();
let schema = Arc::new(Schema::new(schema_fields));
if partitions.iter().all(|batches| {
let batch_schema_fields: Vec<_> = batches
if partitions.iter().all(|batch| {
let batch_schema_fields: Vec<_> = batch
.schema()
.fields
.iter()
.map(|f| f.as_ref().clone().with_nullable(true))
.collect();
let batch_schema = Arc::new(Schema::new(batch_schema_fields));
schema.contains(&batch_schema)
schema.fields.contains(&batch_schema.fields)
}) {
Ok(Self {
schema,
Expand Down Expand Up @@ -605,7 +605,7 @@ fn hash_array_data<H: Hasher>(array_data: &ArrayData, state: &mut H) {
// For nested types (list, struct), recursively hash child arrays
let child_data = array_data.child_data();
for child in child_data {
hash_array_data(&child, state);
hash_array_data(child, state);
}
}

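
A hedged illustration of why the constructor now compares fields rather than whole schemas (per the "Don't require metadata match" item in the commit message): Schema::contains also requires the other schema's metadata to be present, while comparing Fields ignores schema-level metadata. The example schemas are made up:

```rust
use std::collections::HashMap;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    let field = Field::new("a", DataType::Int64, true);
    let plain = Arc::new(Schema::new(vec![field.clone()]));
    // Same fields, but with schema-level metadata attached.
    let with_meta = Arc::new(
        Schema::new(vec![field])
            .with_metadata(HashMap::from([("source".to_string(), "csv".to_string())])),
    );

    // Schema::contains also compares metadata, so this is false ...
    assert!(!plain.contains(&with_meta));
    // ... while comparing only the fields ignores metadata, which is what the
    // batch-compatibility check above now does.
    assert!(plain.fields.contains(&with_meta.fields));
}
```
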
20 changes: 20 additions & 0 deletions vegafusion-common/src/error.rs
@@ -19,6 +19,9 @@ use base64::DecodeError as Base64DecodeError;
#[cfg(feature = "object_store")]
use object_store::{path::Error as ObjectStorePathError, Error as ObjectStoreError};

#[cfg(feature = "url")]
use url::ParseError as UrlParseError;

pub type Result<T> = result::Result<T, VegaFusionError>;

#[derive(Clone, Debug, Default)]
Expand Down Expand Up @@ -97,6 +100,10 @@ pub enum VegaFusionError {
#[cfg(feature = "object_store")]
#[error("ObjectStoreError Error: {0}\n{1}")]
ObjectStoreError(ObjectStoreError, ErrorContext),

#[cfg(feature = "url")]
#[error("url::ParseError Error: {0}\n{1}")]
UrlParseError(UrlParseError, ErrorContext),
}

impl VegaFusionError {
Expand Down Expand Up @@ -187,6 +194,11 @@ impl VegaFusionError {
context.contexts.push(context_fn().into());
VegaFusionError::ObjectStoreError(err, context)
}
#[cfg(feature = "url")]
UrlParseError(err, mut context) => {
context.contexts.push(context_fn().into());
VegaFusionError::UrlParseError(err, context)
}
}
}

Expand Down Expand Up @@ -280,6 +292,8 @@ impl VegaFusionError {
ObjectStoreError(err, context) => {
VegaFusionError::ExternalError(err.to_string(), context.clone())
}
#[cfg(feature = "url")]
UrlParseError(err, context) => VegaFusionError::UrlParseError(*err, context.clone()),
}
}
}
Expand Down Expand Up @@ -412,6 +426,12 @@ impl From<ObjectStorePathError> for VegaFusionError {
}
}

#[cfg(feature = "url")]
impl From<UrlParseError> for VegaFusionError {
fn from(err: UrlParseError) -> Self {
Self::UrlParseError(err, Default::default())
}
}
pub trait ToExternalError<T> {
fn external<S: Into<String>>(self, context: S) -> Result<T>;
}
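
A hedged sketch of how the new url feature is used from calling code: with the From<url::ParseError> impl above, the ? operator converts a parse failure into VegaFusionError::UrlParseError. The helper function and inputs are illustrative, and vegafusion-common is assumed to be built with the "url" feature:

```rust
use url::Url;
use vegafusion_common::error::{Result, VegaFusionError};

// Illustrative helper, not part of the commit.
fn parse_dataset_url(s: &str) -> Result<Url> {
    // `?` relies on the new From<url::ParseError> impl to attach an empty context.
    Ok(Url::parse(s)?)
}

fn main() {
    assert!(parse_dataset_url("https://example.com/data.json").is_ok());
    match parse_dataset_url("not a url") {
        Err(VegaFusionError::UrlParseError(err, _)) => println!("invalid url: {err}"),
        _ => println!("unexpected result"),
    }
}
```
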
4 changes: 0 additions & 4 deletions vegafusion-core/Cargo.toml
@@ -56,10 +56,6 @@ version = "1.6.9"
[dependencies.datafusion-common]
workspace = true

[dependencies.vegafusion-dataframe]
path = "../vegafusion-dataframe"
version = "1.6.9"

[dependencies.pyo3]
workspace = true
optional = true
4 changes: 0 additions & 4 deletions vegafusion-core/src/data/dataset.rs
@@ -1,21 +1,17 @@
use crate::error::Result;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use vegafusion_common::data::table::VegaFusionTable;
use vegafusion_dataframe::dataframe::DataFrame;

#[derive(Clone)]
pub enum VegaFusionDataset {
Table { table: VegaFusionTable, hash: u64 },
DataFrame(Arc<dyn DataFrame>),
}

impl VegaFusionDataset {
pub fn fingerprint(&self) -> String {
match self {
VegaFusionDataset::Table { hash, .. } => hash.to_string(),
VegaFusionDataset::DataFrame(df) => df.fingerprint().to_string(),
}
}

8 changes: 4 additions & 4 deletions vegafusion-core/src/runtime/grpc_runtime.rs
@@ -48,7 +48,7 @@ impl VegaFusionRuntimeTrait for GrpcVegaFusionRuntime {
indices: &[NodeValueIndex],
inline_datasets: &HashMap<String, VegaFusionDataset>,
) -> Result<Vec<NamedTaskValue>> {
let inline_datasets = encode_inline_datasets(&inline_datasets)?;
let inline_datasets = encode_inline_datasets(inline_datasets)?;
let request = QueryRequest {
request: Some(query_request::Request::TaskGraphValues(
TaskGraphValueRequest {
Expand Down Expand Up @@ -82,7 +82,7 @@ impl VegaFusionRuntimeTrait for GrpcVegaFusionRuntime {
inline_datasets: &HashMap<String, VegaFusionDataset>,
options: &PreTransformSpecOpts,
) -> Result<(ChartSpec, Vec<PreTransformSpecWarning>)> {
let inline_datasets = encode_inline_datasets(&inline_datasets)?;
let inline_datasets = encode_inline_datasets(inline_datasets)?;

let request = PreTransformSpecRequest {
spec: serde_json::to_string(spec)?,
Expand Down Expand Up @@ -115,7 +115,7 @@ impl VegaFusionRuntimeTrait for GrpcVegaFusionRuntime {
Vec<PreTransformExtractTable>,
Vec<PreTransformExtractWarning>,
)> {
let inline_datasets = encode_inline_datasets(&inline_datasets)?;
let inline_datasets = encode_inline_datasets(inline_datasets)?;

let request = PreTransformExtractRequest {
spec: serde_json::to_string(spec)?,
Expand Down Expand Up @@ -157,7 +157,7 @@ impl VegaFusionRuntimeTrait for GrpcVegaFusionRuntime {
inline_datasets: &HashMap<String, VegaFusionDataset>,
options: &PreTransformValuesOpts,
) -> Result<(Vec<TaskValue>, Vec<PreTransformValuesWarning>)> {
let inline_datasets = encode_inline_datasets(&inline_datasets)?;
let inline_datasets = encode_inline_datasets(inline_datasets)?;

let request = PreTransformValuesRequest {
spec: serde_json::to_string(spec)?,
