Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v2: Implement transforms against DataFusion DataFrame, drop custom UDFs #525

Merged
merged 44 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
bac87e7
wip refactor to use DataFusion's DataFrame
jonmmease Oct 20, 2024
9ca0f3e
Add aggregate support
jonmmease Oct 20, 2024
7317fa7
Port additional transforms
jonmmease Oct 20, 2024
86261d5
Port additional transforms
jonmmease Oct 21, 2024
e5dcc95
Port window transform
jonmmease Oct 21, 2024
94d60cb
Port fold transform
jonmmease Oct 21, 2024
6ec7ed1
Port impute transform
jonmmease Oct 21, 2024
2b6e597
Port pivot transform
jonmmease Oct 21, 2024
a11839a
port timeunit
jonmmease Oct 21, 2024
26f4b38
get schema from first batch
jonmmease Oct 21, 2024
c9a81b1
Don't require metadata match
jonmmease Oct 21, 2024
2a6a49f
start porting stack
jonmmease Oct 21, 2024
1e78bd5
finish stack transform port
jonmmease Oct 22, 2024
cb34571
Use object-store with DataFusion to load from http
jonmmease Oct 23, 2024
446fa83
wip time functions
jonmmease Oct 24, 2024
51a8e00
parse %Y-%m-%d in UTC like the browser
jonmmease Oct 24, 2024
f633095
Update timeunit transform to use datafusion operations
jonmmease Oct 24, 2024
c37f082
remove unused UDFs
jonmmease Oct 24, 2024
87f0a26
json fallback to reqwest
jonmmease Oct 25, 2024
aaf2f6f
Fix timezone parsing
jonmmease Oct 25, 2024
084a1f5
Fix selection_test
jonmmease Oct 25, 2024
18344d8
all custom spec tests passing
jonmmease Oct 25, 2024
31d6e6f
get image_comparison tests passing
jonmmease Oct 26, 2024
349f662
Get all vegafusion-runtime tests passing
jonmmease Oct 26, 2024
9087255
fix
jonmmease Oct 26, 2024
97210c0
fix
jonmmease Oct 26, 2024
b47c1ce
remove more udfs
jonmmease Oct 26, 2024
c4d6538
remove vegafusion-datafusion-udfs, vegafusion-dataframe, and vegafusi…
jonmmease Oct 26, 2024
85f7d6a
fix tests
jonmmease Oct 26, 2024
b2f9da5
clippy fix
jonmmease Oct 26, 2024
b439e99
format
jonmmease Oct 26, 2024
d493a18
warnings / format
jonmmease Oct 26, 2024
dcd8ea4
python test updates
jonmmease Oct 28, 2024
cc93cc4
Update to datafusion main
jonmmease Oct 29, 2024
e20aa13
fmt
jonmmease Oct 29, 2024
8c86818
re-enable format millis test, fix substr args
jonmmease Oct 29, 2024
3820b9a
Support Utf8View in json writer
jonmmease Oct 29, 2024
b68cdfe
fix remaining python tests
jonmmease Oct 29, 2024
a0bbcb1
fmt
jonmmease Oct 30, 2024
9cbe638
clippy fix
jonmmease Oct 30, 2024
fb61ac8
fmt
jonmmease Oct 30, 2024
1617f2d
work around wasm-pack error
jonmmease Oct 30, 2024
e0ddd5e
add call to update-pkg.js
jonmmease Oct 30, 2024
d39ec8a
remove some stale comments
jonmmease Oct 30, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,136 changes: 278 additions & 858 deletions Cargo.lock

Large diffs are not rendered by default.

