From 79ac88944626ff6bd8312f81d0bffce7de91f848 Mon Sep 17 00:00:00 2001
From: Micah Wylde
Date: Mon, 6 Jan 2025 16:37:10 -0800
Subject: [PATCH] Add cache to AWS token provider

---
 Cargo.lock                                    |  2 +-
 Cargo.toml                                    |  3 +-
 .../filesystem/sink/two_phase_committer.rs    |  4 +-
 crates/arroyo-storage/src/aws.rs              | 54 ++++++++++++++-----
 4 files changed, 47 insertions(+), 16 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index f1b64dd71..d3c0700d2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6284,7 +6284,7 @@ dependencies = [
 [[package]]
 name = "object_store"
 version = "0.11.1"
-source = "git+http://github.com/ArroyoSystems/arrow-rs?branch=object_store_0.11.1%2Farroyo#4cfe48061503161e43cd3cd7960e74ce789bd3b9"
+source = "git+http://github.com/ArroyoSystems/arrow-rs?branch=public_token_cache#8b0175855610c7e895546afc8df966a4aeabee88"
 dependencies = [
  "async-trait",
  "base64 0.22.1",
diff --git a/Cargo.toml b/Cargo.toml
index 5195d8186..f1c277f32 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -90,7 +90,8 @@ datafusion-functions-window = {git = 'https://github.com/ArroyoSystems/arrow-dat
 
 datafusion-functions-json = {git = 'https://github.com/ArroyoSystems/datafusion-functions-json', branch = 'datafusion_43'}
 
-object_store = { git = 'http://github.com/ArroyoSystems/arrow-rs', branch = 'object_store_0.11.1/arroyo' }
+# object_store = { git = 'http://github.com/ArroyoSystems/arrow-rs', branch = 'object_store_0.11.1/arroyo' }
+object_store = { git = 'http://github.com/ArroyoSystems/arrow-rs', branch = 'public_token_cache' }
 
 cornucopia_async = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" }
 cornucopia = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" }
diff --git a/crates/arroyo-connectors/src/filesystem/sink/two_phase_committer.rs b/crates/arroyo-connectors/src/filesystem/sink/two_phase_committer.rs
index 497dc5209..69fee9f46 100644
--- a/crates/arroyo-connectors/src/filesystem/sink/two_phase_committer.rs
+++ b/crates/arroyo-connectors/src/filesystem/sink/two_phase_committer.rs
@@ -13,7 +13,7 @@ use bincode::config;
 use prost::Message;
 use std::fmt::Debug;
 use std::{collections::HashMap, time::SystemTime};
-use tracing::info;
+use tracing::debug;
 
 pub struct TwoPhaseCommitterOperator<TPC: TwoPhaseCommitter> {
     committer: TPC,
@@ -86,7 +86,7 @@ impl<TPC: TwoPhaseCommitter> TwoPhaseCommitterOperator<TPC> {
         mut commit_data: HashMap<String, HashMap<u32, Vec<u8>>>,
         ctx: &mut OperatorContext,
     ) {
-        info!("received commit message");
+        debug!("received commit message");
         let pre_commits = match self.committer.commit_strategy() {
             CommitStrategy::PerSubtask => std::mem::take(&mut self.pre_commits),
             CommitStrategy::PerOperator => {
diff --git a/crates/arroyo-storage/src/aws.rs b/crates/arroyo-storage/src/aws.rs
index e5bba4d90..2680e1b9b 100644
--- a/crates/arroyo-storage/src/aws.rs
+++ b/crates/arroyo-storage/src/aws.rs
@@ -1,10 +1,13 @@
 use crate::StorageError;
 use aws_config::BehaviorVersion;
-use aws_credential_types::provider::ProvideCredentials;
-use object_store::{aws::AwsCredential, CredentialProvider};
+use aws_credential_types::provider::{ProvideCredentials, SharedCredentialsProvider};
+use object_store::{aws::AwsCredential, CredentialProvider, TemporaryToken, TokenCache};
+use std::error::Error;
 use std::sync::Arc;
+use std::time::{Duration, Instant, SystemTime};
 
 pub struct ArroyoCredentialProvider {
+    cache: TokenCache<Arc<AwsCredential>>,
     provider: aws_credential_types::provider::SharedCredentialsProvider,
 }
 
@@ -28,6 +31,7 @@ impl ArroyoCredentialProvider {
             .clone();
 
         Ok(Self {
+            cache: TokenCache::default().with_min_ttl(Duration::from_secs(60)),
             provider: credentials,
         })
     }
@@ -41,21 +45,47 @@ impl ArroyoCredentialProvider {
     }
 }
 
+/// Fetches fresh credentials from `provider` and packages them, together with their
+/// expiry (if any), as a `TemporaryToken` for the `TokenCache` to hold.
+async fn get_token(
+    provider: &SharedCredentialsProvider,
+) -> Result<TemporaryToken<Arc<AwsCredential>>, Box<dyn Error + Send + Sync>> {
+    let creds = provider
+        .provide_credentials()
+        .await
+        .map_err(|e| object_store::Error::Generic {
+            store: "S3",
+            source: Box::new(e),
+        })?;
+
+    // `expiry()` is a wall-clock `SystemTime`; convert the time remaining until it into an
+    // `Instant` deadline. `duration_since` errs if the expiry has already passed, which
+    // `unwrap_or_default` turns into a zero TTL, i.e. a token that is expired right now.
+    let expiry = creds
+        .expiry()
+        .map(|exp| Instant::now() + exp.duration_since(SystemTime::now()).unwrap_or_default());
+
+    Ok(TemporaryToken {
+        token: Arc::new(AwsCredential {
+            key_id: creds.access_key_id().to_string(),
+            secret_key: creds.secret_access_key().to_string(),
+            token: creds.session_token().map(ToString::to_string),
+        }),
+        expiry,
+    })
+}
+
 #[async_trait::async_trait]
 impl CredentialProvider for ArroyoCredentialProvider {
     type Credential = AwsCredential;
 
     async fn get_credential(&self) -> object_store::Result<Arc<AwsCredential>> {
-        let creds = self.provider.provide_credentials().await.map_err(|e| {
-            object_store::Error::Generic {
+        self.cache
+            .get_or_insert_with(|| get_token(&self.provider))
+            .await
+            .map_err(|e| object_store::Error::Generic {
                 store: "S3",
-                source: Box::new(e),
-            }
-        })?;
-        Ok(Arc::new(AwsCredential {
-            key_id: creds.access_key_id().to_string(),
-            secret_key: creds.secret_access_key().to_string(),
-            token: creds.session_token().map(ToString::to_string),
-        }))
+                source: e,
+            })
     }
 }