Skip to content

Commit

Permalink
Upgrade to DataFusion 43/Arrow 53.2 (#790)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde authored Nov 22, 2024
1 parent 4d57354 commit a9ac8f7
Show file tree
Hide file tree
Showing 68 changed files with 1,704 additions and 1,252 deletions.
1,947 changes: 1,141 additions & 806 deletions Cargo.lock

Large diffs are not rendered by default.

81 changes: 47 additions & 34 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,31 +32,37 @@ members = [
resolver = "2"

[workspace.dependencies]
tonic = { version = "0.11", features = ["zstd"] }
tonic-build = { version = "0.11" }
tonic-web = { version = "0.11" }
tonic-reflection = { version = "0.11" }
arrow = { version = "=52.1.0" }
arrow-ord = { version = "=52.1.0" }
arrow-array = { version = "=52.1.0" }
arrow-schema = { version = "=52.1.0" }
arrow-json = { version = "=52.1.0" }
object_store = { version = "0.10" }
parquet = { version = "=52.1.0" }
tonic = { version = "0.13", features = ["zstd"] }
tonic-build = { version = "0.12" }
tonic-web = { version = "0.12" }
tonic-reflection = { version = "0.12" }
tower = { version = "0.5" }
tower-http = {version = "0.6"}
axum = { version = "0.7" }
utoipa = { version = "4" }

arrow = { version = "=53.2.0" }
arrow-ord = { version = "=53.2.0" }
arrow-array = { version = "=53.2.0" }
arrow-schema = { version = "=53.2.0" }
arrow-json = { version = "=53.2.0" }
object_store = { version = "0.11" }
parquet = { version = "=53.2.0" }
ahash = { version = "=0.8.7" }
datafusion = { version = "40.0.0" }
datafusion-common = { version = "40.0.0" }
datafusion-proto = { version = "40.0.0" }
datafusion-functions = { version = "40.0.0", features = ["crypto_expressions"] }
datafusion-functions-json = { version = "0.40.0" }
deltalake = { version = "0.18.2" }
datafusion = { version = "43.0.0" }
datafusion-common = { version = "43.0.0" }
datafusion-proto = { version = "43.0.0" }
datafusion-functions = { version = "43.0.0", features = ["crypto_expressions"] }
datafusion-functions-window = {version = "43.0.0"}
datafusion-functions-json = { version = "0.43.0" }
deltalake = { version = "0.22.0" }
cornucopia = { version = "0.9.0" }
cornucopia_async = {version = "0.6.0"}
deadpool-postgres = "0.12"
prost = { version = "0.12", features = ["no-recursion-limit"] }
prost-reflect = "0.12.0"
prost-build = {version = "0.12" }
prost-types = "0.12"
deadpool-postgres = "0.14"
prost = { version = "0.13", features = ["no-recursion-limit"] }
prost-reflect = "0.14.0"
prost-build = {version = "0.13" }
prost-types = "0.13"
aws-config = "1.5.6"
reqwest = "0.12"

Expand All @@ -68,19 +74,26 @@ split-debuginfo = "unpacked"


[patch.crates-io]
deltalake = { git = 'https://github.com/delta-io/delta-rs', rev = 'e75a0b49b40f35ed361444bbea0e5720f359d732' }
deltalake = { git = 'https://github.com/delta-io/delta-rs', rev = '25ce38956e25722ba7a6cbc0f5a7dba6b3361554' }
typify = { git = 'https://github.com/ArroyoSystems/typify.git', branch = 'arroyo' }
parquet = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '52.1.0/parquet_bytes'}
arrow-json = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '52.1.0/json'}
datafusion = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-common = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-execution = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-physical-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-physical-plan = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-proto = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-functions = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
object_store = { git = 'http://github.com/ArroyoSystems/arrow-rs', branch = 'object_store_0.10.2/arroyo' }
parquet = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '53.2.0/parquet_bytes'}
arrow-json = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '53.2.0/json'}
datafusion = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}
datafusion-common = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}
datafusion-execution = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}
datafusion-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}
datafusion-physical-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}
datafusion-physical-plan = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}
datafusion-proto = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}
datafusion-functions = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}
datafusion-functions-window = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '43.0.0/arroyo'}

datafusion-functions-json = {git = 'https://github.com/ArroyoSystems/datafusion-functions-json', branch = 'datafusion_43'}

