Skip to content

Commit

Permalink
Buffer flushes periodically (#54)
Browse files Browse the repository at this point in the history
* Removed unneeded lifetimes and Clone

* Moved client out of Config struct

* Changed to StreamClient

* Buffer ticker
  • Loading branch information
gr211 authored Apr 24, 2024
1 parent aeb1027 commit 367ade3
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 95 deletions.
52 changes: 31 additions & 21 deletions src/aws.rs
Original file line number Diff line number Diff line change
@@ -1,63 +1,73 @@
pub mod client {
use anyhow::Result;
pub mod stream {
use async_trait::async_trait;
use aws_config::meta::region::RegionProviderChain;
use aws_config::retry::RetryConfig;
use aws_config::BehaviorVersion;
use aws_sdk_kinesis::config::Region;
use aws_sdk_kinesis::error::SdkError;
use aws_config::Region;
use aws_sdk_kinesis::operation::get_records::GetRecordsOutput;
use aws_sdk_kinesis::operation::get_shard_iterator::GetShardIteratorOutput;
use aws_sdk_kinesis::operation::list_shards::ListShardsOutput;
use aws_sdk_kinesis::primitives::DateTime;
use aws_sdk_kinesis::types::ShardIteratorType;
use aws_sdk_kinesis::Client;
use chrono::Utc;

#[derive(Clone, Debug)]
pub struct AwsKinesisClient {
client: Client,
}

#[async_trait]
pub trait KinesisClient: Sync + Send + Clone {
pub trait StreamClient: Sync + Send {
async fn list_shards(
&self,
stream: &str,
next_token: Option<&str>,
) -> Result<ListShardsOutput>;
) -> anyhow::Result<ListShardsOutput>;

async fn get_records(&self, shard_iterator: &str) -> Result<GetRecordsOutput>;
async fn get_records(&self, shard_iterator: &str) -> anyhow::Result<GetRecordsOutput>;

async fn get_shard_iterator_at_timestamp(
&self,
stream: &str,
shard_id: &str,
timestamp: &chrono::DateTime<Utc>,
) -> Result<GetShardIteratorOutput>;
) -> anyhow::Result<GetShardIteratorOutput>;

async fn get_shard_iterator_at_sequence(
&self,
stream: &str,
shard_id: &str,
starting_sequence_number: &str,
) -> Result<GetShardIteratorOutput>;
) -> anyhow::Result<GetShardIteratorOutput>;

async fn get_shard_iterator_latest(
&self,
stream: &str,
shard_id: &str,
) -> Result<GetShardIteratorOutput>;
) -> anyhow::Result<GetShardIteratorOutput>;

fn get_region(&self) -> Option<&Region>;

fn aws_datetime(timestamp: &chrono::DateTime<Utc>) -> DateTime {
DateTime::from_millis(timestamp.timestamp_millis())
}
}
}

pub mod client {
use crate::aws::stream::StreamClient;
use anyhow::Result;
use async_trait::async_trait;
use aws_config::meta::region::RegionProviderChain;
use aws_config::retry::RetryConfig;
use aws_config::BehaviorVersion;
use aws_sdk_kinesis::config::Region;
use aws_sdk_kinesis::error::SdkError;
use aws_sdk_kinesis::operation::get_records::GetRecordsOutput;
use aws_sdk_kinesis::operation::get_shard_iterator::GetShardIteratorOutput;
use aws_sdk_kinesis::operation::list_shards::ListShardsOutput;
use aws_sdk_kinesis::types::ShardIteratorType;
use aws_sdk_kinesis::Client;
use chrono::Utc;

#[derive(Clone, Debug)]
pub struct AwsKinesisClient {
client: Client,
}

#[async_trait]
impl KinesisClient for AwsKinesisClient {
impl StreamClient for AwsKinesisClient {
async fn list_shards(
&self,
stream: &str,
Expand Down
64 changes: 37 additions & 27 deletions src/iterator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::aws::client::KinesisClient;
use crate::aws::stream::StreamClient;
use crate::kinesis::models::ShardProcessorConfig;
use anyhow::Result;
use async_trait::async_trait;
Expand All @@ -7,59 +7,70 @@ use chrono::Utc;

#[async_trait]
pub trait ShardIterator {
async fn iterator<'a>(&'a self) -> Result<GetShardIteratorOutput>;
async fn iterator(&self) -> Result<GetShardIteratorOutput>;
}

pub fn latest<K: KinesisClient>(config: &ShardProcessorConfig<K>) -> LatestShardIterator<'_, K> {
LatestShardIterator { config }
pub fn latest<'a, K: StreamClient>(
client: &'a K,
config: &'a ShardProcessorConfig,
) -> LatestShardIterator<'a, K> {
LatestShardIterator { client, config }
}

pub fn at_sequence<'a, K: KinesisClient>(
config: &'a ShardProcessorConfig<K>,
pub fn at_sequence<'a, K: StreamClient>(
client: &'a K,
config: &'a ShardProcessorConfig,
starting_sequence_number: &'a str,
) -> AtSequenceShardIterator<'a, K> {
AtSequenceShardIterator {
client,
config,
starting_sequence_number,
}
}

