Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[authority] Batch crash robustness #714

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 25 additions & 13 deletions sui_core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ use temporary_store::AuthorityTemporaryStore;
mod authority_store;
pub use authority_store::AuthorityStore;

pub mod authority_autoinc_channel;

// based on https://github.com/diem/move/blob/62d48ce0d8f439faa83d05a4f5cd568d4bfcb325/language/tools/move-cli/src/sandbox/utils/mod.rs#L50
const MAX_GAS_BUDGET: u64 = 18446744073709551615 / 1000 - 1;
const MAX_ITEMS_LIMIT: u64 = 10_000;
Expand Down Expand Up @@ -98,6 +100,7 @@ impl AuthorityState {
if self.batch_channels.is_some() {
return Err(SuiError::AuthorityUpdateFailure);
}

self.batch_channels = Some((batch_sender, broadcast_sender));
Ok(())
}
Expand Down Expand Up @@ -441,17 +444,9 @@ impl AuthorityState {
unwrapped_object_ids,
);
// Update the database in an atomic manner
let (seq, resp) = self
.update_state(temporary_store, certificate, to_signed_effects)
self.update_state(temporary_store, certificate, to_signed_effects)
.instrument(tracing::debug_span!("db_update_state"))
.await?; // Returns the OrderInfoResponse

// If there is a notifier registered, notify:
if let Some((sender, _)) = &self.batch_channels {
sender.send_item(seq, transaction_digest).await?;
}

Ok(resp)
.await // Returns the OrderInfoResponse
}

