Skip to content

Commit

Permalink
[ECO-624] Fix async dedupe aggregator logic
Browse files Browse the repository at this point in the history
  • Loading branch information
alnoki committed Oct 7, 2023
1 parent 188537c commit e1423de
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 81 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

125 changes: 63 additions & 62 deletions src/rust/aggregator/src/data/user_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,62 +280,43 @@ impl Data for UserHistory {
.map_err(|e| DataAggregationError::ProcessingError(anyhow!(e)))?;
mark_as_aggregated(&mut transaction, &x.txn_version, &x.event_idx).await?;
}
// Step through fill and change events in total order.
let mut fill_index = 0;
let mut change_index = 0;
for _ in 0..(fill_events.len() + change_events.len()) {
match (fill_events.get(fill_index), change_events.get(change_index)) {
(Some(fill), Some(change)) => {
if fill.txn_version < change.txn_version
|| (fill.txn_version == change.txn_version
&& fill.event_idx < change.event_idx)
{
if fill.maker_address == fill.emit_address {
aggregate_fill(
&mut transaction,
&fill.size,
&fill.maker_order_id,
&fill.market_id,
&fill.time,
)
.await?;
let (fill_event_to_aggregate, change_event_to_aggregate) =
match (fill_events.get(fill_index), change_events.get(change_index)) {
(Some(fill), Some(change)) => {
if fill.txn_version < change.txn_version
|| (fill.txn_version == change.txn_version
&& fill.event_idx < change.event_idx)
{
(Some(fill), None)
} else {
(None, Some(change))
}
mark_as_aggregated(&mut transaction, &fill.txn_version, &fill.event_idx)
.await?;
fill_index = fill_index + 1;
} else {
aggregate_change(
&mut transaction,
&change.new_size,
&change.order_id,
&change.market_id,
&change.time,
&fill.txn_version,
&fill.event_idx,
)
.await?;
mark_as_aggregated(
&mut transaction,
&change.txn_version,
&change.event_idx,
)
.await?;
change_index = change_index + 1;
}
}
(Some(fill), None) => (Some(fill), None),
(None, Some(change)) => (None, Some(change)),
(None, None) => unreachable!(),
};
match (fill_event_to_aggregate, change_event_to_aggregate) {
(Some(fill), None) => {
// Dedupe if needed by only aggregating events emitted to maker handle.
if fill.maker_address == fill.emit_address {
aggregate_fill(
aggregate_fill_for_maker_and_taker(
&mut transaction,
&fill.size,
&fill.maker_order_id,
&fill.taker_order_id,
&fill.market_id,
&fill.time,
)
.await?;
}
mark_as_aggregated(&mut transaction, &fill.txn_version, &fill.event_idx)
.await?;
fill_index = fill_index + 1;
fill_index += 1;
}
(None, Some(change)) => {
aggregate_change(
Expand All @@ -350,9 +331,9 @@ impl Data for UserHistory {
.await?;
mark_as_aggregated(&mut transaction, &change.txn_version, &change.event_idx)
.await?;
change_index = change_index + 1;
change_index += 1;
}
(None, None) => unreachable!(),
_ => unreachable!(),
};
}
for x in &cancel_events {
Expand All @@ -379,36 +360,52 @@ impl Data for UserHistory {
}
}

async fn aggregate_fill<'a>(
async fn aggregate_fill_for_maker_and_taker<'a>(
tx: &mut Transaction<'a, Postgres>,
size: &BigDecimal,
maker_order_id: &BigDecimal,
taker_order_id: &BigDecimal,
market_id: &BigDecimal,
time: &DateTime<Utc>,
) -> DataAggregationResult {
aggregate_fill(tx, size, maker_order_id, market_id, time).await?;
aggregate_fill(tx, size, taker_order_id, market_id, time).await?;
Ok(())
}

async fn aggregate_fill<'a>(
tx: &mut Transaction<'a, Postgres>,
size: &BigDecimal,
order_id: &BigDecimal,
market_id: &BigDecimal,
time: &DateTime<Utc>,
) -> DataAggregationResult {
// To protect against unexpected asynchronous behavior, only update order status to closed upon
// remaining size hitting 0 if the order is not marked cancelled. Note that the cancel event and
// change event aggregators should enforce that orders are respectively marked cancelled and
// open whenever they are called, such that events can be aggregated out of order. This logic
// applies for limit orders that post only, take only, and take then post, as well as market
// orders and swaps.
sqlx::query!(
r#"
UPDATE aggregator.user_history
SET
remaining_size = remaining_size - $1,
total_filled = total_filled + $1,
order_status = CASE remaining_size - $1
WHEN 0 THEN CASE order_status
WHEN 'cancelled' THEN order_status
ELSE 'closed'
END
ELSE CASE order_type
WHEN 'swap' THEN 'closed'
ELSE order_status
END
END,
last_updated_at = $4
WHERE order_id = $2 AND market_id = $3
UPDATE aggregator.user_history
SET
remaining_size = remaining_size - $1,
total_filled = total_filled + $1,
order_status = CASE order_status
WHEN 'cancelled' THEN order_status
ELSE CASE remaining_size - $1
WHEN 0 THEN 'closed'
ELSE order_status
END
END,
last_updated_at = $4
WHERE order_id = $2 AND market_id = $3
"#,
size,
maker_order_id,
order_id,
market_id,
time,
time
)
.execute(tx as &mut PgConnection)
.await
Expand Down Expand Up @@ -441,7 +438,7 @@ async fn aggregate_change<'a>(
.map_err(|e| DataAggregationError::ProcessingError(anyhow!(e)))?;
let (order_type, original_size): (OrderType, BigDecimal) =
(record.order_type, record.remaining_size);
// If its a limit order and needs reordering
// If it's a limit order and needs reordering
if matches!(order_type, OrderType::Limit) && &original_size < new_size {
let txn = txn_version
.to_bigint()
Expand Down Expand Up @@ -470,10 +467,14 @@ async fn aggregate_change<'a>(
.await
.map_err(|e| DataAggregationError::ProcessingError(anyhow!(e)))?;
}
// Enforce that order status is set to open after a size change to guard against async issues
sqlx::query!(
r#"
UPDATE aggregator.user_history
SET remaining_size = $1, last_updated_at = $4
SET
last_updated_at = $4,
order_status = 'open',
remaining_size = $1
WHERE order_id = $2 AND market_id = $3;
"#,
new_size,
Expand Down

0 comments on commit e1423de

Please sign in to comment.