pub fn at_timestamp<'a, K: KinesisClient>(
config: &'a ShardProcessorConfig<K>,
pub fn at_timestamp<'a, K: StreamClient>(
client: &'a K,
config: &'a ShardProcessorConfig,
timestamp: &'a chrono::DateTime<Utc>,
) -> AtTimestampShardIterator<'a, K> {
AtTimestampShardIterator { config, timestamp }
AtTimestampShardIterator {
client,
config,
timestamp,
}
}

pub struct LatestShardIterator<'a, K: KinesisClient> {
config: &'a ShardProcessorConfig<K>,
pub struct LatestShardIterator<'a, K: StreamClient> {
client: &'a K,
config: &'a ShardProcessorConfig,
}

pub struct AtSequenceShardIterator<'a, K: KinesisClient> {
config: &'a ShardProcessorConfig<K>,
pub struct AtSequenceShardIterator<'a, K: StreamClient> {
client: &'a K,
config: &'a ShardProcessorConfig,
starting_sequence_number: &'a str,
}

pub struct AtTimestampShardIterator<'a, K: KinesisClient> {
config: &'a ShardProcessorConfig<K>,
pub struct AtTimestampShardIterator<'a, K: StreamClient> {
client: &'a K,
config: &'a ShardProcessorConfig,
timestamp: &'a chrono::DateTime<Utc>,
}

#[async_trait]
impl<K: KinesisClient> ShardIterator for LatestShardIterator<'_, K> {
async fn iterator<'a>(&'a self) -> Result<GetShardIteratorOutput> {
self.config
.client
impl<K: StreamClient> ShardIterator for LatestShardIterator<'_, K> {
async fn iterator(&self) -> Result<GetShardIteratorOutput> {
self.client
.get_shard_iterator_latest(&self.config.stream, &self.config.shard_id)
.await
}
}