fn execute_transaction(
Expand Down Expand Up @@ -842,15 +837,32 @@ impl AuthorityState {
.set_transaction_lock(mutable_input_objects, signed_transaction)
}

/// Update state and signal that a new transaction has been processed
/// to the batch maker service.
async fn update_state(
&self,
temporary_store: AuthorityTemporaryStore,

certificate: CertifiedTransaction,
signed_effects: SignedTransactionEffects,
) -> Result<(u64, TransactionInfoResponse), SuiError> {
self._database
.update_state(temporary_store, certificate, signed_effects)
) -> Result<TransactionInfoResponse, SuiError> {
let ticket_opt = self
.batch_channels
.as_ref()
.map(|(sender, _)| sender.ticket());
let seq_opt = ticket_opt.as_ref().map(|ticket| ticket.ticket());

let transaction_digest = signed_effects.effects.transaction_digest;
let response =
self._database
.update_state(temporary_store, certificate, signed_effects, seq_opt)?;

// If there is a notifier registered, notify:
if let Some(mut ticket) = ticket_opt {
ticket.send(transaction_digest);
}

Ok(response)
}

/// Get a read reference to an object/seq lock
Expand Down
163 changes: 163 additions & 0 deletions sui_core/src/authority/authority_autoinc_channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use super::*;

use parking_lot::Mutex;
use std::collections::HashMap;
use tokio::sync::mpsc::UnboundedSender;

/// Shared state behind an `AutoIncSender`: hands out ticket sequence numbers
/// and parks out-of-order items until they can be sent in sequence order.
pub struct AutoIncSenderInner<T> {
    // Next sequence number to assign to a newly created ticket.
    pub next_available_sequence_number: u64,
    // Lowest sequence number that has not yet been flushed to the channel.
    pub next_expected_sequence_number: u64,
    // Underlying channel; items are sent as (sequence_number, item) pairs.
    pub sender: UnboundedSender<(u64, T)>,
    // Items waiting for their turn; `None` marks a ticket that was dropped
    // without sending, so its sequence number is skipped.
    pub waiting: HashMap<u64, Option<T>>,
}

impl<T> AutoIncSenderInner<T> {
pub fn send_all_waiting(&mut self) {
while let Some(item_opt) = self.waiting.remove(&self.next_expected_sequence_number) {
if let Some(item) = item_opt {
if let Err(_err) = self.sender.send((self.next_expected_sequence_number, item)) {
/*
An error here indicates the other side of the channel is closed.
There is not very much we can do, as if the batcher is closed we
will write to the DB and the recover when we recover.
*/

self.waiting.clear();
}
}
self.next_expected_sequence_number += 1;
}
}
Comment on lines +18 to +32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing we could do is at least post a log line if the size of this is past a threshold, and possibly turn that into a bigger warning if it's growing w/o removal.

}

/*
A wrapper around a channel sender that ensures items sent are associated with
integer tickets and sent in increasing ticket order. When a ticket is dropped
its ticket value is skipped and the subsequent tickets are sent.

If the receiver end of the channel is closed, the autoinc sender simply drops
all the items sent.
*/

// Cheap-to-clone handle over the shared `AutoIncSenderInner` state.
#[derive(Clone)]
pub struct AutoIncSender<T>(pub Arc<Mutex<AutoIncSenderInner<T>>>);

impl<T> AutoIncSender<T> {
    /// Wrap `sender`, with ticket numbering starting at `next_sequence_number`.
    pub fn new(sender: UnboundedSender<(u64, T)>, next_sequence_number: u64) -> AutoIncSender<T> {
        let inner = AutoIncSenderInner {
            // TODO: next_available_sequence_number could be an AtomicU64 instead.
            next_available_sequence_number: next_sequence_number,
            next_expected_sequence_number: next_sequence_number,
            sender,
            waiting: HashMap::new(),
        };
        AutoIncSender(Arc::new(Mutex::new(inner)))
    }

    /// Reserve the next available sequence number and return a ticket for it.
    pub fn next_ticket(&self) -> Ticket<T> {
        // Keep the critical region as small as possible.
        let mut inner = self.0.lock();
        let sequence_number = inner.next_available_sequence_number;
        inner.next_available_sequence_number += 1;
        drop(inner);

        Ticket {
            autoinc_sender: self.0.clone(),
            sequence_number,
            sent: false,
        }
    }
}

/// A ticket represents a slot in the sequence to be sent in the channel
pub struct Ticket<T> {
    // Shared channel state; used to enqueue this ticket's item (or a skip marker).
    autoinc_sender: Arc<Mutex<AutoIncSenderInner<T>>>,
    // The sequence number reserved for this ticket.
    sequence_number: u64,
    // Set by `send`; `Drop` records a skip marker when this is still false.
    sent: bool,
}

// NOTE: the previous `where T: std::fmt::Debug` bound was unused (nothing in
// this impl formats `T`) and needlessly restricted callers; it is removed.
impl<T> Ticket<T> {
    /// Send an item at that sequence in the channel.
    ///
    /// The item is queued under this ticket's sequence number and the channel
    /// is flushed in sequence order. If the receiving end is closed, the item
    /// is silently dropped — there is nobody left to deliver to, and parking
    /// it would only grow `waiting` without bound.
    pub fn send(&mut self, item: T) {
        let mut aic = self.autoinc_sender.lock();
        // Mark the ticket consumed in every case, so `Drop` does not
        // re-acquire the lock just to discover there is nothing to do.
        self.sent = true;
        if aic.sender.is_closed() {
            // To ensure we do not fill our memory
            return;
        }
        aic.waiting.insert(self.sequence_number, Some(item));
        aic.send_all_waiting();
    }

    /// Get the ticket sequence number
    pub fn ticket(&self) -> u64 {
        self.sequence_number
    }
}

/// A custom drop that indicates that there may not be a item
/// associated with this sequence number,
impl<T> Drop for Ticket<T> {
fn drop(&mut self) {
if !self.sent {
let mut aic = self.autoinc_sender.lock();
if aic.sender.is_closed() {
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to unlock here before returning?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upon return the lock falls out of scope and the mutex is unlocked automatically -- this is the flip-side of RAII, dropping a resource frees it. Exactly in the same way as dropping the ticket does the cleanup.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Locks drop when they go out of scope.

}
aic.waiting.insert(self.sequence_number, None);
aic.send_all_waiting();
}
}
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_ticketing() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let autoinc = AutoIncSender::new(tx, 10);

        let mut first = autoinc.next_ticket(); // sequence 10
        let second = autoinc.next_ticket(); // sequence 11
        let third = autoinc.next_ticket(); // sequence 12
        let mut fourth = autoinc.next_ticket(); // sequence 13

        // Deliver a value out of order.
        fourth.send(1010);

        // Drop a ticket without ever sending on it.
        drop(second);

        // Lose a ticket inside a task that panics.
        let handle = tokio::spawn(async move {
            let _held = third;
            panic!("Crash here!");
        });

        // Drive the task to completion, i.e. to the panic.
        assert!(handle.await.is_err());

        // Finally send on the first ticket, unblocking delivery.
        first.send(1040);

        // Only the two sent values arrive, in sequence order; the dropped
        // and lost tickets (11 and 12) are skipped.
        let (s1, v1) = rx.recv().await.unwrap();
        let (s2, v2) = rx.recv().await.unwrap();

        assert_eq!((s1, v1), (10, 1040));
        assert_eq!((s2, v2), (13, 1010));
    }
}
53 changes: 29 additions & 24 deletions sui_core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use sui_types::batch::{SignedBatch, TxSequenceNumber};
use tracing::warn;
use typed_store::rocks::{open_cf, DBBatch, DBMap};

use std::sync::atomic::Ordering;
use typed_store::traits::Map;

pub type AuthorityStore = SuiDataStore<true>;
Expand Down Expand Up @@ -156,6 +155,16 @@ impl<const ALL_OBJ_VER: bool> SuiDataStore<ALL_OBJ_VER> {
}
}

