From 141e83c6fda193a8a127ec9c571c8eb0c64c40c6 Mon Sep 17 00:00:00 2001 From: phoenix <51927076+phoenix-o@users.noreply.github.com> Date: Thu, 26 Oct 2023 13:44:23 -0400 Subject: [PATCH] [kv store] retry unprocessed items (#14453) --- Cargo.lock | 1 + crates/sui-kvstore/Cargo.toml | 1 + crates/sui-kvstore/src/client.rs | 23 ++++++++++++++++++++--- 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 785f2ffdf252b..0a7e92773b1e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10901,6 +10901,7 @@ dependencies = [ "aws-config", "aws-sdk-dynamodb", "aws-sdk-s3", + "backoff", "base64-url", "bcs", "mysten-metrics", diff --git a/crates/sui-kvstore/Cargo.toml b/crates/sui-kvstore/Cargo.toml index a529b2eab20cd..f217a79fcecae 100644 --- a/crates/sui-kvstore/Cargo.toml +++ b/crates/sui-kvstore/Cargo.toml @@ -11,6 +11,7 @@ aws-config.workspace = true aws-sdk-dynamodb.workspace = true aws-sdk-s3.workspace = true async-trait.workspace = true +backoff.workspace = true base64-url.workspace = true tokio = { workspace = true, features = ["full"] } anyhow = { workspace = true, features = ["backtrace"] } diff --git a/crates/sui-kvstore/src/client.rs b/crates/sui-kvstore/src/client.rs index 2f662529079b2..e3d106f5df3ae 100644 --- a/crates/sui-kvstore/src/client.rs +++ b/crates/sui-kvstore/src/client.rs @@ -7,9 +7,11 @@ use aws_sdk_dynamodb::config::{Credentials, Region}; use aws_sdk_dynamodb::primitives::Blob; use aws_sdk_dynamodb::types::{AttributeValue, PutRequest, WriteRequest}; use aws_sdk_s3 as s3; +use backoff::backoff::Backoff; +use backoff::ExponentialBackoff; use serde::Serialize; use std::borrow::Borrow; -use std::collections::{HashMap, HashSet}; +use std::collections::{HashMap, HashSet, VecDeque}; use sui_config::node::TransactionKeyValueStoreWriteConfig; #[derive(Hash, Eq, PartialEq, Debug, Copy, Clone)] @@ -126,8 +128,11 @@ impl KVWriteClient for DynamoDbClient { if items.is_empty() { return Ok(()); } - for chunk in items.chunks(25) { - self.dynamo_client + let mut backoff = ExponentialBackoff::default(); + let mut queue: VecDeque> = items.chunks(25).map(|ck| ck.to_vec()).collect(); + while let Some(chunk) = queue.pop_front() { + let response = self + .dynamo_client .batch_write_item() .set_request_items(Some(HashMap::from([( self.table_name.clone(), @@ -135,6 +140,18 @@ impl KVWriteClient for DynamoDbClient { )]))) .send() .await?; + if let Some(response) = response.unprocessed_items { + if let Some(unprocessed) = response.into_iter().next() { + if !unprocessed.1.is_empty() { + if queue.is_empty() { + if let Some(duration) = backoff.next_backoff() { + tokio::time::sleep(duration).await; + } + } + queue.push_back(unprocessed.1); + } + } + } } Ok(()) }