#[async_trait]
impl<K: KinesisClient> ShardIterator for AtSequenceShardIterator<'_, K> {
async fn iterator<'a>(&'a self) -> Result<GetShardIteratorOutput> {
self.config
.client
impl<K: StreamClient> ShardIterator for AtSequenceShardIterator<'_, K> {
async fn iterator(&self) -> Result<GetShardIteratorOutput> {
self.client
.get_shard_iterator_at_sequence(
&self.config.stream,
&self.config.shard_id,
Expand All @@ -70,10 +81,9 @@ impl<K: KinesisClient> ShardIterator for AtSequenceShardIterator<'_, K> {
}

#[async_trait]
impl<K: KinesisClient> ShardIterator for AtTimestampShardIterator<'_, K> {
async fn iterator<'a>(&'a self) -> Result<GetShardIteratorOutput> {
self.config
.client
impl<K: StreamClient> ShardIterator for AtTimestampShardIterator<'_, K> {
async fn iterator(&self) -> Result<GetShardIteratorOutput> {
self.client
.get_shard_iterator_at_timestamp(
&self.config.stream,
&self.config.shard_id,
Expand Down
16 changes: 9 additions & 7 deletions src/kinesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::sync::mpsc::Sender;
use tokio::time::{sleep, Duration};
use GetRecordsError::{ExpiredIteratorException, ProvisionedThroughputExceededException};

use crate::aws::client::KinesisClient;
use crate::aws::stream::StreamClient;
use crate::kinesis::helpers::wait_milliseconds;
use crate::kinesis::models::*;
use crate::kinesis::ticker::{ShardCountUpdate, TickerMessage};
Expand All @@ -21,16 +21,18 @@ pub mod models;
pub mod ticker;

#[async_trait]
pub trait IteratorProvider<K: KinesisClient>: Send + Sync + Clone {
fn get_config(&self) -> &ShardProcessorConfig<K>;
pub trait IteratorProvider<K: StreamClient>: Send + Sync {
fn get_client(&self) -> &K;

fn get_config(&self) -> &ShardProcessorConfig;

async fn get_iterator(&self) -> Result<GetShardIteratorOutput>;
}

#[async_trait]
impl<T, K> ShardProcessor<K> for T
where
K: KinesisClient,
K: StreamClient,
T: IteratorProvider<K>,
{
async fn run(&self) -> Result<()> {
Expand Down Expand Up @@ -66,7 +68,7 @@ where
);
helpers::handle_iterator_refresh(
res_clone.clone(),
self.clone(),
self,
tx_shard_iterator_progress.clone(),
)
.await
Expand All @@ -81,7 +83,7 @@ where
sleep(Duration::from_millis(milliseconds)).await;
helpers::handle_iterator_refresh(
res_clone.clone(),
self.clone(),
self,
tx_shard_iterator_progress.clone(),
)
.await
Expand Down Expand Up @@ -170,7 +172,7 @@ where
shard_iterator: &str,
tx_shard_iterator_progress: Sender<ShardIteratorProgress>,
) -> Result<()> {
let resp = self.get_config().client.get_records(shard_iterator).await?;
let resp = self.get_client().get_records(shard_iterator).await?;
let tx_ticker_updates = &self.get_config().tx_ticker_updates;

let next_shard_iterator = resp.next_shard_iterator();
Expand Down
38 changes: 24 additions & 14 deletions src/kinesis/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use tokio::sync::mpsc::Sender;
use tokio::sync::Semaphore;
use tokio::time::sleep;

use crate::aws::client::{AwsKinesisClient, KinesisClient};
use crate::aws::client::AwsKinesisClient;
use crate::aws::stream::StreamClient;
use crate::iterator::at_sequence;
use crate::iterator::latest;
use crate::iterator::ShardIterator;
Expand All @@ -40,8 +41,8 @@ pub fn new(

match from_datetime {
Some(from_datetime) => Box::new(ShardProcessorAtTimestamp {
client,
config: ShardProcessorConfig {
client,
stream,
shard_id: Arc::new(shard_id),
to_datetime,
Expand All @@ -52,8 +53,8 @@ pub fn new(
from_datetime,
}),
None => Box::new(ShardProcessorLatest {
client,
config: ShardProcessorConfig {
client,
stream,
shard_id: Arc::new(shard_id),
to_datetime,
Expand All @@ -65,30 +66,39 @@ pub fn new(
}
}

pub async fn get_latest_iterator<T, K: KinesisClient>(
iterator_provider: T,
pub async fn get_latest_iterator<T, K: StreamClient>(
iterator_provider: &T,
) -> Result<GetShardIteratorOutput>
where
T: IteratorProvider<K>,
{
latest(iterator_provider.get_config()).iterator().await
latest(
iterator_provider.get_client(),
iterator_provider.get_config(),
)
.iterator()
.await
}

pub async fn get_iterator_since<T, K: KinesisClient>(
iterator_provider: T,
pub async fn get_iterator_since<T, K: StreamClient>(
iterator_provider: &T,
starting_sequence_number: &str,
) -> Result<GetShardIteratorOutput>
where
T: IteratorProvider<K>,
{
at_sequence(iterator_provider.get_config(), starting_sequence_number)
.iterator()
.await
at_sequence(
iterator_provider.get_client(),
iterator_provider.get_config(),
starting_sequence_number,
)
.iterator()
.await
}

pub async fn handle_iterator_refresh<T, K: KinesisClient>(
pub async fn handle_iterator_refresh<T, K: StreamClient>(
shard_iterator_progress: ShardIteratorProgress,
iterator_provider: T,
iterator_provider: &T,
tx_shard_iterator_progress: Sender<ShardIteratorProgress>,
) -> Result<()>
where
Expand All @@ -97,7 +107,7 @@ where
let cloned_shard_iterator_progress = shard_iterator_progress.clone();

let result = match shard_iterator_progress.last_sequence_id {
Some(last_sequence_id) => get_iterator_since(iterator_provider.clone(), &last_sequence_id)
Some(last_sequence_id) => get_iterator_since(iterator_provider, &last_sequence_id)
.await
.map(|output| {
(
Expand Down
Loading

0 comments on commit 367ade3

Please sign in to comment.