object_store = { git = 'http://github.com/ArroyoSystems/arrow-rs', branch = 'object_store_0.11.1/arroyo' }

cornucopia_async = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" }
cornucopia = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" }

# needed until 0.13 is released to get tower 0.5 upgrade
tonic = { git = "https://github.com/hyperium/tonic", rev = "b80428ba7dfcc607dfaac07022c360159ee1dc79" }
25 changes: 12 additions & 13 deletions crates/arroyo-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ tonic = { workspace = true }
tonic-reflection = { workspace = true }
tonic-web = { workspace = true }
prost = {workspace = true}
prost-reflect = "0.12.0"
prost-reflect = {workspace = true}
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1.12"
tower = "0.4"
tower = {workspace = true}
rand = "0.8"
rand_chacha = "0.3"
async-trait = "0.1"
Expand All @@ -40,14 +40,14 @@ arrow-schema = {workspace = true, features = ["serde"]}
bincode = { version = "2.0.0-rc.3", features = ["serde"]}
petgraph = {version = "0.6", features = ["serde-1"]}

http = "0.2"
tower-http = {version = "0.4", features = ["trace", "fs", "cors", "validate-request", "auth", "compression-zstd"]}
axum = {version = "0.6.20", features = ["headers", "tokio", "macros"]}
axum-extra = "0.7.4"
h2 = "0.3.26"
http = "1"
tower-http = { workspace = true, features = ["trace", "fs", "cors", "validate-request", "auth", "compression-zstd"]}
axum = {workspace = true, features = ["tokio", "macros"]}
axum-extra = { version = "0.9", features = ["typed-header"] }
h2 = "0.4"
thiserror = "1.0.40"
utoipa = "4"
utoipa-swagger-ui = { version = "4", features = ["axum"] }
utoipa = { workspace = true }
utoipa-swagger-ui = { version = "7", features = ["axum"] }

serde = { version = "1", features = ["derive"] }
serde_json = "1"
Expand All @@ -63,7 +63,6 @@ typify = "0.0.13"
schemars = "0.8"

# metric querying
prometheus-http-query = "0.6.5"
reqwest = {workspace = true}
base64 = '0.21'

Expand All @@ -87,16 +86,16 @@ time = "0.3"
cornucopia_async = { workspace = true, features = ["with-serde_json-1"]}
jwt-simple = "0.11.4"
uuid = "1.3.3"
regress = "0.6.0"
regress = "0.10"
apache-avro = "0.16.0"
toml = "0.8"
rust-embed = { version = "6.8.1", features = ["axum"] }
rust-embed = { version = "8", features = ["axum"] }
mime_guess = "2.0.4"

[build-dependencies]
cornucopia = { workspace = true }
postgres = "0.19.5"
arroyo-types = { path = "../arroyo-types" }
utoipa = "3"
utoipa = { workspace = true }
rusqlite = "0.31.0"
refinery = { version = "0.8.14", features = ["rusqlite"] }
5 changes: 3 additions & 2 deletions crates/arroyo-api/src/cloud.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{rest_utils::ErrorResp, AuthData, OrgMetadata};
use axum::headers::authorization::{Authorization, Bearer};
use axum::TypedHeader;
use axum_extra::headers::authorization::Bearer;
use axum_extra::headers::Authorization;
use axum_extra::TypedHeader;
use cornucopia_async::Database;

pub(crate) async fn authenticate(
Expand Down
15 changes: 7 additions & 8 deletions crates/arroyo-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use axum::Json;
use cornucopia_async::DatabaseSource;
use http::StatusCode;
use serde::{Deserialize, Serialize};
use std::net::{SocketAddr, TcpListener};
use std::net::SocketAddr;
use time::OffsetDateTime;
use tokio::net::TcpListener;
use tonic::transport::Channel;
use tower_http::compression::predicate::NotForContentType;
use tower_http::compression::{CompressionLayer, DefaultPredicate, Predicate};
Expand Down Expand Up @@ -121,10 +122,10 @@ pub async fn compiler_service() -> Result<CompilerGrpcClient<Channel>, ErrorResp
})
}

