Batch producer design and features #191
IMO it makes sense to abstract over the strategy, local/distributed, instead of over a prover instance (a single prover), because the latter would need extra code (and maybe even another trait). If that is the case, then IMO the return type should be:

```rust
async fn build_batch(&self, txs: Vec<SharedProvenTx>) -> Option<Future<Result<(), BuildBatchError>>>;
```
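Spelled out in compilable form, this is a minimal sketch of that shape; `BoxFuture` is one way to make the returned future nameable, and the types here are placeholders, not the real ones:

```rust
use futures::future::BoxFuture;

// Placeholder types standing in for the real ones.
pub struct SharedProvenTx;
pub struct BuildBatchError;

#[async_trait::async_trait]
pub trait BatchBuilder {
    /// `None` would signal the builder is at capacity (backpressure);
    /// `Some(fut)` resolves once the batch has been proven.
    async fn build_batch(
        &self,
        txs: Vec<SharedProvenTx>,
    ) -> Option<BoxFuture<'static, Result<(), BuildBatchError>>>;
}
```

A `None` here is what gives the caller a backpressure signal without blocking.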
The last thing is the dependency graph. At the moment the graph is:

```mermaid
graph
    API -->|add_transaction| TransactionQueue;
    TransactionQueue -->|verify_tx| StateView;
    TransactionQueue -->|build_batch| BatchBuilder;
    BatchBuilder -->|build_block| BlockBuilder;
    BlockBuilder -->|apply_block| StateView;
```

IOW, roughly like a pipeline. (Note: the control flow is a bit more complicated, ref.) This pipeline-like design has a few consequences:
IMO it should instead be a design like the following:

```rust
#[async_trait]
pub trait BatchBuilder {
    async fn build_batch(
        &self,
        txs: Vec<SharedProvenTx>,
    ) -> Option<Future<Result<TransactionBatch, BuildBatchError>>>;
}

#[async_trait]
pub trait BlockBuilder {
    async fn build_block(
        &self,
        batches: Vec<TransactionBatch>,
    ) -> Option<Future<Result<Block, BuildBlockError>>>;
}
```

That allows the following flow:

```mermaid
sequenceDiagram
    actor Network
    participant TxQueue
    participant BatchBuilder
    participant BlockBuilder
    participant Store

    Note over Network,Store: New tx
    Network->>TxQueue: add_transaction
    activate TxQueue
    TxQueue->>TxQueue: verify_transaction
    alt is_valid
        TxQueue->>TxQueue: add_tx
    end
    alt has_txs_for_batch
        TxQueue--)BatchBuilder: build_batch
    end
    deactivate TxQueue

    Note over Network,Store: New batch
    BatchBuilder--)TxQueue: batch_done
    activate TxQueue
    TxQueue->>TxQueue: add_batch
    alt has_batches_for_block
        TxQueue--)BlockBuilder: build_block
    end
    deactivate TxQueue

    Note over Network,Store: New block
    BlockBuilder--)TxQueue: block_done
    activate TxQueue
    TxQueue->>Store: apply_block
    TxQueue->>TxQueue: update_inflight
    deactivate TxQueue

    Note over Network,Store: Periodic
    activate TxQueue
    TxQueue->>TxQueue: tick
    loop for each tx group
        TxQueue--)BatchBuilder: build_batch
    end
    alt has_batches
        TxQueue--)BlockBuilder: build_block
    end
    deactivate TxQueue
```
Note: the workflow above is simplified, as it does not account for back-pressure handling, but it gives the overall flow of data in the system; back pressure should be pretty easy to implement on top of the above.
This is not accurate (the transaction queue pushes transactions into the batch builder, and the batch builder pushes batches into the block producer), but since the diagram in #191 (comment) is correct, I'm assuming this is a typo?
I'm not quite following this part:

> Basically, we pass a set of batches into ...
This approach could work and we actually considered it initially (basically, to have a single component which manages the whole state, with several tasks interacting with it). The current approach tries to follow an "actor model" where there are multiple components, each responsible for maintaining its own state. I am not against re-evaluating this though, especially if it leads to a simpler design.

One other general note: we should keep in mind that we are building towards a fully decentralized system (even if it takes us some time to get there). This means that we shouldn't assume that there is a single block builder, or even that the block builder will be known ahead of time. This doesn't have a ton of implications for now, but there are a few. The main one relevant here is that block production times should be relatively regular. Currently, we are aiming for 3-second block times (though this can change).
Ah, yes, I meant to write the issue about the batch builder, as in the issue title. Not sure why I pasted the links to the block producer over there.
Because of the typo, it should have been the batch builder.
It is an ... The ... The ...
I don't know all the theoretical baggage of the actor model framework, but my main takeaway is the lack of locking primitives for synchronization. I'm not proposing to use those here, so in that regard it should have no downside (as in, this also doesn't deadlock). Are there any other benefits we are looking for?
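For concreteness, a minimal sketch of the lock-free, actor-style shape under discussion; the message type and names are invented here, and the payloads are placeholders:

```rust
use tokio::sync::mpsc;

/// Hypothetical messages for an actor-style transaction queue. All state
/// lives inside one task, so no locking primitives are needed.
enum QueueMsg {
    AddTransaction(u64), // placeholder standing in for a ProvenTransaction
    BatchDone(u64),      // placeholder standing in for a TransactionBatch
}

async fn run_tx_queue(mut rx: mpsc::Receiver<QueueMsg>) {
    // Owned state; only this task ever touches it.
    let mut ready_queue: Vec<u64> = Vec::new();
    let mut batch_queue: Vec<u64> = Vec::new();

    while let Some(msg) = rx.recv().await {
        match msg {
            QueueMsg::AddTransaction(tx) => ready_queue.push(tx),
            QueueMsg::BatchDone(batch) => batch_queue.push(batch),
        }
    }
}
```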
Could we not accomplish this by adding something like ...?

A couple of general comments about the tx queue and batch builder: the way they are set up currently is very primitive - this was basically implemented to make them work in the simplest possible way. So, we should view these as skeletons of what needs to be there. Here is a rough sketch of the flow we'd like to achieve eventually:

1. As transactions are submitted, they are added to the transaction queue. The only check we make here is that transactions should be valid. I think this part works fine.
2. Assuming the batch builder is not busy, we want to submit transactions to it for batching (if it is busy, ideally, we won't even send a request to batch at all). Here we have 2 considerations:
The batch selection process right now is very primitive: periodically, we grab transactions from the queue, split them up into equal groups, and send them off to the batch builder. We should encapsulate this logic so that it is easy to update without affecting the rest of the system. For example, we could have something like this on the transaction queue:

```rust
/// Extracts groups of transactions from the current transaction queue. `ProposedBatch` could be
/// as simple as `Vec<ProvenTransaction>`.
///
/// `num_batches` could come from the batch producer indicating how much capacity it has.
pub fn select_batches(&self, num_batches: usize) -> Vec<ProposedBatch>
```

The current implementation of this function would look something like this:

```rust
pub async fn select_batches(&self, num_batches: usize) -> Vec<Vec<ProvenTransaction>> {
    let txs: Vec<ProvenTransaction> = {
        let mut locked_ready_queue = self.ready_queue.write().await;
        if locked_ready_queue.is_empty() {
            return Vec::new();
        }
        locked_ready_queue.drain(..).collect()
    };

    // note: this sketch ignores `num_batches` and simply chunks the whole queue
    txs.chunks(self.options.batch_size).map(|txs| txs.to_vec()).collect()
}
```
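To actually honor `num_batches`, the drain could be capped. A sketch, reusing the (assumed) `ready_queue` and `options.batch_size` fields from the snippet above; the method name is hypothetical:

```rust
use std::cmp::min;

/// Sketch: drain at most `num_batches * batch_size` transactions so the
/// batch builder is never handed more work than it has capacity for.
pub async fn select_batches_capped(&self, num_batches: usize) -> Vec<Vec<ProvenTransaction>> {
    let mut locked_ready_queue = self.ready_queue.write().await;
    let take = min(locked_ready_queue.len(), num_batches * self.options.batch_size);
    let txs: Vec<ProvenTransaction> = locked_ready_queue.drain(..take).collect();
    txs.chunks(self.options.batch_size).map(|chunk| chunk.to_vec()).collect()
}
```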
The implementation of `try_build_batches` would look something like this:

```rust
async fn try_build_batches(&self) {
    let num_batches = self.batch_builder.get_capacity();
    if num_batches == 0 {
        return;
    }

    let proposed_batches = self.select_batches(num_batches);
    for proposed_batch in proposed_batches {
        let ready_queue = self.ready_queue.clone();
        let batch_builder = self.batch_builder.clone();
        tokio::spawn(
            async move {
                match batch_builder.build_batch(proposed_batch).await {
                    Ok(_) => {
                        // batch was successfully built, do nothing
                    },
                    Err(e) => {
                        // batch building failed, add txs back at the end of the queue
                        ready_queue.write().await.append(&mut e.into_transactions());
                    },
                }
            }
            .instrument(info_span!(target: COMPONENT, "batch_builder")),
        );
    }
}
```
Another point is that if we do want to have one central place to maintain global state (e.g., something like a `batch_queue` on the transaction queue), `try_build_batches` could look like this:

```rust
async fn try_build_batches(&self) {
    let num_batches = self.batch_builder.get_capacity();
    if num_batches == 0 {
        return;
    }

    let proposed_batches = self.select_batches(num_batches);
    for proposed_batch in proposed_batches {
        let tx_queue = self.tx_queue.clone();
        let batch_queue = self.batch_queue.clone();
        let batch_builder = self.batch_builder.clone();
        tokio::spawn(
            async move {
                match batch_builder.build_batch(proposed_batch).await {
                    Ok(batch) => {
                        // batch was successfully built, add it to the batch queue
                        batch_queue.write().await.push(batch);
                    },
                    Err(e) => {
                        // batch building failed, add txs back at the end of the queue
                        tx_queue.write().await.append(&mut e.into_transactions());
                    },
                }
            }
            .instrument(info_span!(target: COMPONENT, "batch_builder")),
        );
    }
}
```
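`get_capacity` is not defined anywhere above; a minimal sketch of what it could look like, with all names here being assumptions:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical batch builder handle that tracks in-flight batches.
#[derive(Clone)]
pub struct BatchBuilderHandle {
    max_batches: usize,
    in_flight: Arc<AtomicUsize>,
}

impl BatchBuilderHandle {
    /// Number of additional batches the builder can currently accept.
    pub fn get_capacity(&self) -> usize {
        self.max_batches.saturating_sub(self.in_flight.load(Ordering::Relaxed))
    }
}
```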
It does work. But IMO this is a more complicated API; for example, this adds the need for the ...
I probably still don't fully understand your proposal. Let's say we have the following method on the batch builder:

```rust
async fn build_batch(
    &self,
    transactions: Vec<ProvenTransaction>,
) -> Option<Future<Result<TransactionBatch, BuildBatchError>>>
```

How would it be called from the transaction queue/pool (i.e., how would ...)?
The proposal is basically:
I did a few experiments and I no longer think that returning a task/...

Here is a concrete example. I think changing the type to ...

**Simulation code**

```rust
use futures::stream::{FuturesUnordered, StreamExt};
use rand::distributions::{Distribution, Uniform};
use std::cmp::min;
use std::ops::RangeInclusive;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, RwLock};
use tokio::task::JoinHandle;
use tokio::time::{self, Instant};

const BATCH_SIZE: usize = 4;
const BATCH_TIME: u64 = 4000;
const BATCH_MAX: usize = 2;
const TX_FREQUENCY: RangeInclusive<u64> = 700..=2000;
const BATCH_FREQUENCY: RangeInclusive<u64> = 4000..=6500;

type BatchBuilderResult = Result<Batch, ()>;

/// Simulates a proven transaction
#[derive(Debug)]
pub struct Tx {
    pub id: usize,
}

/// Simulates a proven batch
#[derive(Debug)]
pub struct Batch {
    pub id: usize,
    pub txs: Vec<Tx>,
}

/// Simulates the batch builder
#[derive(Debug)]
struct BatchBuilder {
    batches: Arc<RwLock<usize>>,
    id: usize,
}

/// Type to describe current backpressure state.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Backpressure {
    Yes,
    No,
}

impl BatchBuilder {
    fn new() -> Self {
        Self {
            batches: Arc::new(RwLock::new(0)),
            id: 0,
        }
    }

    async fn build(&mut self, txs: Vec<Tx>) -> Result<JoinHandle<BatchBuilderResult>, Vec<Tx>> {
        assert!(!txs.is_empty()); // don't waste time on empty batches
        assert!(txs.len() <= BATCH_SIZE);

        {
            let mut batches = self.batches.write().await;
            // simulates all workers being busy
            if *batches >= BATCH_MAX {
                return Err(txs);
            }
            *batches += 1;
        }

        // helper code to simulate proving a batch
        let between = Uniform::from(BATCH_FREQUENCY);
        let mut rng = rand::thread_rng();
        let time = Duration::from_millis(between.sample(&mut rng));
        let id = self.id;
        let lock = self.batches.clone();

        println!("Batches started {id}");
        let handle = tokio::spawn(async move {
            tokio::time::sleep(time).await;
            let mut batches = lock.write().await;
            *batches -= 1;
            Ok(Batch { id, txs })
        });
        self.id += 1;

        Ok(handle)
    }
}

/// Try to create as many full batches as possible, until all transactions are consumed or
/// a backpressure event.
async fn try_full_batches(
    txpool: &mut Vec<Tx>,
    batch_builder: &mut BatchBuilder,
    inflight: &mut FuturesUnordered<JoinHandle<BatchBuilderResult>>,
) -> Backpressure {
    while txpool.len() >= BATCH_SIZE {
        let batch: Vec<_> = txpool.drain(..BATCH_SIZE).collect();
        match batch_builder.build(batch).await {
            Ok(handle) => inflight.push(handle),
            Err(batch) => {
                txpool.extend(batch);
                println!("Batch builder full");
                return Backpressure::Yes;
            }
        }
    }
    Backpressure::No
}

/// Maybe start building a partial batch (one with less than BATCH_SIZE transactions).
///
/// Only used for timer based batches, all other events require a full batch size.
async fn try_partial_batch(
    txpool: &mut Vec<Tx>,
    batch_builder: &mut BatchBuilder,
    inflight: &mut FuturesUnordered<JoinHandle<BatchBuilderResult>>,
) -> Backpressure {
    let size = min(txpool.len(), BATCH_SIZE);
    let batch: Vec<_> = txpool.drain(..size).collect();
    match batch_builder.build(batch).await {
        Ok(handle) => inflight.push(handle),
        Err(batch) => {
            txpool.extend(batch);
            println!("Batch builder full");
            return Backpressure::Yes;
        }
    }
    Backpressure::No
}

/// using a mpsc because it makes sense when adding a P2P layer
async fn txqueue(mut txs: mpsc::Receiver<Tx>) {
    let mut batch_builder = BatchBuilder::new();
    let mut txpool = Vec::new();
    let mut inflight = FuturesUnordered::new();
    let mut backpressure = Backpressure::No;

    // timer used to start a new batch because of latency
    // the timer is reset on every try, regardless of their result
    let batch_time = Duration::from_millis(BATCH_TIME);
    let sleep = time::sleep(batch_time);
    tokio::pin!(sleep);

    loop {
        tokio::select! {
            // disable timer if:
            // - backpressure is enabled
            // - txpool is empty
            () = &mut sleep, if backpressure == Backpressure::No && !txpool.is_empty() => {
                println!("[TXQUEUE] TIME");
                // set backpressure to handle cases where the worker pool shrank
                backpressure = try_partial_batch(&mut txpool, &mut batch_builder, &mut inflight).await;
                sleep.as_mut().reset(Instant::now() + batch_time);
            },
            // never block receiving transactions
            tx = txs.recv() => {
                let tx = tx.expect("recv should not have failed");
                println!("[TXQUEUE] {:?}", tx);
                txpool.push(tx);

                // if there is no backpressure for the time being, trigger a batch right away
                if backpressure == Backpressure::No {
                    backpressure = try_full_batches(&mut txpool, &mut batch_builder, &mut inflight).await;
                    sleep.as_mut().reset(Instant::now() + batch_time);
                }
            }
            // disable if:
            // - the FuturesUnordered is empty, since it resolves right away
            batch = inflight.next(), if !inflight.is_empty() => {
                let batch = batch.expect("task should not have failed").unwrap().unwrap();
                println!("[TXQUEUE] {:?}", batch);
                backpressure = try_full_batches(&mut txpool, &mut batch_builder, &mut inflight).await;
                sleep.as_mut().reset(Instant::now() + batch_time);
            }
        }
    }
}

// helper to create fake txs to show the different behaviors
async fn create_txs(txs: mpsc::Sender<Tx>) {
    let between = Uniform::from(TX_FREQUENCY);
    let mut rng = rand::thread_rng();

    let mut id = 0;
    loop {
        if id % 7 == 6 {
            // trigger time based
            tokio::time::sleep(Duration::from_millis(BATCH_TIME * 2)).await;
        }
        if id % 11 == 10 {
            // overload batch builder
            for _ in 0..(BATCH_SIZE * 2) {
                txs.send(Tx { id }).await.unwrap();
                id += 1;
            }
        } else {
            // regular
            let time = Duration::from_millis(between.sample(&mut rng));
            tokio::time::sleep(time).await;
            txs.send(Tx { id }).await.unwrap();
            id += 1;
        }
    }
}

#[tokio::main]
async fn main() {
    let (sender, receiver) = mpsc::channel(100);
    tokio::join!(txqueue(receiver), create_txs(sender));
}
```

The sample above will:
It shows how to collect all the handles, but I have not looked into cancellation. The code is just a sample: it runs and it shows the concepts, but it is not prod code (so things like names, grouping of functions, or even the types were not really the focus).
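One possible way to handle cancellation on top of the collected handles; a sketch, assuming the `FuturesUnordered<JoinHandle<_>>` from the simulation above (aborted tasks resolve with a `JoinError` whose `is_cancelled()` returns true):

```rust
use futures::stream::FuturesUnordered;
use tokio::task::JoinHandle;

/// Abort every in-flight batch task, e.g. on shutdown or when the
/// proving work is no longer needed.
fn cancel_inflight<T>(inflight: &FuturesUnordered<JoinHandle<T>>) {
    for handle in inflight.iter() {
        handle.abort();
    }
}
```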
The mempool rework supersedes this.
The batch builder is currently two pieces:

- the `BatchBuilder` trait
- the `DefaultBatchBuilder` implementation

The trait has a single method, which is called by `TransactionQueue`. The current implementation pushes transaction batches on regular intervals.

Things to discuss and improve on the current design:

- `BatchBuilder` is an abstraction over the cluster, or a single machine in said cluster.
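A minimal sketch of the trait shape described above, assuming the signature quoted earlier in the thread (the actual repository code may differ):

```rust
// Placeholder types standing in for the real ones.
pub struct SharedProvenTx;
pub struct BuildBatchError;

#[async_trait::async_trait]
pub trait BatchBuilder: Send + Sync {
    /// Builds (and proves) a batch out of the given transactions.
    async fn build_batch(&self, txs: Vec<SharedProvenTx>) -> Result<(), BuildBatchError>;
}
```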