43 changes: 27 additions & 16 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ members = [
"vegafusion-common",
"vegafusion-core",
"vegafusion-runtime",
"vegafusion-dataframe",
"vegafusion-datafusion-udfs",
"vegafusion-sql",
"vegafusion-python",
"vegafusion-wasm",
"vegafusion-server",
Expand All @@ -15,14 +12,16 @@ members = [
[workspace.dependencies]
arrow = { version = "53.1.0", default-features = false }

sqlparser = { version = "0.50.0" }
sqlparser = { version = "0.51.0" }
chrono = { version = "0.4.35", default-features = false }
chrono-tz = { version = "0.9.0", features = [
"case-insensitive",
"filter-by-regex",
] }
deterministic-hash = "1.0.1"
reqwest = { version = "0.11.22", default-features = false }
reqwest = { version = "0.12.8", default-features = false }
reqwest-middleware = { version = "0.3" }
reqwest-retry = "0.6"
tokio = { version = "1.36.0" }
pyo3 = { version = "0.22.4" }
pythonize = { version = "0.22" }
Expand All @@ -33,55 +32,67 @@ object_store = { version = "0.11.0" }
lazy_static = { version = "1.5" }
async-trait = "0.1.73"
futures = "0.3.21"
url = "2.3.1"

[workspace.dependencies.serde_json]
version = "1.0.91"
default-features = false

[workspace.dependencies.datafusion]
version = "42.0.0"
git = "https://github.com/apache/datafusion.git"
rev = "b30d12a73fb9867180c2fdf8ddc818b45f957bac"
default-features = false
features = ["parquet", "nested_expressions"]

[workspace.dependencies.datafusion-common]
version = "42.0.0"
git = "https://github.com/apache/datafusion.git"
rev = "b30d12a73fb9867180c2fdf8ddc818b45f957bac"
# no default features

[workspace.dependencies.datafusion-expr]
version = "42.0.0"
git = "https://github.com/apache/datafusion.git"
rev = "b30d12a73fb9867180c2fdf8ddc818b45f957bac"
# no default features

[workspace.dependencies.datafusion-proto]
version = "42.0.0"
git = "https://github.com/apache/datafusion.git"
rev = "b30d12a73fb9867180c2fdf8ddc818b45f957bac"
default-features = false
features = ["parquet"]

[workspace.dependencies.datafusion-proto-common]
version = "42.0.0"
git = "https://github.com/apache/datafusion.git"
rev = "b30d12a73fb9867180c2fdf8ddc818b45f957bac"
default-features = false

[workspace.dependencies.datafusion-physical-expr]
version = "42.0.0"
git = "https://github.com/apache/datafusion.git"
rev = "b30d12a73fb9867180c2fdf8ddc818b45f957bac"
default-features = false

[workspace.dependencies.datafusion-optimizer]
version = "42.0.0"
git = "https://github.com/apache/datafusion.git"
rev = "b30d12a73fb9867180c2fdf8ddc818b45f957bac"
default-features = false

[workspace.dependencies.datafusion-functions]
version = "42.0.0"
git = "https://github.com/apache/datafusion.git"
rev = "b30d12a73fb9867180c2fdf8ddc818b45f957bac"
default-features = false

[workspace.dependencies.datafusion-functions-nested]
version = "42.0.0"
git = "https://github.com/apache/datafusion.git"
rev = "b30d12a73fb9867180c2fdf8ddc818b45f957bac"
# no default features

[workspace.dependencies.datafusion-functions-aggregate]
version = "42.0.0"
git = "https://github.com/apache/datafusion.git"
rev = "b30d12a73fb9867180c2fdf8ddc818b45f957bac"
# no default features

[workspace.dependencies.datafusion-functions-window]
version = "42.0.0"
git = "https://github.com/apache/datafusion.git"
rev = "b30d12a73fb9867180c2fdf8ddc818b45f957bac"
# no default features

# Profile with good speed for local development and testing
Expand Down
3 changes: 0 additions & 3 deletions automation/bump_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ def bump_version(version):
cargo_packages = [
"vegafusion-common",
"vegafusion-core",
"vegafusion-datafusion-udfs",
"vegafusion-dataframe",
"vegafusion-sql",
"vegafusion-runtime",
"vegafusion-python",
"vegafusion-server",
Expand Down
4 changes: 4 additions & 0 deletions vegafusion-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ optional = true
workspace = true
optional = true

[dependencies.url]
workspace = true
optional = true

[dependencies.jni]
version = "0.21.1"
optional = true
Expand Down
4 changes: 4 additions & 0 deletions vegafusion-common/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ pub fn flat_col(col_name: &str) -> Expr {
Expr::Column(Column::from_name(col_name))
}

/// Build a column expression qualified with a relation (table) name,
/// i.e. `relation_name.col_name`.
pub fn relation_col(col_name: &str, relation_name: &str) -> Expr {
    let qualified = Column::new(Some(relation_name), col_name);
    Expr::Column(qualified)
}

/// Build an unqualified column expression after unescaping the field name.
pub fn unescaped_col(col_name: &str) -> Expr {
    let unescaped = unescape_field(col_name);
    flat_col(&unescaped)
}
24 changes: 18 additions & 6 deletions vegafusion-common/src/data/json_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,16 @@
//! [`record_batches_to_json_rows`]:
//!

use std::iter;
use std::{fmt::Debug, io::Write};

use serde_json::map::Map as JsonMap;
use serde_json::Value;

use arrow::array::*;
use arrow::datatypes::*;
use arrow::error::{ArrowError, Result};
use arrow::json::JsonSerializable;
use arrow::record_batch::RecordBatch;
use datafusion_common::cast::as_string_view_array;
use serde_json::map::Map as JsonMap;
use serde_json::Value;
use std::iter;
use std::{fmt::Debug, io::Write};

fn primitive_array_to_json<T>(array: &ArrayRef) -> Result<Vec<Value>>
where
Expand Down Expand Up @@ -273,6 +272,19 @@ fn set_column_for_json_rows(
DataType::LargeUtf8 => {
set_column_by_array_type!(as_largestring_array, col_name, rows, array, row_count);
}
DataType::Utf8View => {
let arr = as_string_view_array(array)?;
rows.iter_mut()
.zip(arr.iter())
.take(row_count)
.for_each(|(row, maybe_value)| {
if let Some(v) = maybe_value {
row.insert(col_name.to_string(), v.into());
} else {
row.insert(col_name.to_string(), Value::Null);
}
});
}
DataType::Date32 => {
// Write as integer UTC milliseconds
let arr = array.as_any().downcast_ref::<Date32Array>().unwrap();
Expand Down
21 changes: 21 additions & 0 deletions vegafusion-common/src/data/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub trait ScalarValueHelpers {
#[cfg(feature = "json")]
fn to_json(&self) -> Result<Value>;

fn to_i32(&self) -> Result<i32>;
fn to_f64(&self) -> Result<f64>;
fn to_f64x2(&self) -> Result<[f64; 2]>;
fn to_scalar_string(&self) -> Result<String>;
Expand Down Expand Up @@ -163,6 +164,26 @@ impl ScalarValueHelpers for ScalarValue {
Ok(res)
}

/// Convert this scalar to an `i32`.
///
/// Integer variants wider than `i32` (or unsigned ones that can exceed
/// `i32::MAX`) are range-checked with `i32::try_from` so out-of-range
/// values produce an error instead of silently wrapping (the previous
/// `as` casts truncated). Float variants keep Rust's saturating `as`
/// cast behavior.
///
/// # Errors
/// Returns an internal `VegaFusionError` when the scalar is `None`, is a
/// non-numeric variant, or holds an integer that does not fit in `i32`.
fn to_i32(&self) -> Result<i32> {
    Ok(match self {
        // `as` on floats saturates to i32::MIN/i32::MAX and maps NaN to 0.
        ScalarValue::Float32(Some(e)) => *e as i32,
        ScalarValue::Float64(Some(e)) => *e as i32,
        // Widening conversions are always lossless.
        ScalarValue::Int8(Some(e)) => *e as i32,
        ScalarValue::Int16(Some(e)) => *e as i32,
        ScalarValue::Int32(Some(e)) => *e,
        // Checked narrowing: report overflow rather than wrap.
        ScalarValue::Int64(Some(e)) => i32::try_from(*e).map_err(|_| {
            VegaFusionError::internal(format!("i64 value {e} out of range for i32"))
        })?,
        ScalarValue::UInt8(Some(e)) => *e as i32,
        ScalarValue::UInt16(Some(e)) => *e as i32,
        ScalarValue::UInt32(Some(e)) => i32::try_from(*e).map_err(|_| {
            VegaFusionError::internal(format!("u32 value {e} out of range for i32"))
        })?,
        ScalarValue::UInt64(Some(e)) => i32::try_from(*e).map_err(|_| {
            VegaFusionError::internal(format!("u64 value {e} out of range for i32"))
        })?,
        _ => {
            return Err(VegaFusionError::internal(format!(
                "Cannot convert {self} to i32"
            )))
        }
    })
}

fn to_f64(&self) -> Result<f64> {
Ok(match self {
ScalarValue::Float32(Some(e)) => *e as f64,
Expand Down
8 changes: 4 additions & 4 deletions vegafusion-common/src/data/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,15 @@ impl VegaFusionTable {
.map(|f| f.as_ref().clone().with_nullable(true))
.collect();
let schema = Arc::new(Schema::new(schema_fields));
if partitions.iter().all(|batches| {
let batch_schema_fields: Vec<_> = batches
if partitions.iter().all(|batch| {
let batch_schema_fields: Vec<_> = batch
.schema()
.fields
.iter()
.map(|f| f.as_ref().clone().with_nullable(true))
.collect();
let batch_schema = Arc::new(Schema::new(batch_schema_fields));
schema.contains(&batch_schema)
schema.fields.contains(&batch_schema.fields)
}) {
Ok(Self {
schema,
Expand Down Expand Up @@ -605,7 +605,7 @@ fn hash_array_data<H: Hasher>(array_data: &ArrayData, state: &mut H) {
// For nested types (list, struct), recursively hash child arrays
let child_data = array_data.child_data();
for child in child_data {
hash_array_data(&child, state);
hash_array_data(child, state);
}
}

Expand Down
20 changes: 20 additions & 0 deletions vegafusion-common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ use base64::DecodeError as Base64DecodeError;
#[cfg(feature = "object_store")]
use object_store::{path::Error as ObjectStorePathError, Error as ObjectStoreError};

#[cfg(feature = "url")]
use url::ParseError as UrlParseError;

pub type Result<T> = result::Result<T, VegaFusionError>;

#[derive(Clone, Debug, Default)]
Expand Down Expand Up @@ -97,6 +100,10 @@ pub enum VegaFusionError {
#[cfg(feature = "object_store")]
#[error("ObjectStoreError Error: {0}\n{1}")]
ObjectStoreError(ObjectStoreError, ErrorContext),

#[cfg(feature = "url")]
#[error("url::ParseError Error: {0}\n{1}")]
UrlParseError(UrlParseError, ErrorContext),
}

impl VegaFusionError {
Expand Down Expand Up @@ -187,6 +194,11 @@ impl VegaFusionError {
context.contexts.push(context_fn().into());
VegaFusionError::ObjectStoreError(err, context)
}
#[cfg(feature = "url")]
UrlParseError(err, mut context) => {
context.contexts.push(context_fn().into());
VegaFusionError::UrlParseError(err, context)
}
}
}

Expand Down Expand Up @@ -280,6 +292,8 @@ impl VegaFusionError {
ObjectStoreError(err, context) => {
VegaFusionError::ExternalError(err.to_string(), context.clone())
}
#[cfg(feature = "url")]
UrlParseError(err, context) => VegaFusionError::UrlParseError(*err, context.clone()),
}
}
}
Expand Down Expand Up @@ -412,6 +426,12 @@ impl From<ObjectStorePathError> for VegaFusionError {
}
}

/// Wrap a `url::ParseError` in a `VegaFusionError` with an empty context.
#[cfg(feature = "url")]
impl From<UrlParseError> for VegaFusionError {
    fn from(err: UrlParseError) -> Self {
        VegaFusionError::UrlParseError(err, ErrorContext::default())
    }
}
pub trait ToExternalError<T> {
fn external<S: Into<String>>(self, context: S) -> Result<T>;
}
Expand Down
4 changes: 0 additions & 4 deletions vegafusion-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ version = "1.6.9"
[dependencies.datafusion-common]
workspace = true

[dependencies.vegafusion-dataframe]
path = "../vegafusion-dataframe"
version = "1.6.9"

[dependencies.pyo3]
workspace = true
optional = true
Expand Down
4 changes: 0 additions & 4 deletions vegafusion-core/src/data/dataset.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
use crate::error::Result;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use vegafusion_common::data::table::VegaFusionTable;
use vegafusion_dataframe::dataframe::DataFrame;

#[derive(Clone)]
pub enum VegaFusionDataset {
Table { table: VegaFusionTable, hash: u64 },
DataFrame(Arc<dyn DataFrame>),
}

impl VegaFusionDataset {
pub fn fingerprint(&self) -> String {
match self {
VegaFusionDataset::Table { hash, .. } => hash.to_string(),
VegaFusionDataset::DataFrame(df) => df.fingerprint().to_string(),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping this VegaFusionDataset wrapper to hold the fingerprint, and to leave open possibility to add other plans types (e.g. DataFusion or substrait plans).

}
}

Expand Down
8 changes: 4 additions & 4 deletions vegafusion-core/src/runtime/grpc_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl VegaFusionRuntimeTrait for GrpcVegaFusionRuntime {
indices: &[NodeValueIndex],
inline_datasets: &HashMap<String, VegaFusionDataset>,
) -> Result<Vec<NamedTaskValue>> {
let inline_datasets = encode_inline_datasets(&inline_datasets)?;
let inline_datasets = encode_inline_datasets(inline_datasets)?;
let request = QueryRequest {
request: Some(query_request::Request::TaskGraphValues(
TaskGraphValueRequest {
Expand Down Expand Up @@ -82,7 +82,7 @@ impl VegaFusionRuntimeTrait for GrpcVegaFusionRuntime {
inline_datasets: &HashMap<String, VegaFusionDataset>,
options: &PreTransformSpecOpts,
) -> Result<(ChartSpec, Vec<PreTransformSpecWarning>)> {
let inline_datasets = encode_inline_datasets(&inline_datasets)?;
let inline_datasets = encode_inline_datasets(inline_datasets)?;

let request = PreTransformSpecRequest {
spec: serde_json::to_string(spec)?,
Expand Down Expand Up @@ -115,7 +115,7 @@ impl VegaFusionRuntimeTrait for GrpcVegaFusionRuntime {
Vec<PreTransformExtractTable>,
Vec<PreTransformExtractWarning>,
)> {
let inline_datasets = encode_inline_datasets(&inline_datasets)?;
let inline_datasets = encode_inline_datasets(inline_datasets)?;

let request = PreTransformExtractRequest {
spec: serde_json::to_string(spec)?,
Expand Down Expand Up @@ -157,7 +157,7 @@ impl VegaFusionRuntimeTrait for GrpcVegaFusionRuntime {
inline_datasets: &HashMap<String, VegaFusionDataset>,
options: &PreTransformValuesOpts,
) -> Result<(Vec<TaskValue>, Vec<PreTransformValuesWarning>)> {
let inline_datasets = encode_inline_datasets(&inline_datasets)?;
let inline_datasets = encode_inline_datasets(inline_datasets)?;

let request = PreTransformValuesRequest {
spec: serde_json::to_string(spec)?,
Expand Down
10 changes: 3 additions & 7 deletions vegafusion-core/src/runtime/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ pub trait VegaFusionRuntimeTrait: Send + Sync {
)
.await?;

apply_pre_transform_datasets(input_spec, &plan, init, options.row_limit.map(|l| l as u32))
apply_pre_transform_datasets(input_spec, &plan, init, options.row_limit)
}

async fn pre_transform_extract(
Expand Down Expand Up @@ -398,13 +398,9 @@ pub fn encode_inline_datasets(
datasets: &HashMap<String, VegaFusionDataset>,
) -> Result<Vec<InlineDataset>> {
datasets
.into_iter()
.iter()
.map(|(name, dataset)| {
let VegaFusionDataset::Table { table, hash: _ } = dataset else {
return Err(VegaFusionError::internal(
"grpc runtime suppors Arrow tables only, not general Datasets".to_string(),
));
};
let VegaFusionDataset::Table { table, hash: _ } = dataset;
Ok(InlineDataset {
name: name.clone(),
table: table.to_ipc_bytes()?,
Expand Down
Loading
Loading