Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make /settle call blocking and report tx_hash #1999

Merged
merged 7 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/autopilot/src/driver_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ pub mod reveal {
pub mod settle {
use {
model::bytes_hex,
primitive_types::H256,
serde::{Deserialize, Serialize},
serde_with::serde_as,
};
Expand All @@ -212,6 +213,7 @@ pub mod settle {
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct Response {
pub calldata: Calldata,
pub tx_hash: H256,
}

#[serde_as]
Expand Down
124 changes: 25 additions & 99 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,15 @@ use {
},
},
number::nonzero::U256 as NonZeroU256,
primitive_types::{H160, H256, U256},
primitive_types::{H160, U256},
rand::seq::SliceRandom,
shared::{
event_handling::MAX_REORG_BLOCK_COUNT,
remaining_amounts,
token_list::AutoUpdatingTokenList,
},
shared::{remaining_amounts, token_list::AutoUpdatingTokenList},
std::{
collections::{BTreeMap, HashSet},
sync::Arc,
time::{Duration, Instant},
},
tracing::Instrument,
web3::types::Transaction,
};

pub const SOLVE_TIME_LIMIT: Duration = Duration::from_secs(15);
Expand Down Expand Up @@ -290,7 +285,7 @@ impl RunLoop {
}

tracing::info!(driver = %driver.name, "settling");
match self.settle(driver, auction_id, solution, &revealed).await {
match self.settle(driver, solution, &revealed).await {
Ok(()) => Metrics::settle_ok(driver),
Err(err) => {
Metrics::settle_err(driver, &err);
Expand Down Expand Up @@ -413,7 +408,6 @@ impl RunLoop {
async fn settle(
&self,
driver: &Driver,
id: AuctionId,
solved: &Solution,
revealed: &reveal::Response,
) -> Result<(), SettleError> {
Expand All @@ -424,95 +418,33 @@ impl RunLoop {
.collect_vec();
self.database.store_order_events(&events).await;

driver
.settle(&settle::Request {
solution_id: solved.id,
})
.await
.map_err(SettleError::Failure)?;
let deadline = tokio::time::Instant::now() + self.max_settlement_transaction_wait;
let request = settle::Request {
solution_id: solved.id,
};
let settle = driver.settle(&request);

let tx_hash = match tokio::time::timeout_at(deadline, settle).await {
Ok(Ok(res)) => res.tx_hash,
Ok(Err(err)) => return Err(SettleError::Failure(err)),
Err(_) => {
return Err(SettleError::Failure(anyhow::anyhow!(
"submission deadline exceeded"
)))
}
};

// TODO: React to deadline expiring.
let transaction = self
.wait_for_settlement_transaction(id, solved.account)
.await?;
if let Some(tx) = transaction {
let events = revealed
.orders
.iter()
.map(|uid| (*uid, OrderEventLabel::Traded))
.collect_vec();
self.database.store_order_events(&events).await;
tracing::debug!("settled in tx {:?}", tx.hash);
} else {
tracing::warn!("could not find a mined transaction in time");
}
let events = revealed
.orders
.iter()
.map(|uid| (*uid, OrderEventLabel::Traded))
.collect_vec();
self.database.store_order_events(&events).await;
tracing::debug!(?tx_hash, "solution settled");

Ok(())
}

/// Tries to find a `settle` contract call with calldata ending in `tag`.
///
/// Returns None if no transaction was found within the deadline.
async fn wait_for_settlement_transaction(
&self,
id: AuctionId,
submission_address: H160,
) -> Result<Option<Transaction>, SettleError> {
// Start earlier than current block because there might be a delay when
// receiving the Solver's /execute response during which it already
// started broadcasting the tx.
let start_offset = MAX_REORG_BLOCK_COUNT;
let max_wait_time_blocks = (self.max_settlement_transaction_wait.as_secs_f32()
/ self.network_block_interval.as_secs_f32())
.ceil() as u64;
let current = self.current_block.borrow().number;
let start = current.saturating_sub(start_offset);
let deadline = current.saturating_add(max_wait_time_blocks);
tracing::debug!(
%current, %start, %deadline, ?id, ?submission_address,
"waiting for settlement",
);

// Use the existing event indexing infrastructure to find the transaction. We
// query all settlement events in the block range to get tx hashes and
// query the node for the full calldata.
//
// If the block range was large, we would make the query more efficient by
// moving the starting block up while taking reorgs into account. With
// the current range of 30 blocks this isn't necessary.
//
// We do keep track of hashes we have already seen to reduce load from the node.

let mut seen_transactions: HashSet<H256> = Default::default();
while self.current_block.borrow().number <= deadline {
let mut hashes = self
.database
.recent_settlement_tx_hashes(start..deadline + 1)
.await
.map_err(SettleError::Database)?;
hashes.retain(|hash| !seen_transactions.contains(hash));
for hash in hashes {
let Some(tx) = self
.web3
.eth()
.transaction(web3::types::TransactionId::Hash(hash))
.await
.map_err(|err| SettleError::TransactionFetch(hash, err))?
else {
continue;
};
if tx.input.0.ends_with(&id.to_be_bytes()) && tx.from == Some(submission_address) {
return Ok(Some(tx));
}
seen_transactions.insert(hash);
}
// It would be more correct to wait until just after the last event update run,
// but that is hard to synchronize.
tokio::time::sleep(self.network_block_interval.div_f32(2.)).await;
}
Ok(None)
}

/// Saves the competition data to the database
async fn save_competition(&self, competition: &Competition) -> Result<()> {
self.database.save_competition(competition).await
Expand Down Expand Up @@ -632,10 +564,6 @@ enum RevealError {

#[derive(Debug, thiserror::Error)]
enum SettleError {
#[error("unexpected database error: {0}")]
Database(anyhow::Error),
#[error("error fetching transaction receipts for {0:?}: {1}")]
TransactionFetch(H256, web3::Error),
#[error(transparent)]
Failure(anyhow::Error),
}
Expand Down Expand Up @@ -732,8 +660,6 @@ impl Metrics {

fn settle_err(driver: &Driver, err: &SettleError) {
let label = match err {
SettleError::Database(_) => "internal_error",
SettleError::TransactionFetch(..) => "tx_error",
SettleError::Failure(_) => "error",
};
Self::get()
Expand Down
2 changes: 1 addition & 1 deletion crates/driver/src/boundary/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl Mempool {
self.submitted_transactions.clone(),
web3.clone(),
&web3,
)?;
);
let receipt = submitter
.submit(
settlement.boundary.inner,
Expand Down
40 changes: 24 additions & 16 deletions crates/driver/src/domain/competition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
self::solution::settlement,
super::Mempools,
crate::{
domain::competition::solution::Settlement,
domain::{competition::solution::Settlement, eth},
infra::{
self,
blockchain::Ethereum,
Expand Down Expand Up @@ -208,21 +208,25 @@ impl Competition {
.unwrap()
.take()
.ok_or(Error::SolutionNotAvailable)?;
self.mempools.execute(&self.solver, &settlement);
Ok(Settled {
internalized_calldata: settlement
.calldata(
self.eth.contracts().settlement(),
settlement::Internalization::Enable,
)
.into(),
uninternalized_calldata: settlement
.calldata(
self.eth.contracts().settlement(),
settlement::Internalization::Disable,
)
.into(),
})

match self.mempools.execute(&self.solver, &settlement).await {
Err(_) => Err(Error::SubmissionError),
Ok(tx_hash) => Ok(Settled {
internalized_calldata: settlement
.calldata(
self.eth.contracts().settlement(),
settlement::Internalization::Enable,
)
.into(),
uninternalized_calldata: settlement
.calldata(
self.eth.contracts().settlement(),
settlement::Internalization::Disable,
)
.into(),
tx_hash,
}),
}
}

/// The ID of the auction being competed on.
Expand Down Expand Up @@ -264,6 +268,8 @@ pub struct Settled {
/// can manually enforce certain rules which can not be enforced
/// automatically.
pub uninternalized_calldata: Bytes<Vec<u8>>,
/// The transaction hash in which the solution was submitted.
pub tx_hash: eth::TxId,
}

#[derive(Debug, thiserror::Error)]
Expand All @@ -277,4 +283,6 @@ pub enum Error {
DeadlineExceeded(#[from] solution::DeadlineExceeded),
#[error("solver error: {0:?}")]
Solver(#[from] solver::Error),
#[error("failed to submit the solution")]
SubmissionError,
}
30 changes: 20 additions & 10 deletions crates/driver/src/domain/mempools.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
crate::{
domain::competition::solution::Settlement,
domain::{competition::solution::Settlement, eth},
infra::{self, observe, solver::Solver},
},
futures::{future::select_ok, FutureExt},
Expand All @@ -21,17 +21,19 @@ impl Mempools {
}
}

/// Publish a settlement to the mempools. Wait until it is confirmed in the
/// background.
pub fn execute(&self, solver: &Solver, settlement: &Settlement) {
/// Publish a settlement to the mempools.
pub async fn execute(
&self,
solver: &Solver,
settlement: &Settlement,
) -> Result<eth::TxId, AllFailed> {
let auction_id = settlement.auction_id;
let solver_name = solver.name();
tokio::spawn(select_ok(self.0.iter().cloned().map(|mempool| {
let solver = solver.clone();
let settlement = settlement.clone();

let (tx_hash, _remaining_futures) = select_ok(self.0.iter().cloned().map(|mempool| {
async move {
let result = mempool.execute(&solver, settlement.clone()).await;
observe::mempool_executed(&mempool, &settlement, &result);
let result = mempool.execute(solver, settlement.clone()).await;
observe::mempool_executed(&mempool, settlement, &result);
result
}
.instrument(tracing::info_span!(
Expand All @@ -40,7 +42,11 @@ impl Mempools {
?auction_id,
))
.boxed()
})));
}))
.await
.map_err(|_| AllFailed)?;

Ok(tx_hash)
}

/// Defines if the mempools are configured in a way that guarantees that
Expand Down Expand Up @@ -70,3 +76,7 @@ pub enum RevertProtection {
Enabled,
Disabled,
}

#[derive(Debug, Error)]
#[error("none of the submission strategies successfully submitted the solution")]
pub struct AllFailed;
3 changes: 3 additions & 0 deletions crates/driver/src/infra/api/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ enum Kind {
InvalidTokens,
InvalidAmounts,
QuoteSameTokens,
FailedToSubmit,
}

#[derive(Debug, Serialize)]
Expand Down Expand Up @@ -49,6 +50,7 @@ impl From<Kind> for (hyper::StatusCode, axum::Json<Error>) {
"Invalid order specified in the auction, some orders have either a 0 remaining buy \
or sell amount"
}
Kind::FailedToSubmit => "Could not submit the solution to the blockchain",
};
(
hyper::StatusCode::BAD_REQUEST,
Expand Down Expand Up @@ -79,6 +81,7 @@ impl From<competition::Error> for (hyper::StatusCode, axum::Json<Error>) {
competition::Error::SolutionNotAvailable => Kind::SolutionNotAvailable,
competition::Error::DeadlineExceeded(_) => Kind::DeadlineExceeded,
competition::Error::Solver(_) => Kind::SolverFailed,
competition::Error::SubmissionError => Kind::FailedToSubmit,
};
error.into()
}
Expand Down
8 changes: 5 additions & 3 deletions crates/driver/src/infra/api/routes/settle/dto/settled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ use {
};

impl Settled {
pub fn new(calldata: competition::Settled) -> Self {
pub fn new(settled: competition::Settled) -> Self {
Self {
calldata: CalldataInner {
internalized: calldata.internalized_calldata.into(),
uninternalized: calldata.uninternalized_calldata.into(),
internalized: settled.internalized_calldata.into(),
uninternalized: settled.uninternalized_calldata.into(),
},
tx_hash: settled.tx_hash.0,
}
}
}
Expand All @@ -20,6 +21,7 @@ impl Settled {
#[serde(rename_all = "camelCase")]
pub struct Settled {
calldata: CalldataInner,
tx_hash: primitive_types::H256,
}

#[serde_as]
Expand Down
1 change: 1 addition & 0 deletions crates/driver/src/infra/observe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ fn competition_error(err: &competition::Error) -> &'static str {
competition::Error::Solver(solver::Error::Deserialize(_)) => "SolverDeserializeError",
competition::Error::Solver(solver::Error::RepeatedSolutionIds) => "RepeatedSolutionIds",
competition::Error::Solver(solver::Error::Dto(_)) => "SolverDtoError",
competition::Error::SubmissionError => "SubmissionError",
}
}

Expand Down
Loading
Loading