Skip to content

Commit

Permalink
Merge pull request #525 from econia-labs/ECO-624
Browse files Browse the repository at this point in the history
[ECO-624] Fix async dedupe aggregator logic
  • Loading branch information
CRBl69 authored Oct 8, 2023
2 parents 8494adc + 54ad20f commit dfd6a34
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 82 deletions.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

124 changes: 62 additions & 62 deletions src/rust/aggregator/src/data/user_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ impl Data for UserHistory {
Some(std::time::Duration::from_secs(5))
}

/// All database interactions are handled in a single atomic transaction. Processor insertions
/// are also handled in a single atomic transaction for each batch of transactions, such that
/// user history aggregation logic is effectively serialized across historical chain state.
async fn process_and_save_internal(&mut self) -> DataAggregationResult {
let mut transaction = self
.pool
Expand Down Expand Up @@ -283,62 +286,43 @@ impl Data for UserHistory {
.map_err(|e| DataAggregationError::ProcessingError(anyhow!(e)))?;
mark_as_aggregated(&mut transaction, &x.txn_version, &x.event_idx).await?;
}
// Step through fill and change events in total order.
let mut fill_index = 0;
let mut change_index = 0;
for _ in 0..(fill_events.len() + change_events.len()) {
match (fill_events.get(fill_index), change_events.get(change_index)) {
(Some(fill), Some(change)) => {
if fill.txn_version < change.txn_version
|| (fill.txn_version == change.txn_version
&& fill.event_idx < change.event_idx)
{
if fill.maker_address == fill.emit_address {
aggregate_fill(
&mut transaction,
&fill.size,
&fill.maker_order_id,
&fill.market_id,
&fill.time,
)
.await?;
let (fill_event_to_aggregate, change_event_to_aggregate) =
match (fill_events.get(fill_index), change_events.get(change_index)) {
(Some(fill), Some(change)) => {
if fill.txn_version < change.txn_version
|| (fill.txn_version == change.txn_version
&& fill.event_idx < change.event_idx)
{
(Some(fill), None)
} else {
(None, Some(change))
}
mark_as_aggregated(&mut transaction, &fill.txn_version, &fill.event_idx)
.await?;
fill_index = fill_index + 1;
} else {
aggregate_change(
&mut transaction,
&change.new_size,
&change.order_id,
&change.market_id,
&change.time,
&fill.txn_version,
&fill.event_idx,
)
.await?;
mark_as_aggregated(
&mut transaction,
&change.txn_version,
&change.event_idx,
)
.await?;
change_index = change_index + 1;
}
}
(Some(fill), None) => (Some(fill), None),
(None, Some(change)) => (None, Some(change)),
(None, None) => unreachable!(),
};
match (fill_event_to_aggregate, change_event_to_aggregate) {
(Some(fill), None) => {
// Dedupe if needed by only aggregating events emitted to maker handle.
if fill.maker_address == fill.emit_address {
aggregate_fill(
aggregate_fill_for_maker_and_taker(
&mut transaction,
&fill.size,
&fill.maker_order_id,
&fill.taker_order_id,
&fill.market_id,
&fill.time,
)
.await?;
}
mark_as_aggregated(&mut transaction, &fill.txn_version, &fill.event_idx)
.await?;
fill_index = fill_index + 1;
fill_index += 1;
}
(None, Some(change)) => {
aggregate_change(
Expand All @@ -353,9 +337,9 @@ impl Data for UserHistory {
.await?;
mark_as_aggregated(&mut transaction, &change.txn_version, &change.event_idx)
.await?;
change_index = change_index + 1;
change_index += 1;
}
(None, None) => unreachable!(),
_ => unreachable!(),
};
}
for x in &cancel_events {
Expand All @@ -382,36 +366,50 @@ impl Data for UserHistory {
}
}

async fn aggregate_fill<'a>(
/// Applies a single fill event to both orders it touches.
///
/// A fill always involves two orders — the maker's and the taker's — so the
/// same size/state update is delegated to `aggregate_fill` once per side,
/// inside the caller-supplied transaction. Maker is processed first, matching
/// the original call order.
async fn aggregate_fill_for_maker_and_taker<'a>(
    tx: &mut Transaction<'a, Postgres>,
    size: &BigDecimal,
    maker_order_id: &BigDecimal,
    taker_order_id: &BigDecimal,
    market_id: &BigDecimal,
    time: &DateTime<Utc>,
) -> DataAggregationResult {
    // Same update, two order ids; `?` propagates the first aggregation error.
    for order_id in [maker_order_id, taker_order_id] {
        aggregate_fill(tx, size, order_id, market_id, time).await?;
    }
    Ok(())
}

async fn aggregate_fill<'a>(
tx: &mut Transaction<'a, Postgres>,
size: &BigDecimal,
order_id: &BigDecimal,
market_id: &BigDecimal,
time: &DateTime<Utc>,
) -> DataAggregationResult {
// Only limit orders can remain open after a transaction during which they are filled against,
// so flag market orders and swaps as closed by default: if they end up being cancelled instead
// of closed, the cancel event emitted during the same transaction (aggregated after fills) will
// clean up the order status to cancelled.
sqlx::query!(
r#"
UPDATE aggregator.user_history
SET
remaining_size = remaining_size - $1,
total_filled = total_filled + $1,
order_status = CASE remaining_size - $1
WHEN 0 THEN CASE order_status
WHEN 'cancelled' THEN order_status
ELSE 'closed'
END
ELSE CASE order_type
WHEN 'swap' THEN 'closed'
ELSE order_status
END
END,
last_updated_at = $4
WHERE order_id = $2 AND market_id = $3
UPDATE aggregator.user_history
SET
remaining_size = remaining_size - $1,
total_filled = total_filled + $1,
order_status = CASE order_type
WHEN 'limit' THEN CASE remaining_size - $1
WHEN 0 THEN 'closed'
ELSE order_status
END
ELSE 'closed'
END,
last_updated_at = $4
WHERE order_id = $2 AND market_id = $3
"#,
size,
maker_order_id,
order_id,
market_id,
time,
time
)
.execute(tx as &mut PgConnection)
.await
Expand Down Expand Up @@ -444,7 +442,7 @@ async fn aggregate_change<'a>(
.map_err(|e| DataAggregationError::ProcessingError(anyhow!(e)))?;
let (order_type, original_size): (OrderType, BigDecimal) =
(record.order_type, record.remaining_size);
// If its a limit order and needs reordering
// If it's a limit order and needs reordering
if matches!(order_type, OrderType::Limit) && &original_size < new_size {
let txn = txn_version
.to_bigint()
Expand Down Expand Up @@ -476,7 +474,9 @@ async fn aggregate_change<'a>(
sqlx::query!(
r#"
UPDATE aggregator.user_history
SET remaining_size = $1, last_updated_at = $4
SET
last_updated_at = $4,
remaining_size = $1
WHERE order_id = $2 AND market_id = $3;
"#,
new_size,
Expand Down

0 comments on commit dfd6a34

Please sign in to comment.