/// Returns the next transaction sequence number for the executed-sequence
/// log: one past the highest sequence number recorded so far, or 0 when the
/// log is empty.
pub fn next_sequence_number(&self) -> Result<TxSequenceNumber, SuiError> {
Ok(self
.executed_sequence
.iter()
// Position the iterator just before the maximum possible key so that
// `next()` yields the largest recorded entry — presumably the
// typed_store `skip_prior_to` contract; verify against its docs.
.skip_prior_to(&TxSequenceNumber::MAX)?
.next()
.map(|(v, _)| v + 1u64)
.unwrap_or(0))
}

/// A function that acquires all locks associated with the objects (in order to avoid deadlocks).
fn acquire_locks(&self, _input_objects: &[ObjectRef]) -> Vec<parking_lot::MutexGuard<'_, ()>> {
let num_locks = self.lock_table.len();
Expand Down Expand Up @@ -389,7 +398,8 @@ impl<const ALL_OBJ_VER: bool> SuiDataStore<ALL_OBJ_VER> {
temporary_store: AuthorityTemporaryStore,
certificate: CertifiedTransaction,
signed_effects: SignedTransactionEffects,
) -> Result<(TxSequenceNumber, TransactionInfoResponse), SuiError> {
sequence_number: Option<TxSequenceNumber>,
) -> Result<TransactionInfoResponse, SuiError> {
// Extract the new state from the execution
// TODO: events are already stored in the TxDigest -> TransactionEffects store. Is that enough?
let mut write_batch = self.transaction_lock.batch();
Expand All @@ -415,18 +425,18 @@ impl<const ALL_OBJ_VER: bool> SuiDataStore<ALL_OBJ_VER> {
)?;

// Safe to unwrap since the "true" flag ensures we get a sequence value back.
let seq: TxSequenceNumber = self
.batch_update_objects(write_batch, temporary_store, transaction_digest, true)?
.unwrap();

Ok((
seq,
TransactionInfoResponse {
signed_transaction: self.signed_transactions.get(&transaction_digest)?,
certified_transaction: Some(certificate),
signed_effects: Some(signed_effects),
},
))
self.batch_update_objects(
write_batch,
temporary_store,
transaction_digest,
sequence_number,
)?;

Ok(TransactionInfoResponse {
signed_transaction: self.signed_transactions.get(&transaction_digest)?,
certified_transaction: Some(certificate),
signed_effects: Some(signed_effects),
})
}

/// Persist temporary storage to DB for genesis modules
Expand All @@ -437,8 +447,7 @@ impl<const ALL_OBJ_VER: bool> SuiDataStore<ALL_OBJ_VER> {
) -> Result<(), SuiError> {
debug_assert_eq!(transaction_digest, TransactionDigest::genesis());
let write_batch = self.transaction_lock.batch();
self.batch_update_objects(write_batch, temporary_store, transaction_digest, false)
.map(|_| ())
self.batch_update_objects(write_batch, temporary_store, transaction_digest, None)
}

/// Helper function for updating the objects in the state
Expand All @@ -447,8 +456,8 @@ impl<const ALL_OBJ_VER: bool> SuiDataStore<ALL_OBJ_VER> {
mut write_batch: DBBatch,
temporary_store: AuthorityTemporaryStore,
transaction_digest: TransactionDigest,
should_sequence: bool,
) -> Result<Option<TxSequenceNumber>, SuiError> {
seq_opt: Option<TxSequenceNumber>,
) -> Result<(), SuiError> {
let (objects, active_inputs, written, deleted, _events) = temporary_store.into_inner();

// Archive the old lock.
Expand Down Expand Up @@ -550,7 +559,6 @@ impl<const ALL_OBJ_VER: bool> SuiDataStore<ALL_OBJ_VER> {

// This is the critical region: testing the locks and writing the
// new locks must be atomic, and no writes should happen in between.
let mut return_seq = None;
{
// Acquire the lock to ensure no one else writes when we are in here.
let _mutexes = self.acquire_locks(&active_inputs[..]);
Expand All @@ -562,21 +570,18 @@ impl<const ALL_OBJ_VER: bool> SuiDataStore<ALL_OBJ_VER> {
object_lock.ok_or(SuiError::TransactionLockDoesNotExist)?;
}

if should_sequence {
if let Some(next_seq) = seq_opt {
// Now we are sure we are going to execute, add to the sequence
// number and insert into authority sequence.
//
// NOTE: it is possible that we commit to the database transactions
// out of order with respect to their sequence number. It is also
// possible for the authority to crash without committing the
// full sequence, and the batching logic needs to deal with this.
let next_seq = self.next_sequence_number.fetch_add(1, Ordering::SeqCst);
write_batch = write_batch.insert_batch(
&self.executed_sequence,
std::iter::once((next_seq, transaction_digest)),
)?;

return_seq = Some(next_seq);
}

// Atomic write of all locks & other data
Expand All @@ -585,7 +590,7 @@ impl<const ALL_OBJ_VER: bool> SuiDataStore<ALL_OBJ_VER> {
// implicit: drop(_mutexes);
} // End of critical region

Ok(return_seq)
Ok(())
}

/// Returns the last entry we have for this object in the parents_sync index used
Expand Down
Loading