[authority] Batch crash robustness v3 #835
Conversation
Force-pushed from 49dcc49 to ddef948.
sui_core/src/authority.rs (outdated)
.expect("Init batches failed!");

// Only initialize an empty database.
if store
Re https://github.com/MystenLabs/sui/pull/807/files#r828079220: this piece of code is only expected to run during Genesis. And when I (as a new developer) think of genesis, I will assume everything starts from a fresh copy. Should we add code to clean up the DB if it's not empty during genesis?
I am quite worried about deleting data here. It would be better to throw an error, maybe, and let the user know they have pointed to a database that already contains data? Maybe it's a mistake and the data is valuable to them.
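A minimal sketch of that error-instead-of-delete behavior; the trait and error type here are hypothetical stand-ins, not the actual sui_core API:

// Refuse to run genesis against a non-empty database rather than wiping it.
#[derive(Debug)]
struct NonEmptyDatabase;

// Hypothetical store interface, for illustration only.
trait GenesisStore {
    fn is_empty(&self) -> bool;
    fn init_batches(&self);
}

fn init_genesis<S: GenesisStore>(store: &S) -> Result<(), NonEmptyDatabase> {
    if !store.is_empty() {
        // Surface the problem to the operator; the existing data may be valuable.
        return Err(NonEmptyDatabase);
    }
    store.init_batches();
    Ok(())
}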
.high_watermark
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Ok(TransactionNotifierTicket {
    transaction_notifier: self.clone(),
I notice that we do a lot of self.clone() on the transaction notifier, which has an AuthorityStore inside. Doesn't this create an opportunity for the store contents of different notifiers to diverge?
Our authority store is wrapped in an Arc<AuthorityStore> within the notifier structure, so the .clone() will copy the pointer (and increment the atomic counter within the Arc), and they will all point to the same store.
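A small self-contained illustration of that point, with a stand-in type for the store:

use std::sync::Arc;

struct AuthorityStoreLike; // stand-in for the real AuthorityStore

fn main() {
    let store = Arc::new(AuthorityStoreLike);
    // Arc::clone copies the pointer and bumps the atomic refcount;
    // it never duplicates the store itself.
    let notifier_copy = Arc::clone(&store);
    assert!(Arc::ptr_eq(&store, &notifier_copy)); // same allocation
    assert_eq!(Arc::strong_count(&store), 2);
}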
Ok(TransactionNotifier {
    state,
    low_watermark: AtomicU64::new(seq),
    high_watermark: AtomicU64::new(seq),
The watermarks don't seem to get updated yet; is that a future PR?
The low watermark gets updated on the ticket drop:
impl Drop for TransactionNotifierTicket {
fn drop(&mut self) {
self.transaction_notifier
.low_watermark
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
self.transaction_notifier.notify.notify_one();
}
}
And the high watermark on the ticket issuance:
/// Get a ticket with a sequence number
pub fn ticket(self: &Arc<Self>) -> SuiResult<TransactionNotifierTicket> {
if self.is_closed.load(std::sync::atomic::Ordering::SeqCst) {
return Err(SuiError::ClosedNotifierError);
}
let seq = self
.high_watermark
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Ok(TransactionNotifierTicket {
transaction_notifier: self.clone(),
seq,
})
}
I think that ensures low_watermark <= high_watermark, and the "gap" represents the number of certificates still pending a write to the db.
.is_err()
{
    return Err(SuiError::ConcurrentIteratorError);
}
Why is only one stream allowed?
This stream is read by the task running the run_batch_service that creates batches. I wanted to make it hard for anyone to make a mistake here and create two concurrent services that create batches, since they would (a) potentially take turns being notified (I am not sure whether notify wakes everyone up), and (b) compete in making batches for transactions within the same range.
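A compact sketch of that kind of single-stream guard, assuming an AtomicBool flag on the notifier; the field and error names are illustrative, not the PR's exact code:

use std::sync::atomic::{AtomicBool, Ordering};

struct Notifier {
    iter_running: AtomicBool,
}

impl Notifier {
    fn try_claim_stream(&self) -> Result<(), &'static str> {
        // compare_exchange atomically flips false -> true; if another
        // stream already claimed the flag it fails, which is what the
        // PR surfaces as SuiError::ConcurrentIteratorError.
        if self
            .iter_running
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_err()
        {
            return Err("a batch stream is already running");
        }
        Ok(())
    }
}

fn main() {
    let notifier = Notifier { iter_running: AtomicBool::new(false) };
    assert!(notifier.try_claim_stream().is_ok());
    assert!(notifier.try_claim_stream().is_err()); // second claim rejected
}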
store.side_sequence(t8.seq(), &TransactionDigest::random());
drop(t8);

assert!(matches!(iter.next().await, Some((5, _))));
So dropping a sequence leaves it intact in the batch?
The drop calls above are on tickets. Dropping a ticket increases the low watermark (see above) and notifies the waiting stream that there may be a new transaction available and safe to sequence. The sequence itself is stored in the db (when the write has not failed), so it is there for the stream generated by TransactionNotifier::iter_from to pick up and deliver (if it exists).
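A toy version of that drop-then-notify handshake, assuming the notify field is a tokio::sync::Notify (as the notify_one call suggests); all names here are illustrative:

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::Notify;

// Toy ticket mirroring the PR's Drop behavior.
struct Ticket {
    low_watermark: Arc<AtomicU64>,
    notify: Arc<Notify>,
}

impl Drop for Ticket {
    fn drop(&mut self) {
        // Advance the low watermark, then wake the stream so it can
        // check whether the next sequence number is now safe to emit.
        self.low_watermark.fetch_add(1, Ordering::SeqCst);
        self.notify.notify_one();
    }
}

#[tokio::main]
async fn main() {
    let low = Arc::new(AtomicU64::new(0));
    let notify = Arc::new(Notify::new());

    let ticket = Ticket { low_watermark: low.clone(), notify: notify.clone() };
    let waiter = {
        let notify = notify.clone();
        tokio::spawn(async move { notify.notified().await })
    };

    drop(ticket); // bumps the watermark and wakes the waiter
    waiter.await.unwrap();
    assert_eq!(low.load(Ordering::SeqCst), 1);
}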
let _batch_join_handle = tokio::task::spawn(async move {
    local_server
        .state
        .run_batch_service(min_batch_size, max_delay)
        .await
});
This spawn_batch_subsystem can be retrofitted to run inside a resilient component. We would create a wrapper struct holding the local_server, min_batch_size, max_delay, etc., and then implement a trait that calls spawn_batch_subsystem, which would clone local_server and the rest from self and pass them into the tokio task. This will work because it is cloned. 👍
Yes, this is where the re-startable component would come in very handy. When the task handling run_batch_service fails, we need to reclaim any resources (there are none for the moment, actually) and then start a new one.
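A rough sketch of what that supervisor loop could look like, with stand-in types for the state and the service (not the PR's actual code):

use std::sync::Arc;
use std::time::Duration;

// Stand-in for the authority state; only what the sketch needs.
struct ServerState;

impl ServerState {
    async fn run_batch_service(
        &self,
        _min_batch_size: usize,
        _max_delay: Duration,
    ) -> Result<(), ()> {
        Ok(())
    }
}

// Respawn the batch service whenever its task panics or errors out.
async fn supervise_batch_service(
    state: Arc<ServerState>,
    min_batch_size: usize,
    max_delay: Duration,
) {
    loop {
        let state_for_task = state.clone(); // cheap Arc clone; same underlying store
        let handle = tokio::task::spawn(async move {
            state_for_task
                .run_batch_service(min_batch_size, max_delay)
                .await
        });
        match handle.await {
            Ok(Ok(())) => break, // clean shutdown: stop supervising
            // Panic or service error: nothing to reclaim yet, just restart.
            _ => continue,
        }
    }
}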
Force-pushed from ddef948 to 69c3f55.
The Background
This is the v3 attempt at creating the infrastructure on the authority side to support Priority A, namely making a sequence of transactions and interspersed batches providing metadata and authentication. It follows the saga of:
The issues with previous approaches were:
What this PR does
UpdateItems is unchanged and based on the tokio broadcast primitive.
Minor issues fixed:
What this PR does not do
Sadly, this PR does not resolve the fact that if two transactions interact poorly and are given sequence numbers in the opposite order from the one in which they are written to disk, the second may have to wait for the first to be written. This is a slight delay.