Skip to content

Commit

Permalink
protocol: add a timer for operation annoucement
Browse files Browse the repository at this point in the history
  • Loading branch information
gterzian committed Oct 3, 2022
1 parent b2930ab commit 28cac34
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 5 deletions.
2 changes: 2 additions & 0 deletions massa-node/base_config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@
operation_batch_proc_period = 500
# All operations asked are prune each `operation_asked_pruning_period` millisecond
asked_operations_pruning_period = 100000
# Interval at which operations are announced in batches.
operation_announcement_interval = 500
# Max number of operation per message, same as network param but can be smaller
max_operations_per_message = 1024
# Time threshold after which operation are not propagated
Expand Down
1 change: 1 addition & 0 deletions massa-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ async fn launch(
operation_batch_buffer_capacity: SETTINGS.protocol.operation_batch_buffer_capacity,
operation_batch_proc_period: SETTINGS.protocol.operation_batch_proc_period,
asked_operations_pruning_period: SETTINGS.protocol.asked_operations_pruning_period,
operation_announcement_interval: SETTINGS.protocol.operation_announcement_interval,
max_operations_per_message: SETTINGS.protocol.max_operations_per_message,
max_serialized_operations_size_per_block: MAX_BLOCK_SIZE as usize,
controller_channel_size: PROTOCOL_CONTROLLER_CHANNEL_SIZE,
Expand Down
2 changes: 2 additions & 0 deletions massa-node/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ pub struct ProtocolSettings {
pub operation_batch_proc_period: MassaTime,
/// All operations asked are prune each `operation_asked_pruning_period` millisecond
pub asked_operations_pruning_period: MassaTime,
/// Interval at which operations are announced in batches.
pub operation_announcement_interval: MassaTime,
/// Maximum of operations sent in one message.
pub max_operations_per_message: u64,
/// Time threshold after which operation are not propagated
Expand Down
2 changes: 2 additions & 0 deletions massa-protocol-exports/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ pub struct ProtocolConfig {
pub operation_batch_proc_period: MassaTime,
/// All operations asked are prune each `operation_asked_pruning_period` millisecond
pub asked_operations_pruning_period: MassaTime,
/// Interval at which operations are announced in batches.
pub operation_announcement_interval: MassaTime,
/// Maximum of operations sent in one message.
pub max_operations_per_message: u64,
/// Maximum size in bytes of all serialized operations size in a block
Expand Down
1 change: 1 addition & 0 deletions massa-protocol-exports/src/tests/tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ pub fn create_protocol_config() -> ProtocolConfig {
operation_batch_buffer_capacity: 1000,
operation_batch_proc_period: 200.into(),
asked_operations_pruning_period: 500.into(),
operation_announcement_interval: 500.into(),
max_operations_per_message: 1024,
thread_count: 32,
max_serialized_operations_size_per_block: 1024,
Expand Down
23 changes: 18 additions & 5 deletions massa-protocol-worker/src/protocol_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl ProtocolWorker {
config.operation_batch_buffer_capacity,
),
storage,
operations_to_announce: Default::default(),
operations_to_announce: Vec::with_capacity(config.operation_batch_buffer_capacity),
}
}

Expand Down Expand Up @@ -258,6 +258,9 @@ impl ProtocolWorker {
let operation_batch_proc_period_timer =
sleep(self.config.operation_batch_proc_period.into());
tokio::pin!(operation_batch_proc_period_timer);
let operation_announcement_interval =
sleep(self.config.operation_announcement_interval.into());
tokio::pin!(operation_announcement_interval);
loop {
massa_trace!("protocol.protocol_worker.run_loop.begin", {});
/*
Expand Down Expand Up @@ -296,13 +299,23 @@ impl ProtocolWorker {
self.update_ask_block(&mut block_ask_timer).await?;
}

// Operation announcement interval.
_ = &mut operation_announcement_interval => {
// Announce operations.
self.announce_ops().await;

// Reset timer.
let now = Instant::now();
let next_tick = now
.checked_add(self.config.operation_announcement_interval.into())
.ok_or(TimeError::TimeOverflowError)?;
operation_announcement_interval.set(sleep_until(next_tick));
}

// operation ask, and announce, timer
_ = &mut operation_batch_proc_period_timer => {
massa_trace!("protocol.protocol_worker.run_loop.operation_ask_and_announce_timer", { });

// Announce operations.
self.announce_ops().await;

// Update operations to ask.
self.update_ask_operation(&mut operation_batch_proc_period_timer).await?;
}
Expand Down Expand Up @@ -359,7 +372,7 @@ impl ProtocolWorker {
// If the buffer is full,
// announce operations immediately,
// clearing the data at the same time.
if self.operations_to_announce.len() > self.config.max_known_ops_size {
if self.operations_to_announce.len() > self.config.operation_batch_buffer_capacity {
self.announce_ops().await;
}
}
Expand Down

0 comments on commit 28cac34

Please sign in to comment.