Skip to content

Commit

Permalink
[kv store] retry unprocessed items (#14453)
Browse files Browse the repository at this point in the history
  • Loading branch information
phoenix-o authored Oct 26, 2023
1 parent b9f12a8 commit 141e83c
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sui-kvstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ aws-config.workspace = true
aws-sdk-dynamodb.workspace = true
aws-sdk-s3.workspace = true
async-trait.workspace = true
backoff.workspace = true
base64-url.workspace = true
tokio = { workspace = true, features = ["full"] }
anyhow = { workspace = true, features = ["backtrace"] }
Expand Down
23 changes: 20 additions & 3 deletions crates/sui-kvstore/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ use aws_sdk_dynamodb::config::{Credentials, Region};
use aws_sdk_dynamodb::primitives::Blob;
use aws_sdk_dynamodb::types::{AttributeValue, PutRequest, WriteRequest};
use aws_sdk_s3 as s3;
use backoff::backoff::Backoff;
use backoff::ExponentialBackoff;
use serde::Serialize;
use std::borrow::Borrow;
use std::collections::{HashMap, HashSet};
use std::collections::{HashMap, HashSet, VecDeque};
use sui_config::node::TransactionKeyValueStoreWriteConfig;

#[derive(Hash, Eq, PartialEq, Debug, Copy, Clone)]
Expand Down Expand Up @@ -126,15 +128,30 @@ impl KVWriteClient for DynamoDbClient {
if items.is_empty() {
return Ok(());
}
for chunk in items.chunks(25) {
self.dynamo_client
let mut backoff = ExponentialBackoff::default();
let mut queue: VecDeque<Vec<_>> = items.chunks(25).map(|ck| ck.to_vec()).collect();
while let Some(chunk) = queue.pop_front() {
let response = self
.dynamo_client
.batch_write_item()
.set_request_items(Some(HashMap::from([(
self.table_name.clone(),
chunk.to_vec(),
)])))
.send()
.await?;
if let Some(response) = response.unprocessed_items {
if let Some(unprocessed) = response.into_iter().next() {
if !unprocessed.1.is_empty() {
if queue.is_empty() {
if let Some(duration) = backoff.next_backoff() {
tokio::time::sleep(duration).await;
}
}
queue.push_back(unprocessed.1);
}
}
}
}
Ok(())
}
Expand Down

1 comment on commit 141e83c

@vercel
Copy link

@vercel vercel bot commented on 141e83c Oct 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.