From e1423de110c31f711e1657285b7fd658b01ec5af Mon Sep 17 00:00:00 2001 From: alnoki <43892045+alnoki@users.noreply.github.com> Date: Sat, 7 Oct 2023 12:28:18 -0700 Subject: [PATCH] [ECO-624] Fix async dedupe aggregator logic --- ...a19aba42e1e7812129ce7b86faeb1c528ab3.json} | 4 +- ...07cfe1eb4b38d21ee243ec75bb1521c7e244e.json | 17 --- ...b14a33b1c8f435666f5f5f86d97acb311db42.json | 17 +++ src/rust/aggregator/src/data/user_history.rs | 125 +++++++++--------- 4 files changed, 82 insertions(+), 81 deletions(-) rename src/rust/.sqlx/{query-27c9d7ad7295379478defbe176649938af328e0389621537ca6cc55642240dc4.json => query-078b270e79d36efce4abbac15749a19aba42e1e7812129ce7b86faeb1c528ab3.json} (51%) delete mode 100644 src/rust/.sqlx/query-2edfd0fdf08dc902ae1db2a7fe607cfe1eb4b38d21ee243ec75bb1521c7e244e.json create mode 100644 src/rust/.sqlx/query-9ef7e9da5d5996ddb0baaaaa6d4b14a33b1c8f435666f5f5f86d97acb311db42.json diff --git a/src/rust/.sqlx/query-27c9d7ad7295379478defbe176649938af328e0389621537ca6cc55642240dc4.json b/src/rust/.sqlx/query-078b270e79d36efce4abbac15749a19aba42e1e7812129ce7b86faeb1c528ab3.json similarity index 51% rename from src/rust/.sqlx/query-27c9d7ad7295379478defbe176649938af328e0389621537ca6cc55642240dc4.json rename to src/rust/.sqlx/query-078b270e79d36efce4abbac15749a19aba42e1e7812129ce7b86faeb1c528ab3.json index 1465a6f47..d0479c837 100644 --- a/src/rust/.sqlx/query-27c9d7ad7295379478defbe176649938af328e0389621537ca6cc55642240dc4.json +++ b/src/rust/.sqlx/query-078b270e79d36efce4abbac15749a19aba42e1e7812129ce7b86faeb1c528ab3.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE aggregator.user_history\n SET remaining_size = $1, last_updated_at = $4\n WHERE order_id = $2 AND market_id = $3;\n ", + "query": "\n UPDATE aggregator.user_history\n SET\n last_updated_at = $4,\n order_status = 'open',\n remaining_size = $1\n WHERE order_id = $2 AND market_id = $3;\n ", "describe": { "columns": [], "parameters": { @@ -13,5 +13,5 @@ }, "nullable": [] }, - "hash": "27c9d7ad7295379478defbe176649938af328e0389621537ca6cc55642240dc4" + "hash": "078b270e79d36efce4abbac15749a19aba42e1e7812129ce7b86faeb1c528ab3" } diff --git a/src/rust/.sqlx/query-2edfd0fdf08dc902ae1db2a7fe607cfe1eb4b38d21ee243ec75bb1521c7e244e.json b/src/rust/.sqlx/query-2edfd0fdf08dc902ae1db2a7fe607cfe1eb4b38d21ee243ec75bb1521c7e244e.json deleted file mode 100644 index b799a84b6..000000000 --- a/src/rust/.sqlx/query-2edfd0fdf08dc902ae1db2a7fe607cfe1eb4b38d21ee243ec75bb1521c7e244e.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE aggregator.user_history\n SET\n remaining_size = remaining_size - $1,\n total_filled = total_filled + $1,\n order_status = CASE remaining_size - $1\n WHEN 0 THEN CASE order_status\n WHEN 'cancelled' THEN order_status\n ELSE 'closed'\n END\n ELSE CASE order_type\n WHEN 'swap' THEN 'closed'\n ELSE order_status\n END\n END,\n last_updated_at = $4\n WHERE order_id = $2 AND market_id = $3\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Numeric", - "Numeric", - "Numeric", - "Timestamptz" - ] - }, - "nullable": [] - }, - "hash": "2edfd0fdf08dc902ae1db2a7fe607cfe1eb4b38d21ee243ec75bb1521c7e244e" -} diff --git a/src/rust/.sqlx/query-9ef7e9da5d5996ddb0baaaaa6d4b14a33b1c8f435666f5f5f86d97acb311db42.json b/src/rust/.sqlx/query-9ef7e9da5d5996ddb0baaaaa6d4b14a33b1c8f435666f5f5f86d97acb311db42.json new file mode 100644 index 000000000..efe61400b --- /dev/null +++ b/src/rust/.sqlx/query-9ef7e9da5d5996ddb0baaaaa6d4b14a33b1c8f435666f5f5f86d97acb311db42.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE aggregator.user_history\n SET\n remaining_size = remaining_size - $1,\n total_filled = total_filled + $1,\n order_status = CASE order_status\n WHEN 'cancelled' THEN order_status\n ELSE CASE remaining_size - $1\n WHEN 0 THEN 'closed'\n ELSE order_status\n END\n END,\n last_updated_at = $4\n WHERE order_id = $2 AND market_id = $3\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Numeric", + "Numeric", + "Numeric", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "9ef7e9da5d5996ddb0baaaaa6d4b14a33b1c8f435666f5f5f86d97acb311db42" +} diff --git a/src/rust/aggregator/src/data/user_history.rs b/src/rust/aggregator/src/data/user_history.rs index b3bef33b3..1fe22989f 100644 --- a/src/rust/aggregator/src/data/user_history.rs +++ b/src/rust/aggregator/src/data/user_history.rs @@ -280,54 +280,35 @@ impl Data for UserHistory { .map_err(|e| DataAggregationError::ProcessingError(anyhow!(e)))?; mark_as_aggregated(&mut transaction, &x.txn_version, &x.event_idx).await?; } + // Step through fill and change events in total order. let mut fill_index = 0; let mut change_index = 0; for _ in 0..(fill_events.len() + change_events.len()) { - match (fill_events.get(fill_index), change_events.get(change_index)) { - (Some(fill), Some(change)) => { - if fill.txn_version < change.txn_version - || (fill.txn_version == change.txn_version - && fill.event_idx < change.event_idx) - { - if fill.maker_address == fill.emit_address { - aggregate_fill( - &mut transaction, - &fill.size, - &fill.maker_order_id, - &fill.market_id, - &fill.time, - ) - .await?; + let (fill_event_to_aggregate, change_event_to_aggregate) = + match (fill_events.get(fill_index), change_events.get(change_index)) { + (Some(fill), Some(change)) => { + if fill.txn_version < change.txn_version + || (fill.txn_version == change.txn_version + && fill.event_idx < change.event_idx) + { + (Some(fill), None) + } else { + (None, Some(change)) } - mark_as_aggregated(&mut transaction, &fill.txn_version, &fill.event_idx) - .await?; - fill_index = fill_index + 1; - } else { - aggregate_change( - &mut transaction, - &change.new_size, - &change.order_id, - &change.market_id, - &change.time, - &fill.txn_version, - &fill.event_idx, - ) - .await?; - mark_as_aggregated( - &mut transaction, - &change.txn_version, - &change.event_idx, - ) - .await?; - change_index = change_index + 1; } - } + (Some(fill), None) => (Some(fill), None), + (None, Some(change)) => (None, Some(change)), + (None, None) => unreachable!(), + }; + match (fill_event_to_aggregate, change_event_to_aggregate) { (Some(fill), None) => { + // Dedupe if needed by only aggregating events emitted to maker handle. if fill.maker_address == fill.emit_address { - aggregate_fill( + aggregate_fill_for_maker_and_taker( &mut transaction, &fill.size, &fill.maker_order_id, + &fill.taker_order_id, &fill.market_id, &fill.time, ) @@ -335,7 +316,7 @@ impl Data for UserHistory { } mark_as_aggregated(&mut transaction, &fill.txn_version, &fill.event_idx) .await?; - fill_index = fill_index + 1; + fill_index += 1; } (None, Some(change)) => { aggregate_change( @@ -350,9 +331,9 @@ impl Data for UserHistory { .await?; mark_as_aggregated(&mut transaction, &change.txn_version, &change.event_idx) .await?; - change_index = change_index + 1; + change_index += 1; } - (None, None) => unreachable!(), + _ => unreachable!(), }; } for x in &cancel_events { @@ -379,36 +360,52 @@ impl Data for UserHistory { } } -async fn aggregate_fill<'a>( +async fn aggregate_fill_for_maker_and_taker<'a>( tx: &mut Transaction<'a, Postgres>, size: &BigDecimal, maker_order_id: &BigDecimal, + taker_order_id: &BigDecimal, market_id: &BigDecimal, time: &DateTime, ) -> DataAggregationResult { + aggregate_fill(tx, size, maker_order_id, market_id, time).await?; + aggregate_fill(tx, size, taker_order_id, market_id, time).await?; + Ok(()) +} + +async fn aggregate_fill<'a>( + tx: &mut Transaction<'a, Postgres>, + size: &BigDecimal, + order_id: &BigDecimal, + market_id: &BigDecimal, + time: &DateTime, +) -> DataAggregationResult { + // To protect against unexpected asynchronous behavior, only update order status to closed upon + // remaining size hitting 0 if the order is not marked cancelled. Note that the cancel event and + // change event aggregators should enforce that orders are respectively marked cancelled and + // open whenever they are called, such that events can be aggregated out of order. This logic + // applies for limit orders that post only, take only, and take then post, as well as market + // orders and swaps. sqlx::query!( r#" - UPDATE aggregator.user_history - SET - remaining_size = remaining_size - $1, - total_filled = total_filled + $1, - order_status = CASE remaining_size - $1 - WHEN 0 THEN CASE order_status - WHEN 'cancelled' THEN order_status - ELSE 'closed' - END - ELSE CASE order_type - WHEN 'swap' THEN 'closed' - ELSE order_status - END - END, - last_updated_at = $4 - WHERE order_id = $2 AND market_id = $3 + UPDATE aggregator.user_history + SET + remaining_size = remaining_size - $1, + total_filled = total_filled + $1, + order_status = CASE order_status + WHEN 'cancelled' THEN order_status + ELSE CASE remaining_size - $1 + WHEN 0 THEN 'closed' + ELSE order_status + END + END, + last_updated_at = $4 + WHERE order_id = $2 AND market_id = $3 "#, size, - maker_order_id, + order_id, market_id, - time, + time ) .execute(tx as &mut PgConnection) .await @@ -441,7 +438,7 @@ async fn aggregate_change<'a>( .map_err(|e| DataAggregationError::ProcessingError(anyhow!(e)))?; let (order_type, original_size): (OrderType, BigDecimal) = (record.order_type, record.remaining_size); - // If its a limit order and needs reordering + // If it's a limit order and needs reordering if matches!(order_type, OrderType::Limit) && &original_size < new_size { let txn = txn_version .to_bigint() @@ -470,10 +467,14 @@ async fn aggregate_change<'a>( .await .map_err(|e| DataAggregationError::ProcessingError(anyhow!(e)))?; } + // Enforce that order status is set to open after a size change to guard against async issues sqlx::query!( r#" UPDATE aggregator.user_history - SET remaining_size = $1, last_updated_at = $4 + SET + last_updated_at = $4, + order_status = 'open', + remaining_size = $1 WHERE order_id = $2 AND market_id = $3; "#, new_size,