pub fn start_server(database: DatabaseSource, guard: ShutdownGuard) -> anyhow::Result<u16> {
pub async fn start_server(database: DatabaseSource, guard: ShutdownGuard) -> anyhow::Result<u16> {
let config = config();
let addr = SocketAddr::new(config.api.bind_address, config.api.http_port);
let listener = TcpListener::bind(addr)?;
let listener = TcpListener::bind(addr).await?;
let local_addr = listener.local_addr()?;

let app = rest::create_rest_app(database, &config.controller_endpoint()).layer(
Expand All @@ -137,11 +138,9 @@ pub fn start_server(database: DatabaseSource, guard: ShutdownGuard) -> anyhow::R
);

info!("Starting API server on {:?}", local_addr);
guard.into_spawn_task(wrap_start(
"api",
local_addr,
axum::Server::from_tcp(listener)?.serve(app.into_make_service()),
));
guard.into_spawn_task(wrap_start("api", local_addr, async {
axum::serve(listener, app.into_make_service()).await
}));

Ok(local_addr.port())
}
Expand Down
6 changes: 4 additions & 2 deletions crates/arroyo-api/src/rest_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ use arroyo_server_common::log_event;
use axum::extract::rejection::JsonRejection;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::{Json, TypedHeader};
use axum::Json;
use axum_extra::headers::authorization::Bearer;
use axum_extra::headers::Authorization;
use axum_extra::TypedHeader;
use serde_json::json;
use tracing::{error, warn};

use axum::headers::authorization::{Authorization, Bearer};
use cornucopia_async::{DatabaseSource, DbError};
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-connectors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ anyhow = "1.0.71"
tracing = "0.1.37"
regress = "0.10.0"
futures = "0.3.28"
axum = {version = "0.6.12"}
axum = {workspace = true}
rand = "0.8.5"
base64 = "0.13.1"
bytes = "1.5.0"
Expand Down
9 changes: 3 additions & 6 deletions crates/arroyo-connectors/src/filesystem/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,10 @@ use arroyo_storage::StorageProvider;
use async_trait::async_trait;
use bincode::{Decode, Encode};
use chrono::{DateTime, Utc};
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::concat;
use datafusion::{
common::{Column, Result as DFResult},
execution::{
context::{SessionConfig, SessionState},
runtime_env::RuntimeEnv,
},
logical_expr::{
expr::ScalarFunction, Expr, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature, Volatility,
},
Expand Down Expand Up @@ -210,8 +207,8 @@ fn partition_string_for_fields_and_time(

fn compile_expression(expr: &Expr, schema: ArroyoSchemaRef) -> Result<Arc<dyn PhysicalExpr>> {
let physical_planner = DefaultPhysicalPlanner::default();
let session_state =
SessionState::new_with_config_rt(SessionConfig::new(), Arc::new(RuntimeEnv::default()));
let session_state = SessionStateBuilder::new().build();

let plan = physical_planner.create_physical_expr(
expr,
&(schema.schema.as_ref().clone()).try_into()?,
Expand Down
4 changes: 2 additions & 2 deletions crates/arroyo-connectors/src/kinesis/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use arroyo_operator::operator::ArrowOperator;
use arroyo_rpc::retry;
use arroyo_types::CheckpointBarrier;
use async_trait::async_trait;
use aws_config::{from_env, Region};
use aws_config::{BehaviorVersion, Region};
use aws_sdk_kinesis::primitives::Blob;
use aws_sdk_kinesis::types::PutRecordsRequestEntry;
use aws_sdk_kinesis::Client as KinesisClient;
Expand All @@ -33,7 +33,7 @@ impl ArrowOperator for KinesisSinkFunc {
}

async fn on_start(&mut self, _ctx: &mut ArrowContext) {
let mut loader = from_env();
let mut loader = aws_config::defaults(BehaviorVersion::v2024_03_28());
if let Some(region) = &self.aws_region {
loader = loader.region(Region::new(region.clone()));
}
Expand Down
4 changes: 2 additions & 2 deletions crates/arroyo-connectors/src/kinesis/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use arroyo_state::global_table_config;
use arroyo_state::tables::global_keyed_map::GlobalKeyedView;
use arroyo_types::{from_nanos, UserError};
use async_trait::async_trait;
use aws_config::{from_env, Region};
use aws_config::{BehaviorVersion, Region};
use aws_sdk_kinesis::error::SdkError;
use aws_sdk_kinesis::operation::get_records::GetRecordsOutput;
use aws_sdk_kinesis::operation::get_shard_iterator::builders::GetShardIteratorFluentBuilder;
Expand Down Expand Up @@ -337,7 +337,7 @@ impl KinesisSourceFunc {
}

async fn init_client(&mut self) {
let mut loader = from_env();
let mut loader = aws_config::defaults(BehaviorVersion::v2024_03_28());
if let Some(region) = &self.aws_region {
loader = loader.region(Region::new(region.clone()));
}
Expand Down
6 changes: 3 additions & 3 deletions crates/arroyo-controller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ serde = "1"
anyhow = "1.0.70"

# Kubernetes
kube = { version = "0.91", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.22.0", features = ["v1_30"] }
kube = { version = "0.96", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.23.0", features = ["v1_30"] }
serde_yaml = {version = "0.9"}
shlex = "1.3"

Expand All @@ -64,7 +64,7 @@ regex = "1.7.3"
reqwest = { workspace = true, features = ["json"] }
uuid = "1.3.3"
async-stream = "0.3.5"
base64 = "0.21.5"
base64 = "0.22"
rusqlite = { version = "0.31.0", features = ["serde_json", "time"] }

[build-dependencies]
Expand Down
11 changes: 6 additions & 5 deletions crates/arroyo-controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,10 +628,10 @@ impl ControllerServer {
}

pub async fn start(self, guard: ShutdownGuard) -> anyhow::Result<u16> {
let reflection = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(arroyo_rpc::grpc::API_FILE_DESCRIPTOR_SET)
.build()
.unwrap();
// let reflection = tonic_reflection::server::Builder::configure()
// .register_encoded_file_descriptor_set(arroyo_rpc::grpc::API_FILE_DESCRIPTOR_SET)
// .build_v1()
// .unwrap();

let addr = SocketAddr::new(
config().controller.bind_address,
Expand All @@ -655,7 +655,8 @@ impl ControllerServer {
.send_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Zstd),
)
.add_service(reflection)
// TODO: re-enable once tonic 0.13 is released
//.add_service(reflection)
.serve_with_incoming(TcpListenerStream::new(listener)),
));

Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-datastream/src/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl Debug for LogicalNode {

pub type LogicalGraph = DiGraph<LogicalNode, LogicalEdge>;

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
#[derive(Clone, Debug, Eq, PartialEq, Hash, PartialOrd)]
pub struct DylibUdfConfig {
pub dylib_path: String,
pub arg_types: Vec<DataType>,
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-formats/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ arroyo-rpc = { path = "../arroyo-rpc" }
apache-avro = "0.16.0"
serde = {version = "1.0", features = ["derive"]}
serde_json = "1.0"
utoipa = "4"
utoipa = { workspace = true }
arrow = { workspace = true }
arrow-schema = { workspace = true }
arrow-array = { workspace = true}
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-openapi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ progenitor = { version = '0.8' }
serde_json = "1.0"
syn = "1.0"

utoipa = "4"
utoipa = {workspace = true}
arroyo-api = { path = "../arroyo-api" }
2 changes: 1 addition & 1 deletion crates/arroyo-operator/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ impl Display for AsDisplayable<'_> {
write!(f, "{:?}", d)
}
AsDisplayable::Plan(p) => {
write!(f, "`{}`", displayable(*p).indent(false))
write!(f, "```\n{}\n```", displayable(*p).indent(false))
}
AsDisplayable::Schema(s) => {
for field in s.fields() {
Expand Down
3 changes: 2 additions & 1 deletion crates/arroyo-planner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ arroyo-udf-python = { path = "../arroyo-udf/arroyo-udf-python" }
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
datafusion-functions = { workspace = true }
datafusion-functions-window = { workspace = true }
datafusion-functions-json = { workspace = true }

prost = {workspace = true}
Expand All @@ -42,7 +43,7 @@ syn = {version = "2", features = ["full", "parsing", "extra-traits"]}
tracing = "0.1.37"
tracing-subscriber = "0.3"

serde_json_path = "0.6.3"
serde_json_path = "0.7"
apache-avro = "0.16.0"
prettyplease = "0.2.4"
unicase = "2.7.0"
Expand Down
Loading

0 comments on commit a9ac8f7

Please sign in to comment.