Skip to content

Commit

Permalink
Update guard conditions for effectively serial
Browse files Browse the repository at this point in the history
  • Loading branch information
alnoki committed Oct 8, 2023
1 parent e1423de commit 0eb778a
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 18 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 6 additions & 14 deletions src/rust/aggregator/src/data/user_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ impl Data for UserHistory {
Some(std::time::Duration::from_secs(5))
}

/// All database interactions are handled in a single atomic transaction. Processor insertions
/// are also handled in a single atomic transaction for each batch of transactions, such that
/// user history aggregation logic is effectively serialized across historical chain state.
async fn process_and_save_internal(&mut self) -> DataAggregationResult {
let mut transaction = self
.pool
Expand Down Expand Up @@ -380,24 +383,15 @@ async fn aggregate_fill<'a>(
market_id: &BigDecimal,
time: &DateTime<Utc>,
) -> DataAggregationResult {
// To protect against unexpected asynchronous behavior, only update order status to closed upon
// remaining size hitting 0 if the order is not marked cancelled. Note that the cancel event and
// change event aggregators should enforce that orders are respectively marked cancelled and
// open whenever they are called, such that events can be aggregated out of order. This logic
// applies for limit orders that post only, take only, and take then post, as well as market
// orders and swaps.
sqlx::query!(
r#"
UPDATE aggregator.user_history
SET
remaining_size = remaining_size - $1,
total_filled = total_filled + $1,
order_status = CASE order_status
WHEN 'cancelled' THEN order_status
ELSE CASE remaining_size - $1
WHEN 0 THEN 'closed'
ELSE order_status
END
order_status = CASE remaining_size - $1
WHEN 0 THEN 'closed'
ELSE order_status
END,
last_updated_at = $4
WHERE order_id = $2 AND market_id = $3
Expand Down Expand Up @@ -467,13 +461,11 @@ async fn aggregate_change<'a>(
.await
.map_err(|e| DataAggregationError::ProcessingError(anyhow!(e)))?;
}
// Enforce that order status is set to open after a size change to guard against async issues
sqlx::query!(
r#"
UPDATE aggregator.user_history
SET
last_updated_at = $4,
order_status = 'open',
remaining_size = $1
WHERE order_id = $2 AND market_id = $3;
"#,
Expand Down

0 comments on commit 0eb778a

Please sign in to comment.