From 367ade38eaeedbc01505a03778bf6f843ae38f08 Mon Sep 17 00:00:00 2001 From: Romain Gallet Date: Wed, 24 Apr 2024 22:09:19 +0200 Subject: [PATCH] Buffer flushes periodically (#54) * Removed unneeded lifetimes and Clone * Moved client out of Config struct * Changed to StreamClient * Buffer ticker --- src/aws.rs | 52 +++++++++++++++++++------------- src/iterator.rs | 64 +++++++++++++++++++++++----------------- src/kinesis.rs | 16 +++++----- src/kinesis/helpers.rs | 38 +++++++++++++++--------- src/kinesis/models.rs | 38 +++++++++++++++--------- src/kinesis/tests.rs | 22 +++++++------- src/main.rs | 2 +- src/sink.rs | 11 +++++++ src/sink/buffer_flush.rs | 37 +++++++++++++++++++++++ 9 files changed, 185 insertions(+), 95 deletions(-) create mode 100644 src/sink/buffer_flush.rs diff --git a/src/aws.rs b/src/aws.rs index e12a042..a6a16dc 100644 --- a/src/aws.rs +++ b/src/aws.rs @@ -1,53 +1,41 @@ -pub mod client { - use anyhow::Result; +pub mod stream { use async_trait::async_trait; - use aws_config::meta::region::RegionProviderChain; - use aws_config::retry::RetryConfig; - use aws_config::BehaviorVersion; - use aws_sdk_kinesis::config::Region; - use aws_sdk_kinesis::error::SdkError; + use aws_config::Region; use aws_sdk_kinesis::operation::get_records::GetRecordsOutput; use aws_sdk_kinesis::operation::get_shard_iterator::GetShardIteratorOutput; use aws_sdk_kinesis::operation::list_shards::ListShardsOutput; use aws_sdk_kinesis::primitives::DateTime; - use aws_sdk_kinesis::types::ShardIteratorType; - use aws_sdk_kinesis::Client; use chrono::Utc; - #[derive(Clone, Debug)] - pub struct AwsKinesisClient { - client: Client, - } - #[async_trait] - pub trait KinesisClient: Sync + Send + Clone { + pub trait StreamClient: Sync + Send { async fn list_shards( &self, stream: &str, next_token: Option<&str>, - ) -> Result; + ) -> anyhow::Result; - async fn get_records(&self, shard_iterator: &str) -> Result; + async fn get_records(&self, shard_iterator: &str) -> anyhow::Result; async fn get_shard_iterator_at_timestamp( &self, stream: &str, shard_id: &str, timestamp: &chrono::DateTime, - ) -> Result; + ) -> anyhow::Result; async fn get_shard_iterator_at_sequence( &self, stream: &str, shard_id: &str, starting_sequence_number: &str, - ) -> Result; + ) -> anyhow::Result; async fn get_shard_iterator_latest( &self, stream: &str, shard_id: &str, - ) -> Result; + ) -> anyhow::Result; fn get_region(&self) -> Option<&Region>; @@ -55,9 +43,31 @@ pub mod client { DateTime::from_millis(timestamp.timestamp_millis()) } } +} + +pub mod client { + use crate::aws::stream::StreamClient; + use anyhow::Result; + use async_trait::async_trait; + use aws_config::meta::region::RegionProviderChain; + use aws_config::retry::RetryConfig; + use aws_config::BehaviorVersion; + use aws_sdk_kinesis::config::Region; + use aws_sdk_kinesis::error::SdkError; + use aws_sdk_kinesis::operation::get_records::GetRecordsOutput; + use aws_sdk_kinesis::operation::get_shard_iterator::GetShardIteratorOutput; + use aws_sdk_kinesis::operation::list_shards::ListShardsOutput; + use aws_sdk_kinesis::types::ShardIteratorType; + use aws_sdk_kinesis::Client; + use chrono::Utc; + + #[derive(Clone, Debug)] + pub struct AwsKinesisClient { + client: Client, + } #[async_trait] - impl KinesisClient for AwsKinesisClient { + impl StreamClient for AwsKinesisClient { async fn list_shards( &self, stream: &str, diff --git a/src/iterator.rs b/src/iterator.rs index ecaec0d..3b3ac29 100644 --- a/src/iterator.rs +++ b/src/iterator.rs @@ -1,4 +1,4 @@ -use crate::aws::client::KinesisClient; +use crate::aws::stream::StreamClient; use crate::kinesis::models::ShardProcessorConfig; use anyhow::Result; use async_trait::async_trait; @@ -7,59 +7,70 @@ use chrono::Utc; #[async_trait] pub trait ShardIterator { - async fn iterator<'a>(&'a self) -> Result; + async fn iterator(&self) -> Result; } -pub fn latest(config: &ShardProcessorConfig) -> LatestShardIterator<'_, K> { - LatestShardIterator { config } +pub fn latest<'a, K: StreamClient>( + client: &'a K, + config: &'a ShardProcessorConfig, +) -> LatestShardIterator<'a, K> { + LatestShardIterator { client, config } } -pub fn at_sequence<'a, K: KinesisClient>( - config: &'a ShardProcessorConfig, +pub fn at_sequence<'a, K: StreamClient>( + client: &'a K, + config: &'a ShardProcessorConfig, starting_sequence_number: &'a str, ) -> AtSequenceShardIterator<'a, K> { AtSequenceShardIterator { + client, config, starting_sequence_number, } } -pub fn at_timestamp<'a, K: KinesisClient>( - config: &'a ShardProcessorConfig, +pub fn at_timestamp<'a, K: StreamClient>( + client: &'a K, + config: &'a ShardProcessorConfig, timestamp: &'a chrono::DateTime, ) -> AtTimestampShardIterator<'a, K> { - AtTimestampShardIterator { config, timestamp } + AtTimestampShardIterator { + client, + config, + timestamp, + } } -pub struct LatestShardIterator<'a, K: KinesisClient> { - config: &'a ShardProcessorConfig, +pub struct LatestShardIterator<'a, K: StreamClient> { + client: &'a K, + config: &'a ShardProcessorConfig, } -pub struct AtSequenceShardIterator<'a, K: KinesisClient> { - config: &'a ShardProcessorConfig, +pub struct AtSequenceShardIterator<'a, K: StreamClient> { + client: &'a K, + config: &'a ShardProcessorConfig, starting_sequence_number: &'a str, } -pub struct AtTimestampShardIterator<'a, K: KinesisClient> { - config: &'a ShardProcessorConfig, +pub struct AtTimestampShardIterator<'a, K: StreamClient> { + client: &'a K, + config: &'a ShardProcessorConfig, timestamp: &'a chrono::DateTime, } #[async_trait] -impl ShardIterator for LatestShardIterator<'_, K> { - async fn iterator<'a>(&'a self) -> Result { - self.config - .client +impl ShardIterator for LatestShardIterator<'_, K> { + async fn iterator(&self) -> Result { + self.client .get_shard_iterator_latest(&self.config.stream, &self.config.shard_id) .await } } #[async_trait] -impl ShardIterator for AtSequenceShardIterator<'_, K> { - async fn iterator<'a>(&'a self) -> Result { - self.config - .client +impl ShardIterator for AtSequenceShardIterator<'_, K> { + async fn iterator(&self) -> Result { + self.client .get_shard_iterator_at_sequence( &self.config.stream, &self.config.shard_id, @@ -70,10 +81,9 @@ impl ShardIterator for AtSequenceShardIterator<'_, K> { } #[async_trait] -impl ShardIterator for AtTimestampShardIterator<'_, K> { - async fn iterator<'a>(&'a self) -> Result { - self.config - .client +impl ShardIterator for AtTimestampShardIterator<'_, K> { + async fn iterator(&self) -> Result { + self.client .get_shard_iterator_at_timestamp( &self.config.stream, &self.config.shard_id, diff --git a/src/kinesis.rs b/src/kinesis.rs index 46092dd..ffd5ac2 100644 --- a/src/kinesis.rs +++ b/src/kinesis.rs @@ -11,7 +11,7 @@ use tokio::sync::mpsc::Sender; use tokio::time::{sleep, Duration}; use GetRecordsError::{ExpiredIteratorException, ProvisionedThroughputExceededException}; -use crate::aws::client::KinesisClient; +use crate::aws::stream::StreamClient; use crate::kinesis::helpers::wait_milliseconds; use crate::kinesis::models::*; use crate::kinesis::ticker::{ShardCountUpdate, TickerMessage}; @@ -21,8 +21,10 @@ pub mod models; pub mod ticker; #[async_trait] -pub trait IteratorProvider: Send + Sync + Clone { - fn get_config(&self) -> &ShardProcessorConfig; +pub trait IteratorProvider: Send + Sync { + fn get_client(&self) -> &K; + + fn get_config(&self) -> &ShardProcessorConfig; async fn get_iterator(&self) -> Result; } @@ -30,7 +32,7 @@ pub trait IteratorProvider: Send + Sync + Clone { #[async_trait] impl ShardProcessor for T where - K: KinesisClient, + K: StreamClient, T: IteratorProvider, { async fn run(&self) -> Result<()> { @@ -66,7 +68,7 @@ where ); helpers::handle_iterator_refresh( res_clone.clone(), - self.clone(), + self, tx_shard_iterator_progress.clone(), ) .await @@ -81,7 +83,7 @@ where sleep(Duration::from_millis(milliseconds)).await; helpers::handle_iterator_refresh( res_clone.clone(), - self.clone(), + self, tx_shard_iterator_progress.clone(), ) .await @@ -170,7 +172,7 @@ where shard_iterator: &str, tx_shard_iterator_progress: Sender, ) -> Result<()> { - let resp = self.get_config().client.get_records(shard_iterator).await?; + let resp = self.get_client().get_records(shard_iterator).await?; let tx_ticker_updates = &self.get_config().tx_ticker_updates; let next_shard_iterator = resp.next_shard_iterator(); diff --git a/src/kinesis/helpers.rs b/src/kinesis/helpers.rs index c9fad7f..340893d 100644 --- a/src/kinesis/helpers.rs +++ b/src/kinesis/helpers.rs @@ -14,7 +14,8 @@ use tokio::sync::mpsc::Sender; use tokio::sync::Semaphore; use tokio::time::sleep; -use crate::aws::client::{AwsKinesisClient, KinesisClient}; +use crate::aws::client::AwsKinesisClient; +use crate::aws::stream::StreamClient; use crate::iterator::at_sequence; use crate::iterator::latest; use crate::iterator::ShardIterator; @@ -40,8 +41,8 @@ pub fn new( match from_datetime { Some(from_datetime) => Box::new(ShardProcessorAtTimestamp { + client, config: ShardProcessorConfig { - client, stream, shard_id: Arc::new(shard_id), to_datetime, @@ -52,8 +53,8 @@ pub fn new( from_datetime, }), None => Box::new(ShardProcessorLatest { + client, config: ShardProcessorConfig { - client, stream, shard_id: Arc::new(shard_id), to_datetime, @@ -65,30 +66,39 @@ pub fn new( } } -pub async fn get_latest_iterator( - iterator_provider: T, +pub async fn get_latest_iterator( + iterator_provider: &T, ) -> Result where T: IteratorProvider, { - latest(iterator_provider.get_config()).iterator().await + latest( + iterator_provider.get_client(), + iterator_provider.get_config(), + ) + .iterator() + .await } -pub async fn get_iterator_since( - iterator_provider: T, +pub async fn get_iterator_since( + iterator_provider: &T, starting_sequence_number: &str, ) -> Result where T: IteratorProvider, { - at_sequence(iterator_provider.get_config(), starting_sequence_number) - .iterator() - .await + at_sequence( + iterator_provider.get_client(), + iterator_provider.get_config(), + starting_sequence_number, + ) + .iterator() + .await } -pub async fn handle_iterator_refresh( +pub async fn handle_iterator_refresh( shard_iterator_progress: ShardIteratorProgress, - iterator_provider: T, + iterator_provider: &T, tx_shard_iterator_progress: Sender, ) -> Result<()> where @@ -97,7 +107,7 @@ where let cloned_shard_iterator_progress = shard_iterator_progress.clone(); let result = match shard_iterator_progress.last_sequence_id { - Some(last_sequence_id) => get_iterator_since(iterator_provider.clone(), &last_sequence_id) + Some(last_sequence_id) => get_iterator_since(iterator_provider, &last_sequence_id) .await .map(|output| { ( diff --git a/src/kinesis/models.rs b/src/kinesis/models.rs index 40a4e1b..53549d2 100644 --- a/src/kinesis/models.rs +++ b/src/kinesis/models.rs @@ -1,4 +1,4 @@ -use crate::aws::client::KinesisClient; +use crate::aws::stream::StreamClient; use crate::iterator::ShardIterator; use crate::iterator::{at_timestamp, latest}; use crate::kinesis::ticker::TickerMessage; @@ -24,6 +24,7 @@ pub struct ShardIteratorProgress { pub enum ShardProcessorADT { Termination, BeyondToTimestamp, + Flush, Progress(Vec), } @@ -45,8 +46,7 @@ pub struct RecordResult { } #[derive(Clone)] -pub struct ShardProcessorConfig { - pub client: K, +pub struct ShardProcessorConfig { pub stream: String, pub shard_id: Arc, pub to_datetime: Option>, @@ -56,42 +56,52 @@ pub struct ShardProcessorConfig { } #[derive(Clone)] -pub struct ShardProcessorLatest { - pub config: ShardProcessorConfig, +pub struct ShardProcessorLatest { + pub client: K, + pub config: ShardProcessorConfig, } #[derive(Clone)] -pub struct ShardProcessorAtTimestamp { - pub config: ShardProcessorConfig, +pub struct ShardProcessorAtTimestamp { + pub client: K, + pub config: ShardProcessorConfig, pub from_datetime: chrono::DateTime, } #[async_trait] -impl IteratorProvider for ShardProcessorLatest { - fn get_config(&self) -> &ShardProcessorConfig { +impl IteratorProvider for ShardProcessorLatest { + fn get_client(&self) -> &K { + &self.client + } + + fn get_config(&self) -> &ShardProcessorConfig { &self.config } async fn get_iterator(&self) -> Result { - latest(&self.config).iterator().await + latest(&self.client, &self.config).iterator().await } } #[async_trait] -impl IteratorProvider for ShardProcessorAtTimestamp { - fn get_config(&self) -> &ShardProcessorConfig { +impl IteratorProvider for ShardProcessorAtTimestamp { + fn get_client(&self) -> &K { + &self.client + } + + fn get_config(&self) -> &ShardProcessorConfig { &self.config } async fn get_iterator(&self) -> Result { - at_timestamp(&self.config, &self.from_datetime) + at_timestamp(&self.client, &self.config, &self.from_datetime) .iterator() .await } } #[async_trait] -pub trait ShardProcessor: Send + Sync { +pub trait ShardProcessor: Send + Sync { async fn run(&self) -> Result<()>; async fn seed_shards( diff --git a/src/kinesis/tests.rs b/src/kinesis/tests.rs index fad4f65..4d891e4 100644 --- a/src/kinesis/tests.rs +++ b/src/kinesis/tests.rs @@ -14,7 +14,7 @@ use chrono::prelude::*; use chrono::Utc; use tokio::sync::{mpsc, Semaphore}; -use crate::aws::client::KinesisClient; +use crate::aws::stream::StreamClient; use crate::kinesis::helpers; use crate::kinesis::helpers::wait_milliseconds; use crate::kinesis::models::{ @@ -39,8 +39,8 @@ async fn seed_shards_test() { let semaphore: Arc = Arc::new(Semaphore::new(10)); let processor = ShardProcessorLatest { + client, config: ShardProcessorConfig { - client, stream: "test".to_string(), shard_id: Arc::new("shardId-000000000000".to_string()), to_datetime: None, @@ -77,8 +77,8 @@ async fn seed_shards_test_timestamp_in_future() { let semaphore: Arc = Arc::new(Semaphore::new(10)); let processor = ShardProcessorAtTimestamp { + client, config: ShardProcessorConfig { - client, stream: "test".to_string(), shard_id: Arc::new("shardId-000000000000".to_string()), to_datetime: None, @@ -108,8 +108,8 @@ async fn produced_record_is_processed() { let semaphore: Arc = Arc::new(Semaphore::new(10)); let processor = ShardProcessorLatest { + client: client.clone(), config: ShardProcessorConfig { - client: client.clone(), stream: "test".to_string(), shard_id: Arc::new("shardId-000000000000".to_string()), to_datetime: None, @@ -157,8 +157,8 @@ async fn beyond_to_timestamp_is_received() { let to_datetime = Utc.with_ymd_and_hms(2020, 6, 1, 12, 0, 0).unwrap(); let processor = ShardProcessorLatest { + client, config: ShardProcessorConfig { - client, stream: "test".to_string(), shard_id: Arc::new("shardId-000000000000".to_string()), to_datetime: Some(to_datetime), @@ -199,8 +199,8 @@ async fn has_records_beyond_end_ts_when_has_end_ts() { let to_datetime = Utc.with_ymd_and_hms(2020, 6, 1, 12, 0, 0).unwrap(); let processor = ShardProcessorLatest { + client, config: ShardProcessorConfig { - client, stream: "test".to_string(), shard_id: Arc::new("shardId-000000000000".to_string()), to_datetime: Some(to_datetime), @@ -260,8 +260,8 @@ async fn has_records_beyond_end_ts_when_no_end_ts() { let semaphore: Arc = Arc::new(Semaphore::new(10)); let processor = ShardProcessorLatest { + client, config: ShardProcessorConfig { - client, stream: "test".to_string(), shard_id: Arc::new("shardId-000000000000".to_string()), to_datetime: None, @@ -305,8 +305,8 @@ async fn handle_iterator_refresh_ok() { }; let provider = ShardProcessorLatest { + client, config: ShardProcessorConfig { - client, stream: "test".to_string(), shard_id: Arc::new("shardId-000000000000".to_string()), to_datetime: None, @@ -321,7 +321,7 @@ async fn handle_iterator_refresh_ok() { helpers::handle_iterator_refresh( shard_iterator_progress, - provider, + &provider, tx_shard_iterator_progress, ) .await @@ -353,7 +353,7 @@ pub struct TestKinesisClient { } #[async_trait] -impl KinesisClient for TestKinesisClient { +impl StreamClient for TestKinesisClient { async fn list_shards( &self, _stream: &str, @@ -432,7 +432,7 @@ impl KinesisClient for TestKinesisClient { pub struct TestTimestampInFutureKinesisClient {} #[async_trait] -impl KinesisClient for TestTimestampInFutureKinesisClient { +impl StreamClient for TestTimestampInFutureKinesisClient { async fn list_shards( &self, _stream: &str, diff --git a/src/main.rs b/src/main.rs index df78cfe..23af412 100644 --- a/src/main.rs +++ b/src/main.rs @@ -126,7 +126,7 @@ async fn main() -> Result<()> { let semaphore = semaphore.clone(); let shard_processor = kinesis::helpers::new( - client.clone(), + client, stream_name, shard_id, from_datetime, diff --git a/src/sink.rs b/src/sink.rs index ec274b1..620a5b0 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -4,6 +4,7 @@ use std::io::{BufWriter, Write}; use anyhow::Error; use anyhow::Result; use async_trait::async_trait; +use buffer_flush::BufferTicker; use chrono::TimeZone; use log::{debug, error, warn}; use tokio::sync::mpsc::{Receiver, Sender}; @@ -113,6 +114,13 @@ where ) -> io::Result<()> { self.delimiter(handle).unwrap(); + /* + * Start the buffer ticker to flush the buffer every 5 second + * This is needed because if the buffer is not full (not enough message to trigger a nature flush), + * then no output is displayed until ctrl^c is pressed. + */ + BufferTicker::new(tx_records.clone()).start(); + let mut count = 0; let mut sc = self.shard_count(); @@ -165,6 +173,7 @@ where }); } }, + ShardProcessorADT::Flush => handle.flush().unwrap(), ShardProcessorADT::Termination => { debug!("Termination message received"); let messages_processed = count; @@ -299,6 +308,8 @@ where } } +mod buffer_flush; + #[cfg(test)] mod console_tests; diff --git a/src/sink/buffer_flush.rs b/src/sink/buffer_flush.rs new file mode 100644 index 0000000..b36f7f7 --- /dev/null +++ b/src/sink/buffer_flush.rs @@ -0,0 +1,37 @@ +use crate::kinesis::models::{ProcessError, ShardProcessorADT}; +use anyhow::Result; +use std::time::Duration; +use tokio::sync::mpsc::Sender; +use tokio::time::sleep; + +pub struct BufferTicker { + tx_records: Sender>, +} + +impl BufferTicker { + pub fn new(tx_records: Sender>) -> Self { + Self { tx_records } + } + + /** + * THis method will loop in the background and send a flush message to the + * shard Sink every 5 seconds. + */ + pub fn start(&self) { + let tx_records = self.tx_records.clone(); + + tokio::spawn({ + async move { + let delay = Duration::from_secs(5); + + loop { + sleep(delay).await; + tx_records + .send(Ok(ShardProcessorADT::Flush)) + .await + .expect("Count not send "); + } + } + }); + } +}