Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ECO-624] Fix async dedupe aggregator logic #525

Merged
merged 3 commits into from
Oct 8, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

125 changes: 63 additions & 62 deletions src/rust/aggregator/src/data/user_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,62 +280,43 @@ impl Data for UserHistory {
.map_err(|e| DataAggregationError::ProcessingError(anyhow!(e)))?;
mark_as_aggregated(&mut transaction, &x.txn_version, &x.event_idx).await?;
}
// Step through fill and change events in total order.
let mut fill_index = 0;
let mut change_index = 0;
for _ in 0..(fill_events.len() + change_events.len()) {
match (fill_events.get(fill_index), change_events.get(change_index)) {
(Some(fill), Some(change)) => {
if fill.txn_version < change.txn_version
|| (fill.txn_version == change.txn_version
&& fill.event_idx < change.event_idx)
{
if fill.maker_address == fill.emit_address {
aggregate_fill(
&mut transaction,
&fill.size,
&fill.maker_order_id,
&fill.market_id,
&fill.time,
)
.await?;
let (fill_event_to_aggregate, change_event_to_aggregate) =
match (fill_events.get(fill_index), change_events.get(change_index)) {
(Some(fill), Some(change)) => {
if fill.txn_version < change.txn_version
|| (fill.txn_version == change.txn_version
&& fill.event_idx < change.event_idx)
{
(Some(fill), None)
} else {
(None, Some(change))
}
mark_as_aggregated(&mut transaction, &fill.txn_version, &fill.event_idx)
.await?;
fill_index = fill_index + 1;
} else {
aggregate_change(
&mut transaction,
&change.new_size,
&change.order_id,
&change.market_id,
&change.time,
&fill.txn_version,
&fill.event_idx,
)
.await?;
mark_as_aggregated(
&mut transaction,
&change.txn_version,
&change.event_idx,
)
.await?;
change_index = change_index + 1;
}
}
(Some(fill), None) => (Some(fill), None),
(None, Some(change)) => (None, Some(change)),
(None, None) => unreachable!(),
};
match (fill_event_to_aggregate, change_event_to_aggregate) {
(Some(fill), None) => {
// Dedupe if needed by only aggregating events emitted to maker handle.
if fill.maker_address == fill.emit_address {
aggregate_fill(
aggregate_fill_for_maker_and_taker(
&mut transaction,
&fill.size,
&fill.maker_order_id,
&fill.taker_order_id,
&fill.market_id,
&fill.time,
)
.await?;
}
mark_as_aggregated(&mut transaction, &fill.txn_version, &fill.event_idx)
.await?;
fill_index = fill_index + 1;
fill_index += 1;
}
(None, Some(change)) => {
aggregate_change(
Expand All @@ -350,9 +331,9 @@ impl Data for UserHistory {
.await?;
mark_as_aggregated(&mut transaction, &change.txn_version, &change.event_idx)
.await?;
change_index = change_index + 1;
change_index += 1;
}
(None, None) => unreachable!(),
_ => unreachable!(),
};
}
for x in &cancel_events {
Expand All @@ -379,36 +360,52 @@ impl Data for UserHistory {
}
}

async fn aggregate_fill<'a>(
async fn aggregate_fill_for_maker_and_taker<'a>(
tx: &mut Transaction<'a, Postgres>,
size: &BigDecimal,
maker_order_id: &BigDecimal,
taker_order_id: &BigDecimal,
market_id: &BigDecimal,
time: &DateTime<Utc>,
) -> DataAggregationResult {
aggregate_fill(tx, size, maker_order_id, market_id, time).await?;
aggregate_fill(tx, size, taker_order_id, market_id, time).await?;
Ok(())
}

async fn aggregate_fill<'a>(
tx: &mut Transaction<'a, Postgres>,
size: &BigDecimal,
order_id: &BigDecimal,
market_id: &BigDecimal,
time: &DateTime<Utc>,
) -> DataAggregationResult {
// To protect against unexpected asynchronous behavior, only update order status to closed upon
// remaining size hitting 0 if the order is not marked cancelled. Note that the cancel event and
// change event aggregators should enforce that orders are respectively marked cancelled and
// open whenever they are called, such that events can be aggregated out of order. This logic
// applies for limit orders that post only, take only, and take then post, as well as market
// orders and swaps.
sqlx::query!(
r#"
UPDATE aggregator.user_history
SET
remaining_size = remaining_size - $1,
total_filled = total_filled + $1,
order_status = CASE remaining_size - $1
WHEN 0 THEN CASE order_status
WHEN 'cancelled' THEN order_status
ELSE 'closed'
END
ELSE CASE order_type
WHEN 'swap' THEN 'closed'
ELSE order_status
END
END,
last_updated_at = $4
WHERE order_id = $2 AND market_id = $3
UPDATE aggregator.user_history
SET
remaining_size = remaining_size - $1,
total_filled = total_filled + $1,
order_status = CASE order_status
WHEN 'cancelled' THEN order_status
ELSE CASE remaining_size - $1
WHEN 0 THEN 'closed'
ELSE order_status
END
END,
last_updated_at = $4
WHERE order_id = $2 AND market_id = $3
"#,
size,
maker_order_id,
order_id,
market_id,
time,
time
)
.execute(tx as &mut PgConnection)
.await
Expand Down Expand Up @@ -441,7 +438,7 @@ async fn aggregate_change<'a>(
.map_err(|e| DataAggregationError::ProcessingError(anyhow!(e)))?;
let (order_type, original_size): (OrderType, BigDecimal) =
(record.order_type, record.remaining_size);
// If its a limit order and needs reordering
// If it's a limit order and needs reordering
if matches!(order_type, OrderType::Limit) && &original_size < new_size {
let txn = txn_version
.to_bigint()
Expand Down Expand Up @@ -470,10 +467,14 @@ async fn aggregate_change<'a>(
.await
.map_err(|e| DataAggregationError::ProcessingError(anyhow!(e)))?;
}
// Enforce that order status is set to open after a size change to guard against async issues
sqlx::query!(
r#"
UPDATE aggregator.user_history
SET remaining_size = $1, last_updated_at = $4
SET
last_updated_at = $4,
order_status = 'open',
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would you set order status to open ? It seems not very useful since an order cannot be re-opened. It can also introduce errors if cancel and change events are not emitted or processed in error. I think there is no downside to not setting the status to open so why set it to open ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider:

  1. User opens order with remaining size 10
  2. User changes remaining size to 15
  3. 10 fills

If the events are aggregated out of order, (e.g. event 1, then 3, then 2), the order will be marked closed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 0eb778a

remaining_size = $1
WHERE order_id = $2 AND market_id = $3;
"#,
new_size,
Expand Down