diff --git a/Cargo.lock b/Cargo.lock index f1b64dd71..564a4d56d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1509,9 +1509,9 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" [[package]] name = "aws-config" -version = "1.5.10" +version = "1.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b49afaa341e8dd8577e1a2200468f98956d6eda50bcf4a53246cc00174ba924" +checksum = "c03a50b30228d3af8865ce83376b4e99e1ffa34728220fe2860e4df0bb5278d6" dependencies = [ "aws-credential-types", "aws-runtime", @@ -1520,7 +1520,7 @@ dependencies = [ "aws-sdk-sts", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json", + "aws-smithy-json 0.61.1", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -1569,9 +1569,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.4.3" +version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a10d5c055aa540164d9561a0e2e74ad30f0dcf7393c3a92f6733ddf9c5762468" +checksum = "b16d1aa50accc11a4b4d5c50f7fb81cc0cf60328259c587d0e6b0f11385bde46" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -1602,7 +1602,7 @@ dependencies = [ "aws-runtime", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json", + "aws-smithy-json 0.60.7", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -1625,7 +1625,7 @@ dependencies = [ "aws-runtime", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json", + "aws-smithy-json 0.60.7", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -1647,7 +1647,7 @@ dependencies = [ "aws-runtime", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json", + "aws-smithy-json 0.60.7", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -1661,15 +1661,15 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.49.0" +version = "1.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09677244a9da92172c8dc60109b4a9658597d4d298b188dd0018b6a66b410ca4" +checksum = "1605dc0bf9f0a4b05b451441a17fcb0bda229db384f23bf5cead3adbab0664ac" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json", + "aws-smithy-json 0.61.1", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -1683,15 +1683,15 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.50.0" +version = "1.54.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81fea2f3a8bb3bd10932ae7ad59cc59f65f270fc9183a7e91f501dc5efbef7ee" +checksum = "59f3f73466ff24f6ad109095e0f3f2c830bfb4cd6c8b12f744c8e61ebf4d3ba1" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json", + "aws-smithy-json 0.61.1", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -1705,15 +1705,15 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.49.0" +version = "1.54.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53dcf5e7d9bd1517b8b998e170e650047cea8a2b85fe1835abe3210713e541b7" +checksum = "249b2acaa8e02fd4718705a9494e3eb633637139aa4bb09d70965b0448e865db" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json", + "aws-smithy-json 0.61.1", "aws-smithy-query", "aws-smithy-runtime", "aws-smithy-runtime-api", @@ -1728,9 +1728,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.2.5" +version = "1.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5619742a0d8f253be760bfbb8e8e8368c69e3587e4637af5754e488a611499b1" +checksum = "7d3820e0c08d0737872ff3c7c1f21ebbb6693d832312d6152bf18ef50a5471c2" dependencies = [ "aws-credential-types", "aws-smithy-http", @@ -1751,9 +1751,9 @@ dependencies = [ [[package]] name = "aws-smithy-async" -version = "1.2.1" +version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62220bc6e97f946ddd51b5f1361f78996e704677afc518a4ff66b7a72ea1378c" +checksum = "427cb637d15d63d6f9aae26358e1c9a9c09d5aa490d64b09354c8217cfef0f28" dependencies = [ "futures-util", "pin-project-lite", @@ -1789,6 +1789,15 @@ dependencies = [ "aws-smithy-types", ] +[[package]] +name = "aws-smithy-json" +version = "0.61.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee4e69cc50921eb913c6b662f8d909131bb3e6ad6cb6090d3a39b66fc5c52095" +dependencies = [ + "aws-smithy-types", +] + [[package]] name = "aws-smithy-query" version = "0.60.7" @@ -1801,9 +1810,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.7.3" +version = "1.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be28bd063fa91fd871d131fc8b68d7cd4c5fa0869bea68daca50dcb1cbd76be2" +checksum = "a05dd41a70fc74051758ee75b5c4db2c0ca070ed9229c3df50e9475cda1cb985" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -1845,9 +1854,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.2.9" +version = "1.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fbd94a32b3a7d55d3806fe27d98d3ad393050439dd05eb53ece36ec5e3d3510" +checksum = "38ddc9bd6c28aeb303477170ddd183760a956a03e083b3902a990238a7e3792d" dependencies = [ "base64-simd", "bytes", @@ -3369,6 +3378,7 @@ source = "git+https://github.com/delta-io/delta-rs?rev=25ce38956e25722ba7a6cbc0f dependencies = [ "deltalake-aws", "deltalake-core", + "deltalake-gcp", ] [[package]] @@ -3454,6 +3464,24 @@ dependencies = [ "z85", ] +[[package]] +name = "deltalake-gcp" +version = "0.6.0" +source = "git+https://github.com/delta-io/delta-rs?rev=25ce38956e25722ba7a6cbc0f5a7dba6b3361554#25ce38956e25722ba7a6cbc0f5a7dba6b3361554" +dependencies = [ + "async-trait", + "bytes", + "deltalake-core", + "futures", + "lazy_static", + "object_store", + "regex", + "thiserror 2.0.3", + "tokio", + "tracing", + "url", +] + [[package]] name = "der" version = "0.6.1" @@ -6284,7 +6312,7 @@ dependencies = [ [[package]] name = "object_store" version = "0.11.1" -source = "git+http://github.com/ArroyoSystems/arrow-rs?branch=object_store_0.11.1%2Farroyo#4cfe48061503161e43cd3cd7960e74ce789bd3b9" +source = "git+http://github.com/ArroyoSystems/arrow-rs?branch=object_store_0.11.1%2Farroyo#3eb9c26755e935892b8dfadd7acdb376ea52cc95" dependencies = [ "async-trait", "base64 0.22.1", @@ -7225,8 +7253,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" dependencies = [ "bytes", - "heck 0.5.0", - "itertools 0.13.0", + "heck 0.4.1", + "itertools 0.10.5", "log", "multimap", "once_cell", @@ -7272,7 +7300,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" dependencies = [ "anyhow", - "itertools 0.13.0", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.87", @@ -7362,7 +7390,7 @@ dependencies = [ "indoc", "libc", "memoffset", - "parking_lot 0.12.3", + "parking_lot 0.11.2", "portable-atomic", "pyo3-build-config", "pyo3-ffi", @@ -8747,7 +8775,7 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03c3c6b7927ffe7ecaa769ee0e3994da3b8cafc8f444578982c83ecb161af917" dependencies = [ - "heck 0.5.0", + "heck 0.4.1", "proc-macro2", "quote", "syn 2.0.87", @@ -10041,9 +10069,9 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "url" -version = "2.5.3" +version = "2.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d157f1b96d14500ffdc1f10ba712e780825526c03d9a49b4d0324b0d9113ada" +checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60" dependencies = [ "form_urlencoded", "idna", @@ -10376,7 +10404,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 5195d8186..1e6728393 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,7 +63,7 @@ prost = { version = "0.13", features = ["no-recursion-limit"] } prost-reflect = "0.14.0" prost-build = {version = "0.13" } prost-types = "0.13" -aws-config = "1.5.6" +aws-config = "1.5.13" reqwest = "0.12" [profile.release] diff --git a/crates/arroyo-connectors/Cargo.toml b/crates/arroyo-connectors/Cargo.toml index edd62af6e..22331d691 100644 --- a/crates/arroyo-connectors/Cargo.toml +++ b/crates/arroyo-connectors/Cargo.toml @@ -83,7 +83,7 @@ uuid = { version = "1.7.0", features = ["v4"] } # Filesystem parquet = { workspace = true, features = ["async"]} object_store = { workspace = true } -deltalake = { workspace = true, features = ["s3"] } +deltalake = { workspace = true, features = ["s3", "gcs"] } async-compression = { version = "0.4.3", features = ["tokio", "zstd", "gzip"] } # MQTT diff --git a/crates/arroyo-connectors/src/filesystem/sink/delta.rs b/crates/arroyo-connectors/src/filesystem/sink/delta.rs index 3dd3eff7b..da01b2cc1 100644 --- a/crates/arroyo-connectors/src/filesystem/sink/delta.rs +++ b/crates/arroyo-connectors/src/filesystem/sink/delta.rs @@ -1,52 +1,40 @@ use super::FinishedFile; use anyhow::{Context, Result}; -use arrow::datatypes::{Schema, SchemaRef}; -use arroyo_storage::{get_current_credentials, StorageProvider}; +use arrow::datatypes::Schema; +use arroyo_storage::{BackendConfig, StorageProvider}; use arroyo_types::to_millis; +use deltalake::aws::storage::S3StorageBackend; use deltalake::TableProperty::{MinReaderVersion, MinWriterVersion}; use deltalake::{ - aws::storage::s3_constants::AWS_S3_ALLOW_UNSAFE_RENAME, kernel::{Action, Add}, operations::create::CreateBuilder, protocol::SaveMode, table::PeekCommit, - DeltaTableBuilder, + DeltaTable, DeltaTableBuilder, }; -use object_store::{aws::AmazonS3ConfigKey, path::Path}; -use once_cell::sync::Lazy; +use object_store::{path::Path, ObjectStore}; +use std::sync::Arc; use std::{ collections::{HashMap, HashSet}, time::SystemTime, }; use tracing::debug; - -static INIT: Lazy<()> = Lazy::new(|| { - deltalake::aws::register_handlers(None); -}); +use url::Url; pub(crate) async fn commit_files_to_delta( finished_files: &[FinishedFile], relative_table_path: &Path, - storage_provider: &StorageProvider, + table: &mut DeltaTable, last_version: i64, - schema: SchemaRef, ) -> Result> { if finished_files.is_empty() { return Ok(None); } let add_actions = create_add_actions(finished_files, relative_table_path)?; - let table_path = build_table_path(storage_provider, relative_table_path); - let storage_options = configure_storage_options(&table_path, storage_provider).await?; - let mut table = load_or_create_table(&table_path, storage_options, &schema).await?; - if let Some(new_version) = check_existing_files( - &mut table, - last_version, - finished_files, - relative_table_path, - ) - .await? + if let Some(new_version) = + check_existing_files(table, last_version, finished_files, relative_table_path).await? { return Ok(Some(new_version)); } @@ -55,74 +43,47 @@ pub(crate) async fn commit_files_to_delta( Ok(Some(new_version)) } -async fn load_or_create_table( - table_path: &str, - storage_options: HashMap, +pub(crate) async fn load_or_create_table( + table_path: &Path, + storage_provider: &StorageProvider, schema: &Schema, -) -> Result { - Lazy::force(&INIT); +) -> Result { deltalake::aws::register_handlers(None); - match DeltaTableBuilder::from_uri(table_path) - .with_storage_options(storage_options.clone()) - .load() - .await - { - Ok(table) => Ok(table), - Err(deltalake::DeltaTableError::NotATable(_)) => { - create_new_table(table_path, storage_options, schema).await - } - Err(err) => Err(err.into()), + deltalake::gcp::register_handlers(None); + + let (backing_store, url): (Arc, _) = match storage_provider.config() { + BackendConfig::S3(_) => ( + Arc::new(S3StorageBackend::try_new( + storage_provider.get_backing_store(), + true, + )?), + format!("s3://{}", storage_provider.qualify_path(table_path)), + ), + BackendConfig::GCS(_) => ( + storage_provider.get_backing_store(), + format!("gs://{}", storage_provider.qualify_path(table_path)), + ), + BackendConfig::Local(_) => (storage_provider.get_backing_store(), table_path.to_string()), + }; + + let mut delta = DeltaTableBuilder::from_uri(&url) + .with_storage_backend(backing_store, Url::parse(storage_provider.canonical_url())?) + .build()?; + + if delta.verify_deltatable_existence().await? { + delta.load().await?; + Ok(delta) + } else { + let delta_schema: deltalake::kernel::Schema = schema.try_into()?; + Ok(CreateBuilder::new() + .with_log_store(delta.log_store()) + .with_columns(delta_schema.fields().cloned()) + .with_configuration_property(MinReaderVersion, Some("3")) + .with_configuration_property(MinWriterVersion, Some("7")) + .await?) } } -async fn create_new_table( - table_path: &str, - storage_options: HashMap, - schema: &Schema, -) -> Result { - let delta_object_store = DeltaTableBuilder::from_uri(table_path) - .with_storage_options(storage_options) - .build_storage()?; - let delta_schema: deltalake::kernel::Schema = (schema).try_into()?; - CreateBuilder::new() - .with_log_store(delta_object_store) - .with_columns(delta_schema.fields().cloned()) - .with_configuration_property(MinReaderVersion, Some("3")) - .with_configuration_property(MinWriterVersion, Some("7")) - .await - .map_err(Into::into) -} - -async fn configure_storage_options( - table_path: &str, - storage_provider: &StorageProvider, -) -> Result> { - let mut options = storage_provider.storage_options().clone(); - if table_path.starts_with("s3://") { - update_s3_credentials(&mut options).await?; - } - Ok(options) -} - -async fn update_s3_credentials(options: &mut HashMap) -> Result<()> { - if !options.contains_key(AmazonS3ConfigKey::SecretAccessKey.as_ref()) { - let tmp_credentials = get_current_credentials().await?; - options.insert( - AmazonS3ConfigKey::AccessKeyId.as_ref().to_string(), - tmp_credentials.key_id.clone(), - ); - options.insert( - AmazonS3ConfigKey::SecretAccessKey.as_ref().to_string(), - tmp_credentials.secret_key.clone(), - ); - if let Some(token) = tmp_credentials.token.as_ref() { - options.insert(AmazonS3ConfigKey::Token.as_ref().to_string(), token.clone()); - } - } - options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_string(), "true".to_string()); - Ok(()) -} - fn create_add_actions( finished_files: &[FinishedFile], relative_table_path: &Path, @@ -158,7 +119,7 @@ fn create_add_action(file: &FinishedFile, relative_table_path: &Path) -> Result< } async fn check_existing_files( - table: &mut deltalake::DeltaTable, + table: &mut DeltaTable, last_version: i64, finished_files: &[FinishedFile], relative_table_path: &Path, @@ -191,7 +152,7 @@ async fn check_existing_files( Ok(None) } -async fn commit_to_delta(table: deltalake::DeltaTable, add_actions: Vec) -> Result { +async fn commit_to_delta(table: &mut DeltaTable, add_actions: Vec) -> Result { Ok(deltalake::operations::transaction::CommitBuilder::default() .with_actions(add_actions) .build( @@ -206,11 +167,3 @@ async fn commit_to_delta(table: deltalake::DeltaTable, add_actions: Vec) .await? .version) } - -fn build_table_path(storage_provider: &StorageProvider, relative_table_path: &Path) -> String { - format!( - "{}/{}", - storage_provider.object_store_base_url(), - relative_table_path - ) -} diff --git a/crates/arroyo-connectors/src/filesystem/sink/local.rs b/crates/arroyo-connectors/src/filesystem/sink/local.rs index 96b426547..a9a8ae07b 100644 --- a/crates/arroyo-connectors/src/filesystem/sink/local.rs +++ b/crates/arroyo-connectors/src/filesystem/sink/local.rs @@ -2,29 +2,25 @@ use std::{collections::HashMap, fs::create_dir_all, path::Path, sync::Arc, time: use arrow::record_batch::RecordBatch; use arroyo_operator::context::OperatorContext; -use arroyo_rpc::{ - df::{ArroyoSchema, ArroyoSchemaRef}, - formats::Format, - OperatorConfig, -}; +use arroyo_rpc::{df::ArroyoSchemaRef, formats::Format, OperatorConfig}; use arroyo_storage::StorageProvider; use arroyo_types::TaskInfo; use async_trait::async_trait; use bincode::{Decode, Encode}; use datafusion::physical_plan::PhysicalExpr; use tokio::{fs::OpenOptions, io::AsyncWriteExt}; -use tracing::info; +use tracing::debug; use uuid::Uuid; -use crate::filesystem::{sink::two_phase_committer::TwoPhaseCommitter, FileSettings}; -use anyhow::{bail, Result}; - use super::{ add_suffix_prefix, delta, get_partitioner_from_file_settings, parquet::batches_by_partition, two_phase_committer::TwoPhaseCommitterOperator, CommitState, CommitStyle, FileNaming, FileSystemTable, FilenameStrategy, FinishedFile, MultiPartWriterStats, RollingPolicy, TableType, }; +use crate::filesystem::sink::delta::load_or_create_table; +use crate::filesystem::{sink::two_phase_committer::TwoPhaseCommitter, FileSettings}; +use anyhow::{bail, Result}; pub struct LocalFileSystemWriter { // writer to a local tmp file @@ -40,7 +36,7 @@ pub struct LocalFileSystemWriter { file_settings: FileSettings, format: Option, schema: Option, - commit_state: CommitState, + commit_state: Option, filenaming: FileNaming, } @@ -64,10 +60,6 @@ impl LocalFileSystemWriter { else { unreachable!("LocalFileSystemWriter can only be used as a sink") }; - let commit_state = match file_settings.as_ref().unwrap().commit_style.unwrap() { - CommitStyle::DeltaLake => CommitState::DeltaLake { last_version: -1 }, - CommitStyle::Direct => CommitState::VanillaParquet, - }; let mut filenaming = file_settings .clone() @@ -92,36 +84,16 @@ impl LocalFileSystemWriter { partitioner: None, finished_files: Vec::new(), file_settings: file_settings.clone().unwrap(), - schema: None, format: config.format, rolling_policy: RollingPolicy::from_file_settings(file_settings.as_ref().unwrap()), table_properties, - commit_state, + schema: None, + commit_state: None, filenaming, }; TwoPhaseCommitterOperator::new(writer) } - fn init_schema_and_partitioner(&mut self, record_batch: &RecordBatch) -> Result<()> { - if self.schema.is_none() { - self.schema = Some(Arc::new(ArroyoSchema::from_fields( - record_batch - .schema() - .fields() - .into_iter() - .map(|field| field.as_ref().clone()) - .collect(), - ))); - } - if self.partitioner.is_none() { - self.partitioner = get_partitioner_from_file_settings( - self.file_settings.clone(), - self.schema.as_ref().unwrap().clone(), - ); - } - Ok(()) - } - fn get_or_insert_writer(&mut self, partition: &Option) -> &mut V { let filename_strategy = match self.filenaming.strategy { Some(FilenameStrategy::Uuid) => FilenameStrategy::Uuid, @@ -258,13 +230,33 @@ impl TwoPhaseCommitter for LocalFileSystemWrite self.subtask_id = ctx.task_info.task_index as usize; self.finished_files = recovered_files; self.next_file_index = max_file_index; + + let storage_provider = StorageProvider::for_url(&self.final_dir).await?; + + let schema = Arc::new(ctx.in_schemas[0].clone()); + + self.commit_state = Some(match self.file_settings.commit_style.unwrap() { + CommitStyle::DeltaLake => CommitState::DeltaLake { + last_version: -1, + table: load_or_create_table( + &object_store::path::Path::parse(&self.final_dir)?, + &storage_provider, + &schema.schema_without_timestamp(), + ) + .await?, + }, + CommitStyle::Direct => CommitState::VanillaParquet, + }); + + self.partitioner = + get_partitioner_from_file_settings(self.file_settings.clone(), schema.clone()); + + self.schema = Some(schema); + Ok(()) } async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> { - if self.schema.is_none() { - self.init_schema_and_partitioner(&batch)?; - } if let Some(partitioner) = self.partitioner.as_ref() { for (batch, partition) in batches_by_partition(batch, partitioner.clone())? { let writer = self.get_or_insert_writer(&partition); @@ -298,7 +290,7 @@ impl TwoPhaseCommitter for LocalFileSystemWrite if !tmp_file.exists() { bail!("tmp file {} does not exist", tmp_file.to_string_lossy()); } - info!( + debug!( "committing file {} to {}", tmp_file.to_string_lossy(), destination.to_string_lossy() @@ -311,20 +303,21 @@ impl TwoPhaseCommitter for LocalFileSystemWrite size: destination.metadata()?.len() as usize, }); } - if let CommitState::DeltaLake { last_version } = self.commit_state { - let storage_provider = Arc::new(StorageProvider::for_url("/").await?); + + if let CommitState::DeltaLake { + last_version, + table, + } = self.commit_state.as_mut().unwrap() + { if let Some(version) = delta::commit_files_to_delta( &finished_files, &object_store::path::Path::parse(&self.final_dir)?, - &storage_provider, - last_version, - Arc::new(self.schema.as_ref().unwrap().schema_without_timestamp()), + table, + *last_version, ) .await? { - self.commit_state = CommitState::DeltaLake { - last_version: version, - }; + *last_version = version; } } Ok(()) diff --git a/crates/arroyo-connectors/src/filesystem/sink/mod.rs b/crates/arroyo-connectors/src/filesystem/sink/mod.rs index 8c0c24249..46521e056 100644 --- a/crates/arroyo-connectors/src/filesystem/sink/mod.rs +++ b/crates/arroyo-connectors/src/filesystem/sink/mod.rs @@ -32,11 +32,12 @@ use datafusion::{ physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}, scalar::ScalarValue, }; +use deltalake::DeltaTable; use futures::{stream::FuturesUnordered, Future}; use futures::{stream::StreamExt, TryStreamExt}; use object_store::{multipart::PartId, path::Path, MultipartId}; use tokio::sync::mpsc::{Receiver, Sender}; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; use uuid::Uuid; use arroyo_types::*; @@ -56,6 +57,7 @@ use self::{ }, }; +use crate::filesystem::sink::delta::load_or_create_table; use crate::filesystem::{ CommitStyle, FileNaming, FileSettings, FileSystemTable, FilenameStrategy, TableType, }; @@ -109,7 +111,7 @@ impl FileSystemSink { Self::create_and_start(table_properties, config.format) } - pub fn start(&mut self, schema: ArroyoSchemaRef) -> Result<()> { + pub async fn start(&mut self, schema: ArroyoSchemaRef) -> Result<()> { let TableType::Sink { write_path, file_settings, @@ -132,21 +134,22 @@ impl FileSystemSink { self.partitioner = partition_func; let table = self.table.clone(); let format = self.format.clone(); + let storage_path: Path = StorageProvider::get_key(&write_path).unwrap(); + let provider = StorageProvider::for_url_with_options(&write_path, storage_options.clone()) + .await + .unwrap(); + let mut writer = AsyncMultipartFileSystemWriter::::new( + storage_path, + Arc::new(provider), + receiver, + checkpoint_sender, + table, + format, + schema, + ) + .await?; + tokio::spawn(async move { - let storage_path: Path = StorageProvider::get_key(&write_path).unwrap(); - let provider = - StorageProvider::for_url_with_options(&write_path, storage_options.clone()) - .await - .unwrap(); - let mut writer = AsyncMultipartFileSystemWriter::::new( - storage_path, - Arc::new(provider), - receiver, - checkpoint_sender, - table, - format, - schema, - ); writer.run().await.unwrap(); }); Ok(()) @@ -437,9 +440,12 @@ struct AsyncMultipartFileSystemWriter { schema: ArroyoSchemaRef, } -#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] +#[derive(Debug)] pub enum CommitState { - DeltaLake { last_version: i64 }, + DeltaLake { + last_version: i64, + table: DeltaTable, + }, VanillaParquet, } @@ -501,7 +507,7 @@ async fn from_checkpoint( parts_to_add, trailing_bytes, } => { - info!("finishing file for path {:?}", path); + debug!("finishing file for path {:?}", path); let multipart_id = object_store .start_multipart(path) .await @@ -521,7 +527,7 @@ async fn from_checkpoint( .await?; parts.push(upload_part); } - info!( + debug!( "parts: {:?}, pushed_size: {:?}, multipart id: {:?}", parts, pushed_size, multipart_id ); @@ -701,7 +707,7 @@ impl AsyncMultipartFileSystemWriter where R: MultiPartWriter, { - fn new( + async fn new( path: Path, object_store: Arc, receiver: Receiver, @@ -709,7 +715,7 @@ where writer_properties: FileSystemTable, format: Option, schema: ArroyoSchemaRef, - ) -> Self { + ) -> Result { let file_settings = if let TableType::Sink { ref file_settings, .. } = writer_properties.table_type @@ -720,7 +726,15 @@ where }; let commit_state = match file_settings.commit_style.unwrap() { - CommitStyle::DeltaLake => CommitState::DeltaLake { last_version: -1 }, + CommitStyle::DeltaLake => CommitState::DeltaLake { + last_version: -1, + table: load_or_create_table( + &path, + &object_store, + &schema.schema_without_timestamp(), + ) + .await?, + }, CommitStyle::Direct => CommitState::VanillaParquet, }; let mut file_naming = file_settings.file_naming.clone().unwrap_or(FileNaming { @@ -732,7 +746,7 @@ where file_naming.suffix = Some(R::suffix()); } - Self { + Ok(Self { path, active_writers: HashMap::new(), watermark: None, @@ -750,7 +764,7 @@ where file_naming, format, schema, - } + }) } fn add_part_to_finish(&mut self, file_to_finish: FileToFinish) { @@ -926,19 +940,16 @@ where finished_files.push(file); } } - if let CommitState::DeltaLake { last_version } = self.commit_state { - if let Some(new_version) = delta::commit_files_to_delta( - &finished_files, - &self.path, - &self.object_store, - last_version, - Arc::new(self.schema.schema_without_timestamp()), - ) - .await? + if let CommitState::DeltaLake { + last_version, + table, + } = &mut self.commit_state + { + if let Some(new_version) = + delta::commit_files_to_delta(&finished_files, &self.path, table, *last_version) + .await? { - self.commit_state = CommitState::DeltaLake { - last_version: new_version, - }; + *last_version = new_version; } } let finished_message = CheckpointData::Finished { @@ -950,8 +961,8 @@ where } fn delta_version(&mut self) -> i64 { - match self.commit_state { - CommitState::DeltaLake { last_version } => last_version, + match &self.commit_state { + CommitState::DeltaLake { last_version, .. } => *last_version, CommitState::VanillaParquet => 0, } } @@ -1553,9 +1564,12 @@ impl TwoPhaseCommitter for FileSystemSink, ) -> Result<()> { - self.start(Arc::new(ctx.in_schemas.first().unwrap().clone()))?; + self.start(Arc::new(ctx.in_schemas.first().unwrap().clone())) + .await?; + let mut max_file_index = 0; let mut recovered_files = Vec::new(); + for file_system_data_recovery in data_recovery { max_file_index = max_file_index.max(file_system_data_recovery.next_file_index); // task 0 is responsible for recovering all files. diff --git a/crates/arroyo-connectors/src/filesystem/sink/two_phase_committer.rs b/crates/arroyo-connectors/src/filesystem/sink/two_phase_committer.rs index 497dc5209..69fee9f46 100644 --- a/crates/arroyo-connectors/src/filesystem/sink/two_phase_committer.rs +++ b/crates/arroyo-connectors/src/filesystem/sink/two_phase_committer.rs @@ -13,7 +13,7 @@ use bincode::config; use prost::Message; use std::fmt::Debug; use std::{collections::HashMap, time::SystemTime}; -use tracing::info; +use tracing::debug; pub struct TwoPhaseCommitterOperator { committer: TPC, @@ -86,7 +86,7 @@ impl TwoPhaseCommitterOperator { mut commit_data: HashMap>>, ctx: &mut OperatorContext, ) { - info!("received commit message"); + debug!("received commit message"); let pre_commits = match self.committer.commit_strategy() { CommitStrategy::PerSubtask => std::mem::take(&mut self.pre_commits), CommitStrategy::PerOperator => { diff --git a/crates/arroyo-controller/src/schedulers/kubernetes/mod.rs b/crates/arroyo-controller/src/schedulers/kubernetes/mod.rs index 06e88f537..ad2831ba1 100644 --- a/crates/arroyo-controller/src/schedulers/kubernetes/mod.rs +++ b/crates/arroyo-controller/src/schedulers/kubernetes/mod.rs @@ -180,13 +180,6 @@ impl Scheduler for KubernetesScheduler { let replicas = (req.slots as f32 / config().kubernetes_scheduler.worker.task_slots as f32) .ceil() as usize; - info!( - job_id = *req.job_id, - message = "starting workers on k8s", - replicas, - task_slots = req.slots - ); - let max_slots_per_pod = config().kubernetes_scheduler.worker.task_slots as usize; let mut slots_scheduled = 0; let mut pods = vec![]; diff --git a/crates/arroyo-state/src/tables/global_keyed_map.rs b/crates/arroyo-state/src/tables/global_keyed_map.rs index 2f359b0d9..117bc2d2b 100644 --- a/crates/arroyo-state/src/tables/global_keyed_map.rs +++ b/crates/arroyo-state/src/tables/global_keyed_map.rs @@ -17,7 +17,7 @@ use parquet::{ basic::ZstdLevel, file::properties::{EnabledStatistics, WriterProperties}, }; -use tracing::info; +use tracing::debug; use std::iter::Zip; @@ -244,7 +244,7 @@ impl TableEpochCheckpointer for GlobalKeyedCheckpointer { bail!("global keyed data expects KeyedData, not record batches") } TableData::CommitData { data } => { - info!("received commit data"); + debug!("received commit data"); // set commit data, failing if it was already set if self.commit_data.is_some() { bail!("commit data already set for this epoch") diff --git a/crates/arroyo-storage/src/aws.rs b/crates/arroyo-storage/src/aws.rs index e5bba4d90..f7701a748 100644 --- a/crates/arroyo-storage/src/aws.rs +++ b/crates/arroyo-storage/src/aws.rs @@ -1,11 +1,23 @@ use crate::StorageError; -use aws_config::BehaviorVersion; -use aws_credential_types::provider::ProvideCredentials; +use aws_config::timeout::TimeoutConfig; +use aws_config::{BehaviorVersion, SdkConfig}; +use aws_credential_types::provider::{ProvideCredentials, SharedCredentialsProvider}; use object_store::{aws::AwsCredential, CredentialProvider}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant, SystemTime}; +use tokio::sync::OnceCell; +use tokio::task::JoinHandle; +use tracing::info; +const EXPIRATION_BUFFER: Duration = Duration::from_secs(5 * 60); + +type TemporaryToken = (Arc, Option, Instant); + +#[derive(Clone)] pub struct ArroyoCredentialProvider { - provider: aws_credential_types::provider::SharedCredentialsProvider, + cache: Arc>>, + provider: SharedCredentialsProvider, + refresh_task: Arc>>>, } impl std::fmt::Debug for ArroyoCredentialProvider { @@ -14,48 +26,141 @@ impl std::fmt::Debug for ArroyoCredentialProvider { } } +static AWS_CONFIG: OnceCell> = OnceCell::const_new(); +static CREDENTIAL_PROVIDER: OnceCell = OnceCell::const_new(); + +async fn get_config<'a>() -> &'a SdkConfig { + AWS_CONFIG + .get_or_init(|| async { + Arc::new( + aws_config::defaults(BehaviorVersion::latest()) + .timeout_config( + TimeoutConfig::builder() + .operation_timeout(Duration::from_secs(60)) + .operation_attempt_timeout(Duration::from_secs(5)) + .build(), + ) + .load() + .await, + ) + }) + .await +} + impl ArroyoCredentialProvider { pub async fn try_new() -> Result { - let config = aws_config::defaults(BehaviorVersion::latest()).load().await; - - let credentials = config - .credentials_provider() - .ok_or_else(|| { - StorageError::CredentialsError( - "Unable to load S3 credentials from environment".to_string(), - ) - })? - .clone(); - - Ok(Self { - provider: credentials, - }) + Ok(CREDENTIAL_PROVIDER + .get_or_try_init(|| async { + let config = get_config().await; + + let credentials = config + .credentials_provider() + .ok_or_else(|| { + StorageError::CredentialsError( + "Unable to load S3 credentials from environment".to_string(), + ) + })? + .clone(); + + info!("Creating credential provider"); + Ok::(Self { + cache: Default::default(), + refresh_task: Default::default(), + provider: credentials, + }) + }) + .await? + .clone()) } pub async fn default_region() -> Option { - aws_config::defaults(BehaviorVersion::latest()) - .load() - .await - .region() - .map(|r| r.to_string()) + get_config().await.region().map(|r| r.to_string()) } } +async fn get_token( + provider: &SharedCredentialsProvider, +) -> Result<(Arc, Option, Instant), object_store::Error> { + info!("fetching new AWS token"); + let creds = provider + .provide_credentials() + .await + .map_err(|e| object_store::Error::Generic { + store: "S3", + source: Box::new(e), + })?; + Ok(( + Arc::new(AwsCredential { + key_id: creds.access_key_id().to_string(), + secret_key: creds.secret_access_key().to_string(), + token: creds.session_token().map(ToString::to_string), + }), + creds.expiry(), + Instant::now(), + )) +} + #[async_trait::async_trait] impl CredentialProvider for ArroyoCredentialProvider { type Credential = AwsCredential; async fn get_credential(&self) -> object_store::Result> { - let creds = self.provider.provide_credentials().await.map_err(|e| { - object_store::Error::Generic { - store: "S3", - source: Box::new(e), + let token = self.cache.lock().await.clone(); + match token { + Some((token, Some(expiration), last_refreshed)) => { + let expires_in = expiration + .duration_since(SystemTime::now()) + .unwrap_or_default(); + if expires_in < Duration::from_millis(100) { + info!("AWS token has expired, immediately refreshing"); + let lock = self.cache.try_lock(); + + let token = get_token(&self.provider).await?; + + if let Ok(mut lock) = lock { + *lock = Some(token.clone()); + } + return Ok(token.0); + } + + if expires_in < EXPIRATION_BUFFER + && last_refreshed.elapsed() > Duration::from_millis(100) + { + let refresh_lock = self.refresh_task.try_lock(); + if let Ok(mut task) = refresh_lock { + if task.is_some() && !task.as_ref().unwrap().is_finished() { + // the task is working on refreshing, let it do its job + return Ok(token); + } + + // else we need to start a refresh task + let our_provider = self.provider.clone(); + let our_lock = self.cache.clone(); + *task = Some(tokio::spawn(async move { + let token = get_token(&our_provider) + .await + .unwrap_or_else(|e| panic!("Failed to refresh AWS token: {:?}", e)); + + let mut lock = our_lock.lock().await; + *lock = Some(token); + })); + } + } + + Ok(token) } - })?; - Ok(Arc::new(AwsCredential { - key_id: creds.access_key_id().to_string(), - secret_key: creds.secret_access_key().to_string(), - token: creds.session_token().map(ToString::to_string), - })) + Some((token, None, _)) => Ok(token), + None => { + // get the initial token + let mut cache = self.cache.lock().await; + if let Some((token, _, _)) = &*cache { + return Ok(token.clone()); + } + + let token = get_token(&self.provider).await?; + *cache = Some(token.clone()); + Ok(token.0) + } + } } } diff --git a/crates/arroyo-storage/src/lib.rs b/crates/arroyo-storage/src/lib.rs index b187d672d..b7a6eacef 100644 --- a/crates/arroyo-storage/src/lib.rs +++ b/crates/arroyo-storage/src/lib.rs @@ -2,15 +2,15 @@ use arroyo_rpc::retry; use aws::ArroyoCredentialProvider; use bytes::Bytes; use futures::{Stream, StreamExt}; -use object_store::aws::{AmazonS3ConfigKey, AwsCredential}; +use object_store::aws::AmazonS3ConfigKey; use object_store::buffered::BufWriter; use object_store::gcp::GoogleCloudStorageBuilder; use object_store::multipart::{MultipartStore, PartId}; use object_store::path::Path; +use object_store::ObjectMeta; use object_store::{ aws::AmazonS3Builder, local::LocalFileSystem, MultipartId, ObjectStore, PutPayload, }; -use object_store::{CredentialProvider, ObjectMeta}; use regex::{Captures, Regex}; use std::borrow::Cow; use std::fmt::{Debug, Formatter}; @@ -35,9 +35,6 @@ pub struct StorageProvider { object_store: Arc, multipart_store: Option>, canonical_url: String, - // A URL that object_store can parse. - // May require storage_options to properly instantiate - object_store_base_url: String, storage_options: HashMap, } @@ -305,12 +302,6 @@ fn last(opts: [Option; COUNT]) -> Option { opts.into_iter().flatten().last() } -pub async fn get_current_credentials() -> Result, StorageError> { - let provider = ArroyoCredentialProvider::try_new().await?; - let credentials = provider.get_credential().await?; - Ok(credentials) -} - impl StorageProvider { pub async fn for_url(url: &str) -> Result { Self::for_url_with_options(url, HashMap::new()).await @@ -424,7 +415,6 @@ impl StorageProvider { if let Some(key) = &config.key { canonical_url = format!("{}/{}", canonical_url, key); } - let object_store_base_url = format!("s3://{}", config.bucket); let object_store = Arc::new(builder.build().map_err(Into::::into)?); @@ -433,7 +423,6 @@ impl StorageProvider { object_store: object_store.clone(), multipart_store: Some(object_store), canonical_url, - object_store_base_url, storage_options: s3_options .into_iter() .map(|(k, v)| (k.as_ref().to_string(), v)) @@ -454,15 +443,12 @@ impl StorageProvider { canonical_url = format!("{}/{}", canonical_url, key); } - let object_store_base_url = format!("https://{}.storage.googleapis.com", config.bucket); - let object_store = Arc::new(builder.build()?); Ok(Self { config: BackendConfig::GCS(config), object_store: object_store.clone(), multipart_store: Some(object_store), - object_store_base_url, canonical_url, storage_options: HashMap::new(), }) @@ -481,13 +467,11 @@ impl StorageProvider { ); let canonical_url = format!("file://{}", config.path); - let object_store_base_url = canonical_url.clone(); Ok(Self { config: BackendConfig::Local(config), object_store, multipart_store: None, canonical_url, - object_store_base_url, storage_options: HashMap::new(), }) } @@ -627,13 +611,6 @@ impl StorageProvider { format!("{}/{}", self.canonical_url, path) } - // Returns a url that will, combined with storage_options, parse to - // the same ObjectStore as self.object_store. - // Needed for systems that build their own ObjectStore, such as delta-rs - pub fn object_store_base_url(&self) -> &str { - &self.object_store_base_url - } - pub fn storage_options(&self) -> &HashMap { &self.storage_options }