-
Notifications
You must be signed in to change notification settings - Fork 7
/
execution.rs
330 lines (304 loc) · 9.99 KB
/
execution.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
use crate::{
error::Error,
oracle::OracleAgent,
requests::{
helper::{
get_all_transactions_of_wallet_async, get_request_for_stellar_tx,
retrieve_open_redeem_replace_requests_async, PayAndExecuteExt,
},
structs::Request,
PayAndExecute,
},
VaultIdManager, YIELD_RATE,
};
use async_trait::async_trait;
use governor::{
clock::{Clock, ReasonablyRealtime},
middleware::RateLimitingMiddleware,
state::{DirectStateStore, NotKeyed},
NotUntil, RateLimiter,
};
use primitives::{derive_shortened_request_id, stellar::TransactionEnvelope, TextMemo};
use runtime::{PrettyPrint, ShutdownSender, SpacewalkParachain, UtilFuncs};
use service::{spawn_cancelable, Error as ServiceError};
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::RwLock;
use wallet::{Slot, StellarWallet, TransactionResponse};
// Upper bound on the number of attempts `execute_open_request_async` makes for one request.
// Only a failed proof retrieval counts as an attempt that may be repeated; a failed
// execution on the parachain ends the loop immediately.
const MAX_EXECUTION_RETRIES: u32 = 3;
/// Spawns a cancelable task for each open request that already has a matching Stellar
/// transaction. The task performs the `execute` function of the request.
///
/// Every request for which a task was spawned is removed from `requests`, so the entries
/// that remain after this function returns had no matching transaction in the wallet.
/// Nothing happens if the wallet's transactions cannot be retrieved.
///
/// # Arguments
///
/// * `requests` - all open/pending requests, keyed by memo; matched entries are removed
/// * `wallet` - the vault's wallet; used to retrieve a list of stellar transactions
/// * `shutdown_tx` - for sending and receiving shutdown signals
/// * `parachain_rpc` - the parachain RPC handle
/// * `oracle_agent` - the agent used to get the proofs
/// * `rate_limiter` - a rate limiter; controls how often the loop yields to the scheduler
async fn spawn_tasks_to_execute_open_requests_async<S, C, MW>(
    requests: &mut HashMap<TextMemo, Request>,
    wallet: Arc<RwLock<StellarWallet>>,
    shutdown_tx: ShutdownSender,
    parachain_rpc: &SpacewalkParachain,
    oracle_agent: Arc<OracleAgent>,
    rate_limiter: Arc<RateLimiter<NotKeyed, S, C, MW>>,
) where
    S: DirectStateStore,
    C: ReasonablyRealtime,
    MW: RateLimitingMiddleware<C::Instant, NegativeOutcome = NotUntil<C::Instant>>,
{
    let Some(mut tx_iter) = get_all_transactions_of_wallet_async(wallet).await else {
        return;
    };

    // Check if some of the open requests have a corresponding payment on Stellar
    // and are just waiting to be executed on the parachain
    while let Some(transaction) = tx_iter.next().await {
        if rate_limiter.check().is_ok() {
            // give the outer `select` a chance to check the shutdown signal
            tokio::task::yield_now().await;
        }

        // every request has been matched already; no need to look at more transactions
        if requests.is_empty() {
            break;
        }

        if let Some(request) = get_request_for_stellar_tx(&transaction, &requests) {
            let hash_as_memo = spawn_task_to_execute_open_request(
                request,
                transaction,
                shutdown_tx.clone(),
                parachain_rpc.clone(),
                oracle_agent.clone(),
            );

            // The memo is the map key, so a direct removal drops exactly the entry that
            // was just handed to a task, without scanning the whole map.
            requests.remove(&hash_as_memo);
        }
    }
}
/// Starts a cancelable task that executes a single open request on the parachain and
/// returns the memo of that request.
///
/// If the transaction cannot be decoded into an envelope, the failure is logged and no task
/// is started; the memo is returned in either case.
///
/// # Arguments
///
/// * `request` - the open/pending request
/// * `transaction` - the transaction that the request is based from
/// * `shutdown_tx` - for sending and receiving shutdown signals
/// * `parachain_rpc` - the parachain RPC handle
/// * `oracle_agent` - the agent used to get the proofs
fn spawn_task_to_execute_open_request(
    request: Request,
    transaction: TransactionResponse,
    shutdown_tx: ShutdownSender,
    parachain_rpc: SpacewalkParachain,
    oracle_agent: Arc<OracleAgent>,
) -> TextMemo {
    let memo = derive_shortened_request_id(&request.hash_inner());

    tracing::info!(
        "Processing valid Stellar payment for open {:?} request #{}: ",
        request.request_type(),
        request.hash()
    );

    // Without a decodable envelope there is nothing to submit; report it and stop here.
    let tx_envelope = match transaction.to_envelope() {
        Ok(envelope) => envelope,
        Err(e) => {
            tracing::error!(
                "Failed to decode transaction envelope for {:?} request #{}: {e:?}",
                request.request_type(),
                request.hash()
            );
            return memo;
        },
    };

    // start a new task to execute on the parachain
    let shutdown_rx = shutdown_tx.subscribe();
    let execution = execute_open_request_async(
        request,
        tx_envelope,
        transaction.ledger as Slot,
        parachain_rpc,
        oracle_agent,
    );
    spawn_cancelable(shutdown_rx, execution);

    memo
}
/// Executes the open request based on the transaction envelope and the proof.
/// The proof is obtained using the slot.
///
/// # Arguments
///
/// * `request` - the open request
/// * `tx_envelope` - the transaction envelope that the request is based from
/// * `slot` - the ledger number of the transaction envelope
/// * `parachain_rpc` - the parachain RPC handle
/// * `oracle_agent` - the agent used to get the proofs
async fn execute_open_request_async(
request: Request,
tx_envelope: TransactionEnvelope,
slot: Slot,
parachain_rpc: SpacewalkParachain,
oracle_agent: Arc<OracleAgent>,
) {
let mut retry_count = 0; // A counter for every execution retry
while retry_count < MAX_EXECUTION_RETRIES {
if retry_count > 0 {
tracing::info!("Performing retry #{retry_count} out of {MAX_EXECUTION_RETRIES} retries for {:?} request #{}",request.request_type(),request.hash());
}
match oracle_agent.get_proof(slot).await {
Ok(proof) => {
let Err(e) =
request.execute(parachain_rpc.clone(), tx_envelope.clone(), proof).await
else {
tracing::info!(
"Successfully executed {:?} request #{}",
request.request_type(),
request.hash()
);
break; // There is no need to retry again, so exit from while loop
};
tracing::error!(
"Failed to execute {:?} request #{} because of error: {e:?}",
request.request_type(),
request.hash()
);
break; // There is also no need to retry on an unrecoverable error.
},
Err(error) => {
retry_count += 1; // increase retry count
tracing::error!("Failed to get proof for slot {slot} for {:?} request #{:?} due to error: {error:?}",
request.request_type(),
request.hash(),
);
},
}
}
if retry_count >= MAX_EXECUTION_RETRIES {
tracing::error!("Exceeded max number of retries ({MAX_EXECUTION_RETRIES}) to execute {:?} request #{:?}. Giving up...",
request.request_type(),
request.hash(),
);
}
}
#[async_trait]
impl<S, C, MW> PayAndExecuteExt<RateLimiter<NotKeyed, S, C, MW>> for PayAndExecute
where
S: DirectStateStore + Send + Sync + 'static,
C: ReasonablyRealtime + Send + Sync + 'static,
MW: RateLimitingMiddleware<C::Instant, NegativeOutcome = NotUntil<C::Instant>>
+ Send
+ Sync
+ 'static,
<MW as RateLimitingMiddleware<<C as Clock>::Instant>>::PositiveOutcome: Send,
{
fn spawn_tasks_to_pay_and_execute_open_requests(
requests: HashMap<TextMemo, Request>,
vault_id_manager: VaultIdManager,
shutdown_tx: ShutdownSender,
parachain_rpc: &SpacewalkParachain,
oracle_agent: Arc<OracleAgent>,
rate_limiter: Arc<RateLimiter<NotKeyed, S, C, MW>>,
) {
for (_, request) in requests {
// there are potentially a large number of open requests - pay and execute each
// in a separate task to ensure that awaiting confirmations does not significantly
// delay other requests
// make copies of the variables we move into the task
spawn_cancelable(
shutdown_tx.subscribe(),
Self::pay_and_execute_open_request_async(
request,
vault_id_manager.clone(),
parachain_rpc.clone(),
oracle_agent.clone(),
rate_limiter.clone(),
),
);
}
}
async fn pay_and_execute_open_request_async(
request: Request,
vault_id_manager: VaultIdManager,
parachain_rpc: SpacewalkParachain,
oracle_agent: Arc<OracleAgent>,
rate_limiter: Arc<RateLimiter<NotKeyed, S, C, MW>>,
) {
let Some(vault) = vault_id_manager.get_vault(request.vault_id()).await else {
tracing::error!(
"Couldn't process open {:?} request #{:?}: Failed to fetch vault data for vault {}",
request.request_type(),
request.hash(),
request.vault_id().pretty_print()
);
return; // nothing we can do - bail
};
// We rate limit the number of transactions we pay and execute simultaneously because
// sending too many at once might cause the Stellar network to respond with a timeout
// error.
rate_limiter.until_ready().await;
match request.pay_and_execute(parachain_rpc, vault, oracle_agent).await {
Ok(_) => tracing::info!(
"Successfully executed open {:?} request #{:?}",
request.request_type(),
request.hash()
),
Err(e) => tracing::info!(
"Failed to process open {:?} request #{:?} due to error: {e}",
request.request_type(),
request.hash(),
),
}
}
}
/// Queries the parachain for open requests and executes them. It checks the
/// stellar blockchain to see if a payment has already been made.
///
/// Requests with an existing Stellar payment are only executed on the parachain; all
/// remaining requests are paid first and executed afterwards. The work is done in spawned
/// tasks, so this function returns once the tasks have been started.
///
/// # Arguments
///
/// * `shutdown_tx` - for sending and receiving shutdown signals
/// * `parachain_rpc` - the parachain RPC handle
/// * `vault_id_manager` - contains all the vault ids and their data.
/// * `wallet` - the vault's wallet; used to retrieve a list of stellar transactions
/// * `oracle_agent` - the agent used to get the proofs
/// * `payment_margin` - minimum time to the redeem execution deadline to make the stellar
///   payment.
///
/// # Errors
///
/// Returns an error if the open redeem and replace requests cannot be retrieved.
#[allow(clippy::too_many_arguments)]
pub async fn execute_open_requests(
    shutdown_tx: ShutdownSender,
    parachain_rpc: SpacewalkParachain,
    vault_id_manager: VaultIdManager,
    wallet: Arc<RwLock<StellarWallet>>,
    oracle_agent: Arc<OracleAgent>,
    payment_margin: Duration,
) -> Result<(), ServiceError<Error>> {
    let parachain_rpc_ref = &parachain_rpc;

    // get all redeem and replace requests
    let mut open_requests = retrieve_open_redeem_replace_requests_async(
        parachain_rpc_ref,
        parachain_rpc.get_account_id().clone(),
        payment_margin,
    )
    .await?;

    // the same limiter is shared by both phases below
    let rate_limiter = Arc::new(RateLimiter::direct(YIELD_RATE));

    // Check if the open requests have a corresponding payment on Stellar
    // and are just waiting to be executed on the parachain
    spawn_tasks_to_execute_open_requests_async(
        &mut open_requests,
        wallet,
        shutdown_tx.clone(),
        parachain_rpc_ref,
        oracle_agent.clone(),
        rate_limiter.clone(),
    )
    .await;

    // Remaining requests in the hashmap did not have a Stellar payment yet,
    // so pay and execute all of these
    PayAndExecute::spawn_tasks_to_pay_and_execute_open_requests(
        open_requests,
        vault_id_manager,
        shutdown_tx,
        parachain_rpc_ref,
        oracle_agent,
        rate_limiter,
    );

    Ok(())
}