Skip to content

Commit

Permalink
Changes following review
Browse files Browse the repository at this point in the history
  • Loading branch information
George Danezis committed Mar 10, 2022
1 parent 601ae06 commit df9980f
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 18 deletions.
14 changes: 7 additions & 7 deletions sui_core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,14 @@ impl<const ALL_OBJ_VER: bool> SuiDataStore<ALL_OBJ_VER> {
}
}

pub fn next_sequence_number(&self) -> TxSequenceNumber {
self.executed_sequence
pub fn next_sequence_number(&self) -> Result<TxSequenceNumber, SuiError> {
Ok(self
.executed_sequence
.iter()
.skip_prior_to(&TxSequenceNumber::MAX)
.expect("Error reading table.")
.skip_prior_to(&TxSequenceNumber::MAX)?
.next()
.map(|(v, _)| v + 1u64)
.unwrap_or(0)
.unwrap_or(0))
}

/// A function that acquires all locks associated with the objects (in order to avoid deadlocks).
Expand Down Expand Up @@ -456,7 +456,7 @@ impl<const ALL_OBJ_VER: bool> SuiDataStore<ALL_OBJ_VER> {
mut write_batch: DBBatch,
temporary_store: AuthorityTemporaryStore,
transaction_digest: TransactionDigest,
should_sequence: Option<TxSequenceNumber>,
seq_opt: Option<TxSequenceNumber>,
) -> Result<(), SuiError> {
let (objects, active_inputs, written, deleted, _events) = temporary_store.into_inner();

Expand Down Expand Up @@ -570,7 +570,7 @@ impl<const ALL_OBJ_VER: bool> SuiDataStore<ALL_OBJ_VER> {
object_lock.ok_or(SuiError::TransactionLockDoesNotExist)?;
}

if let Some(next_seq) = should_sequence {
if let Some(next_seq) = seq_opt {
// Now we are sure we are going to execute, add to the sequence
// number and insert into authority sequence.
//
Expand Down
6 changes: 3 additions & 3 deletions sui_core/src/authority_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ impl BatchManager {
pub fn new(
db: Arc<AuthorityStore>,
capacity: usize,
) -> (BatchSender, BatchManager, BroadcastPair) {
) -> Result<(BatchSender, BatchManager, BroadcastPair), SuiError> {
let (tx_send, tx_recv) = unbounded_channel();
let (tx_broadcast, rx_broadcast) = tokio::sync::broadcast::channel(capacity);
let latest_sequence_number = db.next_sequence_number();
let latest_sequence_number = db.next_sequence_number()?;
let sender = BatchSender {
autoinc: AutoIncSender::new(tx_send, latest_sequence_number),
};
Expand All @@ -85,7 +85,7 @@ impl BatchManager {
db,
};

(sender, manager, (tx_broadcast, rx_broadcast))
Ok((sender, manager, (tx_broadcast, rx_broadcast)))
}

/// Starts the manager service / tokio task
Expand Down
2 changes: 1 addition & 1 deletion sui_core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl AuthorityServer {
) -> Result<tokio::task::JoinHandle<()>, SuiError> {
// Start the batching subsystem, and register the handles with the authority.
let (tx_sender, manager, (batch_sender, _batch_receiver)) =
BatchManager::new(self.state.db(), 1000);
BatchManager::new(self.state.db(), 1000)?;

let _batch_join_handle = manager
.start_service(
Expand Down
21 changes: 14 additions & 7 deletions sui_core/src/unit_tests/batch_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ async fn test_open_manager() {
let store = Arc::new(AuthorityStore::open(&path, Some(opts)));

// TEST 1: init from an empty database should return to a zero block
let (_send, manager, _pair) = BatchManager::new(store.clone(), 100);
let (_send, manager, _pair) =
BatchManager::new(store.clone(), 100).expect("Problem opening batch manager");
let last_block = manager
.init_from_database(address, key_pair.clone())
.await
Expand All @@ -51,7 +52,8 @@ async fn test_open_manager() {
opts.set_max_open_files(max_files_authority_tests());
let store = Arc::new(AuthorityStore::open(&path, Some(opts)));

let (_send, manager, _pair) = BatchManager::new(store.clone(), 100);
let (_send, manager, _pair) =
BatchManager::new(store.clone(), 100).expect("Problem opening batch manager");
let last_block = manager
.init_from_database(address, key_pair.clone())
.await
Expand All @@ -72,7 +74,8 @@ async fn test_open_manager() {
opts.set_max_open_files(max_files_authority_tests());
let store = Arc::new(AuthorityStore::open(&path, Some(opts)));

let (_send, manager, _pair) = BatchManager::new(store.clone(), 100);
let (_send, manager, _pair) =
BatchManager::new(store.clone(), 100).expect("Problem opening batch manager");
let last_block = manager
.init_from_database(address, key_pair.clone())
.await
Expand Down Expand Up @@ -104,7 +107,8 @@ async fn test_batch_manager_happy_path() {
let address = *key_pair.public_key_bytes();

// TEST 1: init from an empty database should return to a zero block
let (_send, manager, _pair) = BatchManager::new(store.clone(), 100);
let (_send, manager, _pair) =
BatchManager::new(store.clone(), 100).expect("Problem opening batch manager");
let _join = manager
.start_service(address, key_pair, 1000, Duration::from_millis(500))
.await
Expand Down Expand Up @@ -159,7 +163,8 @@ async fn test_batch_manager_out_of_order() {
let address = *key_pair.public_key_bytes();

// TEST 1: init from an empty database should return to a zero block
let (_send, manager, _pair) = BatchManager::new(store.clone(), 100);
let (_send, manager, _pair) =
BatchManager::new(store.clone(), 100).expect("Problem opening batch manager");
let _join = manager
.start_service(address, key_pair, 4, Duration::from_millis(5000))
.await
Expand Down Expand Up @@ -222,7 +227,8 @@ async fn test_handle_move_order_with_batch() {
let mut authority_state = init_state_with_objects(vec![gas_payment_object]).await;

// Create a listening infrastrucure.
let (_send, manager, _pair) = BatchManager::new(authority_state.db(), 100);
let (_send, manager, _pair) =
BatchManager::new(authority_state.db(), 100).expect("Problem opening batch manager");
let _join = manager
.start_service(
authority_state.name,
Expand Down Expand Up @@ -283,7 +289,8 @@ async fn test_batch_store_retrieval() {
let address = *key_pair.public_key_bytes();

// TEST 1: init from an empty database should return to a zero block
let (_send, manager, _pair) = BatchManager::new(store.clone(), 100);
let (_send, manager, _pair) =
BatchManager::new(store.clone(), 100).expect("Problem opening batch manager");
let _join = manager
.start_service(address, key_pair, 10, Duration::from_secs(60))
.await
Expand Down

0 comments on commit df9980f

Please sign in to comment.