Rewrite Task of listening Stellar Messages #545
Conversation
clients/service/src/lib.rs
Outdated
where
    F: Future<Output = Result<(), E>>,
{
    if let Some(mut precheck_signal) = precheck_signal {
        if let Err(e) = precheck_signal.recv().await {
start the thread ONLY if precheck is finished.
Should we add a form of timeout here so that it's easier to find tasks that never start because the precheck doesn't resolve?
I forgot to reply to this. I did not use a timeout, but try_recv(); if it's not ready, the loop continues. The loop only ends on success OR when an error happens.
@@ -35,13 +35,13 @@ impl StellarOverlayConfig {

    #[allow(dead_code)]
    pub(crate) fn node_info(&self) -> NodeInfo {
-        self.node_info.clone().into()
+        NodeInfo::new(&self.node_info)
reduce clones
@@ -340,7 +340,7 @@ mod test {
        cert: new_auth_cert,
        nonce: [0; 32],
    };
-    connector.set_remote(RemoteInfo::new(&hello));
+    connector.set_remote(RemoteInfo::new(hello));
Consume hello
-    Ok(None) => continue,
-    Ok(Some(xdr)) => return Ok(xdr),
+    Ok(false) => continue,
+    Ok(true) => return Ok(readbuf),
Instead of returning a cloned readbuf, reuse it.
@@ -180,7 +183,7 @@ async fn read_message(

    // only when the message has the exact expected size bytes, should we send to user.
    if actual_msg_len == xpect_msg_len {
        return Ok(Some(readbuf.clone()))
Instead of sending back a cloned value, reuse the readbuf.
@@ -19,6 +19,19 @@ pub struct NodeInfo {
    pub network_id: NetworkId,
}

+impl NodeInfo {
+    pub(crate) fn new(cfg: &NodeInfoCfg) -> Self {
Eliminates a full clone of NodeInfoCfg. We only need to clone one field, the version_str.
@@ -36,11 +36,11 @@ impl Debug for RemoteInfo {
}

impl RemoteInfo {
-    pub fn new(hello: &Hello) -> Self {
+    pub fn new(hello: Hello) -> Self {
reduce clone by consuming Hello
@@ -65,8 +73,7 @@ impl StellarOverlayConnection {
        return Err(Error::Disconnected)
    }

-    timeout(Duration::from_secs(1), self.receiver.recv()).await
-        .map_err(|_| Error::Timeout)
+    Ok(self.receiver.recv().await)
Remove extra timeout. Timeouts can be triggered from inside the polling.
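A hedged sketch of what that means for callers (helper name and types are illustrative, not the PR's API): a deadline can be applied by whoever polls, instead of being hard-coded inside the connection.

```rust
use std::time::Duration;
use tokio::{sync::mpsc, time::timeout};

// Wrap a receive in a deadline only where the polling loop actually wants one.
async fn recv_with_deadline<T>(rx: &mut mpsc::Receiver<T>, secs: u64) -> Option<T> {
    match timeout(Duration::from_secs(secs), rx.recv()).await {
        Ok(msg) => msg,        // Some(message), or None if the channel is closed
        Err(_elapsed) => None, // deadline hit; the caller decides how to react
    }
}
```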
    F::Output: Send + 'static,
{
    cfg_if::cfg_if! {
        if #[cfg(all(tokio_unstable, feature = "allow-debugger"))] {
Allow naming tasks if the allow-debugger feature is on.
Do we need to add this check? Or can we just always use the builder pattern, and in case the allow-debugger feature is not enabled, the name just wouldn't show?
The builder only works with tokio_unstable, unfortunately.
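A minimal sketch of such a cfg-gated helper, in the spirit of the tokio_spawn function this PR adds (the exact body may differ):

```rust
use std::future::Future;
use tokio::task::JoinHandle;

// Spawn a task, attaching a name only when tokio_unstable + allow-debugger are enabled,
// so the name shows up in tokio-console; otherwise fall back to a plain tokio::spawn.
pub fn tokio_spawn<F>(name: &str, future: F) -> JoinHandle<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    cfg_if::cfg_if! {
        if #[cfg(all(tokio_unstable, feature = "allow-debugger"))] {
            tokio::task::Builder::new()
                .name(name)
                .spawn(future)
                .expect("spawning a named task should not fail")
        } else {
            let _ = name; // the name is only used by the debugger build
            tokio::spawn(future)
        }
    }
}
```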
    pub is_public_network: bool,
    /// sends message directly to Stellar Node
    message_sender: Option<StellarMessageSender>,
    /// sends an entire Vault shutdown
    shutdown_sender: ShutdownSender,
    /// sends a 'stop' signal to `StellarOverlayConnection` poll
    overlay_conn_end_signal: mpsc::Sender<()>,
We do not need this signal anymore; OverlayConnection already implements the Drop trait.
    /// Start the connection to the Stellar Node.
    /// Returns an `OracleAgent` that will handle incoming messages from Stellar Node,
    /// and to send messages to Stellar Node
    pub async fn start_oracle_agent(
Renamed to listen_for_stellar_messages(...), which returns nothing. This is an added task to be monitored via tokio_metrics.
clients/vault/src/oracle/agent.rs
Outdated
        },
    }},
    // log a new message received, every 1 minute.
    let interval = Duration::from_secs(60);
This is intentional, to keep logging every minute to show whether the vault is stuck or not.
Let's move the 60 to an extra constant variable instead of using a magic number. And let's increase it to something like 10 minutes. Otherwise the logs will get bloated quickly just by this health check.
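A small sketch of that suggestion; the constant name and the 10-minute value are illustrative:

```rust
use std::time::Duration;

/// How often to log that new Stellar messages are still being received (health check).
const NEW_MESSAGE_LOG_INTERVAL: Duration = Duration::from_secs(10 * 60);

// ...later, instead of the magic number:
// let interval = NEW_MESSAGE_LOG_INTERVAL;
```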
clients/vault/src/oracle/agent.rs
Outdated
    let sender_clone = overlay_conn.sender();
    tokio::spawn(async move {
        loop {
            tokio::select! {
Removed the tokio::select, since this task will close based on run_and_monitor_task().
    }

    /// Stops listening for new SCP messages.
    pub async fn shutdown(&self) {
This is unnecessary already; the Drop trait will handle dropping connections.
…r<oracle::errors::Error>>` is not satisfied --> clients/vault/src/system.rs:820:4
…893/job/28704943452?pr=545

OracleAgent is now `Arc<RwLock<>>`, since the message_sender is updated late in the code, inside `listen_for_stellar_messages`.
    Ok(())
}

#[cfg(any(test, feature = "integration"))]
pub async fn start_oracle_agent(
This is only for testing purposes
clients/vault/src/oracle/agent.rs
Outdated
"handle_message(): First slot saved: {slot}. Ready to build proofs " | ||
); | ||
} | ||
*is_proof_building_ready = None; |
proof building is ready; we do not need this flag/signal anymore.
@@ -16,16 +16,6 @@ use crate::oracle::{
    ScpArchiveStorage, ScpMessageCollector, TransactionsArchiveStorage,
};

-/// Returns true if the SCP messages for a given slot are still recoverable from the overlay
-/// because the slot is not too far back.
-fn check_slot_still_recoverable_from_overlay(last_slot_index: Slot, slot: Slot) -> bool {
Moved this to the bottom of the file.
    ///
    /// * `slot` - the slot where the txset is to get.
    /// * `sender` - used to send messages to Stellar Node
    pub async fn build_proof(&self, slot: Slot, sender: &StellarMessageSender) -> Option<Proof> {
Moved this to the top of the file.
    // check if both the config file and the wallet are the same.
    if is_public_network != stellar_overlay_cfg.is_public_network() {
        return Err(ServiceError::IncompatibleNetwork);
    }

    let oracle_agent = crate::oracle::start_oracle_agent(
We are now starting the StellarRelay inside the monitored tasks, NOT here.
@@ -18,6 +18,8 @@ runs:
        token: ${{ inputs.token }}

+    - name: Install Rust Nightly
+      uses: ./.github/actions/install-rust-nightly
This GitHub action was using the deprecated set-output:
The `set-output` command is deprecated and will be disabled soon. Please upgrade to using Environment Files.
Replaced it with dtolnay's action.
This is a great improvement, thanks a lot @b-yap! We need to adjust some of the log levels because some messages are a bit spammy for 'regular' vault operators.
Some experiments
I ran the amplitude USDC #1 vault locally to check the resource consumption with tokio-console.
tokio-console after ~1.5 h (run #1)
CPU usage after ~1.5 h: 84% is quite a lot, almost a full single core of my system consumed by the vault.
After a while it seemed like tokio-console was not updating properly anymore, so I was curious whether it had stopped working and I tried restarting it, but it is not able to connect again. I tested whether the vault is still operational by creating fresh issue and redeem requests against it and indeed, it was able to pick them up and complete them as expected. So maybe it's just some bug with tokio-console that makes it unable to connect after a while.
Proof that it's still able to process issue and redeem requests even though tokio-console shows blank:
To check the CPU usage, I then ran the vault a second time.
We can see that in the beginning (before having received/processed any issue/redeem request) the % CPU is at ~2-3%.
I then ran the spacewalk-testing-service, which created first one issue request and then one redeem request.
This increased the % CPU to ~5%.
I kept monitoring this for about 10 minutes and from then on, the % CPU fluctuated between 4-10%. It's hard to say for sure, but it might suggest that the resource consumption increases after an issue or redeem request is processed but stays more or less constant before.
After 20 minutes it stayed almost always above 10%.
tokio-console after 20 minutes:
I'll try to experiment with this again tomorrow. I still don't understand how the CPU consumption constantly increases over time. But at least we don't have any deadlocked tasks anymore and every task apparently properly yields, which is a great improvement over what we had before.
        send_to_user_sender,
        send_to_node_receiver,
    ))
    .unwrap();
Nit: let's expect() here.
    /// Insert envelopes fetched from the archive to the map
    ///
    /// # Arguments
    ///
    /// * `envelopes_map_lock` - the map to insert the envelopes to.
    /// * `slot` - the slot where the envelopes belong to
    fn get_envelopes_from_horizon_archive(&self, slot: Slot) -> impl Future<Output = ()> {
-        tracing::debug!("get_envelopes_from_horizon_archive(): Fetching SCP envelopes from horizon archive for slot {slot}...");
+        tracing::info!("get_envelopes_from_horizon_archive(): Fetching SCP envelopes from horizon archive for slot {slot}...");
tracing::info!("get_envelopes_from_horizon_archive(): Fetching SCP envelopes from horizon archive for slot {slot}..."); | |
tracing::debug!("get_envelopes_from_horizon_archive(): Fetching SCP envelopes from horizon archive for slot {slot}..."); |
@@ -158,11 +171,12 @@ impl ScpMessageCollector {
    match tx_set {
        Some(res) => Some(res),
        None => {
+            tracing::info!("get_txset(): FOR SLOT {slot} check_slot_still_recoverable_from_overlay: LAST SLOT INDEX: {}", self.last_slot_index());
tracing::info!("get_txset(): FOR SLOT {slot} check_slot_still_recoverable_from_overlay: LAST SLOT INDEX: {}",self.last_slot_index()); | |
tracing::debug!("get_txset(): FOR SLOT {slot} check_slot_still_recoverable_from_overlay: LAST SLOT INDEX: {}",self.last_slot_index()); |
            return None;
        }

    /// fetch envelopes not found in the collector
    async fn _get_envelopes(&self, slot: Slot, sender: &StellarMessageSender) {
        tracing::info!("_get_envelopes(): FOR SLOT {slot} check_slot_still_recoverable_from_overlay: LAST SLOT INDEX: {}", self.last_slot_index());
tracing::info!("_get_envelopes(): FOR SLOT {slot} check_slot_still_recoverable_from_overlay: LAST SLOT INDEX: {}",self.last_slot_index()); | |
tracing::debug!("_get_envelopes(): FOR SLOT {slot} check_slot_still_recoverable_from_overlay: LAST SLOT INDEX: {}",self.last_slot_index()); |
clients/vault/src/oracle/agent.rs
Outdated
@@ -156,19 +206,20 @@ impl OracleAgent {

    timeout(Duration::from_secs(timeout_seconds), async move {
        loop {
            tracing::info!("get_proof(): attempt to build proof for slot {slot}");
tracing::info!("get_proof(): attempt to build proof for slot {slot}"); | |
tracing::debug!("get_proof(): attempt to build proof for slot {slot}"); |
    } else {
-        tokio::spawn(self.get_txset_from_horizon_archive(slot));
+        self.get_txset_from_horizon_archive(slot).await;
So we are now doing await on this vs spawning a new task as before. If I understand correctly, this means that build_proof will now wait here and the loop won't continue and call build_proof again. Why don't we need this behavior anymore now?
Before tokio-console: when the vaults hang, the last logs are in the build_proof section, getting data from the archive.
I wanted to eliminate as many unnecessary tasks as possible, especially since these are not monitored at all. Should the original tokio::spawn(self.get_txset_from_horizon_archive(slot)); become a zombie, there is nothing to stop that task.
The time spent looping build_proof continuously while the map is empty VS the time spent waiting for the archive to fill the map is just the same.
I see! Makes sense, in fact it makes much more sense since get_txset_from_horizon_archive will return when the tx_set was found. But before, we were potentially spawning many instances of get_txset_from_horizon_archive for the same slot; I thought maybe we needed several of these tasks, or that we needed to continue and return a proof with None set.
Sorry for leaving more comments yet again. I pointed out more logs that I think we should turn to the debug log level instead.
Besides that, I encountered an error when running the vault locally. It happened on start:
thread 'tokio-runtime-worker' panicked at clients/runtime/src/rpc.rs:296:49:
called `Result::unwrap()` on an `Err` value: Rpc(ClientError(MaxSlotsExceeded))
It's unrelated to the changes of this PR, but @b-yap, if it's not too complex, maybe you could try to replace all the unwrap() calls in clients/runtime/src/rpc.rs as part of this as well? 😬
I also want to share good news though. Further experiments I did locally showed that the CPU usage only piles up when running the vault with --features allow-debugger, i.e. with support for debugging with tokio-console. Without that flag, the client's resource consumption remains quite low at ~1-4% when idling. Seems like the refactoring indeed fixed our bugs with tokio, and we can be optimistic that these changes greatly improve the performance once live on production (without the debugging enabled).
clients/vault/src/oracle/agent.rs
Outdated
    tracing::info!(
        "listen_for_stellar_messages(): received hello message from Stellar"
    );
    is_proof_building_ready = Some(true);
I'm not sure I understand why we need this variable? Also, with Option it can now be three things: None, Some(true) and Some(false). As far as I can tell, we only check for Some(true) and never for None, so it should be fine if we don't make it an Option type but a simple boolean, as we apparently only need the two cases.
    payment_margin: Duration,
    precheck_signal: tokio::sync::broadcast::Sender<()>,
Nit: this is missing in the doc comment above.
clients/vault/src/issue.rs
Outdated
@@ -26,6 +26,7 @@ pub(crate) async fn initialize_issue_set(
    issue_set: &ArcRwLock<IssueRequestsMap>,
    memos_to_issue_ids: &ArcRwLock<IssueIdLookup>,
) -> Result<(), Error> {
+    tracing::info!("initialize_issue_set(): started");
Nit: make this and the other similar logs debug
clients/vault/src/replace.rs
Outdated
@@ -96,6 +96,8 @@ pub async fn listen_for_replace_requests(
    event_channel: Sender<Event>,
    accept_replace_requests: bool,
) -> Result<(), ServiceError<Error>> {
+    tracing::info!("listen_for_replace_requests(): started");
tracing::info!("listen_for_replace_requests(): started"); | |
tracing::debug!("listen_for_replace_requests(): started"); |
@@ -1163,17 +1158,23 @@ async fn test_execute_open_requests_succeeds() {
    // add it to the set
    sleep(Duration::from_secs(5)).await;

+    let (precheck_signal, mut rceiver) = tokio::sync::broadcast::channel(1);
Suggested change:
-    let (precheck_signal, mut rceiver) = tokio::sync::broadcast::channel(1);
+    let (precheck_signal, mut receiver) = tokio::sync::broadcast::channel(1);
    Err(TryRecvError::Empty) =>
        tracing::trace!("wait_or_shutdown precheck signal: waiting..."),
I think this is problematic because we are again creating tasks that never yield, potentially causing issues for the scheduling of other tasks.
Suggested change:
-    Err(TryRecvError::Empty) =>
-        tracing::trace!("wait_or_shutdown precheck signal: waiting..."),
+    Err(TryRecvError::Empty) => {
+        tracing::trace!("wait_or_shutdown precheck signal: waiting...");
+        sleep(Duration::from_millis(100)).await; // Yield to the runtime
+    },
Or we can use precheck_signal.recv().await? This one is async.
await will not yield; it will potentially wait forever. I've yielded instead of the sleep.
I think if the receiver precheck_signal has already consumed the message, then the next time we call recv().await, it will yield until a new one arrives. Or do you think there will always be a message in the broadcast channel?
This implementation is equivalent though. But with try_recv we do need to yield, that's true.
> do you think there will always be a message in the broadcast channel?

It will only send 1 () message. See lines 335-337 in clients/vault/src/requests/execution.rs.
The first try was to use recv().await, but the possibility of it waiting forever is there.
However, we also cannot move forward with the task anyway if the precheck (or the execute_open_requests) is not finished.
I also would not like to wrap the recv() in a timeout(), because open requests might really take longer than the timeout we set. Also, a timeout is another thread; that's 8 tasks requiring prechecks, meaning 8 additional threads. (See Marcel's comment for the image of these tasks: Issue Cancel Listener, Issue Request Listener, etc.)
For readability, reverting to recv().await would be better. I would add that back, but we have to risk it awaiting. I think execute_open_requests() is reliable enough to be able to send the message fast.
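For reference, a minimal sketch of that recv().await variant (assuming the same broadcast precheck channel; not the exact PR code):

```rust
use tokio::sync::broadcast;

// Await the single `()` precheck message before starting the guarded task.
// recv() yields to the runtime while waiting and errors if the channel closes.
async fn wait_for_precheck(
    mut precheck_signal: broadcast::Receiver<()>,
) -> Result<(), broadcast::error::RecvError> {
    precheck_signal.recv().await
}
```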
Super nice (and involved😅) refactor 💪! And I'm glad it solved the hanging and apparently also the high CPU usage issue. Thanks @b-yap for going back to my questions and comments.
Great that you found a way to get rid of the ArcRwLock on the oracle agent, it improves the readability a lot 🙏
clients/runtime/src/rpc.rs
Outdated
    async fn get_latest_storage(
        &self,
    ) -> Storage<SpacewalkRuntime, OnlineClient<SpacewalkRuntime>> {
        self.api.storage().at_latest().await.expect("failed to get latest storage")
Can we map this to an error instead of expect-ing, or would that require a lot of refactoring?
I think we unfortunately cannot expect this to never fail, so it would be very good if we can handle this error gracefully.
wrapped it inside Error::SubxtRuntimeError()
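A hedged sketch of that change; the variant name follows the comment above, while RpcError stands in for the crate's own error type and the subxt calls mirror the quoted snippet:

```rust
use subxt::{storage::Storage, OnlineClient, PolkadotConfig};

#[derive(Debug)]
enum RpcError {
    SubxtRuntimeError(subxt::Error),
}

// Propagate the failure instead of panicking with expect()/unwrap().
async fn latest_storage(
    api: &OnlineClient<PolkadotConfig>,
) -> Result<Storage<PolkadotConfig, OnlineClient<PolkadotConfig>>, RpcError> {
    api.storage().at_latest().await.map_err(RpcError::SubxtRuntimeError)
}
```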
…job/31813445329?pr=545
@ebma @gianfra-t CI passed. Merging this.
Summary
We need to fix the vaults from getting unknowingly stuck, potentially caused by running unhandled zombie tasks.
One idea is to include the polling of the stellar messages:
spacewalk/clients/vault/src/oracle/agent.rs
Lines 100 to 104 in 7c79898
in the monitoring:
spacewalk/clients/vault/src/system.rs
Lines 806 to 817 in 7c79898
This means updating the OracleAgent's message_sender is delayed; passing the OracleAgent to tasks must be mutable, hence using Arc<RwLock<>> instead of Arc<> alone.
But we cannot have these current tasks STARTING TOGETHER WITH the polling task. The Stellar-overlay has to run already, and all open requests MUST finish first.
spacewalk/clients/vault/src/system.rs
Lines 787 to 790 in 7c79898
An idea is to introduce another variant of the ServiceTask, where it waits for something to finish before a task starts. Prechecking will be required (see the sketch below).
And to make sure the stellar-overlay and the client are communicating well, the stellar-overlay will also send the client the hello message - useful to signal the client to prepare itself.
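A hedged sketch of that ServiceTask variant; the enum shape and types here are illustrative only and may differ from the PR:

```rust
use std::{future::Future, pin::Pin};
use tokio::sync::broadcast;

type BoxedTask = Pin<Box<dyn Future<Output = ()> + Send>>;

// Two kinds of monitored tasks: ones that start right away, and ones that must
// wait for a `()` precheck signal (e.g. execute_open_requests finishing) first.
enum ServiceTask {
    Essential(BoxedTask),
    PrecheckRequired(BoxedTask),
}

async fn run(task: ServiceTask, mut precheck: broadcast::Receiver<()>) {
    match task {
        ServiceTask::Essential(fut) => fut.await,
        ServiceTask::PrecheckRequired(fut) => {
            // only start the inner task once the precheck has finished
            if precheck.recv().await.is_ok() {
                fut.await;
            }
        },
    }
}
```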
How to start reviewing:
* stellar-relay-lib
  * changed (&param) to (param)
  * the message_reader.rs file
* vault
  * removed the err-derive dependency, and just use the existing thiserror dependency.
  * tokio_spawn for naming tasks. This is only viable for tokio_unstable.
  * OracleAgent adjustment
    * instead of Arc<OracleAgent>, it will be Arc<RwLock<OracleAgent>>
    * message_sender is moved a bit later in the code; hence we need OracleAgent to be mutable.
  * ServiceTask; the PrecheckRequired
The following tasks need this precheck:
*
issue::listen_for_issue_requests
*
issue::listen_for_issue_cancels
*
issue::listen_for_executed_issues
*
CancellationScheduler
for Issue*
CancellationScheduler
for Replace*
listen_for_replace_requests
*
listen_for_accept_replace
*
active_block_listener
I will add relevant comments when necessary.